×ばつm nodes (n keys ×ばつ m columns) and evaluating it across f files is a huge win. When this optimization helps less: Small datasets (< 10k keys): Already fast with In() predicates, minimal improvement Dense numeric keys: Range filter helps, but less dramatic than the sparse case Workloads where the original filter already performed well 2. Anti-Join for Insert Filtering Replaced per-batch expression filtering with a single anti-join operation after processing all batches: # Before: Per-batch expression filtering (slow) for batch in matched_iceberg_record_batches: expr_match = create_match_filter(rows, join_cols) expr_match_arrow = expression_to_pyarrow(bind(...)) rows_to_insert = rows_to_insert.filter(~expr_match_arrow) # After: Single anti-join (fast) combined_matched_keys = pa.concat_tables(matched_target_keys).group_by(join_cols).aggregate([]) rows_to_insert = df_keys.join(combined_matched_keys, keys=join_cols, join_type="left anti") Note on memory usage: The new approach accumulates matched keys in memory during batch processing. We only store key columns (not full rows) to minimize memory footprint, and deduplicate after the loop. For tables with millions of matching rows, this could increase peak memory usage compared to the previous approach. A potential future improvement would be incremental deduplication during the loop. 3. Vectorized Row Comparison Replaced row-by-row Python comparison with vectorized PyArrow operations: # Before: Python loop with slice() for source_idx, target_idx in zip(...): source_row = source_table.slice(source_idx, 1) target_row = target_table.slice(target_idx, 1) for key in non_key_cols: if source_row.column(key)[0].as_py() != target_row.column(key)[0].as_py(): to_update_indices.append(source_idx) # After: Vectorized with take() and compute matched_source = source_table.take(source_indices) matched_target = target_table.take(target_indices) for col in non_key_cols: col_diff = _compare_columns_vectorized(source_col, target_col) diff_masks.append(col_diff) combined_mask = functools.reduce(pc.or_, diff_masks) The _compare_columns_vectorized() function handles: Primitive types: Uses pc.not_equal() with proper null handling Struct types: Recursively compares each nested field List/Map types: Falls back to Python comparison (still batched) Benchmark Results Ran benchmarks on a table with ~2M rows, doing incremental upserts: Run Table Size Batches Original Optimized Speedup 2 2M rows 32 11.9 min 2.3 min 5.1x 3 2M rows 96 31.5 min 3.9 min 8.0x 4 2M rows 160+ 51.2 min 5.5 min 9.3x Why times increase with each run: The table uses bucketing, and each upsert modifies files independently, causing file count to increase over time. The original implementation's big filter expression (Or(And(...), ...)) had to be evaluated against every file, so more files = dramatically more time. The optimized version avoids this by using AlwaysTrue(), making the scan time grow linearly with data size rather than exponentially with file count. This file increase could be mitigated with table maintenance (compaction), which is not yet implemented in PyIceberg. Where the Time Went (Run 2: 2M rows, 32 batches) Step Original Optimized Filter creation 7.9s 0.07s (114x faster) Table scan 382.2s 1.4s (273x faster) Batch processing 212.0s 24.5s Insert filtering (included above) 0.2s The coarse filter approach shows the biggest improvement: Original filter complexity: Or(And(...), And(...), ...) with millions of nodes Optimized filter: AlwaysTrue() or simple And(In(), In()) Incremental Adoption If the anti-join change is concerning due to memory implications, the coarse match filter optimization can be contributed separately as it provides the majority of the performance benefit and doesn't change the memory characteristics. Suggested PR split: Coarse match filter for initial scan (biggest win, minimal risk) Vectorized row comparison in get_rows_to_update() Anti-join for insert filtering Future Considerations Why Rust bindings weren't explored for this PR: In ticket #2159, a suggestion was made to side-step performance issues by using the Python binding of the rust implementation. However, we would like to stick with a Python-centric implementation, because our use case requires mocking datetime using time-machine for: Creating historical backfills with accurate snapshot timestamps Deterministically rerunning failed pipeline runs with the same timestamps This is why I kept the implementation in pure Python rather than exploring Rust bindings. Potential hybrid approach: The data processing (filtering, joins, comparisons) is where most of the time is spent and could benefit from Rust bindings. However - and I'll be selfish here - snapshot creation and metadata operations should remain in Python to preserve the ability to mock time. Without this, our backfill and replay workflows would break. A future optimization could: Move scan filtering and row comparison to Rust for performance Keep snapshot/commit operations in Python for datetime mocking flexibility I'd happily trade some performance for keeping our time-mocking capability intact. Testing Added comprehensive tests for: create_coarse_match_filter() behavior across different dataset sizes and types Threshold boundary conditions (< 10k, = 10k, > 10k unique keys) Density calculations for range filter decisions _compare_columns_vectorized() with primitives, nulls, structs, nested structs, and lists Edge cases: empty datasets, single values, negative numbers, composite keys Breaking Changes None. The API remains unchanged; this is purely an internal optimization. Files Changed pyiceberg/table/__init__.py - upsert method optimizations pyiceberg/table/upsert_util.py - new create_coarse_match_filter(), vectorized comparison functions tests/table/test_upsert.py - new tests for optimization functions Note: This code was co-written with the help of an AI agent (Claude), primarily to speed up exploration and understanding of the PyIceberg codebase. All the speed up ideas are mine. The benchmark results are from our real-world production data that we actively use and store. I have reviewed all the generated code. All related tests pass.">
Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Optimize upsert performance for large datasets #2943

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
EnyMan wants to merge 9 commits into apache:main
base: main
Choose a base branch
Loading
from EnyMan:upsert-optimization

Merge branch 'main' into upsert-optimization

ed0f72f
Select commit
Loading
Failed to load commit list.
Sign in for the full log view
Open

Optimize upsert performance for large datasets #2943

Merge branch 'main' into upsert-optimization
ed0f72f
Select commit
Loading
Failed to load commit list.

AltStyle によって変換されたページ (->オリジナル) /