Motivation
I have been working on my first larger Python project, but I am struggling to create Pythonic solutions. Some of the functions (and especially the naming) I've written so far feel more like hacks than best practice. I hope someone can clear up some confusion and point me in the right direction.
Brief Overview
The script processes Google Spreadsheets by interacting with the Google Docs Sheets API. A class Sheets handles the API calls. The second class, CustomSheet, handles the application-specific data logic and parses the API call responses. At any one time there are 5 different instances of CustomSheet, shared amongst different scripts that perform various automation on the data.
Because the Google Docs API is limited to 100 requests per 100 seconds, instances are only initialized once. Initially I had the idea of writing a script that coordinates the handling of the instances and sub-scripts, but this added a lot of complexity and little benefit. Instead, whenever a CustomSheet is initialized, the instance is appended to instances on class level so scripts can get them autonomously. It also made sense to implement a classmethod, initializeAll, that initializes all instances automatically, as their initialization requirements are predictable. While it adds a lot of comfort, it seems like a lot of the logic that should be handled on script level has now moved to class level.
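The bookkeeping described above can be sketched as a small cache that creates one object per month and hands it out on request. This is only an illustration under assumptions: SheetCache, name_for and the loader callable are hypothetical names that do not appear in the code below, and the loader stands in for the real CustomSheet constructor so that no API call is made.

```python
from datetime import datetime
from typing import Callable, Dict, List

MONTHS = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
          'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']


def name_for(date: datetime) -> str:
    """Build the spreadsheet name from the internal naming convention."""
    return f'Sheet {MONTHS[date.month - 1]} {date.year}'


class SheetCache:
    """Hypothetical registry: creates each sheet once and hands it out by name."""

    def __init__(self, loader: Callable[[datetime], object]) -> None:
        self._loader = loader                  # stands in for CustomSheet(date)
        self._sheets: Dict[str, object] = {}

    def get(self, date: datetime) -> object:
        name = name_for(date)
        if name not in self._sheets:           # only load on a cache miss
            self._sheets[name] = self._loader(date)
        return self._sheets[name]

    def all(self) -> List[object]:
        return list(self._sheets.values())


calls = []


def fake_loader(date: datetime) -> str:
    calls.append(date)                         # record every simulated API hit
    return f'<sheet {name_for(date)}>'


cache = SheetCache(fake_loader)
first = cache.get(datetime(2020, 5, 16))
second = cache.get(datetime(2020, 5, 1))       # same month, so the same object
```

Because the cache is an ordinary object, the script that owns it decides when sheets are loaded, which keeps that decision on script level rather than class level.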
Questions
- Is initializing all relevant class instances (initializeAll) OK?
- And handling instances using @classmethods (get, getAll)?
- Should getCustomSheet be renamed to getSheet, which then simply calls super()? But what if the need arises to make a raw API call from one of the scripts?
- errorResilience should really be a decorator. However, the callers slice and index the return values, which leaves two choices: passing the indices to the decorator, e.g. @error_resilience([0]['api_call']), which does not seem possible, or catching an IndexError within the decorator, but then the returned value is not available in the decorator context (or is it?).
- There are a lot of functions in CustomSheet performing evaluations on instance variables. E.g. Entries are evaluated using CustomSheet instances (searchEntry, filterEntry, conv). An alternative solution could be adding instances for Entry as well and moving the functions there, so the logic of evaluating entries is in the Entry class. However, this seems impractical during normal runtime, since Entry would have to be imported in all scripts instead of just importing CustomSheet. The logic seems scattered amongst multiple classes, but it seems to make sense.
- Any other general remarks on the code? I feel like I use a lot of for ... in ...: loops.
Any feedback is greatly appreciated.
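On the decorator question, one possible arrangement is to decorate the function that returns the whole response and to do the slicing in the caller, after the decorator has finished. The following is a minimal sketch under assumptions: error_resilience, find_files and get_spreadsheet_id are hypothetical names, the canned responses replace real HTTP calls, and retrying only on 401 and 429 mirrors what errorResilience does in the code below.

```python
import functools
from typing import Any, Callable, Dict


def error_resilience(retries: int = 3) -> Callable:
    """Retry the wrapped call while the response dict holds a retryable error."""
    def decorator(func: Callable[..., Dict[str, Any]]) -> Callable[..., Dict[str, Any]]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Dict[str, Any]:
            error: Dict[str, Any] = {'code': 0, 'message': 'no attempts made'}
            for _ in range(retries):
                data = func(*args, **kwargs)   # the return value is visible here
                if 'error' not in data:
                    return data
                error = data['error']
                if error['code'] not in (401, 429):
                    break                      # not recoverable by retrying
                # real code would refresh the token or sleep at this point
            raise RuntimeError(f"[{error['code']}] {error['message']}")
        return wrapper
    return decorator


# Canned responses: one rate-limit error, then a successful lookup.
responses = [
    {'error': {'code': 429, 'message': 'rate limit'}},
    {'files': [{'id': 'abc123'}]},
]


@error_resilience(retries=3)
def find_files(name: str) -> Dict[str, Any]:
    """Stand-in for the HTTP request; returns the canned responses in order."""
    return responses.pop(0)


def get_spreadsheet_id(name: str) -> str:
    # Indexing happens in the caller, on the already validated response.
    return find_files(name)['files'][0]['id']


spreadsheet_id = get_spreadsheet_id('Sheet May 2020')
```

The decorator never needs to know which slice a caller wants, because it only inspects the response for an error key before handing it back.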
Code
modules/Sheets.py
import requests
import json
from time import sleep
from random import randint
from modules.PositionRange import PositionRange
import logging
logger = logging.getLogger(__name__)
from settings import CLIENT_ID, CLIENT_SECRET, REFRESH_TOKEN, PROXY


class Sheets():
    """ Google Docs API Library """
    PROXIES = {'http': PROXY, 'https': PROXY}
    header = {
        'Content-Type': 'application/json; charset=utf-8',
    }
    spreadsheetId = ''
    accessToken = ''

    def __init__(self, spreadsheetName):
        self.getToken()
        self.setSpreadsheet(name=spreadsheetName)

    def getToken(self):
        """ Gets authentication token from Google Docs API
        if no Global API token is set on Class Level yet. """
        if not Sheets.accessToken:
            self.refreshToken()
        else:
            self.header.update({'Authorization': f'Bearer {Sheets.accessToken}'})

    def refreshToken(self):
        refreshGUrl = 'https://www.googleapis.com/oauth2/v4/token'
        header = {
            'Content-Type': 'application/x-www-form-urlencoded'
        }
        body = {
            'client_id': CLIENT_ID,
            'client_secret': CLIENT_SECRET,
            'refresh_token': REFRESH_TOKEN,
            'grant_type': 'refresh_token'
        }
        r = requests.post(refreshGUrl, headers=header, data=body, proxies=Sheets.PROXIES)
        token = self.errorResilience(r.json(), self.refreshToken, {})['access_token']
        Sheets.accessToken = token
        self.header.update({'Authorization': f'Bearer {Sheets.accessToken}'})
        return token

    def setSpreadsheet(self, name=None, spreadsheetId=None):
        if(name):
            spreadsheetId = self.getSpreadsheet(name)
        if(spreadsheetId and self.spreadsheetId != spreadsheetId):
            logger.debug(f'Setting spreadsheetId to [{spreadsheetId}]')
            self.spreadsheetId = spreadsheetId
            spreadsheetInfo = self.getSpreadsheetInfo()
            self.spreadsheetName = spreadsheetInfo['properties']['title']
            self.sheets = spreadsheetInfo['sheets']
            logger.info(f'Selected Spreadsheet: {self.spreadsheetName} [{self.spreadsheetId}]')
        else:
            logger.debug(f'SpreadsheetId already selected [{spreadsheetId}] or None')

    def getSpreadsheet(self, name):
        try:
            logger.info(f'Trying to resolve spreadsheetId for {name}...')
            query = f'name = "{name}"'
            driveGUrl = 'https://www.googleapis.com/drive/v3/files'
            params = {'q': query}
            r = requests.get(driveGUrl, params=params, headers=self.header, proxies=Sheets.PROXIES)
            logger.debug(f'RESPONSE: {r.json()}')
            return self.errorResilience(r.json(), self.getSpreadsheet, {'name': name})['files'][0]['id']
        except IndexError as e:
            logger.error(f'Error during spreadsheetId lookup. File {name} was probably deleted.')
            logger.exception(f'[ERROR] getSpreadsheet: {name}')
            raise EOFError('File not found.') from None

    def getSpreadsheetInfo(self):
        logger.info(f'Getting all spreadsheet information [{self.spreadsheetId}]')
        sheetGUrl = f'https://sheets.googleapis.com/v4/spreadsheets/{self.spreadsheetId}'
        r = requests.get(sheetGUrl, headers=self.header, proxies=Sheets.PROXIES)
        sheetData = r.json()
        return self.errorResilience(sheetData, self.getSpreadsheetInfo, {})

    def getSheet(self, sheetName: str, posRange: PositionRange) -> dict:
        """ Gets the content of one specific sheet """
        sheetGUrl = f'https://sheets.googleapis.com/v4/spreadsheets/{self.spreadsheetId}'
        logger.info(f'Getting sheet content: {sheetName}{posRange} [{self.spreadsheetName} | {self.spreadsheetId}]')
        sheetGUrl = f'{sheetGUrl}/values/{requests.utils.quote(sheetName)}{posRange}'
        r = requests.get(sheetGUrl, headers=self.header, proxies=Sheets.PROXIES)
        sheetData = r.json()
        return self.errorResilience(sheetData, self.getSheet, {'sheetName': sheetName, 'posRange': posRange})

    def errorResilience(self, sheetData, callingFunc, kwargs):
        """ Centralized Error Handling for API Calls. Would ideally
        be a decorator, however working with different slices and indices
        (e.g. refreshToken) in return values doesn't make this possible(?) """
        args = []
        if('error' in sheetData.keys()):
            code = sheetData['error']['code']
            if(code == 401):
                logger.error('UNAUTHORIZED. API TOKEN LIKELY EXPIRED...')
                self.refreshToken()
                sleep(5)
                return callingFunc(*args, **kwargs)
            elif(code == 403):
                logger.error('The request is missing a valid API key.')
                self.getToken()
            elif(code == 404):
                logger.error('FILE NOT FOUND. SPREADSHEETID INVALID')
                # self.name only exists on subclasses, so report the id alone
                raise IndexError(f'Spreadsheet does not exist [{self.spreadsheetId}]')
            elif(code == 429):
                tsleep = 100 + randint(10, 50)
                logger.error(f'API LIMIT EXCEEDED. AUTO-RECOVERING BY WAITING {tsleep}s...')
                sleep(tsleep)
                return callingFunc(*args, **kwargs)
            elif(code == 400):
                logger.error('SPECIFIED SHEET DOES NOT EXIST OR ILLEGAL RANGE.')
                raise IndexError(sheetData['error']['message'])
            else:
                logger.error('AN UNKNOWN ERROR OCCURRED.')
        return sheetData
modules/CustomSheet.py
from datetime import datetime
from copy import copy
from dateutil.relativedelta import relativedelta
from time import sleep
from modules.Sheets import Sheets
from modules.Entry import Entry
from modules.Synonyms import Synonyms
from modules.PositionRange import PositionRange
from collections import Counter
import logging
logger = logging.getLogger(__name__)


class CustomSheet(Sheets):
    """ Custom class that holds """
    MONTHS = ['Error', 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul',
              'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
    TYP = 'CustomSheet'
    POS = PositionRange.from_str('A4:R')
    instances = []

    def __init__(self, date=datetime.now()):
        super(CustomSheet, self).__init__(spreadsheetName=CustomSheet.getCustomSheet(date))
        self.datum = date
        self.name = self.spreadsheetName
        self.sheetData = []
        self.updateSynonyms()
        self.entries = []
        self.name = self.spreadsheetName
        CustomSheet.append(self)

    def __new__(cls, date=datetime.now()):
        name = CustomSheet.getCustomSheet(date)
        x = CustomSheet.get(name)
        if x:
            logger.debug(f'{name} already exists. Returning instance...')
            return x
        else:
            logger.debug(f'{name} does not exist already. Creating new instance')
            return super(CustomSheet, cls).__new__(cls)

    def __getnewargs__(self):
        # pickle expects a tuple of arguments here
        return (self.datum,)

    def __str__(self):
        return f'{self.spreadsheetName}'

    def __eq__(self, value):
        return self.name == value

    def __lt__(self, other):
        return self.datum < other.datum

    def __gt__(self, other):
        # was a second __lt__, which silently replaced the one above
        return self.datum > other.datum

    def getCustomSheetSheet(self, sheetName):
        sheetData = {}
        posRange = self.POS
        # getSheet requires the range as well
        sheetData[sheetName] = self.getSheet(sheetName=sheetName, posRange=posRange)
        return self.parseCustomSheet(sheetData=sheetData, posRange=posRange)

    def getCustomSheets(self):
        sheetData = {}
        sheets = self.sheets
        posRange = self.POS
        for sheet in sheets:
            sheetName = sheet['properties']['title']
            if(sheetName.isdigit()):
                sheetData[sheetName] = self.getSheet(posRange=posRange, sheetName=sheetName)
        return self.parseCustomSheet(sheetData=sheetData, posRange=posRange)

    def parseCustomSheet(self, sheetData, posRange):
        """ Creates Entries from Spreadsheet Data; basically a dict
        so we don't have to work with lists we get from Google Docs """
        logger.debug(f'Parsing (raw data -> Entry) {sheetData}')
        length = posRange.column_length()
        logger.debug(f'LENGTH: {length}')
        appended = []
        for sheetName, rows in sheetData.items():
            pos = copy(posRange)
            pos.decrement_row()
            for row in rows['values']:
                while(len(row) < length+1):
                    row.append('')
                pos.increment_row()
                entry = Entry.from_customsheet(self, sheetName, row, pos)
                if not entry.isValid():
                    logger.debug('NO VALID ENTRY FOR DICT ABOVE')
                    continue
                logger.debug('IS VALID ENTRY')
                self.sheetData.append(entry)
                appended.append(entry)
        return appended

    def filter(self, field: str, value: str) -> list:
        """ Filters Entries for <field> having a certain <value> """
        found = []
        if not isinstance(value, CustomSheet):
            value = value.strip().upper()
        # was `instance.sheetData`; `instance` is undefined in this method
        for entry in self.sheetData:
            if(entry.__dict__[field.lower()] == value
                    and 'SYNC' not in entry.sheet):
                found.append(entry)
        return found

    def hasEntry(self, entry: Entry) -> bool:
        return entry in self.sheetData

    @staticmethod
    def getCustomSheet(date):
        """ Function to build spreadsheet names by
        internal naming convention. """
        name = f'Sheet {CustomSheet.MONTHS[date.month]} {str(date.year)}'
        return name

    @staticmethod
    def getTime(relativeMonth=0, absoluteMonth=0):
        """ Helpfunction that helps iterating over months
        while automatically decrementing years. """
        relativeMonth = int(relativeMonth)
        absoluteMonth = int(absoluteMonth)
        thisMonth = datetime.today().replace(day=1, hour=4, minute=20, second=0, microsecond=0)
        date = thisMonth - relativedelta(months=relativeMonth)
        if(absoluteMonth != 0):
            date = datetime.today()
            while(date.month != absoluteMonth):
                date = date - relativedelta(months=1)
        return date

    @classmethod
    def get(cls, value: str):
        """ Gets a certain CustomSheet instance by its name """
        if(isinstance(value, datetime)):
            value = CustomSheet.getCustomSheet(value)
        for instance in cls.instances:
            if instance.name == value:
                return instance

    @classmethod
    def getAll(cls):
        return cls.instances

    @classmethod
    def append(cls, instance) -> None:
        if isinstance(instance, list):
            instances = instance
            for instance in instances:
                CustomSheet.append(instance)
            return
        assert isinstance(instance, CustomSheet)
        if(instance not in cls.instances):
            cls.instances.append(instance)

    @staticmethod
    def initializeAll():
        """ Helpfunction that initializes all sheets
        of the last four months. """
        initialized = []
        for i in range(0, 4):
            try:
                initialize = CustomSheet(CustomSheet.getTime(i))
                logger.info(f'Building CustomSheet Cache {initialize.name} [iteration {i+1}/4]')
                initialize.getCustomSheets()
                logger.debug(f'Sheet data [iteration {i+1}]: {initialize.sheetData}')
                initialized.append(initialize)
                logger.info(f'###- PASSED CUSTOMSHEET CACHE [iteration {i+1}/4]')
                sleep(12)
            except EOFError as e:
                # Fallback in case a file was deleted on Google Docs
                logger.exception(f'Skipping month trying to autorecover [iteration {i+1}/4]')
                continue
        return initialized

    @classmethod
    def searchEntry(cls, sentry):
        """ Searches a specific Entry in all available instances """
        found = []
        for instance in cls.instances:
            for entry in instance.sheetData:
                if(entry == sentry):
                    found.append(entry)
        return found

    @classmethod
    def filterEntry(cls, field, value):
        found = []
        for instance in cls.instances:
            found.extend(instance.filter(field=field, value=value))
        return found

    @staticmethod
    def conv(*entry_list):
        """ Used to combine multiple search criteria using .filter()
        Only keeps entries that are available in all lists of <entry_list> """
        seen = set()
        repeated = set()
        for entries in entry_list:
            for entry in set(entries):
                if entry in seen:
                    repeated.add(entry)
                else:
                    seen.add(entry)
        return list(repeated)

    def updateSynonyms(self) -> None:
        self.synonyms = []
        self.synonyms.extend(Synonyms.update(self))
        logger.debug(f'New Synonyms: {self.synonyms}')

    @classmethod
    def searchSynonyms(cls, xSynonyms: list, typ: str='', name: str='') -> list:
        found = []
        # was testing the undefined name `synonym` instead of the argument
        if isinstance(xSynonyms, str):
            xSynonyms = [xSynonyms]
        for instance in cls.instances:
            for xSynonym in xSynonyms:
                for synonym in instance.synonyms:
                    if(synonym.matches(synonym=xSynonym, typ=typ, name=name)):
                        found.append(synonym)
        logger.debug(f'SYNONYM {xSynonyms} FOUND; {found}')
        filtered = Synonyms.filter(found)
        logger.info(f'Synonym {xSynonyms} found {filtered}')
        return filtered
modules/Entry.py
from datetime import datetime
import logging
logger = logging.getLogger(__name__)


class Entry():
    HEADERS = ['Abr', 'Kunde', 'Tätigkeit', 'Techniker', 'AZ Anfang', 'AZ Ende', 'Dauer',
               'AZ Abzug', 'Anfahrt', 'AZ Typ', 'Bemerkung', 'Freigegeben', '', '', '', '',
               'Wartung Anfang', 'Wartung Ende']

    def __init__(self, *args, **kwargs):
        """
        **kwargs {
            'Datum': datetime.datetime(2020, 2, 6, 0, 0),
            'pos': < modules.PositionRange.PositionRange object at 0x1101f41d0 > ,
            'Abr': '',
            'Kunde': 'Test',
            'Tätigkeit': 'Something',
            'Techniker': 'T2',
            'AZ Anfang': '14:00',
            'AZ Ende': '15:30',
            'Dauer': '1,50',
            'AZ Abzug': '0',
            'Anfahrt': '',
            'AZ Typ': '4',
            'Bemerkung': 'b.A.',
            'Freigegeben': 'nein',
            'Wartung Anfang': '',
            'Wartung Ende': '',
            ...
        }
        """
        self.abr = kwargs.get('Abr', '').strip().upper()
        self.kunde = kwargs.get('Kunde', '').strip().upper()
        self.tätigkeit = kwargs.get('Tätigkeit', '').strip().upper()
        self.techniker = kwargs.get('Techniker', '').strip().upper()
        self.anfang = kwargs.get('AZ Anfang', '')[0:5].replace('24:', '00:')
        self.ende = kwargs.get('AZ Ende', '')[0:5].replace('24:', '00:')
        self.dauer = kwargs.get('Dauer', '').strip().upper()
        self.abzug = kwargs.get('AZ Abzug', '').strip().upper()
        self.anfahrt = kwargs.get('Anfahrt', '').strip().upper()
        self.typ = kwargs.get('AZ Typ', '').strip().upper()
        self.bemerkung = kwargs.get('Bemerkung', '')
        self.freigegeben = kwargs.get('Freigegeben', '')
        self.wartunganfang = kwargs.get('Wartung Anfang', '')
        self.wartungende = kwargs.get('Wartung Ende', '')
        self.datum = kwargs.get('Datum')
        self.sheet = kwargs.get('sheet').strip().upper()
        self.pos = kwargs.get('pos')
        self.ref = kwargs.get('ref')
        self.sync = datetime.now()
        try:
            hanfang, manfang = self.anfang.split(':')
            hende, mende = self.ende.split(':')
            self.dtanfang = self.datum.replace(hour=int(hanfang), minute=int(manfang))
            self.dtende = self.datum.replace(hour=int(hende), minute=int(mende))
        except Exception as e:
            self.dtanfang = self.datum
            self.dtende = self.datum
            #logger.debug(f'DT: {self}: {e}')

    def __str__(self):
        return f'{self.kunde} @ {self.techniker} {self.dauer} {self.datum.strftime("%d/%b")} ({self.sheet}{self.pos}) [{self.ref.name}]'

    def __repr__(self):
        return str(self.__dict__)

    def __hash__(self):
        return hash(f'{self.datum}{self.sheet}{self.kunde}{self.tätigkeit}{self.techniker}{self.typ}')

    def __eq__(self, other):
        try:
            if(self.datum == other.datum
                    and self.kunde == other.kunde
                    and self.techniker == other.techniker
                    and self.tätigkeit == other.tätigkeit):
                return True
            else:
                return False
        except Exception as e:
            logger.exception('You may only compare this to another Entry object.')

    def __lt__(self, other):
        if(self.sheet == other.sheet):
            return self.dtanfang < other.dtanfang
        else:
            return self.sheet < other.sheet

    def __le__(self, other):
        if(self.sheet == other.sheet):
            return self.dtanfang <= other.dtanfang
        else:
            return self.sheet <= other.sheet

    def __ne__(self, other):
        return not(self == other)

    def __gt__(self, other):
        if(self.sheet == other.sheet):
            return self.dtanfang > other.dtanfang
        else:
            return self.sheet > other.sheet

    def __ge__(self, other):
        if(self.sheet == other.sheet):
            return self.dtanfang >= other.dtanfang
        else:
            return self.sheet >= other.sheet

    @classmethod
    def from_customsheet(cls, ref, sheetName, sheetRow, posRange):
        """ Creates an Entry from a sheetData dict """
        logger.debug(f'Creating entry from {sheetRow}')
        logger.debug(f'POSRANGE: {posRange}')
        date = datetime.strptime(f'{ref.datum.year} '
                                 f'{ref.datum.month} '
                                 f'{sheetName}', '%Y %m %d')
        parseDict = {
            'Datum': date,
            'sheet': sheetName.upper(),
            'pos': posRange,
            'ref': ref
        }
        for i in range(0, len(Entry.HEADERS)):
            if(Entry.HEADERS[i] != ''):
                logger.debug(f'{Entry.HEADERS[i]}: {sheetRow[i]}')
                parseDict.update({Entry.HEADERS[i]: sheetRow[i].strip()})
        logger.debug(parseDict)
        return cls(**parseDict)

    def isValid(self):
        if(Entry.stripString(self.techniker) != ''
                or Entry.stripString(self.kunde) != ''
                or Entry.stripString(self.tätigkeit) != ''):
            return True
        else:
            return False

    def isComplete(self):
        # the last two comparisons had the closing parenthesis in the wrong
        # place, which passed a bool to stripString
        if(Entry.stripString(self.techniker) != ''
                and Entry.stripString(self.kunde) != ''
                and Entry.stripString(self.tätigkeit) != ''
                and Entry.stripString(self.anfang) != ''
                and Entry.stripString(self.ende) != ''):
            return True
        else:
            return False

    @staticmethod
    def stripString(string):
        string = string.strip()
        string = string.replace('\\r\\n','')
        string = string.replace('\r\n','')
        string = string.replace(' ', '')
        return string
modules/PositionRange.py
import logging
logger = logging.getLogger(__name__)


class PositionRange():
    def __init__(self, p1=None, p2=None):
        # str(None) would yield the truthy string 'NONE', so default to ''
        self.p1 = str(p1 or '').upper().replace('!','')
        self.p2 = str(p2 or '').upper() or self.p1

    def __str__(self):
        if(self.p1 and self.p2):
            return f'!{self.p1}:{self.p2}'
        elif(self.p1):
            return f'!{self.p1}'
        else:
            return ''

    def __repr__(self):
        return f'{self.p1}:{self.p2}'

    @classmethod
    def from_str(cls, posRange):
        """ Class from stringified version e.g. A1:F10 """
        try:
            p1, p2 = posRange.split(':')
        except ValueError:
            # no colon: a single cell, so both ends are the same
            p1 = p2 = posRange
        return cls(p1, p2)

    def p1_column(self):
        """
        Returns the column letter(s) of p1
        for the current range
        """
        chars = 0
        for char in self.p1:
            if(char.isalpha()):
                chars += 1
        return self.p1[0:chars]

    def p2_column(self):
        """
        Returns the column letter(s) of p2
        for the current range
        """
        chars = 0
        for char in self.p2:
            if(char.isalpha()):
                chars += 1
        return self.p2[0:chars]

    def p1_column_number(self):
        """
        Gets the numeric value for p1, i.e.
        the index of its column letter
        """
        chars = 0
        for char in self.p1:
            if(char.isalpha()):
                chars += 1
        x = (chars - 1) * 25
        x = x + (ord(self.p1[chars-1].lower()) - 97)
        return x

    def p2_column_number(self):
        """
        Gets the numeric value for p2, i.e.
        the index of its column letter
        """
        chars = 0
        for char in self.p2:
            if(char.isalpha()):
                chars += 1
        x = (chars - 1) * 25
        x = x + (ord(self.p2[chars-1].lower()) - 97)
        return x

    def p1_row(self):
        if(len(self.p1) <= 1):
            return 1
        else:
            x = ''
            for c in self.p1:
                if(c.isdigit()):
                    x = x + c
            return int(x)

    def p2_row(self):
        if(len(self.p2) <= 1):
            return 999999999
        else:
            x = ''
            for c in self.p2:
                if(c.isdigit()):
                    x = x + c
            return int(x)

    def column_length(self):
        """
        Computes how wide the range is by taking
        the distance between both columns
        (e.g. for A4:M this should be 12)
        """
        length = self.p2_column_number() - self.p1_column_number()
        return length

    def column_index(self, column):
        indexLength = self.column_length()
        indexStart = self.p1_column_number()
        indexFind = (ord(column.lower()) - 97)
        index = indexLength - (indexLength - (indexFind - indexStart))
        return index

    def column_headers(self, row=1):
        # the original looped over an undefined name `pos`; the loop body
        # never used the loop variable, so the columns are taken directly
        p1 = f'{self.p1_column()}{row}'
        p2 = f'{self.p2_column()}{row}'
        return PositionRange(p1, p2)

    def increment_row(self):
        row = str(self.p1_row() + 1)
        self.p1 = self.p1_column() + row
        self.p2 = self.p2_column() + row

    def decrement_row(self):
        row = str(self.p1_row() - 1)
        self.p1 = self.p1_column() + row
        self.p2 = self.p2_column() + row
modules/Synonyms.py
from collections import Counter
import logging
logger = logging.getLogger(__name__)


class Synonyms():
    def __init__(self, *args, **kwargs):
        self.synonym = kwargs.get('synonym', '').strip().upper()
        self.sheet = kwargs.get('sheet', '').strip().upper()
        self.typ = kwargs.get('typ', '').strip().upper()
        self.ref = kwargs.get('ref')

    def __str__(self):
        return f'{self.synonym} ({self.sheet}) [{self.ref.name}]'

    def __repr__(self):
        return str(self.__dict__)

    def __eq__(self, other: str):
        if(self.synonym == other.strip().upper()):
            return True
        else:
            return False

    def matches(self, synonym: str, typ: str, name: str) -> bool:
        # `self` was missing from the signature
        if(self == synonym.upper().strip()):
            if(typ and self.typ != typ.upper().strip()):
                return False
            if(name and self.ref.name.upper().strip() != name.upper().strip()):
                return False
            return True
        else:
            return False

    @staticmethod
    def update(instance) -> list:
        logger.info(f'Updating Synonyms for {instance.name}...')
        typ = instance.TYP
        if(typ == 'CustomSheet'):
            return Synonyms.updateCustomSheet(instance)
        elif(typ == 'Projektliste'):
            return Synonyms.updateOtherCustomSheet(instance)
        else:
            logger.error(f'Cannot update synonyms. {typ} is unknown instance.')

    @staticmethod
    def updateCustomSheet(instance) -> list:
        # the dicts are unpacked with ** because __init__ reads keyword arguments
        synonyms = []
        typ = instance.TYP
        synonym = {'synonym': instance.name, 'sheet': '', 'ref': instance, 'typ': typ}
        synonyms.append(Synonyms(**synonym))
        synonym = {'synonym': instance.name.replace('Sheet ', ''), 'sheet': '', 'ref': instance, 'typ': typ}
        synonyms.append(Synonyms(**synonym))
        if(instance.datum.month == instance.getTime().month):
            synonym = {'synonym': 'CURRENT', 'sheet': '', 'ref': instance, 'typ': typ}
            synonyms.append(Synonyms(**synonym))
            synonym = {'synonym': 'SYNC', 'sheet': 'SYNC', 'ref': instance, 'typ': typ}
            synonyms.append(Synonyms(**synonym))
            synonym = {'synonym': 'PJ-SYNC', 'sheet': 'PJ-SYNC', 'ref': instance, 'typ': typ}
            synonyms.append(Synonyms(**synonym))
        elif(instance.datum.month == instance.getTime(1).month):
            synonym = {'synonym': 'PREVIOUS', 'sheet': '', 'ref': instance, 'typ': typ}
            synonyms.append(Synonyms(**synonym))
        for sheet in instance.sheets:
            sheetName = sheet['properties']['title']
            if(sheetName.isdigit()):
                x = instance.datum.replace(day=int(sheetName))
                synonym = {'synonym': x.strftime('%d%m%Y'), 'sheet': '', 'name': sheetName, 'typ': typ}
                synonyms.append(Synonyms(**synonym))
        return synonyms

    @staticmethod
    def filter(synonyms):
        """ Filters the synonyms list from .searchSynonyms()
        for the Greatest Common Denominator """
        greatestCommon = Counter(synonym.ref for synonym in synonyms if synonym.ref)
        maxOccurences = 0
        for name, occurences in greatestCommon.most_common():
            if(occurences == maxOccurences):
                raise EOFError(f'Search synonym no max determinable for {greatestCommon}')
            elif(occurences > maxOccurences):
                maxOccurences = occurences
        try:
            spreadsheet = greatestCommon.most_common(1)[0][0]
            subSearch = [synonym for synonym in synonyms if synonym.ref == spreadsheet]
            greatestCommon = Counter(xsearch.sheet for xsearch in subSearch if xsearch.sheet)
            sheetName = greatestCommon.most_common(1)[0][0]
            # just making sure
            result = [x for x in subSearch if x.sheet == sheetName]
            logger.debug(f'FILTERED SYNONYM: {result}')
            return result[0]
        except IndexError as e:
            raise EOFError(f'No synonyms specified in {synonyms}') from None
settings.py
"""
Dummy account for Stackoverflow with two sheets using
|- https://stackoverflow.com/questions/19766912/how-do-i-authorise-an-app-web-or-installed-without-user-intervention
"""
CLIENT_ID = '255572645365-h0b1joml2eml85045u1htq062scebu4m.apps.googleusercontent.com'
CLIENT_SECRET = 'Mtx71-OaHyfHyZs6zxSFbJHR'
REFRESH_TOKEN = '1//04dwAK3oaiVrmCgYIARAAGAQSNwF-L9IrmzgKSCRRNMTGiPm9Ih-mCtsv5iIlJpPemHeHpoW7CzM85VxlxbobeoaP3j1uXxt5UvY'
PROXY = ''
example.py
import pickle
import os
from time import sleep
from modules.CustomSheet import CustomSheet
from modules.Synonyms import Synonyms
from modules.Entry import Entry
import logging
import logging.handlers
logger = logging.getLogger(__name__)
CACHE_PICKLE = 'GoogleCache.dat'
CACHE_DIR = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
logging.basicConfig(
    format='[%(asctime)s] %(levelname)s [%(name)s.%(funcName)s():%(lineno)d] – %(message)s',
    datefmt='%Y/%m/%d %H:%M:%S',
    level=logging.DEBUG,
    handlers=[
        logging.StreamHandler(),
    ]
)


def saveCache(sheetDataX):
    logger.info('Saving cache...')
    path = os.path.join(CACHE_DIR, CACHE_PICKLE)
    logger.debug(f'Path: {path}')
    with open(path, "wb") as f:
        pickle.dump(sheetDataX, f, pickle.HIGHEST_PROTOCOL)
    sleep(.5)


def loadCache():
    logger.info('Loading cache...')
    try:
        path = os.path.join(CACHE_DIR, CACHE_PICKLE)
        with open(path, "rb") as f:
            cache = pickle.load(f)
        CustomSheet.append(cache)
        logger.debug(f'Cache: {cache}')
        return cache
    except FileNotFoundError as e:
        logger.exception('Offline cache store not found. Was probably deleted; recreating completely...')
        return buildCache()


def buildCache():
    x = CustomSheet.initializeAll()
    saveCache(x)
    # loadCache() returns this value, so hand the sheets back
    return x


if __name__ == '__main__':
    buildCache()
- I created a Google dummy account that can be used to fiddle around with the API straight out of the box! Let me know if you run in any issues & happy about feedback! (schlumpfpirat, May 16, 2020 at 19:34)
1 Answer
I have rewritten the code you've provided in Sheets.py. Whilst it's pretty much an entire rewrite, I believe the problems with the code aren't that drastic.
- Be more scared of side effects and partially initialized classes.
  I feel 'side effect' is a loaded term. If you look it up then you're bound to find FP zealots saying it's the spawn of Satan, whilst OOP lovers will say it's FP scaremongering.
  Either way, your over-reliance on side effects in your code is making my life harder, as determining the state Sheets is in is much harder. Personally I would remove all, bar one, side effects from Sheets.
- Don't be scared of making small classes.
  I feel the largest problem with the code is the lack of a GoogleSession class that interacts with requests.Session. We can see this problem manifest in errorResilience:
  "Centralized Error Handling for API Calls. Would ideally be a decorator, however working with different slices and indices (e.g. refreshToken) in return values doesn't make this possible(?)"
  This is not the best design. Instead, if you wrap an immutable requests.Session object in your own GoogleSession then you can build a get method that does this on each request. The benefit of doing it at this level is that you have the raw request, and so you can just try over and over until it works. Additionally, to the caller it looks like you're just calling requests.Session, so the calling code gains the additional functionality almost seamlessly.
- The functionality Sheets provides would be better as a library.
  By only passing GoogleSession to Sheets and removing all side effects, you should notice my plan for Sheets is vastly different to what it is right now. By following both of these, all methods will need to be passed the sheet's information as arguments.
  This makes the code easier to follow, as now there are no strange, and needless, side effects when interacting with Sheets. The code is also now ridiculously short.
- You should follow composition over inheritance.
  Whilst I think it's dumb to have COI as a principle, I do however agree that for many programmers it's much easier to get composition right. Good use of inheritance is notoriously hard to teach, as many bad guides use shapes as an example.
  I should note that the rest of my answer has suggested using composition; Sheets uses GoogleSession, where GoogleSession uses requests.Session. I'm also suggesting CustomSheet use Sheets rather than inherit from it.
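The composition suggested in the last point could look roughly like the sketch below. It is an illustration under assumptions, not the answer's code: MonthSheet and FakeSheetHelper are hypothetical names, the helper only mimics the get_id, get_info and get_sheet methods of the rewritten class further down, and it returns canned data so that nothing touches the network.

```python
from typing import Any, Dict, List


class FakeSheetHelper:
    """Stand-in for an API helper; returns canned data instead of calling Google."""

    def get_id(self, name: str) -> str:
        return 'id-' + name.replace(' ', '-')

    def get_info(self, id: str) -> Dict[str, Any]:
        return {
            'properties': {'title': 'Sheet May 2020'},
            'sheets': [{'properties': {'title': '01'}},
                       {'properties': {'title': 'SYNC'}}],
        }

    def get_sheet(self, id: str, name: str, range: str) -> Dict[str, Any]:
        return {'values': [['', 'Test', 'Something', 'T2']]}


class MonthSheet:
    """Hypothetical CustomSheet replacement that has a helper instead of being one."""
    RANGE = '!A4:R'

    def __init__(self, helper: Any, name: str) -> None:
        self.helper = helper                   # scripts can still use it for raw calls
        self.name = name
        self.id = helper.get_id(name)
        info = helper.get_info(self.id)
        titles = [s['properties']['title'] for s in info['sheets']]
        self.day_sheets = [t for t in titles if t.isdigit()]

    def rows(self) -> Dict[str, List[List[str]]]:
        """Fetch the raw rows of every day sheet, keyed by sheet title."""
        return {day: self.helper.get_sheet(self.id, day, self.RANGE)['values']
                for day in self.day_sheets}


sheet = MonthSheet(FakeSheetHelper(), 'Sheet May 2020')
rows = sheet.rows()
```

Swapping the fake helper for a real one changes nothing in MonthSheet, which is also what makes the parsing logic testable without API quota.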
You have some additional problems:
- You have too much logging for my taste. If you just log each request at debug level then you don't really need any more.
- Having logger.error followed by a raise Exception just feels wrong to me. Either the exception will be handled, in which case logging it as an error is erroneous, or the exception won't be handled and you'll get the error and a traceback when the program halts.
- Many of the log messages in errorResilience are juvenile.
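The second point can be illustrated with a small sketch in which the function that detects the problem only raises, and the caller that can recover decides whether and how loudly to log. SheetNotFound, lookup and lookup_or_skip are hypothetical names chosen for this example.

```python
import logging
from typing import List, Optional

logger = logging.getLogger(__name__)


class SheetNotFound(Exception):
    """Hypothetical exception type for a missing spreadsheet."""


def lookup(files: List[dict], name: str) -> str:
    # Raise with enough context in the message; no logging at this level.
    if not files:
        raise SheetNotFound(f'No spreadsheet named {name!r}')
    return files[0]['id']


def lookup_or_skip(files: List[dict], name: str) -> Optional[str]:
    # This caller recovers from the error, so a warning is the right level.
    try:
        return lookup(files, name)
    except SheetNotFound as exc:
        logger.warning('Skipping month: %s', exc)
        return None


found = lookup_or_skip([{'id': 'abc123'}], 'Sheet May 2020')
missing = lookup_or_skip([], 'Sheet Apr 2020')
```

If nothing catches SheetNotFound, the traceback already carries the message, so an extra logger.error before the raise would only duplicate it.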
Below are the changes I made to Sheets.py. Unfortunately I do not have the time to review more than just this file. Please think about editing your code to follow some of the changes I made, and potentially post a follow-up question.
Note: Untested
import requests
from time import sleep
from random import randint
from typing import Any, Optional
from modules.PositionRange import PositionRange
import logging
logger = logging.getLogger(__name__)
# settings.py sits next to example.py in the question, so import it directly
import settings


class GoogleError(Exception):
    def __init__(self, code, message):
        super().__init__(message)
        self.code = code
        self.message = message

    def __repr__(self):
        return f'GoogleError({self.code!r}, {self.message!r})'

    def __str__(self):
        return f'[{self.code}] {self.message}'


class GoogleSession:
    def __init__(self, session: requests.Session) -> None:
        self._token = None
        self.session = session

    def get(self, *args: Any, **kwargs: Any) -> Any:
        # Try up to five times; _handle_error recovers from 401, 403 and 429
        for _ in range(5):
            if self._token is None:
                self.update_token(self.get_oauth_token())
            r = self.session.get(*args, **kwargs)
            data = r.json()
            if 'error' not in data:
                return data
            error = data['error']
            self._handle_error(error['code'], error['message'])
        raise GoogleError(error['code'], error['message'])

    def _handle_error(self, code: int, message: str) -> None:
        logger.debug(f'[{code}] {message}')
        if code in (401, 403):
            self.update_token(self.get_oauth_token())
        elif code == 429:
            tsleep = 100 + randint(10, 50)
            logger.warning(f'API limit exceeded. Auto-recovering by waiting {tsleep}s.')
            sleep(tsleep)
        else:
            raise GoogleError(code, message)

    def get_oauth_token(self) -> str:
        # Post on the wrapped session directly: going through self.get() would
        # recurse while no token is set, and the question refreshes via POST.
        r = self.session.post(
            'https://www.googleapis.com/oauth2/v4/token',
            headers={
                'Content-Type': 'application/x-www-form-urlencoded'
            },
            data={
                'client_id': settings.CLIENT_ID,
                'client_secret': settings.CLIENT_SECRET,
                'refresh_token': settings.REFRESH_TOKEN,
                'grant_type': 'refresh_token'
            },
        )
        data = r.json()
        if 'error' in data:
            # this endpoint reports errors as plain strings, without a code
            raise GoogleError(r.status_code, str(data['error']))
        return data['access_token']

    def update_token(self, token: str) -> None:
        self._token = token
        # use the new token; Sheets.accessToken no longer exists in this design
        self.session.headers.update({'Authorization': f'Bearer {token}'})


class SheetHelper:
    def __init__(self, session: GoogleSession) -> None:
        self.session = session

    def get_id(self, name: str) -> str:
        data = self.session.get(
            'https://www.googleapis.com/drive/v3/files',
            params={'q': f'name = "{name}"'},
        )
        return data['files'][0]['id']

    def get_info(self, id: str) -> dict:
        return self.session.get(f'https://sheets.googleapis.com/v4/spreadsheets/{id}')

    def get_sheet(self, id: str, name: str, range: PositionRange):
        return self.session.get(
            f'https://sheets.googleapis.com/v4/spreadsheets/{id}/values/{requests.utils.quote(name)}{range}'
        )

    def get(self, name: str, id: Optional[str] = None):
        if id is None:
            # was self.getSpreadsheet(name), which this class does not define
            id = self.get_id(name)
        info = self.get_info(id)
        return (
            id,
            name,
            info['properties']['title'],
            info['sheets'],
        )
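If other HTTP verbs are needed as well, the same retry loop could be shared by routing every verb through one method. The sketch below is a hedged variation rather than part of the answer: RetryingClient, the injected send and sleep callables, and the example URL are all hypothetical, and the responses are canned so the loop can be exercised without network access or real waiting. With a real session, send could wrap requests.Session.request.

```python
from typing import Any, Callable, Dict


class RetryingClient:
    """Hypothetical wrapper: one retry loop shared by every HTTP verb."""

    def __init__(self, send: Callable[..., Dict[str, Any]],
                 sleep: Callable[[float], Any], attempts: int = 5) -> None:
        self._send = send          # e.g. a function around session.request(...).json()
        self._sleep = sleep        # injected so tests do not really wait
        self._attempts = attempts

    def request(self, method: str, url: str, **kwargs: Any) -> Dict[str, Any]:
        error: Dict[str, Any] = {'code': 0, 'message': 'no attempts made'}
        for _ in range(self._attempts):
            data = self._send(method, url, **kwargs)
            if 'error' not in data:
                return data
            error = data['error']
            if error['code'] == 429:
                self._sleep(100)   # back off, then try again
            elif error['code'] not in (401, 403):
                break              # retrying cannot fix this error
            # for 401/403 a real client would refresh its token here
        raise RuntimeError(f"[{error['code']}] {error['message']}")

    def get(self, url: str, **kwargs: Any) -> Dict[str, Any]:
        return self.request('GET', url, **kwargs)

    def put(self, url: str, **kwargs: Any) -> Dict[str, Any]:
        return self.request('PUT', url, **kwargs)


# Canned responses: one rate-limit error, then a successful update.
canned = [
    {'error': {'code': 429, 'message': 'rate limit'}},
    {'updatedCells': 1},
]
sent = []
slept = []


def fake_send(method: str, url: str, **kwargs: Any) -> Dict[str, Any]:
    sent.append((method, url))
    return canned.pop(0)


client = RetryingClient(fake_send, slept.append)
result = client.put('https://sheets.example.invalid/values', json={'values': [['x']]})
```

Each verb method is a one-liner, so the token and error handling live in exactly one place.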
- It's genius for how simple it appears while providing a lot of complexity. Thanks for the differentiated feedback; it really helps to see another version of the code! Even though Sheets was the only class I was happy with and saw the other classes as more of an issue, your approach just feels right. I do have some questions indeed though and hope you can clear them up! Will post in a second comment. (schlumpfpirat, May 17, 2020 at 12:09)
- The Google Sheets API also utilizes POST, DELETE and PUT requests. How could they be incorporated without having too much redundant code regarding the token and error handling? Speaking of _handle_error, it appears like the API limit "auto-recovery", by simply recalling the function, is no longer in scope; any reason for that? The application layer could handle it, sure, however I thought it would be smart if I moved as much API-specific complexity as I could to the API-specific class. (schlumpfpirat, May 17, 2020 at 12:12)
- GoogleSession.get has a loop for _ in range(5):, what's its purpose? And finally, by side effects you're referring to the Sheets.accessToken variable I assume? I didn't have this initially, however as the number of CustomSheet instances grew, every initialization led to a new token and therefore additional stress on the API, which I tried to avoid by all instances sharing the same token in an easily understandable manner. Your solution is much better however by just creating a passable session object. (schlumpfpirat, May 17, 2020 at 12:19)
- @schlumpfpirat 1. I would implement them without a care for "too much redundant code". You should only fix issues you can see, not ones originating from paranoia. (I too had this problem, just let go ;) 2. I'm not sure what you mean by this. The API limit auto-recovery is handled by the for _ in range(5): loop. 3. This retries the request 5 times before deciding to error; note how 401, 403 and 429 don't raise errors in _handle_error. 4. Yes, modifying Sheets.accessToken or self.spreadsheetId are side effects: they're modifying state rather than returning. (May 17, 2020 at 12:24)
- @schlumpfpirat At programming and reviewing I'm ok, but teaching I'm really bad. On Code Review we have a thing called an iterative review, this means it's ok to try and follow my advice to the best of your ability and ask another question! If you get the above working and integrate it into your code I'd be happy to review other parts of your code. (May 17, 2020 at 13:11)