[Python-checkins] r68558 - in sandbox/trunk/io-c: _bufferedio.c test_io.py
antoine.pitrou
python-checkins at python.org
Mon Jan 12 23:45:43 CET 2009
Author: antoine.pitrou
Date: Mon Jan 12 23:45:43 2009
New Revision: 68558
Log:
Add some sanity checks, and test them.
Modified:
sandbox/trunk/io-c/_bufferedio.c
sandbox/trunk/io-c/test_io.py
Modified: sandbox/trunk/io-c/_bufferedio.c
==============================================================================
--- sandbox/trunk/io-c/_bufferedio.c (original)
+++ sandbox/trunk/io-c/_bufferedio.c Mon Jan 12 23:45:43 2009
@@ -469,8 +469,12 @@
return -1;
n = PyNumber_AsSsize_t(res, PyExc_ValueError);
Py_DECREF(res);
+ if (n < 0) {
+ PyErr_Format(PyExc_IOError,
+ "Raw stream returned invalid position %zd", n);
+ return -1;
+ }
self->abs_pos = n;
- /* TODO: sanity check (n >= 0) */
return n;
}
@@ -485,7 +489,11 @@
n = PyNumber_AsSsize_t(res, PyExc_ValueError);
Py_DECREF(res);
self->abs_pos = n;
- /* TODO: sanity check (n >= 0) */
+ if (n < 0) {
+ PyErr_Format(PyExc_IOError,
+ "Raw stream returned invalid position %zd", n);
+ return -1;
+ }
return n;
}
@@ -590,6 +598,11 @@
if (!PyArg_ParseTuple(args, "|n:read", &n)) {
return NULL;
}
+ if (n < -1) {
+ PyErr_SetString(PyExc_ValueError,
+ "read length must be positive or -1");
+ return NULL;
+ }
if (BufferedIOMixin_closed(self)) {
PyErr_SetString(PyExc_ValueError, "read of closed file");
@@ -614,8 +627,12 @@
return NULL;
}
- /* TODO: n < 0 should raise an error (?) */
- if (n <= 0)
+ if (n < 0) {
+ PyErr_SetString(PyExc_ValueError,
+ "read length must be positive");
+ return NULL;
+ }
+ if (n == 0)
return PyBytes_FromStringAndSize(NULL, 0);
ENTER_BUFFERED(self)
@@ -712,7 +729,12 @@
if (!PyArg_ParseTuple(args, "n|i:seek", &target, &whence)) {
return NULL;
}
- /* TODO: sanity check whence value */
+
+ if (whence < 0 || whence > 2) {
+ PyErr_Format(PyExc_ValueError,
+ "whence must be between 0 and 2, not %d", whence);
+ return NULL;
+ }
ENTER_BUFFERED(self)
@@ -872,7 +894,12 @@
}
n = PyNumber_AsSsize_t(res, PyExc_ValueError);
Py_DECREF(res);
- /* TODO: sanity check (0 <= n <= len) */
+ if (n < 0 || n > len) {
+ PyErr_Format(PyExc_IOError,
+ "raw readinto() returned invalid length %zd "
+ "(should have been between 0 and %zd)", n, len);
+ return -1;
+ }
if (n > 0 && self->abs_pos != -1)
self->abs_pos += n;
return n;
@@ -1241,7 +1268,12 @@
return -1;
n = PyNumber_AsSsize_t(res, PyExc_ValueError);
Py_DECREF(res);
- /* TODO: sanity check (0 <= n <= len) */
+ if (n < 0 || n > len) {
+ PyErr_Format(PyExc_IOError,
+ "raw write() returned invalid length %zd "
+ "(should have been between 0 and %zd)", n, len);
+ return -1;
+ }
if (n > 0 && self->abs_pos != -1)
self->abs_pos += n;
return n;
Modified: sandbox/trunk/io-c/test_io.py
==============================================================================
--- sandbox/trunk/io-c/test_io.py (original)
+++ sandbox/trunk/io-c/test_io.py Mon Jan 12 23:45:43 2009
@@ -81,6 +81,24 @@
return pos
+class MisbehavedRawIO(MockRawIO):
+ def write(self, b):
+ return MockRawIO.write(self, b) * 2
+
+ def read(self, n=None):
+ return MockRawIO.read(self, n) * 2
+
+ def seek(self, pos, whence):
+ return -123
+
+ def tell(self):
+ return -456
+
+ def readinto(self, buf):
+ MockRawIO.readinto(self, buf)
+ return len(buf) * 5
+
+
class MockFileIO(io.BytesIO):
def __init__(self, data):
@@ -420,6 +438,7 @@
class CommonBufferedTests:
# Tests common to BufferedReader, BufferedWriter and BufferedRandom
+
def testFileno(self):
rawio = MockRawIO()
bufio = self.tp(rawio)
@@ -431,6 +450,13 @@
# this test. Else, write it.
pass
+ def testInvalidArgs(self):
+ rawio = MockRawIO()
+ bufio = self.tp(rawio)
+ # Invalid whence
+ self.assertRaises(ValueError, bufio.seek, 0, -1)
+ self.assertRaises(ValueError, bufio.seek, 0, 3)
+
def testOverrideDestructor(self):
tp = self.tp
record = []
@@ -470,6 +496,7 @@
# a ValueError.
self.assertRaises(ValueError, _with)
+
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
tp = io.BufferedReader
read_mode = "rb"
@@ -497,6 +524,8 @@
rawio = MockRawIO((b"abc", b"d", b"efg"))
bufio = self.tp(rawio)
self.assertEquals(b"abcdef", bufio.read(6))
+ # Invalid args
+ self.assertRaises(ValueError, bufio.read, -2)
def testRead1(self):
rawio = MockRawIO((b"abc", b"d", b"efg"))
@@ -511,6 +540,8 @@
self.assertEquals(b"efg", bufio.read1(100))
self.assertEquals(rawio._reads, 3)
self.assertEquals(b"", bufio.read1(100))
+ # Invalid args
+ self.assertRaises(ValueError, bufio.read1, -1)
def testReadinto(self):
rawio = MockRawIO((b"abc", b"d", b"efg"))
@@ -612,6 +643,13 @@
finally:
support.unlink(support.TESTFN)
+ def testMisbehavedRawIO(self):
+ rawio = MisbehavedRawIO((b"abc", b"d", b"efg"))
+ bufio = self.tp(rawio)
+ self.assertRaises(IOError, bufio.seek, 0)
+ self.assertRaises(IOError, bufio.tell)
+ self.assertRaises(IOError, bufio.read, 10)
+
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
tp = io.BufferedWriter
@@ -812,6 +850,13 @@
finally:
support.unlink(support.TESTFN)
+ def testMisbehavedRawIO(self):
+ rawio = MisbehavedRawIO()
+ bufio = self.tp(rawio, 5)
+ self.assertRaises(IOError, bufio.seek, 0)
+ self.assertRaises(IOError, bufio.tell)
+ self.assertRaises(IOError, bufio.write, b"abcdef")
+
class BufferedRWPairTest(unittest.TestCase):
@@ -948,6 +993,10 @@
bufio.readinto(bytearray(1))
self.check_writes(_read)
+ def testMisbehavedRawIO(self):
+ BufferedReaderTest.testMisbehavedRawIO(self)
+ BufferedWriterTest.testMisbehavedRawIO(self)
+
# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
# properties:
@@ -1381,6 +1430,8 @@
def testSeekAndTell(self):
"""Test seek/tell using the StatefulIncrementalDecoder."""
+ # Make test faster by doing smaller seeks
+ CHUNK_SIZE = 256
def testSeekAndTellWithData(data, min_pos=0):
"""Tell/seek to various points within a data stream and ensure
@@ -1389,6 +1440,7 @@
f.write(data)
f.close()
f = io.open(support.TESTFN, encoding='test_decoder')
+ f._CHUNK_SIZE = CHUNK_SIZE
decoded = f.read()
f.close()
@@ -1412,7 +1464,7 @@
testSeekAndTellWithData(input)
# Position each test case so that it crosses a chunk boundary.
- CHUNK_SIZE = _default_chunk_size()
+ #CHUNK_SIZE = _default_chunk_size()
for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
offset = CHUNK_SIZE - len(input)//2
prefix = b'.'*offset
More information about the Python-checkins mailing list