Skip to content

Instantly share code, notes, and snippets.

@jasonnerothin
Created November 6, 2025 18:28
Show Gist options
  • Select an option

  • Save jasonnerothin/0fde092475983ce1ba5e577689ad1038 to your computer and use it in GitHub Desktop.

Select an option

Save jasonnerothin/0fde092475983ce1ba5e577689ad1038 to your computer and use it in GitHub Desktop.
redis transport layer otlp trace provider
"""
Redis Tracer - Distributed tracing for Redis pub/sub messaging
Provides utilities to publish and subscribe to Redis channels with
W3C Trace Context propagation via message envelopes.
"""
import json
import time
from typing import Optional, Dict, Any, Callable
from uuid import uuid4
import redis
from opentelemetry import trace
from opentelemetry.trace import SpanKind
from opentelemetry.propagate import inject, extract
from common.threadsafe_otlp_traces import (
get_tracer,
set_messaging_producer_attributes,
set_messaging_consumer_attributes,
record_event,
record_exception
)
class MessageEnvelope:
"""
Message envelope with trace context for Redis pub/sub.
Format:
{
"trace_context": {
"traceparent": "00-trace_id-span_id-flags",
"tracestate": "..."
},
"metadata": {
"timestamp": "ISO-8601",
"source_service": "fpv-ops",
"target_service": "striker-002", # or None for broadcast
"channel": "request-lock",
"message_id": "uuid"
},
"payload": {
# Actual message content
}
}
"""
@staticmethod
def create(
payload: Dict[str, Any],
source_service: str,
channel: str,
target_service: Optional[str] = None,
message_id: Optional[str] = None
) -> Dict[str, Any]:
"""
Create a message envelope with trace context.
Args:
payload: Actual message data
source_service: Name of the publishing service
channel: Redis channel name
target_service: Target service name (None for broadcast)
message_id: Optional message ID (generated if not provided)
Returns:
Complete message envelope dict
"""
# Inject trace context into a carrier dict
carrier = {}
inject(carrier)
envelope = {
"trace_context": carrier,
"metadata": {
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"source_service": source_service,
"target_service": target_service,
"channel": channel,
"message_id": message_id or str(uuid4())
},
"payload": payload
}
return envelope
@staticmethod
def parse(message_data: str) -> Optional[Dict[str, Any]]:
"""
Parse message envelope from JSON string.
Args:
message_data: Raw message data from Redis
Returns:
Parsed envelope dict, or None if parsing fails
"""
try:
envelope = json.loads(message_data)
if not isinstance(envelope, dict):
return None
if 'trace_context' not in envelope:
return None
return envelope
except (json.JSONDecodeError, Exception):
return None
@staticmethod
def extract_context(envelope: Dict[str, Any]) -> Optional[trace.Context]:
"""
Extract OpenTelemetry context from envelope.
Args:
envelope: Parsed message envelope
Returns:
OpenTelemetry Context object, or None if extraction fails
"""
try:
trace_context = envelope.get('trace_context', {})
ctx = extract(trace_context)
return ctx
except Exception:
return None
def publish_with_trace(
tracer: trace.Tracer,
redis_client: redis.Redis,
channel: str,
payload: Dict[str, Any],
source_service: str,
target_service: Optional[str] = None,
span_name: Optional[str] = None,
message_id: Optional[str] = None
) -> str:
"""
Publish a message to Redis with trace context propagation.
Creates a PRODUCER span and wraps the payload in a trace envelope.
Args:
tracer: OpenTelemetry tracer instance
redis_client: Connected Redis client
channel: Redis channel to publish to
payload: Message payload (will be wrapped in envelope)
source_service: Name of the publishing service
target_service: Target service name (None for broadcast)
span_name: Optional span name (default: "publish_{channel}")
message_id: Optional message ID (generated if not provided)
Returns:
Message ID of the published message
Example:
from common.redis_tracer import publish_with_trace
msg_id = publish_with_trace(
tracer=tracer,
redis_client=redis_client,
channel="request-lock",
payload={"command": "lock", "requester": "fpv-ops"},
source_service="fpv-ops",
target_service="striker-002"
)
"""
span_name = span_name or f"publish_{channel}"
message_id = message_id or str(uuid4())
with tracer.start_as_current_span(
span_name,
kind=SpanKind.PRODUCER
) as span:
try:
# Set messaging attributes
set_messaging_producer_attributes(
span,
system="redis",
destination=channel,
message_id=message_id,
peer_service="tower"
)
# Add custom attributes
span.set_attribute("message.source_service", source_service)
if target_service:
span.set_attribute("message.target_service", target_service)
span.set_attribute("message.type", "unicast")
else:
span.set_attribute("message.type", "broadcast")
# Create envelope with trace context
envelope = MessageEnvelope.create(
payload=payload,
source_service=source_service,
channel=channel,
target_service=target_service,
message_id=message_id
)
# Publish to Redis
message_json = json.dumps(envelope)
redis_client.publish(channel, message_json)
# Record event
record_event(span, "message_published", {
"channel": channel,
"message_id": message_id,
"payload_size": len(message_json)
})
return message_id
except Exception as e:
record_exception(span, e)
raise
def subscribe_with_trace(
tracer: trace.Tracer,
redis_client: redis.Redis,
channels: list,
handler: Callable[[str, Dict[str, Any], trace.Span], None],
service_name: str,
use_pattern: bool = False
):
"""
Subscribe to Redis channels and handle messages with trace context extraction.
For each message:
1. Extracts trace context from envelope
2. Creates CONSUMER span with parent context
3. Calls handler with payload and span
Args:
tracer: OpenTelemetry tracer instance
redis_client: Connected Redis client
channels: List of channels to subscribe to
handler: Callback function(channel, payload, span)
service_name: Name of the subscribing service
use_pattern: If True, use psubscribe instead of subscribe
Example:
from common.redis_tracer import subscribe_with_trace
def handle_message(channel, payload, span):
print(f"Received on {channel}: {payload}")
span.set_attribute("processed", True)
subscribe_with_trace(
tracer=tracer,
redis_client=redis_client,
channels=["request-lock"],
handler=handle_message,
service_name="striker-002"
)
"""
pubsub = redis_client.pubsub()
# Subscribe to channels
if use_pattern:
for channel in channels:
pubsub.psubscribe(channel)
else:
for channel in channels:
pubsub.subscribe(channel)
try:
for message in pubsub.listen():
msg_type = message['type']
# Handle actual messages
if msg_type in ('message', 'pmessage'):
channel = message['channel']
data = message['data']
# Parse envelope
envelope = MessageEnvelope.parse(data)
if envelope is None:
# Not a traced message, skip or handle without tracing
continue
# Extract trace context
parent_context = MessageEnvelope.extract_context(envelope)
if parent_context is None:
# Invalid trace context, skip
continue
# Get metadata and payload
metadata = envelope.get('metadata', {})
payload = envelope.get('payload', {})
message_id = metadata.get('message_id', 'unknown')
source_service = metadata.get('source_service', 'unknown')
# Create CONSUMER span with parent context
span_name = f"consume_{channel}"
with tracer.start_as_current_span(
span_name,
context=parent_context,
kind=SpanKind.CONSUMER
) as span:
try:
# Set messaging attributes
set_messaging_consumer_attributes(
span,
system="redis",
source=channel,
message_id=message_id
)
# Add custom attributes
span.set_attribute("message.source_service", source_service)
span.set_attribute("message.consumer_service", service_name)
record_event(span, "message_received", {
"channel": channel,
"message_id": message_id
})
# Call handler with payload and span
handler(channel, payload, span)
record_event(span, "message_processed", {
"channel": channel,
"message_id": message_id
})
except Exception as e:
record_exception(span, e)
# Don't re-raise - continue processing other messages
except KeyboardInterrupt:
pass
finally:
pubsub.close()
class TracedRedisSubscriber:
"""
Context manager for traced Redis subscription.
Provides a cleaner interface for subscribing with tracing.
"""
def __init__(
self,
tracer: trace.Tracer,
redis_client: redis.Redis,
service_name: str
):
"""
Initialize traced subscriber.
Args:
tracer: OpenTelemetry tracer instance
redis_client: Connected Redis client
service_name: Name of the subscribing service
"""
self.tracer = tracer
self.redis_client = redis_client
self.service_name = service_name
self.pubsub = None
def subscribe(self, channels: list, use_pattern: bool = False):
"""
Subscribe to channels.
Args:
channels: List of channels to subscribe to
use_pattern: If True, use psubscribe
"""
self.pubsub = self.redis_client.pubsub()
if use_pattern:
for channel in channels:
self.pubsub.psubscribe(channel)
else:
for channel in channels:
self.pubsub.subscribe(channel)
def process_messages(self, handler: Callable[[str, Dict[str, Any], trace.Span], None]):
"""
Process messages with trace context extraction.
Args:
handler: Callback function(channel, payload, span)
"""
if not self.pubsub:
raise RuntimeError("Must call subscribe() first")
for message in self.pubsub.listen():
msg_type = message['type']
if msg_type in ('message', 'pmessage'):
channel = message['channel']
data = message['data']
envelope = MessageEnvelope.parse(data)
if envelope is None:
continue
parent_context = MessageEnvelope.extract_context(envelope)
if parent_context is None:
continue
metadata = envelope.get('metadata', {})
payload = envelope.get('payload', {})
message_id = metadata.get('message_id', 'unknown')
source_service = metadata.get('source_service', 'unknown')
span_name = f"consume_{channel}"
with self.tracer.start_as_current_span(
span_name,
context=parent_context,
kind=SpanKind.CONSUMER
) as span:
try:
set_messaging_consumer_attributes(
span,
system="redis",
source=channel,
message_id=message_id
)
span.set_attribute("message.source_service", source_service)
span.set_attribute("message.consumer_service", self.service_name)
record_event(span, "message_received", {
"channel": channel,
"message_id": message_id
})
handler(channel, payload, span)
record_event(span, "message_processed")
except Exception as e:
record_exception(span, e)
def close(self):
"""Close the subscription."""
if self.pubsub:
self.pubsub.close()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment