Skip to content

Instantly share code, notes, and snippets.

@ShrootBuck
Created January 25, 2026 00:07
Show Gist options
  • Select an option

  • Save ShrootBuck/1d7ef1c73e5b1dd8e70944fa72b194a6 to your computer and use it in GitHub Desktop.

Select an option

Save ShrootBuck/1d7ef1c73e5b1dd8e70944fa72b194a6 to your computer and use it in GitHub Desktop.
A resilient network surveillance script designed to audit residential ISP uptime (Xfinity is lying to me)
import hashlib
import json
import os
import sqlite3
import subprocess
import sys
import time
from datetime import datetime
import psycopg2
SERVER_URL = ""
SECRET_KEY = ""
CLOUD_DB_CONN = ""
LOCAL_DB_FILE = "monitor_buffer.sqlite"
BATCH_TRIGGER = 420
def init_local_db():
try:
conn = sqlite3.connect(LOCAL_DB_FILE)
c = conn.cursor()
c.execute("""CREATE TABLE IF NOT EXISTS buffer (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp REAL,
latency_ms REAL,
dns_time REAL,
tcp_time REAL,
ttfb_time REAL,
packet_loss INTEGER,
error_msg TEXT
)""")
conn.commit()
conn.close()
except Exception as e:
print(f"FATAL: Could not init local DB: {e}")
sys.exit(1)
def get_network_metrics():
dummy = str(time.time())
target = f"{SERVER_URL}?_={dummy}"
curl_fmt = (
'{"dns": %{time_namelookup}, '
'"tcp_connect": %{time_connect}, '
'"ssl_handshake": %{time_appconnect}, '
'"ttfb": %{time_starttransfer}, '
'"total": %{time_total}, '
'"http_code": %{http_code}}'
)
cmd = ["curl", "-s", "-w", curl_fmt, "-o", "-", target]
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
raw_output = result.stdout
try:
split_index = raw_output.rfind("}{") + 1
body_json = raw_output[:split_index]
timing_json = raw_output[split_index:]
payload = json.loads(body_json)
metrics = json.loads(timing_json)
return payload, metrics, None
except:
return None, None, f"Parse/HTTP Error. Raw len: {len(raw_output)}"
except subprocess.TimeoutExpired:
return None, None, "TIMEOUT"
except Exception as e:
return None, None, str(e)
def verify_integrity(payload):
t_server = payload.get("t")
h_server = payload.get("h")
if not t_server or not h_server:
return False, "Missing fields"
check_str = f"{SECRET_KEY}{t_server}"
h_local = hashlib.sha256(check_str.encode()).hexdigest()
if h_local != h_server:
return False, f"Hash Expect: {h_server} | Got: {h_local}"
return True, "OK"
def buffer_data(metrics, error=None):
try:
conn = sqlite3.connect(LOCAL_DB_FILE)
c = conn.cursor()
timestamp = time.time()
if error:
c.execute(
"""
INSERT INTO buffer (timestamp, packet_loss, error_msg)
VALUES (?, 1, ?)
""",
(timestamp, error),
)
else:
c.execute(
"""
INSERT INTO buffer (timestamp, latency_ms, dns_time, tcp_time, ttfb_time, packet_loss)
VALUES (?, ?, ?, ?, ?, 0)
""",
(
timestamp,
metrics["total"] * 1000,
metrics["dns"] * 1000,
metrics["tcp_connect"] * 1000,
metrics["ttfb"] * 1000,
),
)
conn.commit()
c.execute("SELECT COUNT(*) FROM buffer")
count = c.fetchone()[0]
conn.close()
return count
except Exception as e:
print(f"CRITICAL: Local Disk Write Failed: {e}")
return 0
def flush_buffer_to_cloud():
local_conn = sqlite3.connect(LOCAL_DB_FILE)
local_cur = local_conn.cursor()
local_cur.execute("SELECT * FROM buffer ORDER BY timestamp ASC")
rows = local_cur.fetchall()
if not rows:
local_conn.close()
return
print(f"[*] Batch Limit Reached. Flushing {len(rows)} records...")
try:
cloud_conn = psycopg2.connect(CLOUD_DB_CONN, connect_timeout=15)
cloud_cur = cloud_conn.cursor()
sql_success = """
INSERT INTO monitor_log
(timestamp, latency_ms, dns_time, tcp_time, ttfb_time, packet_loss)
VALUES (to_timestamp(%s), %s, %s, %s, %s, FALSE)
"""
sql_fail = """
INSERT INTO monitor_log
(timestamp, packet_loss, error_msg)
VALUES (to_timestamp(%s), TRUE, %s)
"""
ids_to_delete = []
for row in rows:
if len(row) == 8:
row_id, ts, lat, dns, tcp, ttfb, loss, error = row
else:
print("[!] Skipping row with invalid column count")
continue
if loss:
cloud_cur.execute(sql_fail, (ts, error))
else:
cloud_cur.execute(sql_success, (ts, lat, dns, tcp, ttfb))
ids_to_delete.append(row_id)
cloud_conn.commit()
if ids_to_delete:
placeholders = ",".join("?" * len(ids_to_delete))
local_cur.execute(
f"DELETE FROM buffer WHERE id IN ({placeholders})", ids_to_delete
)
local_conn.commit()
print(f"[*] Upload Complete. Buffer cleared.")
cloud_conn.close()
except psycopg2.OperationalError as e:
print(f"[!] Upload Failed (Network/Auth): {e}")
except Exception as e:
print(f"[!] Upload Error: {e}")
finally:
local_conn.close()
if __name__ == "__main__":
print(f"Initializing Watchdog...")
init_local_db()
while True:
payload, metrics, error = get_network_metrics()
if not error:
is_valid, msg = verify_integrity(payload)
if not is_valid:
error = f"INTEGRITY FAILURE: {msg}"
buffered_count = buffer_data(metrics, error)
if error:
print(f"[!] {datetime.now()} - FAIL: {error} (Buffer: {buffered_count})")
else:
print(
f"[*] {datetime.now()} - TCP: {metrics['tcp_connect'] * 1000:.2f}ms (Buffer: {buffered_count}/{BATCH_TRIGGER})"
)
if buffered_count >= BATCH_TRIGGER:
flush_buffer_to_cloud()
time.sleep(60)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment