We have a department in our org that generates a lot of data in flat files, all with different formats. We are now trying to sort this data out and load it into a database. As step 1, I am trying to identify the structure (separator, header, fields, datatypes, etc.) of the files using a Python script. This is my first time programming in Python, and I am also returning to programming after a long time. While this code gives me the output I need, how can I make it better?
The code looks at the directory C:\logs and loops through all files in it, first to identify a separator and header; if the separator is a space, it then checks whether the file is fixed width. It then uses this information to identify all fields and their related attributes and prints out a mapping file. It also combines multiple mapping files into one mapping file where possible (this allows the mapping sheet to accommodate format variances across multiple files). I want to be able to use this on Excel files later.
SAMPLE INPUT:
Example input in txt format below. However, this code is supposed to identify 'any character' delimited or fixed width files. Files can have any number of header lines, etc.
Some Title
Grp IDS Date Time Weight
1 3559 3-10-19 8:45 1159.4
1 3680 3-10-19 8:40 1861.2
1 3844 3-10-19 8:36 2039.4
1 3867 3-10-19 8:38 1861.2
1 3985 3-10-19 8:40 1729.2
1 4009 3-10-19 8:46 1883.2
1 4014 3-10-19 8:36 1920.6
1 4044 3-10-19 8:41 1689.6
1 4058 3-10-19 8:39 1764.4
1 4192 3-10-19 8:43 1775.4
1 4344 3-10-19 8:43 1449.8
1 4354 3-10-19 8:49 1555.4
1 4356 3-10-19 8:31 1091.2
1 4359 3-10-19 8:43 0.0
1 4361 3-10-19 8:44 1689.6
1 4365 3-10-19 8:43 1513.6
2 3347 3-10-19 8:34 1867.8
2 3860 3-10-19 8:37 1788.6
2 3866 3-10-19 8:33 1980.0
2 4004 3-10-19 8:24 1634.6
2 4020 3-10-19 8:29 1612.6
2 4086 3-10-19 8:35 1553.2
2 4139 3-10-19 8:22 1883.2
2 4145 3-10-19 8:27 1177.0
2 4158 3-10-19 8:33 0.0
2 4186 3-10-19 8:29 1586.2
2 4193 3-10-19 8:28 1746.8
2 4202 3-10-19 8:28 1870.0
2 4215 3-10-19 8:31 1104.4
2 4348 3-10-19 8:33 1628.0
2 4369 3-10-19 8:32 1392.6
2 4374 3-10-19 8:33 1394.8
OUTPUT: (the generated mapping sheet was originally posted as an image and is not reproduced here)
CODE:
# Import Libraries
import os
import csv
import re
import pandas as pd
from collections import Counter
from dateutil.parser import parse
import calendar
# Set Variables
path = "C:\Logs"
printableChars = list('0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&\'()*+,-./:;?@[\\]^_`{|}~ \t\n\r\x0b\x0c')
skipSeperator = list('0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ:.!"#$%&\'()*+@[]\n\r\x0b\x0c')
masterFields=pd.DataFrame(columns=['Target Name','Source Name','Column Position','Column Width','DataType','Size','Format', 'KEY','Default','Comments','Unique', 'ImpactsGrain', 'HasNULLs'])
consolidate= True
debug=False
# Identify if any column is duplicate. It needs to be removed from further analysis
def getDuplicateColumns(df):
'''
Get a list of duplicate columns.
It will iterate over all the columns in dataframe and find the columns whose contents are duplicate.
:param df: Dataframe object
:return: List of columns whose contents are duplicates.
'''
duplicateColumnNames = set()
# Iterate over all the columns in dataframe
for x in range(df.shape[1]):
# Select column at xth index.
col = df.iloc[:, x]
# Iterate over all the columns in DataFrame from (x+1)th index till end
for y in range(x + 1, df.shape[1]):
# Select column at yth index.
otherCol = df.iloc[:, y]
# Check if the two columns at index x and y are equal
if col.equals(otherCol):
duplicateColumnNames.add(df.columns.values[y])
return list(duplicateColumnNames)
# Identify how column values are separated within a row.
def identifySeparator(name):
if debug: print( "Currently analyzing file : " + name)
currentFile = open(path +"\\" +name, "r")
# Create a list of charecters by Line
characterFrequencies =[]
for line in currentFile:
characters = list(line)
if characters[0] == '\n':
continue
characterFrequencies.append(Counter(characters))
if debug: print(pd.DataFrame(characterFrequencies).head())
df = pd.DataFrame(characterFrequencies)
df = df.drop(columns=skipSeperator, errors='ignore') # Remove characters that are generally not used as sperators.
if debug: print("Potential Seperators")
# METHOD 1: Try to identify seperator with the understanding it should be present in every row.
dfDense = df.dropna(axis='columns')
if debug: print(dfDense.head())
Candidates = dfDense.columns
if debug: print("Number of characters present in every row : " + str(len(dfDense.columns)))
if len(dfDense.columns) == 1:
Separator = str(dfDense.columns[0])
if debug: print("Separator identified as : " + Separator + ' using METHOD 1')
else:
Separator = '-1'
if debug: print('Unable to identify seperator using METHOD 1 as 0 or multiple exist!!')
# METHOD 2: Least Variance: The count of the seperator should be more or less same across rows.
if debug: print('% of rows missing the charecter')
if debug: print(df.isna().sum()/ df.isna().count())
cleanupCandidates=df.dropna(axis='columns', thresh = (df.shape[0] + 1)*.8).fillna(-1)
if debug: print('Dropping characters not present in 80% of the columns')
if debug: print(cleanupCandidates.head())
lowestVariance = 0
spaceDetectedFlag = False
Separator2 = ''
for character in cleanupCandidates.columns:
if debug: print('**********' + character +' **********')
x = cleanupCandidates.loc[:,character].var()
if debug: print('Calculated variance : ' + str(x) )
if character == ' ':
spaceDetectedFlag = True
if debug: print('Potential position based file...')
continue
if lowestVariance >= x:
lowestVariance =x
Separator2 = character
if debug: print("Separator identified as : " + Separator2 + ' using METHOD 2')
if Separator == Separator2:
commonSep= Separator
else:
commonSep = list(set(Candidates).intersection(cleanupCandidates.columns))
if debug: print('Both methods identify '+ str(commonSep) + 'as one of the separator candidates.')
maxMode = 0
modeTable = cleanupCandidates.mode()
if len(commonSep) != 1:
print ('Multiple Common Seperator!! Use Max MODE', cleanupCandidates.columns)
if debug: print(cleanupCandidates.mode())
for column in modeTable.columns:
x = modeTable.loc[0,column]
print (column,'\'s Mode: ', x)
if x > maxMode:
commonSep = column
maxMode = x
if debug: print('Resolved ambiguity by Max Mode Method to: ', commonSep)
# Identify if header rows need to be skipped
firstRow = cleanupCandidates[commonSep].idxmax()
if debug: print('The Header is expected to be in row: ',firstRow)
return commonSep[0], firstRow
#candidates = []
#candidates.append(Counter(characters))
def identifyFixedWidth(name):
numberLines = 0
totalCharecters = 0
maxLength = 0
for line in open(path +"\\" +name, "r"):
numberLines +=1
totalCharecters += len(line)
if len(line) <= 2: numberLines -=1
if maxLength < len(line): maxLength = len(line)
avgChars =totalCharecters/numberLines
if debug: print('Sample file has '+ str(numberLines) + ' lines. There are an average '+ str(avgChars) +' charecters pe line.')
counter = 0
for line in open(path +"\\" +name, "r"):
if debug: print(str(counter) + ' has ' + str(len(line)) + ' chars')
if len(line)<= avgChars*.9 or len(line)>=avgChars*1.1:
if debug: print('Line '+ str(counter) +': Maybe part of header and needs to be skipped')
counter+=1
else:
if debug: print('Header found at column :', counter)
break
# Figure out Column Start and stop positions
rowCounter = -1
colPos = []
for line in open(path +"\\" +name, "r"):
rowCounter+=1
if rowCounter < counter: continue
blanks=[m.start() for m in re.finditer(' ', line)]
if len(blanks)>2:
colPos.append([m.start() for m in re.finditer(' ', line)])
if rowCounter<=5:
if debug: print(colPos[-1])
# Intersection
Common = list(set.intersection(*map(set, colPos)))
if debug: print('Potential field separator positions: ',Common)
# Remove sequential values
newCommon = []
for x in Common:
if ((x-1) not in Common):newCommon.append(x)
newCommon.append(maxLength)
if debug: print('Field separator positions identified as :',newCommon)
#Calculate Width
width=[]
range = len(newCommon)
i=0
for x in newCommon[0:range-1]:
width.append(newCommon[i+1]-newCommon[i])
i+=1
if debug: print('Column Lengths:', width)
return counter,width
# Parse File and collect Field Information
def identifyFields(name,separator, HeaderPos, width):
if debug: print( "Currently analyzing column structure for file : " + name)
currentFile = path +"\\" +name
if separator !='FWF':
df = pd.read_csv(currentFile, sep=separator, parse_dates=False, skiprows=HeaderPos)
else:
df = pd.read_fwf(currentFile, parse_dates=False, header=HeaderPos, widths=width)
if debug: print('Opening File as Fixed Width')
dupCols = getDuplicateColumns(df)
df = df.drop(columns=dupCols)
fieldStructure = pd.DataFrame(index=[df.columns], columns=['Target Name','Source Name','Column Position','Column Width','DataType','Size','Format', 'KEY','Default','Comments','Unique', 'ImpactsGrain', 'HasNULLs'])
totalRowCount = df.shape[0]
totalDupRows = df[df.duplicated()].shape[0]
if totalDupRows > 0:
print('!!!!!!!!!WARNING!!!!!!!! This file contains ' + str(totalDupRows) + ' duplicate rows!')
if len(dupCols)> 0:
fieldStructure.loc['Duplicate','Target Name'] = 'DUPLICATE Fields:' + '-'.join(dupCols)
print('!!!!!!!!!WARNING!!!!!!!! The following columns were identified as a duplicate column and will be removed from the final mapping')
if debug: print(dupCols)
if debug: print('Columns in the Dataset',df.columns)
if debug: print(fieldStructure)
counter = 1
for fieldName in df.columns:
print('Processing Field: ' + fieldName)
fieldStructure.loc[fieldName,'Source Name'] = fieldName
fieldStructure.loc[fieldName,'Target Name'] = fieldName
fieldStructure.loc[fieldName,'Column Position'] = counter
if separator =='FWF':fieldStructure.loc[fieldName,'Column Width'] = width[counter-1]
counter +=1
fieldStructure.loc[fieldName,'DataType'] = str(df[fieldName].dtypes).replace('64', '',1)
if str(df[fieldName].dtypes).replace('64', '',1) == 'float':
if df[fieldName].fillna(1).apply(float.is_integer).all():
fieldStructure.loc[fieldName,'DataType'] = 'int'
else:
fieldStructure.loc[fieldName,'DataType'] = 'float'
format = ''
dateFlg = True
if df[fieldName].isnull().all(): fieldStructure.loc[fieldName,'DataType'] = 'Unknown'
if df[fieldName].dtypes == 'object':
fieldValue = str(df.loc[df.loc[:,fieldName].first_valid_index(),fieldName])
if debug: print('First non NaN Index & Value: ',str(df.loc[:,fieldName].first_valid_index()), str(fieldValue))
try:
dateTime = parse(fieldValue,fuzzy=False)
except ValueError:
dateFlg = False
if dateFlg == True:
shortMonth = False
shortDate = False
if debug: print('Input Date:', fieldValue)
if debug: print('Interpreted Date',dateTime)
yrSt = fieldValue.find(str(dateTime.year))
if debug: print('Year Start Position: ',yrSt)
format = fieldValue.replace(str(dateTime.year), 'yyyy',1)
if yrSt == -1:
format = fieldValue.replace(str(dateTime.year)[2:], 'yy',1)
if debug: print('Year format: ',format)
noMonth=False
monSt = format.find(str(dateTime.month).zfill(2))
if debug: print('2 Digit Month Position:',monSt)
format = format.replace(str(dateTime.month).zfill(2), 'mm',1)
if monSt == -1:
monSt1 = format.find(str(dateTime.month))
if monSt1 != -1:
shortMonth = True
format = format.replace(str(dateTime.month).zfill(1), 'mm',1)
else:
noMonth = True
#Check if Month Name or Abbreviations are used
if noMonth:
abbr=map(str.upper,calendar.month_abbr[1:])
fmnth=map(str.upper,calendar.month_name[1:])
if debug: print(abbr)
for mon in abbr:
if(mon in format.upper()):
if debug: print('Month Abbr used in this date format', mon)
noMonth = False
format = format.replace(mon, 'MMM',1)
for mon in fmnth:
if(mon in format.upper()):
if debug: print('Month Name used in this date format', mon)
noMonth = False
format = format.replace(mon, 'MMMM',1)
if debug: print('Month Format: ',format)
daySt = format.find(str(dateTime.day).zfill(2))
if debug: print('2 Digit Day Position: ',daySt)
format = format.replace(str(dateTime.day).zfill(2), 'dd',1)
if daySt == -1:
daySt1 = format.find(str(dateTime.day))
if daySt1 != -1 and not noMonth:
shortDate = True
format = format.replace(str(dateTime.day), 'dd',1)
if debug: print('Day format: ',format)
hhSt = format.find(str(dateTime.hour).zfill(2))
if debug: print('2 digit Hour Position: ',hhSt)
format = format.replace(str(dateTime.hour).zfill(2), 'HH',1)
if debug: print('2 digit Hour Format: ',format)
if hhSt == -1:
hhSt = format.find(str(dateTime.hour).zfill(2))
if debug: print('24 Hour Format Position: ',hhSt)
format = format.replace(str(dateTime.hour).zfill(2), 'HH',1)
if debug: print('24 Hour Format: ',format)
if hhSt == -1:
hhSt = format.find(str(dateTime.hour))
if debug: print('1 digit Hour Position: ',hhSt)
format = format.replace(str(dateTime.hour), 'H',1)
mnSt = format.find(str(dateTime.minute).zfill(2))
if debug: print('Mins Position:',mnSt)
format = format.replace(str(dateTime.minute).zfill(2), 'MM',1)
if debug: print('Mins Format: ',format)
secSt = format.find(str(dateTime.second).zfill(2))
if debug: print('Seconds Position',secSt)
format = format.replace(str(dateTime.second).zfill(2), 'SS',1)
if debug: print('Seconds Format',format)
if shortMonth or shortDate:
format = format + ';' + format.replace(str('mm'), 'm',1).replace(str('dd'), 'd',1)
if debug: print('Date Format Identified as :', format)
fieldStructure.loc[fieldName,'DataType'] = 'Timestamp'
else:
fieldStructure.loc[fieldName,'DataType'] = 'String'
if df[fieldName].isnull().all():
fieldStructure.loc[fieldName,'Size'] = 0
else:
fieldStructure.loc[fieldName,'Size'] = df[fieldName].map(str).apply(len).max()
fieldStructure.loc[fieldName,'Format'] = format
fieldStructure.loc[fieldName,'Unique'] = df[fieldName].is_unique
dftmp = df.drop(columns=fieldName)
# if debug: print(dftmp.head())
dupRows = dftmp[dftmp.duplicated()].shape[0]
if dupRows > totalDupRows:
grainFlg = True
else:
grainFlg = False
fieldStructure.loc[fieldName,'ImpactsGrain'] = grainFlg # NEEDS MORE ANALYSIS
if df[fieldName].isna().sum() > 0:
fieldStructure.loc[fieldName,'HasNULLs'] = True
else:
fieldStructure.loc[fieldName,'HasNULLs'] = False
if debug: print(df.columns)
if debug: print(fieldStructure)
return fieldStructure
# Merge all File Mappings to master mapping file.
def addToMaster(fieldStructure):
if debug: print('Consolidating Multiple Mappings.....')
#if debug: print(fieldStructure.loc[:,'Target Name'])
#masterFields
#masterFields.loc[:,'Format']='XXX'
masterFields['Format'] = masterFields.Format.apply(lambda x: x if not (pd.isnull(x) or x=='') else 'XXX')
if debug: print(masterFields['Format'])
for index, row in fieldStructure.iterrows():
#if debug: print(row)
masterFields.loc[row['Target Name'],'Target Name']=row['Target Name']
masterFields.loc[row['Target Name'],'Source Name']=row['Source Name']
if pd.isnull(masterFields.loc[row['Target Name'],'Column Position']):
masterFields.loc[row['Target Name'],'Column Position']=row['Column Position']
else:
if masterFields.loc[row['Target Name'],'Column Position']!=row['Column Position']:
if debug: print(bcolors.WARNING + "WARNING: Column positions vary by file."+ bcolors.ENDC)
if pd.isnull(masterFields.loc[row['Target Name'],'Column Width']):
masterFields.loc[row['Target Name'],'Column Width']=row['Column Width']
else:
if masterFields.loc[row['Target Name'],'Column Width']<row['Column Width']:
if debug: print('!!!!!!!!!WARNING!!!!!!!! Column Widths vary by file.Merge may not be accurate')
masterFields.loc[row['Target Name'],'Column Width']=row['Column Width']
if pd.isnull(masterFields.loc[row['Target Name'],'DataType']):
masterFields.loc[row['Target Name'],'DataType']=row['DataType']
else:
if masterFields.loc[row['Target Name'],'DataType']!=row['DataType']:
if row['DataType']== 'float': masterFields.loc[row['Target Name'],'DataType'] = float
if row['DataType']=='Timestamp': masterFields.loc[row['Target Name'],'DataType'] = Timestamp
if pd.isnull(masterFields.loc[row['Target Name'],'Size']):
masterFields.loc[row['Target Name'],'Size']=row['Size']
else:
if masterFields.loc[row['Target Name'],'Size']<row['Size']:
masterFields.loc[row['Target Name'],'Size']=row['Size']
if pd.isnull(masterFields.loc[row['Target Name'],'Format']):masterFields.loc[row['Target Name'],'Format']='XXX'
if not(pd.isnull(row['Format']) or row['Format']==''):
if debug: print('Checking if ',row['Format'], ' not in ', masterFields.loc[row['Target Name'],'Format'])
if debug: print('Size of Format value is:', str(len(row['Format'])))
if debug: print('Check to see if the value is NULL: ', pd.isnull(row['Format']))
if row['Format'] not in masterFields.loc[row['Target Name'],'Format']:
masterFields.loc[row['Target Name'],'Format'] += row['Format']
masterFields.loc[row['Target Name'],'Format'] += ';'
if pd.isnull(masterFields.loc[row['Target Name'],'Unique']):
masterFields.loc[row['Target Name'],'Unique'] = row['Unique']
else:
if not(row['Unique']): masterFields.loc[row['Target Name'],'Unique'] = False
if pd.isnull(masterFields.loc[row['Target Name'],'ImpactsGrain']):
masterFields.loc[row['Target Name'],'ImpactsGrain'] = row['ImpactsGrain']
else:
if row['ImpactsGrain']: masterFields.loc[row['Target Name'],'ImpactsGrain'] = True
if pd.isnull(masterFields.loc[row['Target Name'],'HasNULLs']):
masterFields.loc[row['Target Name'],'HasNULLs'] = row['HasNULLs']
else:
if row['HasNULLs']: masterFields.loc[row['Target Name'],'HasNULLs'] = True
if debug: print(masterFields)
def printMapping(fileNameParts,separator,HeaderPos,fieldStructure):
fileNameParts[len(fileNameParts) -1] = 'map'
name='.'.join(fileNameParts)
name+='.csv'
if debug: print(name)
currentFile = open(path +"\\" +name, "w+")
currentFile.write('Source file directory,' + path + '\n')
currentFile.write('Source file pattern,' + 'TBD' + '\n')
currentFile.write('Target table name,' + '[ENTER TABLENAME]' + '\n')
if separator != ' ':
currentFile.write('Field Seperator,"' + separator + '"\n')
else:
currentFile.write('Field Seperator,"' + 'FWF' + '"\n')
currentFile.write('Skip Rows,' + str(HeaderPos) + '\n' + '\n')
currentFile.close()
fieldStructure.to_csv(path +"\\" +name, mode='a', index= False, header= True )
# Read all files in directory
files = os.listdir(path)
masterSep='INIT'
textFormats = ["txt","csv","log"]
#Print all files in directory
for name in files:
print('>>>>>> CURRENTLY PROCESSING FILE : ',name)
fileNameParts = str.split(name, ".")
if debug: print(fileNameParts)
width=[0]
if '.map.' in name:
print('Skip current file (Output File): ',name)
continue
if fileNameParts[len(fileNameParts) -1] in textFormats:
if debug: print("This is a text file.")
separator, HeaderPos = identifySeparator(name)
if separator != '-1':
print('Seperator successfully identified as: ' + separator)
else:
print('Unable to identify Separator. This file will be skipped!')
continue
if separator == ' ':
print('This file may be a fixed width file')
HeaderPos, width=identifyFixedWidth(name)
if debug: print('Width Array ',width, len(width))
if len(width)<=1:
print('This file may be space seperated however it is not a fixed width file.')
continue
else:
separator = 'FWF'
fieldStructure = identifyFields(name, separator, HeaderPos, width)
if masterSep !='INIT':
if (masterSep != separator) and consolidate:
print('These files have different seperators so results will not be consolidated')
consolidate=False
else:
masterSep = separator
print('Field Structure identified successfully')
# Print Mapping Sheet
# if debug: print(fieldStructure)
if consolidate: addToMaster(fieldStructure)
printMapping(fileNameParts,separator,HeaderPos,fieldStructure)
else:
print('Skip current file (Unknown Format) : ',name)
#Prepare to print consolidated results
if consolidate:
fileNameParts[0] = 'FinalMappingSheet'
masterFields['Format'] = [x[3:] for x in masterFields['Format']]
printMapping(fileNameParts,separator,HeaderPos,masterFields)
- I'd be very interested to see a pythonic refactoring of the debug statements – Russ Hyde, Mar 13, 2019 at 17:53
- Can you include the example input as text instead of an image? This way reviewers don't need to type out your whole file... – Graipher, Mar 13, 2019 at 19:17
- Add the example input as text. – Uban, Mar 13, 2019 at 20:03
- Can someone let me know if there is a better way to get the date format string – Uban, Mar 16, 2019 at 19:09
1 Answer
This is going to be relatively long, but I don't have a TL;DR section
if debug: print
Whenever you see code repeated like this, you can quite often refactor it into a function, or there is already a builtin that supports it. Fortunately, the logging module makes this quite simple. You do lose a bit of speed over the if statement, but the call is much easier to read and to separate visually from the rest of your code:
import logging
import sys
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# I'm specifying sys.stdout because default is sys.stderr which
# may or may not be viewable in your console, whereas sys.stdout
# is always viewable
logger.addHandler(logging.StreamHandler(stream=sys.stdout))
for i in range(10):
if i > 8:
logger.debug(f'{i}')
# do other things
9
# rather than
for i in range(10):
if i > 8:
if debug:
print(i)
# do other things
9
You can set this up with a level
mapping to point to other logging levels:
levels = {True: logging.DEBUG,
False: logging.INFO}
debug = True
logger.setLevel(levels[debug])
This way you don't lose your original way of keeping track of your debug status. The major benefit is that it doesn't visually collide with the rest of your flow control. When you have a ton of if
statements, visually sifting through to ignore if debug
becomes a major pain.
The way to fix this is to find every if debug: print and replace it with logger.debug; every other print needs to become logger.info, logger.warning, or logger.exception (the last one will include a stack trace for you :) ). You'll need to fix the print arguments as well; how to do that is shown below.
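For example, one of the warning prints in identifyFields could become the following (a sketch reusing your totalDupRows variable; the WARNING banner then comes from the log level rather than the message text):
# before
print('!!!!!!!!!WARNING!!!!!!!! This file contains ' + str(totalDupRows) + ' duplicate rows!')
# after
logger.warning(f'This file contains {totalDupRows} duplicate rows!')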
Using logger with objects
I'd probably start switching to using f-strings, this way you avoid string concatenation and your code is a bit more readable:
import pandas as pd
import logging
import sys
df = pd.DataFrame([[1,2,3],[4,5,6]], columns = list('abc'))
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(stream=sys.stdout))
logger.debug(f'My dataframe is \n{df.head()}')
My dataframe is
a b c
0 1 2 3
1 4 5 6
# you can still use it for objects by themselves
logger.debug(df.head())
a b c
0 1 2 3
1 4 5 6
# To show logging.exception behavior
def f():
raise TypeError("Some random error")
try:
f()
except TypeError as e:
logger.exception("An error occurred")
An error occurred
Traceback (most recent call last):
File "<stdin>", line 2, in <module>
File "<stdin>", line 3, in f
TypeError: Some random error
Opening and closing files
I see lots of different ways you open files:
for line in open(file):
...
fh = open(file)
for line in fh:
...
It's best to be consistent, and the most pythonic way to open a file is to use with:
with open(file) as fh:
for line in fh:
...
This removes the need to manually close the file, since even on an exception, the handle is closed and you don't have to wait for fh
to exit function scope.
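Applied to the open call in your identifySeparator, that might look like the sketch below; os.path.join is also a nicer way to build the path than concatenating backslashes by hand:
# instead of: currentFile = open(path + "\\" + name, "r")
with open(os.path.join(path, name)) as current_file:
    for line in current_file:
        ...  # build the list of Counter objects as before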
os.listdir vs os.scandir
If your directories are particularly large, it can be quite advantageous to use os.scandir
since it produces a generator rather than a list:
os.listdir('.')
['newfile.py', '.Rhistory', 'VMBash.txt', '.config', 'Music', '.sparkmagic', 'transcripts.tar.gz', '.amlinstaller', 'spotify.docx', 'tree.py', '.condarc', '.docker', 'company.20190102.idx.1', 'itertools_loop.py', 'sql2019.ipynb', 'somexml.xml', 'temp'...]
os.scandir('.')
<posix.ScandirIterator object at 0x10bbb51b0>
For large directories, you'd have to wait for listdir
to aggregate all of the files into memory, which could be either a) long-running or b) crash your machine (probably not but who knows). You would iterate over scandir
with the file.name
attribute to get back to what you had before:
for file in os.scandir('.'):
print(file.name)
newfile.py
.Rhistory
VMBash.txt
...
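Your main loop could consume the entries directly; a sketch reusing your path and textFormats variables:
for entry in os.scandir(path):
    # DirEntry exposes .name and .is_file() without a separate os.stat call on most platforms
    if entry.is_file() and entry.name.rsplit('.', 1)[-1] in textFormats:
        print('>>>>>> CURRENTLY PROCESSING FILE : ', entry.name)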
Tracking an Index
If you ever find yourself doing the following:
counter = 0
for x in iterable:
something[x] = 1
counter += 1
It's probably better to use enumerate
to track the index:
l = list('abcd')
for idx, item in enumerate(l):
print(idx)
0
1
2
3
You can also provide a start
kwarg to tell enumerate
where to begin:
l = list('abcd')
for idx, item in enumerate(l, start=1):
print(idx)
1
2
3
4
So in your identifyFields
function, I would definitely leverage that:
for counter, field_name in enumerate(df.columns, start=1):
# rest of loop
Counter
When you are iterating over your file to get character counts, you are losing speed with extra steps by converting to list, then checking for \n, then building your Counter. Counter will consume a string, and \n is a single string object. Move that into a separate function for separation of concerns:
def read_file(filepath):
# It is much better practice to open files using the with context manager
# it's safer and less error-prone
with open(filepath) as fh:
char_counts = []
for line in fh:
# don't construct a list, there's no need,
# just check if startswith, which is the same
# as line[0] == '\n'
if not line.startswith('\n'):
# Counter will consume a string
char_counts.append(Counter(line))
return char_counts
char_counts = read_file(name)
Or, a bit more succinctly as a list comprehension:
def read_file(name):
with open(filename) as fh:
return [Counter(line) for line in fh if not line.startswith('\n')]
Here str.startswith is also a bit more robust for variable-length strings, as it avoids the if mystring[:len(to_check)] == to_check mess.
Checking duplicate columns
It might be easier to leverage itertools.combinations
to get pairs of columns in your dataframe, then use the pd.Series.all()
function to check the values:
import itertools

# change your naming to fit with common naming standards for
# variables and functions (PEP 8, touched on in the Style section below)
def get_duplicate_columns(df):
'''
You are looking for non-repeated combinations of columns, so use
itertools.combinations to accomplish this, it's more efficient and
easier to see what you are trying to do, rather than tracking an index
'''
duplicate_column_names = set()
# Iterate over all pairs of columns in df
for a, b in itertools.combinations(df.columns, 2):
# will check if every entry in the series is True
if (df[a] == df[b]).all():
duplicate_column_names.add(b)
return list(duplicate_column_names)
This way you avoid the nested loops, and you are just checking across the boolean mask.
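A quick sanity check on a toy frame (made-up data, just to show what comes back):
df = pd.DataFrame({'a': [1, 2], 'b': [1, 2], 'c': [3, 4]})
get_duplicate_columns(df)
# ['b']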
Check pandas datatype for column
It is both unclear and bad practice to do:
if str(df[fieldName].dtypes).replace('64', '',1) == 'float':
To explicitly check a pandas datatype, use the dtype
on a pd.Series
by accessing the column directly:
import numpy as np
if df[field_name].dtype == np.float64:
# do things
You'll need numpy because the dtypes for pandas inherit from numpy dtypes. An even clearer way to check (in case you get np.float32 instead of np.float64, for example) is to leverage pandas.api.types:
import pandas as pd
import pandas.api.types as pdtypes
if pdtypes.is_float_dtype(df[field_name]):
# do things
This might be the preferred way to go, since you are assuming that you will always get float64, where you might not. This also works for other types:
if pdtypes.is_integer_dtype(df[field_name]):
pass
elif pdtypes.is_object_dtype(df[field_name]):
pass
elif pdtypes.is_datetimetz(df[field_name]):
pass
# etc
Checking last element in an indexed data structure
Instead of using object[len(object) - 1], just use object[-1]. Negative indexing also works for counting backwards:
x = list('abcdefg')
# get the last element
x[-1]
'g'
# get the third-to-last element
x[-3]
'e'
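For example, the map-file rename in your printMapping (a sketch of the same behaviour):
# before
fileNameParts[len(fileNameParts) - 1] = 'map'
# after
fileNameParts[-1] = 'map'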
Finding a substring using str.find
In your datetime processing logic there's a ton of the following:
year_st = field_value.find(str(date_time.year))
if year_st == -1:
# do something
str.find only returns -1 when the substring is missing entirely, so comparing against -1 is really just a presence test written the long way:
mystr = 'abcd'
mystr.find('d')
# 3
mystr.find('z')
# -1
Likewise, later you use:
mon_st = format.find(str(dateTime.month))
if mon_st != -1:
    # do things
which again only asks whether the month string occurs at all. When presence is all you care about, the in operator says it directly and reads much better than a sentinel comparison:
if str(date_time.year) not in field_value:
    # do something
tuples vs lists
There are numerous places in your code where you use lists that you do not modify:
if ending not in formats:
...
df = df.drop(columns=skipSeperator, errors='ignore')
...
You incur extra overhead by allocating a mutable data structure:
import sys
lst, tup = list(range(100)), tuple(range(100))
sys.getsizeof(lst)
1008
sys.getsizeof(tup)
848
With this in mind, it's better to stick with the smaller data structure.
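For example, textFormats is never modified, so a tuple works just as well for the membership test (a sketch reusing your variable names):
textFormats = ("txt", "csv", "log")
if fileNameParts[-1] in textFormats:
    ...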
Pandas Recommendations
data-type checking
This kind of code-snippet is not something you'll want:
if str(df[fieldName].dtypes).replace('64', '',1) == 'float':
You're calling dtypes, then coercing it to a string, then replacing it, then comparing to a fixed string. No need, those are numpy datatypes, so just do:
if df[fieldName].dtype == np.float64:
str functions
You can refactor code snippets like:
fieldStructure.loc[fieldName,'Size'] = df[fieldName].map(str).apply(len).max()
To be
fieldStructure.loc[fieldName, 'Size'] = df[fieldName].str.len().max()
Since the field is already a string type, you shouldn't need to map everything with str, and the len operation should already be supported. The latter is faster largely because of the fewer operations that need to occur:
timing
python -m timeit -s "import pandas as pd; df = pd.DataFrame([['kladflna', 'l;adfkadf', ';aljdnvohauehfkadflan'] for i in range(1000)], columns=list(range(3)))" 'df[2].map(str).apply(len).max()'
1000 loops, best of 3: 506 usec per loop
python -m timeit -s "import pandas as pd; df = pd.DataFrame([['kladflna', 'l;adfkadf', ';aljdnvohauehfkadflan'] for i in range(1000)], columns=list(range(3)))" 'df[2].str.len().max()'
1000 loops, best of 3: 351 usec per loop
function calls
from dis import dis
import pandas as pd; df = pd.DataFrame([['kladflna', 'l;adfkadf', ';aljdnvohauehfkadflan'] for i in range(1000)], columns=list(range(3)))
def f(df):
df[2].map(str).apply(len).max()
def g(df):
df[2].str.len().max()
dis(f)
2 0 LOAD_FAST 0 (df)
2 LOAD_CONST 1 (2)
4 BINARY_SUBSCR
6 LOAD_ATTR 0 (map)
8 LOAD_GLOBAL 1 (str)
10 CALL_FUNCTION 1
12 LOAD_ATTR 2 (apply)
14 LOAD_GLOBAL 3 (len)
16 CALL_FUNCTION 1
18 LOAD_ATTR 4 (max)
20 CALL_FUNCTION 0
22 POP_TOP
24 LOAD_CONST 0 (None)
26 RETURN_VALUE
dis(g)
2 0 LOAD_FAST 0 (df)
2 LOAD_CONST 1 (2)
4 BINARY_SUBSCR
6 LOAD_ATTR 0 (str)
8 LOAD_ATTR 1 (len)
10 CALL_FUNCTION 0
12 LOAD_ATTR 2 (max)
14 CALL_FUNCTION 0
16 POP_TOP
18 LOAD_CONST 0 (None)
20 RETURN_VALUE
Opening a file multiple times
In your identifyFixedWidth
function, you open and read a file multiple times. I would say this makes it a candidate for a refactor, since these tasks should probably be broken into smaller functions:
def identify_fixed_width(name):
filepath = os.path.join(path, name)
# open the file here, and re-use the file-handle with fh.seek(0)
with open(filepath) as fh:
number_lines, avg_chars, max_length = get_avg_chars(fh)
# re-use this file handle, go back to the beginning
fh.seek(0)
counter = find_header(fh, avg_chars)
fh.seek(0)
col_pos = get_row_counter(fh, counter)
common = list(set.intersection(*map(set, col_pos)))
logger.debug(f"Potential field separator positions: {common}")
# replace this with a list comprehension, it's faster and more compact
new_common = [x for x in common if (x-1) not in common]
new_common.append(max_length)
logger.debug(f"Field separator positions identified as {new_common}")
# do not shadow builtin names like range. If you must, use a leading underscore
_range = len(new_common)
width = []
# underscore will show an unused or throwaway variable
for i, _ in enumerate(new_common[0:_range-1]):
width.append(new_common[i+1] - new_common[i])
logger.debug(f'Column Lengths: {width}')
return counter, width
def get_avg_chars(fh):
"""
Use enumerate to track the index here and
just count how much you want to decrement from the index
at the end
"""
decrement, max_len, total_chars = 0, 0, 0
for idx, line in enumerate(fh, start=1):
total_chars += len(line)
if len(line) <= 2:
decrement += 1
# this can be evaluated with a ternary expression
max_len = len(line) if len(line) > max_len else max_len
# at the end of the for loop, idx is the length of the file
num_lines = idx - decrement
avg_chars = total_chars / num_lines
return num_lines, avg_chars, max_len
def find_header(fh, avg_chars):
counter = 0
for line in fh:
logger.debug(f"{counter} has {len(line)} chars")
lower = len(line) <= avg_chars * 0.9
upper = len(line) >= avg_chars * 1.1
if upper or lower:
logger.debug(f"Line {counter}: Maybe part of header and needs to be skipped")
counter += 1
else:
logger.debug(f"Header found at {counter}")
break
return counter
def get_row_counter(fh, counter):
"""
Again, use enumerate here for row_counter
"""
col_pos = []
for row_counter, line in enumerate(fh):
if row_counter < counter:
continue
blanks = [m.start() for m in re.finditer(' ', line)]
if len(blanks) > 2:
# you've already declared this variable, so use it
col_pos.append(blanks)
if row_counter <= 5:
logger.debug(col_pos[-1])
return col_pos
Now, it's easier to break apart, debug, and separate out pieces of work within a function.
Style
A few things
- Variable and function names should be lowercase with words separated by underscores (_). Upper case is usually reserved for types, classes, etc. So something like rowCounter should really be row_counter.
- When checking for equivalence with singletons such as None, True, and False, it is usually better to use if value: or if not value, for example:
# change this to
if date_flag == True:
# this
if date_flag:
- I'm not sure if there's a PEP for it, but visually separating blocks of code with newlines can be very helpful. As an example:
logger.debug('% of rows missing the charecter')
logger.debug(df.isna().sum()/ df.isna().count())
cleanupCandidates=df.dropna(axis='columns', thresh = (df.shape[0] + 1)*.8).fillna(-1)
logger.debug('Dropping characters not present in 80% of the columns')
logger.debug(cleanupCandidates.head())
lowestVariance = 0
spaceDetectedFlag = False
Separator2 = ''
for character in cleanupCandidates.columns:
logger.debug('**********' + character +' **********')
x = cleanupCandidates.loc[:,character].var()
logger.debug('Calculated variance : ' + str(x) )
if character == ' ':
spaceDetectedFlag = True
logger.debug('Potential position based file...')
continue
if lowestVariance >= x:
lowestVariance =x
Separator2 = character
logger.debug("Separator identified as : " + Separator2 + ' using METHOD 2')
if Separator == Separator2:
commonSep= Separator
else:
commonSep = list(set(Candidates).intersection(cleanupCandidates.columns))
logger.debug('Both methods identify '+ str(commonSep) + 'as one of the separator candidates.')
maxMode = 0
modeTable = cleanupCandidates.mode()
This just looks like a gigantic wall of code, and is difficult to quickly scan for keywords, patterns, etc. Try breaking the code up into logical steps:
# Break all of this apart
logger.debug('% of rows missing the charecter')
logger.debug(df.isna().sum()/ df.isna().count())
cleanupCandidates=df.dropna(axis='columns', thresh = (df.shape[0] + 1)*.8).fillna(-1)
logger.debug('Dropping characters not present in 80% of the columns')
logger.debug(cleanupCandidates.head())
lowestVariance = 0
spaceDetectedFlag = False
Separator2 = ''
for character in cleanupCandidates.columns:
logger.debug('**********' + character +' **********')
x = cleanupCandidates.loc[:,character].var()
logger.debug('Calculated variance : ' + str(x) )
# separate these if statements, they do different things
if character == ' ':
spaceDetectedFlag = True
logger.debug('Potential position based file...')
continue
if lowestVariance >= x:
lowestVariance =x
Separator2 = character
logger.debug("Separator identified as : " + Separator2 + ' using METHOD 2')
# if, elif, else should be connected because they are one logical
# block of code
if Separator == Separator2:
commonSep= Separator
else:
commonSep = list(set(Candidates).intersection(cleanupCandidates.columns))
logger.debug('Both methods identify '+ str(commonSep) + 'as one of the separator candidates.')
maxMode = 0
modeTable = cleanupCandidates.mode()
- Make sure there's consistent whitespace around variables and operators.
# go from this
lowest_variance =x
common_sep= separator
# to this
lowest_variance = x
common_sep = separator