I have a job where I get a lot of separate tasks through. For each task I need to download some data, process it and then upload it again.
I'm using a multiprocessing pool for the processing.
I have a couple of issues I'm unsure about, though.
Firstly, the data can be up to roughly 20MB. I ideally want to get it to the child worker process without physically moving it in memory, and to get the resulting data back to the parent process without moving it either. As I'm not sure how some tools work under the hood, I don't know whether I can just pass the data as an argument to the pool's apply_async (from my understanding it serialises the objects, which are then recreated once they reach the subprocess?), or whether I should use a multiprocessing Queue, or mmap maybe, or something else?
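For reference, a stripped-down version of what I'm doing at the moment looks roughly like this (the worker function and the payload are just stand-ins for my real download/process/upload code):

from multiprocessing import Pool

def process(data):
    # stand-in for the real processing; runs in a child worker process
    return data[::-1]

def handle_result(result):
    # runs back in the parent; this is where the upload happens
    pass

if __name__ == '__main__':
    pool = Pool()
    for task in range(10):                # stand-in for the incoming tasks
        data = "x" * (20 * 1024 * 1024)   # stand-in for the ~20MB downloaded data
        # as far as I can tell, data is pickled here and unpickled in the child
        pool.apply_async(process, (data,), callback=handle_result)
    pool.close()
    pool.join()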
I looked at ctypes objects, but from what I understand only objects that are already defined when the pool is created (when the process forks) can be shared? That's no good for me, as I'll continuously have new data coming in which I need to share.
One thing I shouldn't need to worry about is concurrent access to the data, so I shouldn't need any kind of locking. This is because the processing will only start after the data has been downloaded, and the upload will only start after the output data has been generated.
Another issue I'm having is that the tasks coming in sometimes spike, and as a result I'm downloading data for the tasks quicker than the child processes can process it. Because I'm downloading data faster than I can finish the tasks and dispose of the data, Python dies from running out of memory. What would be a good way to hold the tasks up at the downloading stage when memory is almost full / too much data is in the job pipeline? I was thinking of some type of "ref" count based on the number of data bytes, so I can limit the amount of data between download and upload and only download when the count is below some threshold. Although I'd be worried a child might sometimes fail, and I'd never get to take the data it had off of the count. Is there a good way to achieve this kind of thing?
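To make the "ref count" idea a bit more concrete, something along these lines is roughly what I had in mind (just a sketch: the download/process/upload parts are replaced with stand-ins, and the 200MB limit is an arbitrary number):

import threading
from multiprocessing import Pool

class ByteBudget(object):
    # Blocks the download loop while too much data sits between download and upload.
    def __init__(self, limit):
        self.limit = limit
        self.in_flight = 0
        self.cond = threading.Condition()

    def acquire(self, nbytes):
        with self.cond:
            while self.in_flight + nbytes > self.limit:
                self.cond.wait()
            self.in_flight += nbytes

    def release(self, nbytes):
        with self.cond:
            self.in_flight -= nbytes
            self.cond.notify_all()

def process(data):
    # stand-in for the real processing done in the worker
    return data.upper()

if __name__ == '__main__':
    budget = ByteBudget(200 * 1024 * 1024)    # only allow ~200MB between download and upload
    pool = Pool()

    def task_done(result, nbytes):
        # runs in the parent once a worker finishes; the upload would go here,
        # and then the task's bytes are handed back to the budget.
        # NB: if the worker raises, this callback never fires and the budget
        # leaks -- which is exactly the failure case I'm worried about.
        budget.release(nbytes)

    for task in range(10):                    # stand-in for the incoming task stream
        data = "x" * (20 * 1024 * 1024)       # stand-in for the downloaded payload
        budget.acquire(len(data))             # blocks if too much data is already in flight
        pool.apply_async(process, (data,),
                         callback=lambda result, n=len(data): task_done(result, n))

    pool.close()
    pool.join()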
- If your network can produce data faster than your pool of processes can process it, then you shouldn't worry about moving data between processes: RAM is typically faster than the network, so your bottleneck is not moving data between processes but how fast they can process it. – jfs, Nov 15, 2012 at 14:25
- @Sebastian The speed of the network is irrelevant here, because the process can't start until all the data is in memory. If it were streaming from the network to the process then you'd be right. So the download happens, then the data has to get to the process (either by passing a reference, or by moving it physically to a new location in memory), and only then can the processing begin. So that time will add to the overall time. – GP89, Nov 15, 2012 at 16:22
- Read: "I'm downloading data for the tasks quicker than the child processes can process it." and then reread my previous comment. – jfs, Nov 15, 2012 at 18:36
- @Sebastian Ok sorry, I think I get you now. For some reason I was thinking the first thing the process needed to do was make a copy of the data, but actually that will probably happen when the object is shared. Either way, the process will have to make the copy, either to get the initial data or to put back the resulting data, so it definitely will add to the overall time. – GP89, Nov 15, 2012 at 19:04
3 Answers
(This is an outcome of the discussion of my previous answer)
Have you tried POSH?
This example shows that one can append elements to a mutable list, which is probably what you want (copied from the documentation):
import posh

l = posh.share(range(3))

if posh.fork():
    # parent process
    l.append(3)
    posh.waitall()
else:
    # child process
    l.append(4)
    posh.exit(0)

print l

-- Output --
[0, 1, 2, 3, 4]
-- OR --
[0, 1, 2, 4, 3]
2 Comments
- Segmentation fault.

Here is the canonical example from the multiprocessing documentation:
from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print num.value
    print arr[:]
Note that num and arr are shared objects. Is this what you are looking for?
12 Comments
- Value and Array objects that are defined when the process forks are shared. As I'm using a process pool that gets created when the program starts, before I have any data, I'm pretty sure I can't use these.

I cobbled this together, since I need to figure this out for myself anyway. I'm by no means very accomplished when it comes to multiprocessing or threading, but at least it works. Maybe it can be done in a smarter way; I couldn't figure out how to use the lock that comes with the non-raw Array type. Maybe someone will suggest improvements in the comments.
from multiprocessing import Process, Event
from multiprocessing.sharedctypes import RawArray

def modify(s, task_event, result_event):
    # worker: wait for a task, upper-case the shared buffer, signal the result
    for i in range(4):
        print "Worker: waiting for task"
        task_event.wait()
        task_event.clear()
        print "Worker: got task"
        s.value = s.value.upper()
        result_event.set()

if __name__ == '__main__':
    data_list = ("Data", "More data", "oh look, data!", "Captain Pickard")
    task_event = Event()
    result_event = Event()
    # shared buffer, sized to hold the longest string in data_list
    s = RawArray('c', "X" * max(map(len, data_list)))
    p = Process(target=modify, args=(s, task_event, result_event))
    p.start()

    for data in data_list:
        s.value = data
        task_event.set()
        print "Sent new task. Waiting for results."
        result_event.wait()
        result_event.clear()
        print "Got result: {0}".format(s.value)

    p.join()
In this example, data_list is defined beforehand, but it need not be. The only information I needed from that list was the length of the longest string. As long as you have some practical upper bound for the length, it's no problem (see the sketch at the end of this answer).
Here's the output of the program:
Sent new task. Waiting for results.
Worker: waiting for task
Worker: got task
Worker: waiting for task
Got result: DATA
Sent new task. Waiting for results.
Worker: got task
Worker: waiting for task
Got result: MORE DATA
Sent new task. Waiting for results.
Worker: got task
Worker: waiting for task
Got result: OH LOOK, DATA!
Sent new task. Waiting for results.
Worker: got task
Got result: CAPTAIN PICKARD
As you can see, btel did in fact provide the solution, but the problem lay in keeping the two processes in lockstep with each other, so that the worker only starts working on a new task when the task is ready, and so that the main process doesn't read the result before it's complete.
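If the data only shows up after the process has been started (as in the question), the same trick should still work as long as you pick some generous upper bound up front: allocate the shared buffer at that size once, and keep track of how much of it each payload actually uses. A rough sketch of that idea (MAX_LEN is an assumed limit, not something taken from the code above; buf and used would be passed to the worker via args, just like s is):

from multiprocessing import Value
from multiprocessing.sharedctypes import RawArray

MAX_LEN = 32 * 1024 * 1024          # assumed upper bound for a single payload

buf = RawArray('c', MAX_LEN)        # shared, lock-free character buffer
used = Value('i', 0)                # how many bytes of buf are currently valid

def put(data):
    # copy a freshly downloaded payload into the shared buffer (parent side)
    buf[:len(data)] = data
    used.value = len(data)

def get():
    # read the current payload back out (either side)
    return buf[:used.value]

The obvious cost is that every buffer always reserves MAX_LEN bytes, even when the actual payload is much smaller.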