Advanced Workflows
Master the advanced features of Kubiya workflows for production-grade automation.
Advanced Step Types
1. Shell Steps
Execute shell commands in any container:
result = step.shell(
    name="process-data",
    image="alpine:latest",
    command="""
    echo "Processing..."
    cat input.json | jq '.data[]' > output.json
    """,
    env={"API_KEY": "${secrets.API_KEY}"}
)
2. Python Steps
Run Python scripts with access to the full package ecosystem:
analysis = step.python(
    name="analyze-metrics",
    image="python:3.11-slim",
    packages=["pandas", "numpy", "scikit-learn"],
    script="""
    import pandas as pd
    import numpy as np

    df = pd.read_csv('/data/metrics.csv')
    summary = df.describe()
    print(summary.to_json())
    """,
    volumes={"/data": "metrics-volume"}
)
3. Container Steps
Run any containerized application:
server = step.container(
    name="web-server",
    image="nginx:alpine",
    ports={"80": "8080"},
    volumes={
        "./config": "/etc/nginx/conf.d",
        "./static": "/usr/share/nginx/html"
    },
    health_check={
        "test": ["CMD", "curl", "-f", "http://localhost/health"],
        "interval": "30s",
        "retries": 3
    }
)
4. Inline Agent Steps
Embed AI decision-making:
decision = step.inline_agent(
    name="deployment-decision",
    message="Analyze these metrics and decide if we should deploy: ${metrics}",
    runners=["kubiya-hosted"],
    llm_model="gpt-4",
    is_debug_mode=True,
    tools=[
        {
            "name": "check-metrics",
            "type": "docker",
            "image": "datadog/agent:latest",
            "content": "datadog-check.sh",
            "args": {"threshold": "0.95"}
        }
    ]
)
Streaming and Real-Time Updates
SSE Streaming
from kubiya_workflow_sdk import Client

client = Client(api_key="your-key")

# Stream workflow execution events as they happen
for event in client.execute_workflow(workflow, stream=True):
    if event.type == "step.started":
        print(f"Starting: {event.data['step_name']}")
    elif event.type == "log":
        print(f"Log: {event.data['message']}")
    elif event.type == "step.completed":
        print(f"Completed: {event.data['step_name']}")
Vercel AI SDK Format
import json

# For integration with the Vercel AI SDK
async for chunk in client.stream_workflow(workflow, format="vercel"):
    # Chunk format compatible with the Vercel AI SDK
    if chunk['type'] == '0':  # Text delta
        print(chunk['value'], end='')
    elif chunk['type'] == '8':  # Tool call
        tool_data = json.loads(chunk['value'])
        print(f"Tool: {tool_data['toolName']}")
Custom Stream Processing
class WorkflowStreamProcessor:
    def __init__(self):
        self.steps_completed = 0
        self.logs = []

    async def process_stream(self, workflow):
        async for event in workflow.stream():
            if event.type == "step.completed":
                self.steps_completed += 1
                await self.notify_progress(self.steps_completed)
            elif event.type == "log":
                self.logs.append(event.data)
                await self.send_to_monitoring(event.data)

    async def notify_progress(self, count):
        # Replace with your own progress reporting (Slack, webhook, etc.)
        print(f"Steps completed: {count}")

    async def send_to_monitoring(self, data):
        # Replace with your own monitoring integration
        pass
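A minimal usage sketch, assuming your workflow object exposes the stream() method used above:

import asyncio

processor = WorkflowStreamProcessor()
asyncio.run(processor.process_stream(workflow))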
Custom Providers
Creating a Custom Provider
from kubiya_workflow_sdk.providers import BaseProvider
from typing import Dict, Any, AsyncGenerator

class CustomLLMProvider(BaseProvider):
    """Custom provider for your LLM."""

    def __init__(self, api_key: str, model: str = "custom-model"):
        self.api_key = api_key
        self.model = model

    async def compose(
        self,
        task: str,
        mode: str = "plan",
        context: Dict[str, Any] = None
    ) -> Dict[str, Any]:
        """Generate a workflow from natural language."""
        # Your LLM logic here
        workflow = await self._call_llm(task, context)
        return {
            "workflow": workflow,
            "metadata": {"model": self.model}
        }

    async def stream_compose(
        self,
        task: str,
        mode: str = "plan"
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """Stream workflow generation."""
        async for chunk in self._stream_llm(task):
            yield {
                "type": "token",
                "content": chunk
            }
Registering a Custom Provider
from kubiya_workflow_sdk.providers import register_provider, get_provider

# Register your provider
register_provider("custom-llm", CustomLLMProvider)

# Use it
provider = get_provider("custom-llm", api_key="...")
workflow = await provider.compose("Deploy my app")
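Streaming generation works the same way; a short sketch driving the stream_compose method defined above:

async for chunk in provider.stream_compose("Deploy my app"):
    if chunk["type"] == "token":
        print(chunk["content"], end="")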
Kubernetes Deployment
Create a local runner via the Kubiya platform web interface, REST API, or CLI: give the runner a name, download the generated Kubernetes manifest, and apply it to your cluster. You can then reference the runner by name when executing workflows, as shown below.
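A minimal sketch of referencing that runner, assuming you named it my-k8s-runner on the platform (the name is illustrative):

from kubiya_workflow_sdk import Client

client = Client(api_key="your-key")

# Execute on the runner deployed in your cluster
result = client.execute_workflow(
    workflow,
    runner="my-k8s-runner"  # illustrative; use the name you registered
)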
Local Testing
# test_workflow.py
import pytest
from kubiya_workflow_sdk import Workflow, Client, Step

def test_data_pipeline():
    workflow = Workflow(
        name="test-pipeline",
        steps=[
            Step("fetch", "curl -o data.json https://api.example.com/data"),
            Step("process", "jq '.items[]' data.json > processed.json"),
            Step("validate", "python validate.py processed.json")
        ]
    )

    # Use a client with your test runner
    client = Client(api_key="test-key")
    result = client.execute_workflow(
        workflow,
        runner="your-test-runner"  # Created via the Kubiya platform
    )

    assert result.success
    assert len(result.steps) == 3
Debug Mode
# Enable debug mode for detailed logs
workflow = Workflow(
    name="debug-workflow",
    debug=True,
    log_level="DEBUG"
)

# Step-through debugging
for step in workflow.steps:
    print(f"Executing: {step.name}")
    result = step.execute(breakpoint=True)

    # Inspect state
    print(f"Output: {result.output}")
    print(f"Logs: {result.logs}")

    # Continue or modify
    if input("Continue? (y/n): ") != "y":
        break
Production Best Practices
1. Error Handling
@workflow
def production_pipeline():
    try:
        data = step.fetch_critical_data(
            retry=3,
            retry_interval="exponential",
            timeout="5m",
            on_failure="continue"  # Don't fail the entire workflow
        )
    except StepError as e:
        # Fallback logic
        data = step.fetch_cached_data()
        step.alert_team(f"Using cached data: {e}")
2. Monitoring & Observability
# Add monitoring hooks
workflow.add_hook("before_step", lambda s: metrics.increment(f"step.{s.name}.started"))
workflow.add_hook("after_step", lambda s, r: metrics.histogram(f"step.{s.name}.duration", r.duration))

# Custom spans
with workflow.span("critical-section"):
    result = step.critical_operation()
3. Secrets Management
# Use Kubernetes secrets
step.database_operation(
    env={
        "DB_PASSWORD": "${k8s:secret/db-credentials/password}",
        "API_KEY": "${vault:secret/api-keys/external}"
    }
)

# Or environment variables
step.secure_operation(
    env={
        "TOKEN": "${env:SECURE_TOKEN}",
        "CERT": "${file:/secrets/cert.pem}"
    }
)
4. Caching & Artifacts
# Cache step outputs
@step.cache(key="data-fetch-${date}", ttl="1h")
def fetch_expensive_data():
    return step.shell("./fetch-data.sh")

# Store artifacts
step.generate_report(
    output_artifacts={
        "report.pdf": "/reports/",
        "metrics.json": "/metrics/"
    }
)
Advanced Patterns
Dynamic DAG Generation
@workflow
def dynamic_pipeline(regions: list):
    # Generate steps dynamically
    results = []
    for region in regions:
        result = step.process_region(
            name=f"process-{region}",
            env={"REGION": region},
            parallel=True  # Run all regions in parallel
        )
        results.append(result)

    # Wait for all and aggregate
    step.aggregate_results(
        inputs=results,
        wait_for_all=True
    )
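For example, invoking the pipeline across several regions (region names illustrative, and assuming the @workflow decorator leaves the function directly callable):

dynamic_pipeline(regions=["us-east-1", "eu-west-1", "ap-southeast-1"])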
Conditional Workflows
@workflow
def conditional_deployment():
    test_result = step.run_tests()

    if test_result.exit_code == 0:
        if test_result.coverage > 80:
            step.deploy_to_production()
        else:
            step.deploy_to_staging()
            step.alert_team("Low coverage: ${test_result.coverage}%")
    else:
        step.rollback()
        step.create_incident()
Map-Reduce Patterns
@workflow
def map_reduce_analysis():
    # Map phase - parallel processing
    chunks = step.split_data(chunks=10)
    processed = step.map(
        over=chunks,
        operation=lambda chunk: step.process_chunk(
            data=chunk,
            image="processor:latest"
        )
    )

    # Reduce phase
    result = step.reduce(
        data=processed,
        operation="merge",
        image="reducer:latest"
    )
Troubleshooting
Common Issues
Timeouts on long-running steps:

# Increase the timeout for long-running steps
step.long_operation(
    timeout="30m",  # Default is 5m
    grace_period="5m"  # Time to clean up after timeout
)
Out-of-memory errors:

# Set appropriate memory limits
step.memory_intensive(
    resources={"limits": {"memory": "32Gi"}},
    swap_limit="64Gi"
)
Network connectivity problems:

# Configure network policies
step.external_api_call(
    network_mode="host",
    dns_servers=["8.8.8.8", "8.8.4.4"],
    extra_hosts={"api.internal": "10.0.0.100"}
)
Performance Optimization
1. Parallel Execution
# Maximize parallelism
workflow.configure(
    max_parallel_steps=20,
    queue_size=100
)
2. Resource Pooling
# Reuse containers for similar steps
workflow.configure(
    container_reuse=True,
    pool_size=10
)
3. Caching Strategies
# Layer caching for builds
step.build_image(
    cache_from=["registry/base:latest"],
    build_args={"BUILDKIT_INLINE_CACHE": "1"}
)