Created
November 6, 2025 18:07
-
-
Save jasonnerothin/0e878fc436622086eb92699d2c0615c3 to your computer and use it in GitHub Desktop.
Redis sidecar supporting OTLP metrics and traces
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Tower Sidecar - Unified Metrics and Distributed Tracing Bridge | |
| Monitors Redis channels and provides: | |
| 1. Metrics: Message throughput, rates (existing functionality) | |
| 2. Traces: Distributed trace context propagation (new functionality) | |
| Implements the three-span pattern for async messaging while maintaining | |
| metrics collection. | |
| """ | |
| import os | |
| import sys | |
| import json | |
| import time | |
| import logging | |
| from typing import Optional, Dict, Any | |
| import redis | |
| from opentelemetry import trace, metrics | |
| from opentelemetry.trace import SpanKind | |
| from opentelemetry.propagate import extract | |
| # Metrics | |
| from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter | |
| from opentelemetry.sdk.metrics import MeterProvider | |
| from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader | |
| from opentelemetry.sdk.resources import Resource as MetricResource | |
| # Import our enhanced tracing module | |
| from common.threadsafe_otlp_traces import ( | |
| initialize_traces, | |
| get_tracer, | |
| record_event, | |
| set_messaging_consumer_attributes, | |
| set_messaging_producer_attributes, | |
| set_sidecar_attributes, | |
| shutdown_traces | |
| ) | |
# Logging setup: records go to stdout at the level named by the LOG_LEVEL
# environment variable (default INFO). A name that is not an attribute of
# the logging module falls back to INFO.
log_level = os.environ.get('LOG_LEVEL', 'INFO').upper()
logging.basicConfig(
    handlers=[logging.StreamHandler(sys.stdout)],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=getattr(logging, log_level, logging.INFO),
)
# Swap the first root handler's flush for a direct flush of whatever
# sys.stdout is at call time.
logging.root.handlers[0].flush = lambda: sys.stdout.flush()
logger = logging.getLogger(__name__)
class MessageEnvelope:
    """Parse message envelopes and pull the trace context out of them.

    An envelope is a JSON object that carries a ``trace_context`` key.
    """

    @staticmethod
    def parse(message_data: str) -> Optional[Dict[str, Any]]:
        """Decode *message_data* as a JSON envelope.

        Args:
            message_data: Raw message payload, expected to be JSON text.

        Returns:
            The decoded dict when the payload is a JSON object containing a
            ``trace_context`` key; ``None`` for anything else (invalid JSON,
            a non-object JSON value, a non-text payload, or a missing key).
        """
        try:
            envelope = json.loads(message_data)
        except (TypeError, ValueError, RecursionError):
            # TypeError: payload is not str/bytes. ValueError covers
            # JSONDecodeError and undecodable bytes. RecursionError guards
            # against pathologically nested input taking the sidecar down.
            return None
        if not isinstance(envelope, dict) or 'trace_context' not in envelope:
            return None
        return envelope

    @staticmethod
    def extract_trace_context(envelope: Dict[str, Any]) -> "Optional[trace.Context]":
        """Extract an OpenTelemetry context from *envelope*.

        Args:
            envelope: Decoded envelope, as returned by :meth:`parse`.

        Returns:
            The extracted context when the ``trace_context`` carrier yields a
            valid remote span context; ``None`` when the carrier is missing,
            empty, not a dict, fails to extract, or carries no valid span.
        """
        carrier = envelope.get('trace_context')
        if not isinstance(carrier, dict) or not carrier:
            return None
        try:
            ctx = extract(carrier)
            # extract() hands back a context even when the carrier has no
            # usable trace headers, so the span context has to be checked
            # explicitly to tell "traced" from "untraced" messages.
            if not trace.get_current_span(ctx).get_span_context().is_valid:
                return None
            return ctx
        except Exception as e:
            # Best effort: a bad carrier must never break message handling.
            logger.debug(f"Failed to extract trace context: {e}")
            return None
class UnifiedTowerSidecar:
    """Sidecar that watches a Redis channel and emits metrics and traces.

    Metrics:
        - redis.channel.messages.total (counter)
        - redis.channel.messages.rate (observable gauge)

    Traces (only for messages whose envelope carries a valid trace context):
        - CONSUMER span: tower receives from publisher
        - PRODUCER span: tower forwards to subscribers
    """

    # Upper bound on how long a single Redis poll blocks, so the heartbeat
    # keeps firing while the channel is idle.
    _POLL_TIMEOUT_SECS = 1.0

    def __init__(
        self,
        redis_host: str,
        redis_port: int,
        channel: str,
        service_name: str,
        otel_endpoint: str,
        heartbeat_interval: int = 60,
        user_id: str = "default-user"
    ):
        """Set up tracing, metrics and in-memory statistics.

        Args:
            redis_host: Redis hostname.
            redis_port: Redis port.
            channel: Channel or glob pattern to monitor.
            service_name: Service name for telemetry.
            otel_endpoint: OTLP collector base URL.
            heartbeat_interval: Seconds between heartbeat logs.
            user_id: User identifier attached to the metric resource.
        """
        self.redis_host = redis_host
        self.redis_port = redis_port
        self.channel = channel
        self.service_name = service_name
        self.otel_endpoint = otel_endpoint
        self.heartbeat_interval = heartbeat_interval
        self.user_id = user_id

        # Running totals; last_count/last_time delimit the current rate window.
        self.stats = {
            'messages_total': 0,
            'messages_with_trace': 0,
            'messages_without_trace': 0,
            'last_count': 0,
            'last_time': time.time()
        }

        # ========================================================
        # Initialize TRACES
        # ========================================================
        initialize_traces(
            service_name=service_name,
            exporter_endpoint=otel_endpoint,
            deployment_environment=os.getenv('DEPLOYMENT_ENV', 'production')
        )
        self.tracer = get_tracer(__name__)

        # ========================================================
        # Initialize METRICS
        # ========================================================
        metric_resource = MetricResource(attributes={
            "service.name": service_name,
            "service.version": "1.0.0",
            "user.id": user_id,
        })
        # Strip a trailing slash so the URL never contains "//v1/metrics".
        metrics_url = f"{otel_endpoint.rstrip('/')}/v1/metrics"
        metric_reader = PeriodicExportingMetricReader(
            OTLPMetricExporter(endpoint=metrics_url),
            export_interval_millis=10000  # Export every 10 seconds
        )
        # Kept on the instance so run() can shut it down on exit.
        self._metric_provider = MeterProvider(
            resource=metric_resource,
            metric_readers=[metric_reader]
        )
        metrics.set_meter_provider(self._metric_provider)

        meter = metrics.get_meter("redis.monitor", version="1.0.0")
        self.message_counter = meter.create_counter(
            name="redis.channel.messages.total",
            description="Total number of messages on Redis channel",
            unit="messages"
        )
        self.message_rate = meter.create_observable_gauge(
            name="redis.channel.messages.rate",
            description="Message rate per second",
            unit="messages/s",
            callbacks=[self._observe_message_rate]
        )

        logger.info("=" * 60)
        logger.info("Unified Tower Sidecar initialized")
        logger.info(f" Service: {service_name}")
        logger.info(f" Redis: {redis_host}:{redis_port}")
        logger.info(f" Channel: {channel}")
        logger.info(f" OTLP: {otel_endpoint}")
        logger.info(" Metrics: ✓ Enabled")
        logger.info(" Traces: ✓ Enabled")
        logger.info("=" * 60)

    def _compute_message_rate(self) -> float:
        """Return messages per second since the previous call.

        Advances the rate window (``last_count`` / ``last_time``) as a side
        effect. When no time has elapsed, or the clock moved backwards, the
        rate is reported as 0.0 and only the window start is re-based, so
        pending messages are attributed to the next window.
        """
        now = time.time()
        elapsed = now - self.stats['last_time']
        if elapsed <= 0:
            self.stats['last_time'] = now
            return 0.0
        total = self.stats['messages_total']
        rate = (total - self.stats['last_count']) / elapsed
        self.stats['last_count'] = total
        self.stats['last_time'] = now
        return rate

    def _observe_message_rate(self, options):
        """Observable-gauge callback: yield the current message rate."""
        yield metrics.Observation(
            self._compute_message_rate(), {"channel": self.channel}
        )

    def connect_redis(self, max_retries: int = 5, retry_delay: int = 5) -> "redis.Redis":
        """Connect to Redis, retrying on connection errors.

        Args:
            max_retries: Maximum number of connection attempts; values below
                1 are treated as 1 so at least one attempt is always made.
            retry_delay: Seconds to sleep between attempts.

        Returns:
            A client that has answered a PING.

        Raises:
            redis.ConnectionError: If the final attempt fails.
        """
        attempts = max(1, max_retries)
        for attempt in range(1, attempts + 1):
            try:
                client = redis.Redis(
                    host=self.redis_host,
                    port=self.redis_port,
                    decode_responses=True
                )
                client.ping()
            except redis.ConnectionError as e:
                if attempt == attempts:
                    logger.error(f"Could not connect after {attempts} attempts")
                    raise
                logger.warning(
                    f"Failed to connect (attempt {attempt}/{attempts}): {e}"
                )
                logger.info(f"Retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)
            else:
                logger.info(f"✓ Connected to Redis at {self.redis_host}:{self.redis_port}")
                return client

    def process_message(self, channel: str, message_data: str):
        """Process one message: update metrics and, if traced, emit spans.

        Every message increments the counter metric. Spans are created only
        when the payload is an envelope with a valid trace context.

        Args:
            channel: Redis channel the message arrived on.
            message_data: Raw message data.
        """
        # ========================================================
        # METRICS: Always increment counter for all messages
        # ========================================================
        self.stats['messages_total'] += 1
        self.message_counter.add(1, {"channel": channel})

        if self.stats['messages_total'] == 1:
            logger.info(f"✓ First message received on {channel}")

        # ========================================================
        # TRACES: Create spans if message has trace context
        # ========================================================
        envelope = MessageEnvelope.parse(message_data)
        parent_context = None
        if envelope is not None:
            parent_context = MessageEnvelope.extract_trace_context(envelope)

        if parent_context is None:
            self.stats['messages_without_trace'] += 1
            if envelope is None:
                logger.debug(f"Non-envelope message on {channel}: {message_data[:100]}")
            else:
                logger.debug(f"Message without valid trace context on {channel}")
        else:
            self.stats['messages_with_trace'] += 1
            self._emit_trace_spans(channel, envelope, parent_context)

        # Logged after classification so the traced count includes this message.
        if self.stats['messages_total'] % 10 == 0:
            logger.info(
                f"Processed {self.stats['messages_total']} messages "
                f"({self.stats['messages_with_trace']} traced)"
            )

    def _emit_trace_spans(self, channel: str, envelope: Dict[str, Any], parent_context) -> None:
        """Create the CONSUMER span and its nested PRODUCER span for a message."""
        metadata = envelope.get('metadata')
        if not isinstance(metadata, dict):
            # Malformed or missing metadata must not abort message handling.
            metadata = {}
        message_id = metadata.get('message_id', 'unknown')
        source_service = metadata.get('source_service', 'unknown')
        target_service = metadata.get('target_service')

        # ================================================================
        # TRACE SPAN 1: CONSUMER (tower receives from publisher)
        # ================================================================
        with self.tracer.start_as_current_span(
            f"tower_receive_{channel}",
            context=parent_context,
            kind=SpanKind.CONSUMER
        ) as consumer_span:
            set_messaging_consumer_attributes(
                consumer_span,
                system="redis",
                source=channel,
                message_id=message_id
            )
            set_sidecar_attributes(
                consumer_span,
                role="bridge",
                channel=channel,
                action="observe"
            )
            consumer_span.set_attribute("message.source_service", source_service)
            if target_service:
                consumer_span.set_attribute("message.target_service", target_service)
                consumer_span.set_attribute("message.type", "unicast")
            else:
                consumer_span.set_attribute("message.type", "broadcast")
            record_event(consumer_span, "message_received", {
                "channel": channel,
                "message_id": message_id
            })
            logger.debug(
                f"[TRACE] Received on {channel} from {source_service} "
                f"(msg_id: {message_id})"
            )

            # ============================================================
            # TRACE SPAN 2: PRODUCER (tower forwards to subscribers)
            # ============================================================
            # Opened while the CONSUMER span is current, so it becomes that
            # span's child and stays in the publisher's trace.
            with self.tracer.start_as_current_span(
                f"tower_forward_{channel}",
                kind=SpanKind.PRODUCER
            ) as producer_span:
                set_messaging_producer_attributes(
                    producer_span,
                    system="redis",
                    destination=channel,
                    message_id=message_id,
                    peer_service="tower"
                )
                set_sidecar_attributes(
                    producer_span,
                    role="bridge",
                    channel=channel,
                    action="forward"
                )
                record_event(producer_span, "message_forwarded", {
                    "channel": channel,
                    "message_id": message_id
                })
                logger.debug(f"[TRACE] Forwarded on {channel} (msg_id: {message_id})")

    def run(self):
        """Main sidecar loop: subscribe to the channel and process messages.

        Polls Redis with a short timeout so heartbeats are logged even when
        no messages arrive. On exit (Ctrl-C or error) the Redis handles and
        telemetry providers are shut down and final statistics are logged.

        Raises:
            Exception: Any error from connecting, subscribing or message
                processing is logged and re-raised after cleanup.
        """
        logger.info("Starting unified metrics + tracing...")
        client = None
        pubsub = None
        try:
            client = self.connect_redis()
            pubsub = client.pubsub()
            if '*' in self.channel or '?' in self.channel:
                pubsub.psubscribe(self.channel)
                logger.info(f"Subscribed to pattern: {self.channel}")
            else:
                pubsub.subscribe(self.channel)
                logger.info(f"Subscribed to channel: {self.channel}")
            logger.info("Monitoring messages...")

            last_heartbeat = time.time()
            while True:
                # Returns None on timeout, which lets the heartbeat run
                # independently of message traffic.
                message = pubsub.get_message(timeout=self._POLL_TIMEOUT_SECS)

                current_time = time.time()
                if current_time - last_heartbeat > self.heartbeat_interval:
                    logger.info(
                        f"Heartbeat: {self.stats['messages_total']} messages "
                        f"({self.stats['messages_with_trace']} traced)"
                    )
                    last_heartbeat = current_time

                if message is None:
                    continue

                msg_type = message['type']
                if msg_type in ('message', 'pmessage'):
                    # Updates metrics AND creates traces.
                    self.process_message(message['channel'], message['data'])
                elif msg_type in ('subscribe', 'psubscribe'):
                    pattern = message.get('pattern') or message.get('channel')
                    logger.info(f"✓ Subscribed to: {pattern}")
        except KeyboardInterrupt:
            logger.info("Shutting down...")
        except Exception as e:
            logger.error(f"Error: {e}", exc_info=True)
            raise
        finally:
            self._shutdown(client, pubsub)
            logger.info("=" * 60)
            logger.info("Unified Sidecar Statistics:")
            logger.info(f" Total messages: {self.stats['messages_total']}")
            logger.info(f" Traced messages: {self.stats['messages_with_trace']}")
            logger.info(f" Untraced messages: {self.stats['messages_without_trace']}")
            logger.info("=" * 60)

    def _shutdown(self, client, pubsub) -> None:
        """Release Redis handles and shut down telemetry, step by step.

        Each step is attempted independently so one failure cannot prevent
        the remaining resources from being released.
        """
        steps = []
        if pubsub is not None:
            steps.append(("Redis pubsub", pubsub.close))
        if client is not None:
            steps.append(("Redis client", client.close))
        steps.append(("traces", shutdown_traces))
        steps.append(("metrics", self._metric_provider.shutdown))
        for label, step in steps:
            try:
                step()
            except Exception:
                logger.warning(f"Failed to shut down {label}", exc_info=True)
def main():
    """Build the sidecar from environment variables and run it.

    Reads REDIS_HOST, REDIS_PORT, MONITOR_CHANNEL, SERVICE_NAME,
    OTEL_COLLECTOR_BASE_URL, HEARTBEAT_INTERVAL_SECS and USER_ID, each with
    a built-in default, then blocks in the sidecar's run loop.
    """
    env = os.environ
    settings = {
        'redis_host': env.get('REDIS_HOST', 'tower-service'),
        'redis_port': int(env.get('REDIS_PORT', '6379')),
        'channel': env.get('MONITOR_CHANNEL', 'drone_command'),
        'service_name': env.get('SERVICE_NAME', 'tower-sidecar'),
        'otel_endpoint': env.get('OTEL_COLLECTOR_BASE_URL', 'http://localhost:4318'),
        'heartbeat_interval': int(env.get('HEARTBEAT_INTERVAL_SECS', '60')),
        'user_id': env.get('USER_ID', 'default-user'),
    }
    UnifiedTowerSidecar(**settings).run()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment