3
\$\begingroup\$

Hardware: Raspberry Pi 3B+, Elecrow touchscreen, DFR0660 Barcode Scanner, ADS1115 ADC

I would like any bad practices/possible failure points pointed out specifically in the loop(), read_adc(), and catheter_test() functions. The write_to_report() and measure_single_catheter() functions were removed. The system is fully functional.

import sys
from time import sleep
import RPi.GPIO as GPIO
from os import system
from datetime import datetime
from fpdf import FPDF
from fpdf.enums import XPos, YPos
import pyautogui
import pygame
from itertools import chain
import subprocess
from hw_init import *
from cath_test_init import *
from gui_init import *
sys.path.append('/usr/lib/python39.zip')
# !/usr/bin python3
# VERBOSE LOGGING FOR TROUBLESHOOTING
if sys.argv[1] == 'log':
 logfile_date = datetime.now().strftime("%m_%d_%Y")
 logfile_name = 'logfile' + logfile_date + '.txt'
 sys.stdout = open(logfile_name, 'w')
def log_print(string):
 logfile_now = datetime.now().strftime("%m_%d_%Y_%H:%M:%S")
 print(logfile_now + "\t" + string)
class PDF(FPDF):
 # Page footer
 def footer(self):
 self.set_y(-40)
 self.set_font('Times', 'I', 12)
 # Page number {nb} comes from alias_nb_page
 self.cell(0, 10, 'Model Selected: ' + MODEL_SELECTED +
 ' Sofware Version:' + SOFTWARE_VERSION,
 new_x=XPos.LMARGIN, new_y=YPos.NEXT, align='L')
 self.cell(0, 10, 'Page ' + str(self.page_no()) + '/{nb}',
 new_x=XPos.RIGHT, new_y=YPos.TOP, align='C')
# AUDIO SETUP
FAIL_SOUND = 'fail.mp3'
PASS_SOUND = 'pass.mp3'
PLAY_SOUND = True
pygame.init()
pygame.mixer.init()
'''Loading and playing sounds in order to load dependencies of library'''
pygame.mixer.music.load(FAIL_SOUND)
pygame.mixer.music.play()
while pygame.mixer.music.get_busy():
 pygame.time.Clock().tick(10)
pygame.mixer.music.load(PASS_SOUND)
pygame.mixer.music.play()
while pygame.mixer.music.get_busy():
 pygame.time.Clock().tick(10)
def get_ip():
 ip = subprocess.run(['hostname', '-I'],
 stdout=subprocess.PIPE).stdout.decode('utf-8')
 return ip
def terminate_script():
 window.close()
 sys.exit("Terminated at admin request")
def reboot_system():
 system('sudo reboot')
def shutdown_system():
 system('sudo shutdown -h now')
def set_bias_mux_to_low_range():
 log_print("set_bias_mux_to_low_range() called")
 GPIO.output(A_BIAS_MUX, GPIO.LOW)
 log_print("set_bias_mux_to_low_range() returning")
def set_bias_mux_to_hi_range():
 log_print("set_bias_mux_to_hi_range() called")
 GPIO.output(A_BIAS_MUX, GPIO.HIGH)
 log_print("set_bias_mux_to_hi_range() returning")
def set_dut_mux_to_input_res():
 log_print("set_dut_mux_to_input_res() called")
 GPIO.output(A_DUT_MUX, GPIO.LOW)
 GPIO.output(B_DUT_MUX, GPIO.LOW)
 log_print("set_dut_mux_to_input_res() returning")
def set_dut_mux_to_output_res():
 log_print("set_dut_mux_to_output_res() called")
 GPIO.output(A_DUT_MUX, GPIO.HIGH)
 GPIO.output(B_DUT_MUX, GPIO.LOW)
 log_print("set_dut_mux_to_output_res() returning")
def reset_mouse_position():
 # log_print("reset_mouse_position() called")
 pyautogui.moveTo(700, 160)
 # log_print("reset_mouse_position() returning")
def no_blank_screen():
 log_print("no_blank_screen() called")
 cmd_list = ['xset s noblank', 'xset -dpms', 'xset s off']
 for command in cmd_list:
 system(command)
 log_print("no_blank_screen() returning")
def audio_feedback(local_result):
 log_print("audio_feedback(%s) called" % local_result)
 global PLAY_SOUND
 if local_result == 'FAIL':
 pygame.mixer.music.load(FAIL_SOUND)
 pygame.mixer.music.play()
 while pygame.mixer.music.get_busy():
 pygame.time.Clock().tick(10)
 elif local_result == 'PASS':
 pygame.mixer.music.load(PASS_SOUND)
 pygame.mixer.music.play()
 while pygame.mixer.music.get_busy():
 pygame.time.Clock().tick(10)
 else:
 print('result is invalid. Cant play audio')
 if PLAY_SOUND:
 PLAY_SOUND = False
 log_print("audio_feedback(%s) returning" % local_result)
def gui_frame_msngr_update(frame_to_show,
 new_current_process_message='No message'):
 user_messager_window.update(new_current_process_message)
 current_frame_visibility = [frame1.visible, frame2.visible, frame3.visible,
 frame4.visible, frame5.visible,
 frame6.visible]
 frame_update = [False, True, False, False, False, False, False, False,
 False]
 if frame_to_show == 1:
 frame_update = [True, False, False, False, False, False, False, False,
 False]
 elif frame_to_show == 2:
 frame_update = [False, True, False, False, False, False, False, False,
 False]
 elif frame_to_show == 3:
 reset_mouse_position()
 frame_update = [False, False, True, False, False, False, False, False,
 False]
 keypad_message_box.update(new_current_process_message)
 elif frame_to_show == 4:
 frame_update = [False, False, False, True, False, False, False, False,
 False]
 pass_test_text_box.update(new_current_process_message)
 elif frame_to_show == 5:
 frame_update = [False, False, False, False, True, False, False, False,
 False]
 fail_test_text_box.update(new_current_process_message)
 elif frame_to_show == 6:
 frame_update = [False, False, False, False, False, True, False, False,
 False]
 elif frame_to_show == 7:
 frame_update = [False, False, False, False, False, False, True, False,
 False]
 elif frame_to_show == 8:
 frame_update = [False, False, False, False, False, False, False, True,
 False]
 frame8_message_box.update(new_current_process_message)
 elif frame_to_show == 9:
 frame_update = [False, False, False, False, False, False, False, False,
 True]
 frame9_ip_add_box.update(new_current_process_message)
 if not (current_frame_visibility == frame_update):
 frame1.update(visible=frame_update[0])
 frame2.update(visible=frame_update[1])
 frame3.update(visible=frame_update[2])
 frame4.update(visible=frame_update[3])
 frame5.update(visible=frame_update[4])
 frame6.update(visible=frame_update[5])
 frame7.update(visible=frame_update[6])
 frame8.update(visible=frame_update[7])
 frame9.update(visible=frame_update[8])
 window.refresh()
def show_calcheck_results(measurements, button_text, cal_reset):
 log_print("show_calcheck_results([%.3f,%.3f], %s)" % (
 measurements[0], measurements[1], button_text))
 if button_text == 'CHECK LOW RANGE':
 add_string = 'INSERT HIGH-RANGE\n' \
 'RESISTANCE FIXTURE AND PRESS\n' \
 '\'CHECK HIGH RANGE\''
 if cal_reset:
 res_message = 'UPPER-END RESISTANCE: %.3f\n' \
 'LOWER-END RESISTANCE: %.3f\n%s' % \
 (round(measurements[0], 3),
 round(measurements[1], 3),
 add_string)
 else:
 res_message = 'INPUT RESISTANCE: %.3f\n' \
 'OUTPUT RESISTANCE: %.3f\n%s' % \
 (round(measurements[0], 3),
 round(measurements[1], 3),
 add_string)
 elif button_text == 'CHECK HIGH RANGE':
 add_string = 'PRESS \'APPROVE & EXIT\'\n' \
 'OR \'REDO CAL REF\'\n' \
 'IF OUT OF TOLERANCE'
 if cal_reset:
 res_message = 'UPPER-END RESISTANCE: %.3f\n' \
 'LOWER-END RESISTANCE: %.3f\n%s' % \
 (round(measurements[0], 3),
 round(measurements[1], 3),
 add_string)
 else:
 res_message = 'INPUT RESISTANCE: %.3f\n' \
 'OUTPUT RESISTANCE: %.3f\n' \
 '%s' % \
 (round(measurements[0], 3),
 round(measurements[1], 3),
 add_string)
 else:
 res_message = 'ERROR IN show_calcheck_results\nCONTACT ENGINEERING'
 log_print("show_calcheck_results() returning")
 return res_message
def show_results(cath_test_result, barcode):
 log_print("show_results(%s, %s) called" % (cath_test_result, barcode))
 if SHOW_LAST_CATH_GUI_MSG:
 message = 'JOB FINISHED\nUNPLUG CATHETER\nTO PRINT REPORT\n%s: %s' % (
 barcode, cath_test_result)
 else:
 if REPEATED_CATHETER_DETECTED:
 message = 'REPEATED CATHETER\nREADY FOR\nNEXT CATHETER\n%s: %s' % (
 barcode, cath_test_result)
 else:
 message = 'READY FOR\nNEXT CATHETER\n%s: %s' % (
 barcode, cath_test_result)
 log_print("show_results() returning")
 return message
def alrt_rdy_func_enable():
 log_print("alrt_rdy_func_enable() called")
 i2cbus.write_i2c_block_data(I2C_DEV_ADDR, REG_LOTHRESH_ADDR,
 LOTHRESH_CONFIGURATION_DATA)
 i2cbus.write_i2c_block_data(I2C_DEV_ADDR, REG_HITHRESH_ADDR,
 HITHRESH_CONFIGURATION_DATA)
 log_print("alrt_rdy_func_enable() returning")
def configure_adc(config_reg_data):
 i2cbus.write_i2c_block_data(I2C_DEV_ADDR, REG_CONFIG_ADDR, config_reg_data)
def isr_enable():
 log_print("isr_enable() called")
 GPIO.add_event_detect(ADS1115_ALRT_RDY_SIG, GPIO.FALLING)
 GPIO.add_event_callback(ADS1115_ALRT_RDY_SIG, read_adc)
 log_print("isr_enable() returning")
def isr_disable():
 log_print("isr_disable() called")
 # GPIO.remove_event_detect(ADS1115_ALRT_RDY_SIG)
 log_print("isr_disable() returning")
def adc_decode(adc_codes):
 msb = adc_codes[0] << 8
 lsb = adc_codes[1]
 adc_code = msb | lsb
 voltage = adc_code * LSB
 return voltage
def volt2res(voltage):
 log_print("volt2res(%f) called" % voltage)
 high_range_correction_factor = \
 (initial_m_high_range * voltage) + initial_b_high_range
 low_range_correction_factor = \
 (initial_m_low_range * voltage) + initial_b_low_range
 dynamic_high_range_correction_factor = \
 (dynamic_m_high_range * voltage) + dynamic_b_high_range
 dynamic_low_range_correction_factor = \
 (dynamic_m_low_range * voltage) + dynamic_b_low_range
 if MEASURE_OUTPUT_RESISTANCE:
 if CATH_MODEL is MODEL_P16x_SEL:
 correction_factor = low_range_correction_factor
 dynamic_correction_factor = dynamic_low_range_correction_factor
 tfco = TFCO_LOW
 elif CATH_MODEL is MODEL_P330_SEL:
 correction_factor = high_range_correction_factor
 dynamic_correction_factor = dynamic_high_range_correction_factor
 tfco = TFCO_HIGH
 elif CATH_MODEL is MODEL_P330B_SEL:
 correction_factor = high_range_correction_factor
 dynamic_correction_factor = dynamic_high_range_correction_factor
 tfco = TFCO_HIGH
 else:
 tfco = TFCO_LOW
 correction_factor = low_range_correction_factor
 dynamic_correction_factor = dynamic_low_range_correction_factor
 print("Something went wrong when selecting the correction "
 "factor during output resistance "
 "measurement. CATH_MODEL = ", CATH_MODEL)
 else:
 if CATH_MODEL is MODEL_P16x_SEL:
 correction_factor = low_range_correction_factor
 dynamic_correction_factor = dynamic_low_range_correction_factor
 tfco = TFCO_LOW
 elif CATH_MODEL is MODEL_P330_SEL:
 correction_factor = high_range_correction_factor
 dynamic_correction_factor = dynamic_high_range_correction_factor
 tfco = TFCO_HIGH
 elif CATH_MODEL is MODEL_P330B_SEL:
 correction_factor = low_range_correction_factor
 dynamic_correction_factor = dynamic_low_range_correction_factor
 tfco = TFCO_LOW
 else:
 tfco = TFCO_LOW
 correction_factor = low_range_correction_factor
 dynamic_correction_factor = dynamic_low_range_correction_factor
 print("Something went wrong when selecting the correction factor "
 "during input resistance measurement. "
 "CATH_MODEL = ", CATH_MODEL)
 transfer_func = RGAIN1 * ((10 * ((voltage - V_BIAS) * tfco + V_BIAS)) - 1)
 r_dut = transfer_func - correction_factor - dynamic_correction_factor
 log_print("volt2res() returning")
 return r_dut
def configure_measurement():
 global MODEL_RES, MODEL_TOL, V_BIAS, RANGE_LIMIT_LOW, RANGE_LIMIT_HIGH
 log_print("configure_measurement() called")
 if MEASURE_OUTPUT_RESISTANCE:
 set_dut_mux_to_output_res()
 if CATH_MODEL is MODEL_P16x_SEL:
 set_bias_mux_to_low_range()
 MODEL_RES = MODEL_P16x_OUTPUT_RES
 MODEL_TOL = MODEL_P16x_TOL
 V_BIAS = V_BIAS_LOW
 RANGE_LIMIT_LOW = LOWRANGE_LIMIT_LOW
 RANGE_LIMIT_HIGH = LOWRANGE_LIMIT_HIGH
 elif CATH_MODEL is MODEL_P330_SEL:
 set_bias_mux_to_hi_range()
 MODEL_RES = MODEL_P330_OUTPUT_RES
 MODEL_TOL = MODEL_P330_OUTPUT_TOL
 V_BIAS = V_BIAS_HIGH
 RANGE_LIMIT_LOW = HIGHRANGE_LIMIT_LOW
 RANGE_LIMIT_HIGH = HIGHRANGE_LIMIT_HIGH
 elif CATH_MODEL is MODEL_P330B_SEL:
 set_bias_mux_to_hi_range()
 MODEL_RES = MODEL_P330B_OUTPUT_RES
 MODEL_TOL = MODEL_P330B_OUTPUT_TOL
 V_BIAS = V_BIAS_HIGH
 RANGE_LIMIT_LOW = HIGHRANGE_LIMIT_LOW
 RANGE_LIMIT_HIGH = HIGHRANGE_LIMIT_HIGH
 else:
 log_print("InvalidModelError:model index "
 "value is invalid (model=%d)" % CATH_MODEL)
 else:
 set_dut_mux_to_input_res()
 if CATH_MODEL is MODEL_P16x_SEL:
 set_bias_mux_to_low_range()
 MODEL_RES = MODEL_P16x_INPUT_RES
 MODEL_TOL = MODEL_P16x_TOL
 V_BIAS = V_BIAS_LOW
 RANGE_LIMIT_LOW = LOWRANGE_LIMIT_LOW
 RANGE_LIMIT_HIGH = LOWRANGE_LIMIT_HIGH
 elif CATH_MODEL is MODEL_P330_SEL:
 set_bias_mux_to_hi_range()
 MODEL_RES = MODEL_P330_INPUT_RES
 MODEL_TOL = MODEL_P330_INPUT_TOL
 V_BIAS = V_BIAS_HIGH
 RANGE_LIMIT_LOW = HIGHRANGE_LIMIT_LOW
 RANGE_LIMIT_HIGH = HIGHRANGE_LIMIT_HIGH
 elif CATH_MODEL is MODEL_P330B_SEL:
 set_bias_mux_to_low_range()
 MODEL_RES = MODEL_P330B_INPUT_RES
 MODEL_TOL = MODEL_P330B_INPUT_TOL
 V_BIAS = V_BIAS_LOW
 RANGE_LIMIT_LOW = LOWRANGE_LIMIT_LOW
 RANGE_LIMIT_HIGH = LOWRANGE_LIMIT_HIGH
 else:
 log_print("InvalidModelError:model index "
 "value is invalid (model=%d)" % CATH_MODEL)
 log_print("configure_measurement() returning")
def jobsize_capture(jobsize_string):
 global JOB_SIZE
 log_print("jobsize_capture(%s) called" % jobsize_string)
 if len(jobsize_string) > 0:
 JOB_SIZE = int(jobsize_string)
 else:
 log_print("NO VALUE ENTERED.")
 log_print("jobsize_capture() returning")
 return False
 if 0 < JOB_SIZE < MAX_CATHETERS_PER_JOB:
 log_print("jobsize_capture() returning")
 return True
 else:
 log_print("INVALID JOB SIZE.")
 log_print("jobsize_capture() returning")
 return False
def validate_job_number_barcode_scan(x):
 log_print("validate_job_number_barcode_scan(%s) called" % x.strip())
 try:
 if len(x) > 0 and (int(x.strip()) > 999 and int(
 x.strip()) < 100000000):
 code_validity = True
 else:
 code_validity = False
 except ValueError:
 log_print("Invalid Job Number")
 code_validity = False
 return code_validity
 log_print("validate_job_number_barcode_scan() returning")
 return code_validity
def jobnumber_capture():
 global JOB_NUMBER
 log_print("jobnumber_capture() called")
 for attempts in range(3):
 JOB_NUMBER, jobnumber_validity = barcode_scanner()
 if len(JOB_NUMBER) > 1:
 if validate_job_number_barcode_scan(JOB_NUMBER):
 log_print("jobnumber_capture() returning")
 return True
 else:
 log_print("jobnumber_capture() returning")
 return False
 if not (bool(JOB_NUMBER)):
 log_print("jobnumber_capture() returning")
 return False
def model_capture():
 global CATH_MODEL
 log_print("model_capture() called")
 if MODEL_SELECTED == 'P16x':
 CATH_MODEL = MODEL_P16x_SEL
 elif MODEL_SELECTED == 'P330':
 CATH_MODEL = MODEL_P330_SEL
 elif MODEL_SELECTED == 'P330B':
 CATH_MODEL = MODEL_P330B_SEL
 else:
 log_print("INVALID MODEL SELECTED!")
 log_print("model_capture() returning")
def waiting_for_manual_barcode():
 global KEYPAD_CATH_BARCODE, MANUAL_BARCODE_CAPTURED
 log_print("waiting_for_manual_barcode() called")
 while not MANUAL_BARCODE_CAPTURED:
 pass
 sleep(.1)
 MANUAL_BARCODE_CAPTURED = False
 local_barcode = KEYPAD_CATH_BARCODE
 log_print("waiting_for_manual_barcode() returning")
 return local_barcode
def validate_catheter_barcode_scan(x):
 log_print("validate_catheter_barcode_scan(%s) called" % x)
 try:
 if 5 < len(x) < 10:
 code_validity = True
 else:
 code_validity = False
 except TypeError:
 code_validity = False
 log_print("validate_catheter_barcode_scan() returning")
 return code_validity
def manual_catheter_barcode_entry():
 log_print("manual_catheter_barcode_entry() called")
 keypad_back_btn.update('')
 gui_frame_msngr_update(3, CATH_KEYPAD_MESSAGE)
 while True:
 event, values = window.read()
 if event in '1234567890':
 keys_entered = values['input'] # get what's been entered so far
 keys_entered += event # add the new digit
 window['input'].update(keys_entered)
 elif event == 'Submit':
 keys_entered = values['input']
 window['input'].update('')
 if validate_catheter_barcode_scan(keys_entered):
 cath_barcode = keys_entered
 break
 else:
 window['keypad_message'].update(CATH_KEYPAD_INVALID_MESSAGE)
 keys_entered = ''
 window['input'].update(keys_entered)
 elif event == 'Clear': # clear keys if clear button
 keys_entered = ''
 window['input'].update(keys_entered)
 log_print("manual_catheter_barcode_entry() returning")
 return cath_barcode
def barcode_scanner():
 log_print("barcode_scanner() called")
 ser.write(SCNR_TRGR_CMD_BYTES) # Write Hex command to trigger barcode read
 x = ser.read_until(
 b'\r') # \r is the last character returned by the barcode reader after
 # a barcode has been read. This character may change if
 # the scanner model changes.
 x = x.decode().split("31",
 1) # anything to the right of the
 # first 31 is the barcode.
 # '31' is the last 2-digit code returned by the reader.
 # This 2-digit code may have to change (or be removed)
 # if the scanner model changes.
 code_validity = validate_catheter_barcode_scan(x[1])
 log_print("barcode_scanner() returning")
 return x[1], code_validity
def print_report(local_report_filename):
 log_print("print_report(%s) called" % local_report_filename)
 cmd = 'sudo lp ' + local_report_filename
 system(cmd)
 log_print("print_report() returning")
def sort_data():
 log_print("sort_data() called")
 keys = list(cath_data_dict_unsorted_buffer.keys())
 keys.sort()
 catheter_data_dict_sorted = {key: cath_data_dict_unsorted_buffer[key]
 for key in keys}
 log_print("sort_data() returning")
 return catheter_data_dict_sorted
def correction_fctr_calc(xlist, ylist):
 log_print("correction_factor_calculation() called")
 delta_x_low_range = xlist[0] - xlist[1]
 delta_y_low_range = (ylist[0] - CAL_REF_RESISTANCE_LOWRANGE_HIGH) - (
 ylist[1] - CAL_REF_RESISTANCE_LOWRANGE_LOW)
 m_low_range = delta_y_low_range / delta_x_low_range
 b_low_range = \
 (ylist[0] - CAL_REF_RESISTANCE_LOWRANGE_HIGH) - m_low_range * xlist[0]
 delta_x_high_range = xlist[2] - xlist[3]
 delta_y_high_range = (ylist[2] - CAL_REF_RESISTANCE_HIRANGE_HIGH) - (
 ylist[3] - CAL_REF_RESISTANCE_HIRANGE_LOW)
 m_high_range = delta_y_high_range / delta_x_high_range
 b_high_range = \
 (ylist[2] - CAL_REF_RESISTANCE_HIRANGE_HIGH) - m_high_range * xlist[2]
 log_print("correction_factor_calculation() returning")
 return m_low_range, b_low_range, m_high_range, b_high_range
def correction_value(m, b, x):
 log_print("correction_value() called")
 corr_val = (m * x) + b
 log_print("correction_value() returning")
 return corr_val
def calibration():
 global CAL_PASSED, V_BIAS, CATH_MODEL, CONVERSION_READY, \
 ADC_SAMPLE_LATEST, CAL_REF_RESISTANCE_LOWRANGE_LOW, \
 CAL_REF_RESISTANCE_LOWRANGE_HIGH, CAL_REF_RESISTANCE_HIRANGE_LOW, \
 CAL_REF_RESISTANCE_HIRANGE_HIGH, \
 dynamic_m_low_range, dynamic_b_low_range, dynamic_m_high_range, \
 dynamic_b_high_range, CAL_FAIL
 log_print("calibration() called")
 # isr_enable()
 temp_cath_model = CATH_MODEL
 A = [0, 0]
 x_list = []
 y_list = []
 SAMPLES_TO_TAKE = 120
 samples_to_remove_cal = int(SAMPLES_TO_TAKE / 2)
 sample_period = 0.004
 CAL_FAIL = False
 GPIO.output(B_DUT_MUX, GPIO.HIGH)
 cal_resistances = [CAL_REF_RESISTANCE_LOWRANGE_HIGH,
 CAL_REF_RESISTANCE_LOWRANGE_LOW,
 CAL_REF_RESISTANCE_HIRANGE_HIGH,
 CAL_REF_RESISTANCE_HIRANGE_LOW]
 v_bias_cal = []
 recalculate_dynamic_coefficients = False
 log_print('\n\n===INITIATING CALIBRATION===\n')
 for i in range(4):
 voltage_samples = []
 vbias_sample_buffer_cal = []
 GPIO.output(A_DUT_MUX, A[1])
 GPIO.output(CAL_RES_HI_RANGE_MUX, A[0])
 GPIO.output(A_BIAS_MUX, A[0])
 for sample in range(SAMPLES_TO_TAKE):
 configure_adc(SINGLESHOT_VBIAS_CONFIG_REG)
 while not CONVERSION_READY:
 sleep(sample_period)
 CONVERSION_READY = False
 vbias_sample_buffer_cal.append(adc_decode(ADC_SAMPLE_LATEST))
 vbias_sample_buffer_cal = vbias_sample_buffer_cal[
 samples_to_remove_cal:]
 V_BIAS = sum(vbias_sample_buffer_cal) / len(vbias_sample_buffer_cal)
 log_print("V_BIAS = %f" % V_BIAS)
 v_bias_cal.append(V_BIAS)
 CATH_MODEL = A[0]
 sleep(.35) # to allow the switches to settle
 log_print('SAMPLING INTERNAL RESISTANCE...')
 for sample in range(SAMPLES_TO_TAKE):
 configure_adc(SINGLESHOT_CONFIG_REG)
 while not CONVERSION_READY:
 sleep(sample_period)
 CONVERSION_READY = False
 voltage_samples.append(adc_decode(ADC_SAMPLE_LATEST))
 voltage_samples = voltage_samples[samples_to_remove_cal:]
 avg_voltage = round(sum(voltage_samples) / len(voltage_samples), 3)
 x_list.append(avg_voltage)
 cal_res_msrmnt = round(volt2res(avg_voltage), 3)
 y_list.append(cal_res_msrmnt)
 if (abs(cal_res_msrmnt - cal_resistances[i]) /
 cal_resistances[i]) > CAL_REF_RESISTANCE_TOL:
 recalculate_dynamic_coefficients = True
 log_print('avg_voltage:%.3f\navg_res:%.3f' % (
 avg_voltage, cal_res_msrmnt))
 for j in range(len(A) - 1, -1, -1):
 if A[j] == 0:
 A[j] = 1
 break
 A[j] = 0
 if recalculate_dynamic_coefficients:
 dynamic_m_low_range, dynamic_b_low_range, \
 dynamic_m_high_range, dynamic_b_high_range = \
 correction_fctr_calc(x_list, y_list)
 log_print('dynamic_m_low_range, dynamic_b_low_range, '
 'dynamic_m_high_range, dynamic_b_high_range:\n%f %f %f %f' %
 (dynamic_m_low_range, dynamic_b_low_range,
 dynamic_m_high_range, dynamic_b_high_range))
 log_print("DYNAMIC CORRECTION FACTOR CALCULATED!")
 # This condition limits how much the system can correct itself
 # from the initial correction factors before triggering
 # an engineer to recalibrate the system.
 if (abs(dynamic_m_low_range)) > 10 or (
 abs(dynamic_m_low_range) > 10) or (
 abs(dynamic_m_low_range) > 10) or (
 abs(dynamic_m_low_range) > 10):
 CAL_FAIL = True
 A = [0, 0]
 for i in range(4):
 voltage_samples = []
 GPIO.output(A_DUT_MUX, A[1])
 GPIO.output(CAL_RES_HI_RANGE_MUX, A[0])
 GPIO.output(A_BIAS_MUX, A[0])
 V_BIAS = v_bias_cal[i]
 CATH_MODEL = A[0]
 log_print('SAMPLING INTERNAL RESISTANCES '
 'WITH DYNAMIC CORRECTION FACTOR...')
 for sample in range(SAMPLES_TO_TAKE):
 configure_adc(SINGLESHOT_CONFIG_REG)
 while not CONVERSION_READY:
 sleep(sample_period)
 CONVERSION_READY = False
 voltage_samples.append(adc_decode(ADC_SAMPLE_LATEST))
 voltage_samples = voltage_samples[samples_to_remove_cal:]
 avg_voltage = round(sum(voltage_samples) / len(voltage_samples), 3)
 error_magnitude_voltage = round(
 max(voltage_samples) - min(voltage_samples), 3)
 error_plus_minus_voltage = round(error_magnitude_voltage / 2, 3)
 cal_res_msrmnt = round(volt2res(avg_voltage), 3)
 error_magnitude_res = round(
 volt2res(max(voltage_samples)) - volt2res(
 min(voltage_samples)), 3)
 error_plus_minus_res = round(error_magnitude_res / 2, 3)
 cal_resistance_tolerance = round(
 cal_resistances[i] * CAL_REF_RESISTANCE_TOL, 3)
 log_print('avg_voltage:%.3f +/-%.3f\navg_res:%.3f +/-%.3f\n' % (
 avg_voltage, error_plus_minus_voltage,
 cal_res_msrmnt, error_plus_minus_res))
 if (abs(cal_res_msrmnt - cal_resistances[i]) / cal_resistances[i])\
 < CAL_REF_RESISTANCE_TOL:
 log_print("Measured calibrated resistance passed\n")
 else:
 CAL_FAIL = True
 log_print("Measured calibrated resistance out of tolerance\n")
 log_print(
 'Expected Ω: %.3fΩ\nCalculated Ω: %fΩ\n' % (
 cal_resistances[i], cal_res_msrmnt))
 log_print(
 'resistance tolerance: %.3f\n' % cal_resistance_tolerance)
 log_print('resistance error measured: %.3f\n\n' % (
 cal_res_msrmnt - cal_resistances[i]))
 if i == 3 and CAL_FAIL is False:
 CAL_PASSED = True
 GPIO.output(A_DUT_MUX, GPIO.LOW)
 GPIO.output(B_DUT_MUX, GPIO.LOW)
 CATH_MODEL = temp_cath_model
 print('===CALIBRATION DONE===')
 for j in range(len(A) - 1, -1, -1):
 if A[j] == 0:
 A[j] = 1
 break
 A[j] = 0
 else:
 GPIO.output(A_DUT_MUX, GPIO.LOW)
 GPIO.output(B_DUT_MUX, GPIO.LOW)
 CATH_MODEL = temp_cath_model
 CAL_PASSED = True
 log_print('Dynamic coefficients were not recalculated. '
 'Calibration is still stable.')
def read_adc(gpionum):
 global CATH_CONN_EMPTY, RES_SAMPLES_VALS, ACTIVE_SAMPLING, \
 CATH_RES_SAMPLES_COLLECTED, TEST_FINISHED, CATH_MODEL, \
 ADC_SAMPLE_LATEST, CATH_DETECTED_SAMPLES_COLLECTED, \
 CATH_DISCONN_SAMPLES_COLLECTED, end_job, SHOW_LAST_CATH_GUI_MSG, \
 CONVERSION_READY, MEASURE_VBIAS, VBIAS_SAMPLES_BUFFER, V_BIAS, \
 report_filename
 ADC_SAMPLE_LATEST = i2cbus.read_i2c_block_data(I2C_DEV_ADDR,
 REG_CONVERSION_ADDR, 2)
 CONVERSION_READY = True
 avg_voltage = adc_decode(ADC_SAMPLE_LATEST)
 if CATH_CONN_EMPTY and CAL_PASSED and (
 not ACTIVE_SAMPLING) and not EXTERNAL_CAL_INPROGRESS:
 if CATH_DETECTED_SAMPLES_COLLECTED < CATH_DETECTED_SAMPLES_REQUIRED:
 if avg_voltage < CATH_DETECTED_VOLTAGE:
 CATH_DETECTED_SAMPLES_COLLECTED += 1
 else:
 CATH_DETECTED_SAMPLES_COLLECTED = 0
 else:
 configure_measurement()
 MEASURE_VBIAS = True
 CATH_CONN_EMPTY = False
 CATH_DETECTED_SAMPLES_COLLECTED = 0
 log_print("CATHETER DETECTED. STARTING TEST...")
 elif MEASURE_VBIAS:
 if len(VBIAS_SAMPLES_BUFFER) < 100:
 VBIAS_SAMPLES_BUFFER.append(avg_voltage)
 configure_adc(SINGLESHOT_VBIAS_CONFIG_REG)
 # print('collecting vbias samples:',avg_voltage)
 else:
 MEASURE_VBIAS = False
 VBIAS_SAMPLES_BUFFER = VBIAS_SAMPLES_BUFFER[50:]
 V_BIAS = sum(VBIAS_SAMPLES_BUFFER) / len(VBIAS_SAMPLES_BUFFER)
 VBIAS_SAMPLES_BUFFER = []
 ACTIVE_SAMPLING = True
 log_print('VBIAS = %f' % V_BIAS)
 configure_adc(CONTINUOUS_CONFIG_REG)
 elif ACTIVE_SAMPLING:
 if CATH_RES_SAMPLES_COLLECTED < SAMPLES_REQUIRED:
 RES_SAMPLES_VALS.append(avg_voltage)
 CATH_RES_SAMPLES_COLLECTED += 1
 else:
 log_print("Finished logging test samples")
 ACTIVE_SAMPLING = False
 catheter_test()
 if TEST_FINISHED:
 if MEASURE_OUTPUT_RESISTANCE:
 TEST_FINISHED = False
 MEASURE_VBIAS = True
 CATH_RES_SAMPLES_COLLECTED = 0
 RES_SAMPLES_VALS = []
 configure_measurement()
 log_print("Measuring output resistance now...")
 else:
 if CATH_DISCONN_SAMPLES_COLLECTED < CATH_DETECTED_SAMPLES_REQUIRED:
 if avg_voltage > CATH_DETECTED_VOLTAGE:
 CATH_DISCONN_SAMPLES_COLLECTED += 1
 else:
 CATH_DISCONN_SAMPLES_COLLECTED = 0
 else:
 log_print("CATHETER DISCONNECTED. WRITING REPORT...")
 TEST_FINISHED = False
 CATH_CONN_EMPTY = True
 CATH_RES_SAMPLES_COLLECTED = 0
 CATH_DISCONN_SAMPLES_COLLECTED = 0
 RES_SAMPLES_VALS = []
 if catheters_processed == JOB_SIZE:
 report_filename = write_to_report(sort_data())
 end_job = True
def catheter_test():
 global TEST_FINISHED, RES_SAMPLES_VALS, MEASURE_OUTPUT_RESISTANCE, \
 catheters_processed, cath_data_dict_unsorted_buffer, avg_res, \
 GUI_CURRENT_BARCODE, GUI_CURRENT_CATH_RESULT, SHOW_LAST_CATH_GUI_MSG, \
 REPEATED_CATHETER_DETECTED, MANUAL_CATH_BARCODE_CAPTURE, \
 test_result_buffer
 log_print("catheter_test() called")
 RES_SAMPLES_VALS = RES_SAMPLES_VALS[SAMPLES_TO_REMOVE:]
 avg_voltage = sum(RES_SAMPLES_VALS) / len(RES_SAMPLES_VALS)
 unrepeated_cath = False
 repeated_catheter = True
 if MEASURE_OUTPUT_RESISTANCE:
 avg_res[1] = round(volt2res(avg_voltage), 2)
 avg_res_dut = avg_res[1]
 if abs(avg_res_dut - MODEL_RES) < MODEL_TOL:
 test_result_buffer[1] = "PASS"
 else:
 test_result_buffer[1] = "FAIL"
 log_print("Test Result:%s" % test_result_buffer[1])
 else:
 avg_res[0] = round(volt2res(avg_voltage), 2)
 avg_res_dut = avg_res[0]
 if abs(avg_res_dut - MODEL_RES) < MODEL_TOL:
 test_result_buffer[0] = "PASS"
 else:
 test_result_buffer[0] = "FAIL"
 log_print("Test Result:%s" % test_result_buffer[0])
 log_print("Average resistance read:%f" % avg_res_dut)
 if MEASURE_OUTPUT_RESISTANCE:
 barcode, code_val = barcode_scanner()
 if test_result_buffer[0] == "FAIL" or test_result_buffer[1] == "FAIL":
 test_result = "FAIL"
 else:
 test_result = "PASS"
 if bool(barcode) is False:
 # Trigger main thread to change
 # GUI to capture catheter SN manually.
 MANUAL_CATH_BARCODE_CAPTURE = True
 # This loop simply waits for the
 # main thread to do the GUI operation.
 # Reason is, Tkinter does not like working in multiple threads.
 barcode = waiting_for_manual_barcode()
 code_val = validate_catheter_barcode_scan(barcode)
 if barcode.strip() in cath_data_dict_unsorted_buffer:
 current_catheter_data = cath_data_dict_unsorted_buffer[
 barcode.strip()]
 current_catheter_data[4] = repeated_catheter
 REPEATED_CATHETER_DETECTED = True
 log_print("REPEATED CATHETER, UPDATING REPORT FOR %s" % barcode)
 else:
 if (avg_res[0] > RANGE_LIMIT_HIGH) or (
 avg_res[0] < RANGE_LIMIT_LOW):
 avg_res[0] = 9999
 if (avg_res[1] > RANGE_LIMIT_HIGH) or (
 avg_res[1] < RANGE_LIMIT_LOW):
 avg_res[1] = 9999
 cath_data_dict_unsorted_buffer[barcode.strip()] = [test_result,
 avg_res[0],
 avg_res[1],
 code_val,
 unrepeated_cath]
 catheters_processed += 1
 if catheters_processed < JOB_SIZE:
 log_print("\n\n---READY FOR NEXT CATHETER---\n\n")
 elif catheters_processed == JOB_SIZE:
 log_print("LAST CATHETER OF JOB. UNPLUG TO PRINT REPORT.")
 SHOW_LAST_CATH_GUI_MSG = True
 GUI_CURRENT_BARCODE = barcode.strip()
 GUI_CURRENT_CATH_RESULT = test_result
 MEASURE_OUTPUT_RESISTANCE = False
 else:
 MEASURE_OUTPUT_RESISTANCE = True
 TEST_FINISHED = True
def loop():
 global catheters_processed, CATH_MODEL, \
 JOB_SIZE, JOB_NUMBER, CAL_PASSED, \
 end_job, cath_data_dict_unsorted_buffer, \
 MODEL_SELECTED, CURRENT_PROCESS_MESSAGE, \
 SHOW_LAST_CATH_GUI_MSG, REPEATED_CATHETER_DETECTED, \
 KEYPAD_CATH_BARCODE, MANUAL_BARCODE_CAPTURED, \
 MANUAL_CATH_BARCODE_CAPTURE, CATH_RES_SAMPLES_COLLECTED, \
 PLAY_SOUND, PROCESS_MESSENGER_FONT, EXTERNAL_CAL_INPROGRESS, \
 EXTERNAL_CAL_MEASURED_RES_VALS, EXTERNAL_CAL_USER_ENTERED_RES_VALS, \
 dynamic_m_low_range, dynamic_b_low_range, dynamic_m_high_range, \
 dynamic_b_high_range, initial_b_low_range, initial_m_low_range, \
 initial_b_high_range, initial_m_high_range, \
 CAL_REF_RESISTANCE_LOWRANGE_LOW, CAL_REF_RESISTANCE_LOWRANGE_HIGH, \
 CAL_REF_RESISTANCE_HIRANGE_LOW, \
 CAL_REF_RESISTANCE_HIRANGE_HIGH, CAL_FAIL
 first_iteration = True
 jobsize_valid = False
 jobnumber_valid = False
 reset_mouse_position()
 first_job_done = False
 res_measurements = []
 display_cal_check_measurements = False
 x_list = [[], []]
 cal_res_fixture_count = 0
 admin_logon_attempt = False
 notify_user_cath_detected = True
 notify_user_test_result = True
 cath_data_dict_unsorted_buffer = {}
 while True:
 while True:
 event, values = window.read()
 f8_btn1_txt = frame8_button1_text.get_text().strip()
 f8_btn2_txt = frame8_button2_text.get_text().strip()
 if event in (sg.WINDOW_CLOSED, 'Exit'):
 break
 elif event == ADMIN_FUNCS_KEY:
 log_print("USER PULLED UP ADMIN PASSWORD REQUEST PAGE")
 gui_frame_msngr_update(3, ENTER_PW_MESSAGE)
 admin_logon_attempt = True
 elif event == 'RE-PRINT':
 log_print('=USER ATTEMPTED TO RE-PRINT=')
 if first_job_done:
 reset_mouse_position()
 print_report(report_filename)
 else:
 frame8_button1_text.update('BACK')
 frame8_button2_text.update('')
 gui_frame_msngr_update(8, RUN_FIRST_JOB_FIRST)
 elif event == "TERMINATE SCRIPT":
 terminate_script()
 elif event == "REBOOT":
 reboot_system()
 elif event == "SHUTDOWN":
 shutdown_system()
 elif event == 'CALIBRATE':
 CAL_FAIL = False
 log_print("=USER PRESSED 'CALIBRATE'=")
 reset_mouse_position()
 GPIO.output(B_DUT_MUX, GPIO.LOW)
 EXTERNAL_CAL_INPROGRESS = True
 frame8_button1_text.update('CHECK LOW RANGE')
 frame8_button2_text.update('BACK')
 log_print("=INSTRUCTING USER TO MEASURE AND "
 "INSERT CALIBRATION RESISTANCES=")
 gui_frame_msngr_update(8, CAL_INSERT_FIXTURE_MESSAGE)
 elif event == BUTTON_MESSAGE_KEY1 and \
 f8_btn1_txt == 'CHECK LOW RANGE' and \
 not display_cal_check_measurements:
 reset_mouse_position()
 log_print("=USER PRESSED 'CHECK LOW RANGE'=")
 gui_frame_msngr_update(2, CHECK_CAL_MESSAGE)
 res_measurements, temp_hold = measure_single_catheter('LOW')
 window.write_event_value('display_measurements', '')
 elif event == BUTTON_MESSAGE_KEY1 and \
 f8_btn1_txt == 'CHECK HIGH RANGE' and \
 not display_cal_check_measurements:
 log_print("=USER PRESSED 'CHECK HIGH RANGE'=")
 gui_frame_msngr_update(2, CHECK_CAL_MESSAGE)
 res_measurements, temp_hold = measure_single_catheter('HIGH')
 window.write_event_value('display_measurements', '')
 elif event == BUTTON_MESSAGE_KEY1 and \
 f8_btn1_txt == 'APPROVE & EXIT':
 log_print("=USER PRESSED 'APPROVE & EXIT'=")
 reset_mouse_position()
 EXTERNAL_CAL_INPROGRESS = False
 EXTERNAL_CAL_MEASURED_RES_VALS = [[], []]
 EXTERNAL_CAL_USER_ENTERED_RES_VALS = [0, 0, 0, 0]
 x_list = [[], []]
 gui_frame_msngr_update(1)
 elif event == BUTTON_MESSAGE_KEY2 and \
 f8_btn2_txt == 'REDO CAL REF':
 reset_mouse_position()
 initial_b_low_range, initial_m_low_range = 0, 0
 initial_b_high_range, initial_m_high_range = 0, 0
 dynamic_m_low_range, dynamic_b_low_range = 0, 0
 dynamic_m_high_range, dynamic_b_high_range = 0, 0
 cal_res_count = 0
 cal_res_fixture_count = 0
 cal_keypad_messager_window.update(
 CAL_KEYPAD_PROMPT_MESSAGES[cal_res_count])
 gui_frame_msngr_update(7)
 elif event == BUTTON_MESSAGE_KEY1 and \
 f8_btn1_txt == 'CAPTURE REF RESISTANCES':
 gui_frame_msngr_update(2, CAPTURE_CAL_RES_MESSAGE)
 EXTERNAL_CAL_MEASURED_RES_VALS[cal_res_fixture_count], x_list[
 cal_res_fixture_count] = \
 measure_single_catheter(
 'LOW' if cal_res_fixture_count == 0 else 'HIGH')
 cal_res_fixture_count += 1
 if cal_res_fixture_count > 1:
 gui_frame_msngr_update(2, RECALCULATE_REF_RES_MESSAGE)
 ext_cal_measured_vals_f = list(
 chain.from_iterable(EXTERNAL_CAL_MEASURED_RES_VALS))
 x_list_f = list(chain.from_iterable(x_list))
 CAL_REF_RESISTANCE_LOWRANGE_HIGH = \
 EXTERNAL_CAL_USER_ENTERED_RES_VALS[1]
 CAL_REF_RESISTANCE_LOWRANGE_LOW = \
 EXTERNAL_CAL_USER_ENTERED_RES_VALS[0]
 CAL_REF_RESISTANCE_HIRANGE_HIGH = \
 EXTERNAL_CAL_USER_ENTERED_RES_VALS[3]
 CAL_REF_RESISTANCE_HIRANGE_LOW = \
 EXTERNAL_CAL_USER_ENTERED_RES_VALS[2]
 initial_m_low_range, initial_b_low_range, \
 initial_m_high_range, initial_b_high_range = \
 correction_fctr_calc(x_list_f, ext_cal_measured_vals_f)
 GPIO.output(B_DUT_MUX, GPIO.HIGH)
 sleep(.3)
 low_range_recalculated_interal_resistances, temp_hold = \
 measure_single_catheter('LOW')
 GPIO.output(CAL_RES_HI_RANGE_MUX, GPIO.HIGH)
 sleep(.3)
 high_range_recalculated_interal_resistances, temp_hold = \
 measure_single_catheter('HIGH')
 print('%s\n%s\n%s\n%s\n' % (
 initial_m_low_range, initial_b_low_range,
 initial_m_high_range, initial_b_high_range))
 print('%s\n%s\n%s\n%s\n' % (
 low_range_recalculated_interal_resistances[1],
 low_range_recalculated_interal_resistances[0],
 high_range_recalculated_interal_resistances[1],
 high_range_recalculated_interal_resistances[0]
 ))
 with open('initial_correction_factor.txt', 'w') as ff:
 ff.write('%s\n%s\n%s\n%s\n' % (
 initial_m_low_range, initial_b_low_range,
 initial_m_high_range, initial_b_high_range))
 ff.write('%s\n%s\n%s\n%s\n' % (
 low_range_recalculated_interal_resistances[1],
 low_range_recalculated_interal_resistances[0],
 high_range_recalculated_interal_resistances[1],
 high_range_recalculated_interal_resistances[0]
 ))
 CAL_REF_RESISTANCE_LOWRANGE_HIGH = \
 low_range_recalculated_interal_resistances[0]
 CAL_REF_RESISTANCE_LOWRANGE_LOW = \
 low_range_recalculated_interal_resistances[1]
 CAL_REF_RESISTANCE_HIRANGE_HIGH = \
 high_range_recalculated_interal_resistances[0]
 CAL_REF_RESISTANCE_HIRANGE_LOW = \
 high_range_recalculated_interal_resistances[1]
 GPIO.output(CAL_RES_HI_RANGE_MUX, GPIO.LOW)
 GPIO.output(B_DUT_MUX, GPIO.LOW)
 frame8_button1_text.update('CHECK LOW RANGE')
 frame8_button2_text.update('')
 res_measurements = []
 reset_mouse_position()
 gui_frame_msngr_update(8, CAL_RE_INSRT_LOW_RANGE_FIXTR_MSG)
 else:
 reset_mouse_position()
 gui_frame_msngr_update(8, CAL_CURRENT_PROCESS_MESSAGES[
 cal_res_fixture_count])
 elif event == 'display_measurements':
 log_print("Displaying calibration check values...")
 reset_mouse_position()
 if f8_btn1_txt == 'CHECK LOW RANGE':
 frame8_button1_text.update('CHECK HIGH RANGE')
 frame8_button2_text.update('')
 elif f8_btn1_txt == 'CHECK HIGH RANGE':
 frame8_button1_text.update('APPROVE & EXIT')
 frame8_button2_text.update('REDO CAL REF')
 gui_frame_msngr_update(8,
 show_calcheck_results(
 res_measurements, f8_btn1_txt,
 cal_res_fixture_count))
 display_cal_check_measurements = False
 elif event == 'cal_Submit':
 EXTERNAL_CAL_USER_ENTERED_RES_VALS[cal_res_count] = float(
 values['calinput'])
 window['calinput'].update('')
 reset_mouse_position()
 cal_res_count += 1
 if cal_res_count > 3:
 gui_frame_msngr_update(8, CAL_CURRENT_PROCESS_MESSAGES[
 cal_res_fixture_count])
 frame8_button1_text.update('CAPTURE REF RESISTANCES')
 frame8_button2_text.update('')
 else:
 cal_keypad_messager_window.update(
 CAL_KEYPAD_PROMPT_MESSAGES[cal_res_count])
 elif event == 'cal_Clear':
 log_print("=USER PRESSED 'CLEAR' in CAL KEYPAD")
 reset_mouse_position()
 cal_keys_entered = ''
 window['calinput'].update(cal_keys_entered)
 elif event == 'cal_BACK':
 initial_m_low_range = float(lines[0].strip())
 initial_b_low_range = float(lines[1].strip())
 initial_m_high_range = float(lines[2].strip())
 initial_b_high_range = float(lines[3].strip())
 CAL_REF_RESISTANCE_LOWRANGE_LOW = float(lines[4].strip())
 CAL_REF_RESISTANCE_LOWRANGE_HIGH = float(lines[5].strip())
 CAL_REF_RESISTANCE_HIRANGE_LOW = float(lines[6].strip())
 CAL_REF_RESISTANCE_HIRANGE_HIGH = float(lines[7].strip())
 log_print("=USER PRESSED 'BACK' WHEN "
 "IN THE REDO REF CAL KEYPAD=")
 reset_mouse_position()
 gui_frame_msngr_update(1)
 cal_keys_entered = ''
 window['calinput'].update(cal_keys_entered)
 elif 'cal_' in event:
 event_name = event.split('cal_')
 reset_mouse_position()
 cal_keys_entered = values[
 'calinput'] # get what's been entered so far
 cal_keys_entered += event_name[1] # add the new digit
 window['calinput'].update(cal_keys_entered)
 elif (event == BUTTON_MESSAGE_KEY1 and f8_btn1_txt == 'BACK') or (
 event == BUTTON_MESSAGE_KEY2 and f8_btn2_txt == 'BACK'):
 log_print("=USER PRESSED 'BACK' IN CALIBRATE=")
 admin_logon_attempt = False
 reset_mouse_position()
 EXTERNAL_CAL_INPROGRESS = False
 gui_frame_msngr_update(1)
 elif event == KEYPAD_BACK_BTN_KEY:
 if admin_logon_attempt:
 admin_logon_attempt = False
 log_print("=USER PRESSED 'BACK' ON "
 "KEYPAD WHEN PROMPTED FOR ADMIN PW")
 gui_frame_msngr_update(1)
 if JOB_SIZE == 0:
 log_print("=USER PRESSED 'BACK' ON KEYPAD WHEN PROMPTED TO"
 " ENTER JOB SIZE OR WHEN IN "
 "THE REDO REF CAL KEYPAD=")
 gui_frame_msngr_update(1)
 else:
 log_print("=USER PRESSED 'BACK' ON KEYPAD WHEN "
 "PROMPTED FOR JOB NUMBER=")
 JOB_SIZE = 0
 JOB_NUMBER = 0
 window['keypad_message'].update(JOBSIZE_KEYPAD_MESSAGE)
 keys_entered = ''
 window['input'].update(keys_entered)
 reset_mouse_position()
 elif event == 'P16x' or event == 'P330' or event == 'P330B':
 log_print("=USER SELECTED %s AS THE MODEL=" % event)
 # window['keypad_message'].update(JOBSIZE_KEYPAD_MESSAGE)
 MODEL_SELECTED = event
 model_capture()
 gui_frame_msngr_update(3, JOBSIZE_KEYPAD_MESSAGE)
 elif event in '1234567890':
 log_print("=USER PRESSED %s ON THE KEYPAD=" % event)
 reset_mouse_position()
 keys_entered = values[
 'input'] # get what's been entered so far
 keys_entered += event # add the new digit
 window['input'].update(keys_entered)
 elif event == 'Submit':
 reset_mouse_position()
 keys_entered = values['input']
 window['input'].update('')
 if admin_logon_attempt:
 if keys_entered == ADMIN_PASSWORD:
 gui_frame_msngr_update(9, get_ip())
 else:
 window['keypad_message'].update(INVALID_PW_MESSAGE)
 else:
 if JOB_SIZE == 0:
 log_print("=USER PRESSED 'SUBMIT' TO RECORD JOB SIZE=")
 jobsize_valid = jobsize_capture(keys_entered)
 if jobsize_valid:
 log_print("=USER WAS PROMPTED TO "
 "SCAN JOB NUMBER BARCODE=")
 gui_frame_msngr_update(6,
 JOBNUMBER_SCAN_MESSAGE)
 jobnumber_valid = jobnumber_capture()
 if not jobnumber_valid:
 log_print("USER WAS PROMPTED TO "
 "ENTER JOB NUMBER MANUALLY")
 keys_entered = ''
 window['input'].update('')
 gui_frame_msngr_update(3, JOBNUM_KEYPAD_MSG)
 else:
 window['keypad_message'].update(
 JOBSIZE_INVALID_MESSAGE)
 keys_entered = ''
 JOB_SIZE = 0
 if jobsize_valid and jobnumber_valid:
 log_print(
 "VALID JOB SIZE AND JOB NUMBERS WERE RECORDED")
 break
 elif not jobnumber_valid:
 log_print(
 "=USER PRESSED 'SUBMIT' TO RECORD JOB NUMBER=")
 jobnumber_valid = validate_job_number_barcode_scan(
 keys_entered)
 if jobnumber_valid:
 log_print(
 "VALID JOB SIZE AND JOB NUMBERS WERE RECORDED")
 JOB_NUMBER = keys_entered
 break
 else:
 window['keypad_message'].update(
 JOBNUMBER_INVALID_MESSAGE)
 keys_entered = ''
 window['input'].update(keys_entered)
 elif event == 'Clear':
 log_print("=USER PRESSED 'CLEAR'")
 reset_mouse_position()
 keys_entered = ''
 window['input'].update(keys_entered)
 while True:
 if jobsize_valid and jobnumber_valid and \
 not CAL_PASSED and not CAL_FAIL:
 log_print(
 "USER WAS NOTIFIED THAT CALIBRATION WAS BEING PERFORMED")
 gui_frame_msngr_update(2, CAL_PERFORMING_MESSAGE)
 calibration()
 elif CAL_FAIL and not CAL_PASSED:
 log_print("USER WAS NOTIFIED THAT CALIBRATION FAILED")
 dynamic_m_low_range, dynamic_b_low_range, \
 dynamic_m_high_range, dynamic_b_high_range = 0, 0, 0, 0
 frame8_button1_text.update('')
 frame8_button2_text.update('')
 gui_frame_msngr_update(8, CAL_FAIL_MESSAGE)
 sleep(10)
 log_print("Calibration failed. "
 "An engineer will come troubleshoot the system.")
 SHOW_LAST_CATH_GUI_MSG = False
 catheters_processed = 0
 cath_data_dict_unsorted_buffer = {}
 CATH_MODEL = -1
 JOB_SIZE = 0
 JOB_NUMBER = 0
 CATH_RES_SAMPLES_COLLECTED = 0
 CAL_PASSED = False
 first_iteration = True
 end_job = False
 reset_mouse_position()
 gui_frame_msngr_update(1)
 notify_user_cath_detected = True
 notify_user_test_result = True
 break
 elif CAL_PASSED and first_iteration:
 # isr_enable()
 configure_adc(CONTINUOUS_CONFIG_REG)
 first_iteration = False
 log_print("USER WAS PROMPTED TO INSERT "
 "THE FIRST CATHETER OF THE JOB")
 gui_frame_msngr_update(2, FIRST_CATH_MESSAGE)
 elif not CATH_CONN_EMPTY and ACTIVE_SAMPLING and \
 CATH_DETECTED_SAMPLES_COLLECTED == 0 and \
 notify_user_cath_detected:
 notify_user_cath_detected = False
 notify_user_test_result = True
 REPEATED_CATHETER_DETECTED = False # flag reset purposes only
 PLAY_SOUND = True # flag reset purposes only
 log_print("USER WAS NOTIFIED THAT THE CATHETER WAS DETECTED")
 gui_frame_msngr_update(2, CATH_DETECTED_MESSAGE)
 elif MANUAL_CATH_BARCODE_CAPTURE:
 KEYPAD_CATH_BARCODE = manual_catheter_barcode_entry()
 keypad_back_btn.update('BACK')
 MANUAL_BARCODE_CAPTURED = True
 MANUAL_CATH_BARCODE_CAPTURE = False
 elif ((TEST_FINISHED and not MEASURE_OUTPUT_RESISTANCE and
 not SHOW_LAST_CATH_GUI_MSG and not end_job) or
 (catheters_processed == JOB_SIZE and
 SHOW_LAST_CATH_GUI_MSG and
 not end_job)) and notify_user_test_result:
 notify_user_test_result = False
 frame_to_see = 4 if GUI_CURRENT_CATH_RESULT == "PASS" else 5
 log_print(
 "USER WAS NOTIFIED THAT %s TEST RESULT IS: %s" % (
 GUI_CURRENT_BARCODE, GUI_CURRENT_CATH_RESULT))
 gui_frame_msngr_update(frame_to_see, show_results(
 GUI_CURRENT_CATH_RESULT, GUI_CURRENT_BARCODE))
 # resetting flag for next possible catheter
 notify_user_cath_detected = True
 if PLAY_SOUND:
 audio_feedback(GUI_CURRENT_CATH_RESULT)
 pass
 elif catheters_processed == JOB_SIZE and end_job:
 log_print("Job finished. Resetting flags and printing report.")
 SHOW_LAST_CATH_GUI_MSG = False
 catheters_processed = 0
 cath_data_dict_unsorted_buffer = {}
 CATH_MODEL = -1
 JOB_SIZE = 0
 JOB_NUMBER = 0
 CATH_RES_SAMPLES_COLLECTED = 0
 CAL_PASSED = False
 first_iteration = True
 end_job = False
 reset_mouse_position()
 gui_frame_msngr_update(1)
 first_job_done = True
 notify_user_cath_detected = True
 notify_user_test_result = True
 print_report(report_filename)
 break
 sleep(.05)
 window.close()
alrt_rdy_func_enable()
isr_enable()
no_blank_screen()
loop()
Reinderien
71.1k5 gold badges76 silver badges256 bronze badges
asked Jun 5, 2023 at 14:10
\$\endgroup\$
7
  • \$\begingroup\$ That first sys.path.append is suspicious and concerning. Why is it needed? \$\endgroup\$ Commented Jun 5, 2023 at 14:42
  • \$\begingroup\$ I updated python to 3.11 during some troubleshooting. Didn't setup a virtual environment from the get-go, so now I'm stuck referencing a library that was installed in python 3.9 The sys.path.append is admittedly a dirty way of making python reference that stranded library. \$\endgroup\$ Commented Jun 5, 2023 at 14:59
  • \$\begingroup\$ Why not just... Install that package in your venv? \$\endgroup\$ Commented Jun 5, 2023 at 15:04
  • \$\begingroup\$ Your code is way too long and that can deter potential reviewers. I for one lost my interest to review it after scrolling for some time. You need to split your code into separate modules and post each module as a different question. \$\endgroup\$ Commented Jun 5, 2023 at 15:41
  • \$\begingroup\$ @ΞένηΓήινος I disagree. Splitting the code into modules deprives the reviewer of potential context. The current code is not very well modularized in the first place, which is something OP can improve - but does not preclude them from posting the whole program. \$\endgroup\$ Commented Jun 5, 2023 at 16:50

1 Answer 1

2
\$\begingroup\$

First impressions:

  • way too much code
  • not enough comments if any
  • needs refactoring to achieve better separation of concerns

Unfortunately I can't test it, and TBH I don't understand much so I will just focus on style mainly.

There are many different things going on in this program, so I think your priority should be to break it up in small pieces to make it more comprehensible and more maintainable. It's difficult to maintain long code, when you have to scroll a lot and don't have a clear overview of code because it spreads along so many lines.

The first thing that is obvious is that you missed the logging module. I recommend you start using it from now on because it is more flexible and there is no need to reinvent the wheel. I always use it for my projects and I like to write to console and to file at the same time (at different levels eg DEBUG for file and INFO for console), because it's easier to troubleshoot unattended applications when you have a permanent log file available.

But it's already good that you are using some logging in your program because it makes it easier to trace the execution flow.

The shebang should be on the first line:

#!/usr/bin python3

from import * should be avoided, just import what you need. Why: namespace pollution and possibly overriding existing functions, which may cause nasty bugs.

sys.path.append is something one should never do. This may be convenient when writing some test code but there are ways in Python to load libraries dynamically if the need arises. When you see this it usually means the application is poorly structured (files, directories) or the virtualenv is not setup right.

The PDF class does not seem to be used presently, so remove it from your code along with the unused imports. Declutter your file as much as you can. Anything you don't use is unneeded distraction. If you really need PDF generation capabilities, then move your class code to a separate file, and import it accordingly.

Then we have a couple of variables related to sound, which is yet another type of functionality:

FAIL_SOUND = 'fail.mp3'
PASS_SOUND = 'pass.mp3'
PLAY_SOUND = True

At this point, it becomes clear that a separate configuration file should be used to contain those settings. Then they can be customized without touching the code itself, at the risk of inadvertently breaking things. In Python there is the classic configparser module but I prefer YAML files personally. Here you can have a look at some options.

Then comes the meat. Next function names are set_bias_mux_to_low_range, set_bias_mux_to_hi_range, set_dut_mux_to_input_res, set_dut_mux_to_output_res etc. I have no idea what they do. You might want to comment these functions using docstrings. If I reason purely in terms of functionality and without even understanding your code, it seems to me that the functions related to GPIO deserve to be put in a separate file.

Next function: reset_mouse_position. OK, mouse manipulation. I would externalize this part as well. Possibly into a UI class of some sort.

Next: no_blank_screen. I suspect there is feature creep here. This is something that I would rather do outside Python. Possibly in a .bashrc file. Or straight into the Xorg configuration. If you're on Linux, that is. You could also create a launcher for your program. But if you're running a graphical environment, it seems sensible to adapt your power management options, or session preferences if you're concerned about screen lock. In short: this is outside the purview of your application.

Next:

def show_calcheck_results(measurements, button_text, cal_reset):
 log_print("show_calcheck_results([%.3f,%.3f], %s)" % (
 measurements[0], measurements[1], button_text))
 if button_text == 'CHECK LOW RANGE':
 add_string = 'INSERT HIGH-RANGE\n' \
 'RESISTANCE FIXTURE AND PRESS\n' \
 '\'CHECK HIGH RANGE\''

This does not sound right. If you click on a button, it should call a function with appropriate arguments. Not the other way round. You normally don't write a function that checks which button was clicked by looking at the button caption (which may change). Or it's possible I misunderstand your code because I'm not familiar with pyautogui and you're automating key strokes perhaps. I'd need to look more in depth then. But in GUIs such as GTK, QT etc you work with event handlers that you attach to controls such as buttons.

Next: alrt_rdy_func_enable. It was not worth abbreviating the function name. Make it more explicit. You've saved just 3 characters here. Ideally a function name should already give some clue about its purpose. Admittedly I am a noob in this stuff but maybe the function name could be more intuitive? You're handling I2C here by the way. I guess it goes with GPIO stuff then.

I could go on and on but there is misuse of global variables. Use function arguments where appropriate instead. When the number of arguments is unknown or variable you can use **kwargs. But global variables are seldom needed or justified really. And of course they are tricky: when you have global variables that can change value in many code paths, this is going to make debugging difficult. It's something you want to avoid. Variable scope should be limited as much as possible. Function scope is usually sufficient, and arguments can be passed from one function to another.

Some values are undefined for example RGAIN1 at line 327:

transfer_func = RGAIN1 * ((10 * ((voltage - V_BIAS) * tfco + V_BIAS)) - 1)

So I would guess it's defined in hw_init, where you've used star import. Then do: from hw_init import RGAIN1 etc. Again, a config file may be more appropriate, if these are not hard values that will seldom change. My IDE (Pycharm) highlights the variables that are undefined or that cannot be resolved and there are quite a few. Declaring them in your imports would be beneficial because you can directly access their definition. This also reduces the risk of typos.

Coming next: barcode stuff. Unsurprisingly I might suggest a dedicated file for this. It looks like you're doing serial read/write but I don't see the relevant imports. You're saying the application works but maybe not in all code paths.

Let's have a look at one barcode function:

def validate_catheter_barcode_scan(x):
 log_print("validate_catheter_barcode_scan(%s) called" % x)
 try:
 if 5 < len(x) < 10:
 code_validity = True
 else:
 code_validity = False
 except TypeError:
 code_validity = False
 log_print("validate_catheter_barcode_scan() returning")
 return code_validity

Since you're merely returning a boolean value it can be shortened to:

def validate_catheter_barcode_scan(value):
 return 5 < len(value) < 10

I have omitted logging and exception handling. Instead of TypeError I would probably use ValueError. In function barcode_scanner I would just return the raw results, rather than call validate_catheter_barcode_scan within that same function. Because you may want to handle different types of barcodes and apply different forms of validation. Leave room for flexibility.

Coming next: sort_data. The name is too broad to be meaningful. I suppose sort_catheter_data would then be more expressive.

correction_value: what are we correcting and why? No idea. Maybe this is important. But since the function appears to be unused anyway, remove it or stash it somewhere.

When you've done all that, review the remaining functions: calibration, read_adc, catheter_test and loop since they are the biggest chunks of code. A loop shouldn't be that long. There are lots of variables, not always aptly named so this part is a bit overwhelming. Sadly, many variables are just unneeded. They are there merely to solve control flow problems.

In addition to comments, there has to be more line spacing. That would make the code less of a sore to read. Code that is tedious to read and difficult to understand is harder to maintain because it takes more effort.

The other thing that strikes me is the nested ifs - the longer the block, the higher there is a risk of wrong indentation, causing bugs. Error logic are prone to happen here. You can "denest" by using the early return pattern instead: illustration (not in Python) but applies to any language.

I think that denesting should be a priority, because you are inevitably going to add more code, and the code will become even more difficult to comprehend. It will take a lot of scrolling. And even a large screen won't suffice to show the whole block so we can have a block-level overview. As said already, indentation is critical in Python. It's very easy to break the logic when you have big chunks of if blocks, nested with other ifs or loops.

At least you've made some effort to modularize by defining 4 distinct functions that have to run in order to start your app. But you need to modularize more. Use a __main__ guard as well. This is a good habit to follow. One benefit is that you can import your module (or parts of it) without executing it.

answered Jun 5, 2023 at 21:45
\$\endgroup\$

You must log in to answer this question.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.