diff -r 02b8a061203a Lib/io.py --- a/Lib/io.py Thu Aug 14 17:56:07 2008 +0200 +++ b/Lib/io.py Thu Aug 14 21:20:29 2008 +0200 @@ -63,6 +63,7 @@ import codecs import codecs import _fileio import warnings +import threading # open() uses st_blksize whenever we can DEFAULT_BUFFER_SIZE = 8 * 1024 # bytes @@ -1022,43 +1023,51 @@ class BufferedWriter(_BufferedIOMixin): if max_buffer_size is None else max_buffer_size) self._write_buf = bytearray() + self._write_lock = threading.Lock() def write(self, b): if self.closed: raise ValueError("write to closed file") if isinstance(b, unicode): raise TypeError("can't write unicode to binary stream") - # XXX we can implement some more tricks to try and avoid partial writes - if len(self._write_buf)> self.buffer_size: - # We're full, so let's pre-flush the buffer - try: - self.flush() - except BlockingIOError as e: - # We can't accept anything else. - # XXX Why not just let the exception pass through? - raise BlockingIOError(e.errno, e.strerror, 0) - before = len(self._write_buf) - self._write_buf.extend(b) - written = len(self._write_buf) - before - if len(self._write_buf)> self.buffer_size: - try: - self.flush() - except BlockingIOError as e: - if (len(self._write_buf)> self.max_buffer_size): - # We've hit max_buffer_size. We have to accept a partial - # write and cut back our buffer. - overage = len(self._write_buf) - self.max_buffer_size - self._write_buf = self._write_buf[:self.max_buffer_size] - raise BlockingIOError(e.errno, e.strerror, overage) - return written + with self._write_lock: + # XXX we can implement some more tricks to try and avoid + # partial writes + if len(self._write_buf)> self.buffer_size: + # We're full, so let's pre-flush the buffer + try: + self._flush_unlocked() + except BlockingIOError as e: + # We can't accept anything else. + # XXX Why not just let the exception pass through? + raise BlockingIOError(e.errno, e.strerror, 0) + before = len(self._write_buf) + self._write_buf.extend(b) + written = len(self._write_buf) - before + if len(self._write_buf)> self.buffer_size: + try: + self._flush_unlocked() + except BlockingIOError as e: + if len(self._write_buf)> self.max_buffer_size: + # We've hit max_buffer_size. We have to accept a + # partial write and cut back our buffer. + overage = len(self._write_buf) - self.max_buffer_size + self._write_buf = self._write_buf[:self.max_buffer_size] + raise BlockingIOError(e.errno, e.strerror, overage) + return written def truncate(self, pos=None): - self.flush() - if pos is None: - pos = self.raw.tell() - return self.raw.truncate(pos) + with self._write_lock: + self._flush_unlocked() + if pos is None: + pos = self.raw.tell() + return self.raw.truncate(pos) def flush(self): + with self._write_lock: + self._flush_unlocked() + + def _flush_unlocked(self): if self.closed: raise ValueError("flush of closed file") written = 0 @@ -1077,8 +1086,9 @@ class BufferedWriter(_BufferedIOMixin): return self.raw.tell() + len(self._write_buf) def seek(self, pos, whence=0): - self.flush() - return self.raw.seek(pos, whence) + with self._write_lock: + self._flush_unlocked() + return self.raw.seek(pos, whence) class BufferedRWPair(BufferedIOBase): diff -r 02b8a061203a Lib/test/test_io.py --- a/Lib/test/test_io.py Thu Aug 14 17:56:07 2008 +0200 +++ b/Lib/test/test_io.py Thu Aug 14 21:20:29 2008 +0200 @@ -6,6 +6,7 @@ import sys import sys import time import array +import threading import unittest from itertools import chain from test import test_support @@ -445,6 +446,38 @@ class BufferedWriterTest(unittest.TestCa bufio.flush() self.assertEquals(b"abc", writer._write_stack[0]) + + def testThreads(self): + # BufferedWriter should not raise exceptions or crash + # when called from multiple threads. + try: + # We use a real file object because it allows us to + # exercise situations where the GIL is released before + # writing the buffer to the raw streams. This is in addition + # to concurrency issues due to switching threads in the middle + # of Python code. + with io.open(test_support.TESTFN, "wb", buffering=0) as raw: + bufio = io.BufferedWriter(raw, 8) + errors = [] + def f(): + try: + # Write enough bytes to flush the buffer + s = b"a" * 19 + for i in range(50): + bufio.write(s) + except Exception as e: + errors.append(e) + raise + threads = [threading.Thread(target=f) for x in range(20)] + for t in threads: + t.start() + time.sleep(0.02) # yield + for t in threads: + t.join() + self.assertFalse(errors, + "the following exceptions were caught: %r" % errors) + finally: + test_support.unlink(test_support.TESTFN) class BufferedRWPairTest(unittest.TestCase):

AltStyle によって変換されたページ (->オリジナル) /