
Heartbeat System for Service Liveness Monitoring

Overview

The heartbeat system provides continuous service liveness monitoring that operates independently of task processing. This ensures the admin panel can detect if services are alive even when they're not actively processing tasks.

Problem Solved

Original Issue: Health checks only occurred during task execution, making it impossible to detect if long-running services had crashed or become unresponsive between tasks.

Solution: Implemented a separate heartbeat system that publishes a lightweight liveness signal every 60 seconds (configurable via HEARTBEAT_INTERVAL), independent of health monitoring and task processing.

Architecture

Components

  1. HeartbeatPublisher - Core heartbeat publishing functionality
  2. ServiceHeartbeatManager - Higher-level manager with common data collectors
  3. Service Integration - Integration points in service lifecycle
  4. Monitoring Tools - Scripts for testing and monitoring heartbeats

Message Flow

Service Instance
    ↓
HeartbeatPublisher (every 60s)
    ↓
AWS Valkey Cluster (heartbeat:service:{service_name})
    ↓
Admin Panel / Monitoring Tools

Implementation Details

Heartbeat Message Format

{
  "service_name": "tracker_fetcher",
  "instance_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "timestamp": "2025-01-16T08:30:00.123456+00:00",
  "uptime_seconds": 3600.5,
  "heartbeat_count": 60,
  "memory_usage_mb": 156.3,
  "cpu_usage_percent": 12.5,

  // Service-specific data
  "queue_size": 1380,
  "last_successful_operation": "2025-01-16T08:29:45.123456+00:00",
  "api_status": "authenticated"
}
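As a rough illustration of how the core fields above relate to each other, the following sketch assembles a payload from a start time and a counter. The function name `build_heartbeat` and its parameters are hypothetical and are not taken from `heartbeat_publisher.py`; `memory_usage_mb` and `cpu_usage_percent` are left out because collecting them would need a process-metrics library that this document does not name.

```python
import json
import uuid
from datetime import datetime, timezone


def build_heartbeat(service_name, instance_id, started_at, heartbeat_count,
                    extra=None, now=None):
    """Assemble a heartbeat payload (hypothetical helper, for illustration)."""
    now = now or datetime.now(timezone.utc)
    message = {
        "service_name": service_name,
        "instance_id": instance_id,
        "timestamp": now.isoformat(),
        "uptime_seconds": (now - started_at).total_seconds(),
        "heartbeat_count": heartbeat_count,
    }
    # Service-specific fields are added last and never overwrite core fields.
    for key, value in (extra or {}).items():
        message.setdefault(key, value)
    return message


started = datetime(2025, 1, 16, 7, 30, 0, tzinfo=timezone.utc)
now = datetime(2025, 1, 16, 8, 30, 0, 500000, tzinfo=timezone.utc)
payload = build_heartbeat(
    "tracker_fetcher", str(uuid.uuid4()), started, 60,
    extra={"queue_size": 1380}, now=now,
)
print(json.dumps(payload, indent=2))
```

Using timezone-aware UTC timestamps keeps the age calculation on the consumer side unambiguous.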

Channel Naming Convention

  • Pattern: heartbeat:service:{service_name}
  • Examples:
      • heartbeat:service:tracker_fetcher
      • heartbeat:service:geocoding_service
      • heartbeat:service:geofence_service

Configuration

# services/shared/config.py
HEARTBEAT_INTERVAL: int = 60  # Seconds between heartbeats

Service Integration

Tracker Fetcher Service

# In TrackerFetcherService.start()
from services.shared.heartbeat_publisher import create_tracker_fetcher_heartbeat

self.heartbeat_manager = create_tracker_fetcher_heartbeat(self)
await self.heartbeat_manager.start()

# In TrackerFetcherService.stop()
if hasattr(self, 'heartbeat_manager'):
    await self.heartbeat_manager.stop()

Service-Specific Data Collectors

Tracker Fetcher:

  • queue_size - Current queue depth
  • last_successful_operation - Last successful fetch timestamp
  • api_status - Apple API authentication status

Geocoding Service:

  • cache_size - Current cache size
  • pending_requests - Number of pending geocoding requests

Geofence Service:

  • active_geofences - Number of active geofences
  • pending_events - Number of pending geofence events
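One way such collectors could be evaluated is sketched below. It assumes each collector is a zero-argument callable registered under its field name, and it assumes a failing collector should not stop the heartbeat from being sent. Whether the real HeartbeatPublisher behaves this way is not stated here; `collect_service_data` and `broken_collector` are hypothetical names.

```python
def collect_service_data(collectors):
    """Run each collector and return its value keyed by field name.

    A collector that raises is recorded as None, so one failing metric
    does not suppress the liveness signal itself.
    """
    data = {}
    for name, collector in collectors.items():
        try:
            data[name] = collector()
        except Exception:
            data[name] = None
    return data


def broken_collector():
    # Simulates a metric source that is temporarily unavailable.
    raise RuntimeError("API unreachable")


collectors = {
    "queue_size": lambda: 1380,
    "api_status": broken_collector,
}
result = collect_service_data(collectors)
print(result)
```

Isolating failures per collector matters because the heartbeat is meant to report liveness even when a dependency the service talks to is down.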

Usage Examples

Basic Heartbeat Publisher

from services.shared.heartbeat_publisher import HeartbeatPublisher

publisher = HeartbeatPublisher("my_service")
publisher.add_data_collector("custom_metric", lambda: get_custom_value())
await publisher.start()
# ... service runs ...
await publisher.stop()
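To show what a start()/stop() pair like the one above typically wraps, here is a minimal asyncio sketch of a periodic background task. `MiniHeartbeat` is a hypothetical stand-in, not the real HeartbeatPublisher, and its `publish` callback replaces the actual Valkey publish call.

```python
import asyncio


class MiniHeartbeat:
    """Hypothetical periodic publisher used only to illustrate the lifecycle."""

    def __init__(self, publish, interval=60.0):
        self._publish = publish      # callable that receives the heartbeat count
        self._interval = interval    # seconds between heartbeats
        self._task = None
        self.count = 0

    async def _run(self):
        while True:
            self.count += 1
            self._publish(self.count)
            await asyncio.sleep(self._interval)

    async def start(self):
        # Calling start() twice does not create a second loop.
        if self._task is None:
            self._task = asyncio.create_task(self._run())

    async def stop(self):
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None


async def demo():
    sent = []
    beat = MiniHeartbeat(sent.append, interval=0.01)
    await beat.start()
    await asyncio.sleep(0.1)   # the service would do its real work here
    await beat.stop()
    return sent


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the loop runs as its own task, it keeps publishing while the service is idle, which is the property the heartbeat system relies on.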

Service Heartbeat Manager

from services.shared.heartbeat_publisher import ServiceHeartbeatManager

manager = ServiceHeartbeatManager("my_service", service_instance)
manager.add_queue_size_collector(lambda: service_instance.get_queue_size())
manager.add_api_status_collector(lambda: service_instance.get_api_status())
await manager.start()
# ... service runs ...
await manager.stop()

Convenience Functions

# For tracker fetcher
from services.shared.heartbeat_publisher import create_tracker_fetcher_heartbeat
manager = create_tracker_fetcher_heartbeat(tracker_service)

# For geocoding service
from services.shared.heartbeat_publisher import create_geocoding_heartbeat
manager = create_geocoding_heartbeat(geocoding_service)

# For geofence service
from services.shared.heartbeat_publisher import create_geofence_heartbeat
manager = create_geofence_heartbeat(geofence_service)

Monitoring and Testing

Real-time Monitoring

# Monitor all heartbeat messages
python scripts/monitor_heartbeat_messages.py

# Example output:
🔄 tracker_fetcher (a1b2c3d4) - 08:30:15
   Uptime: 1.2h | Heartbeats: 72 | Memory: 156.3MB | CPU: 12.5%
   Service Data:
     queue_size: 1380
     last_successful_operation: 2025-01-16T08:29:45.123456+00:00
     api_status: authenticated
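The summary lines in that output can be derived directly from the heartbeat message fields. The sketch below is an assumption about how such formatting might look, not the code in `monitor_heartbeat_messages.py`; `format_heartbeat` is a hypothetical name, the leading status icon is omitted, and the time is shown in the timestamp's own UTC offset.

```python
from datetime import datetime


def format_heartbeat(msg):
    """Render the header and stats lines for one heartbeat message."""
    ts = datetime.fromisoformat(msg["timestamp"])
    short_id = msg["instance_id"][:8]   # first 8 characters of the instance UUID
    header = f"{msg['service_name']} ({short_id}) - {ts:%H:%M:%S}"
    stats = (
        f"Uptime: {msg['uptime_seconds'] / 3600:.1f}h"
        f" | Heartbeats: {msg['heartbeat_count']}"
        f" | Memory: {msg['memory_usage_mb']:.1f}MB"
        f" | CPU: {msg['cpu_usage_percent']:.1f}%"
    )
    return header + "\n   " + stats


sample = {
    "service_name": "tracker_fetcher",
    "instance_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
    "timestamp": "2025-01-16T08:30:15.000000+00:00",
    "uptime_seconds": 4320.0,
    "heartbeat_count": 72,
    "memory_usage_mb": 156.3,
    "cpu_usage_percent": 12.5,
}
print(format_heartbeat(sample))
```

With a 60-second interval, 72 heartbeats correspond to 4320 seconds, which is the 1.2h uptime shown in the example output.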

Testing

# Run all heartbeat tests
python scripts/test_heartbeat_system.py

# Test specific components
python scripts/test_heartbeat_system.py basic    # Basic heartbeat
python scripts/test_heartbeat_system.py manager  # Heartbeat manager
python scripts/test_heartbeat_system.py tracker  # Tracker fetcher specific
python scripts/test_heartbeat_system.py monitor  # Monitor messages

Relationship to Health Monitoring

Heartbeat vs Health Monitoring

| Aspect | Heartbeat | Health Monitoring |
| --- | --- | --- |
| Purpose | Liveness detection | Comprehensive health status |
| Frequency | Every 60 seconds | Every 30 seconds (status), every 5 minutes (metrics) |
| Data Size | Lightweight | Comprehensive |
| Independence | Fully independent | Tied to service operations |
| Channel | heartbeat:service:{name} | health:service:{name}:* |

Complementary Systems

  • Heartbeat: "Is the service alive?"
  • Health Monitoring: "How is the service performing?"
  • Task Processing: "What work is being done?"

Admin Panel Integration

Subscription Pattern

// Subscribe to heartbeat messages
const redis = new Redis(config);
redis.psubscribe("heartbeat:service:*");

redis.on("pmessage", (pattern, channel, message) => {
  const data = JSON.parse(message);
  updateServiceLiveness(data.service_name, data);
});
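The same bookkeeping can be sketched in Python for the monitoring scripts. This version keys the registry by service name and instance ID rather than by service name alone, on the assumption that several instances of one service may run at once; `record_heartbeat` and `registry` are hypothetical names, and the message is passed in directly so the sketch does not depend on a particular Redis client.

```python
import json


def record_heartbeat(registry, raw_message):
    """Parse one heartbeat payload and store it under (service, instance).

    Returns True if the message was recorded, False if it was malformed.
    """
    try:
        data = json.loads(raw_message)
        key = (data["service_name"], data["instance_id"])
    except (ValueError, KeyError, TypeError):
        # Invalid JSON, missing fields, or a non-object payload.
        return False
    registry[key] = data
    return True


registry = {}
ok = record_heartbeat(
    registry,
    '{"service_name": "tracker_fetcher", "instance_id": "a1b2c3d4", "heartbeat_count": 1}',
)
bad = record_heartbeat(registry, "not json")
print(ok, bad, list(registry))
```

Dropping malformed messages instead of raising keeps a single bad publisher from interrupting the subscriber loop.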

Liveness Detection Logic

function checkServiceLiveness(lastHeartbeat) {
  const now = new Date();
  const heartbeatAge = (now - new Date(lastHeartbeat.timestamp)) / 1000;

  if (heartbeatAge < 90) return "alive"; // Within 1.5 minutes
  if (heartbeatAge < 180) return "stale"; // Within 3 minutes
  return "dead"; // Over 3 minutes
}
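The 90-second and 180-second thresholds correspond to 1.5 and 3 times the 60-second heartbeat interval. A Python sketch that derives them from the interval, so they track any change to HEARTBEAT_INTERVAL, might look like this. `classify_liveness` is a hypothetical helper, and it assumes timestamps are timezone-aware ISO 8601 strings as in the message format.

```python
from datetime import datetime, timezone

HEARTBEAT_INTERVAL = 60  # seconds; assumed to mirror the configured default


def classify_liveness(last_timestamp, now=None, interval=HEARTBEAT_INTERVAL):
    """Classify a service by the age of its last heartbeat timestamp."""
    now = now or datetime.now(timezone.utc)
    age = (now - datetime.fromisoformat(last_timestamp)).total_seconds()
    if age < 1.5 * interval:   # at most one heartbeat missed
        return "alive"
    if age < 3 * interval:     # two or more heartbeats missed
        return "stale"
    return "dead"


now = datetime(2025, 1, 16, 8, 32, 0, tzinfo=timezone.utc)
print(classify_liveness("2025-01-16T08:31:00+00:00", now))
print(classify_liveness("2025-01-16T08:30:00+00:00", now))
print(classify_liveness("2025-01-16T08:29:00+00:00", now))
```

The 1.5x margin tolerates normal scheduling jitter and network delay without flagging a healthy service.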

Benefits

  1. Continuous Liveness Detection: Services prove they're alive every minute
  2. Independent of Task Processing: Works even when services are idle
  3. Lightweight: Minimal overhead and data transfer
  4. Service-Specific Insights: Each service can include relevant metrics
  5. Near-real-time Monitoring: A service that stops sending heartbeats shows as stale after 90 seconds and as dead after 3 minutes
  6. Admin Panel Ready: Structured data for dashboard integration

Future Enhancements

  1. Heartbeat Aggregation: Collect heartbeat statistics over time
  2. Alert Integration: Generate alerts when heartbeats stop
  3. Load Balancing: Use heartbeat data for service discovery
  4. Performance Correlation: Correlate heartbeat data with performance metrics
  5. Historical Analysis: Track service uptime and reliability patterns

Files Created/Modified

New Files

  • services/shared/heartbeat_publisher.py - Core heartbeat functionality
  • scripts/test_heartbeat_system.py - Comprehensive test suite
  • scripts/monitor_heartbeat_messages.py - Real-time monitoring tool
  • docs/services/heartbeat-system.md - This documentation

Modified Files

  • services/tracker_fetcher/service.py - Integrated heartbeat publishing
  • services/shared/config.py - Added HEARTBEAT_INTERVAL setting

Configuration Settings

# Heartbeat interval (seconds)
HEARTBEAT_INTERVAL: int = 60

# Health Redis settings (used by heartbeat system)
HEALTH_REDIS_HOST: str = "dragonfly"
HEALTH_REDIS_PORT: int = 6379
HEALTH_REDIS_PASSWORD: str = ""

This heartbeat system provides the foundation for robust service liveness monitoring, ensuring the admin panel can always detect service availability regardless of task processing activity.