Created
November 1, 2025 22:02
-
-
Save mulderp/8c1f892aa41c32f3d4dbd79c87e508f6 to your computer and use it in GitHub Desktop.
pgn download
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import argparse | |
| import requests | |
| import sys | |
| import chess.pgn | |
| import io | |
| import os | |
| from collections import Counter | |
| import time | |
| import hashlib | |
| class ChessComAPI: | |
| """Handles all interactions with the Chess.com API.""" | |
| def __init__(self, user_agent: str): | |
| self.headers = {'User-Agent': user_agent} | |
| def fetch_games(self, username: str, year: int, month: int, for_recursive: bool = False) -> str | None: | |
| """Fetches games for a given player and month from Chess.com.""" | |
| url = f"https://api.chess.com/pub/player/{username}/games/{year}/{month:02d}/pgn" | |
| time.sleep(1.2) | |
| print(f"Attempting to fetch games from: {url}") | |
| try: | |
| response = requests.get(url, headers=self.headers, timeout=30) | |
| response.raise_for_status() | |
| return response.text | |
| except requests.exceptions.HTTPError as err: | |
| if err.response.status_code == 404: | |
| print(f"Info: No games found for {username} in {year}-{month:02d} (HTTP 404).") | |
| return None | |
| elif err.response.status_code == 403: | |
| print(f"CRITICAL Error: Access forbidden for {username} (HTTP 403). User-Agent or IP block likely. Script will exit.") | |
| sys.exit(1) | |
| elif err.response.status_code == 410: | |
| print(f"Info: Resource gone for {username} (HTTP 410). Data no longer available.") | |
| return None | |
| elif err.response.status_code == 429: | |
| print(f"CRITICAL Error: Rate limit hit while fetching for {username} (HTTP 429). Please wait significantly. Script will exit.") | |
| sys.exit(1) | |
| else: | |
| print(f"Error: HTTP error fetching games for {username}. Status {err.response.status_code}: {err}") | |
| return None if for_recursive else sys.exit(1) | |
| except requests.exceptions.RequestException as e: | |
| print(f"Warning: Network request problem for {username}: {e}") | |
| return None if for_recursive else sys.exit(1) | |
| class PGNProcessor: | |
| """Handles PGN data processing and file operations.""" | |
| @staticmethod | |
| def get_opponents(pgn_data: str, current_player: str) -> set[str]: | |
| """Extracts opponents from PGN data.""" | |
| opponents = set() | |
| pgn_io = io.StringIO(pgn_data) | |
| current_player_lower = current_player.lower() | |
| while True: | |
| try: | |
| game_headers = chess.pgn.read_headers(pgn_io) | |
| except Exception: | |
| break | |
| if game_headers is None: | |
| break | |
| white = game_headers.get("White", "UnknownPlayer") | |
| black = game_headers.get("Black", "UnknownPlayer") | |
| if white.lower() == current_player_lower and black != "UnknownPlayer": | |
| opponents.add(black) | |
| elif black.lower() == current_player_lower and white != "UnknownPlayer": | |
| opponents.add(white) | |
| return opponents | |
| @staticmethod | |
| def save_to_file(content: str, filename: str) -> bool: | |
| """Saves content to a file in the 'pgns' directory, ensuring the directory exists.""" | |
| try: | |
| os.makedirs("pgns", exist_ok=True) | |
| filepath = os.path.join("pgns", filename) | |
| with open(filepath, 'w', encoding='utf-8') as f: | |
| f.write(content) | |
| print(f"Games saved to {filepath}") | |
| return True | |
| except IOError as e: | |
| print(f"Error saving file {filepath}: {e}") | |
| return False | |
| @staticmethod | |
| def generate_unique_filename(base: str, extension: str = ".pgn") -> str: | |
| """Generates a unique filename in the 'pgns' directory by appending a counter if the file exists.""" | |
| counter = 1 | |
| filename = f"{base}{extension}" | |
| filepath = os.path.join("pgns", filename) | |
| while os.path.exists(filepath): | |
| filename = f"{base}_{counter}{extension}" | |
| filepath = os.path.join("pgns", filename) | |
| counter += 1 | |
| return filename | |
| @staticmethod | |
| def deduplicate_games(pgn_blocks: list[str]) -> str: | |
| """Deduplicates games across multiple PGN blocks.""" | |
| unique_game_hashes = set() | |
| final_pgn_content = "" | |
| for pgn_block in pgn_blocks: | |
| pgn_io = io.StringIO(pgn_block) | |
| while True: | |
| try: | |
| game = chess.pgn.read_game(pgn_io) | |
| except Exception: | |
| continue | |
| if game is None: | |
| break | |
| game_id = ( | |
| game.headers.get("Event", "") + | |
| game.headers.get("Site", "") + | |
| game.headers.get("Date", "") + | |
| game.headers.get("White", "") + | |
| game.headers.get("Black", "") + | |
| game.headers.get("Result", "") | |
| ) | |
| game_hash = hash(game_id) | |
| if game_hash not in unique_game_hashes: | |
| unique_game_hashes.add(game_hash) | |
| exporter = chess.pgn.StringExporter(headers=True, variations=True, comments=True) | |
| final_pgn_content += game.accept(exporter) + "\n\n" | |
| return final_pgn_content.strip() | |
| class Fetcher: | |
| """Handles recursive fetching of games and their opponents.""" | |
| def __init__(self, api: ChessComAPI, processor: PGNProcessor): | |
| self.api = api | |
| self.processor = processor | |
| def fetch_recursively(self, initial_username: str, year: int, month: int, max_depth: int, output_base: str, max_api_calls: int) -> None: | |
| """Fetches games recursively for a player and their opponents.""" | |
| processed_players = set() | |
| all_pgns = [] | |
| queue = [(initial_username, 0)] | |
| players_in_queue = {initial_username.lower()} | |
| fetch_api_call_count = 0 | |
| while queue and fetch_api_call_count < max_api_calls: | |
| current_username, current_depth = queue.pop(0) | |
| players_in_queue.remove(current_username.lower()) | |
| if current_username.lower() in processed_players or current_depth > max_depth: | |
| continue | |
| print(f"{'--' * current_depth}> Processing: {current_username} (Depth: {current_depth})") | |
| fetch_api_call_count += 1 | |
| pgn_data = self.api.fetch_games(current_username, year, month, for_recursive=True) | |
| processed_players.add(current_username.lower()) | |
| if pgn_data: | |
| all_pgns.append(pgn_data) | |
| if current_depth < max_depth: | |
| opponents = self.processor.get_opponents(pgn_data, current_username) | |
| for opponent in opponents: | |
| if opponent.lower() not in processed_players and opponent.lower() not in players_in_queue: | |
| queue.append((opponent, current_depth + 1)) | |
| players_in_queue.add(opponent.lower()) | |
| if all_pgns: | |
| safe_username = "".join(c if c.isalnum() else "_" for c in initial_username) | |
| base_filename = f"{output_base}_{safe_username}_d{max_depth}_{year}-{month:02d}" | |
| final_filename = self.processor.generate_unique_filename(base_filename) | |
| final_pgn_content = self.processor.deduplicate_games(all_pgns) | |
| self.processor.save_to_file(final_pgn_content, final_filename) | |
| print(f"\nOutput saved to pgns/{final_filename}") | |
| else: | |
| print("\nNo games were fetched.") | |
| class ChessAnalyzer: | |
| """Provides analysis functions for PGN files.""" | |
| @staticmethod | |
| def analyze_headers(pgn_data: str, player: str) -> None: | |
| """Prints game headers for a specific player.""" | |
| print(f"\n--- Analyzing Games for player: {player} ---") | |
| pgn_io = io.StringIO(pgn_data) | |
| game_num = 0 | |
| while True: | |
| try: | |
| game = chess.pgn.read_game(pgn_io) | |
| except Exception: | |
| continue | |
| if game is None: | |
| break | |
| game_num += 1 | |
| print(f"Game {game_num}: White: {game.headers.get('White', 'N/A')}, Black: {game.headers.get('Black', 'N/A')}, Result: {game.headers.get('Result', '*')}") | |
| if game_num == 0: | |
| print("No games found in PGN data.") | |
| @staticmethod | |
| def filter_games(input_file: str, player: str, output_file: str) -> None: | |
| """Filters games by player and saves to a new file in the 'pgns' directory.""" | |
| try: | |
| with open(os.path.join("pgns", input_file), 'r', encoding='utf-8') as f: | |
| pgn_data = f.read() | |
| except Exception as e: | |
| print(f"Error reading pgns/{input_file}: {e}") | |
| return | |
| pgn_io = io.StringIO(pgn_data) | |
| filtered_games = [] | |
| player_lower = player.lower() | |
| exporter = chess.pgn.StringExporter(headers=True, variations=True, comments=True) | |
| count = 0 | |
| while True: | |
| try: | |
| game = chess.pgn.read_game(pgn_io) | |
| except Exception: | |
| continue | |
| if game is None: | |
| break | |
| if player_lower in (game.headers.get("White", "").lower(), game.headers.get("Black", "").lower()): | |
| filtered_games.append(game.accept(exporter)) | |
| count += 1 | |
| if filtered_games: | |
| output_file = PGNProcessor.generate_unique_filename(output_file) | |
| PGNProcessor.save_to_file('\n\n'.join(filtered_games) + '\n', output_file) | |
| print(f"{count} games for '{player}' saved to pgns/{output_file}.") | |
| else: | |
| print(f"No games for '{player}' in pgns/{input_file}.") | |
| @staticmethod | |
| def list_openings(input_file: str) -> None: | |
| """Lists openings from a PGN file in the 'pgns' directory.""" | |
| try: | |
| with open(os.path.join("pgns", input_file), 'r', encoding='utf-8') as f: | |
| pgn_data = f.read() | |
| except Exception as e: | |
| print(f"Error reading pgns/{input_file}: {e}") | |
| return | |
| pgn_io = io.StringIO(pgn_data) | |
| openings = Counter() | |
| game_count = 0 | |
| while True: | |
| try: | |
| headers = chess.pgn.read_headers(pgn_io) | |
| except Exception: | |
| break | |
| if headers is None: | |
| break | |
| game_count += 1 | |
| openings[f"{headers.get('ECO', 'N/A')} | {headers.get('Opening', 'N/A')}"] += 1 | |
| if game_count: | |
| print(f"\n--- Openings Report for pgns/{input_file} ---") | |
| for opening, count in openings.most_common(20): | |
| print(f" {opening}: {count} game(s)") | |
| if len(openings) > 20: | |
| print(f" ... and {len(openings) - 20} other unique opening variations.") | |
| else: | |
| print("No games to analyze.") | |
| def main(): | |
| parser = argparse.ArgumentParser(description="Chess.com PGN fetcher and analysis tool.") | |
| parser.add_argument('--version', action='version', version='%(prog)s 1.3') | |
| subparsers = parser.add_subparsers(dest='command', required=True) | |
| fetch_parser = subparsers.add_parser("fetch-single", help="Fetch PGN for a single player/month.") | |
| fetch_parser.add_argument("username", type=str, help="Chess.com username") | |
| fetch_parser.add_argument("year", type=int, help="Year") | |
| fetch_parser.add_argument("month", type=int, choices=range(1, 13), help="Month (1-12)") | |
| fetch_parser.add_argument("--output", type=str, help="Output PGN file") | |
| recursive_parser = subparsers.add_parser("fetch-recursive", help="Recursively fetch PGNs for player & opponents.") | |
| recursive_parser.add_argument("username", type=str, help="Initial Chess.com username") | |
| recursive_parser.add_argument("year", type=int, help="Year") | |
| recursive_parser.add_argument("month", type=int, choices=range(1, 13), help="Month") | |
| recursive_parser.add_argument("max_depth", type=int, help="Max recursion depth") | |
| recursive_parser.add_argument("--output-base", type=str, default="games_archive", help="Base output filename") | |
| recursive_parser.add_argument("--max-calls", type=int, default=200, help="Max API calls") | |
| filter_parser = subparsers.add_parser("filter", help="Filter PGN file by player.") | |
| filter_parser.add_argument("input_file", type=str, help="Input PGN file") | |
| filter_parser.add_argument("player", type=str, help="Player name") | |
| filter_parser.add_argument("--output", type=str, default="filtered_games.pgn", help="Output PGN file") | |
| openings_parser = subparsers.add_parser("list-openings", help="List openings from PGN file.") | |
| openings_parser.add_argument("input_file", type=str, help="Input PGN file") | |
| args = parser.parse_args() | |
| api = ChessComAPI("ChessDataFetcher/1.3 (contact: [email protected]; purpose: chess game analysis script)") | |
| processor = PGNProcessor() | |
| analyzer = ChessAnalyzer() | |
| fetcher = Fetcher(api, processor) | |
| if args.command == "fetch-single": | |
| pgn_data = api.fetch_games(args.username, args.year, args.month, for_recursive=False) | |
| if pgn_data: | |
| output_file = args.output if args.output else f"{args.username}_{args.year}_{args.month:02d}.pgn" | |
| output_file = processor.generate_unique_filename(output_file) | |
| processor.save_to_file(pgn_data, output_file) | |
| elif args.command == "fetch-recursive": | |
| fetcher.fetch_recursively(args.username, args.year, args.month, args.max_depth, args.output_base, args.max_calls) | |
| elif args.command == "filter": | |
| analyzer.filter_games(args.input_file, args.player, args.output) | |
| elif args.command == "list-openings": | |
| analyzer.list_openings(args.input_file) | |
| if __name__ == "__main__": | |
| print("Reminder: Update the User-Agent in the script with valid contact details!") | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment