Last active
March 13, 2026 15:31
-
-
Save LenkaSeg/fd5fc87370e98b063ef33fdecd0f43b0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""Watch a Kubernetes Secret for changes and trigger runner reconfiguration.

Designed to run as a non-root systemd user service inside a KubeVirt VM.
Uses the official Kubernetes Python client.

Configuration is via environment variables (see defaults below).
"""
import base64
import contextlib
import logging
import os
import signal
import subprocess
import sys
import tempfile
import time

from kubernetes import client, watch
def _env_int(name, default):
    """Return environment variable *name* parsed as an int.

    An unset or empty/whitespace-only variable yields *default*. Any other
    value that is not a valid integer raises ``ValueError`` with a message
    naming the offending variable.
    """
    raw = os.environ.get(name, "").strip()
    if not raw:
        return default
    try:
        return int(raw)
    except ValueError:
        raise ValueError(
            f"Environment variable {name} must be an integer, got {raw!r}"
        ) from None


# --- Configuration (all overridable through the environment) ---------------
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()
TOKEN_PATH = os.environ.get("TOKEN_PATH", "/mnt/sa-token/token")
CA_CERT_PATH = os.environ.get("CA_CERT_PATH", "/etc/pki/tls/certs/ca-bundle.crt")
NAMESPACE = os.environ.get("NAMESPACE", "forgejo")
SECRET_NAME = os.environ.get("SECRET_NAME", "forgejo-runner-config")
API_SERVER = os.environ.get("API_SERVER", "https://kubernetes.default.svc")
OUTPUT_PATH = os.environ.get("OUTPUT_PATH", "/home/fedora/forgejo-runners-vars.yaml")
ON_CHANGE_CMD = os.environ.get("ON_CHANGE_CMD", "")
WATCH_TIMEOUT = _env_int("WATCH_TIMEOUT", 300)  # seconds per watch request
RETRY_INTERVAL = _env_int("RETRY_INTERVAL", 5)  # seconds to sleep after an error
ON_CHANGE_TIMEOUT = _env_int("ON_CHANGE_TIMEOUT", 600)  # seconds for ON_CHANGE_CMD

# Resolve the level name; anything that is not one of logging's integer level
# constants (unknown name, or a non-level attribute) falls back to INFO.
_level = getattr(logging, LOG_LEVEL, logging.INFO)
if not isinstance(_level, int):
    _level = logging.INFO

logging.basicConfig(
    level=_level,
    format="%(asctime)s [%(levelname)s] %(message)s",
    stream=sys.stdout,
)
log = logging.getLogger("config-watcher")

# Set to True by handle_signal(); polled by the watch loop in main().
shutdown_requested = False
def handle_signal(signum, frame):
    """Signal handler: log the signal's name and request a graceful shutdown.

    Only sets the module-level ``shutdown_requested`` flag; ``frame`` is
    accepted because the ``signal`` module passes it, but it is not used.
    """
    global shutdown_requested
    signal_name = signal.Signals(signum).name
    log.info("Received signal %s, shutting down gracefully", signal_name)
    shutdown_requested = True
# SIGTERM and SIGINT are handled identically: both go through handle_signal.
for _signum in (signal.SIGTERM, signal.SIGINT):
    signal.signal(_signum, handle_signal)
def build_api_client():
    """Build a Kubernetes API client using the mounted SA token.

    The token is re-read from TOKEN_PATH on every call, so calling this again
    picks up a changed token file.

    Returns:
        A ``client.CoreV1Api`` bound to API_SERVER and authenticating with the
        bearer token.

    Raises:
        OSError: if the token file cannot be read.
    """
    # Context manager so the token file handle is closed deterministically.
    with open(TOKEN_PATH, encoding="utf-8") as token_file:
        token = token_file.read().strip()

    configuration = client.Configuration()
    configuration.host = API_SERVER
    configuration.api_key = {"authorization": f"Bearer {token}"}

    if os.path.exists(CA_CERT_PATH):
        configuration.ssl_ca_cert = CA_CERT_PATH
        configuration.verify_ssl = True
    else:
        # Existing fallback kept for compatibility, but made visible: without
        # the CA bundle the bearer token is sent over an unverified connection.
        log.warning(
            "CA bundle %s not found; TLS certificate verification is DISABLED",
            CA_CERT_PATH,
        )
        configuration.ssl_ca_cert = None
        configuration.verify_ssl = False

    return client.CoreV1Api(client.ApiClient(configuration))
def decode_secret_data(secret):
    """Return the Secret's ``data`` mapping with each value base64-decoded to text.

    A Secret whose ``data`` is missing or empty yields an empty dict.
    """
    encoded = secret.data
    if not encoded:
        return {}

    decoded = {}
    for key, value in encoded.items():
        decoded[key] = base64.b64decode(value).decode()
    return decoded
def write_config(data):
    """Write decoded Secret data to OUTPUT_PATH atomically.

    The values of *data* are joined with newlines, in the mapping's iteration
    order; the keys are not written. The content goes to a temporary file in
    the same directory as OUTPUT_PATH, is flushed to disk, and is then moved
    over OUTPUT_PATH, so readers see either the old or the new file.

    Args:
        data: mapping of key -> decoded string value.

    Raises:
        OSError: if the temporary file cannot be created, written or moved.
    """
    content = "\n".join(data.values())
    # Real byte count for the log line (len(content) would count characters).
    size_bytes = len(content.encode("utf-8"))

    # dirname() is "" for a bare filename; use the current directory then.
    output_dir = os.path.dirname(OUTPUT_PATH) or "."
    # mkstemp creates the file accessible only by the creating user.
    fd, tmp_path = tempfile.mkstemp(dir=output_dir, prefix=".runner-config-")
    try:
        # Explicit encoding: do not depend on the service's locale.
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(content)
            # Push the data to disk before the rename so a crash cannot leave
            # an empty or truncated file at OUTPUT_PATH.
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, OUTPUT_PATH)
    except BaseException:
        # Best-effort cleanup; never let it mask the original error.
        with contextlib.suppress(OSError):
            os.unlink(tmp_path)
        raise
    log.info("Config written to %s (%d bytes)", OUTPUT_PATH, size_bytes)
def run_on_change():
    """Execute the on-change command if configured.

    Runs ON_CHANGE_CMD through the shell with a limit of ON_CHANGE_TIMEOUT
    seconds and logs the outcome. Failures, timeouts and unexpected errors are
    logged and never raised, so a broken command cannot stop the watcher.
    Does nothing when ON_CHANGE_CMD is empty.
    """
    if not ON_CHANGE_CMD:
        log.debug("No ON_CHANGE_CMD configured")
        return

    log.info("Executing: %s", ON_CHANGE_CMD)
    try:
        # shell=True is deliberate: ON_CHANGE_CMD is an operator-supplied
        # command line taken from this service's own environment.
        result = subprocess.run(
            ON_CHANGE_CMD,
            shell=True,
            capture_output=True,
            text=True,
            # Undecodable output must not turn a finished command into an error.
            errors="replace",
            timeout=ON_CHANGE_TIMEOUT,
        )
    except subprocess.TimeoutExpired:
        log.error("On-change command timed out after %ds", ON_CHANGE_TIMEOUT)
        return
    except Exception as e:
        # log.exception keeps the traceback for diagnosis.
        log.exception("On-change command error: %s", e)
        return

    stdout = result.stdout.strip()
    if result.returncode == 0:
        log.info("On-change command succeeded")
        if stdout:
            log.debug("stdout: %s", stdout)
        return

    log.error(
        "On-change command failed (exit %d)\nstderr: %s",
        result.returncode,
        result.stderr.strip(),
    )
    # Some tools report their errors on stdout; include it on failure too.
    if stdout:
        log.error("stdout: %s", stdout)
def _apply_secret(secret, event_type):
    """Decode *secret*, write it to OUTPUT_PATH and run the on-change command.

    Returns the resourceVersion that was applied.
    """
    rv = secret.metadata.resource_version
    data = decode_secret_data(secret)
    log.info(
        "Secret %s (resourceVersion=%s, keys=%s)",
        event_type,
        rv,
        list(data.keys()),
    )
    write_config(data)
    run_on_change()
    return rv


def main():
    """Fetch the Secret once, apply it, then watch it for changes until shutdown.

    Startup: builds the API client, reads the Secret, writes it to OUTPUT_PATH
    and runs the on-change command. Errors during startup propagate.

    Watch loop: opens a watch of at most WATCH_TIMEOUT seconds on the single
    Secret and re-opens it when it ends. API and other errors are logged and
    retried after RETRY_INTERVAL seconds; on HTTP 401 the client is rebuilt so
    the token file is re-read.

    The shutdown flag is checked before each watch is opened and when an event
    arrives, so a shutdown request takes effect at the next event or when the
    current watch ends.
    """
    log.info("Starting forgejo-runner config watcher")
    log.info("API server: %s", API_SERVER)
    log.info("Namespace: %s, Secret: %s", NAMESPACE, SECRET_NAME)
    log.info("Output: %s", OUTPUT_PATH)
    log.info("Watch timeout: %ds, Retry interval: %ds", WATCH_TIMEOUT, RETRY_INTERVAL)
    if ON_CHANGE_CMD:
        log.info("On-change command: %s", ON_CHANGE_CMD)

    v1 = build_api_client()
    log.info("Token loaded from %s", TOKEN_PATH)

    # Initial fetch
    secret = v1.read_namespaced_secret(SECRET_NAME, NAMESPACE)
    data = decode_secret_data(secret)
    log.info(
        "Initial fetch OK (resourceVersion=%s, keys=%s)",
        secret.metadata.resource_version,
        list(data.keys()),
    )
    write_config(data)
    run_on_change()
    # resourceVersion of the content currently on disk; used to recognise
    # changes that happened while no watch was open.
    applied_rv = secret.metadata.resource_version

    # Watch loop
    w = watch.Watch()
    while not shutdown_requested:
        try:
            log.debug("Opening watch")
            stream = w.stream(
                v1.list_namespaced_secret,
                NAMESPACE,
                field_selector=f"metadata.name={SECRET_NAME}",
                timeout_seconds=WATCH_TIMEOUT,
            )
            for event in stream:
                if shutdown_requested:
                    w.stop()
                    break

                event_type = event["type"]
                secret = event["object"]
                rv = secret.metadata.resource_version

                # Each watch is opened without a resourceVersion, so the
                # current state arrives as ADDED. If the Secret changed (or
                # was re-created) between watches, that ADDED event carries a
                # resourceVersion we have not applied and must be treated as
                # a change.
                unseen_added = event_type == "ADDED" and rv != applied_rv
                if event_type == "MODIFIED" or unseen_added:
                    applied_rv = _apply_secret(secret, event_type)
                elif event_type == "DELETED":
                    log.warning("Secret DELETED (resourceVersion=%s)", rv)
                elif event_type == "BOOKMARK":
                    log.debug("Bookmark (resourceVersion=%s)", rv)
                elif event_type == "ADDED":
                    log.debug("ADDED event (resourceVersion=%s)", rv)
                else:
                    log.info("Event %s (resourceVersion=%s)", event_type, rv)
            log.debug("Watch stream ended, reconnecting")
        except client.exceptions.ApiException as e:
            if e.status == 401:
                log.error("HTTP 401: token may be invalid or revoked")
                v1 = build_api_client()
                log.info("Rebuilt API client with re-read token")
            else:
                # %s rather than %d: status is not guaranteed to be an int.
                log.error("API error (%s): %s", e.status, e.reason)
            time.sleep(RETRY_INTERVAL)
        except Exception as e:
            log.error("Watch error: %s", e)
            time.sleep(RETRY_INTERVAL)

    log.info("Watcher stopped")
# Start the watcher only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment