Integrations

Kubiya provides a unified integration layer that seamlessly connects your infrastructure, applications, cloud services, and AI capabilities. This creates an intelligent automation mesh that spans both application and infrastructure layers.

Integration Architecture

Integration Management API

Kubiya provides a comprehensive API for managing integrations programmatically:

Listing Available Integrations

from kubiya_workflow_sdk import Client

client = Client(api_key="your-key")

# Get all available integrations
integrations = client.get_integrations()

for integration in integrations:
    print(f"Name: {integration['name']}")
    print(f"Type: {integration['type']}")
    print(f"Auth: {integration.get('auth_type', 'none')}")
    print(f"Status: {integration.get('status', 'active')}")

Integration Types

Kubiya supports several integration categories:

# Cloud providers
aws_integration = {
    "name": "aws-prod",
    "type": "cloud",
    "provider": "aws",
    "auth_type": "iam_role",
    "config": {
        "region": "us-east-1",
        "role_arn": "arn:aws:iam::123456789:role/kubiya-automation"
    }
}

# Kubernetes clusters
k8s_integration = {
    "name": "prod-cluster",
    "type": "kubernetes",
    "auth_type": "service_account",
    "config": {
        "cluster_endpoint": "https://k8s.company.com",
        "namespace": "production"
    }
}

Authentication Scenarios

1. Multi-Account AWS IAM

For complex AWS environments with multiple accounts and varying credentials:

@workflow
def multi_account_aws_workflow():
    # Step 1: Assume role in production account
    prod_creds = step("assume-prod-role").docker(
        image="amazon/aws-cli:latest",
        command="""
        aws sts assume-role \
          --role-arn arn:aws:iam::111111111111:role/kubiya-prod \
          --role-session-name kubiya-automation \
          --output json
        """,
        env={
            "AWS_ACCESS_KEY_ID": "${secrets.aws_master_key}",
            "AWS_SECRET_ACCESS_KEY": "${secrets.aws_master_secret}"
        }
    ).output("PROD_CREDS")
    
    # Step 2: Use production credentials
    step("deploy-to-prod").docker(
        image="amazon/aws-cli:latest",
        script="""
        # Parse credentials from previous step
        export AWS_ACCESS_KEY_ID=$(echo $PROD_CREDS | jq -r '.Credentials.AccessKeyId')
        export AWS_SECRET_ACCESS_KEY=$(echo $PROD_CREDS | jq -r '.Credentials.SecretAccessKey')
        export AWS_SESSION_TOKEN=$(echo $PROD_CREDS | jq -r '.Credentials.SessionToken')
        
        # Deploy using production credentials
        aws ecs update-service --cluster prod --service app --force-new-deployment
        """,
        env={"PROD_CREDS": "${PROD_CREDS}"}
    ).depends("assume-prod-role")
    
    # Step 3: Assume role in staging account
    staging_creds = step("assume-staging-role").docker(
        image="amazon/aws-cli:latest",
        command="""
        aws sts assume-role \
          --role-arn arn:aws:iam::222222222222:role/kubiya-staging \
          --role-session-name kubiya-automation
        """,
        env={
            "AWS_ACCESS_KEY_ID": "${secrets.aws_master_key}",
            "AWS_SECRET_ACCESS_KEY": "${secrets.aws_master_secret}"
        }
    ).output("STAGING_CREDS")

2. OAuth2 User-Based Identity

For applications requiring user-specific authentication:

@workflow
def user_specific_github_workflow(user_email: str):
    # Step 1: Get user's OAuth token from secure store
    user_token = step("get-user-token").get_secret(
        secret_name=f"github_oauth_{user_email.replace('@', '_').replace('.', '_')}"
    ).output("USER_TOKEN")
    
    # Step 2: Use user-specific token for operations
    step("create-pr").docker(
        image="github/gh-cli:latest",
        script="""
        # Configure gh CLI with user token
        echo "${USER_TOKEN}" | gh auth login --with-token
        
        # Create PR as the specific user
        gh pr create \
          --title "Automated update by ${USER_EMAIL}" \
          --body "This PR was created by Kubiya on behalf of ${USER_EMAIL}" \
          --base main \
          --head feature/auto-update
        """,
        env={
            "USER_TOKEN": "${USER_TOKEN}",
            "USER_EMAIL": user_email
        }
    ).depends("get-user-token")

3. Certificate-Based Authentication

For systems requiring mTLS or certificate authentication:

@workflow
def mtls_api_workflow():
    # Step 1: Retrieve certificates from secure store
    certs = step("get-certificates").docker(
        image="kubiya/secret-manager:latest",
        command="retrieve-cert-bundle ${CERT_NAME}",
        env={"CERT_NAME": "api-client-cert"}
    ).output("CERT_BUNDLE")
    
    # Step 2: Use certificates for API calls
    step("call-secure-api").docker(
        image="curlimages/curl:latest",
        script="""
        # Extract certificates
        echo "$CERT_BUNDLE" | jq -r '.cert' > /tmp/client.crt
        echo "$CERT_BUNDLE" | jq -r '.key' > /tmp/client.key
        echo "$CERT_BUNDLE" | jq -r '.ca' > /tmp/ca.crt
        
        # Make authenticated API call
        curl --cert /tmp/client.crt \
             --key /tmp/client.key \
             --cacert /tmp/ca.crt \
             https://secure-api.company.com/v1/data
        """,
        env={"CERT_BUNDLE": "${CERT_BUNDLE}"}
    ).depends("get-certificates")

Using Integrations in Workflow Steps

Accessing Secrets

Kubiya provides secure secret management integrated with your workflow steps:

@workflow
def secure_database_workflow():
    # Method 1: Direct secret reference
    step("query-database").docker(
        image="postgres:15",
        command="psql -c 'SELECT * FROM users LIMIT 10'",
        env={
            "PGHOST": "${secrets.db_host}",
            "PGUSER": "${secrets.db_user}",
            "PGPASSWORD": "${secrets.db_password}",
            "PGDATABASE": "${secrets.db_name}"
        }
    )
    
    # Method 2: Get secret as a step
    db_config = step("get-db-config").get_secret(
        secret_name="production_database"
    ).output("DB_CONFIG")
    
    # Use the retrieved secret
    step("process-data").docker(
        image="python:3.11",
        script="""
        import json
        import psycopg2
        
        config = json.loads(os.environ['DB_CONFIG'])
        conn = psycopg2.connect(**config)
        # Process data...
        """,
        env={"DB_CONFIG": "${DB_CONFIG}"}
    ).depends("get-db-config")

Using Registered Integrations

Access pre-configured integrations in your workflows:

@workflow
def use_integrations():
    # Use AWS integration
    step("aws-operation").integration(
        name="aws-prod",
        action="ecs.update_service",
        params={
            "cluster": "production",
            "service": "api",
            "force_new_deployment": True
        }
    )
    
    # Use GitHub integration
    step("github-operation").integration(
        name="github-org",
        action="create_issue",
        params={
            "repo": "infrastructure",
            "title": "Deployment completed",
            "body": "Production deployment finished at ${TIMESTAMP}"
        }
    )

Application Layer Automation

Kubiya makes it easy to automate across your entire application stack:

1. API Integration Pattern

@workflow
def api_automation():
    # REST API automation
    response = step("call-api").http(
        url="https://api.company.com/v1/users",
        method="POST",
        headers={
            "Authorization": "Bearer ${secrets.api_token}",
            "Content-Type": "application/json"
        },
        body={"name": "New User", "role": "developer"}
    ).output("API_RESPONSE")
    
    # GraphQL automation
    step("graphql-query").http(
        url="https://api.company.com/graphql",
        method="POST",
        headers={"Authorization": "Bearer ${secrets.api_token}"},
        body={
            "query": """
            mutation CreateProject($name: String!) {
                createProject(name: $name) {
                    id
                    name
                    created_at
                }
            }
            """,
            "variables": {"name": "New Project"}
        }
    )

2. Database Operations

@workflow
def database_automation():
    # Multi-database operations
    step("sync-databases").docker(
        image="python:3.11",
        script="""
        import psycopg2
        import pymongo
        import redis
        
        # PostgreSQL operations
        pg_conn = psycopg2.connect(
            host="${secrets.pg_host}",
            user="${secrets.pg_user}",
            password="${secrets.pg_password}"
        )
        
        # MongoDB operations
        mongo_client = pymongo.MongoClient("${secrets.mongo_uri}")
        
        # Redis operations
        redis_client = redis.Redis(
            host="${secrets.redis_host}",
            password="${secrets.redis_password}"
        )
        
        # Sync data between systems
        # ... your logic here ...
        """
    )

3. SaaS Application Integration

@workflow
def saas_automation():
    # Salesforce integration
    step("update-salesforce").docker(
        image="salesforce/cli:latest",
        script="""
        # Authenticate
        sfdx auth:jwt:grant \
          --clientid ${SALESFORCE_CLIENT_ID} \
          --jwtkeyfile /tmp/server.key \
          --username ${SALESFORCE_USERNAME} \
          --instanceurl ${SALESFORCE_INSTANCE_URL}
        
        # Update records
        sfdx data:record:update \
          --sobjecttype Account \
          --where "Industry='Technology'" \
          --values "Status='Active'"
        """,
        env={
            "SALESFORCE_CLIENT_ID": "${secrets.sf_client_id}",
            "SALESFORCE_USERNAME": "${secrets.sf_username}",
            "SALESFORCE_INSTANCE_URL": "https://mycompany.my.salesforce.com"
        }
    )

Infrastructure Layer Automation

1. Multi-Cloud Infrastructure

@workflow
def multi_cloud_infrastructure():
    # AWS infrastructure
    step("provision-aws").docker(
        image="hashicorp/terraform:latest",
        script="""
        terraform init -backend-config="bucket=${TF_STATE_BUCKET}"
        terraform apply -auto-approve -var="environment=production"
        """,
        env={
            "AWS_ACCESS_KEY_ID": "${secrets.aws_key}",
            "AWS_SECRET_ACCESS_KEY": "${secrets.aws_secret}",
            "TF_STATE_BUCKET": "company-terraform-state"
        }
    )
    
    # Google Cloud infrastructure
    step("provision-gcp").docker(
        image="google/cloud-sdk:latest",
        script="""
        # Authenticate with service account
        echo '${GCP_SERVICE_ACCOUNT_KEY}' | gcloud auth activate-service-account --key-file=-
        
        # Deploy resources
        gcloud deployment-manager deployments create prod-stack \
          --config=infrastructure.yaml
        """,
        env={
            "GCP_SERVICE_ACCOUNT_KEY": "${secrets.gcp_service_account}"
        }
    )
    
    # Azure infrastructure
    step("provision-azure").docker(
        image="mcr.microsoft.com/azure-cli:latest",
        script="""
        # Login with service principal
        az login --service-principal \
          -u ${AZURE_CLIENT_ID} \
          -p ${AZURE_CLIENT_SECRET} \
          --tenant ${AZURE_TENANT_ID}
        
        # Deploy ARM template
        az deployment group create \
          --resource-group production \
          --template-file template.json
        """,
        env={
            "AZURE_CLIENT_ID": "${secrets.azure_client_id}",
            "AZURE_CLIENT_SECRET": "${secrets.azure_client_secret}",
            "AZURE_TENANT_ID": "${secrets.azure_tenant_id}"
        }
    )

2. Kubernetes Multi-Cluster Management

@workflow
def kubernetes_multi_cluster():
    clusters = ["prod-us-east", "prod-eu-west", "prod-ap-south"]
    
    for cluster in clusters:
        # Get cluster-specific credentials
        kubeconfig = step(f"get-{cluster}-config").get_secret(
            secret_name=f"kubeconfig_{cluster}"
        ).output(f"KUBECONFIG_{cluster.upper().replace('-', '_')}")
        
        # Deploy to each cluster
        step(f"deploy-{cluster}").docker(
            image="bitnami/kubectl:latest",
            script=f"""
            # Set up kubeconfig
            echo "$KUBECONFIG" | base64 -d > /tmp/kubeconfig
            export KUBECONFIG=/tmp/kubeconfig
            
            # Apply manifests
            kubectl apply -f /manifests/
            
            # Wait for rollout
            kubectl rollout status deployment/api -n production
            """,
            env={
                "KUBECONFIG": f"${{{kubeconfig}}}"
            }
        ).depends(f"get-{cluster}-config")

AI Mesh Integration

Kubiya’s AI mesh allows intelligent orchestration across all layers:

Intelligent Decision Making

@workflow
def intelligent_operations():
    # Collect metrics from multiple sources
    metrics = step("collect-metrics").docker(
        image="python:3.11",
        script="""
        import requests
        
        # Collect from Prometheus
        prom_data = requests.get("http://prometheus:9090/api/v1/query",
            params={"query": "up"}).json()
        
        # Collect from CloudWatch
        # ... CloudWatch API calls ...
        
        # Collect from Datadog
        # ... Datadog API calls ...
        
        print(json.dumps({"prometheus": prom_data, ...}))
        """
    ).output("METRICS")
    
    # AI analyzes and decides
    decision = step("ai-analysis").inline_agent(
        message="""Analyze these metrics and determine:
        1. Are there any anomalies?
        2. Should we scale up/down?
        3. Any services need attention?
        
        Metrics: ${METRICS}
        """,
        agent_name="ops-analyst",
        ai_instructions="""You are an expert SRE. Analyze metrics for:
        - Performance anomalies
        - Capacity planning
        - Service health
        Provide specific, actionable recommendations.""",
        runners=["kubiya-hosted"],
        llm_model="gpt-4o"
    ).output("AI_DECISION")
    
    # Execute AI's recommendations
    step("execute-recommendations").docker(
        image="python:3.11",
        script="""
        import json
        
        decision = json.loads(os.environ['AI_DECISION'])
        
        for action in decision.get('actions', []):
            if action['type'] == 'scale':
                # Execute scaling
                subprocess.run([
                    'kubectl', 'scale', 
                    f"deployment/{action['target']}", 
                    f"--replicas={action['replicas']}"
                ])
            elif action['type'] == 'alert':
                # Send alert
                # ... alert logic ...
        """
    ).depends(["collect-metrics", "ai-analysis"])

Integration Best Practices

1. Credential Management

Never hardcode credentials in workflows. Always use Kubiya’s secret management.

# ❌ Bad: Hardcoded credentials
step("bad").env({"PASSWORD": "actual-password"})

# ✅ Good: Secret reference
step("good").env({"PASSWORD": "${secrets.db_password}"})

# ✅ Better: Get secret step with proper error handling
secret = step("get-secret").get_secret(
    secret_name="production_db",
    on_error="fail"  # or "continue" with default
).output("DB_SECRET")

2. Integration Error Handling

@workflow
def resilient_integration():
    # Retry on integration failures
    step("api-call").http(
        url="https://flaky-api.com/endpoint",
        method="POST"
    ).retry(
        limit=3,
        interval_sec=30,
        exponential_base=2.0
    ).continue_on(
        failure=True,  # Continue workflow even if this fails
        mark_success=True  # Mark as success after retries
    )

3. Integration Testing

@workflow
def test_integrations():
    # Test each integration
    integrations = ["aws-prod", "github-org", "postgres-main"]
    
    for integration in integrations:
        step(f"test-{integration}").integration(
            name=integration,
            action="test_connection"
        ).continue_on(failure=True)
        
    # Aggregate results
    step("report-status").inline_agent(
        message="Generate integration health report",
        runners=["kubiya-hosted"]
    )

4. Cross-Layer Security

@workflow
def secure_cross_layer():
    # Application layer: Rotate API keys
    new_key = step("generate-api-key").docker(
        image="python:3.11",
        script="import secrets; print(secrets.token_urlsafe(32))"
    ).output("NEW_KEY")
    
    # Infrastructure layer: Update secret in vault
    step("update-vault").docker(
        image="vault:latest",
        command="vault kv put secret/api key=${NEW_KEY}",
        env={"VAULT_TOKEN": "${secrets.vault_token}"}
    )
    
    # Application layer: Update running services
    step("rolling-update").docker(
        image="kubectl:latest",
        command="kubectl set env deployment/api API_KEY=${NEW_KEY}"
    )

Next Steps