Skip to content

Instantly share code, notes, and snippets.

@lamhoangtung
Created December 4, 2025 15:00
Show Gist options
  • Select an option

  • Save lamhoangtung/434679c9f3b16b3b24e8f263ffaa40f1 to your computer and use it in GitHub Desktop.

Select an option

Save lamhoangtung/434679c9f3b16b3b24e8f263ffaa40f1 to your computer and use it in GitHub Desktop.
"""
NETWORK FORENSIC AUDIT TOOL
---------------------------
This script performs a network stress test while monitoring latency and packet loss
at the Local Gateway and ISP Gateway layers. It generates an interactive HTML report.
PREREQUISITES:
Please install the following dependencies before running:
pip install pandas plotly speedtest-cli
Usage:
python network_audit.py
"""
import sys
import time
import threading
import subprocess
import re
import platform
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from datetime import datetime, timedelta, timezone
import shutil
import os
# --- CẤU HÌNH (CONFIGURATION) ---
CYCLES = 5 # Số vòng test tải
WAIT_TIME = 10 # Thời gian nghỉ giữa các vòng (giây)
PING_INTERVAL = 0.2 # Tần suất Ping (giây)
# --- GLOBAL DATA STORE ---
data_log = []
load_zones = []
stop_event = threading.Event()
detected_targets = {}
def get_vietnam_time():
"""
Lấy thời gian hiện tại theo giờ Việt Nam (UTC+7).
Sử dụng datetime.now(timezone.utc) để tránh DeprecationWarning.
Trả về dạng Naive datetime (không chứa tzinfo) để dễ xử lý với Plotly.
"""
# Lấy giờ UTC chuẩn
utc_now = datetime.now(timezone.utc)
# Chuyển về naive time rồi cộng 7 tiếng
return utc_now.replace(tzinfo=None) + timedelta(hours=7)
# --- TỰ ĐỘNG DÒ TÌM MẠNG (TOPOLOGY DETECTION) ---
def get_ip_from_line(line):
match = re.search(r"(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})", line)
return match.group(1) if match else None
def detect_network_topology():
"""Chạy traceroute để tìm IP Gateway nội bộ và ISP Gateway"""
print("🕵️ Đang dò tìm hạ tầng mạng (Network Topology Detection)...")
target_host = "8.8.8.8"
system = platform.system()
if system == "Windows":
cmd = ["tracert", "-d", "-h", "3", target_host]
else:
cmd = ["traceroute", "-n", "-m", "3", target_host]
try:
if system == "Windows":
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, encoding='cp850', errors='replace')
else:
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
lines = result.stdout.splitlines()
hops = []
for line in lines:
if re.search(r"^\s*\d+", line):
ip = get_ip_from_line(line)
if ip:
hops.append(ip)
local_ip = hops[0] if len(hops) > 0 else "192.168.1.1"
isp_ip = hops[1] if len(hops) > 1 else (hops[0] if hops else "8.8.8.8")
print(f" 📍 Hop 1 (Local Gateway): {local_ip}")
print(f" 📍 Hop 2 (ISP/Building): {isp_ip}")
return {"Local": local_ip, "ISP": isp_ip}
except Exception as e:
print(f"❌ Lỗi khi chạy traceroute: {e}")
return {"Local": "192.168.1.1", "ISP": "8.8.8.8"}
# --- WORKERS (TÁC VỤ NGẦM) ---
def run_stress_cycles():
"""Chạy quy trình test tải (Speedtest)"""
print(f"🕒 Bắt đầu quy trình Audit: {CYCLES} Cycles...")
for i in range(1, CYCLES + 1):
print(f"\n--- CYCLE {i}/{CYCLES}: Nghỉ ({WAIT_TIME}s) ---")
time.sleep(WAIT_TIME)
print(f"--- CYCLE {i}/{CYCLES}: 🚀 BẮT ĐẦU TẢI (Speedtest)...")
start_t = get_vietnam_time()
try:
# Chạy speedtest ẩn
subprocess.run(["speedtest-cli", "--no-upload"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
except Exception as e:
print(f"⚠️ Lỗi speedtest: {e}")
time.sleep(2)
end_t = get_vietnam_time()
load_zones.append((start_t, end_t))
print(f"--- CYCLE {i}/{CYCLES}: 🛑 Hoàn thành.")
print("\n✅ Hoàn tất kiểm tra. Đang xử lý dữ liệu...")
time.sleep(2)
stop_event.set()
def ping_worker(target, name):
"""Ping liên tục để đo Latency và Packet Loss"""
system = platform.system()
if system == "Windows":
cmd = ["ping", "-t", target]
pattern = re.compile(r"time[=<]([\d\.]+)ms")
else:
cmd = ["ping", "-i", str(PING_INTERVAL), target]
pattern = re.compile(r"icmp_seq=(\d+) .*time=([\d.]+)")
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
print(f"📡 Đang giám sát: {name} ({target})...")
last_seq = -1
while not stop_event.is_set():
line = process.stdout.readline()
if not line: break
timestamp = get_vietnam_time()
if system != "Windows":
match = pattern.search(line)
if match:
seq = int(match.group(1))
latency = float(match.group(2))
# Detect Loss (Gap Detection)
if last_seq != -1 and seq > last_seq + 1:
missing_count = seq - last_seq - 1
for k in range(missing_count):
t_loss = timestamp - timedelta(seconds=PING_INTERVAL * (missing_count - k))
data_log.append({"Time": t_loss, "Target": name, "Latency": None, "Loss": 1, "Seq": "MISSING"})
data_log.append({"Time": timestamp, "Target": name, "Latency": latency, "Loss": 0, "Seq": seq})
last_seq = seq
else:
if "time=" in line or "time<" in line:
match = pattern.search(line)
if match:
latency = float(match.group(1))
data_log.append({"Time": timestamp, "Target": name, "Latency": latency, "Loss": 0, "Seq": 0})
elif "Request timed out" in line or "unreachable" in line:
data_log.append({"Time": timestamp, "Target": name, "Latency": None, "Loss": 1, "Seq": 0})
process.terminate()
# --- TẠO BÁO CÁO (REPORT GENERATION) ---
def generate_html_report(df, targets):
print("📊 Đang tạo báo cáo Material Design (Blue Theme)...")
df_local = df[df["Target"] == "Local"]
df_isp = df[df["Target"] == "ISP"]
isp_loss_count = df_isp["Loss"].sum()
isp_total = len(df_isp)
isp_loss_rate = (isp_loss_count / isp_total * 100) if isp_total > 0 else 0
local_loss_rate = (df_local["Loss"].sum() / len(df_local) * 100) if len(df_local) > 0 else 0
# Tạo Subplots
fig = make_subplots(
rows=3, cols=1,
shared_xaxes=True,
vertical_spacing=0.06,
subplot_titles=(
f"1. Local Infrastructure ({targets['Local']})",
f"2. ISP Gateway ({targets['ISP']})",
"3. Packet Loss Events"
),
row_heights=[0.3, 0.3, 0.4]
)
# --- ROW 1: LOCAL ---
fig.add_trace(go.Scatter(
x=df_local["Time"], y=df_local["Latency"],
name="Local Latency", line=dict(color='#00C853', width=1.5), # Green
hovertemplate='%{y:.1f}ms',
legendgroup="group1"
), row=1, col=1)
# --- ROW 2: ISP ---
fig.add_trace(go.Scatter(
x=df_isp["Time"], y=df_isp["Latency"],
name="ISP Latency", line=dict(color='#1976D2', width=2), # Material Blue 700
hovertemplate='%{y:.1f}ms',
legendgroup="group2"
), row=2, col=1)
# --- ROW 3: LOSS EVENTS ---
isp_drops = df_isp[df_isp["Loss"] == 1]
fig.add_trace(go.Scatter(
x=isp_drops["Time"], y=[1]*len(isp_drops),
mode='markers', name="ISP DROP (Tail Drop)",
marker=dict(color='#D32F2F', symbol='x', size=12, line=dict(width=2)), # Red 700
hovertemplate='<b>ISP LOSS</b><br>%{x}<extra></extra>'
), row=3, col=1)
local_drops = df_local[df_local["Loss"] == 1]
fig.add_trace(go.Scatter(
x=local_drops["Time"], y=[2]*len(local_drops),
mode='markers', name="Local Drop",
marker=dict(color='#FFCA28', symbol='circle', size=8), # Amber
hovertemplate='<b>LOCAL LOSS</b><br>%{x}<extra></extra>'
), row=3, col=1)
# --- HIGHLIGHT LOAD ZONES ---
for start, end in load_zones:
fig.add_vrect(
x0=start, x1=end,
fillcolor="#FF9800", opacity=0.1, # Orange
layer="below", line_width=0,
annotation_text="LOAD", annotation_position="top left",
annotation_font_color="#F57C00"
)
# --- CẤU HÌNH INTERACTION (FIXED HOVER) ---
fig.update_layout(
title_text="Network Forensic Audit",
height=900,
hovermode="x unified", # Hộp thoại thống nhất
hoverdistance=-1, # Kích hoạt hover toàn trục X
spikedistance=-1, # Spike (thanh dọc) luôn hiển thị
template="plotly_white",
font=dict(family="Roboto, Arial, sans-serif", size=12),
margin=dict(l=60, r=40, t=80, b=40),
legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1)
)
fig.update_xaxes(
showspikes=True,
spikemode="across",
spikesnap="cursor",
showline=True,
showgrid=True,
gridcolor="#f0f0f0",
spikethickness=1,
spikecolor="#757575",
spikedash="solid",
matches='x'
)
fig.update_yaxes(gridcolor="#f0f0f0")
fig.update_yaxes(title_text="ms", row=1, col=1)
fig.update_yaxes(title_text="ms", row=2, col=1)
fig.update_yaxes(title_text="Device", row=3, col=1, tickvals=[1, 2], ticktext=["ISP", "LOCAL"], range=[0.5, 2.5])
plot_html = fig.to_html(full_html=False, include_plotlyjs='cdn')
# --- MATERIAL DESIGN REPORT TEMPLATE (BLUE THEME) ---
status_class = "status-fail" if isp_loss_rate > 1.0 else ("status-warn" if isp_loss_rate > 0 else "status-pass")
status_text = "CRITICAL FAIL" if isp_loss_rate > 1.0 else ("WARNING" if isp_loss_rate > 0 else "OPTIMAL")
current_time_str = get_vietnam_time().strftime("%d/%m/%Y %H:%M")
final_html = f"""
<!DOCTYPE html>
<html lang="vi">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Network Forensic Audit Report</title>
<link href="https://fonts.googleapis.com/css2?family=Roboto:wght@300;400;500;700&display=swap" rel="stylesheet">
<style>
:root {{
/* BLUE THEME */
--primary: #1976D2; /* Blue 700 */
--primary-variant: #0D47A1; /* Blue 900 */
--secondary: #03A9F4; /* Light Blue */
--background: #F5F5F5;
--surface: #FFFFFF;
--error: #D32F2F;
--text-primary: rgba(0, 0, 0, 0.87);
--text-secondary: rgba(0, 0, 0, 0.6);
}}
body {{ font-family: 'Roboto', sans-serif; background-color: var(--background); margin: 0; color: var(--text-primary); }}
.header {{
background-color: var(--primary);
color: white;
padding: 24px 0;
box-shadow: 0 4px 6px -1px rgba(0,0,0,0.1), 0 2px 4px -1px rgba(0,0,0,0.06);
}}
.header-content {{ max-width: 1200px; margin: 0 auto; padding: 0 24px; }}
h1 {{ margin: 0; font-weight: 500; letter-spacing: 0.5px; }}
.subtitle {{ opacity: 0.9; font-weight: 300; margin-top: 8px; font-size: 0.95rem; }}
.container {{ max-width: 1200px; margin: 24px auto; padding: 0 16px; display: grid; gap: 24px; }}
.card {{ background: var(--surface); border-radius: 8px; box-shadow: 0 1px 3px rgba(0,0,0,0.12), 0 1px 2px rgba(0,0,0,0.24); padding: 24px; }}
.card-title {{ color: var(--primary); font-size: 1.3rem; font-weight: 500; margin-top: 0; margin-bottom: 20px; display: flex; align-items: center; justify-content: space-between; border-bottom: 1px solid #eee; padding-bottom: 12px; }}
.chip {{ display: inline-flex; align-items: center; padding: 0 16px; height: 32px; border-radius: 16px; font-size: 0.875rem; font-weight: 500; color: white; letter-spacing: 0.5px; }}
.status-pass {{ background-color: #43A047; }} /* Green 600 */
.status-warn {{ background-color: #FB8C00; }} /* Orange 600 */
.status-fail {{ background-color: #D32F2F; }} /* Red 700 */
.metrics {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); gap: 24px; margin-bottom: 20px; }}
.metric-item {{ border-left: 4px solid #E0E0E0; padding-left: 16px; }}
.metric-label {{ display: block; color: var(--text-secondary); font-size: 0.75rem; text-transform: uppercase; letter-spacing: 1px; font-weight: 500; }}
.metric-value {{ font-size: 1.75rem; font-weight: 400; margin-top: 4px; color: #333; }}
ul {{ padding-left: 20px; line-height: 1.6; color: var(--text-primary); margin-top: 0; }}
li {{ margin-bottom: 8px; }}
strong {{ font-weight: 500; color: #000; }}
.hint-text {{ background-color: #E3F2FD; padding: 12px; border-radius: 4px; color: #0D47A1; font-size: 0.9rem; margin-bottom: 20px; display: flex; align-items: center; gap: 8px; }}
/* Fix Plotly Modebar */
.js-plotly-plot .plotly .modebar {{ left: 0; }}
</style>
</head>
<body>
<div class="header">
<div class="header-content">
<h1>Network Forensic Audit Report</h1>
<div class="subtitle">Ngày tạo: {current_time_str} (ICT)</div>
</div>
</div>
<div class="container">
<!-- Card 1: Executive Summary -->
<div class="card">
<div class="card-title">
<span>Tóm tắt Kết quả (Executive Summary)</span>
<span class="chip {status_class}">{status_text}</span>
</div>
<div class="metrics">
<div class="metric-item" style="border-left-color: #00C853;">
<span class="metric-label">Local ({targets['Local']})</span>
<div class="metric-value">{local_loss_rate:.2f}% <span style="font-size:1rem">Loss</span></div>
</div>
<div class="metric-item" style="border-left-color: #1976D2;">
<span class="metric-label">ISP Gateway ({targets['ISP']})</span>
<div class="metric-value">{isp_loss_rate:.2f}% <span style="font-size:1rem">Loss</span></div>
</div>
<div class="metric-item">
<span class="metric-label">Stress Cycles</span>
<div class="metric-value">{CYCLES} vòng</div>
</div>
</div>
<p style="color: var(--text-secondary);">
<strong>Đánh giá sơ bộ:</strong>
Hệ thống ghi nhận <strong>{isp_loss_count}</strong> gói tin bị mất tại ISP Node trong quá trình kiểm tra.
{'Hiện tượng Packet Loss xuất hiện đồng bộ với thời điểm tải cao (Load Zones), cho thấy dấu hiệu của nghẽn cổ chai vật lý hoặc lỗi phần cứng.' if isp_loss_rate > 0 else 'Mạng hoạt động ổn định, không có dấu hiệu rớt gói dưới tải cao.'}
</p>
</div>
<!-- Card 2: Interactive Visualization -->
<div class="card">
<div class="card-title">Biểu đồ Phân tích (Interactive Analysis)</div>
<div class="hint-text">
💡 <strong>Mẹo:</strong> Di chuột ngang qua biểu đồ để thấy thanh thước kẻ (Scan Line) hiển thị dữ liệu đồng bộ trên cả 3 lớp.
</div>
{plot_html}
</div>
<!-- Card 3: Hypothesis & Methodology -->
<div class="card">
<div class="card-title">1. Giả thuyết & Phương pháp (Methodology)</div>
<p>Bài kiểm tra sử dụng phương pháp <strong>A/B Isolation Testing</strong> để xác định vị trí lỗi:</p>
<ul>
<li><strong>Control Group (Local - Green):</strong> Ping liên tục tới {targets['Local']}.
<br><em>Mục đích:</em> Chứng minh mạng LAN và thiết bị đầu cuối hoạt động bình thường.</li>
<li><strong>Target Group (ISP - Blue):</strong> Ping liên tục tới {targets['ISP']}.
<br><em>Mục đích:</em> Phát hiện nghẽn hoặc rớt gói tại điểm truy cập đầu tiên của nhà mạng.</li>
<li><strong>Stress Testing:</strong> Thực hiện {CYCLES} vòng tải nặng (Download saturation) để ép bộ đệm (buffer) của thiết bị mạng phải hoạt động hết công suất.</li>
</ul>
</div>
<!-- Card 4: Recommendations -->
<div class="card">
<div class="card-title">2. Đề xuất Xử lý (Recommendations)</div>
<p>Dựa trên dữ liệu Forensics thu thập được:</p>
<ul>
<li><strong>Nếu Packet Loss > 0% tại ISP Gateway:</strong> Đây là lỗi <strong>Tail Drop</strong> do tràn bộ đệm (Buffer Overflow) trên Switch Access hoặc lỗi tín hiệu quang (Optical Signal).
<ul>
<li>Yêu cầu kỹ thuật viên kiểm tra suy hao quang (dBm).</li>
<li>Yêu cầu đổi port trên OLT hoặc chuyển đổi sang hạ tầng <strong>XGS-PON</strong> nếu khả dụng.</li>
</ul>
</li>
<li><strong>Nếu Latency tăng cao (Spikes) nhưng không Loss:</strong> Đây là hiện tượng <strong>Bufferbloat</strong>. Cần cấu hình QoS (Smart Queue Management) trên Router chính.</li>
<li><strong>Nếu Local cũng bị Loss:</strong> Kiểm tra lại dây cáp LAN, Switch nội bộ hoặc Card Wifi của máy tính.</li>
</ul>
</div>
</div>
</body>
</html>
"""
# Generate timestamped filename
file_ts = get_vietnam_time().strftime("%Y%m%d_%H%M%S")
filename = f"Network_Audit_Report_{file_ts}.html"
with open(filename, "w", encoding="utf-8") as f:
f.write(final_html)
print(f"\n✅ ĐÃ XUẤT BÁO CÁO: {os.path.abspath(filename)}")
# --- MAIN ---
if __name__ == "__main__":
if not shutil.which("speedtest-cli"):
print("❌ Lỗi: Cần cài đặt speedtest-cli. Chạy: pip install speedtest-cli")
sys.exit()
# 1. Dò tìm Topology
detected_targets = detect_network_topology()
# 2. Khởi tạo Ping Workers
t_local = threading.Thread(target=ping_worker, args=(detected_targets['Local'], "Local"))
t_isp = threading.Thread(target=ping_worker, args=(detected_targets['ISP'], "ISP"))
t_local.start()
t_isp.start()
# 3. Chạy Stress Test
try:
run_stress_cycles()
except KeyboardInterrupt:
print("\n⚠️ Người dùng hủy bỏ.")
stop_event.set()
# 4. Kết thúc
t_local.join()
t_isp.join()
# 5. Xuất báo cáo
if data_log:
df = pd.DataFrame(data_log)
# Lưu CSV backup cũng có timestamp
csv_name = f"network_data_{get_vietnam_time().strftime('%Y%m%d_%H%M%S')}.csv"
df.to_csv(csv_name, index=False)
generate_html_report(df, detected_targets)
else:
print("❌ Không có dữ liệu thu thập được.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment