@jasonnerothin · Created November 6, 2025 18:07
Redis sidecar supporting OTLP metrics and traces
#!/usr/bin/env python3
"""
Tower Sidecar - Unified Metrics and Distributed Tracing Bridge
Monitors Redis channels and provides:
1. Metrics: Message throughput, rates (existing functionality)
2. Traces: Distributed trace context propagation (new functionality)
Implements the three-span pattern for async messaging while maintaining
metrics collection.
"""
import os
import sys
import json
import time
import logging
from typing import Optional, Dict, Any
import redis
from opentelemetry import trace, metrics
from opentelemetry.trace import SpanKind
from opentelemetry.propagate import extract
# Metrics
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource as MetricResource
# Import our enhanced tracing module
from common.threadsafe_otlp_traces import (
    initialize_traces,
    get_tracer,
    record_event,
    set_messaging_consumer_attributes,
    set_messaging_producer_attributes,
    set_sidecar_attributes,
    shutdown_traces
)
# Configure logging
log_level = os.getenv('LOG_LEVEL', 'INFO').upper()
logging.basicConfig(
    level=getattr(logging, log_level, logging.INFO),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)]
)
logging.getLogger().handlers[0].flush = lambda: sys.stdout.flush()
logger = logging.getLogger(__name__)
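

# ---------------------------------------------------------------------------
# Expected envelope shape (illustrative only; field names are taken from the
# lookups in MessageEnvelope and process_message below, the values are
# placeholders). With the default W3C TraceContext propagator, trace_context
# carries a 'traceparent' entry:
#
#   {
#     "trace_context": {"traceparent": "00-<trace-id>-<span-id>-01"},
#     "metadata": {
#       "message_id": "abc-123",
#       "source_service": "publisher-service",
#       "target_service": "subscriber-service"   # omit or null for broadcast
#     },
#     ... application payload ...
#   }
# ---------------------------------------------------------------------------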
class MessageEnvelope:
    """Parse and handle message envelopes with trace context."""

    @staticmethod
    def parse(message_data: str) -> Optional[Dict[str, Any]]:
        """
        Parse a message envelope from a JSON string.

        Returns None if the message is not a valid envelope.
        """
        try:
            envelope = json.loads(message_data)
            if not isinstance(envelope, dict):
                return None
            if 'trace_context' not in envelope:
                return None
            return envelope
        except Exception:
            return None

    @staticmethod
    def extract_trace_context(envelope: Dict[str, Any]) -> Optional[trace.Context]:
        """Extract OpenTelemetry context from envelope."""
        try:
            trace_context = envelope.get('trace_context', {})
            ctx = extract(trace_context)
            return ctx
        except Exception as e:
            logger.debug(f"Failed to extract trace context: {e}")
            return None


class UnifiedTowerSidecar:
    """
    Unified sidecar that provides both metrics and distributed tracing.

    Metrics (existing):
    - redis.channel.messages.total (counter)
    - redis.channel.messages.rate (gauge)

    Traces (new):
    - CONSUMER span: tower receives from publisher
    - PRODUCER span: tower forwards to subscribers
    """

    def __init__(
        self,
        redis_host: str,
        redis_port: int,
        channel: str,
        service_name: str,
        otel_endpoint: str,
        heartbeat_interval: int = 60,
        user_id: str = "default-user"
    ):
        """
        Initialize the unified tower sidecar.

        Args:
            redis_host: Redis hostname
            redis_port: Redis port
            channel: Channel or pattern to monitor
            service_name: Service name for telemetry
            otel_endpoint: OTLP collector endpoint
            heartbeat_interval: Seconds between heartbeat logs
            user_id: User identifier for metrics
        """
        self.redis_host = redis_host
        self.redis_port = redis_port
        self.channel = channel
        self.service_name = service_name
        self.otel_endpoint = otel_endpoint
        self.heartbeat_interval = heartbeat_interval
        self.user_id = user_id

        # Statistics
        self.stats = {
            'messages_total': 0,
            'messages_with_trace': 0,
            'messages_without_trace': 0,
            'last_count': 0,
            'last_time': time.time()
        }

        # ========================================================
        # Initialize TRACES
        # ========================================================
        initialize_traces(
            service_name=service_name,
            exporter_endpoint=otel_endpoint,
            deployment_environment=os.getenv('DEPLOYMENT_ENV', 'production')
        )
        self.tracer = get_tracer(__name__)

        # ========================================================
        # Initialize METRICS
        # ========================================================
        metric_resource = MetricResource(attributes={
            "service.name": service_name,
            "service.version": "1.0.0",
            "user.id": user_id,
        })
        metric_reader = PeriodicExportingMetricReader(
            OTLPMetricExporter(endpoint=f"{otel_endpoint}/v1/metrics"),
            export_interval_millis=10000  # Export every 10 seconds
        )
        metric_provider = MeterProvider(
            resource=metric_resource,
            metric_readers=[metric_reader]
        )
        metrics.set_meter_provider(metric_provider)

        # Create metrics
        meter = metrics.get_meter("redis.monitor", version="1.0.0")
        self.message_counter = meter.create_counter(
            name="redis.channel.messages.total",
            description="Total number of messages on Redis channel",
            unit="messages"
        )

        # Message rate gauge with callback
        def get_message_rate(options):
            current_time = time.time()
            time_delta = current_time - self.stats['last_time']
            if time_delta > 0:
                count_delta = self.stats['messages_total'] - self.stats['last_count']
                rate = count_delta / time_delta
                self.stats['last_count'] = self.stats['messages_total']
                self.stats['last_time'] = current_time
                yield metrics.Observation(rate, {"channel": self.channel})

        self.message_rate = meter.create_observable_gauge(
            name="redis.channel.messages.rate",
            description="Message rate per second",
            unit="messages/s",
            callbacks=[get_message_rate]
        )

        logger.info("=" * 60)
        logger.info("Unified Tower Sidecar initialized")
        logger.info(f" Service: {service_name}")
        logger.info(f" Redis: {redis_host}:{redis_port}")
        logger.info(f" Channel: {channel}")
        logger.info(f" OTLP: {otel_endpoint}")
        logger.info(" Metrics: ✓ Enabled")
        logger.info(" Traces: ✓ Enabled")
        logger.info("=" * 60)

    def connect_redis(self, max_retries: int = 5, retry_delay: int = 5) -> redis.Redis:
        """Connect to Redis with retry logic."""
        for attempt in range(max_retries):
            try:
                client = redis.Redis(
                    host=self.redis_host,
                    port=self.redis_port,
                    decode_responses=True
                )
                client.ping()
                logger.info(f"✓ Connected to Redis at {self.redis_host}:{self.redis_port}")
                return client
            except redis.ConnectionError as e:
                if attempt < max_retries - 1:
                    logger.warning(
                        f"Failed to connect (attempt {attempt + 1}/{max_retries}): {e}"
                    )
                    logger.info(f"Retrying in {retry_delay} seconds...")
                    time.sleep(retry_delay)
                else:
                    logger.error(f"Could not connect after {max_retries} attempts")
                    raise

    def process_message(self, channel: str, message_data: str):
        """
        Process a single message: update metrics AND create trace spans.

        This function does both:
        1. Increments message counter (metrics)
        2. Creates CONSUMER → PRODUCER spans (traces)

        Args:
            channel: Redis channel name
            message_data: Raw message data
        """
        # ========================================================
        # METRICS: Always increment counter for all messages
        # ========================================================
        self.stats['messages_total'] += 1
        self.message_counter.add(1, {"channel": channel})

        # Log first message immediately
        if self.stats['messages_total'] == 1:
            logger.info(f"✓ First message received on {channel}")

        # Periodic logging
        if self.stats['messages_total'] % 10 == 0:
            logger.info(
                f"Processed {self.stats['messages_total']} messages "
                f"({self.stats['messages_with_trace']} traced)"
            )

        # ========================================================
        # TRACES: Create spans if message has trace context
        # ========================================================
        envelope = MessageEnvelope.parse(message_data)
        if envelope is None:
            # Non-envelope message: metrics recorded, no trace
            self.stats['messages_without_trace'] += 1
            logger.debug(f"Non-envelope message on {channel}: {message_data[:100]}")
            return

        parent_context = MessageEnvelope.extract_trace_context(envelope)
        if parent_context is None:
            # Envelope without valid trace: metrics recorded, no trace
            self.stats['messages_without_trace'] += 1
            logger.debug(f"Message without valid trace context on {channel}")
            return

        # Message has valid trace context: create trace spans
        self.stats['messages_with_trace'] += 1
        metadata = envelope.get('metadata', {})
        message_id = metadata.get('message_id', 'unknown')
        source_service = metadata.get('source_service', 'unknown')
        target_service = metadata.get('target_service')

        # ================================================================
        # TRACE SPAN 1: CONSUMER (tower receives from publisher)
        # ================================================================
        with self.tracer.start_as_current_span(
            f"tower_receive_{channel}",
            context=parent_context,
            kind=SpanKind.CONSUMER
        ) as consumer_span:
            set_messaging_consumer_attributes(
                consumer_span,
                system="redis",
                source=channel,
                message_id=message_id
            )
            set_sidecar_attributes(
                consumer_span,
                role="bridge",
                channel=channel,
                action="observe"
            )
            consumer_span.set_attribute("message.source_service", source_service)
            if target_service:
                consumer_span.set_attribute("message.target_service", target_service)
                consumer_span.set_attribute("message.type", "unicast")
            else:
                consumer_span.set_attribute("message.type", "broadcast")
            record_event(consumer_span, "message_received", {
                "channel": channel,
                "message_id": message_id
            })
            logger.debug(
                f"[TRACE] Received on {channel} from {source_service} "
                f"(msg_id: {message_id})"
            )

            # ================================================================
            # TRACE SPAN 2: PRODUCER (tower forwards to subscribers)
            # ================================================================
            with self.tracer.start_as_current_span(
                f"tower_forward_{channel}",
                kind=SpanKind.PRODUCER
            ) as producer_span:
                set_messaging_producer_attributes(
                    producer_span,
                    system="redis",
                    destination=channel,
                    message_id=message_id,
                    peer_service="tower"
                )
                set_sidecar_attributes(
                    producer_span,
                    role="bridge",
                    channel=channel,
                    action="forward"
                )
                record_event(producer_span, "message_forwarded", {
                    "channel": channel,
                    "message_id": message_id
                })
                logger.debug(f"[TRACE] Forwarded on {channel} (msg_id: {message_id})")

    def run(self):
        """Main sidecar loop: subscribe to channel and process messages."""
        logger.info("Starting unified metrics + tracing...")

        # Connect to Redis
        r = self.connect_redis()

        # Subscribe
        pubsub = r.pubsub()
        if '*' in self.channel or '?' in self.channel:
            pubsub.psubscribe(self.channel)
            logger.info(f"Subscribed to pattern: {self.channel}")
        else:
            pubsub.subscribe(self.channel)
            logger.info(f"Subscribed to channel: {self.channel}")

        logger.info("Monitoring messages...")
        last_heartbeat = time.time()

        try:
            for message in pubsub.listen():
                # Heartbeat
                current_time = time.time()
                if current_time - last_heartbeat > self.heartbeat_interval:
                    logger.info(
                        f"Heartbeat: {self.stats['messages_total']} messages "
                        f"({self.stats['messages_with_trace']} traced)"
                    )
                    last_heartbeat = current_time

                msg_type = message['type']
                if msg_type in ('message', 'pmessage'):
                    channel = message['channel']
                    data = message['data']
                    # Process: updates metrics AND creates traces
                    self.process_message(channel, data)
                elif msg_type in ('subscribe', 'psubscribe'):
                    pattern = message.get('pattern') or message.get('channel')
                    logger.info(f"✓ Subscribed to: {pattern}")
        except KeyboardInterrupt:
            logger.info("Shutting down...")
        except Exception as e:
            logger.error(f"Error: {e}", exc_info=True)
            raise
        finally:
            pubsub.close()
            shutdown_traces()
            logger.info("=" * 60)
            logger.info("Unified Sidecar Statistics:")
            logger.info(f" Total messages: {self.stats['messages_total']}")
            logger.info(f" Traced messages: {self.stats['messages_with_trace']}")
            logger.info(f" Untraced messages: {self.stats['messages_without_trace']}")
            logger.info("=" * 60)


def main():
    """Main entry point."""
    redis_host = os.getenv('REDIS_HOST', 'tower-service')
    redis_port = int(os.getenv('REDIS_PORT', '6379'))
    otel_endpoint = os.getenv('OTEL_COLLECTOR_BASE_URL', 'http://localhost:4318')
    channel = os.getenv('MONITOR_CHANNEL', 'drone_command')
    service_name = os.getenv('SERVICE_NAME', 'tower-sidecar')
    user_id = os.getenv('USER_ID', 'default-user')
    heartbeat_interval = int(os.getenv('HEARTBEAT_INTERVAL_SECS', '60'))

    sidecar = UnifiedTowerSidecar(
        redis_host=redis_host,
        redis_port=redis_port,
        channel=channel,
        service_name=service_name,
        otel_endpoint=otel_endpoint,
        heartbeat_interval=heartbeat_interval,
        user_id=user_id
    )
    sidecar.run()


if __name__ == "__main__":
    main()
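

# ---------------------------------------------------------------------------
# Example configuration (values are placeholders; defaults are the ones read
# in main() and at module scope above):
#
#   REDIS_HOST=tower-service
#   REDIS_PORT=6379
#   OTEL_COLLECTOR_BASE_URL=http://localhost:4318
#   MONITOR_CHANNEL=drone_command    # '*' or '?' switches to pattern subscribe
#   SERVICE_NAME=tower-sidecar
#   USER_ID=default-user
#   HEARTBEAT_INTERVAL_SECS=60
#   DEPLOYMENT_ENV=production
#   LOG_LEVEL=INFO
# ---------------------------------------------------------------------------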