```
      _____
   .-'     '-.
  .'  .---.  '.
 /   /     \   \======[ Raster ]
|   |  GEO  |   |=====[ Tabular ]
|   |  PIPE |   |=====[ Vector ]
 \   \     /   /======[ Fusion ]
  '.  '---'  .'
    '-._____.-'
```
Geospatial data pipeline framework for causal inference workflows
geopipe simplifies large-scale geospatial data pipelines that integrate satellite imagery with heterogeneous tabular data sources. It's designed for researchers running causal inference workflows with complex data fusion requirements.
- Declarative Data Fusion: YAML/Python schemas for joining 27+ heterogeneous sources
- Remote Data Access: Download from Earth Engine, Planetary Computer, and STAC catalogs
- Data Discovery: Query available datasets across cloud catalogs with `discover()`
- Quality Intelligence: Proactive data auditing with `schema.audit()` before expensive computation
- Robustness DSL: Declarative YAML specifications for sensitivity analysis across parameter space
- Pipeline Orchestration: DAG-based workflows with checkpointing and resume
- Cluster Computing: Native SLURM/PBS integration with job monitoring
```bash
pip install geopipe

# With optional dependencies
pip install geopipe[prefect,cluster]  # Pipeline orchestration + HPC
pip install geopipe[remote]           # Earth Engine, Planetary Computer
pip install geopipe[all]              # Everything
```
geopipe requires GDAL and PROJ for geospatial operations. Install these first:
macOS (Homebrew):
```bash
brew install gdal proj
```
Ubuntu/Debian:
```bash
sudo apt-get install gdal-bin libgdal-dev libproj-dev
```
Conda (recommended):
```bash
conda install -c conda-forge gdal proj
pip install geopipe
```
```python
from geopipe import FusionSchema, RasterSource, TabularSource

schema = FusionSchema(
    name="aid_effects",
    resolution="5km",
    temporal_range=("2000-01-01", "2020-12-31"),
    sources=[
        RasterSource("nightlights", path="data/viirs/*.tif", aggregation="mean"),
        RasterSource("landcover", path="data/modis/*.tif", aggregation="mode"),
        TabularSource("conflict", path="data/acled.csv",
                      spatial_join="buffer_10km", temporal_align="yearly_sum"),
        TabularSource("aid_projects", path="data/aiddata.csv", spatial_join="nearest"),
    ],
    output="data/interim/consolidated.parquet",
)

# Execute fusion
result = schema.execute()
```
```yaml
# sources.yaml
name: aid_effects
resolution: 5km
temporal_range: ["2000-01-01", "2020-12-31"]

sources:
  - type: raster
    name: nightlights
    path: data/viirs/*.tif
    aggregation: mean
  - type: tabular
    name: conflict
    path: data/acled.csv
    spatial_join: buffer_10km
    temporal_align: yearly_sum

output: data/interim/consolidated.parquet
```
```bash
geopipe fuse sources.yaml
```
Spatial Join Methods (`spatial_join`):

| Method | Description |
|---|---|
| `nearest` | Nearest neighbor join (default) |
| `intersects` | Join on geometric intersection |
| `within` | Source geometries within target |
| `contains` | Target contains source geometries |
| `buffer_5km` | Buffer target by 5km, then intersect |
| `buffer_10km` | Buffer target by 10km, then intersect |
| `buffer_25km` | Buffer target by 25km, then intersect |
| `buffer_50km` | Buffer target by 50km, then intersect |
Temporal Alignment (`temporal_align`):

| Method | Description |
|---|---|
| `exact` | No aggregation (default) |
| `yearly_sum` | Sum numeric values by year |
| `yearly_mean` | Average numeric values by year |
| `monthly_sum` | Sum numeric values by month |
| `monthly_mean` | Average numeric values by month |
| `latest` | Keep most recent record per geometry |
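For intuition, `yearly_sum` behaves roughly like a pandas groupby over geometry and year. The sketch below assumes hypothetical column names (`geometry_id`, `event_date`, `fatalities`); geopipe's internal alignment logic may differ:

```python
import pandas as pd

# Hypothetical event-level table: one row per conflict event
df = pd.read_csv("data/acled.csv", parse_dates=["event_date"])

# Roughly what temporal_align="yearly_sum" does: sum a numeric column
# per geometry per calendar year
yearly = (
    df.groupby(["geometry_id", df["event_date"].dt.year])["fatalities"]
      .sum()
      .reset_index()
)
```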
Raster Aggregation (`aggregation`):

| Method | Description |
|---|---|
| `mean` | Arithmetic mean (default) |
| `sum` | Sum of pixel values |
| `min` / `max` | Minimum / maximum value |
| `std` | Standard deviation |
| `median` | Median value |
| `mode` | Most frequent value |
| `count` | Count of valid pixels |
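These aggregations are standard zonal statistics. For intuition, `aggregation="mean"` resembles what the rasterstats package computes per geometry (an illustrative sketch; the file paths are hypothetical and geopipe's extraction code may differ):

```python
from rasterstats import zonal_stats

# Zonal statistics: aggregate raster pixels falling within each geometry
stats = zonal_stats(
    "grid_cells.gpkg",            # hypothetical target geometries
    "data/viirs/viirs_2020.tif",  # hypothetical raster tile
    stats=["mean", "sum", "count"],
)
print(stats[0])  # e.g. {'mean': ..., 'sum': ..., 'count': ...}
```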
Resolution Formats:
"5km"- Kilometers"1000m"- Meters"0.5deg"- Decimal degrees
```python
from geopipe import Pipeline, task

@task(cache=True, checkpoint=True)
def download_imagery(region, year):
    ...

@task(cache=True, resources={"memory": "16GB"})
def extract_features(imagery_path, model="resnet50"):
    ...

pipeline = Pipeline([download_imagery, extract_features])
pipeline.run(resume=True)  # Resume from last checkpoint
```
Checkpoint Behavior:
- Task checkpoints stored in `.geopipe/checkpoints/` (configurable via `checkpoint_dir`)
- Pipeline state stored in `.geopipe/pipeline/{name}_state.json`
- Resume skips completed stages and re-executes from the failure point
- Clear checkpoints with `geopipe clean` or delete the `.geopipe/` directory
```python
# Configure checkpoint location
pipeline = Pipeline(
    [download_imagery, extract_features],
    checkpoint_dir="my_checkpoints/",
)

# Force fresh run (ignore existing checkpoints)
pipeline.run(resume=False)

# Resume from last successful stage
pipeline.run(resume=True)
```
```python
from geopipe.cluster import SLURMExecutor

executor = SLURMExecutor(
    partition="gpu",
    nodes=10,
    time_limit="24:00:00",
    conda_env="geopipe",
)

pipeline.run(executor=executor)
executor.status()  # Monitor progress
```
Run parallel analysis variants to assess sensitivity of results. See Robustness DSL for the declarative YAML approach.
```python
from geopipe.specs import Spec, SpecRegistry

# Define analysis variants programmatically
specs = SpecRegistry([
    Spec("MAIN", buffer_km=5, include_ntl=True),
    Spec("ROBUST_BUFFER", buffer_km=10, include_ntl=True),
    Spec("ROBUST_NO_NTL", buffer_km=5, include_ntl=False),
])

# Run each specification
for spec in specs:
    schema.output = f"results/{spec.name}/estimates.csv"
    schema.execute()

# Compare results and generate LaTeX table
latex = specs.to_latex(
    pattern="{spec}/estimates.csv",
    estimate_col="estimate",
    se_col="std_error",
    caption="Robustness Specifications",
)
```
Or use the declarative YAML approach (recommended):
```yaml
# schema.yaml
robustness:
  dimensions:
    buffer_km: [5, 10, 25]
    include_ntl: [true, false]
```
```bash
geopipe specs run schema.yaml
```
geopipe was designed for workflows like estimating causal effects from satellite imagery across multiple countries, integrating:
- Satellite data: VIIRS nightlights, MODIS land cover, Sentinel-2 imagery
- Conflict data: ACLED, UCDP
- Climate data: CHIRPS precipitation, ERA5 temperature
- Development data: World Bank indicators, DHS surveys
- Treatment data: Aid project locations (AidData, IATI)
Instead of coordinating 25+ scripts manually:
```
Analysis/
├── 01_setup.R
├── 02_get_conflict.R
├── 03_get_climate.R
├── ... (11 data prep scripts)
├── call_CI_analysis.R
└── consolidate_results.R
```
Use a single declarative pipeline:
```python
schema = FusionSchema.from_yaml("sources.yaml")

pipeline = Pipeline.from_schema(schema)
pipeline.add_stage(run_cnn_inference)
pipeline.add_stage(estimate_causal_effects)
pipeline.run(executor=SLURMExecutor(), specs=["MAIN", "ROBUST_BUFFER"])
```
All sources default to EPSG:4326 (WGS84). Buffer operations automatically reproject to EPSG:3857, a metric CRS, so buffer distances can be specified in meters.
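As a rough illustration, a `buffer_10km` join corresponds to something like the following geopandas operations (an illustrative sketch with hypothetical file names, not geopipe's actual implementation):

```python
import geopandas as gpd

targets = gpd.read_file("grid_cells.gpkg")  # hypothetical target geometries
events = gpd.read_file("events.gpkg")       # hypothetical source points

# Reproject to a metric CRS, buffer targets by 10 km, then intersect
buffered = targets.to_crs(3857)
buffered["geometry"] = buffered.geometry.buffer(10_000)  # meters
joined = gpd.sjoin(events.to_crs(3857), buffered, predicate="intersects")
```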
- Formats: GeoTIFF, Cloud-Optimized GeoTIFF (COG)
- Bands: 1-indexed (e.g., `band=1` for the first band)
- Nodata: Auto-detected from file metadata or explicit parameter
- Formats: CSV, Parquet, Excel (.xlsx), JSON
- Required columns (one of):
  - `latitude` + `longitude` columns (configurable via `lat_col`, `lon_col`)
  - `geometry` column with WKT strings
ISO 8601 strings: "2020-01-01" or "2020-01-01T00:00:00"
Tuple of (minx, miny, maxx, maxy) in WGS84 coordinates.
geopipe uses colored terminal output via rich:
- Blue: Progress information
- Green: Success messages
- Yellow: Warnings and retries
- Red: Errors
File not found errors:
```python
# Validate sources before execution
issues = schema.validate_sources()
if issues:
    print("\n".join(issues))
```
Task failures with retry:
```python
@task(retries=3, retry_delay=5.0)  # Retry 3x with exponential backoff
def flaky_download(url):
    ...
```
Checkpoint corruption:
```bash
# Clear all checkpoints and restart
geopipe clean

# Or manually:
rm -rf .geopipe/
```
R users can access geopipe via the reticulate package. Results are returned as GeoDataFrames that convert directly to sf objects.
```r
install.packages(c("reticulate", "sf"))

library(reticulate)

# Create conda environment with dependencies
conda_create("geopipe-env")
conda_install("geopipe-env", c("gdal", "proj"), channel = "conda-forge")
py_install("geopipe", envname = "geopipe-env")
```
```r
library(reticulate)
library(sf)

use_condaenv("geopipe-env")

# Import geopipe
geopipe <- import("geopipe")

# Define fusion schema
schema <- geopipe$FusionSchema(
  name = "analysis",
  resolution = "5km",
  sources = list(
    geopipe$RasterSource("nightlights", path = "data/viirs/*.tif", aggregation = "mean"),
    geopipe$TabularSource("conflict", path = "data/acled.csv", spatial_join = "buffer_10km")
  ),
  output = "results/fused.parquet"
)

# Execute and convert to sf
result <- schema$execute()
result_sf <- st_as_sf(result)
```
Load a pre-configured YAML schema to minimize Python syntax:
```r
schema <- geopipe$FusionSchema$from_yaml("sources.yaml")
result_sf <- st_as_sf(schema$execute())
```
geopipe can download satellite imagery directly from cloud providers. Install remote dependencies:
```bash
pip install geopipe[remote]
```
| Provider | Source Class | Collections |
|---|---|---|
| Google Earth Engine | `EarthEngineSource` | VIIRS, Landsat, Sentinel-2, MODIS, ERA5, etc. |
| Microsoft Planetary Computer | `PlanetaryComputerSource` | Sentinel-2, Landsat, NAIP, etc. |
| Any STAC Catalog | `STACSource` | Varies by catalog |
Earth Engine (required):
```bash
# One-time authentication
earthengine authenticate

# Or use a service account
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/credentials.json
```
Planetary Computer (optional, for higher rate limits):
```bash
export PC_SDK_SUBSCRIPTION_KEY=your-api-key
```

STAC Catalogs: No authentication needed for public catalogs.
```python
from geopipe import EarthEngineSource

source = EarthEngineSource(
    name="viirs_nightlights",
    collection="NOAA/VIIRS/DNB/MONTHLY_V1/VCMCFG",
    bands=["avg_rad"],
    center=(7.5, 9.0),       # lon, lat (Nigeria)
    patch_size_km=100,       # 100km x 100km area
    resolution=500,          # 500m resolution
    temporal_range=("2023-01-01", "2023-12-31"),
    reducer="mean",          # Temporal composite method
    output_dir="data/remote",
)

gdf = source.load()  # Downloads and returns GeoDataFrame
```
```python
from geopipe import PlanetaryComputerSource

source = PlanetaryComputerSource(
    name="sentinel2",
    collection="sentinel-2-l2a",
    bands=["B04", "B03", "B02", "B08"],  # Red, Green, Blue, NIR
    bounds=(-122.5, 37.7, -122.4, 37.8),
    resolution=10,
    temporal_range=("2023-06-01", "2023-08-31"),
    cloud_cover_max=20,                  # Filter cloudy images
    output_dir="data/remote",
)

gdf = source.load()
```
```python
from geopipe import STACSource

source = STACSource(
    name="sentinel2_element84",
    catalog_url="https://earth-search.aws.element84.com/v1",
    collection="sentinel-2-l2a",
    assets=["red", "green", "blue"],
    center=(-105.27, 40.01),  # Boulder, CO
    patch_size_km=10,
    temporal_range=("2023-07-01", "2023-09-30"),
    output_dir="data/remote",
)

gdf = source.load()
```
Spatial Extent (choose one):
- `bounds=(minx, miny, maxx, maxy)` - Explicit bounding box in WGS84
- `center=(lon, lat)` + `patch_size_km=N` - Center point with patch size
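For reference, converting `center` + `patch_size_km` into a WGS84 bounding box amounts to roughly the following approximation (an illustrative sketch; geopipe computes the extent internally):

```python
import math

def center_to_bounds(lon, lat, patch_size_km):
    """Approximate a WGS84 bounding box around a center point."""
    half_km = patch_size_km / 2
    dlat = half_km / 110.574                                  # ~km per degree latitude
    dlon = half_km / (111.320 * math.cos(math.radians(lat)))  # shrinks with latitude
    return (lon - dlon, lat - dlat, lon + dlon, lat + dlat)

print(center_to_bounds(7.5, 9.0, 100))  # ~1 degree across at this latitude
```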
Common Parameters:
| Parameter | Description |
|---|---|
| `resolution` | Target resolution in meters |
| `temporal_range` | `("start_date", "end_date")` in ISO format |
| `output_dir` | Directory for cached downloads |
| `bands` / `assets` | List of band names to extract |
Earth Engine Specific:
| Parameter | Description |
|---|---|
| `reducer` | Temporal composite: `mean`, `median`, `max`, `min`, `sum`, `first` |
| `cloud_mask` | Apply cloud masking for optical sensors |
| `scale_factor` | Multiply band values by factor |
Planetary Computer Specific:
| Parameter | Description |
|---|---|
| `cloud_cover_max` | Maximum cloud cover percentage (0-100) |
| `mosaic_method` | How to combine images: `first`, `median`, `mean` |
Downloaded files are automatically cached based on query parameters:
```
data/remote/
├── earthengine/
│   ├── abc123def456.tif
│   └── abc123def456.json  # metadata
├── planetary_computer/
└── stac/
```
Subsequent calls with the same parameters load from cache without re-downloading.
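Conceptually, the cache key is derived from the query parameters, along the lines of hashing a canonical serialization (an illustrative sketch; geopipe's actual key scheme is internal and may differ):

```python
import hashlib
import json

params = {
    "collection": "NOAA/VIIRS/DNB/MONTHLY_V1/VCMCFG",
    "bands": ["avg_rad"],
    "center": (7.5, 9.0),
    "patch_size_km": 100,
    "temporal_range": ("2023-01-01", "2023-12-31"),
}

# Same parameters -> same key -> cache hit on the next call
blob = json.dumps(params, sort_keys=True, default=str).encode()
key = hashlib.sha256(blob).hexdigest()[:12]
print(f"data/remote/earthengine/{key}.tif")
```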
Discover available datasets across cloud catalogs before building your pipeline:
```python
from geopipe import discover

# Find nightlights data
results = discover(
    bounds=(-122.5, 37.7, -122.4, 37.8),
    categories=["nightlights"],
)

for r in results:
    print(f"{r.name}: {r.resolution_m}m ({r.provider})")

# Convert a discovery result to a source
source = results[0].to_source(name="sf_ntl")
schema.add_source(source)
```
```bash
# Search by category
geopipe discover search --category nightlights

# Search by provider
geopipe discover search --provider earthengine --format table

# Get dataset details
geopipe discover info viirs_dnb_monthly

# List all known datasets
geopipe discover list-datasets

# List categories
geopipe discover categories
```
| Category | Description | Example Datasets |
|---|---|---|
| `nightlights` | Nighttime light emissions | VIIRS DNB, DMSP-OLS |
| `optical` | Multispectral imagery | Sentinel-2, Landsat, MODIS |
| `sar` | Synthetic aperture radar | Sentinel-1, ALOS PALSAR |
| `elevation` | Digital elevation models | SRTM, Copernicus DEM |
| `climate` | Weather and climate | ERA5, CHIRPS, TerraClimate |
| `land_cover` | Land use classification | ESA WorldCover, Dynamic World |
| `vegetation` | Vegetation indices | MODIS NDVI/EVI, LAI |
| `population` | Population density | WorldPop, LandScan |
| `infrastructure` | Built environment | GHSL, OSM Buildings |
Discovery results can suggest related datasets:
```python
viirs = discover(categories=["nightlights"])[0]
related = viirs.suggest_complementary()
# Returns: Sentinel-2, Landsat, etc.
```
Audit data quality before running expensive computations:
```python
from geopipe import FusionSchema

schema = FusionSchema.from_yaml("sources.yaml")

# Run quality audit
report = schema.audit()

# Check results
print(report.summary())
print(f"Overall score: {report.overall_score}/100")

if report.has_errors:
    print("Critical issues found!")
    for issue in report.filter(severity="error"):
        print(f"  - {issue.source_name}: {issue.message}")

# Export report
report.to_markdown("quality_report.md")
```
```bash
# Basic audit
geopipe audit schema.yaml

# Save report
geopipe audit schema.yaml --output report.md

# Auto-fix issues
geopipe audit schema.yaml --fix

# Strict mode (fail on warnings)
geopipe audit schema.yaml --strict
```
| Check | Category | Description |
|---|---|---|
| `TemporalOverlapCheck` | Cross-source | Verify all sources have overlapping time ranges |
| `CRSAlignmentCheck` | Cross-source | Check CRS compatibility across sources |
| `BoundsOverlapCheck` | Cross-source | Verify spatial extents overlap |
| `TemporalGapCheck` | Raster | Detect missing dates in time series |
| `SpatialCoverageCheck` | Raster | Report NoData percentage in AOI |
| `MissingValueCheck` | Tabular | Check for nulls in key columns |
| `GeocodingPrecisionCheck` | Tabular | Analyze coordinate precision |
- ERROR: Critical issues that will cause fusion to fail
- WARNING: Issues that may affect data quality (-10 points)
- INFO: Informational notes (-2 points)
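Given the deductions above, the overall score can be thought of as a simple penalty sum starting from 100 (an illustrative sketch, not geopipe's exact scoring code; point values are stated only for warnings and info notes):

```python
def overall_score(issues):
    """Start at 100 and deduct points per issue severity (sketch)."""
    penalty = {"warning": 10, "info": 2}  # per the deductions listed above
    return max(100 - sum(penalty.get(i.severity, 0) for i in issues), 0)
```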
Define robustness specifications declaratively in YAML for sensitivity analysis:
```yaml
# schema.yaml
name: aid_effects
resolution: 5km
temporal_range: ["2010-01-01", "2020-12-31"]

sources:
  - type: tabular
    name: conflict
    path: data/acled.csv
    spatial_join: buffer_${buffer_km}km  # Template substitution

output: results/${spec_name}/fused.parquet  # Per-spec output

robustness:
  dimensions:
    buffer_km: [5, 10, 25, 50]
    include_ntl: [true, false]
    temporal_lag: [0, 1, 2]
  exclude:
    - buffer_km: 50
      temporal_lag: 2  # Invalid combination
  named:
    MAIN:
      buffer_km: 10
      include_ntl: true
      temporal_lag: 0
```
This generates 4 x 2 x 3 - 1 = 23 specifications automatically.
```python
from geopipe import FusionSchema

schema = FusionSchema.from_yaml("schema.yaml")

# Expand to spec + configured schema pairs
for spec, configured_schema in schema.expand_specs():
    print(f"Running {spec.name}...")
    configured_schema.execute()
```
```bash
# Preview specifications
geopipe specs list schema.yaml

# Preview with parameters
geopipe specs list schema.yaml --verbose

# Write individual schema files
geopipe specs expand schema.yaml --output-dir schemas/

# Run all specifications
geopipe specs run schema.yaml

# Run specific specs only
geopipe specs run schema.yaml --spec MAIN --spec ROBUST_BUFFER

# Dry run (show what would execute)
geopipe specs run schema.yaml --dry-run
```
Analyze results across all specifications:
```bash
# Generate specification curve from results
geopipe specs curve "results/*/estimates.csv" --output curve.pdf

# Export as LaTeX table
geopipe specs curve "results/*.csv" --format latex

# With custom column names
geopipe specs curve "results/*.csv" \
  --estimate-col treatment_effect \
  --se-col robust_se
```
```python
from geopipe.specs import SpecificationCurve

curve = SpecificationCurve(
    specs=specs,
    results_pattern="results/{spec_name}/estimates.csv",
    estimate_col="treatment_effect",
)

# Summary statistics
summary = curve.compute_summary()
print(f"Mean estimate: {summary['mean_estimate']:.4f}")
print(f"% Significant: {summary['pct_significant_05']:.1f}%")

# Rank dimensions by influence
influence = curve.rank_by_influence()
print(influence[["dimension", "variance_ratio"]])

# Generate plot
curve.plot("figures/spec_curve.pdf")

# Export for publication
print(curve.to_latex())
```
Use `${param}` or `{param}` in schema values:
```yaml
sources:
  - name: conflict
    spatial_join: buffer_${buffer_km}km

output: results/{spec_name}/data.parquet
```
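For intuition, the two placeholder forms behave like Python's `string.Template` and `str.format` (an illustrative sketch of the substitution, not geopipe's parser):

```python
from string import Template

params = {"buffer_km": 10, "spec_name": "MAIN"}

# ${param} form
print(Template("buffer_${buffer_km}km").substitute(params))  # buffer_10km

# {param} form
print("results/{spec_name}/data.parquet".format(**params))   # results/MAIN/data.parquet
```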
Remote sources work in YAML schemas:
```yaml
sources:
  - type: earthengine
    name: viirs
    collection: NOAA/VIIRS/DNB/MONTHLY_V1/VCMCFG
    bands: [avg_rad]
    center: [7.5, 9.0]
    patch_size_km: 100
    resolution: 500
    temporal_range: ["2023-01-01", "2023-12-31"]
    output_dir: data/remote

  - type: planetary_computer
    name: sentinel2
    collection: sentinel-2-l2a
    bands: [B04, B08]
    bounds: [-122.5, 37.5, -122.0, 38.0]
    cloud_cover_max: 20
    resolution: 10
    output_dir: data/remote
```
MIT License. See LICENSE for details.
If you use geopipe in your research, please cite:
```bibtex
@software{geopipe,
  author = {Jerzak, Connor T.},
  title  = {geopipe: Geospatial Data Pipeline Framework},
  year   = {2025},
  url    = {https://github.com/cjerzak/geopipe-software}
}
```