Skip to content

Instantly share code, notes, and snippets.

@vega113
Created September 16, 2025 03:34
Show Gist options
  • Select an option

  • Save vega113/7f966aa77bf9d50bf483242bfd534c99 to your computer and use it in GitHub Desktop.

Select an option

Save vega113/7f966aa77bf9d50bf483242bfd534c99 to your computer and use it in GitHub Desktop.
Compress videos so each output file is <=200 MB, copying smaller files as-is.
#!/usr/bin/env python3
"""Compress videos so each output file is <=200 MB, copying smaller files as-is."""
from __future__ import annotations

import glob
import json
import math
import os
import shlex
import shutil
import subprocess
from pathlib import Path
# Lower-case file suffixes (including the dot) that are treated as videos.
VIDEO_EXTS = {
    ".mp4", ".mkv", ".mov",
    ".avi", ".wmv", ".flv",
    ".mpg", ".mpeg", ".m4v",
}

# Size limits, in bytes (2**20 bytes = 1 MiB).
MAX_SIZE_BYTES = 200 * 2**20  # hard ceiling for every output file
INITIAL_TARGET_BYTES = 190 * 2**20  # upper bound for the first encode target

# Bitrates, in bits per second.
MIN_VIDEO_BITRATE = 120_000
DEFAULT_AUDIO_BITRATE = 96_000
MIN_AUDIO_BITRATE = 64_000
# compress() only considers downscaling when the video bitrate is below this.
SCALE_THRESHOLD = 200_000

# Number of encode attempts per file before giving up.
MAX_ATTEMPTS = 5

# Inputs are read from the current working directory; outputs and the
# two-pass log files live in sub-directories of it.
SOURCE_DIR = Path.cwd()
OUTPUT_DIR = SOURCE_DIR.joinpath("compressed")
PASSLOG_DIR = OUTPUT_DIR.joinpath(".2pass")
def run(cmd: list[str]) -> None:
    """Echo *cmd* to stdout, then execute it.

    Each argument is shell-quoted in the echoed line, so arguments that
    contain spaces or other shell metacharacters (common in video file
    names) stay unambiguous and the line can be pasted into a POSIX shell.

    Args:
        cmd: Program name followed by its arguments.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            status.
    """
    printable = " ".join(shlex.quote(arg) for arg in cmd)
    # Flush so the echo is emitted before the child process writes anything,
    # even when stdout is block-buffered (e.g. redirected to a file).
    print(" Running:", printable, flush=True)
    subprocess.run(cmd, check=True)
def _coerce_number(value, convert):
    """Return ``convert(value)``, or ``convert(0)`` when *value* is not numeric."""
    try:
        return convert(value)
    except (TypeError, ValueError):
        return convert(0)


def probe_video_info(video: Path) -> tuple[float, int, int]:
    """Return ``(duration_seconds, width, height)`` for *video* via ffprobe.

    ffprobe is asked, as JSON, for the container duration and for the width
    and height of the first video stream. A field that is missing, null or
    not numeric (for example ``"N/A"``) is reported as ``0`` (``0.0`` for the
    duration) instead of raising, so the caller can decide how to handle an
    unusable probe result.

    Args:
        video: Path of the file to inspect.

    Raises:
        subprocess.CalledProcessError: If ffprobe exits with a non-zero
            status.
    """
    cmd = [
        "ffprobe",
        "-v",
        "error",
        "-select_streams",
        "v:0",
        "-show_entries",
        "format=duration:stream=width,height",
        "-of",
        "json",
        str(video),
    ]
    result = subprocess.run(cmd, check=True, capture_output=True, text=True)
    info = json.loads(result.stdout or "{}")

    # "or" guards also cover keys that are present but null.
    container = info.get("format") or {}
    streams = info.get("streams") or []
    first_stream = streams[0] if streams else {}

    duration = _coerce_number(container.get("duration", 0.0), float)
    width = _coerce_number(first_stream.get("width", 0), int)
    height = _coerce_number(first_stream.get("height", 0), int)
    return duration, width, height
def cleanup_pass_logs(prefix: Path) -> None:
    """Delete every ``<prefix>-*.log*`` file next to *prefix*.

    The final component of *prefix* is matched literally: it is escaped
    before globbing, so names containing glob metacharacters such as
    ``[``, ``]``, ``*`` or ``?`` (e.g. ``"Movie [1080p]"``) still have their
    log files removed.

    Args:
        prefix: Path prefix whose log files should be removed; only files in
            ``prefix.parent`` are considered.
    """
    pattern = f"{glob.escape(prefix.name)}-*.log*"
    for path in prefix.parent.glob(pattern):
        try:
            path.unlink()
        except FileNotFoundError:
            # Vanished between globbing and unlinking; nothing left to do.
            pass
def compress(video: Path, dest: Path) -> None:
    """Two-pass encode *video* to *dest* so the result fits MAX_SIZE_BYTES.

    The video is encoded with libx264 and the audio with AAC. Bitrates are
    derived from a target size and the probed duration; if the output is
    still larger than MAX_SIZE_BYTES the target is cut by 20% and the encode
    is repeated, up to MAX_ATTEMPTS times.

    Args:
        video: Source file.
        dest: Output file; an existing file at this path is replaced.

    Raises:
        RuntimeError: If the duration cannot be determined, or if no attempt
            produced a file within MAX_SIZE_BYTES.
        subprocess.CalledProcessError: If ffprobe or ffmpeg exits with a
            non-zero status.
    """
    duration, width, height = probe_video_info(video)
    if not math.isfinite(duration) or duration <= 0:
        raise RuntimeError(f"Could not determine duration for {video}")

    target_bytes = min(
        INITIAL_TARGET_BYTES,
        int(MAX_SIZE_BYTES * 0.95),
        max(video.stat().st_size - 2048, int(MAX_SIZE_BYTES * 0.9)),
    )

    # parents=True so this also works when OUTPUT_DIR has not been created yet.
    PASSLOG_DIR.mkdir(parents=True, exist_ok=True)
    passlog_prefix = PASSLOG_DIR / video.stem

    # The scale filters below pin the SHORTER side to 720 pixels, so they only
    # shrink the picture when that side is currently larger than 720. Testing
    # the longer side instead would upscale sources such as 1000x600.
    can_downscale = min(width, height) > 720

    for attempt in range(1, MAX_ATTEMPTS + 1):
        total_bitrate = (target_bytes * 8) / duration

        # Give audio its default share unless that would starve the video
        # stream; both streams are clamped to their configured minimums.
        audio_bitrate = DEFAULT_AUDIO_BITRATE
        if total_bitrate - audio_bitrate < MIN_VIDEO_BITRATE:
            audio_bitrate = max(MIN_AUDIO_BITRATE, int(total_bitrate - MIN_VIDEO_BITRATE))
        audio_bitrate = max(MIN_AUDIO_BITRATE, audio_bitrate)
        video_bitrate = max(MIN_VIDEO_BITRATE, int(total_bitrate - audio_bitrate))

        scale_filter = None
        if video_bitrate < SCALE_THRESHOLD and can_downscale:
            # -2 lets ffmpeg pick the other dimension, keeping it even.
            scale_filter = "scale=-2:720" if width >= height else "scale=720:-2"

        maxrate = max(int(video_bitrate * 4 / 3), video_bitrate + 10_000)
        bufsize = max(int(video_bitrate * 5 / 2), video_bitrate * 2)

        if dest.exists():
            dest.unlink()

        # Options shared by both passes; they must match for the second pass
        # to make use of the statistics gathered by the first.
        shared = [
            "ffmpeg",
            "-y",
            "-i",
            str(video),
            "-c:v",
            "libx264",
            "-b:v",
            str(video_bitrate),
            "-maxrate",
            str(maxrate),
            "-bufsize",
            str(bufsize),
            "-preset",
            "medium",
            "-passlogfile",
            str(passlog_prefix),
        ]
        if scale_filter:
            shared += ["-vf", scale_filter]

        # Pass 1 only gathers statistics: drop audio and discard the output
        # through the null muxer. os.devnull keeps this portable.
        first_pass = shared + ["-pass", "1", "-an", "-f", "null", os.devnull]
        second_pass = shared + [
            "-pass",
            "2",
            "-c:a",
            "aac",
            "-b:a",
            str(audio_bitrate),
            "-movflags",
            "+faststart",
            str(dest),
        ]

        try:
            run(first_pass)
            run(second_pass)
        finally:
            # Remove the pass logs whether or not the encode succeeded.
            cleanup_pass_logs(passlog_prefix)

        if dest.exists():
            out_size = dest.stat().st_size
            if out_size <= MAX_SIZE_BYTES:
                print(f"✓ {dest.name} compressed to {out_size / 1024 / 1024:.2f} MB")
                return
            print(
                f"Attempt {attempt} for {video.name} produced {out_size / 1024 / 1024:.2f} MB; reducing target size"
            )
        else:
            print(f"Attempt {attempt} for {video.name} failed; retrying")

        target_bytes = int(target_bytes * 0.8)

    raise RuntimeError(f"Unable to compress {video} below {MAX_SIZE_BYTES / 1024 / 1024:.0f} MB")
def main() -> None:
    """Copy or compress every video in SOURCE_DIR into OUTPUT_DIR.

    Regular files directly inside SOURCE_DIR whose suffix is in VIDEO_EXTS
    are processed in sorted order. Files no larger than MAX_SIZE_BYTES are
    copied unchanged (metadata included); larger ones go through compress().
    """
    OUTPUT_DIR.mkdir(exist_ok=True)

    videos = sorted(
        entry
        for entry in SOURCE_DIR.iterdir()
        if entry.is_file() and entry.suffix.lower() in VIDEO_EXTS
    )
    if not videos:
        print("No video files found.")
        return

    for video in videos:
        dest = OUTPUT_DIR / video.name
        size_mb = video.stat().st_size / 1024 / 1024
        if video.stat().st_size > MAX_SIZE_BYTES:
            print(f"Compressing {video.name} ({size_mb:.2f} MB)")
            compress(video, dest)
        else:
            print(f"Copying {video.name} ({size_mb:.2f} MB) without compression")
            shutil.copy2(video, dest)

    print("Done. Outputs are in", OUTPUT_DIR)


# Run the batch only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment