[Python-checkins] cpython: Issue #14666: stop multiprocessing's resource-sharing thread after the tests are done

antoine.pitrou python-checkins at python.org
Fri Apr 27 23:52:35 CEST 2012


http://hg.python.org/cpython/rev/f163c4731c58
changeset: 76589:f163c4731c58
user: Antoine Pitrou <solipsis at pitrou.net>
date: Fri Apr 27 23:51:03 2012 +0200
summary:
 Issue #14666: stop multiprocessing's resource-sharing thread after the tests are done.
Also, block delivery of signals to that thread. Patch by Richard Oudkerk.
This will hopefully fix sporadic freezes on the FreeBSD 9.0 buildbot.
files:
 Lib/multiprocessing/reduction.py | 29 +++++++++++++++++++-
 Lib/test/test_multiprocessing.py | 5 +++
 2 files changed, 33 insertions(+), 1 deletions(-)
diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py
--- a/Lib/multiprocessing/reduction.py
+++ b/Lib/multiprocessing/reduction.py
@@ -40,6 +40,7 @@
 import socket
 import threading
 import struct
+import signal
 
 from multiprocessing import current_process
 from multiprocessing.util import register_after_fork, debug, sub_debug
@@ -209,6 +210,7 @@
 self._lock = threading.Lock()
 self._listener = None
 self._address = None
+ self._thread = None
 register_after_fork(self, ResourceSharer._afterfork)
 
 def register(self, send, close):
@@ -227,6 +229,24 @@
 c.send((key, os.getpid()))
 return c
 
+ def stop(self, timeout=None):
+ from .connection import Client
+ with self._lock:
+ if self._address is not None:
+ c = Client(self._address, authkey=current_process().authkey)
+ c.send(None)
+ c.close()
+ self._thread.join(timeout)
+ if self._thread.is_alive():
+ sub_warn('ResourceSharer thread did not stop when asked')
+ self._listener.close()
+ self._thread = None
+ self._address = None
+ self._listener = None
+ for key, (send, close) in self._cache.items():
+ close()
+ self._cache.clear()
+
 def _afterfork(self):
 for key, (send, close) in self._cache.items():
 close()
@@ -239,6 +259,7 @@
 self._listener.close()
 self._listener = None
 self._address = None
+ self._thread = None
 
 def _start(self):
 from .connection import Listener
@@ -249,12 +270,18 @@
 t = threading.Thread(target=self._serve)
 t.daemon = True
 t.start()
+ self._thread = t
 
 def _serve(self):
+ if hasattr(signal, 'pthread_sigmask'):
+ signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG))
 while 1:
 try:
 conn = self._listener.accept()
- key, destination_pid = conn.recv()
+ msg = conn.recv()
+ if msg is None:
+ break
+ key, destination_pid = msg
 send, close = self._cache.pop(key)
 send(conn, destination_pid)
 close()
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -1966,6 +1966,11 @@
 ALLOWED_TYPES = ('processes',)
 
 @classmethod
+ def tearDownClass(cls):
+ from multiprocessing.reduction import resource_sharer
+ resource_sharer.stop(timeout=5)
+
+ @classmethod
 def _listener(cls, conn, families):
 for fam in families:
 l = cls.connection.Listener(family=fam)
-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list

AltStyle によって変換されたページ (->オリジナル) /