SDK Examples
Real-world examples demonstrating SDK capabilities and patterns
Learn by example with these real-world SDK usage patterns.
Basic Examples
Hello World Workflow
from kubiya_workflow_sdk.dsl import workflow
from kubiya_workflow_sdk import execute_workflow

# Create a simple workflow
wf = (
    workflow("hello-world")
    .description("My first Kubiya workflow")
    .step("greet", "echo 'Hello from Kubiya!'")
)

# Execute it
result = execute_workflow(
    wf.to_dict(),
    api_key="YOUR_API_KEY",
    stream=False
)
print(result)
Multi-Step Pipeline
from kubiya_workflow_sdk.dsl import workflow, shell_executor, python_executor
from kubiya_workflow_sdk import execute_workflow
import json

# Create a data processing pipeline
pipeline = (
    workflow("data-pipeline")
    .description("Download and process data")

    # Step 1: Download data
    .step("download")
    .executor(shell_executor(
        "wget https://example.com/data.csv -O /tmp/data.csv"
    ))

    # Step 2: Process with Python
    .step("process")
    .executor(python_executor("""
import pandas as pd

# Load and process data
df = pd.read_csv('/tmp/data.csv')
df_clean = df.dropna()
df_clean.to_csv('/tmp/clean.csv', index=False)
print(f"Processed {len(df_clean)} rows")
""", packages=["pandas"]))

    # Step 3: Upload results
    .step("upload", "aws s3 cp /tmp/clean.csv s3://mybucket/processed/")
)

# Execute with streaming
for event_str in execute_workflow(pipeline.to_dict(), stream=True):
    try:
        event = json.loads(event_str)
        print(f"Event: {event.get('type')} - {event.get('message', '')}")
    except json.JSONDecodeError:
        print(event_str)
AI-Powered Workflows
Generate Workflow from Natural Language
from kubiya_workflow_sdk.providers import get_provider
import asyncio

async def generate_and_run():
    # Get ADK provider
    adk = get_provider(
        "adk",
        api_key="YOUR_API_KEY",
        model="gemini-1.5-pro"
    )

    # Generate workflow from description
    result = await adk.compose(
        task="""
        Create a workflow that:
        1. Clones a git repository
        2. Runs tests with pytest
        3. Builds a Docker image if tests pass
        4. Pushes to registry
        """,
        mode="plan"  # Just generate, don't execute yet
    )

    # The result contains the generated workflow
    print("Generated workflow:")
    print(result)

# Run the async function
asyncio.run(generate_and_run())
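If you want to run the generated definition rather than just print it, one option is to hand it to execute_workflow from the same coroutine. This is a hedged sketch: the exact shape of the compose result depends on the provider and SDK version, so the result.get("workflow", result) fallback below is an assumption.

from kubiya_workflow_sdk import execute_workflow

# Continuation inside generate_and_run(), after compose() returns.
# Assumption: the compose result is, or contains under "workflow", a plain workflow dict.
workflow_def = result.get("workflow", result) if isinstance(result, dict) else result

for event in execute_workflow(workflow_def, api_key="YOUR_API_KEY", stream=True):
    print(event)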
Inline AI Agent for Decision Making
from kubiya_workflow_sdk.dsl import workflow, inline_agent_executor

# Create workflow with AI analysis
smart_workflow = (
    workflow("smart-deployment")
    .description("Deploy with AI decision making")

    # Run tests
    .step("test", "pytest tests/ -v --json-report --json-report-file=/tmp/test-results.json")

    # AI analyzes results
    .step("analyze")
    .executor(inline_agent_executor(
        message="Analyze test results from /tmp/test-results.json and decide if we should deploy",
        runners=["kubiya-hosted"],
        ai_instructions="""You are a deployment decision maker. Analyze test results and respond with:
- should_deploy: true/false
- confidence: 0-100
- reason: explanation of decision"""
    ))

    # Deploy if approved
    .step("deploy", "kubectl apply -f deployment.yaml")
)
DevOps Automation
CI/CD Pipeline
from kubiya_workflow_sdk.dsl import workflow

# Create CI/CD pipeline
cicd = (
    workflow("ci-cd-pipeline")
    .description("Complete CI/CD pipeline")
    .params(BRANCH="${BRANCH:-main}")

    # Checkout code
    .step("checkout", "git clone -b ${BRANCH} https://github.com/myorg/myapp.git")

    # Run linting
    .step("lint", "cd myapp && flake8 . --config=.flake8")

    # Run tests in parallel
    .parallel_steps(
        "tests",
        items=["unit", "integration", "e2e"],
        command="cd myapp && pytest tests/${ITEM} -v",
        max_concurrent=3
    )

    # Build Docker image
    .step("build", """
        cd myapp
        docker build -t myregistry.io/myapp:${BUILD_ID} .
        docker tag myregistry.io/myapp:${BUILD_ID} myregistry.io/myapp:latest
    """)

    # Push to registry
    .step("push", """
        docker push myregistry.io/myapp:${BUILD_ID}
        docker push myregistry.io/myapp:latest
    """)
)
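To build a branch other than main, supply BRANCH when you execute the pipeline. The params keyword below is an assumption about execute_workflow's signature in your SDK version (it may be named parameters), so check it before relying on this sketch.

from kubiya_workflow_sdk import execute_workflow

# Assumption: execute_workflow forwards a params/parameters mapping to the workflow
for event in execute_workflow(cicd.to_dict(), params={"BRANCH": "develop"}, stream=True):
    print(event)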
Infrastructure as Code
from kubiya_workflow_sdk.dsl import workflow, inline_agent_executor

# Terraform deployment with AI review
terraform_workflow = (
    workflow("terraform-deploy")
    .description("Deploy infrastructure with Terraform")
    .params(ENVIRONMENT="${ENVIRONMENT}")

    # Initialize Terraform
    .step("init", "terraform init")

    # Plan changes
    .step("plan", "terraform plan -var='env=${ENVIRONMENT}' -out=tfplan")

    # AI reviews the plan
    .step("review")
    .executor(inline_agent_executor(
        message="Review the Terraform plan output from the previous step",
        ai_instructions="You are a Terraform expert. Review the plan and identify any risks.",
        runners=["kubiya-hosted"]
    ))

    # Apply if safe
    .step("apply", "terraform apply -auto-approve tfplan")
)
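The review step asks the agent to look at "the Terraform plan output from the previous step"; in practice it helps to render the binary plan into a readable file the agent can reference. A minimal sketch of a hypothetical extra step you could insert between plan and review (the /tmp/tfplan.txt path is illustrative):

    # Hypothetical extra step between "plan" and "review": render the plan as plain text
    .step("show-plan", "terraform show -no-color tfplan > /tmp/tfplan.txt")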
Data Engineering
ETL Pipeline
from kubiya_workflow_sdk.dsl import workflow, python_executor

# Create ETL workflow
etl = (
    workflow("etl-pipeline")
    .description("Extract, Transform, Load pipeline")
    .env(
        DB_HOST="${DB_HOST}",
        DB_USER="${DB_USER}",
        DB_PASS="${DB_PASS}"
    )

    # Extract data from multiple sources in parallel
    .parallel_steps(
        "extract",
        items=["postgres", "mysql", "mongodb"],
        command="python extract_${ITEM}.py",
        max_concurrent=3
    )

    # Transform data
    .step("transform")
    .executor(python_executor("""
import pandas as pd
import glob

# Load all extracted files
dfs = []
for file in glob.glob("/tmp/extract_*.csv"):
    dfs.append(pd.read_csv(file))

# Combine and transform
df_combined = pd.concat(dfs, ignore_index=True)
df_transformed = df_combined.drop_duplicates()

# Add metadata
df_transformed['processed_at'] = pd.Timestamp.now()
df_transformed['pipeline_version'] = '1.0.0'

# Save result
df_transformed.to_parquet("/tmp/transformed.parquet")
print(f"Transformed {len(df_transformed)} total rows")
""", packages=["pandas", "pyarrow"]))

    # Load to data warehouse
    .step("load")
    .executor(python_executor("""
import pandas as pd
from sqlalchemy import create_engine
import os

# Load transformed data
df = pd.read_parquet("/tmp/transformed.parquet")

# Connect to warehouse
engine = create_engine(
    f"postgresql://{os.getenv('DB_USER')}:{os.getenv('DB_PASS')}@{os.getenv('DB_HOST')}/warehouse"
)

# Load data
df.to_sql('fact_table', engine, if_exists='append', index=False)
print(f"Loaded {len(df)} rows to warehouse")
""", packages=["pandas", "pyarrow", "sqlalchemy", "psycopg2-binary"]))
)
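The extract step shells out to scripts named extract_postgres.py, extract_mysql.py, and extract_mongodb.py, each of which must write a CSV matching the /tmp/extract_*.csv glob used by the transform step. As a hedged illustration, here is what a minimal extract_postgres.py could look like; the source_db database and events table are placeholder names, not part of the SDK.

# Hypothetical extract_postgres.py, invoked by the "extract" parallel step.
# It must write /tmp/extract_postgres.csv so the transform step's glob picks it up.
import os

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine(
    f"postgresql://{os.getenv('DB_USER')}:{os.getenv('DB_PASS')}@{os.getenv('DB_HOST')}/source_db"
)
df = pd.read_sql("SELECT * FROM events", engine)  # placeholder query and table
df.to_csv("/tmp/extract_postgres.csv", index=False)
print(f"Extracted {len(df)} rows from postgres")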
ML Pipeline
from kubiya_workflow_sdk.dsl import workflow, python_executor

# Machine learning training pipeline
ml_pipeline = (
    workflow("ml-training")
    .description("Train and evaluate ML model")
    .params(
        DATASET_PATH="${DATASET_PATH}",
        MODEL_TYPE="${MODEL_TYPE:-random_forest}"
    )

    # Prepare data
    .step("prepare")
    .executor(python_executor("""
import pandas as pd
from sklearn.model_selection import train_test_split
import os

# Load data
df = pd.read_csv(os.getenv('DATASET_PATH'))

# Prepare features and target
X = df.drop('target', axis=1)
y = df['target']

# Split data
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Save splits
X_train.to_csv('/tmp/X_train.csv', index=False)
X_test.to_csv('/tmp/X_test.csv', index=False)
y_train.to_csv('/tmp/y_train.csv', index=False)
y_test.to_csv('/tmp/y_test.csv', index=False)

print(f"Train set: {len(X_train)} samples")
print(f"Test set: {len(X_test)} samples")
""", packages=["pandas", "scikit-learn"]))

    # Train model
    .step("train")
    .executor(python_executor("""
import pandas as pd
import joblib
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
import os

# Load training data
X_train = pd.read_csv('/tmp/X_train.csv')
y_train = pd.read_csv('/tmp/y_train.csv').values.ravel()

# Select model based on parameter
model_type = os.getenv('MODEL_TYPE', 'random_forest')
if model_type == 'random_forest':
    model = RandomForestClassifier(n_estimators=100, random_state=42)
else:
    model = LogisticRegression(random_state=42)

# Train model
model.fit(X_train, y_train)

# Save model
joblib.dump(model, '/tmp/model.pkl')
print(f"Trained {model_type} model successfully")
""", packages=["pandas", "scikit-learn", "joblib"]))

    # Evaluate model
    .step("evaluate")
    .executor(python_executor("""
import pandas as pd
import joblib
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import json

# Load model and test data
model = joblib.load('/tmp/model.pkl')
X_test = pd.read_csv('/tmp/X_test.csv')
y_test = pd.read_csv('/tmp/y_test.csv').values.ravel()

# Make predictions
y_pred = model.predict(X_test)

# Calculate metrics
accuracy = accuracy_score(y_test, y_pred)
precision, recall, f1, _ = precision_recall_fscore_support(
    y_test, y_pred, average='weighted'
)

metrics = {
    'accuracy': float(accuracy),
    'precision': float(precision),
    'recall': float(recall),
    'f1_score': float(f1)
}

# Save metrics
with open('/tmp/metrics.json', 'w') as f:
    json.dump(metrics, f, indent=2)

print("Model Evaluation:")
print(f"  Accuracy: {accuracy:.3f}")
print(f"  Precision: {precision:.3f}")
print(f"  Recall: {recall:.3f}")
print(f"  F1 Score: {f1:.3f}")
""", packages=["pandas", "scikit-learn", "joblib"]))
)
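If you want the run to stop when the model underperforms, one option is to append a small gate step after evaluate that exits non-zero below a threshold. A sketch of such a hypothetical step, added before the closing parenthesis of ml_pipeline (the 0.8 threshold is illustrative):

    # Hypothetical quality gate: fail the workflow if accuracy is below a threshold
    .step("quality-gate")
    .executor(python_executor("""
import json
import sys

with open('/tmp/metrics.json') as f:
    metrics = json.load(f)

threshold = 0.8  # illustrative threshold
if metrics['accuracy'] < threshold:
    print(f"Accuracy {metrics['accuracy']:.3f} is below {threshold}; failing the run")
    sys.exit(1)
print("Quality gate passed")
"""))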
Advanced Patterns
Dynamic Workflow Generation
from kubiya_workflow_sdk.dsl import workflow
from kubiya_workflow_sdk import execute_workflow

def create_dynamic_workflow(services: list):
    """Generate a workflow based on the services list"""
    # Start with base workflow
    wf = (
        workflow("dynamic-deployment")
        .description(f"Deploy {len(services)} services")
        .env(ENVIRONMENT="production")
    )

    # Add health check for each service
    for service in services:
        wf.step(
            f"check-{service}",
            f"kubectl get deployment {service} -n production"
        )

    # Deploy services in parallel
    wf.parallel_steps(
        "deploy-services",
        items=services,
        command="kubectl set image deployment/${ITEM} ${ITEM}=${ITEM}:latest -n production",
        max_concurrent=3
    )

    # Verify all services
    for service in services:
        wf.step(
            f"verify-{service}",
            f"kubectl rollout status deployment/{service} -n production"
        )

    return wf

# Create and execute dynamic workflow
services = ["api", "frontend", "worker", "scheduler"]
dynamic_wf = create_dynamic_workflow(services)

# Execute
for event in execute_workflow(dynamic_wf.to_dict(), stream=True):
    print(event)
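Because the workflow is assembled programmatically, it can be worth dumping the generated definition before executing it, especially while iterating on the generator function:

import json

# Inspect the generated definition before running it
print(json.dumps(dynamic_wf.to_dict(), indent=2))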
Error Handling and Retry Patterns
from kubiya_workflow_sdk.dsl import workflow

# Workflow with comprehensive error handling
resilient_workflow = (
    workflow("resilient-deployment")
    .description("Deployment with retries and rollback")

    # Take a snapshot of the current manifest before deployment
    .step("snapshot", "kubectl get deployment app -o yaml > /tmp/deployment-backup-$(date +%s).yaml")

    # Deploy with health checks
    .step("deploy", "kubectl apply -f deployment.yaml")

    # Health check with retries
    .step("health_check", """
        for i in {1..5}; do
          if curl -f http://service/health; then
            echo "Health check passed"
            exit 0
          fi
          echo "Health check failed, attempt $i/5"
          sleep 30
        done
        exit 1
    """)

    # Roll back on failure
    .step("rollback", "kubectl rollout undo deployment/app")
)
Using FastMCP Provider
from kubiya_workflow_sdk.providers import get_provider
import asyncio

async def use_fastmcp():
    # Get FastMCP provider for tool execution
    mcp = get_provider(
        "fastmcp",
        server_path="/path/to/mcp/server",
        args=["--config", "mcp-config.json"]
    )

    # Execute tools via MCP
    result = await mcp.execute_tool(
        tool_name="database-query",
        arguments={"query": "SELECT * FROM users LIMIT 10"}
    )
    print(f"Query result: {result}")

asyncio.run(use_fastmcp())
Working with the Kubiya Client
from kubiya_workflow_sdk import KubiyaClient

# Initialize client
client = KubiyaClient(
    api_key="YOUR_API_KEY",
    base_url="https://api.kubiya.ai",  # Optional
    runner="kubiya-hosted"             # Optional
)

# List available integrations
integrations = client.get_integrations()
for integration in integrations:
    print(f"- {integration['name']}: {integration.get('description', 'No description')}")

# Get available runners
runners = client.get_runners()
for runner in runners:
    print(f"Runner: {runner['name']} - Status: {runner.get('status', 'Unknown')}")

# Execute a workflow with the client
workflow_dict = {
    "name": "quick-task",
    "steps": [
        {"name": "task", "command": "echo 'Running quick task'"}
    ]
}

# Stream execution
for event in client.execute_workflow(workflow_dict, stream=True):
    print(event)
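The same client calls can be combined; for example, you might check which runners report a healthy status before executing. The status strings below are an assumption and vary by deployment.

# Assumption: runner status strings such as "healthy" / "active" / "running" vary by deployment
available = [r["name"] for r in client.get_runners() if r.get("status") in ("healthy", "active", "running")]
print(f"Available runners: {available}")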