Learn by example with these real-world SDK usage patterns covering common automation scenarios.

Basic Examples

Hello World Workflow

The simplest possible workflow:
from kubiya import KubiyaClient
from kubiya.dsl import workflow

# Initialize client
client = KubiyaClient(api_key="your-api-key")

# Create workflow
wf = (
    workflow("hello-world")
        .description("My first Kubiya workflow")
        .step("greet", "echo 'Hello from Kubiya!'")
)

# Execute
for event in client.execute_workflow(wf.to_dict(), stream=True):
    print(event)

Multi-Step Pipeline

A workflow with multiple dependent steps:
from kubiya.dsl import workflow

wf = (
    workflow("data-pipeline")
        .description("Download and process data")
        .params(DATA_URL="https://example.com/data.csv")

        # Step 1: Download
        .step("download", "wget ${DATA_URL} -O /tmp/data.csv")

        # Step 2: Process
        .step("process", callback=lambda s:
            s.python("""
import pandas as pd
df = pd.read_csv('/tmp/data.csv')
df_clean = df.dropna()
df_clean.to_csv('/tmp/clean.csv', index=False)
print(f"Processed {len(df_clean)} rows")
            """)
            .depends("download")
        )

        # Step 3: Upload
        .step("upload", callback=lambda s:
            s.shell("aws s3 cp /tmp/clean.csv s3://mybucket/processed/")
                .depends("process")
        )
)
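
Executing the pipeline works the same way as the Hello World example, streaming events as each step runs:

from kubiya import KubiyaClient

client = KubiyaClient(api_key="your-api-key")

# Stream events as the download, process, and upload steps execute
for event in client.execute_workflow(wf.to_dict(), stream=True):
    print(event)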

DevOps Automation

CI/CD Pipeline

A complete continuous integration and deployment pipeline:
from kubiya.dsl import workflow

cicd_pipeline = (
    workflow("ci-cd-pipeline")
        .description("Complete CI/CD pipeline")
        .params(
            BRANCH="main",
            SERVICE_NAME="my-service",
            VERSION="v1.0.0"
        )

        # Checkout code
        .step("checkout", "git clone -b ${BRANCH} https://github.com/org/repo.git")

        # Run linting
        .step("lint", callback=lambda s:
            s.shell("cd repo && flake8 . --config=.flake8")
                .depends("checkout")
        )

        # Run tests
        .step("test", callback=lambda s:
            s.shell("cd repo && pytest tests/ -v")
                .depends("lint")
        )

        # Build Docker image
        .step("build", callback=lambda s:
            s.shell("""
cd repo
docker build -t ${SERVICE_NAME}:${VERSION} .
docker tag ${SERVICE_NAME}:${VERSION} ${SERVICE_NAME}:latest
            """)
            .depends("test")
        )

        # Push to registry
        .step("push", callback=lambda s:
            s.shell("""
docker push ${SERVICE_NAME}:${VERSION}
docker push ${SERVICE_NAME}:latest
            """)
            .depends("build")
        )

        # Deploy to Kubernetes
        .step("deploy", callback=lambda s:
            s.shell("kubectl set image deployment/${SERVICE_NAME} ${SERVICE_NAME}=${SERVICE_NAME}:${VERSION}")
                .depends("push")
        )

        # Verify deployment
        .step("verify", callback=lambda s:
            s.shell("kubectl rollout status deployment/${SERVICE_NAME}")
                .depends("deploy")
        )
)
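
The pipeline runs with the same streaming call shown in the basic examples; the sketch below also collects the streamed events so they can be reviewed after the run:

from kubiya import KubiyaClient

client = KubiyaClient(api_key="your-api-key")

# Collect streamed events for post-run inspection
events = list(client.execute_workflow(cicd_pipeline.to_dict(), stream=True))
print(f"Received {len(events)} events")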

Infrastructure as Code

Terraform deployment workflow:
from kubiya.dsl import workflow

terraform_workflow = (
    workflow("terraform-deploy")
        .description("Deploy infrastructure with Terraform")
        .params(ENVIRONMENT="staging")

        # Initialize
        .step("init", "terraform init")

        # Plan changes
        .step("plan", callback=lambda s:
            s.shell("terraform plan -var='env=${ENVIRONMENT}' -out=tfplan")
                .output("PLAN_OUTPUT")
                .depends("init")
        )

        # Apply changes
        .step("apply", callback=lambda s:
            s.shell("terraform apply -auto-approve tfplan")
                .depends("plan")
        )

        # Verify resources
        .step("verify", callback=lambda s:
            s.shell("terraform show")
                .depends("apply")
        )
)

Kubernetes Operations

Manage Kubernetes deployments:
from kubiya.dsl import workflow

k8s_workflow = (
    workflow("k8s-operations")
        .description("Kubernetes deployment and scaling")
        .params(
            SERVICE_NAME="my-app",
            NAMESPACE="production",
            REPLICAS="5"
        )

        # Check current status
        .step("status", "kubectl get deployment ${SERVICE_NAME} -n ${NAMESPACE}")

        # Update deployment
        .step("update", callback=lambda s:
            s.shell("kubectl apply -f deployment.yaml -n ${NAMESPACE}")
                .depends("status")
        )

        # Scale deployment
        .step("scale", callback=lambda s:
            s.shell("kubectl scale deployment/${SERVICE_NAME} --replicas=${REPLICAS} -n ${NAMESPACE}")
                .depends("update")
        )

        # Wait for rollout
        .step("wait", callback=lambda s:
            s.shell("kubectl rollout status deployment/${SERVICE_NAME} -n ${NAMESPACE}")
                .depends("scale")
        )

        # Verify pods
        .step("verify-pods", callback=lambda s:
            s.shell("kubectl get pods -n ${NAMESPACE} -l app=${SERVICE_NAME}")
                .depends("wait")
        )
)

Data Engineering

ETL Pipeline

Extract, transform, and load data:
from kubiya.dsl import workflow

etl_pipeline = (
    workflow("etl-pipeline")
        .description("Extract, Transform, Load pipeline")
        .params(
            SOURCE_DB="postgresql://source/db",
            TARGET_DB="postgresql://target/db"
        )

        # Extract from source
        .step("extract", callback=lambda s:
            s.python("""
import pandas as pd
from sqlalchemy import create_engine
import os

# Connect to source database
engine = create_engine(os.getenv('SOURCE_DB'))

# Extract data
df = pd.read_sql('SELECT * FROM customers WHERE active = true', engine)

# Save to temporary file
df.to_csv('/tmp/extracted_data.csv', index=False)
print(f"Extracted {len(df)} records")
            """)
        )

        # Transform data
        .step("transform", callback=lambda s:
            s.python("""
import pandas as pd

# Load extracted data
df = pd.read_csv('/tmp/extracted_data.csv')

# Clean data
df_clean = df.dropna()
df_clean = df_clean.drop_duplicates()

# Transform
df_clean['full_name'] = df_clean['first_name'] + ' ' + df_clean['last_name']
df_clean['processed_at'] = pd.Timestamp.now()

# Save transformed data
df_clean.to_csv('/tmp/transformed_data.csv', index=False)
print(f"Transformed {len(df_clean)} records")
            """)
            .depends("extract")
        )

        # Load to target
        .step("load", callback=lambda s:
            s.python("""
import pandas as pd
from sqlalchemy import create_engine
import os

# Load transformed data
df = pd.read_csv('/tmp/transformed_data.csv')

# Connect to target database
engine = create_engine(os.getenv('TARGET_DB'))

# Load data
df.to_sql('customers_clean', engine, if_exists='append', index=False)
print(f"Loaded {len(df)} records to target database")
            """)
            .depends("transform")
        )
)
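
The pipeline executes with the same streaming call used in the basic examples; the minimal sketch below also surfaces failures (the specific exception classes raised by the SDK are an assumption here, so a catch-all is shown):

from kubiya import KubiyaClient

client = KubiyaClient(api_key="your-api-key")

try:
    # Stream events as the extract -> transform -> load steps run
    for event in client.execute_workflow(etl_pipeline.to_dict(), stream=True):
        print(event)
except Exception as exc:
    # Catch-all for brevity; replace with the SDK's specific exceptions as needed
    print(f"ETL pipeline failed: {exc}")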

ML Training Pipeline

Machine learning workflow:
from kubiya.dsl import workflow

ml_pipeline = (
    workflow("ml-training")
        .description("Train and evaluate ML model")
        .params(
            DATASET_PATH="/data/training_data.csv",
            MODEL_TYPE="random_forest"
        )

        # Prepare data
        .step("prepare", callback=lambda s:
            s.python("""
import pandas as pd
from sklearn.model_selection import train_test_split
import os

# Load data
df = pd.read_csv(os.getenv('DATASET_PATH'))

# Prepare features and target
X = df.drop('target', axis=1)
y = df['target']

# Split data
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Save splits
X_train.to_csv('/tmp/X_train.csv', index=False)
X_test.to_csv('/tmp/X_test.csv', index=False)
y_train.to_csv('/tmp/y_train.csv', index=False)
y_test.to_csv('/tmp/y_test.csv', index=False)

print(f"Train set: {len(X_train)} samples")
print(f"Test set: {len(X_test)} samples")
            """)
        )

        # Train model
        .step("train", callback=lambda s:
            s.python("""
import pandas as pd
import joblib
from sklearn.ensemble import RandomForestClassifier
import os

# Load training data
X_train = pd.read_csv('/tmp/X_train.csv')
y_train = pd.read_csv('/tmp/y_train.csv').values.ravel()

# Train model
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# Save model
joblib.dump(model, '/tmp/model.pkl')
print("Model trained successfully")
            """)
            .depends("prepare")
        )

        # Evaluate model
        .step("evaluate", callback=lambda s:
            s.python("""
import pandas as pd
import joblib
from sklearn.metrics import accuracy_score, classification_report

# Load model and test data
model = joblib.load('/tmp/model.pkl')
X_test = pd.read_csv('/tmp/X_test.csv')
y_test = pd.read_csv('/tmp/y_test.csv').values.ravel()

# Make predictions
y_pred = model.predict(X_test)

# Calculate metrics
accuracy = accuracy_score(y_test, y_pred)
print(f"Accuracy: {accuracy:.3f}")
print("\nClassification Report:")
print(classification_report(y_test, y_pred))
            """)
            .depends("train")
        )
)

Advanced Patterns

Dynamic Workflow Generation

Generate workflows programmatically:
from kubiya.dsl import workflow

def create_multi_service_deployment(services):
    """Generate deployment workflow for multiple services"""

    wf = (
        workflow("multi-service-deploy")
            .description(f"Deploy {len(services)} services")
            .params(VERSION="v1.0.0", ENVIRONMENT="staging")
    )

    # Build all services
    for service in services:
        wf = wf.step(
            f"build-{service}",
            f"docker build -t {service}:${{VERSION}} ./{service}"
        )

    # Test all services
    for service in services:
        wf = wf.step(
            f"test-{service}",
            callback=lambda s, svc=service:
                s.shell(f"docker run {svc}:${{VERSION}} pytest tests/")
                    .depends(f"build-{svc}")
        )

    # Deploy all services
    for service in services:
        wf = wf.step(
            f"deploy-{service}",
            callback=lambda s, svc=service:
                s.shell(f"kubectl set image deployment/{svc} {svc}={svc}:${{VERSION}} -n ${{ENVIRONMENT}}")
                    .depends(f"test-{svc}")
        )

    return wf

# Use the generator
services = ["api", "frontend", "worker"]
deployment_wf = create_multi_service_deployment(services)
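
Printing the serialized definition is a quick way to confirm what the generator produced; for the three services above it should contain a build, test, and deploy step per service:

import json

# Inspect the generated definition before executing it
print(json.dumps(deployment_wf.to_dict(), indent=2))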

Workflow with Docker Containers

Using containerized steps:
from kubiya.dsl import workflow

containerized_wf = (
    workflow("containerized-workflow")
        .description("Workflow using Docker containers")

        # Python task in container
        .step("python-analysis", callback=lambda s:
            s.docker(
                image="python:3.11-slim",
                content="""
#!/usr/bin/env python3
import json
import sys

# Perform analysis
results = {
    "status": "success",
    "analysis": "Data processed",
    "count": 1000
}

print(json.dumps(results))
                """
            )
            .output("ANALYSIS_RESULTS")
        )

        # Node.js task in container
        .step("nodejs-task", callback=lambda s:
            s.docker(
                image="node:18-alpine",
                content="""
#!/usr/bin/env node
console.log('Running Node.js task');
console.log('Processing data...');
console.log('Complete!');
                """
            )
        )

        # Alpine task with curl
        .step("health-check", callback=lambda s:
            s.docker(
                image="alpine:latest",
                content="""
#!/bin/sh
apk add --no-cache curl
curl -f https://api.example.com/health || exit 1
echo "Health check passed"
                """
            )
        )
)
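
The ANALYSIS_RESULTS output captured above can feed a downstream step. The sketch below assumes step outputs are referenced with the same ${...} template syntax used for workflow params, which the examples above do not confirm:

# Hypothetical follow-up step consuming the captured output; the
# ${ANALYSIS_RESULTS} reference assumes outputs resolve like ${...} params
containerized_wf = containerized_wf.step("report", callback=lambda s:
    s.shell("echo 'Analysis results: ${ANALYSIS_RESULTS}'")
        .depends("python-analysis")
)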

Async Workflow Execution

Using the async client:
from kubiya import StreamingKubiyaClient
from kubiya.dsl import workflow
import asyncio

async def run_workflow():
    wf = (
        workflow("async-workflow")
            .description("Async workflow execution")
            .step("task1", "echo 'Task 1'")
            .step("task2", callback=lambda s:
                s.shell("echo 'Task 2'")
                    .depends("task1")
            )
    )

    async with StreamingKubiyaClient(api_key="your-api-key") as client:
        async for event in client.execute_workflow_stream(wf.to_dict()):
            print(f"Event: {event}")

# Run the async workflow
asyncio.run(run_workflow())
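
Because each execution is awaited, several workflows can be streamed concurrently with asyncio.gather; a minimal sketch reusing the same StreamingKubiyaClient call:

async def run_one(wf):
    async with StreamingKubiyaClient(api_key="your-api-key") as client:
        async for event in client.execute_workflow_stream(wf.to_dict()):
            print(event)

async def run_many():
    workflows = [
        workflow(f"parallel-{i}").step("task", f"echo 'Workflow {i}'")
        for i in range(3)
    ]
    # Stream all three workflows at the same time
    await asyncio.gather(*(run_one(wf) for wf in workflows))

asyncio.run(run_many())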

Client SDK Examples

Managing Agents

from kubiya import KubiyaClient

client = KubiyaClient()

# Create agent
agent = client.agents.create(
    name="devops-assistant",
    description="AI assistant for DevOps tasks",
    llm_model="claude-sonnet-4",
    tools=["kubectl", "terraform"],
    integrations=["slack"]
)

# Configure access
client.agents.access.add_group(agent['uuid'], ["devops-team"])

# Set environment
client.agents.env.set(agent['uuid'], {
    "CLUSTER": "production",
    "REGION": "us-east-1"
})

# Set AI instructions
client.agents.prompt.set(agent['uuid'], """
You are a DevOps assistant. Always:
- Verify context before operations
- Explain commands clearly
- Ask for confirmation on destructive operations
""")

print(f"✅ Agent configured: {agent['name']}")

Managing Secrets

# Create secrets
client.secrets.create(
    name="github-token",
    value="ghp_xxxxxxxxxxxx",
    description="GitHub API token"
)

client.secrets.create(
    name="database-password",
    value="super-secret-password",
    description="Production database password"
)

# List secrets (metadata only)
secrets = client.secrets.list()
for secret in secrets:
    print(f"Secret: {secret['name']}")

# Get secret value
token = client.secrets.value("github-token")
print(f"Token: {token[:10]}...")

Querying Knowledge

# Simple query
response = client.knowledge.query(
    prompt="How do I deploy a Kubernetes application?"
)
print(response)

# Streaming query
for event in client.knowledge.query(
    prompt="Best practices for container security",
    stream=True
):
    print(event)

Next Steps