Index: Lib/threading.py =================================================================== --- Lib/threading.py (revision 82754) +++ Lib/threading.py (working copy) @@ -175,27 +175,7 @@ def __init__(self, lock=None, verbose=None): _Verbose.__init__(self, verbose) - if lock is None: - lock = RLock() - self._lock = lock - # Export the lock's acquire() and release() methods - self.acquire = lock.acquire - self.release = lock.release - # If the lock defines _release_save() and/or _acquire_restore(), - # these override the default implementations (which just call - # release() and acquire() on the lock). Ditto for _is_owned(). - try: - self._release_save = lock._release_save - except AttributeError: - pass - try: - self._acquire_restore = lock._acquire_restore - except AttributeError: - pass - try: - self._is_owned = lock._is_owned - except AttributeError: - pass + self._reset_lock(lock) self._waiters = [] def __enter__(self): @@ -222,6 +202,30 @@ else: return True + def _reset_lock(self, lock=None): + """Throw away the old lock and replace it with this one.""" + if lock is None: + lock = RLock() + self._lock = lock + # Export the lock's acquire() and release() methods + self.acquire = lock.acquire + self.release = lock.release + # If the lock defines _release_save() and/or _acquire_restore(), + # these override the default implementations (which just call + # release() and acquire() on the lock). Ditto for _is_owned(). + try: + self._release_save = lock._release_save + except AttributeError: + pass + try: + self._acquire_restore = lock._acquire_restore + except AttributeError: + pass + try: + self._is_owned = lock._is_owned + except AttributeError: + pass + def wait(self, timeout=None): if not self._is_owned(): raise RuntimeError("cannot wait on un-acquired lock") @@ -859,6 +863,10 @@ # its new value since it can have changed. ident = _get_ident() thread._ident = ident + # Any locks hanging off of the active thread may be in an + # invalid state, so we reset them. + thread._block._reset_lock() + thread._started._cond._reset_lock() new_active[ident] = thread else: # All the others are already stopped. Index: Lib/test/test_threading.py =================================================================== --- Lib/test/test_threading.py (revision 82754) +++ Lib/test/test_threading.py (working copy) @@ -11,6 +11,7 @@ import unittest import weakref import os +import subprocess from test import lock_tests @@ -271,7 +272,6 @@ except ImportError: raise unittest.SkipTest("cannot import ctypes") - import subprocess rc = subprocess.call([sys.executable, "-c", """if 1: import ctypes, sys, time, _thread @@ -302,7 +302,6 @@ def test_finalize_with_trace(self): # Issue1733757 # Avoid a deadlock when sys.settrace steps into threading._shutdown - import subprocess rc = subprocess.call([sys.executable, "-c", """if 1: import sys, threading @@ -330,7 +329,6 @@ def test_join_nondaemon_on_shutdown(self): # Issue 1722344 # Raising SystemExit skipped threading._shutdown - import subprocess p = subprocess.Popen([sys.executable, "-c", """if 1: import threading from time import sleep @@ -429,7 +427,6 @@ sys.stdout.flush() \n""" + script - import subprocess p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE) rc = p.wait() data = p.stdout.read().decode().replace('\r', '') @@ -494,7 +491,90 @@ """ self._run_and_join(script) + @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") + def test_4_joining_across_fork_in_worker_thread(self): + # There used to be a possible deadlock when forking from a child + # thread. See http://bugs.python.org/issue6643. + # Skip platforms with known problems forking from a worker thread. + # See http://bugs.python.org/issue3863. + if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'): + raise unittest.SkipTest('due to known OS bugs on ' + sys.platform) + + # The script takes the following steps: + # - The main thread in the parent process starts a new thread and then + # tries to join it. + # - The join operation acquires the Lock inside the thread's _block + # Condition. (See threading.py:Thread.join().) + # - We stub out the acquire method on the condition to force it to wait + # until the child thread forks. (See LOCK ACQUIRED HERE) + # - The child thread forks. (See LOCK HELD and WORKER THREAD FORKS + # HERE) + # - The main thread of the parent process enters Condition.wait(), + # which releases the lock on the child thread. + # - The child process returns. Without the necessary fix, when the + # main thread of the child process (which used to be the child thread + # in the parent process) attempts to exit, it will try to acquire the + # lock in the Thread._block Condition object and hang, because the + # lock was held across the fork. + + script = """if 1: + import os, time, threading + + finish_join = False + start_fork = False + + def worker(): + # Wait until this thread's lock is acquired before forking to + # create the deadlock. + global finish_join + while not start_fork: + time.sleep(0.01) + # LOCK HELD: Main thread holds lock across this call. + childpid = os.fork() + finish_join = True + if childpid != 0: + # Parent process just waits for child. + os.waitpid(childpid, 0) + # Child process should just return. + + w = threading.Thread(target=worker) + + # Stub out the private condition variable's lock acquire method. + # This acquires the lock and then waits until the child has forked + # before returning, which will release the lock soon after. If + # someone else tries to fix this test case by acquiring this lock + # before forking instead of reseting it, the test case will + # deadlock when it shouldn't. + condition = w._block + orig_acquire = condition.acquire + call_count_lock = threading.Lock() + call_count = 0 + def my_acquire(): + global call_count + global start_fork + orig_acquire() # LOCK ACQUIRED HERE + start_fork = True + if call_count == 0: + while not finish_join: + time.sleep(0.01) # WORKER THREAD FORKS HERE + with call_count_lock: + call_count += 1 + condition.acquire = my_acquire + + w.start() + w.join() + print('end of main') + """ + p = subprocess.Popen([sys.executable, "-c", script], + stdout=subprocess.PIPE) + rc = p.wait() + data = p.stdout.read().decode().replace('\r', '') + self.assertEqual(data, "end of main\n") + self.assertFalse(rc == 2, "interpreter was blocked") + self.assertTrue(rc == 0, "Unexpected error") + + class ThreadingExceptionTests(BaseTestCase): # A RuntimeError should be raised if Thread.start() is called # multiple times. @@ -549,6 +629,7 @@ SemaphoreTests, BoundedSemaphoreTests, ThreadTests, ThreadJoinOnShutdown, + ThreadAndForkTests, ThreadingExceptionTests, )

AltStyle によって変換されたページ (->オリジナル) /