@renaudll
Last active September 7, 2020 19:45
Find duplicates
"""
Scan a provided directory and report any duplicate files.
Made quickly for a friend.
"""
import os
import logging
import itertools
import hashlib
import argparse
import multiprocessing
import collections

# Configure logging
logging.basicConfig(format="%(message)s")
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)

# Configure parser
parser = argparse.ArgumentParser()
parser.add_argument("directory")
parser.add_argument("--logfile")


def main(start):
    if not os.path.isdir(start):
        raise ValueError("Invalid directory: %s" % start)

    scan_result = _scan(start)

    # Report
    _report(scan_result)


def _scan(start):
    """
    Scan a directory and return information regarding duplicate files.

    :param str start: The directory to scan.
    :return: A dict of duplicate paths by their hashes.
    :rtype: dict[str, set[str]]
    """
    # Find paths
    all_paths = set()
    for rootdir, dirnames, filenames in os.walk(start):
        for filename in filenames:
            path = os.path.join(rootdir, filename)
            all_paths.add(path)
    log.info("Found %s files" % len(all_paths))

    # Group paths by their size
    paths_by_size = _group_by_fn(all_paths, os.path.getsize)
    candidates = set()
    for size, paths in paths_by_size.items():
        # Ignore files that have no size
        if not size:
            continue
        if len(paths) > 1:
            candidates.update(paths)
    log.info("Found %s potentially similar files" % len(candidates))

    # Compute a hash for each candidate
    log.info("Computing file hashes...")
    candidates = tuple(candidates)  # make ordered
    pool = multiprocessing.Pool()
    hashes = pool.map(_get_file_hash, candidates)
    pool.close()
    pool.join()

    paths_by_hashes = collections.defaultdict(set)
    for candidate, digest in zip(candidates, hashes):
        paths_by_hashes[digest].add(candidate)

    # Keep only groups that have more than one file associated with the same hash.
    return {key: paths for key, paths in paths_by_hashes.items() if len(paths) > 1}
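

# A hypothetical example of the structure _scan() returns for two identical
# photos (the paths and the hash below are made up for illustration):
# {"3858f62230ac3c915f300c664312c63f": {"/photos/a.jpg", "/backup/a.jpg"}}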


def _get_file_hash(path):
    """
    Get a hash of a file's content.

    :param str path: A path to a file
    :return: A hash string
    :rtype: str
    """
    with open(path, "rb") as fp:
        return hashlib.md5(fp.read()).hexdigest()


def _group_by_fn(pool, fn):
    """
    Group multiple values using a function.

    :param pool: A sequence of values
    :type pool: Sequence[str]
    :param callable fn: A function that returns a key from a value
    :return: A dict of values grouped by their keys
    :rtype: dict[Any, tuple[str]]
    """
    return {key: tuple(group) for key, group in itertools.groupby(sorted(pool, key=fn), fn)}
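

# Note: itertools.groupby only groups consecutive items, which is why the values
# are sorted by the same key first. A hypothetical example with two files of
# 5 and 12 bytes:
#     _group_by_fn({"a.txt", "b.txt"}, os.path.getsize)
#     # -> {5: ("a.txt",), 12: ("b.txt",)}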


def _report(paths_by_hashes):
    """
    Log scan results.

    :param paths_by_hashes: A dict of duplicate paths by their hashes.
    :type paths_by_hashes: dict[str, set[str]]
    """
    groups = sorted(sorted(paths) for paths in paths_by_hashes.values())
    for paths in groups:
        log.info("The following files are the same:")
        for path in paths:
            log.info("\t%s" % path)


if __name__ == "__main__":
    args = parser.parse_args()

    # Dump logging to file if requested
    if args.logfile:
        handler = logging.FileHandler(args.logfile)
        handler.setLevel(logging.INFO)
        log.addHandler(handler)

    main(args.directory)
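
Example usage, assuming the script is saved as find_duplicates.py (the filename and paths here are hypothetical):

    python find_duplicates.py /path/to/photos --logfile duplicates.log

The script logs each group of identical files to the console; the optional --logfile flag also writes the same report to the given file.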