Foundation Integration¶
Advanced integration patterns leveraging Foundation's infrastructure for production-ready plugin systems.
Overview¶
Foundation provides core infrastructure for configuration, cryptography, rate limiting, logging, and error handling. This guide demonstrates advanced integration patterns that extend pyvider-rpcplugin with Foundation's capabilities.
Configuration Inheritance¶
Extend Foundation's RuntimeConfig for plugin-specific configuration:
```python
from dataclasses import dataclass, field
import os

from provide.foundation import logger
from provide.foundation.config import ConfigError, RuntimeConfig


@dataclass
class PluginServiceConfig(RuntimeConfig):
    """Plugin configuration extending Foundation's RuntimeConfig."""

    # Service identification
    service_name: str = field(default=os.environ.get("SERVICE_NAME", "unnamed"))
    service_version: str = field(default=os.environ.get("SERVICE_VERSION", "1.0.0"))

    # Performance tuning
    max_concurrent_requests: int = field(
        default=int(os.environ.get("MAX_CONCURRENT_REQUESTS", "100"))
    )
    request_timeout: float = field(
        default=float(os.environ.get("REQUEST_TIMEOUT", "30.0"))
    )

    # Foundation integration
    enable_structured_logging: bool = field(default=True)
    enable_rate_limiting: bool = field(default=True)
    rate_limit_rps: float = field(default=100.0)

    def validate(self) -> None:
        """Validate configuration using Foundation patterns."""
        super().validate()

        if self.max_concurrent_requests < 1:
            raise ConfigError("max_concurrent_requests must be positive")
        if self.request_timeout <= 0:
            raise ConfigError("request_timeout must be positive")

        logger.info(
            "Configuration validated",
            extra={
                "service": self.service_name,
                "version": self.service_version,
            },
        )


# Usage
config = PluginServiceConfig()
config.validate()
```
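Because the field defaults above are read from the environment when the class is defined, overrides must be exported before the module containing `PluginServiceConfig` is imported. A minimal sketch (the `myplugin.config` module path is hypothetical):

```python
import os

# Environment overrides must be in place before the module defining
# PluginServiceConfig is imported, because the field defaults are
# evaluated at class-definition time.
os.environ["SERVICE_NAME"] = "billing-plugin"
os.environ["MAX_CONCURRENT_REQUESTS"] = "250"

from myplugin.config import PluginServiceConfig  # hypothetical module path

config = PluginServiceConfig()
config.validate()
assert config.service_name == "billing-plugin"
assert config.max_concurrent_requests == 250
```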
Cryptography & Certificates¶
Dynamic Certificate Generation¶
Use Foundation's crypto module for certificate management:
```python
from pathlib import Path

from provide.foundation import logger
from provide.foundation.crypto import Certificate
from pyvider.rpcplugin import plugin_protocol, plugin_server
from pyvider.rpcplugin.transport import TCPSocketTransport


class CertificateManager:
    """Manages certificates using Foundation's crypto utilities."""

    def __init__(self, cert_dir: Path) -> None:
        self.cert_dir = cert_dir
        self.cert_dir.mkdir(parents=True, exist_ok=True)
        logger.info(f"Certificate manager initialized: {cert_dir}")

    async def ensure_certificates(self) -> Certificate:
        """Ensure valid certificates exist."""
        cert_path = self.cert_dir / "server.crt"
        key_path = self.cert_dir / "server.key"

        # Try to reuse existing certificates first.
        if cert_path.exists() and key_path.exists():
            try:
                cert = Certificate.from_pem(
                    cert_pem=cert_path.read_text(),
                    key_pem=key_path.read_text(),
                )
                logger.info("Loaded existing certificates")
                return cert
            except Exception as e:
                logger.warning(f"Error loading certificates: {e}, regenerating")

        # Generate a new self-signed certificate.
        logger.info("Generating new certificate")
        cert = Certificate.create_self_signed_server_cert(
            common_name="plugin.local",
            organization_name="Pyvider RPC Plugin",
            validity_days=365,
            alt_names=["localhost", "127.0.0.1"],
        )

        # Persist to disk for reuse on the next startup.
        cert_path.write_text(cert.cert_pem)
        key_path.write_text(cert.key_pem)
        logger.info("Certificate generated and saved")
        return cert


# Usage
async def create_secure_server():
    cert_manager = CertificateManager(Path("/etc/plugin/certs"))
    cert = await cert_manager.ensure_certificates()

    transport = TCPSocketTransport(host="0.0.0.0", port=8443)
    return plugin_server(
        protocol=plugin_protocol(),
        handler=SecureHandler(),  # your service's handler implementation
        transport=transport,
    )
```
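`ensure_certificates()` reuses whatever PEM files it finds on disk, even if they are close to expiry. If you want renewal to happen ahead of time, a small check with the `cryptography` package can decide when to regenerate; the `needs_renewal()` helper below is illustrative, not part of Foundation:

```python
from datetime import datetime, timedelta, timezone
from pathlib import Path

from cryptography import x509


def needs_renewal(
    cert_path: Path, min_remaining: timedelta = timedelta(days=30)
) -> bool:
    """Return True when the on-disk certificate is missing or close to expiry."""
    if not cert_path.exists():
        return True
    cert = x509.load_pem_x509_certificate(cert_path.read_bytes())
    # not_valid_after_utc requires cryptography >= 42; older releases expose
    # the naive not_valid_after attribute instead.
    remaining = cert.not_valid_after_utc - datetime.now(timezone.utc)
    return remaining < min_remaining
```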
Certificate Rotation¶
Implement automatic certificate rotation:
```python
import asyncio

from provide.foundation import logger


class CertificateRotator:
    """Handles automatic certificate rotation."""

    def __init__(self, cert_manager, server) -> None:
        self.cert_manager = cert_manager
        self.server = server
        self.rotation_task: asyncio.Task | None = None

    async def start_rotation(self, check_interval: int = 3600) -> None:
        """Start automatic certificate rotation."""
        self.rotation_task = asyncio.create_task(
            self._rotation_loop(check_interval)
        )
        logger.info(f"Certificate rotation started (interval: {check_interval}s)")

    async def _rotation_loop(self, interval: int) -> None:
        """Periodically check and rotate certificates."""
        while True:
            try:
                await asyncio.sleep(interval)
                logger.info("Rotating certificates on schedule")
                await self._rotate_certificates()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Certificate rotation error: {e}", exc_info=True)

    async def _rotate_certificates(self) -> None:
        """Perform certificate rotation."""
        new_cert = await self.cert_manager.ensure_certificates()
        await self.server.update_certificates(new_cert)
        logger.info("Certificate rotated successfully")
```
Advanced Rate Limiting¶
Per-client rate limiting using Foundation:
```python
import time

from provide.foundation import logger
from provide.foundation.utils.rate_limiting import TokenBucketRateLimiter


class PerClientRateLimiter:
    """Per-client rate limiting with Foundation's TokenBucketRateLimiter."""

    def __init__(self, default_rps: float = 10.0, default_burst: float = 20.0) -> None:
        self.default_rps = default_rps
        self.default_burst = default_burst
        self.limiters: dict[str, TokenBucketRateLimiter] = {}
        self.last_seen: dict[str, float] = {}
        self.last_cleanup = time.time()
        logger.info(f"Per-client rate limiter initialized (RPS: {default_rps})")

    def get_limiter(self, client_id: str) -> TokenBucketRateLimiter:
        """Get or create the rate limiter for a client."""
        if client_id not in self.limiters:
            self.limiters[client_id] = TokenBucketRateLimiter(
                capacity=self.default_burst,
                refill_rate=self.default_rps,
            )
            logger.debug(f"Created rate limiter for client: {client_id}")
        self.last_seen[client_id] = time.time()

        # Periodic cleanup of limiters for clients that have gone quiet.
        if time.time() - self.last_cleanup > 300:
            self._cleanup_old_limiters()

        return self.limiters[client_id]

    def _cleanup_old_limiters(self) -> None:
        """Remove limiters for clients inactive for more than 10 minutes."""
        current_time = time.time()
        to_remove = [
            client_id
            for client_id, seen in self.last_seen.items()
            if current_time - seen > 600
        ]
        for client_id in to_remove:
            del self.limiters[client_id]
            del self.last_seen[client_id]
            logger.debug(f"Removed inactive rate limiter: {client_id}")
        self.last_cleanup = current_time

    async def check_rate_limit(self, client_id: str, weight: float = 1.0) -> bool:
        """Check whether a request is allowed for the client."""
        limiter = self.get_limiter(client_id)

        if await limiter.is_allowed():
            logger.debug(f"Request allowed for client: {client_id}")
            return True

        tokens = await limiter.get_current_tokens()
        logger.warning(
            f"Rate limit exceeded for client: {client_id}",
            extra={"available_tokens": tokens, "required_tokens": weight},
        )
        return False


# Integration with an RPC handler
class RateLimitedHandler:
    def __init__(self) -> None:
        self.rate_limiter = PerClientRateLimiter(
            default_rps=100.0,
            default_burst=200.0,
        )

    async def handle_request(self, request, context):
        client_id = context.peer() or "unknown"
        if not await self.rate_limiter.check_rate_limit(client_id):
            raise Exception("Rate limit exceeded")
        return await self.process_request(request)
```
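When the handler is a `grpc.aio` servicer, rejecting with a proper status code is friendlier than raising a bare exception: `RESOURCE_EXHAUSTED` tells well-behaved clients to back off and retry. The `Process`/`_process` method names below are placeholders for your service's RPCs:

```python
import grpc


class RateLimitedServicer:
    """Variant that rejects over-limit calls with a proper gRPC status code."""

    def __init__(self) -> None:
        self.rate_limiter = PerClientRateLimiter(default_rps=100.0, default_burst=200.0)

    async def Process(self, request, context: grpc.aio.ServicerContext):
        client_id = context.peer() or "unknown"
        if not await self.rate_limiter.check_rate_limit(client_id):
            # RESOURCE_EXHAUSTED signals the client to back off and retry later.
            await context.abort(
                grpc.StatusCode.RESOURCE_EXHAUSTED,
                "rate limit exceeded",
            )
        return await self._process(request)

    async def _process(self, request):
        ...  # actual business logic
```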
Structured Logging & Observability¶
Leverage Foundation's structured logging for comprehensive observability:
```python
import time
import uuid
from contextvars import ContextVar

from provide.foundation import logger

# Context variables for request tracking
request_id_var: ContextVar[str | None] = ContextVar("request_id", default=None)
client_id_var: ContextVar[str | None] = ContextVar("client_id", default=None)


class ObservableHandler:
    """Handler with comprehensive observability using Foundation logging."""

    def __init__(self) -> None:
        self.request_counter = 0
        self.error_counter = 0
        self.latencies: list[float] = []

    async def handle_request(self, request, context):
        """Handle a request with full observability."""
        request_id = str(uuid.uuid4())
        client_id = context.peer() or "unknown"
        start_time = time.time()

        # Set context variables so downstream code can correlate log lines.
        request_id_var.set(request_id)
        client_id_var.set(client_id)

        # Log the request with its context.
        logger.info(
            "Request received",
            extra={
                "request_id": request_id,
                "client_id": client_id,
                "method": getattr(context, "method", "unknown"),
                "request_size": len(str(request)),
            },
        )

        try:
            result = await self.process_request(request)
            duration = time.time() - start_time

            self.request_counter += 1
            self.latencies.append(duration)

            logger.info(
                "Request completed",
                extra={
                    "request_id": request_id,
                    "client_id": client_id,
                    "duration_ms": duration * 1000,
                    "response_size": len(str(result)),
                    "total_requests": self.request_counter,
                },
            )
            return result

        except Exception as e:
            duration = time.time() - start_time
            self.error_counter += 1

            logger.error(
                "Request failed",
                extra={
                    "request_id": request_id,
                    "client_id": client_id,
                    "duration_ms": duration * 1000,
                    "error": str(e),
                    "error_rate": self.error_counter / max(1, self.request_counter),
                },
                exc_info=True,
            )
            raise

    def get_metrics(self) -> dict:
        """Get handler metrics."""
        if not self.latencies:
            return {"requests": 0}

        sorted_latencies = sorted(self.latencies)
        p95_index = min(int(len(sorted_latencies) * 0.95), len(sorted_latencies) - 1)
        return {
            "total_requests": self.request_counter,
            "total_errors": self.error_counter,
            "error_rate": self.error_counter / max(1, self.request_counter),
            "avg_latency_ms": sum(self.latencies) / len(self.latencies) * 1000,
            "p95_latency_ms": sorted_latencies[p95_index] * 1000,
        }
```
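The metrics collected by `get_metrics()` are only useful if something reads them. One lightweight option is a background task that periodically emits them as a structured log event; the sketch below assumes the handler instance is available where the server is started:

```python
import asyncio

from provide.foundation import logger


async def report_metrics(handler: ObservableHandler, interval: float = 60.0) -> None:
    """Periodically emit handler metrics as a structured log event."""
    while True:
        await asyncio.sleep(interval)
        logger.info("handler_metrics", extra=handler.get_metrics())


# Run alongside the server, e.g.:
# metrics_task = asyncio.create_task(report_metrics(handler))
```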
Error Handling & Recovery¶
Circuit Breaker Pattern¶
Implement circuit breaker using Foundation patterns:
```python
from datetime import datetime, timedelta
from enum import Enum

from provide.foundation import logger


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    """Circuit breaker implementation using Foundation patterns."""

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        expected_exception: type = Exception,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception

        self.failure_count = 0
        self.last_failure_time: datetime | None = None
        self.state = CircuitState.CLOSED

        logger.info(
            "Circuit breaker initialized",
            extra={"threshold": failure_threshold, "timeout": recovery_timeout},
        )

    async def call(self, func, *args, **kwargs):
        """Execute a function with circuit breaker protection."""
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
                logger.info("Circuit breaker entering half-open state")
            else:
                logger.warning("Circuit breaker is open, rejecting call")
                raise Exception("Circuit breaker is OPEN")

        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except self.expected_exception:
            self._on_failure()
            raise

    def _should_attempt_reset(self) -> bool:
        """Check whether enough time has passed to try resetting the circuit."""
        if self.last_failure_time is None:
            return False
        return datetime.now() >= self.last_failure_time + timedelta(
            seconds=self.recovery_timeout
        )

    def _on_success(self) -> None:
        """Handle a successful call."""
        if self.state == CircuitState.HALF_OPEN:
            logger.info("Circuit breaker reset to closed")
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None

    def _on_failure(self) -> None:
        """Handle a failed call."""
        self.failure_count += 1
        self.last_failure_time = datetime.now()

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            logger.error(
                "Circuit breaker opened",
                extra={
                    "failures": self.failure_count,
                    "threshold": self.failure_threshold,
                },
            )


# Usage in a plugin handler
class ResilientHandler:
    def __init__(self) -> None:
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=3,
            recovery_timeout=30,
        )

    async def handle_request(self, request, context):
        return await self.circuit_breaker.call(
            self._process_with_retry,
            request,
        )

    async def _process_with_retry(self, request):
        # external_service represents a downstream dependency of your plugin.
        return await external_service.process(request)
```
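The circuit breaker pairs naturally with retry logic for transient failures (see the Best Practices below). A minimal backoff helper, with `max_attempts` and `base_delay` chosen arbitrarily for illustration:

```python
import asyncio
import random


async def call_with_backoff(
    func, *args, max_attempts: int = 4, base_delay: float = 0.5, **kwargs
):
    """Retry an async call with exponential backoff and jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await func(*args, **kwargs)
        except Exception:
            if attempt == max_attempts:
                raise
            # Exponential backoff (0.5s, 1s, 2s, ...) scaled by 50-100% jitter.
            delay = base_delay * (2 ** (attempt - 1)) * (0.5 + random.random() / 2)
            await asyncio.sleep(delay)


# Combined with the circuit breaker above:
# result = await circuit_breaker.call(call_with_backoff, external_service.process, request)
```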
Best Practices¶
Configuration¶
- Extend `RuntimeConfig` for type-safe configuration
- Validate early in the lifecycle
- Use environment variables with sensible defaults
Security¶
- Use Foundation's crypto module for certificate management
- Implement certificate rotation for long-running services
- Never hardcode secrets or credentials
Observability¶
- Use structured logging with Foundation's logger
- Include request context in all log messages
- Track metrics for error rates and latencies
Rate Limiting¶
- Implement per-client rate limiting
- Clean up inactive limiters periodically
- Log rate limit violations for monitoring
Error Handling¶
- Use circuit breakers for external dependencies
- Implement retry logic with exponential backoff
- Log errors with full context for debugging
Related Topics¶
- Configuration - Configuration patterns
- Observability - Metrics and tracing
- Lifecycle Management - Plugin lifecycle
- Middleware - Cross-cutting concerns