Skip to main content
The Context Graph Service provides access to the knowledge graph containing entities and their relationships from various data sources. Query AWS, Azure, GitHub, custom integrations, and any other data source to understand topology, find dependencies, and enable intelligent agent operations.

Overview

The Context Graph is a graph database that stores:
  • Nodes: Entities from any source (cloud resources, databases, custom data, CSV imports, etc.)
  • Relationships: Connections between entities (CONTAINS, USES, MEMBER_OF, custom relationships, etc.)
  • Properties: Entity metadata and attributes from the source system

Quick Start

from kubiya import ControlPlaneClient

cp_client = ControlPlaneClient(api_key="your-api-key")

# Get graph statistics
stats = cp_client.graph.get_stats()
print(f"Nodes: {stats['node_count']}")
print(f"Relationships: {stats['relationship_count']}")

# List entities from AWS integration
nodes = cp_client.graph.list_nodes(integration="aws", limit=10)
for node in nodes['nodes']:
    print(f"Entity: {node['id']}")

# Search for production entities
production_entities = cp_client.graph.search_nodes_by_text({
    "property_name": "resourcegroup",
    "search_text": "PRODUCTION"
})
print(f"Found {production_entities['count']} production entities")

Graph Statistics

Get overview statistics about the graph:
stats = cp_client.graph.get_stats()

print(f"Total Nodes: {stats['node_count']}")
print(f"Total Relationships: {stats['relationship_count']}")
print(f"Node Types: {len(stats['labels'])}")
print(f"Relationship Types: {len(stats['relationship_types'])}")

# Example output:
# Total Nodes: 3306
# Total Relationships: 4937
# Node Types: 100
# Relationship Types: 57

Filter by Integration

# AWS statistics
aws_stats = cp_client.graph.get_stats(integration="aws")

# Azure statistics
azure_stats = cp_client.graph.get_stats(integration="azure")

List Nodes

Basic Listing

# List all nodes (paginated)
nodes = cp_client.graph.list_nodes(skip=0, limit=100)

print(f"Total count: {nodes['count']}")
print(f"Returned: {len(nodes['nodes'])}")

for node in nodes['nodes']:
    print(f"ID: {node['id']}")
    print(f"Labels: {node['labels']}")
    # The value printed here is the node's 'name' entry, so label it as such.
    print(f"Name: {node.get('name', 'N/A')}")

Filter by Integration

# List only AWS integration nodes
aws_nodes = cp_client.graph.list_nodes(
    integration="aws",
    skip=0,
    limit=50
)

# List only Azure integration nodes
azure_nodes = cp_client.graph.list_nodes(
    integration="azure",
    skip=0,
    limit=50
)

# List CSV integration nodes
csv_nodes = cp_client.graph.list_nodes(
    integration="csv",
    skip=0,
    limit=50
)

# List custom integration nodes
custom_nodes = cp_client.graph.list_nodes(
    integration="custom",
    skip=0,
    limit=50
)

Pagination Example

def fetch_all_nodes(cp_client, integration=None, *, page_size=100):
    """Fetch every node by paging through ``graph.list_nodes``.

    Args:
        cp_client: Client exposing
            ``graph.list_nodes(integration=, skip=, limit=)``.
        integration: Optional integration name to filter by (e.g. ``"aws"``).
            ``None`` is passed through unchanged.
        page_size: Number of nodes requested per call. Defaults to 100.

    Returns:
        A list holding the ``'nodes'`` entries of every page, in the order
        they were returned.

    Raises:
        ValueError: If ``page_size`` is less than 1.
    """
    if page_size < 1:
        # With a non-positive page size no page can ever be "short", so the
        # loop below would never terminate.
        raise ValueError("page_size must be a positive integer")

    all_nodes = []
    skip = 0

    while True:
        result = cp_client.graph.list_nodes(
            integration=integration,
            skip=skip,
            limit=page_size
        )
        page = result['nodes']
        all_nodes.extend(page)

        # A page shorter than requested means the listing is exhausted.
        if len(page) < page_size:
            break

        skip += page_size

    return all_nodes

# Fetch all AWS nodes
aws_nodes = fetch_all_nodes(cp_client, integration="aws")
print(f"Total AWS nodes: {len(aws_nodes)}")

Get Specific Node

Retrieve details for a specific node by ID:
# Get node by ID
node_id = "4:6e07742f-ec5f-4079-bbfd-e60ffccb0db2:0"
node = cp_client.graph.get_node(node_id)

print(f"Node ID: {node['id']}")
print(f"Labels: {node['labels']}")
print(f"Properties: {node}")

# With integration filter
node = cp_client.graph.get_node(
    node_id,
    integration="aws"
)

Search Nodes

Search nodes using structured filters:
# Search with filters
search_data = {
    "filters": {
        "labels": ["EC2Instance"],
        "properties": {
            "state": "running"
        }
    }
}

results = cp_client.graph.search_nodes(
    search_data=search_data,
    limit=50
)

print(f"Found {len(results['nodes'])} running EC2 instances")

Search nodes by the text of a single property with search_nodes_by_text:
# Search by resource group
production_nodes = cp_client.graph.search_nodes_by_text({
    "property_name": "resourcegroup",
    "search_text": "PRODUCTION"
})

print(f"Found {production_nodes['count']} production resources")
for node in production_nodes['nodes']:
    print(f"- {node.get('name')}: {node.get('resourcegroup')}")

# Search by name
db_nodes = cp_client.graph.search_nodes_by_text({
    "property_name": "name",
    "search_text": "database"
})

# Search by tags
tagged_nodes = cp_client.graph.search_nodes_by_text({
    "property_name": "tags",
    "search_text": "production"
})

Advanced Search with Pagination

def search_all_text(cp_client, property_name, search_text, integration=None,
                    *, page_size=100):
    """Collect every node returned by a text search, page by page.

    Args:
        cp_client: Client exposing ``graph.search_nodes_by_text(text_query=,
            integration=, skip=, limit=)``.
        property_name: Name of the node property to search in.
        search_text: Text to search for in that property.
        integration: Optional integration name to filter by. ``None`` is
            passed through unchanged.
        page_size: Number of nodes requested per call. Defaults to 100.

    Returns:
        A list holding the ``'nodes'`` entries of every page, in the order
        they were returned.

    Raises:
        ValueError: If ``page_size`` is less than 1.
    """
    if page_size < 1:
        # With a non-positive page size no page can ever be "short", so the
        # loop below would never terminate.
        raise ValueError("page_size must be a positive integer")

    all_nodes = []
    skip = 0

    while True:
        result = cp_client.graph.search_nodes_by_text(
            text_query={
                "property_name": property_name,
                "search_text": search_text
            },
            integration=integration,
            skip=skip,
            limit=page_size
        )
        page = result['nodes']
        all_nodes.extend(page)

        # A page shorter than requested means the search is exhausted.
        if len(page) < page_size:
            break

        skip += page_size

    return all_nodes

# Find all production resources
prod_resources = search_all_text(
    cp_client,
    property_name="environment",
    search_text="production"
)

Node Labels (Types)

List all node types in the graph:
labels = cp_client.graph.list_labels()

print(f"Total node types: {labels['count']}")
print("Available node types:")

for label in labels['labels']:
    print(f"- {label}")

# Example output:
# - EC2Instance
# - S3Bucket
# - RDSInstance
# - AzureDisk
# - AzureStorageAccount
# - KMSKey
# - EKSCluster

Filter Labels by Integration

# AWS labels only
aws_labels = cp_client.graph.list_labels(integration="aws")

# Azure labels only
azure_labels = cp_client.graph.list_labels(integration="azure")

Common Node Types

AWS Integration:
  • EC2Instance - EC2 virtual machines
  • S3Bucket - S3 storage buckets
  • RDSInstance - RDS databases
  • EKSCluster - Kubernetes clusters
  • KMSKey - Encryption keys
  • AWSRole - IAM roles
  • AWSUser - IAM users
  • AWSPolicy - IAM policies
  • AWSVpc - Virtual private clouds
  • EC2SecurityGroup - Security groups
  • LoadBalancerV2 - Application/Network load balancers
Azure Integration:
  • AzureDisk - Managed disks
  • AzureStorageAccount - Storage accounts
  • AzureStorageBlobContainer - Blob containers
  • AzureResourceGroup - Resource groups
  • AzureSubscription - Azure subscriptions
Custom Integrations:
  • Custom node types defined by your data source
  • Types are determined by the data structure and labels in your integration

Relationships

Get Node Relationships

# Get all relationships for a node
node_id = "4:6e07742f-ec5f-4079-bbfd-e60ffccb0db2:0"
relationships = cp_client.graph.get_relationships(node_id)

print(f"Found {len(relationships['relationships'])} relationships")

for rel in relationships['relationships']:
    print(f"Type: {rel['type']}")
    print(f"From: {rel['from']}")
    print(f"To: {rel['to']}")

Filter by Direction

# Only incoming relationships
incoming = cp_client.graph.get_relationships(
    node_id=node_id,
    direction="incoming"
)

# Only outgoing relationships
outgoing = cp_client.graph.get_relationships(
    node_id=node_id,
    direction="outgoing"
)

# Both directions (default)
both = cp_client.graph.get_relationships(
    node_id=node_id,
    direction="both"
)

Filter by Relationship Type

# Only CONTAINS relationships
contains_rels = cp_client.graph.get_relationships(
    node_id=node_id,
    relationship_type="CONTAINS"
)

# Only MEMBER_OF_AWS_VPC relationships
member_rels = cp_client.graph.get_relationships(
    node_id=node_id,
    relationship_type="MEMBER_OF_AWS_VPC"
)

List All Relationship Types

rel_types = cp_client.graph.list_relationship_types()

print(f"Total relationship types: {rel_types['count']}")
for rel_type in rel_types['relationship_types']:
    print(f"- {rel_type}")

# Example output:
# - CONTAINS
# - USES
# - MEMBER_OF_AWS_VPC
# - ATTACHED_TO
# - ASSOCIATED_WITH
# - POLICY
# - TAGGED

Common Relationship Types

  • CONTAINS - Container/containment relationships
  • USES - Usage dependencies
  • MEMBER_OF_AWS_VPC - VPC membership
  • ATTACHED_TO - Attachment relationships (volumes, NICs)
  • ASSOCIATED_WITH - General associations
  • POLICY - Policy attachments
  • TAGGED - Tag relationships
  • TRUSTS_AWS_PRINCIPAL - IAM trust relationships
  • ALLOWS_TRAFFIC_FROM - Security group rules
  • ROUTES_TO_GATEWAY - Routing relationships

Subgraph Queries

Get a subgraph (portion of the graph) starting from a specific node:
# Get subgraph with depth 3
node_id = "4:6e07742f-ec5f-4079-bbfd-e60ffccb0db2:100"
subgraph = cp_client.graph.get_subgraph(
    node_id=node_id,
    depth=3
)

print(f"Subgraph contains {len(subgraph['nodes'])} nodes")
print(f"Subgraph contains {len(subgraph['relationships'])} relationships")

# Visualize subgraph
for node in subgraph['nodes']:
    print(f"Node: {node['id']} - {node['labels']}")

for rel in subgraph['relationships']:
    print(f"Relationship: {rel['from']} -{rel['type']}-> {rel['to']}")

Use Cases for Subgraphs

# NOTE(review): other examples on this page pass graph node IDs of the form
# "4:6e07742f-...:0" as node_id; confirm that provider-style IDs such as
# "vpc-123456" are accepted here before copying these snippets.

# Find all resources connected to a VPC
vpc_id = "vpc-123456"
vpc_subgraph = cp_client.graph.get_subgraph(
    node_id=vpc_id,
    depth=2
)
print(f"VPC contains {len(vpc_subgraph['nodes'])} connected resources")

# Find all resources in a resource group
rg_id = "rg-production"
rg_subgraph = cp_client.graph.get_subgraph(
    node_id=rg_id,
    depth=1
)
print(f"Resource group contains {len(rg_subgraph['nodes'])} resources")

# Analyze security group dependencies (restricted to the AWS integration)
sg_id = "sg-123456"
sg_subgraph = cp_client.graph.get_subgraph(
    node_id=sg_id,
    depth=2,
    integration="aws"
)

Custom Cypher Queries

Execute custom Cypher queries for advanced graph operations:
# Count all nodes
query = {
    "query": "MATCH (n) RETURN count(n) as count"
}
result = cp_client.graph.execute_query(query)
print(f"Total nodes: {result['results'][0]['count']}")

# Find specific node types
query = {
    "query": """
        MATCH (n:EC2Instance)
        WHERE n.state = 'running'
        RETURN n.id, n.name, n.instanceType
        LIMIT 10
    """
}
result = cp_client.graph.execute_query(query)
for row in result['results']:
    print(f"Instance: {row['n.name']}, Type: {row['n.instanceType']}")

# Find relationships
query = {
    "query": """
        MATCH (s:S3Bucket)-[r:CONTAINS]->(o)
        RETURN s.name, type(r), o.name
        LIMIT 20
    """
}
result = cp_client.graph.execute_query(query)
for row in result['results']:
    print(f"{row['s.name']} -{row['type(r)']}-> {row['o.name']}")

Advanced Cypher Examples

# Find all EC2 instances in a specific VPC
query = {
    "query": """
        MATCH (vpc:AWSVpc {id: $vpc_id})-[:CONTAINS*]->(ec2:EC2Instance)
        RETURN ec2.id, ec2.name, ec2.state
    """,
    "parameters": {
        "vpc_id": "vpc-123456"
    }
}
result = cp_client.graph.execute_query(query)

# Find security group rules
query = {
    "query": """
        MATCH (sg:EC2SecurityGroup)-[r:ALLOWS_TRAFFIC_FROM]->(source)
        WHERE sg.name CONTAINS 'production'
        RETURN sg.name, r.port, r.protocol, source.name
    """
}
result = cp_client.graph.execute_query(query)

# Find orphaned resources
query = {
    "query": """
        MATCH (n)
        WHERE NOT (n)-[]-()
        RETURN labels(n), n.id, n.name
        LIMIT 50
    """
}
result = cp_client.graph.execute_query(query)

Integration Management

List available integrations in the graph:
integrations = cp_client.graph.list_integrations()

print(f"Total integrations: {integrations['count']}")
for integration in integrations['integrations']:
    print(f"- {integration}")

# Example output:
# - Aws
# - Azure
# - Csv
# - Custom

Health Check

Check the health of the Context Graph service:
health = cp_client.graph.health()
print(f"Graph service status: {health['status']}")
# Example output: Graph service status: healthy

Practical Use Cases

1. Find All Production Entities

def find_production_entities(cp_client):
    """Gather nodes found by searching three properties for 'production'.

    One text search is issued per property (resource group, tags, name) and
    the returned nodes are merged so that each node ID appears once.
    """
    # Issue the searches in the same order: resource group, tags, name.
    responses = [
        cp_client.graph.search_nodes_by_text({
            "property_name": prop,
            "search_text": "production"
        })
        for prop in ("resourcegroup", "tags", "name")
    ]

    # An ID keeps the position of its first appearance, while the node
    # stored for it is the one seen last.
    by_id = {}
    for response in responses:
        for node in response['nodes']:
            by_id[node['id']] = node

    return list(by_id.values())

production_entities = find_production_entities(cp_client)
print(f"Found {len(production_entities)} production entities")

2. Analyze Entity Dependencies

def analyze_dependencies(cp_client, entity_id):
    """Split an entity's relationships into incoming and outgoing lists.

    Prints a three-line summary and returns a dict with 'incoming' and
    'outgoing' keys, each holding a list of relationships.
    """
    graph = cp_client.graph

    # Look up the entity first, then every relationship touching it.
    entity = graph.get_node(entity_id)
    relationships = graph.get_relationships(entity_id, direction="both")

    incoming, outgoing = [], []
    for rel in relationships['relationships']:
        # A relationship that starts at this entity is outgoing; anything
        # else is counted as incoming.
        bucket = outgoing if rel['from'] == entity_id else incoming
        bucket.append(rel)

    dependencies = {'incoming': incoming, 'outgoing': outgoing}

    print(f"Entity: {entity['id']}")
    print(f"Incoming dependencies: {len(dependencies['incoming'])}")
    print(f"Outgoing dependencies: {len(dependencies['outgoing'])}")

    return dependencies

deps = analyze_dependencies(cp_client, "vpc-123456")

3. Find Unused Entities

def find_unused_entities(cp_client, node_type):
    """Find entities of one node type that have no relationships.

    Args:
        cp_client: Client exposing ``graph.execute_query(query)``.
        node_type: Node label to inspect (e.g. ``"S3Bucket"``). The value is
            spliced into the Cypher text, so it must be a plain identifier.

    Returns:
        A list of dicts with ``'id'``, ``'name'`` and ``'labels'`` keys, one
        per result row. ``'name'`` is ``'N/A'`` when the row carries no name.

    Raises:
        ValueError: If ``node_type`` is not a plain identifier string.
    """
    # The label is interpolated into the query text rather than bound as a
    # parameter, so reject anything that could change the query's structure.
    if not isinstance(node_type, str) or not node_type.isidentifier():
        raise ValueError(f"Invalid node type: {node_type!r}")

    query = {
        "query": f"""
            MATCH (n:{node_type})
            WHERE NOT (n)-[]-()
            RETURN n.id, n.name, labels(n)
        """
    }

    result = cp_client.graph.execute_query(query)

    unused = []
    for row in result['results']:
        # Fall back both when the key is absent and when it is present
        # with a null value.
        name = row.get('n.name')
        unused.append({
            'id': row['n.id'],
            'name': 'N/A' if name is None else name,
            'labels': row['labels(n)']
        })

    return unused

# Find unused S3 buckets
unused_buckets = find_unused_entities(cp_client, "S3Bucket")
print(f"Unused S3 buckets: {len(unused_buckets)}")

4. Security Audit

def audit_security_groups(cp_client):
    """List security-group rules whose ALLOWS_TRAFFIC_FROM cidr is 0.0.0.0/0.

    Returns one dict per result row with 'name', 'id', 'port' and
    'protocol' keys.
    """
    response = cp_client.graph.execute_query({
        "query": """
            MATCH (sg:EC2SecurityGroup)-[r:ALLOWS_TRAFFIC_FROM]->(source)
            WHERE r.cidr = '0.0.0.0/0'
            RETURN sg.name, sg.id, r.port, r.protocol
        """
    })

    # Rename the Cypher column names to plain keys.
    return [
        {
            'name': row['sg.name'],
            'id': row['sg.id'],
            'port': row['r.port'],
            'protocol': row['r.protocol']
        }
        for row in response['results']
    ]

risky = audit_security_groups(cp_client)
print(f"Found {len(risky)} security groups with open access")

Error Handling

from kubiya.resources.exceptions import GraphError

try:
    nodes = cp_client.graph.list_nodes(limit=100)
except GraphError as e:
    print(f"Graph operation failed: {e}")
    # Handle error (retry, log, alert, etc.)

Best Practices

1. Use Pagination

# Always paginate for large result sets
skip = 0
limit = 100

while True:
    nodes = cp_client.graph.list_nodes(skip=skip, limit=limit)
    # An empty page means there is nothing left to read.
    if not nodes['nodes']:
        break

    # process_nodes is not defined on this page; substitute your own handler.
    process_nodes(nodes['nodes'])
    skip += limit

2. Filter by Integration

# Filter early to reduce data transfer
aws_nodes = cp_client.graph.list_nodes(integration="aws", limit=50)
azure_nodes = cp_client.graph.list_nodes(integration="azure", limit=50)

3. Use Specific Queries

# Instead of fetching all nodes and filtering client-side
# Use text search or Cypher queries

# Bad (fetches all nodes)
all_nodes = cp_client.graph.list_nodes(limit=10000)
prod_nodes = [n for n in all_nodes['nodes'] if 'prod' in n.get('name', '')]

# Good (server-side filtering)
prod_nodes = cp_client.graph.search_nodes_by_text({
    "property_name": "name",
    "search_text": "prod"
})

API Reference

Methods

| Method | Description |
| --- | --- |
| `health()` | Check graph service health |
| `list_nodes(integration, skip, limit)` | List all nodes with pagination |
| `get_node(node_id, integration)` | Get specific node by ID |
| `search_nodes(search_data, integration, skip, limit)` | Structured node search |
| `search_nodes_by_text(text_query, integration, skip, limit)` | Text-based node search |
| `get_relationships(node_id, direction, relationship_type, integration, skip, limit)` | Get node relationships |
| `get_subgraph(node_id, depth, integration)` | Get subgraph from node |
| `list_labels(integration, skip, limit)` | List all node types |
| `list_relationship_types(integration, skip, limit)` | List all relationship types |
| `get_stats(integration, skip, limit)` | Get graph statistics |
| `execute_query(query)` | Execute custom Cypher query |
| `list_integrations(skip, limit)` | List available integrations |

Next Steps