Skip to content

Instantly share code, notes, and snippets.

@meepak
Last active December 8, 2025 09:58
Show Gist options
  • Select an option

  • Save meepak/f468e3239c5579b1cb84d7c5696efd88 to your computer and use it in GitHub Desktop.

Select an option

Save meepak/f468e3239c5579b1cb84d7c5696efd88 to your computer and use it in GitHub Desktop.
Linux sysadmin script using the OpenAI API. It performs whatever task is requested on the server whose details are supplied.
import codecs
import json
import math
import os
import re
import select
import subprocess
import time
from typing import List, Dict, Any

import paramiko
from openai import OpenAI
# ============================================================
# API KEY
# Preferred: export OPENAI_API_KEY in the environment so the secret never
# lives in this file. The literal below is only a placeholder fallback for
# setups that still hard-code the key here.
# ============================================================
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY") or "sk-proj-Uozj....._MA"
# ============================================================
# Shared API client used by call_gpt().
client = OpenAI(api_key=OPENAI_API_KEY)
# ---------------- COLORS (ANSI) ----------------
def _sgr(code: int) -> str:
    """Build the ANSI "Select Graphic Rendition" escape for a numeric code."""
    return f"\x1b[{code}m"


RESET = _sgr(0)
COL_SEP = _sgr(95)  # magenta
COL_THOUGHT = _sgr(96)  # bright cyan
COL_CMD = _sgr(93)  # bright yellow
COL_INFO = _sgr(94)  # blue
COL_WARN = _sgr(91)  # red
COL_STDOUT_LABEL = _sgr(92)  # green
COL_STDERR_LABEL = _sgr(91)  # red
COL_EXIT_OK = _sgr(92)  # green
COL_EXIT_BAD = _sgr(91)  # red
COL_INPUT = _sgr(90)  # dim gray
COL_RISK = _sgr(33)  # yellow-ish
# ---------------- GPT CALL ----------------
def call_gpt(messages: List[Dict[str, str]]) -> Dict[str, Any]:
    """
    Send the conversation to the model and return its reply parsed as a JSON object.

    Args:
        messages: Chat history as a list of {"role": ..., "content": ...} dicts.

    Returns:
        The decoded JSON object (a dict) produced by the model.

    Raises:
        RuntimeError: If the reply is not valid JSON or is not a JSON object.
    """
    response = client.responses.create(
        model="gpt-4.1-mini",  # change if you like
        input=messages,
        max_output_tokens=600,
        temperature=0.2,
    )
    text = response.output[0].content[0].text.strip()
    # Handle possible ```json ... ``` wrapping
    if text.startswith("```"):
        text = text.strip("`").strip()
        # The fence language tag may be written in any case ("json", "JSON").
        if text[:4].lower() == "json":
            text = text[4:].lstrip()
    try:
        parsed = json.loads(text)
    except ValueError as err:  # json.JSONDecodeError is a ValueError
        raise RuntimeError(
            f"Model did not return valid JSON: {text[:200]}..."
        ) from err
    # Callers use dict methods on the result, so reject lists/strings/numbers here.
    if not isinstance(parsed, dict):
        raise RuntimeError(f"Model did not return valid JSON: {text[:200]}...")
    return parsed
# ---------------- PROMPT DETECTION ----------------
def is_password_prompt(line: str, username: str | None) -> bool:
"""
Detect lines that are likely asking specifically for a password.
We handle these automatically with the stored password.
"""
lower = line.lower().strip()
if "password" not in lower:
return False
# Typical sudo prompts
if "sudo" in lower:
return True
if username and username.lower() in lower:
return True
if lower.endswith(":"):
return True
return True
def is_interactive_prompt(line: str) -> bool:
    """
    Decide whether a line of output is a non-password request for input
    (a confirmation, a menu choice, "press enter", ...).

    The line must both look like a prompt (ends with ':' or '?', or carries
    a [y/n] marker, spaces ignored) and contain one of the known phrases.
    Password/passphrase prompts are excluded; is_password_prompt() owns those.
    """
    text = line.lower().rstrip()
    if not text:
        return False
    # Password-style prompts are never generic interactive prompts.
    if any(secret in text for secret in ("password", "passphrase")):
        return False
    ends_like_prompt = text.endswith((":", "?"))
    has_yes_no_marker = "[y/n]" in text.replace(" ", "")
    if not (ends_like_prompt or has_yes_no_marker):
        return False
    known_phrases = (
        "do you want to continue",
        "continue? [y/n]",
        "press enter",
        "press return",
        "enter choice",
        "enter selection",
        "enter new",
        "are you sure",
    )
    for phrase in known_phrases:
        if phrase in text:
            return True
    return False
# ---------------- COMMAND SANITISER ----------------
# "systemctl" as a standalone whitespace-delimited word.
_SYSTEMCTL_WORD = re.compile(r"(?<!\S)systemctl(?!\S)")


def make_noninteractive(command: str) -> str:
    """
    Rewrite certain commands to be non-interactive.
    - For the first 'systemctl' word, ensure '--no-pager' is present.

    The flag is spliced into the original text right after 'systemctl', so
    the rest of the command (quoting, repeated spaces inside arguments) is
    left exactly as given. Commands without a 'systemctl' word, or that
    already carry '--no-pager' after it, are returned unchanged.
    """
    match = _SYSTEMCTL_WORD.search(command)
    if match is None:
        return command  # nothing to change
    cut = match.end()
    if "--no-pager" in command[cut:].split():
        return command
    return f"{command[:cut]} --no-pager{command[cut:]}"
# ---------------- SSH SESSION ----------------
class SSHSession:
    """
    Remote executor over SSH.
    Interface: connect(), run(command, idle_timeout), close()
    """

    # A command "uses sudo" when sudo is its first word, optionally preceded
    # by VAR=value assignments ("DEBIAN_FRONTEND=noninteractive sudo ...").
    _SUDO_FIRST = re.compile(r"^(?:[A-Za-z_][A-Za-z0-9_]*=\S*\s+)*sudo\s")

    def __init__(self, host: str, username: str, password: str, port: int = 22):
        self.host = host
        self.username = username
        self.password = password
        self.port = port
        self.client: paramiko.SSHClient | None = None

    def connect(self):
        """Open the SSH connection using password authentication."""
        self.client = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without
        # verification; consider loading known_hosts for untrusted networks.
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.client.connect(
            hostname=self.host,
            port=self.port,
            username=self.username,
            password=self.password,
            look_for_keys=False,
        )

    @staticmethod
    def _send(channel, text: str) -> None:
        """Best-effort write to the remote stdin; a dead channel is not fatal here."""
        try:
            channel.send(text)
        except Exception:
            # Deliberately ignored: the read loop notices a closed channel
            # through the exit status / idle timeout.
            pass

    def _echo_and_answer(self, channel, text: str) -> None:
        """Print *text* line by line and answer any prompt found in it."""
        for line in text.splitlines(keepends=True):
            print(line, end="", flush=True)
            if is_password_prompt(line, self.username):
                # Password prompt: auto-send the stored password (if any).
                if self.password:
                    self._send(channel, self.password + "\n")
            elif is_interactive_prompt(line):
                # Generic prompt: let the human decide.
                user_val = input(
                    f"{COL_INPUT}[INPUT REQUIRED]{RESET} {line.strip()} "
                )
                self._send(channel, user_val + "\n")

    def run(self, command: str, idle_timeout: int = 300) -> Dict[str, Any]:
        """
        Run a command on the remote host with an *idle* timeout, streaming output.
        If the command prompts for input (passwords, confirmations, etc.),
        ask the user (for non-password prompts) or send the stored password.

        Returns a dict with keys "stdout", "stderr", "exit_status",
        "timed_out" and "idle_timeout". On idle timeout the exit status is -1.

        Raises:
            RuntimeError: If connect() has not been called or no transport exists.
        """
        if not self.client:
            raise RuntimeError("SSH not connected")
        transport = self.client.get_transport()
        if not transport:
            raise RuntimeError("No SSH transport")

        # Sanitize to avoid interactive pagers etc.
        effective_cmd = make_noninteractive(command.strip())
        uses_sudo = bool(self._SUDO_FIRST.match(effective_cmd))

        captured: Dict[str, list] = {"stdout": [], "stderr": []}
        started: set = set()
        timed_out = False

        channel = transport.open_session()
        try:
            if uses_sudo:
                channel.get_pty()  # sudo likes a TTY
            channel.exec_command(effective_cmd)
            # Send sudo password once if needed (initial)
            if uses_sudo and self.password:
                self._send(channel, self.password + "\n")

            def new_decoder():
                # Incremental decoding keeps multi-byte characters intact
                # when they are split across two recv() calls.
                return codecs.getincrementaldecoder("utf-8")(errors="replace")

            streams = (
                ("stdout", channel.recv_ready, channel.recv,
                 f"{COL_STDOUT_LABEL}[STDOUT]{RESET} ", new_decoder()),
                ("stderr", channel.recv_stderr_ready, channel.recv_stderr,
                 f"{COL_STDERR_LABEL}[STDERR]{RESET} ", new_decoder()),
            )

            def pump(name, is_ready, recv, label, decoder) -> bool:
                """Read one pending chunk from a stream; False if nothing was ready."""
                if not is_ready():
                    return False
                text = decoder.decode(recv(4096))
                captured[name].append(text)
                if name not in started:
                    print(label, end="")
                    started.add(name)
                self._echo_and_answer(channel, text)
                return True

            last_activity = time.time()
            while True:
                # A list (not a generator) so both streams are always polled.
                if any([pump(*stream) for stream in streams]):
                    last_activity = time.time()

                if channel.exit_status_ready():
                    # Drain any remaining data
                    for stream in streams:
                        while pump(*stream):
                            pass
                    break

                if time.time() - last_activity > idle_timeout:
                    timed_out = True
                    note = (
                        f"\n[CLIENT NOTICE] No output for {idle_timeout} seconds. "
                        f"Command treated as stuck and channel closed by client.\n"
                    )
                    captured["stderr"].append(note)
                    if "stderr" not in started:
                        print(f"{COL_STDERR_LABEL}[STDERR]{RESET} ", end="")
                        started.add("stderr")
                    print(note, end="", flush=True)
                    break

                time.sleep(0.2)

            exit_status = -1 if timed_out else channel.recv_exit_status()
        finally:
            # Always release the channel, including on timeout or when an
            # exception (e.g. Ctrl-C during input()) escapes the loop.
            channel.close()

        stdout = "".join(captured["stdout"])
        stderr = "".join(captured["stderr"])
        # With a PTY the remote stderr is delivered on stdout, so check both.
        if "sudo" in effective_cmd and "not in the sudoers file" in (stdout + stderr):
            print(
                f"\n{COL_WARN}[SUDO ERROR]{RESET} User '{self.username}' is not allowed to use sudo on remote host."
            )
        return {
            "stdout": stdout,
            "stderr": stderr,
            "exit_status": exit_status,
            "timed_out": timed_out,
            "idle_timeout": idle_timeout,
        }

    def close(self):
        """Close the SSH connection; run() afterwards raises 'SSH not connected'."""
        if self.client:
            self.client.close()
            self.client = None
# ---------------- LOCAL SESSION ----------------
class LocalSession:
    """
    Local executor (no SSH) when host is localhost/127.0.0.1.
    Interface: connect(), run(command, idle_timeout), close()
    """

    # Leading "sudo" word, optionally preceded by VAR=value assignments.
    # Group 1 is everything up to and including "sudo".
    _SUDO_FIRST = re.compile(r"^((?:[A-Za-z_][A-Za-z0-9_]*=\S*\s+)*sudo)\s+")

    def __init__(self, username: str, password: str):
        self.username = username
        self.password = password

    def connect(self):
        # Nothing to do for local
        pass

    @classmethod
    def _prepare_sudo(cls, cmd: str):
        """
        Return (command, uses_sudo). For a sudo command, make sure '-S' directly
        follows 'sudo' so the password is read from stdin. Only that one spot
        is edited; the rest of the command text is left untouched.
        """
        match = cls._SUDO_FIRST.match(cmd)
        if match is None:
            return cmd, False
        rest = cmd[match.end():]
        if rest.split(None, 1)[:1] == ["-S"]:
            return cmd, True
        return f"{match.group(1)} -S {rest}", True

    def run(self, command: str, idle_timeout: int = 300) -> Dict[str, Any]:
        """
        Run a command *locally* with an idle timeout, streaming output.
        For sudo commands:
        - We rewrite 'sudo cmd...' to 'sudo -S cmd...' so sudo reads password from stdin.
        - We send the provided password once.
        If the command prompts for input (passwords, confirmations, etc.),
        ask the user (for non-password prompts) or send the stored password.

        Output is read in raw chunks from the pipe file descriptors, so prompts
        that do not end with a newline are seen as soon as they are written.

        Returns a dict with keys "stdout", "stderr", "exit_status",
        "timed_out" and "idle_timeout". On idle timeout the exit status is -1.
        """
        sanitized = make_noninteractive(command.strip())
        effective_cmd, uses_sudo = self._prepare_sudo(sanitized)

        # Binary pipes: we read with os.read() and decode ourselves.
        proc = subprocess.Popen(
            effective_cmd,
            shell=True,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )

        def safe_write_stdin(data: str) -> None:
            """Write to stdin only if it is still open; a closed pipe is not an error."""
            if proc.stdin is None or proc.stdin.closed:
                return
            try:
                proc.stdin.write(data.encode("utf-8"))
                proc.stdin.flush()
            except (OSError, ValueError):
                # child closed the pipe (BrokenPipeError is an OSError)
                pass

        out_fd = proc.stdout.fileno()
        err_fd = proc.stderr.fileno()
        names = {out_fd: "stdout", err_fd: "stderr"}
        labels = {
            "stdout": f"{COL_STDOUT_LABEL}[STDOUT]{RESET} ",
            "stderr": f"{COL_STDERR_LABEL}[STDERR]{RESET} ",
        }
        # Incremental decoders keep multi-byte characters intact across reads.
        decoders = {
            fd: codecs.getincrementaldecoder("utf-8")(errors="replace")
            for fd in names
        }
        captured: Dict[str, list] = {"stdout": [], "stderr": []}
        started: set = set()
        open_fds = set(names)
        timed_out = False

        # Send sudo password if needed. 'sudo -S' consumes this line when it
        # prompts, so the first detected password prompt must not be answered
        # again (a second copy would end up in the command's own stdin).
        presend_pending = False
        if uses_sudo and self.password:
            safe_write_stdin(self.password + "\n")
            presend_pending = True

        def handle(name: str, text: str) -> None:
            """Record and print a chunk of output, answering prompts found in it."""
            nonlocal presend_pending
            captured[name].append(text)
            if name not in started:
                print(labels[name], end="")
                started.add(name)
            for line in text.splitlines(keepends=True):
                print(line, end="", flush=True)
                if is_password_prompt(line, self.username):
                    if presend_pending:
                        presend_pending = False
                    elif self.password:
                        safe_write_stdin(self.password + "\n")
                elif is_interactive_prompt(line):
                    user_val = input(
                        f"{COL_INPUT}[INPUT REQUIRED]{RESET} {line.strip()} "
                    )
                    safe_write_stdin(user_val + "\n")

        last_activity = time.time()
        try:
            while True:
                # Sample the exit state BEFORE polling: if the process had
                # already exited, everything it wrote is in the pipes now.
                exited = proc.poll() is not None
                if open_fds:
                    ready, _, _ = select.select(
                        list(open_fds), [], [], 0 if exited else 0.2
                    )
                else:
                    # Both pipes hit EOF but the process is still running.
                    ready = []
                    if not exited:
                        time.sleep(0.2)

                activity = False
                for fd in ready:
                    data = os.read(fd, 4096)
                    if not data:
                        open_fds.discard(fd)  # EOF on this pipe
                        continue
                    activity = True
                    handle(names[fd], decoders[fd].decode(data))

                if activity:
                    last_activity = time.time()
                    continue
                if exited:
                    break  # process is done and nothing is left to read

                # --- Idle-timeout only while the process is still running and silent ---
                if time.time() - last_activity > idle_timeout:
                    timed_out = True
                    note = (
                        f"\n[CLIENT NOTICE] No output for {idle_timeout} seconds. "
                        f"Command treated as stuck and process killed by client.\n"
                    )
                    captured["stderr"].append(note)
                    if "stderr" not in started:
                        print(f"{COL_STDERR_LABEL}[STDERR]{RESET} ", end="")
                        started.add("stderr")
                    print(note, end="", flush=True)
                    proc.kill()
                    break

            if not timed_out:
                # Flush bytes of an incomplete trailing character, if any.
                for fd, decoder in decoders.items():
                    tail = decoder.decode(b"", final=True)
                    if tail:
                        handle(names[fd], tail)
        finally:
            # Stop the child if we are leaving abnormally, release all pipes,
            # and reap the process so no zombie or open descriptor is left.
            if proc.poll() is None:
                proc.kill()
            for pipe in (proc.stdin, proc.stdout, proc.stderr):
                if pipe is not None and not pipe.closed:
                    try:
                        pipe.close()
                    except (OSError, ValueError):
                        pass
            proc.wait()

        exit_status = -1 if timed_out or proc.returncode is None else proc.returncode
        stdout = "".join(captured["stdout"])
        stderr = "".join(captured["stderr"])
        if uses_sudo and "not in the sudoers file" in stderr:
            print(
                f"\n{COL_WARN}[SUDO ERROR]{RESET} Local user '{self.username}' is not allowed to use sudo."
            )
        return {
            "stdout": stdout,
            "stderr": stderr,
            "exit_status": exit_status,
            "timed_out": timed_out,
            "idle_timeout": idle_timeout,
        }

    def close(self):
        # Nothing to close for local
        pass
# ---------------- SYSTEM PROMPT ----------------
# Instructions placed in the "system" message at the start of run_agent()'s
# history. They define the JSON reply contract ("command" / "done" objects)
# that call_gpt() parses and run_agent() acts on. This is runtime text sent
# to the model: edit the wording deliberately, not cosmetically.
SYSTEM_PROMPT = """
You are an autonomous Linux sysadmin assistant running behind an API that
expects STRICT JSON responses.
IMPORTANT RULES ABOUT OUTPUT:
- Your ENTIRE reply MUST be exactly ONE JSON object.
- Do NOT include multiple JSON objects.
- Do NOT include any extra text, comments, or markdown.
- Do NOT wrap the JSON in ``` fences.
INTERACTION MODEL:
- You do NOT have direct shell access; you can only propose commands.
- The client program runs your commands and sends back stdout/stderr.
- The client supports answering line-based prompts (like 'Do you want to continue?'),
but it CANNOT reliably drive interactive TUIs or full-screen console menus.
ABSOLUTE RESTRICTIONS:
- NEVER use interactive console/TUI tools like: whiptail, dialog, nmtui, top, htop,
alsamixer, raspi-config, menuconfig, or dpkg-reconfigure without a fully
non-interactive mode.
- NEVER rely on curses-style full-screen UIs, wizard menus, or arrow-key navigation.
- ALWAYS use non-interactive configuration methods:
- edit config files directly with sed/awk/cat/tee,
- or use tools/flags that are specifically documented as non-interactive.
APT / PACKAGE MANAGEMENT:
- When using apt-get/apt:
- Always include: `DEBIAN_FRONTEND=noninteractive` in the environment.
- Use: `apt-get -y` (assume yes to prompts).
Example:
DEBIAN_FRONTEND=noninteractive sudo apt-get update
DEBIAN_FRONTEND=noninteractive sudo apt-get -y install rkhunter
COMMAND RESPONSE FORMAT:
At each step you MUST output JSON ONLY, in one of these forms:
1) To run a command:
{
"type": "command",
"thought": "<brief reasoning>",
"risk": "<brief risk description>",
"command": "<bash command>"
}
2) When finished:
{
"type": "done",
"thought": "<why>",
"summary": "<final result for the user>"
}
The "risk" field is REQUIRED for every command.
Notes about timeouts & interactivity:
- The client enforces an *idle* timeout: if your command produces NO output
(neither stdout nor stderr) for some period, it will be killed and marked
as timed out.
- Do NOT wrap your commands with the 'timeout' utility; the client already
handles stuck commands.
- Avoid interactive commands that wait for user input (like pagers or editors).
Always use non-interactive flags such as '--no-pager', '-n', '-y', etc.
- If a command times out or fails, read the error / partial output and try
a different strategy (shorter commands, non-interactive flags, etc.).
General behavior:
- Keep commands simple and robust.
- Prefer non-destructive checks first (ls, cat, grep, systemctl status).
- Avoid obviously destructive commands (rm -rf /, dropping entire databases, etc.)
unless the user explicitly requested such destructive action.
- Use previous command outputs to adjust your next step.
"""
def truncate(s: str, limit: int = 2000) -> str:
    """
    Cap *s* at *limit* characters.

    Text within the limit (or a falsy value, which becomes "") is returned
    as is; longer text is cut and a marker stating how many characters were
    dropped is appended on a new line.
    """
    text = s if s else ""
    dropped = len(text) - limit
    if dropped <= 0:
        return text
    return f"{text[:limit]}\n...[truncated {dropped} chars]"
# ---------------- AGENT LOOP ----------------
# DURATION argument of the 'timeout' utility: a number with an optional
# unit suffix (s = seconds, m = minutes, h = hours, d = days).
_TIMEOUT_DURATION = re.compile(r"^(\d+(?:\.\d*)?|\.\d+)([smhd]?)$")
_DURATION_UNIT_SECONDS = {"": 1, "s": 1, "m": 60, "h": 3600, "d": 86400}


def compute_effective_idle_timeout(command: str, default_idle: int) -> int:
    """
    If command starts with 'timeout DURATION ...', use
    max(default_idle, DURATION_in_seconds + 30) as idle timeout, so we don't
    kill it earlier than the command's own timeout.

    DURATION may be an integer or decimal number with an optional s/m/h/d
    suffix. If the word after 'timeout' is not such a duration (for example
    an option like '-k'), fall back to max(default_idle, 600).
    Otherwise return default_idle.
    """
    parts = command.split()
    if len(parts) < 2 or parts[0] != "timeout":
        return default_idle
    match = _TIMEOUT_DURATION.match(parts[1])
    if match is None:
        return max(default_idle, 600)
    number, unit = match.groups()
    secs = math.ceil(float(number) * _DURATION_UNIT_SECONDS[unit])
    return max(default_idle, secs + 30)
# Executable names of interactive console GUI/TUI tools the agent cannot drive.
_TUI_TOOLS = frozenset({
    "whiptail",
    "dialog",
    "nmtui",
    "raspi-config",
    "menuconfig",
    "htop",
    "top",
    "alsamixer",
    "dpkg-reconfigure",
})


def _is_tui_command(command: str) -> bool:
    """Return True if any shell word of *command* names a blocked TUI tool."""
    # Compare whole words (path prefix removed) so that e.g. "stop" or
    # "desktop" is not mistaken for "top".
    words = re.split(r"[\s;|&()]+", command)
    return any(word.rsplit("/", 1)[-1] in _TUI_TOOLS for word in words)


def _make_apt_noninteractive(cmd: str) -> str:
    """Add DEBIAN_FRONTEND=noninteractive and 'install -y' to apt/apt-get commands."""
    if not ("apt-get" in cmd or " apt " in cmd or cmd.startswith("apt ")):
        return cmd
    if "DEBIAN_FRONTEND=" not in cmd:
        # For "sudo <command>" pass the variable through sudo itself: a
        # variable set in front of sudo is normally dropped by its
        # environment reset, and the prefix would hide the leading "sudo"
        # from the executors' sudo handling. Not done when sudo options
        # follow, because VAR=value must come after them.
        sudo_head = re.match(r"sudo\s+(?!-)", cmd)
        if sudo_head:
            cmd = "sudo DEBIAN_FRONTEND=noninteractive " + cmd[sudo_head.end():]
        else:
            cmd = "DEBIAN_FRONTEND=noninteractive " + cmd
    if " -y" not in cmd:
        # Only touch the 'install' of apt/apt-get itself (not e.g. 'pip install').
        cmd = re.sub(
            r"(?<!\S)(apt(?:-get)?(?:\s+-\S+)*\s+install)(?=\s)",
            r"\1 -y",
            cmd,
        )
    return cmd


def _ask_choice(prompt: str, valid: tuple, retry_hint: str) -> str:
    """Ask until the user enters one of *valid* (case-insensitive); return it."""
    while True:
        answer = input(prompt).strip().lower()
        if answer in valid:
            return answer
        print(f"{COL_WARN}{retry_hint}{RESET}")


def run_agent(goal: str, executor, username: str, max_steps: int = 20, idle_timeout: int = 300):
    """
    Drive the propose / confirm / execute loop for one goal.

    Each step asks the model for the next action, shows its thought, risk and
    command, lets the user run / skip / terminate / redirect it (or allow all
    further commands), executes it through *executor* and feeds the result
    back to the model.

    Args:
        goal: Task description given to the model.
        executor: Object providing run(command, idle_timeout=...) that returns
            a dict with "stdout", "stderr", "exit_status", "timed_out" and
            "idle_timeout".
        username: Accepted for interface compatibility; not used in the loop.
        max_steps: Maximum number of model round-trips.
        idle_timeout: Default idle timeout (seconds) for each command.
    """
    history = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": f"GOAL: {goal}"},
    ]
    allow_all = False  # when True, no more prompts; all commands auto-run
    for step in range(1, max_steps + 1):
        print(f"\n\n{COL_SEP}==================== STEP {step} ===================={RESET}\n")
        history.append({"role": "user", "content": f"STEP {step}: Decide next action."})
        try:
            action = call_gpt(history)
        except RuntimeError as err:
            # Malformed reply: tell the model and retry on the next step
            # instead of crashing the whole session.
            print(f"{COL_WARN}[WARN]{RESET} {err}")
            history.append({
                "role": "user",
                "content": (
                    "Your previous reply was not a single valid JSON object. "
                    "Reply again with exactly ONE JSON object in the required format."
                ),
            })
            continue
        # Record the model's own reply so follow-up messages that refer to
        # "the last proposed command" have something to refer to.
        history.append({"role": "assistant", "content": json.dumps(action)})

        action_type = action.get("type")
        # The model may return null / non-string values; normalise them.
        thought_text = str(action.get("thought") or "") or "(no thought provided)"
        print(f"{COL_THOUGHT}[MODEL THOUGHT]{RESET} {COL_THOUGHT}{thought_text}{RESET}")

        if action_type == "done":
            summary = str(action.get("summary") or "") or "(no summary provided)"
            print(f"{COL_INFO}[SUMMARY]{RESET} {COL_INFO}{summary}{RESET}")
            # Soft close: ask user if anything else / not working
            follow = _ask_choice(
                f"\n{COL_INPUT}Please test the changes. "
                "Is something not working or do you have another request? "
                "[y = yes, continue, n = no, finish]: "
                f"{RESET}",
                ("y", "n"),
                "Please enter 'y' or 'n'.",
            )
            if follow == "n":
                print(f"\n{COL_INFO}[SESSION CLOSED]{RESET} Bye!")
                break
            print(
                f"{COL_INPUT}Describe what you want next or what is not working "
                f"(single line is fine):{RESET}"
            )
            extra = input("> ").strip()
            if not extra:
                print(f"\n{COL_INFO}[SESSION CLOSED]{RESET} Bye!")
                break
            history.append({
                "role": "user",
                "content": (
                    "New follow-up request / issue from user after testing:\n"
                    f"{extra}\n\n"
                    "Please continue from the current state and propose the next command."
                ),
            })
            continue

        # Risk only applies to proposed commands, not to the final "done" reply.
        risk = str(action.get("risk") or "").strip()
        if not risk:
            risk = "Risk not specified (assume minimal, but review command carefully)."
        print(f"{COL_RISK}[RISK]{RESET} {COL_RISK}{risk}{RESET}")

        command = str(action.get("command") or "").strip()
        if not command:
            print(f"{COL_WARN}[WARN]{RESET} No command returned by model. Aborting.")
            print(f"\n{COL_INFO}[SESSION CLOSED]{RESET} Bye!")
            break
        print(f"{COL_CMD}[PROPOSED COMMAND]{RESET} {COL_CMD}{command}{RESET}")

        # Decide whether to prompt, or auto-run
        if not allow_all:
            # y=run, s=skip, n=terminate, e=edit thought, a=allow all
            choice = _ask_choice(
                f"\n{COL_INPUT}Allow this command? "
                "[y = run, s = skip, n = terminate, e = edit thought, a = allow all]: "
                f"{RESET}",
                ("y", "s", "n", "e", "a"),
                "Please enter 'y', 's', 'n', 'e', or 'a'.",
            )
            if choice == "n":
                print(f"\n{COL_INFO}[SESSION CLOSED]{RESET} Bye!")
                break
            if choice == "s":
                history.append({
                    "role": "user",
                    "content": (
                        f"I refused to run this command:\n{command}\n\n"
                        "The stated risk was:\n"
                        f"{risk}\n\n"
                        "Please propose a safer or alternative command."
                    ),
                })
                continue
            if choice == "e":
                print(
                    f"{COL_INPUT}Enter your instructions or edited reasoning "
                    f"(single line; keep it brief):{RESET}"
                )
                user_edit = input("> ").strip()
                if not user_edit:
                    user_edit = "No additional details; just propose a better command."
                history.append({
                    "role": "user",
                    "content": (
                        "I want you to revise your reasoning and plan. "
                        "Instead of the previous command, follow this guidance:\n"
                        f"{user_edit}\n\n"
                        "Ignore the last proposed command and propose a new command "
                        "in the next step."
                    ),
                })
                continue
            if choice == "a":
                allow_all = True
                print(f"{COL_INFO}[INFO]{RESET} All future commands will be executed without confirmation.")

        # --- Guardrail: block known interactive TUI / console GUI tools ---
        if _is_tui_command(command):
            print(
                f"{COL_WARN}[BLOCKED]{RESET} Command looks like an interactive console GUI/TUI "
                "which the agent cannot reliably drive."
            )
            history.append({
                "role": "user",
                "content": (
                    "I refused to run this command because it uses an interactive "
                    "console GUI/TUI tool that cannot be driven programmatically:\n"
                    f"{command}\n\n"
                    "You MUST instead use a non-interactive approach, such as editing "
                    "configuration files directly, or using flags/environment variables "
                    "for non-interactive mode. Propose a different command."
                ),
            })
            continue

        # --- Normalize apt commands to be non-interactive ---
        command = _make_apt_noninteractive(command)
        effective_idle = compute_effective_idle_timeout(command, idle_timeout)
        print(
            f"{COL_INFO}[INFO]{RESET} Executing command with idle timeout = "
            f"{effective_idle} seconds..."
        )
        result = executor.run(command, idle_timeout=effective_idle)

        failed = result["timed_out"] or result["exit_status"] != 0
        col = COL_EXIT_BAD if failed else COL_EXIT_OK
        if result["timed_out"]:
            print(
                f"\n{col}[EXIT STATUS]{RESET} -1 "
                f"(IDLE TIMEOUT: no output for {result['idle_timeout']} seconds)"
            )
        else:
            print(f"\n{col}[EXIT STATUS]{RESET} {result['exit_status']}")

        feedback = (
            f"Command: {command}\n"
            f"Exit status: {result['exit_status']}\n"
            f"Timed out (idle): {result['timed_out']} "
            f"(idle_timeout={result['idle_timeout']} seconds)\n"
            f"STDOUT:\n{truncate(result['stdout'])}\n\n"
            f"STDERR:\n{truncate(result['stderr'])}\n"
        )
        history.append({"role": "user", "content": "RESULT OF YOUR COMMAND:\n" + feedback})
    else:
        # Loop ran out of steps without the user or the model ending it.
        print(f"\n{COL_WARN}[WARN]{RESET} Reached the maximum of {max_steps} steps; stopping.")
# ---------------- MAIN ENTRY (INTERACTIVE + ARGUMENTS) ----------------
if __name__ == "__main__":
    import argparse
    import getpass

    parser = argparse.ArgumentParser(
        description="Autonomous SSH GPT Sysadmin (Semi-Manual)"
    )
    parser.add_argument("--host", type=str, help="Target host (IP or hostname)")
    parser.add_argument("--username", type=str, help="SSH/local username")
    # NOTE: a password given on the command line is visible in the process
    # list and shell history; leave it out to be prompted without echo.
    parser.add_argument("--password", type=str, help="SSH/local & sudo password")
    parser.add_argument("--goal", type=str, help="Task/goal for the agent to perform")
    args = parser.parse_args()

    print(f"{COL_SEP}=== Autonomous SSH GPT Sysadmin (Semi-Manual) ==={RESET}\n")

    # Resolve missing arguments with prompts
    host = args.host or input("Enter host (IP or hostname): ").strip()
    username = args.username or input("Enter username: ").strip()
    # getpass hides the typed password; it is used verbatim (not stripped)
    # because leading/trailing spaces can be part of a password.
    password = args.password or getpass.getpass("Enter password (SSH/local & sudo): ")
    goal = args.goal or input("Enter goal (task to perform): ").strip()

    # Decide local vs SSH
    is_local = host.lower() in ("localhost", "127.0.0.1", "::1")
    if is_local:
        print(f"\n{COL_INFO}[INFO]{RESET} Using LOCAL execution (no SSH) on this machine.")
        executor = LocalSession(username=username, password=password)
    else:
        print(f"\n{COL_INFO}[INFO]{RESET} Using SSH to connect to {host}.")
        executor = SSHSession(host=host, username=username, password=password)
        print(f"{COL_INFO}[INFO]{RESET} Connecting...")

    try:
        # connect() is inside the try so a failed or half-open connection
        # is still cleaned up by close().
        executor.connect()
        if not is_local:
            print(f"{COL_INFO}[INFO]{RESET} Connected.\n")
        run_agent(goal, executor, username=username, max_steps=100, idle_timeout=300)
    finally:
        executor.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment