Workflow Examples

Explore real-world examples that demonstrate Kubiya’s power in orchestrating containers, integrating AI agents, and creating complex automation workflows.

Basic Workflow Patterns

Sequential Workflow

Simple sequential execution of steps:

from kubiya_workflow_sdk import workflow

# Basic sequential workflow
wf = (
    workflow("data-pipeline")
    .description("ETL pipeline for data processing")
    .step("extract", "python extract.py")
    .step("transform", "python transform.py") 
    .step("load", "python load.py")
)

Parallel Execution

Run multiple tasks in parallel:

from kubiya_workflow_sdk import workflow

# Process multiple files in parallel
wf = (
    workflow("parallel-processor")
    .description("Process multiple files concurrently")
    .parallel_steps(
        "process-files",
        items=["file1.csv", "file2.csv", "file3.csv"],
        command="python process.py ${ITEM}",
        max_concurrent=2
    )
)
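
The ${ITEM} placeholder is substituted with each entry in items, and max_concurrent caps how many entries run at once. As a rough mental model (plain asyncio, not SDK code; process_file is a stand-in for the containerized command), the pattern is a semaphore-bounded fan-out:

import asyncio

async def process_file(item: str) -> None:
    # Stand-in for running "python process.py <item>" in a container
    print(f"processing {item}")
    await asyncio.sleep(1)

async def run_all(items: list[str], max_concurrent: int = 2) -> None:
    sem = asyncio.Semaphore(max_concurrent)  # at most two items in flight

    async def bounded(item: str) -> None:
        async with sem:
            await process_file(item)

    await asyncio.gather(*(bounded(i) for i in items))

asyncio.run(run_all(["file1.csv", "file2.csv", "file3.csv"]))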

Container Orchestration

Multi-Container Pipeline

Orchestrate different containers for each step:

from kubiya_workflow_sdk import workflow, step

@workflow
def build_and_deploy():
    """Build, test, and deploy application using different containers"""
    
    # Build with Node.js
    build = step("build").docker(
        image="node:18-alpine",
        command="npm run build"
    ).env(NODE_ENV="production")
    
    # Test with specialized test container
    test = step("test").docker(
        image="cypress/included:latest",
        command="cypress run"
    ).depends("build")
    
    # Deploy with kubectl
    deploy = step("deploy").docker(
        image="bitnami/kubectl:latest",
        command="kubectl apply -f k8s/"
    ).depends("test")
    
    return workflow("ci-cd-pipeline").add_steps([build, test, deploy])

Database Migration with Services

Use temporary service containers (via with_services) for database operations:

from kubiya_workflow_sdk import step

# Database migration with temporary PostgreSQL
migration_step = (
    step("db-migration")
    .tool_def(
        name="migrator",
        type="docker",
        image="migrate/migrate:latest",
        content="""#!/bin/sh
migrate -path /migrations -database postgres://user:pass@database:5432/db up
""",
        args=[],
        with_services=[{
            "name": "database",
            "image": "postgres:15-alpine",
            "exposed_ports": [5432],
            "env": {
                "POSTGRES_USER": "user",
                "POSTGRES_PASSWORD": "pass",
                "POSTGRES_DB": "db"
            }
        }]
    )
)

AI Agent Integration

Serverless AI Agents in Workflows

Serverless Agents: Kubiya’s inline agents are serverless AI components that use state-of-the-art function calling to execute workflow tools. No complex agent framework is needed, just simple workflow steps.

Inline AI Agent for Log Analysis

Integrate AI agents directly in workflows using inline_agent with function calling:

from kubiya_workflow_sdk import workflow, step

@workflow
def intelligent_log_analysis():
    """Analyze logs using serverless AI agent with function calling"""
    
    # Fetch logs
    fetch = step("fetch-logs").docker(
        image="awscli/aws-cli:latest",
        command="aws logs tail /aws/lambda/my-function --since 1h --output text"
    ).output("LOG_CONTENT")
    
    # Serverless AI agent analyzes logs
    analyze = (
        step("analyze-logs")
        .inline_agent(
            message="Analyze these logs and identify errors, patterns, and anomalies: ${LOG_CONTENT}",
            agent_name="log-analyzer",
            ai_instructions="""You are a log analysis expert. 
            Identify:
            1. Error patterns and their frequency
            2. Performance bottlenecks  
            3. Security concerns
            4. Recommendations for improvement
            
            Use the provided tools to gather additional context.
            Format your response as structured JSON with keys:
            error_summary, performance_issues, security_risks, recommendations""",
            runners=["kubiya-hosted"],  # Serverless execution
            llm_model="gpt-4o",
            is_debug_mode=True,
            
            # Agent uses function calling to execute tools
            tools=[
                {
                    "name": "parse-json-logs",
                    "type": "docker",
                    "image": "stedolan/jq:latest",
                    "content": """#!/bin/sh
# Parse JSON formatted log entries
cat /tmp/logs.txt | jq -r 'select(.level=="ERROR") | .message' | sort | uniq -c
""",
                    "args": []
                },
                {
                    "name": "extract-timestamps",
                    "type": "docker",
                    "image": "alpine:latest",
                    "content": """#!/bin/sh
# Extract and analyze timestamps
grep -oE '[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}' /tmp/logs.txt | sort | uniq -c
""",
                    "args": []
                }
            ]
        )
        .depends("fetch-logs")
        .output("ANALYSIS_RESULT")
    )
    
    # Generate report
    report = step("generate-report").docker(
        image="python:3.11-slim",
        script="""
import json
import os

analysis = json.loads(os.environ['ANALYSIS_RESULT'])

# Generate markdown report
report = f"""# Log Analysis Report

## Error Summary
{analysis.get('error_summary', 'No errors found')}

## Performance Issues
{', '.join(analysis.get('performance_issues', [])) or 'None detected'}

## Security Risks  
{', '.join(analysis.get('security_risks', [])) or 'None detected'}

## Recommendations
{chr(10).join('- ' + r for r in analysis.get('recommendations', []))}
"""

with open('/tmp/report.md', 'w') as f:
    f.write(report)
    
print("Report generated at /tmp/report.md")
"""
    ).depends("analyze-logs")
    
    return workflow("log-analysis").add_steps([fetch, analyze, report])

AI-Powered Code Review with Function Calling

Use inline agents for intelligent code review:

from kubiya_workflow_sdk import workflow, step

@workflow  
def ai_code_review(pr_number: int):
    """AI-powered pull request review using serverless agents"""
    
    # Fetch PR diff
    fetch_pr = step("fetch-pr").docker(
        image="alpine/git:latest",
        command=f"git diff origin/main...origin/pr-{pr_number} > /tmp/pr.diff && cat /tmp/pr.diff"
    ).output("PR_DIFF")
    
    # Serverless AI agent reviews the code
    review = (
        step("ai-review")
        .inline_agent(
            message="""Review this pull request diff:
            
${PR_DIFF}

Provide feedback on:
1. Code quality and best practices
2. Potential bugs or issues
3. Performance implications
4. Security concerns
5. Suggestions for improvement""",
            agent_name="code-reviewer",
            ai_instructions="""You are a senior software engineer reviewing code.
            Be constructive, specific, and provide actionable feedback.
            Use the provided tools to analyze code complexity and patterns.
            Point out both issues and good practices.
            Format as markdown with code examples where relevant.
            
            Structure your response with sections:
            ## Summary
            ## Issues Found
            ## Suggestions
            ## Security Considerations""",
            runners=["kubiya-hosted"],  # Serverless execution
            llm_model="gpt-4o",
            is_debug_mode=True,
            
            # Function calling for code analysis
            tools=[
                {
                    "name": "count-lines-changed",
                    "type": "docker",
                    "image": "alpine:latest", 
                    "content": """#!/bin/sh
# Count added/removed lines
echo "Added: $(grep -c '^+' /tmp/pr.diff)"
echo "Removed: $(grep -c '^-' /tmp/pr.diff)"
""",
                    "args": []
                },
                {
                    "name": "detect-patterns",
                    "type": "docker",
                    "image": "alpine:latest",
                    "content": """#!/bin/sh  
# Detect common problematic patterns
echo "Checking for common issues..."
grep -n "TODO\\|FIXME\\|HACK" /tmp/pr.diff || echo "No TODOs found"
grep -n "console\\.log\\|print(" /tmp/pr.diff || echo "No debug statements found"
""",
                    "args": []
                }
            ]
        )
        .depends("fetch-pr")
        .output("REVIEW_COMMENTS")
    )
    
    # Post review comment
    post = step("post-review").docker(
        image="alpine:latest",
        command=f"""
echo "Review for PR #{pr_number}:"
echo ""
cat << 'EOF'
${{REVIEW_COMMENTS}}
EOF
"""
    ).depends("ai-review")
    
    return workflow("pr-review").add_steps([fetch_pr, review, post])

Intelligent Incident Response

Serverless agents for automated incident response:

@workflow
def incident_response():
    """Automated incident response with AI decision making"""
    
    # Detect incident (simulate with metrics check)
    detect = step("detect-incident").docker(
        image="alpine:latest",
        command="""
# Simulate incident detection
echo '{
  "service": "api-gateway",
  "error_rate": 15.2,
  "response_time": 2500,
  "status_5xx": 127,
  "timestamp": "'$(date -u +%Y-%m-%dT%H:%M:%SZ)'"
}'
"""
    ).output("INCIDENTS")
    
    # AI agent analyzes and responds
    respond = (
        step("ai-responder")
        .inline_agent(
            message="Analyze this incident and determine response: ${INCIDENTS}",
            agent_name="incident-commander", 
            ai_instructions="""You are an incident response expert.
            For the incident:
            1. Assess severity (P1/P2/P3/P4)
            2. Determine likely root cause
            3. Recommend immediate actions
            4. Create runbook steps for resolution
            
            Use tools to gather system state before making recommendations.
            
            Output JSON with structure:
            {
              "severity": "P1-P4",
              "impact": "description",
              "root_cause": "likely cause",
              "immediate_actions": ["action1", "action2"],
              "runbook_steps": ["step1", "step2"],
              "escalation_needed": true|false
            }""",
            
            # Serverless execution with powerful tools
            runners=["kubiya-hosted"],
            llm_model="gpt-4o", 
            is_debug_mode=True,
            
            tools=[
                {
                    "name": "check-service-health",
                    "type": "docker",
                    "image": "curlimages/curl:latest",
                    "content": """#!/bin/sh
# Check service health endpoint
SERVICE_NAME=$(echo $1 | jq -r '.service')
echo "Checking health for $SERVICE_NAME..."
# Simulate health check
echo '{"status": "degraded", "healthy_instances": 2, "total_instances": 5}'
""",
                    "args": [{"name": "incident_data", "type": "string"}]
                },
                {
                    "name": "get-recent-deployments",
                    "type": "docker",
                    "image": "alpine:latest",
                    "content": """#!/bin/sh
# Check recent deployments
echo "Recent deployments:"
echo "- 2h ago: v2.3.1 (api-gateway)"
echo "- 4h ago: v1.8.0 (auth-service)"
echo "- 24h ago: v3.2.0 (database)"
""",
                    "args": []
                },
                {
                    "name": "check-resource-usage",
                    "type": "docker",
                    "image": "alpine:latest",
                    "content": """#!/bin/sh
# Simulate resource check
echo '{
  "cpu_usage": "78%",
  "memory_usage": "92%", 
  "disk_usage": "45%",
  "network_in": "120MB/s",
  "network_out": "95MB/s"
}'
""",
                    "args": []
                }
            ]
        )
        .output("RESPONSE_PLAN")
        .depends("detect-incident")
    )
    
    # Execute response plan
    execute = step("execute-response").shell("""
echo "=== Executing Incident Response Plan ==="
echo ""
echo "Response Plan:"
echo "${RESPONSE_PLAN}" | jq '.'

SEVERITY=$(echo "${RESPONSE_PLAN}" | jq -r '.severity')
echo ""
echo "Severity: $SEVERITY"

if [ "$SEVERITY" = "P1" ] || [ "$SEVERITY" = "P2" ]; then
    echo "⚠️  HIGH PRIORITY INCIDENT - Executing immediate actions..."
    # In real scenario, execute actual remediation
fi
""").depends("ai-responder")
    
    # Send notification
    notify = step("notify-team").docker(
        image="alpine:latest",
        command="""
SEVERITY=$(echo "${RESPONSE_PLAN}" | jq -r '.severity')
IMPACT=$(echo "${RESPONSE_PLAN}" | jq -r '.impact')

echo "📢 Incident Alert:"
echo "Severity: $SEVERITY"  
echo "Impact: $IMPACT"
echo "See full response plan above"
"""
    ).depends("execute-response")
    
    return workflow("incident-response").add_steps([detect, respond, execute, notify])

Tool Definitions

Custom CLI Tool

Define reusable tools inline:

from kubiya_workflow_sdk import step

# Slack notification tool
notify = (
    step("notify-team")
    .tool_def(
        name="slack-notifier",
        type="docker",
        image="curlimages/curl:latest",
        content="""#!/bin/sh
set -e
curl -X POST "$SLACK_WEBHOOK" \
  -H "Content-Type: application/json" \
  -d "{\\"text\\": \\"$message\\", \\"channel\\": \\"$channel\\"}"
""",
        args=[
            {"name": "message", "type": "string", "required": True},
            {"name": "channel", "type": "string", "default": "#general"},
            {"name": "SLACK_WEBHOOK", "type": "string", "required": True}
        ]
    )
    .args(
        message="Deployment completed successfully!",
        channel="#deployments",
        SLACK_WEBHOOK="${SLACK_WEBHOOK_URL}"
    )
)
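
The escaped quotes in the curl payload are easy to misread, so it can help to reproduce the JSON body with plain Python before pointing the tool at a real webhook:

import json

# Same fields the slack-notifier tool substitutes into its curl call
payload = {"text": "Deployment completed successfully!", "channel": "#deployments"}
print(json.dumps(payload))
# {"text": "Deployment completed successfully!", "channel": "#deployments"}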

Data Processing Tool with Files

Tool with configuration files:

transform_step = (
    step("transform-data")
    .tool_def(
        name="data-transformer",
        type="docker",
        image="python:3.11-slim",
        content="""#!/usr/bin/env python
import pandas as pd
import sys

df = pd.read_csv(sys.argv[1])
# Apply transformations from config
exec(open('/config/transform.py').read())
df.to_csv(sys.argv[2], index=False)
""",
        args=[
            {"name": "input_file", "type": "string"},
            {"name": "output_file", "type": "string"}
        ],
        with_files=[{
            "path": "/config/transform.py",
            "content": """
# Transformation logic
df['processed'] = df['value'] * 2
df = df[df['processed'] > 100]
"""
        }]
    )
    .args(input_file="data.csv", output_file="processed.csv")
)
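
To make the bundled /config/transform.py concrete, this is what it does to a small sample frame when run as plain pandas outside the workflow (the sample values are made up for illustration):

import pandas as pd

df = pd.DataFrame({"value": [40, 60, 80]})
df["processed"] = df["value"] * 2   # 80, 120, 160
df = df[df["processed"] > 100]      # keeps the 120 and 160 rows
print(df)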

Complex DAG Workflows

Multi-Stage Data Pipeline

Complex DAG with conditional execution:

from kubiya_workflow_sdk import workflow, step

@workflow
def etl_pipeline(source: str, target: str):
    """Complex ETL pipeline with validation and error handling"""
    
    # Extract from multiple sources
    extract_db = step("extract-db").docker(
        image="postgres:15",
        command="pg_dump -h ${DB_HOST} -U ${DB_USER} -d ${DB_NAME}"
    ).output("DB_DATA")
    
    extract_api = step("extract-api").docker(
        image="python:3.11",
        script="""
import os
import json
import requests
data = requests.get(os.environ['API_URL']).json()
print(json.dumps(data))
"""
    ).output("API_DATA")
    
    # Validate data quality
    validate = step("validate").docker(
        image="python:3.11",
        script="""
import json
import os
import sys
db_data = json.loads(os.environ['DB_DATA'])
api_data = json.loads(os.environ['API_DATA'])

# Validation logic
errors = []
if len(db_data) == 0:
    errors.append("No DB data")
if 'required_field' not in api_data:
    errors.append("Missing required field")

if errors:
    print(json.dumps({"valid": False, "errors": errors}))
    sys.exit(1)
else:
    print(json.dumps({"valid": True}))
"""
    ).depends(["extract-db", "extract-api"])
    .output("VALIDATION_RESULT")
    
    # Transform data
    transform = step("transform").docker(
        image="apache/spark:3.4",
        command="spark-submit transform.py"
    ).depends("validate")
    .preconditions("${VALIDATION_RESULT.valid} == true")
    
    # Load to warehouse
    load = step("load").docker(
        image="snowflake/snowcli:latest",
        command="snow stage copy @mystage ${TRANSFORMED_DATA}"
    ).depends("transform")
    .retry(limit=3, interval_sec=60)
    
    # Error handling path
    error_notify = (
        step("notify-error")
        .inline_agent(
            message="Generate error report for validation failures: ${VALIDATION_RESULT}",
            agent_name="error-reporter",
            ai_instructions="Create a detailed error report with remediation steps",
            runners=["auto"]
        )
        .depends("validate")
        .preconditions("${VALIDATION_RESULT.valid} == false")
    )
    
    return workflow("etl-pipeline").add_steps([
        extract_db, extract_api, validate, transform, load, error_notify
    ])
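
For intuition, the dependencies above form a small DAG: both extract steps feed validate, which gates transform on success and notify-error on failure, and load follows transform. A quick stdlib sketch (illustration only, not SDK code) of one valid execution order:

from graphlib import TopologicalSorter

# step -> steps it depends on, mirroring the .depends() calls above
dag = {
    "extract-db": set(),
    "extract-api": set(),
    "validate": {"extract-db", "extract-api"},
    "transform": {"validate"},
    "load": {"transform"},
    "notify-error": {"validate"},
}
print(list(TopologicalSorter(dag).static_order()))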

Control Flow Examples

Retry and Error Handling

Robust error handling patterns:

from kubiya_workflow_sdk import step

# Step with comprehensive retry policy
resilient_step = (
    step("fetch-external-data")
    .docker(
        image="curlimages/curl:latest",
        command="curl -f https://api.example.com/data"
    )
    .retry(
        limit=5,
        interval_sec=30,
        exponential_base=2.0,
        max_interval_sec=300
    )
    .timeout(300)  # 5 minute timeout
    .continue_on(
        exit_code=[404, 503],  # Continue on specific errors
        mark_success=True
    )
    .output("API_RESPONSE")
)
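
With exponential_base=2.0, the wait between attempts roughly doubles from interval_sec up to the max_interval_sec cap. A back-of-the-envelope sketch of the schedule these settings imply (assuming the parameters combine this way):

limit = 5
interval_sec = 30
exponential_base = 2.0
max_interval_sec = 300

# Delay before retry attempt n (1-based), capped at max_interval_sec
delays = [
    min(interval_sec * exponential_base ** (n - 1), max_interval_sec)
    for n in range(1, limit + 1)
]
print(delays)  # [30.0, 60.0, 120.0, 240.0, 300.0]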

# Cleanup that always runs
cleanup = (
    step("cleanup")
    .shell("rm -rf /tmp/workspace/*")
    .continue_on(failure=True)  # Always run, even on failure
)

Conditional Execution

Execute steps based on conditions:

# Production deployment with safety checks
deploy_prod = (
    step("deploy-production")
    .docker(
        image="kubectl:latest",
        command="kubectl apply -f production.yaml"
    )
    .preconditions(
        "${TEST_RESULT.coverage} > 80",
        "${SECURITY_SCAN.vulnerabilities} == 0"
    )
    .depends(["test", "security-scan"])
)

# Staging deployment (less strict)
deploy_staging = (
    step("deploy-staging")
    .docker(
        image="kubectl:latest", 
        command="kubectl apply -f staging.yaml"
    )
    .preconditions("${TEST_RESULT.passed} == true")
    .depends("test")
)

ADK Provider Integration

While the workflow DSL focuses on execution, the ADK provider can generate workflows using AI:

from kubiya_workflow_sdk.providers import get_provider

# Use ADK provider to generate workflows from natural language
async def generate_workflow():
    adk = get_provider("adk")
    
    # Generate workflow from description
    result = await adk.compose(
        task="Create a CI/CD pipeline that builds a Node.js app, runs tests, and deploys to Kubernetes",
        mode="plan"  # Just generate, don't execute
    )
    
    # The provider generates a complete workflow using the SDK
    print(result.workflow_yaml)
    
    # Execute if desired
    if result.workflow:
        execution = await adk.compose(
            task="Execute the generated workflow",
            mode="act",  # Generate and execute
            workflow=result.workflow
        )
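
Since compose is awaited, run the coroutine from a script entry point in the usual way:

import asyncio

asyncio.run(generate_workflow())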

Best Practices

1. Use Appropriate Executors

# Shell for simple commands
step("list-files").shell("ls -la")

# Docker for isolated environments
step("build").docker("node:18", "npm run build")

# Inline agent for AI tasks
step("analyze").inline_agent(
    message="Analyze this data",
    agent_name="analyzer",
    ai_instructions="You are a data analyst..."
)

2. Handle Dependencies Properly

# Single dependency
step("deploy").depends("test")

# Multiple dependencies
step("report").depends(["analyze", "validate", "transform"])

# Conditional dependencies
step("notify").depends("deploy").preconditions("${DEPLOY_STATUS} == 'success'")

3. Capture and Use Outputs

# Capture output
step("get-version").shell("git describe --tags").output("VERSION")

# Use in next step
step("build").docker(
    "docker:latest",
    "docker build -t myapp:${VERSION} ."
).depends("get-version")

4. Error Handling

# Retry transient failures
step("download").retry(limit=3, interval_sec=30)

# Continue on specific errors
step("optional-check").continue_on(exit_code=[404])

# Always run cleanup
step("cleanup").continue_on(failure=True)

Next Steps