Benchmark tool that generates random binary blobs and compares I/O performance across loose files, ZIP, TAR, and HDF5 containers.
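Example invocation (the filename blob_bench.py is only illustrative; use whatever name the file is saved under):

    python3 blob_bench.py -n 32 -s 16 --seed 1337 --workdir ./bench_workspace

This generates 32 blobs of 16 MB each, packs them into each container (ZIP store, ZIP deflate, TAR, TAR.GZ, and HDF5 when h5py is installed), reads every entry back in a shuffled order, prints a summary table, and writes results.csv into the working directory. Pass --keep to retain the generated blobs and bundles afterwards.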
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import os
import random
import shutil
import tarfile
import time
import zipfile
from pathlib import Path
from typing import List, Dict, Tuple

# Optional HDF5 support
try:
    import h5py  # type: ignore
    H5_AVAILABLE = True
except Exception:
    H5_AVAILABLE = False
# ------------------------------- Utilities -------------------------------- #

def bytes_from_mb(mb: float) -> int:
    return int(mb * 1024 * 1024)


def ensure_clean_dir(path: Path) -> None:
    if path.exists():
        shutil.rmtree(path)
    path.mkdir(parents=True, exist_ok=True)


def human(s: float) -> str:
    return f"{s:.3f}s"


def now() -> float:
    return time.perf_counter()


def generate_blob_file(dst: Path, size_bytes: int, chunk_size: int = 4 * 1024 * 1024) -> None:
    """
    Streams cryptographically-strong random bytes to avoid holding the whole blob in RAM.
    """
    remaining = size_bytes
    with dst.open("wb", buffering=0) as f:
        while remaining > 0:
            n = min(remaining, chunk_size)
            f.write(os.urandom(n))  # system RNG; avoids Python-level byte loops
            remaining -= n


def read_file_to_memory(src: Path, chunk_size: int = 4 * 1024 * 1024) -> int:
    """
    Reads a file fully into memory (and discards it) in chunks. Returns total bytes read.
    """
    total = 0
    with src.open("rb", buffering=0) as f:
        while True:
            b = f.read(chunk_size)
            if not b:
                break
            total += len(b)
    return total


def shuffled(items: List[str], seed: int) -> List[str]:
    r = random.Random(seed)
    arr = items[:]
    r.shuffle(arr)
    return arr
# --------------------------- Bench: loose files ---------------------------- #

def bench_loose_files_write(base_dir: Path, n: int, size_mb: float) -> Tuple[List[Path], float]:
    size_bytes = bytes_from_mb(size_mb)
    filenames = [base_dir / f"blob_{i:05d}.bin" for i in range(n)]
    t0 = now()
    for p in filenames:
        generate_blob_file(p, size_bytes)
    t1 = now()
    return filenames, t1 - t0


def bench_loose_files_read_random(paths: List[Path], seed: int = 12345) -> float:
    order = shuffled([str(p) for p in paths], seed=seed)
    t0 = now()
    total = 0
    for p in order:
        total += read_file_to_memory(Path(p))
    t1 = now()
    # total is unused; accumulating it just keeps the read results observable
    return t1 - t0
# ------------------------------- ZIP bundle -------------------------------- #

def create_zip_bundle_store(src_files: List[Path], zip_path: Path) -> float:
    t0 = now()
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_STORED, allowZip64=True) as zf:
        for p in src_files:
            zf.write(p, arcname=p.name)
    t1 = now()
    return t1 - t0


def create_zip_bundle_deflate(src_files: List[Path], zip_path: Path, level: int = 6) -> float:
    t0 = now()
    comp = zipfile.ZIP_DEFLATED
    with zipfile.ZipFile(zip_path, "w", compression=comp, compresslevel=level, allowZip64=True) as zf:
        for p in src_files:
            zf.write(p, arcname=p.name)
    t1 = now()
    return t1 - t0


def bench_zip_read_random(zip_path: Path, filenames: List[str], seed: int = 12345) -> float:
    order = shuffled(filenames, seed=seed)
    t0 = now()
    with zipfile.ZipFile(zip_path, "r") as zf:
        for name in order:
            _ = zf.read(name)
    t1 = now()
    return t1 - t0
# ------------------------------- TAR bundle -------------------------------- #

def create_tar_bundle(src_files: List[Path], tar_path: Path) -> float:
    t0 = now()
    with tarfile.open(tar_path, "w") as tf:
        for p in src_files:
            tf.add(p, arcname=p.name)
    t1 = now()
    return t1 - t0


def create_targz_bundle(src_files: List[Path], tar_gz_path: Path, compresslevel: int = 6) -> float:
    t0 = now()
    with tarfile.open(tar_gz_path, "w:gz", compresslevel=compresslevel) as tf:
        for p in src_files:
            tf.add(p, arcname=p.name)
    t1 = now()
    return t1 - t0


def bench_tar_read_random(tar_path: Path, filenames: List[str], seed: int = 12345) -> float:
    order = shuffled(filenames, seed=seed)
    t0 = now()
    # Mode "r" auto-detects compression, so this handles both .tar and .tar.gz
    with tarfile.open(tar_path, "r") as tf:
        # tarfile needs a TarInfo per member; build a dict once for O(1) member access
        members = {m.name: m for m in tf.getmembers()}
        for name in order:
            ti = members[name]
            fobj = tf.extractfile(ti)
            if fobj is None:
                raise RuntimeError(f"Failed to extract member {name}")
            # Consume without writing to disk
            while fobj.read(4 * 1024 * 1024):
                pass
            fobj.close()
    t1 = now()
    return t1 - t0
# ------------------------------- HDF5 bundle ------------------------------- #

def create_hdf5_bundle(src_files: List[Path], h5_path: Path, chunk_size: int = 1024 * 1024) -> float:
    """
    Stores each file as a byte-array dataset under /blobs/<filename>.
    Uses chunking for scalable I/O; compression is disabled to measure raw container overhead.
    """
    if not H5_AVAILABLE:
        return float("nan")
    t0 = now()
    with h5py.File(h5_path, "w") as f:
        grp = f.create_group("blobs")
        for p in src_files:
            size = p.stat().st_size
            # Chunked dataset of uint8 to allow partial I/O; no compression to match ZIP store/TAR.
            # Empty blobs fall back to contiguous storage, since chunk shapes must be non-zero.
            dset = grp.create_dataset(
                p.name,
                shape=(size,),
                dtype="u1",
                chunks=(min(chunk_size, size),) if size > 0 else None,
                compression=None,
            )
            # Stream copy to avoid loading the entire blob into RAM
            with p.open("rb", buffering=0) as fh:
                offset = 0
                while True:
                    b = fh.read(chunk_size)
                    if not b:
                        break
                    n = len(b)
                    dset[offset:offset + n] = memoryview(b)
                    offset += n
    t1 = now()
    return t1 - t0


def bench_hdf5_read_random(h5_path: Path, filenames: List[str], seed: int = 12345, chunk_size: int = 4 * 1024 * 1024) -> float:
    if not H5_AVAILABLE:
        return float("nan")
    order = shuffled(filenames, seed=seed)
    t0 = now()
    with h5py.File(h5_path, "r") as f:
        grp = f["/blobs"]
        for name in order:
            dset = grp[name]
            size = dset.shape[0]
            # Read in chunks to simulate streaming without relying on large RAM
            read = 0
            while read < size:
                end = min(read + chunk_size, size)
                _ = dset[read:end]
                read = end
    t1 = now()
    return t1 - t0
# --------------------------------- Main ------------------------------------ #

def main() -> None:
    parser = argparse.ArgumentParser(description="Blob bundling and I/O benchmark.")
    parser.add_argument("-n", "--num-blobs", type=int, required=True, help="Number of blobs to generate.")
    parser.add_argument("-s", "--size-mb", type=float, required=True, help="Size of each blob in MB.")
    parser.add_argument("--seed", type=int, default=1337, help="RNG seed for reproducible random order.")
    parser.add_argument("--workdir", type=Path, default=Path("./bench_workspace"), help="Working directory.")
    parser.add_argument("--keep", action="store_true", help="Keep generated blobs and bundles after the run.")
    args = parser.parse_args()

    base = args.workdir.resolve()
    blobs_dir = base / "blobs"
    bundles_dir = base / "bundles"
    ensure_clean_dir(base)
    ensure_clean_dir(blobs_dir)
    ensure_clean_dir(bundles_dir)

    # 1) Generate blobs and measure loose-file write time
    files, t_loose_write = bench_loose_files_write(blobs_dir, args.num_blobs, args.size_mb)

    # 1b) Loose-file read (random order)
    t_loose_read = bench_loose_files_read_random(files, seed=args.seed)

    # Names used inside bundles
    names = [p.name for p in files]

    # 2) Create bundles (write/pack times)
    results: List[Dict[str, str]] = []

    def add_result(container: str, write_s: float, read_s: float) -> None:
        results.append({
            "Container": container,
            "WriteTime": human(write_s) if write_s == write_s else "n/a",  # NaN check (NaN != NaN)
            "RandomReadTime": human(read_s) if read_s == read_s else "n/a",
        })

    # ZIP (store)
    zip_store = bundles_dir / "blobs_store.zip"
    t_zip_store_write = create_zip_bundle_store(files, zip_store)
    t_zip_store_read = bench_zip_read_random(zip_store, names, seed=args.seed)

    # ZIP (deflate)
    zip_def = bundles_dir / "blobs_deflate.zip"
    t_zip_def_write = create_zip_bundle_deflate(files, zip_def, level=6)
    t_zip_def_read = bench_zip_read_random(zip_def, names, seed=args.seed)

    # TAR (no compression)
    tar_plain = bundles_dir / "blobs.tar"
    t_tar_write = create_tar_bundle(files, tar_plain)
    t_tar_read = bench_tar_read_random(tar_plain, names, seed=args.seed)

    # TAR.GZ (gzip)
    targz = bundles_dir / "blobs.tar.gz"
    t_targz_write = create_targz_bundle(files, targz, compresslevel=6)
    t_targz_read = bench_tar_read_random(targz, names, seed=args.seed)

    # HDF5 (if available)
    if H5_AVAILABLE:
        h5 = bundles_dir / "blobs.h5"
        t_h5_write = create_hdf5_bundle(files, h5)
        t_h5_read = bench_hdf5_read_random(h5, names, seed=args.seed)
    else:
        t_h5_write = float("nan")
        t_h5_read = float("nan")

    # 3) Collect results; loose files first as the reference row
    add_result("Loose files (ref)", t_loose_write, t_loose_read)
    add_result("ZIP (store)", t_zip_store_write, t_zip_store_read)
    add_result("ZIP (deflate)", t_zip_def_write, t_zip_def_read)
    add_result("TAR", t_tar_write, t_tar_read)
    add_result("TAR.GZ", t_targz_write, t_targz_read)
    add_result("HDF5", t_h5_write, t_h5_read)  # times are NaN when h5py is missing

    # 4) Print table
    print("\n=== Benchmark Summary ===")
    print(f"N={args.num_blobs}  Size={args.size_mb} MB each  Seed={args.seed}")
    print(f"Workspace: {base}")
    colw = (28, 14, 18)
    hdr = ("Container", "WriteTime", "RandomReadTime")
    print(f"{hdr[0]:<{colw[0]}} {hdr[1]:>{colw[1]}} {hdr[2]:>{colw[2]}}")
    print("-" * (sum(colw) + 2))
    for row in results:
        print(f"{row['Container']:<{colw[0]}} {row['WriteTime']:>{colw[1]}} {row['RandomReadTime']:>{colw[2]}}")

    # 4b) CSV (store raw seconds, recovered from the formatted strings)
    def seconds_from_human(h: str) -> float:
        if h == "n/a":
            return float("nan")
        return float(h[:-1])  # strip trailing 's'

    csv_path = base / "results.csv"
    with csv_path.open("w", encoding="utf-8") as f:
        f.write("container,write_seconds,random_read_seconds\n")
        for row in results:
            f.write(f"{row['Container']},{seconds_from_human(row['WriteTime'])},{seconds_from_human(row['RandomReadTime'])}\n")
    print(f"\nCSV written: {csv_path}")

    if not args.keep:
        # Default: clean up the generated blobs and bundles to free disk space.
        # Pass --keep to retain them for inspection; results.csv always stays in the workspace.
        shutil.rmtree(blobs_dir, ignore_errors=True)
        shutil.rmtree(bundles_dir, ignore_errors=True)


if __name__ == "__main__":
    main()
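
The helpers can also be driven directly from Python to time a single container format in isolation. A minimal sketch, assuming the script above is saved and importable as blob_bench (the module name, blob count, and sizes are assumptions):

from pathlib import Path

import blob_bench as bb  # assumed module name for the script above

# Generate a few small blobs, pack them with ZIP (store), and time a shuffled read-back.
work = Path("./mini_bench")
bb.ensure_clean_dir(work)
files, t_write = bb.bench_loose_files_write(work, n=4, size_mb=8.0)
names = [p.name for p in files]

zip_path = work / "mini_store.zip"
t_pack = bb.create_zip_bundle_store(files, zip_path)
t_read = bb.bench_zip_read_random(zip_path, names, seed=42)
print(f"loose write: {bb.human(t_write)}  zip pack: {bb.human(t_pack)}  zip random read: {bb.human(t_read)}")

Note that ensure_clean_dir deletes and recreates the target directory, so point it at a scratch path.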