LangGraph Integration

LangGraph’s stateful multi-agent workflows integrate seamlessly with Kubiya’s execution platform. You can use pre-built Docker images, inject your code on the fly, or create custom agent servers — all while maintaining LangGraph’s powerful state management and human-in-the-loop capabilities.

Integration Options

Kubiya provides multiple ways to integrate LangGraph, depending on your setup and requirements:

Pre-built Images

Use existing LangGraph Docker images for immediate execution

Code Injection

Inject your LangGraph code using `with_files` for dynamic execution

Agent Servers

Deploy LangGraph as a custom agent server provider

Pattern 1: Pre-built Docker Images

The simplest integration uses existing LangGraph Docker images:

from kubiya_workflow_sdk import workflow, step

@workflow
def langgraph_research_agent():
    """Research agent using a pre-built LangGraph image.

    Defines a single Docker step, ``langgraph-research``, that runs an inline
    LangGraph ReAct agent with ``python -c``. The agent program and all of its
    inputs are handed to the container through environment variables.

    Returns:
        The configured ``langgraph-research`` step.
    """
    # NOTE(review): assumes the image ships langgraph, langchain-openai and
    # tavily-python — confirm before relying on it.
    research = step("langgraph-research").docker(
        image="langchain/langgraph:latest",
        command="python -c \"$LANGGRAPH_CODE\"",
        env={
            # The program reads the query from INPUT_QUERY at runtime instead
            # of having it spliced into the source text, so quotes or newlines
            # in the query cannot corrupt (or inject code into) the script.
            "LANGGRAPH_CODE": """
import os
from langchain_core.messages import SystemMessage
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from tavily import TavilyClient

# Initialize components
llm = ChatOpenAI(model="gpt-4o")
tavily = TavilyClient(api_key=os.environ['TAVILY_API_KEY'])

# Create LangGraph agent
# NOTE(review): recent LangGraph releases accept this as `prompt=` instead of
# `state_modifier=`; confirm against the version in the image.
agent = create_react_agent(
    llm,
    [tavily.search],
    state_modifier=SystemMessage(content="You are a research assistant")
)

# Execute research
query = os.environ["INPUT_QUERY"]
result = agent.invoke({
    "messages": [{"role": "user", "content": query}]
})

print(result['messages'][-1].content)
            """,
            "OPENAI_API_KEY": "${OPENAI_API_KEY}",
            "TAVILY_API_KEY": "${TAVILY_API_KEY}",
            "INPUT_QUERY": "Research the latest trends in AI automation"
        }
    )

    return research

Pattern 2: Code Injection with Files

For more complex LangGraph setups, inject your code and dependencies:

from kubiya_workflow_sdk import workflow, step

@workflow
def langgraph_multi_agent():
    """Multi-agent system with injected LangGraph code.

    Defines one Docker tool step, ``langgraph-team``, on a plain Python image.
    Two files are injected into the container: a ``requirements.txt`` and a
    ``multi_agent.py`` program that chains three LangGraph nodes
    (researcher -> analyst -> writer) and prints a JSON summary.

    Returns:
        The configured ``langgraph-team`` step.
    """
    multi_agent = step("langgraph-team").tool_def(
        name="langgraph-multi-agent",
        type="docker",
        image="python:3.11-slim",
        description="LangGraph multi-agent collaboration",

        # Install dependencies and run
        content="""#!/bin/bash
set -e
pip install -r /app/requirements.txt
python /app/multi_agent.py
        """,

        # Inject LangGraph code and dependencies
        with_files=[
            {
                "destination": "/app/requirements.txt",
                "content": """
langgraph>=0.0.20
langchain>=0.1.0
langchain-openai>=0.0.5
tavily-python>=0.3.0
                """
            },
            {
                "destination": "/app/multi_agent.py",
                # Message classes come from langchain_core (a dependency of
                # both langgraph and langchain-openai) rather than the legacy
                # `langchain.schema` path, so the unpinned requirements above
                # keep working as langchain evolves.
                "content": """
import os
import json
import uuid
from typing import TypedDict, List
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI

class TeamState(TypedDict):
    messages: List[BaseMessage]
    task: str
    research_data: str
    analysis: str
    report: str

def researcher_node(state: TeamState):
    llm = ChatOpenAI(model="gpt-4o")
    
    prompt = f'''
    You are a research specialist. Research this task: {state["task"]}
    
    Provide comprehensive research data.
    '''
    
    response = llm.invoke([SystemMessage(content=prompt)])
    return {
        "research_data": response.content,
        "messages": state["messages"] + [response]
    }

def analyst_node(state: TeamState):
    llm = ChatOpenAI(model="gpt-4o")
    
    prompt = f'''
    You are a data analyst. Analyze this research: {state["research_data"]}
    
    Provide insights and recommendations.
    '''
    
    response = llm.invoke([SystemMessage(content=prompt)])
    return {
        "analysis": response.content,
        "messages": state["messages"] + [response]
    }

def writer_node(state: TeamState):
    llm = ChatOpenAI(model="gpt-4o")
    
    prompt = f'''
    You are a technical writer. Create a report based on:
    
    Research: {state["research_data"]}
    Analysis: {state["analysis"]}
    
    Write a comprehensive report.
    '''
    
    response = llm.invoke([SystemMessage(content=prompt)])
    return {
        "report": response.content,
        "messages": state["messages"] + [response]
    }

# Create the LangGraph workflow
def create_team_workflow():
    workflow = StateGraph(TeamState)
    
    # Add nodes
    workflow.add_node("researcher", researcher_node)
    workflow.add_node("analyst", analyst_node)
    workflow.add_node("writer", writer_node)
    
    # Define flow
    workflow.add_edge(START, "researcher")
    workflow.add_edge("researcher", "analyst")
    workflow.add_edge("analyst", "writer")
    workflow.add_edge("writer", END)
    
    # Compile with memory
    memory = MemorySaver()
    return workflow.compile(checkpointer=memory)

# Main execution
if __name__ == "__main__":
    task = os.environ.get("TASK", "Analyze current AI market trends")
    
    graph = create_team_workflow()
    
    # A graph compiled with a checkpointer refuses to run without a
    # thread_id, so give this one-shot run its own thread.
    run_config = {"configurable": {"thread_id": str(uuid.uuid4())}}
    
    result = graph.invoke(
        {
            "messages": [HumanMessage(content=task)],
            "task": task,
            "research_data": "",
            "analysis": "",
            "report": ""
        },
        config=run_config,
    )
    
    print(json.dumps({
        "final_report": result["report"],
        "research_summary": result["research_data"][:200] + "...",
        "analysis_summary": result["analysis"][:200] + "..."
    }, indent=2))
                """
            }
        ],

        # NOTE(review): the script reads the task from the TASK environment
        # variable; confirm that the `task` arg below is exposed to the
        # container under that name, otherwise the default task is used.
        args=[
            {
                "name": "task",
                "type": "string",
                "required": True,
                "description": "Research task for the team"
            }
        ]
    )

    return multi_agent

Pattern 3: LangGraph with Services

Use `with_services` to run supporting infrastructure:

from kubiya_workflow_sdk import workflow, step

@workflow
def langgraph_with_vectordb():
    """LangGraph agent with a vector database service.

    Defines one Docker tool step, ``langgraph-rag``, that runs an injected
    ReAct agent whose single tool retrieves documents from a Chroma server.
    The Chroma server is started alongside the step as the ``vectordb``
    service.

    Returns:
        The configured ``langgraph-rag`` step.
    """
    rag_agent = step("langgraph-rag").tool_def(
        name="langgraph-rag-agent",
        type="docker",
        image="python:3.11-slim",
        description="LangGraph RAG agent with Chroma",

        # langchain-chroma supplies the Chroma vector store integration; the
        # legacy `langchain.vectorstores` / `langchain.embeddings` import paths
        # need langchain-community, which this script never installed.
        content="""#!/bin/bash
set -e
pip install langgraph langchain langchain-openai langchain-chroma chromadb
sleep 5  # Wait for Chroma service
python /app/rag_agent.py
        """,

        with_files=[
            {
                "destination": "/app/rag_agent.py",
                # NOTE(review): nothing here loads documents into the
                # collection; retrieval returns nothing until it is populated.
                "content": """
import os

import chromadb
from langchain_chroma import Chroma
from langchain_core.tools import Tool
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langgraph.prebuilt import create_react_agent

# Connect to Chroma service
chroma_client = chromadb.HttpClient(host="vectordb", port=8000)

# Build the vector store once instead of on every tool call
vectorstore = Chroma(
    client=chroma_client,
    embedding_function=OpenAIEmbeddings()
)

# Create retrieval tool
def retrieval_tool(query: str) -> str:
    docs = vectorstore.similarity_search(query, k=3)
    return "\\n".join(doc.page_content for doc in docs)

# Create LangGraph agent with RAG
llm = ChatOpenAI(model="gpt-4o")
tools = [
    Tool(
        name="retrieve_documents", 
        description="Retrieve relevant documents",
        func=retrieval_tool
    )
]

agent = create_react_agent(llm, tools)

# Execute query
result = agent.invoke({
    "messages": [{"role": "user", "content": "${QUERY}"}]
})

print(result["messages"][-1].content)
                """
            }
        ],

        # Chroma vector database service
        with_services=[
            {
                "name": "vectordb",
                "image": "chromadb/chroma:latest",
                "exposed_ports": [8000],
                "env": {
                    "CHROMA_SERVER_HOST": "0.0.0.0",
                    "CHROMA_SERVER_HTTP_PORT": "8000"
                }
            }
        ],

        # NOTE(review): the script references this arg as ${QUERY}; confirm
        # the platform substitutes the upper-cased name into injected files.
        args=[
            {
                "name": "query",
                "type": "string",
                "required": True,
                "description": "Query for the RAG agent"
            }
        ]
    )

    return rag_agent

Advanced Patterns

Streaming LangGraph Results

Stream execution updates in real-time:

from kubiya_workflow_sdk import workflow, step

@workflow
def streaming_langgraph():
    """Stream LangGraph execution updates.

    Defines one Docker tool step, ``stream-langgraph``, whose injected program
    streams an LLM response and prints one JSON event per chunk
    (``starting``, ``streaming``, ``complete``), flushing stdout after each
    event so consumers see them as they are produced.

    Returns:
        The configured ``stream-langgraph`` step.
    """
    streaming_agent = step("stream-langgraph").tool_def(
        name="langgraph-streaming",
        type="docker",
        image="python:3.11-slim",
        description="Streaming LangGraph execution",

        # `set -e` stops on a failed install; `pipefail` makes the pipeline
        # report the Python program's exit status instead of tee's, so a
        # crashed agent no longer looks like a successful step.
        content="""#!/bin/bash
set -eo pipefail
pip install langgraph langchain-openai
python /app/streaming_agent.py | tee /proc/1/fd/1
        """,

        with_files=[
            {
                "destination": "/app/streaming_agent.py",
                "content": """
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
import json
import sys

def streaming_node(state):
    llm = ChatOpenAI(model="gpt-4o", streaming=True)
    
    print(json.dumps({"status": "starting", "step": "analysis"}))
    sys.stdout.flush()
    
    chunks = []
    for chunk in llm.stream([{"role": "user", "content": state["query"]}]):
        chunks.append(chunk.content)
        print(json.dumps({
            "status": "streaming", 
            "chunk": chunk.content,
            "partial_result": "".join(chunks)
        }))
        sys.stdout.flush()
    
    result = "".join(chunks)
    print(json.dumps({"status": "complete", "result": result}))
    sys.stdout.flush()
    
    return {"result": result}

# Execute streaming workflow
workflow = StateGraph(dict)
workflow.add_node("stream", streaming_node)
workflow.add_edge(START, "stream")
workflow.add_edge("stream", END)

graph = workflow.compile()
graph.invoke({"query": "${STREAMING_QUERY}"})
                """
            }
        ],
        # NOTE(review): the script references this arg as ${STREAMING_QUERY};
        # confirm the platform substitutes the upper-cased name.
        args=[
            {
                "name": "streaming_query",
                "type": "string",
                "required": True,
                "description": "Query to process with streaming"
            }
        ]
    )

    return streaming_agent

Human-in-the-Loop Integration

Implement approval workflows by polling external APIs:

from kubiya_workflow_sdk import workflow, step

@workflow
def langgraph_human_approval():
    """LangGraph workflow with human approval via API polling.

    Builds three Docker tool steps chained with ``depends``:

    1. ``research`` runs a one-node LangGraph graph, posts the result to an
       approval API and prints a JSON record containing the ``approval_id``.
    2. ``check-approval`` polls the approval API until the request is
       approved, rejected, or the timeout elapses (non-zero exit on the
       latter two).
    3. ``generate-report`` turns the approved research into a final report.

    Returns:
        The ``generate-report`` step; the earlier steps are linked to it by
        name through ``depends``.
    """
    # Phase 1: Initial research
    research = step("research").tool_def(
        name="langgraph-research",
        type="docker",
        image="python:3.11-slim",
        description="Research phase with LangGraph",

        # --quiet keeps pip's progress output off stdout.
        # NOTE(review): presumably later steps receive this step's stdout as
        # ${research}; if so, stdout must contain only the JSON record.
        content="""#!/bin/bash
set -e
pip install --quiet langgraph langchain langchain-openai requests
python /app/research_agent.py
        """,

        with_files=[
            {
                "destination": "/app/research_agent.py",
                "content": """
import json
import os

import requests
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END

def research_step(state):
    llm = ChatOpenAI(model="gpt-4o")
    result = llm.invoke([
        HumanMessage(content=f"Research: {state['query']}")
    ])
    
    # Submit research for approval via API
    approval_data = {
        "workflow_id": "${WORKFLOW_ID}",
        "step": "research",
        "content": result.content,
        "requires_approval": True
    }
    
    approval_api = os.environ.get("APPROVAL_API_URL", "https://api.example.com/approvals")
    # Bound the call and fail loudly on an HTTP error rather than carrying
    # on with an approval request that was never registered.
    response = requests.post(approval_api, json=approval_data, timeout=30)
    response.raise_for_status()
    approval_id = response.json().get("approval_id")
    
    return {
        "research": result.content, 
        "approval_id": approval_id,
        "status": "pending_approval"
    }

# Execute research and submit for approval
workflow = StateGraph(dict)
workflow.add_node("research", research_step)
workflow.add_edge(START, "research")
workflow.add_edge("research", END)

graph = workflow.compile()
result = graph.invoke({"query": "${RESEARCH_QUERY}"})
print(json.dumps(result, indent=2))
                """
            }
        ],

        args=[
            {
                "name": "research_query",
                "type": "string",
                "required": True,
                "description": "What to research"
            },
            {
                "name": "workflow_id",
                "type": "string",
                "required": True,
                "description": "Unique workflow identifier"
            }
        ]
    )

    # Poll for approval status
    approval_check = step("check-approval").tool_def(
        name="approval-checker",
        type="docker",
        image="python:3.11-slim",
        description="Poll API for approval status",

        content="""#!/bin/bash
set -e
pip install --quiet requests
python /app/approval_checker.py
        """,

        with_files=[
            {
                "destination": "/app/approval_checker.py",
                # The previous step's JSON is embedded in a raw triple-quoted
                # literal: it spans several lines and contains backslash
                # escapes, both of which break an ordinary single-quoted
                # string. (Output containing three consecutive single quotes
                # would still break it.)
                # ${APPROVAL_TIMEOUT} follows the same upper-case placeholder
                # convention as the other args in this workflow; if it is not
                # substituted the script falls back to 300 seconds.
                "content": """
import json
import os
import sys
import time

import requests

DEFAULT_MAX_WAIT = 300

def resolve_timeout(raw, default=DEFAULT_MAX_WAIT):
    try:
        return int(raw)
    except ValueError:
        return default

def check_approval_status(approval_id, max_wait=DEFAULT_MAX_WAIT):
    approval_api = os.environ.get("APPROVAL_API_URL", "https://api.example.com/approvals")
    start_time = time.time()
    
    while time.time() - start_time < max_wait:
        try:
            # Per-request timeout so a stalled connection cannot outlive max_wait
            response = requests.get(f"{approval_api}/{approval_id}", timeout=10)
            if response.status_code == 200:
                approval_data = response.json()
                status = approval_data.get("status")
                
                if status == "approved":
                    print(json.dumps({
                        "status": "approved",
                        "message": "Research approved by human reviewer",
                        "approved_at": approval_data.get("approved_at")
                    }))
                    return True
                elif status == "rejected":
                    print(json.dumps({
                        "status": "rejected", 
                        "message": "Research rejected by human reviewer",
                        "reason": approval_data.get("rejection_reason")
                    }))
                    sys.exit(1)
                else:
                    print(f"Status: {status}, waiting...")
            else:
                print(f"Approval API returned HTTP {response.status_code}, retrying...")
                    
        except Exception as e:
            # Deliberately best-effort: any transient failure is retried on
            # the next poll. SystemExit is not an Exception, so the
            # rejection exit above still propagates.
            print(f"Error checking approval: {e}")
            
        time.sleep(10)  # Poll every 10 seconds
    
    print(json.dumps({
        "status": "timeout",
        "message": "Approval timeout - no response within time limit"
    }))
    sys.exit(1)

# Extract approval_id from previous step
research_output = json.loads(r'''${research}''')
approval_id = research_output.get("approval_id")

if not approval_id:
    print("No approval ID found in research output")
    sys.exit(1)

check_approval_status(approval_id, max_wait=resolve_timeout("${APPROVAL_TIMEOUT}"))
                """
            }
        ],

        args=[
            {
                "name": "approval_timeout",
                "type": "string",
                "required": False,
                "default": "300",
                "description": "Timeout in seconds for approval"
            }
        ]
    ).depends("research")

    # Continue processing after approval
    final_report = step("generate-report").tool_def(
        name="report-generator",
        type="docker",
        image="python:3.11-slim",
        description="Generate final report after approval",

        content="""#!/bin/bash
set -e
pip install --quiet langchain langchain-openai
python /app/report_generator.py
        """,

        with_files=[
            {
                "destination": "/app/report_generator.py",
                # Raw triple-quoted literal for the embedded JSON: multi-line
                # output and backslash escapes break a single-quoted string.
                "content": """
import json
from langchain_openai import ChatOpenAI

# Extract approved research
research_data = json.loads(r'''${research}''')
approved_research = research_data.get("research", "")

# Generate final report
llm = ChatOpenAI(model="gpt-4o")
result = llm.invoke([{
    "role": "user", 
    "content": f"Create a comprehensive final report based on this approved research: {approved_research}"
}])

final_output = {
    "final_report": result.content,
    "based_on_research": approved_research[:200] + "...",
    "approval_status": "approved"
}

print(json.dumps(final_output, indent=2))
                """
            }
        ]
    ).depends("check-approval")

    return final_report

Best Practices

Migration from Standalone LangGraph

1. Analyze Current Setup

   Identify your LangGraph components: agents, tools, state management, and dependencies.

2. Choose Integration Pattern

   Select the pattern that best fits your architecture: Docker images, code injection, or agent servers.

3. Containerize Dependencies

   Package your LangGraph code and dependencies for container execution.

4. Add Kubiya Orchestration

   Wrap your LangGraph workflows in Kubiya workflow steps.

5. Test and Iterate

   Verify functionality and optimize for production use.

Next Steps