SDK API Reference

Complete reference for all SDK classes, methods, and functions.

Client

The main entry point for interacting with Kubiya API.

Class: Client

from kubiya_workflow_sdk import Client

client = Client(
    api_key: str = None,
    api_url: str = None,
    org_name: str = None,
    timeout: int = 30,
    retry_count: int = 3,
    verify_ssl: bool = True,
    proxy: str = None
)

Parameters

ParameterTypeDefaultDescription
api_keystrNoneKubiya API key. If not provided, reads from KUBIYA_API_KEY env var
api_urlstr"https://api.kubiya.ai"API endpoint URL
org_namestrNoneOrganization name for multi-org accounts
timeoutint30Request timeout in seconds
retry_countint3Number of retries for failed requests
verify_sslboolTrueWhether to verify SSL certificates
proxystrNoneHTTP/SOCKS proxy URL

Methods

execute_workflow(workflow, params=None, stream=False, runner=None)

Execute a workflow.

result = client.execute_workflow(
    workflow=my_workflow,
    params={"env": "production"},
    stream=True,
    runner="my-custom-runner"  # Optional: specify a runner created via Kubiya platform
)

Parameters:

  • workflow: Workflow object or dict
  • params: Dict of parameters to pass to workflow
  • stream: Boolean to enable streaming response
  • runner: String name of the runner (created via Kubiya platform). Defaults to “kubiya-hosted”

Returns: ExecutionResult or generator of StreamEvent

Runners must be created through the Kubiya platform (web interface or API). The platform will provide a Kubernetes manifest or Helm chart to deploy the runner in your infrastructure.

list_workflows()

List all workflows in the organization.

workflows = client.list_workflows()
for wf in workflows:
    print(f"{wf.name}: {wf.description}")
get_workflow(name)

Get a specific workflow by name.

workflow = client.get_workflow("data-pipeline")
test_connection()

Test API connectivity and authentication.

if client.test_connection():
    print("Connected successfully!")

Workflow

Class: Workflow

from kubiya_workflow_sdk import Workflow

workflow = Workflow(
    name: str,
    description: str = None,
    version: str = "1.0.0",
    steps: List[Step] = None,
    parameters: Dict[str, Parameter] = None,
    runner: str = "kubiya-hosted",
    tags: List[str] = None,
    metadata: Dict[str, Any] = None
)

Methods

add_step(step)

Add a step to the workflow.

workflow.add_step(
    Step(name="process", image="python:3.11", command="process.py")
)
to_dict()

Convert workflow to dictionary representation.

workflow_dict = workflow.to_dict()
to_yaml()

Convert workflow to YAML string.

yaml_string = workflow.to_yaml()
print(yaml_string)
validate()

Validate workflow structure and dependencies.

errors = workflow.validate()
if errors:
    print(f"Validation errors: {errors}")
execute(client=None, **kwargs)

Execute the workflow.

result = workflow.execute(
    client=client,
    params={"env": "staging"}
)

Step

Class: Step

from kubiya_workflow_sdk import Step

step = Step(
    name: str,
    image: str = None,
    command: str = None,
    script: str = None,
    env: Dict[str, str] = None,
    volumes: Dict[str, str] = None,
    resources: Dict[str, Any] = None,
    depends: List[str] = None,
    retry: RetryPolicy = None,
    timeout: str = "5m",
    when: str = None,
    outputs: Dict[str, str] = None
)

Properties

PropertyTypeDescription
namestrUnique step identifier
imagestrDocker image to use
commandstrCommand to execute
scriptstrScript content to run
envDict[str, str]Environment variables
volumesDict[str, str]Volume mounts
resourcesDict[str, Any]Resource limits/requests
dependsList[str]Step dependencies
retryRetryPolicyRetry configuration
timeoutstrExecution timeout
whenstrConditional execution
outputsDict[str, str]Output mappings

DSL Functions

workflow Decorator

from kubiya_workflow_sdk.dsl import workflow

@workflow(
    name: str = None,
    description: str = None,
    runner: str = "kubiya-hosted",
    version: str = "1.0.0"
)
def my_workflow():
    # Workflow logic
    pass

step Functions

step.shell(command, **kwargs)

Execute a shell command.

result = step.shell(
    "echo 'Hello World'",
    name="greeting",
    image="alpine:latest"
)

step.python(script, **kwargs)

Execute Python code.

output = step.python(
    """
    import pandas as pd
    df = pd.read_csv('data.csv')
    print(df.describe())
    """,
    name="analyze",
    packages=["pandas", "numpy"]
)

step.container(image, **kwargs)

Run a container with advanced options.

step.container(
    image="nginx:alpine",
    name="webserver",
    ports={"80": "8080"},
    health_check={
        "test": ["CMD", "curl", "-f", "http://localhost/"],
        "interval": "30s"
    }
)

step.inline_agent(message, **kwargs)

Execute an inline AI agent.

analysis = step.inline_agent(
    message="Analyze these logs: ${logs}",
    runners=["kubiya-hosted"],
    llm_model="gpt-4",
    tools=[{
        "name": "parse-logs",
        "type": "docker",
        "image": "log-parser:latest"
    }]
)

Providers

Function: get_provider(name, **kwargs)

Get a workflow provider instance.

from kubiya_workflow_sdk.providers import get_provider

provider = get_provider(
    "adk",
    api_key="...",
    model="gemini-1.5-pro"
)

Class: BaseProvider

Base class for custom providers.

from kubiya_workflow_sdk.providers import BaseProvider

class CustomProvider(BaseProvider):
    async def compose(self, task: str, mode: str = "plan", **kwargs):
        # Implementation
        pass
    
    async def execute_workflow(self, workflow, **kwargs):
        # Implementation
        pass

Streaming

Class: StreamEvent

@dataclass
class StreamEvent:
    type: str  # "step.started", "log", "step.completed", etc.
    timestamp: float
    step_name: str = None
    data: Dict[str, Any] = None
    error: str = None

Stream Event Types

TypeDescriptionData Fields
workflow.startedWorkflow execution startedworkflow_id, name
step.startedStep execution startedstep_name, image
logLog outputmessage, level
step.completedStep finishedstep_name, exit_code, duration
workflow.completedWorkflow finishedworkflow_id, status
errorError occurredmessage, step_name

Errors

Exception Classes

KubiyaError

Base exception for all SDK errors.

from kubiya_workflow_sdk.errors import KubiyaError

try:
    client.execute_workflow(workflow)
except KubiyaError as e:
    print(f"Kubiya error: {e}")

AuthenticationError

Raised for authentication failures.

from kubiya_workflow_sdk.errors import AuthenticationError

try:
    client = Client(api_key="invalid")
except AuthenticationError as e:
    print("Invalid API key")

ValidationError

Raised for workflow validation failures.

from kubiya_workflow_sdk.errors import ValidationError

try:
    workflow.validate()
except ValidationError as e:
    print(f"Validation failed: {e.errors}")

ExecutionError

Raised for workflow execution failures.

from kubiya_workflow_sdk.errors import ExecutionError

try:
    result = workflow.execute()
except ExecutionError as e:
    print(f"Execution failed at step: {e.step_name}")

Utilities

Function: load_workflow(path)

Load workflow from YAML file.

from kubiya_workflow_sdk.utils import load_workflow

workflow = load_workflow("workflows/pipeline.yaml")

Function: save_workflow(workflow, path)

Save workflow to YAML file.

from kubiya_workflow_sdk.utils import save_workflow

save_workflow(workflow, "workflows/pipeline.yaml")

Function: merge_workflows(*workflows)

Merge multiple workflows into one.

from kubiya_workflow_sdk.utils import merge_workflows

combined = merge_workflows(etl_workflow, ml_workflow)

Type Definitions

RetryPolicy

from kubiya_workflow_sdk.types import RetryPolicy

retry = RetryPolicy(
    max_attempts: int = 3,
    backoff: str = "exponential",  # "constant", "linear", "exponential"
    initial_delay: str = "1s",
    max_delay: str = "5m"
)

ResourceSpec

from kubiya_workflow_sdk.types import ResourceSpec

resources = ResourceSpec(
    requests={"cpu": "1", "memory": "2Gi"},
    limits={"cpu": "2", "memory": "4Gi"},
    gpus=1  # Optional GPU request
)

Parameter

from kubiya_workflow_sdk.types import Parameter

param = Parameter(
    name="environment",
    type="string",
    default="staging",
    choices=["dev", "staging", "production"],
    required=False,
    description="Target environment"
)

Examples

Basic Workflow

from kubiya_workflow_sdk import Client, Workflow, Step

# Create workflow
workflow = Workflow(
    name="hello-world",
    steps=[
        Step(name="greet", image="alpine", command="echo 'Hello!'")
    ]
)

# Execute
client = Client()
result = client.execute_workflow(workflow)
print(result.output)

Streaming Execution

# Stream execution events
for event in client.execute_workflow(workflow, stream=True):
    if event.type == "log":
        print(event.data["message"])
    elif event.type == "step.completed":
        print(f"Step {event.step_name} completed")

Error Handling

from kubiya_workflow_sdk.errors import ExecutionError, StepError

try:
    result = workflow.execute()
except StepError as e:
    print(f"Step {e.step_name} failed: {e.message}")
    # Handle step failure
except ExecutionError as e:
    print(f"Workflow failed: {e}")
    # Handle workflow failure

Version Compatibility

SDK VersionAPI VersionPython Version
2.0.xv23.8+
1.5.xv13.7+
1.0.xv13.6+

Next Steps