Skip to content

Instantly share code, notes, and snippets.

@tomasliubinas
Created December 3, 2025 07:38
Show Gist options
  • Select an option

  • Save tomasliubinas/d46171a0f4fa39e9f736a723a1fcb6ea to your computer and use it in GitHub Desktop.

Select an option

Save tomasliubinas/d46171a0f4fa39e9f736a723a1fcb6ea to your computer and use it in GitHub Desktop.
# Detects kernel-level UDP socket leaks by opening and closing sockets rapidly.
import socket
import subprocess
import re
import time
import sys
# Configuration
TARGET_IP = "8.8.8.8" # Google DNS
TARGET_PORT = 443 # QUIC port
BATCH_SIZE = 100 # Size of test batch
DELAY_BETWEEN_TESTS = 3
def get_udp_socket_count():
"""Parses 'netstat -s -p udp' to get the exact count of open UDP sockets."""
try:
result = subprocess.run(
["netstat", "-s", "-p", "udp"],
capture_output=True,
text=True
)
match = re.search(r'(\d+)\s+open UDP sockets', result.stdout)
if match:
return int(match.group(1))
return -1
except Exception as e:
print(f"Error running netstat: {e}")
return -1
def test_unconnected_udp():
"""Test 1: Standard UDP (sendto) - Should NOT leak."""
print(f"\n[Test 1] Running UNCONNECTED UDP test (socket.sendto)...")
sockets = []
for _ in range(BATCH_SIZE):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# No connect() here
s.sendto(b'x', (TARGET_IP, TARGET_PORT))
sockets.append(s)
except OSError:
pass
print(f" > Opened {len(sockets)} sockets.")
print(" > Closing all...")
for s in sockets:
s.close()
sockets = []
time.sleep(1) # Allow OS cleanup
def test_connected_udp():
"""Test 2: Connected UDP (connect + send) - The suspected QUIC behavior."""
print(f"\n[Test 2] Running CONNECTED UDP test (socket.connect + socket.send)...")
sockets = []
for _ in range(BATCH_SIZE):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# This is the "QUIC" behavior
s.connect((TARGET_IP, TARGET_PORT))
s.send(b'x')
sockets.append(s)
except OSError:
pass
print(f" > Opened {len(sockets)} sockets.")
print(" > Closing all...")
for s in sockets:
s.close()
sockets = []
time.sleep(1) # Allow OS cleanup
def main():
print("--- macOS UDP Leak Isolation Test ---")
# Baseline
baseline = get_udp_socket_count()
print(f"Baseline Sockets: {baseline}")
if baseline == -1: sys.exit(1)
# --- TEST 1: Unconnected ---
test_unconnected_udp()
after_test1 = get_udp_socket_count()
delta1 = after_test1 - baseline
print(f"Result Test 1 (Unconnected): {after_test1} sockets (Delta: {delta1})")
if delta1 > 10:
print(" [FAIL] Leak detected in Unconnected mode.")
else:
print(" [PASS] No significant leak in Unconnected mode.")
time.sleep(DELAY_BETWEEN_TESTS)
# Update baseline for Test 2
baseline_2 = get_udp_socket_count()
# --- TEST 2: Connected ---
test_connected_udp()
after_test2 = get_udp_socket_count()
delta2 = after_test2 - baseline_2
print(f"Result Test 2 (Connected): {after_test2} sockets (Delta: {delta2})")
if delta2 > 10:
print(" [FAIL] LEAK DETECTED in Connected mode.")
print(" !!! ISOLATION CONFIRMED: socket.connect() on UDP is the trigger. !!!")
else:
print(" [PASS] No significant leak in Connected mode.")
print("-" * 30)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment