Skip to content

Instantly share code, notes, and snippets.

@zhangyoufu
Created January 25, 2026 06:30
Show Gist options
  • Select an option

  • Save zhangyoufu/e1f7745c411aa63037f1f5c4edc9fdec to your computer and use it in GitHub Desktop.

Select an option

Save zhangyoufu/e1f7745c411aa63037f1f5c4edc9fdec to your computer and use it in GitHub Desktop.
A Python script to parse and monitor Round-Trip Time (RTT) log files produced by pppd's `lcp-rtt-file` configuration directive.
#!/usr/bin/env python3
from typing import Optional
import argparse
import ctypes
import datetime
import mmap
import sys
import time
try:
    # Prefer the implementation shipped with ctypes when this interpreter
    # provides one (the ctypes docs list it as added in Python 3.14).
    from ctypes import memoryview_at
except ImportError:
    def memoryview_at(addr: int, len: int) -> memoryview:
        """Fallback: expose ``len`` bytes of memory starting at ``addr``.

        The view is backed by a ``c_ubyte`` array created directly at the
        given address, so no bytes are copied.

        Args:
            addr: Address of the first byte to expose.
            len: Number of bytes to expose.

        Returns:
            A memoryview over the ctypes array placed at ``addr``.
        """
        byte_array_type = ctypes.c_ubyte * len
        return memoryview(byte_array_type.from_address(addr))
# https://stackoverflow.com/a/34096544
def fake_writable_buffer(obj: object) -> memoryview:
    """Return a memoryview built from the raw address of ``obj``'s buffer.

    The address and size are obtained through the C-API function
    ``PyObject_AsReadBuffer`` and then handed to ``memoryview_at``. main()
    uses this to pass a read-only mmap to ctypes' ``from_buffer``.

    NOTE(review): only the integer address is passed on, so nothing here
    appears to keep ``obj`` alive -- the caller should hold its own
    reference for as long as the returned view is in use.

    Args:
        obj: Object to query through ``PyObject_AsReadBuffer``.

    Returns:
        A memoryview covering the reported address range.
    """
    ptr = ctypes.c_void_p()
    size = ctypes.c_ssize_t()
    # Both out-parameters are filled in by the C call.
    ctypes.pythonapi.PyObject_AsReadBuffer(
        ctypes.py_object(obj),
        ctypes.byref(ptr),
        ctypes.byref(size),
    )
    return memoryview_at(ptr.value, size.value)
class Header(ctypes.BigEndianStructure):
    """File header: four consecutive big-endian unsigned 32-bit fields.

    main() maps an instance onto the first ``ctypes.sizeof(Header)`` bytes
    of the log file, rejects the file unless ``magic`` is 0x19450425, and
    reads the other three fields for display and follow-mode polling.
    """
    # Python 3.14 deprecates setting `_pack_` without `_layout_` on
    # non-Windows platforms (the implicit MSVC-compatible layout becomes an
    # error in 3.19). Naming the layout explicitly keeps the exact same
    # memory layout; older interpreters simply ignore this attribute.
    _layout_ = 'ms'
    _pack_ = 1
    _fields_ = [
        ('magic', ctypes.c_uint32),
        ('status', ctypes.c_uint32),         # main() labels 0 'closed', 1 'active'
        ('last_entry', ctypes.c_uint32),     # main() uses last_entry // 2 as an entry index
        ('echo_interval', ctypes.c_uint32),  # printed by main() with an 's' suffix
    ]
class Entry(ctypes.BigEndianStructure):
    """One 8-byte log record: a 32-bit timestamp plus a packed 32-bit word.

    The second word is split into two bit fields: ``lost`` (8 bits) is
    declared first, followed by ``rtt_us`` (24 bits). main() prints
    ``rtt_us / 1000`` under the "rtt (ms)" column and passes ``timestamp``
    to ``datetime.fromtimestamp``.
    """
    # Python 3.14 deprecates setting `_pack_` without `_layout_` on
    # non-Windows platforms (the implicit MSVC-compatible layout becomes an
    # error in 3.19). Naming the layout explicitly keeps the exact same
    # bit-field packing; older interpreters simply ignore this attribute.
    _layout_ = 'ms'
    _pack_ = 1
    _fields_ = [
        ('timestamp', ctypes.c_uint32),
        ('lost', ctypes.c_uint32, 8),
        ('rtt_us', ctypes.c_uint32, 24),
    ]

    @property
    def valid(self) -> bool:
        """Whether ``lost`` or ``rtt_us`` is non-zero.

        main() skips entries for which this is False.
        """
        return self.lost > 0 or self.rtt_us > 0
def _format_entry(entry) -> str:
    """Render one entry as a ``timestamp | rtt (ms) | lost`` table row."""
    when = datetime.datetime.fromtimestamp(entry.timestamp).isoformat(' ')
    return f'{when} | {entry.rtt_us/1000:8.3f} | {entry.lost}'


def main():
    """Parse the command line, dump the RTT log, and optionally follow it.

    The file is memory-mapped read-only. Its header is checked and printed,
    then every valid entry is printed ordered by timestamp. With
    ``-f/--follow`` the header is polled once per second and newly written
    entries are printed as they appear, until interrupted with Ctrl-C.

    Raises:
        ValueError: If the magic number does not match, or (follow mode
            only) if ``last_entry`` is odd or points past the mapped entries.
    """
    parser = argparse.ArgumentParser(description='Parse and display LCP RTT log files from pppd.')
    parser.add_argument('path', metavar='FILE', help='LCP RTT log file to parse')
    parser.add_argument('-f', '--follow', action='store_true', help='continue monitoring file for new entries (like tail -f)')
    args = parser.parse_args()

    with open(args.path, 'rb') as f:
        # known issue: size change is not supported
        mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)

    # ctypes' from_buffer() is fed the view returned by
    # fake_writable_buffer() instead of the read-only mmap itself.
    buf = fake_writable_buffer(mm)
    header = Header.from_buffer(buf)
    if header.magic != 0x19450425:
        raise ValueError('invalid magic')

    status = header.status
    status_str = {0: 'closed', 1: 'active'}.get(status, f'{status}')
    print(f'status: {status_str}')
    echo_interval = header.echo_interval
    print(f'echo_interval: {echo_interval}s')

    # Everything after the header is treated as a flat array of entries.
    header_size = ctypes.sizeof(Header)
    num_entries = (len(buf) - header_size) // ctypes.sizeof(Entry)
    entries = (Entry * num_entries).from_buffer(buf, header_size)
    last_entry = header.last_entry

    print('timestamp | rtt (ms) | lost')
    for entry in sorted((e for e in entries if e.valid), key=lambda e: e.timestamp):
        print(_format_entry(entry))

    if not args.follow:
        return

    def check_last_entry(last_entry: int) -> None:
        # last_entry // 2 is used as an index into `entries`, so the value
        # must be even and within the mapped array.
        # NOTE(review): presumably an offset in 32-bit words -- confirm
        # against the writer.
        if (last_entry & 1) or last_entry >= num_entries * 2:
            raise ValueError('invalid last_entry')

    # Push the header and initial dump out before blocking in the poll
    # loop; otherwise a piped consumer sees nothing until the next entry.
    sys.stdout.flush()
    check_last_entry(last_entry)
    try:
        while True:
            time.sleep(1)

            new_status = header.status
            if new_status != status:
                # Flushed explicitly: no further entry may ever arrive to
                # flush this message for us.
                print(f'status changed from {status} to {new_status}', flush=True)
                status = new_status

            new_echo_interval = header.echo_interval
            if new_echo_interval != echo_interval:
                print(f'echo_interval changed from {echo_interval} to {new_echo_interval}', flush=True)
                echo_interval = new_echo_interval

            new_last_entry = header.last_entry
            if new_last_entry == last_entry:
                continue
            check_last_entry(new_last_entry)

            # Walk the ring from the slot after the previously seen entry
            # up to and including the newly reported one, wrapping at the
            # end of the array.
            i = last_entry // 2
            end = new_last_entry // 2
            while True:
                i = (i + 1) % num_entries
                entry = entries[i]
                if entry.valid:
                    print(_format_entry(entry), flush=True)
                if i == end:
                    break
            last_entry = new_last_entry
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to leave follow mode.
        pass
# Run the command-line interface only when executed as a script, not when
# this module is imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment