Skip to content

Instantly share code, notes, and snippets.

@dsoprea
Last active February 12, 2026 12:36
Show Gist options
  • Select an option

  • Save dsoprea/d2e7326de7d11fb55c0054867674e863 to your computer and use it in GitHub Desktop.

Select an option

Save dsoprea/d2e7326de7d11fb55c0054867674e863 to your computer and use it in GitHub Desktop.
Find all ZIPs in the source path, extract all files, and rename the base portion of each output name to the SHA1 digest of that file's data. Creates a manifest. Sets all timestamps according to EXIF.
#!/usr/bin/env python3
# Requirements: tqdm
import sys
import os
import argparse
import logging
import zipfile
import tempfile
import contextlib
import shutil
import hashlib
import json
import time
import datetime
import tqdm
import PIL.Image
_DESCRIPTION = \
"Find all ZIPs in the source path, extract all files, and rename base " \
"portion of the output name to be the SHA1 digest of the data of that " \
"file. Useful for establishing a flat directory with a large number of " \
"images and ensuring uniqueness between images, not precluding the " \
"ability to update the images later while knowing which you already " \
"have, and skipping those that are already present."
_LOGGER = logging.getLogger(__name__)
_ALGORITHM = hashlib.sha1
_IMAGE_EXTENSIONS = [
'.jpg',
'.png',
]
_ARCHIVE_EXTENSIONS = [
'.zip',
]
_EXTENSIONS = _IMAGE_EXTENSIONS + _ARCHIVE_EXTENSIONS
def _get_args():
    """Parse the command line and return the argparse namespace."""

    parser = argparse.ArgumentParser(description=_DESCRIPTION)

    parser.add_argument('source_path', help="Source path")
    parser.add_argument('target_path', help="Target path")

    parser.add_argument(
        '--mappings-output-filepath',
        help="Write mappings of original files to final files.")

    return parser.parse_args()
def _print(message):
print(message, file=sys.stderr)
def _write_pretty_json(data, f):
json.dump(
data,
f,
sort_keys=True,
indent=4,
separators=(',', ': '))
f.write('\n')
def _find_files_gen(root_path, extensions, classifier_cb):
extensions = map(str.lower, extensions)
extensions = list(extensions)
root_path = root_path.rstrip(os.sep)
root_path_len = len(root_path)
entries = os.walk(root_path)
archive_filepaths = []
for this_rel_path, folders, files in entries:
path = os.path.join(root_path, this_rel_path)
rel_path = path[root_path_len + 1:]
# Process alphabetically for intuitiveness
folders.sort()
files.sort()
for filename in files:
filename_lc = filename.lower()
for extension in extensions:
if filename_lc.endswith(extension) is False:
continue
class_ = classifier_cb(extension)
yield \
os.path.join(rel_path, filename), \
extension, \
class_
@contextlib.contextmanager
def _temp_path():
original_wd = os.getcwd()
path = tempfile.mkdtemp()
os.chdir(path)
try:
yield path
finally:
os.chdir(original_wd)
try:
shutil.rmtree(path)
except:
pass
def _get_timestamp(filepath):
    """Return the EXIF DateTimeOriginal of the image at `filepath` as a
    naive `datetime.datetime`, or None when no usable timestamp exists
    (no EXIF, absent tag, or empty value).
    """

    with PIL.Image.open(filepath) as im:
        exif = None

        # Older Pillow: private `_getexif()` returns a flat tag dict that
        # includes Exif-IFD tags such as DateTimeOriginal.
        if exif is None:
            try:
                exif = im._getexif()
            except AttributeError:
                pass

        # Fallback for Pillow versions without `_getexif`: public `getexif()`.
        # NOTE(review): on modern Pillow, DateTimeOriginal (0x9003) lives in
        # the Exif sub-IFD and may not appear in the base getexif() mapping —
        # confirm against the Pillow version in use.
        if exif is None:
            try:
                exif = im.getexif()
            except AttributeError:
                pass

        # No EXIF
        if not exif:
            return None

        # Get DateTimeOriginal tag value
        timestamp_phrase = exif.get(0x9003)

        # Absent tag
        if timestamp_phrase is None:
            return None

        timestamp_phrase = timestamp_phrase.strip()

        # Empty string
        if timestamp_phrase == '':
            return None

        # EXIF's "YYYY:MM:DD HH:MM:SS" layout; raises ValueError on a
        # malformed phrase.
        timestamp_dt = \
            datetime.datetime.strptime(
                timestamp_phrase,
                '%Y:%m:%d %H:%M:%S')

        return timestamp_dt
def _yield_images_from_archive_gen(
        target_path, t, archive_filepath, mappings):
    """Extract every image member from the ZIP at `archive_filepath`.

    For each image entry, yields a 3-tuple of (temporary filepath under
    `target_path`, original entry name, EXIF timestamp or None). The tqdm
    bar `t` has its total grown by the archive's entry count and its
    description updated per entry. `mappings` is accepted for interface
    parity but not consumed here.
    """

    with zipfile.ZipFile(archive_filepath) as archive:
        # We establish the temp-path here so that we don't interfere with
        # the potentially relative source-path
        with _temp_path() as temp_path:
            entries = archive.infolist()

            t.total += len(entries)
            t.refresh()

            archive_basename = os.path.basename(archive_filepath)

            for entry in entries:
                member_name = entry.filename

                # Only image members are interesting; skip everything else.
                member_name_lc = member_name.lower()
                is_image = any(
                    member_name_lc.endswith(extension)
                    for extension in _IMAGE_EXTENSIONS)

                if is_image is False:
                    continue

                # Show progress as "<archive>: <member>".
                t.set_description(
                    '{}: {}'.format(archive_basename, member_name))

                # Extract into the temp path, then read its EXIF timestamp.
                extracted_filepath = archive.extract(entry, path=temp_path)
                timestamp_dt = _get_timestamp(extracted_filepath)

                # Park the file under a dotted temporary name in the target
                # path so it can't collide with other files already there.
                temp_filename = \
                    '.temp.{}'.format(os.path.basename(extracted_filepath))

                temp_filepath = os.path.join(target_path, temp_filename)
                shutil.move(extracted_filepath, temp_filepath)

                yield temp_filepath, member_name, timestamp_dt
def _construct_target_filename(source_filepath, extension):
    """Return "<hex digest of file contents><extension, lowercased>".

    The file is hashed in fixed-size chunks so arbitrarily large images
    don't have to be held in memory at once (the original slurped the
    whole file with a single read()).
    """

    h = _ALGORITHM()

    with open(source_filepath, 'rb') as f:
        # 1 MiB chunks: large enough to be fast, small enough to be cheap.
        while True:
            chunk = f.read(1024 * 1024)
            if not chunk:
                break

            h.update(chunk)

    digest = h.hexdigest()

    return digest + extension.lower()
def _main():
    """Entry point: mirror images from `source_path` into `target_path`.

    Loose images are copied and ZIP archives are expanded; every output
    file is renamed to "<sha1><ext>" and, for archive members with an
    EXIF DateTimeOriginal, the mtime/atime are set from it. Optionally
    writes a JSON mapping of original names to final names.
    """

    args = _get_args()

    # NOTE(review): `assert` is stripped under `python -O`; these would be
    # safer as explicit checks that raise.
    assert \
        os.path.exists(args.source_path) is True, \
        "Source path does not exist: [{}]".format(args.source_path)

    assert \
        os.path.exists(args.target_path) is True, \
        "Target path does not exist: [{}]".format(args.target_path)

    # We're gonna be changing into a temp path, so canonicalize the paths we
    # were given
    source_path = os.path.abspath(args.source_path)
    target_path = os.path.abspath(args.target_path)

    def is_archive_cb(extension):
        # Classifier for the finder: True when the match is an archive.
        return extension in _ARCHIVE_EXTENSIONS

    archive_filepaths = \
        _find_files_gen(
            source_path,
            _EXTENSIONS,
            is_archive_cb)

    archive_filepaths = list(archive_filepaths)

    # The total grows as each archive reports its member count.
    t = tqdm.tqdm(total=0)

    # Original name (or "<archive>//<member>") -> final digest-based name.
    mappings = {}

    for rel_filepath, extension, is_archive in archive_filepaths:
        filepath = os.path.join(source_path, rel_filepath)

        if is_archive is True:
            member_filepaths_and_names = \
                _yield_images_from_archive_gen(
                    target_path,
                    t,
                    filepath,
                    mappings)

            for temp_filepath, filename, timestamp_dt \
                    in member_filepaths_and_names:
                # The digest name keeps the member's own extension.
                _, extension = os.path.splitext(filename)

                final_filename = \
                    _construct_target_filename(
                        temp_filepath,
                        extension)

                archive_and_entry = '{}//{}'.format(rel_filepath, filename)
                mappings[archive_and_entry] = final_filename

                target_filepath = \
                    os.path.join(
                        target_path,
                        final_filename)

                # Promote the parked temp file to its final digest name.
                os.rename(temp_filepath, target_filepath)

                # Set timestamp
                if timestamp_dt is not None:
                    timestamp_epoch = timestamp_dt.timestamp()

                    os.utime(
                        target_filepath,
                        times=(timestamp_epoch, timestamp_epoch))

                t.update(1)
        else:
            # Loose image: copy it straight across under its digest name.
            # NOTE(review): unlike archive members, loose images don't get
            # an EXIF-derived mtime here.
            filename = os.path.basename(filepath)
            t.set_description(filename)

            _, extension = os.path.splitext(filename)

            final_filename = \
                _construct_target_filename(
                    filepath,
                    extension)

            target_filepath = \
                os.path.join(
                    target_path,
                    final_filename)

            mappings[filename] = final_filename

            shutil.copyfile(filepath, target_filepath)

            t.update(1)

    if args.mappings_output_filepath is not None:
        with open(args.mappings_output_filepath, 'w') as f:
            _write_pretty_json(mappings, f)
# Only run when executed as a script, not when imported as a module.
if __name__ == '__main__':
    _main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment