
situation

I have a large NetCDF file with some Earth System model data. I access the file and the data within via xarray and want to process one variable via dask distributed on several workers (some calculation). The xarray variable is chunked in a way that the processing can be performed on each chunk individually, without information from other chunks. The number of chunks equals the number of workers. After defining my calculation, I call compute(). The full chunked variable seems to be sent to each worker (see example below). Instead, I would have expected each worker to get only one chunk and perform its calculation on it.

I am missing something important but I don't know what.

minimal example

You find the file and a jupyter notebook in a dedicated GitHub repository: https://github.com/neumannd/issue_usage_dask

Two workers are started and each worker gets 350 MB of RAM. The size of my xarray dataset is 395.5 MB and it is split into two chunks of 197.75 MB each. It is split in a way that the operation (mean(dim='time')) can be performed on each chunk individually. However, the workers crash because more than 95% of their memory (332.5 MB) is filled with data. That means they do not receive only one chunk but the whole variable.

In this minimal example, this is not an issue (I could increase the memory of the workers). But if the dataset is 8 GB in size and each of four workers has to have 8 GB of memory, or if I work with n > 10 workers on an xarray dataset of x > 10 GB, I occupy n * x > 100 GB of memory, which gets quite bad for increasing n and x.

# load libraries
import xarray as xr
import dask
from dask.distributed import Client
# set path of data
data_file = "/home/k204221/large_data/more_netcdf/CMAQ_t_zoo_59_1.nc"
# initialize workers
client = Client(n_workers=2, threads_per_worker=1,
 memory_limit='350MB', dashboard_address=':8787')
client

(screenshot: summary of the Client, with dashboard link and worker overview)
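The per-worker memory limit can also be checked programmatically instead of via the screenshot. This is a minimal sketch using the distributed Client API on the client object created above; the exact keys of the returned dictionary may differ between distributed versions:

# list the workers known to the scheduler and their memory limits (in bytes)
for addr, info in client.scheduler_info()["workers"].items():
    print(addr, info["memory_limit"])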

# open data file
ds = xr.open_dataset(data_file, chunks = {"xt_ocean": 112, "yt_ocean": 242})
t_zoo = ds.t_zoo
t_zoo.data

(screenshot: dask array repr of t_zoo, showing the two chunks of 197.75 MB each)
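The chunk layout from the screenshot can also be inspected directly on the dask array; a small sketch using standard xarray/dask attributes:

# chunk sizes per dimension and total size of the variable
print(t_zoo.data.chunks)         # one tuple of chunk lengths per dimension
print(t_zoo.nbytes / 1e6, "MB")  # total size of the variable in MB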

# process data
zoo_mean_delay = t_zoo.mean(dim = 'time')
zoo_mean_delay
## output
# <xarray.DataArray 't_zoo' (st_ocean: 152, yt_ocean: 242, xt_ocean: 224)>
# dask.array<mean_agg-aggregate, shape=(152, 242, 224), dtype=float32, chunksize=(152, 242, 112)>
# Coordinates:
# * xt_ocean (xt_ocean) float64 8.283 8.383 8.483 8.583 ... 30.38 30.48 30.58
# * yt_ocean (yt_ocean) float64 53.86 53.91 53.96 54.01 ... 65.81 65.86 65.91
# * st_ocean (st_ocean) float64 0.25 0.7508 1.257 1.771 ... 263.0 265.0 267.0
# I tried to explicitly list the workers in the call of compute,
# hoping to make it work this way
zoo_mean_comp = zoo_mean_delay.compute(workers = ['localhost:39661', 'localhost:34049'])
## output
# distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
# distributed.nanny - WARNING - Restarting worker
# distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
# distributed.nanny - WARNING - Restarting worker
# distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
# distributed.nanny - WARNING - Restarting worker
# distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
# distributed.nanny - WARNING - Restarting worker
# 
# KeyboardInterrupt
# 
# distributed.nanny - WARNING - Restarting worker

notes

  • I tried client.scatter(t_zoo) but it did not help.
  • Calling client.compute(zoo_mean_delay) instead of zoo_mean_delay.compute() did not help.
asked Nov 15, 2019 at 20:19
  • After looking at the workers' workload and the task graph in the scheduler, it seems to me that one worker collects the whole data at certain steps of the calculation. Moreover (after reading the dask documentation on chunking), the sizes of the chunks seem to be inappropriate. Commented Nov 20, 2019 at 18:23
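As an illustration of that last observation only (a sketch; the chunk sizes below are arbitrary and not taken from the question), the dataset could be opened with more, smaller chunks than workers, so that a single task plus its intermediate results fits comfortably into one worker's memory:

# open the file with smaller spatial chunks (8 chunks of ~50 MB instead of 2 of ~200 MB)
ds_small = xr.open_dataset(data_file, chunks={"xt_ocean": 56, "yt_ocean": 121})
zoo_mean = ds_small.t_zoo.mean(dim="time").compute()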

1 Answer


Two workers are started and each worker gets 350 MB of RAM

However, the workers crash because more than 95% of their memory (332.5 MB) is filled with data

This is not enough RAM to effectively run the Numpy/Pandas/Xarray stack. Just running Python and importing these libraries takes up considerable memory. I wouldn't try running anything in under 2GB of RAM.
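For example, the client from the question could be restarted with a more generous memory budget per worker; this is a sketch based on the question's own setup, with the limit raised to the suggested 2 GB:

# same setup as in the question, but with 2 GB per worker
client = Client(n_workers=2, threads_per_worker=1,
                memory_limit='2GB', dashboard_address=':8787')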

answered Nov 17, 2019 at 16:45

2 Comments

Thanks for the hint. This does not answer my problem. I clearly stated that the example above is a minimal example. If I have a dataset of 3 GB total size and two workers with 2 GB of memory each, the same issue occurs.
Question: How large should the example data file that I provide here be?
