Created
December 3, 2025 07:38
-
-
Save tomasliubinas/d46171a0f4fa39e9f736a723a1fcb6ea to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Detects kernel-level UDP socket leaks by opening and closing sockets rapidly. | |
| import socket | |
| import subprocess | |
| import re | |
| import time | |
| import sys | |
# Configuration
TARGET_IP = "8.8.8.8"  # Google DNS
TARGET_PORT = 443  # QUIC port
BATCH_SIZE = 100  # Number of sockets each test tries to open in one batch
DELAY_BETWEEN_TESTS = 3  # Seconds to sleep between Test 1 and Test 2
def get_udp_socket_count():
    """Return the open-UDP-socket count reported by ``netstat -s -p udp``.

    The command's standard output is searched for text of the form
    ``<N> open UDP sockets`` and ``N`` is returned as an int.

    Returns:
        The parsed socket count, or -1 if netstat could not be run, did not
        finish within the timeout, or its output lacked the expected line.
        A one-line diagnostic is printed in every failure case.
    """
    try:
        result = subprocess.run(
            ["netstat", "-s", "-p", "udp"],
            capture_output=True,
            text=True,
            timeout=10,  # never let a hung netstat stall the whole run
        )
    except (OSError, subprocess.SubprocessError, ValueError) as e:
        # OSError: netstat missing / not executable.
        # SubprocessError: includes TimeoutExpired.
        # ValueError: includes UnicodeDecodeError from text-mode decoding.
        print(f"Error running netstat: {e}")
        return -1

    match = re.search(r'(\d+)\s+open UDP sockets', result.stdout)
    if match is None:
        # Tell the user why the sentinel is coming back instead of failing
        # silently.
        print("Error parsing netstat output: no 'open UDP sockets' line found.")
        return -1
    return int(match.group(1))
def test_unconnected_udp():
    """Test 1: Standard UDP (sendto) - Should NOT leak.

    Tries to open BATCH_SIZE UDP sockets, sending one byte from each to
    (TARGET_IP, TARGET_PORT) with ``sendto()`` and never calling
    ``connect()``. All successfully used sockets are held open together,
    then closed, followed by a one-second pause so the OS can clean up.

    A socket whose send fails is closed immediately rather than abandoned,
    so the test itself cannot leave descriptors behind and skew the count.
    """
    print("\n[Test 1] Running UNCONNECTED UDP test (socket.sendto)...")
    sockets = []
    failed = 0
    try:
        for _ in range(BATCH_SIZE):
            try:
                s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            except OSError:
                failed += 1
                continue
            try:
                # No connect() here
                s.sendto(b'x', (TARGET_IP, TARGET_PORT))
            except OSError:
                s.close()
                failed += 1
                continue
            sockets.append(s)
        print(f" > Opened {len(sockets)} sockets.")
        if failed:
            print(f" > {failed} sockets failed to open or send and were closed.")
        print(" > Closing all...")
    finally:
        # Close everything even if the batch was interrupted part-way.
        for s in sockets:
            s.close()
    time.sleep(1)  # Allow OS cleanup
def test_connected_udp():
    """Test 2: Connected UDP (connect + send) - The suspected QUIC behavior.

    Tries to open BATCH_SIZE UDP sockets, calling ``connect()`` on each to
    (TARGET_IP, TARGET_PORT) and then sending one byte with ``send()``.
    All successfully used sockets are held open together, then closed,
    followed by a one-second pause so the OS can clean up.

    A socket whose connect or send fails is closed immediately rather than
    abandoned, so the test itself cannot leave descriptors behind and skew
    the count.
    """
    print("\n[Test 2] Running CONNECTED UDP test (socket.connect + socket.send)...")
    sockets = []
    failed = 0
    try:
        for _ in range(BATCH_SIZE):
            try:
                s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            except OSError:
                failed += 1
                continue
            try:
                # This is the "QUIC" behavior
                s.connect((TARGET_IP, TARGET_PORT))
                s.send(b'x')
            except OSError:
                s.close()
                failed += 1
                continue
            sockets.append(s)
        print(f" > Opened {len(sockets)} sockets.")
        if failed:
            print(f" > {failed} sockets failed to open, connect or send and were closed.")
        print(" > Closing all...")
    finally:
        # Close everything even if the batch was interrupted part-way.
        for s in sockets:
            s.close()
    time.sleep(1)  # Allow OS cleanup
# A post-test count more than this many sockets above its baseline is
# reported as a leak.
_LEAK_DELTA_THRESHOLD = 10


def _socket_count_or_exit(stage):
    """Return the current UDP socket count, exiting with status 1 on failure.

    ``stage`` is a short label (e.g. "after Test 1") used in the error
    message when get_udp_socket_count() returns its -1 failure sentinel.
    """
    count = get_udp_socket_count()
    if count == -1:
        print(f"Could not read UDP socket count {stage}; aborting.")
        sys.exit(1)
    return count


def main():
    """Run both UDP tests and report how the open-socket count changed.

    Takes a baseline socket count, runs the unconnected test and compares,
    sleeps DELAY_BETWEEN_TESTS seconds, takes a fresh baseline, then runs
    the connected test and compares. A delta above the leak threshold
    prints FAIL, otherwise PASS.

    Exits with status 1 if any socket-count reading fails, because a delta
    computed from the -1 sentinel would give a meaningless PASS or FAIL.
    """
    print("--- macOS UDP Leak Isolation Test ---")

    # Baseline
    baseline = get_udp_socket_count()
    print(f"Baseline Sockets: {baseline}")
    if baseline == -1:
        sys.exit(1)

    # --- TEST 1: Unconnected ---
    test_unconnected_udp()
    after_test1 = _socket_count_or_exit("after Test 1")
    delta1 = after_test1 - baseline
    print(f"Result Test 1 (Unconnected): {after_test1} sockets (Delta: {delta1})")
    if delta1 > _LEAK_DELTA_THRESHOLD:
        print(" [FAIL] Leak detected in Unconnected mode.")
    else:
        print(" [PASS] No significant leak in Unconnected mode.")

    time.sleep(DELAY_BETWEEN_TESTS)

    # Update baseline for Test 2
    baseline_2 = _socket_count_or_exit("before Test 2")

    # --- TEST 2: Connected ---
    test_connected_udp()
    after_test2 = _socket_count_or_exit("after Test 2")
    delta2 = after_test2 - baseline_2
    print(f"Result Test 2 (Connected): {after_test2} sockets (Delta: {delta2})")
    if delta2 > _LEAK_DELTA_THRESHOLD:
        print(" [FAIL] LEAK DETECTED in Connected mode.")
        print(" !!! ISOLATION CONFIRMED: socket.connect() on UDP is the trigger. !!!")
    else:
        print(" [PASS] No significant leak in Connected mode.")
    print("-" * 30)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment