1 import unittest 2 from test.test_support import TESTFN, run_unittest, import_module, unlink, requires 3 import binascii 4 import random 5 from test.test_support import precisionbigmemtest, _1G, _4G 6 import sys 7 8 try: 9 import mmap 10 except ImportError: 11 mmap = None 12 13 zlib = import_module('zlib') 14 15 16 class ChecksumTestCase(unittest.TestCase): 17 # checksum test cases 18 def test_crc32start(self): 19 self.assertEqual(zlib.crc32(""), zlib.crc32("", 0)) 20 self.assertTrue(zlib.crc32("abc", 0xffffffff)) 21 22 def test_crc32empty(self): 23 self.assertEqual(zlib.crc32("", 0), 0) 24 self.assertEqual(zlib.crc32("", 1), 1) 25 self.assertEqual(zlib.crc32("", 432), 432) 26 27 def test_adler32start(self): 28 self.assertEqual(zlib.adler32(""), zlib.adler32("", 1)) 29 self.assertTrue(zlib.adler32("abc", 0xffffffff)) 30 31 def test_adler32empty(self): 32 self.assertEqual(zlib.adler32("", 0), 0) 33 self.assertEqual(zlib.adler32("", 1), 1) 34 self.assertEqual(zlib.adler32("", 432), 432) 35 36 def assertEqual32(self, seen, expected): 37 # 32-bit values masked -- checksums on 32- vs 64- bit machines 38 # This is important if bit 31 (0x08000000L) is set. 39 self.assertEqual(seen & 0x0FFFFFFFFL, expected & 0x0FFFFFFFFL) 40 41 def test_penguins(self): 42 self.assertEqual32(zlib.crc32("penguin", 0), 0x0e5c1a120L) 43 self.assertEqual32(zlib.crc32("penguin", 1), 0x43b6aa94) 44 self.assertEqual32(zlib.adler32("penguin", 0), 0x0bcf02f6) 45 self.assertEqual32(zlib.adler32("penguin", 1), 0x0bd602f7) 46 47 self.assertEqual(zlib.crc32("penguin"), zlib.crc32("penguin", 0)) 48 self.assertEqual(zlib.adler32("penguin"),zlib.adler32("penguin",1)) 49 50 def test_abcdefghijklmnop(self): 51 """test issue1202 compliance: signed crc32, adler32 in 2.x""" 52 foo = 'abcdefghijklmnop' 53 # explicitly test signed behavior 54 self.assertEqual(zlib.crc32(foo), -1808088941) 55 self.assertEqual(zlib.crc32('spam'), 1138425661) 56 self.assertEqual(zlib.adler32(foo+foo), -721416943) 57 self.assertEqual(zlib.adler32('spam'), 72286642) 58 59 def test_same_as_binascii_crc32(self): 60 foo = 'abcdefghijklmnop' 61 self.assertEqual(binascii.crc32(foo), zlib.crc32(foo)) 62 self.assertEqual(binascii.crc32('spam'), zlib.crc32('spam')) 63 64 def test_negative_crc_iv_input(self): 65 # The range of valid input values for the crc state should be 66 # -2**31 through 2**32-1 to allow inputs artifically constrained 67 # to a signed 32-bit integer. 68 self.assertEqual(zlib.crc32('ham', -1), zlib.crc32('ham', 0xffffffffL)) 69 self.assertEqual(zlib.crc32('spam', -3141593), 70 zlib.crc32('spam', 0xffd01027L)) 71 self.assertEqual(zlib.crc32('spam', -(2**31)), 72 zlib.crc32('spam', (2**31))) 73 74 75 class ExceptionTestCase(unittest.TestCase): 76 # make sure we generate some expected errors 77 def test_badlevel(self): 78 # specifying compression level out of range causes an error 79 # (but -1 is Z_DEFAULT_COMPRESSION and apparently the zlib 80 # accepts 0 too) 81 self.assertRaises(zlib.error, zlib.compress, 'ERROR', 10) 82 83 def test_badcompressobj(self): 84 # verify failure on building compress object with bad params 85 self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0) 86 # specifying total bits too large causes an error 87 self.assertRaises(ValueError, 88 zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1) 89 90 def test_baddecompressobj(self): 91 # verify failure on building decompress object with bad params 92 self.assertRaises(ValueError, zlib.decompressobj, -1) 93 94 def test_decompressobj_badflush(self): 95 # verify failure on calling decompressobj.flush with bad params 96 self.assertRaises(ValueError, zlib.decompressobj().flush, 0) 97 self.assertRaises(ValueError, zlib.decompressobj().flush, -1) 98 99 100 class BaseCompressTestCase(object): 101 def check_big_compress_buffer(self, size, compress_func): 102 _1M = 1024 * 1024 103 fmt = "%%0%dx" % (2 * _1M) 104 # Generate 10MB worth of random, and expand it by repeating it. 105 # The assumption is that zlib's memory is not big enough to exploit 106 # such spread out redundancy. 107 data = ''.join([binascii.a2b_hex(fmt % random.getrandbits(8 * _1M)) 108 for i in range(10)]) 109 data = data * (size // len(data) + 1) 110 try: 111 compress_func(data) 112 finally: 113 # Release memory 114 data = None 115 116 def check_big_decompress_buffer(self, size, decompress_func): 117 data = 'x' * size 118 try: 119 compressed = zlib.compress(data, 1) 120 finally: 121 # Release memory 122 data = None 123 data = decompress_func(compressed) 124 # Sanity check 125 try: 126 self.assertEqual(len(data), size) 127 self.assertEqual(len(data.strip('x')), 0) 128 finally: 129 data = None 130 131 132 class CompressTestCase(BaseCompressTestCase, unittest.TestCase): 133 # Test compression in one go (whole message compression) 134 def test_speech(self): 135 x = zlib.compress(HAMLET_SCENE) 136 self.assertEqual(zlib.decompress(x), HAMLET_SCENE) 137 138 def test_speech128(self): 139 # compress more data 140 data = HAMLET_SCENE * 128 141 x = zlib.compress(data) 142 self.assertEqual(zlib.decompress(x), data) 143 144 def test_incomplete_stream(self): 145 # An useful error message is given 146 x = zlib.compress(HAMLET_SCENE) 147 self.assertRaisesRegexp(zlib.error, 148 "Error -5 while decompressing data: incomplete or truncated stream", 149 zlib.decompress, x[:-1]) 150 151 # Memory use of the following functions takes into account overallocation 152 153 @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3) 154 def test_big_compress_buffer(self, size): 155 compress = lambda s: zlib.compress(s, 1) 156 self.check_big_compress_buffer(size, compress) 157 158 @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2) 159 def test_big_decompress_buffer(self, size): 160 self.check_big_decompress_buffer(size, zlib.decompress) 161 162 163 class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): 164 # Test compression object 165 def test_pair(self): 166 # straightforward compress/decompress objects 167 data = HAMLET_SCENE * 128 168 co = zlib.compressobj() 169 x1 = co.compress(data) 170 x2 = co.flush() 171 self.assertRaises(zlib.error, co.flush) # second flush should not work 172 dco = zlib.decompressobj() 173 y1 = dco.decompress(x1 + x2) 174 y2 = dco.flush() 175 self.assertEqual(data, y1 + y2) 176 177 def test_compressoptions(self): 178 # specify lots of options to compressobj() 179 level = 2 180 method = zlib.DEFLATED 181 wbits = -12 182 memlevel = 9 183 strategy = zlib.Z_FILTERED 184 co = zlib.compressobj(level, method, wbits, memlevel, strategy) 185 x1 = co.compress(HAMLET_SCENE) 186 x2 = co.flush() 187 dco = zlib.decompressobj(wbits) 188 y1 = dco.decompress(x1 + x2) 189 y2 = dco.flush() 190 self.assertEqual(HAMLET_SCENE, y1 + y2) 191 192 def test_compressincremental(self): 193 # compress object in steps, decompress object as one-shot 194 data = HAMLET_SCENE * 128 195 co = zlib.compressobj() 196 bufs = [] 197 for i in range(0, len(data), 256): 198 bufs.append(co.compress(data[i:i+256])) 199 bufs.append(co.flush()) 200 combuf = ''.join(bufs) 201 202 dco = zlib.decompressobj() 203 y1 = dco.decompress(''.join(bufs)) 204 y2 = dco.flush() 205 self.assertEqual(data, y1 + y2) 206 207 def test_decompinc(self, flush=False, source=None, cx=256, dcx=64): 208 # compress object in steps, decompress object in steps 209 source = source or HAMLET_SCENE 210 data = source * 128 211 co = zlib.compressobj() 212 bufs = [] 213 for i in range(0, len(data), cx): 214 bufs.append(co.compress(data[i:i+cx])) 215 bufs.append(co.flush()) 216 combuf = ''.join(bufs) 217 218 self.assertEqual(data, zlib.decompress(combuf)) 219 220 dco = zlib.decompressobj() 221 bufs = [] 222 for i in range(0, len(combuf), dcx): 223 bufs.append(dco.decompress(combuf[i:i+dcx])) 224 self.assertEqual('', dco.unconsumed_tail, ######## 225 "(A) uct should be '': not %d long" % 226 len(dco.unconsumed_tail)) 227 if flush: 228 bufs.append(dco.flush()) 229 else: 230 while True: 231 chunk = dco.decompress('') 232 if chunk: 233 bufs.append(chunk) 234 else: 235 break 236 self.assertEqual('', dco.unconsumed_tail, ######## 237 "(B) uct should be '': not %d long" % 238 len(dco.unconsumed_tail)) 239 self.assertEqual(data, ''.join(bufs)) 240 # Failure means: "decompressobj with init options failed" 241 242 def test_decompincflush(self): 243 self.test_decompinc(flush=True) 244 245 def test_decompimax(self, source=None, cx=256, dcx=64): 246 # compress in steps, decompress in length-restricted steps 247 source = source or HAMLET_SCENE 248 # Check a decompression object with max_length specified 249 data = source * 128 250 co = zlib.compressobj() 251 bufs = [] 252 for i in range(0, len(data), cx): 253 bufs.append(co.compress(data[i:i+cx])) 254 bufs.append(co.flush()) 255 combuf = ''.join(bufs) 256 self.assertEqual(data, zlib.decompress(combuf), 257 'compressed data failure') 258 259 dco = zlib.decompressobj() 260 bufs = [] 261 cb = combuf 262 while cb: 263 #max_length = 1 + len(cb)//10 264 chunk = dco.decompress(cb, dcx) 265 self.assertFalse(len(chunk) > dcx, 266 'chunk too big (%d>%d)' % (len(chunk), dcx)) 267 bufs.append(chunk) 268 cb = dco.unconsumed_tail 269 bufs.append(dco.flush()) 270 self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved') 271 272 def test_decompressmaxlen(self, flush=False): 273 # Check a decompression object with max_length specified 274 data = HAMLET_SCENE * 128 275 co = zlib.compressobj() 276 bufs = [] 277 for i in range(0, len(data), 256): 278 bufs.append(co.compress(data[i:i+256])) 279 bufs.append(co.flush()) 280 combuf = ''.join(bufs) 281 self.assertEqual(data, zlib.decompress(combuf), 282 'compressed data failure') 283 284 dco = zlib.decompressobj() 285 bufs = [] 286 cb = combuf 287 while cb: 288 max_length = 1 + len(cb)//10 289 chunk = dco.decompress(cb, max_length) 290 self.assertFalse(len(chunk) > max_length, 291 'chunk too big (%d>%d)' % (len(chunk),max_length)) 292 bufs.append(chunk) 293 cb = dco.unconsumed_tail 294 if flush: 295 bufs.append(dco.flush()) 296 else: 297 while chunk: 298 chunk = dco.decompress('', max_length) 299 self.assertFalse(len(chunk) > max_length, 300 'chunk too big (%d>%d)' % (len(chunk),max_length)) 301 bufs.append(chunk) 302 self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved') 303 304 def test_decompressmaxlenflush(self): 305 self.test_decompressmaxlen(flush=True) 306 307 def test_maxlenmisc(self): 308 # Misc tests of max_length 309 dco = zlib.decompressobj() 310 self.assertRaises(ValueError, dco.decompress, "", -1) 311 self.assertEqual('', dco.unconsumed_tail) 312 313 def test_clear_unconsumed_tail(self): 314 # Issue #12050: calling decompress() without providing max_length 315 # should clear the unconsumed_tail attribute. 316 cdata = "x\x9cKLJ\x06\x00\x02M\x01" # "abc" 317 dco = zlib.decompressobj() 318 ddata = dco.decompress(cdata, 1) 319 ddata += dco.decompress(dco.unconsumed_tail) 320 self.assertEqual(dco.unconsumed_tail, "") 321 322 def test_flushes(self): 323 # Test flush() with the various options, using all the 324 # different levels in order to provide more variations. 325 sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH'] 326 sync_opt = [getattr(zlib, opt) for opt in sync_opt 327 if hasattr(zlib, opt)] 328 data = HAMLET_SCENE * 8 329 330 for sync in sync_opt: 331 for level in range(10): 332 obj = zlib.compressobj( level ) 333 a = obj.compress( data[:3000] ) 334 b = obj.flush( sync ) 335 c = obj.compress( data[3000:] ) 336 d = obj.flush() 337 self.assertEqual(zlib.decompress(''.join([a,b,c,d])), 338 data, ("Decompress failed: flush " 339 "mode=%i, level=%i") % (sync, level)) 340 del obj 341 342 def test_odd_flush(self): 343 # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1 344 import random 345 346 if hasattr(zlib, 'Z_SYNC_FLUSH'): 347 # Testing on 17K of "random" data 348 349 # Create compressor and decompressor objects 350 co = zlib.compressobj(zlib.Z_BEST_COMPRESSION) 351 dco = zlib.decompressobj() 352 353 # Try 17K of data 354 # generate random data stream 355 try: 356 # In 2.3 and later, WichmannHill is the RNG of the bug report 357 gen = random.WichmannHill() 358 except AttributeError: 359 try: 360 # 2.2 called it Random 361 gen = random.Random() 362 except AttributeError: 363 # others might simply have a single RNG 364 gen = random 365 gen.seed(1) 366 data = genblock(1, 17 * 1024, generator=gen) 367 368 # compress, sync-flush, and decompress 369 first = co.compress(data) 370 second = co.flush(zlib.Z_SYNC_FLUSH) 371 expanded = dco.decompress(first + second) 372 373 # if decompressed data is different from the input data, choke. 374 self.assertEqual(expanded, data, "17K random source doesn't match") 375 376 def test_empty_flush(self): 377 # Test that calling .flush() on unused objects works. 378 # (Bug #1083110 -- calling .flush() on decompress objects 379 # caused a core dump.) 380 381 co = zlib.compressobj(zlib.Z_BEST_COMPRESSION) 382 self.assertTrue(co.flush()) # Returns a zlib header 383 dco = zlib.decompressobj() 384 self.assertEqual(dco.flush(), "") # Returns nothing 385 386 def test_decompress_incomplete_stream(self): 387 # This is 'foo', deflated 388 x = 'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E' 389 # For the record 390 self.assertEqual(zlib.decompress(x), 'foo') 391 self.assertRaises(zlib.error, zlib.decompress, x[:-5]) 392 # Omitting the stream end works with decompressor objects 393 # (see issue #8672). 394 dco = zlib.decompressobj() 395 y = dco.decompress(x[:-5]) 396 y += dco.flush() 397 self.assertEqual(y, 'foo') 398 399 def test_flush_with_freed_input(self): 400 # Issue #16411: decompressor accesses input to last decompress() call 401 # in flush(), even if this object has been freed in the meanwhile. 402 input1 = 'abcdefghijklmnopqrstuvwxyz' 403 input2 = 'QWERTYUIOPASDFGHJKLZXCVBNM' 404 data = zlib.compress(input1) 405 dco = zlib.decompressobj() 406 dco.decompress(data, 1) 407 del data 408 data = zlib.compress(input2) 409 self.assertEqual(dco.flush(), input1[1:]) 410 411 if hasattr(zlib.compressobj(), "copy"): 412 def test_compresscopy(self): 413 # Test copying a compression object 414 data0 = HAMLET_SCENE 415 data1 = HAMLET_SCENE.swapcase() 416 c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION) 417 bufs0 = [] 418 bufs0.append(c0.compress(data0)) 419 420 c1 = c0.copy() 421 bufs1 = bufs0[:] 422 423 bufs0.append(c0.compress(data0)) 424 bufs0.append(c0.flush()) 425 s0 = ''.join(bufs0) 426 427 bufs1.append(c1.compress(data1)) 428 bufs1.append(c1.flush()) 429 s1 = ''.join(bufs1) 430 431 self.assertEqual(zlib.decompress(s0),data0+data0) 432 self.assertEqual(zlib.decompress(s1),data0+data1) 433 434 def test_badcompresscopy(self): 435 # Test copying a compression object in an inconsistent state 436 c = zlib.compressobj() 437 c.compress(HAMLET_SCENE) 438 c.flush() 439 self.assertRaises(ValueError, c.copy) 440 441 def test_decompress_unused_data(self): 442 # Repeated calls to decompress() after EOF should accumulate data in 443 # dco.unused_data, instead of just storing the arg to the last call. 444 source = b'abcdefghijklmnopqrstuvwxyz' 445 remainder = b'0123456789' 446 y = zlib.compress(source) 447 x = y + remainder 448 for maxlen in 0, 1000: 449 for step in 1, 2, len(y), len(x): 450 dco = zlib.decompressobj() 451 data = b'' 452 for i in range(0, len(x), step): 453 if i < len(y): 454 self.assertEqual(dco.unused_data, b'') 455 if maxlen == 0: 456 data += dco.decompress(x[i : i + step]) 457 self.assertEqual(dco.unconsumed_tail, b'') 458 else: 459 data += dco.decompress( 460 dco.unconsumed_tail + x[i : i + step], maxlen) 461 data += dco.flush() 462 self.assertEqual(data, source) 463 self.assertEqual(dco.unconsumed_tail, b'') 464 self.assertEqual(dco.unused_data, remainder) 465 466 if hasattr(zlib.decompressobj(), "copy"): 467 def test_decompresscopy(self): 468 # Test copying a decompression object 469 data = HAMLET_SCENE 470 comp = zlib.compress(data) 471 472 d0 = zlib.decompressobj() 473 bufs0 = [] 474 bufs0.append(d0.decompress(comp[:32])) 475 476 d1 = d0.copy() 477 bufs1 = bufs0[:] 478 479 bufs0.append(d0.decompress(comp[32:])) 480 s0 = ''.join(bufs0) 481 482 bufs1.append(d1.decompress(comp[32:])) 483 s1 = ''.join(bufs1) 484 485 self.assertEqual(s0,s1) 486 self.assertEqual(s0,data) 487 488 def test_baddecompresscopy(self): 489 # Test copying a compression object in an inconsistent state 490 data = zlib.compress(HAMLET_SCENE) 491 d = zlib.decompressobj() 492 d.decompress(data) 493 d.flush() 494 self.assertRaises(ValueError, d.copy) 495 496 # Memory use of the following functions takes into account overallocation 497 498 @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3) 499 def test_big_compress_buffer(self, size): 500 c = zlib.compressobj(1) 501 compress = lambda s: c.compress(s) + c.flush() 502 self.check_big_compress_buffer(size, compress) 503 504 @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2) 505 def test_big_decompress_buffer(self, size): 506 d = zlib.decompressobj() 507 decompress = lambda s: d.decompress(s) + d.flush() 508 self.check_big_decompress_buffer(size, decompress) 509 510 511 def genblock(seed, length, step=1024, generator=random): 512 """length-byte stream of random data from a seed (in step-byte blocks).""" 513 if seed is not None: 514 generator.seed(seed) 515 randint = generator.randint 516 if length < step or step < 2: 517 step = length 518 blocks = [] 519 for i in range(0, length, step): 520 blocks.append(''.join([chr(randint(0,255)) 521 for x in range(step)])) 522 return ''.join(blocks)[:length] 523 524 525 526 def choose_lines(source, number, seed=None, generator=random): 527 """Return a list of number lines randomly chosen from the source""" 528 if seed is not None: 529 generator.seed(seed) 530 sources = source.split('\n') 531 return [generator.choice(sources) for n in range(number)] 532 533 534 535 HAMLET_SCENE = """ 536 LAERTES 537 538 O, fear me not. 539 I stay too long: but here my father comes. 540 541 Enter POLONIUS 542 543 A double blessing is a double grace, 544 Occasion smiles upon a second leave. 545 546 LORD POLONIUS 547 548 Yet here, Laertes! aboard, aboard, for shame! 549 The wind sits in the shoulder of your sail, 550 And you are stay'd for. There; my blessing with thee! 551 And these few precepts in thy memory 552 See thou character. Give thy thoughts no tongue, 553 Nor any unproportioned thought his act. 554 Be thou familiar, but by no means vulgar. 555 Those friends thou hast, and their adoption tried, 556 Grapple them to thy soul with hoops of steel; 557 But do not dull thy palm with entertainment 558 Of each new-hatch'd, unfledged comrade. Beware 559 Of entrance to a quarrel, but being in, 560 Bear't that the opposed may beware of thee. 561 Give every man thy ear, but few thy voice; 562 Take each man's censure, but reserve thy judgment. 563 Costly thy habit as thy purse can buy, 564 But not express'd in fancy; rich, not gaudy; 565 For the apparel oft proclaims the man, 566 And they in France of the best rank and station 567 Are of a most select and generous chief in that. 568 Neither a borrower nor a lender be; 569 For loan oft loses both itself and friend, 570 And borrowing dulls the edge of husbandry. 571 This above all: to thine ownself be true, 572 And it must follow, as the night the day, 573 Thou canst not then be false to any man. 574 Farewell: my blessing season this in thee! 575 576 LAERTES 577 578 Most humbly do I take my leave, my lord. 579 580 LORD POLONIUS 581 582 The time invites you; go; your servants tend. 583 584 LAERTES 585 586 Farewell, Ophelia; and remember well 587 What I have said to you. 588 589 OPHELIA 590 591 'Tis in my memory lock'd, 592 And you yourself shall keep the key of it. 593 594 LAERTES 595 596 Farewell. 597 """ 598 599 600 def test_main(): 601 run_unittest( 602 ChecksumTestCase, 603 ExceptionTestCase, 604 CompressTestCase, 605 CompressObjectTestCase 606 ) 607 608 if __name__ == "__main__": 609 test_main() 610