Hardware: Raspberry Pi 3B+, Elecrow touchscreen, DFR0660 Barcode Scanner, ADS1115 ADC
I would like any bad practices/possible failure points pointed out specifically in the loop(), read_adc(), and catheter_test() functions. The write_to_report() and measure_single_catheter() functions were removed. The system is fully functional.
import sys
from time import sleep
import RPi.GPIO as GPIO
from os import system
from datetime import datetime
from fpdf import FPDF
from fpdf.enums import XPos, YPos
import pyautogui
import pygame
from itertools import chain
import subprocess
from hw_init import *
from cath_test_init import *
from gui_init import *
sys.path.append('/usr/lib/python39.zip')
# !/usr/bin python3
# VERBOSE LOGGING FOR TROUBLESHOOTING
# Redirect stdout (and therefore every print()/log_print() call) to a dated
# log file when the script is started with 'log' as its first argument.
# The length guard keeps a plain start-up (no arguments) from raising
# IndexError.
if len(sys.argv) > 1 and sys.argv[1] == 'log':
    logfile_date = datetime.now().strftime("%m_%d_%Y")
    logfile_name = 'logfile' + logfile_date + '.txt'
    # The file stays open for the lifetime of the process on purpose: it
    # replaces sys.stdout.
    sys.stdout = open(logfile_name, 'w')
def log_print(string):
    """Print *string* prefixed with the current timestamp and a tab.

    The timestamp format is ``MM_DD_YYYY_HH:MM:SS``. Output goes to whatever
    ``sys.stdout`` currently is.
    """
    stamp = datetime.now().strftime("%m_%d_%Y_%H:%M:%S")
    print("\t".join((stamp, string)))
class PDF(FPDF):
    """FPDF subclass whose footer shows the model, software version and page."""

    def footer(self):
        """Draw the two footer lines.

        Reads the module-level MODEL_SELECTED and SOFTWARE_VERSION at the
        time the footer is rendered.
        """
        self.set_y(-40)
        self.set_font('Times', 'I', 12)

        # First line: selected model and software version, left aligned.
        model_and_version = ('Model Selected: ' + MODEL_SELECTED +
                             ' Sofware Version:' + SOFTWARE_VERSION)
        self.cell(0, 10, model_and_version,
                  new_x=XPos.LMARGIN, new_y=YPos.NEXT, align='L')

        # Second line: page counter, centred.
        # Page number {nb} comes from alias_nb_page
        page_label = 'Page ' + str(self.page_no()) + '/{nb}'
        self.cell(0, 10, page_label,
                  new_x=XPos.RIGHT, new_y=YPos.TOP, align='C')
# AUDIO SETUP
FAIL_SOUND = 'fail.mp3'
PASS_SOUND = 'pass.mp3'
PLAY_SOUND = True
pygame.init()
pygame.mixer.init()
# Loading and playing sounds in order to load dependencies of library.
# Each sound is played once, fail first and then pass, and start-up blocks
# until playback has finished.
for _startup_sound in (FAIL_SOUND, PASS_SOUND):
    pygame.mixer.music.load(_startup_sound)
    pygame.mixer.music.play()
    while pygame.mixer.music.get_busy():
        pygame.time.Clock().tick(10)
def get_ip():
    """Return the raw output of ``hostname -I`` decoded as UTF-8.

    The string is returned exactly as the command printed it; no stripping
    or splitting is done here.
    """
    completed = subprocess.run(['hostname', '-I'], stdout=subprocess.PIPE)
    raw_output = completed.stdout
    return raw_output.decode('utf-8')
def terminate_script():
    """Close the GUI window and end the program.

    Raises:
        SystemExit: always, with the admin-request message.
    """
    window.close()
    # sys.exit(msg) is exactly "raise SystemExit(msg)".
    raise SystemExit("Terminated at admin request")
def reboot_system():
    """Run ``sudo reboot`` through the shell."""
    reboot_command = 'sudo reboot'
    system(reboot_command)
def shutdown_system():
    """Run ``sudo shutdown -h now`` through the shell."""
    shutdown_command = 'sudo shutdown -h now'
    system(shutdown_command)
def set_bias_mux_to_low_range():
    """Drive the A_BIAS_MUX pin LOW (low-range bias, per the function name)."""
    tag = "set_bias_mux_to_low_range()"
    log_print(tag + " called")
    GPIO.output(A_BIAS_MUX, GPIO.LOW)
    log_print(tag + " returning")
def set_bias_mux_to_hi_range():
    """Drive the A_BIAS_MUX pin HIGH (high-range bias, per the function name)."""
    tag = "set_bias_mux_to_hi_range()"
    log_print(tag + " called")
    GPIO.output(A_BIAS_MUX, GPIO.HIGH)
    log_print(tag + " returning")
def set_dut_mux_to_input_res():
    """Set the DUT mux select pins to A=LOW, B=LOW (input-resistance path,
    per the function name)."""
    tag = "set_dut_mux_to_input_res()"
    log_print(tag + " called")
    for select_pin, level in ((A_DUT_MUX, GPIO.LOW), (B_DUT_MUX, GPIO.LOW)):
        GPIO.output(select_pin, level)
    log_print(tag + " returning")
def set_dut_mux_to_output_res():
    """Set the DUT mux select pins to A=HIGH, B=LOW (output-resistance path,
    per the function name)."""
    tag = "set_dut_mux_to_output_res()"
    log_print(tag + " called")
    for select_pin, level in ((A_DUT_MUX, GPIO.HIGH), (B_DUT_MUX, GPIO.LOW)):
        GPIO.output(select_pin, level)
    log_print(tag + " returning")
def reset_mouse_position():
    """Move the mouse pointer to screen position (700, 160).

    Logging is intentionally absent here (it was commented out in the
    original).
    """
    park_x, park_y = 700, 160
    pyautogui.moveTo(park_x, park_y)
def no_blank_screen():
    """Run the three ``xset`` commands that turn off screen blanking, DPMS
    and the screen saver, in that order."""
    log_print("no_blank_screen() called")
    for xset_arguments in ('s noblank', '-dpms', 's off'):
        system('xset ' + xset_arguments)
    log_print("no_blank_screen() returning")
def audio_feedback(local_result):
    """Play the pass or fail sound for *local_result* and clear PLAY_SOUND.

    Args:
        local_result: 'PASS' or 'FAIL'. Any other value only prints a
            message; nothing is played.

    Blocks until playback has finished. PLAY_SOUND is set to False on the
    way out if it was truthy.
    """
    global PLAY_SOUND
    log_print("audio_feedback(%s) called" % local_result)

    if local_result == 'FAIL':
        sound_file = FAIL_SOUND
    elif local_result == 'PASS':
        sound_file = PASS_SOUND
    else:
        sound_file = None

    if sound_file is None:
        print('result is invalid. Cant play audio')
    else:
        pygame.mixer.music.load(sound_file)
        pygame.mixer.music.play()
        # Wait for the clip to finish before returning.
        while pygame.mixer.music.get_busy():
            pygame.time.Clock().tick(10)

    if PLAY_SOUND:
        PLAY_SOUND = False
    log_print("audio_feedback(%s) returning" % local_result)
def gui_frame_msngr_update(frame_to_show,
                           new_current_process_message='No message'):
    """Show exactly one of frame1..frame9 and push a message to the GUI.

    Args:
        frame_to_show: frame number 1-9. Any other value falls back to
            frame 2, as before.
        new_current_process_message: text written to the user messenger
            window and to the message box belonging to frames 3, 4, 5, 8
            and 9.

    The previous version compared the visibility of only six frames against
    a nine-element target list, so the "already showing" check could never
    be true and every call re-applied all nine visibilities. All nine frames
    are now compared, so the updates are skipped when nothing changes. The
    window is refreshed on every call, which is what effectively happened
    before.
    """
    user_messager_window.update(new_current_process_message)

    frames = (frame1, frame2, frame3, frame4, frame5,
              frame6, frame7, frame8, frame9)
    target = int(frame_to_show) if frame_to_show in range(1, 10) else 2

    # Frame-specific widgets that also display the message.
    if target == 3:
        reset_mouse_position()
        keypad_message_box.update(new_current_process_message)
    elif target == 4:
        pass_test_text_box.update(new_current_process_message)
    elif target == 5:
        fail_test_text_box.update(new_current_process_message)
    elif target == 8:
        frame8_message_box.update(new_current_process_message)
    elif target == 9:
        frame9_ip_add_box.update(new_current_process_message)

    wanted_visibility = [number == target for number in range(1, 10)]
    current_visibility = [frame.visible for frame in frames]
    if current_visibility != wanted_visibility:
        for frame, show in zip(frames, wanted_visibility):
            frame.update(visible=show)
    window.refresh()
def show_calcheck_results(measurements, button_text, cal_reset):
    """Build the text shown after a calibration-check measurement.

    Args:
        measurements: sequence whose first two items are the measured
            resistances.
        button_text: 'CHECK LOW RANGE' or 'CHECK HIGH RANGE'; selects the
            instruction appended to the message.
        cal_reset: truthy to label the values as upper-end/lower-end
            resistances, falsy for input/output resistances.

    Returns:
        The formatted message, or a fixed error text for any other
        *button_text*.
    """
    log_print("show_calcheck_results([%.3f,%.3f], %s)" % (
        measurements[0], measurements[1], button_text))

    next_step_by_button = {
        'CHECK LOW RANGE': 'INSERT HIGH-RANGE\n'
                           'RESISTANCE FIXTURE AND PRESS\n'
                           '\'CHECK HIGH RANGE\'',
        'CHECK HIGH RANGE': 'PRESS \'APPROVE & EXIT\'\n'
                            'OR \'REDO CAL REF\'\n'
                            'IF OUT OF TOLERANCE',
    }

    if button_text in next_step_by_button:
        if cal_reset:
            first_label = 'UPPER-END RESISTANCE'
            second_label = 'LOWER-END RESISTANCE'
        else:
            first_label = 'INPUT RESISTANCE'
            second_label = 'OUTPUT RESISTANCE'
        res_message = '%s: %.3f\n%s: %.3f\n%s' % (
            first_label, round(measurements[0], 3),
            second_label, round(measurements[1], 3),
            next_step_by_button[button_text])
    else:
        res_message = 'ERROR IN show_calcheck_results\nCONTACT ENGINEERING'

    log_print("show_calcheck_results() returning")
    return res_message
def show_results(cath_test_result, barcode):
    """Return the operator message for a finished catheter test.

    The heading depends on the globals SHOW_LAST_CATH_GUI_MSG (checked
    first) and REPEATED_CATHETER_DETECTED. The last line is always
    ``<barcode>: <result>``.
    """
    log_print("show_results(%s, %s) called" % (cath_test_result, barcode))

    if SHOW_LAST_CATH_GUI_MSG:
        template = 'JOB FINISHED\nUNPLUG CATHETER\nTO PRINT REPORT\n%s: %s'
    elif REPEATED_CATHETER_DETECTED:
        template = 'REPEATED CATHETER\nREADY FOR\nNEXT CATHETER\n%s: %s'
    else:
        template = 'READY FOR\nNEXT CATHETER\n%s: %s'
    message = template % (barcode, cath_test_result)

    log_print("show_results() returning")
    return message
def alrt_rdy_func_enable():
    """Write the low- and high-threshold registers of the ADC over I2C.

    Presumably this is what switches the ALERT/RDY pin to conversion-ready
    signalling; the register payloads are defined outside this file.
    """
    log_print("alrt_rdy_func_enable() called")
    threshold_writes = (
        (REG_LOTHRESH_ADDR, LOTHRESH_CONFIGURATION_DATA),
        (REG_HITHRESH_ADDR, HITHRESH_CONFIGURATION_DATA),
    )
    for register, payload in threshold_writes:
        i2cbus.write_i2c_block_data(I2C_DEV_ADDR, register, payload)
    log_print("alrt_rdy_func_enable() returning")
def configure_adc(config_reg_data):
    """Write *config_reg_data* to the ADC configuration register over I2C."""
    i2cbus.write_i2c_block_data(
        I2C_DEV_ADDR,
        REG_CONFIG_ADDR,
        config_reg_data,
    )
def isr_enable():
    """Register read_adc as the falling-edge callback on the ADC ready pin."""
    log_print("isr_enable() called")
    ready_pin = ADS1115_ALRT_RDY_SIG
    GPIO.add_event_detect(ready_pin, GPIO.FALLING)
    GPIO.add_event_callback(ready_pin, read_adc)
    log_print("isr_enable() returning")
def isr_disable():
    """Log the call only; edge detection is NOT removed.

    The GPIO.remove_event_detect(ADS1115_ALRT_RDY_SIG) call was commented
    out in the original and stays disabled, so read_adc keeps being
    triggered after this function returns.
    """
    for phase in ("called", "returning"):
        log_print("isr_disable() " + phase)
def adc_decode(adc_codes):
    """Convert the two conversion-register bytes into a voltage.

    Args:
        adc_codes: sequence ``[msb, lsb]`` as read from the ADC conversion
            register.

    Returns:
        The signed 16-bit code multiplied by the module-level LSB.

    The ADS1115 conversion register is a two's-complement value. The
    previous version treated it as unsigned, so a slightly negative reading
    (e.g. 0xFFFF near 0 V) decoded as a voltage far above full scale. The
    code is now sign-extended before scaling.
    """
    raw_code = (adc_codes[0] << 8) | adc_codes[1]
    if raw_code & 0x8000:
        # Bit 15 set: negative value in two's complement.
        raw_code -= 0x10000
    return raw_code * LSB
def volt2res(voltage):
    """Convert a measured voltage into the resistance of the device under test.

    Args:
        voltage: ADC voltage, as returned by adc_decode().

    Returns:
        ``RGAIN1 * (10 * ((voltage - V_BIAS) * tfco + V_BIAS) - 1)`` minus
        the initial and the dynamic linear correction (``m * voltage + b``)
        of the selected range.

    Range selection, from the globals CATH_MODEL and
    MEASURE_OUTPUT_RESISTANCE:
        * P16x  -> low range
        * P330  -> high range
        * P330B -> high range for the output measurement, low range for
          the input measurement
        * anything else -> low range, and a diagnostic is printed

    Model selectors are compared with ``==``. The original used ``is``,
    which only works for integers because CPython caches small ints.
    """
    log_print("volt2res(%f) called" % voltage)

    measurement = "output" if MEASURE_OUTPUT_RESISTANCE else "input"
    if CATH_MODEL == MODEL_P16x_SEL:
        use_high_range = False
    elif CATH_MODEL == MODEL_P330_SEL:
        use_high_range = True
    elif CATH_MODEL == MODEL_P330B_SEL:
        use_high_range = bool(MEASURE_OUTPUT_RESISTANCE)
    else:
        use_high_range = False
        print("Something went wrong when selecting the correction factor "
              "during %s resistance measurement. CATH_MODEL = " % measurement,
              CATH_MODEL)

    if use_high_range:
        correction_factor = \
            (initial_m_high_range * voltage) + initial_b_high_range
        dynamic_correction_factor = \
            (dynamic_m_high_range * voltage) + dynamic_b_high_range
        tfco = TFCO_HIGH
    else:
        correction_factor = \
            (initial_m_low_range * voltage) + initial_b_low_range
        dynamic_correction_factor = \
            (dynamic_m_low_range * voltage) + dynamic_b_low_range
        tfco = TFCO_LOW

    transfer_func = RGAIN1 * ((10 * ((voltage - V_BIAS) * tfco + V_BIAS)) - 1)
    r_dut = transfer_func - correction_factor - dynamic_correction_factor
    log_print("volt2res() returning")
    return r_dut
def configure_measurement():
    """Set the muxes and expected-value globals for the next measurement.

    Reads MEASURE_OUTPUT_RESISTANCE and CATH_MODEL. Sets the DUT mux to the
    output or input path, sets the bias mux to the range used by the model,
    and updates MODEL_RES, MODEL_TOL, V_BIAS, RANGE_LIMIT_LOW and
    RANGE_LIMIT_HIGH.

    For an unknown model only the DUT mux is changed and an error is
    logged; the globals keep their previous values.

    Model selectors are compared with ``==`` (the original used ``is``).
    The invalid-model log uses ``%s`` so a non-numeric CATH_MODEL cannot
    raise while the error is being reported.
    """
    global MODEL_RES, MODEL_TOL, V_BIAS, RANGE_LIMIT_LOW, RANGE_LIMIT_HIGH
    log_print("configure_measurement() called")

    # settings = (use_high_range, expected resistance, tolerance)
    settings = None
    if MEASURE_OUTPUT_RESISTANCE:
        set_dut_mux_to_output_res()
        if CATH_MODEL == MODEL_P16x_SEL:
            settings = (False, MODEL_P16x_OUTPUT_RES, MODEL_P16x_TOL)
        elif CATH_MODEL == MODEL_P330_SEL:
            settings = (True, MODEL_P330_OUTPUT_RES, MODEL_P330_OUTPUT_TOL)
        elif CATH_MODEL == MODEL_P330B_SEL:
            settings = (True, MODEL_P330B_OUTPUT_RES, MODEL_P330B_OUTPUT_TOL)
    else:
        set_dut_mux_to_input_res()
        if CATH_MODEL == MODEL_P16x_SEL:
            settings = (False, MODEL_P16x_INPUT_RES, MODEL_P16x_TOL)
        elif CATH_MODEL == MODEL_P330_SEL:
            settings = (True, MODEL_P330_INPUT_RES, MODEL_P330_INPUT_TOL)
        elif CATH_MODEL == MODEL_P330B_SEL:
            # P330B input resistance is measured on the low range.
            settings = (False, MODEL_P330B_INPUT_RES, MODEL_P330B_INPUT_TOL)

    if settings is None:
        log_print("InvalidModelError:model index "
                  "value is invalid (model=%s)" % CATH_MODEL)
    else:
        use_high_range, model_res, model_tol = settings
        if use_high_range:
            set_bias_mux_to_hi_range()
            v_bias = V_BIAS_HIGH
            limit_low = HIGHRANGE_LIMIT_LOW
            limit_high = HIGHRANGE_LIMIT_HIGH
        else:
            set_bias_mux_to_low_range()
            v_bias = V_BIAS_LOW
            limit_low = LOWRANGE_LIMIT_LOW
            limit_high = LOWRANGE_LIMIT_HIGH
        MODEL_RES = model_res
        MODEL_TOL = model_tol
        V_BIAS = v_bias
        RANGE_LIMIT_LOW = limit_low
        RANGE_LIMIT_HIGH = limit_high

    log_print("configure_measurement() returning")
def jobsize_capture(jobsize_string):
    """Parse the job size entered by the operator into JOB_SIZE.

    Args:
        jobsize_string: text from the keypad.

    Returns:
        True when the value is an integer with
        ``0 < value < MAX_CATHETERS_PER_JOB``; False otherwise.

    Non-numeric text used to raise an unhandled ValueError from int(). It
    is now logged and rejected, leaving JOB_SIZE unchanged. As before, a
    value that parses but is out of range is still stored in JOB_SIZE
    before False is returned.
    """
    global JOB_SIZE
    log_print("jobsize_capture(%s) called" % jobsize_string)

    if not jobsize_string:
        log_print("NO VALUE ENTERED.")
        log_print("jobsize_capture() returning")
        return False

    try:
        JOB_SIZE = int(jobsize_string)
    except ValueError:
        log_print("INVALID JOB SIZE.")
        log_print("jobsize_capture() returning")
        return False

    # NOTE(review): the upper bound is exclusive, so a job of exactly
    # MAX_CATHETERS_PER_JOB catheters is rejected. Confirm this is intended.
    if 0 < JOB_SIZE < MAX_CATHETERS_PER_JOB:
        log_print("jobsize_capture() returning")
        return True

    log_print("INVALID JOB SIZE.")
    log_print("jobsize_capture() returning")
    return False
def validate_job_number_barcode_scan(x):
    """Return True when *x* is a job number strictly between 999 and
    100000000.

    Args:
        x: scanned text; surrounding whitespace is ignored when parsing.

    Returns:
        False for empty text, non-numeric text or out-of-range values.
    """
    log_print("validate_job_number_barcode_scan(%s) called" % x.strip())
    try:
        code_validity = len(x) > 0 and 999 < int(x.strip()) < 100000000
    except ValueError:
        # This path returns without the "returning" log line.
        log_print("Invalid Job Number")
        return False
    log_print("validate_job_number_barcode_scan() returning")
    return code_validity
def jobnumber_capture():
    """Scan the job-number barcode, retrying up to three times.

    Each scan result is stored in the global JOB_NUMBER. The first scan
    longer than one character ends the retries and is validated with
    validate_job_number_barcode_scan().

    Returns:
        True for a valid job number. False for an invalid one, or when
        three scans produced nothing usable.

    The previous version fell off the end and returned None when all three
    scans were exactly one character long. It now always returns a bool.
    """
    global JOB_NUMBER
    log_print("jobnumber_capture() called")
    for _ in range(3):
        # barcode_scanner() also returns a catheter-format validity flag,
        # which does not apply to job numbers and is ignored.
        JOB_NUMBER, _catheter_format_validity = barcode_scanner()
        if len(JOB_NUMBER) > 1:
            is_valid = bool(validate_job_number_barcode_scan(JOB_NUMBER))
            log_print("jobnumber_capture() returning")
            return is_valid
    log_print("jobnumber_capture() returning")
    return False
def model_capture():
    """Translate MODEL_SELECTED into the matching CATH_MODEL selector.

    CATH_MODEL is left unchanged and a message is logged when
    MODEL_SELECTED is not 'P16x', 'P330' or 'P330B'.
    """
    global CATH_MODEL
    log_print("model_capture() called")
    selector_by_name = {
        'P16x': MODEL_P16x_SEL,
        'P330': MODEL_P330_SEL,
        'P330B': MODEL_P330B_SEL,
    }
    if MODEL_SELECTED in selector_by_name:
        CATH_MODEL = selector_by_name[MODEL_SELECTED]
    else:
        log_print("INVALID MODEL SELECTED!")
    log_print("model_capture() returning")
def waiting_for_manual_barcode():
    """Block until MANUAL_BARCODE_CAPTURED is set, then return the barcode.

    Returns:
        The value of KEYPAD_CATH_BARCODE after the flag was seen. The flag
        is reset to False before returning.

    The wait now sleeps inside the polling loop. A loop body of only
    ``pass`` spins at full CPU and competes with the GUI thread for the
    interpreter. One extra sleep is kept after the loop so the code that
    sets the flag has time to finish storing the barcode.

    NOTE(review): there is no timeout; if the flag is never set this blocks
    forever.
    """
    global KEYPAD_CATH_BARCODE, MANUAL_BARCODE_CAPTURED
    log_print("waiting_for_manual_barcode() called")
    while not MANUAL_BARCODE_CAPTURED:
        sleep(.1)
    sleep(.1)
    MANUAL_BARCODE_CAPTURED = False
    local_barcode = KEYPAD_CATH_BARCODE
    log_print("waiting_for_manual_barcode() returning")
    return local_barcode
def validate_catheter_barcode_scan(x):
    """Return True when *x* is between 6 and 9 characters long, inclusive.

    Values without a length (e.g. None) are reported as invalid instead of
    raising.
    """
    log_print("validate_catheter_barcode_scan(%s) called" % x)
    try:
        length = len(x)
    except TypeError:
        code_validity = False
    else:
        code_validity = 5 < length < 10
    log_print("validate_catheter_barcode_scan() returning")
    return code_validity
def manual_catheter_barcode_entry():
    """Collect a catheter barcode from the on-screen keypad.

    Shows the keypad frame and processes window events until a barcode
    accepted by validate_catheter_barcode_scan() is submitted.

    Returns:
        The accepted barcode string.

    Raises:
        RuntimeError: if the window is closed while waiting for input.

    Changes from the previous version:
        * ``event in '1234567890'`` was a substring test. It was true for
          an empty-string event and raised TypeError for a None event
          (window closed). Only single digit characters now count as key
          presses.
        * A closed window raises an explicit RuntimeError instead of an
          incidental TypeError.
    """
    log_print("manual_catheter_barcode_entry() called")
    keypad_back_btn.update('')
    gui_frame_msngr_update(3, CATH_KEYPAD_MESSAGE)
    while True:
        event, values = window.read()
        if event is None:
            log_print("manual_catheter_barcode_entry(): window closed")
            raise RuntimeError(
                "Window closed during manual catheter barcode entry")
        if isinstance(event, str) and len(event) == 1 \
                and event in '1234567890':
            # Append the new digit to what has been entered so far.
            window['input'].update(values['input'] + event)
        elif event == 'Submit':
            keys_entered = values['input']
            window['input'].update('')
            if validate_catheter_barcode_scan(keys_entered):
                cath_barcode = keys_entered
                break
            window['keypad_message'].update(CATH_KEYPAD_INVALID_MESSAGE)
        elif event == 'Clear':
            window['input'].update('')
    log_print("manual_catheter_barcode_entry() returning")
    return cath_barcode
def barcode_scanner():
    """Trigger the barcode scanner and return what it read.

    Returns:
        ``(barcode, code_validity)``. *barcode* is everything after the
        first "31" in the scanner reply, including the trailing carriage
        return if present (callers strip it). It is an empty string when
        the reply has no "31" marker. *code_validity* is the result of
        validate_catheter_barcode_scan() on that text.

    The previous version indexed ``split("31", 1)[1]`` unconditionally, so
    a reply without the marker (for example an empty read after a serial
    timeout) raised IndexError. Such replies now yield ``('', False)``.

    Scanner-model specifics, from the original notes:
        * ``\\r`` is the last character returned after a barcode is read.
        * "31" is the last 2-digit code the reader returns before the
          barcode.
      Both may have to change if the scanner model changes.
    """
    log_print("barcode_scanner() called")
    ser.write(SCNR_TRGR_CMD_BYTES)  # Hex command that triggers a read
    raw_reply = ser.read_until(b'\r')
    _header, marker, barcode = raw_reply.decode().partition("31")
    if not marker:
        log_print("barcode_scanner(): no '31' marker in scanner reply")
        barcode = ''
    code_validity = validate_catheter_barcode_scan(barcode)
    log_print("barcode_scanner() returning")
    return barcode, code_validity
def print_report(local_report_filename):
    """Send the report file to the default printer with ``sudo lp``.

    Args:
        local_report_filename: path of the file to print.

    The command is run from an argument list instead of a concatenated
    shell string, so a filename containing spaces or shell metacharacters
    is passed to ``lp`` unchanged. Printing stays best-effort: a non-zero
    exit status is ignored and a failure to start the command is only
    logged.
    """
    log_print("print_report(%s) called" % local_report_filename)
    try:
        subprocess.run(['sudo', 'lp', local_report_filename], check=False)
    except OSError as error:
        log_print("print_report(): could not run lp: %s" % error)
    log_print("print_report() returning")
def sort_data():
    """Return a copy of cath_data_dict_unsorted_buffer ordered by key."""
    log_print("sort_data() called")
    unsorted = cath_data_dict_unsorted_buffer
    catheter_data_dict_sorted = {
        barcode: unsorted[barcode] for barcode in sorted(unsorted)
    }
    log_print("sort_data() returning")
    return catheter_data_dict_sorted
def correction_fctr_calc(xlist, ylist):
    """Fit a straight line through the measurement error of each range.

    Args:
        xlist: four voltages, ordered low-range high ref, low-range low
            ref, high-range high ref, high-range low ref.
        ylist: the four resistances measured at those voltages.

    Returns:
        ``(m_low_range, b_low_range, m_high_range, b_high_range)``, where
        each ``m``/``b`` pair describes ``error = m * voltage + b`` through
        the two reference points of that range.

    Raises:
        ZeroDivisionError: when the two voltages of a range are equal.
    """
    log_print("correction_factor_calculation() called")

    def error_line(x_hi, y_hi, ref_hi, x_lo, y_lo, ref_lo):
        # Error = measured resistance minus reference resistance.
        error_hi = y_hi - ref_hi
        error_lo = y_lo - ref_lo
        slope = (error_hi - error_lo) / (x_hi - x_lo)
        intercept = error_hi - slope * x_hi
        return slope, intercept

    m_low_range, b_low_range = error_line(
        xlist[0], ylist[0], CAL_REF_RESISTANCE_LOWRANGE_HIGH,
        xlist[1], ylist[1], CAL_REF_RESISTANCE_LOWRANGE_LOW)
    m_high_range, b_high_range = error_line(
        xlist[2], ylist[2], CAL_REF_RESISTANCE_HIRANGE_HIGH,
        xlist[3], ylist[3], CAL_REF_RESISTANCE_HIRANGE_LOW)

    log_print("correction_factor_calculation() returning")
    return m_low_range, b_low_range, m_high_range, b_high_range
def correction_value(m, b, x):
    """Evaluate the line ``m * x + b`` and return the result."""
    log_print("correction_value() called")
    scaled = m * x
    corr_val = scaled + b
    log_print("correction_value() returning")
    return corr_val
def calibration():
    """Measure the four calibration reference resistances and, if any is out
    of tolerance, recompute the dynamic correction coefficients and
    re-verify.

    Sequence:
      1. For each of the four mux settings, average V_BIAS and then the
         resistance-channel voltage (120 single-shot conversions each, the
         first half discarded), convert to resistance and compare with the
         expected reference value.
      2. If every reference is within CAL_REF_RESISTANCE_TOL, set
         CAL_PASSED and return with the muxes and CATH_MODEL restored.
      3. Otherwise recompute the dynamic m/b coefficients with
         correction_fctr_calc() and repeat the four measurements. Any
         reading still out of tolerance sets CAL_FAIL.

    The ADC samples arrive through read_adc(), which stores
    ADC_SAMPLE_LATEST and sets CONVERSION_READY.

    NOTE(review): the source reached review with its indentation stripped;
    nesting was reconstructed from the control flow. Places where more than
    one nesting is plausible are marked below.
    """
    global CAL_PASSED, V_BIAS, CATH_MODEL, CONVERSION_READY, \
        ADC_SAMPLE_LATEST, CAL_REF_RESISTANCE_LOWRANGE_LOW, \
        CAL_REF_RESISTANCE_LOWRANGE_HIGH, CAL_REF_RESISTANCE_HIRANGE_LOW, \
        CAL_REF_RESISTANCE_HIRANGE_HIGH, \
        dynamic_m_low_range, dynamic_b_low_range, dynamic_m_high_range, \
        dynamic_b_high_range, CAL_FAIL
    log_print("calibration() called")
    # isr_enable()
    # CATH_MODEL is overwritten during calibration and restored at the end.
    temp_cath_model = CATH_MODEL
    # A is a two-bit counter [high bit, low bit] stepped 00, 01, 10, 11.
    # A[0] drives CAL_RES_HI_RANGE_MUX and A_BIAS_MUX (and CATH_MODEL);
    # A[1] drives A_DUT_MUX.
    A = [0, 0]
    x_list = []
    y_list = []
    SAMPLES_TO_TAKE = 120
    # The first half of every sample burst is thrown away.
    samples_to_remove_cal = int(SAMPLES_TO_TAKE / 2)
    sample_period = 0.004
    CAL_FAIL = False
    GPIO.output(B_DUT_MUX, GPIO.HIGH)
    # Expected values, in the same order as the A counter steps.
    cal_resistances = [CAL_REF_RESISTANCE_LOWRANGE_HIGH,
                       CAL_REF_RESISTANCE_LOWRANGE_LOW,
                       CAL_REF_RESISTANCE_HIRANGE_HIGH,
                       CAL_REF_RESISTANCE_HIRANGE_LOW]
    v_bias_cal = []
    recalculate_dynamic_coefficients = False
    log_print('\n\n===INITIATING CALIBRATION===\n')
    for i in range(4):
        voltage_samples = []
        vbias_sample_buffer_cal = []
        GPIO.output(A_DUT_MUX, A[1])
        GPIO.output(CAL_RES_HI_RANGE_MUX, A[0])
        GPIO.output(A_BIAS_MUX, A[0])
        for sample in range(SAMPLES_TO_TAKE):
            configure_adc(SINGLESHOT_VBIAS_CONFIG_REG)
            # NOTE(review): no timeout. If the ready callback never fires
            # (ISR not enabled, I2C fault) this loop never exits.
            while not CONVERSION_READY:
                sleep(sample_period)
            CONVERSION_READY = False
            vbias_sample_buffer_cal.append(adc_decode(ADC_SAMPLE_LATEST))
        vbias_sample_buffer_cal = vbias_sample_buffer_cal[
            samples_to_remove_cal:]
        V_BIAS = sum(vbias_sample_buffer_cal) / len(vbias_sample_buffer_cal)
        log_print("V_BIAS = %f" % V_BIAS)
        # Kept so the verification pass can reuse the same bias value.
        v_bias_cal.append(V_BIAS)
        # volt2res() picks its range from CATH_MODEL.
        CATH_MODEL = A[0]
        sleep(.35)  # to allow the switches to settle
        log_print('SAMPLING INTERNAL RESISTANCE...')
        for sample in range(SAMPLES_TO_TAKE):
            configure_adc(SINGLESHOT_CONFIG_REG)
            while not CONVERSION_READY:
                sleep(sample_period)
            CONVERSION_READY = False
            voltage_samples.append(adc_decode(ADC_SAMPLE_LATEST))
        voltage_samples = voltage_samples[samples_to_remove_cal:]
        avg_voltage = round(sum(voltage_samples) / len(voltage_samples), 3)
        x_list.append(avg_voltage)
        cal_res_msrmnt = round(volt2res(avg_voltage), 3)
        y_list.append(cal_res_msrmnt)
        # Relative error above tolerance on any reference triggers a refit.
        if (abs(cal_res_msrmnt - cal_resistances[i]) /
                cal_resistances[i]) > CAL_REF_RESISTANCE_TOL:
            recalculate_dynamic_coefficients = True
        log_print('avg_voltage:%.3f\navg_res:%.3f' % (
            avg_voltage, cal_res_msrmnt))
        # Increment the two-bit counter A.
        for j in range(len(A) - 1, -1, -1):
            if A[j] == 0:
                A[j] = 1
                break
            A[j] = 0
    if recalculate_dynamic_coefficients:
        dynamic_m_low_range, dynamic_b_low_range, \
            dynamic_m_high_range, dynamic_b_high_range = \
            correction_fctr_calc(x_list, y_list)
        log_print('dynamic_m_low_range, dynamic_b_low_range, '
                  'dynamic_m_high_range, dynamic_b_high_range:\n%f %f %f %f' %
                  (dynamic_m_low_range, dynamic_b_low_range,
                   dynamic_m_high_range, dynamic_b_high_range))
        log_print("DYNAMIC CORRECTION FACTOR CALCULATED!")
        # This condition limits how much the system can correct itself
        # from the initial correction factors before triggering
        # an engineer to recalibrate the system.
        # NOTE(review): all four terms test dynamic_m_low_range. The other
        # three coefficients (dynamic_b_low_range, dynamic_m_high_range,
        # dynamic_b_high_range) are never limited. This looks like a
        # copy-paste slip; confirm the intended limits before changing.
        if (abs(dynamic_m_low_range)) > 10 or (
                abs(dynamic_m_low_range) > 10) or (
                abs(dynamic_m_low_range) > 10) or (
                abs(dynamic_m_low_range) > 10):
            CAL_FAIL = True
        A = [0, 0]
        # Verification pass: re-measure all four references with the new
        # dynamic coefficients applied.
        for i in range(4):
            voltage_samples = []
            GPIO.output(A_DUT_MUX, A[1])
            GPIO.output(CAL_RES_HI_RANGE_MUX, A[0])
            GPIO.output(A_BIAS_MUX, A[0])
            V_BIAS = v_bias_cal[i]
            CATH_MODEL = A[0]
            log_print('SAMPLING INTERNAL RESISTANCES '
                      'WITH DYNAMIC CORRECTION FACTOR...')
            for sample in range(SAMPLES_TO_TAKE):
                configure_adc(SINGLESHOT_CONFIG_REG)
                while not CONVERSION_READY:
                    sleep(sample_period)
                CONVERSION_READY = False
                voltage_samples.append(adc_decode(ADC_SAMPLE_LATEST))
            voltage_samples = voltage_samples[samples_to_remove_cal:]
            avg_voltage = round(sum(voltage_samples) / len(voltage_samples), 3)
            # Half the peak-to-peak spread is logged as the +/- error.
            error_magnitude_voltage = round(
                max(voltage_samples) - min(voltage_samples), 3)
            error_plus_minus_voltage = round(error_magnitude_voltage / 2, 3)
            cal_res_msrmnt = round(volt2res(avg_voltage), 3)
            error_magnitude_res = round(
                volt2res(max(voltage_samples)) - volt2res(
                    min(voltage_samples)), 3)
            error_plus_minus_res = round(error_magnitude_res / 2, 3)
            cal_resistance_tolerance = round(
                cal_resistances[i] * CAL_REF_RESISTANCE_TOL, 3)
            log_print('avg_voltage:%.3f +/-%.3f\navg_res:%.3f +/-%.3f\n' % (
                avg_voltage, error_plus_minus_voltage,
                cal_res_msrmnt, error_plus_minus_res))
            if (abs(cal_res_msrmnt - cal_resistances[i]) / cal_resistances[i])\
                    < CAL_REF_RESISTANCE_TOL:
                log_print("Measured calibrated resistance passed\n")
            else:
                CAL_FAIL = True
                log_print("Measured calibrated resistance out of tolerance\n")
            # NOTE(review): these three log lines may originally have been
            # inside the else branch above; they only affect logging.
            log_print(
                'Expected Ω: %.3fΩ\nCalculated Ω: %fΩ\n' % (
                    cal_resistances[i], cal_res_msrmnt))
            log_print(
                'resistance tolerance: %.3f\n' % cal_resistance_tolerance)
            log_print('resistance error measured: %.3f\n\n' % (
                cal_res_msrmnt - cal_resistances[i]))
            # NOTE(review): as reconstructed, the muxes and CATH_MODEL are
            # only restored when the verification pass succeeds. On failure
            # CATH_MODEL keeps the calibration value and CAL_PASSED is left
            # untouched. Confirm this nesting against the original file.
            if i == 3 and CAL_FAIL is False:
                CAL_PASSED = True
                GPIO.output(A_DUT_MUX, GPIO.LOW)
                GPIO.output(B_DUT_MUX, GPIO.LOW)
                CATH_MODEL = temp_cath_model
                print('===CALIBRATION DONE===')
            # Increment the two-bit counter A.
            for j in range(len(A) - 1, -1, -1):
                if A[j] == 0:
                    A[j] = 1
                    break
                A[j] = 0
    else:
        # Every reference was within tolerance on the first pass.
        GPIO.output(A_DUT_MUX, GPIO.LOW)
        GPIO.output(B_DUT_MUX, GPIO.LOW)
        CATH_MODEL = temp_cath_model
        CAL_PASSED = True
        log_print('Dynamic coefficients were not recalculated. '
                  'Calibration is still stable.')
def read_adc(gpionum):
    """GPIO callback: read the latest ADC conversion and advance the test
    state machine.

    Registered by isr_enable() for falling edges on ADS1115_ALRT_RDY_SIG.

    Args:
        gpionum: channel number passed in by the GPIO library; unused.

    Every call stores the raw bytes in ADC_SAMPLE_LATEST and sets
    CONVERSION_READY (polled by calibration()). It then handles one of:
      * waiting for a catheter: counts consecutive samples below
        CATH_DETECTED_VOLTAGE, then starts the V_BIAS measurement;
      * MEASURE_VBIAS: collects 100 bias samples, averages the last 50;
      * ACTIVE_SAMPLING: collects SAMPLES_REQUIRED samples, then runs
        catheter_test();
    and, when TEST_FINISHED is set, either starts the output-resistance
    measurement or waits for the catheter to be unplugged.

    NOTE(review): catheter_test() is called from inside this callback and
    can block for a long time (serial barcode read, waiting for manual
    keypad entry). RPi.GPIO runs callbacks on its own thread, so no further
    edges are processed while it blocks. Consider having the callback only
    queue samples and doing the processing elsewhere.
    NOTE(review): the I2C read is not protected against an OSError; an
    exception here ends this callback invocation with only a traceback.
    NOTE(review): the state globals are shared with the main thread without
    a lock.
    """
    global CATH_CONN_EMPTY, RES_SAMPLES_VALS, ACTIVE_SAMPLING, \
        CATH_RES_SAMPLES_COLLECTED, TEST_FINISHED, CATH_MODEL, \
        ADC_SAMPLE_LATEST, CATH_DETECTED_SAMPLES_COLLECTED, \
        CATH_DISCONN_SAMPLES_COLLECTED, end_job, SHOW_LAST_CATH_GUI_MSG, \
        CONVERSION_READY, MEASURE_VBIAS, VBIAS_SAMPLES_BUFFER, V_BIAS, \
        report_filename
    ADC_SAMPLE_LATEST = i2cbus.read_i2c_block_data(I2C_DEV_ADDR,
                                                   REG_CONVERSION_ADDR, 2)
    CONVERSION_READY = True
    # Despite the name this is a single decoded sample, not an average.
    avg_voltage = adc_decode(ADC_SAMPLE_LATEST)
    if CATH_CONN_EMPTY and CAL_PASSED and (
            not ACTIVE_SAMPLING) and not EXTERNAL_CAL_INPROGRESS:
        # Idle: debounce catheter insertion. Any sample at or above the
        # threshold restarts the count.
        if CATH_DETECTED_SAMPLES_COLLECTED < CATH_DETECTED_SAMPLES_REQUIRED:
            if avg_voltage < CATH_DETECTED_VOLTAGE:
                CATH_DETECTED_SAMPLES_COLLECTED += 1
            else:
                CATH_DETECTED_SAMPLES_COLLECTED = 0
        else:
            configure_measurement()
            MEASURE_VBIAS = True
            CATH_CONN_EMPTY = False
            CATH_DETECTED_SAMPLES_COLLECTED = 0
            log_print("CATHETER DETECTED. STARTING TEST...")
    elif MEASURE_VBIAS:
        if len(VBIAS_SAMPLES_BUFFER) < 100:
            VBIAS_SAMPLES_BUFFER.append(avg_voltage)
            # Request the next single-shot bias conversion.
            configure_adc(SINGLESHOT_VBIAS_CONFIG_REG)
            # print('collecting vbias samples:',avg_voltage)
        else:
            MEASURE_VBIAS = False
            # Discard the first 50 samples and average the rest.
            VBIAS_SAMPLES_BUFFER = VBIAS_SAMPLES_BUFFER[50:]
            V_BIAS = sum(VBIAS_SAMPLES_BUFFER) / len(VBIAS_SAMPLES_BUFFER)
            VBIAS_SAMPLES_BUFFER = []
            ACTIVE_SAMPLING = True
            log_print('VBIAS = %f' % V_BIAS)
            configure_adc(CONTINUOUS_CONFIG_REG)
    elif ACTIVE_SAMPLING:
        if CATH_RES_SAMPLES_COLLECTED < SAMPLES_REQUIRED:
            RES_SAMPLES_VALS.append(avg_voltage)
            CATH_RES_SAMPLES_COLLECTED += 1
        else:
            log_print("Finished logging test samples")
            ACTIVE_SAMPLING = False
            # May block; see the review note in the docstring.
            catheter_test()
    # NOTE(review): reconstructed as a top-level check, because the unplug
    # detection below has to run on callbacks where none of the branches
    # above applies.
    if TEST_FINISHED:
        if MEASURE_OUTPUT_RESISTANCE:
            # Input measurement done: reset and measure the output next.
            TEST_FINISHED = False
            MEASURE_VBIAS = True
            CATH_RES_SAMPLES_COLLECTED = 0
            RES_SAMPLES_VALS = []
            configure_measurement()
            log_print("Measuring output resistance now...")
        else:
            # Both measurements done: debounce catheter removal.
            if CATH_DISCONN_SAMPLES_COLLECTED < CATH_DETECTED_SAMPLES_REQUIRED:
                if avg_voltage > CATH_DETECTED_VOLTAGE:
                    CATH_DISCONN_SAMPLES_COLLECTED += 1
                else:
                    CATH_DISCONN_SAMPLES_COLLECTED = 0
            else:
                log_print("CATHETER DISCONNECTED. WRITING REPORT...")
                TEST_FINISHED = False
                CATH_CONN_EMPTY = True
                CATH_RES_SAMPLES_COLLECTED = 0
                CATH_DISCONN_SAMPLES_COLLECTED = 0
                RES_SAMPLES_VALS = []
                # The report is written only after the last catheter of the
                # job has been unplugged. write_to_report() is not part of
                # the visible source.
                if catheters_processed == JOB_SIZE:
                    report_filename = write_to_report(sort_data())
                    end_job = True
def catheter_test():
    """Evaluate the samples just collected and record the catheter result.

    Called by read_adc() once per measurement. On the first call (input
    resistance) it stores the result in slot 0 and sets
    MEASURE_OUTPUT_RESISTANCE so the output measurement follows. On the
    second call (output resistance) it stores slot 1, obtains the barcode,
    combines both results and adds the catheter to
    cath_data_dict_unsorted_buffer.

    TEST_FINISHED is set on every call.

    NOTE(review): nesting was reconstructed from a source whose indentation
    had been stripped; ambiguous places are marked.
    """
    global TEST_FINISHED, RES_SAMPLES_VALS, MEASURE_OUTPUT_RESISTANCE, \
        catheters_processed, cath_data_dict_unsorted_buffer, avg_res, \
        GUI_CURRENT_BARCODE, GUI_CURRENT_CATH_RESULT, SHOW_LAST_CATH_GUI_MSG, \
        REPEATED_CATHETER_DETECTED, MANUAL_CATH_BARCODE_CAPTURE, \
        test_result_buffer
    log_print("catheter_test() called")
    # Drop the leading samples before averaging.
    # NOTE(review): raises ZeroDivisionError if SAMPLES_TO_REMOVE is not
    # smaller than the number of samples collected.
    RES_SAMPLES_VALS = RES_SAMPLES_VALS[SAMPLES_TO_REMOVE:]
    avg_voltage = sum(RES_SAMPLES_VALS) / len(RES_SAMPLES_VALS)
    # Values stored in slot 4 of a catheter record.
    unrepeated_cath = False
    repeated_catheter = True
    if MEASURE_OUTPUT_RESISTANCE:
        avg_res[1] = round(volt2res(avg_voltage), 2)
        avg_res_dut = avg_res[1]
        if abs(avg_res_dut - MODEL_RES) < MODEL_TOL:
            test_result_buffer[1] = "PASS"
        else:
            test_result_buffer[1] = "FAIL"
        log_print("Test Result:%s" % test_result_buffer[1])
    else:
        avg_res[0] = round(volt2res(avg_voltage), 2)
        avg_res_dut = avg_res[0]
        if abs(avg_res_dut - MODEL_RES) < MODEL_TOL:
            test_result_buffer[0] = "PASS"
        else:
            test_result_buffer[0] = "FAIL"
        log_print("Test Result:%s" % test_result_buffer[0])
    log_print("Average resistance read:%f" % avg_res_dut)
    if MEASURE_OUTPUT_RESISTANCE:
        # Second pass: both resistances are known, finish the record.
        # NOTE(review): this serial read runs inside the ADC callback.
        barcode, code_val = barcode_scanner()
        # The catheter passes only if both measurements passed.
        if test_result_buffer[0] == "FAIL" or test_result_buffer[1] == "FAIL":
            test_result = "FAIL"
        else:
            test_result = "PASS"
        if bool(barcode) is False:
            # Trigger main thread to change
            # GUI to capture catheter SN manually.
            MANUAL_CATH_BARCODE_CAPTURE = True
            # This loop simply waits for the
            # main thread to do the GUI operation.
            # Reason is, Tkinter does not like working in multiple threads.
            barcode = waiting_for_manual_barcode()
            code_val = validate_catheter_barcode_scan(barcode)
        if barcode.strip() in cath_data_dict_unsorted_buffer:
            # NOTE(review): for a repeated catheter only the flag in slot 4
            # is set; the stored result and resistances are not replaced,
            # although the log line says the report is being updated.
            current_catheter_data = cath_data_dict_unsorted_buffer[
                barcode.strip()]
            current_catheter_data[4] = repeated_catheter
            REPEATED_CATHETER_DETECTED = True
            log_print("REPEATED CATHETER, UPDATING REPORT FOR %s" % barcode)
        else:
            # Out-of-range readings are recorded as 9999.
            # NOTE(review): RANGE_LIMIT_* were last set by
            # configure_measurement() for the output measurement, yet they
            # are applied to the input reading (avg_res[0]) as well. For a
            # model whose input and output use different ranges this
            # compares the input value against the wrong limits.
            if (avg_res[0] > RANGE_LIMIT_HIGH) or (
                    avg_res[0] < RANGE_LIMIT_LOW):
                avg_res[0] = 9999
            if (avg_res[1] > RANGE_LIMIT_HIGH) or (
                    avg_res[1] < RANGE_LIMIT_LOW):
                avg_res[1] = 9999
            # Record layout: [result, input res, output res,
            #                 barcode validity, repeated flag]
            cath_data_dict_unsorted_buffer[barcode.strip()] = [test_result,
                                                               avg_res[0],
                                                               avg_res[1],
                                                               code_val,
                                                               unrepeated_cath]
            # NOTE(review): reconstructed as counting new catheters only;
            # confirm against the original nesting.
            catheters_processed += 1
            if catheters_processed < JOB_SIZE:
                log_print("\n\n---READY FOR NEXT CATHETER---\n\n")
            elif catheters_processed == JOB_SIZE:
                log_print("LAST CATHETER OF JOB. UNPLUG TO PRINT REPORT.")
                SHOW_LAST_CATH_GUI_MSG = True
        GUI_CURRENT_BARCODE = barcode.strip()
        GUI_CURRENT_CATH_RESULT = test_result
        MEASURE_OUTPUT_RESISTANCE = False
    else:
        # First pass done: the next measurement is the output resistance.
        MEASURE_OUTPUT_RESISTANCE = True
    TEST_FINISHED = True
def loop():
    """Run the operator GUI: collect a job definition, then supervise it.

    Every pass of the outer loop has two phases:

    1. Event phase - blocks on ``window.read()`` and dispatches button and
       keypad events (admin menu, external calibration, model selection,
       job size / job number entry). It ends when a valid job size and a
       valid job number have both been recorded.
    2. Job phase - polls module-level flags every 50 ms and updates the GUI
       (calibration, catheter detected, manual barcode entry, test result,
       end of job). Several of those flags (for example ``TEST_FINISHED``
       and ``MANUAL_CATH_BARCODE_CAPTURE``) are written outside this
       function. The phase ends when the job is finished or calibration
       failed, after which control goes back to the event phase.

    Returns:
        None. The function returns only when the window is closed or an
        'Exit' event is received; the window is closed before returning.
    """
    global catheters_processed, CATH_MODEL, JOB_SIZE, JOB_NUMBER, CAL_PASSED
    global end_job, cath_data_dict_unsorted_buffer, MODEL_SELECTED
    global CURRENT_PROCESS_MESSAGE, SHOW_LAST_CATH_GUI_MSG
    global REPEATED_CATHETER_DETECTED, KEYPAD_CATH_BARCODE
    global MANUAL_BARCODE_CAPTURED, MANUAL_CATH_BARCODE_CAPTURE
    global CATH_RES_SAMPLES_COLLECTED, PLAY_SOUND, PROCESS_MESSENGER_FONT
    global EXTERNAL_CAL_INPROGRESS, EXTERNAL_CAL_MEASURED_RES_VALS
    global EXTERNAL_CAL_USER_ENTERED_RES_VALS
    global dynamic_m_low_range, dynamic_b_low_range
    global dynamic_m_high_range, dynamic_b_high_range
    global initial_b_low_range, initial_m_low_range
    global initial_b_high_range, initial_m_high_range
    global CAL_REF_RESISTANCE_LOWRANGE_LOW, CAL_REF_RESISTANCE_LOWRANGE_HIGH
    global CAL_REF_RESISTANCE_HIRANGE_LOW, CAL_REF_RESISTANCE_HIRANGE_HIGH
    global CAL_FAIL

    first_iteration = True
    jobsize_valid = False
    jobnumber_valid = False
    reset_mouse_position()
    first_job_done = False
    res_measurements = []
    display_cal_check_measurements = False
    x_list = [[], []]
    cal_res_fixture_count = 0
    # Index of the next reference resistance the operator has to type in.
    # Initialised here so that a 'cal_Submit' event can never reach an
    # unbound local (it used to be assigned only by 'REDO CAL REF').
    cal_res_count = 0
    admin_logon_attempt = False
    notify_user_cath_detected = True
    notify_user_test_result = True
    cath_data_dict_unsorted_buffer = {}

    while True:
        # ------------------------------------------------------------------
        # Phase 1: GUI events, until job size and job number are both valid.
        # ------------------------------------------------------------------
        while True:
            event, values = window.read()

            # Handle window teardown before touching any widget, and leave
            # the function instead of falling through into the polling loop
            # with a window that no longer exists.
            if event in (sg.WINDOW_CLOSED, 'Exit'):
                window.close()
                return

            # Captions of the two multi-purpose buttons on frame 8; the
            # caption decides what a press on them means.
            f8_btn1_txt = frame8_button1_text.get_text().strip()
            f8_btn2_txt = frame8_button2_text.get_text().strip()

            if event == ADMIN_FUNCS_KEY:
                log_print("USER PULLED UP ADMIN PASSWORD REQUEST PAGE")
                gui_frame_msngr_update(3, ENTER_PW_MESSAGE)
                admin_logon_attempt = True

            elif event == 'RE-PRINT':
                log_print('=USER ATTEMPTED TO RE-PRINT=')
                if first_job_done:
                    reset_mouse_position()
                    print_report(report_filename)
                else:
                    frame8_button1_text.update('BACK')
                    frame8_button2_text.update('')
                    gui_frame_msngr_update(8, RUN_FIRST_JOB_FIRST)

            elif event == "TERMINATE SCRIPT":
                terminate_script()

            elif event == "REBOOT":
                reboot_system()

            elif event == "SHUTDOWN":
                shutdown_system()

            elif event == 'CALIBRATE':
                CAL_FAIL = False
                log_print("=USER PRESSED 'CALIBRATE'=")
                reset_mouse_position()
                GPIO.output(B_DUT_MUX, GPIO.LOW)
                EXTERNAL_CAL_INPROGRESS = True
                frame8_button1_text.update('CHECK LOW RANGE')
                frame8_button2_text.update('BACK')
                log_print("=INSTRUCTING USER TO MEASURE AND "
                          "INSERT CALIBRATION RESISTANCES=")
                gui_frame_msngr_update(8, CAL_INSERT_FIXTURE_MESSAGE)

            elif event == BUTTON_MESSAGE_KEY1 and \
                    f8_btn1_txt == 'CHECK LOW RANGE' and \
                    not display_cal_check_measurements:
                reset_mouse_position()
                log_print("=USER PRESSED 'CHECK LOW RANGE'=")
                gui_frame_msngr_update(2, CHECK_CAL_MESSAGE)
                res_measurements, temp_hold = measure_single_catheter('LOW')
                # Queue a follow-up event so the results are shown on the
                # next pass of this loop.
                window.write_event_value('display_measurements', '')

            elif event == BUTTON_MESSAGE_KEY1 and \
                    f8_btn1_txt == 'CHECK HIGH RANGE' and \
                    not display_cal_check_measurements:
                log_print("=USER PRESSED 'CHECK HIGH RANGE'=")
                gui_frame_msngr_update(2, CHECK_CAL_MESSAGE)
                res_measurements, temp_hold = measure_single_catheter('HIGH')
                window.write_event_value('display_measurements', '')

            elif event == BUTTON_MESSAGE_KEY1 and \
                    f8_btn1_txt == 'APPROVE & EXIT':
                log_print("=USER PRESSED 'APPROVE & EXIT'=")
                reset_mouse_position()
                EXTERNAL_CAL_INPROGRESS = False
                EXTERNAL_CAL_MEASURED_RES_VALS = [[], []]
                EXTERNAL_CAL_USER_ENTERED_RES_VALS = [0, 0, 0, 0]
                x_list = [[], []]
                gui_frame_msngr_update(1)

            elif event == BUTTON_MESSAGE_KEY2 and \
                    f8_btn2_txt == 'REDO CAL REF':
                # Start a fresh reference calibration: clear every
                # correction factor and restart both entry counters.
                reset_mouse_position()
                initial_b_low_range, initial_m_low_range = 0, 0
                initial_b_high_range, initial_m_high_range = 0, 0
                dynamic_m_low_range, dynamic_b_low_range = 0, 0
                dynamic_m_high_range, dynamic_b_high_range = 0, 0
                cal_res_count = 0
                cal_res_fixture_count = 0
                cal_keypad_messager_window.update(
                    CAL_KEYPAD_PROMPT_MESSAGES[cal_res_count])
                gui_frame_msngr_update(7)

            elif event == BUTTON_MESSAGE_KEY1 and \
                    f8_btn1_txt == 'CAPTURE REF RESISTANCES':
                gui_frame_msngr_update(2, CAPTURE_CAL_RES_MESSAGE)
                # First capture uses the low range, second the high range.
                range_name = 'LOW' if cal_res_fixture_count == 0 else 'HIGH'
                (EXTERNAL_CAL_MEASURED_RES_VALS[cal_res_fixture_count],
                 x_list[cal_res_fixture_count]) = \
                    measure_single_catheter(range_name)
                cal_res_fixture_count += 1

                if cal_res_fixture_count > 1:
                    # Both fixtures captured: recompute the correction
                    # factors and re-measure the internal references.
                    gui_frame_msngr_update(2, RECALCULATE_REF_RES_MESSAGE)
                    ext_cal_measured_vals_f = list(
                        chain.from_iterable(EXTERNAL_CAL_MEASURED_RES_VALS))
                    x_list_f = list(chain.from_iterable(x_list))
                    CAL_REF_RESISTANCE_LOWRANGE_HIGH = \
                        EXTERNAL_CAL_USER_ENTERED_RES_VALS[1]
                    CAL_REF_RESISTANCE_LOWRANGE_LOW = \
                        EXTERNAL_CAL_USER_ENTERED_RES_VALS[0]
                    CAL_REF_RESISTANCE_HIRANGE_HIGH = \
                        EXTERNAL_CAL_USER_ENTERED_RES_VALS[3]
                    CAL_REF_RESISTANCE_HIRANGE_LOW = \
                        EXTERNAL_CAL_USER_ENTERED_RES_VALS[2]
                    initial_m_low_range, initial_b_low_range, \
                        initial_m_high_range, initial_b_high_range = \
                        correction_fctr_calc(x_list_f,
                                             ext_cal_measured_vals_f)

                    GPIO.output(B_DUT_MUX, GPIO.HIGH)
                    sleep(.3)
                    low_range_recalculated_interal_resistances, temp_hold = \
                        measure_single_catheter('LOW')
                    GPIO.output(CAL_RES_HI_RANGE_MUX, GPIO.HIGH)
                    sleep(.3)
                    high_range_recalculated_interal_resistances, temp_hold = \
                        measure_single_catheter('HIGH')

                    # The same two text blocks go to stdout and to the
                    # correction-factor file.
                    factors_text = '%s\n%s\n%s\n%s\n' % (
                        initial_m_low_range, initial_b_low_range,
                        initial_m_high_range, initial_b_high_range)
                    resistances_text = '%s\n%s\n%s\n%s\n' % (
                        low_range_recalculated_interal_resistances[1],
                        low_range_recalculated_interal_resistances[0],
                        high_range_recalculated_interal_resistances[1],
                        high_range_recalculated_interal_resistances[0])
                    print(factors_text)
                    print(resistances_text)
                    with open('initial_correction_factor.txt', 'w') as ff:
                        ff.write(factors_text)
                        ff.write(resistances_text)

                    CAL_REF_RESISTANCE_LOWRANGE_HIGH = \
                        low_range_recalculated_interal_resistances[0]
                    CAL_REF_RESISTANCE_LOWRANGE_LOW = \
                        low_range_recalculated_interal_resistances[1]
                    CAL_REF_RESISTANCE_HIRANGE_HIGH = \
                        high_range_recalculated_interal_resistances[0]
                    CAL_REF_RESISTANCE_HIRANGE_LOW = \
                        high_range_recalculated_interal_resistances[1]
                    GPIO.output(CAL_RES_HI_RANGE_MUX, GPIO.LOW)
                    GPIO.output(B_DUT_MUX, GPIO.LOW)
                    frame8_button1_text.update('CHECK LOW RANGE')
                    frame8_button2_text.update('')
                    res_measurements = []
                    reset_mouse_position()
                    gui_frame_msngr_update(
                        8, CAL_RE_INSRT_LOW_RANGE_FIXTR_MSG)
                else:
                    reset_mouse_position()
                    gui_frame_msngr_update(8, CAL_CURRENT_PROCESS_MESSAGES[
                        cal_res_fixture_count])

            elif event == 'display_measurements':
                log_print("Displaying calibration check values...")
                reset_mouse_position()
                # Advance the button captions to the next calibration step.
                if f8_btn1_txt == 'CHECK LOW RANGE':
                    frame8_button1_text.update('CHECK HIGH RANGE')
                    frame8_button2_text.update('')
                elif f8_btn1_txt == 'CHECK HIGH RANGE':
                    frame8_button1_text.update('APPROVE & EXIT')
                    frame8_button2_text.update('REDO CAL REF')
                gui_frame_msngr_update(
                    8,
                    show_calcheck_results(res_measurements, f8_btn1_txt,
                                          cal_res_fixture_count))
                display_cal_check_measurements = False

            elif event == 'cal_Submit':
                try:
                    entered_resistance = float(values['calinput'])
                except (TypeError, ValueError):
                    # Empty or malformed entry: keep the current prompt and
                    # let the operator retype instead of crashing the GUI.
                    log_print("INVALID CALIBRATION RESISTANCE ENTERED: %r"
                              % (values['calinput'],))
                    window['calinput'].update('')
                    reset_mouse_position()
                else:
                    EXTERNAL_CAL_USER_ENTERED_RES_VALS[cal_res_count] = \
                        entered_resistance
                    window['calinput'].update('')
                    reset_mouse_position()
                    cal_res_count += 1
                    if cal_res_count > 3:
                        # All four reference values entered.
                        gui_frame_msngr_update(
                            8, CAL_CURRENT_PROCESS_MESSAGES[
                                cal_res_fixture_count])
                        frame8_button1_text.update('CAPTURE REF RESISTANCES')
                        frame8_button2_text.update('')
                    else:
                        cal_keypad_messager_window.update(
                            CAL_KEYPAD_PROMPT_MESSAGES[cal_res_count])

            elif event == 'cal_Clear':
                log_print("=USER PRESSED 'CLEAR' in CAL KEYPAD")
                reset_mouse_position()
                cal_keys_entered = ''
                window['calinput'].update(cal_keys_entered)

            elif event == 'cal_BACK':
                # Restore the factors that 'REDO CAL REF' zeroed.
                # NOTE(review): `lines` is not defined in this function; it
                # presumably holds the lines of initial_correction_factor.txt
                # read at start-up, and may be stale after a recalibration
                # in the same session - confirm.
                initial_m_low_range = float(lines[0].strip())
                initial_b_low_range = float(lines[1].strip())
                initial_m_high_range = float(lines[2].strip())
                initial_b_high_range = float(lines[3].strip())
                CAL_REF_RESISTANCE_LOWRANGE_LOW = float(lines[4].strip())
                CAL_REF_RESISTANCE_LOWRANGE_HIGH = float(lines[5].strip())
                CAL_REF_RESISTANCE_HIRANGE_LOW = float(lines[6].strip())
                CAL_REF_RESISTANCE_HIRANGE_HIGH = float(lines[7].strip())
                log_print("=USER PRESSED 'BACK' WHEN "
                          "IN THE REDO REF CAL KEYPAD=")
                reset_mouse_position()
                gui_frame_msngr_update(1)
                cal_keys_entered = ''
                window['calinput'].update(cal_keys_entered)

            elif 'cal_' in event:
                # Any other calibration-keypad key: append the text after
                # the 'cal_' prefix to what has been typed so far.
                event_name = event.split('cal_')
                reset_mouse_position()
                cal_keys_entered = values['calinput']
                cal_keys_entered += event_name[1]
                window['calinput'].update(cal_keys_entered)

            elif (event == BUTTON_MESSAGE_KEY1 and f8_btn1_txt == 'BACK') or (
                    event == BUTTON_MESSAGE_KEY2 and f8_btn2_txt == 'BACK'):
                log_print("=USER PRESSED 'BACK' IN CALIBRATE=")
                admin_logon_attempt = False
                reset_mouse_position()
                EXTERNAL_CAL_INPROGRESS = False
                gui_frame_msngr_update(1)

            elif event == KEYPAD_BACK_BTN_KEY:
                if admin_logon_attempt:
                    admin_logon_attempt = False
                    log_print("=USER PRESSED 'BACK' ON "
                              "KEYPAD WHEN PROMPTED FOR ADMIN PW")
                    gui_frame_msngr_update(1)
                if JOB_SIZE == 0:
                    log_print("=USER PRESSED 'BACK' ON KEYPAD WHEN PROMPTED TO"
                              " ENTER JOB SIZE OR WHEN IN "
                              "THE REDO REF CAL KEYPAD=")
                    gui_frame_msngr_update(1)
                else:
                    # Job size already entered: step back to job-size entry.
                    log_print("=USER PRESSED 'BACK' ON KEYPAD WHEN "
                              "PROMPTED FOR JOB NUMBER=")
                    JOB_SIZE = 0
                    JOB_NUMBER = 0
                    window['keypad_message'].update(JOBSIZE_KEYPAD_MESSAGE)
                keys_entered = ''
                window['input'].update(keys_entered)
                reset_mouse_position()

            elif event in ('P16x', 'P330', 'P330B'):
                log_print("=USER SELECTED %s AS THE MODEL=" % event)
                MODEL_SELECTED = event
                model_capture()
                gui_frame_msngr_update(3, JOBSIZE_KEYPAD_MESSAGE)

            elif event in tuple('1234567890'):
                # Exact match against the ten digit keys. A plain
                # `event in '1234567890'` is a substring test and would also
                # accept '' or multi-character keys such as '12'.
                log_print("=USER PRESSED %s ON THE KEYPAD=" % event)
                reset_mouse_position()
                keys_entered = values['input']
                keys_entered += event
                window['input'].update(keys_entered)

            elif event == 'Submit':
                reset_mouse_position()
                keys_entered = values['input']
                window['input'].update('')
                if admin_logon_attempt:
                    if keys_entered == ADMIN_PASSWORD:
                        gui_frame_msngr_update(9, get_ip())
                    else:
                        window['keypad_message'].update(INVALID_PW_MESSAGE)
                elif JOB_SIZE == 0:
                    log_print("=USER PRESSED 'SUBMIT' TO RECORD JOB SIZE=")
                    jobsize_valid = jobsize_capture(keys_entered)
                    if jobsize_valid:
                        log_print("=USER WAS PROMPTED TO "
                                  "SCAN JOB NUMBER BARCODE=")
                        gui_frame_msngr_update(6, JOBNUMBER_SCAN_MESSAGE)
                        jobnumber_valid = jobnumber_capture()
                        if not jobnumber_valid:
                            # Scan failed: fall back to keypad entry.
                            log_print("USER WAS PROMPTED TO "
                                      "ENTER JOB NUMBER MANUALLY")
                            keys_entered = ''
                            window['input'].update('')
                            gui_frame_msngr_update(3, JOBNUM_KEYPAD_MSG)
                    else:
                        window['keypad_message'].update(
                            JOBSIZE_INVALID_MESSAGE)
                        keys_entered = ''
                        JOB_SIZE = 0
                    if jobsize_valid and jobnumber_valid:
                        log_print(
                            "VALID JOB SIZE AND JOB NUMBERS WERE RECORDED")
                        break
                elif not jobnumber_valid:
                    log_print(
                        "=USER PRESSED 'SUBMIT' TO RECORD JOB NUMBER=")
                    jobnumber_valid = validate_job_number_barcode_scan(
                        keys_entered)
                    if jobnumber_valid:
                        log_print(
                            "VALID JOB SIZE AND JOB NUMBERS WERE RECORDED")
                        JOB_NUMBER = keys_entered
                        break
                    window['keypad_message'].update(
                        JOBNUMBER_INVALID_MESSAGE)
                    keys_entered = ''
                    window['input'].update(keys_entered)

            elif event == 'Clear':
                log_print("=USER PRESSED 'CLEAR'")
                reset_mouse_position()
                keys_entered = ''
                window['input'].update(keys_entered)

        # ------------------------------------------------------------------
        # Phase 2: poll the job flags until the job ends or calibration fails.
        # ------------------------------------------------------------------
        while True:
            if jobsize_valid and jobnumber_valid and \
                    not CAL_PASSED and not CAL_FAIL:
                log_print(
                    "USER WAS NOTIFIED THAT CALIBRATION WAS BEING PERFORMED")
                gui_frame_msngr_update(2, CAL_PERFORMING_MESSAGE)
                calibration()

            elif CAL_FAIL and not CAL_PASSED:
                log_print("USER WAS NOTIFIED THAT CALIBRATION FAILED")
                dynamic_m_low_range, dynamic_b_low_range, \
                    dynamic_m_high_range, dynamic_b_high_range = 0, 0, 0, 0
                frame8_button1_text.update('')
                frame8_button2_text.update('')
                gui_frame_msngr_update(8, CAL_FAIL_MESSAGE)
                # Keep the failure message on screen for 10 s.
                sleep(10)
                log_print("Calibration failed. "
                          "An engineer will come troubleshoot the system.")
                # Drop the job and go back to the start screen.
                SHOW_LAST_CATH_GUI_MSG = False
                catheters_processed = 0
                cath_data_dict_unsorted_buffer = {}
                CATH_MODEL = -1
                JOB_SIZE = 0
                JOB_NUMBER = 0
                CATH_RES_SAMPLES_COLLECTED = 0
                CAL_PASSED = False
                first_iteration = True
                end_job = False
                reset_mouse_position()
                gui_frame_msngr_update(1)
                notify_user_cath_detected = True
                notify_user_test_result = True
                break

            elif CAL_PASSED and first_iteration:
                configure_adc(CONTINUOUS_CONFIG_REG)
                first_iteration = False
                log_print("USER WAS PROMPTED TO INSERT "
                          "THE FIRST CATHETER OF THE JOB")
                gui_frame_msngr_update(2, FIRST_CATH_MESSAGE)

            elif not CATH_CONN_EMPTY and ACTIVE_SAMPLING and \
                    CATH_DETECTED_SAMPLES_COLLECTED == 0 and \
                    notify_user_cath_detected:
                # The notify_* locals make each message appear once per
                # catheter instead of on every poll.
                notify_user_cath_detected = False
                notify_user_test_result = True
                REPEATED_CATHETER_DETECTED = False  # flag reset only
                PLAY_SOUND = True  # flag reset only
                log_print("USER WAS NOTIFIED THAT THE CATHETER WAS DETECTED")
                gui_frame_msngr_update(2, CATH_DETECTED_MESSAGE)

            elif MANUAL_CATH_BARCODE_CAPTURE:
                # The barcode scan failed elsewhere; collect the serial
                # number through the keypad from this loop.
                KEYPAD_CATH_BARCODE = manual_catheter_barcode_entry()
                keypad_back_btn.update('BACK')
                MANUAL_BARCODE_CAPTURED = True
                MANUAL_CATH_BARCODE_CAPTURE = False

            elif ((TEST_FINISHED and not MEASURE_OUTPUT_RESISTANCE and
                   not SHOW_LAST_CATH_GUI_MSG and not end_job) or
                  (catheters_processed == JOB_SIZE and
                   SHOW_LAST_CATH_GUI_MSG and
                   not end_job)) and notify_user_test_result:
                notify_user_test_result = False
                frame_to_see = 4 if GUI_CURRENT_CATH_RESULT == "PASS" else 5
                log_print(
                    "USER WAS NOTIFIED THAT %s TEST RESULT IS: %s" % (
                        GUI_CURRENT_BARCODE, GUI_CURRENT_CATH_RESULT))
                gui_frame_msngr_update(frame_to_see, show_results(
                    GUI_CURRENT_CATH_RESULT, GUI_CURRENT_BARCODE))
                # Re-arm the "catheter detected" message for the next one.
                notify_user_cath_detected = True
                if PLAY_SOUND:
                    audio_feedback(GUI_CURRENT_CATH_RESULT)

            elif catheters_processed == JOB_SIZE and end_job:
                log_print("Job finished. Resetting flags and printing report.")
                SHOW_LAST_CATH_GUI_MSG = False
                catheters_processed = 0
                cath_data_dict_unsorted_buffer = {}
                CATH_MODEL = -1
                JOB_SIZE = 0
                JOB_NUMBER = 0
                CATH_RES_SAMPLES_COLLECTED = 0
                CAL_PASSED = False
                first_iteration = True
                end_job = False
                reset_mouse_position()
                gui_frame_msngr_update(1)
                first_job_done = True
                notify_user_cath_detected = True
                notify_user_test_result = True
                print_report(report_filename)
                break

            # Poll at roughly 20 Hz instead of spinning.
            sleep(.05)
# Start-up sequence: run the three set-up calls in their original order,
# then hand control to the GUI main loop (loop() is the last step).
for startup_step in (alrt_rdy_func_enable, isr_enable,
                     no_blank_screen, loop):
    startup_step()
1 Answer 1
First impressions:
- way too much code
- not enough comments if any
- needs refactoring to achieve better separation of concerns
Unfortunately I can't test it, and TBH I don't understand much so I will just focus on style mainly.
There are many different things going on in this program, so I think your priority should be to break it up in small pieces to make it more comprehensible and more maintainable. It's difficult to maintain long code, when you have to scroll a lot and don't have a clear overview of code because it spreads along so many lines.
The first thing that is obvious is that you missed the logging module. I recommend you start using it from now on because it is more flexible and there is no need to reinvent the wheel. I always use it for my projects and I like to write to console and to file at the same time (at different levels eg DEBUG for file and INFO for console), because it's easier to troubleshoot unattended applications when you have a permanent log file available.
But it's already good that you are using some logging in your program because it makes it easier to trace the execution flow.
The shebang should be on the first line of the file, and it must point to an interpreter rather than a directory:
#!/usr/bin/env python3
`from module import *` should be avoided; just import what you need. Why: it pollutes the namespace and can silently override existing functions, which may cause nasty bugs.
sys.path.append is something one should never do. This may be convenient when writing some test code but there are ways in Python to load libraries dynamically if the need arises. When you see this it usually means the application is poorly structured (files, directories) or the virtualenv is not set up right.
The PDF class does not seem to be used presently, so remove it from your code along with the unused imports. Declutter your file as much as you can. Anything you don't use is unneeded distraction. If you really need PDF generation capabilities, then move your class code to a separate file, and import it accordingly.
Then we have a couple of variables related to sound, which is yet another type of functionality:
FAIL_SOUND = 'fail.mp3'
PASS_SOUND = 'pass.mp3'
PLAY_SOUND = True
At this point, it becomes clear that a separate configuration file should be used to contain those settings. Then they can be customized without touching the code itself, at the risk of inadvertently breaking things. In Python there is the classic configparser module but I prefer YAML files personally. Here you can have a look at some options.
Then comes the meat. Next function names are set_bias_mux_to_low_range, set_bias_mux_to_hi_range, set_dut_mux_to_input_res, set_dut_mux_to_output_res etc. I have no idea what they do. You might want to comment these functions using docstrings. If I reason purely in terms of functionality and without even understanding your code, it seems to me that the functions related to GPIO deserve to be put in a separate file.
Next function: reset_mouse_position. OK, mouse manipulation. I would externalize this part as well. Possibly into a UI class of some sort.
Next: no_blank_screen. I suspect there is feature creep here. This is something that I would rather do outside Python. Possibly in a .bashrc file. Or straight into the Xorg configuration. If you're on Linux, that is. You could also create a launcher for your program. But if you're running a graphical environment, it seems sensible to adapt your power management options, or session preferences if you're concerned about screen lock. In short: this is outside the purview of your application.
Next:
def show_calcheck_results(measurements, button_text, cal_reset):
log_print("show_calcheck_results([%.3f,%.3f], %s)" % (
measurements[0], measurements[1], button_text))
if button_text == 'CHECK LOW RANGE':
add_string = 'INSERT HIGH-RANGE\n' \
'RESISTANCE FIXTURE AND PRESS\n' \
'\'CHECK HIGH RANGE\''
This does not sound right. If you click on a button, it should call a function with appropriate arguments. Not the other way round. You normally don't write a function that checks which button was clicked by looking at the button caption (which may change). Or it's possible I misunderstand your code because I'm not familiar with pyautogui and you're automating key strokes perhaps. I'd need to look more in depth then. But in GUIs such as GTK, QT etc you work with event handlers that you attach to controls such as buttons.
Next: alrt_rdy_func_enable. It was not worth abbreviating the function name. Make it more explicit. You've saved just 3 characters here. Ideally a function name should already give some clue about its purpose. Admittedly I am a noob in this stuff but maybe the function name could be more intuitive? You're handling I2C here by the way. I guess it goes with GPIO stuff then.
I could go on and on but there is misuse of global variables. Use function arguments where appropriate instead. When the number of arguments is unknown or variable you can use **kwargs. But global variables are seldom needed or justified really. And of course they are tricky: when you have global variables that can change value in many code paths, this is going to make debugging difficult. It's something you want to avoid. Variable scope should be limited as much as possible. Function scope is usually sufficient, and arguments can be passed from one function to another.
Some values are undefined for example RGAIN1 at line 327:
transfer_func = RGAIN1 * ((10 * ((voltage - V_BIAS) * tfco + V_BIAS)) - 1)
So I would guess it's defined in hw_init, where you've used star import.
Then do: from hw_init import RGAIN1 etc. Again, a config file may be more appropriate, if these are not hard values that will seldom change. My IDE (Pycharm) highlights the variables that are undefined or that cannot be resolved and there are quite a few. Declaring them in your imports would be beneficial because you can directly access their definition. This also reduces the risk of typos.
Coming next: barcode stuff. Unsurprisingly I might suggest a dedicated file for this. It looks like you're doing serial read/write but I don't see the relevant imports. You're saying the application works but maybe not in all code paths.
Let's have a look at one barcode function:
def validate_catheter_barcode_scan(x):
log_print("validate_catheter_barcode_scan(%s) called" % x)
try:
if 5 < len(x) < 10:
code_validity = True
else:
code_validity = False
except TypeError:
code_validity = False
log_print("validate_catheter_barcode_scan() returning")
return code_validity
Since you're merely returning a boolean value it can be shortened to:
def validate_catheter_barcode_scan(value):
return 5 < len(value) < 10
I have omitted logging and exception handling. Instead of TypeError I would probably use ValueError. In function barcode_scanner I would just return the raw results, rather than call validate_catheter_barcode_scan within that same function. Because you may want to handle different types of barcodes and apply different forms of validation. Leave room for flexibility.
Coming next: sort_data. The name is too broad to be meaningful. I suppose sort_catheter_data would then be more expressive.
correction_value: what are we correcting and why? No idea. Maybe this is important. But since the function appears to be unused anyway, remove it or stash it somewhere.
When you've done all that, review the remaining functions: calibration, read_adc, catheter_test and loop since they are the biggest chunks of code. A loop shouldn't be that long. There are lots of variables, not always aptly named so this part is a bit overwhelming. Sadly, many variables are just unneeded. They are there merely to solve control flow problems.
In addition to comments, there has to be more line spacing. That would make the code less of a sore to read. Code that is tedious to read and difficult to understand is harder to maintain because it takes more effort.
The other thing that strikes me is the nested ifs - the longer the block, the higher the risk of wrong indentation, causing bugs. Logic errors are prone to happen here. You can "denest" by using the early return pattern instead (the usual illustrations of it are not in Python, but the pattern applies to any language).
I think that denesting should be a priority, because you are inevitably going to add more code, and the code will become even more difficult to comprehend. It will take a lot of scrolling. And even a large screen won't suffice to show the whole block so we can have a block-level overview. As said already, indentation is critical in Python. It's very easy to break the logic when you have big chunks of if blocks, nested with other ifs or loops.
At least you've made some effort to modularize by defining 4 distinct functions that have to run in order to start your app. But you need to modularize more.
Use a __main__ guard as well. This is a good habit to follow. One benefit is that you can import your module (or parts of it) without executing it.
sys.path.append is suspicious and concerning. Why is it needed?