Read-Write Database Splitting Implementation Guide

Executive Summary

This document outlines the implementation of read-write database splitting for the Tracker API system. By leveraging our existing HAProxy infrastructure, we can distribute read traffic across PostgreSQL replicas while maintaining write traffic on the primary node.

Key Benefits:

  • Reduced load on primary database server
  • Improved read performance through load distribution
  • Better resource utilization of replica nodes
  • Scalable architecture as read traffic grows

Effort Estimate: 3-5 days

Risk Level: Low (infrastructure handles routing, application changes are straightforward)

Current Architecture

Our current database infrastructure is well-suited for read-write splitting:

Application Layer
    ↓
HAProxy Load Balancer
├─ Port 5432 (Write) → Patroni Primary → PgBouncer (6432) → PostgreSQL (5432)
└─ Port 5433 (Read)  → Patroni Replicas → PgBouncer (6432) → PostgreSQL (5432)
                       (Load balanced across multiple replicas)

Why This Setup is Ideal

  1. HAProxy handles routing logic - Knows which node is primary vs replica (see the example configuration below)
  2. PgBouncer provides connection pooling - Each node has local PgBouncer
  3. Patroni manages replication - Automatic failover and health checking
  4. Transaction pooling mode - Perfect for read-write splitting (no prepared statement issues)
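
For reference, the routing is typically defined with Patroni-aware health checks in HAProxy. The sketch below follows the standard Patroni template and is illustrative only: the second node, the check intervals, and the Patroni REST API port (8008) are assumptions, and the hostnames mirror the PgBouncer configuration shown in the next section.

# Illustrative HAProxy backends (not our actual haproxy.cfg)
listen postgres_write
    bind *:5432
    mode tcp
    option httpchk GET /primary          # Patroni returns 200 only on the leader
    http-check expect status 200
    default-server inter 3s fall 3 rise 2 on-marked-down shutdown-sessions
    server pg-1 production-pg-1.timescale.glimpse.internal:6432 check port 8008
    server pg-2 production-pg-2.timescale.glimpse.internal:6432 check port 8008

listen postgres_read
    bind *:5433
    mode tcp
    balance roundrobin                   # distribute reads across healthy replicas
    option httpchk GET /replica          # Patroni returns 200 only on replicas
    http-check expect status 200
    default-server inter 3s fall 3 rise 2 on-marked-down shutdown-sessions
    server pg-1 production-pg-1.timescale.glimpse.internal:6432 check port 8008
    server pg-2 production-pg-2.timescale.glimpse.internal:6432 check port 8008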

Current Configuration

Each Patroni node has a PgBouncer instance:

[databases]
* = host=production-pg-1.timescale.glimpse.internal port=5432

[pgbouncer]
listen_addr = 0.0.0.0
listen_port = 6432

pool_mode = transaction          # ✓ Perfect for read-write splitting
max_client_conn = 2000
default_pool_size = 20
max_db_connections = 80

Implementation Phases

Phase 1: Infrastructure Configuration (2-3 hours)

1.1 Environment Variables

Add read port configuration to all environment files:

.env, .env.local, .env.test:

# Existing
POSTGRES_SERVER=haproxy.production.internal
POSTGRES_PORT=5432
POSTGRES_USER=tracker
POSTGRES_PASSWORD=...
POSTGRES_DB=tracker

# New - Add this
POSTGRES_READ_PORT=5433

Docker Compose (compose.yml):

services:
  api:
    environment:
      - POSTGRES_SERVER=haproxy.production.internal
      - POSTGRES_PORT=5432
      - POSTGRES_READ_PORT=5433 # Add this

1.2 Configuration Classes

Update configuration to read the new environment variable:

app/core/config.py:

class Settings(BaseSettings):
    # ... existing settings ...

    POSTGRES_SERVER: str = os.getenv("POSTGRES_SERVER", "localhost")
    POSTGRES_USER: str = os.getenv("POSTGRES_USER", "postgres")
    POSTGRES_PASSWORD: str = os.getenv("POSTGRES_PASSWORD", "postgres")
    POSTGRES_DB: str = os.getenv("POSTGRES_DB", "tracker")
    POSTGRES_PORT: str = os.getenv("POSTGRES_PORT", "5432")
    POSTGRES_READ_PORT: str = os.getenv("POSTGRES_READ_PORT", "5433")  # Add this

services/shared/config.py:

class ServiceSettings(BaseSettings):
    # ... existing settings ...

    POSTGRES_SERVER: str = "localhost"
    POSTGRES_USER: str = "postgres"
    POSTGRES_PASSWORD: str = "postgres"
    POSTGRES_DB: str = "tracker"
    POSTGRES_PORT: str = "5432"
    POSTGRES_READ_PORT: str = "5433"  # Add this

Phase 2: Database Module Updates (1 day)

2.1 Sync Engine Configuration

File: app/core/database.py

Create separate engines for read and write operations:

# After existing imports and before current engine creation

# Determine ports
POSTGRES_WRITE_PORT = settings.POSTGRES_PORT  # 5432
POSTGRES_READ_PORT = settings.POSTGRES_READ_PORT  # 5433

# Build connection URLs
WRITE_DB_URL = (
    f"postgresql+psycopg://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}"
    f"@{settings.POSTGRES_SERVER}:{POSTGRES_WRITE_PORT}/{settings.POSTGRES_DB}"
)

READ_DB_URL = (
    f"postgresql+psycopg://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}"
    f"@{settings.POSTGRES_SERVER}:{POSTGRES_READ_PORT}/{settings.POSTGRES_DB}"
)

# Write engine (primary) - existing pool configuration
write_engine = create_engine(
    WRITE_DB_URL,
    future=True,
    pool_size=POOL_SIZE,
    max_overflow=MAX_OVERFLOW,
    pool_timeout=30,
    pool_recycle=POOL_RECYCLE,
    pool_pre_ping=True,
    connect_args=connect_args,
)

# Read engine (replicas) - larger pool for read-heavy workloads
read_engine = create_engine(
    READ_DB_URL,
    future=True,
    pool_size=POOL_SIZE * 2,  # More connections for reads
    max_overflow=MAX_OVERFLOW * 2,
    pool_timeout=30,
    pool_recycle=POOL_RECYCLE,
    pool_pre_ping=True,
    connect_args=connect_args,
)

# Keep existing engine as write_engine for backward compatibility
engine = write_engine

logger.info(f"Write engine configured for port {POSTGRES_WRITE_PORT}")
logger.info(f"Read engine configured for port {POSTGRES_READ_PORT} "
            f"(pool_size={POOL_SIZE * 2})")

Update session factories:

# Create separate session factories
WriteSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=write_engine)
ReadSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=read_engine)

# Keep existing for backward compatibility
SessionLocal = WriteSessionLocal

Add new dependency functions:

def get_write_db() -> Generator[Session, None, None]:
    """
    Get database session for write operations.

    Use this for:
    - POST, PUT, PATCH, DELETE operations
    - Any operation that modifies data
    - Operations requiring read-after-write consistency
    - Transactional operations

    Yields:
        Session: A SQLAlchemy database session connected to primary
    """
    session = _create_session_with_retry(WriteSessionLocal)
    try:
        yield session
    except Exception as e:
        logger.error(f"Error during write operation: {e}")
        try:
            session.rollback()
        except Exception as rollback_error:
            logger.error(f"Error during rollback: {rollback_error}")
        raise
    finally:
        _close_session_safely(session)


def get_read_db() -> Generator[Session, None, None]:
    """
    Get database session for read operations.

    Use this for:
    - GET operations that don't need strong consistency
    - List/search endpoints
    - Analytics and reporting
    - Health checks

    Note: May return slightly stale data due to replication lag.
    For read-after-write scenarios, use get_write_db() instead.

    Yields:
        Session: A SQLAlchemy database session connected to replica
    """
    session = _create_session_with_retry(ReadSessionLocal)
    try:
        yield session
    except Exception as e:
        logger.error(f"Error during read operation: {e}")
        raise
    finally:
        _close_session_safely(session)


# Update existing get_db for backward compatibility (uses write)
def get_db() -> Generator[Session, None, None]:
    """
    Get database session (backward compatibility - uses write engine).

    For new code, prefer using get_write_db() or get_read_db() explicitly.
    """
    yield from get_write_db()

Update retry logic to accept session factory:

def _create_session_with_retry(
    session_factory: sessionmaker = WriteSessionLocal
) -> Session:
    """Create a database session with retry logic.

    Args:
        session_factory: The sessionmaker to use (Write or Read)

    Returns:
        A valid database session
    """
    retry_count = 0
    retry_delay = INITIAL_RETRY_DELAY
    session: Optional[Session] = None

    while retry_count < MAX_CONNECTION_RETRIES:
        try:
            session = session_factory()
            if session is None:
                raise RuntimeError("Failed to create database session")
            session.execute(text("SELECT 1"))
            return session
        except OperationalError as e:
            if _is_timeout_error(e):
                retry_count += 1
                logger.warning(
                    f"Database connection timeout, retrying "
                    f"({retry_count}/{MAX_CONNECTION_RETRIES})..."
                )
                _close_session_safely(session)
                if retry_count < MAX_CONNECTION_RETRIES:
                    time.sleep(retry_delay)
                    retry_delay *= RETRY_BACKOFF_MULTIPLIER
            else:
                logger.error(f"Database error: {e}")
                _close_session_safely(session)
                raise
        except Exception as e:
            logger.error(f"Unexpected database error: {e}")
            _close_session_safely(session)
            raise

    logger.error("Maximum database connection retries exceeded")
    raise OperationalError(
        "Could not establish database connection after multiple retries",
        {},
        None,
    )

2.2 Async Engine Configuration

File: app/core/database.py (continued)

# Build async connection URLs
ASYNC_WRITE_DB_URL = WRITE_DB_URL.replace(
    "postgresql+psycopg://", "postgresql+asyncpg://"
)
ASYNC_READ_DB_URL = READ_DB_URL.replace(
    "postgresql+psycopg://", "postgresql+asyncpg://"
)

# Async Write Engine
async_write_engine = create_async_engine(
    ASYNC_WRITE_DB_URL,
    future=True,
    pool_size=ASYNC_POOL_SIZE,
    max_overflow=ASYNC_MAX_OVERFLOW,
    pool_timeout=20,
    pool_recycle=ASYNC_POOL_RECYCLE,
    pool_pre_ping=True,
    pool_reset_on_return="commit",
    connect_args=async_connect_args,
)

# Async Read Engine
async_read_engine = create_async_engine(
    ASYNC_READ_DB_URL,
    future=True,
    pool_size=ASYNC_POOL_SIZE * 2,  # More connections for reads
    max_overflow=ASYNC_MAX_OVERFLOW * 2,
    pool_timeout=20,
    pool_recycle=ASYNC_POOL_RECYCLE,
    pool_pre_ping=True,
    pool_reset_on_return="commit",
    connect_args=async_connect_args,
)

# Keep existing as write for backward compatibility
async_engine = async_write_engine

logger.info("Async write and read engines configured")

# Async Session Factories
AsyncWriteSessionLocal = async_sessionmaker(
    async_write_engine, class_=AsyncSession, expire_on_commit=False
)
AsyncReadSessionLocal = async_sessionmaker(
    async_read_engine, class_=AsyncSession, expire_on_commit=False
)
AsyncSessionLocal = AsyncWriteSessionLocal  # Backward compatibility


async def get_async_write_db() -> AsyncSession:
    """Get async database session for write operations."""
    async with AsyncWriteSessionLocal() as session:
        yield session


async def get_async_read_db() -> AsyncSession:
    """Get async database session for read operations."""
    async with AsyncReadSessionLocal() as session:
        yield session


# Backward compatibility
async def get_async_db() -> AsyncSession:
    """Get async database session (uses write engine for compatibility)."""
    async with AsyncWriteSessionLocal() as session:
        yield session
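
For reference, an async route using the read dependency might look like the sketch below. The select import and AsyncSession type are standard SQLAlchemy/FastAPI usage; the Tracker import path is an assumption about the project layout.

from fastapi import APIRouter, Depends
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database import get_async_read_db
from app.models import Tracker  # assumed import path

router = APIRouter()


@router.get("/trackers/recent")
async def list_recently_seen_trackers(
    db: AsyncSession = Depends(get_async_read_db),  # read replica
):
    """List recently seen trackers - tolerates slight replication lag."""
    result = await db.execute(
        select(Tracker).order_by(Tracker.last_seen.desc()).limit(50)
    )
    return result.scalars().all()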

2.3 Service Database Module Updates

File: services/shared/database.py

Apply the same pattern:

# After imports and configuration
POSTGRES_WRITE_PORT = settings.POSTGRES_PORT
POSTGRES_READ_PORT = settings.POSTGRES_READ_PORT

WRITE_DB_URL = (
    f"postgresql+psycopg://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}"
    f"@{settings.POSTGRES_SERVER}:{POSTGRES_WRITE_PORT}/{settings.POSTGRES_DB}"
)

READ_DB_URL = (
    f"postgresql+psycopg://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}"
    f"@{settings.POSTGRES_SERVER}:{POSTGRES_READ_PORT}/{settings.POSTGRES_DB}"
)

# Create write and read engines
write_engine = create_engine(
    WRITE_DB_URL,
    future=True,
    pool_size=POOL_SIZE,
    max_overflow=MAX_OVERFLOW,
    pool_timeout=30,
    pool_recycle=POOL_RECYCLE,
    pool_pre_ping=True,
    connect_args=connect_args,
)

read_engine = create_engine(
    READ_DB_URL,
    future=True,
    pool_size=POOL_SIZE * 2,
    max_overflow=MAX_OVERFLOW * 2,
    pool_timeout=30,
    pool_recycle=POOL_RECYCLE,
    pool_pre_ping=True,
    connect_args=connect_args,
)

engine = write_engine  # Backward compatibility

# Session factories
write_session_local = sessionmaker(autocommit=False, autoflush=False, bind=write_engine)
read_session_local = sessionmaker(autocommit=False, autoflush=False, bind=read_engine)
session_local = write_session_local  # Backward compatibility


def get_write_db_session() -> Generator[Session, None, None]:
    """Get database session for write operations."""
    # Implementation similar to app/core/database.py
    ...


def get_read_db_session() -> Generator[Session, None, None]:
    """Get database session for read operations."""
    # Implementation similar to app/core/database.py
    ...


# Backward compatibility
def get_db_session() -> Generator[Session, None, None]:
    """Get database session (uses write engine)."""
    return get_write_db_session()


@contextmanager
def get_write_db_context():
    """Context manager for write database sessions."""
    db_gen = get_write_db_session()
    db = next(db_gen)
    try:
        yield db
    except Exception:
        db.rollback()
        raise
    finally:
        try:
            next(db_gen)
        except StopIteration:
            pass


@contextmanager
def get_read_db_context():
    """Context manager for read database sessions."""
    db_gen = get_read_db_session()
    db = next(db_gen)
    try:
        yield db
    finally:
        try:
            next(db_gen)
        except StopIteration:
            pass
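
For completeness, a minimal sketch of the two session generators is shown below. It deliberately omits the retry logic; if services/shared/database.py already has an equivalent of _create_session_with_retry, reuse it as in app/core/database.py.

def get_write_db_session() -> Generator[Session, None, None]:
    """Get database session for write operations (primary)."""
    session = write_session_local()
    try:
        yield session
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


def get_read_db_session() -> Generator[Session, None, None]:
    """Get database session for read operations (replicas)."""
    session = read_session_local()
    try:
        yield session
    finally:
        session.close()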

Phase 3: Route Updates (2-3 days)

3.1 Decision Flowchart

Use this flowchart to decide which dependency to use:

Is this operation writing data?
├─ YES → Use get_write_db()
└─ NO → ↓

Does this operation need to read data written in the same request?
├─ YES → Use get_write_db() (read-after-write consistency)
└─ NO → ↓

Is this a critical operation requiring absolute latest data?
├─ YES → Use get_write_db()
└─ NO → Use get_read_db() ✓

3.2 Route Patterns

Simple List/Search Endpoints (use read_db):

@router.get("/trackers/", response_model=List[TrackerResponse])
def list_trackers(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_read_db),  # ← Changed
) -> List[Tracker]:
    """List trackers - uses read replica."""
    return db.query(Tracker).offset(skip).limit(limit).all()


@router.get("/brands/", response_model=List[BrandResponse])
def list_brands(
    db: Session = Depends(get_read_db),  # ← Changed
) -> List[Brand]:
    """List brands - uses read replica."""
    return db.query(Brand).all()

Create/Update/Delete Operations (use write_db):

@router.post("/trackers/", response_model=TrackerResponse)
def create_tracker(
    tracker: TrackerCreate,
    db: Session = Depends(get_write_db),  # ← Keep/change to write
) -> Tracker:
    """Create tracker - uses primary."""
    db_tracker = Tracker(**tracker.dict())
    db.add(db_tracker)
    db.commit()
    db.refresh(db_tracker)
    return db_tracker


@router.put("/trackers/{tracker_id}", response_model=TrackerResponse)
def update_tracker(
    tracker_id: int,
    tracker: TrackerUpdate,
    db: Session = Depends(get_write_db),  # ← Keep/change to write
) -> Tracker:
    """Update tracker - uses primary."""
    db_tracker = db.query(Tracker).filter(Tracker.id == tracker_id).first()
    if not db_tracker:
        raise HTTPException(status_code=404, detail="Tracker not found")

    for key, value in tracker.dict(exclude_unset=True).items():
        setattr(db_tracker, key, value)

    db.commit()
    db.refresh(db_tracker)
    return db_tracker

Read-After-Write Scenarios (use write_db):

@router.post("/trackers/{tracker_id}/locations", response_model=TrackerWithLocations)
def add_location_and_return_tracker(
    tracker_id: int,
    location: LocationCreate,
    db: Session = Depends(get_write_db),  # ← Must use write!
) -> Dict:
    """
    Add location then return tracker with locations.
    Uses write DB to ensure we see the newly created location.
    """
    # Write operation
    db_location = Location(tracker_id=tracker_id, **location.dict())
    db.add(db_location)
    db.commit()

    # Read operation - must see the write we just made
    tracker = db.query(Tracker).filter(Tracker.id == tracker_id).first()
    locations = db.query(Location).filter(
        Location.tracker_id == tracker_id
    ).order_by(Location.timestamp.desc()).limit(10).all()

    return {
        "tracker": tracker,
        "recent_locations": locations
    }

Health Check Endpoints (use read_db):

@router.get("/health/database")
def database_health_check(
    db: Session = Depends(get_read_db),  # ← Use read
) -> Dict:
    """Database health check - uses read replica to avoid load on primary."""
    try:
        db.execute(text("SELECT 1"))
        return {"status": "healthy", "database": "connected"}
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}

3.3 Phased Route Updates

Week 1 - Low Risk Reads:

Start with these endpoints (low risk, high value):

  • GET /api/v1/trackers/ - List trackers
  • GET /api/v1/brands/ - List brands
  • GET /api/v1/models/ - List models
  • GET /api/v1/delivery-locations/ - List delivery locations
  • GET /api/v1/health/* - Health checks
  • GET /api/v1/metrics/* - Metrics endpoints

Week 2 - Expand Coverage:

Add more GET endpoints:

  • GET /api/v1/trackers/{id}/locations - Location history (if no recent write)
  • GET /api/v1/search/* - Search endpoints
  • GET /api/v1/reports/* - Reporting endpoints

Keep on Write DB (for now):

  • Any endpoint with POST/PUT/PATCH/DELETE
  • GET /api/v1/trackers/{id} - Single tracker (might need recent data)
  • Any endpoint that does writes then reads

Phase 4: Service Updates (1-2 days)

4.1 Read-Heavy Services

Location Aggregator Service:

# services/location_aggregator/aggregator.py

from services.shared.database import get_read_db_context

class LocationAggregator:
    def fetch_location_reports(self, batch_size: int = 1000):
        """Fetch location reports for aggregation - use read DB."""
        with get_read_db_context() as db:  # ← Use read
            return db.query(LocationReport).filter(
                LocationReport.processed == False
            ).limit(batch_size).all()

Tracker Status Service:

# services/tracker_status_service/status_checker.py

from services.shared.database import get_read_db_context

class StatusChecker:
    def check_tracker_activity(self):
        """Check tracker activity - use read DB."""
        with get_read_db_context() as db:  # ← Use read
            return db.query(Tracker).filter(
                Tracker.last_seen < datetime.utcnow() - timedelta(days=7)
            ).all()

4.2 Write-Heavy Services

Tracker Fetcher Service:

# services/tracker_fetcher/fetcher.py

from services.shared.database import get_write_db_context

class TrackerFetcher:
    def save_location_reports(self, reports: List[Dict]):
        """Save location reports - use write DB."""
        with get_write_db_context() as db:  # ← Use write
            for report in reports:
                location = LocationReport(**report)
                db.add(location)
            db.commit()

Unified Geofence Service:

# services/unified_geofence_service/processor.py

from services.shared.database import get_write_db_context, get_read_db_context

class GeofenceProcessor:
    def fetch_unprocessed_locations(self):
        """Fetch locations to process - use read DB."""
        with get_read_db_context() as db:  # ← Use read
            return db.query(LocationReport).filter(
                LocationReport.geofence_processed == False
            ).all()

    def save_geofence_events(self, events: List[GeofenceEvent]):
        """Save geofence events - use write DB."""
        with get_write_db_context() as db:  # ← Use write
            for event in events:
                db.add(event)
            db.commit()

Phase 5: Testing Strategy (1 day)

5.1 Quick Verification Test

Create a test endpoint to verify routing:

File: app/api/routes/test_read_split.py

from typing import Dict

from fastapi import APIRouter, Depends
from sqlalchemy import text
from sqlalchemy.orm import Session

from app.core.database import get_read_db, get_write_db

router = APIRouter()


@router.get("/test/read-write-split")
def test_read_write_split(
    read_db: Session = Depends(get_read_db),
    write_db: Session = Depends(get_write_db),
) -> Dict:
    """
    Test read-write splitting by checking which nodes we connect to.

    Returns:
        Dict showing read and write connection details
    """
    # Check read connection
    read_result = read_db.execute(text(
        "SELECT inet_server_addr() as host, "
        "inet_server_port() as port, "
        "pg_is_in_recovery() as is_replica"
    )).fetchone()

    # Check write connection
    write_result = write_db.execute(text(
        "SELECT inet_server_addr() as host, "
        "inet_server_port() as port, "
        "pg_is_in_recovery() as is_replica"
    )).fetchone()

    return {
        "read_connection": {
            "host": str(read_result.host),
            "port": read_result.port,
            "is_replica": read_result.is_replica,
            "expected": "replica (is_replica=true)"
        },
        "write_connection": {
            "host": str(write_result.host),
            "port": write_result.port,
            "is_replica": write_result.is_replica,
            "expected": "primary (is_replica=false)"
        },
        "status": "✓ Working correctly" if (
            read_result.is_replica == True and
            write_result.is_replica == False
        ) else "✗ Issue detected"
    }

Expected Output:

{
  "read_connection": {
    "host": "10.0.2.15",
    "port": 5432,
    "is_replica": true,
    "expected": "replica (is_replica=true)"
  },
  "write_connection": {
    "host": "10.0.1.10",
    "port": 5432,
    "is_replica": false,
    "expected": "primary (is_replica=false)"
  },
  "status": "✓ Working correctly"
}

5.2 Load Testing

Test Read Distribution:

# Send 1000 read requests
for i in {1..1000}; do
  curl -s http://localhost:8100/api/v1/trackers/ > /dev/null &
done
wait

# Check HAProxy stats to see distribution across replicas
curl http://haproxy:1936/stats

Monitor Connection Pools:

# Add endpoint to monitor pool status
@router.get("/debug/pool-stats")
def pool_stats() -> Dict:
    """Get connection pool statistics."""
    from app.core.database import write_engine, read_engine

    return {
        "write_pool": {
            "size": write_engine.pool.size(),
            "checked_in": write_engine.pool.checkedin(),
            "checked_out": write_engine.pool.checkedout(),
            "overflow": write_engine.pool.overflow(),
        },
        "read_pool": {
            "size": read_engine.pool.size(),
            "checked_in": read_engine.pool.checkedin(),
            "checked_out": read_engine.pool.checkedout(),
            "overflow": read_engine.pool.overflow(),
        }
    }

5.3 Replication Lag Testing

Monitor Replication Lag:

-- On primary server
SELECT
    client_addr,
    state,
    sent_lsn,
    write_lsn,
    flush_lsn,
    replay_lsn,
    sync_state,
    pg_wal_lsn_diff(sent_lsn, replay_lsn) as lag_bytes
FROM pg_stat_replication;

Test Lag Scenarios:

  1. Normal load - lag should be < 1MB
  2. Heavy write load - monitor if lag increases
  3. If lag > 10MB consistently, consider:
     • Reducing read traffic to replicas
     • Adding more replicas
     • Optimizing write queries
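
It can also help to express lag as time rather than bytes. The query below is standard PostgreSQL and runs on a replica; note that the value also grows while the primary is idle, so interpret it together with the byte-based lag above.

-- On a replica: seconds since the last replayed transaction
SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp())) AS lag_seconds;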

Rollout Plan

Week 1: Infrastructure & Low-Risk Reads

Day 1: Infrastructure Setup

  • Add POSTGRES_READ_PORT=5433 to all environment files
  • Update app/core/config.py and services/shared/config.py
  • Deploy configuration changes
  • Verify HAProxy routing (check stats page)

Day 2: Database Modules

  • Update app/core/database.py with write/read engines
  • Update services/shared/database.py with write/read engines
  • Deploy changes
  • Test with verification endpoint
  • Monitor logs for any connection issues

Day 3-4: Low-Risk Route Updates

  • Update list endpoints (trackers, brands, models, etc.)
  • Update health check endpoints
  • Deploy changes
  • Monitor HAProxy stats for read distribution
  • Check for any errors in logs

Day 5: Verify & Monitor

  • Run load tests
  • Check replication lag
  • Verify pool statistics
  • Monitor performance metrics
  • Document any issues

Week 2: Expand Coverage

Day 6-8: More Read Endpoints

  • Update search endpoints
  • Update reporting endpoints
  • Update analytics endpoints
  • Deploy in batches
  • Monitor after each batch

Day 9: Service Updates

  • Update read-heavy services
  • Deploy service changes
  • Monitor service health
  • Check connection distribution

Day 10: Testing & Validation

  • Comprehensive load testing
  • Verify all metrics
  • Check for any consistency issues
  • Document findings

Week 3: Optimize & Document

Day 11-12: Optimization

  • Tune connection pool sizes based on metrics
  • Adjust timeout values if needed
  • Optimize slow queries identified during monitoring
  • Update configurations based on findings

Day 13-15: Documentation & Training

  • Update developer documentation
  • Create decision flowchart for team
  • Document patterns and best practices
  • Train team on when to use read vs write
  • Create troubleshooting guide

Troubleshooting Guide

Issue: Replication Lag

Symptoms:

  • Users report seeing stale data
  • Lag monitoring shows high lag (> 10MB)
  • Inconsistent query results

Solutions:

  1. Check replication status:

SELECT * FROM pg_stat_replication;

  2. If lag is consistently high:
     • Reduce read traffic temporarily
     • Investigate slow replaying on replicas
     • Consider scaling up replica resources

  3. For critical operations:
     • Use get_write_db() instead of get_read_db()
     • Add application-level caching

Issue: Read-After-Write Inconsistency

Symptoms:

  • User creates resource, immediately requests it, gets 404
  • Recent changes not visible in list views
  • Inconsistent data in UI

Solutions:

  1. Use write DB for read-after-write:

@router.post("/trackers/")
def create_tracker(
    tracker: TrackerCreate,
    db: Session = Depends(get_write_db),  # ← Changed to write
) -> Dict:
    """Create tracker and return with related data."""
    # Write operation
    db_tracker = Tracker(**tracker.dict())
    db.add(db_tracker)
    db.commit()
    db.refresh(db_tracker)

    # Read operation - use same connection to see the write
    related_data = db.query(RelatedModel).filter(...).all()

    return {
        "tracker": db_tracker,
        "related": related_data
    }

  2. Add delay for non-critical views:

# If replication lag is < 1 second, a small delay may help
import time
time.sleep(0.5)  # Wait for replication

  3. Use cache for frequently accessed data (see the sketch below):
     • Cache list views with short TTL
     • Invalidate cache on writes
     • Reduce dependency on read replicas for hot data
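
A minimal in-process TTL cache sketch is shown below. The key scheme and 30-second TTL are illustrative; with multiple API instances, a shared cache such as Redis is the better fit.

import time
from typing import Any, Dict, Optional, Tuple

_cache: Dict[str, Tuple[float, Any]] = {}
CACHE_TTL_SECONDS = 30  # short TTL keeps staleness bounded


def cache_get(key: str) -> Optional[Any]:
    """Return a cached value if it has not expired."""
    entry = _cache.get(key)
    if entry and time.monotonic() - entry[0] < CACHE_TTL_SECONDS:
        return entry[1]
    return None


def cache_set(key: str, value: Any) -> None:
    """Store a value with the current timestamp."""
    _cache[key] = (time.monotonic(), value)


def cache_invalidate(prefix: str) -> None:
    """Drop cached entries whose keys start with prefix (call on writes)."""
    for key in [k for k in _cache if k.startswith(prefix)]:
        _cache.pop(key, None)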

Issue: Connection Pool Exhaustion

Symptoms:

  • "Connection pool timeout" errors
  • High number of waiting connections
  • Pool statistics show all connections checked out

Solutions:

  1. Increase pool sizes:

# In database.py
POOL_SIZE = 30  # Increased from 20
MAX_OVERFLOW = 50  # Increased from 30

  2. Check for connection leaks:

# Ensure all database sessions are properly closed
# Use context managers or try/finally blocks

  3. Monitor slow queries:

-- Check for long-running queries
SELECT pid, usename, application_name, state, query_start, query
FROM pg_stat_activity
WHERE state != 'idle'
AND query_start < now() - interval '5 minutes';

Issue: HAProxy Routing Problems

Symptoms:

  • Writes going to replicas (causing errors)
  • Reads not distributed across replicas
  • Connection errors to specific nodes

Solutions:

  1. Verify HAProxy configuration:

# Check HAProxy stats
curl http://haproxy:1936/stats

# Look for:
# - Port 5432 routing only to primary
# - Port 5433 routing to all replicas
# - Health check status of all nodes

  2. Test port routing directly:

# Test write port
psql -h haproxy.internal -p 5432 -U tracker -d tracker -c "SELECT pg_is_in_recovery();"
# Should return: f (false - primary)

# Test read port
psql -h haproxy.internal -p 5433 -U tracker -d tracker -c "SELECT pg_is_in_recovery();"
# Should return: t (true - replica)

Developer Guidelines

Quick Reference Card

Use get_write_db() when:

  • POST, PUT, PATCH, DELETE operations
  • Creating, updating, or deleting data
  • Need read-after-write consistency
  • Critical operations requiring latest data
  • Using transactions

Use get_read_db() when:

  • GET operations (lists, searches)
  • Analytics and reporting
  • Health checks
  • Can tolerate slight staleness
  • High-volume read queries

When in doubt: Use get_write_db() (safer)

Code Review Checklist

When reviewing code that uses database connections:

  • Does this endpoint write data? → Should use get_write_db()
  • Does this endpoint read after writing? → Must use get_write_db()
  • Is this a simple read-only query? → Can use get_read_db()
  • Will replication lag cause issues? → Consider using get_write_db()
  • Is this a high-traffic endpoint? → get_read_db() can help distribute load
  • Are database sessions properly closed? → Check for leaks

Common Patterns

Pattern 1: Simple List Endpoint

@router.get("/items/")
def list_items(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_read_db),  # ✓ Use read
):
    return db.query(Item).offset(skip).limit(limit).all()

Pattern 2: Create and Return

@router.post("/items/")
def create_item(
    item: ItemCreate,
    db: Session = Depends(get_write_db),  # ✓ Use write
):
    db_item = Item(**item.dict())
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item

Pattern 3: Create and Read in the Same Request

@router.post("/items/")
def create_item_with_details(
    item: ItemCreate,
    db: Session = Depends(get_write_db),  # ✓ Use write (reads after write)
):
    # Write operation
    db_item = Item(**item.dict())
    db.add(db_item)
    db.commit()

    # Read operation - needs to see the write
    category = db.query(Category).get(item.category_id)

    return {
        "item": db_item,
        "category": category
    }

Pattern 4: Service with Mixed Operations

class MyService:
    def process_batch(self):
        # Read data - use read connection
        with get_read_db_context() as read_db:
            items = read_db.query(Item).filter(Item.processed == False).all()

        # Process items...
        results = self._process(items)

        # Write results - use write connection
        with get_write_db_context() as write_db:
            for result in results:
                write_db.add(result)
            write_db.commit()

Monitoring and Metrics

Key Metrics to Track

Connection Pool Metrics:

  • Write pool size and utilization
  • Read pool size and utilization
  • Pool timeout occurrences
  • Connection creation rate

Database Metrics:

  • Queries per second (read vs write)
  • Query latency (read vs write)
  • Replication lag (bytes and time)
  • Connection count per node (example queries below)
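
Example queries for the database metrics above (the database name matches POSTGRES_DB; the counters in pg_stat_database are cumulative, so derive rates by sampling over time):

-- Connections per node, grouped by state
SELECT state, count(*)
FROM pg_stat_activity
WHERE datname = 'tracker'
GROUP BY state;

-- Cumulative read/write activity (sample periodically for per-second rates)
SELECT xact_commit, tup_returned, tup_fetched,
       tup_inserted, tup_updated, tup_deleted
FROM pg_stat_database
WHERE datname = 'tracker';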

Application Metrics:

  • Endpoint response times (before/after)
  • Error rates by endpoint
  • Cache hit rates (if using cache)
  • HAProxy backend status

Expected Improvements

After implementing read-write splitting, you should see:

On Primary Node:

  • 40-60% reduction in query load
  • Improved write performance
  • Lower connection count
  • More available resources

On Replica Nodes:

  • Increased utilization
  • Distributed read load
  • Better resource usage across cluster

Application Level:

  • Lower latency for read operations (distributed load)
  • Better scalability for read-heavy workloads
  • Improved overall system capacity

Conclusion

Read-write database splitting is a powerful pattern for scaling database workloads, especially with our existing infrastructure. By leveraging HAProxy's routing capabilities and PgBouncer's connection pooling, we can achieve significant performance improvements with minimal application complexity.

Key Takeaways:

  1. Infrastructure does the heavy lifting - HAProxy routes to correct nodes
  2. Application changes are straightforward - Just choose the right dependency
  3. Start conservatively - Begin with simple read-only endpoints
  4. Monitor closely - Watch replication lag and pool utilization
  5. Document patterns - Help team understand when to use each connection type

Next Steps:

  1. Review this document with the team
  2. Set up monitoring before starting implementation
  3. Begin with Phase 1 (infrastructure configuration)
  4. Roll out changes gradually over 3 weeks
  5. Gather metrics and optimize based on findings

For questions or issues during implementation, refer to the troubleshooting section or consult with the database team.