[Python-checkins] bpo-35493: Use Process.sentinel instead of sleeping for polling worker status in multiprocessing.Pool (#11488)

Pablo Galindo webhook-mailer at python.org
Sat Mar 16 18:34:27 EDT 2019


https://github.com/python/cpython/commit/7c994549dcffd0d9d3bb37475e6374f356e7240e
commit: 7c994549dcffd0d9d3bb37475e6374f356e7240e
branch: master
author: Pablo Galindo <Pablogsal at gmail.com>
committer: GitHub <noreply at github.com>
date: 2019-03-16T22:34:24Z
summary:
bpo-35493: Use Process.sentinel instead of sleeping for polling worker status in multiprocessing.Pool (#11488)
* bpo-35493: Use Process.sentinel instead of sleeping for polling worker status in multiprocessing.Pool
* Use self-pipe pattern to avoid polling for changes
* Refactor some variable names and add comments
* Restore timeout and poll
* Use reader object only on wait()
* Recompute worker sentinels every time
* Remove timeout and use change notifier
* Refactor some methods to be overloaded by the ThreadPool, document the cache class and fix typos
files:
A Misc/NEWS.d/next/Library/2019-01-09-23-43-08.bpo-35493.kEcRGE.rst
M Lib/multiprocessing/pool.py
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index 18a56f8524b4..665ca067fa07 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -21,11 +21,13 @@
 import time
 import traceback
 import warnings
+from queue import Empty
 
 # If threading is available then ThreadPool should be provided. Therefore
 # we avoid top-level imports which are liable to fail on some systems.
 from . import util
 from . import get_context, TimeoutError
+from .connection import wait
 
 #
 # Constants representing the state of a pool
@@ -145,6 +147,29 @@ def _helper_reraises_exception(ex):
 # Class representing a process pool
 #
 
+class _PoolCache(dict):
+ """
+ Class that implements a cache for the Pool class that will notify
+ the pool management threads every time the cache is emptied. The
+ notification is done by the use of a queue that is provided when
+ instantiating the cache.
+ """
+ def __init__(self, *args, notifier=None, **kwds):
+ self.notifier = notifier
+ super().__init__(*args, **kwds)
+
+ def __delitem__(self, item):
+ super().__delitem__(item)
+
+ # Notify that the cache is empty. This is important because the
+ # pool keeps maintaining workers until the cache gets drained. This
+ # eliminates a race condition in which a task is finished after the
+ # pool's _handle_workers method has entered another iteration of the
+ # loop. In this situation, the only event that can wake up the pool
+ # is the cache being emptied (no more tasks available).
+ if not self:
+ self.notifier.put(None)
+
 class Pool(object):
 '''
 Class which supports an async version of applying functions to arguments.
@@ -165,7 +190,11 @@ def __init__(self, processes=None, initializer=None, initargs=(),
 self._ctx = context or get_context()
 self._setup_queues()
 self._taskqueue = queue.SimpleQueue()
- self._cache = {}
+ # The _change_notifier queue exists to wake up self._handle_workers()
+ # when the cache (self._cache) is empty or when there is a change in
+ # the _state variable of the thread that runs _handle_workers.
+ self._change_notifier = self._ctx.SimpleQueue()
+ self._cache = _PoolCache(notifier=self._change_notifier)
 self._maxtasksperchild = maxtasksperchild
 self._initializer = initializer
 self._initargs = initargs
@@ -189,12 +218,14 @@ def __init__(self, processes=None, initializer=None, initargs=(),
 p.join()
 raise
 
+ sentinels = self._get_sentinels()
+
 self._worker_handler = threading.Thread(
 target=Pool._handle_workers,
 args=(self._cache, self._taskqueue, self._ctx, self.Process,
 self._processes, self._pool, self._inqueue, self._outqueue,
 self._initializer, self._initargs, self._maxtasksperchild,
- self._wrap_exception)
+ self._wrap_exception, sentinels, self._change_notifier)
 )
 self._worker_handler.daemon = True
 self._worker_handler._state = RUN
@@ -221,7 +252,7 @@ def __init__(self, processes=None, initializer=None, initargs=(),
 self._terminate = util.Finalize(
 self, self._terminate_pool,
 args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
- self._worker_handler, self._task_handler,
+ self._change_notifier, self._worker_handler, self._task_handler,
 self._result_handler, self._cache),
 exitpriority=15
 )
@@ -233,6 +264,8 @@ def __del__(self, _warn=warnings.warn, RUN=RUN):
 if self._state == RUN:
 _warn(f"unclosed running multiprocessing pool {self!r}",
 ResourceWarning, source=self)
+ if getattr(self, '_change_notifier', None) is not None:
+ self._change_notifier.put(None)
 
 def __repr__(self):
 cls = self.__class__
@@ -240,6 +273,16 @@ def __repr__(self):
 f'state={self._state} '
 f'pool_size={len(self._pool)}>')
 
+ def _get_sentinels(self):
+ task_queue_sentinels = [self._outqueue._reader]
+ self_notifier_sentinels = [self._change_notifier._reader]
+ return [*task_queue_sentinels, *self_notifier_sentinels]
+
+ @staticmethod
+ def _get_worker_sentinels(workers):
+ return [worker.sentinel for worker in
+ workers if hasattr(worker, "sentinel")]
+
 @staticmethod
 def _join_exited_workers(pool):
 """Cleanup after any worker processes which have exited due to reaching
@@ -452,18 +495,28 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
 return result
 
 @staticmethod
- def _handle_workers(cache, taskqueue, ctx, Process, processes, pool,
- inqueue, outqueue, initializer, initargs,
- maxtasksperchild, wrap_exception):
+ def _wait_for_updates(sentinels, change_notifier, timeout=None):
+ wait(sentinels, timeout=timeout)
+ while not change_notifier.empty():
+ change_notifier.get()
+
+ @classmethod
+ def _handle_workers(cls, cache, taskqueue, ctx, Process, processes,
+ pool, inqueue, outqueue, initializer, initargs,
+ maxtasksperchild, wrap_exception, sentinels,
+ change_notifier):
 thread = threading.current_thread()
 
 # Keep maintaining workers until the cache gets drained, unless the pool
 # is terminated.
 while thread._state == RUN or (cache and thread._state != TERMINATE):
- Pool._maintain_pool(ctx, Process, processes, pool, inqueue,
- outqueue, initializer, initargs,
- maxtasksperchild, wrap_exception)
- time.sleep(0.1)
+ cls._maintain_pool(ctx, Process, processes, pool, inqueue,
+ outqueue, initializer, initargs,
+ maxtasksperchild, wrap_exception)
+
+ current_sentinels = [*cls._get_worker_sentinels(pool), *sentinels]
+
+ cls._wait_for_updates(current_sentinels, change_notifier)
 # send sentinel to stop workers
 taskqueue.put(None)
 util.debug('worker handler exiting')
@@ -593,11 +646,13 @@ def close(self):
 if self._state == RUN:
 self._state = CLOSE
 self._worker_handler._state = CLOSE
+ self._change_notifier.put(None)
 
 def terminate(self):
 util.debug('terminating pool')
 self._state = TERMINATE
 self._worker_handler._state = TERMINATE
+ self._change_notifier.put(None)
 self._terminate()
 
 def join(self):
@@ -622,7 +677,7 @@ def _help_stuff_finish(inqueue, task_handler, size):
 time.sleep(0)
 
 @classmethod
- def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
+ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier,
 worker_handler, task_handler, result_handler, cache):
 # this is guaranteed to only be called once
 util.debug('finalizing pool')
@@ -638,6 +693,7 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
 "Cannot have cache with result_hander not alive")
 
 result_handler._state = TERMINATE
+ change_notifier.put(None)
 outqueue.put(None) # sentinel
 
 # We must wait for the worker handler to exit before terminating
@@ -871,6 +927,13 @@ def _setup_queues(self):
 self._quick_put = self._inqueue.put
 self._quick_get = self._outqueue.get
 
+ def _get_sentinels(self):
+ return [self._change_notifier._reader]
+
+ @staticmethod
+ def _get_worker_sentinels(workers):
+ return []
+
 @staticmethod
 def _help_stuff_finish(inqueue, task_handler, size):
 # drain inqueue, and put sentinels at its head to make workers finish
@@ -881,3 +944,6 @@ def _help_stuff_finish(inqueue, task_handler, size):
 pass
 for i in range(size):
 inqueue.put(None)
+
+ def _wait_for_updates(self, sentinels, change_notifier, timeout):
+ time.sleep(timeout)
diff --git a/Misc/NEWS.d/next/Library/2019-01-09-23-43-08.bpo-35493.kEcRGE.rst b/Misc/NEWS.d/next/Library/2019-01-09-23-43-08.bpo-35493.kEcRGE.rst
new file mode 100644
index 000000000000..fa408c8163b7
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2019-01-09-23-43-08.bpo-35493.kEcRGE.rst
@@ -0,0 +1,3 @@
+Use :func:`multiprocessing.connection.wait` instead of polling every 0.1
+seconds for worker updates in :class:`multiprocessing.Pool`. Patch by Pablo
+Galindo.


More information about the Python-checkins mailing list

Page converted by AltStyle (-> original) /