Real-world examples showcasing container orchestration, inline AI agents, and automation
Workflow Examples
Explore real-world examples that demonstrate Kubiya’s power in orchestrating containers, integrating AI agents, and creating complex automation workflows.
Basic Workflow Patterns
Sequential Workflow
Simple sequential execution of steps:
from kubiya_workflow_sdk import workflow

# Basic sequential workflow
wf = (
    workflow("data-pipeline")
    .description("ETL pipeline for data processing")
    .step("extract", "python extract.py")
    .step("transform", "python transform.py")
    .step("load", "python load.py")
)
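The same pipeline can also be written in the decorator style used by the later examples on this page; a minimal sketch, assuming `.shell()` accepts the same command strings as the builder's `.step()`:

from kubiya_workflow_sdk import workflow, step

@workflow
def data_pipeline():
    """ETL pipeline for data processing, decorator style"""
    extract = step("extract").shell("python extract.py")
    transform = step("transform").shell("python transform.py").depends("extract")
    load = step("load").shell("python load.py").depends("transform")
    return workflow("data-pipeline").add_steps([extract, transform, load])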
Parallel Execution
Run multiple tasks in parallel:
from kubiya_workflow_sdk import workflow

# Process multiple files in parallel
wf = (
    workflow("parallel-processor")
    .description("Process multiple files concurrently")
    .parallel_steps(
        "process-files",
        items=["file1.csv", "file2.csv", "file3.csv"],
        command="python process.py ${ITEM}",
        max_concurrent=2
    )
)
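A parallel group can fan back in by chaining another step after it; a minimal sketch, assuming the builder runs a subsequent `.step()` only after the whole group completes:

from kubiya_workflow_sdk import workflow

wf = (
    workflow("parallel-with-join")
    .description("Fan out over files, then aggregate the results")
    .parallel_steps(
        "process-files",
        items=["file1.csv", "file2.csv", "file3.csv"],
        command="python process.py ${ITEM}",
        max_concurrent=2
    )
    # Join step: aggregates once every item has been processed (assumed ordering)
    .step("aggregate", "python aggregate.py")
)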
Container Orchestration
Multi-Container Pipeline
Orchestrate different containers for each step:
from kubiya_workflow_sdk import workflow, step

@workflow
def build_and_deploy():
    """Build, test, and deploy an application using different containers"""
    # Build with Node.js
    build = step("build").docker(
        image="node:18-alpine",
        command="npm run build"
    ).env(NODE_ENV="production")

    # Test with a specialized test container
    test = step("test").docker(
        image="cypress/included:latest",
        command="cypress run"
    ).depends("build")

    # Deploy with kubectl
    deploy = step("deploy").docker(
        image="bitnami/kubectl:latest",
        command="kubectl apply -f k8s/"
    ).depends("test")

    return workflow("ci-cd-pipeline").add_steps([build, test, deploy])
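Since `.depends()` accepts a single name or a list (as in the ETL example later on this page), the same pattern extends to diamond-shaped graphs; a sketch where lint and test run in parallel after the build:

from kubiya_workflow_sdk import workflow, step

@workflow
def diamond_pipeline():
    """Build once, lint and test in parallel, deploy when both pass"""
    build = step("build").docker(image="node:18-alpine", command="npm run build")
    lint = step("lint").docker(image="node:18-alpine", command="npm run lint").depends("build")
    test = step("test").docker(image="node:18-alpine", command="npm test").depends("build")
    # Deploy waits for both branches to succeed
    deploy = step("deploy").docker(
        image="bitnami/kubectl:latest",
        command="kubectl apply -f k8s/"
    ).depends(["lint", "test"])
    return workflow("diamond-pipeline").add_steps([build, lint, test, deploy])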
Database Migration with Services
Use bounded services for database operations:
from kubiya_workflow_sdk import step

# Database migration with a temporary PostgreSQL service
migration_step = (
    step("db-migration")
    .tool_def(
        name="migrator",
        type="docker",
        image="migrate/migrate:latest",
        content="""#!/bin/sh
migrate -path /migrations -database postgres://user:pass@database:5432/db up
""",
        args=[],
        with_services=[{
            "name": "database",
            "image": "postgres:15-alpine",
            "exposed_ports": [5432],
            "env": {
                "POSTGRES_USER": "user",
                "POSTGRES_PASSWORD": "pass",
                "POSTGRES_DB": "db"
            }
        }]
    )
)
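Because the bounded service lives only for the tool's run, verification belongs in the same script; a sketch that also queries the applied schema version (`migrate ... version`) before the service is torn down:

from kubiya_workflow_sdk import step

# Variant: migrate and verify in one tool run, while the service is still up
migration_step = (
    step("db-migration-verified")
    .tool_def(
        name="migrator",
        type="docker",
        image="migrate/migrate:latest",
        content="""#!/bin/sh
set -e
migrate -path /migrations -database postgres://user:pass@database:5432/db up
# Confirm the applied schema version before the service is torn down
migrate -path /migrations -database postgres://user:pass@database:5432/db version
""",
        args=[],
        with_services=[{
            "name": "database",
            "image": "postgres:15-alpine",
            "exposed_ports": [5432],
            "env": {"POSTGRES_USER": "user", "POSTGRES_PASSWORD": "pass", "POSTGRES_DB": "db"}
        }]
    )
)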
AI Agent Integration
Serverless AI Agents in Workflows
Serverless Agents: Kubiya’s inline agents are serverless AI components that use state-of-the-art function calling to execute workflow tools. No agent framework is required; an inline agent is just another workflow step.
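Stripped to its core, an inline agent is configured with a message, instructions, a runner, and an optional tool list; a minimal skeleton using the parameters from the examples below:

from kubiya_workflow_sdk import step

summarize = (
    step("quick-summary")
    .inline_agent(
        message="Summarize this output: ${SOME_OUTPUT}",  # hypothetical upstream output variable
        agent_name="summarizer",
        ai_instructions="You are a concise technical summarizer.",
        runners=["kubiya-hosted"],  # serverless execution
        llm_model="gpt-4o",
        tools=[]  # optional Docker tools the agent can call
    )
    .output("SUMMARY")
)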
Inline AI Agent for Log Analysis
Integrate AI agents directly in workflows using inline_agent with function calling:
from kubiya_workflow_sdk import workflow, step

@workflow
def intelligent_log_analysis():
    """Analyze logs using a serverless AI agent with function calling"""
    # Fetch logs
    fetch = step("fetch-logs").docker(
        image="awscli/aws-cli:latest",
        command="aws logs tail /aws/lambda/my-function --since 1h --output text"
    ).output("LOG_CONTENT")

    # Serverless AI agent analyzes the logs
    analyze = (
        step("analyze-logs")
        .inline_agent(
            message="Analyze these logs and identify errors, patterns, and anomalies: ${LOG_CONTENT}",
            agent_name="log-analyzer",
            ai_instructions="""You are a log analysis expert.
Identify:
1. Error patterns and their frequency
2. Performance bottlenecks
3. Security concerns
4. Recommendations for improvement
Use the provided tools to gather additional context.
Format your response as structured JSON with keys:
error_summary, performance_issues, security_risks, recommendations""",
            runners=["kubiya-hosted"],  # Serverless execution
            llm_model="gpt-4o",
            is_debug_mode=True,
            # The agent uses function calling to execute these tools
            tools=[
                {
                    "name": "parse-json-logs",
                    "type": "docker",
                    "image": "stedolan/jq:latest",
                    "content": """#!/bin/sh
# Parse JSON-formatted log entries
cat /tmp/logs.txt | jq -r 'select(.level=="ERROR") | .message' | sort | uniq -c
""",
                    "args": []
                },
                {
                    "name": "extract-timestamps",
                    "type": "docker",
                    "image": "alpine:latest",
                    "content": """#!/bin/sh
# Extract and analyze timestamps
grep -oE '[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}' /tmp/logs.txt | sort | uniq -c
""",
                    "args": []
                }
            ]
        )
        .depends("fetch-logs")
        .output("ANALYSIS_RESULT")
    )

    # Generate report
    report = step("generate-report").docker(
        image="python:3.11-slim",
        script="""
import json
import os

analysis = json.loads(os.environ['ANALYSIS_RESULT'])

# Build a markdown report (single-quoted triple quotes so the f-string
# does not close the surrounding script string)
report = f'''# Log Analysis Report

## Error Summary
{analysis.get('error_summary', 'No errors found')}

## Performance Issues
{', '.join(analysis.get('performance_issues', [])) or 'None detected'}

## Security Risks
{', '.join(analysis.get('security_risks', [])) or 'None detected'}

## Recommendations
{chr(10).join('- ' + r for r in analysis.get('recommendations', []))}
'''

with open('/tmp/report.md', 'w') as f:
    f.write(report)
print("Report generated at /tmp/report.md")
"""
    ).depends("analyze-logs")

    return workflow("log-analysis").add_steps([fetch, analyze, report])
AI-Powered Code Review with Function Calling
Use inline agents for intelligent code review:
from kubiya_workflow_sdk import workflow, step

@workflow
def ai_code_review(pr_number: int):
    """AI-powered pull request review using serverless agents"""
    # Fetch PR diff
    fetch_pr = step("fetch-pr").docker(
        image="alpine/git:latest",
        command=f"git diff origin/main...origin/pr-{pr_number} > /tmp/pr.diff && cat /tmp/pr.diff"
    ).output("PR_DIFF")

    # Serverless AI agent reviews the code
    review = (
        step("ai-review")
        .inline_agent(
            message="""Review this pull request diff:
${PR_DIFF}

Provide feedback on:
1. Code quality and best practices
2. Potential bugs or issues
3. Performance implications
4. Security concerns
5. Suggestions for improvement""",
            agent_name="code-reviewer",
            ai_instructions="""You are a senior software engineer reviewing code.
Be constructive, specific, and provide actionable feedback.
Use the provided tools to analyze code complexity and patterns.
Point out both issues and good practices.
Format as markdown with code examples where relevant.
Structure your response with sections:
## Summary
## Issues Found
## Suggestions
## Security Considerations""",
            runners=["kubiya-hosted"],  # Serverless execution
            llm_model="gpt-4o",
            is_debug_mode=True,
            # Function calling for code analysis
            tools=[
                {
                    "name": "count-lines-changed",
                    "type": "docker",
                    "image": "alpine:latest",
                    "content": """#!/bin/sh
# Count added/removed lines
echo "Added: $(grep -c '^+' /tmp/pr.diff)"
echo "Removed: $(grep -c '^-' /tmp/pr.diff)"
""",
                    "args": []
                },
                {
                    "name": "detect-patterns",
                    "type": "docker",
                    "image": "alpine:latest",
                    "content": """#!/bin/sh
# Detect common problematic patterns
echo "Checking for common issues..."
grep -n "TODO\\|FIXME\\|HACK" /tmp/pr.diff || echo "No TODOs found"
grep -n "console\\.log\\|print(" /tmp/pr.diff || echo "No debug statements found"
""",
                    "args": []
                }
            ]
        )
        .depends("fetch-pr")
        .output("REVIEW_COMMENTS")
    )

    # Post review comment
    post = step("post-review").docker(
        image="alpine:latest",
        command=f"""
echo "Review for PR #{pr_number}:"
echo ""
cat << 'EOF'
${{REVIEW_COMMENTS}}
EOF
"""
    ).depends("ai-review")

    return workflow("pr-review").add_steps([fetch_pr, review, post])
Intelligent Incident Response
Serverless agents for automated incident response:
from kubiya_workflow_sdk import workflow, step

@workflow
def incident_response():
    """Automated incident response with AI decision making"""
    # Detect incident (simulated with a metrics check)
    detect = step("detect-incident").docker(
        image="alpine:latest",
        command="""
# Simulate incident detection
echo '{
  "service": "api-gateway",
  "error_rate": 15.2,
  "response_time": 2500,
  "status_5xx": 127,
  "timestamp": "'$(date -u +%Y-%m-%dT%H:%M:%SZ)'"
}'
"""
    ).output("INCIDENTS")

    # AI agent analyzes and responds
    respond = (
        step("ai-responder")
        .inline_agent(
            message="Analyze this incident and determine response: ${INCIDENTS}",
            agent_name="incident-commander",
            ai_instructions="""You are an incident response expert.
For the incident:
1. Assess severity (P1/P2/P3/P4)
2. Determine likely root cause
3. Recommend immediate actions
4. Create runbook steps for resolution
Use tools to gather system state before making recommendations.
Output JSON with structure:
{
  "severity": "P1-P4",
  "impact": "description",
  "root_cause": "likely cause",
  "immediate_actions": ["action1", "action2"],
  "runbook_steps": ["step1", "step2"],
  "escalation_needed": true|false
}""",
            # Serverless execution with powerful tools
            runners=["kubiya-hosted"],
            llm_model="gpt-4o",
            is_debug_mode=True,
            tools=[
                {
                    "name": "check-service-health",
                    "type": "docker",
                    "image": "curlimages/curl:latest",
                    "content": """#!/bin/sh
# Check service health endpoint
SERVICE_NAME=$(echo $1 | jq -r '.service')
echo "Checking health for $SERVICE_NAME..."
# Simulate health check
echo '{"status": "degraded", "healthy_instances": 2, "total_instances": 5}'
""",
                    "args": [{"name": "incident_data", "type": "string"}]
                },
                {
                    "name": "get-recent-deployments",
                    "type": "docker",
                    "image": "alpine:latest",
                    "content": """#!/bin/sh
# Check recent deployments
echo "Recent deployments:"
echo "- 2h ago: v2.3.1 (api-gateway)"
echo "- 4h ago: v1.8.0 (auth-service)"
echo "- 24h ago: v3.2.0 (database)"
""",
                    "args": []
                },
                {
                    "name": "check-resource-usage",
                    "type": "docker",
                    "image": "alpine:latest",
                    "content": """#!/bin/sh
# Simulate resource check
echo '{
  "cpu_usage": "78%",
  "memory_usage": "92%",
  "disk_usage": "45%",
  "network_in": "120MB/s",
  "network_out": "95MB/s"
}'
""",
                    "args": []
                }
            ]
        )
        .output("RESPONSE_PLAN")
        .depends("detect-incident")
    )

    # Execute response plan
    execute = step("execute-response").shell("""
echo "=== Executing Incident Response Plan ==="
echo ""
echo "Response Plan:"
echo "${RESPONSE_PLAN}" | jq '.'

SEVERITY=$(echo "${RESPONSE_PLAN}" | jq -r '.severity')
echo ""
echo "Severity: $SEVERITY"

if [ "$SEVERITY" = "P1" ] || [ "$SEVERITY" = "P2" ]; then
    echo "⚠️ HIGH PRIORITY INCIDENT - Executing immediate actions..."
    # In a real scenario, execute actual remediation here
fi
""").depends("ai-responder")

    # Send notification
    notify = step("notify-team").docker(
        image="alpine:latest",
        command="""
SEVERITY=$(echo "${RESPONSE_PLAN}" | jq -r '.severity')
IMPACT=$(echo "${RESPONSE_PLAN}" | jq -r '.impact')
echo "📢 Incident Alert:"
echo "Severity: $SEVERITY"
echo "Impact: $IMPACT"
echo "See full response plan above"
"""
    ).depends("execute-response")

    return workflow("incident-response").add_steps([detect, respond, execute, notify])
Tool Definitions
Custom CLI Tool
Define reusable tools inline:
from kubiya_workflow_sdk import step

# Slack notification tool
notify = (
    step("notify-team")
    .tool_def(
        name="slack-notifier",
        type="docker",
        image="curlimages/curl:latest",
        content="""#!/bin/sh
set -e
curl -X POST "$SLACK_WEBHOOK" \
  -H "Content-Type: application/json" \
  -d "{\\"text\\": \\"$message\\", \\"channel\\": \\"$channel\\"}"
""",
        args=[
            {"name": "message", "type": "string", "required": True},
            {"name": "channel", "type": "string", "default": "#general"},
            {"name": "SLACK_WEBHOOK", "type": "string", "required": True}
        ]
    )
    .args(
        message="Deployment completed successfully!",
        channel="#deployments",
        SLACK_WEBHOOK="${SLACK_WEBHOOK_URL}"
    )
)
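The defined tool behaves like any other step, so it can be gated on earlier steps; a sketch that fires the notification only after a deploy step finishes (assuming `.depends()` returns the step, as the fluent chains on this page suggest):

from kubiya_workflow_sdk import workflow, step

deploy = step("deploy").docker(
    image="bitnami/kubectl:latest",
    command="kubectl apply -f k8s/"
)

# `notify` is the tool-backed step defined above
wf = workflow("deploy-and-notify").add_steps([deploy, notify.depends("deploy")])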
Data Processing Tool with Files
Tool with configuration files:
from kubiya_workflow_sdk import step

# Tool with a mounted configuration file
transform_step = (
    step("transform-data")
    .tool_def(
        name="data-transformer",
        type="docker",
        image="python:3.11-slim",
        content="""#!/usr/bin/env python
import sys
import pandas as pd

df = pd.read_csv(sys.argv[1])
# Apply transformations from the mounted config
exec(open('/config/transform.py').read())
df.to_csv(sys.argv[2], index=False)
""",
        args=[
            {"name": "input_file", "type": "string"},
            {"name": "output_file", "type": "string"}
        ],
        with_files=[{
            "path": "/config/transform.py",
            "content": """
# Transformation logic
df['processed'] = df['value'] * 2
df = df[df['processed'] > 100]
"""
        }]
    )
    .args(input_file="data.csv", output_file="processed.csv")
)
Complex DAG Workflows
Multi-Stage Data Pipeline
Complex DAG with conditional execution:
from kubiya_workflow_sdk import workflow, step

@workflow
def etl_pipeline(source: str, target: str):
    """Complex ETL pipeline with validation and error handling"""
    # Extract from multiple sources
    extract_db = step("extract-db").docker(
        image="postgres:15",
        command="pg_dump -h ${DB_HOST} -U ${DB_USER} -d ${DB_NAME}"
    ).output("DB_DATA")

    extract_api = step("extract-api").docker(
        image="python:3.11",
        script="""
import json
import os
import requests

data = requests.get(os.environ['API_URL']).json()
print(json.dumps(data))
"""
    ).output("API_DATA")

    # Validate data quality
    validate = (
        step("validate")
        .docker(
            image="python:3.11",
            script="""
import json
import os
import sys

db_data = json.loads(os.environ['DB_DATA'])
api_data = json.loads(os.environ['API_DATA'])

# Validation logic
errors = []
if len(db_data) == 0:
    errors.append("No DB data")
if 'required_field' not in api_data:
    errors.append("Missing required field")

if errors:
    print(json.dumps({"valid": False, "errors": errors}))
    sys.exit(1)
else:
    print(json.dumps({"valid": True}))
"""
        )
        .depends(["extract-db", "extract-api"])
        .output("VALIDATION_RESULT")
    )

    # Transform data
    transform = (
        step("transform")
        .docker(
            image="apache/spark:3.4",
            command="spark-submit transform.py"
        )
        .depends("validate")
        .preconditions("${VALIDATION_RESULT.valid} == true")
    )

    # Load to warehouse
    load = (
        step("load")
        .docker(
            image="snowflake/snowcli:latest",
            command="snow stage copy @mystage ${TRANSFORMED_DATA}"
        )
        .depends("transform")
        .retry(limit=3, interval_sec=60)
    )

    # Error handling path
    error_notify = (
        step("notify-error")
        .inline_agent(
            message="Generate error report for validation failures: ${VALIDATION_RESULT}",
            agent_name="error-reporter",
            ai_instructions="Create a detailed error report with remediation steps",
            runners=["auto"]
        )
        .depends("validate")
        .preconditions("${VALIDATION_RESULT.valid} == false")
    )

    return workflow("etl-pipeline").add_steps([
        extract_db, extract_api, validate, transform, load, error_notify
    ])
Control Flow Examples
Retry and Error Handling
Robust error handling patterns:
from kubiya_workflow_sdk import step
# Step with comprehensive retry policy
resilient_step = (
    step("fetch-external-data")
    .docker(
        image="curlimages/curl:latest",
        command="curl -f https://api.example.com/data"
    )
    .retry(
        limit=5,
        interval_sec=30,
        exponential_base=2.0,
        max_interval_sec=300
    )
    .timeout(300)  # 5-minute timeout
    .continue_on(
        exit_code=[404, 503],  # Continue on specific errors
        mark_success=True
    )
    .output("API_RESPONSE")
)

# Cleanup that always runs
cleanup = (
    step("cleanup")
    .shell("rm -rf /tmp/workspace/*")
    .continue_on(failure=True)  # Always run, even on failure
)
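With `exponential_base`, the delay before each attempt grows geometrically and is capped by `max_interval_sec`. The SDK does not spell the formula out here, so as an assumed illustration of the schedule the policy above would produce:

# Hypothetical illustration: delay = interval_sec * base ** (attempt - 1), capped
interval_sec, base, max_interval_sec, limit = 30, 2.0, 300, 5
for attempt in range(1, limit + 1):
    delay = min(interval_sec * base ** (attempt - 1), max_interval_sec)
    print(f"retry {attempt}: wait {delay:.0f}s")
# -> 30s, 60s, 120s, 240s, 300s (capped)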
Conditional Execution
Execute steps based on conditions:
# Production deployment with safety checks
deploy_prod = (
    step("deploy-production")
    .docker(
        image="bitnami/kubectl:latest",
        command="kubectl apply -f production.yaml"
    )
    .preconditions(
        "${TEST_RESULT.coverage} > 80",
        "${SECURITY_SCAN.vulnerabilities} == 0"
    )
    .depends(["test", "security-scan"])
)

# Staging deployment (less strict)
deploy_staging = (
    step("deploy-staging")
    .docker(
        image="bitnami/kubectl:latest",
        command="kubectl apply -f staging.yaml"
    )
    .preconditions("${TEST_RESULT.passed} == true")
    .depends("test")
)
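Both targets can live in the same workflow, with the preconditions deciding which branch runs; a sketch that supplies the outputs the gates read (the coverage flag and Trivy scan are illustrative choices, not part of the example above):

from kubiya_workflow_sdk import workflow, step

test = step("test").docker(
    image="node:18-alpine",
    command="npm test -- --coverage"
).output("TEST_RESULT")

security_scan = step("security-scan").docker(
    image="aquasec/trivy:latest",
    command="trivy fs --format json ."
).output("SECURITY_SCAN")

# deploy_staging and deploy_prod are the gated steps defined above
wf = workflow("gated-deploy").add_steps([test, security_scan, deploy_staging, deploy_prod])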
ADK Provider Integration
While the workflow DSL focuses on execution, the ADK provider can generate workflows using AI:
import asyncio

from kubiya_workflow_sdk.providers import get_provider

# Use the ADK provider to generate workflows from natural language
async def generate_workflow():
    adk = get_provider("adk")

    # Generate a workflow from a description
    result = await adk.compose(
        task="Create a CI/CD pipeline that builds a Node.js app, runs tests, and deploys to Kubernetes",
        mode="plan"  # Just generate, don't execute
    )

    # The provider generates a complete workflow using the SDK
    print(result.workflow_yaml)

    # Execute if desired
    if result.workflow:
        execution = await adk.compose(
            task="Execute the generated workflow",
            mode="act",  # Generate and execute
            workflow=result.workflow
        )

asyncio.run(generate_workflow())
Best Practices
1. Use Appropriate Executors
# Shell for simple commands
step("list-files").shell("ls -la")

# Docker for isolated environments
step("build").docker("node:18", "npm run build")

# Inline agent for AI tasks
step("analyze").inline_agent(
    message="Analyze this data",
    agent_name="analyzer",
    ai_instructions="You are a data analyst..."
)
2. Handle Dependencies Properly
# Single dependency
step("deploy").depends("test")
# Multiple dependencies
step("report").depends(["analyze", "validate", "transform"])
# Conditional dependencies
step("notify").depends("deploy").preconditions("${DEPLOY_STATUS} == 'success'")
3. Capture and Use Outputs
# Capture output
step("get-version").shell("git describe --tags").output("VERSION")
# Use in next step
step("build").docker(
"docker:latest",
"docker build -t myapp:${VERSION} ."
).depends("get-version")
4. Error Handling
# Retry transient failures
step("download").retry(limit=3, interval_sec=30)
# Continue on specific errors
step("optional-check").continue_on(exit_code=[404])
# Always run cleanup
step("cleanup").continue_on(failure=True)