I have made a program that does what the title explains. It also hashes the file to check for changes. I will be doing this for a project to keep track of files that has been changed when doing different things in that certain filetree. I need this to be done as an sqlite/python app to make information easily available regarding the files of interest. This will later be used in an TKinter GUI app and the Parent column contains the rowid for its parent folder to make it work with TKinter. I will make it able to use the Tags column for searching within the app as i progress.
I have been programming for a few years, but not much, I feel a lack in confidence and therefore I am throwing my code out here to get some review. I need to get better at OOP, efficiency and my logical sense needs an update from some bright minds. Please see the full code below, it is two files: filesystemScanner.py:
import os
import hashlib
from pathlib import Path
from mydatabasemanagerfile import MyDatabaseManager
path = "C:\\Users\Skaft\\Documents\\GitHub"
isFolder = True
fullList = []
currhash = ""
db_name = "parenttest.db"
table_name = "Windows10"
my_current_db = MyDatabaseManager(db_name, table_name)
#####################################################################
# copied from https://stackoverflow.com/questions/24937495/
# how-can-i-calculate-a-hash-for-a-filesystem-directory-using-python
#####################################################################
def md5_update_from_dir(directory, my_hash):
assert Path(directory).is_dir()
for hash_path in sorted(Path(directory).iterdir(), key=lambda p: str(p).lower()):
my_hash.update(hash_path.name.encode())
if hash_path.is_file():
with open(hash_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
my_hash.update(chunk)
elif hash_path.is_dir():
my_hash = md5_update_from_dir(hash_path, my_hash)
return my_hash
def md5_dir(directory):
return md5_update_from_dir(directory, hashlib.md5()).hexdigest()
####################################################
# End of copy
####################################################
def file_hasher(hashing_file):
try:
if not isFolder:
with open(hashing_file, "rb") as current:
readfile = current.read()
current_hash = hashlib.md5(readfile).hexdigest()
return current_hash
except PermissionError:
current_hash = "Could not hash due to permission"
return current_hash
for (dirpath, dirnames, filenames) in os.walk(path):
isFolder = True
full_dirpath = dirpath + "\\"
fullList.append([full_dirpath, md5_dir(full_dirpath), isFolder])
for file in filenames:
file_path = dirpath + "\\" + file
isFolder = False
fullList.append([file_path, file_hasher(file_path), isFolder])
my_current_db.initialize_db()
for file_info in fullList:
my_current_db.my_updater(file_info[0], file_info[1], file_info[2])
my_current_db.inserting_parentID()
my_current_db.inserting_suffix()
my_current_db.close_db()
And mydatabasemanagerfile.py
import sqlite3
import datetime
import pathlib
class MyDatabaseManager:
def __init__(self, db_name, table_name):
self.db_name = db_name
self.table_name = table_name
self.conn = sqlite3.connect(self.db_name)
self.cur = self.conn.cursor()
def initialize_db(self):
"""Function to open database, create if not exists"""
print("Opened database successfully")
# drop_if_exist = "DROP TABLE IF EXISTS {}".format(self.table_name)
# self.cur.execute(drop_if_exist)
create_table = '''CREATE TABLE IF NOT EXISTS {}
("Path" TEXT NOT NULL,
"Information" TEXT,
"Tags" TEXT,
"Parent" INTEGER,
"isFolder" INTEGER,
"fileExt" TEXT,
"MD5" TEXT,
"Updated" TEXT,
"See also" TEXT
)'''.format(self.table_name)
self.cur.execute(create_table)
print("Table created successfully")
def my_updater(self, path, md5, isFolder):
query_path = '''SELECT rowid, Path FROM {} WHERE Path = "{}"'''.format(self.table_name, path)
if self.conn.execute(query_path).fetchone() is None: # Test if returns None.
result_path = "Please add me" # Placeholder to specify that it needs to be inserted, it does not exist
result_hash = "Please add me"
else:
result_path = self.conn.execute(query_path).fetchone()[1]
rowid = self.conn.execute(query_path).fetchone()[0]
query_hash = '''SELECT MD5 FROM {} WHERE rowid = "{}"'''.format(self.table_name, rowid)
result_hash = self.conn.execute(query_hash).fetchone()[0]
current_time = datetime.datetime.now()
if path == result_path and md5 != result_hash:
query = '''UPDATE {} SET MD5=?, Updated=? WHERE rowid = "{}"'''.format(self.table_name, rowid)
self.cur.execute(query, (md5, current_time))
elif path != result_path:
query = '''INSERT INTO {} (Path, MD5, isFolder, Updated) VALUES (?, ?, ?, ?)'''.format(self.table_name)
self.cur.execute(query, (path, md5, isFolder, current_time))
else:
#print("Path: ", path, "\n", "result_path: ", result_path, "\n", "md5 : ", md5, "\n", "result_hash: ",
# result_hash, "\n")
pass
self.conn.commit()
def inserting_parentID(self):
query = '''SELECT rowid, Path FROM {}'''.format(self.table_name)
all_paths = self.cur.execute(query).fetchall()
for rowid, path in all_paths:
parent_path_split = path.split("\\")
if rowid == 1:
continue
if path[-1] == "\\":
parent_path_joined = "\\".join(parent_path_split[:-2]) + "\\"
parent_path_joinedads = "\\".join(parent_path_split[:-2]) + "\\"
else:
parent_path_joined = "\\".join(parent_path_split[:-1]) + "\\"
parent_query = '''SELECT rowid FROM {} WHERE Path = "{}"'''.format(self.table_name, parent_path_joined)
parent_paths = self.cur.execute(parent_query).fetchone()
update_parentID = '''UPDATE {} SET Parent=? WHERE rowid = "{}"'''.format(self.table_name, rowid)
self.cur.execute(update_parentID, (parent_paths))
self.conn.commit()
def inserting_suffix(self):
query = '''SELECT rowid, Path FROM {}'''.format(self.table_name)
all_paths = self.cur.execute(query).fetchall()
for rowid, path in all_paths:
ext = pathlib.Path(path).suffix
if ext == "":
continue
update_ext_query = '''UPDATE {} SET fileExt=? WHERE rowid = "{}"'''.format(self.table_name, rowid)
self.cur.execute(update_ext_query, (ext,))
self.conn.commit()
def close_db(self):
self.conn.close()
Any feedback is appreciated to get a more professional and standardized result. I find this a good way to learn, to view and discuss improvement on my own code.
-
1\$\begingroup\$ Why is this going to SQLite? What can it do that the filesystem itself can't? \$\endgroup\$Reinderien– Reinderien2021年10月30日 23:10:01 +00:00Commented Oct 30, 2021 at 23:10
-
\$\begingroup\$ I need to keep track of changes made to the files and there is an "Imformation" column as you can see from the creation of the table. This will be filled by hand to note important info regarding a few of the files. After this is done I need the parent ID for use with TKinter to recreate the filetree structure as it cant make the logic based on the path names itself. Doing this withinin the filesystem removes a lot of the educational angle I need for my project. \$\endgroup\$Adrian Kydland Skaftun– Adrian Kydland Skaftun2021年10月31日 06:23:39 +00:00Commented Oct 31, 2021 at 6:23
-
\$\begingroup\$ Why not create a .info file or something similar in the folders itself? \$\endgroup\$Thallius– Thallius2021年10月31日 09:00:34 +00:00Commented Oct 31, 2021 at 9:00
-
\$\begingroup\$ Because size. I only need the representation of the filestructure. Some of the files involved will be larger than 4gb. And the app will be shared \$\endgroup\$Adrian Kydland Skaftun– Adrian Kydland Skaftun2021年10月31日 09:54:31 +00:00Commented Oct 31, 2021 at 9:54
-
\$\begingroup\$ Perhaps you should consider using QFileSystemWatcher \$\endgroup\$jdt– jdt2021年11月09日 14:30:51 +00:00Commented Nov 9, 2021 at 14:30
1 Answer 1
Avoid hard-coding file paths like this:
path = "C:\\Users\Skaft\\Documents\\GitHub"
That path should be parametric.
Variable names like isFolder
should be is_folder
(snake_case) via PEP8.
Run-time assertions of the kind seen here:
assert Path(directory).is_dir()
can be trivially disabled in Python and are not appropriate for production code. Raise an exception instead.
You say you have files of multiple GB. You've done the right thing here:
for chunk in iter(lambda: f.read(4096), b""):
my_hash.update(chunk)
and reimplemented it, incorrectly, here:
current_hash = hashlib.md5(readfile).hexdigest()
Consider factoring out a single file-hashing method that uses the chunked approach to go easy on your RAM.
Do not use string interpolation to add query parameters:
query_path = '''SELECT rowid, Path FROM {} WHERE Path = "{}"'''.format(self.table_name, path)
That's the classic vector for injection attacks. Instead use ?
parameter substitution; read the documentation.
Don't use in-band error signalling, like here:
except PermissionError:
current_hash = "Could not hash due to permission"
return current_hash
If both a hash and your error message are a string, it's problematic for a caller to tell them apart. Just let the permission error exception fall through and catch it on the calling side.
-
1\$\begingroup\$ Really appreciate your feedback! Ill work on it! \$\endgroup\$Adrian Kydland Skaftun– Adrian Kydland Skaftun2021年11月08日 22:27:13 +00:00Commented Nov 8, 2021 at 22:27
Explore related questions
See similar questions with these tags.