Created
November 7, 2025 19:44
-
-
Save cab938/350761f4c66e6d23e7da9cb4888056b0 to your computer and use it in GitHub Desktop.
Logging for MCP client-server communication over stdio
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
Transparent stdio wrapper for MCP processes.

This helper launches a child process, forwards stdin, stdout, and stderr between
the parent and child, and tees every byte into timestamped log files. The wrapper
writes nothing of its own to stdout, so the MCP JSON exchange remains intact.
Optional verbose diagnostics go to stderr for troubleshooting.

Examples
--------
    python wrapper.py --log-base ripgrep docker compose run --rm -T mcp-ripgrep
"""
| from __future__ import annotations | |
| import argparse | |
| import contextlib | |
| import errno | |
| import os | |
| import selectors | |
| import signal | |
| import subprocess | |
| import sys | |
| import time | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import BinaryIO, Callable, Iterator, Optional, Sequence | |
@dataclass(frozen=True)
class LogPaths:
    """Immutable record of where the log files for one wrapper run live."""

    # Directory that contains the three per-stream log files.
    directory: Path
    # One log file path per standard stream.
    stdin: Path
    stdout: Path
    stderr: Path
@dataclass
class LogFiles:
    """Open binary log handles, one per standard stream."""

    # Each handle receives a raw copy of the bytes seen on that stream.
    stdin: BinaryIO
    stdout: BinaryIO
    stderr: BinaryIO
@dataclass
class StreamRoute:
    """Destination description for one stream read from the child."""

    # Log handle that receives a copy of every chunk.
    log: BinaryIO
    # Descriptor the chunk is forwarded to; None means "log only".
    target_fd: Optional[int]
    # Human-readable stream name used in diagnostic messages.
    label: str
def now_stamp(fmt: str) -> str:
    """Format the current local time according to the strftime pattern *fmt*."""
    # time.strftime() falls back to the current local time when no time
    # tuple is supplied, which is exactly what the timestamp needs.
    return time.strftime(fmt)
def safe_write(fd: int, data: bytes) -> bool:
    """Write all of *data* to the file descriptor *fd*.

    Short writes are retried until every byte has been accepted.  If the
    descriptor is in non-blocking mode and its buffer is full, the call waits
    until the descriptor becomes writable instead of failing.

    Args:
        fd: Destination file descriptor.
        data: Bytes to write; an empty payload is a no-op.

    Returns:
        True when everything was written, False when the peer is closed
        (EPIPE) or the descriptor is no longer valid (EBADF).

    Raises:
        OSError: For any other write failure.
    """
    view = memoryview(data)
    while view:
        try:
            written = os.write(fd, view)
        except InterruptedError:
            continue
        except BlockingIOError:
            # O_NONBLOCK lives on the open file description, so a descriptor
            # that shares it with a non-blocking stdin (for example a tty)
            # reports EAGAIN when its buffer is full.  Block until there is
            # room, then retry the remaining bytes.
            with selectors.DefaultSelector() as waiter:
                waiter.register(fd, selectors.EVENT_WRITE)
                waiter.select()
            continue
        except OSError as exc:
            if exc.errno in (errno.EPIPE, errno.EBADF):
                return False
            raise
        view = view[written:]
    return True
def debug(enabled: bool, message: str) -> None:
    """Print *message* to stderr, but only when *enabled* is true."""
    if not enabled:
        return
    print(message, file=sys.stderr)
def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse the wrapper's command line.

    Args:
        argv: Argument list to parse; None makes argparse read sys.argv[1:].

    Returns:
        Namespace with ``log_dir``, ``log_base``, ``ts_fmt``, ``verbose`` and
        ``cmd`` (the child command as a non-empty list of strings).

    Raises:
        SystemExit: Via ``parser.error`` when no child command is given, or
            on any other argparse usage error.
    """
    # An environment switch set to "0"/"false"/"no"/"off" means disabled;
    # plain bool() would treat every non-empty string as enabled.
    env_verbose = os.environ.get("WRAPPER_VERBOSE", "").strip().lower()
    verbose_default = env_verbose not in ("", "0", "false", "no", "off")

    parser = argparse.ArgumentParser(
        description="Forward stdio to a child process while teeing raw logs to disk."
    )
    parser.add_argument(
        "--log-dir",
        help="Directory for log files (defaults to LOG_DIR env var or ./mcp-logs).",
        default=None,
    )
    parser.add_argument(
        "--log-base",
        default=os.environ.get("LOG_BASENAME", "mcp"),
        help="Prefix for log files (default: %(default)s).",
    )
    parser.add_argument(
        "--ts-fmt",
        default=os.environ.get("TS_FMT", "%Y%m%d-%H%M%S"),
        help="Timestamp format applied to log filenames (default: %(default)s).",
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        default=verbose_default,
        help="Emit diagnostic information to stderr.",
    )
    parser.add_argument(
        "cmd",
        nargs=argparse.REMAINDER,
        metavar="CMD",
        help="Child command and arguments to execute.",
    )
    args = parser.parse_args(argv)
    # argparse keeps a leading "--" separator inside a REMAINDER positional;
    # drop it so "wrapper.py [options] -- cmd ..." runs "cmd", not "--".
    if args.cmd and args.cmd[0] == "--":
        args.cmd = args.cmd[1:]
    if not args.cmd:
        parser.error("a command to wrap is required")
    return args
def resolve_log_dir(raw: Optional[str]) -> Path:
    """Pick the log directory and make sure it exists.

    Precedence: the explicit *raw* value, then the ``LOG_DIR`` environment
    variable, then ``mcp-logs`` under the current working directory.  A
    leading ``~`` in a configured value is expanded.  Missing parent
    directories are created.
    """
    chosen = raw if raw else os.environ.get("LOG_DIR")
    if chosen:
        log_dir = Path(chosen).expanduser()
    else:
        log_dir = Path.cwd() / "mcp-logs"
    log_dir.mkdir(parents=True, exist_ok=True)
    return log_dir
def build_log_paths(log_dir: Path, base: str, ts_fmt: str) -> LogPaths:
    """Compute the three log file paths for this run.

    Every file is named ``<base>.<timestamp>.<in|out|err>.log`` inside
    *log_dir*; the timestamp is taken once so all three names share it.
    """
    prefix = f"{base}.{now_stamp(ts_fmt)}"
    return LogPaths(
        directory=log_dir,
        stdin=log_dir / f"{prefix}.in.log",
        stdout=log_dir / f"{prefix}.out.log",
        stderr=log_dir / f"{prefix}.err.log",
    )
def open_log_files(paths: LogPaths, stack: contextlib.ExitStack) -> LogFiles:
    """Open the stdin, stdout and stderr logs for unbuffered binary append.

    Each handle is registered on *stack*, so the files are closed when the
    caller's ExitStack unwinds.
    """
    handles = {
        stream: stack.enter_context(getattr(paths, stream).open("ab", buffering=0))
        for stream in ("stdin", "stdout", "stderr")
    }
    return LogFiles(**handles)
@contextlib.contextmanager
def forward_signals(child: subprocess.Popen[bytes], verbose: bool) -> Iterator[None]:
    """Relay termination-style signals to *child* while the context is active.

    SIGINT, SIGTERM, SIGHUP and SIGQUIT (whichever exist on this platform)
    get a handler that passes the signal on to the child.  The handlers that
    were installed before are put back when the context exits.
    """

    def relay(signum: int, _frame: Optional[object]) -> None:
        try:
            label = signal.Signals(signum).name
        except ValueError:
            label = str(signum)
        debug(verbose, f"Forwarding {label} to child.")
        try:
            child.send_signal(signum)
        except Exception as exc:  # pragma: no cover - defensive
            debug(verbose, f"Failed to forward signal {label}: {exc}")

    saved_handlers = []
    for attr in ("SIGINT", "SIGTERM", "SIGHUP", "SIGQUIT"):
        signo = getattr(signal, attr, None)
        if signo is not None:
            # signal.signal() hands back the handler it replaces.
            earlier = signal.signal(signo, relay)
            saved_handlers.append((signo, earlier))

    if getattr(signal, "SIGCHLD", None) is not None:
        # Leave SIGCHLD untouched so the wrapper can detect child exit promptly.
        debug(verbose, "SIGCHLD is not forwarded; default handling informs the wrapper of child exit.")

    try:
        yield
    finally:
        for signo, earlier in saved_handlers:
            signal.signal(signo, earlier)
| def _handle_parent_stdin( | |
| parent_fd: int, | |
| child_fd: int, | |
| log: BinaryIO, | |
| close_cb: Callable[[str], None], | |
| ) -> bool: | |
| try: | |
| data = os.read(parent_fd, 65536) | |
| except BlockingIOError: | |
| return True | |
| if not data: | |
| close_cb("Upstream stdin reached EOF. Closed child stdin.") | |
| return False | |
| log.write(data) | |
| accepted = safe_write(child_fd, data) | |
| if not accepted: | |
| close_cb("Child stdin closed early. Stopped forwarding input to avoid deadlock.") | |
| return False | |
| return True | |
| def _forward_stream(source_fd: int, route: StreamRoute, verbose: bool) -> bool: | |
| try: | |
| data = os.read(source_fd, 65536) | |
| except BlockingIOError: | |
| return True | |
| if not data: | |
| debug(verbose, f"{route.label} reached EOF.") | |
| return False | |
| route.log.write(data) | |
| if route.target_fd is not None: | |
| accepted = safe_write(route.target_fd, data) | |
| if not accepted: | |
| debug(verbose, f"{route.label} consumer closed. Continuing to log only.") | |
| route.target_fd = None | |
| return True | |
| def _drain_fd(fd: int, route: StreamRoute) -> None: | |
| while True: | |
| try: | |
| chunk = os.read(fd, 65536) | |
| except BlockingIOError: | |
| continue | |
| except OSError as exc: | |
| if exc.errno in (errno.EBADF, errno.EINVAL): | |
| break | |
| raise | |
| if not chunk: | |
| break | |
| route.log.write(chunk) | |
| if route.target_fd is not None: | |
| if not safe_write(route.target_fd, chunk): | |
| route.target_fd = None | |
def _pump_io(child: subprocess.Popen[bytes], logs: LogFiles, verbose: bool) -> int:
    """Shuttle data between the wrapper's stdio and *child* until it exits.

    The wrapper's stdin is forwarded to the child's stdin, and the child's
    stdout and stderr are forwarded to the wrapper's stdout and stderr.
    Every chunk is also written to the matching handle in *logs*.

    Args:
        child: Process started with pipes for stdin, stdout and stderr.
        logs: Open log handles for the three streams.
        verbose: Emit diagnostics to stderr when true.

    Returns:
        The child's exit status.  A child killed by signal N is reported as
        128 + N, the shell convention, because the negative ``returncode``
        Popen uses for that case is not a meaningful process exit status.

    Raises:
        RuntimeError: If the child was not created with all three pipes.
    """
    if child.stdin is None or child.stdout is None or child.stderr is None:
        raise RuntimeError("Child process not configured with pipes.")

    parent_in_fd = sys.stdin.buffer.fileno()
    child_in_fd = child.stdin.fileno()
    child_out_fd = child.stdout.fileno()
    child_err_fd = child.stderr.fileno()
    parent_out_fd = sys.stdout.buffer.fileno()
    parent_err_fd = sys.stderr.buffer.fileno()

    # Put the wrapper's stdin into non-blocking mode (best effort; fcntl is
    # unavailable on some platforms).  The original flags are remembered
    # because O_NONBLOCK is shared with whoever else holds this stdin, such
    # as the invoking shell, and must not outlive the wrapper.
    fcntl_mod = None
    saved_flags = None
    try:
        import fcntl as fcntl_mod  # type: ignore

        saved_flags = fcntl_mod.fcntl(parent_in_fd, fcntl_mod.F_GETFL)
        fcntl_mod.fcntl(parent_in_fd, fcntl_mod.F_SETFL, saved_flags | os.O_NONBLOCK)
    except (ImportError, OSError) as exc:
        debug(verbose, f"Could not switch stdin to non-blocking mode: {exc}")

    selector = selectors.SelectSelector()
    routes = {
        child_out_fd: StreamRoute(log=logs.stdout, target_fd=parent_out_fd, label="stdout"),
        child_err_fd: StreamRoute(log=logs.stderr, target_fd=parent_err_fd, label="stderr"),
    }
    stdin_open = True

    def close_child_stdin(reason: str) -> None:
        """Stop watching the wrapper's stdin and close the child's stdin once."""
        nonlocal stdin_open
        if not stdin_open:
            return
        stdin_open = False
        try:
            selector.unregister(parent_in_fd)
        except (KeyError, ValueError):
            pass  # Already unregistered or descriptor no longer valid.
        try:
            child.stdin.close()
        except OSError:
            pass  # The child side is already gone.
        debug(verbose, reason)

    try:
        selector.register(parent_in_fd, selectors.EVENT_READ)
        selector.register(child_out_fd, selectors.EVENT_READ)
        selector.register(child_err_fd, selectors.EVENT_READ)

        while True:
            if child.poll() is not None and not selector.get_map():
                break
            # The short timeout lets the loop notice child exit even when
            # no descriptor becomes readable.
            events = selector.select(timeout=0.2)
            if not events and child.poll() is not None:
                break
            for key, _ in events:
                fd = key.fd
                if fd == parent_in_fd and stdin_open:
                    _handle_parent_stdin(parent_in_fd, child_in_fd, logs.stdin, close_child_stdin)
                elif fd in routes:
                    keep = _forward_stream(fd, routes[fd], verbose)
                    if not keep:
                        try:
                            selector.unregister(fd)
                        except (KeyError, ValueError):
                            pass
                        routes.pop(fd, None)
            if child.poll() is not None and not routes and not stdin_open:
                break

        child.wait()
        # Output streams that had not reached EOF yet may still hold data.
        for fd, route in list(routes.items()):
            _drain_fd(fd, route)
    finally:
        selector.close()
        if fcntl_mod is not None and saved_flags is not None:
            try:
                fcntl_mod.fcntl(parent_in_fd, fcntl_mod.F_SETFL, saved_flags)
            except OSError:
                pass  # stdin is already closed; nothing left to restore.

    status = child.returncode or 0
    return 128 - status if status < 0 else status
def run_wrapper(args: argparse.Namespace) -> int:
    """Launch the wrapped command and relay its stdio until it finishes.

    Resolves the log directory, opens the three log files, starts the child
    with unbuffered pipes and pumps data while termination signals are
    forwarded to it.

    Returns:
        The result of the I/O pump, or 1 when the command could not be
        launched (the failure is reported on stderr).
    """
    verbose = bool(args.verbose)
    paths = build_log_paths(resolve_log_dir(args.log_dir), args.log_base, args.ts_fmt)

    with contextlib.ExitStack() as stack:
        logs = open_log_files(paths, stack)
        try:
            child = subprocess.Popen(
                args.cmd,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                bufsize=0,
                close_fds=True,
            )
        except OSError as exc:
            print(f"Failed to launch command {' '.join(args.cmd)}: {exc}", file=sys.stderr)
            return 1

        def close_pipe(attr: str) -> None:
            # Looked up at exit time, mirroring a lazy "pipe and pipe.close()".
            pipe = getattr(child, attr)
            if pipe:
                pipe.close()

        # ExitStack unwinds in reverse order: stdin is closed first, then
        # stderr, then stdout.
        for attr in ("stdout", "stderr", "stdin"):
            stack.callback(close_pipe, attr)

        with forward_signals(child, verbose):
            return _pump_io(child, logs, verbose)
def main(argv: Optional[Sequence[str]] = None) -> int:
    """Command-line entry point: parse *argv* and run the wrapper.

    Returns the integer status produced by ``run_wrapper``.
    """
    return run_wrapper(parse_args(argv))
# Run as a script: propagate main()'s status as the process exit code.
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment