1 # We can test part of the module without zlib. 2 try: 3 import zlib 4 except ImportError: 5 zlib = None 6 7 import os 8 import io 9 import sys 10 import time 11 import shutil 12 import struct 13 import zipfile 14 import unittest 15 16 from StringIO import StringIO 17 from tempfile import TemporaryFile 18 from random import randint, random 19 from unittest import skipUnless 20 21 from test.test_support import TESTFN, run_unittest, findfile, unlink 22 23 TESTFN2 = TESTFN + "2" 24 TESTFNDIR = TESTFN + "d" 25 FIXEDTEST_SIZE = 1000 26 27 SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'), 28 ('ziptest2dir/_ziptest2', 'qawsedrftg'), 29 ('/ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'), 30 ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')] 31 32 33 class TestsWithSourceFile(unittest.TestCase): 34 def setUp(self): 35 self.line_gen = ["Zipfile test line %d. random float: %f" % (i, random()) 36 for i in xrange(FIXEDTEST_SIZE)] 37 self.data = '\n'.join(self.line_gen) + '\n' 38 39 # Make a source file with some lines 40 with open(TESTFN, "wb") as fp: 41 fp.write(self.data) 42 43 def make_test_archive(self, f, compression): 44 # Create the ZIP archive 45 with zipfile.ZipFile(f, "w", compression) as zipfp: 46 zipfp.write(TESTFN, "another.name") 47 zipfp.write(TESTFN, TESTFN) 48 zipfp.writestr("strfile", self.data) 49 50 def zip_test(self, f, compression): 51 self.make_test_archive(f, compression) 52 53 # Read the ZIP archive 54 with zipfile.ZipFile(f, "r", compression) as zipfp: 55 self.assertEqual(zipfp.read(TESTFN), self.data) 56 self.assertEqual(zipfp.read("another.name"), self.data) 57 self.assertEqual(zipfp.read("strfile"), self.data) 58 59 # Print the ZIP directory 60 fp = StringIO() 61 stdout = sys.stdout 62 try: 63 sys.stdout = fp 64 zipfp.printdir() 65 finally: 66 sys.stdout = stdout 67 68 directory = fp.getvalue() 69 lines = directory.splitlines() 70 self.assertEqual(len(lines), 4) # Number of files + header 71 72 self.assertIn('File Name', lines[0]) 73 
self.assertIn('Modified', lines[0]) 74 self.assertIn('Size', lines[0]) 75 76 fn, date, time_, size = lines[1].split() 77 self.assertEqual(fn, 'another.name') 78 self.assertTrue(time.strptime(date, '%Y-%m-%d')) 79 self.assertTrue(time.strptime(time_, '%H:%M:%S')) 80 self.assertEqual(size, str(len(self.data))) 81 82 # Check the namelist 83 names = zipfp.namelist() 84 self.assertEqual(len(names), 3) 85 self.assertIn(TESTFN, names) 86 self.assertIn("another.name", names) 87 self.assertIn("strfile", names) 88 89 # Check infolist 90 infos = zipfp.infolist() 91 names = [i.filename for i in infos] 92 self.assertEqual(len(names), 3) 93 self.assertIn(TESTFN, names) 94 self.assertIn("another.name", names) 95 self.assertIn("strfile", names) 96 for i in infos: 97 self.assertEqual(i.file_size, len(self.data)) 98 99 # check getinfo 100 for nm in (TESTFN, "another.name", "strfile"): 101 info = zipfp.getinfo(nm) 102 self.assertEqual(info.filename, nm) 103 self.assertEqual(info.file_size, len(self.data)) 104 105 # Check that testzip doesn't raise an exception 106 zipfp.testzip() 107 108 def test_stored(self): 109 for f in (TESTFN2, TemporaryFile(), StringIO()): 110 self.zip_test(f, zipfile.ZIP_STORED) 111 112 def zip_open_test(self, f, compression): 113 self.make_test_archive(f, compression) 114 115 # Read the ZIP archive 116 with zipfile.ZipFile(f, "r", compression) as zipfp: 117 zipdata1 = [] 118 with zipfp.open(TESTFN) as zipopen1: 119 while True: 120 read_data = zipopen1.read(256) 121 if not read_data: 122 break 123 zipdata1.append(read_data) 124 125 zipdata2 = [] 126 with zipfp.open("another.name") as zipopen2: 127 while True: 128 read_data = zipopen2.read(256) 129 if not read_data: 130 break 131 zipdata2.append(read_data) 132 133 self.assertEqual(''.join(zipdata1), self.data) 134 self.assertEqual(''.join(zipdata2), self.data) 135 136 def test_open_stored(self): 137 for f in (TESTFN2, TemporaryFile(), StringIO()): 138 self.zip_open_test(f, zipfile.ZIP_STORED) 139 140 def 
test_open_via_zip_info(self): 141 # Create the ZIP archive 142 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 143 zipfp.writestr("name", "foo") 144 zipfp.writestr("name", "bar") 145 146 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 147 infos = zipfp.infolist() 148 data = "" 149 for info in infos: 150 with zipfp.open(info) as f: 151 data += f.read() 152 self.assertTrue(data == "foobar" or data == "barfoo") 153 data = "" 154 for info in infos: 155 data += zipfp.read(info) 156 self.assertTrue(data == "foobar" or data == "barfoo") 157 158 def zip_random_open_test(self, f, compression): 159 self.make_test_archive(f, compression) 160 161 # Read the ZIP archive 162 with zipfile.ZipFile(f, "r", compression) as zipfp: 163 zipdata1 = [] 164 with zipfp.open(TESTFN) as zipopen1: 165 while True: 166 read_data = zipopen1.read(randint(1, 1024)) 167 if not read_data: 168 break 169 zipdata1.append(read_data) 170 171 self.assertEqual(''.join(zipdata1), self.data) 172 173 def test_random_open_stored(self): 174 for f in (TESTFN2, TemporaryFile(), StringIO()): 175 self.zip_random_open_test(f, zipfile.ZIP_STORED) 176 177 def test_univeral_readaheads(self): 178 f = StringIO() 179 180 data = 'a\r\n' * 16 * 1024 181 with zipfile.ZipFile(f, 'w', zipfile.ZIP_STORED) as zipfp: 182 zipfp.writestr(TESTFN, data) 183 184 data2 = '' 185 with zipfile.ZipFile(f, 'r') as zipfp: 186 with zipfp.open(TESTFN, 'rU') as zipopen: 187 for line in zipopen: 188 data2 += line 189 190 self.assertEqual(data, data2.replace('\n', '\r\n')) 191 192 def zip_readline_read_test(self, f, compression): 193 self.make_test_archive(f, compression) 194 195 # Read the ZIP archive 196 with zipfile.ZipFile(f, "r") as zipfp: 197 with zipfp.open(TESTFN) as zipopen: 198 data = '' 199 while True: 200 read = zipopen.readline() 201 if not read: 202 break 203 data += read 204 205 read = zipopen.read(100) 206 if not read: 207 break 208 data += read 209 210 self.assertEqual(data, self.data) 211 212 def 
zip_readline_test(self, f, compression): 213 self.make_test_archive(f, compression) 214 215 # Read the ZIP archive 216 with zipfile.ZipFile(f, "r") as zipfp: 217 with zipfp.open(TESTFN) as zipopen: 218 for line in self.line_gen: 219 linedata = zipopen.readline() 220 self.assertEqual(linedata, line + '\n') 221 222 def zip_readlines_test(self, f, compression): 223 self.make_test_archive(f, compression) 224 225 # Read the ZIP archive 226 with zipfile.ZipFile(f, "r") as zipfp: 227 with zipfp.open(TESTFN) as zo: 228 ziplines = zo.readlines() 229 for line, zipline in zip(self.line_gen, ziplines): 230 self.assertEqual(zipline, line + '\n') 231 232 def zip_iterlines_test(self, f, compression): 233 self.make_test_archive(f, compression) 234 235 # Read the ZIP archive 236 with zipfile.ZipFile(f, "r") as zipfp: 237 for line, zipline in zip(self.line_gen, zipfp.open(TESTFN)): 238 self.assertEqual(zipline, line + '\n') 239 240 def test_readline_read_stored(self): 241 # Issue #7610: calls to readline() interleaved with calls to read(). 
242 for f in (TESTFN2, TemporaryFile(), StringIO()): 243 self.zip_readline_read_test(f, zipfile.ZIP_STORED) 244 245 def test_readline_stored(self): 246 for f in (TESTFN2, TemporaryFile(), StringIO()): 247 self.zip_readline_test(f, zipfile.ZIP_STORED) 248 249 def test_readlines_stored(self): 250 for f in (TESTFN2, TemporaryFile(), StringIO()): 251 self.zip_readlines_test(f, zipfile.ZIP_STORED) 252 253 def test_iterlines_stored(self): 254 for f in (TESTFN2, TemporaryFile(), StringIO()): 255 self.zip_iterlines_test(f, zipfile.ZIP_STORED) 256 257 @skipUnless(zlib, "requires zlib") 258 def test_deflated(self): 259 for f in (TESTFN2, TemporaryFile(), StringIO()): 260 self.zip_test(f, zipfile.ZIP_DEFLATED) 261 262 @skipUnless(zlib, "requires zlib") 263 def test_open_deflated(self): 264 for f in (TESTFN2, TemporaryFile(), StringIO()): 265 self.zip_open_test(f, zipfile.ZIP_DEFLATED) 266 267 @skipUnless(zlib, "requires zlib") 268 def test_random_open_deflated(self): 269 for f in (TESTFN2, TemporaryFile(), StringIO()): 270 self.zip_random_open_test(f, zipfile.ZIP_DEFLATED) 271 272 @skipUnless(zlib, "requires zlib") 273 def test_readline_read_deflated(self): 274 # Issue #7610: calls to readline() interleaved with calls to read(). 
275 for f in (TESTFN2, TemporaryFile(), StringIO()): 276 self.zip_readline_read_test(f, zipfile.ZIP_DEFLATED) 277 278 @skipUnless(zlib, "requires zlib") 279 def test_readline_deflated(self): 280 for f in (TESTFN2, TemporaryFile(), StringIO()): 281 self.zip_readline_test(f, zipfile.ZIP_DEFLATED) 282 283 @skipUnless(zlib, "requires zlib") 284 def test_readlines_deflated(self): 285 for f in (TESTFN2, TemporaryFile(), StringIO()): 286 self.zip_readlines_test(f, zipfile.ZIP_DEFLATED) 287 288 @skipUnless(zlib, "requires zlib") 289 def test_iterlines_deflated(self): 290 for f in (TESTFN2, TemporaryFile(), StringIO()): 291 self.zip_iterlines_test(f, zipfile.ZIP_DEFLATED) 292 293 @skipUnless(zlib, "requires zlib") 294 def test_low_compression(self): 295 """Check for cases where compressed data is larger than original.""" 296 # Create the ZIP archive 297 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) as zipfp: 298 zipfp.writestr("strfile", '12') 299 300 # Get an open object for strfile 301 with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_DEFLATED) as zipfp: 302 with zipfp.open("strfile") as openobj: 303 self.assertEqual(openobj.read(1), '1') 304 self.assertEqual(openobj.read(1), '2') 305 306 def test_absolute_arcnames(self): 307 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 308 zipfp.write(TESTFN, "/absolute") 309 310 with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: 311 self.assertEqual(zipfp.namelist(), ["absolute"]) 312 313 def test_append_to_zip_file(self): 314 """Test appending to an existing zipfile.""" 315 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 316 zipfp.write(TESTFN, TESTFN) 317 318 with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp: 319 zipfp.writestr("strfile", self.data) 320 self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"]) 321 322 def test_append_to_non_zip_file(self): 323 """Test appending to an existing file that is not a zipfile.""" 324 # NOTE: this test fails if len(d) < 22 
because of the first 325 # line "fpin.seek(-22, 2)" in _EndRecData 326 data = 'I am not a ZipFile!'*10 327 with open(TESTFN2, 'wb') as f: 328 f.write(data) 329 330 with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp: 331 zipfp.write(TESTFN, TESTFN) 332 333 with open(TESTFN2, 'rb') as f: 334 f.seek(len(data)) 335 with zipfile.ZipFile(f, "r") as zipfp: 336 self.assertEqual(zipfp.namelist(), [TESTFN]) 337 338 def test_write_default_name(self): 339 """Check that calling ZipFile.write without arcname specified 340 produces the expected result.""" 341 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 342 zipfp.write(TESTFN) 343 self.assertEqual(zipfp.read(TESTFN), open(TESTFN).read()) 344 345 @skipUnless(zlib, "requires zlib") 346 def test_per_file_compression(self): 347 """Check that files within a Zip archive can have different 348 compression options.""" 349 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 350 zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED) 351 zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED) 352 sinfo = zipfp.getinfo('storeme') 353 dinfo = zipfp.getinfo('deflateme') 354 self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED) 355 self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED) 356 357 def test_write_to_readonly(self): 358 """Check that trying to call write() on a readonly ZipFile object 359 raises a RuntimeError.""" 360 with zipfile.ZipFile(TESTFN2, mode="w") as zipfp: 361 zipfp.writestr("somefile.txt", "bogus") 362 363 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 364 self.assertRaises(RuntimeError, zipfp.write, TESTFN) 365 366 def test_extract(self): 367 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 368 for fpath, fdata in SMALL_TEST_DATA: 369 zipfp.writestr(fpath, fdata) 370 371 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 372 for fpath, fdata in SMALL_TEST_DATA: 373 writtenfile = zipfp.extract(fpath) 374 375 # make sure it was written to the right place 376 if os.path.isabs(fpath): 377 correctfile = 
os.path.join(os.getcwd(), fpath[1:]) 378 else: 379 correctfile = os.path.join(os.getcwd(), fpath) 380 correctfile = os.path.normpath(correctfile) 381 382 self.assertEqual(writtenfile, correctfile) 383 384 # make sure correct data is in correct file 385 self.assertEqual(fdata, open(writtenfile, "rb").read()) 386 os.remove(writtenfile) 387 388 # remove the test file subdirectories 389 shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir')) 390 391 def test_extract_all(self): 392 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 393 for fpath, fdata in SMALL_TEST_DATA: 394 zipfp.writestr(fpath, fdata) 395 396 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 397 zipfp.extractall() 398 for fpath, fdata in SMALL_TEST_DATA: 399 if os.path.isabs(fpath): 400 outfile = os.path.join(os.getcwd(), fpath[1:]) 401 else: 402 outfile = os.path.join(os.getcwd(), fpath) 403 404 self.assertEqual(fdata, open(outfile, "rb").read()) 405 os.remove(outfile) 406 407 # remove the test file subdirectories 408 shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir')) 409 410 def test_writestr_compression(self): 411 zipfp = zipfile.ZipFile(TESTFN2, "w") 412 zipfp.writestr("a.txt", "hello world", compress_type=zipfile.ZIP_STORED) 413 if zlib: 414 zipfp.writestr("b.txt", "hello world", compress_type=zipfile.ZIP_DEFLATED) 415 416 info = zipfp.getinfo('a.txt') 417 self.assertEqual(info.compress_type, zipfile.ZIP_STORED) 418 419 if zlib: 420 info = zipfp.getinfo('b.txt') 421 self.assertEqual(info.compress_type, zipfile.ZIP_DEFLATED) 422 423 424 def zip_test_writestr_permissions(self, f, compression): 425 # Make sure that writestr creates files with mode 0600, 426 # when it is passed a name rather than a ZipInfo instance. 
427 428 self.make_test_archive(f, compression) 429 with zipfile.ZipFile(f, "r") as zipfp: 430 zinfo = zipfp.getinfo('strfile') 431 self.assertEqual(zinfo.external_attr, 0600 << 16) 432 433 def test_writestr_permissions(self): 434 for f in (TESTFN2, TemporaryFile(), StringIO()): 435 self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED) 436 437 def test_close(self): 438 """Check that the zipfile is closed after the 'with' block.""" 439 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 440 for fpath, fdata in SMALL_TEST_DATA: 441 zipfp.writestr(fpath, fdata) 442 self.assertTrue(zipfp.fp is not None, 'zipfp is not open') 443 self.assertTrue(zipfp.fp is None, 'zipfp is not closed') 444 445 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 446 self.assertTrue(zipfp.fp is not None, 'zipfp is not open') 447 self.assertTrue(zipfp.fp is None, 'zipfp is not closed') 448 449 def test_close_on_exception(self): 450 """Check that the zipfile is closed if an exception is raised in the 451 'with' block.""" 452 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 453 for fpath, fdata in SMALL_TEST_DATA: 454 zipfp.writestr(fpath, fdata) 455 456 try: 457 with zipfile.ZipFile(TESTFN2, "r") as zipfp2: 458 raise zipfile.BadZipfile() 459 except zipfile.BadZipfile: 460 self.assertTrue(zipfp2.fp is None, 'zipfp is not closed') 461 462 def tearDown(self): 463 unlink(TESTFN) 464 unlink(TESTFN2) 465 466 467 class TestZip64InSmallFiles(unittest.TestCase): 468 # These tests test the ZIP64 functionality without using large files, 469 # see test_zipfile64 for proper tests. 470 471 def setUp(self): 472 self._limit = zipfile.ZIP64_LIMIT 473 zipfile.ZIP64_LIMIT = 5 474 475 line_gen = ("Test of zipfile line %d." 
% i 476 for i in range(0, FIXEDTEST_SIZE)) 477 self.data = '\n'.join(line_gen) 478 479 # Make a source file with some lines 480 with open(TESTFN, "wb") as fp: 481 fp.write(self.data) 482 483 def large_file_exception_test(self, f, compression): 484 with zipfile.ZipFile(f, "w", compression) as zipfp: 485 self.assertRaises(zipfile.LargeZipFile, 486 zipfp.write, TESTFN, "another.name") 487 488 def large_file_exception_test2(self, f, compression): 489 with zipfile.ZipFile(f, "w", compression) as zipfp: 490 self.assertRaises(zipfile.LargeZipFile, 491 zipfp.writestr, "another.name", self.data) 492 493 def test_large_file_exception(self): 494 for f in (TESTFN2, TemporaryFile(), StringIO()): 495 self.large_file_exception_test(f, zipfile.ZIP_STORED) 496 self.large_file_exception_test2(f, zipfile.ZIP_STORED) 497 498 def zip_test(self, f, compression): 499 # Create the ZIP archive 500 with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp: 501 zipfp.write(TESTFN, "another.name") 502 zipfp.write(TESTFN, TESTFN) 503 zipfp.writestr("strfile", self.data) 504 505 # Read the ZIP archive 506 with zipfile.ZipFile(f, "r", compression) as zipfp: 507 self.assertEqual(zipfp.read(TESTFN), self.data) 508 self.assertEqual(zipfp.read("another.name"), self.data) 509 self.assertEqual(zipfp.read("strfile"), self.data) 510 511 # Print the ZIP directory 512 fp = StringIO() 513 stdout = sys.stdout 514 try: 515 sys.stdout = fp 516 zipfp.printdir() 517 finally: 518 sys.stdout = stdout 519 520 directory = fp.getvalue() 521 lines = directory.splitlines() 522 self.assertEqual(len(lines), 4) # Number of files + header 523 524 self.assertIn('File Name', lines[0]) 525 self.assertIn('Modified', lines[0]) 526 self.assertIn('Size', lines[0]) 527 528 fn, date, time_, size = lines[1].split() 529 self.assertEqual(fn, 'another.name') 530 self.assertTrue(time.strptime(date, '%Y-%m-%d')) 531 self.assertTrue(time.strptime(time_, '%H:%M:%S')) 532 self.assertEqual(size, str(len(self.data))) 533 534 # 
Check the namelist 535 names = zipfp.namelist() 536 self.assertEqual(len(names), 3) 537 self.assertIn(TESTFN, names) 538 self.assertIn("another.name", names) 539 self.assertIn("strfile", names) 540 541 # Check infolist 542 infos = zipfp.infolist() 543 names = [i.filename for i in infos] 544 self.assertEqual(len(names), 3) 545 self.assertIn(TESTFN, names) 546 self.assertIn("another.name", names) 547 self.assertIn("strfile", names) 548 for i in infos: 549 self.assertEqual(i.file_size, len(self.data)) 550 551 # check getinfo 552 for nm in (TESTFN, "another.name", "strfile"): 553 info = zipfp.getinfo(nm) 554 self.assertEqual(info.filename, nm) 555 self.assertEqual(info.file_size, len(self.data)) 556 557 # Check that testzip doesn't raise an exception 558 zipfp.testzip() 559 560 def test_stored(self): 561 for f in (TESTFN2, TemporaryFile(), StringIO()): 562 self.zip_test(f, zipfile.ZIP_STORED) 563 564 @skipUnless(zlib, "requires zlib") 565 def test_deflated(self): 566 for f in (TESTFN2, TemporaryFile(), StringIO()): 567 self.zip_test(f, zipfile.ZIP_DEFLATED) 568 569 def test_absolute_arcnames(self): 570 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED, 571 allowZip64=True) as zipfp: 572 zipfp.write(TESTFN, "/absolute") 573 574 with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: 575 self.assertEqual(zipfp.namelist(), ["absolute"]) 576 577 def tearDown(self): 578 zipfile.ZIP64_LIMIT = self._limit 579 unlink(TESTFN) 580 unlink(TESTFN2) 581 582 583 class PyZipFileTests(unittest.TestCase): 584 def test_write_pyfile(self): 585 with zipfile.PyZipFile(TemporaryFile(), "w") as zipfp: 586 fn = __file__ 587 if fn.endswith('.pyc') or fn.endswith('.pyo'): 588 fn = fn[:-1] 589 590 zipfp.writepy(fn) 591 592 bn = os.path.basename(fn) 593 self.assertNotIn(bn, zipfp.namelist()) 594 self.assertTrue(bn + 'o' in zipfp.namelist() or 595 bn + 'c' in zipfp.namelist()) 596 597 with zipfile.PyZipFile(TemporaryFile(), "w") as zipfp: 598 fn = __file__ 599 if fn.endswith(('.pyc', 
'.pyo')): 600 fn = fn[:-1] 601 602 zipfp.writepy(fn, "testpackage") 603 604 bn = "%s/%s" % ("testpackage", os.path.basename(fn)) 605 self.assertNotIn(bn, zipfp.namelist()) 606 self.assertTrue(bn + 'o' in zipfp.namelist() or 607 bn + 'c' in zipfp.namelist()) 608 609 def test_write_python_package(self): 610 import email 611 packagedir = os.path.dirname(email.__file__) 612 613 with zipfile.PyZipFile(TemporaryFile(), "w") as zipfp: 614 zipfp.writepy(packagedir) 615 616 # Check for a couple of modules at different levels of the 617 # hierarchy 618 names = zipfp.namelist() 619 self.assertTrue('email/__init__.pyo' in names or 620 'email/__init__.pyc' in names) 621 self.assertTrue('email/mime/text.pyo' in names or 622 'email/mime/text.pyc' in names) 623 624 def test_write_python_directory(self): 625 os.mkdir(TESTFN2) 626 try: 627 with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp: 628 fp.write("print(42)\n") 629 630 with open(os.path.join(TESTFN2, "mod2.py"), "w") as fp: 631 fp.write("print(42 * 42)\n") 632 633 with open(os.path.join(TESTFN2, "mod2.txt"), "w") as fp: 634 fp.write("bla bla bla\n") 635 636 zipfp = zipfile.PyZipFile(TemporaryFile(), "w") 637 zipfp.writepy(TESTFN2) 638 639 names = zipfp.namelist() 640 self.assertTrue('mod1.pyc' in names or 'mod1.pyo' in names) 641 self.assertTrue('mod2.pyc' in names or 'mod2.pyo' in names) 642 self.assertNotIn('mod2.txt', names) 643 644 finally: 645 shutil.rmtree(TESTFN2) 646 647 def test_write_non_pyfile(self): 648 with zipfile.PyZipFile(TemporaryFile(), "w") as zipfp: 649 open(TESTFN, 'w').write('most definitely not a python file') 650 self.assertRaises(RuntimeError, zipfp.writepy, TESTFN) 651 os.remove(TESTFN) 652 653 654 class OtherTests(unittest.TestCase): 655 zips_with_bad_crc = { 656 zipfile.ZIP_STORED: ( 657 b'PK\003\004\024\0\0\0\0\0 \213\212;:r' 658 b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af' 659 b'ilehello,AworldP' 660 b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:' 661 b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0' 662 
b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi' 663 b'lePK\005\006\0\0\0\0\001\0\001\0003\000' 664 b'\0\0/\0\0\0\0\0'), 665 zipfile.ZIP_DEFLATED: ( 666 b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA' 667 b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' 668 b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0' 669 b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n' 670 b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05' 671 b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00' 672 b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00' 673 b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00'), 674 } 675 676 def test_unicode_filenames(self): 677 with zipfile.ZipFile(TESTFN, "w") as zf: 678 zf.writestr(u"foo.txt", "Test for unicode filename") 679 zf.writestr(u"\xf6.txt", "Test for unicode filename") 680 self.assertIsInstance(zf.infolist()[0].filename, unicode) 681 682 with zipfile.ZipFile(TESTFN, "r") as zf: 683 self.assertEqual(zf.filelist[0].filename, "foo.txt") 684 self.assertEqual(zf.filelist[1].filename, u"\xf6.txt") 685 686 def test_create_non_existent_file_for_append(self): 687 if os.path.exists(TESTFN): 688 os.unlink(TESTFN) 689 690 filename = 'testfile.txt' 691 content = 'hello, world. this is some content.' 692 693 try: 694 with zipfile.ZipFile(TESTFN, 'a') as zf: 695 zf.writestr(filename, content) 696 except IOError: 697 self.fail('Could not append data to a non-existent zip file.') 698 699 self.assertTrue(os.path.exists(TESTFN)) 700 701 with zipfile.ZipFile(TESTFN, 'r') as zf: 702 self.assertEqual(zf.read(filename), content) 703 704 def test_close_erroneous_file(self): 705 # This test checks that the ZipFile constructor closes the file object 706 # it opens if there's an error in the file. If it doesn't, the 707 # traceback holds a reference to the ZipFile object and, indirectly, 708 # the file object. 709 # On Windows, this causes the os.unlink() call to fail because the 710 # underlying file is still open. This is SF bug #412214. 
711 # 712 with open(TESTFN, "w") as fp: 713 fp.write("this is not a legal zip file\n") 714 try: 715 zf = zipfile.ZipFile(TESTFN) 716 except zipfile.BadZipfile: 717 pass 718 719 def test_is_zip_erroneous_file(self): 720 """Check that is_zipfile() correctly identifies non-zip files.""" 721 # - passing a filename 722 with open(TESTFN, "w") as fp: 723 fp.write("this is not a legal zip file\n") 724 chk = zipfile.is_zipfile(TESTFN) 725 self.assertFalse(chk) 726 # - passing a file object 727 with open(TESTFN, "rb") as fp: 728 chk = zipfile.is_zipfile(fp) 729 self.assertTrue(not chk) 730 # - passing a file-like object 731 fp = StringIO() 732 fp.write("this is not a legal zip file\n") 733 chk = zipfile.is_zipfile(fp) 734 self.assertTrue(not chk) 735 fp.seek(0, 0) 736 chk = zipfile.is_zipfile(fp) 737 self.assertTrue(not chk) 738 739 def test_is_zip_valid_file(self): 740 """Check that is_zipfile() correctly identifies zip files.""" 741 # - passing a filename 742 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 743 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 744 chk = zipfile.is_zipfile(TESTFN) 745 self.assertTrue(chk) 746 # - passing a file object 747 with open(TESTFN, "rb") as fp: 748 chk = zipfile.is_zipfile(fp) 749 self.assertTrue(chk) 750 fp.seek(0, 0) 751 zip_contents = fp.read() 752 # - passing a file-like object 753 fp = StringIO() 754 fp.write(zip_contents) 755 chk = zipfile.is_zipfile(fp) 756 self.assertTrue(chk) 757 fp.seek(0, 0) 758 chk = zipfile.is_zipfile(fp) 759 self.assertTrue(chk) 760 761 def test_non_existent_file_raises_IOError(self): 762 # make sure we don't raise an AttributeError when a partially-constructed 763 # ZipFile instance is finalized; this tests for regression on SF tracker 764 # bug #403871. 765 766 # The bug we're testing for caused an AttributeError to be raised 767 # when a ZipFile instance was created for a file that did not 768 # exist; the .fp member was not initialized but was needed by the 769 # __del__() method. 
Since the AttributeError is in the __del__(), 770 # it is ignored, but the user should be sufficiently annoyed by 771 # the message on the output that regression will be noticed 772 # quickly. 773 self.assertRaises(IOError, zipfile.ZipFile, TESTFN) 774 775 def test_empty_file_raises_BadZipFile(self): 776 with open(TESTFN, 'w') as f: 777 pass 778 self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, TESTFN) 779 780 with open(TESTFN, 'w') as fp: 781 fp.write("short file") 782 self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, TESTFN) 783 784 def test_closed_zip_raises_RuntimeError(self): 785 """Verify that testzip() doesn't swallow inappropriate exceptions.""" 786 data = StringIO() 787 with zipfile.ZipFile(data, mode="w") as zipf: 788 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 789 790 # This is correct; calling .read on a closed ZipFile should throw 791 # a RuntimeError, and so should calling .testzip. An earlier 792 # version of .testzip would swallow this exception (and any other) 793 # and report that the first file in the archive was corrupt. 
794 self.assertRaises(RuntimeError, zipf.read, "foo.txt") 795 self.assertRaises(RuntimeError, zipf.open, "foo.txt") 796 self.assertRaises(RuntimeError, zipf.testzip) 797 self.assertRaises(RuntimeError, zipf.writestr, "bogus.txt", "bogus") 798 open(TESTFN, 'w').write('zipfile test data') 799 self.assertRaises(RuntimeError, zipf.write, TESTFN) 800 801 def test_bad_constructor_mode(self): 802 """Check that bad modes passed to ZipFile constructor are caught.""" 803 self.assertRaises(RuntimeError, zipfile.ZipFile, TESTFN, "q") 804 805 def test_bad_open_mode(self): 806 """Check that bad modes passed to ZipFile.open are caught.""" 807 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 808 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 809 810 with zipfile.ZipFile(TESTFN, mode="r") as zipf: 811 # read the data to make sure the file is there 812 zipf.read("foo.txt") 813 self.assertRaises(RuntimeError, zipf.open, "foo.txt", "q") 814 815 def test_read0(self): 816 """Check that calling read(0) on a ZipExtFile object returns an empty 817 string and doesn't advance file pointer.""" 818 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 819 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 820 # read the data to make sure the file is there 821 with zipf.open("foo.txt") as f: 822 for i in xrange(FIXEDTEST_SIZE): 823 self.assertEqual(f.read(0), '') 824 825 self.assertEqual(f.read(), "O, for a Muse of Fire!") 826 827 def test_open_non_existent_item(self): 828 """Check that attempting to call open() for an item that doesn't 829 exist in the archive raises a RuntimeError.""" 830 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 831 self.assertRaises(KeyError, zipf.open, "foo.txt", "r") 832 833 def test_bad_compression_mode(self): 834 """Check that bad compression methods passed to ZipFile.open are 835 caught.""" 836 self.assertRaises(RuntimeError, zipfile.ZipFile, TESTFN, "w", -1) 837 838 def test_null_byte_in_filename(self): 839 """Check that a filename containing a null byte is 
properly 840 terminated.""" 841 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 842 zipf.writestr("foo.txt\x00qqq", "O, for a Muse of Fire!") 843 self.assertEqual(zipf.namelist(), ['foo.txt']) 844 845 def test_struct_sizes(self): 846 """Check that ZIP internal structure sizes are calculated correctly.""" 847 self.assertEqual(zipfile.sizeEndCentDir, 22) 848 self.assertEqual(zipfile.sizeCentralDir, 46) 849 self.assertEqual(zipfile.sizeEndCentDir64, 56) 850 self.assertEqual(zipfile.sizeEndCentDir64Locator, 20) 851 852 def test_comments(self): 853 """Check that comments on the archive are handled properly.""" 854 855 # check default comment is empty 856 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 857 self.assertEqual(zipf.comment, '') 858 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 859 860 with zipfile.ZipFile(TESTFN, mode="r") as zipf: 861 self.assertEqual(zipf.comment, '') 862 863 # check a simple short comment 864 comment = 'Bravely taking to his feet, he beat a very brave retreat.' 
865 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 866 zipf.comment = comment 867 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 868 with zipfile.ZipFile(TESTFN, mode="r") as zipf: 869 self.assertEqual(zipf.comment, comment) 870 871 # check a comment of max length 872 comment2 = ''.join(['%d' % (i**3 % 10) for i in xrange((1 << 16)-1)]) 873 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 874 zipf.comment = comment2 875 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 876 877 with zipfile.ZipFile(TESTFN, mode="r") as zipf: 878 self.assertEqual(zipf.comment, comment2) 879 880 # check a comment that is too long is truncated 881 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 882 zipf.comment = comment2 + 'oops' 883 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 884 with zipfile.ZipFile(TESTFN, mode="r") as zipf: 885 self.assertEqual(zipf.comment, comment2) 886 887 def check_testzip_with_bad_crc(self, compression): 888 """Tests that files with bad CRCs return their name from testzip.""" 889 zipdata = self.zips_with_bad_crc[compression] 890 891 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 892 # testzip returns the name of the first corrupt file, or None 893 self.assertEqual('afile', zipf.testzip()) 894 895 def test_testzip_with_bad_crc_stored(self): 896 self.check_testzip_with_bad_crc(zipfile.ZIP_STORED) 897 898 @skipUnless(zlib, "requires zlib") 899 def test_testzip_with_bad_crc_deflated(self): 900 self.check_testzip_with_bad_crc(zipfile.ZIP_DEFLATED) 901 902 def check_read_with_bad_crc(self, compression): 903 """Tests that files with bad CRCs raise a BadZipfile exception when read.""" 904 zipdata = self.zips_with_bad_crc[compression] 905 906 # Using ZipFile.read() 907 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 908 self.assertRaises(zipfile.BadZipfile, zipf.read, 'afile') 909 910 # Using ZipExtFile.read() 911 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 912 with zipf.open('afile', 'r') as corrupt_file: 913 
self.assertRaises(zipfile.BadZipfile, corrupt_file.read) 914 915 # Same with small reads (in order to exercise the buffering logic) 916 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 917 with zipf.open('afile', 'r') as corrupt_file: 918 corrupt_file.MIN_READ_SIZE = 2 919 with self.assertRaises(zipfile.BadZipfile): 920 while corrupt_file.read(2): 921 pass 922 923 def test_read_with_bad_crc_stored(self): 924 self.check_read_with_bad_crc(zipfile.ZIP_STORED) 925 926 @skipUnless(zlib, "requires zlib") 927 def test_read_with_bad_crc_deflated(self): 928 self.check_read_with_bad_crc(zipfile.ZIP_DEFLATED) 929 930 def check_read_return_size(self, compression): 931 # Issue #9837: ZipExtFile.read() shouldn't return more bytes 932 # than requested. 933 for test_size in (1, 4095, 4096, 4097, 16384): 934 file_size = test_size + 1 935 junk = b''.join(struct.pack('B', randint(0, 255)) 936 for x in range(file_size)) 937 with zipfile.ZipFile(io.BytesIO(), "w", compression) as zipf: 938 zipf.writestr('foo', junk) 939 with zipf.open('foo', 'r') as fp: 940 buf = fp.read(test_size) 941 self.assertEqual(len(buf), test_size) 942 943 def test_read_return_size_stored(self): 944 self.check_read_return_size(zipfile.ZIP_STORED) 945 946 @skipUnless(zlib, "requires zlib") 947 def test_read_return_size_deflated(self): 948 self.check_read_return_size(zipfile.ZIP_DEFLATED) 949 950 def test_empty_zipfile(self): 951 # Check that creating a file in 'w' or 'a' mode and closing without 952 # adding any files to the archives creates a valid empty ZIP file 953 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 954 pass 955 try: 956 zipf = zipfile.ZipFile(TESTFN, mode="r") 957 except zipfile.BadZipfile: 958 self.fail("Unable to create empty ZIP file in 'w' mode") 959 960 with zipfile.ZipFile(TESTFN, mode="a") as zipf: 961 pass 962 try: 963 zipf = zipfile.ZipFile(TESTFN, mode="r") 964 except: 965 self.fail("Unable to create empty ZIP file in 'a' mode") 966 967 def test_open_empty_file(self): 968 # 
class DecryptionTests(unittest.TestCase):
    """Exercise reading of password-protected ZIP members.

    The library cannot produce encrypted archives at the moment, so the
    two archives used here are embedded as pre-generated byte strings.
    """

    data = (
    'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00'
    '\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y'
    '\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl'
    'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00'
    '\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81'
    '\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00'
    '\x00\x00L\x00\x00\x00\x00\x00' )
    data2 = (
    'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02'
    '\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04'
    '\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0'
    'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03'
    '\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00'
    '\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze'
    'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01'
    '\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' )

    # Expected decrypted contents of "test.txt" (data) and "zero" (data2).
    plain = 'zipfile.py encryption test'
    plain2 = '\x00'*512

    def _write_and_open(self, path, payload):
        # Dump the raw archive bytes to *path* and open the result read-only.
        with open(path, "wb") as out:
            out.write(payload)
        return zipfile.ZipFile(path, "r")

    def setUp(self):
        self.zip = self._write_and_open(TESTFN, self.data)
        self.zip2 = self._write_and_open(TESTFN2, self.data2)

    def tearDown(self):
        # Each archive is closed before its backing file is removed.
        for archive, path in ((self.zip, TESTFN), (self.zip2, TESTFN2)):
            archive.close()
            os.unlink(path)

    def test_no_password(self):
        # Reading the encrypted file without password
        # must generate a RunTime exception
        for archive, member in ((self.zip, "test.txt"), (self.zip2, "zero")):
            with self.assertRaises(RuntimeError):
                archive.read(member)

    def test_bad_password(self):
        # A wrong password must be rejected for both archives.
        for archive, member in ((self.zip, "test.txt"), (self.zip2, "zero")):
            archive.setpassword("perl")
            with self.assertRaises(RuntimeError):
                archive.read(member)

    @skipUnless(zlib, "requires zlib")
    def test_good_password(self):
        # With the right password each member decrypts to its known text.
        cases = (
            (self.zip, "python", "test.txt", self.plain),
            (self.zip2, "12345", "zero", self.plain2),
        )
        for archive, password, member, expected in cases:
            archive.setpassword(password)
            self.assertEqual(archive.read(member), expected)
# Methods of TestsWithRandomBinaryFiles (class header is on an earlier line).
    @skipUnless(zlib, "requires zlib")
    def test_deflated(self):
        # Round-trip via a path, a temporary file and an in-memory buffer.
        targets = (TESTFN2, TemporaryFile(), io.BytesIO())
        for target in targets:
            self.zip_test(target, zipfile.ZIP_DEFLATED)

    def zip_open_test(self, f, compression):
        """Build the test archive in *f*, then read both members back
        through open() in 256-byte pieces and compare with self.data."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            chunks1 = []
            with zipfp.open(TESTFN) as member1:
                piece = member1.read(256)
                while piece:
                    chunks1.append(piece)
                    piece = member1.read(256)

            chunks2 = []
            with zipfp.open("another.name") as member2:
                piece = member2.read(256)
                while piece:
                    chunks2.append(piece)
                    piece = member2.read(256)

            # Both members must reassemble to the original payload.
            for chunks in (chunks1, chunks2):
                joined = ''.join(chunks)
                self.assertEqual(len(joined), len(self.data))
                self.assertEqual(joined, self.data)

    def test_open_stored(self):
        targets = (TESTFN2, TemporaryFile(), StringIO())
        for target in targets:
            self.zip_open_test(target, zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_open_deflated(self):
        targets = (TESTFN2, TemporaryFile(), io.BytesIO())
        for target in targets:
            self.zip_open_test(target, zipfile.ZIP_DEFLATED)

    def zip_random_open_test(self, f, compression):
        """Build the test archive in *f*, then read one member back
        through open() using a fresh random read size (1..1024) per call."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            chunks = []
            with zipfp.open(TESTFN) as member:
                piece = member.read(randint(1, 1024))
                while piece:
                    chunks.append(piece)
                    piece = member.read(randint(1, 1024))

            joined = ''.join(chunks)
            self.assertEqual(len(joined), len(self.data))
            self.assertEqual(joined, self.data)
@skipUnless(zlib, "requires zlib")
class TestsWithMultipleOpens(unittest.TestCase):
    """Check that several members opened from one ZipFile stay independent.

    setUp writes a deflated archive holding two members, 'ones' and
    'twos', each FIXEDTEST_SIZE characters long.
    """

    def setUp(self):
        # Create the ZIP archive
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) as zipfp:
            zipfp.writestr('ones', '1'*FIXEDTEST_SIZE)
            zipfp.writestr('twos', '2'*FIXEDTEST_SIZE)

    def test_same_file(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
            # Both handles are context-managed so they are closed even if
            # a read raises.
            with zipf.open('ones') as zopen1, zipf.open('ones') as zopen2:
                data1 = zopen1.read(500)
                data2 = zopen2.read(500)
                data1 += zopen1.read(500)
                data2 += zopen2.read(500)
            self.assertEqual(data1, data2)

    def test_different_file(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
            with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2:
                data1 = zopen1.read(500)
                data2 = zopen2.read(500)
                data1 += zopen1.read(500)
                data2 += zopen2.read(500)
            self.assertEqual(data1, '1'*FIXEDTEST_SIZE)
            self.assertEqual(data2, '2'*FIXEDTEST_SIZE)

    def test_interleaved(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        # The second member is opened only after the first has been partly
        # read, so open() and read() calls really are interleaved (otherwise
        # this test would repeat test_different_file).
        with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
            with zipf.open('ones') as zopen1:
                data1 = zopen1.read(500)
                with zipf.open('twos') as zopen2:
                    data2 = zopen2.read(500)
                    data1 += zopen1.read(500)
                    data2 += zopen2.read(500)
            self.assertEqual(data1, '1'*FIXEDTEST_SIZE)
            self.assertEqual(data2, '2'*FIXEDTEST_SIZE)

    def tearDown(self):
        unlink(TESTFN2)


class TestWithDirectory(unittest.TestCase):
    """Check extraction and storage of directory entries.

    TESTFN2 is used as a scratch directory; TESTFN is used as an archive
    path by test_store_dir.
    """

    def setUp(self):
        os.mkdir(TESTFN2)

    def test_extract_dir(self):
        with zipfile.ZipFile(findfile("zipdir.zip")) as zipf:
            zipf.extractall(TESTFN2)
        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a")))
        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b")))
        self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c")))

    def test_bug_6050(self):
        # Extraction should succeed if directories already exist
        os.mkdir(os.path.join(TESTFN2, "a"))
        self.test_extract_dir()

    def test_store_dir(self):
        # Writing a directory must produce an entry whose name ends in "/".
        os.mkdir(os.path.join(TESTFN2, "x"))
        # The archive is context-managed so it is closed before tearDown
        # removes TESTFN.
        with zipfile.ZipFile(TESTFN, "w") as zipf:
            zipf.write(os.path.join(TESTFN2, "x"), "x")
            self.assertTrue(zipf.filelist[0].filename.endswith("x/"))

    def tearDown(self):
        shutil.rmtree(TESTFN2)
        if os.path.exists(TESTFN):
            unlink(TESTFN)
# Methods of UniversalNewlineTests (class header is on an earlier line).
    def make_test_archive(self, f, compression):
        """Write every per-separator source file into a new archive *f*,
        storing each under its own file name."""
        # Create the ZIP archive
        with zipfile.ZipFile(f, "w", compression) as zipfp:
            for fn in self.arcfiles.values():
                zipfp.write(fn, fn)

    def read_test(self, f, compression):
        """Check that read() on an "rU" member returns the stored data
        with its original line separator."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp:
            for sep, fn in self.arcfiles.items():
                with zipfp.open(fn, "rU") as fp:
                    zipdata = fp.read()
                self.assertEqual(self.arcdata[sep], zipdata)

    def readline_read_test(self, f, compression):
        """Alternate readline() and read(5) on each "rU" member and check
        that the pieces add up to the '\\n'-separated data."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        # The archive is context-managed so it is closed even when an
        # assertion below fails.
        with zipfile.ZipFile(f, "r") as zipfp:
            for fn in self.arcfiles.values():
                with zipfp.open(fn, "rU") as zipopen:
                    pieces = []
                    while True:
                        read = zipopen.readline()
                        if not read:
                            break
                        pieces.append(read)

                        read = zipopen.read(5)
                        if not read:
                            break
                        pieces.append(read)

                self.assertEqual(''.join(pieces), self.arcdata['\n'])

    def readline_test(self, f, compression):
        """Check that readline() on each "rU" member yields every source
        line terminated by '\\n'."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp:
            for fn in self.arcfiles.values():
                with zipfp.open(fn, "rU") as zipopen:
                    for line in self.line_gen:
                        linedata = zipopen.readline()
                        self.assertEqual(linedata, line + '\n')

    def readlines_test(self, f, compression):
        """Check that readlines() on each "rU" member yields the source
        lines terminated by '\\n'."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp:
            for fn in self.arcfiles.values():
                with zipfp.open(fn, "rU") as fp:
                    ziplines = fp.readlines()
                for line, zipline in zip(self.line_gen, ziplines):
                    self.assertEqual(zipline, line + '\n')

    def iterlines_test(self, f, compression):
        """Check that iterating over each "rU" member yields the source
        lines terminated by '\\n'."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp:
            for fn in self.arcfiles.values():
                # The member is context-managed so its handle is closed
                # once the comparison is done.
                with zipfp.open(fn, "rU") as zipopen:
                    for line, zipline in zip(self.line_gen, zipopen):
                        self.assertEqual(zipline, line + '\n')

    def test_read_stored(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.read_test(f, zipfile.ZIP_STORED)

    def test_readline_read_stored(self):
        # Issue #7610: calls to readline() interleaved with calls to read().
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.readline_read_test(f, zipfile.ZIP_STORED)

    def test_readline_stored(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.readline_test(f, zipfile.ZIP_STORED)

    def test_readlines_stored(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.readlines_test(f, zipfile.ZIP_STORED)

    def test_iterlines_stored(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.iterlines_test(f, zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_read_deflated(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.read_test(f, zipfile.ZIP_DEFLATED)
# Remaining methods of UniversalNewlineTests (class header is on an earlier line).
    @skipUnless(zlib, "requires zlib")
    def test_readline_deflated(self):
        targets = (TESTFN2, TemporaryFile(), StringIO())
        for target in targets:
            self.readline_test(target, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_readlines_deflated(self):
        targets = (TESTFN2, TemporaryFile(), StringIO())
        for target in targets:
            self.readlines_test(target, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_iterlines_deflated(self):
        targets = (TESTFN2, TemporaryFile(), StringIO())
        for target in targets:
            self.iterlines_test(target, zipfile.ZIP_DEFLATED)

    def tearDown(self):
        # Remove the per-separator source files, then the shared scratch files.
        for path in self.arcfiles.values():
            os.remove(path)
        unlink(TESTFN)
        unlink(TESTFN2)


def test_main():
    """Run every test case class of this module through run_unittest()."""
    suites = (
        TestsWithSourceFile, TestZip64InSmallFiles, OtherTests,
        PyZipFileTests, DecryptionTests, TestsWithMultipleOpens,
        TestWithDirectory, UniversalNewlineTests,
        TestsWithRandomBinaryFiles,
    )
    run_unittest(*suites)

if __name__ == "__main__":
    test_main()