[Python-checkins] cpython: Use WeakSets rather than manual pruning to prevent unbounded growth of dead

brian.quinlan python-checkins at python.org
Sun Mar 20 03:33:11 CET 2011


http://hg.python.org/cpython/rev/a3b061697b18
changeset: 68700:a3b061697b18
parent: 68691:00459310b136
user: Brian Quinlan <brian at sweetapp.com>
date: Sun Mar 20 13:11:11 2011 +1100
summary:
 Use WeakSets rather than manual pruning to prevent unbounded growth of dead thread references.
files:
 Lib/concurrent/futures/process.py
 Lib/concurrent/futures/thread.py
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -66,28 +66,14 @@
 # workers to exit when their work queues are empty and then waits until the
 # threads/processes finish.
 
-_thread_references = set()
+_live_threads = weakref.WeakSet()
 _shutdown = False
 
 def _python_exit():
 global _shutdown
 _shutdown = True
- for thread_reference in _thread_references:
- thread = thread_reference()
- if thread is not None:
- thread.join()
-
-def _remove_dead_thread_references():
- """Remove inactive threads from _thread_references.
-
- Should be called periodically to prevent memory leaks in scenarios such as:
- >>> while True:
- >>> ... t = ThreadPoolExecutor(max_workers=5)
- >>> ... t.map(int, ['1', '2', '3', '4', '5'])
- """
- for thread_reference in set(_thread_references):
- if thread_reference() is None:
- _thread_references.discard(thread_reference)
+ for thread in _live_threads:
+ thread.join()
 
 # Controls how many more calls than processes will be queued in the call queue.
 # A smaller number will mean that processes spend more time idle waiting for
@@ -279,7 +265,6 @@
 worker processes will be created as the machine has processors.
 """
 _check_system_limits()
- _remove_dead_thread_references()
 
 if max_workers is None:
 self._max_workers = multiprocessing.cpu_count()
@@ -316,7 +301,7 @@
 self._shutdown_process_event))
 self._queue_management_thread.daemon = True
 self._queue_management_thread.start()
- _thread_references.add(weakref.ref(self._queue_management_thread))
+ _live_threads.add(self._queue_management_thread)
 
 def _adjust_process_count(self):
 for _ in range(len(self._processes), self._max_workers):
diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py
--- a/Lib/concurrent/futures/thread.py
+++ b/Lib/concurrent/futures/thread.py
@@ -25,29 +25,14 @@
 # workers to exit when their work queues are empty and then waits until the
 # threads finish.
 
-_thread_references = set()
+_live_threads = weakref.WeakSet()
 _shutdown = False
 
 def _python_exit():
 global _shutdown
 _shutdown = True
- for thread_reference in _thread_references:
- thread = thread_reference()
- if thread is not None:
- thread.join()
-
-def _remove_dead_thread_references():
- """Remove inactive threads from _thread_references.
-
- Should be called periodically to prevent memory leaks in scenarios such as:
- >>> while True:
- ... t = ThreadPoolExecutor(max_workers=5)
- ... t.map(int, ['1', '2', '3', '4', '5'])
- """
- for thread_reference in set(_thread_references):
- if thread_reference() is None:
- _thread_references.discard(thread_reference)
-
+ for thread in _live_threads:
+ thread.join()
 atexit.register(_python_exit)
 
 class _WorkItem(object):
@@ -95,8 +80,6 @@
 max_workers: The maximum number of threads that can be used to
 execute the given calls.
 """
- _remove_dead_thread_references()
-
 self._max_workers = max_workers
 self._work_queue = queue.Queue()
 self._threads = set()
@@ -125,7 +108,7 @@
 t.daemon = True
 t.start()
 self._threads.add(t)
- _thread_references.add(weakref.ref(t))
+ _live_threads.add(t)
 
 def shutdown(self, wait=True):
 with self._shutdown_lock:
-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list

AltStyle によって変換されたページ (->オリジナル) /