Skip to content

Instantly share code, notes, and snippets.

@adrianlzt
Created October 27, 2025 12:07
Show Gist options
  • Select an option

  • Save adrianlzt/f6285f7b6c79e5180b0d8b3bffb1d45e to your computer and use it in GitHub Desktop.

Select an option

Save adrianlzt/f6285f7b6c79e5180b0d8b3bffb1d45e to your computer and use it in GitHub Desktop.
#!/usr/bin/env -S uv run
# /// script
# requires-python = ">=3.10"
# dependencies = []
# [tool.uv]
# exclude-newer = "2025-06-03T00:00:00Z"
# ///
"""
Monitor TIME-WAIT connections and detect threshold violations.
This script identifies potential connection tuple exhaustion by monitoring:
- For listening ports: local_ip:local_port:remote_ip combinations
- For outgoing connections: local_ip:remote_ip:remote_port combinations
The problem occurs when too many connections share the same pattern: each one
holds a distinct quadruplet (src_ip, src_port, dst_ip, dst_port) while it sits
in TIME-WAIT, which can exhaust the ephemeral ports available for that pattern.
"""
import argparse
import sys
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Tuple
class ConnectionStats:
    """Running tally of the connections that share one pattern.

    ``pattern`` is the grouping key, ``count`` the number of connections
    recorded so far, and ``connections`` the formatted quadruplets in the
    order they were added.
    """

    def __init__(self, pattern: str):
        self.pattern = pattern
        # One "local -> remote" string is kept per recorded connection.
        self.connections = []
        self.count = 0

    def add_connection(self, local_addr: str, local_port: str,
                       remote_addr: str, remote_port: str):
        """Record one more connection that belongs to this pattern."""
        source = f"{local_addr}:{local_port}"
        destination = f"{remote_addr}:{remote_port}"
        self.connections.append(f"{source} -> {destination}")
        self.count += 1
def get_ephemeral_port_range(
    path: str = '/proc/sys/net/ipv4/ip_local_port_range',
) -> Tuple[int, int]:
    """Read the ephemeral port range from /proc.

    Args:
        path: File holding the lower and upper port separated by whitespace.
            Defaults to the Linux sysctl file.

    Returns:
        ``(low, high)`` as integers. If the file cannot be read or does not
        contain two integers, a warning is written to stderr and the
        fallback ``(32768, 60999)`` is returned.
    """
    try:
        with open(path, 'r') as f:
            # Unpacking raises ValueError when fewer than two fields are
            # present, so a truncated file takes the same fallback path as
            # a non-numeric one instead of escaping as an IndexError.
            low, high = f.read().split()[:2]
        return int(low), int(high)
    except (OSError, ValueError) as e:
        print(f"Warning: Could not read ephemeral port range: {e}", file=sys.stderr)
        return 32768, 60999  # Default range
def get_timewait_duration() -> int:
    """Return the length of the TIME-WAIT state in seconds.

    Linux keeps a socket in TIME-WAIT for a fixed period (the kernel
    constant TCP_TIMEWAIT_LEN, 60 seconds) that is not tunable at runtime.
    The net.ipv4.tcp_fin_timeout sysctl is often mistaken for this value,
    but it only bounds how long an orphaned connection may stay in
    FIN-WAIT-2. Reading it here would misreport the TIME-WAIT duration, and
    the connection-rate estimate derived from it, on any host where that
    sysctl has been changed.

    Returns:
        The TIME-WAIT duration in seconds (always 60).
    """
    tcp_timewait_len_seconds = 60
    return tcp_timewait_len_seconds
def is_ephemeral_port(port: int, port_range: Tuple[int, int]) -> bool:
    """Tell whether *port* lies inside the inclusive ``(low, high)`` range."""
    if not port_range[0] <= port:
        return False
    return port <= port_range[1]
def parse_proc_net_tcp(filename: str) -> List[Dict[str, str]]:
    """Parse a /proc/net/tcp or /proc/net/tcp6 file for TIME-WAIT sockets.

    Args:
        filename: Path of the table to read. The first line is treated as a
            header and skipped.

    Returns:
        One dict per TIME-WAIT row with the string-valued keys ``local_ip``,
        ``local_port``, ``remote_ip`` and ``remote_port`` (ports in
        decimal). Rows with fewer than four fields or with fields that do
        not parse are skipped. If the file cannot be read, a warning is
        written to stderr and the rows collected so far are returned.
    """
    # TCP states: https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/net/tcp_states.h
    tcp_time_wait = 6
    connections = []
    try:
        with open(filename, 'r') as f:
            # Skip the header line; the default keeps an empty file from
            # raising StopIteration.
            next(f, None)
            for line in f:
                parts = line.split()
                if len(parts) < 4:
                    continue
                try:
                    # Column 3 (0-indexed) is the socket state in hex.
                    if int(parts[3], 16) != tcp_time_wait:
                        continue
                    # Columns 1 and 2 are "HEXIP:HEXPORT" for the local and
                    # remote endpoints respectively.
                    local_addr, local_port = parts[1].split(':')
                    remote_addr, remote_port = parts[2].split(':')
                    connections.append({
                        'local_ip': parse_hex_ip(local_addr),
                        'local_port': str(int(local_port, 16)),
                        'remote_ip': parse_hex_ip(remote_addr),
                        'remote_port': str(int(remote_port, 16)),
                    })
                except ValueError:
                    # A single malformed row should not abort the whole
                    # analysis; ignore it and keep reading.
                    continue
    except OSError as e:
        print(f"Warning: Could not read {filename}: {e}", file=sys.stderr)
    return connections
def parse_hex_ip(hex_str: str) -> str:
    """Decode a hex address field from /proc/net/tcp{,6} into text.

    An 8-character input is treated as IPv4 and returned in dotted decimal.
    Any other length is treated as IPv6 and returned as colon-separated
    4-character groups (not zero-compressed, hex case left as given).
    """
    if len(hex_str) != 8:
        # IPv6: the address is stored as 32-bit words whose bytes are
        # swapped, so flip the four byte pairs inside every 8-char word.
        flipped_words = []
        for start in range(0, len(hex_str), 8):
            word = hex_str[start:start + 8]
            flipped_words.append(word[6:8] + word[4:6] + word[2:4] + word[0:2])
        flat = ''.join(flipped_words)
        groups = [flat[pos:pos + 4] for pos in range(0, len(flat), 4)]
        return ':'.join(groups)

    # IPv4: four byte pairs stored least-significant first.
    octets = [str(int(hex_str[pos:pos + 2], 16)) for pos in range(0, 8, 2)]
    return '.'.join(octets[::-1])
def get_listening_ports(
    filenames=('/proc/net/tcp', '/proc/net/tcp6'),
) -> set:
    """Collect the local ports of all sockets in the LISTEN state.

    Args:
        filenames: Socket tables to scan, in /proc/net/tcp format. Defaults
            to the IPv4 and IPv6 tables.

    Returns:
        A set of integer port numbers. Files that cannot be read and rows
        that do not parse contribute nothing.
    """
    tcp_listen = 0x0A  # TCP_LISTEN = 10
    listening_ports = set()
    for filename in filenames:
        try:
            with open(filename, 'r') as f:
                # Skip the header; the default keeps an empty file from
                # raising StopIteration.
                next(f, None)
                for line in f:
                    parts = line.split()
                    if len(parts) < 4:
                        continue
                    try:
                        if int(parts[3], 16) != tcp_listen:
                            continue
                        _, local_port = parts[1].split(':')
                        listening_ports.add(int(local_port, 16))
                    except ValueError:
                        # Ignore a malformed row rather than abort the scan.
                        continue
        except OSError:
            # Best effort: an unreadable table simply yields no ports.
            # parse_proc_net_tcp already warns about these same files, so
            # staying quiet here avoids duplicate warnings.
            continue
    return listening_ports
def _top_patterns(patterns, limit=5):
    """Return the *limit* busiest ConnectionStats in *patterns*, busiest first."""
    return sorted(patterns.values(), key=lambda s: s.count, reverse=True)[:limit]


def _print_sample_connections(stat, limit=10):
    """Print at most *limit* of the quadruplets recorded for *stat*."""
    for sample in stat.connections[:limit]:
        print(f" {sample}")


def analyze_connections(threshold_percentage: float, show_details: bool = False):
    """Analyze TIME-WAIT connections and report threshold violations.

    TIME-WAIT sockets from /proc/net/tcp and /proc/net/tcp6 are grouped into
    patterns: ``local_ip:local_port:remote_ip`` when the local port is a
    listening port, ``local_ip:remote_ip:remote_port`` when the local port
    is in the ephemeral range. Sockets matching neither are ignored. The
    report is printed to stdout.

    Args:
        threshold_percentage: Percentage of the ephemeral port range at or
            above which a pattern's connection count is a violation.
        show_details: When true, print up to 10 sample connections under
            each violating pattern.
    """
    port_range = get_ephemeral_port_range()
    min_port, max_port = port_range
    available_ports = max_port - min_port + 1
    timewait_duration = get_timewait_duration()
    # Absolute number of connections corresponding to the percentage.
    threshold = int(available_ports * threshold_percentage / 100)

    print(f"Ephemeral port range: {min_port}-{max_port} ({available_ports} ports)")
    print(f"TIME-WAIT duration: {timewait_duration} seconds")
    print(f"Threshold: {threshold_percentage}% = {threshold} connections per pattern")
    print()

    listening_ports = get_listening_ports()

    all_connections = []
    all_connections.extend(parse_proc_net_tcp('/proc/net/tcp'))
    all_connections.extend(parse_proc_net_tcp('/proc/net/tcp6'))
    if not all_connections:
        print("No TIME-WAIT connections found.")
        return
    total = len(all_connections)
    print(f"Total TIME-WAIT connections: {total}")
    print()

    # Group every connection under its pattern.
    listening_patterns = {}
    outgoing_patterns = {}
    for conn in all_connections:
        local_port_int = int(conn['local_port'])
        if local_port_int in listening_ports:
            # Incoming connection (to a listening port).
            pattern = f"{conn['local_ip']}:{conn['local_port']}:{conn['remote_ip']}"
            bucket = listening_patterns
        elif is_ephemeral_port(local_port_int, port_range):
            # Outgoing connection (from an ephemeral port).
            pattern = f"{conn['local_ip']}:{conn['remote_ip']}:{conn['remote_port']}"
            bucket = outgoing_patterns
        else:
            continue
        stat = bucket.get(pattern)
        if stat is None:
            stat = bucket[pattern] = ConnectionStats(pattern)
        stat.add_connection(
            conn['local_ip'], conn['local_port'],
            conn['remote_ip'], conn['remote_port']
        )

    # ---- Incoming -------------------------------------------------------
    print("=" * 80)
    print("INCOMING CONNECTIONS (to listening ports)")
    print("Pattern: local_ip:local_port:remote_ip")
    print("=" * 80)
    violations = [stat for stat in listening_patterns.values() if stat.count >= threshold]
    if violations:
        print(f"\n⚠ THRESHOLD VIOLATIONS ({len(violations)} patterns):\n")
        for stat in sorted(violations, key=lambda x: x.count, reverse=True):
            print(f" {stat.pattern}: {stat.count} connections")
            # Details are capped at 10 samples per pattern; they must not
            # be suppressed just because the pattern has more than 10.
            if show_details:
                _print_sample_connections(stat)
    else:
        print(f"\n✓ No patterns exceed threshold of {threshold}")
    top5_listening = _top_patterns(listening_patterns)
    if top5_listening:
        print("\nTop 5 patterns by connection count:")
        for i, stat in enumerate(top5_listening, 1):
            percentage = (stat.count / total) * 100
            print(f" {i}. {stat.pattern}: {stat.count} ({percentage:.1f}%)")

    # ---- Outgoing -------------------------------------------------------
    print("\n" + "=" * 80)
    print("OUTGOING CONNECTIONS (from ephemeral ports)")
    print("Pattern: local_ip:remote_ip:remote_port")
    print("=" * 80)
    violations = [stat for stat in outgoing_patterns.values() if stat.count >= threshold]
    if violations:
        print(f"\n⚠ THRESHOLD VIOLATIONS ({len(violations)} patterns):\n")
        for stat in sorted(violations, key=lambda x: x.count, reverse=True):
            port_usage = (stat.count / available_ports) * 100
            print(f" {stat.pattern}: {stat.count} connections ({port_usage:.1f}% of available ports)")
            # Warn if approaching port exhaustion
            if port_usage > 50:
                print(f" ⚠⚠ WARNING: Using >{port_usage:.0f}% of ephemeral ports for this pattern!")
                print(" Risk of 'Cannot assign requested address' errors")
            if show_details:
                _print_sample_connections(stat)
    else:
        print(f"\n✓ No patterns exceed threshold of {threshold}")
    top5_outgoing = _top_patterns(outgoing_patterns)
    if top5_outgoing:
        print("\nTop 5 patterns by connection count:")
        for i, stat in enumerate(top5_outgoing, 1):
            percentage = (stat.count / total) * 100
            port_usage = (stat.count / available_ports) * 100
            print(f" {i}. {stat.pattern}: {stat.count} ({percentage:.1f}% of TIME-WAIT, {port_usage:.1f}% of ports)")

    # ---- Summary --------------------------------------------------------
    print("\n" + "=" * 80)
    print("ANALYSIS")
    print("=" * 80)
    print(f"\nConnection rate capacity (assuming {timewait_duration}s TIME-WAIT):")
    if timewait_duration > 0:
        print(f" Max sustainable rate per pattern: ~{available_ports / timewait_duration:.0f} conn/sec")
    else:
        # Avoid a ZeroDivisionError when the reported duration is 0.
        print(" Max sustainable rate per pattern: n/a (TIME-WAIT duration is not positive)")
    busiest_outgoing = top5_outgoing[0].count if top5_outgoing else 0
    busiest_listening = top5_listening[0].count if top5_listening else 0
    print(f" Current max pattern: {max(busiest_outgoing, busiest_listening)} connections")
    if top5_outgoing and top5_outgoing[0].count > available_ports * 0.5:
        print("\n⚠ RECOMMENDATIONS:")
        print(" - Increase net.ipv4.ip_local_port_range")
        print(" - Enable net.ipv4.tcp_tw_reuse=1 (safe for clients)")
        print(" - Add more client/server IP addresses")
        print(" - Use connection pooling/keep-alive to reduce new connections")
def main(argv=None):
    """Parse command-line options and run the TIME-WAIT analysis.

    Args:
        argv: Argument list to parse instead of ``sys.argv[1:]``. ``None``
            (the default) keeps the normal command-line behavior.

    Exits with status 2 on invalid arguments, and with status 1 if the
    analysis is interrupted or raises.
    """
    parser = argparse.ArgumentParser(
        description="Monitor TIME-WAIT connections and detect threshold violations",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
%(prog)s --threshold 50 # Alert if pattern uses >50%% of available ports
%(prog)s --threshold 25 --details
%(prog)s -t 10 -d
For more information about TIME-WAIT state:
https://vincent.bernat.ch/en/blog/2014-tcp-time-wait-state-linux
"""
    )
    parser.add_argument(
        '-t', '--threshold',
        type=float,
        required=True,
        help='Threshold as percentage of available ephemeral ports (0-100)'
    )
    parser.add_argument(
        '-d', '--details',
        action='store_true',
        help='Show detailed connection information for violations (max 10 per pattern)'
    )
    args = parser.parse_args(argv)

    # Written as a positive range test so that NaN, for which both
    # "< 0" and "> 100" are false, is rejected as well.
    if not 0 <= args.threshold <= 100:
        parser.error("Threshold must be between 0 and 100 (percentage)")

    try:
        analyze_connections(args.threshold, args.details)
    except KeyboardInterrupt:
        print("\nInterrupted by user", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        # Top-level boundary: report any failure on stderr and exit non-zero.
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment