Last active
February 12, 2026 12:36
-
-
Save dsoprea/d2e7326de7d11fb55c0054867674e863 to your computer and use it in GitHub Desktop.
Find all ZIPs in the source path, extract all files, and rename base portion of the output name to be the SHA1 digest of the data of that file. Creates a manifest. Sets all timestamps according to EXIF.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # Requirements: tqdm | |
| import sys | |
| import os | |
| import argparse | |
| import logging | |
| import zipfile | |
| import tempfile | |
| import contextlib | |
| import shutil | |
| import hashlib | |
| import json | |
| import time | |
| import datetime | |
| import tqdm | |
| import PIL.Image | |
# Long description surfaced through the argparse --help output.
_DESCRIPTION = \
    "Find all ZIPs in the source path, extract all files, and rename base " \
    "portion of the output name to be the SHA1 digest of the data of that " \
    "file. Useful for establishing a flat directory with a large number of " \
    "images and ensuring uniqueness between images, not precluding the " \
    "ability to update the images later while knowing which you already " \
    "have, and skipping those that are already present."

# Module logger. NOTE(review): not referenced anywhere below — presumably kept
# for future use.
_LOGGER = logging.getLogger(__name__)

# Hash constructor used to derive output filenames (SHA-1).
_ALGORITHM = hashlib.sha1

# Archive members with these (lowercase) suffixes are treated as images.
_IMAGE_EXTENSIONS = [
    '.jpg',
    '.png',
]

# Files with these (lowercase) suffixes are treated as archives to extract.
_ARCHIVE_EXTENSIONS = [
    '.zip',
]

# Everything the source-path scan looks for.
_EXTENSIONS = _IMAGE_EXTENSIONS + _ARCHIVE_EXTENSIONS
def _get_args():
    """Build the CLI parser and return the parsed arguments."""

    parser = argparse.ArgumentParser(description=_DESCRIPTION)

    parser.add_argument(
        'source_path',
        help="Source path")

    parser.add_argument(
        'target_path',
        help="Target path")

    parser.add_argument(
        '--mappings-output-filepath',
        help="Write mappings of original files to final files.")

    return parser.parse_args()
def _print(message):
    """Write `message` (plus a newline) to stderr."""

    sys.stderr.write('{}\n'.format(message))
def _write_pretty_json(data, f):
    """Serialize `data` to file-object `f` as sorted, four-space-indented
    JSON, followed by a trailing newline.
    """

    encoded = \
        json.dumps(
            data,
            sort_keys=True,
            indent=4,
            separators=(',', ': '))

    f.write(encoded + '\n')
def _find_files_gen(root_path, extensions, classifier_cb):
    """Recursively find files under `root_path` whose names end with one of
    `extensions` (matched case-insensitively).

    Yields (root-relative filepath, matched extension, classifier_cb(extension))
    tuples, visiting folders and files in alphabetical order.
    """

    extensions = [extension.lower() for extension in extensions]

    root_path = root_path.rstrip(os.sep)
    root_path_len = len(root_path)

    for dir_path, folders, files in os.walk(root_path):
        # os.walk() already yields paths prefixed with `root_path`; strip that
        # prefix (plus the separator) to get the root-relative path. (The
        # previous os.path.join(root_path, dir_path) was redundant for
        # absolute roots and wrong for relative ones.)
        rel_path = dir_path[root_path_len + 1:]

        # Process alphabetically for intuitiveness (sorting `folders` in-place
        # also controls the walk order)
        folders.sort()
        files.sort()

        for filename in files:
            filename_lc = filename.lower()
            for extension in extensions:
                if filename_lc.endswith(extension) is False:
                    continue

                class_ = classifier_cb(extension)

                yield \
                    os.path.join(rel_path, filename), \
                    extension, \
                    class_

                # A file can match at most one extension; don't yield twice
                break
@contextlib.contextmanager
def _temp_path():
    """Create a temporary directory, chdir into it, and yield its path.

    On exit the original working directory is restored and the directory is
    removed on a best-effort basis (cleanup failures are ignored rather than
    masking an exception from the body).
    """

    original_wd = os.getcwd()

    path = tempfile.mkdtemp()
    os.chdir(path)

    try:
        yield path
    finally:
        os.chdir(original_wd)

        # Best-effort delete; previously a bare `except:` swallowed everything
        # (including KeyboardInterrupt), which ignore_errors avoids.
        shutil.rmtree(path, ignore_errors=True)
def _get_timestamp(filepath):
    """Return the EXIF DateTimeOriginal of the image at `filepath` as a naive
    `datetime.datetime`, or None when absent/empty/unparseable.
    """

    with PIL.Image.open(filepath) as im:
        # Older Pillow exposes parsed tags via the private _getexif(); newer
        # versions provide the public getexif(). Try both, in that order.
        try:
            exif = im._getexif()
        except AttributeError:
            exif = None

        if exif is None:
            try:
                exif = im.getexif()
            except AttributeError:
                pass

    # No EXIF data at all
    if not exif:
        return None

    # 0x9003 is the DateTimeOriginal tag
    timestamp_phrase = exif.get(0x9003)

    # Absent tag
    if timestamp_phrase is None:
        return None

    timestamp_phrase = timestamp_phrase.strip()

    # Empty string
    if timestamp_phrase == '':
        return None

    # EXIF timestamps look like "2020:01:31 12:34:56". Tolerate malformed
    # values rather than crashing the whole run on one bad image.
    try:
        return \
            datetime.datetime.strptime(
                timestamp_phrase,
                '%Y:%m:%d %H:%M:%S')
    except ValueError:
        return None
def _yield_images_from_archive_gen(
        target_path, t, archive_filepath, mappings):
    """Extract every image member from the ZIP at `archive_filepath`.

    For each image entry, yields a (temp filepath, original entry name, EXIF
    timestamp or None) tuple. The temp filepath is a '.temp.'-prefixed file
    already moved into `target_path`, for the caller to rename.

    `t` is a tqdm progress bar; its total and description are updated here.

    NOTE(review): `mappings` is accepted but never used in this function; it
    is kept only for interface compatibility with the caller.
    """

    # str.endswith() accepts a tuple, so one call tests every extension
    image_suffixes = tuple(_IMAGE_EXTENSIONS)

    with zipfile.ZipFile(archive_filepath) as z:
        # We establish the temp-path here so that we don't interfere with
        # the potentially relative source-path
        with _temp_path() as temp_path:
            entries = z.infolist()

            t.total += len(entries)
            t.refresh()

            archive_basename = os.path.basename(archive_filepath)

            for entry in entries:
                entry_basename = entry.filename

                # Ignore everything but images
                if entry_basename.lower().endswith(image_suffixes) is False:
                    continue

                # Set description into progress bar
                description = \
                    '{}: {}'.format(
                        archive_basename,
                        entry_basename)

                t.set_description(description)

                # Extract into the temp path
                extracted_filepath = z.extract(entry, path=temp_path)

                # Establish timestamp (None when no usable EXIF)
                timestamp_dt = _get_timestamp(extracted_filepath)

                # Rename to a temporary name, to avoid ambiguity with the other
                # files in the path
                extracted_filename = os.path.basename(extracted_filepath)
                temp_filename = '.temp.{}'.format(extracted_filename)

                temp_filepath = \
                    os.path.join(
                        target_path,
                        temp_filename)

                shutil.move(extracted_filepath, temp_filepath)

                yield temp_filepath, entry_basename, timestamp_dt
def _construct_target_filename(source_filepath, extension):
    """Return the final filename for `source_filepath`: the hex digest of its
    content (per _ALGORITHM, SHA-1) plus the lower-cased `extension`.
    """

    # Hash in fixed-size chunks so arbitrarily large files don't have to be
    # read into memory all at once (the previous version did f.read()).
    h = _ALGORITHM()
    with open(source_filepath, 'rb') as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b''):
            h.update(chunk)

    digest = h.hexdigest()

    extension_lc = extension.lower()
    final_filename = digest + extension_lc

    return final_filename
def _main():
    """Driver: scan the source path, copy/extract every matching file into the
    target path under a digest-derived name, and optionally write a JSON
    mapping of original names to final names.
    """

    args = _get_args()

    # NOTE(review): assert-based validation is stripped when Python runs with
    # -O; raising SystemExit/ValueError would be more robust.
    assert \
        os.path.exists(args.source_path) is True, \
        "Source path does not exist: [{}]".format(args.source_path)

    assert \
        os.path.exists(args.target_path) is True, \
        "Target path does not exist: [{}]".format(args.target_path)

    # We're gonna be changing into a temp path, so canonicalize the paths we
    # were given
    source_path = os.path.abspath(args.source_path)
    target_path = os.path.abspath(args.target_path)

    # Classifier passed to the scanner: True for archive extensions.
    def is_archive_cb(extension):
        return extension in _ARCHIVE_EXTENSIONS

    archive_filepaths = \
        _find_files_gen(
            source_path,
            _EXTENSIONS,
            is_archive_cb)

    # Materialize so the scan completes before any extraction starts.
    archive_filepaths = list(archive_filepaths)

    # Progress bar; total grows as archive members are discovered.
    t = tqdm.tqdm(total=0)

    # Original name (or "archive//entry") -> final digest-based filename.
    mappings = {}
    for rel_filepath, extension, is_archive in archive_filepaths:
# TODO(dustin): It would be useful to extract the EXIF timestamp and use for the mtime
        filepath = os.path.join(source_path, rel_filepath)
        if is_archive is True:
            # Extract each image member into target_path under a temp name.
            member_filepaths_and_names = \
                _yield_images_from_archive_gen(
                    target_path,
                    t,
                    filepath,
                    mappings)

            for temp_filepath, filename, timestamp_dt \
                    in member_filepaths_and_names:
                _, extension = os.path.splitext(filename)

                # Digest-based final name for the extracted member.
                final_filename = \
                    _construct_target_filename(
                        temp_filepath,
                        extension)

                archive_and_entry = '{}//{}'.format(rel_filepath, filename)
                mappings[archive_and_entry] = final_filename

                target_filepath = \
                    os.path.join(
                        target_path,
                        final_filename)

                os.rename(temp_filepath, target_filepath)

                # Set timestamp (atime and mtime) from EXIF when available
                if timestamp_dt is not None:
                    timestamp_epoch = timestamp_dt.timestamp()

                    os.utime(
                        target_filepath,
                        times=(timestamp_epoch, timestamp_epoch))

                t.update(1)
        else:
            # Loose image file: copy it directly under its digest name.
            filename = os.path.basename(filepath)
            t.set_description(filename)

            _, extension = os.path.splitext(filename)

            final_filename = \
                _construct_target_filename(
                    filepath,
                    extension)

            target_filepath = \
                os.path.join(
                    target_path,
                    final_filename)

            # NOTE(review): keyed by basename only — loose files with the same
            # name in different folders will overwrite each other's mapping
            # entry; `rel_filepath` would be collision-free.
            mappings[filename] = final_filename

            shutil.copyfile(filepath, target_filepath)

            t.update(1)

    if args.mappings_output_filepath is not None:
        with open(args.mappings_output_filepath, 'w') as f:
            _write_pretty_json(mappings, f)
| _main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment