Created
July 1, 2025 06:11
-
-
Save morpheuslord/e85eaccf683c7ccca86debeb235330dc to your computer and use it in GitHub Desktop.
A easy way to monitor system and container stats for an homelab implementation. This is personalized for me to use Prometheus to scan system resources and Raspi ZeroW to monitor the system stats via a 3.5 Inch Screen.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Rich-based Docker Container Monitor for 3.5" RPi Screen | |
| Using Rich library for beautiful terminal layouts | |
| """ | |
| import requests | |
| import json | |
| import time | |
| import sys | |
| import os | |
| import psutil | |
| from datetime import datetime | |
| from collections import defaultdict | |
| # Rich imports | |
| from rich.console import Console | |
| from rich.panel import Panel | |
| from rich.table import Table | |
| from rich.columns import Columns | |
| from rich.layout import Layout | |
| from rich.live import Live | |
| from rich.text import Text | |
| from rich.progress import BarColumn, Progress | |
| from rich.align import Align | |
| # Configuration | |
| PROMETHEUS_URL = "http://IP:9090" | |
| REFRESH_INTERVAL = 3 # seconds | |
| console = Console() | |
| def fmt_bytes(b): | |
| if b < 1024: return f"{b:.0f}B" | |
| elif b < 1024*1024: return f"{b/1024:.1f}K" | |
| elif b < 1024*1024*1024: return f"{b/(1024*1024):.1f}M" | |
| else: return f"{b/(1024*1024*1024):.1f}G" | |
| def fmt_rate(bps): | |
| return fmt_bytes(bps) + "/s" | |
| def get_temp(): | |
| try: | |
| with open('/sys/class/thermal/thermal_zone0/temp', 'r') as f: | |
| return float(f.read()) / 1000.0 | |
| except: | |
| return None | |
| def get_load(): | |
| """Get system load average with fallback""" | |
| try: | |
| return os.getloadavg() | |
| except (AttributeError, OSError): | |
| try: | |
| with open('/proc/loadavg', 'r') as f: | |
| load_values = f.read().strip().split()[:3] | |
| return [float(x) for x in load_values] | |
| except: | |
| return [0.0, 0.0, 0.0] | |
| def get_uptime(): | |
| try: | |
| with open('/proc/uptime', 'r') as f: | |
| return float(f.read().split()[0]) | |
| except: | |
| return None | |
| class PrometheusClient: | |
| def __init__(self, url): | |
| self.url = url | |
| self.ok = False | |
| def query(self, q): | |
| try: | |
| r = requests.get(f"{self.url}/api/v1/query", | |
| params={'query': q}, timeout=3) | |
| r.raise_for_status() | |
| self.ok = True | |
| return r.json() | |
| except: | |
| self.ok = False | |
| return None | |
| def get_containers(self): | |
| stats = defaultdict(dict) | |
| # Network RX/TX rates | |
| for direction, query in [('rx', 'rate(container_network_receive_bytes_total[1m])'), | |
| ('tx', 'rate(container_network_transmit_bytes_total[1m])')]: | |
| result = self.query(query) | |
| if result and result['status'] == 'success': | |
| for item in result['data']['result']: | |
| name = item['metric'].get('name', '') | |
| if name and not name.startswith('/'): | |
| stats[name][direction] = float(item['value'][1]) | |
| # CPU usage | |
| cpu = self.query('rate(container_cpu_usage_seconds_total[1m]) * 100') | |
| if cpu and cpu['status'] == 'success': | |
| for item in cpu['data']['result']: | |
| name = item['metric'].get('name', '') | |
| if name and not name.startswith('/'): | |
| stats[name]['cpu'] = float(item['value'][1]) | |
| # Memory usage | |
| mem = self.query('container_memory_usage_bytes') | |
| if mem and mem['status'] == 'success': | |
| for item in mem['data']['result']: | |
| name = item['metric'].get('name', '') | |
| if name and not name.startswith('/'): | |
| stats[name]['mem'] = float(item['value'][1]) | |
| return dict(stats) | |
| class RichMonitor: | |
| def __init__(self): | |
| self.prom = PrometheusClient(PROMETHEUS_URL) | |
| self.last_net = None | |
| self.last_disk = None | |
| self.net_rates = {'rx': 0, 'tx': 0} | |
| self.disk_rates = {'read': 0, 'write': 0} | |
| # Historical data for graphs (last 20 points for small screen) | |
| self.history_size = 20 | |
| self.cpu_history = [] | |
| self.net_rx_history = [] | |
| self.net_tx_history = [] | |
| self.disk_read_history = [] | |
| self.disk_write_history = [] | |
| def get_sys(self): | |
| cpu = psutil.cpu_percent(interval=None) | |
| mem = psutil.virtual_memory() | |
| swap = psutil.swap_memory() | |
| disk = psutil.disk_usage('/') | |
| disk_io = psutil.disk_io_counters() | |
| net = psutil.net_io_counters() | |
| # Calculate rates | |
| if self.last_net: | |
| time_diff = REFRESH_INTERVAL | |
| rx_rate = (net.bytes_recv - self.last_net.bytes_recv) / time_diff | |
| tx_rate = (net.bytes_sent - self.last_net.bytes_sent) / time_diff | |
| self.net_rates = {'rx': rx_rate, 'tx': tx_rate} | |
| self.last_net = net | |
| if self.last_disk: | |
| time_diff = REFRESH_INTERVAL | |
| read_rate = (disk_io.read_bytes - self.last_disk.read_bytes) / time_diff | |
| write_rate = (disk_io.write_bytes - self.last_disk.write_bytes) / time_diff | |
| self.disk_rates = {'read': read_rate, 'write': write_rate} | |
| self.last_disk = disk_io | |
| # Store historical data | |
| self.cpu_history.append(cpu) | |
| self.net_rx_history.append(self.net_rates['rx']) | |
| self.net_tx_history.append(self.net_rates['tx']) | |
| self.disk_read_history.append(self.disk_rates['read']) | |
| self.disk_write_history.append(self.disk_rates['write']) | |
| # Keep only last N points | |
| for history in [self.cpu_history, self.net_rx_history, self.net_tx_history, | |
| self.disk_read_history, self.disk_write_history]: | |
| if len(history) > self.history_size: | |
| history.pop(0) | |
| temp = get_temp() | |
| load_avg = get_load() | |
| uptime = get_uptime() | |
| processes = len(psutil.pids()) | |
| return { | |
| 'cpu': cpu, 'mem_pct': mem.percent, 'mem_used': mem.used, 'mem_total': mem.total, | |
| 'swap_pct': swap.percent, 'swap_used': swap.used, | |
| 'disk_pct': (disk.used/disk.total)*100, 'disk_free': disk.free, | |
| 'disk_read_rate': self.disk_rates['read'], 'disk_write_rate': self.disk_rates['write'], | |
| 'net_rx_total': net.bytes_recv, 'net_tx_total': net.bytes_sent, | |
| 'net_rx_rate': self.net_rates['rx'], 'net_tx_rate': self.net_rates['tx'], | |
| 'temp': temp, 'load1': load_avg[0], 'load5': load_avg[1], 'uptime': uptime, 'processes': processes | |
| } | |
| def format_uptime(self, seconds): | |
| if not seconds: | |
| return "N/A" | |
| if seconds < 3600: | |
| return f"{int(seconds/60)}m" | |
| elif seconds < 86400: | |
| return f"{int(seconds/3600)}h" | |
| else: | |
| return f"{int(seconds/86400)}d" | |
| def create_sparkline(self, data, width=15): | |
| """Create sparkline from data""" | |
| if not data or len(data) < 2: | |
| return "─" * width | |
| # Scale data to width | |
| scaled_data = [] | |
| step = max(1, len(data) // width) | |
| for i in range(0, len(data), step): | |
| chunk = data[i:i+step] | |
| scaled_data.append(sum(chunk) / len(chunk)) | |
| # Ensure exact width | |
| while len(scaled_data) > width: | |
| scaled_data.pop(0) | |
| while len(scaled_data) < width: | |
| scaled_data.insert(0, 0) | |
| # Create sparkline | |
| if max(scaled_data) > 0: | |
| chars = ['▁', '▂', '▃', '▄', '▅', '▆', '▇', '█'] | |
| max_val = max(scaled_data) | |
| sparkline = "" | |
| for value in scaled_data: | |
| normalized = value / max_val | |
| char_idx = min(int(normalized * len(chars)), len(chars) - 1) | |
| sparkline += chars[char_idx] | |
| else: | |
| sparkline = "▁" * width | |
| return sparkline | |
| def create_system_panel(self, sys_data): | |
| """Create system information panel""" | |
| # System overview | |
| temp_str = f"{sys_data['temp']:.1f}°C" if sys_data['temp'] else "N/A" | |
| load_str = f"{sys_data['load1']:.2f}" if sys_data['load1'] >= 0 else "N/A" | |
| uptime_str = self.format_uptime(sys_data['uptime']) | |
| content = Text() | |
| content.append(f"TEMP: ", style="white") | |
| content.append(f"{temp_str:<8}", style="yellow" if sys_data['temp'] and sys_data['temp'] > 60 else "green") | |
| content.append(f"LOAD: ", style="white") | |
| content.append(f"{load_str:<6}", style="cyan") | |
| content.append("\n") | |
| content.append(f"UP: ", style="white") | |
| content.append(f"{uptime_str:<8}", style="magenta") | |
| content.append(f"PROC: ", style="white") | |
| content.append(f"{sys_data['processes']}", style="yellow") | |
| content.append("\n\n") | |
| # Resource usage | |
| cpu_color = "red" if sys_data['cpu'] > 80 else "yellow" if sys_data['cpu'] > 50 else "green" | |
| mem_color = "red" if sys_data['mem_pct'] > 80 else "yellow" if sys_data['mem_pct'] > 60 else "green" | |
| content.append(f"CPU: ", style="white") | |
| content.append(f"{sys_data['cpu']:4.1f}%", style=cpu_color) | |
| content.append("\n") | |
| content.append(f"MEM: ", style="white") | |
| content.append(f"{sys_data['mem_pct']:4.1f}%", style=mem_color) | |
| content.append(f" {fmt_bytes(sys_data['mem_used'])}", style="white") | |
| content.append("\n") | |
| disk_color = "red" if sys_data['disk_pct'] > 90 else "yellow" if sys_data['disk_pct'] > 70 else "green" | |
| content.append(f"DISK: ", style="white") | |
| content.append(f"{sys_data['disk_pct']:4.1f}%", style=disk_color) | |
| content.append(f" FREE:{fmt_bytes(sys_data['disk_free'])}", style="white") | |
| content.append("\n\n") | |
| # Network | |
| content.append(f"NET RX: ", style="white") | |
| content.append(f"{fmt_rate(sys_data['net_rx_rate'])}", style="green") | |
| content.append("\n") | |
| content.append(f"NET TX: ", style="white") | |
| content.append(f"{fmt_rate(sys_data['net_tx_rate'])}", style="blue") | |
| content.append("\n") | |
| # Prometheus status | |
| prom_status = "✓ OK" if self.prom.ok else "✗ FAIL" | |
| prom_color = "green" if self.prom.ok else "red" | |
| content.append(f"PROM: ", style="white") | |
| content.append(prom_status, style=prom_color) | |
| return Panel(content, title="[bold white]SYSTEM", border_style="blue") | |
| def create_trends_panel(self): | |
| """Create trends panel with sparklines""" | |
| if len(self.cpu_history) < 5: | |
| return Panel("Loading trends...", title="[bold white]TRENDS", border_style="yellow") | |
| content = Text() | |
| # CPU trend | |
| cpu_current = self.cpu_history[-1] if self.cpu_history else 0 | |
| content.append(f"CPU {cpu_current:4.1f}%\n", style="yellow") | |
| content.append(self.create_sparkline(self.cpu_history), style="yellow") | |
| content.append("\n\n") | |
| # Network RX trend | |
| rx_current = self.net_rx_history[-1] if self.net_rx_history else 0 | |
| content.append(f"RX {fmt_rate(rx_current)}\n", style="green") | |
| content.append(self.create_sparkline(self.net_rx_history), style="green") | |
| content.append("\n\n") | |
| # Network TX trend | |
| tx_current = self.net_tx_history[-1] if self.net_tx_history else 0 | |
| content.append(f"TX {fmt_rate(tx_current)}\n", style="blue") | |
| content.append(self.create_sparkline(self.net_tx_history), style="blue") | |
| content.append("\n\n") | |
| # Disk trends | |
| read_current = self.disk_read_history[-1] if self.disk_read_history else 0 | |
| write_current = self.disk_write_history[-1] if self.disk_write_history else 0 | |
| content.append(f"DSK_R {fmt_rate(read_current)}\n", style="cyan") | |
| content.append(self.create_sparkline(self.disk_read_history), style="cyan") | |
| content.append("\n") | |
| content.append(f"DSK_W {fmt_rate(write_current)}\n", style="magenta") | |
| content.append(self.create_sparkline(self.disk_write_history), style="magenta") | |
| return Panel(content, title="[bold white]TRENDS", border_style="green") | |
| def create_containers_panel(self, containers): | |
| """Create containers table panel""" | |
| if not containers: | |
| if self.prom.ok: | |
| return Panel("No container data available", title="[bold white]CONTAINERS", border_style="red") | |
| else: | |
| return Panel("Prometheus disconnected", title="[bold white]CONTAINERS", border_style="red") | |
| table = Table() | |
| table.add_column("NAME", style="white", width=9) | |
| table.add_column("CPU%", style="yellow", width=5) | |
| table.add_column("MEM", style="magenta", width=6) | |
| table.add_column("RX", style="green", width=7) | |
| table.add_column("TX", style="blue", width=7) | |
| table.add_column("UP", style="white", width=4) | |
| # Sort containers by activity | |
| sorted_containers = sorted( | |
| containers.items(), | |
| key=lambda x: (x[1].get('rx', 0) + x[1].get('tx', 0) + x[1].get('cpu', 0) * 1000), | |
| reverse=True | |
| ) | |
| count = 0 | |
| max_containers = 12 | |
| for name, stats in sorted_containers: | |
| if count >= max_containers: | |
| break | |
| # Format data | |
| short_name = name[:8] if len(name) <= 8 else name[:5] + "..." | |
| cpu = stats.get('cpu', 0) | |
| mem = stats.get('mem', 0) | |
| rx = stats.get('rx', 0) | |
| tx = stats.get('tx', 0) | |
| cpu_str = f"{cpu:.1f}" if cpu > 0.1 else "-" | |
| mem_str = fmt_bytes(mem) if mem > 0 else "-" | |
| rx_str = fmt_rate(rx) if rx > 100 else "-" | |
| tx_str = fmt_rate(tx) if tx > 100 else "-" | |
| # Color coding based on activity | |
| total_activity = rx + tx + (cpu * 1000) | |
| if total_activity > 1024*1024: | |
| name_style = "bold green" | |
| elif total_activity > 1024: | |
| name_style = "bold yellow" | |
| elif total_activity > 0: | |
| name_style = "cyan" | |
| else: | |
| name_style = "dim white" | |
| cpu_style = "red" if cpu > 80 else "yellow" if cpu > 50 else "green" if cpu > 0.1 else "dim white" | |
| table.add_row( | |
| Text(short_name, style=name_style), | |
| Text(cpu_str, style=cpu_style), | |
| Text(mem_str, style="magenta"), | |
| Text(rx_str, style="green"), | |
| Text(tx_str, style="blue"), | |
| Text("", style="white") | |
| ) | |
| count += 1 | |
| # Add summary | |
| remaining = len(containers) - count | |
| total_containers = len(containers) | |
| active_containers = len([c for c in containers.values() | |
| if c.get('rx', 0) + c.get('tx', 0) > 0]) | |
| footer = f"Total: {total_containers} | Active: {active_containers}" | |
| if remaining > 0: | |
| footer += f" | +{remaining} more" | |
| return Panel(table, title=f"[bold white]CONTAINERS", subtitle=footer, border_style="green") | |
| def create_layout(self): | |
| """Create the main layout""" | |
| sys_data = self.get_sys() | |
| containers = self.prom.get_containers() | |
| # Create panels | |
| system_panel = self.create_system_panel(sys_data) | |
| trends_panel = self.create_trends_panel() | |
| containers_panel = self.create_containers_panel(containers) | |
| # Create top row layout (system + trends side by side) | |
| top_row = Columns([system_panel, trends_panel], equal=False, expand=True) | |
| # Create main layout | |
| layout = Layout() | |
| layout.split_column( | |
| Layout(Panel(Align.center(f"[bold white]HOMELAB MONITOR - {datetime.now().strftime('%H:%M:%S')}"), | |
| border_style="cyan"), size=3), | |
| Layout(top_row, size=12), | |
| Layout(containers_panel) | |
| ) | |
| return layout | |
| def run(self): | |
| """Main monitoring loop with Rich Live display""" | |
| console.print("[bold green]Starting Rich Monitor...[/]") | |
| console.print(f"Prometheus URL: {PROMETHEUS_URL}") | |
| console.print("[dim]Press Ctrl+C to exit[/]") | |
| time.sleep(2) | |
| try: | |
| with Live(self.create_layout(), console=console, screen=True, refresh_per_second=1) as live: | |
| while True: | |
| time.sleep(REFRESH_INTERVAL) | |
| live.update(self.create_layout()) | |
| except KeyboardInterrupt: | |
| console.print("\n[yellow]Monitor stopped[/]") | |
| if __name__ == "__main__": | |
| try: | |
| monitor = RichMonitor() | |
| monitor.run() | |
| except KeyboardInterrupt: | |
| pass | |
| except Exception as e: | |
| console.print(f"[red]Error: {e}[/]") | |
| finally: | |
| sys.exit() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment