Skip to content

Instantly share code, notes, and snippets.

@notthetup
Created January 24, 2026 05:57
Show Gist options
  • Select an option

  • Save notthetup/a9ebe90f782834377a8ac0e51fa7056c to your computer and use it in GitHub Desktop.

Select an option

Save notthetup/a9ebe90f782834377a8ac0e51fa7056c to your computer and use it in GitHub Desktop.
Log analyzer that generates a systemd-blame style Gantt chart.
#!/usr/bin/env python3
"""
Log analyzer that generates a systemd-blame style Gantt chart.
Parses log files and visualizes which operations took how long to run.
"""
import re
import sys
from datetime import datetime, timedelta
from pathlib import Path
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from matplotlib.ticker import FuncFormatter
def parse_log_line(line):
    """Parse a single log line into its four pipe-separated fields.

    The line is stripped of surrounding whitespace and split on ``|`` at most
    three times, so any further ``|`` characters remain part of the message.

    Args:
        line: One raw line of text from the log file.

    Returns:
        A dict with keys ``'timestamp'`` (int), ``'level'``, ``'logger'`` and
        ``'message'`` (all str), or ``None`` when the line has fewer than four
        fields or the first field is not an integer.
    """
    parts = line.strip().split('|', 3)
    if len(parts) < 4:
        return None

    timestamp_field, level, logger, message = parts
    # Only the integer conversion can fail; keep the try body minimal.
    try:
        timestamp_ms = int(timestamp_field)
    except ValueError:
        return None

    return {
        'timestamp': timestamp_ms,
        'level': level,
        'logger': logger,
        'message': message,
    }
def parse_log_file(filepath):
    """Parse an entire log file and return the list of recognised events.

    Each line is passed to ``parse_log_line``; lines it rejects are skipped.

    Args:
        filepath: Path of the log file to read.

    Returns:
        A list of the event dicts produced by ``parse_log_line``, in file
        order.
    """
    events = []
    # Decode explicitly as UTF-8 so the result does not depend on the platform
    # locale; undecodable bytes are replaced rather than aborting the analysis.
    with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
        for line in f:
            parsed = parse_log_line(line)
            if parsed is not None:
                events.append(parsed)
    return events
def calculate_durations(events, max_message_len=60):
    """Calculate a duration for each event as the time until the next event.

    Args:
        events: Sequence of event dicts with ``'timestamp'``, ``'level'``,
            ``'logger'`` and ``'message'`` keys.
        max_message_len: Maximum number of message characters kept in the
            label; longer messages are cut and suffixed with ``'...'``.

    Returns:
        A list with one dict per event, in the same order, holding
        ``'label'``, ``'duration_ms'``, ``'timestamp'``, ``'level'``,
        ``'full_logger'`` and ``'full_message'``. The duration is the
        difference to the following event's timestamp, so it is negative when
        the next event carries an earlier timestamp. The last event gets a
        nominal duration of 1 ms.
    """
    durations = []
    last_index = len(events) - 1
    for i, event in enumerate(events):
        if i < last_index:
            duration_ms = events[i + 1]['timestamp'] - event['timestamp']
        else:
            # Last event has no successor, so give it a nominal 1ms duration
            duration_ms = 1

        # Short label: text after the last '/' of the logger, up to the first '@'
        logger_short = event['logger'].split('/')[-1].split('@')[0]
        message = event['message']
        if len(message) > max_message_len:
            message_short = message[:max_message_len] + '...'
        else:
            message_short = message

        durations.append({
            'label': f"{logger_short}: {message_short}",
            'duration_ms': duration_ms,
            'timestamp': event['timestamp'],
            'level': event['level'],
            'full_logger': event['logger'],
            'full_message': message,
        })
    return durations
def format_duration(ms):
    """Format milliseconds into a human-readable duration string.

    The unit is chosen from the magnitude of ``ms`` so that negative values
    are scaled the same way as positive ones.

    Args:
        ms: Duration in milliseconds; may be negative.

    Returns:
        ``"<ms>ms"`` below one second, ``"<x.xx>s"`` below one minute,
        otherwise ``"<x.xx>m"``.
    """
    magnitude = abs(ms)
    if magnitude < 1000:
        return f"{ms}ms"
    if magnitude < 60000:
        return f"{ms/1000:.2f}s"
    return f"{ms/60000:.2f}m"
def create_gantt_chart(durations, output_file='log_analysis.svg', top_n=50):
    """Create a systemd-blame style Gantt chart and print a text summary.

    The figure has two panels sharing the same row order: a horizontal bar
    chart of each operation's duration on the left, and a timeline with each
    bar positioned at its start offset on the right. Bars are coloured by log
    level. The chart is written to ``output_file`` in SVG format, after which
    summary statistics over all of ``durations`` are printed.

    Args:
        durations: Non-empty list of dicts as produced by
            ``calculate_durations`` (keys ``'label'``, ``'duration_ms'``,
            ``'timestamp'``, ``'level'``). The dicts are not modified.
        output_file: Path the SVG is saved to.
        top_n: Number of leading (chronologically first) entries to plot.

    Raises:
        ValueError: If ``durations`` is empty.
    """
    if not durations:
        raise ValueError("durations is empty; nothing to plot")

    # Single source of truth for bar and legend colours.
    default_color = '#2196f3'
    level_colors = {'WARNING': '#ff9800', 'ERROR': '#f44336'}

    # Keep chronological order, just take first N
    top_durations = durations[:top_n]

    # Start offsets in seconds relative to the very first event. Kept in a
    # local list so the caller's dicts are left untouched.
    start_time = durations[0]['timestamp']
    relative_starts = [(d['timestamp'] - start_time) / 1000.0 for d in top_durations]

    labels = [d['label'] for d in top_durations]
    durations_sec = [d['duration_ms'] / 1000.0 for d in top_durations]
    colors = [level_colors.get(d['level'], default_color) for d in top_durations]
    y_pos = range(len(labels))

    # Total time spans from the first event to the end of the last one.
    total_time_ms = (durations[-1]['timestamp'] + durations[-1]['duration_ms']
                     - durations[0]['timestamp'])

    # Figure height grows with the number of rows, with a minimum of 12.
    fig, (ax1, ax2) = plt.subplots(
        1, 2,
        figsize=(20, max(12, len(top_durations) * 0.3)),
        gridspec_kw={'width_ratios': [1, 2]},
    )
    try:
        # Left subplot: Duration bars (blame style)
        bars = ax1.barh(y_pos, durations_sec, color=colors, alpha=0.7)
        ax1.set_yticks(y_pos)
        ax1.set_yticklabels(labels, fontsize=8)
        ax1.invert_yaxis()
        ax1.set_xlabel('Duration (seconds)', fontsize=10)
        ax1.set_title('Operations in Chronological Order',
                      fontsize=12, fontweight='bold')
        ax1.grid(axis='x', alpha=0.3)

        # Add duration text at the end of each bar
        for bar, d in zip(bars, top_durations):
            ax1.text(bar.get_width(), bar.get_y() + bar.get_height() / 2,
                     f' {format_duration(d["duration_ms"])}',
                     ha='left', va='center', fontsize=7, fontweight='bold')

        # Right subplot: Timeline (Gantt chart)
        ax2.barh(y_pos, durations_sec, left=relative_starts, height=0.8,
                 color=colors, alpha=0.7)
        ax2.set_yticks(y_pos)
        ax2.set_yticklabels([''] * len(labels))  # No labels, they're on the left
        ax2.invert_yaxis()
        ax2.set_xlabel('Time since boot (seconds)', fontsize=10)
        ax2.set_title('Timeline View', fontsize=12, fontweight='bold')
        ax2.grid(axis='x', alpha=0.3)

        # Add legend
        legend_elements = [
            mpatches.Patch(facecolor=default_color, alpha=0.7, label='INFO'),
            mpatches.Patch(facecolor=level_colors['WARNING'], alpha=0.7, label='WARNING'),
            mpatches.Patch(facecolor=level_colors['ERROR'], alpha=0.7, label='ERROR'),
        ]
        ax2.legend(handles=legend_elements, loc='upper right')

        fig.suptitle(f'Boot Log Analysis - Total Time: {format_duration(total_time_ms)}',
                     fontsize=14, fontweight='bold', y=0.995)
        fig.tight_layout(rect=[0, 0, 1, 0.99])

        # Save as SVG
        fig.savefig(output_file, format='svg', dpi=300, bbox_inches='tight')
    finally:
        # Release the figure so repeated calls do not accumulate open figures.
        plt.close(fig)
    print(f"✓ Chart saved to: {output_file}")

    # Print summary statistics
    print("\n=== Summary ===")
    print(f"Total events: {len(durations)}")
    print(f"Total time: {format_duration(total_time_ms)}")
    print("\nTop 5 longest operations:")
    sorted_by_duration = sorted(durations, key=lambda x: x['duration_ms'], reverse=True)
    for i, d in enumerate(sorted_by_duration[:5], 1):
        print(f"  {i}. {format_duration(d['duration_ms'])}: {d['label']}")
def main():
    """Command-line entry point: parse a log file and render its Gantt chart.

    Reads ``sys.argv``: the log file path (required), the output SVG path
    (optional, default ``'log_analysis.svg'``) and ``top_n`` (optional,
    default 50). Prints a message and exits with status 1 on missing
    arguments, an invalid ``top_n``, a missing log file, or a log file that
    yields no events.
    """
    if len(sys.argv) < 2:
        print("Usage: python analyze_log.py <log_file> [output.svg] [top_n]")
        print("Example: python analyze_log.py log-0.txt analysis.svg 50")
        sys.exit(1)

    log_file = sys.argv[1]
    output_file = sys.argv[2] if len(sys.argv) > 2 else 'log_analysis.svg'

    top_n = 50
    if len(sys.argv) > 3:
        # Reject bad values up front instead of failing with a traceback, or
        # slicing from the wrong end with a negative count.
        try:
            top_n = int(sys.argv[3])
        except ValueError:
            print(f"Error: top_n must be an integer, got '{sys.argv[3]}'")
            sys.exit(1)
        if top_n < 1:
            print(f"Error: top_n must be at least 1, got {top_n}")
            sys.exit(1)

    if not Path(log_file).exists():
        print(f"Error: Log file '{log_file}' not found")
        sys.exit(1)

    print(f"Analyzing log file: {log_file}")

    # Parse log
    events = parse_log_file(log_file)
    print(f"Parsed {len(events)} events")
    if not events:
        print("Error: No events found in log file")
        sys.exit(1)

    # Calculate durations
    durations = calculate_durations(events)

    # Create chart
    create_gantt_chart(durations, output_file, top_n)
    print(f"\nDone! Open {output_file} in a browser or SVG viewer to zoom and explore.")
# Run the analyzer only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment