# HG changeset patch
# Parent 5c8ffd2e3119ca55aa5a91aa9d91305d1aac4bd0
# Parent 4b64a049f451f74c0278a0d496fe3ade4162abd6
Issue #20132: Fix base64 and bz2 incremental decoders

diff -r 4b64a049f451 Lib/encodings/base64_codec.py
--- a/Lib/encodings/base64_codec.py	Fri Aug 19 12:00:13 2016 +0300
+++ b/Lib/encodings/base64_codec.py	Fri Aug 19 11:37:20 2016 +0000
@@ -7,6 +7,8 @@
 
 import codecs
 import base64
+import binascii
+import re
 
 ### Codec APIs
 
@@ -29,10 +31,16 @@
         assert self.errors == 'strict'
         return base64.encodebytes(input)
 
-class IncrementalDecoder(codecs.IncrementalDecoder):
-    def decode(self, input, final=False):
+class IncrementalDecoder(codecs.BufferedIncrementalDecoder):
+    def _buffer_decode(self, input, errors, final):
         assert self.errors == 'strict'
-        return base64.decodebytes(input)
+        if not final:  # Only decode complete groups of four codes for now
+            input = chunk_re.match(input).group()
+        return (binascii.a2b_base64(input), len(input))  # (output, consumed)
+
+# Without the second level of brackets, this raises a "multiple repeat" error
+chunk_re = br'(?: (?: [^A-Za-z0-9+/=]* [A-Za-z0-9+/=] ){4} )*'
+chunk_re = re.compile(chunk_re, re.VERBOSE)
 
 class StreamWriter(Codec, codecs.StreamWriter):
     charbuffertype = bytes
diff -r 4b64a049f451 Lib/encodings/bz2_codec.py
--- a/Lib/encodings/bz2_codec.py	Fri Aug 19 12:00:13 2016 +0300
+++ b/Lib/encodings/bz2_codec.py	Fri Aug 19 11:37:20 2016 +0000
@@ -52,7 +52,7 @@
         try:
             return self.decompressobj.decompress(input)
         except EOFError:
-            return ''
+            return b''  # Codec output is bytes, not str
 
     def reset(self):
         self.decompressobj = bz2.BZ2Decompressor()
diff -r 4b64a049f451 Lib/test/test_codecs.py
--- a/Lib/test/test_codecs.py	Fri Aug 19 12:00:13 2016 +0300
+++ b/Lib/test/test_codecs.py	Fri Aug 19 11:37:20 2016 +0000
@@ -2681,6 +2681,29 @@
                 self.assertEqual(size, len(o))
                 self.assertEqual(i, binput)
 
+    def test_inc_decode(self):
+        data = b"\x80data"
+        for encoding in bytes_transform_encodings:
+            with self.subTest(encoding=encoding):
+                sin = codecs.encode(data, encoding)
+                decoder = codecs.getincrementaldecoder(encoding)()
+                self.assertEqual(decoder.decode(sin, final=True), data)
+                if encoding == "uu_codec":
+                    continue  # Broken incremental decoder; see Issue 20132
+                decoder.reset()
+                sout1 = decoder.decode(sin)
+                sout2 = decoder.decode(b"", final=True)
+                self.assertEqual(sout1 + sout2, data)
+
+                if encoding in {"hex_codec", "quopri_codec"}:
+                    continue  # Broken incremental decoders; see Issue 20132
+                decoder.reset()
+                buffer = bytearray()
+                for byte in sin:
+                    buffer += decoder.decode(bytes((byte,)))
+                buffer += decoder.decode(b"", final=True)
+                self.assertEqual(buffer, data)
+
     def test_read(self):
         for encoding in bytes_transform_encodings:
             with self.subTest(encoding=encoding):
@@ -2759,6 +2782,33 @@
                     bad_input.decode("rot_13")
                 self.assertIsNone(failure.exception.__cause__)
 
+    def test_decode_incomplete(self):
+        # Test handling of incomplete data.
+        # The stateless and incremental decoders should raise ValueError.
+        special_cases = dict(
+            base64_codec=(b"x", -2),  # Truncate newline and padding char
+            quopri_codec=(None, None),  # Seems to tolerate any truncation
+        )
+        other_cases = (b"data", -1)
+        for encoding in bytes_transform_encodings:
+            data, offset = special_cases.get(encoding, other_cases)
+            if data is None:
+                continue
+            with self.subTest(encoding=encoding):
+                truncated = codecs.encode(data, encoding)[:offset]
+                exception = ValueError
+                if encoding == "zlib_codec":  # Doesn't raise ValueError
+                    exception = zlib.error
+                decoder = codecs.getdecoder(encoding)
+                self.assertRaises(exception, decoder, truncated)
+
+                # The zlib and bz2 incremental decoders do not check for
+                # truncated data
+                if encoding not in {"zlib_codec", "bz2_codec"}:
+                    decoder = codecs.getincrementaldecoder(encoding)()
+                    self.assertRaises(ValueError,
+                        decoder.decode, truncated, final=True)
+
     @unittest.skipUnless(zlib, "Requires zlib support")
     def test_custom_zlib_error_is_wrapped(self):
         # Check zlib codec gives a good error for malformed input
@@ -2788,6 +2838,39 @@
                     info = codecs.lookup(alias)
                     self.assertEqual(info.name, expected_name)
 
+    def test_iterdecode(self):
+        # Exercise incremental decoders with a variety of input
+        tests = (
+            # (encoding, {"good": (input, ...), "bad": ...}, ...)
+            ("base64-codec", {
+                "good": (
+                    (b" AZ \n az \r 09 - +/ _ == ",),
+                    (b"AA", b"", b"AAB", b"BBB", b""),
+                    (b"AAA", b"="),
+                    (b"AA", b"=", b"="),
+                    (b"AAAA BBBB CCCC",),
+                    (b"AAAA BBBB CCC", b"C"),
+                ),
+                "bad": (
+                    (b"A",),
+                    (b"AAA",),
+                    (b"A=", b"="),
+                ),
+            }),
+        )
+        for encoding, good_bad in tests:
+            for input in good_bad["good"]:
+                with self.subTest(encoding=encoding, input=input):
+                    generator = codecs.iterdecode(iter(input), encoding)
+                    decoded = b"".join(generator)
+                    expected = codecs.decode(b"".join(input), encoding)
+                    self.assertEqual(decoded, expected)
+            for input in good_bad["bad"]:
+                with self.subTest(encoding=encoding, input=input), \
+                        self.assertRaises(ValueError):
+                    for _ in codecs.iterdecode(iter(input), encoding):
+                        pass
+
     def test_quopri_stateless(self):
         # Should encode with quotetabs=True
         encoded = codecs.encode(b"space tab\teol \n", "quopri-codec")
diff -r 4b64a049f451 Misc/NEWS
--- a/Misc/NEWS	Fri Aug 19 12:00:13 2016 +0300
+++ b/Misc/NEWS	Fri Aug 19 11:37:20 2016 +0000
@@ -20,6 +20,11 @@
 Library
 -------
 
+- Issue #20132: Fix the base64-codec and bz2-codec incremental decoders.
+  Base64-codec now works even when sets of four base-64 codes are split
+  across multiple input chunks. Bz2-codec now returns a bytes object at EOF,
+  rather than a text str object.
+
 - Issue #2466: posixpath.ismount now correctly recognizes mount points which
   the user does not have permission to access.
 