Skip to main content
The Kubiya Workflow DSL (Domain Specific Language) allows you to define complex automation workflows using Python’s intuitive syntax. Write workflows as code with full IDE support, type safety, and version control integration.

Key Features

  • Python-native DSL - Use Python’s syntax you already know
  • Type-safe - IDE autocompletion and type checking
  • Version controlled - Git-friendly workflow definitions
  • Testable - Unit test workflows before deployment
  • Composable - Reuse steps and workflows across projects

Quick Start

Simple Workflow

from kubiya.dsl import workflow

# Define a workflow
wf = (
    workflow("hello-world")
        .description("My first workflow")
        .step("greet", "echo 'Hello from Kubiya!'")
)

# Convert to dict for execution
workflow_dict = wf.to_dict()

Multi-Step Workflow

wf = (
    workflow("deploy-service")
        .description("Deploy application to Kubernetes")
        .step("build", "docker build -t myapp:latest .")
        .step("push", "docker push myapp:latest")
        .step("deploy", "kubectl apply -f deployment.yaml")
        .step("verify", "kubectl rollout status deployment/myapp")
)

Core Concepts

1. Workflows

Workflows are containers for steps and configuration:
from kubiya.dsl import workflow

wf = (
    workflow("workflow-name")
        .description("What this workflow does")
        .params(
            ENVIRONMENT="staging",
            VERSION="v1.0.0"
        )
        .runner("kubiya-hosted-runner")
)

2. Steps

Steps are the building blocks - individual operations:
# Simple shell command
wf.step("check-status", "kubectl get pods")

# Step with callback for advanced configuration
wf.step("deploy", callback=lambda s:
    s.shell("kubectl apply -f deployment.yaml")
        .output("DEPLOYMENT_RESULT")
)

3. Dependencies

Control execution order with dependencies:
wf = (
    workflow("pipeline")
        .step("build", "docker build -t app .")
        .step("test", callback=lambda s:
            s.shell("pytest tests/")
                .depends("build")
        )
        .step("deploy", callback=lambda s:
            s.shell("kubectl apply -f deployment.yaml")
                .depends("test")
        )
)

4. Parameters and Outputs

Pass data between steps:
wf = (
    workflow("data-pipeline")
        .params(INPUT_FILE="data.csv")

        # Step that produces output
        .step("process", callback=lambda s:
            s.shell("wc -l ${INPUT_FILE}")
                .output("LINE_COUNT")
        )

        # Step that uses the output
        .step("report", callback=lambda s:
            s.shell("echo 'Processed {{LINE_COUNT}} lines'")
                .depends("process")
        )
)

Step Types

Shell Commands

wf.step("simple-command", "ls -la /tmp")

wf.step("with-script", callback=lambda s:
    s.shell("""
        #!/bin/bash
        echo "Starting process..."
        for i in {1..5}; do
            echo "Step $i"
        done
    """)
)

Python Code

wf.step("python-task", callback=lambda s:
    s.python("""
import json
import sys

data = {"status": "success", "count": 42}
print(json.dumps(data))
    """)
)

Docker Containers

wf.step("containerized-task", callback=lambda s:
    s.docker(
        image="python:3.11-slim",
        content="""
#!/usr/bin/env python3
print("Running in Docker container!")
        """
    )
)

Kubiya API Calls

wf.step("api-call", callback=lambda s:
    s.kubiya(
        url="api/v3/runners",
        method="GET"
    ).output("RUNNERS_DATA")
)

Advanced Features

Conditional Execution

from kubiya.dsl import workflow

wf = (
    workflow("conditional-deploy")
        .params(ENVIRONMENT="staging")

        # Always run
        .step("build", "docker build -t app .")

        # Only run in production
        .step("security-scan", callback=lambda s:
            s.shell("trivy image app:latest")
                .condition("${ENVIRONMENT} == 'production'")
        )
)

Error Handling

wf = (
    workflow("resilient-pipeline")
        .step("risky-operation", callback=lambda s:
            s.shell("./deploy.sh")
                .retry(max_attempts=3, backoff="exponential")
        )

        # Rollback on failure
        .step("rollback", callback=lambda s:
            s.shell("./rollback.sh")
                .condition("${risky-operation.status} == 'failed'")
        )
)

Tool Definitions

Define custom tools inline:
wf.step("custom-tool", callback=lambda s:
    s.tool_def(
        name="health_checker",
        description="Check service health",
        type="docker",
        image="alpine:latest",
        content="""
#!/bin/sh
curl -f $SERVICE_URL/health || exit 1
        """,
        args={"SERVICE_URL": "https://api.example.com"}
    )
)

Complete Example

Here’s a comprehensive CI/CD workflow:
from kubiya import KubiyaClient, workflow

# Initialize client
client = KubiyaClient(api_key="your-api-key")

# Define workflow
cicd_pipeline = (
    workflow("ci-cd-pipeline")
        .description("Complete CI/CD pipeline with testing and deployment")
        .params(
            BRANCH="main",
            SERVICE_NAME="my-service",
            VERSION="v1.0.0",
            ENVIRONMENT="staging"
        )

        # Step 1: Checkout code
        .step("checkout", "git clone -b ${BRANCH} https://github.com/org/repo.git")

        # Step 2: Run tests
        .step("test", callback=lambda s:
            s.shell("cd repo && pytest tests/ -v")
                .depends("checkout")
        )

        # Step 3: Build Docker image
        .step("build", callback=lambda s:
            s.shell("cd repo && docker build -t ${SERVICE_NAME}:${VERSION} .")
                .depends("test")
        )

        # Step 4: Security scan
        .step("security-scan", callback=lambda s:
            s.docker(
                image="aquasec/trivy:latest",
                content="trivy image ${SERVICE_NAME}:${VERSION}"
            )
            .depends("build")
        )

        # Step 5: Push to registry
        .step("push", callback=lambda s:
            s.shell("docker push ${SERVICE_NAME}:${VERSION}")
                .depends("security-scan")
        )

        # Step 6: Deploy
        .step("deploy", callback=lambda s:
            s.shell("kubectl set image deployment/${SERVICE_NAME} ${SERVICE_NAME}=${SERVICE_NAME}:${VERSION} -n ${ENVIRONMENT}")
                .depends("push")
                .output("DEPLOYMENT_STATUS")
        )

        # Step 7: Verify deployment
        .step("verify", callback=lambda s:
            s.shell("kubectl rollout status deployment/${SERVICE_NAME} -n ${ENVIRONMENT}")
                .depends("deploy")
        )

        # Step 8: Smoke tests
        .step("smoke-test", callback=lambda s:
            s.python("""
import requests
response = requests.get("https://${SERVICE_NAME}.${ENVIRONMENT}.example.com/health")
assert response.status_code == 200, "Health check failed"
print("✅ Smoke tests passed!")
            """)
            .depends("verify")
        )
)

# Execute the workflow with streaming
print("🚀 Starting CI/CD pipeline...")
for event in client.execute_workflow(cicd_pipeline.to_dict(), stream=True):
    print(f"📋 {event}")
print("✅ Pipeline completed!")

DSL Methods Reference

Workflow Methods

MethodDescription
.description(text)Set workflow description
.params(**kwargs)Define workflow parameters
.step(name, command)Add a simple step
.step(name, callback=fn)Add a step with advanced configuration
.runner(name)Set the workflow runner
.to_dict()Convert to dictionary
.to_json()Convert to JSON string
.to_yaml()Convert to YAML string

Step Configuration Methods

MethodDescription
.shell(command)Execute shell command
.python(code)Execute Python code
.docker(image, content)Run in Docker container
.kubiya(url, method)Call Kubiya API
.tool_def(...)Define custom tool
.depends(step_name)Add dependency
.output(var_name)Capture output
.condition(expr)Add condition
.description(text)Set step description

Best Practices

Each step should do one thing well:
# Good - separate concerns
.step("build", "docker build -t app .")
.step("test", "pytest tests/")
.step("deploy", "kubectl apply -f deployment.yaml")

# Avoid - too much in one step
.step("build-test-deploy", "docker build && pytest && kubectl apply")
# Good - clear purpose
.step("validate-deployment-prerequisites", ...)
.step("execute-database-migration", ...)

# Avoid - vague names
.step("step1", ...)
.step("do-stuff", ...)
Make workflows reusable:
wf = (
    workflow("deploy")
        .params(
            ENVIRONMENT="staging",  # Default value
            VERSION="latest",
            REPLICAS="3"
        )
)
Document your workflows:
wf = (
    workflow("complex-pipeline")
        .description("CI/CD pipeline with security scanning and rollback")
)

wf.step("deploy", callback=lambda s:
    s.shell("kubectl apply -f deployment.yaml")
        .description("Deploy application to Kubernetes cluster")
)

Next Steps