Created
August 19, 2025 05:17
-
-
Save enchance/75547e269b4696b1ed2c4806d5cdb39a to your computer and use it in GitHub Desktop.
FastAPI integration for writing to Loki
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os, time, logging, uuid | |
| from logging_loki import LokiHandler, emitter | |
| from starlette.middleware.base import BaseHTTPMiddleware | |
| from contextvars import ContextVar | |
| # Context variables for request data | |
| request_context: ContextVar[dict] = ContextVar('request_context', default={}) | |
| last_log_message: ContextVar[str] = ContextVar('last_log_message', default="Request completed") | |
| class LokiMiddleware(BaseHTTPMiddleware): | |
| async def dispatch(self, request, call_next): | |
| # start_time = time.time() | |
| request_id = str(uuid.uuid4()) | |
| # Set context for this request | |
| context_data = { | |
| "request_id": request_id, | |
| "method": request.method, | |
| "path": request.url.path, | |
| "scheme": str(request.url.scheme) | |
| } | |
| request_context.set(context_data) | |
| response = await call_next(request) | |
| status_code = response.status_code | |
| # Log request completion with all data including status_code | |
| # duration_ms = int((time.time() - start_time) * 1000) | |
| final_context = context_data.copy() | |
| final_context.update({ | |
| "status_code": str(status_code), | |
| # "duration_ms": str(duration_ms) | |
| }) | |
| # request_context.set(final_context) | |
| # Determine log level | |
| if status_code >= 500: | |
| level = logging.ERROR | |
| message = f'{status_code} HTTP request failed' | |
| elif status_code >= 400: | |
| level = logging.WARNING | |
| message = f'{status_code} HTTP request warning' | |
| # elif duration_ms > self.slow_request_threshold: | |
| # level = logging.WARNING | |
| # message = "Slow HTTP request" | |
| else: | |
| level = logging.INFO | |
| message = f'{status_code} HTTP request completed' | |
| # Temporarily update context for completion log | |
| request_context.set(final_context) | |
| # logger.info(f'{final_context["status_code"]} Request completed') | |
| logger.log(level, message) | |
| return response | |
| class ContextLokiHandler(LokiHandler): | |
| def emit(self, record): | |
| last_log_message.set(record.getMessage()) | |
| context_data = request_context.get({}) | |
| original_tags = self.emitter.tags.copy() | |
| # Add extra fields from the log record | |
| if hasattr(record, '__dict__'): | |
| for key, value in record.__dict__.items(): | |
| if key not in ['name', 'msg', 'args', 'levelname', 'levelno', 'pathname', 'filename', 'module', | |
| 'exc_info', 'exc_text', 'stack_info', 'lineno', 'funcName', 'created', 'msecs', | |
| 'relativeCreated', 'thread', 'threadName', 'processName', 'process', 'getMessage']: | |
| context_data[key] = str(value) | |
| self.emitter.tags.update(context_data) | |
| super().emit(record) | |
| self.emitter.tags = original_tags | |
| # Updated handler setup | |
| emitter.LokiEmitter.level_tag = "level" | |
| handler = ContextLokiHandler( | |
| url=os.getenv('LOKI_URL'), | |
| tags={ | |
| "app": s.APPNAME, | |
| "env": s.ENV, | |
| # "service": 'api', | |
| # Add custom labels here: | |
| # "region": "us-east-1", | |
| # "version": "1.0.0" | |
| }, | |
| version="1" | |
| ) | |
| handler.setLevel(logging.INFO) | |
| logger = logging.getLogger('heimdall') | |
| logger.setLevel(logging.INFO) | |
| logger.addHandler(handler) | |
| # FastAPI: | |
| app = FastAPI() | |
| app.add_middleware(LokiMiddleware) # noqa | |
| # Usage | |
| @app.get('/') | |
| async def index(): | |
| logger.warning("You shall not pass!", extra={"user_id": 123, "validation_error": "You are ugly"}) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment