changeset: 71137:51c1f2cedb96 user: Antoine Pitrou date: Sat Jul 02 21:20:25 2011 +0200 files: Lib/concurrent/futures/process.py Lib/test/test_concurrent_futures.py description: Issue #12456: fix a possible hang on shutdown of a concurrent.futures.ProcessPoolExecutor. diff -r 42193f3ffc94 -r 51c1f2cedb96 Lib/concurrent/futures/process.py --- a/Lib/concurrent/futures/process.py Sat Jul 02 18:42:21 2011 +0100 +++ b/Lib/concurrent/futures/process.py Sat Jul 02 21:20:25 2011 +0200 @@ -50,7 +50,7 @@ from concurrent.futures import _base import queue import multiprocessing -from multiprocessing.queues import SimpleQueue, SentinelReady +from multiprocessing.queues import SimpleQueue, SentinelReady, Full import threading import weakref @@ -195,6 +195,10 @@ result_queue: A multiprocessing.Queue of _ResultItems generated by the process workers. """ + executor = None + + def shutting_down(): + return _shutdown or executor is None or executor._shutdown_thread def shutdown_worker(): # This is an upper bound @@ -202,8 +206,7 @@ for i in range(0, nb_children_alive): call_queue.put(None) # If .join() is not called on the created processes then - # some multiprocessing.Queue methods may deadlock on Mac OS - # X. + # some multiprocessing.Queue methods may deadlock on Mac OS X. for p in processes.values(): p.join() @@ -222,7 +225,7 @@ if executor is not None: executor._broken = True executor._shutdown_thread = True - del executor + executor = None # All futures in flight must be marked failed for work_id, work_item in pending_work_items.items(): work_item.future.set_exception( @@ -242,7 +245,11 @@ if isinstance(result_item, int): # Clean shutdown of a worker using its PID # (avoids marking the executor broken) + assert shutting_down() del processes[result_item] + if not processes: + shutdown_worker() + return elif result_item is not None: work_item = pending_work_items.pop(result_item.work_id, None) # work_item can be None if another process terminated (see above) @@ -257,16 +264,21 @@ # - The interpreter is shutting down OR # - The executor that owns this worker has been collected OR # - The executor that owns this worker has been shutdown. - if _shutdown or executor is None or executor._shutdown_thread: + if shutting_down(): # Since no new work items can be added, it is safe to shutdown # this thread if there are no pending work items. - if not pending_work_items: + if not pending_work_items and call_queue.qsize() == 0: shutdown_worker() return - else: + try: # Start shutting down by telling a process it can exit. - call_queue.put(None) - del executor + call_queue.put_nowait(None) + except Full: + # This is not a problem: we will eventually be woken up (in + # result_queue.get()) and be able to send a sentinel again, + # if necessary. + pass + executor = None _system_limits_checked = False _system_limited = None diff -r 42193f3ffc94 -r 51c1f2cedb96 Lib/test/test_concurrent_futures.py --- a/Lib/test/test_concurrent_futures.py Sat Jul 02 18:42:21 2011 +0100 +++ b/Lib/test/test_concurrent_futures.py Sat Jul 02 21:20:25 2011 +0200 @@ -367,6 +367,13 @@ self.assertEqual([None, None], results) + def test_shutdown_race_issue12456(self): + # Issue #12456: race condition at shutdown where trying to post a + # sentinel in the call queue blocks (the queue is full while processes + # have exited). + self.executor.map(str, [2] * (self.worker_count + 1)) + self.executor.shutdown() + class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest): def test_map_submits_without_iteration(self):

AltStyle によって変換されたページ (->オリジナル) /