Scaling with Dask
medicaid-utils uses Dask for distributed computation. All DataFrames in the package are lazy Dask DataFrames — operations are deferred until .compute() is called.
For workstations with sufficient RAM (64 GB or more is recommended for state-level data), start a local cluster:
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(
    n_workers=8,
    threads_per_worker=1,  # 1 thread per worker avoids GIL contention with pandas
    memory_limit="8GB",    # per worker, so 8 workers use up to 64 GB in total
)
client = Client(cluster)
print(client.dashboard_link)  # Prints the dashboard URL (typically http://localhost:8787)
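The worker count and per-worker memory limit above follow from the machine's cores and RAM. The helper below is a hypothetical sketch (plan_local_cluster is not part of medicaid-utils or Dask) that derives those settings, assuming one single-threaded worker per core and an even split of usable RAM.

```python
import os


def plan_local_cluster(total_ram_gb, n_cores=None, reserve_gb=0.0):
    """Suggest LocalCluster keyword arguments for a single machine.

    Assumes one single-threaded worker per core and splits the RAM that
    remains after `reserve_gb` evenly between workers.
    """
    if n_cores is None:
        n_cores = os.cpu_count() or 1
    usable_gb = total_ram_gb - reserve_gb
    if usable_gb <= 0:
        raise ValueError("reserve_gb must be smaller than total_ram_gb")
    per_worker_gb = usable_gb / n_cores
    return {
        "n_workers": n_cores,
        "threads_per_worker": 1,
        "memory_limit": f"{per_worker_gb:g}GB",
    }


# A 64 GB workstation with 8 cores reproduces the settings used above.
kwargs = plan_local_cluster(total_ram_gb=64, n_cores=8)
print(kwargs)
```

The resulting dictionary can be passed as LocalCluster(**kwargs). Reserving a few GB for the operating system via reserve_gb is a reasonable precaution on shared machines.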
For high-performance computing environments, use dask-jobqueue:
# SLURM
from dask_jobqueue import SLURMCluster
from dask.distributed import Client

cluster = SLURMCluster(
    cores=4,
    memory="32GB",
    processes=1,
    walltime="04:00:00",
    queue="standard",
)
cluster.scale(jobs=10)  # Request 10 SLURM jobs
client = Client(cluster)
# PBS
from dask_jobqueue import PBSCluster
from dask.distributed import Client

cluster = PBSCluster(
    cores=4,
    memory="32GB",
    processes=1,
    walltime="04:00:00",
    queue="batch",
)
cluster.scale(jobs=10)  # Request 10 PBS jobs
client = Client(cluster)
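In dask-jobqueue, cores and memory describe a single job, so cluster.scale(jobs=10) multiplies them. The sketch below is a hypothetical helper (total_resources is not a dask-jobqueue function) that estimates the aggregate request before submitting, which can help when checking it against an allocation limit.

```python
import re

# Conversion factors to gigabytes for the units this sketch understands.
_UNITS_TO_GB = {"MB": 1e-3, "GB": 1.0, "TB": 1e3}


def total_resources(jobs, cores, memory):
    """Return (total_cores, total_memory_gb) for `jobs` identical jobs."""
    match = re.fullmatch(r"\s*(\d+(?:\.\d+)?)\s*(MB|GB|TB)\s*", memory.upper())
    if match is None:
        raise ValueError(f"unsupported memory string: {memory!r}")
    per_job_gb = float(match.group(1)) * _UNITS_TO_GB[match.group(2)]
    return jobs * cores, jobs * per_job_gb


# The configuration above: 10 jobs of 4 cores and 32 GB each.
total_cores, total_mem_gb = total_resources(jobs=10, cores=4, memory="32GB")
print(total_cores, total_mem_gb)
```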
If no distributed client is created, Dask DataFrames run on Dask's local threaded scheduler by default. This works for small datasets. For debugging, switch to the single-threaded synchronous scheduler:
import dask

dask.config.set(scheduler="threads")      # Multi-threaded (default)
dask.config.set(scheduler="synchronous")  # Single-threaded (debugging)
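When the synchronous scheduler is needed only for one step, it can be selected temporarily rather than globally. The sketch below uses a toy dask.delayed task (not medicaid-utils code) to show two standard Dask options: passing scheduler= to a single .compute() call, and using dask.config.set as a context manager.

```python
import dask


@dask.delayed
def add(a, b):
    return a + b


# A tiny task graph: (1 + 2) + 3.
task = add(add(1, 2), 3)

# Option 1: choose the scheduler for a single call.
result_single = task.compute(scheduler="synchronous")

# Option 2: change the scheduler only inside the with-block.
with dask.config.set(scheduler="synchronous"):
    inside = dask.config.get("scheduler")
    result_block = task.compute()

print(result_single, result_block, inside)
```

With the synchronous scheduler all tasks run in the calling thread, so tracebacks and debuggers such as pdb behave as they would in ordinary Python code.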
Use tmp_folder when loading claims to cache intermediate results to disk:
ip = max_ip.MAXIP(
    year=2012,
    state="WY",
    data_root="/data/cms",
    tmp_folder="/tmp/cache",
)
This prevents the Dask task graph from growing too large when processing multiple claim files.
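Because intermediate results are written to tmp_folder, it can be worth confirming that the directory exists and has room before starting a long run. The helper below is a hypothetical sketch using only the standard library. prepare_tmp_folder and its min_free_gb threshold are illustrative names, and whether medicaid-utils creates the folder itself is not stated here.

```python
import os
import shutil
import tempfile


def prepare_tmp_folder(path, min_free_gb=0.0):
    """Create `path` if needed and check that enough disk space is free."""
    os.makedirs(path, exist_ok=True)
    free_gb = shutil.disk_usage(path).free / 1e9
    if free_gb < min_free_gb:
        raise RuntimeError(
            f"only {free_gb:.1f} GB free in {path}, need {min_free_gb} GB"
        )
    return path


# Use a directory under the system temp location for this example.
cache_dir = prepare_tmp_folder(
    os.path.join(tempfile.gettempdir(), "medicaid_cache")
)
print(cache_dir)
```

The returned path can then be passed as the tmp_folder argument shown above.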
Aim for partitions of 50–200 MB each. The package handles partitioning automatically based on the input Parquet files, but you can repartition when exporting:
ip.export("/output/", output_format="parquet", repartition=True)
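To make the 50–200 MB guideline concrete, the sketch below computes how many partitions a dataset of a given size would need for a chosen target size. target_partition_count is a hypothetical helper. How repartition=True chooses partitions inside medicaid-utils is not described here.

```python
import math


def target_partition_count(total_bytes, target_mb=100):
    """Number of partitions so that each holds roughly `target_mb` megabytes."""
    if total_bytes <= 0:
        return 1
    return max(1, math.ceil(total_bytes / (target_mb * 1_000_000)))


# A 12 GB dataset split into ~100 MB partitions.
n_partitions = target_partition_count(12_000_000_000)
print(n_partitions)
```

On a plain Dask DataFrame the result could be applied with ddf.repartition(npartitions=n_partitions). Dask also accepts ddf.repartition(partition_size="100MB") directly. Note that in-memory size is usually larger than the compressed Parquet size on disk.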
The Dask dashboard (typically at http://localhost:8787) shows:
- Task progress and scheduling
- Worker memory usage
- CPU utilization per worker
- Task stream (timeline view)
For multi-state analyses, process states sequentially to control memory:
import gc

for state in ["AL", "IL", "CA"]:
    ip = taf_ip.TAFIP(year=2019, state=state, data_root="/data/cms")
    # ... process ...
    ip.export(f"/output/{state}/", output_format="parquet")
    del ip        # Free memory before next state
    gc.collect()