Quickstart
Build your first workflow in 5 minutes
Prerequisites: Make sure you have installed the SDK and have your Kubiya API key ready.
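If you haven't set up yet, something like the following should get you ready (the PyPI package name is an assumption here; check the installation docs if it differs):

# Install the SDK (package name assumed; see the installation docs if it differs)
pip install kubiya-workflow-sdk

# Make your API key available to the SDK
export KUBIYA_API_KEY="your-api-key-here"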
Your First Workflow
Let’s build a simple deployment workflow that showcases the power of the Kubiya SDK.
1. Import the SDK
from kubiya_workflow_sdk.dsl import workflow
from kubiya_workflow_sdk import KubiyaClient, execute_workflow
2. Create a Basic Workflow
# Define the workflow using the DSL
my_workflow = (
    workflow("hello-deployment")
    .description("My first deployment workflow")
    .step("prepare", "echo 'Preparing deployment...'")
    .step("deploy", "echo 'Deploying application v1.0'")
    .step("verify", "echo 'Deployment successful!'")
)
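Before executing, you can print the JSON definition the DSL builds; to_dict() is the same method used in the execution examples below:

import json

# Inspect the generated workflow definition before running it
print(json.dumps(my_workflow.to_dict(), indent=2))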
3. Execute the Workflow
# Option 1: Using the convenience function
result = execute_workflow(
    my_workflow.to_dict(),
    api_key="YOUR_API_KEY",  # Or use the KUBIYA_API_KEY env var
    stream=True
)
for event in result:
    print(event)

# Option 2: Using the client
client = KubiyaClient(api_key="YOUR_API_KEY")
result = client.execute_workflow(my_workflow.to_dict(), stream=True)
4. Stream Real-time Updates
# Execute with streaming for real-time updates
import json

for event_str in execute_workflow(my_workflow.to_dict(), stream=True):
    try:
        event = json.loads(event_str)
        if event.get("type") == "step_started":
            print(f"▶️ Starting: {event.get('step_name')}")
        elif event.get("type") == "step_completed":
            print(f"✅ Completed: {event.get('step_name')}")
        elif event.get("type") == "log":
            print(f"   Output: {event.get('message')}")
    except json.JSONDecodeError:
        # Handle raw string events
        print(event_str)
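The same event stream can drive more than console output. The sketch below collects a pass/fail summary per step; it reuses the event types shown above, while the step_failed type is an assumption and may differ in practice (the API key is assumed to come from KUBIYA_API_KEY):

import json
from kubiya_workflow_sdk import execute_workflow

def run_and_summarize(workflow_dict, **kwargs):
    """Run a workflow and collect which steps completed or failed."""
    completed, failed = [], []
    for event_str in execute_workflow(workflow_dict, stream=True, **kwargs):
        try:
            event = json.loads(event_str)
        except json.JSONDecodeError:
            continue  # skip raw, non-JSON events
        if event.get("type") == "step_completed":
            completed.append(event.get("step_name"))
        elif event.get("type") == "step_failed":  # assumed event type
            failed.append(event.get("step_name"))
    return completed, failed

completed, failed = run_and_summarize(my_workflow.to_dict())
print(f"Completed: {completed}, Failed: {failed}")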
Advanced Example: Real Deployment
Now let’s create a more realistic deployment workflow with parameter validation, error handling, and health checks:
from kubiya_workflow_sdk.dsl import workflow
from kubiya_workflow_sdk import execute_workflow
import json

# Create a production-ready deployment workflow
deployment_workflow = (
    workflow("production-deployment")
    .description("Deploy application with health checks")
    .params(
        APP_NAME="${APP_NAME}",
        VERSION="${VERSION}",
        ENVIRONMENT="staging"
    )
    # Step 1: Validate deployment parameters
    .step("validate", """
        if [ -z "$APP_NAME" ] || [ -z "$VERSION" ]; then
            echo "Error: APP_NAME and VERSION are required"
            exit 1
        fi
        echo "Deploying $APP_NAME version $VERSION to $ENVIRONMENT"
    """)
    # Step 2: Deploy the application
    .step("deploy", """
        echo "Deploying ${APP_NAME}:${VERSION}..."
        # In a real scenario: kubectl set image deployment/${APP_NAME} app=${APP_NAME}:${VERSION}
        sleep 2
        echo "Deployment initiated"
    """)
    # Step 3: Health check
    .step("health_check", """
        echo "Checking health..."
        # In a real scenario: curl -f http://localhost:8080/health
        echo "Health check passed"
    """)
    # Step 4: Notify success
    .step("notify", """
        echo "🎉 Deployment Complete!"
        echo "Application: ${APP_NAME}"
        echo "Version: ${VERSION}"
        echo "Environment: ${ENVIRONMENT}"
        echo "Status: Healthy"
    """)
)

# Execute the workflow with parameters
params = {
    "APP_NAME": "my-awesome-app",
    "VERSION": "2.1.0"
}

print("🚀 Starting production deployment workflow...\n")

for event_str in execute_workflow(
    deployment_workflow.to_dict(),
    api_key="YOUR_API_KEY",
    parameters=params,
    stream=True
):
    try:
        event = json.loads(event_str)
        if event.get("type") == "step_started":
            print(f"▶️ {event.get('step_name', 'Unknown')}: Starting...")
        elif event.get("type") == "step_completed":
            print(f"✅ {event.get('step_name', 'Unknown')}: Completed")
        elif event.get("type") == "log":
            print(f"   {event.get('message', '')}")
    except json.JSONDecodeError:
        # Handle non-JSON (SSE-style) events
        if "data:" in event_str:
            print(event_str.replace("data:", "").strip())
Working with Tools and AI
Use the ADK provider to generate workflows from natural language:
from kubiya_workflow_sdk.providers import get_provider
import asyncio

async def generate_workflow():
    # Initialize the ADK provider
    adk = get_provider(
        "adk",
        api_key="YOUR_API_KEY",
        model="gemini-1.5-pro"
    )

    # Describe the workflow in natural language
    task = """
    Create a workflow that:
    1. Backs up a PostgreSQL database
    2. Uploads the backup to S3
    3. Verifies the backup integrity
    4. Sends a Slack notification with results
    """

    # Generate the workflow (plan mode: generate, don't execute)
    result = await adk.compose(
        task=task,
        mode="plan",
        stream=True
    )

    async for event in result:
        print(event)

# Run the async function
asyncio.run(generate_workflow())
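To go from planning to execution, you can swap the mode inside the same async function. The "act" mode name below is an assumption; verify it against the ADK provider docs:

    # Inside generate_workflow(): generate AND execute in one call
    # (the "act" mode name is assumed; check the provider docs)
    result = await adk.compose(
        task=task,
        mode="act",
        stream=True
    )
    async for event in result:
        print(event)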
Using Inline Agents
Execute AI agents within your workflows:
from kubiya_workflow_sdk.dsl import workflow, inline_agent_executor

# Create a workflow with AI decision making
analysis_workflow = (
    workflow("log-analysis")
    .description("Analyze logs with AI")
    # Collect logs
    .step("collect_logs", "tail -n 100 /var/log/app.log")
    # AI analyzes the logs
    .step("analyze")
    .executor(inline_agent_executor(
        message="Analyze these logs and identify any errors or anomalies: ${collect_logs.output}",
        runners=["kubiya-hosted"],
        ai_instructions="You are a log analysis expert. Identify errors, patterns, and anomalies."
    ))
    # Take action based on the analysis
    .step("notify_if_critical", """
        echo "Analysis complete"
        # In a real scenario: send alerts if critical issues are found
    """)
)
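Note how ${collect_logs.output} feeds the first step's output into the agent's message. The workflow itself executes like any other; a minimal run, assuming the API key is set via KUBIYA_API_KEY:

from kubiya_workflow_sdk import execute_workflow

# Run the agent-backed workflow the same way as the earlier examples
for event in execute_workflow(analysis_workflow.to_dict(), stream=True):
    print(event)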
Using Python and Shell Executors
from kubiya_workflow_sdk.dsl import workflow, python_executor, shell_executor

# Mix Python and shell steps
data_workflow = (
    workflow("data-processing")
    .description("Process CSV data")
    # Download with shell
    .step("download")
    .executor(shell_executor("wget https://example.com/data.csv -O /tmp/data.csv"))
    # Process with Python
    .step("process")
    .executor(python_executor("""
import pandas as pd

# Load the data
df = pd.read_csv('/tmp/data.csv')
print(f"Loaded {len(df)} rows")

# Clean the data
df_clean = df.dropna()
df_clean.to_csv('/tmp/clean.csv', index=False)
print(f"Cleaned data: {len(df_clean)} rows")
""", packages=["pandas"]))
    # Upload the result
    .step("upload")
    .executor(shell_executor("aws s3 cp /tmp/clean.csv s3://mybucket/clean/"))
)
Next Steps
- Workflow DSL: Master the workflow DSL for complex automations
- AI Providers: Generate workflows with AI using the ADK provider
- Executors: Learn about the different executors
- Examples: More real-world examples
Common Patterns
Error handling and retries:
from kubiya_workflow_sdk.dsl import workflow, retry_policy

wf = (
    workflow("resilient-workflow")
    .step("risky_operation", "./deploy.sh")
    .step("health_check", "curl -f http://localhost:8080/health")
    # Retry configuration can be added to the workflow definition
)
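Until you wire up the DSL's retry_policy, a shell-level loop is a portable way to get the same resilience. A minimal sketch, retrying the hypothetical deploy.sh up to three times:

wf = (
    workflow("resilient-workflow")
    # Retry at the shell level: up to 3 attempts with a 5s pause between them
    .step("risky_operation", """
        for i in 1 2 3; do
            ./deploy.sh && break
            if [ "$i" = "3" ]; then echo "All attempts failed" && exit 1; fi
            echo "Attempt $i failed, retrying in 5s..."
            sleep 5
        done
    """)
    .step("health_check", "curl -f http://localhost:8080/health")
)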
Parallel execution:
wf = (
    workflow("parallel-deployment")
    .parallel_steps(
        "deploy_regions",
        items=["us-east-1", "eu-west-1", "ap-south-1"],
        command="deploy-to-region.sh ${ITEM}",
        max_concurrent=2
    )
)
Environment variables:
wf = (
    workflow("env-example")
    .env(
        LOG_LEVEL="debug",
        API_ENDPOINT="https://api.example.com"
    )
    .step("use_env", "echo $LOG_LEVEL")
)
Pro tip: Set your API key as an environment variable to avoid hardcoding:
export KUBIYA_API_KEY="your-api-key-here"
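In Python, you can then read the key from the environment explicitly (or rely on the SDK picking up KUBIYA_API_KEY on its own, as noted in step 3):

import os
from kubiya_workflow_sdk import KubiyaClient

# Read the key from the environment instead of hardcoding it
client = KubiyaClient(api_key=os.environ["KUBIYA_API_KEY"])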