Skip to content

Instantly share code, notes, and snippets.

@mulderp
Created November 1, 2025 22:02
Show Gist options
  • Select an option

  • Save mulderp/8c1f892aa41c32f3d4dbd79c87e508f6 to your computer and use it in GitHub Desktop.

Select an option

Save mulderp/8c1f892aa41c32f3d4dbd79c87e508f6 to your computer and use it in GitHub Desktop.
pgn download
#!/usr/bin/env python3
import argparse
import hashlib
import io
import os
import sys
import time
from collections import Counter, deque
from urllib.parse import quote

import chess.pgn
import requests
class ChessComAPI:
"""Handles all interactions with the Chess.com API."""
def __init__(self, user_agent: str):
self.headers = {'User-Agent': user_agent}
def fetch_games(self, username: str, year: int, month: int, for_recursive: bool = False) -> str | None:
"""Fetches games for a given player and month from Chess.com."""
url = f"https://api.chess.com/pub/player/{username}/games/{year}/{month:02d}/pgn"
time.sleep(1.2)
print(f"Attempting to fetch games from: {url}")
try:
response = requests.get(url, headers=self.headers, timeout=30)
response.raise_for_status()
return response.text
except requests.exceptions.HTTPError as err:
if err.response.status_code == 404:
print(f"Info: No games found for {username} in {year}-{month:02d} (HTTP 404).")
return None
elif err.response.status_code == 403:
print(f"CRITICAL Error: Access forbidden for {username} (HTTP 403). User-Agent or IP block likely. Script will exit.")
sys.exit(1)
elif err.response.status_code == 410:
print(f"Info: Resource gone for {username} (HTTP 410). Data no longer available.")
return None
elif err.response.status_code == 429:
print(f"CRITICAL Error: Rate limit hit while fetching for {username} (HTTP 429). Please wait significantly. Script will exit.")
sys.exit(1)
else:
print(f"Error: HTTP error fetching games for {username}. Status {err.response.status_code}: {err}")
return None if for_recursive else sys.exit(1)
except requests.exceptions.RequestException as e:
print(f"Warning: Network request problem for {username}: {e}")
return None if for_recursive else sys.exit(1)
class PGNProcessor:
    """Handles PGN data processing and file operations.

    All methods are static. Files are read from and written to the ``pgns``
    directory relative to the current working directory.
    """

    # Headers that together identify one game. Besides the players, date and
    # result, the link and clock-time headers are included so that two
    # different games between the same players on the same day with the same
    # result are not mistaken for duplicates. Headers missing from a game
    # simply contribute an empty string.
    _IDENTITY_HEADERS = (
        "Link", "Event", "Site", "Date", "UTCDate", "UTCTime", "EndTime",
        "White", "Black", "Result",
    )

    @staticmethod
    def _iter_games(pgn_data: str):
        """Yield every game that ``chess.pgn.read_game`` can read from the text."""
        stream = io.StringIO(pgn_data)
        while True:
            position = stream.tell()
            try:
                game = chess.pgn.read_game(stream)
            except Exception as exc:
                # Skip the unreadable game, but stop if the parser made no
                # progress; otherwise the loop would spin forever.
                if stream.tell() == position:
                    print(f"Warning: stopped reading PGN data after an unrecoverable error: {exc}")
                    return
                print(f"Warning: skipped an unreadable game: {exc}")
                continue
            if game is None:
                return
            yield game

    @staticmethod
    def get_opponents(pgn_data: str, current_player: str) -> set[str]:
        """Extracts opponents from PGN data.

        Args:
            pgn_data: PGN text containing zero or more games.
            current_player: Name to look for, compared case-insensitively
                against the ``White`` and ``Black`` headers.

        Returns:
            The names found on the other side of ``current_player``'s games,
            spelled as they appear in the headers. Reading stops at the first
            header block that raises while being parsed.
        """
        opponents = set()
        pgn_io = io.StringIO(pgn_data)
        current_player_lower = current_player.lower()
        while True:
            try:
                game_headers = chess.pgn.read_headers(pgn_io)
            except Exception:
                break
            if game_headers is None:
                break
            white = game_headers.get("White", "UnknownPlayer")
            black = game_headers.get("Black", "UnknownPlayer")
            if white.lower() == current_player_lower and black != "UnknownPlayer":
                opponents.add(black)
            elif black.lower() == current_player_lower and white != "UnknownPlayer":
                opponents.add(white)
        return opponents

    @staticmethod
    def save_to_file(content: str, filename: str) -> bool:
        """Saves content to a file in the 'pgns' directory, ensuring the directory exists.

        Returns:
            ``True`` when the file was written, ``False`` when creating the
            directory or writing the file failed (the error is printed).
        """
        # Computed before the try block so the error message can always name
        # the path, even when creating the directory is what failed.
        filepath = os.path.join("pgns", filename)
        try:
            os.makedirs("pgns", exist_ok=True)
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(content)
        except OSError as e:
            print(f"Error saving file {filepath}: {e}")
            return False
        print(f"Games saved to {filepath}")
        return True

    @staticmethod
    def generate_unique_filename(base: str, extension: str = ".pgn") -> str:
        """Generates a unique filename in the 'pgns' directory by appending a counter if the file exists.

        Args:
            base: Desired name. A trailing ``extension`` (matched
                case-insensitively) is dropped first, so ``"a.pgn"`` and
                ``"a"`` both yield ``"a.pgn"`` rather than ``"a.pgn.pgn"``.
            extension: Suffix of the generated name.

        Returns:
            ``base + extension`` if no such file exists in ``pgns``, otherwise
            the first free ``base_<n> + extension`` with ``n`` counting from 1.
        """
        if extension and len(base) > len(extension) and base.lower().endswith(extension.lower()):
            base = base[:-len(extension)]
        candidate = f"{base}{extension}"
        counter = 1
        while os.path.exists(os.path.join("pgns", candidate)):
            candidate = f"{base}_{counter}{extension}"
            counter += 1
        return candidate

    @staticmethod
    def deduplicate_games(pgn_blocks: list[str]) -> str:
        """Deduplicates games across multiple PGN blocks.

        Args:
            pgn_blocks: PGN texts, each holding zero or more games.

        Returns:
            The first occurrence of every distinct game, exported with
            headers, variations and comments and separated by blank lines.
            Games count as the same when all identity headers match.
        """
        seen = set()
        exported = []
        for pgn_block in pgn_blocks:
            for game in PGNProcessor._iter_games(pgn_block):
                # A tuple keeps the header values separate, so different
                # games cannot produce the same key by concatenation.
                identity = tuple(
                    game.headers.get(name, "") for name in PGNProcessor._IDENTITY_HEADERS
                )
                if identity in seen:
                    continue
                seen.add(identity)
                exporter = chess.pgn.StringExporter(headers=True, variations=True, comments=True)
                exported.append(game.accept(exporter))
        return "\n\n".join(exported).strip()
class Fetcher:
    """Handles recursive fetching of games and their opponents."""

    def __init__(self, api: "ChessComAPI", processor: "PGNProcessor"):
        """Keep the API client and the PGN processor used by the crawl."""
        self.api = api
        self.processor = processor

    def fetch_recursively(self, initial_username: str, year: int, month: int, max_depth: int, output_base: str, max_api_calls: int) -> None:
        """Fetches games recursively for a player and their opponents.

        Players are visited breadth-first starting from ``initial_username``;
        each player is fetched at most once (names compared
        case-insensitively). The collected games are deduplicated and written
        to a single file.

        Args:
            initial_username: Player at depth 0.
            year: Archive year requested for every player.
            month: Archive month requested for every player.
            max_depth: Opponents are followed only from players whose depth
                is below this value.
            output_base: Prefix of the output filename.
            max_api_calls: Upper bound on the number of fetch calls.

        Raises:
            SystemExit, KeyboardInterrupt: Re-raised after the games gathered
                so far have been saved, so an abort from the API layer or the
                user does not throw away completed downloads.
        """
        processed_players = set()
        all_pgns = []
        queue = deque([(initial_username, 0)])
        players_in_queue = {initial_username.lower()}
        fetch_api_call_count = 0
        try:
            while queue and fetch_api_call_count < max_api_calls:
                current_username, current_depth = queue.popleft()
                player_key = current_username.lower()
                players_in_queue.discard(player_key)
                if player_key in processed_players or current_depth > max_depth:
                    continue
                print(f"{'--' * current_depth}> Processing: {current_username} (Depth: {current_depth})")
                fetch_api_call_count += 1
                pgn_data = self.api.fetch_games(current_username, year, month, for_recursive=True)
                processed_players.add(player_key)
                if not pgn_data:
                    continue
                all_pgns.append(pgn_data)
                if current_depth >= max_depth:
                    continue
                opponents = self.processor.get_opponents(pgn_data, current_username)
                # Sorted so that the players reached within the call budget
                # do not depend on set iteration order.
                for opponent in sorted(opponents, key=lambda name: (name.lower(), name)):
                    opponent_key = opponent.lower()
                    if opponent_key not in processed_players and opponent_key not in players_in_queue:
                        queue.append((opponent, current_depth + 1))
                        players_in_queue.add(opponent_key)
        except (SystemExit, KeyboardInterrupt):
            if all_pgns:
                print("\nFetch aborted early; saving the games collected so far.")
                self._save_archive(all_pgns, initial_username, year, month, max_depth, output_base)
            raise
        self._save_archive(all_pgns, initial_username, year, month, max_depth, output_base)

    def _save_archive(self, all_pgns: list, initial_username: str, year: int, month: int, max_depth: int, output_base: str) -> None:
        """Deduplicate the collected PGN blocks and write them to one file."""
        if not all_pgns:
            print("\nNo games were fetched.")
            return
        safe_username = "".join(c if c.isalnum() else "_" for c in initial_username)
        base_filename = f"{output_base}_{safe_username}_d{max_depth}_{year}-{month:02d}"
        final_filename = self.processor.generate_unique_filename(base_filename)
        final_pgn_content = self.processor.deduplicate_games(all_pgns)
        # Only report success when the processor says the write succeeded.
        if self.processor.save_to_file(final_pgn_content, final_filename):
            print(f"\nOutput saved to pgns/{final_filename}")
        else:
            print(f"\nError: output could not be written to pgns/{final_filename}")
class ChessAnalyzer:
"""Provides analysis functions for PGN files."""
@staticmethod
def analyze_headers(pgn_data: str, player: str) -> None:
"""Prints game headers for a specific player."""
print(f"\n--- Analyzing Games for player: {player} ---")
pgn_io = io.StringIO(pgn_data)
game_num = 0
while True:
try:
game = chess.pgn.read_game(pgn_io)
except Exception:
continue
if game is None:
break
game_num += 1
print(f"Game {game_num}: White: {game.headers.get('White', 'N/A')}, Black: {game.headers.get('Black', 'N/A')}, Result: {game.headers.get('Result', '*')}")
if game_num == 0:
print("No games found in PGN data.")
@staticmethod
def filter_games(input_file: str, player: str, output_file: str) -> None:
"""Filters games by player and saves to a new file in the 'pgns' directory."""
try:
with open(os.path.join("pgns", input_file), 'r', encoding='utf-8') as f:
pgn_data = f.read()
except Exception as e:
print(f"Error reading pgns/{input_file}: {e}")
return
pgn_io = io.StringIO(pgn_data)
filtered_games = []
player_lower = player.lower()
exporter = chess.pgn.StringExporter(headers=True, variations=True, comments=True)
count = 0
while True:
try:
game = chess.pgn.read_game(pgn_io)
except Exception:
continue
if game is None:
break
if player_lower in (game.headers.get("White", "").lower(), game.headers.get("Black", "").lower()):
filtered_games.append(game.accept(exporter))
count += 1
if filtered_games:
output_file = PGNProcessor.generate_unique_filename(output_file)
PGNProcessor.save_to_file('\n\n'.join(filtered_games) + '\n', output_file)
print(f"{count} games for '{player}' saved to pgns/{output_file}.")
else:
print(f"No games for '{player}' in pgns/{input_file}.")
@staticmethod
def list_openings(input_file: str) -> None:
"""Lists openings from a PGN file in the 'pgns' directory."""
try:
with open(os.path.join("pgns", input_file), 'r', encoding='utf-8') as f:
pgn_data = f.read()
except Exception as e:
print(f"Error reading pgns/{input_file}: {e}")
return
pgn_io = io.StringIO(pgn_data)
openings = Counter()
game_count = 0
while True:
try:
headers = chess.pgn.read_headers(pgn_io)
except Exception:
break
if headers is None:
break
game_count += 1
openings[f"{headers.get('ECO', 'N/A')} | {headers.get('Opening', 'N/A')}"] += 1
if game_count:
print(f"\n--- Openings Report for pgns/{input_file} ---")
for opening, count in openings.most_common(20):
print(f" {opening}: {count} game(s)")
if len(openings) > 20:
print(f" ... and {len(openings) - 20} other unique opening variations.")
else:
print("No games to analyze.")
def main():
parser = argparse.ArgumentParser(description="Chess.com PGN fetcher and analysis tool.")
parser.add_argument('--version', action='version', version='%(prog)s 1.3')
subparsers = parser.add_subparsers(dest='command', required=True)
fetch_parser = subparsers.add_parser("fetch-single", help="Fetch PGN for a single player/month.")
fetch_parser.add_argument("username", type=str, help="Chess.com username")
fetch_parser.add_argument("year", type=int, help="Year")
fetch_parser.add_argument("month", type=int, choices=range(1, 13), help="Month (1-12)")
fetch_parser.add_argument("--output", type=str, help="Output PGN file")
recursive_parser = subparsers.add_parser("fetch-recursive", help="Recursively fetch PGNs for player & opponents.")
recursive_parser.add_argument("username", type=str, help="Initial Chess.com username")
recursive_parser.add_argument("year", type=int, help="Year")
recursive_parser.add_argument("month", type=int, choices=range(1, 13), help="Month")
recursive_parser.add_argument("max_depth", type=int, help="Max recursion depth")
recursive_parser.add_argument("--output-base", type=str, default="games_archive", help="Base output filename")
recursive_parser.add_argument("--max-calls", type=int, default=200, help="Max API calls")
filter_parser = subparsers.add_parser("filter", help="Filter PGN file by player.")
filter_parser.add_argument("input_file", type=str, help="Input PGN file")
filter_parser.add_argument("player", type=str, help="Player name")
filter_parser.add_argument("--output", type=str, default="filtered_games.pgn", help="Output PGN file")
openings_parser = subparsers.add_parser("list-openings", help="List openings from PGN file.")
openings_parser.add_argument("input_file", type=str, help="Input PGN file")
args = parser.parse_args()
api = ChessComAPI("ChessDataFetcher/1.3 (contact: [email protected]; purpose: chess game analysis script)")
processor = PGNProcessor()
analyzer = ChessAnalyzer()
fetcher = Fetcher(api, processor)
if args.command == "fetch-single":
pgn_data = api.fetch_games(args.username, args.year, args.month, for_recursive=False)
if pgn_data:
output_file = args.output if args.output else f"{args.username}_{args.year}_{args.month:02d}.pgn"
output_file = processor.generate_unique_filename(output_file)
processor.save_to_file(pgn_data, output_file)
elif args.command == "fetch-recursive":
fetcher.fetch_recursively(args.username, args.year, args.month, args.max_depth, args.output_base, args.max_calls)
elif args.command == "filter":
analyzer.filter_games(args.input_file, args.player, args.output)
elif args.command == "list-openings":
analyzer.list_openings(args.input_file)
if __name__ == "__main__":
    # Script entry point: show the User-Agent reminder, then hand over to the CLI.
    print("Reminder: Update the User-Agent in the script with valid contact details!")
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment