Find duplicates
| """ | |
| Scan a provided directory and report any duplicate files. | |
| Made quickly for a friend. | |
| """ | |
| import os | |
| import logging | |
| import itertools | |
| import hashlib | |
| import argparse | |
| import multiprocessing | |
| import collections | |
| # Configure logging | |
| logging.basicConfig(format="%(message)s") | |
| log = logging.getLogger(__name__) | |
| log.setLevel(logging.INFO) | |
| # Configure parser | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument("directory") | |
| parser.add_argument("--logfile") | |

def main(start):
    if not os.path.isdir(start):
        raise ValueError("Invalid directory: %r" % start)

    scan_result = _scan(start)

    # Report
    _report(scan_result)

def _scan(start):
    """
    Scan a directory and return information regarding duplicate files.

    :param str start: The directory to scan.
    :return: A dict of duplicate paths by their hashes.
    :rtype: dict[str, tuple[str]]
    """
    # Find paths
    all_paths = set()
    for rootdir, dirnames, filenames in os.walk(start):
        for filename in filenames:
            path = os.path.join(rootdir, filename)
            all_paths.add(path)
    log.info("Found %s files", len(all_paths))

    # Group paths by their size
    paths_by_size = _group_by_fn(all_paths, os.path.getsize)
    candidates = set()
    for size, paths in paths_by_size.items():
        # Ignore files that have no size
        if not size:
            continue
        # Only files sharing a size with at least one other file can be duplicates.
        if len(paths) > 1:
            candidates.update(paths)
    log.info("Found %s potentially similar files", len(candidates))

    # Compute a hash for each candidate
    log.info("Computing file hashes...")
    candidates = tuple(candidates)  # make ordered
    with multiprocessing.Pool() as pool:
        hashes = pool.map(_get_file_hash, candidates)

    paths_by_hashes = collections.defaultdict(set)
    for candidate, digest in zip(candidates, hashes):
        paths_by_hashes[digest].add(candidate)

    # Keep only groups that have more than one file associated with the same hash.
    return {key: paths for key, paths in paths_by_hashes.items() if len(paths) > 1}

def _get_file_hash(path):
    """
    Get a hash from a file's content.

    :param str path: A path to a file
    :return: A hash string
    :rtype: str
    """
    # Read in chunks so large files don't have to fit in memory at once.
    digest = hashlib.md5()
    with open(path, "rb") as fp:
        for chunk in iter(lambda: fp.read(65536), b""):
            digest.update(chunk)
    return digest.hexdigest()

def _group_by_fn(pool, fn):
    """
    Group multiple values using a function.

    :param pool: A sequence of values
    :type pool: Sequence[str]
    :param callable fn: A function that returns a key from a value
    :return: A dict of values by their keys
    :rtype: dict[str, tuple[str]]
    """
    # groupby only groups consecutive items, so sort the pool by key first.
    return {key: tuple(group) for key, group in itertools.groupby(sorted(pool, key=fn), fn)}

def _report(paths_by_hashes):
    """
    Log scan results.

    :param paths_by_hashes: A dict of duplicate paths by their hashes.
    :type paths_by_hashes: dict[str, tuple[str]]
    """
    groups = sorted(sorted(paths) for paths in paths_by_hashes.values())
    for paths in groups:
        log.info("The following files are the same:")
        for path in paths:
            log.info("\t%s", path)

if __name__ == "__main__":
    args = parser.parse_args()

    # Dump logging to file if requested
    if args.logfile:
        handler = logging.FileHandler(args.logfile)
        handler.setLevel(logging.INFO)
        log.addHandler(handler)

    main(args.directory)
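
A quick usage sketch, assuming the script is saved as find_duplicates.py (the filename and example path are hypothetical):

    python find_duplicates.py /path/to/photos --logfile duplicates.log

Grouping by file size first is what keeps the scan cheap: a hash is only computed for files that share a size with at least one other file, and those md5 digests are computed in parallel across a process pool.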