Last active
December 8, 2025 09:58
-
-
Save meepak/f468e3239c5579b1cb84d7c5696efd88 to your computer and use it in GitHub Desktop.
Linux sysadmin script using the OpenAI API - it will perform any task asked of it on the server whose details are given.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import json | |
| import time | |
| from typing import List, Dict, Any | |
| import paramiko | |
| import subprocess | |
| import select | |
| from openai import OpenAI | |
# ============================================================
# API KEY CONFIGURATION
# ============================================================
# The key is taken from the OPENAI_API_KEY environment variable so the secret
# does not have to live in (or be committed with) this file.
import os

OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
# ============================================================
# An empty value is passed on as None so the SDK applies its own handling of a
# missing key instead of sending an empty credential.
client = OpenAI(api_key=OPENAI_API_KEY or None)
# ---------------- COLORS (ANSI) ----------------
def _sgr(code: int) -> str:
    """Build the ANSI escape sequence for one numeric display attribute."""
    return f"\033[{code}m"


# Resets all attributes; printed after every coloured fragment.
RESET = _sgr(0)

# Layout / narration colours.
COL_SEP = _sgr(95)            # magenta
COL_THOUGHT = _sgr(96)        # bright cyan
COL_CMD = _sgr(93)            # bright yellow
COL_INFO = _sgr(94)           # blue
COL_WARN = _sgr(91)           # red

# Command output labels and exit status.
COL_STDOUT_LABEL = _sgr(92)   # green
COL_STDERR_LABEL = _sgr(91)   # red
COL_EXIT_OK = _sgr(92)        # green
COL_EXIT_BAD = _sgr(91)       # red

# User interaction.
COL_INPUT = _sgr(90)          # dim gray
COL_RISK = _sgr(33)           # yellow-ish
# ---------------- GPT CALL ----------------
def call_gpt(messages: List[Dict[str, str]]) -> Dict[str, Any]:
    """Send the conversation to the model and return its reply as a JSON object.

    Args:
        messages: Conversation so far, as ``{"role": ..., "content": ...}`` dicts.

    Returns:
        The JSON object decoded from the model's reply.

    Raises:
        RuntimeError: If the reply is not valid JSON, or is valid JSON but not
            an object (callers use ``.get()`` on the result).
    """
    response = client.responses.create(
        model="gpt-4.1-mini",  # change if you like
        input=messages,
        max_output_tokens=600,
        temperature=0.2,
    )
    # Use the SDK's aggregated text instead of indexing output[0].content[0],
    # which breaks when the first output item is not a text message.
    text = (response.output_text or "").strip()

    # Handle possible ```json ... ``` wrapping
    if text.startswith("```"):
        text = text.strip("`")
        if text.startswith("json"):
            text = text[4:].lstrip()

    try:
        parsed = json.loads(text)
    except json.JSONDecodeError as exc:
        raise RuntimeError(f"Model did not return valid JSON: {text[:200]}...") from exc

    if not isinstance(parsed, dict):
        raise RuntimeError(f"Model did not return a JSON object: {text[:200]}...")
    return parsed
| # ---------------- PROMPT DETECTION ---------------- | |
| def is_password_prompt(line: str, username: str | None) -> bool: | |
| """ | |
| Detect lines that are likely asking specifically for a password. | |
| We handle these automatically with the stored password. | |
| """ | |
| lower = line.lower().strip() | |
| if "password" not in lower: | |
| return False | |
| # Typical sudo prompts | |
| if "sudo" in lower: | |
| return True | |
| if username and username.lower() in lower: | |
| return True | |
| if lower.endswith(":"): | |
| return True | |
| return True | |
def is_interactive_prompt(line: str) -> bool:
    """
    Detect lines that are likely asking for interactive *non-password* input:
    confirmations, choices, etc.

    NOTE: password prompts are handled separately in is_password_prompt().

    Args:
        line: One line (or trailing fragment) of command output.

    Returns:
        True if the line looks like it is waiting for the user to answer.
    """
    lower = line.lower().rstrip()
    if not lower:
        return False

    # Don't treat password prompts as generic interactive prompts
    if "password" in lower or "passphrase" in lower:
        return False

    # "Press ENTER to continue" style lines normally carry no ':' / '?' /
    # [y/n] marker, so they must not be subject to the punctuation gate below.
    if "press enter" in lower or "press return" in lower:
        return True

    # Removing spaces also covers the plain "[y/n]" spelling, so one test
    # handles both "[Y/n]" and "[ y / n ]".
    looks_like_prompt = (
        lower.endswith((":", "?"))
        or "[y/n]" in lower.replace(" ", "")
    )
    if not looks_like_prompt:
        return False

    keywords = (
        "do you want to continue",
        "continue? [y/n]",
        "enter choice",
        "enter selection",
        "enter new",
        "are you sure",
    )
    return any(k in lower for k in keywords)
# ---------------- COMMAND SANITISER ----------------
def make_noninteractive(command: str) -> str:
    """
    Rewrite certain commands to be non-interactive.

    - For any 'systemctl' invocation, ensure '--no-pager' is present.

    Only the first whitespace-delimited ``systemctl`` word is considered.
    The flag is spliced into the original text, so quoting and spacing in the
    rest of the command are left exactly as given.

    Args:
        command: Shell command line.

    Returns:
        The command with ``--no-pager`` inserted after ``systemctl`` when it
        was missing; otherwise the command unchanged.
    """
    import re

    # (?<!\S) / (?!\S) match only a standalone word, e.g. not "/usr/bin/systemctl".
    found = re.search(r"(?<!\S)systemctl(?!\S)", command)
    if found is None:
        return command  # nothing to change

    tail = command[found.end():]
    if re.search(r"(?<!\S)--no-pager(?!\S)", tail):
        return command  # flag already present after 'systemctl'

    return f"{command[:found.end()]} --no-pager{tail}"
# ---------------- SSH SESSION ----------------
class SSHSession:
    """
    Remote executor over SSH.

    Interface: connect(), run(command, idle_timeout), close()
    """

    def __init__(self, host: str, username: str, password: str, port: int = 22):
        self.host = host
        self.username = username
        # Used for SSH login and to answer password prompts raised by commands.
        self.password = password
        self.port = port
        self.client: paramiko.SSHClient | None = None

    def connect(self):
        """Open the SSH connection using password authentication."""
        self.client = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without
        # verification; consider loading known_hosts for untrusted networks.
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.client.connect(
            hostname=self.host,
            port=self.port,
            username=self.username,
            password=self.password,
            look_for_keys=False,
        )

    def _send_line(self, channel, text: str) -> None:
        """Send one line of input to the remote command (best effort)."""
        try:
            channel.send(text + "\n")
        except (OSError, EOFError, paramiko.SSHException) as exc:
            # The command may already have closed its input; report and carry on.
            print(f"{COL_WARN}[WARN]{RESET} Could not send input to remote command: {exc}")

    def _answer_prompt(self, channel, line: str) -> None:
        """Reply to ``line`` if it is a password or confirmation prompt."""
        if is_password_prompt(line, self.username):
            if self.password:
                self._send_line(channel, self.password)
            return
        if is_interactive_prompt(line):
            user_val = input(
                f"{COL_INPUT}[INPUT REQUIRED]{RESET} {line.strip()} "
            )
            self._send_line(channel, user_val)

    def run(self, command: str, idle_timeout: int = 300) -> Dict[str, Any]:
        """
        Run a command on the remote host with an *idle* timeout, streaming output.

        If the command prompts for input (passwords, confirmations, etc.),
        ask the user (for non-password prompts) or send the stored password.
        The password is only ever sent in reply to a detected prompt.

        Args:
            command: Shell command to execute remotely.
            idle_timeout: Seconds without any output after which the command
                is treated as stuck and abandoned.

        Returns:
            Dict with keys ``stdout``, ``stderr``, ``exit_status`` (-1 on
            timeout), ``timed_out`` and ``idle_timeout``.

        Raises:
            RuntimeError: If connect() has not been called or the transport
                is unavailable.
        """
        if not self.client:
            raise RuntimeError("SSH not connected")
        transport = self.client.get_transport()
        if not transport:
            raise RuntimeError("No SSH transport")

        # Sanitize to avoid interactive pagers etc.
        effective_cmd = make_noninteractive(command.strip())
        # sudo can follow environment assignments or sit inside a pipeline,
        # so look for it as a word anywhere rather than only as a prefix.
        uses_sudo = "sudo" in effective_cmd.split()

        captured: dict[str, list[str]] = {"stdout": [], "stderr": []}
        labelled: set[str] = set()
        timed_out = False

        channel = transport.open_session()

        def announce(name: str, colour: str) -> None:
            """Print the [STDOUT]/[STDERR] label the first time a stream is shown."""
            if name not in labelled:
                print(f"{colour}[{name.upper()}]{RESET} ", end="")
                labelled.add(name)

        def pump(interactive: bool) -> bool:
            """Read what is currently buffered on each stream; True if any data came in."""
            sources = (
                ("stdout", channel.recv_ready, channel.recv, COL_STDOUT_LABEL),
                ("stderr", channel.recv_stderr_ready, channel.recv_stderr, COL_STDERR_LABEL),
            )
            got_data = False
            for name, is_ready, read, colour in sources:
                if not is_ready():
                    continue
                chunk = read(4096).decode("utf-8", errors="replace")
                captured[name].append(chunk)
                got_data = True
                announce(name, colour)
                for line in chunk.splitlines(keepends=True):
                    print(line, end="", flush=True)
                    if interactive:
                        self._answer_prompt(channel, line)
            return got_data

        try:
            if uses_sudo:
                channel.get_pty()  # sudo likes a TTY
            channel.exec_command(effective_cmd)
            # No password is sent up front: if sudo never asks (e.g. cached
            # credentials) an unsolicited password may be echoed back by the
            # PTY or consumed by the command as ordinary input.

            last_activity = time.monotonic()
            while True:
                if pump(interactive=True):
                    last_activity = time.monotonic()

                if channel.exit_status_ready():
                    # Drain any remaining data. The command has finished, so
                    # nothing can still be waiting for an answer.
                    while pump(interactive=False):
                        pass
                    break

                if time.monotonic() - last_activity > idle_timeout:
                    timed_out = True
                    note = (
                        f"\n[CLIENT NOTICE] No output for {idle_timeout} seconds. "
                        f"Command treated as stuck and channel closed by client.\n"
                    )
                    captured["stderr"].append(note)
                    announce("stderr", COL_STDERR_LABEL)
                    print(note, end="", flush=True)
                    break

                time.sleep(0.2)

            exit_status = -1 if timed_out else channel.recv_exit_status()
        finally:
            # Release the channel on every path, not only on timeout.
            channel.close()

        stdout = "".join(captured["stdout"])
        stderr = "".join(captured["stderr"])

        # With a PTY the remote stderr arrives merged into stdout, so check both.
        if "sudo" in effective_cmd and "not in the sudoers file" in stdout + stderr:
            print(
                f"\n{COL_WARN}[SUDO ERROR]{RESET} User '{self.username}' is not allowed to use sudo on remote host."
            )

        return {
            "stdout": stdout,
            "stderr": stderr,
            "exit_status": exit_status,
            "timed_out": timed_out,
            "idle_timeout": idle_timeout,
        }

    def close(self):
        """Close the SSH connection if one was opened."""
        if self.client:
            self.client.close()
# ---------------- LOCAL SESSION ----------------
class LocalSession:
    """
    Local executor (no SSH) when host is localhost/127.0.0.1.

    Interface: connect(), run(command, idle_timeout), close()
    """

    def __init__(self, username: str, password: str):
        self.username = username
        # Sent to the child's stdin when a password prompt is detected.
        self.password = password

    def connect(self):
        """Nothing to do for local execution."""
        pass

    @staticmethod
    def _with_sudo_stdin(command: str) -> tuple[str, bool]:
        """Make a leading ``sudo`` read its password from stdin.

        Handles ``sudo ...`` optionally preceded by ``NAME=value`` assignments
        and inserts ``-S`` right after ``sudo`` unless it is already the next
        argument. The rest of the command text is left untouched.

        Returns:
            ``(command, uses_sudo)``.
        """
        import re

        found = re.match(r"((?:[A-Za-z_]\w*=\S*\s+)*)sudo\s+", command)
        if found is None:
            return command, False
        rest = command[found.end():]
        if rest.split(None, 1)[:1] == ["-S"]:
            return command, True
        return f"{found.group(1)}sudo -S {rest}", True

    def run(self, command: str, idle_timeout: int = 300) -> Dict[str, Any]:
        """
        Run a command *locally* with an idle timeout, streaming output.

        For sudo commands:
        - We rewrite 'sudo cmd...' to 'sudo -S cmd...' so sudo reads password from stdin.
        - The stored password is sent when a password prompt shows up in the output.

        If the command prompts for input (passwords, confirmations, etc.),
        ask the user (for non-password prompts) or send the stored password.

        Output is read in raw chunks rather than whole lines, so prompts that
        do not end with a newline are seen and the idle timeout keeps working
        while the command waits for input.

        Args:
            command: Shell command to execute.
            idle_timeout: Seconds without any output after which the command
                is killed.

        Returns:
            Dict with keys ``stdout``, ``stderr``, ``exit_status`` (-1 on
            timeout), ``timed_out`` and ``idle_timeout``.
        """
        effective_cmd, uses_sudo = self._with_sudo_stdin(
            make_noninteractive(command.strip())
        )

        # bufsize=0 gives raw pipe objects: read() returns what is available
        # now instead of blocking until a full line or buffer arrives.
        proc = subprocess.Popen(
            effective_cmd,
            shell=True,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            bufsize=0,
        )

        def safe_write_stdin(data: str) -> None:
            """Write to stdin only if it is still open; a closed pipe is not an error."""
            if proc.stdin is None or proc.stdin.closed:
                return
            try:
                proc.stdin.write(data.encode("utf-8"))
            except (OSError, ValueError):
                # child closed pipe, don't spam errors
                pass

        def answer_prompt(line: str) -> None:
            """Reply to ``line`` if it is a password or confirmation prompt."""
            if is_password_prompt(line, self.username):
                if self.password:
                    safe_write_stdin(self.password + "\n")
                return
            if is_interactive_prompt(line):
                user_val = input(
                    f"{COL_INPUT}[INPUT REQUIRED]{RESET} {line.strip()} "
                )
                safe_write_stdin(user_val + "\n")

        captured: dict[str, list[str]] = {"stdout": [], "stderr": []}
        labelled: set[str] = set()
        timed_out = False

        def announce(name: str, colour: str) -> None:
            """Print the [STDOUT]/[STDERR] label the first time a stream is shown."""
            if name not in labelled:
                print(f"{colour}[{name.upper()}]{RESET} ", end="")
                labelled.add(name)

        streams = {
            proc.stdout: ("stdout", COL_STDOUT_LABEL),
            proc.stderr: ("stderr", COL_STDERR_LABEL),
        }
        open_streams = list(streams)

        try:
            last_activity = time.monotonic()
            while True:
                exited = proc.poll() is not None
                # After exit only collect what is already buffered (no waiting).
                ready, _, _ = select.select(
                    open_streams, [], [], 0 if exited else 0.2
                )

                for stream in ready:
                    data = stream.read(4096)
                    if not data:
                        open_streams.remove(stream)  # EOF on this pipe
                        continue
                    name, colour = streams[stream]
                    chunk = data.decode("utf-8", errors="replace")
                    captured[name].append(chunk)
                    announce(name, colour)
                    for line in chunk.splitlines(keepends=True):
                        print(line, end="", flush=True)
                        # A finished process cannot be waiting for an answer.
                        if not exited:
                            answer_prompt(line)
                    last_activity = time.monotonic()

                if exited and not ready:
                    break  # process is done and nothing is left to read

                # --- Idle-timeout only while the process is still running and silent ---
                if not exited and time.monotonic() - last_activity > idle_timeout:
                    timed_out = True
                    note = (
                        f"\n[CLIENT NOTICE] No output for {idle_timeout} seconds. "
                        f"Command treated as stuck and process killed by client.\n"
                    )
                    captured["stderr"].append(note)
                    announce("stderr", COL_STDERR_LABEL)
                    print(note, end="", flush=True)
                    proc.kill()
                    proc.wait()  # reap the killed shell
                    break
        finally:
            # Close all three pipes so no file descriptors are left behind.
            for pipe in (proc.stdin, proc.stdout, proc.stderr):
                if pipe is not None and not pipe.closed:
                    try:
                        pipe.close()
                    except OSError:
                        pass

        if timed_out or proc.returncode is None:
            exit_status = -1
        else:
            exit_status = proc.returncode

        stdout = "".join(captured["stdout"])
        stderr = "".join(captured["stderr"])

        if uses_sudo and "not in the sudoers file" in stderr:
            print(
                f"\n{COL_WARN}[SUDO ERROR]{RESET} Local user '{self.username}' is not allowed to use sudo."
            )

        return {
            "stdout": stdout,
            "stderr": stderr,
            "exit_status": exit_status,
            "timed_out": timed_out,
            "idle_timeout": idle_timeout,
        }

    def close(self):
        """Nothing to close for local execution."""
        pass
| # ---------------- SYSTEM PROMPT ---------------- | |
| # Instruction text used as the "system" message that opens the history in run_agent(). | |
| # It defines the JSON reply format ("command" / "done") that run_agent() reads back. | |
| SYSTEM_PROMPT = """ | |
| You are an autonomous Linux sysadmin assistant running behind an API that | |
| expects STRICT JSON responses. | |
| IMPORTANT RULES ABOUT OUTPUT: | |
| - Your ENTIRE reply MUST be exactly ONE JSON object. | |
| - Do NOT include multiple JSON objects. | |
| - Do NOT include any extra text, comments, or markdown. | |
| - Do NOT wrap the JSON in ``` fences. | |
| INTERACTION MODEL: | |
| - You do NOT have direct shell access; you can only propose commands. | |
| - The client program runs your commands and sends back stdout/stderr. | |
| - The client supports answering line-based prompts (like 'Do you want to continue?'), | |
| but it CANNOT reliably drive interactive TUIs or full-screen console menus. | |
| ABSOLUTE RESTRICTIONS: | |
| - NEVER use interactive console/TUI tools like: whiptail, dialog, nmtui, top, htop, | |
| alsamixer, raspi-config, menuconfig, or dpkg-reconfigure without a fully | |
| non-interactive mode. | |
| - NEVER rely on curses-style full-screen UIs, wizard menus, or arrow-key navigation. | |
| - ALWAYS use non-interactive configuration methods: | |
| - edit config files directly with sed/awk/cat/tee, | |
| - or use tools/flags that are specifically documented as non-interactive. | |
| APT / PACKAGE MANAGEMENT: | |
| - When using apt-get/apt: | |
| - Always include: `DEBIAN_FRONTEND=noninteractive` in the environment. | |
| - Use: `apt-get -y` (assume yes to prompts). | |
| Example: | |
| DEBIAN_FRONTEND=noninteractive sudo apt-get update | |
| DEBIAN_FRONTEND=noninteractive sudo apt-get -y install rkhunter | |
| COMMAND RESPONSE FORMAT: | |
| At each step you MUST output JSON ONLY, in one of these forms: | |
| 1) To run a command: | |
| { | |
| "type": "command", | |
| "thought": "<brief reasoning>", | |
| "risk": "<brief risk description>", | |
| "command": "<bash command>" | |
| } | |
| 2) When finished: | |
| { | |
| "type": "done", | |
| "thought": "<why>", | |
| "summary": "<final result for the user>" | |
| } | |
| The "risk" field is REQUIRED for every command. | |
| Notes about timeouts & interactivity: | |
| - The client enforces an *idle* timeout: if your command produces NO output | |
| (neither stdout nor stderr) for some period, it will be killed and marked | |
| as timed out. | |
| - Do NOT wrap your commands with the 'timeout' utility; the client already | |
| handles stuck commands. | |
| - Avoid interactive commands that wait for user input (like pagers or editors). | |
| Always use non-interactive flags such as '--no-pager', '-n', '-y', etc. | |
| - If a command times out or fails, read the error / partial output and try | |
| a different strategy (shorter commands, non-interactive flags, etc.). | |
| General behavior: | |
| - Keep commands simple and robust. | |
| - Prefer non-destructive checks first (ls, cat, grep, systemctl status). | |
| - Avoid obviously destructive commands (rm -rf /, dropping entire databases, etc.) | |
| unless the user explicitly requested such destructive action. | |
| - Use previous command outputs to adjust your next step. | |
| """ | |
def truncate(s: str, limit: int = 2000) -> str:
    """Cap ``s`` at ``limit`` characters and append a note saying how many were cut.

    A falsy ``s`` (such as None) is treated as the empty string; text that
    already fits is returned unchanged.
    """
    text = s if s else ""
    overflow = len(text) - limit
    if overflow <= 0:
        return text
    return f"{text[:limit]}\n...[truncated {overflow} chars]"
# ---------------- AGENT LOOP ----------------
def compute_effective_idle_timeout(command: str, default_idle: int) -> int:
    """
    If command starts with 'timeout N ...', use max(default_idle, N+30) as idle timeout,
    so we don't kill it earlier than the command's own timeout.
    Otherwise return default_idle.

    N may be fractional and may carry a unit suffix: 's' (seconds), 'm'
    (minutes), 'h' (hours) or 'd' (days); it is rounded up to whole seconds.
    If the word after 'timeout' is not a usable duration (for example an
    option such as '-k'), max(default_idle, 600) is returned.

    Args:
        command: Shell command about to be executed.
        default_idle: Idle timeout in seconds to use when no adjustment applies.

    Returns:
        Idle timeout in seconds.
    """
    cmd = command.strip()
    if not cmd.startswith("timeout "):
        return default_idle

    # cmd is stripped and starts with "timeout ", so a second word always exists.
    token = cmd.split()[1]

    unit_seconds = {"s": 1, "m": 60, "h": 3600, "d": 86400}
    scale = unit_seconds.get(token[-1:])
    number = token[:-1] if scale else token

    fallback = max(default_idle, 600)
    try:
        secs = float(number) * (scale or 1)
    except ValueError:
        return fallback
    # Rejects negatives, inf and nan (nan fails every comparison).
    if not 0 <= secs < 10**9:
        return fallback

    whole = int(secs)
    if secs > whole:
        whole += 1  # round up so we never undercut the command's own timeout
    return max(default_idle, whole + 30)
# Program names of full-screen / menu-driven tools the agent must not launch.
_TUI_TOOLS = frozenset({
    "whiptail",
    "dialog",
    "nmtui",
    "raspi-config",
    "menuconfig",
    "htop",
    "top",
    "alsamixer",
    "dpkg-reconfigure",
})


def _uses_tui_tool(command: str) -> bool:
    """Return True if any whole word of ``command`` names a blocked TUI tool."""
    # Shell separators count as whitespace so "a;htop" or "x|top" still split.
    separators = str.maketrans(dict.fromkeys(";|&()`", " "))
    words = command.translate(separators).split()
    # Compare basenames so "/usr/bin/top" is caught while "stop" is not.
    return any(word.rsplit("/", 1)[-1] in _TUI_TOOLS for word in words)


def _make_apt_noninteractive(cmd: str) -> str:
    """Add DEBIAN_FRONTEND=noninteractive and 'install -y' to apt commands."""
    if "apt-get" in cmd or " apt " in cmd or cmd.startswith("apt "):
        # prepend DEBIAN_FRONTEND if not present
        # NOTE(review): when the command goes through sudo, the variable may be
        # dropped by sudo's environment handling - confirm on target hosts.
        if "DEBIAN_FRONTEND=" not in cmd:
            cmd = "DEBIAN_FRONTEND=noninteractive " + cmd
        # ensure -y is present for apt-get/apt install
        if ("apt-get" in cmd or " apt " in cmd) and " install " in cmd and " -y" not in cmd:
            cmd = cmd.replace(" install ", " install -y ")
    return cmd


def _ask_choice(prompt: str, valid: tuple, retry_message: str) -> str:
    """Keep asking until the user enters one of ``valid`` (case-insensitive)."""
    while True:
        answer = input(prompt).strip().lower()
        if answer in valid:
            return answer
        print(retry_message)


def run_agent(goal: str, executor, username: str, max_steps: int = 20, idle_timeout: int = 300):
    """Drive the propose / confirm / execute loop until done or out of steps.

    Each step asks the model for one JSON action. A "done" action prints the
    summary and offers a follow-up request; any other action is treated as a
    command, which the user may run, skip, redirect, or auto-approve from then
    on. Results are appended to the history for the next step.

    Args:
        goal: Task description given to the model.
        executor: Object exposing ``run(command, idle_timeout=...)`` that
            returns a dict with ``stdout``, ``stderr``, ``exit_status``,
            ``timed_out`` and ``idle_timeout``.
        username: Account name of the session (currently unused here).
        max_steps: Maximum number of model turns.
        idle_timeout: Default idle timeout in seconds for each command.
    """
    history = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": f"GOAL: {goal}"},
    ]

    allow_all = False  # when True, no more prompts; all commands auto-run

    for step in range(1, max_steps + 1):
        print(f"\n\n{COL_SEP}==================== STEP {step} ===================={RESET}\n")

        history.append({"role": "user", "content": f"STEP {step}: Decide next action."})

        # A malformed reply costs one step instead of ending the whole session.
        try:
            action = call_gpt(history)
            problem = None if isinstance(action, dict) else "Model reply was not a JSON object."
        except RuntimeError as exc:
            problem = str(exc)
        if problem:
            print(f"{COL_WARN}[WARN]{RESET} {problem}")
            history.append({
                "role": "user",
                "content": (
                    "Your previous reply could not be used because it was not "
                    "exactly ONE valid JSON object. Reply again using the "
                    "required JSON format only."
                ),
            })
            continue

        action_type = action.get("type")
        thought = action.get("thought", "")
        # The model may send null or a non-string here; normalise before strip().
        risk = str(action.get("risk") or "").strip()

        thought_text = thought or "(no thought provided)"
        print(f"{COL_THOUGHT}[MODEL THOUGHT]{RESET} {COL_THOUGHT}{thought_text}{RESET}")

        if not risk:
            risk = "Risk not specified (assume minimal, but review command carefully)."
        print(f"{COL_RISK}[RISK]{RESET} {COL_RISK}{risk}{RESET}")

        if action_type == "done":
            summary = action.get("summary", "") or "(no summary provided)"
            print(f"{COL_INFO}[SUMMARY]{RESET} {COL_INFO}{summary}{RESET}")

            # Soft close: ask user if anything else / not working
            follow = _ask_choice(
                f"\n{COL_INPUT}Please test the changes. "
                "Is something not working or do you have another request? "
                "[y = yes, continue, n = no, finish]: "
                f"{RESET}",
                ("y", "n"),
                f"{COL_WARN}Please enter 'y' or 'n'.{RESET}",
            )

            if follow == "n":
                print(f"\n{COL_INFO}[SESSION CLOSED]{RESET} Bye!")
                break

            # Both pieces are f-strings so {RESET} is substituted, not printed.
            print(
                f"{COL_INPUT}Describe what you want next or what is not working "
                f"(single line is fine):{RESET}"
            )
            extra = input("> ").strip()
            if not extra:
                print(f"\n{COL_INFO}[SESSION CLOSED]{RESET} Bye!")
                break

            history.append({
                "role": "user",
                "content": (
                    "New follow-up request / issue from user after testing:\n"
                    f"{extra}\n\n"
                    "Please continue from the current state and propose the next command."
                ),
            })
            continue

        command = str(action.get("command") or "").strip()
        if not command:
            print(f"{COL_WARN}[WARN]{RESET} No command returned by model. Aborting.")
            print(f"\n{COL_INFO}[SESSION CLOSED]{RESET} Bye!")
            break

        print(f"{COL_CMD}[PROPOSED COMMAND]{RESET} {COL_CMD}{command}{RESET}")

        # Decide whether to prompt, or auto-run
        if not allow_all:
            # y=run, s=skip, n=terminate, e=edit thought, a=allow all
            choice = _ask_choice(
                f"\n{COL_INPUT}Allow this command? "
                "[y = run, s = skip, n = terminate, e = edit thought, a = allow all]: "
                f"{RESET}",
                ("y", "s", "n", "e", "a"),
                f"{COL_WARN}Please enter 'y', 's', 'n', 'e', or 'a'.{RESET}",
            )

            if choice == "n":
                print(f"\n{COL_INFO}[SESSION CLOSED]{RESET} Bye!")
                break

            if choice == "s":
                history.append({
                    "role": "user",
                    "content": (
                        f"I refused to run this command:\n{command}\n\n"
                        "The stated risk was:\n"
                        f"{risk}\n\n"
                        "Please propose a safer or alternative command."
                    ),
                })
                continue

            if choice == "e":
                print(
                    f"{COL_INPUT}Enter your instructions or edited reasoning "
                    f"(single line; keep it brief):{RESET}"
                )
                user_edit = input("> ").strip()
                if not user_edit:
                    user_edit = "No additional details; just propose a better command."

                history.append({
                    "role": "user",
                    "content": (
                        "I want you to revise your reasoning and plan. "
                        "Instead of the previous command, follow this guidance:\n"
                        f"{user_edit}\n\n"
                        "Ignore the last proposed command and propose a new command "
                        "in the next step."
                    ),
                })
                continue

            if choice == "a":
                allow_all = True
                print(f"{COL_INFO}[INFO]{RESET} All future commands will be executed without confirmation.")

        # --- Guardrail: block known interactive TUI / console GUI tools ---
        # Whole words are matched, so e.g. "systemctl stop nginx" is not
        # mistaken for "top".
        if _uses_tui_tool(command):
            print(
                f"{COL_WARN}[BLOCKED]{RESET} Command looks like an interactive console GUI/TUI "
                "which the agent cannot reliably drive."
            )
            history.append({
                "role": "user",
                "content": (
                    "I refused to run this command because it uses an interactive "
                    "console GUI/TUI tool that cannot be driven programmatically:\n"
                    f"{command}\n\n"
                    "You MUST instead use a non-interactive approach, such as editing "
                    "configuration files directly, or using flags/environment variables "
                    "for non-interactive mode. Propose a different command."
                ),
            })
            continue

        # --- Normalize apt commands to be non-interactive ---
        command = _make_apt_noninteractive(command)

        effective_idle = compute_effective_idle_timeout(command, idle_timeout)
        print(
            f"{COL_INFO}[INFO]{RESET} Executing command with idle timeout = "
            f"{effective_idle} seconds..."
        )

        result = executor.run(command, idle_timeout=effective_idle)

        if result["timed_out"] or result["exit_status"] != 0:
            col = COL_EXIT_BAD
        else:
            col = COL_EXIT_OK

        if result["timed_out"]:
            print(
                f"\n{col}[EXIT STATUS]{RESET} -1 "
                f"(IDLE TIMEOUT: no output for {result['idle_timeout']} seconds)"
            )
        else:
            print(f"\n{col}[EXIT STATUS]{RESET} {result['exit_status']}")

        feedback = (
            f"Command: {command}\n"
            f"Exit status: {result['exit_status']}\n"
            f"Timed out (idle): {result['timed_out']} "
            f"(idle_timeout={result['idle_timeout']} seconds)\n"
            f"STDOUT:\n{truncate(result['stdout'])}\n\n"
            f"STDERR:\n{truncate(result['stderr'])}\n"
        )

        history.append({"role": "user", "content": "RESULT OF YOUR COMMAND:\n" + feedback})
# ---------------- MAIN ENTRY (INTERACTIVE + ARGUMENTS) ----------------
if __name__ == "__main__":
    import argparse
    import getpass

    parser = argparse.ArgumentParser(
        description="Autonomous SSH GPT Sysadmin (Semi-Manual)"
    )
    parser.add_argument("--host", type=str, help="Target host (IP or hostname)")
    parser.add_argument("--username", type=str, help="SSH/local username")
    parser.add_argument("--password", type=str, help="SSH/local & sudo password")
    parser.add_argument("--goal", type=str, help="Task/goal for the agent to perform")

    args = parser.parse_args()

    print(f"{COL_SEP}=== Autonomous SSH GPT Sysadmin (Semi-Manual) ==={RESET}\n")

    # Resolve missing arguments with prompts
    host = args.host or input("Enter host (IP or hostname): ").strip()
    username = args.username or input("Enter username: ").strip()
    if args.password:
        password = args.password
    else:
        # getpass reads without echoing the secret to the terminal; the value
        # is used verbatim so surrounding spaces in a password survive.
        password = getpass.getpass("Enter password (SSH/local & sudo): ")
    goal = args.goal or input("Enter goal (task to perform): ").strip()

    # Decide local vs SSH
    is_local = host.lower() in ("localhost", "127.0.0.1", "::1")
    if is_local:
        print(f"\n{COL_INFO}[INFO]{RESET} Using LOCAL execution (no SSH) on this machine.")
        executor = LocalSession(username=username, password=password)
    else:
        print(f"\n{COL_INFO}[INFO]{RESET} Using SSH to connect to {host}.")
        executor = SSHSession(host=host, username=username, password=password)
        print(f"{COL_INFO}[INFO]{RESET} Connecting...")

    # connect() sits inside the try so close() also runs if connecting fails.
    try:
        executor.connect()
        if not is_local:
            print(f"{COL_INFO}[INFO]{RESET} Connected.\n")
        run_agent(goal, executor, username=username, max_steps=100, idle_timeout=300)
    finally:
        executor.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment