Skip to content

Instantly share code, notes, and snippets.

@rcarmo
Created January 24, 2026 11:03
Show Gist options
  • Select an option

  • Save rcarmo/21bac4c2ca7e41f508dc8e38a000f7fa to your computer and use it in GitHub Desktop.

Select an option

Save rcarmo/21bac4c2ca7e41f508dc8e38a000f7fa to your computer and use it in GitHub Desktop.
A nice way to keep tabs on Docker container resource usage
#!/usr/bin/env python3
import asyncio
import socket
import json
import curses
import argparse
from collections import defaultdict, deque
# Length of each per-container CPU/MEM history deque created by the UI.
MAX_HISTORY = 200 # Max history samples to keep
# Five glyphs of increasing fill, looked up by index in braille4_for_value().
BRAILLE_4 = ["⣀", "⣄", "⣆", "⣇", "⣿"] # 0–4 levels
# Eight partial-block glyphs; render_horizontal_bar() picks one for the
# fractional cell at the end of a bar.
BAR_CHARS = ["▏", "▎", "▍", "▌", "▋", "▊", "▉", "█"] # 1/8 increments
# Default value of the --socket command-line option.
DEFAULT_DOCKER_SOCKET = "/var/run/docker.sock"
# ------------------------------------------------------------
# Docker API via Unix socket
# ------------------------------------------------------------
async def docker_api_request(path, socket_path):
    """Make an HTTP GET request to the Docker API via a Unix socket.

    Args:
        path: Request target, e.g. "/containers/json".
        socket_path: Filesystem path of the Docker daemon's Unix socket.

    Returns:
        The decoded JSON body, or None when the connection fails, the
        response has no header/body separator, the body is empty, or the
        body is not valid JSON.
    """
    writer = None
    try:
        reader, writer = await asyncio.open_unix_connection(socket_path)
        request = f"GET {path} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n"
        writer.write(request.encode())
        await writer.drain()
        # The request asks for "Connection: close", so the response is read
        # until the peer closes the stream.
        raw = await reader.read()
    except OSError:
        # OSError also covers ConnectionError and a missing socket file.
        return None
    finally:
        # Always release the connection, including on failure paths.
        if writer is not None:
            writer.close()
            try:
                await writer.wait_closed()
            except OSError:
                pass

    # Split headers from body on the raw bytes: chunk sizes count bytes, so
    # de-chunking must happen before UTF-8 decoding.
    head, separator, body = raw.partition(b"\r\n\r\n")
    if not separator:
        return None

    # Header names and the "chunked" token are case-insensitive.
    chunked = any(
        line.lower().startswith(b"transfer-encoding:") and b"chunked" in line.lower()
        for line in head.split(b"\r\n")
    )
    if chunked:
        pieces = []
        while body:
            size_line, separator, rest = body.partition(b"\r\n")
            if not separator:
                break
            try:
                # Ignore optional chunk extensions after ';'.
                size = int(size_line.split(b";", 1)[0].strip(), 16)
            except ValueError:
                break
            if size <= 0:
                break
            pieces.append(rest[:size])
            body = rest[size + 2:]  # Skip chunk data + trailing \r\n
        body = b"".join(pieces)

    text = body.decode("utf-8", errors="replace")
    if not text.strip():
        return None
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return None
async def get_containers(socket_path):
    """Return the container list from /containers/json, or [] if nothing usable came back."""
    listing = await docker_api_request("/containers/json", socket_path)
    return listing or []
async def get_container_stats(container_id, socket_path):
    """Fetch a single (non-streaming) stats snapshot for one container.

    Returns whatever docker_api_request() yields for the stats endpoint.
    """
    return await docker_api_request(
        f"/containers/{container_id}/stats?stream=false", socket_path
    )
def calculate_cpu_percent(stats):
    """Derive a CPU usage percentage from one Docker stats sample.

    Uses the container and system CPU deltas between `precpu_stats` and
    `cpu_stats`, scaled by the CPU count. Returns 0.0 when either delta is
    not positive or when the expected fields are missing or malformed.
    """
    try:
        current = stats['cpu_stats']
        previous = stats['precpu_stats']
        used = current['cpu_usage']['total_usage'] - previous['cpu_usage']['total_usage']
        elapsed = current['system_cpu_usage'] - previous['system_cpu_usage']
        # Prefer the reported CPU count; otherwise count per-CPU entries
        # (a missing list counts as one CPU).
        cores = current.get('online_cpus')
        if not cores:
            cores = len(current['cpu_usage'].get('percpu_usage', [1]))
        if not (elapsed > 0 and used > 0):
            return 0.0
        return (used / elapsed) * cores * 100.0
    except (KeyError, TypeError, ZeroDivisionError):
        return 0.0
def calculate_mem_percent(stats):
    """Calculate memory usage as a percentage of the container's limit.

    Page cache is subtracted from the raw usage figure so the result
    reflects memory the container actually holds. The `cache` counter is
    used when present; otherwise `inactive_file` is used.
    NOTE(review): `inactive_file` is believed to be the cgroup v2 field the
    Docker CLI subtracts -- confirm against the Docker stats documentation.

    Args:
        stats: One decoded Docker stats sample.

    Returns:
        The percentage as a float, or 0.0 when the limit is not positive
        or the memory fields are missing or malformed.
    """
    try:
        memory = stats['memory_stats']
        usage = memory['usage']
        # `stats` may be absent or null; treat both as "no breakdown".
        detail = memory.get('stats') or {}
        reclaimable = detail.get('cache')
        if reclaimable is None:
            reclaimable = detail.get('inactive_file', 0)
        # Never report negative usage if the counters are inconsistent.
        used = max(usage - reclaimable, 0)
        limit = memory['limit']
        if limit > 0:
            return (used / limit) * 100.0
    except (KeyError, TypeError, ZeroDivisionError):
        pass
    return 0.0
async def get_docker_stats(socket_path, project=None):
    """Collect CPU/MEM figures for every listed container, fetched concurrently.

    When `project` is given, containers whose compose-project label differs
    are skipped. Containers whose stats request yields nothing are dropped.
    Returns a list of dicts with keys name, cpu, mem and project.
    """
    listing = await get_containers(socket_path)
    if not listing:
        return []

    async def fetch_one(entry):
        container_id = entry['Id']
        # Use the first name without its leading slash, else a short id.
        if entry.get('Names'):
            display_name = entry['Names'][0].lstrip('/')
        else:
            display_name = container_id[:12]
        compose_project = entry.get('Labels', {}).get('com.docker.compose.project')
        if project and compose_project != project:
            return None
        sample = await get_container_stats(container_id, socket_path)
        if not sample:
            return None
        return dict(
            name=display_name,
            cpu=calculate_cpu_percent(sample),
            mem=calculate_mem_percent(sample),
            project=compose_project,
        )

    gathered = await asyncio.gather(*(fetch_one(entry) for entry in listing))
    return [item for item in gathered if item is not None]
class AsyncStatsFetcher:
    """Refreshes Docker stats from a background asyncio task."""

    def __init__(self, socket_path, project, interval):
        self.socket_path, self.project, self.interval = socket_path, project, interval
        self.stats = []
        # Bumped once per successful refresh so readers can spot new data.
        self.generation = 0
        self.lock = asyncio.Lock()
        self.running = True
        self.task = None

    async def start(self):
        """Schedule the polling loop on the running event loop."""
        self.task = asyncio.create_task(self._run())

    async def _run(self):
        """Poll, then sleep for `interval`, until `running` is cleared."""
        while self.running:
            await self._poll_once()
            await asyncio.sleep(self.interval)

    async def _poll_once(self):
        """Fetch one round of stats; a failed round keeps the previous data."""
        try:
            latest = await get_docker_stats(self.socket_path, self.project)
            async with self.lock:
                self.stats = latest
                self.generation += 1
        except Exception:
            pass

    async def get(self):
        """Returns (stats, generation) tuple; stats is a shallow copy."""
        async with self.lock:
            snapshot = list(self.stats)
            return snapshot, self.generation

    def stop(self):
        """Clear the running flag and cancel the background task, if any."""
        self.running = False
        task = self.task
        if task:
            task.cancel()
# ------------------------------------------------------------
# Charting helpers
# ------------------------------------------------------------
def braille4_for_value(v):
    """Map a 0-100 value to one of the BRAILLE_4 glyphs.

    Values outside 0-100 are clamped to the lowest/highest glyph instead of
    raising IndexError (above 100) or wrapping around (below 0).
    """
    idx = int((v / 100.0) * 4)
    idx = max(0, min(idx, len(BRAILLE_4) - 1))
    return BRAILLE_4[idx]
def render_braille4_timeseries(history, vmin, vmax, width): """Auto-scale history into 0–100% before mapping to braille."""
# Use most recent 'width' samples
samples = list(history)[-width:] if len(history) >= width else list(history)
scaled = []
rng = max(vmax - vmin, 1e-6)
for v in samples:
norm = (v - vmin) / rng scaled.append(norm * 100.0)
# Pad with empty if needed while len(scaled) < width:
scaled.insert(0, 0.0) return "".join(braille4_for_value(v) for v in scaled)
def render_horizontal_bar(value, width):
    """Draw `value` (clamped to 0-100%) as a bar exactly `width` cells wide.

    Whole cells use a full block, a fractional remainder adds one glyph
    from BAR_CHARS, and the rest is space-padded.
    """
    clamped = max(0, min(100, value))
    exact_cells = (clamped / 100.0) * width
    whole = int(exact_cells)
    fraction = exact_cells - whole

    parts = ["█" * whole]
    if fraction > 0 and whole < width:
        parts.append(BAR_CHARS[min(int(fraction * 8), 7)])

    # Pad with spaces, then trim to the requested width.
    padded = format("".join(parts), f"{width}")
    return padded[:width]
def color_for_value(base_color, v):
    """Map a percentage to a curses colour-pair attribute on a six-step ramp.

    `base_color` is the first pair number of the ramp; the pairs
    base_color .. base_color + 5 are expected to be initialised by the
    caller. The step is clamped to that range, so values above 100%
    (multi-core CPU usage) or below 0% stay on the ramp.
    """
    # intensity 0–5
    level = int((v / 100.0) * 5)
    level = max(0, min(level, 5))
    return curses.color_pair(base_color + level)
# ------------------------------------------------------------
# Curses UI
# ------------------------------------------------------------
# Layout: NAME(30) + gap(2) + chart + gap(1) + value(7)
_NAME_COL = 30
_VALUE_WIDTH = 7
_MIN_CHART_WIDTH = 20
_HEADER_LINES = 3  # overview line, spacer, column headers
_FOOTER_LINES = 2


def _safe_addstr(stdscr, y, x, text, attr=0):
    """Draw `text` at (y, x) clipped to the screen; never raise on overflow."""
    if not (0 <= y < curses.LINES and 0 <= x < curses.COLS):
        return
    try:
        stdscr.addstr(y, x, text[:curses.COLS - x], attr)
    except curses.error:
        # addstr raises after filling the bottom-right cell, and can also
        # fail if the terminal shrank since LINES/COLS were last refreshed.
        pass


def _compute_layout(show_mem):
    """Return (stacked, chart_width, max_containers) for the current screen size."""
    # Side-by-side: NAME + gap + (chart + gap + value) * 2
    side_by_side_min = _NAME_COL + 2 + (_MIN_CHART_WIDTH + 1 + _VALUE_WIDTH) * 2
    stacked = show_mem and curses.COLS < side_by_side_min
    if show_mem and not stacked:
        # Split the available space between the two charts.
        available = curses.COLS - _NAME_COL - 2 - (1 + _VALUE_WIDTH) * 2
        chart_width = max(_MIN_CHART_WIDTH, available // 2)
    else:
        # Single chart or stacked: full width.
        chart_width = max(_MIN_CHART_WIDTH, curses.COLS - _NAME_COL - 2 - 1 - _VALUE_WIDTH)
    available_rows = curses.LINES - _HEADER_LINES - _FOOTER_LINES
    rows_per_container = 2 if stacked else 1
    max_containers = max(0, available_rows // rows_per_container)
    return stacked, chart_width, max_containers


def _render_chart(bar_mode, value, history, vmin, vmax, width):
    """Render either a bar for `value` or a time series of `history`."""
    if bar_mode:
        return render_horizontal_bar(value, width)
    return render_braille4_timeseries(history, vmin, vmax, width)


async def ui(stdscr, args):
    """Run the interactive curses dashboard until the user presses 'q'.

    Args:
        stdscr: The curses window to draw on.
        args: Parsed options; reads .sort, .mem, .bar, .socket, .project
            and .interval.
    """
    try:
        curses.curs_set(0)
    except curses.error:
        pass  # Terminal cannot hide the cursor; cosmetic only.
    curses.start_color()
    curses.use_default_colors()
    stdscr.keypad(True)  # Enable special keys (arrows, etc.)

    # The ramps use 256-colour palette indices; fall back to one basic
    # colour per ramp on terminals with fewer colours.
    rich_colors = curses.COLORS >= 256
    for i in range(6):
        # CPU color ramp (blue-ish)
        curses.init_pair(10 + i, 39 + i if rich_colors else curses.COLOR_CYAN, -1)
        # MEM color ramp (orange-ish)
        curses.init_pair(20 + i, 208 - i if rich_colors else curses.COLOR_YELLOW, -1)

    cpu_hist = defaultdict(lambda: deque([0.0] * MAX_HISTORY, maxlen=MAX_HISTORY))
    mem_hist = defaultdict(lambda: deque([0.0] * MAX_HISTORY, maxlen=MAX_HISTORY))
    cpu_min = defaultdict(lambda: 0.0)
    cpu_max = defaultdict(lambda: 100.0)
    mem_min = defaultdict(lambda: 0.0)
    mem_max = defaultdict(lambda: 100.0)

    paused = False
    sort_mode = args.sort
    show_mem = args.mem
    bar_mode = args.bar
    scroll = 0
    last_stats = []
    last_generation = -1  # Track which generation we've already processed
    KEY_POLL_MS = 50  # Poll keys every 50ms for responsive input

    # Start background stats fetcher
    fetcher = AsyncStatsFetcher(args.socket, args.project, args.interval)
    await fetcher.start()
    stdscr.timeout(KEY_POLL_MS)

    try:
        while True:
            # Get latest stats from background task
            new_stats, generation = await fetcher.get()

            # Only update history when we have genuinely new data
            if generation > last_generation and new_stats and not paused:
                last_generation = generation
                last_stats = new_stats
                for s in new_stats:
                    name = s["name"]
                    cpu_hist[name].append(s["cpu"])
                    mem_hist[name].append(s["mem"])
                    # Update scaling windows
                    cpu_min[name] = min(cpu_min[name], s["cpu"])
                    cpu_max[name] = max(cpu_max[name], s["cpu"])
                    mem_min[name] = min(mem_min[name], s["mem"])
                    mem_max[name] = max(mem_max[name], s["mem"])

            # While paused, keep showing the frozen snapshot so the numbers
            # and bars agree with the (frozen) history.
            if paused and last_stats:
                stats = last_stats
            else:
                stats = new_stats if new_stats else last_stats

            stdscr.erase()

            # Sorting
            if sort_mode == "cpu":
                stats.sort(key=lambda x: x["cpu"], reverse=True)
            elif sort_mode == "mem":
                stats.sort(key=lambda x: x["mem"], reverse=True)
            else:
                stats.sort(key=lambda x: x["name"])

            # Overview
            if stats:
                avg_cpu = sum(s["cpu"] for s in stats) / len(stats)
                avg_mem = sum(s["mem"] for s in stats) / len(stats)
                overview = f"{len(stats)} containers | avg CPU {avg_cpu:4.1f}% | avg MEM {avg_mem:4.1f}%"
            else:
                overview = "No containers"
            _safe_addstr(stdscr, 0, 0, overview)

            stacked, chart_width, max_containers = _compute_layout(show_mem)
            chart_col = _NAME_COL + 2
            mem_side_col = chart_col + chart_width + 1 + _VALUE_WIDTH + 1

            # Keep the scroll offset valid after resizes, layout toggles or
            # containers disappearing.
            scroll = min(scroll, max(0, len(stats) - max_containers))

            # Header row
            _safe_addstr(stdscr, 2, 0, "NAME")
            if show_mem and stacked:
                _safe_addstr(stdscr, 2, chart_col, "CPU / MEM (stacked)")
            else:
                _safe_addstr(stdscr, 2, chart_col, "CPU")
                if show_mem:
                    _safe_addstr(stdscr, 2, mem_side_col, "MEM")

            # Render rows
            row = _HEADER_LINES
            for s in stats[scroll:scroll + max_containers]:
                name = s["name"]
                cpu = s["cpu"]
                mem = s["mem"]
                _safe_addstr(stdscr, row, 0, f"{name[:_NAME_COL]:{_NAME_COL}}")

                # CPU chart
                cpu_line = _render_chart(
                    bar_mode, cpu, cpu_hist[name], cpu_min[name], cpu_max[name], chart_width
                )
                _safe_addstr(stdscr, row, chart_col, cpu_line, color_for_value(10, cpu))
                _safe_addstr(stdscr, row, chart_col + chart_width + 1, f"{cpu:5.1f}%")

                # MEM chart (stacked below if narrow, else side-by-side)
                if show_mem:
                    if stacked:
                        row += 1
                        mem_col = chart_col
                    else:
                        mem_col = mem_side_col
                    mem_line = _render_chart(
                        bar_mode, mem, mem_hist[name], mem_min[name], mem_max[name], chart_width
                    )
                    _safe_addstr(stdscr, row, mem_col, mem_line, color_for_value(20, mem))
                    _safe_addstr(stdscr, row, mem_col + chart_width + 1, f"{mem:5.1f}%")
                row += 1

            # Footer with keyboard instructions
            footer_row = curses.LINES - _FOOTER_LINES
            footer1 = "[q] Quit [p] Pause [m] Toggle MEM [b] Toggle bars [c] Cycle sort [r] Reset scale"
            mode_str = "BAR" if bar_mode else "TIMESERIES"
            footer2 = "[↑/↓] Scroll | Sort: {:<4} | Mode: {} | {}".format(
                sort_mode, mode_str, "PAUSED" if paused else "RUNNING"
            )
            _safe_addstr(stdscr, footer_row, 0, footer1[:curses.COLS - 1])
            _safe_addstr(stdscr, footer_row + 1, 0, footer2[:curses.COLS - 1])
            stdscr.refresh()

            # Keyboard input (short timeout for responsive keys)
            key = stdscr.getch()
            if key == ord('q'):
                break
            elif key == ord('p'):
                paused = not paused
            elif key == ord('m'):
                show_mem = not show_mem
            elif key == ord('b'):
                bar_mode = not bar_mode
            elif key == ord('c'):
                sort_mode = {"name": "cpu", "cpu": "mem", "mem": "name"}[sort_mode]
            elif key == ord('r'):
                cpu_min.clear()
                cpu_max.clear()
                mem_min.clear()
                mem_max.clear()
            elif key == curses.KEY_DOWN:
                # Use the same capacity the renderer used for this frame.
                scroll = min(scroll + 1, max(0, len(stats) - max_containers))
            elif key == curses.KEY_UP:
                scroll = max(scroll - 1, 0)
            elif key == curses.KEY_RESIZE:
                # Refresh LINES/COLS; the next frame recomputes the layout
                # and re-clamps the scroll offset.
                curses.update_lines_cols()

            # Yield to allow other async tasks to run
            await asyncio.sleep(0)
    finally:
        fetcher.stop()
# ------------------------------------------------------------
# Main
# ------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(description="Docker Compose time-series monitor (curses)")
parser.add_argument("--mem", action="store_true", help="Show memory charts")
parser.add_argument("--bar", action="store_true", help="Use horizontal bar mode instead of time-series")
parser.add_argument("--sort", choices=["cpu", "mem", "name"], default="name")
parser.add_argument("--project", help="Filter by compose project name")
parser.add_argument("--interval", type=float, default=1.0)
parser.add_argument("--socket", default=DEFAULT_DOCKER_SOCKET, help="Docker socket path")
args = parser.parse_args()
def run_ui(stdscr):
asyncio.run(ui(stdscr, args))
try:
curses.wrapper(run_ui)
except KeyboardInterrupt:
pass # Exit gracefully on Ctrl+C
# Start the monitor only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment