Heartbeat System for Service Liveness Monitoring
Overview
The heartbeat system provides continuous service liveness monitoring that operates independently of task processing. This ensures the admin panel can detect if services are alive even when they're not actively processing tasks.
Problem Solved
Original Issue: Health checks only occurred during task execution, making it impossible to detect if long-running services had crashed or become unresponsive between tasks.
Solution: Implemented a separate heartbeat system that publishes lightweight liveness signals every minute, independent of health monitoring and task processing.
Architecture
Components
- HeartbeatPublisher - Core heartbeat publishing functionality
- ServiceHeartbeatManager - Higher-level manager with common data collectors
- Service Integration - Integration points in service lifecycle
- Monitoring Tools - Scripts for testing and monitoring heartbeats
Message Flow
Service Instance
↓
HeartbeatPublisher (every 60s)
↓
AWS Valkey Cluster (heartbeat:service:{service_name})
↓
Admin Panel / Monitoring Tools
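The flow above amounts to a background task that publishes on a fixed interval and survives publish failures. The following is a minimal sketch of that loop, not the actual HeartbeatPublisher: the class name `MinimalHeartbeat` and the injected `publish` coroutine are assumptions made for illustration, and the channel name is the one shown in the flow. In a real service `publish` would wrap the Valkey client's publish call.

```python
import asyncio
import json
import time
import uuid
from datetime import datetime, timezone
from typing import Awaitable, Callable, Optional

# Hypothetical signature: any coroutine that sends `payload` to `channel`.
PublishFn = Callable[[str, str], Awaitable[None]]


class MinimalHeartbeat:
    """Illustrative stand-in for a heartbeat publisher (assumed design)."""

    def __init__(self, service_name: str, publish: PublishFn, interval: float = 60.0):
        self.service_name = service_name
        self.instance_id = str(uuid.uuid4())
        self.channel = f"heartbeat:service:{service_name}"
        self._publish = publish
        self._interval = interval
        self._started = time.monotonic()
        self._count = 0
        self._task: Optional[asyncio.Task] = None

    async def _run(self) -> None:
        while True:
            self._count += 1
            payload = json.dumps({
                "service_name": self.service_name,
                "instance_id": self.instance_id,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "uptime_seconds": time.monotonic() - self._started,
                "heartbeat_count": self._count,
            })
            try:
                await self._publish(self.channel, payload)
            except Exception:
                # A failed publish is ignored so the loop keeps ticking.
                pass
            await asyncio.sleep(self._interval)

    async def start(self) -> None:
        self._task = asyncio.create_task(self._run())

    async def stop(self) -> None:
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
```

Because the loop runs as its own task, it keeps publishing while the service is idle, which is the property the admin panel relies on.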
Implementation Details
Heartbeat Message Format
{
"service_name": "tracker_fetcher",
"instance_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"timestamp": "2025-01-16T08:30:00.123456+00:00",
"uptime_seconds": 3600.5,
"heartbeat_count": 60,
"memory_usage_mb": 156.3,
"cpu_usage_percent": 12.5,
// Service-specific data
"queue_size": 1380,
"last_successful_operation": "2025-01-16T08:29:45.123456+00:00",
"api_status": "authenticated"
}
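A consumer will usually want to separate the common fields from the service-specific ones. The sketch below shows one way to do that, assuming every common field in the sample is always present (the source does not say whether any are optional). `Heartbeat`, `BASE_FIELDS`, and `parse_heartbeat` are hypothetical names, and the input is expected to be plain JSON without the explanatory comment shown in the sample.

```python
import json
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict

# Common fields, taken from the sample message above.
BASE_FIELDS = (
    "service_name", "instance_id", "timestamp", "uptime_seconds",
    "heartbeat_count", "memory_usage_mb", "cpu_usage_percent",
)


@dataclass
class Heartbeat:
    service_name: str
    instance_id: str
    timestamp: datetime
    uptime_seconds: float
    heartbeat_count: int
    memory_usage_mb: float
    cpu_usage_percent: float
    # Everything that is not a common field ends up here.
    service_data: Dict[str, Any] = field(default_factory=dict)


def parse_heartbeat(raw: str) -> Heartbeat:
    """Parse a heartbeat JSON string; raise ValueError if a common field is missing."""
    doc = json.loads(raw)
    missing = [name for name in BASE_FIELDS if name not in doc]
    if missing:
        raise ValueError(f"heartbeat missing fields: {missing}")
    base = {name: doc[name] for name in BASE_FIELDS}
    base["timestamp"] = datetime.fromisoformat(base["timestamp"])
    extra = {k: v for k, v in doc.items() if k not in BASE_FIELDS}
    return Heartbeat(**base, service_data=extra)
```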
Channel Naming Convention
- Pattern: health:service:{service_name}:heartbeat
- Examples:
  - health:service:tracker_fetcher:heartbeat
  - health:service:geocoding_service:heartbeat
  - health:service:geofence_service:heartbeat
Configuration
# services/shared/config.py
HEARTBEAT_INTERVAL: int = 60 # Seconds between heartbeats
Service Integration
Tracker Fetcher Service
# In TrackerFetcherService.start()
from services.shared.heartbeat_publisher import create_tracker_fetcher_heartbeat
self.heartbeat_manager = create_tracker_fetcher_heartbeat(self)
await self.heartbeat_manager.start()
# In TrackerFetcherService.stop()
if hasattr(self, 'heartbeat_manager'):
await self.heartbeat_manager.stop()
Service-Specific Data Collectors
Tracker Fetcher:
- queue_size - Current queue depth
- last_successful_operation - Last successful fetch timestamp
- api_status - Apple API authentication status
Geocoding Service:
- cache_size - Current cache size
- pending_requests - Number of pending geocoding requests
Geofence Service:
- active_geofences - Number of active geofences
- pending_events - Number of pending geofence events
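Each collector is a zero-argument callable whose return value is merged into the heartbeat under a key. A heartbeat should still go out when one collector fails, so a sketch of the collection step might isolate errors per key. `CollectorRegistry` and the `<key>_error` convention below are assumptions for illustration; the real publisher may handle failures differently.

```python
from typing import Any, Callable, Dict


class CollectorRegistry:
    """Hypothetical helper that gathers service-specific heartbeat data."""

    def __init__(self) -> None:
        self._collectors: Dict[str, Callable[[], Any]] = {}

    def add(self, key: str, collector: Callable[[], Any]) -> None:
        self._collectors[key] = collector

    def collect(self) -> Dict[str, Any]:
        data: Dict[str, Any] = {}
        for key, collector in self._collectors.items():
            try:
                data[key] = collector()
            except Exception as exc:
                # Report the failure instead of dropping the whole heartbeat.
                data[key] = None
                data[f"{key}_error"] = type(exc).__name__
        return data
```

Collectors run on every heartbeat, so they should be cheap: reading a counter or a cached status rather than making a network call.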
Usage Examples
Basic Heartbeat Publisher
from services.shared.heartbeat_publisher import HeartbeatPublisher
publisher = HeartbeatPublisher("my_service")
publisher.add_data_collector("custom_metric", lambda: get_custom_value())
await publisher.start()
# ... service runs ...
await publisher.stop()
Service Heartbeat Manager
from services.shared.heartbeat_publisher import ServiceHeartbeatManager
manager = ServiceHeartbeatManager("my_service", service_instance)
manager.add_queue_size_collector(lambda: service_instance.get_queue_size())
manager.add_api_status_collector(lambda: service_instance.get_api_status())
await manager.start()
# ... service runs ...
await manager.stop()
Convenience Functions
# For tracker fetcher
from services.shared.heartbeat_publisher import create_tracker_fetcher_heartbeat
manager = create_tracker_fetcher_heartbeat(tracker_service)
# For geocoding service
from services.shared.heartbeat_publisher import create_geocoding_heartbeat
manager = create_geocoding_heartbeat(geocoding_service)
# For geofence service
from services.shared.heartbeat_publisher import create_geofence_heartbeat
manager = create_geofence_heartbeat(geofence_service)
Monitoring and Testing
Real-time Monitoring
# Monitor all heartbeat messages
python scripts/monitor_heartbeat_messages.py
# Example output:
🔄 tracker_fetcher (a1b2c3d4) - 08:30:15
Uptime: 1.2h | Heartbeats: 72 | Memory: 156.3MB | CPU: 12.5%
Service Data:
queue_size: 1380
last_successful_operation: 2025-01-16T08:29:45.123456+00:00
api_status: authenticated
Testing
# Run all heartbeat tests
python scripts/test_heartbeat_system.py
# Test specific components
python scripts/test_heartbeat_system.py basic # Basic heartbeat
python scripts/test_heartbeat_system.py manager # Heartbeat manager
python scripts/test_heartbeat_system.py tracker # Tracker fetcher specific
python scripts/test_heartbeat_system.py monitor # Monitor messages
Relationship to Health Monitoring
Heartbeat vs Health Monitoring
| Aspect | Heartbeat | Health Monitoring |
|---|---|---|
| Purpose | Liveness detection | Comprehensive health status |
| Frequency | Every 60 seconds | Every 30 seconds (status), 5 minutes (metrics) |
| Data Size | Lightweight | Comprehensive |
| Independence | Fully independent | Tied to service operations |
| Channel | heartbeat:service:{name} | health:service:{name}:* |
Complementary Systems
- Heartbeat: "Is the service alive?"
- Health Monitoring: "How is the service performing?"
- Task Processing: "What work is being done?"
Admin Panel Integration
Subscription Pattern
// Subscribe to heartbeat messages
const redis = new Redis(config);
redis.psubscribe("heartbeat:service:*");
redis.on("pmessage", (pattern, channel, message) => {
const data = JSON.parse(message);
updateServiceLiveness(data.service_name, data);
});
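If a service runs more than one instance, keying liveness by service name alone lets instances overwrite each other. The sketch below, written in Python for consistency with the service code, keeps the latest heartbeat per (service_name, instance_id) and ignores malformed messages. `record_heartbeat` and the in-memory table are hypothetical; the admin panel's real storage is not described in this document.

```python
import json
from typing import Dict, Tuple

# Latest heartbeat per (service_name, instance_id); assumed in-memory store.
LivenessTable = Dict[Tuple[str, str], dict]


def record_heartbeat(table: LivenessTable, message: str) -> bool:
    """Store the heartbeat and return True, or return False if it is malformed."""
    try:
        data = json.loads(message)
        key = (data["service_name"], data["instance_id"])
    except (ValueError, KeyError, TypeError):
        # Not JSON, not an object, or missing the identifying fields.
        return False
    table[key] = data
    return True
```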
Liveness Detection Logic
function checkServiceLiveness(lastHeartbeat) {
const now = new Date();
const heartbeatAge = (now - new Date(lastHeartbeat.timestamp)) / 1000;
if (heartbeatAge < 90) return "alive"; // Within 1.5 minutes
if (heartbeatAge < 180) return "stale"; // Within 3 minutes
return "dead"; // Over 3 minutes
}
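The same thresholds can be expressed in Python, for example in a monitoring script. This is a sketch that assumes timestamps always carry a UTC offset, as in the sample message; a naive timestamp would raise a TypeError on subtraction. The function name, constants, and the injectable `now` parameter are illustrative.

```python
from datetime import datetime, timezone
from typing import Optional

ALIVE_MAX_AGE = 90.0   # seconds: 1.5 heartbeat intervals
STALE_MAX_AGE = 180.0  # seconds: 3 heartbeat intervals


def classify_liveness(timestamp: str, now: Optional[datetime] = None) -> str:
    """Return "alive", "stale", or "dead" based on the heartbeat's age."""
    now = now or datetime.now(timezone.utc)
    age = (now - datetime.fromisoformat(timestamp)).total_seconds()
    if age < ALIVE_MAX_AGE:
        return "alive"
    if age < STALE_MAX_AGE:
        return "stale"
    return "dead"
```

With a 60-second interval, the 90-second threshold tolerates one late heartbeat before a service is marked stale, and the 180-second threshold corresponds to roughly three missed heartbeats.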
Benefits
- Continuous Liveness Detection: Services prove they're alive every minute
- Independent of Task Processing: Works even when services are idle
- Lightweight: Minimal overhead and data transfer
- Service-Specific Insights: Each service can include relevant metrics
- Real-time Monitoring: Immediate visibility into service status
- Admin Panel Ready: Structured data for dashboard integration
Future Enhancements
- Heartbeat Aggregation: Collect heartbeat statistics over time
- Alert Integration: Generate alerts when heartbeats stop
- Load Balancing: Use heartbeat data for service discovery
- Performance Correlation: Correlate heartbeat data with performance metrics
- Historical Analysis: Track service uptime and reliability patterns
Files Created/Modified
New Files
- services/shared/heartbeat_publisher.py - Core heartbeat functionality
- scripts/test_heartbeat_system.py - Comprehensive test suite
- scripts/monitor_heartbeat_messages.py - Real-time monitoring tool
- docs/services/heartbeat-system.md - This documentation
Modified Files
- services/tracker_fetcher/service.py - Integrated heartbeat publishing
- services/shared/config.py - Added HEARTBEAT_INTERVAL setting
Configuration Settings
# Heartbeat interval (seconds)
HEARTBEAT_INTERVAL: int = 60
# Health Redis settings (used by heartbeat system)
HEALTH_REDIS_HOST: str = "dragonfly"
HEALTH_REDIS_PORT: int = 6379
HEALTH_REDIS_PASSWORD: str = ""
This heartbeat system provides the foundation for robust service liveness monitoring, ensuring the admin panel can always detect service availability regardless of task processing activity.