1 # We can test part of the module without zlib. 2 try: 3 import zlib 4 except ImportError: 5 zlib = None 6 7 import os 8 import io 9 import sys 10 import time 11 import shutil 12 import struct 13 import zipfile 14 import unittest 15 16 from StringIO import StringIO 17 from tempfile import TemporaryFile 18 from random import randint, random 19 from unittest import skipUnless 20 21 from test.test_support import TESTFN, TESTFN_UNICODE, TESTFN_ENCODING, \ 22 run_unittest, findfile, unlink 23 try: 24 TESTFN_UNICODE.encode(TESTFN_ENCODING) 25 except (UnicodeError, TypeError): 26 # Either the file system encoding is None, or the file name 27 # cannot be encoded in the file system encoding. 28 TESTFN_UNICODE = None 29 30 TESTFN2 = TESTFN + "2" 31 TESTFNDIR = TESTFN + "d" 32 FIXEDTEST_SIZE = 1000 33 34 SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'), 35 ('ziptest2dir/_ziptest2', 'qawsedrftg'), 36 ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'), 37 ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')] 38 39 40 class TestsWithSourceFile(unittest.TestCase): 41 def setUp(self): 42 self.line_gen = ["Zipfile test line %d. random float: %f" % (i, random()) 43 for i in xrange(FIXEDTEST_SIZE)] 44 self.data = '\n'.join(self.line_gen) + '\n' 45 46 # Make a source file with some lines 47 with open(TESTFN, "wb") as fp: 48 fp.write(self.data) 49 50 def make_test_archive(self, f, compression): 51 # Create the ZIP archive 52 with zipfile.ZipFile(f, "w", compression) as zipfp: 53 zipfp.write(TESTFN, "another.name") 54 zipfp.write(TESTFN, TESTFN) 55 zipfp.writestr("strfile", self.data) 56 57 def zip_test(self, f, compression): 58 self.make_test_archive(f, compression) 59 60 # Read the ZIP archive 61 with zipfile.ZipFile(f, "r", compression) as zipfp: 62 self.assertEqual(zipfp.read(TESTFN), self.data) 63 self.assertEqual(zipfp.read("another.name"), self.data) 64 self.assertEqual(zipfp.read("strfile"), self.data) 65 66 # Print the ZIP directory 67 fp = StringIO() 68 stdout = sys.stdout 69 try: 70 sys.stdout = fp 71 zipfp.printdir() 72 finally: 73 sys.stdout = stdout 74 75 directory = fp.getvalue() 76 lines = directory.splitlines() 77 self.assertEqual(len(lines), 4) # Number of files + header 78 79 self.assertIn('File Name', lines[0]) 80 self.assertIn('Modified', lines[0]) 81 self.assertIn('Size', lines[0]) 82 83 fn, date, time_, size = lines[1].split() 84 self.assertEqual(fn, 'another.name') 85 self.assertTrue(time.strptime(date, '%Y-%m-%d')) 86 self.assertTrue(time.strptime(time_, '%H:%M:%S')) 87 self.assertEqual(size, str(len(self.data))) 88 89 # Check the namelist 90 names = zipfp.namelist() 91 self.assertEqual(len(names), 3) 92 self.assertIn(TESTFN, names) 93 self.assertIn("another.name", names) 94 self.assertIn("strfile", names) 95 96 # Check infolist 97 infos = zipfp.infolist() 98 names = [i.filename for i in infos] 99 self.assertEqual(len(names), 3) 100 self.assertIn(TESTFN, names) 101 self.assertIn("another.name", names) 102 self.assertIn("strfile", names) 103 for i in infos: 104 self.assertEqual(i.file_size, len(self.data)) 105 106 # check getinfo 107 for nm in (TESTFN, "another.name", "strfile"): 108 info = zipfp.getinfo(nm) 109 self.assertEqual(info.filename, nm) 110 self.assertEqual(info.file_size, len(self.data)) 111 112 # Check that testzip doesn't raise an exception 113 zipfp.testzip() 114 115 def test_stored(self): 116 for f in (TESTFN2, TemporaryFile(), StringIO()): 117 self.zip_test(f, zipfile.ZIP_STORED) 118 119 def zip_open_test(self, f, compression): 120 self.make_test_archive(f, compression) 121 122 # Read the ZIP archive 123 with zipfile.ZipFile(f, "r", compression) as zipfp: 124 zipdata1 = [] 125 with zipfp.open(TESTFN) as zipopen1: 126 while True: 127 read_data = zipopen1.read(256) 128 if not read_data: 129 break 130 zipdata1.append(read_data) 131 132 zipdata2 = [] 133 with zipfp.open("another.name") as zipopen2: 134 while True: 135 read_data = zipopen2.read(256) 136 if not read_data: 137 break 138 zipdata2.append(read_data) 139 140 self.assertEqual(''.join(zipdata1), self.data) 141 self.assertEqual(''.join(zipdata2), self.data) 142 143 def test_open_stored(self): 144 for f in (TESTFN2, TemporaryFile(), StringIO()): 145 self.zip_open_test(f, zipfile.ZIP_STORED) 146 147 def test_open_via_zip_info(self): 148 # Create the ZIP archive 149 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 150 zipfp.writestr("name", "foo") 151 zipfp.writestr("name", "bar") 152 153 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 154 infos = zipfp.infolist() 155 data = "" 156 for info in infos: 157 with zipfp.open(info) as f: 158 data += f.read() 159 self.assertTrue(data == "foobar" or data == "barfoo") 160 data = "" 161 for info in infos: 162 data += zipfp.read(info) 163 self.assertTrue(data == "foobar" or data == "barfoo") 164 165 def zip_random_open_test(self, f, compression): 166 self.make_test_archive(f, compression) 167 168 # Read the ZIP archive 169 with zipfile.ZipFile(f, "r", compression) as zipfp: 170 zipdata1 = [] 171 with zipfp.open(TESTFN) as zipopen1: 172 while True: 173 read_data = zipopen1.read(randint(1, 1024)) 174 if not read_data: 175 break 176 zipdata1.append(read_data) 177 178 self.assertEqual(''.join(zipdata1), self.data) 179 180 def test_random_open_stored(self): 181 for f in (TESTFN2, TemporaryFile(), StringIO()): 182 self.zip_random_open_test(f, zipfile.ZIP_STORED) 183 184 def test_univeral_readaheads(self): 185 f = StringIO() 186 187 data = 'a\r\n' * 16 * 1024 188 with zipfile.ZipFile(f, 'w', zipfile.ZIP_STORED) as zipfp: 189 zipfp.writestr(TESTFN, data) 190 191 data2 = '' 192 with zipfile.ZipFile(f, 'r') as zipfp: 193 with zipfp.open(TESTFN, 'rU') as zipopen: 194 for line in zipopen: 195 data2 += line 196 197 self.assertEqual(data, data2.replace('\n', '\r\n')) 198 199 def zip_readline_read_test(self, f, compression): 200 self.make_test_archive(f, compression) 201 202 # Read the ZIP archive 203 with zipfile.ZipFile(f, "r") as zipfp: 204 with zipfp.open(TESTFN) as zipopen: 205 data = '' 206 while True: 207 read = zipopen.readline() 208 if not read: 209 break 210 data += read 211 212 read = zipopen.read(100) 213 if not read: 214 break 215 data += read 216 217 self.assertEqual(data, self.data) 218 219 def zip_readline_test(self, f, compression): 220 self.make_test_archive(f, compression) 221 222 # Read the ZIP archive 223 with zipfile.ZipFile(f, "r") as zipfp: 224 with zipfp.open(TESTFN) as zipopen: 225 for line in self.line_gen: 226 linedata = zipopen.readline() 227 self.assertEqual(linedata, line + '\n') 228 229 def zip_readlines_test(self, f, compression): 230 self.make_test_archive(f, compression) 231 232 # Read the ZIP archive 233 with zipfile.ZipFile(f, "r") as zipfp: 234 with zipfp.open(TESTFN) as zo: 235 ziplines = zo.readlines() 236 for line, zipline in zip(self.line_gen, ziplines): 237 self.assertEqual(zipline, line + '\n') 238 239 def zip_iterlines_test(self, f, compression): 240 self.make_test_archive(f, compression) 241 242 # Read the ZIP archive 243 with zipfile.ZipFile(f, "r") as zipfp: 244 for line, zipline in zip(self.line_gen, zipfp.open(TESTFN)): 245 self.assertEqual(zipline, line + '\n') 246 247 def test_readline_read_stored(self): 248 # Issue #7610: calls to readline() interleaved with calls to read(). 249 for f in (TESTFN2, TemporaryFile(), StringIO()): 250 self.zip_readline_read_test(f, zipfile.ZIP_STORED) 251 252 def test_readline_stored(self): 253 for f in (TESTFN2, TemporaryFile(), StringIO()): 254 self.zip_readline_test(f, zipfile.ZIP_STORED) 255 256 def test_readlines_stored(self): 257 for f in (TESTFN2, TemporaryFile(), StringIO()): 258 self.zip_readlines_test(f, zipfile.ZIP_STORED) 259 260 def test_iterlines_stored(self): 261 for f in (TESTFN2, TemporaryFile(), StringIO()): 262 self.zip_iterlines_test(f, zipfile.ZIP_STORED) 263 264 @skipUnless(zlib, "requires zlib") 265 def test_deflated(self): 266 for f in (TESTFN2, TemporaryFile(), StringIO()): 267 self.zip_test(f, zipfile.ZIP_DEFLATED) 268 269 @skipUnless(zlib, "requires zlib") 270 def test_open_deflated(self): 271 for f in (TESTFN2, TemporaryFile(), StringIO()): 272 self.zip_open_test(f, zipfile.ZIP_DEFLATED) 273 274 @skipUnless(zlib, "requires zlib") 275 def test_random_open_deflated(self): 276 for f in (TESTFN2, TemporaryFile(), StringIO()): 277 self.zip_random_open_test(f, zipfile.ZIP_DEFLATED) 278 279 @skipUnless(zlib, "requires zlib") 280 def test_readline_read_deflated(self): 281 # Issue #7610: calls to readline() interleaved with calls to read(). 282 for f in (TESTFN2, TemporaryFile(), StringIO()): 283 self.zip_readline_read_test(f, zipfile.ZIP_DEFLATED) 284 285 @skipUnless(zlib, "requires zlib") 286 def test_readline_deflated(self): 287 for f in (TESTFN2, TemporaryFile(), StringIO()): 288 self.zip_readline_test(f, zipfile.ZIP_DEFLATED) 289 290 @skipUnless(zlib, "requires zlib") 291 def test_readlines_deflated(self): 292 for f in (TESTFN2, TemporaryFile(), StringIO()): 293 self.zip_readlines_test(f, zipfile.ZIP_DEFLATED) 294 295 @skipUnless(zlib, "requires zlib") 296 def test_iterlines_deflated(self): 297 for f in (TESTFN2, TemporaryFile(), StringIO()): 298 self.zip_iterlines_test(f, zipfile.ZIP_DEFLATED) 299 300 @skipUnless(zlib, "requires zlib") 301 def test_low_compression(self): 302 """Check for cases where compressed data is larger than original.""" 303 # Create the ZIP archive 304 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) as zipfp: 305 zipfp.writestr("strfile", '12') 306 307 # Get an open object for strfile 308 with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_DEFLATED) as zipfp: 309 with zipfp.open("strfile") as openobj: 310 self.assertEqual(openobj.read(1), '1') 311 self.assertEqual(openobj.read(1), '2') 312 313 def test_absolute_arcnames(self): 314 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 315 zipfp.write(TESTFN, "/absolute") 316 317 with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: 318 self.assertEqual(zipfp.namelist(), ["absolute"]) 319 320 def test_append_to_zip_file(self): 321 """Test appending to an existing zipfile.""" 322 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 323 zipfp.write(TESTFN, TESTFN) 324 325 with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp: 326 zipfp.writestr("strfile", self.data) 327 self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"]) 328 329 def test_append_to_non_zip_file(self): 330 """Test appending to an existing file that is not a zipfile.""" 331 # NOTE: this test fails if len(d) < 22 because of the first 332 # line "fpin.seek(-22, 2)" in _EndRecData 333 data = 'I am not a ZipFile!'*10 334 with open(TESTFN2, 'wb') as f: 335 f.write(data) 336 337 with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp: 338 zipfp.write(TESTFN, TESTFN) 339 340 with open(TESTFN2, 'rb') as f: 341 f.seek(len(data)) 342 with zipfile.ZipFile(f, "r") as zipfp: 343 self.assertEqual(zipfp.namelist(), [TESTFN]) 344 345 def test_ignores_newline_at_end(self): 346 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 347 zipfp.write(TESTFN, TESTFN) 348 with open(TESTFN2, 'a') as f: 349 f.write("\r\n\00\00\00") 350 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 351 self.assertIsInstance(zipfp, zipfile.ZipFile) 352 353 def test_ignores_stuff_appended_past_comments(self): 354 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 355 zipfp.comment = b"this is a comment" 356 zipfp.write(TESTFN, TESTFN) 357 with open(TESTFN2, 'a') as f: 358 f.write("abcdef\r\n") 359 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 360 self.assertIsInstance(zipfp, zipfile.ZipFile) 361 self.assertEqual(zipfp.comment, b"this is a comment") 362 363 def test_write_default_name(self): 364 """Check that calling ZipFile.write without arcname specified 365 produces the expected result.""" 366 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 367 zipfp.write(TESTFN) 368 self.assertEqual(zipfp.read(TESTFN), open(TESTFN).read()) 369 370 @skipUnless(zlib, "requires zlib") 371 def test_per_file_compression(self): 372 """Check that files within a Zip archive can have different 373 compression options.""" 374 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 375 zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED) 376 zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED) 377 sinfo = zipfp.getinfo('storeme') 378 dinfo = zipfp.getinfo('deflateme') 379 self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED) 380 self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED) 381 382 def test_write_to_readonly(self): 383 """Check that trying to call write() on a readonly ZipFile object 384 raises a RuntimeError.""" 385 with zipfile.ZipFile(TESTFN2, mode="w") as zipfp: 386 zipfp.writestr("somefile.txt", "bogus") 387 388 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 389 self.assertRaises(RuntimeError, zipfp.write, TESTFN) 390 391 def test_extract(self): 392 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 393 for fpath, fdata in SMALL_TEST_DATA: 394 zipfp.writestr(fpath, fdata) 395 396 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 397 for fpath, fdata in SMALL_TEST_DATA: 398 writtenfile = zipfp.extract(fpath) 399 400 # make sure it was written to the right place 401 correctfile = os.path.join(os.getcwd(), fpath) 402 correctfile = os.path.normpath(correctfile) 403 404 self.assertEqual(writtenfile, correctfile) 405 406 # make sure correct data is in correct file 407 self.assertEqual(fdata, open(writtenfile, "rb").read()) 408 os.remove(writtenfile) 409 410 # remove the test file subdirectories 411 shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir')) 412 413 def test_extract_all(self): 414 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 415 for fpath, fdata in SMALL_TEST_DATA: 416 zipfp.writestr(fpath, fdata) 417 418 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 419 zipfp.extractall() 420 for fpath, fdata in SMALL_TEST_DATA: 421 outfile = os.path.join(os.getcwd(), fpath) 422 423 self.assertEqual(fdata, open(outfile, "rb").read()) 424 os.remove(outfile) 425 426 # remove the test file subdirectories 427 shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir')) 428 429 def check_file(self, filename, content): 430 self.assertTrue(os.path.isfile(filename)) 431 with open(filename, 'rb') as f: 432 self.assertEqual(f.read(), content) 433 434 @skipUnless(TESTFN_UNICODE, "No Unicode filesystem semantics on this platform.") 435 def test_extract_unicode_filenames(self): 436 fnames = [u'foo.txt', os.path.basename(TESTFN_UNICODE)] 437 content = 'Test for unicode filename' 438 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 439 for fname in fnames: 440 zipfp.writestr(fname, content) 441 442 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 443 for fname in fnames: 444 writtenfile = zipfp.extract(fname) 445 446 # make sure it was written to the right place 447 correctfile = os.path.join(os.getcwd(), fname) 448 correctfile = os.path.normpath(correctfile) 449 self.assertEqual(writtenfile, correctfile) 450 451 self.check_file(writtenfile, content) 452 os.remove(writtenfile) 453 454 def test_extract_hackers_arcnames(self): 455 hacknames = [ 456 ('../foo/bar', 'foo/bar'), 457 ('foo/../bar', 'foo/bar'), 458 ('foo/../../bar', 'foo/bar'), 459 ('foo/bar/..', 'foo/bar'), 460 ('./../foo/bar', 'foo/bar'), 461 ('/foo/bar', 'foo/bar'), 462 ('/foo/../bar', 'foo/bar'), 463 ('/foo/../../bar', 'foo/bar'), 464 ] 465 if os.path.sep == '\\': 466 hacknames.extend([ 467 (r'..\foo\bar', 'foo/bar'), 468 (r'..\/foo\/bar', 'foo/bar'), 469 (r'foo/\..\/bar', 'foo/bar'), 470 (r'foo\/../\bar', 'foo/bar'), 471 (r'C:foo/bar', 'foo/bar'), 472 (r'C:/foo/bar', 'foo/bar'), 473 (r'C://foo/bar', 'foo/bar'), 474 (r'C:\foo\bar', 'foo/bar'), 475 (r'//conky/mountpoint/foo/bar', 'conky/mountpoint/foo/bar'), 476 (r'\\conky\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'), 477 (r'///conky/mountpoint/foo/bar', 'conky/mountpoint/foo/bar'), 478 (r'\\\conky\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'), 479 (r'//conky//mountpoint/foo/bar', 'conky/mountpoint/foo/bar'), 480 (r'\\conky\\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'), 481 (r'//?/C:/foo/bar', '_/C_/foo/bar'), 482 (r'\\?\C:\foo\bar', '_/C_/foo/bar'), 483 (r'C:/../C:/foo/bar', 'C_/foo/bar'), 484 (r'a:b\c<d>e|f"g?h*i', 'b/c_d_e_f_g_h_i'), 485 ('../../foo../../ba..r', 'foo/ba..r'), 486 ]) 487 else: # Unix 488 hacknames.extend([ 489 ('//foo/bar', 'foo/bar'), 490 ('../../foo../../ba..r', 'foo../ba..r'), 491 (r'foo/..\bar', r'foo/..\bar'), 492 ]) 493 494 for arcname, fixedname in hacknames: 495 content = b'foobar' + arcname.encode() 496 with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp: 497 zinfo = zipfile.ZipInfo() 498 # preserve backslashes 499 zinfo.filename = arcname 500 zinfo.external_attr = 0o600 << 16 501 zipfp.writestr(zinfo, content) 502 503 arcname = arcname.replace(os.sep, "/") 504 targetpath = os.path.join('target', 'subdir', 'subsub') 505 correctfile = os.path.join(targetpath, *fixedname.split('/')) 506 507 with zipfile.ZipFile(TESTFN2, 'r') as zipfp: 508 writtenfile = zipfp.extract(arcname, targetpath) 509 self.assertEqual(writtenfile, correctfile, 510 msg="extract %r" % arcname) 511 self.check_file(correctfile, content) 512 shutil.rmtree('target') 513 514 with zipfile.ZipFile(TESTFN2, 'r') as zipfp: 515 zipfp.extractall(targetpath) 516 self.check_file(correctfile, content) 517 shutil.rmtree('target') 518 519 correctfile = os.path.join(os.getcwd(), *fixedname.split('/')) 520 521 with zipfile.ZipFile(TESTFN2, 'r') as zipfp: 522 writtenfile = zipfp.extract(arcname) 523 self.assertEqual(writtenfile, correctfile, 524 msg="extract %r" % arcname) 525 self.check_file(correctfile, content) 526 shutil.rmtree(fixedname.split('/')[0]) 527 528 with zipfile.ZipFile(TESTFN2, 'r') as zipfp: 529 zipfp.extractall() 530 self.check_file(correctfile, content) 531 shutil.rmtree(fixedname.split('/')[0]) 532 533 os.remove(TESTFN2) 534 535 def test_writestr_compression(self): 536 zipfp = zipfile.ZipFile(TESTFN2, "w") 537 zipfp.writestr("a.txt", "hello world", compress_type=zipfile.ZIP_STORED) 538 if zlib: 539 zipfp.writestr("b.txt", "hello world", compress_type=zipfile.ZIP_DEFLATED) 540 541 info = zipfp.getinfo('a.txt') 542 self.assertEqual(info.compress_type, zipfile.ZIP_STORED) 543 544 if zlib: 545 info = zipfp.getinfo('b.txt') 546 self.assertEqual(info.compress_type, zipfile.ZIP_DEFLATED) 547 548 549 def zip_test_writestr_permissions(self, f, compression): 550 # Make sure that writestr creates files with mode 0600, 551 # when it is passed a name rather than a ZipInfo instance. 552 553 self.make_test_archive(f, compression) 554 with zipfile.ZipFile(f, "r") as zipfp: 555 zinfo = zipfp.getinfo('strfile') 556 self.assertEqual(zinfo.external_attr, 0600 << 16) 557 558 def test_writestr_permissions(self): 559 for f in (TESTFN2, TemporaryFile(), StringIO()): 560 self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED) 561 562 def test_close(self): 563 """Check that the zipfile is closed after the 'with' block.""" 564 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 565 for fpath, fdata in SMALL_TEST_DATA: 566 zipfp.writestr(fpath, fdata) 567 self.assertTrue(zipfp.fp is not None, 'zipfp is not open') 568 self.assertTrue(zipfp.fp is None, 'zipfp is not closed') 569 570 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 571 self.assertTrue(zipfp.fp is not None, 'zipfp is not open') 572 self.assertTrue(zipfp.fp is None, 'zipfp is not closed') 573 574 def test_close_on_exception(self): 575 """Check that the zipfile is closed if an exception is raised in the 576 'with' block.""" 577 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 578 for fpath, fdata in SMALL_TEST_DATA: 579 zipfp.writestr(fpath, fdata) 580 581 try: 582 with zipfile.ZipFile(TESTFN2, "r") as zipfp2: 583 raise zipfile.BadZipfile() 584 except zipfile.BadZipfile: 585 self.assertTrue(zipfp2.fp is None, 'zipfp is not closed') 586 587 def test_add_file_before_1980(self): 588 # Set atime and mtime to 1970-01-01 589 os.utime(TESTFN, (0, 0)) 590 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 591 self.assertRaises(ValueError, zipfp.write, TESTFN) 592 593 def tearDown(self): 594 unlink(TESTFN) 595 unlink(TESTFN2) 596 597 598 class TestZip64InSmallFiles(unittest.TestCase): 599 # These tests test the ZIP64 functionality without using large files, 600 # see test_zipfile64 for proper tests. 601 602 def setUp(self): 603 self._limit = zipfile.ZIP64_LIMIT 604 zipfile.ZIP64_LIMIT = 5 605 606 line_gen = ("Test of zipfile line %d." % i 607 for i in range(0, FIXEDTEST_SIZE)) 608 self.data = '\n'.join(line_gen) 609 610 # Make a source file with some lines 611 with open(TESTFN, "wb") as fp: 612 fp.write(self.data) 613 614 def large_file_exception_test(self, f, compression): 615 with zipfile.ZipFile(f, "w", compression) as zipfp: 616 self.assertRaises(zipfile.LargeZipFile, 617 zipfp.write, TESTFN, "another.name") 618 619 def large_file_exception_test2(self, f, compression): 620 with zipfile.ZipFile(f, "w", compression) as zipfp: 621 self.assertRaises(zipfile.LargeZipFile, 622 zipfp.writestr, "another.name", self.data) 623 624 def test_large_file_exception(self): 625 for f in (TESTFN2, TemporaryFile(), StringIO()): 626 self.large_file_exception_test(f, zipfile.ZIP_STORED) 627 self.large_file_exception_test2(f, zipfile.ZIP_STORED) 628 629 def zip_test(self, f, compression): 630 # Create the ZIP archive 631 with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp: 632 zipfp.write(TESTFN, "another.name") 633 zipfp.write(TESTFN, TESTFN) 634 zipfp.writestr("strfile", self.data) 635 636 # Read the ZIP archive 637 with zipfile.ZipFile(f, "r", compression) as zipfp: 638 self.assertEqual(zipfp.read(TESTFN), self.data) 639 self.assertEqual(zipfp.read("another.name"), self.data) 640 self.assertEqual(zipfp.read("strfile"), self.data) 641 642 # Print the ZIP directory 643 fp = StringIO() 644 stdout = sys.stdout 645 try: 646 sys.stdout = fp 647 zipfp.printdir() 648 finally: 649 sys.stdout = stdout 650 651 directory = fp.getvalue() 652 lines = directory.splitlines() 653 self.assertEqual(len(lines), 4) # Number of files + header 654 655 self.assertIn('File Name', lines[0]) 656 self.assertIn('Modified', lines[0]) 657 self.assertIn('Size', lines[0]) 658 659 fn, date, time_, size = lines[1].split() 660 self.assertEqual(fn, 'another.name') 661 self.assertTrue(time.strptime(date, '%Y-%m-%d')) 662 self.assertTrue(time.strptime(time_, '%H:%M:%S')) 663 self.assertEqual(size, str(len(self.data))) 664 665 # Check the namelist 666 names = zipfp.namelist() 667 self.assertEqual(len(names), 3) 668 self.assertIn(TESTFN, names) 669 self.assertIn("another.name", names) 670 self.assertIn("strfile", names) 671 672 # Check infolist 673 infos = zipfp.infolist() 674 names = [i.filename for i in infos] 675 self.assertEqual(len(names), 3) 676 self.assertIn(TESTFN, names) 677 self.assertIn("another.name", names) 678 self.assertIn("strfile", names) 679 for i in infos: 680 self.assertEqual(i.file_size, len(self.data)) 681 682 # check getinfo 683 for nm in (TESTFN, "another.name", "strfile"): 684 info = zipfp.getinfo(nm) 685 self.assertEqual(info.filename, nm) 686 self.assertEqual(info.file_size, len(self.data)) 687 688 # Check that testzip doesn't raise an exception 689 zipfp.testzip() 690 691 def test_stored(self): 692 for f in (TESTFN2, TemporaryFile(), StringIO()): 693 self.zip_test(f, zipfile.ZIP_STORED) 694 695 @skipUnless(zlib, "requires zlib") 696 def test_deflated(self): 697 for f in (TESTFN2, TemporaryFile(), StringIO()): 698 self.zip_test(f, zipfile.ZIP_DEFLATED) 699 700 def test_absolute_arcnames(self): 701 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED, 702 allowZip64=True) as zipfp: 703 zipfp.write(TESTFN, "/absolute") 704 705 with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: 706 self.assertEqual(zipfp.namelist(), ["absolute"]) 707 708 def tearDown(self): 709 zipfile.ZIP64_LIMIT = self._limit 710 unlink(TESTFN) 711 unlink(TESTFN2) 712 713 714 class PyZipFileTests(unittest.TestCase): 715 def test_write_pyfile(self): 716 with zipfile.PyZipFile(TemporaryFile(), "w") as zipfp: 717 fn = __file__ 718 if fn.endswith('.pyc') or fn.endswith('.pyo'): 719 fn = fn[:-1] 720 721 zipfp.writepy(fn) 722 723 bn = os.path.basename(fn) 724 self.assertNotIn(bn, zipfp.namelist()) 725 self.assertTrue(bn + 'o' in zipfp.namelist() or 726 bn + 'c' in zipfp.namelist()) 727 728 with zipfile.PyZipFile(TemporaryFile(), "w") as zipfp: 729 fn = __file__ 730 if fn.endswith(('.pyc', '.pyo')): 731 fn = fn[:-1] 732 733 zipfp.writepy(fn, "testpackage") 734 735 bn = "%s/%s" % ("testpackage", os.path.basename(fn)) 736 self.assertNotIn(bn, zipfp.namelist()) 737 self.assertTrue(bn + 'o' in zipfp.namelist() or 738 bn + 'c' in zipfp.namelist()) 739 740 def test_write_python_package(self): 741 import email 742 packagedir = os.path.dirname(email.__file__) 743 744 with zipfile.PyZipFile(TemporaryFile(), "w") as zipfp: 745 zipfp.writepy(packagedir) 746 747 # Check for a couple of modules at different levels of the 748 # hierarchy 749 names = zipfp.namelist() 750 self.assertTrue('email/__init__.pyo' in names or 751 'email/__init__.pyc' in names) 752 self.assertTrue('email/mime/text.pyo' in names or 753 'email/mime/text.pyc' in names) 754 755 def test_write_python_directory(self): 756 os.mkdir(TESTFN2) 757 try: 758 with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp: 759 fp.write("print(42)\n") 760 761 with open(os.path.join(TESTFN2, "mod2.py"), "w") as fp: 762 fp.write("print(42 * 42)\n") 763 764 with open(os.path.join(TESTFN2, "mod2.txt"), "w") as fp: 765 fp.write("bla bla bla\n") 766 767 zipfp = zipfile.PyZipFile(TemporaryFile(), "w") 768 zipfp.writepy(TESTFN2) 769 770 names = zipfp.namelist() 771 self.assertTrue('mod1.pyc' in names or 'mod1.pyo' in names) 772 self.assertTrue('mod2.pyc' in names or 'mod2.pyo' in names) 773 self.assertNotIn('mod2.txt', names) 774 775 finally: 776 shutil.rmtree(TESTFN2) 777 778 def test_write_non_pyfile(self): 779 with zipfile.PyZipFile(TemporaryFile(), "w") as zipfp: 780 open(TESTFN, 'w').write('most definitely not a python file') 781 self.assertRaises(RuntimeError, zipfp.writepy, TESTFN) 782 os.remove(TESTFN) 783 784 785 class OtherTests(unittest.TestCase): 786 zips_with_bad_crc = { 787 zipfile.ZIP_STORED: ( 788 b'PK\003\004\024\0\0\0\0\0 \213\212;:r' 789 b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af' 790 b'ilehello,AworldP' 791 b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:' 792 b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0' 793 b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi' 794 b'lePK\005\006\0\0\0\0\001\0\001\0003\000' 795 b'\0\0/\0\0\0\0\0'), 796 zipfile.ZIP_DEFLATED: ( 797 b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA' 798 b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' 799 b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0' 800 b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n' 801 b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05' 802 b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00' 803 b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00' 804 b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00'), 805 } 806 807 def test_unicode_filenames(self): 808 with zipfile.ZipFile(TESTFN, "w") as zf: 809 zf.writestr(u"foo.txt", "Test for unicode filename") 810 zf.writestr(u"\xf6.txt", "Test for unicode filename") 811 self.assertIsInstance(zf.infolist()[0].filename, unicode) 812 813 with zipfile.ZipFile(TESTFN, "r") as zf: 814 self.assertEqual(zf.filelist[0].filename, "foo.txt") 815 self.assertEqual(zf.filelist[1].filename, u"\xf6.txt") 816 817 def test_create_non_existent_file_for_append(self): 818 if os.path.exists(TESTFN): 819 os.unlink(TESTFN) 820 821 filename = 'testfile.txt' 822 content = 'hello, world. this is some content.' 823 824 try: 825 with zipfile.ZipFile(TESTFN, 'a') as zf: 826 zf.writestr(filename, content) 827 except IOError: 828 self.fail('Could not append data to a non-existent zip file.') 829 830 self.assertTrue(os.path.exists(TESTFN)) 831 832 with zipfile.ZipFile(TESTFN, 'r') as zf: 833 self.assertEqual(zf.read(filename), content) 834 835 def test_close_erroneous_file(self): 836 # This test checks that the ZipFile constructor closes the file object 837 # it opens if there's an error in the file. If it doesn't, the 838 # traceback holds a reference to the ZipFile object and, indirectly, 839 # the file object. 840 # On Windows, this causes the os.unlink() call to fail because the 841 # underlying file is still open. This is SF bug #412214. 842 # 843 with open(TESTFN, "w") as fp: 844 fp.write("this is not a legal zip file\n") 845 try: 846 zf = zipfile.ZipFile(TESTFN) 847 except zipfile.BadZipfile: 848 pass 849 850 def test_is_zip_erroneous_file(self): 851 """Check that is_zipfile() correctly identifies non-zip files.""" 852 # - passing a filename 853 with open(TESTFN, "w") as fp: 854 fp.write("this is not a legal zip file\n") 855 chk = zipfile.is_zipfile(TESTFN) 856 self.assertFalse(chk) 857 # - passing a file object 858 with open(TESTFN, "rb") as fp: 859 chk = zipfile.is_zipfile(fp) 860 self.assertTrue(not chk) 861 # - passing a file-like object 862 fp = StringIO() 863 fp.write("this is not a legal zip file\n") 864 chk = zipfile.is_zipfile(fp) 865 self.assertTrue(not chk) 866 fp.seek(0, 0) 867 chk = zipfile.is_zipfile(fp) 868 self.assertTrue(not chk) 869 870 def test_damaged_zipfile(self): 871 """Check that zipfiles with missing bytes at the end raise BadZipFile.""" 872 # - Create a valid zip file 873 fp = io.BytesIO() 874 with zipfile.ZipFile(fp, mode="w") as zipf: 875 zipf.writestr("foo.txt", b"O, for a Muse of Fire!") 876 zipfiledata = fp.getvalue() 877 878 # - Now create copies of it missing the last N bytes and make sure 879 # a BadZipFile exception is raised when we try to open it 880 for N in range(len(zipfiledata)): 881 fp = io.BytesIO(zipfiledata[:N]) 882 self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, fp) 883 884 def test_is_zip_valid_file(self): 885 """Check that is_zipfile() correctly identifies zip files.""" 886 # - passing a filename 887 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 888 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 889 chk = zipfile.is_zipfile(TESTFN) 890 self.assertTrue(chk) 891 # - passing a file object 892 with open(TESTFN, "rb") as fp: 893 chk = zipfile.is_zipfile(fp) 894 self.assertTrue(chk) 895 fp.seek(0, 0) 896 zip_contents = fp.read() 897 # - passing a file-like object 898 fp = StringIO() 899 fp.write(zip_contents) 900 chk = zipfile.is_zipfile(fp) 901 self.assertTrue(chk) 902 fp.seek(0, 0) 903 chk = zipfile.is_zipfile(fp) 904 self.assertTrue(chk) 905 906 def test_non_existent_file_raises_IOError(self): 907 # make sure we don't raise an AttributeError when a partially-constructed 908 # ZipFile instance is finalized; this tests for regression on SF tracker 909 # bug #403871. 910 911 # The bug we're testing for caused an AttributeError to be raised 912 # when a ZipFile instance was created for a file that did not 913 # exist; the .fp member was not initialized but was needed by the 914 # __del__() method. Since the AttributeError is in the __del__(), 915 # it is ignored, but the user should be sufficiently annoyed by 916 # the message on the output that regression will be noticed 917 # quickly. 918 self.assertRaises(IOError, zipfile.ZipFile, TESTFN) 919 920 def test_empty_file_raises_BadZipFile(self): 921 with open(TESTFN, 'w') as f: 922 pass 923 self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, TESTFN) 924 925 with open(TESTFN, 'w') as fp: 926 fp.write("short file") 927 self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, TESTFN) 928 929 def test_closed_zip_raises_RuntimeError(self): 930 """Verify that testzip() doesn't swallow inappropriate exceptions.""" 931 data = StringIO() 932 with zipfile.ZipFile(data, mode="w") as zipf: 933 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 934 935 # This is correct; calling .read on a closed ZipFile should raise 936 # a RuntimeError, and so should calling .testzip. An earlier 937 # version of .testzip would swallow this exception (and any other) 938 # and report that the first file in the archive was corrupt. 939 self.assertRaises(RuntimeError, zipf.read, "foo.txt") 940 self.assertRaises(RuntimeError, zipf.open, "foo.txt") 941 self.assertRaises(RuntimeError, zipf.testzip) 942 self.assertRaises(RuntimeError, zipf.writestr, "bogus.txt", "bogus") 943 open(TESTFN, 'w').write('zipfile test data') 944 self.assertRaises(RuntimeError, zipf.write, TESTFN) 945 946 def test_bad_constructor_mode(self): 947 """Check that bad modes passed to ZipFile constructor are caught.""" 948 self.assertRaises(RuntimeError, zipfile.ZipFile, TESTFN, "q") 949 950 def test_bad_open_mode(self): 951 """Check that bad modes passed to ZipFile.open are caught.""" 952 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 953 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 954 955 with zipfile.ZipFile(TESTFN, mode="r") as zipf: 956 # read the data to make sure the file is there 957 zipf.read("foo.txt") 958 self.assertRaises(RuntimeError, zipf.open, "foo.txt", "q") 959 960 def test_read0(self): 961 """Check that calling read(0) on a ZipExtFile object returns an empty 962 string and doesn't advance file pointer.""" 963 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 964 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 965 # read the data to make sure the file is there 966 with zipf.open("foo.txt") as f: 967 for i in xrange(FIXEDTEST_SIZE): 968 self.assertEqual(f.read(0), '') 969 970 self.assertEqual(f.read(), "O, for a Muse of Fire!") 971 972 def test_open_non_existent_item(self): 973 """Check that attempting to call open() for an item that doesn't 974 exist in the archive raises a RuntimeError.""" 975 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 976 self.assertRaises(KeyError, zipf.open, "foo.txt", "r") 977 978 def test_bad_compression_mode(self): 979 """Check that bad compression methods passed to ZipFile.open are 980 caught.""" 981 self.assertRaises(RuntimeError, zipfile.ZipFile, TESTFN, "w", -1) 982 983 def test_unsupported_compression(self): 984 # data is declared as shrunk, but actually deflated 985 data = (b'PK\x03\x04.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00' 986 b'\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00x\x03\x00PK\x01' 987 b'\x02.\x03.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00\x00\x02\x00\x00' 988 b'\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' 989 b'\x80\x01\x00\x00\x00\x00xPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00' 990 b'/\x00\x00\x00!\x00\x00\x00\x00\x00') 991 with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf: 992 self.assertRaises(NotImplementedError, zipf.open, 'x') 993 994 def test_null_byte_in_filename(self): 995 """Check that a filename containing a null byte is properly 996 terminated.""" 997 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 998 zipf.writestr("foo.txt\x00qqq", "O, for a Muse of Fire!") 999 self.assertEqual(zipf.namelist(), ['foo.txt']) 1000 1001 def test_struct_sizes(self): 1002 """Check that ZIP internal structure sizes are calculated correctly.""" 1003 self.assertEqual(zipfile.sizeEndCentDir, 22) 1004 self.assertEqual(zipfile.sizeCentralDir, 46) 1005 self.assertEqual(zipfile.sizeEndCentDir64, 56) 1006 self.assertEqual(zipfile.sizeEndCentDir64Locator, 20) 1007 1008 def test_comments(self): 1009 """Check that comments on the archive are handled properly.""" 1010 1011 # check default comment is empty 1012 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1013 self.assertEqual(zipf.comment, '') 1014 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1015 1016 with zipfile.ZipFile(TESTFN, mode="r") as zipf: 1017 self.assertEqual(zipf.comment, '') 1018 1019 # check a simple short comment 1020 comment = 'Bravely taking to his feet, he beat a very brave retreat.' 1021 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1022 zipf.comment = comment 1023 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1024 with zipfile.ZipFile(TESTFN, mode="r") as zipf: 1025 self.assertEqual(zipf.comment, comment) 1026 1027 # check a comment of max length 1028 comment2 = ''.join(['%d' % (i**3 % 10) for i in xrange((1 << 16)-1)]) 1029 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1030 zipf.comment = comment2 1031 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1032 1033 with zipfile.ZipFile(TESTFN, mode="r") as zipf: 1034 self.assertEqual(zipf.comment, comment2) 1035 1036 # check a comment that is too long is truncated 1037 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1038 zipf.comment = comment2 + 'oops' 1039 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1040 with zipfile.ZipFile(TESTFN, mode="r") as zipf: 1041 self.assertEqual(zipf.comment, comment2) 1042 1043 def test_change_comment_in_empty_archive(self): 1044 with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf: 1045 self.assertFalse(zipf.filelist) 1046 zipf.comment = b"this is a comment" 1047 with zipfile.ZipFile(TESTFN, "r") as zipf: 1048 self.assertEqual(zipf.comment, b"this is a comment") 1049 1050 def test_change_comment_in_nonempty_archive(self): 1051 with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf: 1052 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1053 with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf: 1054 self.assertTrue(zipf.filelist) 1055 zipf.comment = b"this is a comment" 1056 with zipfile.ZipFile(TESTFN, "r") as zipf: 1057 self.assertEqual(zipf.comment, b"this is a comment") 1058 1059 def check_testzip_with_bad_crc(self, compression): 1060 """Tests that files with bad CRCs return their name from testzip.""" 1061 zipdata = self.zips_with_bad_crc[compression] 1062 1063 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 1064 # testzip returns the name of the first corrupt file, or None 1065 self.assertEqual('afile', zipf.testzip()) 1066 1067 def test_testzip_with_bad_crc_stored(self): 1068 self.check_testzip_with_bad_crc(zipfile.ZIP_STORED) 1069 1070 @skipUnless(zlib, "requires zlib") 1071 def test_testzip_with_bad_crc_deflated(self): 1072 self.check_testzip_with_bad_crc(zipfile.ZIP_DEFLATED) 1073 1074 def check_read_with_bad_crc(self, compression): 1075 """Tests that files with bad CRCs raise a BadZipfile exception when read.""" 1076 zipdata = self.zips_with_bad_crc[compression] 1077 1078 # Using ZipFile.read() 1079 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 1080 self.assertRaises(zipfile.BadZipfile, zipf.read, 'afile') 1081 1082 # Using ZipExtFile.read() 1083 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 1084 with zipf.open('afile', 'r') as corrupt_file: 1085 self.assertRaises(zipfile.BadZipfile, corrupt_file.read) 1086 1087 # Same with small reads (in order to exercise the buffering logic) 1088 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 1089 with zipf.open('afile', 'r') as corrupt_file: 1090 corrupt_file.MIN_READ_SIZE = 2 1091 with self.assertRaises(zipfile.BadZipfile): 1092 while corrupt_file.read(2): 1093 pass 1094 1095 def test_read_with_bad_crc_stored(self): 1096 self.check_read_with_bad_crc(zipfile.ZIP_STORED) 1097 1098 @skipUnless(zlib, "requires zlib") 1099 def test_read_with_bad_crc_deflated(self): 1100 self.check_read_with_bad_crc(zipfile.ZIP_DEFLATED) 1101 1102 def check_read_return_size(self, compression): 1103 # Issue #9837: ZipExtFile.read() shouldn't return more bytes 1104 # than requested. 1105 for test_size in (1, 4095, 4096, 4097, 16384): 1106 file_size = test_size + 1 1107 junk = b''.join(struct.pack('B', randint(0, 255)) 1108 for x in range(file_size)) 1109 with zipfile.ZipFile(io.BytesIO(), "w", compression) as zipf: 1110 zipf.writestr('foo', junk) 1111 with zipf.open('foo', 'r') as fp: 1112 buf = fp.read(test_size) 1113 self.assertEqual(len(buf), test_size) 1114 1115 def test_read_return_size_stored(self): 1116 self.check_read_return_size(zipfile.ZIP_STORED) 1117 1118 @skipUnless(zlib, "requires zlib") 1119 def test_read_return_size_deflated(self): 1120 self.check_read_return_size(zipfile.ZIP_DEFLATED) 1121 1122 def test_empty_zipfile(self): 1123 # Check that creating a file in 'w' or 'a' mode and closing without 1124 # adding any files to the archives creates a valid empty ZIP file 1125 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1126 pass 1127 try: 1128 zipf = zipfile.ZipFile(TESTFN, mode="r") 1129 except zipfile.BadZipfile: 1130 self.fail("Unable to create empty ZIP file in 'w' mode") 1131 1132 with zipfile.ZipFile(TESTFN, mode="a") as zipf: 1133 pass 1134 try: 1135 zipf = zipfile.ZipFile(TESTFN, mode="r") 1136 except: 1137 self.fail("Unable to create empty ZIP file in 'a' mode") 1138 1139 def test_open_empty_file(self): 1140 # Issue 1710703: Check that opening a file with less than 22 bytes 1141 # raises a BadZipfile exception (rather than the previously unhelpful 1142 # IOError) 1143 with open(TESTFN, 'w') as f: 1144 pass 1145 self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, TESTFN, 'r') 1146 1147 def test_create_zipinfo_before_1980(self): 1148 self.assertRaises(ValueError, 1149 zipfile.ZipInfo, 'seventies', (1979, 1, 1, 0, 0, 0)) 1150 1151 def tearDown(self): 1152 unlink(TESTFN) 1153 unlink(TESTFN2) 1154 1155 1156 class DecryptionTests(unittest.TestCase): 1157 """Check that ZIP decryption works. Since the library does not 1158 support encryption at the moment, we use a pre-generated encrypted 1159 ZIP file.""" 1160 1161 data = ( 1162 'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00' 1163 '\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y' 1164 '\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl' 1165 'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00' 1166 '\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81' 1167 '\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00' 1168 '\x00\x00L\x00\x00\x00\x00\x00' ) 1169 data2 = ( 1170 'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02' 1171 '\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04' 1172 '\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0' 1173 'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03' 1174 '\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00' 1175 '\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze' 1176 'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01' 1177 '\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' ) 1178 1179 plain = 'zipfile.py encryption test' 1180 plain2 = '\x00'*512 1181 1182 def setUp(self): 1183 with open(TESTFN, "wb") as fp: 1184 fp.write(self.data) 1185 self.zip = zipfile.ZipFile(TESTFN, "r") 1186 with open(TESTFN2, "wb") as fp: 1187 fp.write(self.data2) 1188 self.zip2 = zipfile.ZipFile(TESTFN2, "r") 1189 1190 def tearDown(self): 1191 self.zip.close() 1192 os.unlink(TESTFN) 1193 self.zip2.close() 1194 os.unlink(TESTFN2) 1195 1196 def test_no_password(self): 1197 # Reading the encrypted file without password 1198 # must generate a RunTime exception 1199 self.assertRaises(RuntimeError, self.zip.read, "test.txt") 1200 self.assertRaises(RuntimeError, self.zip2.read, "zero") 1201 1202 def test_bad_password(self): 1203 self.zip.setpassword("perl") 1204 self.assertRaises(RuntimeError, self.zip.read, "test.txt") 1205 self.zip2.setpassword("perl") 1206 self.assertRaises(RuntimeError, self.zip2.read, "zero") 1207 1208 @skipUnless(zlib, "requires zlib") 1209 def test_good_password(self): 1210 self.zip.setpassword("python") 1211 self.assertEqual(self.zip.read("test.txt"), self.plain) 1212 self.zip2.setpassword("12345") 1213 self.assertEqual(self.zip2.read("zero"), self.plain2) 1214 1215 1216 class TestsWithRandomBinaryFiles(unittest.TestCase): 1217 def setUp(self): 1218 datacount = randint(16, 64)*1024 + randint(1, 1024) 1219 self.data = ''.join(struct.pack('<f', random()*randint(-1000, 1000)) 1220 for i in xrange(datacount)) 1221 1222 # Make a source file with some lines 1223 with open(TESTFN, "wb") as fp: 1224 fp.write(self.data) 1225 1226 def tearDown(self): 1227 unlink(TESTFN) 1228 unlink(TESTFN2) 1229 1230 def make_test_archive(self, f, compression): 1231 # Create the ZIP archive 1232 with zipfile.ZipFile(f, "w", compression) as zipfp: 1233 zipfp.write(TESTFN, "another.name") 1234 zipfp.write(TESTFN, TESTFN) 1235 1236 def zip_test(self, f, compression): 1237 self.make_test_archive(f, compression) 1238 1239 # Read the ZIP archive 1240 with zipfile.ZipFile(f, "r", compression) as zipfp: 1241 testdata = zipfp.read(TESTFN) 1242 self.assertEqual(len(testdata), len(self.data)) 1243 self.assertEqual(testdata, self.data) 1244 self.assertEqual(zipfp.read("another.name"), self.data) 1245 1246 def test_stored(self): 1247 for f in (TESTFN2, TemporaryFile(), StringIO()): 1248 self.zip_test(f, zipfile.ZIP_STORED) 1249 1250 @skipUnless(zlib, "requires zlib") 1251 def test_deflated(self): 1252 for f in (TESTFN2, TemporaryFile(), io.BytesIO()): 1253 self.zip_test(f, zipfile.ZIP_DEFLATED) 1254 1255 def zip_open_test(self, f, compression): 1256 self.make_test_archive(f, compression) 1257 1258 # Read the ZIP archive 1259 with zipfile.ZipFile(f, "r", compression) as zipfp: 1260 zipdata1 = [] 1261 with zipfp.open(TESTFN) as zipopen1: 1262 while True: 1263 read_data = zipopen1.read(256) 1264 if not read_data: 1265 break 1266 zipdata1.append(read_data) 1267 1268 zipdata2 = [] 1269 with zipfp.open("another.name") as zipopen2: 1270 while True: 1271 read_data = zipopen2.read(256) 1272 if not read_data: 1273 break 1274 zipdata2.append(read_data) 1275 1276 testdata1 = ''.join(zipdata1) 1277 self.assertEqual(len(testdata1), len(self.data)) 1278 self.assertEqual(testdata1, self.data) 1279 1280 testdata2 = ''.join(zipdata2) 1281 self.assertEqual(len(testdata2), len(self.data)) 1282 self.assertEqual(testdata2, self.data) 1283 1284 def test_open_stored(self): 1285 for f in (TESTFN2, TemporaryFile(), StringIO()): 1286 self.zip_open_test(f, zipfile.ZIP_STORED) 1287 1288 @skipUnless(zlib, "requires zlib") 1289 def test_open_deflated(self): 1290 for f in (TESTFN2, TemporaryFile(), io.BytesIO()): 1291 self.zip_open_test(f, zipfile.ZIP_DEFLATED) 1292 1293 def zip_random_open_test(self, f, compression): 1294 self.make_test_archive(f, compression) 1295 1296 # Read the ZIP archive 1297 with zipfile.ZipFile(f, "r", compression) as zipfp: 1298 zipdata1 = [] 1299 with zipfp.open(TESTFN) as zipopen1: 1300 while True: 1301 read_data = zipopen1.read(randint(1, 1024)) 1302 if not read_data: 1303 break 1304 zipdata1.append(read_data) 1305 1306 testdata = ''.join(zipdata1) 1307 self.assertEqual(len(testdata), len(self.data)) 1308 self.assertEqual(testdata, self.data) 1309 1310 def test_random_open_stored(self): 1311 for f in (TESTFN2, TemporaryFile(), StringIO()): 1312 self.zip_random_open_test(f, zipfile.ZIP_STORED) 1313 1314 @skipUnless(zlib, "requires zlib") 1315 def test_random_open_deflated(self): 1316 for f in (TESTFN2, TemporaryFile(), io.BytesIO()): 1317 self.zip_random_open_test(f, zipfile.ZIP_DEFLATED) 1318 1319 1320 @skipUnless(zlib, "requires zlib") 1321 class TestsWithMultipleOpens(unittest.TestCase): 1322 def setUp(self): 1323 # Create the ZIP archive 1324 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) as zipfp: 1325 zipfp.writestr('ones', '1'*FIXEDTEST_SIZE) 1326 zipfp.writestr('twos', '2'*FIXEDTEST_SIZE) 1327 1328 def test_same_file(self): 1329 # Verify that (when the ZipFile is in control of creating file objects) 1330 # multiple open() calls can be made without interfering with each other. 1331 with zipfile.ZipFile(TESTFN2, mode="r") as zipf: 1332 zopen1 = zipf.open('ones') 1333 zopen2 = zipf.open('ones') 1334 data1 = zopen1.read(500) 1335 data2 = zopen2.read(500) 1336 data1 += zopen1.read(500) 1337 data2 += zopen2.read(500) 1338 self.assertEqual(data1, data2) 1339 1340 def test_different_file(self): 1341 # Verify that (when the ZipFile is in control of creating file objects) 1342 # multiple open() calls can be made without interfering with each other. 1343 with zipfile.ZipFile(TESTFN2, mode="r") as zipf: 1344 with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2: 1345 data1 = zopen1.read(500) 1346 data2 = zopen2.read(500) 1347 data1 += zopen1.read(500) 1348 data2 += zopen2.read(500) 1349 self.assertEqual(data1, '1'*FIXEDTEST_SIZE) 1350 self.assertEqual(data2, '2'*FIXEDTEST_SIZE) 1351 1352 def test_interleaved(self): 1353 # Verify that (when the ZipFile is in control of creating file objects) 1354 # multiple open() calls can be made without interfering with each other. 1355 with zipfile.ZipFile(TESTFN2, mode="r") as zipf: 1356 with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2: 1357 data1 = zopen1.read(500) 1358 data2 = zopen2.read(500) 1359 data1 += zopen1.read(500) 1360 data2 += zopen2.read(500) 1361 self.assertEqual(data1, '1'*FIXEDTEST_SIZE) 1362 self.assertEqual(data2, '2'*FIXEDTEST_SIZE) 1363 1364 def tearDown(self): 1365 unlink(TESTFN2) 1366 1367 1368 class TestWithDirectory(unittest.TestCase): 1369 def setUp(self): 1370 os.mkdir(TESTFN2) 1371 1372 def test_extract_dir(self): 1373 with zipfile.ZipFile(findfile("zipdir.zip")) as zipf: 1374 zipf.extractall(TESTFN2) 1375 self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a"))) 1376 self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b"))) 1377 self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c"))) 1378 1379 def test_bug_6050(self): 1380 # Extraction should succeed if directories already exist 1381 os.mkdir(os.path.join(TESTFN2, "a")) 1382 self.test_extract_dir() 1383 1384 def test_store_dir(self): 1385 os.mkdir(os.path.join(TESTFN2, "x")) 1386 zipf = zipfile.ZipFile(TESTFN, "w") 1387 zipf.write(os.path.join(TESTFN2, "x"), "x") 1388 self.assertTrue(zipf.filelist[0].filename.endswith("x/")) 1389 1390 def tearDown(self): 1391 shutil.rmtree(TESTFN2) 1392 if os.path.exists(TESTFN): 1393 unlink(TESTFN) 1394 1395 1396 class UniversalNewlineTests(unittest.TestCase): 1397 def setUp(self): 1398 self.line_gen = ["Test of zipfile line %d." % i 1399 for i in xrange(FIXEDTEST_SIZE)] 1400 self.seps = ('\r', '\r\n', '\n') 1401 self.arcdata, self.arcfiles = {}, {} 1402 for n, s in enumerate(self.seps): 1403 self.arcdata[s] = s.join(self.line_gen) + s 1404 self.arcfiles[s] = '%s-%d' % (TESTFN, n) 1405 open(self.arcfiles[s], "wb").write(self.arcdata[s]) 1406 1407 def make_test_archive(self, f, compression): 1408 # Create the ZIP archive 1409 with zipfile.ZipFile(f, "w", compression) as zipfp: 1410 for fn in self.arcfiles.values(): 1411 zipfp.write(fn, fn) 1412 1413 def read_test(self, f, compression): 1414 self.make_test_archive(f, compression) 1415 1416 # Read the ZIP archive 1417 with zipfile.ZipFile(f, "r") as zipfp: 1418 for sep, fn in self.arcfiles.items(): 1419 with zipfp.open(fn, "rU") as fp: 1420 zipdata = fp.read() 1421 self.assertEqual(self.arcdata[sep], zipdata) 1422 1423 def readline_read_test(self, f, compression): 1424 self.make_test_archive(f, compression) 1425 1426 # Read the ZIP archive 1427 zipfp = zipfile.ZipFile(f, "r") 1428 for sep, fn in self.arcfiles.items(): 1429 with zipfp.open(fn, "rU") as zipopen: 1430 data = '' 1431 while True: 1432 read = zipopen.readline() 1433 if not read: 1434 break 1435 data += read 1436 1437 read = zipopen.read(5) 1438 if not read: 1439 break 1440 data += read 1441 1442 self.assertEqual(data, self.arcdata['\n']) 1443 1444 zipfp.close() 1445 1446 def readline_test(self, f, compression): 1447 self.make_test_archive(f, compression) 1448 1449 # Read the ZIP archive 1450 with zipfile.ZipFile(f, "r") as zipfp: 1451 for sep, fn in self.arcfiles.items(): 1452 with zipfp.open(fn, "rU") as zipopen: 1453 for line in self.line_gen: 1454 linedata = zipopen.readline() 1455 self.assertEqual(linedata, line + '\n') 1456 1457 def readlines_test(self, f, compression): 1458 self.make_test_archive(f, compression) 1459 1460 # Read the ZIP archive 1461 with zipfile.ZipFile(f, "r") as zipfp: 1462 for sep, fn in self.arcfiles.items(): 1463 with zipfp.open(fn, "rU") as fp: 1464 ziplines = fp.readlines() 1465 for line, zipline in zip(self.line_gen, ziplines): 1466 self.assertEqual(zipline, line + '\n') 1467 1468 def iterlines_test(self, f, compression): 1469 self.make_test_archive(f, compression) 1470 1471 # Read the ZIP archive 1472 with zipfile.ZipFile(f, "r") as zipfp: 1473 for sep, fn in self.arcfiles.items(): 1474 for line, zipline in zip(self.line_gen, zipfp.open(fn, "rU")): 1475 self.assertEqual(zipline, line + '\n') 1476 1477 def test_read_stored(self): 1478 for f in (TESTFN2, TemporaryFile(), StringIO()): 1479 self.read_test(f, zipfile.ZIP_STORED) 1480 1481 def test_readline_read_stored(self): 1482 # Issue #7610: calls to readline() interleaved with calls to read(). 1483 for f in (TESTFN2, TemporaryFile(), StringIO()): 1484 self.readline_read_test(f, zipfile.ZIP_STORED) 1485 1486 def test_readline_stored(self): 1487 for f in (TESTFN2, TemporaryFile(), StringIO()): 1488 self.readline_test(f, zipfile.ZIP_STORED) 1489 1490 def test_readlines_stored(self): 1491 for f in (TESTFN2, TemporaryFile(), StringIO()): 1492 self.readlines_test(f, zipfile.ZIP_STORED) 1493 1494 def test_iterlines_stored(self): 1495 for f in (TESTFN2, TemporaryFile(), StringIO()): 1496 self.iterlines_test(f, zipfile.ZIP_STORED) 1497 1498 @skipUnless(zlib, "requires zlib") 1499 def test_read_deflated(self): 1500 for f in (TESTFN2, TemporaryFile(), StringIO()): 1501 self.read_test(f, zipfile.ZIP_DEFLATED) 1502 1503 @skipUnless(zlib, "requires zlib") 1504 def test_readline_read_deflated(self): 1505 # Issue #7610: calls to readline() interleaved with calls to read(). 1506 for f in (TESTFN2, TemporaryFile(), StringIO()): 1507 self.readline_read_test(f, zipfile.ZIP_DEFLATED) 1508 1509 @skipUnless(zlib, "requires zlib") 1510 def test_readline_deflated(self): 1511 for f in (TESTFN2, TemporaryFile(), StringIO()): 1512 self.readline_test(f, zipfile.ZIP_DEFLATED) 1513 1514 @skipUnless(zlib, "requires zlib") 1515 def test_readlines_deflated(self): 1516 for f in (TESTFN2, TemporaryFile(), StringIO()): 1517 self.readlines_test(f, zipfile.ZIP_DEFLATED) 1518 1519 @skipUnless(zlib, "requires zlib") 1520 def test_iterlines_deflated(self): 1521 for f in (TESTFN2, TemporaryFile(), StringIO()): 1522 self.iterlines_test(f, zipfile.ZIP_DEFLATED) 1523 1524 def tearDown(self): 1525 for sep, fn in self.arcfiles.items(): 1526 os.remove(fn) 1527 unlink(TESTFN) 1528 unlink(TESTFN2) 1529 1530 1531 def test_main(): 1532 run_unittest(TestsWithSourceFile, TestZip64InSmallFiles, OtherTests, 1533 PyZipFileTests, DecryptionTests, TestsWithMultipleOpens, 1534 TestWithDirectory, UniversalNewlineTests, 1535 TestsWithRandomBinaryFiles) 1536 1537 if __name__ == "__main__": 1538 test_main() 1539