Skip to content

Instantly share code, notes, and snippets.

@morpheuslord
Created July 1, 2025 06:11
Show Gist options
  • Select an option

  • Save morpheuslord/e85eaccf683c7ccca86debeb235330dc to your computer and use it in GitHub Desktop.

Select an option

Save morpheuslord/e85eaccf683c7ccca86debeb235330dc to your computer and use it in GitHub Desktop.
A easy way to monitor system and container stats for an homelab implementation. This is personalized for me to use Prometheus to scan system resources and Raspi ZeroW to monitor the system stats via a 3.5 Inch Screen.
#!/usr/bin/env python3
"""
Rich-based Docker Container Monitor for 3.5" RPi Screen
Using Rich library for beautiful terminal layouts
"""
import requests
import json
import time
import sys
import os
import psutil
from datetime import datetime
from collections import defaultdict
# Rich imports
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich.columns import Columns
from rich.layout import Layout
from rich.live import Live
from rich.text import Text
from rich.progress import BarColumn, Progress
from rich.align import Align
# Configuration
PROMETHEUS_URL = "http://IP:9090"
REFRESH_INTERVAL = 3 # seconds
console = Console()
def fmt_bytes(b):
if b < 1024: return f"{b:.0f}B"
elif b < 1024*1024: return f"{b/1024:.1f}K"
elif b < 1024*1024*1024: return f"{b/(1024*1024):.1f}M"
else: return f"{b/(1024*1024*1024):.1f}G"
def fmt_rate(bps):
return fmt_bytes(bps) + "/s"
def get_temp():
try:
with open('/sys/class/thermal/thermal_zone0/temp', 'r') as f:
return float(f.read()) / 1000.0
except:
return None
def get_load():
"""Get system load average with fallback"""
try:
return os.getloadavg()
except (AttributeError, OSError):
try:
with open('/proc/loadavg', 'r') as f:
load_values = f.read().strip().split()[:3]
return [float(x) for x in load_values]
except:
return [0.0, 0.0, 0.0]
def get_uptime():
try:
with open('/proc/uptime', 'r') as f:
return float(f.read().split()[0])
except:
return None
class PrometheusClient:
def __init__(self, url):
self.url = url
self.ok = False
def query(self, q):
try:
r = requests.get(f"{self.url}/api/v1/query",
params={'query': q}, timeout=3)
r.raise_for_status()
self.ok = True
return r.json()
except:
self.ok = False
return None
def get_containers(self):
stats = defaultdict(dict)
# Network RX/TX rates
for direction, query in [('rx', 'rate(container_network_receive_bytes_total[1m])'),
('tx', 'rate(container_network_transmit_bytes_total[1m])')]:
result = self.query(query)
if result and result['status'] == 'success':
for item in result['data']['result']:
name = item['metric'].get('name', '')
if name and not name.startswith('/'):
stats[name][direction] = float(item['value'][1])
# CPU usage
cpu = self.query('rate(container_cpu_usage_seconds_total[1m]) * 100')
if cpu and cpu['status'] == 'success':
for item in cpu['data']['result']:
name = item['metric'].get('name', '')
if name and not name.startswith('/'):
stats[name]['cpu'] = float(item['value'][1])
# Memory usage
mem = self.query('container_memory_usage_bytes')
if mem and mem['status'] == 'success':
for item in mem['data']['result']:
name = item['metric'].get('name', '')
if name and not name.startswith('/'):
stats[name]['mem'] = float(item['value'][1])
return dict(stats)
class RichMonitor:
def __init__(self):
self.prom = PrometheusClient(PROMETHEUS_URL)
self.last_net = None
self.last_disk = None
self.net_rates = {'rx': 0, 'tx': 0}
self.disk_rates = {'read': 0, 'write': 0}
# Historical data for graphs (last 20 points for small screen)
self.history_size = 20
self.cpu_history = []
self.net_rx_history = []
self.net_tx_history = []
self.disk_read_history = []
self.disk_write_history = []
def get_sys(self):
cpu = psutil.cpu_percent(interval=None)
mem = psutil.virtual_memory()
swap = psutil.swap_memory()
disk = psutil.disk_usage('/')
disk_io = psutil.disk_io_counters()
net = psutil.net_io_counters()
# Calculate rates
if self.last_net:
time_diff = REFRESH_INTERVAL
rx_rate = (net.bytes_recv - self.last_net.bytes_recv) / time_diff
tx_rate = (net.bytes_sent - self.last_net.bytes_sent) / time_diff
self.net_rates = {'rx': rx_rate, 'tx': tx_rate}
self.last_net = net
if self.last_disk:
time_diff = REFRESH_INTERVAL
read_rate = (disk_io.read_bytes - self.last_disk.read_bytes) / time_diff
write_rate = (disk_io.write_bytes - self.last_disk.write_bytes) / time_diff
self.disk_rates = {'read': read_rate, 'write': write_rate}
self.last_disk = disk_io
# Store historical data
self.cpu_history.append(cpu)
self.net_rx_history.append(self.net_rates['rx'])
self.net_tx_history.append(self.net_rates['tx'])
self.disk_read_history.append(self.disk_rates['read'])
self.disk_write_history.append(self.disk_rates['write'])
# Keep only last N points
for history in [self.cpu_history, self.net_rx_history, self.net_tx_history,
self.disk_read_history, self.disk_write_history]:
if len(history) > self.history_size:
history.pop(0)
temp = get_temp()
load_avg = get_load()
uptime = get_uptime()
processes = len(psutil.pids())
return {
'cpu': cpu, 'mem_pct': mem.percent, 'mem_used': mem.used, 'mem_total': mem.total,
'swap_pct': swap.percent, 'swap_used': swap.used,
'disk_pct': (disk.used/disk.total)*100, 'disk_free': disk.free,
'disk_read_rate': self.disk_rates['read'], 'disk_write_rate': self.disk_rates['write'],
'net_rx_total': net.bytes_recv, 'net_tx_total': net.bytes_sent,
'net_rx_rate': self.net_rates['rx'], 'net_tx_rate': self.net_rates['tx'],
'temp': temp, 'load1': load_avg[0], 'load5': load_avg[1], 'uptime': uptime, 'processes': processes
}
def format_uptime(self, seconds):
if not seconds:
return "N/A"
if seconds < 3600:
return f"{int(seconds/60)}m"
elif seconds < 86400:
return f"{int(seconds/3600)}h"
else:
return f"{int(seconds/86400)}d"
def create_sparkline(self, data, width=15):
"""Create sparkline from data"""
if not data or len(data) < 2:
return "─" * width
# Scale data to width
scaled_data = []
step = max(1, len(data) // width)
for i in range(0, len(data), step):
chunk = data[i:i+step]
scaled_data.append(sum(chunk) / len(chunk))
# Ensure exact width
while len(scaled_data) > width:
scaled_data.pop(0)
while len(scaled_data) < width:
scaled_data.insert(0, 0)
# Create sparkline
if max(scaled_data) > 0:
chars = ['▁', '▂', '▃', '▄', '▅', '▆', '▇', '█']
max_val = max(scaled_data)
sparkline = ""
for value in scaled_data:
normalized = value / max_val
char_idx = min(int(normalized * len(chars)), len(chars) - 1)
sparkline += chars[char_idx]
else:
sparkline = "▁" * width
return sparkline
def create_system_panel(self, sys_data):
"""Create system information panel"""
# System overview
temp_str = f"{sys_data['temp']:.1f}°C" if sys_data['temp'] else "N/A"
load_str = f"{sys_data['load1']:.2f}" if sys_data['load1'] >= 0 else "N/A"
uptime_str = self.format_uptime(sys_data['uptime'])
content = Text()
content.append(f"TEMP: ", style="white")
content.append(f"{temp_str:<8}", style="yellow" if sys_data['temp'] and sys_data['temp'] > 60 else "green")
content.append(f"LOAD: ", style="white")
content.append(f"{load_str:<6}", style="cyan")
content.append("\n")
content.append(f"UP: ", style="white")
content.append(f"{uptime_str:<8}", style="magenta")
content.append(f"PROC: ", style="white")
content.append(f"{sys_data['processes']}", style="yellow")
content.append("\n\n")
# Resource usage
cpu_color = "red" if sys_data['cpu'] > 80 else "yellow" if sys_data['cpu'] > 50 else "green"
mem_color = "red" if sys_data['mem_pct'] > 80 else "yellow" if sys_data['mem_pct'] > 60 else "green"
content.append(f"CPU: ", style="white")
content.append(f"{sys_data['cpu']:4.1f}%", style=cpu_color)
content.append("\n")
content.append(f"MEM: ", style="white")
content.append(f"{sys_data['mem_pct']:4.1f}%", style=mem_color)
content.append(f" {fmt_bytes(sys_data['mem_used'])}", style="white")
content.append("\n")
disk_color = "red" if sys_data['disk_pct'] > 90 else "yellow" if sys_data['disk_pct'] > 70 else "green"
content.append(f"DISK: ", style="white")
content.append(f"{sys_data['disk_pct']:4.1f}%", style=disk_color)
content.append(f" FREE:{fmt_bytes(sys_data['disk_free'])}", style="white")
content.append("\n\n")
# Network
content.append(f"NET RX: ", style="white")
content.append(f"{fmt_rate(sys_data['net_rx_rate'])}", style="green")
content.append("\n")
content.append(f"NET TX: ", style="white")
content.append(f"{fmt_rate(sys_data['net_tx_rate'])}", style="blue")
content.append("\n")
# Prometheus status
prom_status = "✓ OK" if self.prom.ok else "✗ FAIL"
prom_color = "green" if self.prom.ok else "red"
content.append(f"PROM: ", style="white")
content.append(prom_status, style=prom_color)
return Panel(content, title="[bold white]SYSTEM", border_style="blue")
def create_trends_panel(self):
"""Create trends panel with sparklines"""
if len(self.cpu_history) < 5:
return Panel("Loading trends...", title="[bold white]TRENDS", border_style="yellow")
content = Text()
# CPU trend
cpu_current = self.cpu_history[-1] if self.cpu_history else 0
content.append(f"CPU {cpu_current:4.1f}%\n", style="yellow")
content.append(self.create_sparkline(self.cpu_history), style="yellow")
content.append("\n\n")
# Network RX trend
rx_current = self.net_rx_history[-1] if self.net_rx_history else 0
content.append(f"RX {fmt_rate(rx_current)}\n", style="green")
content.append(self.create_sparkline(self.net_rx_history), style="green")
content.append("\n\n")
# Network TX trend
tx_current = self.net_tx_history[-1] if self.net_tx_history else 0
content.append(f"TX {fmt_rate(tx_current)}\n", style="blue")
content.append(self.create_sparkline(self.net_tx_history), style="blue")
content.append("\n\n")
# Disk trends
read_current = self.disk_read_history[-1] if self.disk_read_history else 0
write_current = self.disk_write_history[-1] if self.disk_write_history else 0
content.append(f"DSK_R {fmt_rate(read_current)}\n", style="cyan")
content.append(self.create_sparkline(self.disk_read_history), style="cyan")
content.append("\n")
content.append(f"DSK_W {fmt_rate(write_current)}\n", style="magenta")
content.append(self.create_sparkline(self.disk_write_history), style="magenta")
return Panel(content, title="[bold white]TRENDS", border_style="green")
def create_containers_panel(self, containers):
"""Create containers table panel"""
if not containers:
if self.prom.ok:
return Panel("No container data available", title="[bold white]CONTAINERS", border_style="red")
else:
return Panel("Prometheus disconnected", title="[bold white]CONTAINERS", border_style="red")
table = Table()
table.add_column("NAME", style="white", width=9)
table.add_column("CPU%", style="yellow", width=5)
table.add_column("MEM", style="magenta", width=6)
table.add_column("RX", style="green", width=7)
table.add_column("TX", style="blue", width=7)
table.add_column("UP", style="white", width=4)
# Sort containers by activity
sorted_containers = sorted(
containers.items(),
key=lambda x: (x[1].get('rx', 0) + x[1].get('tx', 0) + x[1].get('cpu', 0) * 1000),
reverse=True
)
count = 0
max_containers = 12
for name, stats in sorted_containers:
if count >= max_containers:
break
# Format data
short_name = name[:8] if len(name) <= 8 else name[:5] + "..."
cpu = stats.get('cpu', 0)
mem = stats.get('mem', 0)
rx = stats.get('rx', 0)
tx = stats.get('tx', 0)
cpu_str = f"{cpu:.1f}" if cpu > 0.1 else "-"
mem_str = fmt_bytes(mem) if mem > 0 else "-"
rx_str = fmt_rate(rx) if rx > 100 else "-"
tx_str = fmt_rate(tx) if tx > 100 else "-"
# Color coding based on activity
total_activity = rx + tx + (cpu * 1000)
if total_activity > 1024*1024:
name_style = "bold green"
elif total_activity > 1024:
name_style = "bold yellow"
elif total_activity > 0:
name_style = "cyan"
else:
name_style = "dim white"
cpu_style = "red" if cpu > 80 else "yellow" if cpu > 50 else "green" if cpu > 0.1 else "dim white"
table.add_row(
Text(short_name, style=name_style),
Text(cpu_str, style=cpu_style),
Text(mem_str, style="magenta"),
Text(rx_str, style="green"),
Text(tx_str, style="blue"),
Text("", style="white")
)
count += 1
# Add summary
remaining = len(containers) - count
total_containers = len(containers)
active_containers = len([c for c in containers.values()
if c.get('rx', 0) + c.get('tx', 0) > 0])
footer = f"Total: {total_containers} | Active: {active_containers}"
if remaining > 0:
footer += f" | +{remaining} more"
return Panel(table, title=f"[bold white]CONTAINERS", subtitle=footer, border_style="green")
def create_layout(self):
"""Create the main layout"""
sys_data = self.get_sys()
containers = self.prom.get_containers()
# Create panels
system_panel = self.create_system_panel(sys_data)
trends_panel = self.create_trends_panel()
containers_panel = self.create_containers_panel(containers)
# Create top row layout (system + trends side by side)
top_row = Columns([system_panel, trends_panel], equal=False, expand=True)
# Create main layout
layout = Layout()
layout.split_column(
Layout(Panel(Align.center(f"[bold white]HOMELAB MONITOR - {datetime.now().strftime('%H:%M:%S')}"),
border_style="cyan"), size=3),
Layout(top_row, size=12),
Layout(containers_panel)
)
return layout
def run(self):
"""Main monitoring loop with Rich Live display"""
console.print("[bold green]Starting Rich Monitor...[/]")
console.print(f"Prometheus URL: {PROMETHEUS_URL}")
console.print("[dim]Press Ctrl+C to exit[/]")
time.sleep(2)
try:
with Live(self.create_layout(), console=console, screen=True, refresh_per_second=1) as live:
while True:
time.sleep(REFRESH_INTERVAL)
live.update(self.create_layout())
except KeyboardInterrupt:
console.print("\n[yellow]Monitor stopped[/]")
if __name__ == "__main__":
try:
monitor = RichMonitor()
monitor.run()
except KeyboardInterrupt:
pass
except Exception as e:
console.print(f"[red]Error: {e}[/]")
finally:
sys.exit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment