[Python-checkins] cpython: Issue #12456: fix a possible hang on shutdown of a

antoine.pitrou python-checkins at python.org
Sat Jul 2 21:21:14 CEST 2011


http://hg.python.org/cpython/rev/51c1f2cedb96
changeset: 71137:51c1f2cedb96
user: Antoine Pitrou <solipsis at pitrou.net>
date: Sat Jul 02 21:20:25 2011 +0200
summary:
 Issue #12456: fix a possible hang on shutdown of a concurrent.futures.ProcessPoolExecutor.
files:
 Lib/concurrent/futures/process.py | 30 +++++++++++-----
 Lib/test/test_concurrent_futures.py | 7 +++
 2 files changed, 28 insertions(+), 9 deletions(-)
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -50,7 +50,7 @@
 from concurrent.futures import _base
 import queue
 import multiprocessing
-from multiprocessing.queues import SimpleQueue, SentinelReady
+from multiprocessing.queues import SimpleQueue, SentinelReady, Full
 import threading
 import weakref
 
@@ -195,6 +195,10 @@
 result_queue: A multiprocessing.Queue of _ResultItems generated by the
 process workers.
 """
+ executor = None
+
+ def shutting_down():
+ return _shutdown or executor is None or executor._shutdown_thread
 
 def shutdown_worker():
 # This is an upper bound
@@ -202,8 +206,7 @@
 for i in range(0, nb_children_alive):
 call_queue.put(None)
 # If .join() is not called on the created processes then
- # some multiprocessing.Queue methods may deadlock on Mac OS
- # X.
+ # some multiprocessing.Queue methods may deadlock on Mac OS X.
 for p in processes.values():
 p.join()
 
@@ -222,7 +225,7 @@
 if executor is not None:
 executor._broken = True
 executor._shutdown_thread = True
- del executor
+ executor = None
 # All futures in flight must be marked failed
 for work_id, work_item in pending_work_items.items():
 work_item.future.set_exception(
@@ -242,7 +245,11 @@
 if isinstance(result_item, int):
 # Clean shutdown of a worker using its PID
 # (avoids marking the executor broken)
+ assert shutting_down()
 del processes[result_item]
+ if not processes:
+ shutdown_worker()
+ return
 elif result_item is not None:
 work_item = pending_work_items.pop(result_item.work_id, None)
 # work_item can be None if another process terminated (see above)
@@ -257,16 +264,21 @@
 # - The interpreter is shutting down OR
 # - The executor that owns this worker has been collected OR
 # - The executor that owns this worker has been shutdown.
- if _shutdown or executor is None or executor._shutdown_thread:
+ if shutting_down():
 # Since no new work items can be added, it is safe to shutdown
 # this thread if there are no pending work items.
- if not pending_work_items:
+ if not pending_work_items and call_queue.qsize() == 0:
 shutdown_worker()
 return
- else:
+ try:
 # Start shutting down by telling a process it can exit.
- call_queue.put(None)
- del executor
+ call_queue.put_nowait(None)
+ except Full:
+ # This is not a problem: we will eventually be woken up (in
+ # result_queue.get()) and be able to send a sentinel again,
+ # if necessary.
+ pass
+ executor = None
 
 _system_limits_checked = False
 _system_limited = None
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -367,6 +367,13 @@
 
 self.assertEqual([None, None], results)
 
+ def test_shutdown_race_issue12456(self):
+ # Issue #12456: race condition at shutdown where trying to post a
+ # sentinel in the call queue blocks (the queue is full while processes
+ # have exited).
+ self.executor.map(str, [2] * (self.worker_count + 1))
+ self.executor.shutdown()
+
 
 class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
 def test_map_submits_without_iteration(self):
-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list

AltStyle によって変換されたページ (->オリジナル) /