RyanJHamby/FlowState


FlowState
Rust-accelerated temporal alignment engine for quantitative finance



Problem

Every quantitative trading firm builds the same internal infrastructure: join heterogeneous market data streams — trades, quotes, bars, signals — into point-in-time correct feature matrices for model training and backtesting. The requirements are always the same:

  • No look-ahead bias. A trade at time T must only see quotes at time <= T. Violating this invalidates every backtest downstream.
  • Nanosecond precision. Microsecond timestamps lose ordering information in high-frequency data. Timestamps are int64 nanoseconds, not floats.
  • Hundreds of symbols, billions of rows. pandas falls over at 10M rows. Polars handles it but treats as-of joins as one operation among hundreds — not the primary design target.
  • Streaming and batch. Research needs batch replay over historical data. Production needs incremental alignment on live feeds with watermark semantics.
  • GPU-ready tensors. The output goes into PyTorch or JAX. Every CPU copy between alignment and the GPU is wasted latency.
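
The first two requirements above can be illustrated with a small standard-library sketch. It is not FlowState's implementation: the function name `asof_backward` and the sample values are hypothetical, and it handles a single symbol with plain Python lists rather than Arrow tables.

```python
from bisect import bisect_right


def asof_backward(trade_ts, quote_ts, quote_vals, tolerance_ns):
    """For each trade, return the latest quote value with quote_ts <= trade_ts.

    quote_ts must be sorted ascending. The result is None when no quote
    exists at or before the trade, or when the match is older than tolerance_ns.
    """
    out = []
    for t in trade_ts:
        i = bisect_right(quote_ts, t) - 1  # index of the last quote at or before t
        if i >= 0 and t - quote_ts[i] <= tolerance_ns:
            out.append(quote_vals[i])
        else:
            out.append(None)
    return out


# Integer nanoseconds since the epoch; values chosen for illustration only.
base = 1_700_000_000_000_000_000
quote_ts = [base + 0, base + 100, base + 200]
quote_bid = [10.0, 10.1, 10.2]
trade_ts = [base + 50, base + 100, base + 150, base + 10_000]

matched = asof_backward(trade_ts, quote_ts, quote_bid, tolerance_ns=1_000)

# Adjacent nanosecond timestamps collapse to the same value as float64.
floats_collide = float(base) == float(base + 1)
```

The trade at `base + 150` sees the quote from `base + 100`, never the later one at `base + 200`, and the last trade gets no match because its nearest quote is outside the tolerance. The final line shows why the timestamps stay as integers: at this magnitude float64 cannot tell two consecutive nanoseconds apart.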

FlowState solves this pipeline end-to-end: partitioned storage with three-level pruning, Rust-accelerated temporal joins, streaming watermark alignment, and GPU-direct data feeding — all connected through Apache Arrow zero-copy.

Architecture

                 ┌────────────────────────────────────┐
                 │             Python API             │
                 │ TemporalAligner · StreamingAligner │
                 │    FeatureStore · ReplayEngine     │
                 └─────────────────┬──────────────────┘
                                   │ Arrow PyCapsule Interface
                                   │ (zero-copy, no serialization)
                 ┌─────────────────▼──────────────────┐
                 │          Rust Core (PyO3)          │
                 │ O(n+m) merge-scan · Rayon parallel │
                 │  Streaming joins · Arrow IPC I/O   │
                 │      6,400 lines · 132 tests       │
                 └─────────────────┬──────────────────┘
                                   │
           ┌───────────────────────┼───────────────────────┐
           ▼                       ▼                       ▼
┌────────────────────┐  ┌────────────────────┐  ┌────────────────────┐
│      Storage       │  │     Alignment      │  │    Data Feeding    │
│ Hive partitions    │  │ As-of join engine  │  │ Pinned memory      │
│ xxhash bucketing   │  │ Multi-stream (×)   │  │ kvikio GDS         │
│ NVMe LRU cache     │  │ Watermark stream   │  │ CUDA streams       │
│ S3/GCS/Azure       │  │ Batch coalescer    │  │ PyTorch/JAX        │
└────────────────────┘  └────────────────────┘  └────────────────────┘

Performance

Benchmarked on 1M left × 500K right rows, 1,000 symbols, Apple M-series. Measured against Polars 1.x, the fastest general-purpose option.

| Operation | FlowState | Polars | Speedup |
| --- | --- | --- | --- |
| Grouped as-of join (1M rows, 1K symbols) | 10 ms | 18 ms | 1.8x |
| Ungrouped as-of join (1M rows) | 4 ms | 7 ms | 1.8x |
| Multi-stream alignment (4 streams) | 42 ms | 42 ms | Parity |
| Multi-stream alignment (8 streams) | 85 ms | 100 ms | 1.2x |
| Streaming incremental join | Sub-microsecond emit | N/A | |
| SPSC ring buffer throughput | 82M elem/s | N/A | |

Where FlowState is faster, it is because it solves a narrower problem. Polars handles arbitrary DataFrame operations; FlowState handles exactly one thing, temporal joins on sorted timestamp data, and exploits every invariant that implies: pre-sorted merge scans, symbol-partitioned parallelism, tolerance early termination, and pre-partitioned storage that eliminates runtime hash table construction.
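
To make the pre-sorted merge-scan invariant concrete, here is a minimal single-threaded sketch in Python. It is an illustration under stated assumptions, not the Rust kernel: `merge_scan_backward` is a hypothetical name, inputs are plain sorted lists for one symbol, and it does not attempt parallelism or early termination.

```python
def merge_scan_backward(left_ts, right_ts, tolerance_ns=None):
    """Return, for each left row, the index of its matching right row, or -1.

    Both inputs must be sorted ascending. The right cursor only ever moves
    forward, so the total work is O(n + m) with no hash table and no
    per-row binary search.
    """
    matches = []
    j = -1  # index of the last right row with right_ts[j] <= current left ts
    m = len(right_ts)
    for t in left_ts:
        # Advance the cursor while the next right row is still at or before t.
        while j + 1 < m and right_ts[j + 1] <= t:
            j += 1
        if j >= 0 and (tolerance_ns is None or t - right_ts[j] <= tolerance_ns):
            matches.append(j)
        else:
            matches.append(-1)
    return matches


idx = merge_scan_backward([5, 10, 10, 40], [1, 10, 12, 20], tolerance_ns=15)
```

Because both sides are already ordered, each right row is visited at most once across the whole scan. Running one such scan per symbol gives independent units of work, which is the property the symbol-partitioned parallelism relies on.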

Capability comparison

| Capability | FlowState | Polars | pandas | kdb+/q | DuckDB |
| --- | --- | --- | --- | --- | --- |
| Streaming incremental joins | Watermark + late policy | No | No | wj (windowed) | No |
| GPU data feeding | kvikio GDS + CUDA streams | No | No | No | No |
| Multi-stream single pass | Rayon parallel N-way | Sequential | N/A | Manual | N/A |
| Lock-free pipeline | SPSC ring → join → coalesce | No | No | IPC | No |
| Point-in-time default | Backward (no look-ahead) | Backward | Forward-fill | aj (backward) | Backward |
| Pinned memory pool | cudaMallocHost + CPU fallback | No | No | No | No |
| Arrow zero-copy | PyCapsule Interface | Yes | No (copies) | No | Yes |
| ML DataLoader | PyTorch + JAX adapters | No | No | No | No |

Usage

Batch alignment: trades with quotes

from flowstate.prism.alignment import TemporalAligner
aligner = TemporalAligner(
 primary_type="trade",
 secondary_specs={"quote": ["bid_price", "ask_price"]},
 tolerance_ns=5_000_000_000, # 5 second max staleness
)
aligner.add_data("trade", trade_table) # pa.Table, int64 ns timestamps
aligner.add_data("quote", quote_table)
aligned, stats = aligner.flush()
# Every row is point-in-time correct — quote timestamp <= trade timestamp

Streaming alignment with watermarks

from flowstate.prism.streaming import StreamingAligner, StreamingAlignConfig
aligner = StreamingAligner(StreamingAlignConfig(
 group_col="symbol",
 tolerance_ns=5_000_000_000,
 lateness_ns=1_000_000_000, # 1s late data tolerance
))
# live_feed is assumed to yield (trades, quotes, event-time) tuples
for trade_batch, quote_batch, current_event_time_ns in live_feed:
    aligner.push_left(trade_batch)
    aligner.push_right(quote_batch)
    aligner.advance_watermark(current_event_time_ns)
    result = aligner.emit()  # rows sealed by watermark
    if result is not None:
        model.predict(result)
final = aligner.flush() # end-of-stream
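
One way to picture the watermark semantics used above is a toy single-symbol model. Everything in it is an assumption made for illustration: the class `ToyStreamingJoin`, its row-at-a-time methods, and the rule that a left row is sealed once its timestamp is strictly older than `watermark - lateness_ns`. The real `StreamingAligner` works on batches and may seal rows differently.

```python
from bisect import bisect_left, bisect_right, insort


class ToyStreamingJoin:
    """Toy watermark-driven backward as-of join for a single symbol."""

    def __init__(self, tolerance_ns, lateness_ns):
        self.tolerance_ns = tolerance_ns
        self.lateness_ns = lateness_ns
        self.watermark = None
        self.left = []    # pending left timestamps, kept sorted
        self.right = []   # (timestamp, value) pairs, kept sorted
        self.dropped = 0  # rows rejected because they arrived too late

    def _too_late(self, ts):
        return self.watermark is not None and ts < self.watermark - self.lateness_ns

    def push_left(self, ts):
        if self._too_late(ts):
            self.dropped += 1
        else:
            insort(self.left, ts)

    def push_right(self, ts, value):
        if self._too_late(ts):
            self.dropped += 1
        else:
            insort(self.right, (ts, value))

    def advance_watermark(self, ts):
        # Watermarks never move backwards.
        if self.watermark is None or ts > self.watermark:
            self.watermark = ts

    def emit(self):
        """Join and return the left rows that the watermark has sealed."""
        if self.watermark is None:
            return []
        sealed_before = self.watermark - self.lateness_ns
        n = bisect_left(self.left, sealed_before)  # rows strictly older than the bound
        sealed, self.left = self.left[:n], self.left[n:]
        right_ts = [ts for ts, _ in self.right]
        out = []
        for t in sealed:
            i = bisect_right(right_ts, t) - 1
            ok = i >= 0 and t - right_ts[i] <= self.tolerance_ns
            out.append((t, self.right[i][1] if ok else None))
        return out


join = ToyStreamingJoin(tolerance_ns=100, lateness_ns=10)
join.push_right(5, "q1")
join.push_left(20)
join.push_left(50)
join.advance_watermark(40)
first = join.emit()           # only t=20 is sealed, since 20 < 40 - 10
join.push_right(45, "q2")
join.push_right(3, "stale")   # older than watermark - lateness, so dropped
join.advance_watermark(100)
second = join.emit()
```

A sealed row can be emitted safely because any right row accepted later has a timestamp at or above the sealing bound, so it can never become a better backward match. That is the property that lets streaming output agree with a batch join over the same data.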

Rust kernel directly

import flowstate_core
# Grouped as-of join — dispatches to Rayon parallel merge-scan
result = flowstate_core.asof_join(
 trades, quotes, on="timestamp", by="symbol",
 direction="backward", tolerance_ns=5_000_000_000,
)
# Streaming join with watermark semantics
join = flowstate_core.StreamingJoin(
 on="timestamp", by="symbol", direction="backward",
 tolerance_ns=5_000_000_000, lateness_ns=1_000_000_000,
)
join.push_left(trade_batch)
join.push_right(quote_batch)
join.advance_watermark(current_time_ns)
result = join.emit()
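
The `direction` argument selects which neighbouring row counts as the match. The sketch below shows one plausible reading of the three directions named in this README (backward, forward, nearest) for a single lookup. The helper `asof_index` is hypothetical, and the tie-breaking rule for `nearest` is an assumption, not documented behaviour of `flowstate_core`.

```python
from bisect import bisect_left, bisect_right


def asof_index(right_ts, t, direction="backward"):
    """Return the index in sorted right_ts matched to time t, or None."""
    if direction == "backward":   # last right row at or before t
        i = bisect_right(right_ts, t) - 1
        return i if i >= 0 else None
    if direction == "forward":    # first right row at or after t
        i = bisect_left(right_ts, t)
        return i if i < len(right_ts) else None
    if direction == "nearest":    # closer of the two candidates
        back = asof_index(right_ts, t, "backward")
        fwd = asof_index(right_ts, t, "forward")
        if back is None or fwd is None:
            return fwd if back is None else back
        # Ties go to the backward match in this sketch (an assumption).
        return back if t - right_ts[back] <= right_ts[fwd] - t else fwd
    raise ValueError(f"unknown direction: {direction}")


quotes = [10, 20, 30]
picks = {d: asof_index(quotes, 24, d) for d in ("backward", "forward", "nearest")}
```

Only `backward` is safe for building training features, because `forward` and `nearest` can both select a row from the future of the left-hand event.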

Temporal feature store

from flowstate.store import (
 FeatureCatalog, FeatureDefinition, FeatureMaterializer, FeatureServer,
)
catalog = FeatureCatalog("/data/features/catalog.json")
catalog.register(FeatureDefinition(
 name="trade_with_quote",
 primary_stream="trade",
 secondary_stream="quote",
 columns=["bid_price", "ask_price"],
 tolerance_ns=5_000_000_000,
))
materializer = FeatureMaterializer(catalog=catalog, output_dir="/data/features/mat")
materializer.add_stream("trade", trade_table)
materializer.add_stream("quote", quote_table)
materializer.materialize_all()
server = FeatureServer(catalog=catalog, data_dir="/data/features/mat")
table = server.get_feature("trade_with_quote", symbols=["AAPL"])

GPU data feeding

import numpy as np
import flowstate_core
from flowstate.prism.gpu_direct import GPUDirectReader, GPUDirectConfig
reader = GPUDirectReader(GPUDirectConfig(
 device_id=0,
 num_streams=2, # async H2D overlap
 gds_task_size=4*1024*1024,
))
# NVMe → PCIe DMA → GPU VRAM (zero CPU copies via kvikio GDS)
gpu_array = reader.read_binary_to_gpu("/data/prices.bin", dtype=np.float32)
# Arrow IPC I/O with column projection and temporal range filtering
table = flowstate_core.read_ipc("aligned.arrow", projection=[0, 1, 3])
table = flowstate_core.read_ipc_time_range("aligned.arrow", on="timestamp", min_ts=t0, max_ts=t1)

Project Structure

FlowState/
├── flowstate-core/  # Rust crate: 6,400 lines, 132 tests
│   └── src/
│       ├── lib.rs  # PyO3 bindings: joins, streaming, IPC
│       ├── asof/
│       │   ├── scan.rs  # O(n+m) merge-scan kernels (backward/forward/nearest)
│       │   ├── parallel_scan.rs  # Chunked parallel scan, binary-search cursor starts
│       │   ├── join.rs  # Orchestration: sort-detect, ahash grouping, Rayon dispatch
│       │   ├── gather.rs  # Parallel column gather via Arrow take()
│       │   ├── multi.rs  # Multi-stream parallel alignment
│       │   ├── streaming.rs  # Watermark-based streaming join (900 lines)
│       │   └── config.rs  # Direction enum, config struct
│       ├── ipc.rs  # Arrow IPC read/write/scan, projection, time-range filter
│       ├── spsc.rs  # Lock-free SPSC ring buffer, AtomicU64, cache-line padded
│       ├── pipeline.rs  # Streaming pipeline: SPSC → join → coalesce → output
│       ├── coalesce.rs  # Adaptive batch coalescer, target-row flushing
│       ├── hdr.rs  # HDR histogram, log-linear bucketing, CAS min/max
│       ├── bloom.rs  # Bloom filter, double-hashing, auto-tuned FPR
│       ├── pool.rs  # Slab buffer pool, auto-return, zero-on-drop
│       └── pinned.rs  # CUDA pinned memory allocator, page-aligned fallback
│
├── src/flowstate/  # Python package: 7,800 lines
│   ├── prism/  # Query, alignment, data feeding
│   │   ├── alignment.py  # TemporalAligner, AlignmentSpec, Rust/Python dual backend
│   │   ├── streaming.py  # StreamingAligner with watermark emission
│   │   ├── replay.py  # Replay engine with 3-level partition pruning
│   │   ├── gpu_direct.py  # kvikio GDS reads, CUDA stream H2D transfers
│   │   ├── pinned_buffer.py  # CUDA pinned memory pool with CPU fallback
│   │   ├── prefetcher.py  # Double-buffered async prefetch pipeline
│   │   ├── dataloader.py  # PyTorch IterableDataset, JAX iterator
│   │   ├── distributed.py  # Multi-rank replay with NCCL barrier sync
│   │   └── shard.py  # File-level sharding strategies
│   ├── store/  # Temporal feature store
│   │   ├── catalog.py  # Versioned feature definitions, dependency DAG
│   │   ├── materializer.py  # Alignment-based materialization to Arrow IPC
│   │   └── server.py  # Feature serving with symbol filtering
│   ├── storage/  # Partitioned storage, caching, cloud
│   │   ├── partitioning.py  # Hive partitioning with xxhash bucketing
│   │   ├── writer.py  # Partitioned Parquet writer (zstd)
│   │   ├── cache.py  # NVMe LRU cache tier
│   │   └── object_store.py  # fsspec backends (S3, GCS, Azure)
│   ├── schema/  # Arrow schemas, validation, normalization
│   └── features/  # Microstructure feature library
│
├── orderbook/  # C++ limit order book: header-only, 25 Catch2 tests
│   └── include/orderbook/
│       ├── types.h  # Integer-tick prices, order/fill structs
│       ├── price_level.h  # FIFO queue per price (std::deque, not std::list)
│       └── order_book.h  # Array-indexed levels, O(1) BBO, FIFO matching
│
├── tests/  # 636 Python tests, 8,100 lines
├── benchmarks/  # Full-stack benchmark suite
├── .github/workflows/ci.yml  # CI: Python 3.11–3.13, Rust, C++, Criterion, integration
└── DESIGN.md  # System architecture and design decisions

Testing

# Run every command from the repository root; the subshells keep the working directory unchanged.
python -m pytest tests/ -v  # 636 Python tests
(cd flowstate-core && cargo test --no-default-features)  # 132 Rust tests (121 unit + 11 proptest)
(cd flowstate-core && cargo bench --no-default-features)  # Criterion benchmarks
python benchmarks/bench_full_suite.py  # Full-stack Python benchmarks

Test coverage includes:

  • Correctness: 11 proptest property-based tests verify Rust kernels against reference implementations across random inputs
  • Integration: 14 end-to-end tests validate the full pipeline (replay → align → materialize → serve)
  • Point-in-time: Dedicated tests verify no look-ahead bias in backward joins and correct look-ahead in forward joins
  • Streaming parity: Tests verify streaming alignment produces identical results to batch alignment
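
The correctness and point-in-time checks listed above follow a common pattern: compare a fast kernel against a slow, obviously correct oracle on random inputs, then assert the no-look-ahead invariant directly. The sketch below mimics that pattern with Python's `random` module as a stand-in for proptest. The function names are hypothetical and the code is not taken from the project's test suite.

```python
import random


def reference_backward(left_ts, right_ts):
    """Brute-force O(n*m) oracle: index of the latest right row with ts <= t."""
    out = []
    for t in left_ts:
        best = -1
        for j, r in enumerate(right_ts):
            if r <= t:
                best = j  # right_ts is sorted, so the last hit is the latest
        out.append(best)
    return out


def merge_scan_backward(left_ts, right_ts):
    """O(n + m) scan over two sorted inputs; the implementation under test."""
    out, j, m = [], -1, len(right_ts)
    for t in left_ts:
        while j + 1 < m and right_ts[j + 1] <= t:
            j += 1
        out.append(j)
    return out


def check_random_cases(trials=200, seed=0):
    rng = random.Random(seed)  # fixed seed keeps any failure reproducible
    for _ in range(trials):
        left = sorted(rng.randrange(1_000) for _ in range(rng.randrange(30)))
        right = sorted(rng.randrange(1_000) for _ in range(rng.randrange(30)))
        got = merge_scan_backward(left, right)
        assert got == reference_backward(left, right)
        # Point-in-time invariant: a match never comes from the future.
        assert all(j == -1 or right[j] <= t for t, j in zip(left, got))
    return trials


checked = check_random_cases()
```

The small value range forces duplicate and tied timestamps, which is where off-by-one errors in `<=` versus `<` comparisons tend to surface.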

Quick Start

git clone https://github.com/RyanJHamby/flowstate.git && cd flowstate
pip install -e ".[dev]"
# Build the Rust core (requires Rust toolchain + maturin)
cd flowstate-core && maturin develop --release && cd ..
# Optional: GPU support (kvikio + cupy)
pip install -e ".[gpu]"
# Verify
python -m pytest tests/ -v

The Rust kernel is a transparent accelerator. If flowstate_core is not installed, all operations fall back to a pure Python implementation using NumPy and bisect — same API, same correctness guarantees, lower throughput.
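
A fallback of this kind is usually built on an optional import. The following is a minimal sketch of that pattern, not the project's actual dispatch code: `load_optional`, `asof_backward_py`, and `BACKEND` are hypothetical names, and only the pure-Python path is exercised here.

```python
import importlib
from bisect import bisect_right


def load_optional(module_name):
    """Return the imported module, or None when it is not installed."""
    try:
        return importlib.import_module(module_name)
    except ImportError:
        return None


def asof_backward_py(left_ts, right_ts):
    """Pure-Python backward match: index of the last right_ts <= t, or -1."""
    return [bisect_right(right_ts, t) - 1 for t in left_ts]


# The compiled extension is optional; its absence only changes throughput.
_core = load_optional("flowstate_core")
BACKEND = "rust" if _core is not None else "python"

indices = asof_backward_py([5, 10, 25], [1, 10, 20])
```

A real wrapper would branch on `BACKEND` and hand Arrow tables to `flowstate_core.asof_join` when the extension is present, keeping the public signature identical on both paths.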

License

Apache License 2.0 — see LICENSE for details.
