Created
December 1, 2025 13:58
-
-
Save 0xdeafbeef/d4d6395cb671241437bc0ceed5feab1a to your computer and use it in GitHub Desktop.
uv run wifi_stability_monitor.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # /// script | |
| # requires-python = ">=3.10" | |
| # dependencies = [ | |
| # "rich>=13.7.0", | |
| # "pandas>=2.2.0", | |
| # "matplotlib>=3.8.0", | |
| # ] | |
| # /// | |
| import argparse | |
| import subprocess | |
| import threading | |
| import time | |
| import datetime as dt | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| import re | |
| import shutil | |
| import sys | |
| from typing import Optional, List, Tuple | |
| from rich.console import Console | |
| from rich.table import Table | |
| from rich import box | |
| import pandas as pd | |
| import matplotlib | |
| matplotlib.use("Agg") # headless backend | |
| import matplotlib.pyplot as plt | |
| console = Console() | |
| # ---------------------- Data structures ---------------------- | |
| @dataclass | |
| class PingEvent: | |
| timestamp: dt.datetime | |
| status: str # "ok", "loss", "other" | |
| seq: Optional[int] | |
| rtt_ms: Optional[float] | |
| raw: str | |
| @dataclass | |
| class WifiSample: | |
| timestamp: dt.datetime | |
| signal_dbm: Optional[int] | |
| bitrate_mbps: Optional[float] | |
| ssid: Optional[str] | |
| raw: str | |
| # ---------------------- Helpers: detection ---------------------- | |
| def detect_default_gateway() -> Optional[str]: | |
| if shutil.which("ip") is None: | |
| return None | |
| try: | |
| out = subprocess.check_output(["ip", "route"], text=True) | |
| except Exception: | |
| return None | |
| for line in out.splitlines(): | |
| line = line.strip() | |
| if line.startswith("default "): | |
| parts = line.split() | |
| if "via" in parts: | |
| idx = parts.index("via") | |
| if idx + 1 < len(parts): | |
| return parts[idx + 1] | |
| return None | |
| def detect_wifi_iface() -> Optional[str]: | |
| if shutil.which("iw") is None: | |
| return None | |
| try: | |
| out = subprocess.check_output(["iw", "dev"], text=True) | |
| except Exception: | |
| return None | |
| iface = None | |
| current_iface = None | |
| for line in out.splitlines(): | |
| line = line.strip() | |
| if line.startswith("Interface "): | |
| current_iface = line.split()[1] | |
| elif line.startswith("type ") and "managed" in line and current_iface: | |
| iface = current_iface | |
| break | |
| return iface | |
| # ---------------------- Parsing helpers ---------------------- | |
| PING_TS_RE = re.compile(r"^\[(\d+(?:\.\d+)?)\]\s*(.*)$") | |
| def parse_ping_line(line: str) -> Optional[PingEvent]: | |
| """ | |
| Parse a single `ping -D -O` output line into a PingEvent. | |
| """ | |
| line = line.strip() | |
| if not line: | |
| return None | |
| m = PING_TS_RE.match(line) | |
| if m: | |
| ts_epoch = float(m.group(1)) | |
| text = m.group(2) | |
| timestamp = dt.datetime.fromtimestamp(ts_epoch) | |
| else: | |
| timestamp = dt.datetime.now() | |
| text = line | |
| status = "other" | |
| seq = None | |
| rtt_ms = None | |
| if "icmp_seq=" in text: | |
| seq_m = re.search(r"icmp_seq=(\d+)", text) | |
| if seq_m: | |
| try: | |
| seq = int(seq_m.group(1)) | |
| except ValueError: | |
| seq = None | |
| if "time=" in text: | |
| status = "ok" | |
| t_m = re.search(r"time=([\d\.]+)\s*ms", text) | |
| if t_m: | |
| try: | |
| rtt_ms = float(t_m.group(1)) | |
| except ValueError: | |
| rtt_ms = None | |
| elif ( | |
| "no answer yet" in text | |
| or "Destination Host Unreachable" in text | |
| or "Destination Net Unreachable" in text | |
| or "Network is unreachable" in text | |
| or "100% packet loss" in text | |
| or "Request timeout" in text | |
| or "ping: sendto" in text | |
| ): | |
| status = "loss" | |
| return PingEvent( | |
| timestamp=timestamp, | |
| status=status, | |
| seq=seq, | |
| rtt_ms=rtt_ms, | |
| raw=text, | |
| ) | |
| def parse_wifi_output(ts: dt.datetime, output: str) -> WifiSample: | |
| """ | |
| Parse `iw dev <iface> link` output into a WifiSample. | |
| """ | |
| signal_dbm = None | |
| bitrate_mbps = None | |
| ssid = None | |
| if "Not connected." in output: | |
| return WifiSample( | |
| timestamp=ts, | |
| signal_dbm=None, | |
| bitrate_mbps=None, | |
| ssid=None, | |
| raw=output, | |
| ) | |
| m = re.search(r"SSID:\s*(.+)", output) | |
| if m: | |
| ssid = m.group(1).strip() | |
| m = re.search(r"signal:\s*(-?\d+)\s*dBm", output) | |
| if m: | |
| try: | |
| signal_dbm = int(m.group(1)) | |
| except ValueError: | |
| signal_dbm = None | |
| m = re.search(r"tx bitrate:\s*([\d\.]+)\s*MBit/s", output) | |
| if m: | |
| try: | |
| bitrate_mbps = float(m.group(1)) | |
| except ValueError: | |
| bitrate_mbps = None | |
| return WifiSample( | |
| timestamp=ts, | |
| signal_dbm=signal_dbm, | |
| bitrate_mbps=bitrate_mbps, | |
| ssid=ssid, | |
| raw=output, | |
| ) | |
| # ---------------------- Monitors ---------------------- | |
| class PingMonitor: | |
| def __init__(self, name: str, target: str, log_path: Path): | |
| self.name = name | |
| self.target = target | |
| self.log_path = log_path | |
| self.proc: Optional[subprocess.Popen] = None | |
| self.thread: Optional[threading.Thread] = None | |
| self.events: List[PingEvent] = [] | |
| def start(self, stop_event: threading.Event) -> None: | |
| self.thread = threading.Thread( | |
| target=self._run, args=(stop_event,), daemon=True | |
| ) | |
| self.thread.start() | |
| def _run(self, stop_event: threading.Event) -> None: | |
| if shutil.which("ping") is None: | |
| console.print(f"[red]`ping` not found; skipping ping to {self.target}[/red]") | |
| return | |
| cmd = ["ping", "-D", "-O", "-i", "1", self.target] | |
| try: | |
| self.proc = subprocess.Popen( | |
| cmd, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.STDOUT, | |
| text=True, | |
| bufsize=1, | |
| ) | |
| except Exception as e: | |
| console.print(f"[red]Failed to start ping to {self.target}: {e}[/red]") | |
| return | |
| with self.log_path.open("w", encoding="utf-8") as f: | |
| f.write("# Command: " + " ".join(cmd) + "\n") | |
| if not self.proc.stdout: | |
| return | |
| for line in self.proc.stdout: | |
| if not line: | |
| break | |
| line = line.rstrip("\n") | |
| f.write(line + "\n") | |
| f.flush() | |
| evt = parse_ping_line(line) | |
| if evt: | |
| self.events.append(evt) | |
| if stop_event.is_set(): | |
| break | |
| if self.proc and self.proc.poll() is None: | |
| try: | |
| self.proc.terminate() | |
| except Exception: | |
| pass | |
| def stop(self) -> None: | |
| if self.proc and self.proc.poll() is None: | |
| try: | |
| self.proc.terminate() | |
| except Exception: | |
| pass | |
| def join(self) -> None: | |
| if self.thread: | |
| self.thread.join() | |
| class JournalMonitor: | |
| def __init__(self, log_path: Path): | |
| self.log_path = log_path | |
| self.proc: Optional[subprocess.Popen] = None | |
| self.thread: Optional[threading.Thread] = None | |
| self.enabled = shutil.which("journalctl") is not None | |
| def start(self, stop_event: threading.Event) -> None: | |
| if not self.enabled: | |
| console.print("[yellow]`journalctl` not found; skipping log capture.[/yellow]") | |
| return | |
| self.thread = threading.Thread( | |
| target=self._run, args=(stop_event,), daemon=True | |
| ) | |
| self.thread.start() | |
| def _run(self, stop_event: threading.Event) -> None: | |
| cmd = [ | |
| "journalctl", | |
| "-f", | |
| "-o", | |
| "short-iso", | |
| "-u", | |
| "NetworkManager", | |
| "-u", | |
| "wpa_supplicant", | |
| ] | |
| try: | |
| self.proc = subprocess.Popen( | |
| cmd, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.STDOUT, | |
| text=True, | |
| bufsize=1, | |
| ) | |
| except Exception as e: | |
| console.print(f"[red]Failed to start journalctl: {e}[/red]") | |
| return | |
| with self.log_path.open("w", encoding="utf-8") as f: | |
| f.write("# Command: " + " ".join(cmd) + "\n") | |
| if not self.proc.stdout: | |
| return | |
| for line in self.proc.stdout: | |
| if not line: | |
| break | |
| f.write(line) | |
| f.flush() | |
| if stop_event.is_set(): | |
| break | |
| if self.proc and self.proc.poll() is None: | |
| try: | |
| self.proc.terminate() | |
| except Exception: | |
| pass | |
| def stop(self) -> None: | |
| if self.proc and self.proc.poll() is None: | |
| try: | |
| self.proc.terminate() | |
| except Exception: | |
| pass | |
| def join(self) -> None: | |
| if self.thread: | |
| self.thread.join() | |
| def wifi_sampler( | |
| stop_event: threading.Event, | |
| iface: str, | |
| log_path: Path, | |
| samples: List[WifiSample], | |
| interval: float = 10.0, | |
| ) -> None: | |
| if shutil.which("iw") is None: | |
| console.print("[yellow]`iw` not found; skipping Wi‑Fi sampling.[/yellow]") | |
| return | |
| with log_path.open("w", encoding="utf-8") as f: | |
| f.write(f"# Wi‑Fi link samples for interface {iface}\n") | |
| while not stop_event.is_set(): | |
| ts = dt.datetime.now() | |
| try: | |
| out = subprocess.check_output( | |
| ["iw", "dev", iface, "link"], | |
| text=True, | |
| stderr=subprocess.STDOUT, | |
| ) | |
| except subprocess.CalledProcessError as e: | |
| out = e.output or str(e) | |
| except Exception as e: | |
| out = str(e) | |
| sample = parse_wifi_output(ts, out) | |
| samples.append(sample) | |
| f.write(f"\n=== {ts.isoformat()} ===\n") | |
| f.write(out) | |
| if not out.endswith("\n"): | |
| f.write("\n") | |
| f.flush() | |
| remaining = interval | |
| while remaining > 0 and not stop_event.is_set(): | |
| step = min(0.5, remaining) | |
| time.sleep(step) | |
| remaining -= step | |
| # ---------------------- Analysis helpers ---------------------- | |
| def summarize_ping( | |
| events: List[PingEvent], | |
| label: str, | |
| ) -> Tuple[pd.DataFrame, List[Tuple[dt.datetime, dt.datetime, int]]]: | |
| df = pd.DataFrame( | |
| { | |
| "timestamp": [e.timestamp for e in events], | |
| "status": [e.status for e in events], | |
| "seq": [e.seq for e in events], | |
| "rtt_ms": [e.rtt_ms for e in events], | |
| } | |
| ) | |
| valid_mask = df["status"].isin(["ok", "loss"]) | |
| df_valid = df[valid_mask] | |
| if df_valid.empty: | |
| console.print(f"[{label}] No ping data captured.") | |
| return df, [] | |
| total = len(df_valid) | |
| loss = int((df_valid["status"] == "loss").sum()) | |
| ok = total - loss | |
| loss_pct = (loss / total) * 100.0 if total else 0.0 | |
| rtts = df_valid["rtt_ms"].dropna() | |
| table = Table( | |
| title=f"{label} ping summary", | |
| box=box.HEAVY_HEAD, | |
| ) | |
| table.add_column("Metric", justify="left") | |
| table.add_column("Value", justify="right") | |
| table.add_row("Total pings", str(total)) | |
| table.add_row("OK", str(ok)) | |
| table.add_row("Lost", str(loss)) | |
| table.add_row("Loss %", f"{loss_pct:.2f}%") | |
| if not rtts.empty: | |
| table.add_row( | |
| "RTT min / avg / max", | |
| f"{rtts.min():.2f} / {rtts.mean():.2f} / {rtts.max():.2f} ms", | |
| ) | |
| console.print(table) | |
| # Detect outages (>=3 consecutive lost pings) | |
| valid_events = [e for e in events if e.status in ("ok", "loss")] | |
| outages: List[Tuple[dt.datetime, dt.datetime, int]] = [] | |
| current_start: Optional[dt.datetime] = None | |
| current_count = 0 | |
| for e in valid_events: | |
| if e.status == "loss": | |
| if current_start is None: | |
| current_start = e.timestamp | |
| current_count = 1 | |
| else: | |
| current_count += 1 | |
| else: | |
| if current_start is not None: | |
| outages.append((current_start, e.timestamp, current_count)) | |
| current_start = None | |
| current_count = 0 | |
| if current_start is not None: | |
| outages.append((current_start, valid_events[-1].timestamp, current_count)) | |
| major = [o for o in outages if o[2] >= 3] | |
| if major: | |
| console.print( | |
| f"[{label}] Sustained outages (>=3 consecutive losses): {len(major)}" | |
| ) | |
| for start, end, count in major[:10]: | |
| dur = (end - start).total_seconds() | |
| console.print( | |
| f" • {start.strftime('%F %T')} → {end.strftime('%T')} " | |
| f"({dur:.1f}s, {count} lost)" | |
| ) | |
| if len(major) > 10: | |
| console.print(f" • ... and {len(major) - 10} more") | |
| else: | |
| console.print(f"[{label}] No sustained outages (>=3 consecutive losses).") | |
| return df, major | |
| def summarize_wifi(samples: List[WifiSample]) -> pd.DataFrame: | |
| if not samples: | |
| console.print("[Wi‑Fi] No Wi‑Fi samples captured.") | |
| return pd.DataFrame(columns=["timestamp", "signal_dbm", "bitrate_mbps", "ssid"]) | |
| df = pd.DataFrame( | |
| { | |
| "timestamp": [s.timestamp for s in samples], | |
| "signal_dbm": [s.signal_dbm for s in samples], | |
| "bitrate_mbps": [s.bitrate_mbps for s in samples], | |
| "ssid": [s.ssid for s in samples], | |
| } | |
| ) | |
| signals = df["signal_dbm"].dropna() | |
| bitrates = df["bitrate_mbps"].dropna() | |
| ssids = sorted({s for s in df["ssid"].dropna()}) | |
| table = Table( | |
| title="Wi‑Fi link summary", | |
| box=box.HEAVY_HEAD, | |
| ) | |
| table.add_column("Metric", justify="left") | |
| table.add_column("Value", justify="right") | |
| table.add_row("Samples", str(len(df))) | |
| if not signals.empty: | |
| table.add_row( | |
| "Signal min / avg / max", | |
| f"{signals.min():.0f} / {signals.mean():.1f} / {signals.max():.0f} dBm", | |
| ) | |
| if not bitrates.empty: | |
| table.add_row( | |
| "Bitrate min / avg / max", | |
| f"{bitrates.min():.1f} / {bitrates.mean():.1f} / {bitrates.max():.1f} MBit/s", | |
| ) | |
| if ssids: | |
| table.add_row("SSIDs seen", ", ".join(ssids)) | |
| console.print(table) | |
| # Disconnected periods (signal_dbm is None) | |
| disconnected_periods: List[Tuple[dt.datetime, dt.datetime]] = [] | |
| prev_connected: Optional[bool] = None | |
| current_start: Optional[dt.datetime] = None | |
| for s in samples: | |
| connected = s.signal_dbm is not None | |
| if prev_connected is None: | |
| prev_connected = connected | |
| if not connected: | |
| current_start = s.timestamp | |
| continue | |
| if connected != prev_connected: | |
| if not connected: | |
| current_start = s.timestamp | |
| else: | |
| if current_start: | |
| disconnected_periods.append((current_start, s.timestamp)) | |
| current_start = None | |
| prev_connected = connected | |
| if current_start and (prev_connected is False): | |
| disconnected_periods.append((current_start, samples[-1].timestamp)) | |
| if disconnected_periods: | |
| console.print( | |
| f"[Wi‑Fi] Disconnected periods detected: {len(disconnected_periods)}" | |
| ) | |
| for start, end in disconnected_periods[:5]: | |
| dur = (end - start).total_seconds() | |
| console.print( | |
| f" • {start.strftime('%F %T')} → {end.strftime('%T')} ({dur:.1f}s)" | |
| ) | |
| if len(disconnected_periods) > 5: | |
| console.print(f" • ... and {len(disconnected_periods) - 5} more") | |
| return df | |
| def classify_outages( | |
| inet_outages: List[Tuple[dt.datetime, dt.datetime, int]], | |
| router_outages: List[Tuple[dt.datetime, dt.datetime, int]], | |
| ) -> None: | |
| if not inet_outages: | |
| return | |
| if not router_outages: | |
| console.print( | |
| "[yellow]No router ping outages to compare; cannot classify local vs upstream.[/yellow]" | |
| ) | |
| return | |
| local = 0 | |
| upstream = 0 | |
| for istart, iend, _ in inet_outages: | |
| overlaps = False | |
| for rstart, rend, _ in router_outages: | |
| if rend >= istart and rstart <= iend: | |
| overlaps = True | |
| break | |
| if overlaps: | |
| local += 1 | |
| else: | |
| upstream += 1 | |
| console.print( | |
| "\n[bold]Classification[/bold] (based on sustained internet outages):\n" | |
| f" • Likely local (Wi‑Fi/router) issues: {local}\n" | |
| f" • Likely upstream (ISP/internet) issues: {upstream}" | |
| ) | |
| def correlate_outages_with_wifi( | |
| inet_outages: List[Tuple[dt.datetime, dt.datetime, int]], | |
| wifi_samples: List[WifiSample], | |
| ) -> None: | |
| if not inet_outages or not wifi_samples: | |
| return | |
| wifi_sorted = sorted(wifi_samples, key=lambda s: s.timestamp) | |
| def nearest_wifi(ts: dt.datetime) -> WifiSample: | |
| return min( | |
| wifi_sorted, | |
| key=lambda s: abs((s.timestamp - ts).total_seconds()), | |
| ) | |
| console.print("\n[bold]Correlation[/bold] Internet outages vs Wi‑Fi near start:") | |
| for start, end, count in inet_outages[:10]: | |
| nearest = nearest_wifi(start) | |
| dt_secs = abs((nearest.timestamp - start).total_seconds()) | |
| sig = f"{nearest.signal_dbm} dBm" if nearest.signal_dbm is not None else "N/A" | |
| rate = ( | |
| f"{nearest.bitrate_mbps:.1f} MBit/s" | |
| if nearest.bitrate_mbps is not None | |
| else "N/A" | |
| ) | |
| console.print( | |
| f" • {start.strftime('%F %T')} " | |
| f"(~{(end - start).total_seconds():.1f}s, {count} lost)" | |
| f" → Wi‑Fi @ {nearest.timestamp.strftime('%T')} " | |
| f"(Δt={dt_secs:.0f}s, signal={sig}, rate={rate})" | |
| ) | |
| # ---------------------- Plotting ---------------------- | |
| def plot_ping_rtt(df: pd.DataFrame, label: str, plot_dir: Path) -> Optional[Path]: | |
| df_ok = df[df["status"] == "ok"].copy() | |
| if df_ok.empty or df_ok["rtt_ms"].dropna().empty: | |
| return None | |
| fig, ax = plt.subplots(figsize=(10, 4)) | |
| ax.plot(df_ok["timestamp"], df_ok["rtt_ms"], marker=".", linestyle="-", linewidth=0.8, markersize=2) | |
| ax.set_title(f"{label} RTT over time") | |
| ax.set_ylabel("RTT (ms)") | |
| ax.set_xlabel("Time") | |
| fig.autofmt_xdate() | |
| fig.tight_layout() | |
| filename = f"{label.lower()}_rtt.png".replace(" ", "_") | |
| out_path = plot_dir / filename | |
| fig.savefig(out_path) | |
| plt.close(fig) | |
| return out_path | |
| def plot_wifi_series( | |
| df: pd.DataFrame, | |
| column: str, | |
| title: str, | |
| ylabel: str, | |
| filename: str, | |
| plot_dir: Path, | |
| ) -> Optional[Path]: | |
| df_nonnull = df[df[column].notna()] | |
| if df_nonnull.empty: | |
| return None | |
| fig, ax = plt.subplots(figsize=(10, 4)) | |
| ax.plot(df_nonnull["timestamp"], df_nonnull[column], linestyle="-", marker=".") | |
| ax.set_title(title) | |
| ax.set_ylabel(ylabel) | |
| ax.set_xlabel("Time") | |
| fig.autofmt_xdate() | |
| fig.tight_layout() | |
| out_path = plot_dir / filename | |
| fig.savefig(out_path) | |
| plt.close(fig) | |
| return out_path | |
| def generate_plots( | |
| router_df: pd.DataFrame, | |
| inet_df: pd.DataFrame, | |
| wifi_df: pd.DataFrame, | |
| plot_dir: Path, | |
| ) -> None: | |
| plot_dir.mkdir(parents=True, exist_ok=True) | |
| generated: List[Path] = [] | |
| if not inet_df.empty: | |
| p = plot_ping_rtt(inet_df, "Internet", plot_dir) | |
| if p: | |
| generated.append(p) | |
| if not router_df.empty: | |
| p = plot_ping_rtt(router_df, "Router", plot_dir) | |
| if p: | |
| generated.append(p) | |
| if not wifi_df.empty: | |
| p = plot_wifi_series( | |
| wifi_df, | |
| "signal_dbm", | |
| "Wi‑Fi signal strength", | |
| "Signal (dBm)", | |
| "wifi_signal.png", | |
| plot_dir, | |
| ) | |
| if p: | |
| generated.append(p) | |
| p = plot_wifi_series( | |
| wifi_df, | |
| "bitrate_mbps", | |
| "Wi‑Fi bitrate", | |
| "Bitrate (MBit/s)", | |
| "wifi_bitrate.png", | |
| plot_dir, | |
| ) | |
| if p: | |
| generated.append(p) | |
| if generated: | |
| console.print("\n[bold]Plots saved:[/bold]") | |
| for path in generated: | |
| console.print(f" • {path}") | |
| else: | |
| console.print("\n[bold]No plots generated (no data).[/bold]") | |
| # ---------------------- Main orchestration ---------------------- | |
| def analyze_all( | |
| router_monitor: Optional[PingMonitor], | |
| inet_monitor: Optional[PingMonitor], | |
| wifi_samples: List[WifiSample], | |
| log_dir: Path, | |
| make_plots: bool, | |
| ) -> None: | |
| console.rule("[bold]Analysis summary[/bold]") | |
| inet_outages: List[Tuple[dt.datetime, dt.datetime, int]] = [] | |
| router_outages: List[Tuple[dt.datetime, dt.datetime, int]] = [] | |
| inet_df = pd.DataFrame() | |
| router_df = pd.DataFrame() | |
| wifi_df = pd.DataFrame() | |
| if inet_monitor and inet_monitor.events: | |
| inet_df, inet_outages = summarize_ping(inet_monitor.events, "Internet") | |
| else: | |
| console.print("[Internet] No ping data.") | |
| if router_monitor and router_monitor.events: | |
| router_df, router_outages = summarize_ping(router_monitor.events, "Router") | |
| else: | |
| console.print("[Router] No ping data.") | |
| console.print() | |
| wifi_df = summarize_wifi(wifi_samples) | |
| if inet_outages and router_outages: | |
| classify_outages(inet_outages, router_outages) | |
| if inet_outages and wifi_samples: | |
| correlate_outages_with_wifi(inet_outages, wifi_samples) | |
| if make_plots: | |
| try: | |
| generate_plots(router_df, inet_df, wifi_df, log_dir / "plots") | |
| except Exception as e: | |
| console.print(f"[red]Failed to generate plots: {e}[/red]") | |
| console.print(f"\nRaw logs directory: [bold]{log_dir}[/bold]") | |
| console.print(" - ping_router.log / ping_internet.log") | |
| console.print(" - wifi_link.log") | |
| console.print(" - journal_wifi.log (if available)") | |
| console.print(" - plots/*.png (RTT & Wi‑Fi graphs)") | |
| def parse_args() -> argparse.Namespace: | |
| p = argparse.ArgumentParser( | |
| description="Wi‑Fi stability recorder & analyzer (with plots)" | |
| ) | |
| p.add_argument( | |
| "--router", | |
| help="Router IP; if omitted, auto-detected from `ip route`.", | |
| ) | |
| p.add_argument( | |
| "--target", | |
| default="8.8.8.8", | |
| help="Internet ping target (default: 8.8.8.8).", | |
| ) | |
| p.add_argument( | |
| "--iface", | |
| help="Wi‑Fi interface (e.g. wlp2s0); if omitted, auto-detected from `iw dev`.", | |
| ) | |
| p.add_argument( | |
| "--wifi-interval", | |
| type=float, | |
| default=10.0, | |
| help="Seconds between Wi‑Fi samples (default: 10).", | |
| ) | |
| p.add_argument( | |
| "--no-plots", | |
| dest="plots", | |
| action="store_false", | |
| help="Disable plot generation.", | |
| ) | |
| p.set_defaults(plots=True) | |
| return p.parse_args() | |
| def main() -> None: | |
| args = parse_args() | |
| base_dir = Path.home() / "wifi-debug" | |
| base_dir.mkdir(parents=True, exist_ok=True) | |
| ts_str = dt.datetime.now().strftime("%Y%m%d-%H%M%S") | |
| log_dir = base_dir / ts_str | |
| log_dir.mkdir(parents=True, exist_ok=True) | |
| console.print(f"Log directory: [bold]{log_dir}[/bold]") | |
| router_ip = args.router or detect_default_gateway() | |
| if router_ip: | |
| console.print(f"Router IP: [bold]{router_ip}[/bold]") | |
| else: | |
| console.print( | |
| "[yellow]Router IP could not be auto-detected; router ping will be skipped.[/yellow]" | |
| ) | |
| iface = args.iface or detect_wifi_iface() | |
| if iface: | |
| console.print(f"Wi‑Fi interface: [bold]{iface}[/bold]") | |
| else: | |
| console.print( | |
| "[yellow]Wi‑Fi interface could not be auto-detected; Wi‑Fi sampling will be skipped.[/yellow]" | |
| ) | |
| stop_event = threading.Event() | |
| ping_monitors: List[PingMonitor] = [] | |
| router_monitor: Optional[PingMonitor] = None | |
| inet_monitor: Optional[PingMonitor] = None | |
| if router_ip: | |
| router_monitor = PingMonitor("router", router_ip, log_dir / "ping_router.log") | |
| router_monitor.start(stop_event) | |
| ping_monitors.append(router_monitor) | |
| inet_monitor = PingMonitor( | |
| "internet", args.target, log_dir / "ping_internet.log" | |
| ) | |
| inet_monitor.start(stop_event) | |
| ping_monitors.append(inet_monitor) | |
| wifi_samples: List[WifiSample] = [] | |
| wifi_thread: Optional[threading.Thread] = None | |
| if iface: | |
| wifi_thread = threading.Thread( | |
| target=wifi_sampler, | |
| args=( | |
| stop_event, | |
| iface, | |
| log_dir / "wifi_link.log", | |
| wifi_samples, | |
| args.wifi_interval, | |
| ), | |
| daemon=True, | |
| ) | |
| wifi_thread.start() | |
| journal_monitor = JournalMonitor(log_dir / "journal_wifi.log") | |
| journal_monitor.start(stop_event) | |
| console.print( | |
| "\nRecording started. Press [bold]Ctrl+C[/bold] to stop and run analysis.\n" | |
| ) | |
| try: | |
| while True: | |
| time.sleep(1.0) | |
| except KeyboardInterrupt: | |
| console.print("\nStopping capture, please wait...") | |
| stop_event.set() | |
| for m in ping_monitors: | |
| m.stop() | |
| journal_monitor.stop() | |
| if wifi_thread: | |
| wifi_thread.join(timeout=2.0) | |
| for m in ping_monitors: | |
| m.join() | |
| journal_monitor.join() | |
| console.print("\nCapture stopped.\n") | |
| analyze_all(router_monitor, inet_monitor, wifi_samples, log_dir, args.plots) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment