The Orchestration API provides advanced features for workflow management, execution control, and multi-provider orchestration beyond the basic compose endpoint.
Overview
The Orchestration API extends the basic compose functionality with:
Provider Management : Switch between different orchestration providers (ADK, MCP, etc.)
Workflow Refinement : Iterative workflow improvement and validation
Execution Control : Advanced execution monitoring and control
Multi-Modal Operations : Support for different input/output formats
Core Endpoints
Provider Management
List Providers
Returns available orchestration providers and their capabilities.
Response:
{
"providers" : {
"adk" : {
"name" : "ADKProvider" ,
"available" : true ,
"features" : {
"generation" : true ,
"execution" : true ,
"refinement" : true ,
"streaming" : true ,
"compose" : true
}
},
"mcp" : {
"name" : "MCPProvider" ,
"available" : true ,
"features" : {
"generation" : true ,
"execution" : false ,
"refinement" : false ,
"streaming" : true ,
"compose" : true
}
}
},
"default" : "adk"
}
Switch Provider
Request Body:
{
"provider" : "mcp" ,
"config" : {
"timeout" : 60000 ,
"max_retries" : 3
}
}
Workflow Generation
Generate Workflow
Generate a workflow without execution (similar to compose in plan mode).
Request Body:
{
"task" : "Deploy application to Kubernetes" ,
"context" : {
"environment" : "staging" ,
"namespace" : "app-staging"
},
"provider" : "adk" ,
"options" : {
"include_validation" : true ,
"format" : "yaml"
}
}
Response:
{
"workflow" : {
"name" : "deploy-to-kubernetes" ,
"description" : "Deploy application to Kubernetes staging environment" ,
"steps" : [
{
"name" : "validate_environment" ,
"tool" : "kubectl" ,
"parameters" : {
"command" : "get namespace app-staging"
}
}
]
},
"validation" : {
"valid" : true ,
"issues" : [],
"suggestions" : []
},
"metadata" : {
"generated_at" : "2024-01-15T10:30:00Z" ,
"provider" : "adk" ,
"model" : "meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo"
}
}
Workflow Refinement
Refine Workflow
Improve an existing workflow based on feedback or additional requirements.
Request Body:
{
"workflow" : {
"name" : "deploy-app" ,
"steps" : [ ... ]
},
"requirements" : [
"Add health checks after deployment" ,
"Include rollback strategy" ,
"Add monitoring setup"
],
"context" : {
"previous_errors" : [
"Deployment failed due to missing health check"
]
}
}
Workflow Execution
Execute Workflow
Execute a pre-generated workflow with real-time monitoring.
Request Body:
{
"workflow" : {
"name" : "deploy-app" ,
"steps" : [ ... ]
},
"parameters" : {
"image_tag" : "v1.2.3" ,
"replicas" : 3
},
"options" : {
"stream" : true ,
"timeout" : 300000 ,
"on_failure" : "rollback"
}
}
Streaming Response (SSE):
event: execution_start
data: {"workflow": "deploy-app", "run_id": "exec_123", "timestamp": "2024-01-15T10:30:00Z"}
event: step_start
data: {"step": "validate_environment", "index": 0}
event: step_output
data: {"step": "validate_environment", "output": "Namespace exists"}
event: step_complete
data: {"step": "validate_environment", "status": "success", "duration": 1.2}
event: execution_complete
data: {"status": "success", "duration": 45.7, "outputs": {...}}
Advanced Features
Multi-Provider Orchestration
Advanced orchestration that can utilize multiple providers for different aspects of workflow generation and execution.
Request Body:
{
"task" : "Set up comprehensive monitoring for microservices" ,
"strategy" : {
"generation" : {
"provider" : "adk" ,
"model" : "deepseek-ai/DeepSeek-V3"
},
"validation" : {
"provider" : "mcp" ,
"tools" : [ "kubernetes-validator" , "prometheus-validator" ]
},
"execution" : {
"provider" : "adk" ,
"runner" : "kubernetes-runner"
}
},
"context" : {
"services" : [ "api" , "web" , "worker" ],
"infrastructure" : "kubernetes" ,
"monitoring_stack" : "prometheus+grafana"
}
}
Workflow Templates
List Templates
Get available workflow templates.
Create from Template
POST /templates/{template_id}/instantiate
Create a workflow from a template with specific parameters.
Session Management
Create Session
Create a conversation session for multi-turn workflow development.
Request Body:
{
"context" : {
"project" : "microservices-deployment" ,
"environment" : "production"
},
"preferences" : {
"provider" : "adk" ,
"streaming" : true
}
}
Response:
{
"session_id" : "sess_abc123" ,
"created_at" : "2024-01-15T10:30:00Z" ,
"context" : { ... },
"preferences" : { ... }
}
Continue Session
POST /sessions/{session_id}/continue
Continue a workflow development session with context preservation.
Usage Examples
TypeScript Client
export class OrchestrationClient {
constructor ( private baseUrl : string , private apiKey : string ) {}
// Provider management
async listProviders () {
const response = await fetch ( ` ${ this . baseUrl } /providers` , {
headers: { 'Authorization' : `Bearer ${ this . apiKey } ` }
});
return response . json ();
}
async switchProvider ( provider : string , config ?: any ) {
const response = await fetch ( ` ${ this . baseUrl } /providers/switch` , {
method: 'POST' ,
headers: {
'Authorization' : `Bearer ${ this . apiKey } ` ,
'Content-Type' : 'application/json'
},
body: JSON . stringify ({ provider , config })
});
return response . json ();
}
// Workflow operations
async generateWorkflow ( task : string , context ?: any ) {
const response = await fetch ( ` ${ this . baseUrl } /generate` , {
method: 'POST' ,
headers: {
'Authorization' : `Bearer ${ this . apiKey } ` ,
'Content-Type' : 'application/json'
},
body: JSON . stringify ({ task , context })
});
return response . json ();
}
async refineWorkflow ( workflow : any , requirements : string []) {
const response = await fetch ( ` ${ this . baseUrl } /refine` , {
method: 'POST' ,
headers: {
'Authorization' : `Bearer ${ this . apiKey } ` ,
'Content-Type' : 'application/json'
},
body: JSON . stringify ({ workflow , requirements })
});
return response . json ();
}
// Streaming execution
async * executeWorkflow ( workflow : any , parameters ?: any ) {
const response = await fetch ( ` ${ this . baseUrl } /execute` , {
method: 'POST' ,
headers: {
'Authorization' : `Bearer ${ this . apiKey } ` ,
'Content-Type' : 'application/json'
},
body: JSON . stringify ({
workflow ,
parameters ,
options: { stream: true }
})
});
const reader = response . body ?. getReader ();
const decoder = new TextDecoder ();
while ( reader ) {
const { done , value } = await reader . read ();
if ( done ) break ;
const chunk = decoder . decode ( value );
const lines = chunk . split ( ' \n ' );
for ( const line of lines ) {
if ( line . startsWith ( 'data: ' )) {
yield JSON . parse ( line . slice ( 6 ));
}
}
}
}
}
Python Client
import asyncio
import httpx
from typing import AsyncGenerator, Dict, List, Optional
class OrchestrationClient :
def __init__ ( self , base_url : str , api_key : str ):
self .base_url = base_url
self .api_key = api_key
self .headers = { "Authorization" : f "Bearer { api_key } " }
async def generate_workflow (
self ,
task : str ,
context : Optional[Dict] = None
) -> Dict:
"""Generate a workflow without execution."""
async with httpx.AsyncClient() as client:
response = await client.post(
f " { self .base_url } /generate" ,
json = { "task" : task, "context" : context or {}},
headers = self .headers
)
response.raise_for_status()
return response.json()
async def refine_workflow (
self ,
workflow : Dict,
requirements : List[ str ],
context : Optional[Dict] = None
) -> Dict:
"""Refine an existing workflow."""
async with httpx.AsyncClient() as client:
response = await client.post(
f " { self .base_url } /refine" ,
json = {
"workflow" : workflow,
"requirements" : requirements,
"context" : context or {}
},
headers = self .headers
)
response.raise_for_status()
return response.json()
async def execute_workflow_stream (
self ,
workflow : Dict,
parameters : Optional[Dict] = None
) -> AsyncGenerator[Dict, None ]:
"""Execute workflow with streaming updates."""
async with httpx.AsyncClient() as client:
async with client.stream(
"POST" ,
f " { self .base_url } /execute" ,
json = {
"workflow" : workflow,
"parameters" : parameters or {},
"options" : { "stream" : True }
},
headers = self .headers
) as response:
response.raise_for_status()
async for line in response.aiter_lines():
if line.startswith( "data: " ):
yield json.loads(line[ 6 :])
Multi-Provider Workflow
async def create_comprehensive_deployment ():
client = OrchestrationClient( "http://localhost:8001" , api_key)
# Step 1: Generate initial workflow with ADK
workflow = await client.generate_workflow(
task = "Deploy microservices with monitoring and logging" ,
context = {
"services" : [ "api" , "web" , "worker" ],
"environment" : "production"
}
)
# Step 2: Refine with additional requirements
refined_workflow = await client.refine_workflow(
workflow[ "workflow" ],
requirements = [
"Add Prometheus monitoring for all services" ,
"Include ELK stack for centralized logging" ,
"Add health checks and readiness probes" ,
"Include rollback strategy for failed deployments"
]
)
# Step 3: Execute with streaming monitoring
print ( "🚀 Starting deployment..." )
async for event in client.execute_workflow_stream(
refined_workflow[ "workflow" ],
parameters = {
"image_tag" : "v2.1.0" ,
"replicas" : 3 ,
"monitoring_enabled" : True
}
):
event_type = event.get( "event" , "unknown" )
if event_type == "step_start" :
print ( f "▶️ Starting: { event[ 'step' ] } " )
elif event_type == "step_complete" :
status = event[ "status" ]
duration = event[ "duration" ]
emoji = "✅" if status == "success" else "❌"
print ( f " { emoji } { event[ 'step' ] } : { status } ( { duration } s)" )
elif event_type == "execution_complete" :
print ( f "🎉 Deployment complete! Status: { event[ 'status' ] } " )
Error Handling
HTTP Status Description Handling 200 Success Process response 400 Bad Request Fix request parameters 401 Unauthorized Check API key 404 Not Found Endpoint or resource not found 429 Rate Limited Implement backoff 500 Server Error Retry with exponential backoff
Authentication
All orchestration endpoints require authentication:
curl -X POST http://localhost:8001/generate \
-H "Authorization: Bearer $API_KEY " \
-H "Content-Type: application/json" \
-d '{"task": "Deploy to staging"}'
Best Practices
Use Sessions Create sessions for multi-turn workflow development
Stream Execution Always use streaming for workflow execution monitoring
Handle Failures Implement proper error handling and retry logic
Validate Workflows Use refinement to validate and improve workflows
Next Steps