Last active
December 13, 2025 08:36
-
-
Save zzstoatzz/90f138b90b05c369ecd0107d5e8358b6 to your computer and use it in GitHub Desktop.
consume ATProto lexicon events from tap
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env -S uv run --script | |
| # /// script | |
| # requires-python = ">=3.12" | |
| # dependencies = ["websockets", "rich"] | |
| # /// | |
| """consume ATProto lexicon events from tap with formatted output. | |
| automatically starts tap if not running. | |
| usage: | |
| uv run https://gist.github.com/zzstoatzz/90f138b90b05c369ecd0107d5e8358b6 | |
| uv run https://gist.github.com/zzstoatzz/90f138b90b05c369ecd0107d5e8358b6 --signal app.bsky.feed.post --filter "app.bsky.feed.*" | |
| flags: | |
| -n, --count N limit to N events (default: all) | |
| --live only live firehose events (skip backfill) | |
| --signal COLLECTION signal collection to find repos (default: fm.plyr.track) | |
| --filter PATTERN collection filter pattern (default: fm.plyr.*) | |
| --url URL tap websocket url (default: ws://localhost:2480/channel) | |
| """ | |
| import argparse | |
| import asyncio | |
| import json | |
| import os | |
| import shutil | |
| import socket | |
| import subprocess | |
| import time | |
| from typing import Any | |
| import websockets | |
| from rich.console import Console | |
| from rich.live import Live | |
| from rich.panel import Panel | |
| from rich.text import Text | |
# shared rich console used for all terminal output in this script
console = Console()

# port probed by is_tap_running(); matches the default --url
TAP_PORT = 2480

# colour per stats key, used when drawing the end-of-run bar chart
COLOR_MAP = dict(
    tracks="cyan",
    likes="red",
    lists="magenta",
    comments="blue",
    identities="green",
)
def is_tap_running() -> bool:
    """Return True if something accepts TCP connections on localhost:TAP_PORT.

    This is only a port probe; it does not verify that the listener is tap.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with probe:
        result = probe.connect_ex(("localhost", TAP_PORT))
    return result == 0
def start_tap(signal: str, filter_pattern: str) -> subprocess.Popen[bytes] | None:
    """Launch ``tap run`` as a background child process.

    The binary is looked up on PATH first, falling back to ``~/go/bin/tap``.
    The signal collection and collection filter are passed through the
    ``TAP_SIGNAL_COLLECTION`` / ``TAP_COLLECTION_FILTERS`` environment
    variables, and the child's stdout/stderr are discarded.

    Returns the Popen handle, or None (after printing an install hint) when no
    file exists at the resolved path.
    """
    located = shutil.which("tap")
    tap_bin = located if located else os.path.expanduser("~/go/bin/tap")

    if os.path.exists(tap_bin):
        child_env = dict(os.environ)
        child_env["TAP_SIGNAL_COLLECTION"] = signal
        child_env["TAP_COLLECTION_FILTERS"] = filter_pattern
        return subprocess.Popen(
            [tap_bin, "run"],
            env=child_env,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

    console.print("[red]tap binary not found[/]")
    console.print(
        "[dim]install: go install github.com/bluesky-social/indigo/cmd/tap@latest[/]"
    )
    return None
# fixed column width shared by every panel the format_* helpers render
PANEL_WIDTH: int = 70
def format_did(did: str) -> str:
    """Shorten a DID for compact display.

    ``did:plc:`` identifiers keep the method prefix plus the first 8 characters
    of the identifier. Any other DID is cut to its first 24 characters. In both
    cases "…" is appended only when something was actually dropped, so short
    values are returned unchanged.
    """
    plc_prefix = "did:plc:"
    if did.startswith(plc_prefix):
        ident = did[len(plc_prefix):]
        # previously "…" was appended unconditionally, even for short ids
        return f"{plc_prefix}{ident[:8]}…" if len(ident) > 8 else did
    return did[:24] + "…" if len(did) > 24 else did
def format_track(record: dict[str, Any], live: bool, did: str) -> Panel:
    """Render a track record as a cyan panel: title, artist, and author DID.

    Missing ``title`` / ``artist`` fields are shown as "?". The panel title
    carries a LIVE badge when *live* is truthy, otherwise a backfill badge.
    """
    title = record.get("title", "?")
    artist = record.get("artist", "?")
    badge = "[green]LIVE[/]" if live else "[dim]backfill[/]"

    body = Text()
    segments = (
        ("♫ ", "cyan bold"),
        (f"{title}\n", "white bold"),
        (" by ", "dim"),
        (f"{artist}\n", "yellow"),
        (f" {format_did(did)}", "dim"),
    )
    for chunk, style in segments:
        body.append(chunk, style=style)

    return Panel(
        body,
        title=f"[cyan]track[/] {badge}",
        border_style="cyan",
        width=PANEL_WIDTH,
    )
def format_like(record: dict[str, Any], live: bool, did: str) -> Panel:
    """Render a like record as a red panel: the liked record's rkey and author DID.

    ``record["subject"]`` is expected to be an object whose ``uri`` field is an
    AT-URI (at://did/collection/rkey); only the trailing path segment is shown.
    Record contents arrive from the network and may not match that shape, so a
    missing or malformed subject/uri renders as "?" instead of raising.
    """
    badge = "[green]LIVE[/]" if live else "[dim]backfill[/]"

    subject = record.get("subject")
    subject_uri = subject.get("uri", "?") if isinstance(subject, dict) else "?"
    if not isinstance(subject_uri, str):
        subject_uri = "?"
    # at://did/collection/rkey -> rkey; a string without "/" is shown whole
    rkey = subject_uri.rpartition("/")[2]

    text = Text()
    text.append("♥ ", style="red bold")
    text.append(f"{rkey}\n", style="white")
    text.append(f" {format_did(did)}", style="dim")
    return Panel(
        text, title=f"[red]like[/] {badge}", border_style="red", width=PANEL_WIDTH
    )
def format_list(record: dict[str, Any], live: bool, did: str) -> Panel:
    """Render a list record as a magenta panel: name, type, item count, author DID.

    An empty or missing ``name`` shows as "(unnamed)" and a missing
    ``listType`` as "?". ``items`` is counted only when it is a JSON array.
    A missing, null, or otherwise malformed value counts as 0 instead of
    raising, because record contents come from the network.
    """
    badge = "[green]LIVE[/]" if live else "[dim]backfill[/]"
    items = record.get("items")
    item_count = len(items) if isinstance(items, list) else 0

    text = Text()
    text.append("📋 ", style="magenta bold")
    text.append(f"{record.get('name') or '(unnamed)'}\n", style="white bold")
    text.append(" type: ", style="dim")
    text.append(f"{record.get('listType', '?')}\n", style="magenta")
    text.append(" items: ", style="dim")
    text.append(f"{item_count}\n", style="white")
    text.append(f" {format_did(did)}", style="dim")
    return Panel(
        text,
        title=f"[magenta]list[/] {badge}",
        border_style="magenta",
        width=PANEL_WIDTH,
    )
def format_comment(record: dict[str, Any], live: bool, did: str) -> Panel:
    """Render a comment record as a blue panel: text preview, timestamp, author DID.

    The preview is the first 50 characters of ``text``, with "…" appended when
    the comment was longer. A missing or non-string ``text`` (possible in
    network-supplied records) renders as an empty preview instead of raising.
    ``timestampMs`` is shown verbatim with an "ms" suffix and defaults to 0.
    """
    preview_len = 50
    badge = "[green]LIVE[/]" if live else "[dim]backfill[/]"

    raw_text = record.get("text")
    comment_text = raw_text if isinstance(raw_text, str) else ""
    if len(comment_text) > preview_len:
        # mark truncation instead of cutting silently
        comment_text = comment_text[:preview_len] + "…"

    text = Text()
    text.append("💬 ", style="blue bold")
    text.append(f'"{comment_text}"\n', style="italic")
    text.append(" at ", style="dim")
    text.append(f"{record.get('timestampMs', 0)}ms\n", style="blue")
    text.append(f" {format_did(did)}", style="dim")
    return Panel(
        text, title=f"[blue]comment[/] {badge}", border_style="blue", width=PANEL_WIDTH
    )
def format_identity(ident: dict[str, Any]) -> Panel:
    """Render an identity event as a green panel: handle, short DID, and status.

    Missing fields display as "?". The status is coloured green when it equals
    "active" and yellow for anything else.
    """
    did = ident.get("did", "?")
    status = ident.get("status", "?")
    handle = ident.get("handle", "?")
    status_colour = "green" if status == "active" else "yellow"

    body = Text()
    segments = (
        ("👤 ", "green bold"),
        (f"@{handle}\n", "white bold"),
        (f" {format_did(did)}\n", "dim"),
        (" status: ", "dim"),
        (status, status_colour),
    )
    for chunk, style in segments:
        body.append(chunk, style=style)

    return Panel(
        body,
        title="[green]identity[/]",
        border_style="green",
        width=PANEL_WIDTH,
    )
def _short_label(key: str, keys: list[str]) -> str:
    """Return the shortest prefix of *key* that no other entry of *keys* starts with."""
    others = [other for other in keys if other != key]
    for length in range(1, len(key) + 1):
        prefix = key[:length]
        if not any(other.startswith(prefix) for other in others):
            return prefix
    # key is itself a prefix of another key: fall back to the full name
    return key


def make_status(stats: dict[str, int], elapsed: float, backfill_done: bool) -> Text:
    """Build the one-line status bar shown under the live event stream.

    It shows the phase (live vs backfill), the total event count, the elapsed
    seconds, and a compact ``label:count`` entry for every non-zero counter.
    Each label is the shortest prefix that is unique among all keys of *stats*.
    With the default keys, "likes" and "lists" render as "lik" and "lis".
    Previously both collapsed to an ambiguous "l".
    """
    total = sum(stats.values())
    phase = "[green]live[/]" if backfill_done else "[yellow]backfill[/]"
    keys = list(stats)
    parts = [f"{_short_label(k, keys)}:{v}" for k, v in stats.items() if v > 0]
    return Text.from_markup(
        f" {phase} | {total} events | {elapsed:.1f}s | {' '.join(parts)}"
    )
def _event_panel(data: dict[str, Any], stats: dict[str, int]) -> Panel | None:
    """Build the panel for one decoded tap event and bump its counter in *stats*.

    Record events are routed by substring match on their collection, tried in
    the order track, like, list, comment. A collection containing both "like"
    and "list" therefore counts as a like. Identity events always produce a
    panel. Returns None for anything else, including records from unmatched
    collections.
    """
    event_type = data.get("type")
    if event_type == "identity":
        panel = format_identity(data.get("identity") or {})
        stats["identities"] += 1
        return panel
    if event_type != "record":
        return None

    meta: dict[str, Any] = data.get("record") or {}
    collection: str = meta.get("collection") or ""
    record: dict[str, Any] = meta.get("record") or {}
    from_live = bool(meta.get("live", False))
    did: str = meta.get("did") or ""
    routes = (
        ("track", "tracks", format_track),
        ("like", "likes", format_like),
        ("list", "lists", format_list),
        ("comment", "comments", format_comment),
    )
    for needle, stat_key, formatter in routes:
        if needle in collection:
            panel = formatter(record, from_live, did)
            stats[stat_key] += 1
            return panel
    return None


def _print_summary(stats: dict[str, int], count: int, elapsed: float) -> None:
    """Print a per-category bar chart, the total count, and events/sec."""
    rows = [(label, value) for label, value in stats.items() if value > 0]
    console.print()
    if rows:
        max_val = max(value for _, value in rows)
        label_width = max(len(label) for label, _ in rows)
        for label, value in rows:
            # at least one cell, so small non-zero counts stay visible
            bar = "▇" * max(1, int(40 * value / max_val))
            console.print(
                f" {label:>{label_width}} [{COLOR_MAP[label]}]{bar}[/] {value}"
            )
    console.print(f"[bold]total: {count}[/] in [cyan]{elapsed:.2f}s[/]")
    if count > 0 and elapsed > 0:
        console.print(f"[dim]{count / elapsed:.1f} events/sec[/]")


async def consume(
    url: str,
    max_events: int | None,
    live_only: bool,
    signal: str,
    filter_pattern: str,
) -> int:
    """Stream events from tap's websocket, render them, then print a summary.

    If nothing is listening on TAP_PORT, a tap process is started via
    start_tap(). That process is killed again on every exit path. Every
    decoded event is acknowledged back to tap by its ``id``.

    Without *live_only*, the run ends after 0.5 s of silence once at least one
    event has been rendered. With *live_only*, backfill records are acked but
    not shown, and the consumer keeps waiting. When *max_events* is given, the
    run stops after that many rendered events.

    Returns 0 on a normal finish (including Ctrl-C or the server closing the
    connection). Returns 1 if tap could not be started or the websocket
    connection could not be established.
    """
    tap_proc: subprocess.Popen[bytes] | None = None
    stats = {key: 0 for key in COLOR_MAP}
    count = 0
    backfill_done = False
    start_time = time.monotonic()
    try:
        if not is_tap_running():
            console.print("[yellow]tap not running, starting...[/]")
            console.print(f"[dim] signal: {signal}[/]")
            console.print(f"[dim] filter: {filter_pattern}[/]")
            tap_proc = start_tap(signal, filter_pattern)
            if tap_proc is None:
                return 1
            # poll up to ~5 s; inside try so Ctrl-C here still reaps the child
            for _ in range(50):
                if is_tap_running():
                    break
                await asyncio.sleep(0.1)
            else:
                console.print("[red]tap failed to start[/]")
                return 1  # the finally clause kills and reaps tap
            console.print(f"[green]tap started (pid {tap_proc.pid})[/]")

        # time only the streaming phase, not tap startup
        start_time = time.monotonic()
        console.print(f"\n[bold]connecting to {url}...[/]\n")
        try:
            ws = await websockets.connect(url)
        except (
            OSError,
            websockets.exceptions.InvalidURI,
            websockets.exceptions.InvalidHandshake,
        ) as exc:
            console.print(f"[red]could not connect to {url}[/]")
            # exception text may contain "[...]", so print it without markup
            console.print(str(exc), markup=False, style="dim")
            return 1

        limit_msg = (
            f" (limit: {max_events})" if max_events is not None else " (all events)"
        )
        console.print(f"[green]connected![/]{limit_msg}\n")
        with Live(
            make_status(stats, 0, backfill_done),
            console=console,
            refresh_per_second=4,
        ) as status:
            while max_events is None or count < max_events:
                try:
                    msg = await asyncio.wait_for(ws.recv(), timeout=0.5)
                except asyncio.TimeoutError:
                    if count > 0 and not live_only:
                        break
                    if live_only or backfill_done:
                        status.update(
                            Text.from_markup(
                                f" [dim]waiting for live events...[/] | {count} events | {time.monotonic() - start_time:.1f}s"
                            )
                        )
                    continue

                try:
                    data = json.loads(msg)
                except ValueError:  # JSONDecodeError or undecodable bytes
                    status.console.print("[yellow]skipping non-JSON message[/]")
                    continue
                if not isinstance(data, dict):
                    continue

                if data.get("type") == "record":
                    is_live = bool((data.get("record") or {}).get("live", False))
                    if is_live:
                        backfill_done = True
                    if live_only and not is_live:
                        await ws.send(json.dumps({"ack": data.get("id")}))
                        continue

                panel = _event_panel(data, stats)
                if panel is not None:
                    status.console.print(panel)
                    count += 1
                    status.update(
                        make_status(
                            stats, time.monotonic() - start_time, backfill_done
                        )
                    )
                await ws.send(json.dumps({"ack": data.get("id")}))
    except websockets.exceptions.ConnectionClosed:
        console.print("[yellow]connection closed[/]")
    except (KeyboardInterrupt, asyncio.CancelledError):
        # asyncio.run() (3.11+) turns Ctrl-C into cancellation of this task,
        # so KeyboardInterrupt alone usually never reaches this coroutine
        console.print("\n[yellow]interrupted[/]")
    finally:
        # kill tap first, then just abandon websocket (no graceful close)
        if tap_proc is not None:
            tap_proc.kill()
            tap_proc.wait()

    _print_summary(stats, count, time.monotonic() - start_time)
    return 0
def main() -> int:
    """Parse command-line flags, print the banner, and run consume().

    Returns consume()'s exit status so the caller can pass it to SystemExit.
    """
    parser = argparse.ArgumentParser(
        description="consume ATProto lexicon events from tap"
    )
    # (flags, add_argument keyword options) for every supported flag
    flag_specs = [
        (("-n", "--count"), {"type": int, "default": None, "help": "limit events (default: all)"}),
        (("--live",), {"action": "store_true", "help": "only live firehose events"}),
        (("--signal",), {"default": "fm.plyr.track", "help": "signal collection to find repos"}),
        (("--filter",), {"default": "fm.plyr.*", "help": "collection filter pattern"}),
        (("--url",), {"default": "ws://localhost:2480/channel", "help": "tap websocket url"}),
    ]
    for flags, options in flag_specs:
        parser.add_argument(*flags, **options)
    opts = parser.parse_args()

    console.rule("[cyan bold]tap consumer[/]", style="cyan")
    console.print("[dim]consuming ATProto records from the atmosphere[/]\n")

    session = consume(opts.url, opts.count, opts.live, opts.signal, opts.filter)
    return asyncio.run(session)
if __name__ == "__main__":
    # hand main()'s return value to the interpreter as the process exit status
    exit_status = main()
    raise SystemExit(exit_status)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment