Last active
July 15, 2025 00:46
-
-
Save rlizzo/faea1546d11c701d2bd7e004acafaf06 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| """ | |
| This script runs indefinitely to verify log ingestion by a Lepton AI job. | |
| On startup, it installs the Lepton client and authenticates. In each | |
| iteration, it prints a unique log and then uses the `lep` CLI to poll the | |
| job's logs, ensuring the unique log appears within a timeout. | |
| It automatically removes the random suffix from the LEPTON_JOB_NAME. | |
| Required environment variables: | |
| - RIZZO_INVESTIGATION_TOKEN: The Lepton AI token for `lep login`. | |
| - LEPTON_JOB_NAME: The full name of the job (e.g., my-job-name-xxxxx). | |
| """ | |
| import os | |
| import sys | |
| import time | |
| import uuid | |
| import subprocess | |
# --- ⚙️ CONFIGURATION (from environment) ⚙️ ---
# Token handed to `lep login`; None when the variable is not set.
RIZZO_INVESTIGATION_TOKEN = os.environ.get("RIZZO_INVESTIGATION_TOKEN")
# Full job name as exported by the environment (e.g., my-job-name-xxxxx).
RAW_LEPTON_JOB_NAME = os.environ.get("LEPTON_JOB_NAME")
# Base job name: everything before the last hyphen, i.e. the raw name with
# its random suffix dropped. Stays an empty string when the raw name is
# missing or empty; a name without any hyphen is kept unchanged.
LEPTON_JOB_NAME_BASE = (
    RAW_LEPTON_JOB_NAME.rsplit("-", maxsplit=1)[0] if RAW_LEPTON_JOB_NAME else ""
)
# --- Script Parameters ---
# How long a single verification may wait for its log line to show up.
VERIFICATION_TIMEOUT_SECONDS = 60
# Pause between two consecutive log fetches within one verification.
POLL_INTERVAL_SECONDS = 5
# Pause after a successful verification before starting the next one.
DELAY_BETWEEN_ITERATIONS_SECONDS = 2
def run_startup_command(command_str: str, description: str):
    """Run a shell command string during startup, exiting the process on failure.

    The command is executed through the shell so that references such as
    ``$VAR`` are expanded from the inherited environment. On success the
    captured stdout/stderr are echoed for transparency.

    Args:
        command_str: The full command line, handed to the shell as-is.
        description: Human-readable label printed before the command runs.

    Raises:
        SystemExit: With exit code 1 when the command exits non-zero or the
            shell itself cannot be launched.
    """
    print(f"\n▶️ {description}...")
    print(f" Running command: {command_str}")
    # First word of the command line, used only in the "not found" hint.
    # Falls back to the raw string so an empty command cannot raise here.
    words = command_str.split()
    program = words[0] if words else command_str
    not_found_message = (
        f"🔥 FATAL: Command not found. Is '{program}' installed and in your PATH?"
    )
    try:
        # Using shell=True to allow for environment variable expansion like $VAR
        result = subprocess.run(
            command_str,
            shell=True,
            capture_output=True,
            text=True,
            check=True,  # Throws CalledProcessError on non-zero exit codes
            env=os.environ,  # Pass environment to the subprocess
        )
    except subprocess.CalledProcessError as e:
        print(f"🔥 FATAL: Command failed with exit code {e.returncode}", file=sys.stderr)
        print("--- stdout ---", file=sys.stderr)
        print(e.stdout, file=sys.stderr)
        print("--- stderr ---", file=sys.stderr)
        print(e.stderr, file=sys.stderr)
        # With shell=True a missing program does not raise FileNotFoundError:
        # a POSIX shell reports it as exit status 127, so give the hint here.
        if e.returncode == 127:
            print(not_found_message, file=sys.stderr)
        sys.exit(1)
    except FileNotFoundError:
        # Only reachable when the shell executable itself cannot be started.
        print(not_found_message, file=sys.stderr)
        sys.exit(1)
    print("✅ Success.")
    # Print stdout/stderr for transparency, useful for debugging
    if result.stdout:
        print("--- stdout ---\n" + result.stdout.strip())
    if result.stderr:
        print("--- stderr ---\n" + result.stderr.strip())
def initial_setup():
    """Install/upgrade the leptonai client and log in via the `lep` CLI.

    Both steps go through run_startup_command, which terminates the process
    if either command fails.
    """
    print(f"{'='*15} 🚀 Performing Initial Setup {'='*15}")
    run_startup_command("pip install -U leptonai", "Installing/Updating LeptonAI client")
    # The variable reference is left for the shell to expand, so the token
    # value never appears in the echoed command line. It is double-quoted so
    # the shell passes it as exactly one argument (no word-splitting or
    # globbing on unusual characters).
    run_startup_command('lep login -c "$RIZZO_INVESTIGATION_TOKEN"', "Logging into Lepton AI")
    print(f"{'='*18} ✅ Setup Complete {'='*18}")
def run_verification() -> bool:
    """Emit a uniquely tagged log line and poll the job logs until it shows up.

    A fresh UUID is embedded in a message printed to stdout; `lep log get` is
    then invoked repeatedly for the base job name until the message is present
    in its output or VERIFICATION_TIMEOUT_SECONDS have elapsed.

    Returns:
        True if the message was found in the fetched logs before the deadline,
        False otherwise.
    """
    unique_id = uuid.uuid4()
    log_message = f"Lepton log verification. Unique ID: {unique_id}"
    # 1. Print the unique log to stdout. This is what we expect to find.
    print(log_message)
    sys.stdout.flush()
    # 2. Prepare the command to fetch logs using the processed base job name.
    log_command = ["lep", "log", "get", "--job-name", LEPTON_JOB_NAME_BASE]
    print(f"\n🔎 Will poll job logs using: {' '.join(log_command)}")
    print(f"⏳ Waiting up to {VERIFICATION_TIMEOUT_SECONDS} seconds for the log to appear...")
    # Monotonic clock: the deadline must not move if the wall clock is adjusted.
    deadline = time.monotonic() + VERIFICATION_TIMEOUT_SECONDS
    # 3. Poll the logs until the message is found or timeout is reached
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        # Bound every fetch so a hung CLI cannot stall the loop indefinitely,
        # while still granting each attempt at least one poll interval.
        poll_timeout = max(remaining, POLL_INTERVAL_SECONDS)
        try:
            result = subprocess.run(
                log_command,
                capture_output=True,
                text=True,
                check=True,
                timeout=poll_timeout,
            )
        except subprocess.CalledProcessError as e:
            print(f"\n⚠️ Warning: `lep log get` failed with exit code {e.returncode}. Will retry.", file=sys.stderr)
            print(e.stderr, file=sys.stderr)
        except subprocess.TimeoutExpired:
            print(
                f"\n⚠️ Warning: `lep log get` did not finish within {poll_timeout:.0f} seconds. Will retry.",
                file=sys.stderr,
            )
        else:
            if log_message in result.stdout:
                print("\n✅ Log found in job output.")
                return True
        print(".", end="", flush=True)
        # Never sleep past the deadline; the next loop pass reports the timeout.
        time.sleep(max(0.0, min(POLL_INTERVAL_SECONDS, deadline - time.monotonic())))
if __name__ == "__main__":
    # --- Pre-run checks: each required setting paired with its fatal message.
    # Checked in order; the first missing one aborts the script.
    required_settings = (
        (
            RAW_LEPTON_JOB_NAME,
            "🔥 FATAL: LEPTON_JOB_NAME environment variable must be set.",
        ),
        (
            RIZZO_INVESTIGATION_TOKEN,
            "🔥 FATAL: RIZZO_INVESTIGATION_TOKEN environment variable must be set for authentication.",
        ),
    )
    for setting_value, fatal_message in required_settings:
        if not setting_value:
            print(fatal_message, file=sys.stderr)
            sys.exit(1)
    print(f"ℹ️ Original job name: {RAW_LEPTON_JOB_NAME}")
    print(f"ℹ️ Using base job name for logs: {LEPTON_JOB_NAME_BASE}")
    initial_setup()
    # --- Main loop: verify forever, stopping at the first missed log line ---
    iteration_count = 0
    while True:
        iteration_count += 1
        rule = "=" * 20
        print(f"\n{rule} Iteration: {iteration_count} {rule}")
        if not run_verification():
            print(
                f"\n🔥 FAILURE on iteration {iteration_count}: The unique log was not found within {VERIFICATION_TIMEOUT_SECONDS} seconds.",
                file=sys.stderr,
            )
            sys.exit(1)
        print(f"✅ Iteration {iteration_count} SUCCEEDED.")
        time.sleep(DELAY_BETWEEN_ITERATIONS_SECONDS)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment