I want to execute a number of tasks that may enqueue other tasks, and do all that in parallel, ending only once all tasks are finished.
For example, some sort of crawler that starts with node A, discovers B, C, and D, then visits each of those and discovers other adjacent nodes.
If one task has an error, that error should be raised on the main thread.
My primary concerns are correctness and simplicity.
AFAIK, this is the simplest way to do that, even compared with the newer standard libraries like concurrent.futures.
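For comparison, here is roughly what I'd expect the concurrent.futures version to look like (a sketch only; run_all and _collect are illustrative names of my own, not standard-library APIs):

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def _collect(handler, item):
    # Force generator handlers inside the worker thread so their
    # exceptions surface through the future, then return the new items.
    result = handler(item)
    return list(result) if result is not None else []


def run_all(parallelism, handler, items):
    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        pending = {pool.submit(_collect, handler, item) for item in items}
        while pending:
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                # result() re-raises any worker exception here,
                # on the calling thread.
                for new_item in future.result():
                    pending.add(pool.submit(_collect, handler, new_item))
```

The bookkeeping of the pending set is what my queue-based version avoids, which is why I find it simpler.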
import queue
import threading


class WorkerRunner:
    """
    Runner

    :param int parallelism: Number of workers to run in parallel
    :param handler: Handler that receives work and optionally yields other work
    """

    def __init__(self, parallelism: int, handler):
        self._handler = handler
        self._parallelism = parallelism

    def run(self, items):
        queue_ = queue.Queue()
        threads = []
        for _ in range(self._parallelism):
            thread = WorkerThread(queue_, self._handler)
            thread.start()
            threads.append(thread)
        for item in items:
            queue_.put(WorkItem(item))
        queue_.join()
        for _ in range(self._parallelism):
            queue_.put(END_ITEM)
        for thread in threads:
            thread.result()


"""
End item
"""
END_ITEM = None


class WorkItem:
    def __init__(self, value):
        self.value = value


class WorkerThread(threading.Thread):
    def __init__(self, queue: queue.Queue, handler):
        super().__init__()
        self._queue = queue
        self._exception = None
        self._handler = handler

    def run(self):
        try:
            self._work()
        except Exception as e:
            self._exception = e
            while True:
                try:
                    self._queue.get(False)
                except queue.Empty:
                    break
                else:
                    self._queue.task_done()
    def result(self):
        self.join()
        if self._exception is not None:
            raise self._exception

    def _work(self):
        while True:
            item = self._queue.get()
            try:
                if item is END_ITEM:
                    break
                result = self._handler(item.value)
                if result is not None:
                    for item in result:
                        self._queue.put(WorkItem(item))
            finally:
                self._queue.task_done()
Example
def work(x: int):
    print(x)
    while 0 < x:
        x -= 1
        yield x


runner = WorkerRunner(2, work)
runner.run([3])
# 3
# 2
# 1
# 0
# 0
# 1
# 0
# 0
1 Answer
Perhaps I got off track somehow, but I found run() and _work() difficult to understand. I was puzzled by the decision to use two methods rather than one, I did not immediately grasp why two layers of exception handling were needed, and the intent behind the while True loop in run() wasn't immediately obvious.
Here are a few possible edits intended to simplify and clarify. Unless I overlooked a detail, the edits:

- do little more than move your lines of code around, putting everything under run() other than the completely separable stuff moved to _drain_queue();
- rename result to work_items for clarity; and
- add a few short comments to provide some simple narrative guidance to any future code readers.
    def run(self):
        while True:
            # Get next work item, stopping on the sentinel.
            item = self._queue.get()
            if item is END_ITEM:
                break
            # Run the handler, enqueuing any work items it yields.
            try:
                work_items = self._handler(item.value)
                if work_items:
                    for item in work_items:
                        self._queue.put(WorkItem(item))
            # On error, store exception and drain queue.
            except Exception as e:
                self._exception = e
                self._drain_queue()
            # Either way, mark current task done.
            finally:
                self._queue.task_done()

    def _drain_queue(self):
        while True:
            try:
                self._queue.get(False)
            except queue.Empty:
                break
            else:
                self._queue.task_done()
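If it helps, here is the class reassembled with those edits, plus a quick check of the question's requirement that a task's error is raised on the main thread (the failing handler is my own, purely for the demonstration):

```python
import queue
import threading

END_ITEM = None


class WorkItem:
    def __init__(self, value):
        self.value = value


class WorkerThread(threading.Thread):
    def __init__(self, queue_, handler):
        super().__init__()
        self._queue = queue_
        self._exception = None
        self._handler = handler

    def run(self):
        while True:
            # Get next work item, stopping on the sentinel.
            item = self._queue.get()
            if item is END_ITEM:
                break
            # Run the handler, enqueuing any work items it yields.
            try:
                work_items = self._handler(item.value)
                if work_items:
                    for item in work_items:
                        self._queue.put(WorkItem(item))
            # On error, store exception and drain queue.
            except Exception as e:
                self._exception = e
                self._drain_queue()
            # Either way, mark current task done.
            finally:
                self._queue.task_done()

    def _drain_queue(self):
        while True:
            try:
                self._queue.get(False)
            except queue.Empty:
                break
            else:
                self._queue.task_done()

    def result(self):
        self.join()
        if self._exception is not None:
            raise self._exception


class WorkerRunner:
    def __init__(self, parallelism, handler):
        self._handler = handler
        self._parallelism = parallelism

    def run(self, items):
        queue_ = queue.Queue()
        threads = [WorkerThread(queue_, self._handler)
                   for _ in range(self._parallelism)]
        for thread in threads:
            thread.start()
        for item in items:
            queue_.put(WorkItem(item))
        queue_.join()
        for _ in range(self._parallelism):
            queue_.put(END_ITEM)
        for thread in threads:
            thread.result()


def failing(x):
    raise ValueError(f"boom on {x}")


try:
    WorkerRunner(2, failing).run([1, 2, 3])
except ValueError as e:
    print("raised on main thread:", e)
```

Because _drain_queue() runs before the failing item's task_done(), the main thread's queue_.join() cannot return until the queue has been emptied, so the sentinels are still delivered and every worker exits cleanly before result() re-raises.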