Copied to Clipboard
, '.join(tools_available)}
You MUST respond with ONLY valid JSON (no markdown, no preamble):
{{
"thought": "brief reasoning about next step",
"action": "tool_name or 'finish'",
"parameters": {{}},
"confidence": 0.0-1.0,
"justification": "why this action progresses toward goal"
}}
"""
message = self.client.messages.create(
model=self.model,
max_tokens=self.max_tokens,
messages=[{"role": "user", "content": prompt}]
)
response_text = message.content[0].text
# Robust JSON parsing with fallback
try:
action = json.loads(response_text)
except json.JSONDecodeError:
# Try to extract JSON from markdown code blocks
match = re.search(r'```
(?:json)?\s*(.*?)\s*
```', response_text, re.DOTALL)
if match:
action = json.loads(match.group(1))
else:
raise ValueError(f"Failed to parse LLM response: {response_text}")
# Validate schema
self._validate_action_schema(action, tools_available)
return action
def _validate_action_schema(self, action: Dict, available_tools: List[str]):
required_keys = {"thought", "action", "parameters"}
if not required_keys.issubset(action.keys()):
raise ValueError(f"Missing required keys. Got: {action.keys()}")
if action["action"] not in available_tools and action["action"] != "finish":
raise ValueError(f"Unknown tool: {action['action']}")
if not isinstance(action.get("parameters"), dict):
raise ValueError("parameters must be a dict")
2. Execution Engine (Tool/Action Runner)
Responsibilities:
- Execute planned actions safely
- Manage resource limits (timeout, memory)
- Capture output and errors
- Provide structured execution feedback
Implementation:
class ExecutionEngine:
    """Run planned tool calls and report each outcome as an ExecutionResult.

    Attributes:
        tools: Mapping of tool name to the callable implementing it.
        timeout: Default per-call time limit in seconds.
        execution_log: List reserved for execution records; nothing in this
            class appends to it.
    """

    def __init__(self, tools: Dict[str, Callable], timeout: int = 30):
        self.tools = tools
        self.timeout = timeout
        self.execution_log = []

    def execute(self, action: Dict) -> "ExecutionResult":
        """Execute a planned action and wrap the outcome.

        Args:
            action: Planner output; ``action["action"]`` names the tool and
                ``action["parameters"]`` (optional) holds its keyword
                arguments.

        Returns:
            An ExecutionResult whose ``status`` is ``"success"``,
            ``"timeout"`` or ``"error"``. Exceptions raised by the tool are
            captured in the result rather than propagated.
        """
        tool_name = action["action"]
        parameters = action.get("parameters", {})

        # "finish" is the planner's stop signal, not a real tool.
        if tool_name == "finish":
            return ExecutionResult(
                status="success",
                output={"message": "Loop completed successfully"},
                tool=tool_name,
                duration=0
            )

        if tool_name not in self.tools:
            return ExecutionResult(
                status="error",
                error=f"Unknown tool: {tool_name}",
                tool=tool_name,
                duration=0
            )

        tool = self.tools[tool_name]
        start_time = time.time()
        try:
            result = self._execute_with_timeout(tool, parameters)
            return ExecutionResult(
                status="success",
                output=result,
                tool=tool_name,
                duration=time.time() - start_time,
                parameters=parameters
            )
        except TimeoutError:
            return ExecutionResult(
                status="timeout",
                error=f"Tool '{tool_name}' exceeded {self.timeout}s timeout",
                tool=tool_name,
                duration=time.time() - start_time
            )
        except Exception as e:
            # Tool failures are reported to the planner, never raised.
            return ExecutionResult(
                status="error",
                error=str(e),
                error_type=type(e).__name__,
                tool=tool_name,
                duration=time.time() - start_time
            )

    def _execute_with_timeout(self, func: Callable, params: Dict, timeout: int = None):
        """Call ``func(**params)`` in a worker thread with a time limit.

        Args:
            func: Tool callable to invoke.
            params: Keyword arguments passed to ``func``.
            timeout: Limit in seconds; a falsy value falls back to
                ``self.timeout``.

        Returns:
            Whatever ``func`` returns.

        Raises:
            TimeoutError: If ``func`` has not finished within the limit.
            Exception: Any exception raised by ``func`` is re-raised.
        """
        from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout

        timeout = timeout or self.timeout
        # The executor is deliberately NOT used as a context manager: its
        # __exit__ calls shutdown(wait=True), which would block here until
        # the tool finishes and make the timeout meaningless.
        executor = ThreadPoolExecutor(max_workers=1)
        future = executor.submit(func, **params)
        try:
            return future.result(timeout=timeout)
        except FutureTimeout:
            raise TimeoutError(f"Execution exceeded {timeout}s") from None
        finally:
            # Threads cannot be killed; a timed-out tool keeps running in
            # the background, but the caller is released immediately.
            executor.shutdown(wait=False)
3. Verification Engine (Goal & State Validation)
Responsibilities:
- Check if goal is achieved
- Validate state consistency
- Detect loops/infinite recursion
- Assess progress toward objective
Implementation:
class VerificationEngine:
    """Decide after each iteration whether the loop goal has been reached.

    Attributes:
        goal: Text of the objective being verified.
        goal_verifier: Optional callable ``(state, result) -> bool`` holding
            the domain-specific success test; ``None`` until assigned.
    """

    # Exception type names that are not worth retrying. Mirrors the list
    # used by RetryController._is_fatal_error.
    FATAL_ERROR_TYPES = frozenset({
        "KeyError", "ValueError", "AttributeError",   # Code bugs
        "PermissionError", "FileNotFoundError",       # Resource errors
        "SyntaxError", "ImportError",                 # Config errors
    })

    def __init__(self, goal: str):
        self.goal = goal
        self.goal_verifier = None  # Domain-specific implementation

    def verify_goal_met(self, state: Dict, last_result: "ExecutionResult") -> "VerificationResult":
        """Classify the loop's situation after the latest execution.

        Checks run in order: goal success, fatal error, repeated action,
        stalled progress; the first match wins.

        Args:
            state: Loop state mapping; ``state["history"]`` (optional) is a
                list of dicts with ``"action"`` and ``"result"`` entries.
            last_result: Result of the most recent execution.

        Returns:
            VerificationResult with:
            - goal_met: bool
            - confidence: float [0-1]
            - reason: str
            - suggested_action: str ("exit", "abort",
              "retry_different_approach" or "continue")
        """
        # Check for explicit success markers
        if last_result.status == "success":
            if self._check_goal_success_criteria(state, last_result):
                return VerificationResult(
                    goal_met=True,
                    confidence=0.95,
                    reason="Goal success criteria met",
                    suggested_action="exit"
                )

        # Check for fatal errors
        if last_result.status == "error":
            if self._is_fatal_error(last_result):
                return VerificationResult(
                    goal_met=False,
                    confidence=1.0,
                    reason=f"Fatal error: {last_result.error}",
                    suggested_action="abort"
                )

        # Check for infinite loops (same action repeating)
        if self._detect_infinite_loop(state):
            return VerificationResult(
                goal_met=False,
                confidence=0.9,
                reason="Infinite loop detected",
                suggested_action="abort"
            )

        # Progress check - are we making forward progress?
        if self._detect_no_progress(state):
            return VerificationResult(
                goal_met=False,
                confidence=0.7,
                reason="No progress for N iterations",
                suggested_action="retry_different_approach"
            )

        # Default: continue looping
        return VerificationResult(
            goal_met=False,
            confidence=0.5,
            reason="Goal not yet achieved, continuing",
            suggested_action="continue"
        )

    def _is_fatal_error(self, result: "ExecutionResult") -> bool:
        """Return True when the result's ``error_type`` is non-retryable.

        A result without an ``error_type`` attribute is treated as
        non-fatal.
        """
        return getattr(result, "error_type", None) in self.FATAL_ERROR_TYPES

    def _check_goal_success_criteria(self, state: Dict, result: "ExecutionResult") -> bool:
        """Delegate to ``goal_verifier``; False when none is configured.

        Examples of domain checks a verifier might implement:
        - Check if output matches expected format
        - Verify file exists with correct content
        - Validate test suite passes
        """
        if self.goal_verifier:
            return self.goal_verifier(state, result)
        return False

    def _detect_infinite_loop(self, state: Dict) -> bool:
        """Return True when the last three history entries name the same action.

        Only the action name is compared; parameters are ignored.
        """
        history = state.get("history", [])
        if len(history) < 3:
            return False
        first, second, third = (h["action"]["action"] for h in history[-3:])
        return first == second == third

    def _detect_no_progress(self, state: Dict) -> bool:
        """Return True when every recent result changed by less than 10%.

        Needs at least five history entries; the last five yield four
        consecutive deltas.
        """
        history = state.get("history", [])
        if len(history) < 5:
            return False
        recent_deltas = self._compute_state_deltas(history[-5:])
        return all(delta < 0.1 for delta in recent_deltas)

    def _compute_state_deltas(self, history: List) -> List[float]:
        """Return the change magnitude between each consecutive result pair."""
        return [
            self._state_diff_magnitude(
                history[i - 1].get("result", {}),
                history[i].get("result", {}),
            )
            for i in range(1, len(history))
        ]

    def _state_diff_magnitude(self, state1: Dict, state2: Dict) -> float:
        """Return the fraction (0-1) of keys whose values differ.

        A key present in only one mapping counts as different; two empty
        mappings give 0.0.
        """
        all_keys = set(state1.keys()) | set(state2.keys())
        if not all_keys:
            return 0.0
        different = sum(1 for k in all_keys if state1.get(k) != state2.get(k))
        return different / len(all_keys)
STATE MANAGEMENT
1. State Store Architecture
Loops require external, persistent state because LLMs are stateless. State must survive loop iterations and be accessible to the planning engine.
State Layers:
┌─────────────────────────────────────────────────────┐
│ LAYER 1: WORKING STATE │
│ (In-memory, fast access) │
│ • Current iteration number │
│ • Last execution result │
│ • Short-term flags │
│ │
│ Storage: Redis, in-memory dict │
│ TTL: Duration of loop execution │
└─────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────┐
│ LAYER 2: PERSISTENT STATE │
│ (Database, durable) │
│ • Full execution history │
│ • Artifacts (generated files) │
│ • Metrics & performance data │
│ │
│ Storage: PostgreSQL, S3, Git │
│ TTL: Indefinite │
└─────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────┐
│ LAYER 3: CONTEXT MEMORY │
│ (Long-form reference for LLM) │
│ • Summaries of previous iterations │
│ • Relevant code/docs │
│ • Error patterns & solutions │
│ │
│ Storage: Vector DB, File System │
│ TTL: Loop-scoped or permanent │
└─────────────────────────────────────────────────────┘
2. State Schema
@dataclass
class LoopState:
    """Complete state snapshot for a loop execution."""

    # Metadata
    loop_id: str                        # UUID
    goal: str                           # Loop objective
    created_at: float                   # Unix timestamp
    # Execution State
    iterations: int                     # Current iteration count
    max_iterations: int                 # Termination limit
    status: str                         # "running", "success", "failed", "timeout"
    # History
    # Quoted: IterationRecord is declared after this class, and dataclass
    # field annotations are evaluated when the class body runs.
    history: List["IterationRecord"]    # All iteration results
    # Artifacts
    artifacts: Dict[str, str]           # {"file_path": "content", ...}
    generated_files: List[str]          # Files created during loop
    # Metrics
    total_tokens_used: int              # Sum of all LLM tokens
    total_cost: float                   # USD cost estimate
    wall_clock_duration: float          # Seconds
    # Context
    context_window: str                 # Serialized state for next LLM call
    error_log: List[str]                # Error messages for recovery

    def serialize(self) -> str:
        """Serialize to JSON for storage.

        Nested dataclasses are flattened to dicts by ``asdict``; values JSON
        cannot encode are stringified via ``default=str``.
        """
        return json.dumps(asdict(self), default=str)

    @classmethod
    def deserialize(cls, data: str) -> 'LoopState':
        """Load from JSON produced by ``serialize``.

        History entries come back as plain dicts, not IterationRecord
        instances; ``_serialize_recent_history`` accepts both shapes.
        """
        return cls(**json.loads(data))

    def to_context_string(self) -> str:
        """Render the state as text for LLM prompt injection.

        Includes counters, the recent history summary, generated file
        names and the last five error messages.
        """
        lines = [
            "",
            "EXECUTION STATE:",
            f"- Iteration: {self.iterations}/{self.max_iterations}",
            f"- Status: {self.status}",
            f"- Tokens Used: {self.total_tokens_used}",
            f"- Cost: ${self.total_cost:.4f}",
            f"- Duration: {self.wall_clock_duration:.1f}s",
            "RECENT HISTORY:",
            self._serialize_recent_history(),
            "ARTIFACTS CREATED:",
            json.dumps(self.generated_files, indent=2),
            "ERROR LOG:",
            # Last 5 errors. This note used to sit inside the template
            # string and was emitted into the prompt as literal text.
            json.dumps(self.error_log[-5:], indent=2),
            "",
        ]
        return "\n".join(lines)

    @staticmethod
    def _read(source, key, default="unknown"):
        """Fetch ``key`` from a dict or an attribute-style object."""
        if isinstance(source, dict):
            return source.get(key, default)
        return getattr(source, key, default)

    def _serialize_recent_history(self, limit: int = 5) -> str:
        """Return one ``[iteration] action → status`` line per recent record.

        Records may be IterationRecord-like objects or the plain dicts that
        ``deserialize`` produces; missing pieces render as ``unknown``.
        """
        read = self._read
        return "\n".join(
            " [{}] {} → {}".format(
                read(record, "iteration"),
                read(read(record, "action", {}), "action"),
                read(read(record, "result", {}), "status"),
            )
            for record in self.history[-limit:]
        )
@dataclass
class IterationRecord:
    """What happened during one pass of the loop: the action and its outcome."""

    # Position of this pass within the loop's history.
    iteration: int
    # Unix timestamp; StateStore converts it with datetime.fromtimestamp.
    timestamp: float
    # Planner output, shaped {"action": "...", "parameters": {...}}.
    action: Dict
    # Outcome reported by the execution engine for this action.
    result: ExecutionResult
    # Token count attributed to this pass.
    tokens_used: int
    # Cost attributed to this pass (presumably USD, like LoopState.total_cost).
    cost: float
    # Time spent on this pass (presumably seconds — TODO confirm).
    duration: float
3. State Persistence
class StateStore:
    """Unified state persistence layer.

    Attributes:
        backend: Name of the selected backend.
        db: Backend object; this class uses its ``insert``, ``query_one``
            and ``query`` methods.
    """

    def __init__(self, backend: str = "postgres"):
        """Select the storage backend.

        Raises:
            ValueError: If ``backend`` is not "postgres", "redis" or
                "sqlite". Previously an unknown name left ``self.db``
                unset and failed later with AttributeError.
        """
        self.backend = backend
        if backend == "postgres":
            self.db = PostgresBackend()
        elif backend == "redis":
            self.db = RedisBackend()
        elif backend == "sqlite":
            self.db = SQLiteBackend()
        else:
            raise ValueError(
                f"Unsupported backend: {backend!r} "
                "(expected 'postgres', 'redis' or 'sqlite')"
            )

    def save_state(self, state: "LoopState") -> None:
        """Persist a state snapshot; intended to run after each iteration."""
        self.db.insert("loop_states", {
            "loop_id": state.loop_id,
            "goal": state.goal,
            "iteration": state.iterations,
            "status": state.status,
            "state_json": state.serialize(),
            "updated_at": datetime.utcnow().isoformat(),
            "tokens_used": state.total_tokens_used,
            "cost": state.total_cost
        })

    def load_state(self, loop_id: str) -> "LoopState":
        """Restore the most recently saved state for ``loop_id``.

        Raises:
            KeyError: If no snapshot exists for ``loop_id``.
        """
        record = self.db.query_one(
            "SELECT state_json FROM loop_states WHERE loop_id = %s ORDER BY updated_at DESC",
            (loop_id,)
        )
        if record is None:
            raise KeyError(f"No saved state for loop_id {loop_id!r}")
        return LoopState.deserialize(record["state_json"])

    def save_iteration(self, loop_id: str, record: "IterationRecord") -> None:
        """Append one iteration record; action and result are stored as JSON."""
        self.db.insert("iteration_records", {
            "loop_id": loop_id,
            "iteration_num": record.iteration,
            "action": json.dumps(record.action),
            "result": json.dumps(asdict(record.result)),
            "tokens_used": record.tokens_used,
            "cost": record.cost,
            "duration": record.duration,
            "created_at": datetime.fromtimestamp(record.timestamp).isoformat()
        })

    def get_history(self, loop_id: str, limit: int = 20) -> List["IterationRecord"]:
        """Fetch up to ``limit`` iteration records, newest first."""
        rows = self.db.query(
            """SELECT * FROM iteration_records
WHERE loop_id = %s
ORDER BY iteration_num DESC
LIMIT %s""",
            (loop_id, limit)
        )
        return [self._row_to_record(row) for row in rows]

    @staticmethod
    def _row_to_record(row) -> "IterationRecord":
        """Invert the column mapping applied by ``save_iteration``.

        Rows carry ``loop_id``, ``iteration_num`` and ``created_at`` plus
        JSON-encoded ``action``/``result``, so they cannot be splatted
        straight into IterationRecord.
        """
        action = row["action"]
        if isinstance(action, str):
            action = json.loads(action)

        result = row["result"]
        if isinstance(result, str):
            result = json.loads(result)
        if isinstance(result, dict):
            # save_iteration stored asdict(result); rebuild the dataclass.
            # NOTE(review): assumes ExecutionResult has no nested dataclass
            # fields — confirm against its definition.
            result = ExecutionResult(**result)

        created_at = row["created_at"]
        # Drivers may return either the ISO string or a datetime object.
        if isinstance(created_at, str):
            created_at = datetime.fromisoformat(created_at)

        return IterationRecord(
            iteration=row["iteration_num"],
            timestamp=created_at.timestamp(),
            action=action,
            result=result,
            tokens_used=row["tokens_used"],
            cost=row["cost"],
            duration=row["duration"],
        )
CONTROL FLOW & TERMINATION LOGIC
1. Termination Conditions
A loop must have multiple, explicit exit criteria:
class TerminationController:
    """Evaluate the loop's exit criteria in a fixed order.

    Attributes:
        max_iterations: Iteration count at which the loop stops.
        timeout: Wall-clock budget for the whole loop, in seconds.
        max_cost: Spend ceiling in USD.
        iteration_timeout: Per-iteration limit in seconds (fixed at 30).
    """

    def __init__(self,
                 max_iterations: int = 20,
                 timeout_seconds: int = 300,
                 max_cost_usd: float = 10.0):
        self.max_iterations = max_iterations
        self.timeout = timeout_seconds
        self.max_cost = max_cost_usd
        self.iteration_timeout = 30  # Per-iteration limit

    def should_continue(self, state: LoopState) -> Tuple[bool, str]:
        """Report whether another iteration may run.

        Returns:
            ``(True, "Continue looping")`` when no exit criterion applies;
            otherwise ``(False, reason)`` for the first criterion that
            matched, checked in this order: success, iteration cap, total
            timeout, cost cap, fatal status, unrecoverable pattern.
        """
        stop_reason = None
        if state.status == "success":
            stop_reason = "Goal achieved"
        elif state.iterations >= self.max_iterations:
            stop_reason = f"Max iterations ({self.max_iterations}) reached"
        elif state.wall_clock_duration > self.timeout:
            stop_reason = f"Total timeout ({self.timeout}s) exceeded"
        elif state.total_cost >= self.max_cost:
            stop_reason = f"Cost limit (${self.max_cost}) exceeded"
        elif state.status == "fatal_error":
            stop_reason = "Fatal error encountered"
        elif self._is_unrecoverable(state):
            stop_reason = "Unrecoverable state detected"

        if stop_reason is None:
            return (True, "Continue looping")
        return (False, stop_reason)

    def _is_unrecoverable(self, state: LoopState) -> bool:
        """Detect the two stuck patterns this controller knows about.

        True when the last three logged errors are identical, or when the
        last four actions alternate (1st == 3rd and 2nd == 4th).
        """
        tail_errors = state.error_log[-3:]
        if len(tail_errors) == 3:
            head = tail_errors[0]
            if tail_errors[1] == head and tail_errors[2] == head:
                return True

        tail_records = state.history[-4:]
        if len(tail_records) == 4:
            a, b, c, d = (rec.action["action"] for rec in tail_records)
            if a == c and b == d:
                return True

        return False
2. Retry & Backoff Strategy
class RetryController:
    """Retry policy with exponential backoff and optional jitter.

    Attributes:
        max_retries: Retries allowed per action name.
        backoff_base: Base of the exponential delay.
        jitter: Whether delays are randomized.
        retry_counts: ``{action_id: count}``; updated by ``record_retry``.
    """

    def __init__(self,
                 max_retries: int = 3,
                 backoff_base: float = 2.0,
                 jitter: bool = True):
        self.max_retries = max_retries
        self.backoff_base = backoff_base
        self.jitter = jitter
        self.retry_counts = {}  # {action_id: count}

    def should_retry(self, state: "LoopState", last_result: "ExecutionResult") -> bool:
        """Decide whether the most recent action should be attempted again.

        Returns False when the goal is already achieved, the result is not
        a timeout/error, there is no recorded action to retry, the retry
        limit for that action is used up, or the error is fatal.
        """
        # Never retry if goal is achieved
        if state.status == "success":
            return False

        # Only retry on recoverable errors
        if last_result.status not in ["timeout", "error"]:
            return False

        # Nothing has run yet, so there is nothing to retry (previously an
        # empty history raised IndexError here).
        if not state.history:
            return False

        # Check retry limit
        action_id = state.history[-1].action.get("action")
        if self.retry_counts.get(action_id, 0) >= self.max_retries:
            return False

        # Don't retry fatal errors
        return not self._is_fatal_error(last_result)

    def record_retry(self, action_id) -> int:
        """Count one retry for ``action_id`` and return the new total.

        ``should_retry`` only reads ``retry_counts``; call this each time a
        retry is actually issued, otherwise ``max_retries`` never takes
        effect.
        """
        self.retry_counts[action_id] = self.retry_counts.get(action_id, 0) + 1
        return self.retry_counts[action_id]

    def get_backoff_delay(self, retry_count: int) -> float:
        """Return ``backoff_base ** retry_count``, scaled by 50-150% if jitter is on."""
        delay = self.backoff_base ** retry_count
        if self.jitter:
            delay *= (0.5 + random.random())  # 50-150% of computed delay
        return delay

    def _is_fatal_error(self, result: "ExecutionResult") -> bool:
        """Identify errors that shouldn't be retried, by exception type name."""
        fatal_errors = [
            "KeyError", "ValueError", "AttributeError",  # Code bugs
            "PermissionError", "FileNotFoundError",      # Resource errors
            "SyntaxError", "ImportError"                 # Config errors
        ]
        # Results built without an error_type (e.g. timeouts) are not fatal.
        return getattr(result, "error_type", None) in fatal_errors
ERROR HANDLING & RECOVERY
1. Error Classification
class ErrorClassification:
    """Categorize errors for appropriate handling.

    Each table maps an error key to a human-readable description. Keys are
    matched against the exception's message and class name by ``classify``.
    """

    TRANSIENT = {
        "timeout": "Exceeded execution time limit",
        "rate_limit": "API rate limit exceeded",
        "network": "Network connectivity issue",
        "memory": "Temporary memory pressure"
    }
    RECOVERABLE = {
        "invalid_tool": "Tool doesn't exist, try different tool",
        "malformed_params": "Parameter format incorrect, fix and retry",
        "missing_dependency": "Required file/tool missing, create it first"
    }
    FATAL = {
        "permission_denied": "Insufficient permissions, cannot proceed",
        "syntax_error": "Code has syntax errors, manual fix required",
        "resource_exhausted": "Cannot allocate required resources",
        "auth_failed": "Authentication failure, credentials invalid"
    }

    @classmethod
    def classify(cls, error: Exception, context: Dict) -> Tuple[str, str]:
        """Classify an error by matching table keys against it.

        Tables are searched in the order transient, recoverable, fatal;
        the first matching key wins.

        Args:
            error: The exception to classify.
            context: Currently unused.

        Returns:
            ``(category, error_key)``. Unmatched errors fall back to
            ``("recoverable", "unknown_error")`` when the message contains
            "error", otherwise ``("fatal", "unclassified")``.
        """
        error_name = type(error).__name__.lower()
        error_msg = str(error).lower()

        tables = (
            ("transient", cls.TRANSIENT),
            ("recoverable", cls.RECOVERABLE),
            ("fatal", cls.FATAL),
        )
        for category, errors in tables:
            for error_key in errors:
                if cls._key_matches(error_key, error_msg, error_name):
                    return (category, error_key)

        # Default classification. The timeout test is only reachable if a
        # subclass drops "timeout" from TRANSIENT.
        if "timeout" in error_msg:
            return ("transient", "timeout")
        if "error" in error_msg:
            return ("recoverable", "unknown_error")
        return ("fatal", "unclassified")

    @staticmethod
    def _key_matches(error_key: str, error_msg: str, error_name: str) -> bool:
        """Return True when ``error_key`` appears in the message or class name.

        Keys use snake_case, but messages say "rate limit" and class names
        say "SyntaxError". The key is therefore also tried with underscores
        turned into spaces (message) and removed (class name); without
        this, multi-word keys never matched anything.
        """
        spaced = error_key.replace("_", " ")
        compact = error_key.replace("_", "")
        return (
            error_key in error_msg
            or spaced in error_msg
            or error_key in error_name
            or compact in error_name
        )
