situation
I have got a large NetCDF file with some Earth System model data. I access the file and the data within via xarray and want to process one variable with dask.distributed on several workers (some calculation). The xarray variable is chunked in a way that the processing can be performed on each chunk individually, without information from other chunks. The number of chunks equals the number of workers. After defining my calculation, I call compute(). The full chunked variable seems to be sent to each worker (see example below). Instead, I would have expected that each worker gets only one chunk and performs its calculation on it.
I am missing something important but I don't know what.
minimal example
You find the file and a jupyter notebook in a dedicated GitHub repository: https://github.com/neumannd/issue_usage_dask
Two workers are started and each worker gets 350 MB of RAM. My xarray dataset is 395.5 MB in size and is split into two chunks of 197.75 MB each. It is split in a way that the operation (mean(dim='time')) can be performed on each chunk independently. However, the workers crash because more than 95% of their memory (332.5 MB) is filled with data. That means they do not receive only one chunk but the whole variable.
In this minimal example, it is not an issue (I could increase the memory of the workers). But if the dataset is 8 GB in size and each of four workers needs 8 GB of memory, or if I work with n > 10 workers on an xarray dataset of x > 10 GB, I occupy n * x > 100 GB of memory, which gets quite bad for increasing n and x.
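As a sanity check (a sketch only; it repeats the file path and chunking from the minimal example below), the per-chunk size can be compared with the per-worker memory limit:
# sketch: compare per-chunk size with the per-worker memory limit
# (same file path and chunking as in the minimal example below)
import numpy as np
import xarray as xr
data_file = "/home/k204221/large_data/more_netcdf/CMAQ_t_zoo_59_1.nc"
ds = xr.open_dataset(data_file, chunks={"xt_ocean": 112, "yt_ocean": 242})
t_zoo = ds.t_zoo
total_mb = t_zoo.nbytes / 1e6                                           # ~395.5 MB in this example
chunk_mb = np.prod(t_zoo.data.chunksize) * t_zoo.dtype.itemsize / 1e6   # ~197.75 MB in this example
print(total_mb, chunk_mb)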
# load libraries
import xarray as xr
import dask
from dask.distributed import Client
# set path of data
data_file = "/home/k204221/large_data/more_netcdf/CMAQ_t_zoo_59_1.nc"
# initialize workers
client = Client(n_workers=2, threads_per_worker=1,
memory_limit='350MB', dashboard_address=':8787')
client
# open data file
ds = xr.open_dataset(data_file, chunks = {"xt_ocean": 112, "yt_ocean": 242})
t_zoo = ds.t_zoo
t_zoo.data
# process data
zoo_mean_delay = t_zoo.mean(dim = 'time')
zoo_mean_delay
## output
# <xarray.DataArray 't_zoo' (st_ocean: 152, yt_ocean: 242, xt_ocean: 224)>
# dask.array<mean_agg-aggregate, shape=(152, 242, 224), dtype=float32, chunksize=(152, 242, 112)>
# Coordinates:
# * xt_ocean (xt_ocean) float64 8.283 8.383 8.483 8.583 ... 30.38 30.48 30.58
# * yt_ocean (yt_ocean) float64 53.86 53.91 53.96 54.01 ... 65.81 65.86 65.91
# * st_ocean (st_ocean) float64 0.25 0.7508 1.257 1.771 ... 263.0 265.0 267.0
# I tried to explicitly list the workers in the call to compute,
# hoping to make it work this way
zoo_mean_comp = zoo_mean_delay.compute(workers = ['localhost:39661', 'localhost:34049'])
## output
# distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
# distributed.nanny - WARNING - Restarting worker
# distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
# distributed.nanny - WARNING - Restarting worker
# distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
# distributed.nanny - WARNING - Restarting worker
# distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
# distributed.nanny - WARNING - Restarting worker
#
# KeyboardInterrupt
#
# distributed.nanny - WARNING - Restarting worker
notes
- I tried client.scatter(t_zoo) but it did not help.
- Calling client.compute(zoo_mean_delay) instead of zoo_mean_delay.compute() did not help.
- After looking at the workers' workload and the task graph in the scheduler, it seems to me that one worker is collecting the whole data at certain steps of the calculation. Moreover (after reading the dask documentation on chunking), the sizes of the chunks seem to be inappropriate; see the rechunking sketch below.
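One direction suggested by the chunking documentation (a sketch only; it reuses data_file and xr from the example above, and the chunk sizes below are illustrative guesses, not tuned for this file) is to open the dataset with smaller chunks so that a single task's working set stays well below the 350 MB per-worker limit:
# sketch (untested): open the dataset with smaller chunks so that a single task
# needs far less memory than the 350 MB per-worker limit; the chunk sizes below
# are illustrative guesses
ds_small = xr.open_dataset(
    data_file,
    chunks={"st_ocean": 38, "yt_ocean": 121, "xt_ocean": 112},  # roughly 25 MB per chunk
)
zoo_mean_small = ds_small.t_zoo.mean(dim="time").compute()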
1 Answer
Two workers are started and each worker gets 350 MB of RAM
However, the workers crash because more than 95% of their memory is filled (332.5 MB) by data
This is not enough RAM to effectively run the Numpy/Pandas/Xarray stack. Just running Python and importing these libraries takes up considerable memory. I wouldn't try running anything in under 2GB of RAM.
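A minimal sketch of the same client setup from the question, just with a larger per-worker memory limit (2 GB here, as suggested above):
# same client setup as in the question, but with about 2 GB per worker instead of 350 MB
from dask.distributed import Client
client = Client(n_workers=2, threads_per_worker=1,
                memory_limit='2GB', dashboard_address=':8787')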