Source code for fastdupes

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Find Dupes Fast
By Stephan Sokolow (ssokolow.com)

A simple script which identifies duplicate files several orders of magnitude
more quickly than fdupes by using smarter algorithms.

----

.. only:: draft

    Copyright (C) 2009-2014 Stephan Sokolow

    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 2 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program; if not, see <http://www.gnu.org/licenses/>.

.. todo:: Figure out how to do ePyDoc-style grouping here without giving up
          automodule-level comfort.

.. default-domain:: py
"""


__appname__ = "Find Dupes Fast"
__author__ = "Stephan Sokolow (deitarion/SSokolow)"
__version__ = "0.3.6"
__license__ = "GNU GPL 2.0 or later"

import fnmatch, os, re, stat, sys
from functools import wraps

# Note: In my `python -m timeit` tests, the difference between MD5 and SHA1 was
# negligible, so there is no meaningful reason not to take advantage of the
# reduced potential for hash collisions that SHA1's greater hash size offers.
import hashlib
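# A rough, illustrative way to reproduce that timing comparison (not part of
# the original module; run from a shell):
#
#   python -m timeit -s "import hashlib; d = b'x' * 2**16" "hashlib.md5(d).digest()"
#   python -m timeit -s "import hashlib; d = b'x' * 2**16" "hashlib.sha1(d).digest()"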

#: Default settings used by :mod:`optparse` and some functions
DEFAULTS = {
    'delete': False,
    'exclude': ['*/.svn', '*/.bzr', '*/.git', '*/.hg'],
    'min_size': 25,  #: Only check files this big or bigger.
}
CHUNK_SIZE = 2 ** 16  #: Size for chunked reads from file handles
HEAD_SIZE = 2 ** 14  #: Limit how many bytes will be read to compare headers

# {{{ General Helper Functions

# We need os.lstat so we can skip symlinks, but we want Windows portability too
try:
    _stat = os.lstat
except AttributeError:
    _stat = os.stat

def multiglob_compile(globs, prefix=False):
    """Generate a single "A or B or C" regex from a list of shell globs.

    :param globs: Patterns to be processed by :mod:`fnmatch`.
    :type globs: iterable of :class:`~__builtins__.str`

    :param prefix: If ``True``, then :meth:`~re.RegexObject.match` will
        perform prefix matching rather than exact string matching.
    :type prefix: :class:`~__builtins__.bool`

    :rtype: :class:`re.RegexObject`
    """
    if not globs:
        # An empty globs list should only match empty strings
        return re.compile('^$')
    elif prefix:
        globs = [x + '*' for x in globs]
    return re.compile('|'.join(fnmatch.translate(x) for x in globs))
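# Illustrative sketch (not part of the original module): how the combined
# regex behaves with prefix matching. The example paths are made up.
#
#     >>> ignore_re = multiglob_compile(['*/.git', '*/.svn'], prefix=True)
#     >>> bool(ignore_re.match('/home/user/project/.git/config'))
#     True
#     >>> bool(ignore_re.match('/home/user/project/README'))
#     False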
def hashFile(handle, want_hex=False, limit=None, chunk_size=CHUNK_SIZE):
    """Generate a hash from a potentially long file.
    Digesting will obey :const:`CHUNK_SIZE` to conserve memory.

    :param handle: A file-like object or path to hash from.
    :param want_hex: If ``True``, returned hash will be hex-encoded.
    :type want_hex: :class:`~__builtins__.bool`
    :param limit: Maximum number of bytes to read
        (rounded up to a multiple of ``CHUNK_SIZE``)
    :type limit: :class:`~__builtins__.int`
    :param chunk_size: Size of :meth:`~__builtins__.file.read` operations
        in bytes.
    :type chunk_size: :class:`~__builtins__.int`

    :rtype: :class:`~__builtins__.str`
    :returns: A binary or hex-encoded SHA1 hash.

    .. note:: It is your responsibility to close any file-like objects you
        pass in
    """
    fhash, read = hashlib.sha1(), 0
    if isinstance(handle, basestring):
        handle = file(handle, 'rb')

    if limit:
        chunk_size = min(chunk_size, limit)

    # Chunked digest generation (conserve memory)
    for block in iter(lambda: handle.read(chunk_size), ''):
        fhash.update(block)
        read += chunk_size
        if 0 < limit <= read:
            break

    return want_hex and fhash.hexdigest() or fhash.digest()
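# Illustrative sketch (not part of the original module): the two ways the
# pipeline below uses this function. '/path/to/file' is a placeholder.
#
#     >>> head_digest = hashFile('/path/to/file', limit=HEAD_SIZE)  # header pass
#     >>> full_digest = hashFile('/path/to/file', want_hex=True)    # full-content pass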
class OverWriter(object):  # pylint: disable=too-few-public-methods
    """Output helper for handling overdrawing the previous line cleanly."""
    def __init__(self, fobj):
        self.max_len = 0
        self.fobj = fobj

        self.isatty = False
        if hasattr(self.fobj, 'fileno'):
            self.isatty = os.isatty(self.fobj.fileno())
    def write(self, text, newline=False):
        """Use ``\\r`` to overdraw the current line with the given text.

        This function transparently handles tracking how much overdrawing is
        necessary to erase the previous line when used consistently.

        :param text: The text to be outputted
        :param newline: Whether to start a new line and reset the length
            count.
        :type text: :class:`~__builtins__.str`
        :type newline: :class:`~__builtins__.bool`
        """
        if not self.isatty:
            self.fobj.write('%s\n' % text)
            return

        msg_len = len(text)
        self.max_len = max(self.max_len, msg_len)

        self.fobj.write("\r%-*s" % (self.max_len, text))
        if newline or not self.isatty:
            self.fobj.write('\n')
            self.max_len = 0
out = OverWriter(sys.stderr)

# }}}
# {{{ Processing Pipeline
def getPaths(roots, ignores=None):
    """
    Recursively walk a set of paths and return a listing of contained files.

    :param roots: Relative or absolute paths to files or folders.
    :type roots: :class:`~__builtins__.list` of :class:`~__builtins__.str`

    :param ignores: A list of :py:mod:`fnmatch` globs to avoid walking and
        omit from results
    :type ignores: :class:`~__builtins__.list` of :class:`~__builtins__.str`

    :returns: Absolute paths to only files.
    :rtype: :class:`~__builtins__.list` of :class:`~__builtins__.str`

    .. todo:: Try to optimize the ignores matching. Running a regex on every
       filename is a fairly significant percentage of the time taken according
       to the profiler.
    """
    paths, count, ignores = [], 0, ignores or []

    # Prepare the ignores list for most efficient use
    ignore_re = multiglob_compile(ignores, prefix=False)

    for root in roots:
        # For safety, only use absolute, real paths.
        root = os.path.realpath(root)

        # Handle directly-referenced filenames properly
        # (And override ignores to "do as I mean, not as I say")
        if os.path.isfile(root):
            paths.append(root)
            continue

        for fldr in os.walk(root):
            out.write("Gathering file paths to compare... (%d files examined)"
                      % count)

            # Don't even descend into IGNOREd directories.
            # (Iterate over a copy so in-place removal can't skip entries)
            for subdir in fldr[1][:]:
                dirpath = os.path.join(fldr[0], subdir)
                if ignore_re.match(dirpath):
                    fldr[1].remove(subdir)

            for filename in fldr[2]:
                filepath = os.path.join(fldr[0], filename)
                if ignore_re.match(filepath):
                    continue  # Skip IGNOREd files.

                paths.append(filepath)
                count += 1

    out.write("Found %s files to be compared for duplication." % (len(paths)),
              newline=True)
    return paths
def groupBy(groups_in, classifier, fun_desc='?', keep_uniques=False,
            *args, **kwargs):
    """Subdivide groups of paths according to a function.

    :param groups_in: Grouped sets of paths.
    :type groups_in: :class:`~__builtins__.dict` of iterables

    :param classifier: Function to group a list of paths by some attribute.
    :type classifier: ``function(list, *args, **kwargs) -> str``

    :param fun_desc: Human-readable term for what the classifier operates on.
        (Used in log messages)
    :type fun_desc: :class:`~__builtins__.str`

    :param keep_uniques: If ``False``, discard groups with only one member.
    :type keep_uniques: :class:`~__builtins__.bool`

    :returns: A dict mapping classifier keys to groups of matches.
    :rtype: :class:`~__builtins__.dict`

    :attention: Grouping functions generally use a :class:`~__builtins__.set`
        ``groups`` as extra protection against accidentally counting a given
        file twice. (Complementary to the use of :func:`os.path.realpath` in
        :func:`~fastdupes.getPaths`)

    .. todo:: Find some way to bring back the file-by-file status text
    """
    groups, count, group_count = {}, 0, len(groups_in)
    for pos, paths in enumerate(groups_in.values()):
        out.write("Subdividing group %d of %d by %s... (%d files examined, %d "
                  "in current group)" % (
                      pos + 1, group_count, fun_desc, count, len(paths)
                  ))

        for key, group in classifier(paths, *args, **kwargs).items():
            groups.setdefault(key, set()).update(group)
            count += len(group)

    if not keep_uniques:
        # Return only the groups with more than one file.
        groups = dict([(x, groups[x]) for x in groups if len(groups[x]) > 1])

    out.write("Found %s sets of files with identical %s. (%d files examined)"
              % (len(groups), fun_desc, count), newline=True)
    return groups
def groupify(function):
    """Decorator to convert a function which takes a single value and returns
    a key into one which takes a list of values and returns a dict of
    key-group mappings.

    :param function: A function which takes a value and returns a hash key.
    :type function: ``function(value) -> key``

    :rtype:
        .. parsed-literal::
            function(iterable) -> {key: :class:`~__builtins__.set` ([value, ...]), ...}
    """
    @wraps(function)
    def wrapper(paths, *args, **kwargs):  # pylint: disable=missing-docstring
        groups = {}

        for path in paths:
            key = function(path, *args, **kwargs)
            if key is not None:
                groups.setdefault(key, set()).add(path)

        return groups
    return wrapper
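# Illustrative sketch (not part of the original module): a hypothetical
# classifier decorated with @groupify turns a per-path key function into a
# per-list grouping function; paths whose key is None are dropped.
#
#     >>> @groupify
#     ... def extClassifier(path):
#     ...     return os.path.splitext(path)[1] or None
#     >>> extClassifier(['a.txt', 'b.txt', 'c.jpg'])
#     # -> {'.txt': set(['a.txt', 'b.txt']), '.jpg': set(['c.jpg'])}
#     #    (key order may vary)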
@groupify
def sizeClassifier(path, min_size=DEFAULTS['min_size']):
    """Sort a file into a group based on on-disk size.

    :param paths: See :func:`fastdupes.groupify`

    :param min_size: Files smaller than this size (in bytes) will be ignored.
    :type min_size: :class:`__builtins__.int`

    :returns: See :func:`fastdupes.groupify`

    .. todo:: Rework the calling of :func:`~os.stat` to minimize the number
       of calls. It's a fairly significant percentage of the time taken
       according to the profiler.
    """
    filestat = _stat(path)
    if stat.S_ISLNK(filestat.st_mode):
        return  # Skip symlinks.

    if filestat.st_size < min_size:
        return  # Skip files below the size limit

    return filestat.st_size
@groupify
def hashClassifier(path, limit=HEAD_SIZE):
    """Sort a file into a group based on its SHA1 hash.

    :param paths: See :func:`fastdupes.groupify`

    :param limit: Only this many bytes will be counted in the hash.
        Values which evaluate to ``False`` indicate no limit.
    :type limit: :class:`__builtins__.int`

    :returns: See :func:`fastdupes.groupify`
    """
    return hashFile(path, limit=limit)
def groupByContent(paths):
    """Byte-for-byte comparison on an arbitrary number of files in parallel.

    This operates by opening all files in parallel and comparing
    chunk-by-chunk. This has the following implications:

    - Reads the same total amount of data as hash comparison.
    - Performs a *lot* of disk seeks. (Best suited for SSDs)
    - Vulnerable to file handle exhaustion if used on its own.

    :param paths: List of potentially identical files.
    :type paths: iterable

    :returns: A dict mapping one path to a list of all paths (self included)
        with the same contents.

    .. todo:: Start examining the ``while handles:`` block to figure out how
       to minimize thrashing in situations where read-ahead caching is active.

       Compare savings by read-ahead to savings due to eliminating false
       positives as quickly as possible. This is a 2-variable min/max problem.

    .. todo:: Look into possible solutions for pathological cases of thousands
       of files with the same size and same pre-filter results. (File handle
       exhaustion)
    """
    handles, results = [], []

    # Silently ignore files we don't have permission to read.
    hList = []
    for path in paths:
        try:
            hList.append((path, open(path, 'rb'), ''))
        except IOError:
            pass  # TODO: Verbose-mode output here.
    handles.append(hList)

    while handles:
        # Process more blocks.
        more, done = compareChunks(handles.pop(0))

        # Add the results to the top-level lists.
        handles.extend(more)
        results.extend(done)

    # Keep the same API as the others.
    return dict((x[0], x) for x in results)
def compareChunks(handles, chunk_size=CHUNK_SIZE):
    """Group a list of file handles based on equality of the next chunk of
    data read from them.

    :param handles: A list of open handles for file-like objects with
        potentially-identical contents.
    :param chunk_size: The amount of data to read from each handle every time
        this function is called.

    :returns: Two lists of lists:

        * Lists to be fed back into this function individually
        * Finished groups of duplicate paths. (including unique files as
          single-file lists)

    :rtype: ``(list, list)``

    .. attention:: File handles will be closed when no longer needed
    .. todo:: Discard chunk contents immediately once they're no longer needed
    """
    chunks = [(path, fh, fh.read(chunk_size)) for path, fh, _ in handles]
    more, done = [], []

    # While there are combinations not yet tried...
    while chunks:
        # Compare the first chunk to all successive chunks
        matches, non_matches = [chunks[0]], []
        for chunk in chunks[1:]:
            if matches[0][2] == chunk[2]:
                matches.append(chunk)
            else:
                non_matches.append(chunk)

        # Check for EOF or obviously unique files
        if len(matches) == 1 or matches[0][2] == "":
            for x in matches:
                x[1].close()
            done.append([x[0] for x in matches])
        else:
            more.append(matches)

        chunks = non_matches

    return more, done
# }}}
# {{{ User Interface
def pruneUI(dupeList, mainPos=1, mainLen=1):
    """Display a list of files and prompt for ones to be kept.

    The user may enter ``all`` or one or more numbers separated by spaces
    and/or commas.

    .. note:: It is impossible to accidentally choose to keep none of the
        displayed files.

    :param dupeList: A list of duplicate file paths
    :param mainPos: Used to display "set X of Y"
    :param mainLen: Used to display "set X of Y"
    :type dupeList: :class:`~__builtins__.list`
    :type mainPos: :class:`~__builtins__.int`
    :type mainLen: :class:`~__builtins__.int`

    :returns: A list of files to be deleted.
    :rtype: :class:`~__builtins__.list`
    """
    dupeList = sorted(dupeList)
    print
    for pos, val in enumerate(dupeList):
        print "%d) %s" % (pos + 1, val)
    while True:
        choice = raw_input("[%s/%s] Keepers: " % (mainPos, mainLen)).strip()
        if not choice:
            print ("Please enter a space/comma-separated list of numbers or "
                   "'all'.")
            continue
        elif choice.lower() == 'all':
            return []
        try:
            out = [int(x) - 1 for x in choice.replace(',', ' ').split()]
            return [val for pos, val in enumerate(dupeList) if pos not in out]
        except ValueError:
            print("Invalid choice. Please enter a space/comma-separated list "
                  "of numbers or 'all'.")
# }}}
def find_dupes(paths, exact=False, ignores=None, min_size=0):
    """High-level code to walk a set of paths and find duplicate groups.

    :param exact: Whether to compare file contents by hash or by reading
        chunks in parallel.
    :type exact: :class:`~__builtins__.bool`

    :param paths: See :meth:`~fastdupes.getPaths`
    :param ignores: See :meth:`~fastdupes.getPaths`
    :param min_size: See :meth:`~fastdupes.sizeClassifier`

    :returns: A dict mapping a group key to each group of files with
        identical contents
    :rtype: ``{key: set([path, ...]), ...}``
    """
    groups = {'': getPaths(paths, ignores)}
    groups = groupBy(groups, sizeClassifier, 'sizes', min_size=min_size)

    # This serves one of two purposes depending on run-mode:
    # - Minimize number of files checked by full-content comparison (hash)
    # - Minimize chances of file handle exhaustion and limit seeking (exact)
    groups = groupBy(groups, hashClassifier, 'header hashes', limit=HEAD_SIZE)

    if exact:
        groups = groupBy(groups, groupByContent, fun_desc='contents')
    else:
        groups = groupBy(groups, hashClassifier, fun_desc='hashes')

    return groups
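# Illustrative sketch (not part of the original module): typical programmatic
# use of the pipeline above. '/some/dir' is a placeholder path.
#
#     >>> dupes = find_dupes(['/some/dir'], exact=False,
#     ...                    ignores=DEFAULTS['exclude'],
#     ...                    min_size=DEFAULTS['min_size'])
#     >>> for key, group in dupes.items():
#     ...     print '\n'.join(sorted(group)) + '\n'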
def delete_dupes(groups, prefer_list=None, interactive=True, dry_run=False):
    """Code to handle the :option:`--delete` command-line option.

    :param groups: A dict whose values are groups of paths.
    :type groups: :class:`~__builtins__.dict`

    :param prefer_list: A whitelist to be compiled by
        :func:`~fastdupes.multiglob_compile` and used to skip some prompts.

    :param interactive: If ``False``, assume the user wants to keep all copies
        when a prompt would otherwise be displayed.
    :type interactive: :class:`~__builtins__.bool`

    :param dry_run: If ``True``, only pretend to delete files.
    :type dry_run: :class:`~__builtins__.bool`

    .. todo:: Add a secondary check for symlinks for safety.
    """
    prefer_list = prefer_list or []
    prefer_re = multiglob_compile(prefer_list, prefix=True)

    for pos, group in enumerate(groups.values()):
        preferred = [x for x in group if prefer_re.match(x)]
        pruneList = [x for x in group if x not in preferred]
        if not preferred:
            if interactive:
                pruneList = pruneUI(group, pos + 1, len(groups))
                preferred = [x for x in group if x not in pruneList]
            else:
                preferred, pruneList = pruneList, []

        assert preferred  # Safety check
        for path in pruneList:
            print "Removing %s" % path
            if not dry_run:
                os.remove(path)
def main():
    """The main entry point, compatible with setuptools."""
    # pylint: disable=bad-continuation
    from optparse import OptionParser, OptionGroup
    parser = OptionParser(usage="%prog [options] <folder path> ...",
            version="%s v%s" % (__appname__, __version__))
    parser.add_option('-D', '--defaults', action="store_true", dest="defaults",
        default=False, help="Display the default values for options which take"
        " arguments and then exit.")
    parser.add_option('-E', '--exact', action="store_true", dest="exact",
        default=False, help="There is a vanishingly small chance of false"
        " positives when comparing files using sizes and hashes. This option"
        " enables exact comparison. However, exact comparison requires a lot"
        " of disk seeks, so, on traditional moving-platter media, this trades"
        " a LOT of performance for a very tiny amount of safety most people"
        " don't need.")
    # XXX: Should I add --verbose and/or --quiet?

    filter_group = OptionGroup(parser, "Input Filtering")
    filter_group.add_option('-e', '--exclude', action="append", dest="exclude",
        metavar="PAT", help="Specify a globbing pattern to be"
        " added to the internal blacklist. This option can be used multiple"
        " times. Provide a dash (-) as your first exclude to override the"
        " pre-programmed defaults.")
    filter_group.add_option('--min-size', action="store", type="int",
        dest="min_size", metavar="X", help="Specify a non-default minimum size"
        ". Files below this size (default: %default bytes) will be ignored.")
    parser.add_option_group(filter_group)

    behaviour_group = OptionGroup(parser, "Output Behaviour")
    behaviour_group.add_option('-d', '--delete', action="store_true",
        dest="delete", help="Prompt the user for files to preserve and delete "
        "all others.")
    behaviour_group.add_option('-n', '--dry-run', action="store_true",
        dest="dry_run", metavar="PREFIX", help="Don't actually delete any "
        "files. Just list what actions would be performed. (Good for testing "
        "values for --prefer)")
    behaviour_group.add_option('--prefer', action="append", dest="prefer",
        metavar="PATH", default=[], help="Append a globbing pattern which "
        "--delete should automatically prefer (rather than prompting) when it "
        "occurs in a list of duplicates.")
    behaviour_group.add_option('--noninteractive', action="store_true",
        dest="noninteractive", help="When using --delete, automatically assume"
        " 'all' for any groups with no --prefer matches rather than prompting")
    parser.add_option_group(behaviour_group)
    parser.set_defaults(**DEFAULTS)  # pylint: disable=W0142

    opts, args = parser.parse_args()

    if '-' in opts.exclude:
        opts.exclude = opts.exclude[opts.exclude.index('-') + 1:]
    opts.exclude = [x.rstrip(os.sep + (os.altsep or '')) for x in opts.exclude]
    # This line is required to make it match directories

    if opts.defaults:
        print_defaults()
        sys.exit()

    groups = find_dupes(args, opts.exact, opts.exclude, opts.min_size)

    if opts.delete:
        delete_dupes(groups, opts.prefer, not opts.noninteractive,
                     opts.dry_run)
    else:
        for dupeSet in groups.values():
            print '\n'.join(dupeSet) + '\n'
if __name__ == '__main__':
    main()

# vim: set sw=4 sts=4 expandtab :