Workflow DSL Reference

The Kubiya Workflow DSL provides a fluent, chainable API for building workflows programmatically.

Workflow Builder

Creating a Workflow

from kubiya_workflow_sdk.dsl import workflow

# Basic workflow
wf = workflow("my-workflow")

# With description
wf = workflow("my-workflow").description("Process daily data")

# Chain workflow (default)
wf = workflow("my-workflow").type("chain")

# Graph workflow (explicit dependencies)
wf = workflow("my-workflow").type("graph")
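
In a graph workflow, execution order comes from explicit dependencies between steps rather than from the order in which steps are chained. A minimal sketch is below; the .depends() method name is an assumption, not confirmed by this reference:

# Sketch only: .depends() is an assumed API, not confirmed by this reference
wf = workflow("nightly-graph").type("graph")
wf.step("extract", "extract.sh")
wf.step("validate", "validate.sh")
wf.step("load", "load.sh").depends("extract", "validate")  # run only after both finish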

Workflow Methods

.description(desc: str)

Set workflow description.

wf.description("Daily ETL pipeline")

.runner(name: str)

Specify the runner that executes the workflow.

wf.runner("my-k8s-runner")  # Custom runner
wf.runner("kubiya-hosted")  # Default

.env(**variables)

Set environment variables.

wf.env(
    LOG_LEVEL="debug",
    API_URL="https://api.example.com"
)

.params(**parameters)

Define parameters with defaults.

wf.params(
    ENVIRONMENT="${ENVIRONMENT:-staging}",
    VERSION="${VERSION}",
    REPLICAS="3"
)
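
Parameters (and environment variables) can be referenced from step commands with ${NAME} expansion, the same pattern the complete example at the end of this reference uses with ${DATE}. A minimal sketch (the kubectl command is illustrative only):

wf = (
    workflow("scale-app")
    .params(
        ENVIRONMENT="${ENVIRONMENT:-staging}",
        REPLICAS="3"
    )
    # ${REPLICAS} and ${ENVIRONMENT} expand to the parameter values at run time
    .step("scale", "kubectl scale deploy/myapp --replicas=${REPLICAS} -n ${ENVIRONMENT}")
)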

.schedule(cron: str)

Set cron schedule.

wf.schedule("0 2 * * *")  # Daily at 2 AM

.timeout(seconds: int)

Set the workflow timeout in seconds.

wf.timeout(3600)  # 1 hour

Steps

Basic Steps

.step(name: str, command: str)

Add a simple command step.

wf.step("build", "docker build -t myapp:latest .")
wf.step("test", "pytest tests/")

Parallel Steps

.parallel_steps(name: str, items: list, command: str, max_concurrent: int = None)

Run the same command in parallel for each item in a list; the current item is available as ${ITEM}.

wf.parallel_steps(
    "deploy-regions",
    items=["us-east-1", "eu-west-1", "ap-south-1"],
    command="deploy.sh ${ITEM}",
    max_concurrent=2
)

Sub-workflows

.sub_workflow(name: str, workflow: str, params: dict = None)

Execute another workflow as a step.

wf.sub_workflow(
    "run-tests",
    workflow="test-suite",
    params={"env": "staging"}
)
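
Because the DSL is plain Python, sub-workflow steps can also be added in a loop, for example to run the same test suite against several environments. A sketch using only the calls documented above (assuming the builder mutates the workflow in place, as the chainable API suggests):

wf = workflow("release-gate").description("Run the test suite in every environment")
for env in ["staging", "production"]:
    wf.sub_workflow(
        f"run-tests-{env}",
        workflow="test-suite",
        params={"env": env}
    )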

Executors

Attach a specific executor to a step with .executor():

Shell Executor

from kubiya_workflow_sdk.dsl import shell_executor

wf.step("backup").executor(
    shell_executor("pg_dump -h localhost -U postgres mydb > backup.sql")
)

Python Executor

from kubiya_workflow_sdk.dsl import python_executor

wf.step("process").executor(
    python_executor("""
import pandas as pd
df = pd.read_csv('data.csv')
print(f"Processed {len(df)} rows")
    """, 
    packages=["pandas", "numpy"])
)

Docker Executor

from kubiya_workflow_sdk.dsl import docker_executor

wf.step("scan").executor(
    docker_executor(
        image="aquasec/trivy:latest",
        command="image --severity HIGH myapp:latest"
    )
)

Inline Agent Executor

from kubiya_workflow_sdk.dsl import inline_agent_executor

wf.step("analyze").executor(
    inline_agent_executor(
        message="Analyze the test results and decide if we should deploy",
        runners=["kubiya-hosted"],
        ai_instructions="You are a deployment decision maker"
    )
)

Advanced Features

Lifecycle Handlers

wf.handlers(
    success="echo 'Workflow completed successfully'",
    failure="./scripts/rollback.sh",
    exit="./scripts/cleanup.sh"
)

Email Notifications

wf.notifications(
    mail_on_failure=True,
    mail_on_success=False,
    error_mail={
        "to": ["ops@example.com"],
        "subject": "Workflow Failed: ${WORKFLOW_NAME}"
    }
)

Queue Management

wf.queue("critical-jobs", max_active_runs=1)

Resource Management

wf.max_active_steps(5)  # Limit concurrent steps
wf.max_output_size(10485760)  # 10MB max output
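
These limits compose with the fan-out features above. The sketch below caps a ten-item parallel deployment; whether max_active_steps also bounds the steps generated by parallel_steps is an assumption, noted in the comment:

wf = (
    workflow("bulk-deploy")
    .max_active_steps(3)          # assumed to also cap fan-out steps from parallel_steps
    .max_output_size(10485760)    # 10MB max output
    .parallel_steps(
        "deploy-tenants",
        items=[f"tenant-{i}" for i in range(10)],
        command="deploy.sh ${ITEM}"
    )
)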

Metadata

wf.tags("production", "etl", "daily")
wf.group("data-pipelines")

Complete Example

from kubiya_workflow_sdk.dsl import workflow, python_executor, shell_executor

# Build a complete data pipeline
pipeline = (
    workflow("data-pipeline")
    .description("Daily data processing pipeline")
    .runner("production-runner")
    .schedule("0 2 * * *")
    .env(
        AWS_REGION="us-east-1",
        LOG_LEVEL="info"
    )
    .params(
        DATE="${DATE:-$(date +%Y-%m-%d)}",
        BATCH_SIZE="1000"
    )
    
    # Extract data
    .step("extract", "aws s3 cp s3://data-lake/raw/${DATE}/ /tmp/data/ --recursive")
    
    # Process with Python
    .step("transform")
    .executor(python_executor("""
import pandas as pd
import glob
import os

date = os.getenv('DATE')
batch_size = int(os.getenv('BATCH_SIZE'))

# Process all files
for file in glob.glob('/tmp/data/*.csv'):
    df = pd.read_csv(file)
    # Transform logic here
    df.to_parquet(file.replace('.csv', '.parquet'))
    print(f"Processed {file}: {len(df)} rows")
    """, packages=["pandas", "pyarrow"]))
    
    # Load to warehouse
    .step("load", "aws s3 sync /tmp/data/ s3://data-warehouse/processed/${DATE}/")
    
    # Cleanup
    .step("cleanup", "rm -rf /tmp/data/")
    
    # Handlers
    .handlers(
        success="./notify.sh success",
        failure="./notify.sh failure && ./rollback.sh"
    )
    
    # Resource limits
    .timeout(7200)  # 2 hours
    .max_active_steps(3)
)

# Export as YAML
print(pipeline.to_yaml())

# Execute
from kubiya_workflow_sdk import execute_workflow
result = execute_workflow(pipeline.to_dict(), api_key="YOUR_KEY")

Validation

Validate workflow before execution:

validation = wf.validate()
if validation["valid"]:
    print("Workflow is valid!")
else:
    print(f"Errors: {validation['errors']}")
    print(f"Warnings: {validation['warnings']}")

Export Formats

# As dictionary
workflow_dict = wf.to_dict()

# As YAML
workflow_yaml = wf.to_yaml()

# As JSON
workflow_json = wf.to_json(indent=2)
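
The exported YAML or JSON is plain text, so it can be written to disk for review or version control with the standard library (a minimal sketch):

from pathlib import Path

# Persist the definition next to the code that builds it
Path("my-workflow.yaml").write_text(wf.to_yaml())
Path("my-workflow.json").write_text(wf.to_json(indent=2))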

Next Steps