1 import sys 2 import os 3 import io 4 from hashlib import md5 5 from contextlib import contextmanager 6 from random import Random 7 8 import unittest 9 import unittest.mock 10 import tarfile 11 12 from test import support 13 from test.support import script_helper 14 15 # Check for our compression modules. 16 try: 17 import gzip 18 except ImportError: 19 gzip = None 20 try: 21 import bz2 22 except ImportError: 23 bz2 = None 24 try: 25 import lzma 26 except ImportError: 27 lzma = None 28 29 def md5sum(data): 30 return md5(data).hexdigest() 31 32 TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir" 33 tarextdir = TEMPDIR + '-extract-test' 34 tarname = support.findfile("testtar.tar") 35 gzipname = os.path.join(TEMPDIR, "testtar.tar.gz") 36 bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2") 37 xzname = os.path.join(TEMPDIR, "testtar.tar.xz") 38 tmpname = os.path.join(TEMPDIR, "tmp.tar") 39 dotlessname = os.path.join(TEMPDIR, "testtar") 40 41 md5_regtype = "65f477c818ad9e15f7feab0c6d37742f" 42 md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6" 43 44 45 class TarTest: 46 tarname = tarname 47 suffix = '' 48 open = io.FileIO 49 taropen = tarfile.TarFile.taropen 50 51 @property 52 def mode(self): 53 return self.prefix + self.suffix 54 55 @support.requires_gzip 56 class GzipTest: 57 tarname = gzipname 58 suffix = 'gz' 59 open = gzip.GzipFile if gzip else None 60 taropen = tarfile.TarFile.gzopen 61 62 @support.requires_bz2 63 class Bz2Test: 64 tarname = bz2name 65 suffix = 'bz2' 66 open = bz2.BZ2File if bz2 else None 67 taropen = tarfile.TarFile.bz2open 68 69 @support.requires_lzma 70 class LzmaTest: 71 tarname = xzname 72 suffix = 'xz' 73 open = lzma.LZMAFile if lzma else None 74 taropen = tarfile.TarFile.xzopen 75 76 77 class ReadTest(TarTest): 78 79 prefix = "r:" 80 81 def setUp(self): 82 self.tar = tarfile.open(self.tarname, mode=self.mode, 83 encoding="iso8859-1") 84 85 def tearDown(self): 86 self.tar.close() 87 88 89 class UstarReadTest(ReadTest, unittest.TestCase): 90 91 
def test_fileobj_regular_file(self): 92 tarinfo = self.tar.getmember("ustar/regtype") 93 with self.tar.extractfile(tarinfo) as fobj: 94 data = fobj.read() 95 self.assertEqual(len(data), tarinfo.size, 96 "regular file extraction failed") 97 self.assertEqual(md5sum(data), md5_regtype, 98 "regular file extraction failed") 99 100 def test_fileobj_readlines(self): 101 self.tar.extract("ustar/regtype", TEMPDIR) 102 tarinfo = self.tar.getmember("ustar/regtype") 103 with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1: 104 lines1 = fobj1.readlines() 105 106 with self.tar.extractfile(tarinfo) as fobj: 107 fobj2 = io.TextIOWrapper(fobj) 108 lines2 = fobj2.readlines() 109 self.assertEqual(lines1, lines2, 110 "fileobj.readlines() failed") 111 self.assertEqual(len(lines2), 114, 112 "fileobj.readlines() failed") 113 self.assertEqual(lines2[83], 114 "I will gladly admit that Python is not the fastest " 115 "running scripting language.\n", 116 "fileobj.readlines() failed") 117 118 def test_fileobj_iter(self): 119 self.tar.extract("ustar/regtype", TEMPDIR) 120 tarinfo = self.tar.getmember("ustar/regtype") 121 with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1: 122 lines1 = fobj1.readlines() 123 with self.tar.extractfile(tarinfo) as fobj2: 124 lines2 = list(io.TextIOWrapper(fobj2)) 125 self.assertEqual(lines1, lines2, 126 "fileobj.__iter__() failed") 127 128 def test_fileobj_seek(self): 129 self.tar.extract("ustar/regtype", TEMPDIR) 130 with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj: 131 data = fobj.read() 132 133 tarinfo = self.tar.getmember("ustar/regtype") 134 fobj = self.tar.extractfile(tarinfo) 135 136 text = fobj.read() 137 fobj.seek(0) 138 self.assertEqual(0, fobj.tell(), 139 "seek() to file's start failed") 140 fobj.seek(2048, 0) 141 self.assertEqual(2048, fobj.tell(), 142 "seek() to absolute position failed") 143 fobj.seek(-1024, 1) 144 self.assertEqual(1024, fobj.tell(), 145 "seek() to negative relative position failed") 146 
fobj.seek(1024, 1) 147 self.assertEqual(2048, fobj.tell(), 148 "seek() to positive relative position failed") 149 s = fobj.read(10) 150 self.assertEqual(s, data[2048:2058], 151 "read() after seek failed") 152 fobj.seek(0, 2) 153 self.assertEqual(tarinfo.size, fobj.tell(), 154 "seek() to file's end failed") 155 self.assertEqual(fobj.read(), b"", 156 "read() at file's end did not return empty string") 157 fobj.seek(-tarinfo.size, 2) 158 self.assertEqual(0, fobj.tell(), 159 "relative seek() to file's end failed") 160 fobj.seek(512) 161 s1 = fobj.readlines() 162 fobj.seek(512) 163 s2 = fobj.readlines() 164 self.assertEqual(s1, s2, 165 "readlines() after seek failed") 166 fobj.seek(0) 167 self.assertEqual(len(fobj.readline()), fobj.tell(), 168 "tell() after readline() failed") 169 fobj.seek(512) 170 self.assertEqual(len(fobj.readline()) + 512, fobj.tell(), 171 "tell() after seek() and readline() failed") 172 fobj.seek(0) 173 line = fobj.readline() 174 self.assertEqual(fobj.read(), data[len(line):], 175 "read() after readline() failed") 176 fobj.close() 177 178 def test_fileobj_text(self): 179 with self.tar.extractfile("ustar/regtype") as fobj: 180 fobj = io.TextIOWrapper(fobj) 181 data = fobj.read().encode("iso8859-1") 182 self.assertEqual(md5sum(data), md5_regtype) 183 try: 184 fobj.seek(100) 185 except AttributeError: 186 # Issue #13815: seek() complained about a missing 187 # flush() method. 188 self.fail("seeking failed in text mode") 189 190 # Test if symbolic and hard links are resolved by extractfile(). The 191 # test link members each point to a regular member whose data is 192 # supposed to be exported. 
193 def _test_fileobj_link(self, lnktype, regtype): 194 with self.tar.extractfile(lnktype) as a, \ 195 self.tar.extractfile(regtype) as b: 196 self.assertEqual(a.name, b.name) 197 198 def test_fileobj_link1(self): 199 self._test_fileobj_link("ustar/lnktype", "ustar/regtype") 200 201 def test_fileobj_link2(self): 202 self._test_fileobj_link("./ustar/linktest2/lnktype", 203 "ustar/linktest1/regtype") 204 205 def test_fileobj_symlink1(self): 206 self._test_fileobj_link("ustar/symtype", "ustar/regtype") 207 208 def test_fileobj_symlink2(self): 209 self._test_fileobj_link("./ustar/linktest2/symtype", 210 "ustar/linktest1/regtype") 211 212 def test_issue14160(self): 213 self._test_fileobj_link("symtype2", "ustar/regtype") 214 215 class GzipUstarReadTest(GzipTest, UstarReadTest): 216 pass 217 218 class Bz2UstarReadTest(Bz2Test, UstarReadTest): 219 pass 220 221 class LzmaUstarReadTest(LzmaTest, UstarReadTest): 222 pass 223 224 225 class ListTest(ReadTest, unittest.TestCase): 226 227 # Override setUp to use default encoding (UTF-8) 228 def setUp(self): 229 self.tar = tarfile.open(self.tarname, mode=self.mode) 230 231 def test_list(self): 232 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 233 with support.swap_attr(sys, 'stdout', tio): 234 self.tar.list(verbose=False) 235 out = tio.detach().getvalue() 236 self.assertIn(b'ustar/conttype', out) 237 self.assertIn(b'ustar/regtype', out) 238 self.assertIn(b'ustar/lnktype', out) 239 self.assertIn(b'ustar' + (b'/12345' * 40) + b'67/longname', out) 240 self.assertIn(b'./ustar/linktest2/symtype', out) 241 self.assertIn(b'./ustar/linktest2/lnktype', out) 242 # Make sure it puts trailing slash for directory 243 self.assertIn(b'ustar/dirtype/', out) 244 self.assertIn(b'ustar/dirtype-with-size/', out) 245 # Make sure it is able to print unencodable characters 246 def conv(b): 247 s = b.decode(self.tar.encoding, 'surrogateescape') 248 return s.encode('ascii', 'backslashreplace') 249 
self.assertIn(conv(b'ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 250 self.assertIn(conv(b'misc/regtype-hpux-signed-chksum-' 251 b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 252 self.assertIn(conv(b'misc/regtype-old-v7-signed-chksum-' 253 b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 254 self.assertIn(conv(b'pax/bad-pax-\xe4\xf6\xfc'), out) 255 self.assertIn(conv(b'pax/hdrcharset-\xe4\xf6\xfc'), out) 256 # Make sure it prints files separated by one newline without any 257 # 'ls -l'-like accessories if verbose flag is not being used 258 # ... 259 # ustar/conttype 260 # ustar/regtype 261 # ... 262 self.assertRegex(out, br'ustar/conttype ?\r?\n' 263 br'ustar/regtype ?\r?\n') 264 # Make sure it does not print the source of link without verbose flag 265 self.assertNotIn(b'link to', out) 266 self.assertNotIn(b'->', out) 267 268 def test_list_verbose(self): 269 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 270 with support.swap_attr(sys, 'stdout', tio): 271 self.tar.list(verbose=True) 272 out = tio.detach().getvalue() 273 # Make sure it prints files separated by one newline with 'ls -l'-like 274 # accessories if verbose flag is being used 275 # ... 276 # ?rw-r--r-- tarfile/tarfile 7011 2003-01-06 07:19:43 ustar/conttype 277 # ?rw-r--r-- tarfile/tarfile 7011 2003-01-06 07:19:43 ustar/regtype 278 # ... 
279 self.assertRegex(out, (br'\?rw-r--r-- tarfile/tarfile\s+7011 ' 280 br'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d ' 281 br'ustar/\w+type ?\r?\n') * 2) 282 # Make sure it prints the source of link with verbose flag 283 self.assertIn(b'ustar/symtype -> regtype', out) 284 self.assertIn(b'./ustar/linktest2/symtype -> ../linktest1/regtype', out) 285 self.assertIn(b'./ustar/linktest2/lnktype link to ' 286 b'./ustar/linktest1/regtype', out) 287 self.assertIn(b'gnu' + (b'/123' * 125) + b'/longlink link to gnu' + 288 (b'/123' * 125) + b'/longname', out) 289 self.assertIn(b'pax' + (b'/123' * 125) + b'/longlink link to pax' + 290 (b'/123' * 125) + b'/longname', out) 291 292 def test_list_members(self): 293 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 294 def members(tar): 295 for tarinfo in tar.getmembers(): 296 if 'reg' in tarinfo.name: 297 yield tarinfo 298 with support.swap_attr(sys, 'stdout', tio): 299 self.tar.list(verbose=False, members=members(self.tar)) 300 out = tio.detach().getvalue() 301 self.assertIn(b'ustar/regtype', out) 302 self.assertNotIn(b'ustar/conttype', out) 303 304 305 class GzipListTest(GzipTest, ListTest): 306 pass 307 308 309 class Bz2ListTest(Bz2Test, ListTest): 310 pass 311 312 313 class LzmaListTest(LzmaTest, ListTest): 314 pass 315 316 317 class CommonReadTest(ReadTest): 318 319 def test_empty_tarfile(self): 320 # Test for issue6123: Allow opening empty archives. 321 # This test checks if tarfile.open() is able to open an empty tar 322 # archive successfully. Note that an empty tar archive is not the 323 # same as an empty file! 
324 with tarfile.open(tmpname, self.mode.replace("r", "w")): 325 pass 326 try: 327 tar = tarfile.open(tmpname, self.mode) 328 tar.getnames() 329 except tarfile.ReadError: 330 self.fail("tarfile.open() failed on empty archive") 331 else: 332 self.assertListEqual(tar.getmembers(), []) 333 finally: 334 tar.close() 335 336 def test_non_existent_tarfile(self): 337 # Test for issue11513: prevent non-existent gzipped tarfiles raising 338 # multiple exceptions. 339 with self.assertRaisesRegex(FileNotFoundError, "xxx"): 340 tarfile.open("xxx", self.mode) 341 342 def test_null_tarfile(self): 343 # Test for issue6123: Allow opening empty archives. 344 # This test guarantees that tarfile.open() does not treat an empty 345 # file as an empty tar archive. 346 with open(tmpname, "wb"): 347 pass 348 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode) 349 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname) 350 351 def test_ignore_zeros(self): 352 # Test TarFile's ignore_zeros option. 353 # generate 512 pseudorandom bytes 354 data = Random(0).getrandbits(512*8).to_bytes(512, 'big') 355 for char in (b'\0', b'a'): 356 # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a') 357 # are ignored correctly. 
358 with self.open(tmpname, "w") as fobj: 359 fobj.write(char * 1024) 360 tarinfo = tarfile.TarInfo("foo") 361 tarinfo.size = len(data) 362 fobj.write(tarinfo.tobuf()) 363 fobj.write(data) 364 365 tar = tarfile.open(tmpname, mode="r", ignore_zeros=True) 366 try: 367 self.assertListEqual(tar.getnames(), ["foo"], 368 "ignore_zeros=True should have skipped the %r-blocks" % 369 char) 370 finally: 371 tar.close() 372 373 def test_premature_end_of_archive(self): 374 for size in (512, 600, 1024, 1200): 375 with tarfile.open(tmpname, "w:") as tar: 376 t = tarfile.TarInfo("foo") 377 t.size = 1024 378 tar.addfile(t, io.BytesIO(b"a" * 1024)) 379 380 with open(tmpname, "r+b") as fobj: 381 fobj.truncate(size) 382 383 with tarfile.open(tmpname) as tar: 384 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 385 for t in tar: 386 pass 387 388 with tarfile.open(tmpname) as tar: 389 t = tar.next() 390 391 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 392 tar.extract(t, TEMPDIR) 393 394 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 395 tar.extractfile(t).read() 396 397 class MiscReadTestBase(CommonReadTest): 398 def requires_name_attribute(self): 399 pass 400 401 def test_no_name_argument(self): 402 self.requires_name_attribute() 403 with open(self.tarname, "rb") as fobj: 404 self.assertIsInstance(fobj.name, str) 405 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 406 self.assertIsInstance(tar.name, str) 407 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 408 409 def test_no_name_attribute(self): 410 with open(self.tarname, "rb") as fobj: 411 data = fobj.read() 412 fobj = io.BytesIO(data) 413 self.assertRaises(AttributeError, getattr, fobj, "name") 414 tar = tarfile.open(fileobj=fobj, mode=self.mode) 415 self.assertIsNone(tar.name) 416 417 def test_empty_name_attribute(self): 418 with open(self.tarname, "rb") as fobj: 419 data = fobj.read() 420 fobj = io.BytesIO(data) 421 fobj.name = "" 422 
with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 423 self.assertIsNone(tar.name) 424 425 def test_int_name_attribute(self): 426 # Issue 21044: tarfile.open() should handle fileobj with an integer 427 # 'name' attribute. 428 fd = os.open(self.tarname, os.O_RDONLY) 429 with open(fd, 'rb') as fobj: 430 self.assertIsInstance(fobj.name, int) 431 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 432 self.assertIsNone(tar.name) 433 434 def test_bytes_name_attribute(self): 435 self.requires_name_attribute() 436 tarname = os.fsencode(self.tarname) 437 with open(tarname, 'rb') as fobj: 438 self.assertIsInstance(fobj.name, bytes) 439 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 440 self.assertIsInstance(tar.name, bytes) 441 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 442 443 def test_illegal_mode_arg(self): 444 with open(tmpname, 'wb'): 445 pass 446 with self.assertRaisesRegex(ValueError, 'mode must be '): 447 tar = self.taropen(tmpname, 'q') 448 with self.assertRaisesRegex(ValueError, 'mode must be '): 449 tar = self.taropen(tmpname, 'rw') 450 with self.assertRaisesRegex(ValueError, 'mode must be '): 451 tar = self.taropen(tmpname, '') 452 453 def test_fileobj_with_offset(self): 454 # Skip the first member and store values from the second member 455 # of the testtar. 456 tar = tarfile.open(self.tarname, mode=self.mode) 457 try: 458 tar.next() 459 t = tar.next() 460 name = t.name 461 offset = t.offset 462 with tar.extractfile(t) as f: 463 data = f.read() 464 finally: 465 tar.close() 466 467 # Open the testtar and seek to the offset of the second member. 468 with self.open(self.tarname) as fobj: 469 fobj.seek(offset) 470 471 # Test if the tarfile starts with the second member. 472 tar = tar.open(self.tarname, mode="r:", fileobj=fobj) 473 t = tar.next() 474 self.assertEqual(t.name, name) 475 # Read to the end of fileobj and test if seeking back to the 476 # beginning works. 
477 tar.getmembers() 478 self.assertEqual(tar.extractfile(t).read(), data, 479 "seek back did not work") 480 tar.close() 481 482 def test_fail_comp(self): 483 # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file. 484 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode) 485 with open(tarname, "rb") as fobj: 486 self.assertRaises(tarfile.ReadError, tarfile.open, 487 fileobj=fobj, mode=self.mode) 488 489 def test_v7_dirtype(self): 490 # Test old style dirtype member (bug #1336623): 491 # Old V7 tars create directory members using an AREGTYPE 492 # header with a "/" appended to the filename field. 493 tarinfo = self.tar.getmember("misc/dirtype-old-v7") 494 self.assertEqual(tarinfo.type, tarfile.DIRTYPE, 495 "v7 dirtype failed") 496 497 def test_xstar_type(self): 498 # The xstar format stores extra atime and ctime fields inside the 499 # space reserved for the prefix field. The prefix field must be 500 # ignored in this case, otherwise it will mess up the name. 501 try: 502 self.tar.getmember("misc/regtype-xstar") 503 except KeyError: 504 self.fail("failed to find misc/regtype-xstar (mangled prefix?)") 505 506 def test_check_members(self): 507 for tarinfo in self.tar: 508 self.assertEqual(int(tarinfo.mtime), 0o7606136617, 509 "wrong mtime for %s" % tarinfo.name) 510 if not tarinfo.name.startswith("ustar/"): 511 continue 512 self.assertEqual(tarinfo.uname, "tarfile", 513 "wrong uname for %s" % tarinfo.name) 514 515 def test_find_members(self): 516 self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof", 517 "could not find all members") 518 519 @unittest.skipUnless(hasattr(os, "link"), 520 "Missing hardlink implementation") 521 @support.skip_unless_symlink 522 def test_extract_hardlink(self): 523 # Test hardlink extraction (e.g. bug #857297). 
524 with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar: 525 tar.extract("ustar/regtype", TEMPDIR) 526 self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/regtype")) 527 528 tar.extract("ustar/lnktype", TEMPDIR) 529 self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/lnktype")) 530 with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f: 531 data = f.read() 532 self.assertEqual(md5sum(data), md5_regtype) 533 534 tar.extract("ustar/symtype", TEMPDIR) 535 self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/symtype")) 536 with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f: 537 data = f.read() 538 self.assertEqual(md5sum(data), md5_regtype) 539 540 def test_extractall(self): 541 # Test if extractall() correctly restores directory permissions 542 # and times (see issue1735). 543 tar = tarfile.open(tarname, encoding="iso8859-1") 544 DIR = os.path.join(TEMPDIR, "extractall") 545 os.mkdir(DIR) 546 try: 547 directories = [t for t in tar if t.isdir()] 548 tar.extractall(DIR, directories) 549 for tarinfo in directories: 550 path = os.path.join(DIR, tarinfo.name) 551 if sys.platform != "win32": 552 # Win32 has no support for fine grained permissions. 
553 self.assertEqual(tarinfo.mode & 0o777, 554 os.stat(path).st_mode & 0o777) 555 def format_mtime(mtime): 556 if isinstance(mtime, float): 557 return "{} ({})".format(mtime, mtime.hex()) 558 else: 559 return "{!r} (int)".format(mtime) 560 file_mtime = os.path.getmtime(path) 561 errmsg = "tar mtime {0} != file time {1} of path {2!a}".format( 562 format_mtime(tarinfo.mtime), 563 format_mtime(file_mtime), 564 path) 565 self.assertEqual(tarinfo.mtime, file_mtime, errmsg) 566 finally: 567 tar.close() 568 support.rmtree(DIR) 569 570 def test_extract_directory(self): 571 dirtype = "ustar/dirtype" 572 DIR = os.path.join(TEMPDIR, "extractdir") 573 os.mkdir(DIR) 574 try: 575 with tarfile.open(tarname, encoding="iso8859-1") as tar: 576 tarinfo = tar.getmember(dirtype) 577 tar.extract(tarinfo, path=DIR) 578 extracted = os.path.join(DIR, dirtype) 579 self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime) 580 if sys.platform != "win32": 581 self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755) 582 finally: 583 support.rmtree(DIR) 584 585 def test_init_close_fobj(self): 586 # Issue #7341: Close the internal file object in the TarFile 587 # constructor in case of an error. For the test we rely on 588 # the fact that opening an empty file raises a ReadError. 589 empty = os.path.join(TEMPDIR, "empty") 590 with open(empty, "wb") as fobj: 591 fobj.write(b"") 592 593 try: 594 tar = object.__new__(tarfile.TarFile) 595 try: 596 tar.__init__(empty) 597 except tarfile.ReadError: 598 self.assertTrue(tar.fileobj.closed) 599 else: 600 self.fail("ReadError not raised") 601 finally: 602 support.unlink(empty) 603 604 def test_parallel_iteration(self): 605 # Issue #16601: Restarting iteration over tarfile continued 606 # from where it left off. 
607 with tarfile.open(self.tarname) as tar: 608 for m1, m2 in zip(tar, tar): 609 self.assertEqual(m1.offset, m2.offset) 610 self.assertEqual(m1.get_info(), m2.get_info()) 611 612 class MiscReadTest(MiscReadTestBase, unittest.TestCase): 613 test_fail_comp = None 614 615 class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase): 616 pass 617 618 class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase): 619 def requires_name_attribute(self): 620 self.skipTest("BZ2File have no name attribute") 621 622 class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase): 623 def requires_name_attribute(self): 624 self.skipTest("LZMAFile have no name attribute") 625 626 627 class StreamReadTest(CommonReadTest, unittest.TestCase): 628 629 prefix="r|" 630 631 def test_read_through(self): 632 # Issue #11224: A poorly designed _FileInFile.read() method 633 # caused seeking errors with stream tar files. 634 for tarinfo in self.tar: 635 if not tarinfo.isreg(): 636 continue 637 with self.tar.extractfile(tarinfo) as fobj: 638 while True: 639 try: 640 buf = fobj.read(512) 641 except tarfile.StreamError: 642 self.fail("simple read-through using " 643 "TarFile.extractfile() failed") 644 if not buf: 645 break 646 647 def test_fileobj_regular_file(self): 648 tarinfo = self.tar.next() # get "regtype" (can't use getmember) 649 with self.tar.extractfile(tarinfo) as fobj: 650 data = fobj.read() 651 self.assertEqual(len(data), tarinfo.size, 652 "regular file extraction failed") 653 self.assertEqual(md5sum(data), md5_regtype, 654 "regular file extraction failed") 655 656 def test_provoke_stream_error(self): 657 tarinfos = self.tar.getmembers() 658 with self.tar.extractfile(tarinfos[0]) as f: # read the first member 659 self.assertRaises(tarfile.StreamError, f.read) 660 661 def test_compare_members(self): 662 tar1 = tarfile.open(tarname, encoding="iso8859-1") 663 try: 664 tar2 = self.tar 665 666 while True: 667 t1 = tar1.next() 668 t2 = tar2.next() 669 if t1 is None: 
670 break 671 self.assertIsNotNone(t2, "stream.next() failed.") 672 673 if t2.islnk() or t2.issym(): 674 with self.assertRaises(tarfile.StreamError): 675 tar2.extractfile(t2) 676 continue 677 678 v1 = tar1.extractfile(t1) 679 v2 = tar2.extractfile(t2) 680 if v1 is None: 681 continue 682 self.assertIsNotNone(v2, "stream.extractfile() failed") 683 self.assertEqual(v1.read(), v2.read(), 684 "stream extraction failed") 685 finally: 686 tar1.close() 687 688 class GzipStreamReadTest(GzipTest, StreamReadTest): 689 pass 690 691 class Bz2StreamReadTest(Bz2Test, StreamReadTest): 692 pass 693 694 class LzmaStreamReadTest(LzmaTest, StreamReadTest): 695 pass 696 697 698 class DetectReadTest(TarTest, unittest.TestCase): 699 def _testfunc_file(self, name, mode): 700 try: 701 tar = tarfile.open(name, mode) 702 except tarfile.ReadError as e: 703 self.fail() 704 else: 705 tar.close() 706 707 def _testfunc_fileobj(self, name, mode): 708 try: 709 with open(name, "rb") as f: 710 tar = tarfile.open(name, mode, fileobj=f) 711 except tarfile.ReadError as e: 712 self.fail() 713 else: 714 tar.close() 715 716 def _test_modes(self, testfunc): 717 if self.suffix: 718 with self.assertRaises(tarfile.ReadError): 719 tarfile.open(tarname, mode="r:" + self.suffix) 720 with self.assertRaises(tarfile.ReadError): 721 tarfile.open(tarname, mode="r|" + self.suffix) 722 with self.assertRaises(tarfile.ReadError): 723 tarfile.open(self.tarname, mode="r:") 724 with self.assertRaises(tarfile.ReadError): 725 tarfile.open(self.tarname, mode="r|") 726 testfunc(self.tarname, "r") 727 testfunc(self.tarname, "r:" + self.suffix) 728 testfunc(self.tarname, "r:*") 729 testfunc(self.tarname, "r|" + self.suffix) 730 testfunc(self.tarname, "r|*") 731 732 def test_detect_file(self): 733 self._test_modes(self._testfunc_file) 734 735 def test_detect_fileobj(self): 736 self._test_modes(self._testfunc_fileobj) 737 738 class GzipDetectReadTest(GzipTest, DetectReadTest): 739 pass 740 741 class Bz2DetectReadTest(Bz2Test, 
DetectReadTest): 742 def test_detect_stream_bz2(self): 743 # Originally, tarfile's stream detection looked for the string 744 # "BZh91" at the start of the file. This is incorrect because 745 # the '9' represents the blocksize (900kB). If the file was 746 # compressed using another blocksize autodetection fails. 747 with open(tarname, "rb") as fobj: 748 data = fobj.read() 749 750 # Compress with blocksize 100kB, the file starts with "BZh11". 751 with bz2.BZ2File(tmpname, "wb", compresslevel=1) as fobj: 752 fobj.write(data) 753 754 self._testfunc_file(tmpname, "r|*") 755 756 class LzmaDetectReadTest(LzmaTest, DetectReadTest): 757 pass 758 759 760 class MemberReadTest(ReadTest, unittest.TestCase): 761 762 def _test_member(self, tarinfo, chksum=None, **kwargs): 763 if chksum is not None: 764 with self.tar.extractfile(tarinfo) as f: 765 self.assertEqual(md5sum(f.read()), chksum, 766 "wrong md5sum for %s" % tarinfo.name) 767 768 kwargs["mtime"] = 0o7606136617 769 kwargs["uid"] = 1000 770 kwargs["gid"] = 100 771 if "old-v7" not in tarinfo.name: 772 # V7 tar can't handle alphabetic owners. 
773 kwargs["uname"] = "tarfile" 774 kwargs["gname"] = "tarfile" 775 for k, v in kwargs.items(): 776 self.assertEqual(getattr(tarinfo, k), v, 777 "wrong value in %s field of %s" % (k, tarinfo.name)) 778 779 def test_find_regtype(self): 780 tarinfo = self.tar.getmember("ustar/regtype") 781 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 782 783 def test_find_conttype(self): 784 tarinfo = self.tar.getmember("ustar/conttype") 785 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 786 787 def test_find_dirtype(self): 788 tarinfo = self.tar.getmember("ustar/dirtype") 789 self._test_member(tarinfo, size=0) 790 791 def test_find_dirtype_with_size(self): 792 tarinfo = self.tar.getmember("ustar/dirtype-with-size") 793 self._test_member(tarinfo, size=255) 794 795 def test_find_lnktype(self): 796 tarinfo = self.tar.getmember("ustar/lnktype") 797 self._test_member(tarinfo, size=0, linkname="ustar/regtype") 798 799 def test_find_symtype(self): 800 tarinfo = self.tar.getmember("ustar/symtype") 801 self._test_member(tarinfo, size=0, linkname="regtype") 802 803 def test_find_blktype(self): 804 tarinfo = self.tar.getmember("ustar/blktype") 805 self._test_member(tarinfo, size=0, devmajor=3, devminor=0) 806 807 def test_find_chrtype(self): 808 tarinfo = self.tar.getmember("ustar/chrtype") 809 self._test_member(tarinfo, size=0, devmajor=1, devminor=3) 810 811 def test_find_fifotype(self): 812 tarinfo = self.tar.getmember("ustar/fifotype") 813 self._test_member(tarinfo, size=0) 814 815 def test_find_sparse(self): 816 tarinfo = self.tar.getmember("ustar/sparse") 817 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 818 819 def test_find_gnusparse(self): 820 tarinfo = self.tar.getmember("gnu/sparse") 821 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 822 823 def test_find_gnusparse_00(self): 824 tarinfo = self.tar.getmember("gnu/sparse-0.0") 825 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 826 827 def test_find_gnusparse_01(self): 828 
tarinfo = self.tar.getmember("gnu/sparse-0.1") 829 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 830 831 def test_find_gnusparse_10(self): 832 tarinfo = self.tar.getmember("gnu/sparse-1.0") 833 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 834 835 def test_find_umlauts(self): 836 tarinfo = self.tar.getmember("ustar/umlauts-" 837 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 838 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 839 840 def test_find_ustar_longname(self): 841 name = "ustar/" + "12345/" * 39 + "1234567/longname" 842 self.assertIn(name, self.tar.getnames()) 843 844 def test_find_regtype_oldv7(self): 845 tarinfo = self.tar.getmember("misc/regtype-old-v7") 846 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 847 848 def test_find_pax_umlauts(self): 849 self.tar.close() 850 self.tar = tarfile.open(self.tarname, mode=self.mode, 851 encoding="iso8859-1") 852 tarinfo = self.tar.getmember("pax/umlauts-" 853 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 854 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 855 856 857 class LongnameTest: 858 859 def test_read_longname(self): 860 # Test reading of longname (bug #1471427). 
861 longname = self.subdir + "/" + "123/" * 125 + "longname" 862 try: 863 tarinfo = self.tar.getmember(longname) 864 except KeyError: 865 self.fail("longname not found") 866 self.assertNotEqual(tarinfo.type, tarfile.DIRTYPE, 867 "read longname as dirtype") 868 869 def test_read_longlink(self): 870 longname = self.subdir + "/" + "123/" * 125 + "longname" 871 longlink = self.subdir + "/" + "123/" * 125 + "longlink" 872 try: 873 tarinfo = self.tar.getmember(longlink) 874 except KeyError: 875 self.fail("longlink not found") 876 self.assertEqual(tarinfo.linkname, longname, "linkname wrong") 877 878 def test_truncated_longname(self): 879 longname = self.subdir + "/" + "123/" * 125 + "longname" 880 tarinfo = self.tar.getmember(longname) 881 offset = tarinfo.offset 882 self.tar.fileobj.seek(offset) 883 fobj = io.BytesIO(self.tar.fileobj.read(3 * 512)) 884 with self.assertRaises(tarfile.ReadError): 885 tarfile.open(name="foo.tar", fileobj=fobj) 886 887 def test_header_offset(self): 888 # Test if the start offset of the TarInfo object includes 889 # the preceding extended header. 890 longname = self.subdir + "/" + "123/" * 125 + "longname" 891 offset = self.tar.getmember(longname).offset 892 with open(tarname, "rb") as fobj: 893 fobj.seek(offset) 894 tarinfo = tarfile.TarInfo.frombuf(fobj.read(512), 895 "iso8859-1", "strict") 896 self.assertEqual(tarinfo.type, self.longnametype) 897 898 899 class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase): 900 901 subdir = "gnu" 902 longnametype = tarfile.GNUTYPE_LONGNAME 903 904 # Since 3.2 tarfile is supposed to accurately restore sparse members and 905 # produce files with holes. This is what we actually want to test here. 906 # Unfortunately, not all platforms/filesystems support sparse files, and 907 # even on platforms that do it is non-trivial to make reliable assertions 908 # about holes in files. 
Therefore, we first do one basic test which works 909 # an all platforms, and after that a test that will work only on 910 # platforms/filesystems that prove to support sparse files. 911 def _test_sparse_file(self, name): 912 self.tar.extract(name, TEMPDIR) 913 filename = os.path.join(TEMPDIR, name) 914 with open(filename, "rb") as fobj: 915 data = fobj.read() 916 self.assertEqual(md5sum(data), md5_sparse, 917 "wrong md5sum for %s" % name) 918 919 if self._fs_supports_holes(): 920 s = os.stat(filename) 921 self.assertLess(s.st_blocks * 512, s.st_size) 922 923 def test_sparse_file_old(self): 924 self._test_sparse_file("gnu/sparse") 925 926 def test_sparse_file_00(self): 927 self._test_sparse_file("gnu/sparse-0.0") 928 929 def test_sparse_file_01(self): 930 self._test_sparse_file("gnu/sparse-0.1") 931 932 def test_sparse_file_10(self): 933 self._test_sparse_file("gnu/sparse-1.0") 934 935 @staticmethod 936 def _fs_supports_holes(): 937 # Return True if the platform knows the st_blocks stat attribute and 938 # uses st_blocks units of 512 bytes, and if the filesystem is able to 939 # store holes in files. 940 if sys.platform.startswith("linux"): 941 # Linux evidentially has 512 byte st_blocks units. 
942 name = os.path.join(TEMPDIR, "sparse-test") 943 with open(name, "wb") as fobj: 944 fobj.seek(4096) 945 fobj.truncate() 946 s = os.stat(name) 947 support.unlink(name) 948 return s.st_blocks == 0 949 else: 950 return False 951 952 953 class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase): 954 955 subdir = "pax" 956 longnametype = tarfile.XHDTYPE 957 958 def test_pax_global_headers(self): 959 tar = tarfile.open(tarname, encoding="iso8859-1") 960 try: 961 tarinfo = tar.getmember("pax/regtype1") 962 self.assertEqual(tarinfo.uname, "foo") 963 self.assertEqual(tarinfo.gname, "bar") 964 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), 965 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 966 967 tarinfo = tar.getmember("pax/regtype2") 968 self.assertEqual(tarinfo.uname, "") 969 self.assertEqual(tarinfo.gname, "bar") 970 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), 971 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 972 973 tarinfo = tar.getmember("pax/regtype3") 974 self.assertEqual(tarinfo.uname, "tarfile") 975 self.assertEqual(tarinfo.gname, "tarfile") 976 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), 977 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 978 finally: 979 tar.close() 980 981 def test_pax_number_fields(self): 982 # All following number fields are read from the pax header. 983 tar = tarfile.open(tarname, encoding="iso8859-1") 984 try: 985 tarinfo = tar.getmember("pax/regtype4") 986 self.assertEqual(tarinfo.size, 7011) 987 self.assertEqual(tarinfo.uid, 123) 988 self.assertEqual(tarinfo.gid, 123) 989 self.assertEqual(tarinfo.mtime, 1041808783.0) 990 self.assertEqual(type(tarinfo.mtime), float) 991 self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0) 992 self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0) 993 finally: 994 tar.close() 995 996 997 class WriteTestBase(TarTest): 998 # Put all write tests in here that are supposed to be tested 999 # in all possible mode combinations. 
1000 1001 def test_fileobj_no_close(self): 1002 fobj = io.BytesIO() 1003 tar = tarfile.open(fileobj=fobj, mode=self.mode) 1004 tar.addfile(tarfile.TarInfo("foo")) 1005 tar.close() 1006 self.assertFalse(fobj.closed, "external fileobjs must never closed") 1007 # Issue #20238: Incomplete gzip output with mode="w:gz" 1008 data = fobj.getvalue() 1009 del tar 1010 support.gc_collect() 1011 self.assertFalse(fobj.closed) 1012 self.assertEqual(data, fobj.getvalue()) 1013 1014 def test_eof_marker(self): 1015 # Make sure an end of archive marker is written (two zero blocks). 1016 # tarfile insists on aligning archives to a 20 * 512 byte recordsize. 1017 # So, we create an archive that has exactly 10240 bytes without the 1018 # marker, and has 20480 bytes once the marker is written. 1019 with tarfile.open(tmpname, self.mode) as tar: 1020 t = tarfile.TarInfo("foo") 1021 t.size = tarfile.RECORDSIZE - tarfile.BLOCKSIZE 1022 tar.addfile(t, io.BytesIO(b"a" * t.size)) 1023 1024 with self.open(tmpname, "rb") as fobj: 1025 self.assertEqual(len(fobj.read()), tarfile.RECORDSIZE * 2) 1026 1027 1028 class WriteTest(WriteTestBase, unittest.TestCase): 1029 1030 prefix = "w:" 1031 1032 def test_100_char_name(self): 1033 # The name field in a tar header stores strings of at most 100 chars. 1034 # If a string is shorter than 100 chars it has to be padded with '\0', 1035 # which implies that a string of exactly 100 chars is stored without 1036 # a trailing '\0'. 1037 name = "0123456789" * 10 1038 tar = tarfile.open(tmpname, self.mode) 1039 try: 1040 t = tarfile.TarInfo(name) 1041 tar.addfile(t) 1042 finally: 1043 tar.close() 1044 1045 tar = tarfile.open(tmpname) 1046 try: 1047 self.assertEqual(tar.getnames()[0], name, 1048 "failed to store 100 char filename") 1049 finally: 1050 tar.close() 1051 1052 def test_tar_size(self): 1053 # Test for bug #1013882. 
1054 tar = tarfile.open(tmpname, self.mode) 1055 try: 1056 path = os.path.join(TEMPDIR, "file") 1057 with open(path, "wb") as fobj: 1058 fobj.write(b"aaa") 1059 tar.add(path) 1060 finally: 1061 tar.close() 1062 self.assertGreater(os.path.getsize(tmpname), 0, 1063 "tarfile is empty") 1064 1065 # The test_*_size tests test for bug #1167128. 1066 def test_file_size(self): 1067 tar = tarfile.open(tmpname, self.mode) 1068 try: 1069 path = os.path.join(TEMPDIR, "file") 1070 with open(path, "wb"): 1071 pass 1072 tarinfo = tar.gettarinfo(path) 1073 self.assertEqual(tarinfo.size, 0) 1074 1075 with open(path, "wb") as fobj: 1076 fobj.write(b"aaa") 1077 tarinfo = tar.gettarinfo(path) 1078 self.assertEqual(tarinfo.size, 3) 1079 finally: 1080 tar.close() 1081 1082 def test_directory_size(self): 1083 path = os.path.join(TEMPDIR, "directory") 1084 os.mkdir(path) 1085 try: 1086 tar = tarfile.open(tmpname, self.mode) 1087 try: 1088 tarinfo = tar.gettarinfo(path) 1089 self.assertEqual(tarinfo.size, 0) 1090 finally: 1091 tar.close() 1092 finally: 1093 support.rmdir(path) 1094 1095 @unittest.skipUnless(hasattr(os, "link"), 1096 "Missing hardlink implementation") 1097 def test_link_size(self): 1098 link = os.path.join(TEMPDIR, "link") 1099 target = os.path.join(TEMPDIR, "link_target") 1100 with open(target, "wb") as fobj: 1101 fobj.write(b"aaa") 1102 os.link(target, link) 1103 try: 1104 tar = tarfile.open(tmpname, self.mode) 1105 try: 1106 # Record the link target in the inodes list. 
1107 tar.gettarinfo(target) 1108 tarinfo = tar.gettarinfo(link) 1109 self.assertEqual(tarinfo.size, 0) 1110 finally: 1111 tar.close() 1112 finally: 1113 support.unlink(target) 1114 support.unlink(link) 1115 1116 @support.skip_unless_symlink 1117 def test_symlink_size(self): 1118 path = os.path.join(TEMPDIR, "symlink") 1119 os.symlink("link_target", path) 1120 try: 1121 tar = tarfile.open(tmpname, self.mode) 1122 try: 1123 tarinfo = tar.gettarinfo(path) 1124 self.assertEqual(tarinfo.size, 0) 1125 finally: 1126 tar.close() 1127 finally: 1128 support.unlink(path) 1129 1130 def test_add_self(self): 1131 # Test for #1257255. 1132 dstname = os.path.abspath(tmpname) 1133 tar = tarfile.open(tmpname, self.mode) 1134 try: 1135 self.assertEqual(tar.name, dstname, 1136 "archive name must be absolute") 1137 tar.add(dstname) 1138 self.assertEqual(tar.getnames(), [], 1139 "added the archive to itself") 1140 1141 with support.change_cwd(TEMPDIR): 1142 tar.add(dstname) 1143 self.assertEqual(tar.getnames(), [], 1144 "added the archive to itself") 1145 finally: 1146 tar.close() 1147 1148 def test_exclude(self): 1149 tempdir = os.path.join(TEMPDIR, "exclude") 1150 os.mkdir(tempdir) 1151 try: 1152 for name in ("foo", "bar", "baz"): 1153 name = os.path.join(tempdir, name) 1154 support.create_empty_file(name) 1155 1156 exclude = os.path.isfile 1157 1158 tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") 1159 try: 1160 with support.check_warnings(("use the filter argument", 1161 DeprecationWarning)): 1162 tar.add(tempdir, arcname="empty_dir", exclude=exclude) 1163 finally: 1164 tar.close() 1165 1166 tar = tarfile.open(tmpname, "r") 1167 try: 1168 self.assertEqual(len(tar.getmembers()), 1) 1169 self.assertEqual(tar.getnames()[0], "empty_dir") 1170 finally: 1171 tar.close() 1172 finally: 1173 support.rmtree(tempdir) 1174 1175 def test_filter(self): 1176 tempdir = os.path.join(TEMPDIR, "filter") 1177 os.mkdir(tempdir) 1178 try: 1179 for name in ("foo", "bar", "baz"): 1180 name = 
os.path.join(tempdir, name) 1181 support.create_empty_file(name) 1182 1183 def filter(tarinfo): 1184 if os.path.basename(tarinfo.name) == "bar": 1185 return 1186 tarinfo.uid = 123 1187 tarinfo.uname = "foo" 1188 return tarinfo 1189 1190 tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") 1191 try: 1192 tar.add(tempdir, arcname="empty_dir", filter=filter) 1193 finally: 1194 tar.close() 1195 1196 # Verify that filter is a keyword-only argument 1197 with self.assertRaises(TypeError): 1198 tar.add(tempdir, "empty_dir", True, None, filter) 1199 1200 tar = tarfile.open(tmpname, "r") 1201 try: 1202 for tarinfo in tar: 1203 self.assertEqual(tarinfo.uid, 123) 1204 self.assertEqual(tarinfo.uname, "foo") 1205 self.assertEqual(len(tar.getmembers()), 3) 1206 finally: 1207 tar.close() 1208 finally: 1209 support.rmtree(tempdir) 1210 1211 # Guarantee that stored pathnames are not modified. Don't 1212 # remove ./ or ../ or double slashes. Still make absolute 1213 # pathnames relative. 1214 # For details see bug #6054. 1215 def _test_pathname(self, path, cmp_path=None, dir=False): 1216 # Create a tarfile with an empty member named path 1217 # and compare the stored name with the original. 
1218 foo = os.path.join(TEMPDIR, "foo") 1219 if not dir: 1220 support.create_empty_file(foo) 1221 else: 1222 os.mkdir(foo) 1223 1224 tar = tarfile.open(tmpname, self.mode) 1225 try: 1226 tar.add(foo, arcname=path) 1227 finally: 1228 tar.close() 1229 1230 tar = tarfile.open(tmpname, "r") 1231 try: 1232 t = tar.next() 1233 finally: 1234 tar.close() 1235 1236 if not dir: 1237 support.unlink(foo) 1238 else: 1239 support.rmdir(foo) 1240 1241 self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/")) 1242 1243 1244 @support.skip_unless_symlink 1245 def test_extractall_symlinks(self): 1246 # Test if extractall works properly when tarfile contains symlinks 1247 tempdir = os.path.join(TEMPDIR, "testsymlinks") 1248 temparchive = os.path.join(TEMPDIR, "testsymlinks.tar") 1249 os.mkdir(tempdir) 1250 try: 1251 source_file = os.path.join(tempdir,'source') 1252 target_file = os.path.join(tempdir,'symlink') 1253 with open(source_file,'w') as f: 1254 f.write('something\n') 1255 os.symlink(source_file, target_file) 1256 tar = tarfile.open(temparchive,'w') 1257 tar.add(source_file) 1258 tar.add(target_file) 1259 tar.close() 1260 # Let's extract it to the location which contains the symlink 1261 tar = tarfile.open(temparchive,'r') 1262 # this should not raise OSError: [Errno 17] File exists 1263 try: 1264 tar.extractall(path=tempdir) 1265 except OSError: 1266 self.fail("extractall failed with symlinked files") 1267 finally: 1268 tar.close() 1269 finally: 1270 support.unlink(temparchive) 1271 support.rmtree(tempdir) 1272 1273 def test_pathnames(self): 1274 self._test_pathname("foo") 1275 self._test_pathname(os.path.join("foo", ".", "bar")) 1276 self._test_pathname(os.path.join("foo", "..", "bar")) 1277 self._test_pathname(os.path.join(".", "foo")) 1278 self._test_pathname(os.path.join(".", "foo", ".")) 1279 self._test_pathname(os.path.join(".", "foo", ".", "bar")) 1280 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 1281 self._test_pathname(os.path.join(".", "foo", 
"..", "bar")) 1282 self._test_pathname(os.path.join("..", "foo")) 1283 self._test_pathname(os.path.join("..", "foo", "..")) 1284 self._test_pathname(os.path.join("..", "foo", ".", "bar")) 1285 self._test_pathname(os.path.join("..", "foo", "..", "bar")) 1286 1287 self._test_pathname("foo" + os.sep + os.sep + "bar") 1288 self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True) 1289 1290 def test_abs_pathnames(self): 1291 if sys.platform == "win32": 1292 self._test_pathname("C:\\foo", "foo") 1293 else: 1294 self._test_pathname("/foo", "foo") 1295 self._test_pathname("///foo", "foo") 1296 1297 def test_cwd(self): 1298 # Test adding the current working directory. 1299 with support.change_cwd(TEMPDIR): 1300 tar = tarfile.open(tmpname, self.mode) 1301 try: 1302 tar.add(".") 1303 finally: 1304 tar.close() 1305 1306 tar = tarfile.open(tmpname, "r") 1307 try: 1308 for t in tar: 1309 if t.name != ".": 1310 self.assertTrue(t.name.startswith("./"), t.name) 1311 finally: 1312 tar.close() 1313 1314 def test_open_nonwritable_fileobj(self): 1315 for exctype in OSError, EOFError, RuntimeError: 1316 class BadFile(io.BytesIO): 1317 first = True 1318 def write(self, data): 1319 if self.first: 1320 self.first = False 1321 raise exctype 1322 1323 f = BadFile() 1324 with self.assertRaises(exctype): 1325 tar = tarfile.open(tmpname, self.mode, fileobj=f, 1326 format=tarfile.PAX_FORMAT, 1327 pax_headers={'non': 'empty'}) 1328 self.assertFalse(f.closed) 1329 1330 class GzipWriteTest(GzipTest, WriteTest): 1331 pass 1332 1333 class Bz2WriteTest(Bz2Test, WriteTest): 1334 pass 1335 1336 class LzmaWriteTest(LzmaTest, WriteTest): 1337 pass 1338 1339 1340 class StreamWriteTest(WriteTestBase, unittest.TestCase): 1341 1342 prefix = "w|" 1343 decompressor = None 1344 1345 def test_stream_padding(self): 1346 # Test for bug #1543303. 
1347 tar = tarfile.open(tmpname, self.mode) 1348 tar.close() 1349 if self.decompressor: 1350 dec = self.decompressor() 1351 with open(tmpname, "rb") as fobj: 1352 data = fobj.read() 1353 data = dec.decompress(data) 1354 self.assertFalse(dec.unused_data, "found trailing data") 1355 else: 1356 with self.open(tmpname) as fobj: 1357 data = fobj.read() 1358 self.assertEqual(data.count(b"\0"), tarfile.RECORDSIZE, 1359 "incorrect zero padding") 1360 1361 @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"), 1362 "Missing umask implementation") 1363 def test_file_mode(self): 1364 # Test for issue #8464: Create files with correct 1365 # permissions. 1366 if os.path.exists(tmpname): 1367 support.unlink(tmpname) 1368 1369 original_umask = os.umask(0o022) 1370 try: 1371 tar = tarfile.open(tmpname, self.mode) 1372 tar.close() 1373 mode = os.stat(tmpname).st_mode & 0o777 1374 self.assertEqual(mode, 0o644, "wrong file permissions") 1375 finally: 1376 os.umask(original_umask) 1377 1378 class GzipStreamWriteTest(GzipTest, StreamWriteTest): 1379 pass 1380 1381 class Bz2StreamWriteTest(Bz2Test, StreamWriteTest): 1382 decompressor = bz2.BZ2Decompressor if bz2 else None 1383 1384 class LzmaStreamWriteTest(LzmaTest, StreamWriteTest): 1385 decompressor = lzma.LZMADecompressor if lzma else None 1386 1387 1388 class GNUWriteTest(unittest.TestCase): 1389 # This testcase checks for correct creation of GNU Longname 1390 # and Longlink extended headers (cp. bug #812325). 
1391 1392 def _length(self, s): 1393 blocks = len(s) // 512 + 1 1394 return blocks * 512 1395 1396 def _calc_size(self, name, link=None): 1397 # Initial tar header 1398 count = 512 1399 1400 if len(name) > tarfile.LENGTH_NAME: 1401 # GNU longname extended header + longname 1402 count += 512 1403 count += self._length(name) 1404 if link is not None and len(link) > tarfile.LENGTH_LINK: 1405 # GNU longlink extended header + longlink 1406 count += 512 1407 count += self._length(link) 1408 return count 1409 1410 def _test(self, name, link=None): 1411 tarinfo = tarfile.TarInfo(name) 1412 if link: 1413 tarinfo.linkname = link 1414 tarinfo.type = tarfile.LNKTYPE 1415 1416 tar = tarfile.open(tmpname, "w") 1417 try: 1418 tar.format = tarfile.GNU_FORMAT 1419 tar.addfile(tarinfo) 1420 1421 v1 = self._calc_size(name, link) 1422 v2 = tar.offset 1423 self.assertEqual(v1, v2, "GNU longname/longlink creation failed") 1424 finally: 1425 tar.close() 1426 1427 tar = tarfile.open(tmpname) 1428 try: 1429 member = tar.next() 1430 self.assertIsNotNone(member, 1431 "unable to read longname member") 1432 self.assertEqual(tarinfo.name, member.name, 1433 "unable to read longname member") 1434 self.assertEqual(tarinfo.linkname, member.linkname, 1435 "unable to read longname member") 1436 finally: 1437 tar.close() 1438 1439 def test_longname_1023(self): 1440 self._test(("longnam/" * 127) + "longnam") 1441 1442 def test_longname_1024(self): 1443 self._test(("longnam/" * 127) + "longname") 1444 1445 def test_longname_1025(self): 1446 self._test(("longnam/" * 127) + "longname_") 1447 1448 def test_longlink_1023(self): 1449 self._test("name", ("longlnk/" * 127) + "longlnk") 1450 1451 def test_longlink_1024(self): 1452 self._test("name", ("longlnk/" * 127) + "longlink") 1453 1454 def test_longlink_1025(self): 1455 self._test("name", ("longlnk/" * 127) + "longlink_") 1456 1457 def test_longnamelink_1023(self): 1458 self._test(("longnam/" * 127) + "longnam", 1459 ("longlnk/" * 127) + "longlnk") 1460 
1461 def test_longnamelink_1024(self): 1462 self._test(("longnam/" * 127) + "longname", 1463 ("longlnk/" * 127) + "longlink") 1464 1465 def test_longnamelink_1025(self): 1466 self._test(("longnam/" * 127) + "longname_", 1467 ("longlnk/" * 127) + "longlink_") 1468 1469 1470 class CreateTest(WriteTestBase, unittest.TestCase): 1471 1472 prefix = "x:" 1473 1474 file_path = os.path.join(TEMPDIR, "spameggs42") 1475 1476 def setUp(self): 1477 support.unlink(tmpname) 1478 1479 @classmethod 1480 def setUpClass(cls): 1481 with open(cls.file_path, "wb") as fobj: 1482 fobj.write(b"aaa") 1483 1484 @classmethod 1485 def tearDownClass(cls): 1486 support.unlink(cls.file_path) 1487 1488 def test_create(self): 1489 with tarfile.open(tmpname, self.mode) as tobj: 1490 tobj.add(self.file_path) 1491 1492 with self.taropen(tmpname) as tobj: 1493 names = tobj.getnames() 1494 self.assertEqual(len(names), 1) 1495 self.assertIn('spameggs42', names[0]) 1496 1497 def test_create_existing(self): 1498 with tarfile.open(tmpname, self.mode) as tobj: 1499 tobj.add(self.file_path) 1500 1501 with self.assertRaises(FileExistsError): 1502 tobj = tarfile.open(tmpname, self.mode) 1503 1504 with self.taropen(tmpname) as tobj: 1505 names = tobj.getnames() 1506 self.assertEqual(len(names), 1) 1507 self.assertIn('spameggs42', names[0]) 1508 1509 def test_create_taropen(self): 1510 with self.taropen(tmpname, "x") as tobj: 1511 tobj.add(self.file_path) 1512 1513 with self.taropen(tmpname) as tobj: 1514 names = tobj.getnames() 1515 self.assertEqual(len(names), 1) 1516 self.assertIn('spameggs42', names[0]) 1517 1518 def test_create_existing_taropen(self): 1519 with self.taropen(tmpname, "x") as tobj: 1520 tobj.add(self.file_path) 1521 1522 with self.assertRaises(FileExistsError): 1523 with self.taropen(tmpname, "x"): 1524 pass 1525 1526 with self.taropen(tmpname) as tobj: 1527 names = tobj.getnames() 1528 self.assertEqual(len(names), 1) 1529 self.assertIn("spameggs42", names[0]) 1530 1531 1532 class 
GzipCreateTest(GzipTest, CreateTest): 1533 pass 1534 1535 1536 class Bz2CreateTest(Bz2Test, CreateTest): 1537 pass 1538 1539 1540 class LzmaCreateTest(LzmaTest, CreateTest): 1541 pass 1542 1543 1544 class CreateWithXModeTest(CreateTest): 1545 1546 prefix = "x" 1547 1548 test_create_taropen = None 1549 test_create_existing_taropen = None 1550 1551 1552 @unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation") 1553 class HardlinkTest(unittest.TestCase): 1554 # Test the creation of LNKTYPE (hardlink) members in an archive. 1555 1556 def setUp(self): 1557 self.foo = os.path.join(TEMPDIR, "foo") 1558 self.bar = os.path.join(TEMPDIR, "bar") 1559 1560 with open(self.foo, "wb") as fobj: 1561 fobj.write(b"foo") 1562 1563 os.link(self.foo, self.bar) 1564 1565 self.tar = tarfile.open(tmpname, "w") 1566 self.tar.add(self.foo) 1567 1568 def tearDown(self): 1569 self.tar.close() 1570 support.unlink(self.foo) 1571 support.unlink(self.bar) 1572 1573 def test_add_twice(self): 1574 # The same name will be added as a REGTYPE every 1575 # time regardless of st_nlink. 1576 tarinfo = self.tar.gettarinfo(self.foo) 1577 self.assertEqual(tarinfo.type, tarfile.REGTYPE, 1578 "add file as regular failed") 1579 1580 def test_add_hardlink(self): 1581 tarinfo = self.tar.gettarinfo(self.bar) 1582 self.assertEqual(tarinfo.type, tarfile.LNKTYPE, 1583 "add file as hardlink failed") 1584 1585 def test_dereference_hardlink(self): 1586 self.tar.dereference = True 1587 tarinfo = self.tar.gettarinfo(self.bar) 1588 self.assertEqual(tarinfo.type, tarfile.REGTYPE, 1589 "dereferencing hardlink failed") 1590 1591 1592 class PaxWriteTest(GNUWriteTest): 1593 1594 def _test(self, name, link=None): 1595 # See GNUWriteTest. 
1596 tarinfo = tarfile.TarInfo(name) 1597 if link: 1598 tarinfo.linkname = link 1599 tarinfo.type = tarfile.LNKTYPE 1600 1601 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) 1602 try: 1603 tar.addfile(tarinfo) 1604 finally: 1605 tar.close() 1606 1607 tar = tarfile.open(tmpname) 1608 try: 1609 if link: 1610 l = tar.getmembers()[0].linkname 1611 self.assertEqual(link, l, "PAX longlink creation failed") 1612 else: 1613 n = tar.getmembers()[0].name 1614 self.assertEqual(name, n, "PAX longname creation failed") 1615 finally: 1616 tar.close() 1617 1618 def test_pax_global_header(self): 1619 pax_headers = { 1620 "foo": "bar", 1621 "uid": "0", 1622 "mtime": "1.23", 1623 "test": "\xe4\xf6\xfc", 1624 "\xe4\xf6\xfc": "test"} 1625 1626 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, 1627 pax_headers=pax_headers) 1628 try: 1629 tar.addfile(tarfile.TarInfo("test")) 1630 finally: 1631 tar.close() 1632 1633 # Test if the global header was written correctly. 1634 tar = tarfile.open(tmpname, encoding="iso8859-1") 1635 try: 1636 self.assertEqual(tar.pax_headers, pax_headers) 1637 self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers) 1638 # Test if all the fields are strings. 1639 for key, val in tar.pax_headers.items(): 1640 self.assertIsNot(type(key), bytes) 1641 self.assertIsNot(type(val), bytes) 1642 if key in tarfile.PAX_NUMBER_FIELDS: 1643 try: 1644 tarfile.PAX_NUMBER_FIELDS[key](val) 1645 except (TypeError, ValueError): 1646 self.fail("unable to convert pax header field") 1647 finally: 1648 tar.close() 1649 1650 def test_pax_extended_header(self): 1651 # The fields from the pax header have priority over the 1652 # TarInfo. 
1653 pax_headers = {"path": "foo", "uid": "123"} 1654 1655 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, 1656 encoding="iso8859-1") 1657 try: 1658 t = tarfile.TarInfo() 1659 t.name = "\xe4\xf6\xfc" # non-ASCII 1660 t.uid = 8**8 # too large 1661 t.pax_headers = pax_headers 1662 tar.addfile(t) 1663 finally: 1664 tar.close() 1665 1666 tar = tarfile.open(tmpname, encoding="iso8859-1") 1667 try: 1668 t = tar.getmembers()[0] 1669 self.assertEqual(t.pax_headers, pax_headers) 1670 self.assertEqual(t.name, "foo") 1671 self.assertEqual(t.uid, 123) 1672 finally: 1673 tar.close() 1674 1675 1676 class UnicodeTest: 1677 1678 def test_iso8859_1_filename(self): 1679 self._test_unicode_filename("iso8859-1") 1680 1681 def test_utf7_filename(self): 1682 self._test_unicode_filename("utf7") 1683 1684 def test_utf8_filename(self): 1685 self._test_unicode_filename("utf-8") 1686 1687 def _test_unicode_filename(self, encoding): 1688 tar = tarfile.open(tmpname, "w", format=self.format, 1689 encoding=encoding, errors="strict") 1690 try: 1691 name = "\xe4\xf6\xfc" 1692 tar.addfile(tarfile.TarInfo(name)) 1693 finally: 1694 tar.close() 1695 1696 tar = tarfile.open(tmpname, encoding=encoding) 1697 try: 1698 self.assertEqual(tar.getmembers()[0].name, name) 1699 finally: 1700 tar.close() 1701 1702 def test_unicode_filename_error(self): 1703 tar = tarfile.open(tmpname, "w", format=self.format, 1704 encoding="ascii", errors="strict") 1705 try: 1706 tarinfo = tarfile.TarInfo() 1707 1708 tarinfo.name = "\xe4\xf6\xfc" 1709 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1710 1711 tarinfo.name = "foo" 1712 tarinfo.uname = "\xe4\xf6\xfc" 1713 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1714 finally: 1715 tar.close() 1716 1717 def test_unicode_argument(self): 1718 tar = tarfile.open(tarname, "r", 1719 encoding="iso8859-1", errors="strict") 1720 try: 1721 for t in tar: 1722 self.assertIs(type(t.name), str) 1723 self.assertIs(type(t.linkname), str) 1724 
self.assertIs(type(t.uname), str) 1725 self.assertIs(type(t.gname), str) 1726 finally: 1727 tar.close() 1728 1729 def test_uname_unicode(self): 1730 t = tarfile.TarInfo("foo") 1731 t.uname = "\xe4\xf6\xfc" 1732 t.gname = "\xe4\xf6\xfc" 1733 1734 tar = tarfile.open(tmpname, mode="w", format=self.format, 1735 encoding="iso8859-1") 1736 try: 1737 tar.addfile(t) 1738 finally: 1739 tar.close() 1740 1741 tar = tarfile.open(tmpname, encoding="iso8859-1") 1742 try: 1743 t = tar.getmember("foo") 1744 self.assertEqual(t.uname, "\xe4\xf6\xfc") 1745 self.assertEqual(t.gname, "\xe4\xf6\xfc") 1746 1747 if self.format != tarfile.PAX_FORMAT: 1748 tar.close() 1749 tar = tarfile.open(tmpname, encoding="ascii") 1750 t = tar.getmember("foo") 1751 self.assertEqual(t.uname, "\udce4\udcf6\udcfc") 1752 self.assertEqual(t.gname, "\udce4\udcf6\udcfc") 1753 finally: 1754 tar.close() 1755 1756 1757 class UstarUnicodeTest(UnicodeTest, unittest.TestCase): 1758 1759 format = tarfile.USTAR_FORMAT 1760 1761 # Test whether the utf-8 encoded version of a filename exceeds the 100 1762 # bytes name field limit (every occurrence of '\xff' will be expanded to 2 1763 # bytes). 1764 def test_unicode_name1(self): 1765 self._test_ustar_name("0123456789" * 10) 1766 self._test_ustar_name("0123456789" * 10 + "0", ValueError) 1767 self._test_ustar_name("0123456789" * 9 + "01234567\xff") 1768 self._test_ustar_name("0123456789" * 9 + "012345678\xff", ValueError) 1769 1770 def test_unicode_name2(self): 1771 self._test_ustar_name("0123456789" * 9 + "012345\xff\xff") 1772 self._test_ustar_name("0123456789" * 9 + "0123456\xff\xff", ValueError) 1773 1774 # Test whether the utf-8 encoded version of a filename exceeds the 155 1775 # bytes prefix + '/' + 100 bytes name limit. 
def test_unicode_longname1(self):
    self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 10)
    self._test_ustar_name("0123456789" * 15 + "0123/4" + "0123456789" * 10,
                          ValueError)
    self._test_ustar_name("0123456789" * 15 + "012\xff/" + "0123456789" * 10)
    self._test_ustar_name("0123456789" * 15 + "0123\xff/" + "0123456789" * 10,
                          ValueError)

def test_unicode_longname2(self):
    self._test_ustar_name("0123456789" * 15 + "01\xff/2" + "0123456789" * 10,
                          ValueError)
    self._test_ustar_name("0123456789" * 15 + "01\xff\xff/" + "0123456789" * 10,
                          ValueError)

def test_unicode_longname3(self):
    self._test_ustar_name("0123456789" * 15 + "01\xff\xff/2" + "0123456789" * 10,
                          ValueError)
    self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9
                          + "01234567\xff")
    self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9
                          + "012345678\xff", ValueError)

def test_unicode_longname4(self):
    self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9
                          + "012345\xff\xff")
    self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9
                          + "0123456\xff\xff", ValueError)

def _test_ustar_name(self, name, exc=None):
    # Try to store a member called *name* in a utf-8 encoded archive of
    # self.format.  With *exc* given, adding the member must raise it and
    # nothing is read back.  Otherwise the archive is reopened and its
    # first member must carry exactly *name*.
    with tarfile.open(tmpname, "w", format=self.format,
                      encoding="utf-8") as tar:
        t = tarfile.TarInfo(name)
        if exc is not None:
            self.assertRaises(exc, tar.addfile, t)
            return
        tar.addfile(t)

    with tarfile.open(tmpname, "r", encoding="utf-8") as tar:
        # Fetch the first member explicitly so that an archive without
        # any member fails the test instead of passing vacuously.
        member = tar.next()
        self.assertIsNotNone(member, "archive contains no member")
        self.assertEqual(name, member.name)

