Skip to content

Instantly share code, notes, and snippets.

@RETr4ce
Created November 26, 2024 11:38
Show Gist options
  • Select an option

  • Save RETr4ce/853e0d14e19622b4f3c1c912ddc87ea3 to your computer and use it in GitHub Desktop.

Select an option

Save RETr4ce/853e0d14e19622b4f3c1c912ddc87ea3 to your computer and use it in GitHub Desktop.
import asyncio
import json
import logging
import os
from argparse import ArgumentParser, Namespace
from getpass import getpass
from pathlib import Path
from time import time
from typing import List, Optional

import aiohttp
from aiohttp import ClientSession
from dotenv import load_dotenv
# Load environment variables
# load_dotenv(dotenv_path="path/to/.env")
# BSKY_IDENTIFIER=your-username
# BSKY_PASSWORD=your-password
# Logging setup: mirror everything INFO+ to the console and to a UTF-8 log file.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("bsky_client.log", encoding="utf-8"),
    ],
)
logger = logging.getLogger(__name__)

# Request budget per sliding window (BlueSky API limits).
RATE_LIMIT = {"per_second": 50, "per_hour": 1500, "per_day": 10000}
class RateLimiter:
    """
    Sliding-window rate limiter that enforces every window defined in RATE_LIMIT.
    """

    # Window lengths in seconds, keyed identically to RATE_LIMIT.
    _DURATIONS = {"per_second": 1, "per_hour": 3600, "per_day": 86400}

    def __init__(self) -> None:
        # One timestamp list per window; all access is guarded by a single lock.
        self.request_times = {key: [] for key in RATE_LIMIT}
        self.lock = asyncio.Lock()

    async def is_allowed(self) -> bool:
        """Return True (and record the request) if every window has capacity."""
        async with self.lock:
            now = time()
            self._cleanup_requests(now)
            has_capacity = all(
                len(times) < RATE_LIMIT[key]
                for key, times in self.request_times.items()
            )
            if has_capacity:
                self._log_request(now)
                return True
            logger.debug("Rate limit exceeded.")
            return False

    def _cleanup_requests(self, current_time: float) -> None:
        """Drop timestamps that have aged out of each window."""
        for key, duration in self._DURATIONS.items():
            cutoff = current_time - duration
            self.request_times[key] = [
                t for t in self.request_times[key] if t > cutoff
            ]

    def _log_request(self, current_time: float) -> None:
        """Record the current request timestamp in every window."""
        for times in self.request_times.values():
            times.append(current_time)
class BlueSkyClient:
    """
    Client for interacting with the BlueSky API.
    """

    BASE_URL = "https://bsky.social/xrpc"
    # Give up on a page after this many consecutive failed attempts so a
    # persistent error (e.g. 401/404) cannot spin the fetch loop forever.
    MAX_PAGE_RETRIES = 3

    def __init__(self, auth_token: str) -> None:
        # auth_token: access JWT from Authenticator.authenticate().
        self.auth_token = auth_token
        self.headers = {
            "Authorization": f"Bearer {auth_token}",
            "Content-Type": "application/json",
        }
        self.rate_limiter = RateLimiter()

    async def fetch_timeline(self, actor: str, session: ClientSession) -> List[dict]:
        """
        Fetch all pages of an actor's author feed, honoring the rate limiter.

        Returns the accumulated list of post dicts. The list may be partial
        if an upstream failure or repeated request errors interrupt paging.
        """
        fetched_posts, cursor = [], None
        page_count = 0
        failures = 0  # consecutive failed attempts for the current page
        while True:
            if not await self.rate_limiter.is_allowed():
                logger.info("Rate limit reached. Waiting for 1 second...")
                await asyncio.sleep(1)
                continue
            params = {"actor": actor, "limit": 100}
            if cursor:
                params["cursor"] = cursor
            logger.info("Fetching page %d for actor: %s. Current cursor: %s", page_count + 1, actor, cursor or "None")
            try:
                async with session.get(
                    f"{self.BASE_URL}/app.bsky.feed.getAuthorFeed",
                    headers=self.headers,
                    params=params,
                ) as response:
                    if response.status == 200:
                        failures = 0
                        data = await response.json()
                        posts = data.get("feed", [])
                        fetched_posts.extend(posts)
                        cursor = data.get("cursor")
                        page_count += 1
                        logger.info(
                            "Fetched %d posts from page %d for actor: %s. Next cursor: %s",
                            len(posts), page_count, actor, cursor or "None"
                        )
                        # A missing cursor means the final page was reached.
                        if not cursor:
                            logger.info("Completed fetching for actor: %s. Total posts: %d", actor, len(fetched_posts))
                            break
                    else:
                        error_details = await response.text()
                        logger.error(
                            "Failed to fetch page %d for actor: %s. HTTP Status: %d. Details: %s",
                            page_count + 1, actor, response.status, error_details
                        )
                        if '"UpstreamFailure"' in error_details:
                            logger.warning("Upstream failure for actor: %s. Continuing with fetched data.", actor)
                            break
                        # BUGFIX: previously any other non-200 response fell
                        # through and the loop retried the same page forever.
                        failures += 1
                        if failures >= self.MAX_PAGE_RETRIES:
                            logger.error(
                                "Giving up on page %d for actor: %s after %d failed attempts.",
                                page_count + 1, actor, failures
                            )
                            break
            except Exception as e:
                logger.error("Error fetching page %d for actor: %s. Details: %s", page_count + 1, actor, e)
                break
            # Pace requests to stay under the per-second budget.
            await asyncio.sleep(1 / RATE_LIMIT["per_second"])
        return fetched_posts

    async def fetch_and_save_timeline(self, actor: str, output_dir: Path, session: ClientSession) -> None:
        """Fetch an actor's timeline and write it to <output_dir>/<actor>.json."""
        try:
            posts = await self.fetch_timeline(actor, session)
            output_path = output_dir / f"{actor}.json"
            output_path.write_text(json.dumps(posts, indent=4), encoding="utf-8")
            logger.info("Timeline for %s saved to %s", actor, output_path)
        except Exception as e:
            logger.error("Error saving timeline for %s: %s", actor, e)

    async def fetch_multiple_timelines(self, actors: List[str], output_dir: Path) -> None:
        """Fetch and save all actors concurrently over one shared session."""
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch_and_save_timeline(actor, output_dir, session) for actor in actors]
            await asyncio.gather(*tasks)
class Authenticator:
    """
    Handles authentication with the BlueSky API.
    """

    @staticmethod
    async def authenticate() -> Optional[str]:
        """
        Resolve credentials (env vars, falling back to interactive prompts)
        and exchange them for an access JWT. Returns None on any failure.
        """
        identifier = os.getenv("BSKY_IDENTIFIER") or input("Enter username: ").strip()
        password = os.getenv("BSKY_PASSWORD") or getpass("Enter password: ").strip()
        if not (identifier and password):
            logger.error("Missing credentials.")
            return None

        auth_url = "https://bsky.social/xrpc/com.atproto.server.createSession"
        payload = {"identifier": identifier, "password": password}
        async with aiohttp.ClientSession() as session:
            try:
                async with session.post(auth_url, json=payload) as response:
                    if response.status != 200:
                        logger.error("Authentication failed: %s", await response.text())
                        return None
                    data = await response.json()
                    logger.info("Authenticated successfully.")
                    return data.get("accessJwt")
            except Exception as e:
                logger.error("Error during authentication: %s", e)
        return None
async def parse_arguments() -> Namespace:
    """
    Build the CLI parser and parse sys.argv.

    Returns the parsed argparse Namespace with attributes `list` (Path or
    None), `actor` (str or None; exactly one of the two is set), and
    `output` (Path, default ./timelines). Kept async so callers can await
    it uniformly with the rest of the coroutine-based API.
    """
    # Return annotation fixed: parse_args() yields a Namespace, not the parser.
    parser = ArgumentParser(description="Fetch timelines from the BlueSky API.")
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--list", type=Path, help="File containing a list of actor handles or DIDs. Per newline")
    group.add_argument("--actor", type=str, help="Single actor handle or DID.")
    parser.add_argument("--output", type=Path, default=Path("./timelines"), help="Directory to save timelines. for specific use cases")
    return parser.parse_args()
async def main() -> None:
    """
    Entry point: parse CLI args, authenticate, then fetch and save timelines.
    """
    args = await parse_arguments()
    output_dir = args.output.resolve()
    output_dir.mkdir(parents=True, exist_ok=True)

    auth_token = await Authenticator.authenticate()
    if not auth_token:
        # exit() is a site-module convenience; raise SystemExit explicitly so
        # the non-zero status survives `python -S` and embedded interpreters.
        raise SystemExit(1)

    client = BlueSkyClient(auth_token)
    if args.list:
        # Keep the try narrow: only the file read should be reported as a
        # list-reading error (fetch errors are logged by the client itself).
        try:
            raw = args.list.read_text()
        except Exception as e:
            logger.error("Error reading actor list: %s", e)
            return
        actors = [line.strip() for line in raw.splitlines() if line.strip()]
        await client.fetch_multiple_timelines(actors, output_dir)
    else:
        async with aiohttp.ClientSession() as session:
            await client.fetch_and_save_timeline(args.actor.strip(), output_dir, session)
# Script entry point: run the async main loop and log (rather than traceback)
# on interrupt or any unhandled failure.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Program interrupted.")
    except Exception as e:
        logger.critical("Fatal error: %s", e)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment