Created
November 26, 2024 11:38
-
-
Save RETr4ce/853e0d14e19622b4f3c1c912ddc87ea3 to your computer and use it in GitHub Desktop.
https://docs.bsky.app/docs/api/app-bsky-feed-get-author-feed / https://docs.bsky.app/docs/advanced-guides/rate-limits
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import json
import logging
import os
from argparse import ArgumentParser, Namespace
from collections import deque
from getpass import getpass
from pathlib import Path
from time import monotonic, time
from typing import List, Optional

import aiohttp
from aiohttp import ClientSession
from dotenv import load_dotenv
# Credentials: authentication reads BSKY_IDENTIFIER and BSKY_PASSWORD from the
# environment and prompts for whichever one is missing. To load them from a
# .env file instead, uncomment the line below and point it at a file with:
#   BSKY_IDENTIFIER=your-username
#   BSKY_PASSWORD=your-password
# load_dotenv(dotenv_path="path/to/.env")

# Send INFO-and-above records both to the console and to bsky_client.log
# (UTF-8, opened relative to the current working directory).
logging.basicConfig(
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("bsky_client.log", encoding="utf-8"),
    ],
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Client-side request budget: maximum requests allowed per named window.
# The length of each window is applied by RateLimiter.
RATE_LIMIT = dict(per_second=50, per_hour=1500, per_day=10000)
class RateLimiter:
    """
    Sliding-window rate limiter shared by concurrent callers.

    A request is allowed only when, in every configured window, fewer
    requests than that window's limit have been allowed within the window's
    duration. Only allowed requests are recorded.
    """

    # Duration in seconds of each window name that may appear in the limits.
    WINDOW_SECONDS = {"per_second": 1, "per_hour": 3600, "per_day": 86400}

    def __init__(self, limits: Optional[dict] = None) -> None:
        """
        Args:
            limits: Maximum number of requests per window, keyed by a name
                from WINDOW_SECONDS. Defaults to the module-level RATE_LIMIT.

        Raises:
            ValueError: If *limits* names a window with no known duration.
        """
        self.limits = dict(RATE_LIMIT if limits is None else limits)
        unknown = sorted(set(self.limits) - set(self.WINDOW_SECONDS))
        if unknown:
            raise ValueError(f"Unknown rate-limit window(s): {unknown}")
        # Timestamps are appended under the lock from a monotonic clock, so
        # each deque is sorted oldest-first and expiry is just popleft().
        self.request_times = {key: deque() for key in self.limits}
        self.lock = asyncio.Lock()

    async def is_allowed(self) -> bool:
        """
        Record and allow one request if every window still has capacity.

        Returns:
            True if the request was recorded, False if any window is full.
        """
        async with self.lock:
            # monotonic() is unaffected by system clock updates (NTP, manual
            # changes), which could otherwise shrink or stretch the windows.
            now = monotonic()
            self._cleanup_requests(now)
            if all(
                len(self.request_times[key]) < limit
                for key, limit in self.limits.items()
            ):
                self._log_request(now)
                return True
            logger.debug("Rate limit exceeded.")
            return False

    def _cleanup_requests(self, current_time: float) -> None:
        """Drop timestamps that are at least one window length old."""
        for key, times in self.request_times.items():
            cutoff = current_time - self.WINDOW_SECONDS[key]
            while times and times[0] <= cutoff:
                times.popleft()

    def _log_request(self, current_time: float) -> None:
        """Record one allowed request in every window."""
        for times in self.request_times.values():
            times.append(current_time)
class BlueSkyClient:
    """
    Client for interacting with the BlueSky API.

    Every request made through one instance goes through the same
    RateLimiter, so concurrent timeline fetches share one request budget.
    """

    BASE_URL = "https://bsky.social/xrpc"
    # Maximum retries of a single page after a 429 or 5xx response before
    # giving up on the actor and keeping the posts fetched so far.
    MAX_RETRIES = 5
    # Upper bound, in seconds, for the exponential backoff between retries.
    MAX_BACKOFF = 60.0

    def __init__(self, auth_token: str) -> None:
        """
        Args:
            auth_token: Access JWT sent as a Bearer token on every request.
        """
        self.auth_token = auth_token
        self.headers = {
            "Authorization": f"Bearer {auth_token}",
            "Content-Type": "application/json",
        }
        self.rate_limiter = RateLimiter()

    @staticmethod
    def _is_retryable(status: int) -> bool:
        """Return True for statuses worth retrying: 429 and any 5xx."""
        return status == 429 or status >= 500

    @classmethod
    def _retry_delay(cls, response: aiohttp.ClientResponse, attempt: int) -> float:
        """
        Return how many seconds to wait before retry number *attempt*.

        Uses the Retry-After header (seconds) or the ratelimit-reset header
        (Unix epoch seconds) when present; otherwise exponential backoff
        capped at MAX_BACKOFF. Never returns less than 1 second when a
        header is used.
        """
        retry_after = response.headers.get("Retry-After")
        if retry_after is not None:
            try:
                return max(float(retry_after), 1.0)
            except ValueError:
                pass  # HTTP-date form is not parsed; try the next source.
        reset_at = response.headers.get("ratelimit-reset")
        if reset_at is not None:
            try:
                return max(float(reset_at) - time(), 1.0)
            except ValueError:
                pass  # Unparseable header; fall back to backoff.
        return min(2.0 ** attempt, cls.MAX_BACKOFF)

    @staticmethod
    def _safe_filename(actor: str) -> str:
        """
        Turn an actor handle or DID into a file-name stem.

        Every character other than a letter, digit, '-', '.' or '_' is
        replaced with '_'. This removes path separators, so the name cannot
        point outside the output directory, and removes ':' (present in
        DIDs), which Windows does not allow in file names.
        """
        return "".join(ch if ch.isalnum() or ch in "-._" else "_" for ch in actor)

    async def fetch_timeline(self, actor: str, session: ClientSession) -> List[dict]:
        """
        Fetch every page of *actor*'s feed via app.bsky.feed.getAuthorFeed.

        Pages of up to 100 items are requested, following the returned
        cursor until none is returned (or it stops advancing). Responses with
        status 429 or 5xx are retried up to MAX_RETRIES times per page. Any
        other failure ends pagination for this actor, and the items gathered
        so far are returned.

        Args:
            actor: Handle or DID of the account.
            session: Open aiohttp session used for the requests.

        Returns:
            The feed items in the order the API returned them.
        """
        fetched_posts: List[dict] = []
        cursor: Optional[str] = None
        page_count = 0
        retries = 0
        while True:
            if not await self.rate_limiter.is_allowed():
                logger.info("Rate limit reached. Waiting for 1 second...")
                await asyncio.sleep(1)
                continue
            params = {"actor": actor, "limit": 100}
            if cursor:
                params["cursor"] = cursor
            logger.info("Fetching page %d for actor: %s. Current cursor: %s", page_count + 1, actor, cursor or "None")
            retry_delay: Optional[float] = None
            try:
                async with session.get(
                    f"{self.BASE_URL}/app.bsky.feed.getAuthorFeed",
                    headers=self.headers,
                    params=params,
                ) as response:
                    if response.status == 200:
                        data = await response.json()
                        posts = data.get("feed", [])
                        fetched_posts.extend(posts)
                        previous_cursor, cursor = cursor, data.get("cursor")
                        page_count += 1
                        retries = 0
                        logger.info(
                            "Fetched %d posts from page %d for actor: %s. Next cursor: %s",
                            len(posts), page_count, actor, cursor or "None",
                        )
                        if not cursor:
                            logger.info("Completed fetching for actor: %s. Total posts: %d", actor, len(fetched_posts))
                            break
                        if cursor == previous_cursor:
                            # Re-sending the same cursor would return the same page forever.
                            logger.warning("Cursor did not advance for actor: %s. Stopping.", actor)
                            break
                    else:
                        error_details = await response.text()
                        logger.error(
                            "Failed to fetch page %d for actor: %s. HTTP Status: %d. Details: %s",
                            page_count + 1, actor, response.status, error_details,
                        )
                        if '"UpstreamFailure"' in error_details:
                            logger.warning("Upstream failure for actor: %s. Continuing with fetched data.", actor)
                            break
                        # Without this exit, a permanent error (expired token,
                        # unknown actor) would re-request the same page forever.
                        if not self._is_retryable(response.status) or retries >= self.MAX_RETRIES:
                            logger.warning(
                                "Giving up on actor: %s after HTTP %d. Continuing with fetched data.",
                                actor, response.status,
                            )
                            break
                        retries += 1
                        retry_delay = self._retry_delay(response, retries)
            except (aiohttp.ClientError, asyncio.TimeoutError, ValueError) as e:
                # ValueError covers a response body that is not valid JSON.
                logger.error("Error fetching page %d for actor: %s. Details: %s", page_count + 1, actor, e)
                break
            if retry_delay is not None:
                # Sleep after the response is released so the connection is freed.
                logger.warning(
                    "Retrying page %d for actor: %s in %.1f seconds (attempt %d/%d).",
                    page_count + 1, actor, retry_delay, retries, self.MAX_RETRIES,
                )
                await asyncio.sleep(retry_delay)
                continue
            await asyncio.sleep(1 / RATE_LIMIT["per_second"])
        return fetched_posts

    async def fetch_and_save_timeline(self, actor: str, output_dir: Path, session: ClientSession) -> None:
        """
        Fetch *actor*'s feed and write it as indented JSON into *output_dir*.

        The file is named after the actor with unsafe characters replaced
        (see _safe_filename). Errors are logged rather than raised, so one
        failing actor does not abort the other tasks gathered by
        fetch_multiple_timelines.
        """
        try:
            posts = await self.fetch_timeline(actor, session)
            output_path = output_dir / f"{self._safe_filename(actor)}.json"
            output_path.write_text(json.dumps(posts, indent=4), encoding="utf-8")
            logger.info("Timeline for %s saved to %s", actor, output_path)
        except Exception as e:
            logger.error("Error saving timeline for %s: %s", actor, e)

    async def fetch_multiple_timelines(self, actors: List[str], output_dir: Path) -> None:
        """
        Fetch and save several actors' timelines concurrently over one session.

        Duplicate entries are fetched only once; otherwise two tasks would
        write the same output file at the same time.
        """
        unique_actors = list(dict.fromkeys(actors))
        async with aiohttp.ClientSession() as session:
            await asyncio.gather(
                *(self.fetch_and_save_timeline(actor, output_dir, session) for actor in unique_actors)
            )
class Authenticator:
    """
    Handles authentication with the BlueSky API.
    """

    @staticmethod
    async def authenticate() -> Optional[str]:
        """
        Create a session via com.atproto.server.createSession and return its access JWT.

        Credentials come from BSKY_IDENTIFIER / BSKY_PASSWORD, or are prompted
        for when those variables are unset or empty. A typed username is
        stripped of surrounding whitespace. The password is used verbatim,
        which is also how BSKY_PASSWORD is used.

        Returns:
            The "accessJwt" string, or None if credentials are missing, the
            request fails, or the response carries no token.
        """
        identifier = os.getenv("BSKY_IDENTIFIER") or input("Enter username: ").strip()
        # Passwords may legitimately start or end with spaces, so the prompted
        # value is not stripped.
        password = os.getenv("BSKY_PASSWORD") or getpass("Enter password: ")
        if not identifier or not password:
            logger.error("Missing credentials.")
            return None
        auth_url = "https://bsky.social/xrpc/com.atproto.server.createSession"
        async with aiohttp.ClientSession() as session:
            try:
                async with session.post(auth_url, json={"identifier": identifier, "password": password}) as response:
                    if response.status != 200:
                        logger.error("Authentication failed: %s", await response.text())
                        return None
                    data = await response.json()
            except (aiohttp.ClientError, asyncio.TimeoutError, ValueError) as e:
                # ValueError covers a 200 response whose body is not valid JSON.
                logger.error("Error during authentication: %s", e)
                return None
        token = data.get("accessJwt")
        if not token:
            logger.error("Authentication response did not contain an access token.")
            return None
        logger.info("Authenticated successfully.")
        return token
async def parse_arguments(argv: Optional[List[str]] = None) -> Namespace:
    """
    Parse the command-line options; exactly one of --list or --actor is required.

    Declared async only because main() awaits it; it does no asynchronous work.

    Args:
        argv: Arguments to parse. When None, argparse uses sys.argv[1:].

    Returns:
        Namespace with ``list`` (Path or None), ``actor`` (str or None) and
        ``output`` (Path) attributes.
    """
    parser = ArgumentParser(description="Fetch timelines from the BlueSky API.")
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--list", type=Path, help="File containing actor handles or DIDs, one per line.")
    group.add_argument("--actor", type=str, help="Single actor handle or DID.")
    parser.add_argument(
        "--output",
        type=Path,
        default=Path("./timelines"),
        help="Directory to save timelines (default: ./timelines).",
    )
    return parser.parse_args(argv)
async def main() -> None:
    """
    Fetch the timelines requested on the command line.

    Parses arguments, creates the output directory, reads and validates the
    actor list (or single actor), authenticates, then fetches and saves.
    Input is validated before authenticating, so a bad --list/--actor is
    reported before the user is prompted for credentials.

    Raises:
        SystemExit: With status 1 when authentication fails.
    """
    args = await parse_arguments()
    output_dir = args.output.resolve()
    output_dir.mkdir(parents=True, exist_ok=True)

    if args.list:
        try:
            # utf-8-sig also accepts files saved with a BOM (e.g. by Notepad);
            # strip() does not remove a BOM, so it would stick to the first handle.
            lines = args.list.read_text(encoding="utf-8-sig").splitlines()
        except (OSError, UnicodeDecodeError) as e:
            logger.error("Error reading actor list: %s", e)
            return
        actors = [line.strip() for line in lines if line.strip()]
        if not actors:
            logger.error("No actors found in %s.", args.list)
            return
    else:
        actors = [args.actor.strip()]
        if not actors[0]:
            logger.error("Actor must not be empty.")
            return

    auth_token = await Authenticator.authenticate()
    if not auth_token:
        # SystemExit directly: the builtin exit() is added by the site module
        # and is not guaranteed to exist (e.g. under `python -S`).
        raise SystemExit(1)
    client = BlueSkyClient(auth_token)

    if args.list:
        await client.fetch_multiple_timelines(actors, output_dir)
    else:
        async with aiohttp.ClientSession() as session:
            await client.fetch_and_save_timeline(actors[0], output_dir, session)
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Program interrupted.")
        # 130 = 128 + SIGINT, the conventional exit status after Ctrl-C.
        raise SystemExit(130)
    except Exception as e:
        # Keep the traceback in the log and exit non-zero so shells and
        # schedulers see the failure instead of a successful exit.
        logger.critical("Fatal error: %s", e, exc_info=True)
        raise SystemExit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment