I previously posted this on Stack Overflow, but I was directed here.
I'm getting to the stage where I need to analyse lots of flat CSVs:
- Read in around 500GB of CSVs (e.g. daily data split by month)
- Group the data (e.g. by month or year)
- Output the aggregated data as a small CSV
I'm not sure whether SQLite is the right tool for this (adding CSVs to it seems to take a long time and, as in my link above, I can't access the database beyond a certain size), or whether I should consider something else like PostgreSQL or Microsoft SQL Server.
I'm looking to invest in a hardware/software platform for this (e.g. SSD, RAID, Microsoft SQL Server) and was hoping for some information on where to begin.
In particular, if PostgreSQL is a possibility, is there a similarly quick way to import CSVs, like here:
Edit (08/10/15):
I'm testing out uploading the data into a PostgreSQL database and am averaging 16 minutes per 10GB CSV. My issue is that some of my columns hold values that are too big for the types I guessed, so I have to change them from bigint to varchar, etc. The data has 38 columns and around 50 million rows per file, so figuring out which categorical variables are strings and which are integers is a real pain.
At the moment I am using:
cur.copy_expert(sql="COPY %s FROM stdin DELIMITERS '~' CSV;", file=f)
with my data types mainly being varchar. I did this because the CSV is a bit messy: sometimes what I think is an integer turns out to be alphanumeric, and then I have to re-upload.
Would it be much slower to import as a pandas data frame (so that pandas takes care of the column types for me) and use that with SQLAlchemy to insert into Postgres? I'm guessing that even if pandas is a bit slower, it will make up for it because the column types will be optimised (since it decides them for me).
E.g. something like this:
import pandas as pd
from sqlalchemy import create_engine

df = pd.read_csv('mypath.csv')
engine = create_engine('postgresql://username:password@localhost:5432/dbname')
df.to_sql("table", engine)
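One way to reduce the column-type guesswork is to let pandas infer dtypes from a sample of each file before deciding on the database types. A rough sketch, not a definitive recipe; the file name and delimiter are placeholders:

import pandas as pd

# Read only a sample of rows so the type inference stays cheap;
# low_memory=False makes pandas consider the whole sample before picking a dtype.
sample = pd.read_csv('mypath.csv', sep='~', nrows=100000, low_memory=False)

# Columns reported as 'object' contain strings (or mixed values);
# int64/float64 columns are reasonable candidates for bigint / double precision.
print(sample.dtypes)

A sample can still miss a stray alphanumeric value further down the file, so this narrows the guesswork rather than removing it.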
My main goal is to optimise this for the group-by command.
Second Edit:
Ah, I guess the best thing is to upload one CSV file using pandas, then copy the resulting table structure into a new table, which is then filled with the faster COPY command?
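If that route is taken, one way to clone just the structure of the pandas-created temp table is CREATE TABLE ... (LIKE ...). A minimal sketch via psycopg2, with the connection details and table names as placeholders (the code in the answer further down uses the equivalent CREATE TABLE ... AS SELECT * ... WHERE 1=2 instead):

import psycopg2

con = psycopg2.connect(database='dbname', user='username', password='password')
cur = con.cursor()

# Copy only the column definitions of the pandas-created temp table;
# no rows are copied, so the new table starts empty.
cur.execute("CREATE TABLE my_table (LIKE my_table_temp);")
con.commit()
con.close()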
4 Answers
Frankly, I probably wouldn't use a database in the first place. It doesn't sound like you have any need for ACID, which is one of the main things databases provide, and it doesn't come cheap in terms of performance. Nor does it sound like you need complicated indexes.
What I would do (and have done, an awful lot) is just loop over the files with Perl or Python and use their built-in hash table features to generate your aggregates. Since you have specified that the output is small, you should have little problem holding the intermediate aggregation state in memory for the duration.
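A minimal sketch of that approach in Python, streaming each file row by row and holding only the per-group running totals in memory; the file pattern, delimiter, and column positions are placeholders:

import csv
import glob
from collections import defaultdict

totals = defaultdict(float)   # group key -> running sum
counts = defaultdict(int)     # group key -> row count

for path in glob.glob('/data/100_pct*.csv'):        # placeholder file pattern
    with open(path, newline='') as f:
        for row in csv.reader(f, delimiter='~'):
            key = (row[0], row[1])                   # e.g. (year, month) columns
            totals[key] += float(row[2])             # e.g. the amount column
            counts[key] += 1

with open('aggregated.csv', 'w', newline='') as out:
    writer = csv.writer(out)
    writer.writerow(['year', 'month', 'total', 'rows'])
    for (year, month), total in totals.items():
        writer.writerow([year, month, total, counts[(year, month)]])

Because only the aggregates are kept in memory, the limiting factor is the number of distinct groups, not the size of the input files.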
- Thank you for your response. That is what I ended up doing: since the data wasn't linked, I was able to load each 10GB file into RAM, collapse it, and repeat. However, I was hoping for a more robust solution for the future, in case (a) I can't load the individual files into RAM because they are 20GB, or (b) I need to perform joins or any other operation that involves combining the data. That is why I thought it would be nicer to have all the data sitting in a database which I can then cut up / collapse / join / etc. – Ilia, Oct 7, 2015 at 15:42
- Perl works very well for streaming files; you don't have to (and usually should not) read them all into RAM. If you do want to load the data into a database, PostgreSQL has the COPY command, which can read CSV files. You have to be careful about how NULLs are handled, and how escapes and embedded newlines are handled (there is no single CSV specification that defines these things, so you can find quirks in various data sets); a sketch of spelling these options out explicitly follows these comments. – jjanes, Oct 7, 2015 at 19:32
- Thanks, I'm testing out uploading the data into a PostgreSQL database and am averaging 16 minutes per 10GB CSV. My issue is that some of my columns are very big, so I would have to change them from bigint to varchar, etc. At the moment I am using "COPY %s FROM stdin DELIMITERS '~' CSV;" with my data types mainly being varchar. Would it be much slower to import as a pandas data frame and use that with SQLAlchemy to insert into Postgres? I'm guessing that even if pandas is a bit slower, it will make up for it because the column types will be optimised. My main goal is to optimise this for the group-by command. – Ilia, Oct 7, 2015 at 23:27
- Sorry, I don't know anything about pandas or SQLAlchemy. I would think SQLAlchemy, just based on the name, should be able to hook into the PostgreSQL COPY method. – jjanes, Oct 8, 2015 at 14:49
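To make the NULL and quoting behaviour explicit rather than relying on the defaults jjanes mentions, COPY accepts options for them. A minimal sketch via psycopg2, with the connection details, table name, delimiter, and NULL marker as placeholders:

import psycopg2

con = psycopg2.connect(database='dbname', user='username', password='password')
cur = con.cursor()

# Spell out how empty fields, quotes, and escapes should be interpreted
# instead of relying on the CSV defaults.
copy_sql = """
    COPY my_table FROM STDIN WITH (
        FORMAT csv,
        DELIMITER '~',
        NULL '',        -- treat empty fields as NULL
        QUOTE '"',
        ESCAPE '"'
    )
"""
with open('mypath.csv', 'r') as f:
    cur.copy_expert(sql=copy_sql, file=f)
con.commit()
con.close()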
Use a data integration (ETL) tool like Pentaho Data Integration (PDI). You can either do everything within PDI or upload it to Postgres for additional processing. PDI is free, open source, and I use it daily.
Aggregating is a common process associated with ETL tools, typically transforming data to a higher grain for additional processing, which sounds like what you're doing. Plus, this tool will allow you to develop quickly and handle errors more readily.
In situations like these, there is no reason to write code!
Well, thanks to all the help, I was able to create a PostgreSQL database quite painlessly using the code below.
However, it hasn't really helped, because it still takes days and days to perform the GROUP BY.
I'm not really sure of a faster way of doing this; would creating 20 indexes first help? It seems the fastest way to collapse is to keep the data in its disaggregated form (see the sketch after the code below).
import os
import sys
import time
from subprocess import call

import pandas as pd
import psycopg2
from sqlalchemy import create_engine

# Note: myusername, mypassword and mydatabase are assumed to be defined elsewhere in the script.


def call_robocopy(from_folder='',
                  to_folder='',
                  my_log='H:/robocopy_log.txt'):
"""
Copy files to working directory
robocopy <Source> <Destination> [<File>[ ...]] [<Options>]
"""
    if os.path.isdir(from_folder) and os.path.isdir(to_folder):
        call(["robocopy", from_folder, to_folder, "/LOG:%s" % my_log])
else:
print("Paths not entered correctly")
def pandas_temp_table(path_2_csv='',
tmp_table='',):
"""
Upload data to a temporary table first using PANDAs to identify optimal data-types for columns
PANDAS is not speed-efficient as it uses INSERT commands rather than COPY e.g. it took COPY 16mins average
to get a 15GB CSV into the database (door-to-door) whereas pandas.to_sql took 50mins
"""
# Pandas can use sqlalchemy engine
engine = create_engine('postgresql://%s:%s@localhost:5432/%s' %(myusername, mypassword, mydatabase))
if engine:
print('Connected: %s' % engine)
else:
print('Connection lost')
sys.exit(1)
tmp_table += '_temp'
counter = 0
start_time = time.time()
for i in os.listdir(path_2_csv):
# Cycle through all CSVs and upload a small chunk to make sure everything is OK
if counter < 1:
if i.endswith(".csv") & i.startswith("100_pct"):
print("Reading CSV: %s into PANDAs data-frame" % i)
# First 1,000,000 rows
#df = pd.read_csv(os.path.join(path_2_csv, i), nrows=1000000, header=None, sep='~') #sep=None; automatically find by sniffing
# Upload whole file
df = pd.read_csv(os.path.join(path_2_csv, i), header=None, sep='~') #sep=None; automatically find by sniffing
df.columns = [
.. around 30 columns ..
]
print("CSV read-in successfully")
print(df.shape)
print("Uploading %s to SQL Table: %s" % (i, tmp_table))
df.to_sql(tmp_table, engine, if_exists='append', index=False)
counter += 1
current_speed = ((time.time()-start_time)/60)/counter
print("Average speed is %.2f minutes per database" % current_speed)
print("Successfully uploaded: %d" % counter)
end_time = time.time()
print("Total duration of INSERT: %.2f minutes" % (end_time - start_time)/60)
def create_postgresql_table(my_table=''):
"""
Create table copying the structure of the temp table created using pandas
Timer to benchmark
"""
# Connect
con = psycopg2.connect(database=mydatabase, user=myusername, password=mypassword)
cur = con.cursor()
if con:
print('Connected: %s' % con)
else:
print('Connection lost')
sys.exit(1)
try:
# Check if table exists already
cur.execute("""
SELECT relname FROM pg_class WHERE relname = '{0}';
""".format(my_table))
table_test = cur.fetchone()[0]
except Exception as e:
print('Table %s does not exist' % my_table)
table_test = None
if table_test:
        print('%s already exists' % my_table)
else:
        print('Creating table: %s' % my_table)
try:
# Copy structure and no data (1=2 is false)
cur.execute("""
CREATE TABLE {0} AS SELECT * FROM {1} WHERE 1=2;
""".format(my_table, my_table+'_temp'))
con.commit()
print('Table created successfully')
except psycopg2.DatabaseError as e:
if con:
con.rollback()
print('Error %s' % e)
sys.exit(1)
con.close()
def copy_csv_to_table(path_2_csv='',
my_table=''):
"""
Use the PostgreSQL COPY command to bulk-copy the CSVs into the newly created table
"""
# Connect
con = psycopg2.connect(database=mydatabase, user=myusername, password=mypassword)
cur = con.cursor()
if con:
print('Connected: %s' % con)
else:
print('Connection lost')
sys.exit(1)
copy_sql = """
COPY %s FROM stdin DELIMITERS '~' CSV;
""" % my_table
counter = 0
start_time = time.time()
for i in os.listdir(path_2_csv):
if i.endswith(".csv") & i.startswith("100_pct"):
print("Uploading %s to %s" % (i, mytable))
with open(os.path.join(path_2_csv, i), 'r') as f:
cur.copy_expert(sql=copy_sql, file=f)
con.commit()
counter += 1
print("Successfully uploaded %d CSVs" % counter)
current_speed = ((time.time()-start_time)/60)/counter
print("Average speed is %.2f minutes per database" % current_speed)
con.close()
end_time = time.time()
print("Total duration of COPY: %.2f minutes" % (end_time - start_time)/60)
def sql_query_to_csv(my_table='',
csv_out=''):
"""
Submit query to created PostgreSQL database and output results to a CSV
"""
# Connect
con = psycopg2.connect(database=mydatabase, user=myusername, password=mypassword)
cur = con.cursor()
if con:
print('Connected: %s' % con)
else:
print('Connection lost')
sys.exit(1)
start_time = time.time()
my_query = """
SELECT
SUM("A"),
SUM("B"),
SUM("C"),
COUNT(1) AS "D",
EXTRACT(YEAR FROM "PURCHASE_DATE"::text::date) AS "YEAR",
EXTRACT(MONTH FROM "PURCHASE_DATE"::text::date) AS "MONTH",
.. 20 more columns ...
FROM {0}
GROUP BY
.. 20 columns ...
""".format(my_table)
start_time = time.time()
output_query = "COPY ({0}) TO STDOUT WITH CSV HEADER".format(my_query)
with open(csv_out, 'w') as f:
cur.copy_expert(output_query, f)
print("Successfully submitted results to: %s" % csv_out)
con.close()
end_time = time.time()
print("Total duration of Query: %.2f minutes" % (end_time - start_time)/60)
By today's standards, DuckDB is probably one of the fastest ETL tools for ingesting large CSV files, whether you want to do further analytics in SQL and save the result to CSV/Parquet, push it to another database, or even write it to cloud storage (AWS or Azure).
It can process huge files even on modest hardware, as explained in these two articles (a short sketch follows them):
No Memory? No Problem. External Aggregation in DuckDB https://duckdb.org/2024/03/29/external-aggregation.html
Memory Management in DuckDB https://duckdb.org/2024/07/09/memory-management.html
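A minimal sketch of what that could look like for the workflow in the question, using the duckdb Python package; the file pattern, delimiter, column names, and output path are placeholders:

import duckdb

con = duckdb.connect()  # in-memory database; pass a file path to persist it

# read_csv_auto scans the files and infers the column types; the aggregation
# streams through the data and spills to disk if it does not fit in memory.
con.execute("""
    COPY (
        SELECT "YEAR", "MONTH", SUM("A") AS total_a, COUNT(*) AS n_rows
        FROM read_csv_auto('/data/100_pct*.csv', delim='~')
        GROUP BY "YEAR", "MONTH"
    ) TO 'aggregated.csv' (FORMAT csv, HEADER)
    -- or TO 'aggregated.parquet' (FORMAT parquet)
""")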
The PostgreSQL counterpart to SQL Server's bcp is COPY: postgresql.org/docs/current/static/sql-copy.html (which processes a text file located on the server). Another alternative is pgloader: pgloader.io