Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save jasonnerothin/73b90e5eadec909fe222d51b4aeb3c31 to your computer and use it in GitHub Desktop.

Select an option

Save jasonnerothin/73b90e5eadec909fe222d51b4aeb3c31 to your computer and use it in GitHub Desktop.
"""
Thread-safe Death Telemetry Metrics Module (Multi-Drone Version)
This module provides observable gauges for drone death data:
- death position (latitude and longitude, one gauge each)
- drone id (attached to every observation as an attribute)
The module uses callbacks to pull the latest data from shared state when
the OTLP exporter requests metric values. It includes staleness checking
to prevent reporting outdated data.
KEY FEATURE: Tracks MULTIPLE drones simultaneously to prevent race conditions
where one death overwrites another.
Usage:
1. Initialize the MeterProvider (via unified_metrics.py)
2. Call register_death_metrics() to create the gauges
3. Call update_and_report_death_metrics() to report deaths
"""
import logging
import threading
import os
import time
from typing import Dict, Any, List
# --- OpenTelemetry Imports ---
from opentelemetry.metrics import get_meter_provider, CallbackOptions, Observation
# -----------------------------
# --- GLOBAL STATE MANAGEMENT ---
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Age limit, in seconds, for reporting a recorded death.
# The metric callbacks skip any drone whose data is this old or older.
STALENESS_THRESHOLD_SECONDS = 5

# Soft cap on the number of tracked drones. Once the state holds more
# entries than this, stale drones are purged to keep memory bounded.
MAX_TRACKED_DRONES = 100

# Lock that guards every read and write of the shared state below.
_data_lock = threading.Lock()

# Shared state, keyed by drone id, so several drones are tracked at once.
# Layout: {
#     'striker-001': {
#         'latitude': {'value': 50.2, 'updated_at': 123456.789},
#         'longitude': {'value': 36.3, 'updated_at': 123456.789}
#     },
#     'orlan-001': {...},
#     ...
# }
_latest_telemetry_data: Dict[str, Dict[str, Dict[str, Any]]] = {}

# Flipped to True after the gauges have been registered once.
_METRICS_REGISTERED = False
# --- STATE UPDATE FUNCTION ---
def update_and_report_death_metrics(data: Dict[str, Any]) -> None:
    """
    Record a drone death in the shared state so the gauge callbacks can report it.

    The payload's coordinates are converted to float and stored under the
    drone's id together with the current time, which is later used to judge
    data freshness. Each drone has its own entry, so one death never
    overwrites another. The shared state is only touched while holding
    _data_lock.

    Args:
        data: Dictionary containing death data with keys:
            - drone (str): ID of the dead drone
            - latitude (float): death point latitude
            - longitude (float): death point longitude

    Returns:
        None. A payload with a missing/empty drone id, a missing coordinate,
        or a coordinate that float() cannot convert is logged at WARNING
        level and ignored; the shared state is left untouched.

    Example:
        >>> update_and_report_death_metrics({
        ...     'drone': 'striker-001',
        ...     'latitude': 40.7580,
        ...     'longitude': -73.9855
        ... })
    """
    drone_id = data.get('drone')
    latitude = data.get('latitude')
    longitude = data.get('longitude')
    if not drone_id or latitude is None or longitude is None:
        logger.warning(f"Incomplete death data received: {data}")
        return

    # Convert once, before touching shared state. A bad value must not leave
    # a half-initialised entry behind, and the log line below must format the
    # converted floats (the raw payload value may be a numeric string, which
    # float() accepts but the 'f' format code rejects).
    try:
        lat_value = float(latitude)
        lon_value = float(longitude)
    except (TypeError, ValueError):
        logger.warning(f"Non-numeric death coordinates received: {data}")
        return

    current_timestamp = time.time()
    with _data_lock:
        # Create or replace this drone's entry; both coordinates share one timestamp.
        _latest_telemetry_data[drone_id] = {
            'latitude': {'value': lat_value, 'updated_at': current_timestamp},
            'longitude': {'value': lon_value, 'updated_at': current_timestamp},
        }
        # Cleanup old drones if we exceed the limit
        if len(_latest_telemetry_data) > MAX_TRACKED_DRONES:
            _cleanup_stale_drones(current_timestamp)
        num_tracked = len(_latest_telemetry_data)

    # Lazy %-formatting: the message is only built when DEBUG is enabled.
    logger.debug(
        "Death recorded for %s at %.0f. Location: (%.5f, %.5f). Tracking %d drones.",
        drone_id,
        current_timestamp,
        lat_value,
        lon_value,
        num_tracked,
    )
def _cleanup_stale_drones(current_time: float) -> None:
    """
    Drop every tracked drone whose latitude AND longitude are both older than
    STALENESS_THRESHOLD_SECONDS.

    The caller must already hold _data_lock; this function does not acquire it.

    Args:
        current_time: Timestamp the stored 'updated_at' values are compared against
    """
    def _is_expired(entry: Dict[str, Dict[str, Any]]) -> bool:
        # Both ages are computed up front, then both must exceed the limit.
        lat_age = current_time - entry['latitude']['updated_at']
        lon_age = current_time - entry['longitude']['updated_at']
        return lat_age > STALENESS_THRESHOLD_SECONDS and lon_age > STALENESS_THRESHOLD_SECONDS

    # Collect the ids first: deleting from a dict while iterating it is an error.
    expired_ids = [
        tracked_id
        for tracked_id, entry in _latest_telemetry_data.items()
        if _is_expired(entry)
    ]
    for drone_id in expired_ids:
        _latest_telemetry_data.pop(drone_id)
        logger.debug(f"Cleaned up stale data for {drone_id}")
# --- METRIC CALLBACKS with Staleness Check ---
def _check_and_report_all(metric_key: str) -> List[Observation]:
    """
    Build one Observation per tracked drone whose coordinates are still fresh.

    A drone is reported only when both its latitude and its longitude were
    updated less than STALENESS_THRESHOLD_SECONDS ago. Every observation
    carries the drone id and both coordinates as attributes.

    Args:
        metric_key: 'latitude' selects the latitude as the observed value;
            any other value selects the longitude

    Returns:
        List of Observation objects (possibly empty)
    """
    now = time.time()
    observations = []
    with _data_lock:
        for drone_id, entry in _latest_telemetry_data.items():
            lat_entry = entry.get('latitude', {})
            lon_entry = entry.get('longitude', {})
            lat_stamp = lat_entry.get('updated_at', 0.0)
            lon_stamp = lon_entry.get('updated_at', 0.0)

            # A timestamp of 0.0 means the coordinate has not been set yet.
            if lat_stamp == 0.0 or lon_stamp == 0.0:
                continue

            # Report only if BOTH coordinates are within the freshness window.
            lat_age = now - lat_stamp
            lon_age = now - lon_stamp
            if not (lat_age < STALENESS_THRESHOLD_SECONDS and lon_age < STALENESS_THRESHOLD_SECONDS):
                continue

            lat_value = lat_entry.get('value', 0.0)
            lon_value = lon_entry.get('value', 0.0)
            observed = lat_value if metric_key == 'latitude' else lon_value
            observations.append(
                Observation(
                    observed,
                    {
                        "drone.id": drone_id,
                        "death.latitude": lat_value,
                        "death.longitude": lon_value,
                    },
                )
            )

    if observations:
        logger.debug(f"Reporting {len(observations)} fresh {metric_key} observations")
    return observations
def latitude_callback(options: CallbackOptions) -> List[Observation]:
    """
    Gauge callback: return latitude observations for all fresh drones.

    `options` is accepted as part of the callback signature but is not used.
    """
    return _check_and_report_all(metric_key='latitude')
def longitude_callback(options: CallbackOptions) -> List[Observation]:
    """
    Gauge callback: return longitude observations for all fresh drones.

    `options` is accepted as part of the callback signature but is not used.
    """
    return _check_and_report_all(metric_key='longitude')
# --- METRIC REGISTRATION ---
def register_death_metrics(meter_name: str = "death.telemetry") -> None:
    """
    Create the death-telemetry observable gauges on the current MeterProvider.

    Two gauges are created, "death.latitude" and "death.longitude", each
    backed by a callback that pulls the latest values from the shared state.
    The call is guarded by a module-level flag: once registration has
    succeeded, later calls log a debug message and return without creating
    anything.

    Intended to be called after the MeterProvider has been initialized.
    NOTE(review): this function performs no check of its own that a provider
    was configured and contains no raise statement; whether
    get_meter_provider() fails or returns a default provider in that case
    should be confirmed against the OpenTelemetry API.

    Args:
        meter_name: Name for this meter (default: "death.telemetry")

    Example:
        >>> from common.unified_metrics import initialize_meter_provider
        >>> from common.threadsafe_otlp_death_metrics import register_death_metrics
        >>>
        >>> initialize_meter_provider("my-service", "user-1", "http://collector:4318/v1/metrics")
        >>> register_death_metrics()
    """
    global _METRICS_REGISTERED
    if _METRICS_REGISTERED:
        logger.debug("Death metrics already registered, skipping")
        return

    meter = get_meter_provider().get_meter(meter_name)

    # (gauge name, callback, description) for each gauge to create.
    # The callbacks report ALL fresh drones whenever they are invoked.
    gauge_specs = (
        (
            "death.latitude",
            latitude_callback,
            "Latitude of the point of death for each drone",
        ),
        (
            "death.longitude",
            longitude_callback,
            "Longitude of the point of death for each drone",
        ),
    )
    for gauge_name, gauge_callback, gauge_description in gauge_specs:
        meter.create_observable_gauge(
            name=gauge_name,
            callbacks=[gauge_callback],
            description=gauge_description,
        )

    _METRICS_REGISTERED = True
    logger.info(f"Death metrics registered successfully under meter: {meter_name}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment