Last active
August 18, 2025 10:22
-
-
Save huntfx/cc0891773fda1651b6cd6d45c8e27670 to your computer and use it in GitHub Desktop.
Convert all images and videos to a lower quality/resolution to save storage space on Ente.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3.12 | |
| """Convert all images and videos to a lower quality/resolution to save storage space. | |
| EXIF data is kept intact. | |
| Warning: Do not delete your original photos! The output from this is low quality. | |
| Setup: | |
| 1. If using Windows, install `pywin32` to Python 3.12. | |
| 2. Copy `ente-convert.py` to somewhere on the system path. | |
| 3. Create `ente-convert.bat` with the following text: | |
| @echo off | |
| py -3.12 "%~dp0%~n0.py" %* | |
| Usage: | |
| 1. Load `cmd` | |
| 2. `cd path/to/images` | |
| 3. `ente-convert path/to/output` (output path defaults to `./.ente-convert`, inside the current directory, if not set) | |
| """ | |
| import argparse | |
| import json | |
| import os | |
| import platform | |
| import subprocess | |
| import struct | |
| import shutil | |
| from concurrent.futures import ProcessPoolExecutor | |
| from functools import wraps | |
| from pathlib import Path | |
| import piexif | |
| import pillow_heif | |
| from PIL import Image, ImageOps | |
# Lower-case file extensions that are treated as images
IMAGE_FORMATS = ('.heic', '.jpg', '.jpeg', '.png', '.tiff', '.bmp', '.gif')
# JPEG quality used when saving: 0-100, higher is better
IMAGE_QUALITY = 85
# Bounding box edge (in pixels) that images are shrunk to fit within
IMAGE_MAX_SIZE = 2048

# Lower-case file extensions that are treated as videos
VIDEO_FORMATS = ('.mp4', '.mov', '.avi', '.mkv', '.flv')
# Value passed to ffmpeg's `-crf` option: lower is better
VIDEO_QUALITY = 28
# Upper limit (in pixels) for the longer edge of a converted video
VIDEO_SIZE_MAX = 1280

# Output folder used when none is given on the command line
OUTPUT_FOLDER = '.ente-convert'

# Short aliases for the EXIF tags that hold the image dimensions
PXD = piexif.ExifIFD.PixelXDimension
PYD = piexif.ExifIFD.PixelYDimension
IW = piexif.ImageIFD.ImageWidth
IH = piexif.ImageIFD.ImageLength

# Raise Pillow's pixel-count limit so very large images can still be opened
Image.MAX_IMAGE_PIXELS = 8 * 1024 ** 3
| def _copy_times(input_path: Path, output_path: Path) -> None: | |
| # Copy creation time on Windows | |
| if platform.system() == 'Windows': | |
| import pywintypes | |
| import win32file | |
| import win32con | |
| handle = win32file.CreateFile( | |
| str(output_path), | |
| win32con.GENERIC_WRITE, | |
| 0, | |
| None, | |
| win32con.OPEN_EXISTING, | |
| win32con.FILE_ATTRIBUTE_NORMAL, | |
| None | |
| ) | |
| try: | |
| ctime = pywintypes.Time(input_path.stat().st_ctime) | |
| atime = pywintypes.Time(input_path.stat().st_atime) | |
| mtime = pywintypes.Time(input_path.stat().st_mtime) | |
| win32file.SetFileTime(handle, ctime, atime, mtime) | |
| finally: | |
| handle.close() | |
| # Copy mode, mtime, atime | |
| else: | |
| shutil.copystat(input_path, output_path) | |
| def _compare_modified_times(input_path: Path, output_path: Path) -> bool: | |
| if not output_path.exists(): | |
| return False | |
| s1 = input_path.stat() | |
| s2 = output_path.stat() | |
| return max(abs(s1.st_ctime - s2.st_ctime), abs(s1.st_mtime - s2.st_mtime)) < 1 | |
def copy_handler(fn):
    """Decorate a converter so that up-to-date outputs are skipped.

    The wrapped function is called as `fn(input_path, output_path)` only when
    `_compare_modified_times` reports that the output is not current.
    Afterwards the timestamps of the input are copied onto the output.
    """
    @wraps(fn)
    def handler(input_path: Path, output_path: Path):
        print(f"Processing {input_path} -> {output_path}")

        # Nothing to do when the output already carries the input's timestamps
        if _compare_modified_times(input_path, output_path):
            print(f'{output_path} already processed.')
            return

        fn(input_path, output_path)
        _copy_times(input_path, output_path)
        print(f'Processed: {input_path} -> {output_path}')

    return handler
| def _get_video_info(path: Path) -> tuple[int, int, float, dict[str, str]]: | |
| if not path.exists() or path.stat().st_size == 0: | |
| return 0, 0, 0, {} | |
| probe_command = [ | |
| 'ffprobe', | |
| '-v', 'error', | |
| '-show_entries', 'stream=width,height,duration,tags', | |
| '-show_streams', | |
| '-print_format', 'json', | |
| str(path), | |
| ] | |
| result = subprocess.run(probe_command, capture_output=True, text=True) | |
| if not result.returncode: | |
| data = json.loads(result.stdout) | |
| for stream in data['streams']: | |
| if stream['codec_type'] == 'video': | |
| return stream['width'], stream['height'], float(stream['duration']), stream['tags'] | |
| return 0, 0, 0, {} | |
@copy_handler
def convert_video(input_path: Path, output_path: Path) -> None:
    """Re-encode `input_path` to a downscaled H.264 video at `output_path`.

    The longer edge is limited to `VIDEO_SIZE_MAX` while keeping the aspect
    ratio. Nothing is done if an existing output has the same dimensions as
    the input and a duration within half a second of it.

    Raises:
        RuntimeError: If the width or height of the input cannot be read.
        subprocess.CalledProcessError: If ffmpeg fails without producing a
            non-empty output file.
    """
    # Read the video data
    input_data = _get_video_info(input_path)
    output_data = _get_video_info(output_path)

    # Skip if already converted
    needs_conversion = (
        not all(input_data[:2])
        or input_data[:2] != output_data[:2]
        or abs(input_data[2] - output_data[2]) > 0.5
    )
    if not needs_conversion:
        return

    # Get the width and height as displayed, i.e. after rotation
    width, height, _, tags = input_data
    if tags.get('rotate') in ('90', '-90', '270', '-270'):
        width, height = height, width
    if 0 in (width, height):
        raise RuntimeError('failed to read width and height from stream')

    # Calculate new resolution preserving the aspect ratio
    if width > height:
        new_width = min(width, VIDEO_SIZE_MAX)
        new_height = int(new_width * height / width)
    else:
        new_height = min(height, VIDEO_SIZE_MAX)
        new_width = int(new_height * width / height)

    # Ensure width and height are divisible by 2 for ffmpeg, and never 0 for
    # videos with an extreme aspect ratio
    new_width = max(2, new_width + new_width % 2)
    new_height = max(2, new_height + new_height % 2)

    # Command to compress and resize video
    command = [
        'ffmpeg',
        '-i', str(input_path),
        '-y',  # Overwrite
        '-vf', f'scale={new_width}:{new_height}',
        '-c:v', 'libx264',
        '-crf', str(VIDEO_QUALITY),  # Compression quality (lower is better)
        '-preset', 'fast',
        str(output_path),
    ]

    # Save the file
    output_path.parent.mkdir(parents=True, exist_ok=True)
    proc = subprocess.run(command)
    if proc.returncode:
        # A failed run may leave an empty file behind. Treating that as a
        # result would get it stamped with the input's timestamps and then
        # skipped on later runs, so remove it and report the failure.
        has_output = output_path.exists() and output_path.stat().st_size > 0
        if not has_output:
            output_path.unlink(missing_ok=True)
            proc.check_returncode()
def convert_videos(input_folder: Path, output_folder: Path) -> None:
    """Convert every video directly inside `input_folder`.

    Entries are matched by extension against `VIDEO_FORMATS`, and each result
    is written to `output_folder` under the same base name with an `.mp4`
    extension. Folders that cannot be listed due to permissions are skipped.
    """
    try:
        names = []
        for entry in input_folder.iterdir():
            extension = os.path.splitext(entry.name)[1]
            if extension.lower() in VIDEO_FORMATS:
                names.append(entry.name)
    except PermissionError:
        return

    for name in names:
        base = os.path.splitext(name)[0]
        convert_video(input_folder / name, output_folder / f'{base}.mp4')
@copy_handler
def convert_image(input_path: Path, output_path: Path) -> None:
    """Save a downscaled JPEG copy of `input_path` at `output_path`.

    The image is shrunk to fit within `IMAGE_MAX_SIZE` pixels, rotated
    according to its EXIF orientation and saved with `IMAGE_QUALITY`. Any
    EXIF data that can be read is written to the output with the thumbnail
    removed and the orientation and dimensions updated.

    JPEG inputs whose EXIF dimensions are all below `IMAGE_MAX_SIZE` are
    copied unchanged instead.

    Raises:
        RuntimeError: If Pillow refuses to open the image for being too large.
    """
    save_opts = {'quality': IMAGE_QUALITY}

    # If image is too small then just directly copy it. This only applies to
    # JPEG sources: the output is always saved as JPEG, and `piexif` can also
    # read TIFF files, which must not be copied verbatim under a JPEG name.
    if input_path.suffix.lower() in ('.jpg', '.jpeg'):
        try:
            exif = piexif.load(str(input_path))
        except (ValueError, struct.error):
            pass
        else:
            # A missing tag counts as `IMAGE_MAX_SIZE`, which disables the copy
            largest = max((
                exif['Exif'].get(PXD, IMAGE_MAX_SIZE),
                exif['Exif'].get(PYD, IMAGE_MAX_SIZE),
                exif['0th'].get(IW, IMAGE_MAX_SIZE),
                exif['0th'].get(IH, IMAGE_MAX_SIZE),
            ))
            if 0 < largest < IMAGE_MAX_SIZE:
                output_path.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy2(input_path, output_path)
                return

    # Open the image using Pillow
    if pillow_heif.is_supported(input_path):
        heif_image = pillow_heif.open_heif(input_path)
        info = heif_image.info
        image = Image.frombytes(heif_image.mode, heif_image.size, heif_image.data)
    else:
        try:
            image = Image.open(input_path)
        except Image.DecompressionBombError as e:
            raise RuntimeError(f'{input_path} is too large, increase `MAX_IMAGE_PIXELS`') from e
        info = image.info

    # Resize the image, keeping aspect ratio
    image.thumbnail((IMAGE_MAX_SIZE, IMAGE_MAX_SIZE))

    # Set the correct rotation
    image = ImageOps.exif_transpose(image)

    # Generate the new exif data. The entry may be absent or empty, and
    # unreadable data is dropped rather than aborting the conversion.
    exif = None
    raw_exif = info.get('exif')
    if raw_exif:
        try:
            exif = piexif.load(raw_exif)
        except (ValueError, struct.error):
            exif = None

    if exif is not None:
        width, height = image.size
        exif['thumbnail'] = None
        exif['0th'][piexif.ImageIFD.Orientation] = 1
        exif['Exif'][PXD] = exif['0th'][IW] = width
        exif['Exif'][PYD] = exif['0th'][IH] = height

        # Drop tags one at a time until `piexif` accepts the data
        while True:
            try:
                save_opts['exif'] = piexif.dump(exif)
            # "dump" got wrong type of exif value.\n41729 in Exif IFD. Got as <class 'int'>.
            except ValueError as e:
                if 'got wrong type of exif value' not in str(e):
                    raise
                # The second line of the message names the tag and its IFD
                value, _, key = str(e).split('\n')[1].split(' ', 3)[:3]
                del exif[key][int(value)]
            else:
                break

    # Save the file
    output_path.parent.mkdir(parents=True, exist_ok=True)
    image.convert('RGB').save(output_path, 'JPEG', **save_opts)
def convert_images(input_folder: Path, output_folder: Path) -> None:
    """Convert every image directly inside `input_folder`.

    Entries are matched by extension against `IMAGE_FORMATS`, and each result
    is written to `output_folder` under the same base name with a `.jpg`
    extension. The conversions run in a single worker process, and any
    exception raised by one is re-raised here. Folders that cannot be listed
    due to permissions are skipped.
    """
    # Iterate through all files in the input folder
    try:
        image_files = [f.name for f in input_folder.iterdir() if os.path.splitext(f.name)[1].lower() in IMAGE_FORMATS]
    except PermissionError:
        return

    # Don't set up a process pool for folders without any images
    if not image_files:
        return

    batch_size = 32
    pending = []
    with ProcessPoolExecutor(max_workers=1) as executor:
        for count, filename in enumerate(image_files, start=1):
            image_src = input_folder / filename
            image_dst = output_folder / f'{os.path.splitext(filename)[0]}.jpg'
            pending.append(executor.submit(convert_image, image_src, image_dst))

            # Collect results after each full batch to bound the queue size
            if count % batch_size == 0:
                while pending:
                    pending.pop().result()

        # Wait for the remaining tasks, re-raising any exceptions
        for task in pending:
            task.result()
def remove_deleted(input_folder: Path, output_folder: Path) -> None:
    """Delete entries in `output_folder` with no counterpart in `input_folder`.

    Names are compared in lower case. Image and video inputs also count as
    the `.jpg` / `.mp4` name they are converted to. Nothing is done if either
    folder is missing or cannot be listed.
    """
    try:
        sources = {}
        for entry in input_folder.iterdir():
            sources[entry.name.lower()] = entry
        outputs = {}
        for entry in output_folder.iterdir():
            outputs[entry.name.lower()] = entry
    except (PermissionError, FileNotFoundError):
        return

    # Handle remapped formats
    expected = set(sources)
    for name in sources:
        base = os.path.splitext(name)[0]
        if name.endswith(IMAGE_FORMATS):
            expected.add(f'{base}.jpg')
        if name.endswith(VIDEO_FORMATS):
            expected.add(f'{base}.mp4')

    # Delete anything that doesn't exist
    for name in outputs.keys() - expected:
        stale = outputs[name]
        if stale.is_dir():
            print(f'Removing old directory: {stale}')
            shutil.rmtree(stale)
        elif stale.is_file():
            print(f'Removing old file: {stale}')
            stale.unlink()
def convert_folders(input_folder: Path | str, output_folder: Path | str):
    """Mirror `input_folder` into `output_folder` as converted media.

    For the input folder and each of its subfolders, stale outputs are
    removed and then the images and videos are converted into the matching
    folder under `output_folder`. Anything inside `output_folder` itself is
    never used as an input.
    """
    input_folder = Path(input_folder).resolve()
    output_folder = Path(output_folder).resolve()

    remove_deleted(input_folder, output_folder)
    convert_images(input_folder, output_folder)
    convert_videos(input_folder, output_folder)

    for root, dirs, _ in input_folder.walk():
        # Prune the output folder in place so the walk never descends into it
        dirs[:] = [d for d in dirs if not (root / d).is_relative_to(output_folder)]

        # Rebase onto the output folder using path components. A plain string
        # replace would also rewrite later occurrences of the input path.
        mirrored_root = output_folder / root.relative_to(input_folder)

        for d in dirs:
            copy_from = root / d
            copy_to = mirrored_root / d
            remove_deleted(copy_from, copy_to)
            convert_images(copy_from, copy_to)
            convert_videos(copy_from, copy_to)
def main() -> None:
    """Parse the command line and convert the current working directory."""
    parser = argparse.ArgumentParser(description='Optimise and downscale images/videos')
    parser.add_argument(
        'output_folder',
        nargs='?',
        default=OUTPUT_FOLDER,
        help='Path to the output folder where the files will be saved.',
    )
    options = parser.parse_args()
    convert_folders(os.getcwd(), options.output_folder)


if __name__ == '__main__':
    main()
Author
Things get a bit messy when the output folder is a subdirectory of the input folder, as this seems to create recursive subdirectories.
Hey, I've just fixed and cleaned this up so it safely handles that now.
If you don't give it an output directory, it puts it under an .ente-convert folder in the same directory.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Things get a bit messy when the output folder is a subdirectory of the input folder, as this seems to create recursive subdirectories.