Basic Examples
Hello World Workflow
The simplest possible workflow:
from kubiya import KubiyaClient
from kubiya.dsl import workflow

# Initialize client
client = KubiyaClient(api_key="your-api-key")

# Create workflow
wf = (
    workflow("hello-world")
    .description("My first Kubiya workflow")
    .step("greet", "echo 'Hello from Kubiya!'")
)

# Execute
for event in client.execute_workflow(wf.to_dict(), stream=True):
    print(event)
Multi-Step Pipeline
A workflow with multiple dependent steps:
from kubiya.dsl import workflow

wf = (
    workflow("data-pipeline")
    .description("Download and process data")
    .params(DATA_URL="https://example.com/data.csv")

    # Step 1: Download
    .step("download", "wget ${DATA_URL} -O /tmp/data.csv")

    # Step 2: Process
    .step("process", callback=lambda s:
        s.python("""
import pandas as pd
df = pd.read_csv('/tmp/data.csv')
df_clean = df.dropna()
df_clean.to_csv('/tmp/clean.csv', index=False)
print(f"Processed {len(df_clean)} rows")
""")
        .depends("download")
    )

    # Step 3: Upload
    .step("upload", callback=lambda s:
        s.shell("aws s3 cp /tmp/clean.csv s3://mybucket/processed/")
        .depends("process")
    )
)
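Execution follows the same pattern as the Hello World example: serialize the workflow with to_dict() and stream events from the client. A minimal sketch, with the API key as a placeholder:

from kubiya import KubiyaClient

client = KubiyaClient(api_key="your-api-key")

# Stream step-by-step events as the pipeline runs
for event in client.execute_workflow(wf.to_dict(), stream=True):
    print(event)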
DevOps Automation
CI/CD Pipeline
A complete continuous integration and deployment pipeline:
from kubiya.dsl import workflow

cicd_pipeline = (
    workflow("ci-cd-pipeline")
    .description("Complete CI/CD pipeline")
    .params(
        BRANCH="main",
        SERVICE_NAME="my-service",
        VERSION="v1.0.0"
    )

    # Checkout code
    .step("checkout", "git clone -b ${BRANCH} https://github.com/org/repo.git")

    # Run linting
    .step("lint", callback=lambda s:
        s.shell("cd repo && flake8 . --config=.flake8")
        .depends("checkout")
    )

    # Run tests
    .step("test", callback=lambda s:
        s.shell("cd repo && pytest tests/ -v")
        .depends("lint")
    )

    # Build Docker image
    .step("build", callback=lambda s:
        s.shell("""
cd repo
docker build -t ${SERVICE_NAME}:${VERSION} .
docker tag ${SERVICE_NAME}:${VERSION} ${SERVICE_NAME}:latest
""")
        .depends("test")
    )

    # Push to registry
    .step("push", callback=lambda s:
        s.shell("""
docker push ${SERVICE_NAME}:${VERSION}
docker push ${SERVICE_NAME}:latest
""")
        .depends("build")
    )

    # Deploy to Kubernetes
    .step("deploy", callback=lambda s:
        s.shell("kubectl set image deployment/${SERVICE_NAME} ${SERVICE_NAME}=${SERVICE_NAME}:${VERSION}")
        .depends("push")
    )

    # Verify deployment
    .step("verify", callback=lambda s:
        s.shell("kubectl rollout status deployment/${SERVICE_NAME}")
        .depends("deploy")
    )
)
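Before executing the pipeline, its definition can be inspected. Since to_dict() (the same serialization used for execution in the examples above) returns a plain dictionary, standard tooling works; a minimal sketch, assuming the dictionary is JSON-serializable:

import json

# Inspect the generated workflow definition before executing it
definition = cicd_pipeline.to_dict()
print(json.dumps(definition, indent=2, default=str))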
Infrastructure as Code
Terraform deployment workflow:
from kubiya.dsl import workflow

terraform_workflow = (
    workflow("terraform-deploy")
    .description("Deploy infrastructure with Terraform")
    .params(ENVIRONMENT="staging")

    # Initialize
    .step("init", "terraform init")

    # Plan changes
    .step("plan", callback=lambda s:
        s.shell("terraform plan -var='env=${ENVIRONMENT}' -out=tfplan")
        .output("PLAN_OUTPUT")
        .depends("init")
    )

    # Apply changes
    .step("apply", callback=lambda s:
        s.shell("terraform apply -auto-approve tfplan")
        .depends("plan")
    )

    # Verify resources
    .step("verify", callback=lambda s:
        s.shell("terraform show")
        .depends("apply")
    )
)
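The plan step captures its output into PLAN_OUTPUT via .output(). One way to consume it downstream, assuming captured outputs can be expanded with the same ${...} syntax used for workflow parameters (an assumption, not confirmed by the examples above), is a small reporting workflow like this sketch:

from kubiya.dsl import workflow

# Hypothetical illustration: assumes ${PLAN_OUTPUT} expands to the value
# captured by .output("PLAN_OUTPUT") in the upstream step.
plan_review = (
    workflow("terraform-plan-review")
    .description("Capture a Terraform plan and echo a summary")
    .step("plan", callback=lambda s:
        s.shell("terraform plan -out=tfplan")
        .output("PLAN_OUTPUT")
    )
    .step("report", callback=lambda s:
        s.shell('echo "Plan output: ${PLAN_OUTPUT}"')
        .depends("plan")
    )
)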
Kubernetes Operations
Manage Kubernetes deployments:
from kubiya.dsl import workflow

k8s_workflow = (
    workflow("k8s-operations")
    .description("Kubernetes deployment and scaling")
    .params(
        SERVICE_NAME="my-app",
        NAMESPACE="production",
        REPLICAS="5"
    )

    # Check current status
    .step("status", "kubectl get deployment ${SERVICE_NAME} -n ${NAMESPACE}")

    # Update deployment
    .step("update", callback=lambda s:
        s.shell("kubectl apply -f deployment.yaml -n ${NAMESPACE}")
        .depends("status")
    )

    # Scale deployment
    .step("scale", callback=lambda s:
        s.shell("kubectl scale deployment/${SERVICE_NAME} --replicas=${REPLICAS} -n ${NAMESPACE}")
        .depends("update")
    )

    # Wait for rollout
    .step("wait", callback=lambda s:
        s.shell("kubectl rollout status deployment/${SERVICE_NAME} -n ${NAMESPACE}")
        .depends("scale")
    )

    # Verify pods
    .step("verify-pods", callback=lambda s:
        s.shell("kubectl get pods -n ${NAMESPACE} -l app=${SERVICE_NAME}")
        .depends("wait")
    )
)
Data Engineering
ETL Pipeline
Extract, transform, and load data:
from kubiya.dsl import workflow

etl_pipeline = (
    workflow("etl-pipeline")
    .description("Extract, Transform, Load pipeline")
    .params(
        SOURCE_DB="postgresql://source/db",
        TARGET_DB="postgresql://target/db"
    )

    # Extract from source
    .step("extract", callback=lambda s:
        s.python("""
import pandas as pd
from sqlalchemy import create_engine
import os

# Connect to source database
engine = create_engine(os.getenv('SOURCE_DB'))

# Extract data
df = pd.read_sql('SELECT * FROM customers WHERE active = true', engine)

# Save to temporary file
df.to_csv('/tmp/extracted_data.csv', index=False)
print(f"Extracted {len(df)} records")
""")
    )

    # Transform data
    .step("transform", callback=lambda s:
        s.python("""
import pandas as pd

# Load extracted data
df = pd.read_csv('/tmp/extracted_data.csv')

# Clean data
df_clean = df.dropna()
df_clean = df_clean.drop_duplicates()

# Transform
df_clean['full_name'] = df_clean['first_name'] + ' ' + df_clean['last_name']
df_clean['processed_at'] = pd.Timestamp.now()

# Save transformed data
df_clean.to_csv('/tmp/transformed_data.csv', index=False)
print(f"Transformed {len(df_clean)} records")
""")
        .depends("extract")
    )

    # Load to target
    .step("load", callback=lambda s:
        s.python("""
import pandas as pd
from sqlalchemy import create_engine
import os

# Load transformed data
df = pd.read_csv('/tmp/transformed_data.csv')

# Connect to target database
engine = create_engine(os.getenv('TARGET_DB'))

# Load data
df.to_sql('customers_clean', engine, if_exists='append', index=False)
print(f"Loaded {len(df)} records to target database")
""")
        .depends("transform")
    )
)
ML Training Pipeline
Machine learning workflow:
from kubiya.dsl import workflow

ml_pipeline = (
    workflow("ml-training")
    .description("Train and evaluate ML model")
    .params(
        DATASET_PATH="/data/training_data.csv",
        MODEL_TYPE="random_forest"
    )

    # Prepare data
    .step("prepare", callback=lambda s:
        s.python("""
import pandas as pd
from sklearn.model_selection import train_test_split
import os

# Load data
df = pd.read_csv(os.getenv('DATASET_PATH'))

# Prepare features and target
X = df.drop('target', axis=1)
y = df['target']

# Split data
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Save splits
X_train.to_csv('/tmp/X_train.csv', index=False)
X_test.to_csv('/tmp/X_test.csv', index=False)
y_train.to_csv('/tmp/y_train.csv', index=False)
y_test.to_csv('/tmp/y_test.csv', index=False)
print(f"Train set: {len(X_train)} samples")
print(f"Test set: {len(X_test)} samples")
""")
    )

    # Train model
    .step("train", callback=lambda s:
        s.python("""
import pandas as pd
import joblib
from sklearn.ensemble import RandomForestClassifier
import os

# Load training data
X_train = pd.read_csv('/tmp/X_train.csv')
y_train = pd.read_csv('/tmp/y_train.csv').values.ravel()

# Train model
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# Save model
joblib.dump(model, '/tmp/model.pkl')
print("Model trained successfully")
""")
        .depends("prepare")
    )

    # Evaluate model
    .step("evaluate", callback=lambda s:
        s.python("""
import pandas as pd
import joblib
from sklearn.metrics import accuracy_score, classification_report

# Load model and test data
model = joblib.load('/tmp/model.pkl')
X_test = pd.read_csv('/tmp/X_test.csv')
y_test = pd.read_csv('/tmp/y_test.csv').values.ravel()

# Make predictions
y_pred = model.predict(X_test)

# Calculate metrics
accuracy = accuracy_score(y_test, y_pred)
print(f"Accuracy: {accuracy:.3f}")
print("\nClassification Report:")
print(classification_report(y_test, y_pred))
""")
        .depends("train")
    )
)
Advanced Patterns
Dynamic Workflow Generation
Generate workflows programmatically:
from kubiya.dsl import workflow

def create_multi_service_deployment(services):
    """Generate a deployment workflow for multiple services."""
    wf = (
        workflow("multi-service-deploy")
        .description(f"Deploy {len(services)} services")
        .params(VERSION="v1.0.0", ENVIRONMENT="staging")
    )

    # Build all services
    for service in services:
        wf.step(
            f"build-{service}",
            f"docker build -t {service}:${{VERSION}} ./{service}"
        )

    # Test all services
    for service in services:
        wf.step(
            f"test-{service}",
            callback=lambda s, svc=service:
                s.shell(f"docker run {svc}:${{VERSION}} pytest tests/")
                .depends(f"build-{svc}")
        )

    # Deploy all services
    for service in services:
        wf.step(
            f"deploy-{service}",
            callback=lambda s, svc=service:
                s.shell(f"kubectl set image deployment/{svc} {svc}={svc}:${{VERSION}} -n ${{ENVIRONMENT}}")
                .depends(f"test-{svc}")
        )

    return wf

# Use the generator
services = ["api", "frontend", "worker"]
deployment_wf = create_multi_service_deployment(services)
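The generated workflow can then be executed exactly like a hand-written one, using the streaming pattern from the earlier examples (API key shown as a placeholder):

from kubiya import KubiyaClient

client = KubiyaClient(api_key="your-api-key")

# Execute the generated multi-service deployment and stream its events
for event in client.execute_workflow(deployment_wf.to_dict(), stream=True):
    print(event)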
Workflow with Docker Containers
Using containerized steps:
from kubiya.dsl import workflow

containerized_wf = (
    workflow("containerized-workflow")
    .description("Workflow using Docker containers")

    # Python task in container
    .step("python-analysis", callback=lambda s:
        s.docker(
            image="python:3.11-slim",
            content="""
#!/usr/bin/env python3
import json
import sys

# Perform analysis
results = {
    "status": "success",
    "analysis": "Data processed",
    "count": 1000
}
print(json.dumps(results))
"""
        )
        .output("ANALYSIS_RESULTS")
    )

    # Node.js task in container
    .step("nodejs-task", callback=lambda s:
        s.docker(
            image="node:18-alpine",
            content="""
#!/usr/bin/env node
console.log('Running Node.js task');
console.log('Processing data...');
console.log('Complete!');
"""
        )
    )

    # Alpine task with curl
    .step("health-check", callback=lambda s:
        s.docker(
            image="alpine:latest",
            content="""
#!/bin/sh
apk add --no-cache curl
curl -f https://api.example.com/health || exit 1
echo "Health check passed"
"""
        )
    )
)
Async Workflow Execution
Using the async client:
from kubiya import StreamingKubiyaClient
from kubiya.dsl import workflow
import asyncio

async def run_workflow():
    wf = (
        workflow("async-workflow")
        .description("Async workflow execution")
        .step("task1", "echo 'Task 1'")
        .step("task2", callback=lambda s:
            s.shell("echo 'Task 2'")
            .depends("task1")
        )
    )

    async with StreamingKubiyaClient(api_key="your-api-key") as client:
        async for event in client.execute_workflow_stream(wf.to_dict()):
            print(f"Event: {event}")

# Run the async workflow
asyncio.run(run_workflow())
Client SDK Examples
Managing Agents
from kubiya import KubiyaClient

client = KubiyaClient()

# Create agent
agent = client.agents.create(
    name="devops-assistant",
    description="AI assistant for DevOps tasks",
    llm_model="claude-sonnet-4",
    tools=["kubectl", "terraform"],
    integrations=["slack"]
)

# Configure access
client.agents.access.add_group(agent['uuid'], ["devops-team"])

# Set environment
client.agents.env.set(agent['uuid'], {
    "CLUSTER": "production",
    "REGION": "us-east-1"
})

# Set AI instructions
client.agents.prompt.set(agent['uuid'], """
You are a DevOps assistant. Always:
- Verify context before operations
- Explain commands clearly
- Ask for confirmation on destructive operations
""")

print(f"✅ Agent configured: {agent['name']}")
Managing Secrets
# Create secrets
client.secrets.create(
    name="github-token",
    value="ghp_xxxxxxxxxxxx",
    description="GitHub API token"
)

client.secrets.create(
    name="database-password",
    value="super-secret-password",
    description="Production database password"
)

# List secrets (metadata only)
secrets = client.secrets.list()
for secret in secrets:
    print(f"Secret: {secret['name']}")

# Get secret value
token = client.secrets.value("github-token")
print(f"Token: {token[:10]}...")
Querying Knowledge
# Simple query
response = client.knowledge.query(
    prompt="How do I deploy a Kubernetes application?"
)
print(response)

# Streaming query
for event in client.knowledge.query(
    prompt="Best practices for container security",
    stream=True
):
    print(event)