Pipelines work when the steps are ordered and each step refines the previous output. Research then summarize then translate. Draft then review then polish.
from dataclasses import dataclass

from agent_decision_log import DecisionLog
from token_budget_py import BudgetPool

# research_agent, draft_agent, and review_agent are assumed to be defined elsewhere.
log = DecisionLog(path="pipeline_decisions.jsonl")


@dataclass
class PipelineResult:
    stage: str
    output: str
    tokens_used: int


async def run_pipeline(raw_input: str, budget_usd: float = 0.10) -> list[PipelineResult]:
    pool = BudgetPool(usd=budget_usd)
    results = []

    # Stage 1: research
    async with pool.acquire(max_usd=0.04) as stage_budget:
        research_out = await research_agent.run(
            raw_input,
            budget=stage_budget,
        )
    results.append(PipelineResult("research", research_out.text, research_out.tokens))
    log.record(decision="stage_complete", stage="research", tokens=research_out.tokens)

    # Stage 2: draft (uses research output)
    async with pool.acquire(max_usd=0.04) as stage_budget:
        draft_out = await draft_agent.run(
            f"Research notes:\n{research_out.text}\n\nWrite a report.",
            budget=stage_budget,
        )
    results.append(PipelineResult("draft", draft_out.text, draft_out.tokens))
    log.record(decision="stage_complete", stage="draft", tokens=draft_out.tokens)

    # Stage 3: review (uses draft output)
    async with pool.acquire(max_usd=0.02) as stage_budget:
        review_out = await review_agent.run(
            draft_out.text,
            budget=stage_budget,
        )
    results.append(PipelineResult("review", review_out.text, review_out.tokens))
    # Log the final stage too, so every stage leaves a record
    log.record(decision="stage_complete", stage="review", tokens=review_out.tokens)

    return results
The budget pool (token-budget-py) partitions spend across stages. If research burns through its allocation, the draft stage still has budget. If the total pool is exhausted, the pipeline halts cleanly rather than silently overspending.
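To make the two limits concrete, here is a minimal sketch of a shared pool with per-stage caps. It is not the token-budget-py implementation: `SketchBudgetPool`, `StageBudget`, `BudgetExhausted`, and the `charge` method are hypothetical names invented for illustration, and the real library may enforce limits differently.

```python
class BudgetExhausted(RuntimeError):
    """Raised when a charge would exceed the stage cap or the shared pool."""


class SketchBudgetPool:
    """Hypothetical stand-in for illustration; NOT the token-budget-py API."""

    def __init__(self, usd: float) -> None:
        self.remaining = usd

    def stage(self, max_usd: float) -> "StageBudget":
        return StageBudget(self, max_usd)


class StageBudget:
    """A per-stage cap that draws from the shared pool."""

    def __init__(self, pool: SketchBudgetPool, max_usd: float) -> None:
        self.pool = pool
        self.max_usd = max_usd
        self.spent = 0.0

    def charge(self, usd: float) -> None:
        # Small epsilon so float rounding does not reject an exact fit.
        if self.spent + usd > self.max_usd + 1e-9:
            raise BudgetExhausted("stage cap reached")
        if usd > self.pool.remaining + 1e-9:
            raise BudgetExhausted("shared pool exhausted")
        self.spent += usd
        self.pool.remaining -= usd


pool = SketchBudgetPool(usd=0.10)

research = pool.stage(max_usd=0.04)
research.charge(0.04)              # research uses its whole cap
try:
    research.charge(0.01)          # over the stage cap: rejected
except BudgetExhausted as exc:
    print("research:", exc)

draft = pool.stage(max_usd=0.04)
draft.charge(0.03)                 # draft still has its own allocation
print("pool remaining:", round(pool.remaining, 2))
```

The stage cap keeps one greedy stage from starving the next; the pool check is what bounds the total.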
Pattern 3: Broadcast
Same input sent to N agents. Aggregate results.
Use broadcast when you want multiple independent answers and then synthesize. Useful for research (multiple search strategies), fact checking (multiple verifiers), or generating variations.
import asyncio

from agent_event_bus import EventBus
from agenttrace import Tracer

# research_agent_a/b/c and aggregator_agent are assumed to be defined elsewhere.
bus = EventBus()
tracer = Tracer(export_path="traces/broadcast.jsonl")


async def broadcast_and_aggregate(query: str) -> dict:
    with tracer.span("broadcast") as span:
        # Fire all agents in parallel
        tasks = [
            research_agent_a.run(query),
            research_agent_b.run(query),
            research_agent_c.run(query),
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Filter out exceptions, log them.
        # BaseException also catches CancelledError, which is not an Exception.
        valid = []
        for i, r in enumerate(results):
            if isinstance(r, BaseException):
                span.add_event(f"agent_{i}_failed", error=str(r))
            else:
                valid.append(r)

        if not valid:
            raise RuntimeError("All broadcast agents failed")

        # Synthesize: pass all valid results to an aggregator model
        combined = "\n\n---\n\n".join(r.text for r in valid)
        synthesis = await aggregator_agent.run(
            f"Synthesize these research results:\n\n{combined}"
        )

        span.set_attribute("agents_succeeded", len(valid))
        return {"synthesis": synthesis.text, "source_count": len(valid)}
Broadcast is expensive. N agents means N model calls in parallel, plus one more for the aggregator, whose input grows with every result you feed it. Use it when the quality gain is worth the cost. For most tasks, one good agent with good tools is cheaper and sufficient.
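A quick back-of-envelope check helps before fanning out. The sketch below only does the arithmetic; the function names and the per-call prices are hypothetical placeholders, not measured figures, so substitute your own numbers.

```python
def broadcast_cost_usd(n_agents: int, agent_call_usd: float, aggregator_call_usd: float) -> float:
    """Estimated cost of one broadcast: N agent calls plus one aggregator call."""
    if n_agents < 1:
        raise ValueError("n_agents must be at least 1")
    return n_agents * agent_call_usd + aggregator_call_usd


def fits_budget(n_agents: int, agent_call_usd: float, aggregator_call_usd: float, max_usd: float) -> bool:
    """True if the estimated broadcast cost stays within max_usd."""
    return broadcast_cost_usd(n_agents, agent_call_usd, aggregator_call_usd) <= max_usd


# Hypothetical prices, for illustration only.
single = 0.02
fan_out = broadcast_cost_usd(n_agents=3, agent_call_usd=0.02, aggregator_call_usd=0.01)
print(f"single agent: ${single:.2f}, broadcast of 3: ${fan_out:.2f}")
print("fits a $0.05 budget:", fits_budget(3, 0.02, 0.01, max_usd=0.05))
```

If the estimate does not fit, shrinking N is usually the first lever to pull.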
Pattern 4: Supervisor
One agent reviews another's output and requests revisions.
The supervisor pattern adds a quality gate. The supervisor model reads the worker's output and either approves it or returns a critique. The worker revises. This loops until approved or until a max-iteration limit fires.
from agent_deadline import Deadline
from agent_decision_log import DecisionLog

# worker_agent and supervisor_agent are assumed to be defined elsewhere.
log = DecisionLog(path="supervisor_decisions.jsonl")


async def supervised_run(task: str, max_revisions: int = 3) -> str:
    deadline = Deadline(seconds=120)  # wall-clock limit, not just iteration count
    draft = await worker_agent.run(task)
    iteration = 0
    approved = False

    while not deadline.expired() and iteration < max_revisions:
        review = await supervisor_agent.run(
            f"Task: {task}\n\nSubmission:\n{draft.text}\n\n"
            "Reply with the single word APPROVED, or return a numbered critique."
        )
        # Match only a leading APPROVED so "NOT APPROVED" does not pass the gate
        approved = review.text.strip().upper().startswith("APPROVED")
        log.record(
            decision="supervisor_review",
            iteration=iteration,
            approved=approved,
            critique_length=len(review.text),
        )
        if approved:
            break

        # Worker revises based on critique
        draft = await worker_agent.run(
            f"Original task: {task}\n\nYour previous attempt:\n{draft.text}\n\n"
            f"Supervisor critique:\n{review.text}\n\n"
            "Revise and improve."
        )
        iteration += 1

    # Only report a timeout when the draft was never approved
    if not approved and deadline.expired():
        log.record(decision="supervisor_timeout", iterations_completed=iteration)

    return draft.text
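The deadline caps wall-clock time, independent of the revision count. max_revisions bounds the number of rounds but not how long each round takes, so without a deadline a demanding supervisor and a stubborn worker can keep going until your budget runs out. The loop only checks the deadline at the top of each iteration, so a call already in flight can run past it. Set the deadline shorter than you think you need.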
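If a call in flight must not outlive the deadline, one option is to pass the remaining time to `asyncio.wait_for`, which cancels the awaited call when the timeout fires. The sketch below is an assumption-laden illustration: `SketchDeadline`, `call_with_deadline`, and `slow_agent` are hypothetical names, and this is not the agent-deadline API.

```python
import asyncio
import time


class SketchDeadline:
    """Hypothetical cooperative deadline for illustration; NOT the agent-deadline API."""

    def __init__(self, seconds: float) -> None:
        self._end = time.monotonic() + seconds

    def remaining(self) -> float:
        # Clamp at zero so callers never see a negative timeout.
        return max(0.0, self._end - time.monotonic())

    def expired(self) -> bool:
        return self.remaining() == 0.0


async def call_with_deadline(coro, deadline: SketchDeadline):
    """Await coro, cancelling it if the deadline's remaining time runs out."""
    return await asyncio.wait_for(coro, timeout=deadline.remaining())


async def slow_agent() -> str:
    await asyncio.sleep(1.0)  # stands in for a slow model call
    return "done"


async def demo() -> str:
    deadline = SketchDeadline(seconds=0.05)
    try:
        return await call_with_deadline(slow_agent(), deadline)
    except asyncio.TimeoutError:
        return "timed out"


print(asyncio.run(demo()))
```

The `expired()` check between iterations still earns its place: it stops the loop from starting a round it has no time to finish.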
What Does NOT Work at Multi-Agent Scale
Shared context window. Agents do not share memory by default. If agent A discovers a fact, agent B does not know it unless you pass it explicitly. Context passing is your responsibility, not the framework's.
Independent per-agent budgets. If each agent carries its own separate budget, nothing enforces a cap on total spend, and every agent you add raises the ceiling. Use a shared pool (token-budget-py BudgetPool) and let agents draw capped allocations from it. If the pool is empty, agents block or fail fast.
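Assuming agents finish in order. Parallel agents complete in arbitrary order. Your aggregation step needs to handle partial results and late arrivals. Use asyncio.gather with return_exceptions=True, not bare awaits: it returns results in input order and keeps one failure from discarding the rest. It still waits for the slowest agent, so add a timeout if late arrivals should be dropped.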
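For the third point, one way to handle late arrivals is `asyncio.wait` with a timeout: keep whatever finished, cancel the rest. This is a sketch under stated assumptions; `gather_with_cutoff` and the stub `agent` coroutine are hypothetical names, and the stub stands in for a real agent call.

```python
import asyncio


async def gather_with_cutoff(coros, timeout_s: float):
    """Return (results, errors, late_count) for work finished within timeout_s."""
    tasks = [asyncio.create_task(c) for c in coros]
    if not tasks:
        return [], [], 0  # asyncio.wait rejects an empty collection

    done, pending = await asyncio.wait(tasks, timeout=timeout_s)

    # Late arrivals are cancelled rather than awaited indefinitely.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)  # let cancellations settle

    results, errors = [], []
    for task in tasks:  # submission order, not completion order
        if task not in done or task.cancelled():
            continue
        if task.exception() is not None:
            errors.append(task.exception())
        else:
            results.append(task.result())
    return results, errors, len(pending)


async def agent(name: str, delay: float, fail: bool = False) -> str:
    """Stub agent: sleeps, then returns its name or raises."""
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError(f"{name} failed")
    return name


async def demo():
    return await gather_with_cutoff(
        [agent("a", 0.01), agent("b", 0.02, fail=True), agent("c", 5.0)],
        timeout_s=0.5,
    )


results, errors, late = asyncio.run(demo())
print(results, [str(e) for e in errors], late)
```

Returning the late count alongside the results lets the aggregation step decide whether a partial answer is good enough.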
Quick-Start Snippet
pip install agent-event-bus token-budget-py agent-decision-log agenttrace

import asyncio

from agent_event_bus import EventBus
from token_budget_py import BudgetPool
from agent_decision_log import DecisionLog

bus = EventBus()
pool = BudgetPool(usd=0.25)
log = DecisionLog(path="multi_agent.jsonl")

# Register handlers (summarizer is assumed to be an agent defined elsewhere)
@bus.on("task:summarize")
async def on_summarize(event):
    async with pool.acquire(max_usd=0.05) as budget:
        result = await summarizer.run(event["text"], budget=budget)
    log.record(decision="task_complete", task="summarize", tokens=result.tokens)
    return result.text

# Dispatch: await is only valid inside a coroutine, so wrap it in main()
async def main():
    result = await bus.emit("task:summarize", text="Your input here")
    print(result)

asyncio.run(main())
Related Libraries
| Library | What It Does |
| --- | --- |
| agent-event-bus | In-process pub/sub for routing events between agents |
| token-budget-py | Shared USD/token budget pool for multi-agent spend control |
| agent-deadline | Cooperative per-task deadline enforcement |
| agent-decision-log | Structured JSONL log of agent routing and review decisions |
| agentsnap | Snapshot traces for regression testing multi-agent flows |
| agenttrace | Cost and latency tracing across agent spans |
What's Next
Multi-agent systems fail differently than single-agent systems. When something goes wrong, you need to know which agent failed, what it was given, and what it returned. That is what agenttrace and agent-decision-log are for: structured, queryable records of every routing and review decision in the run.
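As a small illustration of what "queryable" can mean, the sketch below totals tokens per stage from JSONL lines. It assumes each line is one JSON object whose top-level keys mirror the keyword arguments passed to `log.record` earlier (`decision`, `stage`, `tokens`); the actual on-disk format of agent-decision-log may differ, and `tokens_by_stage` is a hypothetical helper.

```python
import json
from collections import defaultdict
from typing import Iterable


def tokens_by_stage(lines: Iterable[str]) -> dict[str, int]:
    """Sum the tokens of stage_complete records, grouped by stage."""
    totals: dict[str, int] = defaultdict(int)
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines
        record = json.loads(line)
        if record.get("decision") == "stage_complete":
            totals[record.get("stage", "unknown")] += int(record.get("tokens", 0))
    return dict(totals)


# Sample records in the ASSUMED format, for illustration only.
sample = [
    '{"decision": "stage_complete", "stage": "research", "tokens": 1200}',
    '{"decision": "stage_complete", "stage": "draft", "tokens": 900}',
    '{"decision": "supervisor_review", "iteration": 0, "approved": false}',
    '{"decision": "stage_complete", "stage": "research", "tokens": 300}',
]
print(tokens_by_stage(sample))
```

Because the function takes any iterable of lines, an open file handle on the log works the same way as the in-memory sample.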
The next gap after observability is error recovery. What happens when the research agent times out halfway through a pipeline? Post 113 covers the three recovery patterns that matter: retry, fallback, and graceful degrade.