Advanced Workflows

Master the advanced features of Kubiya workflows for production-grade automation.

Advanced Step Types

1. Shell Steps

Execute shell commands in any container:

result = step.shell(
    name="process-data",
    image="alpine:latest",
    command="""
    echo "Processing..."
    cat input.json | jq '.data[]' > output.json
    """,
    env={"API_KEY": "${secrets.API_KEY}"}
)

2. Python Steps

Run Python scripts with access to the full Python package ecosystem:

analysis = step.python(
    name="analyze-metrics",
    image="python:3.11-slim",
    packages=["pandas", "numpy", "scikit-learn"],
    script="""
import pandas as pd
import numpy as np

df = pd.read_csv('/data/metrics.csv')
summary = df.describe()
print(summary.to_json())
    """,
    volumes={"/data": "metrics-volume"}
)

3. Container Steps

Run any containerized application:

server = step.container(
    name="web-server",
    image="nginx:alpine",
    ports={"80": "8080"},
    volumes={
        "./config": "/etc/nginx/conf.d",
        "./static": "/usr/share/nginx/html"
    },
    health_check={
        "test": ["CMD", "curl", "-f", "http://localhost/health"],
        "interval": "30s",
        "retries": 3
    }
)

4. Inline Agent Steps

Embed AI decision-making:

decision = step.inline_agent(
    name="deployment-decision",
    message="Analyze these metrics and decide if we should deploy: ${metrics}",
    runners=["kubiya-hosted"],
    llm_model="gpt-4",
    is_debug_mode=True,
    tools=[
        {
            "name": "check-metrics",
            "type": "docker",
            "image": "datadog/agent:latest",
            "content": "datadog-check.sh",
            "args": {"threshold": "0.95"}
        }
    ]
)

Streaming and Real-Time Updates

SSE Streaming

from kubiya_workflow_sdk import Client
import json

client = Client(api_key="your-key")

# Stream workflow execution
for event in client.execute_workflow(workflow, stream=True):
    if event.type == "step.started":
        print(f"Starting: {event.data['step_name']}")
    elif event.type == "log":
        print(f"Log: {event.data['message']}")
    elif event.type == "step.completed":
        print(f"Completed: {event.data['step_name']}")

Vercel AI SDK Format

# For integration with Vercel AI SDK
async for chunk in client.stream_workflow(workflow, format="vercel"):
    # Chunk format compatible with Vercel AI SDK
    if chunk['type'] == '0':  # Text delta
        print(chunk['value'], end='')
    elif chunk['type'] == '8':  # Tool call
        tool_data = json.loads(chunk['value'])
        print(f"Tool: {tool_data['toolName']}")

Custom Stream Processing

class WorkflowStreamProcessor:
    def __init__(self):
        self.steps_completed = 0
        self.logs = []
        
    async def process_stream(self, workflow):
        async for event in workflow.stream():
            if event.type == "step.completed":
                self.steps_completed += 1
                await self.notify_progress(self.steps_completed)
            elif event.type == "log":
                self.logs.append(event.data)
                await self.send_to_monitoring(event.data)
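
A minimal usage sketch follows. The notify_progress and send_to_monitoring hooks are not part of the SDK; they are stand-ins you would replace with calls to your own progress and monitoring systems, and the workflow passed in is assumed to be an object exposing the stream() iterator used above:

import asyncio

class LoggingStreamProcessor(WorkflowStreamProcessor):
    # Stand-in hooks; swap these for your own progress/monitoring integrations
    async def notify_progress(self, completed):
        print(f"{completed} steps completed")

    async def send_to_monitoring(self, log_data):
        pass

async def run_processor(workflow):
    processor = LoggingStreamProcessor()
    await processor.process_stream(workflow)
    print(f"Collected {len(processor.logs)} log events")

# asyncio.run(run_processor(workflow))  # pass a workflow built as in the earlier examples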

Custom Providers

Creating a Custom Provider

from kubiya_workflow_sdk.providers import BaseProvider
from typing import Dict, Any, AsyncGenerator, Optional

class CustomLLMProvider(BaseProvider):
    """Custom provider for your LLM"""
    
    def __init__(self, api_key: str, model: str = "custom-model"):
        self.api_key = api_key
        self.model = model
        
    async def compose(
        self, 
        task: str,
        mode: str = "plan",
        context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Generate workflow from natural language"""
        # Your LLM logic here
        workflow = await self._call_llm(task, context)
        return {
            "workflow": workflow,
            "metadata": {"model": self.model}
        }
        
    async def stream_compose(
        self,
        task: str,
        mode: str = "plan"
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """Stream workflow generation"""
        async for chunk in self._stream_llm(task):
            yield {
                "type": "token",
                "content": chunk
            }

Registering Custom Provider

from kubiya_workflow_sdk.providers import register_provider, get_provider

# Register your provider
register_provider("custom-llm", CustomLLMProvider)

# Use it
provider = get_provider("custom-llm", api_key="...")
workflow = await provider.compose("Deploy my app")

Kubernetes Deployment

Create a local runner from the Kubiya platform (web interface, REST API, or CLI): give the runner a name and you will receive a Kubernetes manifest. Deploy that manifest on your cluster, then reference the runner's name when executing workflows, as shown below.
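
A rough sketch of the flow (the runner name my-k8s-runner and the manifest file runner-manifest.yaml are placeholders for whatever you chose when creating the runner):

# Apply the manifest downloaded from the Kubiya platform (one-time, from your terminal):
#   kubectl apply -f runner-manifest.yaml

from kubiya_workflow_sdk import Client, Workflow, Step

workflow = Workflow(
    name="runner-smoke-test",
    steps=[Step("hello", "echo 'hello from my cluster'")]
)

client = Client(api_key="your-key")
result = client.execute_workflow(
    workflow,
    runner="my-k8s-runner"  # the name you gave the runner when creating it
)
print(result.success)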

Local Testing

# test_workflow.py
import pytest
from kubiya_workflow_sdk import Workflow, Client, Step

def test_data_pipeline():
    workflow = Workflow(
        name="test-pipeline",
        steps=[
            Step("fetch", "curl -o data.json https://api.example.com/data"),
            Step("process", "jq '.items[]' data.json > processed.json"),
            Step("validate", "python validate.py processed.json")
        ]
    )
    
    # Use client with your test runner
    client = Client(api_key="test-key")
    result = client.execute_workflow(
        workflow,
        runner="your-test-runner"  # Created via Kubiya platform
    )
    assert result.success
    assert len(result.steps) == 3

Debug Mode

# Enable debug mode for detailed logs
workflow = Workflow(
    name="debug-workflow",
    debug=True,
    log_level="DEBUG"
)

# Step-through debugging
for step in workflow.steps:
    print(f"Executing: {step.name}")
    result = step.execute(breakpoint=True)
    
    # Inspect state
    print(f"Output: {result.output}")
    print(f"Logs: {result.logs}")
    
    # Continue or modify
    if input("Continue? (y/n): ") != "y":
        break

Production Best Practices

1. Error Handling

@workflow
def production_pipeline():
    try:
        data = step.fetch_critical_data(
            retry=3,
            retry_interval="exponential",
            timeout="5m",
            on_failure="continue"  # Don't fail entire workflow
        )
    except StepError as e:
        # Fallback logic
        data = step.fetch_cached_data()
        step.alert_team(f"Using cached data: {e}")

2. Monitoring & Observability

# Add monitoring hooks
workflow.add_hook("before_step", lambda s: metrics.increment(f"step.{s.name}.started"))
workflow.add_hook("after_step", lambda s, r: metrics.histogram(f"step.{s.name}.duration", r.duration))

# Custom spans
with workflow.span("critical-section"):
    result = step.critical_operation()

3. Secrets Management

# Use Kubernetes secrets
step.database_operation(
    env={
        "DB_PASSWORD": "${k8s:secret/db-credentials/password}",
        "API_KEY": "${vault:secret/api-keys/external}"
    }
)

# Or environment variables
step.secure_operation(
    env={
        "TOKEN": "${env:SECURE_TOKEN}",
        "CERT": "${file:/secrets/cert.pem}"
    }
)

4. Caching & Artifacts

# Cache step outputs
@step.cache(key="data-fetch-${date}", ttl="1h")
def fetch_expensive_data():
    return step.shell("./fetch-data.sh")

# Store artifacts
step.generate_report(
    output_artifacts={
        "report.pdf": "/reports/",
        "metrics.json": "/metrics/"
    }
)

Advanced Patterns

Dynamic DAG Generation

@workflow
def dynamic_pipeline(regions: list):
    # Generate steps dynamically
    results = []
    for region in regions:
        result = step.process_region(
            name=f"process-{region}",
            env={"REGION": region},
            parallel=True  # Run all regions in parallel
        )
        results.append(result)
    
    # Wait for all and aggregate
    step.aggregate_results(
        inputs=results,
        wait_for_all=True
    )

Conditional Workflows

@workflow
def conditional_deployment():
    test_result = step.run_tests()
    
    if test_result.exit_code == 0:
        if test_result.coverage > 80:
            step.deploy_to_production()
        else:
            step.deploy_to_staging()
            step.alert_team("Low coverage: ${test_result.coverage}%")
    else:
        step.rollback()
        step.create_incident()

Map-Reduce Patterns

@workflow
def map_reduce_analysis():
    # Map phase - parallel processing
    chunks = step.split_data(chunks=10)
    
    processed = step.map(
        over=chunks,
        operation=lambda chunk: step.process_chunk(
            data=chunk,
            image="processor:latest"
        )
    )
    
    # Reduce phase
    result = step.reduce(
        data=processed,
        operation="merge",
        image="reducer:latest"
    )

Troubleshooting

Common Issues

Performance Optimization

1. Parallel Execution

# Maximize parallelism
workflow.configure(
    max_parallel_steps=20,
    queue_size=100
)

2. Resource Pooling

# Reuse containers for similar steps
workflow.configure(
    container_reuse=True,
    pool_size=10
)

3. Caching Strategies

# Layer caching for builds
step.build_image(
    cache_from=["registry/base:latest"],
    build_args={"BUILDKIT_INLINE_CACHE": "1"}
)

Next Steps