Skip to content

Instantly share code, notes, and snippets.

@rlizzo
Last active July 15, 2025 00:46
Show Gist options
  • Select an option

  • Save rlizzo/faea1546d11c701d2bd7e004acafaf06 to your computer and use it in GitHub Desktop.

Select an option

Save rlizzo/faea1546d11c701d2bd7e004acafaf06 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
This script runs indefinitely to verify log ingestion by a Lepton AI job.
On startup, it installs the Lepton client and authenticates. In each
iteration, it prints a unique log and then uses the `lep` CLI to poll the
job's logs, ensuring the unique log appears within a timeout.
It automatically removes the random suffix from the LEPTON_JOB_NAME.
Required environment variables:
- RIZZO_INVESTIGATION_TOKEN: The Lepton AI token for `lep login`.
- LEPTON_JOB_NAME: The full name of the job (e.g., my-job-name-xxxxx).
"""
import os
import sys
import time
import uuid
import subprocess
# --- ⚙️ CONFIGURATION (from environment) ⚙️ ---
# Token handed to `lep login`; None when the variable is not set.
RIZZO_INVESTIGATION_TOKEN = os.environ.get("RIZZO_INVESTIGATION_TOKEN")
# Job name exactly as the environment provides it (e.g. my-job-name-xxxxx).
RAW_LEPTON_JOB_NAME = os.environ.get("LEPTON_JOB_NAME")
# Name with the part after the last hyphen dropped. A name containing no
# hyphen is kept whole; a missing or empty name yields "".
LEPTON_JOB_NAME_BASE = (
    RAW_LEPTON_JOB_NAME.rsplit('-', 1)[0] if RAW_LEPTON_JOB_NAME else ""
)
# --- Script Parameters ---
# Longest time one verification waits for its log line to show up.
VERIFICATION_TIMEOUT_SECONDS = 60
# Pause between consecutive log fetches within one verification.
POLL_INTERVAL_SECONDS = 5
# Pause after a successful verification before the next one starts.
DELAY_BETWEEN_ITERATIONS_SECONDS = 2
def run_startup_command(command_str: str, description: str):
    """Run a setup command through the shell, exiting the process on failure.

    The string is handed to the shell unmodified, so references such as
    ``$VAR`` are expanded by the shell from the current environment. The
    command echoed to stdout is the unexpanded string.

    Args:
        command_str: Shell command line to execute.
        description: Human-readable label printed before the command runs.

    Raises:
        SystemExit: With status 1 when the command exits non-zero or the
            shell itself cannot be started.
    """
    print(f"\n▶️ {description}...")
    print(f" Running command: {command_str}")

    # First word of the command, used only for the "not found" hint. Guarded
    # so that a blank command string cannot raise IndexError.
    words = command_str.split()
    program = words[0] if words else command_str

    try:
        # Using shell=True to allow for environment variable expansion like $VAR
        result = subprocess.run(
            command_str,
            shell=True,
            capture_output=True,
            text=True,
            check=True,  # Throws CalledProcessError on non-zero exit codes
            env=os.environ,  # Pass environment to the subprocess
        )
    except subprocess.CalledProcessError as e:
        print(f"🔥 FATAL: Command failed with exit code {e.returncode}", file=sys.stderr)
        # With shell=True a missing program does not raise FileNotFoundError;
        # POSIX shells report it as exit status 127 instead.
        if e.returncode == 127:
            print(f"🔥 FATAL: Command not found. Is '{program}' installed and in your PATH?", file=sys.stderr)
        print("--- stdout ---", file=sys.stderr)
        print(e.stdout, file=sys.stderr)
        print("--- stderr ---", file=sys.stderr)
        print(e.stderr, file=sys.stderr)
        sys.exit(1)
    except FileNotFoundError:
        # Only reachable when the shell executable itself cannot be found.
        print(f"🔥 FATAL: Command not found. Is '{program}' installed and in your PATH?", file=sys.stderr)
        sys.exit(1)
    else:
        print("✅ Success.")
        # Print stdout/stderr for transparency, useful for debugging
        if result.stdout:
            print("--- stdout ---\n" + result.stdout.strip())
        if result.stderr:
            print("--- stderr ---\n" + result.stderr.strip())
def initial_setup():
    """Install or upgrade the Lepton AI client, then log in with the token.

    Both steps go through ``run_startup_command``, which exits the process
    with status 1 if a step fails, so a normal return means both succeeded.
    """
    print(f"{'='*15} 🚀 Performing Initial Setup {'='*15}")
    run_startup_command("pip install -U leptonai", "Installing/Updating LeptonAI client")
    # The token is expanded by the shell rather than interpolated here, so the
    # command string itself does not contain it. The expansion is quoted so
    # the shell cannot word-split or glob-expand the token value.
    run_startup_command('lep login -c "$RIZZO_INVESTIGATION_TOKEN"', "Logging into Lepton AI")
    print(f"{'='*18} ✅ Setup Complete {'='*18}")
def run_verification() -> bool:
    """Print a uniquely tagged log line and poll the job logs until it shows up.

    A fresh UUID is embedded in a message printed to stdout. The job's logs
    are then fetched repeatedly with ``lep log get`` until the message appears
    in the fetched output or ``VERIFICATION_TIMEOUT_SECONDS`` have elapsed.

    The deadline is tracked on the monotonic clock, and each CLI call is given
    only the time that is left, so neither a wall-clock adjustment nor a hung
    ``lep`` process can stretch the wait past the configured timeout.

    Returns:
        True if the message was found before the deadline, False otherwise.
    """
    unique_id = uuid.uuid4()
    log_message = f"Lepton log verification. Unique ID: {unique_id}"
    # 1. Print the unique log to stdout. This is what we expect to find.
    print(log_message)
    sys.stdout.flush()
    # 2. Prepare the command to fetch logs using the processed base job name.
    log_command = ["lep", "log", "get", "--job-name", LEPTON_JOB_NAME_BASE]
    print(f"\n🔎 Will poll job logs using: {' '.join(log_command)}")
    print(f"⏳ Waiting up to {VERIFICATION_TIMEOUT_SECONDS} seconds for the log to appear...")
    deadline = time.monotonic() + VERIFICATION_TIMEOUT_SECONDS
    # 3. Poll the logs until the message is found or timeout is reached
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            result = subprocess.run(
                log_command,
                capture_output=True,
                text=True,
                check=True,
                timeout=remaining,  # never let one fetch outlive the deadline
            )
        except subprocess.CalledProcessError as e:
            print(f"\n⚠️ Warning: `lep log get` failed with exit code {e.returncode}. Will retry.", file=sys.stderr)
            print(e.stderr, file=sys.stderr)
        except subprocess.TimeoutExpired:
            print("\n⚠️ Warning: `lep log get` did not finish before the verification deadline.", file=sys.stderr)
        else:
            if log_message in result.stdout:
                print("\n✅ Log found in job output.")
                return True
        print(".", end="", flush=True)
        # Sleep the normal interval, but never past the deadline.
        time.sleep(max(0.0, min(POLL_INTERVAL_SECONDS, deadline - time.monotonic())))
    return False
if __name__ == "__main__":
    # --- Pre-run Checks for environment variables ---
    # Checked in order; the first missing value aborts the run.
    required_settings = (
        (
            RAW_LEPTON_JOB_NAME,
            "🔥 FATAL: LEPTON_JOB_NAME environment variable must be set.",
        ),
        (
            RIZZO_INVESTIGATION_TOKEN,
            "🔥 FATAL: RIZZO_INVESTIGATION_TOKEN environment variable must be set for authentication.",
        ),
    )
    for setting_value, complaint in required_settings:
        if not setting_value:
            print(complaint, file=sys.stderr)
            sys.exit(1)

    print(f"ℹ️ Original job name: {RAW_LEPTON_JOB_NAME}")
    print(f"ℹ️ Using base job name for logs: {LEPTON_JOB_NAME_BASE}")
    initial_setup()

    # --- Main Loop ---
    # Runs until a verification fails; that failure ends the process with
    # status 1.
    attempt = 0
    while True:
        attempt += 1
        print(f"\n{'='*20} Iteration: {attempt} {'='*20}")
        if not run_verification():
            print(
                f"\n🔥 FAILURE on iteration {attempt}: The unique log was not found within {VERIFICATION_TIMEOUT_SECONDS} seconds.",
                file=sys.stderr,
            )
            sys.exit(1)
        print(f"✅ Iteration {attempt} SUCCEEDED.")
        time.sleep(DELAY_BETWEEN_ITERATIONS_SECONDS)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment