Created
November 6, 2025 18:28
-
-
Save jasonnerothin/0fde092475983ce1ba5e577689ad1038 to your computer and use it in GitHub Desktop.
redis transport layer otlp trace provider
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Redis Tracer - Distributed tracing for Redis pub/sub messaging | |
| Provides utilities to publish and subscribe to Redis channels with | |
| W3C Trace Context propagation via message envelopes. | |
| """ | |
| import json | |
| import time | |
| from typing import Optional, Dict, Any, Callable | |
| from uuid import uuid4 | |
| import redis | |
| from opentelemetry import trace | |
| from opentelemetry.trace import SpanKind | |
| from opentelemetry.propagate import inject, extract | |
| from common.threadsafe_otlp_traces import ( | |
| get_tracer, | |
| set_messaging_producer_attributes, | |
| set_messaging_consumer_attributes, | |
| record_event, | |
| record_exception | |
| ) | |
| class MessageEnvelope: | |
| """ | |
| Message envelope with trace context for Redis pub/sub. | |
| Format: | |
| { | |
| "trace_context": { | |
| "traceparent": "00-trace_id-span_id-flags", | |
| "tracestate": "..." | |
| }, | |
| "metadata": { | |
| "timestamp": "ISO-8601", | |
| "source_service": "fpv-ops", | |
| "target_service": "striker-002", # or None for broadcast | |
| "channel": "request-lock", | |
| "message_id": "uuid" | |
| }, | |
| "payload": { | |
| # Actual message content | |
| } | |
| } | |
| """ | |
| @staticmethod | |
| def create( | |
| payload: Dict[str, Any], | |
| source_service: str, | |
| channel: str, | |
| target_service: Optional[str] = None, | |
| message_id: Optional[str] = None | |
| ) -> Dict[str, Any]: | |
| """ | |
| Create a message envelope with trace context. | |
| Args: | |
| payload: Actual message data | |
| source_service: Name of the publishing service | |
| channel: Redis channel name | |
| target_service: Target service name (None for broadcast) | |
| message_id: Optional message ID (generated if not provided) | |
| Returns: | |
| Complete message envelope dict | |
| """ | |
| # Inject trace context into a carrier dict | |
| carrier = {} | |
| inject(carrier) | |
| envelope = { | |
| "trace_context": carrier, | |
| "metadata": { | |
| "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), | |
| "source_service": source_service, | |
| "target_service": target_service, | |
| "channel": channel, | |
| "message_id": message_id or str(uuid4()) | |
| }, | |
| "payload": payload | |
| } | |
| return envelope | |
| @staticmethod | |
| def parse(message_data: str) -> Optional[Dict[str, Any]]: | |
| """ | |
| Parse message envelope from JSON string. | |
| Args: | |
| message_data: Raw message data from Redis | |
| Returns: | |
| Parsed envelope dict, or None if parsing fails | |
| """ | |
| try: | |
| envelope = json.loads(message_data) | |
| if not isinstance(envelope, dict): | |
| return None | |
| if 'trace_context' not in envelope: | |
| return None | |
| return envelope | |
| except (json.JSONDecodeError, Exception): | |
| return None | |
| @staticmethod | |
| def extract_context(envelope: Dict[str, Any]) -> Optional[trace.Context]: | |
| """ | |
| Extract OpenTelemetry context from envelope. | |
| Args: | |
| envelope: Parsed message envelope | |
| Returns: | |
| OpenTelemetry Context object, or None if extraction fails | |
| """ | |
| try: | |
| trace_context = envelope.get('trace_context', {}) | |
| ctx = extract(trace_context) | |
| return ctx | |
| except Exception: | |
| return None | |
| def publish_with_trace( | |
| tracer: trace.Tracer, | |
| redis_client: redis.Redis, | |
| channel: str, | |
| payload: Dict[str, Any], | |
| source_service: str, | |
| target_service: Optional[str] = None, | |
| span_name: Optional[str] = None, | |
| message_id: Optional[str] = None | |
| ) -> str: | |
| """ | |
| Publish a message to Redis with trace context propagation. | |
| Creates a PRODUCER span and wraps the payload in a trace envelope. | |
| Args: | |
| tracer: OpenTelemetry tracer instance | |
| redis_client: Connected Redis client | |
| channel: Redis channel to publish to | |
| payload: Message payload (will be wrapped in envelope) | |
| source_service: Name of the publishing service | |
| target_service: Target service name (None for broadcast) | |
| span_name: Optional span name (default: "publish_{channel}") | |
| message_id: Optional message ID (generated if not provided) | |
| Returns: | |
| Message ID of the published message | |
| Example: | |
| from common.redis_tracer import publish_with_trace | |
| msg_id = publish_with_trace( | |
| tracer=tracer, | |
| redis_client=redis_client, | |
| channel="request-lock", | |
| payload={"command": "lock", "requester": "fpv-ops"}, | |
| source_service="fpv-ops", | |
| target_service="striker-002" | |
| ) | |
| """ | |
| span_name = span_name or f"publish_{channel}" | |
| message_id = message_id or str(uuid4()) | |
| with tracer.start_as_current_span( | |
| span_name, | |
| kind=SpanKind.PRODUCER | |
| ) as span: | |
| try: | |
| # Set messaging attributes | |
| set_messaging_producer_attributes( | |
| span, | |
| system="redis", | |
| destination=channel, | |
| message_id=message_id, | |
| peer_service="tower" | |
| ) | |
| # Add custom attributes | |
| span.set_attribute("message.source_service", source_service) | |
| if target_service: | |
| span.set_attribute("message.target_service", target_service) | |
| span.set_attribute("message.type", "unicast") | |
| else: | |
| span.set_attribute("message.type", "broadcast") | |
| # Create envelope with trace context | |
| envelope = MessageEnvelope.create( | |
| payload=payload, | |
| source_service=source_service, | |
| channel=channel, | |
| target_service=target_service, | |
| message_id=message_id | |
| ) | |
| # Publish to Redis | |
| message_json = json.dumps(envelope) | |
| redis_client.publish(channel, message_json) | |
| # Record event | |
| record_event(span, "message_published", { | |
| "channel": channel, | |
| "message_id": message_id, | |
| "payload_size": len(message_json) | |
| }) | |
| return message_id | |
| except Exception as e: | |
| record_exception(span, e) | |
| raise | |
| def subscribe_with_trace( | |
| tracer: trace.Tracer, | |
| redis_client: redis.Redis, | |
| channels: list, | |
| handler: Callable[[str, Dict[str, Any], trace.Span], None], | |
| service_name: str, | |
| use_pattern: bool = False | |
| ): | |
| """ | |
| Subscribe to Redis channels and handle messages with trace context extraction. | |
| For each message: | |
| 1. Extracts trace context from envelope | |
| 2. Creates CONSUMER span with parent context | |
| 3. Calls handler with payload and span | |
| Args: | |
| tracer: OpenTelemetry tracer instance | |
| redis_client: Connected Redis client | |
| channels: List of channels to subscribe to | |
| handler: Callback function(channel, payload, span) | |
| service_name: Name of the subscribing service | |
| use_pattern: If True, use psubscribe instead of subscribe | |
| Example: | |
| from common.redis_tracer import subscribe_with_trace | |
| def handle_message(channel, payload, span): | |
| print(f"Received on {channel}: {payload}") | |
| span.set_attribute("processed", True) | |
| subscribe_with_trace( | |
| tracer=tracer, | |
| redis_client=redis_client, | |
| channels=["request-lock"], | |
| handler=handle_message, | |
| service_name="striker-002" | |
| ) | |
| """ | |
| pubsub = redis_client.pubsub() | |
| # Subscribe to channels | |
| if use_pattern: | |
| for channel in channels: | |
| pubsub.psubscribe(channel) | |
| else: | |
| for channel in channels: | |
| pubsub.subscribe(channel) | |
| try: | |
| for message in pubsub.listen(): | |
| msg_type = message['type'] | |
| # Handle actual messages | |
| if msg_type in ('message', 'pmessage'): | |
| channel = message['channel'] | |
| data = message['data'] | |
| # Parse envelope | |
| envelope = MessageEnvelope.parse(data) | |
| if envelope is None: | |
| # Not a traced message, skip or handle without tracing | |
| continue | |
| # Extract trace context | |
| parent_context = MessageEnvelope.extract_context(envelope) | |
| if parent_context is None: | |
| # Invalid trace context, skip | |
| continue | |
| # Get metadata and payload | |
| metadata = envelope.get('metadata', {}) | |
| payload = envelope.get('payload', {}) | |
| message_id = metadata.get('message_id', 'unknown') | |
| source_service = metadata.get('source_service', 'unknown') | |
| # Create CONSUMER span with parent context | |
| span_name = f"consume_{channel}" | |
| with tracer.start_as_current_span( | |
| span_name, | |
| context=parent_context, | |
| kind=SpanKind.CONSUMER | |
| ) as span: | |
| try: | |
| # Set messaging attributes | |
| set_messaging_consumer_attributes( | |
| span, | |
| system="redis", | |
| source=channel, | |
| message_id=message_id | |
| ) | |
| # Add custom attributes | |
| span.set_attribute("message.source_service", source_service) | |
| span.set_attribute("message.consumer_service", service_name) | |
| record_event(span, "message_received", { | |
| "channel": channel, | |
| "message_id": message_id | |
| }) | |
| # Call handler with payload and span | |
| handler(channel, payload, span) | |
| record_event(span, "message_processed", { | |
| "channel": channel, | |
| "message_id": message_id | |
| }) | |
| except Exception as e: | |
| record_exception(span, e) | |
| # Don't re-raise - continue processing other messages | |
| except KeyboardInterrupt: | |
| pass | |
| finally: | |
| pubsub.close() | |
| class TracedRedisSubscriber: | |
| """ | |
| Context manager for traced Redis subscription. | |
| Provides a cleaner interface for subscribing with tracing. | |
| """ | |
| def __init__( | |
| self, | |
| tracer: trace.Tracer, | |
| redis_client: redis.Redis, | |
| service_name: str | |
| ): | |
| """ | |
| Initialize traced subscriber. | |
| Args: | |
| tracer: OpenTelemetry tracer instance | |
| redis_client: Connected Redis client | |
| service_name: Name of the subscribing service | |
| """ | |
| self.tracer = tracer | |
| self.redis_client = redis_client | |
| self.service_name = service_name | |
| self.pubsub = None | |
| def subscribe(self, channels: list, use_pattern: bool = False): | |
| """ | |
| Subscribe to channels. | |
| Args: | |
| channels: List of channels to subscribe to | |
| use_pattern: If True, use psubscribe | |
| """ | |
| self.pubsub = self.redis_client.pubsub() | |
| if use_pattern: | |
| for channel in channels: | |
| self.pubsub.psubscribe(channel) | |
| else: | |
| for channel in channels: | |
| self.pubsub.subscribe(channel) | |
| def process_messages(self, handler: Callable[[str, Dict[str, Any], trace.Span], None]): | |
| """ | |
| Process messages with trace context extraction. | |
| Args: | |
| handler: Callback function(channel, payload, span) | |
| """ | |
| if not self.pubsub: | |
| raise RuntimeError("Must call subscribe() first") | |
| for message in self.pubsub.listen(): | |
| msg_type = message['type'] | |
| if msg_type in ('message', 'pmessage'): | |
| channel = message['channel'] | |
| data = message['data'] | |
| envelope = MessageEnvelope.parse(data) | |
| if envelope is None: | |
| continue | |
| parent_context = MessageEnvelope.extract_context(envelope) | |
| if parent_context is None: | |
| continue | |
| metadata = envelope.get('metadata', {}) | |
| payload = envelope.get('payload', {}) | |
| message_id = metadata.get('message_id', 'unknown') | |
| source_service = metadata.get('source_service', 'unknown') | |
| span_name = f"consume_{channel}" | |
| with self.tracer.start_as_current_span( | |
| span_name, | |
| context=parent_context, | |
| kind=SpanKind.CONSUMER | |
| ) as span: | |
| try: | |
| set_messaging_consumer_attributes( | |
| span, | |
| system="redis", | |
| source=channel, | |
| message_id=message_id | |
| ) | |
| span.set_attribute("message.source_service", source_service) | |
| span.set_attribute("message.consumer_service", self.service_name) | |
| record_event(span, "message_received", { | |
| "channel": channel, | |
| "message_id": message_id | |
| }) | |
| handler(channel, payload, span) | |
| record_event(span, "message_processed") | |
| except Exception as e: | |
| record_exception(span, e) | |
| def close(self): | |
| """Close the subscription.""" | |
| if self.pubsub: | |
| self.pubsub.close() | |
| def __enter__(self): | |
| return self | |
| def __exit__(self, exc_type, exc_val, exc_tb): | |
| self.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment