file read tool
read_file_slice() {
    local path="$1"
    local start_byte="${2:-0}"
    local length="${3:-32768}"
    local show_hex="${4:-false}"

    # Safety caps
    (( length > 131072 )) && length=131072
    (( start_byte < 0 )) && start_byte=0

    # Resolve path
    path=$(realpath "$path" 2>/dev/null || printf '%s\n' "$path")

    # Header
    printf '[read_file_slice] %s\n' "$path"
    printf '[Requested bytes: %d–%d (%d bytes)]\n\n' \
        "$start_byte" "$((start_byte + length - 1))" "$length"

    # ── Extract the exact byte range into a temp file ────────────────────
    # (a temp file rather than a shell variable: bash variables cannot hold
    # NUL bytes, so binary slices would otherwise be silently truncated)
    local tmp
    tmp=$(mktemp) || return 1
    if [[ "$path" == *.gz ]]; then
        # gzip streams are not seekable, so decompress from the start and
        # skip start_byte bytes of the *uncompressed* output
        gzip -dc "$path" 2>/dev/null |
            dd bs=1 skip="$start_byte" count="$length" status=none 2>/dev/null > "$tmp"
    else
        dd if="$path" bs=1 skip="$start_byte" count="$length" status=none 2>/dev/null > "$tmp"
    fi

    if [ "$show_hex" = true ]; then
        printf '[show_hex forced → hexdump]\n'
        hexdump -v -e '16/1 "%02X " "  "' -e '16/1 "%_p" "\n"' "$tmp"
        rm -f "$tmp"
        return
    fi

    # Detect binary: >15% non-printable/control bytes in the first 32 KB
    # (compare against the actual sample size, not a fixed 32768)
    local sample_len nonprint
    sample_len=$(head -c 32768 "$tmp" | wc -c)
    nonprint=$(head -c 32768 "$tmp" | tr -d -c '\001-\010\016-\037\177-\377' | wc -c)
    if (( sample_len == 0 || nonprint * 100 <= sample_len * 15 )); then
        # Text → raw output
        cat "$tmp"
    else
        # Binary → hexdump of the requested range
        printf '[BINARY DETECTED → hexdump of requested range]\n'
        hexdump -v -e '16/1 "%02X " "  "' -e '16/1 "%_p" "\n"' "$tmp"
    fi
    rm -f "$tmp"
}
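A few illustrative invocations (paths are examples, not from this gist):

read_file_slice /var/log/syslog                    # first 32 KB, auto text/binary detection
read_file_slice /var/log/app.log.gz 1048576 4096   # 4 KB starting 1 MiB into the uncompressed stream
read_file_slice /usr/bin/ls 0 256 true             # force a hexdump of the first 256 bytes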
{
"name": "find_files",
"description": "Powerful and safe file discovery (replacement for raw find). Automatically limits output, shows size+mtime, supports globs and common debugging filters. Use this every time you need to locate logs, cores, configs, sockets, huge files, or anything modified recently.",
"parameters": {
"type": "object",
"properties": {
"path": {
"type": "string",
"default": "/",
"description": "Start directory (e.g. /var/log, /home, /tmp)"
},
"name": {
"type": "string",
"description": "Name pattern or glob (e.g. '*.log', 'core.*', 'access.log*', 'hs_err_pid*')"
},
"type": {
"type": "string",
"enum": ["f", "d", "l", "s", "p", "c", "b"],
"description": "File type: f=file, d=directory, l=symlink, s=socket, p=pipe, c=char dev, b=block dev"
},
"size_min_mb": {
"type": "integer",
"description": "Minimum size in MB (e.g. 100 → only files ≥ 100 MB)"
},
"size_max_mb": {
"type": "integer",
"description": "Maximum size in MB"
},
"mtime_days": {
"type": "integer",
"description": "Modified in last N days (e.g. 7 → past week)"
},
"newer_than": {
"type": "string",
"description": "Modified more recently than this file (e.g. /var/log/syslog)"
},
"perm": {
"type": "string",
"description": "Permissions (octal or symbolic, e.g. 777, u+w)"
},
"user": {
"type": "string",
"description": "Owner username or UID"
},
"limit": {
"type": "integer",
"default": 150,
"maximum": 500,
"description": "Max results (default 150)"
},
"sort_by": {
"type": "string",
"enum": ["size", "mtime", "name"],
"default": "mtime",
"description": "Sort results by size, modification time, or name"
}
},
"required": ["path"],
"additionalProperties": false
}
}

find_files() {
local path="${1:-/}"
local name="$2"
local type="$3"
local size_min_mb="$4"
local size_max_mb="$5"
local mtime_days="$6"
local newer_than="$7"
local perm="$8"
local user="$9"
local limit="${10:-150}"
local sort_by="${11:-mtime}"
echo "[find_files] Searching in $path"
[ -n "$name" ] && echo " name: $name"
[ -n "$type" ] && echo " type: $type"
[ -n "$size_min_mb" ] && echo " size ≥ ${size_min_mb}MB"
[ -n "$mtime_days" ] && echo " modified ≤ ${mtime_days} days ago"
echo
local find_expr=()
[ -n "$name" ] && find_expr+=( -iname "$name" )
[ -n "$type" ] && find_expr+=( -type "$type" )
[ -n "$size_min_mb" ] && find_expr+=( -size "+${size_min_mb}M" )
[ -n "$size_max_mb" ] && find_expr+=( -size "-${size_max_mb}M" )
[ -n "$mtime_days" ] && find_expr+=( -mtime "-$mtime_days" )
[ -n "$newer_than" ] && find_expr+=( -newer "$newer_than" )
[ -n "$perm" ] && find_expr+=( -perm "$perm" )
[ -n "$user" ] && find_expr+=( -user "$user" )
# Always exclude noisy pseudo-filesystems (prune them so find never descends)
command find "$path" \
    \( -path '*/proc/*' -o -path '*/sys/*' -o -path '*/dev/*' -o -path '*/run/*' \) -prune \
    -o "${find_expr[@]}" -print 2>/dev/null | \
xargs -d '\n' -r stat -c '%s %Y %n' 2>/dev/null | \
sort -k "$(case "$sort_by" in
    size)  echo "1nr" ;;
    mtime) echo "2nr" ;;
    *)     echo "3"   ;;
esac)" | \
head -n "$limit" | \
awk '{
    size=$1; mtime=$2
    # Everything after the two numeric fields is the path
    # (preserves spaces in filenames)
    match($0, /^[0-9]+ [0-9]+ /)
    path=substr($0, RLENGTH+1)
    printf "%10d  %s  %s\n", size, strftime("%Y-%m-%d %H:%M", mtime), path
}'
}
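Illustrative calls (positional arguments follow the schema order; paths are examples):

find_files /var/log '*.log' f '' '' 7                 # logs modified in the past week
find_files / 'core.*' f 100 '' '' '' '' '' 50 size    # cores ≥ 100 MB, 50 largest first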
list_network() {
local filter="$1" listening="$2" established="$3" port="$4" pid="$5"
local unix="$6" high_conn="$7" susp="$8" limit="${9:-200}"
echo "[list_network] Network overview"
[ "$listening" = "true" ] && echo " → listening sockets only"
[ "$established" = "true" ] && echo " → established connections only"
[ -n "$filter" ] && echo " → filter: $filter"
[ -n "$port" ] && echo " → port: $port"
[ "$high_conn" = "true" ] && echo " → high connection count only"
[ "$susp" = "true" ] && echo " → highlighting suspicious ports"
[ "$unix" = "true" ] && echo " → Unix domain sockets"
echo
# TCP/UDP + process info via ss (fast and available on modern Linux)
if [ "$unix" != "true" ]; then
    ss -anp 2>/dev/null | tail -n +2 | \
    awk -v f="$filter" -v l="$listening" -v e="$established" \
        -v want_port="$port" -v want_pid="$pid" -v hc="$high_conn" \
        -v susp="$susp" -v lim="$limit" '
    function basename(s) {sub(".*/","",s); return s}
    {
        # ss -anp columns: Netid State Recv-Q Send-Q Local:Port Peer:Port Process
        proto=$1; state=$2; local=$5; remote=$6; proc=$7
        gsub(/users:\(\("/,"",proc); gsub(/"\)\)/,"",proc)
        split(proc,a,",")
        procpid=a[2]; cmd=basename(a[1]); gsub(/.*=/,"",procpid)
        split(local, la, ":"); lport=la[length(la)]
        split(remote, ra, ":"); rport=ra[length(ra)]
        if (l == "true" && state != "LISTEN") next
        if (e == "true" && state == "LISTEN") next
        if (want_port && lport != want_port && rport != want_port) next
        if (want_pid && procpid != want_pid) next
        if (f && cmd !~ f && procpid !~ f) next
        flag=""
        if (susp == "true") {
            if (lport+0 < 1024) flag=flag " [PRIV<1024]"
            if (lport+0 > 49152) flag=flag " [EPHEMERAL]"
            if (lport+0 == 22 || lport+0 == 80 || lport+0 == 443 || lport+0 == 5432) flag=""
        }
        count[procpid]++
        if (++shown > lim) next
        printf "%-5s %-8s %-25s %-25s %8s %6s %-20s %s\n",
            proto, state, local, remote, procpid, lport+0, cmd, flag
    } END {
        if (hc == "true") {
            print "\n[High connection processes]"
            for (p in count) if (count[p] > 500) print "  " p ": " count[p] " connections"
        }
    }'
fi
# Unix sockets (separate because ss -a shows them differently)
if [ "$unix" = "true" ] || [ -z "$1" ]; then
echo
echo "[Unix domain sockets]"
ss -a -x 2>/dev/null | tail -n +2 | head -50 | \
awk '{print " " $5 " → " $6 " " $7 " " $8}'
fi
}

{
"name": "list_network",
"description": "Smart replacement for netstat/ss/lsof -i. Shows all listening ports, established connections, Unix sockets, and anomalies (high connection count, weird states, non-standard ports). Automatically highlights suspicious/risky things. Never run ss or netstat manually again.",
"parameters": {
"type": "object",
"properties": {
"filter": {
"type": "string",
"description": "Filter by process name, PID, or port (e.g. nginx, 5432, redis, java)"
},
"listening": {
"type": "boolean",
"default": false,
"description": "Show only listening sockets (like netstat -tuln)"
},
"established": {
"type": "boolean",
"default": false,
"description": "Show only established connections"
},
"port": {
"type": "integer",
"description": "Show only this port number"
},
"pid": {
"type": "integer",
"description": "Show network activity for this PID only"
},
"unix_sockets": {
"type": "boolean",
"default": false,
"description": "Show Unix domain sockets (docker.sock, mysql.sock, etc.)"
},
"high_connections": {
"type": "boolean",
"default": false,
"description": "Show processes with > 500 established connections"
},
"suspicious": {
"type": "boolean",
"default": false,
"description": "Highlight non-standard listening ports (<1024 without root, >49152, odd services)"
},
"limit": {
"type": "integer",
"default": 200,
"maximum": 1000
}
},
"additionalProperties": false
}
}
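Illustrative calls (positional order: filter, listening, established, port, pid, unix, high_conn, suspicious, limit):

list_network '' true                        # all listening sockets
list_network nginx '' '' 443                # nginx activity on port 443
list_network '' '' '' '' '' '' '' true      # flag suspicious listeners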
import textwrap
import time
from typing import Optional

import httpx
from django.db import models

from your_app.models import HostName
from your_framework import Tool, Parameter
async def get_thanos_url_for_hostname(hostname_hint: str) -> str:
'''Resolve any hostname hint → correct Thanos URL using async Django ORM.'''
host_obj = await HostName.objects.filter(
models.Q(hostname__iexact=hostname_hint) |
models.Q(fqdn__iexact=hostname_hint)
).select_related('server__environment').afirst()
if not host_obj:
raise ValueError(f"Host '{hostname_hint}' not found in inventory")
if not host_obj.server or not host_obj.server.environment:
raise ValueError(f"Host '{hostname_hint}' has no server or environment configured")
url = host_obj.server.environment.thanos_query_server
if not url:
raise ValueError(f"Environment for '{hostname_hint}' has no Thanos URL")
return url.rstrip('/')
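# Usage sketch (hostname and URL are hypothetical, for illustration only):
#   thanos = await get_thanos_url_for_hostname('db01')
#   # → e.g. 'https://thanos-prod.example.com'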
class ListPrometheusInstances(Tool):
description = textwrap.dedent('''
List currently scraped instances in the correct environment.
Handles:
• node_exporter → instance = http://primary-fqdn:9100
• cassandra_exporter → instance = cass-node-name:8080 + cluster/rack labels
''')
hostname_hint = Parameter(
type='string',
description='Any hostname/FQDN you know (e.g. db01, cass-stg-07.example.com)',
required=True,
)
job = Parameter(
type='string',
description="Optional filter: 'node_exporter' or 'cassandra_exporter'",
required=False,
)
async def run(self, conversation, hostname_hint: str, job: Optional[str] = None) -> str:
try:
thanos_base = await get_thanos_url_for_hostname(hostname_hint)
except Exception as e:
return f"[list_prometheus_instances] ERROR: {e}"
env_name = 'PRODUCTION' if 'prod' in thanos_base.lower() else 'STAGING'
# Build correct match expression
if job == 'node_exporter':
match_query = 'up{job="node_exporter"}'
elif job == 'cassandra_exporter':
match_query = 'up{job="cassandra_exporter"}'
else:
match_query = 'up'
async with httpx.AsyncClient(timeout=20.0) as client:
try:
resp = await client.get(
f"{thanos_base}/api/v1/query",
params={'query': match_query},
timeout=20,
)
resp.raise_for_status()
results = resp.json()['data']['result']
except Exception as e:
return f"[list_prometheus_instances] {env_name} — Query failed: {e}"
if not results:
return f"[list_prometheus_instances] {env_name} — No instances found."
lines = [
f"[list_prometheus_instances] {env_name} (resolved via {hostname_hint})",
f" Found {len(results)} instances\n"
]
for r in sorted(results, key=lambda x: x['metric'].get('instance', '')):
m = r['metric']
instance_raw = m.get('instance', '?')
job_name = m.get('job', 'unknown')
if job_name == 'node_exporter':
# http://host.prod.example.com:9100 → host.prod.example.com
clean = instance_raw.replace('http://', '').split(':')[0]
display = f"{clean:<48} node_exporter"
else:
# Cassandra exporter
cass_node = instance_raw.split(':')[0]
cluster = m.get('cluster', '?')
rack = m.get('rack', '?')
display = f"{cass_node:<30} cassandra_exporter cluster={cluster} rack={rack}"
lines.append(f" {display}")
return '\n'.join(lines)
class QueryPrometheus(Tool):
description = textwrap.dedent('''
Run any PromQL query in the correct environment.
Works perfectly with both:
• instance="http://primary-fqdn:9100" (node_exporter)
• instance="cassandra-node:8080" + cluster/rack (cassandra_exporter)
''')
hostname_hint = Parameter(
type='string',
description='Any known hostname in the target environment',
required=True,
)
query = Parameter(
type='string',
description='Exact PromQL query — use instance labels from list_prometheus_instances',
required=True,
)
time_range_minutes = Parameter(
type='integer',
description='For range queries: look back N minutes',
required=False,
)
async def run(self, conversation, hostname_hint: str, query: str,
time_range_minutes: Optional[int] = None) -> str:
try:
thanos_base = await get_thanos_url_for_hostname(hostname_hint)
except Exception as e:
return f"[query_prometheus] ERROR: {e}"
env_name = 'PRODUCTION' if 'prod' in thanos_base.lower() else 'STAGING'
async with httpx.AsyncClient(timeout=45.0) as client:
try:
if time_range_minutes and time_range_minutes > 0:
end = int(time.time())
start = end - time_range_minutes * 60
resp = await client.get(
f"{thanos_base}/api/v1/query_range",
params={
'query': query,
'start': start,
'end': end,
'step': '60s',
},
)
else:
resp = await client.get(
f"{thanos_base}/api/v1/query",
params={'query': query},
)
resp.raise_for_status()
data = resp.json()['data']
except Exception as e:
return f"[query_prometheus] {env_name} — Request failed: {e}"
if not data.get('result'):
return f"[query_prometheus] {env_name} — No results\nQuery: {query}"
lines = [
f"[query_prometheus] {env_name} (via {hostname_hint})",
f"Query: {query}",
]
if time_range_minutes:
lines.append(f"Range: last {time_range_minutes} min")
for series in data['result'][:50]:
m = series['metric']
inst_raw = m.get('instance', m.get('job', 'unknown'))
job_name = m.get('job', 'unknown')
if job_name == 'node_exporter':
inst_display = inst_raw.replace('http://', '').split(':')[0]
extra = ''
else:
inst_display = inst_raw.split(':')[0]
cluster = m.get('cluster', '?')
rack = m.get('rack', '?')
extra = f" cluster={cluster} rack={rack}"
labels = ' '.join(
f"{k}={v}"
for k, v in m.items()
if k not in {'__name__', 'instance', 'job', 'cluster', 'rack'}
)
lines.append(f"\n • {inst_display}{extra} {labels}".strip())
if 'value' in series:
lines.append(f" → {series['value'][1]}")
else:
                for ts, val in series['values'][-5:]:
                    # Format timestamps locally; Prometheus/Thanos expose no
                    # server-side time-formatting endpoint to call for this.
                    tm = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(int(float(ts))))
                    lines.append(f"      [{tm}] {val}")
if len(data['result']) > 50:
lines.append(f"\n... (truncated, {len(data['result'])} total series)")
return '\n'.join(lines)
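For illustration, a minimal driver sketch. The calling convention of `your_framework` is a placeholder in this gist, so direct instantiation and `conversation=None` below are assumptions, not the framework's actual API:

import asyncio

async def main():
    # Hypothetical direct invocation; a real agent framework would
    # construct the tool and pass its own conversation object.
    tool = QueryPrometheus()
    print(await tool.run(
        conversation=None,            # placeholder, see note above
        hostname_hint='db01',         # any host known to the inventory
        query='up{job="node_exporter"}',
        time_range_minutes=30,
    ))

asyncio.run(main())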
class ListPrometheusInstances(Tool):
description = textwrap.dedent('''
List currently scraped instances in the correct environment.
Your setup:
• job="node" → node_exporter (instance = primary-hostname:9100)
• job="nodetool" → centralized custom exporter (instance is useless, use hostname= label instead)
''')
hostname_hint = Parameter(
type='string',
description='Any hostname you know in the target environment (e.g. web01, cass07, db-stg-03)',
required=True,
)
job = Parameter(
type='string',
description="Filter by job: 'node' or 'nodetool'",
required=False,
)
async def run(self, conversation, hostname_hint: str, job: Optional[str] = None) -> str:
try:
thanos_base = await get_thanos_url_for_hostname(hostname_hint)
except Exception as e:
return f"[list_prometheus_instances] ERROR: {e}"
env_name = 'PRODUCTION' if 'prod' in thanos_base.lower() else 'STAGING'
# Build correct query for each job
if job == 'node':
promql = 'up{job="node"}'
elif job == 'nodetool':
promql = 'up{job="nodetool"}'
else:
promql = 'up{job=~"node|nodetool"}'
async with httpx.AsyncClient(timeout=20.0) as client:
try:
resp = await client.get(
f"{thanos_base}/api/v1/query",
params={'query': promql},
timeout=20,
)
resp.raise_for_status()
results = resp.json()['data']['result']
except Exception as e:
return f"[list_prometheus_instances] {env_name} — Query failed: {e}"
if not results:
return f"[list_prometheus_instances] {env_name} — No instances found."
lines = [
f"[list_prometheus_instances] {env_name} (via {hostname_hint})",
f" Found {len(results)} instances\n"
]
for r in sorted(results, key=lambda x: x['metric'].get('instance', '') or x['metric'].get('hostname', '')):
m = r['metric']
job_name = m.get('job', 'unknown')
if job_name == 'node':
instance = m.get('instance', '?')
clean_host = instance.split(':')[0] # removes :9100
display = f"{clean_host:<45} node_exporter"
else: # job="nodetool"
hostname_label = m.get('hostname', '?')
useless_instance = m.get('instance', '?')
display = f"{hostname_label:<30} nodetool_exporter (instance={useless_instance} → ignore)"
lines.append(f" {display}")
        return '\n'.join(lines)

### METRICS RULES — OBEY OR FAIL
You have exactly two exporters:
1. Node exporter
job="node"
instance = "primary-hostname:9100" → use this exactly
2. Centralized nodetool exporter
job="nodetool"
instance = useless (always the same) → IGNORE IT
hostname = "cass-prod-07" (real Cassandra node) → use this instead
### WORKFLOW (never skip a step)
1. Always start with:
list_prometheus_instances hostname_hint=<any-host> job=<node|nodetool>
2. Copy the exact instance (for node) or hostname label (for nodetool) from the output.
3. Write queries exactly like this:
# System / host
node_memory_MemAvailable_bytes{job="node", instance="web-prod-01.example.com:9100"}
100 * (1 - node_memory_MemAvailable_bytes{job="node", instance="db01:9100"} / node_memory_MemTotal_bytes{job="node", instance="db01:9100"})
# Cassandra
cassandra_heap_used_bytes{job="nodetool", hostname="cass-prod-07"}
rate(cassandra_gc_duration_seconds_count{job="nodetool", hostname="cass-stg-03"}[5m])
Never guess labels. Never use the wrong job.
If unsure → run list_prometheus_instances first.
Do it right → you’re faster than any human.
Do it wrong → you’re useless.
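Worked example (hostnames and output are illustrative):

# Step 1: discover the labels
list_prometheus_instances hostname_hint=cass-prod-07 job=nodetool
#   cass-prod-07    nodetool_exporter (instance=exporter:9500 → ignore)
# Step 2: copy the hostname label verbatim into the query
rate(cassandra_gc_duration_seconds_count{job="nodetool", hostname="cass-prod-07"}[5m])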