[Python-checkins] cpython: Issue #12040: Expose a new attribute `sentinel` on instances of

antoine.pitrou python-checkins at python.org
Mon Jun 6 19:36:06 CEST 2011


http://hg.python.org/cpython/rev/568a3ba088e4
changeset: 70690:568a3ba088e4
user: Antoine Pitrou <solipsis at pitrou.net>
date: Mon Jun 06 19:35:31 2011 +0200
summary:
 Issue #12040: Expose a new attribute `sentinel` on instances of
:class:`multiprocessing.Process`. Also, fix Process.join() to not use
polling anymore, when given a timeout.
files:
 Doc/library/multiprocessing.rst | 14 +++++++
 Lib/multiprocessing/forking.py | 34 ++++++++++-------
 Lib/multiprocessing/process.py | 12 ++++++
 Lib/multiprocessing/util.py | 21 +++++++++++
 Lib/test/test_multiprocessing.py | 37 ++++++++++++++++++++
 Misc/NEWS | 4 ++
 6 files changed, 108 insertions(+), 14 deletions(-)
diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst
--- a/Doc/library/multiprocessing.rst
+++ b/Doc/library/multiprocessing.rst
@@ -411,6 +411,20 @@
 
 See :ref:`multiprocessing-auth-keys`.
 
+ .. attribute:: sentinel
+
+ A numeric handle of a system object which will become "ready" when
+ the process ends.
+
+ On Windows, this is an OS handle usable with the ``WaitForSingleObject``
+ and ``WaitForMultipleObjects`` family of API calls. On Unix, this is
+ a file descriptor usable with primitives from the :mod:`select` module.
+
+ You can use this value if you want to wait on several events at once.
+ Otherwise calling :meth:`join()` is simpler.
+
+ .. versionadded:: 3.3
+
 .. method:: terminate()
 
 Terminate the process. On Unix this is done using the ``SIGTERM`` signal;
diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py
--- a/Lib/multiprocessing/forking.py
+++ b/Lib/multiprocessing/forking.py
@@ -101,10 +101,12 @@
 
 if sys.platform != 'win32':
 import time
+ import select
 
 exit = os._exit
 duplicate = os.dup
 close = os.close
+ _select = util._eintr_retry(select.select)
 
 #
 # We define a Popen class similar to the one from subprocess, but
@@ -118,8 +120,12 @@
 sys.stderr.flush()
 self.returncode = None
 
+ r, w = os.pipe()
+ self.sentinel = r
+
 self.pid = os.fork()
 if self.pid == 0:
+ os.close(r)
 if 'random' in sys.modules:
 import random
 random.seed()
@@ -128,6 +134,11 @@
 sys.stderr.flush()
 os._exit(code)
 
+ # `w` will be closed when the child exits, at which point `r`
+ # will become ready for reading (using e.g. select()).
+ os.close(w)
+ util.Finalize(self, os.close, (r,))
+
 def poll(self, flag=os.WNOHANG):
 if self.returncode is None:
 try:
@@ -145,20 +156,14 @@
 return self.returncode
 
 def wait(self, timeout=None):
- if timeout is None:
- return self.poll(0)
- deadline = time.time() + timeout
- delay = 0.0005
- while 1:
- res = self.poll()
- if res is not None:
- break
- remaining = deadline - time.time()
- if remaining <= 0:
- break
- delay = min(delay * 2, remaining, 0.05)
- time.sleep(delay)
- return res
+ if self.returncode is None:
+ if timeout is not None:
+ r = _select([self.sentinel], [], [], timeout)[0]
+ if not r:
+ return None
+ # This shouldn't block if select() returned successfully.
+ return self.poll(os.WNOHANG if timeout == 0.0 else 0)
+ return self.returncode
 
 def terminate(self):
 if self.returncode is None:
@@ -258,6 +263,7 @@
 self.pid = pid
 self.returncode = None
 self._handle = hp
+ self.sentinel = int(hp)
 
 # send information to child
 prep_data = get_preparation_data(process_obj._name)
diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py
--- a/Lib/multiprocessing/process.py
+++ b/Lib/multiprocessing/process.py
@@ -132,6 +132,7 @@
 else:
 from .forking import Popen
 self._popen = Popen(self)
+ self._sentinel = self._popen.sentinel
 _current_process._children.add(self)
 
 def terminate(self):
@@ -218,6 +219,17 @@
 
 pid = ident
 
+ @property
+ def sentinel(self):
+ '''
+ Return a file descriptor (Unix) or handle (Windows) suitable for
+ waiting for process termination.
+ '''
+ try:
+ return self._sentinel
+ except AttributeError:
+ raise ValueError("process not started")
+
 def __repr__(self):
 if self is _current_process:
 status = 'started'
diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py
--- a/Lib/multiprocessing/util.py
+++ b/Lib/multiprocessing/util.py
@@ -32,9 +32,11 @@
 # SUCH DAMAGE.
 #
 
+import functools
 import itertools
 import weakref
 import atexit
+import select
 import threading # we want threading to install it's
 # cleanup function before multiprocessing does
 
@@ -315,3 +317,22 @@
 register_after_fork(self, lambda obj : obj.__dict__.clear())
 def __reduce__(self):
 return type(self), ()
+
+
+#
+# Automatic retry after EINTR
+#
+
+def _eintr_retry(func, _errors=(EnvironmentError, select.error)):
+ @functools.wraps(func)
+ def wrapped(*args, **kwargs):
+ while True:
+ try:
+ return func(*args, **kwargs)
+ except _errors as e:
+ # select.error has no `errno` attribute
+ if e.args[0] == errno.EINTR:
+ continue
+ raise
+ return wrapped
+
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -71,6 +71,23 @@
 'HAVE_BROKEN_SEM_GETVALUE', False)
 
 WIN32 = (sys.platform == "win32")
+if WIN32:
+ from _subprocess import WaitForSingleObject, INFINITE, WAIT_OBJECT_0
+
+ def wait_for_handle(handle, timeout):
+ if timeout is None or timeout < 0.0:
+ timeout = INFINITE
+ else:
+ timeout = int(1000 * timeout)
+ return WaitForSingleObject(handle, timeout) == WAIT_OBJECT_0
+else:
+ from select import select
+ _select = util._eintr_retry(select)
+
+ def wait_for_handle(handle, timeout):
+ if timeout is not None and timeout < 0.0:
+ timeout = None
+ return handle in _select([handle], [], [], timeout)[0]
 
 #
 # Some tests require ctypes
@@ -307,6 +324,26 @@
 ]
 self.assertEqual(result, expected)
 
+ @classmethod
+ def _test_sentinel(cls, event):
+ event.wait(10.0)
+
+ def test_sentinel(self):
+ if self.TYPE == "threads":
+ return
+ event = self.Event()
+ p = self.Process(target=self._test_sentinel, args=(event,))
+ with self.assertRaises(ValueError):
+ p.sentinel
+ p.start()
+ self.addCleanup(p.join)
+ sentinel = p.sentinel
+ self.assertIsInstance(sentinel, int)
+ self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
+ event.set()
+ p.join()
+ self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
+
 #
 #
 #
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -187,6 +187,10 @@
 Library
 -------
 
+- Issue #12040: Expose a new attribute ``sentinel`` on instances of
+ :class:`multiprocessing.Process`. Also, fix Process.join() to not use
+ polling anymore, when given a timeout.
+
 - Issue #11893: Remove obsolete internal wrapper class ``SSLFakeFile`` in the
 smtplib module. Patch by Catalin Iacob.
 
-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list

AltStyle によって変換されたページ (->オリジナル) /