Skip to content

Instantly share code, notes, and snippets.

@avaitla
Created March 12, 2026 22:05
Show Gist options
  • Select an option

  • Save avaitla/8e383625ea17fc3b724006796be323fd to your computer and use it in GitHub Desktop.

Select an option

Save avaitla/8e383625ea17fc3b724006796be323fd to your computer and use it in GitHub Desktop.
Send test traces / logs / metrics to grpc endpoint
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "opentelemetry-api",
# "opentelemetry-sdk",
# "opentelemetry-exporter-otlp-proto-grpc",
# ]
# ///
"""Send example OTLP traces, logs, and metrics to a gRPC endpoint."""
import argparse
import logging
import shlex
import sys
import time
import random
from contextlib import nullcontext as _noop_ctx
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry._logs import set_logger_provider
# ANSI colors
RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
BLUE = "\033[34m"
MAGENTA = "\033[35m"
CYAN = "\033[36m"
WHITE = "\033[37m"
GRAY = "\033[90m"
def parse_args():
parser = argparse.ArgumentParser(description="Send example OTLP telemetry")
parser.add_argument("--grpc", default=None, help="gRPC endpoint (host:port)")
parser.add_argument("--logs", action="store_true", help="Send example logs")
parser.add_argument("--traces", action="store_true", help="Send example traces")
parser.add_argument("--metrics", action="store_true", help="Send example metrics")
parser.add_argument("--count", type=int, default=None, help="Number of example requests to simulate")
parser.add_argument("--sleep", type=float, default=None, help="Sleep between sends in seconds (default: 0.5)")
return parser.parse_args()
def prompt_interactive():
"""Ask the user for configuration and return the equivalent CLI args."""
print(f"{BOLD}No flags provided — entering interactive mode.{RESET}\n")
grpc = input("gRPC endpoint (host:port): ").strip()
if not grpc:
print("Endpoint is required.")
sys.exit(1)
print("\nWhich signals to send?")
send_traces = input(" Send traces? [Y/n]: ").strip().lower() not in ("n", "no")
send_logs = input(" Send logs? [Y/n]: ").strip().lower() not in ("n", "no")
send_metrics = input(" Send metrics? [Y/n]: ").strip().lower() not in ("n", "no")
if not (send_traces or send_logs or send_metrics):
print("Nothing selected.")
sys.exit(1)
count_str = input("\nNumber of example requests [15]: ").strip()
count = int(count_str) if count_str else 15
sleep_str = input("Sleep between sends in seconds [0.5]: ").strip()
sleep_s = float(sleep_str) if sleep_str else 0.5
# Build and print the reusable command
parts = ["uv run send_otlp.py", f"--grpc={shlex.quote(grpc)}"]
if send_traces:
parts.append("--traces")
if send_logs:
parts.append("--logs")
if send_metrics:
parts.append("--metrics")
if count != 15:
parts.append(f"--count={count}")
if sleep_s != 0.5:
parts.append(f"--sleep={sleep_s}")
cmd = " ".join(parts)
print(f"\n{DIM}Run again with:{RESET}\n {BOLD}{cmd}{RESET}\n")
return argparse.Namespace(
grpc=grpc,
traces=send_traces,
logs=send_logs,
metrics=send_metrics,
count=count,
sleep=sleep_s,
)
def needs_interactive(args):
"""True if no meaningful flags were provided."""
return args.grpc is None and not args.traces and not args.logs and not args.metrics
def run(args):
endpoint = f"http://{args.grpc}"
sleep_s = args.sleep if args.sleep is not None else 0.5
if not (args.logs or args.traces or args.metrics):
print("Nothing to send. Specify at least one of --logs, --traces, --metrics")
return
resource = Resource.create({
"service.name": "example-web-api",
"service.version": "1.2.3",
"service.instance.id": "instance-001",
"deployment.environment": "staging",
})
tracer = None
meter = None
logger = None
tracer_provider = None
logger_provider = None
meter_provider = None
# --- Traces ---
if args.traces:
tracer_provider = TracerProvider(resource=resource)
tracer_provider.add_span_processor(
BatchSpanProcessor(OTLPSpanExporter(endpoint=endpoint, insecure=True))
)
trace.set_tracer_provider(tracer_provider)
tracer = trace.get_tracer("example-tracer", "0.1.0")
print(f"{CYAN}Traces -> {endpoint}{RESET}")
# --- Metrics ---
if args.metrics:
metric_reader = PeriodicExportingMetricReader(
OTLPMetricExporter(endpoint=endpoint, insecure=True),
export_interval_millis=5000,
)
meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
metrics.set_meter_provider(meter_provider)
meter = metrics.get_meter("example-meter", "0.1.0")
print(f"{MAGENTA}Metrics -> {endpoint}{RESET}")
# --- Logs ---
if args.logs:
logger_provider = LoggerProvider(resource=resource)
logger_provider.add_log_record_processor(
BatchLogRecordProcessor(OTLPLogExporter(endpoint=endpoint, insecure=True))
)
set_logger_provider(logger_provider)
handler = LoggingHandler(level=logging.DEBUG, logger_provider=logger_provider)
logger = logging.getLogger("example-app")
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
print(f"{GREEN}Logs -> {endpoint}{RESET}")
# Create metric instruments
request_counter = None
request_duration = None
active_connections = None
if meter:
request_counter = meter.create_counter(
"http.server.request.count",
description="Number of HTTP requests",
unit="requests",
)
request_duration = meter.create_histogram(
"http.server.request.duration",
description="HTTP request duration",
unit="ms",
)
active_connections = meter.create_up_down_counter(
"http.server.active_connections",
description="Number of active connections",
unit="connections",
)
# --- Generate telemetry ---
endpoints = ["/api/users", "/api/orders", "/api/products", "/api/health", "/api/auth/login"]
http_methods = ["GET", "POST", "PUT", "DELETE"]
status_codes = [200, 200, 200, 201, 204, 400, 404, 500]
count = args.count or 15
print(f"\n{BOLD}Generating {count} example requests (sleep={sleep_s}s)...{RESET}\n")
for i in range(count):
ep = random.choice(endpoints)
method = random.choice(http_methods)
status = random.choice(status_codes)
duration_ms = random.uniform(5, 500)
attrs = {"http.method": method, "http.route": ep, "http.status_code": status}
# Trace: parent span with child spans
ctx_mgr = tracer.start_as_current_span(
f"{method} {ep}",
attributes={
"http.method": method,
"http.url": f"https://api.example.com{ep}",
"http.status_code": status,
"http.route": ep,
},
) if tracer else _noop_ctx()
with ctx_mgr as parent_span:
if tracer:
trace_id = format(parent_span.get_span_context().trace_id, '032x')
print(f" {CYAN}{BOLD}[TRACE]{RESET} {CYAN}{method} {ep} -> {status} {DIM}(trace_id={trace_id}){RESET}")
# Metrics
if meter:
request_counter.add(1, attrs)
request_duration.record(duration_ms, {"http.method": method, "http.route": ep})
print(f" {MAGENTA}[METRIC]{RESET} {MAGENTA}http.server.request.count +1 | http.server.request.duration={duration_ms:.1f}ms{RESET}")
# Child span: auth middleware
if tracer:
with tracer.start_as_current_span(
"auth.middleware",
attributes={"auth.method": "jwt", "auth.success": status != 401},
):
time.sleep(random.uniform(0.001, 0.01))
# Child span: database query
if tracer:
with tracer.start_as_current_span(
"db.query",
attributes={
"db.system": "postgresql",
"db.statement": f"SELECT * FROM {ep.split('/')[-1]} WHERE id = $1",
"db.name": "app_db",
},
) as db_span:
time.sleep(random.uniform(0.005, 0.02))
if status == 500:
db_span.set_status(trace.StatusCode.ERROR, "Query timeout")
db_span.record_exception(Exception("Connection pool exhausted"))
# Child span: cache lookup
if tracer:
with tracer.start_as_current_span(
"cache.lookup",
attributes={
"cache.system": "redis",
"cache.hit": random.choice([True, False]),
"cache.key": f"{ep.split('/')[-1]}:123",
},
):
time.sleep(random.uniform(0.001, 0.005))
# Logs
if logger:
if status >= 500:
msg = f"Internal server error on {method} {ep}: status={status}"
logger.error(msg, extra={"http.method": method, "http.route": ep, "error.type": "ServerError"})
print(f" {RED}{BOLD}[LOG] ERROR:{RESET} {RED}{msg}{RESET}")
elif status >= 400:
msg = f"Client error on {method} {ep}: status={status}"
logger.warning(msg, extra={"http.method": method, "http.route": ep})
print(f" {YELLOW}{BOLD}[LOG] WARN:{RESET} {YELLOW}{msg}{RESET}")
else:
msg = f"Request completed: {method} {ep} -> {status} in {duration_ms:.1f}ms"
logger.info(msg, extra={"http.method": method, "http.route": ep, "http.status_code": status})
print(f" {GREEN}{BOLD}[LOG] INFO:{RESET} {GREEN}{msg}{RESET}")
if meter:
delta = random.choice([-1, 1])
active_connections.add(delta)
sign = "+" if delta > 0 else ""
print(f" {MAGENTA}[METRIC]{RESET} {MAGENTA}http.server.active_connections {sign}{delta}{RESET}")
print() # blank line after each request
if i < count - 1:
time.sleep(sleep_s)
# Extra standalone logs
if logger:
standalone_logs = [
(logger.info, "INFO", "Application started successfully", {"app.version": "1.2.3"}),
(logger.warning, "WARN", "Cache hit ratio below threshold: 45%", {"cache.hit_ratio": 0.45}),
(logger.error, "ERROR", "Failed to connect to payment gateway", {"payment.provider": "stripe", "retry.count": 3}),
(logger.debug, "DEBUG", "Configuration loaded from environment", {"config.source": "env"}),
(logger.info, "INFO", "Scheduled job 'cleanup_sessions' completed", {"job.name": "cleanup_sessions", "job.duration_ms": 1234}),
]
log_colors = {"INFO": GREEN, "WARN": YELLOW, "ERROR": RED, "DEBUG": GRAY}
for log_fn, level, msg, extra in standalone_logs:
log_fn(msg, extra=extra)
c = log_colors.get(level, WHITE)
print(f" {c}{BOLD}[LOG] {level}:{RESET} {c}{msg}{RESET}")
# Flush
print(f"\n{DIM}Flushing...{RESET}")
if tracer_provider:
tracer_provider.force_flush()
if logger_provider:
logger_provider.force_flush()
if meter_provider:
meter_provider.force_flush()
print(f"{GREEN}{BOLD}Done!{RESET}")
def main():
args = parse_args()
if needs_interactive(args):
args = prompt_interactive()
elif args.grpc is None:
print("Error: --grpc is required when using flags.")
sys.exit(1)
run(args)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment