I am new to the amazing world of Python and am developing a test system that consists of continuous sensing and test runs. I have three or more while loops, of which one is the producer and the rest are consumers. I do not understand multiprocessing very well. Here is some sample code: the first loop creates data and the second loop gets it. How do I implement this with infinite while loops? I will stop the loops from the main program, but I would appreciate help understanding data exchange between the while loops.
from multiprocessing import Process, Queue
from time import sleep

q = Queue()
cnt = 0

def send():
    global cnt
    while True:
        sleep(1)
        cnt = cnt + 1
        q.put(cnt, False)
        print("data Send:", cnt)

def rcv():
    while True:
        sleep(1)
        newdata = q.get(cnt, False)
        print("data Received", newdata)

if __name__ == '__main__':
    p1 = Process(target=send)
    p2 = Process(target=rcv)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
If the queue is empty, your consumer is blocked until something is put on the queue; if the queue is full, the producer is blocked until there is free space. BTW, you could get the same model with one thread using Python generators. – Dan, Sep 19, 2016 at 23:43
2 Answers
I would suggest diving into the documentation of the multiprocessing library you are using.
Basically, you have two options: Queue and Pipe. Right now you are using Queue:
q.put(cnt,False)
...
newdata=q.get(cnt, False)
This will fail with a queue.Empty exception, because at some point you will try a non-blocking get on an empty Queue. One option is to check the queue status before reading from it:
# multiprocessing.Queue has no task_done(); that method belongs to JoinableQueue
while not q.empty():
    newdata = q.get()
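As an alternative to polling q.empty(), here is a minimal sketch that waits for data with a timeout and treats an empty queue as "nothing yet" instead of an error. The helper name get_or_none and the timeout value are made up for illustration; it assumes None is never sent as real data. It relies only on get(timeout=...) raising queue.Empty, which both queue.Queue and multiprocessing.Queue do.

```python
import queue

def get_or_none(q, timeout=1.0):
    """Return the next item from q, or None if nothing arrives within timeout seconds.

    Hypothetical helper; assumes None is never put on the queue as real data.
    """
    try:
        # Blocks for at most `timeout` seconds, then raises queue.Empty.
        return q.get(timeout=timeout)
    except queue.Empty:
        return None
```

Inside the consumer loop this could look like newdata = get_or_none(q), followed by a check for None before printing, so the loop keeps running while the producer is idle.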
Other than that, if you want multiple receivers, you need to think about some kind of mutex (see multiprocessing.Lock) or multiprocessing.Pipe: if one reader process is taking a value from the queue while another is checking the queue's status, the second one will fail, because the queue will actually be empty by the time it tries to read from it.
However, for this minimal example, using a mutex (a mutual exclusion lock, which prevents multiple processes from accessing the same memory at the same time) will most likely negate the advantage gained from using multiple cores. If the different processes actually do some heavy calculation with the values before or after accessing the queue, though, the benefit gained will be greater than the loss from using a lock.
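One way to sidestep the check-then-read race without an explicit lock, sketched here under the assumption that a plain blocking q.get() is acceptable: multiprocessing.Queue is documented as process-safe, so several consumers can block on the same queue and each item is delivered to exactly one of them. The SENTINEL marker, the results queue, and the finite item count are illustrative additions so the example terminates; they are not part of the original code.

```python
from multiprocessing import Process, Queue

SENTINEL = None  # hypothetical end-of-stream marker; assumes None is never real data

def producer(q, n_items, n_consumers):
    """Put 1..n_items on the queue, then one sentinel per consumer."""
    for cnt in range(1, n_items + 1):
        q.put(cnt)
    for _ in range(n_consumers):
        q.put(SENTINEL)

def consumer(q, results, name):
    """Block on q.get() until a sentinel arrives; report every item received."""
    while True:
        item = q.get()  # blocking get: waits instead of raising queue.Empty
        if item is SENTINEL:
            break
        results.put((name, item))

if __name__ == "__main__":
    n_items, n_consumers = 6, 2
    q, results = Queue(), Queue()
    workers = [Process(target=consumer, args=(q, results, i))
               for i in range(n_consumers)]
    for w in workers:
        w.start()
    producer(q, n_items, n_consumers)
    # Drain the results before joining so the workers can flush their queues.
    received = [results.get() for _ in range(n_items)]
    for w in workers:
        w.join()
    print(sorted(item for _, item in received))
```

For a loop that really runs forever, the main program could put the sentinels on the queue whenever it decides to stop, instead of after a fixed number of items.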
5 Comments
while not q.empty() and not q.task_done(): newdata = q.get(cnt)
q.get(cnt, False): This might fail. In the consumer you want to block if the queue is empty but the producer is still processing. I think it should be q.get(cnt).

def send():
    cnt = -1
    while True:
        cnt += 1
        yield cnt

def rcv(value):
    ...  # logic

NUM_CONSUMERS = 2  # placeholder value; the original snippet leaves this undefined
consumers = [rcv] * NUM_CONSUMERS
idx = 0
for val in send():
    consumers[idx](val)
    idx += 1
    idx %= NUM_CONSUMERS  # wrap around so every consumer gets a turn
10 Comments
multiprocessing to blocking I/O calls. Can you clarify what you mean? It's unclear what you're trying to say and is potentially misinformed.