Download files from a Simple API-compatible index.
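For context, a "Simple API" (PEP 503) index is plain HTML: the root page contains one anchor per project, and each project page contains one anchor per downloadable file, with the href pointing at the artifact (often as a relative URL). The sketch below illustrates the kind of parsing the script performs; the sample HTML and the page URL are made up for illustration, not taken from a real index. The full script follows.

    from urllib.parse import urljoin
    from bs4 import BeautifulSoup

    # Illustrative PEP 503 project page; real pages look similar but list many files.
    sample_html = """
    <html><body>
      <a href="../../packages/example_pkg-1.0-py3-none-any.whl#sha256=abc">example_pkg-1.0-py3-none-any.whl</a>
    </body></html>
    """

    # Hypothetical project page URL; relative hrefs are resolved against it.
    page_url = "https://private.pypi.org/simple/example-pkg/"
    soup = BeautifulSoup(sample_html, "html.parser")
    file_urls = [urljoin(page_url, a["href"]) for a in soup.find_all("a")]
    print(file_urls)
    # ['https://private.pypi.org/packages/example_pkg-1.0-py3-none-any.whl#sha256=abc']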
# /// script
# requires-python = ">=3.13"
# dependencies = [
#     "aiofiles>=24.1.0",
#     "aiohttp>=3.11.11",
#     "beautifulsoup4>=4.13.3",
#     "rich>=13.9.5",
# ]
# ///
"""Download all packages from a Simple API-compatible index.

This script downloads all packages and their files from a Simple API index
(PEP 503) to a local directory. Files that already exist are skipped.

Example usage:

    uv run download-files.py --username $USERNAME --password $PASSWORD https://private.pypi.org/simple/ ./downloads/
"""

import argparse
import asyncio
import os
from pathlib import Path
from typing import cast
from urllib.parse import urljoin, urlparse

import aiofiles
import aiohttp
from bs4 import BeautifulSoup, Tag
from rich.console import Console
from rich.progress import Progress, TaskID

console = Console()


async def fetch_html(session: aiohttp.ClientSession, url: str) -> str:
    """Fetch HTML content from a URL."""
    max_attempts = 5
    http_err = None
    for attempt in range(1, max_attempts + 1):
        try:
            async with session.get(url) as response:
                if response.status == 429:
                    wait_time = int(response.headers.get("Retry-After", 30))
                    await asyncio.sleep(wait_time)
                    continue
                response.raise_for_status()
                return await response.text()
        except aiohttp.ClientError as err:
            if attempt < max_attempts:
                await asyncio.sleep(2**attempt)
            http_err = err
    raise Exception(
        f"Failed to fetch HTML from `{url}` after {max_attempts} attempts"
    ) from http_err


async def download_file(
    session: aiohttp.ClientSession,
    url: str,
    target_path: Path,
    *,
    progress: Progress,
    task_id: TaskID,
) -> bool:
    """Download a file if it doesn't already exist.

    Returns True if the file was downloaded, False if skipped.
    """
    if target_path.exists():
        progress.advance(task_id)
        return False

    max_attempts = 5
    http_err = None
    for attempt in range(1, max_attempts + 1):
        try:
            async with session.get(url) as response:
                if response.status == 429:
                    wait_time = int(response.headers.get("Retry-After", 30))
                    await asyncio.sleep(wait_time)
                    continue
                response.raise_for_status()

                # Create parent directories if they don't exist.
                target_path.parent.mkdir(parents=True, exist_ok=True)

                async with aiofiles.open(target_path, "wb") as file:
                    async for chunk in response.content.iter_chunked(8192):
                        await file.write(chunk)

                progress.advance(task_id)
                return True
        except aiohttp.ClientError as err:
            if attempt < max_attempts:
                await asyncio.sleep(2**attempt)
            http_err = err
    raise Exception(
        f"Failed to download `{url}` after {max_attempts} attempts"
    ) from http_err


async def get_package_files(
    session: aiohttp.ClientSession,
    package_url: str,
    *,
    progress: Progress,
    task_id: TaskID,
) -> list[str]:
    """Get list of file URLs for a package from its Simple API page."""
    html = await fetch_html(session, package_url)
    soup = BeautifulSoup(html, "html.parser")

    file_urls = []
    for link in soup.find_all("a"):
        href = cast(Tag, link).get("href")
        assert isinstance(href, str), "Expected a `str`"
        if href:
            # Handle relative URLs.
            file_url = urljoin(package_url, href)
            file_urls.append(file_url)

    progress.advance(task_id)
    return file_urls


async def get_packages(
    session: aiohttp.ClientSession,
    index_url: str,
) -> list[str]:
    """Get list of package names from the Simple API index."""
    html = await fetch_html(session, index_url)
    soup = BeautifulSoup(html, "html.parser")

    packages = []
    for link in soup.find_all("a"):
        href = cast(Tag, link).get("href")
        assert isinstance(href, str), "Expected a `str`"
        if href:
            # Package URL should be relative to the index.
            package_url = urljoin(index_url, href)
            packages.append(package_url)

    return packages


async def export_index(
    index_url: str,
    target_dir: Path,
    max_concurrent: int = 10,
    username: str | None = None,
    password: str | None = None,
):
    """Download all packages from a Simple API index."""
    target_dir.mkdir(parents=True, exist_ok=True)

    # Ensure index URL ends with a slash for proper URL joining.
    if not index_url.endswith("/"):
        index_url += "/"

    semaphore = asyncio.Semaphore(max_concurrent)

    # If username or password are provided, create an authentication object.
    auth = None
    if password:
        auth = aiohttp.BasicAuth(username or "__token__", password)

    async with aiohttp.ClientSession(auth=auth) as session:
        console.print(f"[blue]Fetching package list from {index_url}[/blue]")
        try:
            package_urls = await get_packages(session, index_url)
        except Exception as err:
            console.print(f"[red]Error fetching package list: {err}[/red]")
            return

        console.print(f"[blue]Found {len(package_urls)} packages[/blue]")
        if not package_urls:
            console.print("[yellow]No packages found in index[/yellow]")
            return

        # Collect all file URLs first.
        all_files = []
        with Progress() as progress:
            task_id = progress.add_task(
                "[blue]Collecting file URLs...", total=len(package_urls)
            )

            async def func(package_url: str):
                async with semaphore:
                    file_urls = await get_package_files(
                        session, package_url, progress=progress, task_id=task_id
                    )
                    package_files = []
                    for file_url in file_urls:
                        # Extract filename from URL.
                        parsed = urlparse(file_url)
                        filename = os.path.basename(parsed.path)
                        if filename:
                            package_files.append((file_url, filename))
                    return package_files

            # Create tasks for all package file collection.
            collection_tasks = [func(package_url) for package_url in package_urls]

            # Execute all tasks concurrently and flatten results.
            package_file_results = await asyncio.gather(*collection_tasks)
            all_files = [file for result in package_file_results for file in result]

        console.print(f"[green]Found {len(all_files)} files to download[/green]")
        if not all_files:
            console.print("[yellow]No files found to download[/yellow]")
            return

        with Progress() as progress:
            task_id = progress.add_task(
                "[green]Downloading files...", total=len(all_files)
            )

            async def func(file_url: str, filename: str):
                async with semaphore:
                    target_path = target_dir / filename
                    return await download_file(
                        session,
                        file_url,
                        target_path,
                        progress=progress,
                        task_id=task_id,
                    )

            tasks = [func(file_url, filename) for file_url, filename in all_files]
            results = await asyncio.gather(*tasks)

        downloaded_count = sum(1 for result in results if result is True)
        skipped_count = sum(1 for result in results if result is False)

        console.print("[green]Download complete![/green]")
        console.print(f"Downloaded: {downloaded_count}")
        console.print(f"Skipped (already exist): {skipped_count}")


async def main():
    parser = argparse.ArgumentParser(
        description="Download all packages from a Simple API-compatible index"
    )
    parser.add_argument("index_url", help="URL of the Simple API index")
    parser.add_argument("target_dir", help="Directory to download files to")
    parser.add_argument(
        "--max-concurrent",
        type=int,
        default=8,
        help="Maximum number of concurrent downloads (default: 8)",
    )
    parser.add_argument(
        "--username",
        help="Username for authentication",
    )
    parser.add_argument(
        "--password",
        help="Password for authentication",
    )
    args = parser.parse_args()

    target_dir = Path(args.target_dir)
    await export_index(
        args.index_url,
        target_dir,
        args.max_concurrent,
        args.username,
        args.password,
    )


if __name__ == "__main__":
    asyncio.run(main())
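Beyond the CLI shown in the docstring, the script can be driven programmatically by calling export_index directly. The sketch below is a hypothetical invocation, not part of the gist: it assumes the dependencies from the script header are installed, reuses the example index URL from the docstring, and uses a placeholder credential.

    import asyncio
    from pathlib import Path

    # Hypothetical programmatic invocation; `export_index` is defined in the
    # script above, and the URL/token values here are placeholders.
    asyncio.run(
        export_index(
            "https://private.pypi.org/simple/",
            Path("./downloads/"),
            max_concurrent=8,
            username=None,            # falls back to "__token__" when only a password is given
            password="<API-TOKEN>",   # hypothetical credential
        )
    )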