Skip to content

Instantly share code, notes, and snippets.

@LenkaSeg
Last active March 13, 2026 15:31
Show Gist options
  • Select an option

  • Save LenkaSeg/fd5fc87370e98b063ef33fdecd0f43b0 to your computer and use it in GitHub Desktop.

Select an option

Save LenkaSeg/fd5fc87370e98b063ef33fdecd0f43b0 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""Watch a Kubernetes Secret for changes and trigger runner reconfiguration.
Designed to run as a non-root systemd user service inside a KubeVirt VM.
Uses the official Kubernetes Python client.
Configuration is via environment variables (see defaults below).
"""
import base64
import logging
import os
import signal
import subprocess
import sys
import tempfile
import time
from kubernetes import client, watch
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()
TOKEN_PATH = os.environ.get("TOKEN_PATH", "/mnt/sa-token/token")
CA_CERT_PATH = os.environ.get("CA_CERT_PATH", "/etc/pki/tls/certs/ca-bundle.crt")
NAMESPACE = os.environ.get("NAMESPACE", "forgejo")
SECRET_NAME = os.environ.get("SECRET_NAME", "forgejo-runner-config")
API_SERVER = os.environ.get("API_SERVER", "https://kubernetes.default.svc")
OUTPUT_PATH = os.environ.get("OUTPUT_PATH", "/home/fedora/forgejo-runners-vars.yaml")
ON_CHANGE_CMD = os.environ.get("ON_CHANGE_CMD", "")
WATCH_TIMEOUT = int(os.environ.get("WATCH_TIMEOUT", "300"))
RETRY_INTERVAL = int(os.environ.get("RETRY_INTERVAL", "5"))
ON_CHANGE_TIMEOUT = int(os.environ.get("ON_CHANGE_TIMEOUT", "600"))
logging.basicConfig(
level=getattr(logging, LOG_LEVEL, logging.INFO),
format="%(asctime)s [%(levelname)s] %(message)s",
stream=sys.stdout,
)
log = logging.getLogger("config-watcher")
shutdown_requested = False
def handle_signal(signum, frame):
global shutdown_requested
log.info("Received signal %s, shutting down gracefully", signal.Signals(signum).name)
shutdown_requested = True
signal.signal(signal.SIGTERM, handle_signal)
signal.signal(signal.SIGINT, handle_signal)
def build_api_client():
"""Build a Kubernetes API client using the mounted SA token."""
token = open(TOKEN_PATH).read().strip()
configuration = client.Configuration()
configuration.host = API_SERVER
configuration.api_key = {"authorization": f"Bearer {token}"}
configuration.ssl_ca_cert = CA_CERT_PATH if os.path.exists(CA_CERT_PATH) else None
configuration.verify_ssl = configuration.ssl_ca_cert is not None
return client.CoreV1Api(client.ApiClient(configuration))
def decode_secret_data(secret):
"""Decode base64-encoded Secret data fields."""
if not secret.data:
return {}
return {k: base64.b64decode(v).decode() for k, v in secret.data.items()}
def write_config(data):
"""Write decoded Secret data to OUTPUT_PATH atomically."""
content = "\n".join(data.values())
output_dir = os.path.dirname(OUTPUT_PATH)
fd, tmp_path = tempfile.mkstemp(dir=output_dir, prefix=".runner-config-")
try:
with os.fdopen(fd, "w") as f:
f.write(content)
os.rename(tmp_path, OUTPUT_PATH)
log.info("Config written to %s (%d bytes)", OUTPUT_PATH, len(content))
except Exception:
os.unlink(tmp_path)
raise
def run_on_change():
"""Execute the on-change command if configured."""
if not ON_CHANGE_CMD:
log.debug("No ON_CHANGE_CMD configured")
return
log.info("Executing: %s", ON_CHANGE_CMD)
try:
result = subprocess.run(
ON_CHANGE_CMD,
shell=True,
capture_output=True,
text=True,
timeout=ON_CHANGE_TIMEOUT,
)
if result.returncode == 0:
log.info("On-change command succeeded")
if result.stdout.strip():
log.debug("stdout: %s", result.stdout.strip())
else:
log.error(
"On-change command failed (exit %d)\nstderr: %s",
result.returncode,
result.stderr.strip(),
)
except subprocess.TimeoutExpired:
log.error("On-change command timed out after %ds", ON_CHANGE_TIMEOUT)
except Exception as e:
log.error("On-change command error: %s", e)
def main():
log.info("Starting forgejo-runner config watcher")
log.info("API server: %s", API_SERVER)
log.info("Namespace: %s, Secret: %s", NAMESPACE, SECRET_NAME)
log.info("Output: %s", OUTPUT_PATH)
log.info("Watch timeout: %ds, Retry interval: %ds", WATCH_TIMEOUT, RETRY_INTERVAL)
if ON_CHANGE_CMD:
log.info("On-change command: %s", ON_CHANGE_CMD)
v1 = build_api_client()
log.info("Token loaded from %s", TOKEN_PATH)
# Initial fetch
secret = v1.read_namespaced_secret(SECRET_NAME, NAMESPACE)
data = decode_secret_data(secret)
log.info(
"Initial fetch OK (resourceVersion=%s, keys=%s)",
secret.metadata.resource_version,
list(data.keys()),
)
write_config(data)
run_on_change()
# Watch loop
w = watch.Watch()
while not shutdown_requested:
try:
log.debug("Opening watch")
stream = w.stream(
v1.list_namespaced_secret,
NAMESPACE,
field_selector=f"metadata.name={SECRET_NAME}",
timeout_seconds=WATCH_TIMEOUT,
)
for event in stream:
if shutdown_requested:
w.stop()
break
event_type = event["type"]
secret = event["object"]
rv = secret.metadata.resource_version
if event_type == "MODIFIED":
data = decode_secret_data(secret)
log.info(
"Secret MODIFIED (resourceVersion=%s, keys=%s)",
rv,
list(data.keys()),
)
write_config(data)
run_on_change()
elif event_type == "DELETED":
log.warning("Secret DELETED (resourceVersion=%s)", rv)
elif event_type == "BOOKMARK":
log.debug("Bookmark (resourceVersion=%s)", rv)
elif event_type == "ADDED":
log.debug("ADDED event (resourceVersion=%s)", rv)
else:
log.info("Event %s (resourceVersion=%s)", event_type, rv)
log.debug("Watch stream ended, reconnecting")
except client.exceptions.ApiException as e:
if e.status == 401:
log.error("HTTP 401: token may be invalid or revoked")
v1 = build_api_client()
log.info("Rebuilt API client with re-read token")
else:
log.error("API error (%d): %s", e.status, e.reason)
time.sleep(RETRY_INTERVAL)
except Exception as e:
log.error("Watch error: %s", e)
time.sleep(RETRY_INTERVAL)
log.info("Watcher stopped")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment