#!/usr/bin/env python3
"""
Download latest snapshots of all pages from Wayback Machine.
"""

import sys
import argparse
import requests
import time
import os
import json
import logging
from pathlib import Path
from urllib.parse import urlparse
from datetime import datetime

# ANSI color codes
class Colors:
    RED = '\033[91m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    GREY = '\033[90m'
    RESET = '\033[0m'


class ColoredFormatter(logging.Formatter):
    """Colored log formatter for TTY output."""
    COLORS = {
        logging.DEBUG: Colors.GREY,
        logging.INFO: Colors.GREEN,
        logging.WARNING: Colors.YELLOW,
        logging.ERROR: Colors.RED,
    }

    def __init__(self, use_color=True):
        super().__init__(
            fmt='%(asctime)s [%(levelname)s] %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        self.use_color = use_color

    def format(self, record):
        if self.use_color and record.levelno in self.COLORS:
            record.levelname = f"{self.COLORS[record.levelno]}{record.levelname}{Colors.RESET}"
            record.msg = f"{self.COLORS[record.levelno]}{record.msg}{Colors.RESET}"
        return super().format(record)

def setup_logger():
    """Setup logger with colored output if TTY."""
    logger = logging.getLogger('wayback')
    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler(sys.stdout)
    use_color = sys.stdout.isatty()
    handler.setFormatter(ColoredFormatter(use_color=use_color))
    logger.addHandler(handler)
    return logger

logger = setup_logger()

def get_cache_filename(domain, before_date):
    """Generate cache filename based on domain and date."""
    date_part = f"_{before_date}" if before_date else ""
    return f".wayback_cache_{domain.replace('/', '_')}{date_part}.json"

def load_cache(cache_file):
    """Load snapshots from cache file."""
    try:
        with open(cache_file, 'r') as f:
            return json.load(f)
    except FileNotFoundError:
        return None
    except Exception as e:
        logger.warning(f"Failed to load cache: {e}")
        return None


def save_cache(cache_file, snapshots):
    """Save snapshots to cache file."""
    try:
        with open(cache_file, 'w') as f:
            json.dump(snapshots, f)
        logger.info(f"Saved {len(snapshots)} snapshots to cache: {cache_file}")
    except Exception as e:
        logger.warning(f"Failed to save cache: {e}")

def get_all_urls(domain, before_date=None, use_cache=True):
    """Get all archived URLs for a domain from Wayback CDX API."""
    cache_file = get_cache_filename(domain, before_date)
    # Try to load from cache first
    if use_cache:
        cached = load_cache(cache_file)
        if cached is not None:
            logger.info(f"Loaded {len(cached)} snapshots from cache: {cache_file}")
            return cached
    logger.info(f"Fetching URL list for {domain}...")
    if before_date:
        logger.info(f"Only fetching snapshots before {before_date}")
    all_snapshots = []
    page = 0
    fetch_error = False
    while True:
        logger.info(f"Fetching page {page}...")
        # Use CDX API to get URLs
        cdx_url = "https://web.archive.org/cdx/search/cdx"
        params = {
            'url': f'{domain}/*',
            'output': 'json',
            'fl': 'original,timestamp,statuscode,mimetype',
            'filter': 'statuscode:200',
            'from': page * 10000,
            'limit': 10000,
        }
        # Add date filter if specified
        if before_date:
            params['to'] = before_date
        try:
            response = requests.get(cdx_url, params=params, timeout=60)
            response.raise_for_status()
            data = response.json()
            # First row is headers on first page
            start_idx = 1 if page == 0 else 0
            if len(data) <= start_idx:
                # No more results
                break
            new_results = data[start_idx:]
            all_snapshots.extend(new_results)
            logger.info(f" Got {len(new_results)} results (total: {len(all_snapshots)})")
            # If we got fewer results than the 10000 limit, we're done
            if len(new_results) < 10000:
                break
            page += 1
            time.sleep(1)  # Be nice between pages
        except Exception as e:
            logger.error(f"Error fetching page {page}: {e}")
            logger.error("CDX fetch incomplete - will not cache partial results")
            fetch_error = True
            break
    # Only save to cache if we got all the data successfully
    if all_snapshots and not fetch_error:
        save_cache(cache_file, all_snapshots)
    elif fetch_error and all_snapshots:
        logger.warning(f"Not caching {len(all_snapshots)} snapshots due to fetch error")
    return all_snapshots
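# For reference, the CDX JSON payload consumed above is a list of rows matching
# the requested 'fl' fields, with a header row first; the values below are
# illustrative only:
#   [["original", "timestamp", "statuscode", "mimetype"],
#    ["https://example.com/about", "20230115083000", "200", "text/html"],
#    ["https://example.com/logo.png", "20230302120500", "200", "image/png"]]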

def sanitize_path(path_str):
    """Sanitize path to be filesystem safe."""
    path_str = path_str.replace('..', '_')
    # Truncate long components
    parts = path_str.split('/')
    sanitized_parts = []
    for part in parts:
        if len(part.encode('utf-8')) > 200:
            if '.' in part:
                name, ext = part.rsplit('.', 1)
                name = name[:150]
                part = f"{name}.{ext}"
            else:
                part = part[:200]
        sanitized_parts.append(part)
    return '/'.join(sanitized_parts)
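# For example, sanitize_path("blog/../admin/index.html") returns
# "blog/_/admin/index.html", and any component longer than 200 bytes is
# truncated (keeping its extension where possible).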

def download_snapshot(url, timestamp, output_dir, retries=3):
    """Download a specific snapshot from Wayback Machine."""
    wayback_url = f"https://web.archive.org/web/{timestamp}id_/{url}"
    # Create file path from URL
    parsed = urlparse(url)
    path = parsed.path.lstrip('/')
    if not path or path.endswith('/'):
        path = (path.rstrip('/') + '/index.html').lstrip('/')
    # Handle query strings - append to filename
    if parsed.query:
        # Sanitize query string for filename
        query_safe = parsed.query[:100].replace('/', '_').replace('?', '_')
        if '.' in path.split('/')[-1]:  # Check if last component has extension
            name, ext = path.rsplit('.', 1)
            path = f"{name}_{query_safe}.{ext}"
        else:
            path = f"{path}_{query_safe}.html"
    else:
        # If path doesn't end with / and last component has no extension, add .html
        # This prevents file/directory conflicts
        last_component = path.split('/')[-1]
        if last_component and '.' not in last_component:
            path = f"{path}.html"
    path = sanitize_path(path)
    file_path = output_dir / parsed.netloc / path
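    # As an illustration, "https://example.com/blog/?p=5" maps to
    # <output_dir>/example.com/blog/index_p=5.html, and
    # "https://example.com/about" maps to <output_dir>/example.com/about.html.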
    # If file exists, just update timestamp and skip download
    if file_path.exists():
        try:
            dt = datetime.strptime(timestamp, '%Y%m%d%H%M%S')
            mtime = dt.timestamp()
            os.utime(file_path, (mtime, mtime))
        except Exception:
            pass
        return False, "exists"
    # Create parent dirs
    try:
        file_path.parent.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        return False, f"mkdir_error: {e}"
    # Download with retries
    session = requests.Session()
    session.headers.update({'User-Agent': 'Mozilla/5.0 (compatible; archive-downloader/1.0)'})
    last_error = "unknown"
    max_attempts = max(1, retries)  # At least one attempt
    for attempt in range(max_attempts):
        try:
            response = session.get(wayback_url, timeout=30)
            if response.status_code == 200:
                file_path.write_bytes(response.content)
                # Set file modification time to archive timestamp
                # timestamp format: YYYYMMDDhhmmss
                try:
                    dt = datetime.strptime(timestamp, '%Y%m%d%H%M%S')
                    mtime = dt.timestamp()
                    os.utime(file_path, (mtime, mtime))
                except Exception:
                    pass  # If timestamp parsing fails, just skip setting mtime
                # Return with attempt number (0-based, so add 1)
                return True, f"ok_attempt_{attempt + 1}"
            elif response.status_code == 429:  # Too many requests
                last_error = "http_429_rate_limited"
                if attempt < max_attempts - 1:
                    time.sleep(5)
                    continue
            else:
                last_error = f"http_{response.status_code}"
                if attempt < max_attempts - 1:
                    time.sleep(2)
                    continue
        except requests.exceptions.Timeout:
            last_error = "timeout"
            if attempt < max_attempts - 1:
                time.sleep(2)
                continue
        except requests.exceptions.ConnectionError as e:
            # Connection errors (refused, pool exhausted, etc)
            error_str = str(e)
            if "Connection refused" in error_str:
                last_error = "connection_refused"
            elif "Max retries exceeded" in error_str:
                last_error = "connection_max_retries"
            else:
                last_error = f"connection_error: {error_str[:100]}"
            if attempt < max_attempts - 1:
                time.sleep(2)
                continue
        except Exception as e:
            last_error = f"exception: {str(e)[:150]}"
            if attempt < max_attempts - 1:
                time.sleep(2)
                continue
    return False, last_error

def main():
    parser = argparse.ArgumentParser(
        description='Download latest snapshots of all pages from Wayback Machine.',
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument('domain', help='Domain to download (e.g., www.crichq.com)')
    parser.add_argument('output_dir', nargs='?', default='.', help='Output directory (default: current directory)')
    parser.add_argument('before_date', nargs='?', help='Only download snapshots before this date (YYYYMMDD format, e.g., 20240801)')
    parser.add_argument('--retries', type=int, default=3, help='Number of retry attempts (default: 3)')
    parser.add_argument('--sleep', type=float, default=1.0, help='Sleep time in seconds between requests (default: 1.0)')
    parser.add_argument('--no-cache', action='store_true', help='Ignore cache and fetch fresh data from CDX API')
    args = parser.parse_args()
    # Validate date format if provided
    if args.before_date and (len(args.before_date) != 8 or not args.before_date.isdigit()):
        parser.error("Date must be in YYYYMMDD format (e.g., 20240801)")
    output_dir = Path(args.output_dir)
    # Get all URLs
    snapshots = get_all_urls(args.domain, args.before_date, use_cache=not args.no_cache)
    if not snapshots:
        logger.error("No snapshots found!")
        sys.exit(1)
    logger.info(f"Found {len(snapshots)} total snapshots")
    # Group by URL to get latest timestamp
    url_latest = {}
    for item in snapshots:
        if len(item) < 4:
            continue
        original, timestamp, statuscode, mimetype = item[0], item[1], item[2], item[3]
        # Skip mailto: and other non-http(s) URLs
        if not original.startswith('http://') and not original.startswith('https://'):
            continue
        if original not in url_latest or timestamp > url_latest[original][0]:
            url_latest[original] = (timestamp, mimetype)
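    # Note: comparing the raw timestamps as strings above is safe because CDX
    # timestamps are fixed-width YYYYMMDDhhmmss digit strings, e.g.
    # "20240101000000" > "20231231235959".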
| logger.info(f"Found {len(url_latest)} unique URLs (will download latest snapshot of each)") | |
| downloaded = 0 | |
| skipped = 0 | |
| errors = 0 | |
| for i, (url, (timestamp, mimetype)) in enumerate(url_latest.items(), 1): | |
| success, msg = download_snapshot(url, timestamp, output_dir, args.retries) | |
| if success: | |
| downloaded += 1 | |
| logger.info(f"[{i}/{len(url_latest)}] Downloaded ({msg}): {url}") | |
| elif msg == "exists": | |
| skipped += 1 | |
| if skipped % 100 == 0: | |
| logger.info(f"[{i}/{len(url_latest)}] Progress: {downloaded} new, {skipped} skipped, {errors} errors") | |
| else: | |
| errors += 1 | |
| logger.error(f"[{i}/{len(url_latest)}] Error ({msg}): {url}") | |
| # Rate limiting - be nice to archive.org | |
| # Sleep after successes and errors, but not for skipped files | |
| if msg != "exists": | |
| time.sleep(args.sleep) | |
| logger.info(f"Done! Downloaded: {downloaded}, Skipped: {skipped}, Errors: {errors}") | |
| if __name__ == '__main__': | |
| main() |