
I have made a program that does what the title explains. It also hashes each file to check for changes. I will be using this in a project to keep track of files that have been changed when doing different things in that file tree. I need this to be an SQLite/Python app so that information about the files of interest is easily available. It will later be used in a Tkinter GUI app; the Parent column contains the rowid of each entry's parent folder to make it work with Tkinter. As I progress, I will make the Tags column usable for searching within the app.

I have been programming for a few years, but not much, and I lack confidence, so I am throwing my code out here to get some review. I need to get better at OOP and efficiency, and my logical sense needs an update from some bright minds. The full code is below, in two files. filesystemScanner.py:

import os
import hashlib
from pathlib import Path
from mydatabasemanagerfile import MyDatabaseManager

path = "C:\\Users\Skaft\\Documents\\GitHub"
isFolder = True
fullList = []
currhash = ""
db_name = "parenttest.db"
table_name = "Windows10"
my_current_db = MyDatabaseManager(db_name, table_name)


#####################################################################
# copied from https://stackoverflow.com/questions/24937495/
# how-can-i-calculate-a-hash-for-a-filesystem-directory-using-python
#####################################################################
def md5_update_from_dir(directory, my_hash):
    assert Path(directory).is_dir()
    for hash_path in sorted(Path(directory).iterdir(), key=lambda p: str(p).lower()):
        my_hash.update(hash_path.name.encode())
        if hash_path.is_file():
            with open(hash_path, "rb") as f:
                for chunk in iter(lambda: f.read(4096), b""):
                    my_hash.update(chunk)
        elif hash_path.is_dir():
            my_hash = md5_update_from_dir(hash_path, my_hash)
    return my_hash


def md5_dir(directory):
    return md5_update_from_dir(directory, hashlib.md5()).hexdigest()
####################################################
# End of copy
####################################################


def file_hasher(hashing_file):
    try:
        if not isFolder:
            with open(hashing_file, "rb") as current:
                readfile = current.read()
            current_hash = hashlib.md5(readfile).hexdigest()
            return current_hash
    except PermissionError:
        current_hash = "Could not hash due to permission"
        return current_hash


for (dirpath, dirnames, filenames) in os.walk(path):
    isFolder = True
    full_dirpath = dirpath + "\\"
    fullList.append([full_dirpath, md5_dir(full_dirpath), isFolder])
    for file in filenames:
        file_path = dirpath + "\\" + file
        isFolder = False
        fullList.append([file_path, file_hasher(file_path), isFolder])

my_current_db.initialize_db()
for file_info in fullList:
    my_current_db.my_updater(file_info[0], file_info[1], file_info[2])
my_current_db.inserting_parentID()
my_current_db.inserting_suffix()
my_current_db.close_db()

And mydatabasemanagerfile.py:

import sqlite3
import datetime
import pathlib


class MyDatabaseManager:
    def __init__(self, db_name, table_name):
        self.db_name = db_name
        self.table_name = table_name
        self.conn = sqlite3.connect(self.db_name)
        self.cur = self.conn.cursor()

    def initialize_db(self):
        """Function to open database, create if not exists"""
        print("Opened database successfully")
        # drop_if_exist = "DROP TABLE IF EXISTS {}".format(self.table_name)
        # self.cur.execute(drop_if_exist)
        create_table = '''CREATE TABLE IF NOT EXISTS {}
            ("Path" TEXT NOT NULL,
            "Information" TEXT,
            "Tags" TEXT,
            "Parent" INTEGER,
            "isFolder" INTEGER,
            "fileExt" TEXT,
            "MD5" TEXT,
            "Updated" TEXT,
            "See also" TEXT
            )'''.format(self.table_name)
        self.cur.execute(create_table)
        print("Table created successfully")

    def my_updater(self, path, md5, isFolder):
        query_path = '''SELECT rowid, Path FROM {} WHERE Path = "{}"'''.format(self.table_name, path)
        if self.conn.execute(query_path).fetchone() is None:  # Test if returns None.
            result_path = "Please add me"  # Placeholder to specify that it needs to be inserted, it does not exist
            result_hash = "Please add me"
        else:
            result_path = self.conn.execute(query_path).fetchone()[1]
            rowid = self.conn.execute(query_path).fetchone()[0]
            query_hash = '''SELECT MD5 FROM {} WHERE rowid = "{}"'''.format(self.table_name, rowid)
            result_hash = self.conn.execute(query_hash).fetchone()[0]
        current_time = datetime.datetime.now()
        if path == result_path and md5 != result_hash:
            query = '''UPDATE {} SET MD5=?, Updated=? WHERE rowid = "{}"'''.format(self.table_name, rowid)
            self.cur.execute(query, (md5, current_time))
        elif path != result_path:
            query = '''INSERT INTO {} (Path, MD5, isFolder, Updated) VALUES (?, ?, ?, ?)'''.format(self.table_name)
            self.cur.execute(query, (path, md5, isFolder, current_time))
        else:
            # print("Path: ", path, "\n", "result_path: ", result_path, "\n", "md5 : ", md5, "\n", "result_hash: ",
            #       result_hash, "\n")
            pass
        self.conn.commit()

    def inserting_parentID(self):
        query = '''SELECT rowid, Path FROM {}'''.format(self.table_name)
        all_paths = self.cur.execute(query).fetchall()
        for rowid, path in all_paths:
            parent_path_split = path.split("\\")
            if rowid == 1:
                continue
            if path[-1] == "\\":
                parent_path_joined = "\\".join(parent_path_split[:-2]) + "\\"
                parent_path_joinedads = "\\".join(parent_path_split[:-2]) + "\\"
            else:
                parent_path_joined = "\\".join(parent_path_split[:-1]) + "\\"
            parent_query = '''SELECT rowid FROM {} WHERE Path = "{}"'''.format(self.table_name, parent_path_joined)
            parent_paths = self.cur.execute(parent_query).fetchone()
            update_parentID = '''UPDATE {} SET Parent=? WHERE rowid = "{}"'''.format(self.table_name, rowid)
            self.cur.execute(update_parentID, (parent_paths))
        self.conn.commit()

    def inserting_suffix(self):
        query = '''SELECT rowid, Path FROM {}'''.format(self.table_name)
        all_paths = self.cur.execute(query).fetchall()
        for rowid, path in all_paths:
            ext = pathlib.Path(path).suffix
            if ext == "":
                continue
            update_ext_query = '''UPDATE {} SET fileExt=? WHERE rowid = "{}"'''.format(self.table_name, rowid)
            self.cur.execute(update_ext_query, (ext,))
        self.conn.commit()

    def close_db(self):
        self.conn.close()

Any feedback is appreciated so I can reach a more professional and standardized result. I find this a good way to learn: viewing and discussing improvements to my own code.

asked Oct 30, 2021 at 22:32
  • Why is this going to SQLite? What can it do that the filesystem itself can't? Commented Oct 30, 2021 at 23:10
  • I need to keep track of changes made to the files, and there is an "Information" column, as you can see from the creation of the table. This will be filled in by hand to note important info regarding a few of the files. After that is done, I need the parent ID for use with Tkinter to recreate the file-tree structure, as it can't build that logic from the path names itself. Doing this within the filesystem removes a lot of the educational angle I need for my project. Commented Oct 31, 2021 at 6:23
  • Why not create a .info file or something similar in the folders themselves? Commented Oct 31, 2021 at 9:00
  • Because of size. I only need a representation of the file structure; some of the files involved will be larger than 4 GB, and the app will be shared. Commented Oct 31, 2021 at 9:54
  • Perhaps you should consider using QFileSystemWatcher. Commented Nov 9, 2021 at 14:30

1 Answer


Avoid hard-coding file paths like this:

path = "C:\\Users\Skaft\\Documents\\GitHub"

That path should be parametric.
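For instance, the scan root could come from the command line. This is a minimal sketch; the argument names and defaults are illustrative, not from the original code:

```python
import argparse
from pathlib import Path

def parse_args(argv=None):
    # Take the scan root as an argument instead of hard-coding it
    parser = argparse.ArgumentParser(description="Scan a directory tree into SQLite")
    parser.add_argument("root", type=Path, help="directory to scan")
    parser.add_argument("--db", default="parenttest.db", help="SQLite database file")
    return parser.parse_args(argv)

args = parse_args(["C:/Users/Skaft/Documents/GitHub", "--db", "parenttest.db"])
print(args.root)
```

Passing `argv=None` makes the parser read `sys.argv` in real use while staying testable.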

Variable names like isFolder should be is_folder (snake_case) per PEP 8.

Run-time assertions of the kind seen here:

assert Path(directory).is_dir()

can be trivially disabled in Python and are not appropriate for production code. Raise an exception instead.

You say you have files of multiple GB. You've done the right thing here:

for chunk in iter(lambda: f.read(4096), b""):
    my_hash.update(chunk)

and reimplemented it, incorrectly, here:

 current_hash = hashlib.md5(readfile).hexdigest()

Consider factoring out a single file-hashing method that uses the chunked approach to go easy on your RAM.

Do not use string interpolation to add query parameters:

 query_path = '''SELECT rowid, Path FROM {} WHERE Path = "{}"'''.format(self.table_name, path)

That's the classic vector for injection attacks. Instead use ? parameter substitution; read the documentation.
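A sketch with an in-memory database. Note that `?` placeholders work only for values; identifiers such as the table name cannot be bound and should instead be validated against a known-good list:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE files ("Path" TEXT NOT NULL, "MD5" TEXT)')
conn.execute("INSERT INTO files VALUES (?, ?)", ("C:\\repo\\a.txt", "abc"))

# Hostile input is inert when bound as a parameter
hostile = 'x"; DROP TABLE files; --'
assert conn.execute("SELECT rowid FROM files WHERE Path = ?", (hostile,)).fetchone() is None

# The real path still matches
row = conn.execute("SELECT rowid, MD5 FROM files WHERE Path = ?", ("C:\\repo\\a.txt",)).fetchone()
print(row)  # (1, 'abc')
```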

Don't use in-band error signalling, like here:

except PermissionError:
    current_hash = "Could not hash due to permission"
    return current_hash

If both the hash and your error message are strings, a caller cannot reliably tell them apart. Just let the PermissionError fall through and catch it on the calling side.

answered Nov 5, 2021 at 15:53
  • Really appreciate your feedback! I'll work on it! Commented Nov 8, 2021 at 22:27
