compose

POST /compose
curl --request POST \
  --url https://api.kubiya.ai/api/v1/compose \
  --header 'Authorization: Bearer <api-key>' \
  --header 'Content-Type: application/json' \
  --data '{
  "task": "<string>",
  "context": {
    "preferred_runner": "<string>",
    "available_tools": [
      {}
    ],
    "constraints": [
      {}
    ],
    "environment": "<string>"
  },
  "parameters": {},
  "mode": "<string>",
  "stream": true,
  "stream_format": "<string>",
  "session_id": "<string>",
  "user_id": "<string>"
}'

The Compose API is the main entry point for AI-powered workflow generation. It handles everything from understanding your request to generating and optionally executing workflows.

Overview

The compose() method provides a unified interface for:

  • Natural language to workflow transformation
  • Automatic validation and refinement
  • Optional execution with streaming
  • Multiple output formats (JSON, YAML, streaming)

HTTP Endpoint

POST /compose
Content-Type: application/json
Authorization: Bearer your-api-key

Request Body

task (string, required)

The task description in natural language. Be specific and include requirements.

Examples:

  • “Create a workflow to backup PostgreSQL databases daily”
  • “Deploy a containerized app to Kubernetes with health checks”
  • “Set up CI/CD pipeline for Python project with tests”

context (object)

Additional context to guide workflow generation.
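
For example, a sketch of passing context through the SDK (field names are taken from the request schema above; `adk` is an ADK provider instance, as configured under Advanced Configuration below):

result = await adk.compose(
    task="Create a workflow to backup PostgreSQL databases daily",
    context={
        "preferred_runner": "kubiya-hosted",          # runner to target
        "environment": "production",                  # deployment environment
        "constraints": ["run during off-peak hours"]  # illustrative constraint
    },
    stream=False
)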

parameters (object)

Execution parameters (only used in "act" mode).

mode (string, default: "plan")

Operation mode:

  • "plan": Generate workflow only
  • "act": Generate and execute workflow

stream (boolean, default: true)

Enable streaming response. When true, returns Server-Sent Events.

stream_format (string, default: "sse")

Streaming format:

  • "sse": Server-Sent Events format
  • "vercel": Vercel AI SDK format
  • null: Raw ADK events

session_id (string)

Session ID for conversation continuity. Automatically generated if not provided.
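
This enables multi-turn refinement. A sketch of reusing one session across calls (the fixed ID is illustrative; omit it to have one generated):

session_id = "sess-backup-0142"  # illustrative fixed ID

plan = await adk.compose(
    task="Create a workflow to backup PostgreSQL databases daily",
    session_id=session_id,
    stream=False
)

refined = await adk.compose(
    task="Also compress the backup before storing it",
    session_id=session_id,
    stream=False
)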

user_id (string)

User ID for namespacing and tracking. Defaults to "default_user".

Response Format

{
  "workflow": {
    "name": "backup-databases",
    "description": "Automated database backup workflow",
    "runner": "kubiya-hosted",
    "steps": [
      {
        "name": "backup_postgres",
        "tool": "pg_dump",
        "parameters": {
          "database": "production",
          "output": "/backups/db-backup.sql"
        }
      }
    ]
  },
  "metadata": {
    "generated_at": "2024-01-15T10:30:00Z",
    "model": "gpt-4o",
    "tokens_used": 1250
  }
}
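
In Python, a non-streaming plan response can be inspected directly (a sketch, assuming the result is a dict shaped like the JSON above):

result = await adk.compose(task="Backup PostgreSQL databases daily", stream=False)

workflow = result["workflow"]
print(workflow["name"])                    # e.g. "backup-databases"
for step in workflow["steps"]:
    print(step["name"], step.get("tool"))
print(result["metadata"]["tokens_used"])   # token accounting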

Usage Examples

curl -X POST http://localhost:8001/compose \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer your-api-key" \
  -d '{
    "task": "Deploy my Node.js app to Kubernetes staging",
    "mode": "plan",
    "context": {
      "environment": "staging",
      "preferred_runner": "k8s-staging"
    }
  }'
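
The same request through the SDK (a sketch; `adk` as configured under Advanced Configuration below):

result = await adk.compose(
    task="Deploy my Node.js app to Kubernetes staging",
    mode="plan",
    context={
        "environment": "staging",
        "preferred_runner": "k8s-staging"
    },
    stream=False
)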

Streaming Examples

import json

# `adk` is an ADK provider instance; see get_provider(...) under
# Advanced Configuration below.

# Stream the generation process
async for event in adk.compose(
    task="Deploy application to Kubernetes",
    mode="plan",
    stream=True
):
    # Handle SSE events ("data: {...}" lines)
    if event.startswith("data: "):
        data = json.loads(event[6:])
        print(f"{data['type']}: {data.get('content', '')}")

Event Types

When streaming is enabled, various event types are emitted:

// Text generation progress
{"type": "text", "content": "Analyzing requirements..."}

// Tool calls (loading context)
{"type": "tool_call", "name": "get_runners", "arguments": {}}

// Tool results  
{"type": "tool_result", "name": "get_runners", "result": [...]}

// Workflow ready
{"type": "workflow", "data": {...}}

Error Handling

Errors can surface at any stage of composition (context loading, generation, validation, execution); handle them in client code.
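
A minimal sketch of catching them, assuming ProviderError is the SDK's provider exception (the same type caught in the retry pattern later on this page):

import logging

logger = logging.getLogger(__name__)

try:
    result = await adk.compose(
        task="Backup PostgreSQL databases daily",
        mode="plan",
        stream=False
    )
except ProviderError as exc:  # SDK provider exception; import path omitted
    logger.error("Workflow composition failed: %s", exc)
    raise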

Advanced Configuration

Custom Models

from kubiya_workflow_sdk.providers.adk import ADKConfig

config = ADKConfig(
    model_overrides={
        "workflow_generator": "together_ai/Qwen/QwQ-32B-Preview",
        "refinement": "together_ai/deepseek-ai/DeepSeek-V3"
    }
)

adk = get_provider("adk", config=config)

Performance Tuning

config = ADKConfig(
    max_loop_iterations=5,      # More refinement attempts
    timeout=600,                # 10 minute timeout
    enable_caching=True,        # Cache context loading
    stream_buffer_size=2048     # Larger streaming buffer
)

Custom Filters

# Filter streaming events
async for event in adk.compose(
    task="...",
    stream=True,
    stream_filter={
        "include_tool_calls": False,  # Skip tool events
        "include_thoughts": True,      # Include reasoning
        "min_importance": "medium"     # Filter by importance
    }
):
    process_filtered_event(event)

Integration Examples

FastAPI Endpoint

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from kubiya_workflow_sdk.providers import get_provider  # assumed import path

app = FastAPI()

class ComposeRequest(BaseModel):
    task: str
    mode: str = "plan"
    context: dict = {}
    parameters: dict = {}

@app.post("/api/compose")
async def compose_workflow(request: ComposeRequest):
    try:
        adk = get_provider("adk")
        
        if request.mode == "plan":
            # Non-streaming for plan mode
            result = await adk.compose(
                task=request.task,
                context=request.context,
                mode="plan",
                stream=False
            )
            return result
        else:
            # For act mode, use websocket instead
            raise HTTPException(
                status_code=400,
                detail="Use WebSocket endpoint for act mode"
            )
            
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
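
A quick local test of this endpoint (a sketch, assuming the app is served on port 8000):

import httpx

resp = httpx.post(
    "http://localhost:8000/api/compose",
    json={"task": "Backup PostgreSQL databases daily", "mode": "plan"},
    timeout=120.0
)
resp.raise_for_status()
print(resp.json())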

WebSocket Streaming

from fastapi import WebSocket
import json

@app.websocket("/ws/compose")
async def compose_stream(websocket: WebSocket):
    await websocket.accept()
    
    try:
        # Receive request
        data = await websocket.receive_json()
        
        adk = get_provider("adk")
        
        # Stream responses
        async for event in adk.compose(
            task=data["task"],
            mode=data.get("mode", "plan"),
            context=data.get("context", {}),
            parameters=data.get("parameters", {}),
            stream=True,
            stream_format="vercel"
        ):
            await websocket.send_text(event)
            
    except Exception as e:
        await websocket.send_json({
            "type": "error",
            "message": str(e)
        })
    finally:
        await websocket.close()
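
A matching client sketch using the third-party websockets package (the URL assumes a local server on port 8000):

import asyncio
import json

import websockets

async def main():
    async with websockets.connect("ws://localhost:8000/ws/compose") as ws:
        await ws.send(json.dumps({"task": "Deploy application to Kubernetes"}))
        async for message in ws:
            print(message)  # events streamed by the server until it closes

asyncio.run(main())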

Best Practices

  • Be Specific: Provide detailed task descriptions for better results.
  • Use Context: Include relevant context about your environment.
  • Handle Errors: Always implement proper error handling.
  • Monitor Usage: Track token usage and generation times.

Common Patterns

Retry with Refinement

import asyncio

async def compose_with_retry(task: str, max_attempts: int = 3):
    errors: list[str] = []
    for attempt in range(max_attempts):
        try:
            # Feed prior errors back as context so refinement can react to them
            return await adk.compose(
                task=task,
                context={
                    "attempt": attempt + 1,
                    "previous_errors": errors
                }
            )
        except ProviderError as e:
            errors.append(str(e))
            if attempt == max_attempts - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # exponential backoff

Progress Tracking

class ProgressTracker:
    def __init__(self):
        self.stages = {
            "context_loading": False,
            "generation": False,
            "validation": False,
            "execution": False
        }
    
    async def track_compose(self, adk, task, mode):
        async for event in adk.compose(task=task, mode=mode, stream=True):
            # Update progress based on events
            if "Loading context" in str(event):
                self.stages["context_loading"] = True
            elif "Generating workflow" in str(event):
                self.stages["generation"] = True
            # ... etc
            
            yield event
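
Usage wraps the normal streaming loop (a sketch):

tracker = ProgressTracker()
async for event in tracker.track_compose(adk, "Deploy application to Kubernetes", "plan"):
    print(event)  # events pass through while stage flags update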

Troubleshooting