Workflow DSL Reference
The Kubiya Workflow DSL provides a fluent, chainable API for building workflows programmatically.
Workflow Builder
Creating a Workflow
from kubiya_workflow_sdk.dsl import workflow
# Basic workflow
wf = workflow("my-workflow")
# With description
wf = workflow("my-workflow").description("Process daily data")
# Chain workflow (default)
wf = workflow("my-workflow").type("chain")
# Graph workflow (explicit dependencies)
wf = workflow("my-workflow").type("graph")
Workflow Methods
.description(desc: str)
Set workflow description.
wf.description("Daily ETL pipeline")
.runner(name: str)
Specify the runner that executes the workflow.
wf.runner("my-k8s-runner") # Custom runner
wf.runner("kubiya-hosted") # Default
.env(**variables)
Set environment variables.
wf.env(
    LOG_LEVEL="debug",
    API_URL="https://api.example.com"
)
.params(**parameters)
Define parameters with defaults.
wf.params(
    ENVIRONMENT="${ENVIRONMENT:-staging}",
    VERSION="${VERSION}",
    REPLICAS="3"
)
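Parameter and environment values can be referenced inside step commands with ${NAME} interpolation, as the parallel-steps and complete-pipeline examples below do. A short sketch; the kubectl command is only a placeholder:
wf = (
    workflow("scale-service")
    .params(
        ENVIRONMENT="${ENVIRONMENT:-staging}",
        REPLICAS="3"
    )
    # ${REPLICAS} and ${ENVIRONMENT} are substituted at run time,
    # falling back to the defaults above when no value is supplied.
    .step("scale", "kubectl scale deployment/myapp --replicas=${REPLICAS} -n ${ENVIRONMENT}")
)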
.schedule(cron: str)
Set cron schedule.
wf.schedule("0 2 * * *") # Daily at 2 AM
.timeout(seconds: int)
Set workflow timeout.
wf.timeout(3600) # 1 hour
Steps
Basic Steps
.step(name: str, command: str)
Add a simple command step.
wf.step("build", "docker build -t myapp:latest .")
wf.step("test", "pytest tests/")
Parallel Steps
.parallel_steps(name: str, items: list, command: str, max_concurrent: int = None)
Run the same command in parallel for each item in a list; ${ITEM} expands to the current item, and max_concurrent caps how many run at once.
wf.parallel_steps(
    "deploy-regions",
    items=["us-east-1", "eu-west-1", "ap-south-1"],
    command="deploy.sh ${ITEM}",
    max_concurrent=2
)
Sub-workflows
.sub_workflow(name: str, workflow: str, params: dict = None)
Execute another workflow as a step.
wf.sub_workflow(
    "run-tests",
    workflow="test-suite",
    params={"env": "staging"}
)
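The params mapping presumably lines up with parameters the child workflow declares via .params(). A hedged sketch in which test-suite is built separately and the parent overrides its env default; the run_tests.sh script is a placeholder:
# Child: declares the parameter it expects, with a default.
test_suite = (
    workflow("test-suite")
    .params(env="${env:-staging}")
    .step("integration", "./scripts/run_tests.sh ${env}")
)

# Parent: runs the child as a step and overrides env.
release = (
    workflow("release")
    .step("build", "docker build -t myapp:latest .")
    .sub_workflow("run-tests", workflow="test-suite", params={"env": "production"})
)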
Executors
Use specific executors for steps:
Shell Executor
from kubiya_workflow_sdk.dsl import shell_executor
wf.step("backup").executor(
shell_executor("pg_dump -h localhost -U postgres mydb > backup.sql")
)
Python Executor
from kubiya_workflow_sdk.dsl import python_executor
wf.step("process").executor(
python_executor("""
import pandas as pd
df = pd.read_csv('data.csv')
print(f"Processed {len(df)} rows")
""",
packages=["pandas", "numpy"])
)
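Values set with .env() or defined with .params() reach the script through the process environment, as the complete example at the end of this page shows with os.getenv. A minimal sketch, assuming packages can be omitted when only the standard library is needed:
wf.step("report").executor(
    python_executor("""
import os

# LOG_LEVEL comes from wf.env(...), BATCH_SIZE from wf.params(...);
# both are read from the environment at run time.
print("log level:", os.getenv("LOG_LEVEL", "info"))
print("batch size:", os.getenv("BATCH_SIZE", "1000"))
""")
)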
Docker Executor
from kubiya_workflow_sdk.dsl import docker_executor
wf.step("scan").executor(
docker_executor(
image="aquasec/trivy:latest",
command="image --severity HIGH myapp:latest"
)
)
Inline Agent Executor
from kubiya_workflow_sdk.dsl import inline_agent_executor
wf.step("analyze").executor(
inline_agent_executor(
message="Analyze the test results and decide if we should deploy",
runners=["kubiya-hosted"],
ai_instructions="You are a deployment decision maker"
)
)
Advanced Features
Lifecycle Handlers
wf.handlers(
    success="echo 'Workflow completed successfully'",
    failure="./scripts/rollback.sh",
    exit="./scripts/cleanup.sh"
)
Email Notifications
wf.notifications(
    mail_on_failure=True,
    mail_on_success=False,
    error_mail={
        "to": ["ops@example.com"],
        "subject": "Workflow Failed: ${WORKFLOW_NAME}"
    }
)
Queue Management
wf.queue("critical-jobs", max_active_runs=1)
Resource Management
wf.max_active_steps(5) # Limit concurrent steps
wf.max_output_size(10485760) # 10MB max output
wf.tags("production", "etl", "daily")
wf.group("data-pipelines")
Complete Example
from kubiya_workflow_sdk.dsl import workflow, python_executor, shell_executor
# Build a complete data pipeline
pipeline = (
    workflow("data-pipeline")
    .description("Daily data processing pipeline")
    .runner("production-runner")
    .schedule("0 2 * * *")
    .env(
        AWS_REGION="us-east-1",
        LOG_LEVEL="info"
    )
    .params(
        DATE="${DATE:-$(date +%Y-%m-%d)}",
        BATCH_SIZE="1000"
    )
    # Extract data
    .step("extract", "aws s3 cp s3://data-lake/raw/${DATE}/ /tmp/data/ --recursive")
    # Process with Python
    .step("transform")
    .executor(python_executor("""
import pandas as pd
import glob
import os

date = os.getenv('DATE')
batch_size = int(os.getenv('BATCH_SIZE'))

# Process all files
for file in glob.glob('/tmp/data/*.csv'):
    df = pd.read_csv(file)
    # Transform logic here
    df.to_parquet(file.replace('.csv', '.parquet'))
    print(f"Processed {file}: {len(df)} rows")
""", packages=["pandas", "pyarrow"]))
    # Load to warehouse
    .step("load", "aws s3 sync /tmp/data/ s3://data-warehouse/processed/${DATE}/")
    # Cleanup
    .step("cleanup", "rm -rf /tmp/data/")
    # Handlers
    .handlers(
        success="./notify.sh success",
        failure="./notify.sh failure && ./rollback.sh"
    )
    # Resource limits
    .timeout(7200)  # 2 hours
    .max_active_steps(3)
)
# Export as YAML
print(pipeline.to_yaml())
# Execute
from kubiya_workflow_sdk import execute_workflow
result = execute_workflow(pipeline.to_dict(), api_key="YOUR_KEY")
Validation
Validate workflow before execution:
validation = wf.validate()
if validation["valid"]:
    print("Workflow is valid!")
else:
    print(f"Errors: {validation['errors']}")
    print(f"Warnings: {validation['warnings']}")
Export Formats
# As dictionary
workflow_dict = wf.to_dict()
# As YAML
workflow_yaml = wf.to_yaml()
# As JSON
workflow_json = wf.to_json(indent=2)
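The exported YAML or JSON can be written to disk for review or version control; a small sketch using standard-library file handling (the workflows/ directory is arbitrary):
from pathlib import Path

# Persist the generated definition alongside the code that builds it.
out_dir = Path("workflows")
out_dir.mkdir(exist_ok=True)
(out_dir / "my-workflow.yaml").write_text(wf.to_yaml())
(out_dir / "my-workflow.json").write_text(wf.to_json(indent=2))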
Next Steps