[Python-checkins] cpython (3.2): Add tests for the atexit hook in concurrent.futures (part of #11635)

antoine.pitrou python-checkins at python.org
Thu Mar 24 15:49:10 CET 2011


http://hg.python.org/cpython/rev/76a898433a02
changeset: 68893:76a898433a02
branch: 3.2
parent: 68890:2e5aff2a9e54
user: Antoine Pitrou <solipsis at pitrou.net>
date: Thu Mar 24 15:47:39 2011 +0100
summary:
 Add tests for the atexit hook in concurrent.futures (part of #11635)
files:
 Lib/test/test_concurrent_futures.py | 55 ++++++++++++----
 1 files changed, 40 insertions(+), 15 deletions(-)
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -9,6 +9,9 @@
 # without thread support.
 test.support.import_module('threading')
 
+from test.script_helper import assert_python_ok
+
+import sys
 import threading
 import time
 import unittest
@@ -43,9 +46,30 @@
 time.sleep(t)
 raise Exception('this is an exception')
 
+def sleep_and_print(t, msg):
+ time.sleep(t)
+ print(msg)
+ sys.stdout.flush()
+
 
 class ExecutorMixin:
 worker_count = 5
+
+ def setUp(self):
+ self.t1 = time.time()
+ try:
+ self.executor = self.executor_type(max_workers=self.worker_count)
+ except NotImplementedError as e:
+ self.skipTest(str(e))
+ self._prime_executor()
+
+ def tearDown(self):
+ self.executor.shutdown(wait=True)
+ dt = time.time() - self.t1
+ if test.support.verbose:
+ print("%.2fs" % dt, end=' ')
+ self.assertLess(dt, 60, "synchronization issue: test lasted too long")
+
 def _prime_executor(self):
 # Make sure that the executor is ready to do work before running the
 # tests. This should reduce the probability of timeouts in the tests.
@@ -57,24 +81,11 @@
 
 
 class ThreadPoolMixin(ExecutorMixin):
- def setUp(self):
- self.executor = futures.ThreadPoolExecutor(max_workers=5)
- self._prime_executor()
-
- def tearDown(self):
- self.executor.shutdown(wait=True)
+ executor_type = futures.ThreadPoolExecutor
 
 
 class ProcessPoolMixin(ExecutorMixin):
- def setUp(self):
- try:
- self.executor = futures.ProcessPoolExecutor(max_workers=5)
- except NotImplementedError as e:
- self.skipTest(str(e))
- self._prime_executor()
-
- def tearDown(self):
- self.executor.shutdown(wait=True)
+ executor_type = futures.ProcessPoolExecutor
 
 
 class ExecutorShutdownTest(unittest.TestCase):
@@ -84,6 +95,20 @@
 self.executor.submit,
 pow, 2, 5)
 
+ def test_interpreter_shutdown(self):
+ # Test the atexit hook for shutdown of worker threads and processes
+ rc, out, err = assert_python_ok('-c', """if 1:
+ from concurrent.futures import {executor_type}
+ from time import sleep
+ from test.test_concurrent_futures import sleep_and_print
+ t = {executor_type}(5)
+ t.submit(sleep_and_print, 1.0, "apple")
+ """.format(executor_type=self.executor_type.__name__))
+ # Errors in atexit hooks don't change the process exit code, check
+ # stderr manually.
+ self.assertFalse(err)
+ self.assertEqual(out.strip(), b"apple")
+
 
 class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
 def _prime_executor(self):
-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list

AltStyle によって変換されたページ (->オリジナル) /