Created
December 3, 2025 07:40
-
-
Save moosavimaleki/d9811ab9f5127d04dde305080a2e701e to your computer and use it in GitHub Desktop.
port-forward
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/home/h-mousavi/.pyenv/shims/python | |
| import os | |
| import signal | |
| import socket | |
| import subprocess | |
| import sys | |
| import threading | |
| import time | |
| from typing import List | |
| import psutil | |
| from colorama import Fore, Style, init | |
| import socket | |
| import struct | |
# Initialise colorama once at import time, before any coloured output is
# printed; autoreset=True asks colorama to reset styling after each print
# (per colorama docs).
init(autoreset=True)
class ServiceType:
    """String tags naming the protocol a forwarded service speaks.

    ServiceMonitor.is_healthy() uses the tag to choose a health probe.
    """

    POSTGRES: str = "postgres"
    REDIS: str = "redis"
    CLICKHOUSE_NATIVE: str = "clickhouse_native"
    CLICKHOUSE_HTTP: str = "clickhouse_http"
    GENERIC: str = "generic"
# Port-forward definitions, one row per service:
#   (display name, ServiceType tag, local port, kubectl command)
# The local port must equal the left side of the LOCAL:REMOTE pair in the
# command, because main() builds monitors with the default host 127.0.0.1
# and health checks probe 127.0.0.1:<local port>.
# NOTE(review): the two clickhouse-adwiser rows pass `--address 0.0.0.0`,
# which (per kubectl docs) listens on all interfaces rather than loopback
# only -- confirm that exposing these forwards beyond localhost is intended.
services = [
    {"name": name, "type": kind, "port": local_port, "command": command}
    for name, kind, local_port, command in (
        (
            "RedisSaba",
            ServiceType.REDIS,
            63793,
            "kubectl port-forward svc/prod-saba-product-redis 63793:6379 -n prod-saba",
        ),
        (
            "ClickhouseAdwiser9001",
            ServiceType.CLICKHOUSE_NATIVE,
            9001,
            "kubectl port-forward svc/clickhouse-adwiser --address 0.0.0.0 9001:9000 -n prod-adwiser",
        ),
        (
            "ClickHouseAdwiser8127",
            ServiceType.CLICKHOUSE_HTTP,
            8127,
            "kubectl port-forward svc/clickhouse-adwiser --address 0.0.0.0 8127:8123 -n prod-adwiser",
        ),
        (
            "PostgresProdAdsApi",
            ServiceType.POSTGRES,
            5435,
            "kubectl port-forward svc/prod-ads-psql-postgresql-ha 5435:5432 -n prod-ads-api",
        ),
        (
            "RedisProdAdsApi",
            ServiceType.REDIS,
            16379,
            "kubectl port-forward svc/prod-adwiser-redis 16379:6379 -n prod-ads-api",
        ),
        (
            "PostgresProdSaba",
            ServiceType.POSTGRES,
            5430,
            "kubectl port-forward svc/prod-saba-pg17-psql-postgresql-ha 5430:5432 -n prod-saba",
        ),
        (
            "PostgresProdBusiness",
            ServiceType.POSTGRES,
            5443,
            "kubectl port-forward svc/prod-business-psql-postgresql-ha 5443:5432 -n prod-business",
        ),
        (
            "OLAP",
            ServiceType.CLICKHOUSE_HTTP,
            8123,
            "kubectl port-forward svc/clickhouse-prod-olap-clickhouse 8123:8123 -n prod-olap",
        ),
        (
            "OLAP-Native",
            ServiceType.CLICKHOUSE_NATIVE,
            9000,
            "kubectl port-forward svc/clickhouse-prod-olap-clickhouse 9000:9000 -n prod-olap",
        ),
        (
            "PostgresCore",
            ServiceType.POSTGRES,
            5440,
            "kubectl port-forward svc/prod-core-psql-postgresql-ha 5440:5432 -n prod-core-api",
        ),
        (
            "PostgresInsight",
            ServiceType.POSTGRES,
            5441,
            "kubectl port-forward svc/prod-insight-psql-postgresql-ha 5441:5432 -n prod-insight-api",
        ),
        (
            "RedisCreditService",
            ServiceType.REDIS,
            6380,
            "kubectl port-forward svc/prod-credit-service-redis-haproxy 6380:6379 -n prod-credit-service",
        ),
        (
            "PostgresOrder",
            ServiceType.POSTGRES,
            5439,
            "kubectl port-forward svc/prod-order-psql-postgresql-ha 5439:5432 -n prod-platform-order",
        ),
        (
            "PostgresAutomation",
            ServiceType.POSTGRES,
            5444,
            "kubectl port-forward svc/prod-automation-psql-postgresql-ha 5444:5432 -n prod-automation",
        ),
    )
]
def check_redis_ping(host: str, port: int, timeout: float = 1.0) -> bool:
    """Probe a Redis endpoint with an unauthenticated PING.

    No password is needed. Either of these replies proves a Redis server
    answered:
    ``+PONG`` (no auth configured) or ``-NOAUTH ...`` (auth required).

    Args:
        host: Address to connect to.
        port: TCP port to probe.
        timeout: Socket timeout in seconds for connect and recv.

    Returns:
        True if the first reply chunk starts with ``+PONG`` or ``-NOAUTH``.
        False on any socket error, an empty reply, or any other reply.
    """
    # RESP array holding one bulk string: *1\r\n$4\r\nPING\r\n
    ping_frame = b"*1\r\n$4\r\nPING\r\n"
    try:
        with socket.create_connection((host, port), timeout=timeout) as conn:
            conn.sendall(ping_frame)
            reply = conn.recv(1024)
    except OSError:
        return False
    # An empty reply (peer closed) matches neither prefix and yields False.
    return reply.startswith((b"+PONG", b"-NOAUTH"))
def check_postgres_sslrequest(host: str, port: int, timeout: float = 1.0) -> bool:
    """Probe a PostgreSQL endpoint with an SSLRequest packet.

    The packet is 8 bytes: a big-endian Int32 length (8) followed by the
    SSLRequest code 80877103. A PostgreSQL server answers with a single byte,
    ``S`` or ``N`` (TLS supported / not supported, per the PostgreSQL
    protocol docs). Either byte means PostgreSQL is listening. No
    credentials are required.

    Args:
        host: Address to connect to.
        port: TCP port to probe.
        timeout: Socket timeout in seconds for connect and recv.

    Returns:
        True if the first reply byte is ``S`` or ``N``; False otherwise,
        including on any socket error.
    """
    ssl_request = struct.pack("!II", 8, 80877103)
    try:
        with socket.create_connection((host, port), timeout=timeout) as conn:
            conn.sendall(ssl_request)
            first_byte = conn.recv(1)
    except OSError:
        return False
    return first_byte in {b"S", b"N"}
def check_clickhouse_http(host: str, port: int = 8123, timeout: float = 1.0) -> bool:
    """Health-check a ClickHouse HTTP endpoint with ``GET /ping``.

    Sends a minimal HTTP/1.1 request with ``Connection: close``. It then reads
    the reply until one of these happens:

    - the peer closes the socket;
    - a successful reply has been recognised;
    - 64 KiB have been read.

    Reading more than one chunk matters because headers and body may arrive
    in separate TCP segments.

    Args:
        host: Address to connect to (also sent as the Host header).
        port: TCP port of the ClickHouse HTTP interface.
        timeout: Socket timeout in seconds, applied to connect and each recv.

    Returns:
        True only if the status line carries code 200 and the body contains
        ``Ok.``. False on any socket error or timeout, and on any other reply.
    """
    request = (
        f"GET /ping HTTP/1.1\r\n"
        f"Host: {host}\r\n"
        f"Connection: close\r\n"
        f"\r\n"
    ).encode("ascii")
    max_bytes = 64 * 1024  # /ping replies are tiny; cap reading defensively

    def _is_ok(raw: bytes) -> bool:
        # Split headers from body; without the blank line the reply is incomplete.
        head, sep, body = raw.partition(b"\r\n\r\n")
        if not sep:
            return False
        status_parts = head.split(b"\r\n", 1)[0].split()
        return (
            len(status_parts) >= 2
            and status_parts[0].startswith(b"HTTP/")
            and status_parts[1] == b"200"
            and b"Ok." in body
        )

    response = bytearray()
    try:
        with socket.create_connection((host, port), timeout=timeout) as s:
            s.sendall(request)
            while len(response) < max_bytes:
                chunk = s.recv(4096)
                if not chunk:
                    break  # peer closed the connection (expected with Connection: close)
                response += chunk
                if _is_ok(bytes(response)):
                    break  # no need to wait for the close once success is confirmed
    except OSError:
        return False
    return _is_ok(bytes(response))
class ServiceMonitor:
    """Own one port-forward subprocess and track its health.

    ``status`` is one of ``"STOP"``, ``"CONNECTING"``, ``"OK"`` or ``"ERROR"``.
    Reads and writes of ``status``/``process`` go through ``self.lock``. The
    status display loop takes the same lock when it snapshots these fields.
    """

    def __init__(
        self,
        name: str,
        command: str,
        port: int,
        service_type: str = ServiceType.GENERIC,
        host: str = "127.0.0.1",
    ):
        """
        Args:
            name: Human-readable label used in log and status lines.
            command: Shell command line that starts the port-forward
                (run with ``shell=True``).
            port: Local port the forward listens on; health checks probe it and
                free_port() kills whatever listens on it.
            service_type: A ServiceType tag selecting the health probe.
            host: Address the health checks connect to.
        """
        self.name = name
        self.command = command
        self.port = port
        self.service_type = service_type
        self.host = host
        # None until start() launches a process that survives its first 2 s.
        self.process: "subprocess.Popen | None" = None
        self.status = "STOP"  # STOP, CONNECTING, OK, ERROR
        self.lock = threading.Lock()

    # === Low-level helpers ===

    def _tcp_ping(self, timeout: float = 1.0) -> bool:
        """Return True if a plain TCP connection to host:port succeeds."""
        try:
            with socket.create_connection((self.host, self.port), timeout=timeout):
                return True
        except OSError:
            return False

    def _check_postgres_banner(self, timeout: float = 1.0) -> bool:
        """Return True if a PostgreSQL server answers an SSLRequest on host:port.

        Not called within this file. It delegates to the module-level
        check_postgres_sslrequest() so the protocol probe exists in one place.
        """
        return check_postgres_sslrequest(self.host, self.port, timeout)

    def _terminate_process(self, proc: subprocess.Popen, timeout: float = 3) -> None:
        """Stop ``proc`` and reap it so no zombie is left behind.

        Calls terminate(), waits up to ``timeout`` seconds, then escalates to
        kill() and waits again. Errors are printed, not raised.
        """
        try:
            proc.terminate()
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            print(
                Fore.YELLOW
                + f"Force killing process for {self.name}"
                + Style.RESET_ALL
            )
            try:
                # Popen.kill() is the portable equivalent of SIGKILL; waiting
                # afterwards reaps the child.
                proc.kill()
                proc.wait(timeout=timeout)
            except ProcessLookupError:
                pass  # already gone between terminate() and kill()
            except subprocess.TimeoutExpired:
                print(
                    Fore.RED
                    + f"{self.name}: process {proc.pid} did not exit after kill()"
                    + Style.RESET_ALL
                )
        except Exception as e:
            print(
                Fore.RED + f"Error stopping process for {self.name}: {e}" + Style.RESET_ALL
            )

    def is_healthy(self) -> bool:
        """Run the health probe matching ``service_type``.

        - POSTGRES: SSLRequest probe (expects 'S' or 'N').
        - REDIS: RESP PING (accepts +PONG or -NOAUTH).
        - CLICKHOUSE_HTTP: GET /ping (expects status 200 and "Ok.").
        - Anything else, e.g. CLICKHOUSE_NATIVE and GENERIC: a plain TCP
          connect only.

        NOTE(review): a TCP connect to a local port-forward may succeed even
        when the remote backend is down. Confirm whether the TCP-only
        fallback is strong enough for the native ClickHouse services.
        """
        probes = {
            ServiceType.POSTGRES: check_postgres_sslrequest,
            ServiceType.REDIS: check_redis_ping,
            ServiceType.CLICKHOUSE_HTTP: check_clickhouse_http,
        }
        probe = probes.get(self.service_type)
        if probe is not None:
            return probe(self.host, self.port)
        return self._tcp_ping()

    def free_port(self):
        """Terminate every process LISTENING on our local TCP port.

        Only LISTEN sockets are considered. A process that merely holds a
        connection whose local port equals ``self.port`` is left alone. The
        current Python process is never targeted.

        Each owner gets terminate(), then kill() if it has not exited after
        5 s. Errors are printed, not raised. If no owner is found, nothing
        is printed.
        """
        try:
            owners = {
                conn.pid
                for conn in psutil.net_connections(kind="inet")
                if conn.laddr
                and conn.laddr.port == self.port
                and conn.status == psutil.CONN_LISTEN
                and conn.pid
                and conn.pid != os.getpid()
            }
        except Exception as e:
            print(Fore.RED + f"Error while freeing port {self.port}: {e}" + Style.RESET_ALL)
            return

        for pid in owners:
            try:
                proc = psutil.Process(pid)
                print(
                    Fore.RED
                    + f"Killing process {proc.name()} (PID {proc.pid}) on port {self.port}"
                    + Style.RESET_ALL
                )
                proc.terminate()
                try:
                    proc.wait(timeout=5)
                except psutil.TimeoutExpired:
                    proc.kill()
            except psutil.NoSuchProcess:
                print(
                    Fore.YELLOW
                    + f"Process already terminated for port {self.port}"
                    + Style.RESET_ALL
                )
            except Exception as e:
                print(
                    Fore.RED
                    + f"Error killing process on port {self.port}: {e}"
                    + Style.RESET_ALL
                )

    # === Lifecycle ===

    def start(self):
        """Launch the port-forward, making up to 5 attempts.

        Does nothing if the status is already OK or CONNECTING. A still-tracked
        previous process (e.g. one that failed its health check) is terminated
        first, so restarts never leave an orphaned port-forward running.

        On success the status stays CONNECTING, and check() promotes it to OK
        once the health probe passes. After 5 failed attempts the status is
        ERROR.
        """
        with self.lock:
            if self.status in ("OK", "CONNECTING"):
                return
            self.status = "CONNECTING"
            stale = self.process
            self.process = None

        if stale is not None:
            self._terminate_process(stale)

        max_attempts = 5
        for attempt in range(1, max_attempts + 1):
            # Before each attempt, kill whatever still listens on the port.
            self.free_port()
            try:
                proc = subprocess.Popen(
                    self.command,
                    shell=True,
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                )
            except Exception as e:
                print(Fore.RED + f"Exception starting {self.name}: {e}" + Style.RESET_ALL)
                with self.lock:
                    self.status = "ERROR"
                time.sleep(2)
                continue

            # Wait briefly so a process that dies immediately is detected here.
            time.sleep(2)
            if proc.poll() is None:
                # Process is up; the health check runs from the monitor thread.
                with self.lock:
                    self.process = proc
                print(
                    Fore.GREEN
                    + f"{self.name}: port-forward process started (waiting for health-check)."
                    + Style.RESET_ALL
                )
                return

            rc = proc.returncode
            print(
                Fore.RED
                + f"{self.name}: port-forward exited immediately with code {rc}. "
                f"Retrying {attempt}/{max_attempts}..."
                + Style.RESET_ALL
            )
            with self.lock:
                self.process = None
                self.status = "ERROR"
            time.sleep(2)

        print(
            Fore.RED
            + f"Failed to start {self.name} after {max_attempts} attempts."
            + Style.RESET_ALL
        )
        with self.lock:
            self.status = "ERROR"

    def stop(self):
        """Stop the tracked process (if any), mark STOP, and free the port.

        With no tracked process, only the status is set to STOP.
        """
        with self.lock:
            proc = self.process
        if not proc:
            with self.lock:
                self.status = "STOP"
            return
        try:
            self._terminate_process(proc)
        finally:
            with self.lock:
                self.process = None
                self.status = "STOP"
        # Something may still listen on the port after the stop (for example
        # a grandchild of the shell); clear it.
        self.free_port()

    def check(self):
        """Refresh ``status`` from the process state and the health probe.

        - No tracked process, or it has exited -> ERROR.
        - Process alive and is_healthy() passes -> OK.
        - Process alive but is_healthy() fails -> ERROR.

        The probe runs outside the lock because it performs network I/O.
        """
        with self.lock:
            proc = self.process
        if proc is None or proc.poll() is not None:
            new_status = "ERROR"
        else:
            new_status = "OK" if self.is_healthy() else "ERROR"
        with self.lock:
            self.status = new_status

    def restart(self):
        """Call start() when the status is ERROR or STOP; otherwise do nothing.

        Invoked periodically by the monitor thread.
        """
        with self.lock:
            status = self.status
        if status in ("ERROR", "STOP"):
            self.start()
def display_status(service_monitors: List[ServiceMonitor]):
    """Redraw one colour-coded status line per service every 2 seconds.

    Colours: OK is green, CONNECTING yellow, ERROR red, and anything else
    (i.e. STOP) blue. Each monitor's status/type/host/port are snapshotted
    under its lock.

    The loop exits only on KeyboardInterrupt. Any other exception is printed,
    followed by a 2 s pause, and the loop continues.
    """
    palette = {"OK": Fore.GREEN, "CONNECTING": Fore.YELLOW, "ERROR": Fore.RED}
    while True:
        try:
            rows = []
            for monitor in service_monitors:
                with monitor.lock:
                    state, kind, addr, port_no = (
                        monitor.status,
                        monitor.service_type,
                        monitor.host,
                        monitor.port,
                    )
                colour = palette.get(state, Fore.BLUE)
                rows.append(
                    f"{colour}{monitor.name} [{kind}] on {addr}:{port_no}: {state}"
                    f"{Style.RESET_ALL}"
                )
            # ANSI escape: move cursor home and clear to end of screen.
            sys.stdout.write("\033[H\033[J")
            sys.stdout.write("\n".join(rows) + "\n")
            sys.stdout.flush()
            time.sleep(2)
        except KeyboardInterrupt:
            break
        except Exception as e:
            print(Fore.RED + f"Error in display_status: {e}" + Style.RESET_ALL)
            time.sleep(2)
def monitor_service(service_monitor: ServiceMonitor):
    """Health-check and (re)start one service every 5 seconds, forever.

    Each cycle calls check() to refresh ``status``. It then calls restart(),
    which calls start() when the status is ERROR or STOP. Exceptions raised
    by either are printed and the loop keeps going; this function never
    returns.
    """
    poll_interval = 5
    while True:
        try:
            service_monitor.check()
            service_monitor.restart()
        except Exception as e:
            print(
                Fore.RED
                + f"Monitor thread error for {service_monitor.name}: {e}"
                + Style.RESET_ALL
            )
        time.sleep(poll_interval)
def stop_all_services(service_monitors: List[ServiceMonitor]):
    """Call stop() on every monitor in order.

    A failure in one service is printed and does not prevent the remaining
    services from being stopped.
    """
    for monitor in service_monitors:
        try:
            monitor.stop()
        except Exception as err:
            print(Fore.RED + f"Error stopping {monitor.name}: {err}" + Style.RESET_ALL)
def main():
    """Start one daemon monitor thread per configured service, then show live status.

    Both SIGINT (Ctrl+C) and SIGTERM (e.g. ``kill <pid>``) run
    stop_all_services() before exiting. Without the SIGTERM handler, that
    signal's default action would end this process without stopping the
    port-forwards it started.
    """
    service_monitors = [
        ServiceMonitor(
            s["name"],
            s["command"],
            s["port"],
            s.get("type", ServiceType.GENERIC),
        )
        for s in services
    ]

    def signal_handler(sig, frame):
        print(Fore.YELLOW + "\nStopping all services..." + Style.RESET_ALL)
        stop_all_services(service_monitors)
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    for monitor in service_monitors:
        # Daemon threads so they do not keep the interpreter alive after exit.
        threading.Thread(target=monitor_service, args=(monitor,), daemon=True).start()

    display_status(service_monitors)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment