Created
March 12, 2026 22:05
-
-
Save avaitla/8e383625ea17fc3b724006796be323fd to your computer and use it in GitHub Desktop.
Send test traces / logs / metrics to grpc endpoint
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env -S uv run --script | |
| # /// script | |
| # requires-python = ">=3.11" | |
| # dependencies = [ | |
| # "opentelemetry-api", | |
| # "opentelemetry-sdk", | |
| # "opentelemetry-exporter-otlp-proto-grpc", | |
| # ] | |
| # /// | |
| """Send example OTLP traces, logs, and metrics to a gRPC endpoint.""" | |
| import argparse | |
| import logging | |
| import shlex | |
| import sys | |
| import time | |
| import random | |
| from contextlib import nullcontext as _noop_ctx | |
| from opentelemetry import trace, metrics | |
| from opentelemetry.sdk.trace import TracerProvider | |
| from opentelemetry.sdk.trace.export import BatchSpanProcessor | |
| from opentelemetry.sdk.metrics import MeterProvider | |
| from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader | |
| from opentelemetry.sdk.resources import Resource | |
| from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter | |
| from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter | |
| from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler | |
| from opentelemetry.sdk._logs.export import BatchLogRecordProcessor | |
| from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter | |
| from opentelemetry._logs import set_logger_provider | |
| # ANSI colors | |
| RESET = "\033[0m" | |
| BOLD = "\033[1m" | |
| DIM = "\033[2m" | |
| RED = "\033[31m" | |
| GREEN = "\033[32m" | |
| YELLOW = "\033[33m" | |
| BLUE = "\033[34m" | |
| MAGENTA = "\033[35m" | |
| CYAN = "\033[36m" | |
| WHITE = "\033[37m" | |
| GRAY = "\033[90m" | |
| def parse_args(): | |
| parser = argparse.ArgumentParser(description="Send example OTLP telemetry") | |
| parser.add_argument("--grpc", default=None, help="gRPC endpoint (host:port)") | |
| parser.add_argument("--logs", action="store_true", help="Send example logs") | |
| parser.add_argument("--traces", action="store_true", help="Send example traces") | |
| parser.add_argument("--metrics", action="store_true", help="Send example metrics") | |
| parser.add_argument("--count", type=int, default=None, help="Number of example requests to simulate") | |
| parser.add_argument("--sleep", type=float, default=None, help="Sleep between sends in seconds (default: 0.5)") | |
| return parser.parse_args() | |
| def prompt_interactive(): | |
| """Ask the user for configuration and return the equivalent CLI args.""" | |
| print(f"{BOLD}No flags provided — entering interactive mode.{RESET}\n") | |
| grpc = input("gRPC endpoint (host:port): ").strip() | |
| if not grpc: | |
| print("Endpoint is required.") | |
| sys.exit(1) | |
| print("\nWhich signals to send?") | |
| send_traces = input(" Send traces? [Y/n]: ").strip().lower() not in ("n", "no") | |
| send_logs = input(" Send logs? [Y/n]: ").strip().lower() not in ("n", "no") | |
| send_metrics = input(" Send metrics? [Y/n]: ").strip().lower() not in ("n", "no") | |
| if not (send_traces or send_logs or send_metrics): | |
| print("Nothing selected.") | |
| sys.exit(1) | |
| count_str = input("\nNumber of example requests [15]: ").strip() | |
| count = int(count_str) if count_str else 15 | |
| sleep_str = input("Sleep between sends in seconds [0.5]: ").strip() | |
| sleep_s = float(sleep_str) if sleep_str else 0.5 | |
| # Build and print the reusable command | |
| parts = ["uv run send_otlp.py", f"--grpc={shlex.quote(grpc)}"] | |
| if send_traces: | |
| parts.append("--traces") | |
| if send_logs: | |
| parts.append("--logs") | |
| if send_metrics: | |
| parts.append("--metrics") | |
| if count != 15: | |
| parts.append(f"--count={count}") | |
| if sleep_s != 0.5: | |
| parts.append(f"--sleep={sleep_s}") | |
| cmd = " ".join(parts) | |
| print(f"\n{DIM}Run again with:{RESET}\n {BOLD}{cmd}{RESET}\n") | |
| return argparse.Namespace( | |
| grpc=grpc, | |
| traces=send_traces, | |
| logs=send_logs, | |
| metrics=send_metrics, | |
| count=count, | |
| sleep=sleep_s, | |
| ) | |
| def needs_interactive(args): | |
| """True if no meaningful flags were provided.""" | |
| return args.grpc is None and not args.traces and not args.logs and not args.metrics | |
| def run(args): | |
| endpoint = f"http://{args.grpc}" | |
| sleep_s = args.sleep if args.sleep is not None else 0.5 | |
| if not (args.logs or args.traces or args.metrics): | |
| print("Nothing to send. Specify at least one of --logs, --traces, --metrics") | |
| return | |
| resource = Resource.create({ | |
| "service.name": "example-web-api", | |
| "service.version": "1.2.3", | |
| "service.instance.id": "instance-001", | |
| "deployment.environment": "staging", | |
| }) | |
| tracer = None | |
| meter = None | |
| logger = None | |
| tracer_provider = None | |
| logger_provider = None | |
| meter_provider = None | |
| # --- Traces --- | |
| if args.traces: | |
| tracer_provider = TracerProvider(resource=resource) | |
| tracer_provider.add_span_processor( | |
| BatchSpanProcessor(OTLPSpanExporter(endpoint=endpoint, insecure=True)) | |
| ) | |
| trace.set_tracer_provider(tracer_provider) | |
| tracer = trace.get_tracer("example-tracer", "0.1.0") | |
| print(f"{CYAN}Traces -> {endpoint}{RESET}") | |
| # --- Metrics --- | |
| if args.metrics: | |
| metric_reader = PeriodicExportingMetricReader( | |
| OTLPMetricExporter(endpoint=endpoint, insecure=True), | |
| export_interval_millis=5000, | |
| ) | |
| meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader]) | |
| metrics.set_meter_provider(meter_provider) | |
| meter = metrics.get_meter("example-meter", "0.1.0") | |
| print(f"{MAGENTA}Metrics -> {endpoint}{RESET}") | |
| # --- Logs --- | |
| if args.logs: | |
| logger_provider = LoggerProvider(resource=resource) | |
| logger_provider.add_log_record_processor( | |
| BatchLogRecordProcessor(OTLPLogExporter(endpoint=endpoint, insecure=True)) | |
| ) | |
| set_logger_provider(logger_provider) | |
| handler = LoggingHandler(level=logging.DEBUG, logger_provider=logger_provider) | |
| logger = logging.getLogger("example-app") | |
| logger.setLevel(logging.DEBUG) | |
| logger.addHandler(handler) | |
| print(f"{GREEN}Logs -> {endpoint}{RESET}") | |
| # Create metric instruments | |
| request_counter = None | |
| request_duration = None | |
| active_connections = None | |
| if meter: | |
| request_counter = meter.create_counter( | |
| "http.server.request.count", | |
| description="Number of HTTP requests", | |
| unit="requests", | |
| ) | |
| request_duration = meter.create_histogram( | |
| "http.server.request.duration", | |
| description="HTTP request duration", | |
| unit="ms", | |
| ) | |
| active_connections = meter.create_up_down_counter( | |
| "http.server.active_connections", | |
| description="Number of active connections", | |
| unit="connections", | |
| ) | |
| # --- Generate telemetry --- | |
| endpoints = ["/api/users", "/api/orders", "/api/products", "/api/health", "/api/auth/login"] | |
| http_methods = ["GET", "POST", "PUT", "DELETE"] | |
| status_codes = [200, 200, 200, 201, 204, 400, 404, 500] | |
| count = args.count or 15 | |
| print(f"\n{BOLD}Generating {count} example requests (sleep={sleep_s}s)...{RESET}\n") | |
| for i in range(count): | |
| ep = random.choice(endpoints) | |
| method = random.choice(http_methods) | |
| status = random.choice(status_codes) | |
| duration_ms = random.uniform(5, 500) | |
| attrs = {"http.method": method, "http.route": ep, "http.status_code": status} | |
| # Trace: parent span with child spans | |
| ctx_mgr = tracer.start_as_current_span( | |
| f"{method} {ep}", | |
| attributes={ | |
| "http.method": method, | |
| "http.url": f"https://api.example.com{ep}", | |
| "http.status_code": status, | |
| "http.route": ep, | |
| }, | |
| ) if tracer else _noop_ctx() | |
| with ctx_mgr as parent_span: | |
| if tracer: | |
| trace_id = format(parent_span.get_span_context().trace_id, '032x') | |
| print(f" {CYAN}{BOLD}[TRACE]{RESET} {CYAN}{method} {ep} -> {status} {DIM}(trace_id={trace_id}){RESET}") | |
| # Metrics | |
| if meter: | |
| request_counter.add(1, attrs) | |
| request_duration.record(duration_ms, {"http.method": method, "http.route": ep}) | |
| print(f" {MAGENTA}[METRIC]{RESET} {MAGENTA}http.server.request.count +1 | http.server.request.duration={duration_ms:.1f}ms{RESET}") | |
| # Child span: auth middleware | |
| if tracer: | |
| with tracer.start_as_current_span( | |
| "auth.middleware", | |
| attributes={"auth.method": "jwt", "auth.success": status != 401}, | |
| ): | |
| time.sleep(random.uniform(0.001, 0.01)) | |
| # Child span: database query | |
| if tracer: | |
| with tracer.start_as_current_span( | |
| "db.query", | |
| attributes={ | |
| "db.system": "postgresql", | |
| "db.statement": f"SELECT * FROM {ep.split('/')[-1]} WHERE id = $1", | |
| "db.name": "app_db", | |
| }, | |
| ) as db_span: | |
| time.sleep(random.uniform(0.005, 0.02)) | |
| if status == 500: | |
| db_span.set_status(trace.StatusCode.ERROR, "Query timeout") | |
| db_span.record_exception(Exception("Connection pool exhausted")) | |
| # Child span: cache lookup | |
| if tracer: | |
| with tracer.start_as_current_span( | |
| "cache.lookup", | |
| attributes={ | |
| "cache.system": "redis", | |
| "cache.hit": random.choice([True, False]), | |
| "cache.key": f"{ep.split('/')[-1]}:123", | |
| }, | |
| ): | |
| time.sleep(random.uniform(0.001, 0.005)) | |
| # Logs | |
| if logger: | |
| if status >= 500: | |
| msg = f"Internal server error on {method} {ep}: status={status}" | |
| logger.error(msg, extra={"http.method": method, "http.route": ep, "error.type": "ServerError"}) | |
| print(f" {RED}{BOLD}[LOG] ERROR:{RESET} {RED}{msg}{RESET}") | |
| elif status >= 400: | |
| msg = f"Client error on {method} {ep}: status={status}" | |
| logger.warning(msg, extra={"http.method": method, "http.route": ep}) | |
| print(f" {YELLOW}{BOLD}[LOG] WARN:{RESET} {YELLOW}{msg}{RESET}") | |
| else: | |
| msg = f"Request completed: {method} {ep} -> {status} in {duration_ms:.1f}ms" | |
| logger.info(msg, extra={"http.method": method, "http.route": ep, "http.status_code": status}) | |
| print(f" {GREEN}{BOLD}[LOG] INFO:{RESET} {GREEN}{msg}{RESET}") | |
| if meter: | |
| delta = random.choice([-1, 1]) | |
| active_connections.add(delta) | |
| sign = "+" if delta > 0 else "" | |
| print(f" {MAGENTA}[METRIC]{RESET} {MAGENTA}http.server.active_connections {sign}{delta}{RESET}") | |
| print() # blank line after each request | |
| if i < count - 1: | |
| time.sleep(sleep_s) | |
| # Extra standalone logs | |
| if logger: | |
| standalone_logs = [ | |
| (logger.info, "INFO", "Application started successfully", {"app.version": "1.2.3"}), | |
| (logger.warning, "WARN", "Cache hit ratio below threshold: 45%", {"cache.hit_ratio": 0.45}), | |
| (logger.error, "ERROR", "Failed to connect to payment gateway", {"payment.provider": "stripe", "retry.count": 3}), | |
| (logger.debug, "DEBUG", "Configuration loaded from environment", {"config.source": "env"}), | |
| (logger.info, "INFO", "Scheduled job 'cleanup_sessions' completed", {"job.name": "cleanup_sessions", "job.duration_ms": 1234}), | |
| ] | |
| log_colors = {"INFO": GREEN, "WARN": YELLOW, "ERROR": RED, "DEBUG": GRAY} | |
| for log_fn, level, msg, extra in standalone_logs: | |
| log_fn(msg, extra=extra) | |
| c = log_colors.get(level, WHITE) | |
| print(f" {c}{BOLD}[LOG] {level}:{RESET} {c}{msg}{RESET}") | |
| # Flush | |
| print(f"\n{DIM}Flushing...{RESET}") | |
| if tracer_provider: | |
| tracer_provider.force_flush() | |
| if logger_provider: | |
| logger_provider.force_flush() | |
| if meter_provider: | |
| meter_provider.force_flush() | |
| print(f"{GREEN}{BOLD}Done!{RESET}") | |
| def main(): | |
| args = parse_args() | |
| if needs_interactive(args): | |
| args = prompt_interactive() | |
| elif args.grpc is None: | |
| print("Error: --grpc is required when using flags.") | |
| sys.exit(1) | |
| run(args) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment