Created
February 25, 2026 22:42
-
-
Save nmoinvaz/488b32f2295f3d5653f60a352499b47f to your computer and use it in GitHub Desktop.
IMAP top senders analyzer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import argparse | |
| import imaplib | |
| import email.utils | |
| import sys | |
| from collections import Counter | |
| from rich.console import Console | |
| from rich.table import Table | |
| from rich.progress import Progress, BarColumn, TextColumn, TimeRemainingColumn, MofNCompleteColumn | |
| console = Console() | |
| parser = argparse.ArgumentParser(description="Analyze top email senders via IMAP") | |
| parser.add_argument("--host", required=True, help="IMAP server hostname") | |
| parser.add_argument("--user", required=True, help="IMAP username/email") | |
| parser.add_argument("--password", required=True, help="IMAP app password") | |
| args = parser.parse_args() | |
| BATCH_SIZE = 500 | |
| console.print("[bold]Connecting to IMAP server...[/bold]") | |
| mail = imaplib.IMAP4_SSL(args.host) | |
| mail.login(args.user, args.password) | |
| mail.select("INBOX", readonly=True) | |
| console.print("[bold]Searching all messages...[/bold]") | |
| status, data = mail.search(None, "ALL") | |
| if status != "OK": | |
| console.print("[bold red]Search failed[/bold red]") | |
| sys.exit(1) | |
| uids = data[0].split() | |
| total = len(uids) | |
| console.print(f"[bold green]Found {total} messages.[/bold green] Fetching From headers...") | |
| sender_counter = Counter() | |
| errors = 0 | |
| with Progress( | |
| TextColumn("[bold blue]{task.description}"), | |
| BarColumn(), | |
| MofNCompleteColumn(), | |
| TextColumn("[bold]{task.percentage:>3.0f}%"), | |
| TimeRemainingColumn(), | |
| console=console, | |
| ) as progress: | |
| task = progress.add_task("Fetching headers", total=total) | |
| for i in range(0, total, BATCH_SIZE): | |
| batch = uids[i:i + BATCH_SIZE] | |
| uid_range = b",".join(batch) | |
| status, response = mail.fetch(uid_range, "(BODY.PEEK[HEADER.FIELDS (FROM)])") | |
| if status != "OK": | |
| errors += 1 | |
| progress.advance(task, len(batch)) | |
| continue | |
| for item in response: | |
| if isinstance(item, tuple) and len(item) == 2: | |
| header = item[1].decode("utf-8", errors="replace") | |
| value = header.replace("From: ", "").replace("from: ", "").strip() | |
| if value: | |
| name, addr = email.utils.parseaddr(value) | |
| if addr: | |
| sender_counter[addr.lower()] += 1 | |
| elif name: | |
| sender_counter[name.lower()] += 1 | |
| progress.advance(task, len(batch)) | |
| mail.logout() | |
| # Top 100 senders table | |
| senders_table = Table(title=f"Top 100 Senders ({total} messages, {len(sender_counter)} unique)") | |
| senders_table.add_column("#", justify="right", style="dim") | |
| senders_table.add_column("Count", justify="right", style="bold cyan") | |
| senders_table.add_column("%", justify="right", style="green") | |
| senders_table.add_column("Sender", style="white") | |
| for rank, (sender, count) in enumerate(sender_counter.most_common(100), 1): | |
| pct = count * 100 / total | |
| senders_table.add_row(str(rank), str(count), f"{pct:.1f}%", sender) | |
| # Top 50 domains table | |
| import tldextract | |
| domain_counter = Counter() | |
| for addr, count in sender_counter.items(): | |
| parts = addr.rsplit("@", 1) | |
| hostname = parts[1] if len(parts) == 2 else addr | |
| ext = tldextract.extract(hostname) | |
| domain = f"{ext.domain}.{ext.suffix}" if ext.suffix else hostname | |
| domain_counter[domain] += count | |
| domains_table = Table(title=f"Top 50 Sender Domains ({len(domain_counter)} unique)") | |
| domains_table.add_column("#", justify="right", style="dim") | |
| domains_table.add_column("Count", justify="right", style="bold cyan") | |
| domains_table.add_column("%", justify="right", style="green") | |
| domains_table.add_column("Domain", style="white") | |
| for rank, (domain, count) in enumerate(domain_counter.most_common(50), 1): | |
| pct = count * 100 / total | |
| domains_table.add_row(str(rank), str(count), f"{pct:.1f}%", domain) | |
| with console.pager(styles=True): | |
| console.print(senders_table) | |
| console.print() | |
| console.print(domains_table) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment