A nice way to keep tabs on Docker container resource usage
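The script needs only the Python standard library (asyncio, curses) and access to the Docker socket on a Unix-like host (typically root or membership in the docker group). As a rough usage sketch (the gist does not fix a filename, so docker_top.py is an assumption here): python3 docker_top.py --mem --sort cpu --interval 2 charts CPU and memory sorted by CPU, and adding --project NAME restricts the view to a single Docker Compose project. All flags (--mem, --bar, --sort, --project, --interval, --socket) come from the argparse setup at the bottom of the script.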
#!/usr/bin/env python3
import asyncio
import socket
import json
import curses
import argparse
from collections import defaultdict, deque

MAX_HISTORY = 200  # Max history samples to keep
BRAILLE_4 = ["⣀", "⣄", "⣆", "⣇", "⣿"]  # 0–4 levels
BAR_CHARS = ["▏", "▎", "▍", "▌", "▋", "▊", "▉", "█"]  # 1/8 increments
DEFAULT_DOCKER_SOCKET = "/var/run/docker.sock"

# ------------------------------------------------------------
# Docker API via Unix socket
# ------------------------------------------------------------
async def docker_api_request(path, socket_path):
    """Make an HTTP request to the Docker API via Unix socket."""
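    # Hand-rolled HTTP/1.1 over the Unix socket keeps the script free of
    # third-party dependencies (no Docker SDK, no requests-unixsocket); the
    # trade-off is the manual header split and chunked-transfer decoding below.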
    try:
        reader, writer = await asyncio.open_unix_connection(socket_path)
        request = f"GET {path} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n"
        writer.write(request.encode())
        await writer.drain()
        response = await reader.read()
        writer.close()
        await writer.wait_closed()

        # Parse HTTP response - split headers from body
        response = response.decode('utf-8', errors='replace')
        if '\r\n\r\n' in response:
            headers, body = response.split('\r\n\r\n', 1)
        else:
            return None

        # Handle chunked transfer encoding
        if 'Transfer-Encoding: chunked' in headers:
            decoded = []
            while body:
                if '\r\n' not in body:
                    break
                size_str, rest = body.split('\r\n', 1)
                try:
                    size = int(size_str, 16)
                except ValueError:
                    break
                if size == 0:
                    break
                decoded.append(rest[:size])
                body = rest[size + 2:]  # Skip chunk data + \r\n
            body = ''.join(decoded)

        return json.loads(body) if body.strip() else None
    except (OSError, json.JSONDecodeError, ConnectionError):
        return None

async def get_containers(socket_path):
    """Get list of running containers."""
    data = await docker_api_request("/containers/json", socket_path)
    return data if data else []


async def get_container_stats(container_id, socket_path):
    """Get stats for a single container (non-streaming)."""
    data = await docker_api_request(f"/containers/{container_id}/stats?stream=false", socket_path)
    return data

def calculate_cpu_percent(stats):
    """Calculate CPU percentage from Docker stats."""
    try:
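        # Mirrors the formula the docker CLI uses for CPU%: the container's
        # CPU-time delta divided by the host's CPU-time delta between the two
        # samples in the stats payload, scaled by the number of online CPUs.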
        cpu_delta = stats['cpu_stats']['cpu_usage']['total_usage'] - \
            stats['precpu_stats']['cpu_usage']['total_usage']
        system_delta = stats['cpu_stats']['system_cpu_usage'] - \
            stats['precpu_stats']['system_cpu_usage']
        num_cpus = stats['cpu_stats'].get('online_cpus') or \
            len(stats['cpu_stats']['cpu_usage'].get('percpu_usage', [1]))
        if system_delta > 0 and cpu_delta > 0:
            return (cpu_delta / system_delta) * num_cpus * 100.0
    except (KeyError, TypeError, ZeroDivisionError):
        pass
    return 0.0

def calculate_mem_percent(stats):
    """Calculate memory percentage from Docker stats."""
    try:
        usage = stats['memory_stats']['usage']
        # Subtract cache if available (more accurate)
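        # (On cgroup v2 hosts the stats dict has no 'cache' key, so the .get()
        # below falls back to 0 and the subtraction is effectively skipped.)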
        cache = stats['memory_stats'].get('stats', {}).get('cache', 0)
        usage = usage - cache
        limit = stats['memory_stats']['limit']
        if limit > 0:
            return (usage / limit) * 100.0
    except (KeyError, TypeError, ZeroDivisionError):
        pass
    return 0.0

async def get_docker_stats(socket_path, project=None):
    """Fetch stats for all containers concurrently."""
    containers = await get_containers(socket_path)
    if not containers:
        return []

    async def fetch_one(c):
        cid = c['Id']
        name = c['Names'][0].lstrip('/') if c.get('Names') else cid[:12]
        labels = c.get('Labels', {})
        proj = labels.get('com.docker.compose.project')
        if project and proj != project:
            return None
        stats = await get_container_stats(cid, socket_path)
        if not stats:
            return None
        return {"name": name,
                "cpu": calculate_cpu_percent(stats),
                "mem": calculate_mem_percent(stats),
                "project": proj}

    results = await asyncio.gather(*[fetch_one(c) for c in containers])
    return [r for r in results if r is not None]

class AsyncStatsFetcher:
    """Background task for non-blocking docker stats fetching."""
    def __init__(self, socket_path, project, interval):
        self.socket_path = socket_path
        self.project = project
        self.interval = interval
        self.stats = []
        self.generation = 0  # Increments each time new data is fetched
        self.lock = asyncio.Lock()
        self.running = True
        self.task = None

    async def start(self):
        self.task = asyncio.create_task(self._run())

    async def _run(self):
        while self.running:
            try:
                new_stats = await get_docker_stats(self.socket_path, self.project)
                async with self.lock:
                    self.stats = new_stats
                    self.generation += 1
            except Exception:
                pass
            await asyncio.sleep(self.interval)

    async def get(self):
        """Returns (stats, generation) tuple."""
        async with self.lock:
            return list(self.stats), self.generation

    def stop(self):
        self.running = False
        if self.task:
            self.task.cancel()

# ------------------------------------------------------------
# Charting helpers
# ------------------------------------------------------------
def braille4_for_value(v):
    # Clamp the index: a sample can momentarily exceed the tracked max right
    # after a scale reset ('r'), which would otherwise IndexError here.
    idx = max(0, min(4, int((v / 100.0) * 4)))
    return BRAILLE_4[idx]


def render_braille4_timeseries(history, vmin, vmax, width):
    """Auto-scale history into 0–100% before mapping to braille."""
    # Use most recent 'width' samples
    samples = list(history)[-width:] if len(history) >= width else list(history)
    scaled = []
    rng = max(vmax - vmin, 1e-6)
    for v in samples:
        norm = (v - vmin) / rng
        scaled.append(norm * 100.0)
    # Pad with empty if needed
    while len(scaled) < width:
        scaled.insert(0, 0.0)
    return "".join(braille4_for_value(v) for v in scaled)

def render_horizontal_bar(value, width):
    """Render a horizontal bar for 0-100% value."""
    value = max(0, min(100, value))
    filled = (value / 100.0) * width
    full_blocks = int(filled)
    remainder = filled - full_blocks
    bar = "█" * full_blocks
    if remainder > 0 and full_blocks < width:
        idx = int(remainder * 8)
        idx = min(idx, 7)
        bar += BAR_CHARS[idx]
    # Pad with spaces
    bar = f"{bar:{width}}"
    return bar[:width]


def color_for_value(base_color, v):
    """
    Map 0–100% to a 256-color intensity ramp.
    base_color is the hue; intensity is brightness.
    """
    # intensity 0–5
    level = int((v / 100.0) * 5)
    return curses.color_pair(base_color + level)

# ------------------------------------------------------------
# Curses UI
# ------------------------------------------------------------
async def ui(stdscr, args):
    curses.curs_set(0)
    curses.start_color()
    curses.use_default_colors()
    stdscr.keypad(True)  # Enable special keys (arrows, etc.)

    # CPU color ramp (blue-ish)
    for i in range(6):
        curses.init_pair(10 + i, 39 + i, -1)
    # MEM color ramp (orange-ish)
    for i in range(6):
        curses.init_pair(20 + i, 208 - i, -1)

    cpu_hist = defaultdict(lambda: deque([0.0] * MAX_HISTORY, maxlen=MAX_HISTORY))
    mem_hist = defaultdict(lambda: deque([0.0] * MAX_HISTORY, maxlen=MAX_HISTORY))
    cpu_min = defaultdict(lambda: 0.0)
    cpu_max = defaultdict(lambda: 100.0)
    mem_min = defaultdict(lambda: 0.0)
    mem_max = defaultdict(lambda: 100.0)

    paused = False
    sort_mode = args.sort
    show_mem = args.mem
    bar_mode = args.bar
    scroll = 0
    last_stats = []
    last_generation = -1  # Track which generation we've already processed
    KEY_POLL_MS = 50  # Poll keys every 50ms for responsive input

    # Start background stats fetcher
    fetcher = AsyncStatsFetcher(args.socket, args.project, args.interval)
    await fetcher.start()

    try:
        while True:
            # Get latest stats from background task
            new_stats, generation = await fetcher.get()

            # Only update history when we have genuinely new data (new generation)
            if generation > last_generation and new_stats and not paused:
                last_generation = generation
                last_stats = new_stats
                for s in new_stats:
                    name = s["name"]
                    cpu_hist[name].append(s["cpu"])
                    mem_hist[name].append(s["mem"])
                    # Update scaling windows
                    cpu_min[name] = min(cpu_min[name], s["cpu"])
                    cpu_max[name] = max(cpu_max[name], s["cpu"])
                    mem_min[name] = min(mem_min[name], s["mem"])
                    mem_max[name] = max(mem_max[name], s["mem"])

            stats = new_stats if new_stats else last_stats

            stdscr.erase()

            # Sorting
            if sort_mode == "cpu":
                stats.sort(key=lambda x: x["cpu"], reverse=True)
            elif sort_mode == "mem":
                stats.sort(key=lambda x: x["mem"], reverse=True)
            else:
                stats.sort(key=lambda x: x["name"])

            # Overview
            if stats:
                avg_cpu = sum(s["cpu"] for s in stats) / len(stats)
                avg_mem = sum(s["mem"] for s in stats) / len(stats)
                overview = f"{len(stats)} containers | avg CPU {avg_cpu:4.1f}% | avg MEM {avg_mem:4.1f}%"
            else:
                overview = "No containers"
            stdscr.addstr(0, 0, overview)

            # Calculate dynamic chart width based on terminal size
            # Layout: NAME(30) + gap(2) + chart + gap(1) + value(7)
            name_col = 30
            value_width = 7
            min_chart_width = 20

            # Calculate if we can fit side-by-side (two charts)
            # Side-by-side: NAME + gap + (chart + gap + value) * 2
            side_by_side_min = name_col + 2 + (min_chart_width + 1 + value_width) * 2

            # Determine layout mode
            stacked = show_mem and curses.COLS < side_by_side_min
            if show_mem and not stacked:
                # Side-by-side: split available space between two charts
                available = curses.COLS - name_col - 2 - (1 + value_width) * 2
                chart_width = max(min_chart_width, available // 2)
            else:
                # Single chart or stacked: full width
                chart_width = max(min_chart_width, curses.COLS - name_col - 2 - 1 - value_width)

            # Footer takes 2 lines
            footer_lines = 2
            available_rows = curses.LINES - 3 - footer_lines  # header(3) + footer(2)
            rows_per_container = 2 if stacked else 1
            max_containers = available_rows // rows_per_container

            # Header row
            stdscr.addstr(2, 0, "NAME")
            if show_mem:
                if stacked:
                    stdscr.addstr(2, name_col + 2, "CPU / MEM (stacked)")
                else:
                    stdscr.addstr(2, name_col + 2, "CPU")
                    stdscr.addstr(2, name_col + 2 + chart_width + 1 + value_width + 1, "MEM")
            else:
                stdscr.addstr(2, name_col + 2, "CPU")

            # Render rows
            visible = stats[scroll:scroll + max_containers]
            row = 3
            for s in visible:
                name = s["name"]
                cpu = s["cpu"]
                mem = s["mem"]
                stdscr.addstr(row, 0, f"{name[:name_col]:{name_col}}")

                # CPU chart
                if bar_mode:
                    cpu_line = render_horizontal_bar(cpu, chart_width)
                else:
                    cpu_line = render_braille4_timeseries(cpu_hist[name], cpu_min[name], cpu_max[name], chart_width)
                cpu_color = color_for_value(10, cpu)
                stdscr.addstr(row, name_col + 2, cpu_line, cpu_color)
                stdscr.addstr(row, name_col + 2 + chart_width + 1, f"{cpu:5.1f}%")

                # MEM chart (stacked below if narrow, else side-by-side)
                if show_mem:
                    if stacked:
                        row += 1
                        if bar_mode:
                            mem_line = render_horizontal_bar(mem, chart_width)
                        else:
                            mem_line = render_braille4_timeseries(mem_hist[name], mem_min[name], mem_max[name], chart_width)
                        mem_color = color_for_value(20, mem)
                        stdscr.addstr(row, name_col + 2, mem_line, mem_color)
                        stdscr.addstr(row, name_col + 2 + chart_width + 1, f"{mem:5.1f}%")
                    else:
                        # Side-by-side
                        mem_col = name_col + 2 + chart_width + 1 + value_width + 1
                        if bar_mode:
                            mem_line = render_horizontal_bar(mem, chart_width)
                        else:
                            mem_line = render_braille4_timeseries(mem_hist[name], mem_min[name], mem_max[name], chart_width)
                        mem_color = color_for_value(20, mem)
                        stdscr.addstr(row, mem_col, mem_line, mem_color)
                        stdscr.addstr(row, mem_col + chart_width + 1, f"{mem:5.1f}%")
                row += 1

            # Footer with keyboard instructions
            footer_row = curses.LINES - 2
            footer1 = "[q] Quit [p] Pause [m] Toggle MEM [b] Toggle bars [c] Cycle sort [r] Reset scale"
            mode_str = "BAR" if bar_mode else "TIMESERIES"
            footer2 = "[↑/↓] Scroll | Sort: {:<4} | Mode: {} | {}".format(sort_mode, mode_str, "PAUSED" if paused else "RUNNING")
            try:
                stdscr.addstr(footer_row, 0, footer1[:curses.COLS-1])
                stdscr.addstr(footer_row + 1, 0, footer2[:curses.COLS-1])
            except curses.error:
                pass

            stdscr.refresh()

            # Keyboard input (short timeout for responsive keys)
            stdscr.timeout(KEY_POLL_MS)
            key = stdscr.getch()
            if key == ord('q'):
                break
            elif key == ord('p'):
                paused = not paused
            elif key == ord('m'):
                show_mem = not show_mem
            elif key == ord('b'):
                bar_mode = not bar_mode
            elif key == ord('c'):
                sort_mode = {"name": "cpu", "cpu": "mem", "mem": "name"}[sort_mode]
            elif key == ord('r'):
                cpu_min.clear(); cpu_max.clear()
                mem_min.clear(); mem_max.clear()
            elif key == curses.KEY_DOWN:
                rows_per = 2 if show_mem else 1
                max_c = (curses.LINES - 5) // rows_per
                scroll = min(scroll + 1, max(0, len(stats) - max_c))
            elif key == curses.KEY_UP:
                scroll = max(scroll - 1, 0)
            elif key == curses.KEY_RESIZE:
                curses.update_lines_cols()
                rows_per = 2 if show_mem else 1
                max_c = (curses.LINES - 5) // rows_per
                scroll = min(scroll, max(0, len(stats) - max_c))

            # Yield to allow other async tasks to run
            await asyncio.sleep(0)
    finally:
        fetcher.stop()

# ------------------------------------------------------------
# Main
# ------------------------------------------------------------
def main():
    parser = argparse.ArgumentParser(description="Docker Compose time-series monitor (curses)")
    parser.add_argument("--mem", action="store_true", help="Show memory charts")
    parser.add_argument("--bar", action="store_true", help="Use horizontal bar mode instead of time-series")
    parser.add_argument("--sort", choices=["cpu", "mem", "name"], default="name")
    parser.add_argument("--project", help="Filter by compose project name")
    parser.add_argument("--interval", type=float, default=1.0)
    parser.add_argument("--socket", default=DEFAULT_DOCKER_SOCKET, help="Docker socket path")
    args = parser.parse_args()

    def run_ui(stdscr):
        asyncio.run(ui(stdscr, args))

    try:
        curses.wrapper(run_ui)
    except KeyboardInterrupt:
        pass  # Exit gracefully on Ctrl+C


if __name__ == "__main__":
    main()