Skip to content

Instantly share code, notes, and snippets.

@gaoyifan
Created October 11, 2025 10:33
Show Gist options
  • Select an option

  • Save gaoyifan/fb0799f5edf76cfbd815bf3143134cea to your computer and use it in GitHub Desktop.

Select an option

Save gaoyifan/fb0799f5edf76cfbd815bf3143134cea to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
shorten_names.py — Safely shorten file/dir names to <= max bytes (UTF-8), with dry-run & statistics.
Linux/macOS.
Changes vs prior version:
- Predict and avoid collisions even in dry-run using per-directory "reserved" names
- Optional case-insensitive collision mode (default True on macOS)
- Safer last-resort truncation edge path
- Cleaner stats and distribution output
"""
import argparse
import csv
import hashlib
import os
import platform
import sys
from dataclasses import dataclass, field
from typing import Dict, List, Tuple
# ---------- Helpers ----------
def utf8_len_bytes(s: str) -> int:
    """Number of bytes *s* occupies when encoded as UTF-8."""
    encoded = s.encode("utf-8")
    return len(encoded)
def truncate_utf8_by_bytes(s: str, max_bytes: int) -> str:
    """Truncate *s* so its UTF-8 encoding fits in *max_bytes*, never splitting a code point."""
    raw = s.encode("utf-8")
    if len(raw) > max_bytes:
        # errors="ignore" silently drops the partial trailing code point
        # that a plain byte slice may leave behind.
        return raw[:max_bytes].decode("utf-8", errors="ignore")
    return s
def split_multi_ext(name: str, depth: int) -> Tuple[str, str]:
    """
    Split *name* into (base, kept_ext), keeping at most *depth* dotted extensions.

    depth=1: 'a.tar.gz' -> ('a.tar', '.gz')
    depth=2: 'a.tar.gz' -> ('a', '.tar.gz')
    depth<=0: no extension is split off at all.
    """
    if depth <= 0:
        return name, ""
    remaining = name
    kept = []
    for _ in range(depth):
        stem, ext = os.path.splitext(remaining)
        if not ext:
            # No further extension to peel off (e.g. '.bashrc' or plain name).
            break
        kept.insert(0, ext)  # prepend so extensions stay in original order
        remaining = stem
    return remaining, "".join(kept)
def sha1_short(text: str, n: int = 6) -> str:
    """First *n* hex digits of the SHA-1 of *text* (UTF-8); used as a stable short id."""
    digest = hashlib.sha1(text.encode("utf-8"))
    return digest.hexdigest()[:n]
# ---------- Planning ----------
def plan_new_name(
    old_name: str,
    max_bytes: int,
    ext_depth: int,
    suffix_mode: str = "hash",
    allow_truncate_ext: bool = False,
) -> str:
    """
    Compute a replacement name whose UTF-8 encoding is <= max_bytes.

    - Keeps the last 'ext_depth' dotted extensions.
    - Appends "-<hash>" of the original name when truncation is needed
      (suffix_mode='hash'); suffix_mode='none' disables that.
    - May truncate or drop the extension if allow_truncate_ext is set and
      nothing else fits.

    Returns the original name unchanged when it already fits.
    """
    if utf8_len_bytes(old_name) <= max_bytes:
        return old_name  # nothing to change
    base, ext = split_multi_ext(old_name, ext_depth)
    suffix = "" if suffix_mode == "none" else f"-{sha1_short(old_name)}"
    # Byte budget left for the base once suffix and extension are accounted for.
    budget = max_bytes - utf8_len_bytes(suffix) - utf8_len_bytes(ext)
    if budget <= 0:
        # Try truncating the extension if allowed and it is relatively large.
        if allow_truncate_ext and utf8_len_bytes(ext) >= max_bytes // 3:
            max_ext_bytes = max(4, max_bytes // 6)
            ext = truncate_utf8_by_bytes(ext, max_ext_bytes)
            budget = max_bytes - utf8_len_bytes(suffix) - utf8_len_bytes(ext)
        if budget <= 0:
            # Drop the extension entirely as a last resort.
            ext = ""
            budget = max_bytes - utf8_len_bytes(suffix)
        if budget <= 0:
            # Extreme case: even the suffix alone exceeds max_bytes. Fall back to a
            # plain truncation of the original name. truncate_utf8_by_bytes already
            # guarantees the result fits, so no re-check is needed (the previous
            # version's re-check and "suffix or core" fallback were unreachable).
            return truncate_utf8_by_bytes(old_name, max_bytes)
    new_base = truncate_utf8_by_bytes(base, budget)
    candidate = f"{new_base}{suffix}{ext}"
    # Double-check: multi-byte boundaries may have left us slightly over budget.
    cand_len = utf8_len_bytes(candidate)
    if cand_len > max_bytes:
        shave = cand_len - max_bytes
        # Shave one extra byte to be safe against another multibyte boundary.
        new_base = truncate_utf8_by_bytes(new_base, max(0, utf8_len_bytes(new_base) - shave - 1))
        candidate = f"{new_base}{suffix}{ext}"
    return candidate
# ---------- Collision handling ----------
def unique_name_in_dir(
    dirpath: str,
    desired: str,
    max_bytes: int,
    reserved: set,
    case_insensitive: bool,
) -> Tuple[str, bool]:
    """
    Ensure 'desired' is unique in dirpath against both the existing filesystem
    *and* 'reserved' (names already planned in this run, e.g. during dry-run).

    Uses os.path.lexists instead of os.path.exists: exists() returns False for
    a dangling symlink, which would let a later os.rename() silently clobber
    it; lexists() treats the dangling link as an occupied name.

    Returns (unique_name, was_collision_resolved). Raises RuntimeError after
    ~10000 failed attempts.
    """
    def norm(s: str) -> str:
        # Collision comparisons are lowercased on case-insensitive filesystems.
        return s.lower() if case_insensitive else s
    base, ext = os.path.splitext(desired)
    i = 0
    while True:
        candidate = desired if i == 0 else _candidate_with_suffix(base, ext, i, max_bytes)
        # lexists: count broken symlinks as taken too.
        taken = os.path.lexists(os.path.join(dirpath, candidate)) or norm(candidate) in reserved
        if not taken:
            reserved.add(norm(candidate))
            return candidate, (i > 0)
        i += 1
        if i > 9999:
            raise RuntimeError("Too many collisions while generating a unique name.")


def _candidate_with_suffix(base: str, ext: str, i: int, max_bytes: int) -> str:
    """Build 'base~i' + ext, truncating base (and dropping ext if needed) to fit max_bytes."""
    suffix = f"~{i}"
    def blen(s: str) -> int:
        return utf8_len_bytes(s)
    budget = max_bytes - blen(ext) - blen(suffix)
    if budget < 1:
        # Not even one byte left for the base: sacrifice the extension.
        ext = ""
        budget = max_bytes - blen(suffix)
    base_trunc = truncate_utf8_by_bytes(base, budget)
    return f"{base_trunc}{suffix}{ext}"
# ---------- Stats ----------
@dataclass
class Stats:
    """Mutable counters accumulated over one scan/rename pass."""

    # Entries visited, by kind.
    scanned_files: int = 0
    scanned_dirs: int = 0
    # Entries whose name exceeded the byte limit, by kind.
    overlimit_files: int = 0
    overlimit_dirs: int = 0
    planned_renames: int = 0
    collisions_resolved: int = 0
    renamed_ok: int = 0
    skipped_exists: int = 0
    skipped_errors: int = 0
    unchanged: int = 0
    # Top-K leaderboard of (utf8_byte_length, full_path), longest first.
    longest_components: List[Tuple[int, str]] = field(default_factory=list)

    def track_component(self, path: str, name: str, top_k: int):
        """Record *path* on the longest-name leaderboard, keeping only the top_k entries."""
        board = self.longest_components
        board.append((utf8_len_bytes(name), path))
        board.sort(key=lambda pair: pair[0], reverse=True)
        # The board grows by at most one per call, so dropping one tail entry suffices.
        if len(board) > top_k:
            board.pop()
# ---------- Walk ----------
def iter_entries(root: str, include_dirs: bool):
    """
    Yield (parent_dir, entry_name, is_dir) tuples, deepest entries first.

    Bottom-up order (topdown=False) means a directory is yielded only after
    everything inside it, so renaming it cannot invalidate paths yielded later.
    Directory entries are emitted only when include_dirs is true.
    """
    walker = os.walk(root, topdown=False)
    for parent, subdirs, files in walker:
        if include_dirs:
            for sub in subdirs:
                yield parent, sub, True
        for fname in files:
            yield parent, fname, False
# ---------- Main ----------
def main() -> None:
    """CLI entry point: parse arguments, walk the tree, plan/apply renames, print stats."""
    default_ci = (platform.system() == "Darwin")  # macOS default FS is usually case-insensitive
    ap = argparse.ArgumentParser(description="Shorten file/dir names to <= max bytes (UTF-8) with dry-run & stats.")
    ap.add_argument("root", nargs="?", default=".", help="Root directory to scan.")
    ap.add_argument("--targets", choices=["files", "dirs", "both"], default="files",
                    help="Operate on files, dirs, or both (default: files).")
    ap.add_argument("--include-dirs", action="store_true",
                    help="Alias for --targets both (deprecated, kept for convenience).")
    ap.add_argument("--max-bytes", type=int, default=255, help="Max UTF-8 bytes per name (default: 255).")
    ap.add_argument("--ext-depth", type=int, default=1, help="Number of dotted extensions to preserve (default: 1).")
    ap.add_argument("--suffix-mode", choices=["hash", "none"], default="hash",
                    help="Append short hash on truncation (default: hash).")
    ap.add_argument("--trunc-ext", action="store_true",
                    help="Allow truncating overly long extensions (default: off).")
    ap.add_argument("--apply", action="store_true", help="Actually perform renames (default: dry-run).")
    ap.add_argument("--log-csv", type=str, default=None, help="Write actions to CSV.")
    ap.add_argument("--top", type=int, default=10, help="Show top-N longest components (default: 10).")
    ap.add_argument("--limit", type=int, default=0, help="Process at most N entries (testing).")
    ap.add_argument("--verbose", "-v", action="store_true", help="Verbose output.")
    ap.add_argument("--follow-symlinks", action="store_true", help="Process symlinks as well (default: skip).")
    ap.add_argument("--case-insensitive-collisions", action="store_true", default=default_ci,
                    help=f"Treat names differing only by case as collisions (default: {default_ci}).")
    ap.add_argument("--case-sensitive-collisions", action="store_true",
                    help="Force case-sensitive collision checks (overrides the above).")
    args = ap.parse_args()
    # --include-dirs is a deprecated alias: fold it into --targets.
    if args.include_dirs:
        args.targets = "both"
    # The explicit case-sensitive flag wins over the (possibly default-on) insensitive one.
    if args.case_sensitive_collisions:
        args.case_insensitive_collisions = False
    root = os.path.abspath(args.root)
    if not os.path.exists(root):
        print(f"Root not found: {root}", file=sys.stderr)
        sys.exit(2)
    stats = Stats()
    # Distribution buckets (bytes)
    bucket_edges = [64, 128, 192, 224, 255]
    bucket_counts: Dict[str, int] = {f"(<= {b})": 0 for b in bucket_edges}
    bucket_counts[">(255)"] = 0
    def bucket_of(n: int) -> str:
        # Smallest edge that still contains n; everything above the last edge
        # goes into the overflow bucket.
        for b in bucket_edges:
            if n <= b:
                return f"(<= {b})"
        return ">(255)"
    # CSV
    csv_writer = None
    csv_file = None
    if args.log_csv:
        csv_file = open(args.log_csv, "w", newline="", encoding="utf-8")
        csv_writer = csv.writer(csv_file)
        csv_writer.writerow(["action", "type", "dir", "old_name", "new_name", "old_bytes", "new_bytes", "note"])
    # Reserved names per directory (normalized for collision mode); these let
    # the dry-run predict collisions between names it *plans* to create.
    reserved_by_dir: Dict[str, set] = {}
    def get_reserved(dirpath: str) -> set:
        # Lazily create the per-directory reservation set.
        rs = reserved_by_dir.get(dirpath)
        if rs is None:
            rs = set()
            reserved_by_dir[dirpath] = rs
        return rs
    def should_process(is_dir: bool) -> bool:
        # Whether this entry kind matches the --targets selection.
        return (args.targets == "both") or (args.targets == "files" and not is_dir) or (args.targets == "dirs" and is_dir)
    processed = 0
    for dirpath, name, is_dir in iter_entries(root, include_dirs=(args.targets in ("dirs", "both"))):
        if args.limit and processed >= args.limit:
            break
        processed += 1
        full_path = os.path.join(dirpath, name)
        # Symlinks are skipped unless explicitly requested.
        if not args.follow_symlinks and os.path.islink(full_path):
            continue
        if is_dir:
            stats.scanned_dirs += 1
        else:
            stats.scanned_files += 1
        stats.track_component(full_path, name, args.top)
        bucket_counts[bucket_of(utf8_len_bytes(name))] += 1
        if not should_process(is_dir):
            stats.unchanged += 1
            continue
        old_len = utf8_len_bytes(name)
        if old_len <= args.max_bytes:
            stats.unchanged += 1
            continue
        if is_dir:
            stats.overlimit_dirs += 1
        else:
            stats.overlimit_files += 1
        # Plan the shortened name. Directories keep no extension (ext_depth=0).
        desired = plan_new_name(
            old_name=name,
            max_bytes=args.max_bytes,
            ext_depth=0 if is_dir else args.ext_depth,
            suffix_mode=args.suffix_mode,
            allow_truncate_ext=args.trunc_ext,
        )
        # Defensively skip if plan is a no-op (shouldn't happen for >max)
        if desired == name:
            stats.unchanged += 1
            continue
        reserved = get_reserved(dirpath)
        try:
            desired_unique, collided = unique_name_in_dir(
                dirpath=dirpath,
                desired=desired,
                max_bytes=args.max_bytes,
                reserved=reserved,
                case_insensitive=args.case_insensitive_collisions,
            )
            if collided:
                stats.collisions_resolved += 1
        except Exception as e:
            # Collision resolution exhausted or failed: log and move on.
            stats.skipped_errors += 1
            if args.verbose:
                print(f"[ERROR] {full_path}: collision-resolution-failed: {e}", file=sys.stderr)
            if csv_writer:
                csv_writer.writerow(["skip", "dir" if is_dir else "file", dirpath, name, "", old_len, "", "collision-failed"])
            continue
        new_len = utf8_len_bytes(desired_unique)
        stats.planned_renames += 1
        if args.verbose:
            act = "RENAME" if args.apply else "DRYRUN"
            print(f"[{act}] {full_path}\n -> {os.path.join(dirpath, desired_unique)} (bytes {old_len} -> {new_len})")
        if csv_writer:
            csv_writer.writerow([
                "rename" if args.apply else "dryrun",
                "dir" if is_dir else "file",
                dirpath, name, desired_unique, old_len, new_len,
                "collision-resolved" if collided else ""
            ])
        if args.apply:
            try:
                os.rename(full_path, os.path.join(dirpath, desired_unique))
                stats.renamed_ok += 1
            except FileExistsError:
                # Name appeared between planning and renaming; leave entry as-is.
                stats.skipped_exists += 1
                if args.verbose:
                    print(f"[SKIP] Exists: {os.path.join(dirpath, desired_unique)}")
            except Exception as e:
                stats.skipped_errors += 1
                if args.verbose:
                    print(f"[ERROR] {full_path}: {e}", file=sys.stderr)
    if csv_file:
        csv_file.close()
    # ---------- Summary ----------
    total_scanned = stats.scanned_files + stats.scanned_dirs
    total_over = stats.overlimit_files + stats.overlimit_dirs
    print("\n========== SUMMARY ==========")
    print(f"Root: {root}")
    print(f"Mode: {'APPLY' if args.apply else 'DRY-RUN'}")
    print(f"Targets: {args.targets}")
    print(f"Max bytes: {args.max_bytes}")
    print(f"Ext depth: {args.ext_depth}")
    print(f"Suffix mode: {args.suffix_mode}")
    print(f"Truncate ext: {args.trunc_ext}")
    print(f"Follow symlinks: {args.follow_symlinks}")
    print(f"Case-insensitive: {args.case_insensitive_collisions}")
    if args.log_csv:
        print(f"CSV log: {args.log_csv}")
    print("\n-- Counters --")
    print(f"Scanned entries: {total_scanned} (files={stats.scanned_files}, dirs={stats.scanned_dirs})")
    print(f"Over limit: {total_over} (files={stats.overlimit_files}, dirs={stats.overlimit_dirs})")
    print(f"Planned renames: {stats.planned_renames}")
    if not args.apply:
        print(f"Would rename: {stats.planned_renames}")
    else:
        print(f"Renamed OK: {stats.renamed_ok}")
        print(f"Skipped exists: {stats.skipped_exists}")
        print(f"Errors: {stats.skipped_errors}")
    print(f"Collisions resolved: {stats.collisions_resolved}")
    print(f"Unchanged: {stats.unchanged}")
    print("\n-- Byte-length distribution (UTF-8) --")
    for b in bucket_edges:
        label = f"(<= {b})"
        print(f"{label:<10} {bucket_counts[label]}")
    print(f"{'(> 255)':<10} {bucket_counts['>(255)']}")
    print(f"\n-- Top {args.top} longest components --")
    for blen, path in stats.longest_components:
        print(f"{blen:>4} bytes {path}")
    print("\nDone.")


# Standard script entry guard: run only when executed directly, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment