@amazingsmash
Created October 31, 2025 16:22
Benchmark tool that generates random binary blobs and compares I/O performance across loose files, ZIP, TAR, and HDF5 containers.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import os
import random
import shutil
import tarfile
import time
import zipfile
from pathlib import Path
from typing import List, Dict, Tuple
# Optional HDF5 support
try:
    import h5py  # type: ignore
    H5_AVAILABLE = True
except Exception:
    H5_AVAILABLE = False
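# Note: h5py is an optional dependency (e.g. `pip install h5py`); when it is
# missing, the HDF5 rows in the summary are reported as "n/a".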
# ------------------------------- Utilities -------------------------------- #
def bytes_from_mb(mb: float) -> int:
    return int(mb * 1024 * 1024)


def ensure_clean_dir(path: Path) -> None:
    if path.exists():
        shutil.rmtree(path)
    path.mkdir(parents=True, exist_ok=True)


def human(s: float) -> str:
    return f"{s:.3f}s"


def now() -> float:
    return time.perf_counter()


def generate_blob_file(dst: Path, size_bytes: int, chunk_size: int = 4 * 1024 * 1024) -> None:
    """
    Streams cryptographically-strong random bytes to avoid holding the whole blob in RAM.
    """
    remaining = size_bytes
    with dst.open("wb", buffering=0) as f:
        while remaining > 0:
            n = min(remaining, chunk_size)
            f.write(os.urandom(n))  # system RNG; avoids Python-level byte generation loops
            remaining -= n


def read_file_to_memory(src: Path, chunk_size: int = 4 * 1024 * 1024) -> int:
    """
    Reads a file fully into memory (discarded) in chunks. Returns total bytes read.
    """
    total = 0
    with src.open("rb", buffering=0) as f:
        while True:
            b = f.read(chunk_size)
            if not b:
                break
            total += len(b)
    return total


def shuffled(items: List[str], seed: int) -> List[str]:
    r = random.Random(seed)
    arr = items[:]
    r.shuffle(arr)
    return arr

# --------------------------- Bench: loose files ---------------------------- #
def bench_loose_files_write(base_dir: Path, n: int, size_mb: float) -> Tuple[List[Path], float]:
    size_bytes = bytes_from_mb(size_mb)
    filenames = [base_dir / f"blob_{i:05d}.bin" for i in range(n)]
    t0 = now()
    for p in filenames:
        generate_blob_file(p, size_bytes)
    t1 = now()
    return filenames, t1 - t0


def bench_loose_files_read_random(paths: List[Path], seed: int = 12345) -> float:
    order = shuffled([str(p) for p in paths], seed=seed)
    t0 = now()
    total = 0
    for p in order:
        total += read_file_to_memory(Path(p))
    t1 = now()
    # total is unused afterwards, but accumulating it keeps the reads from being discarded as dead work
    return t1 - t0

# ------------------------------- ZIP bundle -------------------------------- #
def create_zip_bundle_store(src_files: List[Path], zip_path: Path) -> float:
    t0 = now()
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_STORED, allowZip64=True) as zf:
        for p in src_files:
            zf.write(p, arcname=p.name)
    t1 = now()
    return t1 - t0


def create_zip_bundle_deflate(src_files: List[Path], zip_path: Path, level: int = 6) -> float:
    t0 = now()
    comp = zipfile.ZIP_DEFLATED
    with zipfile.ZipFile(zip_path, "w", compression=comp, compresslevel=level, allowZip64=True) as zf:
        for p in src_files:
            zf.write(p, arcname=p.name)
    t1 = now()
    return t1 - t0


def bench_zip_read_random(zip_path: Path, filenames: List[str], seed: int = 12345) -> float:
    order = shuffled(filenames, seed=seed)
    t0 = now()
    with zipfile.ZipFile(zip_path, "r") as zf:
        for name in order:
            _ = zf.read(name)
    t1 = now()
    return t1 - t0

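
# Illustrative only (not called by the benchmark): zf.read() above loads each
# member fully into memory. For very large blobs, ZipFile.open() yields a
# file-like object that can be consumed in chunks instead; a minimal sketch:
def read_zip_member_streaming(zip_path: Path, name: str, chunk_size: int = 4 * 1024 * 1024) -> int:
    total = 0
    with zipfile.ZipFile(zip_path, "r") as zf:
        with zf.open(name, "r") as fobj:
            while True:
                b = fobj.read(chunk_size)
                if not b:
                    break
                total += len(b)
    return total
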
# ------------------------------- TAR bundle -------------------------------- #
def create_tar_bundle(src_files: List[Path], tar_path: Path) -> float:
    t0 = now()
    with tarfile.open(tar_path, "w") as tf:
        for p in src_files:
            tf.add(p, arcname=p.name)
    t1 = now()
    return t1 - t0


def create_targz_bundle(src_files: List[Path], tar_gz_path: Path, compresslevel: int = 6) -> float:
    t0 = now()
    with tarfile.open(tar_gz_path, "w:gz", compresslevel=compresslevel) as tf:
        for p in src_files:
            tf.add(p, arcname=p.name)
    t1 = now()
    return t1 - t0


def bench_tar_read_random(tar_path: Path, filenames: List[str], seed: int = 12345) -> float:
    order = shuffled(filenames, seed=seed)
    t0 = now()
    with tarfile.open(tar_path, "r") as tf:
        # tarfile needs a TarInfo per member; build the lookup dict once for O(1) access
        members = {m.name: m for m in tf.getmembers()}
        for name in order:
            ti = members[name]
            fobj = tf.extractfile(ti)
            if fobj is None:
                raise RuntimeError(f"Failed to extract member {name}")
            # Consume the member without writing it to disk
            while fobj.read(4 * 1024 * 1024):
                pass
            fobj.close()
    t1 = now()
    return t1 - t0

# ------------------------------- HDF5 bundle -------------------------------- #
def create_hdf5_bundle(src_files: List[Path], h5_path: Path, chunk_size: int = 1024 * 1024) -> float:
    """
    Stores each file as a byte array dataset under /blobs/<filename>.
    Uses chunking for scalable I/O; compression disabled to measure raw container overhead.
    """
    if not H5_AVAILABLE:
        return float("nan")
    t0 = now()
    with h5py.File(h5_path, "w") as f:
        grp = f.create_group("blobs")
        for p in src_files:
            size = p.stat().st_size
            # Chunked dataset of uint8 to allow partial I/O; no compression to match ZIP store/TAR
            dset = grp.create_dataset(
                p.name,
                shape=(size,),
                dtype="u1",
                chunks=(min(chunk_size, size),),
                compression=None,
            )
            # Stream copy to avoid loading the entire blob into RAM
            with p.open("rb", buffering=0) as fh:
                offset = 0
                while True:
                    b = fh.read(chunk_size)
                    if not b:
                        break
                    n = len(b)
                    dset[offset:offset + n] = memoryview(b)
                    offset += n
    t1 = now()
    return t1 - t0

def bench_hdf5_read_random(h5_path: Path, filenames: List[str], seed: int = 12345, chunk_size: int = 4 * 1024 * 1024) -> float:
    if not H5_AVAILABLE:
        return float("nan")
    order = shuffled(filenames, seed=seed)
    t0 = now()
    with h5py.File(h5_path, "r") as f:
        grp = f["/blobs"]
        for name in order:
            dset = grp[name]
            size = dset.shape[0]
            # Read in chunks to simulate streaming without relying on large RAM
            read = 0
            while read < size:
                end = min(read + chunk_size, size)
                _ = dset[read:end]
                read = end
    t1 = now()
    return t1 - t0

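
# Illustrative only (not called by the benchmark): because every blob is stored as a
# chunked /blobs/<name> dataset, HDF5 can serve an arbitrary byte range of a single
# blob without reading the rest of the container; a minimal sketch:
def read_hdf5_blob_range(h5_path: Path, name: str, start: int, length: int) -> bytes:
    if not H5_AVAILABLE:
        raise RuntimeError("h5py is not installed")
    with h5py.File(h5_path, "r") as f:
        dset = f["/blobs"][name]
        end = min(start + length, dset.shape[0])
        return dset[start:end].tobytes()
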
# --------------------------------- Main ------------------------------------ #
def main() -> None:
    parser = argparse.ArgumentParser(description="Blob bundling and I/O benchmark.")
    parser.add_argument("-n", "--num-blobs", type=int, required=True, help="Number of blobs to generate.")
    parser.add_argument("-s", "--size-mb", type=float, required=True, help="Size of each blob in MB.")
    parser.add_argument("--seed", type=int, default=1337, help="RNG seed for reproducible random order.")
    parser.add_argument("--workdir", type=Path, default=Path("./bench_workspace"), help="Working directory.")
    parser.add_argument("--keep", action="store_true", help="Keep generated blobs and bundles after the run.")
    args = parser.parse_args()

    base = args.workdir.resolve()
    blobs_dir = base / "blobs"
    bundles_dir = base / "bundles"
    ensure_clean_dir(base)
    ensure_clean_dir(blobs_dir)
    ensure_clean_dir(bundles_dir)

    # 1) Generate blobs and measure loose file write time
    files, t_loose_write = bench_loose_files_write(blobs_dir, args.num_blobs, args.size_mb)

    # 1b) Loose files read (random order)
    t_loose_read = bench_loose_files_read_random(files, seed=args.seed)

    # Names used inside bundles
    names = [p.name for p in files]

    # 2) Create bundles (write/pack times) and benchmark random-order reads
    results: List[Dict[str, str]] = []

    def add_result(container: str, write_s: float, read_s: float) -> None:
        results.append({
            "Container": container,
            "WriteTime": human(write_s) if write_s == write_s else "n/a",  # NaN check
            "RandomReadTime": human(read_s) if read_s == read_s else "n/a",
        })

    # ZIP (store)
    zip_store = bundles_dir / "blobs_store.zip"
    t_zip_store_write = create_zip_bundle_store(files, zip_store)
    t_zip_store_read = bench_zip_read_random(zip_store, names, seed=args.seed)

    # ZIP (deflate)
    zip_def = bundles_dir / "blobs_deflate.zip"
    t_zip_def_write = create_zip_bundle_deflate(files, zip_def, level=6)
    t_zip_def_read = bench_zip_read_random(zip_def, names, seed=args.seed)

    # TAR (no compression)
    tar_plain = bundles_dir / "blobs.tar"
    t_tar_write = create_tar_bundle(files, tar_plain)
    t_tar_read = bench_tar_read_random(tar_plain, names, seed=args.seed)

    # TAR.GZ (gzip)
    targz = bundles_dir / "blobs.tar.gz"
    t_targz_write = create_targz_bundle(files, targz, compresslevel=6)
    t_targz_read = bench_tar_read_random(targz, names, seed=args.seed)

    # HDF5 (if available)
    if H5_AVAILABLE:
        h5 = bundles_dir / "blobs.h5"
        t_h5_write = create_hdf5_bundle(files, h5)
        t_h5_read = bench_hdf5_read_random(h5, names, seed=args.seed)
    else:
        t_h5_write = float("nan")
        t_h5_read = float("nan")

    # 3) Collect results (loose files first as the reference row)
    add_result("Loose files (ref)", t_loose_write, t_loose_read)
    add_result("ZIP (store)", t_zip_store_write, t_zip_store_read)
    add_result("ZIP (deflate)", t_zip_def_write, t_zip_def_read)
    add_result("TAR", t_tar_write, t_tar_read)
    add_result("TAR.GZ", t_targz_write, t_targz_read)
    add_result("HDF5", t_h5_write, t_h5_read)  # NaN times render as "n/a" when h5py is missing

    # 4) Print table
    print("\n=== Benchmark Summary ===")
    print(f"N={args.num_blobs} Size={args.size_mb} MB each Seed={args.seed}")
    print(f"Workspace: {base}")
    colw = (28, 14, 18)
    hdr = ("Container", "WriteTime", "RandomReadTime")
    print(f"{hdr[0]:<{colw[0]}} {hdr[1]:>{colw[1]}} {hdr[2]:>{colw[2]}}")
    print("-" * (sum(colw) + 2))
    for row in results:
        print(f"{row['Container']:<{colw[0]}} {row['WriteTime']:>{colw[1]}} {row['RandomReadTime']:>{colw[2]}}")

    # 4b) CSV (numeric seconds, recovered from the human-readable strings)
    def seconds_from_human(h: str) -> float:
        if h == "n/a":
            return float("nan")
        return float(h[:-1])  # strip trailing 's'

    csv_path = base / "results.csv"
    with csv_path.open("w", encoding="utf-8") as f:
        f.write("container,write_seconds,random_read_seconds\n")
        for row in results:
            f.write(f"{row['Container']},{seconds_from_human(row['WriteTime'])},{seconds_from_human(row['RandomReadTime'])}\n")
    print(f"\nCSV written: {csv_path}")

    # 5) Optional cleanup: without --keep, drop the bulky generated data but keep results.csv
    if not args.keep:
        shutil.rmtree(blobs_dir, ignore_errors=True)
        shutil.rmtree(bundles_dir, ignore_errors=True)


if __name__ == "__main__":
    main()
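
Example usage (assuming the file is saved as blob_bench.py; the gist does not fix a script name):

    python blob_bench.py -n 20 -s 64 --seed 1337

This generates 20 blobs of 64 MB each in ./bench_workspace, packs them into ZIP (store and deflate), TAR, TAR.GZ, and, if h5py is installed, HDF5 bundles, times random-order reads from each container, prints a summary table, and writes results.csv. Add --keep to retain the generated blobs and bundles for inspection; --workdir changes the workspace location.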