I've created a simple python meter simulator which connects to a serial port (in my case COM1
) listens on the serial port and takes actions based on what it receives.
I'm looking for some advice regarding:
- unnecessary / wrong logic (it looks like I've abused those
while
loops) - code that might be rewritten in a more concise / pythonic way (maybe some methods to avoid the repeating code)
- I'm not so interested in
PEP
ing this code but any advice regarding this is welcomed.
To understand how the flow actually works:
- from a third party software (of a real meter) I send a command (let's say
read_register
) - I'm monitoring the raw data that is written on
COM1
and I can see that the real meter sends abytearray
which looks like this:?/35169984\r\n
. The raw data that is being read for this specificbytearray
is/ELS2\\@V8.22 \r\n
. (there might be a longer communication here, but from now on I think it's easy to figure out how the comunication works) - after the above communication is done, on the real meter I receive a list of indexes. (in my case - I just have a file that contains the exact same indexes as the real meter sends)
Here are two pictures of:
- written data:
- read data
Please ask me if there's anything unclear and I'll try to provide as many details as possible.
import serial
import time
# configure the serial connections
serial_object = serial.Serial(
port='COM1',
baudrate=1200,
parity=serial.PARITY_EVEN,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.SEVENBITS
)
serial_object.close()
dict_of_bytes = {
'null_byte': b'',
'new_line_byte': b'\n',
'ack_byte': b'\x06',
'a': b'a',
'etx_byte': b'\x03'
}
dict_of_bytearray = {
'init_command': b'/?35169984!\r\n',
'second_command': b'\x06021\r\n',
'third_command': b'\x06020\r\n'
}
dict_of_responses = {
'init_response': b'/ELS2\\@V8.22 \r\n',
'second_response': b'\x01P0\x02(35169984)\x03m Soll: m',
'third_response': b'\x01P1\x02(00000000)\x03a'
}
while True:
serial_object.open()
print('\nListening on serial...\n\n')
received_byte = dict_of_bytes['null_byte']
received_bytearray = bytearray()
while received_byte != dict_of_bytes['new_line_byte']:
received_byte = serial_object.read(1) # this reads byte-by-byte what it comes on COM1
received_bytearray += received_byte # create the entire bytearray
if received_byte == dict_of_bytes['new_line_byte']:
break
if received_bytearray == dict_of_bytearray['init_command']:
serial_object.write(dict_of_responses['init_response'])
received_byte = dict_of_bytes['null_byte']
received_bytearray = bytearray()
while received_byte != dict_of_bytes['new_line_byte']:
received_byte = serial_object.read(1)
received_bytearray += received_byte
if received_byte == dict_of_bytes['new_line_byte']:
break
if received_bytearray == dict_of_bytearray['second_command']:
time.sleep(0.5)
serial_object.write(dict_of_responses['second_response'])
time.sleep(0.5)
received_byte = dict_of_bytes['null_byte']
received_bytearray = bytearray()
while received_byte != dict_of_bytes['a']:
received_byte = serial_object.read(1)
received_bytearray += received_byte
if received_byte == dict_of_bytes['a']:
break
if received_bytearray == dict_of_responses['third_response']:
time.sleep(0.5)
serial_object.write(dict_of_bytes['ack_byte'])
received_byte = dict_of_bytes['null_byte']
received_bytearray = bytearray()
while received_byte != dict_of_bytes['etx_byte']:
received_byte = serial_object.read(1)
received_bytearray += received_byte
if received_byte == dict_of_bytes['etx_byte']:
received_bytearray += serial_object.read()
break
else:
if received_bytearray == dict_of_bytearray['third_command']:
time.sleep(0.5)
with open('regs.txt', 'rb') as fin:
bytes_from_file = fin.read()
serial_object.write(bytes_from_file)
print(bytes_from_file.decode('utf-8'))
print('length: ' + str(len(bytes_from_file)))
serial_object.close()
2 Answers 2
Disclaimer: None of this review has been tested. I've just applied some manipulation of your original code.
Making thing more simple
Your dict_of_<something>
dictionnaries are constant and always used with literal strings. From my point of view, it'd be much clearer to use constants to achieve the same thing:
import serial
import time
# configure the serial connections
serial_object = serial.Serial(
port='COM1',
baudrate=1200,
parity=serial.PARITY_EVEN,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.SEVENBITS
)
serial_object.close()
# Bytes
NULL_BYTE = b''
NEW_LINE_BYTE = b'\n'
ACK_BYTE = b'\x06'
A = b'a'
ETX_BYTE = b'\x03'
# Commands
INIT_COMMAND = b'/?35169984!\r\n'
SECOND_COMMAND = b'\x06021\r\n'
THIRD_COMMAND = b'\x06020\r\n'
# Responses
INIT_REPONSE = b'/ELS2\\@V8.22 \r\n'
SECOND_REPONSE = b'\x01P0\x02(35169984)\x03m Soll: m'
THIRD_REPONSE = b'\x01P1\x02(00000000)\x03a'
while True:
serial_object.open()
print('\nListening on serial...\n\n')
received_byte = NULL_BYTE
received_bytearray = bytearray()
while received_byte != NEW_LINE_BYTE:
received_byte = serial_object.read(1) # this reads byte-by-byte what it comes on COM1
received_bytearray += received_byte # create the entire bytearray
if received_byte == NEW_LINE_BYTE:
break
if received_bytearray == INIT_COMMAND:
serial_object.write(INIT_REPONSE)
received_byte = NULL_BYTE
received_bytearray = bytearray()
while received_byte != NEW_LINE_BYTE:
received_byte = serial_object.read(1)
received_bytearray += received_byte
if received_byte == NEW_LINE_BYTE:
break
if received_bytearray == SECOND_COMMAND:
time.sleep(0.5)
serial_object.write(SECOND_REPONSE)
time.sleep(0.5)
received_byte = NULL_BYTE
received_bytearray = bytearray()
while received_byte != A:
received_byte = serial_object.read(1)
received_bytearray += received_byte
if received_byte == A:
break
if received_bytearray == THIRD_REPONSE:
time.sleep(0.5)
serial_object.write(ACK_BYTE)
received_byte = NULL_BYTE
received_bytearray = bytearray()
while received_byte != ETX_BYTE:
received_byte = serial_object.read(1)
received_bytearray += received_byte
if received_byte == ETX_BYTE:
received_bytearray += serial_object.read()
break
else:
if received_bytearray == THIRD_COMMAND:
time.sleep(0.5)
with open('regs.txt', 'rb') as fin:
bytes_from_file = fin.read()
serial_object.write(bytes_from_file)
print(bytes_from_file.decode('utf-8'))
print('length: ' + str(len(bytes_from_file)))
serial_object.close()
Don't repeat yourself
There is a piece of code you have in different places. It might be worth extracting it in a function on its own:
def get_bytearray(serial, ending_value):
received_byte = NULL_BYTE
received_bytearray = bytearray()
while received_byte != ending_value:
received_byte = serial_object.read(1)
received_bytearray += received_byte
if received_byte == ending_value:
break
return received_bytearray
while True:
serial_object.open()
print('\nListening on serial...\n\n')
received_bytearray = get_bytearray(serial_object, NEW_LINE_BYTE)
if received_bytearray == INIT_COMMAND:
serial_object.write(INIT_REPONSE)
received_bytearray = get_bytearray(serial_object, NEW_LINE_BYTE)
if received_bytearray == SECOND_COMMAND:
time.sleep(0.5)
serial_object.write(SECOND_REPONSE)
time.sleep(0.5)
received_bytearray = get_bytearray(serial_object, A)
if received_bytearray == THIRD_REPONSE:
time.sleep(0.5)
serial_object.write(ACK_BYTE)
received_bytearray = get_bytearray(serial_object, ETX_BYTE)
else:
if received_bytearray == THIRD_COMMAND:
time.sleep(0.5)
with open('regs.txt', 'rb') as fin:
bytes_from_file = fin.read()
serial_object.write(bytes_from_file)
print(bytes_from_file.decode('utf-8'))
print('length: ' + str(len(bytes_from_file)))
serial_object.close()
Re-write your function
Now that some part of the logic is extracted in a function, it is much easier to think about it and/or to rewrite it.
In your code, I'd simply write :
def get_bytearray(serial, ending_value):
received_bytearray = bytearray()
while True:
received_byte = serial_object.read(1)
received_bytearray += received_byte
if received_byte == ending_value:
return received_bytearray
(Note that is also removes the need for a NULL_BYTE
constant).
(I might add other comments later on)
-
\$\begingroup\$ thanks for your answer. I'll try your solution tomorrow and let you know if everything is ok. Feel free to edit this with other ideas when you have some free time. \$\endgroup\$Grajdeanu Alex– Grajdeanu Alex2016年06月14日 18:02:23 +00:00Commented Jun 14, 2016 at 18:02
Since Josay has mentioned all the points that I was going to raise, I'll just say:
serial_object
is usable in a with
and it makes little sense why you are manually open
ing and close
ing this object. Instead I'd just wrap the while True
in a with
.
Such as, Note I've not tested this, but I checked the source and Serial
has both __enter__
and __exit__
:
serial_object = serial.Serial(
port='COM1',
baudrate=1200,
parity=serial.PARITY_EVEN,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.SEVENBITS
)
with serial_object:
while True:
...
And then use Josay's points.
-
\$\begingroup\$ Thanks for the tip. It does its job as you said. Cheers. \$\endgroup\$Grajdeanu Alex– Grajdeanu Alex2016年06月15日 05:58:00 +00:00Commented Jun 15, 2016 at 5:58
for packet in serial_input: # process packet
, to be more explicit thanwhile
loops \$\endgroup\$serial
pyserial? \$\endgroup\$