Read-Write Database Splitting Implementation Guide
Executive Summary
This document outlines the implementation of read-write database splitting for the Tracker API system. By leveraging our existing HAProxy infrastructure, we can distribute read traffic across PostgreSQL replicas while maintaining write traffic on the primary node.
Key Benefits:
- Reduced load on primary database server
- Improved read performance through load distribution
- Better resource utilization of replica nodes
- Scalable architecture as read traffic grows
Effort Estimate: 3-5 days
Risk Level: Low (infrastructure handles routing, application changes are straightforward)
Current Architecture
Our current database infrastructure is well-suited for read-write splitting:
Application Layer
↓
HAProxy Load Balancer
├─ Port 5432 (Write) → Patroni Primary → PgBouncer (6432) → PostgreSQL (5432)
└─ Port 5433 (Read) → Patroni Replicas → PgBouncer (6432) → PostgreSQL (5432)
(Load balanced across multiple replicas)
Why This Setup is Ideal
- HAProxy handles routing logic - Knows which node is primary vs replica
- PgBouncer provides connection pooling - Each node has local PgBouncer
- Patroni manages replication - Automatic failover and health checking
- Transaction pooling mode - Perfect for read-write splitting (no prepared statement issues)
Current Configuration
Each Patroni node has a PgBouncer instance:
[databases]
* = host=production-pg-1.timescale.glimpse.internal port=5432
[pgbouncer]
listen_addr = 0.0.0.0
listen_port = 6432
pool_mode = transaction # ✓ Perfect for read-write splitting
max_client_conn = 2000
default_pool_size = 20
max_db_connections = 80
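For context, the HAProxy side of this routing is typically driven by Patroni's REST health checks. The following is an illustrative sketch only, not our actual configuration: backend hostnames other than production-pg-1 and the Patroni REST port 8008 are assumptions.
listen postgres_write
    bind *:5432
    option httpchk GET /primary
    http-check expect status 200
    # Patroni answers 200 on /primary only from the current leader
    server pg1 production-pg-1.timescale.glimpse.internal:6432 check port 8008
    server pg2 production-pg-2.timescale.glimpse.internal:6432 check port 8008

listen postgres_read
    bind *:5433
    balance roundrobin
    option httpchk GET /replica
    http-check expect status 200
    # Only healthy replicas pass this check, so reads are spread across them
    server pg1 production-pg-1.timescale.glimpse.internal:6432 check port 8008
    server pg2 production-pg-2.timescale.glimpse.internal:6432 check port 8008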
Implementation Phases
Phase 1: Infrastructure Configuration (2-3 hours)
1.1 Environment Variables
Add read port configuration to all environment files:
.env, .env.local, .env.test:
# Existing
POSTGRES_SERVER=haproxy.production.internal
POSTGRES_PORT=5432
POSTGRES_USER=tracker
POSTGRES_PASSWORD=...
POSTGRES_DB=tracker
# New - Add this
POSTGRES_READ_PORT=5433
Docker Compose (compose.yml):
services:
  api:
    environment:
      - POSTGRES_SERVER=haproxy.production.internal
      - POSTGRES_PORT=5432
      - POSTGRES_READ_PORT=5433  # Add this
1.2 Configuration Classes
Update configuration to read the new environment variable:
app/core/config.py:
class Settings(BaseSettings):
    # ... existing settings ...
    POSTGRES_SERVER: str = os.getenv("POSTGRES_SERVER", "localhost")
    POSTGRES_USER: str = os.getenv("POSTGRES_USER", "postgres")
    POSTGRES_PASSWORD: str = os.getenv("POSTGRES_PASSWORD", "postgres")
    POSTGRES_DB: str = os.getenv("POSTGRES_DB", "tracker")
    POSTGRES_PORT: str = os.getenv("POSTGRES_PORT", "5432")
    POSTGRES_READ_PORT: str = os.getenv("POSTGRES_READ_PORT", "5433")  # Add this
services/shared/config.py:
class ServiceSettings(BaseSettings):
    # ... existing settings ...
    POSTGRES_SERVER: str = "localhost"
    POSTGRES_USER: str = "postgres"
    POSTGRES_PASSWORD: str = "postgres"
    POSTGRES_DB: str = "tracker"
    POSTGRES_PORT: str = "5432"
    POSTGRES_READ_PORT: str = "5433"  # Add this
Phase 2: Database Module Updates (1 day)
2.1 Sync Engine Configuration
File: app/core/database.py
Create separate engines for read and write operations:
# After existing imports and before current engine creation
# Determine ports
POSTGRES_WRITE_PORT = settings.POSTGRES_PORT  # 5432
POSTGRES_READ_PORT = settings.POSTGRES_READ_PORT  # 5433

# Build connection URLs
WRITE_DB_URL = (
    f"postgresql+psycopg://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}"
    f"@{settings.POSTGRES_SERVER}:{POSTGRES_WRITE_PORT}/{settings.POSTGRES_DB}"
)
READ_DB_URL = (
    f"postgresql+psycopg://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}"
    f"@{settings.POSTGRES_SERVER}:{POSTGRES_READ_PORT}/{settings.POSTGRES_DB}"
)

# Write engine (primary) - existing pool configuration
write_engine = create_engine(
    WRITE_DB_URL,
    future=True,
    pool_size=POOL_SIZE,
    max_overflow=MAX_OVERFLOW,
    pool_timeout=30,
    pool_recycle=POOL_RECYCLE,
    pool_pre_ping=True,
    connect_args=connect_args,
)

# Read engine (replicas) - larger pool for read-heavy workloads
read_engine = create_engine(
    READ_DB_URL,
    future=True,
    pool_size=POOL_SIZE * 2,  # More connections for reads
    max_overflow=MAX_OVERFLOW * 2,
    pool_timeout=30,
    pool_recycle=POOL_RECYCLE,
    pool_pre_ping=True,
    connect_args=connect_args,
)

# Keep `engine` pointing at the write engine for backward compatibility
engine = write_engine

logger.info(f"Write engine configured for port {POSTGRES_WRITE_PORT}")
logger.info(
    f"Read engine configured for port {POSTGRES_READ_PORT} "
    f"(pool_size={POOL_SIZE * 2})"
)
Update session factories:
# Create separate session factories
WriteSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=write_engine)
ReadSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=read_engine)
# Keep existing for backward compatibility
SessionLocal = WriteSessionLocal
Add new dependency functions:
def get_write_db() -> Generator[Session, None, None]:
    """
    Get database session for write operations.

    Use this for:
    - POST, PUT, PATCH, DELETE operations
    - Any operation that modifies data
    - Operations requiring read-after-write consistency
    - Transactional operations

    Yields:
        Session: A SQLAlchemy database session connected to the primary
    """
    session = _create_session_with_retry(WriteSessionLocal)
    try:
        yield session
    except Exception as e:
        logger.error(f"Error during write operation: {e}")
        try:
            session.rollback()
        except Exception as rollback_error:
            logger.error(f"Error during rollback: {rollback_error}")
        raise
    finally:
        _close_session_safely(session)


def get_read_db() -> Generator[Session, None, None]:
    """
    Get database session for read operations.

    Use this for:
    - GET operations that don't need strong consistency
    - List/search endpoints
    - Analytics and reporting
    - Health checks

    Note: May return slightly stale data due to replication lag.
    For read-after-write scenarios, use get_write_db() instead.

    Yields:
        Session: A SQLAlchemy database session connected to a replica
    """
    session = _create_session_with_retry(ReadSessionLocal)
    try:
        yield session
    except Exception as e:
        logger.error(f"Error during read operation: {e}")
        raise
    finally:
        _close_session_safely(session)


# Keep get_db for backward compatibility (uses write engine)
def get_db() -> Generator[Session, None, None]:
    """
    Get database session (backward compatibility - uses write engine).

    For new code, prefer get_write_db() or get_read_db() explicitly.
    """
    # Delegate with `yield from` so FastAPI still sees a generator dependency
    yield from get_write_db()
Update retry logic to accept session factory:
def _create_session_with_retry(
    session_factory: sessionmaker = WriteSessionLocal,
) -> Session:
    """Create a database session with retry logic.

    Args:
        session_factory: The sessionmaker to use (write or read)

    Returns:
        A valid database session
    """
    retry_count = 0
    retry_delay = INITIAL_RETRY_DELAY
    session: Optional[Session] = None
    while retry_count < MAX_CONNECTION_RETRIES:
        try:
            session = session_factory()
            if session is None:
                raise RuntimeError("Failed to create database session")
            session.execute(text("SELECT 1"))
            return session
        except OperationalError as e:
            if _is_timeout_error(e):
                retry_count += 1
                logger.warning(
                    f"Database connection timeout, retrying "
                    f"({retry_count}/{MAX_CONNECTION_RETRIES})..."
                )
                _close_session_safely(session)
                if retry_count < MAX_CONNECTION_RETRIES:
                    time.sleep(retry_delay)
                    retry_delay *= RETRY_BACKOFF_MULTIPLIER
            else:
                logger.error(f"Database error: {e}")
                _close_session_safely(session)
                raise
        except Exception as e:
            logger.error(f"Unexpected database error: {e}")
            _close_session_safely(session)
            raise
    logger.error("Maximum database connection retries exceeded")
    raise OperationalError(
        "Could not establish database connection after multiple retries",
        {},
        None,
    )
2.2 Async Engine Configuration
File: app/core/database.py (continued)
# Build async connection URLs (swap the sync psycopg driver for asyncpg;
# note the URLs start with "postgresql+psycopg://", so that is the prefix to replace)
ASYNC_WRITE_DB_URL = WRITE_DB_URL.replace(
    "postgresql+psycopg://", "postgresql+asyncpg://"
)
ASYNC_READ_DB_URL = READ_DB_URL.replace(
    "postgresql+psycopg://", "postgresql+asyncpg://"
)
# Async Write Engine
async_write_engine = create_async_engine(
    ASYNC_WRITE_DB_URL,
    future=True,
    pool_size=ASYNC_POOL_SIZE,
    max_overflow=ASYNC_MAX_OVERFLOW,
    pool_timeout=20,
    pool_recycle=ASYNC_POOL_RECYCLE,
    pool_pre_ping=True,
    pool_reset_on_return="commit",
    connect_args=async_connect_args,
)

# Async Read Engine
async_read_engine = create_async_engine(
    ASYNC_READ_DB_URL,
    future=True,
    pool_size=ASYNC_POOL_SIZE * 2,  # More connections for reads
    max_overflow=ASYNC_MAX_OVERFLOW * 2,
    pool_timeout=20,
    pool_recycle=ASYNC_POOL_RECYCLE,
    pool_pre_ping=True,
    pool_reset_on_return="commit",
    connect_args=async_connect_args,
)

# Keep the existing name pointing at the write engine for backward compatibility
async_engine = async_write_engine
logger.info("Async write and read engines configured")

# Async Session Factories
AsyncWriteSessionLocal = async_sessionmaker(
    async_write_engine, class_=AsyncSession, expire_on_commit=False
)
AsyncReadSessionLocal = async_sessionmaker(
    async_read_engine, class_=AsyncSession, expire_on_commit=False
)
AsyncSessionLocal = AsyncWriteSessionLocal  # Backward compatibility


async def get_async_write_db() -> AsyncGenerator[AsyncSession, None]:
    """Get async database session for write operations."""
    async with AsyncWriteSessionLocal() as session:
        yield session


async def get_async_read_db() -> AsyncGenerator[AsyncSession, None]:
    """Get async database session for read operations."""
    async with AsyncReadSessionLocal() as session:
        yield session


# Backward compatibility (requires `from typing import AsyncGenerator`)
async def get_async_db() -> AsyncGenerator[AsyncSession, None]:
    """Get async database session (uses write engine for compatibility)."""
    # Delegate via `async for` so this remains an async generator dependency
    async for session in get_async_write_db():
        yield session
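Async routes consume these dependencies the same way sync routes do. A minimal sketch (the endpoint path and table name are illustrative, not from our codebase):
from typing import Dict

from fastapi import APIRouter, Depends
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database import get_async_read_db

router = APIRouter()


@router.get("/trackers/count")
async def count_trackers(
    db: AsyncSession = Depends(get_async_read_db),  # served by a replica
) -> Dict:
    # Plain SQL keeps the sketch self-contained; ORM queries work the same way.
    # "trackers" is an assumed table name for illustration.
    result = await db.execute(text("SELECT count(*) AS n FROM trackers"))
    return {"tracker_count": result.scalar_one()}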
2.3 Service Database Module Updates
File: services/shared/database.py
Apply the same pattern:
# After imports and configuration
POSTGRES_WRITE_PORT = settings.POSTGRES_PORT
POSTGRES_READ_PORT = settings.POSTGRES_READ_PORT
WRITE_DB_URL = (
    f"postgresql+psycopg://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}"
    f"@{settings.POSTGRES_SERVER}:{POSTGRES_WRITE_PORT}/{settings.POSTGRES_DB}"
)
READ_DB_URL = (
    f"postgresql+psycopg://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}"
    f"@{settings.POSTGRES_SERVER}:{POSTGRES_READ_PORT}/{settings.POSTGRES_DB}"
)

# Create write and read engines
write_engine = create_engine(
    WRITE_DB_URL,
    future=True,
    pool_size=POOL_SIZE,
    max_overflow=MAX_OVERFLOW,
    pool_timeout=30,
    pool_recycle=POOL_RECYCLE,
    pool_pre_ping=True,
    connect_args=connect_args,
)
read_engine = create_engine(
    READ_DB_URL,
    future=True,
    pool_size=POOL_SIZE * 2,
    max_overflow=MAX_OVERFLOW * 2,
    pool_timeout=30,
    pool_recycle=POOL_RECYCLE,
    pool_pre_ping=True,
    connect_args=connect_args,
)

engine = write_engine  # Backward compatibility

# Session factories
write_session_local = sessionmaker(autocommit=False, autoflush=False, bind=write_engine)
read_session_local = sessionmaker(autocommit=False, autoflush=False, bind=read_engine)
session_local = write_session_local  # Backward compatibility
def get_write_db_session() -> Generator[Session, None, None]:
    """Get database session for write operations."""
    # Implementation mirrors get_write_db() in app/core/database.py
    ...


def get_read_db_session() -> Generator[Session, None, None]:
    """Get database session for read operations."""
    # Implementation mirrors get_read_db() in app/core/database.py
    ...


# Backward compatibility
def get_db_session() -> Generator[Session, None, None]:
    """Get database session (uses write engine)."""
    yield from get_write_db_session()
@contextmanager
def get_write_db_context():
    """Context manager for write database sessions."""
    db_gen = get_write_db_session()
    db = next(db_gen)
    try:
        yield db
    except Exception:
        db.rollback()
        raise
    finally:
        try:
            next(db_gen)
        except StopIteration:
            pass


@contextmanager
def get_read_db_context():
    """Context manager for read database sessions."""
    db_gen = get_read_db_session()
    db = next(db_gen)
    try:
        yield db
    finally:
        try:
            next(db_gen)
        except StopIteration:
            pass
Phase 3: Route Updates (2-3 days)
3.1 Decision Flowchart
Use this flowchart to decide which dependency to use:
Is this operation writing data?
├─ YES → Use get_write_db()
└─ NO → ↓
Does this operation need to read data written in the same request?
├─ YES → Use get_write_db() (read-after-write consistency)
└─ NO → ↓
Is this a critical operation requiring absolute latest data?
├─ YES → Use get_write_db()
└─ NO → Use get_read_db() ✓
3.2 Route Patterns
Simple List/Search Endpoints (use read_db):
@router.get("/trackers/", response_model=List[TrackerResponse])
def list_trackers(
skip: int = 0,
limit: int = 100,
db: Session = Depends(get_read_db), # ← Changed
) -> List[Tracker]:
"""List trackers - uses read replica."""
return db.query(Tracker).offset(skip).limit(limit).all()
@router.get("/brands/", response_model=List[BrandResponse])
def list_brands(
db: Session = Depends(get_read_db), # ← Changed
) -> List[Brand]:
"""List brands - uses read replica."""
return db.query(Brand).all()
Create/Update/Delete Operations (use write_db):
@router.post("/trackers/", response_model=TrackerResponse)
def create_tracker(
tracker: TrackerCreate,
db: Session = Depends(get_write_db), # ← Keep/change to write
) -> Tracker:
"""Create tracker - uses primary."""
db_tracker = Tracker(**tracker.dict())
db.add(db_tracker)
db.commit()
db.refresh(db_tracker)
return db_tracker
@router.put("/trackers/{tracker_id}", response_model=TrackerResponse)
def update_tracker(
tracker_id: int,
tracker: TrackerUpdate,
db: Session = Depends(get_write_db), # ← Keep/change to write
) -> Tracker:
"""Update tracker - uses primary."""
db_tracker = db.query(Tracker).filter(Tracker.id == tracker_id).first()
if not db_tracker:
raise HTTPException(status_code=404, detail="Tracker not found")
for key, value in tracker.dict(exclude_unset=True).items():
setattr(db_tracker, key, value)
db.commit()
db.refresh(db_tracker)
return db_tracker
Read-After-Write Scenarios (use write_db):
@router.post("/trackers/{tracker_id}/locations", response_model=TrackerWithLocations)
def add_location_and_return_tracker(
tracker_id: int,
location: LocationCreate,
db: Session = Depends(get_write_db), # ← Must use write!
) -> Dict:
"""
Add location then return tracker with locations.
Uses write DB to ensure we see the newly created location.
"""
# Write operation
db_location = Location(tracker_id=tracker_id, **location.dict())
db.add(db_location)
db.commit()
# Read operation - must see the write we just made
tracker = db.query(Tracker).filter(Tracker.id == tracker_id).first()
locations = db.query(Location).filter(
Location.tracker_id == tracker_id
).order_by(Location.timestamp.desc()).limit(10).all()
return {
"tracker": tracker,
"recent_locations": locations
}
Health Check Endpoints (use read_db):
@router.get("/health/database")
def database_health_check(
db: Session = Depends(get_read_db), # ← Use read
) -> Dict:
"""Database health check - uses read replica to avoid load on primary."""
try:
db.execute(text("SELECT 1"))
return {"status": "healthy", "database": "connected"}
except Exception as e:
return {"status": "unhealthy", "error": str(e)}
3.3 Phased Route Updates
Week 1 - Low Risk Reads:
Start with these endpoints (low risk, high value):
- GET /api/v1/trackers/ - List trackers
- GET /api/v1/brands/ - List brands
- GET /api/v1/models/ - List models
- GET /api/v1/delivery-locations/ - List delivery locations
- GET /api/v1/health/* - Health checks
- GET /api/v1/metrics/* - Metrics endpoints
Week 2 - Expand Coverage:
Add more GET endpoints:
- GET /api/v1/trackers/{id}/locations - Location history (if no recent write)
- GET /api/v1/search/* - Search endpoints
- GET /api/v1/reports/* - Reporting endpoints
Keep on Write DB (for now):
- Any endpoint with POST/PUT/PATCH/DELETE
- GET /api/v1/trackers/{id} - Single tracker (might need recent data)
- Any endpoint that does writes then reads
Phase 4: Service Updates (1-2 days)
4.1 Read-Heavy Services
Location Aggregator Service:
# services/location_aggregator/aggregator.py
from services.shared.database import get_read_db_context


class LocationAggregator:
    def fetch_location_reports(self, batch_size: int = 1000):
        """Fetch location reports for aggregation - use read DB."""
        with get_read_db_context() as db:  # ← Use read
            return db.query(LocationReport).filter(
                LocationReport.processed == False
            ).limit(batch_size).all()
Tracker Status Service:
# services/tracker_status_service/status_checker.py
from datetime import datetime, timedelta

from services.shared.database import get_read_db_context


class StatusChecker:
    def check_tracker_activity(self):
        """Check tracker activity - use read DB."""
        with get_read_db_context() as db:  # ← Use read
            return db.query(Tracker).filter(
                Tracker.last_seen < datetime.utcnow() - timedelta(days=7)
            ).all()
4.2 Write-Heavy Services
Tracker Fetcher Service:
# services/tracker_fetcher/fetcher.py
from services.shared.database import get_write_db_context


class TrackerFetcher:
    def save_location_reports(self, reports: List[Dict]):
        """Save location reports - use write DB."""
        with get_write_db_context() as db:  # ← Use write
            for report in reports:
                location = LocationReport(**report)
                db.add(location)
            db.commit()
Unified Geofence Service:
# services/unified_geofence_service/processor.py
from services.shared.database import get_read_db_context, get_write_db_context


class GeofenceProcessor:
    def fetch_unprocessed_locations(self):
        """Fetch locations to process - use read DB."""
        with get_read_db_context() as db:  # ← Use read
            return db.query(LocationReport).filter(
                LocationReport.geofence_processed == False
            ).all()

    def save_geofence_events(self, events: List[GeofenceEvent]):
        """Save geofence events - use write DB."""
        with get_write_db_context() as db:  # ← Use write
            for event in events:
                db.add(event)
            db.commit()
Phase 5: Testing Strategy (1 day)
5.1 Quick Verification Test
Create a test endpoint to verify routing:
File: app/api/routes/test_read_split.py
from typing import Dict

from fastapi import APIRouter, Depends
from sqlalchemy import text
from sqlalchemy.orm import Session

from app.core.database import get_read_db, get_write_db

router = APIRouter()


@router.get("/test/read-write-split")
def test_read_write_split(
    read_db: Session = Depends(get_read_db),
    write_db: Session = Depends(get_write_db),
) -> Dict:
    """
    Test read-write splitting by checking which nodes we connect to.

    Returns:
        Dict showing read and write connection details
    """
    # Check read connection
    read_result = read_db.execute(text(
        "SELECT inet_server_addr() as host, "
        "inet_server_port() as port, "
        "pg_is_in_recovery() as is_replica"
    )).fetchone()

    # Check write connection
    write_result = write_db.execute(text(
        "SELECT inet_server_addr() as host, "
        "inet_server_port() as port, "
        "pg_is_in_recovery() as is_replica"
    )).fetchone()

    return {
        "read_connection": {
            "host": str(read_result.host),
            "port": read_result.port,
            "is_replica": read_result.is_replica,
            "expected": "replica (is_replica=true)",
        },
        "write_connection": {
            "host": str(write_result.host),
            "port": write_result.port,
            "is_replica": write_result.is_replica,
            "expected": "primary (is_replica=false)",
        },
        "status": "✓ Working correctly" if (
            read_result.is_replica and not write_result.is_replica
        ) else "✗ Issue detected",
    }
Expected Output:
{
  "read_connection": {
    "host": "10.0.2.15",
    "port": 5432,
    "is_replica": true,
    "expected": "replica (is_replica=true)"
  },
  "write_connection": {
    "host": "10.0.1.10",
    "port": 5432,
    "is_replica": false,
    "expected": "primary (is_replica=false)"
  },
  "status": "✓ Working correctly"
}
5.2 Load Testing
Test Read Distribution:
# Send 1000 read requests
for i in {1..1000}; do
  curl -s http://localhost:8100/api/v1/trackers/ > /dev/null &
done
wait

# Check HAProxy stats to see distribution across replicas
curl http://haproxy:1936/stats
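If the HAProxy stats page is not reachable from your workstation, a small script against the verification endpoint from section 5.1 gives the same signal. This is a sketch assuming the httpx package is installed and the test route is mounted under /api/v1 (both assumptions):
# check_read_distribution.py - tally which hosts serve our read sessions
import asyncio
from collections import Counter

import httpx  # assumed to be installed

URL = "http://localhost:8100/api/v1/test/read-write-split"


async def main(n: int = 200) -> None:
    hosts: Counter = Counter()
    async with httpx.AsyncClient() as client:
        for _ in range(n):
            data = (await client.get(URL)).json()
            hosts[data["read_connection"]["host"]] += 1
    # With multiple replicas behind port 5433, expect a roughly even split
    print(dict(hosts))


asyncio.run(main())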
Monitor Connection Pools:
# Add an endpoint to monitor pool status (requires `from typing import Dict`)
@router.get("/debug/pool-stats")
def pool_stats() -> Dict:
    """Get connection pool statistics."""
    from app.core.database import read_engine, write_engine

    return {
        "write_pool": {
            "size": write_engine.pool.size(),
            "checked_in": write_engine.pool.checkedin(),
            "checked_out": write_engine.pool.checkedout(),
            "overflow": write_engine.pool.overflow(),
        },
        "read_pool": {
            "size": read_engine.pool.size(),
            "checked_in": read_engine.pool.checkedin(),
            "checked_out": read_engine.pool.checkedout(),
            "overflow": read_engine.pool.overflow(),
        },
    }
5.3 Replication Lag Testing
Monitor Replication Lag:
-- On primary server
SELECT
    client_addr,
    state,
    sent_lsn,
    write_lsn,
    flush_lsn,
    replay_lsn,
    sync_state,
    pg_wal_lsn_diff(sent_lsn, replay_lsn) AS lag_bytes
FROM pg_stat_replication;
Test Lag Scenarios:
- Normal load - lag should be < 1MB
- Heavy write load - monitor whether lag increases
- If lag > 10MB consistently, consider:
  - Reducing read traffic to replicas
  - Adding more replicas
  - Optimizing write queries
An application-side lag check is sketched below.
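For the application-side check, a replica can report its own replay lag. A minimal sketch (the route path is illustrative; note that pg_last_xact_replay_timestamp() can look stale on an idle cluster simply because nothing new is being replayed):
from typing import Dict

from fastapi import APIRouter, Depends
from sqlalchemy import text
from sqlalchemy.orm import Session

from app.core.database import get_read_db

router = APIRouter()


@router.get("/debug/replication-lag")
def replication_lag(db: Session = Depends(get_read_db)) -> Dict:
    """Report this replica's replay lag in seconds (NULL on the primary)."""
    row = db.execute(text(
        "SELECT CASE WHEN pg_is_in_recovery() THEN "
        "extract(epoch FROM now() - pg_last_xact_replay_timestamp()) "
        "END AS lag_seconds"
    )).fetchone()
    return {"lag_seconds": row.lag_seconds}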
Rollout Plan
Week 1: Infrastructure & Low-Risk Reads
Day 1: Infrastructure Setup
- Add POSTGRES_READ_PORT=5433 to all environment files
- Update app/core/config.py and services/shared/config.py
- Deploy configuration changes
- Verify HAProxy routing (check stats page)
Day 2: Database Modules
- Update app/core/database.py with write/read engines
- Update services/shared/database.py with write/read engines
- Deploy changes
- Test with verification endpoint
- Monitor logs for any connection issues
Day 3-4: Low-Risk Route Updates
- Update list endpoints (trackers, brands, models, etc.)
- Update health check endpoints
- Deploy changes
- Monitor HAProxy stats for read distribution
- Check for any errors in logs
Day 5: Verify & Monitor
- Run load tests
- Check replication lag
- Verify pool statistics
- Monitor performance metrics
- Document any issues
Week 2: Expand Coverage
Day 6-8: More Read Endpoints
- Update search endpoints
- Update reporting endpoints
- Update analytics endpoints
- Deploy in batches
- Monitor after each batch
Day 9: Service Updates
- Update read-heavy services
- Deploy service changes
- Monitor service health
- Check connection distribution
Day 10: Testing & Validation
- Comprehensive load testing
- Verify all metrics
- Check for any consistency issues
- Document findings
Week 3: Optimize & Document
Day 11-12: Optimization
- Tune connection pool sizes based on metrics
- Adjust timeout values if needed
- Optimize slow queries identified during monitoring
- Update configurations based on findings
Day 13-15: Documentation & Training
- Update developer documentation
- Create decision flowchart for team
- Document patterns and best practices
- Train team on when to use read vs write
- Create troubleshooting guide
Troubleshooting Guide
Issue: Replication Lag
Symptoms:
- Users report seeing stale data
- Lag monitoring shows high lag (> 10MB)
- Inconsistent query results
Solutions:
- Check replication status:
  SELECT * FROM pg_stat_replication;
- If lag is consistently high:
  - Reduce read traffic temporarily
  - Investigate slow replay on replicas
  - Consider scaling up replica resources
- For critical operations:
  - Use get_write_db() instead of get_read_db()
  - Add application-level caching
Issue: Read-After-Write Inconsistency
Symptoms:
- User creates resource, immediately requests it, gets 404
- Recent changes not visible in list views
- Inconsistent data in UI
Solutions:
- Use write DB for read-after-write:
@router.post("/trackers/")
def create_tracker(
tracker: TrackerCreate,
db: Session = Depends(get_write_db), # ← Changed to write
) -> Dict:
"""Create tracker and return with related data."""
# Write operation
db_tracker = Tracker(**tracker.dict())
db.add(db_tracker)
db.commit()
db.refresh(db_tracker)
# Read operation - use same connection to see the write
related_data = db.query(RelatedModel).filter(...).all()
return {
"tracker": db_tracker,
"related": related_data
}
- Add delay for non-critical views:
# If replication lag is < 1 second, a small delay may help
import time
time.sleep(0.5) # Wait for replication
- Use cache for frequently accessed data:
- Cache list views with short TTL
- Invalidate cache on writes
- Reduce dependency on read replicas for hot data
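As a sketch of that caching approach, a tiny in-process TTL cache is often enough for hot list views (stdlib only; the key name and 5-second TTL below are arbitrary illustrative choices, not measured values):
import time
from typing import Any, Callable, Dict, Tuple

_cache: Dict[str, Tuple[float, Any]] = {}


def cached(key: str, ttl: float, loader: Callable[[], Any]) -> Any:
    """Return a cached value, refreshing it via loader() after ttl seconds."""
    now = time.monotonic()
    hit = _cache.get(key)
    if hit is not None and now - hit[0] < ttl:
        return hit[1]
    value = loader()
    _cache[key] = (now, value)
    return value


def invalidate(key: str) -> None:
    """Call this on writes so the next read repopulates from the database."""
    _cache.pop(key, None)


# Usage in a list endpoint (illustrative):
#   return cached("brands", ttl=5.0, loader=lambda: db.query(Brand).all())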
Issue: Connection Pool Exhaustion
Symptoms:
- "Connection pool timeout" errors
- High number of waiting connections
- Pool statistics show all connections checked out
Solutions:
- Increase pool sizes:
# In database.py
POOL_SIZE = 30 # Increased from 20
MAX_OVERFLOW = 50 # Increased from 30
- Check for connection leaks:
# Ensure all database sessions are properly closed
# Use context managers or try/finally blocks
- Monitor slow queries:
-- Check for long-running queries
SELECT pid, usename, application_name, state, query_start, query
FROM pg_stat_activity
WHERE state != 'idle'
  AND query_start < now() - interval '5 minutes';
Issue: HAProxy Routing Problems
Symptoms:
- Writes going to replicas (causing errors)
- Reads not distributed across replicas
- Connection errors to specific nodes
Solutions:
- Verify HAProxy configuration:
# Check HAProxy stats
curl http://haproxy:1936/stats
# Look for:
# - Port 5432 routing only to primary
# - Port 5433 routing to all replicas
# - Health check status of all nodes
- Test port routing directly:
# Test write port
psql -h haproxy.internal -p 5432 -U tracker -d tracker -c "SELECT pg_is_in_recovery();"
# Should return: f (false - primary)
# Test read port
psql -h haproxy.internal -p 5433 -U tracker -d tracker -c "SELECT pg_is_in_recovery();"
# Should return: t (true - replica)
Developer Guidelines
Quick Reference Card
Use get_write_db() when:
- POST, PUT, PATCH, DELETE operations
- Creating, updating, or deleting data
- Need read-after-write consistency
- Critical operations requiring latest data
- Using transactions
Use get_read_db() when:
- GET operations (lists, searches)
- Analytics and reporting
- Health checks
- Can tolerate slight staleness
- High-volume read queries
When in doubt: Use get_write_db() (safer)
Code Review Checklist
When reviewing code that uses database connections:
- Does this endpoint write data? → Should use get_write_db()
- Does this endpoint read after writing? → Must use get_write_db()
- Is this a simple read-only query? → Can use get_read_db()
- Will replication lag cause issues? → Consider using get_write_db()
- Is this a high-traffic endpoint? → get_read_db() can help distribute load
- Are database sessions properly closed? → Check for leaks
Common Patterns
Pattern 1: Simple List Endpoint
@router.get("/items/")
def list_items(
skip: int = 0,
limit: int = 100,
db: Session = Depends(get_read_db), # ✓ Use read
):
return db.query(Item).offset(skip).limit(limit).all()
Pattern 2: Create and Return
@router.post("/items/")
def create_item(
    item: ItemCreate,
    db: Session = Depends(get_write_db),  # ✓ Use write
):
    db_item = Item(**item.dict())
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item
Pattern 3: Write Then Read Related Data
@router.post("/items/")
def create_item_with_details(
item: ItemCreate,
db: Session = Depends(get_write_db), # ✓ Use write (reads after write)
):
# Write operation
db_item = Item(**item.dict())
db.add(db_item)
db.commit()
# Read operation - needs to see the write
category = db.query(Category).get(item.category_id)
return {
"item": db_item,
"category": category
}
Pattern 4: Service with Mixed Operations
class MyService:
    def process_batch(self):
        # Read data - use read connection
        with get_read_db_context() as read_db:
            items = read_db.query(Item).filter(Item.processed == False).all()

        # Process items...
        results = self._process(items)

        # Write results - use write connection
        with get_write_db_context() as write_db:
            for result in results:
                write_db.add(result)
            write_db.commit()
Monitoring and Metrics
Key Metrics to Track
Connection Pool Metrics:
- Write pool size and utilization
- Read pool size and utilization
- Pool timeout occurrences
- Connection creation rate
Database Metrics:
- Queries per second (read vs write)
- Query latency (read vs write)
- Replication lag (bytes and time)
- Connection count per node
Application Metrics:
- Endpoint response times (before/after)
- Error rates by endpoint
- Cache hit rates (if using cache)
- HAProxy backend status
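A sketch of exporting the pool metrics above, assuming the prometheus_client package is available (metric names are illustrative):
from prometheus_client import Gauge

from app.core.database import read_engine, write_engine

POOL_SIZE_GAUGE = Gauge("db_pool_size", "Configured pool size", ["pool"])
POOL_IN_USE_GAUGE = Gauge("db_pool_in_use", "Connections checked out", ["pool"])


def record_pool_metrics() -> None:
    """Refresh gauges; call periodically from a background task."""
    for name, engine in (("write", write_engine), ("read", read_engine)):
        POOL_SIZE_GAUGE.labels(pool=name).set(engine.pool.size())
        POOL_IN_USE_GAUGE.labels(pool=name).set(engine.pool.checkedout())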
Expected Improvements
After implementing read-write splitting, you should see:
On Primary Node:
- 40-60% reduction in query load
- Improved write performance
- Lower connection count
- More available resources
On Replica Nodes:
- Increased utilization
- Distributed read load
- Better resource usage across cluster
Application Level:
- Lower latency for read operations (distributed load)
- Better scalability for read-heavy workloads
- Improved overall system capacity
Conclusion
Read-write database splitting is a powerful pattern for scaling database workloads, especially with our existing infrastructure. By leveraging HAProxy's routing capabilities and PgBouncer's connection pooling, we can achieve significant performance improvements with minimal application complexity.
Key Takeaways:
- Infrastructure does the heavy lifting - HAProxy routes to correct nodes
- Application changes are straightforward - Just choose the right dependency
- Start conservatively - Begin with simple read-only endpoints
- Monitor closely - Watch replication lag and pool utilization
- Document patterns - Help team understand when to use each connection type
Next Steps:
- Review this document with the team
- Set up monitoring before starting implementation
- Begin with Phase 1 (infrastructure configuration)
- Roll out changes gradually over 3 weeks
- Gather metrics and optimize based on findings
For questions or issues during implementation, refer to the troubleshooting section or consult with the database team.