Skip to content

Instantly share code, notes, and snippets.

@0xdeafbeef
Created December 1, 2025 13:58
Show Gist options
  • Select an option

  • Save 0xdeafbeef/d4d6395cb671241437bc0ceed5feab1a to your computer and use it in GitHub Desktop.

Select an option

Save 0xdeafbeef/d4d6395cb671241437bc0ceed5feab1a to your computer and use it in GitHub Desktop.
uv run wifi_stability_monitor.py
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "rich>=13.7.0",
# "pandas>=2.2.0",
# "matplotlib>=3.8.0",
# ]
# ///
import argparse
import subprocess
import threading
import time
import datetime as dt
from dataclasses import dataclass
from pathlib import Path
import re
import shutil
import sys
from typing import Optional, List, Tuple
from rich.console import Console
from rich.table import Table
from rich import box
import pandas as pd
import matplotlib
matplotlib.use("Agg") # headless backend
import matplotlib.pyplot as plt
console = Console()
# ---------------------- Data structures ----------------------
@dataclass
class PingEvent:
    """A single classified line of `ping` output."""

    # Moment the line refers to.
    timestamp: dt.datetime
    # One of "ok", "loss" or "other".
    status: str
    # ICMP sequence number, if the line contained one.
    seq: Optional[int]
    # Round-trip time in milliseconds, if the line contained one.
    rtt_ms: Optional[float]
    # Text the event was built from.
    raw: str
@dataclass
class WifiSample:
    """One snapshot of the Wi-Fi link state."""

    # When the snapshot was taken.
    timestamp: dt.datetime
    # Signal level in dBm; None when it could not be read.
    signal_dbm: Optional[int]
    # Transmit bitrate in MBit/s; None when it could not be read.
    bitrate_mbps: Optional[float]
    # Network name; None when it could not be read.
    ssid: Optional[str]
    # Unparsed command output the snapshot was built from.
    raw: str
# ---------------------- Helpers: detection ----------------------
def detect_default_gateway() -> Optional[str]:
    """Return the `via` address of the first default route printed by `ip route`.

    Returns None when `ip` is not on PATH, the command fails, or no
    `default ... via <addr>` line is present.
    """
    if not shutil.which("ip"):
        return None

    try:
        routes = subprocess.check_output(["ip", "route"], text=True)
    except Exception:
        return None

    for entry in routes.splitlines():
        entry = entry.strip()
        if not entry.startswith("default "):
            continue
        fields = entry.split()
        if "via" not in fields:
            continue
        # The gateway address is the token right after "via".
        hop = fields.index("via") + 1
        if hop < len(fields):
            return fields[hop]
    return None
def detect_wifi_iface() -> Optional[str]:
    """Return the first interface that `iw dev` lists with a managed type.

    Returns None when `iw` is not on PATH, the command fails, or no such
    interface is listed.
    """
    if not shutil.which("iw"):
        return None

    try:
        listing = subprocess.check_output(["iw", "dev"], text=True)
    except Exception:
        return None

    candidate = None
    for raw_line in listing.splitlines():
        entry = raw_line.strip()
        if entry.startswith("Interface "):
            # Remember the name; its "type" line comes later in the output.
            candidate = entry.split()[1]
            continue
        if candidate and entry.startswith("type ") and "managed" in entry:
            return candidate
    return None
# ---------------------- Parsing helpers ----------------------
PING_TS_RE = re.compile(r"^\[(\d+(?:\.\d+)?)\]\s*(.*)$")
def parse_ping_line(line: str) -> Optional[PingEvent]:
"""
Parse a single `ping -D -O` output line into a PingEvent.
"""
line = line.strip()
if not line:
return None
m = PING_TS_RE.match(line)
if m:
ts_epoch = float(m.group(1))
text = m.group(2)
timestamp = dt.datetime.fromtimestamp(ts_epoch)
else:
timestamp = dt.datetime.now()
text = line
status = "other"
seq = None
rtt_ms = None
if "icmp_seq=" in text:
seq_m = re.search(r"icmp_seq=(\d+)", text)
if seq_m:
try:
seq = int(seq_m.group(1))
except ValueError:
seq = None
if "time=" in text:
status = "ok"
t_m = re.search(r"time=([\d\.]+)\s*ms", text)
if t_m:
try:
rtt_ms = float(t_m.group(1))
except ValueError:
rtt_ms = None
elif (
"no answer yet" in text
or "Destination Host Unreachable" in text
or "Destination Net Unreachable" in text
or "Network is unreachable" in text
or "100% packet loss" in text
or "Request timeout" in text
or "ping: sendto" in text
):
status = "loss"
return PingEvent(
timestamp=timestamp,
status=status,
seq=seq,
rtt_ms=rtt_ms,
raw=text,
)
def parse_wifi_output(ts: dt.datetime, output: str) -> WifiSample:
    """
    Build a WifiSample from the text printed by `iw dev <iface> link`.

    Output containing "Not connected." yields a sample with every field
    None. Otherwise SSID, signal (dBm) and tx bitrate (MBit/s) are each
    extracted independently; a field that is absent stays None.
    """
    if "Not connected." in output:
        return WifiSample(
            timestamp=ts,
            signal_dbm=None,
            bitrate_mbps=None,
            ssid=None,
            raw=output,
        )

    ssid_match = re.search(r"SSID:\s*(.+)", output)
    ssid = ssid_match.group(1).strip() if ssid_match else None

    signal_dbm = None
    signal_match = re.search(r"signal:\s*(-?\d+)\s*dBm", output)
    if signal_match:
        try:
            signal_dbm = int(signal_match.group(1))
        except ValueError:
            signal_dbm = None

    bitrate_mbps = None
    rate_match = re.search(r"tx bitrate:\s*([\d\.]+)\s*MBit/s", output)
    if rate_match:
        try:
            bitrate_mbps = float(rate_match.group(1))
        except ValueError:
            bitrate_mbps = None

    return WifiSample(
        timestamp=ts,
        signal_dbm=signal_dbm,
        bitrate_mbps=bitrate_mbps,
        ssid=ssid,
        raw=output,
    )
# ---------------------- Monitors ----------------------
class PingMonitor:
    """Run `ping` against one target in a background thread.

    Every output line is copied to ``log_path``; lines that parse are
    appended to ``events`` as PingEvent objects.
    """

    def __init__(self, name: str, target: str, log_path: Path):
        self.name = name
        self.target = target
        self.log_path = log_path
        self.proc: Optional[subprocess.Popen] = None
        self.thread: Optional[threading.Thread] = None
        self.events: List[PingEvent] = []

    def start(self, stop_event: threading.Event) -> None:
        """Start the daemon reader thread; returns immediately."""
        self.thread = threading.Thread(
            target=self._run, args=(stop_event,), daemon=True
        )
        self.thread.start()

    def _run(self, stop_event: threading.Event) -> None:
        """Thread body: spawn ping, log and parse its output until stopped."""
        if shutil.which("ping") is None:
            console.print(f"[red]`ping` not found; skipping ping to {self.target}[/red]")
            return
        cmd = ["ping", "-D", "-O", "-i", "1", self.target]
        try:
            self.proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                bufsize=1,
            )
        except Exception as e:
            console.print(f"[red]Failed to start ping to {self.target}: {e}[/red]")
            return
        try:
            with self.log_path.open("w", encoding="utf-8") as f:
                f.write("# Command: " + " ".join(cmd) + "\n")
                # stdout is a pipe because of stdout=PIPE; `or ()` only
                # covers the Optional type.
                for line in self.proc.stdout or ():
                    line = line.rstrip("\n")
                    f.write(line + "\n")
                    f.flush()
                    evt = parse_ping_line(line)
                    if evt:
                        self.events.append(evt)
                    if stop_event.is_set():
                        break
        finally:
            # Runs even if the log file could not be opened or written, so
            # the child is never left running or un-reaped.
            self._reap()

    def _reap(self) -> None:
        """Terminate the child if still running, wait for it, close its pipe."""
        proc = self.proc
        if proc is None:
            return
        self.stop()
        try:
            proc.wait(timeout=2.0)
        except subprocess.TimeoutExpired:
            # SIGTERM was not honoured in time; force the exit.
            proc.kill()
            proc.wait()
        if proc.stdout:
            proc.stdout.close()

    def stop(self) -> None:
        """Ask the ping process to exit; safe to call repeatedly or before start."""
        if self.proc and self.proc.poll() is None:
            try:
                self.proc.terminate()
            except OSError:
                # Process exited between poll() and terminate().
                pass

    def join(self) -> None:
        """Block until the reader thread has finished (no-op if never started)."""
        if self.thread:
            self.thread.join()
class JournalMonitor:
    """Follow NetworkManager / wpa_supplicant journal entries into a log file.

    ``enabled`` is False when `journalctl` is not on PATH; `start` then only
    prints a notice and no thread is created.
    """

    def __init__(self, log_path: Path):
        self.log_path = log_path
        self.proc: Optional[subprocess.Popen] = None
        self.thread: Optional[threading.Thread] = None
        self.enabled = shutil.which("journalctl") is not None

    def start(self, stop_event: threading.Event) -> None:
        """Start the daemon reader thread, or report that capture is skipped."""
        if not self.enabled:
            console.print("[yellow]`journalctl` not found; skipping log capture.[/yellow]")
            return
        self.thread = threading.Thread(
            target=self._run, args=(stop_event,), daemon=True
        )
        self.thread.start()

    def _run(self, stop_event: threading.Event) -> None:
        """Thread body: spawn `journalctl -f` and copy its output to the log."""
        cmd = [
            "journalctl",
            "-f",
            "-o",
            "short-iso",
            "-u",
            "NetworkManager",
            "-u",
            "wpa_supplicant",
        ]
        try:
            self.proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                bufsize=1,
            )
        except Exception as e:
            console.print(f"[red]Failed to start journalctl: {e}[/red]")
            return
        try:
            with self.log_path.open("w", encoding="utf-8") as f:
                f.write("# Command: " + " ".join(cmd) + "\n")
                # stdout is a pipe because of stdout=PIPE; `or ()` only
                # covers the Optional type.
                for line in self.proc.stdout or ():
                    f.write(line)
                    f.flush()
                    if stop_event.is_set():
                        break
        finally:
            # Runs even if the log file could not be opened or written, so
            # the child is never left running or un-reaped.
            self._reap()

    def _reap(self) -> None:
        """Terminate the child if still running, wait for it, close its pipe."""
        proc = self.proc
        if proc is None:
            return
        self.stop()
        try:
            proc.wait(timeout=2.0)
        except subprocess.TimeoutExpired:
            # SIGTERM was not honoured in time; force the exit.
            proc.kill()
            proc.wait()
        if proc.stdout:
            proc.stdout.close()

    def stop(self) -> None:
        """Ask journalctl to exit; safe to call repeatedly or before start."""
        if self.proc and self.proc.poll() is None:
            try:
                self.proc.terminate()
            except OSError:
                # Process exited between poll() and terminate().
                pass

    def join(self) -> None:
        """Block until the reader thread has finished (no-op if never started)."""
        if self.thread:
            self.thread.join()
def wifi_sampler(
    stop_event: threading.Event,
    iface: str,
    log_path: Path,
    samples: List[WifiSample],
    interval: float = 10.0,
) -> None:
    """Poll `iw dev <iface> link` until ``stop_event`` is set.

    Each poll appends a parsed WifiSample to ``samples`` and writes the
    command output to ``log_path`` under a timestamp header. When the
    command fails, its output (or the error text) is parsed and logged
    instead. Returns immediately if `iw` is not on PATH.
    """
    if shutil.which("iw") is None:
        console.print("[yellow]`iw` not found; skipping Wi‑Fi sampling.[/yellow]")
        return

    query = ["iw", "dev", iface, "link"]
    with log_path.open("w", encoding="utf-8") as log:
        log.write(f"# Wi‑Fi link samples for interface {iface}\n")
        while not stop_event.is_set():
            taken_at = dt.datetime.now()
            try:
                report = subprocess.check_output(
                    query,
                    text=True,
                    stderr=subprocess.STDOUT,
                )
            except subprocess.CalledProcessError as err:
                report = err.output or str(err)
            except Exception as err:
                report = str(err)

            samples.append(parse_wifi_output(taken_at, report))

            log.write(f"\n=== {taken_at.isoformat()} ===\n")
            log.write(report)
            if not report.endswith("\n"):
                log.write("\n")
            log.flush()

            # Sleep in short slices so a stop request is noticed within ~0.5 s.
            left = interval
            while left > 0 and not stop_event.is_set():
                nap = min(0.5, left)
                time.sleep(nap)
                left -= nap
# ---------------------- Analysis helpers ----------------------
def summarize_ping(
    events: List[PingEvent],
    label: str,
) -> Tuple[pd.DataFrame, List[Tuple[dt.datetime, dt.datetime, int]]]:
    """Print loss/RTT statistics for one ping target and find sustained outages.

    Only events with status "ok" or "loss" are counted. An outage is a run
    of consecutive losses; it ends at the timestamp of the next "ok" event
    (or of the last counted event when the run is still open).

    Returns:
        A frame with one row per event (all statuses), and the list of
        ``(start, end, lost_count)`` runs with at least 3 losses. The list
        is empty when no "ok"/"loss" events exist.
    """
    frame = pd.DataFrame(
        {
            "timestamp": [ev.timestamp for ev in events],
            "status": [ev.status for ev in events],
            "seq": [ev.seq for ev in events],
            "rtt_ms": [ev.rtt_ms for ev in events],
        }
    )
    counted = frame[frame["status"].isin(["ok", "loss"])]
    if counted.empty:
        console.print(f"[{label}] No ping data captured.")
        return frame, []

    total = len(counted)
    lost = int((counted["status"] == "loss").sum())
    received = total - lost
    loss_pct = (lost / total) * 100.0 if total else 0.0
    rtts = counted["rtt_ms"].dropna()

    table = Table(
        title=f"{label} ping summary",
        box=box.HEAVY_HEAD,
    )
    table.add_column("Metric", justify="left")
    table.add_column("Value", justify="right")
    for metric, value in (
        ("Total pings", str(total)),
        ("OK", str(received)),
        ("Lost", str(lost)),
        ("Loss %", f"{loss_pct:.2f}%"),
    ):
        table.add_row(metric, value)
    if not rtts.empty:
        table.add_row(
            "RTT min / avg / max",
            f"{rtts.min():.2f} / {rtts.mean():.2f} / {rtts.max():.2f} ms",
        )
    console.print(table)

    # Collect runs of consecutive losses as (start, end, count).
    runs: List[Tuple[dt.datetime, dt.datetime, int]] = []
    run_start: Optional[dt.datetime] = None
    run_len = 0
    last_counted: Optional[PingEvent] = None
    for ev in events:
        if ev.status not in ("ok", "loss"):
            continue
        last_counted = ev
        if ev.status == "loss":
            if run_start is None:
                run_start = ev.timestamp
            run_len += 1
        elif run_start is not None:
            runs.append((run_start, ev.timestamp, run_len))
            run_start = None
            run_len = 0
    if run_start is not None:
        # Capture ended while losses were still ongoing.
        runs.append((run_start, last_counted.timestamp, run_len))

    major = [run for run in runs if run[2] >= 3]
    if not major:
        console.print(f"[{label}] No sustained outages (>=3 consecutive losses).")
        return frame, major

    console.print(
        f"[{label}] Sustained outages (>=3 consecutive losses): {len(major)}"
    )
    for start, end, count in major[:10]:
        dur = (end - start).total_seconds()
        console.print(
            f" • {start.strftime('%F %T')} → {end.strftime('%T')} "
            f"({dur:.1f}s, {count} lost)"
        )
    if len(major) > 10:
        console.print(f" • ... and {len(major) - 10} more")
    return frame, major
def summarize_wifi(samples: List[WifiSample]) -> pd.DataFrame:
    """Print a Wi-Fi link summary and list periods without a signal reading.

    A sample counts as disconnected when its ``signal_dbm`` is None. A
    period runs from the first such sample to the next sample that has a
    signal (or to the last sample if none follows).

    Returns:
        A frame with columns timestamp, signal_dbm, bitrate_mbps and ssid;
        it has no rows when ``samples`` is empty.
    """
    if not samples:
        console.print("[Wi‑Fi] No Wi‑Fi samples captured.")
        return pd.DataFrame(columns=["timestamp", "signal_dbm", "bitrate_mbps", "ssid"])

    frame = pd.DataFrame(
        {
            "timestamp": [smp.timestamp for smp in samples],
            "signal_dbm": [smp.signal_dbm for smp in samples],
            "bitrate_mbps": [smp.bitrate_mbps for smp in samples],
            "ssid": [smp.ssid for smp in samples],
        }
    )
    signals = frame["signal_dbm"].dropna()
    bitrates = frame["bitrate_mbps"].dropna()
    ssids = sorted(set(frame["ssid"].dropna()))

    table = Table(
        title="Wi‑Fi link summary",
        box=box.HEAVY_HEAD,
    )
    table.add_column("Metric", justify="left")
    table.add_column("Value", justify="right")
    table.add_row("Samples", str(len(frame)))
    if not signals.empty:
        table.add_row(
            "Signal min / avg / max",
            f"{signals.min():.0f} / {signals.mean():.1f} / {signals.max():.0f} dBm",
        )
    if not bitrates.empty:
        table.add_row(
            "Bitrate min / avg / max",
            f"{bitrates.min():.1f} / {bitrates.mean():.1f} / {bitrates.max():.1f} MBit/s",
        )
    if ssids:
        table.add_row("SSIDs seen", ", ".join(ssids))
    console.print(table)

    # Disconnected periods: stretches of samples without a signal reading.
    disconnected_periods: List[Tuple[dt.datetime, dt.datetime]] = []
    gap_start: Optional[dt.datetime] = None
    for smp in samples:
        if smp.signal_dbm is None:
            if gap_start is None:
                gap_start = smp.timestamp
        elif gap_start is not None:
            disconnected_periods.append((gap_start, smp.timestamp))
            gap_start = None
    if gap_start is not None:
        # Still disconnected when sampling ended.
        disconnected_periods.append((gap_start, samples[-1].timestamp))

    if disconnected_periods:
        console.print(
            f"[Wi‑Fi] Disconnected periods detected: {len(disconnected_periods)}"
        )
        for start, end in disconnected_periods[:5]:
            dur = (end - start).total_seconds()
            console.print(
                f" • {start.strftime('%F %T')} → {end.strftime('%T')} ({dur:.1f}s)"
            )
        if len(disconnected_periods) > 5:
            console.print(f" • ... and {len(disconnected_periods) - 5} more")
    return frame
def classify_outages(
    inet_outages: List[Tuple[dt.datetime, dt.datetime, int]],
    router_outages: List[Tuple[dt.datetime, dt.datetime, int]],
) -> None:
    """Print how many internet outages coincide with a router outage.

    An internet outage whose time span overlaps any router outage is
    counted as local; the rest are counted as upstream. Prints nothing
    when there are no internet outages, and only a notice when there are
    no router outages to compare against.
    """
    if not inet_outages:
        return
    if not router_outages:
        console.print(
            "[yellow]No router ping outages to compare; cannot classify local vs upstream.[/yellow]"
        )
        return

    local = sum(
        1
        for istart, iend, _ in inet_outages
        if any(
            rend >= istart and rstart <= iend
            for rstart, rend, _ in router_outages
        )
    )
    upstream = len(inet_outages) - local

    console.print(
        "\n[bold]Classification[/bold] (based on sustained internet outages):\n"
        f" • Likely local (Wi‑Fi/router) issues: {local}\n"
        f" • Likely upstream (ISP/internet) issues: {upstream}"
    )
