"""Reverse Proxy Test Script: a comprehensive test suite for the Proxify reverse proxy."""
import argparse
import asyncio
import concurrent.futures
import gzip
import io
# import hashlib
import json
import logging
# import random
import socket
import statistics
# import string
import sys
import time
import traceback
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional
from urllib.parse import quote

# import grpc
import requests
import websockets

# import yaml
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("proxify_test.log", encoding="utf-8"),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)

ADDR = "0.0.0.0"  # Address used for WebSocket testing

# External Testing Service Endpoints
MOCKER_API_BASE = "https://api.mockerapi.com"
HTTPBUN_BASE = "https://httpbun.com"
HTTPBIN_BASE = (
    "https://httpbingo.org"  # Using httpbingo as a stable httpbin alternative
)
WEBHOOK_SITE_BASE = "https://webhook.site/26880a76-4638-4b21-89d8-8185222b7489"

# Force UTF-8 output on Windows consoles, whose default encoding may be a
# legacy code page (the comparison is case-insensitive: "UTF-8" vs "utf-8")
if (sys.stdout.encoding or "").lower() != "utf-8":
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
if (sys.stderr.encoding or "").lower() != "utf-8":
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace")
def convert_keys_to_str(obj):
    """Recursively convert dictionary keys that are not basic JSON types to strings."""
    if isinstance(obj, dict):
        new_dict = {}
        for k, v in obj.items():
            # Convert the key to a string if it's not a basic JSON key type
            # (str, int, float, bool, None)
            new_key = (
                str(k) if not isinstance(k, (str, int, float, bool, type(None))) else k
            )
            # Recursively process the value
            new_dict[new_key] = convert_keys_to_str(v)
        return new_dict
    elif isinstance(obj, list):
        # If the object is a list, recursively process its items
        return [convert_keys_to_str(item) for item in obj]
    else:
        # Return the object itself if it's not a dict or list
        return obj
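
# Example: the tuple keys produced by _test_header_hash below become strings,
# so the final summary can be json-serialized:
#   convert_keys_to_str({("X-User-ID", "user_123"): "backend-1"})
#   -> {"('X-User-ID', 'user_123')": "backend-1"}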

class Protocol(Enum):
    """Supported protocols"""

    HTTP = "http"
    HTTPS = "https"
    WS = "ws"
    WSS = "wss"
    WEBSOCKET = "ws"  # Alias for WS
    GRPC = "grpc"
    TCP = "tcp"
    UDP = "udp"


class LoadBalanceAlgorithm(Enum):
    """Load balancing algorithms"""

    ROUND_ROBIN = "round-robin"
    LEAST_CONNECTIONS = "leastconn"
    SOURCE_IP = "source"
    URI_HASH = "uri"
    HEADER_HASH = "hdr"


@dataclass
class TestResult:
    """Test result container"""

    test_name: str
    success: bool
    details: Dict[str, Any] = field(default_factory=dict)
    error: Optional[str] = None
    duration: float = 0.0

class TestSession:
    """Test session with context management"""

    def __init__(self, base_url: str = "http://127.0.0.1:4201"):
        self.base_url = base_url
        self.session_id = str(uuid.uuid4())[:8]
        self.session = requests.Session()
        self.start_time = time.time()
        self.results: List[TestResult] = []

    def __enter__(self):
        logger.info(f"Starting test session {self.session_id}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        duration = time.time() - self.start_time
        logger.info(f"Test session {self.session_id} completed in {duration:.2f}s")
        self.session.close()

    def record_result(self, result: TestResult):
        """Record test result"""
        self.results.append(result)
        if result.success:
            logger.info(f"[PASS] {result.test_name} passed ({result.duration:.2f}s)")
        else:
            logger.error(f"[FAIL] {result.test_name} failed: {result.error}")

    def get_summary(self) -> Dict[str, Any]:
        """Get test session summary"""
        total = len(self.results)
        passed = sum(1 for r in self.results if r.success)
        failed = total - passed
        return {
            "session_id": self.session_id,
            "total_tests": total,
            "passed": passed,
            "failed": failed,
            "success_rate": (passed / total * 100) if total > 0 else 0,
            "duration": time.time() - self.start_time,
            "results": [r.__dict__ for r in self.results],
        }
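
# A minimal sketch (not part of the original suite) for persisting a session
# summary to disk: convert_keys_to_str handles the tuple keys produced by the
# header-hash test, and default=str covers any remaining non-JSON values.
def save_summary(session: TestSession, path: str = "proxify_test_summary.json") -> None:
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(convert_keys_to_str(session.get_summary()), fh, indent=2, default=str)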

class MultiProtocolTester:
    """Enhanced multi-protocol testing"""

    def __init__(self, base_host: str = "127.0.0.1"):
        self.base_host = base_host
        # Protocol.WEBSOCKET is an Enum alias of Protocol.WS (same value),
        # so a single dict entry covers both names
        self.ports = {
            Protocol.HTTP: 8080,
            Protocol.HTTPS: 443,  # Assuming HTTPS on 443
            Protocol.WS: 8082,
            Protocol.GRPC: 8083,
            Protocol.TCP: 8080,
            Protocol.UDP: 8081,
        }
    async def test_websocket_protocol(self) -> TestResult:
        """Test WebSocket protocol with full duplex communication"""
        test_name = "websocket_protocol"
        start_time = time.time()
        try:
            # Test WebSocket connection and bidirectional communication
            websocket_url = f"ws://{self.base_host}:{self.ports[Protocol.WEBSOCKET]}/ws"
            async with websockets.connect(websocket_url) as websocket:
                # Test sending and receiving messages
                test_messages = [
                    {"type": "echo", "data": "Hello WebSocket"},
                    {"type": "ping", "data": str(time.time())},
                    {"type": "large", "data": "x" * 4096},  # Test large message
                ]
                for msg in test_messages:
                    await websocket.send(json.dumps(msg))
                    response = await asyncio.wait_for(websocket.recv(), timeout=5.0)
                    # Verify response
                    try:
                        resp_data = json.loads(response)
                        if resp_data.get("type") != msg["type"]:
                            raise ValueError(
                                f"Response type mismatch: {resp_data.get('type')}"
                            )
                    except json.JSONDecodeError:
                        # Accept non-JSON responses for echo
                        if msg["type"] == "echo" and response == msg["data"]:
                            continue
                        raise
                # Test binary data
                binary_data = b"\x00\x01\x02\x03\x04\x05"
                await websocket.send(binary_data)
                binary_response = await asyncio.wait_for(websocket.recv(), timeout=5.0)
                if binary_response != binary_data:
                    raise ValueError("Binary data mismatch")
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name,
                success=True,
                details={
                    "protocol": "websocket",
                    "messages_exchanged": len(test_messages) + 1,
                },
                duration=duration,
            )
        except Exception as e:
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name, success=False, error=str(e), duration=duration
            )
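
    # Hedged local fixture (an assumption, not part of Proxify): a minimal echo
    # backend that satisfies test_websocket_protocol above by echoing text and
    # binary frames verbatim. Assumes the websockets package >= 10, where a
    # single-argument handler is accepted. Run it only when no real WS backend
    # is available.
    async def run_local_ws_echo_server(self, port: int = 8082):
        async def echo(ws):
            async for message in ws:
                await ws.send(message)

        async with websockets.serve(echo, self.base_host, port):
            await asyncio.Future()  # serve until cancelled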
    def test_tcp_stream_protocol(self) -> TestResult:
        """Test TCP stream protocol with various data patterns"""
        test_name = "tcp_stream_protocol"
        start_time = time.time()
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(10)
            sock.connect((self.base_host, self.ports[Protocol.TCP]))
            # Test small message (sendall guarantees the whole buffer is written,
            # unlike send, which may write only part of it)
            small_msg = b"GET /test HTTP/1.1\r\nHost: localhost\r\n\r\n"
            sock.sendall(small_msg)
            response = sock.recv(4096)  # Read response for small_msg
            if not response or b"200 OK" not in response:
                raise ValueError(
                    f"Small message test failed: {response.decode() if response else 'No response'}"
                )
            # Test large message (should be streamed)
            large_msg = b"X" * 65536  # 64KB
            sock.sendall(b"POST /upload HTTP/1.1\r\n")
            sock.sendall(b"Host: localhost\r\n")
            sock.sendall(b"Content-Length: 65536\r\n\r\n")
            # Send in chunks
            chunk_size = 8192
            for i in range(0, len(large_msg), chunk_size):
                sock.sendall(large_msg[i : i + chunk_size])
            # Get response for large message
            full_response = b""
            while True:
                try:
                    chunk = sock.recv(4096)
                    if not chunk:
                        break
                    full_response += chunk
                    # A more robust HTTP response parser might be needed here.
                    # For now, a simple check for a double CRLF plus either a
                    # Content-Length header or end of stream
                    if b"\r\n\r\n" in full_response and (
                        b"Content-Length" in full_response or not chunk
                    ):
                        break
                except socket.timeout:
                    break
            if b"200 OK" not in full_response:
                raise ValueError(
                    f"Large message upload test failed: {full_response.decode() if full_response else 'No response'}"
                )
            # Test persistent connection
            for i in range(3):
                sock.sendall(b"GET /ping HTTP/1.1\r\nHost: localhost\r\n\r\n")
                response = sock.recv(4096)
                if not response or b"200 OK" not in response:
                    raise ValueError(
                        f"Persistent connection test {i + 1} failed: {response.decode() if response else 'No response'}"
                    )
            sock.close()
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name,
                success=True,
                details={
                    "protocol": "tcp",
                    "messages_sent": 5,
                    "connection_persistent": True,
                },
                duration=duration,
            )
        except Exception as e:
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name, success=False, error=str(e), duration=duration
            )
    def test_udp_datagram_protocol(self) -> TestResult:
        """Test UDP datagram protocol"""
        test_name = "udp_datagram_protocol"
        start_time = time.time()
        success = False
        error_message = ""
        datagrams_sent = 0
        responses_received = 0
        all_responses_matched = True
        sock = None  # Initialize sock to None for the finally block
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            sock.settimeout(5)  # Set a timeout for recvfrom
            # Test small datagram
            test_data_small = b"test datagram small"
            sock.sendto(test_data_small, (self.base_host, self.ports[Protocol.UDP]))
            datagrams_sent += 1
            try:
                response, _ = sock.recvfrom(1024)
                responses_received += 1
                if response != test_data_small:
                    all_responses_matched = False
                    error_message = f"Small datagram response mismatch: Expected {test_data_small}, Got {response}"
                    raise ValueError(error_message)
            except socket.timeout as e:
                error_message = "No response received for small datagram."
                raise TimeoutError(error_message) from e
            # Test multiple datagrams and verify responses
            sent_datagrams = []
            received_datagrams = []
            for i in range(5):
                data = f"datagram_{i}_{time.time()}".encode()
                sent_datagrams.append(data)
                sock.sendto(data, (self.base_host, self.ports[Protocol.UDP]))
                datagrams_sent += 1
            # Give the server a moment to respond to all datagrams
            time.sleep(1)
            # Attempt to receive all responses
            for _ in range(len(sent_datagrams)):  # Try to receive as many as sent
                try:
                    response, _ = sock.recvfrom(1024)
                    received_datagrams.append(response)
                    responses_received += 1
                except socket.timeout:
                    # No more responses or timeout
                    break
            # Verify all sent datagrams have a matching response
            if len(sent_datagrams) != len(received_datagrams):
                all_responses_matched = False
                error_message = f"Mismatch in sent/received datagram count: Sent {len(sent_datagrams)}, Received {len(received_datagrams)}"
                raise ValueError(error_message)
            # UDP does not guarantee ordering, so compare the payloads
            # order-independently rather than pairwise
            if sorted(sent_datagrams) != sorted(received_datagrams):
                all_responses_matched = False
                error_message = (
                    f"Datagram content mismatch: Sent {sent_datagrams}, "
                    f"Received {received_datagrams}"
                )
                raise ValueError(error_message)
            success = all_responses_matched
        except Exception as e:
            success = False
            error_message = str(e)
            logger.error(f"UDP Datagram Test Error: {error_message}")
        finally:
            if sock:  # Ensure the socket is closed only if it was created
                sock.close()
        duration = time.time() - start_time
        return TestResult(
            test_name=test_name,
            success=success,
            details={
                "protocol": "udp",
                "datagrams_sent": datagrams_sent,
                "responses_received": responses_received,
                "all_responses_matched": all_responses_matched,
            },
            duration=duration,
            error=error_message if not success else None,
        )
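
    # Hedged local fixture (an assumption, not part of Proxify): a trivial UDP
    # echo backend sufficient for test_udp_datagram_protocol above.
    def run_local_udp_echo_server(self, port: int = 8081) -> None:
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.bind((self.base_host, port))
        try:
            while True:
                data, addr = sock.recvfrom(65535)
                sock.sendto(data, addr)
        finally:
            sock.close()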

class LoadBalancingTester:
    """Comprehensive load balancing testing"""

    def __init__(self, proxy_url: str = "http://127.0.0.1:4201"):
        self.proxy_url = proxy_url
        self.backends = {}

    def test_load_balancing_algorithm(
        self, algorithm: LoadBalanceAlgorithm, backend_name: str = "api-backend"
    ) -> TestResult:
        """Test specific load balancing algorithm"""
        test_name = f"load_balancing_{algorithm.value}"
        start_time = time.time()
        try:
            if algorithm == LoadBalanceAlgorithm.ROUND_ROBIN:
                return self._test_round_robin(backend_name, start_time)
            elif algorithm == LoadBalanceAlgorithm.LEAST_CONNECTIONS:
                return self._test_least_connections(backend_name, start_time)
            elif algorithm == LoadBalanceAlgorithm.SOURCE_IP:
                return self._test_source_ip_hash(backend_name, start_time)
            elif algorithm == LoadBalanceAlgorithm.URI_HASH:
                return self._test_uri_hash(backend_name, start_time)
            elif algorithm == LoadBalanceAlgorithm.HEADER_HASH:
                return self._test_header_hash(backend_name, start_time)
            else:
                raise ValueError(f"Unsupported algorithm: {algorithm}")
        except Exception as e:
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name, success=False, error=str(e), duration=duration
            )

    def _test_round_robin(self, backend_name: str, start_time: float) -> TestResult:
        """Test round-robin algorithm"""
        # Make requests and track which server handles them
        request_count = 100
        server_counts = {}
        response_times = []
        for i in range(request_count):
            try:
                req_start = time.time()
                response = requests.get(
                    f"{self.proxy_url}/proxy?backend={backend_name}&path=/test-{i}",
                    timeout=10,
                )
                req_time = time.time() - req_start
                response_times.append(req_time)
                server = response.headers.get("X-Backend-Server", "unknown")
                server_counts[server] = server_counts.get(server, 0) + 1
            except Exception as e:
                logger.warning(f"Request {i} failed: {e}")
        # Analyze distribution
        if len(server_counts) < 2:
            raise ValueError(
                f"Only {len(server_counts)} backend servers were hit. "
                "For round-robin load balancing to work, please ensure your proxy is configured "
                f"with at least two active backend servers for the '{backend_name}' backend."
            )
        # Check if distribution is roughly equal
        values = list(server_counts.values())
        avg = sum(values) / len(values)
        deviation = sum(abs(v - avg) for v in values) / len(values)
        duration = time.time() - start_time
        return TestResult(
            test_name="load_balancing_round_robin",
            success=deviation <= (avg * 0.3),  # Allow 30% deviation
            details={
                "algorithm": "round-robin",
                "server_distribution": server_counts,
                "requests": request_count,
                "avg_requests_per_server": avg,
                "distribution_deviation": deviation,
                "avg_response_time": statistics.mean(response_times)
                if response_times
                else 0,
            },
            duration=duration,
        )
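
    # Worked example of the pass criterion above: server_counts {"s1": 55, "s2": 45}
    # gives avg = 50 and mean absolute deviation = 5, within the allowed 30% of avg
    # (15), so the test passes; {"s1": 80, "s2": 20} gives deviation 30 and fails.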
    def _test_least_connections(
        self, backend_name: str, start_time: float
    ) -> TestResult:
        """Test least connections algorithm"""
        # Simulate different connection loads
        results = []

        def make_delayed_request(request_id: int, delay_endpoint: bool = False):
            try:
                path = "/delay/1" if delay_endpoint else "/instant"
                response = requests.get(
                    f"{self.proxy_url}/proxy?backend={backend_name}&path={path}",
                    timeout=30,
                )
                return {
                    "request_id": request_id,
                    "server": response.headers.get("X-Backend-Server", "unknown"),
                    "delayed": delay_endpoint,
                }
            except Exception as e:
                return {"request_id": request_id, "error": str(e)}

        # Make concurrent requests with some delayed
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            futures = []
            for i in range(20):
                delayed = i % 4 == 0  # Every 4th request is delayed
                futures.append(executor.submit(make_delayed_request, i, delayed))
            for future in concurrent.futures.as_completed(futures):
                results.append(future.result())
        # Analyze results
        server_counts = {}
        delayed_counts = {}
        for result in results:
            if "server" in result:
                server = result["server"]
                server_counts[server] = server_counts.get(server, 0) + 1
                if result.get("delayed"):
                    delayed_counts[server] = delayed_counts.get(server, 0) + 1
        duration = time.time() - start_time
        success_status = len(server_counts) > 1
        error_message = None
        if not success_status:
            error_message = (
                f"Only {len(server_counts)} backend servers were hit. "
                "For least connections load balancing to work effectively, please ensure your proxy "
                "is configured with at least two active backend servers for the "
                f"'{backend_name}' backend, and that connection weights are allowing distribution."
            )
        return TestResult(
            test_name="load_balancing_least_connections",
            success=success_status,
            error=error_message,
            details={
                "algorithm": "least-connections",
                "server_distribution": server_counts,
                "delayed_requests_distribution": delayed_counts,
                "total_requests": len(results),
            },
            duration=duration,
        )
    def _test_source_ip_hash(self, backend_name: str, start_time: float) -> TestResult:
        """Test source IP hash algorithm"""
        # Simulate requests from different source IPs
        server_by_ip = {}
        for i in range(10):
            ip = f"192.168.1.{i + 1}"
            headers = {"X-Forwarded-For": ip, "X-Real-IP": ip}
            # Make 5 requests from the same IP
            servers_seen = set()
            for _ in range(5):
                try:
                    response = requests.get(
                        f"{self.proxy_url}/proxy?backend={backend_name}&path=/test",
                        headers=headers,
                        timeout=10,
                    )
                    server = response.headers.get("X-Backend-Server", "unknown")
                    servers_seen.add(server)
                except Exception as e:
                    logger.warning(f"Request from {ip} failed: {e}")
            # All requests from the same IP should go to the same server
            if len(servers_seen) > 1:
                raise ValueError(f"IP {ip} routed to multiple servers: {servers_seen}")
            server_by_ip[ip] = list(servers_seen)[0] if servers_seen else None
        duration = time.time() - start_time
        return TestResult(
            test_name="load_balancing_source_ip_hash",
            success=True,
            details={
                "algorithm": "source-ip-hash",
                "ip_to_server_mapping": server_by_ip,
                "ips_tested": len(server_by_ip),
            },
            duration=duration,
        )
    def _test_uri_hash(self, backend_name: str, start_time: float) -> TestResult:
        """Test URI hash algorithm"""
        # The same URI should always go to the same server
        uri_to_server = {}
        test_uris = [
            "/api/users/123",
            "/api/products/456",
            "/api/orders/789",
            "/static/image1.jpg",
            "/static/image2.png",
        ]
        for uri in test_uris:
            servers_seen = set()
            for _ in range(5):  # Multiple requests to the same URI
                try:
                    response = requests.get(
                        f"{self.proxy_url}/proxy?backend={backend_name}&path={uri}",
                        timeout=10,
                    )
                    server = response.headers.get("X-Backend-Server", "unknown")
                    servers_seen.add(server)
                except Exception as e:
                    logger.warning(f"Request for {uri} failed: {e}")
            if len(servers_seen) > 1:
                raise ValueError(
                    f"URI {uri} routed to multiple servers: {servers_seen}"
                )
            uri_to_server[uri] = list(servers_seen)[0] if servers_seen else None
        # Different URIs might go to different servers
        unique_servers = len(set(uri_to_server.values()))
        duration = time.time() - start_time
        return TestResult(
            test_name="load_balancing_uri_hash",
            success=True,
            details={
                "algorithm": "uri-hash",
                "uri_to_server_mapping": uri_to_server,
                "unique_servers_used": unique_servers,
            },
            duration=duration,
        )
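
    # Illustrative only: a deterministic URI-hash pick of the kind the proxy is
    # assumed to use (its actual hash function is unknown); included to show why
    # repeated requests for the same URI must land on the same backend.
    @staticmethod
    def _example_uri_hash_pick(uri: str, backends: List[str]) -> str:
        import hashlib  # local import; the module-level one is commented out

        digest = int(hashlib.md5(uri.encode()).hexdigest(), 16)
        return backends[digest % len(backends)]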
    def _test_header_hash(self, backend_name: str, start_time: float) -> TestResult:
        """Test header hash algorithm"""
        # The same header value should route to the same server
        header_to_server = {}
        test_headers = [
            {"X-User-ID": "user_123"},
            {"X-User-ID": "user_456"},
            {"X-Session-ID": "session_abc"},
            {"X-Session-ID": "session_def"},
        ]
        for header_set in test_headers:
            servers_seen = set()
            for _ in range(3):
                try:
                    response = requests.get(
                        f"{self.proxy_url}/proxy?backend={backend_name}&path=/api/data",
                        headers=header_set,
                        timeout=10,
                    )
                    server = response.headers.get("X-Backend-Server", "unknown")
                    servers_seen.add(server)
                except Exception as e:
                    logger.warning(f"Request with headers {header_set} failed: {e}")
            if len(servers_seen) > 1:
                raise ValueError(
                    f"Headers {header_set} routed to multiple servers: {servers_seen}"
                )
            # Tuple key; convert_keys_to_str stringifies it when the summary is serialized
            key = list(header_set.items())[0]
            header_to_server[key] = list(servers_seen)[0] if servers_seen else None
        duration = time.time() - start_time
        return TestResult(
            test_name="load_balancing_header_hash",
            success=True,
            details={
                "algorithm": "header-hash",
                "header_to_server_mapping": header_to_server,
            },
            duration=duration,
        )
    def test_session_stickiness(self, backend_name: str = "api-backend") -> TestResult:
        """Test session stickiness with cookies"""
        test_name = "session_stickiness"
        start_time = time.time()
        try:
            session_cookie = None
            first_server = None
            servers_seen = set()
            # Make multiple requests in a session
            for i in range(10):
                cookies = {}
                if session_cookie:
                    cookies["PROXIFY_SESSION"] = session_cookie
                response = requests.get(
                    f"{self.proxy_url}/proxy?backend={backend_name}&path=/session-test-{i}",
                    cookies=cookies,
                    timeout=10,
                )
                server = response.headers.get("X-Backend-Server", "unknown")
                servers_seen.add(server)
                # Store the cookie if provided; requests parses Set-Cookie headers
                # into the cookie jar (its headers object has no get_list method)
                if "PROXIFY_SESSION" in response.cookies:
                    session_cookie = response.cookies["PROXIFY_SESSION"]
                if first_server is None:
                    first_server = server
            # With stickiness, requests should stay on the first server
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name,
                success=len(servers_seen) == 1 and first_server in servers_seen,
                details={
                    "servers_seen": list(servers_seen),
                    "first_server": first_server,
                    "session_cookie_set": session_cookie is not None,
                },
                duration=duration,
            )
        except Exception as e:
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name, success=False, error=str(e), duration=duration
            )

class AdvancedSecurityTester:
    """Advanced security testing including all Proxify security features"""

    def __init__(self, proxy_url: str = "http://127.0.0.1:4201"):
        self.proxy_url = proxy_url

    def test_authentication_methods(self) -> List[TestResult]:
        """Test all authentication methods"""
        results = []
        # Test Basic Authentication
        start_time = time.time()
        try:
            # Test with valid credentials
            response = requests.get(
                f"{self.proxy_url}/proxy?url=https://httpbin.org/basic-auth/admin/password",
                auth=("admin", "password"),
                timeout=10,
            )
            valid_success = response.status_code == 200
            # Test with invalid credentials
            response = requests.get(
                f"{self.proxy_url}/proxy?url=https://httpbin.org/basic-auth/admin/password",
                auth=("wrong", "wrong"),
                timeout=10,
            )
            invalid_blocked = response.status_code in [401, 403]
            duration = time.time() - start_time
            results.append(
                TestResult(
                    test_name="basic_authentication",
                    success=valid_success and invalid_blocked,
                    details={
                        "valid_auth_works": valid_success,
                        "invalid_auth_blocked": invalid_blocked,
                    },
                    duration=duration,
                )
            )
        except Exception as e:
            duration = time.time() - start_time
            results.append(
                TestResult(
                    test_name="basic_authentication",
                    success=False,
                    error=str(e),
                    duration=duration,
                )
            )
        # Client certificate authentication requires manual setup of client
        # certificates and proxy configuration to request and verify them, so
        # it is recorded as skipped rather than actively tested.
        results.append(
            TestResult(
                test_name="client_cert_authentication",
                success=False,  # Set to False as it's not actively tested
                details={
                    "note": "Client certificate authentication requires specific proxy configuration "
                    "and client certificate setup, which cannot be automated in this test. "
                    "This test is skipped. Please configure your proxy for client certificate "
                    "authentication and manually verify if needed."
                },
                error="Test requires external client/proxy configuration, currently skipped.",
            )
        )
        return results
    def test_cross_origin_policies(self) -> TestResult:
        """Test all cross-origin security headers"""
        test_name = "cross_origin_policies"
        start_time = time.time()
        try:
            # Use HTTPBIN_BASE for testing
            test_url = f"{HTTPBIN_BASE}/get"
            response = requests.get(
                f"{self.proxy_url}/proxy?url={quote(test_url)}", timeout=10
            )
            # Check all cross-origin headers
            cross_origin_headers = {
                "Cross-Origin-Resource-Policy": response.headers.get(
                    "Cross-Origin-Resource-Policy"
                ),
                "Cross-Origin-Embedder-Policy": response.headers.get(
                    "Cross-Origin-Embedder-Policy"
                ),
                "Cross-Origin-Opener-Policy": response.headers.get(
                    "Cross-Origin-Opener-Policy"
                ),
                "Access-Control-Allow-Origin": response.headers.get(
                    "Access-Control-Allow-Origin"
                ),
                "Access-Control-Allow-Methods": response.headers.get(
                    "Access-Control-Allow-Methods"
                ),
                "Access-Control-Allow-Headers": response.headers.get(
                    "Access-Control-Allow-Headers"
                ),
                "Access-Control-Allow-Credentials": response.headers.get(
                    "Access-Control-Allow-Credentials"
                ),
                "Access-Control-Expose-Headers": response.headers.get(
                    "Access-Control-Expose-Headers"
                ),
            }
            # Additional security headers
            security_headers = {
                "Content-Security-Policy": response.headers.get(
                    "Content-Security-Policy"
                ),
                "X-Content-Type-Options": response.headers.get(
                    "X-Content-Type-Options"
                ),
                "X-Frame-Options": response.headers.get("X-Frame-Options"),
                "X-XSS-Protection": response.headers.get("X-XSS-Protection"),
                "Strict-Transport-Security": response.headers.get(
                    "Strict-Transport-Security"
                ),
                "Referrer-Policy": response.headers.get("Referrer-Policy"),
                "Permissions-Policy": response.headers.get("Permissions-Policy"),
            }
            headers_present = {
                k: v is not None
                for k, v in {**cross_origin_headers, **security_headers}.items()
            }
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name,
                success=any(cross_origin_headers.values()),
                details={
                    "cross_origin_headers": cross_origin_headers,
                    "security_headers": security_headers,
                    "headers_present": headers_present,
                    "total_headers": len(headers_present),
                    "present_count": sum(headers_present.values()),
                },
                duration=duration,
            )
        except Exception as e:
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name, success=False, error=str(e), duration=duration
            )
    def test_ip_filtering(self) -> TestResult:
        """Test IP whitelist/blacklist functionality"""
        test_name = "ip_filtering"
        start_time = time.time()
        # Note: This requires specific Proxify configuration;
        # the test assumes certain IPs are blocked/whitelisted
        test_ips = [
            ("127.0.0.1", "localhost"),
            ("192.168.1.100", "internal IP"),
            ("10.0.0.1", "private IP"),
            ("8.8.8.8", "public IP"),
        ]
        results = []
        for ip, description in test_ips:
            headers = {"X-Forwarded-For": ip, "X-Real-IP": ip}
            try:
                response = requests.get(
                    f"{self.proxy_url}/health", headers=headers, timeout=5
                )
                results.append(
                    {
                        "ip": ip,
                        "description": description,
                        "allowed": response.status_code == 200,
                        "status": response.status_code,
                    }
                )
            except Exception as e:
                results.append(
                    {
                        "ip": ip,
                        "description": description,
                        "allowed": False,
                        "error": str(e),
                    }
                )
        duration = time.time() - start_time
        return TestResult(
            test_name=test_name,
            success=True,  # Success means the test completed, not that all IPs are filtered
            details={"ip_test_results": results},
            duration=duration,
        )
    def test_request_validation(self) -> TestResult:
        """Test comprehensive request validation"""
        test_name = "request_validation"
        start_time = time.time()
        malformed_cases = [
            # Invalid URLs
            {"url": "javascript:alert('xss')", "desc": "JavaScript protocol"},
            {"url": "data:text/html,<script>alert('xss')</script>", "desc": "Data URL"},
            {"url": "file:///etc/passwd", "desc": "File protocol"},
            {"url": "gopher://example.com", "desc": "Gopher protocol"},
            # Path traversal
            {
                "url": "https://example.com/../../../etc/passwd",
                "desc": "Path traversal",
            },
            {
                "url": "https://example.com/%2e%2e%2fetc%2fpasswd",
                "desc": "Encoded traversal",
            },
            # Invalid ports
            {"url": "https://example.com:99999", "desc": "Invalid port"},
            {"url": "https://example.com:0", "desc": "Port 0"},
            {"url": "https://example.com:-1", "desc": "Negative port"},
            # SSRF attempts
            {"url": "http://169.254.169.254/latest/meta-data", "desc": "AWS metadata"},
            {"url": "http://metadata.google.internal", "desc": "GCP metadata"},
            {"url": "http://[::1]:80", "desc": "IPv6 localhost"},
            # CRLF injection
            {
                "url": "https://example.com/%0d%0aHost:%20evil.com",
                "desc": "CRLF injection",
            },
            # Overly long URL
            {"url": "https://example.com/" + "a" * 10000, "desc": "Very long URL"},
        ]
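        # Note on encoding: quote() below also percent-escapes the "%" in the
        # pre-encoded cases ("%2e%2e%2f", "%0d%0a"), so the proxy receives them
        # double-encoded; pass quote(url, safe="/%") if those byte sequences
        # must reach the proxy verbatim.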
        results = []
        for case in malformed_cases:
            try:
                response = requests.get(
                    f"{self.proxy_url}/proxy?url={quote(case['url'])}", timeout=5
                )
                blocked = response.status_code != 200
                results.append(
                    {
                        **case,
                        "blocked": blocked,
                        "status": response.status_code,
                        "success": blocked,  # Should be blocked
                    }
                )
            except Exception as e:
                results.append(
                    {
                        **case,
                        "blocked": True,
                        "error": str(e),
                        "success": True,  # Exception means blocked
                    }
                )
        blocked_count = sum(1 for r in results if r.get("blocked", False))
        total_count = len(results)
        duration = time.time() - start_time
        return TestResult(
            test_name=test_name,
            success=blocked_count == total_count,
            details={
                "cases_tested": total_count,
                "blocked": blocked_count,
                "allowed": total_count - blocked_count,
                "results": results,
            },
            duration=duration,
        )
    def test_size_limits(self) -> TestResult:
        """Test request and response size limits"""
        test_name = "size_limits"
        start_time = time.time()
        try:
            # Test large request body
            large_body = {"data": "x" * (10 * 1024 * 1024)}  # 10MB
            response = requests.post(
                f"{self.proxy_url}/proxy?url=https://httpbin.org/post",
                json=large_body,
                timeout=30,
            )
            large_request_handled = response.status_code in [200, 413]
            # Test large response
            response = requests.get(
                f"{self.proxy_url}/proxy?url=https://httpbin.org/bytes/5242880",  # 5MB
                timeout=30,
                stream=True,
            )
            # Stream to check if the large response is handled
            total_received = 0
            for chunk in response.iter_content(chunk_size=8192):
                total_received += len(chunk)
                if total_received > 1024 * 1024:  # Stop after 1MB
                    break
            large_response_handled = total_received > 0
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name,
                success=large_request_handled and large_response_handled,
                details={
                    "large_request_handled": large_request_handled,
                    "large_response_handled": large_response_handled,
                    "response_bytes_received": total_received,
                },
                duration=duration,
            )
        except Exception as e:
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name, success=False, error=str(e), duration=duration
            )

class MiddlewareTester:
    """Test middleware functionality"""

    def __init__(self, proxy_url: str = "http://127.0.0.1:4201"):
        self.proxy_url = proxy_url

    def test_circuit_breaker(self) -> TestResult:
        """Test circuit breaker middleware"""
        test_name = "circuit_breaker"
        start_time = time.time()
        try:
            # Target an endpoint that might fail
            test_url = "https://httpbin.org/status/500"
            failures = 0
            for _ in range(10):
                response = requests.get(
                    f"{self.proxy_url}/proxy?url={quote(test_url)}", timeout=5
                )
                if response.status_code >= 500:
                    failures += 1
            # After a certain number of failures, the circuit should open
            # Note: Actual behavior depends on Proxify configuration
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name,
                success=True,  # Test completed
                details={
                    "requests_made": 10,
                    "failures": failures,
                    "note": "Circuit breaker behavior depends on Proxify config",
                },
                duration=duration,
            )
        except Exception as e:
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name, success=False, error=str(e), duration=duration
            )

    def test_retry_logic(self) -> TestResult:
        """Test retry middleware"""
        test_name = "retry_logic"
        start_time = time.time()
        try:
            # Test with intermittent failures
            test_url = "https://httpbin.org/status/429"  # Too Many Requests
            responses = []
            for i in range(5):
                response = requests.get(
                    f"{self.proxy_url}/proxy?url={quote(test_url)}", timeout=10
                )
                responses.append(
                    {
                        "attempt": i + 1,
                        "status": response.status_code,
                        "retry_after": response.headers.get("Retry-After"),
                    }
                )
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name,
                success=True,
                details={
                    "attempts": responses,
                    "note": "Retry logic depends on Proxify configuration",
                },
                duration=duration,
            )
        except Exception as e:
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name, success=False, error=str(e), duration=duration
            )
    def test_compression_middleware(self) -> TestResult:
        """Test compression middleware"""
        test_name = "compression_middleware"
        start_time = time.time()
        try:
            # Request with accept-encoding
            headers = {
                "Accept-Encoding": "gzip, deflate, br",
                "Accept": "application/json",
            }
            # stream=True so the raw, still-compressed body can be read below;
            # requests would otherwise decode gzip/deflate transparently and
            # gzip.decompress(response.content) would fail
            response = requests.get(
                f"{self.proxy_url}/proxy?url=https://httpbin.org/gzip",
                headers=headers,
                timeout=10,
                stream=True,
            )
            raw_body = response.raw.read(decode_content=False)
            content_encoding = response.headers.get("content-encoding", "").lower()
            is_compressed = any(
                enc in content_encoding for enc in ["gzip", "deflate", "br"]
            )
            # Verify we can decompress the raw payload
            if is_compressed and "gzip" in content_encoding:
                decompressed = gzip.decompress(raw_body)
                can_decompress = len(decompressed) > 0
            else:
                can_decompress = True
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name,
                success=is_compressed and can_decompress,
                details={
                    "content_encoding": content_encoding,
                    "is_compressed": is_compressed,
                    "can_decompress": can_decompress,
                    "content_length": len(raw_body),
                },
                duration=duration,
            )
        except Exception as e:
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name, success=False, error=str(e), duration=duration
            )

class MonitoringTester:
    """Test monitoring and metrics endpoints"""

    def __init__(self, proxy_url: str = "http://127.0.0.1:4201"):
        self.proxy_url = proxy_url

    def test_all_monitoring_endpoints(self) -> List[TestResult]:
        """Test all monitoring endpoints"""
        results = []
        endpoints = [
            ("/health", "health_endpoint"),
            ("/metrics", "metrics_endpoint"),
            ("/stats", "stats_endpoint"),
            ("/logs", "logs_endpoint"),  # If available
        ]
        for endpoint, test_name in endpoints:
            start_time = time.time()
            current_success = False
            current_error = None
            details: Dict[str, Any] = {"endpoint": endpoint}
            try:
                response = requests.get(f"{self.proxy_url}{endpoint}", timeout=10)
                is_json = "application/json" in response.headers.get("content-type", "")
                has_content = len(response.content) > 0
                details.update(
                    {
                        "status": response.status_code,
                        "content_type": response.headers.get("content-type"),
                        "is_json": is_json,
                        "content_length": len(response.content),
                    }
                )
                if endpoint == "/logs":
                    if response.status_code == 200 and has_content:
                        current_success = True
                        details["log_content_sample"] = (
                            response.text[:200] + "..."
                            if len(response.text) > 200
                            else response.text
                        )
                    else:
                        current_success = False
                        current_error = (
                            f"Logs endpoint failed: Status code {response.status_code}, "
                            f"Has content: {has_content}. Expected 200 OK with content."
                        )
                else:  # For other endpoints like /health, /metrics, /stats
                    if response.status_code == 200 and has_content:
                        current_success = True
                    else:
                        current_success = False
                        current_error = (
                            f"{test_name} failed: Status code {response.status_code}, "
                            f"Has content: {has_content}. Expected 200 OK with content."
                        )
            except requests.exceptions.Timeout:
                current_success = False
                current_error = f"Request to {endpoint} timed out after 10 seconds."
            except requests.exceptions.ConnectionError as e:
                current_success = False
                current_error = f"Connection error to {endpoint}: {e}"
            except Exception as e:
                current_success = False
                current_error = str(e)
            duration = time.time() - start_time
            results.append(
                TestResult(
                    test_name=test_name,
                    success=current_success,
                    details=details,
                    duration=duration,
                    error=current_error,
                )
            )
        return results
    def test_metrics_collection(self) -> TestResult:
        """Test that metrics are collected properly"""
        test_name = "metrics_collection"
        start_time = time.time()
        try:
            # Make some requests to generate metrics
            test_urls = [
                "https://httpbin.org/get",
                "https://httpbin.org/post",
                "https://httpbin.org/status/404",
                "https://httpbin.org/delay/1",
            ]
            for url in test_urls:
                if "post" in url:
                    requests.post(
                        f"{self.proxy_url}/proxy?url={quote(url)}",
                        json={"test": "data"},
                        timeout=10,
                    )
                else:
                    requests.get(f"{self.proxy_url}/proxy?url={quote(url)}", timeout=10)
            # Check metrics endpoint
            time.sleep(2)  # Give time for metrics collection
            response = requests.get(f"{self.proxy_url}/metrics", timeout=10)
            if response.status_code == 200:
                try:
                    metrics_data = response.json()
                    has_metrics = len(metrics_data) > 0
                    # Look for specific metrics
                    metric_keys = list(metrics_data.keys())
                    duration = time.time() - start_time
                    return TestResult(
                        test_name=test_name,
                        success=has_metrics,
                        details={
                            "status": response.status_code,
                            "metric_count": len(metrics_data),
                            "metric_keys_sample": metric_keys[:10],
                            "has_request_metrics": any(
                                "request" in k.lower() for k in metric_keys
                            ),
                            "has_error_metrics": any(
                                "error" in k.lower() for k in metric_keys
                            ),
                        },
                        duration=duration,
                    )
                except json.JSONDecodeError:
                    # Metrics might be in a different format
                    has_content = len(response.content) > 0
                    duration = time.time() - start_time
                    return TestResult(
                        test_name=test_name,
                        success=has_content,
                        details={
                            "status": response.status_code,
                            "content_length": len(response.content),
                            "note": "Metrics in non-JSON format",
                        },
                        duration=duration,
                    )
            else:
                duration = time.time() - start_time
                return TestResult(
                    test_name=test_name,
                    success=False,
                    error=f"Metrics endpoint returned {response.status_code}",
                    duration=duration,
                )
        except Exception as e:
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name, success=False, error=str(e), duration=duration
            )
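
    # Hedged helper (a format assumption, not a documented Proxify contract): if
    # /metrics serves Prometheus text exposition rather than JSON, a rough line
    # parse recovers metric names for the same request/error checks as above.
    @staticmethod
    def _parse_prometheus_names(text: str) -> List[str]:
        names = set()
        for line in text.splitlines():
            line = line.strip()
            if line and not line.startswith("#"):
                names.add(line.split("{")[0].split(" ")[0])
        return sorted(names)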

class FingerprintTester:
    """Test fingerprint spoofing capabilities"""

    def __init__(self, proxy_url: str = "http://127.0.0.1:4201"):
        self.proxy_url = proxy_url

    def test_fingerprint_spoofing(self) -> TestResult:
        """Test JA4/JA4H fingerprint spoofing"""
        test_name = "fingerprint_spoofing"
        start_time = time.time()
        try:
            test_url = "https://httpbin.org/headers"
            # Make request through proxy
            response = requests.get(
                f"{self.proxy_url}/proxy?url={quote(test_url)}", timeout=10
            )
            # Get the headers that were sent to the target
            # Note: This assumes the proxy forwards our request info
            if response.status_code == 200:
                try:
                    data = response.json()
                    headers_sent = data.get("headers", {})
                    # Check for a Chrome-like user agent
                    user_agent = headers_sent.get("User-Agent", "")
                    is_chrome_like = any(
                        marker in user_agent
                        for marker in ["Chrome/", "Safari/", "AppleWebKit/"]
                    )
                    # Check for other Chrome headers
                    chrome_headers = {
                        "Accept": "text/html,application/xhtml+xml,application/xml",
                        "Accept-Encoding": "gzip, deflate, br",
                        "Accept-Language": "en-US,en;q=0.9",
                        "Connection": "keep-alive",
                        "Upgrade-Insecure-Requests": "1",
                    }
                    matching_headers = {}
                    for header, expected_value in chrome_headers.items():
                        actual_value = headers_sent.get(header)
                        if actual_value:
                            # Check whether the actual value contains the expected one
                            if expected_value in actual_value:
                                matching_headers[header] = True
                            else:
                                matching_headers[header] = False
                    duration = time.time() - start_time
                    return TestResult(
                        test_name=test_name,
                        success=is_chrome_like,
                        details={
                            "user_agent": user_agent,
                            "is_chrome_like": is_chrome_like,
                            "headers_analyzed": len(matching_headers),
                            "matching_chrome_headers": matching_headers,
                            "note": "JA4 (TLS) fingerprint requires TLS handshake analysis",
                        },
                        duration=duration,
                    )
                except json.JSONDecodeError:
                    duration = time.time() - start_time
                    return TestResult(
                        test_name=test_name,
                        success=False,
                        error="Could not parse response as JSON",
                        duration=duration,
                    )
            else:
                duration = time.time() - start_time
                return TestResult(
                    test_name=test_name,
                    success=False,
                    error=f"Request failed with status {response.status_code}",
                    duration=duration,
                )
        except Exception as e:
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name, success=False, error=str(e), duration=duration
            )

class ConfigurationTester:
    """Test configuration features"""

    def __init__(self, proxy_url: str = "http://127.0.0.1:4201"):
        self.proxy_url = proxy_url

    def test_configuration_features(self) -> List[TestResult]:
        """Test various configuration features"""
        results = []
        # Test environment-based config
        start_time = time.time()
        try:
            # This is more of a deployment test: all we can verify here is that
            # the basic endpoints respond under the currently loaded configuration
            endpoints_to_test = ["/health", "/metrics", "/stats"]
            all_respond = True
            for endpoint in endpoints_to_test:
                response = requests.get(f"{self.proxy_url}{endpoint}", timeout=5)
                if response.status_code != 200:
                    all_respond = False
                    break
            duration = time.time() - start_time
            results.append(
                TestResult(
                    test_name="configuration_basics",
                    success=all_respond,
                    details={
                        "endpoints_tested": endpoints_to_test,
                        "all_respond": all_respond,
                    },
                    duration=duration,
                )
            )
        except Exception as e:
            duration = time.time() - start_time
            results.append(
                TestResult(
                    test_name="configuration_basics",
                    success=False,
                    error=str(e),
                    duration=duration,
                )
            )
        # Note: Hot reload testing would require:
        # 1. Access to the config file
        # 2. Ability to send SIGHUP or use an admin endpoint
        # This is environment-dependent
        results.append(
            TestResult(
                test_name="hot_reload",
                success=False,
                details={
                    "note": "Requires external config modification and reload signal"
                },
                error="Environment-dependent test",
            )
        )
        return results
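
    # Hedged sketch of the manual step the hot_reload result above skips: on
    # POSIX, assuming the proxy re-reads its config on SIGHUP (an assumption,
    # not a documented Proxify contract), a check could look like:
    #   import os, signal
    #   os.kill(proxy_pid, signal.SIGHUP)  # then re-run configuration_basics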

class ConcurrentTester:
    """Test concurrent request handling capabilities"""

    def __init__(self, proxy_url: str = "http://127.0.0.1:4201"):
        self.proxy_url = proxy_url

    def test_concurrent_requests(self) -> TestResult:
        """Test concurrent request handling"""
        test_name = "concurrent_requests"
        start_time = time.time()
        try:
            # Test with 50 concurrent requests; each one records its own latency
            # so avg_response_time below is meaningful
            def make_request(request_id: int):
                req_start = time.time()
                try:
                    response = requests.get(
                        f"{self.proxy_url}/proxy?url=https://httpbin.org/get",
                        timeout=10,
                    )
                    return {
                        "request_id": request_id,
                        "status": response.status_code,
                        "response_time": time.time() - req_start,
                        "success": response.status_code == 200,
                    }
                except Exception as e:
                    return {
                        "request_id": request_id,
                        "error": str(e),
                        "response_time": time.time() - req_start,
                        "success": False,
                    }

            # Execute 50 concurrent requests
            with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
                futures = []
                for i in range(50):
                    futures.append(executor.submit(make_request, i))
                results = []
                for future in concurrent.futures.as_completed(futures):
                    results.append(future.result())
            # Analyze results
            success_count = sum(1 for r in results if r.get("success"))
            failure_count = len(results) - success_count
            avg_response_time = (
                statistics.mean([r.get("response_time", 0) for r in results])
                if results
                else 0
            )
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name,
                success=success_count >= 40,  # At least 80% success
                details={
                    "total_requests": len(results),
                    "success_count": success_count,
                    "failure_count": failure_count,
                    "success_rate": (success_count / len(results) * 100)
                    if results
                    else 0,
                    "avg_response_time": avg_response_time,
                },
                duration=duration,
            )
        except Exception as e:
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name, success=False, error=str(e), duration=duration
            )

class BurstTester:
    """Test burst request handling capabilities"""

    def __init__(self, proxy_url: str = "http://127.0.0.1:4201"):
        self.proxy_url = proxy_url

    def test_burst_requests(self) -> TestResult:
        """Test burst request handling"""
        test_name = "burst_requests"
        start_time = time.time()
        try:
            # Test with 100 rapid requests in quick succession
            results = []
            for i in range(100):
                try:
                    burst_start = time.time()
                    response = requests.get(
                        f"{self.proxy_url}/proxy?url=https://httpbin.org/get",
                        timeout=5,
                    )
                    response_time = time.time() - burst_start
                    results.append(
                        {
                            "request_id": i,
                            "status": response.status_code,
                            "response_time": response_time,
                            "success": response.status_code == 200,
                        }
                    )
                except Exception as e:
                    results.append(
                        {
                            "request_id": i,
                            "error": str(e),
                            "response_time": 0,
                            "success": False,
                        }
                    )
            # Analyze results
            success_count = sum(1 for r in results if r.get("success"))
            failure_count = len(results) - success_count
            total_time = time.time() - start_time
            requests_per_second = len(results) / total_time if total_time > 0 else 0
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name,
                success=success_count >= 80,  # At least 80% success
                details={
                    "total_requests": len(results),
                    "success_count": success_count,
                    "failure_count": failure_count,
                    "success_rate": (success_count / len(results) * 100)
                    if results
                    else 0,
                    "total_time": total_time,
                    "requests_per_second": requests_per_second,
                },
                duration=duration,
            )
        except Exception as e:
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name, success=False, error=str(e), duration=duration
            )
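
    # Note: the loop above issues its 100 requests sequentially, so "burst" here
    # means back-to-back requests, not simultaneous ones; for a truly concurrent
    # burst, reuse ConcurrentTester's ThreadPoolExecutor pattern with more workers.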

class MemoryTester:
    """Test memory usage and leak detection"""

    def __init__(self, proxy_url: str = "http://127.0.0.1:4201"):
        self.proxy_url = proxy_url

    def test_memory_usage(self) -> TestResult:
        """Test memory usage patterns"""
        test_name = "memory_usage"
        start_time = time.time()
        try:
            # Test with large payloads to check memory handling
            memory_results = []
            # Test different payload sizes
            payload_sizes = [1024, 10240, 102400, 1024000]  # 1KB, 10KB, 100KB, 1MB
            for size in payload_sizes:
                try:
                    payload = {"data": "x" * size}
                    response = requests.post(
                        f"{self.proxy_url}/proxy?url=https://httpbin.org/post",
                        json=payload,
                        timeout=10,
                    )
                    memory_results.append(
                        {
                            "size": size,
                            "status": response.status_code,
                            "success": response.status_code == 200,
                        }
                    )
                except Exception as e:
                    memory_results.append(
                        {
                            "size": size,
                            "error": str(e),
                            "success": False,
                        }
                    )
            # Analyze memory handling
            success_count = sum(1 for r in memory_results if r.get("success"))
            failure_count = len(memory_results) - success_count
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name,
                success=success_count == len(memory_results),  # All should succeed
                details={
                    "payload_sizes_tested": payload_sizes,
                    "success_count": success_count,
                    "failure_count": failure_count,
                    "memory_results": memory_results,
                },
                duration=duration,
            )
        except Exception as e:
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name, success=False, error=str(e), duration=duration
            )

class StressTester:
    """Test stress and endurance capabilities"""

    def __init__(self, proxy_url: str = "http://127.0.0.1:4201"):
        self.proxy_url = proxy_url

    def test_stress_endurance(self) -> TestResult:
        """Test stress endurance over time"""
        test_name = "stress_endurance"
        start_time = time.time()
        try:
            # Test sustained load over 60 seconds
            end_time = start_time + 60  # 60 seconds
            request_count = 0
            success_count = 0
            failure_count = 0
            while time.time() < end_time:
                try:
                    response = requests.get(
                        f"{self.proxy_url}/proxy?url=https://httpbin.org/get",
                        timeout=5,
                    )
                    request_count += 1
                    if response.status_code == 200:
                        success_count += 1
                    else:
                        failure_count += 1
                except Exception:
                    request_count += 1
                    failure_count += 1
                # Small delay to simulate real usage
                time.sleep(0.1)
            # Calculate metrics
            total_time = time.time() - start_time
            requests_per_second = request_count / total_time if total_time > 0 else 0
            success_rate = (
                (success_count / request_count * 100) if request_count > 0 else 0
            )
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name,
                success=success_rate >= 70,  # At least 70% success rate
                details={
                    "total_requests": request_count,
                    "success_count": success_count,
                    "failure_count": failure_count,
                    "success_rate": success_rate,
                    "total_time": total_time,
                    "requests_per_second": requests_per_second,
                },
                duration=duration,
            )
        except Exception as e:
            duration = time.time() - start_time
            return TestResult(
                test_name=test_name, success=False, error=str(e), duration=duration
            )
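
    # Worked expectation for the loop above: the 0.1 s sleep alone caps throughput
    # at ~10 requests/s, so a 60 s run tops out near 600 requests before network
    # latency; treat requests_per_second as a pacing check, not a capacity benchmark.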
| def test_configuration_features(self) -> List[TestResult]: | |
| """Test various configuration features""" | |
| results = [] | |
| # Test environment-based config | |
| start_time = time.time() | |
| try: | |
| # This is more of a deployment test | |
| # We can test that different endpoints work with different configs | |
| # Test that basic endpoints respond | |
| endpoints_to_test = ["/health", "/metrics", "/stats"] | |
| all_respond = True | |
| for endpoint in endpoints_to_test: | |
| response = requests.get(f"{self.proxy_url}{endpoint}", timeout=5) | |
| if response.status_code != 200: | |
| all_respond = False | |
| break | |
| duration = time.time() - start_time | |
| results.append( | |
| TestResult( | |
| test_name="configuration_basics", | |
| success=all_respond, | |
| details={ | |
| "endpoints_tested": endpoints_to_test, | |
| "all_respond": all_respond, | |
| }, | |
| duration=duration, | |
| ) | |
| ) | |
| except Exception as e: | |
| duration = time.time() - start_time | |
| results.append( | |
| TestResult( | |
| test_name="configuration_basics", | |
| success=False, | |
| error=str(e), | |
| duration=duration, | |
| ) | |
| ) | |
| # Note: Hot reload testing would require: | |
| # 1. Access to config file | |
| # 2. Ability to send SIGHUP or use admin endpoint | |
| # This is environment-dependent | |
| results.append( | |
| TestResult( | |
| test_name="hot_reload", | |
| success=False, | |
| details={ | |
| "note": "Requires external config modification and reload signal" | |
| }, | |
| error="Environment-dependent test", | |
| ) | |
| ) | |
| return results | |
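| # Illustrative sketch only: a hot-reload probe against a *hypothetical* | |
| # POST /admin/reload endpoint. That endpoint is an assumption, not a | |
| # documented Proxify API, so this probe is not wired into any suite. | |
| def test_hot_reload_probe(self) -> TestResult: | |
| """Best-effort hot-reload probe (assumes an /admin/reload endpoint)""" | |
| start_time = time.time() | |
| try: | |
| response = requests.post(f"{self.proxy_url}/admin/reload", timeout=5) | |
| return TestResult( | |
| test_name="hot_reload_probe", | |
| success=response.status_code in (200, 204), | |
| details={"status": response.status_code}, | |
| duration=time.time() - start_time, | |
| ) | |
| except Exception as e: | |
| return TestResult( | |
| test_name="hot_reload_probe", | |
| success=False, | |
| error=str(e), | |
| duration=time.time() - start_time, | |
| ) | |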
| class ProxifyComprehensiveTester: | |
| """Comprehensive Proxify tester covering all features""" | |
| def __init__(self, base_url: str = "http://127.0.0.1:4201"): | |
| self.base_url = base_url | |
| self.test_session = TestSession(base_url) | |
| # Initialize all testers | |
| self.multi_protocol_tester = MultiProtocolTester() | |
| self.load_balancing_tester = LoadBalancingTester(base_url) | |
| self.security_tester = AdvancedSecurityTester(base_url) | |
| self.middleware_tester = MiddlewareTester(base_url) | |
| self.monitoring_tester = MonitoringTester(base_url) | |
| self.fingerprint_tester = FingerprintTester(base_url) | |
| self.config_tester = ConfigurationTester(base_url) | |
| # Add comprehensive test types | |
| self.concurrent_tester = ConcurrentTester(base_url) | |
| self.burst_tester = BurstTester(base_url) | |
| self.memory_tester = MemoryTester(base_url) | |
| self.stress_tester = StressTester(base_url) | |
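| # NOTE: the concurrent/burst/memory/stress testers above are instantiated | |
| # for standalone use; run_comprehensive_test below does not invoke them. | |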
| async def run_comprehensive_test(self) -> Dict[str, Any]: | |
| """Run comprehensive test covering all Proxify features""" | |
| with self.test_session as session: | |
| logger.info("=" * 80) | |
| logger.info("PROXIFY COMPREHENSIVE TEST SUITE") | |
| logger.info("=" * 80) | |
| # 1. Multi-Protocol Testing | |
| logger.info("\n1. Testing Multi-Protocol Support...") | |
| session.record_result( | |
| await self.multi_protocol_tester.test_websocket_protocol() | |
| ) | |
| session.record_result(self.multi_protocol_tester.test_tcp_stream_protocol()) | |
| session.record_result( | |
| self.multi_protocol_tester.test_udp_datagram_protocol() | |
| ) | |
| # 2. Load Balancing Testing | |
| logger.info("\n2. Testing Load Balancing...") | |
| algorithms = [ | |
| LoadBalanceAlgorithm.ROUND_ROBIN, | |
| LoadBalanceAlgorithm.LEAST_CONNECTIONS, | |
| LoadBalanceAlgorithm.SOURCE_IP, | |
| LoadBalanceAlgorithm.URI_HASH, | |
| LoadBalanceAlgorithm.HEADER_HASH, | |
| ] | |
| for algo in algorithms: | |
| result = self.load_balancing_tester.test_load_balancing_algorithm(algo) | |
| session.record_result(result) | |
| session.record_result(self.load_balancing_tester.test_session_stickiness()) | |
| # 3. Advanced Security Testing | |
| logger.info("\n3. Testing Advanced Security Features...") | |
| # Authentication | |
| auth_results = self.security_tester.test_authentication_methods() | |
| for result in auth_results: | |
| session.record_result(result) | |
| # Security headers and policies | |
| session.record_result(self.security_tester.test_cross_origin_policies()) | |
| session.record_result(self.security_tester.test_ip_filtering()) | |
| session.record_result(self.security_tester.test_request_validation()) | |
| session.record_result(self.security_tester.test_size_limits()) | |
| # 4. Middleware Testing | |
| logger.info("\n4. Testing Middleware...") | |
| session.record_result(self.middleware_tester.test_circuit_breaker()) | |
| session.record_result(self.middleware_tester.test_retry_logic()) | |
| session.record_result(self.middleware_tester.test_compression_middleware()) | |
| # 5. Monitoring & Metrics | |
| logger.info("\n5. Testing Monitoring & Metrics...") | |
| monitoring_results = self.monitoring_tester.test_all_monitoring_endpoints() | |
| for result in monitoring_results: | |
| session.record_result(result) | |
| session.record_result(self.monitoring_tester.test_metrics_collection()) | |
| # 6. Fingerprint Spoofing | |
| logger.info("\n6. Testing Fingerprint Spoofing...") | |
| session.record_result(self.fingerprint_tester.test_fingerprint_spoofing()) | |
| # 7. Configuration Features | |
| logger.info("\n7. Testing Configuration Features...") | |
| config_results = self.config_tester.test_configuration_features() | |
| for result in config_results: | |
| session.record_result(result) | |
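| # Optional: the stress/burst testers instantiated in __init__ could be | |
| # wired in here, e.g. | |
| # session.record_result(self.stress_tester.test_stress_endurance()) | |
| # (left disabled because the endurance run adds ~60 s per pass) | |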
| # Generate comprehensive report | |
| return self.generate_report() | |
| def generate_report(self) -> Dict[str, Any]: | |
| """Generate comprehensive test report""" | |
| summary = self.test_session.get_summary() | |
| # Categorize results | |
| categories = { | |
| "Multi-Protocol": [], | |
| "Load Balancing": [], | |
| "Security": [], | |
| "Middleware": [], | |
| "Monitoring": [], | |
| "Fingerprint": [], | |
| "Configuration": [], | |
| "Other": [],  # fallback bucket; the else branch below raised KeyError without it | |
| } | |
| # Map test names to categories | |
| for result in self.test_session.results: | |
| test_name = result.test_name.lower() | |
| if any( | |
| proto in test_name for proto in ["websocket", "tcp", "udp", "protocol"] | |
| ): | |
| categories["Multi-Protocol"].append(result) | |
| elif any( | |
| lb_term in test_name | |
| for lb_term in [ | |
| "load_balancing", | |
| "session_stickiness", | |
| "round_robin", | |
| "least_connections", | |
| ] | |
| ): | |
| categories["Load Balancing"].append(result) | |
| elif any( | |
| sec_term in test_name | |
| for sec_term in [ | |
| "authentication", | |
| "security", | |
| "cross_origin", | |
| "ip_filtering", | |
| "validation", | |
| "size_limits", | |
| ] | |
| ): | |
| categories["Security"].append(result) | |
| elif any( | |
| mw_term in test_name | |
| for mw_term in ["circuit", "retry", "compression", "middleware"] | |
| ): | |
| categories["Middleware"].append(result) | |
| elif any( | |
| mon_term in test_name | |
| for mon_term in ["metrics", "monitoring", "health", "stats", "logs"] | |
| ): | |
| categories["Monitoring"].append(result) | |
| elif "fingerprint" in test_name: | |
| categories["Fingerprint"].append(result) | |
| elif any( | |
| config_term in test_name | |
| for config_term in ["configuration", "hot_reload"] | |
| ): | |
| categories["Configuration"].append(result) | |
| else: | |
| # Default category | |
| categories["Other"].append(result) | |
| # Calculate category success rates | |
| category_stats = {} | |
| for category, results in categories.items(): | |
| if results: | |
| total = len(results) | |
| passed = sum(1 for r in results if r.success) | |
| category_stats[category] = { | |
| "total": total, | |
| "passed": passed, | |
| "success_rate": (passed / total * 100) if total > 0 else 0, | |
| "tests": [r.test_name for r in results], | |
| } | |
| # Features covered (from Proxify documentation) | |
| features_covered = [ | |
| "Multi-protocol proxy (HTTP/HTTPS, WebSocket, gRPC, TCP, UDP)", | |
| "Load balancing algorithms (Round Robin, Least Connections, Source IP, URI hash, Header hash)", | |
| "Session stickiness with cookies", | |
| "Health checks and automatic failover", | |
| "Basic and client certificate authentication", | |
| "Cross-origin security policies (CORP, COEP, COOP)", | |
| "IP filtering (whitelist/blacklist)", | |
| "Request validation and SSRF protection", | |
| "Rate limiting per client IP", | |
| "Request/response size limits", | |
| "Circuit breaker middleware", | |
| "Retry middleware with exponential backoff", | |
| "Compression middleware (GZIP)", | |
| "Comprehensive metrics collection", | |
| "HAProxy-like stats endpoint", | |
| "Fingerprint spoofing (JA4/JA4H)", | |
| "Configuration hot reload", | |
| "Structured logging (JSON/text)", | |
| ] | |
| report = { | |
| "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), | |
| "proxify_version": "Production-Ready", | |
| "test_session": summary, | |
| "category_stats": category_stats, | |
| "features_covered": features_covered, | |
| "recommendations": self._generate_recommendations(category_stats), | |
| "detailed_results": [r.__dict__ for r in self.test_session.results], | |
| } | |
| # Save report | |
| self._save_report(report) | |
| return report | |
| def _generate_recommendations(self, category_stats: Dict) -> List[str]: | |
| """Generate recommendations based on test results""" | |
| recommendations = [] | |
| for category, stats in category_stats.items(): | |
| success_rate = stats["success_rate"] | |
| if success_rate < 50: | |
| recommendations.append( | |
| f"[WARNING] {category}: Critical issues detected ({success_rate:.1f}% success)" | |
| ) | |
| elif success_rate < 80: | |
| recommendations.append( | |
| f"[WARNING] {category}: Needs improvement ({success_rate:.1f}% success)" | |
| ) | |
| elif success_rate < 100: | |
| recommendations.append( | |
| f"[PASS] {category}: Good ({success_rate:.1f}% success)" | |
| ) | |
| else: | |
| recommendations.append( | |
| f"[EXCELLENT] {category}: Excellent (100% success)" | |
| ) | |
| return recommendations | |
| def _save_report(self, report: Dict[str, Any]): | |
| """Save report to file""" | |
| timestamp = time.strftime("%Y%m%d_%H%M%S") | |
| filename = f"proxify_comprehensive_report_{timestamp}.json" | |
| # Convert report keys to strings for JSON serialization | |
| report_serializable = convert_keys_to_str(report) | |
| with open(filename, "w", encoding="utf-8") as f: | |
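| # default=str below stringifies values json cannot encode natively | |
| # (e.g. Enum members or datetime objects in detailed_results) | |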
| json.dump(report_serializable, f, indent=2, default=str) | |
| # Also generate a markdown summary | |
| md_filename = f"proxify_comprehensive_report_{timestamp}.md" | |
| self._generate_markdown_report(report, md_filename) | |
| logger.info(f"Detailed report saved to: {filename}") | |
| logger.info(f"Markdown summary saved to: {md_filename}") | |
| def _generate_markdown_report(self, report: Dict[str, Any], filename: str): | |
| """Generate markdown report""" | |
| with open(filename, "w", encoding="utf-8") as f: | |
| f.write("# Proxify Comprehensive Test Report\n\n") | |
| f.write(f"**Test Date:** {report['timestamp']}\n") | |
| f.write(f"**Proxify Version:** {report['proxify_version']}\n\n") | |
| # Summary | |
| session = report["test_session"] | |
| f.write("## Executive Summary\n\n") | |
| f.write(f"- **Total Tests:** {session['total_tests']}\n") | |
| f.write(f"- **Tests Passed:** {session['passed']}\n") | |
| f.write(f"- **Tests Failed:** {session['failed']}\n") | |
| f.write(f"- **Success Rate:** {session['success_rate']:.1f}%\n") | |
| f.write(f"- **Test Duration:** {session['duration']:.2f}s\n\n") | |
| # Category Breakdown | |
| f.write("## Category Breakdown\n\n") | |
| f.write("| Category | Tests | Passed | Success Rate |\n") | |
| f.write("|----------|-------|--------|--------------|\n") | |
| for category, stats in report["category_stats"].items(): | |
| f.write( | |
| f"| {category} | {stats['total']} | {stats['passed']} | {stats['success_rate']:.1f}% |\n" | |
| ) | |
| f.write("\n") | |
| # Features Covered | |
| f.write("## Features Tested\n\n") | |
| for feature in report["features_covered"]: | |
| f.write(f"- {feature}\n") | |
| f.write("\n") | |
| # Recommendations | |
| f.write("## Recommendations\n\n") | |
| for rec in report["recommendations"]: | |
| f.write(f"- {rec}\n") | |
| f.write("\n") | |
| # Detailed Results | |
| f.write("## Detailed Test Results\n\n") | |
| f.write("| Test Name | Status | Duration | Details |\n") | |
| f.write("|-----------|--------|----------|---------|\n") | |
| for result in report["detailed_results"]: | |
| status = "[PASS]" if result["success"] else "[FAIL]" | |
| duration = f"{result['duration']:.2f}s" | |
| # Truncate details for the table and escape pipes so rows don't break | |
| details = str(result.get("details", {})).replace("|", "\\|") | |
| if len(details) > 100: | |
| details = details[:100] + "..." | |
| f.write( | |
| f"| {result['test_name']} | {status} | {duration} | {details} |\n" | |
| ) | |
| async def main(): | |
| """Main async function""" | |
| parser = argparse.ArgumentParser( | |
| description="Proxify Comprehensive Test Suite v3.0", | |
| formatter_class=argparse.RawDescriptionHelpFormatter, | |
| epilog=""" | |
| Examples: | |
| python proxify_test.py --all # Run all comprehensive tests | |
| python proxify_test.py --quick # Run quick functionality tests | |
| python proxify_test.py --security # Run security tests only | |
| python proxify_test.py --load-balancing # Run load balancing tests only | |
| python proxify_test.py --multi-protocol # Test multi-protocol support | |
| python proxify_test.py --url http://localhost:8080 # Test custom Proxify instance | |
| """, | |
| ) | |
| parser.add_argument( | |
| "--url", default="http://127.0.0.1:4201", help="Base URL of the Proxify server" | |
| ) | |
| parser.add_argument( | |
| "--all", action="store_true", help="Run comprehensive test suite (all features)" | |
| ) | |
| parser.add_argument( | |
| "--quick", action="store_true", help="Run quick functionality tests" | |
| ) | |
| parser.add_argument( | |
| "--security", action="store_true", help="Run security tests only" | |
| ) | |
| parser.add_argument( | |
| "--load-balancing", action="store_true", help="Run load balancing tests only" | |
| ) | |
| parser.add_argument( | |
| "--multi-protocol", action="store_true", help="Test multi-protocol support" | |
| ) | |
| parser.add_argument( | |
| "--monitoring", action="store_true", help="Test monitoring endpoints" | |
| ) | |
| parser.add_argument( | |
| "--middleware", action="store_true", help="Test middleware functionality" | |
| ) | |
| parser.add_argument( | |
| "--output", default="comprehensive", help="Output report filename prefix" | |
| ) | |
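| # NOTE: --output is accepted but not yet wired into _save_report, which | |
| # always writes timestamped proxify_comprehensive_report_* filenames. | |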
| args = parser.parse_args() | |
| try: | |
| tester = ProxifyComprehensiveTester(base_url=args.url) | |
| if args.all: | |
| logger.info("Starting comprehensive Proxify test suite...") | |
| report = await tester.run_comprehensive_test() | |
| # Print summary | |
| print("\n" + "=" * 80) | |
| print("PROXIFY TEST COMPLETE") | |
| print("=" * 80) | |
| session = report["test_session"] | |
| print(f"\nTotal Tests: {session['total_tests']}") | |
| print(f"Passed: {session['passed']}") | |
| print(f"Failed: {session['failed']}") | |
| print(f"Success Rate: {session['success_rate']:.1f}%") | |
| print(f"Duration: {session['duration']:.2f}s") | |
| print("\nRecommendations:") | |
| for rec in report["recommendations"]: | |
| print(f" {rec}") | |
| print("\nDetailed report saved to file.") | |
| # Exit code based on success | |
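| # 0: success rate >= 80%; 1: 50-79%; 2: below 50% | |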
| if session["success_rate"] >= 80: | |
| sys.exit(0) | |
| elif session["success_rate"] >= 50: | |
| sys.exit(1) | |
| else: | |
| sys.exit(2) | |
| elif args.security: | |
| logger.info("Running security tests...") | |
| # Run security tests | |
| with TestSession(args.url) as session: | |
| security_tester = AdvancedSecurityTester(args.url) | |
| auth_results = security_tester.test_authentication_methods() | |
| for result in auth_results: | |
| session.record_result(result) | |
| session.record_result(security_tester.test_cross_origin_policies()) | |
| session.record_result(security_tester.test_ip_filtering()) | |
| session.record_result(security_tester.test_request_validation()) | |
| session.record_result(security_tester.test_size_limits()) | |
| summary = session.get_summary() | |
| print(json.dumps(summary, indent=2)) | |
| elif args.load_balancing: | |
| logger.info("Running load balancing tests...") | |
| with TestSession(args.url) as session: | |
| lb_tester = LoadBalancingTester(args.url) | |
| algorithms = [ | |
| LoadBalanceAlgorithm.ROUND_ROBIN, | |
| LoadBalanceAlgorithm.LEAST_CONNECTIONS, | |
| LoadBalanceAlgorithm.SOURCE_IP, | |
| ] | |
| for algo in algorithms: | |
| result = lb_tester.test_load_balancing_algorithm(algo) | |
| session.record_result(result) | |
| session.record_result(lb_tester.test_session_stickiness()) | |
| summary = session.get_summary() | |
| print(json.dumps(summary, indent=2)) | |
| elif args.multi_protocol: | |
| logger.info("Running multi-protocol tests...") | |
| with TestSession(args.url) as session: | |
| mp_tester = MultiProtocolTester() | |
| # Run async test | |
| result = await mp_tester.test_websocket_protocol() | |
| session.record_result(result) | |
| # Run sync tests | |
| session.record_result(mp_tester.test_tcp_stream_protocol()) | |
| session.record_result(mp_tester.test_udp_datagram_protocol()) | |
| summary = session.get_summary() | |
| print(json.dumps(summary, indent=2)) | |
| elif args.monitoring: | |
| logger.info("Running monitoring tests...") | |
| with TestSession(args.url) as session: | |
| monitoring_tester = MonitoringTester(args.url) | |
| results = monitoring_tester.test_all_monitoring_endpoints() | |
| for result in results: | |
| session.record_result(result) | |
| session.record_result(monitoring_tester.test_metrics_collection()) | |
| summary = session.get_summary() | |
| print(json.dumps(summary, indent=2)) | |
| elif args.middleware: | |
| logger.info("Running middleware tests...") | |
| with TestSession(args.url) as session: | |
| middleware_tester = MiddlewareTester(args.url) | |
| session.record_result(middleware_tester.test_circuit_breaker()) | |
| session.record_result(middleware_tester.test_retry_logic()) | |
| session.record_result(middleware_tester.test_compression_middleware()) | |
| summary = session.get_summary() | |
| print(json.dumps(summary, indent=2)) | |
| elif args.quick: | |
| logger.info("Running quick test...") | |
| # Basic functionality test | |
| with TestSession(args.url) as session: | |
| # Test basic endpoints | |
| start = time.time() | |
| try: | |
| response = requests.get(f"{args.url}/health", timeout=10) | |
| session.record_result( | |
| TestResult( | |
| test_name="health_endpoint", | |
| success=response.status_code == 200, | |
| details={"status": response.status_code}, | |
| duration=time.time() - start, | |
| ) | |
| ) | |
| except Exception as e: | |
| session.record_result( | |
| TestResult( | |
| test_name="health_endpoint", | |
| success=False, | |
| error=str(e), | |
| duration=time.time() - start, | |
| ) | |
| ) | |
| # Test basic proxy | |
| start = time.time() | |
| try: | |
| response = requests.get( | |
| f"{args.url}/proxy?url={quote(HTTPBIN_BASE + '/get', safe='')}", | |
| timeout=10, | |
| ) | |
| session.record_result( | |
| TestResult( | |
| test_name="basic_proxy", | |
| success=response.status_code == 200, | |
| details={"status": response.status_code}, | |
| duration=time.time() - start, | |
| ) | |
| ) | |
| except Exception as e: | |
| session.record_result( | |
| TestResult( | |
| test_name="basic_proxy", | |
| success=False, | |
| error=str(e), | |
| duration=time.time() - start, | |
| ) | |
| ) | |
| summary = session.get_summary() | |
| print(json.dumps(summary, indent=2)) | |
| else: | |
| # Default: show help | |
| parser.print_help() | |
| except KeyboardInterrupt: | |
| logger.info("Testing interrupted by user") | |
| sys.exit(1) | |
| except Exception as e: | |
| logger.error(f"Testing failed with error: {e}") | |
| traceback.print_exc() | |
| sys.exit(1) | |
| if __name__ == "__main__": | |
| asyncio.run(main()) |