[Python-checkins] cpython (merge 3.4 -> default): Merge 3.4 (asyncio)

victor.stinner python-checkins at python.org
Fri Jan 30 00:09:05 CET 2015


https://hg.python.org/cpython/rev/0701ffaa89f9
changeset: 94380:0701ffaa89f9
parent: 94378:7c9a42cbfff0
parent: 94379:a5efd5021ca1
user: Victor Stinner <victor.stinner at gmail.com>
date: Fri Jan 30 00:05:36 2015 +0100
summary:
 Merge 3.4 (asyncio)
files:
 Lib/asyncio/base_subprocess.py | 108 ++++++----
 Lib/asyncio/subprocess.py | 40 +---
 Lib/asyncio/unix_events.py | 15 +-
 Lib/asyncio/windows_events.py | 7 +-
 Lib/test/test_asyncio/test_events.py | 35 +-
 Lib/test/test_asyncio/test_subprocess.py | 65 ++++++
 6 files changed, 166 insertions(+), 104 deletions(-)
diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py
--- a/Lib/asyncio/base_subprocess.py
+++ b/Lib/asyncio/base_subprocess.py
@@ -3,6 +3,7 @@
 import sys
 import warnings
 
+from . import futures
 from . import protocols
 from . import transports
 from .coroutines import coroutine
@@ -13,27 +14,32 @@
 
 def __init__(self, loop, protocol, args, shell,
 stdin, stdout, stderr, bufsize,
- extra=None, **kwargs):
+ waiter=None, extra=None, **kwargs):
 super().__init__(extra)
 self._closed = False
 self._protocol = protocol
 self._loop = loop
+ self._proc = None
 self._pid = None
+ self._returncode = None
+ self._exit_waiters = []
+ self._pending_calls = collections.deque()
+ self._pipes = {}
+ self._finished = False
 
- self._pipes = {}
 if stdin == subprocess.PIPE:
 self._pipes[0] = None
 if stdout == subprocess.PIPE:
 self._pipes[1] = None
 if stderr == subprocess.PIPE:
 self._pipes[2] = None
- self._pending_calls = collections.deque()
- self._finished = False
- self._returncode = None
+
+ # Create the child process: set the _proc attribute
 self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
 stderr=stderr, bufsize=bufsize, **kwargs)
 self._pid = self._proc.pid
 self._extra['subprocess'] = self._proc
+
 if self._loop.get_debug():
 if isinstance(args, (bytes, str)):
 program = args
@@ -42,6 +48,8 @@
 logger.debug('process %r created: pid %s',
 program, self._pid)
 
+ self._loop.create_task(self._connect_pipes(waiter))
+
 def __repr__(self):
 info = [self.__class__.__name__]
 if self._closed:
@@ -77,12 +85,23 @@
 
 def close(self):
 self._closed = True
+
 for proto in self._pipes.values():
 if proto is None:
 continue
 proto.pipe.close()
- if self._returncode is None:
- self.terminate()
+
+ if self._proc is not None and self._returncode is None:
+ if self._loop.get_debug():
+ logger.warning('Close running child process: kill %r', self)
+
+ try:
+ self._proc.kill()
+ except ProcessLookupError:
+ pass
+
+ # Don't clear the _proc reference yet because _connect_pipes() may
+ # still be running
 
 # On Python 3.3 and older, objects with a destructor part of a reference
 # cycle are never destroyed. It's not more the case on Python 3.4 thanks
@@ -105,59 +124,42 @@
 else:
 return None
 
+ def _check_proc(self):
+ if self._closed:
+ raise ValueError("operation on closed transport")
+ if self._proc is None:
+ raise ProcessLookupError()
+
 def send_signal(self, signal):
+ self._check_proc()
 self._proc.send_signal(signal)
 
 def terminate(self):
+ self._check_proc()
 self._proc.terminate()
 
 def kill(self):
+ self._check_proc()
 self._proc.kill()
 
- def _kill_wait(self):
- """Close pipes, kill the subprocess and read its return status.
-
- Function called when an exception is raised during the creation
- of a subprocess.
- """
- self._closed = True
- if self._loop.get_debug():
- logger.warning('Exception during subprocess creation, '
- 'kill the subprocess %r',
- self,
- exc_info=True)
-
- proc = self._proc
- if proc.stdout:
- proc.stdout.close()
- if proc.stderr:
- proc.stderr.close()
- if proc.stdin:
- proc.stdin.close()
-
- try:
- proc.kill()
- except ProcessLookupError:
- pass
- self._returncode = proc.wait()
-
- self.close()
-
 @coroutine
- def _post_init(self):
+ def _connect_pipes(self, waiter):
 try:
 proc = self._proc
 loop = self._loop
+
 if proc.stdin is not None:
 _, pipe = yield from loop.connect_write_pipe(
 lambda: WriteSubprocessPipeProto(self, 0),
 proc.stdin)
 self._pipes[0] = pipe
+
 if proc.stdout is not None:
 _, pipe = yield from loop.connect_read_pipe(
 lambda: ReadSubprocessPipeProto(self, 1),
 proc.stdout)
 self._pipes[1] = pipe
+
 if proc.stderr is not None:
 _, pipe = yield from loop.connect_read_pipe(
 lambda: ReadSubprocessPipeProto(self, 2),
@@ -166,13 +168,16 @@
 
 assert self._pending_calls is not None
 
- self._loop.call_soon(self._protocol.connection_made, self)
+ loop.call_soon(self._protocol.connection_made, self)
 for callback, data in self._pending_calls:
- self._loop.call_soon(callback, *data)
+ loop.call_soon(callback, *data)
 self._pending_calls = None
- except:
- self._kill_wait()
- raise
+ except Exception as exc:
+ if waiter is not None and not waiter.cancelled():
+ waiter.set_exception(exc)
+ else:
+ if waiter is not None and not waiter.cancelled():
+ waiter.set_result(None)
 
 def _call(self, cb, *data):
 if self._pending_calls is not None:
@@ -197,6 +202,23 @@
 self._call(self._protocol.process_exited)
 self._try_finish()
 
+ # wake up futures waiting for wait()
+ for waiter in self._exit_waiters:
+ if not waiter.cancelled():
+ waiter.set_result(returncode)
+ self._exit_waiters = None
+
+ def wait(self):
+ """Wait until the process exits and return the process return code.
+
+ This method is a coroutine."""
+ if self._returncode is not None:
+ return self._returncode
+
+ waiter = futures.Future(loop=self._loop)
+ self._exit_waiters.append(waiter)
+ return (yield from waiter)
+
 def _try_finish(self):
 assert not self._finished
 if self._returncode is None:
@@ -210,9 +232,9 @@
 try:
 self._protocol.connection_lost(exc)
 finally:
+ self._loop = None
 self._proc = None
 self._protocol = None
- self._loop = None
 
 
 class WriteSubprocessPipeProto(protocols.BaseProtocol):
diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py
--- a/Lib/asyncio/subprocess.py
+++ b/Lib/asyncio/subprocess.py
@@ -25,8 +25,6 @@
 super().__init__(loop=loop)
 self._limit = limit
 self.stdin = self.stdout = self.stderr = None
- self.waiter = futures.Future(loop=loop)
- self._waiters = collections.deque()
 self._transport = None
 
 def __repr__(self):
@@ -61,9 +59,6 @@
 reader=None,
 loop=self._loop)
 
- if not self.waiter.cancelled():
- self.waiter.set_result(None)
-
 def pipe_data_received(self, fd, data):
 if fd == 1:
 reader = self.stdout
@@ -94,16 +89,9 @@
 reader.set_exception(exc)
 
 def process_exited(self):
- returncode = self._transport.get_returncode()
 self._transport.close()
 self._transport = None
 
- # wake up futures waiting for wait()
- while self._waiters:
- waiter = self._waiters.popleft()
- if not waiter.cancelled():
- waiter.set_result(returncode)
-
 
 class Process:
 def __init__(self, transport, protocol, loop):
@@ -124,30 +112,18 @@
 
 @coroutine
 def wait(self):
- """Wait until the process exit and return the process return code."""
- returncode = self._transport.get_returncode()
- if returncode is not None:
- return returncode
+ """Wait until the process exits and return the process return code.
 
- waiter = futures.Future(loop=self._loop)
- self._protocol._waiters.append(waiter)
- yield from waiter
- return waiter.result()
-
- def _check_alive(self):
- if self._transport.get_returncode() is not None:
- raise ProcessLookupError()
+ This method is a coroutine."""
+ return (yield from self._transport.wait())
 
 def send_signal(self, signal):
- self._check_alive()
 self._transport.send_signal(signal)
 
 def terminate(self):
- self._check_alive()
 self._transport.terminate()
 
 def kill(self):
- self._check_alive()
 self._transport.kill()
 
 @coroutine
@@ -221,11 +197,6 @@
 protocol_factory,
 cmd, stdin=stdin, stdout=stdout,
 stderr=stderr, **kwds)
- try:
- yield from protocol.waiter
- except:
- transport._kill_wait()
- raise
 return Process(transport, protocol, loop)
 
 @coroutine
@@ -241,9 +212,4 @@
 program, *args,
 stdin=stdin, stdout=stdout,
 stderr=stderr, **kwds)
- try:
- yield from protocol.waiter
- except:
- transport._kill_wait()
- raise
 return Process(transport, protocol, loop)
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -16,6 +16,7 @@
 from . import constants
 from . import coroutines
 from . import events
+from . import futures
 from . import selector_events
 from . import selectors
 from . import transports
@@ -175,16 +176,20 @@
 stdin, stdout, stderr, bufsize,
 extra=None, **kwargs):
 with events.get_child_watcher() as watcher:
+ waiter = futures.Future(loop=self)
 transp = _UnixSubprocessTransport(self, protocol, args, shell,
 stdin, stdout, stderr, bufsize,
- extra=extra, **kwargs)
+ waiter=waiter, extra=extra,
+ **kwargs)
+
+ watcher.add_child_handler(transp.get_pid(),
+ self._child_watcher_callback, transp)
 try:
- yield from transp._post_init()
+ yield from waiter
 except:
 transp.close()
+ yield from transp.wait()
 raise
- watcher.add_child_handler(transp.get_pid(),
- self._child_watcher_callback, transp)
 
 return transp
 
@@ -774,7 +779,7 @@
 pass
 
 def add_child_handler(self, pid, callback, *args):
- self._callbacks[pid] = callback, args
+ self._callbacks[pid] = (callback, args)
 
 # Prevent a race condition in case the child is already terminated.
 self._do_waitpid(pid)
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py
--- a/Lib/asyncio/windows_events.py
+++ b/Lib/asyncio/windows_events.py
@@ -366,13 +366,16 @@
 def _make_subprocess_transport(self, protocol, args, shell,
 stdin, stdout, stderr, bufsize,
 extra=None, **kwargs):
+ waiter = futures.Future(loop=self)
 transp = _WindowsSubprocessTransport(self, protocol, args, shell,
 stdin, stdout, stderr, bufsize,
- extra=extra, **kwargs)
+ waiter=waiter, extra=extra,
+ **kwargs)
 try:
- yield from transp._post_init()
+ yield from waiter
 except:
 transp.close()
+ yield from transp.wait()
 raise
 
 return transp
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -1551,9 +1551,10 @@
 stdin = transp.get_pipe_transport(0)
 stdin.write(b'Python The Winner')
 self.loop.run_until_complete(proto.got_data[1].wait())
- transp.close()
+ with test_utils.disable_logger():
+ transp.close()
 self.loop.run_until_complete(proto.completed)
- self.check_terminated(proto.returncode)
+ self.check_killed(proto.returncode)
 self.assertEqual(b'Python The Winner', proto.data[1])
 
 def test_subprocess_interactive(self):
@@ -1567,21 +1568,20 @@
 self.loop.run_until_complete(proto.connected)
 self.assertEqual('CONNECTED', proto.state)
 
- try:
- stdin = transp.get_pipe_transport(0)
- stdin.write(b'Python ')
- self.loop.run_until_complete(proto.got_data[1].wait())
- proto.got_data[1].clear()
- self.assertEqual(b'Python ', proto.data[1])
-
- stdin.write(b'The Winner')
- self.loop.run_until_complete(proto.got_data[1].wait())
- self.assertEqual(b'Python The Winner', proto.data[1])
- finally:
+ stdin = transp.get_pipe_transport(0)
+ stdin.write(b'Python ')
+ self.loop.run_until_complete(proto.got_data[1].wait())
+ proto.got_data[1].clear()
+ self.assertEqual(b'Python ', proto.data[1])
+
+ stdin.write(b'The Winner')
+ self.loop.run_until_complete(proto.got_data[1].wait())
+ self.assertEqual(b'Python The Winner', proto.data[1])
+
+ with test_utils.disable_logger():
 transp.close()
-
 self.loop.run_until_complete(proto.completed)
- self.check_terminated(proto.returncode)
+ self.check_killed(proto.returncode)
 
 def test_subprocess_shell(self):
 connect = self.loop.subprocess_shell(
@@ -1739,9 +1739,10 @@
 # GetLastError()==ERROR_INVALID_NAME on Windows!?! (Using
 # WriteFile() we get ERROR_BROKEN_PIPE as expected.)
 self.assertEqual(b'ERR:OSError', proto.data[2])
- transp.close()
+ with test_utils.disable_logger():
+ transp.close()
 self.loop.run_until_complete(proto.completed)
- self.check_terminated(proto.returncode)
+ self.check_killed(proto.returncode)
 
 def test_subprocess_wait_no_same_group(self):
 # start the new process in a new session
diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py
--- a/Lib/test/test_asyncio/test_subprocess.py
+++ b/Lib/test/test_asyncio/test_subprocess.py
@@ -4,6 +4,7 @@
 from unittest import mock
 
 import asyncio
+from asyncio import base_subprocess
 from asyncio import subprocess
 from asyncio import test_utils
 try:
@@ -23,6 +24,70 @@
 'data = sys.stdin.buffer.read()',
 'sys.stdout.buffer.write(data)'))]
 
+class TestSubprocessTransport(base_subprocess.BaseSubprocessTransport):
+ def _start(self, *args, **kwargs):
+ self._proc = mock.Mock()
+ self._proc.stdin = None
+ self._proc.stdout = None
+ self._proc.stderr = None
+
+
+class SubprocessTransportTests(test_utils.TestCase):
+ def setUp(self):
+ self.loop = self.new_test_loop()
+ self.set_event_loop(self.loop)
+
+
+ def create_transport(self, waiter=None):
+ protocol = mock.Mock()
+ protocol.connection_made._is_coroutine = False
+ protocol.process_exited._is_coroutine = False
+ transport = TestSubprocessTransport(
+ self.loop, protocol, ['test'], False,
+ None, None, None, 0, waiter=waiter)
+ return (transport, protocol)
+
+ def test_close(self):
+ waiter = asyncio.Future(loop=self.loop)
+ transport, protocol = self.create_transport(waiter)
+ transport._process_exited(0)
+ transport.close()
+
+ # The loop didn't run yet
+ self.assertFalse(protocol.connection_made.called)
+
+ # methods must raise ValueError if the transport was closed
+ self.assertRaises(ValueError, transport.send_signal, signal.SIGTERM)
+ self.assertRaises(ValueError, transport.terminate)
+ self.assertRaises(ValueError, transport.kill)
+
+ self.loop.run_until_complete(waiter)
+
+ def test_proc_exited(self):
+ waiter = asyncio.Future(loop=self.loop)
+ transport, protocol = self.create_transport(waiter)
+ transport._process_exited(6)
+ self.loop.run_until_complete(waiter)
+
+ self.assertEqual(transport.get_returncode(), 6)
+
+ self.assertTrue(protocol.connection_made.called)
+ self.assertTrue(protocol.process_exited.called)
+ self.assertTrue(protocol.connection_lost.called)
+ self.assertEqual(protocol.connection_lost.call_args[0], (None,))
+
+ self.assertFalse(transport._closed)
+ self.assertIsNone(transport._loop)
+ self.assertIsNone(transport._proc)
+ self.assertIsNone(transport._protocol)
+
+ # methods must raise ProcessLookupError if the process exited
+ self.assertRaises(ProcessLookupError,
+ transport.send_signal, signal.SIGTERM)
+ self.assertRaises(ProcessLookupError, transport.terminate)
+ self.assertRaises(ProcessLookupError, transport.kill)
+
+
 class SubprocessMixin:
 
 def test_stdin_stdout(self):
-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list

AltStyle によって変換されたページ (->オリジナル) /