Skip to content

Instantly share code, notes, and snippets.

@twobob
Created March 14, 2026 14:05
Show Gist options
  • Select an option

  • Save twobob/9c0446b50d8060c3d65b5268da67de8f to your computer and use it in GitHub Desktop.

Select an option

Save twobob/9c0446b50d8060c3d65b5268da67de8f to your computer and use it in GitHub Desktop.
reduce mp4s to certain sizes with ffmpeg
import argparse
import json
import math
import os
import subprocess
import sys
import uuid
from pathlib import Path
def run_command(cmd):
result = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
if result.returncode != 0:
rendered = " ".join(f'"{c}"' if " " in str(c) else str(c) for c in cmd)
raise RuntimeError(f"Command failed:\n{rendered}\n\n{result.stderr}")
return result
def ffprobe_json(input_file):
cmd = [
"ffprobe",
"-v", "error",
"-print_format", "json",
"-show_format",
"-show_streams",
str(input_file)
]
result = run_command(cmd)
try:
return json.loads(result.stdout)
except json.JSONDecodeError as exc:
raise RuntimeError("ffprobe returned invalid JSON") from exc
def get_media_info(input_file):
data = ffprobe_json(input_file)
fmt = data.get("format", {})
streams = data.get("streams", [])
duration = float(fmt.get("duration", 0.0))
if duration <= 0:
raise RuntimeError("Could not determine valid media duration")
video_stream = None
audio_stream = None
for stream in streams:
if stream.get("codec_type") == "video" and video_stream is None:
video_stream = stream
elif stream.get("codec_type") == "audio" and audio_stream is None:
audio_stream = stream
if video_stream is None:
raise RuntimeError("No video stream found")
width = int(video_stream.get("width", 0))
height = int(video_stream.get("height", 0))
if width <= 0 or height <= 0:
raise RuntimeError("Could not determine source dimensions")
src_video_bitrate = int(video_stream.get("bit_rate", 0) or 0)
src_audio_bitrate = int(audio_stream.get("bit_rate", 0) or 0) if audio_stream else 0
src_total_bitrate = int(fmt.get("bit_rate", 0) or 0)
return {
"duration": duration,
"width": width,
"height": height,
"src_video_bitrate": src_video_bitrate,
"src_audio_bitrate": src_audio_bitrate,
"src_total_bitrate": src_total_bitrate,
}
def probe_output(path):
cmd = [
"ffprobe",
"-v", "error",
"-print_format", "json",
"-show_format",
"-show_streams",
str(path)
]
result = run_command(cmd)
data = json.loads(result.stdout)
fmt = data.get("format", {})
streams = data.get("streams", [])
video_bitrate = 0
audio_bitrate = 0
width = 0
height = 0
for stream in streams:
if stream.get("codec_type") == "video":
video_bitrate = int(stream.get("bit_rate", 0) or 0)
width = int(stream.get("width", 0) or 0)
height = int(stream.get("height", 0) or 0)
elif stream.get("codec_type") == "audio":
audio_bitrate = int(stream.get("bit_rate", 0) or 0)
return {
"duration": float(fmt.get("duration", 0.0) or 0.0),
"size_bytes": int(fmt.get("size", 0) or 0),
"total_bitrate": int(fmt.get("bit_rate", 0) or 0),
"video_bitrate": video_bitrate,
"audio_bitrate": audio_bitrate,
"width": width,
"height": height,
}
def cleanup_pass_logs(prefix):
for suffix in ("-0.log", "-0.log.mbtree"):
p = Path(f"{prefix}{suffix}")
if p.exists():
p.unlink()
def even_floor(value):
value = int(math.floor(value))
if value < 2:
return 2
return value if value % 2 == 0 else value - 1
def even_ceil(value):
value = int(math.ceil(value))
if value < 2:
return 2
return value if value % 2 == 0 else value + 1
def compute_target_video_bitrate_kbps(target_size_mb, duration_s, audio_bitrate_kbps):
target_bits = target_size_mb * 1024 * 1024 * 8
total_kbps = target_bits / duration_s / 1000.0
video_kbps = int(total_kbps - audio_bitrate_kbps)
return max(video_kbps, 50)
def build_vf(width, crop_bottom):
if crop_bottom > 0:
return f"crop=in_w:in_h-{crop_bottom},scale={width}:-2"
return f"scale={width}:-2"
def get_null_sink():
return "NUL" if os.name == "nt" else "/dev/null"
def encode_two_pass(
input_file,
output_file,
width,
crop_bottom,
video_bitrate_kbps,
audio_bitrate_kbps,
preset,
passlog_prefix
):
vf = build_vf(width, crop_bottom)
null_sink = get_null_sink()
cleanup_pass_logs(passlog_prefix)
first_pass = [
"ffmpeg",
"-y",
"-i", str(input_file),
"-vf", vf,
"-c:v", "libx264",
"-preset", preset,
"-b:v", f"{video_bitrate_kbps}k",
"-pass", "1",
"-passlogfile", passlog_prefix,
"-an",
"-f", "null",
null_sink
]
run_command(first_pass)
second_pass = [
"ffmpeg",
"-y",
"-i", str(input_file),
"-vf", vf,
"-c:v", "libx264",
"-preset", preset,
"-b:v", f"{video_bitrate_kbps}k",
"-pass", "2",
"-passlogfile", passlog_prefix,
"-c:a", "aac",
"-b:a", f"{audio_bitrate_kbps}k",
"-movflags", "+faststart",
str(output_file)
]
run_command(second_pass)
if not output_file.exists():
raise RuntimeError("Output file was not created")
size = output_file.stat().st_size
if size <= 0:
raise RuntimeError("Output file is zero bytes")
return probe_output(output_file)
def compress_video(
input_file,
output_file,
target_size_mb=5.0,
audio_bitrate=96,
crop_bottom=90,
min_width=160,
tolerance_pct=5.0,
preset="medium",
max_attempts=12,
max_video_bitrate_kbps=None,
):
info = get_media_info(input_file)
duration = info["duration"]
src_width = info["width"]
src_height = info["height"]
src_video_bitrate = info["src_video_bitrate"]
if crop_bottom < 0:
raise ValueError("crop_bottom must be >= 0")
if crop_bottom >= src_height:
raise ValueError("crop_bottom must be less than source height")
if target_size_mb <= 0:
raise ValueError("target_size_mb must be > 0")
if audio_bitrate < 0:
raise ValueError("audio_bitrate must be >= 0")
if tolerance_pct <= 0:
raise ValueError("tolerance_pct must be > 0")
if min_width < 2:
raise ValueError("min_width must be >= 2")
if max_attempts <= 0:
raise ValueError("max_attempts must be > 0")
src_width = even_floor(src_width)
min_width = even_floor(min(min_width, src_width))
target_video_bitrate_kbps = compute_target_video_bitrate_kbps(
target_size_mb=target_size_mb,
duration_s=duration,
audio_bitrate_kbps=audio_bitrate
)
if src_video_bitrate > 0:
source_video_kbps = max(50, int(src_video_bitrate / 1000))
target_video_bitrate_kbps = min(target_video_bitrate_kbps, source_video_kbps)
if max_video_bitrate_kbps is not None:
target_video_bitrate_kbps = min(target_video_bitrate_kbps, max_video_bitrate_kbps)
target_video_bitrate_kbps = max(target_video_bitrate_kbps, 50)
print(
f"Duration={duration:.2f}s | "
f"Source={src_width}x{src_height} | "
f"Target={target_size_mb:.2f}MB | "
f"Target video bitrate={target_video_bitrate_kbps}k | "
f"Audio bitrate={audio_bitrate}k"
)
low = min_width
high = src_width
best_fit = None
best_under = None
attempt = 0
while low <= high and attempt < max_attempts:
attempt += 1
width = even_floor((low + high) // 2)
passlog_prefix = f"ffmpeg2pass_{uuid.uuid4().hex}"
try:
result = encode_two_pass(
input_file=input_file,
output_file=output_file,
width=width,
crop_bottom=crop_bottom,
video_bitrate_kbps=target_video_bitrate_kbps,
audio_bitrate_kbps=audio_bitrate,
preset=preset,
passlog_prefix=passlog_prefix
)
finally:
cleanup_pass_logs(passlog_prefix)
size_mb = result["size_bytes"] / (1024 * 1024)
print(
f"Attempt {attempt}: "
f"Size={size_mb:.2f}MB | "
f"Width={result['width']} | "
f"Video={result['video_bitrate']} bps | "
f"Audio={result['audio_bitrate']} bps | "
f"Total={result['total_bitrate']} bps"
)
lower_bound = target_size_mb * (1.0 - tolerance_pct / 100.0)
upper_bound = target_size_mb * (1.0 + tolerance_pct / 100.0)
if size_mb <= upper_bound:
if best_under is None or size_mb > best_under["size_mb"]:
best_under = {
"size_mb": size_mb,
"width": result["width"],
"height": result["height"],
"video_bitrate": result["video_bitrate"],
"audio_bitrate": result["audio_bitrate"],
"total_bitrate": result["total_bitrate"],
}
if lower_bound <= size_mb <= upper_bound:
best_fit = {
"size_mb": size_mb,
"width": result["width"],
"height": result["height"],
"video_bitrate": result["video_bitrate"],
"audio_bitrate": result["audio_bitrate"],
"total_bitrate": result["total_bitrate"],
}
break
if size_mb > upper_bound:
high = width - 2
else:
low = width + 2
if best_fit is not None:
print(
f"Final size achieved: {best_fit['size_mb']:.2f}MB | "
f"{best_fit['width']}x{best_fit['height']}"
)
return
if best_under is not None:
print(
f"Closest under target: {best_under['size_mb']:.2f}MB | "
f"{best_under['width']}x{best_under['height']} | "
f"Video={best_under['video_bitrate']} bps"
)
return
raise RuntimeError("Could not produce an output under the requested size")
def main():
parser = argparse.ArgumentParser(
description="Compress video to target size by keeping bitrate sane and searching width from source downward.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument("input_file", type=Path, help="Input video file")
parser.add_argument("output_file", type=Path, help="Output video file")
parser.add_argument("-s", "--target-size", type=float, default=5.0, help="Target output size in MB")
parser.add_argument("-a", "--audio-bitrate", type=int, default=96, help="AAC audio bitrate in kbps")
parser.add_argument("-c", "--crop-bottom", type=int, default=90, help="Pixels to crop from bottom")
parser.add_argument("--min-width", type=int, default=160, help="Minimum search width")
parser.add_argument("--tolerance-pct", type=float, default=5.0, help="Allowed size tolerance percentage")
parser.add_argument(
"--preset",
choices=["ultrafast", "superfast", "veryfast", "faster", "fast", "medium", "slow", "slower", "veryslow"],
default="medium",
help="x264 preset"
)
parser.add_argument("-m", "--max-attempts", type=int, default=12, help="Maximum width-search attempts")
parser.add_argument("--max-video-bitrate", type=int, default=None, help="Optional cap for video bitrate in kbps")
parser.add_argument("-t", "--max-duration", type=int, default=None, help="Optional truncate duration in seconds")
args = parser.parse_args()
if not args.input_file.exists():
parser.error(f"Input file does not exist: {args.input_file}")
args.output_file.parent.mkdir(parents=True, exist_ok=True)
temp_input = None
try:
input_for_compression = args.input_file
if args.max_duration is not None:
temp_input = args.output_file.parent / f"truncated_input_{uuid.uuid4().hex}.mp4"
truncate_cmd = [
"ffmpeg",
"-y",
"-i", str(args.input_file),
"-t", str(args.max_duration),
"-c", "copy",
str(temp_input)
]
run_command(truncate_cmd)
if not temp_input.exists() or temp_input.stat().st_size <= 0:
parser.error("Failed to create truncated input file")
input_for_compression = temp_input
compress_video(
input_file=input_for_compression,
output_file=args.output_file,
target_size_mb=args.target_size,
audio_bitrate=args.audio_bitrate,
crop_bottom=args.crop_bottom,
min_width=args.min_width,
tolerance_pct=args.tolerance_pct,
preset=args.preset,
max_attempts=args.max_attempts,
max_video_bitrate_kbps=args.max_video_bitrate
)
except Exception as exc:
parser.error(str(exc))
finally:
if temp_input is not None and temp_input.exists():
temp_input.unlink()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment