
I want to execute a number of tasks that may enqueue other tasks, and do all that in parallel, ending only once all tasks are finished.

For example, some sort of crawler that starts with node A, discovers B, C, and D, then visits each of those and discovers other adjacent nodes.
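For the crawler case, a handler following the contract that WorkerRunner uses further down (take one value, optionally yield more) might look like the sketch below. `make_crawl_handler`, the `graph` mapping and the node names are hypothetical stand-ins for real discovery logic. The sketch is mainly meant to show the shared `seen` set. Crawled graphs usually contain cycles (A -> B -> A). Without deduplication, work would keep being enqueued and `queue_.join()` would never return.

```python
import threading


def make_crawl_handler(graph, start):
    """Return a handler that walks `graph` starting from `start`.

    `graph` is a hypothetical mapping of node -> neighbours, standing in for
    whatever a real crawler would discover. Each node is yielded at most once.
    """
    seen = {start}
    lock = threading.Lock()  # several worker threads may call crawl() at once

    def crawl(node):
        # "Visit" the node here (fetch a page, read a file, ...), then yield
        # only the neighbours that no other call has claimed yet.
        for neighbour in graph.get(node, ()):
            with lock:
                if neighbour in seen:
                    continue
                seen.add(neighbour)
            yield neighbour

    return crawl
```

With the runner from the question, this would be used as `WorkerRunner(4, make_crawl_handler(graph, "A")).run(["A"])`.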

If one task has an error, that error should be raised on the main thread.

My primary concerns are correctness and simplicity.

AFAIK, this is the simplest way to do that, even with newer standard-library modules like concurrent.futures.
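For comparison, here is a rough sketch of the same idea using concurrent.futures. It assumes the same handler contract (take a value, return None or an iterable of new values), and `run_all` is a hypothetical name. `Future.result()` re-raises a worker's exception on the calling thread. When that happens, leaving the `with` block still waits for tasks that were already submitted before the exception propagates.

```python
import concurrent.futures


def run_all(handler, items, max_workers=4):
    """Apply `handler` to `items` and to everything it yields, in parallel.

    Returns once no work is left. The first exception raised by a task is
    re-raised here, on the calling thread.
    """

    def call(value):
        # Consume the handler's output inside the worker thread, so generator
        # handlers do their work in parallel rather than in the caller.
        result = handler(value)
        return [] if result is None else list(result)

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = {pool.submit(call, item) for item in items}
        while pending:
            done, pending = concurrent.futures.wait(
                pending, return_when=concurrent.futures.FIRST_COMPLETED
            )
            for future in done:
                # result() re-raises any exception from the worker thread.
                for new_item in future.result():
                    pending.add(pool.submit(call, new_item))
```

Termination is tracked by the `pending` set rather than by `Queue.join()`, and no sentinel values are needed because the executor shuts its threads down when the `with` block exits.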

    import queue
    import threading


    class WorkerRunner:
        """
        Run a handler over work items in parallel until no work remains.

        :param int parallelism: Number of workers to run in parallel
        :param handler: Handler that receives work and optionally yields other work
        """

        def __init__(self, parallelism: int, handler):
            self._handler = handler
            self._parallelism = parallelism

        def run(self, items):
            queue_ = queue.Queue()
            threads = []
            for _ in range(self._parallelism):
                thread = WorkerThread(queue_, self._handler)
                thread.start()
                threads.append(thread)
            for item in items:
                queue_.put(WorkItem(item))
            queue_.join()
            for _ in range(self._parallelism):
                queue_.put(END_ITEM)
            for thread in threads:
                thread.result()


    # End item: sentinel that tells a WorkerThread to stop.
    END_ITEM = None


    class WorkItem:
        def __init__(self, value):
            self.value = value


    class WorkerThread(threading.Thread):
        def __init__(self, queue: queue.Queue, handler):
            super().__init__()
            self._queue = queue
            self._exception = None
            self._handler = handler

        def run(self):
            try:
                self._work()
            except Exception as e:
                self._exception = e
                while True:
                    try:
                        self._queue.get(False)
                    except queue.Empty:
                        break
                    else:
                        self._queue.task_done()

        def result(self):
            self.join()
            if self._exception is not None:
                raise self._exception

        def _work(self):
            while True:
                item = self._queue.get()
                try:
                    if item is END_ITEM:
                        break
                    result = self._handler(item.value)
                    if result is not None:
                        for item in result:
                            self._queue.put(WorkItem(item))
                finally:
                    self._queue.task_done()

Example

    def work(x: int):
        print(x)
        while 0 < x:
            x -= 1
            yield x


    runner = WorkerRunner(2, work)
    runner.run([3])
# 3
# 2
# 1
# 0
# 0
# 1
# 0
# 0
asked Mar 8, 2021 at 17:02

1 Answer


Perhaps I got off track somehow, but I found run() and _work() difficult to understand. I was puzzled by the decision to use two methods rather than one, I did not immediately grasp why two layers of exception handling were needed, and the intent behind the while True loop in run() wasn't immediately obvious.

Here are a few possible edits intended to simplify and clarify. Unless I overlooked a detail, the edits:

  1. do little more than move your lines of code around, putting everything in run() except the completely separable queue-draining logic, which moves to _drain_queue();
  2. rename result to work_items for clarity; and
  3. add a few short comments to provide some simple narrative guidance to any future code readers.
    def run(self):
        while True:
            # Get next work item, stopping on the sentinel.
            item = self._queue.get()
            if item is END_ITEM:
                break

            # Run the handler, enqueuing any work items it yields.
            try:
                work_items = self._handler(item.value)
                if work_items:
                    for item in work_items:
                        self._queue.put(WorkItem(item))

            # On error, store exception and drain queue.
            except Exception as e:
                self._exception = e
                self._drain_queue()

            # Either way, mark current task done.
            finally:
                self._queue.task_done()

    def _drain_queue(self):
        while True:
            try:
                self._queue.get(False)
            except queue.Empty:
                break
            else:
                self._queue.task_done()
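The reason `_drain_queue()` calls `task_done()` for every item it throws away is easy to miss. `Queue.join()` blocks until every `put()` has been matched by a `task_done()`, so discarded items still have to be marked done, or the main thread's `queue_.join()` would hang. A standalone sketch (the function name `drain_queue` and the demo values are illustrative only; `get_nowait()` is equivalent to `get(False)`):

```python
import queue


def drain_queue(q):
    """Discard everything currently in `q`, marking each item as done.

    Calling task_done() for every discarded item keeps the queue's count of
    unfinished tasks in step, so a q.join() elsewhere can return.
    """
    while True:
        try:
            q.get_nowait()
        except queue.Empty:
            break
        else:
            q.task_done()


q = queue.Queue()
for n in range(5):
    q.put(n)
drain_queue(q)
q.join()  # returns immediately; without the task_done() calls it would block
```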
Peilonrayz
answered Mar 9, 2021 at 1:20