
This code is an attempt to use a queue to feed tasks to a number of worker processes.

I wanted to time the difference in speed between different numbers of processes and different methods for handling data.

But the output is not what I expected.

from multiprocessing import Process, Queue
import time
result = []
base = 2
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 23, 45, 76, 4567, 65423, 45, 4, 3, 21]
# create queue for new tasks
new_tasks = Queue(maxsize=0)
# put tasks in queue
print('Putting tasks in Queue')
for i in data:
    new_tasks.put(i)
# worker function definition
def f(q, p_num):
    print('Starting process: {}'.format(p_num))
    while not q.empty():
        # mimic some process being done
        time.sleep(0.05)
        print(q.get(), p_num)
    print('Finished', p_num)
print('initiating processes')
processes = []
for i in range(0, 2):
    if __name__ == '__main__':
        print('Creating process {}'.format(i))
        p = Process(target=f, args=(new_tasks, i))
        processes.append(p)
# record start time
start = time.time()
# start process
for p in processes:
    p.start()
# wait for processes to finish processes
for p in processes:
    p.join()
# record end time
end = time.time()
# print time result
print('Time taken: {}'.format(end-start))

I expect this:

Putting tasks in Queue
initiating processes
Creating process 0
Creating process 1
Starting process: 1
Starting process: 0
1 1
2 0
3 1
4 0
5 1
6 0
7 1
8 0
9 1
10 0
11 1
23 0
45 1
76 0
4567 1
65423 0
45 1
4 0
3 1
21 0
Finished 1
Finished 0
Time taken: <some-time>

But instead I actually get this:

Putting tasks in Queue
initiating processes
Creating process 0
Creating process 1
Time taken: 0.01000523567199707
Putting tasks in Queue
Putting tasks in Queue
initiating processes
Time taken: 0.0
Starting process: 1
initiating processes
Time taken: 0.0
Starting process: 0
1 1
2 0
3 1
4 0
5 1
6 0
7 1
8 0
9 1
10 0
11 1
23 0
45 1
76 0
4567 1
65423 0
45 1
4 0
3 1
21 0
Finished 0

There seem to be two major problems; I am not sure how related they are:

  1. Print statements such as 'Putting tasks in Queue', 'initiating processes' and 'Time taken: 0.0' are repeated systematically throughout the output. I say systematically because they repeat in exactly the same way every time.

  2. The second process never finishes: it never recognizes that the queue is empty and therefore fails to exit.

asked Aug 8, 2017 at 14:42
  • It sounds like you have code formatting problems: you should only ever have one Time taken:... printout. Commented Aug 8, 2017 at 15:18
  • Plus you should never poll q.empty() since a greedy thread might steal the last item and leave all the other threads waiting for items that will never appear. What you should use is an end of queue marker. One per thread. Commented Aug 8, 2017 at 15:21
  • Otherwise this is a good question. You have shown some effort in writing code and collecting output and shown what you were expecting to happen. Commented Aug 8, 2017 at 15:27
  • @quamrana you were right, replacing the q.empty() polling was the solution to problem 2. Commented Aug 8, 2017 at 16:50
  • You can be PEP8 compliant, but an extra or missing indentation can completely change a python program. Commented Aug 8, 2017 at 18:35

2 Answers


1) I cannot reproduce this.

2) Look at the following code:

while not q.empty():
 time.sleep(0.05)
 print(q.get(), p_num)

Each line can be run by either process, and the lines of different processes can interleave in any order. Now consider q holding a single item and two processes A and B, with the following order of execution:

# A runs
while not q.empty():
    time.sleep(0.05)
# B runs
while not q.empty():
    time.sleep(0.05)
# A runs
    print(q.get(), p_num)  # Removes and prints the last element of q
# B runs
    print(q.get(), p_num)  # q is now empty so q.get() blocks forever
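The interleaving above can be forced deterministically. The sketch below is an illustration rather than the question's code: it swaps processes for threads and `multiprocessing.Queue` for `queue.Queue` so the scheduling can be controlled, uses a `threading.Barrier` to hold both workers after their `q.empty()` check, and calls `q.get(timeout=...)` instead of a plain `q.get()` so the losing worker reports the problem instead of hanging. `race_demo` is a hypothetical name.

```python
import queue
import threading


def race_demo(timeout=0.5):
    q = queue.Queue()
    q.put('last item')               # a single item left, two workers
    both_checked = threading.Barrier(2)
    outcomes = {}

    def worker(name):
        if not q.empty():            # A and B both see one item left...
            both_checked.wait()      # ...and neither calls get() until both have checked
            try:
                outcomes[name] = q.get(timeout=timeout)
            except queue.Empty:
                # A plain q.get() would block here forever.
                outcomes[name] = 'blocked'

    threads = [threading.Thread(target=worker, args=(n,)) for n in ('A', 'B')]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return outcomes


print(race_demo())  # one worker gets 'last item', the other ends up 'blocked'
```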

Swapping the order of time.sleep and q.get removes the blocking in all of my runs, but it is still possible for more than one process to enter the loop with a single item left.

The way to fix this is using a non-blocking get call and catching the queue.Empty exception:

import queue
while True:
    time.sleep(0.05)
    try:
        print(q.get(False), p_num)
    except queue.Empty:
        break
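A complete, runnable version of this approach might look like the sketch below, assuming the question's data and two worker processes. It also keeps all parent-only code under `if __name__ == '__main__':` so nothing is re-run in children started with `spawn`. The `delay` parameter and the returned `handled` list are additions for illustration and testing, and `time.perf_counter()` replaces `time.time()` because it is intended for measuring intervals. `q.get(False)` is equivalent to `q.get_nowait()`.

```python
from multiprocessing import Process, Queue
import queue
import time


def f(q, p_num, delay=0.05):
    print('Starting process: {}'.format(p_num))
    handled = []
    while True:
        time.sleep(delay)  # mimic some process being done
        try:
            value = q.get(False)  # non-blocking: raises queue.Empty if nothing is there
        except queue.Empty:
            break  # no more work: exit instead of blocking forever
        print(value, p_num)
        handled.append(value)
    print('Finished', p_num)
    return handled  # ignored when f runs as a Process target


if __name__ == '__main__':
    data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 23, 45, 76, 4567, 65423, 45, 4, 3, 21]
    new_tasks = Queue()
    print('Putting tasks in Queue')
    for i in data:
        new_tasks.put(i)
    processes = [Process(target=f, args=(new_tasks, i)) for i in range(2)]
    start = time.perf_counter()
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print('Time taken: {}'.format(time.perf_counter() - start))
```

One caveat from the `multiprocessing` documentation: after an object is put on an empty `multiprocessing.Queue`, there may be a short delay before `get_nowait()` can return it, because the data is sent by a background feeder thread. Here all items are queued before the workers start, so this is unlikely to matter, but if producers and consumers run at the same time a worker may see `queue.Empty` and exit early. The end-of-queue marker approach in the other answer does not have that problem.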
answered Aug 8, 2017 at 15:04

2 Comments

Nice answer, well explained. Any thoughts on the other half of my problem?
No. Looking at the code, I don't see any way that line could be printed more than once. Maybe try copying the code from the question to see if there is any difference from the code you are running.

Your worker processes should look like this:

def f(q, p_num):
    print('Starting process: {}'.format(p_num))
    while True:
        value = q.get()
        if value is None:
            break
        # mimic some process being done
        time.sleep(0.05)
        print(value, p_num)
    print('Finished', p_num)

And the queue should be filled with markers after the real data:

for i in data:
    new_tasks.put(i)
# one None marker per worker; num_of_threads is the number of worker processes started
for _ in range(num_of_threads):
    new_tasks.put(None)
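Putting the two fragments together, a runnable sketch might look like this. Here `num_workers` plays the role of `num_of_threads` and equals the number of worker processes. The parent-only code sits under `if __name__ == '__main__':`, and the `delay` parameter and returned list are additions for illustration and testing. Because the markers go in after the real data and each worker blocks in `q.get()` until it receives either data or a `None`, no worker can exit while unclaimed data remains, and none waits forever once the data runs out.

```python
from multiprocessing import Process, Queue
import time


def f(q, p_num, delay=0.05):
    print('Starting process: {}'.format(p_num))
    handled = []
    while True:
        value = q.get()        # blocks until data or a marker arrives
        if value is None:      # end-of-queue marker: this worker is done
            break
        time.sleep(delay)      # mimic some process being done
        print(value, p_num)
        handled.append(value)
    print('Finished', p_num)
    return handled             # ignored when f runs as a Process target


if __name__ == '__main__':
    data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 23, 45, 76, 4567, 65423, 45, 4, 3, 21]
    num_workers = 2
    new_tasks = Queue()
    for i in data:
        new_tasks.put(i)
    for _ in range(num_workers):
        new_tasks.put(None)    # one marker per worker, after the real data
    processes = [Process(target=f, args=(new_tasks, i)) for i in range(num_workers)]
    start = time.perf_counter()
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print('Time taken: {}'.format(time.perf_counter() - start))
```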
answered Aug 8, 2017 at 15:24

5 Comments

You have chosen to use if/break instead of try/except. Is this just a speed issue?
Also, what are the 'markers' my queue should be filled with? I cannot find a mention of them in the documentation on queues, multiprocessing or multithreading. (A link would be lovely; I don't expect you to write an essay in the comments.)
@quamrana surely the items need to be in the queue in order to be passed to the process properly without sharing or 'locking'?
I've used None as a marker. You need to pass as many of these in as you have threads. Even if one thread is exceptionally greedy and consumes all queued items, it will find the first None and quit. Then all the rest of the lazy threads, when they get round to calling get() will each find a None and quit as they get there. The None markers can be pushed into the Queue at any time after the real data has gone in, but before you want the threads to quit.
It could be that @ikkuh has a better answer in regard to finding the end of the queue.
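The argument for one `None` per worker can be checked without any processes. The sketch below is a hypothetical single-threaded simulation using `queue.Queue`, with a hypothetical helper `take_until_marker` standing in for one worker's loop: a "greedy" worker runs first and takes every real item, and the "lazy" workers that arrive later each find their own marker straight away. With fewer markers than workers, the last worker's `q.get()` would block forever, which is the original problem again.

```python
import queue


def take_until_marker(q):
    # One worker's loop: take items until the first None marker.
    taken = []
    while True:
        item = q.get()
        if item is None:
            return taken
        taken.append(item)


q = queue.Queue()
for item in [1, 2, 3]:
    q.put(item)
num_workers = 3
for _ in range(num_workers):
    q.put(None)                  # markers go in after the real data

greedy = take_until_marker(q)    # runs first and takes all the data
lazy = [take_until_marker(q) for _ in range(num_workers - 1)]
print(greedy, lazy, q.empty())   # [1, 2, 3] [[], []] True
```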
