The Orchestration API provides advanced features for workflow management, execution control, and multi-provider orchestration beyond the basic compose endpoint.

Overview

The Orchestration API extends the basic compose functionality with:

  • Provider Management: Switch between different orchestration providers (ADK, MCP, etc.)
  • Workflow Refinement: Iterative workflow improvement and validation
  • Execution Control: Advanced execution monitoring and control
  • Multi-Modal Operations: Support for different input/output formats

Core Endpoints

Provider Management

List Providers

GET /providers

Returns available orchestration providers and their capabilities.

Response:

{
  "providers": {
    "adk": {
      "name": "ADKProvider",
      "available": true,
      "features": {
        "generation": true,
        "execution": true,
        "refinement": true,
        "streaming": true,
        "compose": true
      }
    },
    "mcp": {
      "name": "MCPProvider", 
      "available": true,
      "features": {
        "generation": true,
        "execution": false,
        "refinement": false,
        "streaming": true,
        "compose": true
      }
    }
  },
  "default": "adk"
}
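The `features` map can be used to pick a provider before sending a request. The following is a minimal sketch, assuming the response shape shown above; the helper name `providers_supporting` is hypothetical and not part of the API.

```python
from typing import Any, Dict, List


def providers_supporting(response: Dict[str, Any], feature: str) -> List[str]:
    """Return ids of available providers whose flag for `feature` is true."""
    return [
        provider_id
        for provider_id, info in response.get("providers", {}).items()
        if info.get("available") and info.get("features", {}).get(feature)
    ]


# Sample data abridged from the GET /providers response above.
sample = {
    "providers": {
        "adk": {"available": True, "features": {"execution": True, "streaming": True}},
        "mcp": {"available": True, "features": {"execution": False, "streaming": True}},
    },
    "default": "adk",
}

print(providers_supporting(sample, "execution"))  # ['adk']
print(providers_supporting(sample, "streaming"))  # ['adk', 'mcp']
```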

Switch Provider

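POST /providers/switch

Switch to a different orchestration provider, optionally supplying configuration such as a timeout and retry limit.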

Request Body:

{
  "provider": "mcp",
  "config": {
    "timeout": 60000,
    "max_retries": 3
  }
}

Workflow Generation

Generate Workflow

POST /generate

Generate a workflow without execution (similar to compose in plan mode).

Request Body:

{
  "task": "Deploy application to Kubernetes",
  "context": {
    "environment": "staging",
    "namespace": "app-staging"
  },
  "provider": "adk",
  "options": {
    "include_validation": true,
    "format": "yaml"
  }
}

Response:

{
  "workflow": {
    "name": "deploy-to-kubernetes",
    "description": "Deploy application to Kubernetes staging environment",
    "steps": [
      {
        "name": "validate_environment",
        "tool": "kubectl",
        "parameters": {
          "command": "get namespace app-staging"
        }
      }
    ]
  },
  "validation": {
    "valid": true,
    "issues": [],
    "suggestions": []
  },
  "metadata": {
    "generated_at": "2024-01-15T10:30:00Z",
    "provider": "adk",
    "model": "meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo"
  }
}
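Before refining or executing a generated workflow, a caller may want to inspect the `validation` block. The sketch below assumes the response shape shown above and treats a missing `validation` block as "nothing reported"; the helper name `validation_issues` is hypothetical.

```python
from typing import Any, Dict, List


def validation_issues(response: Dict[str, Any]) -> List[Any]:
    """Return reported issues, or an empty list when none are reported."""
    validation = response.get("validation")
    # Assumption: no validation block (or valid == true) means nothing to fix.
    if not validation or validation.get("valid", True):
        return []
    # The workflow was flagged invalid; fall back to a generic message
    # if the server did not list specific issues.
    return list(validation.get("issues") or ["workflow marked invalid"])


ok = {"validation": {"valid": True, "issues": [], "suggestions": []}}
bad = {"validation": {"valid": False, "issues": ["missing namespace"]}}

print(validation_issues(ok))   # []
print(validation_issues(bad))  # ['missing namespace']
```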

Workflow Refinement

Refine Workflow

POST /refine

Improve an existing workflow based on feedback or additional requirements.

Request Body:

{
  "workflow": {
    "name": "deploy-app",
    "steps": [...]
  },
  "requirements": [
    "Add health checks after deployment",
    "Include rollback strategy",
    "Add monitoring setup"
  ],
  "context": {
    "previous_errors": [
      "Deployment failed due to missing health check"
    ]
  }
}
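The request body above can be assembled programmatically. This is a sketch only: it assumes `context` is optional (the client examples in this document send it inconsistently), and `build_refine_request` is a hypothetical helper, not part of the API.

```python
from typing import Any, Dict, List, Optional


def build_refine_request(
    workflow: Dict[str, Any],
    requirements: List[str],
    previous_errors: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Build a POST /refine body in the shape shown above."""
    if not requirements:
        raise ValueError("at least one requirement is needed to refine a workflow")
    body: Dict[str, Any] = {"workflow": workflow, "requirements": list(requirements)}
    # Only attach context when there is something to report.
    if previous_errors:
        body["context"] = {"previous_errors": list(previous_errors)}
    return body


request_body = build_refine_request(
    {"name": "deploy-app"},
    ["Add health checks after deployment"],
    previous_errors=["Deployment failed due to missing health check"],
)
print(request_body["context"]["previous_errors"])
```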

Workflow Execution

Execute Workflow

POST /execute

Execute a pre-generated workflow with real-time monitoring.

Request Body:

{
  "workflow": {
    "name": "deploy-app",
    "steps": [...]
  },
  "parameters": {
    "image_tag": "v1.2.3",
    "replicas": 3
  },
  "options": {
    "stream": true,
    "timeout": 300000,
    "on_failure": "rollback"
  }
}

Streaming Response (SSE):

event: execution_start
data: {"workflow": "deploy-app", "run_id": "exec_123", "timestamp": "2024-01-15T10:30:00Z"}

event: step_start  
data: {"step": "validate_environment", "index": 0}

event: step_output
data: {"step": "validate_environment", "output": "Namespace exists"}

event: step_complete
data: {"step": "validate_environment", "status": "success", "duration": 1.2}

event: execution_complete
data: {"status": "success", "duration": 45.7, "outputs": {...}}
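In the stream above, the event type is carried on the `event:` line and the payload on the `data:` line, so a consumer has to pair the two. The following sketch parses an iterable of already-decoded lines; `parse_sse` is a hypothetical helper and assumes every `data:` payload is JSON, as in the sample.

```python
import json
from typing import Any, Dict, Iterable, Iterator, List, Tuple


def parse_sse(lines: Iterable[str]) -> Iterator[Tuple[str, Dict[str, Any]]]:
    """Yield (event_name, payload) pairs from an iterable of SSE lines."""
    event_name = "message"  # SSE default when no "event:" field is sent
    data_parts: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current event.
            if data_parts:
                yield event_name, json.loads("\n".join(data_parts))
            event_name, data_parts = "message", []
        elif line.startswith("event:"):
            event_name = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_parts.append(line[len("data:"):].lstrip(" "))
    # Design choice: flush a final event even without a trailing blank line.
    if data_parts:
        yield event_name, json.loads("\n".join(data_parts))


stream = [
    "event: execution_start",
    'data: {"workflow": "deploy-app", "run_id": "exec_123"}',
    "",
    "event: step_complete",
    'data: {"step": "validate_environment", "status": "success", "duration": 1.2}',
    "",
]
events = list(parse_sse(stream))
for name, payload in events:
    print(name, payload)
```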

Advanced Features

Multi-Provider Orchestration

POST /orchestrate

Orchestrate a task across multiple providers by assigning a provider to each stage: generation, validation, and execution.

Request Body:

{
  "task": "Set up comprehensive monitoring for microservices",
  "strategy": {
    "generation": {
      "provider": "adk",
      "model": "deepseek-ai/DeepSeek-V3"
    },
    "validation": {
      "provider": "mcp",
      "tools": ["kubernetes-validator", "prometheus-validator"]
    },
    "execution": {
      "provider": "adk",
      "runner": "kubernetes-runner"
    }
  },
  "context": {
    "services": ["api", "web", "worker"],
    "infrastructure": "kubernetes",
    "monitoring_stack": "prometheus+grafana"
  }
}
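A strategy can be checked against the capabilities reported by GET /providers before it is submitted. The sketch below assumes that the `generation` and `execution` stage names correspond to the feature flags of the same name, which this document does not state explicitly; `unsupported_stages` is a hypothetical helper.

```python
from typing import Any, Dict, List, Sequence


def unsupported_stages(
    strategy: Dict[str, Any],
    providers_response: Dict[str, Any],
    stages: Sequence[str] = ("generation", "execution"),
) -> List[str]:
    """List 'stage: provider' pairs the provider does not appear to support."""
    providers = providers_response.get("providers", {})
    problems = []
    for stage in stages:
        provider_id = strategy.get(stage, {}).get("provider")
        if provider_id is None:
            continue  # stage not configured; nothing to check
        info = providers.get(provider_id)
        if not info or not info.get("available") or not info.get("features", {}).get(stage):
            problems.append(f"{stage}: {provider_id}")
    return problems


providers_response = {
    "providers": {
        "adk": {"available": True, "features": {"generation": True, "execution": True}},
        "mcp": {"available": True, "features": {"generation": True, "execution": False}},
    }
}
strategy = {
    "generation": {"provider": "adk"},
    "validation": {"provider": "mcp"},
    "execution": {"provider": "adk"},
}
print(unsupported_stages(strategy, providers_response))  # []
```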

Workflow Templates

List Templates

GET /templates

Get available workflow templates.

Create from Template

POST /templates/{template_id}/instantiate

Create a workflow from a template with specific parameters.

Session Management

Create Session

POST /sessions

Create a conversation session for multi-turn workflow development.

Request Body:

{
  "context": {
    "project": "microservices-deployment",
    "environment": "production"
  },
  "preferences": {
    "provider": "adk",
    "streaming": true
  }
}

Response:

{
  "session_id": "sess_abc123",
  "created_at": "2024-01-15T10:30:00Z",
  "context": {...},
  "preferences": {...}
}

Continue Session

POST /sessions/{session_id}/continue

Continue a workflow development session with context preservation.
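The `session_id` returned by POST /sessions is what ties later turns to the session. As a small sketch, the continue URL can be derived from that response; the helper name `continue_url` is hypothetical, and the request body for the continue call is not documented here, so it is left out.

```python
from typing import Any, Dict
from urllib.parse import quote


def continue_url(base_url: str, session: Dict[str, Any]) -> str:
    """Build the POST /sessions/{session_id}/continue URL for a session."""
    # Percent-encode the id so unexpected characters cannot alter the path.
    session_id = quote(str(session["session_id"]), safe="")
    return f"{base_url.rstrip('/')}/sessions/{session_id}/continue"


session = {"session_id": "sess_abc123", "created_at": "2024-01-15T10:30:00Z"}
print(continue_url("http://localhost:8001", session))
```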

Usage Examples

TypeScript Client

export class OrchestrationClient {
  constructor(private baseUrl: string, private apiKey: string) {}
  
  // Provider management
  async listProviders() {
    const response = await fetch(`${this.baseUrl}/providers`, {
      headers: { 'Authorization': `Bearer ${this.apiKey}` }
    });
    return response.json();
  }
  
  async switchProvider(provider: string, config?: any) {
    const response = await fetch(`${this.baseUrl}/providers/switch`, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${this.apiKey}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({ provider, config })
    });
    return response.json();
  }
  
  // Workflow operations
  async generateWorkflow(task: string, context?: any) {
    const response = await fetch(`${this.baseUrl}/generate`, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${this.apiKey}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({ task, context })
    });
    return response.json();
  }
  
  async refineWorkflow(workflow: any, requirements: string[]) {
    const response = await fetch(`${this.baseUrl}/refine`, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${this.apiKey}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({ workflow, requirements })
    });
    return response.json();
  }
  
  // Streaming execution
  async *executeWorkflow(workflow: any, parameters?: any) {
    const response = await fetch(`${this.baseUrl}/execute`, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${this.apiKey}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({ 
        workflow, 
        parameters,
        options: { stream: true }
      })
    });
    
    const reader = response.body?.getReader();
    const decoder = new TextDecoder();
    
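    // Buffer partial lines: a network chunk can end in the middle of an SSE line.
    let buffer = '';
    
    while (reader) {
      const { done, value } = await reader.read();
      if (done) break;
      
      // stream: true keeps multi-byte characters intact across chunk boundaries
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop() ?? '';
      
      for (const line of lines) {
        if (line.startsWith('data: ')) {
          yield JSON.parse(line.slice(6));
        }
      }
    }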
  }
}

Python Client

import asyncio
import json
import httpx
from typing import AsyncGenerator, Dict, List, Optional

class OrchestrationClient:
    def __init__(self, base_url: str, api_key: str):
        self.base_url = base_url
        self.api_key = api_key
        self.headers = {"Authorization": f"Bearer {api_key}"}
    
    async def generate_workflow(
        self, 
        task: str, 
        context: Optional[Dict] = None
    ) -> Dict:
        """Generate a workflow without execution."""
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.base_url}/generate",
                json={"task": task, "context": context or {}},
                headers=self.headers
            )
            response.raise_for_status()
            return response.json()
    
    async def refine_workflow(
        self,
        workflow: Dict,
        requirements: List[str],
        context: Optional[Dict] = None
    ) -> Dict:
        """Refine an existing workflow."""
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.base_url}/refine",
                json={
                    "workflow": workflow,
                    "requirements": requirements,
                    "context": context or {}
                },
                headers=self.headers
            )
            response.raise_for_status()
            return response.json()
    
    async def execute_workflow_stream(
        self,
        workflow: Dict,
        parameters: Optional[Dict] = None
    ) -> AsyncGenerator[Dict, None]:
        """Execute workflow with streaming updates."""
        async with httpx.AsyncClient() as client:
            async with client.stream(
                "POST",
                f"{self.base_url}/execute",
                json={
                    "workflow": workflow,
                    "parameters": parameters or {},
                    "options": {"stream": True}
                },
                headers=self.headers
            ) as response:
                response.raise_for_status()
                
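                event_name = None
                async for line in response.aiter_lines():
                    # Remember the SSE event name so callers can dispatch on it.
                    if line.startswith("event:"):
                        event_name = line[len("event:"):].strip()
                    elif line.startswith("data: "):
                        yield {"event": event_name, **json.loads(line[6:])}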

Multi-Provider Workflow

async def create_comprehensive_deployment(api_key: str):
    client = OrchestrationClient("http://localhost:8001", api_key)
    
    # Step 1: Generate initial workflow with ADK
    workflow = await client.generate_workflow(
        task="Deploy microservices with monitoring and logging",
        context={
            "services": ["api", "web", "worker"],
            "environment": "production"
        }
    )
    
    # Step 2: Refine with additional requirements
    refined_workflow = await client.refine_workflow(
        workflow["workflow"],
        requirements=[
            "Add Prometheus monitoring for all services",
            "Include ELK stack for centralized logging", 
            "Add health checks and readiness probes",
            "Include rollback strategy for failed deployments"
        ]
    )
    
    # Step 3: Execute with streaming monitoring
    print("🚀 Starting deployment...")
    async for event in client.execute_workflow_stream(
        refined_workflow["workflow"],
        parameters={
            "image_tag": "v2.1.0",
            "replicas": 3,
            "monitoring_enabled": True
        }
    ):
        event_type = event.get("event", "unknown")
        
        if event_type == "step_start":
            print(f"▶️  Starting: {event['step']}")
        elif event_type == "step_complete":
            status = event["status"]
            duration = event["duration"]
            emoji = "✅" if status == "success" else "❌"
            print(f"{emoji} {event['step']}: {status} ({duration}s)")
        elif event_type == "execution_complete":
            print(f"🎉 Deployment complete! Status: {event['status']}")

Error Handling

HTTP Status   Description     Handling
200           Success         Process response
400           Bad Request     Fix request parameters
401           Unauthorized    Check API key
404           Not Found       Endpoint or resource not found
429           Rate Limited    Implement backoff
500           Server Error    Retry with exponential backoff
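The handling column can be turned into a small retry policy. The sketch below retries only 429 and 500, per the table, with a capped exponential delay; the function name, base delay, and cap are illustrative assumptions, not values defined by the API.

```python
from typing import Optional

RETRYABLE_STATUSES = {429, 500}  # the statuses for which the table suggests backoff


def retry_delay(
    status: int, attempt: int, base: float = 0.5, cap: float = 30.0
) -> Optional[float]:
    """Return seconds to wait before retry number `attempt` (0-based), or None.

    None means the status is not retryable and the caller should fix the
    request (400, 401, 404) or process the response (200).
    """
    if status not in RETRYABLE_STATUSES:
        return None
    return min(cap, base * (2 ** attempt))


for attempt in range(4):
    print(attempt, retry_delay(500, attempt))
```

In practice a random jitter is often added to each delay so that many clients do not retry at the same moment.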

Authentication

All orchestration endpoints require authentication:

curl -X POST http://localhost:8001/generate \
  -H "Authorization: Bearer $API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"task": "Deploy to staging"}'
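The same request can be built with Python's standard library. This sketch only constructs the request object and does not send it; the helper name `build_generate_request` is hypothetical, and the URL and body mirror the curl example above.

```python
import json
import urllib.request


def build_generate_request(base_url: str, api_key: str, task: str) -> urllib.request.Request:
    """Build an authenticated POST /generate request without sending it."""
    body = json.dumps({"task": task}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url.rstrip('/')}/generate",
        data=body,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


req = build_generate_request("http://localhost:8001", "secret", "Deploy to staging")
print(req.get_method(), req.full_url)
# To send it: urllib.request.urlopen(req)
```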

Best Practices

Use Sessions

Create sessions for multi-turn workflow development

Stream Execution

Always use streaming for workflow execution monitoring

Handle Failures

Implement proper error handling and retry logic

Validate Workflows

Use refinement to validate and improve workflows

Next Steps