MCP Overview
Integrate Kubiya workflows with Claude Desktop, Cursor, and other MCP-compatible tools
The Model Context Protocol (MCP) enables AI assistants like Claude Desktop and Cursor to interact with external tools and services. Kubiya provides MCP server integration, allowing these AI tools to discover, generate, and execute workflows directly.
What is MCP?
MCP (Model Context Protocol) is an open protocol that standardizes how AI assistants connect to external data sources and tools. It enables:
- Tool Discovery: AI agents can discover available tools and their capabilities
- Context Sharing: Share relevant context between AI and external systems
- Action Execution: AI can execute actions in connected systems
- Real-time Updates: Stream results back to the AI interface
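For a sense of what discovery looks like on the wire, an MCP tools listing describes each tool by name, description, and a JSON Schema for its inputs. The fragment below is an abbreviated illustration, not an exact Kubiya response:

```json
{
  "tools": [
    {
      "name": "execute_workflow",
      "description": "Execute a Kubiya workflow by name",
      "inputSchema": {
        "type": "object",
        "properties": {
          "workflow_name": { "type": "string" }
        },
        "required": ["workflow_name"]
      }
    }
  ]
}
```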
Kubiya MCP Integration
Setting Up MCP Server
1. Create MCP Server
Create a Python file for your MCP server:
# kubiya_mcp_server.py
import os
from typing import Any, Dict, List

from kubiya_workflow_sdk import KubiyaClient, workflow, step
from kubiya_workflow_sdk.mcp import KubiyaWorkflowServer

# Initialize Kubiya client
client = KubiyaClient(
    api_key=os.getenv("KUBIYA_API_KEY"),
    org_name=os.getenv("KUBIYA_ORG", "default")
)

# Create MCP server
server = KubiyaWorkflowServer(
    name="kubiya-workflows",
    description="Execute and manage Kubiya workflows",
    client=client
)

# Add custom tools
@server.tool
async def list_workflows():
    """List all available workflows in Kubiya"""
    workflows = await client.list_workflows_async()
    return [
        {
            "name": w.name,
            "description": w.description,
            "version": w.version,
            "params": w.parameters
        }
        for w in workflows
    ]

@server.tool
async def execute_workflow(
    workflow_name: str,
    params: Dict[str, Any] = None,
    stream: bool = True
):
    """Execute a Kubiya workflow by name"""
    # Find the workflow (named `target` to avoid shadowing the `workflow` DSL import)
    workflows = await client.list_workflows_async()
    target = next((w for w in workflows if w.name == workflow_name), None)
    if not target:
        return {"error": f"Workflow '{workflow_name}' not found"}

    if stream:
        # Execute with streaming and collect the events
        events = []
        async for event in client.execute_workflow_events_async(
            workflow=target,
            params=params or {}
        ):
            events.append({
                "type": event.type,
                "data": event.data,
                "timestamp": event.timestamp
            })
        # The execution ID arrives in the first event's payload
        execution_id = events[0]["data"].get("execution_id") if events else None
        return {"execution_id": execution_id, "events": events}
    else:
        result = await client.execute_workflow_async(
            workflow=target,
            params=params or {}
        )
        return {
            "execution_id": result.execution_id,
            "status": result.status,
            "outputs": result.outputs
        }

@server.tool
async def create_workflow(
    name: str,
    description: str,
    steps: List[Dict[str, Any]]
):
    """Create a new workflow programmatically"""
    # Build workflow using the DSL
    wf = workflow(name).description(description)

    for step_def in steps:
        s = step(step_def["name"])

        # Configure step based on type
        if step_def.get("type") == "shell":
            s.shell(step_def["command"])
        elif step_def.get("type") == "docker":
            s.docker(
                image=step_def["image"],
                command=step_def.get("command")
            )
        elif step_def.get("type") == "inline_agent":
            s.inline_agent(
                message=step_def["message"],
                agent_name=step_def["agent_name"],
                ai_instructions=step_def["ai_instructions"],
                runners=step_def.get("runners", ["auto"])
            )

        # Add dependencies
        if step_def.get("depends"):
            s.depends(step_def["depends"])

        wf.add_step(s)

    # Create in Kubiya
    result = await client.create_workflow_async(wf)
    return {
        "workflow_id": result.id,
        "name": result.name,
        "status": "created"
    }

@server.tool
async def get_execution_status(execution_id: str):
    """Get the status of a workflow execution"""
    status = await client.get_execution_status_async(execution_id)
    return {
        "execution_id": execution_id,
        "status": status.state,
        "started_at": status.started_at,
        "completed_at": status.completed_at,
        "outputs": status.outputs,
        "errors": status.errors
    }

# Run the server
if __name__ == "__main__":
    server.run()
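Because `create_workflow` accepts free-form step dictionaries from the AI assistant, it helps to validate them before building the DSL objects. The helper below is a hypothetical sketch (not part of the Kubiya SDK) that mirrors the required fields each step type uses above:

```python
# Required fields per step type, mirroring the create_workflow tool above.
REQUIRED_BY_TYPE = {
    "shell": {"command"},
    "docker": {"image"},
    "inline_agent": {"message", "agent_name", "ai_instructions"},
}

def validate_step(step_def: dict) -> list:
    """Return a list of problems with a step definition (empty means valid)."""
    # Collect every problem rather than failing on the first one
    problems = []
    if "name" not in step_def:
        problems.append("missing 'name'")
    step_type = step_def.get("type")
    if step_type not in REQUIRED_BY_TYPE:
        problems.append(f"unknown type: {step_type!r}")
    else:
        for field in sorted(REQUIRED_BY_TYPE[step_type] - step_def.keys()):
            problems.append(f"missing '{field}' for type '{step_type}'")
    return problems
```

Returning the full list of problems (rather than raising on the first) gives the AI assistant enough context to correct its request in one round trip.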
2. Configure Claude Desktop
Add the MCP server to Claude Desktop’s configuration:
// ~/Library/Application Support/Claude/claude_desktop_config.json
{
  "mcpServers": {
    "kubiya": {
      "command": "python",
      "args": ["/path/to/kubiya_mcp_server.py"],
      "env": {
        "KUBIYA_API_KEY": "your-api-key",
        "KUBIYA_ORG": "your-org"
      }
    }
  }
}
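Hand-editing this JSON is easy to get wrong, especially when other MCP servers are already registered. A small sketch (the `add_mcp_server` helper is hypothetical, not a Kubiya or Claude API) that merges an entry without clobbering existing servers:

```python
import json
from pathlib import Path

def add_mcp_server(config_path: Path, name: str, command: str,
                   args: list, env: dict) -> dict:
    """Merge one MCP server entry into a Claude Desktop config file."""
    # Load the existing config if present, otherwise start fresh
    config = json.loads(config_path.read_text()) if config_path.exists() else {}
    # Merge under "mcpServers" without removing other registered servers
    config.setdefault("mcpServers", {})[name] = {
        "command": command,
        "args": args,
        "env": env,
    }
    config_path.write_text(json.dumps(config, indent=2))
    return config
```

Run it once per server you want to register; re-running with the same name simply overwrites that one entry.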
3. Configure Cursor
For the Cursor IDE, add the server to its MCP configuration:
// .cursor/mcp_config.json
{
  "servers": [
    {
      "name": "kubiya",
      "command": "python",
      "args": ["kubiya_mcp_server.py"],
      "env": {
        "KUBIYA_API_KEY": "${env:KUBIYA_API_KEY}"
      }
    }
  ]
}
Usage Examples
In Claude Desktop
Once configured, you can ask Claude to:
"List all available Kubiya workflows"
"Execute the 'deploy-application' workflow with version 1.2.3"
"Create a workflow that backs up the database and uploads to S3"
"Check the status of execution abc123"
Claude will use the MCP tools to interact with Kubiya directly.
Advanced MCP Tools
Create specialized tools for your use cases:
@server.tool
async def deploy_to_kubernetes(
    app_name: str,
    image: str,
    namespace: str = "default",
    replicas: int = 3
):
    """Deploy an application to Kubernetes via Kubiya workflow"""
    # Create deployment workflow
    wf = workflow(f"deploy-{app_name}")

    # Update deployment
    wf.add_step(
        step("update-deployment")
        .docker(
            image="bitnami/kubectl:latest",
            command=f"""
            kubectl set image deployment/{app_name} \\
                {app_name}={image} \\
                -n {namespace}
            """
        )
    )

    # Scale deployment
    wf.add_step(
        step("scale-deployment")
        .docker(
            image="bitnami/kubectl:latest",
            command=f"kubectl scale deployment/{app_name} --replicas={replicas} -n {namespace}"
        )
        .depends("update-deployment")
    )

    # Wait for rollout
    wf.add_step(
        step("wait-rollout")
        .docker(
            image="bitnami/kubectl:latest",
            command=f"kubectl rollout status deployment/{app_name} -n {namespace}"
        )
        .depends("scale-deployment")
        .retry(limit=5, interval_sec=30)
    )

    # Execute workflow
    result = await client.execute_workflow_async(wf)
    return {
        "status": "deployed",
        "app": app_name,
        "image": image,
        "execution_id": result.execution_id
    }
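Note that tool parameters like `app_name` and `namespace` are interpolated straight into shell commands, so values supplied by an AI assistant reach a shell. One way to harden this (a sketch; `kubectl_set_image` is a hypothetical helper, not part of the SDK) is to quote every interpolated value:

```python
import shlex

def kubectl_set_image(app_name: str, image: str, namespace: str = "default") -> str:
    """Build a kubectl command with every tool-supplied value shell-quoted."""
    # shlex.quote leaves safe strings untouched and wraps risky ones in quotes,
    # so a value like "api; rm -rf /" cannot inject extra shell commands
    return (
        f"kubectl set image deployment/{shlex.quote(app_name)} "
        f"{shlex.quote(app_name)}={shlex.quote(image)} "
        f"-n {shlex.quote(namespace)}"
    )
```

The same treatment applies to the scale and rollout commands above.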
@server.tool
async def run_database_migration(
    database_url: str,
    migration_dir: str,
    rollback: bool = False
):
    """Run database migrations through Kubiya"""
    wf = workflow("database-migration")

    # Create migration step with database service
    migration_step = (
        step("run-migration")
        .tool_def(
            name="migrator",
            type="docker",
            image="migrate/migrate:latest",
            content=f"""#!/bin/sh
migrate -path {migration_dir} -database {database_url} {"down" if rollback else "up"}
""",
            args=[],
            with_services=[{
                "name": "postgres",
                "image": "postgres:15",
                "env": {
                    "POSTGRES_PASSWORD": "temp"
                }
            }]
        )
    )
    wf.add_step(migration_step)

    # Execute
    result = await client.execute_workflow_async(wf)
    return {
        "status": "completed",
        "direction": "rollback" if rollback else "forward",
        "execution_id": result.execution_id
    }
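Database URLs routinely embed credentials and query strings, so the `migrate` invocation above benefits from the same quoting discipline. A small sketch (the helper name is illustrative, not SDK API):

```python
import shlex

def build_migrate_command(migration_dir: str, database_url: str,
                          rollback: bool = False) -> str:
    """Build the migrate CLI command used inside the tool's shell script."""
    # "up" applies pending migrations; "down" rolls them back
    direction = "down" if rollback else "up"
    # Quote the URL: passwords and query strings may contain shell metacharacters
    return (f"migrate -path {shlex.quote(migration_dir)} "
            f"-database {shlex.quote(database_url)} {direction}")
```

Keeping this as a pure function also makes the direction logic trivially unit-testable.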
Resource Management
MCP servers can manage Kubiya resources:
@server.resource("workflows")
async def list_workflow_resources():
    """Provide workflows as MCP resources"""
    workflows = await client.list_workflows_async()
    return [
        {
            "uri": f"kubiya://workflows/{w.name}",
            "name": w.name,
            "mimeType": "application/x-kubiya-workflow",
            "description": w.description
        }
        for w in workflows
    ]

@server.resource("executions")
async def list_execution_resources():
    """Provide recent executions as resources"""
    executions = await client.list_executions_async(limit=50)
    return [
        {
            "uri": f"kubiya://executions/{e.id}",
            "name": f"{e.workflow_name} - {e.started_at}",
            "mimeType": "application/x-kubiya-execution",
            "metadata": {
                "status": e.status,
                "duration": e.duration_seconds
            }
        }
        for e in executions
    ]
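When a client later asks to read one of these resources, the server needs to recover the resource type and identifier from the URI. A minimal parser for the `kubiya://<type>/<id>` scheme used above (the function name is illustrative):

```python
def parse_kubiya_uri(uri: str) -> tuple:
    """Split a kubiya:// resource URI into (resource_type, identifier)."""
    prefix = "kubiya://"
    if not uri.startswith(prefix):
        raise ValueError(f"not a kubiya:// URI: {uri}")
    # partition splits on the first "/", so identifiers may contain slashes
    resource_type, _, identifier = uri[len(prefix):].partition("/")
    return resource_type, identifier
```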
Prompts and Context
Provide contextual prompts for AI assistants:
@server.prompt("debug-workflow")
async def debug_workflow_prompt(execution_id: str):
    """Generate a debugging prompt for failed workflows"""
    # Get execution details
    execution = await client.get_execution_async(execution_id)
    logs = await client.get_execution_logs_async(execution_id)

    # Keep only the last 100 log lines (assuming the SDK returns one string)
    recent_logs = "\n".join(logs.splitlines()[-100:])

    return f"""
Debug the following Kubiya workflow execution:

Workflow: {execution.workflow_name}
Status: {execution.status}
Error: {execution.error_message}

Recent logs:
{recent_logs}

Analyze the failure and suggest fixes.
"""

@server.prompt("optimize-workflow")
async def optimize_workflow_prompt(workflow_name: str):
    """Generate optimization suggestions for workflows"""
    # Get workflow definition and recent executions
    wf = await client.get_workflow_async(workflow_name)
    executions = await client.list_executions_async(
        workflow_name=workflow_name,
        limit=10
    )
    # Guard against workflows with no recorded executions
    avg_duration = (
        sum(e.duration_seconds for e in executions) / len(executions)
        if executions else 0
    )

    return f"""
Analyze this Kubiya workflow for optimization opportunities:

Workflow: {workflow_name}
Average Duration: {avg_duration}s
Steps: {len(wf.steps)}

Definition:
{wf.to_yaml()}

Suggest optimizations for:
1. Performance (parallel execution, caching)
2. Reliability (retry policies, error handling)
3. Resource usage (container sizes, concurrency)
"""
Security Considerations
Authentication
MCP servers should authenticate with Kubiya:
import json
import os

# Use environment variables
KUBIYA_API_KEY = os.getenv("KUBIYA_API_KEY")
if not KUBIYA_API_KEY:
    raise ValueError("KUBIYA_API_KEY environment variable required")

# Or use a configuration file
config_path = os.path.expanduser("~/.kubiya/config.json")
if os.path.exists(config_path):
    with open(config_path) as f:
        config = json.load(f)
        api_key = config.get("api_key")
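The two approaches combine naturally into a single resolution function that prefers the environment and falls back to the config file, failing loudly when neither is set (a sketch; `resolve_api_key` is a hypothetical helper):

```python
import json
import os
from pathlib import Path

def resolve_api_key(env_var: str = "KUBIYA_API_KEY",
                    config_file: str = "~/.kubiya/config.json") -> str:
    """Resolve the API key: environment variable first, config file second."""
    key = os.getenv(env_var)
    if key:
        return key
    path = Path(config_file).expanduser()
    if path.exists():
        key = json.loads(path.read_text()).get("api_key")
        if key:
            return key
    # Fail at startup rather than on the first authenticated request
    raise ValueError(f"{env_var} not set and no api_key found in {config_file}")
```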
Authorization
Implement authorization checks:
@server.tool
async def execute_production_workflow(
    workflow_name: str,
    require_approval: bool = True
):
    """Execute production workflows with safety checks"""
    # Check if workflow targets production
    if "production" in workflow_name.lower() and require_approval:
        # Could integrate with approval systems
        return {
            "error": "Production workflow requires approval",
            "workflow": workflow_name,
            "action_required": "approval"
        }

    # Proceed with execution
    return await execute_workflow(workflow_name)
Audit Logging
Log all MCP operations:
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("kubiya-mcp")

@server.middleware
async def audit_log(tool_name: str, args: dict, result: Any):
    """Log all tool invocations"""
    logger.info(f"MCP Tool: {tool_name}, Args: {args}, User: {os.getenv('USER')}")
    return result
Best Practices
1. Tool Design
- Keep tools focused and single-purpose
- Provide clear descriptions and parameter docs
- Return structured data for AI consumption
2. Error Handling
@server.tool
async def safe_execution(workflow_name: str):
    """Execute workflow with comprehensive error handling"""
    try:
        result = await execute_workflow(workflow_name)
        return {"success": True, "result": result}
    except WorkflowNotFoundError:
        return {"success": False, "error": "Workflow not found"}
    except ExecutionError as e:
        return {"success": False, "error": str(e), "details": e.details}
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        return {"success": False, "error": "Internal error occurred"}
3. Performance
- Cache frequently accessed data
- Use async operations throughout
- Implement pagination for large results
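The caching point can be as simple as a time-based in-memory cache in front of calls like `list_workflows_async`. A minimal sketch (this `TTLCache` is illustrative, not an SDK class):

```python
import time

class TTLCache:
    """Tiny time-based cache for read-heavy lookups like workflow listings."""

    def __init__(self, ttl_seconds: float = 30.0):
        self.ttl = ttl_seconds
        self._store = {}

    def get(self, key):
        """Return the cached value, or None if missing or expired."""
        entry = self._store.get(key)
        if entry is not None and time.monotonic() - entry[1] < self.ttl:
            return entry[0]
        return None

    def set(self, key, value):
        # Store the value with the time it was cached
        self._store[key] = (value, time.monotonic())
```

A 30-second TTL keeps workflow listings fresh enough for interactive AI sessions while avoiding an API round trip on every tool call.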
4. Versioning
import kubiya_workflow_sdk

@server.metadata
async def server_info():
    """Provide server version and capabilities"""
    return {
        "version": "1.0.0",
        "kubiya_sdk_version": kubiya_workflow_sdk.__version__,
        "capabilities": [
            "workflow_execution",
            "workflow_creation",
            "status_monitoring",
            "resource_management"
        ]
    }
Next Steps