Skip to content

Instantly share code, notes, and snippets.

@zzstoatzz
Last active December 13, 2025 08:36
Show Gist options
  • Select an option

  • Save zzstoatzz/90f138b90b05c369ecd0107d5e8358b6 to your computer and use it in GitHub Desktop.

Select an option

Save zzstoatzz/90f138b90b05c369ecd0107d5e8358b6 to your computer and use it in GitHub Desktop.
consume ATProto lexicon events from tap
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.12"
# dependencies = ["websockets", "rich"]
# ///
"""consume ATProto lexicon events from tap with formatted output.
automatically starts tap if not running.
usage:
uv run https://gist.github.com/zzstoatzz/90f138b90b05c369ecd0107d5e8358b6
uv run https://gist.github.com/zzstoatzz/90f138b90b05c369ecd0107d5e8358b6 --signal app.bsky.feed.post --filter "app.bsky.feed.*"
flags:
-n, --count N limit to N events (default: all)
--live only live firehose events (skip backfill)
--signal COLLECTION signal collection to find repos (default: fm.plyr.track)
--filter PATTERN collection filter pattern (default: fm.plyr.*)
--url URL tap websocket url (default: ws://localhost:2480/channel)
"""
import argparse
import asyncio
import json
import os
import shutil
import socket
import subprocess
import time
from typing import Any
import websockets
from rich.console import Console
from rich.live import Live
from rich.panel import Panel
from rich.text import Text
# Shared rich console; every print in this script, and the Live status bar,
# goes through it.
console = Console()
# TCP port probed on localhost to detect a running tap (matches the port in
# the default --url, ws://localhost:2480/channel).
TAP_PORT = 2480
# rich colour for each stats category, used by the end-of-run bar chart.
COLOR_MAP = dict(
    tracks="cyan",
    likes="red",
    lists="magenta",
    comments="blue",
    identities="green",
)
def is_tap_running() -> bool:
    """Report whether something accepts IPv4 TCP connections on localhost:TAP_PORT."""
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with probe:
        # connect_ex returns 0 on success instead of raising on failure
        error_code = probe.connect_ex(("localhost", TAP_PORT))
    return not error_code
def start_tap(signal: str, filter_pattern: str) -> subprocess.Popen[bytes] | None:
    """Launch ``tap run`` in the background.

    The binary is taken from PATH, falling back to ``~/go/bin/tap``. The child
    inherits the current environment plus TAP_SIGNAL_COLLECTION=``signal`` and
    TAP_COLLECTION_FILTERS=``filter_pattern``; its stdout/stderr are discarded.

    Returns:
        The running child process, or None (after printing install hints)
        when no file exists at the resolved binary path.
    """
    binary = shutil.which("tap")
    if not binary:
        binary = os.path.expanduser("~/go/bin/tap")
    if os.path.exists(binary):
        child_env = dict(os.environ)
        child_env["TAP_SIGNAL_COLLECTION"] = signal
        child_env["TAP_COLLECTION_FILTERS"] = filter_pattern
        return subprocess.Popen(
            [binary, "run"],
            env=child_env,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    console.print("[red]tap binary not found[/]")
    console.print(
        "[dim]install: go install github.com/bluesky-social/indigo/cmd/tap@latest[/]"
    )
    return None
# Fixed width, in terminal columns, passed as ``width=`` to every event Panel.
PANEL_WIDTH: int = 70
def format_did(did: str) -> str:
    """Shorten a DID for display.

    ``did:plc:`` values are always rendered as the prefix plus at most the
    first 8 identifier characters, followed by "…". Any other DID is returned
    unchanged when it is at most 24 characters long; otherwise it is cut to
    its first 24 characters and "…" is appended.
    """
    plc_prefix = "did:plc:"
    if not did.startswith(plc_prefix):
        if len(did) <= 24:
            return did
        return f"{did[:24]}…"
    head = len(plc_prefix)
    return f"{plc_prefix}{did[head:head + 8]}…"
def format_track(record: dict[str, Any], live: bool, did: str) -> Panel:
    """Render a track record as a cyan panel.

    Shows the record's ``title`` and ``artist`` ("?" when missing) and the
    shortened ``did``; the panel title carries a LIVE/backfill badge.
    """
    segments = (
        ("♫ ", "cyan bold"),
        (f"{record.get('title', '?')}\n", "white bold"),
        (" by ", "dim"),
        (f"{record.get('artist', '?')}\n", "yellow"),
        (f" {format_did(did)}", "dim"),
    )
    body = Text()
    for content, style in segments:
        body.append(content, style=style)
    if live:
        badge = "[green]LIVE[/]"
    else:
        badge = "[dim]backfill[/]"
    return Panel(
        body,
        title=f"[cyan]track[/] {badge}",
        border_style="cyan",
        width=PANEL_WIDTH,
    )
def format_like(record: dict[str, Any], live: bool, did: str) -> Panel:
    """Render a like record as a red panel.

    Displays only the last path segment (the rkey) of ``subject.uri`` ("?"
    when missing) plus the shortened ``did``, with a LIVE/backfill badge.
    """
    uri = record.get("subject", {}).get("uri", "?")
    # at://did/collection/rkey -> rkey; a value without "/" is shown as-is
    liked_rkey = uri.rsplit("/", 1)[1] if "/" in uri else uri
    body = Text()
    body.append("♥ ", style="red bold")
    body.append(f"{liked_rkey}\n", style="white")
    body.append(f" {format_did(did)}", style="dim")
    badge = "[dim]backfill[/]" if not live else "[green]LIVE[/]"
    return Panel(
        body,
        title=f"[red]like[/] {badge}",
        border_style="red",
        width=PANEL_WIDTH,
    )
def format_list(record: dict[str, Any], live: bool, did: str) -> Panel:
    """Render a list record as a magenta panel.

    Shows the list ``name`` ("(unnamed)" when empty or missing), its
    ``listType`` ("?" when missing), the number of entries in ``items``, and
    the shortened ``did``, with a LIVE/backfill badge.
    """
    name = record.get("name") or "(unnamed)"
    list_type = record.get("listType", "?")
    item_count = len(record.get("items", []))
    body = Text()
    for content, style in [
        ("📋 ", "magenta bold"),
        (f"{name}\n", "white bold"),
        (" type: ", "dim"),
        (f"{list_type}\n", "magenta"),
        (" items: ", "dim"),
        (f"{item_count}\n", "white"),
        (f" {format_did(did)}", "dim"),
    ]:
        body.append(content, style=style)
    badge = "[green]LIVE[/]" if live else "[dim]backfill[/]"
    return Panel(
        body,
        title=f"[magenta]list[/] {badge}",
        border_style="magenta",
        width=PANEL_WIDTH,
    )
def format_comment(record: dict[str, Any], live: bool, did: str) -> Panel:
    """Render a comment record as a blue panel.

    Shows the first 50 characters of ``text`` in double quotes, the
    ``timestampMs`` value (0 when missing) suffixed with "ms", and the
    shortened ``did``, with a LIVE/backfill badge.
    """
    snippet = record.get("text", "")[:50]
    timestamp_ms = record.get("timestampMs", 0)
    body = Text()
    body.append("💬 ", style="blue bold")
    body.append(f'"{snippet}"\n', style="italic")
    body.append(" at ", style="dim")
    body.append(f"{timestamp_ms}ms\n", style="blue")
    body.append(f" {format_did(did)}", style="dim")
    if live:
        badge = "[green]LIVE[/]"
    else:
        badge = "[dim]backfill[/]"
    return Panel(
        body,
        title=f"[blue]comment[/] {badge}",
        border_style="blue",
        width=PANEL_WIDTH,
    )
def format_identity(ident: dict[str, Any]) -> Panel:
    """Render an identity event as a green panel.

    Shows ``@handle``, the shortened ``did`` and the ``status`` value ("?" for
    any missing field). The status is coloured green when it equals "active"
    and yellow otherwise.
    """
    handle = ident.get("handle", "?")
    did = ident.get("did", "?")
    status = ident.get("status", "?")
    body = Text()
    body.append("👤 ", style="green bold")
    body.append(f"@{handle}\n", style="white bold")
    body.append(f" {format_did(did)}\n", style="dim")
    body.append(" status: ", style="dim")
    body.append(status, style="green" if status == "active" else "yellow")
    return Panel(
        body,
        title="[green]identity[/]",
        border_style="green",
        width=PANEL_WIDTH,
    )
def make_status(stats: dict[str, int], elapsed: float, backfill_done: bool) -> Text:
    """Build the one-line status bar shown under the event panels.

    The line contains the current phase (backfill vs live), the total event
    count, the elapsed seconds, and a breakdown of every non-zero counter.

    Each category is abbreviated to its shortest prefix that is unique among
    *all* keys of ``stats`` (for the default keys: ``t``, ``lik``, ``lis``,
    ``c``, ``i``). Using the first letter alone is ambiguous because both
    ``likes`` and ``lists`` would render as ``l``. Uniqueness is computed
    over every key, not just the non-zero ones, so an abbreviation never
    changes mid-run as other counters start increasing.

    Args:
        stats: per-category event counters.
        elapsed: seconds since consumption started.
        backfill_done: True once a live event has been seen.

    Returns:
        A rich ``Text`` built from console markup.
    """
    keys = list(stats)

    def abbreviate(key: str) -> str:
        # Grow the prefix until no other key shares it; a key that is itself
        # a prefix of another key falls back to its full name.
        for size in range(1, len(key) + 1):
            prefix = key[:size]
            if not any(other != key and other.startswith(prefix) for other in keys):
                return prefix
        return key

    total = sum(stats.values())
    phase = "[green]live[/]" if backfill_done else "[yellow]backfill[/]"
    parts = [f"{abbreviate(k)}:{v}" for k, v in stats.items() if v > 0]
    return Text.from_markup(
        f" {phase} | {total} events | {elapsed:.1f}s | {' '.join(parts)}"
    )
async def _wait_for_tap(tap_proc: subprocess.Popen[bytes]) -> bool:
    """Poll until tap's port accepts connections; return False if it never does.

    Checks up to 50 times at 0.1 s intervals (about 5 s in total). It gives
    up early if the child process has already exited, rather than sleeping
    out the whole window for a process that crashed on start-up.
    """
    for _ in range(50):
        if is_tap_running():
            return True
        if tap_proc.poll() is not None:
            return False
        await asyncio.sleep(0.1)
    return False


def _event_panel(data: dict[str, Any], stats: dict[str, int]) -> Panel | None:
    """Format one decoded tap event as a panel and bump its counter in ``stats``.

    Record events are routed by substring match on their collection NSID,
    checked in the order track, like, list, comment. Identity events go to
    ``format_identity``. Any other event type, or a record whose collection
    matches none of the substrings, yields None and is not counted.
    """
    event_type = data.get("type")
    if event_type == "identity":
        panel = format_identity(data.get("identity", {}))
        stats["identities"] += 1
        return panel
    if event_type != "record":
        return None
    meta: dict[str, Any] = data.get("record", {})
    collection: str = meta.get("collection", "")
    record: dict[str, Any] = meta.get("record", {})
    from_live: bool = meta.get("live", False)
    did: str = meta.get("did", "")
    routes = (
        ("track", "tracks", format_track),
        ("like", "likes", format_like),
        ("list", "lists", format_list),
        ("comment", "comments", format_comment),
    )
    for needle, stat_key, formatter in routes:
        if needle in collection:
            panel = formatter(record, from_live, did)
            stats[stat_key] += 1
            return panel
    return None


def _print_summary(stats: dict[str, int], count: int, elapsed: float) -> None:
    """Print a bar chart of the non-zero counters, the total, and the event rate."""
    console.print()
    nonzero = [(label, value) for label, value in stats.items() if value > 0]
    if nonzero:
        max_val = max(value for _, value in nonzero)
        max_label_len = max(len(label) for label, _ in nonzero)
        for label, value in nonzero:
            # every value here is > 0, so max_val > 0 as well
            bar = "▇" * int(40 * value / max_val)
            console.print(f" {label:>{max_label_len}} [{COLOR_MAP[label]}]{bar}[/] {value}")
    console.print(f"[bold]total: {count}[/] in [cyan]{elapsed:.2f}s[/]")
    if count > 0 and elapsed > 0:
        console.print(f"[dim]{count / elapsed:.1f} events/sec[/]")


async def consume(
    url: str,
    max_events: int | None,
    live_only: bool,
    signal: str,
    filter_pattern: str,
) -> int:
    """Stream events from tap's websocket, render them, and ack each one.

    If nothing is listening on ``TAP_PORT``, a local tap process is started
    first and killed again on exit. Events are printed as rich panels under a
    live status bar. Consumption stops when any of these happens:

    * ``max_events`` events have been displayed;
    * the socket closes;
    * the user presses Ctrl-C;
    * outside ``live_only`` mode, no message arrives for 0.5 s after at least
      one event was displayed.

    A per-category summary is printed at the end.

    Args:
        url: tap websocket URL.
        max_events: stop after this many displayed events; None means no limit.
        live_only: skip (but still ack) non-live records, and keep waiting on
            a quiet stream instead of stopping.
        signal: TAP_SIGNAL_COLLECTION for a tap process started here.
        filter_pattern: TAP_COLLECTION_FILTERS for a tap process started here.

    Returns:
        0 on a normal finish (including Ctrl-C). 1 if tap could not be
        started or the websocket connection could not be opened.
    """
    stats = {"tracks": 0, "likes": 0, "lists": 0, "comments": 0, "identities": 0}
    count = 0
    backfill_done = False
    tap_proc: subprocess.Popen[bytes] | None = None
    start_time = time.monotonic()
    # Start-up runs inside try/finally as well, so a tap process spawned here
    # is cleaned up even if we are interrupted while waiting for it.
    try:
        # NOTE(review): this probes localhost:TAP_PORT regardless of ``url``;
        # a custom --url pointing elsewhere does not affect the check.
        if not is_tap_running():
            console.print("[yellow]tap not running, starting...[/]")
            console.print(f"[dim] signal: {signal}[/]")
            console.print(f"[dim] filter: {filter_pattern}[/]")
            tap_proc = start_tap(signal, filter_pattern)
            if tap_proc is None:
                return 1
            if not await _wait_for_tap(tap_proc):
                # the finally clause below kills and reaps the child
                console.print("[red]tap failed to start[/]")
                return 1
            console.print(f"[green]tap started (pid {tap_proc.pid})[/]")

        start_time = time.monotonic()
        console.print(f"\n[bold]connecting to {url}...[/]\n")
        try:
            ws = await websockets.connect(url)
        except (
            OSError,
            websockets.exceptions.InvalidHandshake,
            websockets.exceptions.InvalidURI,
        ) as exc:
            # markup=False: the exception text may contain "[...]"
            console.print(
                f"could not connect to {url}: {exc}", style="red", markup=False
            )
            return 1
        limit_msg = (
            f" (limit: {max_events})" if max_events is not None else " (all events)"
        )
        console.print(f"[green]connected![/]{limit_msg}\n")

        with Live(
            make_status(stats, 0, backfill_done),
            console=console,
            refresh_per_second=4,
        ) as status:
            while max_events is None or count < max_events:
                try:
                    msg = await asyncio.wait_for(ws.recv(), timeout=0.5)
                except asyncio.TimeoutError:
                    # Outside --live mode, a 0.5 s gap after the first shown
                    # event is treated as the end of the stream.
                    if count > 0 and not live_only:
                        break
                    if live_only or backfill_done:
                        status.update(
                            Text.from_markup(
                                f" [dim]waiting for live events...[/] | {count} events | {time.monotonic() - start_time:.1f}s"
                            )
                        )
                    continue

                data: dict[str, Any] = json.loads(msg)
                if data.get("type") == "record":
                    is_live: bool = data.get("record", {}).get("live", False)
                    if is_live:
                        backfill_done = True
                    if live_only and not is_live:
                        # skipped records are acked like every other event
                        await ws.send(json.dumps({"ack": data.get("id")}))
                        continue

                panel = _event_panel(data, stats)
                if panel:
                    status.console.print(panel)
                    count += 1
                    status.update(
                        make_status(stats, time.monotonic() - start_time, backfill_done)
                    )
                await ws.send(json.dumps({"ack": data.get("id")}))
    except websockets.exceptions.ConnectionClosed:
        console.print("[yellow]connection closed[/]")
    except (KeyboardInterrupt, asyncio.CancelledError):
        # Under asyncio.run (Python 3.11+), Ctrl-C cancels the main task
        # instead of raising KeyboardInterrupt inside it. CancelledError has
        # to be caught here for this message and the summary to appear.
        console.print("\n[yellow]interrupted[/]")
    finally:
        # kill tap first, then just abandon websocket (no graceful close)
        if tap_proc is not None:
            tap_proc.kill()
            tap_proc.wait()
    _print_summary(stats, count, time.monotonic() - start_time)
    return 0
def main() -> int:
    """CLI entry point: parse flags, print the banner, and run ``consume``.

    Returns the exit code produced by ``consume``.
    """
    parser = argparse.ArgumentParser(
        description="consume ATProto lexicon events from tap"
    )
    flag_specs = (
        (
            ("-n", "--count"),
            {"type": int, "default": None, "help": "limit events (default: all)"},
        ),
        (("--live",), {"action": "store_true", "help": "only live firehose events"}),
        (
            ("--signal",),
            {"default": "fm.plyr.track", "help": "signal collection to find repos"},
        ),
        (("--filter",), {"default": "fm.plyr.*", "help": "collection filter pattern"}),
        (
            ("--url",),
            {"default": "ws://localhost:2480/channel", "help": "tap websocket url"},
        ),
    )
    for flags, options in flag_specs:
        parser.add_argument(*flags, **options)
    args = parser.parse_args()

    console.rule("[cyan bold]tap consumer[/]", style="cyan")
    console.print("[dim]consuming ATProto records from the atmosphere[/]\n")
    job = consume(args.url, args.count, args.live, args.signal, args.filter)
    return asyncio.run(job)
if __name__ == "__main__":
    # propagate main()'s integer return value as the process exit status
    exit_code = main()
    raise SystemExit(exit_code)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment