nshkrdotcom/pipeline_ex


Pipeline


AI Pipeline Orchestration Library for Elixir

A robust, production-ready library for chaining AI providers (Claude and Gemini) with advanced features like fault tolerance, session management, and self-improving Genesis pipelines.

🎯 Library Readiness: 8.5/10 - Ready for immediate use as a Git dependency with comprehensive testing, clean API, and flexible configuration.

📋 Remaining 1.5/10 for Full Production Readiness

Missing Features (1.5/10):

  1. Hex Package Publication (0.5/10) - Currently Git-only, needs mix hex.publish workflow
  2. Enhanced Documentation (0.3/10) - ExDoc polish, API reference examples, getting started guide
  3. Backward Compatibility (0.2/10) - Semantic versioning strategy, deprecation warnings, migration guides
  4. Performance Benchmarks (0.2/10) - Baseline metrics, memory profiling, concurrency benchmarks
  5. Production Hardening (0.3/10) - Rate limiting, circuit breakers, structured logging with correlation IDs

Current Status: 8.5/10 = Excellent for Git dependency | 10/10 = Enterprise-ready Hex package

🧬 Genesis Pipeline: Self-Improving AI System

NEW: Our flagship feature - a pipeline that generates pipelines! The Genesis Pipeline is an AI system that creates other AI pipelines, enabling true self-improvement and evolution.

Quick Start with Genesis

# Generate a new AI pipeline with one command
mix pipeline.generate.live "Create a sentiment analysis pipeline"
# The system will create a complete, executable pipeline in evolved_pipelines/
# Run your generated pipeline immediately:
mix pipeline.run evolved_pipelines/sentiment_analyzer_*.yaml

What just happened? The Genesis Pipeline used Claude to analyze your request, design the optimal pipeline structure, and generate a complete YAML configuration that's immediately ready to execute.
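
Because generated files carry a variable suffix (the `*` in the command above), it can help to pick the most recent one programmatically. The following is a minimal sketch using only the Elixir standard library; `GenesisSketch` and `newest/1` are hypothetical names, not part of pipeline_ex.

```elixir
defmodule GenesisSketch do
  # Returns the most recently modified file matching the glob pattern,
  # or nil when nothing matches.
  def newest(pattern) do
    pattern
    |> Path.wildcard()
    |> Enum.max_by(fn path -> File.stat!(path, time: :posix).mtime end, fn -> nil end)
  end
end

# Example: locate the latest generated sentiment pipeline, if any.
latest = GenesisSketch.newest("evolved_pipelines/sentiment_analyzer_*.yaml")
IO.inspect(latest, label: "latest generated pipeline")
```

The returned path could then be passed to `mix pipeline.run` or to `Pipeline.run/2`.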

📦 Library Usage

Use pipeline_ex as a dependency in your Elixir applications:

Add to Your Project

# mix.exs
defp deps do
  [
    # From Hex.pm (recommended)
    {:pipeline_ex, "~> 0.1.0"}
    # Or from GitHub
    # {:pipeline_ex, git: "https://github.com/nshkrdotcom/pipeline_ex.git", tag: "v0.1.0"}
  ]
end

⚠️ Breaking Change in v0.1.0: Async streaming system removed. ClaudeCodeSDK already streams messages optimally - the custom buffering system added unnecessary complexity. See docs/ASYNC_STREAMING_DEPRECATION.md for details.

Simple API

# Load and execute a pipeline
{:ok, config} = Pipeline.load_workflow("my_analysis.yaml")
{:ok, results} = Pipeline.execute(config)

# Execute with custom configuration
{:ok, results} =
  Pipeline.execute(config,
    workspace_dir: "/app/ai_workspace",
    output_dir: "/app/pipeline_outputs"
  )

# Convenience function
{:ok, results} = Pipeline.run("my_analysis.yaml", debug: true)

# Health check
case Pipeline.health_check() do
  :ok -> IO.puts("Pipeline system ready")
  {:error, issues} -> IO.puts("Issues: #{inspect(issues)}")
end

Configuration Options

The library supports flexible configuration through multiple sources:

  1. Function options (highest priority):

    Pipeline.execute(config, workspace_dir: "/custom/workspace")
  2. Environment variables:

    export PIPELINE_WORKSPACE_DIR="/app/workspace"
    export PIPELINE_OUTPUT_DIR="/app/outputs"
    export PIPELINE_CHECKPOINT_DIR="/app/checkpoints"
  3. YAML configuration and defaults
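
The precedence order above can be pictured as a chain of lookups that stops at the first source with a value. This is a minimal sketch of that idea, not the library's actual resolver: `ConfigSketch`, `resolve/3`, the atom-keyed `yaml` map, and the default paths are all assumptions made for illustration.

```elixir
defmodule ConfigSketch do
  # Environment variable names taken from the list above.
  @env_vars %{
    workspace_dir: "PIPELINE_WORKSPACE_DIR",
    output_dir: "PIPELINE_OUTPUT_DIR",
    checkpoint_dir: "PIPELINE_CHECKPOINT_DIR"
  }

  # Illustrative defaults only; the library's real defaults may differ.
  @defaults %{
    workspace_dir: "./workspace",
    output_dir: "./outputs",
    checkpoint_dir: "./checkpoints"
  }

  # Checks sources from highest to lowest priority:
  # function options, then environment, then YAML, then defaults.
  def resolve(key, opts \\ [], yaml \\ %{}) do
    Keyword.get(opts, key) ||
      System.get_env(Map.fetch!(@env_vars, key)) ||
      Map.get(yaml, key) ||
      Map.fetch!(@defaults, key)
  end
end

# A function option wins over everything else.
IO.puts(ConfigSketch.resolve(:workspace_dir, workspace_dir: "/custom/workspace"))
```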

Integration Examples

# Phoenix controller
defmodule MyAppWeb.AIController do
  use MyAppWeb, :controller

  def analyze(conn, %{"code" => code}) do
    case Pipeline.run("pipelines/code_analysis.yaml",
           workspace_dir: "/tmp/ai_workspace"
         ) do
      {:ok, %{"analysis" => result}} ->
        json(conn, %{analysis: result})

      {:error, reason} ->
        # inspect/1 keeps non-string error terms JSON-encodable
        conn |> put_status(500) |> json(%{error: inspect(reason)})
    end
  end
end

# Background job with Oban
defmodule MyApp.AnalysisWorker do
  use Oban.Worker, queue: :ai_analysis

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"project_id" => project_id}}) do
    # get_analysis_config/0 is assumed to be defined elsewhere in this module
    case Pipeline.execute(get_analysis_config(),
           workspace_dir: "/tmp/analysis_#{project_id}",
           output_dir: "/app/results/#{project_id}"
         ) do
      {:ok, results} ->
        MyApp.Projects.update_analysis(project_id, results)
        :ok

      {:error, reason} ->
        {:error, reason}
    end
  end
end

Testing Integration

# Enable mock mode for development/testing
Application.put_env(:pipeline, :test_mode, true)
# All AI calls will be mocked
{:ok, results} = Pipeline.execute(config)

📖 Complete Library Guide: See the documentation sections below for detailed usage instructions, configuration options, and integration patterns.

🚀 Advanced Features: See ADVANCED_FEATURES.md for comprehensive documentation on loops, complex conditions, file operations, data transformation, codebase intelligence, and state management.

📋 YAML Format v2 Documentation: See docs/20250704_yaml_format_v2/index.md for complete reference documentation on the Pipeline YAML format, including all step types, prompt systems, control flow, and best practices.

Features

📦 Library-Ready

  • 🏗️ Elixir Library: Use as a dependency in any Elixir application
  • 🔧 Clean API: Simple Pipeline.execute/2 and Pipeline.load_workflow/1 functions
  • ⚙️ Configurable: All paths and settings customizable via options/environment variables
  • 🧪 Mock Mode: Complete testing support without API costs
  • 🏥 Health Checks: Built-in system validation and monitoring

🤖 AI Integration

  • 🤖 Multi-AI Integration: Chain Claude and Gemini APIs together
  • 💰 Model Selection & Cost Control: Choose between Claude models (sonnet ~$0.01 vs opus ~$0.26 per query)
  • 🔄 Flexible Execution Modes: Mock, Live, and Mixed modes for testing
  • 📋 YAML Workflow Configuration: Define complex multi-step workflows
  • 🎯 Structured Output: JSON-based responses with proper error handling
  • 🔧 InstructorLite Integration: Structured generation with Gemini
  • 📊 Result Management: Organized output storage and display
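
To make the cost difference between models concrete, here is a minimal sketch of the arithmetic. It assumes the approximate per-query figures quoted in the list (about $0.01 for sonnet and $0.26 for opus) and works in whole cents to avoid floating-point rounding; `CostSketch` is a hypothetical module, and real costs depend on prompt and response size.

```elixir
defmodule CostSketch do
  # Approximate per-query cost in cents, from the figures quoted above.
  @cents_per_query %{"sonnet" => 1, "opus" => 26}

  # Estimated total cost in cents for a number of queries on one model.
  def estimate_cents(model, queries) when is_integer(queries) and queries >= 0 do
    Map.fetch!(@cents_per_query, model) * queries
  end
end

# 100 queries: roughly 100 cents on sonnet versus 2600 cents on opus.
IO.inspect(CostSketch.estimate_cents("sonnet", 100), label: "sonnet (cents)")
IO.inspect(CostSketch.estimate_cents("opus", 100), label: "opus (cents)")
```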

⚡ Advanced Features

  • Enhanced Claude Steps: Smart presets, sessions, extraction, batch processing, robust error handling
  • Model Selection: Automatic cost optimization (development=sonnet, production=opus+fallback, analysis=opus)
  • Genesis Pipeline: Self-improving AI system that generates other pipelines
  • Session Management: Persistent conversations with automatic checkpointing
  • Fault Tolerance: Retry mechanisms, circuit breakers, graceful degradation
  • Loop Constructs: For/while loops with parallel execution and nested support
  • Complex Conditions: Boolean logic, comparisons, mathematical expressions
  • File Operations: Copy, move, validate, convert with format transformations
  • Data Transformation: Filter, aggregate, join with schema validation
  • Codebase Intelligence: Project discovery, code analysis, dependency mapping
  • State Management: Variables, interpolation, checkpoints with persistence
  • Async Streaming: Real-time response streaming for Claude steps with multiple handlers (removed in v0.1.0; see the breaking-change note under Library Usage)

📚 See ADVANCED_FEATURES.md for detailed documentation and examples of all advanced capabilities.

Quick Start

1. Installation

As a Library Dependency (Recommended)

# mix.exs
defp deps do
  [
    # From Hex.pm (recommended)
    {:pipeline_ex, "~> 0.1.0"}

    # Or from GitHub
    # {:pipeline_ex, git: "https://github.com/nshkrdotcom/pipeline_ex.git", tag: "v0.1.0"}
  ]
end

Standalone Development

git clone <repository>
cd pipeline_ex
mix deps.get

2. Try the Showcase (Recommended)

🌟 Start with one of these commands:

# Mock mode (safe, free, fast)
mix showcase # Complete demo with mocks
# Live mode (real API calls, costs money)
mix showcase --live # Complete demo with live APIs

🎭 Mock vs Live Mode

All examples and tests can run in two modes:

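| Mode | Command Format | API Calls | Costs | Authentication Required |
|------|----------------|-----------|-------|-------------------------|
| Mock | mix showcase | None (mocked) | $0.00 | No |
| Live | mix showcase --live | Real API calls | Real costs | Yes (claude login) |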

Running Tests

Mock Mode (Recommended)

mix test # All tests with mocked APIs (fast, free)

Live Mode (Real API Calls)

# Setup authentication first:
export GEMINI_API_KEY="your_api_key"
claude login
# Run tests with real APIs:
mix pipeline.test.live # Only integration tests with live APIs (costs money)

Execution Modes

The pipeline supports three execution modes controlled by the TEST_MODE environment variable:

| Mode | Description | Use Case |
|------|-------------|----------|
| mock | Uses fake responses | Development, unit testing, CI/CD |
| live | Uses real API calls | Production, integration testing |
| mixed | Mocks for unit tests, live for integration | Hybrid testing approach |
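
One way to picture how the three modes map to providers is a small dispatch function. This is a sketch under stated assumptions, not the library's implementation: `ModeSketch`, the `:unit`/`:integration` distinction as an argument, and the `"live"` default when `TEST_MODE` is unset are all hypothetical.

```elixir
defmodule ModeSketch do
  # Reads TEST_MODE; the "live" default when unset is an assumption.
  def current_mode, do: System.get_env("TEST_MODE", "live")

  # Decides whether a call should hit a mock or a live provider.
  def provider("mock", _test_kind), do: :mock
  def provider("live", _test_kind), do: :live
  # Mixed mode: mocks for unit tests, live providers for integration tests.
  def provider("mixed", :unit), do: :mock
  def provider("mixed", :integration), do: :live
end

IO.inspect(ModeSketch.provider(ModeSketch.current_mode(), :integration), label: "provider")
```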

Mode Examples

# Mock mode - fast, no API costs
mix pipeline.run examples/comprehensive_config_example.yaml
# Live mode - real AI responses 
mix pipeline.run.live examples/comprehensive_config_example.yaml
# Mixed mode - context-dependent
TEST_MODE=mixed mix test

Configuration

API Keys

Claude

Claude uses the authenticated CLI. Run once:

claude login

Gemini

Set your API key:

export GEMINI_API_KEY="your_gemini_api_key_here"

Or in your application config:

config :pipeline, gemini_api_key: "your_api_key"

Workflow Configuration

Create YAML workflow files like test_simple_workflow.yaml:

workflow:
  name: "simple_test_workflow"
  description: "Test basic claude functionality"

  steps:
    - name: "analyze_code"
      type: "claude"
      prompt:
        - type: "static"
          content: |
            Analyze this simple Python function and provide feedback:

            def add(a, b):
                return a + b

            Please provide your analysis in JSON format.

    - name: "plan_improvements"
      type: "gemini"
      prompt:
        - type: "static"
          content: |
            Based on the previous analysis, create a plan to improve the function.
            Consider error handling, type hints, and documentation.

Example Usage

Library Usage in Your Application

defmodule MyApp.AIProcessor do
  @doc "Analyze code using the pipeline library"
  def analyze_code(code_content) do
    # Load your pipeline configuration
    case Pipeline.load_workflow("pipelines/code_analysis.yaml") do
      {:ok, config} ->
        # Execute with custom workspace
        Pipeline.execute(config,
          workspace_dir: "/tmp/ai_workspace",
          debug: true
        )

      {:error, reason} ->
        # inspect/1 handles error terms that are not strings
        {:error, "Failed to load workflow: #{inspect(reason)}"}
    end
  end

  @doc "Health check for the AI system"
  def system_ready? do
    case Pipeline.health_check() do
      :ok -> true
      {:error, _issues} -> false
    end
  end
end

# Usage in your application
{:ok, analysis} = MyApp.AIProcessor.analyze_code(user_code)
IO.inspect(analysis["analysis_step"])

Simple Script Example

#!/usr/bin/env elixir
Mix.install([
  {:pipeline_ex, git: "https://github.com/nshkrdotcom/pipeline_ex.git"}
])

# Load and execute pipeline
case Pipeline.run("test_simple_workflow.yaml", output_dir: "outputs") do
  {:ok, results} ->
    IO.puts("✅ Pipeline completed!")
    IO.inspect(results)

  {:error, reason} ->
    # inspect/1 handles error terms that are not strings
    IO.puts("❌ Pipeline failed: #{inspect(reason)}")
end

Testing Different Scenarios

# Test with mock responses (fast)
TEST_MODE=mock elixir run_example.exs
# Test with real Claude + mock Gemini
# (useful when you have Claude access but no Gemini API key)
TEST_MODE=mixed elixir -e "
System.put_env(\"FORCE_MOCK_GEMINI\", \"true\")
Code.compile_file(\"run_example.exs\")
"
# Full live test (requires both API keys)
elixir run_example.exs

Running the Comprehensive Example

The project includes a comprehensive configuration example that demonstrates all available features with minimal steps. This example showcases every configuration option, step type, and feature available in the pipeline system.

Mock Mode (Safe, No API Keys Required)

# Run the comprehensive example with mocked AI responses
mix pipeline.run examples/comprehensive_config_example.yaml
# Run with debug output to see detailed execution
PIPELINE_DEBUG=true mix pipeline.run examples/comprehensive_config_example.yaml

Live Mode (Real AI Providers)

To run the comprehensive example with actual AI providers:

1. Set Up API Keys

# Set your Gemini API key (get from https://aistudio.google.com/)
export GEMINI_API_KEY="your_gemini_api_key_here"
# Authenticate the Claude CLI (no API key needed)
claude login

2. Run in Live Mode

# Run with real AI providers
mix pipeline.run.live examples/comprehensive_config_example.yaml
# Run with full debug logging
PIPELINE_DEBUG=true PIPELINE_LOG_LEVEL=debug mix pipeline.run.live examples/comprehensive_config_example.yaml

What the Comprehensive Example Demonstrates

The examples/comprehensive_config_example.yaml shows:

  • Basic step types: gemini, claude, parallel_claude, gemini_instructor
  • Enhanced Claude steps: claude_smart, claude_session, claude_extract, claude_batch, claude_robust
  • Function calling: Gemini with structured function definitions
  • All Claude tools: Write, Edit, Read, Bash, Search, Glob, Grep
  • Parallel execution: Multiple Claude instances running simultaneously
  • Conditional steps: Steps that run based on previous results
  • All prompt types: Static content, file content, previous responses
  • Workspace management: Sandboxed file operations
  • Token budgets: Fine-tuned AI response configurations
  • Model selection: Different AI models for different tasks
  • Output management: Structured result saving and organization

Enhanced Claude Step Types

The pipeline includes five advanced Claude step types that extend the basic claude step with specialized capabilities:

🎯 Claude Smart (claude_smart)

Intelligent preset-based configuration with environment awareness.

  • Presets: development, production, analysis, chat, test
  • Auto-optimization: Preset-specific tool restrictions and performance tuning
  • Environment detection: Automatic preset selection based on Mix environment

- name: "smart_analysis"
  type: "claude_smart"
  preset: "analysis" # Uses analysis-optimized settings
  prompt: [...]
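
Environment-based preset selection can be thought of as a lookup from the Mix environment to a preset name, and from the preset to a model. The sketch below is illustrative only: `PresetSketch` and the `:dev`/`:test`/`:prod` mapping are assumptions, while the model choices follow the feature list (development=sonnet, production=opus+fallback, analysis=opus). Presets whose model the README does not state return `:unspecified`.

```elixir
defmodule PresetSketch do
  # Assumed mapping from Mix environment to preset name.
  def for_env(:dev), do: "development"
  def for_env(:test), do: "test"
  def for_env(:prod), do: "production"
  def for_env(_other), do: "development"

  # Model per preset, as summarized in the feature list.
  def model_for("development"), do: %{model: "sonnet", fallback: false}
  def model_for("production"), do: %{model: "opus", fallback: true}
  def model_for("analysis"), do: %{model: "opus", fallback: false}
  # The README does not state models for the remaining presets.
  def model_for(_preset), do: :unspecified
end

preset = PresetSketch.for_env(:prod)
IO.inspect({preset, PresetSketch.model_for(preset)})
```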

🗣️ Claude Session (claude_session)

Persistent conversation management with session state.

  • Session persistence: Continue conversations across multiple steps
  • Automatic checkpointing: Save session state for recovery
  • Turn management: Configurable conversation length limits

- name: "session_start"
  type: "claude_session"
  session_name: "math_tutor"
  session_config:
    persist: true
    max_turns: 50
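
The turn limit can be illustrated with a small in-memory structure that refuses new turns once `max_turns` is reached. This is a sketch of the concept only; `SessionSketch` is hypothetical, it does not persist anything, and the library's real session handling may work differently.

```elixir
defmodule SessionSketch do
  defstruct name: nil, max_turns: 50, turns: []

  # Builds a new, empty session.
  def new(name, max_turns), do: %__MODULE__{name: name, max_turns: max_turns}

  # Appends a turn while the session is under its limit.
  def add_turn(%__MODULE__{turns: turns, max_turns: max} = session, message)
      when length(turns) < max do
    {:ok, %{session | turns: turns ++ [message]}}
  end

  # Rejects the turn once the limit has been reached.
  def add_turn(%__MODULE__{}, _message), do: {:error, :max_turns_reached}
end

session = SessionSketch.new("math_tutor", 50)
{:ok, session} = SessionSketch.add_turn(session, "What is a derivative?")
IO.inspect(length(session.turns), label: "turns so far")
```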

📄 Claude Extract (claude_extract)

Advanced content extraction and post-processing.

  • Output formats: json, markdown, structured, summary
  • Post-processing: Extract code blocks, recommendations, links, key points
  • Content filtering: Apply extraction rules and transformations

- name: "extract_json"
  type: "claude_extract"
  preset: "analysis"
  extraction_config:
    format: "json"
    post_processing: ["extract_code_blocks", "extract_recommendations"]
    include_metadata: true
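
As an illustration of what a post-processing step such as `extract_code_blocks` might do, the sketch below pulls fenced code blocks out of a markdown response with a regular expression. `ExtractSketch` is a hypothetical module and this is not the library's extractor. The three-backtick fence is built at runtime only so that the example can itself be shown inside a fenced block.

```elixir
defmodule ExtractSketch do
  # Three backticks, built at runtime (see the note above).
  @fence String.duplicate("`", 3)

  # Returns a list of %{lang: ..., code: ...} maps, one per fenced block.
  def code_blocks(text) do
    # The "s" option lets "." match newlines so multi-line blocks are captured.
    regex = Regex.compile!(@fence <> "(\\w*)\\n(.*?)" <> @fence, "s")

    regex
    |> Regex.scan(text)
    |> Enum.map(fn [_full, lang, body] ->
      %{lang: lang, code: String.trim_trailing(body)}
    end)
  end
end

fence = String.duplicate("`", 3)
response = "Here is a fix:\n" <> fence <> "python\nprint(1)\n" <> fence <> "\nDone."
IO.inspect(ExtractSketch.code_blocks(response))
```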

⚡ Claude Batch (claude_batch)

Parallel task execution with load balancing.

  • Concurrent processing: Run multiple Claude queries simultaneously
  • Task management: Queue and execute independent tasks
  • Performance scaling: Configurable parallelism limits

- name: "batch_analysis"
  type: "claude_batch"
  batch_config:
    max_parallel: 3
    tasks:
      - id: "task1"
        prompt: "Analyze JavaScript code..."
      - id: "task2"
        prompt: "Analyze Python code..."
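
Bounded parallelism of the kind `max_parallel` describes is commonly expressed in Elixir with `Task.async_stream/3` and its `:max_concurrency` option. The following is a minimal sketch of that pattern, not the library's batch executor; `BatchSketch` is hypothetical and `run_fun` stands in for a real Claude call.

```elixir
defmodule BatchSketch do
  # Runs each task's prompt through run_fun with at most max_parallel
  # tasks in flight, and returns a map of task id => result.
  def run(tasks, max_parallel, run_fun) do
    tasks
    |> Task.async_stream(
      fn %{id: id, prompt: prompt} -> {id, run_fun.(prompt)} end,
      max_concurrency: max_parallel,
      timeout: 60_000
    )
    |> Map.new(fn {:ok, {id, result}} -> {id, result} end)
  end
end

tasks = [
  %{id: "task1", prompt: "Analyze JavaScript code..."},
  %{id: "task2", prompt: "Analyze Python code..."}
]

# A stand-in function replaces the real AI call here.
results = BatchSketch.run(tasks, 3, fn prompt -> "mock response for: " <> prompt end)
IO.inspect(results)
```

This sketch crashes if a task exits or times out; a production version would also need to handle `{:exit, reason}` results.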

🛡️ Claude Robust (claude_robust)

Enterprise-grade error recovery and fault tolerance.

  • Retry mechanisms: Configurable backoff strategies
  • Fallback actions: Graceful degradation options
  • Circuit breaker: Prevent cascade failures

- name: "robust_analysis"
  type: "claude_robust"
  retry_config:
    max_retries: 3
    backoff_strategy: "exponential"
    fallback_action: "simplified_prompt"
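
To show what retrying with exponential backoff and a fallback looks like in practice, here is a minimal sketch. It is not the library's implementation: `RetrySketch`, the `:base_delay_ms` option, and the fallback function signature are assumptions. The delay doubles after each failed attempt, and the fallback runs once `max_retries` retries are exhausted.

```elixir
defmodule RetrySketch do
  # Calls fun until it returns {:ok, _} or the retries are used up.
  def with_retry(fun, opts \\ []) do
    max_retries = Keyword.get(opts, :max_retries, 3)
    base_delay = Keyword.get(opts, :base_delay_ms, 100)
    fallback = Keyword.get(opts, :fallback, fn reason -> {:error, reason} end)
    attempt(fun, 0, max_retries, base_delay, fallback)
  end

  defp attempt(fun, retries_done, max_retries, base_delay, fallback) do
    case fun.() do
      {:ok, _value} = ok ->
        ok

      {:error, reason} when retries_done >= max_retries ->
        # Retries exhausted: degrade gracefully via the fallback.
        fallback.(reason)

      {:error, _reason} ->
        # Exponential backoff: base, 2x base, 4x base, ...
        Process.sleep(base_delay * Integer.pow(2, retries_done))
        attempt(fun, retries_done + 1, max_retries, base_delay, fallback)
    end
  end
end

result =
  RetrySketch.with_retry(fn -> {:error, :timeout} end,
    max_retries: 2,
    base_delay_ms: 10,
    fallback: fn _reason -> {:ok, "result from simplified prompt"} end
  )

IO.inspect(result)
```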

Testing Enhanced Step Types

Each enhanced step type has dedicated example files for testing:

# Test individual enhanced step types
mix pipeline.run.live examples/claude_smart_example.yaml
mix pipeline.run.live examples/claude_session_example.yaml 
mix pipeline.run.live examples/claude_extract_example.yaml
mix pipeline.run.live examples/claude_batch_example.yaml
mix pipeline.run.live examples/claude_robust_example.yaml
# Or run all enhanced examples in mock mode (free)
mix pipeline.run examples/claude_smart_example.yaml
mix pipeline.run examples/claude_session_example.yaml
# ... etc

🚀 Async Streaming (NEW)

The pipeline now supports real-time message streaming for all Claude-based steps, displaying complete messages as they arrive from ClaudeCodeSDK for better user experience.

Why Use Async Streaming?

  • Real-time feedback: See Claude's complete messages as they arrive (message-by-message)
  • Lower memory usage: Process messages without buffering entire responses
  • Better UX: Progressive display of assistant responses, tool uses, and results
  • Visibility: Watch Claude's tool usage and thinking process in real-time

Basic Streaming Example

- name: "streaming_claude"
  type: "claude"
  claude_options:
    # Enable async streaming
    async_streaming: true
    stream_handler: "console" # Real-time console output
    stream_buffer_size: 100 # Message buffer size

    # Standard options
    max_turns: 10
    allowed_tools: ["Write", "Read", "Edit"]

Available Stream Handlers

  1. Console Handler (console) - Real-time terminal output

    stream_handler: "console"
    stream_console_config:
     show_timestamps: true
     color_output: true
     show_progress: true
  2. File Handler (file) - Stream to file with rotation

    stream_handler: "file"
    stream_file_path: "./outputs/stream.jsonl"
    stream_file_rotation:
     enabled: true
     max_size_mb: 10
     max_files: 5
  3. Buffer Handler (buffer) - Collect in memory with stats

    stream_handler: "buffer"
    stream_buffer_config:
     max_size: 1000
     circular: true
     deduplication: true
  4. Callback Handler (callback) - Custom processing

    stream_handler: "callback"
    stream_callback_config:
     filter_types: ["text", "tool_use"]
     rate_limit: 10

Streaming Examples

# Simple streaming example
mix pipeline.run examples/clean_streaming_numbers.yaml
# Multi-message streaming with file operations
mix pipeline.run examples/streaming_file_operations.yaml
# Run streaming tests
mix test test/integration/async_streaming_test.exs

📖 Complete Streaming Guide: See examples/STREAMING_GUIDE.md for implementation details and docs/ASYNC_STREAMING_MIGRATION_GUIDE.md for adding streaming to existing pipelines.

Works with All Claude Step Types

Async streaming is supported by all Claude-based steps:

  • claude - Basic Claude step
  • claude_smart - With presets
  • claude_session - With session continuity
  • claude_extract - With extraction
  • claude_batch - Parallel streaming
  • claude_robust - With error recovery
  • parallel_claude - Multiple streams

Environment Configuration

For advanced configuration, you can set these environment variables:

# API Configuration
export GEMINI_API_KEY="your_gemini_api_key"
# Note: Claude uses CLI authentication (claude login), no API key needed
# Pipeline Directories
export PIPELINE_WORKSPACE_DIR="./workspace" # Claude's sandbox
export PIPELINE_OUTPUT_DIR="./outputs" # Result storage
export PIPELINE_CHECKPOINT_DIR="./checkpoints" # State management
# Logging and Debug
export PIPELINE_LOG_LEVEL="debug" # debug, info, warn, error
export PIPELINE_DEBUG="true" # Detailed execution logs
# Execution Mode
export TEST_MODE="live" # live, mock, mixed

Creating Your Own Workflows

  1. Start with the example: Copy examples/comprehensive_config_example.yaml
  2. Read the guides:
  3. Test in mock mode: Validate your workflow logic without API costs
  4. Run live: Execute with real AI providers when ready

Development

Running the Example

The run_example.exs script demonstrates the pipeline:

# Quick test with mocks
TEST_MODE=mock elixir run_example.exs
# Full test with APIs
elixir run_example.exs

Test Structure

test/
├── unit/ # Fast unit tests (mocked)
├── integration/ # Integration tests (live APIs) 
├── fixtures/ # Test data and workflows
└── support/ # Test helpers

Adding New Tests

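defmodule MyTest do
  use ExUnit.Case
  use Pipeline.TestCase # Provides test mode helpers

  test "my feature works in mock mode" do
    # Test automatically uses mocks.
    # The fixture path below is a hypothetical placeholder; use your own workflow file.
    {:ok, config} = Pipeline.load_workflow("test/fixtures/my_workflow.yaml")
    assert {:ok, _results} = Pipeline.execute(config)
  end
end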

Debugging

Enable debug output:

# See detailed execution logs
DEBUG=true elixir run_example.exs
# See API request/response details
VERBOSE=true elixir run_example.exs

Debug output includes:

  • Step execution flow
  • API request/response details
  • Provider selection (mock vs live)
  • Result processing

Common Issues

Claude CLI Not Authenticated

# Error: Claude CLI not found or not authenticated
claude login

Missing Gemini API Key

# Error: GEMINI_API_KEY environment variable not set
export GEMINI_API_KEY="your_key_here"

Mixed Results (Some Success, Some Failure)

# Check which providers are mocked vs live
TEST_MODE=mock elixir run_example.exs # All mocked
TEST_MODE=live elixir run_example.exs # All live

License

TODO: Add license information
