Quick Start

Manu Murugesan edited this page Mar 14, 2026 · 3 revisions

This page walks through the basic workflow: setting up a Dask cluster, loading and cleaning claims, applying risk adjustment, extracting a patient cohort, and flagging claims by diagnosis or procedure.

1. Set Up a Dask Cluster

from dask.distributed import Client, LocalCluster
cluster = LocalCluster(
    n_workers=8,
    threads_per_worker=1,  # 1 thread per worker avoids GIL contention with pandas
    memory_limit="8GB",
)
client = Client(cluster)
print(client.dashboard_link)  # Prints the Dask dashboard URL for monitoring

For HPC/SLURM environments, see Scaling with Dask.
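The worker count and memory limit above are example values. As a rough sketch of how they might be derived for a particular machine, the hypothetical helper below (not part of Dask or medicaid_utils) assigns one single-threaded worker per core and splits a memory budget evenly between them. The 80% usable fraction is an assumption to tune, not a recommendation from either library.

```python
def suggest_local_cluster_kwargs(n_cores, total_mem_gb, usable_fraction=0.8):
    """Hypothetical helper: build keyword arguments for LocalCluster.

    Uses one single-threaded worker per core and divides the usable
    share of memory evenly across workers.
    """
    if n_cores < 1 or total_mem_gb <= 0:
        raise ValueError("n_cores must be >= 1 and total_mem_gb must be > 0")
    mem_per_worker_gb = total_mem_gb * usable_fraction / n_cores
    return {
        "n_workers": n_cores,
        "threads_per_worker": 1,
        "memory_limit": f"{mem_per_worker_gb:.1f}GB",
    }


kwargs = suggest_local_cluster_kwargs(n_cores=8, total_mem_gb=80)
print(kwargs)
```

The resulting dictionary can then be unpacked into the constructor, for example `LocalCluster(**kwargs)`.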

2. Load and Clean Claims

MAX Format (Pre-2016)

from medicaid_utils.preprocessing import max_ip, max_ot, max_ps
# Load and preprocess inpatient claims (cleaning + variable construction)
ip = max_ip.MAXIP(year=2012, state="WY", data_root="/path/to/data")
# Access the cleaned Dask DataFrame
df_ip = ip.df
# Load outpatient claims with IP overlap flagging
ot = max_ot.MAXOT(year=2012, state="WY", data_root="/path/to/data")
ot.flag_ip_overlaps_and_ed(df_ip)
# Load person summary with rural classification
ps = max_ps.MAXPS(year=2012, state="WY", data_root="/path/to/data")

TAF Format (2016+)

from medicaid_utils.preprocessing import taf_ip, taf_ot, taf_ps
ip = taf_ip.TAFIP(year=2019, state="AL", data_root="/path/to/data")
ps = taf_ps.TAFPS(year=2019, state="AL", data_root="/path/to/data")
# TAF data is in dct_files (keyed by subtype: "base", "line", "occurrence_code", "base_diag_codes", "line_ndc_codes")
df_ip_base = ip.dct_files["base"]

Key difference: MAX objects expose the cleaned data as ip.df, whereas TAF objects expose it as ip.dct_files["base"]. See MAX vs TAF for details.
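Code that has to handle both formats could hide this difference behind a small accessor. The sketch below is illustrative only: `cms_format_for_year` and `base_frame` are hypothetical names, the year cutoff simply mirrors the "Pre-2016" and "2016+" headings on this page, and the accessor assumes that the presence of a `dct_files` attribute is what distinguishes a TAF object. Stand-in objects are used so the example runs without any claims data.

```python
from types import SimpleNamespace


def cms_format_for_year(year):
    """Hypothetical helper: format label for a claim year (MAX before 2016)."""
    return "MAX" if year < 2016 else "TAF"


def base_frame(claims):
    """Hypothetical helper: return the main table from a claims object.

    Assumes TAF-style objects carry a `dct_files` dict with a "base" entry
    and MAX-style objects carry a `df` attribute.
    """
    if hasattr(claims, "dct_files"):
        return claims.dct_files["base"]
    return claims.df


# Stand-ins for preprocessed claims objects (strings instead of DataFrames)
max_like = SimpleNamespace(df="max frame")
taf_like = SimpleNamespace(dct_files={"base": "taf base frame"})

print(cms_format_for_year(2012), base_frame(max_like))
print(cms_format_for_year(2019), base_frame(taf_like))
```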

3. Apply Risk Adjustment

import pandas as pd
from medicaid_utils.adapted_algorithms.py_elixhauser.elixhauser_comorbidity import score
# MAX: first construct LST_DIAG_CD from the individual diagnosis columns
diag_cols = [c for c in ip.df.columns if c.startswith("DIAG_CD_")]
ip.df = ip.df.map_partitions(
    lambda pdf: pdf.assign(
        LST_DIAG_CD=pdf[diag_cols].apply(
            # Skip missing (NaN/None) and blank codes, then join the rest with commas
            lambda row: ",".join(
                str(v).strip() for v in row if pd.notna(v) and str(v).strip()
            ),
            axis=1,
        )
    )
)
df_scored = score(ip.df, lst_diag_col_name="LST_DIAG_CD", cms_format="MAX")
# TAF: gather diagnosis codes (creates LST_DIAG_CD on dct_files["base_diag_codes"])
# ip.gather_bene_level_diag_ndc_codes()
# df_scored = score(ip.dct_files["base_diag_codes"], lst_diag_col_name="LST_DIAG_CD", cms_format="TAF")
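The row-wise join that builds LST_DIAG_CD in the MAX case can be illustrated on a plain Python list, independent of pandas or Dask. This is a minimal sketch; `join_diag_codes` is a hypothetical name, and the sample codes are made up for illustration. It shows why missing values need explicit handling: `NaN` is truthy in Python, so a plain truthiness check would let it through.

```python
import math


def join_diag_codes(values):
    """Hypothetical helper: join non-missing, non-blank codes with commas."""
    codes = []
    for v in values:
        if v is None:
            continue  # missing value
        if isinstance(v, float) and math.isnan(v):
            continue  # NaN is truthy, so it must be filtered explicitly
        code = str(v).strip()
        if code:  # drop blank or whitespace-only entries
            codes.append(code)
    return ",".join(codes)


row = ["25000", None, "  ", float("nan"), "4019 "]
print(join_diag_codes(row))  # 25000,4019
```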

4. Extract a Patient Cohort

from medicaid_utils.filters.patients.cohort_extraction import extract_cohort
# Define ICD-9 and ICD-10 diagnosis codes for Type 2 diabetes
# (ICD-9 category 250 covers diabetes mellitus in general, so it may also capture type 1 codes)
dct_codes = {
    "diag_codes": {"diabetes_t2": {"incl": {9: ["250"], 10: ["E11"]}}},
    "proc_codes": {},
}
# Define filters and paths
dct_filters = {"cohort": {"ip": {"missing_dob": 0}}, "export": {}}
dct_paths = {"source_root": "/path/to/data", "export_folder": "/output/cohort/"}
# Extract and export cohort claim files
extract_cohort(
    state="WY",
    lst_year=[2012],
    dct_diag_proc_codes=dct_codes,
    dct_filters=dct_filters,
    lst_types_to_export=["ip", "ot", "ps"],
    dct_data_paths=dct_paths,
    cms_format="MAX",
)
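The nested code dictionary is easy to mistype by hand. One way to reduce that risk is a small builder; the sketch below uses a hypothetical helper, `build_diag_proc_codes`, that reproduces the shape shown in this step (condition name, then "incl", then ICD version as an integer key). It assumes that shape is all `extract_cohort` needs for a single condition and does not cover any other keys the library may accept.

```python
def build_diag_proc_codes(condition, icd9=(), icd10=()):
    """Hypothetical helper: build the code dictionary for one condition."""
    incl = {}
    if icd9:
        incl[9] = list(icd9)  # ICD-9 codes keyed by the integer 9
    if icd10:
        incl[10] = list(icd10)  # ICD-10 codes keyed by the integer 10
    return {"diag_codes": {condition: {"incl": incl}}, "proc_codes": {}}


dct_codes = build_diag_proc_codes("diabetes_t2", icd9=["250"], icd10=["E11"])
print(dct_codes)
```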

5. Flag Claims by Diagnosis or Procedure

from medicaid_utils.filters.claims import dx_and_proc
# Flag claims matching ICD-9 diagnosis codes
df_flagged = dx_and_proc.flag_diagnoses_and_procedures(
    dct_diag_codes={"asthma": {"incl": {9: ["4939", "49390"]}}},
    dct_proc_codes={},
    df_claims=ot.df,
    cms_format="MAX",
)
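The codes in the examples on this page are written without decimal points ("4939" rather than "493.9"). If a code list comes from a source that includes the dots, it may need normalizing first. The sketch below assumes the dot-free, upper-case form is what the flagging function expects; `normalize_icd_code` is a hypothetical helper, not part of medicaid_utils.

```python
def normalize_icd_code(code):
    """Hypothetical helper: strip dots and whitespace, and upper-case the code."""
    return code.replace(".", "").strip().upper()


raw_codes = ["493.9", " 493.90", "e11.9"]
codes = [normalize_icd_code(c) for c in raw_codes]
print(codes)  # ['4939', '49390', 'E119']
```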

Next Steps
