Introduction
Core Concepts
MCP Integration
Full-Stack Agents
Streaming UI
Tutorials
LangGraph Integration
Integrate your existing LangGraph agents with Kubiya’s execution platform
LangGraph Integration
LangGraph’s stateful multi-agent workflows integrate seamlessly with Kubiya’s execution platform. You can use pre-built Docker images, inject your code on-the-fly, or create custom agent servers - all while maintaining LangGraph’s powerful state management and human-in-the-loop capabilities.
Integration Options
Kubiya provides multiple ways to integrate LangGraph, depending on your setup and requirements:
Pre-built Images
Use existing LangGraph Docker images for immediate execution
Code Injection
Inject your LangGraph code using with_files
for dynamic execution
Agent Servers
Deploy LangGraph as a custom agent server provider
Pattern 1: Pre-built Docker Images
The simplest integration uses existing LangGraph Docker images:
from kubiya_workflow_sdk import workflow, step
@workflow
def langgraph_research_agent():
"""Research agent using pre-built LangGraph image"""
research = step("langgraph-research").docker(
image="langchain/langgraph:latest",
command="python -c \"$LANGGRAPH_CODE\"",
env={
"LANGGRAPH_CODE": """
import os
from langchain.schema import SystemMessage
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from tavily import TavilyClient
# Initialize components
llm = ChatOpenAI(model="gpt-4o")
tavily = TavilyClient(api_key=os.environ['TAVILY_API_KEY'])
# Create LangGraph agent
agent = create_react_agent(
llm,
[tavily.search],
state_modifier=SystemMessage(content="You are a research assistant")
)
# Execute research
result = agent.invoke({
"messages": [{"role": "user", "content": "${INPUT_QUERY}"}]
})
print(result['messages'][-1].content)
""",
"OPENAI_API_KEY": "${OPENAI_API_KEY}",
"TAVILY_API_KEY": "${TAVILY_API_KEY}",
"INPUT_QUERY": "Research the latest trends in AI automation"
}
)
return research
Pattern 2: Code Injection with Files
For more complex LangGraph setups, inject your code and dependencies:
from kubiya_workflow_sdk import workflow, step
@workflow
def langgraph_multi_agent():
"""Multi-agent system with injected LangGraph code"""
multi_agent = step("langgraph-team").tool_def(
name="langgraph-multi-agent",
type="docker",
image="python:3.11-slim",
description="LangGraph multi-agent collaboration",
# Install dependencies and run
content="""#!/bin/bash
set -e
pip install -r /app/requirements.txt
python /app/multi_agent.py
""",
# Inject LangGraph code and dependencies
with_files=[
{
"destination": "/app/requirements.txt",
"content": """
langgraph>=0.0.20
langchain>=0.1.0
langchain-openai>=0.0.5
tavily-python>=0.3.0
"""
},
{
"destination": "/app/multi_agent.py",
"content": """
import os
import json
from typing import TypedDict, List
from langchain.schema import BaseMessage, HumanMessage, SystemMessage
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
class TeamState(TypedDict):
messages: List[BaseMessage]
task: str
research_data: str
analysis: str
report: str
def researcher_node(state: TeamState):
llm = ChatOpenAI(model="gpt-4o")
prompt = f'''
You are a research specialist. Research this task: {state["task"]}
Provide comprehensive research data.
'''
response = llm.invoke([SystemMessage(content=prompt)])
return {
"research_data": response.content,
"messages": state["messages"] + [response]
}
def analyst_node(state: TeamState):
llm = ChatOpenAI(model="gpt-4o")
prompt = f'''
You are a data analyst. Analyze this research: {state["research_data"]}
Provide insights and recommendations.
'''
response = llm.invoke([SystemMessage(content=prompt)])
return {
"analysis": response.content,
"messages": state["messages"] + [response]
}
def writer_node(state: TeamState):
llm = ChatOpenAI(model="gpt-4o")
prompt = f'''
You are a technical writer. Create a report based on:
Research: {state["research_data"]}
Analysis: {state["analysis"]}
Write a comprehensive report.
'''
response = llm.invoke([SystemMessage(content=prompt)])
return {
"report": response.content,
"messages": state["messages"] + [response]
}
# Create the LangGraph workflow
def create_team_workflow():
workflow = StateGraph(TeamState)
# Add nodes
workflow.add_node("researcher", researcher_node)
workflow.add_node("analyst", analyst_node)
workflow.add_node("writer", writer_node)
# Define flow
workflow.add_edge(START, "researcher")
workflow.add_edge("researcher", "analyst")
workflow.add_edge("analyst", "writer")
workflow.add_edge("writer", END)
# Compile with memory
memory = MemorySaver()
return workflow.compile(checkpointer=memory)
# Main execution
if __name__ == "__main__":
task = os.environ.get("TASK", "Analyze current AI market trends")
graph = create_team_workflow()
result = graph.invoke({
"messages": [HumanMessage(content=task)],
"task": task,
"research_data": "",
"analysis": "",
"report": ""
})
print(json.dumps({
"final_report": result["report"],
"research_summary": result["research_data"][:200] + "...",
"analysis_summary": result["analysis"][:200] + "..."
}, indent=2))
"""
}
],
args=[
{
"name": "task",
"type": "string",
"required": True,
"description": "Research task for the team"
}
]
)
return multi_agent
Pattern 3: LangGraph with Services
Use with_services
to run supporting infrastructure:
from kubiya_workflow_sdk import workflow, step
@workflow
def langgraph_with_vectordb():
"""LangGraph agent with vector database service"""
rag_agent = step("langgraph-rag").tool_def(
name="langgraph-rag-agent",
type="docker",
image="python:3.11-slim",
description="LangGraph RAG agent with Chroma",
content="""#!/bin/bash
set -e
pip install langgraph langchain langchain-openai chromadb
sleep 5 # Wait for Chroma service
python /app/rag_agent.py
""",
with_files=[
{
"destination": "/app/rag_agent.py",
"content": """
import chromadb
from langchain.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from langchain.tools import Tool
import os
# Connect to Chroma service
chroma_client = chromadb.HttpClient(host="vectordb", port=8000)
# Create retrieval tool
def retrieval_tool(query: str) -> str:
embeddings = OpenAIEmbeddings()
vectorstore = Chroma(
client=chroma_client,
embedding_function=embeddings
)
docs = vectorstore.similarity_search(query, k=3)
return "\\n".join([doc.page_content for doc in docs])
# Create LangGraph agent with RAG
llm = ChatOpenAI(model="gpt-4o")
tools = [
Tool(
name="retrieve_documents",
description="Retrieve relevant documents",
func=retrieval_tool
)
]
agent = create_react_agent(llm, tools)
# Execute query
result = agent.invoke({
"messages": [{"role": "user", "content": "${QUERY}"}]
})
print(result["messages"][-1].content)
"""
}
],
# Chroma vector database service
with_services=[
{
"name": "vectordb",
"image": "chromadb/chroma:latest",
"exposed_ports": [8000],
"env": {
"CHROMA_SERVER_HOST": "0.0.0.0",
"CHROMA_SERVER_HTTP_PORT": "8000"
}
}
],
args=[
{
"name": "query",
"type": "string",
"required": True,
"description": "Query for the RAG agent"
}
]
)
return rag_agent
Advanced Patterns
Streaming LangGraph Results
Stream execution updates in real-time:
from kubiya_workflow_sdk import workflow, step
@workflow
def streaming_langgraph():
"""Stream LangGraph execution updates"""
streaming_agent = step("stream-langgraph").tool_def(
name="langgraph-streaming",
type="docker",
image="python:3.11-slim",
description="Streaming LangGraph execution",
content="""#!/bin/bash
pip install langgraph langchain-openai
python /app/streaming_agent.py | tee /proc/1/fd/1
""",
with_files=[
{
"destination": "/app/streaming_agent.py",
"content": """
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
import json
import sys
def streaming_node(state):
llm = ChatOpenAI(model="gpt-4o", streaming=True)
print(json.dumps({"status": "starting", "step": "analysis"}))
sys.stdout.flush()
chunks = []
for chunk in llm.stream([{"role": "user", "content": state["query"]}]):
chunks.append(chunk.content)
print(json.dumps({
"status": "streaming",
"chunk": chunk.content,
"partial_result": "".join(chunks)
}))
sys.stdout.flush()
result = "".join(chunks)
print(json.dumps({"status": "complete", "result": result}))
return {"result": result}
# Execute streaming workflow
workflow = StateGraph(dict)
workflow.add_node("stream", streaming_node)
workflow.add_edge(START, "stream")
workflow.add_edge("stream", END)
graph = workflow.compile()
graph.invoke({"query": "${STREAMING_QUERY}"})
"""
}
],
args=[
{
"name": "streaming_query",
"type": "string",
"required": True,
"description": "Query to process with streaming"
}
]
)
return streaming_agent
Human-in-the-Loop Integration
Implement approval workflows by polling external APIs:
from kubiya_workflow_sdk import workflow, step
@workflow
def langgraph_human_approval():
"""LangGraph workflow with human approval via API polling"""
# Phase 1: Initial research
research = step("research").tool_def(
name="langgraph-research",
type="docker",
image="python:3.11-slim",
description="Research phase with LangGraph",
content="""#!/bin/bash
pip install langgraph langchain langchain-openai requests
python /app/research_agent.py
""",
with_files=[
{
"destination": "/app/research_agent.py",
"content": """
import requests
import json
import os
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage
def research_step(state):
llm = ChatOpenAI(model="gpt-4o")
result = llm.invoke([
HumanMessage(content=f"Research: {state['query']}")
])
# Submit research for approval via API
approval_data = {
"workflow_id": "${WORKFLOW_ID}",
"step": "research",
"content": result.content,
"requires_approval": True
}
approval_api = os.environ.get("APPROVAL_API_URL", "https://api.example.com/approvals")
response = requests.post(approval_api, json=approval_data)
approval_id = response.json().get("approval_id")
return {
"research": result.content,
"approval_id": approval_id,
"status": "pending_approval"
}
# Execute research and submit for approval
workflow = StateGraph(dict)
workflow.add_node("research", research_step)
workflow.add_edge(START, "research")
workflow.add_edge("research", END)
graph = workflow.compile()
result = graph.invoke({"query": "${RESEARCH_QUERY}"})
print(json.dumps(result, indent=2))
"""
}
],
args=[
{
"name": "research_query",
"type": "string",
"required": True,
"description": "What to research"
},
{
"name": "workflow_id",
"type": "string",
"required": True,
"description": "Unique workflow identifier"
}
]
)
# Poll for approval status
approval_check = step("check-approval").tool_def(
name="approval-checker",
type="docker",
image="python:3.11-slim",
description="Poll API for approval status",
content="""#!/bin/bash
pip install requests
python /app/approval_checker.py
""",
with_files=[
{
"destination": "/app/approval_checker.py",
"content": """
import requests
import time
import json
import os
def check_approval_status(approval_id, max_wait=300):
approval_api = os.environ.get("APPROVAL_API_URL", "https://api.example.com/approvals")
start_time = time.time()
while time.time() - start_time < max_wait:
try:
response = requests.get(f"{approval_api}/{approval_id}")
if response.status_code == 200:
approval_data = response.json()
status = approval_data.get("status")
if status == "approved":
print(json.dumps({
"status": "approved",
"message": "Research approved by human reviewer",
"approved_at": approval_data.get("approved_at")
}))
return True
elif status == "rejected":
print(json.dumps({
"status": "rejected",
"message": "Research rejected by human reviewer",
"reason": approval_data.get("rejection_reason")
}))
exit(1)
else:
print(f"Status: {status}, waiting...")
except Exception as e:
print(f"Error checking approval: {e}")
time.sleep(10) # Poll every 10 seconds
print(json.dumps({
"status": "timeout",
"message": "Approval timeout - no response within time limit"
}))
exit(1)
# Extract approval_id from previous step
import sys
research_output = json.loads('${research}')
approval_id = research_output.get("approval_id")
if not approval_id:
print("No approval ID found in research output")
exit(1)
check_approval_status(approval_id)
"""
}
],
args=[
{
"name": "approval_timeout",
"type": "string",
"required": False,
"default": "300",
"description": "Timeout in seconds for approval"
}
]
).depends("research")
# Continue processing after approval
final_report = step("generate-report").tool_def(
name="report-generator",
type="docker",
image="python:3.11-slim",
description="Generate final report after approval",
content="""#!/bin/bash
pip install langchain langchain-openai
python /app/report_generator.py
""",
with_files=[
{
"destination": "/app/report_generator.py",
"content": """
import json
from langchain_openai import ChatOpenAI
# Extract approved research
research_data = json.loads('${research}')
approved_research = research_data.get("research", "")
# Generate final report
llm = ChatOpenAI(model="gpt-4o")
result = llm.invoke([{
"role": "user",
"content": f"Create a comprehensive final report based on this approved research: {approved_research}"
}])
final_output = {
"final_report": result.content,
"based_on_research": approved_research[:200] + "...",
"approval_status": "approved"
}
print(json.dumps(final_output, indent=2))
"""
}
]
).depends("check-approval")
return final_report
Best Practices
LangGraph State in Kubiya:
- Use Redis or database services for persistent state
- Leverage Kubiya’s workflow checkpointing for long-running processes
- Structure state to survive container restarts
# Good: Externalized state
with_services=[{
"name": "postgres",
"image": "postgres:15",
"env": {"POSTGRES_DB": "langgraph_state"}
}]
Optimize Resource Usage:
- Set appropriate CPU/memory limits for LangGraph containers
- Use slim base images to reduce startup time
- Cache dependencies between runs
# Efficient resource allocation
resources={
"requests": {"cpu": "500m", "memory": "1Gi"},
"limits": {"cpu": "2000m", "memory": "4Gi"}
}
Real-time Updates:
- Stream LangGraph node execution progress
- Use JSON streaming for structured updates
- Flush stdout for immediate visibility
print(json.dumps({"step": "research", "progress": 0.3}))
sys.stdout.flush()
Migration from Standalone LangGraph
Analyze Current Setup
Identify your LangGraph components: agents, tools, state management, and dependencies
Choose Integration Pattern
Select the pattern that best fits your architecture: Docker images, code injection, or agent servers
Containerize Dependencies
Package your LangGraph code and dependencies for container execution
Add Kubiya Orchestration
Wrap your LangGraph workflows in Kubiya workflow steps
Test and Iterate
Verify functionality and optimize for production use
Next Steps
Was this page helpful?