def correlate_outages_with_wifi(
inet_outages: List[Tuple[dt.datetime, dt.datetime, int]],
wifi_samples: List[WifiSample],
) -> None:
if not inet_outages or not wifi_samples:
return
wifi_sorted = sorted(wifi_samples, key=lambda s: s.timestamp)
def nearest_wifi(ts: dt.datetime) -> WifiSample:
return min(
wifi_sorted,
key=lambda s: abs((s.timestamp - ts).total_seconds()),
)
console.print("\n[bold]Correlation[/bold] Internet outages vs Wi‑Fi near start:")
for start, end, count in inet_outages[:10]:
nearest = nearest_wifi(start)
dt_secs = abs((nearest.timestamp - start).total_seconds())
sig = f"{nearest.signal_dbm} dBm" if nearest.signal_dbm is not None else "N/A"
rate = (
f"{nearest.bitrate_mbps:.1f} MBit/s"
if nearest.bitrate_mbps is not None
else "N/A"
)
console.print(
f" • {start.strftime('%F %T')} "
f"(~{(end - start).total_seconds():.1f}s, {count} lost)"
f" → Wi‑Fi @ {nearest.timestamp.strftime('%T')} "
f"(Δt={dt_secs:.0f}s, signal={sig}, rate={rate})"
)
# ---------------------- Plotting ----------------------
def plot_ping_rtt(df: pd.DataFrame, label: str, plot_dir: Path) -> Optional[Path]:
    """Save an RTT-over-time line plot for the "ok" rows of ``df``.

    The file is written to ``plot_dir`` as ``<label lowercased>_rtt.png``
    with spaces replaced by underscores.

    Returns:
        The saved path, or None when no "ok" row carries an RTT value.
    """
    answered = df[df["status"] == "ok"].copy()
    if answered.empty or not answered["rtt_ms"].notna().any():
        return None

    fig, ax = plt.subplots(figsize=(10, 4))
    ax.plot(
        answered["timestamp"],
        answered["rtt_ms"],
        marker=".",
        linestyle="-",
        linewidth=0.8,
        markersize=2,
    )
    ax.set_title(f"{label} RTT over time")
    ax.set_ylabel("RTT (ms)")
    ax.set_xlabel("Time")
    fig.autofmt_xdate()
    fig.tight_layout()

    out_path = plot_dir / f"{label.lower()}_rtt.png".replace(" ", "_")
    fig.savefig(out_path)
    plt.close(fig)
    return out_path
def plot_wifi_series(
    df: pd.DataFrame,
    column: str,
    title: str,
    ylabel: str,
    filename: str,
    plot_dir: Path,
) -> Optional[Path]:
    """Save a time-series line plot of one column of ``df``.

    Rows where ``column`` is null are left out.

    Returns:
        ``plot_dir / filename`` once saved, or None when the column has no
        non-null values.
    """
    present = df.dropna(subset=[column])
    if present.empty:
        return None

    fig, ax = plt.subplots(figsize=(10, 4))
    ax.plot(present["timestamp"], present[column], linestyle="-", marker=".")
    ax.set_title(title)
    ax.set_ylabel(ylabel)
    ax.set_xlabel("Time")
    fig.autofmt_xdate()
    fig.tight_layout()

    out_path = plot_dir / filename
    fig.savefig(out_path)
    plt.close(fig)
    return out_path
def generate_plots(
    router_df: pd.DataFrame,
    inet_df: pd.DataFrame,
    wifi_df: pd.DataFrame,
    plot_dir: Path,
) -> None:
    """Create ``plot_dir`` and write every plot for which data exists.

    Plots are attempted in the order: internet RTT, router RTT, Wi-Fi
    signal, Wi-Fi bitrate. The saved paths are printed, or a notice when
    nothing was produced.
    """
    plot_dir.mkdir(parents=True, exist_ok=True)

    attempts: List[Optional[Path]] = []
    for frame, label in ((inet_df, "Internet"), (router_df, "Router")):
        if not frame.empty:
            attempts.append(plot_ping_rtt(frame, label, plot_dir))

    if not wifi_df.empty:
        wifi_specs = (
            ("signal_dbm", "Wi‑Fi signal strength", "Signal (dBm)", "wifi_signal.png"),
            ("bitrate_mbps", "Wi‑Fi bitrate", "Bitrate (MBit/s)", "wifi_bitrate.png"),
        )
        for column, title, ylabel, filename in wifi_specs:
            attempts.append(
                plot_wifi_series(wifi_df, column, title, ylabel, filename, plot_dir)
            )

    # Plot helpers return None when they had nothing to draw.
    generated: List[Path] = [path for path in attempts if path]

    if not generated:
        console.print("\n[bold]No plots generated (no data).[/bold]")
        return
    console.print("\n[bold]Plots saved:[/bold]")
    for path in generated:
        console.print(f" • {path}")
# ---------------------- Main orchestration ----------------------
def analyze_all(
    router_monitor: Optional[PingMonitor],
    inet_monitor: Optional[PingMonitor],
    wifi_samples: List[WifiSample],
    log_dir: Path,
    make_plots: bool,
) -> None:
    """Print the full post-capture report and optionally write plots.

    Summarizes internet and router pings and the Wi-Fi samples, classifies
    sustained internet outages as local or upstream, correlates them with
    the nearest Wi-Fi sample, and finally lists where the raw logs are.

    Args:
        router_monitor: Monitor for the gateway, or None if it was skipped.
        inet_monitor: Monitor for the internet target, or None.
        wifi_samples: Samples collected by the Wi-Fi sampler (may be empty).
        log_dir: Capture directory; plots go to ``log_dir / "plots"``.
        make_plots: When False, plot generation is skipped.
    """
    console.rule("[bold]Analysis summary[/bold]")
    inet_outages: List[Tuple[dt.datetime, dt.datetime, int]] = []
    router_outages: List[Tuple[dt.datetime, dt.datetime, int]] = []
    inet_df = pd.DataFrame()
    router_df = pd.DataFrame()
    wifi_df = pd.DataFrame()

    if inet_monitor and inet_monitor.events:
        inet_df, inet_outages = summarize_ping(inet_monitor.events, "Internet")
    else:
        console.print("[Internet] No ping data.")
    if router_monitor and router_monitor.events:
        router_df, router_outages = summarize_ping(router_monitor.events, "Router")
    else:
        console.print("[Router] No ping data.")
    console.print()
    wifi_df = summarize_wifi(wifi_samples)

    if inet_outages:
        # True only if the router ping produced at least one ok/loss result.
        router_measured = (
            not router_df.empty
            and bool(router_df["status"].isin(["ok", "loss"]).any())
        )
        if router_outages or not router_measured:
            # With no router measurements this prints the "cannot classify"
            # notice instead of silently skipping the section.
            classify_outages(inet_outages, router_outages)
        else:
            # The router was measured and had no sustained outage, so no
            # internet outage can overlap one: all of them are upstream.
            console.print(
                "\n[bold]Classification[/bold] (based on sustained internet outages):\n"
                " • Likely local (Wi‑Fi/router) issues: 0\n"
                f" • Likely upstream (ISP/internet) issues: {len(inet_outages)}"
            )
    if inet_outages and wifi_samples:
        correlate_outages_with_wifi(inet_outages, wifi_samples)

    if make_plots:
        try:
            generate_plots(router_df, inet_df, wifi_df, log_dir / "plots")
        except Exception as e:
            # Plotting is best-effort; the textual report above still stands.
            console.print(f"[red]Failed to generate plots: {e}[/red]")

    console.print(f"\nRaw logs directory: [bold]{log_dir}[/bold]")
    console.print(" - ping_router.log / ping_internet.log")
    console.print(" - wifi_link.log")
    console.print(" - journal_wifi.log (if available)")
    console.print(" - plots/*.png (RTT & Wi‑Fi graphs)")
def parse_args() -> argparse.Namespace:
    """Parse the command line.

    Returns:
        Namespace with ``router``, ``target``, ``iface``, ``wifi_interval``
        and ``plots`` (True unless ``--no-plots`` is given).
    """
    parser = argparse.ArgumentParser(
        description="Wi‑Fi stability recorder & analyzer (with plots)"
    )
    parser.add_argument(
        "--router",
        help="Router IP; if omitted, auto-detected from `ip route`.",
    )
    parser.add_argument(
        "--target",
        default="8.8.8.8",
        help="Internet ping target (default: 8.8.8.8).",
    )
    parser.add_argument(
        "--iface",
        help="Wi‑Fi interface (e.g. wlp2s0); if omitted, auto-detected from `iw dev`.",
    )
    parser.add_argument(
        "--wifi-interval",
        type=float,
        default=10.0,
        help="Seconds between Wi‑Fi samples (default: 10).",
    )
    parser.add_argument(
        "--no-plots",
        dest="plots",
        action="store_false",
        default=True,
        help="Disable plot generation.",
    )
    return parser.parse_args()
def main() -> None:
    """Record ping, Wi-Fi and journal data until Ctrl+C, then analyze it.

    Logs go to a new timestamped directory under ``~/wifi-debug``. The
    router ping is skipped when no router IP is known, and Wi-Fi sampling
    is skipped when no interface is known.
    """
    args = parse_args()

    run_stamp = dt.datetime.now().strftime("%Y%m%d-%H%M%S")
    log_dir = Path.home() / "wifi-debug" / run_stamp
    # parents=True also creates ~/wifi-debug on first use.
    log_dir.mkdir(parents=True, exist_ok=True)
    console.print(f"Log directory: [bold]{log_dir}[/bold]")

    router_ip = args.router or detect_default_gateway()
    if not router_ip:
        console.print(
            "[yellow]Router IP could not be auto-detected; router ping will be skipped.[/yellow]"
        )
    else:
        console.print(f"Router IP: [bold]{router_ip}[/bold]")

    iface = args.iface or detect_wifi_iface()
    if not iface:
        console.print(
            "[yellow]Wi‑Fi interface could not be auto-detected; Wi‑Fi sampling will be skipped.[/yellow]"
        )
    else:
        console.print(f"Wi‑Fi interface: [bold]{iface}[/bold]")

    stop_event = threading.Event()

    router_monitor: Optional[PingMonitor] = None
    if router_ip:
        router_monitor = PingMonitor("router", router_ip, log_dir / "ping_router.log")
        router_monitor.start(stop_event)
    inet_monitor: Optional[PingMonitor] = PingMonitor(
        "internet", args.target, log_dir / "ping_internet.log"
    )
    inet_monitor.start(stop_event)
    ping_monitors: List[PingMonitor] = [
        mon for mon in (router_monitor, inet_monitor) if mon is not None
    ]

    wifi_samples: List[WifiSample] = []
    wifi_thread: Optional[threading.Thread] = None
    if iface:
        sampler_args = (
            stop_event,
            iface,
            log_dir / "wifi_link.log",
            wifi_samples,
            args.wifi_interval,
        )
        wifi_thread = threading.Thread(
            target=wifi_sampler,
            args=sampler_args,
            daemon=True,
        )
        wifi_thread.start()

    journal_monitor = JournalMonitor(log_dir / "journal_wifi.log")
    journal_monitor.start(stop_event)

    console.print(
        "\nRecording started. Press [bold]Ctrl+C[/bold] to stop and run analysis.\n"
    )
    try:
        # Idle here; all capture work happens in the background threads.
        while True:
            time.sleep(1.0)
    except KeyboardInterrupt:
        console.print("\nStopping capture, please wait...")
        stop_event.set()

    # Signal every child process first, then wait for the reader threads.
    for mon in ping_monitors:
        mon.stop()
    journal_monitor.stop()
    if wifi_thread:
        wifi_thread.join(timeout=2.0)
    for mon in ping_monitors:
        mon.join()
    journal_monitor.join()

    console.print("\nCapture stopped.\n")
    analyze_all(router_monitor, inet_monitor, wifi_samples, log_dir, args.plots)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment