This code is an attempt to use a queue to feed tasks to a number of worker processes.
I wanted to time the difference in speed between different numbers of processes and different methods for handling data.
But the output is not what I expected.
```python
from multiprocessing import Process, Queue
import time

result = []
base = 2
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 23, 45, 76, 4567, 65423, 45, 4, 3, 21]

# create queue for new tasks
new_tasks = Queue(maxsize=0)

# put tasks in queue
print('Putting tasks in Queue')
for i in data:
    new_tasks.put(i)

# worker function definition
def f(q, p_num):
    print('Starting process: {}'.format(p_num))
    while not q.empty():
        # mimic some process being done
        time.sleep(0.05)
        print(q.get(), p_num)
    print('Finished', p_num)

print('initiating processes')
processes = []
for i in range(0, 2):
    if __name__ == '__main__':
        print('Creating process {}'.format(i))
        p = Process(target=f, args=(new_tasks, i))
        processes.append(p)

# record start time
start = time.time()

# start processes
for p in processes:
    p.start()

# wait for processes to finish
for p in processes:
    p.join()

# record end time
end = time.time()

# print time result
print('Time taken: {}'.format(end-start))
```
I expect this:
```text
Putting tasks in Queue
initiating processes
Creating process 0
Creating process 1
Starting process: 1
Starting process: 0
1 1
2 0
3 1
4 0
5 1
6 0
7 1
8 0
9 1
10 0
11 1
23 0
45 1
76 0
4567 1
65423 0
45 1
4 0
3 1
21 0
Finished 1
Finished 0
Time taken: <some-time>
```
But instead I actually get this:
```text
Putting tasks in Queue
initiating processes
Creating process 0
Creating process 1
Time taken: 0.01000523567199707
Putting tasks in Queue
Putting tasks in Queue
initiating processes
Time taken: 0.0
Starting process: 1
initiating processes
Time taken: 0.0
Starting process: 0
1 1
2 0
3 1
4 0
5 1
6 0
7 1
8 0
9 1
10 0
11 1
23 0
45 1
76 0
4567 1
65423 0
45 1
4 0
3 1
21 0
Finished 0
```
There seem to be two major problems, and I am not sure how related they are:

1) Print statements such as "Putting tasks in Queue", "initiating processes" and "Time taken: 0.0" are repeated systematically throughout the output. I say systematically because they repeat in exactly the same way every time.
2) The second process never finishes: it never recognizes that the queue is empty and therefore fails to exit.
2 Answers
1) I cannot reproduce this.
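The repeated lines in the question's output are consistent with the spawn start method, which is the default on Windows and macOS: each child starts a fresh interpreter and imports the main module, so every statement outside the if __name__ == '__main__': guard (the "Putting tasks in Queue" print, the loop filling the queue, the timing code) runs again in each child. Under fork, the traditional Linux default, children do not re-import the module, which may be why the problem does not reproduce everywhere. The sketch below, where worker and main are hypothetical names, forces spawn so the effect shows up on any platform: the module-level line prints once per process, while main() runs only in the parent.

```python
import multiprocessing as mp
import os

# Module-level code: under the spawn start method every child process
# re-imports this module, so this line prints once per process.
print('module level, pid {}'.format(os.getpid()))


def worker(p_num):
    print('worker {} running in pid {}'.format(p_num, os.getpid()))


def main():
    # Force spawn so the behaviour is the same on every platform
    # (it is already the default on Windows and macOS).
    ctx = mp.get_context('spawn')
    processes = [ctx.Process(target=worker, args=(i,)) for i in range(2)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()


# Children import this module under the name '__mp_main__', so only the
# original parent process gets past this guard.
if __name__ == '__main__':
    main()
```

The usual remedy is to move all setup, queue filling, process creation and timing into a function that is only called from behind the guard.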
2) Look at the following code:
```python
while not q.empty():
    time.sleep(0.05)
    print(q.get(), p_num)
```
The two processes run concurrently, so their statements can interleave in any order. Suppose q holds a single item and there are two processes, A and B, and consider the following order of execution:
```python
# A runs
while not q.empty():
    time.sleep(0.05)
# B runs
while not q.empty():
    time.sleep(0.05)
# A runs
    print(q.get(), p_num)  # Removes and prints the last element of q
# B runs
    print(q.get(), p_num)  # q is now empty so q.get() blocks forever
```
Swapping the order of time.sleep and q.get removes the blocking in all of my runs, but it's still possible for more than one process to enter the loop with a single item left.
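The interleaving above can be replayed deterministically in a single process. In this sketch the standard-library queue.Queue stands in for multiprocessing.Queue (an assumption made so each step runs in a fixed order), and the final get uses a timeout so that the hang B would hit shows up as a queue.Empty exception instead of blocking forever.

```python
import queue

# queue.Queue stands in for multiprocessing.Queue so the interleaving can be
# replayed step by step in one process.
q = queue.Queue()
q.put('last item')

# A and B both check q.empty() while the last item is still there ...
a_enters_loop = not q.empty()
b_enters_loop = not q.empty()

# ... A takes the item ...
a_got = q.get()

# ... and B calls get() on a now-empty queue. A plain q.get() would block
# forever here; the timeout turns that hang into a queue.Empty exception.
try:
    b_got = q.get(timeout=0.1)
except queue.Empty:
    b_got = None

print(a_enters_loop, b_enters_loop, a_got, b_got)  # True True last item None
```

Both checks of q.empty() succeed, yet only one item exists, which is exactly why testing empty() before get() cannot be made safe.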
The way to fix this is to use a non-blocking get call and catch the queue.Empty exception:
```python
import queue

while True:
    time.sleep(0.05)
    try:
        print(q.get(False), p_num)
    except queue.Empty:
        break
```
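A complete version of the question's script with this non-blocking loop in place of while not q.empty() might look like the sketch below. It is an assumption-laden sketch rather than the answerer's code: setup and process creation are moved into a hypothetical main() behind the if __name__ == '__main__': guard so nothing re-runs in spawned children, and f returns how many items it handled, which only matters when it is called directly. One caveat from the multiprocessing documentation: right after an object is put on an empty multiprocessing.Queue there can be a short delay before get_nowait() (the same as get(False)) stops raising queue.Empty, so in rare cases a worker could exit while items are still being flushed into the queue.

```python
import multiprocessing as mp
import queue
import time


def f(q, p_num):
    """Worker: take items without blocking until the queue is empty.

    Returns the number of items handled (only useful when called directly).
    """
    print('Starting process: {}'.format(p_num))
    handled = 0
    while True:
        time.sleep(0.05)  # mimic some work being done
        try:
            value = q.get(False)  # raises queue.Empty instead of blocking
        except queue.Empty:
            break
        print(value, p_num)
        handled += 1
    print('Finished', p_num)
    return handled


def main():
    data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 23, 45, 76, 4567, 65423, 45, 4, 3, 21]
    new_tasks = mp.Queue()
    for i in data:
        new_tasks.put(i)

    processes = [mp.Process(target=f, args=(new_tasks, i)) for i in range(2)]
    start = time.time()
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print('Time taken: {}'.format(time.time() - start))


if __name__ == '__main__':
    main()
```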
Your worker processes should look like this:
```python
def f(q, p_num):
    print('Starting process: {}'.format(p_num))
    while True:
        value = q.get()
        if value is None:
            break
        # mimic some process being done
        time.sleep(0.05)
        print(value, p_num)
    print('Finished', p_num)
```
And the queue should be filled with markers after the real data:
```python
for i in data:
    new_tasks.put(i)
# one None marker per worker; num_of_threads is the number of worker processes
for _ in range(num_of_threads):
    new_tasks.put(None)
```
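Putting the two pieces together, a runnable version of the question's script might look like the sketch below. The run(data, num_workers) helper is hypothetical: it fills the queue, appends one None per worker, and times the workers, so the comparison between process counts that the question was after can be repeated for 1, 2 and 4 workers. All process-starting code stays behind the if __name__ == '__main__': guard, and f returns the number of real items it handled, which only matters when it is called directly.

```python
import multiprocessing as mp
import time


def f(q, p_num):
    """Worker: handle items until the None end-of-queue marker arrives.

    Returns the number of real items handled (only useful when called directly).
    """
    print('Starting process: {}'.format(p_num))
    handled = 0
    while True:
        value = q.get()  # blocks until a real item or a marker is available
        if value is None:
            break
        time.sleep(0.05)  # mimic some work being done
        print(value, p_num)
        handled += 1
    print('Finished', p_num)
    return handled


def run(data, num_workers):
    """Hypothetical helper: time num_workers processes draining data."""
    new_tasks = mp.Queue()
    for i in data:
        new_tasks.put(i)
    for _ in range(num_workers):
        new_tasks.put(None)  # one marker per worker
    processes = [mp.Process(target=f, args=(new_tasks, i))
                 for i in range(num_workers)]
    start = time.time()
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    return time.time() - start


if __name__ == '__main__':
    data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 23, 45, 76, 4567, 65423, 45, 4, 3, 21]
    for n in (1, 2, 4):
        print('{} worker(s): {:.2f} s'.format(n, run(data, n)))
```

Because every worker blocks in get() until it reads either a real item or its own None, no worker ever waits on a queue that will stay empty, and no call to q.empty() is needed.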
Comments:
... if then break instead of try-except? Is this just a speed issue?

... None as a marker. You need to pass as many of these in as you have threads. Even if one thread is exceptionally greedy and consumes all queued items, it will find the first None and quit. Then all the rest of the lazy threads, when they get round to calling get(), will each find a None and quit as they get there. The None markers can be pushed into the Queue at any time after the real data has gone in, but before you want the threads to quit.
... Time taken:... printout.

... q.empty() since a greedy thread might steal the last item and leave all the other threads waiting for items that will never appear. What you should use is an end-of-queue marker, one per thread.
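The greedy-worker argument from these comments can be checked with a sequential simulation. In this sketch the workers take turns instead of running in parallel, queue.Queue stands in for multiprocessing.Queue, and NUM_WORKERS and run_worker are hypothetical names used only for illustration.

```python
import queue

NUM_WORKERS = 3  # hypothetical worker count for the simulation

q = queue.Queue()
for item in [1, 2, 3, 4, 5]:
    q.put(item)
for _ in range(NUM_WORKERS):
    q.put(None)  # one end-of-queue marker per worker


def run_worker(tasks):
    """Consume items until the first None; return the real items taken."""
    taken = []
    while True:
        value = tasks.get()
        if value is None:
            return taken
        taken.append(value)


# Worst case from the comments: the first worker is greedy and drains every
# real item before the others get a turn. Each lazy worker then finds a
# None straight away and stops instead of blocking.
greedy = run_worker(q)
lazy = [run_worker(q) for _ in range(NUM_WORKERS - 1)]
print(greedy, lazy)  # [1, 2, 3, 4, 5] [[], []]
```

With only one None in the queue instead of one per worker, the second lazy worker's get() would block forever, which is the same hang the question ran into.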