Prerequisites: Make sure you have installed the SDK and have your Kubiya API key ready.

Your First Workflow

Let’s build a simple deployment workflow that showcases the power of Kubiya SDK.

1

Import the SDK

from kubiya_workflow_sdk.dsl import workflow
from kubiya_workflow_sdk import KubiyaClient, execute_workflow
2

Create a Basic Workflow

# Define workflow using the DSL
my_workflow = (
    workflow("hello-deployment")
    .description("My first deployment workflow")
    .step("prepare", "echo 'Preparing deployment...'")
    .step("deploy", "echo 'Deploying application v1.0'")
    .step("verify", "echo 'Deployment successful!'")
)
3

Execute the Workflow

# Option 1: Using the convenience function
result = execute_workflow(
    my_workflow.to_dict(),
    api_key="YOUR_API_KEY",  # Or use KUBIYA_API_KEY env var
    stream=True
)

for event in result:
    print(event)

# Option 2: Using the client
client = KubiyaClient(api_key="YOUR_API_KEY")
result = client.execute_workflow(my_workflow.to_dict(), stream=True)
4

Stream Real-time Updates

# Execute with streaming for real-time updates
import json

for event_str in execute_workflow(my_workflow.to_dict(), stream=True):
    try:
        event = json.loads(event_str)
        if event.get("type") == "step_started":
            print(f"▶️  Starting: {event.get('step_name')}")
        elif event.get("type") == "step_completed":
            print(f"✅ Completed: {event.get('step_name')}")
        elif event.get("type") == "log":
            print(f"   Output: {event.get('message')}")
    except json.JSONDecodeError:
        # Handle raw string events
        print(event_str)

Advanced Example: Real Deployment

Now let’s create a more realistic deployment workflow with error handling and retries:

from kubiya_workflow_sdk.dsl import workflow, step, retry_policy
from kubiya_workflow_sdk import execute_workflow
import json

# Create a production-ready deployment workflow
deployment_workflow = (
    workflow("production-deployment")
    .description("Deploy application with health checks")
    .params(
        APP_NAME="${APP_NAME}",
        VERSION="${VERSION}",
        ENVIRONMENT="staging"
    )
    
    # Step 1: Validate deployment parameters
    .step("validate", """
        if [ -z "$APP_NAME" ] || [ -z "$VERSION" ]; then
            echo "Error: APP_NAME and VERSION are required"
            exit 1
        fi
        echo "Deploying $APP_NAME version $VERSION to $ENVIRONMENT"
    """)
    
    # Step 2: Deploy application
    .step("deploy", """
        echo "Deploying ${APP_NAME}:${VERSION}..."
        # In real scenario: kubectl set image deployment/${APP_NAME} app=${APP_NAME}:${VERSION}
        sleep 2
        echo "Deployment initiated"
    """)
    
    # Step 3: Health check with retries
    .step("health_check", """
        echo "Checking health..."
        # In real scenario: curl -f http://localhost:8080/health
        echo "Health check passed"
    """)
    
    # Step 4: Notify success
    .step("notify", """
        echo "🎉 Deployment Complete!"
        echo "Application: ${APP_NAME}"
        echo "Version: ${VERSION}"
        echo "Environment: ${ENVIRONMENT}"
        echo "Status: Healthy"
    """)
)

# Execute the workflow
params = {
    "APP_NAME": "my-awesome-app",
    "VERSION": "2.1.0"
}

print("🚀 Starting production deployment workflow...\n")

for event_str in execute_workflow(
    deployment_workflow.to_dict(),
    api_key="YOUR_API_KEY",
    parameters=params,
    stream=True
):
    try:
        event = json.loads(event_str)
        if event.get("type") == "step_started":
            print(f"▶️  {event.get('step_name', 'Unknown')}: Starting...")
        elif event.get("type") == "step_completed":
            print(f"✅ {event.get('step_name', 'Unknown')}: Completed")
        elif event.get("type") == "log":
            print(f"   {event.get('message', '')}")
    except json.JSONDecodeError:
        # Handle non-JSON events
        if "data:" in event_str:
            print(event_str.replace("data:", "").strip())

Working with Tools and AI

Use the ADK provider to generate workflows from natural language:

from kubiya_workflow_sdk.providers import get_provider
import asyncio

async def generate_workflow():
    # Initialize ADK provider
    adk = get_provider(
        "adk", 
        api_key="YOUR_API_KEY",
        model="gemini-1.5-pro"
    )
    
    # Generate a workflow from description
    task = """
    Create a workflow that:
    1. Backs up a PostgreSQL database
    2. Uploads the backup to S3
    3. Verifies the backup integrity
    4. Sends a Slack notification with results
    """
    
    # Generate workflow
    result = await adk.compose(
        task=task,
        mode="plan",  # Just generate, don't execute
        stream=True
    )
    
    async for event in result:
        print(event)

# Run the async function
asyncio.run(generate_workflow())

Using Inline Agents

Execute AI agents within your workflows:

from kubiya_workflow_sdk.dsl import workflow, inline_agent_executor

# Create workflow with AI decision making
analysis_workflow = (
    workflow("log-analysis")
    .description("Analyze logs with AI")
    
    # Collect logs
    .step("collect_logs", "tail -n 100 /var/log/app.log")
    
    # AI analyzes logs
    .step("analyze")
    .executor(inline_agent_executor(
        message="Analyze these logs and identify any errors or anomalies: ${collect_logs.output}",
        runners=["kubiya-hosted"],
        ai_instructions="You are a log analysis expert. Identify errors, patterns, and anomalies."
    ))
    
    # Take action based on analysis
    .step("notify_if_critical", """
        echo "Analysis complete"
        # In real scenario: send alerts if critical issues found
    """)
)

Using Python and Shell Executors

from kubiya_workflow_sdk.dsl import workflow, python_executor, shell_executor

# Mix Python and shell steps
data_workflow = (
    workflow("data-processing")
    .description("Process CSV data")
    
    # Download with shell
    .step("download")
    .executor(shell_executor("wget https://example.com/data.csv -O /tmp/data.csv"))
    
    # Process with Python
    .step("process")
    .executor(python_executor("""
import pandas as pd

# Load data
df = pd.read_csv('/tmp/data.csv')
print(f"Loaded {len(df)} rows")

# Clean data
df_clean = df.dropna()
df_clean.to_csv('/tmp/clean.csv', index=False)
print(f"Cleaned data: {len(df_clean)} rows")
    """, packages=["pandas"]))
    
    # Upload result
    .step("upload")
    .executor(shell_executor("aws s3 cp /tmp/clean.csv s3://mybucket/clean/"))
)

Next Steps

Common Patterns

Pro tip: Set your API key as an environment variable to avoid hardcoding:

export KUBIYA_API_KEY="your-api-key-here"