Newbie here. I want to write code that opens an SSH tunnel to a remote server, reads data from a PLC, and transfers it in JSON format via MQTT. This is my code, but I don't think it is the clearest or most effective version. I want to host it in an Azure Function and trigger it once a day with an Azure Logic App. How should I improve my code?
# This is the main script file of the PLC Reader of MET3R project.
# It is used to read the data from the PLC and send it to the server.
# Author: name
# License: MIT
# Imported libraries
import time
import json
import subprocess
# Imported third party libraries
import BAC0
import paho.mqtt.client as mqtt
import paho.mqtt.publish as publish
# Import secrets
from secrets.secrets import broker_address, plc_address, target_mqtt_server
from secrets.secrets import ssh_user, ssh_password, ssh_privatekey, ssh_local_port, ssh_port
# Functions
def create_ssh_tunnel():
    print('Creating SSH Tunnel at ' + time.strftime('%Y-%m-%d %H:%M:%S'))
    try:
        tunnel = subprocess.Popen(
            ['ssh', '-i', ssh_privatekey, '-N', '-L', f'{ssh_local_port}:{plc_address}:{ssh_local_port}',
             f'{ssh_user}@{broker_address}'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if tunnel.returncode is not None and tunnel.returncode != 0:
            print(f"Failed to create SSH tunnel: {tunnel.stderr.read().decode()}")
        else:
            print("SSH tunnel created successfully.")
    except Exception as e:
        print(f"Error creating SSH tunnel: {e}")
    time.sleep(5)
def check_broker_connection():
    # Check connection to the MQTT Broker
    print('Checking connection to the MQTT Broker...')
    try:
        # Create a new MQTT client
        client = mqtt.Client()
        client.tls_set()
        client.username_pw_set(ssh_user, ssh_password)
        client.connect(broker_address, int(ssh_port), bind_address='127.0.0.1', bind_port=int(ssh_local_port))
        # Disconnect from the MQTT Broker
        client.disconnect()
        # Return True if the connection was successful
        return True
    except Exception as e:
        # Return False if the connection was not successful
        print('The exception was: ' + str(e))
        return False
def check_plc_connection():
    # Check connection to the PLC
    try:
        # Create a new BAC0 object
        bacnet = BAC0.lite()
        # Read the value of the object
        bacnet.read(plc_address, 'analogValue', 1)
        # Return True if the connection was successful
        return True
    except Exception as e:
        # Return False if the connection was not successful
        print('The exception was: ' + str(e))
        return False
def check_target_mqtt_server_connection():
    # Check connection to the target MQTT server
    print('Checking connection to the target MQTT server...')
    try:
        # Create a new MQTT client
        client = mqtt.Client()
        # Connect to the target MQTT server
        client.connect(target_mqtt_server)
        # Disconnect from the target MQTT server
        client.disconnect()
        # Return True if the connection was successful
        return True
    except Exception as e:
        # Return False if the connection was not successful
        print('The exception was: ' + str(e))
        return False
def connection_checker():
    # Check the connections
    print('Checking connections...')
    # Check the connection to the MQTT Broker
    if check_broker_connection():
        # Check the connection to the PLC
        print('Connection to the MQTT Broker was successful.')
    else:
        print('Connection to the MQTT Broker was not successful.')
    if check_plc_connection():
        # Check the connection to the target MQTT server
        print('Connection to the PLC was successful.')
    else:
        print('Connection to the PLC was not successful.')
    if check_target_mqtt_server_connection():
        # Return True if all the connections were successful
        print('Connection to the target MQTT server was successful.')
    else:
        print('Connection to the target MQTT server was not successful.')
def read_plc_to_json():
    # Read the PLC
    print('Reading the PLC...')
    # Create a new BAC0 object
    bacnet = BAC0.lite()
    # Read the value of the object
    value = bacnet.read(plc_address, 'analogValue', 1)
    # Convert the value to a JSON object
    json_data = json.dumps({'data': value})
    # Return the JSON data
    return json_data
# Main function
def main():
    print('Starting PLC Reader script at ' + time.strftime('%Y-%m-%d %H:%M:%S'))
    # Create the SSH tunnel
    create_ssh_tunnel()
    # Check the connections
    if connection_checker():
        print('All connections are successful.')
        json_data = read_plc_to_json()
        print('Data read from the PLC: ' + str(json_data))
        # Publish the data to the MQTT Broker
        publish.single('targetServer/plc', json_data, hostname=target_mqtt_server, port=broker_port)
# Run the main script
if __name__ == '__main__':
    main()
I tried my best to collect the libraries needed and create a valid solution for my problem.
1 Answer
Let's start with the annoying bit. Some of your comments are not helpful at all and appear to be there just for the sake of leaving comments. That's clutter. Clutter distracts:
# Disconnect from the target MQTT server
client.disconnect()
# Return the JSON data
return json_data
# Create the SSH tunnel
create_ssh_tunnel()
We would've gotten what's going on there without the comment just fine.
The one place where I would've liked to see a comment is missing one:
def create_ssh_tunnel():
    ...
    time.sleep(5)
Why sleep? And why 5 seconds? Is this how long it takes for the tunnel to establish? Was 2 seconds too short? Does the time vary, and can't we check whether the tunnel is ready some other way? That would be more robust, after all. If I wrote this code and had to maintain it a year later, I'd have questions for my past self.
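As a rough sketch of what "checking another way" could look like: poll the local end of the tunnel until it accepts a TCP connection, and give up after a deadline. The function name `wait_for_port` and its defaults are my own invention, not part of your code. Note that a successful connect only tells you something is listening on the local port; it says nothing about whether the far end is reachable.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 10.0, interval: float = 0.2) -> bool:
    """Return True once a TCP connection to (host, port) succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Not listening yet (or connection refused): wait briefly and try again.
            time.sleep(interval)
    return False
```

In `create_ssh_tunnel` the fixed `time.sleep(5)` could then become something like `wait_for_port('127.0.0.1', int(ssh_local_port))`, which returns as soon as the tunnel is up and tells you explicitly when it never came up.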
Worse, some of the comments are wrong:
# Read the PLC
print('Reading the PLC...')
No, you're not reading the PLC. You're printing. You arrive at the next comment before doing anything meaningful since this comment. If you intended the comment to be about the function, put it above the function. Or, much better, use a docstring:
def read_plc_to_json():
    """
    Reads the Programmable Logic Controller (PLC) and
    converts the retrieved data into a JSON object.

    This function initializes a BAC0 object,
    reads the analog value from the specified PLC address and
    converts this value into a JSON string.
    The resulting JSON data is returned.

    Returns:
        str: A JSON string containing the data read from the PLC.
    """
    print('Reading the PLC...')
    bacnet = BAC0.lite()
    value = bacnet.read(plc_address, 'analogValue', 1)
    json_data = json.dumps({'data': value})
    return json_data
No comments, one verbose docstring. Fewer distractions and much more professional.
All of your error handling consists of catch-all constructs:
try:
    ...
except Exception as e:
Why are you catching the exceptions if you're silencing all of them anyway? The rest of your code is going to fail or produce invalid output now. Code in an illegal state can cause the worst bugs.
Catch specific exceptions instead and let the rest fail hard. For example, in your create_ssh_tunnel function, I imagine you've run into subprocess.CalledProcessError at least once:
except subprocess.CalledProcessError as e:
    print(f"Error creating SSH tunnel: {e}")
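One caveat on the exception type: `subprocess.Popen` returns immediately and never raises `CalledProcessError` itself; that one comes from `subprocess.run(..., check=True)`, `check_call` and `check_output`. With `Popen`, the specific failures to look for are a missing executable (`FileNotFoundError`) and a process that exits right after starting. Here is a minimal sketch of that idea. `TunnelError`, `start_process` and the one-second grace period are names and values I made up for illustration.

```python
import subprocess


class TunnelError(RuntimeError):
    """Hypothetical exception type for a tunnel that could not be started."""


def start_process(cmd: list[str], startup_grace: float = 1.0) -> subprocess.Popen:
    """Start cmd and raise TunnelError if it cannot launch or exits straight away."""
    try:
        proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
    except FileNotFoundError as e:
        # The executable itself (e.g. ssh) is not installed or not on PATH.
        raise TunnelError(f"Executable not found: {cmd[0]}") from e

    try:
        # wait() returns only if the process exits within the grace period.
        returncode = proc.wait(timeout=startup_grace)
    except subprocess.TimeoutExpired:
        # Still running after the grace period: treat it as started.
        return proc

    # The process has already exited, so reading stderr will not block.
    stderr = proc.stderr.read().decode(errors="replace")
    proc.stderr.close()
    raise TunnelError(f"Process exited early with code {returncode}: {stderr.strip()}")
```

Since `ssh -N` is meant to keep running until you stop it, an exit within the grace period is a reasonable sign that the tunnel failed, and the caller gets a single, specific exception to handle.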
One of the benefits of this approach is that you can decide what to do after each separate error, depending on your use case. Not being able to create a tunnel in your case is probably going to make the rest of the program pointless. Perhaps you want to clean up your resources and connections here and take the sys.exit(1) way out. Perhaps you want to raise a new exception. Perhaps you want to retry instead. Simplified:
for attempt in range(3):
    try:
        create_ssh_tunnel()
        break
    except subprocess.CalledProcessError as e:
        print(f"Attempt {attempt + 1}: Error creating SSH tunnel: {e}")
        # Network may be busy, try again in a few seconds
        time.sleep(5)
Perhaps it's even worth writing a dedicated retry function and using the same idea for your other try-except blocks. If there's one thing I know from hardware and networks, it's that sometimes you'll get a connection failure that you won't be able to reproduce for the rest of the day.
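A dedicated retry helper might look roughly like this. It's a sketch, not a drop-in: the name `retry`, its parameters and the defaults are my own choices. It retries only on the exception types you pass in, lets everything else fail hard, and re-raises the last error once the attempts run out.

```python
import time


def retry(func, exceptions, attempts=3, delay=5.0):
    """Call func() until it succeeds, retrying on the given exception types.

    Re-raises the last exception if every attempt fails.
    """
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except exceptions as e:
            print(f"Attempt {attempt}/{attempts} failed: {e}")
            if attempt == attempts:
                # Out of attempts: let the caller decide what to do.
                raise
            time.sleep(delay)
```

Usage would then be a one-liner per flaky step, for example `retry(create_ssh_tunnel, (OSError,))`, assuming `create_ssh_tunnel` is changed to raise instead of printing and carrying on.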
Considering you want to run this on Azure, I would recommend taking at least a look at using Azure Key Vault to replace your current secrets handling.
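One low-effort route, sketched here under some assumptions: Azure Functions exposes its application settings to Python as environment variables, and those settings can be backed by Key Vault references, so the script only has to read `os.environ`. The `Settings` class, `load_settings` and the variable names below are placeholders I made up; only the field names come from your imports. It also sidesteps your local `secrets` package, whose name collides with the standard library's `secrets` module.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class Settings:
    broker_address: str
    plc_address: str
    target_mqtt_server: str
    ssh_user: str


# Maps each Settings field to an environment variable name.
# These variable names are placeholders made up for this sketch.
ENV_NAMES = {
    "broker_address": "BROKER_ADDRESS",
    "plc_address": "PLC_ADDRESS",
    "target_mqtt_server": "TARGET_MQTT_SERVER",
    "ssh_user": "SSH_USER",
}


def load_settings(env=None) -> Settings:
    """Build Settings from environment variables, failing fast if any are missing."""
    env = os.environ if env is None else env
    missing = [var for var in ENV_NAMES.values() if var not in env]
    if missing:
        raise RuntimeError(f"Missing settings: {', '.join(missing)}")
    return Settings(**{field: env[var] for field, var in ENV_NAMES.items()})
```

Failing at startup with a list of the missing names is a lot easier to debug than an `ImportError` or a `None` surfacing halfway through the run.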