In an attempt to clean out my picture collection, I made a script to find all duplicate files:
import datetime
import hashlib
import json
import typing
from collections import defaultdict
from pathlib import Path
def filesize(path: Path) -> int:
"""returns the filesize"""
return path.stat().st_size
def hash_first(
path: Path, hash_function=hashlib.md5, blocksize: int = 65536
) -> str:
"""returns the hash of the first block of the file"""
with path.open("rb") as afile:
return hash_function(afile.read(blocksize)).hexdigest()
def hash_all(path: Path, hash_function=hashlib.md5, blocksize=65536) -> str:
"""returns the hash of the whole file"""
with path.open("rb") as afile:
hasher = hash_function()
buf = afile.read(blocksize)
while buf:
hasher.update(buf)
buf = afile.read(blocksize)
return hasher.hexdigest()
def compare_files(
comparison: typing.Callable[[Path], typing.Hashable], duplicates: dict
) -> dict:
"""
Subdivides each group along `comparison`
discards subgroups with less than 2 items
"""
results: defaultdict = defaultdict(list)
for old_hash, files in duplicates.items():
for file in files:
results[(*old_hash, comparison(file))].append(file)
return {
filehash: files
for filehash, files in results.items()
if len(files) > 1
}
def find_duplicates(
rootdir: Path,
comparisons: typing.Iterable[
typing.Callable[[Path], typing.Hashable]
] = (),
):
"""
Finds duplicate files in `rootdir` and its subdirectories
Returns a list with subgroups of identical files
    Groups the files along each of the comparisons in turn.
Subgroups with less than 2 items are discarded.
Each of the `comparisons` should be a callable that accepts a
`pathlib.Path` as only argument, and returns a hashable value
if `comparisons` is not defined, compares along:
1. file size
2. MD5 hash of the first block (65536 bytes)
3. MD5 hash of the whole file
"""
if not comparisons:
comparisons = filesize, hash_first, hash_all
duplicates = {(): filter(Path.is_file, rootdir.glob("**/*"))}
for comparison in comparisons:
duplicates = compare_files(comparison, duplicates)
return [tuple(map(str, files)) for files in duplicates.values()]
if __name__ == "__main__":
pic_dir = r"./testdata"
results = find_duplicates(rootdir=Path(pic_dir))
# print(json.dumps(results, indent=2))
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
output_file = Path("./output") / f"{timestamp}.json"
with output_file.open("w") as fh:
json.dump(results, fh, indent=2)
Any remarks on this code? The code is formatted with black (line length 79), with mypy, pylint, and pylama as linters.
If I try to specify the dict argument and return value of compare_files as typing.Dict[typing.Tuple[typing.Hashable, ...], typing.Iterable[Path]], mypy complains: Incompatible types in assignment (expression has type "Dict[Tuple[Hashable, ...], Iterable[Path]]", variable has type "Dict[Tuple[], Iterator[Path]]").
3 Answers
- Instead of rootdir.glob("**/*") you can write rootdir.rglob("*"). rglob adds "**/" automatically before the given pattern.
- You specify the return type of the comparison functions as Hashable. I'm not really sure why. Are you thinking that you will have more comparison functions in the future that would return something other than int or str? I think you could limit it to FileHash = Union[int, str] for now. I doubt that you will have functions that would return such hashable objects as, for example, namedtuples or frozensets.
- Maybe I don't understand something, but I don't really see the point why the compare_files function exists. I would expect it to take as an argument an iterable of paths to the files, but instead, it takes them packed in a dictionary, which doesn't make much sense to me. By moving the logic out of it into the outer find_duplicates function you will also avoid the problem with mypy complaining about incompatible types. It could look, for example, like this:

FileHash = Union[int, str]
Hashes = Tuple[FileHash, ...]


def find_duplicates(rootdir: Path,
                    comparisons: Iterable[Callable[[Path], FileHash]] = ()
                    ) -> List[Tuple[str, ...]]:
    if not comparisons:
        comparisons = filesize, hash_first, hash_all
    files = filter(Path.is_file, rootdir.rglob("*"))
    files_per_hashes: Dict[Hashes, List[Path]] = defaultdict(list)
    for file in files:
        key = tuple(hasher(file) for hasher in comparisons)
        files_per_hashes[key].append(file)
    duplicates = (files for files in files_per_hashes.values()
                  if len(files) > 1)
    return [tuple(map(str, files)) for files in duplicates]
You are missing type hints for the hash_functions. Unfortunately, I'm not sure how to solve the problem with their return types. If you are on Python 3.7, you could try something like this (idea taken from https://github.com/python/typeshed/issues/2928):

from __future__ import annotations

Hasher = Union[Callable[[ByteString], hashlib._hashlib.HASH],
               Callable[[], hashlib._hashlib.HASH]]

...

def hash_first(path: Path,
               hash_function: Hasher = hashlib.md5,
               blocksize: int = 65536) -> str:
    ...

but mypy doesn't like it. It says:

error: Name 'hashlib._hashlib.HASH' is not defined

Maybe you could dig a bit more in this direction.
I also think that it would be better to make comparisons an Optional argument of the find_duplicates function with the default value set to None, because right now you use a tuple as an immutable version of a list (the data you keep in that tuple is homogeneous, but tuples are meant to keep heterogeneous data, and the number of elements in a tuple is usually fixed). As Guido van Rossum said: "Tuples are *not* read-only lists."
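For what it's worth, a minimal sketch of that signature, reusing the FileHash alias suggested above and assuming the usual typing imports (the body is elided):

def find_duplicates(
    rootdir: Path,
    comparisons: Optional[Iterable[Callable[[Path], FileHash]]] = None,
) -> List[Tuple[str, ...]]:
    if comparisons is None:
        # A list rather than a tuple: a homogeneous, variable-length sequence.
        comparisons = [filesize, hash_first, hash_all]
    ...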
Some suggestions:
- I would usually inline single line functions unless what they do is hard to understand from just reading the command. filesize might be one such candidate.
- I believe that not specifying the block size can be faster, by allowing Python to use whatever block size it thinks is appropriate. This would need some actual testing though.
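A rough way to test that claim could look like the following sketch (testdata/large.bin is a placeholder; point it at any sufficiently large local file):

import hashlib
import io
import timeit
from pathlib import Path

BIG_FILE = Path("testdata/large.bin")  # placeholder: any large file


def digest(blocksize: int) -> str:
    """Hash BIG_FILE in chunks of `blocksize` bytes."""
    hasher = hashlib.md5()
    with BIG_FILE.open("rb") as afile:
        for chunk in iter(lambda: afile.read(blocksize), b""):
            hasher.update(chunk)
    return hasher.hexdigest()


for size in (io.DEFAULT_BUFFER_SIZE, 65536, 1 << 20):
    seconds = timeit.timeit(lambda: digest(size), number=10)
    print(f"blocksize={size}: {seconds:.3f}s for 10 runs")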
Your function arguments are only partially typed. You might want to use a stricter mypy configuration, such as the following, in your setup.cfg:

[mypy]
check_untyped_defs = true
disallow_untyped_defs = true
ignore_missing_imports = true
no_implicit_optional = true
warn_redundant_casts = true
warn_return_any = true
warn_unused_ignores = true
- MD5 was optimized for cryptographic use, not for speed. Another Q&A explores options for fast non-cryptographic hashing.
- I would avoid converting digests to strings - it might be slightly easier for debugging purposes, but data type conversions are in general very costly.
- Optional parameters mean you have to test at least two things rather than one. Your code only ever calls the hash_* functions without optional parameters, so you might as well inline them.
- On a related note, static values which are used in multiple places are perfect for pulling out as constants. The hashing function and block size would be obvious candidates for this.
- Mutable default parameter values are an accident waiting to happen. The default pattern for this is to use a default of None and to assign a default if foo is None.
- r-strings like r"./testdata" are meant for regular expressions. Since pic_dir is not used as such it should probably be a regular string.
- duplicates is at first a list of potential duplicates and later is trimmed in stages. This makes the code hard to follow.
- Things like the directory/directories to include should be arguments. argparse can deal with this easily (see the sketch below).
- To make this more scriptable I would print the result on standard output. It's trivial to then redirect it to a file.
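A minimal sketch of those last two points (assuming the find_duplicates function from the question is in scope; the argument name is my own):

import argparse
import json
import sys
from pathlib import Path


def main() -> None:
    parser = argparse.ArgumentParser(description="Find duplicate files.")
    parser.add_argument("rootdir", type=Path, help="directory to scan for duplicates")
    args = parser.parse_args()
    # Print to standard output; redirect to a file when needed.
    json.dump(find_duplicates(rootdir=args.rootdir), sys.stdout, indent=2)


if __name__ == "__main__":
    main()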
I'm writing my own answer here because I incorporated some of the remarks of both other answers and ignored others. In this answer I explain why I made certain design choices.
@Georgy:
- Good tip.
- I don't expect much other than str and int, but bytes is a possibility too (hash.digest).
- The use of compare_files is to also whittle the subgroups down (if len(files) > 1). I stagger the comparisons from fast to more expensive. There might be a lot of files with the same size, but if they don't have the same checksum in their first few bytes, there is no use in comparing the complete file. With an intermediate step that logged how many files got compared at each stage, I saw the numbers dwindle step by step (see the sketch after this list).
- The typeshed has a class signature for "hashlib._Hash", so I adapted this to a typing.Protocol.
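That intermediate logging could be a small helper along these lines (log_stage is a hypothetical name; it would be called after each compare_files pass):

import logging
import typing

logging.basicConfig(level=logging.INFO, format="%(message)s")


def log_stage(
    stage: str, groups: typing.Mapping[typing.Any, typing.Sized]
) -> None:
    """Log how many candidate groups and files survive a comparison stage."""
    n_files = sum(len(files) for files in groups.values())
    logging.info("%s: %d groups, %d candidate files", stage, len(groups), n_files)


# Inside find_duplicates (sketch):
#     for comparison in comparisons:
#         potential_duplicates = compare_files(comparison, potential_duplicates)
#         log_stage(comparison.__name__, potential_duplicates)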
@l0b0
- I didn't inline them because I want to refer to them as named functions. I could make lambdas out of them, but that conveys the intent less clearly than a named function such as filesize.
- I suggest a blocksize here to prevent Python from reading complete 8 GB files into memory to take a digest.
- Nice tip on the stricter mypy. It's tedious to do this for an existing code base, but it has shown me some bugs I wouldn't have spotted otherwise, so I'm gradually introducing this elsewhere as well.
- I used md5 because I could easily verify the MD5 hash against an external tool to test my implementation. Later I moved to CRC32.
- In my first versions I exported the duplicates in between iterations, and then a string representation was handy, also to compare it to the output of external tools that compute the hash.
- True.
- good tip
- Where did I use a mutable default parameter?
- I use r-strings for Windows paths as well. Then I don't have to escape all the \ characters. Recently I found out VS Code makes a distinction in how it represents r and R strings, so I switched to R. (Apparently black doesn't like them, and changes them back to r"".)
- That is the tricky part I have not been able to convey cleanly, but it is there to whittle down the more expensive operations. I renamed it to potential_duplicates.
- And 12.: Correct, and this might be a good opportunity for a next version.
Other things
- I include pydocstyle in my routine, so the docstring requirements have become a bit more strict. I haven't found the patience to annotate every parameter. I hope the variable names are clear enough.
- I included a min_filesize argument to find_duplicates to be able to focus on the larger duplicates when trying to reduce disk usage.
crc32hash.py
"""CRC32 adapted to the `hashlib._Hash` protocol."""
from __future__ import annotations
import typing
import zlib
class Hash(typing.Protocol):
"""Protocol for a hash algorithm."""
digest_size: int
block_size: int
name: str
def __init__(
self, data: typing.Union[bytes, bytearray, memoryview] = ...
) -> None:
"""Protocol for a hash algorithm."""
...
def copy(self) -> Hash:
"""Return a copy of the hash object."""
...
def digest(self) -> bytes:
"""Return the digest of the data passed to the update() method."""
...
def hexdigest(self) -> str:
"""Like digest() except the digest is returned as a string object.
Like digest() except the digest is returned as a string object
of double length, containing only hexadecimal digits.
"""
...
def update(self, arg: typing.Union[bytes, bytearray, memoryview]) -> None:
"""Update the hash object with the bytes-like object."""
...
class CRC32(Hash):
"""Adapts zlib.crc32" for the hashlib protocol."""
# docstring borrowed from https://docs.python.org/3/library/hashlib.html
digest_size = -1
block_size = -1
name = "crc32"
def __init__(self, data: typing.Union[bytes, bytearray, memoryview] = b""):
"""Adapts zlib.crc32" for the hashlib protocol."""
self._checksum = zlib.crc32(bytes(data))
def copy(self) -> CRC32:
"""Return a copy of the hash object."""
duplicate = CRC32()
duplicate._checksum = self._checksum
return duplicate
def digest(self) -> bytes:
"""Return the digest of the data passed to the update() method."""
        # A CRC-32 checksum is 32 bits, i.e. 4 bytes wide.
        return self._checksum.to_bytes(4, byteorder="big")
def hexdigest(self) -> str:
"""Like digest() except the digest is returned as a string object.
Like digest() except the digest is returned as a string object
of double length, containing only hexadecimal digits.
"""
return f"{self._checksum:X}"
def update(self, arg: typing.Union[bytes, bytearray, memoryview]) -> None:
"""Update the hash object with the bytes-like object."""
self._checksum = zlib.crc32(arg, self._checksum)
if __name__ == "__main__":
test_files = {"CHANGELOG.rst": "C171C371"}
for filename, expected in test_files.items():
with open(filename, "rb") as f:
checksum = CRC32(f.read()).hexdigest()
print(f"{filename}: expected: {expected}; calculated: {checksum}")
find_duplicates.py
"""Code to find duplicate files."""
from __future__ import annotations
import datetime
import hashlib
import json
import typing
from collections import defaultdict
from pathlib import Path
from crc32hash import CRC32, Hash
_Duplicates = typing.Mapping[
typing.Tuple[typing.Hashable, ...], typing.Iterable[Path]
]
DEFAULT_BLOCKSIZE = 65536
def filesize(path: Path) -> int:
"""Return the filesize."""
return path.stat().st_size
def hash_first(
path: Path,
hash_function: typing.Callable[[bytes], Hash] = hashlib.sha256,
blocksize: int = DEFAULT_BLOCKSIZE,
) -> str:
"""Return the hash of the first block of the file."""
with path.open("rb") as afile:
return hash_function(afile.read(blocksize)).hexdigest()
def hash_all(
path: Path, hash_function: typing.Callable[[bytes], Hash] = hashlib.sha256,
) -> str:
"""Return the hash of the whole file."""
with path.open("rb") as afile:
hasher = hash_function(b"")
buf = afile.read(DEFAULT_BLOCKSIZE)
while buf:
hasher.update(buf)
buf = afile.read(DEFAULT_BLOCKSIZE)
return hasher.hexdigest()
def crc_first(path: Path, blocksize: int = DEFAULT_BLOCKSIZE,) -> str:
"""Return the crc32 hash of the first block of the file."""
return hash_first(path=path, hash_function=CRC32, blocksize=blocksize)
def crc32_all(path: Path) -> str:
"""Return the crc32 hash of the whole file."""
return hash_all(path=path, hash_function=CRC32)
def compare_files(
comparison: typing.Callable[[Path], typing.Hashable],
potential_duplicates: _Duplicates,
) -> _Duplicates:
"""Subdivide each group along `comparison`.
Discards subgroups with less than 2 items
"""
results: typing.DefaultDict[
typing.Tuple[typing.Hashable, ...], typing.List[Path]
] = defaultdict(list)
for old_hash, files in potential_duplicates.items():
for file in files:
results[(*old_hash, comparison(file))].append(file)
return {
filehash: files
for filehash, files in results.items()
if len(files) > 1
}
def find_duplicates(
rootdir: Path,
comparisons: typing.Optional[
typing.Iterable[typing.Callable[[Path], typing.Hashable]]
] = None,
min_filesize: typing.Optional[int] = None,
) -> typing.List[typing.Tuple[str, ...]]:
"""Find duplicate files in `rootdir` and its subdirectories.
Returns a list with subgroups of identical files
    Groups the files along each of the comparisons in turn.
Subgroups with less than 2 items are discarded.
Each of the `comparisons` should be a callable that accepts a
`pathlib.Path` as only argument, and returns a hashable value
if `comparisons` is not defined, compares along:
1. file size
2. CRC32 hash of the first block (65536 bytes)
3. CRC32 hash of the whole file
"""
if comparisons is None:
comparisons = [filesize, crc_first, crc32_all]
potential_duplicates: _Duplicates = {
(): (file for file in rootdir.rglob("*") if file.is_file())
}
if min_filesize is not None:
potential_duplicates = {
key: (file for file in files if filesize(file) > min_filesize)
for key, files in potential_duplicates.items()
}
for comparison in comparisons:
potential_duplicates = compare_files(comparison, potential_duplicates)
return [tuple(map(str, files)) for files in potential_duplicates.values()]
if __name__ == "__main__":
duplicate_dir = Path(r".")
results = find_duplicates(rootdir=duplicate_dir, min_filesize=int(1e8))
# print(json.dumps(results, indent=2))
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
output_file = Path("./output") / f"{timestamp}.json"
with output_file.open("w") as fh:
json.dump(results, fh, indent=2)