6
\$\begingroup\$

Newbie here. I want to write a code that can open an SSH Tunnel for a remote server, then read out the data from a PLC and transfers it in JSON format via MQTT. This is my code, but I don't think this is a the clearest and most effective version of it. I want to host it in Azure Function and trigger it once a day with Azure Logic App. How should I improve my code?

# This is the main script file of the PLC Reader of MET3R project.
# It is used to read the data from the PLC and send it to the server.
# Author: name
# License: MIT
# Imported libraries
import time
import json
import subprocess
# Imported third party libraries
import BAC0
import paho.mqtt.client as mqtt
import paho.mqtt.publish as publish
# Import secrets
from secrets.secrets import broker_address, plc_address, target_mqtt_server
from secrets.secrets import ssh_user, ssh_password, ssh_privatekey, ssh_local_port, ssh_port
# Functions
def create_ssh_tunnel():
 print('Creating SSH Tunnel at ' + time.strftime('%Y-%m-%d %H:%M:%S'))
 try:
 tunnel = subprocess.Popen(
 ['ssh', '-i', ssh_privatekey, '-N', '-L', f'{ssh_local_port}:{plc_address}:{ssh_local_port}',
 f'{ssh_user}@{broker_address}'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 if tunnel.returncode is not None and tunnel.returncode != 0:
 print(f"Failed to create SSH tunnel: {tunnel.stderr.read().decode()}")
 else:
 print("SSH tunnel created successfully.")
 except Exception as e:
 print(f"Error creating SSH tunnel: {e}")
 time.sleep(5)
def check_broker_connection():
 # Check connection to the MQTT Broker
 print('Checking connection to the MQTT Broker...')
 try:
 # Create a new MQTT client
 client = mqtt.Client()
 client.tls_set()
 client.username_pw_set(ssh_user, ssh_password)
 client.connect(broker_address, int(ssh_port), bind_address='127.0.0.1', bind_port=int(ssh_local_port))
 # Disconnect from the MQTT Broker
 client.disconnect()
 # Return True if the connection was successful
 return True
 except Exception as e:
 # Return False if the connection was not successful
 print('The exception was: ' + str(e))
 return False
def check_plc_connection():
 # Check connection to the PLC
 try:
 # Create a new BAC0 object
 bacnet = BAC0.lite()
 # Read the value of the object
 bacnet.read(plc_address, 'analogValue', 1)
 # Return True if the connection was successful
 return True
 except Exception as e:
 # Return False if the connection was not successful
 print('The exception was: ' + str(e))
 return False
def check_target_mqtt_server_connection():
 # Check connection to the target MQTT server
 print('Checking connection to the target MQTT server...')
 try:
 # Create a new MQTT client
 client = mqtt.Client()
 # Connect to the target MQTT server
 client.connect(target_mqtt_server)
 # Disconnect from the target MQTT server
 client.disconnect()
 # Return True if the connection was successful
 return True
 except Exception as e:
 # Return False if the connection was not successful
 print('The exception was: ' + str(e))
 return False
def connection_checker():
 # Check the connections
 print('Checking connections...')
 # Check the connection to the MQTT Broker
 if check_broker_connection():
 # Check the connection to the PLC
 print('Connection to the MQTT Broker was successful.')
 else:
 print('Connection to the MQTT Broker was not successful.')
 if check_plc_connection():
 # Check the connection to the target MQTT server
 print('Connection to the PLC was successful.')
 else:
 print('Connection to the PLC was not successful.')
 if check_target_mqtt_server_connection():
 # Return True if all the connections were successful
 print('Connection to the target MQTT server was successful.')
 else:
 print('Connection to the target MQTT server was not successful.')
def read_plc_to_json():
 # Read the PLC
 print('Reading the PLC...')
 # Create a new BAC0 object
 bacnet = BAC0.lite()
 # Read the value of the object
 value = bacnet.read(plc_address, 'analogValue', 1)
 # Convert the value to a JSON object
 json_data = json.dumps({'data': value})
 # Return the JSON data
 return json_data
# Main function
def main():
 print('Starting PLC Reader script at ' + time.strftime('%Y-%m-%d %H:%M:%S'))
 # Create the SSH tunnel
 create_ssh_tunnel()
 # Check the connections
 if connection_checker():
 print('All connections are successful.')
 json_data = read_plc_to_json()
 print('Data read from the PLC: ' + str(json_data))
 # Publish the data to the MQTT Broker
 publish.single('targetServer/plc', json_data, hostname=target_mqtt_server, port=broker_port)
# Run the main script
if __name__ == '__main__':
 main()

I tried my best to collect the libraries needed and create a valid solution for my problem.

Sᴀᴍ Onᴇᴌᴀ
29.5k16 gold badges45 silver badges201 bronze badges
asked Mar 16, 2023 at 11:51
\$\endgroup\$
1
  • \$\begingroup\$ You need to gather some thoughts around your problem statement. When you say you want this to run as daily once Azure function - it cannot be run as it is. There should be different python program that reads from PLC card and sends message to, ideally Azure Event Hub [not vanilla mqtt server], then you could process this event hub messages - either immediately via trigger functions, or once a day. \$\endgroup\$ Commented Apr 11, 2023 at 14:37

1 Answer 1

4
\$\begingroup\$

Let's start with the annoying bit. Some of your comments are not helpful at all and appear to be there just for the sake of leaving comments. That's clutter. Clutter distracts:

# Disconnect from the target MQTT server
client.disconnect()
# Return the JSON data
return json_data
# Create the SSH tunnel
create_ssh_tunnel()

We would've gotten what's going on there without the comment just fine.

The one place where I would've liked to see a comment, is missing one:

def create_ssh_tunnel():
 ...
 time.sleep(5)

Why sleep? And why 5 seconds? Is this how long it takes for the tunnel to establish? Was 2 seconds too short? Does the time vary and can't we check whether the tunnel is ready another way? That last bit would be more robust after all. If I'd write this code and had to maintain it a year later, I'd have questions for my past self.

Worse, some of the comments are wrong:

# Read the PLC
print('Reading the PLC...')

No, you're not reading the PLC. You're printing. You arrive at the next comment before doing anything meaningful since this comment. If you intended the comment to be about the function, put it above the function. Or, much better, use a docstring:

def read_plc_to_json():
 """
 Reads the Programmable Logic Controller (PLC) and
 converts the retrieved data into a JSON object.
 This function initializes a BAC0 object,
 reads the analog value from the specified PLC address and
 converts this value into a JSON string.
 The resulting JSON data is returned.
 Returns:
 str: A JSON string containing the data read from the PLC.
 """
 print('Reading the PLC...')
 bacnet = BAC0.lite()
 value = bacnet.read(plc_address, 'analogValue', 1)
 json_data = json.dumps({'data': value})
 return json_data

No comments, one verbose docstring. Less distractions and much more professional.

All your error handling are catch-all constructs:

try:
...
except Exception as e:

Why are you catching the exceptions if you're silencing all of them anyway? The rest of your code is going to fail or produce invalid output now. Code in an illegal state can cause the worst bugs.

Catch specific exceptions instead and let the rest fail hard. For example, in your create_ssh_tunnel function, I imagine you've run into subprocess.CalledProcessError at least once:

except subprocess.CalledProcessError as e:
 print(f"Error creating SSH tunnel: {e}")

One of the benefits of this approach is you can decide what to do after each separate error depending on your usecase. Not being able to create a tunnel in your case is probably going to make the rest of the program pointless. Perhaps you want to clean-up your resources and connections here and take the sys.exit(1) way out. Perhaps you want to raise a new exception. Perhaps you want to retry instead. Simplified:

for attempt in range(3):
 try:
 create_ssh_tunnel()
 break
 except subprocess.CalledProcessError as e:
 print(f"Attempt {attempt + 1}: Error creating SSH tunnel: {e}")
 # Network may be busy, try again in a few seconds
 time.sleep(5)

Perhaps it's even worth writing a dedicated retry function for and use the same idea for your other try-except blocks. If there's one thing I know from hardware and networks it's that sometimes you'll get a connection failure that you won't be able to reproduce the rest of the day.

Considering you want to run this on Azure, I would recommend taking at least a look at using Azure Key Vault to replace your current secrets handling.

answered Mar 11 at 19:27
\$\endgroup\$

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.