Create Dask Arrays#

You can create Dask arrays from, and store them to, a variety of common sources like HDF5, NetCDF, Zarr, or any format that supports NumPy-style slicing.

from_array(x[, chunks, name, lock, asarray, ...])

Create dask array from something that looks like an array.

from_delayed(value, shape[, dtype, meta, name])

Create a dask array from a dask delayed value

from_npy_stack(dirname[, mmap_mode])

Load dask array from stack of npy files

from_zarr(url[, component, storage_options, ...])

Load array from the zarr storage format

stack(seq[, axis, allow_unknown_chunksizes])

Stack arrays along a new axis

concatenate(seq[, axis, ...])

Concatenate arrays along an existing axis

NumPy Slicing#

from_array(x[, chunks, name, lock, asarray, ...])

Create dask array from something that looks like an array.

Many storage formats have Python projects that expose storage using NumPy slicing syntax. These include HDF5, NetCDF, BColz, Zarr, GRIB, etc. For example, we can load a Dask array from an HDF5 file using h5py:

>>> import h5py
>>> f = h5py.File('myfile.hdf5') # HDF5 file
>>> d = f['/data/path'] # Pointer to on-disk array
>>> d.shape # d can be very large
(1000000, 1000000)
>>> x = d[:5, :5] # We slice to get numpy arrays

Given an object like d above that has dtype and shape properties and that supports NumPy-style slicing, we can construct a lazy Dask array:

>>> import dask.array as da
>>> x = da.from_array(d, chunks=(1000, 1000))

This process is entirely lazy. Neither creating the h5py object nor wrapping it with da.from_array has loaded any data.
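
Computation, and therefore reading from disk, only happens when a concrete result is requested. A minimal sketch, reusing the d and x defined above:

>>> result = x[:1000, :1000].mean().compute()  # only now are the needed chunks read from the HDF5 file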

Random Data#

For experimentation or benchmarking it is common to create arrays of random data. The dask.array.random module implements most of the functions in the numpy.random module. We list some common functions below, but for a full list see the Array API:

random.binomial(*args, **kwargs)

Draw samples from a binomial distribution.

random.normal(*args, **kwargs)

Draw random samples from a normal (Gaussian) distribution.

random.poisson(*args, **kwargs)

Draw samples from a Poisson distribution.

random.random(*args, **kwargs)

Return random floats in the half-open interval [0.0, 1.0).

>>> import dask.array as da
>>> rng = da.random.default_rng()
>>> x = rng.random((10000, 10000), chunks=(1000, 1000))

Concatenation and Stacking#

stack(seq[, axis, allow_unknown_chunksizes])

Stack arrays along a new axis

concatenate(seq[, axis, ...])

Concatenate arrays along an existing axis

Often we store data in several different locations and want to stitch them together:

dask_arrays = []
for fn in filenames:
    f = h5py.File(fn)
    d = f['/data']
    array = da.from_array(d, chunks=(1000, 1000))
    dask_arrays.append(array)

x = da.concatenate(dask_arrays, axis=0)  # concatenate arrays along first axis
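
If each dataset instead represents a slice along a new dimension, da.stack works the same way. A minimal sketch, assuming every array in dask_arrays above has the same shape:

y = da.stack(dask_arrays, axis=0)  # stack arrays along a new first axis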

For more information, see concatenation and stacking docs.

Using dask.delayed#

from_delayed(value, shape[, dtype, meta, name])

Create a dask array from a dask delayed value

stack(seq[, axis, allow_unknown_chunksizes])

Stack arrays along a new axis

concatenate(seq[, axis, ...])

Concatenate arrays along an existing axis

Sometimes NumPy-style data resides in formats that do not support NumPy-style slicing. We can still construct Dask arrays around this data if we have a Python function that can generate pieces of the full array, using dask.delayed. Dask delayed lets us delay a single function call that would create a NumPy array. We can then wrap this delayed object with da.from_delayed, providing a dtype and shape to produce a single-chunked Dask array. Furthermore, we can use stack or concatenate from before to construct a larger lazy array.

As an example, consider loading a stack of images using skimage.io.imread:

import glob
import skimage.io
import dask
import dask.array as da

imread = dask.delayed(skimage.io.imread, pure=True)  # Lazy version of imread
filenames = sorted(glob.glob('*.jpg'))
lazy_images = [imread(path) for path in filenames]   # Lazily evaluate imread on each path

sample = lazy_images[0].compute()  # load the first image (assume rest are same shape/dtype)

arrays = [da.from_delayed(lazy_image,           # Construct a small Dask array
                          dtype=sample.dtype,   # for every lazy value
                          shape=sample.shape)
          for lazy_image in lazy_images]

stack = da.stack(arrays, axis=0)  # Stack all small Dask arrays into one

See documentation on using dask.delayed with collections.

Often it is substantially faster to use da.map_blocks rather than da.stack:

import glob
import skimage.io
import numpy as np
import dask.array as da

filenames = sorted(glob.glob('*.jpg'))

def read_one_image(block_id, filenames=filenames, axis=0):
    # a function that reads in one chunk of data
    path = filenames[block_id[axis]]
    image = skimage.io.imread(path)
    return np.expand_dims(image, axis=axis)

# load the first image (assume rest are same shape/dtype)
sample = skimage.io.imread(filenames[0])

stack = da.map_blocks(
    read_one_image,
    dtype=sample.dtype,
    chunks=((1,) * len(filenames), *sample.shape)
)

From Dask DataFrame#

There are several ways to create a Dask array from a Dask DataFrame. Dask DataFrames have a to_dask_array method:

>>> df = dask.dataframe.from_pandas(...)
>>> df.to_dask_array()
dask.array<values, shape=(nan, 3), dtype=float64, chunksize=(nan, 3), chunktype=numpy.ndarray>

This mirrors the to_numpy function in Pandas. The values attribute is also supported:

>>> df.values
dask.array<values, shape=(nan, 3), dtype=float64, chunksize=(nan, 3), chunktype=numpy.ndarray>

However, these arrays do not have known chunk sizes because dask.dataframe does not track the number of rows in each partition. This means that some operations like slicing will not operate correctly.

The chunk sizes can be computed:

>>> df.to_dask_array(lengths=True)
dask.array<array, shape=(100, 3), dtype=float64, chunksize=(50, 3), chunktype=numpy.ndarray>

Specifying lengths=True triggers immediate computation of the chunk sizes. This enables downstream computations that rely on having known chunk sizes (e.g., slicing).
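
For example, a small sketch using the df from above: positional slicing works once the chunk sizes are known:

>>> a = df.to_dask_array(lengths=True)
>>> b = a[10:20]  # slicing works because the chunk sizes are now known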

The Dask DataFrame to_records method also returns a Dask Array, but does not compute the shape information:

>>> df.to_records()
dask.array<to_records, shape=(nan,), dtype=(numpy.record, [('index', '<i8'), ('x', '<f8'), ('y', '<f8'), ('z', '<f8')]), chunksize=(nan,), chunktype=numpy.ndarray>

If you have a function that converts a Pandas DataFrame into a NumPy array, then calling map_partitions with that function on a Dask DataFrame will produce a Dask array:

>>> df.map_partitions(np.asarray)
dask.array<asarray, shape=(nan, 3), dtype=float64, chunksize=(nan, 3), chunktype=numpy.ndarray>

Interactions with NumPy arrays#

Dask array operations will automatically convert NumPy arrays into single-chunk dask arrays:

>>> x = da.sum(np.ones(5))
>>> x.compute()
5.0

When NumPy and Dask arrays interact, the result will be a Dask array. Automatic rechunking rules will generally slice the NumPy array into the appropriate Dask chunk shape:

>>> x = da.ones(10, chunks=(5,))
>>> y = np.ones(10)
>>> z = x + y
>>> z
dask.array<add, shape=(10,), dtype=float64, chunksize=(5,), chunktype=numpy.ndarray>

These interactions work not just for NumPy arrays but for any object that has shape and dtype attributes and implements NumPy slicing syntax.

Memory mapping#

Memory mapping can be a highly effective method to access raw binary data since it has nearly zero overhead if the data is already in the file system cache. For the threaded scheduler, creating a Dask array from a raw binary file can be as simple as a = da.from_array(np.memmap(filename, shape=shape, dtype=dtype, mode='r')).
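
A minimal sketch of that pattern is shown below; the file name, shape, and dtype are placeholders for your own raw binary layout:

import numpy as np
import dask.array as da

# Placeholder on-disk layout: shape and dtype must be known ahead of time
shape = (10000, 1000)
dtype = np.float32

a = da.from_array(np.memmap('data.raw', shape=shape, dtype=dtype, mode='r'),
                  chunks=(1000, 1000))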

For multiprocessing or distributed schedulers, the memory map for each array chunk should be created on the correct worker process and not on the main process to avoid data transfer through the cluster. This can be achieved by wrapping the function that creates the memory map using dask.delayed.

import numpy as np
import dask
import dask.array as da

def mmap_load_chunk(filename, shape, dtype, offset, sl):
    '''
    Memory map the given file with overall shape and dtype and return a slice
    specified by :code:`sl`.

    Parameters
    ----------
    filename : str
    shape : tuple
        Total shape of the data in the file
    dtype:
        NumPy dtype of the data in the file
    offset : int
        Skip :code:`offset` bytes from the beginning of the file.
    sl:
        Object that can be used for indexing or slicing a NumPy array to
        extract a chunk

    Returns
    -------
    numpy.memmap or numpy.ndarray
        View into memory map created by indexing with :code:`sl`,
        or NumPy ndarray in case no view can be created using :code:`sl`.
    '''
    data = np.memmap(filename, mode='r', shape=shape, dtype=dtype, offset=offset)
    return data[sl]
def mmap_dask_array(filename, shape, dtype, offset=0, blocksize=5):
    '''
    Create a Dask array from raw binary data in :code:`filename`
    by memory mapping.

    This method is particularly effective if the file is already
    in the file system cache and if arbitrary smaller subsets are
    to be extracted from the Dask array without optimizing its
    chunking scheme.

    It may perform poorly on Windows if the file is not in the file
    system cache. On Linux it performs well under most circumstances.

    Parameters
    ----------
    filename : str
    shape : tuple
        Total shape of the data in the file
    dtype:
        NumPy dtype of the data in the file
    offset : int, optional
        Skip :code:`offset` bytes from the beginning of the file.
    blocksize : int, optional
        Chunk size for the outermost axis. The other axes remain unchunked.

    Returns
    -------
    dask.array.Array
        Dask array matching :code:`shape` and :code:`dtype`, backed by
        memory-mapped chunks.
    '''
    load = dask.delayed(mmap_load_chunk)
    chunks = []
    for index in range(0, shape[0], blocksize):
        # Truncate the last chunk if necessary
        chunk_size = min(blocksize, shape[0] - index)
        chunk = dask.array.from_delayed(
            load(
                filename,
                shape=shape,
                dtype=dtype,
                offset=offset,
                sl=slice(index, index + chunk_size)
            ),
            shape=(chunk_size, ) + shape[1:],
            dtype=dtype
        )
        chunks.append(chunk)
    return da.concatenate(chunks, axis=0)

x = mmap_dask_array(
    filename='testfile-50-50-100-100-float32.raw',
    shape=(50, 50, 100, 100),
    dtype=np.float32
)

Chunks#

See documentation on Array Chunks for more information.

Store Dask Arrays#

store(sources, targets[, lock, regions, ...])

Store dask arrays in array-like objects, overwrite data in target

to_hdf5(filename, *args[, chunks])

Store arrays in HDF5 file

to_npy_stack(dirname, x[, axis])

Write dask array to a stack of .npy files

to_zarr(arr, url[, component, ...])

Save array to the zarr storage format

compute(*args[, traverse, optimize_graph, ...])

Compute several dask collections at once.

In Memory#

compute(*args[, traverse, optimize_graph, ...])

Compute several dask collections at once.

If you have a small amount of data, you can call np.array or .compute() on your Dask array to turn it into a normal NumPy array:

>>> x = da.arange(6, chunks=3)
>>> y = x**2
>>> np.array(y)
array([0, 1, 4, 9, 16, 25])
>>> y.compute()
array([0, 1, 4, 9, 16, 25])

NumPy style slicing#

store(sources, targets[, lock, regions, ...])

Store dask arrays in array-like objects, overwrite data in target

You can store Dask arrays in any object that supports NumPy-style slice assignment like h5py.Dataset:

>>> import h5py
>>> f = h5py.File('myfile.hdf5')
>>> d = f.require_dataset('/data', shape=x.shape, dtype=x.dtype)
>>> da.store(x, d)

Also, you can store several arrays in one computation by passing lists of sources and destinations:

>>> da.store([array1, array2], [output1, output2]) # doctest: +SKIP

HDF5#

to_hdf5(filename, *args[, chunks])

Store arrays in HDF5 file

HDF5 is sufficiently common that there is a special function to_hdf5 to store data into HDF5 files using h5py:

>>> da.to_hdf5('myfile.hdf5', '/y', y) # doctest: +SKIP

You can store several arrays in one computation with the function da.to_hdf5 by passing in a dictionary:

>>> da.to_hdf5('myfile.hdf5', {'/x': x, '/y': y}) # doctest: +SKIP

Zarr#

The Zarr format is a chunk-wise binary array storage file format with a good selection of encoding and compression options. Because each chunk is stored in a separate file, it is ideal for parallel access in both reading and writing (for the latter, provided the Dask array chunks are aligned with the target's chunks). Furthermore, storage in remote data services such as S3 and GCS is supported.

For example, to save data to a local zarr dataset you would do:

>>> arr.to_zarr('output.zarr')

or to save to a particular bucket on S3:

>>> arr.to_zarr('s3://mybucket/output.zarr', storage_options={'key': 'mykey',
...                                                           'secret': 'mysecret'})

or your own custom zarr Array:

>>> z = zarr.create((10,), dtype=float, store=zarr.ZipStore("output.zarr"))
>>> arr.to_zarr(z)

To retrieve that data, call da.from_zarr with exactly the same arguments. The chunking of the resulting Dask array is defined by how the files were saved, unless otherwise specified.
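
For example, reading back the local dataset saved above might look like:

>>> arr = da.from_zarr('output.zarr')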

TileDB#

TileDB is a binary array format and storage manager with tunable chunking, layout, and compression options. The TileDB storage manager library includes support for scalable storage backends such as S3 API compatible object stores and HDFS, with automatic scaling, and supports multi-threaded and multi-process reads (consistent) and writes (eventually-consistent).

To save data to a local TileDB array:

>>> arr.to_tiledb('output.tdb')

or to save to a bucket on S3:

>>> arr.to_tiledb('s3://mybucket/output.tdb',
...               storage_options={'vfs.s3.aws_access_key_id': 'mykey',
...                                'vfs.s3.aws_secret_access_key': 'mysecret'})

Files may be retrieved by running da.from_tiledb with the same URI, and any necessary arguments.
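
For example, reading back the local array saved above might look like:

>>> arr = da.from_tiledb('output.tdb')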

Intermediate storage#

store(sources, targets[, lock, regions, ...])

Store dask arrays in array-like objects, overwrite data in target

In some cases, one may wish to store an intermediate result in long-term storage. This differs from persist, which is mainly used to manage intermediate results within Dask that don't necessarily have longevity. It also differs from storing final results, as those mark the end of the Dask graph; stored intermediate results are easier to reuse without reloading data. Intermediate storage is mainly useful when the data is needed outside of Dask (e.g. on disk, in a database, in the cloud, etc.). It can also serve as a checkpoint for long-running or error-prone computations.

The intermediate storage use case differs from the typical storage use case in that a Dask array representing the result of the storage operation is returned to the user. This is typically done by setting the store function's return_stored flag to True.

x.store(targets)                          # stores data, returns nothing
x = x.store(targets, return_stored=True)  # stores data, returns a new dask array backed by that data

The user can then decide whether the storage operation happens immediately (by setting the compute flag to True) or later (by setting the compute flag to False). In all other ways, this behaves the same as a normal call to store. Some examples are shown below.

>>> import dask.array as da
>>> import zarr as zr
>>> c = (2, 2)
>>> d = da.ones((10, 11), chunks=c)
>>> z1 = zr.open_array('lazy.zarr', shape=d.shape, dtype=d.dtype, chunks=c)
>>> z2 = zr.open_array('eager.zarr', shape=d.shape, dtype=d.dtype, chunks=c)
>>> d1 = d.store(z1, compute=False, return_stored=True)
>>> d2 = d.store(z2, compute=True, return_stored=True)
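
In the compute=False case, nothing has been written yet; computing d1 later performs the store and returns the stored values (a small sketch):

>>> stored = d1.compute()  # writes to lazy.zarr and returns the stored data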

This can be combined with any of the other storage strategies noted above or elsewhere in the docs, or with any specialized storage type.

Plugins#

We can run arbitrary user-defined functions on Dask arrays whenever they are constructed. This allows us to build a variety of custom behaviors, such as improved debugging or warning users about likely mistakes. You can register a list of functions to run on all Dask arrays by setting the global array_plugins= value:

>>> def f(x):
...     print(x.nbytes)
>>> with dask.config.set(array_plugins=[f]):
...     x = da.ones((10, 1), chunks=(5, 1))
...     y = x.dot(x.T)
80
80
800
800

If the plugin function returns None, then the input Dask array will be returned without change. If the plugin function returns something else, then that value will be the result of the constructor.

Examples#

Automatically compute#

We may wish to turn some Dask array code into normal NumPy code. This is useful, for example, to immediately track down errors that would otherwise be hidden by Dask's lazy semantics:

>>> with dask.config.set(array_plugins=[lambda x: x.compute()]):
...     x = da.arange(5, chunks=2)
>>> x # this was automatically converted into a numpy array
array([0, 1, 2, 3, 4])

Warn on large chunks#

We may wish to warn users if they are creating chunks that are too large:

import itertools
import warnings

import numpy as np

def warn_on_large_chunks(x):
    shapes = list(itertools.product(*x.chunks))
    nbytes = [x.dtype.itemsize * np.prod(shape) for shape in shapes]
    if any(nb > 1e9 for nb in nbytes):
        warnings.warn("Array contains very large chunks")

with dask.config.set(array_plugins=[warn_on_large_chunks]):
    ...

Combine#

You can also combine these plugins into a list. They will run one after the other, chaining results through them:

with dask.config.set(array_plugins=[warn_on_large_chunks, lambda x: x.compute()]):
    ...