Skip to content

Instantly share code, notes, and snippets.

@ayoubzulfiqar
Last active December 4, 2025 13:14
Show Gist options
  • Select an option

  • Save ayoubzulfiqar/8c6f1fcffb35bd39a944b62febaf117a to your computer and use it in GitHub Desktop.

Select an option

Save ayoubzulfiqar/8c6f1fcffb35bd39a944b62febaf117a to your computer and use it in GitHub Desktop.
Reverse Proxy Test Script
import argparse
import asyncio
import concurrent.futures
import gzip
import io
# import hashlib
import json
import logging
# import random
import socket
import statistics
# import string
import sys
import time
import traceback
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional
from urllib.parse import quote
# import grpc
import requests
import websockets
# import yaml
# Force UTF-8 on the standard streams BEFORE logging is configured.
# logging.StreamHandler() keeps a reference to whatever object sys.stderr is
# at the moment the handler is constructed, so re-wrapping the streams after
# basicConfig() would leave console logging on the old (possibly non-UTF-8)
# stream. This mostly matters on Windows consoles using a legacy code page.
# The comparison is normalised because the encoding can be reported as
# "UTF-8", "utf-8" or "utf8" depending on platform and Python version.
if (sys.stdout.encoding or "").lower().replace("-", "") != "utf8":
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
if (sys.stderr.encoding or "").lower().replace("-", "") != "utf8":
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace")

# Configure logging: everything at INFO and above goes both to a UTF-8 log
# file in the current working directory and to the console (stderr).
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("proxify_test.log", encoding="utf-8"),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)

ADDR = "0.0.0.0"  # Define ADDR for WebSocket testing

# External Testing Service Endpoints
MOCKER_API_BASE = "https://api.mockerapi.com"
HTTPBUN_BASE = "https://httpbun.com"
HTTPBIN_BASE = (
    "https://httpbingo.org"  # Using httpbingo as a stable httpbin alternative
)
WEBHOOK_SITE_BASE = "https://webhook.site/26880a76-4638-4b21-89d8-8185222b7489"
def convert_keys_to_str(obj):
    """Return a copy of *obj* in which every dict key is JSON-compatible.

    Keys that are already ``str``, ``int``, ``float``, ``bool`` or ``None``
    are kept as they are; any other key (for example a tuple) is replaced by
    ``str(key)``. Dicts, lists and tuples are traversed recursively so that
    nested mappings are converted as well; tuples come back as plain tuples.

    Args:
        obj: Any object; only dicts, lists and tuples are rebuilt.

    Returns:
        A new dict/list/tuple with converted keys, or *obj* itself when it is
        not one of those container types.
    """
    if isinstance(obj, dict):
        # If two distinct keys stringify to the same text, the later one wins.
        return {
            (
                key
                if isinstance(key, (str, int, float, bool, type(None)))
                else str(key)
            ): convert_keys_to_str(value)
            for key, value in obj.items()
        }
    if isinstance(obj, list):
        return [convert_keys_to_str(item) for item in obj]
    if isinstance(obj, tuple):
        # Tuples may hide dicts with non-JSON keys too; json.dumps walks into
        # them, so they have to be converted as well.
        return tuple(convert_keys_to_str(item) for item in obj)
    return obj
class Protocol(Enum):
    """Supported protocols.

    Members are used as keys of the port table built by
    ``MultiProtocolTester``; each value is the protocol's lowercase name.
    """

    HTTP = "http"
    HTTPS = "https"
    WS = "ws"
    WSS = "wss"
    # Shares the value "ws" with WS, so Enum treats it as an alias:
    # ``Protocol.WEBSOCKET is Protocol.WS``.
    WEBSOCKET = "ws"  # Alias for WS
    GRPC = "grpc"
    TCP = "tcp"
    UDP = "udp"
class LoadBalanceAlgorithm(Enum):
    """Load balancing algorithms.

    ``LoadBalancingTester.test_load_balancing_algorithm`` dispatches on these
    members and uses the member's value to build the reported test name.
    """

    ROUND_ROBIN = "round-robin"
    LEAST_CONNECTIONS = "leastconn"
    SOURCE_IP = "source"
    URI_HASH = "uri"
    HEADER_HASH = "hdr"
@dataclass
class TestResult:
    """Test result container.

    One instance describes the outcome of a single test; the tester classes
    return these and ``TestSession.record_result`` collects them.
    """

    # Identifier of the test; appears in log lines and in the summary.
    test_name: str
    # True when the test passed.
    success: bool
    # Free-form, test-specific data; each instance gets its own empty dict.
    details: Dict[str, Any] = field(default_factory=dict)
    # Failure description, or None when no error was recorded.
    error: Optional[str] = None
    # Elapsed time in seconds (callers compute it from time.time() deltas).
    duration: float = 0.0
class TestSession:
    """Context manager that groups test results under one short session id.

    Entering logs the start of the session; leaving logs the elapsed time and
    closes the underlying ``requests.Session``.
    """

    def __init__(self, base_url: str = "http://127.0.0.1:4201"):
        """Create the HTTP session and start the session clock.

        Args:
            base_url: Base URL of the proxy under test.
        """
        self.base_url = base_url
        # The first 8 characters of a random UUID are enough to tell runs apart.
        self.session_id = str(uuid.uuid4())[:8]
        self.session = requests.Session()
        self.start_time = time.time()
        self.results: List[TestResult] = []

    def __enter__(self):
        """Log the session start and return the session itself."""
        logger.info(f"Starting test session {self.session_id}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Log the total run time and release the HTTP session.

        Returns None, so an exception raised inside the ``with`` body
        propagates to the caller.
        """
        elapsed = time.time() - self.start_time
        logger.info(f"Test session {self.session_id} completed in {elapsed:.2f}s")
        self.session.close()

    def record_result(self, result: TestResult):
        """Store *result* and log it as a pass (INFO) or a failure (ERROR)."""
        self.results.append(result)
        if not result.success:
            logger.error(f"[FAIL] {result.test_name} failed: {result.error}")
            return
        logger.info(f"[PASS] {result.test_name} passed ({result.duration:.2f}s)")

    def get_summary(self) -> Dict[str, Any]:
        """Build a summary of everything recorded so far.

        Returns:
            A dict with the session id, total/passed/failed counts, the
            success rate in percent (0 when nothing was recorded), the
            seconds elapsed since the session was created, and the attribute
            dict of every recorded result.
        """
        total = len(self.results)
        passed = len([entry for entry in self.results if entry.success])
        if total > 0:
            success_rate = passed / total * 100
        else:
            success_rate = 0
        return {
            "session_id": self.session_id,
            "total_tests": total,
            "passed": passed,
            "failed": total - passed,
            "success_rate": success_rate,
            "duration": time.time() - self.start_time,
            "results": [vars(entry) for entry in self.results],
        }
class MultiProtocolTester:
    """Enhanced multi-protocol testing.

    Runs WebSocket, raw TCP and UDP checks against ``base_host`` using the
    port table built in ``__init__``. Every ``test_*`` method reports its
    outcome as a ``TestResult`` instead of raising.
    """

    def __init__(self, base_host: str = "127.0.0.1"):
        """Store the target host and the per-protocol port table.

        Args:
            base_host: Host name or IP address the tests connect to.
        """
        self.base_host = base_host
        self.ports = {
            Protocol.HTTP: 8080,
            Protocol.HTTPS: 443,  # Assuming HTTPS on 443
            Protocol.WS: 8082,
            Protocol.WEBSOCKET: 8082,  # Alias for WS
            Protocol.GRPC: 8083,
            Protocol.TCP: 8080,
            Protocol.UDP: 8081,
        }

    async def test_websocket_protocol(self) -> TestResult:
        """Test WebSocket protocol with full duplex communication.

        Sends three JSON text messages and one binary frame to ``/ws`` and
        expects each to be answered within 5 seconds. A JSON reply must carry
        the same ``type`` as the request; a non-JSON reply is accepted only
        for the ``echo`` message when it equals the sent ``data``. The binary
        frame must be echoed back unchanged.

        Returns:
            A TestResult; on any exception ``success`` is False and ``error``
            holds the exception text.
        """
        test_name = "websocket_protocol"
        start_time = time.time()
        try:
            # Test WebSocket connection and bidirectional communication
            websocket_url = f"ws://{self.base_host}:{self.ports[Protocol.WEBSOCKET]}/ws"
            async with websockets.connect(websocket_url) as websocket:
                # Test sending and receiving messages
                test_messages = [
                    {"type": "echo", "data": "Hello WebSocket"},
                    {"type": "ping", "data": str(time.time())},
                    {"type": "large", "data": "x" * 4096},  # Test large message
                ]
                for msg in test_messages:
                    await websocket.send(json.dumps(msg))
                    response = await asyncio.wait_for(websocket.recv(), timeout=5.0)
                    # Verify response
                    try:
                        resp_data = json.loads(response)
                        if resp_data.get("type") != msg["type"]:
                            raise ValueError(
                                f"Response type mismatch: {resp_data.get('type')}"
                            )
                    except json.JSONDecodeError:
                        # Accept non-JSON responses for echo
                        if msg["type"] == "echo" and response == msg["data"]:
                            continue
                        raise
                # Test binary data
                binary_data = b"\x00\x01\x02\x03\x04\x05"
                await websocket.send(binary_data)
                binary_response = await asyncio.wait_for(websocket.recv(), timeout=5.0)
                if binary_response != binary_data:
                    raise ValueError("Binary data mismatch")
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name,
                success=True,
                details={
                    "protocol": "websocket",
                    "messages_exchanged": len(test_messages) + 1,
                },
                duration=duration,
            )
        except Exception as e:
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name, success=False, error=str(e), duration=duration
            )

    def test_tcp_stream_protocol(self) -> TestResult:
        """Test TCP stream protocol with various data patterns.

        Speaks hand-written HTTP/1.1 over one TCP connection: a small GET, a
        64 KB POST sent in 8 KB chunks, then three more GETs on the same
        connection. Every response must contain ``200 OK``.

        Returns:
            A TestResult; on any exception ``success`` is False and ``error``
            holds the exception text.
        """
        test_name = "tcp_stream_protocol"
        start_time = time.time()
        try:
            # The context manager closes the socket on every exit path,
            # including the failure paths that raise below.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(10)
                sock.connect((self.base_host, self.ports[Protocol.TCP]))
                # Test small message. sendall() is used throughout because
                # send() is allowed to transmit only part of the buffer.
                small_msg = b"GET /test HTTP/1.1\r\nHost: localhost\r\n\r\n"
                sock.sendall(small_msg)
                response = sock.recv(4096)  # Read response for small_msg
                if not response or b"200 OK" not in response:
                    # errors="replace": a binary reply must not mask the real
                    # failure with a UnicodeDecodeError.
                    raise ValueError(
                        f"Small message test failed: {response.decode(errors='replace') if response else 'No response'}"
                    )
                # Test large message (should be streamed)
                large_msg = b"X" * 65536  # 64KB
                sock.sendall(b"POST /upload HTTP/1.1\r\n")
                sock.sendall(b"Host: localhost\r\n")
                sock.sendall(b"Content-Length: 65536\r\n\r\n")
                # Send in chunks
                chunk_size = 8192
                for offset in range(0, len(large_msg), chunk_size):
                    sock.sendall(large_msg[offset : offset + chunk_size])
                # Get response for large message
                full_response = b""
                while True:
                    try:
                        chunk = sock.recv(4096)
                    except socket.timeout:
                        break
                    if not chunk:
                        break
                    full_response += chunk
                    # A more robust HTTP response parser might be needed here.
                    # For now, stop once the header terminator has arrived and
                    # a Content-Length header is present.
                    if (
                        b"\r\n\r\n" in full_response
                        and b"Content-Length" in full_response
                    ):
                        break
                if b"200 OK" not in full_response:
                    raise ValueError(
                        f"Large message upload test failed: {full_response.decode(errors='replace') if full_response else 'No response'}"
                    )
                # Test persistent connection
                for i in range(3):
                    sock.sendall(b"GET /ping HTTP/1.1\r\nHost: localhost\r\n\r\n")
                    response = sock.recv(4096)
                    if not response or b"200 OK" not in response:
                        raise ValueError(
                            f"Persistent connection test {i + 1} failed: {response.decode(errors='replace') if response else 'No response'}"
                        )
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name,
                success=True,
                details={
                    "protocol": "tcp",
                    "messages_sent": 5,  # 1 small GET + 1 upload + 3 pings
                    "connection_persistent": True,
                },
                duration=duration,
            )
        except Exception as e:
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name, success=False, error=str(e), duration=duration
            )

    def test_udp_datagram_protocol(self) -> TestResult:
        """Test UDP datagram protocol.

        Expects the UDP endpoint to echo datagrams: first a single small
        datagram, then a burst of five whose replies must arrive in the order
        they were sent and match byte for byte.

        Returns:
            A TestResult whose details carry the sent/received counters; on
            failure ``error`` holds the reason.
        """
        test_name = "udp_datagram_protocol"
        start_time = time.time()
        success = False
        error_message = ""
        datagrams_sent = 0
        responses_received = 0
        all_responses_matched = True
        sock = None  # Initialize sock to None for finally block
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            sock.settimeout(5)  # Set a timeout for recvfrom
            # Test small datagram
            test_data_small = b"test datagram small"
            sock.sendto(test_data_small, (self.base_host, self.ports[Protocol.UDP]))
            datagrams_sent += 1
            try:
                response, _ = sock.recvfrom(1024)
                responses_received += 1
                if response != test_data_small:
                    all_responses_matched = False
                    error_message = f"Small datagram response mismatch: Expected {test_data_small}, Got {response}"
                    raise ValueError(error_message)
            except socket.timeout as e:
                error_message = "No response received for small datagram."
                raise TimeoutError(error_message) from e
            # Test multiple datagrams and verify responses
            sent_datagrams = []
            received_datagrams = []
            for i in range(5):
                data = f"datagram_{i}_{time.time()}".encode()
                sent_datagrams.append(data)
                sock.sendto(data, (self.base_host, self.ports[Protocol.UDP]))
                datagrams_sent += 1
            # Give the server a moment to respond to all datagrams
            time.sleep(1)
            # Attempt to receive all responses
            for _ in range(len(sent_datagrams)):  # Try to receive as many as sent
                try:
                    response, _ = sock.recvfrom(1024)
                    received_datagrams.append(response)
                    responses_received += 1
                except socket.timeout:
                    # No more responses or timeout
                    break
            # Verify all sent datagrams have a matching response
            if len(sent_datagrams) != len(received_datagrams):
                all_responses_matched = False
                error_message = f"Mismatch in sent/received datagram count: Sent {len(sent_datagrams)}, Received {len(received_datagrams)}"
                raise ValueError(error_message)
            for sent, received in zip(sent_datagrams, received_datagrams):
                if sent != received:
                    all_responses_matched = False
                    error_message = (
                        f"Datagram content mismatch: Sent {sent}, Received {received}"
                    )
                    raise ValueError(error_message)
            success = all_responses_matched
        except Exception as e:
            success = False
            error_message = str(e)
            logger.error(f"UDP Datagram Test Error: {error_message}")
        finally:
            if sock is not None:  # Close only if the socket was created
                sock.close()
        duration = time.time() - start_time
        return TestResult(
            test_name=test_name,
            success=success,
            details={
                "protocol": "udp",
                "datagrams_sent": datagrams_sent,
                "responses_received": responses_received,
                "all_responses_matched": all_responses_matched,
            },
            duration=duration,
            error=error_message if not success else None,
        )
class LoadBalancingTester:
    """Comprehensive load balancing testing.

    All requests go to ``{proxy_url}/proxy?backend=<name>&path=<path>`` and
    the backend that served a request is read from the ``X-Backend-Server``
    response header ("unknown" when the header is missing).
    """

    def __init__(self, proxy_url: str = "http://127.0.0.1:4201"):
        """Remember the proxy base URL.

        Args:
            proxy_url: Base URL of the proxy under test.
        """
        self.proxy_url = proxy_url
        self.backends = {}

    def test_load_balancing_algorithm(
        self, algorithm: LoadBalanceAlgorithm, backend_name: str = "api-backend"
    ) -> TestResult:
        """Test specific load balancing algorithm.

        Args:
            algorithm: Which algorithm-specific check to run.
            backend_name: Backend group name passed to the proxy.

        Returns:
            The TestResult of the selected check. Any exception raised by the
            check (including an unsupported algorithm) is converted into a
            failed TestResult.
        """
        test_name = f"load_balancing_{algorithm.value}"
        start_time = time.time()
        try:
            checks = {
                LoadBalanceAlgorithm.ROUND_ROBIN: self._test_round_robin,
                LoadBalanceAlgorithm.LEAST_CONNECTIONS: self._test_least_connections,
                LoadBalanceAlgorithm.SOURCE_IP: self._test_source_ip_hash,
                LoadBalanceAlgorithm.URI_HASH: self._test_uri_hash,
                LoadBalanceAlgorithm.HEADER_HASH: self._test_header_hash,
            }
            check = checks.get(algorithm)
            if check is None:
                raise ValueError(f"Unsupported algorithm: {algorithm}")
            return check(backend_name, start_time)
        except Exception as e:
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name, success=False, error=str(e), duration=duration
            )

    def _test_round_robin(self, backend_name: str, start_time: float) -> TestResult:
        """Test round-robin algorithm.

        Sends 100 sequential requests and checks that the mean absolute
        deviation of the per-server request counts is within 30% of the mean.

        Raises:
            ValueError: If fewer than two distinct servers answered.
        """
        # Make requests and track which server handles them
        request_count = 100
        server_counts = {}
        response_times = []
        for i in range(request_count):
            try:
                req_start = time.time()
                response = requests.get(
                    f"{self.proxy_url}/proxy?backend={backend_name}&path=/test-{i}",
                    timeout=10,
                )
                response_times.append(time.time() - req_start)
                server = response.headers.get("X-Backend-Server", "unknown")
                server_counts[server] = server_counts.get(server, 0) + 1
            except Exception as e:
                logger.warning(f"Request {i} failed: {e}")
        # Analyze distribution
        if len(server_counts) < 2:
            raise ValueError(
                f"Only {len(server_counts)} backend servers were hit. "
                "For round-robin load balancing to work, please ensure your proxy is configured "
                f"with at least two active backend servers for the '{backend_name}' backend."
            )
        # Check if distribution is roughly equal
        values = list(server_counts.values())
        avg = sum(values) / len(values)
        deviation = sum(abs(v - avg) for v in values) / len(values)
        duration = time.time() - start_time
        return TestResult(
            test_name="load_balancing_round_robin",
            success=deviation <= (avg * 0.3),  # Allow 30% deviation
            details={
                "algorithm": "round-robin",
                "server_distribution": server_counts,
                "requests": request_count,
                "avg_requests_per_server": avg,
                "distribution_deviation": deviation,
                "avg_response_time": statistics.mean(response_times)
                if response_times
                else 0,
            },
            duration=duration,
        )

    def _test_least_connections(
        self, backend_name: str, start_time: float
    ) -> TestResult:
        """Test least connections algorithm.

        Fires 20 requests through a 10-worker thread pool; every 4th request
        targets ``/delay/1`` and the rest ``/instant``. The check passes when
        more than one server answered.
        """
        # Simulate different connection loads
        results = []

        def make_delayed_request(request_id: int, delay_endpoint: bool = False):
            """Issue one request; return its server, or the error text."""
            try:
                path = "/delay/1" if delay_endpoint else "/instant"
                response = requests.get(
                    f"{self.proxy_url}/proxy?backend={backend_name}&path={path}",
                    timeout=30,
                )
                return {
                    "request_id": request_id,
                    "server": response.headers.get("X-Backend-Server", "unknown"),
                    "delayed": delay_endpoint,
                }
            except Exception as e:
                return {"request_id": request_id, "error": str(e)}

        # Make concurrent requests with some delayed
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            futures = []
            for i in range(20):
                delayed = i % 4 == 0  # Every 4th request is delayed
                futures.append(executor.submit(make_delayed_request, i, delayed))
            for future in concurrent.futures.as_completed(futures):
                results.append(future.result())
        # Analyze results
        server_counts = {}
        delayed_counts = {}
        for result in results:
            if "server" in result:
                server = result["server"]
                server_counts[server] = server_counts.get(server, 0) + 1
                if result.get("delayed"):
                    delayed_counts[server] = delayed_counts.get(server, 0) + 1
        duration = time.time() - start_time
        success_status = len(server_counts) > 1
        error_message = None
        if not success_status:
            error_message = (
                f"Only {len(server_counts)} backend servers were hit. "
                "For least connections load balancing to work effectively, please ensure your proxy "
                "is configured with at least two active backend servers for the "
                f"'{backend_name}' backend, and that connection weights are allowing distribution."
            )
        return TestResult(
            test_name="load_balancing_least_connections",
            success=success_status,
            error=error_message,
            details={
                "algorithm": "least-connections",
                "server_distribution": server_counts,
                "delayed_requests_distribution": delayed_counts,
                "total_requests": len(results),
            },
            duration=duration,
        )

    def _test_source_ip_hash(self, backend_name: str, start_time: float) -> TestResult:
        """Test source IP hash algorithm.

        Simulates 10 client IPs through ``X-Forwarded-For`` / ``X-Real-IP``
        headers, 5 requests each.

        Raises:
            ValueError: If requests from one IP reached different servers.
        """
        # Simulate requests from different source IPs
        server_by_ip = {}
        for i in range(10):
            ip = f"192.168.1.{i + 1}"
            headers = {"X-Forwarded-For": ip, "X-Real-IP": ip}
            # Make 5 requests from same IP
            servers_seen = set()
            for _ in range(5):
                try:
                    response = requests.get(
                        f"{self.proxy_url}/proxy?backend={backend_name}&path=/test",
                        headers=headers,
                        timeout=10,
                    )
                    servers_seen.add(response.headers.get("X-Backend-Server", "unknown"))
                except Exception as e:
                    logger.warning(f"Request from {ip} failed: {e}")
            # All requests from same IP should go to same server
            if len(servers_seen) > 1:
                raise ValueError(f"IP {ip} routed to multiple servers: {servers_seen}")
            server_by_ip[ip] = list(servers_seen)[0] if servers_seen else None
        duration = time.time() - start_time
        return TestResult(
            test_name="load_balancing_source_ip_hash",
            success=True,
            details={
                "algorithm": "source-ip-hash",
                "ip_to_server_mapping": server_by_ip,
                "ips_tested": len(server_by_ip),
            },
            duration=duration,
        )

    def _test_uri_hash(self, backend_name: str, start_time: float) -> TestResult:
        """Test URI hash algorithm.

        Requests each of five URIs five times.

        Raises:
            ValueError: If one URI was served by more than one server.
        """
        # Same URI should always go to same server
        uri_to_server = {}
        test_uris = [
            "/api/users/123",
            "/api/products/456",
            "/api/orders/789",
            "/static/image1.jpg",
            "/static/image2.png",
        ]
        for uri in test_uris:
            servers_seen = set()
            for _ in range(5):  # Multiple requests to same URI
                try:
                    response = requests.get(
                        f"{self.proxy_url}/proxy?backend={backend_name}&path={uri}",
                        timeout=10,
                    )
                    servers_seen.add(response.headers.get("X-Backend-Server", "unknown"))
                except Exception as e:
                    logger.warning(f"Request for {uri} failed: {e}")
            if len(servers_seen) > 1:
                raise ValueError(
                    f"URI {uri} routed to multiple servers: {servers_seen}"
                )
            uri_to_server[uri] = list(servers_seen)[0] if servers_seen else None
        # Different URIs might go to different servers
        unique_servers = len(set(uri_to_server.values()))
        duration = time.time() - start_time
        return TestResult(
            test_name="load_balancing_uri_hash",
            success=True,
            details={
                "algorithm": "uri-hash",
                "uri_to_server_mapping": uri_to_server,
                "unique_servers_used": unique_servers,
            },
            duration=duration,
        )

    def _test_header_hash(self, backend_name: str, start_time: float) -> TestResult:
        """Test header hash algorithm.

        Sends each header set three times. The mapping in the result details
        is keyed by ``(header_name, header_value)`` tuples, so it needs key
        conversion before it can be dumped as JSON.

        Raises:
            ValueError: If one header set was served by more than one server.
        """
        # Same header value should route to same server
        header_to_server = {}
        test_headers = [
            {"X-User-ID": "user_123"},
            {"X-User-ID": "user_456"},
            {"X-Session-ID": "session_abc"},
            {"X-Session-ID": "session_def"},
        ]
        for header_set in test_headers:
            servers_seen = set()
            for _ in range(3):
                try:
                    response = requests.get(
                        f"{self.proxy_url}/proxy?backend={backend_name}&path=/api/data",
                        headers=header_set,
                        timeout=10,
                    )
                    servers_seen.add(response.headers.get("X-Backend-Server", "unknown"))
                except Exception as e:
                    logger.warning(f"Request with headers {header_set} failed: {e}")
            if len(servers_seen) > 1:
                raise ValueError(
                    f"Headers {header_set} routed to multiple servers: {servers_seen}"
                )
            key = list(header_set.items())[0]
            header_to_server[key] = list(servers_seen)[0] if servers_seen else None
        duration = time.time() - start_time
        return TestResult(
            test_name="load_balancing_header_hash",
            success=True,
            details={
                "algorithm": "header-hash",
                "header_to_server_mapping": header_to_server,
            },
            duration=duration,
        )

    def test_session_stickiness(self, backend_name: str = "api-backend") -> TestResult:
        """Test session stickiness with cookies.

        Makes 10 requests, replaying the ``PROXIFY_SESSION`` cookie once the
        proxy has issued one. The check passes when every request was served
        by the same server.

        Returns:
            A TestResult; on any exception ``success`` is False and ``error``
            holds the exception text.
        """
        test_name = "session_stickiness"
        start_time = time.time()
        try:
            session_cookie = None
            first_server = None
            servers_seen = set()
            # Make multiple requests in a session
            for i in range(10):
                cookies = {}
                if session_cookie:
                    cookies["PROXIFY_SESSION"] = session_cookie
                response = requests.get(
                    f"{self.proxy_url}/proxy?backend={backend_name}&path=/session-test-{i}",
                    cookies=cookies,
                    timeout=10,
                )
                server = response.headers.get("X-Backend-Server", "unknown")
                servers_seen.add(server)
                # Store cookie if provided. requests' header mapping has no
                # get_list(); the parsed cookie jar is the supported way to
                # read Set-Cookie values, and it keeps values containing "=".
                issued_cookie = response.cookies.get("PROXIFY_SESSION")
                if issued_cookie is not None:
                    session_cookie = issued_cookie
                if first_server is None:
                    first_server = server
            # With stickiness, should stay on first server
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name,
                success=len(servers_seen) == 1 and first_server in servers_seen,
                details={
                    "servers_seen": list(servers_seen),
                    "first_server": first_server,
                    "session_cookie_set": session_cookie is not None,
                },
                duration=duration,
            )
        except Exception as e:
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name, success=False, error=str(e), duration=duration
            )
class AdvancedSecurityTester:
    """Advanced security testing including all Proxify security features.

    Target URLs are passed to the proxy as ``{proxy_url}/proxy?url=<target>``.
    Most checks depend on how the proxy is configured.
    """

    def __init__(self, proxy_url: str = "http://127.0.0.1:4201"):
        """Remember the proxy base URL.

        Args:
            proxy_url: Base URL of the proxy under test.
        """
        self.proxy_url = proxy_url

    def test_authentication_methods(self) -> List[TestResult]:
        """Test all authentication methods.

        Returns:
            Two results: ``basic_authentication`` (valid credentials must
            yield 200, invalid ones 401 or 403) and
            ``client_cert_authentication``, which is always reported as not
            passed because it cannot be automated here.
        """
        results = []
        # Test Basic Authentication
        start_time = time.time()
        try:
            # Test with valid credentials
            response = requests.get(
                f"{self.proxy_url}/proxy?url=https://httpbin.org/basic-auth/admin/password",
                auth=("admin", "password"),
                timeout=10,
            )
            valid_success = response.status_code == 200
            # Test with invalid credentials
            response = requests.get(
                f"{self.proxy_url}/proxy?url=https://httpbin.org/basic-auth/admin/password",
                auth=("wrong", "wrong"),
                timeout=10,
            )
            invalid_blocked = response.status_code in [401, 403]
            duration = time.time() - start_time
            results.append(
                TestResult(
                    test_name="basic_authentication",
                    success=valid_success and invalid_blocked,
                    details={
                        "valid_auth_works": valid_success,
                        "invalid_auth_blocked": invalid_blocked,
                    },
                    duration=duration,
                )
            )
        except Exception as e:
            duration = time.time() - start_time
            results.append(
                TestResult(
                    test_name="basic_authentication",
                    success=False,
                    error=str(e),
                    duration=duration,
                )
            )
        # Test client certificate authentication (if configured)
        # This test requires manual setup of client certificates on the client
        # and configuration of the proxy to request and verify them.
        # Marking as skipped if not explicitly configured/detected.
        results.append(
            TestResult(
                test_name="client_cert_authentication",
                success=False,  # Set to False as it's not actively tested
                details={
                    "note": "Client certificate authentication requires specific proxy configuration "
                    "and client certificate setup, which cannot be automated in this test. "
                    "This test is skipped. Please configure your proxy for client certificate "
                    "authentication and manually verify if needed."
                },
                error="Test requires external client/proxy configuration, currently skipped.",
            )
        )
        return results

    def test_cross_origin_policies(self) -> TestResult:
        """Test all cross-origin security headers.

        Fetches ``HTTPBIN_BASE/get`` through the proxy and records which
        cross-origin and general security headers are present on the response.
        The check passes when at least one cross-origin header has a
        non-empty value.

        Returns:
            A TestResult; on any exception ``success`` is False and ``error``
            holds the exception text.
        """
        test_name = "cross_origin_policies"
        start_time = time.time()
        try:
            # Use HTTPBIN_BASE for testing
            test_url = f"{HTTPBIN_BASE}/get"
            response = requests.get(
                f"{self.proxy_url}/proxy?url={quote(test_url)}", timeout=10
            )
            # Check all cross-origin headers
            cross_origin_headers = {
                name: response.headers.get(name)
                for name in (
                    "Cross-Origin-Resource-Policy",
                    "Cross-Origin-Embedder-Policy",
                    "Cross-Origin-Opener-Policy",
                    "Access-Control-Allow-Origin",
                    "Access-Control-Allow-Methods",
                    "Access-Control-Allow-Headers",
                    "Access-Control-Allow-Credentials",
                    "Access-Control-Expose-Headers",
                )
            }
            # Additional security headers
            security_headers = {
                name: response.headers.get(name)
                for name in (
                    "Content-Security-Policy",
                    "X-Content-Type-Options",
                    "X-Frame-Options",
                    "X-XSS-Protection",
                    "Strict-Transport-Security",
                    "Referrer-Policy",
                    "Permissions-Policy",
                )
            }
            headers_present = {
                k: v is not None
                for k, v in {**cross_origin_headers, **security_headers}.items()
            }
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name,
                success=any(cross_origin_headers.values()),
                details={
                    "cross_origin_headers": cross_origin_headers,
                    "security_headers": security_headers,
                    "headers_present": headers_present,
                    "total_headers": len(headers_present),
                    "present_count": sum(headers_present.values()),
                },
                duration=duration,
            )
        except Exception as e:
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name, success=False, error=str(e), duration=duration
            )

    def test_ip_filtering(self) -> TestResult:
        """Test IP whitelist/blacklist functionality.

        Requests ``/health`` while claiming four different client IPs via
        ``X-Forwarded-For`` / ``X-Real-IP`` and records whether each got a
        200. The result is informational: ``success`` is always True.
        """
        test_name = "ip_filtering"
        start_time = time.time()
        # Note: This requires specific Proxify configuration
        # Testing assumes certain IPs are blocked/whitelisted
        test_ips = [
            ("127.0.0.1", "localhost"),
            ("192.168.1.100", "internal IP"),
            ("10.0.0.1", "private IP"),
            ("8.8.8.8", "public IP"),
        ]
        results = []
        for ip, description in test_ips:
            headers = {"X-Forwarded-For": ip, "X-Real-IP": ip}
            try:
                response = requests.get(
                    f"{self.proxy_url}/health", headers=headers, timeout=5
                )
                results.append(
                    {
                        "ip": ip,
                        "description": description,
                        "allowed": response.status_code == 200,
                        "status": response.status_code,
                    }
                )
            except Exception as e:
                results.append(
                    {
                        "ip": ip,
                        "description": description,
                        "allowed": False,
                        "error": str(e),
                    }
                )
        duration = time.time() - start_time
        return TestResult(
            test_name=test_name,
            success=True,  # Success means test completed, not that all IPs are filtered
            details={"ip_test_results": results},
            duration=duration,
        )

    def test_request_validation(self) -> TestResult:
        """Test comprehensive request validation.

        Asks the proxy to fetch a list of malicious or malformed target URLs.
        A case counts as blocked when the proxy answers with anything other
        than 200 or the request raises. The check passes only when every
        case was blocked.
        """
        test_name = "request_validation"
        start_time = time.time()
        malformed_cases = [
            # Invalid URLs
            {"url": "javascript:alert('xss')", "desc": "JavaScript protocol"},
            {"url": "data:text/html,<script>alert('xss')</script>", "desc": "Data URL"},
            {"url": "file:///etc/passwd", "desc": "File protocol"},
            {"url": "gopher://example.com", "desc": "Gopher protocol"},
            # Path traversal
            {
                "url": "https://example.com/../../../etc/passwd",
                "desc": "Path traversal",
            },
            {
                "url": "https://example.com/%2e%2e%2fetc%2fpasswd",
                "desc": "Encoded traversal",
            },
            # Invalid ports
            {"url": "https://example.com:99999", "desc": "Invalid port"},
            {"url": "https://example.com:0", "desc": "Port 0"},
            {"url": "https://example.com:-1", "desc": "Negative port"},
            # SSRF attempts
            {"url": "http://169.254.169.254/latest/meta-data", "desc": "AWS metadata"},
            {"url": "http://metadata.google.internal", "desc": "GCP metadata"},
            {"url": "http://[::1]:80", "desc": "IPv6 localhost"},
            # CRLF injection
            {
                "url": "https://example.com/%0d%0aHost:%20evil.com",
                "desc": "CRLF injection",
            },
            # Overly long URL
            {"url": "https://example.com/" + "a" * 10000, "desc": "Very long URL"},
        ]
        results = []
        for case in malformed_cases:
            try:
                response = requests.get(
                    f"{self.proxy_url}/proxy?url={quote(case['url'])}", timeout=5
                )
                blocked = response.status_code != 200
                results.append(
                    {
                        **case,
                        "blocked": blocked,
                        "status": response.status_code,
                        "success": blocked,  # Should be blocked
                    }
                )
            except Exception as e:
                results.append(
                    {
                        **case,
                        "blocked": True,
                        "error": str(e),
                        "success": True,  # Exception means blocked
                    }
                )
        blocked_count = sum(1 for r in results if r.get("blocked", False))
        total_count = len(results)
        duration = time.time() - start_time
        return TestResult(
            test_name=test_name,
            success=blocked_count == total_count,
            details={
                "cases_tested": total_count,
                "blocked": blocked_count,
                "allowed": total_count - blocked_count,
                "results": results,
            },
            duration=duration,
        )

    def test_size_limits(self) -> TestResult:
        """Test request and response size limits.

        Posts a 10 MB JSON body (200 or 413 both count as handled) and then
        streams a 5 MB response, stopping after a little over 1 MB.

        Returns:
            A TestResult; on any exception ``success`` is False and ``error``
            holds the exception text.
        """
        test_name = "size_limits"
        start_time = time.time()
        try:
            # Test large request body
            large_body = {"data": "x" * (10 * 1024 * 1024)}  # 10MB
            response = requests.post(
                f"{self.proxy_url}/proxy?url=https://httpbin.org/post",
                json=large_body,
                timeout=30,
            )
            large_request_handled = response.status_code in [200, 413]
            # Test large response. The body is deliberately abandoned after
            # 1MB, so the streamed response is used as a context manager to
            # make sure its connection is released instead of leaked.
            total_received = 0
            with requests.get(
                f"{self.proxy_url}/proxy?url=https://httpbin.org/bytes/5242880",  # 5MB
                timeout=30,
                stream=True,
            ) as response:
                # Stream to check if large response is handled
                for chunk in response.iter_content(chunk_size=8192):
                    total_received += len(chunk)
                    if total_received > 1024 * 1024:  # Stop after 1MB
                        break
            large_response_handled = total_received > 0
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name,
                success=large_request_handled and large_response_handled,
                details={
                    "large_request_handled": large_request_handled,
                    "large_response_handled": large_response_handled,
                    "response_bytes_received": total_received,
                },
                duration=duration,
            )
        except Exception as e:
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name, success=False, error=str(e), duration=duration
            )
class MiddlewareTester:
    """Test middleware functionality.

    Covers the proxy's circuit breaker, retry and compression behaviour by
    fetching httpbin endpoints through ``{proxy_url}/proxy?url=<target>``.
    """

    def __init__(self, proxy_url: str = "http://127.0.0.1:4201"):
        """Remember the proxy base URL.

        Args:
            proxy_url: Base URL of the proxy under test.
        """
        self.proxy_url = proxy_url

    def test_circuit_breaker(self) -> TestResult:
        """Test circuit breaker middleware.

        Sends 10 requests to an endpoint that answers 500 and counts how many
        responses had a 5xx status. The result is informational: it succeeds
        whenever all requests completed without raising.
        """
        test_name = "circuit_breaker"
        start_time = time.time()
        try:
            # Target an endpoint that might fail
            test_url = "https://httpbin.org/status/500"
            failures = 0
            for _ in range(10):
                response = requests.get(
                    f"{self.proxy_url}/proxy?url={quote(test_url)}", timeout=5
                )
                if response.status_code >= 500:
                    failures += 1
            # After certain failures, circuit should open
            # Note: Actual behavior depends on Proxify configuration
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name,
                success=True,  # Test completed
                details={
                    "requests_made": 10,
                    "failures": failures,
                    "note": "Circuit breaker behavior depends on Proxify config",
                },
                duration=duration,
            )
        except Exception as e:
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name, success=False, error=str(e), duration=duration
            )

    def test_retry_logic(self) -> TestResult:
        """Test retry middleware.

        Sends 5 requests to an endpoint that answers 429 and records the
        status and ``Retry-After`` header of each. The result is
        informational: it succeeds whenever no request raised.
        """
        test_name = "retry_logic"
        start_time = time.time()
        try:
            # Test with intermittent failures
            test_url = "https://httpbin.org/status/429"  # Too Many Requests
            responses = []
            for i in range(5):
                response = requests.get(
                    f"{self.proxy_url}/proxy?url={quote(test_url)}", timeout=10
                )
                responses.append(
                    {
                        "attempt": i + 1,
                        "status": response.status_code,
                        "retry_after": response.headers.get("Retry-After"),
                    }
                )
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name,
                success=True,
                details={
                    "attempts": responses,
                    "note": "Retry logic depends on Proxify configuration",
                },
                duration=duration,
            )
        except Exception as e:
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name, success=False, error=str(e), duration=duration
            )

    def test_compression_middleware(self) -> TestResult:
        """Test compression middleware.

        Requests a gzip endpoint with an ``Accept-Encoding`` header and checks
        that the response declares a gzip/deflate/br ``Content-Encoding`` and
        that a gzip body yields non-empty content.

        Returns:
            A TestResult; on any exception ``success`` is False and ``error``
            holds the exception text.
        """
        test_name = "compression_middleware"
        start_time = time.time()
        try:
            # Request with accept-encoding
            headers = {
                "Accept-Encoding": "gzip, deflate, br",
                "Accept": "application/json",
            }
            response = requests.get(
                f"{self.proxy_url}/proxy?url=https://httpbin.org/gzip",
                headers=headers,
                timeout=10,
            )
            content_encoding = response.headers.get("content-encoding", "").lower()
            is_compressed = any(
                enc in content_encoding for enc in ["gzip", "deflate", "br"]
            )
            # Verify we can decompress
            if is_compressed and "gzip" in content_encoding:
                try:
                    body = gzip.decompress(response.content)
                except (OSError, EOFError):
                    # requests transparently decodes gzip bodies, so
                    # response.content is normally plain data already and a
                    # second decompression fails; that decoded body is itself
                    # the evidence that the payload could be decompressed.
                    body = response.content
                can_decompress = len(body) > 0
            else:
                can_decompress = True
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name,
                success=is_compressed and can_decompress,
                details={
                    "content_encoding": content_encoding,
                    "is_compressed": is_compressed,
                    "can_decompress": can_decompress,
                    "content_length": len(response.content),
                },
                duration=duration,
            )
        except Exception as e:
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name, success=False, error=str(e), duration=duration
            )
class MonitoringTester:
    """Test monitoring and metrics endpoints"""

    def __init__(self, proxy_url: str = "http://127.0.0.1:4201"):
        """Remember the base URL of the proxy under test."""
        self.proxy_url = proxy_url

    def test_all_monitoring_endpoints(self) -> List[TestResult]:
        """GET each monitoring endpoint and require ``200`` with a non-empty body.

        Returns:
            One TestResult per endpoint (``/health``, ``/metrics``, ``/stats``,
            ``/logs``). A request error never aborts the loop; it is recorded
            as that endpoint's failure.
        """
        results = []
        endpoints = [
            ("/health", "health_endpoint"),
            ("/metrics", "metrics_endpoint"),
            ("/stats", "stats_endpoint"),
            ("/logs", "logs_endpoint"),  # If available
        ]
        for endpoint, test_name in endpoints:
            start_time = time.time()
            current_success = False
            current_error = None
            details: Dict[str, Any] = {"endpoint": endpoint}
            try:
                response = requests.get(f"{self.proxy_url}{endpoint}", timeout=10)
                is_json = "application/json" in response.headers.get("content-type", "")
                has_content = len(response.content) > 0
                details.update(
                    {
                        "status": response.status_code,
                        "content_type": response.headers.get("content-type"),
                        "is_json": is_json,
                        "content_length": len(response.content),
                    }
                )
                current_success = response.status_code == 200 and has_content
                if not current_success:
                    # /logs keeps its own wording; the rest use the test name.
                    label = "Logs endpoint" if endpoint == "/logs" else test_name
                    current_error = (
                        f"{label} failed: Status code {response.status_code}, "
                        f"Has content: {has_content}. Expected 200 OK with content."
                    )
                elif endpoint == "/logs":
                    # Keep a short excerpt so the report shows what came back.
                    details["log_content_sample"] = (
                        response.text[:200] + "..."
                        if len(response.text) > 200
                        else response.text
                    )
            except requests.exceptions.Timeout:
                current_success = False
                current_error = f"Request to {endpoint} timed out after 10 seconds."
            except requests.exceptions.ConnectionError as e:
                current_success = False
                current_error = f"Connection error to {endpoint}: {e}"
            except Exception as e:
                current_success = False
                current_error = str(e)
            duration = time.time() - start_time
            results.append(
                TestResult(
                    test_name=test_name,
                    success=current_success,
                    details=details,
                    duration=duration,
                    error=current_error,
                )
            )
        return results

    def test_metrics_collection(self) -> TestResult:
        """Generate some proxied traffic, then check ``/metrics`` reports something.

        Returns:
            A ``metrics_collection`` TestResult. A JSON body passes when it is
            non-empty; a non-JSON body (e.g. a text exposition format) passes
            when it has any content. A non-200 status or any exception fails.
        """
        test_name = "metrics_collection"
        start_time = time.time()
        try:
            # Make some requests to generate metrics
            test_urls = [
                "https://httpbin.org/get",
                "https://httpbin.org/post",
                "https://httpbin.org/status/404",
                "https://httpbin.org/delay/1",
            ]
            for url in test_urls:
                proxied = f"{self.proxy_url}/proxy?url={quote(url)}"
                if "post" in url:
                    requests.post(proxied, json={"test": "data"}, timeout=10)
                else:
                    requests.get(proxied, timeout=10)
            # Check metrics endpoint
            time.sleep(2)  # Give time for metrics collection
            response = requests.get(f"{self.proxy_url}/metrics", timeout=10)
            if response.status_code != 200:
                duration = time.time() - start_time
                return TestResult(
                    test_name=test_name,
                    success=False,
                    error=f"Metrics endpoint returned {response.status_code}",
                    duration=duration,
                )
            try:
                metrics_data = response.json()
            except ValueError:
                # ValueError is the common base of the stdlib and simplejson
                # decode errors, so this holds whichever backend requests uses.
                # Metrics might be in different format
                has_content = len(response.content) > 0
                duration = time.time() - start_time
                return TestResult(
                    test_name=test_name,
                    success=has_content,
                    details={
                        "status": response.status_code,
                        "content_length": len(response.content),
                        "note": "Metrics in non-JSON format",
                    },
                    duration=duration,
                )
            # Only a JSON object has named metrics; a list still counts as
            # data, and scalars count as no metrics at all.
            if isinstance(metrics_data, dict):
                metric_keys = [str(key) for key in metrics_data]
            else:
                metric_keys = []
            if isinstance(metrics_data, (dict, list)):
                metric_count = len(metrics_data)
            else:
                metric_count = 0
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name,
                success=metric_count > 0,
                details={
                    "status": response.status_code,
                    "metric_count": metric_count,
                    "metric_keys_sample": metric_keys[:10],
                    "has_request_metrics": any(
                        "request" in k.lower() for k in metric_keys
                    ),
                    "has_error_metrics": any(
                        "error" in k.lower() for k in metric_keys
                    ),
                },
                duration=duration,
            )
        except Exception as e:
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name, success=False, error=str(e), duration=duration
            )
class FingerprintTester:
    """Test fingerprint spoofing capabilities"""

    def __init__(self, proxy_url: str = "http://127.0.0.1:4201"):
        """Remember the base URL of the proxy under test."""
        self.proxy_url = proxy_url

    def test_fingerprint_spoofing(self) -> TestResult:
        """Inspect the headers the proxy sends upstream for a browser-like shape.

        The request targets httpbin's ``/headers`` echo endpoint through
        ``/proxy``. The test passes when the echoed ``User-Agent`` contains a
        Chrome/Safari/WebKit marker; other Chrome-typical headers are reported
        in ``details`` but do not affect the verdict.

        Returns:
            A ``fingerprint_spoofing`` TestResult; failed when the status is
            not 200, the body is not JSON, or the request raises.
        """
        test_name = "fingerprint_spoofing"
        start_time = time.time()
        try:
            test_url = "https://httpbin.org/headers"
            # Make request through proxy
            response = requests.get(
                f"{self.proxy_url}/proxy?url={quote(test_url)}", timeout=10
            )
            if response.status_code != 200:
                duration = time.time() - start_time
                return TestResult(
                    test_name=test_name,
                    success=False,
                    error=f"Request failed with status {response.status_code}",
                    duration=duration,
                )
            try:
                data = response.json()
            except ValueError:
                # ValueError covers both the stdlib and simplejson decode
                # errors that Response.json() may raise.
                duration = time.time() - start_time
                return TestResult(
                    test_name=test_name,
                    success=False,
                    error="Could not parse response as JSON",
                    duration=duration,
                )
            # Get the headers that were sent to target
            # Note: This assumes the proxy forwards our request info
            headers_sent = data.get("headers", {}) if isinstance(data, dict) else {}
            if not isinstance(headers_sent, dict):
                headers_sent = {}
            # Check for Chrome-like user agent
            user_agent = headers_sent.get("User-Agent", "")
            is_chrome_like = any(
                marker in user_agent
                for marker in ["Chrome/", "Safari/", "AppleWebKit/"]
            )
            # Check for other Chrome headers
            chrome_headers = {
                "Accept": "text/html,application/xhtml+xml,application/xml",
                "Accept-Encoding": "gzip, deflate, br",
                "Accept-Language": "en-US,en;q=0.9",
                "Connection": "keep-alive",
                "Upgrade-Insecure-Requests": "1",
            }
            # Headers absent from the echo are left out rather than marked False.
            matching_headers = {}
            for header, expected_value in chrome_headers.items():
                actual_value = headers_sent.get(header)
                if actual_value:
                    # Check if actual contains expected
                    matching_headers[header] = expected_value in actual_value
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name,
                success=is_chrome_like,
                details={
                    "user_agent": user_agent,
                    "is_chrome_like": is_chrome_like,
                    "headers_analyzed": len(matching_headers),
                    "matching_chrome_headers": matching_headers,
                    "note": "JA4 (TLS) fingerprint requires TLS handshake analysis",
                },
                duration=duration,
            )
        except Exception as e:
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name, success=False, error=str(e), duration=duration
            )
class ConfigurationTester:
    """Checks that relate to how the proxy is configured and deployed."""

    def __init__(self, proxy_url: str = "http://127.0.0.1:4201"):
        self.proxy_url = proxy_url

    def test_configuration_features(self) -> List[TestResult]:
        """Run the configuration checks and return one result per check.

        Two results are produced: ``configuration_basics`` (the basic
        endpoints all answer 200) and ``hot_reload``, which is always recorded
        as failed because it needs an external config change and reload signal.
        """
        outcomes = []
        started = time.time()
        try:
            # More of a deployment smoke test: the basic endpoints must respond.
            probe_paths = ["/health", "/metrics", "/stats"]
            # all() stops at the first non-200 reply, like an early break.
            everything_up = all(
                requests.get(f"{self.proxy_url}{path}", timeout=5).status_code == 200
                for path in probe_paths
            )
            elapsed = time.time() - started
            outcomes.append(
                TestResult(
                    test_name="configuration_basics",
                    success=everything_up,
                    details={
                        "endpoints_tested": probe_paths,
                        "all_respond": everything_up,
                    },
                    duration=elapsed,
                )
            )
        except Exception as exc:
            elapsed = time.time() - started
            outcomes.append(
                TestResult(
                    test_name="configuration_basics",
                    success=False,
                    error=str(exc),
                    duration=elapsed,
                )
            )
        # Hot reload would need access to the config file plus a way to send
        # SIGHUP or hit an admin endpoint, so it is environment-dependent.
        hot_reload_placeholder = TestResult(
            test_name="hot_reload",
            success=False,
            details={
                "note": "Requires external config modification and reload signal"
            },
            error="Environment-dependent test",
        )
        outcomes.append(hot_reload_placeholder)
        return outcomes
class ConcurrentTester:
    """Test concurrent request handling capabilities"""

    def __init__(self, proxy_url: str = "http://127.0.0.1:4201"):
        """Remember the base URL of the proxy under test."""
        self.proxy_url = proxy_url

    def test_concurrent_requests(self) -> TestResult:
        """Send 50 proxied GETs through a 20-worker thread pool.

        Returns:
            A ``concurrent_requests`` TestResult that passes when at least 40
            of the 50 requests (80%) return HTTP 200. ``avg_response_time`` is
            the mean latency of the requests that received a response.
        """
        test_name = "concurrent_requests"
        start_time = time.time()
        try:

            def make_request(request_id: int):
                """Issue one proxied GET and describe its outcome as a dict."""
                sent_at = time.time()
                try:
                    response = requests.get(
                        f"{self.proxy_url}/proxy?url=https://httpbin.org/get",
                        timeout=10,
                    )
                    return {
                        "request_id": request_id,
                        "status": response.status_code,
                        # Recorded so the average below is computed from real
                        # timings instead of a missing key defaulting to 0.
                        "response_time": time.time() - sent_at,
                        "success": response.status_code == 200,
                    }
                except Exception as e:
                    return {
                        "request_id": request_id,
                        "error": str(e),
                        "success": False,
                    }

            # Execute 50 concurrent requests
            with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
                futures = [executor.submit(make_request, i) for i in range(50)]
                results = [
                    future.result()
                    for future in concurrent.futures.as_completed(futures)
                ]
            # Analyze results
            success_count = sum(1 for r in results if r.get("success"))
            failure_count = len(results) - success_count
            # Requests that raised have no timing and are left out of the mean.
            timings = [r["response_time"] for r in results if "response_time" in r]
            avg_response_time = statistics.mean(timings) if timings else 0
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name,
                success=success_count >= 40,  # At least 80% success
                details={
                    "total_requests": len(results),
                    "success_count": success_count,
                    "failure_count": failure_count,
                    "success_rate": (success_count / len(results) * 100)
                    if results
                    else 0,
                    "avg_response_time": avg_response_time,
                },
                duration=duration,
            )
        except Exception as e:
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name, success=False, error=str(e), duration=duration
            )
class BurstTester:
    """Fires a rapid sequence of requests to see how the proxy copes with bursts."""

    def __init__(self, proxy_url: str = "http://127.0.0.1:4201"):
        self.proxy_url = proxy_url

    def test_burst_requests(self) -> TestResult:
        """Send 100 back-to-back proxied GETs and require at least 80 successes.

        Returns:
            A ``burst_requests`` TestResult with counts, success rate, total
            time and throughput, or a failed result carrying the exception text.
        """
        test_name = "burst_requests"
        start_time = time.time()

        def fire(request_id: int) -> Dict[str, Any]:
            """Issue one proxied GET and describe its outcome as a dict."""
            sent_at = time.time()
            try:
                reply = requests.get(
                    f"{self.proxy_url}/proxy?url=https://httpbin.org/get",
                    timeout=5,
                )
                elapsed = time.time() - sent_at
                return {
                    "request_id": request_id,
                    "status": reply.status_code,
                    "response_time": elapsed,
                    "success": reply.status_code == 200,
                }
            except Exception as exc:
                return {
                    "request_id": request_id,
                    "error": str(exc),
                    "response_time": 0,
                    "success": False,
                }

        try:
            # 100 rapid requests in quick succession, one after another.
            outcomes = [fire(request_id) for request_id in range(100)]
            succeeded = len([o for o in outcomes if o.get("success")])
            failed = len(outcomes) - succeeded
            total_time = time.time() - start_time
            if total_time > 0:
                requests_per_second = len(outcomes) / total_time
            else:
                requests_per_second = 0
            success_rate = (succeeded / len(outcomes) * 100) if outcomes else 0
            return TestResult(
                test_name=test_name,
                success=succeeded >= 80,  # At least 80% success
                details={
                    "total_requests": len(outcomes),
                    "success_count": succeeded,
                    "failure_count": failed,
                    "success_rate": success_rate,
                    "total_time": total_time,
                    "requests_per_second": requests_per_second,
                },
                duration=time.time() - start_time,
            )
        except Exception as exc:
            return TestResult(
                test_name=test_name,
                success=False,
                error=str(exc),
                duration=time.time() - start_time,
            )
class MemoryTester:
    """Pushes progressively larger payloads through the proxy."""

    def __init__(self, proxy_url: str = "http://127.0.0.1:4201"):
        self.proxy_url = proxy_url

    def test_memory_usage(self) -> TestResult:
        """POST JSON payloads of growing size and require every one to succeed.

        Returns:
            A ``memory_usage`` TestResult listing the per-size outcome, or a
            failed result carrying the exception text.
        """
        test_name = "memory_usage"
        start_time = time.time()

        def post_payload(size: int) -> Dict[str, Any]:
            """Send one payload of ``size`` characters and describe the outcome."""
            try:
                reply = requests.post(
                    f"{self.proxy_url}/proxy?url=https://httpbin.org/post",
                    json={"data": "x" * size},
                    timeout=10,
                )
                return {
                    "size": size,
                    "status": reply.status_code,
                    "success": reply.status_code == 200,
                }
            except Exception as exc:
                return {
                    "size": size,
                    "error": str(exc),
                    "success": False,
                }

        try:
            payload_sizes = [1024, 10240, 102400, 1024000]  # 1KB, 10KB, 100KB, 1MB
            memory_results = [post_payload(size) for size in payload_sizes]
            succeeded = len([r for r in memory_results if r.get("success")])
            failed = len(memory_results) - succeeded
            return TestResult(
                test_name=test_name,
                success=succeeded == len(memory_results),  # All should succeed
                details={
                    "payload_sizes_tested": payload_sizes,
                    "success_count": succeeded,
                    "failure_count": failed,
                    "memory_results": memory_results,
                },
                duration=time.time() - start_time,
            )
        except Exception as exc:
            return TestResult(
                test_name=test_name,
                success=False,
                error=str(exc),
                duration=time.time() - start_time,
            )
class StressTester:
    """Sustained-load checks against the proxy."""

    def __init__(self, proxy_url: str = "http://127.0.0.1:4201"):
        self.proxy_url = proxy_url

    def test_stress_endurance(self) -> TestResult:
        """Issue proxied GETs for 60 seconds and require a 70% success rate.

        A 0.1 second pause follows every request. A request that raises is
        counted as a failure rather than aborting the run.

        Returns:
            A ``stress_endurance`` TestResult with counts, success rate, total
            time and throughput, or a failed result carrying the exception text.
        """
        test_name = "stress_endurance"
        start_time = time.time()
        try:
            deadline = start_time + 60  # sustained load window, in seconds
            sent = 0
            succeeded = 0
            while time.time() < deadline:
                try:
                    reply = requests.get(
                        f"{self.proxy_url}/proxy?url=https://httpbin.org/get",
                        timeout=5,
                    )
                    got_ok = reply.status_code == 200
                except Exception:
                    got_ok = False
                sent += 1
                if got_ok:
                    succeeded += 1
                # Small delay to simulate real usage
                time.sleep(0.1)
            failed = sent - succeeded
            total_time = time.time() - start_time
            requests_per_second = sent / total_time if total_time > 0 else 0
            success_rate = (succeeded / sent * 100) if sent > 0 else 0
            return TestResult(
                test_name=test_name,
                success=success_rate >= 70,  # At least 70% success rate
                details={
                    "total_requests": sent,
                    "success_count": succeeded,
                    "failure_count": failed,
                    "success_rate": success_rate,
                    "total_time": total_time,
                    "requests_per_second": requests_per_second,
                },
                duration=time.time() - start_time,
            )
        except Exception as exc:
            return TestResult(
                test_name=test_name,
                success=False,
                error=str(exc),
                duration=time.time() - start_time,
            )

    def test_configuration_features(self) -> List[TestResult]:
        """Run the configuration checks and return one result per check.

        Produces ``configuration_basics`` (the basic endpoints all answer 200)
        and ``hot_reload``, which is always recorded as failed because it
        needs an external config change and reload signal.
        """
        outcomes = []
        started = time.time()
        try:
            # More of a deployment smoke test: the basic endpoints must respond.
            probe_paths = ["/health", "/metrics", "/stats"]
            # all() stops at the first non-200 reply, like an early break.
            everything_up = all(
                requests.get(f"{self.proxy_url}{path}", timeout=5).status_code == 200
                for path in probe_paths
            )
            elapsed = time.time() - started
            outcomes.append(
                TestResult(
                    test_name="configuration_basics",
                    success=everything_up,
                    details={
                        "endpoints_tested": probe_paths,
                        "all_respond": everything_up,
                    },
                    duration=elapsed,
                )
            )
        except Exception as exc:
            elapsed = time.time() - started
            outcomes.append(
                TestResult(
                    test_name="configuration_basics",
                    success=False,
                    error=str(exc),
                    duration=elapsed,
                )
            )
        # Hot reload would need access to the config file plus a way to send
        # SIGHUP or hit an admin endpoint, so it is environment-dependent.
        outcomes.append(
            TestResult(
                test_name="hot_reload",
                success=False,
                details={
                    "note": "Requires external config modification and reload signal"
                },
                error="Environment-dependent test",
            )
        )
        return outcomes
class ProxifyComprehensiveTester:
    """Comprehensive Proxify tester covering all features"""

    # (category, substrings) pairs checked in order; the first rule with a
    # substring found in the lower-cased test name decides the category.
    _CATEGORY_RULES = (
        ("Multi-Protocol", ("websocket", "tcp", "udp", "protocol")),
        (
            "Load Balancing",
            (
                "load_balancing",
                "session_stickiness",
                "round_robin",
                "least_connections",
            ),
        ),
        (
            "Security",
            (
                "authentication",
                "security",
                "cross_origin",
                "ip_filtering",
                "validation",
                "size_limits",
            ),
        ),
        ("Middleware", ("circuit", "retry", "compression", "middleware")),
        ("Monitoring", ("metrics", "monitoring", "health", "stats", "logs")),
        ("Fingerprint", ("fingerprint",)),
        ("Configuration", ("configuration", "hot_reload")),
    )
    # Bucket for results whose name matches none of the rules above.
    _FALLBACK_CATEGORY = "Other"

    def __init__(self, base_url: str = "http://127.0.0.1:4201"):
        """Create the shared session and one tester per feature area."""
        self.base_url = base_url
        self.test_session = TestSession(base_url)
        # Initialize all testers
        self.multi_protocol_tester = MultiProtocolTester()
        self.load_balancing_tester = LoadBalancingTester(base_url)
        self.security_tester = AdvancedSecurityTester(base_url)
        self.middleware_tester = MiddlewareTester(base_url)
        self.monitoring_tester = MonitoringTester(base_url)
        self.fingerprint_tester = FingerprintTester(base_url)
        self.config_tester = ConfigurationTester(base_url)
        # Add comprehensive test types
        self.concurrent_tester = ConcurrentTester(base_url)
        self.burst_tester = BurstTester(base_url)
        self.memory_tester = MemoryTester(base_url)
        self.stress_tester = StressTester(base_url)

    async def run_comprehensive_test(self) -> Dict[str, Any]:
        """Run comprehensive test covering all Proxify features.

        Runs the seven feature groups in order, recording every result on the
        shared session, then builds, saves and returns the report.
        """
        with self.test_session as session:
            logger.info("=" * 80)
            logger.info("PROXIFY COMPREHENSIVE TEST SUITE")
            logger.info("=" * 80)

            # 1. Multi-Protocol Testing
            logger.info("\n1. Testing Multi-Protocol Support...")
            session.record_result(
                await self.multi_protocol_tester.test_websocket_protocol()
            )
            session.record_result(self.multi_protocol_tester.test_tcp_stream_protocol())
            session.record_result(
                self.multi_protocol_tester.test_udp_datagram_protocol()
            )

            # 2. Load Balancing Testing
            logger.info("\n2. Testing Load Balancing...")
            algorithms = [
                LoadBalanceAlgorithm.ROUND_ROBIN,
                LoadBalanceAlgorithm.LEAST_CONNECTIONS,
                LoadBalanceAlgorithm.SOURCE_IP,
                LoadBalanceAlgorithm.URI_HASH,
                LoadBalanceAlgorithm.HEADER_HASH,
            ]
            for algo in algorithms:
                result = self.load_balancing_tester.test_load_balancing_algorithm(algo)
                session.record_result(result)
            session.record_result(self.load_balancing_tester.test_session_stickiness())

            # 3. Advanced Security Testing
            logger.info("\n3. Testing Advanced Security Features...")
            # Authentication
            auth_results = self.security_tester.test_authentication_methods()
            for result in auth_results:
                session.record_result(result)
            # Security headers and policies
            session.record_result(self.security_tester.test_cross_origin_policies())
            session.record_result(self.security_tester.test_ip_filtering())
            session.record_result(self.security_tester.test_request_validation())
            session.record_result(self.security_tester.test_size_limits())

            # 4. Middleware Testing
            logger.info("\n4. Testing Middleware...")
            session.record_result(self.middleware_tester.test_circuit_breaker())
            session.record_result(self.middleware_tester.test_retry_logic())
            session.record_result(self.middleware_tester.test_compression_middleware())

            # 5. Monitoring & Metrics
            logger.info("\n5. Testing Monitoring & Metrics...")
            monitoring_results = self.monitoring_tester.test_all_monitoring_endpoints()
            for result in monitoring_results:
                session.record_result(result)
            session.record_result(self.monitoring_tester.test_metrics_collection())

            # 6. Fingerprint Spoofing
            logger.info("\n6. Testing Fingerprint Spoofing...")
            session.record_result(self.fingerprint_tester.test_fingerprint_spoofing())

            # 7. Configuration Features
            logger.info("\n7. Testing Configuration Features...")
            config_results = self.config_tester.test_configuration_features()
            for result in config_results:
                session.record_result(result)

        # Generate comprehensive report
        # NOTE(review): built after the session context exits; confirm this
        # matches how TestSession finalises its summary.
        return self.generate_report()

    @classmethod
    def _category_for(cls, test_name: str) -> str:
        """Return the report category for ``test_name`` (fallback: "Other")."""
        lowered = test_name.lower()
        for category, keywords in cls._CATEGORY_RULES:
            if any(keyword in lowered for keyword in keywords):
                return category
        return cls._FALLBACK_CATEGORY

    def generate_report(self) -> Dict[str, Any]:
        """Generate comprehensive test report.

        Groups the session's results by category, computes per-category pass
        rates, saves the report through ``_save_report`` and returns it.
        Results that match no known category are grouped under "Other".
        """
        summary = self.test_session.get_summary()

        # Categorize results; the fallback bucket is created up front so an
        # unmatched test name cannot raise KeyError.
        categories: Dict[str, List[Any]] = {
            name: [] for name, _ in self._CATEGORY_RULES
        }
        categories[self._FALLBACK_CATEGORY] = []
        for result in self.test_session.results:
            categories[self._category_for(result.test_name)].append(result)

        # Calculate category success rates (empty categories are omitted)
        category_stats = {}
        for category, results in categories.items():
            if not results:
                continue
            total = len(results)
            passed = sum(1 for r in results if r.success)
            category_stats[category] = {
                "total": total,
                "passed": passed,
                "success_rate": passed / total * 100,
                "tests": [r.test_name for r in results],
            }

        # Features covered (from Proxify documentation)
        features_covered = [
            "Multi-protocol proxy (HTTP/HTTPS, WebSocket, gRPC, TCP, UDP)",
            "Load balancing algorithms (Round Robin, Least Connections, Source IP, URI hash, Header hash)",
            "Session stickiness with cookies",
            "Health checks and automatic failover",
            "Basic and client certificate authentication",
            "Cross-origin security policies (CORP, COEP, COOP)",
            "IP filtering (whitelist/blacklist)",
            "Request validation and SSRF protection",
            "Rate limiting per client IP",
            "Request/response size limits",
            "Circuit breaker middleware",
            "Retry middleware with exponential backoff",
            "Compression middleware (GZIP)",
            "Comprehensive metrics collection",
            "HAProxy-like stats endpoint",
            "Fingerprint spoofing (JA4/JA4H)",
            "Configuration hot reload",
            "Structured logging (JSON/text)",
        ]
        report = {
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "proxify_version": "Production-Ready",
            "test_session": summary,
            "category_stats": category_stats,
            "features_covered": features_covered,
            "recommendations": self._generate_recommendations(category_stats),
            "detailed_results": [r.__dict__ for r in self.test_session.results],
        }
        # Save report
        self._save_report(report)
        return report

    def _generate_recommendations(self, category_stats: Dict) -> List[str]:
        """Generate one recommendation line per category from its success rate.

        Bands: below 50% critical, below 80% needs improvement, below 100%
        good, exactly 100% excellent.
        """
        recommendations = []
        for category, stats in category_stats.items():
            success_rate = stats["success_rate"]
            if success_rate < 50:
                recommendations.append(
                    f"[WARNING] {category}: Critical issues detected ({success_rate:.1f}% success)"
                )
            elif success_rate < 80:
                recommendations.append(
                    f"[WARNING] {category}: Needs improvement ({success_rate:.1f}% success)"
                )
            elif success_rate < 100:
                recommendations.append(
                    f"[PASS] {category}: Good ({success_rate:.1f}% success)"
                )
            else:
                recommendations.append(
                    f"[EXCELLENT] {category}: Excellent (100% success)"
                )
        return recommendations

    def _save_report(self, report: Dict[str, Any]):
        """Write the report as timestamped JSON and markdown files in the CWD."""
        timestamp = time.strftime("%Y%m%d_%H%M%S")
        filename = f"proxify_comprehensive_report_{timestamp}.json"
        # Convert report keys to strings for JSON serialization
        report_serializable = convert_keys_to_str(report)
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(report_serializable, f, indent=2, default=str)
        # Also generate a markdown summary
        md_filename = f"proxify_comprehensive_report_{timestamp}.md"
        self._generate_markdown_report(report, md_filename)
        logger.info(f"Detailed report saved to: {filename}")
        logger.info(f"Markdown summary saved to: {md_filename}")

    def _generate_markdown_report(self, report: Dict[str, Any], filename: str):
        """Write a markdown rendering of ``report`` to ``filename``."""
        with open(filename, "w", encoding="utf-8") as f:
            f.write("# Proxify Comprehensive Test Report\n\n")
            f.write(f"**Test Date:** {report['timestamp']}\n")
            f.write(f"**Proxify Version:** {report['proxify_version']}\n\n")

            # Summary
            session = report["test_session"]
            f.write("## Executive Summary\n\n")
            f.write(f"- **Total Tests:** {session['total_tests']}\n")
            f.write(f"- **Tests Passed:** {session['passed']}\n")
            f.write(f"- **Tests Failed:** {session['failed']}\n")
            f.write(f"- **Success Rate:** {session['success_rate']:.1f}%\n")
            f.write(f"- **Test Duration:** {session['duration']:.2f}s\n\n")

            # Category Breakdown
            f.write("## Category Breakdown\n\n")
            f.write("| Category | Tests | Passed | Success Rate |\n")
            f.write("|----------|-------|--------|--------------|\n")
            for category, stats in report["category_stats"].items():
                f.write(
                    f"| {category} | {stats['total']} | {stats['passed']} | {stats['success_rate']:.1f}% |\n"
                )
            f.write("\n")

            # Features Covered
            f.write("## Features Tested\n\n")
            for feature in report["features_covered"]:
                f.write(f"- {feature}\n")
            f.write("\n")

            # Recommendations
            f.write("## Recommendations\n\n")
            for rec in report["recommendations"]:
                f.write(f"- {rec}\n")
            f.write("\n")

            # Detailed Results
            f.write("## Detailed Test Results\n\n")
            f.write("| Test Name | Status | Duration | Details |\n")
            f.write("|-----------|--------|----------|---------|\n")
            for result in report["detailed_results"]:
                status = "[PASS]" if result["success"] else "[FAIL]"
                duration = f"{result['duration']:.2f}s"
                # Truncate details for table
                details = str(result.get("details", {}))
                if len(details) > 100:
                    details = details[:100] + "..."
                # A literal pipe would otherwise end the table cell early.
                details = details.replace("|", "\\|")
                f.write(
                    f"| {result['test_name']} | {status} | {duration} | {details} |\n"
                )
async def main():
    """Parse the command line and run the selected Proxify test group.

    Exactly one mode runs, chosen in this priority order: ``--all``,
    ``--security``, ``--load-balancing``, ``--multi-protocol``,
    ``--monitoring``, ``--middleware``, ``--quick``. With no mode flag the
    help text is printed.

    Exit status: ``--all`` exits 0 at a success rate of 80% or more, 1 at 50%
    or more, 2 otherwise. Ctrl-C or an unexpected error exits 1. The other
    modes print a JSON summary and return normally.
    """
    parser = argparse.ArgumentParser(
        description="Proxify Comprehensive Test Suite v3.0",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
python proxify_test.py --all # Run all comprehensive tests
python proxify_test.py --quick # Run quick functionality tests
python proxify_test.py --security # Run security tests only
python proxify_test.py --load-balancing # Run load balancing tests only
python proxify_test.py --multi-protocol # Test multi-protocol support
python proxify_test.py --url http://localhost:8080 # Test custom Proxify instance
""",
    )
    parser.add_argument(
        "--url", default="http://127.0.0.1:4201", help="Base URL of the Proxify server"
    )
    parser.add_argument(
        "--all", action="store_true", help="Run comprehensive test suite (all features)"
    )
    parser.add_argument(
        "--quick", action="store_true", help="Run quick functionality tests"
    )
    parser.add_argument(
        "--security", action="store_true", help="Run security tests only"
    )
    parser.add_argument(
        "--load-balancing", action="store_true", help="Run load balancing tests only"
    )
    parser.add_argument(
        "--multi-protocol", action="store_true", help="Test multi-protocol support"
    )
    parser.add_argument(
        "--monitoring", action="store_true", help="Test monitoring endpoints"
    )
    parser.add_argument(
        "--middleware", action="store_true", help="Test middleware functionality"
    )
    # NOTE: --output is accepted but never read in this function.
    parser.add_argument(
        "--output", default="comprehensive", help="Output report filename prefix"
    )
    args = parser.parse_args()

    def print_summary(session):
        """Dump the session summary as indented JSON on stdout."""
        print(json.dumps(session.get_summary(), indent=2))

    def probe(session, test_name, url):
        """GET ``url`` and record a timed pass/fail result on ``session``."""
        started = time.time()
        try:
            response = requests.get(url, timeout=10)
            session.record_result(
                TestResult(
                    test_name=test_name,
                    success=response.status_code == 200,
                    details={"status": response.status_code},
                    # Measured, so the summary reflects real request latency.
                    duration=time.time() - started,
                )
            )
        except Exception as e:
            session.record_result(
                TestResult(
                    test_name=test_name,
                    success=False,
                    error=str(e),
                    duration=time.time() - started,
                )
            )

    try:
        tester = ProxifyComprehensiveTester(base_url=args.url)
        if args.all:
            logger.info("Starting comprehensive Proxify test suite...")
            report = await tester.run_comprehensive_test()
            # Print summary
            print("\n" + "=" * 80)
            print("PROXIFY TEST COMPLETE")
            print("=" * 80)
            session = report["test_session"]
            print(f"\nTotal Tests: {session['total_tests']}")
            print(f"Passed: {session['passed']}")
            print(f"Failed: {session['failed']}")
            print(f"Success Rate: {session['success_rate']:.1f}%")
            print(f"Duration: {session['duration']:.2f}s")
            print("\nRecommendations:")
            for rec in report["recommendations"]:
                print(f" {rec}")
            print("\nDetailed report saved to file.")
            # Exit code based on success
            if session["success_rate"] >= 80:
                sys.exit(0)
            elif session["success_rate"] >= 50:
                sys.exit(1)
            else:
                sys.exit(2)
        elif args.security:
            logger.info("Running security tests...")
            # Run security tests
            with TestSession(args.url) as session:
                security_tester = AdvancedSecurityTester(args.url)
                auth_results = security_tester.test_authentication_methods()
                for result in auth_results:
                    session.record_result(result)
                session.record_result(security_tester.test_cross_origin_policies())
                session.record_result(security_tester.test_ip_filtering())
                session.record_result(security_tester.test_request_validation())
                session.record_result(security_tester.test_size_limits())
            print_summary(session)
        elif args.load_balancing:
            logger.info("Running load balancing tests...")
            with TestSession(args.url) as session:
                lb_tester = LoadBalancingTester(args.url)
                algorithms = [
                    LoadBalanceAlgorithm.ROUND_ROBIN,
                    LoadBalanceAlgorithm.LEAST_CONNECTIONS,
                    LoadBalanceAlgorithm.SOURCE_IP,
                ]
                for algo in algorithms:
                    result = lb_tester.test_load_balancing_algorithm(algo)
                    session.record_result(result)
                session.record_result(lb_tester.test_session_stickiness())
            print_summary(session)
        elif args.multi_protocol:
            logger.info("Running multi-protocol tests...")
            with TestSession(args.url) as session:
                mp_tester = MultiProtocolTester()
                # Run async test
                result = await mp_tester.test_websocket_protocol()
                session.record_result(result)
                # Run sync tests
                session.record_result(mp_tester.test_tcp_stream_protocol())
                session.record_result(mp_tester.test_udp_datagram_protocol())
            print_summary(session)
        elif args.monitoring:
            logger.info("Running monitoring tests...")
            with TestSession(args.url) as session:
                monitoring_tester = MonitoringTester(args.url)
                results = monitoring_tester.test_all_monitoring_endpoints()
                for result in results:
                    session.record_result(result)
                session.record_result(monitoring_tester.test_metrics_collection())
            print_summary(session)
        elif args.middleware:
            logger.info("Running middleware tests...")
            with TestSession(args.url) as session:
                middleware_tester = MiddlewareTester(args.url)
                session.record_result(middleware_tester.test_circuit_breaker())
                session.record_result(middleware_tester.test_retry_logic())
                session.record_result(middleware_tester.test_compression_middleware())
            print_summary(session)
        elif args.quick:
            logger.info("Running quick test...")
            # Basic functionality test
            with TestSession(args.url) as session:
                # Test basic endpoints
                probe(session, "health_endpoint", f"{args.url}/health")
                # Test basic proxy
                probe(
                    session,
                    "basic_proxy",
                    f"{args.url}/proxy?url=https://httpbin.org/get",
                )
            print_summary(session)
        else:
            # Default: show help
            parser.print_help()
    except KeyboardInterrupt:
        logger.info("Testing interrupted by user")
        sys.exit(1)
    except Exception as e:
        logger.error(f"Testing failed with error: {e}")
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.