Last active
January 17, 2025 22:16
-
-
Save RETr4ce/f25c2c9d87f688b69830dcd14f833394 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import json | |
| import logging | |
| import random | |
| import re | |
| from datetime import datetime, timezone | |
| from typing import Any, Dict, List, Optional, Pattern, Set, Tuple | |
| import aiohttp | |
| import websockets | |
| from websockets.exceptions import ConnectionClosedError, InvalidStatus | |
# === SETTINGS ===
# External WebSocket URLs. Every endpoint subscribes to the same collection
# (app.bsky.feed.post); the client rotates through them when reconnecting.
EXTERNAL_WS_URLS: List[str] = [
    "wss://jetstream1.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.post",
    "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.post",
    "wss://jetstream1.us-west.bsky.network/subscribe?wantedCollections=app.bsky.feed.post",
    "wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=app.bsky.feed.post",
]

# Language filter: Set desired languages (e.g., ["en", "nl", "unknown"]) or [] to allow all
LANGUAGE_FILTER: List[str] = ["nl"]

# Enable or disable fetching of usernames
ENABLE_FETCH_USERNAME: bool = False  # Set to True to enable username fetching

# DID blocklist: posts whose author DID is listed here are skipped.
# An empty ``{}`` literal is a dict, not a set, so the empty default is spelled
# ``set()``. Add entries as a set literal, e.g. {"did:plc:...", "did:plc:..."}.
DID_BLOCKLIST: Set[str] = set()
# Regex Patterns Mapped to Categories
#
# Each key is a regular expression (compiled case-insensitively below) and each
# value is the category reported when that expression matches a post.
#
# Groups containing an alternative that starts with a non-word character
# ("#DENK", "'s-Hertogenbosch") open with ``(?<!\w)`` instead of ``\b``:
# ``\b`` in front of "#" or "'" only matches when a word character directly
# precedes it, so those alternatives could never match at the start of a post
# or after a space. For alternatives starting with a word character the two
# assertions are equivalent.
RAW_PATTERNS_WITH_CATEGORIES: Dict[str, str] = {
    r"(?<!\w)(?:"
    r"50PLUS|BBB|BIJ1|BVNL|CDA|CU|D66|#DENK|"
    r"FvD|GL(?![a-zA-Z])|PvdA|JA21|LEF|LP|NSC|"
    r"PartijvdSport|Piratenpartij|PLAN|PPBI|"
    r"PvdA|PvdD|PVV|SGP|SP(?![a-zA-Z])|"
    r"Splinter|SVDN|Volt|VVD"
    r")\b": "Politieke partijen",
    r"(?<!\w)(?:"
    r"Volkspartij\svoor\sVrijheid\sen\sDemocratie|Democraten\s66|"
    r"GroenLinks|Partij\svan\sde\sArbeid|Partij\svoor\sde\sVrijheid|"
    r"Christen-Democratisch\sAppèl|Socialistische\sPartij|"
    r"Forum\svoor\sDemocratie|Partij\svoor\sde\sDieren|ChristenUnie|"
    r"Volt\sNederland|JA21|Staatkundig\sGereformeerde\sPartij|"
    r"#DENK|50PLUS|BoerBurgerBeweging|BIJ1|Piratenpartij|"
    r"De\sGroenen|Belang\svan\sNederland|Groep\sVan\sHaga|"
    r"Nieuw\sSociaal\sContract|Splinter|Libertaire\sPartij|"
    r"Samen\svoor\sNederland|Nederland\smet\seen\sPLAN|"
    r"Partij\svoor\sde\sSport|Politieke\sPartij\svoor\sBasisinkomen"
    r")\b": "Politieke partijen",
    r"\b(?:"
    r"Segers|Bontenbal|Wilders|Timmermans|"
    r"Klaver|Jetten|Omtzigt|Yeşilgöz|Zegerius|"
    r"Bikker|Ouwehand|Baudet|van\sBaarle|"
    r"Eerdmans|van\sder\sPlas|Stoffer|Dijk|"
    r"Dassen|Olf|schoof"
    r")\b": "Politieke leiders",
    r"\b(?:"
    r"ambu|ambulance|ambulancier|"
    r"Politie|agent|Marechaussee|KMAR|"
    r"Brandweer|BOA|GGD|RIVM|"
    r"Douane|Landmacht|Marine|Traumateam|KNMI|"
    r"ambtenaar|AIVD|justitie|JIT|RvdK|CJIB|DJI"
    r")\b": "Overheids diensten",
    r"\b(?:"
    r"nu\.nl|nos|AD|Telegraaf|Tweakers|NRC|volkskrant|rtlnieuws|Trouw|Parool|"
    r"Metro|dumpert|ntr|LINDA|Gelderlander|Stentor|Tubantia|Limburger|FD|"
    r"destem|hartvannederland|powned|Reformatorisch\sDagblad|BNR"
    r")\b": "Journalistiek",
    r"(?<!\w)(?:"
    r"Amsterdam|Rotterdam|Den\sHaag|Utrecht|Eindhoven|Groningen|Almere|Breda|"
    r"Nijmegen|Apeldoorn|Arnhem|Haarlem|Amersfoort|Zaanstad|Enschede|'s-Hertogenbosch|"
    r"Haarlemmermeer|Zwolle|Zoetermeer|Leiden|Maastricht|Dordrecht|Ede|"
    r"Alphen\saan\sden\sRijn|Leeuwarden|Alkmaar|Emmen|Westland|Delft|Venlo|"
    r"Deventer|Helmond|Amstelveen|Purmerend|Hilversum|Oss|Sittard-Geleen|"
    r"Súdwest-Fryslân|Heerlen|Meierijstad|Lelystad|Hoorn|Gouda|Voorne\saan\sZee|"
    r"Roosendaal|Barneveld|Almelo|Vlaardingen|Schiedam|Assen"
    r")\b": "Gemeentes",
    r"\b(?:"
    r"popo|pikieuw|osso|doezoe|salaris|nippen|spa|kinder\sSpa|N26|sexjob"
    r")\b": "Straattaal",
    r"\b(?:"
    r"broenoe|Peru\sdingen|Colo|300\stest|ice|"
    r"neustest|A-olie|boli"
    r")\b": "Drugs",
    # The two address shapes are alternatives; without the "|" separator they
    # were concatenated into one expression that could never match.
    r"\b(?:"
    r"\bA[1-9A-HJ-NP-Za-km-z]{33}\b|"  # Neo & Gas
    r"\b(?:1[1-9A-HJ-NP-Za-km-z]{25,34}|3[1-9A-HJ-NP-Za-km-z]{25,34}|bc1[q0-9a-z]{11,87})\b"  # Bitcoin
    r")\b": "Cryptocurrency",
    r"\b(?:"
    r"katvanger|mule|money\smule|ronselaar|kernlid|kernleden"
    r")\b": "Jargon",
    r"\b(?:"
    r"zionisten|vernietigen|antisemitisme|genocide|Moslimhater|"
    r"xenofobe|IslamIsTheProblem"
    r")\b": "art. 131.1 Sr",
    r"\b(?:"
    r"crimeware|cyber\s*security|darkweb|infosec"
    r")\b": "Cybersecurity en Crimeware",
}

# Regex Exclusion Patterns: a post matching any of these is dropped.
# Same shape as RAW_PATTERNS_WITH_CATEGORIES (pattern -> category); empty by default.
EXCLUSION_RAW_PATTERNS_WITH_CATEGORIES: Dict[str, str] = {}

# Compiled (pattern, category) pairs, in the insertion order of the raw tables.
PATTERNS: List[Tuple[Pattern[str], str]] = [
    (re.compile(pattern, re.IGNORECASE), category)
    for pattern, category in RAW_PATTERNS_WITH_CATEGORIES.items()
]
EXCLUSION_PATTERNS: List[Tuple[Pattern[str], str]] = [
    (re.compile(pattern, re.IGNORECASE), category)
    for pattern, category in EXCLUSION_RAW_PATTERNS_WITH_CATEGORIES.items()
]
# === WEBSOCKET CLIENT ===
class WebSocketClient:
    """Feed messages from a rotating list of WebSocket URLs into a queue.

    ``run`` connects to one URL at a time. Whenever a connection ends or
    cannot be established it moves on to the next URL after a jittered,
    exponentially growing delay.
    """

    def __init__(self, urls: List[str], message_queue: asyncio.Queue) -> None:
        """Store the endpoint list and the queue received messages are put on.

        Raises:
            ValueError: if ``urls`` is empty (there would be nothing to
                connect to, and the rotation arithmetic would fail later).
        """
        if not urls:
            raise ValueError("urls must contain at least one WebSocket URL")
        self.urls = urls
        self.message_queue = message_queue
        self.current_url_index = 0
        self.running = True

    async def run(self) -> None:
        """Connect, listen, and reconnect until ``stop`` is called or the task is cancelled.

        The retry delay starts at 1 second, doubles after every attempt up to
        60 seconds, and is reset once a connection was actually established.
        A random 0-1 second jitter is added to every wait.
        """
        retry_delay = 1
        while self.running:
            current_url = self.urls[self.current_url_index]
            connected = False
            try:
                connected = bool(await self.connect_and_listen(current_url))
            except Exception as exc:
                # connect_and_listen logs and absorbs its own errors; this is a
                # safety net (e.g. for overriding subclasses) so the loop survives.
                logging.error(f"Unhandled exception with {current_url}: {exc}", exc_info=True)

            # Cancellation is a BaseException and propagates straight out of
            # this method, so a cancelled task never waits out a retry delay.
            if not self.running:
                break
            if connected:
                # The endpoint was reachable: start the backoff from scratch.
                retry_delay = 1

            self.current_url_index = (self.current_url_index + 1) % len(self.urls)
            delay = retry_delay + random.uniform(0, 1)
            logging.info(f"Retrying with {self.urls[self.current_url_index]} after {delay:.2f} seconds...")
            await asyncio.sleep(delay)
            retry_delay = min(retry_delay * 2, 60)

    async def connect_and_listen(self, url: str) -> bool:
        """Open ``url`` and forward every received message to the queue.

        Connection problems are logged here and not raised.

        Returns:
            True if the connection was established before it ended, False if
            the connection attempt itself failed.
        """
        logging.info(f"Connecting to WebSocket at {url}...")
        connected = False
        try:
            async with websockets.connect(url) as websocket:
                connected = True
                logging.info(f"Connected to WebSocket at {url}.")
                async for message in websocket:
                    await self.message_queue.put(message)
        except asyncio.TimeoutError:
            # Must precede OSError: on Python 3.11+ TimeoutError is an OSError subclass.
            logging.error(f"Connection attempt to {url} timed out.")
        except (ConnectionClosedError, ConnectionError, OSError) as exc:
            logging.warning(f"Connection error with {url}: {exc}")
        except InvalidStatus as exc:
            logging.error(f"Invalid status for {url}: {exc}")
        except Exception as exc:
            logging.error(f"Unhandled exception with {url}: {exc}", exc_info=True)
        return connected

    async def stop(self) -> None:
        """Ask ``run`` to exit instead of reconnecting.

        Only the flag is set; an open connection keeps running until it ends
        or the task running ``run`` is cancelled.
        """
        self.running = False
# === MESSAGE PROCESSOR ===
class MessageProcessor:
    """Take raw JSON messages off a queue, filter them, and log the matches.

    A message is logged when its author is not blocklisted, its text is
    non-empty and hits no exclusion pattern, its language passes the filter,
    and (if any patterns are configured) its text matches one of them.
    """

    def __init__(
        self,
        message_queue: asyncio.Queue,
        patterns: List[Tuple[Pattern[str], str]],
        exclusions: List[Tuple[Pattern[str], str]],
        did_blocklist: Set[str],
        language_filter: Optional[List[str]] = None,
        fetch_username: bool = False,
        session: "Optional[aiohttp.ClientSession]" = None,
    ) -> None:
        """Store the configuration.

        Args:
            message_queue: queue of raw JSON messages to consume.
            patterns: compiled ``(pattern, category)`` pairs; first match wins.
            exclusions: compiled ``(pattern, category)`` pairs; any match drops the post.
            did_blocklist: author DIDs whose posts are skipped.
            language_filter: allowed language codes; empty or None allows all.
            fetch_username: whether to look up a username for matched posts.
            session: HTTP session used for username lookups. When omitted, a
                session is created here and closed by ``stop``; a session
                passed in by the caller is left for the caller to close.
        """
        self.message_queue = message_queue
        self.patterns = patterns
        self.exclusions = exclusions
        self.did_blocklist = did_blocklist
        self.language_filter = language_filter or []
        self.fetch_username = fetch_username
        self.lock = asyncio.Lock()
        # Only a session created here is ours to close.
        self._owns_session = session is None
        self.session = aiohttp.ClientSession() if session is None else session
        self.running = True
        logging.info(
            f"Listening for messages, matching against {len(self.patterns)} patterns..."
        )

    async def worker(self) -> None:
        """Consume queue items until ``stop`` is called or the task is cancelled.

        The queue is polled with a one-second timeout so the ``running`` flag
        is re-checked regularly. Errors while handling one message are logged
        and do not stop the worker.
        """
        while self.running:
            try:
                message = await asyncio.wait_for(self.message_queue.get(), timeout=1.0)
                try:
                    await self.handle_message(message)
                except Exception as exc:
                    logging.error(f"Error handling message: {exc}", exc_info=True)
                finally:
                    # Always balance the get() so queue.join() cannot hang.
                    self.message_queue.task_done()
            except asyncio.TimeoutError:
                continue
            except asyncio.CancelledError:
                logging.info("Worker task cancelled. Exiting...")
                break

    async def handle_message(self, message: str) -> None:
        """Decode one raw JSON message and pass it to ``process_message``.

        Invalid JSON is logged and dropped.
        """
        # NOTE(review): the lock is held for the whole processing step,
        # including the optional username HTTP request, so concurrent workers
        # process one message at a time. Confirm whether that is intended
        # (e.g. as request throttling) before narrowing or removing it.
        async with self.lock:
            try:
                data: Dict[str, Any] = json.loads(message)
                await self.process_message(data)
            except json.JSONDecodeError as exc:
                logging.error(f"Failed to decode JSON message: {exc}")

    @staticmethod
    def _as_dict(value: Any) -> Dict[str, Any]:
        """Return ``value`` if it is a dict, otherwise an empty dict."""
        return value if isinstance(value, dict) else {}

    async def process_message(self, data: Dict[str, Any]) -> None:
        """Apply all filters to one decoded message and log it if it passes.

        Payloads that do not have the expected shape (not an object, a null
        or non-object ``commit``/``record``, non-string text) are ignored
        rather than raising.
        """
        if not isinstance(data, dict):
            return

        commit = self._as_dict(data.get("commit"))
        record = self._as_dict(commit.get("record"))
        text = record.get("text", "")
        if not isinstance(text, str) or not text:
            return

        did = data.get("did", "unknown")
        if not isinstance(did, str):
            did = "unknown"
        raw_langs = record.get("langs")
        langs: List[str] = (
            [lang for lang in raw_langs if isinstance(lang, str)]
            if isinstance(raw_langs, list)
            else []
        )
        rkey = commit.get("rkey", "unknown")

        if did in self.did_blocklist or self.contains_exclusion(text):
            return
        if not self.is_language_allowed(langs):
            return

        matched_category = self.match_patterns(text)
        # With no patterns configured every post that survived the filters is logged.
        if self.patterns and not matched_category:
            return

        timestamp = self.get_timestamp(data.get("time_us"))
        lang_str = ", ".join(langs) if langs else "unknown"
        profile_url = f"https://bsky.app/profile/{did}/post/{rkey}"
        username = await self.fetch_username_func(did) if self.fetch_username else None
        user_display = username or did
        logging.info(
            f"{timestamp.isoformat()} - {user_display}: {text} "
            f"[Matched: {matched_category or 'None'} - Language: {lang_str} - {profile_url}]\n\n"
        )

    def is_language_allowed(self, langs: List[str]) -> bool:
        """Return True if any of ``langs`` is in the language filter.

        An empty filter allows everything. A post without languages is
        treated as having the single language ``"unknown"``.
        """
        if not self.language_filter:
            return True
        return any(lang in self.language_filter for lang in langs or ["unknown"])

    def match_patterns(self, text: str) -> Optional[str]:
        """Return the category of the first pattern found in ``text``, or None."""
        for pattern, category in self.patterns:
            if pattern.search(text):
                return category
        return None

    def contains_exclusion(self, text: str) -> bool:
        """Return True if any exclusion pattern is found in ``text``."""
        return any(pattern.search(text) for pattern, _ in self.exclusions)

    @staticmethod
    def get_timestamp(time_us: Optional[int]) -> datetime:
        """Convert a microsecond epoch timestamp to an aware UTC datetime.

        Falls back to the current UTC time when ``time_us`` is missing, zero,
        of the wrong type, or out of the representable range.
        """
        try:
            if time_us:
                return datetime.fromtimestamp(time_us / 1_000_000, tz=timezone.utc)
            return datetime.now(tz=timezone.utc)
        except (ValueError, TypeError, OverflowError, OSError) as exc:
            # fromtimestamp raises OverflowError/OSError for out-of-range values.
            logging.warning(f"Invalid timestamp format: {time_us} ({exc})")
            return datetime.now(tz=timezone.utc)

    async def fetch_username_func(self, did: str) -> Optional[str]:
        """Look up the username for ``did`` from its public profile page.

        Returns the content of the ``profile:username`` meta tag, or None if
        the page could not be fetched or has no such tag. Errors are logged.
        """
        profile_url = f"https://bsky.app/profile/{did}"
        try:
            async with self.session.get(profile_url) as response:
                if response.status == 200:
                    html = await response.text()
                    match = re.search(r'<meta property="profile:username" content="([^"]+)"', html)
                    if match:
                        return match.group(1)
        except Exception as exc:
            logging.error(f"Error fetching username for DID {did}: {exc}")
        return None

    async def stop(self) -> None:
        """Signal workers to exit and close the HTTP session if this object created it."""
        self.running = False
        if self._owns_session:
            await self.session.close()
# === SHUTDOWN FUNCTION ===
async def shutdown(client: "WebSocketClient", processor: "MessageProcessor", tasks: "List[asyncio.Task]") -> None:
    """Stop the client, cancel and await all tasks, then stop the processor.

    The processor is stopped last, after every task has finished, so that
    whatever ``processor.stop()`` releases is not torn down while a worker
    task may still be using it.

    Args:
        client: object whose ``stop()`` coroutine halts reconnecting.
        processor: object whose ``stop()`` coroutine releases its resources.
        tasks: the running client and worker tasks to cancel.
    """
    logging.info("Shutting down gracefully...")
    await client.stop()
    for task in tasks:
        task.cancel()
    try:
        # return_exceptions=True collects each task's CancelledError as a
        # result, so this only raises if shutdown itself is cancelled.
        await asyncio.gather(*tasks, return_exceptions=True)
    except asyncio.CancelledError:
        logging.info("Tasks successfully cancelled.")
    finally:
        await processor.stop()
    logging.info("Shutdown complete.")
# === MAIN FUNCTION ===
async def main() -> None:
    """Wire up the WebSocket client and ten processor workers and run them.

    Configures logging, creates a bounded message queue shared by the client
    and the workers, and waits on all tasks. ``shutdown`` is always invoked on
    the way out, whether the tasks finished or were cancelled.
    """
    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

    queue: asyncio.Queue = asyncio.Queue(maxsize=100000)
    ws_client = WebSocketClient(urls=EXTERNAL_WS_URLS, message_queue=queue)

    async with aiohttp.ClientSession() as http_session:
        processor = MessageProcessor(
            message_queue=queue,
            patterns=PATTERNS,
            exclusions=EXCLUSION_PATTERNS,
            did_blocklist=DID_BLOCKLIST,
            language_filter=LANGUAGE_FILTER,
            fetch_username=ENABLE_FETCH_USERNAME,
            session=http_session,
        )

        # The client task comes first, followed by the ten worker tasks.
        all_tasks = [asyncio.create_task(ws_client.run())]
        for _ in range(10):
            all_tasks.append(asyncio.create_task(processor.worker()))

        try:
            await asyncio.gather(*all_tasks)
        except asyncio.CancelledError:
            logging.info("Main tasks cancelled.")
        finally:
            await shutdown(ws_client, processor, all_tasks)


if __name__ == "__main__":
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment