Created
September 17, 2025 12:33
-
-
Save ankurpandeyvns/1b87e33968418ac68fd0b855f3af05ec to your computer and use it in GitHub Desktop.
Fixes Phantom WAN on VSOL Hathway
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| """ | |
| wan_fixer_daemon.py | |
| ------------------- | |
| A long-running daemon to monitor and fix WAN connectivity issues for a | |
| VSOL V2802RH router. | |
| """ | |
| import re | |
| import subprocess | |
| import sys | |
| import time | |
| from typing import Tuple, Dict, Any | |
| import os | |
| import requests | |
| import urllib3 | |
| # ───────────────────────────────────────────────────────── Configuration ── | |
# Central settings table read by every function in this script.
CONFIG: Dict[str, Any] = {
    # --- Main Timings (all in seconds) ---
    # Sleep between connectivity checks in the main loop.
    "MONITOR_INTERVAL_S": 60,
    # Minimum gap enforced between two fix attempts.
    "THROTTLE_S": 5 * 60,
    # Sleep after an ARP flush when the router did not answer.
    "ARP_FLUSH_COOLDOWN_S": 120,
    # Sleep after a reboot command has been sent.
    "POST_REBOOT_WAIT_S": 60,

    # --- Router Details ---
    # Base URL of the router's web interface (no trailing slash).
    "ROUTER_URL": "http://192.168.1.58",
    "ROUTER_USER": "admin",
    "ROUTER_PASS": "password",
    # Value posted as "idx" when deleting the WAN entry.
    "WAN_INDEX": 2,

    # --- Monitoring Details ---
    # Address pinged to decide whether the internet is reachable.
    "PING_TARGET": "8.8.8.8",
    # Interface handed to `ping -I`.
    "PING_INTERFACE": "ppp0",

    # --- Logging ---
    "LOG_FILE": "/var/log/wan_fixer.log",
    # Size above which the log file is trimmed.
    "MAX_LOG_SIZE_KB": 250,
    # Number of trailing lines kept when trimming.
    "TRIM_LINE_COUNT": 500,
}

# User-Agent header sent with every request to the router.
UA = " ".join(
    (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
        "AppleWebKit/537.36 (KHTML, like Gecko)",
        "Chrome/138.0.0.0 Safari/537.36 Edg/138.0.0.0",
    )
)
| urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) | |
| # ───────────────────────────────────────────────────── Logging & System ── | |
def log_message(message: str) -> None:
    """Print a timestamped message and append it to the configured log file.

    The line format is ``YYYY-MM-DD HH:MM:SS | <message>`` using local time.
    Writing to the file is best-effort: a failure is reported on stdout and
    never propagates to the caller.

    Args:
        message: Text to log.
    """
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    formatted_message = f"{timestamp} | {message}"
    print(formatted_message)
    try:
        # Messages contain non-ASCII markers (e.g. "✓"), so pin the encoding
        # instead of depending on the locale the daemon was started under.
        with open(CONFIG["LOG_FILE"], "a", encoding="utf-8") as f:
            f.write(formatted_message + "\n")
    except (OSError, UnicodeError) as e:
        print(f"{timestamp} | ERROR: Could not write to log file: {e}")
def rotate_log_if_needed() -> None:
    """Trim the log file to its last lines once it exceeds the size limit.

    When the file is larger than ``CONFIG["MAX_LOG_SIZE_KB"]`` it is rewritten
    with only its final ``CONFIG["TRIM_LINE_COUNT"]`` lines. A missing log
    file is not an error. Any other failure is logged and swallowed so that
    log maintenance can never stop the daemon.
    """
    log_file = CONFIG["LOG_FILE"]
    try:
        max_kb = CONFIG["MAX_LOG_SIZE_KB"]
        # Ask for the size directly instead of checking existence first; the
        # file may disappear between the two calls.
        if os.path.getsize(log_file) / 1024 <= max_kb:
            return
        log_message(f"Log > {max_kb}KB, trimming.")
        # errors="replace" keeps trimming working even if the file contains
        # bytes that are not valid UTF-8.
        with open(log_file, "r", encoding="utf-8", errors="replace") as f:
            lines = f.readlines()
        keep = CONFIG["TRIM_LINE_COUNT"]
        # lines[-0:] would be the whole list, so a count of 0 (or less) must
        # be handled explicitly or the file would never shrink.
        tail = lines[-keep:] if keep > 0 else []
        with open(log_file, "w", encoding="utf-8") as f:
            f.writelines(tail)
    except FileNotFoundError:
        # Nothing to rotate.
        return
    except Exception as e:
        log_message(f"ERROR: Could not rotate log file: {e}")
def check_connectivity() -> bool:
    """Report whether the ping target answers over the configured interface.

    Runs ``ping -I <PING_INTERFACE> -c2 -W3 <PING_TARGET>`` with its output
    captured.

    Returns:
        True when ping exits with status 0; False when it exits with any
        other status or the ``ping`` executable cannot be found.
    """
    ping_cmd = [
        "ping",
        "-I", CONFIG["PING_INTERFACE"],
        "-c2",
        "-W3",
        CONFIG["PING_TARGET"],
    ]
    try:
        outcome = subprocess.run(ping_cmd, check=False, capture_output=True)
    except FileNotFoundError:
        return False
    return outcome.returncode == 0
def flush_arp_cache():
    """Flush the system neighbour (ARP) table via ``sudo ip -s neigh flush all``.

    The outcome is logged; a failing command is reported and swallowed.
    """
    log_message("Router unresponsive. Flushing system ARP cache...")
    flush_cmd = ["sudo", "ip", "-s", "neigh", "flush", "all"]
    try:
        subprocess.run(flush_cmd, check=True, capture_output=True)
    except Exception as err:
        log_message(f"✗ ERROR: Failed to flush ARP cache: {err}")
    else:
        log_message("✓ System ARP cache flushed.")
| # ─────────────────────────────────────────────────────── Router Actions ── | |
def login(session: requests.Session) -> bool:
    """Authenticate the session against the router's ``login.cgi``.

    Args:
        session: Session used for the request.

    Returns:
        True when the response body contains the redirect to ``index.htm``,
        False otherwise.

    Raises:
        requests.exceptions.RequestException: Network and HTTP errors are not
            caught here (this includes ConnectTimeout).
    """
    log_message("Attempting to log in...")
    credentials = {
        "username": CONFIG["ROUTER_USER"],
        "password": CONFIG["ROUTER_PASS"],
        "submit.htm?login.htm": "0",
    }
    login_url = f"{CONFIG['ROUTER_URL']}/login.cgi"
    resp = session.post(login_url, data=credentials, timeout=10)
    resp.raise_for_status()

    redirected = "window.location.href='index.htm'" in resp.text
    if not redirected:
        log_message("✗ Login failed: Router did not return the expected redirect.")
        return False
    log_message("✓ Login successful.")
    return True
def get_wan_status(session: requests.Session) -> str:
    """Classify the WAN state shown on the router's ``status.htm`` page.

    Args:
        session: Session used for the request.

    Returns:
        - ``"TR069_PRESENT"`` if the page text contains ``TR069``;
        - ``"DOWN"`` if the parsed status contains ``NO INFORM`` or ``DOWN``;
        - the upper-cased status text if one was parsed;
        - ``"UNKNOWN"`` if no status cell was found;
        - ``"ERROR"`` if the request or parsing raised.
    """
    log_message("Checking WAN connection status on router...")
    try:
        resp = session.get(f"{CONFIG['ROUTER_URL']}/status.htm", timeout=10)
        resp.raise_for_status()

        # The TR069 check takes precedence over the status cell.
        if "TR069" in resp.text:
            log_message("✓ WAN status is: TR069_PRESENT")
            return "TR069_PRESENT"

        cell = re.search(
            r'Status\s*</th>\s*<td[^>]*>([^<]+)</td>', resp.text, re.IGNORECASE
        )
        if cell is None:
            wan_state = "UNKNOWN"
        else:
            wan_state = cell.group(1).strip().upper()
            if any(marker in wan_state for marker in ("NO INFORM", "DOWN")):
                log_message(f"✓ WAN status on router page is: DOWN ({wan_state})")
                return "DOWN"

        log_message(f"✓ WAN status is: {wan_state}")
        return wan_state
    except Exception as err:
        log_message(f"✗ ERROR checking WAN status: {err}")
        return "ERROR"
| def _extract_nonce(html: str, page: str) -> str: | |
| pattern = (r'<input[^>]+name=["\']submit\.htm\?' + re.escape(page) + r'["\'][^>]+'r'value=["\'](?P<val>[^"\']+)') | |
| m = re.search(pattern, html, flags=re.I) | |
| if not m: raise RuntimeError(f"Nonce not found on page {page}") | |
| return m.group('val') | |
def toggle_led(session: requests.Session) -> None:
    """Switch the router LED on and then off again through its web forms.

    For each state (1, then 0) the LED page is fetched for a fresh nonce and
    the state is posted to ``form2Led.cgi``, followed by a one-second pause.
    Any failure, including an HTTP error status from the router, is logged
    and aborts the remaining steps; nothing is raised to the caller.

    Args:
        session: Session used for the requests.
    """
    log_message("Toggling router LED...")
    try:
        for state in (1, 0):
            resp = session.get(f"{CONFIG['ROUTER_URL']}/led_onoff.htm", timeout=10)
            resp.raise_for_status()
            nonce = _extract_nonce(resp.text, "led_onoff.htm")
            payload = {"led": state, "submit.htm?led_onoff.htm": nonce}
            post_resp = session.post(
                f"{CONFIG['ROUTER_URL']}/form2Led.cgi", data=payload, timeout=10
            )
            # Do not report success if the router rejected the request.
            post_resp.raise_for_status()
            log_message(f" ✓ LED {'Enabled' if state == 1 else 'Disabled'}.")
            time.sleep(1)
        log_message("✓ LED toggle complete.")
    except Exception as e:
        log_message(f"✗ ERROR toggling LED: {e}")
def fix_tr069_entry(session: requests.Session) -> None:
    """Delete the configured WAN entry on the router, then toggle the LED.

    Fetches ``wan.htm`` for a nonce and posts a delete request for
    ``CONFIG["WAN_INDEX"]`` to ``form2WanAdsl.cgi``. Any failure, including
    an HTTP error status from the router, is logged and swallowed; in that
    case the LED toggle is skipped.

    Args:
        session: Session used for the requests.
    """
    log_message("Phantom TR069 entry found. Deleting it...")
    try:
        resp = session.get(f"{CONFIG['ROUTER_URL']}/wan.htm", timeout=10)
        resp.raise_for_status()
        nonce = _extract_nonce(resp.text, "wan.htm")
        payload = {
            "action": 0,
            "idx": CONFIG["WAN_INDEX"],
            "connid": "",
            "submit.htm?wan.htm": nonce,
        }
        post_resp = session.post(
            f"{CONFIG['ROUTER_URL']}/form2WanAdsl.cgi", data=payload, timeout=10
        )
        # Only claim the entry was removed if the router accepted the request.
        post_resp.raise_for_status()
        log_message(f"✓ WAN entry {CONFIG['WAN_INDEX']} removed.")
        toggle_led(session)
    except Exception as e:
        log_message(f"✗ ERROR deleting TR069 entry: {e}")
def reboot_router(session: requests.Session):
    """Send the reboot form to the router.

    Fetches ``reboot.htm`` for a nonce and posts it to ``form2Reboot.cgi``.
    Failures are logged and swallowed.

    Args:
        session: Session used for the requests.
    """
    log_message("Rebooting router...")
    base_url = CONFIG["ROUTER_URL"]
    try:
        page = session.get(f"{base_url}/reboot.htm", timeout=10)
        nonce_field = ("submit.htm?reboot.htm", _extract_nonce(page.text, "reboot.htm"))
        # The nonce field is sent twice, exactly as the original form data did.
        form_fields = [("save", "Reboot"), nonce_field, nonce_field]
        session.post(f"{base_url}/form2Reboot.cgi", data=form_fields, timeout=10)
    except Exception as err:
        log_message(f"✗ ERROR sending reboot command: {err}")
    else:
        log_message("✓ Reboot command accepted by router.")
| # ──────────────────────────────────────────────────────────── Main Daemon ── | |
def main() -> None:
    """Run the monitor-and-fix loop forever.

    Each iteration trims the log if needed and pings the target. While the
    internet is up the loop just sleeps. When it is down, and the last fix
    attempt is older than ``CONFIG["THROTTLE_S"]``, the daemon logs into the
    router and either deletes the phantom TR069 WAN entry or reboots the
    router. A connect timeout during login triggers an ARP flush with its own
    cooldown and does not start the throttle window.
    """
    log_message("======== Starting WAN Fixer Daemon ========")
    # Intervals are measured on the monotonic clock so that wall-clock steps
    # (e.g. an NTP correction once the link is back) cannot stretch or
    # shorten the throttle. -inf guarantees the first outage is never
    # throttled, whatever the monotonic clock's origin is.
    last_fix_time = float("-inf")
    session = requests.Session()
    session.headers.update({"User-Agent": UA, "Origin": CONFIG["ROUTER_URL"]})
    while True:
        rotate_log_if_needed()
        if check_connectivity():
            log_message("Internet is UP. Sleeping for normal interval.")
            time.sleep(CONFIG["MONITOR_INTERVAL_S"])
            continue
        log_message("--- Internet is DOWN. Evaluating fix procedure. ---")
        try:
            # Throttle "logical" fix attempts, but not connection timeouts
            if time.monotonic() - last_fix_time < CONFIG["THROTTLE_S"]:
                log_message("Throttled. Last full fix attempt was recent. Waiting...")
                time.sleep(CONFIG["MONITOR_INTERVAL_S"])
                continue
            if not login(session):
                log_message("Login failed (bad credentials or unexpected page). Throttling next attempt.")
                last_fix_time = time.monotonic()  # Start throttle after a logical login fail
                continue
            # If login is successful, attempt a fix and then throttle the *next* attempt
            last_fix_time = time.monotonic()
            wan_status = get_wan_status(session)
            if wan_status == "TR069_PRESENT":
                fix_tr069_entry(session)
            else:  # Covers DOWN, UNKNOWN, ERROR, or even UP if internet is somehow down
                log_message(f"WAN status is '{wan_status}'. Rebooting as corrective measure.")
                reboot_router(session)
                log_message(f"Waiting {CONFIG['POST_REBOOT_WAIT_S']}s for router to reboot...")
                time.sleep(CONFIG['POST_REBOOT_WAIT_S'])
            log_message("--- Fix complete. Waiting for normal interval before re-checking. ---")
        except requests.exceptions.ConnectTimeout:
            # This is a special case: the router is totally unresponsive.
            # We do NOT update last_fix_time here, to avoid the 5-min throttle.
            flush_arp_cache()
            log_message(f"Router unresponsive. Entering special {CONFIG['ARP_FLUSH_COOLDOWN_S']}s cooldown.")
            time.sleep(CONFIG["ARP_FLUSH_COOLDOWN_S"])
            # After cooldown, loop immediately to retry
            continue
        except Exception as e:
            log_message(f"An unhandled error occurred: {e}")
            last_fix_time = time.monotonic()  # Throttle after any other unexpected error
        time.sleep(CONFIG["MONITOR_INTERVAL_S"])
if __name__ == "__main__":
    # Single-instance guard: the lock file holds the PID of the running daemon.
    lock_file_path = "/tmp/wan_fixer_daemon.lock"
    try:
        # O_CREAT | O_EXCL creates the file atomically and fails if it already
        # exists, so two daemons started together cannot both get the lock.
        lock_fd = os.open(lock_file_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
    except FileExistsError:
        print("ERROR: Lock file exists. Daemon may already be running.")
        sys.exit(1)
    try:
        with os.fdopen(lock_fd, "w") as f:
            f.write(str(os.getpid()))
        main()
    except KeyboardInterrupt:
        print("\nDaemon stopped by user.")
    finally:
        # Cleanup must not raise and mask whatever ended the daemon.
        try:
            os.remove(lock_file_path)
        except FileNotFoundError:
            pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment