Created
March 14, 2026 14:05
-
-
Save twobob/9c0446b50d8060c3d65b5268da67de8f to your computer and use it in GitHub Desktop.
Reduce MP4s to a target file size with ffmpeg
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import argparse | |
| import json | |
| import math | |
| import os | |
| import subprocess | |
| import sys | |
| import uuid | |
| from pathlib import Path | |
def run_command(cmd):
    """Run an external command and return its completed-process result.

    Args:
        cmd: Sequence of program and arguments, passed to ``subprocess.run``
            without a shell.

    Returns:
        The ``subprocess.CompletedProcess`` with ``stdout``/``stderr``
        captured as text.

    Raises:
        RuntimeError: If the command exits with a non-zero status. The
            message contains the rendered command line and captured stderr.
    """
    result = subprocess.run(
        cmd,
        # ffmpeg reads interactive key presses from stdin by default; giving
        # it an empty stdin keeps it from consuming or blocking on the
        # caller's terminal/pipe.
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        # Tool output may contain bytes that are invalid in the locale
        # encoding (e.g. file names or metadata); substitute them instead of
        # failing with UnicodeDecodeError while collecting the output.
        errors="replace",
    )
    if result.returncode != 0:
        # Quote arguments containing spaces so the message can be read (and
        # roughly re-run) as a command line.
        rendered = " ".join(f'"{c}"' if " " in str(c) else str(c) for c in cmd)
        raise RuntimeError(f"Command failed:\n{rendered}\n\n{result.stderr}")
    return result
def ffprobe_json(input_file):
    """Probe ``input_file`` with ffprobe and return the parsed JSON report.

    The report is requested with both the container (``-show_format``) and
    per-stream (``-show_streams``) sections.

    Raises:
        RuntimeError: If ffprobe fails (via ``run_command``) or its stdout
            is not valid JSON.
    """
    probe_args = ["ffprobe", "-v", "error", "-print_format", "json"]
    probe_args += ["-show_format", "-show_streams", str(input_file)]
    completed = run_command(probe_args)
    try:
        report = json.loads(completed.stdout)
    except json.JSONDecodeError as exc:
        raise RuntimeError("ffprobe returned invalid JSON") from exc
    return report
def get_media_info(input_file):
    """Collect duration, dimensions and bitrates of the source media.

    Only the first video stream and the first audio stream reported by
    ffprobe are considered. Bitrates that ffprobe does not report come back
    as 0.

    Returns:
        dict with keys ``duration`` (float seconds), ``width``, ``height``,
        ``src_video_bitrate``, ``src_audio_bitrate`` and
        ``src_total_bitrate`` (ints, bits per second).

    Raises:
        RuntimeError: If the duration is missing or non-positive, there is
            no video stream, or the video dimensions are not positive.
    """
    report = ffprobe_json(input_file)
    container = report.get("format", {})

    length_s = float(container.get("duration", 0.0))
    if not length_s > 0:
        raise RuntimeError("Could not determine valid media duration")

    # setdefault keeps the first stream seen for each codec type.
    first_by_type = {}
    for entry in report.get("streams", []):
        first_by_type.setdefault(entry.get("codec_type"), entry)
    video = first_by_type.get("video")
    audio = first_by_type.get("audio")

    if video is None:
        raise RuntimeError("No video stream found")

    frame_w = int(video.get("width", 0))
    frame_h = int(video.get("height", 0))
    if frame_w <= 0 or frame_h <= 0:
        raise RuntimeError("Could not determine source dimensions")

    if audio:
        audio_bps = int(audio.get("bit_rate", 0) or 0)
    else:
        audio_bps = 0

    return {
        "duration": length_s,
        "width": frame_w,
        "height": frame_h,
        "src_video_bitrate": int(video.get("bit_rate", 0) or 0),
        "src_audio_bitrate": audio_bps,
        "src_total_bitrate": int(container.get("bit_rate", 0) or 0),
    }
def probe_output(path):
    """Probe an encoded file and summarise its size, bitrates and dimensions.

    Uses the shared ``ffprobe_json`` helper so a malformed ffprobe report is
    raised as ``RuntimeError``, the same way it is for the source probe. As
    in ``get_media_info``, only the first video and first audio stream are
    reported. Any value ffprobe does not provide is returned as 0.

    Args:
        path: Path of the media file to inspect.

    Returns:
        dict with keys ``duration`` (float seconds), ``size_bytes``,
        ``total_bitrate``, ``video_bitrate``, ``audio_bitrate`` (bits per
        second), ``width`` and ``height``.

    Raises:
        RuntimeError: If ffprobe fails or returns invalid JSON.
    """
    data = ffprobe_json(path)
    fmt = data.get("format", {})

    video_stream = None
    audio_stream = None
    for stream in data.get("streams", []):
        codec_type = stream.get("codec_type")
        if codec_type == "video" and video_stream is None:
            video_stream = stream
        elif codec_type == "audio" and audio_stream is None:
            audio_stream = stream

    # Missing streams are treated as empty so every field falls back to 0.
    video_stream = video_stream or {}
    audio_stream = audio_stream or {}

    return {
        "duration": float(fmt.get("duration", 0.0) or 0.0),
        "size_bytes": int(fmt.get("size", 0) or 0),
        "total_bitrate": int(fmt.get("bit_rate", 0) or 0),
        "video_bitrate": int(video_stream.get("bit_rate", 0) or 0),
        "audio_bitrate": int(audio_stream.get("bit_rate", 0) or 0),
        "width": int(video_stream.get("width", 0) or 0),
        "height": int(video_stream.get("height", 0) or 0),
    }
def cleanup_pass_logs(prefix):
    """Delete the two-pass statistics files written under ``prefix``.

    Removes the finished stats files (``-0.log`` and ``-0.log.mbtree``) and
    the ``.temp`` variants that x264 leaves behind when a pass is
    interrupted before the stats file is finalised. Files that do not exist
    are ignored, so the function is safe to call repeatedly.

    Args:
        prefix: The value that was passed to ffmpeg as ``-passlogfile``.
    """
    suffixes = ("-0.log", "-0.log.mbtree", "-0.log.temp", "-0.log.mbtree.temp")
    for suffix in suffixes:
        try:
            Path(f"{prefix}{suffix}").unlink()
        except FileNotFoundError:
            # Nothing to remove; unlinking directly avoids an
            # exists()-then-unlink() race.
            pass
def even_floor(value):
    """Round ``value`` down to an even integer, never returning less than 2."""
    floored = int(math.floor(value))
    # Subtracting the parity bit drops odd numbers to the even number below.
    return max(2, floored - floored % 2)
def even_ceil(value):
    """Round ``value`` up to an even integer, never returning less than 2."""
    raised = int(math.ceil(value))
    # Adding the parity bit lifts odd numbers to the even number above.
    return max(2, raised + raised % 2)
def compute_target_video_bitrate_kbps(target_size_mb, duration_s, audio_bitrate_kbps):
    """Return the video bitrate (kbps) that fits the target size.

    The target size is converted to bits (1 MB = 1024 * 1024 bytes), spread
    over the duration, and the audio bitrate is subtracted. The result is
    truncated to an integer and floored at 50 kbps.
    """
    size_in_bits = target_size_mb * 1024 * 1024 * 8
    overall_kbps = size_in_bits / duration_s / 1000.0
    remaining_kbps = int(overall_kbps - audio_bitrate_kbps)
    if remaining_kbps < 50:
        return 50
    return remaining_kbps
def build_vf(width, crop_bottom):
    """Build the ffmpeg ``-vf`` filter chain: optional bottom crop, then scale.

    Args:
        width: Output width in pixels. The height is given as ``-2`` so
            ffmpeg derives it from the aspect ratio and keeps it even.
        crop_bottom: Number of pixel rows to remove from the bottom of the
            frame before scaling; 0 (or less) disables cropping.

    Returns:
        The filter string to pass to ``-vf``.
    """
    scale = f"scale={width}:-2"
    if crop_bottom > 0:
        # The crop filter centres its window when x/y are omitted, which
        # would trim half the rows from the top as well. Anchoring at 0:0
        # keeps the top edge so only the bottom rows are removed.
        return f"crop=in_w:in_h-{crop_bottom}:0:0,{scale}"
    return scale
def get_null_sink():
    """Return the platform's null device path (``NUL`` on Windows)."""
    if os.name == "nt":
        return "NUL"
    return "/dev/null"
def encode_two_pass(
    input_file,
    output_file,
    width,
    crop_bottom,
    video_bitrate_kbps,
    audio_bitrate_kbps,
    preset,
    passlog_prefix
):
    """Run a two-pass libx264 encode and return a probe of the result.

    Pass 1 analyses the video only (no audio, output discarded to the null
    device). Pass 2 writes ``output_file`` with AAC audio and
    ``+faststart``. Both passes share the same filter chain, preset, video
    bitrate and pass-log prefix. Stale pass logs for ``passlog_prefix`` are
    removed before the first pass.

    ``output_file`` must be a ``pathlib.Path``-like object (``exists()`` and
    ``stat()`` are called on it).

    Raises:
        RuntimeError: If either ffmpeg pass fails, or the output file is
            missing or empty afterwards.
    """
    filter_chain = build_vf(width, crop_bottom)
    cleanup_pass_logs(passlog_prefix)

    def video_args(pass_number):
        # Arguments common to both passes, in the order ffmpeg receives them.
        return [
            "ffmpeg",
            "-y",
            "-i", str(input_file),
            "-vf", filter_chain,
            "-c:v", "libx264",
            "-preset", preset,
            "-b:v", f"{video_bitrate_kbps}k",
            "-pass", str(pass_number),
            "-passlogfile", passlog_prefix,
        ]

    analysis_cmd = video_args(1) + ["-an", "-f", "null", get_null_sink()]
    run_command(analysis_cmd)

    final_cmd = video_args(2) + [
        "-c:a", "aac",
        "-b:a", f"{audio_bitrate_kbps}k",
        "-movflags", "+faststart",
        str(output_file),
    ]
    run_command(final_cmd)

    if not output_file.exists():
        raise RuntimeError("Output file was not created")
    if output_file.stat().st_size <= 0:
        raise RuntimeError("Output file is zero bytes")

    return probe_output(output_file)
def compress_video(
    input_file,
    output_file,
    target_size_mb=5.0,
    audio_bitrate=96,
    crop_bottom=90,
    min_width=160,
    tolerance_pct=5.0,
    preset="medium",
    max_attempts=12,
    max_video_bitrate_kbps=None,
):
    """Encode ``input_file`` to roughly ``target_size_mb`` by bisecting the width.

    The video bitrate is derived from the target size and duration, capped
    at the source video bitrate (when known) and at
    ``max_video_bitrate_kbps`` (when given), and never below 50 kbps. The
    output width is then bisected between ``min_width`` and the source
    width: an oversized result narrows the search downward, anything else
    moves it upward.

    Each attempt is encoded to a temporary file next to ``output_file``.
    Only an attempt that is the best candidate so far is moved onto
    ``output_file``, so the file left on disk always matches the result
    that is reported. If no attempt fits, ``output_file`` is not written.

    Args:
        input_file: Source media path.
        output_file: Destination path; its suffix is reused for the
            temporary attempt files.
        target_size_mb: Desired output size in MB (1 MB = 1024 * 1024 bytes).
        audio_bitrate: AAC bitrate in kbps.
        crop_bottom: Pixel rows to crop from the bottom before scaling.
        min_width: Smallest width the search may try.
        tolerance_pct: Accepted deviation from the target size, in percent.
        preset: x264 preset name.
        max_attempts: Maximum number of encodes to try.
        max_video_bitrate_kbps: Optional upper limit for the video bitrate.

    Raises:
        ValueError: If an argument is out of range.
        RuntimeError: If probing/encoding fails or no attempt fits within
            the upper size bound.
    """
    info = get_media_info(input_file)
    duration = info["duration"]
    src_width = info["width"]
    src_height = info["height"]
    src_video_bitrate = info["src_video_bitrate"]

    if crop_bottom < 0:
        raise ValueError("crop_bottom must be >= 0")
    if crop_bottom >= src_height:
        raise ValueError("crop_bottom must be less than source height")
    if target_size_mb <= 0:
        raise ValueError("target_size_mb must be > 0")
    if audio_bitrate < 0:
        raise ValueError("audio_bitrate must be >= 0")
    if tolerance_pct <= 0:
        raise ValueError("tolerance_pct must be > 0")
    if min_width < 2:
        raise ValueError("min_width must be >= 2")
    if max_attempts <= 0:
        raise ValueError("max_attempts must be > 0")

    # Search bounds are kept even so every candidate width is even.
    src_width = even_floor(src_width)
    min_width = even_floor(min(min_width, src_width))

    target_video_bitrate_kbps = compute_target_video_bitrate_kbps(
        target_size_mb=target_size_mb,
        duration_s=duration,
        audio_bitrate_kbps=audio_bitrate
    )
    if src_video_bitrate > 0:
        # Do not ask for more bits than the source video stream has.
        source_video_kbps = max(50, int(src_video_bitrate / 1000))
        target_video_bitrate_kbps = min(target_video_bitrate_kbps, source_video_kbps)
    if max_video_bitrate_kbps is not None:
        target_video_bitrate_kbps = min(target_video_bitrate_kbps, max_video_bitrate_kbps)
    target_video_bitrate_kbps = max(target_video_bitrate_kbps, 50)

    print(
        f"Duration={duration:.2f}s | "
        f"Source={src_width}x{src_height} | "
        f"Target={target_size_mb:.2f}MB | "
        f"Target video bitrate={target_video_bitrate_kbps}k | "
        f"Audio bitrate={audio_bitrate}k"
    )

    output_file = Path(output_file)
    lower_bound = target_size_mb * (1.0 - tolerance_pct / 100.0)
    upper_bound = target_size_mb * (1.0 + tolerance_pct / 100.0)

    low = min_width
    high = src_width
    best_fit = None
    best_under = None
    attempt = 0

    while low <= high and attempt < max_attempts:
        attempt += 1
        width = even_floor((low + high) // 2)
        token = uuid.uuid4().hex
        passlog_prefix = f"ffmpeg2pass_{token}"
        # Same directory and suffix as the real output: ffmpeg picks the
        # container from the suffix, and os.replace stays on one filesystem.
        attempt_file = output_file.with_name(
            f"{output_file.stem}.attempt_{token}{output_file.suffix}"
        )

        try:
            try:
                result = encode_two_pass(
                    input_file=input_file,
                    output_file=attempt_file,
                    width=width,
                    crop_bottom=crop_bottom,
                    video_bitrate_kbps=target_video_bitrate_kbps,
                    audio_bitrate_kbps=audio_bitrate,
                    preset=preset,
                    passlog_prefix=passlog_prefix
                )
            finally:
                cleanup_pass_logs(passlog_prefix)

            size_mb = result["size_bytes"] / (1024 * 1024)
            print(
                f"Attempt {attempt}: "
                f"Size={size_mb:.2f}MB | "
                f"Width={result['width']} | "
                f"Video={result['video_bitrate']} bps | "
                f"Audio={result['audio_bitrate']} bps | "
                f"Total={result['total_bitrate']} bps"
            )

            within_tolerance = lower_bound <= size_mb <= upper_bound
            improves_under = size_mb <= upper_bound and (
                best_under is None or size_mb > best_under["size_mb"]
            )

            if within_tolerance or improves_under:
                summary = {
                    "size_mb": size_mb,
                    "width": result["width"],
                    "height": result["height"],
                    "video_bitrate": result["video_bitrate"],
                    "audio_bitrate": result["audio_bitrate"],
                    "total_bitrate": result["total_bitrate"],
                }
                # Promote this attempt so the file on disk is always the
                # candidate that will be reported.
                os.replace(attempt_file, output_file)
                if improves_under:
                    best_under = summary
                if within_tolerance:
                    best_fit = summary
                    break
        finally:
            # Discard attempts that were not promoted (oversized, not an
            # improvement, or failed part-way through).
            try:
                attempt_file.unlink()
            except FileNotFoundError:
                pass

        if size_mb > upper_bound:
            high = width - 2
        else:
            low = width + 2

    if best_fit is not None:
        print(
            f"Final size achieved: {best_fit['size_mb']:.2f}MB | "
            f"{best_fit['width']}x{best_fit['height']}"
        )
        return

    if best_under is not None:
        print(
            f"Closest under target: {best_under['size_mb']:.2f}MB | "
            f"{best_under['width']}x{best_under['height']} | "
            f"Video={best_under['video_bitrate']} bps"
        )
        return

    raise RuntimeError("Could not produce an output under the requested size")
def main():
    """Command-line entry point: parse arguments and run the compression.

    When ``--max-duration`` is given, the input is first stream-copied to a
    temporary truncated file next to the output, and that file is
    compressed instead. The temporary file is removed afterwards. Any
    failure is reported through ``parser.error``, which prints the message
    and exits with status 2.
    """
    parser = argparse.ArgumentParser(
        description="Compress video to target size by keeping bitrate sane and searching width from source downward.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    parser.add_argument("input_file", type=Path, help="Input video file")
    parser.add_argument("output_file", type=Path, help="Output video file")
    parser.add_argument("-s", "--target-size", type=float, default=5.0, help="Target output size in MB")
    parser.add_argument("-a", "--audio-bitrate", type=int, default=96, help="AAC audio bitrate in kbps")
    parser.add_argument("-c", "--crop-bottom", type=int, default=90, help="Pixels to crop from bottom")
    parser.add_argument("--min-width", type=int, default=160, help="Minimum search width")
    parser.add_argument("--tolerance-pct", type=float, default=5.0, help="Allowed size tolerance percentage")
    parser.add_argument(
        "--preset",
        choices=["ultrafast", "superfast", "veryfast", "faster", "fast", "medium", "slow", "slower", "veryslow"],
        default="medium",
        help="x264 preset"
    )
    parser.add_argument("-m", "--max-attempts", type=int, default=12, help="Maximum width-search attempts")
    parser.add_argument("--max-video-bitrate", type=int, default=None, help="Optional cap for video bitrate in kbps")
    parser.add_argument("-t", "--max-duration", type=int, default=None, help="Optional truncate duration in seconds")
    args = parser.parse_args()

    if not args.input_file.exists():
        parser.error(f"Input file does not exist: {args.input_file}")
    if args.max_duration is not None and args.max_duration <= 0:
        parser.error("--max-duration must be > 0")
    # ffmpeg is run with -y, so writing onto the input would destroy the
    # source while it is still being read.
    if args.output_file.resolve() == args.input_file.resolve():
        parser.error("Output file must be different from the input file")

    args.output_file.parent.mkdir(parents=True, exist_ok=True)

    temp_input = None
    try:
        input_for_compression = args.input_file
        if args.max_duration is not None:
            # Keep the input's container for the stream copy; forcing .mp4
            # fails for codecs that the MP4 muxer does not accept.
            temp_suffix = args.input_file.suffix or ".mp4"
            temp_input = args.output_file.parent / f"truncated_input_{uuid.uuid4().hex}{temp_suffix}"
            truncate_cmd = [
                "ffmpeg",
                "-y",
                "-i", str(args.input_file),
                "-t", str(args.max_duration),
                "-c", "copy",
                str(temp_input)
            ]
            run_command(truncate_cmd)
            if not temp_input.exists() or temp_input.stat().st_size <= 0:
                # parser.error raises SystemExit, which is not caught by the
                # `except Exception` below; the finally block still runs.
                parser.error("Failed to create truncated input file")
            input_for_compression = temp_input

        compress_video(
            input_file=input_for_compression,
            output_file=args.output_file,
            target_size_mb=args.target_size,
            audio_bitrate=args.audio_bitrate,
            crop_bottom=args.crop_bottom,
            min_width=args.min_width,
            tolerance_pct=args.tolerance_pct,
            preset=args.preset,
            max_attempts=args.max_attempts,
            max_video_bitrate_kbps=args.max_video_bitrate
        )
    except Exception as exc:
        parser.error(str(exc))
    finally:
        if temp_input is not None and temp_input.exists():
            temp_input.unlink()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment