This is my first Python project in file management. It's a finished and passed exercise from JetBrains (Duplicate File Handler).
The main job is reading a full path to a folder from an argument, listing the files of the folder, grouping them based on same size and md5 hash (md5 is required for passing the test) and then asking for deletion based on input numbers (e.g. 1 3 6) separated with spaces (each input number refers to the full path of the file that the user wants to delete).
Is there any way to make the code more clever and avoid nested for loops to make it faster? Especially in duplicates_check
function and in delete_duplicates
function.
Also I was confused a bit in the grouping part of the duplicates_check
function, where we are asked to print in groups of the same byte size, for example:
5550640 bytes
Hash: 909ba4ad2bda46b10aac3c5b7f01abd5
1. root_folder/poker_face.mp3
2. root_folder/poker_face_copy.mp3
3422208 bytes
Hash: a7f5f35426b927411fc9231b56382173
3. root_folder/audio/classic/unknown.mp3
4. root_folder/masterpiece/rick_astley_never_gonna_give_you_up.mp3
Hash: b6d767d2f8ed5d21a44b0e5886680cb9
5. root_folder/masterpiece/the_magic_flute_queen_of_the_night_aria.mp3
6. root_folder/masterpiece/the_magic_flute_queen_of_the_night_aria_copy.mp3
55540 bytes
Hash: 6b92a4ad2bda46b10aac3c5b7f013821
7. root_folder/poker_face.mp3
8. root_folder/poker_face_copy.mp3
You can see that if the bytes are the same and only the hash changes, the line with bytes doesn't get printed again so I created a new list to append the bytes that already has been iterated to pass the test.
I couldn't think of another way to check for example if in the duplicates_list variable that is a dictionary of
duplicates_list = {(hash1, bytes1): [path of files that those keys are referring to, ...],
(hash2, bytes1) : [path of files that those keys are referring to, ...]}
If bytes1
are the same for both keys don't print them on separate lines (as different keys) and instead print them as the example.
Right now everything works great I just want to see different more clever way to achieve this.
# write your code here
import argparse
import os
import hashlib
# import sys
# args = sys.argv
# if len(args) < 2:
# print('Directory is not specified')
# Parse the target folder path from the command line. The argument is
# optional here (nargs="?"); file_listing() reports the missing-directory
# case instead of argparse erroring out.
parser = argparse.ArgumentParser(description="You must enter a folder to list files in dir and subdir")
parser.add_argument("folder", nargs="?")
args = parser.parse_args()
choice = args.folder
# Module-level flag flipped by file_filter_by_extension() when the user
# narrows the listing to one extension; main() branches on it.
FILTER_BY_EXTENSION = False
# Function to get files in directory and create list with dictionaries (dic per file) with file attributes
# (filename, extension, bytes, full path and md5_hash)
# Function to get files in directory and create list with dictionaries (dict per file)
# with file attributes (filename, extension, bytes, full path and md5_hash)
def file_listing(directory=None):
    """Collect a record for every file under *directory*.

    *directory* defaults to the module-level ``choice`` parsed from the
    command line, keeping the original no-argument call working.
    Returns a list of dicts with keys: filename, extension, bytes,
    full_path and md5_hash (hex digest). The MD5 is computed by streaming
    the file in chunks so large files need not fit in memory.
    """
    if directory is None:
        directory = choice  # fall back to the CLI argument
    files_values = []
    if directory is None:
        print("Directory is not specified")
        return files_values
    for root, dirs, files in os.walk(directory):
        for name in files:
            full_path = os.path.join(root, name)  # build the path once, not three times
            hash_md5 = hashlib.md5()
            with open(full_path, "rb") as f:
                for chunk in iter(lambda: f.read(1024), b""):
                    hash_md5.update(chunk)
            files_values.append({
                "filename": name,
                "extension": name[name.rfind(".") + 1:],
                "bytes": os.path.getsize(full_path),
                "full_path": full_path,
                "md5_hash": hash_md5.hexdigest(),
            })
    return files_values
def file_filter_by_extension(files_values):
    """Optionally narrow *files_values* to one file extension.

    Prompts the user for an extension. An empty answer means "no filter"
    and False is returned; otherwise the matching records are returned
    and the module-level FILTER_BY_EXTENSION flag is set.
    """
    global FILTER_BY_EXTENSION
    wanted = input("Enter file format:\n")
    if not wanted:
        return False
    matching = [record for record in files_values if record["extension"] == wanted]
    print()
    FILTER_BY_EXTENSION = True
    return matching
# Grouping files by size and return a sorted list with pairs of (bytes, [{full path, md5_hash]}).
# If bytes are the same the pairs are (bytes, [{full path, md5_hash}...{full path, md5_hash}]
# Grouping files by size and return a sorted list with pairs of (bytes, [{full path, md5_hash}]).
# If bytes are the same the pairs are (bytes, [{full path, md5_hash}...{full path, md5_hash}])
def bytes_filter(files_values):
    """Group *files_values* by size, print the groups, return them sorted.

    The user picks descending ("1") or ascending ("2") order; invalid
    answers are re-prompted. Returns a list of [size, records] pairs
    where each record is a {"full_path", "md5_hash"} dict.
    """
    sorting_type = input('''Size sorting options:
1. Descending
2. Ascending\n''')
    print()
    # size -> list of {"full_path", "md5_hash"} records of that size
    bytes_dictionary = {}
    for record in files_values:
        bytes_dictionary.setdefault(record["bytes"], []).append(
            {"full_path": record["full_path"], "md5_hash": record["md5_hash"]})
    while sorting_type not in {"1", "2"}:
        print("Wrong option\nEnter a sorting option:\n")
        sorting_type = input()
    sorted_list = []
    # One loop handles both orders: "1" is descending, "2" ascending.
    # (The original duplicated this whole loop per option.)
    for size, records in sorted(bytes_dictionary.items(), reverse=sorting_type == "1"):
        sorted_list.append([size, records])
        print(f'{size} bytes')
        for record in records:
            print(record["full_path"])
        print()
    return sorted_list
# Checking for duplicates - Constructing a duplicates dicts and a list to print by group if bytes are the same.
# Printed values: Bytes \n Hash \n full path of files with line numbering.
# If the bytes are the same in the next different file pairs we dont print bytes again instead
# we print only hash and full path else we print again a completely new line Bytes \n Hash \n full path
# Checking for duplicates - files that share both size and MD5 hash.
# Printed values: Bytes \n Hash \n full path of files with line numbering.
# If the bytes are the same in the next group we don't print bytes again,
# only the hash and the full paths.
def duplicates_check(files_values):
    """Report duplicate files from the grouped list built by bytes_filter().

    *files_values* is a list of [size, records] pairs. Returns False if
    the user declines the check, otherwise a dict mapping the printed
    line number (as a string) to the duplicate file's full path.
    """
    duplicates_ask = input('Check for duplicates?\n').lower()
    while duplicates_ask not in {"yes", "no"}:
        print("Wrong option\n")
        # .lower() here too — the original skipped it on re-prompts,
        # so "YES" after a wrong answer looped forever.
        duplicates_ask = input().lower()
    if duplicates_ask == "no":
        return False
    # (md5_hash, size) -> list of full paths. Insertion order follows the
    # already size-sorted input, so printing preserves the chosen order.
    duplicates_list = {}
    for size, records in files_values:
        for record in records:
            duplicates_list.setdefault((record["md5_hash"], size), []).append(record["full_path"])
    final_duplicate_list = {}
    printed_sizes = []  # sizes whose "<n> bytes" header was already printed
    line_number = 1
    for (md5_hash, size), paths in duplicates_list.items():
        if len(paths) <= 1:
            continue  # unique file, not a duplicate
        if size not in printed_sizes:
            printed_sizes.append(size)
            print()
            print(f'{size} bytes\nHash: {md5_hash}')
        else:
            # Same size as the previous group: suppress the bytes header.
            print(f'Hash: {md5_hash}')
        for path_value in paths:
            print(f'{line_number}. {path_value}')
            final_duplicate_list[str(line_number)] = path_value
            line_number += 1
    return final_duplicate_list
def delete_duplicates(list_duplicates, working_directory):
    """Interactively delete duplicates chosen by their printed line numbers.

    *list_duplicates* maps line-number strings to file paths (as returned
    by duplicates_check()); *working_directory* is the full record list
    from file_listing(), used to look up each file's size.
    Returns False when there is nothing to delete or the user declines.
    """
    if not list_duplicates:
        # duplicates_check() returned False or an empty dict. The original
        # code then either crashed (set().issubset(False) -> TypeError) or
        # looped forever on "Wrong format"; bail out instead.
        return False
    print()
    duplicates_delete = input('Delete files?\n').lower()
    while duplicates_delete not in {"yes", "no"}:
        print("\nWrong option")
        # .lower() so "YES"/"No" are accepted on re-prompts as well
        duplicates_delete = input().lower()
    if duplicates_delete == "no":
        return False
    duplicates_index = input("\nEnter file numbers to delete:\n").strip().split(" ")
    while not set(duplicates_index).issubset(list_duplicates):
        print("\nWrong format")
        duplicates_index = input().strip().split(" ")
    # Path -> size lookup replaces the original O(choices * files) scan.
    size_by_path = {record["full_path"]: record["bytes"] for record in working_directory}
    total_bytes_removed = 0
    for index in duplicates_index:
        path = list_duplicates[index]
        total_bytes_removed += size_by_path.get(path, 0)
        os.remove(path)
    print(f'\nTotal freed up space: {total_bytes_removed} bytes')
def main():
    """Drive the workflow: list, optionally filter, group, report, delete."""
    directory = file_listing()
    extension_filter = file_filter_by_extension(directory)
    # Both original branches were identical apart from the first argument:
    # use the filtered records only when a filter was actually applied.
    working_set = extension_filter if FILTER_BY_EXTENSION else directory
    bytes_filtering = bytes_filter(working_set)
    duplicates = duplicates_check(bytes_filtering)
    delete_duplicates(duplicates, directory)
# Run only when executed as a script, not on import. The original compared
# __name__ to main()'s return value, which both ran main() unconditionally
# and never evaluated to True.
if __name__ == '__main__':
    main()
1 Answer 1
Delete commented-out code blocks like
# import sys
# args = sys.argv
# if len(args) < 2:
# print('Directory is not specified')
This is the kind of thing that git branches should be used for.
Move this code:
parser = argparse.ArgumentParser(description="You must enter a folder to list files in dir and subdir")
parser.add_argument("folder", nargs="?")
args = parser.parse_args()
choice = args.folder
into a function. choice
is not a good variable name in this case.
FILTER_BY_EXTENSION
is not a constant, so don't capitalise it and don't leave it global; pass it around in function parameters.
choice
should never be None
. You should not use ?
in your argument parsing; you should force the user to pass the argument.
Use pathlib.Path
instead of os.path
where alternatives exist.
For performance you should be using a much larger block size such as 16 MiB.
I forgot (or never knew) the sentinel-form of the iter()
built-in, so thank you for teaching me!
Do not represent file records as dictionaries. Use a class. Since the class is immutable and needs to be sortable, NamedTuple
makes this easy. Many of the fields in your file record should not be written out at all, since Path
makes these easy to be calculated inline.
Whenever you .append()
-and-return
as you do in file_listing
, consider using an iterator instead.
file_filter_by_extension
should not return False
-or-a-list. Instead, unconditionally return an iterable, and the size of the iterable should be conditional on the user's input.
Expressions like in ["1", "2"]:
should use a set literal {}
instead.
Use defaultdict(list)
to simplify your duplicate aggregation logic.
Factor out a function to ask a yes-no question with a validation loop.
In your input validation loops, Wrong option
is not a very good way to describe what happened, since there is not one right answer: instead the user input an invalid option.
Consider yielding the number of bytes deleted from a loop and applying sum()
afterward.
This is buggy:
if __name__ == main():
main()
since it will unconditionally run main
even if an importer doesn't want to; and will always evaluate to False
. Instead, you need to compare to the string __main__
.
To make your selection friendlier for the user, rather than 1
/2
for your order options, why not ask for a
/d
?
Similarly, rather than gathering deletion indices all in one shot via your
input("\nEnter file numbers to delete:\n")
you should instead stagger this choice per file, and also ask for the single file to keep, not ask for all the files to delete.
You call hexdigest()
too early, because you use a string for business logic when that string should only be used for presentation. Just call digest()
instead, which uses a more compact bytes
representation. This will be marginally faster for internal comparison, and you can in turn call .hex()
on that to get a presentable string.
Suggested
import argparse
import os
import hashlib
from collections import defaultdict
from pathlib import Path
from typing import Iterator, Iterable, NamedTuple
# Hash files in 4 MiB chunks: large enough for good throughput, small
# enough to bound memory use on huge files.
CHUNK_SIZE = 4 * 1024 * 1024
class HashedFile(NamedTuple):
    """Immutable record of one file: size, path and raw MD5 digest.

    Putting ``size`` first makes the tuples sort naturally by size.
    """
    size: int
    path: Path
    md5_hash: bytes

    def __str__(self) -> str:
        """A file displays as its path."""
        return str(self.path)

    @classmethod
    def load(cls, root: Path, name: str) -> 'HashedFile':
        """Hash the file *name* under *root* and build its record."""
        target = root / name
        digest = hashlib.md5()
        with target.open('rb') as handle:
            # An empty read means EOF, which is falsy — same stop
            # condition as the sentinel form of iter().
            while chunk := handle.read(CHUNK_SIZE):
                digest.update(chunk)
        return cls(target.stat().st_size, target, digest.digest())
def file_listing(directory: str) -> Iterator[HashedFile]:
    """Yield a HashedFile for every file under *directory*, recursively."""
    for dirpath, _dirnames, filenames in os.walk(directory):
        base = Path(dirpath)
        for filename in filenames:
            yield HashedFile.load(base, filename)
def filter_by_extension(files: Iterable[HashedFile]) -> Iterator[HashedFile]:
    """Yield only the files whose extension matches the user's choice.

    An empty answer selects everything.
    """
    extension = input('Enter file extension to select, or press enter for all: ')
    if not extension:
        yield from files
        return
    wanted_suffix = '.' + extension
    for candidate in files:
        if candidate.path.suffix == wanted_suffix:
            yield candidate
def print_by_size(files: Iterable[HashedFile]) -> None:
    """List *files* sorted by size, in the order the user picks (a/d)."""
    print('Size sorting options:'
          '\n d. Descending'
          '\n a. Ascending')
    order = input('Enter a sorting option: ')
    while order not in {'a', 'd'}:
        print('Invalid option')
        order = input('Enter a sorting option: ')
    print()
    descending = order == 'd'
    for entry in sorted(files, reverse=descending):
        print(f'{entry.size:9} bytes: {entry}')
def ask_yn(prompt: str) -> bool:
    """Ask *prompt* until the answer's first letter is y or n; return True for y."""
    while True:
        reply = input(f'{prompt} (y|n)? ')[:1].lower()
        if reply == 'y':
            return True
        if reply == 'n':
            return False
        print('Invalid option')
def find_duplicates(files: Iterable[HashedFile]) -> Iterator[list[HashedFile]]:
    """Group *files* by (size, MD5 digest); print and yield each group of 2+.

    Members of each printed group are numbered from 1, matching the
    numbers delete_duplicates() later asks about.
    """
    by_key = defaultdict(list)
    for entry in files:
        by_key[entry.size, entry.md5_hash].append(entry)
    for (size, md5_hash), group in by_key.items():
        if len(group) <= 1:
            continue  # unique file — not a duplicate
        print(f'Size: {size} bytes, MD5: {md5_hash.hex()}')
        for index, entry in enumerate(group, 1):
            print(f'{index:4}. {entry}')
        yield group
def delete_duplicates(duplicate_groups: Iterable[list[HashedFile]]) -> Iterator[int]:
    """For each group, keep one user-chosen file, delete the rest.

    Yields the size of every deleted file so the caller can sum the
    space freed.
    """
    for group in duplicate_groups:
        keep = None
        while keep is None:
            answer = input('Enter file number to keep: ')
            # Accept only a 1-based index within the group.
            if answer.isdigit() and 1 <= int(answer) <= len(group):
                keep = group[int(answer) - 1]
            else:
                print('Invalid option')
        for entry in group:
            if entry is keep:
                continue
            print(f' Deleting {entry}')
            entry.path.unlink()
            yield entry.size
def main() -> None:
    """Entry point: parse the folder argument and run the whole workflow."""
    parser = argparse.ArgumentParser(description='Handle duplicate files')
    parser.add_argument('folder', help='folder to list files in dir and subdir')
    args = parser.parse_args()

    # Materialise the (possibly filtered) listing so it can be iterated twice.
    records = tuple(filter_by_extension(file_listing(args.folder)))
    print_by_size(records)
    print()
    if not ask_yn('Check for duplicates'):
        return
    duplicates = find_duplicates(records)
    if ask_yn('Delete duplicates'):
        bytes_removed = sum(delete_duplicates(duplicates))
        print(f'\nTotal freed up space: {bytes_removed} bytes')
    else:
        for _ in duplicates:  # drain the generator so the groups get printed
            pass


if __name__ == '__main__':
    main()
Output
Enter file extension to select, or press enter for all:
Size sorting options:
d. Descending
a. Ascending
Enter a sorting option: a
47 bytes: 277665/UserInput_277665/.idea/.gitignore
271 bytes: 277665/UserInput_277665/.idea/misc.xml
356 bytes: 277665/UserInput_277665/.idea/modules.xml
423 bytes: 277665/com.stackexchange.userinput/com.stackexchange.userinput.iml
640 bytes: 277665/Main.java
640 bytes: 277665/com.stackexchange.userinput/src/Main.java
2108 bytes: 277665/Main.class
2108 bytes: 277665/UserInput_277665/out/production/com.stackexchange.userinput/Main.class
2234 bytes: 277665/UserInput_277665/.idea/workspace.xml
Check for duplicates (y|n)? y
Delete duplicates (y|n)? y
Size: 2108 bytes, MD5: 89085fddc797fda5dd8319f85b206177
1. 277665/Main.class
2. 277665/UserInput_277665/out/production/com.stackexchange.userinput/Main.class
Enter file number to keep: 2
Deleting 277665/Main.class
Size: 640 bytes, MD5: a21c362935f1afb357bb8e9ce5d7acbb
1. 277665/Main.java
2. 277665/com.stackexchange.userinput/src/Main.java
Enter file number to keep: 2
Deleting 277665/Main.java
Total freed up space: 2748 bytes
-
1\$\begingroup\$ Thank you very much for the answer. Your suggestions are really helpful and they will help me think better. I will study them deeply. \$\endgroup\$Kostas– Kostas2022年07月24日 22:00:01 +00:00Commented Jul 24, 2022 at 22:00