[Python-checkins] python/nondist/sandbox/sio sio.py,NONE,1.1 test_sio.py,NONE,1.1

gvanrossum@users.sourceforge.net gvanrossum@users.sourceforge.net
2003年4月09日 14:17:44 -0700


Update of /cvsroot/python/python/nondist/sandbox/sio
In directory sc8-pr-cvs1:/tmp/cvs-serv14889
Added Files:
	sio.py test_sio.py 
Log Message:
Experimental new standard I/O library.
--- NEW FILE: sio.py ---
"""New standard I/O library.
This code is still very young and experimental!
There are fairly complete unit tests in test_sio.py.
The design is simple:
- A raw stream supports read(n), write(s), seek(offset, whence=0) and
 tell(). This is generally unbuffered. Raw streams may support
 Unicode.
- A basis stream provides the raw stream API and builds on a much more
 low-level API, e.g. the os, mmap or socket modules.
- A filtering stream is raw stream built on top of another raw stream.
 There are filtering streams for universal newline translation and
 for unicode translation.
- A buffering stream supports the full classic Python I/O API:
 read(n=-1), readline(), readlines(sizehint=0), tell(), seek(offset,
 whence=0), write(s), writelines(lst), as well as __iter__() and
 next(). (There's also readall() but that's a synonym for read()
 without arguments.) This is a superset of the raw stream API. I
 haven't thought about fileno() and isatty() yet. We really need
 only one buffering stream implementation, which is a filtering
 stream.
You typically take a basis stream, place zero or more filtering
streams on top of it, and then top it off with a buffering stream.
"""
import os
import mmap
class BufferingInputStream(object):
 """Standard buffering input stream.
 This is typically the top of the stack.
 """
 bigsize = 2**19 # Half a Meg
 bufsize = 2**13 # 8 K
 def __init__(self, base, bufsize=None):
 self.do_read = getattr(base, "read", None)
 # function to fill buffer some more
 self.do_tell = getattr(base, "tell", None)
 # None, or return a byte offset 
 self.do_seek = getattr(base, "seek", None)
 # None, or seek to abyte offset
 if bufsize is None: # Get default from the class
 bufsize = self.bufsize
 self.bufsize = bufsize # buffer size (hint only)
 self.lines = [] # ready-made lines (sans "\n")
 self.buf = "" # raw data (may contain "\n")
 # Invariant: readahead == "\n".join(self.lines + [self.buf])
 # self.lines contains no "\n"
 # self.buf may contain "\n"
 def tell(self):
 bytes = self.do_tell() # This may fail
 offset = len(self.buf)
 for line in self.lines:
 offset += len(line) + 1
 assert bytes >= offset, (locals(), self.__dict__)
 return bytes - offset
 def seek(self, offset, whence=0):
 # This may fail on the do_seek() or do_tell() call.
 # But it won't call either on a relative forward seek.
 # Nor on a seek to the very end.
 if whence == 0 or (whence == 2 and self.do_seek is not None):
 self.do_seek(offset, whence)
 self.lines = []
 self.buf = ""
 return
 if whence == 2:
 # Skip relative to EOF by reading and saving only just as
 # much as needed
 assert self.do_seek is None
 data = "\n".join(self.lines + [self.buf])
 total = len(data)
 buffers = [data]
 self.lines = []
 self.buf = ""
 while 1:
 data = self.do_read(self.bufsize)
 if not data:
 break
 buffers.append(data)
 total += len(data)
 while buffers and total >= len(buffers[0]) - offset:
 total -= len(buffers[0])
 del buffers[0]
 cutoff = total + offset
 if cutoff < 0:
 raise TypeError, "cannot seek back"
 if buffers:
 buffers[0] = buffers[0][cutoff:]
 self.buf = "".join(buffers)
 self.lines = []
 return
 if whence == 1:
 if offset < 0:
 self.do_seek(self.tell() + offset, 0)
 self.lines = []
 self.buf = ""
 return
 while self.lines:
 line = self.lines[0]
 if offset <= len(line):
 self.lines[0] = line[offset:]
 return
 offset -= len(self.lines[0]) - 1
 del self.lines[0]
 assert not self.lines
 if offset <= len(self.buf):
 self.buf = self.buf[offset:]
 return
 offset -= len(self.buf)
 self.buf = ""
 if self.do_seek is None:
 self.read(offset)
 else:
 self.do_seek(offset, 1)
 return
 raise ValueError, "whence should be 0, 1 or 2"
 def readall(self):
 self.lines.append(self.buf)
 more = ["\n".join(self.lines)]
 self.lines = []
 self.buf = ""
 bufsize = self.bufsize
 while 1:
 data = self.do_read(bufsize)
 if not data:
 break
 more.append(data)
 bufsize = max(bufsize*2, self.bigsize)
 return "".join(more)
 def read(self, n=-1):
 if n < 0:
 return self.readall()
 if self.lines:
 # See if this can be satisfied from self.lines[0]
 line = self.lines[0]
 if len(line) >= n:
 self.lines[0] = line[n:]
 return line[:n]
 # See if this can be satisfied *without exhausting* self.lines
 k = 0
 i = 0
 for line in self.lines:
 k += len(line)
 if k >= n:
 lines = self.lines[:i]
 data = self.lines[i]
 cutoff = len(data) - (k-n)
 lines.append(data[:cutoff])
 self.lines[:i+1] = [data[cutoff:]]
 return "\n".join(lines)
 k += 1
 i += 1
 # See if this can be satisfied from self.lines plus self.buf
 if k + len(self.buf) >= n:
 lines = self.lines
 self.lines = []
 cutoff = n - k
 lines.append(self.buf[:cutoff])
 self.buf = self.buf[cutoff:]
 return "\n".join(lines)
 else:
 # See if this can be satisfied from self.buf
 data = self.buf
 k = len(data)
 if k >= n:
 cutoff = len(data) - (k-n)
 self.buf = data[cutoff:]
 return data[:cutoff]
 lines = self.lines
 self.lines = []
 lines.append(self.buf)
 self.buf = ""
 data = "\n".join(lines)
 more = [data]
 k = len(data)
 while k < n:
 data = self.do_read(max(self.bufsize, n-k))
 k += len(data)
 more.append(data)
 if not data:
 break
 cutoff = len(data) - (k-n)
 self.buf = data[cutoff:]
 more[-1] = data[:cutoff]
 return "".join(more)
 def __iter__(self):
 return self
 def next(self):
 if self.lines:
 return self.lines.pop(0) + "\n"
 # This block is needed because read() can leave self.buf
 # containing newlines
 self.lines = self.buf.split("\n")
 self.buf = self.lines.pop()
 if self.lines:
 return self.lines.pop(0) + "\n"
 buf = self.buf and [self.buf] or []
 while 1:
 self.buf = self.do_read(self.bufsize)
 self.lines = self.buf.split("\n")
 self.buf = self.lines.pop()
 if self.lines:
 buf.append(self.lines.pop(0))
 buf.append("\n")
 break
 if not self.buf:
 break
 buf.append(self.buf)
 line = "".join(buf)
 if not line:
 raise StopIteration
 return line
 def readline(self):
 try:
 return self.next()
 except StopIteration:
 return ""
 def readlines(self, sizehint=0):
 return list(self)
class CRLFFilter(object):
 """Filtering stream for universal newlines.
 TextInputFilter is more general, but this is faster when you don't
 need tell/seek.
 """
 def __init__(self, base):
 self.do_read = base.read
 self.atcr = False
 def read(self, n):
 data = self.do_read(n)
 if self.atcr:
 if data.startswith("\n"):
 data = data[1:] # Very rare case: in the middle of "\r\n"
 self.atcr = False
 if "\r" in data:
 self.atcr = data.endswith("\r") # Test this before removing \r
 data = data.replace("\r\n", "\n") # Catch \r\n this first
 data = data.replace("\r", "\n") # Remaining \r are standalone
 return data
class MMapFile(object):
 """Standard I/O basis stream using mmap."""
 def __init__(self, filename, mode="r"):
 self.filename = filename
 self.mode = mode
 if mode == "r":
 flag = os.O_RDONLY
 self.access = mmap.ACCESS_READ
 else:
 if mode == "w":
 flag = os.O_RDWR | os.O_CREAT
 elif mode == "a":
 flag = os.O_RDWR
 else:
 raise ValueError, "mode should be 'r', 'w' or 'a'"
 self.access = mmap.ACCESS_WRITE
 if hasattr(os, "O_BINARY"):
 flag |= os.O_BINARY
 self.fd = os.open(filename, flag)
 self.mm = mmap.mmap(self.fd, 0, access=self.access)
 self.pos = 0
 def __del__(self):
 self.close()
 def close(self):
 if self.mm is not None:
 self.mm.close()
 self.mm = None
 if self.fd is not None:
 os.close(self.fd)
 self.fd = None
 def tell(self):
 return self.pos
 def seek(self, offset, whence=0):
 if whence == 0:
 self.pos = max(0, offset)
 elif whence == 1:
 self.pos = max(0, self.pos + offset)
 elif whence == 2:
 self.pos = max(0, self.mm.size() + offset)
 else:
 raise ValueError, "seek(): whence must be 0, 1 or 2"
 def readall(self):
 return self.read()
 def read(self, n=-1):
 if n >= 0:
 aim = self.pos + n
 else:
 aim = self.mm.size() # Actual file size, may be more than mapped
 n = aim - self.pos
 data = self.mm[self.pos:aim]
 if len(data) < n:
 del data
 # File grew since opened; remap to get the new data
 self.mm = mmap.mmap(self.fd, 0, access=self.access)
 data = self.mm[self.pos:aim]
 self.pos += len(data)
 return data
 def __iter__(self):
 return self
 def readline(self):
 hit = self.mm.find("\n", self.pos) + 1
 if hit:
 data = self.mm[self.pos:hit]
 self.pos = hit
 return data
 # Remap the file just in case
 self.mm = mmap.mmap(self.fd, 0, access=self.access)
 hit = self.mm.find("\n", self.pos) + 1
 if hit:
 # Got a whole line after remapping
 data = self.mm[self.pos:hit]
 self.pos = hit
 return data
 # Read whatever we've got -- may be empty
 data = self.mm[self.pos:self.mm.size()]
 self.pos += len(data)
 return data
 def next(self):
 hit = self.mm.find("\n", self.pos) + 1
 if hit:
 data = self.mm[self.pos:hit]
 self.pos = hit
 return data
 # Remap the file just in case
 self.mm = mmap.mmap(self.fd, 0, access=self.access)
 hit = self.mm.find("\n", self.pos) + 1
 if hit:
 # Got a whole line after remapping
 data = self.mm[self.pos:hit]
 self.pos = hit
 return data
 # Read whatever we've got -- may be empty
 data = self.mm[self.pos:self.mm.size()]
 if not data:
 raise StopIteration
 self.pos += len(data)
 return data
 def readlines(self, sizehint=0):
 return list(iter(self.readline, ""))
 def write(self, data):
 end = self.pos + len(data)
 try:
 self.mm[self.pos:end] = data
 except IndexError:
 self.mm.resize(end)
 self.mm[self.pos:end] = data
 self.pos = end
 def writelines(self, lines):
 filter(self.write, lines)
class DiskFile(object):
 """Standard I/O basis stream using os.open/close/read/write/lseek"""
 def __init__(self, filename, mode="r"):
 self.filename = filename
 self.mode = mode
 if mode == "r":
 flag = os.O_RDONLY
 elif mode == "w":
 flag = os.O_RDWR | os.O_CREAT
 elif mode == "a":
 flag = os.O_RDWR
 else:
 raise ValueError, "mode should be 'r', 'w' or 'a'"
 if hasattr(os, "O_BINARY"):
 flag |= os.O_BINARY
 self.fd = os.open(filename, flag)
 def seek(self, offset, whence=0):
 os.lseek(self.fd, offset, whence)
 def tell(self):
 return os.lseek(self.fd, 0, 1)
 def read(self, n):
 return os.read(self.fd, n)
 def write(self, data):
 while data:
 n = os.write(self.fd, data)
 data = data[n:]
 def close(self):
 fd = self.fd
 if fd is not None:
 self.fd = None
 os.close(fd)
 def __del__(self):
 try:
 self.close()
 except:
 pass
class TextInputFilter(object):
 """Filtering input stream for universal newline translation."""
 def __init__(self, base):
 self.base = base # must implement read, may implement tell, seek
 self.atcr = False # Set when last char read was \r
 self.buf = "" # Optional one-character read-ahead buffer
 def read(self, n):
 """Read up to n bytes."""
 if n <= 0:
 return ""
 if self.buf:
 assert not self.atcr
 data = self.buf
 self.buf = ""
 return data
 data = self.base.read(n)
 if self.atcr:
 if data.startswith("\n"):
 data = data[1:]
 if not data:
 data = self.base.read(n)
 self.atcr = False
 if "\r" in data:
 self.atcr = data.endswith("\r")
 data = data.replace("\r\n", "\n").replace("\r", "\n")
 return data
 def seek(self, offset, whence=0):
 self.base.seek(offset, whence)
 self.atcr = False
 self.buf = ""
 def tell(self):
 pos = self.base.tell()
 if self.atcr:
 # Must read the next byte to see if it's \n,
 # because then we must report the next position.
 assert not self.buf 
 self.buf = self.base.read(1)
 pos += 1
 self.atcr = False
 if self.buf == "\n":
 self.buf = ""
 return pos - len(self.buf)
class TextOutputFilter(object):
 """Filtering output stream for universal newline translation."""
 def __init__(self, base, linesep=os.linesep):
 assert linesep in ["\n", "\r\n", "\r"]
 self.base = base # must implement write, may implement seek, tell
 self.linesep = linesep
 def write(self, data):
 if self.linesep is not "\n" and "\n" in data:
 data = data.replace("\n", self.linesep)
 self.base.write(data)
 def seek(self, offset, whence=0):
 self.base.seek(offset, whence)
 def tell(self):
 return self.base.tell()
class DecodingInputFilter(object):
 """Filtering input stream that decodes an encoded file."""
 def __init__(self, base, encoding="utf8", errors="strict"):
 self.base = base
 self.encoding = encoding
 self.errors = errors
 self.tell = base.tell
 self.seek = base.seek
 def read(self, n):
 """Read *approximately* n bytes, then decode them.
 Under extreme circumstances,
 the return length could be longer than n!
 Always return a unicode string.
 This does *not* translate newlines;
 you can stack TextInputFilter.
 """
 data = self.base.read(n)
 try:
 return data.decode(self.encoding, self.errors)
 except ValueError:
 # XXX Sigh. decode() doesn't handle incomplete strings well.
 # Use the retry strategy from codecs.StreamReader.
 for i in range(9):
 more = self.base.read(1)
 if not more:
 raise
 data += more
 try:
 return data.decode(self.encoding, self.errors)
 except ValueError:
 pass
 raise
class EncodingOutputFilter(object):
 """Filtering output stream that writes to an encoded file."""
 def __init__(self, base, encoding="utf8", errors="strict"):
 self.base = base
 self.encoding = encoding
 self.errors = errors
 self.tell = base.tell
 self.seek = base.seek
 def write(self, chars):
 if isinstance(chars, str):
 chars = unicode(chars) # Fail if it's not ASCII
 self.base.write(chars.encode(self.encoding, self.errors))
--- NEW FILE: test_sio.py ---
"""Unit tests for sio (new standard I/O)."""
import time
import tempfile
import unittest
from test import test_support
import sio
class TestSource(object):
 def __init__(self, packets):
 for x in packets:
 assert x
 self.orig_packets = list(packets)
 self.packets = list(packets)
 self.pos = 0
 def tell(self):
 return self.pos
 def seek(self, offset, whence=0):
 if whence == 1:
 offset += self.pos
 elif whence == 2:
 for packet in self.orig_packets:
 offset += len(packet)
 else:
 assert whence == 0
 self.packets = list(self.orig_packets)
 self.pos = 0
 while self.pos < offset:
 data = self.read(offset - self.pos)
 if not data:
 break
 assert self.pos == offset
 def read(self, n):
 try:
 data = self.packets.pop(0)
 except IndexError:
 return ""
 if len(data) > n:
 data, rest = data[:n], data[n:]
 self.packets.insert(0, rest)
 self.pos += len(data)
 return data
class TestReader(object):
 def __init__(self, packets):
 for x in packets:
 assert x
 self.orig_packets = list(packets)
 self.packets = list(packets)
 self.pos = 0
 def tell(self):
 return self.pos
 def seek(self, offset, whence=0):
 if whence == 1:
 offset += self.pos
 elif whence == 2:
 for packet in self.orig_packets:
 offset += len(packet)
 else:
 assert whence == 0
 self.packets = list(self.orig_packets)
 self.pos = 0
 while self.pos < offset:
 data = self.read(offset - self.pos)
 if not data:
 break
 assert self.pos == offset
 def read(self, n):
 try:
 data = self.packets.pop(0)
 except IndexError:
 return ""
 if len(data) > n:
 data, rest = data[:n], data[n:]
 self.packets.insert(0, rest)
 self.pos += len(data)
 return data
class TestWriter(object):
 def __init__(self):
 self.buf = ""
 self.pos = 0
 def write(self, data):
 if self.pos >= len(self.buf):
 self.buf += "0円" * (self.pos - len(self.buf)) + data
 self.pos = len(self.buf)
 else:
 self.buf = (self.buf[:self.pos] + data +
 self.buf[self.pos + len(data):])
 self.pos += len(data)
 def tell(self):
 return self.pos
 def seek(self, offset, whence=0):
 if whence == 0:
 pass
 elif whence == 1:
 offset += self.pos
 elif whence == 2:
 offset += len(self.buf)
 else:
 raise ValueError, "whence should be 0, 1 or 2"
 if offset < 0:
 offset = 0
 self.pos = offset
class BufferingInputStreamTests(unittest.TestCase):
 packets = ["a", "b", "\n", "def", "\nxy\npq\nuv", "wx"]
 lines = ["ab\n", "def\n", "xy\n", "pq\n", "uvwx"]
 def setUp(self):
 pass
 def makeStream(self, tell=False, seek=False, bufsize=None):
 base = TestSource(self.packets)
 if not tell:
 base.tell = None
 if not seek:
 base.seek = None
 return sio.BufferingInputStream(base, bufsize)
 def test_readline(self):
 file = self.makeStream()
 self.assertEqual(list(iter(file.readline, "")), self.lines)
 def test_readlines(self):
 # This also tests next() and __iter__()
 file = self.makeStream()
 self.assertEqual(file.readlines(), self.lines)
 def test_readlines_small_bufsize(self):
 file = self.makeStream(bufsize=1)
 self.assertEqual(list(file), self.lines)
 def test_readall(self):
 file = self.makeStream()
 self.assertEqual(file.readall(), "".join(self.lines))
 def test_readall_small_bufsize(self):
 file = self.makeStream(bufsize=1)
 self.assertEqual(file.readall(), "".join(self.lines))
 def test_readall_after_readline(self):
 file = self.makeStream()
 self.assertEqual(file.readline(), self.lines[0])
 self.assertEqual(file.readline(), self.lines[1])
 self.assertEqual(file.readall(), "".join(self.lines[2:]))
 def test_read_1_after_readline(self):
 file = self.makeStream()
 self.assertEqual(file.readline(), "ab\n")
 self.assertEqual(file.readline(), "def\n")
 blocks = []
 while 1:
 block = file.read(1)
 if not block:
 break
 blocks.append(block)
 self.assertEqual(file.read(0), "")
 self.assertEqual(blocks, list("".join(self.lines)[7:]))
 def test_read_1(self):
 file = self.makeStream()
 blocks = []
 while 1:
 block = file.read(1)
 if not block:
 break
 blocks.append(block)
 self.assertEqual(file.read(0), "")
 self.assertEqual(blocks, list("".join(self.lines)))
 def test_read_2(self):
 file = self.makeStream()
 blocks = []
 while 1:
 block = file.read(2)
 if not block:
 break
 blocks.append(block)
 self.assertEqual(file.read(0), "")
 self.assertEqual(blocks, ["ab", "\nd", "ef", "\nx", "y\n", "pq",
 "\nu", "vw", "x"])
 def test_read_4(self):
 file = self.makeStream()
 blocks = []
 while 1:
 block = file.read(4)
 if not block:
 break
 blocks.append(block)
 self.assertEqual(file.read(0), "")
 self.assertEqual(blocks, ["ab\nd", "ef\nx", "y\npq", "\nuvw", "x"])
 
 def test_read_4_after_readline(self):
 file = self.makeStream()
 self.assertEqual(file.readline(), "ab\n")
 self.assertEqual(file.readline(), "def\n")
 blocks = [file.read(4)]
 while 1:
 block = file.read(4)
 if not block:
 break
 blocks.append(block)
 self.assertEqual(file.read(0), "")
 self.assertEqual(blocks, ["xy\np", "q\nuv", "wx"])
 def test_read_4_small_bufsize(self):
 file = self.makeStream(bufsize=1)
 blocks = []
 while 1:
 block = file.read(4)
 if not block:
 break
 blocks.append(block)
 self.assertEqual(blocks, ["ab\nd", "ef\nx", "y\npq", "\nuvw", "x"])
 def test_tell_1(self):
 file = self.makeStream(tell=True)
 pos = 0
 while 1:
 self.assertEqual(file.tell(), pos)
 n = len(file.read(1))
 if not n:
 break
 pos += n
 def test_tell_1_after_readline(self):
 file = self.makeStream(tell=True)
 pos = 0
 pos += len(file.readline())
 self.assertEqual(file.tell(), pos)
 pos += len(file.readline())
 self.assertEqual(file.tell(), pos)
 while 1:
 self.assertEqual(file.tell(), pos)
 n = len(file.read(1))
 if not n:
 break
 pos += n
 def test_tell_2(self):
 file = self.makeStream(tell=True)
 pos = 0
 while 1:
 self.assertEqual(file.tell(), pos)
 n = len(file.read(2))
 if not n:
 break
 pos += n
 def test_tell_4(self):
 file = self.makeStream(tell=True)
 pos = 0
 while 1:
 self.assertEqual(file.tell(), pos)
 n = len(file.read(4))
 if not n:
 break
 pos += n
 def test_tell_readline(self):
 file = self.makeStream(tell=True)
 pos = 0
 while 1:
 self.assertEqual(file.tell(), pos)
 n = len(file.readline())
 if not n:
 break
 pos += n
 def test_seek(self):
 file = self.makeStream(tell=True, seek=True)
 all = file.readall()
 end = len(all)
 for readto in range(0, end+1):
 for seekto in range(0, end+1):
 for whence in 0, 1, 2:
 file.seek(0)
 self.assertEqual(file.tell(), 0)
 head = file.read(readto)
 self.assertEqual(head, all[:readto])
 if whence == 1:
 offset = seekto - readto
 elif whence == 2:
 offset = seekto - end
 else:
 offset = seekto
 file.seek(offset, whence)
 here = file.tell()
 self.assertEqual(here, seekto)
 rest = file.readall()
 self.assertEqual(rest, all[seekto:])
 def test_seek_noseek(self):
 file = self.makeStream()
 all = file.readall()
 end = len(all)
 for readto in range(0, end+1):
 for seekto in range(readto, end+1):
 for whence in 1, 2:
 file = self.makeStream()
 head = file.read(readto)
 self.assertEqual(head, all[:readto])
 if whence == 1:
 offset = seekto - readto
 elif whence == 2:
 offset = seekto - end
 file.seek(offset, whence)
 rest = file.readall()
 self.assertEqual(rest, all[seekto:])
class CRLFFilterTests(unittest.TestCase):
 def test_filter(self):
 packets = ["abc\ndef\rghi\r\nxyz\r", "123\r", "\n456"]
 expected = ["abc\ndef\nghi\nxyz\n", "123\n", "456"]
 crlf = sio.CRLFFilter(TestSource(packets))
 blocks = []
 while 1:
 block = crlf.read(100)
 if not block:
 break
 blocks.append(block)
 self.assertEqual(blocks, expected)
class MMapFileTests(BufferingInputStreamTests):
 tfn = None
 def tearDown(self):
 tfn = self.tfn
 if tfn:
 self.tfn = None
 try:
 os.remove(self.tfn)
 except os.error, msg:
 print "can't remove %s: %s" % (tfn, msg)
 def makeStream(self, tell=None, seek=None, bufsize=None, mode="r"):
 self.tfn = tempfile.mktemp()
 f = open(tfn, "wb")
 f.writelines(self.packets)
 f.close()
 return sio.MMapFile(self.tfn, mode)
 def test_write(self):
 file = self.makeStream(mode="w")
 file.write("BooHoo\n")
 file.write("Barf\n")
 file.writelines(["a\n", "b\n", "c\n"])
 self.assertEqual(file.tell(), len("BooHoo\nBarf\na\nb\nc\n"))
 file.seek(0)
 self.assertEqual(file.read(), "BooHoo\nBarf\na\nb\nc\n")
 file.seek(0)
 self.assertEqual(file.readlines(),
 ["BooHoo\n", "Barf\n", "a\n", "b\n", "c\n"])
 self.assertEqual(file.tell(), len("BooHoo\nBarf\na\nb\nc\n"))
class TextInputFilterTests(unittest.TestCase):
 packets = [
 "foo\r",
 "bar\r",
 "\nfoo\r\n",
 "abc\ndef\rghi\r\nxyz",
 "\nuvw\npqr\r",
 "\n",
 "abc\n",
 ]
 expected = [
 ("foo\n", 4),
 ("bar\n", 9),
 ("foo\n", 14),
 ("abc\ndef\nghi\nxyz", 30),
 ("\nuvw\npqr\n", 40),
 ("abc\n", 44),
 ("", 44),
 ("", 44),
 ]
 expected_with_tell = [
 ("foo\n", 4),
 ("b", 5),
 ("ar\n", 9),
 ("foo\n", 14),
 ("abc\ndef\nghi\nxyz", 30),
 ("\nuvw\npqr\n", 40),
 ("abc\n", 44),
 ("", 44),
 ("", 44),
 ]
 def test_read(self):
 base = TestReader(self.packets)
 filter = sio.TextInputFilter(base)
 for data, pos in self.expected:
 self.assertEqual(filter.read(100), data)
 def test_read_tell(self):
 base = TestReader(self.packets)
 filter = sio.TextInputFilter(base)
 for data, pos in self.expected_with_tell:
 self.assertEqual(filter.read(100), data)
 self.assertEqual(filter.tell(), pos)
 self.assertEqual(filter.tell(), pos) # Repeat the tell() !
 def test_seek(self):
 base = TestReader(self.packets)
 filter = sio.TextInputFilter(base)
 sofar = ""
 pairs = []
 while True:
 pairs.append((sofar, filter.tell()))
 c = filter.read(1)
 if not c:
 break
 self.assertEqual(len(c), 1)
 sofar += c
 all = sofar
 for i in range(len(pairs)):
 sofar, pos = pairs[i]
 filter.seek(pos)
 self.assertEqual(filter.tell(), pos)
 self.assertEqual(filter.tell(), pos)
 bufs = [sofar]
 while True:
 data = filter.read(100)
 if not data:
 self.assertEqual(filter.read(100), "")
 break
 bufs.append(data)
 self.assertEqual("".join(bufs), all)
class TextOutputFilterTests(unittest.TestCase):
 def test_write_nl(self):
 base = TestWriter()
 filter = sio.TextOutputFilter(base, linesep="\n")
 filter.write("abc")
 filter.write("def\npqr\nuvw")
 filter.write("\n123\n")
 self.assertEqual(base.buf, "abcdef\npqr\nuvw\n123\n")
 def test_write_cr(self):
 base = TestWriter()
 filter = sio.TextOutputFilter(base, linesep="\r")
 filter.write("abc")
 filter.write("def\npqr\nuvw")
 filter.write("\n123\n")
 self.assertEqual(base.buf, "abcdef\rpqr\ruvw\r123\r")
 def test_write_crnl(self):
 base = TestWriter()
 filter = sio.TextOutputFilter(base, linesep="\r\n")
 filter.write("abc")
 filter.write("def\npqr\nuvw")
 filter.write("\n123\n")
 self.assertEqual(base.buf, "abcdef\r\npqr\r\nuvw\r\n123\r\n")
 def test_write_tell_nl(self):
 base = TestWriter()
 filter = sio.TextOutputFilter(base, linesep="\n")
 filter.write("xxx")
 self.assertEqual(filter.tell(), 3)
 filter.write("\nabc\n")
 self.assertEqual(filter.tell(), 8)
 def test_write_tell_cr(self):
 base = TestWriter()
 filter = sio.TextOutputFilter(base, linesep="\r")
 filter.write("xxx")
 self.assertEqual(filter.tell(), 3)
 filter.write("\nabc\n")
 self.assertEqual(filter.tell(), 8)
 def test_write_tell_crnl(self):
 base = TestWriter()
 filter = sio.TextOutputFilter(base, linesep="\r\n")
 filter.write("xxx")
 self.assertEqual(filter.tell(), 3)
 filter.write("\nabc\n")
 self.assertEqual(filter.tell(), 10)
 def test_write_seek(self):
 base = TestWriter()
 filter = sio.TextOutputFilter(base, linesep="\n")
 filter.write("x"*100)
 filter.seek(50)
 filter.write("y"*10)
 self.assertEqual(base.buf, "x"*50 + "y"*10 + "x"*40)
class DecodingInputFilterTests(unittest.TestCase):
 def test_read(self):
 chars = u"abc\xff\u1234\u4321\x80xyz"
 data = chars.encode("utf8")
 base = TestReader([data])
 filter = sio.DecodingInputFilter(base)
 bufs = []
 for n in range(1, 11):
 while 1:
 c = filter.read(n)
 self.assertEqual(type(c), unicode)
 if not c:
 break
 bufs.append(c)
 self.assertEqual(u"".join(bufs), chars)
class EncodingOutputFilterTests(unittest.TestCase):
 def test_write(self):
 chars = u"abc\xff\u1234\u4321\x80xyz"
 data = chars.encode("utf8")
 for n in range(1, 11):
 base = TestWriter()
 filter = sio.EncodingOutputFilter(base)
 pos = 0
 while 1:
 c = chars[pos:pos+n]
 if not c:
 break
 pos += len(c)
 filter.write(c)
 self.assertEqual(base.buf, data)
# Speed test
FN = "BIG"
def timeit(fn=FN):
 f = sio.MMapFile(fn, "r")
 lines = bytes = 0
 t0 = time.clock()
 for line in f:
 lines += 1
 bytes += len(line)
 t1 = time.clock()
 print "%d lines (%d bytes) in %.3f seconds" % (lines, bytes, t1-t0)
def timeold(fn=FN):
 f = open(fn, "rb")
 lines = bytes = 0
 t0 = time.clock()
 for line in f:
 lines += 1
 bytes += len(line)
 t1 = time.clock()
 print "%d lines (%d bytes) in %.3f seconds" % (lines, bytes, t1-t0)
# Functional test
def main():
 f = sio.DiskFile("sio.py")
 f = sio.DecodingInputFilter(f)
 f = sio.TextInputFilter(f)
 f = sio.BufferingInputStream(f)
 for i in range(10):
 print repr(f.readline())
# Unit test main program
def test_main():
 suite = unittest.TestSuite()
 suite.addTest(unittest.makeSuite(BufferingInputStreamTests))
 suite.addTest(unittest.makeSuite(CRLFFilterTests))
 ##suite.addTest(unittest.makeSuite(MMapFileTests))
 suite.addTest(unittest.makeSuite(TextInputFilterTests))
 suite.addTest(unittest.makeSuite(TextOutputFilterTests))
 suite.addTest(unittest.makeSuite(DecodingInputFilterTests))
 suite.addTest(unittest.makeSuite(EncodingOutputFilterTests))
 test_support.run_suite(suite)
if __name__ == "__main__":
 test_main()

AltStyle によって変換されたページ (->オリジナル) /