Skip to content

Instantly share code, notes, and snippets.

@ankurpandeyvns
Created September 17, 2025 12:33
Show Gist options
  • Select an option

  • Save ankurpandeyvns/1b87e33968418ac68fd0b855f3af05ec to your computer and use it in GitHub Desktop.

Select an option

Save ankurpandeyvns/1b87e33968418ac68fd0b855f3af05ec to your computer and use it in GitHub Desktop.
Fixes Phantom WAN on VSOL Hathway
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
wan_fixer_daemon.py
-------------------
A long-running daemon to monitor and fix WAN connectivity issues for a
VSOL V2802RH router.
"""
import re
import subprocess
import sys
import time
from typing import Tuple, Dict, Any
import os
import requests
import urllib3
# ───────────────────────────────────────────────────────── Configuration ──
CONFIG: Dict[str, Any] = {
# --- Main Timings ---
"MONITOR_INTERVAL_S": 60,
"THROTTLE_S": 5 * 60,
"ARP_FLUSH_COOLDOWN_S": 120,
"POST_REBOOT_WAIT_S": 60,
# --- Router Details ---
"ROUTER_URL": "http://192.168.1.58",
"ROUTER_USER": "admin",
"ROUTER_PASS": "password",
"WAN_INDEX": 2,
# --- Monitoring Details ---
"PING_TARGET": "8.8.8.8",
"PING_INTERFACE": "ppp0",
# --- Logging ---
"LOG_FILE": "/var/log/wan_fixer.log",
"MAX_LOG_SIZE_KB": 250,
"TRIM_LINE_COUNT": 500,
}
UA = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/138.0.0.0 Safari/537.36 Edg/138.0.0.0"
)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# ───────────────────────────────────────────────────── Logging & System ──
def log_message(message: str):
timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
formatted_message = f"{timestamp} | {message}"
print(formatted_message)
try:
with open(CONFIG["LOG_FILE"], "a") as f:
f.write(formatted_message + "\n")
except Exception as e:
print(f"{timestamp} | ERROR: Could not write to log file: {e}")
def rotate_log_if_needed():
log_file = CONFIG["LOG_FILE"]
if not os.path.exists(log_file): return
try:
if (os.path.getsize(log_file) / 1024) > CONFIG["MAX_LOG_SIZE_KB"]:
log_message(f"Log > {CONFIG['MAX_LOG_SIZE_KB']}KB, trimming.")
with open(log_file, "r") as f: lines = f.readlines()
with open(log_file, "w") as f: f.writelines(lines[-CONFIG["TRIM_LINE_COUNT"]:])
except Exception as e:
log_message(f"ERROR: Could not rotate log file: {e}")
def check_connectivity() -> bool:
try:
subprocess.run(
["ping", "-I", CONFIG["PING_INTERFACE"], "-c2", "-W3", CONFIG["PING_TARGET"]],
check=True, capture_output=True,
)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
return False
def flush_arp_cache():
log_message("Router unresponsive. Flushing system ARP cache...")
try:
subprocess.run(["sudo", "ip", "-s", "neigh", "flush", "all"], check=True, capture_output=True)
log_message("✓ System ARP cache flushed.")
except Exception as e:
log_message(f"✗ ERROR: Failed to flush ARP cache: {e}")
# ─────────────────────────────────────────────────────── Router Actions ──
def login(session: requests.Session) -> bool:
"""Logs into the router. Can raise ConnectTimeout."""
log_message("Attempting to log in...")
resp = session.post(
f"{CONFIG['ROUTER_URL']}/login.cgi",
data={"username": CONFIG["ROUTER_USER"], "password": CONFIG["ROUTER_PASS"], "submit.htm?login.htm": "0"},
timeout=10,
)
resp.raise_for_status()
if "window.location.href='index.htm'" in resp.text:
log_message("✓ Login successful.")
return True
log_message("✗ Login failed: Router did not return the expected redirect.")
return False
def get_wan_status(session: requests.Session) -> str:
log_message("Checking WAN connection status on router...")
try:
resp = session.get(f"{CONFIG['ROUTER_URL']}/status.htm", timeout=10)
resp.raise_for_status()
if "TR069" in resp.text:
log_message("✓ WAN status is: TR069_PRESENT")
return "TR069_PRESENT"
status_text = "UNKNOWN"
match = re.search(r'Status\s*</th>\s*<td[^>]*>([^<]+)</td>', resp.text, re.IGNORECASE)
if match:
status_text = match.group(1).strip().upper()
if "NO INFORM" in status_text or "DOWN" in status_text:
log_message(f"✓ WAN status on router page is: DOWN ({status_text})")
return "DOWN"
log_message(f"✓ WAN status is: {status_text}")
return status_text
except Exception as e:
log_message(f"✗ ERROR checking WAN status: {e}")
return "ERROR"
def _extract_nonce(html: str, page: str) -> str:
pattern = (r'<input[^>]+name=["\']submit\.htm\?' + re.escape(page) + r'["\'][^>]+'r'value=["\'](?P<val>[^"\']+)')
m = re.search(pattern, html, flags=re.I)
if not m: raise RuntimeError(f"Nonce not found on page {page}")
return m.group('val')
def toggle_led(session: requests.Session):
log_message("Toggling router LED...")
try:
for state in [1, 0]:
resp = session.get(f"{CONFIG['ROUTER_URL']}/led_onoff.htm", timeout=10)
nonce = _extract_nonce(resp.text, "led_onoff.htm")
payload = {"led": state, f"submit.htm?led_onoff.htm": nonce}
session.post(f"{CONFIG['ROUTER_URL']}/form2Led.cgi", data=payload, timeout=10)
log_message(f" ✓ LED {'Enabled' if state == 1 else 'Disabled'}.")
time.sleep(1)
log_message("✓ LED toggle complete.")
except Exception as e:
log_message(f"✗ ERROR toggling LED: {e}")
def fix_tr069_entry(session: requests.Session):
log_message("Phantom TR069 entry found. Deleting it...")
try:
resp = session.get(f"{CONFIG['ROUTER_URL']}/wan.htm", timeout=10)
nonce = _extract_nonce(resp.text, "wan.htm")
payload = {"action": 0, "idx": CONFIG['WAN_INDEX'], "connid": "", f"submit.htm?wan.htm": nonce}
session.post(f"{CONFIG['ROUTER_URL']}/form2WanAdsl.cgi", data=payload, timeout=10)
log_message(f"✓ WAN entry {CONFIG['WAN_INDEX']} removed.")
toggle_led(session)
except Exception as e:
log_message(f"✗ ERROR deleting TR069 entry: {e}")
def reboot_router(session: requests.Session):
log_message("Rebooting router...")
try:
resp = session.get(f"{CONFIG['ROUTER_URL']}/reboot.htm", timeout=10)
nonce = _extract_nonce(resp.text, "reboot.htm")
payload = [("save", "Reboot"), (f"submit.htm?reboot.htm", nonce), (f"submit.htm?reboot.htm", nonce)]
session.post(f"{CONFIG['ROUTER_URL']}/form2Reboot.cgi", data=payload, timeout=10)
log_message("✓ Reboot command accepted by router.")
except Exception as e:
log_message(f"✗ ERROR sending reboot command: {e}")
# ──────────────────────────────────────────────────────────── Main Daemon ──
def main():
log_message("======== Starting WAN Fixer Daemon ========")
last_fix_time = 0
session = requests.Session()
session.headers.update({"User-Agent": UA, "Origin": CONFIG["ROUTER_URL"]})
while True:
rotate_log_if_needed()
if check_connectivity():
log_message("Internet is UP. Sleeping for normal interval.")
time.sleep(CONFIG["MONITOR_INTERVAL_S"])
continue
log_message("--- Internet is DOWN. Evaluating fix procedure. ---")
try:
# Throttle "logical" fix attempts, but not connection timeouts
if time.time() - last_fix_time < CONFIG["THROTTLE_S"]:
log_message(f"Throttled. Last full fix attempt was recent. Waiting...")
time.sleep(CONFIG["MONITOR_INTERVAL_S"])
continue
if not login(session):
log_message("Login failed (bad credentials or unexpected page). Throttling next attempt.")
last_fix_time = time.time() # Start throttle after a logical login fail
continue
# If login is successful, attempt a fix and then throttle the *next* attempt
last_fix_time = time.time()
wan_status = get_wan_status(session)
if wan_status == "TR069_PRESENT":
fix_tr069_entry(session)
else: # Covers DOWN, UNKNOWN, ERROR, or even UP if internet is somehow down
log_message(f"WAN status is '{wan_status}'. Rebooting as corrective measure.")
reboot_router(session)
log_message(f"Waiting {CONFIG['POST_REBOOT_WAIT_S']}s for router to reboot...")
time.sleep(CONFIG['POST_REBOOT_WAIT_S'])
log_message("--- Fix complete. Waiting for normal interval before re-checking. ---")
except requests.exceptions.ConnectTimeout:
# This is a special case: the router is totally unresponsive.
# We do NOT update last_fix_time here, to avoid the 5-min throttle.
flush_arp_cache()
log_message(f"Router unresponsive. Entering special {CONFIG['ARP_FLUSH_COOLDOWN_S']}s cooldown.")
time.sleep(CONFIG["ARP_FLUSH_COOLDOWN_S"])
# After cooldown, loop immediately to retry
continue
except Exception as e:
log_message(f"An unhandled error occurred: {e}")
last_fix_time = time.time() # Throttle after any other unexpected error
time.sleep(CONFIG["MONITOR_INTERVAL_S"])
if __name__ == "__main__":
lock_file_path = "/tmp/wan_fixer_daemon.lock"
if os.path.exists(lock_file_path):
print("ERROR: Lock file exists. Daemon may already be running.")
sys.exit(1)
try:
with open(lock_file_path, "w") as f: f.write(str(os.getpid()))
main()
except KeyboardInterrupt:
print("\nDaemon stopped by user.")
finally:
os.remove(lock_file_path)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment