-
Notifications
You must be signed in to change notification settings - Fork 3
Quick Start
Manu Murugesan edited this page Mar 14, 2026
·
3 revisions
This page walks through the basic workflow: setting up a Dask cluster, loading claims, cleaning them, and extracting a patient cohort.
from dask.distributed import Client, LocalCluster

# Start a local Dask cluster and connect a client to it.
cluster = LocalCluster(
    n_workers=8,
    threads_per_worker=1,  # 1 thread per worker avoids GIL contention with pandas
    memory_limit="8GB",  # limit applies to each worker, not to the whole cluster
)
client = Client(cluster)
print(client.dashboard_link)  # URL of the Dask dashboard for monitoring
For HPC/SLURM environments, see Scaling with Dask.
from medicaid_utils.preprocessing import max_ip, max_ot, max_ps

# Load and preprocess inpatient claims (cleaning + variable construction)
ip = max_ip.MAXIP(year=2012, state="WY", data_root="/path/to/data")

# Access the cleaned Dask DataFrame
df_ip = ip.df

# Load outpatient claims with IP overlap flagging
ot = max_ot.MAXOT(year=2012, state="WY", data_root="/path/to/data")
ot.flag_ip_overlaps_and_ed(df_ip)

# Load person summary with rural classification
ps = max_ps.MAXPS(year=2012, state="WY", data_root="/path/to/data")
from medicaid_utils.preprocessing import taf_ip, taf_ot, taf_ps

ip = taf_ip.TAFIP(year=2019, state="AL", data_root="/path/to/data")
ps = taf_ps.TAFPS(year=2019, state="AL", data_root="/path/to/data")

# TAF data is in dct_files (keyed by subtype: "base", "line",
# "occurrence_code", "base_diag_codes", "line_ndc_codes")
df_ip_base = ip.dct_files["base"]
Key difference: MAX files use ip.df, while TAF files use ip.dct_files["base"]. See MAX vs TAF for details.
from medicaid_utils.adapted_algorithms.py_elixhauser.elixhauser_comorbidity import (
    score,
)

# MAX — first construct LST_DIAG_CD from individual diagnosis columns
diag_cols = [c for c in ip.df.columns if c.startswith("DIAG_CD_")]
ip.df = ip.df.map_partitions(
    lambda pdf: pdf.assign(
        # Join the non-blank diagnosis codes of each row into one
        # comma-separated string.
        LST_DIAG_CD=pdf[diag_cols].apply(
            lambda row: ",".join(v for v in row if v and str(v).strip()),
            axis=1,
        )
    )
)
df_scored = score(ip.df, lst_diag_col_name="LST_DIAG_CD", cms_format="MAX")

# TAF — gather diagnosis codes (creates LST_DIAG_CD on dct_files["base_diag_codes"])
# ip.gather_bene_level_diag_ndc_codes()
# df_scored = score(ip.dct_files["base_diag_codes"], lst_diag_col_name="LST_DIAG_CD", cms_format="TAF")
from medicaid_utils.filters.patients.cohort_extraction import extract_cohort

# Define ICD-9 and ICD-10 diagnosis codes for Type 2 diabetes
dct_codes = {
    "diag_codes": {"diabetes_t2": {"incl": {9: ["250"], 10: ["E11"]}}},
    "proc_codes": {},
}

# Define filters and paths
dct_filters = {"cohort": {"ip": {"missing_dob": 0}}, "export": {}}
dct_paths = {"source_root": "/path/to/data", "export_folder": "/output/cohort/"}

# Extract and export cohort claim files
extract_cohort(
    state="WY",
    lst_year=[2012],
    dct_diag_proc_codes=dct_codes,
    dct_filters=dct_filters,
    lst_types_to_export=["ip", "ot", "ps"],
    dct_data_paths=dct_paths,
    cms_format="MAX",
)
from medicaid_utils.filters.claims import dx_and_proc

# Flag claims matching ICD-9 diagnosis codes
df_flagged = dx_and_proc.flag_diagnoses_and_procedures(
    dct_diag_codes={"asthma": {"incl": {9: ["4939", "49390"]}}},
    dct_proc_codes={},
    df_claims=ot.df,
    cms_format="MAX",
)
- Cohort Extraction — Detailed guide to building patient cohorts
- Risk Adjustment Algorithms — All 8 clinical algorithms explained
- Common Recipes — Frequently needed operations
Getting Started
User Guide
Recipes & How-Tos
Reference
Links