Follow these best practices to build reliable, maintainable, and performant applications with the Kubiya SDK.
Overview
This guide covers best practices across:
- Authentication & Security: Protect credentials and handle permissions
- Performance: Optimize API usage and reduce latency
- Error Handling: Build resilient applications
- Resource Management: Clean up resources properly
- Code Organization: Structure your code effectively
- Testing: Ensure reliability through testing
Authentication & Security
Use Environment Variables for Credentials
# ✅ GOOD - Environment variables
import os
from kubiya import ControlPlaneClient
api_key = os.getenv("KUBIYA_API_KEY")
client = ControlPlaneClient(api_key=api_key)
# ❌ BAD - Hardcoded credentials
client = ControlPlaneClient(api_key="hardcoded-key-here") # Don't do this!
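It also pays to fail fast when the variable is missing, so a misconfigured environment surfaces as a clear error instead of a confusing authentication failure later. A minimal sketch:

import os
from kubiya import ControlPlaneClient

api_key = os.getenv("KUBIYA_API_KEY")
if not api_key:
    # Fail fast with an explicit message
    raise RuntimeError("KUBIYA_API_KEY is not set")
client = ControlPlaneClient(api_key=api_key)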
Use .env Files for Local Development
# .env
KUBIYA_API_KEY=your-api-key-here
KUBIYA_BASE_URL=https://control-plane.kubiya.ai
from dotenv import load_dotenv
from kubiya import ControlPlaneClient
import os
load_dotenv()
client = ControlPlaneClient(
    api_key=os.getenv("KUBIYA_API_KEY"),
    base_url=os.getenv("KUBIYA_BASE_URL")
)
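Note that load_dotenv comes from the python-dotenv package (pip install python-dotenv), and the .env file itself belongs in .gitignore so credentials never reach version control.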
Never Log Sensitive Data
import logging
from kubiya import ControlPlaneClient
logger = logging.getLogger(__name__)
client = ControlPlaneClient(api_key="your-api-key")
# ❌ BAD - Logging sensitive data
secret = client.secrets.get_value(name="api-token")
logger.info(f"Got secret: {secret['value']}") # NEVER LOG SECRETS!
# ✅ GOOD - Log metadata only
secret = client.secrets.get_value(name="api-token")
logger.info(f"Retrieved secret: {secret['name']}")
Clear Sensitive Data from Memory
from kubiya import ControlPlaneClient
import gc
client = ControlPlaneClient(api_key="your-api-key")
# Get secret
secret = client.secrets.get_value(name="api-token")
token = secret['value']
# Use token
# ... perform operations ...
# Drop references so the objects become eligible for garbage collection.
# Note: Python cannot guarantee immutable strings are wiped from RAM,
# so keep the lifetime of secret values as short as possible.
token = None
secret = None
gc.collect()
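A convenient way to keep that lifetime short is to confine the value to a scope. A minimal sketch using a context manager (an illustrative pattern, not an SDK feature):

from contextlib import contextmanager

@contextmanager
def secret_value(client, name: str):
    """Fetch a secret, yield its value, and drop the reference on exit."""
    secret = client.secrets.get_value(name=name)
    try:
        yield secret['value']
    finally:
        secret = None  # release the reference as soon as the block exits

with secret_value(client, "api-token") as token:
    ...  # use token only inside this block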
Performance Optimization
Use Batch Operations
from kubiya import ControlPlaneClient
client = ControlPlaneClient(api_key="your-api-key")
# ❌ BAD - Individual operations in loop
for node in nodes:
    client.ingestion.ingest_node(
        id=node['id'],
        labels=node['labels'],
        properties=node['properties']
    )

# ✅ GOOD - Batch operation
client.ingestion.ingest_nodes_batch(nodes=nodes)
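Very large collections can still exceed per-request limits, so consider splitting them into fixed-size chunks first. A sketch (the chunk size of 500 is an assumption; tune it to your payloads):

def ingest_in_chunks(client, nodes, chunk_size: int = 500):
    """Send nodes in bounded batches instead of one oversized request."""
    for i in range(0, len(nodes), chunk_size):
        client.ingestion.ingest_nodes_batch(nodes=nodes[i:i + chunk_size])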
Implement Caching
from kubiya import ControlPlaneClient
from datetime import datetime, timedelta
from typing import Any, Dict

class CachedClient:
    """Wrapper with caching for frequently accessed data."""

    def __init__(self, client: ControlPlaneClient, ttl_seconds: int = 300):
        self.client = client
        self.ttl_seconds = ttl_seconds
        self._cache: Dict[str, tuple[Any, datetime]] = {}

    def list_models(self, use_cache: bool = True):
        """List models with caching."""
        cache_key = "models_list"
        if use_cache and cache_key in self._cache:
            data, expiry = self._cache[cache_key]
            if datetime.utcnow() < expiry:
                return data
        # Fetch from API
        models = self.client.models.list()
        # Cache result
        expiry = datetime.utcnow() + timedelta(seconds=self.ttl_seconds)
        self._cache[cache_key] = (models, expiry)
        return models
# Usage
from kubiya import ControlPlaneClient
base_client = ControlPlaneClient(api_key="your-api-key")
cached_client = CachedClient(base_client, ttl_seconds=300)
# First call hits API
models1 = cached_client.list_models()
# Second call uses cache (within TTL)
models2 = cached_client.list_models()
Paginate Large Result Sets
from kubiya import ControlPlaneClient
client = ControlPlaneClient(api_key="your-api-key")
def fetch_all_memories(client: ControlPlaneClient, batch_size: int = 100):
    """Fetch all memories with pagination."""
    all_memories = []
    skip = 0
    while True:
        batch = client.graph.list_memories(skip=skip, limit=batch_size)
        if not batch:
            break
        all_memories.extend(batch)
        skip += batch_size
        # Process batch immediately instead of loading all into memory
        # process_batch(batch)
    return all_memories
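When the full result set is too large to hold in memory, a generator variant of the same loop lets callers stream batches rather than accumulate them:

def iter_memories(client: ControlPlaneClient, batch_size: int = 100):
    """Yield memories one batch at a time with constant memory overhead."""
    skip = 0
    while True:
        batch = client.graph.list_memories(skip=skip, limit=batch_size)
        if not batch:
            return
        yield from batch
        skip += batch_size

for memory in iter_memories(client):
    ...  # process each memory as it arrives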
Use Async Storage for Large Data
from kubiya import ControlPlaneClient
client = ControlPlaneClient(api_key="your-api-key")
# ✅ GOOD - Async for large content
job = client.graph.store_memory_async(
    dataset_id="logs",
    context=large_log_data,  # Multiple MB of data
    metadata={"source": "application"}
)
# Continue with other work while storage completes
# ...

# ❌ BAD - Blocking for large content
memory = client.graph.store_memory(
    dataset_id="logs",
    context=large_log_data  # Blocks until complete
)
Error Handling
Catch Specific Exceptions
from kubiya import ControlPlaneClient
from kubiya.core.exceptions import (
    AuthenticationError as KubiyaAuthenticationError,
    TimeoutError as KubiyaTimeoutError,
    APIError as KubiyaAPIError
)
from kubiya.resources.exceptions import GraphError

client = ControlPlaneClient(api_key="your-api-key")

try:
    result = client.graph.intelligent_search(keywords="query")
except GraphError as e:
    # Handle graph-specific errors
    print(f"Graph error: {e}")
except KubiyaAuthenticationError as e:
    # Handle auth errors
    print(f"Authentication failed: {e}")
except KubiyaTimeoutError as e:
    # Handle timeouts
    print(f"Request timed out: {e}")
except KubiyaAPIError as e:
    # Handle general API errors
    print(f"API error: {e}")
Implement Retry Logic
from kubiya import ControlPlaneClient
from kubiya.core.exceptions import (
    ConnectionError as KubiyaConnectionError,
    TimeoutError as KubiyaTimeoutError
)
import time

def retry_operation(operation, max_attempts=3, backoff_factor=2):
    """Retry operation with exponential backoff."""
    for attempt in range(max_attempts):
        try:
            return operation()
        except (KubiyaConnectionError, KubiyaTimeoutError):
            if attempt == max_attempts - 1:
                raise
            delay = backoff_factor ** attempt
            print(f"Attempt {attempt + 1} failed, retrying in {delay}s...")
            time.sleep(delay)

# Usage
client = ControlPlaneClient(api_key="your-api-key")
result = retry_operation(
    lambda: client.graph.intelligent_search(keywords="query")
)
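If you would rather not maintain retry code by hand, the widely used tenacity library expresses the same exponential-backoff pattern declaratively. A sketch, assuming the exception aliases above:

from kubiya.core.exceptions import (
    ConnectionError as KubiyaConnectionError,
    TimeoutError as KubiyaTimeoutError
)
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

@retry(
    retry=retry_if_exception_type((KubiyaConnectionError, KubiyaTimeoutError)),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
)
def search(keywords: str):
    # Failed attempts are retried with exponential backoff, up to 3 tries
    return client.graph.intelligent_search(keywords=keywords)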
Clean Up Resources
from kubiya import ControlPlaneClient
client = ControlPlaneClient(api_key="your-api-key")
session_id = None
try:
    result = client.graph.intelligent_search(keywords="query")
    session_id = result["session_id"]
    # Use session...
finally:
    # Always clean up
    if session_id:
        try:
            client.graph.delete_search_session(session_id)
        except Exception:
            pass  # Ignore cleanup errors
Resource Management
Use Context Managers
from contextlib import contextmanager
from kubiya import ControlPlaneClient
@contextmanager
def temp_dataset(client: ControlPlaneClient, name: str):
    """Context manager for temporary dataset."""
    dataset = client.datasets.create_dataset(name=name, scope="user")
    try:
        yield dataset
    finally:
        # Automatic cleanup
        try:
            client.datasets.delete_dataset(dataset_id=dataset['id'])
        except Exception:
            pass

# Usage
client = ControlPlaneClient(api_key="your-api-key")
with temp_dataset(client, "temp-data") as dataset:
    # Use dataset
    client.graph.store_memory(
        dataset_id=dataset['id'],
        context="temporary data"
    )
# Dataset automatically deleted
Clean Up Search Sessions
from kubiya import ControlPlaneClient
client = ControlPlaneClient(api_key="your-api-key")
# Track sessions for cleanup
active_sessions = []

try:
    # Create searches
    result1 = client.graph.intelligent_search(keywords="query1")
    active_sessions.append(result1["session_id"])
    result2 = client.graph.intelligent_search(keywords="query2")
    active_sessions.append(result2["session_id"])
finally:
    # Clean up all sessions
    for session_id in active_sessions:
        try:
            client.graph.delete_search_session(session_id)
        except Exception:
            pass
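The same cleanup can be packaged as a context manager, mirroring temp_dataset above, so each session deletes itself (an illustrative helper, not an SDK feature):

from contextlib import contextmanager

@contextmanager
def search_session(client, keywords: str):
    """Run an intelligent search and delete its session on exit."""
    result = client.graph.intelligent_search(keywords=keywords)
    try:
        yield result
    finally:
        try:
            client.graph.delete_search_session(result["session_id"])
        except Exception:
            pass  # Ignore cleanup errors

with search_session(client, "query1") as result:
    print(result["answer"])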
Monitor Dataset Status
from kubiya import ControlPlaneClient
import time
client = ControlPlaneClient(api_key="your-api-key")
# Create dataset
dataset = client.datasets.create_dataset(name="large-dataset", scope="org")

# Store large amount of data
for data in large_data_batches:
    client.graph.store_memory_async(
        dataset_id=dataset['id'],
        context=data
    )

# Wait for processing to complete
while True:
    status = client.datasets.get_dataset_status(dataset_id=dataset['id'])
    if status['status'] == 'ready':
        break
    time.sleep(5)

# Now safe to query
results = client.graph.semantic_search(
    query="test",
    filters={"dataset_id": dataset['id']}
)
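An unbounded while True poll can hang forever if processing stalls, so give the loop a deadline in production code. A sketch with an assumed 10-minute budget:

import time

deadline = time.monotonic() + 600  # assumed 10-minute budget
while time.monotonic() < deadline:
    status = client.datasets.get_dataset_status(dataset_id=dataset['id'])
    if status['status'] == 'ready':
        break
    time.sleep(5)
else:
    raise TimeoutError("Dataset was not ready within 10 minutes")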
Code Organization
Use Type Hints
from kubiya import ControlPlaneClient
from typing import Any, Dict, List, Optional

def search_infrastructure(
    client: ControlPlaneClient,
    query: str,
    integration: Optional[str] = None
) -> Dict[str, Any]:
    """Search infrastructure with type hints."""
    return client.graph.intelligent_search(
        keywords=query,
        integration=integration
    )

def batch_import_nodes(
    client: ControlPlaneClient,
    nodes: List[Dict[str, Any]],
    dataset_id: str
) -> Dict[str, Any]:
    """Import nodes with type hints."""
    return client.ingestion.ingest_nodes_batch(
        nodes=nodes,
        dataset_id=dataset_id
    )
Create Reusable Functions
from kubiya import ControlPlaneClient
from typing import Dict

def create_infrastructure_dataset(
    client: ControlPlaneClient,
    environment: str
) -> Dict:
    """Create dataset for specific environment."""
    return client.datasets.create_dataset(
        name=f"{environment}-infrastructure",
        description=f"Infrastructure data for {environment} environment",
        scope="org"
    )

def import_aws_resources(
    client: ControlPlaneClient,
    region: str,
    dataset_id: str
) -> Dict:
    """Import AWS resources from region."""
    # Fetch resources (fetch_aws_resources is your own collection helper)
    nodes = fetch_aws_resources(region)
    # Import to graph
    return client.ingestion.ingest_nodes_batch(
        nodes=nodes,
        dataset_id=dataset_id,
        duplicate_handling="update"
    )
Use Configuration Classes
from dataclasses import dataclass
from kubiya import ControlPlaneClient
from typing import Optional
@dataclass
class SearchConfig:
    """Configuration for intelligent search."""
    max_turns: int = 5
    temperature: float = 0.7
    enable_semantic_search: bool = True
    enable_cypher_queries: bool = False
    integration: Optional[str] = None

def search_with_config(
    client: ControlPlaneClient,
    query: str,
    config: SearchConfig
) -> dict:
    """Search using configuration object."""
    return client.graph.intelligent_search(
        keywords=query,
        max_turns=config.max_turns,
        temperature=config.temperature,
        enable_semantic_search=config.enable_semantic_search,
        enable_cypher_queries=config.enable_cypher_queries,
        integration=config.integration
    )

# Usage
client = ControlPlaneClient(api_key="your-api-key")
config = SearchConfig(max_turns=10, temperature=0.5)
result = search_with_config(client, "test query", config)
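Configuration objects also give you one place to read settings from the environment. A sketch, using hypothetical KUBIYA_SEARCH_* variables:

import os

def config_from_env() -> SearchConfig:
    """Build a SearchConfig from (hypothetical) environment variables."""
    return SearchConfig(
        max_turns=int(os.getenv("KUBIYA_SEARCH_MAX_TURNS", "5")),
        temperature=float(os.getenv("KUBIYA_SEARCH_TEMPERATURE", "0.7")),
    )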
Testing
Write Unit Tests
import pytest
from kubiya import ControlPlaneClient
from unittest.mock import Mock
@pytest.fixture
def mock_client():
    """Create mock client for testing."""
    client = Mock(spec=ControlPlaneClient)
    client.graph.intelligent_search.return_value = {
        "answer": "Test answer",
        "nodes": [],
        "confidence": "high"
    }
    return client

def test_search_function(mock_client):
    """Test function using mocked client."""
    # search_infrastructure is defined in the Type Hints section above
    result = search_infrastructure(mock_client, "test query")
    assert result["answer"] == "Test answer"
    mock_client.graph.intelligent_search.assert_called_once()
Test Error Handling
import pytest
from unittest.mock import Mock
from kubiya.resources.exceptions import GraphError

def test_error_handling():
    """Test that errors are surfaced correctly."""
    client = Mock()
    # Simulate the SDK raising a graph error
    client.graph.intelligent_search.side_effect = GraphError("search failed")
    with pytest.raises(GraphError):
        client.graph.intelligent_search(keywords="query")
Use Test Fixtures
import pytest
from kubiya import ControlPlaneClient
@pytest.fixture(scope="session")
def client():
    """Provide test client."""
    return ControlPlaneClient(api_key="test-key")

@pytest.fixture
def test_dataset(client):
    """Create and cleanup test dataset."""
    dataset = client.datasets.create_dataset(
        name="test-dataset",
        scope="user"
    )
    yield dataset
    # Cleanup
    client.datasets.delete_dataset(dataset_id=dataset['id'])
Monitoring & Logging
Log Important Operations
import logging
from kubiya import ControlPlaneClient
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
client = ControlPlaneClient(api_key="your-api-key")
def import_infrastructure(region: str):
    """Import infrastructure with logging."""
    logger.info(f"Starting infrastructure import for region: {region}")
    try:
        # fetch_resources is your own collection helper
        nodes = fetch_resources(region)
        logger.info(f"Fetched {len(nodes)} resources")
        result = client.ingestion.ingest_nodes_batch(nodes=nodes)
        logger.info(
            f"Import completed: {result['summary']['success']} succeeded, "
            f"{result['summary']['failed']} failed"
        )
        return result
    except Exception as e:
        logger.error(f"Import failed: {e}", exc_info=True)
        raise
Track Metrics
from kubiya import ControlPlaneClient
from datetime import datetime
import time
class MetricsTracker:
    """Track SDK operation metrics."""

    def __init__(self):
        self.operations = []

    def track_operation(self, operation_name: str, duration: float, success: bool):
        """Record operation metrics."""
        self.operations.append({
            "name": operation_name,
            "duration": duration,
            "success": success,
            "timestamp": datetime.utcnow().isoformat()
        })

    def get_stats(self):
        """Get operation statistics."""
        if not self.operations:
            return {}
        return {
            "total_operations": len(self.operations),
            "success_rate": sum(1 for op in self.operations if op['success']) / len(self.operations),
            "average_duration": sum(op['duration'] for op in self.operations) / len(self.operations)
        }

# Usage
metrics = MetricsTracker()
client = ControlPlaneClient(api_key="your-api-key")

start = time.time()
try:
    result = client.graph.intelligent_search(keywords="query")
    metrics.track_operation("intelligent_search", time.time() - start, True)
except Exception:
    metrics.track_operation("intelligent_search", time.time() - start, False)

print(metrics.get_stats())
Documentation
Document Functions
from kubiya import ControlPlaneClient
from typing import Any, Dict, Optional

def search_and_analyze(
    client: ControlPlaneClient,
    query: str,
    integration: Optional[str] = None
) -> Dict[str, Any]:
    """
    Search and analyze infrastructure using intelligent search.

    Args:
        client: Control Plane client instance
        query: Natural language search query
        integration: Optional integration filter (e.g., "AWS", "GitHub")

    Returns:
        Dictionary containing:
        - answer: Natural language answer
        - nodes: List of matching nodes
        - analysis: Additional analysis results

    Raises:
        GraphError: If search operation fails
        AuthenticationError: If API key is invalid

    Example:
        >>> client = ControlPlaneClient(api_key="your-key")
        >>> result = search_and_analyze(client, "production databases")
        >>> print(result["answer"])
    """
    result = client.graph.intelligent_search(
        keywords=query,
        integration=integration
    )
    return {
        "answer": result["answer"],
        "nodes": result["nodes"],
        "analysis": analyze_nodes(result["nodes"])  # analyze_nodes is your own helper
    }
Add Code Comments for Complex Logic
from kubiya import ControlPlaneClient
def complex_data_migration(client: ControlPlaneClient):
    """Migrate data between datasets with validation."""
    # Step 1: Fetch all memories from source dataset
    # Using pagination to handle large datasets efficiently
    memories = []
    skip = 0
    batch_size = 100
    while True:
        batch = client.graph.list_memories(skip=skip, limit=batch_size)
        if not batch:
            break
        memories.extend(batch)
        skip += batch_size

    # Step 2: Transform memories for target dataset
    # Filter out test data and normalize format
    transformed = [
        transform_memory(m) for m in memories
        if not m.get('metadata', {}).get('is_test', False)
    ]

    # Step 3: Batch import to target dataset
    # Use async to avoid blocking on large imports
    for i in range(0, len(transformed), 100):
        batch = transformed[i:i+100]
        client.graph.store_memory_async(
            dataset_id="target-dataset",
            context=batch
        )
Next Steps
- Error Handling: Handle SDK exceptions
- Control Plane Client: Learn the Control Plane client services
- Examples: Code examples and recipes
- API Reference: Complete API documentation