Last active
December 9, 2025 20:34
-
-
Save parsapoorsh/bdf9e2c25435674846a1641fbbde574e to your computer and use it in GitHub Desktop.
A Python script to convert images to JPEG-XL using cjxl.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
import logging
import os
import shutil
import subprocess
from dataclasses import dataclass
from pathlib import Path
from time import perf_counter, sleep
from typing import Iterable, Iterator, List, Optional
# Extension produced by the cjxl encoder.
JXL_EXT = ".jxl"

# Format name -> set of file extensions the script is willing to convert.
SUPPORTED_IMAGES = {
    "jpeg": {".jpeg", ".jpg"},
    "png": {".png"},
    # "webp": {".webp"},
    # "avif": {".avif"},
}
ALL_SUPPORTED_IMAGE_NAMES = tuple(SUPPORTED_IMAGES)
ALL_SUPPORTED_IMAGE_EXTS = {ext for exts in SUPPORTED_IMAGES.values() for ext in exts}

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class EncoderError(Exception):
    """Raised when a cjxl subprocess exits non-zero or produces no output file."""
@dataclass
class EncodingProcStuff:
    """Book-keeping for one launched cjxl subprocess."""

    proc: subprocess.Popen   # the running encoder process
    input_path: Path         # source image
    output_path: Path        # .jxl destination
    started_at: float        # perf_counter() timestamp taken at launch

    def time_diff(self) -> float:
        """Return the seconds elapsed since the process was started."""
        elapsed = perf_counter() - self.started_at
        return elapsed
def get_output_path(input_path: Path) -> Path:
    """Return the ``.jxl`` destination path for *input_path* (suffix swapped)."""
    return input_path.with_suffix(JXL_EXT)
def copy_timestamps(input_path: Path, output_path: Path):
    """Copy access/modification times from *input_path* onto *output_path*."""
    source_stat = os.stat(input_path)
    try:
        # Preferred path: nanosecond precision.
        os.utime(output_path, ns=(source_stat.st_atime_ns, source_stat.st_mtime_ns))
    except AttributeError:
        # Fallback for platforms whose stat result lacks the *_ns fields.
        os.utime(output_path, times=(source_stat.st_atime, source_stat.st_mtime))
def get_jxl_encoder_proc(input_path: Path, output_path: Path) -> EncodingProcStuff:
    """Spawn a low-priority cjxl process converting *input_path* to *output_path*.

    The caller must later reap the returned process (see
    parse_jxl_encoder_proc).  Raises FileNotFoundError for a missing input,
    KeyError for an unsupported extension, ValueError for a bad output suffix.
    """
    input_path = input_path.resolve()
    if not input_path.is_file():
        raise FileNotFoundError(input_path)
    ext = input_path.suffix.lower()
    if ext not in ALL_SUPPORTED_IMAGE_EXTS:
        raise KeyError(ext)
    if output_path.suffix.lower() != JXL_EXT:
        raise ValueError(f"Bad output suffix: {output_path.suffix}")
    # "nice -19" runs the encoder at the lowest scheduling priority so batch
    # conversions stay polite; one cjxl thread each, parallelism comes from
    # running several processes at once.
    command = [
        "nice",
        "-19",
        "cjxl",
        "--quiet",
        "--lossless_jpeg=1",
        "--quality=100",
        "--effort=10",
        "--num_threads=1",
        input_path,
        output_path,
    ]
    started = perf_counter()
    child = subprocess.Popen(
        args=command,
        stderr=subprocess.PIPE,
        stdout=subprocess.PIPE,
    )
    return EncodingProcStuff(
        proc=child,
        input_path=input_path,
        output_path=output_path,
        started_at=started,
    )
def parse_jxl_encoder_proc(ep: EncodingProcStuff) -> float:
    """Block until the encoder process exits and return the percent of space
    saved relative to the original file (0.0 for an empty original).

    Raises EncoderError when cjxl exits non-zero or the output file is missing.
    Also copies the original's timestamps onto the new file on success.
    """
    # communicate() drains both PIPEd streams while waiting.  A bare wait()
    # can deadlock when the child fills a pipe buffer before exiting, since
    # nothing would be reading the other end.
    stdout, stderr = ep.proc.communicate()
    if not ep.output_path.is_file() or ep.proc.returncode != 0:
        raise EncoderError(f"cjxl failed (code {ep.proc.returncode}):\n{stderr.decode()}\n{ep.input_path}")
    copy_timestamps(input_path=ep.input_path, output_path=ep.output_path)
    original_size = ep.input_path.stat().st_size
    compressed_size = ep.output_path.stat().st_size
    if original_size <= 0:
        # Avoid division by zero for zero-byte originals.
        return 0.0
    return 100 - ((compressed_size / original_size) * 100)
def process_single_image(
    image_path: Path,
    delete_original: bool,
):
    """Convert one image to JPEG-XL next to the original.

    Optionally deletes the original afterwards.  Returns 0 on success;
    raises ValueError for an unsupported extension.
    """
    extension = image_path.suffix.lower()
    if extension not in ALL_SUPPORTED_IMAGE_EXTS:
        raise ValueError(f"unsupported format: {extension}")
    output_path = get_output_path(input_path=image_path)
    # A pre-existing output would make cjxl fail, so clear it up front.
    if output_path.is_file():
        logger.warning(f"{output_path} already exists! deleting it")
        output_path.unlink()
    logger.info(f"Compressing {image_path}")
    encoder_job = get_jxl_encoder_proc(input_path=image_path, output_path=output_path)
    percent_compressed = parse_jxl_encoder_proc(encoder_job)
    logger.info(f"done! compressed by {percent_compressed:.2f}% in {encoder_job.time_diff():.2f}s")
    if delete_original:
        encoder_job.input_path.unlink()
        logger.info(f"deleted original: {encoder_job.input_path}")
    return 0
def iter_directory(
    directory_path: Path,
    max_depth: int,
    extensions: Optional[Iterable[str]] = None,
) -> Iterator[Path]:
    """
    Lazily yield files in the given directory, optionally limited by depth and
    filtered by file extensions.

    - If max_depth == 0, recurses through all subdirectories.
    - If max_depth == 1, only includes files in the current directory.
    - If max_depth > 1, recurses up to the specified depth levels.
    - If extensions is None or empty, yields all files regardless of extension.
    - Extensions are matched case-insensitively against Path.suffix (".png").
    - Yields only files, not directories.

    Raises ValueError for a negative max_depth and NotADirectoryError when
    directory_path is not a directory.  Note that, being a generator, these
    are raised on first iteration rather than at call time.
    """
    if max_depth < 0:
        raise ValueError("max_depth cannot be negative")
    if not directory_path.is_dir():
        raise NotADirectoryError(directory_path)
    # Normalize to a set once so the membership test below is O(1);
    # this also consumes a one-shot iterator exactly once.
    extensions = set(extensions) if extensions else set()

    def inner(current_dir: Path, current_depth: int) -> Iterator[Path]:
        if max_depth > 0 and current_depth > max_depth:
            return
        for path in current_dir.iterdir():
            if path.is_dir():
                yield from inner(path, current_depth + 1)
            elif path.is_file() and (not extensions or path.suffix.lower() in extensions):
                yield path

    yield from inner(directory_path, 1)
def process_batch_images(
    directory_path: Path,
    image_formats: str,
    threads: int,
    depth: int,
    delete_original: bool,
):
    """Convert every matching image under *directory_path* to JPEG-XL.

    Runs up to *threads* cjxl processes in parallel (0 = one per CPU).
    *image_formats* is a comma-separated list of format names from
    SUPPORTED_IMAGES; "jpg" is accepted as an alias for "jpeg".  *depth*
    limits the directory recursion (0 = unlimited).  Returns 0 on success.
    If an exception (including KeyboardInterrupt) escapes, still-running
    subprocesses are killed before it propagates.
    """
    directory_path = directory_path.resolve()

    formats = []
    for format_name in image_formats.split(","):
        format_name = format_name.strip().lower()
        if format_name == "jpg":  # common alias
            format_name = "jpeg"
        if format_name not in ALL_SUPPORTED_IMAGE_NAMES:
            raise ValueError(f"unsupported format: {format_name}")
        formats.append(format_name)
    if not formats:
        raise ValueError(f"empty {image_formats=}")

    if threads == 0:
        threads = os.cpu_count() or 1
    elif threads < 0:
        raise ValueError(f"negative threads count: {threads}")
    if depth < 0:
        raise ValueError(f"negative depth count: {depth}")

    extensions_to_search = {ext for fmt in formats for ext in SUPPORTED_IMAGES[fmt]}
    dir_list = iter_directory(
        directory_path=directory_path,
        max_depth=depth,
        extensions=extensions_to_search
    )

    procs_running: List[EncodingProcStuff] = []

    def parse_running_procs():
        # Reap finished processes.  Rebuild the list rather than `del`-ing
        # entries inside an enumerate() loop: deleting while iterating shifts
        # the indices and silently skips the element after each deletion.
        still_running: List[EncodingProcStuff] = []
        for ep in procs_running:
            if ep.proc.poll() is None:
                still_running.append(ep)
                continue
            input_relative_path = ep.input_path.relative_to(directory_path)
            try:
                percent_compressed = parse_jxl_encoder_proc(ep)
                msg = f"Compressed {input_relative_path} by {percent_compressed:.2f}% in {ep.time_diff():.2f}s"
                if delete_original:
                    ep.input_path.unlink()
                    msg += " & deleted original"
                logger.info(msg)
            except EncoderError as e:
                # Best-effort batch: log and move on to the other images.
                logger.error(f"Skipping {input_relative_path} due to {e}")
        procs_running[:] = still_running

    try:
        for image_path in dir_list:
            output_path = get_output_path(image_path)
            ep = get_jxl_encoder_proc(
                input_path=image_path,
                output_path=output_path
            )
            procs_running.append(ep)
            while len(procs_running) >= threads:
                parse_running_procs()
                # prevent a 100% CPU busy-poll
                sleep(0.01)
        while procs_running:
            parse_running_procs()
            # prevent a 100% CPU busy-poll
            sleep(0.01)
        return 0
    finally:
        # Cleanup only — deliberately no `return` here: returning from a
        # `finally` block swallows any in-flight exception, which previously
        # ate KeyboardInterrupt and made Ctrl-C exit "successfully".
        if procs_running:
            logger.warning("\nStopping... killing running subprocesses")
            for ep in procs_running:
                ep.proc.terminate()
                ep.proc.kill()
                ep.proc.wait()
            logger.warning("subprocesses cleaned up")
def main(
    image_or_directory_paths: List[Path],
    image_formats: str,
    threads: int,
    depth: int,
    delete_original: bool,
) -> int:
    """Dispatch each given path to single-image or batch conversion.

    Returns 0 on success.  Raises RuntimeError if cjxl is not installed and
    ValueError for a path that is neither a file nor a directory.
    """
    # Fail fast when the encoder binary is missing entirely.
    if shutil.which("cjxl") is None:
        raise RuntimeError("cjxl not found in PATH. Install libjxl-tools.")
    for target in image_or_directory_paths:
        if target.is_file():
            process_single_image(
                image_path=target,
                delete_original=delete_original,
            )
        elif target.is_dir():
            process_batch_images(
                directory_path=target,
                image_formats=image_formats,
                threads=threads,
                depth=depth,
                delete_original=delete_original,
            )
        else:
            raise ValueError(f"{target} is not an image or directory")
    return 0
if __name__ == "__main__":
    import argparse
    import traceback

    parser = argparse.ArgumentParser(
        description="a script to convert images to JPEG-XL using cjxl"
    )
    parser.add_argument(
        "image_or_directory_paths",
        type=Path,
        nargs="+",
    )
    supported_names = ",".join(ALL_SUPPORTED_IMAGE_NAMES)
    parser.add_argument(
        "-if", "--image-formats",
        type=str,
        default=supported_names,
        dest="image_formats",
        help=(
            f"image formats to search for.\n"
            f"supported formats: {supported_names}\n"
            f"default: {supported_names}"
        ),
    )
    parser.add_argument(
        "-t", "--threads",
        type=int,
        default=0,
        dest="threads",
        help="number of parallel cjxl processes. 0 for all available CPUs",
    )
    parser.add_argument(
        "-d", "--depth",
        type=int,
        default=1,
        dest="depth",
        help="search depth. 0 for unlimited recursion, 1 for current dir only",
    )
    parser.add_argument(
        "--delete-the-original-source",
        action="store_true",
        dest="delete_original",
        help="DANGEROUS! delete the original image after conversion",
    )
    cli_args = parser.parse_args()

    # Plain messages only; per-file progress is already human-readable.
    logging.basicConfig(
        level=logging.INFO,
        format="%(message)s",
    )

    # noinspection PyBroadException
    try:
        exit(main(**vars(cli_args)))
    except KeyboardInterrupt:
        logger.info("\nInterrupted by user.")
        exit(130)
    except Exception as e:
        logger.fatal(f"Error: {e}")
        traceback.print_exc()
        exit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment