Home | History | Annotate | Download | only in test
      1 import unittest
      2 from test import support
      3 import binascii
      4 import pickle
      5 import random
      6 import sys
      7 from test.support import bigmemtest, _1G, _4G
      8 
      9 zlib = support.import_module('zlib')
     10 
     11 requires_Compress_copy = unittest.skipUnless(
     12         hasattr(zlib.compressobj(), "copy"),
     13         'requires Compress.copy()')
     14 requires_Decompress_copy = unittest.skipUnless(
     15         hasattr(zlib.decompressobj(), "copy"),
     16         'requires Decompress.copy()')
     17 
     18 
     19 class VersionTestCase(unittest.TestCase):
     20 
     21     def test_library_version(self):
     22         # Test that the major version of the actual library in use matches the
     23         # major version that we were compiled against. We can't guarantee that
     24         # the minor versions will match (even on the machine on which the module
     25         # was compiled), and the API is stable between minor versions, so
     26         # testing only the major versions avoids spurious failures.
     27         self.assertEqual(zlib.ZLIB_RUNTIME_VERSION[0], zlib.ZLIB_VERSION[0])
     28 
     29 
     30 class ChecksumTestCase(unittest.TestCase):
     31     # checksum test cases
     32     def test_crc32start(self):
     33         self.assertEqual(zlib.crc32(b""), zlib.crc32(b"", 0))
     34         self.assertTrue(zlib.crc32(b"abc", 0xffffffff))
     35 
     36     def test_crc32empty(self):
     37         self.assertEqual(zlib.crc32(b"", 0), 0)
     38         self.assertEqual(zlib.crc32(b"", 1), 1)
     39         self.assertEqual(zlib.crc32(b"", 432), 432)
     40 
     41     def test_adler32start(self):
     42         self.assertEqual(zlib.adler32(b""), zlib.adler32(b"", 1))
     43         self.assertTrue(zlib.adler32(b"abc", 0xffffffff))
     44 
     45     def test_adler32empty(self):
     46         self.assertEqual(zlib.adler32(b"", 0), 0)
     47         self.assertEqual(zlib.adler32(b"", 1), 1)
     48         self.assertEqual(zlib.adler32(b"", 432), 432)
     49 
     50     def test_penguins(self):
     51         self.assertEqual(zlib.crc32(b"penguin", 0), 0x0e5c1a120)
     52         self.assertEqual(zlib.crc32(b"penguin", 1), 0x43b6aa94)
     53         self.assertEqual(zlib.adler32(b"penguin", 0), 0x0bcf02f6)
     54         self.assertEqual(zlib.adler32(b"penguin", 1), 0x0bd602f7)
     55 
     56         self.assertEqual(zlib.crc32(b"penguin"), zlib.crc32(b"penguin", 0))
     57         self.assertEqual(zlib.adler32(b"penguin"),zlib.adler32(b"penguin",1))
     58 
     59     def test_crc32_adler32_unsigned(self):
     60         foo = b'abcdefghijklmnop'
     61         # explicitly test signed behavior
     62         self.assertEqual(zlib.crc32(foo), 2486878355)
     63         self.assertEqual(zlib.crc32(b'spam'), 1138425661)
     64         self.assertEqual(zlib.adler32(foo+foo), 3573550353)
     65         self.assertEqual(zlib.adler32(b'spam'), 72286642)
     66 
     67     def test_same_as_binascii_crc32(self):
     68         foo = b'abcdefghijklmnop'
     69         crc = 2486878355
     70         self.assertEqual(binascii.crc32(foo), crc)
     71         self.assertEqual(zlib.crc32(foo), crc)
     72         self.assertEqual(binascii.crc32(b'spam'), zlib.crc32(b'spam'))
     73 
     74 
     75 # Issue #10276 - check that inputs >=4GB are handled correctly.
     76 class ChecksumBigBufferTestCase(unittest.TestCase):
     77 
     78     @bigmemtest(size=_4G + 4, memuse=1, dry_run=False)
     79     def test_big_buffer(self, size):
     80         data = b"nyan" * (_1G + 1)
     81         self.assertEqual(zlib.crc32(data), 1044521549)
     82         self.assertEqual(zlib.adler32(data), 2256789997)
     83 
     84 
     85 class ExceptionTestCase(unittest.TestCase):
     86     # make sure we generate some expected errors
     87     def test_badlevel(self):
     88         # specifying compression level out of range causes an error
     89         # (but -1 is Z_DEFAULT_COMPRESSION and apparently the zlib
     90         # accepts 0 too)
     91         self.assertRaises(zlib.error, zlib.compress, b'ERROR', 10)
     92 
     93     def test_badargs(self):
     94         self.assertRaises(TypeError, zlib.adler32)
     95         self.assertRaises(TypeError, zlib.crc32)
     96         self.assertRaises(TypeError, zlib.compress)
     97         self.assertRaises(TypeError, zlib.decompress)
     98         for arg in (42, None, '', 'abc', (), []):
     99             self.assertRaises(TypeError, zlib.adler32, arg)
    100             self.assertRaises(TypeError, zlib.crc32, arg)
    101             self.assertRaises(TypeError, zlib.compress, arg)
    102             self.assertRaises(TypeError, zlib.decompress, arg)
    103 
    104     def test_badcompressobj(self):
    105         # verify failure on building compress object with bad params
    106         self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)
    107         # specifying total bits too large causes an error
    108         self.assertRaises(ValueError,
    109                 zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1)
    110 
    111     def test_baddecompressobj(self):
    112         # verify failure on building decompress object with bad params
    113         self.assertRaises(ValueError, zlib.decompressobj, -1)
    114 
    115     def test_decompressobj_badflush(self):
    116         # verify failure on calling decompressobj.flush with bad params
    117         self.assertRaises(ValueError, zlib.decompressobj().flush, 0)
    118         self.assertRaises(ValueError, zlib.decompressobj().flush, -1)
    119 
    120     @support.cpython_only
    121     def test_overflow(self):
    122         with self.assertRaisesRegex(OverflowError, 'int too large'):
    123             zlib.decompress(b'', 15, sys.maxsize + 1)
    124         with self.assertRaisesRegex(OverflowError, 'int too large'):
    125             zlib.decompressobj().decompress(b'', sys.maxsize + 1)
    126         with self.assertRaisesRegex(OverflowError, 'int too large'):
    127             zlib.decompressobj().flush(sys.maxsize + 1)
    128 
    129 
    130 class BaseCompressTestCase(object):
    131     def check_big_compress_buffer(self, size, compress_func):
    132         _1M = 1024 * 1024
    133         # Generate 10MB worth of random, and expand it by repeating it.
    134         # The assumption is that zlib's memory is not big enough to exploit
    135         # such spread out redundancy.
    136         data = b''.join([random.getrandbits(8 * _1M).to_bytes(_1M, 'little')
    137                         for i in range(10)])
    138         data = data * (size // len(data) + 1)
    139         try:
    140             compress_func(data)
    141         finally:
    142             # Release memory
    143             data = None
    144 
    145     def check_big_decompress_buffer(self, size, decompress_func):
    146         data = b'x' * size
    147         try:
    148             compressed = zlib.compress(data, 1)
    149         finally:
    150             # Release memory
    151             data = None
    152         data = decompress_func(compressed)
    153         # Sanity check
    154         try:
    155             self.assertEqual(len(data), size)
    156             self.assertEqual(len(data.strip(b'x')), 0)
    157         finally:
    158             data = None
    159 
    160 
    161 class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
    162     # Test compression in one go (whole message compression)
    163     def test_speech(self):
    164         x = zlib.compress(HAMLET_SCENE)
    165         self.assertEqual(zlib.decompress(x), HAMLET_SCENE)
    166 
    167     def test_keywords(self):
    168         x = zlib.compress(HAMLET_SCENE, level=3)
    169         self.assertEqual(zlib.decompress(x), HAMLET_SCENE)
    170         with self.assertRaises(TypeError):
    171             zlib.compress(data=HAMLET_SCENE, level=3)
    172         self.assertEqual(zlib.decompress(x,
    173                                          wbits=zlib.MAX_WBITS,
    174                                          bufsize=zlib.DEF_BUF_SIZE),
    175                          HAMLET_SCENE)
    176 
    177     def test_speech128(self):
    178         # compress more data
    179         data = HAMLET_SCENE * 128
    180         x = zlib.compress(data)
    181         self.assertEqual(zlib.compress(bytearray(data)), x)
    182         for ob in x, bytearray(x):
    183             self.assertEqual(zlib.decompress(ob), data)
    184 
    185     def test_incomplete_stream(self):
    186         # A useful error message is given
    187         x = zlib.compress(HAMLET_SCENE)
    188         self.assertRaisesRegex(zlib.error,
    189             "Error -5 while decompressing data: incomplete or truncated stream",
    190             zlib.decompress, x[:-1])
    191 
    192     # Memory use of the following functions takes into account overallocation
    193 
    194     @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
    195     def test_big_compress_buffer(self, size):
    196         compress = lambda s: zlib.compress(s, 1)
    197         self.check_big_compress_buffer(size, compress)
    198 
    199     @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
    200     def test_big_decompress_buffer(self, size):
    201         self.check_big_decompress_buffer(size, zlib.decompress)
    202 
    203     @bigmemtest(size=_4G, memuse=1)
    204     def test_large_bufsize(self, size):
    205         # Test decompress(bufsize) parameter greater than the internal limit
    206         data = HAMLET_SCENE * 10
    207         compressed = zlib.compress(data, 1)
    208         self.assertEqual(zlib.decompress(compressed, 15, size), data)
    209 
    210     def test_custom_bufsize(self):
    211         data = HAMLET_SCENE * 10
    212         compressed = zlib.compress(data, 1)
    213         self.assertEqual(zlib.decompress(compressed, 15, CustomInt()), data)
    214 
    215     @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    216     @bigmemtest(size=_4G + 100, memuse=4)
    217     def test_64bit_compress(self, size):
    218         data = b'x' * size
    219         try:
    220             comp = zlib.compress(data, 0)
    221             self.assertEqual(zlib.decompress(comp), data)
    222         finally:
    223             comp = data = None
    224 
    225 
    226 class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
    227     # Test compression object
    228     def test_pair(self):
    229         # straightforward compress/decompress objects
    230         datasrc = HAMLET_SCENE * 128
    231         datazip = zlib.compress(datasrc)
    232         # should compress both bytes and bytearray data
    233         for data in (datasrc, bytearray(datasrc)):
    234             co = zlib.compressobj()
    235             x1 = co.compress(data)
    236             x2 = co.flush()
    237             self.assertRaises(zlib.error, co.flush) # second flush should not work
    238             self.assertEqual(x1 + x2, datazip)
    239         for v1, v2 in ((x1, x2), (bytearray(x1), bytearray(x2))):
    240             dco = zlib.decompressobj()
    241             y1 = dco.decompress(v1 + v2)
    242             y2 = dco.flush()
    243             self.assertEqual(data, y1 + y2)
    244             self.assertIsInstance(dco.unconsumed_tail, bytes)
    245             self.assertIsInstance(dco.unused_data, bytes)
    246 
    247     def test_keywords(self):
    248         level = 2
    249         method = zlib.DEFLATED
    250         wbits = -12
    251         memLevel = 9
    252         strategy = zlib.Z_FILTERED
    253         co = zlib.compressobj(level=level,
    254                               method=method,
    255                               wbits=wbits,
    256                               memLevel=memLevel,
    257                               strategy=strategy,
    258                               zdict=b"")
    259         do = zlib.decompressobj(wbits=wbits, zdict=b"")
    260         with self.assertRaises(TypeError):
    261             co.compress(data=HAMLET_SCENE)
    262         with self.assertRaises(TypeError):
    263             do.decompress(data=zlib.compress(HAMLET_SCENE))
    264         x = co.compress(HAMLET_SCENE) + co.flush()
    265         y = do.decompress(x, max_length=len(HAMLET_SCENE)) + do.flush()
    266         self.assertEqual(HAMLET_SCENE, y)
    267 
    268     def test_compressoptions(self):
    269         # specify lots of options to compressobj()
    270         level = 2
    271         method = zlib.DEFLATED
    272         wbits = -12
    273         memLevel = 9
    274         strategy = zlib.Z_FILTERED
    275         co = zlib.compressobj(level, method, wbits, memLevel, strategy)
    276         x1 = co.compress(HAMLET_SCENE)
    277         x2 = co.flush()
    278         dco = zlib.decompressobj(wbits)
    279         y1 = dco.decompress(x1 + x2)
    280         y2 = dco.flush()
    281         self.assertEqual(HAMLET_SCENE, y1 + y2)
    282 
    283     def test_compressincremental(self):
    284         # compress object in steps, decompress object as one-shot
    285         data = HAMLET_SCENE * 128
    286         co = zlib.compressobj()
    287         bufs = []
    288         for i in range(0, len(data), 256):
    289             bufs.append(co.compress(data[i:i+256]))
    290         bufs.append(co.flush())
    291         combuf = b''.join(bufs)
    292 
    293         dco = zlib.decompressobj()
    294         y1 = dco.decompress(b''.join(bufs))
    295         y2 = dco.flush()
    296         self.assertEqual(data, y1 + y2)
    297 
    298     def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
    299         # compress object in steps, decompress object in steps
    300         source = source or HAMLET_SCENE
    301         data = source * 128
    302         co = zlib.compressobj()
    303         bufs = []
    304         for i in range(0, len(data), cx):
    305             bufs.append(co.compress(data[i:i+cx]))
    306         bufs.append(co.flush())
    307         combuf = b''.join(bufs)
    308 
    309         decombuf = zlib.decompress(combuf)
    310         # Test type of return value
    311         self.assertIsInstance(decombuf, bytes)
    312 
    313         self.assertEqual(data, decombuf)
    314 
    315         dco = zlib.decompressobj()
    316         bufs = []
    317         for i in range(0, len(combuf), dcx):
    318             bufs.append(dco.decompress(combuf[i:i+dcx]))
    319             self.assertEqual(b'', dco.unconsumed_tail, ########
    320                              "(A) uct should be b'': not %d long" %
    321                                        len(dco.unconsumed_tail))
    322             self.assertEqual(b'', dco.unused_data)
    323         if flush:
    324             bufs.append(dco.flush())
    325         else:
    326             while True:
    327                 chunk = dco.decompress(b'')
    328                 if chunk:
    329                     bufs.append(chunk)
    330                 else:
    331                     break
    332         self.assertEqual(b'', dco.unconsumed_tail, ########
    333                          "(B) uct should be b'': not %d long" %
    334                                        len(dco.unconsumed_tail))
    335         self.assertEqual(b'', dco.unused_data)
    336         self.assertEqual(data, b''.join(bufs))
    337         # Failure means: "decompressobj with init options failed"
    338 
    339     def test_decompincflush(self):
    340         self.test_decompinc(flush=True)
    341 
    342     def test_decompimax(self, source=None, cx=256, dcx=64):
    343         # compress in steps, decompress in length-restricted steps
    344         source = source or HAMLET_SCENE
    345         # Check a decompression object with max_length specified
    346         data = source * 128
    347         co = zlib.compressobj()
    348         bufs = []
    349         for i in range(0, len(data), cx):
    350             bufs.append(co.compress(data[i:i+cx]))
    351         bufs.append(co.flush())
    352         combuf = b''.join(bufs)
    353         self.assertEqual(data, zlib.decompress(combuf),
    354                          'compressed data failure')
    355 
    356         dco = zlib.decompressobj()
    357         bufs = []
    358         cb = combuf
    359         while cb:
    360             #max_length = 1 + len(cb)//10
    361             chunk = dco.decompress(cb, dcx)
    362             self.assertFalse(len(chunk) > dcx,
    363                     'chunk too big (%d>%d)' % (len(chunk), dcx))
    364             bufs.append(chunk)
    365             cb = dco.unconsumed_tail
    366         bufs.append(dco.flush())
    367         self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')
    368 
    369     def test_decompressmaxlen(self, flush=False):
    370         # Check a decompression object with max_length specified
    371         data = HAMLET_SCENE * 128
    372         co = zlib.compressobj()
    373         bufs = []
    374         for i in range(0, len(data), 256):
    375             bufs.append(co.compress(data[i:i+256]))
    376         bufs.append(co.flush())
    377         combuf = b''.join(bufs)
    378         self.assertEqual(data, zlib.decompress(combuf),
    379                          'compressed data failure')
    380 
    381         dco = zlib.decompressobj()
    382         bufs = []
    383         cb = combuf
    384         while cb:
    385             max_length = 1 + len(cb)//10
    386             chunk = dco.decompress(cb, max_length)
    387             self.assertFalse(len(chunk) > max_length,
    388                         'chunk too big (%d>%d)' % (len(chunk),max_length))
    389             bufs.append(chunk)
    390             cb = dco.unconsumed_tail
    391         if flush:
    392             bufs.append(dco.flush())
    393         else:
    394             while chunk:
    395                 chunk = dco.decompress(b'', max_length)
    396                 self.assertFalse(len(chunk) > max_length,
    397                             'chunk too big (%d>%d)' % (len(chunk),max_length))
    398                 bufs.append(chunk)
    399         self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')
    400 
    401     def test_decompressmaxlenflush(self):
    402         self.test_decompressmaxlen(flush=True)
    403 
    404     def test_maxlenmisc(self):
    405         # Misc tests of max_length
    406         dco = zlib.decompressobj()
    407         self.assertRaises(ValueError, dco.decompress, b"", -1)
    408         self.assertEqual(b'', dco.unconsumed_tail)
    409 
    410     def test_maxlen_large(self):
    411         # Sizes up to sys.maxsize should be accepted, although zlib is
    412         # internally limited to expressing sizes with unsigned int
    413         data = HAMLET_SCENE * 10
    414         self.assertGreater(len(data), zlib.DEF_BUF_SIZE)
    415         compressed = zlib.compress(data, 1)
    416         dco = zlib.decompressobj()
    417         self.assertEqual(dco.decompress(compressed, sys.maxsize), data)
    418 
    419     def test_maxlen_custom(self):
    420         data = HAMLET_SCENE * 10
    421         compressed = zlib.compress(data, 1)
    422         dco = zlib.decompressobj()
    423         self.assertEqual(dco.decompress(compressed, CustomInt()), data[:100])
    424 
    425     def test_clear_unconsumed_tail(self):
    426         # Issue #12050: calling decompress() without providing max_length
    427         # should clear the unconsumed_tail attribute.
    428         cdata = b"x\x9cKLJ\x06\x00\x02M\x01"    # "abc"
    429         dco = zlib.decompressobj()
    430         ddata = dco.decompress(cdata, 1)
    431         ddata += dco.decompress(dco.unconsumed_tail)
    432         self.assertEqual(dco.unconsumed_tail, b"")
    433 
    434     def test_flushes(self):
    435         # Test flush() with the various options, using all the
    436         # different levels in order to provide more variations.
    437         sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
    438         sync_opt = [getattr(zlib, opt) for opt in sync_opt
    439                     if hasattr(zlib, opt)]
    440         data = HAMLET_SCENE * 8
    441 
    442         for sync in sync_opt:
    443             for level in range(10):
    444                 obj = zlib.compressobj( level )
    445                 a = obj.compress( data[:3000] )
    446                 b = obj.flush( sync )
    447                 c = obj.compress( data[3000:] )
    448                 d = obj.flush()
    449                 self.assertEqual(zlib.decompress(b''.join([a,b,c,d])),
    450                                  data, ("Decompress failed: flush "
    451                                         "mode=%i, level=%i") % (sync, level))
    452                 del obj
    453 
    454     @unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'),
    455                          'requires zlib.Z_SYNC_FLUSH')
    456     def test_odd_flush(self):
    457         # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
    458         import random
    459         # Testing on 17K of "random" data
    460 
    461         # Create compressor and decompressor objects
    462         co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
    463         dco = zlib.decompressobj()
    464 
    465         # Try 17K of data
    466         # generate random data stream
    467         try:
    468             # In 2.3 and later, WichmannHill is the RNG of the bug report
    469             gen = random.WichmannHill()
    470         except AttributeError:
    471             try:
    472                 # 2.2 called it Random
    473                 gen = random.Random()
    474             except AttributeError:
    475                 # others might simply have a single RNG
    476                 gen = random
    477         gen.seed(1)
    478         data = genblock(1, 17 * 1024, generator=gen)
    479 
    480         # compress, sync-flush, and decompress
    481         first = co.compress(data)
    482         second = co.flush(zlib.Z_SYNC_FLUSH)
    483         expanded = dco.decompress(first + second)
    484 
    485         # if decompressed data is different from the input data, choke.
    486         self.assertEqual(expanded, data, "17K random source doesn't match")
    487 
    488     def test_empty_flush(self):
    489         # Test that calling .flush() on unused objects works.
    490         # (Bug #1083110 -- calling .flush() on decompress objects
    491         # caused a core dump.)
    492 
    493         co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
    494         self.assertTrue(co.flush())  # Returns a zlib header
    495         dco = zlib.decompressobj()
    496         self.assertEqual(dco.flush(), b"") # Returns nothing
    497 
    498     def test_dictionary(self):
    499         h = HAMLET_SCENE
    500         # Build a simulated dictionary out of the words in HAMLET.
    501         words = h.split()
    502         random.shuffle(words)
    503         zdict = b''.join(words)
    504         # Use it to compress HAMLET.
    505         co = zlib.compressobj(zdict=zdict)
    506         cd = co.compress(h) + co.flush()
    507         # Verify that it will decompress with the dictionary.
    508         dco = zlib.decompressobj(zdict=zdict)
    509         self.assertEqual(dco.decompress(cd) + dco.flush(), h)
    510         # Verify that it fails when not given the dictionary.
    511         dco = zlib.decompressobj()
    512         self.assertRaises(zlib.error, dco.decompress, cd)
    513 
    514     def test_dictionary_streaming(self):
    515         # This simulates the reuse of a compressor object for compressing
    516         # several separate data streams.
    517         co = zlib.compressobj(zdict=HAMLET_SCENE)
    518         do = zlib.decompressobj(zdict=HAMLET_SCENE)
    519         piece = HAMLET_SCENE[1000:1500]
    520         d0 = co.compress(piece) + co.flush(zlib.Z_SYNC_FLUSH)
    521         d1 = co.compress(piece[100:]) + co.flush(zlib.Z_SYNC_FLUSH)
    522         d2 = co.compress(piece[:-100]) + co.flush(zlib.Z_SYNC_FLUSH)
    523         self.assertEqual(do.decompress(d0), piece)
    524         self.assertEqual(do.decompress(d1), piece[100:])
    525         self.assertEqual(do.decompress(d2), piece[:-100])
    526 
    527     def test_decompress_incomplete_stream(self):
    528         # This is 'foo', deflated
    529         x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
    530         # For the record
    531         self.assertEqual(zlib.decompress(x), b'foo')
    532         self.assertRaises(zlib.error, zlib.decompress, x[:-5])
    533         # Omitting the stream end works with decompressor objects
    534         # (see issue #8672).
    535         dco = zlib.decompressobj()
    536         y = dco.decompress(x[:-5])
    537         y += dco.flush()
    538         self.assertEqual(y, b'foo')
    539 
    540     def test_decompress_eof(self):
    541         x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
    542         dco = zlib.decompressobj()
    543         self.assertFalse(dco.eof)
    544         dco.decompress(x[:-5])
    545         self.assertFalse(dco.eof)
    546         dco.decompress(x[-5:])
    547         self.assertTrue(dco.eof)
    548         dco.flush()
    549         self.assertTrue(dco.eof)
    550 
    551     def test_decompress_eof_incomplete_stream(self):
    552         x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
    553         dco = zlib.decompressobj()
    554         self.assertFalse(dco.eof)
    555         dco.decompress(x[:-5])
    556         self.assertFalse(dco.eof)
    557         dco.flush()
    558         self.assertFalse(dco.eof)
    559 
    560     def test_decompress_unused_data(self):
    561         # Repeated calls to decompress() after EOF should accumulate data in
    562         # dco.unused_data, instead of just storing the arg to the last call.
    563         source = b'abcdefghijklmnopqrstuvwxyz'
    564         remainder = b'0123456789'
    565         y = zlib.compress(source)
    566         x = y + remainder
    567         for maxlen in 0, 1000:
    568             for step in 1, 2, len(y), len(x):
    569                 dco = zlib.decompressobj()
    570                 data = b''
    571                 for i in range(0, len(x), step):
    572                     if i < len(y):
    573                         self.assertEqual(dco.unused_data, b'')
    574                     if maxlen == 0:
    575                         data += dco.decompress(x[i : i + step])
    576                         self.assertEqual(dco.unconsumed_tail, b'')
    577                     else:
    578                         data += dco.decompress(
    579                                 dco.unconsumed_tail + x[i : i + step], maxlen)
    580                 data += dco.flush()
    581                 self.assertTrue(dco.eof)
    582                 self.assertEqual(data, source)
    583                 self.assertEqual(dco.unconsumed_tail, b'')
    584                 self.assertEqual(dco.unused_data, remainder)
    585 
    586     # issue27164
    587     def test_decompress_raw_with_dictionary(self):
    588         zdict = b'abcdefghijklmnopqrstuvwxyz'
    589         co = zlib.compressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
    590         comp = co.compress(zdict) + co.flush()
    591         dco = zlib.decompressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
    592         uncomp = dco.decompress(comp) + dco.flush()
    593         self.assertEqual(zdict, uncomp)
    594 
    595     def test_flush_with_freed_input(self):
    596         # Issue #16411: decompressor accesses input to last decompress() call
    597         # in flush(), even if this object has been freed in the meanwhile.
    598         input1 = b'abcdefghijklmnopqrstuvwxyz'
    599         input2 = b'QWERTYUIOPASDFGHJKLZXCVBNM'
    600         data = zlib.compress(input1)
    601         dco = zlib.decompressobj()
    602         dco.decompress(data, 1)
    603         del data
    604         data = zlib.compress(input2)
    605         self.assertEqual(dco.flush(), input1[1:])
    606 
    607     @bigmemtest(size=_4G, memuse=1)
    608     def test_flush_large_length(self, size):
    609         # Test flush(length) parameter greater than internal limit UINT_MAX
    610         input = HAMLET_SCENE * 10
    611         data = zlib.compress(input, 1)
    612         dco = zlib.decompressobj()
    613         dco.decompress(data, 1)
    614         self.assertEqual(dco.flush(size), input[1:])
    615 
    616     def test_flush_custom_length(self):
    617         input = HAMLET_SCENE * 10
    618         data = zlib.compress(input, 1)
    619         dco = zlib.decompressobj()
    620         dco.decompress(data, 1)
    621         self.assertEqual(dco.flush(CustomInt()), input[1:])
    622 
    623     @requires_Compress_copy
    624     def test_compresscopy(self):
    625         # Test copying a compression object
    626         data0 = HAMLET_SCENE
    627         data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii")
    628         c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
    629         bufs0 = []
    630         bufs0.append(c0.compress(data0))
    631 
    632         c1 = c0.copy()
    633         bufs1 = bufs0[:]
    634 
    635         bufs0.append(c0.compress(data0))
    636         bufs0.append(c0.flush())
    637         s0 = b''.join(bufs0)
    638 
    639         bufs1.append(c1.compress(data1))
    640         bufs1.append(c1.flush())
    641         s1 = b''.join(bufs1)
    642 
    643         self.assertEqual(zlib.decompress(s0),data0+data0)
    644         self.assertEqual(zlib.decompress(s1),data0+data1)
    645 
    646     @requires_Compress_copy
    647     def test_badcompresscopy(self):
    648         # Test copying a compression object in an inconsistent state
    649         c = zlib.compressobj()
    650         c.compress(HAMLET_SCENE)
    651         c.flush()
    652         self.assertRaises(ValueError, c.copy)
    653 
    654     @requires_Decompress_copy
    655     def test_decompresscopy(self):
    656         # Test copying a decompression object
    657         data = HAMLET_SCENE
    658         comp = zlib.compress(data)
    659         # Test type of return value
    660         self.assertIsInstance(comp, bytes)
    661 
    662         d0 = zlib.decompressobj()
    663         bufs0 = []
    664         bufs0.append(d0.decompress(comp[:32]))
    665 
    666         d1 = d0.copy()
    667         bufs1 = bufs0[:]
    668 
    669         bufs0.append(d0.decompress(comp[32:]))
    670         s0 = b''.join(bufs0)
    671 
    672         bufs1.append(d1.decompress(comp[32:]))
    673         s1 = b''.join(bufs1)
    674 
    675         self.assertEqual(s0,s1)
    676         self.assertEqual(s0,data)
    677 
    678     @requires_Decompress_copy
    679     def test_baddecompresscopy(self):
    680         # Test copying a compression object in an inconsistent state
    681         data = zlib.compress(HAMLET_SCENE)
    682         d = zlib.decompressobj()
    683         d.decompress(data)
    684         d.flush()
    685         self.assertRaises(ValueError, d.copy)
    686 
    687     def test_compresspickle(self):
    688         for proto in range(pickle.HIGHEST_PROTOCOL + 1):
    689             with self.assertRaises((TypeError, pickle.PicklingError)):
    690                 pickle.dumps(zlib.compressobj(zlib.Z_BEST_COMPRESSION), proto)
    691 
    692     def test_decompresspickle(self):
    693         for proto in range(pickle.HIGHEST_PROTOCOL + 1):
    694             with self.assertRaises((TypeError, pickle.PicklingError)):
    695                 pickle.dumps(zlib.decompressobj(), proto)
    696 
    697     # Memory use of the following functions takes into account overallocation
    698 
    699     @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
    700     def test_big_compress_buffer(self, size):
    701         c = zlib.compressobj(1)
    702         compress = lambda s: c.compress(s) + c.flush()
    703         self.check_big_compress_buffer(size, compress)
    704 
    705     @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
    706     def test_big_decompress_buffer(self, size):
    707         d = zlib.decompressobj()
    708         decompress = lambda s: d.decompress(s) + d.flush()
    709         self.check_big_decompress_buffer(size, decompress)
    710 
    711     @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    712     @bigmemtest(size=_4G + 100, memuse=4)
    713     def test_64bit_compress(self, size):
    714         data = b'x' * size
    715         co = zlib.compressobj(0)
    716         do = zlib.decompressobj()
    717         try:
    718             comp = co.compress(data) + co.flush()
    719             uncomp = do.decompress(comp) + do.flush()
    720             self.assertEqual(uncomp, data)
    721         finally:
    722             comp = uncomp = data = None
    723 
    724     @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    725     @bigmemtest(size=_4G + 100, memuse=3)
    726     def test_large_unused_data(self, size):
    727         data = b'abcdefghijklmnop'
    728         unused = b'x' * size
    729         comp = zlib.compress(data) + unused
    730         do = zlib.decompressobj()
    731         try:
    732             uncomp = do.decompress(comp) + do.flush()
    733             self.assertEqual(unused, do.unused_data)
    734             self.assertEqual(uncomp, data)
    735         finally:
    736             unused = comp = do = None
    737 
    738     @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    739     @bigmemtest(size=_4G + 100, memuse=5)
    740     def test_large_unconsumed_tail(self, size):
    741         data = b'x' * size
    742         do = zlib.decompressobj()
    743         try:
    744             comp = zlib.compress(data, 0)
    745             uncomp = do.decompress(comp, 1) + do.flush()
    746             self.assertEqual(uncomp, data)
    747             self.assertEqual(do.unconsumed_tail, b'')
    748         finally:
    749             comp = uncomp = data = None
    750 
    751     def test_wbits(self):
    752         # wbits=0 only supported since zlib v1.2.3.5
    753         # Register "1.2.3" as "1.2.3.0"
    754         v = (zlib.ZLIB_RUNTIME_VERSION + ".0").split(".", 4)
    755         supports_wbits_0 = int(v[0]) > 1 or int(v[0]) == 1 \
    756             and (int(v[1]) > 2 or int(v[1]) == 2
    757             and (int(v[2]) > 3 or int(v[2]) == 3 and int(v[3]) >= 5))
    758 
    759         co = zlib.compressobj(level=1, wbits=15)
    760         zlib15 = co.compress(HAMLET_SCENE) + co.flush()
    761         self.assertEqual(zlib.decompress(zlib15, 15), HAMLET_SCENE)
    762         if supports_wbits_0:
    763             self.assertEqual(zlib.decompress(zlib15, 0), HAMLET_SCENE)
    764         self.assertEqual(zlib.decompress(zlib15, 32 + 15), HAMLET_SCENE)
    765         with self.assertRaisesRegex(zlib.error, 'invalid window size'):
    766             zlib.decompress(zlib15, 14)
    767         dco = zlib.decompressobj(wbits=32 + 15)
    768         self.assertEqual(dco.decompress(zlib15), HAMLET_SCENE)
    769         dco = zlib.decompressobj(wbits=14)
    770         with self.assertRaisesRegex(zlib.error, 'invalid window size'):
    771             dco.decompress(zlib15)
    772 
    773         co = zlib.compressobj(level=1, wbits=9)
    774         zlib9 = co.compress(HAMLET_SCENE) + co.flush()
    775         self.assertEqual(zlib.decompress(zlib9, 9), HAMLET_SCENE)
    776         self.assertEqual(zlib.decompress(zlib9, 15), HAMLET_SCENE)
    777         if supports_wbits_0:
    778             self.assertEqual(zlib.decompress(zlib9, 0), HAMLET_SCENE)
    779         self.assertEqual(zlib.decompress(zlib9, 32 + 9), HAMLET_SCENE)
    780         dco = zlib.decompressobj(wbits=32 + 9)
    781         self.assertEqual(dco.decompress(zlib9), HAMLET_SCENE)
    782 
    783         co = zlib.compressobj(level=1, wbits=-15)
    784         deflate15 = co.compress(HAMLET_SCENE) + co.flush()
    785         self.assertEqual(zlib.decompress(deflate15, -15), HAMLET_SCENE)
    786         dco = zlib.decompressobj(wbits=-15)
    787         self.assertEqual(dco.decompress(deflate15), HAMLET_SCENE)
    788 
    789         co = zlib.compressobj(level=1, wbits=-9)
    790         deflate9 = co.compress(HAMLET_SCENE) + co.flush()
    791         self.assertEqual(zlib.decompress(deflate9, -9), HAMLET_SCENE)
    792         self.assertEqual(zlib.decompress(deflate9, -15), HAMLET_SCENE)
    793         dco = zlib.decompressobj(wbits=-9)
    794         self.assertEqual(dco.decompress(deflate9), HAMLET_SCENE)
    795 
    796         co = zlib.compressobj(level=1, wbits=16 + 15)
    797         gzip = co.compress(HAMLET_SCENE) + co.flush()
    798         self.assertEqual(zlib.decompress(gzip, 16 + 15), HAMLET_SCENE)
    799         self.assertEqual(zlib.decompress(gzip, 32 + 15), HAMLET_SCENE)
    800         dco = zlib.decompressobj(32 + 15)
    801         self.assertEqual(dco.decompress(gzip), HAMLET_SCENE)
    802 
    803 
    804 def genblock(seed, length, step=1024, generator=random):
    805     """length-byte stream of random data from a seed (in step-byte blocks)."""
    806     if seed is not None:
    807         generator.seed(seed)
    808     randint = generator.randint
    809     if length < step or step < 2:
    810         step = length
    811     blocks = bytes()
    812     for i in range(0, length, step):
    813         blocks += bytes(randint(0, 255) for x in range(step))
    814     return blocks
    815 
    816 
    817 
    818 def choose_lines(source, number, seed=None, generator=random):
    819     """Return a list of number lines randomly chosen from the source"""
    820     if seed is not None:
    821         generator.seed(seed)
    822     sources = source.split('\n')
    823     return [generator.choice(sources) for n in range(number)]
    824 
    825 
    826 
    827 HAMLET_SCENE = b"""
    828 LAERTES
    829 
    830        O, fear me not.
    831        I stay too long: but here my father comes.
    832 
    833        Enter POLONIUS
    834 
    835        A double blessing is a double grace,
    836        Occasion smiles upon a second leave.
    837 
    838 LORD POLONIUS
    839 
    840        Yet here, Laertes! aboard, aboard, for shame!
    841        The wind sits in the shoulder of your sail,
    842        And you are stay'd for. There; my blessing with thee!
    843        And these few precepts in thy memory
    844        See thou character. Give thy thoughts no tongue,
    845        Nor any unproportioned thought his act.
    846        Be thou familiar, but by no means vulgar.
    847        Those friends thou hast, and their adoption tried,
    848        Grapple them to thy soul with hoops of steel;
    849        But do not dull thy palm with entertainment
    850        Of each new-hatch'd, unfledged comrade. Beware
    851        Of entrance to a quarrel, but being in,
    852        Bear't that the opposed may beware of thee.
    853        Give every man thy ear, but few thy voice;
    854        Take each man's censure, but reserve thy judgment.
    855        Costly thy habit as thy purse can buy,
    856        But not express'd in fancy; rich, not gaudy;
    857        For the apparel oft proclaims the man,
    858        And they in France of the best rank and station
    859        Are of a most select and generous chief in that.
    860        Neither a borrower nor a lender be;
    861        For loan oft loses both itself and friend,
    862        And borrowing dulls the edge of husbandry.
    863        This above all: to thine ownself be true,
    864        And it must follow, as the night the day,
    865        Thou canst not then be false to any man.
    866        Farewell: my blessing season this in thee!
    867 
    868 LAERTES
    869 
    870        Most humbly do I take my leave, my lord.
    871 
    872 LORD POLONIUS
    873 
    874        The time invites you; go; your servants tend.
    875 
    876 LAERTES
    877 
    878        Farewell, Ophelia; and remember well
    879        What I have said to you.
    880 
    881 OPHELIA
    882 
    883        'Tis in my memory lock'd,
    884        And you yourself shall keep the key of it.
    885 
    886 LAERTES
    887 
    888        Farewell.
    889 """
    890 
    891 
    892 class CustomInt:
    893     def __int__(self):
    894         return 100
    895 
    896 
    897 if __name__ == "__main__":
    898     unittest.main()
    899