# MCP Examples

Real-world examples showing how to use Kubiya’s MCP server with different AI frameworks and use cases.
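All examples assume Python 3 with the Kubiya Workflow SDK (which ships the MCP server module used below) and the mcp-use client installed, plus a LangChain provider package for whichever model you use. A minimal setup sketch; the exact package names are inferred from the imports and may differ in your environment:

```bash
# Package names are assumptions based on the imports in the examples below.
pip install kubiya-workflow-sdk mcp-use langchain-openai langchain-anthropic
export KUBIYA_API_KEY="your-kubiya-api-key"
```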
## Basic Examples

### Hello World with Direct Tools
```python
import asyncio

from mcp_use import MCPClient
from mcp_use.adapters.langchain_adapter import LangChainAdapter


async def hello_world():
    # Connect to Kubiya
    client = MCPClient.from_dict({
        "mcpServers": {
            "kubiya": {
                "command": "python3",
                "args": ["-m", "kubiya_workflow_sdk.mcp.server"]
            }
        }
    })

    # Get tools
    adapter = LangChainAdapter()
    tools = await adapter.create_tools(client)

    # Create a simple workflow
    define_tool = next(t for t in tools if t.name == "define_workflow")
    result = await define_tool.ainvoke({
        "name": "hello-world",
        "code": '''
from kubiya_workflow_sdk.dsl import Workflow

wf = Workflow("hello-world")
wf.description("Simple greeting workflow")
wf.step("greet", "echo 'Hello, World!'")
wf.step("time", "date")
'''
    })
    print(f"Created: {result}")


asyncio.run(hello_world())
```
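Several later snippets also call an execute tool fetched from the same `tools` list. A minimal sketch, assuming the MCP server exposes an `execute_workflow` tool alongside `define_workflow` (the tool name and payload shape here are illustrative):

```python
# Inside the same async function, after defining the workflow above.
execute_tool = next(t for t in tools if t.name == "execute_workflow")
run_result = await execute_tool.ainvoke({
    "workflow_name": "hello-world",
    "params": {}
})
print(f"Execution result: {run_result}")
```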
### List and Validate Workflows
```python
async def manage_workflows():
    # ... client setup ...

    # List all workflows
    list_tool = next(t for t in tools if t.name == "list_workflows")
    workflows = await list_tool.ainvoke({})
    print(f"Found {len(workflows['workflows'])} workflows")

    # Validate a new workflow
    validate_tool = next(t for t in tools if t.name == "validate_workflow")
    validation = await validate_tool.ainvoke({
        "code": '''
from kubiya_workflow_sdk.dsl import Workflow

wf = Workflow("test-validation")
wf.step("test", "echo 'Valid!'")
'''
    })

    if validation['valid']:
        print("✅ Workflow is valid!")
    else:
        print(f"❌ Errors: {validation['errors']}")
```
## AI Integration Examples

### Working with Runners
```python
import asyncio
import os

from langchain_openai import ChatOpenAI
from mcp_use import MCPAgent, MCPClient


async def runner_example():
    # Setup client
    client = MCPClient.from_dict({
        "mcpServers": {
            "kubiya": {
                "command": "python3",
                "args": ["-m", "kubiya_workflow_sdk.mcp.server"],
                "env": {
                    "KUBIYA_API_KEY": os.getenv("KUBIYA_API_KEY")
                }
            }
        }
    })

    # Create agent
    llm = ChatOpenAI(model="gpt-4")
    agent = MCPAgent(llm=llm, client=client)

    # List available runners
    result = await agent.run("List all available Kubiya runners")
    print("Available runners:", result)

    # Execute workflow on specific runner
    result = await agent.run("""
        Execute the 'data-processing' workflow with these parameters:
        - input_bucket: raw-data
        - output_bucket: processed-data
        - runner: gpu-runner-1
    """)
    print("Execution started on GPU runner!")


asyncio.run(runner_example())
```
### OpenAI GPT-4 Integration
```python
import asyncio
import os

from langchain_openai import ChatOpenAI
from mcp_use import MCPAgent, MCPClient


async def openai_example():
    # Setup MCP client
    client = MCPClient.from_dict({
        "mcpServers": {
            "kubiya": {
                "command": "python3",
                "args": ["-m", "kubiya_workflow_sdk.mcp.server"],
                "env": {
                    "KUBIYA_API_KEY": os.getenv("KUBIYA_API_KEY")
                }
            }
        }
    })

    # Create AI agent
    llm = ChatOpenAI(
        model="gpt-4",
        api_key=os.getenv("OPENAI_API_KEY")
    )
    agent = MCPAgent(llm=llm, client=client)

    # Example 1: Create a deployment workflow
    result = await agent.run(
        "Create a workflow that deploys a Node.js application to Kubernetes, "
        "including health checks and rollback on failure"
    )
    print("Deployment workflow created!")

    # Example 2: Data processing pipeline
    result = await agent.run(
        "Create a workflow that downloads CSV files from S3, "
        "processes them with pandas, and uploads results back to S3"
    )
    print("Data pipeline created!")


asyncio.run(openai_example())
```
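If a single request needs many tool calls (for example, defining and validating several workflows in one go), the agent loop can be capped. A minimal sketch, assuming your mcp_use version supports the `max_steps` argument:

```python
# Limit the agent to 30 tool-calling steps per run (max_steps is an assumption
# about your installed mcp_use version).
agent = MCPAgent(llm=llm, client=client, max_steps=30)
```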
### Claude 3 Integration
```python
import asyncio
import os

from langchain_anthropic import ChatAnthropic
from mcp_use import MCPAgent, MCPClient


async def claude_example():
    # Setup
    client = MCPClient.from_dict({
        "mcpServers": {
            "kubiya": {
                "command": "python3",
                "args": ["-m", "kubiya_workflow_sdk.mcp.server"]
            }
        }
    })

    # Use Claude
    llm = ChatAnthropic(
        model="claude-3-opus-20240229",
        api_key=os.getenv("ANTHROPIC_API_KEY")
    )
    agent = MCPAgent(llm=llm, client=client)

    # Complex workflow generation
    result = await agent.run("""
        Create a comprehensive CI/CD workflow that:
        1. Runs tests in parallel for Python, JavaScript, and Go
        2. Builds Docker images for each service
        3. Deploys to staging environment
        4. Runs integration tests
        5. Promotes to production with manual approval
        6. Sends Slack notifications at each stage
    """)
    print("CI/CD pipeline created!")


asyncio.run(claude_example())
```
## Real-World Use Cases

### 1. Database Backup and Restore
```python
backup_code = '''
from kubiya_workflow_sdk.dsl import Workflow

wf = Workflow("database-backup-restore")
wf.description("Automated database backup with restore capability")

# Parameters
wf.params(
    action="backup",        # backup or restore
    db_host="localhost",
    db_name="myapp",
    db_user="postgres",
    s3_bucket="db-backups",
    backup_file=""          # For restore
)

# Backup branch
wf.step(
    "create-backup",
    """
    TIMESTAMP=$(date +%Y%m%d_%H%M%S)
    pg_dump -h {{db_host}} -U {{db_user}} {{db_name}} > backup_${TIMESTAMP}.sql
    echo "backup_${TIMESTAMP}.sql" > backup_file.txt
    """,
    condition="{{action}} == 'backup'"
)

wf.step(
    "compress-backup",
    "gzip backup_*.sql",
    condition="{{action}} == 'backup'"
)

wf.step(
    "upload-backup",
    """
    BACKUP_FILE=$(cat backup_file.txt).gz
    aws s3 cp ${BACKUP_FILE} s3://{{s3_bucket}}/{{db_name}}/
    echo "Uploaded: ${BACKUP_FILE}"
    """,
    condition="{{action}} == 'backup'"
)

# Restore branch
wf.step(
    "download-backup",
    "aws s3 cp s3://{{s3_bucket}}/{{db_name}}/{{backup_file}} .",
    condition="{{action}} == 'restore'"
)

wf.step(
    "decompress-backup",
    "gunzip {{backup_file}}",
    condition="{{action}} == 'restore'"
)

wf.step(
    "restore-database",
    """
    # Strip the .gz suffix left over from compression
    BACKUP_SQL="{{backup_file}}"
    BACKUP_SQL="${BACKUP_SQL%.gz}"
    psql -h {{db_host}} -U {{db_user}} {{db_name}} < ${BACKUP_SQL}
    """,
    condition="{{action}} == 'restore'"
)

# Cleanup
wf.step("cleanup", "rm -f backup_*.sql backup_*.sql.gz backup_file.txt")
'''

# Create and execute
await define_tool.ainvoke({"name": "database-backup-restore", "code": backup_code})

# Execute backup
await execute_tool.ainvoke({
    "workflow_name": "database-backup-restore",
    "params": {
        "action": "backup",
        "db_name": "production",
        "s3_bucket": "my-backups"
    }
})
```
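The restore path of the same workflow is invoked by flipping the `action` parameter and naming the backup object; a sketch using the same execute tool (the `backup_file` value is illustrative):

```python
# Restore a specific backup into the production database.
await execute_tool.ainvoke({
    "workflow_name": "database-backup-restore",
    "params": {
        "action": "restore",
        "db_name": "production",
        "s3_bucket": "my-backups",
        "backup_file": "backup_20240101_120000.sql.gz"  # hypothetical object name
    }
})
```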
### 2. Multi-Cloud Deployment
```python
multicloud_code = '''
from kubiya_workflow_sdk.dsl import Workflow

wf = Workflow("multi-cloud-deploy")
wf.description("Deploy application across AWS, GCP, and Azure")

wf.params(
    app_name="myapp",
    version="latest",
    clouds=["aws", "gcp", "azure"],  # Which clouds to deploy to
    environment="staging"
)

# AWS Deployment
wf.step(
    "deploy-aws",
    """
    # Update ECS service
    aws ecs update-service \
      --cluster {{environment}}-cluster \
      --service {{app_name}} \
      --force-new-deployment

    # Wait for deployment
    aws ecs wait services-stable \
      --cluster {{environment}}-cluster \
      --services {{app_name}}
    """,
    condition="'aws' in {{clouds}}"
)

# GCP Deployment
wf.step(
    "deploy-gcp",
    """
    # Deploy to Cloud Run
    gcloud run deploy {{app_name}} \
      --image gcr.io/project/{{app_name}}:{{version}} \
      --region us-central1 \
      --platform managed
    """,
    condition="'gcp' in {{clouds}}"
)

# Azure Deployment
wf.step(
    "deploy-azure",
    """
    # Deploy to Container Instances
    az container create \
      --resource-group {{environment}}-rg \
      --name {{app_name}} \
      --image myregistry.azurecr.io/{{app_name}}:{{version}} \
      --dns-name-label {{app_name}}-{{environment}}
    """,
    condition="'azure' in {{clouds}}"
)

# Verify all deployments
wf.parallel_steps(
    "verify-deployments",
    items="{{clouds}}",  # fan out over the clouds parameter
    command="""
    case {{item}} in
      aws)
        curl -f https://{{app_name}}-{{environment}}.aws.example.com/health
        ;;
      gcp)
        curl -f https://{{app_name}}-{{environment}}.gcp.example.com/health
        ;;
      azure)
        curl -f https://{{app_name}}-{{environment}}.azure.example.com/health
        ;;
    esac
    """
)
'''
```
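As with the other examples, the DSL code above still has to be registered and run through the MCP tools; a sketch deploying to only two of the clouds, using the define and execute tools from the earlier setup:

```python
await define_tool.ainvoke({"name": "multi-cloud-deploy", "code": multicloud_code})

await execute_tool.ainvoke({
    "workflow_name": "multi-cloud-deploy",
    "params": {
        "app_name": "myapp",
        "version": "1.4.2",        # illustrative version tag
        "clouds": ["aws", "gcp"],  # skip Azure for this rollout
        "environment": "staging"
    }
})
```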
### 3. Data Pipeline with ML Model Training
```python
ml_pipeline_code = '''
from kubiya_workflow_sdk.dsl import Workflow

wf = Workflow("ml-training-pipeline")
wf.description("End-to-end ML model training pipeline")

wf.params(
    dataset="sales_data",
    model_type="random_forest",
    s3_data_path="s3://ml-data/raw/",
    s3_model_path="s3://ml-models/"
)

# Data preparation
wf.step(
    "download-data",
    """
    aws s3 sync {{s3_data_path}}{{dataset}}/ /tmp/data/
    echo "Downloaded $(ls /tmp/data | wc -l) files"
    """
)

wf.step(
    "preprocess-data",
    """
    python -c "
import pandas as pd
import os

# Load all CSV files
dfs = []
for file in os.listdir('/tmp/data'):
    if file.endswith('.csv'):
        dfs.append(pd.read_csv(f'/tmp/data/{file}'))

# Combine and clean
df = pd.concat(dfs, ignore_index=True)
df = df.dropna()
df = df.drop_duplicates()

# Save processed data
df.to_csv('/tmp/processed_data.csv', index=False)
print(f'Processed {len(df)} records')
"
    """
)

# Model training
wf.step(
    "train-model",
    """
    python -c "
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
import joblib

# Load data
df = pd.read_csv('/tmp/processed_data.csv')

# Prepare features and target
X = df.drop('target', axis=1)
y = df['target']

# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

# Train model
model = RandomForestRegressor(n_estimators=100)
model.fit(X_train, y_train)

# Evaluate
predictions = model.predict(X_test)
mse = mean_squared_error(y_test, predictions)
print(f'Model MSE: {mse}')

# Save model
joblib.dump(model, '/tmp/model.pkl')
"
    """
)

# Model deployment
wf.step(
    "package-model",
    """
    # Create model package
    mkdir -p /tmp/model_package
    cp /tmp/model.pkl /tmp/model_package/
    cp /tmp/processed_data.csv /tmp/model_package/sample_data.csv

    # Create metadata
    echo '{
      "model_type": "{{model_type}}",
      "dataset": "{{dataset}}",
      "timestamp": "'$(date -u +%Y-%m-%dT%H:%M:%SZ)'"
    }' > /tmp/model_package/metadata.json

    # Package
    cd /tmp && tar -czf model_package.tar.gz model_package/
    """
)

wf.step(
    "deploy-model",
    """
    # Upload to S3
    TIMESTAMP=$(date +%Y%m%d_%H%M%S)
    aws s3 cp /tmp/model_package.tar.gz \
      {{s3_model_path}}{{dataset}}/{{model_type}}_${TIMESTAMP}.tar.gz

    # Update latest pointer
    echo "{{s3_model_path}}{{dataset}}/{{model_type}}_${TIMESTAMP}.tar.gz" | \
      aws s3 cp - {{s3_model_path}}{{dataset}}/latest.txt
    """
)

# Cleanup
wf.step("cleanup", "rm -rf /tmp/data /tmp/*.csv /tmp/*.pkl /tmp/model_package*")
'''
```
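Registering and kicking off the training pipeline follows the same pattern; a sketch, assuming the define and execute tools from the earlier setup:

```python
await define_tool.ainvoke({"name": "ml-training-pipeline", "code": ml_pipeline_code})

await execute_tool.ainvoke({
    "workflow_name": "ml-training-pipeline",
    "params": {
        "dataset": "sales_data",
        "model_type": "random_forest"
        # s3_data_path / s3_model_path fall back to the defaults in wf.params
    }
})
```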
### 4. Security Scanning Pipeline
```python
security_scan_code = '''
from kubiya_workflow_sdk.dsl import Workflow

wf = Workflow("security-scan-pipeline")
wf.description("Comprehensive security scanning for applications")

wf.params(
    repo_url="https://github.com/myorg/myapp",
    branch="main",
    scan_types=["sast", "dependency", "container", "secrets"]
)

# Clone repository
wf.step(
    "clone-repo",
    """
    git clone {{repo_url}} /tmp/repo
    cd /tmp/repo && git checkout {{branch}}
    """
)

# Static Application Security Testing (SAST)
wf.step(
    "sast-scan",
    """
    cd /tmp/repo

    # Run Semgrep
    semgrep --config=auto --json -o /tmp/sast_results.json .

    # Check for critical issues
    CRITICAL=$(jq '.results | map(select(.severity == "ERROR")) | length' /tmp/sast_results.json)
    echo "Found $CRITICAL critical issues"
    """,
    condition="'sast' in {{scan_types}}"
)

# Dependency scanning
wf.step(
    "dependency-scan",
    """
    cd /tmp/repo

    # Run OWASP Dependency Check
    dependency-check --project {{repo_url}} --scan . \
      --format JSON --out /tmp/dependency_results.json

    # Run Snyk
    snyk test --json > /tmp/snyk_results.json || true
    """,
    condition="'dependency' in {{scan_types}}"
)

# Container scanning
wf.step(
    "container-scan",
    """
    cd /tmp/repo
    if [ -f Dockerfile ]; then
      # Build image
      docker build -t security-scan:latest .

      # Run Trivy
      trivy image --format json \
        -o /tmp/container_results.json \
        security-scan:latest
    fi
    """,
    condition="'container' in {{scan_types}}"
)

# Secret scanning
wf.step(
    "secret-scan",
    """
    cd /tmp/repo

    # Run GitLeaks
    gitleaks detect --source . \
      --report-format json \
      --report-path /tmp/secrets_results.json
    """,
    condition="'secrets' in {{scan_types}}"
)

# Generate report
wf.step(
    "generate-report",
    """
    python -c "
import json
import os
from datetime import datetime

report = {
    'scan_date': datetime.now().isoformat(),
    'repository': '{{repo_url}}',
    'branch': '{{branch}}',
    'results': {}
}

# Collect all results
for scan_type in {{scan_types}}:
    result_file = f'/tmp/{scan_type}_results.json'
    if os.path.exists(result_file):
        with open(result_file) as f:
            report['results'][scan_type] = json.load(f)

# Save report
with open('/tmp/security_report.json', 'w') as f:
    json.dump(report, f, indent=2)

print('Security scan complete!')
"
    """
)

# Upload results
wf.step(
    "upload-results",
    """
    TIMESTAMP=$(date +%Y%m%d_%H%M%S)
    aws s3 cp /tmp/security_report.json \
      s3://security-reports/{{repo_url}}/scan_${TIMESTAMP}.json
    """
)
'''
```
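Because the scan types are just workflow parameters, an agent can also drive this pipeline from natural language; a sketch using the MCPAgent setup shown earlier:

```python
# Register the workflow, then let the agent decide how to invoke it.
await define_tool.ainvoke({"name": "security-scan-pipeline", "code": security_scan_code})

result = await agent.run(
    "Run the 'security-scan-pipeline' workflow against "
    "https://github.com/myorg/myapp on the main branch, "
    "but only the sast and secrets scans"
)
print(result)
```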
## Integration Patterns

### Workflow Chaining
```python
async def chain_workflows():
    # Create a build workflow
    await define_tool.ainvoke({
        "name": "build-app",
        "code": '''
from kubiya_workflow_sdk.dsl import Workflow

wf = Workflow("build-app")
wf.params(app="myapp", version="latest")
wf.step("build", "docker build -t {{app}}:{{version}} .")
wf.step("push", "docker push {{app}}:{{version}}")
'''
    })

    # Create a deploy workflow that depends on build
    await define_tool.ainvoke({
        "name": "deploy-app",
        "code": '''
from kubiya_workflow_sdk.dsl import Workflow

wf = Workflow("deploy-app")
wf.params(app="myapp", version="latest", env="staging")

# First run the build workflow
wf.step("build", "kubiya run build-app --app={{app}} --version={{version}}")

# Then deploy
wf.step("deploy", "kubectl set image deployment/{{app}} {{app}}={{app}}:{{version}}")
'''
    })
```
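With both workflows registered, triggering the chain means executing only the downstream one; a sketch, assuming the execute tool from the earlier setup:

```python
# Running deploy-app first invokes build-app, then rolls out the new image.
await execute_tool.ainvoke({
    "workflow_name": "deploy-app",
    "params": {
        "app": "myapp",
        "version": "2.0.1",  # illustrative version tag
        "env": "staging"
    }
})
```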
### Error Handling and Retries
```python
error_handling_code = '''
from kubiya_workflow_sdk.dsl import Workflow

wf = Workflow("resilient-deployment")
wf.description("Deployment with error handling and retries")

wf.params(service="api", retries=3)

# Deploy with retries
wf.step(
    "deploy",
    """
    # Deployment script with built-in retry logic
    RETRY_COUNT=0
    MAX_RETRIES={{retries}}

    while [ $RETRY_COUNT -lt $MAX_RETRIES ]; do
      echo "Deployment attempt $((RETRY_COUNT + 1))"

      if kubectl rollout restart deployment/{{service}}; then
        echo "Deployment successful"
        break
      else
        echo "Deployment failed, retrying..."
        RETRY_COUNT=$((RETRY_COUNT + 1))
        sleep 10
      fi
    done

    if [ $RETRY_COUNT -eq $MAX_RETRIES ]; then
      echo "Deployment failed after $MAX_RETRIES attempts"
      exit 1
    fi
    """
)

# Health check with timeout
wf.step(
    "health-check",
    """
    TIMEOUT=300  # 5 minutes
    START=$(date +%s)

    while true; do
      if curl -f http://{{service}}/health; then
        echo "Service is healthy"
        break
      fi

      NOW=$(date +%s)
      if [ $((NOW - START)) -gt $TIMEOUT ]; then
        echo "Health check timeout"
        exit 1
      fi

      sleep 5
    done
    """
)

# Rollback on failure
wf.step(
    "rollback",
    "kubectl rollout undo deployment/{{service}}",
    condition="previous_step_failed"
)
'''
```
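Before registering a longer definition like this one, it can be checked with the `validate_workflow` tool shown earlier; a minimal sketch:

```python
# Validate the DSL code without executing anything.
validation = await validate_tool.ainvoke({"code": error_handling_code})
if not validation['valid']:
    print(f"Fix these first: {validation['errors']}")
```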