4
\$\begingroup\$

I've created a simple Python meter simulator which connects to a serial port (in my case COM1), listens on it, and takes actions based on what it receives.

I'm looking for some advice regarding:

  • unnecessary / wrong logic (it looks like I've abused those while loops)
  • code that might be rewritten in a more concise / pythonic way (maybe some methods to avoid the repeating code)
  • I'm not so interested in PEPing this code but any advice regarding this is welcomed.

To understand how the flow actually works:

  • from a third party software (of a real meter) I send a command (let's say read_register)
  • I'm monitoring the raw data that is written on COM1 and I can see that the real meter sends a bytearray which looks like this: /?35169984!\r\n. The raw data that is being read for this specific bytearray is /ELS2\\@V8.22 \r\n. (There might be a longer communication here, but from this point on I think it's easy to figure out how the communication works.)
  • after the above communication is done, on the real meter I receive a list of indexes. (in my case - I just have a file that contains the exact same indexes as the real meter sends)
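
The exchange described above can be sketched as a lookup from each received command to its canned reply. This is only an illustration: the byte strings are copied from the script in this question, while `REPLIES` and `reply_for` are hypothetical names that do not appear in the original code.

```python
# Hypothetical lookup table: command received from the third-party software
# mapped to the reply of the simulated meter.
# The byte strings are copied from the script in this question.
REPLIES = {
    b'/?35169984!\r\n': b'/ELS2\\@V8.22 \r\n',             # init command
    b'\x06021\r\n': b'\x01P0\x02(35169984)\x03m Soll: m',  # second command
}


def reply_for(command):
    """Return the canned reply for a command, or None if it is unknown."""
    # bytearray is not hashable, so convert before the dictionary lookup.
    return REPLIES.get(bytes(command))


print(reply_for(b'/?35169984!\r\n'))
print(reply_for(bytearray(b'unknown\r\n')))
```

A table like this keeps the protocol data in one place, so adding another command would not require another level of nesting.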

Here are two pictures of:

  • written data: [image: write data]
  • read data: [image: read data]

Please ask me if there's anything unclear and I'll try to provide as many details as possible.

import serial
import time

# configure the serial connections
serial_object = serial.Serial(
    port='COM1',
    baudrate=1200,
    parity=serial.PARITY_EVEN,
    stopbits=serial.STOPBITS_ONE,
    bytesize=serial.SEVENBITS
)
serial_object.close()

dict_of_bytes = {
    'null_byte': b'',
    'new_line_byte': b'\n',
    'ack_byte': b'\x06',
    'a': b'a',
    'etx_byte': b'\x03'
}
dict_of_bytearray = {
    'init_command': b'/?35169984!\r\n',
    'second_command': b'\x06021\r\n',
    'third_command': b'\x06020\r\n'
}
dict_of_responses = {
    'init_response': b'/ELS2\\@V8.22 \r\n',
    'second_response': b'\x01P0\x02(35169984)\x03m Soll: m',
    'third_response': b'\x01P1\x02(00000000)\x03a'
}

while True:
    serial_object.open()
    print('\nListening on serial...\n\n')
    received_byte = dict_of_bytes['null_byte']
    received_bytearray = bytearray()
    while received_byte != dict_of_bytes['new_line_byte']:
        received_byte = serial_object.read(1)  # this reads byte-by-byte what it comes on COM1
        received_bytearray += received_byte  # create the entire bytearray
        if received_byte == dict_of_bytes['new_line_byte']:
            break
    if received_bytearray == dict_of_bytearray['init_command']:
        serial_object.write(dict_of_responses['init_response'])
        received_byte = dict_of_bytes['null_byte']
        received_bytearray = bytearray()
        while received_byte != dict_of_bytes['new_line_byte']:
            received_byte = serial_object.read(1)
            received_bytearray += received_byte
            if received_byte == dict_of_bytes['new_line_byte']:
                break
        if received_bytearray == dict_of_bytearray['second_command']:
            time.sleep(0.5)
            serial_object.write(dict_of_responses['second_response'])
            time.sleep(0.5)
            received_byte = dict_of_bytes['null_byte']
            received_bytearray = bytearray()
            while received_byte != dict_of_bytes['a']:
                received_byte = serial_object.read(1)
                received_bytearray += received_byte
                if received_byte == dict_of_bytes['a']:
                    break
            if received_bytearray == dict_of_responses['third_response']:
                time.sleep(0.5)
                serial_object.write(dict_of_bytes['ack_byte'])
                received_byte = dict_of_bytes['null_byte']
                received_bytearray = bytearray()
                while received_byte != dict_of_bytes['etx_byte']:
                    received_byte = serial_object.read(1)
                    received_bytearray += received_byte
                    if received_byte == dict_of_bytes['etx_byte']:
                        received_bytearray += serial_object.read()
                        break
        else:
            if received_bytearray == dict_of_bytearray['third_command']:
                time.sleep(0.5)
                with open('regs.txt', 'rb') as fin:
                    bytes_from_file = fin.read()
                serial_object.write(bytes_from_file)
                print(bytes_from_file.decode('utf-8'))
                print('length: ' + str(len(bytes_from_file)))
    serial_object.close()
asked Jun 14, 2016 at 12:24
\$\endgroup\$
3
  • \$\begingroup\$ I cannot elaborate enough, nor am I sure enough of this to post an answer, but a generator is usually used to model input streams in Python. Pseudocode might look like: for packet in serial_input: # process packet. That is more explicit than the while loops. \$\endgroup\$ Commented Jun 14, 2016 at 12:59
  • \$\begingroup\$ Is serial pyserial? \$\endgroup\$ Commented Jun 14, 2016 at 13:35
  • \$\begingroup\$ @JoeWallis yes it is. \$\endgroup\$ Commented Jun 14, 2016 at 13:37
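
The generator idea from the first comment could look roughly like the sketch below. It is an assumption about what the commenter had in mind, not their code: `packets` is a hypothetical helper, and an in-memory `io.BytesIO` stream stands in for the serial port so the example runs without hardware.

```python
import io


def packets(stream, terminator=b'\n'):
    """Yield one packet at a time, each ending with `terminator`.

    `stream` is any object with a `read(size)` method that returns bytes.
    The generator stops when a read returns no data (EOF or a timeout).
    """
    packet = bytearray()
    while True:
        received_byte = stream.read(1)
        if not received_byte:
            return
        packet += received_byte
        if received_byte == terminator:
            yield bytes(packet)
            packet = bytearray()


# An in-memory stream stands in for the serial port here.
fake_port = io.BytesIO(b'/?35169984!\r\n\x06021\r\n')
for packet in packets(fake_port):
    print(packet)
```

Note that this sketch only handles packets ending in a single terminator byte; the exchanges that end with b'a' or ETX in the question would need a different terminator per step.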

2 Answers

3
\$\begingroup\$

Disclaimer: None of this review has been tested. I've just applied some manipulation of your original code.

Making things simpler

Your dict_of_<something> dictionaries are constant and always indexed with literal strings. From my point of view, it'd be much clearer to use constants to achieve the same thing:

import serial
import time

# configure the serial connections
serial_object = serial.Serial(
    port='COM1',
    baudrate=1200,
    parity=serial.PARITY_EVEN,
    stopbits=serial.STOPBITS_ONE,
    bytesize=serial.SEVENBITS
)
serial_object.close()

# Bytes
NULL_BYTE = b''
NEW_LINE_BYTE = b'\n'
ACK_BYTE = b'\x06'
A = b'a'
ETX_BYTE = b'\x03'

# Commands
INIT_COMMAND = b'/?35169984!\r\n'
SECOND_COMMAND = b'\x06021\r\n'
THIRD_COMMAND = b'\x06020\r\n'

# Responses
INIT_REPONSE = b'/ELS2\\@V8.22 \r\n'
SECOND_REPONSE = b'\x01P0\x02(35169984)\x03m Soll: m'
THIRD_REPONSE = b'\x01P1\x02(00000000)\x03a'

while True:
    serial_object.open()
    print('\nListening on serial...\n\n')
    received_byte = NULL_BYTE
    received_bytearray = bytearray()
    while received_byte != NEW_LINE_BYTE:
        received_byte = serial_object.read(1)  # this reads byte-by-byte what it comes on COM1
        received_bytearray += received_byte  # create the entire bytearray
        if received_byte == NEW_LINE_BYTE:
            break
    if received_bytearray == INIT_COMMAND:
        serial_object.write(INIT_REPONSE)
        received_byte = NULL_BYTE
        received_bytearray = bytearray()
        while received_byte != NEW_LINE_BYTE:
            received_byte = serial_object.read(1)
            received_bytearray += received_byte
            if received_byte == NEW_LINE_BYTE:
                break
        if received_bytearray == SECOND_COMMAND:
            time.sleep(0.5)
            serial_object.write(SECOND_REPONSE)
            time.sleep(0.5)
            received_byte = NULL_BYTE
            received_bytearray = bytearray()
            while received_byte != A:
                received_byte = serial_object.read(1)
                received_bytearray += received_byte
                if received_byte == A:
                    break
            if received_bytearray == THIRD_REPONSE:
                time.sleep(0.5)
                serial_object.write(ACK_BYTE)
                received_byte = NULL_BYTE
                received_bytearray = bytearray()
                while received_byte != ETX_BYTE:
                    received_byte = serial_object.read(1)
                    received_bytearray += received_byte
                    if received_byte == ETX_BYTE:
                        received_bytearray += serial_object.read()
                        break
        else:
            if received_bytearray == THIRD_COMMAND:
                time.sleep(0.5)
                with open('regs.txt', 'rb') as fin:
                    bytes_from_file = fin.read()
                serial_object.write(bytes_from_file)
                print(bytes_from_file.decode('utf-8'))
                print('length: ' + str(len(bytes_from_file)))
    serial_object.close()

Don't repeat yourself

The same piece of code appears in several places. It is worth extracting it into a function of its own:

def get_bytearray(serial_object, ending_value):
    # Read from the port passed in, one byte at a time, until ending_value arrives
    received_byte = NULL_BYTE
    received_bytearray = bytearray()
    while received_byte != ending_value:
        received_byte = serial_object.read(1)
        received_bytearray += received_byte
        if received_byte == ending_value:
            break
    return received_bytearray

while True:
    serial_object.open()
    print('\nListening on serial...\n\n')
    received_bytearray = get_bytearray(serial_object, NEW_LINE_BYTE)
    if received_bytearray == INIT_COMMAND:
        serial_object.write(INIT_REPONSE)
        received_bytearray = get_bytearray(serial_object, NEW_LINE_BYTE)
        if received_bytearray == SECOND_COMMAND:
            time.sleep(0.5)
            serial_object.write(SECOND_REPONSE)
            time.sleep(0.5)
            received_bytearray = get_bytearray(serial_object, A)
            if received_bytearray == THIRD_REPONSE:
                time.sleep(0.5)
                serial_object.write(ACK_BYTE)
                received_bytearray = get_bytearray(serial_object, ETX_BYTE)
        else:
            if received_bytearray == THIRD_COMMAND:
                time.sleep(0.5)
                with open('regs.txt', 'rb') as fin:
                    bytes_from_file = fin.read()
                serial_object.write(bytes_from_file)
                print(bytes_from_file.decode('utf-8'))
                print('length: ' + str(len(bytes_from_file)))
    serial_object.close()

Re-write your function

Now that part of the logic has been extracted into a function, it is much easier to reason about it and to rewrite it.

In your case, I'd simply write:

def get_bytearray(serial_object, ending_value):
    # Collect bytes from the given port until ending_value has been read
    received_bytearray = bytearray()
    while True:
        received_byte = serial_object.read(1)
        received_bytearray += received_byte
        if received_byte == ending_value:
            return received_bytearray

(Note that this also removes the need for a NULL_BYTE constant.) (I might add other comments later on.)
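
One caveat with a loop of this shape: if the port is opened with a timeout, pySerial's `read(1)` returns an empty bytes object when nothing arrives, and the loop would then never terminate. Below is a minimal sketch of a guarded variant, assuming that behaviour. `read_until_byte` and `max_empty_reads` are hypothetical names, and an in-memory stream stands in for the port so the example runs without hardware.

```python
import io


def read_until_byte(stream, ending_value, max_empty_reads=3):
    """Read byte by byte until `ending_value` arrives.

    Gives up and returns what was collected so far after `max_empty_reads`
    consecutive empty reads (for example, repeated timeouts).
    """
    received = bytearray()
    empty_reads = 0
    while empty_reads < max_empty_reads:
        received_byte = stream.read(1)
        if not received_byte:
            empty_reads += 1
            continue
        empty_reads = 0
        received += received_byte
        if received_byte == ending_value:
            break
    return bytes(received)


# The first stream contains the terminator, the second one ends early.
complete = read_until_byte(io.BytesIO(b'\x06020\r\nrest'), b'\n')
truncated = read_until_byte(io.BytesIO(b'\x0602'), b'\n')
print(complete, truncated)
```

The caller can tell the two cases apart by checking whether the result ends with the expected terminator.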

answered Jun 14, 2016 at 15:00
\$\endgroup\$
1
  • \$\begingroup\$ thanks for your answer. I'll try your solution tomorrow and let you know if everything is ok. Feel free to edit this with other ideas when you have some free time. \$\endgroup\$ Commented Jun 14, 2016 at 18:02
2
\$\begingroup\$

Since Josay has mentioned all the points that I was going to raise, I'll just say:

serial_object is usable in a with statement, and it makes little sense to open and close it manually. Instead, I'd just wrap the while True loop in a with.

For example (note that I've not tested this, but I checked the source and Serial has both __enter__ and __exit__):

serial_object = serial.Serial(
    port='COM1',
    baudrate=1200,
    parity=serial.PARITY_EVEN,
    stopbits=serial.STOPBITS_ONE,
    bytesize=serial.SEVENBITS
)
with serial_object:
    while True:
        ...

And then use Josay's points.
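
To illustrate what the with block buys you, here is a small sketch that uses a stand-in class rather than a real port. `FakePort` is hypothetical and only mimics the open/close/context-manager shape; it is not pySerial's implementation.

```python
class FakePort:
    """Hypothetical stand-in for a serial port that records open/close calls."""

    def __init__(self):
        self.is_open = False
        self.events = []

    def open(self):
        self.is_open = True
        self.events.append('open')

    def close(self):
        self.is_open = False
        self.events.append('close')

    def __enter__(self):
        # Open on entry if the port is not open yet.
        if not self.is_open:
            self.open()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Always close on exit, whether or not an exception was raised.
        self.close()
        return False  # do not swallow exceptions


port = FakePort()
try:
    with port:
        raise RuntimeError('simulated failure inside the loop')
except RuntimeError:
    pass

print(port.events, port.is_open)
```

The point of the sketch is that close is reached even when the body of the with block raises, which the manual open/close pair in the original loop does not guarantee.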

answered Jun 14, 2016 at 15:29
\$\endgroup\$
1
  • \$\begingroup\$ Thanks for the tip. It does its job as you said. Cheers. \$\endgroup\$ Commented Jun 15, 2016 at 5:58
