1
\$\begingroup\$

I'm setting up a python mqtt client that is supposed to receive messages of specific topics, process the messages when all incoming messages from the clients are complete, output the new calculated data via http request, request new data via mqtt and start from the beginning.

Are there any issues regarding receiving multiple messages at the same time?

What happens if the control function is executed and the client receives a new message?

How can I implement a timeout that the control function waits for maximum X seconds?

#!/usr/bin/env python
import paho.mqtt.client as mqtt
import time
import socket
import json
import requests
import array
from configparser import SafeConfigParser
def on_connect(client, userdata, flags, rc):
 print("CONNECTED")
 print("Connected with result code: ", str(rc))
 print("subscribing to topics:")
 print(*mqtt_sub_topics)
 client.subscribe(mqtt_sub_topics)
def on_message(client, userdata, message):
 print("MESSAGE")
 print("message received " ,str(message.payload.decode("utf-8")))
 print("message topic=",message.topic)
 print("message qos=",message.qos)
 print("message retain flag=",message.retain)
 received=message.payload.decode("utf-8","ignore")
 received=json.loads(received) #JSON data to python object
 # UPDATING
 print("Updating...")
 index = mqtt_sub_topics.index((message.topic,0)) #search for index of message topic
 for i in range(len(keys)):
 data[index][i] = float(received[keys[i]])
 data[index][7] = 1
 print("Update von Wallbox ",index," => ",data[index])
def main():
 print("WAIT for max: ",control_timeout, "seconds")
 while True:
 # CHECK IF UPDATE OF ALL CLIENTS COMPLETE
 bUpdate = True
 for i in range(NbrClients):
 if data[i][7] == 0: # If data not updated
 bUpdate = False
 #print("Wallbox ",i," not updated")
 break
 if bUpdate == True:
 print("Update of all Clients complete") 
 control()
def control():
 print("CONTROL")
 # Get properties
 NbrActive=NbrMin=NbrLimit=NbrOutdated = 0
 print("Updating Properties")
 for i in range(NbrClients):
 print("Updating properties of client: ",i)
 if data[i][0] != 3 and data[i][7] == 1:
 NbrActive += 1
 if data[i][6] > data[i][1]: 
 data[i][4] = 1 
 NbrLimit += 1
 if sum(data[i][1:3]) < MinCurrent:
 data[i][5] = 1 
 NbrMin += 1
 elif data[i][7] == 0:
 NbrOutdated += 1
 data[i][1] = 32 
 data[i][2] = 32
 data[i][3] = 32
 print("Number active: ",NbrActive)
 print("Number limited: ",NbrLimit)
 print("Number minimum: ",NbrMin)
 print("Number outdated", NbrOutdated)
 #####################################
 #Calculate currents of the 3 phases
 p1,p2,p3 = 0,0,0
 for i in range(NbrClients):
 p1 += data[i][1]
 p2 += data[i][2]
 p3 += data[i][3] 
 print("Strom auf den Phasen: ",p1,", ",p2,", ",p3)
 diff1 = p1 - MaxCurrent
 diff2 = p2 - MaxCurrent
 diff3 = p3 - MaxCurrent
 diff = max(diff1,diff2,diff3)
 if diff < 0:
 bIncrease = True
 print("Current potential available: ",diff," A")
 elif diff == 0:
 bIncrease = True
 print("Current limit reached")
 else:
 bIncrease = False
 print("Current limit exceeded: ",diff," => Decreasing")
 #Calculate Number of stations for current distribution
 if bIncrease == False and data[0][0] != 3: # WB 0 Active
 div = NbrActive - 1 - NbrMin
 elif bIncrease == False and data[0][0] == 3: # WB 0 Inactive 
 div = NbrActive - NbrMin
 elif bIncrease == True and data[0][0] != 3: # WB 0 Active
 div = NbrActive - 1 - NbrLimit
 elif bIncrease == True and data[0][0] == 3: # WB 0 Inactive
 div = NbrActive - NbrLimit
 if div > 0:
 diff = diff / div
 for i in range(NbrClients):
 if data[i][0] == 3:
 data[i][6] == 0
 elif bIncrease == True and data[i][4] == 0:
 data[i][6] = data[i][1] - diff
 elif bIncrease == True and data[i][4] == 1:
 data[i][6] = data[i][1]
 elif bIncrease == False and data[i][5] == 0:
 data[i][6] = data[i][1] - diff
 elif bIncrease == False and data[i][5] == 1:
 data[i][6] = data[i][1]
 else: # no changes possible
 print("Control not active => No changes possible")
 for i in range(NbrClients):
 payload={'current': data[i][6]}
 print("Current target for client: ",i," ",payload)
 #r = requests.get(url_client[i], params=payload)
 #print(r.url)
 data[i][4] = 0 # Reset limit flag
 data[i][5] = 0 # Reset min current flag
 data[i][7] = 0 # Reset update flag
 ##############################################################
 print("Requesting new data: "+mqtt_pub_topic)
 client.publish(mqtt_pub_topic,"request") #Request new data from clients 
print("INIT")
global url_client, keys, MaxCurrent, data, MinCurrent
global control_timeout, NbrClients
global mqtt_pub_topic, mqtt_sub_topics, client
parser = SafeConfigParser()
parser.read('control.ini')
# Creates an array containing data of X(NbrClients) clients
w, NbrClients = 8, int(parser.get('CLIENT', 'NbrClients'))
data = [[0 for x in range(w)] for y in range(NbrClients)] 
for i in range(NbrClients):
 data[i][0] = 3 #Init lademodus to stop 
print(data) url_client = json.loads(parser.get('CLIENT',"url"))
MinCurrent = int(parser.get('CLIENT','MinCurrent'))
MaxCurrent = int(parser.get('CLIENT','MaxCurrent'))
mqtt_broker = parser.get('MQTT', 'mqtt_broker')
mqtt_port = int(parser.get('MQTT','mqtt_port'))
mqtt_client = parser.get('MQTT','mqtt_client')
mqtt_pub_topic = parser.get('MQTT','mqtt_pub_topic')
sub_topics = json.loads(parser.get('MQTT','mqtt_sub_topics'))
mqtt_sub_topics = [(sub_topics[0],0),] # create list with qos = 0
for i in range(NbrClients-1): # Add subtopics to list
 mqtt_sub_topics = mqtt_sub_topics + [(sub_topics[i+1],0),]
control_timeout = float(parser.get('CONTROL','control_timeout'))
#Relevant keys of MQTT messages
keys=['mode','p1','p2','p3']
#######################################################
client = mqtt.Client() #create new instance
client.on_connect = on_connect #attach function to callback
client.on_message = on_message #attach function to callback
print("Connecting to broker")
client.connect(mqtt_broker,mqtt_port) #connect to broker
client.loop_start() #start the loop
main()
asked Feb 7, 2019 at 10:33
\$\endgroup\$
1
  • \$\begingroup\$ Thanks for your advise. I published the complete code \$\endgroup\$ Commented Feb 7, 2019 at 11:27

1 Answer 1

1
\$\begingroup\$

Logging

You have plenty of statements like this

print("CONNECTED")
print("Connected with result code: ", str(rc))
print("subscribing to topics:")

Instead use the logging module,

import logging
logging.basicConfig(filename='example.log',level=logging.DEBUG)
...
logging.debug('Connected with result code: {}".format(rc))

You should use 'str {0}'.format() or even f"{str}" (Python3.6+) to concatenate strings for readability

Magic numbers

I see many instances of so call magic numbers,

for i in range(NbrClients):
 print("Updating properties of client: ",i)
 if data[i][0] != 3 and data[i][7] == 1:
 NbrActive += 1
 if data[i][6] > data[i][1]: 
 data[i][4] = 1 
 NbrLimit += 1
 if sum(data[i][1:3]) < MinCurrent:
 data[i][5] = 1 
 NbrMin += 1
 elif data[i][7] == 0:
 NbrOutdated += 1
 data[i][1] = 32 
 data[i][2] = 32
 data[i][3] = 32

What is the 7th index? or the 3th?

Why 32 in those data[i][x] = 32

Since numbers don't have meaning, you could consider changing them to constant variables -> SOME_VAR = 32 or at least document those in a nice docstring

Globals

 global url_client, keys, MaxCurrent, data, MinCurrent
 global control_timeout, NbrClients
 global mqtt_pub_topic, mqtt_sub_topics, client

Globals are considered bad style, because you lose track of where each variable is called

If you have a lot of functions that require global variables, consider making it a class.

There is more to be improved, but I don't have much time. Hope this helps :)

answered Feb 7, 2019 at 12:08
\$\endgroup\$

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.