Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.kubiya.ai/llms.txt

Use this file to discover all available pages before exploring further.

Follow these best practices to build reliable, maintainable, and performant applications with the Kubiya SDK.

Overview

This guide covers best practices across:
  • Authentication & Security: Protect credentials and handle permissions
  • Performance: Optimize API usage and reduce latency
  • Error Handling: Build resilient applications
  • Resource Management: Clean up resources properly
  • Code Organization: Structure your code effectively
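  • Testing: Ensure reliability through testing
  • Monitoring & Logging: Log important operations and track metrics
  • Documentation: Document functions and complex logic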

Authentication & Security

Use Environment Variables for Credentials

# ✅ GOOD - Environment variables
import os
from kubiya import ControlPlaneClient

# Fail fast with a clear message when the variable is missing or empty;
# os.getenv() alone would silently hand None to the client.
api_key = os.getenv("KUBIYA_API_KEY")
if not api_key:
    raise RuntimeError("KUBIYA_API_KEY environment variable is not set")
client = ControlPlaneClient(api_key=api_key)

# ❌ BAD - Hardcoded credentials
client = ControlPlaneClient(api_key="hardcoded-key-here")  # Don't do this!

Use .env Files for Local Development

# .env file (plain KEY=value lines, not Python)
KUBIYA_API_KEY=your-api-key-here
KUBIYA_BASE_URL=https://control-plane.kubiya.ai

# Python code that loads the .env file
from dotenv import load_dotenv
from kubiya import ControlPlaneClient
import os

# Load the .env file first so the lookups below can see its values.
load_dotenv()

# Collect the connection settings from the environment, then build the client.
settings = {
    "api_key": os.getenv("KUBIYA_API_KEY"),
    "base_url": os.getenv("KUBIYA_BASE_URL"),
}
client = ControlPlaneClient(**settings)

Never Log Sensitive Data

import logging
from kubiya import ControlPlaneClient

logger = logging.getLogger(__name__)
client = ControlPlaneClient(api_key="your-api-key")

# ❌ BAD - Logging sensitive data
secret = client.secrets.get_value(name="api-token")
logger.info(f"Got secret: {secret['value']}")  # NEVER LOG SECRETS!

# ✅ GOOD - Log metadata only
secret = client.secrets.get_value(name="api-token")
# Pass the name as a logging argument so formatting is deferred to the
# logging framework; only the secret's name (never its value) is logged.
logger.info("Retrieved secret: %s", secret['name'])

Clear Sensitive Data from Memory

from kubiya import ControlPlaneClient
import gc

client = ControlPlaneClient(api_key="your-api-key")

# Get secret
secret = client.secrets.get_value(name="api-token")
token = secret['value']

try:
    # Use token
    # ... perform operations ...
    pass
finally:
    # Drop our references even when the operations above raise.
    # NOTE: this only makes the objects eligible for collection. Python
    # strings are immutable, so the underlying bytes are not overwritten,
    # and any other reference (a traceback, a cache, a log record) keeps
    # the value alive. Keep secrets in scope for as short a time as possible.
    token = None
    secret = None
    gc.collect()

Performance Optimization

Use Batch Operations

from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# NOTE: `nodes` is not defined in this snippet. The loop below reads the
# 'id', 'labels' and 'properties' keys of each element, so it is presumably
# a list of dicts with those keys (confirm against the ingestion API).

# ❌ BAD - Individual operations in loop
# Issues one ingest_node call per element of `nodes`.
for node in nodes:
    client.ingestion.ingest_node(
        id=node['id'],
        labels=node['labels'],
        properties=node['properties']
    )

# ✅ GOOD - Batch operation
# Hands the whole `nodes` collection to a single ingest_nodes_batch call.
client.ingestion.ingest_nodes_batch(nodes=nodes)

Implement Caching

from kubiya import ControlPlaneClient
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, Any

class CachedClient:
    """Wrapper with caching for frequently accessed data.

    Results are kept in an in-memory dict together with an expiry time.
    Expiry times are timezone-aware UTC datetimes (the naive
    ``datetime.utcnow()`` is deprecated since Python 3.12).

    Args:
        client: Client whose ``models.list()`` results are cached.
        ttl_seconds: How long a cached result stays valid, in seconds.
    """

    def __init__(self, client: ControlPlaneClient, ttl_seconds: int = 300):
        self.client = client
        self.ttl_seconds = ttl_seconds
        # Maps cache key -> (cached value, UTC expiry time).
        self._cache: Dict[str, tuple[Any, datetime]] = {}

    def list_models(self, use_cache: bool = True):
        """List models with caching.

        Args:
            use_cache: When False, skip the cache lookup and always call
                the API. The fresh result is still stored in the cache.

        Returns:
            Whatever ``client.models.list()`` returned, either freshly
            fetched or taken from the cache while the entry is unexpired.
        """
        cache_key = "models_list"

        if use_cache:
            entry = self._cache.get(cache_key)
            if entry is not None:
                data, expiry = entry
                if datetime.now(timezone.utc) < expiry:
                    return data

        # Fetch from API
        models = self.client.models.list()

        # Cache result; the TTL is measured from the time the fetch finished.
        expiry = datetime.now(timezone.utc) + timedelta(seconds=self.ttl_seconds)
        self._cache[cache_key] = (models, expiry)

        return models

# Usage
from kubiya import ControlPlaneClient

# Wrap a regular client; cached results stay valid for ttl_seconds (300 here).
base_client = ControlPlaneClient(api_key="your-api-key")
cached_client = CachedClient(base_client, ttl_seconds=300)

# First call hits API
models1 = cached_client.list_models()

# Second call uses cache (within TTL)
models2 = cached_client.list_models()

Paginate Large Result Sets

from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

def fetch_all_memories(client: ControlPlaneClient, batch_size: int = 100):
    """Fetch all memories with pagination.

    Pages through ``client.graph.list_memories`` until an empty page is
    returned.

    Args:
        client: Client used to list memories.
        batch_size: Page size passed as ``limit`` on every request.

    Returns:
        A list with every memory returned, in the order received.
    """
    all_memories = []
    skip = 0

    while True:
        batch = client.graph.list_memories(skip=skip, limit=batch_size)

        if not batch:
            break

        all_memories.extend(batch)
        # Advance by what was actually received, not by the requested size:
        # if the server returns a shorter page than asked for, stepping by
        # batch_size would silently skip records.
        skip += len(batch)

        # Process batch immediately instead of loading all into memory
        # process_batch(batch)

    return all_memories

Use Async Storage for Large Data

from kubiya import ControlPlaneClient

client = ControlPlaneClient(api_key="your-api-key")

# NOTE: `large_log_data` is not defined in this snippet; it stands for the
# large payload to store.

# ✅ GOOD - Async for large content
# The return value is bound to `job`; presumably a handle for the background
# storage job (confirm against the store_memory_async API reference).
job = client.graph.store_memory_async(
    dataset_id="logs",
    context=large_log_data,  # Multiple MB of data
    metadata={"source": "application"}
)

# Continue with other work while storage completes
# ...

# ❌ BAD - Blocking for large content
memory = client.graph.store_memory(
    dataset_id="logs",
    context=large_log_data  # Blocks until complete
)

Error Handling

Catch Specific Exceptions

from kubiya import ControlPlaneClient
from kubiya.core.exceptions import (
    AuthenticationError as KubiyaAuthenticationError,
    TimeoutError as KubiyaTimeoutError,
    APIError as KubiyaAPIError
)
from kubiya.resources.exceptions import GraphError

client = ControlPlaneClient(api_key="your-api-key")

# Handlers are tried top to bottom and only the first match runs, so the
# most specific exception types are listed first.
# NOTE(review): this ordering presumes KubiyaAPIError is the most general of
# the four; confirm against the SDK's exception hierarchy.
try:
    result = client.graph.intelligent_search(keywords="query")

except GraphError as e:
    # Handle graph-specific errors
    print(f"Graph error: {e}")

except KubiyaAuthenticationError as e:
    # Handle auth errors
    print(f"Authentication failed: {e}")

except KubiyaTimeoutError as e:
    # Handle timeouts
    print(f"Request timed out: {e}")

except KubiyaAPIError as e:
    # Handle general API errors
    print(f"API error: {e}")

Implement Retry Logic

from kubiya import ControlPlaneClient
from kubiya.core.exceptions import (
    ConnectionError as KubiyaConnectionError,
    TimeoutError as KubiyaTimeoutError
)
import time

def retry_operation(operation, max_attempts=3, backoff_factor=2, *, sleep=time.sleep):
    """Retry operation with exponential backoff.

    Only connection and timeout errors are retried; any other exception
    propagates immediately.

    Args:
        operation: Zero-argument callable to run.
        max_attempts: Total number of attempts, including the first one.
        backoff_factor: Base of the delay; attempt ``k`` (0-based) waits
            ``backoff_factor ** k`` seconds before the next try.
        sleep: Callable used to wait between attempts. Defaults to
            ``time.sleep``; override it in tests to avoid real delays.

    Returns:
        The return value of the first successful ``operation()`` call.

    Raises:
        ValueError: If ``max_attempts`` is less than 1.
        KubiyaConnectionError, KubiyaTimeoutError: Re-raised unchanged when
            the final attempt fails.
    """
    # With zero attempts the loop below would never run and the function
    # would silently return None without ever calling the operation.
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    for attempt in range(max_attempts):
        try:
            return operation()

        except (KubiyaConnectionError, KubiyaTimeoutError):
            if attempt == max_attempts - 1:
                raise

            delay = backoff_factor ** attempt
            print(f"Attempt {attempt + 1} failed, retrying in {delay}s...")
            sleep(delay)

# Usage
client = ControlPlaneClient(api_key="your-api-key")

# Wrap the call in a lambda so retry_operation can invoke it again on each
# attempt; the defaults (3 attempts, backoff factor 2) apply here.
result = retry_operation(
    lambda: client.graph.intelligent_search(keywords="query")
)

Clean Up Resources

import logging

from kubiya import ControlPlaneClient

logger = logging.getLogger(__name__)

client = ControlPlaneClient(api_key="your-api-key")
session_id = None

try:
    result = client.graph.intelligent_search(keywords="query")
    session_id = result["session_id"]

    # Use session...

finally:
    # Always clean up
    if session_id:
        try:
            client.graph.delete_search_session(session_id)
        except Exception:
            # Cleanup is best-effort: a failure here must not mask an error
            # from the try block, but it is logged so leaked sessions can
            # still be noticed.
            logger.warning(
                "Failed to delete search session %s", session_id, exc_info=True
            )

Resource Management

Use Context Managers

from contextlib import contextmanager
from kubiya import ControlPlaneClient

@contextmanager
def temp_dataset(client: ControlPlaneClient, name: str):
    """Context manager for temporary dataset.

    Creates a user-scoped dataset on entry and deletes it on exit, whether
    the ``with`` body finished normally or raised.

    Args:
        client: Client used to create and delete the dataset.
        name: Name of the dataset to create.

    Yields:
        The value returned by ``client.datasets.create_dataset``; its
        ``'id'`` entry is used for the deletion.
    """
    import logging

    dataset = client.datasets.create_dataset(name=name, scope="user")

    try:
        yield dataset
    finally:
        # Automatic cleanup
        try:
            client.datasets.delete_dataset(dataset_id=dataset['id'])
        except Exception:
            # Best-effort: never let a cleanup failure replace an exception
            # from the with-body, but log it so leftover datasets are visible.
            logging.getLogger(__name__).warning(
                "Failed to delete temporary dataset %r", name, exc_info=True
            )

# Usage
client = ControlPlaneClient(api_key="your-api-key")

# The dataset exists only inside the with-block; temp_dataset deletes it on
# exit, including when the body raises.
with temp_dataset(client, "temp-data") as dataset:
    # Use dataset
    client.graph.store_memory(
        dataset_id=dataset['id'],
        context="temporary data"
    )
# Dataset automatically deleted

Clean Up Search Sessions

import logging

from kubiya import ControlPlaneClient

logger = logging.getLogger(__name__)

client = ControlPlaneClient(api_key="your-api-key")

# Track sessions for cleanup
active_sessions = []

try:
    # Create searches
    result1 = client.graph.intelligent_search(keywords="query1")
    active_sessions.append(result1["session_id"])

    result2 = client.graph.intelligent_search(keywords="query2")
    active_sessions.append(result2["session_id"])

finally:
    # Clean up all sessions
    for session_id in active_sessions:
        try:
            client.graph.delete_search_session(session_id)
        except Exception:
            # Keep going so one failed deletion does not leave the remaining
            # sessions behind, but log it instead of hiding it.
            logger.warning(
                "Failed to delete search session %s", session_id, exc_info=True
            )

Monitor Dataset Status

from kubiya import ControlPlaneClient
import time

client = ControlPlaneClient(api_key="your-api-key")

# Create dataset
dataset = client.datasets.create_dataset(name="large-dataset", scope="org")

# Store large amount of data
# NOTE: `large_data_batches` is not defined in this snippet.
for data in large_data_batches:
    client.graph.store_memory_async(
        dataset_id=dataset['id'],
        context=data
    )

# Wait for processing to complete, but never poll forever: without a
# deadline a dataset that never reaches 'ready' would hang the script.
POLL_INTERVAL_SECONDS = 5
MAX_WAIT_SECONDS = 600
deadline = time.monotonic() + MAX_WAIT_SECONDS

while True:
    status = client.datasets.get_dataset_status(dataset_id=dataset['id'])

    if status['status'] == 'ready':
        break

    # NOTE(review): if the API reports a terminal failure status, check for
    # it here and stop early instead of waiting for the deadline.
    if time.monotonic() >= deadline:
        raise TimeoutError(
            f"Dataset {dataset['id']} not ready after {MAX_WAIT_SECONDS}s "
            f"(last status: {status['status']!r})"
        )

    time.sleep(POLL_INTERVAL_SECONDS)

# Now safe to query
results = client.graph.semantic_search(
    query="test",
    filters={"dataset_id": dataset['id']}
)

Code Organization

Use Type Hints

from kubiya import ControlPlaneClient
from typing import Any, Dict, List, Optional

def search_infrastructure(
    client: ControlPlaneClient,
    query: str,
    integration: Optional[str] = None
) -> Dict[str, Any]:
    """Search infrastructure with type hints.

    Args:
        client: Client used to run the search.
        query: Search text, forwarded as ``keywords``.
        integration: Optional integration filter, forwarded unchanged.

    Returns:
        The result of ``client.graph.intelligent_search``.
    """
    # typing.Any is the "any type" annotation; the lowercase builtin `any`
    # is a function and is rejected by type checkers.
    return client.graph.intelligent_search(
        keywords=query,
        integration=integration
    )

def batch_import_nodes(
    client: ControlPlaneClient,
    nodes: List[Dict[str, Any]],
    dataset_id: str
) -> Dict[str, Any]:
    """Import nodes with type hints.

    Args:
        client: Client used to ingest the nodes.
        nodes: Node dictionaries to import in one batch.
        dataset_id: Dataset the nodes are imported into.

    Returns:
        The result of ``client.ingestion.ingest_nodes_batch``.
    """
    return client.ingestion.ingest_nodes_batch(
        nodes=nodes,
        dataset_id=dataset_id
    )

Create Reusable Functions

from kubiya import ControlPlaneClient
from typing import List, Dict

def create_infrastructure_dataset(
    client: ControlPlaneClient,
    environment: str
) -> Dict:
    """Create the org-scoped infrastructure dataset for one environment.

    Args:
        client: Client used to create the dataset.
        environment: Environment name; used in the dataset name and
            description.

    Returns:
        The result of ``client.datasets.create_dataset``.
    """
    dataset_name = f"{environment}-infrastructure"
    dataset_description = f"Infrastructure data for {environment} environment"

    created = client.datasets.create_dataset(
        name=dataset_name,
        description=dataset_description,
        scope="org",
    )
    return created

def import_aws_resources(
    client: ControlPlaneClient,
    region: str,
    dataset_id: str
) -> Dict:
    """Fetch the AWS resources of a region and import them as graph nodes.

    Args:
        client: Client used for the batch import.
        region: Region passed to ``fetch_aws_resources``.
        dataset_id: Dataset the nodes are imported into.

    Returns:
        The result of ``client.ingestion.ingest_nodes_batch``.
    """
    # Collect the resources first, then push them in a single batch call.
    discovered_nodes = fetch_aws_resources(region)

    import_result = client.ingestion.ingest_nodes_batch(
        nodes=discovered_nodes,
        dataset_id=dataset_id,
        duplicate_handling="update",
    )
    return import_result

Use Configuration Classes

from dataclasses import dataclass
from kubiya import ControlPlaneClient
from typing import Optional

@dataclass
class SearchConfig:
    """Configuration for intelligent search.

    Each field is forwarded by ``search_with_config`` as the keyword
    argument of the same name to ``client.graph.intelligent_search``.
    """
    # NOTE(review): the meanings below are inferred from the field names;
    # confirm against the intelligent_search API reference.
    max_turns: int = 5  # presumably an upper bound on search iterations
    temperature: float = 0.7  # presumably the model sampling temperature
    enable_semantic_search: bool = True
    enable_cypher_queries: bool = False
    integration: Optional[str] = None  # None is forwarded as-is

def search_with_config(
    client: ControlPlaneClient,
    query: str,
    config: SearchConfig
) -> dict:
    """Run an intelligent search whose options come from a config object.

    Args:
        client: Client used to run the search.
        query: Search text, forwarded as ``keywords``.
        config: Source of the remaining search options.

    Returns:
        The result of ``client.graph.intelligent_search``.
    """
    # Gather the options by name first, then forward them in one call.
    search_options = {
        "max_turns": config.max_turns,
        "temperature": config.temperature,
        "enable_semantic_search": config.enable_semantic_search,
        "enable_cypher_queries": config.enable_cypher_queries,
        "integration": config.integration,
    }
    return client.graph.intelligent_search(keywords=query, **search_options)

# Usage
client = ControlPlaneClient(api_key="your-api-key")
# Override only the fields that differ; the rest keep their dataclass defaults.
config = SearchConfig(max_turns=10, temperature=0.5)
result = search_with_config(client, "test query", config)

Testing

Write Unit Tests

import pytest
from kubiya import ControlPlaneClient
from unittest.mock import Mock

@pytest.fixture
def mock_client():
    """Provide a mocked client whose intelligent search returns a fixed reply."""
    canned_reply = {
        "answer": "Test answer",
        "nodes": [],
        "confidence": "high"
    }

    fake = Mock(spec=ControlPlaneClient)
    fake.graph.intelligent_search.return_value = canned_reply
    return fake

def test_search_function(mock_client):
    """Test function using mocked client.

    Checks both the value returned to the caller and the arguments that
    reached the SDK, so a change that drops or renames the query is caught.
    """
    result = search_infrastructure(mock_client, "test query")

    assert result["answer"] == "Test answer"
    # search_infrastructure forwards the query as `keywords` and passes its
    # `integration` default (None) through unchanged.
    mock_client.graph.intelligent_search.assert_called_once_with(
        keywords="test query",
        integration=None
    )

Test Error Handling

import pytest
from kubiya.resources.exceptions import GraphError

def test_error_handling():
    """Test that errors are handled correctly.

    A ``pytest.raises`` block whose body raises nothing fails the test, so
    the body must contain an operation that really raises. Here a mocked
    client is configured to raise ``GraphError``.
    """
    from unittest.mock import Mock

    client = Mock()
    # NOTE(review): assumes GraphError can be built from a single message
    # argument; confirm against the SDK.
    client.graph.intelligent_search.side_effect = GraphError("search failed")

    with pytest.raises(GraphError):
        # Operation that should fail
        client.graph.intelligent_search(keywords="query")

Use Test Fixtures

import pytest
from kubiya import ControlPlaneClient

@pytest.fixture(scope="session")
def client():
    """Build the session-scoped client used by the tests."""
    test_client = ControlPlaneClient(api_key="test-key")
    return test_client

@pytest.fixture
def test_dataset(client):
    """Yield a freshly created user-scoped dataset, then delete it."""
    # Setup: create the dataset the test will work with.
    created = client.datasets.create_dataset(name="test-dataset", scope="user")

    yield created

    # Teardown: remove the dataset by its id once the test is done.
    created_id = created['id']
    client.datasets.delete_dataset(dataset_id=created_id)

Monitoring & Logging

Log Important Operations

import logging
from kubiya import ControlPlaneClient

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

client = ControlPlaneClient(api_key="your-api-key")

def import_infrastructure(region: str):
    """Import infrastructure with logging.

    Logs the start, the number of fetched resources and the final summary.
    On any failure the exception is logged with its traceback and re-raised.

    Args:
        region: Region passed to ``fetch_resources``.

    Returns:
        The result of ``client.ingestion.ingest_nodes_batch``.

    Raises:
        Exception: Whatever the fetch, import or summary lookup raised.
    """
    # Values are passed as logging arguments so the message is only
    # formatted when the record is actually emitted.
    logger.info("Starting infrastructure import for region: %s", region)

    try:
        nodes = fetch_resources(region)
        logger.info("Fetched %d resources", len(nodes))

        result = client.ingestion.ingest_nodes_batch(nodes=nodes)

        summary = result['summary']
        logger.info(
            "Import completed: %s succeeded, %s failed",
            summary['success'],
            summary['failed'],
        )

        return result

    except Exception as e:
        # logger.exception logs at ERROR level and attaches the traceback.
        logger.exception("Import failed: %s", e)
        raise

Track Metrics

from kubiya import ControlPlaneClient
from datetime import datetime, timezone
import time

class MetricsTracker:
    """Track SDK operation metrics.

    Every recorded operation is kept in ``self.operations`` as a dict with
    the keys ``name``, ``duration``, ``success`` and ``timestamp``.
    """

    def __init__(self):
        self.operations = []

    def track_operation(self, operation_name: str, duration: float, success: bool):
        """Record operation metrics.

        Args:
            operation_name: Label stored under ``name``.
            duration: Elapsed time of the operation.
            success: Whether the operation succeeded.
        """
        self.operations.append({
            "name": operation_name,
            "duration": duration,
            "success": success,
            # Timezone-aware UTC time: datetime.utcnow() is deprecated and
            # yields a naive value; this ISO string carries a "+00:00" offset.
            "timestamp": datetime.now(timezone.utc).isoformat()
        })

    def get_stats(self):
        """Get operation statistics.

        Returns:
            An empty dict when nothing was recorded; otherwise a dict with
            ``total_operations``, ``success_rate`` (fraction between 0 and 1)
            and ``average_duration``.
        """
        if not self.operations:
            return {}

        total = len(self.operations)
        succeeded = sum(1 for op in self.operations if op['success'])
        total_duration = sum(op['duration'] for op in self.operations)

        return {
            "total_operations": total,
            "success_rate": succeeded / total,
            "average_duration": total_duration / total
        }

# Usage
metrics = MetricsTracker()
client = ControlPlaneClient(api_key="your-api-key")

# perf_counter() is the clock meant for measuring elapsed time; time.time()
# is a wall clock and can jump when the system time is adjusted.
start = time.perf_counter()
try:
    result = client.graph.intelligent_search(keywords="query")
except Exception:
    # The failure is recorded rather than re-raised so the stats below are
    # still printed.
    metrics.track_operation("intelligent_search", time.perf_counter() - start, False)
else:
    # Only the SDK call is inside the try, so a problem while recording the
    # success cannot be mistaken for a failed search.
    metrics.track_operation("intelligent_search", time.perf_counter() - start, True)

print(metrics.get_stats())

Documentation

Document Functions

from kubiya import ControlPlaneClient
from typing import Any, Dict, List, Optional

def search_and_analyze(
    client: ControlPlaneClient,
    query: str,
    integration: Optional[str] = None
) -> Dict[str, Any]:
    """
    Search and analyze infrastructure using intelligent search.

    Args:
        client: Control Plane client instance
        query: Natural language search query
        integration: Optional integration filter (e.g., "AWS", "GitHub")

    Returns:
        Dictionary containing:
        - answer: Natural language answer
        - nodes: List of matching nodes
        - analysis: Additional analysis results

    Raises:
        GraphError: If search operation fails
        AuthenticationError: If API key is invalid

    Example:
        >>> client = ControlPlaneClient(api_key="your-key")
        >>> result = search_and_analyze(client, "production databases")
        >>> print(result["answer"])
    """
    # The return annotation uses typing.Any; the lowercase builtin `any` is
    # a function, not a type.
    result = client.graph.intelligent_search(
        keywords=query,
        integration=integration
    )

    return {
        "answer": result["answer"],
        "nodes": result["nodes"],
        "analysis": analyze_nodes(result["nodes"])
    }

Add Code Comments for Complex Logic

from kubiya import ControlPlaneClient

def complex_data_migration(
    client: ControlPlaneClient,
    target_dataset_id: str = "target-dataset",
    batch_size: int = 100,
):
    """Migrate data between datasets with validation.

    Args:
        client: Client used to list and store memories.
        target_dataset_id: Dataset that receives the transformed memories.
        batch_size: Page size for reading and chunk size for writing.
    """

    # Step 1: Fetch all memories
    # Using pagination to handle large datasets efficiently
    # NOTE(review): list_memories is called without a dataset filter, so this
    # reads every memory the call returns, not only one source dataset;
    # confirm that is intended.
    memories = []
    skip = 0

    while True:
        batch = client.graph.list_memories(skip=skip, limit=batch_size)
        if not batch:
            break

        memories.extend(batch)
        # Advance by what was actually received: stepping by batch_size
        # would skip records whenever the server returns a shorter page.
        skip += len(batch)

    # Step 2: Transform memories for target dataset
    # Filter out test data and normalize format
    transformed = [
        transform_memory(m) for m in memories
        if not m.get('metadata', {}).get('is_test', False)
    ]

    # Step 3: Batch import to target dataset
    # Use async to avoid blocking on large imports
    for start in range(0, len(transformed), batch_size):
        chunk = transformed[start:start + batch_size]

        client.graph.store_memory_async(
            dataset_id=target_dataset_id,
            context=chunk
        )

Next Steps

Error Handling

Handle SDK exceptions

Control Plane Client

Learn the Control Plane client services

Examples

Code examples and recipes

API Reference

Complete API documentation