Created
November 3, 2025 15:49
-
-
Save jasonnerothin/73b90e5eadec909fe222d51b4aeb3c31 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Thread-safe Death Telemetry Metrics Module (Multi-Drone Version) | |
| This module provides observable gauges for drone death data, including: | |
| - the drone id (reported as the `drone.id` attribute) | |
| - the death position (latitude and longitude) | |
| The module uses callbacks to pull the latest data from shared state when | |
| the OTLP exporter requests metric values. It includes staleness checking | |
| to prevent reporting outdated data. | |
| KEY FEATURE: Tracks MULTIPLE drones simultaneously, storing each drone's data | |
| separately so that one drone's death never overwrites another's. | |
| Usage: | |
| 1. Initialize the MeterProvider (via unified_metrics.py) | |
| 2. Call register_death_metrics() to create the gauges | |
| 3. Call update_and_report_death_metrics() to record each death (it is reported at the next metric collection while still fresh) | |
| """ | |
| import logging | |
| import threading | |
| import os | |
| import time | |
| from typing import Dict, Any, List | |
| # --- OpenTelemetry Imports --- | |
| from opentelemetry.metrics import get_meter_provider, CallbackOptions, Observation | |
| # ----------------------------- | |
# --- GLOBAL STATE MANAGEMENT ---
# Maximum age (in seconds) a drone's death data may reach and still be reported.
# Older data is skipped by the metric callbacks.
# NOTE(review): if the metric reader's export interval is longer than this
# threshold (the OpenTelemetry SDK default is 60 s), a death may expire before
# any collection sees it -- confirm against the interval set in unified_metrics.py.
STALENESS_THRESHOLD_SECONDS = 5

# When more than this many drones are tracked, stale entries are purged.
# Drones with fresh data are never evicted, so the dict can temporarily grow
# beyond this size if many deaths arrive within STALENESS_THRESHOLD_SECONDS.
MAX_TRACKED_DRONES = 100

# Global logger setup
logger = logging.getLogger(__name__)

# Guards every read and write of _latest_telemetry_data (not re-entrant).
_data_lock = threading.Lock()

# Per-drone death data, keyed by drone id. Structure: {
#     'striker-001': {
#         'latitude': {'value': 50.2, 'updated_at': 123456.789},
#         'longitude': {'value': 36.3, 'updated_at': 123456.789}
#     },
#     'orlan-001': {...},
#     ...
# }
_latest_telemetry_data: Dict[str, Dict[str, Dict[str, Any]]] = {}

# Set once register_death_metrics() has created the gauges.
_METRICS_REGISTERED = False


# --- STATE UPDATE FUNCTION ---
def update_and_report_death_metrics(data: Dict[str, Any]) -> None:
    """Record a drone death in the shared state for the metric callbacks.

    Both coordinates are stored with the same ``time.time()`` timestamp,
    which the callbacks use to judge freshness. Every drone id has its own
    entry, so deaths of different drones never overwrite each other. A
    repeated death for the same id replaces that drone's previous entry.
    All access to the shared state happens under ``_data_lock``.

    Invalid payloads are logged as warnings and ignored. A payload is
    invalid if the ``drone`` id is missing or empty, if a coordinate is
    missing, or if a coordinate cannot be converted by ``float()``.

    Args:
        data: Death payload with keys:
            - drone (str): ID of the dead drone
            - latitude (float or numeric str): death point latitude
            - longitude (float or numeric str): death point longitude

    Example:
        >>> update_and_report_death_metrics({
        ...     'drone': 'striker-001',
        ...     'latitude': 40.7580,
        ...     'longitude': -73.9855
        ... })
    """
    drone_id = data.get('drone')
    latitude = data.get('latitude')
    longitude = data.get('longitude')

    if not drone_id or latitude is None or longitude is None:
        logger.warning("Incomplete death data received: %s", data)
        return

    # Convert before taking the lock. A bad value then cannot leave a
    # half-filled entry behind, and the debug log formats real floats
    # instead of whatever type the payload carried (e.g. numeric strings).
    try:
        lat_value = float(latitude)
        lon_value = float(longitude)
    except (TypeError, ValueError):
        logger.warning("Non-numeric death coordinates received: %s", data)
        return

    current_timestamp = time.time()

    with _data_lock:
        # Replace the whole entry at once; both coordinates share a timestamp.
        _latest_telemetry_data[drone_id] = {
            'latitude': {'value': lat_value, 'updated_at': current_timestamp},
            'longitude': {'value': lon_value, 'updated_at': current_timestamp},
        }

        if len(_latest_telemetry_data) > MAX_TRACKED_DRONES:
            _cleanup_stale_drones(current_timestamp)

        num_tracked = len(_latest_telemetry_data)

    logger.debug(
        "Death recorded for %s at %.0f. Location: (%.5f, %.5f). Tracking %d drones.",
        drone_id, current_timestamp, lat_value, lon_value, num_tracked,
    )


def _cleanup_stale_drones(current_time: float) -> None:
    """Remove drones whose data is older than STALENESS_THRESHOLD_SECONDS.

    The caller must already hold ``_data_lock``. This function does not
    acquire it, because the lock is not re-entrant.

    Args:
        current_time: Current ``time.time()`` timestamp used to compute ages.
    """
    threshold = STALENESS_THRESHOLD_SECONDS
    # Build the list first: deleting while iterating the dict would raise.
    stale_drones = [
        drone_id
        for drone_id, drone_data in _latest_telemetry_data.items()
        if current_time - drone_data['latitude']['updated_at'] > threshold
        and current_time - drone_data['longitude']['updated_at'] > threshold
    ]
    for drone_id in stale_drones:
        del _latest_telemetry_data[drone_id]
        logger.debug("Cleaned up stale data for %s", drone_id)
# --- METRIC CALLBACKS with Staleness Check ---
def _check_and_report_all(metric_key: str) -> List[Observation]:
    """Build one Observation per drone whose death data is still fresh.

    A drone is reported only when both its latitude and longitude were
    updated less than STALENESS_THRESHOLD_SECONDS ago. Entries with a
    zero timestamp are skipped. This function acquires ``_data_lock``
    itself, so it must not be called while the lock is held (the lock is
    not re-entrant).

    Args:
        metric_key: Which coordinate to use as the observed value, either
            'latitude' or 'longitude'.

    Returns:
        List of Observation objects, one per fresh drone. Each carries the
        attributes ``drone.id``, ``death.latitude`` and ``death.longitude``.

    Raises:
        ValueError: If ``metric_key`` is not 'latitude' or 'longitude'.
    """
    # Previously any other key silently fell through to longitude.
    if metric_key not in ('latitude', 'longitude'):
        raise ValueError(f"Unknown death metric key: {metric_key!r}")

    current_time = time.time()
    observations: List[Observation] = []

    with _data_lock:
        for drone_id, drone_data in _latest_telemetry_data.items():
            latitude_data = drone_data.get('latitude', {})
            longitude_data = drone_data.get('longitude', {})

            lat_updated_at = latitude_data.get('updated_at', 0.0)
            lon_updated_at = longitude_data.get('updated_at', 0.0)

            # A zero timestamp marks an entry that was never filled in.
            if lat_updated_at == 0.0 or lon_updated_at == 0.0:
                continue

            # Only report if BOTH coordinates are fresh.
            lat_age = current_time - lat_updated_at
            lon_age = current_time - lon_updated_at
            if (lat_age >= STALENESS_THRESHOLD_SECONDS
                    or lon_age >= STALENESS_THRESHOLD_SECONDS):
                continue

            latitude = latitude_data.get('value', 0.0)
            longitude = longitude_data.get('value', 0.0)
            value = latitude if metric_key == 'latitude' else longitude

            # NOTE(review): putting the coordinates in the attributes makes every
            # distinct death location its own time series; confirm this is intended.
            attributes = {
                "drone.id": drone_id,
                "death.latitude": latitude,
                "death.longitude": longitude,
            }
            observations.append(Observation(value, attributes))

    if observations:
        logger.debug("Reporting %d fresh %s observations", len(observations), metric_key)

    return observations
def latitude_callback(options: CallbackOptions) -> List[Observation]:
    """Callback for the ``death.latitude`` observable gauge.

    Args:
        options: Callback options supplied by the SDK; not used here.

    Returns:
        One latitude Observation per drone whose death data is still fresh.
    """
    fresh_latitudes = _check_and_report_all('latitude')
    return fresh_latitudes
def longitude_callback(options: CallbackOptions) -> List[Observation]:
    """Callback for the ``death.longitude`` observable gauge.

    Args:
        options: Callback options supplied by the SDK; not used here.

    Returns:
        One longitude Observation per drone whose death data is still fresh.
    """
    fresh_longitudes = _check_and_report_all('longitude')
    return fresh_longitudes
# --- METRIC REGISTRATION ---
# Serializes register_death_metrics(), so concurrent callers cannot both pass
# the _METRICS_REGISTERED check and create duplicate gauges.
_registration_lock = threading.Lock()


def register_death_metrics(meter_name: str = "death.telemetry") -> None:
    """Register the death telemetry gauges with the global MeterProvider.

    Call this AFTER initialize_meter_provider() has been called. It creates
    two observable gauges, ``death.latitude`` and ``death.longitude``. Their
    callbacks pull the latest fresh data from the shared state each time
    metrics are collected. Only the first call registers anything; later
    calls are no-ops, so the first call's ``meter_name`` is the one used.

    No check is made that a MeterProvider has been configured, and no
    RuntimeError is raised. Whatever ``get_meter_provider()`` returns is used.
    NOTE(review): presumably an unconfigured OpenTelemetry API returns a
    proxy/no-op provider; confirm that gauges registered before
    initialize_meter_provider() are still exported.

    Args:
        meter_name: Name for this meter (default: "death.telemetry").

    Example:
        >>> from common.unified_metrics import initialize_meter_provider
        >>> from common.threadsafe_otlp_death_metrics import register_death_metrics
        >>>
        >>> initialize_meter_provider("my-service", "user-1", "http://collector:4318/v1/metrics")
        >>> register_death_metrics()
    """
    global _METRICS_REGISTERED

    with _registration_lock:
        if _METRICS_REGISTERED:
            logger.debug("Death metrics already registered, skipping")
            return

        meter = get_meter_provider().get_meter(meter_name)

        # These gauges report ALL fresh drones on each collection cycle.
        meter.create_observable_gauge(
            name="death.latitude",
            callbacks=[latitude_callback],
            description="Latitude of the point of death for each drone",
        )
        meter.create_observable_gauge(
            name="death.longitude",
            callbacks=[longitude_callback],
            description="Longitude of the point of death for each drone",
        )

        _METRICS_REGISTERED = True

    logger.info("Death metrics registered successfully under meter: %s", meter_name)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment