Created
October 27, 2025 12:07
-
-
Save adrianlzt/f6285f7b6c79e5180b0d8b3bffb1d45e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env -S uv run | |
| # /// script | |
| # requires-python = ">=3.10" | |
| # dependencies = [] | |
| # [tool.uv] | |
| # exclude-newer = "2025-06-03T00:00:00Z" | |
| # /// | |
| """ | |
| Monitor TIME-WAIT connections and detect threshold violations. | |
| This script identifies potential connection tuple exhaustion by monitoring: | |
| - For listening ports: local_ip:local_port:remote_ip combinations | |
| - For outgoing connections: local_ip:remote_ip:remote_port combinations | |
| The problem occurs when many connections share the same fixed elements of the | |
| quadruplet (src_ip, src_port, dst_ip, dst_port): only the ephemeral source port | |
| can vary, so sockets lingering in TIME-WAIT can exhaust the available ephemeral ports. | |
| """ | |
| import argparse | |
| import sys | |
| from collections import defaultdict | |
| from pathlib import Path | |
| from typing import Dict, List, Tuple | |
class ConnectionStats:
    """Running tally of the TIME-WAIT sockets that share one connection pattern.

    Attributes:
        pattern: The grouping key this tally belongs to.
        count: Number of sockets recorded so far.
        connections: One "local -> remote" string per recorded socket, in
            insertion order, kept for reference when printing details.
    """

    def __init__(self, pattern: str):
        self.pattern, self.count, self.connections = pattern, 0, []

    def add_connection(self, local_addr: str, local_port: str,
                       remote_addr: str, remote_port: str):
        """Record one more socket and remember its full quadruplet."""
        quadruplet = "{}:{} -> {}:{}".format(
            local_addr, local_port, remote_addr, remote_port
        )
        self.connections.append(quadruplet)
        self.count = self.count + 1
def get_ephemeral_port_range(
    path: str = '/proc/sys/net/ipv4/ip_local_port_range',
) -> Tuple[int, int]:
    """Read the ephemeral (local) port range.

    Args:
        path: File holding two whitespace-separated integers, the first and
            last local port. Defaults to the Linux sysctl file under /proc.

    Returns:
        ``(first_port, last_port)``. If the file cannot be opened, is empty,
        has fewer than two fields, or holds non-numeric text, a warning is
        written to stderr and ``(32768, 60999)`` is returned instead.
    """
    default_range = (32768, 60999)
    try:
        with open(path, 'r') as f:
            fields = f.read().split()
        # IndexError covers an empty or single-field file, which would
        # otherwise escape as an unhandled exception.
        low, high = int(fields[0]), int(fields[1])
    except (OSError, ValueError, IndexError) as e:
        print(f"Warning: Could not read ephemeral port range: {e}", file=sys.stderr)
        return default_range
    return low, high
def get_timewait_duration() -> int:
    """Return how long Linux keeps a socket in TIME-WAIT, in seconds.

    The duration is not tunable through /proc: the kernel uses the
    compile-time constant TCP_TIMEWAIT_LEN (60 seconds). The
    ``net.ipv4.tcp_fin_timeout`` sysctl is frequently mistaken for it, but
    that setting only bounds how long an orphaned socket stays in
    FIN-WAIT-2. Reading it here would report a wrong TIME-WAIT duration
    (and a wrong sustainable connection rate) on any host where it has
    been tuned away from its default.

    Returns:
        The TIME-WAIT duration in seconds (always 60).
    """
    # Mirrors TCP_TIMEWAIT_LEN (60 * HZ) from the kernel's include/net/tcp.h.
    timewait_len_seconds = 60
    return timewait_len_seconds
def is_ephemeral_port(port: int, port_range: Tuple[int, int]) -> bool:
    """Tell whether ``port`` lies inside ``port_range``, both ends inclusive.

    ``port_range`` is indexed as ``(lowest, highest)``.
    """
    at_or_above_floor = port_range[0] <= port
    return at_or_above_floor and port <= port_range[1]
def parse_proc_net_tcp(filename: str) -> List[Dict[str, str]]:
    """Extract the TIME-WAIT sockets from a /proc/net/tcp-style table.

    The first line is treated as a header and skipped. For every remaining
    row, column 1 is the local ``ADDR:PORT``, column 2 the remote
    ``ADDR:PORT`` and column 3 the socket state, all in hexadecimal.

    Args:
        filename: Path of the table to read (``/proc/net/tcp`` or
            ``/proc/net/tcp6``).

    Returns:
        One dict per TIME-WAIT row with the keys ``local_ip``,
        ``local_port``, ``remote_ip`` and ``remote_port``; ports are decimal
        strings. Rows that are too short or cannot be decoded are skipped.
        If the file cannot be read, a warning is written to stderr and the
        rows collected so far (possibly none) are returned.
    """
    tcp_time_wait = 6  # TCP_TIME_WAIT in the kernel's include/net/tcp_states.h
    connections = []
    try:
        with open(filename, 'r') as f:
            # Skip the header. The default keeps an empty file from raising
            # StopIteration out of this function.
            next(f, None)
            for line in f:
                parts = line.split()
                if len(parts) < 4:
                    continue
                try:
                    if int(parts[3], 16) != tcp_time_wait:
                        continue
                    local_addr, local_port = parts[1].split(':')
                    remote_addr, remote_port = parts[2].split(':')
                    entry = {
                        'local_ip': parse_hex_ip(local_addr),
                        'local_port': str(int(local_port, 16)),
                        'remote_ip': parse_hex_ip(remote_addr),
                        'remote_port': str(int(remote_port, 16)),
                    }
                except ValueError:
                    # A single undecodable row must not abort the whole scan.
                    continue
                connections.append(entry)
    except OSError as e:
        print(f"Warning: Could not read {filename}: {e}", file=sys.stderr)
    return connections
def parse_hex_ip(hex_str: str) -> str:
    """Turn a hex address field from /proc/net/tcp{,6} into readable text.

    An 8-character input is treated as IPv4 and rendered as dotted decimal
    with its four bytes in reverse order. Any other length is treated as
    IPv6: the bytes of each 8-character (32-bit) group are reversed, then
    the result is cut into colon-separated groups of four hex digits
    (no zero compression, original letter case kept).
    """
    byte_offsets = (6, 4, 2, 0)  # walk a 32-bit word from its last byte to its first

    if len(hex_str) == 8:
        # IPv4: decode the bytes front to back, then emit them back to front.
        decoded = [str(int(hex_str[pos:pos + 2], 16)) for pos in range(0, 8, 2)]
        decoded.reverse()
        return '.'.join(decoded)

    # IPv6: swap the byte order inside every 32-bit word.
    swapped_words = []
    for start in range(0, len(hex_str), 8):
        word = hex_str[start:start + 8]
        swapped_words.append(''.join(word[pos:pos + 2] for pos in byte_offsets))
    flat = ''.join(swapped_words)

    groups = (flat[k:k + 4] for k in range(0, len(flat), 4))
    return ':'.join(groups)
def get_listening_ports(
    filenames: Tuple[str, ...] = ('/proc/net/tcp', '/proc/net/tcp6'),
) -> set:
    """Collect the local port numbers of all sockets in the LISTEN state.

    Args:
        filenames: /proc/net/tcp-style tables to scan. Defaults to the IPv4
            and IPv6 tables under /proc.

    Returns:
        A set of integer port numbers. Files that cannot be read contribute
        nothing, and rows that are too short or cannot be decoded are
        skipped.
    """
    tcp_listen = 0x0A  # TCP_LISTEN = 10
    listening_ports = set()
    for filename in filenames:
        try:
            with open(filename, 'r') as f:
                # Skip the header. The default keeps an empty file from
                # raising StopIteration out of this function.
                next(f, None)
                for line in f:
                    parts = line.split()
                    if len(parts) < 4:
                        continue
                    try:
                        if int(parts[3], 16) != tcp_listen:
                            continue
                        port_hex = parts[1].rsplit(':', 1)[1]
                        listening_ports.add(int(port_hex, 16))
                    except (ValueError, IndexError):
                        # A single undecodable row must not abort the scan.
                        continue
        except OSError:
            # Deliberately best-effort: an unreadable table (e.g. tcp6 when
            # IPv6 is disabled) simply contributes no listening ports.
            continue
    return listening_ports
def analyze_connections(threshold_percentage: float, show_details: bool = False):
    """Analyze TIME-WAIT connections and print a threshold report to stdout.

    TIME-WAIT sockets from /proc/net/tcp and /proc/net/tcp6 are grouped into
    patterns:

    - local port is a listening port -> ``local_ip:local_port:remote_ip``
    - otherwise, local port is in the ephemeral range ->
      ``local_ip:remote_ip:remote_port``

    Sockets matching neither rule are not counted in any pattern. A pattern
    is reported as a violation when its socket count reaches the threshold.

    Args:
        threshold_percentage: Threshold expressed as a percentage of the
            number of ephemeral ports; converted to an absolute socket count
            (truncated to an integer).
        show_details: When true, list up to 10 individual connections under
            every violating pattern.

    Returns:
        None. Returns early, after the header, when there are no TIME-WAIT
        sockets at all.
    """
    max_details = 10  # cap on connections listed per violating pattern

    # Get ephemeral port range
    port_range = get_ephemeral_port_range()
    min_port, max_port = port_range
    available_ports = max_port - min_port + 1

    # Get TIME-WAIT duration
    timewait_duration = get_timewait_duration()

    # Calculate absolute threshold
    threshold = int(available_ports * threshold_percentage / 100)

    print(f"Ephemeral port range: {min_port}-{max_port} ({available_ports} ports)")
    print(f"TIME-WAIT duration: {timewait_duration} seconds")
    print(f"Threshold: {threshold_percentage}% = {threshold} connections per pattern")
    print()

    # Get listening ports
    listening_ports = get_listening_ports()

    # Collect all TIME-WAIT connections
    all_connections = []
    all_connections.extend(parse_proc_net_tcp('/proc/net/tcp'))
    all_connections.extend(parse_proc_net_tcp('/proc/net/tcp6'))
    if not all_connections:
        print("No TIME-WAIT connections found.")
        return

    total_timewait = len(all_connections)
    print(f"Total TIME-WAIT connections: {total_timewait}")
    print()

    # Categorize connections (pattern string -> ConnectionStats)
    listening_patterns = {}
    outgoing_patterns = {}
    for conn in all_connections:
        local_port_int = int(conn['local_port'])
        if local_port_int in listening_ports:
            # Incoming connection: pattern is local_ip:local_port:remote_ip
            pattern = f"{conn['local_ip']}:{conn['local_port']}:{conn['remote_ip']}"
            bucket = listening_patterns
        elif is_ephemeral_port(local_port_int, port_range):
            # Outgoing connection: pattern is local_ip:remote_ip:remote_port
            pattern = f"{conn['local_ip']}:{conn['remote_ip']}:{conn['remote_port']}"
            bucket = outgoing_patterns
        else:
            # Neither a listening port nor an ephemeral one: not tracked.
            continue
        stat = bucket.get(pattern)
        if stat is None:
            stat = bucket[pattern] = ConnectionStats(pattern)
        stat.add_connection(
            conn['local_ip'], conn['local_port'],
            conn['remote_ip'], conn['remote_port']
        )

    # Report violations and top 5 for each category
    print("=" * 80)
    print("INCOMING CONNECTIONS (to listening ports)")
    print("Pattern: local_ip:local_port:remote_ip")
    print("=" * 80)
    violations = [stat for stat in listening_patterns.values() if stat.count >= threshold]
    if violations:
        print(f"\n⚠ THRESHOLD VIOLATIONS ({len(violations)} patterns):\n")
        for stat in sorted(violations, key=lambda x: x.count, reverse=True):
            print(f" {stat.pattern}: {stat.count} connections")
            # Details are capped by slicing, not by hiding large patterns:
            # the patterns that violate are usually the large ones.
            if show_details:
                for detail in stat.connections[:max_details]:
                    print(f" {detail}")
    else:
        print(f"\n✓ No patterns exceed threshold of {threshold}")

    # Show top 5
    top5_listening = sorted(listening_patterns.values(), key=lambda x: x.count, reverse=True)[:5]
    if top5_listening:
        print("\nTop 5 patterns by connection count:")
        for i, stat in enumerate(top5_listening, 1):
            percentage = (stat.count / total_timewait) * 100
            print(f" {i}. {stat.pattern}: {stat.count} ({percentage:.1f}%)")

    print("\n" + "=" * 80)
    print("OUTGOING CONNECTIONS (from ephemeral ports)")
    print("Pattern: local_ip:remote_ip:remote_port")
    print("=" * 80)
    violations = [stat for stat in outgoing_patterns.values() if stat.count >= threshold]
    if violations:
        print(f"\n⚠ THRESHOLD VIOLATIONS ({len(violations)} patterns):\n")
        for stat in sorted(violations, key=lambda x: x.count, reverse=True):
            port_usage = (stat.count / available_ports) * 100
            print(f" {stat.pattern}: {stat.count} connections ({port_usage:.1f}% of available ports)")
            # Warn if approaching port exhaustion
            if port_usage > 50:
                print(f" ⚠⚠ WARNING: Using >{port_usage:.0f}% of ephemeral ports for this pattern!")
                print(" Risk of 'Cannot assign requested address' errors")
            if show_details:
                for detail in stat.connections[:max_details]:
                    print(f" {detail}")
    else:
        print(f"\n✓ No patterns exceed threshold of {threshold}")

    # Show top 5
    top5_outgoing = sorted(outgoing_patterns.values(), key=lambda x: x.count, reverse=True)[:5]
    if top5_outgoing:
        print("\nTop 5 patterns by connection count:")
        for i, stat in enumerate(top5_outgoing, 1):
            percentage = (stat.count / total_timewait) * 100
            port_usage = (stat.count / available_ports) * 100
            print(f" {i}. {stat.pattern}: {stat.count} ({percentage:.1f}% of TIME-WAIT, {port_usage:.1f}% of ports)")

    print("\n" + "=" * 80)
    print("ANALYSIS")
    print("=" * 80)

    # Provide recommendations
    busiest_outgoing = top5_outgoing[0].count if top5_outgoing else 0
    busiest_listening = top5_listening[0].count if top5_listening else 0
    print(f"\nConnection rate capacity (assuming {timewait_duration}s TIME-WAIT):")
    if timewait_duration > 0:
        print(f" Max sustainable rate per pattern: ~{available_ports / timewait_duration:.0f} conn/sec")
    else:
        # A zero (or negative) duration would otherwise raise ZeroDivisionError.
        print(" Max sustainable rate per pattern: n/a (TIME-WAIT duration is not positive)")
    print(f" Current max pattern: {max(busiest_outgoing, busiest_listening)} connections")

    if busiest_outgoing > available_ports * 0.5:
        print("\n⚠ RECOMMENDATIONS:")
        print(" - Increase net.ipv4.ip_local_port_range")
        print(" - Enable net.ipv4.tcp_tw_reuse=1 (safe for clients)")
        print(" - Add more client/server IP addresses")
        print(" - Use connection pooling/keep-alive to reduce new connections")
def main():
    """Command-line entry point: parse arguments and run the analysis.

    Exits with status 2 (via ``parser.error``) when ``--threshold`` is not a
    number within 0-100, and with status 1 when the analysis is interrupted
    or raises.
    """
    parser = argparse.ArgumentParser(
        description="Monitor TIME-WAIT connections and detect threshold violations",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
%(prog)s --threshold 50 # Alert if pattern uses >50%% of available ports
%(prog)s --threshold 25 --details
%(prog)s -t 10 -d
For more information about TIME-WAIT state:
https://vincent.bernat.ch/en/blog/2014-tcp-time-wait-state-linux
"""
    )
    parser.add_argument(
        '-t', '--threshold',
        type=float,
        required=True,
        help='Threshold as percentage of available ephemeral ports (0-100)'
    )
    parser.add_argument(
        '-d', '--details',
        action='store_true',
        help='Show detailed connection information for violations (max 10 per pattern)'
    )
    args = parser.parse_args()

    # Written as a positive range test so that NaN (which compares false
    # against everything) is rejected here instead of failing later.
    if not 0 <= args.threshold <= 100:
        parser.error("Threshold must be between 0 and 100 (percentage)")

    try:
        analyze_connections(args.threshold, args.details)
    except KeyboardInterrupt:
        print("\nInterrupted by user", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        # Top-level boundary: report any failure and exit non-zero.
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
| if __name__ == '__main__': | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment