Created
January 7, 2026 15:13
-
-
Save isc/8e1c88b1f610a7e4de367e61ed313570 to your computer and use it in GitHub Desktop.
MCP Proxy for test-failures.doctolib.com - handles CloudFlare Access authentication
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| MCP Proxy for test-failures.doctolib.com | |
| Handles CloudFlare Access authentication automatically. | |
| """ | |
| import json | |
| import sys | |
| import os | |
| import subprocess | |
| import time | |
| import traceback | |
| from typing import Optional | |
| from pathlib import Path | |
| from urllib.request import Request, urlopen | |
| from urllib.error import URLError, HTTPError | |
| # Configuration | |
| MCP_URL = "https://test-failures.doctolib.com/mcp" | |
| TOKEN_CACHE_FILE = Path.home() / ".cache" / "doctolib" / "test-failures-cf-token" | |
| def log_error(message: str): | |
| """Log error messages to stderr so they don't interfere with MCP protocol.""" | |
| print(f"[test-failures-mcp-proxy] ERROR: {message}", file=sys.stderr, flush=True) | |
| def log_info(message: str): | |
| """Log info messages to stderr.""" | |
| print(f"[test-failures-mcp-proxy] {message}", file=sys.stderr, flush=True) | |
| def get_cloudflare_token() -> Optional[str]: | |
| """ | |
| Get CloudFlare Access token using cloudflared. | |
| Attempts to use cached token first, falls back to login flow. | |
| """ | |
| # Check if we have a cached token | |
| if TOKEN_CACHE_FILE.exists(): | |
| try: | |
| with open(TOKEN_CACHE_FILE, "r") as f: | |
| token_data = json.load(f) | |
| token = token_data.get("token") | |
| expires_at = token_data.get("expires_at", 0) | |
| # Check if token is still valid (with 5 minute buffer) | |
| if token and time.time() < expires_at - 300: | |
| log_info("Using cached CF Access token") | |
| return token | |
| except Exception as e: | |
| log_error(f"Failed to read cached token: {e}") | |
| # Need to get a new token | |
| log_info("Getting new CF Access token...") | |
| def try_get_token(): | |
| """Helper to attempt getting token.""" | |
| result = subprocess.run( | |
| [ | |
| "cloudflared", | |
| "access", | |
| "token", | |
| "-app=https://test-failures.doctolib.com", | |
| ], | |
| capture_output=True, | |
| text=True, | |
| timeout=60, | |
| ) | |
| return result | |
| try: | |
| # First attempt to get token | |
| result = try_get_token() | |
| # If failed due to missing login, try to login first | |
| if result.returncode != 0 and ( | |
| "Unable to find token" in result.stderr | |
| or "Please run login command" in result.stderr | |
| ): | |
| log_info("No CF Access token found, running login flow...") | |
| log_info( | |
| "A browser window will open for authentication. Please complete the login." | |
| ) | |
| # Run the login command | |
| login_result = subprocess.run( | |
| ["cloudflared", "access", "login", "https://test-failures.doctolib.com"], | |
| capture_output=True, | |
| text=True, | |
| timeout=120, | |
| ) | |
| if login_result.returncode != 0: | |
| log_error(f"cloudflared login failed: {login_result.stderr}") | |
| log_error( | |
| "Please manually run: cloudflared access login https://test-failures.doctolib.com" | |
| ) | |
| return None | |
| log_info("Login successful, getting token...") | |
| # Retry getting the token | |
| result = try_get_token() | |
| if result.returncode != 0: | |
| log_error(f"cloudflared failed: {result.stderr}") | |
| log_error( | |
| "Please try manually running: cloudflared access login https://test-failures.doctolib.com" | |
| ) | |
| return None | |
| token = result.stdout.strip() | |
| # Cache the token (tokens typically valid for 24 hours) | |
| TOKEN_CACHE_FILE.parent.mkdir(parents=True, exist_ok=True) | |
| with open(TOKEN_CACHE_FILE, "w") as f: | |
| json.dump( | |
| { | |
| "token": token, | |
| "expires_at": time.time() + 86400, # 24 hours | |
| }, | |
| f, | |
| ) | |
| TOKEN_CACHE_FILE.chmod(0o600) # Secure the file | |
| log_info("Successfully obtained CF Access token") | |
| return token | |
| except FileNotFoundError: | |
| log_error( | |
| "cloudflared not found. Please install it: brew install cloudflare/cloudflare/cloudflared" | |
| ) | |
| return None | |
| except subprocess.TimeoutExpired: | |
| log_error("cloudflared timeout - authentication may have failed") | |
| return None | |
| except Exception as e: | |
| log_error(f"Unexpected error getting token: {e}") | |
| return None | |
| def get_bearer_token() -> Optional[str]: | |
| """Get Bearer token from environment variable.""" | |
| token = os.environ.get("GITHUB_PERSONAL_ACCESS_TOKEN") or os.environ.get("HOMEBREW_GITHUB_API_TOKEN") | |
| if not token: | |
| log_error("GITHUB_PERSONAL_ACCESS_TOKEN or HOMEBREW_GITHUB_API_TOKEN environment variable not set") | |
| else: | |
| log_info(f"Bearer token found: {token[:10]}... (length: {len(token)})") | |
| return token | |
| def make_mcp_request(cf_token: str, bearer_token: str, json_rpc_request: dict) -> dict: | |
| """Forward MCP request to remote server with authentication.""" | |
| headers = { | |
| "Authorization": f"Bearer {bearer_token}", | |
| "cf-access-token": cf_token, | |
| "Content-Type": "application/json", | |
| "User-Agent": "mcp-proxy/1.0", | |
| } | |
| def _make_request(token: str) -> dict: | |
| """Helper to make the actual HTTP request.""" | |
| headers["cf-access-token"] = token | |
| data = json.dumps(json_rpc_request).encode("utf-8") | |
| request = Request(MCP_URL, data=data, headers=headers, method="POST") | |
| # Debug: log request details (truncate sensitive data) | |
| bearer_preview = bearer_token[:10] if bearer_token else "MISSING" | |
| cf_preview = token[:30] if token else "MISSING" | |
| log_info( | |
| f"Sending request - Bearer: {bearer_preview}... (len={len(bearer_token) if bearer_token else 0}), CF token: {cf_preview}... (len={len(token) if token else 0})" | |
| ) | |
| try: | |
| with urlopen(request, timeout=30) as response: | |
| response_data = response.read().decode("utf-8") | |
| log_info(f"Response status: {response.status}") | |
| log_info(f"Response length: {len(response_data)} bytes") | |
| # Log first 500 chars of response for debugging | |
| if len(response_data) > 0: | |
| preview = response_data[:500] | |
| log_info(f"Response preview: {preview}") | |
| else: | |
| log_error("Response is empty!") | |
| if not response_data.strip(): | |
| return { | |
| "jsonrpc": "2.0", | |
| "id": json_rpc_request.get("id"), | |
| "error": { | |
| "code": -32603, | |
| "message": "Server returned empty response", | |
| }, | |
| } | |
| # Check if CloudFlare returned a login page (expired token) | |
| if ( | |
| "<!DOCTYPE html>" in response_data | |
| or "Sign in ・ Cloudflare Access" in response_data | |
| or "<title>Cloudflare" in response_data | |
| ): | |
| log_info("Detected CloudFlare Access login page - token expired") | |
| return { | |
| "status_code": 403, | |
| "body": "CF Access token expired", | |
| "error": True, | |
| } | |
| return json.loads(response_data) | |
| except HTTPError as e: | |
| # Read error response body | |
| error_body = e.read().decode("utf-8") if e.fp else "" | |
| log_error(f"HTTP {e.code} error body: {error_body}") | |
| return {"status_code": e.code, "body": error_body, "error": True} | |
| try: | |
| result = _make_request(cf_token) | |
| # Handle CF Access challenge (any 403 should trigger token refresh) | |
| if result.get("error") and result.get("status_code") == 403: | |
| log_info("Got 403 response, refreshing CF Access token...") | |
| # Delete cached token and retry | |
| if TOKEN_CACHE_FILE.exists(): | |
| TOKEN_CACHE_FILE.unlink() | |
| new_cf_token = get_cloudflare_token() | |
| if new_cf_token: | |
| result = _make_request(new_cf_token) | |
| else: | |
| return { | |
| "jsonrpc": "2.0", | |
| "id": json_rpc_request.get("id"), | |
| "error": { | |
| "code": -32000, | |
| "message": "Failed to refresh CF Access token", | |
| }, | |
| } | |
| # Handle HTTP errors (not JSON-RPC errors which also have an "error" field) | |
| # HTTP errors have status_code, JSON-RPC errors have error.code | |
| if result.get("status_code"): | |
| status_code = result.get("status_code") | |
| body = result.get("body", "")[:200] | |
| return { | |
| "jsonrpc": "2.0", | |
| "id": json_rpc_request.get("id"), | |
| "error": {"code": -32000, "message": f"HTTP {status_code}: {body}"}, | |
| } | |
| return result | |
| except TimeoutError: | |
| return { | |
| "jsonrpc": "2.0", | |
| "id": json_rpc_request.get("id"), | |
| "error": { | |
| "code": -32000, | |
| "message": "Request timeout - test-failures MCP server did not respond", | |
| }, | |
| } | |
| except URLError as e: | |
| return { | |
| "jsonrpc": "2.0", | |
| "id": json_rpc_request.get("id"), | |
| "error": { | |
| "code": -32000, | |
| "message": f"Connection failed - cannot reach test-failures.doctolib.com: {e.reason}", | |
| }, | |
| } | |
| except json.JSONDecodeError as e: | |
| log_error(f"Failed to parse JSON response: {e}") | |
| return { | |
| "jsonrpc": "2.0", | |
| "id": json_rpc_request.get("id"), | |
| "error": { | |
| "code": -32603, | |
| "message": f"Invalid JSON response from server: {str(e)}", | |
| }, | |
| } | |
| except Exception as e: | |
| log_error(f"Unexpected error in make_mcp_request: {e}") | |
| log_error(traceback.format_exc()) | |
| return { | |
| "jsonrpc": "2.0", | |
| "id": json_rpc_request.get("id"), | |
| "error": {"code": -32603, "message": f"Internal error: {str(e)}"}, | |
| } | |
| def main(): | |
| """Main proxy loop - reads from stdin, writes to stdout.""" | |
| log_info("Starting test-failures MCP proxy") | |
| # Get initial tokens | |
| cf_token = get_cloudflare_token() | |
| if not cf_token: | |
| log_error("Failed to obtain CF Access token - exiting") | |
| sys.exit(1) | |
| bearer_token = get_bearer_token() | |
| if not bearer_token: | |
| log_error("Failed to obtain Bearer token - exiting") | |
| sys.exit(1) | |
| log_info("Proxy ready, waiting for MCP requests...") | |
| # Read JSON-RPC messages from stdin and forward to remote server | |
| for line in sys.stdin: | |
| try: | |
| request = json.loads(line.strip()) | |
| # Log request (but not full content to avoid spam) | |
| method = request.get("method", "unknown") | |
| request_id = request.get("id", "no-id") | |
| log_info(f"Forwarding request: {method} (id={request_id})") | |
| # Forward to remote MCP server | |
| response = make_mcp_request(cf_token, bearer_token, request) | |
| # Write response to stdout | |
| print(json.dumps(response), flush=True) | |
| except json.JSONDecodeError as e: | |
| log_error(f"Invalid JSON received: {e}") | |
| # Send error response | |
| error_response = { | |
| "jsonrpc": "2.0", | |
| "id": None, | |
| "error": {"code": -32700, "message": f"Parse error: {str(e)}"}, | |
| } | |
| print(json.dumps(error_response), flush=True) | |
| except Exception: | |
| log_error("Unexpected error processing request") | |
| log_error(traceback.format_exc()) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment