[Python-checkins] cpython (3.5): Issue #26909: Fix slow pipes IO in asyncio.

yury.selivanov python-checkins at python.org
Thu Sep 15 16:53:00 EDT 2016


https://hg.python.org/cpython/rev/e6adc2448c3d
changeset: 103839:e6adc2448c3d
branch: 3.5
parent: 103836:4ab64ea31d75
user: Yury Selivanov <yury at magic.io>
date: Thu Sep 15 16:51:48 2016 -0400
summary:
 Issue #26909: Fix slow pipes IO in asyncio.
Patch by INADA Naoki.
files:
 Lib/asyncio/unix_events.py | 27 ++---
 Lib/test/test_asyncio/test_unix_events.py | 49 ++++------
 Misc/NEWS | 3 +
 3 files changed, 37 insertions(+), 42 deletions(-)
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -434,7 +434,7 @@
 self._pipe = pipe
 self._fileno = pipe.fileno()
 self._protocol = protocol
- self._buffer = []
+ self._buffer = bytearray()
 self._conn_lost = 0
 self._closing = False # Set when close() or write_eof() called.
 
@@ -450,7 +450,6 @@
 "pipes, sockets and character devices")
 
 _set_nonblocking(self._fileno)
-
 self._loop.call_soon(self._protocol.connection_made, self)
 
 # On AIX, the reader trick (to be notified when the read end of the
@@ -492,7 +491,7 @@
 return '<%s>' % ' '.join(info)
 
 def get_write_buffer_size(self):
- return sum(len(data) for data in self._buffer)
+ return len(self._buffer)
 
 def _read_ready(self):
 # Pipe was closed by peer.
@@ -530,39 +529,37 @@
 if n == len(data):
 return
 elif n > 0:
- data = data[n:]
+ data = memoryview(data)[n:]
 self._loop.add_writer(self._fileno, self._write_ready)
 
- self._buffer.append(data)
+ self._buffer += data
 self._maybe_pause_protocol()
 
 def _write_ready(self):
- data = b''.join(self._buffer)
- assert data, 'Data should not be empty'
+ assert self._buffer, 'Data should not be empty'
 
- self._buffer.clear()
 try:
- n = os.write(self._fileno, data)
+ n = os.write(self._fileno, self._buffer)
 except (BlockingIOError, InterruptedError):
- self._buffer.append(data)
+ pass
 except Exception as exc:
+ self._buffer.clear()
 self._conn_lost += 1
 # Remove writer here, _fatal_error() doesn't it
 # because _buffer is empty.
 self._loop.remove_writer(self._fileno)
 self._fatal_error(exc, 'Fatal write error on pipe transport')
 else:
- if n == len(data):
+ if n == len(self._buffer):
+ self._buffer.clear()
 self._loop.remove_writer(self._fileno)
 self._maybe_resume_protocol() # May append to buffer.
- if not self._buffer and self._closing:
+ if self._closing:
 self._loop.remove_reader(self._fileno)
 self._call_connection_lost(None)
 return
 elif n > 0:
- data = data[n:]
-
- self._buffer.append(data) # Try again later.
+ del self._buffer[:n]
 
 def can_write_eof(self):
 return True
diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py
--- a/Lib/test/test_asyncio/test_unix_events.py
+++ b/Lib/test/test_asyncio/test_unix_events.py
@@ -518,7 +518,7 @@
 tr.write(b'data')
 m_write.assert_called_with(5, b'data')
 self.assertFalse(self.loop.writers)
- self.assertEqual([], tr._buffer)
+ self.assertEqual(bytearray(), tr._buffer)
 
 @mock.patch('os.write')
 def test_write_no_data(self, m_write):
@@ -526,35 +526,34 @@
 tr.write(b'')
 self.assertFalse(m_write.called)
 self.assertFalse(self.loop.writers)
- self.assertEqual([], tr._buffer)
+ self.assertEqual(bytearray(b''), tr._buffer)
 
 @mock.patch('os.write')
 def test_write_partial(self, m_write):
 tr = self.write_pipe_transport()
 m_write.return_value = 2
 tr.write(b'data')
- m_write.assert_called_with(5, b'data')
 self.loop.assert_writer(5, tr._write_ready)
- self.assertEqual([b'ta'], tr._buffer)
+ self.assertEqual(bytearray(b'ta'), tr._buffer)
 
 @mock.patch('os.write')
 def test_write_buffer(self, m_write):
 tr = self.write_pipe_transport()
 self.loop.add_writer(5, tr._write_ready)
- tr._buffer = [b'previous']
+ tr._buffer = bytearray(b'previous')
 tr.write(b'data')
 self.assertFalse(m_write.called)
 self.loop.assert_writer(5, tr._write_ready)
- self.assertEqual([b'previous', b'data'], tr._buffer)
+ self.assertEqual(bytearray(b'previousdata'), tr._buffer)
 
 @mock.patch('os.write')
 def test_write_again(self, m_write):
 tr = self.write_pipe_transport()
 m_write.side_effect = BlockingIOError()
 tr.write(b'data')
- m_write.assert_called_with(5, b'data')
+ m_write.assert_called_with(5, bytearray(b'data'))
 self.loop.assert_writer(5, tr._write_ready)
- self.assertEqual([b'data'], tr._buffer)
+ self.assertEqual(bytearray(b'data'), tr._buffer)
 
 @mock.patch('asyncio.unix_events.logger')
 @mock.patch('os.write')
@@ -566,7 +565,7 @@
 tr.write(b'data')
 m_write.assert_called_with(5, b'data')
 self.assertFalse(self.loop.writers)
- self.assertEqual([], tr._buffer)
+ self.assertEqual(bytearray(), tr._buffer)
 tr._fatal_error.assert_called_with(
 err,
 'Fatal write error on pipe transport')
@@ -606,58 +605,55 @@
 def test__write_ready(self, m_write):
 tr = self.write_pipe_transport()
 self.loop.add_writer(5, tr._write_ready)
- tr._buffer = [b'da', b'ta']
+ tr._buffer = bytearray(b'data')
 m_write.return_value = 4
 tr._write_ready()
- m_write.assert_called_with(5, b'data')
 self.assertFalse(self.loop.writers)
- self.assertEqual([], tr._buffer)
+ self.assertEqual(bytearray(), tr._buffer)
 
 @mock.patch('os.write')
 def test__write_ready_partial(self, m_write):
 tr = self.write_pipe_transport()
 self.loop.add_writer(5, tr._write_ready)
- tr._buffer = [b'da', b'ta']
+ tr._buffer = bytearray(b'data')
 m_write.return_value = 3
 tr._write_ready()
- m_write.assert_called_with(5, b'data')
 self.loop.assert_writer(5, tr._write_ready)
- self.assertEqual([b'a'], tr._buffer)
+ self.assertEqual(bytearray(b'a'), tr._buffer)
 
 @mock.patch('os.write')
 def test__write_ready_again(self, m_write):
 tr = self.write_pipe_transport()
 self.loop.add_writer(5, tr._write_ready)
- tr._buffer = [b'da', b'ta']
+ tr._buffer = bytearray(b'data')
 m_write.side_effect = BlockingIOError()
 tr._write_ready()
- m_write.assert_called_with(5, b'data')
+ m_write.assert_called_with(5, bytearray(b'data'))
 self.loop.assert_writer(5, tr._write_ready)
- self.assertEqual([b'data'], tr._buffer)
+ self.assertEqual(bytearray(b'data'), tr._buffer)
 
 @mock.patch('os.write')
 def test__write_ready_empty(self, m_write):
 tr = self.write_pipe_transport()
 self.loop.add_writer(5, tr._write_ready)
- tr._buffer = [b'da', b'ta']
+ tr._buffer = bytearray(b'data')
 m_write.return_value = 0
 tr._write_ready()
- m_write.assert_called_with(5, b'data')
+ m_write.assert_called_with(5, bytearray(b'data'))
 self.loop.assert_writer(5, tr._write_ready)
- self.assertEqual([b'data'], tr._buffer)
+ self.assertEqual(bytearray(b'data'), tr._buffer)
 
 @mock.patch('asyncio.log.logger.error')
 @mock.patch('os.write')
 def test__write_ready_err(self, m_write, m_logexc):
 tr = self.write_pipe_transport()
 self.loop.add_writer(5, tr._write_ready)
- tr._buffer = [b'da', b'ta']
+ tr._buffer = bytearray(b'data')
 m_write.side_effect = err = OSError()
 tr._write_ready()
- m_write.assert_called_with(5, b'data')
 self.assertFalse(self.loop.writers)
 self.assertFalse(self.loop.readers)
- self.assertEqual([], tr._buffer)
+ self.assertEqual(bytearray(), tr._buffer)
 self.assertTrue(tr.is_closing())
 m_logexc.assert_called_with(
 test_utils.MockPattern(
@@ -673,13 +669,12 @@
 tr = self.write_pipe_transport()
 self.loop.add_writer(5, tr._write_ready)
 tr._closing = True
- tr._buffer = [b'da', b'ta']
+ tr._buffer = bytearray(b'data')
 m_write.return_value = 4
 tr._write_ready()
- m_write.assert_called_with(5, b'data')
 self.assertFalse(self.loop.writers)
 self.assertFalse(self.loop.readers)
- self.assertEqual([], tr._buffer)
+ self.assertEqual(bytearray(), tr._buffer)
 self.protocol.connection_lost.assert_called_with(None)
 self.pipe.close.assert_called_with()
 
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -272,6 +272,9 @@
 - Issue #26654: Inspect functools.partial in asyncio.Handle.__repr__.
 Patch by iceboy.
 
+- Issue #26909: Fix slow pipes IO in asyncio.
+ Patch by INADA Naoki.
+
 IDLE
 ----
 
-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list

AltStyle によって変換されたページ (->オリジナル) /