Let’s start with a simple example - creating a workflow that checks system health:
Copy
Ask AI
import asynciofrom kubiya_workflow_sdk.providers import get_providerasync def create_health_check(): # Initialize AI provider adk = get_provider("adk") # Describe what you want in plain English task = """ Create a workflow that: 1. Checks disk space usage 2. Checks memory usage 3. Checks CPU load 4. Sends a Slack alert if any metric is above 80% """ # Generate the workflow result = await adk.compose( task=task, mode="plan", # Just generate, don't execute stream=False ) # Print the generated workflow workflow = result["workflow"] print(f"Generated workflow: {workflow['name']}") print(f"Steps: {len(workflow['steps'])}") # Show the workflow structure for step in workflow["steps"]: print(f" - {step['name']}: {step.get('description', '')}")# Run itasyncio.run(create_health_check())
Expected Output:
Copy
Ask AI
Generated workflow: system-health-checkSteps: 5 - check_disk_space: Check disk space usage - check_memory: Check memory usage - check_cpu: Check CPU load - evaluate_metrics: Analyze metrics and determine alerts - send_alert: Send Slack notification if thresholds exceeded
async def incident_response(): adk = get_provider("adk") task = """ Create an incident response workflow that: 1. Triggered by PagerDuty alert 2. Gather diagnostics: - Application logs (last 30 minutes) - Database query performance - API response times - Resource utilization 3. Attempt auto-remediation: - If high memory: restart service - If high connections: increase pool size - If disk full: clean old logs 4. If not resolved: - Create Jira ticket with diagnostics - Page on-call engineer - Start incident channel in Slack 5. After resolution: - Generate incident report - Update runbook if needed """ # Execute immediately when needed async for event in adk.compose( task=task, mode="act", parameters={ "alert_id": "PD-12345", "service": "api-gateway" }, stream=True ): handle_event(event)# Can be triggered by your monitoring system
async def error_handling_demo(): adk = get_provider("adk") # Even with errors, AI will try to help try: result = await adk.compose( task="Deploy to runner that doesn't exist", context={ "preferred_runner": "non-existent-runner" }, mode="plan" ) # AI will use alternative runner print(f"AI selected runner: {result['workflow']['runner']}") except Exception as e: print(f"Error: {e}")asyncio.run(error_handling_demo())
The more detailed your description, the better the result:
Copy
Ask AI
# Goodtask = """Create a workflow to:1. Check if Redis cache has > 80% memory usage2. If yes, identify keys by pattern 'session:*' older than 7 days3. Delete old keys in batches of 10004. Log cleanup summary to CloudWatch"""# Too vaguetask = "Clean Redis"
Always use plan mode first:
Copy
Ask AI
# Generate and reviewplan = await adk.compose(task=task, mode="plan")review_workflow(plan)# Then execute if satisfiedawait adk.compose(task=task, mode="act")
The AI-powered approach dramatically accelerates workflow development while maintaining all the power and flexibility of the Kubiya platform. Start describing your automation needs in plain English and let AI handle the implementation details!