class ErrorRecoveryStrategy:
    """Look up the recovery plan that goes with an error key."""

    # error key -> plan. A plan without "max_retries" is never retried.
    RECOVERY_ACTIONS = {
        "timeout": {
            "action": "retry_with_longer_timeout",
            "max_retries": 2,
            "delay_seconds": 5
        },
        "rate_limit": {
            "action": "backoff_and_retry",
            "max_retries": 3,
            "delay_seconds": 30
        },
        "invalid_tool": {
            "action": "replan_with_available_tools",
            "suggest_alternatives": True,
            "max_retries": 1
        },
        "missing_dependency": {
            "action": "create_dependency_first",
            "max_retries": 2
        },
        "syntax_error": {
            "action": "abort_loop",
            "log_error": True
        }
    }

    @staticmethod
    def handle_error(error: Exception,
                     error_type: str,
                     state: LoopState) -> Dict:
        """Build the recovery decision for ``error``.

        Args:
            error: The exception that occurred; only its text is used.
            error_type: Key into ``RECOVERY_ACTIONS``; unknown keys fall
                back to aborting the loop.
            state: Supplies the iteration count and running cost for the
                log context.

        Returns:
            Dict with ``should_retry``, ``action``, ``delay``,
            ``retry_limit`` and ``log_context``.
        """
        abort_plan = {"action": "abort_loop"}
        plan = ErrorRecoveryStrategy.RECOVERY_ACTIONS.get(error_type, abort_plan)
        retry_limit = plan.get("max_retries", 0)
        log_context = {
            "error": str(error),
            "iteration": state.iterations,
            "cost_so_far": state.total_cost,
        }
        return {
            "should_retry": retry_limit > 0,
            "action": plan["action"],
            "delay": plan.get("delay_seconds", 0),
            "retry_limit": retry_limit,
            "log_context": log_context,
        }
2. Circuit Breaker Pattern
class CircuitBreaker:
    """Three-state breaker ("closed", "open", "half_open") for failing calls.

    Attributes:
        failure_threshold: Failure count at which the breaker opens.
        timeout: Seconds to stay open before a trial call is allowed.
        failure_count: Failures recorded since the last reset.
        last_failure_time: ``time.time()`` of the latest failure, or None.
        state: Current breaker state.
    """

    def __init__(self, failure_threshold: int = 5, timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "closed"  # "closed", "open", "half_open"

    def record_success(self):
        """Clear the failure count and close the breaker."""
        self.state = "closed"
        self.failure_count = 0

    def record_failure(self):
        """Count a failure; open the breaker once the threshold is reached.

        Raises:
            CircuitBreakerOpen: When this failure reaches the threshold.
        """
        self.last_failure_time = time.time()
        self.failure_count += 1
        if self.failure_count < self.failure_threshold:
            return
        self.state = "open"
        raise CircuitBreakerOpen(
            f"Circuit opened after {self.failure_threshold} failures"
        )

    def can_execute(self) -> bool:
        """Tell the caller whether an attempt is allowed right now.

        An open breaker moves to half-open, and allows the attempt, once
        more than ``timeout`` seconds have passed since the last failure.
        """
        if self.state == "open":
            cooled_down = time.time() - self.last_failure_time > self.timeout
            if cooled_down:
                self.state = "half_open"
            return cooled_down
        return self.state in ("closed", "half_open")

    def on_half_open_success(self):
        """Close the breaker after a successful half-open trial."""
        self.state = "closed"
        self.failure_count = 0

    def on_half_open_failure(self):
        """Re-open the breaker and restart the cool-down clock."""
        self.last_failure_time = time.time()
        self.state = "open"
CONTEXT MANAGEMENT
1. Rolling Context Window
Since LLMs have finite context windows, loops must manage what information flows to the planning engine:
class ContextManager:
    """Assemble the prompt context for each planning call within a token budget.

    Attributes:
        max_context_tokens: Token ceiling for the assembled context.
        tokenizer: Object exposing ``count_tokens(text) -> int``.
    """

    def __init__(self, max_context_tokens: int = 8000):
        self.max_context_tokens = max_context_tokens
        self.tokenizer = Tokenizer()  # Claude tokenizer

    def build_planning_context(self,
                               state: "LoopState",
                               goal: str,
                               tools: List[str]) -> str:
        """Build the context string for the next planning LLM call.

        Sections are added in this order:
        1. Goal (always)
        2. Current iteration state (always)
        3. Recent history, compressed (always)
        4. Artifact previews (only while under 70% of the budget)
        5. Recent errors (only when the error log is non-empty)

        If the total exceeds ``max_context_tokens``, ``_trim_sections``
        drops whole sections until the rest fits.
        """
        sections = []
        tokens_used = 0

        def add(name: str, text: str) -> None:
            nonlocal tokens_used
            sections.append((name, text))
            tokens_used += self.tokenizer.count_tokens(text)

        # SECTION 1: Goal (Always included, high priority)
        add("goal", f"OBJECTIVE:\n{goal}\n\n")

        # SECTION 2: Current State
        add("state", "\n".join([
            f"CURRENT STATE (Iteration {state.iterations}/{state.max_iterations}):",
            f"- Status: {state.status}",
            f"- Cost So Far: ${state.total_cost:.4f}",
            f"- Duration: {state.wall_clock_duration:.1f}s",
            f"- Available Tools: {', '.join(tools)}",
        ]))

        # SECTION 3: Recent History (Compressed)
        add("history", self._compress_history(state.history[-10:]))

        # SECTION 4: Artifacts (Sampled if too large)
        if tokens_used < self.max_context_tokens * 0.7:
            add("artifacts", self._sample_artifacts(state.artifacts))

        # SECTION 5: Error Context (If errors occurred)
        if state.error_log:
            add("errors", "RECENT ERRORS:\n" + "\n".join(state.error_log[-3:]))

        # Trim if over budget
        if tokens_used > self.max_context_tokens:
            sections = self._trim_sections(sections, self.max_context_tokens)

        return "\n".join([section for _, section in sections])

    @staticmethod
    def _status_of(result) -> str:
        """Read ``status`` from a result given as a dict or as an object.

        Results are dataclass instances when fresh (StateStore calls
        ``asdict`` on them) and dicts after JSON round-trips, so both
        shapes must be accepted; ``.get`` alone fails on the former.
        """
        if isinstance(result, dict):
            return result.get("status", "unknown")
        return getattr(result, "status", "unknown")

    def _compress_history(self, history: List["IterationRecord"]) -> str:
        """Summarize up to the last ten records, one line each."""
        lines = ["HISTORY:"]
        for record in history[-10:]:
            action = record.action.get("action", "unknown")
            status = self._status_of(record.result)
            lines.append(f" [{record.iteration}] {action} → {status}")
        return "\n".join(lines) + "\n"

    def _sample_artifacts(self, artifacts: Dict[str, str]) -> str:
        """List every artifact with a preview of its first 500 characters."""
        parts = ["GENERATED ARTIFACTS:\n"]
        for path, content in artifacts.items():
            preview = content[:500] + ("..." if len(content) > 500 else "")
            parts.append(f"\n{path}:\n{preview}\n")
        return "".join(parts)

    def _trim_sections(self, sections: List[Tuple], max_tokens: int) -> List[Tuple]:
        """Keep sections in priority order while they fit the budget.

        A section that does not fit is skipped; later, smaller sections
        may still be included. Unknown section names sort last.
        """
        priority = ["goal", "state", "history", "artifacts", "errors"]
        sorted_sections = sorted(
            sections,
            key=lambda x: priority.index(x[0]) if x[0] in priority else 999
        )
        trimmed = []
        tokens = 0
        for name, section in sorted_sections:
            section_tokens = self.tokenizer.count_tokens(section)
            if tokens + section_tokens <= max_tokens:
                trimmed.append((name, section))
                tokens += section_tokens
        return trimmed
2. Memory Types in Loops
class LoopMemory:
    """Four separate, initially empty stores, one per memory tier."""

    def __init__(self):
        # SHORT-TERM (lasts 1-2 iterations)
        # Used for: immediate action results, flags
        self.short_term = {}  # Iteration-level state

        # WORKING (lasts duration of loop)
        # Used for: generated code, intermediate outputs
        self.working = {}  # Task-level artifacts

        # LONG-TERM (persists across loops)
        # Used for: learned patterns, solutions to recurring problems
        self.long_term = {}  # Loop-lifetime knowledge

        # EXTERNAL (persists indefinitely)
        # Used for: final artifacts, version control, audit log
        self.external = {}  # Files, Git, databases
VERIFICATION & VALIDATION SYSTEMS
1. Multi-Layer Verification
class VerificationFramework:
    """Multi-layer verification for robust goal achievement.

    Attributes:
        assertions: Registered ``{"name", "check"}`` entries (layer 1).
        validators: Registered ``{"name", "check"}`` entries (layer 2).
    """

    def __init__(self):
        self.assertions = []
        self.validators = []

    def register_assertion(self, name: str, predicate: Callable[[Dict], bool]):
        """Register a verifiable assertion about success."""
        self.assertions.append({"name": name, "check": predicate})

    def register_validator(self, name: str, validator: Callable[[Dict], bool]):
        """Register a validator that must pass."""
        self.validators.append({"name": name, "check": validator})

    def verify_goal_achieved(self, state: "LoopState", result: Dict) -> "VerificationReport":
        """Run all three layers against ``result`` and aggregate them.

        Layer 1: Assertions (conditions that must be true)
        Layer 2: Validators (functional validation)
        Layer 3: Integration tests (end-to-end)

        Args:
            state: Loop state; not consulted by the current layers.
            result: Payload handed to every registered check.

        Returns:
            VerificationReport whose ``success`` requires every layer to
            pass, with a weighted ``confidence`` and per-check ``details``.
        """
        assertion_results = self._run_checks(
            self.assertions, "assertion", {"weight": 1.0}, result
        )
        validator_results = self._run_checks(
            self.validators, "validator", {"required": True}, result
        )
        integration_passed = self._run_integration_test(result)

        all_assertions_pass = all(a["passed"] for a in assertion_results)
        all_validators_pass = all(v["passed"] for v in validator_results)

        # Built in one step: VerificationReport declares its fields without
        # defaults, so a no-argument constructor call fails.
        return VerificationReport(
            success=all_assertions_pass and all_validators_pass and integration_passed,
            confidence=self._compute_confidence(
                assertion_results, validator_results, integration_passed
            ),
            details={
                "assertions": assertion_results,
                "validators": validator_results,
                "integration": integration_passed
            },
        )

    def _run_checks(self, checks: List[Dict], label: str, extra: Dict, result: Dict) -> List[Dict]:
        """Evaluate each registered check and record the outcome.

        Args:
            checks: Entries produced by the ``register_*`` methods.
            label: Key under which the check name is stored.
            extra: Fields merged into the entry of a check that ran.
            result: Payload passed to each check.

        Returns:
            One dict per check. A check that raises is recorded as failed
            with the exception text under ``"error"``.
        """
        outcomes = []
        for entry in checks:
            try:
                passed = entry["check"](result)
            except Exception as e:
                outcomes.append({label: entry["name"], "passed": False, "error": str(e)})
            else:
                outcomes.append({label: entry["name"], "passed": passed, **extra})
        return outcomes

    def _run_integration_test(self, result: Dict) -> bool:
        """Run end-to-end integration test.

        Placeholder that always passes. A domain-specific version would,
        for example, run a test suite or verify artifact quality, and
        return False on failure.
        """
        return True

    @staticmethod
    def _pass_rate(outcomes) -> float:
        """Fraction of passing outcomes; 1.0 when there are none.

        An empty layer counts as fully passed, matching ``all([])`` in the
        success aggregation, and avoids dividing by zero.
        """
        if not outcomes:
            return 1.0
        return sum(1 for o in outcomes if o["passed"]) / len(outcomes)

    def _compute_confidence(self, assertions, validators, integration) -> float:
        """Compute confidence [0-1]: 40% assertions, 40% validators, 20% integration."""
        return sum([
            self._pass_rate(assertions) * 0.4,
            self._pass_rate(validators) * 0.4,
            integration * 0.2,
        ])
@dataclass
class VerificationReport:
    """Outcome of a VerificationFramework run.

    Every field has a default so the report can be created empty and
    filled in afterwards, which is how VerificationFramework uses it.
    """

    success: bool = False      # True only when every layer passed
    confidence: float = 0.0    # [0-1]
    details: Dict = None       # Per-layer results; set to {} when omitted

    def __post_init__(self):
        # A mutable default cannot be shared between instances, so each
        # report gets its own dict here.
        if self.details is None:
            self.details = {}
COST OPTIMIZATION & TOKEN MANAGEMENT
1. Token Budget System
class TokenBudgetManager:
    """Track token spend against a USD budget for one loop.

    Attributes:
        budget_usd: Total spend allowed.
        model: Key into the pricing table.
        pricing: Per-token USD prices by model.
        spent: USD recorded so far.
        token_log: One entry per recorded call.
    """

    def __init__(self, budget_usd: float = 10.0, model: str = "claude-sonnet-4-6"):
        self.budget_usd = budget_usd
        self.model = model
        self.pricing = self._load_pricing()
        self.spent = 0.0
        self.token_log = []

    def _load_pricing(self) -> Dict:
        """Return per-token input/output prices in USD, keyed by model name.

        NOTE(review): the figures were labelled "Current pricing (June
        2026)" in the original; verify them against the provider's price
        list before relying on cost numbers.
        """
        return {
            "claude-opus-4-6": {"input": 0.015/1000, "output": 0.045/1000},
            "claude-sonnet-4-6": {"input": 0.003/1000, "output": 0.015/1000},
            "claude-haiku-4-5": {"input": 0.0008/1000, "output": 0.004/1000}
        }

    def can_afford_call(self, estimated_input_tokens: int,
                        estimated_output_tokens: int = 2000) -> bool:
        """Check if the next LLM call fits the remaining budget.

        Args:
            estimated_input_tokens: Expected prompt size.
            estimated_output_tokens: Expected completion size; defaults to
                the 2000 tokens that were previously hard-coded.
        """
        estimated_cost = self._estimate_cost(
            estimated_input_tokens, estimated_output_tokens
        )
        return self.spent + estimated_cost <= self.budget_usd

    def record_call(self, input_tokens: int, output_tokens: int) -> float:
        """Log a completed call, add its cost to ``spent`` and return the cost."""
        cost = self._compute_cost(input_tokens, output_tokens)
        self.spent += cost
        self.token_log.append({
            "input": input_tokens,
            "output": output_tokens,
            "cost": cost,
            "timestamp": time.time()
        })
        return cost

    def _estimate_cost(self, input_tokens: int, output_tokens: int) -> float:
        """Price a call for ``self.model``.

        Raises:
            KeyError: If ``self.model`` is missing from the pricing table.
        """
        pricing = self.pricing[self.model]
        return (input_tokens * pricing["input"] +
                output_tokens * pricing["output"])

    def _compute_cost(self, input_tokens: int, output_tokens: int) -> float:
        """Compute actual cost; uses the same formula as the estimate."""
        return self._estimate_cost(input_tokens, output_tokens)

    def get_remaining_budget(self) -> float:
        """Return the unspent budget in USD (negative when overspent)."""
        return self.budget_usd - self.spent

    def get_budget_report(self) -> Dict:
        """Summarize budget, spend and token totals.

        ``percent_used`` is reported as 100.0 for a zero or negative
        budget, which is always exhausted, instead of dividing by zero.
        """
        if self.budget_usd > 0:
            percent_used = (self.spent / self.budget_usd) * 100
        else:
            percent_used = 100.0
        return {
            "total_budget": self.budget_usd,
            "spent": self.spent,
            "remaining": self.get_remaining_budget(),
            "percent_used": percent_used,
            "total_tokens": sum(t["input"] + t["output"] for t in self.token_log),
            "call_count": len(self.token_log)
        }
class TokenOptimizer:
    """Optimize token usage through compression and caching."""

    @staticmethod
    def compress_history(history: List["IterationRecord"], ratio: float = 0.5) -> str:
        """Compress history summaries to reduce context size.

        Histories of up to four records are listed in full. Longer ones
        keep the first and last record and collapse everything between
        into one line naming the distinct actions used.

        Args:
            history: Records exposing ``iteration`` and ``action["action"]``.
            ratio: Accepted for compatibility; currently has no effect.

        Returns:
            Newline-joined summary text.
        """
        def line(record) -> str:
            return f"[{record.iteration}] {record.action['action']}"

        if len(history) <= 4:
            return "\n".join(line(r) for r in history)

        middle = history[1:-1]
        # A raw set's repr follows string hashing, which varies between
        # interpreter runs; sort so the same history always yields the
        # same prompt text.
        names = sorted({m.action['action'] for m in middle}, key=repr)
        actions = "{" + ", ".join(repr(n) for n in names) + "}"
        summary = f"... {len(middle)} intermediate iterations (actions: {actions}) ..."
        return "\n".join([line(history[0]), summary, line(history[-1])])

    @staticmethod
    def cache_results(result: Dict, key: str) -> None:
        """Cache successful results to avoid recomputation.

        Placeholder: the content hash is computed but not stored anywhere
        yet.

        Raises:
            TypeError: If ``result`` is not JSON-serializable.
        """
        import hashlib
        # sort_keys makes equal dicts hash identically regardless of
        # insertion order. MD5 serves as a content fingerprint here, not
        # as a security measure.
        hash_key = hashlib.md5(
            json.dumps(result, sort_keys=True).encode()
        ).hexdigest()
        # TODO: store `result` under (key, hash_key) in Redis or a cache system
ADVANCED PATTERNS
1. Parallel Loop Execution
class ParallelLoopCoordinator:
    """Run several registered loops on a thread pool with a shared state dict.

    Attributes:
        max_parallel: Cap on registered loops and on worker threads.
        loops: Registered loops keyed by loop id.
        shared_state: Dict that loops may update via ``coordinate_state``.
        lock: Guards updates to ``shared_state``.
    """

    def __init__(self, max_parallel: int = 3):
        self.max_parallel = max_parallel
        self.loops = {}
        self.shared_state = {}
        self.lock = threading.Lock()

    def spawn_loop(self, loop_id: str, goal: str) -> 'AutonomousLoop':
        """Create and register a loop for ``goal`` under ``loop_id``.

        Raises:
            RuntimeError: If ``max_parallel`` loops are already registered.
        """
        if len(self.loops) >= self.max_parallel:
            raise RuntimeError("Max parallel loops reached")
        new_loop = AutonomousLoop(goal)
        self.loops[loop_id] = new_loop
        return new_loop

    def run_parallel(self) -> Dict[str, LoopResult]:
        """Run every registered loop concurrently and collect the results.

        Returns:
            Mapping of loop id to whatever that loop's ``run()`` returned.
            An exception raised by a loop propagates from here.
        """
        with ThreadPoolExecutor(max_workers=self.max_parallel) as pool:
            pending = {
                loop_id: pool.submit(loop.run)
                for loop_id, loop in self.loops.items()
            }
        # Leaving the pool context waits for all submitted work to finish.
        return {loop_id: job.result() for loop_id, job in pending.items()}

    def coordinate_state(self, loop_id: str, state_update: Dict) -> None:
        """Merge ``state_update`` into the shared state under the lock.

        ``loop_id`` is accepted but not used.
        """
        with self.lock:
            self.shared_state.update(state_update)
2. Nested Loops
class NestedLoopController:
    """Run a parent goal as an ordered series of child loops.

    Attributes:
        parent_goal: Text of the overall objective.
        subloops: Child loops in creation order.
        parent_state: Dict handed to every child as its ``parent_state``.
    """

    def __init__(self, parent_goal: str):
        self.parent_goal = parent_goal
        self.subloops = []
        self.parent_state = {}

    def create_subloop(self, subgoal: str) -> 'AutonomousLoop':
        """Create a child loop for ``subgoal``, register it and return it.

        The child receives a reference to, not a copy of, ``parent_state``.
        """
        child = AutonomousLoop(subgoal)
        child.parent_state = self.parent_state
        self.subloops.append(child)
        return child

    def run_hierarchical(self) -> LoopResult:
        """Run the children in order, stopping at the first failure.

        Returns:
            ``LoopResult(success=False, reason="subloop_failed")`` as soon
            as a child reports failure; ``LoopResult(success=True)`` when
            all children succeed or none exist.
        """
        for child in self.subloops:
            outcome = child.run()
            if outcome.success:
                continue
            return LoopResult(success=False, reason="subloop_failed")
        return LoopResult(success=True)
3. Adaptive Loop Configuration
class AdaptiveLoopController:
    """Tune loop configuration from observed performance.

    NOTE(review): ``adapt_parameters`` calls ``_compute_success_rate`` and
    ``_is_making_progress``, which are not defined in this class as shown;
    presumably they are supplied elsewhere (e.g. by a subclass) — confirm.

    Attributes:
        config: Mutable settings dict; ``adapt_parameters`` writes
            ``"temperature"`` and reads and writes ``"max_iterations"``.
        metrics: List created empty; nothing in this class appends to it.
    """

    def __init__(self, initial_config: Dict):
        self.config = initial_config
        self.metrics = []

    def adapt_parameters(self, state: LoopState) -> None:
        """Adjust temperature and the iteration cap in ``self.config``.

        Temperature becomes 0.3 when the success rate is below 0.3 and 0.8
        when it is above 0.8; otherwise it is left alone. The iteration
        cap moves up by 5 (at most 50) while progress is being made and
        down by 5 (at least 10) otherwise.
        """
        rate = self._compute_success_rate(state)
        if rate < 0.3:
            # Increase determinism for struggling loops
            self.config["temperature"] = 0.3
        elif rate > 0.8:
            # Can afford more exploration
            self.config["temperature"] = 0.8

        progressing = self._is_making_progress(state)
        cap = self.config["max_iterations"]
        self.config["max_iterations"] = (
            min(cap + 5, 50) if progressing else max(cap - 5, 10)
        )
IMPLEMENTATION EXAMPLES
Example 1: Code Generation Loop
def create_code_generation_loop(
    requirements: str,
    language: str = "python",
    budget: float = 5.0
) -> "AutonomousLoop":
    """Create a loop that generates and tests code.

    Args:
        requirements: Goal text handed to the loop.
        language: Accepted but not used by the tools defined here.
        budget: USD budget for the loop's TokenBudgetManager.

    Returns:
        An AutonomousLoop wired with "generate", "test" and "lint" tools.
    """
    def generate_tool(specs: str) -> Dict:
        """Generate code based on specs (placeholder; returns None)."""
        # Implementation: Call LLM to generate code
        pass

    def test_tool(code: str) -> Dict:
        """Execute generated code and report the names it defined."""
        try:
            exec_globals = {}
            # SECURITY: exec runs model-generated code with this process's
            # full privileges. Run it in a sandbox (subprocess, container,
            # restricted user) before using this outside a demo.
            exec(code, exec_globals)
        except Exception as e:
            return {"status": "error", "error": str(e)}
        # exec injects "__builtins__" into the globals dict; drop it so the
        # output contains only what the code itself defined.
        defined = {
            name: value
            for name, value in exec_globals.items()
            if name != "__builtins__"
        }
        return {"status": "success", "output": defined}

    def lint_tool(code: str) -> Dict:
        """Lint and validate code quality (placeholder; returns None)."""
        # Run pylint, mypy, etc.
        pass

    loop = AutonomousLoop(
        goal=requirements,
        llm_client=Anthropic(),
        tools={
            "generate": generate_tool,
            "test": test_tool,
            "lint": lint_tool
        }
    )
    loop.budget_manager = TokenBudgetManager(budget)
    return loop
Example 2: Data Processing Loop
def create_data_processing_loop(
    input_file: str,
    processing_spec: str
) -> AutonomousLoop:
    """Build a loop with load / transform / validate / save tools for tabular data.

    NOTE(review): ``input_file`` is accepted but never passed to the loop
    or its tools — confirm how the loop is meant to learn the input path.

    Args:
        input_file: Path of the data to process (currently unused).
        processing_spec: Goal text handed to the loop.
    """
    def load_data_tool(path: str) -> Dict:
        # Read the CSV at `path` and return it as a plain dict.
        import pandas as pd
        frame = pd.read_csv(path)
        return {"data": frame.to_dict()}

    def transform_tool(data: Dict, transformation: str) -> Dict:
        # Apply transformation (placeholder; returns None)
        pass

    def validate_tool(data: Dict) -> Dict:
        # Validate data quality (placeholder; returns None)
        pass

    def save_tool(data: Dict, path: str) -> Dict:
        # Write `data` to a CSV file at `path`.
        import pandas as pd
        frame = pd.DataFrame(data)
        frame.to_csv(path)
        return {"status": "success", "path": path}

    toolbox = {
        "load": load_data_tool,
        "transform": transform_tool,
        "validate": validate_tool,
        "save": save_tool
    }
    return AutonomousLoop(goal=processing_spec, tools=toolbox)
MONITORING & OBSERVABILITY
1. Loop Metrics & Telemetry
class LoopTelemetry:
    """Counters for one loop: iterations, outcomes, tokens, cost and errors."""

    def __init__(self, loop_id: str):
        """Start with zeroed counters and stamp the creation time."""
        self.loop_id = loop_id
        counters = {}
        for name in ("iterations", "successes", "failures", "retries",
                     "total_tokens"):
            counters[name] = 0
        counters["total_cost"] = 0.0
        counters["start_time"] = time.time()
        counters["errors"] = []
        self.metrics = counters

    def record_iteration(self, success: bool, tokens: int, cost: float) -> None:
        """Count one iteration, its outcome, and its token and cost usage."""
        stats = self.metrics
        stats["iterations"] += 1
        outcome_key = "successes" if success else "failures"
        stats[outcome_key] += 1
        stats["total_tokens"] += tokens
        stats["total_cost"] += cost

    def record_error(self, error: Exception) -> None:
        """Append the error's type name, message and current time to the log."""
        entry = {
            "type": type(error).__name__,
            "message": str(error),
            "timestamp": time.time(),
        }
        self.metrics["errors"].append(entry)

    def get_metrics(self) -> Dict:
        """Return the raw counters plus duration and per-iteration averages.

        Averages divide by at least 1, so they are 0 before any iteration
        has been recorded.
        """
        stats = self.metrics
        elapsed = time.time() - stats["start_time"]
        divisor = max(stats["iterations"], 1)

        report = dict(stats)
        report["duration_seconds"] = elapsed
        report["success_rate"] = stats["successes"] / divisor
        report["cost_per_iteration"] = stats["total_cost"] / divisor
        report["tokens_per_iteration"] = stats["total_tokens"] / divisor
        return report
class LoopDashboard:
    """Real-time dashboard for loop monitoring.

    Holds one telemetry object per registered loop and renders a plain-text
    summary from each object's ``get_metrics()`` result.
    """

    def __init__(self):
        """Start with no registered loops."""
        self.loops = {}

    def register_loop(self, loop_id: str, telemetry: "LoopTelemetry"):
        """Track ``telemetry`` under ``loop_id``, replacing any earlier entry."""
        self.loops[loop_id] = telemetry

    def render_dashboard(self) -> str:
        """Generate text-based dashboard.

        Returns:
            A header followed by one section per registered loop, in
            registration order. With no loops only the header is returned.
        """
        # Collect fragments and join once rather than growing a string with
        # repeated "+=" inside the loop.
        parts = ["LOOP EXECUTION DASHBOARD\n", "=" * 80 + "\n\n"]
        for loop_id, telemetry in self.loops.items():
            metrics = telemetry.get_metrics()
            parts.extend((
                f"Loop: {loop_id}\n",
                f" Iterations: {metrics['iterations']}\n",
                f" Success Rate: {metrics['success_rate']:.1%}\n",
                f" Total Cost: ${metrics['total_cost']:.4f}\n",
                f" Duration: {metrics['duration_seconds']:.1f}s\n",
                f" Tokens/Iteration: {metrics['tokens_per_iteration']:.0f}\n",
                "\n",
            ))
        return "".join(parts)
2. Logging & Audit Trail
class LoopLogger:
    """Comprehensive audit logging for compliance and debugging.

    Wraps the ``logging`` logger named ``loop.<loop_id>``. Records at INFO
    and above go to a console handler; when ``log_file`` is given, records
    at DEBUG and above are also written to that file.
    """

    def __init__(self, loop_id: str, log_file: str = None):
        """Create (or reuse) the logger for ``loop_id``.

        Args:
            loop_id: Identifier used in the logger name.
            log_file: Optional path of a log file; ``None`` logs to the
                console only.
        """
        self.loop_id = loop_id
        self.logger = self._setup_logger(log_file)

    def _setup_logger(self, log_file: str):
        """Configure and return the logger, attaching each handler only once."""
        import logging
        import os

        logger = logging.getLogger(f"loop.{self.loop_id}")
        # Without an explicit level the logger inherits WARNING from the
        # root logger and drops INFO/DEBUG records before any handler sees
        # them. Let the handlers do the per-destination filtering instead.
        logger.setLevel(logging.DEBUG)
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )

        # getLogger() returns the same object for the same name, so a second
        # LoopLogger with this loop_id must not attach duplicate handlers
        # (every record would otherwise be emitted once per instance).
        if log_file:
            target = os.path.abspath(log_file)
            already_logging_to_file = any(
                isinstance(handler, logging.FileHandler)
                and handler.baseFilename == target
                for handler in logger.handlers
            )
            if not already_logging_to_file:
                # Explicit UTF-8: iteration messages contain a non-ASCII
                # arrow that platform default encodings may not support.
                fh = logging.FileHandler(log_file, encoding="utf-8")
                fh.setLevel(logging.DEBUG)
                fh.setFormatter(formatter)
                logger.addHandler(fh)

        # FileHandler subclasses StreamHandler, so exclude it explicitly
        # when looking for an existing console handler.
        has_console_handler = any(
            isinstance(handler, logging.StreamHandler)
            and not isinstance(handler, logging.FileHandler)
            for handler in logger.handlers
        )
        if not has_console_handler:
            ch = logging.StreamHandler()
            ch.setLevel(logging.INFO)
            ch.setFormatter(formatter)
            logger.addHandler(ch)
        return logger

    def log_iteration(self, iteration: int, action: Dict, result: Dict) -> None:
        """Log, at INFO level, the action taken and the result's status."""
        self.logger.info(
            f"[ITERATION {iteration}] Action: {action['action']} → "
            f"Status: {result.get('status')}"
        )

    def log_error(self, error: Exception, context: Dict) -> None:
        """Log, at ERROR level, the exception together with its context."""
        # default=str: context may hold values JSON cannot encode; reporting
        # an error must not itself raise TypeError.
        self.logger.error(
            f"ERROR: {type(error).__name__}: {str(error)}\n"
            f"Context: {json.dumps(context, indent=2, default=str)}"
        )

    def log_state_snapshot(self, state: "LoopState") -> None:
        """Log, at DEBUG level, ``state.serialize()`` re-indented as JSON."""
        self.logger.debug(
            f"STATE SNAPSHOT:\n{json.dumps(json.loads(state.serialize()), indent=2)}"
        )
PRODUCTION DEPLOYMENT CHECKLIST
- [ ] Token budget enforcement implemented
- [ ] Circuit breaker protection active
- [ ] Cost limits enforced per-loop
- [ ] Comprehensive error classification & recovery
- [ ] State persistence configured (DB backend)
- [ ] Monitoring/telemetry in place
- [ ] Retry logic with exponential backoff
- [ ] Verification framework implemented
- [ ] Context management optimized
- [ ] Logging/audit trail enabled
- [ ] Graceful degradation strategies
- [ ] Performance benchmarking completed
- [ ] Cost projections validated
- [ ] Runaway loop detection active
CONCLUSION
Loop engineering shifts AI development from reactive prompting to proactive system design. The key is building robust infrastructure around iterative autonomy: state management, verification, cost control and error recovery.
The loop becomes the unit of work, not the LLM call.