[Python-checkins] cpython (3.4): Fix StreamReader.__repr__

andrew.svetlov python-checkins at python.org
Tue Sep 29 18:24:45 CEST 2015


https://hg.python.org/cpython/rev/64905df6d6b6
changeset: 98396:64905df6d6b6
branch: 3.4
parent: 98393:9eae18e8af66
user: Andrew Svetlov <andrew.svetlov at gmail.com>
date: Tue Sep 29 18:36:00 2015 +0300
summary:
 Fix StreamReader.__repr__
files:
 Lib/asyncio/streams.py | 2 +-
 Lib/test/test_asyncio/test_streams.py | 42 +++++++++++++++
 2 files changed, 43 insertions(+), 1 deletions(-)
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
--- a/Lib/asyncio/streams.py
+++ b/Lib/asyncio/streams.py
@@ -324,7 +324,7 @@
 def __repr__(self):
 info = ['StreamReader']
 if self._buffer:
- info.append('%d bytes' % len(info))
+ info.append('%d bytes' % len(self._buffer))
 if self._eof:
 info.append('eof')
 if self._limit != _DEFAULT_LIMIT:
diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py
--- a/Lib/test/test_asyncio/test_streams.py
+++ b/Lib/test/test_asyncio/test_streams.py
@@ -632,6 +632,48 @@
 protocol = asyncio.StreamReaderProtocol(reader)
 self.assertIs(protocol._loop, self.loop)
 
+ def test___repr__(self):
+ stream = asyncio.StreamReader(loop=self.loop)
+ self.assertEqual("<StreamReader>", repr(stream))
+
+ def test___repr__nondefault_limit(self):
+ stream = asyncio.StreamReader(loop=self.loop, limit=123)
+ self.assertEqual("<StreamReader l=123>", repr(stream))
+
+ def test___repr__eof(self):
+ stream = asyncio.StreamReader(loop=self.loop)
+ stream.feed_eof()
+ self.assertEqual("<StreamReader eof>", repr(stream))
+
+ def test___repr__data(self):
+ stream = asyncio.StreamReader(loop=self.loop)
+ stream.feed_data(b'data')
+ self.assertEqual("<StreamReader 4 bytes>", repr(stream))
+
+ def test___repr__exception(self):
+ stream = asyncio.StreamReader(loop=self.loop)
+ exc = RuntimeError()
+ stream.set_exception(exc)
+ self.assertEqual("<StreamReader e=RuntimeError()>", repr(stream))
+
+ def test___repr__waiter(self):
+ stream = asyncio.StreamReader(loop=self.loop)
+ stream._waiter = asyncio.Future(loop=self.loop)
+ self.assertRegex(
+ repr(stream),
+ "<StreamReader w=<Future pending[\S ]*>>")
+ stream._waiter.set_result(None)
+ self.loop.run_until_complete(stream._waiter)
+ stream._waiter = None
+ self.assertEqual("<StreamReader>", repr(stream))
+
+ def test___repr__transport(self):
+ stream = asyncio.StreamReader(loop=self.loop)
+ stream._transport = mock.Mock()
+ stream._transport.__repr__ = mock.Mock()
+ stream._transport.__repr__.return_value = "<Transport>"
+ self.assertEqual("<StreamReader t=<Transport>>", repr(stream))
+
 
 if __name__ == '__main__':
 unittest.main()
-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list

AltStyle によって変換されたページ (->オリジナル) /