[Python-checkins] r87665 - in python/branches/py3k: Lib/concurrent/futures/process.py Lib/test/test_concurrent_futures.py Misc/NEWS

martin.v.loewis python-checkins at python.org
Mon Jan 3 01:07:01 CET 2011


Author: martin.v.loewis
Date: Mon Jan 3 01:07:01 2011
New Revision: 87665
Log:
Issue #10798: Reject supporting concurrent.futures if the system has
too few POSIX semaphores.
Modified:
 python/branches/py3k/Lib/concurrent/futures/process.py
 python/branches/py3k/Lib/test/test_concurrent_futures.py
 python/branches/py3k/Misc/NEWS
Modified: python/branches/py3k/Lib/concurrent/futures/process.py
==============================================================================
--- python/branches/py3k/Lib/concurrent/futures/process.py	(original)
+++ python/branches/py3k/Lib/concurrent/futures/process.py	Mon Jan 3 01:07:01 2011
@@ -244,6 +244,31 @@
 else:
 work_item.future.set_result(result_item.result)
 
+_system_limits_checked = False
+_system_limited = None
+def _check_system_limits():
+ global _system_limits_checked, _system_limited
+ if _system_limits_checked:
+ if _system_limited:
+ raise NotImplementedError(_system_limited)
+ _system_limits_checked = True
+ try:
+ import os
+ nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
+ except (AttributeError, ValueError):
+ # sysconf not available or setting not available
+ return
+ if nsems_max == -1:
+ # indetermine limit, assume that limit is determined
+ # by available memory only
+ return
+ if nsems_max >= 256:
+ # minimum number of semaphores available
+ # according to POSIX
+ return
+ _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
+ raise NotImplementedError(_system_limited)
+
 class ProcessPoolExecutor(_base.Executor):
 def __init__(self, max_workers=None):
 """Initializes a new ProcessPoolExecutor instance.
@@ -253,6 +278,7 @@
 execute the given calls. If None or not given then as many
 worker processes will be created as the machine has processors.
 """
+ _check_system_limits()
 _remove_dead_thread_references()
 
 if max_workers is None:
Modified: python/branches/py3k/Lib/test/test_concurrent_futures.py
==============================================================================
--- python/branches/py3k/Lib/test/test_concurrent_futures.py	(original)
+++ python/branches/py3k/Lib/test/test_concurrent_futures.py	Mon Jan 3 01:07:01 2011
@@ -69,7 +69,7 @@
 assert handle is not None
 return handle
 else:
- event = multiprocessing.Event()
+ event = self.Event[0]()
 self.CALL_LOCKS[id(event)] = event
 return id(event)
 
@@ -99,7 +99,8 @@
 else:
 self.CALL_LOCKS[handle].set()
 
- def __init__(self, manual_finish=False, result=42):
+ def __init__(self, Event, manual_finish=False, result=42):
+ self.Event = Event
 self._called_event = self._create_event()
 self._can_finish = self._create_event()
 
@@ -138,8 +139,8 @@
 raise ZeroDivisionError()
 
 class MapCall(Call):
- def __init__(self, result=42):
- super().__init__(manual_finish=True, result=result)
+ def __init__(self, Event, result=42):
+ super().__init__(Event, manual_finish=True, result=result)
 
 def __call__(self, manual_finish):
 if manual_finish:
@@ -155,9 +156,9 @@
 
 
 def _start_some_futures(self):
- call1 = Call(manual_finish=True)
- call2 = Call(manual_finish=True)
- call3 = Call(manual_finish=True)
+ call1 = Call(self.Event, manual_finish=True)
+ call2 = Call(self.Event, manual_finish=True)
+ call3 = Call(self.Event, manual_finish=True)
 
 try:
 self.executor.submit(call1)
@@ -176,13 +177,28 @@
 call2.close()
 call3.close()
 
-class ThreadPoolShutdownTest(ExecutorShutdownTest):
+class ThreadPoolMixin:
+ # wrap in tuple to prevent creation of instance methods
+ Event = (threading.Event,)
 def setUp(self):
 self.executor = futures.ThreadPoolExecutor(max_workers=5)
 
 def tearDown(self):
 self.executor.shutdown(wait=True)
 
+class ProcessPoolMixin:
+ # wrap in tuple to prevent creation of instance methods
+ Event = (multiprocessing.Event,)
+ def setUp(self):
+ try:
+ self.executor = futures.ProcessPoolExecutor(max_workers=5)
+ except NotImplementedError as e:
+ self.skipTest(str(e))
+
+ def tearDown(self):
+ self.executor.shutdown(wait=True)
+
+class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
 def test_threads_terminate(self):
 self._start_some_futures()
 self.assertEqual(len(self.executor._threads), 3)
@@ -208,13 +224,7 @@
 for t in threads:
 t.join()
 
-class ProcessPoolShutdownTest(ExecutorShutdownTest):
- def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_workers=5)
-
- def tearDown(self):
- self.executor.shutdown(wait=True)
-
+class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
 def test_processes_terminate(self):
 self._start_some_futures()
 self.assertEqual(len(self.executor._processes), 5)
@@ -251,8 +261,8 @@
 pass
 call1.set_can()
 
- call1 = Call(manual_finish=True)
- call2 = Call(manual_finish=True)
+ call1 = Call(self.Event, manual_finish=True)
+ call2 = Call(self.Event, manual_finish=True)
 try:
 future1 = self.executor.submit(call1)
 future2 = self.executor.submit(call2)
@@ -270,7 +280,7 @@
 call2.close()
 
 def test_first_completed_one_already_completed(self):
- call1 = Call(manual_finish=True)
+ call1 = Call(self.Event, manual_finish=True)
 try:
 future1 = self.executor.submit(call1)
 
@@ -290,9 +300,9 @@
 call1.set_can()
 call2.set_can()
 
- call1 = Call(manual_finish=True)
- call2 = ExceptionCall(manual_finish=True)
- call3 = Call(manual_finish=True)
+ call1 = Call(self.Event, manual_finish=True)
+ call2 = ExceptionCall(self.Event, manual_finish=True)
+ call3 = Call(self.Event, manual_finish=True)
 try:
 future1 = self.executor.submit(call1)
 future2 = self.executor.submit(call2)
@@ -317,8 +327,8 @@
 pass
 call1.set_can()
 
- call1 = ExceptionCall(manual_finish=True)
- call2 = Call(manual_finish=True)
+ call1 = ExceptionCall(self.Event, manual_finish=True)
+ call2 = Call(self.Event, manual_finish=True)
 try:
 future1 = self.executor.submit(call1)
 future2 = self.executor.submit(call2)
@@ -343,7 +353,7 @@
 call2.close()
 
 def test_first_exception_one_already_failed(self):
- call1 = Call(manual_finish=True)
+ call1 = Call(self.Event, manual_finish=True)
 try:
 future1 = self.executor.submit(call1)
 
@@ -363,8 +373,8 @@
 call1.set_can()
 call2.set_can()
 
- call1 = Call(manual_finish=True)
- call2 = Call(manual_finish=True)
+ call1 = Call(self.Event, manual_finish=True)
+ call2 = Call(self.Event, manual_finish=True)
 try:
 future1 = self.executor.submit(call1)
 future2 = self.executor.submit(call2)
@@ -397,10 +407,10 @@
 'this test assumes that future4 will be cancelled before it is '
 'queued to run - which might not be the case if '
 'ProcessPoolExecutor is too aggresive in scheduling futures')
- call1 = Call(manual_finish=True)
- call2 = Call(manual_finish=True)
- call3 = Call(manual_finish=True)
- call4 = Call(manual_finish=True)
+ call1 = Call(self.Event, manual_finish=True)
+ call2 = Call(self.Event, manual_finish=True)
+ call3 = Call(self.Event, manual_finish=True)
+ call4 = Call(self.Event, manual_finish=True)
 try:
 future1 = self.executor.submit(call1)
 future2 = self.executor.submit(call2)
@@ -432,8 +442,8 @@
 pass
 call1.set_can()
 
- call1 = Call(manual_finish=True)
- call2 = Call(manual_finish=True)
+ call1 = Call(self.Event, manual_finish=True)
+ call2 = Call(self.Event, manual_finish=True)
 try:
 future1 = self.executor.submit(call1)
 future2 = self.executor.submit(call2)
@@ -460,19 +470,11 @@
 call2.close()
 
 
-class ThreadPoolWaitTests(WaitTests):
- def setUp(self):
- self.executor = futures.ThreadPoolExecutor(max_workers=1)
+class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests):
+ pass
 
- def tearDown(self):
- self.executor.shutdown(wait=True)
-
-class ProcessPoolWaitTests(WaitTests):
- def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_workers=1)
-
- def tearDown(self):
- self.executor.shutdown(wait=True)
+class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests):
+ pass
 
 class AsCompletedTests(unittest.TestCase):
 # TODO(brian at sweetapp.com): Should have a test with a non-zero timeout.
@@ -483,8 +485,8 @@
 call1.set_can()
 call2.set_can()
 
- call1 = Call(manual_finish=True)
- call2 = Call(manual_finish=True)
+ call1 = Call(self.Event, manual_finish=True)
+ call2 = Call(self.Event, manual_finish=True)
 try:
 future1 = self.executor.submit(call1)
 future2 = self.executor.submit(call2)
@@ -507,7 +509,7 @@
 call2.close()
 
 def test_zero_timeout(self):
- call1 = Call(manual_finish=True)
+ call1 = Call(self.Event, manual_finish=True)
 try:
 future1 = self.executor.submit(call1)
 completed_futures = set()
@@ -529,19 +531,11 @@
 finally:
 call1.close()
 
-class ThreadPoolAsCompletedTests(AsCompletedTests):
- def setUp(self):
- self.executor = futures.ThreadPoolExecutor(max_workers=1)
-
- def tearDown(self):
- self.executor.shutdown(wait=True)
+class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests):
+ pass
 
-class ProcessPoolAsCompletedTests(AsCompletedTests):
- def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_workers=1)
-
- def tearDown(self):
- self.executor.shutdown(wait=True)
+class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests):
+ pass
 
 class ExecutorTest(unittest.TestCase):
 # Executor.shutdown() and context manager usage is tested by
@@ -567,7 +561,7 @@
 
 def test_map_timeout(self):
 results = []
- timeout_call = MapCall()
+ timeout_call = MapCall(self.Event)
 try:
 try:
 for i in self.executor.map(timeout_call,
@@ -583,19 +577,11 @@
 
 self.assertEqual([42, 42], results)
 
-class ThreadPoolExecutorTest(ExecutorTest):
- def setUp(self):
- self.executor = futures.ThreadPoolExecutor(max_workers=1)
-
- def tearDown(self):
- self.executor.shutdown(wait=True)
+class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
+ pass
 
-class ProcessPoolExecutorTest(ExecutorTest):
- def setUp(self):
- self.executor = futures.ProcessPoolExecutor(max_workers=1)
-
- def tearDown(self):
- self.executor.shutdown(wait=True)
+class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):
+ pass
 
 class FutureTests(unittest.TestCase):
 def test_done_callback_with_result(self):
Modified: python/branches/py3k/Misc/NEWS
==============================================================================
--- python/branches/py3k/Misc/NEWS	(original)
+++ python/branches/py3k/Misc/NEWS	Mon Jan 3 01:07:01 2011
@@ -20,6 +20,9 @@
 Library
 -------
 
+- Issue #10798: Reject supporting concurrent.futures if the system has too
+ few POSIX semaphores.
+
 - Issue #10807: Remove base64, bz2, hex, quopri, rot13, uu and zlib codecs from
 the codec aliases. They are still accessible via codecs.lookup().
 


More information about the Python-checkins mailing list

AltStyle によって変換されたページ (->オリジナル) /