Home | History | Annotate | Download | only in test
      1 import sys
      2 import os
      3 import shutil
      4 import StringIO
      5 from binascii import unhexlify
      6 from hashlib import md5
      7 from random import Random
      8 import errno
      9 
     10 import unittest
     11 import tarfile
     12 
     13 from test import test_support
     14 from test import test_support as support
     15 
     16 # Check for our compression modules.
     17 try:
     18     import gzip
     19     gzip.GzipFile
     20 except (ImportError, AttributeError):
     21     gzip = None
     22 try:
     23     import bz2
     24 except ImportError:
     25     bz2 = None
     26 
     27 def md5sum(data):
     28     return md5(data).hexdigest()
     29 
     30 TEMPDIR = os.path.abspath(test_support.TESTFN)
     31 tarname = test_support.findfile("testtar.tar")
     32 gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
     33 bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
     34 tmpname = os.path.join(TEMPDIR, "tmp.tar")
     35 
     36 md5_regtype = "65f477c818ad9e15f7feab0c6d37742f"
     37 md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6"
     38 
     39 
     40 class ReadTest(unittest.TestCase):
     41 
     42     tarname = tarname
     43     mode = "r:"
     44 
     45     def setUp(self):
     46         self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1")
     47 
     48     def tearDown(self):
     49         self.tar.close()
     50 
     51 
     52 class UstarReadTest(ReadTest):
     53 
     54     def test_fileobj_regular_file(self):
     55         tarinfo = self.tar.getmember("ustar/regtype")
     56         fobj = self.tar.extractfile(tarinfo)
     57         data = fobj.read()
     58         self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype),
     59                 "regular file extraction failed")
     60 
     61     def test_fileobj_readlines(self):
     62         self.tar.extract("ustar/regtype", TEMPDIR)
     63         tarinfo = self.tar.getmember("ustar/regtype")
     64         fobj1 = open(os.path.join(TEMPDIR, "ustar/regtype"), "rU")
     65         with open(os.path.join(TEMPDIR, "ustar/regtype"), "rU") as fobj1:
     66             lines1 = fobj1.readlines()
     67         fobj2 = self.tar.extractfile(tarinfo)
     68 
     69         lines2 = fobj2.readlines()
     70         self.assertTrue(lines1 == lines2,
     71                 "fileobj.readlines() failed")
     72         self.assertTrue(len(lines2) == 114,
     73                 "fileobj.readlines() failed")
     74         self.assertTrue(lines2[83] ==
     75                 "I will gladly admit that Python is not the fastest running scripting language.\n",
     76                 "fileobj.readlines() failed")
     77 
     78     def test_fileobj_iter(self):
     79         self.tar.extract("ustar/regtype", TEMPDIR)
     80         tarinfo = self.tar.getmember("ustar/regtype")
     81         with open(os.path.join(TEMPDIR, "ustar/regtype"), "rU") as fobj1:
     82             lines1 = fobj1.readlines()
     83         fobj2 = self.tar.extractfile(tarinfo)
     84         lines2 = [line for line in fobj2]
     85         self.assertTrue(lines1 == lines2,
     86                      "fileobj.__iter__() failed")
     87 
     88     def test_fileobj_seek(self):
     89         self.tar.extract("ustar/regtype", TEMPDIR)
     90         with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj:
     91             data = fobj.read()
     92 
     93         tarinfo = self.tar.getmember("ustar/regtype")
     94         fobj = self.tar.extractfile(tarinfo)
     95 
     96         text = fobj.read()
     97         fobj.seek(0)
     98         self.assertTrue(0 == fobj.tell(),
     99                      "seek() to file's start failed")
    100         fobj.seek(2048, 0)
    101         self.assertTrue(2048 == fobj.tell(),
    102                      "seek() to absolute position failed")
    103         fobj.seek(-1024, 1)
    104         self.assertTrue(1024 == fobj.tell(),
    105                      "seek() to negative relative position failed")
    106         fobj.seek(1024, 1)
    107         self.assertTrue(2048 == fobj.tell(),
    108                      "seek() to positive relative position failed")
    109         s = fobj.read(10)
    110         self.assertTrue(s == data[2048:2058],
    111                      "read() after seek failed")
    112         fobj.seek(0, 2)
    113         self.assertTrue(tarinfo.size == fobj.tell(),
    114                      "seek() to file's end failed")
    115         self.assertTrue(fobj.read() == "",
    116                      "read() at file's end did not return empty string")
    117         fobj.seek(-tarinfo.size, 2)
    118         self.assertTrue(0 == fobj.tell(),
    119                      "relative seek() to file's start failed")
    120         fobj.seek(512)
    121         s1 = fobj.readlines()
    122         fobj.seek(512)
    123         s2 = fobj.readlines()
    124         self.assertTrue(s1 == s2,
    125                      "readlines() after seek failed")
    126         fobj.seek(0)
    127         self.assertTrue(len(fobj.readline()) == fobj.tell(),
    128                      "tell() after readline() failed")
    129         fobj.seek(512)
    130         self.assertTrue(len(fobj.readline()) + 512 == fobj.tell(),
    131                      "tell() after seek() and readline() failed")
    132         fobj.seek(0)
    133         line = fobj.readline()
    134         self.assertTrue(fobj.read() == data[len(line):],
    135                      "read() after readline() failed")
    136         fobj.close()
    137 
    138     # Test if symbolic and hard links are resolved by extractfile().  The
    139     # test link members each point to a regular member whose data is
    140     # supposed to be exported.
    141     def _test_fileobj_link(self, lnktype, regtype):
    142         a = self.tar.extractfile(lnktype)
    143         b = self.tar.extractfile(regtype)
    144         self.assertEqual(a.name, b.name)
    145 
    146     def test_fileobj_link1(self):
    147         self._test_fileobj_link("ustar/lnktype", "ustar/regtype")
    148 
    149     def test_fileobj_link2(self):
    150         self._test_fileobj_link("./ustar/linktest2/lnktype", "ustar/linktest1/regtype")
    151 
    152     def test_fileobj_symlink1(self):
    153         self._test_fileobj_link("ustar/symtype", "ustar/regtype")
    154 
    155     def test_fileobj_symlink2(self):
    156         self._test_fileobj_link("./ustar/linktest2/symtype", "ustar/linktest1/regtype")
    157 
    158     def test_issue14160(self):
    159         self._test_fileobj_link("symtype2", "ustar/regtype")
    160 
    161 
    162 class ListTest(ReadTest, unittest.TestCase):
    163 
    164     # Override setUp to use default encoding (UTF-8)
    165     def setUp(self):
    166         self.tar = tarfile.open(self.tarname, mode=self.mode)
    167 
    168     def test_list(self):
    169         with test_support.captured_stdout() as t:
    170             self.tar.list(verbose=False)
    171         out = t.getvalue()
    172         self.assertIn('ustar/conttype', out)
    173         self.assertIn('ustar/regtype', out)
    174         self.assertIn('ustar/lnktype', out)
    175         self.assertIn('ustar' + ('/12345' * 40) + '67/longname', out)
    176         self.assertIn('./ustar/linktest2/symtype', out)
    177         self.assertIn('./ustar/linktest2/lnktype', out)
    178         # Make sure it puts trailing slash for directory
    179         self.assertIn('ustar/dirtype/', out)
    180         self.assertIn('ustar/dirtype-with-size/', out)
    181         # Make sure it is able to print non-ASCII characters
    182         self.assertIn('ustar/umlauts-'
    183                       '\xc4\xd6\xdc\xe4\xf6\xfc\xdf', out)
    184         self.assertIn('misc/regtype-hpux-signed-chksum-'
    185                       '\xc4\xd6\xdc\xe4\xf6\xfc\xdf', out)
    186         self.assertIn('misc/regtype-old-v7-signed-chksum-'
    187                       '\xc4\xd6\xdc\xe4\xf6\xfc\xdf', out)
    188         # Make sure it prints files separated by one newline without any
    189         # 'ls -l'-like accessories if verbose flag is not being used
    190         # ...
    191         # ustar/conttype
    192         # ustar/regtype
    193         # ...
    194         self.assertRegexpMatches(out, r'ustar/conttype ?\r?\n'
    195                                       r'ustar/regtype ?\r?\n')
    196         # Make sure it does not print the source of link without verbose flag
    197         self.assertNotIn('link to', out)
    198         self.assertNotIn('->', out)
    199 
    200     def test_list_verbose(self):
    201         with test_support.captured_stdout() as t:
    202             self.tar.list(verbose=True)
    203         out = t.getvalue()
    204         # Make sure it prints files separated by one newline with 'ls -l'-like
    205         # accessories if verbose flag is being used
    206         # ...
    207         # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/conttype
    208         # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/regtype
    209         # ...
    210         self.assertRegexpMatches(out, (r'-rw-r--r-- tarfile/tarfile\s+7011 '
    211                                        r'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d '
    212                                        r'ustar/\w+type ?\r?\n') * 2)
    213         # Make sure it prints the source of link with verbose flag
    214         self.assertIn('ustar/symtype -> regtype', out)
    215         self.assertIn('./ustar/linktest2/symtype -> ../linktest1/regtype', out)
    216         self.assertIn('./ustar/linktest2/lnktype link to '
    217                       './ustar/linktest1/regtype', out)
    218         self.assertIn('gnu' + ('/123' * 125) + '/longlink link to gnu' +
    219                       ('/123' * 125) + '/longname', out)
    220         self.assertIn('pax' + ('/123' * 125) + '/longlink link to pax' +
    221                       ('/123' * 125) + '/longname', out)
    222 
    223 
    224 class GzipListTest(ListTest):
    225     tarname = gzipname
    226     mode = "r:gz"
    227     taropen = tarfile.TarFile.gzopen
    228 
    229 
    230 class Bz2ListTest(ListTest):
    231     tarname = bz2name
    232     mode = "r:bz2"
    233     taropen = tarfile.TarFile.bz2open
    234 
    235 
    236 class CommonReadTest(ReadTest):
    237 
    238     def test_empty_tarfile(self):
    239         # Test for issue6123: Allow opening empty archives.
    240         # This test checks if tarfile.open() is able to open an empty tar
    241         # archive successfully. Note that an empty tar archive is not the
    242         # same as an empty file!
    243         with tarfile.open(tmpname, self.mode.replace("r", "w")):
    244             pass
    245         try:
    246             tar = tarfile.open(tmpname, self.mode)
    247             tar.getnames()
    248         except tarfile.ReadError:
    249             self.fail("tarfile.open() failed on empty archive")
    250         else:
    251             self.assertListEqual(tar.getmembers(), [])
    252         finally:
    253             tar.close()
    254 
    255     def test_null_tarfile(self):
    256         # Test for issue6123: Allow opening empty archives.
    257         # This test guarantees that tarfile.open() does not treat an empty
    258         # file as an empty tar archive.
    259         with open(tmpname, "wb"):
    260             pass
    261         self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
    262         self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)
    263 
    264     def test_non_existent_tarfile(self):
    265         # Test for issue11513: prevent non-existent gzipped tarfiles raising
    266         # multiple exceptions.
    267         exctype = OSError if '|' in self.mode else IOError
    268         with self.assertRaisesRegexp(exctype, "xxx") as ex:
    269             tarfile.open("xxx", self.mode)
    270         self.assertEqual(ex.exception.errno, errno.ENOENT)
    271 
    272     def test_ignore_zeros(self):
    273         # Test TarFile's ignore_zeros option.
    274         if self.mode.endswith(":gz"):
    275             _open = gzip.GzipFile
    276         elif self.mode.endswith(":bz2"):
    277             _open = bz2.BZ2File
    278         else:
    279             _open = open
    280 
    281         # generate 512 pseudorandom bytes
    282         data = unhexlify('%1024x' % Random(0).getrandbits(512*8))
    283         for char in ('\0', 'a'):
    284             # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
    285             # are ignored correctly.
    286             with _open(tmpname, "wb") as fobj:
    287                 fobj.write(char * 1024)
    288                 tarinfo = tarfile.TarInfo("foo")
    289                 tarinfo.size = len(data)
    290                 fobj.write(tarinfo.tobuf())
    291                 fobj.write(data)
    292 
    293             tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
    294             try:
    295                 self.assertListEqual(tar.getnames(), ["foo"],
    296                     "ignore_zeros=True should have skipped the %r-blocks" % char)
    297             finally:
    298                 tar.close()
    299 
    300     def test_premature_end_of_archive(self):
    301         for size in (512, 600, 1024, 1200):
    302             with tarfile.open(tmpname, "w:") as tar:
    303                 t = tarfile.TarInfo("foo")
    304                 t.size = 1024
    305                 tar.addfile(t, StringIO.StringIO("a" * 1024))
    306 
    307             with open(tmpname, "r+b") as fobj:
    308                 fobj.truncate(size)
    309 
    310             with tarfile.open(tmpname) as tar:
    311                 with self.assertRaisesRegexp(tarfile.ReadError, "unexpected end of data"):
    312                     for t in tar:
    313                         pass
    314 
    315             with tarfile.open(tmpname) as tar:
    316                 t = tar.next()
    317 
    318                 with self.assertRaisesRegexp(tarfile.ReadError, "unexpected end of data"):
    319                     tar.extract(t, TEMPDIR)
    320 
    321                 with self.assertRaisesRegexp(tarfile.ReadError, "unexpected end of data"):
    322                     tar.extractfile(t).read()
    323 
    324 
    325 class MiscReadTest(CommonReadTest):
    326     taropen = tarfile.TarFile.taropen
    327 
    328     def test_no_name_argument(self):
    329         with open(self.tarname, "rb") as fobj:
    330             tar = tarfile.open(fileobj=fobj, mode=self.mode)
    331             self.assertEqual(tar.name, os.path.abspath(fobj.name))
    332 
    333     def test_no_name_attribute(self):
    334         with open(self.tarname, "rb") as fobj:
    335             data = fobj.read()
    336         fobj = StringIO.StringIO(data)
    337         self.assertRaises(AttributeError, getattr, fobj, "name")
    338         tar = tarfile.open(fileobj=fobj, mode=self.mode)
    339         self.assertEqual(tar.name, None)
    340 
    341     def test_empty_name_attribute(self):
    342         with open(self.tarname, "rb") as fobj:
    343             data = fobj.read()
    344         fobj = StringIO.StringIO(data)
    345         fobj.name = ""
    346         tar = tarfile.open(fileobj=fobj, mode=self.mode)
    347         self.assertEqual(tar.name, None)
    348 
    349     def test_illegal_mode_arg(self):
    350         with open(tmpname, 'wb'):
    351             pass
    352         self.addCleanup(os.unlink, tmpname)
    353         with self.assertRaisesRegexp(ValueError, 'mode must be '):
    354             tar = self.taropen(tmpname, 'q')
    355         with self.assertRaisesRegexp(ValueError, 'mode must be '):
    356             tar = self.taropen(tmpname, 'rw')
    357         with self.assertRaisesRegexp(ValueError, 'mode must be '):
    358             tar = self.taropen(tmpname, '')
    359 
    360     def test_fileobj_with_offset(self):
    361         # Skip the first member and store values from the second member
    362         # of the testtar.
    363         tar = tarfile.open(self.tarname, mode=self.mode)
    364         try:
    365             tar.next()
    366             t = tar.next()
    367             name = t.name
    368             offset = t.offset
    369             data = tar.extractfile(t).read()
    370         finally:
    371             tar.close()
    372 
    373         # Open the testtar and seek to the offset of the second member.
    374         if self.mode.endswith(":gz"):
    375             _open = gzip.GzipFile
    376         elif self.mode.endswith(":bz2"):
    377             _open = bz2.BZ2File
    378         else:
    379             _open = open
    380         fobj = _open(self.tarname, "rb")
    381         try:
    382             fobj.seek(offset)
    383 
    384             # Test if the tarfile starts with the second member.
    385             tar = tar.open(self.tarname, mode="r:", fileobj=fobj)
    386             t = tar.next()
    387             self.assertEqual(t.name, name)
    388             # Read to the end of fileobj and test if seeking back to the
    389             # beginning works.
    390             tar.getmembers()
    391             self.assertEqual(tar.extractfile(t).read(), data,
    392                     "seek back did not work")
    393             tar.close()
    394         finally:
    395             fobj.close()
    396 
    397     def test_fail_comp(self):
    398         # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
    399         if self.mode == "r:":
    400             self.skipTest('needs a gz or bz2 mode')
    401         self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
    402         with open(tarname, "rb") as fobj:
    403             self.assertRaises(tarfile.ReadError, tarfile.open,
    404                               fileobj=fobj, mode=self.mode)
    405 
    406     def test_v7_dirtype(self):
    407         # Test old style dirtype member (bug #1336623):
    408         # Old V7 tars create directory members using an AREGTYPE
    409         # header with a "/" appended to the filename field.
    410         tarinfo = self.tar.getmember("misc/dirtype-old-v7")
    411         self.assertTrue(tarinfo.type == tarfile.DIRTYPE,
    412                 "v7 dirtype failed")
    413 
    414     def test_xstar_type(self):
    415         # The xstar format stores extra atime and ctime fields inside the
    416         # space reserved for the prefix field. The prefix field must be
    417         # ignored in this case, otherwise it will mess up the name.
    418         try:
    419             self.tar.getmember("misc/regtype-xstar")
    420         except KeyError:
    421             self.fail("failed to find misc/regtype-xstar (mangled prefix?)")
    422 
    423     def test_check_members(self):
    424         for tarinfo in self.tar:
    425             self.assertTrue(int(tarinfo.mtime) == 07606136617,
    426                     "wrong mtime for %s" % tarinfo.name)
    427             if not tarinfo.name.startswith("ustar/"):
    428                 continue
    429             self.assertTrue(tarinfo.uname == "tarfile",
    430                     "wrong uname for %s" % tarinfo.name)
    431 
    432     def test_find_members(self):
    433         self.assertTrue(self.tar.getmembers()[-1].name == "misc/eof",
    434                 "could not find all members")
    435 
    436     def test_extract_hardlink(self):
    437         # Test hardlink extraction (e.g. bug #857297).
    438         with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar:
    439             tar.extract("ustar/regtype", TEMPDIR)
    440             self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/regtype"))
    441 
    442             tar.extract("ustar/lnktype", TEMPDIR)
    443             self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/lnktype"))
    444             with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f:
    445                 data = f.read()
    446             self.assertEqual(md5sum(data), md5_regtype)
    447 
    448             tar.extract("ustar/symtype", TEMPDIR)
    449             self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/symtype"))
    450             with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f:
    451                 data = f.read()
    452             self.assertEqual(md5sum(data), md5_regtype)
    453 
    454     def test_extractall(self):
    455         # Test if extractall() correctly restores directory permissions
    456         # and times (see issue1735).
    457         tar = tarfile.open(tarname, encoding="iso8859-1")
    458         try:
    459             directories = [t for t in tar if t.isdir()]
    460             tar.extractall(TEMPDIR, directories)
    461             for tarinfo in directories:
    462                 path = os.path.join(TEMPDIR, tarinfo.name)
    463                 if sys.platform != "win32":
    464                     # Win32 has no support for fine grained permissions.
    465                     self.assertEqual(tarinfo.mode & 0777, os.stat(path).st_mode & 0777)
    466                 self.assertEqual(tarinfo.mtime, os.path.getmtime(path))
    467         finally:
    468             tar.close()
    469 
    470     def test_init_close_fobj(self):
    471         # Issue #7341: Close the internal file object in the TarFile
    472         # constructor in case of an error. For the test we rely on
    473         # the fact that opening an empty file raises a ReadError.
    474         empty = os.path.join(TEMPDIR, "empty")
    475         with open(empty, "wb") as fobj:
    476             fobj.write("")
    477 
    478         try:
    479             tar = object.__new__(tarfile.TarFile)
    480             try:
    481                 tar.__init__(empty)
    482             except tarfile.ReadError:
    483                 self.assertTrue(tar.fileobj.closed)
    484             else:
    485                 self.fail("ReadError not raised")
    486         finally:
    487             support.unlink(empty)
    488 
    489     def test_parallel_iteration(self):
    490         # Issue #16601: Restarting iteration over tarfile continued
    491         # from where it left off.
    492         with tarfile.open(self.tarname) as tar:
    493             for m1, m2 in zip(tar, tar):
    494                 self.assertEqual(m1.offset, m2.offset)
    495                 self.assertEqual(m1.name, m2.name)
    496 
    497 
    498 class StreamReadTest(CommonReadTest):
    499 
    500     mode="r|"
    501 
    502     def test_fileobj_regular_file(self):
    503         tarinfo = self.tar.next() # get "regtype" (can't use getmember)
    504         fobj = self.tar.extractfile(tarinfo)
    505         data = fobj.read()
    506         self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype),
    507                 "regular file extraction failed")
    508 
    509     def test_provoke_stream_error(self):
    510         tarinfos = self.tar.getmembers()
    511         f = self.tar.extractfile(tarinfos[0]) # read the first member
    512         self.assertRaises(tarfile.StreamError, f.read)
    513 
    514     def test_compare_members(self):
    515         tar1 = tarfile.open(tarname, encoding="iso8859-1")
    516         try:
    517             tar2 = self.tar
    518 
    519             while True:
    520                 t1 = tar1.next()
    521                 t2 = tar2.next()
    522                 if t1 is None:
    523                     break
    524                 self.assertTrue(t2 is not None, "stream.next() failed.")
    525 
    526                 if t2.islnk() or t2.issym():
    527                     self.assertRaises(tarfile.StreamError, tar2.extractfile, t2)
    528                     continue
    529 
    530                 v1 = tar1.extractfile(t1)
    531                 v2 = tar2.extractfile(t2)
    532                 if v1 is None:
    533                     continue
    534                 self.assertTrue(v2 is not None, "stream.extractfile() failed")
    535                 self.assertTrue(v1.read() == v2.read(), "stream extraction failed")
    536         finally:
    537             tar1.close()
    538 
    539 
    540 class DetectReadTest(unittest.TestCase):
    541 
    542     def _testfunc_file(self, name, mode):
    543         try:
    544             tar = tarfile.open(name, mode)
    545         except tarfile.ReadError:
    546             self.fail()
    547         else:
    548             tar.close()
    549 
    550     def _testfunc_fileobj(self, name, mode):
    551         try:
    552             tar = tarfile.open(name, mode, fileobj=open(name, "rb"))
    553         except tarfile.ReadError:
    554             self.fail()
    555         else:
    556             tar.close()
    557 
    558     def _test_modes(self, testfunc):
    559         testfunc(tarname, "r")
    560         testfunc(tarname, "r:")
    561         testfunc(tarname, "r:*")
    562         testfunc(tarname, "r|")
    563         testfunc(tarname, "r|*")
    564 
    565         if gzip:
    566             self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:gz")
    567             self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|gz")
    568             self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r:")
    569             self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r|")
    570 
    571             testfunc(gzipname, "r")
    572             testfunc(gzipname, "r:*")
    573             testfunc(gzipname, "r:gz")
    574             testfunc(gzipname, "r|*")
    575             testfunc(gzipname, "r|gz")
    576 
    577         if bz2:
    578             self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:bz2")
    579             self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|bz2")
    580             self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r:")
    581             self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r|")
    582 
    583             testfunc(bz2name, "r")
    584             testfunc(bz2name, "r:*")
    585             testfunc(bz2name, "r:bz2")
    586             testfunc(bz2name, "r|*")
    587             testfunc(bz2name, "r|bz2")
    588 
    589     def test_detect_file(self):
    590         self._test_modes(self._testfunc_file)
    591 
    592     def test_detect_fileobj(self):
    593         self._test_modes(self._testfunc_fileobj)
    594 
    595     @unittest.skipUnless(bz2, 'requires bz2')
    596     def test_detect_stream_bz2(self):
    597         # Originally, tarfile's stream detection looked for the string
    598         # "BZh91" at the start of the file. This is incorrect because
    599         # the '9' represents the blocksize (900kB). If the file was
    600         # compressed using another blocksize autodetection fails.
    601         with open(tarname, "rb") as fobj:
    602             data = fobj.read()
    603 
    604         # Compress with blocksize 100kB, the file starts with "BZh11".
    605         with bz2.BZ2File(tmpname, "wb", compresslevel=1) as fobj:
    606             fobj.write(data)
    607 
    608         self._testfunc_file(tmpname, "r|*")
    609 
    610 
    611 class MemberReadTest(ReadTest):
    612 
    613     def _test_member(self, tarinfo, chksum=None, **kwargs):
    614         if chksum is not None:
    615             self.assertTrue(md5sum(self.tar.extractfile(tarinfo).read()) == chksum,
    616                     "wrong md5sum for %s" % tarinfo.name)
    617 
    618         kwargs["mtime"] = 07606136617
    619         kwargs["uid"] = 1000
    620         kwargs["gid"] = 100
    621         if "old-v7" not in tarinfo.name:
    622             # V7 tar can't handle alphabetic owners.
    623             kwargs["uname"] = "tarfile"
    624             kwargs["gname"] = "tarfile"
    625         for k, v in kwargs.iteritems():
    626             self.assertTrue(getattr(tarinfo, k) == v,
    627                     "wrong value in %s field of %s" % (k, tarinfo.name))
    628 
    629     def test_find_regtype(self):
    630         tarinfo = self.tar.getmember("ustar/regtype")
    631         self._test_member(tarinfo, size=7011, chksum=md5_regtype)
    632 
    633     def test_find_conttype(self):
    634         tarinfo = self.tar.getmember("ustar/conttype")
    635         self._test_member(tarinfo, size=7011, chksum=md5_regtype)
    636 
    637     def test_find_dirtype(self):
    638         tarinfo = self.tar.getmember("ustar/dirtype")
    639         self._test_member(tarinfo, size=0)
    640 
    641     def test_find_dirtype_with_size(self):
    642         tarinfo = self.tar.getmember("ustar/dirtype-with-size")
    643         self._test_member(tarinfo, size=255)
    644 
    645     def test_find_lnktype(self):
    646         tarinfo = self.tar.getmember("ustar/lnktype")
    647         self._test_member(tarinfo, size=0, linkname="ustar/regtype")
    648 
    649     def test_find_symtype(self):
    650         tarinfo = self.tar.getmember("ustar/symtype")
    651         self._test_member(tarinfo, size=0, linkname="regtype")
    652 
    653     def test_find_blktype(self):
    654         tarinfo = self.tar.getmember("ustar/blktype")
    655         self._test_member(tarinfo, size=0, devmajor=3, devminor=0)
    656 
    657     def test_find_chrtype(self):
    658         tarinfo = self.tar.getmember("ustar/chrtype")
    659         self._test_member(tarinfo, size=0, devmajor=1, devminor=3)
    660 
    661     def test_find_fifotype(self):
    662         tarinfo = self.tar.getmember("ustar/fifotype")
    663         self._test_member(tarinfo, size=0)
    664 
    665     def test_find_sparse(self):
    666         tarinfo = self.tar.getmember("ustar/sparse")
    667         self._test_member(tarinfo, size=86016, chksum=md5_sparse)
    668 
    669     def test_find_umlauts(self):
    670         tarinfo = self.tar.getmember("ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
    671         self._test_member(tarinfo, size=7011, chksum=md5_regtype)
    672 
    673     def test_find_ustar_longname(self):
    674         name = "ustar/" + "12345/" * 39 + "1234567/longname"
    675         self.assertIn(name, self.tar.getnames())
    676 
    677     def test_find_regtype_oldv7(self):
    678         tarinfo = self.tar.getmember("misc/regtype-old-v7")
    679         self._test_member(tarinfo, size=7011, chksum=md5_regtype)
    680 
    681     def test_find_pax_umlauts(self):
    682         self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1")
    683         tarinfo = self.tar.getmember("pax/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
    684         self._test_member(tarinfo, size=7011, chksum=md5_regtype)
    685 
    686 
    687 class LongnameTest(ReadTest):
    688 
    689     def test_read_longname(self):
    690         # Test reading of longname (bug #1471427).
    691         longname = self.subdir + "/" + "123/" * 125 + "longname"
    692         try:
    693             tarinfo = self.tar.getmember(longname)
    694         except KeyError:
    695             self.fail("longname not found")
    696         self.assertTrue(tarinfo.type != tarfile.DIRTYPE, "read longname as dirtype")
    697 
    698     def test_read_longlink(self):
    699         longname = self.subdir + "/" + "123/" * 125 + "longname"
    700         longlink = self.subdir + "/" + "123/" * 125 + "longlink"
    701         try:
    702             tarinfo = self.tar.getmember(longlink)
    703         except KeyError:
    704             self.fail("longlink not found")
    705         self.assertTrue(tarinfo.linkname == longname, "linkname wrong")
    706 
    707     def test_truncated_longname(self):
    708         longname = self.subdir + "/" + "123/" * 125 + "longname"
    709         tarinfo = self.tar.getmember(longname)
    710         offset = tarinfo.offset
    711         self.tar.fileobj.seek(offset)
    712         fobj = StringIO.StringIO(self.tar.fileobj.read(3 * 512))
    713         self.assertRaises(tarfile.ReadError, tarfile.open, name="foo.tar", fileobj=fobj)
    714 
    715     def test_header_offset(self):
    716         # Test if the start offset of the TarInfo object includes
    717         # the preceding extended header.
    718         longname = self.subdir + "/" + "123/" * 125 + "longname"
    719         offset = self.tar.getmember(longname).offset
    720         fobj = open(tarname)
    721         fobj.seek(offset)
    722         tarinfo = tarfile.TarInfo.frombuf(fobj.read(512))
    723         self.assertEqual(tarinfo.type, self.longnametype)
    724 
    725 
    726 class GNUReadTest(LongnameTest):
    727 
    728     subdir = "gnu"
    729     longnametype = tarfile.GNUTYPE_LONGNAME
    730 
    731     def test_sparse_file(self):
    732         tarinfo1 = self.tar.getmember("ustar/sparse")
    733         fobj1 = self.tar.extractfile(tarinfo1)
    734         tarinfo2 = self.tar.getmember("gnu/sparse")
    735         fobj2 = self.tar.extractfile(tarinfo2)
    736         self.assertTrue(fobj1.read() == fobj2.read(),
    737                 "sparse file extraction failed")
    738 
    739 
    740 class PaxReadTest(LongnameTest):
    741 
    742     subdir = "pax"
    743     longnametype = tarfile.XHDTYPE
    744 
    745     def test_pax_global_headers(self):
    746         tar = tarfile.open(tarname, encoding="iso8859-1")
    747         try:
    748 
    749             tarinfo = tar.getmember("pax/regtype1")
    750             self.assertEqual(tarinfo.uname, "foo")
    751             self.assertEqual(tarinfo.gname, "bar")
    752             self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
    753 
    754             tarinfo = tar.getmember("pax/regtype2")
    755             self.assertEqual(tarinfo.uname, "")
    756             self.assertEqual(tarinfo.gname, "bar")
    757             self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
    758 
    759             tarinfo = tar.getmember("pax/regtype3")
    760             self.assertEqual(tarinfo.uname, "tarfile")
    761             self.assertEqual(tarinfo.gname, "tarfile")
    762             self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
    763         finally:
    764             tar.close()
    765 
    766     def test_pax_number_fields(self):
    767         # All following number fields are read from the pax header.
    768         tar = tarfile.open(tarname, encoding="iso8859-1")
    769         try:
    770             tarinfo = tar.getmember("pax/regtype4")
    771             self.assertEqual(tarinfo.size, 7011)
    772             self.assertEqual(tarinfo.uid, 123)
    773             self.assertEqual(tarinfo.gid, 123)
    774             self.assertEqual(tarinfo.mtime, 1041808783.0)
    775             self.assertEqual(type(tarinfo.mtime), float)
    776             self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0)
    777             self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0)
    778         finally:
    779             tar.close()
    780 
    781 
    782 class WriteTestBase(unittest.TestCase):
    783     # Put all write tests in here that are supposed to be tested
    784     # in all possible mode combinations.
    785 
    786     def test_fileobj_no_close(self):
    787         fobj = StringIO.StringIO()
    788         tar = tarfile.open(fileobj=fobj, mode=self.mode)
    789         tar.addfile(tarfile.TarInfo("foo"))
    790         tar.close()
    791         self.assertTrue(fobj.closed is False, "external fileobjs must never closed")
    792         # Issue #20238: Incomplete gzip output with mode="w:gz"
    793         data = fobj.getvalue()
    794         del tar
    795         test_support.gc_collect()
    796         self.assertFalse(fobj.closed)
    797         self.assertEqual(data, fobj.getvalue())
    798 
    799 
    800 class WriteTest(WriteTestBase):
    801 
    802     mode = "w:"
    803 
    804     def test_100_char_name(self):
    805         # The name field in a tar header stores strings of at most 100 chars.
    806         # If a string is shorter than 100 chars it has to be padded with '\0',
    807         # which implies that a string of exactly 100 chars is stored without
    808         # a trailing '\0'.
    809         name = "0123456789" * 10
    810         tar = tarfile.open(tmpname, self.mode)
    811         try:
    812             t = tarfile.TarInfo(name)
    813             tar.addfile(t)
    814         finally:
    815             tar.close()
    816 
    817         tar = tarfile.open(tmpname)
    818         try:
    819             self.assertTrue(tar.getnames()[0] == name,
    820                     "failed to store 100 char filename")
    821         finally:
    822             tar.close()
    823 
    824     def test_tar_size(self):
    825         # Test for bug #1013882.
    826         tar = tarfile.open(tmpname, self.mode)
    827         try:
    828             path = os.path.join(TEMPDIR, "file")
    829             with open(path, "wb") as fobj:
    830                 fobj.write("aaa")
    831             tar.add(path)
    832         finally:
    833             tar.close()
    834         self.assertTrue(os.path.getsize(tmpname) > 0,
    835                 "tarfile is empty")
    836 
    837     # The test_*_size tests test for bug #1167128.
    838     def test_file_size(self):
    839         tar = tarfile.open(tmpname, self.mode)
    840         try:
    841 
    842             path = os.path.join(TEMPDIR, "file")
    843             with open(path, "wb"):
    844                 pass
    845             tarinfo = tar.gettarinfo(path)
    846             self.assertEqual(tarinfo.size, 0)
    847 
    848             with open(path, "wb") as fobj:
    849                 fobj.write("aaa")
    850             tarinfo = tar.gettarinfo(path)
    851             self.assertEqual(tarinfo.size, 3)
    852         finally:
    853             tar.close()
    854 
    855     def test_directory_size(self):
    856         path = os.path.join(TEMPDIR, "directory")
    857         os.mkdir(path)
    858         try:
    859             tar = tarfile.open(tmpname, self.mode)
    860             try:
    861                 tarinfo = tar.gettarinfo(path)
    862                 self.assertEqual(tarinfo.size, 0)
    863             finally:
    864                 tar.close()
    865         finally:
    866             os.rmdir(path)
    867 
    868     def test_link_size(self):
    869         if hasattr(os, "link"):
    870             link = os.path.join(TEMPDIR, "link")
    871             target = os.path.join(TEMPDIR, "link_target")
    872             with open(target, "wb") as fobj:
    873                 fobj.write("aaa")
    874             os.link(target, link)
    875             try:
    876                 tar = tarfile.open(tmpname, self.mode)
    877                 try:
    878                     # Record the link target in the inodes list.
    879                     tar.gettarinfo(target)
    880                     tarinfo = tar.gettarinfo(link)
    881                     self.assertEqual(tarinfo.size, 0)
    882                 finally:
    883                     tar.close()
    884             finally:
    885                 os.remove(target)
    886                 os.remove(link)
    887 
    888     def test_symlink_size(self):
    889         if hasattr(os, "symlink"):
    890             path = os.path.join(TEMPDIR, "symlink")
    891             os.symlink("link_target", path)
    892             try:
    893                 tar = tarfile.open(tmpname, self.mode)
    894                 try:
    895                     tarinfo = tar.gettarinfo(path)
    896                     self.assertEqual(tarinfo.size, 0)
    897                 finally:
    898                     tar.close()
    899             finally:
    900                 os.remove(path)
    901 
    902     def test_add_self(self):
    903         # Test for #1257255.
    904         dstname = os.path.abspath(tmpname)
    905         tar = tarfile.open(tmpname, self.mode)
    906         try:
    907             self.assertTrue(tar.name == dstname, "archive name must be absolute")
    908             tar.add(dstname)
    909             self.assertTrue(tar.getnames() == [], "added the archive to itself")
    910 
    911             cwd = os.getcwd()
    912             os.chdir(TEMPDIR)
    913             tar.add(dstname)
    914             os.chdir(cwd)
    915             self.assertTrue(tar.getnames() == [], "added the archive to itself")
    916         finally:
    917             tar.close()
    918 
    919     def test_exclude(self):
    920         tempdir = os.path.join(TEMPDIR, "exclude")
    921         os.mkdir(tempdir)
    922         try:
    923             for name in ("foo", "bar", "baz"):
    924                 name = os.path.join(tempdir, name)
    925                 open(name, "wb").close()
    926 
    927             exclude = os.path.isfile
    928 
    929             tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
    930             try:
    931                 with test_support.check_warnings(("use the filter argument",
    932                                                 DeprecationWarning)):
    933                     tar.add(tempdir, arcname="empty_dir", exclude=exclude)
    934             finally:
    935                 tar.close()
    936 
    937             tar = tarfile.open(tmpname, "r")
    938             try:
    939                 self.assertEqual(len(tar.getmembers()), 1)
    940                 self.assertEqual(tar.getnames()[0], "empty_dir")
    941             finally:
    942                 tar.close()
    943         finally:
    944             shutil.rmtree(tempdir)
    945 
    946     def test_filter(self):
    947         tempdir = os.path.join(TEMPDIR, "filter")
    948         os.mkdir(tempdir)
    949         try:
    950             for name in ("foo", "bar", "baz"):
    951                 name = os.path.join(tempdir, name)
    952                 open(name, "wb").close()
    953 
    954             def filter(tarinfo):
    955                 if os.path.basename(tarinfo.name) == "bar":
    956                     return
    957                 tarinfo.uid = 123
    958                 tarinfo.uname = "foo"
    959                 return tarinfo
    960 
    961             tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
    962             try:
    963                 tar.add(tempdir, arcname="empty_dir", filter=filter)
    964             finally:
    965                 tar.close()
    966 
    967             tar = tarfile.open(tmpname, "r")
    968             try:
    969                 for tarinfo in tar:
    970                     self.assertEqual(tarinfo.uid, 123)
    971                     self.assertEqual(tarinfo.uname, "foo")
    972                 self.assertEqual(len(tar.getmembers()), 3)
    973             finally:
    974                 tar.close()
    975         finally:
    976             shutil.rmtree(tempdir)
    977 
    978     # Guarantee that stored pathnames are not modified. Don't
    979     # remove ./ or ../ or double slashes. Still make absolute
    980     # pathnames relative.
    981     # For details see bug #6054.
    982     def _test_pathname(self, path, cmp_path=None, dir=False):
    983         # Create a tarfile with an empty member named path
    984         # and compare the stored name with the original.
    985         foo = os.path.join(TEMPDIR, "foo")
    986         if not dir:
    987             open(foo, "w").close()
    988         else:
    989             os.mkdir(foo)
    990 
    991         tar = tarfile.open(tmpname, self.mode)
    992         try:
    993             tar.add(foo, arcname=path)
    994         finally:
    995             tar.close()
    996 
    997         tar = tarfile.open(tmpname, "r")
    998         try:
    999             t = tar.next()
   1000         finally:
   1001             tar.close()
   1002 
   1003         if not dir:
   1004             os.remove(foo)
   1005         else:
   1006             os.rmdir(foo)
   1007 
   1008         self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))
   1009 
   1010     def test_pathnames(self):
   1011         self._test_pathname("foo")
   1012         self._test_pathname(os.path.join("foo", ".", "bar"))
   1013         self._test_pathname(os.path.join("foo", "..", "bar"))
   1014         self._test_pathname(os.path.join(".", "foo"))
   1015         self._test_pathname(os.path.join(".", "foo", "."))
   1016         self._test_pathname(os.path.join(".", "foo", ".", "bar"))
   1017         self._test_pathname(os.path.join(".", "foo", "..", "bar"))
   1018         self._test_pathname(os.path.join(".", "foo", "..", "bar"))
   1019         self._test_pathname(os.path.join("..", "foo"))
   1020         self._test_pathname(os.path.join("..", "foo", ".."))
   1021         self._test_pathname(os.path.join("..", "foo", ".", "bar"))
   1022         self._test_pathname(os.path.join("..", "foo", "..", "bar"))
   1023 
   1024         self._test_pathname("foo" + os.sep + os.sep + "bar")
   1025         self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True)
   1026 
   1027     def test_abs_pathnames(self):
   1028         if sys.platform == "win32":
   1029             self._test_pathname("C:\\foo", "foo")
   1030         else:
   1031             self._test_pathname("/foo", "foo")
   1032             self._test_pathname("///foo", "foo")
   1033 
   1034     def test_cwd(self):
   1035         # Test adding the current working directory.
   1036         with support.change_cwd(TEMPDIR):
   1037             tar = tarfile.open(tmpname, self.mode)
   1038             try:
   1039                 tar.add(".")
   1040             finally:
   1041                 tar.close()
   1042 
   1043             tar = tarfile.open(tmpname, "r")
   1044             try:
   1045                 for t in tar:
   1046                     self.assertTrue(t.name == "." or t.name.startswith("./"))
   1047             finally:
   1048                 tar.close()
   1049 
   1050     @unittest.skipUnless(hasattr(os, 'symlink'), "needs os.symlink")
   1051     def test_extractall_symlinks(self):
   1052         # Test if extractall works properly when tarfile contains symlinks
   1053         tempdir = os.path.join(TEMPDIR, "testsymlinks")
   1054         temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
   1055         os.mkdir(tempdir)
   1056         try:
   1057             source_file = os.path.join(tempdir,'source')
   1058             target_file = os.path.join(tempdir,'symlink')
   1059             with open(source_file,'w') as f:
   1060                 f.write('something\n')
   1061             os.symlink(source_file, target_file)
   1062             tar = tarfile.open(temparchive,'w')
   1063             tar.add(source_file, arcname=os.path.basename(source_file))
   1064             tar.add(target_file, arcname=os.path.basename(target_file))
   1065             tar.close()
   1066             # Let's extract it to the location which contains the symlink
   1067             tar = tarfile.open(temparchive,'r')
   1068             # this should not raise OSError: [Errno 17] File exists
   1069             try:
   1070                 tar.extractall(path=tempdir)
   1071             except OSError:
   1072                 self.fail("extractall failed with symlinked files")
   1073             finally:
   1074                 tar.close()
   1075         finally:
   1076             os.unlink(temparchive)
   1077             shutil.rmtree(tempdir)
   1078 
   1079     @unittest.skipUnless(hasattr(os, 'symlink'), "needs os.symlink")
   1080     def test_extractall_broken_symlinks(self):
   1081         # Test if extractall works properly when tarfile contains broken
   1082         # symlinks
   1083         tempdir = os.path.join(TEMPDIR, "testsymlinks")
   1084         temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
   1085         os.mkdir(tempdir)
   1086         try:
   1087             source_file = os.path.join(tempdir,'source')
   1088             target_file = os.path.join(tempdir,'symlink')
   1089             with open(source_file,'w') as f:
   1090                 f.write('something\n')
   1091             os.symlink(source_file, target_file)
   1092             tar = tarfile.open(temparchive,'w')
   1093             tar.add(target_file, arcname=os.path.basename(target_file))
   1094             tar.close()
   1095             # remove the real file
   1096             os.unlink(source_file)
   1097             # Let's extract it to the location which contains the symlink
   1098             tar = tarfile.open(temparchive,'r')
   1099             # this should not raise OSError: [Errno 17] File exists
   1100             try:
   1101                 tar.extractall(path=tempdir)
   1102             except OSError:
   1103                 self.fail("extractall failed with broken symlinked files")
   1104             finally:
   1105                 tar.close()
   1106         finally:
   1107             os.unlink(temparchive)
   1108             shutil.rmtree(tempdir)
   1109 
   1110     @unittest.skipUnless(hasattr(os, 'link'), "needs os.link")
   1111     def test_extractall_hardlinks(self):
   1112         # Test if extractall works properly when tarfile contains symlinks
   1113         tempdir = os.path.join(TEMPDIR, "testsymlinks")
   1114         temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
   1115         os.mkdir(tempdir)
   1116         try:
   1117             source_file = os.path.join(tempdir,'source')
   1118             target_file = os.path.join(tempdir,'symlink')
   1119             with open(source_file,'w') as f:
   1120                 f.write('something\n')
   1121             os.link(source_file, target_file)
   1122             tar = tarfile.open(temparchive,'w')
   1123             tar.add(source_file, arcname=os.path.basename(source_file))
   1124             tar.add(target_file, arcname=os.path.basename(target_file))
   1125             tar.close()
   1126             # Let's extract it to the location which contains the symlink
   1127             tar = tarfile.open(temparchive,'r')
   1128             # this should not raise OSError: [Errno 17] File exists
   1129             try:
   1130                 tar.extractall(path=tempdir)
   1131             except OSError:
   1132                 self.fail("extractall failed with linked files")
   1133             finally:
   1134                 tar.close()
   1135         finally:
   1136             os.unlink(temparchive)
   1137             shutil.rmtree(tempdir)
   1138 
   1139     def test_open_nonwritable_fileobj(self):
   1140         for exctype in IOError, EOFError, RuntimeError:
   1141             class BadFile(StringIO.StringIO):
   1142                 first = True
   1143                 def write(self, data):
   1144                     if self.first:
   1145                         self.first = False
   1146                         raise exctype
   1147 
   1148             f = BadFile()
   1149             with self.assertRaises(exctype):
   1150                 tar = tarfile.open(tmpname, self.mode, fileobj=f,
   1151                                    format=tarfile.PAX_FORMAT,
   1152                                    pax_headers={'non': 'empty'})
   1153             self.assertFalse(f.closed)
   1154 
   1155 class StreamWriteTest(WriteTestBase):
   1156 
   1157     mode = "w|"
   1158 
   1159     def test_stream_padding(self):
   1160         # Test for bug #1543303.
   1161         tar = tarfile.open(tmpname, self.mode)
   1162         tar.close()
   1163 
   1164         if self.mode.endswith("gz"):
   1165             with gzip.GzipFile(tmpname) as fobj:
   1166                 data = fobj.read()
   1167         elif self.mode.endswith("bz2"):
   1168             dec = bz2.BZ2Decompressor()
   1169             with open(tmpname, "rb") as fobj:
   1170                 data = fobj.read()
   1171             data = dec.decompress(data)
   1172             self.assertTrue(len(dec.unused_data) == 0,
   1173                     "found trailing data")
   1174         else:
   1175             with open(tmpname, "rb") as fobj:
   1176                 data = fobj.read()
   1177 
   1178         self.assertTrue(data.count("\0") == tarfile.RECORDSIZE,
   1179                          "incorrect zero padding")
   1180 
   1181     @unittest.skipIf(sys.platform == 'win32', 'not appropriate for Windows')
   1182     @unittest.skipUnless(hasattr(os, 'umask'), 'requires os.umask')
   1183     def test_file_mode(self):
   1184         # Test for issue #8464: Create files with correct
   1185         # permissions.
   1186         if os.path.exists(tmpname):
   1187             os.remove(tmpname)
   1188 
   1189         original_umask = os.umask(0022)
   1190         try:
   1191             tar = tarfile.open(tmpname, self.mode)
   1192             tar.close()
   1193             mode = os.stat(tmpname).st_mode & 0777
   1194             self.assertEqual(mode, 0644, "wrong file permissions")
   1195         finally:
   1196             os.umask(original_umask)
   1197 
   1198     def test_issue13639(self):
   1199         try:
   1200             with tarfile.open(unicode(tmpname, sys.getfilesystemencoding()), self.mode):
   1201                 pass
   1202         except UnicodeDecodeError:
   1203             self.fail("_Stream failed to write unicode filename")
   1204 
   1205 
   1206 class GNUWriteTest(unittest.TestCase):
   1207     # This testcase checks for correct creation of GNU Longname
   1208     # and Longlink extended headers (cp. bug #812325).
   1209 
   1210     def _length(self, s):
   1211         blocks, remainder = divmod(len(s) + 1, 512)
   1212         if remainder:
   1213             blocks += 1
   1214         return blocks * 512
   1215 
   1216     def _calc_size(self, name, link=None):
   1217         # Initial tar header
   1218         count = 512
   1219 
   1220         if len(name) > tarfile.LENGTH_NAME:
   1221             # GNU longname extended header + longname
   1222             count += 512
   1223             count += self._length(name)
   1224         if link is not None and len(link) > tarfile.LENGTH_LINK:
   1225             # GNU longlink extended header + longlink
   1226             count += 512
   1227             count += self._length(link)
   1228         return count
   1229 
   1230     def _test(self, name, link=None):
   1231         tarinfo = tarfile.TarInfo(name)
   1232         if link:
   1233             tarinfo.linkname = link
   1234             tarinfo.type = tarfile.LNKTYPE
   1235 
   1236         tar = tarfile.open(tmpname, "w")
   1237         try:
   1238             tar.format = tarfile.GNU_FORMAT
   1239             tar.addfile(tarinfo)
   1240 
   1241             v1 = self._calc_size(name, link)
   1242             v2 = tar.offset
   1243             self.assertTrue(v1 == v2, "GNU longname/longlink creation failed")
   1244         finally:
   1245             tar.close()
   1246 
   1247         tar = tarfile.open(tmpname)
   1248         try:
   1249             member = tar.next()
   1250             self.assertIsNotNone(member,
   1251                     "unable to read longname member")
   1252             self.assertEqual(tarinfo.name, member.name,
   1253                     "unable to read longname member")
   1254             self.assertEqual(tarinfo.linkname, member.linkname,
   1255                     "unable to read longname member")
   1256         finally:
   1257             tar.close()
   1258 
   1259     def test_longname_1023(self):
   1260         self._test(("longnam/" * 127) + "longnam")
   1261 
   1262     def test_longname_1024(self):
   1263         self._test(("longnam/" * 127) + "longname")
   1264 
   1265     def test_longname_1025(self):
   1266         self._test(("longnam/" * 127) + "longname_")
   1267 
   1268     def test_longlink_1023(self):
   1269         self._test("name", ("longlnk/" * 127) + "longlnk")
   1270 
   1271     def test_longlink_1024(self):
   1272         self._test("name", ("longlnk/" * 127) + "longlink")
   1273 
   1274     def test_longlink_1025(self):
   1275         self._test("name", ("longlnk/" * 127) + "longlink_")
   1276 
   1277     def test_longnamelink_1023(self):
   1278         self._test(("longnam/" * 127) + "longnam",
   1279                    ("longlnk/" * 127) + "longlnk")
   1280 
   1281     def test_longnamelink_1024(self):
   1282         self._test(("longnam/" * 127) + "longname",
   1283                    ("longlnk/" * 127) + "longlink")
   1284 
   1285     def test_longnamelink_1025(self):
   1286         self._test(("longnam/" * 127) + "longname_",
   1287                    ("longlnk/" * 127) + "longlink_")
   1288 
   1289 
   1290 class HardlinkTest(unittest.TestCase):
   1291     # Test the creation of LNKTYPE (hardlink) members in an archive.
   1292 
   1293     def setUp(self):
   1294         self.foo = os.path.join(TEMPDIR, "foo")
   1295         self.bar = os.path.join(TEMPDIR, "bar")
   1296 
   1297         with open(self.foo, "wb") as fobj:
   1298             fobj.write("foo")
   1299 
   1300         os.link(self.foo, self.bar)
   1301 
   1302         self.tar = tarfile.open(tmpname, "w")
   1303         self.tar.add(self.foo)
   1304 
   1305     def tearDown(self):
   1306         self.tar.close()
   1307         support.unlink(self.foo)
   1308         support.unlink(self.bar)
   1309 
   1310     def test_add_twice(self):
   1311         # The same name will be added as a REGTYPE every
   1312         # time regardless of st_nlink.
   1313         tarinfo = self.tar.gettarinfo(self.foo)
   1314         self.assertTrue(tarinfo.type == tarfile.REGTYPE,
   1315                 "add file as regular failed")
   1316 
   1317     def test_add_hardlink(self):
   1318         tarinfo = self.tar.gettarinfo(self.bar)
   1319         self.assertTrue(tarinfo.type == tarfile.LNKTYPE,
   1320                 "add file as hardlink failed")
   1321 
   1322     def test_dereference_hardlink(self):
   1323         self.tar.dereference = True
   1324         tarinfo = self.tar.gettarinfo(self.bar)
   1325         self.assertTrue(tarinfo.type == tarfile.REGTYPE,
   1326                 "dereferencing hardlink failed")
   1327 
   1328 
   1329 class PaxWriteTest(GNUWriteTest):
   1330 
   1331     def _test(self, name, link=None):
   1332         # See GNUWriteTest.
   1333         tarinfo = tarfile.TarInfo(name)
   1334         if link:
   1335             tarinfo.linkname = link
   1336             tarinfo.type = tarfile.LNKTYPE
   1337 
   1338         tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
   1339         try:
   1340             tar.addfile(tarinfo)
   1341         finally:
   1342             tar.close()
   1343 
   1344         tar = tarfile.open(tmpname)
   1345         try:
   1346             if link:
   1347                 l = tar.getmembers()[0].linkname
   1348                 self.assertTrue(link == l, "PAX longlink creation failed")
   1349             else:
   1350                 n = tar.getmembers()[0].name
   1351                 self.assertTrue(name == n, "PAX longname creation failed")
   1352         finally:
   1353             tar.close()
   1354 
   1355     def test_pax_global_header(self):
   1356         pax_headers = {
   1357                 u"foo": u"bar",
   1358                 u"uid": u"0",
   1359                 u"mtime": u"1.23",
   1360                 u"test": u"\xe4\xf6\xfc",
   1361                 u"\xe4\xf6\xfc": u"test"}
   1362 
   1363         tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
   1364                 pax_headers=pax_headers)
   1365         try:
   1366             tar.addfile(tarfile.TarInfo("test"))
   1367         finally:
   1368             tar.close()
   1369 
   1370         # Test if the global header was written correctly.
   1371         tar = tarfile.open(tmpname, encoding="iso8859-1")
   1372         try:
   1373             self.assertEqual(tar.pax_headers, pax_headers)
   1374             self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers)
   1375 
   1376             # Test if all the fields are unicode.
   1377             for key, val in tar.pax_headers.iteritems():
   1378                 self.assertTrue(type(key) is unicode)
   1379                 self.assertTrue(type(val) is unicode)
   1380                 if key in tarfile.PAX_NUMBER_FIELDS:
   1381                     try:
   1382                         tarfile.PAX_NUMBER_FIELDS[key](val)
   1383                     except (TypeError, ValueError):
   1384                         self.fail("unable to convert pax header field")
   1385         finally:
   1386             tar.close()
   1387 
   1388     def test_pax_extended_header(self):
   1389         # The fields from the pax header have priority over the
   1390         # TarInfo.
   1391         pax_headers = {u"path": u"foo", u"uid": u"123"}
   1392 
   1393         tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1")
   1394         try:
   1395             t = tarfile.TarInfo()
   1396             t.name = u"\xe4\xf6\xfc"     # non-ASCII
   1397             t.uid = 8**8        # too large
   1398             t.pax_headers = pax_headers
   1399             tar.addfile(t)
   1400         finally:
   1401             tar.close()
   1402 
   1403         tar = tarfile.open(tmpname, encoding="iso8859-1")
   1404         try:
   1405             t = tar.getmembers()[0]
   1406             self.assertEqual(t.pax_headers, pax_headers)
   1407             self.assertEqual(t.name, "foo")
   1408             self.assertEqual(t.uid, 123)
   1409         finally:
   1410             tar.close()
   1411 
   1412 
   1413 class UstarUnicodeTest(unittest.TestCase):
   1414     # All *UnicodeTests FIXME
   1415 
   1416     format = tarfile.USTAR_FORMAT
   1417 
   1418     def test_iso8859_1_filename(self):
   1419         self._test_unicode_filename("iso8859-1")
   1420 
   1421     def test_utf7_filename(self):
   1422         self._test_unicode_filename("utf7")
   1423 
   1424     def test_utf8_filename(self):
   1425         self._test_unicode_filename("utf8")
   1426 
   1427     def _test_unicode_filename(self, encoding):
   1428         tar = tarfile.open(tmpname, "w", format=self.format, encoding=encoding, errors="strict")
   1429         try:
   1430             name = u"\xe4\xf6\xfc"
   1431             tar.addfile(tarfile.TarInfo(name))
   1432         finally:
   1433             tar.close()
   1434 
   1435         tar = tarfile.open(tmpname, encoding=encoding)
   1436         try:
   1437             self.assertTrue(type(tar.getnames()[0]) is not unicode)
   1438             self.assertEqual(tar.getmembers()[0].name, name.encode(encoding))
   1439         finally:
   1440             tar.close()
   1441 
   1442     def test_unicode_filename_error(self):
   1443         tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict")
   1444         try:
   1445             tarinfo = tarfile.TarInfo()
   1446 
   1447             tarinfo.name = "\xe4\xf6\xfc"
   1448             if self.format == tarfile.PAX_FORMAT:
   1449                 self.assertRaises(UnicodeError, tar.addfile, tarinfo)
   1450             else:
   1451                 tar.addfile(tarinfo)
   1452 
   1453             tarinfo.name = u"\xe4\xf6\xfc"
   1454             self.assertRaises(UnicodeError, tar.addfile, tarinfo)
   1455 
   1456             tarinfo.name = "foo"
   1457             tarinfo.uname = u"\xe4\xf6\xfc"
   1458             self.assertRaises(UnicodeError, tar.addfile, tarinfo)
   1459         finally:
   1460             tar.close()
   1461 
   1462     def test_unicode_argument(self):
   1463         tar = tarfile.open(tarname, "r", encoding="iso8859-1", errors="strict")
   1464         try:
   1465             for t in tar:
   1466                 self.assertTrue(type(t.name) is str)
   1467                 self.assertTrue(type(t.linkname) is str)
   1468                 self.assertTrue(type(t.uname) is str)
   1469                 self.assertTrue(type(t.gname) is str)
   1470         finally:
   1471             tar.close()
   1472 
   1473     def test_uname_unicode(self):
   1474         for name in (u"\xe4\xf6\xfc", "\xe4\xf6\xfc"):
   1475             t = tarfile.TarInfo("foo")
   1476             t.uname = name
   1477             t.gname = name
   1478 
   1479             fobj = StringIO.StringIO()
   1480             tar = tarfile.open("foo.tar", mode="w", fileobj=fobj, format=self.format, encoding="iso8859-1")
   1481             try:
   1482                 tar.addfile(t)
   1483             finally:
   1484                 tar.close()
   1485             fobj.seek(0)
   1486 
   1487             tar = tarfile.open("foo.tar", fileobj=fobj, encoding="iso8859-1")
   1488             t = tar.getmember("foo")
   1489             self.assertEqual(t.uname, "\xe4\xf6\xfc")
   1490             self.assertEqual(t.gname, "\xe4\xf6\xfc")
   1491 
   1492 
   1493 class GNUUnicodeTest(UstarUnicodeTest):
   1494 
   1495     format = tarfile.GNU_FORMAT
   1496 
   1497 
   1498 class PaxUnicodeTest(UstarUnicodeTest):
   1499 
   1500     format = tarfile.PAX_FORMAT
   1501 
   1502     def _create_unicode_name(self, name):
   1503         tar = tarfile.open(tmpname, "w", format=self.format)
   1504         t = tarfile.TarInfo()
   1505         t.pax_headers["path"] = name
   1506         tar.addfile(t)
   1507         tar.close()
   1508 
   1509     def test_error_handlers(self):
   1510         # Test if the unicode error handlers work correctly for characters
   1511         # that cannot be expressed in a given encoding.
   1512         self._create_unicode_name(u"\xe4\xf6\xfc")
   1513 
   1514         for handler, name in (("utf-8", u"\xe4\xf6\xfc".encode("utf8")),
   1515                     ("replace", "???"), ("ignore", "")):
   1516             tar = tarfile.open(tmpname, format=self.format, encoding="ascii",
   1517                     errors=handler)
   1518             self.assertEqual(tar.getnames()[0], name)
   1519 
   1520         self.assertRaises(UnicodeError, tarfile.open, tmpname,
   1521                 encoding="ascii", errors="strict")
   1522 
   1523     def test_error_handler_utf8(self):
   1524         # Create a pathname that has one component representable using
   1525         # iso8859-1 and the other only in iso8859-15.
   1526         self._create_unicode_name(u"\xe4\xf6\xfc/\u20ac")
   1527 
   1528         tar = tarfile.open(tmpname, format=self.format, encoding="iso8859-1",
   1529                 errors="utf-8")
   1530         self.assertEqual(tar.getnames()[0], "\xe4\xf6\xfc/" + u"\u20ac".encode("utf8"))
   1531 
   1532 
   1533 class AppendTest(unittest.TestCase):
   1534     # Test append mode (cp. patch #1652681).
   1535 
   1536     def setUp(self):
   1537         self.tarname = tmpname
   1538         if os.path.exists(self.tarname):
   1539             os.remove(self.tarname)
   1540 
   1541     def _add_testfile(self, fileobj=None):
   1542         with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar:
   1543             tar.addfile(tarfile.TarInfo("bar"))
   1544 
   1545     def _create_testtar(self, mode="w:"):
   1546         with tarfile.open(tarname, encoding="iso8859-1") as src:
   1547             t = src.getmember("ustar/regtype")
   1548             t.name = "foo"
   1549             f = src.extractfile(t)
   1550             with tarfile.open(self.tarname, mode) as tar:
   1551                 tar.addfile(t, f)
   1552 
   1553     def _test(self, names=["bar"], fileobj=None):
   1554         with tarfile.open(self.tarname, fileobj=fileobj) as tar:
   1555             self.assertEqual(tar.getnames(), names)
   1556 
   1557     def test_non_existing(self):
   1558         self._add_testfile()
   1559         self._test()
   1560 
   1561     def test_empty(self):
   1562         tarfile.open(self.tarname, "w:").close()
   1563         self._add_testfile()
   1564         self._test()
   1565 
   1566     def test_empty_fileobj(self):
   1567         fobj = StringIO.StringIO("\0" * 1024)
   1568         self._add_testfile(fobj)
   1569         fobj.seek(0)
   1570         self._test(fileobj=fobj)
   1571 
   1572     def test_fileobj(self):
   1573         self._create_testtar()
   1574         with open(self.tarname) as fobj:
   1575             data = fobj.read()
   1576         fobj = StringIO.StringIO(data)
   1577         self._add_testfile(fobj)
   1578         fobj.seek(0)
   1579         self._test(names=["foo", "bar"], fileobj=fobj)
   1580 
   1581     def test_existing(self):
   1582         self._create_testtar()
   1583         self._add_testfile()
   1584         self._test(names=["foo", "bar"])
   1585 
   1586     @unittest.skipUnless(gzip, 'requires gzip')
   1587     def test_append_gz(self):
   1588         self._create_testtar("w:gz")
   1589         self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a")
   1590 
   1591     @unittest.skipUnless(bz2, 'requires bz2')
   1592     def test_append_bz2(self):
   1593         self._create_testtar("w:bz2")
   1594         self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a")
   1595 
   1596     # Append mode is supposed to fail if the tarfile to append to
   1597     # does not end with a zero block.
   1598     def _test_error(self, data):
   1599         with open(self.tarname, "wb") as fobj:
   1600             fobj.write(data)
   1601         self.assertRaises(tarfile.ReadError, self._add_testfile)
   1602 
   1603     def test_null(self):
   1604         self._test_error("")
   1605 
   1606     def test_incomplete(self):
   1607         self._test_error("\0" * 13)
   1608 
   1609     def test_premature_eof(self):
   1610         data = tarfile.TarInfo("foo").tobuf()
   1611         self._test_error(data)
   1612 
   1613     def test_trailing_garbage(self):
   1614         data = tarfile.TarInfo("foo").tobuf()
   1615         self._test_error(data + "\0" * 13)
   1616 
   1617     def test_invalid(self):
   1618         self._test_error("a" * 512)
   1619 
   1620 
   1621 class LimitsTest(unittest.TestCase):
   1622 
   1623     def test_ustar_limits(self):
   1624         # 100 char name
   1625         tarinfo = tarfile.TarInfo("0123456789" * 10)
   1626         tarinfo.tobuf(tarfile.USTAR_FORMAT)
   1627 
   1628         # 101 char name that cannot be stored
   1629         tarinfo = tarfile.TarInfo("0123456789" * 10 + "0")
   1630         self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
   1631 
   1632         # 256 char name with a slash at pos 156
   1633         tarinfo = tarfile.TarInfo("123/" * 62 + "longname")
   1634         tarinfo.tobuf(tarfile.USTAR_FORMAT)
   1635 
   1636         # 256 char name that cannot be stored
   1637         tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname")
   1638         self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
   1639 
   1640         # 512 char name
   1641         tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
   1642         self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
   1643 
   1644         # 512 char linkname
   1645         tarinfo = tarfile.TarInfo("longlink")
   1646         tarinfo.linkname = "123/" * 126 + "longname"
   1647         self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
   1648 
   1649         # uid > 8 digits
   1650         tarinfo = tarfile.TarInfo("name")
   1651         tarinfo.uid = 010000000
   1652         self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
   1653 
   1654     def test_gnu_limits(self):
   1655         tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
   1656         tarinfo.tobuf(tarfile.GNU_FORMAT)
   1657 
   1658         tarinfo = tarfile.TarInfo("longlink")
   1659         tarinfo.linkname = "123/" * 126 + "longname"
   1660         tarinfo.tobuf(tarfile.GNU_FORMAT)
   1661 
   1662         # uid >= 256 ** 7
   1663         tarinfo = tarfile.TarInfo("name")
   1664         tarinfo.uid = 04000000000000000000L
   1665         self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT)
   1666 
   1667     def test_pax_limits(self):
   1668         tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
   1669         tarinfo.tobuf(tarfile.PAX_FORMAT)
   1670 
   1671         tarinfo = tarfile.TarInfo("longlink")
   1672         tarinfo.linkname = "123/" * 126 + "longname"
   1673         tarinfo.tobuf(tarfile.PAX_FORMAT)
   1674 
   1675         tarinfo = tarfile.TarInfo("name")
   1676         tarinfo.uid = 04000000000000000000L
   1677         tarinfo.tobuf(tarfile.PAX_FORMAT)
   1678 
   1679 
   1680 class MiscTest(unittest.TestCase):
   1681 
   1682     def test_read_number_fields(self):
   1683         # Issue 24514: Test if empty number fields are converted to zero.
   1684         self.assertEqual(tarfile.nti("\0"), 0)
   1685         self.assertEqual(tarfile.nti("       \0"), 0)
   1686 
   1687 
   1688 class ContextManagerTest(unittest.TestCase):
   1689 
   1690     def test_basic(self):
   1691         with tarfile.open(tarname) as tar:
   1692             self.assertFalse(tar.closed, "closed inside runtime context")
   1693         self.assertTrue(tar.closed, "context manager failed")
   1694 
   1695     def test_closed(self):
   1696         # The __enter__() method is supposed to raise IOError
   1697         # if the TarFile object is already closed.
   1698         tar = tarfile.open(tarname)
   1699         tar.close()
   1700         with self.assertRaises(IOError):
   1701             with tar:
   1702                 pass
   1703 
   1704     def test_exception(self):
   1705         # Test if the IOError exception is passed through properly.
   1706         with self.assertRaises(Exception) as exc:
   1707             with tarfile.open(tarname) as tar:
   1708                 raise IOError
   1709         self.assertIsInstance(exc.exception, IOError,
   1710                               "wrong exception raised in context manager")
   1711         self.assertTrue(tar.closed, "context manager failed")
   1712 
   1713     def test_no_eof(self):
   1714         # __exit__() must not write end-of-archive blocks if an
   1715         # exception was raised.
   1716         try:
   1717             with tarfile.open(tmpname, "w") as tar:
   1718                 raise Exception
   1719         except:
   1720             pass
   1721         self.assertEqual(os.path.getsize(tmpname), 0,
   1722                 "context manager wrote an end-of-archive block")
   1723         self.assertTrue(tar.closed, "context manager failed")
   1724 
   1725     def test_eof(self):
   1726         # __exit__() must write end-of-archive blocks, i.e. call
   1727         # TarFile.close() if there was no error.
   1728         with tarfile.open(tmpname, "w"):
   1729             pass
   1730         self.assertNotEqual(os.path.getsize(tmpname), 0,
   1731                 "context manager wrote no end-of-archive block")
   1732 
   1733     def test_fileobj(self):
   1734         # Test that __exit__() did not close the external file
   1735         # object.
   1736         with open(tmpname, "wb") as fobj:
   1737             try:
   1738                 with tarfile.open(fileobj=fobj, mode="w") as tar:
   1739                     raise Exception
   1740             except:
   1741                 pass
   1742             self.assertFalse(fobj.closed, "external file object was closed")
   1743             self.assertTrue(tar.closed, "context manager failed")
   1744 
   1745 
   1746 class LinkEmulationTest(ReadTest):
   1747 
   1748     # Test for issue #8741 regression. On platforms that do not support
   1749     # symbolic or hard links tarfile tries to extract these types of members as
   1750     # the regular files they point to.
   1751     def _test_link_extraction(self, name):
   1752         self.tar.extract(name, TEMPDIR)
   1753         data = open(os.path.join(TEMPDIR, name), "rb").read()
   1754         self.assertEqual(md5sum(data), md5_regtype)
   1755 
   1756     def test_hardlink_extraction1(self):
   1757         self._test_link_extraction("ustar/lnktype")
   1758 
   1759     def test_hardlink_extraction2(self):
   1760         self._test_link_extraction("./ustar/linktest2/lnktype")
   1761 
   1762     def test_symlink_extraction1(self):
   1763         self._test_link_extraction("ustar/symtype")
   1764 
   1765     def test_symlink_extraction2(self):
   1766         self._test_link_extraction("./ustar/linktest2/symtype")
   1767 
   1768 
   1769 class GzipMiscReadTest(MiscReadTest):
   1770     tarname = gzipname
   1771     mode = "r:gz"
   1772     taropen = tarfile.TarFile.gzopen
   1773 class GzipUstarReadTest(UstarReadTest):
   1774     tarname = gzipname
   1775     mode = "r:gz"
   1776 class GzipStreamReadTest(StreamReadTest):
   1777     tarname = gzipname
   1778     mode = "r|gz"
   1779 class GzipWriteTest(WriteTest):
   1780     mode = "w:gz"
   1781 class GzipStreamWriteTest(StreamWriteTest):
   1782     mode = "w|gz"
   1783 
   1784 
   1785 class Bz2MiscReadTest(MiscReadTest):
   1786     tarname = bz2name
   1787     mode = "r:bz2"
   1788     taropen = tarfile.TarFile.bz2open
   1789 class Bz2UstarReadTest(UstarReadTest):
   1790     tarname = bz2name
   1791     mode = "r:bz2"
   1792 class Bz2StreamReadTest(StreamReadTest):
   1793     tarname = bz2name
   1794     mode = "r|bz2"
   1795 class Bz2WriteTest(WriteTest):
   1796     mode = "w:bz2"
   1797 class Bz2StreamWriteTest(StreamWriteTest):
   1798     mode = "w|bz2"
   1799 
   1800 class Bz2PartialReadTest(unittest.TestCase):
   1801     # Issue5068: The _BZ2Proxy.read() method loops forever
   1802     # on an empty or partial bzipped file.
   1803 
   1804     def _test_partial_input(self, mode):
   1805         class MyStringIO(StringIO.StringIO):
   1806             hit_eof = False
   1807             def read(self, n):
   1808                 if self.hit_eof:
   1809                     raise AssertionError("infinite loop detected in tarfile.open()")
   1810                 self.hit_eof = self.pos == self.len
   1811                 return StringIO.StringIO.read(self, n)
   1812             def seek(self, *args):
   1813                 self.hit_eof = False
   1814                 return StringIO.StringIO.seek(self, *args)
   1815 
   1816         data = bz2.compress(tarfile.TarInfo("foo").tobuf())
   1817         for x in range(len(data) + 1):
   1818             try:
   1819                 tarfile.open(fileobj=MyStringIO(data[:x]), mode=mode)
   1820             except tarfile.ReadError:
   1821                 pass # we have no interest in ReadErrors
   1822 
   1823     def test_partial_input(self):
   1824         self._test_partial_input("r")
   1825 
   1826     def test_partial_input_bz2(self):
   1827         self._test_partial_input("r:bz2")
   1828 
   1829 
   1830 def test_main():
   1831     support.unlink(TEMPDIR)
   1832     os.makedirs(TEMPDIR)
   1833 
   1834     tests = [
   1835         UstarReadTest,
   1836         MiscReadTest,
   1837         StreamReadTest,
   1838         DetectReadTest,
   1839         MemberReadTest,
   1840         GNUReadTest,
   1841         PaxReadTest,
   1842         ListTest,
   1843         WriteTest,
   1844         StreamWriteTest,
   1845         GNUWriteTest,
   1846         PaxWriteTest,
   1847         UstarUnicodeTest,
   1848         GNUUnicodeTest,
   1849         PaxUnicodeTest,
   1850         AppendTest,
   1851         LimitsTest,
   1852         MiscTest,
   1853         ContextManagerTest,
   1854     ]
   1855 
   1856     if hasattr(os, "link"):
   1857         tests.append(HardlinkTest)
   1858     else:
   1859         tests.append(LinkEmulationTest)
   1860 
   1861     with open(tarname, "rb") as fobj:
   1862         data = fobj.read()
   1863 
   1864     if gzip:
   1865         # Create testtar.tar.gz and add gzip-specific tests.
   1866         support.unlink(gzipname)
   1867         with gzip.open(gzipname, "wb") as tar:
   1868             tar.write(data)
   1869 
   1870         tests += [
   1871             GzipMiscReadTest,
   1872             GzipUstarReadTest,
   1873             GzipStreamReadTest,
   1874             GzipListTest,
   1875             GzipWriteTest,
   1876             GzipStreamWriteTest,
   1877         ]
   1878 
   1879     if bz2:
   1880         # Create testtar.tar.bz2 and add bz2-specific tests.
   1881         support.unlink(bz2name)
   1882         tar = bz2.BZ2File(bz2name, "wb")
   1883         try:
   1884             tar.write(data)
   1885         finally:
   1886             tar.close()
   1887 
   1888         tests += [
   1889             Bz2MiscReadTest,
   1890             Bz2UstarReadTest,
   1891             Bz2StreamReadTest,
   1892             Bz2ListTest,
   1893             Bz2WriteTest,
   1894             Bz2StreamWriteTest,
   1895             Bz2PartialReadTest,
   1896         ]
   1897 
   1898     try:
   1899         test_support.run_unittest(*tests)
   1900     finally:
   1901         if os.path.exists(TEMPDIR):
   1902             shutil.rmtree(TEMPDIR)
   1903 
   1904 if __name__ == "__main__":
   1905     test_main()
   1906