@charliermarsh
Created September 15, 2025 19:17
Download files from a Simple API-compatible index.
# /// script
# requires-python = ">=3.13"
# dependencies = [
#     "aiofiles>=24.1.0",
#     "aiohttp>=3.11.11",
#     "beautifulsoup4>=4.13.3",
#     "rich>=13.9.5",
# ]
# ///

"""Download all packages from a Simple API-compatible index.
This script downloads all packages and their files from a Simple API index
(PEP 503) to a local directory. Files that already exist are skipped.
Example usage:
uv run download-files.py --username $USERNAME --password $PASSWORD https://private.pypi.org/simple/ ./downloads/
"""
import argparse
import asyncio
import os
from pathlib import Path
from typing import cast
from urllib.parse import urljoin, urlparse

import aiofiles
import aiohttp
from bs4 import BeautifulSoup, Tag
from rich.console import Console
from rich.progress import Progress, TaskID

console = Console()


async def fetch_html(session: aiohttp.ClientSession, url: str) -> str:
    """Fetch HTML content from a URL."""
    max_attempts = 5
    http_err = None
    # Retry transient failures with exponential backoff; honor `Retry-After` (default 30s) on HTTP 429.
    for attempt in range(1, max_attempts + 1):
        try:
            async with session.get(url) as response:
                if response.status == 429:
                    wait_time = int(response.headers.get("Retry-After", 30))
                    await asyncio.sleep(wait_time)
                    continue
                response.raise_for_status()
                return await response.text()
        except aiohttp.ClientError as err:
            if attempt < max_attempts:
                await asyncio.sleep(2**attempt)
            http_err = err
    raise Exception(
        f"Failed to fetch HTML from `{url}` after {max_attempts} attempts"
    ) from http_err


async def download_file(
    session: aiohttp.ClientSession,
    url: str,
    target_path: Path,
    *,
    progress: Progress,
    task_id: TaskID,
) -> bool:
    """Download a file if it doesn't already exist.

    Returns True if the file was downloaded, False if skipped.
    """
    if target_path.exists():
        progress.advance(task_id)
        return False

    max_attempts = 5
    http_err = None
    # Retry transient failures with exponential backoff; honor `Retry-After` on HTTP 429.
    for attempt in range(1, max_attempts + 1):
        try:
            async with session.get(url) as response:
                if response.status == 429:
                    wait_time = int(response.headers.get("Retry-After", 30))
                    await asyncio.sleep(wait_time)
                    continue
                response.raise_for_status()

                # Create parent directories if they don't exist.
                target_path.parent.mkdir(parents=True, exist_ok=True)

                # Stream the response body to disk in chunks.
                async with aiofiles.open(target_path, "wb") as file:
                    async for chunk in response.content.iter_chunked(8192):
                        await file.write(chunk)

                progress.advance(task_id)
                return True
        except aiohttp.ClientError as err:
            if attempt < max_attempts:
                await asyncio.sleep(2**attempt)
            http_err = err
    raise Exception(
        f"Failed to download `{url}` after {max_attempts} attempts"
    ) from http_err


async def get_package_files(
    session: aiohttp.ClientSession,
    package_url: str,
    *,
    progress: Progress,
    task_id: TaskID,
) -> list[str]:
    """Get list of file URLs for a package from its Simple API page."""
    html = await fetch_html(session, package_url)
    soup = BeautifulSoup(html, "html.parser")

    file_urls = []
    for link in soup.find_all("a"):
        href = cast(Tag, link).get("href")
        assert isinstance(href, str), "Expected a `str`"
        if href:
            # Handle relative URLs.
            file_url = urljoin(package_url, href)
            file_urls.append(file_url)

    progress.advance(task_id)
    return file_urls


async def get_packages(
    session: aiohttp.ClientSession,
    index_url: str,
) -> list[str]:
    """Get list of package names from the Simple API index."""
    html = await fetch_html(session, index_url)
    soup = BeautifulSoup(html, "html.parser")

    packages = []
    for link in soup.find_all("a"):
        href = cast(Tag, link).get("href")
        assert isinstance(href, str), "Expected a `str`"
        if href:
            # Package URL should be relative to the index.
            package_url = urljoin(index_url, href)
            packages.append(package_url)

    return packages


async def export_index(
    index_url: str,
    target_dir: Path,
    max_concurrent: int = 10,
    username: str | None = None,
    password: str | None = None,
):
    """Download all packages from a Simple API index."""
    target_dir.mkdir(parents=True, exist_ok=True)

    # Ensure index URL ends with a slash for proper URL joining.
    if not index_url.endswith("/"):
        index_url += "/"

    semaphore = asyncio.Semaphore(max_concurrent)

    # If a username or password is provided, create an authentication object.
    auth = None
    if password:
        auth = aiohttp.BasicAuth(username or "__token__", password)

    async with aiohttp.ClientSession(auth=auth) as session:
        console.print(f"[blue]Fetching package list from {index_url}[/blue]")
        try:
            package_urls = await get_packages(session, index_url)
        except Exception as err:
            console.print(f"[red]Error fetching package list: {err}[/red]")
            return

        console.print(f"[blue]Found {len(package_urls)} packages[/blue]")
        if not package_urls:
            console.print("[yellow]No packages found in index[/yellow]")
            return

        # Collect all file URLs first.
        all_files = []
        with Progress() as progress:
            task_id = progress.add_task(
                "[blue]Collecting file URLs...", total=len(package_urls)
            )

            async def func(package_url: str):
                async with semaphore:
                    file_urls = await get_package_files(
                        session, package_url, progress=progress, task_id=task_id
                    )
                    package_files = []
                    for file_url in file_urls:
                        # Extract filename from URL.
                        parsed = urlparse(file_url)
                        filename = os.path.basename(parsed.path)
                        if filename:
                            package_files.append((file_url, filename))
                    return package_files

            # Create tasks for all package file collection.
            collection_tasks = [func(package_url) for package_url in package_urls]

            # Execute all tasks concurrently and flatten results.
            package_file_results = await asyncio.gather(*collection_tasks)
            all_files = [file for result in package_file_results for file in result]

        console.print(f"[green]Found {len(all_files)} files to download[/green]")
        if not all_files:
            console.print("[yellow]No files found to download[/yellow]")
            return

        with Progress() as progress:
            task_id = progress.add_task(
                "[green]Downloading files...", total=len(all_files)
            )

            async def func(file_url: str, filename: str):
                async with semaphore:
                    target_path = target_dir / filename
                    return await download_file(
                        session,
                        file_url,
                        target_path,
                        progress=progress,
                        task_id=task_id,
                    )

            tasks = [func(file_url, filename) for file_url, filename in all_files]
            results = await asyncio.gather(*tasks)

        downloaded_count = sum(1 for result in results if result is True)
        skipped_count = sum(1 for result in results if result is False)

        console.print("[green]Download complete![/green]")
        console.print(f"Downloaded: {downloaded_count}")
        console.print(f"Skipped (already exist): {skipped_count}")


async def main():
    parser = argparse.ArgumentParser(
        description="Download all packages from a Simple API-compatible index"
    )
    parser.add_argument("index_url", help="URL of the Simple API index")
    parser.add_argument("target_dir", help="Directory to download files to")
    parser.add_argument(
        "--max-concurrent",
        type=int,
        default=8,
        help="Maximum number of concurrent downloads (default: 8)",
    )
    parser.add_argument(
        "--username",
        help="Username for authentication",
    )
    parser.add_argument(
        "--password",
        help="Password for authentication",
    )
    args = parser.parse_args()

    target_dir = Path(args.target_dir)
    await export_index(
        args.index_url,
        target_dir,
        args.max_concurrent,
        args.username,
        args.password,
    )


if __name__ == "__main__":
    asyncio.run(main())
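
Once a run finishes, the target directory is a flat collection of wheels and sdists, so it should work directly as a local package source. A minimal sketch, with example-package standing in for a mirrored project (pip's --no-index and --find-links flags accept a plain directory of files; uv's pip interface exposes the same flags):

    uv pip install --no-index --find-links ./downloads/ example-package
    pip install --no-index --find-links ./downloads/ example-package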
