|
#!/usr/bin/env python3 |
|
""" |
|
Checkmate Capture Agent Installer |
|
|
|
A comprehensive installer for Checkmate Capture hardware monitoring agent. |
|
- Prefers Docker if available, falls back to system binary installation |
|
- Works on Linux, macOS, and Windows |
|
- Includes complete uninstaller and health checks |
|
- Environment-aware installation with automatic detection |
|
|
|
Usage (interactive): |
|
python3 checkmate_capture_installer.py |
|
|
|
Non-interactive example: |
|
python3 checkmate_capture_installer.py --mode docker --secret <YOUR_SECRET> --port 59232 --no-interactive |
|
|
|
Uninstall: |
|
python3 checkmate_capture_installer.py --uninstall |
|
""" |
|
|
|
import argparse |
|
import hashlib |
|
import json |
|
import logging |
|
import os |
|
import platform |
|
import secrets |
|
import shutil |
|
import socket |
|
import subprocess |
|
import sys |
|
import tempfile |
|
import time |
|
import urllib.error |
|
import urllib.request |
|
import tarfile |
|
import zipfile |
|
from contextlib import contextmanager |
|
from pathlib import Path |
|
from typing import Optional, List, Dict, Tuple, Set |
|
|
|
# ======= Default Configuration Values =======

# Checkmate endpoints and Capture defaults. Each value may be overridden by
# the config file, environment variables, or CLI arguments.
DEFAULT_CHECKMATE_UI_URL = "https://checkmate-xg8w4og8k48scowsk0cc0woc.3.securiace.com"
DEFAULT_CHECKMATE_SERVER_URL = "https://checkmateserver-xg8w4og8k48scowsk0cc0woc.3.securiace.com"
DEFAULT_CAPTURE_IMAGE = "ghcr.io/bluewave-labs/capture:latest"
DEFAULT_CAPTURE_PORT = 59232
DEFAULT_GITHUB_REPO = "bluewave-labs/capture"

# Per-user state directory holding metadata, config, logs, cache and the
# generated Docker Compose project.
LOG_DIR = Path.home() / ".checkmate-capture"
INSTALL_METADATA_FILE = LOG_DIR / "install.json"
CONFIG_FILE = LOG_DIR / "config.json"
LOG_FILE = LOG_DIR / "install.log"
CACHE_DIR = LOG_DIR / "cache"
DOCKER_COMPOSE_DIR = LOG_DIR / "docker"
DOCKER_COMPOSE_FILE = DOCKER_COMPOSE_DIR / "docker-compose.yml"

# Network settings
DEFAULT_DOWNLOAD_TIMEOUT = 5 * 60  # seconds
DEFAULT_MAX_RETRIES = 3
DEFAULT_RETRY_DELAY_BASE = 2  # seconds
|
|
|
|
|
class Config:
    """Layered installer settings.

    Values are resolved in increasing priority: built-in defaults, the JSON
    config file (``CONFIG_FILE``), environment variables, and finally CLI
    arguments, which the caller applies through :meth:`apply_cli_args`.
    """

    # Settings stored as strings / as integers. The matching environment
    # variable is the upper-cased attribute name (e.g. CAPTURE_PORT).
    _STRING_FIELDS = ("checkmate_ui_url", "checkmate_server_url", "capture_image", "github_repo")
    _INT_FIELDS = ("capture_port", "download_timeout", "max_retries", "retry_delay_base")

    def __init__(self):
        """Initialize with defaults, then overlay the config file and environment."""
        self.checkmate_ui_url = DEFAULT_CHECKMATE_UI_URL
        self.checkmate_server_url = DEFAULT_CHECKMATE_SERVER_URL
        self.capture_image = DEFAULT_CAPTURE_IMAGE
        self.capture_port = DEFAULT_CAPTURE_PORT
        self.github_repo = DEFAULT_GITHUB_REPO
        self.download_timeout = DEFAULT_DOWNLOAD_TIMEOUT
        self.max_retries = DEFAULT_MAX_RETRIES
        self.retry_delay_base = DEFAULT_RETRY_DELAY_BASE

        # Later sources override earlier ones.
        self._load_from_file()
        self._load_from_env()

    def _load_from_file(self) -> None:
        """Overlay values from ``CONFIG_FILE`` if it exists.

        Problems are logged as warnings and never raised: an unreadable or
        malformed file leaves the current values untouched, and an invalid
        integer only skips that one setting.
        """
        if not CONFIG_FILE.exists():
            return

        try:
            with open(CONFIG_FILE, "r", encoding="utf-8") as f:
                config_data = json.load(f)
        except json.JSONDecodeError as e:
            log_warning(f"Config file {CONFIG_FILE} contains invalid JSON: {e}")
            return
        except (OSError, ValueError) as e:
            log_warning(f"Failed to load config file {CONFIG_FILE}: {e}")
            return

        if not isinstance(config_data, dict):
            log_warning(f"Failed to load config file {CONFIG_FILE}: expected a JSON object")
            return

        for field in self._STRING_FIELDS:
            if field in config_data:
                setattr(self, field, str(config_data[field]))

        for field in self._INT_FIELDS:
            if field not in config_data:
                continue
            try:
                setattr(self, field, int(config_data[field]))
            except (ValueError, TypeError):
                log_warning(f"Invalid {field} in config file, using default")

        log_debug(f"Loaded configuration from {CONFIG_FILE}")

    def _load_from_env(self) -> None:
        """Overlay values from environment variables.

        String settings are taken whenever the variable is set. Integer
        settings are taken only when the variable is non-empty and parses as
        an integer; an unparsable value is reported and ignored.
        """
        for field in self._STRING_FIELDS:
            setattr(self, field, os.getenv(field.upper(), getattr(self, field)))

        for field in self._INT_FIELDS:
            env_name = field.upper()
            raw = os.getenv(env_name)
            if not raw:
                continue
            try:
                setattr(self, field, int(raw))
            except ValueError:
                log_warning(f"Invalid {env_name} environment variable {raw!r}, ignoring")

    def apply_cli_args(self, args: argparse.Namespace) -> None:
        """Apply CLI arguments (highest priority); falsy or missing values are ignored."""
        for field in self._STRING_FIELDS:
            value = getattr(args, field, None)
            if value:
                setattr(self, field, value)

    @property
    def github_releases_api(self) -> str:
        """GitHub API URL of the latest release for ``github_repo``."""
        return f"https://api.github.com/repos/{self.github_repo}/releases/latest"

    def save_to_file(self) -> None:
        """Persist the current configuration to ``CONFIG_FILE``.

        Raises:
            InstallationError: If the file cannot be written.
        """
        tmp_file = CONFIG_FILE.with_suffix(".tmp")
        config_data = {
            "checkmate_ui_url": self.checkmate_ui_url,
            "checkmate_server_url": self.checkmate_server_url,
            "capture_image": self.capture_image,
            "capture_port": self.capture_port,
            "github_repo": self.github_repo,
            "download_timeout": self.download_timeout,
            "max_retries": self.max_retries,
            "retry_delay_base": self.retry_delay_base,
        }
        try:
            CONFIG_FILE.parent.mkdir(parents=True, exist_ok=True)
            # Write to a sibling temp file and rename it over the target so a
            # crash mid-write cannot leave a truncated config behind.
            with open(tmp_file, "w", encoding="utf-8") as f:
                json.dump(config_data, f, indent=2)
            tmp_file.replace(CONFIG_FILE)
            log_info(f"Configuration saved to {CONFIG_FILE}")
        except Exception as e:
            log_error(f"Failed to save configuration: {e}")
            # Best-effort cleanup of the temp file.
            try:
                if tmp_file.exists():
                    tmp_file.unlink()
            except OSError:
                pass
            raise InstallationError(f"Failed to save configuration: {e}") from e
|
|
|
|
|
def validate_url(url: str, name: str) -> None:
    """Validate that *url* is a non-empty http(s) URL with a host part.

    Args:
        url: The URL to check.
        name: Human-readable setting name used in error messages.

    Raises:
        ValidationError: If the URL is empty, uses another scheme, cannot be
            parsed, or has no network location.
    """
    from urllib.parse import urlparse

    if not url:
        raise ValidationError(f"{name} cannot be empty")
    if not url.startswith(("http://", "https://")):
        raise ValidationError(f"{name} must start with http:// or https://")

    # Only the parse itself is guarded, so the "must be a valid URL" error
    # below is not re-wrapped into a second message.
    try:
        parsed = urlparse(url)
    except ValueError as e:
        raise ValidationError(f"{name} is not a valid URL: {e}") from e

    if not parsed.netloc:
        raise ValidationError(f"{name} must be a valid URL")
|
|
|
|
|
def validate_github_repo(repo: str) -> None:
    """Validate that *repo* has the form ``owner/repo``.

    Exactly one slash is required, with a non-blank owner and a non-blank
    repository name on either side.

    Raises:
        ValidationError: If the value is empty or not in ``owner/repo`` form.
    """
    if not repo:
        raise ValidationError("GitHub repository cannot be empty")

    owner, slash, project = repo.partition("/")
    well_formed = bool(slash) and "/" not in project and owner.strip() and project.strip()
    if not well_formed:
        raise ValidationError("GitHub repository must be in format 'owner/repo'")
|
|
|
# Configure logging |
|
def setup_logging(verbose: bool = False) -> None:
    """Attach a file handler and a stdout handler to the root logger.

    The log file (``LOG_FILE``) always records DEBUG and above with
    timestamps. The console shows DEBUG only when *verbose* is true and
    INFO and above otherwise.
    """
    LOG_DIR.mkdir(parents=True, exist_ok=True)

    to_file = logging.FileHandler(LOG_FILE, encoding="utf-8")
    to_file.setLevel(logging.DEBUG)
    to_file.setFormatter(
        logging.Formatter("%(asctime)s [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
    )

    to_console = logging.StreamHandler(sys.stdout)
    to_console.setLevel(logging.DEBUG if verbose else logging.INFO)
    to_console.setFormatter(logging.Formatter("[%(levelname)s] %(message)s"))

    # The root logger passes everything; each handler filters by its own level.
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    for handler in (to_file, to_console):
        root.addHandler(handler)
|
|
|
|
|
def log_info(msg: str) -> None:
    """Emit *msg* at INFO level through the root logger."""
    logging.log(logging.INFO, msg)
|
|
|
|
|
def log_error(msg: str) -> None:
    """Emit *msg* at ERROR level through the root logger."""
    logging.log(logging.ERROR, msg)
|
|
|
|
|
def log_warning(msg: str) -> None:
    """Emit *msg* at WARNING level through the root logger."""
    logging.log(logging.WARNING, msg)
|
|
|
|
|
def log_debug(msg: str) -> None:
    """Emit *msg* at DEBUG level through the root logger."""
    logging.log(logging.DEBUG, msg)
|
|
|
|
|
# Note: log_debug is called before Config is initialized, so we can't use config in logging setup |
|
|
|
|
|
def print_header(config: Config) -> None:
    """Print the installer banner followed by the configured Checkmate URLs."""
    rule = "=" * 70
    banner = [
        rule,
        "Checkmate Capture Agent Installer",
        rule,
        f"Checkmate UI: {config.checkmate_ui_url}",
        f"Checkmate server: {config.checkmate_server_url}",
    ]
    print("\n".join(banner))
    print()
|
|
|
|
|
class InstallationError(Exception):
    """Raised when an installation step cannot be completed."""
|
|
|
|
|
class ValidationError(Exception):
    """Raised when a user-supplied value fails validation."""
|
|
|
|
|
def is_tty() -> bool:
    """Report whether stdin is an interactive terminal.

    Returns False when stdin is missing, closed, piped or redirected (for
    example when the script is run through a ``curl | python`` pipe), and
    also when the check itself fails, so callers never block on ``input()``.
    """
    try:
        stdin = sys.stdin
        # sys.stdin can be None when the interpreter has no console attached.
        if stdin is None or stdin.closed:
            return False
        return bool(stdin.isatty())
    except Exception:
        # Any failure while probing is treated as non-interactive.
        return False
|
|
|
|
|
def require_interactive(no_interactive_flag: bool, context: str = "") -> None:
    """Guard to call before any ``input()`` prompt.

    Args:
        no_interactive_flag: Value of the ``--no-interactive`` CLI flag.
        context: Optional description appended to the error message.

    Raises:
        EOFError: If the flag is set or stdin is not a terminal.
    """
    # The flag is tested first so stdin is only probed when needed.
    interactive = not no_interactive_flag and is_tty()
    if interactive:
        return

    suffix = ": " + context if context else ""
    raise EOFError(f"Non-interactive mode detected{suffix}")
|
|
|
|
|
def is_root() -> bool:
    """Report whether the process has root (POSIX) or administrator (Windows) rights."""
    if os.name != "nt":
        # Platforms without geteuid are treated as non-root.
        geteuid = getattr(os, "geteuid", None)
        return geteuid() == 0 if geteuid is not None else False

    # Windows: ask the shell whether the current user is an administrator.
    try:
        import ctypes

        admin_flag = ctypes.windll.shell32.IsUserAnAdmin()
    except Exception:
        return False
    return admin_flag != 0
|
|
|
|
|
def check_port_available(port: int) -> bool:
    """Probe whether anything accepts TCP connections on 127.0.0.1:*port*.

    Returns True when the connection attempt fails (nothing is listening) and
    also when the probe itself errors out; False when the connection succeeds.
    """
    try:
        probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        with probe:
            probe.settimeout(1)
            # connect_ex returns 0 only when something accepted the connection.
            return probe.connect_ex(("127.0.0.1", port)) != 0
    except Exception as e:
        log_debug(f"Port check error: {e}")
        return True  # Assume available if check fails
|
|
|
|
|
def validate_secret(secret: str) -> None:
    """Validate the API secret: 16-128 characters from the permitted set.

    Raises:
        ValidationError: If the secret is empty, too short, too long, or
            contains a character outside alphanumerics and the allowed symbols.
    """
    if not secret:
        raise ValidationError("Secret cannot be empty")

    length = len(secret)
    if length < 16:
        raise ValidationError("Secret must be at least 16 characters long")
    if length > 128:
        raise ValidationError("Secret must be no more than 128 characters long")

    allowed_symbols = "-_=+!@#$%^&*()[]{}|;:,.<>?"
    for ch in secret:
        if not (ch.isalnum() or ch in allowed_symbols):
            raise ValidationError("Secret contains invalid characters")
|
|
|
|
|
def validate_port(port: int) -> None:
    """Validate that *port* is in 1-65535 and warn when it looks occupied.

    A port that already accepts connections only produces a warning.

    Raises:
        ValidationError: If the port is outside the valid range.
    """
    if port > 65535 or port < 1:
        raise ValidationError(f"Port must be between 1 and 65535, got {port}")

    if check_port_available(port):
        return
    log_warning(f"Port {port} appears to be in use. Installation may fail.")
|
|
|
|
|
def check_disk_space(path: Path, required_mb: int = 100) -> bool:
    """Return True when *path*'s filesystem has at least *required_mb* MiB free.

    If the free space cannot be determined the check is logged at debug level
    and treated as passing.
    """
    bytes_per_mb = 1024 * 1024
    try:
        usage = shutil.disk_usage(path)
        return usage.free / bytes_per_mb >= required_mb
    except Exception as e:
        log_debug(f"Disk space check failed: {e}")
        return True  # Assume OK if check fails
|
|
|
|
|
def run(cmd: List[str], check: bool = True, capture_output: bool = False) -> subprocess.CompletedProcess:
    """Execute *cmd* (an argv list, no shell) and log the command line.

    Args:
        cmd: Program and arguments.
        check: When true, a non-zero exit status is an error.
        capture_output: When true, stdout/stderr are captured as text.

    Returns:
        The completed process.

    Raises:
        InstallationError: If the command exceeds the 300 second timeout or,
            with ``check=True``, exits with a non-zero status.
    """
    cmd_text = " ".join(cmd)
    log_info(f"Running: {cmd_text}")
    try:
        return subprocess.run(
            cmd,
            check=check,
            capture_output=capture_output,
            text=True,
            timeout=300,
        )
    except subprocess.TimeoutExpired:
        raise InstallationError(f"Command timed out: {cmd_text}")
    except subprocess.CalledProcessError as e:
        raise InstallationError(f"Command failed: {cmd_text} - {e}")
|
|
|
|
|
def detect_docker() -> Tuple[bool, Optional[str]]:
    """Find a working container engine.

    Docker is tried first, then Podman. An engine counts as working when its
    executable is on PATH and ``<engine> info`` exits with status 0.

    Returns:
        ``(True, "docker")`` or ``(True, "podman")`` on success, otherwise
        ``(False, None)``.
    """
    for engine in ("docker", "podman"):
        if not shutil.which(engine):
            continue
        try:
            probe = run([engine, "info"], check=False, capture_output=True)
        except Exception:
            # A failing probe means this engine is unusable; try the next one.
            continue
        if probe.returncode == 0:
            return True, engine

    return False, None
|
|
|
|
|
def detect_docker_compose(docker_variant: str = "docker") -> Tuple[bool, str]:
    """Detect a usable Docker Compose command.

    The CLI plugin form (``<docker_variant> compose``) is preferred; the
    standalone ``docker-compose`` executable is the fallback.

    Returns:
        Tuple of ``(is_available, compose_command)``. The command is
        ``"<docker_variant> compose"`` for the plugin, the resolved path of
        ``docker-compose`` for the standalone tool, or ``""`` when neither works.
    """

    def _version_ok(argv: List[str]) -> bool:
        # A compose flavour is usable when its "version" subcommand exits 0.
        try:
            return run(argv, check=False, capture_output=True).returncode == 0
        except Exception:
            return False

    if _version_ok([docker_variant, "compose", "version"]):
        return True, f"{docker_variant} compose"

    standalone = shutil.which("docker-compose")
    if standalone and _version_ok([standalone, "version"]):
        return True, standalone

    return False, ""
|
|
|
|
|
def detect_init_system() -> Optional[str]:
    """Identify the Linux init system.

    Returns:
        ``"systemd"``, ``"upstart"`` or ``"sysvinit"`` (checked in that
        order), or None on non-Linux platforms or when nothing matches.
    """
    if platform.system().lower() != "linux":
        return None

    if os.path.exists("/run/systemd/system") or shutil.which("systemctl"):
        return "systemd"

    # Remaining init systems are recognised by marker paths only.
    marker_paths = (
        ("upstart", ("/sbin/initctl", "/etc/init")),
        ("sysvinit", ("/etc/init.d", "/sbin/init")),
    )
    for init_name, paths in marker_paths:
        if any(os.path.exists(p) for p in paths):
            return init_name

    return None
|
|
|
|
|
def generate_secret() -> str:
    """Return 32 cryptographically random bytes as a 64-character hex string."""
    return secrets.token_bytes(32).hex()
|
|
|
|
|
def get_local_ips() -> List[str]:
    """Collect this host's non-loopback IPv4 addresses.

    Two best-effort methods are combined; a failure in either is logged at
    debug level and does not affect the other.

    Returns:
        Sorted list of unique IPv4 address strings (possibly empty).
    """
    ips: Set[str] = set()

    # Method 1: resolve the local hostname.
    try:
        for res in socket.getaddrinfo(socket.gethostname(), None):
            ip = res[4][0]
            if ":" in ip or ip.startswith("127."):  # Skip IPv6 and localhost
                continue
            ips.add(ip)
    except Exception as e:
        log_debug(f"getaddrinfo failed: {e}")

    # Method 2: "connect" a UDP socket towards a public address and read back
    # the local address the OS selected for that route. The context manager
    # closes the socket even when connect() fails.
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.settimeout(2)
            s.connect(("8.8.8.8", 80))
            local_ip = s.getsockname()[0]
        if not local_ip.startswith("127."):
            ips.add(local_ip)
    except Exception as e:
        log_debug(f"External IP detection failed: {e}")

    return sorted(ips)
|
|
|
|
|
def download_with_progress(url: str, dest_path: Path, timeout: int = DEFAULT_DOWNLOAD_TIMEOUT) -> None:
    """Download *url* to *dest_path*, printing progress to stdout.

    Args:
        url: Source URL.
        dest_path: File to write; overwritten if it exists.
        timeout: Socket timeout in seconds for connecting and for each
            blocking read (not a limit on the total transfer time).

    Raises:
        InstallationError: On network/HTTP errors, timeouts, or when fewer
            bytes arrive than the server announced.
    """
    import http.client

    log_info(f"Downloading {url} to {dest_path}")

    chunk_size = 64 * 1024
    bytes_per_mb = 1024 * 1024
    received = 0
    total = 0

    try:
        with urllib.request.urlopen(url, timeout=timeout) as response, open(dest_path, "wb") as out:
            length_header = response.headers.get("Content-Length")
            if length_header and length_header.strip().isdigit():
                total = int(length_header)

            while True:
                chunk = response.read(chunk_size)
                if not chunk:
                    break
                out.write(chunk)
                received += len(chunk)
                # Progress is shown only when the server announced a size.
                if total > 0:
                    percent = min(100, received * 100 // total)
                    sys.stdout.write(
                        f"\r[+] Progress: {percent}% "
                        f"({received / bytes_per_mb:.1f}/{total / bytes_per_mb:.1f} MB)"
                    )
                    sys.stdout.flush()
        print()  # New line after progress
    except (OSError, http.client.HTTPException) as e:
        # URLError, HTTPError and socket timeouts are all OSError subclasses.
        raise InstallationError(f"Download failed: {e}") from e

    if 0 < total and received < total:
        raise InstallationError(
            f"Download failed: received {received} of {total} bytes from {url}"
        )
|
|
|
|
|
def download_with_retry(url: str, dest_path: Path, config: Config) -> None:
    """Download *url* to *dest_path*, retrying with exponential backoff.

    At least one attempt is always made, even when ``config.max_retries`` is
    zero or negative. The delay before retry ``k`` (1-based) is
    ``config.retry_delay_base * 2 ** (k - 1)`` seconds, never below zero.

    Raises:
        InstallationError: If every attempt fails.
    """
    attempts = max(1, config.max_retries)

    for attempt in range(1, attempts + 1):
        try:
            download_with_progress(url, dest_path, timeout=config.download_timeout)
            return
        except Exception as e:
            if attempt == attempts:
                raise InstallationError(f"Download failed after {attempts} attempts: {e}") from e
            delay = max(0, config.retry_delay_base * (2 ** (attempt - 1)))
            log_warning(f"Download attempt {attempt} failed: {e}. Retrying in {delay}s...")
            time.sleep(delay)
|
|
|
|
|
def calculate_sha256(file_path: Path) -> str:
    """Return the lowercase hex SHA256 digest of the file at *file_path*."""
    digest = hashlib.sha256()
    with open(file_path, "rb") as stream:
        # Read in fixed-size blocks so large files are never fully in memory.
        while True:
            block = stream.read(4096)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
|
|
|
|
|
def verify_checksum(file_path: Path, expected_checksum: Optional[str] = None) -> bool:
    """Compare the file's SHA256 with *expected_checksum* (case-insensitive).

    Returns:
        True when the digests match or when no expected checksum was given;
        False on a mismatch (which is also logged as an error).
    """
    if not expected_checksum:
        log_debug("No checksum provided, skipping verification")
        return True

    actual = calculate_sha256(file_path)
    matches = actual.lower() == expected_checksum.lower()
    if matches:
        log_info("Checksum verification passed")
    else:
        log_error(f"Checksum mismatch: expected {expected_checksum}, got {actual}")
    return matches
|
|
|
|
|
def detect_platform_tags() -> Tuple[str, str]:
    """Map the running platform to release-asset ``(os_tag, arch_tag)`` names.

    Unrecognised systems or machines are passed through lower-cased.
    """
    system = platform.system().lower()
    machine = platform.machine().lower()

    # OS: first rule whose keyword occurs in the system name wins.
    os_tag = system
    os_rules = (
        (("linux",), "linux"),
        (("darwin",), "darwin"),  # macOS
        (("windows", "win32"), "windows"),
    )
    for keywords, tag in os_rules:
        if any(word in system for word in keywords):
            os_tag = tag
            break

    # Architecture: exact names first, then prefixes, most specific first so
    # "armv7l" is not swallowed by the generic "arm" rule.
    exact_arch = {"x86_64": "amd64", "amd64": "amd64", "aarch64": "arm64", "arm64": "arm64"}
    prefix_arch = (
        ("armv7", "armv7"),
        ("armv6", "armv6"),
        ("arm", "arm"),
        ("i386", "386"),
        ("i686", "386"),
    )
    if machine in exact_arch:
        arch_tag = exact_arch[machine]
    else:
        arch_tag = machine
        for prefix, tag in prefix_arch:
            if machine.startswith(prefix):
                arch_tag = tag
                break

    return os_tag, arch_tag
|
|
|
|
|
def get_latest_release_info(config: Config) -> Dict:
    """Fetch the latest-release metadata for ``config.github_repo`` from GitHub.

    Returns:
        The decoded JSON object returned by ``config.github_releases_api``.

    Raises:
        InstallationError: If the request fails or times out, or the response
            is not a JSON object.
    """
    log_info(f"Fetching latest release information from GitHub repo: {config.github_repo}")

    api_url = config.github_releases_api
    req = urllib.request.Request(
        api_url,
        headers={"User-Agent": "checkmate-capture-installer/1.0"},
    )

    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            data = json.load(resp)
    except urllib.error.URLError as e:
        raise InstallationError(f"Failed to fetch release info from {api_url}: {e}") from e
    except (OSError, ValueError) as e:
        # Read timeouts surface as OSError, malformed JSON as ValueError.
        raise InstallationError(f"Failed to fetch release info from {api_url}: {e}") from e

    if not isinstance(data, dict):
        raise InstallationError(f"Failed to fetch release info from {api_url}: unexpected response format")
    return data
|
|
|
|
|
def find_matching_asset(assets: List[Dict], os_tag: str, arch_tag: str) -> Optional[Dict]:
    """Pick the release asset that matches the given platform.

    Checksum and signature sidecar files are never selected. Among the
    remaining assets whose name contains both tags, preference is:

    1. ``<os>-<arch>`` / ``<os>_<arch>`` as a whole token, so ``arm`` does
       not match ``arm64``;
    2. the same pair as a plain substring;
    3. the first asset containing both tags anywhere.

    Returns:
        The matching asset dict, or None when nothing matches.
    """
    import re

    sidecar_suffixes = (".sha256", ".sha256sum", ".sha512", ".md5", ".sig", ".asc")
    os_key = os_tag.lower()
    arch_key = arch_tag.lower()

    candidates = []
    for asset in assets:
        name = (asset.get("name") or "").lower()
        if name.endswith(sidecar_suffixes) or "checksum" in name:
            continue
        if os_key in name and arch_key in name:
            candidates.append((name, asset))

    if not candidates:
        return None

    whole_token = re.compile(
        rf"(?<![a-z0-9]){re.escape(os_key)}[-_]{re.escape(arch_key)}(?![a-z0-9])"
    )
    for name, asset in candidates:
        if whole_token.search(name):
            return asset

    for name, asset in candidates:
        if f"{os_key}-{arch_key}" in name or f"{os_key}_{arch_key}" in name:
            return asset

    return candidates[0][1]
|
|
|
|
|
def get_cached_binary_path(version: str, os_tag: str, arch_tag: str) -> Optional[Path]:
    """Return the cached binary for this version/platform, or None if absent."""
    candidate = CACHE_DIR / f"capture-{version}-{os_tag}-{arch_tag}"
    if not candidate.exists():
        return None

    log_info(f"Found cached binary: {candidate}")
    return candidate
|
|
|
|
|
def cache_binary(binary_path: Path, version: str, os_tag: str, arch_tag: str) -> None:
    """Copy *binary_path* into the cache under a version/platform-specific name.

    Caching is best-effort: any failure is logged as a warning and swallowed
    so it can never abort an installation.
    """
    staging = None
    try:
        CACHE_DIR.mkdir(parents=True, exist_ok=True)
        target = CACHE_DIR / f"capture-{version}-{os_tag}-{arch_tag}"
        # Copy to a temp sibling, then rename into place, so a reader never
        # sees a half-written cache entry.
        staging = target.with_suffix(target.suffix + ".tmp")
        shutil.copy2(binary_path, staging)
        staging.replace(target)
        log_info(f"Cached binary to {target}")
    except Exception as e:
        log_warning(f"Failed to cache binary: {e}")
        # Remove the temp copy if one was left behind.
        try:
            if staging is not None and staging.exists():
                staging.unlink()
        except Exception:
            pass
|
|
|
|
|
def find_capture_binary(root_dir: Path) -> Optional[Path]:
    """Search *root_dir* recursively for the Capture executable.

    A file qualifies when its name starts with ``capture`` (case-insensitive)
    and it is either executable or ends in ``.exe`` / ``.bin``. The first
    match in ``os.walk`` order is returned, or None when nothing matches.
    """
    known_suffixes = (".exe", ".bin")
    for folder, _subdirs, filenames in os.walk(root_dir):
        for filename in filenames:
            lowered = filename.lower()
            if not lowered.startswith("capture"):
                continue
            candidate = Path(folder) / filename
            if os.access(candidate, os.X_OK) or lowered.endswith(known_suffixes):
                return candidate
    return None
|
|
|
|
|
def _parse_expected_checksum(content: str, filename: str, allow_bare: bool) -> Optional[str]:
    """Extract the SHA256 for *filename* from the text of a checksum file.

    Understands ``<hash>  <name>`` lines (``sha256sum`` format, optional
    ``*`` prefix on the name). A file consisting of a single bare hash is
    accepted only when *allow_bare* is true.
    """
    rows = [line.split() for line in content.splitlines() if line.strip()]
    for fields in rows:
        if len(fields) >= 2 and Path(fields[-1].lstrip("*")).name == filename:
            return fields[0]
    if allow_bare and len(rows) == 1 and len(rows[0]) == 1:
        return rows[0][0]
    return None


def _verify_download_checksum(assets: List[Dict], filename: str, tmp_path: Path, config: Config) -> None:
    """Verify *tmp_path* against a checksum asset published with the release.

    Verification is skipped with a warning when no checksum asset exists, it
    cannot be downloaded, or it has no entry for *filename*. A confirmed
    mismatch deletes the download and raises InstallationError.
    """
    target = filename.lower()
    chosen = None
    dedicated = False
    for candidate in assets:
        name = (candidate.get("name") or "").lower()
        if name == target or not ("checksum" in name or "sha256" in name):
            continue
        if name.startswith(target):
            # e.g. "<asset>.sha256": a checksum file for exactly this asset.
            chosen, dedicated = candidate, True
            break
        if chosen is None:
            chosen = candidate

    checksum_url = chosen.get("browser_download_url") if chosen else None
    if not checksum_url:
        log_debug("No checksum asset found in release, skipping verification")
        return

    checksum_path = tmp_path.with_suffix(tmp_path.suffix + ".sha256")
    try:
        download_with_retry(checksum_url, checksum_path, config)
        content = checksum_path.read_text(encoding="utf-8")
    except Exception as e:
        log_warning(f"Checksum verification skipped: {e}")
        return
    finally:
        # The checksum file is only needed transiently.
        try:
            if checksum_path.exists():
                checksum_path.unlink()
        except OSError:
            pass

    # A bare hash is trusted only from a file dedicated to this asset.
    expected = _parse_expected_checksum(content, filename, allow_bare=dedicated)
    if not expected:
        log_warning(f"Checksum verification skipped: no checksum entry found for {filename}")
        return

    if not verify_checksum(tmp_path, expected):
        try:
            tmp_path.unlink()
        except OSError:
            pass
        raise InstallationError(f"Checksum mismatch for {filename}; refusing to install")


def download_capture_binary(install_dir: Path, config: Config, use_cache: bool = True) -> Path:
    """Download (or reuse from cache) the Capture binary for this platform.

    Steps: check disk space, query the latest GitHub release, reuse a cached
    binary for that version if allowed, otherwise download the matching
    asset, verify its checksum when one is published, extract it, move it to
    ``install_dir/capture`` (``capture.exe`` on Windows), and cache it.

    Args:
        install_dir: Directory receiving the binary; created if missing.
        config: Active configuration (repo, timeouts, retry policy).
        use_cache: Whether a previously cached binary may be reused.

    Returns:
        Path of the installed binary.

    Raises:
        InstallationError: On insufficient disk space, missing or unmatched
            release assets, download failure, checksum mismatch, or
            extraction problems.
    """
    log_info(f"Downloading Capture binary into {install_dir}")
    install_dir.mkdir(parents=True, exist_ok=True)

    if not check_disk_space(install_dir, required_mb=100):
        raise InstallationError("Insufficient disk space (need at least 100 MB)")

    os_tag, arch_tag = detect_platform_tags()
    log_info(f"Detected platform: os={os_tag}, arch={arch_tag}")

    release_data = get_latest_release_info(config)
    version = release_data.get("tag_name", "latest")
    assets = release_data.get("assets", [])
    if not assets:
        raise InstallationError("No assets found in latest Capture release")

    final_path = install_dir / ("capture.exe" if os.name == "nt" else "capture")

    # Check cache first
    if use_cache:
        cached = get_cached_binary_path(version, os_tag, arch_tag)
        if cached and cached.exists():
            log_info("Using cached binary")
            try:
                shutil.copy2(cached, final_path)
                if os.name != "nt":
                    os.chmod(final_path, 0o755)
                return final_path
            except Exception as e:
                # Fall through to a fresh download.
                log_warning(f"Failed to use cached binary, will download: {e}")

    asset = find_matching_asset(assets, os_tag, arch_tag)
    if not asset:
        names = [a.get("name", "") for a in assets]
        raise InstallationError(
            f"No matching binary for os={os_tag}, arch={arch_tag}. "
            f"Available: {names}"
        )

    url = asset["browser_download_url"]
    # Keep only the base name so the asset name cannot escape the temp dir.
    filename = Path(asset["name"]).name
    tmp_path = Path(tempfile.gettempdir()) / filename

    download_with_retry(url, tmp_path, config)
    _verify_download_checksum(assets, filename, tmp_path, config)

    # Extract or move binary
    extracted_path: Optional[Path] = None
    extraction_success = False
    try:
        if filename.endswith((".tar.gz", ".tgz")):
            log_info("Extracting tar.gz archive")
            with tarfile.open(tmp_path, "r:gz") as tf:
                if hasattr(tarfile, "data_filter"):
                    # Reject absolute paths, parent-directory escapes and
                    # special files where the interpreter supports filters.
                    tf.extractall(install_dir, filter="data")
                else:
                    tf.extractall(install_dir)
            extracted_path = find_capture_binary(install_dir)
            extraction_success = extracted_path is not None and extracted_path.exists()
        elif filename.endswith(".zip"):
            log_info("Extracting zip archive")
            with zipfile.ZipFile(tmp_path, "r") as zf:
                zf.extractall(install_dir)
            extracted_path = find_capture_binary(install_dir)
            extraction_success = extracted_path is not None and extracted_path.exists()
        else:
            # Assume the asset is the binary itself.
            extracted_path = final_path
            shutil.move(str(tmp_path), str(extracted_path))
            extraction_success = extracted_path.exists()
    except Exception as e:
        log_error(f"Failed to extract binary: {e}")
        raise InstallationError(f"Failed to extract Capture binary: {e}") from e
    finally:
        # Clean up temporary download file only if extraction succeeded
        if extraction_success:
            try:
                if tmp_path.exists():
                    tmp_path.unlink()
            except Exception as e:
                log_debug(f"Failed to clean up temp file {tmp_path}: {e}")

    if not extracted_path or not extracted_path.exists():
        raise InstallationError("Failed to locate Capture binary after extraction")

    # Normalize final path
    if extracted_path != final_path:
        try:
            if final_path.exists():
                final_path.unlink()
            shutil.move(str(extracted_path), str(final_path))
        except Exception as e:
            raise InstallationError(f"Failed to move binary to final location: {e}") from e

    if os.name != "nt":
        os.chmod(final_path, 0o755)

    cache_binary(final_path, version, os_tag, arch_tag)

    log_info(f"Capture binary installed at: {final_path}")
    return final_path
|
|
|
|
|
def save_installation_info(mode: str, secret: str, port: int, binary_path: Optional[Path] = None) -> None:
    """Persist installation metadata to ``INSTALL_METADATA_FILE``.

    The metadata includes the API secret in plain text, so the file is
    created readable and writable by the owner only.

    Args:
        mode: Installation mode (the compose file path is recorded for ``"docker"``).
        secret: API secret configured for the agent.
        port: Port the agent listens on.
        binary_path: Location of the installed binary, if any.

    Raises:
        InstallationError: If the metadata cannot be written.
    """
    info = {
        "mode": mode,
        "secret": secret,
        "port": port,
        "binary_path": str(binary_path) if binary_path else None,
        "docker_compose_file": str(DOCKER_COMPOSE_FILE) if mode == "docker" else None,
        "timestamp": time.time(),
        "platform": {
            "system": platform.system(),
            "machine": platform.machine(),
        },
    }

    tmp_file = None
    try:
        INSTALL_METADATA_FILE.parent.mkdir(parents=True, exist_ok=True)
        # Write to a temp sibling and rename so a crash cannot corrupt the file.
        tmp_file = INSTALL_METADATA_FILE.with_suffix(".tmp")
        # Create with mode 0600 so the secret is never group/world-readable.
        fd = os.open(tmp_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(info, f, indent=2)
        if os.name != "nt":
            # O_CREAT keeps the mode of a pre-existing temp file; tighten it.
            os.chmod(tmp_file, 0o600)
        tmp_file.replace(INSTALL_METADATA_FILE)
        log_info(f"Installation info saved to {INSTALL_METADATA_FILE}")
    except Exception as e:
        log_error(f"Failed to save installation info: {e}")
        # Best-effort cleanup of the temp file.
        try:
            if tmp_file and tmp_file.exists():
                tmp_file.unlink()
        except Exception:
            pass
        raise InstallationError(f"Failed to save installation metadata: {e}") from e
|
|
|
|
|
def get_installation_info() -> Optional[Dict]:
    """Load the saved installation metadata.

    Returns:
        The decoded contents of ``INSTALL_METADATA_FILE``, or None when the
        file is missing or cannot be read or parsed (the latter is logged as
        a warning).
    """
    if not INSTALL_METADATA_FILE.exists():
        return None

    try:
        # The file is written as UTF-8; read it the same way instead of
        # relying on the platform default encoding.
        with open(INSTALL_METADATA_FILE, "r", encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError) as e:
        log_warning(f"Failed to read installation info: {e}")
        return None
|
|
|
|
|
def _capture_container_state(docker_variant: str) -> Dict:
    """Query the running state of containers whose name matches ``capture``.

    Returns a dict with ``running`` (True/False, or None when the query
    failed) and, when the engine reported one, ``status``.
    """
    try:
        result = run(
            [docker_variant, "ps", "--filter", "name=capture", "--format", "{{.Status}}"],
            check=False,
            capture_output=True,
        )
        status = result.stdout.strip()
    except Exception:
        return {"running": None}

    if status:
        return {"status": status, "running": "Up" in status}
    return {"running": False}


def detect_existing_installation() -> Optional[Dict]:
    """Detect an existing Capture installation and describe it.

    Sources are consulted in order; the first hit wins:

    1. saved installation metadata, enriched with live status;
    2. the generated Docker Compose file, if a container engine works;
    3. any container named like ``capture``;
    4. an active ``capture`` systemd unit;
    5. a binary at one of the common install locations.

    Returns:
        A dict with at least ``mode`` (plus ``running``, ``status`` and
        location details where known), or None when nothing is found.
    """
    info = get_installation_info()
    if info:
        # Enhance with runtime status
        mode = info.get("mode", "unknown")
        if mode == "docker":
            info.update(_capture_container_state(info.get("docker_variant", "docker")))
        elif mode == "system" and info.get("init", "unknown") == "systemd":
            try:
                result = run(["systemctl", "is-active", "capture"], check=False, capture_output=True)
                info["running"] = (result.returncode == 0)
                if result.returncode == 0:
                    info["status"] = "active"
            except Exception:
                info["running"] = None
        return info

    # Probe the container engine once; both Docker checks reuse the result.
    docker_available, docker_variant = detect_docker()

    if docker_available and DOCKER_COMPOSE_FILE.exists():
        info = {"mode": "docker", "docker_variant": docker_variant, "compose_file": str(DOCKER_COMPOSE_FILE)}
        info.update(_capture_container_state(docker_variant))
        return info

    if docker_available:
        try:
            # Include stopped containers when looking for the name.
            result = run(
                [docker_variant, "ps", "-a", "--filter", "name=capture", "--format", "{{.Names}}"],
                check=False,
                capture_output=True,
            )
            if "capture" in result.stdout:
                status_result = run(
                    [docker_variant, "ps", "--filter", "name=capture", "--format", "{{.Status}}"],
                    check=False,
                    capture_output=True,
                )
                status = status_result.stdout.strip()
                return {
                    "mode": "docker",
                    "docker_variant": docker_variant,
                    "running": "Up" in status if status else False,
                    "status": status if status else "unknown",
                }
        except Exception:
            pass

    # Check systemd service
    if shutil.which("systemctl"):
        try:
            result = run(["systemctl", "is-active", "capture"], check=False, capture_output=True)
            if result.returncode == 0:
                return {
                    "mode": "system",
                    "init": "systemd",
                    "running": True,
                    "status": "active",
                }
        except Exception:
            pass

    # Check binary in common locations
    common_paths = [
        Path("/usr/local/bin/capture"),
        Path.home() / "capture-agent" / "capture",
        Path(os.environ.get("ProgramFiles", "C:\\")) / "Capture" / "capture.exe",
    ]
    for path in common_paths:
        if path.exists():
            return {"mode": "system", "binary_path": str(path), "running": None}

    return None
|
|
|
|
|
def print_existing_installation_guide(existing: Dict) -> None:
    """Print helpful guidance about an existing installation.

    Shows how the detected installation is run (Docker or system binary),
    whether it appears to be running, the commands to manage it, and any
    port/secret recorded in the installation metadata.

    Args:
        existing: Detection result. Keys read here are ``mode``, ``running``
            and ``status`` plus, depending on the mode, ``compose_file`` and
            ``docker_variant`` (Docker) or ``init`` and ``binary_path``
            (system binary).
    """
    import re  # only needed here, to pull the port out of the compose file

    mode = existing.get("mode", "unknown")
    running = existing.get("running")
    status = existing.get("status", "")
    banner = "=" * 70

    print("\n" + banner)
    print("EXISTING CAPTURE INSTALLATION DETECTED")
    print(banner)

    if mode == "docker":
        compose_file = existing.get("compose_file", DOCKER_COMPOSE_FILE)
        docker_variant = existing.get("docker_variant", "docker")

        print(f"\nInstallation Mode: Docker ({docker_variant})")
        print(f"Compose File: {compose_file}")

        if running:
            print(f"Status: Running ({status})")
        else:
            print(f"Status: Not running ({status if status else 'stopped'})")

        print("\nTo manage your existing installation:")
        print(f" • View status: {docker_variant} ps --filter name=capture")
        print(f" • View logs: {docker_variant} logs capture")
        print(f" • Stop: docker compose -f {compose_file} down")
        print(f" • Start: docker compose -f {compose_file} up -d")
        print(f" • Restart: docker compose -f {compose_file} restart")
        print(f" • Edit config: Edit {compose_file}")

        # Best effort: show the published host port from the compose file.
        # The generated file maps "<port>:<port>", so the container side is
        # not necessarily 59232 and must not be hard-coded in the pattern.
        try:
            compose_path = Path(compose_file)
            if compose_path.exists():
                content = compose_path.read_text()
                port_match = re.search(r'ports:\s*-\s*"(\d+):\d+"', content)
                if port_match:
                    print(f" • Port: {port_match.group(1)}")
        except (OSError, ValueError, TypeError):
            # Unreadable or undecodable compose file: the port line is
            # purely informational, so skip it.
            pass

    elif mode == "system":
        init = existing.get("init", "unknown")
        binary_path = existing.get("binary_path", "")

        print("\nInstallation Mode: System Binary")
        if binary_path:
            print(f"Binary Location: {binary_path}")

        if init == "systemd":
            print("Service Manager: systemd")
            if running:
                print("Status: Running (active)")
            else:
                print("Status: Not running")

            print("\nTo manage your existing installation:")
            print(" • View status: systemctl status capture")
            print(" • View logs: journalctl -u capture -f")
            print(" • Start: sudo systemctl start capture")
            print(" • Stop: sudo systemctl stop capture")
            print(" • Restart: sudo systemctl restart capture")
            print(" • Edit config: Check /etc/systemd/system/capture.service")
        else:
            print("\nTo manage your existing installation:")
            if binary_path:
                print(f" • Binary location: {binary_path}")
            print(" • Check process: ps aux | grep capture")
            print(" • Check logs: Check ~/.checkmate-capture/install.log")

    # Port and secret recorded by a previous run of this installer, if any.
    install_info = get_installation_info()
    if install_info:
        port = install_info.get("port")
        secret = install_info.get("secret")
        if port:
            print("\nCurrent Configuration:")
            print(f" • Port: {port}")
        if secret:
            # Only reveal the ends of the secret; short secrets are fully masked.
            if len(secret) > 12:
                masked_secret = secret[:8] + "..." + secret[-4:]
            else:
                masked_secret = "***"
            print(f" • Secret: {masked_secret} (check {INSTALL_METADATA_FILE} for full secret)")

    print("\n" + banner)
    print("To overwrite this installation, use --force flag:")
    print(" python3 checkmate_capture_installer.py --force")
    print(banner + "\n")
|
|
|
|
|
def verify_installation(port: int, secret: str, timeout: int = 10) -> bool:
    """Verify installation by checking if the Capture API responds.

    Sends up to three GET requests to the local health endpoint, waiting two
    seconds between attempts.

    Args:
        port: Local port Capture is expected to listen on.
        secret: API secret sent with the request.
        timeout: Per-request timeout in seconds.

    Returns:
        True if a request returned HTTP 200; False on a 401 response
        (returned immediately, since retrying cannot fix a wrong secret) or
        once all attempts have failed.
    """
    log_info("Verifying installation...")

    max_attempts = 3
    retry_delay = 2  # seconds between attempts

    # NOTE(review): the endpoint path and the "X-API-Secret" header are kept
    # exactly as before; confirm both against the Capture API documentation.
    url = f"http://127.0.0.1:{port}/api/health"
    req = urllib.request.Request(url)
    req.add_header("X-API-Secret", secret)

    for attempt in range(1, max_attempts + 1):
        try:
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                code = resp.getcode()
            if code == 200:
                log_info("Installation verified successfully")
                return True
            # A non-200 success status used to be retried immediately and
            # never reported; treat it like any other failed attempt.
            failure = f"Health check returned unexpected HTTP status {code}"
        except urllib.error.HTTPError as e:
            if e.code == 401:
                log_warning("Health check returned 401 (unauthorized) - secret may be incorrect")
                return False
            failure = f"Health check failed with HTTP {e.code}: {e}"
        except urllib.error.URLError as e:
            failure = f"Health check failed: {e}"
        except Exception as e:
            failure = f"Health check error: {e}"

        if attempt == max_attempts:
            log_warning(failure)
            return False

        log_debug(f"Health check attempt {attempt} failed, retrying: {failure}")
        time.sleep(retry_delay)

    return False
|
|
|
|
|
def choose_install_mode(preferred_mode: str, docker_available: bool, docker_variant: Optional[str],
                        non_interactive: bool) -> str:
    """Choose the installation mode.

    Args:
        preferred_mode: "auto", "docker" or "system" as requested by the user.
        docker_available: Whether a Docker-compatible runtime was detected.
        docker_variant: Name of the detected runtime; only used in messages.
        non_interactive: True when prompting the user is not allowed.

    Returns:
        "docker" or "system".

    Raises:
        InstallationError: If Docker mode was explicitly requested but Docker
            is not available, or if the user interrupts the prompt.
    """
    # An explicit choice is honoured as-is (after a sanity check for Docker).
    if preferred_mode != "auto":
        if preferred_mode == "docker" and not docker_available:
            raise InstallationError("Docker mode requested but Docker is not available")
        return preferred_mode

    # Auto mode without Docker: nothing to ask.
    if not docker_available:
        log_info("Falling back to system binary installation mode")
        return "system"

    # Auto mode with Docker but no way to prompt: prefer Docker.
    if non_interactive or not is_tty():
        log_info(f"Docker detected ({docker_variant}); using Docker mode (auto, non-interactive)")
        return "docker"

    try:
        require_interactive(non_interactive, "Docker mode selection")
        print(f"[?] Docker is available on this machine ({docker_variant}).")
        choice = input(" Use Docker to run Capture? [Y/n]: ").strip().lower()
    except EOFError:
        # stdin closed (piped input, etc.): behave as non-interactive.
        log_info(f"Docker detected ({docker_variant}); using Docker mode (auto, non-interactive)")
        return "docker"
    except KeyboardInterrupt:
        # Ctrl+C means "abort", not "pick the default and keep installing".
        raise InstallationError("Installation cancelled by user") from None

    if choice in ("", "y", "yes"):
        return "docker"

    log_info("Falling back to system binary installation mode")
    return "system"
|
|
|
|
|
def create_docker_compose_file(secret: str, port: int, config: Config) -> Path:
    """Create the docker-compose.yml file for Capture.

    The file is written to DOCKER_COMPOSE_FILE, replacing any previous
    version, and is restricted to the current user because it contains the
    API secret.

    Args:
        secret: API secret passed to the container as API_SECRET.
        port: Port published on the host and passed to the container as PORT.
        config: Configuration object; ``capture_image`` is used as the image.

    Returns:
        Path to the created docker-compose.yml file
    """
    DOCKER_COMPOSE_DIR.mkdir(parents=True, exist_ok=True)

    volumes = []
    if platform.system().lower() == "linux" and os.path.exists("/etc/os-release"):
        volumes.append("/etc/os-release:/etc/os-release:ro")

    # Compose performs "$VAR" interpolation on values; "$$" yields a literal
    # dollar sign. The entry is emitted as a double-quoted scalar
    # (JSON string syntax) so characters such as '#', ':' or quotes in the
    # secret cannot change the YAML structure.
    api_secret_entry = json.dumps("API_SECRET=" + secret.replace("$", "$$"))

    lines = [
        "version: '3.8'",
        "",
        "services:",
        "  capture:",
        f"    image: {config.capture_image}",
        "    container_name: capture",
        "    restart: unless-stopped",
        "    ports:",
        f'      - "{port}:{port}"',
        "    environment:",
        f"      - {api_secret_entry}",
        "      - GIN_MODE=release",
        f"      - PORT={port}",
    ]

    if volumes:
        lines.append("    volumes:")
        lines.extend(f"      - {vol}" for vol in volumes)

    lines.extend([
        "",
        "# You can add additional configuration here:",
        "# - Custom network settings",
        "# - Resource limits (memory, CPU)",
        "# - Logging configuration",
        "# - Health checks",
        "# - Additional volumes",
    ])
    compose_content = "\n".join(lines) + "\n"

    with open(DOCKER_COMPOSE_FILE, "w", encoding="utf-8") as f:
        f.write(compose_content)

    # The file holds the API secret in clear text: keep it owner-only.
    # (On Windows chmod can only toggle the read-only flag, so this is a no-op.)
    try:
        os.chmod(DOCKER_COMPOSE_FILE, 0o600)
    except OSError as e:
        log_warning(f"Could not restrict permissions on {DOCKER_COMPOSE_FILE}: {e}")

    log_info(f"Created docker-compose.yml at {DOCKER_COMPOSE_FILE}")
    log_info("You can edit this file to customize Capture configuration")

    return DOCKER_COMPOSE_FILE
|
|
|
|
|
def docker_install(secret: str, port: int, config: Config, docker_variant: str = "docker", force: bool = False) -> None:
    """Install Capture using Docker Compose.

    Writes the compose file, tears down any previous Capture containers,
    pulls the image, starts the service and checks that the container
    reports an "Up" status.

    Args:
        secret: API secret for Capture
        port: Port to run Capture on
        config: Configuration object
        docker_variant: Docker variant (docker/podman)
        force: Kept for backward compatibility. Previous containers are now
            always torn down before starting, so this flag no longer changes
            behavior here.

    Raises:
        InstallationError: If Docker Compose is not available or the
            container is not running after start-up.
    """
    log_info(f"Installing Capture using {docker_variant} with Docker Compose")

    # Check if Docker Compose is available
    compose_available, compose_cmd = detect_docker_compose(docker_variant)
    if not compose_available:
        raise InstallationError(
            "Docker Compose not found. Please install Docker Compose.\n"
            " - Docker Compose V2: Usually included with Docker Desktop\n"
            " - Docker Compose V1: Install via 'pip install docker-compose' or package manager"
        )

    log_info(f"Using Docker Compose command: {compose_cmd}")

    # Create docker-compose.yml file
    compose_file = create_docker_compose_file(secret, port, config)

    compose_cmd_parts = compose_cmd.split()
    compose_base = compose_cmd_parts + ["-f", str(compose_file)]

    # Tear down whatever is there. This used to be skipped for a stopped
    # container unless --force was given; a stopped container that belongs to
    # a different compose file would then collide with the fixed
    # container_name on "up". "down" is harmless when nothing exists.
    existing = detect_existing_installation()
    if existing and existing.get("mode") == "docker":
        log_info("Stopping any existing Capture containers")
        existing_compose = existing.get("compose_file")
        if existing_compose and Path(existing_compose).exists() and Path(existing_compose) != compose_file:
            run(compose_cmd_parts + ["-f", str(existing_compose), "down"], check=False)
    else:
        log_info("Checking for existing Capture containers")
    run(compose_base + ["down"], check=False)

    # Pull image
    log_info(f"Pulling {config.capture_image}")
    run(compose_base + ["pull"])

    # Start services
    log_info("Starting Capture container")
    run(compose_base + ["up", "-d"])

    log_info(f"Docker Compose-based Capture agent started on port {port}")
    log_info(f"Configuration file: {compose_file}")
    log_info(f"You can edit docker-compose.yml and run '{compose_cmd} -f <file> up -d' to apply changes")

    # Give the container a moment to start, then check its status. A few
    # extra checks tolerate slow hosts without hiding an early crash.
    time.sleep(3)
    extra_checks = 3
    for remaining in range(extra_checks, -1, -1):
        result = run([docker_variant, "ps", "--filter", "name=capture", "--format", "{{.Status}}"],
                     check=False, capture_output=True)
        if "Up" in (result.stdout or ""):
            return
        if remaining:
            time.sleep(2)

    raise InstallationError("Container started but is not running properly")
|
|
|
|
|
def create_launchd_plist(binary_path: Path, secret: str, port: int) -> Path:
    """Create the macOS launchd plist file for Capture.

    The plist is written to ~/Library/LaunchAgents/com.checkmate.capture.plist
    and runs ``binary_path`` at load with API_SECRET, GIN_MODE and PORT set,
    logging to files next to the binary.

    Args:
        binary_path: Path of the Capture executable.
        secret: API secret exported to the process.
        port: Port exported to the process.

    Returns:
        Path of the written plist.

    Raises:
        InstallationError: If the plist cannot be written.
    """
    import plistlib  # stdlib serializer: handles XML escaping of all values

    plist_path = Path.home() / "Library" / "LaunchAgents" / "com.checkmate.capture.plist"
    plist_path.parent.mkdir(parents=True, exist_ok=True)

    log_dir = binary_path.parent
    # Built as data rather than an XML template so that characters such as
    # '&' or '<' in the secret or in paths cannot produce a malformed plist.
    plist = {
        "Label": "com.checkmate.capture",
        "ProgramArguments": [str(binary_path)],
        "EnvironmentVariables": {
            "API_SECRET": secret,
            "GIN_MODE": "release",
            "PORT": str(port),
        },
        "RunAtLoad": True,
        "KeepAlive": True,
        "StandardOutPath": str(log_dir / "capture.log"),
        "StandardErrorPath": str(log_dir / "capture.error.log"),
    }

    try:
        with open(plist_path, "wb") as f:
            plistlib.dump(plist, f, sort_keys=False)
        # The plist contains the API secret: owner read/write only.
        os.chmod(plist_path, 0o600)
        log_info(f"Created launchd plist: {plist_path}")
    except Exception as e:
        raise InstallationError(f"Failed to create launchd plist: {e}") from e

    return plist_path
|
|
|
|
|
def create_windows_service(binary_path: Path, secret: str, port: int) -> None:
    """Create a Windows service using NSSM if available, otherwise use Task Scheduler.

    Args:
        binary_path: Path of the Capture executable.
        secret: API secret exported to the process as API_SECRET.
        port: Port exported to the process as PORT.

    Raises:
        InstallationError: If the batch wrapper for the scheduled task cannot
            be written.
    """
    from xml.sax.saxutils import escape as xml_escape

    def log_manual_instructions() -> None:
        # Last-resort guidance when no service/task could be registered.
        log_info("You may need to manually create a service or scheduled task")
        log_info(f"Binary location: {binary_path}")
        log_info(f"Use this command to run manually: set API_SECRET={secret} && set PORT={port} && {binary_path}")

    # Try NSSM first
    nssm_path = shutil.which("nssm")
    if nssm_path:
        log_info("Using NSSM to create Windows service")
        try:
            run([nssm_path, "install", "CheckmateCapture", str(binary_path)], check=False)
            # NSSM expects one KEY=VALUE pair per argument. A single
            # ';'-joined string would end up entirely inside API_SECRET.
            run([nssm_path, "set", "CheckmateCapture", "AppEnvironmentExtra",
                 f"API_SECRET={secret}", "GIN_MODE=release", f"PORT={port}"], check=False)
            run([nssm_path, "start", "CheckmateCapture"], check=False)
            log_info("Windows service created using NSSM")
            return
        except Exception as e:
            log_warning(f"NSSM service creation failed: {e}")
            log_info("Falling back to scheduled task with environment wrapper")
    else:
        log_info("NSSM not found, creating scheduled task with environment wrapper")

    # Fallback: scheduled task that runs a batch wrapper setting the environment
    task_name = "CheckmateCapture"

    batch_path = binary_path.parent / "capture_service.bat"
    # set "NAME=value" keeps characters such as & | < > literal; '%' must be
    # doubled inside a batch file to survive variable expansion.
    batch_secret = secret.replace("%", "%%")
    batch_content = (
        "@echo off\n"
        f'set "API_SECRET={batch_secret}"\n'
        "set GIN_MODE=release\n"
        f"set PORT={port}\n"
        f'"{binary_path}"\n'
    )
    try:
        with open(batch_path, "w", encoding="utf-8") as f:
            f.write(batch_content)

        # Best effort only: on Windows os.chmod can merely toggle the
        # read-only flag, it does not restrict access to the owner.
        try:
            import stat
            os.chmod(batch_path, stat.S_IREAD | stat.S_IWRITE)
        except Exception:
            pass
    except Exception as e:
        log_warning(f"Failed to create batch file: {e}")
        raise InstallationError(f"Failed to create Windows service wrapper: {e}") from e

    # The path is escaped so that '&' etc. in it cannot break the XML.
    task_xml = f"""<?xml version="1.0" encoding="UTF-16"?>
<Task version="1.2" xmlns="http://schemas.microsoft.com/windows/2004/02/mit/task">
  <Triggers>
    <BootTrigger>
      <Enabled>true</Enabled>
    </BootTrigger>
  </Triggers>
  <Actions>
    <Exec>
      <Command>{xml_escape(str(batch_path))}</Command>
    </Exec>
  </Actions>
  <Principals>
    <Principal id="Author">
      <LogonType>InteractiveToken</LogonType>
      <RunLevel>HighestAvailable</RunLevel>
    </Principal>
  </Principals>
</Task>"""

    xml_path = Path(tempfile.gettempdir()) / "checkmate_capture_task.xml"
    try:
        with open(xml_path, "w", encoding="utf-16") as f:
            f.write(task_xml)

        result = run(["schtasks", "/Create", "/TN", task_name, "/XML", str(xml_path), "/F"], check=False)
        # check=False means a failing schtasks does not raise; only report
        # success when the exit status (if available) says so.
        if getattr(result, "returncode", 0) == 0:
            log_info("Created scheduled task for Checkmate Capture")
        else:
            log_warning("Failed to create scheduled task: schtasks returned a non-zero exit status")
            log_manual_instructions()
    except Exception as e:
        log_warning(f"Failed to create scheduled task: {e}")
        log_manual_instructions()
    finally:
        # Clean up temp XML file
        try:
            if xml_path.exists():
                xml_path.unlink()
        except Exception:
            pass
|
|
|
|
|
def system_install_linux(secret: str, port: int, config: Config) -> Path:
    """Install Capture on Linux. Returns binary path.

    Downloads the binary into /usr/local/bin and, on systemd hosts, writes,
    enables and starts a ``capture`` service. On other init systems only the
    binary is installed.

    Args:
        secret: API secret exported to the service as API_SECRET.
        port: Port exported to the service as PORT.
        config: Configuration object passed to the binary download.

    Returns:
        Path of the installed binary.

    Raises:
        InstallationError: If not running as root or the service file cannot
            be written.
    """
    if not is_root():
        raise InstallationError(
            "Root privileges required for Linux installation. "
            "Please run with sudo."
        )

    init_system = detect_init_system()
    log_info(f"Detected init system: {init_system}")

    install_dir = Path("/usr/local/bin")
    binary_path = download_capture_binary(install_dir, config)

    if init_system != "systemd":
        log_warning(f"Init system {init_system} not fully supported. Binary installed but service not created.")
        log_info(f"Binary location: {binary_path}")
        log_info("You may need to manually create a service or use Docker mode.")
        return binary_path

    service_path = Path("/etc/systemd/system/capture.service")

    # In unit files '%' introduces a specifier and unquoted whitespace splits
    # an Environment= value, so the assignment is double-quoted with the
    # quote-sensitive characters escaped.
    unit_secret = secret.replace("\\", "\\\\").replace('"', '\\"').replace("%", "%%")
    service_contents = "\n".join([
        "[Unit]",
        "Description=Checkmate Capture hardware monitoring agent",
        "After=network-online.target",
        "Wants=network-online.target",
        "",
        "[Service]",
        "Type=simple",
        f'Environment="API_SECRET={unit_secret}"',
        "Environment=GIN_MODE=release",
        f"Environment=PORT={port}",
        f"ExecStart={binary_path}",
        "Restart=on-failure",
        "RestartSec=5",
        "StandardOutput=journal",
        "StandardError=journal",
        "",
        "[Install]",
        "WantedBy=multi-user.target",
        "",
    ])

    log_info(f"Writing systemd service to {service_path}")
    try:
        with open(service_path, "w", encoding="utf-8") as f:
            f.write(service_contents)
    except Exception as e:
        raise InstallationError(f"Failed to write systemd service file: {e}") from e

    run(["systemctl", "daemon-reload"])
    run(["systemctl", "enable", "--now", "capture"])

    log_info("Systemd service 'capture' installed and started")
    return binary_path
|
|
|
|
|
def system_install_macos(secret: str, port: int, config: Config) -> Path:
    """Install Capture on macOS. Returns binary path.

    Downloads the binary into ~/capture-agent, writes the launchd plist and
    (re)loads it.

    Args:
        secret: API secret written into the launchd plist.
        port: Port written into the launchd plist.
        config: Configuration object passed to the binary download.

    Returns:
        Path of the installed binary.
    """
    install_dir = Path.home() / "capture-agent"
    binary_path = download_capture_binary(install_dir, config)

    # Create launchd plist
    plist_path = create_launchd_plist(binary_path, secret, port)

    # On a reinstall the job is usually still loaded; loading it again would
    # fail and leave the old process (with the old secret/port) running.
    # Unloading a job that is not loaded is harmless, hence check=False.
    try:
        run(["launchctl", "unload", str(plist_path)], check=False)
    except Exception as e:
        log_debug(f"launchctl unload before load failed (ignored): {e}")

    # Load the service
    try:
        run(["launchctl", "load", str(plist_path)])
        log_info("Launchd service loaded successfully")
    except Exception as e:
        log_warning(f"Failed to load launchd service: {e}")
        log_info("You may need to manually load it: launchctl load " + str(plist_path))

    return binary_path
|
|
|
|
|
def system_install_windows(secret: str, port: int, config: Config) -> Path:
    """Install Capture on Windows and return the path of the binary.

    The binary is downloaded into "<ProgramFiles>\\CheckmateCapture" (falling
    back to "C:\\" when the ProgramFiles variable is unset) and then
    registered via create_windows_service.
    """
    program_files = os.getenv("ProgramFiles", "C:\\")
    target_dir = Path(program_files).joinpath("CheckmateCapture")
    target_dir.mkdir(parents=True, exist_ok=True)

    exe_path = download_capture_binary(target_dir, config)

    # Register the downloaded binary so it runs as a service/task.
    create_windows_service(exe_path, secret, port)

    log_info(f"Capture installed to {target_dir}")
    return exe_path
|
|
|
|
|
def system_install(secret: str, port: int, config: Config) -> Path:
    """Install Capture as a system binary and return the binary path.

    Dispatches to the installer for the current operating system as reported
    by platform.system().

    Raises:
        InstallationError: If the operating system is not Windows, macOS or
            Linux.
    """
    os_name = platform.system().lower()

    # One installer per supported platform, keyed by lower-cased system name.
    installers = {
        "windows": system_install_windows,
        "darwin": system_install_macos,
        "linux": system_install_linux,
    }

    installer = installers.get(os_name)
    if installer is None:
        raise InstallationError(f"Unsupported system: {os_name}")
    return installer(secret, port, config)
|
|
|
|
|
def print_checkmate_instructions(secret: str, port: int, config: Config) -> None:
    """Print the follow-up steps for registering this server in Checkmate.

    The suggested server URL uses the first detected local IP address, or a
    placeholder when none was found; all detected addresses are listed at
    the end.
    """
    addresses = get_local_ips()
    suggested_host = addresses[0] if addresses else "<REMOTE_SERVER_IP>"
    rule = "=" * 70

    # Assemble the whole message first, then emit it line by line.
    report = [
        "",
        rule,
        "NEXT STEP: Add this server to Checkmate Infrastructure monitoring",
        rule,
        "1. Open your Checkmate UI in a browser:",
        f" {config.checkmate_ui_url}",
        "",
        "2. Go to: Infrastructure → Create Infrastructure Monitor",
        "",
        "3. Use these settings:",
        f" - Server URL: http://{suggested_host}:{port}",
        f" - Auth secret: {secret}",
        " - Name: (e.g.) This server's hostname or role",
        "",
        "4. Save the monitor. Within a few seconds, you should see CPU/RAM/Disk etc.",
        "",
    ]

    if addresses:
        report.append("Detected local IP addresses on this host:")
        report.extend(f" - {address}" for address in addresses)
        report.append("If this server is behind NAT or firewalls, ensure Checkmate can reach the chosen IP/port.")

    report.append(rule)

    for line in report:
        print(line)
|
|
|
|
|
def uninstall_docker(config: Config, docker_variant: str = "docker", force: bool = False) -> None:
    """Remove a Docker-based Capture installation.

    Uses Docker Compose when it is available and the installer's compose file
    exists; otherwise stops and removes the "capture" container directly.
    With ``force`` the image is removed as well and, on the Compose path, so
    are the compose file and its (empty) directory.
    """

    def drop_image() -> None:
        # Best-effort image removal, shared by both uninstall paths.
        try:
            run([docker_variant, "rmi", config.capture_image], check=False)
            log_info("Docker image removed")
        except Exception:
            pass

    log_info(f"Uninstalling Docker-based Capture ({docker_variant})")

    has_compose, compose_command = detect_docker_compose(docker_variant)

    # No usable Compose setup: talk to the container runtime directly.
    if not (has_compose and DOCKER_COMPOSE_FILE.exists()):
        log_info("Falling back to direct docker commands")
        for action in ("stop", "rm"):
            run([docker_variant, action, "capture"], check=False)

        if force:
            drop_image()

        log_info("Docker installation removed")
        return

    log_info("Using Docker Compose to stop and remove containers")
    down_command = compose_command.split() + ["-f", str(DOCKER_COMPOSE_FILE), "down", "-v"]
    run(down_command, check=False)

    if force:
        drop_image()

        # Also delete the generated compose file and its directory if empty.
        try:
            if DOCKER_COMPOSE_FILE.exists():
                DOCKER_COMPOSE_FILE.unlink()
            if DOCKER_COMPOSE_DIR.exists() and not any(DOCKER_COMPOSE_DIR.iterdir()):
                DOCKER_COMPOSE_DIR.rmdir()
            log_info("Docker Compose files removed")
        except Exception as e:
            log_warning(f"Failed to remove compose files: {e}")

    log_info("Docker Compose installation removed")
|
|
|
|
|
def uninstall_system(keep_logs: bool = False, keep_config: bool = False) -> None:
    """Uninstall a system binary installation.

    Stops and removes the platform service (systemd unit, launchd job, or
    NSSM service / scheduled task), deletes the binary recorded in the
    installation metadata, optionally deletes Capture's log files, and
    finally removes the installer-created install directory if it is empty.

    Args:
        keep_logs: If True, Capture's log files are left in place.
        keep_config: If True, the macOS launchd plist is unloaded but not
            deleted. Not consulted on other platforms.
    """
    log_info("Uninstalling system binary installation")

    info = get_installation_info()
    binary_path = Path(info["binary_path"]) if info and info.get("binary_path") else None

    # Directories created by this installer. Only these may be removed when
    # empty; shared locations such as /usr/local/bin must never be deleted.
    macos_dir = Path.home() / "capture-agent"
    windows_dir = Path(os.environ.get("ProgramFiles", "C:\\")) / "CheckmateCapture"
    owned_dirs = (macos_dir, windows_dir)

    # Stop and remove service
    system = platform.system().lower()

    if system == "linux":
        if detect_init_system() == "systemd":
            run(["systemctl", "stop", "capture"], check=False)
            run(["systemctl", "disable", "capture"], check=False)
            service_path = Path("/etc/systemd/system/capture.service")
            if service_path.exists():
                service_path.unlink()
                run(["systemctl", "daemon-reload"], check=False)
                log_info("Systemd service removed")

    elif system == "darwin":
        plist_path = Path.home() / "Library" / "LaunchAgents" / "com.checkmate.capture.plist"
        if plist_path.exists():
            run(["launchctl", "unload", str(plist_path)], check=False)
            if not keep_config:
                plist_path.unlink()
                log_info("Launchd plist removed")

    elif system == "windows":
        nssm_path = shutil.which("nssm")
        if nssm_path:
            run([nssm_path, "stop", "CheckmateCapture"], check=False)
            run([nssm_path, "remove", "CheckmateCapture", "confirm"], check=False)
        # The install may have used the scheduled-task fallback even if NSSM
        # is on PATH now, so always try to delete the task as well.
        run(["schtasks", "/Delete", "/TN", "CheckmateCapture", "/F"], check=False)

        # The scheduled-task fallback leaves a batch wrapper containing the
        # API secret next to the binary; remove it too.
        wrapper_path = (binary_path.parent if binary_path else windows_dir) / "capture_service.bat"
        if wrapper_path.exists():
            wrapper_path.unlink()
            log_info(f"Service wrapper removed: {wrapper_path}")

    # Remove binary
    if binary_path and binary_path.exists():
        binary_path.unlink()
        log_info(f"Binary removed: {binary_path}")

    # Remove logs
    if not keep_logs:
        log_paths = [
            macos_dir / "capture.log",
            macos_dir / "capture.error.log",
            windows_dir / "capture.log",
        ]
        for log_path in log_paths:
            if log_path.exists():
                log_path.unlink()
                log_info(f"Log removed: {log_path}")

    # Remove the install directory last (logs live in it) and only if it is
    # one this installer created and it is now empty.
    if binary_path and binary_path.parent in owned_dirs:
        parent_dir = binary_path.parent
        try:
            if parent_dir.exists() and not any(parent_dir.iterdir()):
                parent_dir.rmdir()
        except OSError:
            # Leaving an empty directory behind is not worth failing for.
            pass
|
|
|
|
|
def uninstall(config: Config, force: bool = False, keep_logs: bool = False, keep_config: bool = False) -> None:
    """Remove the detected Capture installation.

    Delegates to the Docker or system uninstaller depending on the detected
    mode, then deletes the installation metadata file and, with ``force``,
    the cache directory. Does nothing beyond logging when no installation is
    detected.
    """
    log_info("Starting uninstallation process")

    detected = detect_existing_installation()
    if not detected:
        log_info("No existing installation detected")
        return

    install_mode = detected.get("mode", "unknown")

    if install_mode == "system":
        uninstall_system(keep_logs, keep_config)
    elif install_mode == "docker":
        uninstall_docker(config, detected.get("docker_variant", "docker"), force)
    else:
        log_warning(f"Unknown installation mode: {install_mode}")

    # The metadata describes an installation that no longer exists.
    if INSTALL_METADATA_FILE.exists():
        INSTALL_METADATA_FILE.unlink()
        log_info("Installation metadata removed")

    # The cache is only discarded on a forced uninstall.
    if force and CACHE_DIR.exists():
        shutil.rmtree(CACHE_DIR)
        log_info("Cache directory removed")

    log_info("Uninstallation complete")
|
|
|
|
|
def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse command line arguments.

    Args:
        argv: Argument list to parse. When None (the default, and what all
            existing callers get) argparse reads ``sys.argv[1:]``.

    Returns:
        The populated argparse namespace.
    """
    parser = argparse.ArgumentParser(
        description="Install & run Checkmate Capture agent on this server.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Interactive installation
python3 checkmate_capture_installer.py

# Non-interactive Docker installation
python3 checkmate_capture_installer.py --mode docker --secret MY_SECRET --port 59232 --no-interactive

# System binary installation
python3 checkmate_capture_installer.py --mode system --secret MY_SECRET

# Uninstall
python3 checkmate_capture_installer.py --uninstall

# Uninstall with options
python3 checkmate_capture_installer.py --uninstall --force --keep-logs
""",
    )

    # --- Installation settings ---
    parser.add_argument(
        "--mode",
        choices=["auto", "docker", "system"],
        default="auto",
        help="Installation mode. 'auto' prefers Docker if available (default: auto).",
    )
    parser.add_argument(
        "--secret",
        help="API secret to use for Capture. If omitted, a random one is generated.",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=None,  # None lets the caller fall back to the configured port
        help=f"Port for Capture to listen on (default: {DEFAULT_CAPTURE_PORT}).",
    )

    # --- Configuration overrides ---
    parser.add_argument(
        "--checkmate-ui-url",
        dest="checkmate_ui_url",
        help=f"Checkmate UI URL (default: {DEFAULT_CHECKMATE_UI_URL}).",
    )
    parser.add_argument(
        "--checkmate-server-url",
        dest="checkmate_server_url",
        help=f"Checkmate Server URL (default: {DEFAULT_CHECKMATE_SERVER_URL}).",
    )
    parser.add_argument(
        "--capture-image",
        dest="capture_image",
        help=f"Docker image for Capture (default: {DEFAULT_CAPTURE_IMAGE}).",
    )
    parser.add_argument(
        "--github-repo",
        dest="github_repo",
        help=f"GitHub repository for Capture releases in format 'owner/repo' (default: {DEFAULT_GITHUB_REPO}).",
    )
    parser.add_argument(
        "--save-config",
        action="store_true",
        help="Save current configuration (from CLI args) to config file for future use.",
    )

    # --- Behavior flags ---
    parser.add_argument(
        "--no-interactive",
        action="store_true",
        help="Run without interactive prompts, using defaults where possible.",
    )
    parser.add_argument(
        "--uninstall",
        action="store_true",
        help="Uninstall existing Capture installation.",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Force installation/uninstall (skip confirmations, overwrite existing installation).",
    )
    parser.add_argument(
        "--keep-logs",
        action="store_true",
        help="Keep log files when uninstalling.",
    )
    parser.add_argument(
        "--keep-config",
        action="store_true",
        help="Keep configuration files when uninstalling.",
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="Enable verbose logging.",
    )

    return parser.parse_args(argv)
|
|
|
|
|
def _run_uninstall_flow(args: argparse.Namespace, config: Config) -> None:
    """Handle --uninstall: confirm with the user when interactive, then uninstall."""
    if not args.force and not args.no_interactive:
        existing = detect_existing_installation()
        if not existing:
            print("[!] No existing installation detected.")
            return
        try:
            require_interactive(args.no_interactive, "uninstall confirmation")
            print(f"[?] Found existing installation: {existing.get('mode', 'unknown')}")
            choice = input(" Proceed with uninstall? [y/N]: ").strip().lower()
        except (EOFError, KeyboardInterrupt):
            print("\n[!] Non-interactive mode detected. Use --force to skip confirmation.")
            sys.exit(1)
        if choice not in ("y", "yes"):
            print("Uninstall cancelled.")
            return

    try:
        uninstall(config, force=args.force, keep_logs=args.keep_logs, keep_config=args.keep_config)
        print("\n[+] Uninstallation complete.")
    except Exception as exc:
        log_error(f"Uninstallation failed: {exc}")
        sys.exit(1)


def _confirm_overwrite(args: argparse.Namespace, existing: Dict) -> bool:
    """Decide whether an existing installation may be overwritten.

    Returns True to proceed and False when the user declined. Exits the
    process when overwriting is not possible without --force.
    """
    if args.force:
        log_info("Existing installation detected but --force flag provided, proceeding with overwrite")
        print(f"[!] Existing installation detected: {existing.get('mode', 'unknown')}")
        print("[!] --force flag provided, will overwrite existing installation")
        return True

    print_existing_installation_guide(existing)

    if args.no_interactive:
        # In non-interactive mode, require --force to overwrite
        print("\n[ERROR] Existing Capture installation detected!")
        print(" Cannot proceed in non-interactive mode without --force flag.")
        print(" Use --force to overwrite existing installation:")
        print(" python3 checkmate_capture_installer.py --force")
        sys.exit(1)

    try:
        require_interactive(args.no_interactive, "existing installation confirmation")
        choice = input("\n[?] Overwrite existing installation? [y/N]: ").strip().lower()
    except (EOFError, KeyboardInterrupt):
        print("\n[ERROR] Non-interactive mode detected. Use --force to overwrite existing installation.")
        sys.exit(1)

    if choice not in ("y", "yes"):
        print("\n[!] Installation cancelled.")
        print(" Use --force flag to overwrite without prompt.")
        return False

    print("\n[!] Proceeding with installation (will overwrite existing)...")
    return True


def _resolve_api_secret(args: argparse.Namespace) -> str:
    """Return the API secret to install with.

    Uses --secret when given; otherwise generates one and, when interactive,
    lets the user accept it or type their own (which is then validated).
    """

    def announce(value: str) -> None:
        print(f"[+] Generated API secret: {value}")
        print(" Save this secret - you'll need it to add this agent to Checkmate UI!")

    if args.secret:
        log_info("Using API secret provided via --secret")
        return args.secret

    generated = generate_secret()

    # Double-check non-interactive mode (safety check)
    if args.no_interactive or not is_tty():
        log_info("Generated API secret (non-interactive mode)")
        announce(generated)
        return generated

    try:
        require_interactive(args.no_interactive, "secret confirmation")
        print("[?] No API secret provided.")
        print(f" Generated a strong random secret: {generated}")
        choice = input(" Use this secret? [Y/n]: ").strip().lower()
        if choice in ("", "y", "yes"):
            return generated
        require_interactive(args.no_interactive, "custom secret input")
        secret = input(" Enter your desired secret: ").strip()
    except EOFError:
        # stdin closed: fall back to non-interactive behavior
        log_info("Non-interactive mode detected (EOF), using generated secret")
        announce(generated)
        return generated
    except KeyboardInterrupt:
        # Ctrl+C is a request to stop, not consent to the generated secret.
        print("\n[!] Installation cancelled.")
        sys.exit(130)

    try:
        validate_secret(secret)
    except ValidationError as e:
        log_error(str(e))
        sys.exit(1)
    return secret


def main() -> None:
    """Main entry point.

    Parses the command line, loads configuration, then either uninstalls or
    installs Capture (Docker or system binary), verifies the result and
    prints the Checkmate registration instructions. Exits with status 1 on
    validation or installation failure.
    """
    args = parse_args()

    # Auto-detect non-interactive mode if stdin is not a TTY (MUST be before setup_logging)
    # This needs to happen early because Config initialization might log things
    if not args.no_interactive:
        try:
            if not is_tty():
                args.no_interactive = True
        except Exception:
            # If TTY check fails, assume non-interactive to be safe
            args.no_interactive = True

    setup_logging(verbose=args.verbose)

    if args.no_interactive and not args.force:
        log_info("Running in non-interactive mode")

    # Initialize configuration
    config = Config()
    config.apply_cli_args(args)

    # Validate configuration URLs
    try:
        validate_url(config.checkmate_ui_url, "Checkmate UI URL")
        validate_url(config.checkmate_server_url, "Checkmate Server URL")
        if args.github_repo:
            validate_github_repo(config.github_repo)
    except ValidationError as e:
        log_error(str(e))
        sys.exit(1)

    # Handle port - use CLI arg if provided, otherwise use config default
    port = args.port if args.port is not None else config.capture_port

    # Save config if requested
    if args.save_config:
        config.save_to_file()
        log_info("Configuration saved. You can edit it at any time in ~/.checkmate-capture/config.json")

    print_header(config)

    # Handle uninstall
    if args.uninstall:
        _run_uninstall_flow(args, config)
        return

    # Check for existing installation
    existing = detect_existing_installation()
    if existing and not _confirm_overwrite(args, existing):
        return

    # Validate inputs
    try:
        if args.secret:
            validate_secret(args.secret)
        validate_port(port)
    except ValidationError as e:
        log_error(str(e))
        sys.exit(1)

    # Decide on secret
    secret = _resolve_api_secret(args)

    # Detect Docker
    docker_available, docker_variant = detect_docker()
    if docker_available:
        log_info(f"Docker detected ({docker_variant})")
    else:
        log_info("Docker NOT detected")

    # Choose installation mode
    try:
        mode = choose_install_mode(args.mode, docker_available, docker_variant, args.no_interactive)
        log_info(f"Selected installation mode: {mode}")
    except InstallationError as e:
        log_error(str(e))
        sys.exit(1)

    # Perform installation
    try:
        if mode == "docker":
            docker_install(secret, port, config, docker_variant or "docker", force=args.force)
            save_installation_info(mode, secret, port)
        else:
            binary_path = system_install(secret, port, config)
            save_installation_info(mode, secret, port, binary_path)

        # verify_installation logs the success message itself, so only the
        # failure case needs reporting here.
        if not verify_installation(port, secret):
            log_warning("Installation completed but health check failed. Service may need a moment to start.")

        print_checkmate_instructions(secret, port, config)

    except Exception as exc:
        log_error(f"Installation failed: {exc}")
        import traceback
        log_debug(traceback.format_exc())
        sys.exit(1)
|
|
|
|
|
# Run the installer only when executed as a script, not when imported.
if __name__ == "__main__":
    main()