I am building a tool that interacts with a batched stream of incoming data. This data needs to be processed and the result returned. To split up the work I have created a class that has inbound (_in) and outbound (out) queues and workers that get, process, and deposit the work.
This example takes an iterable of numbers (in pass_data) and multiplies them by f.
import queue, random, time
from multiprocessing import Process, Queue


def _worker(_in, out, f):
    """Get work from _in and output processed data to out"""
    while True:
        try:
            work = _in.get()
        except queue.Empty:
            continue
        # simulate blocking for some time
        time.sleep(random.uniform(0.01, 0.5))
        out.put(work * f)


class C:
    def __init__(self, f, threads=2):
        self.f = f
        self.threads = threads
        self._in, self.out = Queue(), Queue()
        self.args = (self._in, self.out, self.f)
        self.workers = [
            Process(target=_worker, args=self.args) for _ in range(self.threads)
        ]

    def __repr__(self):
        return f"{self.__class__.__name__}(threads={self.threads})"

    def start(self):
        """Start all workers"""
        for worker in self.workers:
            worker.start()

    def terminate(self):
        """Terminate all workers"""
        for worker in self.workers:
            worker.terminate()

    def pass_data(self, data):
        """Pass data to the queue to be processed"""
        for rec in data:
            self._in.put(rec)

    def get_completed(self):
        """Return a list of processed data"""
        items = []
        while True:
            try:
                items.append(self.out.get_nowait())
            except queue.Empty:
                break
        return items


if __name__ == "__main__":
    c = C(f=12, threads=2)
    c.start()
    for i in range(5):
        s = 0
        n = random.randint(1, 20)
        c.pass_data(list(range(n)))
        print(f"sent: {n}")
        while s < n:
            r = c.get_completed()
            s += len(r)
            if r:
                print(len(r), end=", ")
            time.sleep(random.uniform(0.01, 0.4))
        print()
    c.terminate()
This is, at the moment, a proof of concept. Are there any pitfalls to this method? Is there already a better way to do this?
Aspects that I intend to address:
- queue size limits
- thread number limits
2 Answers
Here are some observations and things to consider.
Are you sure you need multiprocessing or threads? There isn't any information in the question to say why they may be needed. There is overhead for using them. Perhaps an input-compute-output loop is sufficient.
Do you anticipate the program's throughput to be limited by IO or by CPU processing? The general rule of thumb is to use threads or asyncio for the former and processes for the latter.
Does it matter that results may not be returned in the same order they were submitted? Do they need to be time-stamped?
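If ordering does matter, one sketch (the helper names here are mine, not part of the question's code) is to tag each item with a sequence number on the way in and sort on the way out:

```python
def tag_items(data, start=0):
    """Pair each item with a sequence number before queueing it."""
    return list(enumerate(data, start))


def reorder(tagged_results):
    """Restore submission order from (seq, result) pairs.

    Workers may finish out of order; sorting by the sequence
    number recovers the original submission order.
    """
    return [result for _, result in sorted(tagged_results)]


# Example: results arrive out of order from the workers
arrived = [(2, 24), (0, 0), (1, 12)]
print(reorder(arrived))  # [0, 12, 24]
```

The worker would then multiply only the second element of each pair and pass the tag through unchanged.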
threads is a confusing parameter name when using processes.
The current main code puts items in the input queue and gets items from the output queue. If the queues have limited sizes it will be possible to deadlock if the main code is blocked on adding to a full input queue and the workers are blocked from adding to a full output queue.
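One way to sidestep that deadlock, as a sketch (the feed helper and timeout value are illustrative, not from the original code, and it assumes workers eventually drain the input queue): never block indefinitely on a full input queue, and collect finished results while waiting for room.

```python
import queue


def feed(in_q, out_q, data, results):
    """Put items on in_q without blocking forever when it is full.

    While waiting for room on the input queue, drain out_q into
    results so the workers never stall on a full output queue.
    """
    pending = list(data)
    while pending:
        try:
            in_q.put(pending[0], timeout=0.1)
            pending.pop(0)
        except queue.Full:
            # Input is full: make room by collecting finished results
            while True:
                try:
                    results.append(out_q.get_nowait())
                except queue.Empty:
                    break
```

The put/get interface is the same for queue.Queue and multiprocessing.Queue (both raise queue.Full and queue.Empty), so the helper works unchanged with worker processes.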
Use multiprocessing.Pool
The multiprocessing library already has a worker pool implementation ready to be used. Your code could be rewritten as:
import random
import time
from multiprocessing import Pool


def f(x):
    time.sleep(random.uniform(0.01, 0.5))
    return x * 12


if __name__ == "__main__":
    c = Pool(2)
    for i in range(5):
        n = random.randint(1, 20)
        r = c.map_async(f, list(range(n)))
        print(f"sent: {n}")
        print(f"got: {len(r.get())}")
While multiprocessing.Pool allows you to check if the results are ready by using .ready() on the result of an apply_async() or map_async() call, you can't get a partial result from map_async(). However, if you do want to process individual results as soon as they are ready, you can consider calling apply_async() with a callback function that handles the result.
Use in_ instead of _in. I understand that simply in would conflict with the keyword. The trailing underscore is better because a leading _ is, in Python, conventionally reserved for internal-use names, and judging by out, which has no leading _, you don't want to hide in.
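A minimal sketch of that renaming (only the attribute line matters; the rest is scaffolding):

```python
from multiprocessing import Queue


class C:
    def __init__(self):
        # A trailing underscore sidesteps the `in` keyword without
        # the "internal use" signal a leading underscore carries.
        self.in_, self.out = Queue(), Queue()


c = C()
c.in_.put(1)
print(c.in_.get())  # 1
```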