Created
December 4, 2025 15:00
-
-
Save lamhoangtung/434679c9f3b16b3b24e8f263ffaa40f1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| NETWORK FORENSIC AUDIT TOOL | |
| --------------------------- | |
| This script performs a network stress test while monitoring latency and packet loss | |
| at the Local Gateway and ISP Gateway layers. It generates an interactive HTML report. | |
| PREREQUISITES: | |
| Please install the following dependencies before running: | |
| pip install pandas plotly speedtest-cli | |
| Usage: | |
| python network_audit.py | |
| """ | |
| import sys | |
| import time | |
| import threading | |
| import subprocess | |
| import re | |
| import platform | |
| import pandas as pd | |
| import plotly.graph_objects as go | |
| from plotly.subplots import make_subplots | |
| from datetime import datetime, timedelta, timezone | |
| import shutil | |
| import os | |
| # --- CẤU HÌNH (CONFIGURATION) --- | |
| CYCLES = 5 # Số vòng test tải | |
| WAIT_TIME = 10 # Thời gian nghỉ giữa các vòng (giây) | |
| PING_INTERVAL = 0.2 # Tần suất Ping (giây) | |
| # --- GLOBAL DATA STORE --- | |
| data_log = [] | |
| load_zones = [] | |
| stop_event = threading.Event() | |
| detected_targets = {} | |
| def get_vietnam_time(): | |
| """ | |
| Lấy thời gian hiện tại theo giờ Việt Nam (UTC+7). | |
| Sử dụng datetime.now(timezone.utc) để tránh DeprecationWarning. | |
| Trả về dạng Naive datetime (không chứa tzinfo) để dễ xử lý với Plotly. | |
| """ | |
| # Lấy giờ UTC chuẩn | |
| utc_now = datetime.now(timezone.utc) | |
| # Chuyển về naive time rồi cộng 7 tiếng | |
| return utc_now.replace(tzinfo=None) + timedelta(hours=7) | |
| # --- TỰ ĐỘNG DÒ TÌM MẠNG (TOPOLOGY DETECTION) --- | |
| def get_ip_from_line(line): | |
| match = re.search(r"(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})", line) | |
| return match.group(1) if match else None | |
| def detect_network_topology(): | |
| """Chạy traceroute để tìm IP Gateway nội bộ và ISP Gateway""" | |
| print("🕵️ Đang dò tìm hạ tầng mạng (Network Topology Detection)...") | |
| target_host = "8.8.8.8" | |
| system = platform.system() | |
| if system == "Windows": | |
| cmd = ["tracert", "-d", "-h", "3", target_host] | |
| else: | |
| cmd = ["traceroute", "-n", "-m", "3", target_host] | |
| try: | |
| if system == "Windows": | |
| result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, encoding='cp850', errors='replace') | |
| else: | |
| result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) | |
| lines = result.stdout.splitlines() | |
| hops = [] | |
| for line in lines: | |
| if re.search(r"^\s*\d+", line): | |
| ip = get_ip_from_line(line) | |
| if ip: | |
| hops.append(ip) | |
| local_ip = hops[0] if len(hops) > 0 else "192.168.1.1" | |
| isp_ip = hops[1] if len(hops) > 1 else (hops[0] if hops else "8.8.8.8") | |
| print(f" 📍 Hop 1 (Local Gateway): {local_ip}") | |
| print(f" 📍 Hop 2 (ISP/Building): {isp_ip}") | |
| return {"Local": local_ip, "ISP": isp_ip} | |
| except Exception as e: | |
| print(f"❌ Lỗi khi chạy traceroute: {e}") | |
| return {"Local": "192.168.1.1", "ISP": "8.8.8.8"} | |
| # --- WORKERS (TÁC VỤ NGẦM) --- | |
| def run_stress_cycles(): | |
| """Chạy quy trình test tải (Speedtest)""" | |
| print(f"🕒 Bắt đầu quy trình Audit: {CYCLES} Cycles...") | |
| for i in range(1, CYCLES + 1): | |
| print(f"\n--- CYCLE {i}/{CYCLES}: Nghỉ ({WAIT_TIME}s) ---") | |
| time.sleep(WAIT_TIME) | |
| print(f"--- CYCLE {i}/{CYCLES}: 🚀 BẮT ĐẦU TẢI (Speedtest)...") | |
| start_t = get_vietnam_time() | |
| try: | |
| # Chạy speedtest ẩn | |
| subprocess.run(["speedtest-cli", "--no-upload"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) | |
| except Exception as e: | |
| print(f"⚠️ Lỗi speedtest: {e}") | |
| time.sleep(2) | |
| end_t = get_vietnam_time() | |
| load_zones.append((start_t, end_t)) | |
| print(f"--- CYCLE {i}/{CYCLES}: 🛑 Hoàn thành.") | |
| print("\n✅ Hoàn tất kiểm tra. Đang xử lý dữ liệu...") | |
| time.sleep(2) | |
| stop_event.set() | |
| def ping_worker(target, name): | |
| """Ping liên tục để đo Latency và Packet Loss""" | |
| system = platform.system() | |
| if system == "Windows": | |
| cmd = ["ping", "-t", target] | |
| pattern = re.compile(r"time[=<]([\d\.]+)ms") | |
| else: | |
| cmd = ["ping", "-i", str(PING_INTERVAL), target] | |
| pattern = re.compile(r"icmp_seq=(\d+) .*time=([\d.]+)") | |
| process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) | |
| print(f"📡 Đang giám sát: {name} ({target})...") | |
| last_seq = -1 | |
| while not stop_event.is_set(): | |
| line = process.stdout.readline() | |
| if not line: break | |
| timestamp = get_vietnam_time() | |
| if system != "Windows": | |
| match = pattern.search(line) | |
| if match: | |
| seq = int(match.group(1)) | |
| latency = float(match.group(2)) | |
| # Detect Loss (Gap Detection) | |
| if last_seq != -1 and seq > last_seq + 1: | |
| missing_count = seq - last_seq - 1 | |
| for k in range(missing_count): | |
| t_loss = timestamp - timedelta(seconds=PING_INTERVAL * (missing_count - k)) | |
| data_log.append({"Time": t_loss, "Target": name, "Latency": None, "Loss": 1, "Seq": "MISSING"}) | |
| data_log.append({"Time": timestamp, "Target": name, "Latency": latency, "Loss": 0, "Seq": seq}) | |
| last_seq = seq | |
| else: | |
| if "time=" in line or "time<" in line: | |
| match = pattern.search(line) | |
| if match: | |
| latency = float(match.group(1)) | |
| data_log.append({"Time": timestamp, "Target": name, "Latency": latency, "Loss": 0, "Seq": 0}) | |
| elif "Request timed out" in line or "unreachable" in line: | |
| data_log.append({"Time": timestamp, "Target": name, "Latency": None, "Loss": 1, "Seq": 0}) | |
| process.terminate() | |
| # --- TẠO BÁO CÁO (REPORT GENERATION) --- | |
| def generate_html_report(df, targets): | |
| print("📊 Đang tạo báo cáo Material Design (Blue Theme)...") | |
| df_local = df[df["Target"] == "Local"] | |
| df_isp = df[df["Target"] == "ISP"] | |
| isp_loss_count = df_isp["Loss"].sum() | |
| isp_total = len(df_isp) | |
| isp_loss_rate = (isp_loss_count / isp_total * 100) if isp_total > 0 else 0 | |
| local_loss_rate = (df_local["Loss"].sum() / len(df_local) * 100) if len(df_local) > 0 else 0 | |
| # Tạo Subplots | |
| fig = make_subplots( | |
| rows=3, cols=1, | |
| shared_xaxes=True, | |
| vertical_spacing=0.06, | |
| subplot_titles=( | |
| f"1. Local Infrastructure ({targets['Local']})", | |
| f"2. ISP Gateway ({targets['ISP']})", | |
| "3. Packet Loss Events" | |
| ), | |
| row_heights=[0.3, 0.3, 0.4] | |
| ) | |
| # --- ROW 1: LOCAL --- | |
| fig.add_trace(go.Scatter( | |
| x=df_local["Time"], y=df_local["Latency"], | |
| name="Local Latency", line=dict(color='#00C853', width=1.5), # Green | |
| hovertemplate='%{y:.1f}ms', | |
| legendgroup="group1" | |
| ), row=1, col=1) | |
| # --- ROW 2: ISP --- | |
| fig.add_trace(go.Scatter( | |
| x=df_isp["Time"], y=df_isp["Latency"], | |
| name="ISP Latency", line=dict(color='#1976D2', width=2), # Material Blue 700 | |
| hovertemplate='%{y:.1f}ms', | |
| legendgroup="group2" | |
| ), row=2, col=1) | |
| # --- ROW 3: LOSS EVENTS --- | |
| isp_drops = df_isp[df_isp["Loss"] == 1] | |
| fig.add_trace(go.Scatter( | |
| x=isp_drops["Time"], y=[1]*len(isp_drops), | |
| mode='markers', name="ISP DROP (Tail Drop)", | |
| marker=dict(color='#D32F2F', symbol='x', size=12, line=dict(width=2)), # Red 700 | |
| hovertemplate='<b>ISP LOSS</b><br>%{x}<extra></extra>' | |
| ), row=3, col=1) | |
| local_drops = df_local[df_local["Loss"] == 1] | |
| fig.add_trace(go.Scatter( | |
| x=local_drops["Time"], y=[2]*len(local_drops), | |
| mode='markers', name="Local Drop", | |
| marker=dict(color='#FFCA28', symbol='circle', size=8), # Amber | |
| hovertemplate='<b>LOCAL LOSS</b><br>%{x}<extra></extra>' | |
| ), row=3, col=1) | |
| # --- HIGHLIGHT LOAD ZONES --- | |
| for start, end in load_zones: | |
| fig.add_vrect( | |
| x0=start, x1=end, | |
| fillcolor="#FF9800", opacity=0.1, # Orange | |
| layer="below", line_width=0, | |
| annotation_text="LOAD", annotation_position="top left", | |
| annotation_font_color="#F57C00" | |
| ) | |
| # --- CẤU HÌNH INTERACTION (FIXED HOVER) --- | |
| fig.update_layout( | |
| title_text="Network Forensic Audit", | |
| height=900, | |
| hovermode="x unified", # Hộp thoại thống nhất | |
| hoverdistance=-1, # Kích hoạt hover toàn trục X | |
| spikedistance=-1, # Spike (thanh dọc) luôn hiển thị | |
| template="plotly_white", | |
| font=dict(family="Roboto, Arial, sans-serif", size=12), | |
| margin=dict(l=60, r=40, t=80, b=40), | |
| legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1) | |
| ) | |
| fig.update_xaxes( | |
| showspikes=True, | |
| spikemode="across", | |
| spikesnap="cursor", | |
| showline=True, | |
| showgrid=True, | |
| gridcolor="#f0f0f0", | |
| spikethickness=1, | |
| spikecolor="#757575", | |
| spikedash="solid", | |
| matches='x' | |
| ) | |
| fig.update_yaxes(gridcolor="#f0f0f0") | |
| fig.update_yaxes(title_text="ms", row=1, col=1) | |
| fig.update_yaxes(title_text="ms", row=2, col=1) | |
| fig.update_yaxes(title_text="Device", row=3, col=1, tickvals=[1, 2], ticktext=["ISP", "LOCAL"], range=[0.5, 2.5]) | |
| plot_html = fig.to_html(full_html=False, include_plotlyjs='cdn') | |
| # --- MATERIAL DESIGN REPORT TEMPLATE (BLUE THEME) --- | |
| status_class = "status-fail" if isp_loss_rate > 1.0 else ("status-warn" if isp_loss_rate > 0 else "status-pass") | |
| status_text = "CRITICAL FAIL" if isp_loss_rate > 1.0 else ("WARNING" if isp_loss_rate > 0 else "OPTIMAL") | |
| current_time_str = get_vietnam_time().strftime("%d/%m/%Y %H:%M") | |
| final_html = f""" | |
| <!DOCTYPE html> | |
| <html lang="vi"> | |
| <head> | |
| <meta charset="UTF-8"> | |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> | |
| <title>Network Forensic Audit Report</title> | |
| <link href="https://fonts.googleapis.com/css2?family=Roboto:wght@300;400;500;700&display=swap" rel="stylesheet"> | |
| <style> | |
| :root {{ | |
| /* BLUE THEME */ | |
| --primary: #1976D2; /* Blue 700 */ | |
| --primary-variant: #0D47A1; /* Blue 900 */ | |
| --secondary: #03A9F4; /* Light Blue */ | |
| --background: #F5F5F5; | |
| --surface: #FFFFFF; | |
| --error: #D32F2F; | |
| --text-primary: rgba(0, 0, 0, 0.87); | |
| --text-secondary: rgba(0, 0, 0, 0.6); | |
| }} | |
| body {{ font-family: 'Roboto', sans-serif; background-color: var(--background); margin: 0; color: var(--text-primary); }} | |
| .header {{ | |
| background-color: var(--primary); | |
| color: white; | |
| padding: 24px 0; | |
| box-shadow: 0 4px 6px -1px rgba(0,0,0,0.1), 0 2px 4px -1px rgba(0,0,0,0.06); | |
| }} | |
| .header-content {{ max-width: 1200px; margin: 0 auto; padding: 0 24px; }} | |
| h1 {{ margin: 0; font-weight: 500; letter-spacing: 0.5px; }} | |
| .subtitle {{ opacity: 0.9; font-weight: 300; margin-top: 8px; font-size: 0.95rem; }} | |
| .container {{ max-width: 1200px; margin: 24px auto; padding: 0 16px; display: grid; gap: 24px; }} | |
| .card {{ background: var(--surface); border-radius: 8px; box-shadow: 0 1px 3px rgba(0,0,0,0.12), 0 1px 2px rgba(0,0,0,0.24); padding: 24px; }} | |
| .card-title {{ color: var(--primary); font-size: 1.3rem; font-weight: 500; margin-top: 0; margin-bottom: 20px; display: flex; align-items: center; justify-content: space-between; border-bottom: 1px solid #eee; padding-bottom: 12px; }} | |
| .chip {{ display: inline-flex; align-items: center; padding: 0 16px; height: 32px; border-radius: 16px; font-size: 0.875rem; font-weight: 500; color: white; letter-spacing: 0.5px; }} | |
| .status-pass {{ background-color: #43A047; }} /* Green 600 */ | |
| .status-warn {{ background-color: #FB8C00; }} /* Orange 600 */ | |
| .status-fail {{ background-color: #D32F2F; }} /* Red 700 */ | |
| .metrics {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); gap: 24px; margin-bottom: 20px; }} | |
| .metric-item {{ border-left: 4px solid #E0E0E0; padding-left: 16px; }} | |
| .metric-label {{ display: block; color: var(--text-secondary); font-size: 0.75rem; text-transform: uppercase; letter-spacing: 1px; font-weight: 500; }} | |
| .metric-value {{ font-size: 1.75rem; font-weight: 400; margin-top: 4px; color: #333; }} | |
| ul {{ padding-left: 20px; line-height: 1.6; color: var(--text-primary); margin-top: 0; }} | |
| li {{ margin-bottom: 8px; }} | |
| strong {{ font-weight: 500; color: #000; }} | |
| .hint-text {{ background-color: #E3F2FD; padding: 12px; border-radius: 4px; color: #0D47A1; font-size: 0.9rem; margin-bottom: 20px; display: flex; align-items: center; gap: 8px; }} | |
| /* Fix Plotly Modebar */ | |
| .js-plotly-plot .plotly .modebar {{ left: 0; }} | |
| </style> | |
| </head> | |
| <body> | |
| <div class="header"> | |
| <div class="header-content"> | |
| <h1>Network Forensic Audit Report</h1> | |
| <div class="subtitle">Ngày tạo: {current_time_str} (ICT)</div> | |
| </div> | |
| </div> | |
| <div class="container"> | |
| <!-- Card 1: Executive Summary --> | |
| <div class="card"> | |
| <div class="card-title"> | |
| <span>Tóm tắt Kết quả (Executive Summary)</span> | |
| <span class="chip {status_class}">{status_text}</span> | |
| </div> | |
| <div class="metrics"> | |
| <div class="metric-item" style="border-left-color: #00C853;"> | |
| <span class="metric-label">Local ({targets['Local']})</span> | |
| <div class="metric-value">{local_loss_rate:.2f}% <span style="font-size:1rem">Loss</span></div> | |
| </div> | |
| <div class="metric-item" style="border-left-color: #1976D2;"> | |
| <span class="metric-label">ISP Gateway ({targets['ISP']})</span> | |
| <div class="metric-value">{isp_loss_rate:.2f}% <span style="font-size:1rem">Loss</span></div> | |
| </div> | |
| <div class="metric-item"> | |
| <span class="metric-label">Stress Cycles</span> | |
| <div class="metric-value">{CYCLES} vòng</div> | |
| </div> | |
| </div> | |
| <p style="color: var(--text-secondary);"> | |
| <strong>Đánh giá sơ bộ:</strong> | |
| Hệ thống ghi nhận <strong>{isp_loss_count}</strong> gói tin bị mất tại ISP Node trong quá trình kiểm tra. | |
| {'Hiện tượng Packet Loss xuất hiện đồng bộ với thời điểm tải cao (Load Zones), cho thấy dấu hiệu của nghẽn cổ chai vật lý hoặc lỗi phần cứng.' if isp_loss_rate > 0 else 'Mạng hoạt động ổn định, không có dấu hiệu rớt gói dưới tải cao.'} | |
| </p> | |
| </div> | |
| <!-- Card 2: Interactive Visualization --> | |
| <div class="card"> | |
| <div class="card-title">Biểu đồ Phân tích (Interactive Analysis)</div> | |
| <div class="hint-text"> | |
| 💡 <strong>Mẹo:</strong> Di chuột ngang qua biểu đồ để thấy thanh thước kẻ (Scan Line) hiển thị dữ liệu đồng bộ trên cả 3 lớp. | |
| </div> | |
| {plot_html} | |
| </div> | |
| <!-- Card 3: Hypothesis & Methodology --> | |
| <div class="card"> | |
| <div class="card-title">1. Giả thuyết & Phương pháp (Methodology)</div> | |
| <p>Bài kiểm tra sử dụng phương pháp <strong>A/B Isolation Testing</strong> để xác định vị trí lỗi:</p> | |
| <ul> | |
| <li><strong>Control Group (Local - Green):</strong> Ping liên tục tới {targets['Local']}. | |
| <br><em>Mục đích:</em> Chứng minh mạng LAN và thiết bị đầu cuối hoạt động bình thường.</li> | |
| <li><strong>Target Group (ISP - Blue):</strong> Ping liên tục tới {targets['ISP']}. | |
| <br><em>Mục đích:</em> Phát hiện nghẽn hoặc rớt gói tại điểm truy cập đầu tiên của nhà mạng.</li> | |
| <li><strong>Stress Testing:</strong> Thực hiện {CYCLES} vòng tải nặng (Download saturation) để ép bộ đệm (buffer) của thiết bị mạng phải hoạt động hết công suất.</li> | |
| </ul> | |
| </div> | |
| <!-- Card 4: Recommendations --> | |
| <div class="card"> | |
| <div class="card-title">2. Đề xuất Xử lý (Recommendations)</div> | |
| <p>Dựa trên dữ liệu Forensics thu thập được:</p> | |
| <ul> | |
| <li><strong>Nếu Packet Loss > 0% tại ISP Gateway:</strong> Đây là lỗi <strong>Tail Drop</strong> do tràn bộ đệm (Buffer Overflow) trên Switch Access hoặc lỗi tín hiệu quang (Optical Signal). | |
| <ul> | |
| <li>Yêu cầu kỹ thuật viên kiểm tra suy hao quang (dBm).</li> | |
| <li>Yêu cầu đổi port trên OLT hoặc chuyển đổi sang hạ tầng <strong>XGS-PON</strong> nếu khả dụng.</li> | |
| </ul> | |
| </li> | |
| <li><strong>Nếu Latency tăng cao (Spikes) nhưng không Loss:</strong> Đây là hiện tượng <strong>Bufferbloat</strong>. Cần cấu hình QoS (Smart Queue Management) trên Router chính.</li> | |
| <li><strong>Nếu Local cũng bị Loss:</strong> Kiểm tra lại dây cáp LAN, Switch nội bộ hoặc Card Wifi của máy tính.</li> | |
| </ul> | |
| </div> | |
| </div> | |
| </body> | |
| </html> | |
| """ | |
| # Generate timestamped filename | |
| file_ts = get_vietnam_time().strftime("%Y%m%d_%H%M%S") | |
| filename = f"Network_Audit_Report_{file_ts}.html" | |
| with open(filename, "w", encoding="utf-8") as f: | |
| f.write(final_html) | |
| print(f"\n✅ ĐÃ XUẤT BÁO CÁO: {os.path.abspath(filename)}") | |
| # --- MAIN --- | |
| if __name__ == "__main__": | |
| if not shutil.which("speedtest-cli"): | |
| print("❌ Lỗi: Cần cài đặt speedtest-cli. Chạy: pip install speedtest-cli") | |
| sys.exit() | |
| # 1. Dò tìm Topology | |
| detected_targets = detect_network_topology() | |
| # 2. Khởi tạo Ping Workers | |
| t_local = threading.Thread(target=ping_worker, args=(detected_targets['Local'], "Local")) | |
| t_isp = threading.Thread(target=ping_worker, args=(detected_targets['ISP'], "ISP")) | |
| t_local.start() | |
| t_isp.start() | |
| # 3. Chạy Stress Test | |
| try: | |
| run_stress_cycles() | |
| except KeyboardInterrupt: | |
| print("\n⚠️ Người dùng hủy bỏ.") | |
| stop_event.set() | |
| # 4. Kết thúc | |
| t_local.join() | |
| t_isp.join() | |
| # 5. Xuất báo cáo | |
| if data_log: | |
| df = pd.DataFrame(data_log) | |
| # Lưu CSV backup cũng có timestamp | |
| csv_name = f"network_data_{get_vietnam_time().strftime('%Y%m%d_%H%M%S')}.csv" | |
| df.to_csv(csv_name, index=False) | |
| generate_html_report(df, detected_targets) | |
| else: | |
| print("❌ Không có dữ liệu thu thập được.") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment