ADK Streaming

Streaming is a core feature of ADK that enables real-time feedback during workflow generation and execution. This creates responsive AI applications where users see progress as it happens.

Why Streaming Matters

When using ADK to generate and execute workflows, operations can take time:

  • AI Generation: The LLM needs time to analyze and create workflows
  • Execution: Workflows may have multiple steps that run sequentially
  • Tool Calls: Context gathering and tool execution add latency

Streaming solves this by providing immediate feedback at every stage, creating a better user experience.

Architecture Overview

Streaming Formats

Standard format for web applications:

async for event in adk.compose(
    task="Deploy application",
    stream=True,
    stream_format="sse"
):
    # Events formatted as SSE
    # data: {"type": "text", "content": "Generating workflow..."}
    print(event)

Event Types

Generation Events

Events emitted during workflow generation:

# Text generation
{
    "type": "text",
    "content": "Analyzing requirements..."
}

# Tool calls (context loading)
{
    "type": "tool_call",
    "name": "get_runners",
    "arguments": {}
}

# Tool results
{
    "type": "tool_result",
    "name": "get_runners",
    "result": {"runners": [...]}
}

# Workflow ready
{
    "type": "workflow",
    "data": {
        "name": "deploy-app",
        "steps": [...]
    }
}

Execution Events

Events during workflow execution:

# Execution start
{
    "type": "execution_start",
    "workflow": "deploy-app",
    "run_id": "run_123"
}

# Step progress
{
    "type": "step_start",
    "step": "build-image",
    "index": 1
}

{
    "type": "step_output",
    "step": "build-image",
    "output": "Building Docker image..."
}

{
    "type": "step_complete",
    "step": "build-image",
    "status": "success",
    "duration": 45.2
}

# Execution complete
{
    "type": "execution_complete",
    "status": "success",
    "duration": 120.5
}

Streaming Examples

Basic Streaming

from kubiya_workflow_sdk.providers import get_provider
import asyncio

async def stream_workflow_generation():
    adk = get_provider("adk")
    
    async for event in adk.compose(
        task="Create a backup workflow for PostgreSQL databases",
        mode="plan",
        stream=True
    ):
        # Parse SSE format
        if event.startswith("data: "):
            data = json.loads(event[6:])
            
            if data["type"] == "text":
                print(f"AI: {data['content']}")
            elif data["type"] == "workflow":
                print(f"Generated workflow: {data['data']['name']}")

asyncio.run(stream_workflow_generation())

Streaming with UI Updates

# For a web application
async def handle_request(task: str, websocket):
    adk = get_provider("adk")
    
    async for event in adk.compose(
        task=task,
        mode="act",
        stream=True,
        stream_format="vercel"
    ):
        # Send to frontend via WebSocket
        await websocket.send_json(event)
        
        # Update UI based on event type
        if event.get("type") == "step_complete":
            await update_progress_bar(event["step"])

Error Handling in Streams

async def stream_with_error_handling():
    adk = get_provider("adk")
    
    try:
        async for event in adk.compose(
            task="Deploy to production",
            mode="act",
            stream=True
        ):
            if event.get("type") == "error":
                # Handle errors gracefully
                await notify_error(event["message"])
                break
            
            # Process normal events
            await process_event(event)
            
    except StreamingError as e:
        # Handle streaming-specific errors
        logger.error(f"Streaming failed: {e}")

Advanced Streaming Features

Filtering Events

Control which events you receive:

async for event in adk.compose(
    task="...",
    stream=True,
    stream_filter={
        "include_tool_calls": False,  # Skip tool events
        "include_thoughts": True,      # Include AI reasoning
        "include_metrics": True        # Include performance data
    }
):
    process_filtered_event(event)

Event Buffering

Handle high-frequency events:

from collections import deque
import asyncio

class EventBuffer:
    def __init__(self, max_size=100):
        self.buffer = deque(maxlen=max_size)
        self.lock = asyncio.Lock()
    
    async def add(self, event):
        async with self.lock:
            self.buffer.append(event)
    
    async def process_batch(self):
        async with self.lock:
            batch = list(self.buffer)
            self.buffer.clear()
        
        # Process events in batch
        await bulk_update_ui(batch)

# Usage
buffer = EventBuffer()

async for event in adk.compose(task="...", stream=True):
    await buffer.add(event)
    
    # Process buffer periodically
    if len(buffer.buffer) >= 10:
        await buffer.process_batch()

Stream Transformation

Transform events before processing:

class StreamTransformer:
    def __init__(self, adk_stream):
        self.stream = adk_stream
    
    async def __aiter__(self):
        async for event in self.stream:
            # Transform event
            transformed = await self.transform(event)
            if transformed:
                yield transformed
    
    async def transform(self, event):
        # Add timestamps
        event["timestamp"] = datetime.now().isoformat()
        
        # Enhance with metadata
        if event.get("type") == "step_complete":
            event["metrics"] = await get_step_metrics(event["step"])
        
        return event

# Usage
stream = adk.compose(task="...", stream=True)
transformer = StreamTransformer(stream)

async for enhanced_event in transformer:
    process_enhanced_event(enhanced_event)

Integration Examples

FastAPI Streaming Endpoint

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json

app = FastAPI()

@app.post("/generate-workflow")
async def generate_workflow(request: WorkflowRequest):
    adk = get_provider("adk")
    
    async def event_generator():
        async for event in adk.compose(
            task=request.task,
            mode=request.mode,
            stream=True,
            stream_format="sse"
        ):
            yield f"{event}\n\n"
    
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )

React Frontend Integration

// React component for streaming
function WorkflowGenerator() {
  const [events, setEvents] = useState([]);
  
  const generateWorkflow = async (task) => {
    const response = await fetch('/generate-workflow', {
      method: 'POST',
      body: JSON.stringify({ task, mode: 'act' }),
      headers: { 'Content-Type': 'application/json' }
    });
    
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      
      const chunk = decoder.decode(value);
      const event = JSON.parse(chunk.replace('data: ', ''));
      
      setEvents(prev => [...prev, event]);
      
      // Update UI based on event type
      if (event.type === 'workflow') {
        displayWorkflow(event.data);
      }
    }
  };
  
  // Render events...
}

Performance Considerations

Debugging Streams

Event Logging

Log all streaming events:

import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("stream_debug")

async for event in adk.compose(task="...", stream=True):
    logger.debug(f"Event: {json.dumps(event, indent=2)}")
    process_event(event)

Stream Recording

Record streams for replay:

class StreamRecorder:
    def __init__(self):
        self.events = []
        self.start_time = time.time()
    
    async def record(self, stream):
        async for event in stream:
            self.events.append({
                "timestamp": time.time() - self.start_time,
                "event": event
            })
            yield event
    
    def save(self, filename):
        with open(filename, 'w') as f:
            json.dump(self.events, f, indent=2)

# Usage
recorder = StreamRecorder()
stream = adk.compose(task="...", stream=True)

async for event in recorder.record(stream):
    process_event(event)

recorder.save("stream_recording.json")

Best Practices

Use Appropriate Format

Choose SSE for web apps, Vercel for AI SDK compatibility

Handle Errors Gracefully

Always implement error handling for network issues

Implement Timeouts

Set reasonable timeouts to prevent hanging

Monitor Performance

Track streaming metrics and optimize buffers

Troubleshooting

Next Steps