Context: I have 4 xarray datasets of 8 GB, 45 GB, 8 GB and 20 GB (about 80 GB total). Each contains a single 3D variable with dimensions time, y, x. I want to combine them and save the output to disk.
Operation on each dataset:
For each of them I:
- load it chunked,
- sort it by date,
- drop 'time' duplicates,
- coarsen the 'y' and 'x' axes with ds.coarsen(x=2, y=2, boundary='trim').mean(),
- store it in a list.
Assembly of combined dataset:
Once I have loaded, sorted and coarsened all the datasets into the list (called cubes), I combine them in two steps:
- I keep only the dates shared by all datasets
- I combine them using
combined = xr.combine_by_coords(cubes).
Save on disk:
Finally, I use dask to write the dataset to disk lazily:
write_job = combined.to_netcdf('combined.nc', compute=False)
with ProgressBar():
    print("Writing to combined.nc")
    write_job.compute()
Result:
My combined dataset is a bit more than 8 GB in memory and is chunked.
Problem:
When I run the final compute to write the dataset, it completely blows through my 120 GB of RAM. Is it because all the dask delayed operations are simply postponed until the end and computed all at once? Why does it break my RAM?
I also tried simply calling combined.to_netcdf('combined.nc', compute=True), but that also overloaded my RAM.
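To see how much work is deferred until that final compute, one can inspect the lazy result beforehand (a minimal diagnostic sketch, not part of the pipeline; 'var' is a placeholder for the actual variable name):
# Diagnostic sketch: inspect the lazy result before computing.
# 'var' is a placeholder for the real variable name in the datasets.
print(combined.nbytes / 1e9, 'GB if fully loaded (uncompressed)')
print(dict(combined.chunks))                # chunk sizes per dimension
print(combined['var'].data.npartitions)     # number of dask chunks to compute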
Code:
import xarray as xr
import numpy as np
from dask.diagnostics import ProgressBar

list_cubes = ['cube0.nc', 'cube1.nc', 'cube2.nc', 'cube3.nc']  # Names of files
cubes = []        # Initialize list of datasets
chunks = "auto"   # Change the default value to auto

# --- Operation on each dataset --- #
for filepath in list_cubes:
    ds = xr.open_dataset(filepath, chunks=chunks)        # Open dataset lazily, chunked
    ds = ds.sortby('time')                               # Sort by time
    ds = ds.drop_duplicates(dim='time')                  # Drop time duplicates
    ds = ds.coarsen(x=2, y=2, boundary='trim').mean()    # Coarsen by average
    cubes.append(ds)                                     # Append sorted, coarsened and chunked dataset to list

# --- Keep only the dates shared by all the datasets --- #
common_dates = cubes[0]['time']  # Start with the dates from the first dataset
for ds in cubes[1:]:
    common_dates = np.intersect1d(common_dates, ds['time'])  # Find common dates

# Filter each dataset to keep only the common dates
cubes = [ds.sel(time=common_dates) for ds in cubes]

# --- Combine the datasets --- #
combined = xr.combine_by_coords(cubes)

# --- Write to disk with dask --- #
write_job = combined.to_netcdf('combined.nc', compute=False)
with ProgressBar():
    print("Writing to combined.nc")
    write_job.compute()
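One thing I am considering, though I have not validated it, is rechunking along time just before writing so each write task only has to materialise a bounded slice (a sketch only; the chunk size of 50 is an arbitrary example):
# Sketch (untested): rechunk along time before writing so each task
# only materialises a bounded slice. The value 50 is arbitrary.
combined = combined.chunk({'time': 50})
write_job = combined.to_netcdf('combined.nc', compute=False)
with ProgressBar():
    write_job.compute()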
Comments:
- Do all your files share the same xy coordinates? Using combine_by_coords with files that have different xy coordinates might create many undesirable NaN values and inflate the output file size. You might also want to try saving your output in chunks along the time dimension (e.g. one combined file per day). – Gabriel Lucas, May 28, 2024
- They do not share the same XY, although their combined footprint forms one big box; in that sense they can be stitched together. For what you suggest, is there a command to split the save into different files (per chunk), or should I write a loop iterating through each temporal chunk and saving them as netCDFs? – vdc, May 28, 2024
- I usually just loop as:
  for t in combined.time:
      ds_chunk = combined.sel(time=t)
      ds_chunk.load()
      ds_chunk.to_netcdf('chunk_time_{}.nc'.format(t.dt.strftime("%Y-%m-%d").item()))
  – Gabriel Lucas, May 28, 2024
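An alternative sketch of the same idea uses xr.save_mfdataset, which writes a list of datasets to a matching list of paths in one call (grouping by year here is only an example and is untested against this exact data):
# Sketch: split the combined dataset along time (by year, as an example)
# and write one file per group with xr.save_mfdataset.
years, groups = zip(*combined.groupby('time.year'))
paths = [f'combined_{y}.nc' for y in years]
xr.save_mfdataset(groups, paths)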