I'm setting up a python mqtt client that is supposed to receive messages of specific topics, process the messages when all incoming messages from the clients are complete, output the new calculated data via http request, request new data via mqtt and start from the beginning.
Are there any issues regarding receiving multiple messages at the same time?
What happens if the control function is executed and the client receives a new message?
How can I implement a timeout that the control function waits for maximum X seconds?
#!/usr/bin/env python
import paho.mqtt.client as mqtt
import time
import socket
import json
import requests
import array
from configparser import SafeConfigParser
def on_connect(client, userdata, flags, rc):
print("CONNECTED")
print("Connected with result code: ", str(rc))
print("subscribing to topics:")
print(*mqtt_sub_topics)
client.subscribe(mqtt_sub_topics)
def on_message(client, userdata, message):
print("MESSAGE")
print("message received " ,str(message.payload.decode("utf-8")))
print("message topic=",message.topic)
print("message qos=",message.qos)
print("message retain flag=",message.retain)
received=message.payload.decode("utf-8","ignore")
received=json.loads(received) #JSON data to python object
# UPDATING
print("Updating...")
index = mqtt_sub_topics.index((message.topic,0)) #search for index of message topic
for i in range(len(keys)):
data[index][i] = float(received[keys[i]])
data[index][7] = 1
print("Update von Wallbox ",index," => ",data[index])
def main():
print("WAIT for max: ",control_timeout, "seconds")
while True:
# CHECK IF UPDATE OF ALL CLIENTS COMPLETE
bUpdate = True
for i in range(NbrClients):
if data[i][7] == 0: # If data not updated
bUpdate = False
#print("Wallbox ",i," not updated")
break
if bUpdate == True:
print("Update of all Clients complete")
control()
def control():
print("CONTROL")
# Get properties
NbrActive=NbrMin=NbrLimit=NbrOutdated = 0
print("Updating Properties")
for i in range(NbrClients):
print("Updating properties of client: ",i)
if data[i][0] != 3 and data[i][7] == 1:
NbrActive += 1
if data[i][6] > data[i][1]:
data[i][4] = 1
NbrLimit += 1
if sum(data[i][1:3]) < MinCurrent:
data[i][5] = 1
NbrMin += 1
elif data[i][7] == 0:
NbrOutdated += 1
data[i][1] = 32
data[i][2] = 32
data[i][3] = 32
print("Number active: ",NbrActive)
print("Number limited: ",NbrLimit)
print("Number minimum: ",NbrMin)
print("Number outdated", NbrOutdated)
#####################################
#Calculate currents of the 3 phases
p1,p2,p3 = 0,0,0
for i in range(NbrClients):
p1 += data[i][1]
p2 += data[i][2]
p3 += data[i][3]
print("Strom auf den Phasen: ",p1,", ",p2,", ",p3)
diff1 = p1 - MaxCurrent
diff2 = p2 - MaxCurrent
diff3 = p3 - MaxCurrent
diff = max(diff1,diff2,diff3)
if diff < 0:
bIncrease = True
print("Current potential available: ",diff," A")
elif diff == 0:
bIncrease = True
print("Current limit reached")
else:
bIncrease = False
print("Current limit exceeded: ",diff," => Decreasing")
#Calculate Number of stations for current distribution
if bIncrease == False and data[0][0] != 3: # WB 0 Active
div = NbrActive - 1 - NbrMin
elif bIncrease == False and data[0][0] == 3: # WB 0 Inactive
div = NbrActive - NbrMin
elif bIncrease == True and data[0][0] != 3: # WB 0 Active
div = NbrActive - 1 - NbrLimit
elif bIncrease == True and data[0][0] == 3: # WB 0 Inactive
div = NbrActive - NbrLimit
if div > 0:
diff = diff / div
for i in range(NbrClients):
if data[i][0] == 3:
data[i][6] == 0
elif bIncrease == True and data[i][4] == 0:
data[i][6] = data[i][1] - diff
elif bIncrease == True and data[i][4] == 1:
data[i][6] = data[i][1]
elif bIncrease == False and data[i][5] == 0:
data[i][6] = data[i][1] - diff
elif bIncrease == False and data[i][5] == 1:
data[i][6] = data[i][1]
else: # no changes possible
print("Control not active => No changes possible")
for i in range(NbrClients):
payload={'current': data[i][6]}
print("Current target for client: ",i," ",payload)
#r = requests.get(url_client[i], params=payload)
#print(r.url)
data[i][4] = 0 # Reset limit flag
data[i][5] = 0 # Reset min current flag
data[i][7] = 0 # Reset update flag
##############################################################
print("Requesting new data: "+mqtt_pub_topic)
client.publish(mqtt_pub_topic,"request") #Request new data from clients
print("INIT")
global url_client, keys, MaxCurrent, data, MinCurrent
global control_timeout, NbrClients
global mqtt_pub_topic, mqtt_sub_topics, client
parser = SafeConfigParser()
parser.read('control.ini')
# Creates an array containing data of X(NbrClients) clients
w, NbrClients = 8, int(parser.get('CLIENT', 'NbrClients'))
data = [[0 for x in range(w)] for y in range(NbrClients)]
for i in range(NbrClients):
data[i][0] = 3 #Init lademodus to stop
print(data) url_client = json.loads(parser.get('CLIENT',"url"))
MinCurrent = int(parser.get('CLIENT','MinCurrent'))
MaxCurrent = int(parser.get('CLIENT','MaxCurrent'))
mqtt_broker = parser.get('MQTT', 'mqtt_broker')
mqtt_port = int(parser.get('MQTT','mqtt_port'))
mqtt_client = parser.get('MQTT','mqtt_client')
mqtt_pub_topic = parser.get('MQTT','mqtt_pub_topic')
sub_topics = json.loads(parser.get('MQTT','mqtt_sub_topics'))
mqtt_sub_topics = [(sub_topics[0],0),] # create list with qos = 0
for i in range(NbrClients-1): # Add subtopics to list
mqtt_sub_topics = mqtt_sub_topics + [(sub_topics[i+1],0),]
control_timeout = float(parser.get('CONTROL','control_timeout'))
#Relevant keys of MQTT messages
keys=['mode','p1','p2','p3']
#######################################################
client = mqtt.Client() #create new instance
client.on_connect = on_connect #attach function to callback
client.on_message = on_message #attach function to callback
print("Connecting to broker")
client.connect(mqtt_broker,mqtt_port) #connect to broker
client.loop_start() #start the loop
main()
-
\$\begingroup\$ Thanks for your advise. I published the complete code \$\endgroup\$MSchuett– MSchuett2019年02月07日 11:27:12 +00:00Commented Feb 7, 2019 at 11:27
1 Answer 1
Logging
You have plenty of statements like this
print("CONNECTED") print("Connected with result code: ", str(rc)) print("subscribing to topics:")
Instead use the logging module,
import logging
logging.basicConfig(filename='example.log',level=logging.DEBUG)
...
logging.debug('Connected with result code: {}".format(rc))
You should use 'str {0}'.format()
or even f"{str}"
(Python3.6+) to concatenate strings for readability
Magic numbers
I see many instances of so call magic numbers,
for i in range(NbrClients): print("Updating properties of client: ",i) if data[i][0] != 3 and data[i][7] == 1: NbrActive += 1 if data[i][6] > data[i][1]: data[i][4] = 1 NbrLimit += 1 if sum(data[i][1:3]) < MinCurrent: data[i][5] = 1 NbrMin += 1 elif data[i][7] == 0: NbrOutdated += 1 data[i][1] = 32 data[i][2] = 32 data[i][3] = 32
What is the 7th index? or the 3th?
Why 32
in those data[i][x] = 32
Since numbers don't have meaning, you could consider changing them to constant variables -> SOME_VAR = 32
or at least document those in a nice docstring
Globals
global url_client, keys, MaxCurrent, data, MinCurrent global control_timeout, NbrClients global mqtt_pub_topic, mqtt_sub_topics, client
Globals are considered bad style, because you lose track of where each variable is called
If you have a lot of functions that require global variables, consider making it a class.
There is more to be improved, but I don't have much time. Hope this helps :)