[Python-checkins] cpython (3.4): Issue #23198: Refactor asyncio.StreamReader

victor.stinner python-checkins at python.org
Wed Jan 14 00:54:45 CET 2015


https://hg.python.org/cpython/rev/94a6f9a3580e
changeset: 94131:94a6f9a3580e
branch: 3.4
parent: 94128:432b817611f2
user: Victor Stinner <victor.stinner at gmail.com>
date: Wed Jan 14 00:53:37 2015 +0100
summary:
 Issue #23198: Refactor asyncio.StreamReader
- Add a new _wakeup_waiter() method
- Replace _create_waiter() method with a _wait_for_data() coroutine function
- Use the value None instead of True or False to wake up the waiter
files:
 Lib/asyncio/streams.py | 47 ++++++++++++++---------------
 1 files changed, 22 insertions(+), 25 deletions(-)
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
--- a/Lib/asyncio/streams.py
+++ b/Lib/asyncio/streams.py
@@ -313,8 +313,8 @@
 else:
 self._loop = loop
 self._buffer = bytearray()
- self._eof = False # Whether we're done.
- self._waiter = None # A future.
+ self._eof = False # Whether we're done.
+ self._waiter = None # A future used by _wait_for_data()
 self._exception = None
 self._transport = None
 self._paused = False
@@ -331,6 +331,14 @@
 if not waiter.cancelled():
 waiter.set_exception(exc)
 
+ def _wakeup_waiter(self):
+ """Wakeup read() or readline() function waiting for data or EOF."""
+ waiter = self._waiter
+ if waiter is not None:
+ self._waiter = None
+ if not waiter.cancelled():
+ waiter.set_result(None)
+
 def set_transport(self, transport):
 assert self._transport is None, 'Transport already set'
 self._transport = transport
@@ -342,11 +350,7 @@
 
 def feed_eof(self):
 self._eof = True
- waiter = self._waiter
- if waiter is not None:
- self._waiter = None
- if not waiter.cancelled():
- waiter.set_result(True)
+ self._wakeup_waiter()
 
 def at_eof(self):
 """Return True if the buffer is empty and 'feed_eof' was called."""
@@ -359,12 +363,7 @@
 return
 
 self._buffer.extend(data)
-
- waiter = self._waiter
- if waiter is not None:
- self._waiter = None
- if not waiter.cancelled():
- waiter.set_result(False)
+ self._wakeup_waiter()
 
 if (self._transport is not None and
 not self._paused and
@@ -379,7 +378,8 @@
 else:
 self._paused = True
 
- def _create_waiter(self, func_name):
+ def _wait_for_data(self, func_name):
+ """Wait until feed_data() or feed_eof() is called."""
 # StreamReader uses a future to link the protocol feed_data() method
 # to a read coroutine. Running two read coroutines at the same time
 # would have an unexpected behaviour. It would not possible to know
@@ -387,7 +387,12 @@
 if self._waiter is not None:
 raise RuntimeError('%s() called while another coroutine is '
 'already waiting for incoming data' % func_name)
- return futures.Future(loop=self._loop)
+
+ self._waiter = futures.Future(loop=self._loop)
+ try:
+ yield from self._waiter
+ finally:
+ self._waiter = None
 
 @coroutine
 def readline(self):
@@ -417,11 +422,7 @@
 break
 
 if not_enough:
- self._waiter = self._create_waiter('readline')
- try:
- yield from self._waiter
- finally:
- self._waiter = None
+ yield from self._wait_for_data('readline')
 
 self._maybe_resume_transport()
 return bytes(line)
@@ -448,11 +449,7 @@
 return b''.join(blocks)
 else:
 if not self._buffer and not self._eof:
- self._waiter = self._create_waiter('read')
- try:
- yield from self._waiter
- finally:
- self._waiter = None
+ yield from self._wait_for_data('read')
 
 if n < 0 or len(self._buffer) <= n:
 data = bytes(self._buffer)
-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list

AltStyle によって変換されたページ (->オリジナル) /