Skip to content

Instantly share code, notes, and snippets.

@nmoinvaz
Created February 25, 2026 22:42
Show Gist options
  • Select an option

  • Save nmoinvaz/488b32f2295f3d5653f60a352499b47f to your computer and use it in GitHub Desktop.

Select an option

Save nmoinvaz/488b32f2295f3d5653f60a352499b47f to your computer and use it in GitHub Desktop.
IMAP top senders analyzer
import argparse
import email.utils
import getpass
import imaplib
import sys
from collections import Counter

from rich.console import Console
from rich.progress import Progress, BarColumn, TextColumn, TimeRemainingColumn, MofNCompleteColumn
from rich.table import Table
# Shared Rich console used for all status lines, the progress bar and tables.
console = Console()

# Command-line interface. --password is optional so that the secret does not
# have to appear in the shell history or the process list; when it is omitted
# the script prompts for it without echoing. Passing --password still works.
parser = argparse.ArgumentParser(description="Analyze top email senders via IMAP")
parser.add_argument("--host", required=True, help="IMAP server hostname")
parser.add_argument("--user", required=True, help="IMAP username/email")
parser.add_argument(
    "--password",
    default=None,
    help="IMAP app password (prompted for interactively when omitted)",
)
args = parser.parse_args()
if args.password is None:
    # Fill in args.password so the rest of the script can keep reading it.
    args.password = getpass.getpass(f"IMAP password for {args.user}: ")
# Number of messages requested per FETCH command.
BATCH_SIZE = 500

console.print("[bold]Connecting to IMAP server...[/bold]")
mail = imaplib.IMAP4_SSL(args.host)
mail.login(args.user, args.password)

# Open the inbox read-only. select() can report a refused mailbox through its
# status instead of raising, so check it before issuing SEARCH.
select_status, _select_data = mail.select("INBOX", readonly=True)
if select_status != "OK":
    console.print("[bold red]Could not open INBOX[/bold red]")
    mail.logout()
    sys.exit(1)

console.print("[bold]Searching all messages...[/bold]")
status, data = mail.search(None, "ALL")
if status != "OK":
    console.print("[bold red]Search failed[/bold red]")
    mail.logout()  # do not leave the session open on the error path
    sys.exit(1)

# search() answers with one space-separated byte string of message numbers
# (sequence numbers, since the non-UID command is used).
uids = data[0].split()
total = len(uids)
console.print(f"[bold green]Found {total} messages.[/bold green] Fetching From headers...")
def _sender_key(raw_header: bytes) -> str:
    """Return the counting key for one fetched ``From`` header, or ``""``.

    The key is the lower-cased e-mail address when one can be parsed and the
    lower-cased display name otherwise. An empty string means the header was
    missing, empty, or not a ``From`` field.
    """
    text = raw_header.decode("utf-8", errors="replace")
    # Split on the first colon only, so a display name that happens to
    # contain "From: " is left intact, and compare the field name
    # case-insensitively ("FROM:", "from:", "From:\t" are all accepted).
    field, colon, value = text.partition(":")
    if not colon or field.strip().lower() != "from":
        return ""
    # A folded header continues on the next line; collapse every run of
    # whitespace (including CR/LF) into a single space before parsing.
    value = " ".join(value.split())
    if not value:
        return ""
    name, addr = email.utils.parseaddr(value)
    return (addr or name).lower()


# Tally of messages per sender key, plus the number of failed FETCH batches.
sender_counter = Counter()
errors = 0
try:
    with Progress(
        TextColumn("[bold blue]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
        TextColumn("[bold]{task.percentage:>3.0f}%"),
        TimeRemainingColumn(),
        console=console,
    ) as progress:
        task = progress.add_task("Fetching headers", total=total)
        for i in range(0, total, BATCH_SIZE):
            batch = uids[i:i + BATCH_SIZE]
            uid_range = b",".join(batch)
            # BODY.PEEK reads the header without marking the message as seen.
            status, response = mail.fetch(uid_range, "(BODY.PEEK[HEADER.FIELDS (FROM)])")
            if status == "OK":
                for item in response:
                    # Only the 2-tuples carry header bytes; other entries in
                    # the response list are skipped.
                    if isinstance(item, tuple) and len(item) == 2:
                        key = _sender_key(item[1])
                        if key:
                            sender_counter[key] += 1
            else:
                errors += 1
            # Advance for failed batches too so the bar still reaches 100%.
            progress.advance(task, len(batch))
finally:
    # Always end the IMAP session, even if fetching raised part-way through.
    mail.logout()

if errors:
    # Make skipped batches visible; the counts below are incomplete otherwise.
    console.print(
        f"[bold yellow]Warning:[/bold yellow] {errors} batch(es) could not be fetched and were skipped."
    )
# Table of the 100 most frequent senders; each row shows the rank, the message
# count, that count as a percentage of all messages found, and the sender key.
senders_table = Table(title=f"Top 100 Senders ({total} messages, {len(sender_counter)} unique)")
for heading, column_options in (
    ("#", {"justify": "right", "style": "dim"}),
    ("Count", {"justify": "right", "style": "bold cyan"}),
    ("%", {"justify": "right", "style": "green"}),
    ("Sender", {"style": "white"}),
):
    senders_table.add_column(heading, **column_options)

top_senders = sender_counter.most_common(100)
for position, (address, hits) in enumerate(top_senders, start=1):
    share = hits * 100 / total
    senders_table.add_row(str(position), str(hits), f"{share:.1f}%", address)
# Top 50 domains: fold the per-sender counts into one count per domain.
import tldextract

domain_counter = Counter()
for addr, count in sender_counter.items():
    # Text after the last "@"; a key without "@" (a display-name fallback)
    # is used whole, because rpartition returns the full string last then.
    hostname = addr.rpartition("@")[2]
    ext = tldextract.extract(hostname)
    # Join only when both parts are present. Without the ext.domain check a
    # hostname with an empty domain part would yield a key with a leading
    # dot (e.g. ".com"); in that case fall back to the hostname as written.
    if ext.domain and ext.suffix:
        domain = f"{ext.domain}.{ext.suffix}"
    else:
        domain = hostname
    domain_counter[domain] += count
# Table of the 50 most frequent sender domains, with the same four-column
# layout: rank, message count, percentage of all messages found, and domain.
domains_table = Table(title=f"Top 50 Sender Domains ({len(domain_counter)} unique)")
for heading, column_options in (
    ("#", {"justify": "right", "style": "dim"}),
    ("Count", {"justify": "right", "style": "bold cyan"}),
    ("%", {"justify": "right", "style": "green"}),
    ("Domain", {"style": "white"}),
):
    domains_table.add_column(heading, **column_options)

top_domains = domain_counter.most_common(50)
for position, (domain_name, hits) in enumerate(top_domains, start=1):
    share = hits * 100 / total
    domains_table.add_row(str(position), str(hits), f"{share:.1f}%", domain_name)
# Show both tables through the console's pager (styles kept), separated by
# one blank line.
report_pager = console.pager(styles=True)
with report_pager:
    console.print(senders_table)
    console.print()
    console.print(domains_table)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment