"""Test script for the gzip module.
"""

import unittest
from test import support
from test.support import bigmemtest, _4G
import os
import pathlib
import io
import struct
import array
gzip = support.import_module('gzip')

# NOTE: test_metadata hard-codes the CRC32 of data1, so the exact bytes of
# this literal (including the leading spaces on each line) must not change.
data1 = b"""  int length=DEFAULTALLOC, err = Z_OK;
  PyObject *RetVal;
  int flushmode = Z_FINISH;
  unsigned long start_total_out;

"""

data2 = b"""/* zlibmodule.c -- gzip-compatible data compression */
/* See http://www.gzip.org/zlib/
/* See http://www.winimage.com/zLibDll for Windows */
"""


class UnseekableIO(io.BytesIO):
    # In-memory byte stream that reports itself as non-seekable and refuses
    # tell()/seek(); used to check that GzipFile works on such streams.

    def seekable(self):
        return False

    def tell(self):
        raise io.UnsupportedOperation

    def seek(self, *args):
        raise io.UnsupportedOperation


class BaseTest(unittest.TestCase):
    # Shared fixture: every test starts and ends with the scratch file removed.
    filename = support.TESTFN

    def setUp(self):
        support.unlink(self.filename)

    def tearDown(self):
        support.unlink(self.filename)


class TestGzip(BaseTest):
    def write_and_read_back(self, data, mode='b'):
        # Write *data* through GzipFile, check the reported byte count, then
        # read the file back and compare against bytes(data).
        expected = bytes(data)
        with gzip.GzipFile(self.filename, 'w'+mode) as f:
            written = f.write(data)
        self.assertEqual(written, len(expected))
        with gzip.GzipFile(self.filename, 'r'+mode) as f:
            self.assertEqual(f.read(), expected)

    def test_write(self):
        with gzip.GzipFile(self.filename, 'wb') as f:
            f.write(data1 * 50)

            # Try flush and fileno.
            f.flush()
            f.fileno()
            if hasattr(os, 'fsync'):
                os.fsync(f.fileno())
            f.close()

        # Test multiple close() calls.
        f.close()

    def test_write_read_with_pathlike_file(self):
        filename = pathlib.Path(self.filename)
        with gzip.GzipFile(filename, 'w') as f:
            f.write(data1 * 50)
        self.assertIsInstance(f.name, str)
        with gzip.GzipFile(filename, 'a') as f:
            f.write(data1)
        with gzip.GzipFile(filename) as f:
            d = f.read()
        self.assertEqual(d, data1 * 51)
        self.assertIsInstance(f.name, str)

    # The following test_write_xy methods test that write accepts
    # the corresponding bytes-like object type as input
    # and that the data written equals bytes(xy) in all cases.
    def test_write_memoryview(self):
        self.write_and_read_back(memoryview(data1 * 50))
        # Also exercise a multi-dimensional view.
        m = memoryview(bytes(range(256)))
        data = m.cast('B', shape=[8, 8, 4])
        self.write_and_read_back(data)

    def test_write_bytearray(self):
        self.write_and_read_back(bytearray(data1 * 50))

    def test_write_array(self):
        self.write_and_read_back(array.array('I', data1 * 40))

    def test_write_incompatible_type(self):
        # Test that non-bytes-like types raise TypeError.
        # Issue #21560: attempts to write incompatible types
        # should not affect the state of the fileobject
        with gzip.GzipFile(self.filename, 'wb') as f:
            with self.assertRaises(TypeError):
                f.write('')
            with self.assertRaises(TypeError):
                f.write([])
            f.write(data1)
        with gzip.GzipFile(self.filename, 'rb') as f:
            self.assertEqual(f.read(), data1)

    def test_read(self):
        self.test_write()
        # Try reading.
        with gzip.GzipFile(self.filename, 'r') as f:
            d = f.read()
        self.assertEqual(d, data1*50)

    def test_read1(self):
        self.test_write()
        blocks = []
        nread = 0
        with gzip.GzipFile(self.filename, 'r') as f:
            while True:
                d = f.read1()
                if not d:
                    break
                blocks.append(d)
                nread += len(d)
                # Check that position was updated correctly (see issue10791).
                self.assertEqual(f.tell(), nread)
        self.assertEqual(b''.join(blocks), data1 * 50)

    @bigmemtest(size=_4G, memuse=1)
    def test_read_large(self, size):
        # Read chunk size over UINT_MAX should be supported, despite zlib's
        # limitation per low-level call
        compressed = gzip.compress(data1, compresslevel=1)
        # Use a context manager so the GzipFile is closed even on failure.
        with gzip.GzipFile(fileobj=io.BytesIO(compressed), mode='rb') as f:
            self.assertEqual(f.read(size), data1)

    def test_io_on_closed_object(self):
        # Test that I/O operations on closed GzipFile objects raise a
        # ValueError, just like the corresponding functions on file objects.

        # Write to a file, open it for reading, then close it.
        self.test_write()
        f = gzip.GzipFile(self.filename, 'r')
        fileobj = f.fileobj
        self.assertFalse(fileobj.closed)
        f.close()
        self.assertTrue(fileobj.closed)
        with self.assertRaises(ValueError):
            f.read(1)
        with self.assertRaises(ValueError):
            f.seek(0)
        with self.assertRaises(ValueError):
            f.tell()
        # Open the file for writing, then close it.
        f = gzip.GzipFile(self.filename, 'w')
        fileobj = f.fileobj
        self.assertFalse(fileobj.closed)
        f.close()
        self.assertTrue(fileobj.closed)
        with self.assertRaises(ValueError):
            f.write(b'')
        with self.assertRaises(ValueError):
            f.flush()

    def test_append(self):
        self.test_write()
        # Append to the previous file
        with gzip.GzipFile(self.filename, 'ab') as f:
            f.write(data2 * 15)

        with gzip.GzipFile(self.filename, 'rb') as f:
            d = f.read()
        self.assertEqual(d, (data1*50) + (data2*15))

    def test_many_append(self):
        # Bug #1074261 was triggered when reading a file that contained
        # many, many members.  Create such a file and verify that reading it
        # works.
        with gzip.GzipFile(self.filename, 'wb', 9) as f:
            f.write(b'a')
        for _ in range(200):
            with gzip.GzipFile(self.filename, "ab", 9) as f:  # append
                f.write(b'a')

        # Try reading the file in fixed-size chunks until EOF.
        with gzip.GzipFile(self.filename, "rb") as zgfile:
            chunks = list(iter(lambda: zgfile.read(8192), b""))
        self.assertEqual(b"".join(chunks), b'a'*201)

    def test_exclusive_write(self):
        with gzip.GzipFile(self.filename, 'xb') as f:
            f.write(data1 * 50)
        with gzip.GzipFile(self.filename, 'rb') as f:
            self.assertEqual(f.read(), data1 * 50)
        # The file now exists, so exclusive creation must fail.
        with self.assertRaises(FileExistsError):
            gzip.GzipFile(self.filename, 'xb')

    def test_buffered_reader(self):
        # Issue #7471: a GzipFile can be wrapped in a BufferedReader for
        # performance.
        self.test_write()

        with gzip.GzipFile(self.filename, 'rb') as f:
            with io.BufferedReader(f) as r:
                lines = list(r)

        self.assertEqual(lines, 50 * data1.splitlines(keepends=True))

    def test_readline(self):
        self.test_write()
        # Try .readline() with varying line lengths

        with gzip.GzipFile(self.filename, 'rb') as f:
            line_length = 0
            while True:
                line = f.readline(line_length)
                # readline(0) legitimately returns b'', so only treat an
                # empty result as EOF when a non-zero size was requested.
                if not line and line_length != 0:
                    break
                self.assertLessEqual(len(line), line_length)
                line_length = (line_length + 1) % 50

    def test_readlines(self):
        self.test_write()
        # Try .readlines()

        with gzip.GzipFile(self.filename, 'rb') as f:
            f.readlines()

        # Repeatedly call readlines() with a size hint until EOF.
        with gzip.GzipFile(self.filename, 'rb') as f:
            while f.readlines(150):
                pass

    def test_seek_read(self):
        self.test_write()
        # Try seek, read test

        with gzip.GzipFile(self.filename) as f:
            while True:
                oldpos = f.tell()
                line1 = f.readline()
                if not line1:
                    break
                newpos = f.tell()
                f.seek(oldpos)  # negative seek
                amount = min(len(line1), 10)
                line2 = f.read(amount)
                self.assertEqual(line1[:amount], line2)
                f.seek(newpos)  # positive seek

    def test_seek_whence(self):
        self.test_write()
        # Try seek(whence=1), read test

        with gzip.GzipFile(self.filename) as f:
            f.read(10)
            f.seek(10, whence=1)
            y = f.read(10)
        self.assertEqual(y, data1[20:30])

    def test_seek_write(self):
        # Try seek, write test
        with gzip.GzipFile(self.filename, 'w') as f:
            for pos in range(0, 256, 16):
                f.seek(pos)
                f.write(b'GZ\n')

    def test_mode(self):
        self.test_write()
        with gzip.GzipFile(self.filename, 'r') as f:
            self.assertEqual(f.myfileobj.mode, 'rb')
        # 'x' requires that the file does not exist yet.
        support.unlink(self.filename)
        with gzip.GzipFile(self.filename, 'x') as f:
            self.assertEqual(f.myfileobj.mode, 'xb')

    def test_1647484(self):
        for mode in ('wb', 'rb'):
            with gzip.GzipFile(self.filename, mode) as f:
                self.assertTrue(hasattr(f, "name"))
                self.assertEqual(f.name, self.filename)

    def test_paddedfile_getattr(self):
        self.test_write()
        with gzip.GzipFile(self.filename, 'rb') as f:
            self.assertTrue(hasattr(f.fileobj, "name"))
            self.assertEqual(f.fileobj.name, self.filename)

    def test_mtime(self):
        mtime = 123456789
        with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
            fWrite.write(data1)
        with gzip.GzipFile(self.filename) as fRead:
            self.assertTrue(hasattr(fRead, 'mtime'))
            # mtime is only known once the header has been read.
            self.assertIsNone(fRead.mtime)
            dataRead = fRead.read()
            self.assertEqual(dataRead, data1)
            self.assertEqual(fRead.mtime, mtime)

    def test_metadata(self):
        mtime = 123456789

        with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
            fWrite.write(data1)

        with open(self.filename, 'rb') as fRead:
            # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html

            idBytes = fRead.read(2)
            self.assertEqual(idBytes, b'\x1f\x8b')  # gzip ID

            cmByte = fRead.read(1)
            self.assertEqual(cmByte, b'\x08')  # deflate

            flagsByte = fRead.read(1)
            self.assertEqual(flagsByte, b'\x08')  # only the FNAME flag is set

            # MTIME is an unsigned 32-bit little-endian field in RFC 1952.
            mtimeBytes = fRead.read(4)
            self.assertEqual(mtimeBytes, struct.pack('<I', mtime))

            xflByte = fRead.read(1)
            self.assertEqual(xflByte, b'\x02')  # maximum compression

            osByte = fRead.read(1)
            self.assertEqual(osByte, b'\xff')  # OS "unknown" (OS-independent)

            # Since the FNAME flag is set, the zero-terminated filename follows.
            # RFC 1952 specifies that this is the name of the input file, if any.
            # However, the gzip module defaults to storing the name of the output
            # file in this field.
            expected = self.filename.encode('Latin-1') + b'\x00'
            nameBytes = fRead.read(len(expected))
            self.assertEqual(nameBytes, expected)

            # Since no other flags were set, the header ends here.
            # Rather than process the compressed data, let's seek to the trailer.
            fRead.seek(os.stat(self.filename).st_size - 8)

            crc32Bytes = fRead.read(4)  # CRC32 of uncompressed data [data1]
            self.assertEqual(crc32Bytes, b'\xaf\xd7d\x83')

            # ISIZE is likewise an unsigned 32-bit little-endian field.
            isizeBytes = fRead.read(4)
            self.assertEqual(isizeBytes, struct.pack('<I', len(data1)))

    def test_with_open(self):
        # GzipFile supports the context management protocol
        with gzip.GzipFile(self.filename, "wb") as f:
            f.write(b"xxx")
        f = gzip.GzipFile(self.filename, "rb")
        f.close()
        # Entering a closed file must raise ValueError.
        with self.assertRaises(
                ValueError,
                msg="__enter__ on a closed file didn't raise an exception"):
            with f:
                pass
        # An exception raised inside the with-block must propagate.
        with self.assertRaises(ZeroDivisionError,
                               msg="1/0 didn't raise an exception"):
            with gzip.GzipFile(self.filename, "wb") as f:
                1/0

    def test_zero_padded_file(self):
        with gzip.GzipFile(self.filename, "wb") as f:
            f.write(data1 * 50)

        # Pad the file with zeroes
        with open(self.filename, "ab") as f:
            f.write(b"\x00" * 50)

        with gzip.GzipFile(self.filename, "rb") as f:
            d = f.read()
            self.assertEqual(d, data1 * 50, "Incorrect data in file")

    def test_non_seekable_file(self):
        uncompressed = data1 * 50
        buf = UnseekableIO()
        with gzip.GzipFile(fileobj=buf, mode="wb") as f:
            f.write(uncompressed)
        compressed = buf.getvalue()
        buf = UnseekableIO(compressed)
        with gzip.GzipFile(fileobj=buf, mode="rb") as f:
            self.assertEqual(f.read(), uncompressed)

    def test_peek(self):
        uncompressed = data1 * 200
        with gzip.GzipFile(self.filename, "wb") as f:
            f.write(uncompressed)

        def sizes():
            # Endless cycle of peek sizes: 5, 15, 25, 35, 45, 5, ...
            while True:
                for n in range(5, 50, 10):
                    yield n

        with gzip.GzipFile(self.filename, "rb") as f:
            f.max_read_chunk = 33
            nread = 0
            for n in sizes():
                s = f.peek(n)
                if s == b'':
                    break
                # Whatever peek() showed must be what read() returns next.
                self.assertEqual(f.read(len(s)), s)
                nread += len(s)
            self.assertEqual(f.read(100), b'')
            self.assertEqual(nread, len(uncompressed))

    def test_textio_readlines(self):
        # Issue #10791: TextIOWrapper.readlines() fails when wrapping GzipFile.
        lines = (data1 * 50).decode("ascii").splitlines(keepends=True)
        self.test_write()
        with gzip.GzipFile(self.filename, 'r') as f:
            with io.TextIOWrapper(f, encoding="ascii") as t:
                self.assertEqual(t.readlines(), lines)

    def test_fileobj_from_fdopen(self):
        # Issue #13781: Opening a GzipFile for writing fails when using a
        # fileobj created with os.fdopen().
        fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT)
        with os.fdopen(fd, "wb") as f:
            with gzip.GzipFile(fileobj=f, mode="w") as g:
                pass

    def test_bytes_filename(self):
        str_filename = self.filename
        try:
            bytes_filename = str_filename.encode("ascii")
        except UnicodeEncodeError:
            self.skipTest("Temporary file name needs to be ASCII")
        with gzip.GzipFile(bytes_filename, "wb") as f:
            f.write(data1 * 50)
        with gzip.GzipFile(bytes_filename, "rb") as f:
            self.assertEqual(f.read(), data1 * 50)
        # Sanity check that we are actually operating on the right file.
        with gzip.GzipFile(str_filename, "rb") as f:
            self.assertEqual(f.read(), data1 * 50)

    def test_decompress_limited(self):
        """Decompressed data buffering should be limited"""
        bomb = gzip.compress(b'\0' * int(2e6), compresslevel=9)
        self.assertLess(len(bomb), io.DEFAULT_BUFFER_SIZE)

        bomb = io.BytesIO(bomb)
        # Use a context manager so the GzipFile is closed even on failure.
        with gzip.GzipFile(fileobj=bomb) as decomp:
            self.assertEqual(decomp.read(1), b'\0')
            max_decomp = 1 + io.DEFAULT_BUFFER_SIZE
            self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp,
                "Excessive amount of data was decompressed")

    # Testing compress/decompress shortcut functions

    def test_compress(self):
        for data in [data1, data2]:
            for args in [(), (1,), (6,), (9,)]:
                datac = gzip.compress(data, *args)
                self.assertIs(type(datac), bytes)
                with gzip.GzipFile(fileobj=io.BytesIO(datac), mode="rb") as f:
                    self.assertEqual(f.read(), data)

    def test_decompress(self):
        for data in (data1, data2):
            buf = io.BytesIO()
            with gzip.GzipFile(fileobj=buf, mode="wb") as f:
                f.write(data)
            self.assertEqual(gzip.decompress(buf.getvalue()), data)
            # Roundtrip with compress
            datac = gzip.compress(data)
            self.assertEqual(gzip.decompress(datac), data)

    def test_read_truncated(self):
        data = data1*50
        # Drop the CRC (4 bytes) and file size (4 bytes).
        truncated = gzip.compress(data)[:-8]
        with gzip.GzipFile(fileobj=io.BytesIO(truncated)) as f:
            self.assertRaises(EOFError, f.read)
        with gzip.GzipFile(fileobj=io.BytesIO(truncated)) as f:
            self.assertEqual(f.read(len(data)), data)
            self.assertRaises(EOFError, f.read, 1)
        # Incomplete 10-byte header.
        for i in range(2, 10):
            with gzip.GzipFile(fileobj=io.BytesIO(truncated[:i])) as f:
                self.assertRaises(EOFError, f.read, 1)

    def test_read_with_extra(self):
        # Gzip data with an extra field
        gzdata = (b'\x1f\x8b\x08\x04\xb2\x17cQ\x02\xff'
                  b'\x05\x00Extra'
                  b'\x0bI-.\x01\x002\xd1Mx\x04\x00\x00\x00')
        with gzip.GzipFile(fileobj=io.BytesIO(gzdata)) as f:
            self.assertEqual(f.read(), b'Test')

    def test_prepend_error(self):
        # See issue #20875
        with gzip.open(self.filename, "wb") as f:
            f.write(data1)
        with gzip.open(self.filename, "rb") as f:
            f._buffer.raw._fp.prepend()


class TestOpen(BaseTest):
    def test_binary_modes(self):
        uncompressed = data1 * 50

        with gzip.open(self.filename, "wb") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
            self.assertEqual(file_data, uncompressed)

        with gzip.open(self.filename, "rb") as f:
            self.assertEqual(f.read(), uncompressed)

        with gzip.open(self.filename, "ab") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
            self.assertEqual(file_data, uncompressed * 2)

        # Exclusive creation fails while the file exists, succeeds after
        # it has been removed.
        with self.assertRaises(FileExistsError):
            gzip.open(self.filename, "xb")
        support.unlink(self.filename)
        with gzip.open(self.filename, "xb") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
            self.assertEqual(file_data, uncompressed)

    def test_pathlike_file(self):
        filename = pathlib.Path(self.filename)
        with gzip.open(filename, "wb") as f:
            f.write(data1 * 50)
        with gzip.open(filename, "ab") as f:
            f.write(data1)
        with gzip.open(filename) as f:
            self.assertEqual(f.read(), data1 * 51)

    def test_implicit_binary_modes(self):
        # Test implicit binary modes (no "b" or "t" in mode string).
        uncompressed = data1 * 50

        with gzip.open(self.filename, "w") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
            self.assertEqual(file_data, uncompressed)

        with gzip.open(self.filename, "r") as f:
            self.assertEqual(f.read(), uncompressed)

        with gzip.open(self.filename, "a") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
            self.assertEqual(file_data, uncompressed * 2)

        with self.assertRaises(FileExistsError):
            gzip.open(self.filename, "x")
        support.unlink(self.filename)
        with gzip.open(self.filename, "x") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
            self.assertEqual(file_data, uncompressed)

    def test_text_modes(self):
        uncompressed = data1.decode("ascii") * 50
        # Text mode translates "\n" to the platform line separator on write.
        uncompressed_raw = uncompressed.replace("\n", os.linesep)
        with gzip.open(self.filename, "wt") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read()).decode("ascii")
            self.assertEqual(file_data, uncompressed_raw)
        with gzip.open(self.filename, "rt") as f:
            self.assertEqual(f.read(), uncompressed)
        with gzip.open(self.filename, "at") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read()).decode("ascii")
            self.assertEqual(file_data, uncompressed_raw * 2)

    def test_fileobj(self):
        uncompressed_bytes = data1 * 50
        uncompressed_str = uncompressed_bytes.decode("ascii")
        compressed = gzip.compress(uncompressed_bytes)
        with gzip.open(io.BytesIO(compressed), "r") as f:
            self.assertEqual(f.read(), uncompressed_bytes)
        with gzip.open(io.BytesIO(compressed), "rb") as f:
            self.assertEqual(f.read(), uncompressed_bytes)
        with gzip.open(io.BytesIO(compressed), "rt") as f:
            self.assertEqual(f.read(), uncompressed_str)

    def test_bad_params(self):
        # Test invalid parameter combinations.
        with self.assertRaises(TypeError):
            gzip.open(123.456)
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "wbt")
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "xbt")
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "rb", encoding="utf-8")
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "rb", errors="ignore")
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "rb", newline="\n")

    def test_encoding(self):
        # Test non-default encoding.
        uncompressed = data1.decode("ascii") * 50
        uncompressed_raw = uncompressed.replace("\n", os.linesep)
        with gzip.open(self.filename, "wt", encoding="utf-16") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read()).decode("utf-16")
            self.assertEqual(file_data, uncompressed_raw)
        with gzip.open(self.filename, "rt", encoding="utf-16") as f:
            self.assertEqual(f.read(), uncompressed)

    def test_encoding_error_handler(self):
        # Test with non-default encoding error handler.
        with gzip.open(self.filename, "wb") as f:
            f.write(b"foo\xffbar")
        with gzip.open(self.filename, "rt", encoding="ascii", errors="ignore") \
                as f:
            self.assertEqual(f.read(), "foobar")

    def test_newline(self):
        # Test with explicit newline (universal newline mode disabled).
        uncompressed = data1.decode("ascii") * 50
        with gzip.open(self.filename, "wt", newline="\n") as f:
            f.write(uncompressed)
        # With newline="\r" no "\n" is treated as a line break, so the whole
        # text comes back as a single line.
        with gzip.open(self.filename, "rt", newline="\r") as f:
            self.assertEqual(f.readlines(), [uncompressed])


def test_main(verbose=None):
    # Entry point that runs both test classes via test.support; *verbose*
    # is accepted but not used here.
    support.run_unittest(TestGzip, TestOpen)


if __name__ == "__main__":
    test_main(verbose=True)