Goal: Iterate through 100+ text files, parse for specific types of rows, return the row length for the desired row ONCE. Once, because these files have about 5-6 different types of rows with varying lengths but the same type of row will have the exact length. So, once the program has found a desired row, return the row length for that first instance. Ignore other rows that meet this criteria. Move to the next row type, return the row length for the first instance of that row type.
Outcome: A report that returns the file_date, row type, and row length for each file.
Issue: We have written something that does this... However, it took 9 hours to scan 48 files. :(
This is my first stab at python so I am certain there's a better approach. I am hoping someone with much more Python experience can provide pointers for improvement.
import os
from datetime import datetime
def write_log_file(month_year, rec15_len, rec1_len, rec7_len):
output_file = "log.txt"
with open(output_file, "a") as f1:
f1.write(month_year + ",1.5," + str(rec15_len))
f1.write("\n")
f1.write(month_year + ",1," + str(rec1_len))
f1.write("\n")
f1.write(month_year + ", 7," + str(rec7_len))
f1.write("-------------------------\n")
f1.close()
def check_rec1(row, record):
if record == "1":
if row[44] == "5":
rec_len = len(row)
# print("found 15")
return rec_len
else:
rec_len = len(row)
# print("found 1")
return rec_len
def check_rec7(row, record):
if record == "7":
rec_len = len(row)
# print("found 7")
return rec_len
def process_files(folder):
for folder, subfolders, filenames in os.walk(folder):
print("the current folder is " + folder)
for subfolder in subfolders:
print("subfolder= " + folder + ": " + subfolder)
for filename in filenames:
if filename.startswith("itec") and filename.endswith(".cmp"):
print("found file name " + filename)
xname_cmp = filename.split(".")
xname = xname_cmp[0].split("_")
month_year = xname[2] + "/" + xname[1]
print("Processing file:" + filename)
print("Start processing time: " + str(datetime.now()))
cmp_file = open(folder + "/" + filename, "r+")
print("reading file ", filename)
read_rows(cmp_file, month_year)
cmp_file.close()
print("End processing time: " + str(datetime.now()))
print("-------------------------------------------------")
else:
print(f"Skipping {filename}")
return
def find_rec(line):
split_line = line.split(" ")
record_type = split_line[1]
# print(len(record_type))
rec_list = record_type.lstrip()
return rec_list
def read_rows(cmp_file, month_year):
count = 0
process_row = True
rec15_len = 0
rec1_len = 0
rec7_len = 0
row_list = []
while process_row:
if (rec1_len != 0) and (rec15_len != 0) and (rec7_len != 0):
# print("process row false")
process_row = False
else:
# print("process row true")
line = cmp_file.readline()
if not line:
process_row = False
break
new_line = line.rstrip()
count += 1
row_list[:0] = find_rec(new_line)
record_type = row_list[0]
# print("stuff " + record_type)
if record_type == "1":
if row_list[1] == "5":
rec15_len = check_rec1(new_line, record_type)
# print(f"rec15 length {str(rec15_len)} {count}")
else:
rec1_len = check_rec1(new_line, record_type)
# print(f"rec1 length {str(rec1_len)} {count}")
elif record_type == "7":
check_len = check_rec7(new_line, record_type)
if check_len > 0:
rec7_len = check_len
# print(f"rec7 length {str(rec7_len)} {count}")
# if (type1 == True) and (type15 == True) and (type7 == True):
print("finished processing all recs true")
write_log_file(month_year, rec15_len, rec1_len, rec7_len)
output_file = "log.txt"
header = "file_date,record_type,length"
with open(output_file, "w") as f1:
f1.write(header)
f1.write("\n")
f1.close()
folder_name = "files"
process_files(folder_name)
```
1 Answer 1
String concatenation
Don't use +
on strings, either use f-strings or str.format
:
filename = 'hello'
foldername = 'world'
newfile = f'{foldername}_{filename}.txt'
# or
newfile = '{}_{}.txt'.format(foldername, filename)
Processing files
When opening files, it's best to use the with
context manager, which eliminates you having to remember to close()
the file handle later:
with open(myfile) as fh:
# process fh
# close has automatically been invoked here
Next, it's usually a good pattern to iterate over a file using for
rather than while
:
with open(myfile) as fh:
for line in fh:
line = line.rstrip()
# do something here
enumerate
You are also maintaining a counter in a loop:
while True:
~snip~
count += 1
When you convert this to a for loop, you can use enumerate to deal with that for you:
for count, line in enumerate(fh):
# do something
Process rows
In your check_rec_[n]
functions you do an if record == n
check. This doesn't need to happen, since you should have filtered that ahead of time to call the function. So drop them:
def check_rec1(row, record):
if row[44] == "5":
rec_len = len(row)
# print("found 15")
return rec_len
else:
rec_len = len(row)
# print("found 1")
return rec_len
def check_rec7(row, record):
rec_len = len(row)
# print("found 7")
return rec_len
But examining them shows that they all do the exact same thing:
def check_rec(row, record):
return len(row)
So record
is a redundant variable. Remove it from the function:
def check_rec(row):
return len(row)
Now this is just unnecessarily wrapping a call to len
. So anywhere you have a check_rec_[n]
, replace it with len(row)
:
for count, line in enumerate(fh):
line = line.rstrip()
row_list[:0] = find_rec(new_line)
record_type = row_list[0]
if record_type == "1":
if row_list[1] == "5":
rec15_len = len(line)
# print(f"rec15 length {str(rec15_len)} {count}")
else:
rec1_len = len(line)
# print(f"rec1 length {str(rec1_len)} {count}")
elif record_type == "7":
check_len = len(line)
if check_len > 0:
rec7_len = check_len
Checking for empty strings
if check_len > 0
should just be if line
:
elif record_type == '7':
if line:
rec7_len = len(line)
Your original break condition
You are checking if all((rec1_len, rec7_len, rec15_len))
. Since that will break
the loop, don't use an else
, use it as a guard:
for count, line in enumerate(fh):
if all((rec1_len, rec7_len, rec15_len)):
break
line = line.rstrip()
...
However, this naming indicates there's a better way. There is, use a dictionary:
record_lens = {
"1": 0,
"15": 0,
"7": 0,
}
for count, line in enumerate(fh):
# do things
record_lens[record] = len(line)
And your guard becomes:
for count, line in enumerate(fh):
if all(record_lens.values()):
break
Furthermore, if you've already populated it in the dictionary, that means that you can very simply look up the length and skip it if it's nonzero:
if record_lens[record_type]:
# no need to re-calculate
continue
find_rec
You are splitting line
on multiple spaces and then returning rec_list
. This could be slightly refactored as the following:
def find_rec(line):
_, record_type, *_ = line.split(" ")
return record_type.strip()
I now think that this is small enough that I wouldn't make it a separate function.
The issue is that you are prepending to your list here:
row_list[:0] = find_rec(new_line)
This is a bit sneaky, because the list slice will split up your record_type
, so if you have two digits (the number 15
, for instance`, you have to do two checks. A few things:
- Prepending to a list is expensive
- You don't use the collected results at all
So don't collect a list here, read the value you've returned. It's already a string!
record_type = find_rec(new_line)
if record_lens[record_type]:
continue
else:
record_lens[record_type] = len(line)
Last, this assumes that you only have three record_types. If you have more, this will fail, so to further generalize it, let's use an empty dictionary and the dict.get
function to prevent pesky KeyError
s:
mydict = {}
# the key isn't in the dictionary
# so the lookup fails
mydict['thing']
KeyError
# now the lookup succeeds
mydict.get('thing')
None
Now, the only thing that will change is the guard clause can go away, because we are skipping all rows that are currently processed:
for count, line in enumerate(fh):
# no if check here
line = line.rstrip()
_, record_type, *_ = line.split(" ")
# if a value is present, skip
if record_lens.get(record_type):
continue
# otherwise, set it
else:
record_lens[record_type] = len(line)
So let's look at what we have so far as a refactored effort:
def read_rows(fh):
record_lens = {}
for count, line in enumerate(fh):
line = line.rstrip()
_, record_type, *_ = line.split(" ")
if record_lens.get(record_type):
continue
else:
record_lens[record_type] = len(line)
# let's make sure we return this
return record_lens
month_year
is only used to log to the file, which I'd prefer to do separately. So let's drop it for now.
Combining the with
for the file handle on opening:
def process_files(folder):
for folder, subfolders, filenames in os.walk(folder):
print("the current folder is " + folder)
for subfolder in subfolders:
print("subfolder= " + folder + ": " + subfolder)
for filename in filenames:
if filename.startswith("itec") and filename.endswith(".cmp"):
print("found file name " + filename)
xname_cmp = filename.split(".")
xname = xname_cmp[0].split("_")
month_year = xname[2] + "/" + xname[1]
print("Processing file:" + filename)
print("Start processing time: " + str(datetime.now()))
with open(f'{folder}/{filename}') as fh:
print("reading file ", filename)
# we will do something with this momentarily
record_lens = read_rows(fh)
print("End processing time: " + str(datetime.now()))
print("-------------------------------------------------")
else:
print(f"Skipping {filename}")
return
Using if
as a guard, again
When walking your files, you can avoid unnecessary indentation by checking for the case you want to skip first and using continue
:
def process_files(folder):
for folder, subfolders, filenames in os.walk(folder):
print("the current folder is " + folder)
for filename in filenames:
if not filename.startswith("itec"):
continue
elif not filename.endswith(".cmp"):
continue
# no else needed
Tuple unpacking
I used this earlier, but when using split
, you can unpack the result into your desired variables directly:
_, month, year, *_ = filename.split(".")[0].split('_')
month_year = f'{month}/{year}'
Where the _
denotes a throwaway or unused quantity, and the *_
is to ensure I capture any trailing elements:
somelist = list('abcd')
_, b, c, *_ = somelist
print(b, c)
'b' 'c'
Style: Add line breaks
This function:
def process_files(folder):
for folder, subfolders, filenames in os.walk(folder):
print("the current folder is " + folder)
for subfolder in subfolders:
print("subfolder= " + folder + ": " + subfolder)
for filename in filenames:
if filename.startswith("itec") and filename.endswith(".cmp"):
print("found file name " + filename)
xname_cmp = filename.split(".")
xname = xname_cmp[0].split("_")
month_year = xname[2] + "/" + xname[1]
print("Processing file:" + filename)
print("Start processing time: " + str(datetime.now()))
cmp_file = open(folder + "/" + filename, "r+")
print("reading file ", filename)
read_rows(cmp_file, month_year)
cmp_file.close()
print("End processing time: " + str(datetime.now()))
print("-------------------------------------------------")
else:
print(f"Skipping {filename}")
return
Is a nearly impossible to read wall-of-text. Break it up. Refactored:
def process_files(folder):
for folder, subfolders, filenames in os.walk(folder):
print(f"the current folder is {folder}")
for filename in filenames:
if not filename.startswith("itec"):
continue
elif not filename.endswith(".cmp"):
continue
print("found file name {filename}")
xname_cmp = filename.split(".")
xname = xname_cmp[0].split("_")
month_year = f"{xname[2]}/{xname[1]}"
print(f"Processing file: {filename}")
print("Start processing time: {datetime.now()}"))
with open(f'{folder}/{filename}') as fh:
print("reading file ", filename)
# we will do something with this momentarily
record_lens = read_rows(fh)
print(f"End processing time: {datetime.now()}"))
print("-------------------------------------------------")
return
I'm going to create a generator expression with the yield
statement in order to log here:
def process_files(folder):
for folder, subfolders, filenames in os.walk(folder):
print(f"the current folder is {folder}")
for filename in filenames:
if not filename.startswith("itec"):
continue
elif not filename.endswith(".cmp"):
continue
print(f"found file name {filename}")
_, month, year, *_ = filename.split(".")[0].split('_')
date = f'{year}/{month}'
print(f"Processing file: {filename}")
print(f"Start processing time: {datetime.now()}")
with open(f'{folder}/{filename}') as fh:
print(f"reading {filename} ")
record_lens = read_rows(fh)
print(f"End processing time: {datetime.now()}"))
print("-------------------------------------------------")
yield record_lens, date
Now I can iterate over the result of this function and it evaluates lazily:
def write_log_file(date, records):
output_file = "log.txt"
with open(output_file, "a") as f1:
for rec_type, val in records.items():
f1.write(f"{date},{rec_type},{val}")
f1.write('\n')
for filename, date, record_lens in process_files(folder_name):
write_log_file(date, record_lens)
I've also simplified the log to file function by just taking the dictionary as a parameter.
Separating folder filtering
It might make the code a bit easier to read if we break up the process_folder
function into two distinct parts:
- Get files
- Process file
Let's start with just getting the file names themselves. I'm going to lean on the generator concept again:
def get_filenames(folder):
for folder, subfolders, filenames in os.walk(folder):
print(f"the current folder is {folder}")
for filename in filenames:
if not filename.startswith("itec"):
continue
elif not filename.endswith(".cmp"):
continue
print(f"found file name {filename}")
_, month, year, *_ = filename.split(".")[0].split('_')
yield f'{folder}/{filename}', f'{year}/{month}'
This now very clearly just builds the paths and dates for each file in the folder.
Now, when we process each file, we can do that in the main loop without having to clutter up the process_folder
function:
for file, date in get_filenames(folder):
with open(file) as fh:
records = read_rows(fh)
# do something
Logging
You are repeatedly opening and closing your log file, you could use a more global file handle:
def write_log_file(fh, date, records):
for rec_type, val in records.items():
fh.write(f"{date},{rec_type},{val}")
fh.write('\n')
# open this globally
with open(logfile, 'w') as fh:
for file, date in get_filenames(folder_name):
with open(file) as gh:
record_lens = read_rows(gh)
# reuse it here
write_log_file(fh, date, record_lens)
But the best way is to use the logging
module:
import logging
# create logger
logger = logging.getLogger('Process CMP Files')
logger.setLevel(logging.DEBUG)
# create file handler which logs even debug messages
fh = logging.FileHandler('log.txt')
fh.setLevel(logging.DEBUG)
# create formatter and add it to the handler
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
# add the handler to the logger
logger.addHandler(fh)
Now, anytime you want to log:
for filepath, date get_filenames(folder_name):
with open(filepath) as fh:
records = read_rows(fh)
for rec_type, val in records.items():
logger.info(f"{date},{rec_type},{val}")
And we can just eliminate your log_to_file
function entirely.
main
function
Let's wrap your functionality in a main
function:
def main(folder_name):
for filepath, date in get_filenames(folder_name):
with open(filepath) as fh:
records = read_rows(fh)
for rec_type, val in records.items():
logger.info(f"{date},{rec_type},{val}")
And let's add an if __name__
guard to make your code more portable if you want to import any of these functions into another module:
def main(folder_name):
for filepath, date get_filenames(folder_name):
with open(filepath) as fh:
records = read_rows(fh)
for rec_type, val in records.items():
logger.info(f"{date},{rec_type},{val}")
if __name__ == "__main__":
# you can now process this as an input
# arg to your function
folder_name = sys.argv[1]
main(folder_name)
Result
Here's the end result:
from datetime import datetime
import logging
import os
# create a global logger
logger = logging.getLogger('Process CMP Files')
logger.setLevel(logging.DEBUG)
# create file handler which logs info messages
fh = logging.FileHandler('log.txt')
fh.setLevel(logging.INFO)
# Let's add a console handler to eliminate print statements
ch = logging.StreamHandler()
# take debug messages
ch.setLevel(logging.DEBUG)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
ch.setFormatter(formatter)
# add the handlers to the logger
logger.addHandler(fh)
logger.addHandler(ch)
def read_rows(fh):
record_lens = {}
for count, line in enumerate(fh):
line = line.rstrip()
_, record_type, *_ = line.split(" ")
if record_lens.get(record_type):
continue
else:
record_lens[record_type] = len(line)
return record_lens
def get_filenames(folder):
for folder, subfolders, filenames in os.walk(folder):
logger.debug(f"the current folder is {folder}")
for filename in filenames:
if not filename.startswith("itec"):
continue
elif not filename.endswith(".cmp"):
continue
logger.debug(f"found file name {filename}")
_, month, year, *_ = filename.split(".")[0].split('_')
yield f'{folder}/{filename}', f'{year}/{month}'
def main(folder_name):
for filename, date in get_filenames(folder_name):
logger.debug(f"Processing file: {filename}")
logger.debug(f"Start processing time: {datetime.now()}")
with open(filename) as fh:
records = read_rows(fh)
logger.debug(f"End processing time: {datetime.now()}")
logger.debug("-" * 49)
for rec_type, val in records.items():
logger.info(f"{date},{rec_type},{val}")
if __name__ == "__main__":
# you can now process this as an input
# arg to your function
folder_name = sys.argv[1]
main(folder_name)
Revisiting filepaths
with pathlib
You can use new style paths with the pathlib
module, which simplifies your os.walk
loop significantly:
from pathlib import Path
def get_filenames(folder):
path = Path(folder)
# the '**/' allows for a recursive glob
for file in path.glob('**/itec*.cmp'):
logger.debug(f"Found file {file}")
_, month, year, *_ = file.stem.split('_')
# open works with Path objects
yield file, f'{year}/{month}'