Home | History | Annotate | Download | only in test
      1 import sys
      2 import os
      3 import io
      4 from hashlib import md5
      5 from contextlib import contextmanager
      6 from random import Random
      7 
      8 import unittest
      9 import unittest.mock
     10 import tarfile
     11 
     12 from test import support
     13 from test.support import script_helper
     14 
     15 # Check for our compression modules.
     16 try:
     17     import gzip
     18 except ImportError:
     19     gzip = None
     20 try:
     21     import bz2
     22 except ImportError:
     23     bz2 = None
     24 try:
     25     import lzma
     26 except ImportError:
     27     lzma = None
     28 
     29 def md5sum(data):
     30     return md5(data).hexdigest()
     31 
# Scratch directory that the tests extract into and create files in.
TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir"
# Second scratch location derived from TEMPDIR.
# NOTE(review): not referenced in the part of the file visible here --
# presumably the target of extraction tests; confirm against its users.
tarextdir = TEMPDIR + '-extract-test'
# Uncompressed reference archive, located through support.findfile().
tarname = support.findfile("testtar.tar")
# Archive paths used by the GzipTest, Bz2Test and LzmaTest mixins.
# NOTE(review): the code that creates these files is not visible here.
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
xzname = os.path.join(TEMPDIR, "testtar.tar.xz")
# Throw-away archive that individual tests write and then read back.
tmpname = os.path.join(TEMPDIR, "tmp.tar")
# NOTE(review): not referenced in the visible part of the file; presumably
# an archive path without a file extension -- confirm against its users.
dotlessname = os.path.join(TEMPDIR, "testtar")

# Expected MD5 hex digest of the data of the "ustar/regtype" member.
md5_regtype = "65f477c818ad9e15f7feab0c6d37742f"
# NOTE(review): presumably the digest of the sparse test members; its users
# are outside the visible part of the file.
md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6"
     43 
     44 
     45 class TarTest:
     46     tarname = tarname
     47     suffix = ''
     48     open = io.FileIO
     49     taropen = tarfile.TarFile.taropen
     50 
     51     @property
     52     def mode(self):
     53         return self.prefix + self.suffix
     54 
     55 @support.requires_gzip
     56 class GzipTest:
     57     tarname = gzipname
     58     suffix = 'gz'
     59     open = gzip.GzipFile if gzip else None
     60     taropen = tarfile.TarFile.gzopen
     61 
     62 @support.requires_bz2
     63 class Bz2Test:
     64     tarname = bz2name
     65     suffix = 'bz2'
     66     open = bz2.BZ2File if bz2 else None
     67     taropen = tarfile.TarFile.bz2open
     68 
     69 @support.requires_lzma
     70 class LzmaTest:
     71     tarname = xzname
     72     suffix = 'xz'
     73     open = lzma.LZMAFile if lzma else None
     74     taropen = tarfile.TarFile.xzopen
     75 
     76 
     77 class ReadTest(TarTest):
     78 
     79     prefix = "r:"
     80 
     81     def setUp(self):
     82         self.tar = tarfile.open(self.tarname, mode=self.mode,
     83                                 encoding="iso8859-1")
     84 
     85     def tearDown(self):
     86         self.tar.close()
     87 
     88 
     89 class UstarReadTest(ReadTest, unittest.TestCase):
     90 
     91     def test_fileobj_regular_file(self):
     92         tarinfo = self.tar.getmember("ustar/regtype")
     93         with self.tar.extractfile(tarinfo) as fobj:
     94             data = fobj.read()
     95             self.assertEqual(len(data), tarinfo.size,
     96                     "regular file extraction failed")
     97             self.assertEqual(md5sum(data), md5_regtype,
     98                     "regular file extraction failed")
     99 
    100     def test_fileobj_readlines(self):
    101         self.tar.extract("ustar/regtype", TEMPDIR)
    102         tarinfo = self.tar.getmember("ustar/regtype")
    103         with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
    104             lines1 = fobj1.readlines()
    105 
    106         with self.tar.extractfile(tarinfo) as fobj:
    107             fobj2 = io.TextIOWrapper(fobj)
    108             lines2 = fobj2.readlines()
    109             self.assertEqual(lines1, lines2,
    110                     "fileobj.readlines() failed")
    111             self.assertEqual(len(lines2), 114,
    112                     "fileobj.readlines() failed")
    113             self.assertEqual(lines2[83],
    114                     "I will gladly admit that Python is not the fastest "
    115                     "running scripting language.\n",
    116                     "fileobj.readlines() failed")
    117 
    118     def test_fileobj_iter(self):
    119         self.tar.extract("ustar/regtype", TEMPDIR)
    120         tarinfo = self.tar.getmember("ustar/regtype")
    121         with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
    122             lines1 = fobj1.readlines()
    123         with self.tar.extractfile(tarinfo) as fobj2:
    124             lines2 = list(io.TextIOWrapper(fobj2))
    125             self.assertEqual(lines1, lines2,
    126                     "fileobj.__iter__() failed")
    127 
    128     def test_fileobj_seek(self):
    129         self.tar.extract("ustar/regtype", TEMPDIR)
    130         with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj:
    131             data = fobj.read()
    132 
    133         tarinfo = self.tar.getmember("ustar/regtype")
    134         fobj = self.tar.extractfile(tarinfo)
    135 
    136         text = fobj.read()
    137         fobj.seek(0)
    138         self.assertEqual(0, fobj.tell(),
    139                      "seek() to file's start failed")
    140         fobj.seek(2048, 0)
    141         self.assertEqual(2048, fobj.tell(),
    142                      "seek() to absolute position failed")
    143         fobj.seek(-1024, 1)
    144         self.assertEqual(1024, fobj.tell(),
    145                      "seek() to negative relative position failed")
    146         fobj.seek(1024, 1)
    147         self.assertEqual(2048, fobj.tell(),
    148                      "seek() to positive relative position failed")
    149         s = fobj.read(10)
    150         self.assertEqual(s, data[2048:2058],
    151                      "read() after seek failed")
    152         fobj.seek(0, 2)
    153         self.assertEqual(tarinfo.size, fobj.tell(),
    154                      "seek() to file's end failed")
    155         self.assertEqual(fobj.read(), b"",
    156                      "read() at file's end did not return empty string")
    157         fobj.seek(-tarinfo.size, 2)
    158         self.assertEqual(0, fobj.tell(),
    159                      "relative seek() to file's end failed")
    160         fobj.seek(512)
    161         s1 = fobj.readlines()
    162         fobj.seek(512)
    163         s2 = fobj.readlines()
    164         self.assertEqual(s1, s2,
    165                      "readlines() after seek failed")
    166         fobj.seek(0)
    167         self.assertEqual(len(fobj.readline()), fobj.tell(),
    168                      "tell() after readline() failed")
    169         fobj.seek(512)
    170         self.assertEqual(len(fobj.readline()) + 512, fobj.tell(),
    171                      "tell() after seek() and readline() failed")
    172         fobj.seek(0)
    173         line = fobj.readline()
    174         self.assertEqual(fobj.read(), data[len(line):],
    175                      "read() after readline() failed")
    176         fobj.close()
    177 
    178     def test_fileobj_text(self):
    179         with self.tar.extractfile("ustar/regtype") as fobj:
    180             fobj = io.TextIOWrapper(fobj)
    181             data = fobj.read().encode("iso8859-1")
    182             self.assertEqual(md5sum(data), md5_regtype)
    183             try:
    184                 fobj.seek(100)
    185             except AttributeError:
    186                 # Issue #13815: seek() complained about a missing
    187                 # flush() method.
    188                 self.fail("seeking failed in text mode")
    189 
    190     # Test if symbolic and hard links are resolved by extractfile().  The
    191     # test link members each point to a regular member whose data is
    192     # supposed to be exported.
    193     def _test_fileobj_link(self, lnktype, regtype):
    194         with self.tar.extractfile(lnktype) as a, \
    195              self.tar.extractfile(regtype) as b:
    196             self.assertEqual(a.name, b.name)
    197 
    198     def test_fileobj_link1(self):
    199         self._test_fileobj_link("ustar/lnktype", "ustar/regtype")
    200 
    201     def test_fileobj_link2(self):
    202         self._test_fileobj_link("./ustar/linktest2/lnktype",
    203                                 "ustar/linktest1/regtype")
    204 
    205     def test_fileobj_symlink1(self):
    206         self._test_fileobj_link("ustar/symtype", "ustar/regtype")
    207 
    208     def test_fileobj_symlink2(self):
    209         self._test_fileobj_link("./ustar/linktest2/symtype",
    210                                 "ustar/linktest1/regtype")
    211 
    212     def test_issue14160(self):
    213         self._test_fileobj_link("symtype2", "ustar/regtype")
    214 
    215 class GzipUstarReadTest(GzipTest, UstarReadTest):
    216     pass
    217 
    218 class Bz2UstarReadTest(Bz2Test, UstarReadTest):
    219     pass
    220 
    221 class LzmaUstarReadTest(LzmaTest, UstarReadTest):
    222     pass
    223 
    224 
    225 class ListTest(ReadTest, unittest.TestCase):
    226 
    227     # Override setUp to use default encoding (UTF-8)
    228     def setUp(self):
    229         self.tar = tarfile.open(self.tarname, mode=self.mode)
    230 
    231     def test_list(self):
    232         tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
    233         with support.swap_attr(sys, 'stdout', tio):
    234             self.tar.list(verbose=False)
    235         out = tio.detach().getvalue()
    236         self.assertIn(b'ustar/conttype', out)
    237         self.assertIn(b'ustar/regtype', out)
    238         self.assertIn(b'ustar/lnktype', out)
    239         self.assertIn(b'ustar' + (b'/12345' * 40) + b'67/longname', out)
    240         self.assertIn(b'./ustar/linktest2/symtype', out)
    241         self.assertIn(b'./ustar/linktest2/lnktype', out)
    242         # Make sure it puts trailing slash for directory
    243         self.assertIn(b'ustar/dirtype/', out)
    244         self.assertIn(b'ustar/dirtype-with-size/', out)
    245         # Make sure it is able to print unencodable characters
    246         def conv(b):
    247             s = b.decode(self.tar.encoding, 'surrogateescape')
    248             return s.encode('ascii', 'backslashreplace')
    249         self.assertIn(conv(b'ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out)
    250         self.assertIn(conv(b'misc/regtype-hpux-signed-chksum-'
    251                            b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out)
    252         self.assertIn(conv(b'misc/regtype-old-v7-signed-chksum-'
    253                            b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out)
    254         self.assertIn(conv(b'pax/bad-pax-\xe4\xf6\xfc'), out)
    255         self.assertIn(conv(b'pax/hdrcharset-\xe4\xf6\xfc'), out)
    256         # Make sure it prints files separated by one newline without any
    257         # 'ls -l'-like accessories if verbose flag is not being used
    258         # ...
    259         # ustar/conttype
    260         # ustar/regtype
    261         # ...
    262         self.assertRegex(out, br'ustar/conttype ?\r?\n'
    263                               br'ustar/regtype ?\r?\n')
    264         # Make sure it does not print the source of link without verbose flag
    265         self.assertNotIn(b'link to', out)
    266         self.assertNotIn(b'->', out)
    267 
    268     def test_list_verbose(self):
    269         tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
    270         with support.swap_attr(sys, 'stdout', tio):
    271             self.tar.list(verbose=True)
    272         out = tio.detach().getvalue()
    273         # Make sure it prints files separated by one newline with 'ls -l'-like
    274         # accessories if verbose flag is being used
    275         # ...
    276         # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/conttype
    277         # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/regtype
    278         # ...
    279         self.assertRegex(out, (br'\?rw-r--r-- tarfile/tarfile\s+7011 '
    280                                br'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d '
    281                                br'ustar/\w+type ?\r?\n') * 2)
    282         # Make sure it prints the source of link with verbose flag
    283         self.assertIn(b'ustar/symtype -> regtype', out)
    284         self.assertIn(b'./ustar/linktest2/symtype -> ../linktest1/regtype', out)
    285         self.assertIn(b'./ustar/linktest2/lnktype link to '
    286                       b'./ustar/linktest1/regtype', out)
    287         self.assertIn(b'gnu' + (b'/123' * 125) + b'/longlink link to gnu' +
    288                       (b'/123' * 125) + b'/longname', out)
    289         self.assertIn(b'pax' + (b'/123' * 125) + b'/longlink link to pax' +
    290                       (b'/123' * 125) + b'/longname', out)
    291 
    292     def test_list_members(self):
    293         tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
    294         def members(tar):
    295             for tarinfo in tar.getmembers():
    296                 if 'reg' in tarinfo.name:
    297                     yield tarinfo
    298         with support.swap_attr(sys, 'stdout', tio):
    299             self.tar.list(verbose=False, members=members(self.tar))
    300         out = tio.detach().getvalue()
    301         self.assertIn(b'ustar/regtype', out)
    302         self.assertNotIn(b'ustar/conttype', out)
    303 
    304 
    305 class GzipListTest(GzipTest, ListTest):
    306     pass
    307 
    308 
    309 class Bz2ListTest(Bz2Test, ListTest):
    310     pass
    311 
    312 
    313 class LzmaListTest(LzmaTest, ListTest):
    314     pass
    315 
    316 
    317 class CommonReadTest(ReadTest):
    318 
    319     def test_empty_tarfile(self):
    320         # Test for issue6123: Allow opening empty archives.
    321         # This test checks if tarfile.open() is able to open an empty tar
    322         # archive successfully. Note that an empty tar archive is not the
    323         # same as an empty file!
    324         with tarfile.open(tmpname, self.mode.replace("r", "w")):
    325             pass
    326         try:
    327             tar = tarfile.open(tmpname, self.mode)
    328             tar.getnames()
    329         except tarfile.ReadError:
    330             self.fail("tarfile.open() failed on empty archive")
    331         else:
    332             self.assertListEqual(tar.getmembers(), [])
    333         finally:
    334             tar.close()
    335 
    336     def test_non_existent_tarfile(self):
    337         # Test for issue11513: prevent non-existent gzipped tarfiles raising
    338         # multiple exceptions.
    339         with self.assertRaisesRegex(FileNotFoundError, "xxx"):
    340             tarfile.open("xxx", self.mode)
    341 
    342     def test_null_tarfile(self):
    343         # Test for issue6123: Allow opening empty archives.
    344         # This test guarantees that tarfile.open() does not treat an empty
    345         # file as an empty tar archive.
    346         with open(tmpname, "wb"):
    347             pass
    348         self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
    349         self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)
    350 
    351     def test_ignore_zeros(self):
    352         # Test TarFile's ignore_zeros option.
    353         # generate 512 pseudorandom bytes
    354         data = Random(0).getrandbits(512*8).to_bytes(512, 'big')
    355         for char in (b'\0', b'a'):
    356             # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
    357             # are ignored correctly.
    358             with self.open(tmpname, "w") as fobj:
    359                 fobj.write(char * 1024)
    360                 tarinfo = tarfile.TarInfo("foo")
    361                 tarinfo.size = len(data)
    362                 fobj.write(tarinfo.tobuf())
    363                 fobj.write(data)
    364 
    365             tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
    366             try:
    367                 self.assertListEqual(tar.getnames(), ["foo"],
    368                     "ignore_zeros=True should have skipped the %r-blocks" %
    369                     char)
    370             finally:
    371                 tar.close()
    372 
    373     def test_premature_end_of_archive(self):
    374         for size in (512, 600, 1024, 1200):
    375             with tarfile.open(tmpname, "w:") as tar:
    376                 t = tarfile.TarInfo("foo")
    377                 t.size = 1024
    378                 tar.addfile(t, io.BytesIO(b"a" * 1024))
    379 
    380             with open(tmpname, "r+b") as fobj:
    381                 fobj.truncate(size)
    382 
    383             with tarfile.open(tmpname) as tar:
    384                 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
    385                     for t in tar:
    386                         pass
    387 
    388             with tarfile.open(tmpname) as tar:
    389                 t = tar.next()
    390 
    391                 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
    392                     tar.extract(t, TEMPDIR)
    393 
    394                 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
    395                     tar.extractfile(t).read()
    396 
    397 class MiscReadTestBase(CommonReadTest):
    398     def requires_name_attribute(self):
    399         pass
    400 
    401     def test_no_name_argument(self):
    402         self.requires_name_attribute()
    403         with open(self.tarname, "rb") as fobj:
    404             self.assertIsInstance(fobj.name, str)
    405             with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
    406                 self.assertIsInstance(tar.name, str)
    407                 self.assertEqual(tar.name, os.path.abspath(fobj.name))
    408 
    409     def test_no_name_attribute(self):
    410         with open(self.tarname, "rb") as fobj:
    411             data = fobj.read()
    412         fobj = io.BytesIO(data)
    413         self.assertRaises(AttributeError, getattr, fobj, "name")
    414         tar = tarfile.open(fileobj=fobj, mode=self.mode)
    415         self.assertIsNone(tar.name)
    416 
    417     def test_empty_name_attribute(self):
    418         with open(self.tarname, "rb") as fobj:
    419             data = fobj.read()
    420         fobj = io.BytesIO(data)
    421         fobj.name = ""
    422         with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
    423             self.assertIsNone(tar.name)
    424 
    425     def test_int_name_attribute(self):
    426         # Issue 21044: tarfile.open() should handle fileobj with an integer
    427         # 'name' attribute.
    428         fd = os.open(self.tarname, os.O_RDONLY)
    429         with open(fd, 'rb') as fobj:
    430             self.assertIsInstance(fobj.name, int)
    431             with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
    432                 self.assertIsNone(tar.name)
    433 
    434     def test_bytes_name_attribute(self):
    435         self.requires_name_attribute()
    436         tarname = os.fsencode(self.tarname)
    437         with open(tarname, 'rb') as fobj:
    438             self.assertIsInstance(fobj.name, bytes)
    439             with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
    440                 self.assertIsInstance(tar.name, bytes)
    441                 self.assertEqual(tar.name, os.path.abspath(fobj.name))
    442 
    443     def test_illegal_mode_arg(self):
    444         with open(tmpname, 'wb'):
    445             pass
    446         with self.assertRaisesRegex(ValueError, 'mode must be '):
    447             tar = self.taropen(tmpname, 'q')
    448         with self.assertRaisesRegex(ValueError, 'mode must be '):
    449             tar = self.taropen(tmpname, 'rw')
    450         with self.assertRaisesRegex(ValueError, 'mode must be '):
    451             tar = self.taropen(tmpname, '')
    452 
    453     def test_fileobj_with_offset(self):
    454         # Skip the first member and store values from the second member
    455         # of the testtar.
    456         tar = tarfile.open(self.tarname, mode=self.mode)
    457         try:
    458             tar.next()
    459             t = tar.next()
    460             name = t.name
    461             offset = t.offset
    462             with tar.extractfile(t) as f:
    463                 data = f.read()
    464         finally:
    465             tar.close()
    466 
    467         # Open the testtar and seek to the offset of the second member.
    468         with self.open(self.tarname) as fobj:
    469             fobj.seek(offset)
    470 
    471             # Test if the tarfile starts with the second member.
    472             tar = tar.open(self.tarname, mode="r:", fileobj=fobj)
    473             t = tar.next()
    474             self.assertEqual(t.name, name)
    475             # Read to the end of fileobj and test if seeking back to the
    476             # beginning works.
    477             tar.getmembers()
    478             self.assertEqual(tar.extractfile(t).read(), data,
    479                     "seek back did not work")
    480             tar.close()
    481 
    482     def test_fail_comp(self):
    483         # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
    484         self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
    485         with open(tarname, "rb") as fobj:
    486             self.assertRaises(tarfile.ReadError, tarfile.open,
    487                               fileobj=fobj, mode=self.mode)
    488 
    489     def test_v7_dirtype(self):
    490         # Test old style dirtype member (bug #1336623):
    491         # Old V7 tars create directory members using an AREGTYPE
    492         # header with a "/" appended to the filename field.
    493         tarinfo = self.tar.getmember("misc/dirtype-old-v7")
    494         self.assertEqual(tarinfo.type, tarfile.DIRTYPE,
    495                 "v7 dirtype failed")
    496 
    497     def test_xstar_type(self):
    498         # The xstar format stores extra atime and ctime fields inside the
    499         # space reserved for the prefix field. The prefix field must be
    500         # ignored in this case, otherwise it will mess up the name.
    501         try:
    502             self.tar.getmember("misc/regtype-xstar")
    503         except KeyError:
    504             self.fail("failed to find misc/regtype-xstar (mangled prefix?)")
    505 
    506     def test_check_members(self):
    507         for tarinfo in self.tar:
    508             self.assertEqual(int(tarinfo.mtime), 0o7606136617,
    509                     "wrong mtime for %s" % tarinfo.name)
    510             if not tarinfo.name.startswith("ustar/"):
    511                 continue
    512             self.assertEqual(tarinfo.uname, "tarfile",
    513                     "wrong uname for %s" % tarinfo.name)
    514 
    515     def test_find_members(self):
    516         self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof",
    517                 "could not find all members")
    518 
    519     @unittest.skipUnless(hasattr(os, "link"),
    520                          "Missing hardlink implementation")
    521     @support.skip_unless_symlink
    522     def test_extract_hardlink(self):
    523         # Test hardlink extraction (e.g. bug #857297).
    524         with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar:
    525             tar.extract("ustar/regtype", TEMPDIR)
    526             self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/regtype"))
    527 
    528             tar.extract("ustar/lnktype", TEMPDIR)
    529             self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/lnktype"))
    530             with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f:
    531                 data = f.read()
    532             self.assertEqual(md5sum(data), md5_regtype)
    533 
    534             tar.extract("ustar/symtype", TEMPDIR)
    535             self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/symtype"))
    536             with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f:
    537                 data = f.read()
    538             self.assertEqual(md5sum(data), md5_regtype)
    539 
    540     def test_extractall(self):
    541         # Test if extractall() correctly restores directory permissions
    542         # and times (see issue1735).
    543         tar = tarfile.open(tarname, encoding="iso8859-1")
    544         DIR = os.path.join(TEMPDIR, "extractall")
    545         os.mkdir(DIR)
    546         try:
    547             directories = [t for t in tar if t.isdir()]
    548             tar.extractall(DIR, directories)
    549             for tarinfo in directories:
    550                 path = os.path.join(DIR, tarinfo.name)
    551                 if sys.platform != "win32":
    552                     # Win32 has no support for fine grained permissions.
    553                     self.assertEqual(tarinfo.mode & 0o777,
    554                                      os.stat(path).st_mode & 0o777)
    555                 def format_mtime(mtime):
    556                     if isinstance(mtime, float):
    557                         return "{} ({})".format(mtime, mtime.hex())
    558                     else:
    559                         return "{!r} (int)".format(mtime)
    560                 file_mtime = os.path.getmtime(path)
    561                 errmsg = "tar mtime {0} != file time {1} of path {2!a}".format(
    562                     format_mtime(tarinfo.mtime),
    563                     format_mtime(file_mtime),
    564                     path)
    565                 self.assertEqual(tarinfo.mtime, file_mtime, errmsg)
    566         finally:
    567             tar.close()
    568             support.rmtree(DIR)
    569 
    570     def test_extract_directory(self):
    571         dirtype = "ustar/dirtype"
    572         DIR = os.path.join(TEMPDIR, "extractdir")
    573         os.mkdir(DIR)
    574         try:
    575             with tarfile.open(tarname, encoding="iso8859-1") as tar:
    576                 tarinfo = tar.getmember(dirtype)
    577                 tar.extract(tarinfo, path=DIR)
    578                 extracted = os.path.join(DIR, dirtype)
    579                 self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)
    580                 if sys.platform != "win32":
    581                     self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755)
    582         finally:
    583             support.rmtree(DIR)
    584 
    585     def test_init_close_fobj(self):
    586         # Issue #7341: Close the internal file object in the TarFile
    587         # constructor in case of an error. For the test we rely on
    588         # the fact that opening an empty file raises a ReadError.
    589         empty = os.path.join(TEMPDIR, "empty")
    590         with open(empty, "wb") as fobj:
    591             fobj.write(b"")
    592 
    593         try:
    594             tar = object.__new__(tarfile.TarFile)
    595             try:
    596                 tar.__init__(empty)
    597             except tarfile.ReadError:
    598                 self.assertTrue(tar.fileobj.closed)
    599             else:
    600                 self.fail("ReadError not raised")
    601         finally:
    602             support.unlink(empty)
    603 
    604     def test_parallel_iteration(self):
    605         # Issue #16601: Restarting iteration over tarfile continued
    606         # from where it left off.
    607         with tarfile.open(self.tarname) as tar:
    608             for m1, m2 in zip(tar, tar):
    609                 self.assertEqual(m1.offset, m2.offset)
    610                 self.assertEqual(m1.get_info(), m2.get_info())
    611 
class MiscReadTest(MiscReadTestBase, unittest.TestCase):
    # Disable the inherited test: it expects a ReadError when the plain
    # archive is opened with self.mode, which here is the uncompressed "r:".
    test_fail_comp = None
    614 
    615 class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase):
    616     pass
    617 
    618 class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase):
    619     def requires_name_attribute(self):
    620         self.skipTest("BZ2File have no name attribute")
    621 
    622 class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase):
    623     def requires_name_attribute(self):
    624         self.skipTest("LZMAFile have no name attribute")
    625 
    626 
    627 class StreamReadTest(CommonReadTest, unittest.TestCase):
    628 
    629     prefix="r|"
    630 
    631     def test_read_through(self):
    632         # Issue #11224: A poorly designed _FileInFile.read() method
    633         # caused seeking errors with stream tar files.
    634         for tarinfo in self.tar:
    635             if not tarinfo.isreg():
    636                 continue
    637             with self.tar.extractfile(tarinfo) as fobj:
    638                 while True:
    639                     try:
    640                         buf = fobj.read(512)
    641                     except tarfile.StreamError:
    642                         self.fail("simple read-through using "
    643                                   "TarFile.extractfile() failed")
    644                     if not buf:
    645                         break
    646 
    647     def test_fileobj_regular_file(self):
    648         tarinfo = self.tar.next() # get "regtype" (can't use getmember)
    649         with self.tar.extractfile(tarinfo) as fobj:
    650             data = fobj.read()
    651         self.assertEqual(len(data), tarinfo.size,
    652                 "regular file extraction failed")
    653         self.assertEqual(md5sum(data), md5_regtype,
    654                 "regular file extraction failed")
    655 
    656     def test_provoke_stream_error(self):
    657         tarinfos = self.tar.getmembers()
    658         with self.tar.extractfile(tarinfos[0]) as f: # read the first member
    659             self.assertRaises(tarfile.StreamError, f.read)
    660 
    661     def test_compare_members(self):
    662         tar1 = tarfile.open(tarname, encoding="iso8859-1")
    663         try:
    664             tar2 = self.tar
    665 
    666             while True:
    667                 t1 = tar1.next()
    668                 t2 = tar2.next()
    669                 if t1 is None:
    670                     break
    671                 self.assertIsNotNone(t2, "stream.next() failed.")
    672 
    673                 if t2.islnk() or t2.issym():
    674                     with self.assertRaises(tarfile.StreamError):
    675                         tar2.extractfile(t2)
    676                     continue
    677 
    678                 v1 = tar1.extractfile(t1)
    679                 v2 = tar2.extractfile(t2)
    680                 if v1 is None:
    681                     continue
    682                 self.assertIsNotNone(v2, "stream.extractfile() failed")
    683                 self.assertEqual(v1.read(), v2.read(),
    684                         "stream extraction failed")
    685         finally:
    686             tar1.close()
    687 
    688 class GzipStreamReadTest(GzipTest, StreamReadTest):
    689     pass
    690 
    691 class Bz2StreamReadTest(Bz2Test, StreamReadTest):
    692     pass
    693 
    694 class LzmaStreamReadTest(LzmaTest, StreamReadTest):
    695     pass
    696 
    697 
    698 class DetectReadTest(TarTest, unittest.TestCase):
    699     def _testfunc_file(self, name, mode):
    700         try:
    701             tar = tarfile.open(name, mode)
    702         except tarfile.ReadError as e:
    703             self.fail()
    704         else:
    705             tar.close()
    706 
    707     def _testfunc_fileobj(self, name, mode):
    708         try:
    709             with open(name, "rb") as f:
    710                 tar = tarfile.open(name, mode, fileobj=f)
    711         except tarfile.ReadError as e:
    712             self.fail()
    713         else:
    714             tar.close()
    715 
    716     def _test_modes(self, testfunc):
    717         if self.suffix:
    718             with self.assertRaises(tarfile.ReadError):
    719                 tarfile.open(tarname, mode="r:" + self.suffix)
    720             with self.assertRaises(tarfile.ReadError):
    721                 tarfile.open(tarname, mode="r|" + self.suffix)
    722             with self.assertRaises(tarfile.ReadError):
    723                 tarfile.open(self.tarname, mode="r:")
    724             with self.assertRaises(tarfile.ReadError):
    725                 tarfile.open(self.tarname, mode="r|")
    726         testfunc(self.tarname, "r")
    727         testfunc(self.tarname, "r:" + self.suffix)
    728         testfunc(self.tarname, "r:*")
    729         testfunc(self.tarname, "r|" + self.suffix)
    730         testfunc(self.tarname, "r|*")
    731 
    732     def test_detect_file(self):
    733         self._test_modes(self._testfunc_file)
    734 
    735     def test_detect_fileobj(self):
    736         self._test_modes(self._testfunc_fileobj)
    737 
class GzipDetectReadTest(GzipTest, DetectReadTest):
    # Runs the DetectReadTest tests against the gzip-compressed archive.
    pass
    740 
    741 class Bz2DetectReadTest(Bz2Test, DetectReadTest):
    742     def test_detect_stream_bz2(self):
    743         # Originally, tarfile's stream detection looked for the string
    744         # "BZh91" at the start of the file. This is incorrect because
    745         # the '9' represents the blocksize (900kB). If the file was
    746         # compressed using another blocksize autodetection fails.
    747         with open(tarname, "rb") as fobj:
    748             data = fobj.read()
    749 
    750         # Compress with blocksize 100kB, the file starts with "BZh11".
    751         with bz2.BZ2File(tmpname, "wb", compresslevel=1) as fobj:
    752             fobj.write(data)
    753 
    754         self._testfunc_file(tmpname, "r|*")
    755 
class LzmaDetectReadTest(LzmaTest, DetectReadTest):
    # Runs the DetectReadTest tests with the settings from the LzmaTest
    # mixin.
    pass
    758 
    759 
    760 class MemberReadTest(ReadTest, unittest.TestCase):
    761 
    762     def _test_member(self, tarinfo, chksum=None, **kwargs):
    763         if chksum is not None:
    764             with self.tar.extractfile(tarinfo) as f:
    765                 self.assertEqual(md5sum(f.read()), chksum,
    766                         "wrong md5sum for %s" % tarinfo.name)
    767 
    768         kwargs["mtime"] = 0o7606136617
    769         kwargs["uid"] = 1000
    770         kwargs["gid"] = 100
    771         if "old-v7" not in tarinfo.name:
    772             # V7 tar can't handle alphabetic owners.
    773             kwargs["uname"] = "tarfile"
    774             kwargs["gname"] = "tarfile"
    775         for k, v in kwargs.items():
    776             self.assertEqual(getattr(tarinfo, k), v,
    777                     "wrong value in %s field of %s" % (k, tarinfo.name))
    778 
    779     def test_find_regtype(self):
    780         tarinfo = self.tar.getmember("ustar/regtype")
    781         self._test_member(tarinfo, size=7011, chksum=md5_regtype)
    782 
    783     def test_find_conttype(self):
    784         tarinfo = self.tar.getmember("ustar/conttype")
    785         self._test_member(tarinfo, size=7011, chksum=md5_regtype)
    786 
    787     def test_find_dirtype(self):
    788         tarinfo = self.tar.getmember("ustar/dirtype")
    789         self._test_member(tarinfo, size=0)
    790 
    791     def test_find_dirtype_with_size(self):
    792         tarinfo = self.tar.getmember("ustar/dirtype-with-size")
    793         self._test_member(tarinfo, size=255)
    794 
    795     def test_find_lnktype(self):
    796         tarinfo = self.tar.getmember("ustar/lnktype")
    797         self._test_member(tarinfo, size=0, linkname="ustar/regtype")
    798 
    799     def test_find_symtype(self):
    800         tarinfo = self.tar.getmember("ustar/symtype")
    801         self._test_member(tarinfo, size=0, linkname="regtype")
    802 
    803     def test_find_blktype(self):
    804         tarinfo = self.tar.getmember("ustar/blktype")
    805         self._test_member(tarinfo, size=0, devmajor=3, devminor=0)
    806 
    807     def test_find_chrtype(self):
    808         tarinfo = self.tar.getmember("ustar/chrtype")
    809         self._test_member(tarinfo, size=0, devmajor=1, devminor=3)
    810 
    811     def test_find_fifotype(self):
    812         tarinfo = self.tar.getmember("ustar/fifotype")
    813         self._test_member(tarinfo, size=0)
    814 
    815     def test_find_sparse(self):
    816         tarinfo = self.tar.getmember("ustar/sparse")
    817         self._test_member(tarinfo, size=86016, chksum=md5_sparse)
    818 
    819     def test_find_gnusparse(self):
    820         tarinfo = self.tar.getmember("gnu/sparse")
    821         self._test_member(tarinfo, size=86016, chksum=md5_sparse)
    822 
    823     def test_find_gnusparse_00(self):
    824         tarinfo = self.tar.getmember("gnu/sparse-0.0")
    825         self._test_member(tarinfo, size=86016, chksum=md5_sparse)
    826 
    827     def test_find_gnusparse_01(self):
    828         tarinfo = self.tar.getmember("gnu/sparse-0.1")
    829         self._test_member(tarinfo, size=86016, chksum=md5_sparse)
    830 
    831     def test_find_gnusparse_10(self):
    832         tarinfo = self.tar.getmember("gnu/sparse-1.0")
    833         self._test_member(tarinfo, size=86016, chksum=md5_sparse)
    834 
    835     def test_find_umlauts(self):
    836         tarinfo = self.tar.getmember("ustar/umlauts-"
    837                                      "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
    838         self._test_member(tarinfo, size=7011, chksum=md5_regtype)
    839 
    840     def test_find_ustar_longname(self):
    841         name = "ustar/" + "12345/" * 39 + "1234567/longname"
    842         self.assertIn(name, self.tar.getnames())
    843 
    844     def test_find_regtype_oldv7(self):
    845         tarinfo = self.tar.getmember("misc/regtype-old-v7")
    846         self._test_member(tarinfo, size=7011, chksum=md5_regtype)
    847 
    848     def test_find_pax_umlauts(self):
    849         self.tar.close()
    850         self.tar = tarfile.open(self.tarname, mode=self.mode,
    851                                 encoding="iso8859-1")
    852         tarinfo = self.tar.getmember("pax/umlauts-"
    853                                      "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
    854         self._test_member(tarinfo, size=7011, chksum=md5_regtype)
    855 
    856 
    857 class LongnameTest:
    858 
    859     def test_read_longname(self):
    860         # Test reading of longname (bug #1471427).
    861         longname = self.subdir + "/" + "123/" * 125 + "longname"
    862         try:
    863             tarinfo = self.tar.getmember(longname)
    864         except KeyError:
    865             self.fail("longname not found")
    866         self.assertNotEqual(tarinfo.type, tarfile.DIRTYPE,
    867                 "read longname as dirtype")
    868 
    869     def test_read_longlink(self):
    870         longname = self.subdir + "/" + "123/" * 125 + "longname"
    871         longlink = self.subdir + "/" + "123/" * 125 + "longlink"
    872         try:
    873             tarinfo = self.tar.getmember(longlink)
    874         except KeyError:
    875             self.fail("longlink not found")
    876         self.assertEqual(tarinfo.linkname, longname, "linkname wrong")
    877 
    878     def test_truncated_longname(self):
    879         longname = self.subdir + "/" + "123/" * 125 + "longname"
    880         tarinfo = self.tar.getmember(longname)
    881         offset = tarinfo.offset
    882         self.tar.fileobj.seek(offset)
    883         fobj = io.BytesIO(self.tar.fileobj.read(3 * 512))
    884         with self.assertRaises(tarfile.ReadError):
    885             tarfile.open(name="foo.tar", fileobj=fobj)
    886 
    887     def test_header_offset(self):
    888         # Test if the start offset of the TarInfo object includes
    889         # the preceding extended header.
    890         longname = self.subdir + "/" + "123/" * 125 + "longname"
    891         offset = self.tar.getmember(longname).offset
    892         with open(tarname, "rb") as fobj:
    893             fobj.seek(offset)
    894             tarinfo = tarfile.TarInfo.frombuf(fobj.read(512),
    895                                               "iso8859-1", "strict")
    896             self.assertEqual(tarinfo.type, self.longnametype)
    897 
    898 
    899 class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase):
    900 
    901     subdir = "gnu"
    902     longnametype = tarfile.GNUTYPE_LONGNAME
    903 
    904     # Since 3.2 tarfile is supposed to accurately restore sparse members and
    905     # produce files with holes. This is what we actually want to test here.
    906     # Unfortunately, not all platforms/filesystems support sparse files, and
    907     # even on platforms that do it is non-trivial to make reliable assertions
    908     # about holes in files. Therefore, we first do one basic test which works
    909     # an all platforms, and after that a test that will work only on
    910     # platforms/filesystems that prove to support sparse files.
    911     def _test_sparse_file(self, name):
    912         self.tar.extract(name, TEMPDIR)
    913         filename = os.path.join(TEMPDIR, name)
    914         with open(filename, "rb") as fobj:
    915             data = fobj.read()
    916         self.assertEqual(md5sum(data), md5_sparse,
    917                 "wrong md5sum for %s" % name)
    918 
    919         if self._fs_supports_holes():
    920             s = os.stat(filename)
    921             self.assertLess(s.st_blocks * 512, s.st_size)
    922 
    923     def test_sparse_file_old(self):
    924         self._test_sparse_file("gnu/sparse")
    925 
    926     def test_sparse_file_00(self):
    927         self._test_sparse_file("gnu/sparse-0.0")
    928 
    929     def test_sparse_file_01(self):
    930         self._test_sparse_file("gnu/sparse-0.1")
    931 
    932     def test_sparse_file_10(self):
    933         self._test_sparse_file("gnu/sparse-1.0")
    934 
    935     @staticmethod
    936     def _fs_supports_holes():
    937         # Return True if the platform knows the st_blocks stat attribute and
    938         # uses st_blocks units of 512 bytes, and if the filesystem is able to
    939         # store holes in files.
    940         if sys.platform.startswith("linux"):
    941             # Linux evidentially has 512 byte st_blocks units.
    942             name = os.path.join(TEMPDIR, "sparse-test")
    943             with open(name, "wb") as fobj:
    944                 fobj.seek(4096)
    945                 fobj.truncate()
    946             s = os.stat(name)
    947             support.unlink(name)
    948             return s.st_blocks == 0
    949         else:
    950             return False
    951 
    952 
    953 class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase):
    954 
    955     subdir = "pax"
    956     longnametype = tarfile.XHDTYPE
    957 
    958     def test_pax_global_headers(self):
    959         tar = tarfile.open(tarname, encoding="iso8859-1")
    960         try:
    961             tarinfo = tar.getmember("pax/regtype1")
    962             self.assertEqual(tarinfo.uname, "foo")
    963             self.assertEqual(tarinfo.gname, "bar")
    964             self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"),
    965                              "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
    966 
    967             tarinfo = tar.getmember("pax/regtype2")
    968             self.assertEqual(tarinfo.uname, "")
    969             self.assertEqual(tarinfo.gname, "bar")
    970             self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"),
    971                              "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
    972 
    973             tarinfo = tar.getmember("pax/regtype3")
    974             self.assertEqual(tarinfo.uname, "tarfile")
    975             self.assertEqual(tarinfo.gname, "tarfile")
    976             self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"),
    977                              "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
    978         finally:
    979             tar.close()
    980 
    981     def test_pax_number_fields(self):
    982         # All following number fields are read from the pax header.
    983         tar = tarfile.open(tarname, encoding="iso8859-1")
    984         try:
    985             tarinfo = tar.getmember("pax/regtype4")
    986             self.assertEqual(tarinfo.size, 7011)
    987             self.assertEqual(tarinfo.uid, 123)
    988             self.assertEqual(tarinfo.gid, 123)
    989             self.assertEqual(tarinfo.mtime, 1041808783.0)
    990             self.assertEqual(type(tarinfo.mtime), float)
    991             self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0)
    992             self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0)
    993         finally:
    994             tar.close()
    995 
    996 
    997 class WriteTestBase(TarTest):
    998     # Put all write tests in here that are supposed to be tested
    999     # in all possible mode combinations.
   1000 
   1001     def test_fileobj_no_close(self):
   1002         fobj = io.BytesIO()
   1003         tar = tarfile.open(fileobj=fobj, mode=self.mode)
   1004         tar.addfile(tarfile.TarInfo("foo"))
   1005         tar.close()
   1006         self.assertFalse(fobj.closed, "external fileobjs must never closed")
   1007         # Issue #20238: Incomplete gzip output with mode="w:gz"
   1008         data = fobj.getvalue()
   1009         del tar
   1010         support.gc_collect()
   1011         self.assertFalse(fobj.closed)
   1012         self.assertEqual(data, fobj.getvalue())
   1013 
   1014     def test_eof_marker(self):
   1015         # Make sure an end of archive marker is written (two zero blocks).
   1016         # tarfile insists on aligning archives to a 20 * 512 byte recordsize.
   1017         # So, we create an archive that has exactly 10240 bytes without the
   1018         # marker, and has 20480 bytes once the marker is written.
   1019         with tarfile.open(tmpname, self.mode) as tar:
   1020             t = tarfile.TarInfo("foo")
   1021             t.size = tarfile.RECORDSIZE - tarfile.BLOCKSIZE
   1022             tar.addfile(t, io.BytesIO(b"a" * t.size))
   1023 
   1024         with self.open(tmpname, "rb") as fobj:
   1025             self.assertEqual(len(fobj.read()), tarfile.RECORDSIZE * 2)
   1026 
   1027 
   1028 class WriteTest(WriteTestBase, unittest.TestCase):
   1029 
   1030     prefix = "w:"
   1031 
   1032     def test_100_char_name(self):
   1033         # The name field in a tar header stores strings of at most 100 chars.
   1034         # If a string is shorter than 100 chars it has to be padded with '\0',
   1035         # which implies that a string of exactly 100 chars is stored without
   1036         # a trailing '\0'.
   1037         name = "0123456789" * 10
   1038         tar = tarfile.open(tmpname, self.mode)
   1039         try:
   1040             t = tarfile.TarInfo(name)
   1041             tar.addfile(t)
   1042         finally:
   1043             tar.close()
   1044 
   1045         tar = tarfile.open(tmpname)
   1046         try:
   1047             self.assertEqual(tar.getnames()[0], name,
   1048                     "failed to store 100 char filename")
   1049         finally:
   1050             tar.close()
   1051 
   1052     def test_tar_size(self):
   1053         # Test for bug #1013882.
   1054         tar = tarfile.open(tmpname, self.mode)
   1055         try:
   1056             path = os.path.join(TEMPDIR, "file")
   1057             with open(path, "wb") as fobj:
   1058                 fobj.write(b"aaa")
   1059             tar.add(path)
   1060         finally:
   1061             tar.close()
   1062         self.assertGreater(os.path.getsize(tmpname), 0,
   1063                 "tarfile is empty")
   1064 
   1065     # The test_*_size tests test for bug #1167128.
   1066     def test_file_size(self):
   1067         tar = tarfile.open(tmpname, self.mode)
   1068         try:
   1069             path = os.path.join(TEMPDIR, "file")
   1070             with open(path, "wb"):
   1071                 pass
   1072             tarinfo = tar.gettarinfo(path)
   1073             self.assertEqual(tarinfo.size, 0)
   1074 
   1075             with open(path, "wb") as fobj:
   1076                 fobj.write(b"aaa")
   1077             tarinfo = tar.gettarinfo(path)
   1078             self.assertEqual(tarinfo.size, 3)
   1079         finally:
   1080             tar.close()
   1081 
   1082     def test_directory_size(self):
   1083         path = os.path.join(TEMPDIR, "directory")
   1084         os.mkdir(path)
   1085         try:
   1086             tar = tarfile.open(tmpname, self.mode)
   1087             try:
   1088                 tarinfo = tar.gettarinfo(path)
   1089                 self.assertEqual(tarinfo.size, 0)
   1090             finally:
   1091                 tar.close()
   1092         finally:
   1093             support.rmdir(path)
   1094 
   1095     @unittest.skipUnless(hasattr(os, "link"),
   1096                          "Missing hardlink implementation")
   1097     def test_link_size(self):
   1098         link = os.path.join(TEMPDIR, "link")
   1099         target = os.path.join(TEMPDIR, "link_target")
   1100         with open(target, "wb") as fobj:
   1101             fobj.write(b"aaa")
   1102         os.link(target, link)
   1103         try:
   1104             tar = tarfile.open(tmpname, self.mode)
   1105             try:
   1106                 # Record the link target in the inodes list.
   1107                 tar.gettarinfo(target)
   1108                 tarinfo = tar.gettarinfo(link)
   1109                 self.assertEqual(tarinfo.size, 0)
   1110             finally:
   1111                 tar.close()
   1112         finally:
   1113             support.unlink(target)
   1114             support.unlink(link)
   1115 
   1116     @support.skip_unless_symlink
   1117     def test_symlink_size(self):
   1118         path = os.path.join(TEMPDIR, "symlink")
   1119         os.symlink("link_target", path)
   1120         try:
   1121             tar = tarfile.open(tmpname, self.mode)
   1122             try:
   1123                 tarinfo = tar.gettarinfo(path)
   1124                 self.assertEqual(tarinfo.size, 0)
   1125             finally:
   1126                 tar.close()
   1127         finally:
   1128             support.unlink(path)
   1129 
   1130     def test_add_self(self):
   1131         # Test for #1257255.
   1132         dstname = os.path.abspath(tmpname)
   1133         tar = tarfile.open(tmpname, self.mode)
   1134         try:
   1135             self.assertEqual(tar.name, dstname,
   1136                     "archive name must be absolute")
   1137             tar.add(dstname)
   1138             self.assertEqual(tar.getnames(), [],
   1139                     "added the archive to itself")
   1140 
   1141             with support.change_cwd(TEMPDIR):
   1142                 tar.add(dstname)
   1143             self.assertEqual(tar.getnames(), [],
   1144                     "added the archive to itself")
   1145         finally:
   1146             tar.close()
   1147 
   1148     def test_exclude(self):
   1149         tempdir = os.path.join(TEMPDIR, "exclude")
   1150         os.mkdir(tempdir)
   1151         try:
   1152             for name in ("foo", "bar", "baz"):
   1153                 name = os.path.join(tempdir, name)
   1154                 support.create_empty_file(name)
   1155 
   1156             exclude = os.path.isfile
   1157 
   1158             tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
   1159             try:
   1160                 with support.check_warnings(("use the filter argument",
   1161                                              DeprecationWarning)):
   1162                     tar.add(tempdir, arcname="empty_dir", exclude=exclude)
   1163             finally:
   1164                 tar.close()
   1165 
   1166             tar = tarfile.open(tmpname, "r")
   1167             try:
   1168                 self.assertEqual(len(tar.getmembers()), 1)
   1169                 self.assertEqual(tar.getnames()[0], "empty_dir")
   1170             finally:
   1171                 tar.close()
   1172         finally:
   1173             support.rmtree(tempdir)
   1174 
   1175     def test_filter(self):
   1176         tempdir = os.path.join(TEMPDIR, "filter")
   1177         os.mkdir(tempdir)
   1178         try:
   1179             for name in ("foo", "bar", "baz"):
   1180                 name = os.path.join(tempdir, name)
   1181                 support.create_empty_file(name)
   1182 
   1183             def filter(tarinfo):
   1184                 if os.path.basename(tarinfo.name) == "bar":
   1185                     return
   1186                 tarinfo.uid = 123
   1187                 tarinfo.uname = "foo"
   1188                 return tarinfo
   1189 
   1190             tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
   1191             try:
   1192                 tar.add(tempdir, arcname="empty_dir", filter=filter)
   1193             finally:
   1194                 tar.close()
   1195 
   1196             # Verify that filter is a keyword-only argument
   1197             with self.assertRaises(TypeError):
   1198                 tar.add(tempdir, "empty_dir", True, None, filter)
   1199 
   1200             tar = tarfile.open(tmpname, "r")
   1201             try:
   1202                 for tarinfo in tar:
   1203                     self.assertEqual(tarinfo.uid, 123)
   1204                     self.assertEqual(tarinfo.uname, "foo")
   1205                 self.assertEqual(len(tar.getmembers()), 3)
   1206             finally:
   1207                 tar.close()
   1208         finally:
   1209             support.rmtree(tempdir)
   1210 
   1211     # Guarantee that stored pathnames are not modified. Don't
   1212     # remove ./ or ../ or double slashes. Still make absolute
   1213     # pathnames relative.
   1214     # For details see bug #6054.
   1215     def _test_pathname(self, path, cmp_path=None, dir=False):
   1216         # Create a tarfile with an empty member named path
   1217         # and compare the stored name with the original.
   1218         foo = os.path.join(TEMPDIR, "foo")
   1219         if not dir:
   1220             support.create_empty_file(foo)
   1221         else:
   1222             os.mkdir(foo)
   1223 
   1224         tar = tarfile.open(tmpname, self.mode)
   1225         try:
   1226             tar.add(foo, arcname=path)
   1227         finally:
   1228             tar.close()
   1229 
   1230         tar = tarfile.open(tmpname, "r")
   1231         try:
   1232             t = tar.next()
   1233         finally:
   1234             tar.close()
   1235 
   1236         if not dir:
   1237             support.unlink(foo)
   1238         else:
   1239             support.rmdir(foo)
   1240 
   1241         self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))
   1242 
   1243 
   1244     @support.skip_unless_symlink
   1245     def test_extractall_symlinks(self):
   1246         # Test if extractall works properly when tarfile contains symlinks
   1247         tempdir = os.path.join(TEMPDIR, "testsymlinks")
   1248         temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
   1249         os.mkdir(tempdir)
   1250         try:
   1251             source_file = os.path.join(tempdir,'source')
   1252             target_file = os.path.join(tempdir,'symlink')
   1253             with open(source_file,'w') as f:
   1254                 f.write('something\n')
   1255             os.symlink(source_file, target_file)
   1256             tar = tarfile.open(temparchive,'w')
   1257             tar.add(source_file)
   1258             tar.add(target_file)
   1259             tar.close()
   1260             # Let's extract it to the location which contains the symlink
   1261             tar = tarfile.open(temparchive,'r')
   1262             # this should not raise OSError: [Errno 17] File exists
   1263             try:
   1264                 tar.extractall(path=tempdir)
   1265             except OSError:
   1266                 self.fail("extractall failed with symlinked files")
   1267             finally:
   1268                 tar.close()
   1269         finally:
   1270             support.unlink(temparchive)
   1271             support.rmtree(tempdir)
   1272 
   1273     def test_pathnames(self):
   1274         self._test_pathname("foo")
   1275         self._test_pathname(os.path.join("foo", ".", "bar"))
   1276         self._test_pathname(os.path.join("foo", "..", "bar"))
   1277         self._test_pathname(os.path.join(".", "foo"))
   1278         self._test_pathname(os.path.join(".", "foo", "."))
   1279         self._test_pathname(os.path.join(".", "foo", ".", "bar"))
   1280         self._test_pathname(os.path.join(".", "foo", "..", "bar"))
   1281         self._test_pathname(os.path.join(".", "foo", "..", "bar"))
   1282         self._test_pathname(os.path.join("..", "foo"))
   1283         self._test_pathname(os.path.join("..", "foo", ".."))
   1284         self._test_pathname(os.path.join("..", "foo", ".", "bar"))
   1285         self._test_pathname(os.path.join("..", "foo", "..", "bar"))
   1286 
   1287         self._test_pathname("foo" + os.sep + os.sep + "bar")
   1288         self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True)
   1289 
   1290     def test_abs_pathnames(self):
   1291         if sys.platform == "win32":
   1292             self._test_pathname("C:\\foo", "foo")
   1293         else:
   1294             self._test_pathname("/foo", "foo")
   1295             self._test_pathname("///foo", "foo")
   1296 
   1297     def test_cwd(self):
   1298         # Test adding the current working directory.
   1299         with support.change_cwd(TEMPDIR):
   1300             tar = tarfile.open(tmpname, self.mode)
   1301             try:
   1302                 tar.add(".")
   1303             finally:
   1304                 tar.close()
   1305 
   1306             tar = tarfile.open(tmpname, "r")
   1307             try:
   1308                 for t in tar:
   1309                     if t.name != ".":
   1310                         self.assertTrue(t.name.startswith("./"), t.name)
   1311             finally:
   1312                 tar.close()
   1313 
   1314     def test_open_nonwritable_fileobj(self):
   1315         for exctype in OSError, EOFError, RuntimeError:
   1316             class BadFile(io.BytesIO):
   1317                 first = True
   1318                 def write(self, data):
   1319                     if self.first:
   1320                         self.first = False
   1321                         raise exctype
   1322 
   1323             f = BadFile()
   1324             with self.assertRaises(exctype):
   1325                 tar = tarfile.open(tmpname, self.mode, fileobj=f,
   1326                                    format=tarfile.PAX_FORMAT,
   1327                                    pax_headers={'non': 'empty'})
   1328             self.assertFalse(f.closed)
   1329 
class GzipWriteTest(GzipTest, WriteTest):
    # Runs the WriteTest tests with the gzip settings from GzipTest.
    pass
   1332 
class Bz2WriteTest(Bz2Test, WriteTest):
    # Runs the WriteTest tests with the bz2 settings from Bz2Test.
    pass
   1335 
class LzmaWriteTest(LzmaTest, WriteTest):
    # Runs the WriteTest tests with the settings from the LzmaTest mixin.
    pass
   1338 
   1339 
   1340 class StreamWriteTest(WriteTestBase, unittest.TestCase):
   1341 
   1342     prefix = "w|"
   1343     decompressor = None
   1344 
   1345     def test_stream_padding(self):
   1346         # Test for bug #1543303.
   1347         tar = tarfile.open(tmpname, self.mode)
   1348         tar.close()
   1349         if self.decompressor:
   1350             dec = self.decompressor()
   1351             with open(tmpname, "rb") as fobj:
   1352                 data = fobj.read()
   1353             data = dec.decompress(data)
   1354             self.assertFalse(dec.unused_data, "found trailing data")
   1355         else:
   1356             with self.open(tmpname) as fobj:
   1357                 data = fobj.read()
   1358         self.assertEqual(data.count(b"\0"), tarfile.RECORDSIZE,
   1359                         "incorrect zero padding")
   1360 
   1361     @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"),
   1362                          "Missing umask implementation")
   1363     def test_file_mode(self):
   1364         # Test for issue #8464: Create files with correct
   1365         # permissions.
   1366         if os.path.exists(tmpname):
   1367             support.unlink(tmpname)
   1368 
   1369         original_umask = os.umask(0o022)
   1370         try:
   1371             tar = tarfile.open(tmpname, self.mode)
   1372             tar.close()
   1373             mode = os.stat(tmpname).st_mode & 0o777
   1374             self.assertEqual(mode, 0o644, "wrong file permissions")
   1375         finally:
   1376             os.umask(original_umask)
   1377 
class GzipStreamWriteTest(GzipTest, StreamWriteTest):
    # Runs the StreamWriteTest tests with the gzip settings from GzipTest.
    pass
   1380 
class Bz2StreamWriteTest(Bz2Test, StreamWriteTest):
    # Decompressor factory used by test_stream_padding(); None when the
    # bz2 module could not be imported.
    decompressor = bz2.BZ2Decompressor if bz2 else None
   1383 
class LzmaStreamWriteTest(LzmaTest, StreamWriteTest):
    # Decompressor factory used by test_stream_padding(); None when the
    # lzma module could not be imported.
    decompressor = lzma.LZMADecompressor if lzma else None
   1386 
   1387 
   1388 class GNUWriteTest(unittest.TestCase):
   1389     # This testcase checks for correct creation of GNU Longname
   1390     # and Longlink extended headers (cp. bug #812325).
   1391 
   1392     def _length(self, s):
   1393         blocks = len(s) // 512 + 1
   1394         return blocks * 512
   1395 
   1396     def _calc_size(self, name, link=None):
   1397         # Initial tar header
   1398         count = 512
   1399 
   1400         if len(name) > tarfile.LENGTH_NAME:
   1401             # GNU longname extended header + longname
   1402             count += 512
   1403             count += self._length(name)
   1404         if link is not None and len(link) > tarfile.LENGTH_LINK:
   1405             # GNU longlink extended header + longlink
   1406             count += 512
   1407             count += self._length(link)
   1408         return count
   1409 
   1410     def _test(self, name, link=None):
   1411         tarinfo = tarfile.TarInfo(name)
   1412         if link:
   1413             tarinfo.linkname = link
   1414             tarinfo.type = tarfile.LNKTYPE
   1415 
   1416         tar = tarfile.open(tmpname, "w")
   1417         try:
   1418             tar.format = tarfile.GNU_FORMAT
   1419             tar.addfile(tarinfo)
   1420 
   1421             v1 = self._calc_size(name, link)
   1422             v2 = tar.offset
   1423             self.assertEqual(v1, v2, "GNU longname/longlink creation failed")
   1424         finally:
   1425             tar.close()
   1426 
   1427         tar = tarfile.open(tmpname)
   1428         try:
   1429             member = tar.next()
   1430             self.assertIsNotNone(member,
   1431                     "unable to read longname member")
   1432             self.assertEqual(tarinfo.name, member.name,
   1433                     "unable to read longname member")
   1434             self.assertEqual(tarinfo.linkname, member.linkname,
   1435                     "unable to read longname member")
   1436         finally:
   1437             tar.close()
   1438 
   1439     def test_longname_1023(self):
   1440         self._test(("longnam/" * 127) + "longnam")
   1441 
   1442     def test_longname_1024(self):
   1443         self._test(("longnam/" * 127) + "longname")
   1444 
   1445     def test_longname_1025(self):
   1446         self._test(("longnam/" * 127) + "longname_")
   1447 
   1448     def test_longlink_1023(self):
   1449         self._test("name", ("longlnk/" * 127) + "longlnk")
   1450 
   1451     def test_longlink_1024(self):
   1452         self._test("name", ("longlnk/" * 127) + "longlink")
   1453 
   1454     def test_longlink_1025(self):
   1455         self._test("name", ("longlnk/" * 127) + "longlink_")
   1456 
   1457     def test_longnamelink_1023(self):
   1458         self._test(("longnam/" * 127) + "longnam",
   1459                    ("longlnk/" * 127) + "longlnk")
   1460 
   1461     def test_longnamelink_1024(self):
   1462         self._test(("longnam/" * 127) + "longname",
   1463                    ("longlnk/" * 127) + "longlink")
   1464 
   1465     def test_longnamelink_1025(self):
   1466         self._test(("longnam/" * 127) + "longname_",
   1467                    ("longlnk/" * 127) + "longlink_")
   1468 
   1469 
   1470 class CreateTest(WriteTestBase, unittest.TestCase):
   1471 
   1472     prefix = "x:"
   1473 
   1474     file_path = os.path.join(TEMPDIR, "spameggs42")
   1475 
   1476     def setUp(self):
   1477         support.unlink(tmpname)
   1478 
   1479     @classmethod
   1480     def setUpClass(cls):
   1481         with open(cls.file_path, "wb") as fobj:
   1482             fobj.write(b"aaa")
   1483 
   1484     @classmethod
   1485     def tearDownClass(cls):
   1486         support.unlink(cls.file_path)
   1487 
   1488     def test_create(self):
   1489         with tarfile.open(tmpname, self.mode) as tobj:
   1490             tobj.add(self.file_path)
   1491 
   1492         with self.taropen(tmpname) as tobj:
   1493             names = tobj.getnames()
   1494         self.assertEqual(len(names), 1)
   1495         self.assertIn('spameggs42', names[0])
   1496 
   1497     def test_create_existing(self):
   1498         with tarfile.open(tmpname, self.mode) as tobj:
   1499             tobj.add(self.file_path)
   1500 
   1501         with self.assertRaises(FileExistsError):
   1502             tobj = tarfile.open(tmpname, self.mode)
   1503 
   1504         with self.taropen(tmpname) as tobj:
   1505             names = tobj.getnames()
   1506         self.assertEqual(len(names), 1)
   1507         self.assertIn('spameggs42', names[0])
   1508 
   1509     def test_create_taropen(self):
   1510         with self.taropen(tmpname, "x") as tobj:
   1511             tobj.add(self.file_path)
   1512 
   1513         with self.taropen(tmpname) as tobj:
   1514             names = tobj.getnames()
   1515         self.assertEqual(len(names), 1)
   1516         self.assertIn('spameggs42', names[0])
   1517 
   1518     def test_create_existing_taropen(self):
   1519         with self.taropen(tmpname, "x") as tobj:
   1520             tobj.add(self.file_path)
   1521 
   1522         with self.assertRaises(FileExistsError):
   1523             with self.taropen(tmpname, "x"):
   1524                 pass
   1525 
   1526         with self.taropen(tmpname) as tobj:
   1527             names = tobj.getnames()
   1528         self.assertEqual(len(names), 1)
   1529         self.assertIn("spameggs42", names[0])
   1530 
   1531 
   1532 class GzipCreateTest(GzipTest, CreateTest):
   1533     pass
   1534 
   1535 
   1536 class Bz2CreateTest(Bz2Test, CreateTest):
   1537     pass
   1538 
   1539 
   1540 class LzmaCreateTest(LzmaTest, CreateTest):
   1541     pass
   1542 
   1543 
   1544 class CreateWithXModeTest(CreateTest):
   1545 
   1546     prefix = "x"
   1547 
   1548     test_create_taropen = None
   1549     test_create_existing_taropen = None
   1550 
   1551 
   1552 @unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation")
   1553 class HardlinkTest(unittest.TestCase):
   1554     # Test the creation of LNKTYPE (hardlink) members in an archive.
   1555 
   1556     def setUp(self):
   1557         self.foo = os.path.join(TEMPDIR, "foo")
   1558         self.bar = os.path.join(TEMPDIR, "bar")
   1559 
   1560         with open(self.foo, "wb") as fobj:
   1561             fobj.write(b"foo")
   1562 
   1563         os.link(self.foo, self.bar)
   1564 
   1565         self.tar = tarfile.open(tmpname, "w")
   1566         self.tar.add(self.foo)
   1567 
   1568     def tearDown(self):
   1569         self.tar.close()
   1570         support.unlink(self.foo)
   1571         support.unlink(self.bar)
   1572 
   1573     def test_add_twice(self):
   1574         # The same name will be added as a REGTYPE every
   1575         # time regardless of st_nlink.
   1576         tarinfo = self.tar.gettarinfo(self.foo)
   1577         self.assertEqual(tarinfo.type, tarfile.REGTYPE,
   1578                 "add file as regular failed")
   1579 
   1580     def test_add_hardlink(self):
   1581         tarinfo = self.tar.gettarinfo(self.bar)
   1582         self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
   1583                 "add file as hardlink failed")
   1584 
   1585     def test_dereference_hardlink(self):
   1586         self.tar.dereference = True
   1587         tarinfo = self.tar.gettarinfo(self.bar)
   1588         self.assertEqual(tarinfo.type, tarfile.REGTYPE,
   1589                 "dereferencing hardlink failed")
   1590 
   1591 
   1592 class PaxWriteTest(GNUWriteTest):
   1593 
   1594     def _test(self, name, link=None):
   1595         # See GNUWriteTest.
   1596         tarinfo = tarfile.TarInfo(name)
   1597         if link:
   1598             tarinfo.linkname = link
   1599             tarinfo.type = tarfile.LNKTYPE
   1600 
   1601         tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
   1602         try:
   1603             tar.addfile(tarinfo)
   1604         finally:
   1605             tar.close()
   1606 
   1607         tar = tarfile.open(tmpname)
   1608         try:
   1609             if link:
   1610                 l = tar.getmembers()[0].linkname
   1611                 self.assertEqual(link, l, "PAX longlink creation failed")
   1612             else:
   1613                 n = tar.getmembers()[0].name
   1614                 self.assertEqual(name, n, "PAX longname creation failed")
   1615         finally:
   1616             tar.close()
   1617 
   1618     def test_pax_global_header(self):
   1619         pax_headers = {
   1620                 "foo": "bar",
   1621                 "uid": "0",
   1622                 "mtime": "1.23",
   1623                 "test": "\xe4\xf6\xfc",
   1624                 "\xe4\xf6\xfc": "test"}
   1625 
   1626         tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
   1627                 pax_headers=pax_headers)
   1628         try:
   1629             tar.addfile(tarfile.TarInfo("test"))
   1630         finally:
   1631             tar.close()
   1632 
   1633         # Test if the global header was written correctly.
   1634         tar = tarfile.open(tmpname, encoding="iso8859-1")
   1635         try:
   1636             self.assertEqual(tar.pax_headers, pax_headers)
   1637             self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers)
   1638             # Test if all the fields are strings.
   1639             for key, val in tar.pax_headers.items():
   1640                 self.assertIsNot(type(key), bytes)
   1641                 self.assertIsNot(type(val), bytes)
   1642                 if key in tarfile.PAX_NUMBER_FIELDS:
   1643                     try:
   1644                         tarfile.PAX_NUMBER_FIELDS[key](val)
   1645                     except (TypeError, ValueError):
   1646                         self.fail("unable to convert pax header field")
   1647         finally:
   1648             tar.close()
   1649 
   1650     def test_pax_extended_header(self):
   1651         # The fields from the pax header have priority over the
   1652         # TarInfo.
   1653         pax_headers = {"path": "foo", "uid": "123"}
   1654 
   1655         tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
   1656                            encoding="iso8859-1")
   1657         try:
   1658             t = tarfile.TarInfo()
   1659             t.name = "\xe4\xf6\xfc" # non-ASCII
   1660             t.uid = 8**8 # too large
   1661             t.pax_headers = pax_headers
   1662             tar.addfile(t)
   1663         finally:
   1664             tar.close()
   1665 
   1666         tar = tarfile.open(tmpname, encoding="iso8859-1")
   1667         try:
   1668             t = tar.getmembers()[0]
   1669             self.assertEqual(t.pax_headers, pax_headers)
   1670             self.assertEqual(t.name, "foo")
   1671             self.assertEqual(t.uid, 123)
   1672         finally:
   1673             tar.close()
   1674 
   1675 
   1676 class UnicodeTest:
   1677 
   1678     def test_iso8859_1_filename(self):
   1679         self._test_unicode_filename("iso8859-1")
   1680 
   1681     def test_utf7_filename(self):
   1682         self._test_unicode_filename("utf7")
   1683 
   1684     def test_utf8_filename(self):
   1685         self._test_unicode_filename("utf-8")
   1686 
   1687     def _test_unicode_filename(self, encoding):
   1688         tar = tarfile.open(tmpname, "w", format=self.format,
   1689                            encoding=encoding, errors="strict")
   1690         try:
   1691             name = "\xe4\xf6\xfc"
   1692             tar.addfile(tarfile.TarInfo(name))
   1693         finally:
   1694             tar.close()
   1695 
   1696         tar = tarfile.open(tmpname, encoding=encoding)
   1697         try:
   1698             self.assertEqual(tar.getmembers()[0].name, name)
   1699         finally:
   1700             tar.close()
   1701 
   1702     def test_unicode_filename_error(self):
   1703         tar = tarfile.open(tmpname, "w", format=self.format,
   1704                            encoding="ascii", errors="strict")
   1705         try:
   1706             tarinfo = tarfile.TarInfo()
   1707 
   1708             tarinfo.name = "\xe4\xf6\xfc"
   1709             self.assertRaises(UnicodeError, tar.addfile, tarinfo)
   1710 
   1711             tarinfo.name = "foo"
   1712             tarinfo.uname = "\xe4\xf6\xfc"
   1713             self.assertRaises(UnicodeError, tar.addfile, tarinfo)
   1714         finally:
   1715             tar.close()
   1716 
   1717     def test_unicode_argument(self):
   1718         tar = tarfile.open(tarname, "r",
   1719                            encoding="iso8859-1", errors="strict")
   1720         try:
   1721             for t in tar:
   1722                 self.assertIs(type(t.name), str)
   1723                 self.assertIs(type(t.linkname), str)
   1724                 self.assertIs(type(t.uname), str)
   1725                 self.assertIs(type(t.gname), str)
   1726         finally:
   1727             tar.close()
   1728 
   1729     def test_uname_unicode(self):
   1730         t = tarfile.TarInfo("foo")
   1731         t.uname = "\xe4\xf6\xfc"
   1732         t.gname = "\xe4\xf6\xfc"
   1733 
   1734         tar = tarfile.open(tmpname, mode="w", format=self.format,
   1735                            encoding="iso8859-1")
   1736         try:
   1737             tar.addfile(t)
   1738         finally:
   1739             tar.close()
   1740 
   1741         tar = tarfile.open(tmpname, encoding="iso8859-1")
   1742         try:
   1743             t = tar.getmember("foo")
   1744             self.assertEqual(t.uname, "\xe4\xf6\xfc")
   1745             self.assertEqual(t.gname, "\xe4\xf6\xfc")
   1746 
   1747             if self.format != tarfile.PAX_FORMAT:
   1748                 tar.close()
   1749                 tar = tarfile.open(tmpname, encoding="ascii")
   1750                 t = tar.getmember("foo")
   1751                 self.assertEqual(t.uname, "\udce4\udcf6\udcfc")
   1752                 self.assertEqual(t.gname, "\udce4\udcf6\udcfc")
   1753         finally:
   1754             tar.close()
   1755 
   1756 
   1757 class UstarUnicodeTest(UnicodeTest, unittest.TestCase):
   1758 
   1759     format = tarfile.USTAR_FORMAT
   1760 
   1761     # Test whether the utf-8 encoded version of a filename exceeds the 100
   1762     # bytes name field limit (every occurrence of '\xff' will be expanded to 2
   1763     # bytes).
   1764     def test_unicode_name1(self):
   1765         self._test_ustar_name("0123456789" * 10)
   1766         self._test_ustar_name("0123456789" * 10 + "0", ValueError)
   1767         self._test_ustar_name("0123456789" * 9 + "01234567\xff")
   1768         self._test_ustar_name("0123456789" * 9 + "012345678\xff", ValueError)
   1769 
   1770     def test_unicode_name2(self):
   1771         self._test_ustar_name("0123456789" * 9 + "012345\xff\xff")
   1772         self._test_ustar_name("0123456789" * 9 + "0123456\xff\xff", ValueError)
   1773 
   1774     # Test whether the utf-8 encoded version of a filename exceeds the 155
   1775     # bytes prefix + '/' + 100 bytes name limit.
   1776     def test_unicode_longname1(self):
   1777         self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 10)
   1778         self._test_ustar_name("0123456789" * 15 + "0123/4" + "0123456789" * 10, ValueError)
   1779         self._test_ustar_name("0123456789" * 15 + "012\xff/" + "0123456789" * 10)
   1780         self._test_ustar_name("0123456789" * 15 + "0123\xff/" + "0123456789" * 10, ValueError)
   1781 
   1782     def test_unicode_longname2(self):
   1783         self._test_ustar_name("0123456789" * 15 + "01\xff/2" + "0123456789" * 10, ValueError)
   1784         self._test_ustar_name("0123456789" * 15 + "01\xff\xff/" + "0123456789" * 10, ValueError)
   1785 
   1786     def test_unicode_longname3(self):
   1787         self._test_ustar_name("0123456789" * 15 + "01\xff\xff/2" + "0123456789" * 10, ValueError)
   1788         self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "01234567\xff")
   1789         self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "012345678\xff", ValueError)
   1790 
   1791     def test_unicode_longname4(self):
   1792         self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "012345\xff\xff")
   1793         self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "0123456\xff\xff", ValueError)
   1794 
   1795     def _test_ustar_name(self, name, exc=None):
   1796         with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar:
   1797             t = tarfile.TarInfo(name)
   1798             if exc is None:
   1799                 tar.addfile(t)
   1800             else:
   1801                 self.assertRaises(exc, tar.addfile, t)
   1802 
   1803         if exc is None:
   1804             with tarfile.open(tmpname, "r", encoding="utf-8") as tar:
   1805                 for t in tar:
   1806                     self.assertEqual(name, t.name)
   1807                     break
   1808 
   1809     # Test the same as above for the 100 bytes link field.
   1810     def test_unicode_link1(self):
   1811         self._test_ustar_link("0123456789" * 10)
   1812         self._test_ustar_link("0123456789" * 10 + "0", ValueError)
   1813         self._test_ustar_link("0123456789" * 9 + "01234567\xff")
   1814         self._test_ustar_link("0123456789" * 9 + "012345678\xff", ValueError)
   1815 
   1816     def test_unicode_link2(self):
   1817         self._test_ustar_link("0123456789" * 9 + "012345\xff\xff")
   1818         self._test_ustar_link("0123456789" * 9 + "0123456\xff\xff", ValueError)
   1819 
   1820     def _test_ustar_link(self, name, exc=None):
   1821         with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar:
   1822             t = tarfile.TarInfo("foo")
   1823             t.linkname = name
   1824             if exc is None:
   1825                 tar.addfile(t)
   1826             else:
   1827                 self.assertRaises(exc, tar.addfile, t)
   1828 
   1829         if exc is None:
   1830             with tarfile.open(tmpname, "r", encoding="utf-8") as tar:
   1831                 for t in tar:
   1832                     self.assertEqual(name, t.linkname)
   1833                     break
   1834 
   1835 
   1836 class GNUUnicodeTest(UnicodeTest, unittest.TestCase):
   1837 
   1838     format = tarfile.GNU_FORMAT
   1839 
   1840     def test_bad_pax_header(self):
   1841         # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields
   1842         # without a hdrcharset=BINARY header.
   1843         for encoding, name in (
   1844                 ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"),
   1845                 ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),):
   1846             with tarfile.open(tarname, encoding=encoding,
   1847                               errors="surrogateescape") as tar:
   1848                 try:
   1849                     t = tar.getmember(name)
   1850                 except KeyError:
   1851                     self.fail("unable to read bad GNU tar pax header")
   1852 
   1853 
   1854 class PAXUnicodeTest(UnicodeTest, unittest.TestCase):
   1855 
   1856     format = tarfile.PAX_FORMAT
   1857 
   1858     # PAX_FORMAT ignores encoding in write mode.
   1859     test_unicode_filename_error = None
   1860 
   1861     def test_binary_header(self):
   1862         # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field.
   1863         for encoding, name in (
   1864                 ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"),
   1865                 ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),):
   1866             with tarfile.open(tarname, encoding=encoding,
   1867                               errors="surrogateescape") as tar:
   1868                 try:
   1869                     t = tar.getmember(name)
   1870                 except KeyError:
   1871                     self.fail("unable to read POSIX.1-2008 binary header")
   1872 
   1873 
   1874 class AppendTestBase:
   1875     # Test append mode (cp. patch #1652681).
   1876 
   1877     def setUp(self):
   1878         self.tarname = tmpname
   1879         if os.path.exists(self.tarname):
   1880             support.unlink(self.tarname)
   1881 
   1882     def _create_testtar(self, mode="w:"):
   1883         with tarfile.open(tarname, encoding="iso8859-1") as src:
   1884             t = src.getmember("ustar/regtype")
   1885             t.name = "foo"
   1886             with src.extractfile(t) as f:
   1887                 with tarfile.open(self.tarname, mode) as tar:
   1888                     tar.addfile(t, f)
   1889 
   1890     def test_append_compressed(self):
   1891         self._create_testtar("w:" + self.suffix)
   1892         self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a")
   1893 
   1894 class AppendTest(AppendTestBase, unittest.TestCase):
   1895     test_append_compressed = None
   1896 
   1897     def _add_testfile(self, fileobj=None):
   1898         with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar:
   1899             tar.addfile(tarfile.TarInfo("bar"))
   1900 
   1901     def _test(self, names=["bar"], fileobj=None):
   1902         with tarfile.open(self.tarname, fileobj=fileobj) as tar:
   1903             self.assertEqual(tar.getnames(), names)
   1904 
   1905     def test_non_existing(self):
   1906         self._add_testfile()
   1907         self._test()
   1908 
   1909     def test_empty(self):
   1910         tarfile.open(self.tarname, "w:").close()
   1911         self._add_testfile()
   1912         self._test()
   1913 
   1914     def test_empty_fileobj(self):
   1915         fobj = io.BytesIO(b"\0" * 1024)
   1916         self._add_testfile(fobj)
   1917         fobj.seek(0)
   1918         self._test(fileobj=fobj)
   1919 
   1920     def test_fileobj(self):
   1921         self._create_testtar()
   1922         with open(self.tarname, "rb") as fobj:
   1923             data = fobj.read()
   1924         fobj = io.BytesIO(data)
   1925         self._add_testfile(fobj)
   1926         fobj.seek(0)
   1927         self._test(names=["foo", "bar"], fileobj=fobj)
   1928 
   1929     def test_existing(self):
   1930         self._create_testtar()
   1931         self._add_testfile()
   1932         self._test(names=["foo", "bar"])
   1933 
   1934     # Append mode is supposed to fail if the tarfile to append to
   1935     # does not end with a zero block.
   1936     def _test_error(self, data):
   1937         with open(self.tarname, "wb") as fobj:
   1938             fobj.write(data)
   1939         self.assertRaises(tarfile.ReadError, self._add_testfile)
   1940 
   1941     def test_null(self):
   1942         self._test_error(b"")
   1943 
   1944     def test_incomplete(self):
   1945         self._test_error(b"\0" * 13)
   1946 
   1947     def test_premature_eof(self):
   1948         data = tarfile.TarInfo("foo").tobuf()
   1949         self._test_error(data)
   1950 
   1951     def test_trailing_garbage(self):
   1952         data = tarfile.TarInfo("foo").tobuf()
   1953         self._test_error(data + b"\0" * 13)
   1954 
   1955     def test_invalid(self):
   1956         self._test_error(b"a" * 512)
   1957 
   1958 class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase):
   1959     pass
   1960 
   1961 class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase):
   1962     pass
   1963 
   1964 class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase):
   1965     pass
   1966 
   1967 
   1968 class LimitsTest(unittest.TestCase):
   1969 
   1970     def test_ustar_limits(self):
   1971         # 100 char name
   1972         tarinfo = tarfile.TarInfo("0123456789" * 10)
   1973         tarinfo.tobuf(tarfile.USTAR_FORMAT)
   1974 
   1975         # 101 char name that cannot be stored
   1976         tarinfo = tarfile.TarInfo("0123456789" * 10 + "0")
   1977         self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
   1978 
   1979         # 256 char name with a slash at pos 156
   1980         tarinfo = tarfile.TarInfo("123/" * 62 + "longname")
   1981         tarinfo.tobuf(tarfile.USTAR_FORMAT)
   1982 
   1983         # 256 char name that cannot be stored
   1984         tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname")
   1985         self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
   1986 
   1987         # 512 char name
   1988         tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
   1989         self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
   1990 
   1991         # 512 char linkname
   1992         tarinfo = tarfile.TarInfo("longlink")
   1993         tarinfo.linkname = "123/" * 126 + "longname"
   1994         self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
   1995 
   1996         # uid > 8 digits
   1997         tarinfo = tarfile.TarInfo("name")
   1998         tarinfo.uid = 0o10000000
   1999         self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
   2000 
   2001     def test_gnu_limits(self):
   2002         tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
   2003         tarinfo.tobuf(tarfile.GNU_FORMAT)
   2004 
   2005         tarinfo = tarfile.TarInfo("longlink")
   2006         tarinfo.linkname = "123/" * 126 + "longname"
   2007         tarinfo.tobuf(tarfile.GNU_FORMAT)
   2008 
   2009         # uid >= 256 ** 7
   2010         tarinfo = tarfile.TarInfo("name")
   2011         tarinfo.uid = 0o4000000000000000000
   2012         self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT)
   2013 
   2014     def test_pax_limits(self):
   2015         tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
   2016         tarinfo.tobuf(tarfile.PAX_FORMAT)
   2017 
   2018         tarinfo = tarfile.TarInfo("longlink")
   2019         tarinfo.linkname = "123/" * 126 + "longname"
   2020         tarinfo.tobuf(tarfile.PAX_FORMAT)
   2021 
   2022         tarinfo = tarfile.TarInfo("name")
   2023         tarinfo.uid = 0o4000000000000000000
   2024         tarinfo.tobuf(tarfile.PAX_FORMAT)
   2025 
   2026 
   2027 class MiscTest(unittest.TestCase):
   2028 
   2029     def test_char_fields(self):
   2030         self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"),
   2031                          b"foo\0\0\0\0\0")
   2032         self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"),
   2033                          b"foo")
   2034         self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"),
   2035                          "foo")
   2036         self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"),
   2037                          "foo")
   2038 
   2039     def test_read_number_fields(self):
   2040         # Issue 13158: Test if GNU tar specific base-256 number fields
   2041         # are decoded correctly.
   2042         self.assertEqual(tarfile.nti(b"0000001\x00"), 1)
   2043         self.assertEqual(tarfile.nti(b"7777777\x00"), 0o7777777)
   2044         self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"),
   2045                          0o10000000)
   2046         self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"),
   2047                          0xffffffff)
   2048         self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"),
   2049                          -1)
   2050         self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"),
   2051                          -100)
   2052         self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"),
   2053                          -0x100000000000000)
   2054 
   2055         # Issue 24514: Test if empty number fields are converted to zero.
   2056         self.assertEqual(tarfile.nti(b"\0"), 0)
   2057         self.assertEqual(tarfile.nti(b"       \0"), 0)
   2058 
   2059     def test_write_number_fields(self):
   2060         self.assertEqual(tarfile.itn(1), b"0000001\x00")
   2061         self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00")
   2062         self.assertEqual(tarfile.itn(0o10000000),
   2063                          b"\x80\x00\x00\x00\x00\x20\x00\x00")
   2064         self.assertEqual(tarfile.itn(0xffffffff),
   2065                          b"\x80\x00\x00\x00\xff\xff\xff\xff")
   2066         self.assertEqual(tarfile.itn(-1),
   2067                          b"\xff\xff\xff\xff\xff\xff\xff\xff")
   2068         self.assertEqual(tarfile.itn(-100),
   2069                          b"\xff\xff\xff\xff\xff\xff\xff\x9c")
   2070         self.assertEqual(tarfile.itn(-0x100000000000000),
   2071                          b"\xff\x00\x00\x00\x00\x00\x00\x00")
   2072 
   2073     def test_number_field_limits(self):
   2074         with self.assertRaises(ValueError):
   2075             tarfile.itn(-1, 8, tarfile.USTAR_FORMAT)
   2076         with self.assertRaises(ValueError):
   2077             tarfile.itn(0o10000000, 8, tarfile.USTAR_FORMAT)
   2078         with self.assertRaises(ValueError):
   2079             tarfile.itn(-0x10000000001, 6, tarfile.GNU_FORMAT)
   2080         with self.assertRaises(ValueError):
   2081             tarfile.itn(0x10000000000, 6, tarfile.GNU_FORMAT)
   2082 
   2083     def test__all__(self):
   2084         blacklist = {'version', 'grp', 'pwd', 'symlink_exception',
   2085                      'NUL', 'BLOCKSIZE', 'RECORDSIZE', 'GNU_MAGIC',
   2086                      'POSIX_MAGIC', 'LENGTH_NAME', 'LENGTH_LINK',
   2087                      'LENGTH_PREFIX', 'REGTYPE', 'AREGTYPE', 'LNKTYPE',
   2088                      'SYMTYPE', 'CHRTYPE', 'BLKTYPE', 'DIRTYPE', 'FIFOTYPE',
   2089                      'CONTTYPE', 'GNUTYPE_LONGNAME', 'GNUTYPE_LONGLINK',
   2090                      'GNUTYPE_SPARSE', 'XHDTYPE', 'XGLTYPE', 'SOLARIS_XHDTYPE',
   2091                      'SUPPORTED_TYPES', 'REGULAR_TYPES', 'GNU_TYPES',
   2092                      'PAX_FIELDS', 'PAX_NAME_FIELDS', 'PAX_NUMBER_FIELDS',
   2093                      'stn', 'nts', 'nti', 'itn', 'calc_chksums', 'copyfileobj',
   2094                      'filemode',
   2095                      'EmptyHeaderError', 'TruncatedHeaderError',
   2096                      'EOFHeaderError', 'InvalidHeaderError',
   2097                      'SubsequentHeaderError', 'ExFileObject',
   2098                      'main'}
   2099         support.check__all__(self, tarfile, blacklist=blacklist)
   2100 
   2101 
   2102 class CommandLineTest(unittest.TestCase):
   2103 
   2104     def tarfilecmd(self, *args, **kwargs):
   2105         rc, out, err = script_helper.assert_python_ok('-m', 'tarfile', *args,
   2106                                                       **kwargs)
   2107         return out.replace(os.linesep.encode(), b'\n')
   2108 
   2109     def tarfilecmd_failure(self, *args):
   2110         return script_helper.assert_python_failure('-m', 'tarfile', *args)
   2111 
   2112     def make_simple_tarfile(self, tar_name):
   2113         files = [support.findfile('tokenize_tests.txt'),
   2114                  support.findfile('tokenize_tests-no-coding-cookie-'
   2115                                   'and-utf8-bom-sig-only.txt')]
   2116         self.addCleanup(support.unlink, tar_name)
   2117         with tarfile.open(tar_name, 'w') as tf:
   2118             for tardata in files:
   2119                 tf.add(tardata, arcname=os.path.basename(tardata))
   2120 
   2121     def test_test_command(self):
   2122         for tar_name in testtarnames:
   2123             for opt in '-t', '--test':
   2124                 out = self.tarfilecmd(opt, tar_name)
   2125                 self.assertEqual(out, b'')
   2126 
   2127     def test_test_command_verbose(self):
   2128         for tar_name in testtarnames:
   2129             for opt in '-v', '--verbose':
   2130                 out = self.tarfilecmd(opt, '-t', tar_name)
   2131                 self.assertIn(b'is a tar archive.\n', out)
   2132 
   2133     def test_test_command_invalid_file(self):
   2134         zipname = support.findfile('zipdir.zip')
   2135         rc, out, err = self.tarfilecmd_failure('-t', zipname)
   2136         self.assertIn(b' is not a tar archive.', err)
   2137         self.assertEqual(out, b'')
   2138         self.assertEqual(rc, 1)
   2139 
   2140         for tar_name in testtarnames:
   2141             with self.subTest(tar_name=tar_name):
   2142                 with open(tar_name, 'rb') as f:
   2143                     data = f.read()
   2144                 try:
   2145                     with open(tmpname, 'wb') as f:
   2146                         f.write(data[:511])
   2147                     rc, out, err = self.tarfilecmd_failure('-t', tmpname)
   2148                     self.assertEqual(out, b'')
   2149                     self.assertEqual(rc, 1)
   2150                 finally:
   2151                     support.unlink(tmpname)
   2152 
   2153     def test_list_command(self):
   2154         for tar_name in testtarnames:
   2155             with support.captured_stdout() as t:
   2156                 with tarfile.open(tar_name, 'r') as tf:
   2157                     tf.list(verbose=False)
   2158             expected = t.getvalue().encode('ascii', 'backslashreplace')
   2159             for opt in '-l', '--list':
   2160                 out = self.tarfilecmd(opt, tar_name,
   2161                                       PYTHONIOENCODING='ascii')
   2162                 self.assertEqual(out, expected)
   2163 
   2164     def test_list_command_verbose(self):
   2165         for tar_name in testtarnames:
   2166             with support.captured_stdout() as t:
   2167                 with tarfile.open(tar_name, 'r') as tf:
   2168                     tf.list(verbose=True)
   2169             expected = t.getvalue().encode('ascii', 'backslashreplace')
   2170             for opt in '-v', '--verbose':
   2171                 out = self.tarfilecmd(opt, '-l', tar_name,
   2172                                       PYTHONIOENCODING='ascii')
   2173                 self.assertEqual(out, expected)
   2174 
   2175     def test_list_command_invalid_file(self):
   2176         zipname = support.findfile('zipdir.zip')
   2177         rc, out, err = self.tarfilecmd_failure('-l', zipname)
   2178         self.assertIn(b' is not a tar archive.', err)
   2179         self.assertEqual(out, b'')
   2180         self.assertEqual(rc, 1)
   2181 
   2182     def test_create_command(self):
   2183         files = [support.findfile('tokenize_tests.txt'),
   2184                  support.findfile('tokenize_tests-no-coding-cookie-'
   2185                                   'and-utf8-bom-sig-only.txt')]
   2186         for opt in '-c', '--create':
   2187             try:
   2188                 out = self.tarfilecmd(opt, tmpname, *files)
   2189                 self.assertEqual(out, b'')
   2190                 with tarfile.open(tmpname) as tar:
   2191                     tar.getmembers()
   2192             finally:
   2193                 support.unlink(tmpname)
   2194 
   2195     def test_create_command_verbose(self):
   2196         files = [support.findfile('tokenize_tests.txt'),
   2197                  support.findfile('tokenize_tests-no-coding-cookie-'
   2198                                   'and-utf8-bom-sig-only.txt')]
   2199         for opt in '-v', '--verbose':
   2200             try:
   2201                 out = self.tarfilecmd(opt, '-c', tmpname, *files)
   2202                 self.assertIn(b' file created.', out)
   2203                 with tarfile.open(tmpname) as tar:
   2204                     tar.getmembers()
   2205             finally:
   2206                 support.unlink(tmpname)
   2207 
   2208     def test_create_command_dotless_filename(self):
   2209         files = [support.findfile('tokenize_tests.txt')]
   2210         try:
   2211             out = self.tarfilecmd('-c', dotlessname, *files)
   2212             self.assertEqual(out, b'')
   2213             with tarfile.open(dotlessname) as tar:
   2214                 tar.getmembers()
   2215         finally:
   2216             support.unlink(dotlessname)
   2217 
   2218     def test_create_command_dot_started_filename(self):
   2219         tar_name = os.path.join(TEMPDIR, ".testtar")
   2220         files = [support.findfile('tokenize_tests.txt')]
   2221         try:
   2222             out = self.tarfilecmd('-c', tar_name, *files)
   2223             self.assertEqual(out, b'')
   2224             with tarfile.open(tar_name) as tar:
   2225                 tar.getmembers()
   2226         finally:
   2227             support.unlink(tar_name)
   2228 
   2229     def test_create_command_compressed(self):
   2230         files = [support.findfile('tokenize_tests.txt'),
   2231                  support.findfile('tokenize_tests-no-coding-cookie-'
   2232                                   'and-utf8-bom-sig-only.txt')]
   2233         for filetype in (GzipTest, Bz2Test, LzmaTest):
   2234             if not filetype.open:
   2235                 continue
   2236             try:
   2237                 tar_name = tmpname + '.' + filetype.suffix
   2238                 out = self.tarfilecmd('-c', tar_name, *files)
   2239                 with filetype.taropen(tar_name) as tar:
   2240                     tar.getmembers()
   2241             finally:
   2242                 support.unlink(tar_name)
   2243 
   2244     def test_extract_command(self):
   2245         self.make_simple_tarfile(tmpname)
   2246         for opt in '-e', '--extract':
   2247             try:
   2248                 with support.temp_cwd(tarextdir):
   2249                     out = self.tarfilecmd(opt, tmpname)
   2250                 self.assertEqual(out, b'')
   2251             finally:
   2252                 support.rmtree(tarextdir)
   2253 
   2254     def test_extract_command_verbose(self):
   2255         self.make_simple_tarfile(tmpname)
   2256         for opt in '-v', '--verbose':
   2257             try:
   2258                 with support.temp_cwd(tarextdir):
   2259                     out = self.tarfilecmd(opt, '-e', tmpname)
   2260                 self.assertIn(b' file is extracted.', out)
   2261             finally:
   2262                 support.rmtree(tarextdir)
   2263 
   2264     def test_extract_command_different_directory(self):
   2265         self.make_simple_tarfile(tmpname)
   2266         try:
   2267             with support.temp_cwd(tarextdir):
   2268                 out = self.tarfilecmd('-e', tmpname, 'spamdir')
   2269             self.assertEqual(out, b'')
   2270         finally:
   2271             support.rmtree(tarextdir)
   2272 
   2273     def test_extract_command_invalid_file(self):
   2274         zipname = support.findfile('zipdir.zip')
   2275         with support.temp_cwd(tarextdir):
   2276             rc, out, err = self.tarfilecmd_failure('-e', zipname)
   2277         self.assertIn(b' is not a tar archive.', err)
   2278         self.assertEqual(out, b'')
   2279         self.assertEqual(rc, 1)
   2280 
   2281 
   2282 class ContextManagerTest(unittest.TestCase):
   2283 
   2284     def test_basic(self):
   2285         with tarfile.open(tarname) as tar:
   2286             self.assertFalse(tar.closed, "closed inside runtime context")
   2287         self.assertTrue(tar.closed, "context manager failed")
   2288 
   2289     def test_closed(self):
   2290         # The __enter__() method is supposed to raise OSError
   2291         # if the TarFile object is already closed.
   2292         tar = tarfile.open(tarname)
   2293         tar.close()
   2294         with self.assertRaises(OSError):
   2295             with tar:
   2296                 pass
   2297 
   2298     def test_exception(self):
   2299         # Test if the OSError exception is passed through properly.
   2300         with self.assertRaises(Exception) as exc:
   2301             with tarfile.open(tarname) as tar:
   2302                 raise OSError
   2303         self.assertIsInstance(exc.exception, OSError,
   2304                               "wrong exception raised in context manager")
   2305         self.assertTrue(tar.closed, "context manager failed")
   2306 
   2307     def test_no_eof(self):
   2308         # __exit__() must not write end-of-archive blocks if an
   2309         # exception was raised.
   2310         try:
   2311             with tarfile.open(tmpname, "w") as tar:
   2312                 raise Exception
   2313         except:
   2314             pass
   2315         self.assertEqual(os.path.getsize(tmpname), 0,
   2316                 "context manager wrote an end-of-archive block")
   2317         self.assertTrue(tar.closed, "context manager failed")
   2318 
   2319     def test_eof(self):
   2320         # __exit__() must write end-of-archive blocks, i.e. call
   2321         # TarFile.close() if there was no error.
   2322         with tarfile.open(tmpname, "w"):
   2323             pass
   2324         self.assertNotEqual(os.path.getsize(tmpname), 0,
   2325                 "context manager wrote no end-of-archive block")
   2326 
   2327     def test_fileobj(self):
   2328         # Test that __exit__() did not close the external file
   2329         # object.
   2330         with open(tmpname, "wb") as fobj:
   2331             try:
   2332                 with tarfile.open(fileobj=fobj, mode="w") as tar:
   2333                     raise Exception
   2334             except:
   2335                 pass
   2336             self.assertFalse(fobj.closed, "external file object was closed")
   2337             self.assertTrue(tar.closed, "context manager failed")
   2338 
   2339 
   2340 @unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing")
   2341 class LinkEmulationTest(ReadTest, unittest.TestCase):
   2342 
   2343     # Test for issue #8741 regression. On platforms that do not support
   2344     # symbolic or hard links tarfile tries to extract these types of members
   2345     # as the regular files they point to.
   2346     def _test_link_extraction(self, name):
   2347         self.tar.extract(name, TEMPDIR)
   2348         with open(os.path.join(TEMPDIR, name), "rb") as f:
   2349             data = f.read()
   2350         self.assertEqual(md5sum(data), md5_regtype)
   2351 
   2352     # See issues #1578269, #8879, and #17689 for some history on these skips
   2353     @unittest.skipIf(hasattr(os.path, "islink"),
   2354                      "Skip emulation - has os.path.islink but not os.link")
   2355     def test_hardlink_extraction1(self):
   2356         self._test_link_extraction("ustar/lnktype")
   2357 
   2358     @unittest.skipIf(hasattr(os.path, "islink"),
   2359                      "Skip emulation - has os.path.islink but not os.link")
   2360     def test_hardlink_extraction2(self):
   2361         self._test_link_extraction("./ustar/linktest2/lnktype")
   2362 
   2363     @unittest.skipIf(hasattr(os, "symlink"),
   2364                      "Skip emulation if symlink exists")
   2365     def test_symlink_extraction1(self):
   2366         self._test_link_extraction("ustar/symtype")
   2367 
   2368     @unittest.skipIf(hasattr(os, "symlink"),
   2369                      "Skip emulation if symlink exists")
   2370     def test_symlink_extraction2(self):
   2371         self._test_link_extraction("./ustar/linktest2/symtype")
   2372 
   2373 
   2374 class Bz2PartialReadTest(Bz2Test, unittest.TestCase):
   2375     # Issue5068: The _BZ2Proxy.read() method loops forever
   2376     # on an empty or partial bzipped file.
   2377 
   2378     def _test_partial_input(self, mode):
   2379         class MyBytesIO(io.BytesIO):
   2380             hit_eof = False
   2381             def read(self, n):
   2382                 if self.hit_eof:
   2383                     raise AssertionError("infinite loop detected in "
   2384                                          "tarfile.open()")
   2385                 self.hit_eof = self.tell() == len(self.getvalue())
   2386                 return super(MyBytesIO, self).read(n)
   2387             def seek(self, *args):
   2388                 self.hit_eof = False
   2389                 return super(MyBytesIO, self).seek(*args)
   2390 
   2391         data = bz2.compress(tarfile.TarInfo("foo").tobuf())
   2392         for x in range(len(data) + 1):
   2393             try:
   2394                 tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode)
   2395             except tarfile.ReadError:
   2396                 pass # we have no interest in ReadErrors
   2397 
   2398     def test_partial_input(self):
   2399         self._test_partial_input("r")
   2400 
   2401     def test_partial_input_bz2(self):
   2402         self._test_partial_input("r:bz2")
   2403 
   2404 
   2405 def root_is_uid_gid_0():
   2406     try:
   2407         import pwd, grp
   2408     except ImportError:
   2409         return False
   2410     if pwd.getpwuid(0)[0] != 'root':
   2411         return False
   2412     if grp.getgrgid(0)[0] != 'root':
   2413         return False
   2414     return True
   2415 
   2416 
   2417 @unittest.skipUnless(hasattr(os, 'chown'), "missing os.chown")
   2418 @unittest.skipUnless(hasattr(os, 'geteuid'), "missing os.geteuid")
   2419 class NumericOwnerTest(unittest.TestCase):
   2420     # mock the following:
   2421     #  os.chown: so we can test what's being called
   2422     #  os.chmod: so the modes are not actually changed. if they are, we can't
   2423     #             delete the files/directories
   2424     #  os.geteuid: so we can lie and say we're root (uid = 0)
   2425 
   2426     @staticmethod
   2427     def _make_test_archive(filename_1, dirname_1, filename_2):
   2428         # the file contents to write
   2429         fobj = io.BytesIO(b"content")
   2430 
   2431         # create a tar file with a file, a directory, and a file within that
   2432         #  directory. Assign various .uid/.gid values to them
   2433         items = [(filename_1, 99, 98, tarfile.REGTYPE, fobj),
   2434                  (dirname_1,  77, 76, tarfile.DIRTYPE, None),
   2435                  (filename_2, 88, 87, tarfile.REGTYPE, fobj),
   2436                  ]
   2437         with tarfile.open(tmpname, 'w') as tarfl:
   2438             for name, uid, gid, typ, contents in items:
   2439                 t = tarfile.TarInfo(name)
   2440                 t.uid = uid
   2441                 t.gid = gid
   2442                 t.uname = 'root'
   2443                 t.gname = 'root'
   2444                 t.type = typ
   2445                 tarfl.addfile(t, contents)
   2446 
   2447         # return the full pathname to the tar file
   2448         return tmpname
   2449 
   2450     @staticmethod
   2451     @contextmanager
   2452     def _setup_test(mock_geteuid):
   2453         mock_geteuid.return_value = 0  # lie and say we're root
   2454         fname = 'numeric-owner-testfile'
   2455         dirname = 'dir'
   2456 
   2457         # the names we want stored in the tarfile
   2458         filename_1 = fname
   2459         dirname_1 = dirname
   2460         filename_2 = os.path.join(dirname, fname)
   2461 
   2462         # create the tarfile with the contents we're after
   2463         tar_filename = NumericOwnerTest._make_test_archive(filename_1,
   2464                                                            dirname_1,
   2465                                                            filename_2)
   2466 
   2467         # open the tarfile for reading. yield it and the names of the items
   2468         #  we stored into the file
   2469         with tarfile.open(tar_filename) as tarfl:
   2470             yield tarfl, filename_1, dirname_1, filename_2
   2471 
   2472     @unittest.mock.patch('os.chown')
   2473     @unittest.mock.patch('os.chmod')
   2474     @unittest.mock.patch('os.geteuid')
   2475     def test_extract_with_numeric_owner(self, mock_geteuid, mock_chmod,
   2476                                         mock_chown):
   2477         with self._setup_test(mock_geteuid) as (tarfl, filename_1, _,
   2478                                                 filename_2):
   2479             tarfl.extract(filename_1, TEMPDIR, numeric_owner=True)
   2480             tarfl.extract(filename_2 , TEMPDIR, numeric_owner=True)
   2481 
   2482         # convert to filesystem paths
   2483         f_filename_1 = os.path.join(TEMPDIR, filename_1)
   2484         f_filename_2 = os.path.join(TEMPDIR, filename_2)
   2485 
   2486         mock_chown.assert_has_calls([unittest.mock.call(f_filename_1, 99, 98),
   2487                                      unittest.mock.call(f_filename_2, 88, 87),
   2488                                      ],
   2489                                     any_order=True)
   2490 
   2491     @unittest.mock.patch('os.chown')
   2492     @unittest.mock.patch('os.chmod')
   2493     @unittest.mock.patch('os.geteuid')
   2494     def test_extractall_with_numeric_owner(self, mock_geteuid, mock_chmod,
   2495                                            mock_chown):
   2496         with self._setup_test(mock_geteuid) as (tarfl, filename_1, dirname_1,
   2497                                                 filename_2):
   2498             tarfl.extractall(TEMPDIR, numeric_owner=True)
   2499 
   2500         # convert to filesystem paths
   2501         f_filename_1 = os.path.join(TEMPDIR, filename_1)
   2502         f_dirname_1  = os.path.join(TEMPDIR, dirname_1)
   2503         f_filename_2 = os.path.join(TEMPDIR, filename_2)
   2504 
   2505         mock_chown.assert_has_calls([unittest.mock.call(f_filename_1, 99, 98),
   2506                                      unittest.mock.call(f_dirname_1, 77, 76),
   2507                                      unittest.mock.call(f_filename_2, 88, 87),
   2508                                      ],
   2509                                     any_order=True)
   2510 
   2511     # this test requires that uid=0 and gid=0 really be named 'root'. that's
   2512     #  because the uname and gname in the test file are 'root', and extract()
   2513     #  will look them up using pwd and grp to find their uid and gid, which we
   2514     #  test here to be 0.
   2515     @unittest.skipUnless(root_is_uid_gid_0(),
   2516                          'uid=0,gid=0 must be named "root"')
   2517     @unittest.mock.patch('os.chown')
   2518     @unittest.mock.patch('os.chmod')
   2519     @unittest.mock.patch('os.geteuid')
   2520     def test_extract_without_numeric_owner(self, mock_geteuid, mock_chmod,
   2521                                            mock_chown):
   2522         with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _):
   2523             tarfl.extract(filename_1, TEMPDIR, numeric_owner=False)
   2524 
   2525         # convert to filesystem paths
   2526         f_filename_1 = os.path.join(TEMPDIR, filename_1)
   2527 
   2528         mock_chown.assert_called_with(f_filename_1, 0, 0)
   2529 
   2530     @unittest.mock.patch('os.geteuid')
   2531     def test_keyword_only(self, mock_geteuid):
   2532         with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _):
   2533             self.assertRaises(TypeError,
   2534                               tarfl.extract, filename_1, TEMPDIR, False, True)
   2535 
   2536 
   2537 def setUpModule():
   2538     support.unlink(TEMPDIR)
   2539     os.makedirs(TEMPDIR)
   2540 
   2541     global testtarnames
   2542     testtarnames = [tarname]
   2543     with open(tarname, "rb") as fobj:
   2544         data = fobj.read()
   2545 
   2546     # Create compressed tarfiles.
   2547     for c in GzipTest, Bz2Test, LzmaTest:
   2548         if c.open:
   2549             support.unlink(c.tarname)
   2550             testtarnames.append(c.tarname)
   2551             with c.open(c.tarname, "wb") as tar:
   2552                 tar.write(data)
   2553 
   2554 def tearDownModule():
   2555     if os.path.exists(TEMPDIR):
   2556         support.rmtree(TEMPDIR)
   2557 
# Run the tests through unittest when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
   2560