# Test the same as above for the 100 bytes link field.
1810 def test_unicode_link1(self): 1811 self._test_ustar_link("0123456789" * 10) 1812 self._test_ustar_link("0123456789" * 10 + "0", ValueError) 1813 self._test_ustar_link("0123456789" * 9 + "01234567\xff") 1814 self._test_ustar_link("0123456789" * 9 + "012345678\xff", ValueError) 1815 1816 def test_unicode_link2(self): 1817 self._test_ustar_link("0123456789" * 9 + "012345\xff\xff") 1818 self._test_ustar_link("0123456789" * 9 + "0123456\xff\xff", ValueError) 1819 1820 def _test_ustar_link(self, name, exc=None): 1821 with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar: 1822 t = tarfile.TarInfo("foo") 1823 t.linkname = name 1824 if exc is None: 1825 tar.addfile(t) 1826 else: 1827 self.assertRaises(exc, tar.addfile, t) 1828 1829 if exc is None: 1830 with tarfile.open(tmpname, "r", encoding="utf-8") as tar: 1831 for t in tar: 1832 self.assertEqual(name, t.linkname) 1833 break 1834 1835 1836 class GNUUnicodeTest(UnicodeTest, unittest.TestCase): 1837 1838 format = tarfile.GNU_FORMAT 1839 1840 def test_bad_pax_header(self): 1841 # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields 1842 # without a hdrcharset=BINARY header. 1843 for encoding, name in ( 1844 ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"), 1845 ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),): 1846 with tarfile.open(tarname, encoding=encoding, 1847 errors="surrogateescape") as tar: 1848 try: 1849 t = tar.getmember(name) 1850 except KeyError: 1851 self.fail("unable to read bad GNU tar pax header") 1852 1853 1854 class PAXUnicodeTest(UnicodeTest, unittest.TestCase): 1855 1856 format = tarfile.PAX_FORMAT 1857 1858 # PAX_FORMAT ignores encoding in write mode. 1859 test_unicode_filename_error = None 1860 1861 def test_binary_header(self): 1862 # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field. 
1863 for encoding, name in ( 1864 ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"), 1865 ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),): 1866 with tarfile.open(tarname, encoding=encoding, 1867 errors="surrogateescape") as tar: 1868 try: 1869 t = tar.getmember(name) 1870 except KeyError: 1871 self.fail("unable to read POSIX.1-2008 binary header") 1872 1873 1874 class AppendTestBase: 1875 # Test append mode (cp. patch #1652681). 1876 1877 def setUp(self): 1878 self.tarname = tmpname 1879 if os.path.exists(self.tarname): 1880 support.unlink(self.tarname) 1881 1882 def _create_testtar(self, mode="w:"): 1883 with tarfile.open(tarname, encoding="iso8859-1") as src: 1884 t = src.getmember("ustar/regtype") 1885 t.name = "foo" 1886 with src.extractfile(t) as f: 1887 with tarfile.open(self.tarname, mode) as tar: 1888 tar.addfile(t, f) 1889 1890 def test_append_compressed(self): 1891 self._create_testtar("w:" + self.suffix) 1892 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") 1893 1894 class AppendTest(AppendTestBase, unittest.TestCase): 1895 test_append_compressed = None 1896 1897 def _add_testfile(self, fileobj=None): 1898 with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar: 1899 tar.addfile(tarfile.TarInfo("bar")) 1900 1901 def _test(self, names=["bar"], fileobj=None): 1902 with tarfile.open(self.tarname, fileobj=fileobj) as tar: 1903 self.assertEqual(tar.getnames(), names) 1904 1905 def test_non_existing(self): 1906 self._add_testfile() 1907 self._test() 1908 1909 def test_empty(self): 1910 tarfile.open(self.tarname, "w:").close() 1911 self._add_testfile() 1912 self._test() 1913 1914 def test_empty_fileobj(self): 1915 fobj = io.BytesIO(b"\0" * 1024) 1916 self._add_testfile(fobj) 1917 fobj.seek(0) 1918 self._test(fileobj=fobj) 1919 1920 def test_fileobj(self): 1921 self._create_testtar() 1922 with open(self.tarname, "rb") as fobj: 1923 data = fobj.read() 1924 fobj = io.BytesIO(data) 1925 self._add_testfile(fobj) 1926 fobj.seek(0) 1927 
self._test(names=["foo", "bar"], fileobj=fobj) 1928 1929 def test_existing(self): 1930 self._create_testtar() 1931 self._add_testfile() 1932 self._test(names=["foo", "bar"]) 1933 1934 # Append mode is supposed to fail if the tarfile to append to 1935 # does not end with a zero block. 1936 def _test_error(self, data): 1937 with open(self.tarname, "wb") as fobj: 1938 fobj.write(data) 1939 self.assertRaises(tarfile.ReadError, self._add_testfile) 1940 1941 def test_null(self): 1942 self._test_error(b"") 1943 1944 def test_incomplete(self): 1945 self._test_error(b"\0" * 13) 1946 1947 def test_premature_eof(self): 1948 data = tarfile.TarInfo("foo").tobuf() 1949 self._test_error(data) 1950 1951 def test_trailing_garbage(self): 1952 data = tarfile.TarInfo("foo").tobuf() 1953 self._test_error(data + b"\0" * 13) 1954 1955 def test_invalid(self): 1956 self._test_error(b"a" * 512) 1957 1958 class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase): 1959 pass 1960 1961 class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase): 1962 pass 1963 1964 class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase): 1965 pass 1966 1967 1968 class LimitsTest(unittest.TestCase): 1969 1970 def test_ustar_limits(self): 1971 # 100 char name 1972 tarinfo = tarfile.TarInfo("0123456789" * 10) 1973 tarinfo.tobuf(tarfile.USTAR_FORMAT) 1974 1975 # 101 char name that cannot be stored 1976 tarinfo = tarfile.TarInfo("0123456789" * 10 + "0") 1977 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1978 1979 # 256 char name with a slash at pos 156 1980 tarinfo = tarfile.TarInfo("123/" * 62 + "longname") 1981 tarinfo.tobuf(tarfile.USTAR_FORMAT) 1982 1983 # 256 char name that cannot be stored 1984 tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname") 1985 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1986 1987 # 512 char name 1988 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 1989 self.assertRaises(ValueError, tarinfo.tobuf, 
tarfile.USTAR_FORMAT) 1990 1991 # 512 char linkname 1992 tarinfo = tarfile.TarInfo("longlink") 1993 tarinfo.linkname = "123/" * 126 + "longname" 1994 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1995 1996 # uid > 8 digits 1997 tarinfo = tarfile.TarInfo("name") 1998 tarinfo.uid = 0o10000000 1999 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2000 2001 def test_gnu_limits(self): 2002 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2003 tarinfo.tobuf(tarfile.GNU_FORMAT) 2004 2005 tarinfo = tarfile.TarInfo("longlink") 2006 tarinfo.linkname = "123/" * 126 + "longname" 2007 tarinfo.tobuf(tarfile.GNU_FORMAT) 2008 2009 # uid >= 256 ** 7 2010 tarinfo = tarfile.TarInfo("name") 2011 tarinfo.uid = 0o4000000000000000000 2012 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT) 2013 2014 def test_pax_limits(self): 2015 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2016 tarinfo.tobuf(tarfile.PAX_FORMAT) 2017 2018 tarinfo = tarfile.TarInfo("longlink") 2019 tarinfo.linkname = "123/" * 126 + "longname" 2020 tarinfo.tobuf(tarfile.PAX_FORMAT) 2021 2022 tarinfo = tarfile.TarInfo("name") 2023 tarinfo.uid = 0o4000000000000000000 2024 tarinfo.tobuf(tarfile.PAX_FORMAT) 2025 2026 2027 class MiscTest(unittest.TestCase): 2028 2029 def test_char_fields(self): 2030 self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"), 2031 b"foo\0\0\0\0\0") 2032 self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"), 2033 b"foo") 2034 self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"), 2035 "foo") 2036 self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"), 2037 "foo") 2038 2039 def test_read_number_fields(self): 2040 # Issue 13158: Test if GNU tar specific base-256 number fields 2041 # are decoded correctly. 
2042 self.assertEqual(tarfile.nti(b"0000001\x00"), 1) 2043 self.assertEqual(tarfile.nti(b"7777777\x00"), 0o7777777) 2044 self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"), 2045 0o10000000) 2046 self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"), 2047 0xffffffff) 2048 self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"), 2049 -1) 2050 self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"), 2051 -100) 2052 self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"), 2053 -0x100000000000000) 2054 2055 # Issue 24514: Test if empty number fields are converted to zero. 2056 self.assertEqual(tarfile.nti(b"\0"), 0) 2057 self.assertEqual(tarfile.nti(b" \0"), 0) 2058 2059 def test_write_number_fields(self): 2060 self.assertEqual(tarfile.itn(1), b"0000001\x00") 2061 self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00") 2062 self.assertEqual(tarfile.itn(0o10000000), 2063 b"\x80\x00\x00\x00\x00\x20\x00\x00") 2064 self.assertEqual(tarfile.itn(0xffffffff), 2065 b"\x80\x00\x00\x00\xff\xff\xff\xff") 2066 self.assertEqual(tarfile.itn(-1), 2067 b"\xff\xff\xff\xff\xff\xff\xff\xff") 2068 self.assertEqual(tarfile.itn(-100), 2069 b"\xff\xff\xff\xff\xff\xff\xff\x9c") 2070 self.assertEqual(tarfile.itn(-0x100000000000000), 2071 b"\xff\x00\x00\x00\x00\x00\x00\x00") 2072 2073 def test_number_field_limits(self): 2074 with self.assertRaises(ValueError): 2075 tarfile.itn(-1, 8, tarfile.USTAR_FORMAT) 2076 with self.assertRaises(ValueError): 2077 tarfile.itn(0o10000000, 8, tarfile.USTAR_FORMAT) 2078 with self.assertRaises(ValueError): 2079 tarfile.itn(-0x10000000001, 6, tarfile.GNU_FORMAT) 2080 with self.assertRaises(ValueError): 2081 tarfile.itn(0x10000000000, 6, tarfile.GNU_FORMAT) 2082 2083 def test__all__(self): 2084 blacklist = {'version', 'grp', 'pwd', 'symlink_exception', 2085 'NUL', 'BLOCKSIZE', 'RECORDSIZE', 'GNU_MAGIC', 2086 'POSIX_MAGIC', 'LENGTH_NAME', 'LENGTH_LINK', 2087 'LENGTH_PREFIX', 'REGTYPE', 'AREGTYPE', 'LNKTYPE', 
2088 'SYMTYPE', 'CHRTYPE', 'BLKTYPE', 'DIRTYPE', 'FIFOTYPE', 2089 'CONTTYPE', 'GNUTYPE_LONGNAME', 'GNUTYPE_LONGLINK', 2090 'GNUTYPE_SPARSE', 'XHDTYPE', 'XGLTYPE', 'SOLARIS_XHDTYPE', 2091 'SUPPORTED_TYPES', 'REGULAR_TYPES', 'GNU_TYPES', 2092 'PAX_FIELDS', 'PAX_NAME_FIELDS', 'PAX_NUMBER_FIELDS', 2093 'stn', 'nts', 'nti', 'itn', 'calc_chksums', 'copyfileobj', 2094 'filemode', 2095 'EmptyHeaderError', 'TruncatedHeaderError', 2096 'EOFHeaderError', 'InvalidHeaderError', 2097 'SubsequentHeaderError', 'ExFileObject', 2098 'main'} 2099 support.check__all__(self, tarfile, blacklist=blacklist) 2100 2101 2102 class CommandLineTest(unittest.TestCase): 2103 2104 def tarfilecmd(self, *args, **kwargs): 2105 rc, out, err = script_helper.assert_python_ok('-m', 'tarfile', *args, 2106 **kwargs) 2107 return out.replace(os.linesep.encode(), b'\n') 2108 2109 def tarfilecmd_failure(self, *args): 2110 return script_helper.assert_python_failure('-m', 'tarfile', *args) 2111 2112 def make_simple_tarfile(self, tar_name): 2113 files = [support.findfile('tokenize_tests.txt'), 2114 support.findfile('tokenize_tests-no-coding-cookie-' 2115 'and-utf8-bom-sig-only.txt')] 2116 self.addCleanup(support.unlink, tar_name) 2117 with tarfile.open(tar_name, 'w') as tf: 2118 for tardata in files: 2119 tf.add(tardata, arcname=os.path.basename(tardata)) 2120 2121 def test_test_command(self): 2122 for tar_name in testtarnames: 2123 for opt in '-t', '--test': 2124 out = self.tarfilecmd(opt, tar_name) 2125 self.assertEqual(out, b'') 2126 2127 def test_test_command_verbose(self): 2128 for tar_name in testtarnames: 2129 for opt in '-v', '--verbose': 2130 out = self.tarfilecmd(opt, '-t', tar_name) 2131 self.assertIn(b'is a tar archive.\n', out) 2132 2133 def test_test_command_invalid_file(self): 2134 zipname = support.findfile('zipdir.zip') 2135 rc, out, err = self.tarfilecmd_failure('-t', zipname) 2136 self.assertIn(b' is not a tar archive.', err) 2137 self.assertEqual(out, b'') 2138 self.assertEqual(rc, 1) 2139 
2140 for tar_name in testtarnames: 2141 with self.subTest(tar_name=tar_name): 2142 with open(tar_name, 'rb') as f: 2143 data = f.read() 2144 try: 2145 with open(tmpname, 'wb') as f: 2146 f.write(data[:511]) 2147 rc, out, err = self.tarfilecmd_failure('-t', tmpname) 2148 self.assertEqual(out, b'') 2149 self.assertEqual(rc, 1) 2150 finally: 2151 support.unlink(tmpname) 2152 2153 def test_list_command(self): 2154 for tar_name in testtarnames: 2155 with support.captured_stdout() as t: 2156 with tarfile.open(tar_name, 'r') as tf: 2157 tf.list(verbose=False) 2158 expected = t.getvalue().encode('ascii', 'backslashreplace') 2159 for opt in '-l', '--list': 2160 out = self.tarfilecmd(opt, tar_name, 2161 PYTHONIOENCODING='ascii') 2162 self.assertEqual(out, expected) 2163 2164 def test_list_command_verbose(self): 2165 for tar_name in testtarnames: 2166 with support.captured_stdout() as t: 2167 with tarfile.open(tar_name, 'r') as tf: 2168 tf.list(verbose=True) 2169 expected = t.getvalue().encode('ascii', 'backslashreplace') 2170 for opt in '-v', '--verbose': 2171 out = self.tarfilecmd(opt, '-l', tar_name, 2172 PYTHONIOENCODING='ascii') 2173 self.assertEqual(out, expected) 2174 2175 def test_list_command_invalid_file(self): 2176 zipname = support.findfile('zipdir.zip') 2177 rc, out, err = self.tarfilecmd_failure('-l', zipname) 2178 self.assertIn(b' is not a tar archive.', err) 2179 self.assertEqual(out, b'') 2180 self.assertEqual(rc, 1) 2181 2182 def test_create_command(self): 2183 files = [support.findfile('tokenize_tests.txt'), 2184 support.findfile('tokenize_tests-no-coding-cookie-' 2185 'and-utf8-bom-sig-only.txt')] 2186 for opt in '-c', '--create': 2187 try: 2188 out = self.tarfilecmd(opt, tmpname, *files) 2189 self.assertEqual(out, b'') 2190 with tarfile.open(tmpname) as tar: 2191 tar.getmembers() 2192 finally: 2193 support.unlink(tmpname) 2194 2195 def test_create_command_verbose(self): 2196 files = [support.findfile('tokenize_tests.txt'), 2197 
support.findfile('tokenize_tests-no-coding-cookie-' 2198 'and-utf8-bom-sig-only.txt')] 2199 for opt in '-v', '--verbose': 2200 try: 2201 out = self.tarfilecmd(opt, '-c', tmpname, *files) 2202 self.assertIn(b' file created.', out) 2203 with tarfile.open(tmpname) as tar: 2204 tar.getmembers() 2205 finally: 2206 support.unlink(tmpname) 2207 2208 def test_create_command_dotless_filename(self): 2209 files = [support.findfile('tokenize_tests.txt')] 2210 try: 2211 out = self.tarfilecmd('-c', dotlessname, *files) 2212 self.assertEqual(out, b'') 2213 with tarfile.open(dotlessname) as tar: 2214 tar.getmembers() 2215 finally: 2216 support.unlink(dotlessname) 2217 2218 def test_create_command_dot_started_filename(self): 2219 tar_name = os.path.join(TEMPDIR, ".testtar") 2220 files = [support.findfile('tokenize_tests.txt')] 2221 try: 2222 out = self.tarfilecmd('-c', tar_name, *files) 2223 self.assertEqual(out, b'') 2224 with tarfile.open(tar_name) as tar: 2225 tar.getmembers() 2226 finally: 2227 support.unlink(tar_name) 2228 2229 def test_create_command_compressed(self): 2230 files = [support.findfile('tokenize_tests.txt'), 2231 support.findfile('tokenize_tests-no-coding-cookie-' 2232 'and-utf8-bom-sig-only.txt')] 2233 for filetype in (GzipTest, Bz2Test, LzmaTest): 2234 if not filetype.open: 2235 continue 2236 try: 2237 tar_name = tmpname + '.' 
+ filetype.suffix 2238 out = self.tarfilecmd('-c', tar_name, *files) 2239 with filetype.taropen(tar_name) as tar: 2240 tar.getmembers() 2241 finally: 2242 support.unlink(tar_name) 2243 2244 def test_extract_command(self): 2245 self.make_simple_tarfile(tmpname) 2246 for opt in '-e', '--extract': 2247 try: 2248 with support.temp_cwd(tarextdir): 2249 out = self.tarfilecmd(opt, tmpname) 2250 self.assertEqual(out, b'') 2251 finally: 2252 support.rmtree(tarextdir) 2253 2254 def test_extract_command_verbose(self): 2255 self.make_simple_tarfile(tmpname) 2256 for opt in '-v', '--verbose': 2257 try: 2258 with support.temp_cwd(tarextdir): 2259 out = self.tarfilecmd(opt, '-e', tmpname) 2260 self.assertIn(b' file is extracted.', out) 2261 finally: 2262 support.rmtree(tarextdir) 2263 2264 def test_extract_command_different_directory(self): 2265 self.make_simple_tarfile(tmpname) 2266 try: 2267 with support.temp_cwd(tarextdir): 2268 out = self.tarfilecmd('-e', tmpname, 'spamdir') 2269 self.assertEqual(out, b'') 2270 finally: 2271 support.rmtree(tarextdir) 2272 2273 def test_extract_command_invalid_file(self): 2274 zipname = support.findfile('zipdir.zip') 2275 with support.temp_cwd(tarextdir): 2276 rc, out, err = self.tarfilecmd_failure('-e', zipname) 2277 self.assertIn(b' is not a tar archive.', err) 2278 self.assertEqual(out, b'') 2279 self.assertEqual(rc, 1) 2280 2281 2282 class ContextManagerTest(unittest.TestCase): 2283 2284 def test_basic(self): 2285 with tarfile.open(tarname) as tar: 2286 self.assertFalse(tar.closed, "closed inside runtime context") 2287 self.assertTrue(tar.closed, "context manager failed") 2288 2289 def test_closed(self): 2290 # The __enter__() method is supposed to raise OSError 2291 # if the TarFile object is already closed. 2292 tar = tarfile.open(tarname) 2293 tar.close() 2294 with self.assertRaises(OSError): 2295 with tar: 2296 pass 2297 2298 def test_exception(self): 2299 # Test if the OSError exception is passed through properly. 
2300 with self.assertRaises(Exception) as exc: 2301 with tarfile.open(tarname) as tar: 2302 raise OSError 2303 self.assertIsInstance(exc.exception, OSError, 2304 "wrong exception raised in context manager") 2305 self.assertTrue(tar.closed, "context manager failed") 2306 2307 def test_no_eof(self): 2308 # __exit__() must not write end-of-archive blocks if an 2309 # exception was raised. 2310 try: 2311 with tarfile.open(tmpname, "w") as tar: 2312 raise Exception 2313 except: 2314 pass 2315 self.assertEqual(os.path.getsize(tmpname), 0, 2316 "context manager wrote an end-of-archive block") 2317 self.assertTrue(tar.closed, "context manager failed") 2318 2319 def test_eof(self): 2320 # __exit__() must write end-of-archive blocks, i.e. call 2321 # TarFile.close() if there was no error. 2322 with tarfile.open(tmpname, "w"): 2323 pass 2324 self.assertNotEqual(os.path.getsize(tmpname), 0, 2325 "context manager wrote no end-of-archive block") 2326 2327 def test_fileobj(self): 2328 # Test that __exit__() did not close the external file 2329 # object. 2330 with open(tmpname, "wb") as fobj: 2331 try: 2332 with tarfile.open(fileobj=fobj, mode="w") as tar: 2333 raise Exception 2334 except: 2335 pass 2336 self.assertFalse(fobj.closed, "external file object was closed") 2337 self.assertTrue(tar.closed, "context manager failed") 2338 2339 2340 @unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing") 2341 class LinkEmulationTest(ReadTest, unittest.TestCase): 2342 2343 # Test for issue #8741 regression. On platforms that do not support 2344 # symbolic or hard links tarfile tries to extract these types of members 2345 # as the regular files they point to. 
2346 def _test_link_extraction(self, name): 2347 self.tar.extract(name, TEMPDIR) 2348 with open(os.path.join(TEMPDIR, name), "rb") as f: 2349 data = f.read() 2350 self.assertEqual(md5sum(data), md5_regtype) 2351 2352 # See issues #1578269, #8879, and #17689 for some history on these skips 2353 @unittest.skipIf(hasattr(os.path, "islink"), 2354 "Skip emulation - has os.path.islink but not os.link") 2355 def test_hardlink_extraction1(self): 2356 self._test_link_extraction("ustar/lnktype") 2357 2358 @unittest.skipIf(hasattr(os.path, "islink"), 2359 "Skip emulation - has os.path.islink but not os.link") 2360 def test_hardlink_extraction2(self): 2361 self._test_link_extraction("./ustar/linktest2/lnktype") 2362 2363 @unittest.skipIf(hasattr(os, "symlink"), 2364 "Skip emulation if symlink exists") 2365 def test_symlink_extraction1(self): 2366 self._test_link_extraction("ustar/symtype") 2367 2368 @unittest.skipIf(hasattr(os, "symlink"), 2369 "Skip emulation if symlink exists") 2370 def test_symlink_extraction2(self): 2371 self._test_link_extraction("./ustar/linktest2/symtype") 2372 2373 2374 class Bz2PartialReadTest(Bz2Test, unittest.TestCase): 2375 # Issue5068: The _BZ2Proxy.read() method loops forever 2376 # on an empty or partial bzipped file. 
2377 2378 def _test_partial_input(self, mode): 2379 class MyBytesIO(io.BytesIO): 2380 hit_eof = False 2381 def read(self, n): 2382 if self.hit_eof: 2383 raise AssertionError("infinite loop detected in " 2384 "tarfile.open()") 2385 self.hit_eof = self.tell() == len(self.getvalue()) 2386 return super(MyBytesIO, self).read(n) 2387 def seek(self, *args): 2388 self.hit_eof = False 2389 return super(MyBytesIO, self).seek(*args) 2390 2391 data = bz2.compress(tarfile.TarInfo("foo").tobuf()) 2392 for x in range(len(data) + 1): 2393 try: 2394 tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode) 2395 except tarfile.ReadError: 2396 pass # we have no interest in ReadErrors 2397 2398 def test_partial_input(self): 2399 self._test_partial_input("r") 2400 2401 def test_partial_input_bz2(self): 2402 self._test_partial_input("r:bz2") 2403 2404 2405 def root_is_uid_gid_0(): 2406 try: 2407 import pwd, grp 2408 except ImportError: 2409 return False 2410 if pwd.getpwuid(0)[0] != 'root': 2411 return False 2412 if grp.getgrgid(0)[0] != 'root': 2413 return False 2414 return True 2415 2416 2417 @unittest.skipUnless(hasattr(os, 'chown'), "missing os.chown") 2418 @unittest.skipUnless(hasattr(os, 'geteuid'), "missing os.geteuid") 2419 class NumericOwnerTest(unittest.TestCase): 2420 # mock the following: 2421 # os.chown: so we can test what's being called 2422 # os.chmod: so the modes are not actually changed. if they are, we can't 2423 # delete the files/directories 2424 # os.geteuid: so we can lie and say we're root (uid = 0) 2425 2426 @staticmethod 2427 def _make_test_archive(filename_1, dirname_1, filename_2): 2428 # the file contents to write 2429 fobj = io.BytesIO(b"content") 2430 2431 # create a tar file with a file, a directory, and a file within that 2432 # directory. 
Assign various .uid/.gid values to them 2433 items = [(filename_1, 99, 98, tarfile.REGTYPE, fobj), 2434 (dirname_1, 77, 76, tarfile.DIRTYPE, None), 2435 (filename_2, 88, 87, tarfile.REGTYPE, fobj), 2436 ] 2437 with tarfile.open(tmpname, 'w') as tarfl: 2438 for name, uid, gid, typ, contents in items: 2439 t = tarfile.TarInfo(name) 2440 t.uid = uid 2441 t.gid = gid 2442 t.uname = 'root' 2443 t.gname = 'root' 2444 t.type = typ 2445 tarfl.addfile(t, contents) 2446 2447 # return the full pathname to the tar file 2448 return tmpname 2449 2450 @staticmethod 2451 @contextmanager 2452 def _setup_test(mock_geteuid): 2453 mock_geteuid.return_value = 0 # lie and say we're root 2454 fname = 'numeric-owner-testfile' 2455 dirname = 'dir' 2456 2457 # the names we want stored in the tarfile 2458 filename_1 = fname 2459 dirname_1 = dirname 2460 filename_2 = os.path.join(dirname, fname) 2461 2462 # create the tarfile with the contents we're after 2463 tar_filename = NumericOwnerTest._make_test_archive(filename_1, 2464 dirname_1, 2465 filename_2) 2466 2467 # open the tarfile for reading. 
yield it and the names of the items 2468 # we stored into the file 2469 with tarfile.open(tar_filename) as tarfl: 2470 yield tarfl, filename_1, dirname_1, filename_2 2471 2472 @unittest.mock.patch('os.chown') 2473 @unittest.mock.patch('os.chmod') 2474 @unittest.mock.patch('os.geteuid') 2475 def test_extract_with_numeric_owner(self, mock_geteuid, mock_chmod, 2476 mock_chown): 2477 with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, 2478 filename_2): 2479 tarfl.extract(filename_1, TEMPDIR, numeric_owner=True) 2480 tarfl.extract(filename_2 , TEMPDIR, numeric_owner=True) 2481 2482 # convert to filesystem paths 2483 f_filename_1 = os.path.join(TEMPDIR, filename_1) 2484 f_filename_2 = os.path.join(TEMPDIR, filename_2) 2485 2486 mock_chown.assert_has_calls([unittest.mock.call(f_filename_1, 99, 98), 2487 unittest.mock.call(f_filename_2, 88, 87), 2488 ], 2489 any_order=True) 2490 2491 @unittest.mock.patch('os.chown') 2492 @unittest.mock.patch('os.chmod') 2493 @unittest.mock.patch('os.geteuid') 2494 def test_extractall_with_numeric_owner(self, mock_geteuid, mock_chmod, 2495 mock_chown): 2496 with self._setup_test(mock_geteuid) as (tarfl, filename_1, dirname_1, 2497 filename_2): 2498 tarfl.extractall(TEMPDIR, numeric_owner=True) 2499 2500 # convert to filesystem paths 2501 f_filename_1 = os.path.join(TEMPDIR, filename_1) 2502 f_dirname_1 = os.path.join(TEMPDIR, dirname_1) 2503 f_filename_2 = os.path.join(TEMPDIR, filename_2) 2504 2505 mock_chown.assert_has_calls([unittest.mock.call(f_filename_1, 99, 98), 2506 unittest.mock.call(f_dirname_1, 77, 76), 2507 unittest.mock.call(f_filename_2, 88, 87), 2508 ], 2509 any_order=True) 2510 2511 # this test requires that uid=0 and gid=0 really be named 'root'. that's 2512 # because the uname and gname in the test file are 'root', and extract() 2513 # will look them up using pwd and grp to find their uid and gid, which we 2514 # test here to be 0. 
2515 @unittest.skipUnless(root_is_uid_gid_0(), 2516 'uid=0,gid=0 must be named "root"') 2517 @unittest.mock.patch('os.chown') 2518 @unittest.mock.patch('os.chmod') 2519 @unittest.mock.patch('os.geteuid') 2520 def test_extract_without_numeric_owner(self, mock_geteuid, mock_chmod, 2521 mock_chown): 2522 with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _): 2523 tarfl.extract(filename_1, TEMPDIR, numeric_owner=False) 2524 2525 # convert to filesystem paths 2526 f_filename_1 = os.path.join(TEMPDIR, filename_1) 2527 2528 mock_chown.assert_called_with(f_filename_1, 0, 0) 2529 2530 @unittest.mock.patch('os.geteuid') 2531 def test_keyword_only(self, mock_geteuid): 2532 with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _): 2533 self.assertRaises(TypeError, 2534 tarfl.extract, filename_1, TEMPDIR, False, True) 2535 2536 2537 def setUpModule(): 2538 support.unlink(TEMPDIR) 2539 os.makedirs(TEMPDIR) 2540 2541 global testtarnames 2542 testtarnames = [tarname] 2543 with open(tarname, "rb") as fobj: 2544 data = fobj.read() 2545 2546 # Create compressed tarfiles. 2547 for c in GzipTest, Bz2Test, LzmaTest: 2548 if c.open: 2549 support.unlink(c.tarname) 2550 testtarnames.append(c.tarname) 2551 with c.open(c.tarname, "wb") as tar: 2552 tar.write(data) 2553 2554 def tearDownModule(): 2555 if os.path.exists(TEMPDIR): 2556 support.rmtree(TEMPDIR) 2557 2558 if __name__ == "__main__": 2559 unittest.main() 2560