
We have a department in our org that generates a lot of data in flat files, all with different formats. We are now trying to sort this data out and load it into a database. As step 1, I am trying to identify the structure (separator, header, fields, datatypes, etc.) of the files using a Python script. This is my first time programming in Python, and I am returning to programming after a long gap. While this code gives me the output I need, how can I make it better?

The code looks at the directory C:\logs and loops through all files in it. For each file it first identifies a separator and header; if the separator is a space, it also checks whether the file is fixed width. It then uses this information to identify all fields and their related attributes, and prints out a mapping file. It also combines multiple mapping files into one mapping file where possible (this allows the mapping sheet to accommodate format variances across multiple files). I want to be able to use this on Excel files later.

SAMPLE INPUT:

Example input in txt format below (a quick sanity-check sketch of reading it with pandas follows the sample). However, this code is supposed to identify 'any character' delimited or fixed width files. Files can have any number of header lines, etc.

Some Title
 Grp IDS Date Time Weight 
 1 3559 3-10-19 8:45 1159.4
 1 3680 3-10-19 8:40 1861.2
 1 3844 3-10-19 8:36 2039.4
 1 3867 3-10-19 8:38 1861.2
 1 3985 3-10-19 8:40 1729.2
 1 4009 3-10-19 8:46 1883.2
 1 4014 3-10-19 8:36 1920.6
 1 4044 3-10-19 8:41 1689.6
 1 4058 3-10-19 8:39 1764.4
 1 4192 3-10-19 8:43 1775.4
 1 4344 3-10-19 8:43 1449.8
 1 4354 3-10-19 8:49 1555.4
 1 4356 3-10-19 8:31 1091.2
 1 4359 3-10-19 8:43 0.0
 1 4361 3-10-19 8:44 1689.6
 1 4365 3-10-19 8:43 1513.6
 2 3347 3-10-19 8:34 1867.8
 2 3860 3-10-19 8:37 1788.6
 2 3866 3-10-19 8:33 1980.0
 2 4004 3-10-19 8:24 1634.6
 2 4020 3-10-19 8:29 1612.6
 2 4086 3-10-19 8:35 1553.2
 2 4139 3-10-19 8:22 1883.2
 2 4145 3-10-19 8:27 1177.0
 2 4158 3-10-19 8:33 0.0
 2 4186 3-10-19 8:29 1586.2
 2 4193 3-10-19 8:28 1746.8
 2 4202 3-10-19 8:28 1870.0
 2 4215 3-10-19 8:31 1104.4
 2 4348 3-10-19 8:33 1628.0
 2 4369 3-10-19 8:32 1392.6
 2 4374 3-10-19 8:33 1394.8
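
As a quick sanity check of what the script is expected to infer for this particular sample (one title line to skip, whitespace-separated columns), pandas can already read it once those two facts are known. This is only an illustrative sketch and not part of the script under review; 'sample.txt' is a hypothetical copy of the data above:

import pandas as pd

# skiprows=1 drops the "Some Title" line; sep=r'\s+' treats any run of
# whitespace as the delimiter
df = pd.read_csv('sample.txt', sep=r'\s+', skiprows=1)
print(df.dtypes)   # Grp/IDS come back as int, Weight as float, Date/Time as object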

OUTPUT:

(The generated mapping output was posted as a screenshot and is not reproduced here.)

# Import Libraries
import os
import csv
import re
import pandas as pd
from collections import Counter
from dateutil.parser import parse
import calendar
# Set Variables
path = "C:\Logs"
printableChars = list('0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&\'()*+,-./:;?@[\\]^_`{|}~ \t\n\r\x0b\x0c')
skipSeperator = list('0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ:.!"#$%&\'()*+@[]\n\r\x0b\x0c')
masterFields=pd.DataFrame(columns=['Target Name','Source Name','Column Position','Column Width','DataType','Size','Format', 'KEY','Default','Comments','Unique', 'ImpactsGrain', 'HasNULLs'])
consolidate= True
debug=False
# Identify if any column is duplicate. It needs to be removed from further analysis
def getDuplicateColumns(df):
    '''
    Get a list of duplicate columns.
    It will iterate over all the columns in dataframe and find the columns whose contents are duplicate.
    :param df: Dataframe object
    :return: List of columns whose contents are duplicates.
    '''
    duplicateColumnNames = set()
    # Iterate over all the columns in dataframe
    for x in range(df.shape[1]):
        # Select column at xth index.
        col = df.iloc[:, x]
        # Iterate over all the columns in DataFrame from (x+1)th index till end
        for y in range(x + 1, df.shape[1]):
            # Select column at yth index.
            otherCol = df.iloc[:, y]
            # Check if two columns at x & y index are equal
            if col.equals(otherCol):
                duplicateColumnNames.add(df.columns.values[y])
    return list(duplicateColumnNames)
# Identify how column values are separated within a row.
def identifySeparator(name):
    if debug: print( "Currently analyzing file : " + name)
    currentFile = open(path +"\\" +name, "r")
    # Create a list of charecters by Line
    characterFrequencies =[]
    for line in currentFile:
        characters = list(line)
        if characters[0] == '\n':
            continue
        characterFrequencies.append(Counter(characters))
    if debug: print(pd.DataFrame(characterFrequencies).head())
    df = pd.DataFrame(characterFrequencies)
    df = df.drop(columns=skipSeperator, errors='ignore') # Remove characters that are generally not used as sperators.
    if debug: print("Potential Seperators")
    # METHOD 1: Try to identify seperator with the understanding it should be present in every row.
    dfDense = df.dropna(axis='columns')
    if debug: print(dfDense.head())
    Candidates = dfDense.columns
    if debug: print("Number of characters present in every row : " + str(len(dfDense.columns)))
    if len(dfDense.columns) == 1:
        Separator = str(dfDense.columns[0])
        if debug: print("Separator identified as : " + Separator + ' using METHOD 1')
    else:
        Separator = '-1'
        if debug: print('Unable to identify seperator using METHOD 1 as 0 or multiple exist!!')
    # METHOD 2: Least Variance: The count of the seperator should be more or less same across rows.
    if debug: print('% of rows missing the charecter')
    if debug: print(df.isna().sum()/ df.isna().count())
    cleanupCandidates=df.dropna(axis='columns', thresh = (df.shape[0] + 1)*.8).fillna(-1)
    if debug: print('Dropping characters not present in 80% of the columns')
    if debug: print(cleanupCandidates.head())
    lowestVariance = 0
    spaceDetectedFlag = False
    Separator2 = ''
    for character in cleanupCandidates.columns:
        if debug: print('**********' + character +' **********')
        x = cleanupCandidates.loc[:,character].var()
        if debug: print('Calculated variance : ' + str(x) )
        if character == ' ':
            spaceDetectedFlag = True
            if debug: print('Potential position based file...')
            continue
        if lowestVariance >= x:
            lowestVariance =x
            Separator2 = character
    if debug: print("Separator identified as : " + Separator2 + ' using METHOD 2')
    if Separator == Separator2:
        commonSep= Separator
    else:
        commonSep = list(set(Candidates).intersection(cleanupCandidates.columns))
    if debug: print('Both methods identify '+ str(commonSep) + 'as one of the separator candidates.')
    maxMode = 0
    modeTable = cleanupCandidates.mode()
    if len(commonSep) != 1:
        print ('Multiple Common Seperator!! Use Max MODE', cleanupCandidates.columns)
        if debug: print(cleanupCandidates.mode())
        for column in modeTable.columns:
            x = modeTable.loc[0,column]
            print (column,'\'s Mode: ', x)
            if x > maxMode:
                commonSep = column
                maxMode = x
        if debug: print('Resolved ambiguity by Max Mode Method to: ', commonSep)
    # Identify if header rows need to be skipped
    firstRow = cleanupCandidates[commonSep].idxmax()
    if debug: print('The Header is expected to be in row: ',firstRow)
    return commonSep[0], firstRow
    #candidates = []
    #candidates.append(Counter(characters))
def identifyFixedWidth(name):
    numberLines = 0
    totalCharecters = 0
    maxLength = 0
    for line in open(path +"\\" +name, "r"):
        numberLines +=1
        totalCharecters += len(line)
        if len(line) <= 2: numberLines -=1
        if maxLength < len(line): maxLength = len(line)
    avgChars =totalCharecters/numberLines
    if debug: print('Sample file has '+ str(numberLines) + ' lines. There are an average '+ str(avgChars) +' charecters pe line.')
    counter = 0
    for line in open(path +"\\" +name, "r"):
        if debug: print(str(counter) + ' has ' + str(len(line)) + ' chars')
        if len(line)<= avgChars*.9 or len(line)>=avgChars*1.1:
            if debug: print('Line '+ str(counter) +': Maybe part of header and needs to be skipped')
            counter+=1
        else:
            if debug: print('Header found at column :', counter)
            break
    # Figure out Column Start and stop positions
    rowCounter = -1
    colPos = []
    for line in open(path +"\\" +name, "r"):
        rowCounter+=1
        if rowCounter < counter: continue
        blanks=[m.start() for m in re.finditer(' ', line)]
        if len(blanks)>2:
            colPos.append([m.start() for m in re.finditer(' ', line)])
            if rowCounter<=5:
                if debug: print(colPos[-1])
    # Intersection
    Common = list(set.intersection(*map(set, colPos)))
    if debug: print('Potential field separator positions: ',Common)
    # Remove sequential values
    newCommon = []
    for x in Common:
        if ((x-1) not in Common):newCommon.append(x)
    newCommon.append(maxLength)
    if debug: print('Field separator positions identified as :',newCommon)
    #Calculate Width
    width=[]
    range = len(newCommon)
    i=0
    for x in newCommon[0:range-1]:
        width.append(newCommon[i+1]-newCommon[i])
        i+=1
    if debug: print('Column Lengths:', width)
    return counter,width
# Parse File and collect Field Information
def identifyFields(name,separator, HeaderPos, width):
    if debug: print( "Currently analyzing column structure for file : " + name)
    currentFile = path +"\\" +name
    if separator !='FWF':
        df = pd.read_csv(currentFile, sep=separator, parse_dates=False, skiprows=HeaderPos)
    else:
        df = pd.read_fwf(currentFile, parse_dates=False, header=HeaderPos, widths=width)
        if debug: print('Opening File as Fixed Width')
    dupCols = getDuplicateColumns(df)
    df = df.drop(columns=dupCols)
    fieldStructure = pd.DataFrame(index=[df.columns], columns=['Target Name','Source Name','Column Position','Column Width','DataType','Size','Format', 'KEY','Default','Comments','Unique', 'ImpactsGrain', 'HasNULLs'])
    totalRowCount = df.shape[0]
    totalDupRows = df[df.duplicated()].shape[0]
    if totalDupRows > 0:
        print('!!!!!!!!!WARNING!!!!!!!! This file contains ' + str(totalDupRows) + ' duplicate rows!')
    if len(dupCols)> 0:
        fieldStructure.loc['Duplicate','Target Name'] = 'DUPLICATE Fields:' + '-'.join(dupCols)
        print('!!!!!!!!!WARNING!!!!!!!! The following columns were identified as a duplicate column and will be removed from the final mapping')
        if debug: print(dupCols)
    if debug: print('Columns in the Dataset',df.columns)
    if debug: print(fieldStructure)
    counter = 1
    for fieldName in df.columns:
        print('Processing Field: ' + fieldName)
        fieldStructure.loc[fieldName,'Source Name'] = fieldName
        fieldStructure.loc[fieldName,'Target Name'] = fieldName
        fieldStructure.loc[fieldName,'Column Position'] = counter
        if separator =='FWF':fieldStructure.loc[fieldName,'Column Width'] = width[counter-1]
        counter +=1
        fieldStructure.loc[fieldName,'DataType'] = str(df[fieldName].dtypes).replace('64', '',1)
        if str(df[fieldName].dtypes).replace('64', '',1) == 'float':
            if df[fieldName].fillna(1).apply(float.is_integer).all():
                fieldStructure.loc[fieldName,'DataType'] = 'int'
            else:
                fieldStructure.loc[fieldName,'DataType'] = 'float'
        format = ''
        dateFlg = True
        if df[fieldName].isnull().all(): fieldStructure.loc[fieldName,'DataType'] = 'Unknown'
        if df[fieldName].dtypes == 'object':
            fieldValue = str(df.loc[df.loc[:,fieldName].first_valid_index(),fieldName])
            if debug: print('First non NaN Index & Value: ',str(df.loc[:,fieldName].first_valid_index()), str(fieldValue))
            try:
                dateTime = parse(fieldValue,fuzzy=False)
            except ValueError:
                dateFlg = False
            if dateFlg == True:
                shortMonth = False
                shortDate = False
                if debug: print('Input Date:', fieldValue)
                if debug: print('Interpreted Date',dateTime)
                yrSt = fieldValue.find(str(dateTime.year))
                if debug: print('Year Start Position: ',yrSt)
                format = fieldValue.replace(str(dateTime.year), 'yyyy',1)
                if yrSt == -1:
                    format = fieldValue.replace(str(dateTime.year)[2:], 'yy',1)
                if debug: print('Year format: ',format)
                noMonth=False
                monSt = format.find(str(dateTime.month).zfill(2))
                if debug: print('2 Digit Month Position:',monSt)
                format = format.replace(str(dateTime.month).zfill(2), 'mm',1)
                if monSt == -1:
                    monSt1 = format.find(str(dateTime.month))
                    if monSt1 != -1:
                        shortMonth = True
                        format = format.replace(str(dateTime.month).zfill(1), 'mm',1)
                    else:
                        noMonth = True
                #Check if Month Name or Abbreviations are used
                if noMonth:
                    abbr=map(str.upper,calendar.month_abbr[1:])
                    fmnth=map(str.upper,calendar.month_name[1:])
                    if debug: print(abbr)
                    for mon in abbr:
                        if(mon in format.upper()):
                            if debug: print('Month Abbr used in this date format', mon)
                            noMonth = False
                            format = format.replace(mon, 'MMM',1)
                    for mon in fmnth:
                        if(mon in format.upper()):
                            if debug: print('Month Name used in this date format', mon)
                            noMonth = False
                            format = format.replace(mon, 'MMMM',1)
                if debug: print('Month Format: ',format)
                daySt = format.find(str(dateTime.day).zfill(2))
                if debug: print('2 Digit Day Position: ',daySt)
                format = format.replace(str(dateTime.day).zfill(2), 'dd',1)
                if daySt == -1:
                    daySt1 = format.find(str(dateTime.day))
                    if daySt1 != -1 and not noMonth:
                        shortDate = True
                        format = format.replace(str(dateTime.day), 'dd',1)
                if debug: print('Day format: ',format)
                hhSt = format.find(str(dateTime.hour).zfill(2))
                if debug: print('2 digit Hour Position: ',hhSt)
                format = format.replace(str(dateTime.hour).zfill(2), 'HH',1)
                if debug: print('2 digit Hour Format: ',format)
                if hhSt == -1:
                    hhSt = format.find(str(dateTime.hour).zfill(2))
                    if debug: print('24 Hour Format Position: ',hhSt)
                    format = format.replace(str(dateTime.hour).zfill(2), 'HH',1)
                    if debug: print('24 Hour Format: ',format)
                if hhSt == -1:
                    hhSt = format.find(str(dateTime.hour))
                    if debug: print('1 digit Hour Position: ',hhSt)
                    format = format.replace(str(dateTime.hour), 'H',1)
                mnSt = format.find(str(dateTime.minute).zfill(2))
                if debug: print('Mins Position:',mnSt)
                format = format.replace(str(dateTime.minute).zfill(2), 'MM',1)
                if debug: print('Mins Format: ',format)
                secSt = format.find(str(dateTime.second).zfill(2))
                if debug: print('Seconds Position',secSt)
                format = format.replace(str(dateTime.second).zfill(2), 'SS',1)
                if debug: print('Seconds Format',format)
                if shortMonth or shortDate:
                    format = format + ';' + format.replace(str('mm'), 'm',1).replace(str('dd'), 'd',1)
                if debug: print('Date Format Identified as :', format)
                fieldStructure.loc[fieldName,'DataType'] = 'Timestamp'
            else:
                fieldStructure.loc[fieldName,'DataType'] = 'String'
        if df[fieldName].isnull().all():
            fieldStructure.loc[fieldName,'Size'] = 0
        else:
            fieldStructure.loc[fieldName,'Size'] = df[fieldName].map(str).apply(len).max()
        fieldStructure.loc[fieldName,'Format'] = format
        fieldStructure.loc[fieldName,'Unique'] = df[fieldName].is_unique
        dftmp = df.drop(columns=fieldName)
        # if debug: print(dftmp.head())
        dupRows = dftmp[dftmp.duplicated()].shape[0]
        if dupRows > totalDupRows:
            grainFlg = True
        else:
            grainFlg = False
        fieldStructure.loc[fieldName,'ImpactsGrain'] = grainFlg # NEEDS MORE ANALYSIS
        if df[fieldName].isna().sum() > 0:
            fieldStructure.loc[fieldName,'HasNULLs'] = True
        else:
            fieldStructure.loc[fieldName,'HasNULLs'] = False
    if debug: print(df.columns)
    if debug: print(fieldStructure)
    return fieldStructure
# Merge all File Mappings to master mapping file.
def addToMaster(fieldStructure):
    if debug: print('Consolidating Multiple Mappings.....')
    #if debug: print(fieldStructure.loc[:,'Target Name'])
    #masterFields
    #masterFields.loc[:,'Format']='XXX'
    masterFields['Format'] = masterFields.Format.apply(lambda x: x if not (pd.isnull(x) or x=='') else 'XXX')
    if debug: print(masterFields['Format'])
    for index, row in fieldStructure.iterrows():
        #if debug: print(row)
        masterFields.loc[row['Target Name'],'Target Name']=row['Target Name']
        masterFields.loc[row['Target Name'],'Source Name']=row['Source Name']
        if pd.isnull(masterFields.loc[row['Target Name'],'Column Position']):
            masterFields.loc[row['Target Name'],'Column Position']=row['Column Position']
        else:
            if masterFields.loc[row['Target Name'],'Column Position']!=row['Column Position']:
                if debug: print(bcolors.WARNING + "WARNING: Column positions vary by file."+ bcolors.ENDC)
        if pd.isnull(masterFields.loc[row['Target Name'],'Column Width']):
            masterFields.loc[row['Target Name'],'Column Width']=row['Column Width']
        else:
            if masterFields.loc[row['Target Name'],'Column Width']<row['Column Width']:
                if debug: print('!!!!!!!!!WARNING!!!!!!!! Column Widths vary by file.Merge may not be accurate')
                masterFields.loc[row['Target Name'],'Column Width']=row['Column Width']
        if pd.isnull(masterFields.loc[row['Target Name'],'DataType']):
            masterFields.loc[row['Target Name'],'DataType']=row['DataType']
        else:
            if masterFields.loc[row['Target Name'],'DataType']!=row['DataType']:
                if row['DataType']== 'float': masterFields.loc[row['Target Name'],'DataType'] = float
                if row['DataType']=='Timestamp': masterFields.loc[row['Target Name'],'DataType'] = Timestamp
        if pd.isnull(masterFields.loc[row['Target Name'],'Size']):
            masterFields.loc[row['Target Name'],'Size']=row['Size']
        else:
            if masterFields.loc[row['Target Name'],'Size']<row['Size']:
                masterFields.loc[row['Target Name'],'Size']=row['Size']
        if pd.isnull(masterFields.loc[row['Target Name'],'Format']):masterFields.loc[row['Target Name'],'Format']='XXX'
        if not(pd.isnull(row['Format']) or row['Format']==''):
            if debug: print('Checking if ',row['Format'], ' not in ', masterFields.loc[row['Target Name'],'Format'])
            if debug: print('Size of Format value is:', str(len(row['Format'])))
            if debug: print('Check to see if the value is NULL: ', pd.isnull(row['Format']))
            if row['Format'] not in masterFields.loc[row['Target Name'],'Format']:
                masterFields.loc[row['Target Name'],'Format'] += row['Format']
                masterFields.loc[row['Target Name'],'Format'] += ';'
        if pd.isnull(masterFields.loc[row['Target Name'],'Unique']):
            masterFields.loc[row['Target Name'],'Unique'] = row['Unique']
        else:
            if not(row['Unique']): masterFields.loc[row['Target Name'],'Unique'] = False
        if pd.isnull(masterFields.loc[row['Target Name'],'ImpactsGrain']):
            masterFields.loc[row['Target Name'],'ImpactsGrain'] = row['ImpactsGrain']
        else:
            if row['ImpactsGrain']: masterFields.loc[row['Target Name'],'ImpactsGrain'] = True
        if pd.isnull(masterFields.loc[row['Target Name'],'HasNULLs']):
            masterFields.loc[row['Target Name'],'HasNULLs'] = row['HasNULLs']
        else:
            if row['HasNULLs']: masterFields.loc[row['Target Name'],'HasNULLs'] = True
    if debug: print(masterFields)
def printMapping(fileNameParts,separator,HeaderPos,fieldStructure):
    fileNameParts[len(fileNameParts) -1] = 'map'
    name='.'.join(fileNameParts)
    name+='.csv'
    if debug: print(name)
    currentFile = open(path +"\\" +name, "w+")
    currentFile.write('Source file directory,' + path + '\n')
    currentFile.write('Source file pattern,' + 'TBD' + '\n')
    currentFile.write('Target table name,' + '[ENTER TABLENAME]' + '\n')
    if separator != ' ':
        currentFile.write('Field Seperator,"' + separator + '"\n')
    else:
        currentFile.write('Field Seperator,"' + 'FWF' + '"\n')
    currentFile.write('Skip Rows,' + str(HeaderPos) + '\n' + '\n')
    currentFile.close()
    fieldStructure.to_csv(path +"\\" +name, mode='a', index= False, header= True )
# Read all files in directory
files = os.listdir(path)
masterSep='INIT'
textFormats = ["txt","csv","log"]
#Print all files in directory
for name in files:
    print('>>>>>> CURRENTLY PROCESSING FILE : ',name)
    fileNameParts = str.split(name, ".")
    if debug: print(fileNameParts)
    width=[0]
    if '.map.' in name:
        print('Skip current file (Output File): ',name)
        continue
    if fileNameParts[len(fileNameParts) -1] in textFormats:
        if debug: print("This is a text file.")
        separator, HeaderPos = identifySeparator(name)
        if separator != '-1':
            print('Seperator successfully identified as: ' + separator)
        else:
            print('Unable to identify Separator. This file will be skipped!')
            continue
        if separator == ' ':
            print('This file may be a fixed width file')
            HeaderPos, width=identifyFixedWidth(name)
            if debug: print('Width Array ',width, len(width))
            if len(width)<=1:
                print('This file may be space seperated however it is not a fixed width file.')
                continue
            else:
                separator = 'FWF'
        fieldStructure = identifyFields(name, separator, HeaderPos, width)
        if masterSep !='INIT':
            if (masterSep != separator) and consolidate:
                print('These files have different seperators so results will not be consolidated')
                consolidate=False
        else:
            masterSep = separator
        print('Field Structure identified successfully')
        # Print Mapping Sheet
        # if debug: print(fieldStructure)
        if consolidate: addToMaster(fieldStructure)
        printMapping(fileNameParts,separator,HeaderPos,fieldStructure)
    else:
        print('Skip current file (Unknown Format) : ',name)
#Prepare to print consolidated results
if consolidate:
    fileNameParts[0] = 'FinalMappingSheet'
    masterFields['Format'] = [x[3:] for x in masterFields['Format']]
    printMapping(fileNameParts,separator,HeaderPos,masterFields)
asked Mar 13, 2019 at 16:26
Comments:

  • I'd be very interested to see a pythonic refactoring of the debug statements (Mar 13, 2019 at 17:53)
  • Can you include the example input as text instead of an image? This way reviewers don't need to type out your whole file... (Mar 13, 2019 at 19:17)
  • Add the example input as text. (Mar 13, 2019 at 20:03)
  • Can someone let me know if there is a better way to get the date format string. (Mar 16, 2019 at 19:09)

1 Answer


This is going to be relatively long, but I don't have a TL;DR section

if debug: print

Whenever you see code repeated like this, it is often a sign that you can refactor it into a function, or that there is a builtin to support it. Fortunately, the logging module makes this quite simple. You do lose a bit of speed over the if statement, but it's a much easier call to read and to visually separate from the rest of your code:

import logging
import sys
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# I'm specifying sys.stdout because default is sys.stderr which
# may or may not be viewable in your console, whereas sys.stdout
# is always viewable
logger.addHandler(logging.StreamHandler(stream=sys.stdout))
for i in range(10):
    if i > 8:
        logger.debug(f'{i}')
    # do other things
9

# rather than
for i in range(10):
    if i > 8:
        if debug:
            print(i)
    # do other things
9

You can set this up with a level mapping to point to other logging levels:

levels = {True: logging.DEBUG,
          False: logging.INFO}
debug = True
logger.setLevel(levels[debug])

This way you don't lose your original way of keeping track of your debug status. The major benefit is that it doesn't visually collide with the rest of your flow control. When you have a ton of if statements, visually sifting through to ignore if debug becomes a major pain.
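
If the flag eventually comes from the command line rather than a constant at the top of the script, the same mapping works unchanged. This is only a sketch using argparse, not something the original script does:

import argparse
import logging

parser = argparse.ArgumentParser()
parser.add_argument('--debug', action='store_true')
args = parser.parse_args()

levels = {True: logging.DEBUG, False: logging.INFO}
logger.setLevel(levels[args.debug])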

The way to fix this is to find every if debug: print and replace it with logger.debug, and everywhere else print is used it needs to become logger.info, logger.warn, or logger.exception (which will include a stack trace for you :) ). You'll need to fix the print arguments as well; how to do that is below.

Using logger with objects

I'd probably start switching to using f-strings, this way you avoid string concatenation and your code is a bit more readable:

import pandas as pd
import logging
import sys
df = pd.DataFrame([[1,2,3],[4,5,6]], columns = list('abc'))
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(stream=sys.stdout))
logger.debug(f'My dataframe is \n{df.head()}')
My dataframe is
   a  b  c
0  1  2  3
1  4  5  6

# you can still use it for objects by themselves
logger.debug(df.head())
   a  b  c
0  1  2  3
1  4  5  6

# To show logging.exception behavior
def f():
    raise TypeError("Some random error")

try:
    f()
except TypeError as e:
    logger.exception("An error occurred")
An error occurred
Traceback (most recent call last):
  File "<stdin>", line 2, in <module>
  File "<stdin>", line 3, in f
TypeError: Some random error

Opening and closing files

I see lots of different ways you open files:

for line in open(file):
    ...

fh = open(file)
for line in fh:
    ...

It's best to be consistent, and the most pythonic way to open a file is to use with:

with open(file) as fh:
    for line in fh:
        ...

This removes the need to manually close the file, since even on an exception, the handle is closed and you don't have to wait for fh to exit function scope.
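
Roughly, the with block above is equivalent to the following try/finally, just with less ceremony (sketch for illustration):

fh = open(file)
try:
    for line in fh:
        ...
finally:
    # runs even if the loop body raises
    fh.close()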

os.listdir vs os.scandir

If your directories are particularly large, it can be quite advantageous to use os.scandir since it produces a generator rather than a list:

os.listdir('.')
['newfile.py', '.Rhistory', 'VMBash.txt', '.config', 'Music', '.sparkmagic', 'transcripts.tar.gz', '.amlinstaller', 'spotify.docx', 'tree.py', '.condarc', '.docker', 'company.20190102.idx.1', 'itertools_loop.py', 'sql2019.ipynb', 'somexml.xml', 'temp'...]
os.scandir('.')
<posix.ScandirIterator object at 0x10bbb51b0>

For large directories, you'd have to wait for listdir to aggregate all of the files into memory, which could be either a) long-running or b) crash your machine (probably not but who knows). You would iterate over scandir with the file.name attribute to get back to what you had before:

for file in os.scandir('.'):
    print(file.name)
newfile.py
.Rhistory
VMBash.txt
...

Tracking an Index

If you ever find yourself doing the following:

counter = 0
for x in iterable:
    something[x] = 1
    counter += 1

It's probably better to use enumerate to track the index:

l = list('abcd')
for idx, item in enumerate(l):
    print(idx)
0
1
2
3

You can also provide a start kwarg to tell enumerate where to begin:

l = list('abcd')
for idx, item in enumerate(l, start=1):
    print(idx)
1
2
3
4

So in your identifyFields function, I would definitely leverage that:

for counter, field_name in enumerate(df.columns, start=1):
    # rest of loop

Counter

When you are iterating over your file to get character counts, you are losing speed with extra steps by converting to list, then checking for \n, then building your Counter. Counter will consume a string, and \n is a single string object. Move that into a separate function for separation of concerns:

def read_file(filepath):
    # It is much better practice to open files using the with context manager
    # it's safer and less error-prone
    with open(filepath) as fh:
        char_counts = []
        for line in fh:
            # don't construct a list, there's no need,
            # just check if startswith, which is the same
            # as line[0] == '\n'
            if not line.startswith('\n'):
                # Counter will consume a string
                char_counts.append(Counter(line))
    return char_counts

char_counts = read_file(name)

Or, a bit more succinctly as a list comprehension:

def read_file(filepath):
    with open(filepath) as fh:
        return [Counter(line) for line in fh if not line.startswith('\n')]

Where str.startswith is a bit more robust for variable-length strings, as it avoids the if mystring[:len(to_check)] == to_check mess.
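
For illustration, the two spellings side by side (hypothetical prefix value):

prefix = 'WARN'
line = 'WARNING: low disk space'

if line[:len(prefix)] == prefix:   # easy to get the slice length wrong
    ...

if line.startswith(prefix):        # states the intent directly
    ...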

Checking duplicate columns

It might be easier to leverage itertools.combinations to get pairs of columns in your dataframe, then use the pd.Series.all() function to check the values:

import itertools

# change your naming to fit with common naming standards for
# variables and functions, pep linked below
def get_duplicate_columns(df):
    '''
    You are looking for non-repeated combinations of columns, so use
    itertools.combinations to accomplish this, it's more efficient and
    easier to see what you are trying to do, rather than tracking an index
    '''
    duplicate_column_names = set()
    # Iterate over all pairs of columns in df
    for a, b in itertools.combinations(df.columns, 2):
        # will check if every entry in the series is True
        if (df[a] == df[b]).all():
            duplicate_column_names.add(b)
    return list(duplicate_column_names)

This way you avoid the nested loops, and you are just checking across the boolean mask.
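
One caveat worth noting: element-wise == treats NaN as unequal to NaN, while Series.equals treats NaNs in matching positions as equal, so the two checks can disagree on columns that contain missing values:

import numpy as np
import pandas as pd

a = pd.Series([1.0, np.nan])
b = pd.Series([1.0, np.nan])

print((a == b).all())   # False, because NaN != NaN element-wise
print(a.equals(b))      # True, aligned NaNs count as equal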

Check pandas datatype for column

It is both unclear and bad practice to do:

if str(df[fieldName].dtypes).replace('64', '',1) == 'float':

To explicitly check a pandas datatype, use the dtype on a pd.Series by accessing the column directly:

import numpy as np
if df[field_name].dtype == np.float64:
    # do things

You'll need numpy because the dtypes for pandas inherit from numpy dtypes. An even clearer way to check (in case you get np.float32 instead of np.float64, for example) is to leverage the pandas.api.types:

import pandas as pd
import pandas.api.types as pdtypes
if pdtypes.is_float_dtype(df[field_name]):
    # do things

This might be the preferred way to go, since you are assuming that you will always get float64, where you might not. This also works for other types:

if pdtypes.is_integer_dtype(df[field_name]):
    pass
elif pdtypes.is_object_dtype(df[field_name]):
    pass
elif pdtypes.is_datetimetz(df[field_name]):
    pass
# etc

Checking last element in an indexed data structure

Instead of using object[len(object) - 1], just use object[-1]. Negative indexing also works for counting backwards:

x = list('abcdefg')
# get the last element
x[-1]
'g'
# get the third-to-last element
x[-3]
'e'

Finding a substring using str.find

In your datetime processing logic there's a ton of the following:

year_st = field_value.find(str(date_time.year))
if year_st == -1:
    # do something

Keep in mind what str.find returns: the index of the first match, or -1 only when the substring is missing entirely. A match at the very start of the string comes back as 0, which is easy to misread once you start treating the result as a flag:

mystr = 'abcd'
mystr.find('a')
# 0
mystr.find('z')
# -1

Likewise, later you use:

mon_st = format.find(str(dateTime.month))
if mon_st != -1:
    # do things

Chaining all of these find calls and sentinel checks makes the intent hard to follow. Where the real question is "does this value end with the year?", str.endswith expresses it directly:

if field_value.endswith(str(date_time.year)):
    # do things

tuples vs lists

There are numerous places in your code where you use lists that you do not modify:

if ending not in formats:
    ...

df = df.drop(columns=skipSeperator, errors='ignore')
...

You incur extra overhead by allocating a mutable data structure:

import sys
lst, tup = list(range(100)), tuple(range(100))
sys.getsizeof(lst)
1008
sys.getsizeof(tup)
848

With this in mind, it's better to stick with the smaller data structure.
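
Applied to the script, the extension list is a good candidate, since it is only ever read and used for membership tests (sketch):

# never mutated, so a tuple (or even a frozenset) is enough
textFormats = ("txt", "csv", "log")

if fileNameParts[-1] in textFormats:
    ...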

Pandas Recommendations

data-type checking

This kind of code-snippet is not something you'll want:

if str(df[fieldName].dtypes).replace('64', '',1) == 'float':

You're calling dtypes, then coercing it to a string, then replacing it, then comparing to a fixed string. No need, those are numpy datatypes, so just do:

if df[fieldName].dtype == np.float64:

str functions

You can refactor code snippets like:

fieldStructure.loc[fieldName,'Size'] = df[fieldName].map(str).apply(len).max()

To be

fieldStructure.loc[fieldName, 'Size'] = df[fieldName].str.len().max()

Since the field is already a string type, you shouldn't need to map everything with str, and then the len operation should already be supported. The latter is faster largely because of the fewer operations that need to occur:

timing

python -m timeit -s "import pandas as pd; df = pd.DataFrame([['kladflna', 'l;adfkadf', ';aljdnvohauehfkadflan'] for i in range(1000)], columns=list(range(3)))" 'df[2].map(str).apply(len).max()'
1000 loops, best of 3: 506 usec per loop
python -m timeit -s "import pandas as pd; df = pd.DataFrame([['kladflna', 'l;adfkadf', ';aljdnvohauehfkadflan'] for i in range(1000)], columns=list(range(3)))" 'df[2].str.len().max()'
1000 loops, best of 3: 351 usec per loop

function calls

from dis import dis
import pandas as pd; df = pd.DataFrame([['kladflna', 'l;adfkadf', ';aljdnvohauehfkadflan'] for i in range(1000)], columns=list(range(3)))
def f(df):
    df[2].map(str).apply(len).max()

def g(df):
    df[2].str.len().max()
dis(f)
  2           0 LOAD_FAST                0 (df)
              2 LOAD_CONST               1 (2)
              4 BINARY_SUBSCR
              6 LOAD_ATTR                0 (map)
              8 LOAD_GLOBAL              1 (str)
             10 CALL_FUNCTION            1
             12 LOAD_ATTR                2 (apply)
             14 LOAD_GLOBAL              3 (len)
             16 CALL_FUNCTION            1
             18 LOAD_ATTR                4 (max)
             20 CALL_FUNCTION            0
             22 POP_TOP
             24 LOAD_CONST               0 (None)
             26 RETURN_VALUE
dis(g)
  2           0 LOAD_FAST                0 (df)
              2 LOAD_CONST               1 (2)
              4 BINARY_SUBSCR
              6 LOAD_ATTR                0 (str)
              8 LOAD_ATTR                1 (len)
             10 CALL_FUNCTION            0
             12 LOAD_ATTR                2 (max)
             14 CALL_FUNCTION            0
             16 POP_TOP
             18 LOAD_CONST               0 (None)
             20 RETURN_VALUE

Opening a file multiple times

In your identifyFixedWidth function, you open and read a file multiple times. I would say this makes it a candidate for a refactor, since these tasks should probably be broken into smaller functions:

def identify_fixed_width(name):
    filepath = os.path.join(path, name)
    # open the file here, and re-use the file-handle with fh.seek(0)
    with open(filepath) as fh:
        number_lines, avg_chars, max_length = get_avg_chars(fh)
        # re-use this file handle, go back to the beginning
        fh.seek(0)
        counter = find_header(fh, avg_chars)
        fh.seek(0)
        col_pos = get_row_counter(fh, counter)

    common = list(set.intersection(*map(set, col_pos)))
    logger.debug(f"Potential field separator positions: {common}")

    # replace this with a list comprehension, it's faster and more compact
    new_common = [x for x in common if (x-1) not in common]
    new_common.append(max_length)
    logger.debug(f"Field separator positions identified as {new_common}")

    # do not shadow builtin names like range. If you must, use a leading underscore
    _range = len(new_common)
    width = []
    # underscore will show an unused or throwaway variable
    for i, _ in enumerate(new_common[0:_range-1]):
        width.append(new_common[i+1] - new_common[i])
    logger.debug(f'Column Lengths: {width}')
    return counter, width


def get_avg_chars(fh):
    """
    Use enumerate to track the index here and
    just count how much you want to decrement from the index
    at the end
    """
    decrement, max_len, total_chars = 0, 0, 0
    for idx, line in enumerate(fh, start=1):
        total_chars += len(line)
        if len(line) <= 2:
            decrement += 1
        # this can be evaluated with a ternary expression
        max_len = len(line) if len(line) > max_len else max_len
    # at the end of the for loop, idx is the length of the file
    num_lines = idx - decrement
    avg_chars = total_chars / num_lines
    return num_lines, avg_chars, max_len


def find_header(fh, avg_chars):
    counter = 0
    for line in fh:
        logger.debug(f"{counter} has {len(line)} chars")
        lower = len(line) <= avg_chars * 0.9
        upper = len(line) >= avg_chars * 1.1
        if upper or lower:
            logger.debug(f"Line {counter}: Maybe part of header and needs to be skipped")
            counter += 1
        else:
            logger.debug(f"Header found at {counter}")
            break
    return counter


def get_row_counter(fh, counter):
    """
    Again, use enumerate here for row_counter
    """
    col_pos = []
    for row_counter, line in enumerate(fh):
        if row_counter <= counter:
            continue
        blanks = [m.start() for m in re.finditer(' ', line)]
        if len(blanks) > 2:
            # you've already declared this variable, so use it
            col_pos.append(blanks)
            if row_counter <= 5:
                logger.debug(col_pos[-1])
    return col_pos

Now, it's easier to break apart, debug, and separate out pieces of work within a function.
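
Because each helper now takes a file handle rather than a path, you can also exercise them against an in-memory io.StringIO in a quick test, without needing anything in C:\Logs (a sketch, reusing the sample data from the question):

import io

sample = io.StringIO(
    "Some Title\n"
    " Grp IDS Date Time Weight \n"
    " 1 3559 3-10-19 8:45 1159.4\n"
    " 1 3680 3-10-19 8:40 1861.2\n"
)

num_lines, avg_chars, max_len = get_avg_chars(sample)
sample.seek(0)
header_row = find_header(sample, avg_chars)
print(num_lines, round(avg_chars, 1), header_row)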

Style

A few things

  1. Variable and function names should be lowercase with words separated by underscores (_). Upper case is usually reserved for types, classes, etc. So something like rowCounter should really be row_counter.

  2. When checking for equivalence with singletons such as None, True, and False, it is usually better to use if value: or if not value:

# change this to
if date_flag == True:
# this
if date_flag:
  3. I'm not sure if there's a PEP for it, but visually separating blocks of code with newlines can be very helpful. As an example:

logger.debug('% of rows missing the charecter')
logger.debug(df.isna().sum()/ df.isna().count())
cleanupCandidates=df.dropna(axis='columns', thresh = (df.shape[0] + 1)*.8).fillna(-1)
logger.debug('Dropping characters not present in 80% of the columns')
logger.debug(cleanupCandidates.head())
lowestVariance = 0
spaceDetectedFlag = False
Separator2 = ''
for character in cleanupCandidates.columns:
    logger.debug('**********' + character +' **********')
    x = cleanupCandidates.loc[:,character].var()
    logger.debug('Calculated variance : ' + str(x) )
    if character == ' ':
        spaceDetectedFlag = True
        logger.debug('Potential position based file...')
        continue
    if lowestVariance >= x:
        lowestVariance =x
        Separator2 = character
logger.debug("Separator identified as : " + Separator2 + ' using METHOD 2')
if Separator == Separator2:
    commonSep= Separator
else:
    commonSep = list(set(Candidates).intersection(cleanupCandidates.columns))
logger.debug('Both methods identify '+ str(commonSep) + 'as one of the separator candidates.')
maxMode = 0
modeTable = cleanupCandidates.mode()

This just looks like a gigantic wall of code, and is difficult to quickly scan for keywords, patterns, etc. Try breaking up the code by breaking things into logical steps:

# Break all of this apart
logger.debug('% of rows missing the charecter')
logger.debug(df.isna().sum()/ df.isna().count())

cleanupCandidates=df.dropna(axis='columns', thresh = (df.shape[0] + 1)*.8).fillna(-1)
logger.debug('Dropping characters not present in 80% of the columns')
logger.debug(cleanupCandidates.head())

lowestVariance = 0
spaceDetectedFlag = False
Separator2 = ''

for character in cleanupCandidates.columns:
    logger.debug('**********' + character +' **********')
    x = cleanupCandidates.loc[:,character].var()
    logger.debug('Calculated variance : ' + str(x) )

    # separate these if statements, they do different things
    if character == ' ':
        spaceDetectedFlag = True
        logger.debug('Potential position based file...')
        continue

    if lowestVariance >= x:
        lowestVariance =x
        Separator2 = character

logger.debug("Separator identified as : " + Separator2 + ' using METHOD 2')

# if, elif, else should be connected because they are one logical
# block of code
if Separator == Separator2:
    commonSep= Separator
else:
    commonSep = list(set(Candidates).intersection(cleanupCandidates.columns))
logger.debug('Both methods identify '+ str(commonSep) + 'as one of the separator candidates.')

maxMode = 0
modeTable = cleanupCandidates.mode()
  4. Make sure there's consistent whitespace around variables and operators.
# go from this
lowest_variance =x
common_sep= separator
# to this
lowest_variance = x
common_sep = separator
answered Jul 12, 2019 at 14:48
