import unittest
from test import test_support as support
from test.test_support import TESTFN, run_unittest, import_module, unlink, requires
import binascii
import pickle
import random
from test.test_support import precisionbigmemtest, _1G, _4G
import sys

try:
    import mmap
except ImportError:
    mmap = None

zlib = import_module('zlib')

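# Compress.copy() / Decompress.copy() may be missing when the module was
# built against an older zlib (they rely on zlib's deflateCopy() and
# inflateCopy()), hence the feature checks below rather than a version check.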
requires_Compress_copy = unittest.skipUnless(
        hasattr(zlib.compressobj(), "copy"),
        'requires Compress.copy()')
requires_Decompress_copy = unittest.skipUnless(
        hasattr(zlib.decompressobj(), "copy"),
        'requires Decompress.copy()')


class ChecksumTestCase(unittest.TestCase):
    # checksum test cases
    def test_crc32start(self):
        self.assertEqual(zlib.crc32(""), zlib.crc32("", 0))
        self.assertTrue(zlib.crc32("abc", 0xffffffff))

    def test_crc32empty(self):
        self.assertEqual(zlib.crc32("", 0), 0)
        self.assertEqual(zlib.crc32("", 1), 1)
        self.assertEqual(zlib.crc32("", 432), 432)

    def test_adler32start(self):
        self.assertEqual(zlib.adler32(""), zlib.adler32("", 1))
        self.assertTrue(zlib.adler32("abc", 0xffffffff))

    def test_adler32empty(self):
        self.assertEqual(zlib.adler32("", 0), 0)
        self.assertEqual(zlib.adler32("", 1), 1)
        self.assertEqual(zlib.adler32("", 432), 432)

    def assertEqual32(self, seen, expected):
        # 32-bit values masked -- checksums on 32- vs 64- bit machines
        # This is important if bit 31 (0x80000000L) is set.
        self.assertEqual(seen & 0x0FFFFFFFFL, expected & 0x0FFFFFFFFL)

    def test_penguins(self):
        self.assertEqual32(zlib.crc32("penguin", 0), 0x0e5c1a120L)
        self.assertEqual32(zlib.crc32("penguin", 1), 0x43b6aa94)
        self.assertEqual32(zlib.adler32("penguin", 0), 0x0bcf02f6)
        self.assertEqual32(zlib.adler32("penguin", 1), 0x0bd602f7)

        self.assertEqual(zlib.crc32("penguin"), zlib.crc32("penguin", 0))
        self.assertEqual(zlib.adler32("penguin"), zlib.adler32("penguin", 1))

    def test_abcdefghijklmnop(self):
        """test issue1202 compliance: signed crc32, adler32 in 2.x"""
        foo = 'abcdefghijklmnop'
        # explicitly test signed behavior
        self.assertEqual(zlib.crc32(foo), -1808088941)
        self.assertEqual(zlib.crc32('spam'), 1138425661)
        self.assertEqual(zlib.adler32(foo+foo), -721416943)
        self.assertEqual(zlib.adler32('spam'), 72286642)
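        # In 2.x these return signed ints; masking recovers the unsigned
        # value, e.g. (-1808088941) & 0xffffffffL == 2486878355L.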

    def test_same_as_binascii_crc32(self):
        foo = 'abcdefghijklmnop'
        self.assertEqual(binascii.crc32(foo), zlib.crc32(foo))
        self.assertEqual(binascii.crc32('spam'), zlib.crc32('spam'))

    def test_negative_crc_iv_input(self):
        # The range of valid input values for the crc state should be
        # -2**31 through 2**32-1 to allow inputs artificially constrained
        # to a signed 32-bit integer.
        self.assertEqual(zlib.crc32('ham', -1), zlib.crc32('ham', 0xffffffffL))
        self.assertEqual(zlib.crc32('spam', -3141593),
                         zlib.crc32('spam',  0xffd01027L))
        self.assertEqual(zlib.crc32('spam', -(2**31)),
                         zlib.crc32('spam',  (2**31)))
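        # e.g. -3141593 & 0xffffffffL == 0xffd01027L, so each pair above
        # names the same 32-bit crc state.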


# Issue #10276 - check that inputs >=4GB are handled correctly.
class ChecksumBigBufferTestCase(unittest.TestCase):

    @precisionbigmemtest(size=_4G + 4, memuse=1, dry_run=False)
    def test_big_buffer(self, size):
        data = b"nyan" * (_1G + 1)
        self.assertEqual(zlib.crc32(data) & 0xFFFFFFFF, 1044521549)
        self.assertEqual(zlib.adler32(data) & 0xFFFFFFFF, 2256789997)


class ExceptionTestCase(unittest.TestCase):
    # make sure we generate some expected errors
    def test_badlevel(self):
        # specifying compression level out of range causes an error
        # (but -1 is Z_DEFAULT_COMPRESSION, and apparently zlib accepts
        # 0 too)
        self.assertRaises(zlib.error, zlib.compress, 'ERROR', 10)

    def test_badcompressobj(self):
        # verify failure on building compress object with bad params
        self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)
        # specifying total bits too large causes an error
        self.assertRaises(ValueError,
                zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1)

    def test_baddecompressobj(self):
        # verify failure on building decompress object with bad params
        self.assertRaises(ValueError, zlib.decompressobj, -1)

    def test_decompressobj_badflush(self):
        # verify failure on calling decompressobj.flush with bad params
        self.assertRaises(ValueError, zlib.decompressobj().flush, 0)
        self.assertRaises(ValueError, zlib.decompressobj().flush, -1)

    @support.cpython_only
    def test_overflow(self):
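        # sys.maxsize + 1 does not fit in the C size types these arguments
        # are converted to, so the conversion itself raises OverflowError
        # before any decompression is attempted (hence cpython_only).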
        with self.assertRaisesRegexp(OverflowError, 'int too large'):
            zlib.decompress(b'', 15, sys.maxsize + 1)
        with self.assertRaisesRegexp(OverflowError, 'int too large'):
            zlib.decompressobj().decompress(b'', sys.maxsize + 1)
        with self.assertRaisesRegexp(OverflowError, 'int too large'):
            zlib.decompressobj().flush(sys.maxsize + 1)


class BaseCompressTestCase(object):
    def check_big_compress_buffer(self, size, compress_func):
        _1M = 1024 * 1024
        fmt = "%%0%dx" % (2 * _1M)
        # Generate 10MB worth of random data, and expand it by repeating it.
        # The assumption is that zlib's window is not large enough to exploit
        # such spread-out redundancy.
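        # getrandbits(8 * _1M) formatted with fmt yields 2 * _1M hex digits,
        # which a2b_hex() turns back into _1M random bytes.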
        data = ''.join([binascii.a2b_hex(fmt % random.getrandbits(8 * _1M))
                        for i in range(10)])
        data = data * (size // len(data) + 1)
        try:
            compress_func(data)
        finally:
            # Release memory
            data = None

    def check_big_decompress_buffer(self, size, decompress_func):
        data = 'x' * size
        try:
            compressed = zlib.compress(data, 1)
        finally:
            # Release memory
            data = None
        data = decompress_func(compressed)
        # Sanity check
        try:
            self.assertEqual(len(data), size)
            self.assertEqual(len(data.strip('x')), 0)
        finally:
            data = None


class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression in one go (whole message compression)
    def test_speech(self):
        x = zlib.compress(HAMLET_SCENE)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)

    def test_speech128(self):
        # compress more data
        data = HAMLET_SCENE * 128
        x = zlib.compress(data)
        self.assertEqual(zlib.decompress(x), data)

    def test_incomplete_stream(self):
        # A useful error message is given
        x = zlib.compress(HAMLET_SCENE)
        self.assertRaisesRegexp(zlib.error,
            "Error -5 while decompressing data: incomplete or truncated stream",
            zlib.decompress, x[:-1])

    # Memory use of the following functions takes into account overallocation

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        compress = lambda s: zlib.compress(s, 1)
        self.check_big_compress_buffer(size, compress)

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        self.check_big_decompress_buffer(size, zlib.decompress)

    @precisionbigmemtest(size=_4G, memuse=1)
    def test_large_bufsize(self, size):
        # Test decompress(bufsize) parameter greater than the internal limit
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        self.assertEqual(zlib.decompress(compressed, 15, size), data)

    def test_custom_bufsize(self):
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        self.assertEqual(zlib.decompress(compressed, 15, CustomInt()), data)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @precisionbigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        data = b'x' * size
        try:
            comp = zlib.compress(data, 0)
            self.assertEqual(zlib.decompress(comp), data)
        finally:
            comp = data = None


class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression object
    def test_pair(self):
        # straightforward compress/decompress objects
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        x1 = co.compress(data)
        x2 = co.flush()
        self.assertRaises(zlib.error, co.flush) # second flush should not work
        dco = zlib.decompressobj()
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_compressoptions(self):
        # specify lots of options to compressobj()
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memlevel = 9
        strategy = zlib.Z_FILTERED
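        # A negative wbits selects a raw deflate stream (no zlib header or
        # trailer); its magnitude (12 here) sets the window size to 2**12
        # bytes.  The decompressor below must be given the same wbits.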
        co = zlib.compressobj(level, method, wbits, memlevel, strategy)
        x1 = co.compress(HAMLET_SCENE)
        x2 = co.flush()
        dco = zlib.decompressobj(wbits)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(HAMLET_SCENE, y1 + y2)

    def test_compressincremental(self):
        # compress object in steps, decompress object as one-shot
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)

        dco = zlib.decompressobj()
        y1 = dco.decompress(''.join(bufs))
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
        # compress object in steps, decompress object in steps
        source = source or HAMLET_SCENE
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)

        self.assertEqual(data, zlib.decompress(combuf))

        dco = zlib.decompressobj()
        bufs = []
        for i in range(0, len(combuf), dcx):
            bufs.append(dco.decompress(combuf[i:i+dcx]))
            self.assertEqual('', dco.unconsumed_tail, ########
                             "(A) uct should be '': not %d long" %
                                       len(dco.unconsumed_tail))
        if flush:
            bufs.append(dco.flush())
        else:
            while True:
                chunk = dco.decompress('')
                if chunk:
                    bufs.append(chunk)
                else:
                    break
        self.assertEqual('', dco.unconsumed_tail, ########
                         "(B) uct should be '': not %d long" %
                                       len(dco.unconsumed_tail))
        self.assertEqual(data, ''.join(bufs))
        # Failure means: "decompressobj with init options failed"

    def test_decompincflush(self):
        self.test_decompinc(flush=True)

    def test_decompimax(self, source=None, cx=256, dcx=64):
        # compress in steps, decompress in length-restricted steps
        source = source or HAMLET_SCENE
        # Check a decompression object with max_length specified
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            #max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, dcx)
            self.assertFalse(len(chunk) > dcx,
                    'chunk too big (%d>%d)' % (len(chunk), dcx))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlen(self, flush=False):
        # Check a decompression object with max_length specified
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.assertFalse(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk),max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        if flush:
            bufs.append(dco.flush())
        else:
            while chunk:
                chunk = dco.decompress('', max_length)
                self.assertFalse(len(chunk) > max_length,
                            'chunk too big (%d>%d)' % (len(chunk),max_length))
                bufs.append(chunk)
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlenflush(self):
        self.test_decompressmaxlen(flush=True)

    def test_maxlenmisc(self):
        # Misc tests of max_length
        dco = zlib.decompressobj()
        self.assertRaises(ValueError, dco.decompress, "", -1)
        self.assertEqual('', dco.unconsumed_tail)

    def test_maxlen_large(self):
        # Sizes up to sys.maxsize should be accepted, although zlib is
        # internally limited to expressing sizes with unsigned int
        data = HAMLET_SCENE * 10
        DEFAULTALLOC = 16 * 1024
        self.assertGreater(len(data), DEFAULTALLOC)
        compressed = zlib.compress(data, 1)
        dco = zlib.decompressobj()
        self.assertEqual(dco.decompress(compressed, sys.maxsize), data)

    def test_maxlen_custom(self):
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        dco = zlib.decompressobj()
        self.assertEqual(dco.decompress(compressed, CustomInt()), data[:100])

    def test_clear_unconsumed_tail(self):
        # Issue #12050: calling decompress() without providing max_length
        # should clear the unconsumed_tail attribute.
        cdata = "x\x9cKLJ\x06\x00\x02M\x01"     # "abc"
        dco = zlib.decompressobj()
        ddata = dco.decompress(cdata, 1)
        ddata += dco.decompress(dco.unconsumed_tail)
        self.assertEqual(dco.unconsumed_tail, "")

    def test_flushes(self):
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
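        # Z_SYNC_FLUSH flushes pending output to a byte boundary;
        # Z_FULL_FLUSH additionally resets the compression state so that
        # decompression can restart from that point.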
        data = HAMLET_SCENE * 8

        for sync in sync_opt:
            for level in range(10):
                obj = zlib.compressobj( level )
                a = obj.compress( data[:3000] )
                b = obj.flush( sync )
                c = obj.compress( data[3000:] )
                d = obj.flush()
                self.assertEqual(zlib.decompress(''.join([a,b,c,d])),
                                 data, ("Decompress failed: flush "
                                        "mode=%i, level=%i") % (sync, level))
                del obj

    @unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'),
                         'requires zlib.Z_SYNC_FLUSH')
    def test_odd_flush(self):
        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
        import random
        # Testing on 17K of "random" data

        # Create compressor and decompressor objects
        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        dco = zlib.decompressobj()

        # Try 17K of data
        # generate random data stream
        try:
            # In 2.3 and later, WichmannHill is the RNG of the bug report
            gen = random.WichmannHill()
        except AttributeError:
            try:
                # 2.2 called it Random
                gen = random.Random()
            except AttributeError:
                # others might simply have a single RNG
                gen = random
        gen.seed(1)
        data = genblock(1, 17 * 1024, generator=gen)

        # compress, sync-flush, and decompress
        first = co.compress(data)
        second = co.flush(zlib.Z_SYNC_FLUSH)
        expanded = dco.decompress(first + second)

        # if decompressed data is different from the input data, choke.
        self.assertEqual(expanded, data, "17K random source doesn't match")

    def test_empty_flush(self):
        # Test that calling .flush() on unused objects works.
        # (Bug #1083110 -- calling .flush() on decompress objects
        # caused a core dump.)

        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        self.assertTrue(co.flush())  # Returns a zlib header
        dco = zlib.decompressobj()
        self.assertEqual(dco.flush(), "") # Returns nothing

    def test_decompress_incomplete_stream(self):
        # This is 'foo', deflated
        x = 'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
        # For the record
        self.assertEqual(zlib.decompress(x), 'foo')
        self.assertRaises(zlib.error, zlib.decompress, x[:-5])
        # Omitting the stream end works with decompressor objects
        # (see issue #8672).
        dco = zlib.decompressobj()
        y = dco.decompress(x[:-5])
        y += dco.flush()
        self.assertEqual(y, 'foo')

    def test_flush_with_freed_input(self):
        # Issue #16411: decompressor accesses input to last decompress() call
        # in flush(), even if this object has been freed in the meanwhile.
        input1 = 'abcdefghijklmnopqrstuvwxyz'
        input2 = 'QWERTYUIOPASDFGHJKLZXCVBNM'
        data = zlib.compress(input1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        del data
        data = zlib.compress(input2)
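        # decompress(data, 1) produced only the first byte of input1, so
        # flush() must return the rest even though the buffer it was last
        # fed has since been freed and replaced.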
        self.assertEqual(dco.flush(), input1[1:])

    @precisionbigmemtest(size=_4G, memuse=1)
    def test_flush_large_length(self, size):
        # Test flush(length) parameter greater than internal limit UINT_MAX
        input = HAMLET_SCENE * 10
        data = zlib.compress(input, 1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        self.assertEqual(dco.flush(size), input[1:])

    def test_flush_custom_length(self):
        input = HAMLET_SCENE * 10
        data = zlib.compress(input, 1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        self.assertEqual(dco.flush(CustomInt()), input[1:])

    @requires_Compress_copy
    def test_compresscopy(self):
        # Test copying a compression object
        data0 = HAMLET_SCENE
        data1 = HAMLET_SCENE.swapcase()
        c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        bufs0 = []
        bufs0.append(c0.compress(data0))

        c1 = c0.copy()
        bufs1 = bufs0[:]

        bufs0.append(c0.compress(data0))
        bufs0.append(c0.flush())
        s0 = ''.join(bufs0)

        bufs1.append(c1.compress(data1))
        bufs1.append(c1.flush())
        s1 = ''.join(bufs1)

        self.assertEqual(zlib.decompress(s0), data0+data0)
        self.assertEqual(zlib.decompress(s1), data0+data1)

    @requires_Compress_copy
    def test_badcompresscopy(self):
        # Test copying a compression object in an inconsistent state
        c = zlib.compressobj()
        c.compress(HAMLET_SCENE)
        c.flush()
        self.assertRaises(ValueError, c.copy)

    def test_decompress_unused_data(self):
        # Repeated calls to decompress() after EOF should accumulate data in
        # dco.unused_data, instead of just storing the arg to the last call.
        source = b'abcdefghijklmnopqrstuvwxyz'
        remainder = b'0123456789'
        y = zlib.compress(source)
        x = y + remainder
        for maxlen in 0, 1000:
            for step in 1, 2, len(y), len(x):
                dco = zlib.decompressobj()
                data = b''
                for i in range(0, len(x), step):
                    if i < len(y):
                        self.assertEqual(dco.unused_data, b'')
                    if maxlen == 0:
                        data += dco.decompress(x[i : i + step])
                        self.assertEqual(dco.unconsumed_tail, b'')
                    else:
                        data += dco.decompress(
                                dco.unconsumed_tail + x[i : i + step], maxlen)
                data += dco.flush()
                self.assertEqual(data, source)
                self.assertEqual(dco.unconsumed_tail, b'')
                self.assertEqual(dco.unused_data, remainder)

    @requires_Decompress_copy
    def test_decompresscopy(self):
        # Test copying a decompression object
        data = HAMLET_SCENE
        comp = zlib.compress(data)

        d0 = zlib.decompressobj()
        bufs0 = []
        bufs0.append(d0.decompress(comp[:32]))

        d1 = d0.copy()
        bufs1 = bufs0[:]

        bufs0.append(d0.decompress(comp[32:]))
        s0 = ''.join(bufs0)

        bufs1.append(d1.decompress(comp[32:]))
        s1 = ''.join(bufs1)

        self.assertEqual(s0, s1)
        self.assertEqual(s0, data)

    @requires_Decompress_copy
    def test_baddecompresscopy(self):
        # Test copying a decompression object in an inconsistent state
        data = zlib.compress(HAMLET_SCENE)
        d = zlib.decompressobj()
        d.decompress(data)
        d.flush()
        self.assertRaises(ValueError, d.copy)

    def test_compresspickle(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises((TypeError, pickle.PicklingError)):
                pickle.dumps(zlib.compressobj(zlib.Z_BEST_COMPRESSION), proto)

    def test_decompresspickle(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises((TypeError, pickle.PicklingError)):
                pickle.dumps(zlib.decompressobj(), proto)

    # Memory use of the following functions takes into account overallocation

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        c = zlib.compressobj(1)
        compress = lambda s: c.compress(s) + c.flush()
        self.check_big_compress_buffer(size, compress)

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        d = zlib.decompressobj()
        decompress = lambda s: d.decompress(s) + d.flush()
        self.check_big_decompress_buffer(size, decompress)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @precisionbigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        data = b'x' * size
        co = zlib.compressobj(0)
        do = zlib.decompressobj()
        try:
            comp = co.compress(data) + co.flush()
            uncomp = do.decompress(comp) + do.flush()
            self.assertEqual(uncomp, data)
        finally:
            comp = uncomp = data = None

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @precisionbigmemtest(size=_4G + 100, memuse=3)
    def test_large_unused_data(self, size):
        data = b'abcdefghijklmnop'
        unused = b'x' * size
        comp = zlib.compress(data) + unused
        do = zlib.decompressobj()
        try:
            uncomp = do.decompress(comp) + do.flush()
            self.assertEqual(unused, do.unused_data)
            self.assertEqual(uncomp, data)
        finally:
            unused = comp = do = None

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @precisionbigmemtest(size=_4G + 100, memuse=5)
    def test_large_unconsumed_tail(self, size):
        data = b'x' * size
        do = zlib.decompressobj()
        try:
            comp = zlib.compress(data, 0)
            uncomp = do.decompress(comp, 1) + do.flush()
            self.assertEqual(uncomp, data)
            self.assertEqual(do.unconsumed_tail, b'')
        finally:
            comp = uncomp = data = None

    def test_wbits(self):
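        # wbits selects the container format and window size: a positive
        # value (up to 15) means zlib format with a 2**wbits window, a
        # negative value means a raw deflate stream, adding 16 selects the
        # gzip format, and adding 32 (decompression only) auto-detects
        # zlib vs. gzip.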
        co = zlib.compressobj(1, zlib.DEFLATED, 15)
        zlib15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(zlib15, 15), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib15, 32 + 15), HAMLET_SCENE)
        with self.assertRaisesRegexp(zlib.error, 'invalid window size'):
            zlib.decompress(zlib15, 14)
        dco = zlib.decompressobj(32 + 15)
        self.assertEqual(dco.decompress(zlib15), HAMLET_SCENE)
        dco = zlib.decompressobj(14)
        with self.assertRaisesRegexp(zlib.error, 'invalid window size'):
            dco.decompress(zlib15)

        co = zlib.compressobj(1, zlib.DEFLATED, 9)
        zlib9 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(zlib9, 9), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib9, 15), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib9, 32 + 9), HAMLET_SCENE)
        dco = zlib.decompressobj(32 + 9)
        self.assertEqual(dco.decompress(zlib9), HAMLET_SCENE)

        co = zlib.compressobj(1, zlib.DEFLATED, -15)
        deflate15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(deflate15, -15), HAMLET_SCENE)
        dco = zlib.decompressobj(-15)
        self.assertEqual(dco.decompress(deflate15), HAMLET_SCENE)

        co = zlib.compressobj(1, zlib.DEFLATED, -9)
        deflate9 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(deflate9, -9), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(deflate9, -15), HAMLET_SCENE)
        dco = zlib.decompressobj(-9)
        self.assertEqual(dco.decompress(deflate9), HAMLET_SCENE)

        co = zlib.compressobj(1, zlib.DEFLATED, 16 + 15)
        gzip = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(gzip, 16 + 15), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(gzip, 32 + 15), HAMLET_SCENE)
        dco = zlib.decompressobj(32 + 15)
        self.assertEqual(dco.decompress(gzip), HAMLET_SCENE)


def genblock(seed, length, step=1024, generator=random):
    """length-byte stream of random data from a seed (in step-byte blocks)."""
    if seed is not None:
        generator.seed(seed)
    randint = generator.randint
    if length < step or step < 2:
        step = length
    blocks = []
    for i in range(0, length, step):
        blocks.append(''.join([chr(randint(0,255))
                               for x in range(step)]))
    return ''.join(blocks)[:length]


def choose_lines(source, number, seed=None, generator=random):
    """Return a list of number lines randomly chosen from the source"""
    if seed is not None:
        generator.seed(seed)
    sources = source.split('\n')
    return [generator.choice(sources) for n in range(number)]


HAMLET_SCENE = """
LAERTES

       O, fear me not.
       I stay too long: but here my father comes.

       Enter POLONIUS

       A double blessing is a double grace,
       Occasion smiles upon a second leave.

LORD POLONIUS

       Yet here, Laertes! aboard, aboard, for shame!
       The wind sits in the shoulder of your sail,
       And you are stay'd for. There; my blessing with thee!
       And these few precepts in thy memory
       See thou character. Give thy thoughts no tongue,
       Nor any unproportioned thought his act.
       Be thou familiar, but by no means vulgar.
       Those friends thou hast, and their adoption tried,
       Grapple them to thy soul with hoops of steel;
       But do not dull thy palm with entertainment
       Of each new-hatch'd, unfledged comrade. Beware
       Of entrance to a quarrel, but being in,
       Bear't that the opposed may beware of thee.
       Give every man thy ear, but few thy voice;
       Take each man's censure, but reserve thy judgment.
       Costly thy habit as thy purse can buy,
       But not express'd in fancy; rich, not gaudy;
       For the apparel oft proclaims the man,
       And they in France of the best rank and station
       Are of a most select and generous chief in that.
       Neither a borrower nor a lender be;
       For loan oft loses both itself and friend,
       And borrowing dulls the edge of husbandry.
       This above all: to thine ownself be true,
       And it must follow, as the night the day,
       Thou canst not then be false to any man.
       Farewell: my blessing season this in thee!

LAERTES

       Most humbly do I take my leave, my lord.

LORD POLONIUS

       The time invites you; go; your servants tend.

LAERTES

       Farewell, Ophelia; and remember well
       What I have said to you.

OPHELIA

       'Tis in my memory lock'd,
       And you yourself shall keep the key of it.

LAERTES

       Farewell.
"""

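# Used by the bufsize/max_length/length tests above: any object with an
# __int__ method is accepted where an integer size argument is expected.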
class CustomInt:
    def __int__(self):
        return 100


def test_main():
    run_unittest(
        ChecksumTestCase,
        ChecksumBigBufferTestCase,
        ExceptionTestCase,
        CompressTestCase,
        CompressObjectTestCase
    )

if __name__ == "__main__":
    test_main()