#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
shorten_names.py — Safely shorten file/dir names to <= max bytes (UTF-8), with dry-run & statistics.
Linux/macOS.

Changes vs prior version:
- Predict and avoid collisions even in dry-run using per-directory "reserved" names
- Optional case-insensitive collision mode (default True on macOS)
- Safer last-resort truncation edge path
- Cleaner stats and distribution output
"""
import argparse
import csv
import hashlib
import os
import platform
import sys
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


# ---------- Helpers ----------

def utf8_len_bytes(s: str) -> int:
    return len(s.encode("utf-8"))


def truncate_utf8_by_bytes(s: str, max_bytes: int) -> str:
    """Return s truncated to <= max_bytes in UTF-8 without splitting code points."""
    b = s.encode("utf-8")
    if len(b) <= max_bytes:
        return s
    # Cut and decode ignoring partial trailing code point
    return b[:max_bytes].decode("utf-8", errors="ignore")
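# Worked example: "héllo" encodes to b"h\xc3\xa9llo" (6 bytes). Cutting at
# 2 bytes lands inside the 2-byte "é", so errors="ignore" drops the partial
# code point and truncate_utf8_by_bytes("héllo", 2) returns "h".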
def split_multi_ext(name: str, depth: int) -> Tuple[str, str]:
    """
    Split name into (base, kept_ext) keeping up to 'depth' dotted extensions.
    depth=1: 'a.tar.gz' -> ('a.tar', '.gz')
    depth=2: 'a.tar.gz' -> ('a', '.tar.gz')
    """
    if depth <= 0:
        return name, ""
    base = name
    parts = []
    for _ in range(depth):
        base2, ext = os.path.splitext(base)
        if ext:
            parts.insert(0, ext)
            base = base2
        else:
            break
    return base, "".join(parts)
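# Names without a dotted extension pass through unchanged, e.g.
# split_multi_ext("README", 1) == ("README", ""), since os.path.splitext
# yields an empty ext and the loop breaks on the first iteration.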
def sha1_short(text: str, n: int = 6) -> str:
    return hashlib.sha1(text.encode("utf-8")).hexdigest()[:n]


# ---------- Planning ----------

def plan_new_name(
    old_name: str,
    max_bytes: int,
    ext_depth: int,
    suffix_mode: str = "hash",
    allow_truncate_ext: bool = False,
) -> str:
    """
    Compute a new name <= max_bytes in UTF-8.
    - Keeps last 'ext_depth' extensions.
    - Adds "-<hash>" when truncation is needed (suffix_mode='hash').
    - May truncate/drop extension if necessary and --trunc-ext enabled.
    """
    if utf8_len_bytes(old_name) <= max_bytes:
        return old_name  # nothing to change
    base, ext = split_multi_ext(old_name, ext_depth)
    suffix = "" if suffix_mode == "none" else f"-{sha1_short(old_name)}"
    # Budget for base
    budget = max_bytes - utf8_len_bytes(suffix) - utf8_len_bytes(ext)
    if budget <= 0:
        # Try truncating extension if allowed and extension is relatively large.
        if allow_truncate_ext and utf8_len_bytes(ext) >= max_bytes // 3:
            max_ext_bytes = max(4, max_bytes // 6)
            ext = truncate_utf8_by_bytes(ext, max_ext_bytes)
            budget = max_bytes - utf8_len_bytes(suffix) - utf8_len_bytes(ext)
        if budget <= 0:
            # Drop extension as last resort to fit at least suffix/base
            ext = ""
            budget = max_bytes - utf8_len_bytes(suffix)
        if budget <= 0:
            # Extreme case (max_bytes is smaller than the suffix itself):
            # fall back to a plain byte-safe truncation of the whole name.
            # truncate_utf8_by_bytes never returns more than max_bytes bytes,
            # so this always fits.
            return truncate_utf8_by_bytes(old_name, max_bytes)
    new_base = truncate_utf8_by_bytes(base, budget)
    candidate = f"{new_base}{suffix}{ext}"
    # Double-check and shave if multi-byte boundaries left us 1–2 bytes over
    cand_len = utf8_len_bytes(candidate)
    if cand_len > max_bytes:
        shave = cand_len - max_bytes
        # shave +1 to be safe against another multibyte boundary
        new_base = truncate_utf8_by_bytes(new_base, max(0, utf8_len_bytes(new_base) - shave - 1))
        candidate = f"{new_base}{suffix}{ext}"
    return candidate
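# Shape of the common case (hash shown as a placeholder, not a real digest):
#   plan_new_name("<300-byte name>.tar.gz", max_bytes=255, ext_depth=2)
#   -> "<base truncated to fit>-xxxxxx.tar.gz"
# where "xxxxxx" is sha1_short(old_name), so rerunning on the same input
# always plans the same shortened name.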
# ---------- Collision handling ----------

def unique_name_in_dir(
    dirpath: str,
    desired: str,
    max_bytes: int,
    reserved: set,
    case_insensitive: bool,
) -> Tuple[str, bool]:
    """
    Ensure 'desired' is unique in dirpath against both existing filesystem *and* 'reserved'.
    Returns (unique_name, was_collision_resolved).
    """
    def norm(s: str) -> str:
        return s.lower() if case_insensitive else s

    base, ext = os.path.splitext(desired)
    i = 0
    while True:
        candidate = desired if i == 0 else _candidate_with_suffix(base, ext, i, max_bytes)
        exists = os.path.exists(os.path.join(dirpath, candidate)) or norm(candidate) in reserved
        if not exists:
            reserved.add(norm(candidate))
            return candidate, (i > 0)
        i += 1
        if i > 9999:
            raise RuntimeError("Too many collisions while generating a unique name.")


def _candidate_with_suffix(base: str, ext: str, i: int, max_bytes: int) -> str:
    suffix = f"~{i}"
    # Try to keep extension; if too tight, drop it.
    budget = max_bytes - utf8_len_bytes(ext) - utf8_len_bytes(suffix)
    if budget < 1:
        ext = ""
        budget = max_bytes - utf8_len_bytes(suffix)
    base_trunc = truncate_utf8_by_bytes(base, budget)
    return f"{base_trunc}{suffix}{ext}"
# ---------- Stats ----------

@dataclass
class Stats:
    scanned_files: int = 0
    scanned_dirs: int = 0
    overlimit_files: int = 0
    overlimit_dirs: int = 0
    planned_renames: int = 0
    collisions_resolved: int = 0
    renamed_ok: int = 0
    skipped_exists: int = 0
    skipped_errors: int = 0
    unchanged: int = 0
    longest_components: List[Tuple[int, str]] = field(default_factory=list)

    def track_component(self, path: str, name: str, top_k: int):
        blen = utf8_len_bytes(name)
        self.longest_components.append((blen, path))
        self.longest_components.sort(key=lambda x: x[0], reverse=True)
        if len(self.longest_components) > top_k:
            self.longest_components.pop()
# ---------- Walk ----------

def iter_entries(root: str, include_dirs: bool):
    # bottom-up so we can safely rename directories after their children
    for dirpath, dirnames, filenames in os.walk(root, topdown=False):
        if include_dirs:
            for d in dirnames:
                yield dirpath, d, True
        for f in filenames:
            yield dirpath, f, False
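# For root/a/b.txt this yields ("root/a", "b.txt", False) before ("root", "a", True):
# os.walk(topdown=False) visits children first, so renaming a directory can
# never invalidate a path that has not been visited yet.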
# ---------- Main ----------

def main():
    default_ci = (platform.system() == "Darwin")  # macOS default FS is usually case-insensitive
    ap = argparse.ArgumentParser(description="Shorten file/dir names to <= max bytes (UTF-8) with dry-run & stats.")
    ap.add_argument("root", nargs="?", default=".", help="Root directory to scan.")
    ap.add_argument("--targets", choices=["files", "dirs", "both"], default="files",
                    help="Operate on files, dirs, or both (default: files).")
    ap.add_argument("--include-dirs", action="store_true",
                    help="Alias for --targets both (deprecated, kept for convenience).")
    ap.add_argument("--max-bytes", type=int, default=255, help="Max UTF-8 bytes per name (default: 255).")
    ap.add_argument("--ext-depth", type=int, default=1, help="Number of dotted extensions to preserve (default: 1).")
    ap.add_argument("--suffix-mode", choices=["hash", "none"], default="hash",
                    help="Append short hash on truncation (default: hash).")
    ap.add_argument("--trunc-ext", action="store_true",
                    help="Allow truncating overly long extensions (default: off).")
    ap.add_argument("--apply", action="store_true", help="Actually perform renames (default: dry-run).")
    ap.add_argument("--log-csv", type=str, default=None, help="Write actions to CSV.")
    ap.add_argument("--top", type=int, default=10, help="Show top-N longest components (default: 10).")
    ap.add_argument("--limit", type=int, default=0, help="Process at most N entries (testing).")
    ap.add_argument("--verbose", "-v", action="store_true", help="Verbose output.")
    ap.add_argument("--follow-symlinks", action="store_true", help="Process symlinks as well (default: skip).")
    ap.add_argument("--case-insensitive-collisions", action="store_true", default=default_ci,
                    help=f"Treat names differing only by case as collisions (default: {default_ci}).")
    ap.add_argument("--case-sensitive-collisions", action="store_true",
                    help="Force case-sensitive collision checks (overrides the above).")
    args = ap.parse_args()

    if args.include_dirs:
        args.targets = "both"
    if args.case_sensitive_collisions:
        args.case_insensitive_collisions = False

    root = os.path.abspath(args.root)
    if not os.path.exists(root):
        print(f"Root not found: {root}", file=sys.stderr)
        sys.exit(2)

    stats = Stats()

    # Distribution buckets (bytes)
    bucket_edges = [64, 128, 192, 224, 255]
    bucket_counts: Dict[str, int] = {f"(<= {b})": 0 for b in bucket_edges}
    bucket_counts["(> 255)"] = 0

    def bucket_of(n: int) -> str:
        for b in bucket_edges:
            if n <= b:
                return f"(<= {b})"
        return "(> 255)"
    # CSV
    csv_writer = None
    csv_file = None
    if args.log_csv:
        csv_file = open(args.log_csv, "w", newline="", encoding="utf-8")
        csv_writer = csv.writer(csv_file)
        csv_writer.writerow(["action", "type", "dir", "old_name", "new_name", "old_bytes", "new_bytes", "note"])

    # Reserved names per directory (normalized for collision mode)
    reserved_by_dir: Dict[str, set] = {}

    def get_reserved(dirpath: str) -> set:
        rs = reserved_by_dir.get(dirpath)
        if rs is None:
            rs = set()
            reserved_by_dir[dirpath] = rs
        return rs

    def should_process(is_dir: bool) -> bool:
        return (args.targets == "both") or (args.targets == "files" and not is_dir) or (args.targets == "dirs" and is_dir)

    processed = 0
    for dirpath, name, is_dir in iter_entries(root, include_dirs=(args.targets in ("dirs", "both"))):
        if args.limit and processed >= args.limit:
            break
        processed += 1
        full_path = os.path.join(dirpath, name)
        if not args.follow_symlinks and os.path.islink(full_path):
            continue
        if is_dir:
            stats.scanned_dirs += 1
        else:
            stats.scanned_files += 1
        stats.track_component(full_path, name, args.top)
        bucket_counts[bucket_of(utf8_len_bytes(name))] += 1
        if not should_process(is_dir):
            stats.unchanged += 1
            continue
        old_len = utf8_len_bytes(name)
        if old_len <= args.max_bytes:
            stats.unchanged += 1
            continue
        if is_dir:
            stats.overlimit_dirs += 1
        else:
            stats.overlimit_files += 1

        # Plan
        desired = plan_new_name(
            old_name=name,
            max_bytes=args.max_bytes,
            ext_depth=0 if is_dir else args.ext_depth,
            suffix_mode=args.suffix_mode,
            allow_truncate_ext=args.trunc_ext,
        )
        # Defensively skip if plan is a no-op (shouldn't happen for >max)
        if desired == name:
            stats.unchanged += 1
            continue

        reserved = get_reserved(dirpath)
        try:
            desired_unique, collided = unique_name_in_dir(
                dirpath=dirpath,
                desired=desired,
                max_bytes=args.max_bytes,
                reserved=reserved,
                case_insensitive=args.case_insensitive_collisions,
            )
            if collided:
                stats.collisions_resolved += 1
        except Exception as e:
            stats.skipped_errors += 1
            if args.verbose:
                print(f"[ERROR] {full_path}: collision-resolution-failed: {e}", file=sys.stderr)
            if csv_writer:
                csv_writer.writerow(["skip", "dir" if is_dir else "file", dirpath, name, "", old_len, "", "collision-failed"])
            continue

        new_len = utf8_len_bytes(desired_unique)
        stats.planned_renames += 1
        if args.verbose:
            act = "RENAME" if args.apply else "DRYRUN"
            print(f"[{act}] {full_path}\n -> {os.path.join(dirpath, desired_unique)} (bytes {old_len} -> {new_len})")
        if csv_writer:
            csv_writer.writerow([
                "rename" if args.apply else "dryrun",
                "dir" if is_dir else "file",
                dirpath, name, desired_unique, old_len, new_len,
                "collision-resolved" if collided else ""
            ])
        if args.apply:
            target = os.path.join(dirpath, desired_unique)
            try:
                # POSIX os.rename() silently replaces an existing target (it
                # does not raise FileExistsError), so re-check right before
                # renaming instead of relying on the exception.
                if os.path.exists(target):
                    stats.skipped_exists += 1
                    if args.verbose:
                        print(f"[SKIP] Exists: {target}")
                else:
                    os.rename(full_path, target)
                    stats.renamed_ok += 1
            except Exception as e:
                stats.skipped_errors += 1
                if args.verbose:
                    print(f"[ERROR] {full_path}: {e}", file=sys.stderr)
    if csv_file:
        csv_file.close()

    # ---------- Summary ----------
    total_scanned = stats.scanned_files + stats.scanned_dirs
    total_over = stats.overlimit_files + stats.overlimit_dirs
    print("\n========== SUMMARY ==========")
    print(f"Root: {root}")
    print(f"Mode: {'APPLY' if args.apply else 'DRY-RUN'}")
    print(f"Targets: {args.targets}")
    print(f"Max bytes: {args.max_bytes}")
    print(f"Ext depth: {args.ext_depth}")
    print(f"Suffix mode: {args.suffix_mode}")
    print(f"Truncate ext: {args.trunc_ext}")
    print(f"Follow symlinks: {args.follow_symlinks}")
    print(f"Case-insensitive: {args.case_insensitive_collisions}")
    if args.log_csv:
        print(f"CSV log: {args.log_csv}")
    print("\n-- Counters --")
    print(f"Scanned entries: {total_scanned} (files={stats.scanned_files}, dirs={stats.scanned_dirs})")
    print(f"Over limit: {total_over} (files={stats.overlimit_files}, dirs={stats.overlimit_dirs})")
    print(f"Planned renames: {stats.planned_renames}")
    if not args.apply:
        print(f"Would rename: {stats.planned_renames}")
    else:
        print(f"Renamed OK: {stats.renamed_ok}")
        print(f"Skipped exists: {stats.skipped_exists}")
    # Errors can occur in dry-run too (collision-resolution failures).
    print(f"Errors: {stats.skipped_errors}")
    print(f"Collisions resolved: {stats.collisions_resolved}")
    print(f"Unchanged: {stats.unchanged}")
| print("\n-- Byte-length distribution (UTF-8) --") | |
| for b in bucket_edges: | |
| label = f"(<= {b})" | |
| print(f"{label:<10} {bucket_counts[label]}") | |
| print(f"{'(> 255)':<10} {bucket_counts['>(255)']}") | |
| print(f"\n-- Top {args.top} longest components --") | |
| for blen, path in stats.longest_components: | |
| print(f"{blen:>4} bytes {path}") | |
| print("\nDone.") | |
| if __name__ == "__main__": | |
| main() |