
Shared Infrastructure

The microservices architecture includes shared infrastructure components that provide common functionality across all services. These components ensure consistency, reduce code duplication, and simplify maintenance.

Overview

All services share the following infrastructure components:

  • Configuration Management: Pydantic-based settings with environment variable support
  • Database Integration: SQLAlchemy with connection pooling and health checks
  • Redis Client Management: Centralized Redis client with clustering support
  • Structured Logging: Service-specific logging with context and formatting

Configuration Management

Settings Architecture

The shared configuration system extends the main API configuration with service-specific settings:

from services.shared.config import settings

# Access main API settings
database_url = settings.SQLALCHEMY_DATABASE_URI
redis_host = settings.REDIS_HOST

# Access service-specific settings
fetch_interval = settings.FETCH_INTERVAL
max_batch_size = settings.MAX_KEYS_PER_BATCH

Environment Variables

Services read both the main API variables and service-specific variables, which carry a service prefix (here TRACKER_FETCHER_):

# Main API variables
DATABASE_URL=postgresql://user:pass@localhost/tracker
REDIS_HOST=localhost
REDIS_PORT=6379

# Service-specific variables
TRACKER_FETCHER_FETCH_INTERVAL=3600
TRACKER_FETCHER_MAX_BATCH_SIZE=50
TRACKER_FETCHER_PROCESSING_TIMEOUT=300
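The prefix convention can be illustrated with a small standard-library sketch. This is not the shared module's Pydantic mechanism; `load_prefixed`, the defaults, and the sample environment are hypothetical names chosen for illustration.

```python
import os
from typing import Dict, Mapping

def load_prefixed(prefix: str,
                  defaults: Mapping[str, int],
                  environ: Mapping[str, str] = os.environ) -> Dict[str, int]:
    """Return defaults, overridden by any PREFIX_NAME variables present in environ."""
    values = dict(defaults)
    for name in defaults:
        raw = environ.get(f"{prefix}_{name}")
        if raw is not None:
            values[name] = int(raw)  # raises ValueError on non-numeric input
    return values

# Hypothetical environment, for illustration only.
env = {"TRACKER_FETCHER_FETCH_INTERVAL": "900", "REDIS_HOST": "localhost"}
resolved = load_prefixed(
    "TRACKER_FETCHER",
    {"FETCH_INTERVAL": 3600, "PROCESSING_TIMEOUT": 300},
    env,
)
print(resolved)
```

Only variables carrying the service prefix override a default; unprefixed variables such as REDIS_HOST are ignored by this helper.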

Configuration Validation

All settings are validated using Pydantic:

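import uuid

from pydantic import Field
# Pydantic v2; on Pydantic v1, import BaseSettings from pydantic instead
from pydantic_settings import BaseSettings


class ServiceSettings(BaseSettings):
    # Service identification
    SERVICE_NAME: str = "microservice"
    SERVICE_VERSION: str = "1.0.0"

    # Processing configuration (values outside the bounds fail validation)
    FETCH_INTERVAL: int = Field(default=3600, ge=60, le=86400)
    MAX_KEYS_PER_BATCH: int = Field(default=50, ge=1, le=1000)

    # Computed properties
    @property
    def service_instance_id(self) -> str:
        # Note: a new random suffix is generated on every access
        return f"{self.SERVICE_NAME}_{uuid.uuid4().hex[:8]}"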
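As a rough illustration of what the bounds do, and of how an instance ID could be kept stable for the lifetime of a settings object, here is a standard-library stand-in. It uses a dataclass rather than Pydantic, and `SketchSettings` is a hypothetical name, not part of the shared module.

```python
import uuid
from dataclasses import dataclass
from functools import cached_property

@dataclass
class SketchSettings:
    SERVICE_NAME: str = "microservice"
    FETCH_INTERVAL: int = 3600

    def __post_init__(self) -> None:
        # Mirrors the ge=60, le=86400 bounds on FETCH_INTERVAL.
        if not 60 <= self.FETCH_INTERVAL <= 86400:
            raise ValueError("FETCH_INTERVAL must be between 60 and 86400")

    @cached_property
    def service_instance_id(self) -> str:
        # Computed on first access, then reused for this settings object.
        return f"{self.SERVICE_NAME}_{uuid.uuid4().hex[:8]}"

sketch = SketchSettings(SERVICE_NAME="tracker_fetcher")
first_id = sketch.service_instance_id
second_id = sketch.service_instance_id  # same value as first_id
```

Caching matters when the ID is used to correlate log lines or Redis keys: a plain property that calls `uuid.uuid4()` would yield a different ID on each access.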

Database Integration

Connection Management

The shared database manager provides SQLAlchemy integration with connection pooling:

from services.shared.database import get_db_context, db_manager

# Context manager for database operations
with get_db_context() as db:
    trackers = db.query(Tracker).all()

# Health check
if db_manager.health_check():
    print("Database is healthy")
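The internals of `get_db_context` are not shown here. A context manager of this kind typically commits on success, rolls back on error, and always releases the connection. The sketch below shows that pattern with the standard-library `sqlite3` module in place of SQLAlchemy; `db_context` is a hypothetical name.

```python
import sqlite3
from contextlib import contextmanager
from typing import Iterator

@contextmanager
def db_context(path: str = ":memory:") -> Iterator[sqlite3.Connection]:
    conn = sqlite3.connect(path)
    try:
        yield conn
        conn.commit()      # reached only if the with-body raised nothing
    except Exception:
        conn.rollback()    # undo partial work, then re-raise to the caller
        raise
    finally:
        conn.close()       # always release the connection

with db_context() as db:
    db.execute("CREATE TABLE trackers (id INTEGER PRIMARY KEY, name TEXT)")
    db.execute("INSERT INTO trackers (name) VALUES (?)", ("demo",))
    rows = db.execute("SELECT name FROM trackers").fetchall()
```

After the `with` block exits, the connection is closed and any further use of `db` raises `sqlite3.ProgrammingError`.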

Connection Pooling

Database connections are pooled for efficiency:

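from sqlalchemy import create_engine

# Connection pool configuration
engine = create_engine(
    settings.SQLALCHEMY_DATABASE_URI,
    pool_size=10,        # connections kept open in the pool
    max_overflow=20,     # extra connections allowed beyond pool_size under load
    pool_timeout=30,     # seconds to wait for a free connection before raising
    pool_recycle=3600,   # replace connections older than this many seconds
    pool_pre_ping=True,  # test each connection on checkout, discard dead ones
)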

Error Handling

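Database operations are wrapped so that connection-level failures (OperationalError) are logged and can be retried:

from sqlalchemy.exc import OperationalError

try:
    with get_db_context() as db:
        # Database operations
        pass
except OperationalError as e:
    logger.error("Database operation failed", error=str(e))
    # Automatic retry logic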
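The retry step appears only as a comment in the snippet above. One common way to fill it in is a bounded retry with exponential backoff, sketched below. The `retry` helper and its parameters are assumptions, and the built-in `ConnectionError` stands in for SQLAlchemy's `OperationalError` so the example runs without a database.

```python
import time
from typing import Callable, List, Tuple, Type, TypeVar

T = TypeVar("T")

def retry(operation: Callable[[], T],
          retry_on: Tuple[Type[BaseException], ...],
          attempts: int = 3,
          base_delay: float = 0.5,
          sleep: Callable[[float], None] = time.sleep) -> T:
    """Call operation, retrying on the given exceptions with exponential backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** (attempt - 1))
    raise RuntimeError("attempts must be at least 1")

# Stand-in for a database call that fails twice, then succeeds.
calls: List[int] = []

def flaky_query() -> str:
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("simulated transient failure")
    return "ok"

delays: List[float] = []
result = retry(flaky_query, (ConnectionError,), sleep=delays.append)
```

Passing `sleep` as a parameter keeps the helper testable: here the delays are recorded instead of actually waited for.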

Health Monitoring

Built-in health checks and connection monitoring:

# Check database health
healthy = db_manager.health_check()

# Get connection information
info = db_manager.get_connection_info()
# Returns: {"host": "localhost", "database": "tracker", "pool_size": 10}

Redis Client Management

Client Architecture

The Redis client manager supports both standalone and cluster configurations:

from services.shared.redis_client import get_service_redis_client

# Get service-specific Redis client
redis_client = get_service_redis_client("tracker_fetcher")

# All keys are automatically prefixed
redis_client.set("status", "active")  # Stored as "service:tracker_fetcher:status"

Service-Specific Clients

Each service gets its own Redis client with automatic key prefixing:

class ServiceRedisClient:
    def __init__(self, service_name: str):
        self.key_prefix = f"service:{service_name}"

    def set(self, key: str, value: str) -> bool:
        return self.client.set(self._prefixed_key(key), value)
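The excerpt above omits the underlying client and the `_prefixed_key` helper. A minimal, self-contained sketch of the prefixing idea follows. `PrefixedClient` and `InMemoryClient` are hypothetical, and the in-memory backend merely stands in for a real Redis connection.

```python
from typing import Dict, Optional

class InMemoryClient:
    """Stand-in for a Redis connection, used only to make the sketch runnable."""
    def __init__(self) -> None:
        self.data: Dict[str, str] = {}

    def set(self, key: str, value: str) -> bool:
        self.data[key] = value
        return True

    def get(self, key: str) -> Optional[str]:
        return self.data.get(key)

class PrefixedClient:
    """Wraps a client and namespaces every key under service:<name>:."""
    def __init__(self, service_name: str, client: InMemoryClient) -> None:
        self.client = client
        self.key_prefix = f"service:{service_name}"

    def _prefixed_key(self, key: str) -> str:
        return f"{self.key_prefix}:{key}"

    def set(self, key: str, value: str) -> bool:
        return self.client.set(self._prefixed_key(key), value)

    def get(self, key: str) -> Optional[str]:
        return self.client.get(self._prefixed_key(key))

backend = InMemoryClient()
fetcher = PrefixedClient("tracker_fetcher", backend)
fetcher.set("status", "active")
```

Callers work with short keys such as "status", while the backend only ever sees the namespaced form, so two services can use the same short key without colliding.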

Clustering Support

Automatic detection and support for Redis clusters:

# Cluster detection
is_cluster = (
    settings.REDIS_CLUSTER_MODE or
    "clustercfg" in settings.REDIS_HOST.lower() or
    "cluster" in settings.REDIS_HOST.lower()
)

if is_cluster:
    client = RedisCluster(**connection_params)
else:
    client = redis.Redis(**connection_params)
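The detection rule can be isolated into a pure function for testing. Because "clustercfg" already contains "cluster", a single substring check covers both conditions. The function name and host names below are hypothetical.

```python
def is_cluster_host(host: str, cluster_mode: bool = False) -> bool:
    """Return True if cluster mode is forced or the host name suggests a cluster."""
    return cluster_mode or "cluster" in host.lower()

checks = {
    "clustercfg.cache.example.internal": is_cluster_host("clustercfg.cache.example.internal"),
    "localhost": is_cluster_host("localhost"),
    "localhost (forced)": is_cluster_host("localhost", cluster_mode=True),
}
```

Substring matching is a heuristic: a standalone server whose host name happens to contain "cluster" would be misdetected, so an explicit REDIS_CLUSTER_MODE setting is the more reliable signal.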

TLS Support

Full TLS configuration support:

import ssl

if settings.REDIS_TLS_ENABLED:
    ssl_context = ssl.create_default_context()

    # Certificate validation
    if settings.REDIS_TLS_CERT_REQS == "none":
        # check_hostname must be disabled first; otherwise setting
        # CERT_NONE raises ValueError
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE

    # Client certificates
    if settings.REDIS_TLS_CERTFILE:
        ssl_context.load_cert_chain(
            settings.REDIS_TLS_CERTFILE,
            settings.REDIS_TLS_KEYFILE
        )
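The same logic can be packaged as a function that takes plain arguments instead of reading `settings`, which makes it easy to test. `build_tls_context` is a hypothetical helper, not part of the shared module. Note the ordering constraint in Python's `ssl` module: hostname checking has to be switched off before verification is set to `CERT_NONE`.

```python
import ssl
from typing import Optional

def build_tls_context(cert_reqs: str = "required",
                      certfile: Optional[str] = None,
                      keyfile: Optional[str] = None) -> ssl.SSLContext:
    context = ssl.create_default_context()
    if cert_reqs == "none":
        # Order matters: with check_hostname still enabled,
        # assigning CERT_NONE raises ValueError.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
    if certfile:
        # Present a client certificate to the server (mutual TLS).
        context.load_cert_chain(certfile, keyfile)
    return context

insecure = build_tls_context("none")   # no server certificate verification
strict = build_tls_context()           # default: verify certificate and host name
```

Disabling verification removes protection against impersonated servers, so it is generally suited only to local development or trusted networks.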

Operations

Common Redis operations with service-specific key prefixing:

import time

# Key-value operations
redis_client.set("config", "value")
redis_client.get("config")
redis_client.delete("config")

# Hash operations
redis_client.hset("metadata", {"field": "value"})
redis_client.hgetall("metadata")

# Set operations
redis_client.sadd("workers", "worker_1")
redis_client.smembers("workers")

# Sorted set operations
redis_client.zadd("queue", {"item_1": 1234567890})
redis_client.zrangebyscore("queue", 0, time.time())

# Pub/sub operations
redis_client.publish("notifications", "message")
pubsub = redis_client.subscribe("notifications")

Structured Logging

Logging Architecture

The shared logging system provides structured, contextual logging:

from services.shared.logging import create_service_logger, setup_logging

# Set up logging for the service
setup_logging("tracker_fetcher")

# Create component-specific logger
logger = create_service_logger("tracker_fetcher", "queue_manager")

Service Logger

The ServiceLogger class provides structured logging with context:

# Basic logging with context
logger.info("Processing tracker", tracker_id=123, batch_id="abc")

# Operation logging
logger.log_operation_start("batch processing", batch_size=10)
logger.log_operation_success("batch processing", duration=5.2, processed=10)
logger.log_operation_error("batch processing", error, tracker_id=123)

# Specialized logging
logger.log_database_operation("insert", table="location_reports", record_count=5)
logger.log_redis_operation("zadd", key="pending_queue")
logger.log_http_request("POST", "/api/reports", status_code=201, duration=0.5)
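How `ServiceLogger` turns keyword arguments into output is not shown in this section. One plausible approach, consistent with the "message | key=value" form used in the log output, is sketched below on top of the standard `logging` module. `ContextLogger` and its methods are hypothetical.

```python
import logging

class ContextLogger:
    """Appends keyword context to the message as space-separated key=value pairs."""

    def __init__(self, service: str, component: str) -> None:
        self._logger = logging.getLogger(f"{service}.{component}")

    @staticmethod
    def format_message(message: str, **context: object) -> str:
        if not context:
            return message
        pairs = " ".join(f"{key}={value}" for key, value in context.items())
        return f"{message} | {pairs}"

    def info(self, message: str, **context: object) -> None:
        self._logger.info(self.format_message(message, **context))

    def log_operation_start(self, operation: str, **context: object) -> None:
        self.info(f"Starting {operation}", **context)

line = ContextLogger.format_message("Processing tracker", tracker_id=123, batch_id="abc")
print(line)
```

Keyword arguments keep their call order, so the context fields appear in the order the caller wrote them.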

Log Formatting

Structured log output with service context:

{
  "timestamp": "2025-01-15T10:00:00Z",
  "level": "INFO",
  "service": "tracker_fetcher",
  "component": "queue_manager",
  "process": 1234,
  "message": "Processing tracker | tracker_id=123 batch_id=abc"
}
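Output of this shape can be produced with a custom `logging.Formatter`. The following is a sketch under the assumption that service and component are fixed per formatter; `JsonFormatter` is a hypothetical name, not the shared module's class.

```python
import json
import logging
from datetime import datetime, timezone

class JsonFormatter(logging.Formatter):
    def __init__(self, service: str, component: str) -> None:
        super().__init__()
        self.service = service
        self.component = component

    def format(self, record: logging.LogRecord) -> str:
        # record.created is a Unix timestamp; render it as UTC with a Z suffix.
        timestamp = datetime.fromtimestamp(record.created, tz=timezone.utc)
        return json.dumps({
            "timestamp": timestamp.strftime("%Y-%m-%dT%H:%M:%SZ"),
            "level": record.levelname,
            "service": self.service,
            "component": self.component,
            "process": record.process,
            "message": record.getMessage(),
        })

# Build a record by hand so the sketch runs without configuring handlers.
record = logging.LogRecord(
    "tracker_fetcher", logging.INFO, "sketch.py", 0, "Processing tracker", None, None
)
payload = json.loads(JsonFormatter("tracker_fetcher", "queue_manager").format(record))
```

In a service, the formatter would be attached to a handler with `handler.setFormatter(...)` so that every record emitted through that handler is serialized this way.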

Log Levels

Configurable log levels per component:

# Service-specific logging
"tracker_fetcher": {
    "level": "INFO",
    "handlers": ["console", "error_console"]
},

# Reduce third-party noise
"sqlalchemy.engine": {
    "level": "WARNING"
},
"redis": {
    "level": "WARNING"
}
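The fragment above is shaped like the "loggers" section of a `logging.config.dictConfig` dictionary. A complete configuration around it might look like the sketch below. The formatter and handler definitions are assumptions, since only the logger entries appear in the original.

```python
import logging
import logging.config

LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "plain": {"format": "%(asctime)s %(levelname)s %(name)s %(message)s"},
    },
    "handlers": {
        # Assumed handler definitions: everything to stdout, errors also to stderr.
        "console": {
            "class": "logging.StreamHandler",
            "formatter": "plain",
            "stream": "ext://sys.stdout",
        },
        "error_console": {
            "class": "logging.StreamHandler",
            "formatter": "plain",
            "level": "ERROR",
            "stream": "ext://sys.stderr",
        },
    },
    "loggers": {
        "tracker_fetcher": {
            "level": "INFO",
            "handlers": ["console", "error_console"],
            "propagate": False,
        },
        # Reduce third-party noise
        "sqlalchemy.engine": {"level": "WARNING"},
        "redis": {"level": "WARNING"},
    },
}

logging.config.dictConfig(LOGGING_CONFIG)
```

With this layout, records at ERROR and above reach both handlers, so they appear on stdout and stderr.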

Health Checks

Database Health

from sqlalchemy import text

def health_check() -> bool:
    try:
        with get_db_context() as db:
            # Cheapest possible round trip to the database
            db.execute(text("SELECT 1"))
        return True
    except Exception:
        return False

Redis Health

# Method on the Redis client manager
def health_check(self) -> bool:
    try:
        client = self.get_client()
        client.ping()
        return True
    except Exception:
        return False

Service Health

Services combine infrastructure health checks:

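@app.get("/health")
async def health_check():
    # Run each infrastructure check once and derive the overall status
    database_healthy = db_manager.health_check()
    redis_healthy = redis_manager.health_check()
    all_healthy = database_healthy and redis_healthy
    return {
        "service_name": settings.SERVICE_NAME,
        "database_healthy": database_healthy,
        "redis_healthy": redis_healthy,
        "status": "healthy" if all_healthy else "unhealthy"
    }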
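The aggregation step can be factored out of the endpoint so that it works for any number of checks and can be tested without a web framework. `collect_health` is a hypothetical helper; a check that raises is treated as unhealthy.

```python
from typing import Callable, Dict

def collect_health(service_name: str,
                   checks: Dict[str, Callable[[], bool]]) -> dict:
    """Run each named check and combine the results into one report."""
    results = {}
    for name, check in checks.items():
        try:
            results[f"{name}_healthy"] = bool(check())
        except Exception:
            results[f"{name}_healthy"] = False  # a crashing check counts as down
    status = "healthy" if all(results.values()) else "unhealthy"
    return {"service_name": service_name, **results, "status": status}

# Stand-in checks; a service would pass db_manager.health_check and similar.
report = collect_health(
    "tracker_fetcher",
    {"database": lambda: True, "redis": lambda: False},
)
```

A single failing dependency marks the whole service as unhealthy, matching the all-or-nothing status in the endpoint.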

Error Handling

Database Errors

try:
    with get_db_context() as db:
        # Database operations
        pass
except OperationalError as e:
    logger.error("Database operation failed", error=str(e))
    raise HTTPException(status_code=503, detail="Database unavailable")

Redis Errors

try:
    redis_client.set("key", "value")
except redis.ConnectionError as e:
    logger.error("Redis connection failed", error=str(e))
    # Fallback behavior or raise appropriate error
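What the fallback does depends on the service. One option for reads is to return a known default when Redis is unreachable, as sketched below. `get_with_fallback` is hypothetical, and the built-in `ConnectionError` is used as a stand-in: `redis.ConnectionError` is a separate exception class, so real code would pass it via `errors`.

```python
from typing import Callable, Optional, Tuple, Type

def get_with_fallback(fetch: Callable[[], Optional[str]],
                      default: str,
                      errors: Tuple[Type[BaseException], ...] = (ConnectionError,)) -> str:
    """Return the fetched value, or default if the fetch fails or finds nothing."""
    try:
        value = fetch()
    except errors:
        return default
    return default if value is None else value

def unavailable() -> Optional[str]:
    raise ConnectionError("simulated outage")

during_outage = get_with_fallback(unavailable, "3600")
normal_read = get_with_fallback(lambda: "900", "3600")
missing_key = get_with_fallback(lambda: None, "3600")
```

Only the listed error types are swallowed; anything else still propagates, so programming errors are not silently masked as outages.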

Service Errors

try:
    result = await process_operation()
except Exception as e:
    logger.log_operation_error("process_operation", e)
    raise HTTPException(status_code=500, detail=str(e)) from e

Performance Considerations

Connection Pooling

  • Database connections are pooled and reused
  • Redis connections are persistent per service
  • pool_pre_ping validates pooled database connections before use

Resource Management

  • Context managers ensure proper cleanup
  • Graceful shutdown handles resource cleanup
  • Connection timeouts prevent hanging operations
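For shutdown cleanup, one standard-library option is `contextlib.ExitStack`, which runs registered cleanup callbacks in reverse order even if an error occurs. The `Resource` class below is a hypothetical stand-in for a database engine or Redis client.

```python
from contextlib import ExitStack
from typing import List

closed: List[str] = []

class Resource:
    """Stand-in for something that must be closed on shutdown."""
    def __init__(self, name: str) -> None:
        self.name = name

    def close(self) -> None:
        closed.append(self.name)

with ExitStack() as stack:
    database = Resource("database")
    stack.callback(database.close)
    cache = Resource("redis")
    stack.callback(cache.close)
    # ... service runs here ...

# On exit, callbacks run last-in, first-out: redis is closed before database.
```

Reverse-order cleanup is usually what is wanted, because resources acquired later may depend on those acquired earlier.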

Monitoring

  • Structured logs record operation durations, which can serve as performance metrics
  • Health checks monitor infrastructure status
  • Connection pool metrics track resource usage

Best Practices

Configuration

  • Use environment variables for deployment-specific settings
  • Validate all configuration with Pydantic
  • Provide sensible defaults for optional settings

Database

  • Always use context managers for database operations
  • Handle connection errors gracefully
  • Use connection pooling for efficiency

Redis

  • Use service-specific key prefixes
  • Handle connection failures gracefully
  • Use appropriate data structures for use cases

Logging

  • Include relevant context in all log messages
  • Use appropriate log levels
  • Structure logs for easy parsing and analysis