Load and Save Data with Dask DataFrames#
You can create a Dask DataFrame from various data storage formats like CSV, HDF, Apache Parquet, and others. For most formats, the data can live on a variety of storage systems, including local disk, network file systems (NFS), the Hadoop Distributed File System (HDFS), Google Cloud Storage, and Amazon S3 (except for HDF, which is only available on POSIX-like file systems).
See the DataFrame overview page for more on
dask.dataframe scope, use, and limitations and
DataFrame Best Practices for more tips
and solutions to common problems.
API#
The following functions convert between Dask DataFrames, file formats, and other Dask or Python collections.
File Formats:
read_csv(urlpath[, blocksize, ...])
Read CSV files into a Dask.DataFrame
read_parquet([path, columns, filters, ...])
Read a Parquet file into a Dask DataFrame
read_hdf(pattern, key[, start, stop, ...])
Read HDF files into a Dask DataFrame
read_orc(path[, engine, columns, index, ...])
Read dataframe from ORC file(s)
read_json(url_path[, orient, lines, ...])
Create a dataframe from a set of JSON files
read_sql_table(table_name, con, index_col[, ...])
Read SQL database table into a DataFrame.
read_sql_query(sql, con, index_col[, ...])
Read SQL query into a DataFrame.
read_sql(sql, con, index_col, **kwargs)
Read SQL query or database table into a DataFrame.
read_table(urlpath[, blocksize, ...])
Read delimited files into a Dask.DataFrame
read_fwf(urlpath[, blocksize, ...])
Read fixed-width files into a Dask.DataFrame
from_array(arr[, chunksize, columns, meta])
Read any sliceable array into a Dask Dataframe
to_csv(df, filename[, single_file, ...])
Store Dask DataFrame to CSV files
to_parquet(df, path[, compression, ...])
Store Dask.dataframe to Parquet files
to_hdf(df, path, key[, mode, append, ...])
Store Dask Dataframe to Hierarchical Data Format (HDF) files
to_sql(df, name, uri[, schema, if_exists, ...])
Store Dask Dataframe to a SQL table
Dask Collections:
from_delayed(dfs[, meta, divisions, prefix, ...])
Create Dask DataFrame from many Dask Delayed objects
from_dask_array(x[, columns, index, meta])
Create a Dask DataFrame from a Dask Array.
from_map(func, *iterables[, args, meta, ...])
Create a DataFrame collection from a custom function map.
dask.bag.core.Bag.to_dataframe([meta, ...])
Create Dask Dataframe from a Dask Bag.
DataFrame.to_delayed([optimize_graph])
Convert into a list of dask.delayed objects, one per partition.
to_records(df)
Create Dask Array from a Dask Dataframe
to_bag(df[, index, format])
Create Dask Bag from a Dask DataFrame
Pandas:
from_pandas(data[, npartitions, sort, chunksize])
Construct a Dask DataFrame from a Pandas DataFrame
DataFrame.from_dict(data, *[, npartitions, ...])
Construct a Dask DataFrame from a Python Dictionary
Creating#
Read from CSV#
You can use read_csv() to read one or more CSV files into a Dask DataFrame.
It supports loading multiple files at once using globstrings:
>>> import dask.dataframe as dd
>>> df = dd.read_csv('myfiles.*.csv')
You can break up a single large file with the blocksize parameter:
>>> df = dd.read_csv('largefile.csv', blocksize=25e6) # 25MB chunks
Changing the blocksize parameter will change the number of partitions (see the explanation on
partitions). A good rule of thumb when working with
Dask DataFrames is to keep your partitions under 100MB in size.
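To sanity-check how a given blocksize splits your file, you can inspect the partition count after reading. A minimal sketch, assuming a local largefile.csv (blocksize also accepts byte strings such as "64MB"):
>>> df = dd.read_csv('largefile.csv', blocksize="64MB")  # blocksize as a byte string
>>> df.npartitions  # roughly the file size divided by the blocksize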
Read from Parquet#
Similarly, you can use read_parquet() for reading one or more Parquet files.
You can read in a single Parquet file:
>>> df = dd.read_parquet("path/to/mydata.parquet")
Or a directory of local Parquet files:
>>> df = dd.read_parquet("path/to/my/parquet/")
For more details on working with Parquet files, including tips and best practices, see the documentation on Dask Dataframe and Parquet.
Read from cloud storage#
Dask can read data from a variety of data stores including cloud object stores.
You can do this by prepending a protocol like s3:// to paths
used in common data access functions like dd.read_csv:
>>> df = dd.read_csv('s3://bucket/path/to/data-*.csv')
>>> df = dd.read_parquet('gcs://bucket/path/to/data-*.parq')
For remote systems like Amazon S3 or Google Cloud Storage, you may need to provide credentials.
These are usually stored in a configuration file, but in some cases you may want to pass
storage-specific options through to the storage backend.
You can do this with the storage_options parameter:
>>> df = dd.read_csv('s3://bucket-name/my-data-*.csv',
...                  storage_options={'anon': True})
>>> df = dd.read_parquet('gs://dask-nyc-taxi/yellowtrip.parquet',
...                      storage_options={'token': 'anon'})
See the documentation on connecting to Amazon S3 or Google Cloud Storage.
Mapping from a function#
For cases that are not covered by the functions above, but can be
captured by a simple map operation, from_map() is likely to be
the most convenient means for DataFrame creation. For example, this
API can be used to convert an arbitrary PyArrow Dataset object into a
DataFrame collection by mapping fragments to DataFrame partitions:
>>> import pyarrow.dataset as ds
>>> dataset = ds.dataset("hive_data_path", format="orc", partitioning="hive")
>>> fragments = dataset.get_fragments()
>>> func = lambda frag: frag.to_table().to_pandas()
>>> df = dd.from_map(func, fragments)
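from_map() is not limited to PyArrow: any function that returns one pandas DataFrame per input can be mapped over an iterable. A minimal sketch, assuming a few hypothetical CSV files with the same schema:
>>> import pandas as pd
>>> paths = ["data.0.csv", "data.1.csv", "data.2.csv"]  # hypothetical input files
>>> df = dd.from_map(pd.read_csv, paths)  # one partition per input file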
Dask Delayed#
Dask delayed is particularly useful when simple map operations aren’t sufficient to capture the complexity of your data layout. It lets you construct Dask DataFrames out of arbitrary Python function calls, which can be
helpful for handling custom data formats or baking particular logic into how data is loaded.
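As a rough illustration of the pattern, you can wrap a custom loading function in dask.delayed and pass the resulting objects to from_delayed(); the load_partition function and file paths here are hypothetical:
>>> import dask
>>> import pandas as pd
>>> @dask.delayed
... def load_partition(path):
...     # hypothetical loader: any custom parsing or cleanup logic can go here
...     return pd.read_csv(path)
>>> parts = [load_partition(p) for p in ["part.0.csv", "part.1.csv"]]  # hypothetical files
>>> df = dd.from_delayed(parts)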
See the documentation on using dask.delayed with collections.
Storing#
Writing files locally#
You can save files locally, assuming each worker can access the same file system. The workers could be located on the same machine, or a network file system can be mounted and referenced at the same path location for every worker node. See the documentation on accessing data locally.
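For example, to_csv() writes one file per partition when the target name contains a *, and to_parquet() writes a directory of part files. A minimal sketch, assuming an existing df collection and a local output/ directory:
>>> df.to_csv('output/data-*.csv')    # one CSV file per partition
>>> df.to_parquet('output/parquet/')  # a directory of Parquet part files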
Writing to remote locations#
Dask can write to a variety of data stores including cloud object stores.
For example, you can write a dask.dataframe to an Azure storage blob as follows:
>>> import pandas as pd
>>> d = {'col1': [1, 2, 3, 4], 'col2': [5, 6, 7, 8]}
>>> df = dd.from_pandas(pd.DataFrame(data=d), npartitions=2)
>>> dd.to_parquet(df=df,
...               path='abfs://CONTAINER/FILE.parquet',
...               storage_options={'account_name': 'ACCOUNT_NAME',
...                                'account_key': 'ACCOUNT_KEY'})