Skip to content

Instantly share code, notes, and snippets.

@RETr4ce
Last active January 17, 2025 22:16
Show Gist options
  • Select an option

  • Save RETr4ce/f25c2c9d87f688b69830dcd14f833394 to your computer and use it in GitHub Desktop.

Select an option

Save RETr4ce/f25c2c9d87f688b69830dcd14f833394 to your computer and use it in GitHub Desktop.
import asyncio
import json
import logging
import random
import re
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Pattern, Set, Tuple
import aiohttp
import websockets
from websockets.exceptions import ConnectionClosedError, InvalidStatus
# === SETTINGS ===
# Jetstream WebSocket endpoints. The client starts with the first one and moves
# to the next entry (wrapping around) every time a connection ends.
# Each URL subscribes to app.bsky.feed.post events only.
EXTERNAL_WS_URLS: List[str] = [
    "wss://jetstream1.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.post",
    "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.post",
    "wss://jetstream1.us-west.bsky.network/subscribe?wantedCollections=app.bsky.feed.post",
    "wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=app.bsky.feed.post",
]
# Language filter: language codes to keep (e.g. ["en", "nl", "unknown"]), where
# "unknown" matches posts that declare no language. Use [] to allow all.
LANGUAGE_FILTER: List[str] = ["nl"]
# Look up the author's username for every reported post (one extra HTTP request
# per reported post). Set to True to enable.
ENABLE_FETCH_USERNAME: bool = False
# Author DIDs whose posts are always ignored, e.g. {"did:plc:abc123"}.
# An empty `{}` literal would create a dict, not a set, hence `set()`.
DID_BLOCKLIST: Set[str] = set()
# Regex patterns mapped to the category they report. Entries are tried in
# insertion order and the first match wins; they are compiled case-insensitively.
#
# Every alternation is wrapped in "(?<!\w)(?:...)(?!\w)" instead of
# "\b(?:...)\b". For alternatives that start and end with a word character the
# two are equivalent, but "\b" in front of a non-word character such as the "#"
# of "#DENK" or the "'" of "'s-Hertogenbosch" only matches when a word
# character precedes it, so those entries could never match after a space.
RAW_PATTERNS_WITH_CATEGORIES = {
    # Party abbreviations.
    r"(?<!\w)(?:"
    r"50PLUS|BBB|BIJ1|BVNL|CDA|CU|D66|#DENK|"
    r"FvD|GL(?![a-zA-Z])|PvdA|JA21|LEF|LP|NSC|"
    r"PartijvdSport|Piratenpartij|PLAN|PPBI|"
    r"PvdD|PVV|SGP|SP(?![a-zA-Z])|"
    r"Splinter|SVDN|Volt|VVD"
    r")(?!\w)": "Politieke partijen",
    # Full party names.
    r"(?<!\w)(?:"
    r"Volkspartij\svoor\sVrijheid\sen\sDemocratie|Democraten\s66|"
    r"GroenLinks|Partij\svan\sde\sArbeid|Partij\svoor\sde\sVrijheid|"
    r"Christen-Democratisch\sAppèl|Socialistische\sPartij|"
    r"Forum\svoor\sDemocratie|Partij\svoor\sde\sDieren|ChristenUnie|"
    r"Volt\sNederland|JA21|Staatkundig\sGereformeerde\sPartij|"
    r"#DENK|50PLUS|BoerBurgerBeweging|BIJ1|Piratenpartij|"
    r"De\sGroenen|Belang\svan\sNederland|Groep\sVan\sHaga|"
    r"Nieuw\sSociaal\sContract|Splinter|Libertaire\sPartij|"
    r"Samen\svoor\sNederland|Nederland\smet\seen\sPLAN|"
    r"Partij\svoor\sde\sSport|Politieke\sPartij\svoor\sBasisinkomen"
    r")(?!\w)": "Politieke partijen",
    r"(?<!\w)(?:"
    r"Segers|Bontenbal|Wilders|Timmermans|"
    r"Klaver|Jetten|Omtzigt|Yeşilgöz|Zegerius|"
    r"Bikker|Ouwehand|Baudet|van\sBaarle|"
    r"Eerdmans|van\sder\sPlas|Stoffer|Dijk|"
    r"Dassen|Olf|schoof"
    r")(?!\w)": "Politieke leiders",
    r"(?<!\w)(?:"
    r"ambu|ambulance|ambulancier|"
    r"Politie|agent|Marechaussee|KMAR|"
    r"Brandweer|BOA|GGD|RIVM|"
    r"Douane|Landmacht|Marine|Traumateam|KNMI|"
    r"ambtenaar|AIVD|justitie|JIT|RvdK|CJIB|DJI"
    r")(?!\w)": "Overheids diensten",
    r"(?<!\w)(?:"
    r"nu\.nl|nos|AD|Telegraaf|Tweakers|NRC|volkskrant|rtlnieuws|Trouw|Parool|"
    r"Metro|dumpert|ntr|LINDA|Gelderlander|Stentor|Tubantia|Limburger|FD|"
    r"destem|hartvannederland|powned|Reformatorisch\sDagblad|BNR"
    r")(?!\w)": "Journalistiek",
    r"(?<!\w)(?:"
    r"Amsterdam|Rotterdam|Den\sHaag|Utrecht|Eindhoven|Groningen|Almere|Breda|"
    r"Nijmegen|Apeldoorn|Arnhem|Haarlem|Amersfoort|Zaanstad|Enschede|'s-Hertogenbosch|"
    r"Haarlemmermeer|Zwolle|Zoetermeer|Leiden|Maastricht|Dordrecht|Ede|"
    r"Alphen\saan\sden\sRijn|Leeuwarden|Alkmaar|Emmen|Westland|Delft|Venlo|"
    r"Deventer|Helmond|Amstelveen|Purmerend|Hilversum|Oss|Sittard-Geleen|"
    r"Súdwest-Fryslân|Heerlen|Meierijstad|Lelystad|Hoorn|Gouda|Voorne\saan\sZee|"
    r"Roosendaal|Barneveld|Almelo|Vlaardingen|Schiedam|Assen"
    r")(?!\w)": "Gemeentes",
    r"(?<!\w)(?:"
    r"popo|pikieuw|osso|doezoe|salaris|nippen|spa|kinder\sSpa|N26|sexjob"
    r")(?!\w)": "Straattaal",
    r"(?<!\w)(?:"
    r"broenoe|Peru\sdingen|Colo|300\stest|ice|"
    r"neustest|A-olie|boli"
    r")(?!\w)": "Drugs",
    # The address families are alternatives of one group, so they must be
    # joined with "|"; concatenating them would demand both addresses at once.
    r"(?<!\w)(?:"
    r"A[1-9A-HJ-NP-Za-km-z]{33}|"  # Neo & Gas
    r"1[1-9A-HJ-NP-Za-km-z]{25,34}|3[1-9A-HJ-NP-Za-km-z]{25,34}|bc1[q0-9a-z]{11,87}"  # Bitcoin
    r")(?!\w)": "Cryptocurrency",
    r"(?<!\w)(?:"
    r"katvanger|mule|money\smule|ronselaar|kernlid|kernleden"
    r")(?!\w)": "Jargon",
    r"(?<!\w)(?:"
    r"zionisten|vernietigen|antisemitisme|genocide|Moslimhater|"
    r"xenofobe|IslamIsTheProblem"
    r")(?!\w)": "art. 131.1 Sr",
    r"(?<!\w)(?:"
    r"crimeware|cyber\s*security|darkweb|infosec"
    r")(?!\w)": "Cybersecurity en Crimeware",
}
# Regex exclusion patterns, laid out like RAW_PATTERNS_WITH_CATEGORIES
# ("pattern -> label"). A post whose text matches any of them is skipped
# before category matching; an empty dict excludes nothing.
EXCLUSION_RAW_PATTERNS_WITH_CATEGORIES: Dict[str, str] = {}


def _compile_patterns(raw: Dict[str, str]) -> List[Tuple[Pattern[str], str]]:
    """Compile each ``pattern -> label`` entry case-insensitively, preserving dict order."""
    return [(re.compile(source, re.IGNORECASE), label) for source, label in raw.items()]


PATTERNS: List[Tuple[Pattern[str], str]] = _compile_patterns(RAW_PATTERNS_WITH_CATEGORIES)
EXCLUSION_PATTERNS: List[Tuple[Pattern[str], str]] = _compile_patterns(
    EXCLUSION_RAW_PATTERNS_WITH_CATEGORIES
)
# === WEBSOCKET CLIENT ===
class WebSocketClient:
def __init__(self, urls: List[str], message_queue: asyncio.Queue) -> None:
self.urls = urls
self.message_queue = message_queue
self.current_url_index = 0
self.running = True
async def run(self) -> None:
retry_delay = 1
while self.running:
current_url = self.urls[self.current_url_index]
try:
await self.connect_and_listen(current_url)
retry_delay = 1
except (ConnectionClosedError, ConnectionError, OSError) as exc:
logging.warning(f"Connection error with {current_url}: {exc}")
except InvalidStatus as exc:
logging.error(f"Invalid status for {current_url}: {exc}")
except Exception as exc:
logging.error(f"Unhandled exception with {current_url}: {exc}", exc_info=True)
finally:
self.current_url_index = (self.current_url_index + 1) % len(self.urls)
delay = retry_delay + random.uniform(0, 1)
logging.info(f"Retrying with {self.urls[self.current_url_index]} after {delay:.2f} seconds...")
await asyncio.sleep(delay)
retry_delay = min(retry_delay * 2, 60)
async def connect_and_listen(self, url: str) -> None:
logging.info(f"Connecting to WebSocket at {url}...")
try:
async with websockets.connect(url) as websocket:
logging.info(f"Connected to WebSocket at {url}.")
async for message in websocket:
await self.message_queue.put(message)
except asyncio.TimeoutError:
logging.error(f"Connection attempt to {url} timed out.")
except Exception as exc:
logging.error(f"Unhandled exception with {url}: {exc}", exc_info=True)
async def stop(self) -> None:
self.running = False
# === MESSAGE PROCESSOR ===
class MessageProcessor:
    """Decode Jetstream messages from a queue, filter them and log the matches.

    A post is reported when its author DID is not blocklisted, its text is
    non-empty and hits no exclusion pattern, one of its languages passes
    ``language_filter``, and (if any patterns are configured) at least one
    category pattern matches.
    """

    def __init__(
        self,
        message_queue: asyncio.Queue,
        patterns: List[Tuple[Pattern[str], str]],
        exclusions: List[Tuple[Pattern[str], str]],
        did_blocklist: Set[str],
        language_filter: Optional[List[str]] = None,
        fetch_username: bool = False,
        session: Optional["aiohttp.ClientSession"] = None,
    ) -> None:
        """Store the filter configuration.

        Args:
            message_queue: queue of raw JSON messages to consume.
            patterns: ``(compiled regex, category)`` pairs, tried in order.
            exclusions: ``(compiled regex, label)`` pairs; any hit drops the post.
            did_blocklist: author DIDs whose posts are always ignored.
            language_filter: allowed language codes (``"unknown"`` matches posts
                without ``langs``); empty or None allows every language.
            fetch_username: look up the author's username via
                :meth:`fetch_username_func` for each reported post.
            session: HTTP session used for username lookups. If omitted, one
                is created here and closed by :meth:`stop`; a session supplied
                by the caller is left for the caller to close.
        """
        self.message_queue = message_queue
        self.patterns = patterns
        self.exclusions = exclusions
        self.did_blocklist = did_blocklist
        self.language_filter = language_filter or []
        self.fetch_username = fetch_username
        # NOTE(review): held for all of handle_message, so messages are processed
        # one at a time even when several workers run; kept to preserve ordering.
        self.lock = asyncio.Lock()
        # stop() must only close a session this instance created itself.
        self._owns_session = session is None
        self.session = session if session is not None else aiohttp.ClientSession()
        self.running = True
        logging.info(
            f"Listening for messages, matching against {len(self.patterns)} patterns..."
        )

    async def worker(self) -> None:
        """Consume queued messages until :meth:`stop` is called or the task is cancelled.

        An error while handling one message is logged and does not end the
        worker. The 1-second ``get`` timeout lets the loop re-check
        ``self.running`` while the queue is idle. Cancellation is logged and
        re-raised so the awaiting code sees the task as cancelled.
        """
        try:
            while self.running:
                try:
                    message = await asyncio.wait_for(self.message_queue.get(), timeout=1.0)
                except asyncio.TimeoutError:
                    continue
                try:
                    await self.handle_message(message)
                except Exception as exc:
                    logging.error(f"Error handling message: {exc}", exc_info=True)
                finally:
                    self.message_queue.task_done()
        except asyncio.CancelledError:
            logging.info("Worker task cancelled. Exiting...")
            raise

    async def handle_message(self, message: str) -> None:
        """Decode one raw message and pass it to :meth:`process_message`.

        Undecodable input and JSON values that are not objects are logged
        and skipped.
        """
        async with self.lock:
            try:
                data = json.loads(message)
            except (json.JSONDecodeError, UnicodeDecodeError) as exc:
                logging.error(f"Failed to decode JSON message: {exc}")
                return
            if not isinstance(data, dict):
                logging.warning(f"Ignoring non-object JSON message of type {type(data).__name__}")
                return
            await self.process_message(data)

    @staticmethod
    def _as_dict(value: Any) -> Dict[str, Any]:
        """Return ``value`` if it is a dict, else an empty dict (tolerates null or odd fields)."""
        return value if isinstance(value, dict) else {}

    async def process_message(self, data: Dict[str, Any]) -> None:
        """Apply every filter to one decoded event and log it if it survives.

        Reads ``did``, ``time_us``, ``commit.rkey`` and ``commit.record.text`` /
        ``commit.record.langs``. Missing or null sub-objects count as empty, so
        events without a record text are skipped.
        """
        commit = self._as_dict(data.get("commit"))
        record = self._as_dict(commit.get("record"))
        text = record.get("text", "")
        did = data.get("did", "unknown")
        langs = record.get("langs", [])
        rkey = commit.get("rkey", "unknown")
        if did in self.did_blocklist or not text or self.contains_exclusion(text):
            return
        if not self.is_language_allowed(langs):
            return
        matched_category = self.match_patterns(text)
        # With no patterns configured every remaining post is reported.
        if self.patterns and not matched_category:
            return
        timestamp = self.get_timestamp(data.get("time_us"))
        lang_str = ", ".join(langs) if langs else "unknown"
        profile_url = f"https://bsky.app/profile/{did}/post/{rkey}"
        username = await self.fetch_username_func(did) if self.fetch_username else None
        user_display = username or did
        logging.info(
            f"{timestamp.isoformat()} - {user_display}: {text} "
            f"[Matched: {matched_category or 'None'} - Language: {lang_str} - {profile_url}]\n\n"
        )

    def is_language_allowed(self, langs: List[str]) -> bool:
        """Return True if the filter is empty or any of ``langs`` is allowed.

        A post without languages is checked as ``["unknown"]``.
        """
        if not self.language_filter:
            return True
        return any(lang in self.language_filter for lang in langs or ["unknown"])

    def match_patterns(self, text: str) -> Optional[str]:
        """Return the category of the first pattern found in ``text``, or None."""
        for pattern, category in self.patterns:
            if pattern.search(text):
                return category
        return None

    def contains_exclusion(self, text: str) -> bool:
        """Return True if any exclusion pattern is found in ``text``."""
        for pattern, _ in self.exclusions:
            if pattern.search(text):
                return True
        return False

    @staticmethod
    def get_timestamp(time_us: Optional[int]) -> datetime:
        """Convert a microsecond epoch value to an aware UTC datetime.

        Falls back to the current UTC time when ``time_us`` is missing/zero or
        cannot be converted (wrong type, or out of the platform's range, which
        ``fromtimestamp`` reports as ValueError, OverflowError or OSError).
        """
        try:
            if time_us:
                return datetime.fromtimestamp(time_us / 1_000_000, tz=timezone.utc)
            return datetime.now(tz=timezone.utc)
        except (ValueError, TypeError, OverflowError, OSError) as exc:
            logging.warning(f"Invalid timestamp format: {time_us} ({exc})")
            return datetime.now(tz=timezone.utc)

    async def fetch_username_func(self, did: str) -> Optional[str]:
        """Scrape the ``profile:username`` meta tag from the author's bsky.app profile page.

        Returns None on a non-200 response, a missing tag, or a request error
        (which is logged).
        """
        profile_url = f"https://bsky.app/profile/{did}"
        try:
            async with self.session.get(profile_url) as response:
                if response.status == 200:
                    html = await response.text()
                    match = re.search(r'<meta property="profile:username" content="([^"]+)"', html)
                    if match:
                        return match.group(1)
        except Exception as exc:
            logging.error(f"Error fetching username for DID {did}: {exc}")
        return None

    async def stop(self) -> None:
        """Signal workers to exit; close the HTTP session only if this instance created it."""
        self.running = False
        if self._owns_session:
            await self.session.close()
# === SHUTDOWN FUNCTION ===
async def shutdown(client: WebSocketClient, processor: MessageProcessor, tasks: List[asyncio.Task]) -> None:
    """Stop the client, cancel and await ``tasks``, then stop the processor.

    The processor (whose ``stop`` may close its HTTP session) is stopped only
    after the tasks have finished, so no worker can still be using the session
    when it is closed. Each task's outcome, including its CancelledError, is
    collected by ``gather(return_exceptions=True)`` instead of being raised here.
    """
    logging.info("Shutting down gracefully...")
    await client.stop()
    for task in tasks:
        task.cancel()
    try:
        await asyncio.gather(*tasks, return_exceptions=True)
        logging.info("Tasks successfully cancelled.")
    finally:
        # Runs even if shutdown itself is cancelled while waiting above.
        await processor.stop()
    logging.info("Shutdown complete.")
# === MAIN FUNCTION ===
async def main() -> None:
    """Run one WebSocket producer and ten queue workers until cancelled.

    Logging is configured at INFO level, all tasks share one bounded queue and
    one aiohttp session, and ``shutdown`` is always awaited on the way out.
    """
    log_format = "%(asctime)s - %(levelname)s - %(message)s"
    logging.basicConfig(level=logging.INFO, format=log_format)

    queue = asyncio.Queue(maxsize=100_000)
    ws_client = WebSocketClient(urls=EXTERNAL_WS_URLS, message_queue=queue)

    async with aiohttp.ClientSession() as http_session:
        processor = MessageProcessor(
            message_queue=queue,
            patterns=PATTERNS,
            exclusions=EXCLUSION_PATTERNS,
            did_blocklist=DID_BLOCKLIST,
            language_filter=LANGUAGE_FILTER,
            fetch_username=ENABLE_FETCH_USERNAME,
            session=http_session,
        )
        # Producer first, then the consumers; shutdown() gets the same list.
        tasks = [asyncio.create_task(ws_client.run())]
        tasks.extend(asyncio.create_task(processor.worker()) for _ in range(10))
        try:
            await asyncio.gather(*tasks)
        except asyncio.CancelledError:
            logging.info("Main tasks cancelled.")
        finally:
            await shutdown(ws_client, processor, tasks)


if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment