Created
January 24, 2026 05:57
-
-
Save notthetup/a9ebe90f782834377a8ac0e51fa7056c to your computer and use it in GitHub Desktop.
Log analyzer that generates a systemd-blame style Gantt chart.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Log analyzer that generates a systemd-blame style Gantt chart. | |
| Parses log files and visualizes which operations took how long to run. | |
| """ | |
| import re | |
| import sys | |
| from datetime import datetime, timedelta | |
| from pathlib import Path | |
| import matplotlib.pyplot as plt | |
| import matplotlib.patches as mpatches | |
| from matplotlib.ticker import FuncFormatter | |
| def parse_log_line(line): | |
| """Parse a single log line and extract timestamp, level, logger, and message.""" | |
| parts = line.strip().split('|', 3) | |
| if len(parts) < 4: | |
| return None | |
| try: | |
| timestamp_ms = int(parts[0]) | |
| level = parts[1] | |
| logger = parts[2] | |
| message = parts[3] | |
| return { | |
| 'timestamp': timestamp_ms, | |
| 'level': level, | |
| 'logger': logger, | |
| 'message': message | |
| } | |
| except (ValueError, IndexError): | |
| return None | |
| def parse_log_file(filepath): | |
| """Parse entire log file and return list of events.""" | |
| events = [] | |
| with open(filepath, 'r') as f: | |
| for line in f: | |
| parsed = parse_log_line(line) | |
| if parsed: | |
| events.append(parsed) | |
| return events | |
| def calculate_durations(events): | |
| """Calculate duration for each event (time until next event).""" | |
| durations = [] | |
| for i in range(len(events)): | |
| event = events[i] | |
| # Calculate duration until next event | |
| if i < len(events) - 1: | |
| duration_ms = events[i + 1]['timestamp'] - event['timestamp'] | |
| else: | |
| # Last event gets a nominal 1ms duration | |
| duration_ms = 1 | |
| # Create a short label from logger and message | |
| logger_short = event['logger'].split('/')[-1].split('@')[0] | |
| message_short = event['message'][:60] | |
| if len(event['message']) > 60: | |
| message_short += '...' | |
| label = f"{logger_short}: {message_short}" | |
| durations.append({ | |
| 'label': label, | |
| 'duration_ms': duration_ms, | |
| 'timestamp': event['timestamp'], | |
| 'level': event['level'], | |
| 'full_logger': event['logger'], | |
| 'full_message': event['message'] | |
| }) | |
| return durations | |
| def format_duration(ms): | |
| """Format milliseconds into human-readable duration.""" | |
| if ms < 1000: | |
| return f"{ms}ms" | |
| elif ms < 60000: | |
| return f"{ms/1000:.2f}s" | |
| else: | |
| return f"{ms/60000:.2f}m" | |
| def create_gantt_chart(durations, output_file='log_analysis.svg', top_n=50): | |
| """Create a systemd-blame style Gantt chart.""" | |
| # Keep chronological order, just take first N | |
| top_durations = durations[:top_n] | |
| # Calculate start time for each event (for Gantt chart positioning) | |
| start_time = durations[0]['timestamp'] | |
| for d in top_durations: | |
| d['relative_start'] = (d['timestamp'] - start_time) / 1000.0 # Convert to seconds | |
| # Create figure | |
| fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(20, max(12, len(top_durations) * 0.3)), | |
| gridspec_kw={'width_ratios': [1, 2]}) | |
| # Left subplot: Duration bars (blame style) | |
| labels = [d['label'] for d in top_durations] | |
| durations_sec = [d['duration_ms'] / 1000.0 for d in top_durations] | |
| levels = [d['level'] for d in top_durations] | |
| # Color by log level | |
| colors = [] | |
| for level in levels: | |
| if level == 'WARNING': | |
| colors.append('#ff9800') | |
| elif level == 'ERROR': | |
| colors.append('#f44336') | |
| else: | |
| colors.append('#2196f3') | |
| y_pos = range(len(labels)) | |
| bars = ax1.barh(y_pos, durations_sec, color=colors, alpha=0.7) | |
| ax1.set_yticks(y_pos) | |
| ax1.set_yticklabels(labels, fontsize=8) | |
| ax1.invert_yaxis() | |
| ax1.set_xlabel('Duration (seconds)', fontsize=10) | |
| ax1.set_title(f'Operations in Chronological Order', | |
| fontsize=12, fontweight='bold') | |
| ax1.grid(axis='x', alpha=0.3) | |
| # Add duration text on bars | |
| for i, (bar, dur) in enumerate(zip(bars, durations_sec)): | |
| width = bar.get_width() | |
| ax1.text(width, bar.get_y() + bar.get_height()/2, | |
| f' {format_duration(top_durations[i]["duration_ms"])}', | |
| ha='left', va='center', fontsize=7, fontweight='bold') | |
| # Right subplot: Timeline (Gantt chart) | |
| for i, d in enumerate(top_durations): | |
| start = d['relative_start'] | |
| duration = d['duration_ms'] / 1000.0 | |
| color = colors[i] | |
| ax2.barh(i, duration, left=start, height=0.8, color=color, alpha=0.7) | |
| ax2.set_yticks(y_pos) | |
| ax2.set_yticklabels([''] * len(labels)) # No labels, they're on the left | |
| ax2.invert_yaxis() | |
| ax2.set_xlabel('Time since boot (seconds)', fontsize=10) | |
| ax2.set_title('Timeline View', fontsize=12, fontweight='bold') | |
| ax2.grid(axis='x', alpha=0.3) | |
| # Add legend | |
| legend_elements = [ | |
| mpatches.Patch(facecolor='#2196f3', alpha=0.7, label='INFO'), | |
| mpatches.Patch(facecolor='#ff9800', alpha=0.7, label='WARNING'), | |
| mpatches.Patch(facecolor='#f44336', alpha=0.7, label='ERROR') | |
| ] | |
| ax2.legend(handles=legend_elements, loc='upper right') | |
| # Calculate total time | |
| total_time_ms = durations[-1]['timestamp'] + durations[-1]['duration_ms'] - durations[0]['timestamp'] | |
| plt.suptitle(f'Boot Log Analysis - Total Time: {format_duration(total_time_ms)}', | |
| fontsize=14, fontweight='bold', y=0.995) | |
| plt.tight_layout(rect=[0, 0, 1, 0.99]) | |
| # Save as SVG | |
| plt.savefig(output_file, format='svg', dpi=300, bbox_inches='tight') | |
| print(f"✓ Chart saved to: {output_file}") | |
| # Print summary statistics | |
| print(f"\n=== Summary ===") | |
| print(f"Total events: {len(durations)}") | |
| print(f"Total time: {format_duration(total_time_ms)}") | |
| print(f"\nTop 5 longest operations:") | |
| sorted_by_duration = sorted(durations, key=lambda x: x['duration_ms'], reverse=True) | |
| for i, d in enumerate(sorted_by_duration[:5], 1): | |
| print(f" {i}. {format_duration(d['duration_ms'])}: {d['label']}") | |
| def main(): | |
| if len(sys.argv) < 2: | |
| print("Usage: python analyze_log.py <log_file> [output.svg] [top_n]") | |
| print("Example: python analyze_log.py log-0.txt analysis.svg 50") | |
| sys.exit(1) | |
| log_file = sys.argv[1] | |
| output_file = sys.argv[2] if len(sys.argv) > 2 else 'log_analysis.svg' | |
| top_n = int(sys.argv[3]) if len(sys.argv) > 3 else 50 | |
| if not Path(log_file).exists(): | |
| print(f"Error: Log file '{log_file}' not found") | |
| sys.exit(1) | |
| print(f"Analyzing log file: {log_file}") | |
| # Parse log | |
| events = parse_log_file(log_file) | |
| print(f"Parsed {len(events)} events") | |
| if len(events) == 0: | |
| print("Error: No events found in log file") | |
| sys.exit(1) | |
| # Calculate durations | |
| durations = calculate_durations(events) | |
| # Create chart | |
| create_gantt_chart(durations, output_file, top_n) | |
| print(f"\nDone! Open {output_file} in a browser or SVG viewer to zoom and explore.") | |
| if __name__ == '__main__': | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment