MCP Examples

Real-world examples showing how to use Kubiya’s MCP server with different AI frameworks and use cases.

Basic Examples

Hello World with Direct Tools

import asyncio
from mcp_use import MCPClient
from mcp_use.adapters.langchain_adapter import LangChainAdapter

async def hello_world():
    # Connect to Kubiya
    client = MCPClient.from_dict({
        "mcpServers": {
            "kubiya": {
                "command": "python3",
                "args": ["-m", "kubiya_workflow_sdk.mcp.server"]
            }
        }
    })
    
    # Get tools
    adapter = LangChainAdapter()
    tools = await adapter.create_tools(client)
    
    # Create a simple workflow
    define_tool = next(t for t in tools if t.name == "define_workflow")
    
    result = await define_tool.ainvoke({
        "name": "hello-world",
        "code": '''
from kubiya_workflow_sdk.dsl import Workflow

wf = Workflow("hello-world")
wf.description("Simple greeting workflow")
wf.step("greet", "echo 'Hello, World!'")
wf.step("time", "date")
'''
    })
    
    print(f"Created: {result}")

asyncio.run(hello_world())

List and Validate Workflows

async def manage_workflows():
    # ... client setup ...
    
    # List all workflows
    list_tool = next(t for t in tools if t.name == "list_workflows")
    workflows = await list_tool.ainvoke({})
    print(f"Found {len(workflows['workflows'])} workflows")
    
    # Validate a new workflow
    validate_tool = next(t for t in tools if t.name == "validate_workflow")
    validation = await validate_tool.ainvoke({
        "code": '''
from kubiya_workflow_sdk.dsl import Workflow

wf = Workflow("test-validation")
wf.step("test", "echo 'Valid!'")
'''
    })
    
    if validation['valid']:
        print("✅ Workflow is valid!")
    else:
        print(f"❌ Errors: {validation['errors']}")

AI Integration Examples

Working with Runners

from mcp_use import MCPAgent, MCPClient
from langchain_openai import ChatOpenAI

async def runner_example():
    # Setup client
    client = MCPClient.from_dict({
        "mcpServers": {
            "kubiya": {
                "command": "python3",
                "args": ["-m", "kubiya_workflow_sdk.mcp.server"],
                "env": {
                    "KUBIYA_API_KEY": os.getenv("KUBIYA_API_KEY")
                }
            }
        }
    })
    
    # Create agent
    llm = ChatOpenAI(model="gpt-4")
    agent = MCPAgent(llm=llm, client=client)
    
    # List available runners
    result = await agent.run("List all available Kubiya runners")
    print("Available runners:", result)
    
    # Execute workflow on specific runner
    result = await agent.run("""
        Execute the 'data-processing' workflow with these parameters:
        - input_bucket: raw-data
        - output_bucket: processed-data
        - runner: gpu-runner-1
    """)
    print("Execution started on GPU runner!")

asyncio.run(runner_example())

OpenAI GPT-4 Integration

from langchain_openai import ChatOpenAI
from mcp_use import MCPAgent, MCPClient
import os

async def openai_example():
    # Setup MCP client
    client = MCPClient.from_dict({
        "mcpServers": {
            "kubiya": {
                "command": "python3",
                "args": ["-m", "kubiya_workflow_sdk.mcp.server"],
                "env": {
                    "KUBIYA_API_KEY": os.getenv("KUBIYA_API_KEY")
                }
            }
        }
    })
    
    # Create AI agent
    llm = ChatOpenAI(
        model="gpt-4",
        api_key=os.getenv("OPENAI_API_KEY")
    )
    agent = MCPAgent(llm=llm, client=client)
    
    # Example 1: Create a deployment workflow
    result = await agent.run(
        "Create a workflow that deploys a Node.js application to Kubernetes, "
        "including health checks and rollback on failure"
    )
    print("Deployment workflow created!")
    
    # Example 2: Data processing pipeline
    result = await agent.run(
        "Create a workflow that downloads CSV files from S3, "
        "processes them with pandas, and uploads results back to S3"
    )
    print("Data pipeline created!")

asyncio.run(openai_example())

Claude 3 Integration

from langchain_anthropic import ChatAnthropic
from mcp_use import MCPAgent, MCPClient

async def claude_example():
    # Setup
    client = MCPClient.from_dict({
        "mcpServers": {
            "kubiya": {
                "command": "python3",
                "args": ["-m", "kubiya_workflow_sdk.mcp.server"]
            }
        }
    })
    
    # Use Claude
    llm = ChatAnthropic(
        model="claude-3-opus-20240229",
        api_key=os.getenv("ANTHROPIC_API_KEY")
    )
    agent = MCPAgent(llm=llm, client=client)
    
    # Complex workflow generation
    result = await agent.run("""
        Create a comprehensive CI/CD workflow that:
        1. Runs tests in parallel for Python, JavaScript, and Go
        2. Builds Docker images for each service
        3. Deploys to staging environment
        4. Runs integration tests
        5. Promotes to production with manual approval
        6. Sends Slack notifications at each stage
    """)
    
    print("CI/CD pipeline created!")

asyncio.run(claude_example())

Real-World Use Cases

1. Database Backup and Restore

backup_code = '''
from kubiya_workflow_sdk.dsl import Workflow

wf = Workflow("database-backup-restore")
wf.description("Automated database backup with restore capability")

# Parameters
wf.params(
    action="backup",  # backup or restore
    db_host="localhost",
    db_name="myapp",
    db_user="postgres",
    s3_bucket="db-backups",
    backup_file=""  # For restore
)

# Backup branch
wf.step(
    "create-backup",
    """
    TIMESTAMP=$(date +%Y%m%d_%H%M%S)
    pg_dump -h {{db_host}} -U {{db_user}} {{db_name}} > backup_${TIMESTAMP}.sql
    echo "backup_${TIMESTAMP}.sql" > backup_file.txt
    """,
    condition="{{action}} == 'backup'"
)

wf.step(
    "compress-backup",
    "gzip backup_*.sql",
    condition="{{action}} == 'backup'"
)

wf.step(
    "upload-backup",
    """
    BACKUP_FILE=$(cat backup_file.txt).gz
    aws s3 cp ${BACKUP_FILE} s3://{{s3_bucket}}/{{db_name}}/
    echo "Uploaded: ${BACKUP_FILE}"
    """,
    condition="{{action}} == 'backup'"
)

# Restore branch
wf.step(
    "download-backup",
    "aws s3 cp s3://{{s3_bucket}}/{{db_name}}/{{backup_file}} .",
    condition="{{action}} == 'restore'"
)

wf.step(
    "decompress-backup",
    "gunzip {{backup_file}}",
    condition="{{action}} == 'restore'"
)

wf.step(
    "restore-database",
    """
    BACKUP_SQL=${{{backup_file%.gz}}}
    psql -h {{db_host}} -U {{db_user}} {{db_name}} < ${BACKUP_SQL}
    """,
    condition="{{action}} == 'restore'"
)

# Cleanup
wf.step("cleanup", "rm -f backup_*.sql backup_*.sql.gz backup_file.txt")
'''

# Create and execute
await define_tool.ainvoke({"name": "database-backup-restore", "code": backup_code})

# Execute backup
await execute_tool.ainvoke({
    "workflow_name": "database-backup-restore",
    "params": {
        "action": "backup",
        "db_name": "production",
        "s3_bucket": "my-backups"
    }
})

2. Multi-Cloud Deployment

multicloud_code = '''
from kubiya_workflow_sdk.dsl import Workflow

wf = Workflow("multi-cloud-deploy")
wf.description("Deploy application across AWS, GCP, and Azure")

wf.params(
    app_name="myapp",
    version="latest",
    clouds=["aws", "gcp", "azure"],  # Which clouds to deploy to
    environment="staging"
)

# AWS Deployment
wf.step(
    "deploy-aws",
    """
    # Update ECS service
    aws ecs update-service \
        --cluster {{environment}}-cluster \
        --service {{app_name}} \
        --force-new-deployment
    
    # Wait for deployment
    aws ecs wait services-stable \
        --cluster {{environment}}-cluster \
        --services {{app_name}}
    """,
    condition="'aws' in {{clouds}}"
)

# GCP Deployment
wf.step(
    "deploy-gcp",
    """
    # Deploy to Cloud Run
    gcloud run deploy {{app_name}} \
        --image gcr.io/project/{{app_name}}:{{version}} \
        --region us-central1 \
        --platform managed
    """,
    condition="'gcp' in {{clouds}}"
)

# Azure Deployment
wf.step(
    "deploy-azure",
    """
    # Deploy to Container Instances
    az container create \
        --resource-group {{environment}}-rg \
        --name {{app_name}} \
        --image myregistry.azurecr.io/{{app_name}}:{{version}} \
        --dns-name-label {{app_name}}-{{environment}}
    """,
    condition="'azure' in {{clouds}}"
)

# Verify all deployments
wf.parallel_steps(
    "verify-deployments",
    items={{clouds}},
    command="""
    case {{item}} in
        aws)
            curl -f https://{{app_name}}-{{environment}}.aws.example.com/health
            ;;
        gcp)
            curl -f https://{{app_name}}-{{environment}}.gcp.example.com/health
            ;;
        azure)
            curl -f https://{{app_name}}-{{environment}}.azure.example.com/health
            ;;
    esac
    """
)
'''

3. Data Pipeline with ML Model Training

ml_pipeline_code = '''
from kubiya_workflow_sdk.dsl import Workflow

wf = Workflow("ml-training-pipeline")
wf.description("End-to-end ML model training pipeline")

wf.params(
    dataset="sales_data",
    model_type="random_forest",
    s3_data_path="s3://ml-data/raw/",
    s3_model_path="s3://ml-models/"
)

# Data preparation
wf.step(
    "download-data",
    """
    aws s3 sync {{s3_data_path}}{{dataset}}/ /tmp/data/
    echo "Downloaded $(ls /tmp/data | wc -l) files"
    """
)

wf.step(
    "preprocess-data",
    """
    python -c "
import pandas as pd
import os

# Load all CSV files
dfs = []
for file in os.listdir('/tmp/data'):
    if file.endswith('.csv'):
        dfs.append(pd.read_csv(f'/tmp/data/{file}'))

# Combine and clean
df = pd.concat(dfs, ignore_index=True)
df = df.dropna()
df = df.drop_duplicates()

# Save processed data
df.to_csv('/tmp/processed_data.csv', index=False)
print(f'Processed {len(df)} records')
"
    """
)

# Model training
wf.step(
    "train-model",
    """
    python -c "
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
import joblib

# Load data
df = pd.read_csv('/tmp/processed_data.csv')

# Prepare features and target
X = df.drop('target', axis=1)
y = df['target']

# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

# Train model
model = RandomForestRegressor(n_estimators=100)
model.fit(X_train, y_train)

# Evaluate
predictions = model.predict(X_test)
mse = mean_squared_error(y_test, predictions)
print(f'Model MSE: {mse}')

# Save model
joblib.dump(model, '/tmp/model.pkl')
"
    """
)

# Model deployment
wf.step(
    "package-model",
    """
    # Create model package
    mkdir -p /tmp/model_package
    cp /tmp/model.pkl /tmp/model_package/
    cp /tmp/processed_data.csv /tmp/model_package/sample_data.csv
    
    # Create metadata
    echo '{
        "model_type": "{{model_type}}",
        "dataset": "{{dataset}}",
        "timestamp": "'$(date -u +%Y-%m-%dT%H:%M:%SZ)'"
    }' > /tmp/model_package/metadata.json
    
    # Package
    cd /tmp && tar -czf model_package.tar.gz model_package/
    """
)

wf.step(
    "deploy-model",
    """
    # Upload to S3
    TIMESTAMP=$(date +%Y%m%d_%H%M%S)
    aws s3 cp /tmp/model_package.tar.gz \
        {{s3_model_path}}{{dataset}}/{{model_type}}_${TIMESTAMP}.tar.gz
    
    # Update latest pointer
    echo "{{s3_model_path}}{{dataset}}/{{model_type}}_${TIMESTAMP}.tar.gz" | \
        aws s3 cp - {{s3_model_path}}{{dataset}}/latest.txt
    """
)

# Cleanup
wf.step("cleanup", "rm -rf /tmp/data /tmp/*.csv /tmp/*.pkl /tmp/model_package*")
'''

4. Security Scanning Pipeline

security_scan_code = '''
from kubiya_workflow_sdk.dsl import Workflow

wf = Workflow("security-scan-pipeline")
wf.description("Comprehensive security scanning for applications")

wf.params(
    repo_url="https://github.com/myorg/myapp",
    branch="main",
    scan_types=["sast", "dependency", "container", "secrets"]
)

# Clone repository
wf.step(
    "clone-repo",
    """
    git clone {{repo_url}} /tmp/repo
    cd /tmp/repo && git checkout {{branch}}
    """
)

# Static Application Security Testing (SAST)
wf.step(
    "sast-scan",
    """
    cd /tmp/repo
    # Run Semgrep
    semgrep --config=auto --json -o /tmp/sast_results.json .
    
    # Check for critical issues
    CRITICAL=$(jq '.results | map(select(.severity == "ERROR")) | length' /tmp/sast_results.json)
    echo "Found $CRITICAL critical issues"
    """,
    condition="'sast' in {{scan_types}}"
)

# Dependency scanning
wf.step(
    "dependency-scan",
    """
    cd /tmp/repo
    # Run OWASP Dependency Check
    dependency-check --project {{repo_url}} --scan . \
        --format JSON --out /tmp/dependency_results.json
    
    # Run Snyk
    snyk test --json > /tmp/snyk_results.json || true
    """,
    condition="'dependency' in {{scan_types}}"
)

# Container scanning
wf.step(
    "container-scan",
    """
    cd /tmp/repo
    if [ -f Dockerfile ]; then
        # Build image
        docker build -t security-scan:latest .
        
        # Run Trivy
        trivy image --format json \
            -o /tmp/container_results.json \
            security-scan:latest
    fi
    """,
    condition="'container' in {{scan_types}}"
)

# Secret scanning
wf.step(
    "secret-scan",
    """
    cd /tmp/repo
    # Run GitLeaks
    gitleaks detect --source . \
        --report-format json \
        --report-path /tmp/secrets_results.json
    """,
    condition="'secrets' in {{scan_types}}"
)

# Generate report
wf.step(
    "generate-report",
    """
    python -c "
import json
import os
from datetime import datetime

report = {
    'scan_date': datetime.now().isoformat(),
    'repository': '{{repo_url}}',
    'branch': '{{branch}}',
    'results': {}
}

# Collect all results
for scan_type in {{scan_types}}:
    result_file = f'/tmp/{scan_type}_results.json'
    if os.path.exists(result_file):
        with open(result_file) as f:
            report['results'][scan_type] = json.load(f)

# Save report
with open('/tmp/security_report.json', 'w') as f:
    json.dump(report, f, indent=2)

print('Security scan complete!')
"
    """
)

# Upload results
wf.step(
    "upload-results",
    """
    TIMESTAMP=$(date +%Y%m%d_%H%M%S)
    aws s3 cp /tmp/security_report.json \
        s3://security-reports/{{repo_url}}/scan_${TIMESTAMP}.json
    """
)
'''

Integration Patterns

Workflow Chaining

async def chain_workflows():
    # Create a build workflow
    await define_tool.ainvoke({
        "name": "build-app",
        "code": '''
from kubiya_workflow_sdk.dsl import Workflow

wf = Workflow("build-app")
wf.params(app="myapp", version="latest")
wf.step("build", "docker build -t {{app}}:{{version}} .")
wf.step("push", "docker push {{app}}:{{version}}")
'''
    })
    
    # Create a deploy workflow that depends on build
    await define_tool.ainvoke({
        "name": "deploy-app",
        "code": '''
from kubiya_workflow_sdk.dsl import Workflow

wf = Workflow("deploy-app")
wf.params(app="myapp", version="latest", env="staging")

# First run the build workflow
wf.step("build", "kubiya run build-app --app={{app}} --version={{version}}")

# Then deploy
wf.step("deploy", "kubectl set image deployment/{{app}} {{app}}={{app}}:{{version}}")
'''
    })

Error Handling and Retries

error_handling_code = '''
from kubiya_workflow_sdk.dsl import Workflow

wf = Workflow("resilient-deployment")
wf.description("Deployment with error handling and retries")

wf.params(service="api", retries=3)

# Deploy with retries
wf.step(
    "deploy",
    """
    # Deployment script with built-in retry logic
    RETRY_COUNT=0
    MAX_RETRIES={{retries}}
    
    while [ $RETRY_COUNT -lt $MAX_RETRIES ]; do
        echo "Deployment attempt $((RETRY_COUNT + 1))"
        
        if kubectl rollout restart deployment/{{service}}; then
            echo "Deployment successful"
            break
        else
            echo "Deployment failed, retrying..."
            RETRY_COUNT=$((RETRY_COUNT + 1))
            sleep 10
        fi
    done
    
    if [ $RETRY_COUNT -eq $MAX_RETRIES ]; then
        echo "Deployment failed after $MAX_RETRIES attempts"
        exit 1
    fi
    """
)

# Health check with timeout
wf.step(
    "health-check",
    """
    TIMEOUT=300  # 5 minutes
    START=$(date +%s)
    
    while true; do
        if curl -f http://{{service}}/health; then
            echo "Service is healthy"
            break
        fi
        
        NOW=$(date +%s)
        if [ $((NOW - START)) -gt $TIMEOUT ]; then
            echo "Health check timeout"
            exit 1
        fi
        
        sleep 5
    done
    """
)

# Rollback on failure
wf.step(
    "rollback",
    "kubectl rollout undo deployment/{{service}}",
    condition="previous_step_failed"
)
'''

Next Steps