@cab938 · Created November 7, 2025 19:44
Logging for MCP client-server communication over stdio
#!/usr/bin/env python3
"""
Transparent stdio wrapper for MCP processes.
This helper launches a child process, forwards stdin, stdout, and stderr between
the parent and child, and tees every byte into timestamped log files. The wrapper
avoids writing to stdout so that MCP JSON exchanges remain intact. Optional
verbose logs go to stderr for troubleshooting.
Examples
--------
python wrapper.py --log-base ripgrep docker compose run --rm -T mcp-ripgrep
"""
from __future__ import annotations
import argparse
import contextlib
import errno
import os
import selectors
import signal
import subprocess
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import BinaryIO, Callable, Iterator, Optional, Sequence
@dataclass(frozen=True)
class LogPaths:
    directory: Path
    stdin: Path
    stdout: Path
    stderr: Path


@dataclass
class LogFiles:
    stdin: BinaryIO
    stdout: BinaryIO
    stderr: BinaryIO


@dataclass
class StreamRoute:
    log: BinaryIO
    target_fd: Optional[int]
    label: str


def now_stamp(fmt: str) -> str:
    return time.strftime(fmt, time.localtime())


def safe_write(fd: int, data: bytes) -> bool:
    """Write all data to the file descriptor, returning False if the peer is closed."""
    view = memoryview(data)
    while view:
        try:
            written = os.write(fd, view)
        except InterruptedError:
            continue
        except OSError as exc:
            if exc.errno in (errno.EPIPE, errno.EBADF):
                return False
            raise
        view = view[written:]
    return True


def debug(enabled: bool, message: str) -> None:
    if enabled:
        print(message, file=sys.stderr)
def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Forward stdio to a child process while teeing raw logs to disk."
    )
    parser.add_argument(
        "--log-dir",
        help="Directory for log files (defaults to LOG_DIR env var or ./mcp-logs).",
        default=None,
    )
    parser.add_argument(
        "--log-base",
        default=os.environ.get("LOG_BASENAME", "mcp"),
        help="Prefix for log files (default: %(default)s).",
    )
    parser.add_argument(
        "--ts-fmt",
        default=os.environ.get("TS_FMT", "%Y%m%d-%H%M%S"),
        help="Timestamp format applied to log filenames (default: %(default)s).",
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        default=bool(os.environ.get("WRAPPER_VERBOSE")),
        help="Emit diagnostic information to stderr.",
    )
    parser.add_argument(
        "cmd",
        nargs=argparse.REMAINDER,
        metavar="CMD",
        help="Child command and arguments to execute.",
    )
    args = parser.parse_args(argv)
    if not args.cmd:
        parser.error("a command to wrap is required")
    return args


def resolve_log_dir(raw: Optional[str]) -> Path:
    candidate = raw or os.environ.get("LOG_DIR")
    path = Path(candidate).expanduser() if candidate else Path.cwd() / "mcp-logs"
    path.mkdir(parents=True, exist_ok=True)
    return path
def build_log_paths(log_dir: Path, base: str, ts_fmt: str) -> LogPaths:
    stamp = now_stamp(ts_fmt)
    return LogPaths(
        directory=log_dir,
        stdin=log_dir / f"{base}.{stamp}.in.log",
        stdout=log_dir / f"{base}.{stamp}.out.log",
        stderr=log_dir / f"{base}.{stamp}.err.log",
    )


def open_log_files(paths: LogPaths, stack: contextlib.ExitStack) -> LogFiles:
    return LogFiles(
        stdin=stack.enter_context(paths.stdin.open("ab", buffering=0)),
        stdout=stack.enter_context(paths.stdout.open("ab", buffering=0)),
        stderr=stack.enter_context(paths.stderr.open("ab", buffering=0)),
    )
@contextlib.contextmanager
def forward_signals(child: subprocess.Popen[bytes], verbose: bool) -> Iterator[None]:
    forwarded = []

    def handler(signum: int, _frame: Optional[object]) -> None:
        try:
            name = signal.Signals(signum).name
        except ValueError:
            name = str(signum)
        debug(verbose, f"Forwarding {name} to child.")
        try:
            child.send_signal(signum)
        except Exception as exc:  # pragma: no cover - defensive
            debug(verbose, f"Failed to forward signal {name}: {exc}")

    for sig_name in ("SIGINT", "SIGTERM", "SIGHUP", "SIGQUIT"):
        sig = getattr(signal, sig_name, None)
        if sig is None:
            continue
        previous = signal.getsignal(sig)
        signal.signal(sig, handler)
        forwarded.append((sig, previous))
    sigchld = getattr(signal, "SIGCHLD", None)
    if sigchld is not None:
        # Leave SIGCHLD untouched so the wrapper can detect child exit promptly.
        debug(verbose, "SIGCHLD is not forwarded; default handling informs the wrapper of child exit.")
    try:
        yield
    finally:
        for sig, previous in forwarded:
            signal.signal(sig, previous)
def _handle_parent_stdin(
    parent_fd: int,
    child_fd: int,
    log: BinaryIO,
    close_cb: Callable[[str], None],
) -> bool:
    try:
        data = os.read(parent_fd, 65536)
    except BlockingIOError:
        return True
    if not data:
        close_cb("Upstream stdin reached EOF. Closed child stdin.")
        return False
    log.write(data)
    accepted = safe_write(child_fd, data)
    if not accepted:
        close_cb("Child stdin closed early. Stopped forwarding input to avoid deadlock.")
        return False
    return True


def _forward_stream(source_fd: int, route: StreamRoute, verbose: bool) -> bool:
    try:
        data = os.read(source_fd, 65536)
    except BlockingIOError:
        return True
    if not data:
        debug(verbose, f"{route.label} reached EOF.")
        return False
    route.log.write(data)
    if route.target_fd is not None:
        accepted = safe_write(route.target_fd, data)
        if not accepted:
            debug(verbose, f"{route.label} consumer closed. Continuing to log only.")
            route.target_fd = None
    return True


def _drain_fd(fd: int, route: StreamRoute) -> None:
    while True:
        try:
            chunk = os.read(fd, 65536)
        except BlockingIOError:
            continue
        except OSError as exc:
            if exc.errno in (errno.EBADF, errno.EINVAL):
                break
            raise
        if not chunk:
            break
        route.log.write(chunk)
        if route.target_fd is not None:
            if not safe_write(route.target_fd, chunk):
                route.target_fd = None
def _pump_io(child: subprocess.Popen[bytes], logs: LogFiles, verbose: bool) -> int:
    if child.stdin is None or child.stdout is None or child.stderr is None:
        raise RuntimeError("Child process not configured with pipes.")
    selector = selectors.SelectSelector()
    parent_in_fd = sys.stdin.buffer.fileno()
    child_in_fd = child.stdin.fileno()
    child_out_fd = child.stdout.fileno()
    child_err_fd = child.stderr.fileno()
    parent_out_fd = sys.stdout.buffer.fileno()
    parent_err_fd = sys.stderr.buffer.fileno()
    try:
        import fcntl  # type: ignore

        flags = fcntl.fcntl(parent_in_fd, fcntl.F_GETFL)
        fcntl.fcntl(parent_in_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
    except Exception:
        pass
    selector.register(parent_in_fd, selectors.EVENT_READ)
    selector.register(child_out_fd, selectors.EVENT_READ)
    selector.register(child_err_fd, selectors.EVENT_READ)
    routes = {
        child_out_fd: StreamRoute(log=logs.stdout, target_fd=parent_out_fd, label="stdout"),
        child_err_fd: StreamRoute(log=logs.stderr, target_fd=parent_err_fd, label="stderr"),
    }
    stdin_open = True

    def close_child_stdin(reason: str) -> None:
        nonlocal stdin_open
        if not stdin_open:
            return
        stdin_open = False
        try:
            selector.unregister(parent_in_fd)
        except Exception:
            pass
        try:
            child.stdin.close()
        except Exception:
            pass
        debug(verbose, reason)

    while True:
        if child.poll() is not None and not selector.get_map():
            break
        events = selector.select(timeout=0.2)
        if not events and child.poll() is not None:
            break
        for key, _ in events:
            fd = key.fd
            if fd == parent_in_fd and stdin_open:
                _handle_parent_stdin(parent_in_fd, child_in_fd, logs.stdin, close_child_stdin)
            elif fd in routes:
                keep = _forward_stream(fd, routes[fd], verbose)
                if not keep:
                    try:
                        selector.unregister(fd)
                    except Exception:
                        pass
                    routes.pop(fd, None)
        if child.poll() is not None and not routes and not stdin_open:
            break
    child.wait()
    for fd, route in list(routes.items()):
        _drain_fd(fd, route)
    return child.returncode or 0
def run_wrapper(args: argparse.Namespace) -> int:
    verbose = bool(args.verbose)
    log_dir = resolve_log_dir(args.log_dir)
    log_paths = build_log_paths(log_dir, args.log_base, args.ts_fmt)
    with contextlib.ExitStack() as stack:
        logs = open_log_files(log_paths, stack)
        try:
            child = subprocess.Popen(
                args.cmd,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                bufsize=0,
                close_fds=True,
            )
        except OSError as exc:
            print(f"Failed to launch command {' '.join(args.cmd)}: {exc}", file=sys.stderr)
            return 1
        stack.callback(lambda: child.stdout and child.stdout.close())
        stack.callback(lambda: child.stderr and child.stderr.close())
        stack.callback(lambda: child.stdin and child.stdin.close())
        with forward_signals(child, verbose):
            return _pump_io(child, logs, verbose)


def main(argv: Optional[Sequence[str]] = None) -> int:
    args = parse_args(argv)
    return run_wrapper(args)


if __name__ == "__main__":
    sys.exit(main())
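
To sanity-check the wrapper without a real MCP server, a minimal sketch like the one below can drive it from Python: it wraps cat (which simply echoes stdin back to stdout, standing in for a server), sends one JSON-RPC-shaped line, and confirms the echo comes back through the wrapper while the raw bytes land in the log files. The file name wrapper.py and the /tmp/mcp-logs directory are assumptions; adjust them to wherever you saved the script.

# Hypothetical smoke test for wrapper.py; `cat` stands in for an MCP server.
import json
import subprocess
from pathlib import Path

log_dir = Path("/tmp/mcp-logs")  # assumed location; any writable directory works
proc = subprocess.Popen(
    ["python3", "wrapper.py", "--log-dir", str(log_dir), "--log-base", "smoke", "cat"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
)
request = json.dumps({"jsonrpc": "2.0", "id": 1, "method": "ping"}).encode() + b"\n"
stdout, _ = proc.communicate(request, timeout=10)
assert stdout == request  # cat echoed the request back through the wrapper

# Both directions of the exchange are now on disk as smoke.<timestamp>.{in,out,err}.log.
print(sorted(p.name for p in log_dir.glob("smoke.*.log")))

Closing the wrapper's stdin (as communicate does) is what triggers the EOF path: the wrapper closes the child's stdin, cat exits, and the wrapper drains and exits with the child's return code.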
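Because MCP's stdio transport frames each JSON-RPC message as a single line, a capture can be reviewed by parsing the logs line by line. This is a hedged sketch assuming the same /tmp/mcp-logs directory and smoke prefix used above; partial writes that are not valid JSON are simply skipped.

# Hypothetical log review: summarize each captured JSON-RPC message.
import json
from pathlib import Path

log_dir = Path("/tmp/mcp-logs")  # assumed; same directory as the smoke test above

for log_file in sorted(log_dir.glob("smoke.*.in.log")) + sorted(log_dir.glob("smoke.*.out.log")):
    direction = "client->server" if log_file.name.endswith(".in.log") else "server->client"
    for line in log_file.read_text().splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip fragments that are not a complete JSON message
        print(direction, message.get("method") or f"response to id {message.get('id')}")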