SDK Examples

Learn by example with these real-world SDK usage patterns.

Basic Examples

Hello World Workflow

from kubiya_workflow_sdk.dsl import workflow
from kubiya_workflow_sdk import execute_workflow

# Create a simple workflow
wf = (
    workflow("hello-world")
    .description("My first Kubiya workflow")
    .step("greet", "echo 'Hello from Kubiya!'")
)

# Execute it
result = execute_workflow(
    wf.to_dict(),
    api_key="YOUR_API_KEY",
    stream=False
)
print(result)

Multi-Step Pipeline

from kubiya_workflow_sdk.dsl import workflow, shell_executor, python_executor
from kubiya_workflow_sdk import execute_workflow

# Create a data processing pipeline
pipeline = (
    workflow("data-pipeline")
    .description("Download and process data")
    
    # Step 1: Download data
    .step("download")
    .executor(shell_executor(
        "wget https://example.com/data.csv -O /tmp/data.csv"
    ))
    
    # Step 2: Process with Python
    .step("process")
    .executor(python_executor("""
import pandas as pd

# Load and process data
df = pd.read_csv('/tmp/data.csv')
df_clean = df.dropna()
df_clean.to_csv('/tmp/clean.csv', index=False)
print(f"Processed {len(df_clean)} rows")
    """, packages=["pandas"]))
    
    # Step 3: Upload results
    .step("upload", "aws s3 cp /tmp/clean.csv s3://mybucket/processed/")
)

# Execute with streaming
import json
for event_str in execute_workflow(pipeline.to_dict(), stream=True):
    try:
        event = json.loads(event_str)
        print(f"Event: {event.get('type')} - {event.get('message', '')}")
    except json.JSONDecodeError:
        print(event_str)

AI-Powered Workflows

Generate Workflow from Natural Language

from kubiya_workflow_sdk.providers import get_provider
import asyncio

async def generate_and_run():
    # Get ADK provider
    adk = get_provider(
        "adk",
        api_key="YOUR_API_KEY",
        model="gemini-1.5-pro"
    )
    
    # Generate workflow from description
    result = await adk.compose(
        task="""
        Create a workflow that:
        1. Clones a git repository
        2. Runs tests with pytest
        3. Builds a Docker image if tests pass
        4. Pushes to registry
        """,
        mode="plan"  # Just generate, don't execute yet
    )
    
    # The result contains the generated workflow
    print("Generated workflow:")
    print(result)

# Run the async function
asyncio.run(generate_and_run())
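
The plan mode above only generates the workflow definition. To have the provider execute what it generates, change the mode argument; the sketch below assumes an "act" mode exists and should be verified against your ADK provider version.

async def generate_and_execute():
    adk = get_provider(
        "adk",
        api_key="YOUR_API_KEY",
        model="gemini-1.5-pro"
    )

    # "act" is assumed to generate and then execute the workflow;
    # check the supported modes for your provider version
    result = await adk.compose(
        task="Clone a git repository, run pytest, then build and push a Docker image",
        mode="act"
    )
    print(result)

asyncio.run(generate_and_execute())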

Inline AI Agent for Decision Making

from kubiya_workflow_sdk.dsl import workflow, inline_agent_executor

# Create workflow with AI analysis
smart_workflow = (
    workflow("smart-deployment")
    .description("Deploy with AI decision making")
    
    # Run tests
    .step("test", "pytest tests/ -v --json-report --json-report-file=/tmp/test-results.json")
    
    # AI analyzes results
    .step("analyze")
    .executor(inline_agent_executor(
        message="Analyze test results from /tmp/test-results.json and decide if we should deploy",
        runners=["kubiya-hosted"],
        ai_instructions="""You are a deployment decision maker. Analyze test results and respond with:
        - should_deploy: true/false
        - confidence: 0-100
        - reason: explanation of decision"""
    ))
    
    # Deploy if approved
    .step("deploy", "kubectl apply -f deployment.yaml")
)
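
To run this workflow, pass its dictionary form to execute_workflow and stream the events, just as in the pipeline example above:

from kubiya_workflow_sdk import execute_workflow

for event in execute_workflow(smart_workflow.to_dict(), api_key="YOUR_API_KEY", stream=True):
    print(event)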

DevOps Automation

CI/CD Pipeline

from kubiya_workflow_sdk.dsl import workflow

# Create CI/CD pipeline
cicd = (
    workflow("ci-cd-pipeline")
    .description("Complete CI/CD pipeline")
    .params(BRANCH="${BRANCH:-main}", BUILD_ID="${BUILD_ID}")
    
    # Checkout code
    .step("checkout", "git clone -b ${BRANCH} https://github.com/myorg/myapp.git")
    
    # Run linting
    .step("lint", "cd myapp && flake8 . --config=.flake8")
    
    # Run tests in parallel
    .parallel_steps(
        "tests",
        items=["unit", "integration", "e2e"],
        command="cd myapp && pytest tests/${ITEM} -v",
        max_concurrent=3
    )
    
    # Build Docker image
    .step("build", """
        cd myapp
        docker build -t myregistry.io/myapp:${BUILD_ID} .
        docker tag myregistry.io/myapp:${BUILD_ID} myregistry.io/myapp:latest
    """)
    
    # Push to registry
    .step("push", """
        docker push myregistry.io/myapp:${BUILD_ID}
        docker push myregistry.io/myapp:latest
    """)
)
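
To run the pipeline against a different branch, override the BRANCH parameter at execution time. The params keyword below is an assumption about execute_workflow's signature; check the SDK reference for the exact argument name.

from kubiya_workflow_sdk import execute_workflow

# `params` is a hypothetical keyword for passing parameter overrides;
# confirm the argument name in your SDK version
for event in execute_workflow(
    cicd.to_dict(),
    api_key="YOUR_API_KEY",
    params={"BRANCH": "release/1.2"},
    stream=True
):
    print(event)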

Infrastructure as Code

from kubiya_workflow_sdk.dsl import workflow, inline_agent_executor

# Terraform deployment with AI review
terraform_workflow = (
    workflow("terraform-deploy")
    .description("Deploy infrastructure with Terraform")
    .params(ENVIRONMENT="${ENVIRONMENT}")
    
    # Initialize Terraform
    .step("init", "terraform init")
    
    # Plan changes
    .step("plan", "terraform plan -var='env=${ENVIRONMENT}' -out=tfplan")
    
    # AI reviews the plan
    .step("review")
    .executor(inline_agent_executor(
        message="Review the Terraform plan output from the previous step",
        ai_instructions="You are a Terraform expert. Review the plan and identify any risks.",
        runners=["kubiya-hosted"]
    ))
    
    # Apply if safe
    .step("apply", "terraform apply -auto-approve tfplan")
)
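
Because the builder exposes to_dict(), you can inspect or version-control the generated definition before running it, for example by dumping it to JSON:

import json

# Review the workflow definition before execution
print(json.dumps(terraform_workflow.to_dict(), indent=2))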

Data Engineering

ETL Pipeline

from kubiya_workflow_sdk.dsl import workflow, python_executor

# Create ETL workflow
etl = (
    workflow("etl-pipeline")
    .description("Extract, Transform, Load pipeline")
    .env(
        DB_HOST="${DB_HOST}",
        DB_USER="${DB_USER}",
        DB_PASS="${DB_PASS}"
    )
    
    # Extract data from multiple sources in parallel
    .parallel_steps(
        "extract",
        items=["postgres", "mysql", "mongodb"],
        command="python extract_${ITEM}.py",
        max_concurrent=3
    )
    
    # Transform data
    .step("transform")
    .executor(python_executor("""
import pandas as pd
import glob

# Load all extracted files
dfs = []
for file in glob.glob("/tmp/extract_*.csv"):
    dfs.append(pd.read_csv(file))

# Combine and transform
df_combined = pd.concat(dfs, ignore_index=True)
df_transformed = df_combined.drop_duplicates()

# Add metadata
df_transformed['processed_at'] = pd.Timestamp.now()
df_transformed['pipeline_version'] = '1.0.0'

# Save result
df_transformed.to_parquet("/tmp/transformed.parquet")
print(f"Transformed {len(df_transformed)} total rows")
    """, packages=["pandas", "pyarrow"]))
    
    # Load to data warehouse
    .step("load")
    .executor(python_executor("""
import pandas as pd
from sqlalchemy import create_engine
import os

# Load transformed data
df = pd.read_parquet("/tmp/transformed.parquet")

# Connect to warehouse
engine = create_engine(
    f"postgresql://{os.getenv('DB_USER')}:{os.getenv('DB_PASS')}@{os.getenv('DB_HOST')}/warehouse"
)

# Load data
df.to_sql('fact_table', engine, if_exists='append', index=False)
print(f"Loaded {len(df)} rows to warehouse")
    """, packages=["pandas", "sqlalchemy", "psycopg2-binary"]))
)
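
As with the earlier pipeline, execution can be streamed so you can follow each step; the event fields below mirror the streaming example above and may differ slightly between SDK versions.

import json
from kubiya_workflow_sdk import execute_workflow

for event_str in execute_workflow(etl.to_dict(), api_key="YOUR_API_KEY", stream=True):
    try:
        event = json.loads(event_str)
        print(f"{event.get('type')}: {event.get('message', '')}")
    except json.JSONDecodeError:
        print(event_str)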

ML Pipeline

from kubiya_workflow_sdk.dsl import workflow, python_executor

# Machine learning training pipeline
ml_pipeline = (
    workflow("ml-training")
    .description("Train and evaluate ML model")
    .params(
        DATASET_PATH="${DATASET_PATH}",
        MODEL_TYPE="${MODEL_TYPE:-random_forest}"
    )
    
    # Prepare data
    .step("prepare")
    .executor(python_executor("""
import pandas as pd
from sklearn.model_selection import train_test_split
import os

# Load data
df = pd.read_csv(os.getenv('DATASET_PATH'))

# Prepare features and target
X = df.drop('target', axis=1)
y = df['target']

# Split data
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Save splits
X_train.to_csv('/tmp/X_train.csv', index=False)
X_test.to_csv('/tmp/X_test.csv', index=False)
y_train.to_csv('/tmp/y_train.csv', index=False)
y_test.to_csv('/tmp/y_test.csv', index=False)

print(f"Train set: {len(X_train)} samples")
print(f"Test set: {len(X_test)} samples")
    """, packages=["pandas", "scikit-learn"]))
    
    # Train model
    .step("train")
    .executor(python_executor("""
import pandas as pd
import joblib
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
import os

# Load training data
X_train = pd.read_csv('/tmp/X_train.csv')
y_train = pd.read_csv('/tmp/y_train.csv').values.ravel()

# Select model based on parameter
model_type = os.getenv('MODEL_TYPE', 'random_forest')
if model_type == 'random_forest':
    model = RandomForestClassifier(n_estimators=100, random_state=42)
else:
    model = LogisticRegression(random_state=42)

# Train model
model.fit(X_train, y_train)

# Save model
joblib.dump(model, '/tmp/model.pkl')
print(f"Trained {model_type} model successfully")
    """, packages=["pandas", "scikit-learn", "joblib"]))
    
    # Evaluate model
    .step("evaluate")
    .executor(python_executor("""
import pandas as pd
import joblib
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import json

# Load model and test data
model = joblib.load('/tmp/model.pkl')
X_test = pd.read_csv('/tmp/X_test.csv')
y_test = pd.read_csv('/tmp/y_test.csv').values.ravel()

# Make predictions
y_pred = model.predict(X_test)

# Calculate metrics
accuracy = accuracy_score(y_test, y_pred)
precision, recall, f1, _ = precision_recall_fscore_support(
    y_test, y_pred, average='weighted'
)

metrics = {
    'accuracy': float(accuracy),
    'precision': float(precision),
    'recall': float(recall),
    'f1_score': float(f1)
}

# Save metrics
with open('/tmp/metrics.json', 'w') as f:
    json.dump(metrics, f, indent=2)

print(f"Model Evaluation:")
print(f"  Accuracy: {accuracy:.3f}")
print(f"  Precision: {precision:.3f}")
print(f"  Recall: {recall:.3f}")
print(f"  F1 Score: {f1:.3f}")
    """, packages=["pandas", "scikit-learn", "joblib"]))
)

Advanced Patterns

Dynamic Workflow Generation

from kubiya_workflow_sdk.dsl import workflow
from kubiya_workflow_sdk import execute_workflow

def create_dynamic_workflow(services: list):
    """Generate a workflow based on the services list"""
    
    # Start with base workflow
    wf = (
        workflow("dynamic-deployment")
        .description(f"Deploy {len(services)} services")
        .env(ENVIRONMENT="production")
    )
    
    # Add health check for each service
    for service in services:
        wf.step(
            f"check-{service}",
            f"kubectl get deployment {service} -n production"
        )
    
    # Deploy services in parallel
    wf.parallel_steps(
        "deploy-services",
        items=services,
        command="kubectl set image deployment/${ITEM} ${ITEM}=${ITEM}:latest -n production",
        max_concurrent=3
    )
    
    # Verify all services
    for service in services:
        wf.step(
            f"verify-{service}",
            f"kubectl rollout status deployment/{service} -n production"
        )
    
    return wf

# Create and execute dynamic workflow
services = ["api", "frontend", "worker", "scheduler"]
dynamic_wf = create_dynamic_workflow(services)

# Execute
for event in execute_workflow(dynamic_wf.to_dict(), stream=True):
    print(event)

Error Handling and Retry Patterns

from kubiya_workflow_sdk.dsl import workflow

# Workflow with comprehensive error handling
resilient_workflow = (
    workflow("resilient-deployment")
    .description("Deployment with retries and rollback")
    
    # Take snapshot before deployment
    .step("snapshot", "kubectl create backup deployment-$(date +%s)")
    
    # Deploy with health checks
    .step("deploy", "kubectl apply -f deployment.yaml")
    
    # Health check with retries
    .step("health_check", """
        for i in {1..5}; do
            if curl -f http://service/health; then
                echo "Health check passed"
                exit 0
            fi
            echo "Health check failed, attempt $i/5"
            sleep 30
        done
        exit 1
    """)
    
    # Rollback on failure
    .step("rollback", "kubectl rollback deployment/app")
)
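
The retries above live inside a single step's shell script. If you also want retries around the whole execution, a small client-side wrapper works; this is a plain-Python sketch, not a built-in SDK feature:

import time
from kubiya_workflow_sdk import execute_workflow

def execute_with_retries(wf_dict, attempts=3, delay=30, **kwargs):
    """Retry the full workflow execution a few times before giving up."""
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            # Assumes execution failures surface as exceptions;
            # adjust to inspect the returned result otherwise
            return execute_workflow(wf_dict, stream=False, **kwargs)
        except Exception as exc:
            last_error = exc
            print(f"Attempt {attempt}/{attempts} failed: {exc}")
            time.sleep(delay)
    raise last_error

result = execute_with_retries(resilient_workflow.to_dict(), api_key="YOUR_API_KEY")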

Using FastMCP Provider

from kubiya_workflow_sdk.providers import get_provider
import asyncio

async def use_fastmcp():
    # Get FastMCP provider for tool execution
    mcp = get_provider(
        "fastmcp",
        server_path="/path/to/mcp/server",
        args=["--config", "mcp-config.json"]
    )
    
    # Execute tools via MCP
    result = await mcp.execute_tool(
        tool_name="database-query",
        arguments={"query": "SELECT * FROM users LIMIT 10"}
    )
    
    print(f"Query result: {result}")

asyncio.run(use_fastmcp())

Working with the Kubiya Client

from kubiya_workflow_sdk import KubiyaClient

# Initialize client
client = KubiyaClient(
    api_key="YOUR_API_KEY",
    base_url="https://api.kubiya.ai",  # Optional
    runner="kubiya-hosted"  # Optional
)

# List available integrations
integrations = client.get_integrations()
for integration in integrations:
    print(f"- {integration['name']}: {integration.get('description', 'No description')}")

# Get available runners
runners = client.get_runners()
for runner in runners:
    print(f"Runner: {runner['name']} - Status: {runner.get('status', 'Unknown')}")

# Execute a workflow with the client
workflow_dict = {
    "name": "quick-task",
    "steps": [
        {"name": "task", "command": "echo 'Running quick task'"}
    ]
}

# Stream execution
for event in client.execute_workflow(workflow_dict, stream=True):
    print(event)
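
For short workflows you may prefer to block until completion instead of streaming. The call below assumes stream=False returns the final result, mirroring the execute_workflow examples above.

# Blocking execution (assumed to mirror execute_workflow's stream=False behavior)
result = client.execute_workflow(workflow_dict, stream=False)
print(result)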

Next Steps