[Python-checkins] bpo-34172: multiprocessing.Pool leaks resources after being deleted (GH-8450) (GH-9677)

Antoine Pitrou webhook-mailer at python.org
Tue Oct 2 17:36:20 EDT 2018


https://github.com/python/cpython/commit/07b96a95db78eff3557d1bfed1df9ebecc40815b
commit: 07b96a95db78eff3557d1bfed1df9ebecc40815b
branch: 3.6
author: Miss Islington (bot) <31488909+miss-islington at users.noreply.github.com>
committer: Antoine Pitrou <pitrou at free.fr>
date: 2018-10-02T23:36:15+02:00
summary:
bpo-34172: multiprocessing.Pool leaks resources after being deleted (GH-8450) (GH-9677)
Fix a reference issue inside multiprocessing.Pool that caused the pool to remain alive if it was deleted without being closed or terminated explicitly.
(cherry picked from commit 97bfe8d3ebb0a54c8798f57555cb4152f9b2e1d0)
Co-authored-by: tzickel <tzickel at users.noreply.github.com>
files:
A Misc/NEWS.d/next/Library/2018-07-26-10-31-52.bpo-34172.8ovLNi.rst
M Lib/multiprocessing/pool.py
M Lib/test/_test_multiprocessing.py
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index a545f3c1a189..32254d8ea6cf 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -147,8 +147,9 @@ class Pool(object):
 '''
 _wrap_exception = True
 
- def Process(self, *args, **kwds):
- return self._ctx.Process(*args, **kwds)
+ @staticmethod
+ def Process(ctx, *args, **kwds):
+ return ctx.Process(*args, **kwds)
 
 def __init__(self, processes=None, initializer=None, initargs=(),
 maxtasksperchild=None, context=None):
@@ -175,13 +176,15 @@ def __init__(self, processes=None, initializer=None, initargs=(),
 
 self._worker_handler = threading.Thread(
 target=Pool._handle_workers,
- args=(self, )
+ args=(self._cache, self._taskqueue, self._ctx, self.Process,
+ self._processes, self._pool, self._inqueue, self._outqueue,
+ self._initializer, self._initargs, self._maxtasksperchild,
+ self._wrap_exception)
 )
 self._worker_handler.daemon = True
 self._worker_handler._state = RUN
 self._worker_handler.start()
 
-
 self._task_handler = threading.Thread(
 target=Pool._handle_tasks,
 args=(self._taskqueue, self._quick_put, self._outqueue,
@@ -207,43 +210,62 @@ def __init__(self, processes=None, initializer=None, initargs=(),
 exitpriority=15
 )
 
- def _join_exited_workers(self):
+ @staticmethod
+ def _join_exited_workers(pool):
 """Cleanup after any worker processes which have exited due to reaching
 their specified lifetime. Returns True if any workers were cleaned up.
 """
 cleaned = False
- for i in reversed(range(len(self._pool))):
- worker = self._pool[i]
+ for i in reversed(range(len(pool))):
+ worker = pool[i]
 if worker.exitcode is not None:
 # worker exited
 util.debug('cleaning up worker %d' % i)
 worker.join()
 cleaned = True
- del self._pool[i]
+ del pool[i]
 return cleaned
 
 def _repopulate_pool(self):
+ return self._repopulate_pool_static(self._ctx, self.Process,
+ self._processes,
+ self._pool, self._inqueue,
+ self._outqueue, self._initializer,
+ self._initargs,
+ self._maxtasksperchild,
+ self._wrap_exception)
+
+ @staticmethod
+ def _repopulate_pool_static(ctx, Process, processes, pool, inqueue,
+ outqueue, initializer, initargs,
+ maxtasksperchild, wrap_exception):
 """Bring the number of pool processes up to the specified number,
 for use after reaping workers which have exited.
 """
- for i in range(self._processes - len(self._pool)):
- w = self.Process(target=worker,
- args=(self._inqueue, self._outqueue,
- self._initializer,
- self._initargs, self._maxtasksperchild,
- self._wrap_exception)
- )
- self._pool.append(w)
+ for i in range(processes - len(pool)):
+ w = Process(ctx, target=worker,
+ args=(inqueue, outqueue,
+ initializer,
+ initargs, maxtasksperchild,
+ wrap_exception)
+ )
+ pool.append(w)
 w.name = w.name.replace('Process', 'PoolWorker')
 w.daemon = True
 w.start()
 util.debug('added worker')
 
- def _maintain_pool(self):
+ @staticmethod
+ def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue,
+ initializer, initargs, maxtasksperchild,
+ wrap_exception):
 """Clean up any exited workers and start replacements for them.
 """
- if self._join_exited_workers():
- self._repopulate_pool()
+ if Pool._join_exited_workers(pool):
+ Pool._repopulate_pool_static(ctx, Process, processes, pool,
+ inqueue, outqueue, initializer,
+ initargs, maxtasksperchild,
+ wrap_exception)
 
 def _setup_queues(self):
 self._inqueue = self._ctx.SimpleQueue()
@@ -396,16 +418,20 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
 return result
 
 @staticmethod
- def _handle_workers(pool):
+ def _handle_workers(cache, taskqueue, ctx, Process, processes, pool,
+ inqueue, outqueue, initializer, initargs,
+ maxtasksperchild, wrap_exception):
 thread = threading.current_thread()
 
 # Keep maintaining workers until the cache gets drained, unless the pool
 # is terminated.
- while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
- pool._maintain_pool()
+ while thread._state == RUN or (cache and thread._state != TERMINATE):
+ Pool._maintain_pool(ctx, Process, processes, pool, inqueue,
+ outqueue, initializer, initargs,
+ maxtasksperchild, wrap_exception)
 time.sleep(0.1)
 # send sentinel to stop workers
- pool._taskqueue.put(None)
+ taskqueue.put(None)
 util.debug('worker handler exiting')
 
 @staticmethod
@@ -781,7 +807,7 @@ class ThreadPool(Pool):
 _wrap_exception = False
 
 @staticmethod
- def Process(*args, **kwds):
+ def Process(ctx, *args, **kwds):
 from .dummy import Process
 return Process(*args, **kwds)
 
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 9d4917076c94..6667f1178574 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -2285,6 +2285,12 @@ def test_release_task_refs(self):
 # they were released too.
 self.assertEqual(CountedObject.n_instances, 0)
 
+ def test_del_pool(self):
+ p = self.Pool(1)
+ wr = weakref.ref(p)
+ del p
+ gc.collect()
+ self.assertIsNone(wr())
 
 def raising():
 raise KeyError("key")
diff --git a/Misc/NEWS.d/next/Library/2018-07-26-10-31-52.bpo-34172.8ovLNi.rst b/Misc/NEWS.d/next/Library/2018-07-26-10-31-52.bpo-34172.8ovLNi.rst
new file mode 100644
index 000000000000..d1c5a7721019
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2018-07-26-10-31-52.bpo-34172.8ovLNi.rst
@@ -0,0 +1 @@
+Fix a reference issue inside multiprocessing.Pool that caused the pool to remain alive if it was deleted without being closed or terminated explicitly.


More information about the Python-checkins mailing list

AltStyle によって変換されたページ (->オリジナル) /