Skip to content

Instantly share code, notes, and snippets.

@parsapoorsh
Last active December 9, 2025 20:34
Show Gist options
  • Select an option

  • Save parsapoorsh/bdf9e2c25435674846a1641fbbde574e to your computer and use it in GitHub Desktop.
A Python script to convert images to JPEG-XL using cjxl.
#!/usr/bin/env python3
import logging
import os
import shutil
import subprocess
from dataclasses import dataclass
from pathlib import Path
from time import perf_counter, sleep
from typing import Iterable, Iterator, List, Optional
JXL_EXT = ".jxl"

# Input formats we hand to cjxl, keyed by canonical format name.
SUPPORTED_IMAGES = {
    "jpeg": {".jpeg", ".jpg"},
    "png": {".png"},
    # "webp": {".webp"},
    # "avif": {".avif"},
}
ALL_SUPPORTED_IMAGE_NAMES = tuple(SUPPORTED_IMAGES)
ALL_SUPPORTED_IMAGE_EXTS = set().union(*SUPPORTED_IMAGES.values())

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class EncoderError(Exception):
    """Raised when a cjxl subprocess exits non-zero or produces no output file."""
@dataclass
class EncodingProcStuff:
    """Bookkeeping for one in-flight cjxl encode."""

    proc: subprocess.Popen  # the running cjxl process
    input_path: Path        # source image path
    output_path: Path       # destination .jxl path
    started_at: float       # perf_counter() timestamp taken at launch

    def time_diff(self) -> float:
        """Seconds elapsed since the encode was started."""
        now = perf_counter()
        return now - self.started_at
def get_output_path(input_path: Path) -> Path:
    """Return input_path with its extension replaced by ".jxl"."""
    return input_path.with_suffix(JXL_EXT)
def copy_timestamps(input_path: Path, output_path: Path):
    """Mirror input_path's access/modification times onto output_path."""
    src_stat = os.stat(input_path)
    try:
        # Prefer nanosecond precision where the platform exposes it.
        os.utime(output_path, ns=(src_stat.st_atime_ns, src_stat.st_mtime_ns))
    except AttributeError:
        # Fallback when the *_ns attributes are unavailable.
        os.utime(output_path, times=(src_stat.st_atime, src_stat.st_mtime))
def get_jxl_encoder_proc(input_path: Path, output_path: Path) -> EncodingProcStuff:
    """Spawn a low-priority cjxl process encoding input_path to output_path.

    Raises FileNotFoundError for a missing input, KeyError for an
    unsupported input extension, and ValueError for a non-.jxl output.
    """
    input_path = input_path.resolve()
    if not input_path.is_file():
        raise FileNotFoundError(input_path)
    suffix = input_path.suffix.lower()
    if suffix not in ALL_SUPPORTED_IMAGE_EXTS:
        raise KeyError(suffix)
    if output_path.suffix.lower() != JXL_EXT:
        raise ValueError(f"Bad output suffix: {output_path.suffix}")
    command = [
        "nice",  # run at the lowest scheduling priority
        "-19",
        "cjxl",
        "--quiet",
        "--lossless_jpeg=1",
        "--quality=100",
        "--effort=10",
        "--num_threads=1",  # parallelism comes from running many processes
        input_path,
        output_path,
    ]
    proc = subprocess.Popen(
        args=command,
        stderr=subprocess.PIPE,
        stdout=subprocess.PIPE,
    )
    return EncodingProcStuff(
        proc=proc,
        input_path=input_path,
        output_path=output_path,
        started_at=perf_counter(),
    )
def parse_jxl_encoder_proc(ep: EncodingProcStuff) -> float:
    """Wait for an encode to finish, validate it, and return the space saved.

    Returns the compression percentage (100 * (1 - compressed/original)),
    or 0.0 for a zero-byte original.
    Raises EncoderError when cjxl exits non-zero or produced no output file.
    """
    # communicate() both waits for exit and drains/closes stdout+stderr.
    # A bare wait() with PIPE streams can deadlock if the child fills a
    # pipe buffer, and leaves the pipes open on the success path.
    _stdout, stderr = ep.proc.communicate()
    if not ep.output_path.is_file() or ep.proc.returncode != 0:
        raise EncoderError(f"cjxl failed (code {ep.proc.returncode}):\n{stderr.decode()}\n{ep.input_path}")
    # Carry the original file's timestamps over to the new .jxl.
    copy_timestamps(input_path=ep.input_path, output_path=ep.output_path)
    original_size = ep.input_path.stat().st_size
    compressed_size = ep.output_path.stat().st_size
    # Guard against division by zero for empty originals.
    percent_compressed = (
        100 - ((compressed_size / original_size) * 100)
        if original_size > 0 else 0.0
    )
    return percent_compressed
def process_single_image(
    image_path: Path,
    delete_original: bool,
):
    """Convert one image to JPEG XL, replacing any pre-existing .jxl output.

    Raises ValueError for unsupported input extensions; EncoderError
    propagates from the encode itself. Returns 0 on success.
    """
    suffix = image_path.suffix.lower()
    if suffix not in ALL_SUPPORTED_IMAGE_EXTS:
        raise ValueError(f"unsupported format: {suffix}")
    output_path = get_output_path(input_path=image_path)
    # A stale output would make cjxl fail or give misleading stats.
    if output_path.is_file():
        logger.warning(f"{output_path} already exists! deleting it")
        output_path.unlink()
    logger.info(f"Compressing {image_path}")
    job = get_jxl_encoder_proc(input_path=image_path, output_path=output_path)
    percent_compressed = parse_jxl_encoder_proc(job)
    logger.info(f"done! compressed by {percent_compressed:.2f}% in {job.time_diff():.2f}s")
    if delete_original:
        job.input_path.unlink()
        logger.info(f"deleted original: {job.input_path}")
    return 0
def iter_directory(
    directory_path: Path,
    max_depth: int,
    extensions: Optional[Iterable[str]] = None,
) -> Iterator[Path]:
    """
    Iterates over files in the given directory, optionally limited by depth and filtered by file extensions.
    - If max_depth == 0, recurses through all subdirectories.
    - If max_depth == 1, only includes files in the current directory.
    - If max_depth > 1, recurses up to the specified depth levels.
    - If extensions is empty, yields all files regardless of extension.
    - Yields only files, not directories.

    Argument validation happens eagerly at call time; only the returned
    iterator is lazy. (With `yield` in this function's own body, the
    ValueError/NotADirectoryError below would be deferred until the first
    iteration — hence the inner generator.)
    """
    if max_depth < 0:
        raise ValueError("max_depth cannot be negative")
    if not directory_path.is_dir():
        raise NotADirectoryError(directory_path)
    # Extensions are matched case-insensitively against path.suffix.lower(),
    # so callers should pass lowercase suffixes like ".jpg".
    wanted = set(extensions) if extensions else set()

    def inner(current_dir: Path, current_depth: int) -> Iterator[Path]:
        if max_depth > 0 and current_depth > max_depth:
            return
        for path in current_dir.iterdir():
            if path.is_dir():
                yield from inner(path, current_depth + 1)
            elif path.is_file() and (not wanted or path.suffix.lower() in wanted):
                yield path

    return inner(directory_path, 1)
def process_batch_images(
    directory_path: Path,
    image_formats: str,
    threads: int,
    depth: int,
    delete_original: bool,
):
    """Encode every matching image under directory_path, running up to
    `threads` cjxl processes in parallel.

    image_formats: comma-separated format names, e.g. "jpeg,png".
    threads: 0 means one worker per CPU; negative is a ValueError.
    depth: passed through to iter_directory (0 = unlimited recursion).
    Returns 0 on normal completion; exceptions (including
    KeyboardInterrupt) propagate after subprocess cleanup.
    """
    directory_path = directory_path.resolve()
    # Normalize and validate the requested format names.
    formats = []
    for format_name in image_formats.split(","):
        format_name = format_name.strip().lower()
        if format_name == "jpg":
            format_name = "jpeg"  # "jpg" is an alias for jpeg
        if format_name not in ALL_SUPPORTED_IMAGE_NAMES:
            raise ValueError(f"unsupported format: {format_name}")
        formats.append(format_name)
    if not formats:
        raise ValueError(f"empty {image_formats=}")
    if threads == 0:
        threads = os.cpu_count() or 1
    elif threads < 0:
        raise ValueError(f"negative threads count: {threads}")
    if depth < 0:
        raise ValueError(f"negative depth count: {depth}")
    extensions_to_search = {ext for fmt in formats for ext in SUPPORTED_IMAGES[fmt]}
    dir_list = iter_directory(
        directory_path=directory_path,
        max_depth=depth,
        extensions=extensions_to_search,
    )
    procs_running: List[EncodingProcStuff] = []

    def parse_running_procs():
        # Handle every finished process, then rebuild the list in place.
        # (The previous `del procs_running[idx]` while enumerating skipped
        # whichever element shifted into the deleted slot.)
        still_running: List[EncodingProcStuff] = []
        for ep in procs_running:
            if ep.proc.poll() is None:
                still_running.append(ep)
                continue
            input_relative_path = ep.input_path.relative_to(directory_path)
            try:
                percent_compressed = parse_jxl_encoder_proc(ep)
                msg = f"Compressed {input_relative_path} by {percent_compressed:.2f}% in {ep.time_diff():.2f}s"
                if delete_original:
                    ep.input_path.unlink()
                    msg += " & deleted original"
                logger.info(msg)
            except EncoderError as e:
                # A single failed file shouldn't abort the whole batch.
                logger.error(f"Skipping {input_relative_path} due to {e}")
        procs_running[:] = still_running

    try:
        for image_path in dir_list:
            output_path = get_output_path(image_path)
            procs_running.append(
                get_jxl_encoder_proc(input_path=image_path, output_path=output_path)
            )
            while len(procs_running) >= threads:
                parse_running_procs()
                # prevent busy-spinning at 100% CPU while polling
                sleep(0.01)
        while procs_running:
            parse_running_procs()
            # prevent busy-spinning at 100% CPU while polling
            sleep(0.01)
        return 0
    finally:
        # Work is only outstanding here when an exception (for example
        # KeyboardInterrupt) aborted the loops. Clean up the children and
        # let the exception propagate — a `return` inside `finally` would
        # silently swallow it and hide the interrupt from the caller.
        if procs_running:
            logger.warning("\nStopping... killing running subprocesses")
            for ep in procs_running:
                ep.proc.terminate()
            for ep in procs_running:
                try:
                    # Give each child a moment to exit gracefully before
                    # escalating to SIGKILL.
                    ep.proc.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    ep.proc.kill()
                    ep.proc.wait()
            logger.warning("subprocesses cleaned up")
def main(
    image_or_directory_paths: List[Path],
    image_formats: str,
    threads: int,
    depth: int,
    delete_original: bool,
) -> int:
    """Dispatch each path to single-image or batch directory processing.

    Raises RuntimeError when cjxl is missing and ValueError for paths
    that are neither files nor directories. Returns 0 on success.
    """
    # Fail fast with a clear message if the encoder binary is absent.
    if shutil.which("cjxl") is None:
        raise RuntimeError("cjxl not found in PATH. Install libjxl-tools.")
    for target in image_or_directory_paths:
        if target.is_file():
            process_single_image(
                image_path=target,
                delete_original=delete_original,
            )
        elif target.is_dir():
            process_batch_images(
                directory_path=target,
                image_formats=image_formats,
                threads=threads,
                depth=depth,
                delete_original=delete_original,
            )
        else:
            raise ValueError(f"{target} is not an image or directory")
    return 0
if __name__ == "__main__":
    import argparse
    import traceback

    parser = argparse.ArgumentParser(
        description="a script to convert images to JPEG-XL using cjxl"
    )
    parser.add_argument(
        "image_or_directory_paths",
        type=Path,
        nargs="+",
    )
    all_supported_image_names_comma_separated = ",".join(ALL_SUPPORTED_IMAGE_NAMES)
    parser.add_argument(
        "-if", "--image-formats",
        type=str,
        default=all_supported_image_names_comma_separated,
        required=False,
        dest="image_formats",
        # NOTE(review): argparse's default HelpFormatter collapses these
        # "\n"s into spaces when rendering --help.
        help=(
            "image formats to search for." +
            "\n" +
            "supported formats: " + all_supported_image_names_comma_separated +
            "\n" +
            "default: " + all_supported_image_names_comma_separated
        ),
    )
    parser.add_argument(
        "-t", "--threads",
        type=int,
        default=0,
        required=False,
        dest="threads",
        help="number of parallel cjxl processes. 0 for all available CPUs",
    )
    parser.add_argument(
        "-d", "--depth",
        type=int,
        default=1,
        required=False,
        dest="depth",
        help="search depth. 0 for unlimited recursion, 1 for current dir only",
    )
    parser.add_argument(
        "--delete-the-original-source",
        action="store_true",
        dest="delete_original",
        help="DANGEROUS! delete the original image after conversion",
    )
    args = parser.parse_args()
    logging.basicConfig(
        level=logging.INFO,
        format="%(message)s",
    )
    try:
        exit(main(**vars(args)))
    except KeyboardInterrupt:
        logger.info("\nInterrupted by user.")
        exit(130)  # 128 + SIGINT: conventional exit status for Ctrl-C
    except Exception as e:
        # Logger.fatal is a deprecated alias; critical() is the real method.
        logger.critical(f"Error: {e}")
        traceback.print_exc()
        exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment