
I am trying to use a worker Pool in Python with Process objects. Each worker (a Process) does some initialization (which takes a non-trivial amount of time), gets passed a series of jobs (ideally using map()), and returns something. No communication is necessary beyond that. However, I can't figure out how to get map() to call my worker's compute() method.

from multiprocessing import Pool, Process

class Worker(Process):
    def __init__(self):
        print 'Worker started'
        # do some initialization here
        super(Worker, self).__init__()

    def compute(self, data):
        print 'Computing things!'
        return data * data

if __name__ == '__main__':
    # This works fine
    worker = Worker()
    print worker.compute(3)

    # workers get initialized fine
    pool = Pool(processes=4,
                initializer=Worker)
    data = range(10)
    # How to use my worker pool?
    result = pool.map(compute, data)

Is a job queue the way to go instead, or can I use map()?

asked Jan 27, 2012 at 19:10
  • All process objects are stateful. You might want to remove that word from the title. Also, compute is a method of a Worker; in the examples it's usually a completely stand-alone function. Why not write the compute function to simply include both initialization and processing? Commented Jan 27, 2012 at 19:48
  • Fair enough, thanks. The initialization takes a long time, so I only want to do it once per worker process. Commented Jan 27, 2012 at 20:14
  • You must want to emphasize the "gets passed a series of jobs" part of the question, since that wasn't obvious. Commented Jan 27, 2012 at 20:19

3 Answers


I would suggest that you use a Queue for this.

from multiprocessing import Process, Queue

class Worker(Process):
    def __init__(self, queue):
        super(Worker, self).__init__()
        self.queue = queue

    def run(self):
        print('Worker started')
        # do some initialization here

        print('Computing things!')
        for data in iter(self.queue.get, None):
            # Use data: e.g. the question's computation
            print(data * data)

Now you can start a pile of these, all getting work from a single queue:

request_queue = Queue()
for i in range(4):
    Worker(request_queue).start()
for data in the_real_source:
    request_queue.put(data)
# Sentinel objects to allow clean shutdown: 1 per worker.
for i in range(4):
    request_queue.put(None)

That kind of thing should allow you to amortize the expensive startup cost across multiple workers.
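
Since the question also mentions that each job should return something, here is a minimal sketch of the same pattern extended with a second results queue (the data * data body is just a placeholder taken from the question; the queue names are my own):

from multiprocessing import Process, Queue

class Worker(Process):
    def __init__(self, jobs, results):
        super(Worker, self).__init__()
        self.jobs = jobs
        self.results = results

    def run(self):
        # expensive one-time initialization goes here, once per process
        for data in iter(self.jobs.get, None):
            self.results.put(data * data)  # placeholder computation

if __name__ == '__main__':
    jobs, results = Queue(), Queue()
    workers = [Worker(jobs, results) for _ in range(4)]
    for w in workers:
        w.start()
    for data in range(10):
        jobs.put(data)
    for _ in workers:
        jobs.put(None)  # one sentinel per worker
    print(sorted(results.get() for _ in range(10)))
    for w in workers:
        w.join()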

answered Jan 27, 2012 at 20:59

5 Comments

That's what I figured, thanks! I ended up using a job queue (input) and result queue (output) to synchronize everything.
Your example is awesome; I'm currently trying to work out how to put in the sentinel objects when Ctrl+C is pressed, without getting an exception.
@S.Lott: Isn't it that Queue isn't pickle-able, and that's why you use multiprocessing.Manager().Queue?
It is really much more customizable than the default multiprocessing.Pool()!!!
This way will create 4 separate processes. If you want a pool of workers to run in a single process, use Pool docs.python.org/2/library/…

initializer expects an arbitrary callable that does initialization (e.g., it can set some globals), not a Process subclass; map() accepts an arbitrary iterable:

#!/usr/bin/env python
import multiprocessing as mp

def init(val):
    print('do some initialization here')

def compute(data):
    print('Computing things!')
    return data * data

def produce_data():
    yield -100
    for i in range(10):
        yield i
    yield 100

if __name__ == "__main__":
    p = mp.Pool(initializer=init, initargs=('arg',))
    print(p.map(compute, produce_data()))
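
The point about setting globals is what lets the expensive setup run only once per worker process rather than once per job; a minimal sketch of that idea (the _model global and the sleep standing in for slow setup are illustrative assumptions):

import multiprocessing as mp
import time

_model = None  # per-process global, filled in by the initializer

def init():
    global _model
    time.sleep(1)  # stand-in for slow, one-time initialization
    _model = {'factor': 2}

def compute(data):
    # uses the state prepared once by init(), not rebuilt for every job
    return data * _model['factor']

if __name__ == '__main__':
    with mp.Pool(processes=4, initializer=init) as p:
        print(p.map(compute, range(10)))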
answered Jan 27, 2012 at 21:41



Since Python 3.3 you can use starmap, which also handles multiple arguments and returns the results with a very simple syntax:

import multiprocessing

nb_cores = multiprocessing.cpu_count()

def caps(nb, letter):
    print('Exec nb:', nb)
    return letter.upper()

if __name__ == '__main__':
    # needed on Windows; must be called inside the if __name__ == '__main__' block
    multiprocessing.freeze_support()
    input_data = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']
    input_order = [1, 2, 3, 4, 5, 6, 7, 8, 9]
    # the context manager closes the pool's workers automatically
    with multiprocessing.Pool(processes=nb_cores) as pool:
        results = pool.starmap(caps, zip(input_order, input_data))
        print(results)
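
Each tuple produced by zip() is unpacked into caps() as positional arguments, so (apart from running in parallel) the starmap call above computes the same thing as this sketch:

# zip() stops at the shorter sequence, so only 8 pairs are produced
results = [caps(1, 'a'), caps(2, 'b'), caps(3, 'c'), caps(4, 'd'),
           caps(5, 'e'), caps(6, 'f'), caps(7, 'g'), caps(8, 'h')]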
answered Aug 16, 2019 at 8:39

