#!/usr/bin/env python3
"""
Download latest snapshots of all pages from Wayback Machine.
"""
import sys
import argparse
import requests
import time
import os
import json
import logging
from pathlib import Path
from urllib.parse import urlparse
from datetime import datetime
# ANSI color codes
class Colors:
    RED = '\033[91m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    GREY = '\033[90m'
    RESET = '\033[0m'


class ColoredFormatter(logging.Formatter):
    """Colored log formatter for TTY output."""
    COLORS = {
        logging.DEBUG: Colors.GREY,
        logging.INFO: Colors.GREEN,
        logging.WARNING: Colors.YELLOW,
        logging.ERROR: Colors.RED,
    }

    def __init__(self, use_color=True):
        super().__init__(
            fmt='%(asctime)s [%(levelname)s] %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        self.use_color = use_color

    def format(self, record):
        if self.use_color and record.levelno in self.COLORS:
            record.levelname = f"{self.COLORS[record.levelno]}{record.levelname}{Colors.RESET}"
            record.msg = f"{self.COLORS[record.levelno]}{record.msg}{Colors.RESET}"
        return super().format(record)

def setup_logger():
    """Setup logger with colored output if TTY."""
    logger = logging.getLogger('wayback')
    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler(sys.stdout)
    use_color = sys.stdout.isatty()
    handler.setFormatter(ColoredFormatter(use_color=use_color))
    logger.addHandler(handler)
    return logger

logger = setup_logger()

def get_cache_filename(domain, before_date):
    """Generate cache filename based on domain and date."""
    date_part = f"_{before_date}" if before_date else ""
    return f".wayback_cache_{domain.replace('/', '_')}{date_part}.json"

def load_cache(cache_file):
    """Load snapshots from cache file."""
    try:
        with open(cache_file, 'r') as f:
            return json.load(f)
    except FileNotFoundError:
        return None
    except Exception as e:
        logger.warning(f"Failed to load cache: {e}")
        return None

def save_cache(cache_file, snapshots):
    """Save snapshots to cache file."""
    try:
        with open(cache_file, 'w') as f:
            json.dump(snapshots, f)
        logger.info(f"Saved {len(snapshots)} snapshots to cache: {cache_file}")
    except Exception as e:
        logger.warning(f"Failed to save cache: {e}")

def get_all_urls(domain, before_date=None, use_cache=True):
    """Get all archived URLs for a domain from Wayback CDX API."""
    cache_file = get_cache_filename(domain, before_date)
    # Try to load from cache first
    if use_cache:
        cached = load_cache(cache_file)
        if cached is not None:
            logger.info(f"Loaded {len(cached)} snapshots from cache: {cache_file}")
            return cached
    logger.info(f"Fetching URL list for {domain}...")
    if before_date:
        logger.info(f"Only fetching snapshots before {before_date}")
    all_snapshots = []
    page = 0
    fetch_error = False
    while True:
        logger.info(f"Fetching page {page}...")
        # Use CDX API to get URLs
        cdx_url = "https://web.archive.org/cdx/search/cdx"
        params = {
            'url': f'{domain}/*',
            'output': 'json',
            'fl': 'original,timestamp,statuscode,mimetype',
            'filter': 'statuscode:200',
            'from': page * 10000,
            'limit': 10000,
        }
        # Add date filter if specified
        if before_date:
            params['to'] = before_date
        try:
            response = requests.get(cdx_url, params=params, timeout=60)
            response.raise_for_status()
            data = response.json()
            # First row is headers on first page
            start_idx = 1 if page == 0 else 0
            if len(data) <= start_idx:
                # No more results
                break
            new_results = data[start_idx:]
            all_snapshots.extend(new_results)
            logger.info(f"  Got {len(new_results)} results (total: {len(all_snapshots)})")
            # If we got less than the page size, we're done
            if len(new_results) < 10000:
                break
            page += 1
            time.sleep(1)  # Be nice between pages
        except Exception as e:
            logger.error(f"Error fetching page {page}: {e}")
            logger.error("CDX fetch incomplete - will not cache partial results")
            fetch_error = True
            break
    # Only save to cache if we got all the data successfully
    if all_snapshots and not fetch_error:
        save_cache(cache_file, all_snapshots)
    elif fetch_error and all_snapshots:
        logger.warning(f"Not caching {len(all_snapshots)} snapshots due to fetch error")
    return all_snapshots

def sanitize_path(path_str):
    """Sanitize path to be filesystem safe."""
    path_str = path_str.replace('..', '_')
    # Truncate long components
    parts = path_str.split('/')
    sanitized_parts = []
    for part in parts:
        if len(part.encode('utf-8')) > 200:
            if '.' in part:
                name, ext = part.rsplit('.', 1)
                name = name[:150]
                part = f"{name}.{ext}"
            else:
                part = part[:200]
        sanitized_parts.append(part)
    return '/'.join(sanitized_parts)
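
# How download_snapshot maps URLs to local files (illustrative examples derived
# from the rules below; <output_dir> stands for the chosen output directory):
#   https://example.com/            -> <output_dir>/example.com/index.html
#   https://example.com/about       -> <output_dir>/example.com/about.html
#   https://example.com/img/a.png   -> <output_dir>/example.com/img/a.png
#   https://example.com/page?id=1   -> <output_dir>/example.com/page_id=1.html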

def download_snapshot(url, timestamp, output_dir, retries=3):
    """Download a specific snapshot from Wayback Machine."""
    wayback_url = f"https://web.archive.org/web/{timestamp}id_/{url}"
    # Create file path from URL
    parsed = urlparse(url)
    path = parsed.path.lstrip('/')
    if not path or path.endswith('/'):
        path = (path.rstrip('/') + '/index.html').lstrip('/')
    # Handle query strings - append to filename
    if parsed.query:
        # Sanitize query string for filename
        query_safe = parsed.query[:100].replace('/', '_').replace('?', '_')
        if '.' in path.split('/')[-1]:  # Check if last component has extension
            name, ext = path.rsplit('.', 1)
            path = f"{name}_{query_safe}.{ext}"
        else:
            path = f"{path}_{query_safe}.html"
    else:
        # If path doesn't end with / and last component has no extension, add .html
        # This prevents file/directory conflicts
        last_component = path.split('/')[-1]
        if last_component and '.' not in last_component:
            path = f"{path}.html"
    path = sanitize_path(path)
    file_path = output_dir / parsed.netloc / path
    # If file exists, just update timestamp and skip download
    if file_path.exists():
        try:
            dt = datetime.strptime(timestamp, '%Y%m%d%H%M%S')
            mtime = dt.timestamp()
            os.utime(file_path, (mtime, mtime))
        except Exception:
            pass
        return False, "exists"
    # Create parent dirs
    try:
        file_path.parent.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        return False, f"mkdir_error: {e}"
    # Download with retries
    session = requests.Session()
    session.headers.update({'User-Agent': 'Mozilla/5.0 (compatible; archive-downloader/1.0)'})
    last_error = "unknown"
    max_attempts = max(1, retries)  # At least one attempt
    for attempt in range(max_attempts):
        try:
            response = session.get(wayback_url, timeout=30)
            if response.status_code == 200:
                file_path.write_bytes(response.content)
                # Set file modification time to archive timestamp
                # timestamp format: YYYYMMDDhhmmss
                try:
                    dt = datetime.strptime(timestamp, '%Y%m%d%H%M%S')
                    mtime = dt.timestamp()
                    os.utime(file_path, (mtime, mtime))
                except Exception:
                    pass  # If timestamp parsing fails, just skip setting mtime
                # Return with attempt number (0-based, so add 1)
                return True, f"ok_attempt_{attempt + 1}"
            elif response.status_code == 429:  # Too many requests
                last_error = "http_429_rate_limited"
                if attempt < max_attempts - 1:
                    time.sleep(5)
                    continue
            else:
                last_error = f"http_{response.status_code}"
                if attempt < max_attempts - 1:
                    time.sleep(2)
                    continue
        except requests.exceptions.Timeout:
            last_error = "timeout"
            if attempt < max_attempts - 1:
                time.sleep(2)
                continue
        except requests.exceptions.ConnectionError as e:
            # Connection errors (refused, pool exhausted, etc)
            error_str = str(e)
            if "Connection refused" in error_str:
                last_error = "connection_refused"
            elif "Max retries exceeded" in error_str:
                last_error = "connection_max_retries"
            else:
                last_error = f"connection_error: {error_str[:100]}"
            if attempt < max_attempts - 1:
                time.sleep(2)
                continue
        except Exception as e:
            last_error = f"exception: {str(e)[:150]}"
            if attempt < max_attempts - 1:
                time.sleep(2)
                continue
    return False, last_error

def main():
    parser = argparse.ArgumentParser(
        description='Download latest snapshots of all pages from Wayback Machine.',
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument('domain', help='Domain to download (e.g., www.crichq.com)')
    parser.add_argument('output_dir', nargs='?', default='.', help='Output directory (default: current directory)')
    parser.add_argument('before_date', nargs='?', help='Only download snapshots before this date (YYYYMMDD format, e.g., 20240801)')
    parser.add_argument('--retries', type=int, default=3, help='Number of retry attempts (default: 3)')
    parser.add_argument('--sleep', type=float, default=1.0, help='Sleep time in seconds between requests (default: 1.0)')
    parser.add_argument('--no-cache', action='store_true', help='Ignore cache and fetch fresh data from CDX API')
    args = parser.parse_args()
    # Validate date format if provided
    if args.before_date and (len(args.before_date) != 8 or not args.before_date.isdigit()):
        parser.error("Date must be in YYYYMMDD format (e.g., 20240801)")
    output_dir = Path(args.output_dir)
    # Get all URLs
    snapshots = get_all_urls(args.domain, args.before_date, use_cache=not args.no_cache)
    if not snapshots:
        logger.error("No snapshots found!")
        sys.exit(1)
    logger.info(f"Found {len(snapshots)} total snapshots")
    # Group by URL to get latest timestamp
    url_latest = {}
    for item in snapshots:
        if len(item) < 4:
            continue
        original, timestamp, statuscode, mimetype = item[0], item[1], item[2], item[3]
        # Skip mailto: and other non-http(s) URLs
        if not original.startswith('http://') and not original.startswith('https://'):
            continue
        if original not in url_latest or timestamp > url_latest[original][0]:
            url_latest[original] = (timestamp, mimetype)
    logger.info(f"Found {len(url_latest)} unique URLs (will download latest snapshot of each)")
    downloaded = 0
    skipped = 0
    errors = 0
    for i, (url, (timestamp, mimetype)) in enumerate(url_latest.items(), 1):
        success, msg = download_snapshot(url, timestamp, output_dir, args.retries)
        if success:
            downloaded += 1
            logger.info(f"[{i}/{len(url_latest)}] Downloaded ({msg}): {url}")
        elif msg == "exists":
            skipped += 1
            if skipped % 100 == 0:
                logger.info(f"[{i}/{len(url_latest)}] Progress: {downloaded} new, {skipped} skipped, {errors} errors")
        else:
            errors += 1
            logger.error(f"[{i}/{len(url_latest)}] Error ({msg}): {url}")
        # Rate limiting - be nice to archive.org
        # Sleep after successes and errors, but not for skipped files
        if msg != "exists":
            time.sleep(args.sleep)
    logger.info(f"Done! Downloaded: {downloaded}, Skipped: {skipped}, Errors: {errors}")


if __name__ == '__main__':
    main()
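
# Example invocations (assuming the script is saved as wayback_download.py;
# the domain, directory, and date below are placeholders):
#   python3 wayback_download.py example.com
#   python3 wayback_download.py example.com ./mirror 20240801 --sleep 2 --retries 5
#   python3 wayback_download.py example.com --no-cache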