Skip to content

Instantly share code, notes, and snippets.

@moosavimaleki
Created December 3, 2025 07:40
Show Gist options
  • Select an option

  • Save moosavimaleki/d9811ab9f5127d04dde305080a2e701e to your computer and use it in GitHub Desktop.

Select an option

Save moosavimaleki/d9811ab9f5127d04dde305080a2e701e to your computer and use it in GitHub Desktop.
port-forward
#!/home/h-mousavi/.pyenv/shims/python
import os
import signal
import socket
import subprocess
import sys
import threading
import time
from typing import List
import psutil
from colorama import Fore, Style, init
import socket
import struct
# Initialise colorama once at import time (autoreset=True).
init(autoreset=True)
class ServiceType:
    """String tags that select which health check a ServiceMonitor runs."""

    # Protocol-level probes exist for these three.
    POSTGRES: str = "postgres"
    REDIS: str = "redis"
    CLICKHOUSE_HTTP: str = "clickhouse_http"
    # These two are only checked with a plain TCP connect.
    CLICKHOUSE_NATIVE: str = "clickhouse_native"
    GENERIC: str = "generic"
# Port-forward definitions as (name, service type, local port, command).
# The local port is what the monitor watches; keep it equal to the LOCAL side
# of the "LOCAL:REMOTE" mapping inside the command.
# NOTE(review): the two clickhouse-adwiser forwards pass --address 0.0.0.0,
# which listens on all interfaces rather than only localhost -- confirm that
# exposing these production ports beyond this machine is intended.
_SERVICE_TABLE = (
    ("RedisSaba", ServiceType.REDIS, 63793,
     "kubectl port-forward svc/prod-saba-product-redis 63793:6379 -n prod-saba"),
    ("ClickhouseAdwiser9001", ServiceType.CLICKHOUSE_NATIVE, 9001,
     "kubectl port-forward svc/clickhouse-adwiser --address 0.0.0.0 9001:9000 -n prod-adwiser"),
    ("ClickHouseAdwiser8127", ServiceType.CLICKHOUSE_HTTP, 8127,
     "kubectl port-forward svc/clickhouse-adwiser --address 0.0.0.0 8127:8123 -n prod-adwiser"),
    ("PostgresProdAdsApi", ServiceType.POSTGRES, 5435,
     "kubectl port-forward svc/prod-ads-psql-postgresql-ha 5435:5432 -n prod-ads-api"),
    ("RedisProdAdsApi", ServiceType.REDIS, 16379,
     "kubectl port-forward svc/prod-adwiser-redis 16379:6379 -n prod-ads-api"),
    ("PostgresProdSaba", ServiceType.POSTGRES, 5430,
     "kubectl port-forward svc/prod-saba-pg17-psql-postgresql-ha 5430:5432 -n prod-saba"),
    ("PostgresProdBusiness", ServiceType.POSTGRES, 5443,
     "kubectl port-forward svc/prod-business-psql-postgresql-ha 5443:5432 -n prod-business"),
    ("OLAP", ServiceType.CLICKHOUSE_HTTP, 8123,
     "kubectl port-forward svc/clickhouse-prod-olap-clickhouse 8123:8123 -n prod-olap"),
    ("OLAP-Native", ServiceType.CLICKHOUSE_NATIVE, 9000,
     "kubectl port-forward svc/clickhouse-prod-olap-clickhouse 9000:9000 -n prod-olap"),
    ("PostgresCore", ServiceType.POSTGRES, 5440,
     "kubectl port-forward svc/prod-core-psql-postgresql-ha 5440:5432 -n prod-core-api"),
    ("PostgresInsight", ServiceType.POSTGRES, 5441,
     "kubectl port-forward svc/prod-insight-psql-postgresql-ha 5441:5432 -n prod-insight-api"),
    ("RedisCreditService", ServiceType.REDIS, 6380,
     "kubectl port-forward svc/prod-credit-service-redis-haproxy 6380:6379 -n prod-credit-service"),
    ("PostgresOrder", ServiceType.POSTGRES, 5439,
     "kubectl port-forward svc/prod-order-psql-postgresql-ha 5439:5432 -n prod-platform-order"),
    ("PostgresAutomation", ServiceType.POSTGRES, 5444,
     "kubectl port-forward svc/prod-automation-psql-postgresql-ha 5444:5432 -n prod-automation"),
)

# Expanded into the dict shape that main() consumes.
services = [
    {"name": svc_name, "type": svc_type, "port": local_port, "command": cmd}
    for svc_name, svc_type, local_port, cmd in _SERVICE_TABLE
]
def check_redis_ping(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a Redis server answers PING on host:port.

    No password is required: a ``-NOAUTH`` error reply still proves that
    Redis is listening, so it counts as healthy just like ``+PONG``.
    Connection errors, an empty reply, or any other reply give False.
    """
    # RESP array holding the single bulk string "PING".
    ping_frame = b"*1\r\n$4\r\nPING\r\n"
    try:
        with socket.create_connection((host, port), timeout=timeout) as conn:
            conn.sendall(ping_frame)
            reply = conn.recv(1024)
    except OSError:
        return False
    # An empty reply means the peer closed the connection without answering.
    return bool(reply) and reply.startswith((b"+PONG", b"-NOAUTH"))
def check_postgres_sslrequest(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a PostgreSQL server answers an SSLRequest on host:port.

    The SSLRequest packet (length 8, request code 80877103) needs no
    credentials. A Postgres server answers it with a single byte, ``S`` or
    ``N``, and either one means the server is alive. Connection errors or
    any other first byte give False.
    """
    ssl_request = struct.pack("!I", 8) + struct.pack("!I", 80877103)
    try:
        with socket.create_connection((host, port), timeout=timeout) as conn:
            conn.sendall(ssl_request)
            first_byte = conn.recv(1)
    except OSError:
        return False
    return first_byte == b"S" or first_byte == b"N"
def check_clickhouse_http(host: str, port: int = 8123, timeout: float = 1.0) -> bool:
    """Return True if ClickHouse's HTTP interface answers ``GET /ping``.

    Healthy means the response status line carries code 200 and the body
    contains ``Ok.``. The request asks for ``Connection: close``. The reply is
    read until the server closes the connection, a read times out, or 64 KiB
    have arrived, so a reply split across several TCP segments is still seen.

    Args:
        host: Address to connect to.
        port: ClickHouse HTTP port.
        timeout: Seconds allowed for connecting and for each individual read.

    Returns:
        True when healthy. False on any connection error, an empty reply, or
        an unexpected status/body.
    """
    request = (
        f"GET /ping HTTP/1.1\r\n"
        f"Host: {host}\r\n"
        f"Connection: close\r\n"
        f"\r\n"
    ).encode("ascii")
    max_bytes = 64 * 1024  # /ping replies are tiny; cap reads defensively
    chunks = []
    received = 0
    try:
        with socket.create_connection((host, port), timeout=timeout) as s:
            s.sendall(request)
            while received < max_bytes:
                try:
                    chunk = s.recv(4096)
                except socket.timeout:
                    # Evaluate whatever arrived before the server went quiet.
                    break
                if not chunk:  # server closed the connection: reply complete
                    break
                chunks.append(chunk)
                received += len(chunk)
    except OSError:
        return False

    response = b"".join(chunks)
    if not response:
        return False
    head, _, body = response.partition(b"\r\n\r\n")
    # Status line looks like b"HTTP/1.1 200 OK"; the code is the second token.
    status_parts = head.split(b"\r\n", 1)[0].split()
    return len(status_parts) >= 2 and status_parts[1] == b"200" and b"Ok." in body
class ServiceMonitor:
    """Run one ``kubectl port-forward`` command and track whether it works.

    Attributes:
        name: Label used in log and status output.
        command: Shell command line, started with ``shell=True``.
        port: Local port the command listens on; the health checks and
            ``free_port`` target it.
        service_type: A ``ServiceType`` value choosing the health check.
        host: Address the health checks connect to.
        process: The running ``subprocess.Popen`` handle, or None.
        status: One of "STOP", "CONNECTING", "OK", "ERROR".
        lock: Guards ``status`` and ``process`` across threads.
    """

    def __init__(
        self,
        name: str,
        command: str,
        port: int,
        service_type: str = ServiceType.GENERIC,
        host: str = "127.0.0.1",
    ):
        self.name = name
        self.command = command
        self.port = port
        self.service_type = service_type
        self.host = host
        self.process: "subprocess.Popen | None" = None
        self.status = "STOP"  # STOP, CONNECTING, OK, ERROR
        self.lock = threading.Lock()

    # === Low-level helpers ===

    def _tcp_ping(self, timeout: float = 1.0) -> bool:
        """Return True if a plain TCP connection to host:port succeeds."""
        try:
            with socket.create_connection((self.host, self.port), timeout=timeout):
                return True
        except OSError:
            return False

    def _check_postgres_banner(self, timeout: float = 1.0) -> bool:
        """Probe for a Postgres server on host:port without credentials.

        Sends an SSLRequest packet and treats a one-byte ``S`` or ``N`` reply
        as evidence that Postgres is listening behind the port. ``is_healthy``
        does not call this (it uses ``check_postgres_sslrequest``).
        """
        import struct
        try:
            with socket.create_connection((self.host, self.port), timeout=timeout) as sock:
                # SSLRequest: length=8, code=80877103
                payload = struct.pack("!I", 8) + struct.pack("!I", 80877103)
                sock.sendall(payload)
                resp = sock.recv(1)
                return resp in (b"S", b"N")
        except OSError:
            return False

    def _terminate_process(self, proc: subprocess.Popen, timeout: float = 3.0) -> None:
        """Send SIGTERM to *proc*, escalate to SIGKILL after *timeout*, and reap it.

        Raises:
            subprocess.TimeoutExpired: if the process survives even SIGKILL
                for *timeout* seconds.
        """
        try:
            proc.terminate()
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            print(
                Fore.YELLOW
                + f"Force killing process for {self.name}"
                + Style.RESET_ALL
            )
            proc.kill()
            # Reap the killed child so it does not linger as a zombie.
            proc.wait(timeout=timeout)

    def is_healthy(self) -> bool:
        """Run the health check that matches ``service_type``.

        Postgres, Redis and ClickHouse-HTTP services get a protocol-level
        probe. Every other type (ClickHouse native, generic) only needs a
        successful TCP connect to host:port.
        """
        if self.service_type == ServiceType.POSTGRES:
            return bool(check_postgres_sslrequest(self.host, self.port))
        if self.service_type == ServiceType.REDIS:
            return bool(check_redis_ping(self.host, self.port))
        if self.service_type == ServiceType.CLICKHOUSE_HTTP:
            return bool(check_clickhouse_http(self.host, self.port))
        return self._tcp_ping()

    def free_port(self):
        """Kill every process that is LISTENING on our local port.

        Only listening sockets are considered, so a process whose outgoing
        connection merely uses the same local port number is left alone, and
        this script's own PID is never targeted. Freeing the port is
        best-effort: errors are printed, not raised.
        """
        try:
            owner_pids = {
                conn.pid
                for conn in psutil.net_connections(kind="inet")
                if conn.pid
                and conn.laddr
                and conn.laddr.port == self.port
                and conn.status == psutil.CONN_LISTEN
            }
        except Exception as e:
            print(Fore.RED + f"Error while freeing port {self.port}: {e}" + Style.RESET_ALL)
            return
        owner_pids.discard(os.getpid())
        # No owner found -> the port was already free; nothing to report.
        for pid in sorted(owner_pids):
            try:
                proc = psutil.Process(pid)
                print(
                    Fore.RED
                    + f"Killing process {proc.name()} (PID {proc.pid}) on port {self.port}"
                    + Style.RESET_ALL
                )
                proc.terminate()
                try:
                    proc.wait(timeout=5)
                except psutil.TimeoutExpired:
                    proc.kill()
                    proc.wait(timeout=5)
            except psutil.NoSuchProcess:
                print(
                    Fore.YELLOW
                    + f"Process already terminated for port {self.port}"
                    + Style.RESET_ALL
                )
            except Exception as e:
                print(
                    Fore.RED
                    + f"Error killing process on port {self.port}: {e}"
                    + Style.RESET_ALL
                )

    # === Lifecycle ===

    def start(self):
        """Start ``kubectl port-forward`` for this service, retrying up to 5 times.

        Does nothing if the service is already OK or CONNECTING. A child that
        is still alive after 2 seconds is kept and the status stays CONNECTING
        until ``check()`` sees a healthy backend. After 5 failed attempts the
        status is ERROR.
        """
        with self.lock:
            if self.status in ("OK", "CONNECTING"):
                return
            self.status = "CONNECTING"
            stale = self.process
            self.process = None

        # A previous child that is still running (e.g. alive but failing its
        # health check) would otherwise be orphaned by the replacement below.
        if stale is not None and stale.poll() is None:
            try:
                self._terminate_process(stale)
            except Exception as e:
                print(
                    Fore.RED + f"Error stopping process for {self.name}: {e}" + Style.RESET_ALL
                )

        max_attempts = 5
        for attempt in range(1, max_attempts + 1):
            # Before every attempt, evict whatever is listening on the port.
            self.free_port()
            try:
                proc = subprocess.Popen(
                    self.command,
                    shell=True,
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                )
            except Exception as e:
                print(Fore.RED + f"Exception starting {self.name}: {e}" + Style.RESET_ALL)
                with self.lock:
                    self.status = "ERROR"
                time.sleep(2)
                continue

            # Give the process a moment so an immediate failure is detected.
            time.sleep(2)
            if proc.poll() is None:
                # Process is up; the monitor thread performs the health check.
                with self.lock:
                    self.process = proc
                print(
                    Fore.GREEN
                    + f"{self.name}: port-forward process started (waiting for health-check)."
                    + Style.RESET_ALL
                )
                return

            print(
                Fore.RED
                + f"{self.name}: port-forward exited immediately with code {proc.returncode}. "
                f"Retrying {attempt}/{max_attempts}..."
                + Style.RESET_ALL
            )
            with self.lock:
                self.process = None
                self.status = "ERROR"
            time.sleep(2)

        print(
            Fore.RED
            + f"Failed to start {self.name} after {max_attempts} attempts."
            + Style.RESET_ALL
        )
        with self.lock:
            self.status = "ERROR"

    def stop(self):
        """Stop the kubectl process, mark the service STOP and free the port."""
        with self.lock:
            proc = self.process
        if proc is None:
            with self.lock:
                self.status = "STOP"
            return
        try:
            self._terminate_process(proc)
        except Exception as e:
            print(
                Fore.RED + f"Error stopping process for {self.name}: {e}" + Style.RESET_ALL
            )
        finally:
            with self.lock:
                self.process = None
                self.status = "STOP"
        # Even after stopping, evict anything still listening on the port.
        self.free_port()

    def check(self):
        """Refresh ``status`` from the child process and the backend's health.

        - child missing or exited               -> ERROR
        - child alive and health check passes   -> OK
        - child alive but health check fails    -> ERROR
        """
        with self.lock:
            proc = self.process
        if proc is None or proc.poll() is not None:
            with self.lock:
                self.status = "ERROR"
            return
        healthy = self.is_healthy()
        with self.lock:
            self.status = "OK" if healthy else "ERROR"

    def restart(self):
        """Start the service if its status is ERROR or STOP.

        Called periodically by the monitor thread.
        """
        with self.lock:
            status = self.status
        if status in ("ERROR", "STOP"):
            self.start()
def display_status(service_monitors: List[ServiceMonitor]):
    """Redraw a colour-coded status line per monitor every 2 seconds.

    Loops forever and returns only on KeyboardInterrupt. Any other Exception
    is printed and the loop resumes after a 2 second pause.
    """
    # STOP (or any unknown status) falls back to blue.
    status_colours = {
        "OK": Fore.GREEN,
        "CONNECTING": Fore.YELLOW,
        "ERROR": Fore.RED,
    }
    while True:
        try:
            rendered = []
            for monitor in service_monitors:
                # Snapshot all fields under the lock so each line is consistent.
                with monitor.lock:
                    snapshot = (
                        monitor.status,
                        monitor.service_type,
                        monitor.host,
                        monitor.port,
                    )
                state, kind, addr, port_no = snapshot
                colour = status_colours.get(state, Fore.BLUE)
                text = f"{monitor.name} [{kind}] on {addr}:{port_no}: {state}"
                rendered.append(colour + text + Style.RESET_ALL)
            sys.stdout.write("\033[H\033[J")  # move cursor home, clear screen
            sys.stdout.write("\n".join(rendered) + "\n")
            sys.stdout.flush()
            time.sleep(2)
        except KeyboardInterrupt:
            break
        except Exception as e:
            print(Fore.RED + f"Error in display_status: {e}" + Style.RESET_ALL)
            time.sleep(2)
def monitor_service(service_monitor: ServiceMonitor):
    """Forever: refresh the service's status, restart it if needed, wait 5 s.

    Any Exception raised by a cycle is printed and the loop continues.
    """
    while True:
        try:
            # check() updates the status; restart() then starts the service
            # when that status is ERROR or STOP.
            service_monitor.check()
            service_monitor.restart()
        except Exception as e:
            print(
                Fore.RED
                + f"Monitor thread error for {service_monitor.name}: {e}"
                + Style.RESET_ALL
            )
        time.sleep(5)
def stop_all_services(service_monitors: List[ServiceMonitor]):
    """Call stop() on every monitor; a failure is printed, not raised,
    so the remaining monitors are still stopped."""
    for monitor in service_monitors:
        try:
            monitor.stop()
        except Exception as err:
            message = f"Error stopping {monitor.name}: {err}"
            print(Fore.RED + message + Style.RESET_ALL)
def main():
    """Start one monitor thread per configured service and show live status.

    SIGINT (Ctrl+C) and SIGTERM (a plain ``kill``) both stop every
    port-forward before exiting, so terminating the script does not leave
    kubectl children holding the local ports.
    """
    service_monitors = [
        ServiceMonitor(
            s["name"],
            s["command"],
            s["port"],
            s.get("type", ServiceType.GENERIC),
        )
        for s in services
    ]

    def signal_handler(sig, frame):
        print(Fore.YELLOW + "\nStopping all services..." + Style.RESET_ALL)
        stop_all_services(service_monitors)
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    # Daemon threads: they must not keep the interpreter alive on exit.
    for monitor in service_monitors:
        threading.Thread(target=monitor_service, args=(monitor,), daemon=True).start()

    # Runs in the main thread until a signal handler exits the process.
    display_status(service_monitors)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment