Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

stackql-labs/stackql-agentflow

Repository files navigation

stackql-agentflow

not (yet another) python glue framework

A Rust-native multi-agent orchestration framework - pipelines in YAML,
agents powered by Claude, state machines, parallel dispatch, and built-in observability.


A lightweight, opinionated multi-agent orchestration framework written in Rust. Pipelines are defined in YAML. Agents are powered by Claude. State machines are explicit and type-safe. Observability is built in.

Built by StackQL Studios as the engine behind internal AI automation tooling. Designed to be embeddable as a library in any Rust application that needs structured, auditable, multi-agent workflows.


Table of Contents


Goals

Correctness over convenience. State transitions are explicit Rust types. An invalid transition is a compile-time or runtime error, not a silent bug buried in a callback chain.

Pipelines as config, behaviour as code. The shape of a pipeline - which agents run, in what order, with what retry policy, dispatching to what downstream agents - lives in a YAML file. The things that make your pipeline unique (custom tools, external integrations) live in Rust code that you own.

Built-in observability. Every pipeline run emits structured events to a broadcast hub. A local web UI with a live agent graph starts automatically. Log sinks (file, console, webhook) are first-class citizens, not an afterthought.

No spaghetti. The Rust ecosystem does not need another Python wrapper around LLM APIs. stackql-agentflow is a Rust-native library with no dependency on LangChain, LangGraph, or any other orchestration framework.

Parallel by default where it makes sense. Fan-out dispatch (one upstream result spawning N parallel downstream agents) and broadcast dispatch (one result sent to N different agents in parallel) are built into the dispatcher using tokio::task::JoinSet.

Auditable. Every agent invocation, state transition, tool call, QA feedback loop, and retry is recorded as a structured PipelineEvent. Events are serialisable to JSONL for long-term audit retention.


What it is not

  • A Python library
  • A general-purpose LLM API client
  • A replacement for task queues or workflow engines like Temporal for long-running jobs
  • Production-ready v1 software - this is an active early-stage project

Architecture overview

Core modules

graph TD
 subgraph stackql-agentflow
 CONFIG["config\nPipeline, Agent, Tool, State\ndeserialized from YAML"]
 STATE["state\nStateMachine, transitions\nvalidated at runtime"]
 AGENTS["agents\nAgent trait, ClaudeAgentRunner\nAgentContext, AgentOutput"]
 ORCH["orchestrator\nDispatcher fan-out/broadcast\nAggregator all-pass gate"]
 CLAUDE["claude\nClaudeClient HTTP\nMessages"]
 TOOLS["tools\nTool trait\nbuilt-in: FilesystemTool"]
 RETRY["retry\nRetryGovernor\nper-agent attempt tracking"]
 OBS["observability\nEventHub broadcast\nLogSink trait + built-ins\naxum web server + UI"]
 end
 CONFIG --> STATE
 CONFIG --> AGENTS
 CONFIG --> ORCH
 AGENTS --> CLAUDE
 AGENTS --> TOOLS
 ORCH --> AGENTS
 ORCH --> RETRY
 ORCH --> STATE
 AGENTS --> OBS
 ORCH --> OBS
 STATE --> OBS
Loading

Pipeline lifecycle

stateDiagram-v2
 [*] --> Initialised : from_yaml
 Initialised --> Running : first agent starts
 Running --> WorkDispatched : fan out work items
 WorkDispatched --> InProgress : first item starts
 InProgress --> QAInProgress : all items generated
 QAInProgress --> QAPassed : all items pass all gates
 QAInProgress --> QAInProgress : feedback loop retry
 QAPassed --> Aggregating : aggregator runs
 Aggregating --> Complete : all gates passed
 QAInProgress --> Failed : max retries exceeded
 Aggregating --> Failed : any item aborted
 Complete --> [*]
 Failed --> [*]
Loading

Dispatch strategies

The dispatcher supports three strategies declared in the pipeline YAML.

graph LR
 subgraph parallel_fan_out
 direction TB
 OA[orchestrator_agent]
 W1[worker_agent\nitem_1]
 W2[worker_agent\nitem_2]
 W3[worker_agent\nitem_3]
 OA -->|item 1| W1
 OA -->|item 2| W2
 OA -->|item 3| W3
 end
Loading
graph LR
 subgraph parallel_broadcast
 direction TB
 WA[worker_agent]
 QA1[qa_agent_a]
 QA2[qa_agent_b]
 QA3[qa_agent_c]
 WA --> QA1
 WA --> QA2
 WA --> QA3
 end
Loading
graph LR
 subgraph sequential
 direction LR
 W[producer_agent] --> R[consumer_agent]
 end
Loading

QA feedback loop

QA agents return structured QAIssue objects. The retry governor tracks attempts per agent per work item. If issues are blocking and attempts remain, the dispatcher injects feedback into the next AgentContext and re-dispatches the upstream agent.

sequenceDiagram
 participant D as Dispatcher
 participant W as worker_agent
 participant QA as qa_agent
 participant G as RetryGovernor
 D->>W: run(ctx, attempt=1)
 W-->>D: AgentOutput { passed: true }
 D->>QA: run(ctx, attempt=1)
 QA-->>D: AgentOutput { passed: false, issues: [...] }
 D->>G: record_attempt()
 G-->>D: can_retry = true
 D->>W: run(ctx with feedback, attempt=2)
 W-->>D: AgentOutput { passed: true }
 D->>QA: run(ctx, attempt=2)
 QA-->>D: AgentOutput { passed: true }
 D->>G: record_attempt()
 Note over D,G: item complete
Loading

Observability pipeline

graph LR
 subgraph Runtime
 AGENTS[Agents]
 ORCH[Orchestrator]
 STATE[State Machine]
 end
 subgraph EventHub
 BC[broadcast::channel]
 end
 subgraph Sinks
 CON[ConsoleSink\nstdout JSON]
 FILE[FileSink\nJSONL audit log]
 HOOK[WebhookSink\nHTTP POST]
 CUSTOM[YourSink\nimpl LogSink]
 end
 subgraph UI
 WS[WebSocket /ws]
 GRAPH[live agent graph\ncytoscape.js]
 LOG[event log\nitem filter]
 end
 AGENTS -->|emit| BC
 ORCH -->|emit| BC
 STATE -->|emit| BC
 BC --> CON
 BC --> FILE
 BC --> HOOK
 BC --> CUSTOM
 BC --> WS
 WS --> GRAPH
 WS --> LOG
Loading

The YAML DSL

A complete pipeline is defined in a single YAML file. Here is the minimal shape:

name: my-pipeline
version: "0.1.0"
defaults:
 model: claude-sonnet-4-6
 max_tokens: 1024
 retry:
 max_attempts: 3
 backoff_ms: 1000
tools:
 - id: filesystem
 type: builtin
 - id: my_tool
 type: plugin
 plugin: "my_crate::tools::MyTool"
 config:
 api_key_env: MY_API_KEY
state_machine:
 initial: initialised
 terminal:
 - complete
 - failed
 states:
 - id: initialised
 - id: running
 - id: complete
 - id: failed
agents:
 - id: producer_agent
 prompt: prompts/producer_agent.md
 tools:
 - filesystem
 transitions:
 on_start: running
 on_complete: running
 dispatch:
 strategy: sequential
 target: reviewer_agent
 - id: reviewer_agent
 prompt: prompts/reviewer_agent.md
 tools: []
 retry:
 max_attempts: 2
 on_fail:
 action: feedback_and_retry
 target: producer_agent
 feedback_path: "$.issues"
 transitions:
 on_pass: complete
 on_fail: running
 on_abort: failed
aggregation:
 strategy: all_pass
 gates:
 - reviewer_agent
 on_complete:
 transition: complete
 on_any_abort:
 transition: failed

Dispatch strategies: sequential, parallel_fan_out, parallel_broadcast

Retry actions: feedback_and_retry (re-dispatch upstream with issues injected), abort

Aggregation strategies: all_pass, any_pass, threshold (with min_pass)

Tool types: builtin (provided by the framework), plugin (implemented by you)


Hello world

The hello-world demo runs a two-agent pipeline: a writer and a reviewer. The writer produces a short explanation of any topic you give it. The reviewer checks quality and either passes it or sends structured feedback back for a rewrite. It exercises the full framework: agent dispatch, the feedback/retry loop, tool calls, state transitions, and the observability UI - with no external dependencies beyond an Anthropic API key.

Prerequisites

  • Rust 1.75+
  • An Anthropic API key

Run the demo

git clone https://github.com/stackql-labs/stackql-agentflow
cd stackql-agentflow
cp .env.example .env
# edit .env and set ANTHROPIC_API_KEY
cargo run -p hello-world
# or pass a topic:
cargo run -p hello-world -- "how Rust's ownership model prevents memory bugs"

The observability server starts automatically on port 4000.

observability UI -> http://localhost:4000
audit log -> hello-world-run.jsonl
ctrl+c to exit

What you will see

The graph initialises with two nodes (writer_agent, reviewer_agent) in a pending state. As the pipeline runs:

  1. writer_agent turns blue (running), produces its output, turns green (passed)
  2. reviewer_agent picks it up - on the first attempt it finds issues, turns red, and the feedback edge pulses orange back to writer_agent
  3. writer_agent reruns with the reviewer's feedback injected into its context
  4. reviewer_agent reviews the revised output and passes - both nodes green
  5. Every state transition, tool call, and retry streams into the event log panel

The full event sequence is also written to hello-world-run.jsonl as JSONL.


Extending the framework

Plugin tools

Implement the Tool trait and register it on the pipeline before calling run().

use agentflow::{tools::traits::Tool, AgentFlowError};
use async_trait::async_trait;
use serde_json::Value;
pub struct MyApiTool {
 api_key: String,
}
#[async_trait]
impl Tool for MyApiTool {
 fn id(&self) -> &str { "my_api_tool" }
 async fn execute(&self, input: Value) -> Result<Value, AgentFlowError> {
 // call your API, return structured JSON
 Ok(serde_json::json!({ "result": "..." }))
 }
}
// register before run:
pipeline.register_tool("my_api_tool", MyApiTool { api_key: "...".into() });

Declare the tool in your pipeline YAML:

tools:
 - id: my_api_tool
 type: plugin

Log sinks

Implement LogSink to forward events to any external system.

use agentflow::{LogSink, observability::event::PipelineEvent, AgentFlowError};
use async_trait::async_trait;
pub struct MyRemoteSink {
 endpoint: String,
}
#[async_trait]
impl LogSink for MyRemoteSink {
 async fn emit(&self, event: &PipelineEvent) -> Result<(), AgentFlowError> {
 // PipelineEvent is fully serde-serialisable - forward wherever you need
 Ok(())
 }
}
// register before run:
pipeline.register_sink(Box::new(MyRemoteSink { endpoint: "...".into() }));

Built-in sinks available out of the box:

Sink Description
ConsoleSink Prints events as JSON to stdout
FileSink Appends events as JSONL to a file
WebhookSink POSTs each event to an HTTP endpoint

Project structure

stackql-agentflow/
- src/
 - lib.rs public API surface
 - agents/
 - traits.rs Agent trait, AgentContext, AgentOutput, QAIssue
 - runner.rs ClaudeAgentRunner - default YAML-driven agent impl
 - context.rs AgentContext builder helpers
 - claude/
 - client.rs ClaudeClient - reqwest HTTP to Anthropic API
 - message.rs Message, Role types
 - config/
 - pipeline.rs PipelineConfig, Pipeline runtime struct
 - agent.rs AgentConfig, DispatchConfig, RetryConfig
 - tool.rs ToolConfig
 - state.rs StateMachineConfig
 - error/
 - mod.rs AgentFlowError enum
 - observability/
 - event.rs PipelineEvent, EventPayload enum
 - hub.rs EventHub - broadcast channel + sink fan-out
 - sink.rs LogSink trait, ConsoleSink, FileSink, WebhookSink
 - server.rs axum server - UI, /ws WebSocket, /api/config
 - orchestrator/
 - dispatcher.rs fan-out, broadcast, sequential + retry logic
 - aggregator.rs all-pass gate evaluation
 - retry/
 - governor.rs RetryGovernor - attempt tracking + backoff
 - state/
 - machine.rs StateMachine runtime
 - transitions.rs transition helpers
 - tools/
 - traits.rs Tool trait
 - filesystem.rs built-in FilesystemTool
- static/
 - index.html self-contained observability UI
- hello-world/ minimal demo crate

Roadmap

  • Pipeline::run() - wire dispatcher, state machine, and aggregator into the full execution loop
  • hello-world - replace simulation with a real Claude-powered run
  • Streaming responses - stream Claude output token-by-token to the UI
  • Persistence - optional SQLite backend for run history and replay
  • Multi-run dashboard - track multiple pipeline runs in the UI simultaneously
  • Crates.io publish - once the API stabilises

About

Rust-native multi-agent pipelines powered by Claude.

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

Contributors

AltStyle によって変換されたページ (->オリジナル) /