Created
January 25, 2026 06:30
-
-
Save zhangyoufu/e1f7745c411aa63037f1f5c4edc9fdec to your computer and use it in GitHub Desktop.
A Python script to parse and monitor Round-Trip Time (RTT) log files produced by pppd's `lcp-rtt-file` configuration directive.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
from typing import Optional
import argparse
import ctypes
import datetime
import mmap
import sys
import time

try:
    # Prefer the helper shipped with ctypes when this interpreter has one.
    from ctypes import memoryview_at
except ImportError:
    def memoryview_at(addr: int, len: int) -> memoryview:
        """Return a memoryview over ``len`` bytes of memory starting at ``addr``.

        Fallback used when ``ctypes.memoryview_at`` cannot be imported.  The
        view is built on a ``c_ubyte`` array placed at the raw address, so it
        aliases that memory rather than copying it; the caller has to keep
        the underlying memory valid for as long as the view is in use.
        """
        # ``len`` shadows the builtin inside this function; the parameter
        # name is kept as-is so existing callers are unaffected.
        byte_array_type = ctypes.c_ubyte * len
        return memoryview(byte_array_type.from_address(addr))
# https://stackoverflow.com/a/34096544
def fake_writable_buffer(obj: object) -> memoryview:
    """Return a memoryview over the raw memory behind ``obj``'s buffer.

    The address and size of ``obj``'s buffer are obtained through the
    C-level ``PyObject_AsReadBuffer`` call and a fresh view is created on
    that address with ``memoryview_at``.  Only the two integers are passed
    on, so the returned view does not reference ``obj``: the caller must
    keep ``obj`` alive (and unchanged in size) while the view is used.

    NOTE(review): as the name suggests, this presumably exists so that a
    read-only object can be handed to ctypes ``from_buffer``; nothing here
    makes the memory itself writable, so the view must only be read.
    """
    base = ctypes.c_void_p()
    size = ctypes.c_ssize_t()
    ctypes.pythonapi.PyObject_AsReadBuffer(
        ctypes.py_object(obj),
        ctypes.byref(base),
        ctypes.byref(size),
    )
    return memoryview_at(base.value, size.value)
| class Header(ctypes.BigEndianStructure): | |
| _pack_ = 1 | |
| _fields_ = [ | |
| ('magic', ctypes.c_uint32), | |
| ('status', ctypes.c_uint32), | |
| ('last_entry', ctypes.c_uint32), | |
| ('echo_interval', ctypes.c_uint32), | |
| ] | |
class Entry(ctypes.BigEndianStructure):
    """One 8-byte record of the RTT log.

    ``timestamp`` is a big-endian unsigned 32-bit value that the reader
    feeds to ``datetime.fromtimestamp``.  The second 32-bit word is split
    into two bit fields: ``lost`` (8 bits) and ``rtt_us`` (24 bits, shown
    by the reader in milliseconds after dividing by 1000).
    """

    _pack_ = 1
    _fields_ = [
        ('timestamp', ctypes.c_uint32),
        # Bit fields: 8 + 24 bits share a single 32-bit storage unit.
        ('lost', ctypes.c_uint32, 8),
        ('rtt_us', ctypes.c_uint32, 24),
    ]

    @property
    def valid(self) -> bool:
        """True unless both ``lost`` and ``rtt_us`` are zero."""
        # Both fields are unsigned, so "not both zero" is the same test as
        # "at least one is greater than zero".
        return (self.lost, self.rtt_us) != (0, 0)
def _format_entry(entry: 'Entry') -> str:
    """Render one record as a ``timestamp | rtt (ms) | lost`` table row."""
    stamp = datetime.datetime.fromtimestamp(entry.timestamp).isoformat(' ')
    return f'{stamp} | {entry.rtt_us/1000:8.3f} | {entry.lost}'


def _follow(header, entries, status, echo_interval, last_entry) -> None:
    """Poll the mapped header once per second and print whatever changed.

    ``status``, ``echo_interval`` and ``last_entry`` are the values already
    reported to the user; they are compared against the live header on each
    pass.  Returns when interrupted with Ctrl-C.

    Raises:
        ValueError: if ``last_entry`` (initially or after an update) is odd
            or points past the end of the ring.
    """
    num_entries = len(entries)

    def check_last_entry(value: int) -> None:
        # The position counts 32-bit words and each record is two words, so
        # a valid position is even and lies inside the ring.
        if (value & 1) or value >= num_entries * 2:
            raise ValueError('invalid last_entry')

    check_last_entry(last_entry)
    try:
        while True:
            time.sleep(1)

            current_status = header.status
            if current_status != status:
                # Flushed like the entry rows so piped output stays timely.
                print(f'status changed from {status} to {current_status}', flush=True)
                status = current_status

            current_interval = header.echo_interval
            if current_interval != echo_interval:
                print(f'echo_interval changed from {echo_interval} to {current_interval}', flush=True)
                echo_interval = current_interval

            current_last = header.last_entry
            if current_last == last_entry:
                continue
            check_last_entry(current_last)

            # Walk the ring from the record after the last one printed up to
            # and including the newest one, wrapping at the end of the ring.
            index = last_entry // 2
            newest = current_last // 2
            while index != newest:
                index = (index + 1) % num_entries
                entry = entries[index]
                if entry.valid:
                    print(_format_entry(entry), flush=True)
            last_entry = current_last
    except KeyboardInterrupt:
        pass


def main():
    """Command-line entry point: dump an LCP RTT log file and optionally follow it.

    Prints the header's status and echo interval, then every valid record
    sorted by timestamp.  With ``-f/--follow`` it keeps polling the mapped
    file and prints status changes, echo-interval changes and new records
    until interrupted.

    Raises:
        ValueError: if the file's magic number is wrong, or (in follow mode)
            if the header's ``last_entry`` is invalid.
    """
    parser = argparse.ArgumentParser(description='Parse and display LCP RTT log files from pppd.')
    parser.add_argument('path', metavar='FILE', help='LCP RTT log file to parse')
    parser.add_argument('-f', '--follow', action='store_true', help='continue monitoring file for new entries (like tail -f)')
    args = parser.parse_args()

    with open(args.path, 'rb') as f:
        # known issue: size change is not supported
        # The mapping is a context manager so it is released on every exit
        # path.  ``buf``, ``header`` and ``entries`` alias the mapped memory
        # by raw address and must not be touched after this block ends.
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            buf = fake_writable_buffer(mm)

            header = Header.from_buffer(buf)
            if header.magic != 0x19450425:
                raise ValueError('invalid magic')

            status = header.status
            status_str = {0: 'closed', 1: 'active'}.get(status, f'{status}')
            print(f'status: {status_str}')

            echo_interval = header.echo_interval
            print(f'echo_interval: {echo_interval}s')

            # Everything after the header is treated as an array of records.
            num_entries = (len(buf) - ctypes.sizeof(Header)) // ctypes.sizeof(Entry)
            entries = (Entry * num_entries).from_buffer(buf, ctypes.sizeof(Header))
            # Snapshot the position before dumping so that records written
            # during the dump are still picked up by follow mode.
            last_entry = header.last_entry

            print('timestamp | rtt (ms) | lost')
            valid_entries = (entry for entry in entries if entry.valid)
            for entry in sorted(valid_entries, key=lambda entry: entry.timestamp):
                print(_format_entry(entry))

            if args.follow:
                # Push the header and initial dump out now; otherwise a piped
                # consumer sees nothing until the first new record arrives.
                sys.stdout.flush()
                _follow(header, entries, status, echo_interval, last_entry)
# Run the command-line interface only when executed as a script, not on import.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment