Home | History | Annotate | Download | only in test
      1 # -*- coding: iso-8859-15 -*-
      2 
      3 import sys
      4 import os
      5 import shutil
      6 import StringIO
      7 from hashlib import md5
      8 import errno
      9 
     10 import unittest
     11 import tarfile
     12 
     13 from test import test_support
     14 
     15 # Check for our compression modules.
     16 try:
     17     import gzip
     18     gzip.GzipFile
     19 except (ImportError, AttributeError):
     20     gzip = None
     21 try:
     22     import bz2
     23 except ImportError:
     24     bz2 = None
     25 
     26 def md5sum(data):
     27     return md5(data).hexdigest()
     28 
     29 TEMPDIR = os.path.abspath(test_support.TESTFN)
     30 tarname = test_support.findfile("testtar.tar")
     31 gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
     32 bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
     33 tmpname = os.path.join(TEMPDIR, "tmp.tar")
     34 
     35 md5_regtype = "65f477c818ad9e15f7feab0c6d37742f"
     36 md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6"
     37 
     38 
     39 class ReadTest(unittest.TestCase):
     40 
     41     tarname = tarname
     42     mode = "r:"
     43 
     44     def setUp(self):
     45         self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1")
     46 
     47     def tearDown(self):
     48         self.tar.close()
     49 
     50 
     51 class UstarReadTest(ReadTest):
     52 
     53     def test_fileobj_regular_file(self):
     54         tarinfo = self.tar.getmember("ustar/regtype")
     55         fobj = self.tar.extractfile(tarinfo)
     56         data = fobj.read()
     57         self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype),
     58                 "regular file extraction failed")
     59 
     60     def test_fileobj_readlines(self):
     61         self.tar.extract("ustar/regtype", TEMPDIR)
     62         tarinfo = self.tar.getmember("ustar/regtype")
     63         fobj1 = open(os.path.join(TEMPDIR, "ustar/regtype"), "rU")
     64         fobj2 = self.tar.extractfile(tarinfo)
     65 
     66         lines1 = fobj1.readlines()
     67         lines2 = fobj2.readlines()
     68         self.assertTrue(lines1 == lines2,
     69                 "fileobj.readlines() failed")
     70         self.assertTrue(len(lines2) == 114,
     71                 "fileobj.readlines() failed")
     72         self.assertTrue(lines2[83] ==
     73                 "I will gladly admit that Python is not the fastest running scripting language.\n",
     74                 "fileobj.readlines() failed")
     75 
     76     def test_fileobj_iter(self):
     77         self.tar.extract("ustar/regtype", TEMPDIR)
     78         tarinfo = self.tar.getmember("ustar/regtype")
     79         fobj1 = open(os.path.join(TEMPDIR, "ustar/regtype"), "rU")
     80         fobj2 = self.tar.extractfile(tarinfo)
     81         lines1 = fobj1.readlines()
     82         lines2 = [line for line in fobj2]
     83         self.assertTrue(lines1 == lines2,
     84                      "fileobj.__iter__() failed")
     85 
     86     def test_fileobj_seek(self):
     87         self.tar.extract("ustar/regtype", TEMPDIR)
     88         fobj = open(os.path.join(TEMPDIR, "ustar/regtype"), "rb")
     89         data = fobj.read()
     90         fobj.close()
     91 
     92         tarinfo = self.tar.getmember("ustar/regtype")
     93         fobj = self.tar.extractfile(tarinfo)
     94 
     95         text = fobj.read()
     96         fobj.seek(0)
     97         self.assertTrue(0 == fobj.tell(),
     98                      "seek() to file's start failed")
     99         fobj.seek(2048, 0)
    100         self.assertTrue(2048 == fobj.tell(),
    101                      "seek() to absolute position failed")
    102         fobj.seek(-1024, 1)
    103         self.assertTrue(1024 == fobj.tell(),
    104                      "seek() to negative relative position failed")
    105         fobj.seek(1024, 1)
    106         self.assertTrue(2048 == fobj.tell(),
    107                      "seek() to positive relative position failed")
    108         s = fobj.read(10)
    109         self.assertTrue(s == data[2048:2058],
    110                      "read() after seek failed")
    111         fobj.seek(0, 2)
    112         self.assertTrue(tarinfo.size == fobj.tell(),
    113                      "seek() to file's end failed")
    114         self.assertTrue(fobj.read() == "",
    115                      "read() at file's end did not return empty string")
    116         fobj.seek(-tarinfo.size, 2)
    117         self.assertTrue(0 == fobj.tell(),
    118                      "relative seek() to file's start failed")
    119         fobj.seek(512)
    120         s1 = fobj.readlines()
    121         fobj.seek(512)
    122         s2 = fobj.readlines()
    123         self.assertTrue(s1 == s2,
    124                      "readlines() after seek failed")
    125         fobj.seek(0)
    126         self.assertTrue(len(fobj.readline()) == fobj.tell(),
    127                      "tell() after readline() failed")
    128         fobj.seek(512)
    129         self.assertTrue(len(fobj.readline()) + 512 == fobj.tell(),
    130                      "tell() after seek() and readline() failed")
    131         fobj.seek(0)
    132         line = fobj.readline()
    133         self.assertTrue(fobj.read() == data[len(line):],
    134                      "read() after readline() failed")
    135         fobj.close()
    136 
    137     # Test if symbolic and hard links are resolved by extractfile().  The
    138     # test link members each point to a regular member whose data is
    139     # supposed to be exported.
    140     def _test_fileobj_link(self, lnktype, regtype):
    141         a = self.tar.extractfile(lnktype)
    142         b = self.tar.extractfile(regtype)
    143         self.assertEqual(a.name, b.name)
    144 
    145     def test_fileobj_link1(self):
    146         self._test_fileobj_link("ustar/lnktype", "ustar/regtype")
    147 
    148     def test_fileobj_link2(self):
    149         self._test_fileobj_link("./ustar/linktest2/lnktype", "ustar/linktest1/regtype")
    150 
    151     def test_fileobj_symlink1(self):
    152         self._test_fileobj_link("ustar/symtype", "ustar/regtype")
    153 
    154     def test_fileobj_symlink2(self):
    155         self._test_fileobj_link("./ustar/linktest2/symtype", "ustar/linktest1/regtype")
    156 
    157     def test_issue14160(self):
    158         self._test_fileobj_link("symtype2", "ustar/regtype")
    159 
    160 
    161 class CommonReadTest(ReadTest):
    162 
    163     def test_empty_tarfile(self):
    164         # Test for issue6123: Allow opening empty archives.
    165         # This test checks if tarfile.open() is able to open an empty tar
    166         # archive successfully. Note that an empty tar archive is not the
    167         # same as an empty file!
    168         tarfile.open(tmpname, self.mode.replace("r", "w")).close()
    169         try:
    170             tar = tarfile.open(tmpname, self.mode)
    171             tar.getnames()
    172         except tarfile.ReadError:
    173             self.fail("tarfile.open() failed on empty archive")
    174         self.assertListEqual(tar.getmembers(), [])
    175 
    176     def test_null_tarfile(self):
    177         # Test for issue6123: Allow opening empty archives.
    178         # This test guarantees that tarfile.open() does not treat an empty
    179         # file as an empty tar archive.
    180         open(tmpname, "wb").close()
    181         self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
    182         self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)
    183 
    184     def test_ignore_zeros(self):
    185         # Test TarFile's ignore_zeros option.
    186         if self.mode.endswith(":gz"):
    187             _open = gzip.GzipFile
    188         elif self.mode.endswith(":bz2"):
    189             _open = bz2.BZ2File
    190         else:
    191             _open = open
    192 
    193         for char in ('\0', 'a'):
    194             # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
    195             # are ignored correctly.
    196             fobj = _open(tmpname, "wb")
    197             fobj.write(char * 1024)
    198             fobj.write(tarfile.TarInfo("foo").tobuf())
    199             fobj.close()
    200 
    201             tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
    202             self.assertListEqual(tar.getnames(), ["foo"],
    203                     "ignore_zeros=True should have skipped the %r-blocks" % char)
    204             tar.close()
    205 
    206 
    207 class MiscReadTest(CommonReadTest):
    208 
    209     def test_no_name_argument(self):
    210         fobj = open(self.tarname, "rb")
    211         tar = tarfile.open(fileobj=fobj, mode=self.mode)
    212         self.assertEqual(tar.name, os.path.abspath(fobj.name))
    213 
    214     def test_no_name_attribute(self):
    215         data = open(self.tarname, "rb").read()
    216         fobj = StringIO.StringIO(data)
    217         self.assertRaises(AttributeError, getattr, fobj, "name")
    218         tar = tarfile.open(fileobj=fobj, mode=self.mode)
    219         self.assertEqual(tar.name, None)
    220 
    221     def test_empty_name_attribute(self):
    222         data = open(self.tarname, "rb").read()
    223         fobj = StringIO.StringIO(data)
    224         fobj.name = ""
    225         tar = tarfile.open(fileobj=fobj, mode=self.mode)
    226         self.assertEqual(tar.name, None)
    227 
    228     def test_fileobj_with_offset(self):
    229         # Skip the first member and store values from the second member
    230         # of the testtar.
    231         tar = tarfile.open(self.tarname, mode=self.mode)
    232         tar.next()
    233         t = tar.next()
    234         name = t.name
    235         offset = t.offset
    236         data = tar.extractfile(t).read()
    237         tar.close()
    238 
    239         # Open the testtar and seek to the offset of the second member.
    240         if self.mode.endswith(":gz"):
    241             _open = gzip.GzipFile
    242         elif self.mode.endswith(":bz2"):
    243             _open = bz2.BZ2File
    244         else:
    245             _open = open
    246         fobj = _open(self.tarname, "rb")
    247         fobj.seek(offset)
    248 
    249         # Test if the tarfile starts with the second member.
    250         tar = tar.open(self.tarname, mode="r:", fileobj=fobj)
    251         t = tar.next()
    252         self.assertEqual(t.name, name)
    253         # Read to the end of fileobj and test if seeking back to the
    254         # beginning works.
    255         tar.getmembers()
    256         self.assertEqual(tar.extractfile(t).read(), data,
    257                 "seek back did not work")
    258         tar.close()
    259 
    260     def test_fail_comp(self):
    261         # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
    262         if self.mode == "r:":
    263             return
    264         self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
    265         fobj = open(tarname, "rb")
    266         self.assertRaises(tarfile.ReadError, tarfile.open, fileobj=fobj, mode=self.mode)
    267 
    268     def test_v7_dirtype(self):
    269         # Test old style dirtype member (bug #1336623):
    270         # Old V7 tars create directory members using an AREGTYPE
    271         # header with a "/" appended to the filename field.
    272         tarinfo = self.tar.getmember("misc/dirtype-old-v7")
    273         self.assertTrue(tarinfo.type == tarfile.DIRTYPE,
    274                 "v7 dirtype failed")
    275 
    276     def test_xstar_type(self):
    277         # The xstar format stores extra atime and ctime fields inside the
    278         # space reserved for the prefix field. The prefix field must be
    279         # ignored in this case, otherwise it will mess up the name.
    280         try:
    281             self.tar.getmember("misc/regtype-xstar")
    282         except KeyError:
    283             self.fail("failed to find misc/regtype-xstar (mangled prefix?)")
    284 
    285     def test_check_members(self):
    286         for tarinfo in self.tar:
    287             self.assertTrue(int(tarinfo.mtime) == 07606136617,
    288                     "wrong mtime for %s" % tarinfo.name)
    289             if not tarinfo.name.startswith("ustar/"):
    290                 continue
    291             self.assertTrue(tarinfo.uname == "tarfile",
    292                     "wrong uname for %s" % tarinfo.name)
    293 
    294     def test_find_members(self):
    295         self.assertTrue(self.tar.getmembers()[-1].name == "misc/eof",
    296                 "could not find all members")
    297 
    298     def test_extract_hardlink(self):
    299         # Test hardlink extraction (e.g. bug #857297).
    300         with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar:
    301             tar.extract("ustar/regtype", TEMPDIR)
    302             self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/regtype"))
    303 
    304             tar.extract("ustar/lnktype", TEMPDIR)
    305             self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/lnktype"))
    306             with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f:
    307                 data = f.read()
    308             self.assertEqual(md5sum(data), md5_regtype)
    309 
    310             tar.extract("ustar/symtype", TEMPDIR)
    311             self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/symtype"))
    312             with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f:
    313                 data = f.read()
    314             self.assertEqual(md5sum(data), md5_regtype)
    315 
    316     def test_extractall(self):
    317         # Test if extractall() correctly restores directory permissions
    318         # and times (see issue1735).
    319         tar = tarfile.open(tarname, encoding="iso8859-1")
    320         directories = [t for t in tar if t.isdir()]
    321         tar.extractall(TEMPDIR, directories)
    322         for tarinfo in directories:
    323             path = os.path.join(TEMPDIR, tarinfo.name)
    324             if sys.platform != "win32":
    325                 # Win32 has no support for fine grained permissions.
    326                 self.assertEqual(tarinfo.mode & 0777, os.stat(path).st_mode & 0777)
    327             self.assertEqual(tarinfo.mtime, os.path.getmtime(path))
    328         tar.close()
    329 
    330     def test_init_close_fobj(self):
    331         # Issue #7341: Close the internal file object in the TarFile
    332         # constructor in case of an error. For the test we rely on
    333         # the fact that opening an empty file raises a ReadError.
    334         empty = os.path.join(TEMPDIR, "empty")
    335         open(empty, "wb").write("")
    336 
    337         try:
    338             tar = object.__new__(tarfile.TarFile)
    339             try:
    340                 tar.__init__(empty)
    341             except tarfile.ReadError:
    342                 self.assertTrue(tar.fileobj.closed)
    343             else:
    344                 self.fail("ReadError not raised")
    345         finally:
    346             os.remove(empty)
    347 
    348     def test_parallel_iteration(self):
    349         # Issue #16601: Restarting iteration over tarfile continued
    350         # from where it left off.
    351         with tarfile.open(self.tarname) as tar:
    352             for m1, m2 in zip(tar, tar):
    353                 self.assertEqual(m1.offset, m2.offset)
    354                 self.assertEqual(m1.name, m2.name)
    355 
    356 
    357 class StreamReadTest(CommonReadTest):
    358 
    359     mode="r|"
    360 
    361     def test_fileobj_regular_file(self):
    362         tarinfo = self.tar.next() # get "regtype" (can't use getmember)
    363         fobj = self.tar.extractfile(tarinfo)
    364         data = fobj.read()
    365         self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype),
    366                 "regular file extraction failed")
    367 
    368     def test_provoke_stream_error(self):
    369         tarinfos = self.tar.getmembers()
    370         f = self.tar.extractfile(tarinfos[0]) # read the first member
    371         self.assertRaises(tarfile.StreamError, f.read)
    372 
    373     def test_compare_members(self):
    374         tar1 = tarfile.open(tarname, encoding="iso8859-1")
    375         tar2 = self.tar
    376 
    377         while True:
    378             t1 = tar1.next()
    379             t2 = tar2.next()
    380             if t1 is None:
    381                 break
    382             self.assertTrue(t2 is not None, "stream.next() failed.")
    383 
    384             if t2.islnk() or t2.issym():
    385                 self.assertRaises(tarfile.StreamError, tar2.extractfile, t2)
    386                 continue
    387 
    388             v1 = tar1.extractfile(t1)
    389             v2 = tar2.extractfile(t2)
    390             if v1 is None:
    391                 continue
    392             self.assertTrue(v2 is not None, "stream.extractfile() failed")
    393             self.assertTrue(v1.read() == v2.read(), "stream extraction failed")
    394 
    395         tar1.close()
    396 
    397 
    398 class DetectReadTest(unittest.TestCase):
    399 
    400     def _testfunc_file(self, name, mode):
    401         try:
    402             tarfile.open(name, mode)
    403         except tarfile.ReadError:
    404             self.fail()
    405 
    406     def _testfunc_fileobj(self, name, mode):
    407         try:
    408             tarfile.open(name, mode, fileobj=open(name, "rb"))
    409         except tarfile.ReadError:
    410             self.fail()
    411 
    412     def _test_modes(self, testfunc):
    413         testfunc(tarname, "r")
    414         testfunc(tarname, "r:")
    415         testfunc(tarname, "r:*")
    416         testfunc(tarname, "r|")
    417         testfunc(tarname, "r|*")
    418 
    419         if gzip:
    420             self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:gz")
    421             self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|gz")
    422             self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r:")
    423             self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r|")
    424 
    425             testfunc(gzipname, "r")
    426             testfunc(gzipname, "r:*")
    427             testfunc(gzipname, "r:gz")
    428             testfunc(gzipname, "r|*")
    429             testfunc(gzipname, "r|gz")
    430 
    431         if bz2:
    432             self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:bz2")
    433             self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|bz2")
    434             self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r:")
    435             self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r|")
    436 
    437             testfunc(bz2name, "r")
    438             testfunc(bz2name, "r:*")
    439             testfunc(bz2name, "r:bz2")
    440             testfunc(bz2name, "r|*")
    441             testfunc(bz2name, "r|bz2")
    442 
    443     def test_detect_file(self):
    444         self._test_modes(self._testfunc_file)
    445 
    446     def test_detect_fileobj(self):
    447         self._test_modes(self._testfunc_fileobj)
    448 
    449     def test_detect_stream_bz2(self):
    450         # Originally, tarfile's stream detection looked for the string
    451         # "BZh91" at the start of the file. This is incorrect because
    452         # the '9' represents the blocksize (900kB). If the file was
    453         # compressed using another blocksize autodetection fails.
    454         if not bz2:
    455             return
    456 
    457         with open(tarname, "rb") as fobj:
    458             data = fobj.read()
    459 
    460         # Compress with blocksize 100kB, the file starts with "BZh11".
    461         with bz2.BZ2File(tmpname, "wb", compresslevel=1) as fobj:
    462             fobj.write(data)
    463 
    464         self._testfunc_file(tmpname, "r|*")
    465 
    466 
    467 class MemberReadTest(ReadTest):
    468 
    469     def _test_member(self, tarinfo, chksum=None, **kwargs):
    470         if chksum is not None:
    471             self.assertTrue(md5sum(self.tar.extractfile(tarinfo).read()) == chksum,
    472                     "wrong md5sum for %s" % tarinfo.name)
    473 
    474         kwargs["mtime"] = 07606136617
    475         kwargs["uid"] = 1000
    476         kwargs["gid"] = 100
    477         if "old-v7" not in tarinfo.name:
    478             # V7 tar can't handle alphabetic owners.
    479             kwargs["uname"] = "tarfile"
    480             kwargs["gname"] = "tarfile"
    481         for k, v in kwargs.iteritems():
    482             self.assertTrue(getattr(tarinfo, k) == v,
    483                     "wrong value in %s field of %s" % (k, tarinfo.name))
    484 
    485     def test_find_regtype(self):
    486         tarinfo = self.tar.getmember("ustar/regtype")
    487         self._test_member(tarinfo, size=7011, chksum=md5_regtype)
    488 
    489     def test_find_conttype(self):
    490         tarinfo = self.tar.getmember("ustar/conttype")
    491         self._test_member(tarinfo, size=7011, chksum=md5_regtype)
    492 
    493     def test_find_dirtype(self):
    494         tarinfo = self.tar.getmember("ustar/dirtype")
    495         self._test_member(tarinfo, size=0)
    496 
    497     def test_find_dirtype_with_size(self):
    498         tarinfo = self.tar.getmember("ustar/dirtype-with-size")
    499         self._test_member(tarinfo, size=255)
    500 
    501     def test_find_lnktype(self):
    502         tarinfo = self.tar.getmember("ustar/lnktype")
    503         self._test_member(tarinfo, size=0, linkname="ustar/regtype")
    504 
    505     def test_find_symtype(self):
    506         tarinfo = self.tar.getmember("ustar/symtype")
    507         self._test_member(tarinfo, size=0, linkname="regtype")
    508 
    509     def test_find_blktype(self):
    510         tarinfo = self.tar.getmember("ustar/blktype")
    511         self._test_member(tarinfo, size=0, devmajor=3, devminor=0)
    512 
    513     def test_find_chrtype(self):
    514         tarinfo = self.tar.getmember("ustar/chrtype")
    515         self._test_member(tarinfo, size=0, devmajor=1, devminor=3)
    516 
    517     def test_find_fifotype(self):
    518         tarinfo = self.tar.getmember("ustar/fifotype")
    519         self._test_member(tarinfo, size=0)
    520 
    521     def test_find_sparse(self):
    522         tarinfo = self.tar.getmember("ustar/sparse")
    523         self._test_member(tarinfo, size=86016, chksum=md5_sparse)
    524 
    525     def test_find_umlauts(self):
    526         tarinfo = self.tar.getmember("ustar/umlauts-")
    527         self._test_member(tarinfo, size=7011, chksum=md5_regtype)
    528 
    529     def test_find_ustar_longname(self):
    530         name = "ustar/" + "12345/" * 39 + "1234567/longname"
    531         self.assertIn(name, self.tar.getnames())
    532 
    533     def test_find_regtype_oldv7(self):
    534         tarinfo = self.tar.getmember("misc/regtype-old-v7")
    535         self._test_member(tarinfo, size=7011, chksum=md5_regtype)
    536 
    537     def test_find_pax_umlauts(self):
    538         self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1")
    539         tarinfo = self.tar.getmember("pax/umlauts-")
    540         self._test_member(tarinfo, size=7011, chksum=md5_regtype)
    541 
    542 
    543 class LongnameTest(ReadTest):
    544 
    545     def test_read_longname(self):
    546         # Test reading of longname (bug #1471427).
    547         longname = self.subdir + "/" + "123/" * 125 + "longname"
    548         try:
    549             tarinfo = self.tar.getmember(longname)
    550         except KeyError:
    551             self.fail("longname not found")
    552         self.assertTrue(tarinfo.type != tarfile.DIRTYPE, "read longname as dirtype")
    553 
    554     def test_read_longlink(self):
    555         longname = self.subdir + "/" + "123/" * 125 + "longname"
    556         longlink = self.subdir + "/" + "123/" * 125 + "longlink"
    557         try:
    558             tarinfo = self.tar.getmember(longlink)
    559         except KeyError:
    560             self.fail("longlink not found")
    561         self.assertTrue(tarinfo.linkname == longname, "linkname wrong")
    562 
    563     def test_truncated_longname(self):
    564         longname = self.subdir + "/" + "123/" * 125 + "longname"
    565         tarinfo = self.tar.getmember(longname)
    566         offset = tarinfo.offset
    567         self.tar.fileobj.seek(offset)
    568         fobj = StringIO.StringIO(self.tar.fileobj.read(3 * 512))
    569         self.assertRaises(tarfile.ReadError, tarfile.open, name="foo.tar", fileobj=fobj)
    570 
    571     def test_header_offset(self):
    572         # Test if the start offset of the TarInfo object includes
    573         # the preceding extended header.
    574         longname = self.subdir + "/" + "123/" * 125 + "longname"
    575         offset = self.tar.getmember(longname).offset
    576         fobj = open(tarname)
    577         fobj.seek(offset)
    578         tarinfo = tarfile.TarInfo.frombuf(fobj.read(512))
    579         self.assertEqual(tarinfo.type, self.longnametype)
    580 
    581 
    582 class GNUReadTest(LongnameTest):
    583 
    584     subdir = "gnu"
    585     longnametype = tarfile.GNUTYPE_LONGNAME
    586 
    587     def test_sparse_file(self):
    588         tarinfo1 = self.tar.getmember("ustar/sparse")
    589         fobj1 = self.tar.extractfile(tarinfo1)
    590         tarinfo2 = self.tar.getmember("gnu/sparse")
    591         fobj2 = self.tar.extractfile(tarinfo2)
    592         self.assertTrue(fobj1.read() == fobj2.read(),
    593                 "sparse file extraction failed")
    594 
    595 
    596 class PaxReadTest(LongnameTest):
    597 
    598     subdir = "pax"
    599     longnametype = tarfile.XHDTYPE
    600 
    601     def test_pax_global_headers(self):
    602         tar = tarfile.open(tarname, encoding="iso8859-1")
    603 
    604         tarinfo = tar.getmember("pax/regtype1")
    605         self.assertEqual(tarinfo.uname, "foo")
    606         self.assertEqual(tarinfo.gname, "bar")
    607         self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"")
    608 
    609         tarinfo = tar.getmember("pax/regtype2")
    610         self.assertEqual(tarinfo.uname, "")
    611         self.assertEqual(tarinfo.gname, "bar")
    612         self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"")
    613 
    614         tarinfo = tar.getmember("pax/regtype3")
    615         self.assertEqual(tarinfo.uname, "tarfile")
    616         self.assertEqual(tarinfo.gname, "tarfile")
    617         self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"")
    618 
    619     def test_pax_number_fields(self):
    620         # All following number fields are read from the pax header.
    621         tar = tarfile.open(tarname, encoding="iso8859-1")
    622         tarinfo = tar.getmember("pax/regtype4")
    623         self.assertEqual(tarinfo.size, 7011)
    624         self.assertEqual(tarinfo.uid, 123)
    625         self.assertEqual(tarinfo.gid, 123)
    626         self.assertEqual(tarinfo.mtime, 1041808783.0)
    627         self.assertEqual(type(tarinfo.mtime), float)
    628         self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0)
    629         self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0)
    630 
    631 
    632 class WriteTestBase(unittest.TestCase):
    633     # Put all write tests in here that are supposed to be tested
    634     # in all possible mode combinations.
    635 
    636     def test_fileobj_no_close(self):
    637         fobj = StringIO.StringIO()
    638         tar = tarfile.open(fileobj=fobj, mode=self.mode)
    639         tar.addfile(tarfile.TarInfo("foo"))
    640         tar.close()
    641         self.assertTrue(fobj.closed is False, "external fileobjs must never closed")
    642 
    643 
    644 class WriteTest(WriteTestBase):
    645 
    646     mode = "w:"
    647 
    def test_100_char_name(self):
        # The name field in a tar header stores strings of at most 100 chars.
        # If a string is shorter than 100 chars it has to be padded with '\0',
        # which implies that a string of exactly 100 chars is stored without
        # a trailing '\0'.
        name = "0123456789" * 10
        tar = tarfile.open(tmpname, self.mode)
        try:
            t = tarfile.TarInfo(name)
            tar.addfile(t)
        finally:
            tar.close()

        # Read the archive back; close it even when the check fails.
        tar = tarfile.open(tmpname)
        try:
            self.assertEqual(tar.getnames()[0], name,
                    "failed to store 100 char filename")
        finally:
            tar.close()
    663 
    def test_tar_size(self):
        # Test for bug #1013882.
        tar = tarfile.open(tmpname, self.mode)
        try:
            path = os.path.join(TEMPDIR, "file")
            with open(path, "wb") as fobj:
                fobj.write("aaa")
            tar.add(path)
        finally:
            # Close the archive even if creating or adding the file fails.
            tar.close()
        self.assertTrue(os.path.getsize(tmpname) > 0,
                "tarfile is empty")
    675 
    # The test_*_size tests test for bug #1167128.
    def test_file_size(self):
        # gettarinfo() must report the current size of a regular file.
        tar = tarfile.open(tmpname, self.mode)
        try:
            path = os.path.join(TEMPDIR, "file")
            # Create (or truncate to) an empty file.
            with open(path, "wb"):
                pass
            tarinfo = tar.gettarinfo(path)
            self.assertEqual(tarinfo.size, 0)

            with open(path, "wb") as fobj:
                fobj.write("aaa")
            tarinfo = tar.gettarinfo(path)
            self.assertEqual(tarinfo.size, 3)
        finally:
            tar.close()
    693 
    def test_directory_size(self):
        # gettarinfo() must report size 0 for a directory.
        path = os.path.join(TEMPDIR, "directory")
        os.mkdir(path)
        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 0)
            finally:
                # The archive was previously left open.
                tar.close()
        finally:
            os.rmdir(path)
    703 
    def test_link_size(self):
        # gettarinfo() must report size 0 for a hard link whose target
        # has already been seen.  Skipped silently without os.link().
        if hasattr(os, "link"):
            link = os.path.join(TEMPDIR, "link")
            target = os.path.join(TEMPDIR, "link_target")
            with open(target, "wb") as fobj:
                fobj.write("aaa")
            os.link(target, link)
            try:
                tar = tarfile.open(tmpname, self.mode)
                try:
                    # Record the link target in the inodes list.
                    tar.gettarinfo(target)
                    tarinfo = tar.gettarinfo(link)
                    self.assertEqual(tarinfo.size, 0)
                finally:
                    # The archive was previously left open.
                    tar.close()
            finally:
                os.remove(target)
                os.remove(link)
    721 
    @unittest.skipUnless(hasattr(os, "symlink"), "needs os.symlink")
    def test_symlink_size(self):
        # gettarinfo() must report size 0 for a symbolic link. The link
        # points at "link_target", which this test never creates.
        path = os.path.join(TEMPDIR, "symlink")
        os.symlink("link_target", path)
        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 0)
            finally:
                # Always release the archive handle.
                tar.close()
        finally:
            os.remove(path)
    732 
    def test_add_self(self):
        # Test for #1257255: asking the archive to add its own file must
        # leave it empty, both from the original working directory and
        # from inside TEMPDIR.
        dstname = os.path.abspath(tmpname)

        tar = tarfile.open(tmpname, self.mode)
        try:
            self.assertTrue(tar.name == dstname, "archive name must be absolute")

            tar.add(dstname)
            self.assertTrue(tar.getnames() == [], "added the archive to itself")

            cwd = os.getcwd()
            os.chdir(TEMPDIR)
            try:
                tar.add(dstname)
            finally:
                # Restore the working directory even if add() raises, so
                # later tests are not run from inside TEMPDIR.
                os.chdir(cwd)
            self.assertTrue(tar.getnames() == [], "added the archive to itself")
        finally:
            tar.close()
    748 
    def test_exclude(self):
        # The deprecated "exclude" callback (here os.path.isfile) must
        # keep the three regular files out of the archive, so only the
        # directory entry itself is stored.
        tempdir = os.path.join(TEMPDIR, "exclude")
        os.mkdir(tempdir)
        try:
            for name in ("foo", "bar", "baz"):
                name = os.path.join(tempdir, name)
                open(name, "wb").close()

            exclude = os.path.isfile

            tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
            try:
                # Passing exclude= is expected to emit a DeprecationWarning.
                with test_support.check_warnings(("use the filter argument",
                                                  DeprecationWarning)):
                    tar.add(tempdir, arcname="empty_dir", exclude=exclude)
            finally:
                tar.close()

            tar = tarfile.open(tmpname, "r")
            try:
                self.assertEqual(len(tar.getmembers()), 1)
                self.assertEqual(tar.getnames()[0], "empty_dir")
            finally:
                tar.close()
        finally:
            shutil.rmtree(tempdir)
    770 
    def test_filter(self):
        # A filter callback may drop a member by returning None, or
        # return a modified TarInfo that is stored in its place.
        tempdir = os.path.join(TEMPDIR, "filter")
        os.mkdir(tempdir)
        try:
            for name in ("foo", "bar", "baz"):
                name = os.path.join(tempdir, name)
                open(name, "wb").close()

            # Not called "filter" so the builtin is not shadowed.
            def drop_bar_and_tag(tarinfo):
                if os.path.basename(tarinfo.name) == "bar":
                    return
                tarinfo.uid = 123
                tarinfo.uname = "foo"
                return tarinfo

            tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
            try:
                tar.add(tempdir, arcname="empty_dir", filter=drop_bar_and_tag)
            finally:
                tar.close()

            tar = tarfile.open(tmpname, "r")
            try:
                for tarinfo in tar:
                    self.assertEqual(tarinfo.uid, 123)
                    self.assertEqual(tarinfo.uname, "foo")
                # The directory plus "foo" and "baz"; "bar" was dropped.
                self.assertEqual(len(tar.getmembers()), 3)
            finally:
                tar.close()
        finally:
            shutil.rmtree(tempdir)
    798 
    799     # Guarantee that stored pathnames are not modified. Don't
    800     # remove ./ or ../ or double slashes. Still make absolute
    801     # pathnames relative.
    802     # For details see bug #6054.
    def _test_pathname(self, path, cmp_path=None, dir=False):
        # Create a tarfile with an empty member named path
        # and compare the stored name with the original.
        #
        # path     -- arcname under which the member is added
        # cmp_path -- expected stored name; defaults to path with os.sep
        #             replaced by "/"
        # dir      -- add an empty directory instead of an empty file
        foo = os.path.join(TEMPDIR, "foo")
        if not dir:
            open(foo, "w").close()
        else:
            os.mkdir(foo)

        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                tar.add(foo, arcname=path)
            finally:
                tar.close()

            tar = tarfile.open(tmpname, "r")
            try:
                t = tar.next()
            finally:
                tar.close()
        finally:
            # Remove the scratch entry even if archiving failed, so the
            # next call can create it again.
            if not dir:
                os.remove(foo)
            else:
                os.rmdir(foo)

        self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))
    826 
    827     def test_pathnames(self):
    828         self._test_pathname("foo")
    829         self._test_pathname(os.path.join("foo", ".", "bar"))
    830         self._test_pathname(os.path.join("foo", "..", "bar"))
    831         self._test_pathname(os.path.join(".", "foo"))
    832         self._test_pathname(os.path.join(".", "foo", "."))
    833         self._test_pathname(os.path.join(".", "foo", ".", "bar"))
    834         self._test_pathname(os.path.join(".", "foo", "..", "bar"))
    835         self._test_pathname(os.path.join(".", "foo", "..", "bar"))
    836         self._test_pathname(os.path.join("..", "foo"))
    837         self._test_pathname(os.path.join("..", "foo", ".."))
    838         self._test_pathname(os.path.join("..", "foo", ".", "bar"))
    839         self._test_pathname(os.path.join("..", "foo", "..", "bar"))
    840 
    841         self._test_pathname("foo" + os.sep + os.sep + "bar")
    842         self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True)
    843 
    844     def test_abs_pathnames(self):
    845         if sys.platform == "win32":
    846             self._test_pathname("C:\\foo", "foo")
    847         else:
    848             self._test_pathname("/foo", "foo")
    849             self._test_pathname("///foo", "foo")
    850 
    def test_cwd(self):
        # Test adding the current working directory: every stored name
        # must be "." or start with "./".
        cwd = os.getcwd()
        os.chdir(TEMPDIR)
        try:
            open("foo", "w").close()

            tar = tarfile.open(tmpname, self.mode)
            try:
                tar.add(".")
            finally:
                tar.close()

            tar = tarfile.open(tmpname, "r")
            try:
                for t in tar:
                    self.assertTrue(t.name == "." or t.name.startswith("./"))
            finally:
                tar.close()
        finally:
            os.chdir(cwd)
            # Do not leave the scratch file behind in TEMPDIR.
            test_support.unlink(os.path.join(TEMPDIR, "foo"))
    868 
    @unittest.skipUnless(hasattr(os, 'symlink'), "needs os.symlink")
    def test_extractall_symlinks(self):
        # Test if extractall works properly when tarfile contains symlinks
        tempdir = os.path.join(TEMPDIR, "testsymlinks")
        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
        os.mkdir(tempdir)
        try:
            source_file = os.path.join(tempdir, 'source')
            target_file = os.path.join(tempdir, 'symlink')
            with open(source_file, 'w') as f:
                f.write('something\n')
            os.symlink(source_file, target_file)
            tar = tarfile.open(temparchive, 'w')
            try:
                tar.add(source_file, arcname=os.path.basename(source_file))
                tar.add(target_file, arcname=os.path.basename(target_file))
            finally:
                tar.close()
            # Let's extract it to the location which contains the symlink
            tar = tarfile.open(temparchive, 'r')
            # this should not raise OSError: [Errno 17] File exists
            try:
                tar.extractall(path=tempdir)
            except OSError:
                self.fail("extractall failed with symlinked files")
            finally:
                tar.close()
        finally:
            # The archive may not exist if an earlier step failed; a plain
            # os.unlink() would then raise and hide the real error.
            test_support.unlink(temparchive)
            shutil.rmtree(tempdir)
    897 
    @unittest.skipUnless(hasattr(os, 'symlink'), "needs os.symlink")
    def test_extractall_broken_symlinks(self):
        # Test if extractall works properly when tarfile contains broken
        # symlinks
        tempdir = os.path.join(TEMPDIR, "testsymlinks")
        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
        os.mkdir(tempdir)
        try:
            source_file = os.path.join(tempdir, 'source')
            target_file = os.path.join(tempdir, 'symlink')
            with open(source_file, 'w') as f:
                f.write('something\n')
            os.symlink(source_file, target_file)
            # Only the symlink is archived, not the file it points to.
            tar = tarfile.open(temparchive, 'w')
            try:
                tar.add(target_file, arcname=os.path.basename(target_file))
            finally:
                tar.close()
            # remove the real file
            os.unlink(source_file)
            # Let's extract it to the location which contains the symlink
            tar = tarfile.open(temparchive, 'r')
            # this should not raise OSError: [Errno 17] File exists
            try:
                tar.extractall(path=tempdir)
            except OSError:
                self.fail("extractall failed with broken symlinked files")
            finally:
                tar.close()
        finally:
            # The archive may not exist if an earlier step failed; a plain
            # os.unlink() would then raise and hide the real error.
            test_support.unlink(temparchive)
            shutil.rmtree(tempdir)
    928 
    @unittest.skipUnless(hasattr(os, 'link'), "needs os.link")
    def test_extractall_hardlinks(self):
        # Test if extractall works properly when tarfile contains hardlinks
        tempdir = os.path.join(TEMPDIR, "testsymlinks")
        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
        os.mkdir(tempdir)
        try:
            source_file = os.path.join(tempdir, 'source')
            # The second name keeps the file name 'symlink' but is created
            # as a hard link.
            target_file = os.path.join(tempdir, 'symlink')
            with open(source_file, 'w') as f:
                f.write('something\n')
            os.link(source_file, target_file)
            tar = tarfile.open(temparchive, 'w')
            try:
                tar.add(source_file, arcname=os.path.basename(source_file))
                tar.add(target_file, arcname=os.path.basename(target_file))
            finally:
                tar.close()
            # Let's extract it to the location which contains the hardlink
            tar = tarfile.open(temparchive, 'r')
            # this should not raise OSError: [Errno 17] File exists
            try:
                tar.extractall(path=tempdir)
            except OSError:
                self.fail("extractall failed with linked files")
            finally:
                tar.close()
        finally:
            # The archive may not exist if an earlier step failed; a plain
            # os.unlink() would then raise and hide the real error.
            test_support.unlink(temparchive)
            shutil.rmtree(tempdir)
    957 
    958 class StreamWriteTest(WriteTestBase):
    959 
    960     mode = "w|"
    961 
    962     def test_stream_padding(self):
    963         # Test for bug #1543303.
    964         tar = tarfile.open(tmpname, self.mode)
    965         tar.close()
    966 
    967         if self.mode.endswith("gz"):
    968             fobj = gzip.GzipFile(tmpname)
    969             data = fobj.read()
    970             fobj.close()
    971         elif self.mode.endswith("bz2"):
    972             dec = bz2.BZ2Decompressor()
    973             data = open(tmpname, "rb").read()
    974             data = dec.decompress(data)
    975             self.assertTrue(len(dec.unused_data) == 0,
    976                     "found trailing data")
    977         else:
    978             fobj = open(tmpname, "rb")
    979             data = fobj.read()
    980             fobj.close()
    981 
    982         self.assertTrue(data.count("\0") == tarfile.RECORDSIZE,
    983                          "incorrect zero padding")
    984 
    985     def test_file_mode(self):
    986         # Test for issue #8464: Create files with correct
    987         # permissions.
    988         if sys.platform == "win32" or not hasattr(os, "umask"):
    989             return
    990 
    991         if os.path.exists(tmpname):
    992             os.remove(tmpname)
    993 
    994         original_umask = os.umask(0022)
    995         try:
    996             tar = tarfile.open(tmpname, self.mode)
    997             tar.close()
    998             mode = os.stat(tmpname).st_mode & 0777
    999             self.assertEqual(mode, 0644, "wrong file permissions")
   1000         finally:
   1001             os.umask(original_umask)
   1002 
   1003     def test_issue13639(self):
   1004         try:
   1005             with tarfile.open(unicode(tmpname, sys.getfilesystemencoding()), self.mode):
   1006                 pass
   1007         except UnicodeDecodeError:
   1008             self.fail("_Stream failed to write unicode filename")
   1009 
   1010 
   1011 class GNUWriteTest(unittest.TestCase):
   1012     # This testcase checks for correct creation of GNU Longname
   1013     # and Longlink extended headers (cp. bug #812325).
   1014 
   1015     def _length(self, s):
   1016         blocks, remainder = divmod(len(s) + 1, 512)
   1017         if remainder:
   1018             blocks += 1
   1019         return blocks * 512
   1020 
   1021     def _calc_size(self, name, link=None):
   1022         # Initial tar header
   1023         count = 512
   1024 
   1025         if len(name) > tarfile.LENGTH_NAME:
   1026             # GNU longname extended header + longname
   1027             count += 512
   1028             count += self._length(name)
   1029         if link is not None and len(link) > tarfile.LENGTH_LINK:
   1030             # GNU longlink extended header + longlink
   1031             count += 512
   1032             count += self._length(link)
   1033         return count
   1034 
   1035     def _test(self, name, link=None):
   1036         tarinfo = tarfile.TarInfo(name)
   1037         if link:
   1038             tarinfo.linkname = link
   1039             tarinfo.type = tarfile.LNKTYPE
   1040 
   1041         tar = tarfile.open(tmpname, "w")
   1042         tar.format = tarfile.GNU_FORMAT
   1043         tar.addfile(tarinfo)
   1044 
   1045         v1 = self._calc_size(name, link)
   1046         v2 = tar.offset
   1047         self.assertTrue(v1 == v2, "GNU longname/longlink creation failed")
   1048 
   1049         tar.close()
   1050 
   1051         tar = tarfile.open(tmpname)
   1052         member = tar.next()
   1053         self.assertIsNotNone(member,
   1054                 "unable to read longname member")
   1055         self.assertEqual(tarinfo.name, member.name,
   1056                 "unable to read longname member")
   1057         self.assertEqual(tarinfo.linkname, member.linkname,
   1058                 "unable to read longname member")
   1059 
   1060     def test_longname_1023(self):
   1061         self._test(("longnam/" * 127) + "longnam")
   1062 
   1063     def test_longname_1024(self):
   1064         self._test(("longnam/" * 127) + "longname")
   1065 
   1066     def test_longname_1025(self):
   1067         self._test(("longnam/" * 127) + "longname_")
   1068 
   1069     def test_longlink_1023(self):
   1070         self._test("name", ("longlnk/" * 127) + "longlnk")
   1071 
   1072     def test_longlink_1024(self):
   1073         self._test("name", ("longlnk/" * 127) + "longlink")
   1074 
   1075     def test_longlink_1025(self):
   1076         self._test("name", ("longlnk/" * 127) + "longlink_")
   1077 
   1078     def test_longnamelink_1023(self):
   1079         self._test(("longnam/" * 127) + "longnam",
   1080                    ("longlnk/" * 127) + "longlnk")
   1081 
   1082     def test_longnamelink_1024(self):
   1083         self._test(("longnam/" * 127) + "longname",
   1084                    ("longlnk/" * 127) + "longlink")
   1085 
   1086     def test_longnamelink_1025(self):
   1087         self._test(("longnam/" * 127) + "longname_",
   1088                    ("longlnk/" * 127) + "longlink_")
   1089 
   1090 
   1091 class HardlinkTest(unittest.TestCase):
   1092     # Test the creation of LNKTYPE (hardlink) members in an archive.
   1093 
   1094     def setUp(self):
   1095         self.foo = os.path.join(TEMPDIR, "foo")
   1096         self.bar = os.path.join(TEMPDIR, "bar")
   1097 
   1098         fobj = open(self.foo, "wb")
   1099         fobj.write("foo")
   1100         fobj.close()
   1101 
   1102         os.link(self.foo, self.bar)
   1103 
   1104         self.tar = tarfile.open(tmpname, "w")
   1105         self.tar.add(self.foo)
   1106 
   1107     def tearDown(self):
   1108         self.tar.close()
   1109         os.remove(self.foo)
   1110         os.remove(self.bar)
   1111 
   1112     def test_add_twice(self):
   1113         # The same name will be added as a REGTYPE every
   1114         # time regardless of st_nlink.
   1115         tarinfo = self.tar.gettarinfo(self.foo)
   1116         self.assertTrue(tarinfo.type == tarfile.REGTYPE,
   1117                 "add file as regular failed")
   1118 
   1119     def test_add_hardlink(self):
   1120         tarinfo = self.tar.gettarinfo(self.bar)
   1121         self.assertTrue(tarinfo.type == tarfile.LNKTYPE,
   1122                 "add file as hardlink failed")
   1123 
   1124     def test_dereference_hardlink(self):
   1125         self.tar.dereference = True
   1126         tarinfo = self.tar.gettarinfo(self.bar)
   1127         self.assertTrue(tarinfo.type == tarfile.REGTYPE,
   1128                 "dereferencing hardlink failed")
   1129 
   1130 
   1131 class PaxWriteTest(GNUWriteTest):
   1132 
   1133     def _test(self, name, link=None):
   1134         # See GNUWriteTest.
   1135         tarinfo = tarfile.TarInfo(name)
   1136         if link:
   1137             tarinfo.linkname = link
   1138             tarinfo.type = tarfile.LNKTYPE
   1139 
   1140         tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
   1141         tar.addfile(tarinfo)
   1142         tar.close()
   1143 
   1144         tar = tarfile.open(tmpname)
   1145         if link:
   1146             l = tar.getmembers()[0].linkname
   1147             self.assertTrue(link == l, "PAX longlink creation failed")
   1148         else:
   1149             n = tar.getmembers()[0].name
   1150             self.assertTrue(name == n, "PAX longname creation failed")
   1151 
   1152     def test_pax_global_header(self):
   1153         pax_headers = {
   1154                 u"foo": u"bar",
   1155                 u"uid": u"0",
   1156                 u"mtime": u"1.23",
   1157                 u"test": u"",
   1158                 u"": u"test"}
   1159 
   1160         tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
   1161                 pax_headers=pax_headers)
   1162         tar.addfile(tarfile.TarInfo("test"))
   1163         tar.close()
   1164 
   1165         # Test if the global header was written correctly.
   1166         tar = tarfile.open(tmpname, encoding="iso8859-1")
   1167         self.assertEqual(tar.pax_headers, pax_headers)
   1168         self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers)
   1169 
   1170         # Test if all the fields are unicode.
   1171         for key, val in tar.pax_headers.iteritems():
   1172             self.assertTrue(type(key) is unicode)
   1173             self.assertTrue(type(val) is unicode)
   1174             if key in tarfile.PAX_NUMBER_FIELDS:
   1175                 try:
   1176                     tarfile.PAX_NUMBER_FIELDS[key](val)
   1177                 except (TypeError, ValueError):
   1178                     self.fail("unable to convert pax header field")
   1179 
   1180     def test_pax_extended_header(self):
   1181         # The fields from the pax header have priority over the
   1182         # TarInfo.
   1183         pax_headers = {u"path": u"foo", u"uid": u"123"}
   1184 
   1185         tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1")
   1186         t = tarfile.TarInfo()
   1187         t.name = u""     # non-ASCII
   1188         t.uid = 8**8        # too large
   1189         t.pax_headers = pax_headers
   1190         tar.addfile(t)
   1191         tar.close()
   1192 
   1193         tar = tarfile.open(tmpname, encoding="iso8859-1")
   1194         t = tar.getmembers()[0]
   1195         self.assertEqual(t.pax_headers, pax_headers)
   1196         self.assertEqual(t.name, "foo")
   1197         self.assertEqual(t.uid, 123)
   1198 
   1199 
   1200 class UstarUnicodeTest(unittest.TestCase):
   1201     # All *UnicodeTests FIXME
   1202 
   1203     format = tarfile.USTAR_FORMAT
   1204 
   1205     def test_iso8859_1_filename(self):
   1206         self._test_unicode_filename("iso8859-1")
   1207 
   1208     def test_utf7_filename(self):
   1209         self._test_unicode_filename("utf7")
   1210 
   1211     def test_utf8_filename(self):
   1212         self._test_unicode_filename("utf8")
   1213 
   1214     def _test_unicode_filename(self, encoding):
   1215         tar = tarfile.open(tmpname, "w", format=self.format, encoding=encoding, errors="strict")
   1216         name = u""
   1217         tar.addfile(tarfile.TarInfo(name))
   1218         tar.close()
   1219 
   1220         tar = tarfile.open(tmpname, encoding=encoding)
   1221         self.assertTrue(type(tar.getnames()[0]) is not unicode)
   1222         self.assertEqual(tar.getmembers()[0].name, name.encode(encoding))
   1223         tar.close()
   1224 
   1225     def test_unicode_filename_error(self):
   1226         tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict")
   1227         tarinfo = tarfile.TarInfo()
   1228 
   1229         tarinfo.name = ""
   1230         if self.format == tarfile.PAX_FORMAT:
   1231             self.assertRaises(UnicodeError, tar.addfile, tarinfo)
   1232         else:
   1233             tar.addfile(tarinfo)
   1234 
   1235         tarinfo.name = u""
   1236         self.assertRaises(UnicodeError, tar.addfile, tarinfo)
   1237 
   1238         tarinfo.name = "foo"
   1239         tarinfo.uname = u""
   1240         self.assertRaises(UnicodeError, tar.addfile, tarinfo)
   1241 
   1242     def test_unicode_argument(self):
   1243         tar = tarfile.open(tarname, "r", encoding="iso8859-1", errors="strict")
   1244         for t in tar:
   1245             self.assertTrue(type(t.name) is str)
   1246             self.assertTrue(type(t.linkname) is str)
   1247             self.assertTrue(type(t.uname) is str)
   1248             self.assertTrue(type(t.gname) is str)
   1249         tar.close()
   1250 
   1251     def test_uname_unicode(self):
   1252         for name in (u"", ""):
   1253             t = tarfile.TarInfo("foo")
   1254             t.uname = name
   1255             t.gname = name
   1256 
   1257             fobj = StringIO.StringIO()
   1258             tar = tarfile.open("foo.tar", mode="w", fileobj=fobj, format=self.format, encoding="iso8859-1")
   1259             tar.addfile(t)
   1260             tar.close()
   1261             fobj.seek(0)
   1262 
   1263             tar = tarfile.open("foo.tar", fileobj=fobj, encoding="iso8859-1")
   1264             t = tar.getmember("foo")
   1265             self.assertEqual(t.uname, "")
   1266             self.assertEqual(t.gname, "")
   1267 
   1268 
class GNUUnicodeTest(UstarUnicodeTest):
    # Runs every test inherited from UstarUnicodeTest with GNU_FORMAT.

    format = tarfile.GNU_FORMAT
   1272 
   1273 
   1274 class PaxUnicodeTest(UstarUnicodeTest):
   1275 
   1276     format = tarfile.PAX_FORMAT
   1277 
   1278     def _create_unicode_name(self, name):
   1279         tar = tarfile.open(tmpname, "w", format=self.format)
   1280         t = tarfile.TarInfo()
   1281         t.pax_headers["path"] = name
   1282         tar.addfile(t)
   1283         tar.close()
   1284 
   1285     def test_error_handlers(self):
   1286         # Test if the unicode error handlers work correctly for characters
   1287         # that cannot be expressed in a given encoding.
   1288         self._create_unicode_name(u"")
   1289 
   1290         for handler, name in (("utf-8", u"".encode("utf8")),
   1291                     ("replace", "???"), ("ignore", "")):
   1292             tar = tarfile.open(tmpname, format=self.format, encoding="ascii",
   1293                     errors=handler)
   1294             self.assertEqual(tar.getnames()[0], name)
   1295 
   1296         self.assertRaises(UnicodeError, tarfile.open, tmpname,
   1297                 encoding="ascii", errors="strict")
   1298 
   1299     def test_error_handler_utf8(self):
   1300         # Create a pathname that has one component representable using
   1301         # iso8859-1 and the other only in iso8859-15.
   1302         self._create_unicode_name(u"/")
   1303 
   1304         tar = tarfile.open(tmpname, format=self.format, encoding="iso8859-1",
   1305                 errors="utf-8")
   1306         self.assertEqual(tar.getnames()[0], "/" + u"".encode("utf8"))
   1307 
   1308 
   1309 class AppendTest(unittest.TestCase):
   1310     # Test append mode (cp. patch #1652681).
   1311 
   1312     def setUp(self):
   1313         self.tarname = tmpname
   1314         if os.path.exists(self.tarname):
   1315             os.remove(self.tarname)
   1316 
   1317     def _add_testfile(self, fileobj=None):
   1318         tar = tarfile.open(self.tarname, "a", fileobj=fileobj)
   1319         tar.addfile(tarfile.TarInfo("bar"))
   1320         tar.close()
   1321 
   1322     def _create_testtar(self, mode="w:"):
   1323         src = tarfile.open(tarname, encoding="iso8859-1")
   1324         t = src.getmember("ustar/regtype")
   1325         t.name = "foo"
   1326         f = src.extractfile(t)
   1327         tar = tarfile.open(self.tarname, mode)
   1328         tar.addfile(t, f)
   1329         tar.close()
   1330 
   1331     def _test(self, names=["bar"], fileobj=None):
   1332         tar = tarfile.open(self.tarname, fileobj=fileobj)
   1333         self.assertEqual(tar.getnames(), names)
   1334 
   1335     def test_non_existing(self):
   1336         self._add_testfile()
   1337         self._test()
   1338 
   1339     def test_empty(self):
   1340         tarfile.open(self.tarname, "w:").close()
   1341         self._add_testfile()
   1342         self._test()
   1343 
   1344     def test_empty_fileobj(self):
   1345         fobj = StringIO.StringIO("\0" * 1024)
   1346         self._add_testfile(fobj)
   1347         fobj.seek(0)
   1348         self._test(fileobj=fobj)
   1349 
   1350     def test_fileobj(self):
   1351         self._create_testtar()
   1352         data = open(self.tarname).read()
   1353         fobj = StringIO.StringIO(data)
   1354         self._add_testfile(fobj)
   1355         fobj.seek(0)
   1356         self._test(names=["foo", "bar"], fileobj=fobj)
   1357 
   1358     def test_existing(self):
   1359         self._create_testtar()
   1360         self._add_testfile()
   1361         self._test(names=["foo", "bar"])
   1362 
   1363     def test_append_gz(self):
   1364         if gzip is None:
   1365             return
   1366         self._create_testtar("w:gz")
   1367         self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a")
   1368 
   1369     def test_append_bz2(self):
   1370         if bz2 is None:
   1371             return
   1372         self._create_testtar("w:bz2")
   1373         self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a")
   1374 
   1375     # Append mode is supposed to fail if the tarfile to append to
   1376     # does not end with a zero block.
   1377     def _test_error(self, data):
   1378         open(self.tarname, "wb").write(data)
   1379         self.assertRaises(tarfile.ReadError, self._add_testfile)
   1380 
   1381     def test_null(self):
   1382         self._test_error("")
   1383 
   1384     def test_incomplete(self):
   1385         self._test_error("\0" * 13)
   1386 
   1387     def test_premature_eof(self):
   1388         data = tarfile.TarInfo("foo").tobuf()
   1389         self._test_error(data)
   1390 
   1391     def test_trailing_garbage(self):
   1392         data = tarfile.TarInfo("foo").tobuf()
   1393         self._test_error(data + "\0" * 13)
   1394 
   1395     def test_invalid(self):
   1396         self._test_error("a" * 512)
   1397 
   1398 
   1399 class LimitsTest(unittest.TestCase):
   1400 
   1401     def test_ustar_limits(self):
   1402         # 100 char name
   1403         tarinfo = tarfile.TarInfo("0123456789" * 10)
   1404         tarinfo.tobuf(tarfile.USTAR_FORMAT)
   1405 
   1406         # 101 char name that cannot be stored
   1407         tarinfo = tarfile.TarInfo("0123456789" * 10 + "0")
   1408         self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
   1409 
   1410         # 256 char name with a slash at pos 156
   1411         tarinfo = tarfile.TarInfo("123/" * 62 + "longname")
   1412         tarinfo.tobuf(tarfile.USTAR_FORMAT)
   1413 
   1414         # 256 char name that cannot be stored
   1415         tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname")
   1416         self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
   1417 
   1418         # 512 char name
   1419         tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
   1420         self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
   1421 
   1422         # 512 char linkname
   1423         tarinfo = tarfile.TarInfo("longlink")
   1424         tarinfo.linkname = "123/" * 126 + "longname"
   1425         self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
   1426 
   1427         # uid > 8 digits
   1428         tarinfo = tarfile.TarInfo("name")
   1429         tarinfo.uid = 010000000
   1430         self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
   1431 
   1432     def test_gnu_limits(self):
   1433         tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
   1434         tarinfo.tobuf(tarfile.GNU_FORMAT)
   1435 
   1436         tarinfo = tarfile.TarInfo("longlink")
   1437         tarinfo.linkname = "123/" * 126 + "longname"
   1438         tarinfo.tobuf(tarfile.GNU_FORMAT)
   1439 
   1440         # uid >= 256 ** 7
   1441         tarinfo = tarfile.TarInfo("name")
   1442         tarinfo.uid = 04000000000000000000L
   1443         self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT)
   1444 
   1445     def test_pax_limits(self):
   1446         tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
   1447         tarinfo.tobuf(tarfile.PAX_FORMAT)
   1448 
   1449         tarinfo = tarfile.TarInfo("longlink")
   1450         tarinfo.linkname = "123/" * 126 + "longname"
   1451         tarinfo.tobuf(tarfile.PAX_FORMAT)
   1452 
   1453         tarinfo = tarfile.TarInfo("name")
   1454         tarinfo.uid = 04000000000000000000L
   1455         tarinfo.tobuf(tarfile.PAX_FORMAT)
   1456 
   1457 
   1458 class ContextManagerTest(unittest.TestCase):
   1459 
   1460     def test_basic(self):
   1461         with tarfile.open(tarname) as tar:
   1462             self.assertFalse(tar.closed, "closed inside runtime context")
   1463         self.assertTrue(tar.closed, "context manager failed")
   1464 
   1465     def test_closed(self):
   1466         # The __enter__() method is supposed to raise IOError
   1467         # if the TarFile object is already closed.
   1468         tar = tarfile.open(tarname)
   1469         tar.close()
   1470         with self.assertRaises(IOError):
   1471             with tar:
   1472                 pass
   1473 
   1474     def test_exception(self):
   1475         # Test if the IOError exception is passed through properly.
   1476         with self.assertRaises(Exception) as exc:
   1477             with tarfile.open(tarname) as tar:
   1478                 raise IOError
   1479         self.assertIsInstance(exc.exception, IOError,
   1480                               "wrong exception raised in context manager")
   1481         self.assertTrue(tar.closed, "context manager failed")
   1482 
   1483     def test_no_eof(self):
   1484         # __exit__() must not write end-of-archive blocks if an
   1485         # exception was raised.
   1486         try:
   1487             with tarfile.open(tmpname, "w") as tar:
   1488                 raise Exception
   1489         except:
   1490             pass
   1491         self.assertEqual(os.path.getsize(tmpname), 0,
   1492                 "context manager wrote an end-of-archive block")
   1493         self.assertTrue(tar.closed, "context manager failed")
   1494 
   1495     def test_eof(self):
   1496         # __exit__() must write end-of-archive blocks, i.e. call
   1497         # TarFile.close() if there was no error.
   1498         with tarfile.open(tmpname, "w"):
   1499             pass
   1500         self.assertNotEqual(os.path.getsize(tmpname), 0,
   1501                 "context manager wrote no end-of-archive block")
   1502 
   1503     def test_fileobj(self):
   1504         # Test that __exit__() did not close the external file
   1505         # object.
   1506         fobj = open(tmpname, "wb")
   1507         try:
   1508             with tarfile.open(fileobj=fobj, mode="w") as tar:
   1509                 raise Exception
   1510         except:
   1511             pass
   1512         self.assertFalse(fobj.closed, "external file object was closed")
   1513         self.assertTrue(tar.closed, "context manager failed")
   1514         fobj.close()
   1515 
   1516 
   1517 class LinkEmulationTest(ReadTest):
   1518 
   1519     # Test for issue #8741 regression. On platforms that do not support
   1520     # symbolic or hard links tarfile tries to extract these types of members as
   1521     # the regular files they point to.
   1522     def _test_link_extraction(self, name):
   1523         self.tar.extract(name, TEMPDIR)
   1524         data = open(os.path.join(TEMPDIR, name), "rb").read()
   1525         self.assertEqual(md5sum(data), md5_regtype)
   1526 
   1527     def test_hardlink_extraction1(self):
   1528         self._test_link_extraction("ustar/lnktype")
   1529 
   1530     def test_hardlink_extraction2(self):
   1531         self._test_link_extraction("./ustar/linktest2/lnktype")
   1532 
   1533     def test_symlink_extraction1(self):
   1534         self._test_link_extraction("ustar/symtype")
   1535 
   1536     def test_symlink_extraction2(self):
   1537         self._test_link_extraction("./ustar/linktest2/symtype")
   1538 
   1539 
   1540 class GzipMiscReadTest(MiscReadTest):
   1541     tarname = gzipname
   1542     mode = "r:gz"
   1543 class GzipUstarReadTest(UstarReadTest):
   1544     tarname = gzipname
   1545     mode = "r:gz"
   1546 class GzipStreamReadTest(StreamReadTest):
   1547     tarname = gzipname
   1548     mode = "r|gz"
class GzipWriteTest(WriteTest):
    # WriteTest variant with the mode overridden to "w:gz"
    # (presumably consumed by the inherited tests via self.mode).
    mode = "w:gz"
class GzipStreamWriteTest(StreamWriteTest):
    # StreamWriteTest variant with the mode overridden to "w|gz"
    # (presumably consumed by the inherited tests via self.mode).
    mode = "w|gz"
   1553 
   1554 
   1555 class Bz2MiscReadTest(MiscReadTest):
   1556     tarname = bz2name
   1557     mode = "r:bz2"
   1558 class Bz2UstarReadTest(UstarReadTest):
   1559     tarname = bz2name
   1560     mode = "r:bz2"
   1561 class Bz2StreamReadTest(StreamReadTest):
   1562     tarname = bz2name
   1563     mode = "r|bz2"
class Bz2WriteTest(WriteTest):
    # WriteTest variant with the mode overridden to "w:bz2"
    # (presumably consumed by the inherited tests via self.mode).
    mode = "w:bz2"
class Bz2StreamWriteTest(StreamWriteTest):
    # StreamWriteTest variant with the mode overridden to "w|bz2"
    # (presumably consumed by the inherited tests via self.mode).
    mode = "w|bz2"
   1568 
   1569 class Bz2PartialReadTest(unittest.TestCase):
   1570     # Issue5068: The _BZ2Proxy.read() method loops forever
   1571     # on an empty or partial bzipped file.
   1572 
   1573     def _test_partial_input(self, mode):
   1574         class MyStringIO(StringIO.StringIO):
   1575             hit_eof = False
   1576             def read(self, n):
   1577                 if self.hit_eof:
   1578                     raise AssertionError("infinite loop detected in tarfile.open()")
   1579                 self.hit_eof = self.pos == self.len
   1580                 return StringIO.StringIO.read(self, n)
   1581             def seek(self, *args):
   1582                 self.hit_eof = False
   1583                 return StringIO.StringIO.seek(self, *args)
   1584 
   1585         data = bz2.compress(tarfile.TarInfo("foo").tobuf())
   1586         for x in range(len(data) + 1):
   1587             try:
   1588                 tarfile.open(fileobj=MyStringIO(data[:x]), mode=mode)
   1589             except tarfile.ReadError:
   1590                 pass # we have no interest in ReadErrors
   1591 
   1592     def test_partial_input(self):
   1593         self._test_partial_input("r")
   1594 
   1595     def test_partial_input_bz2(self):
   1596         self._test_partial_input("r:bz2")
   1597 
   1598 
   1599 def test_main():
   1600     os.makedirs(TEMPDIR)
   1601 
   1602     tests = [
   1603         UstarReadTest,
   1604         MiscReadTest,
   1605         StreamReadTest,
   1606         DetectReadTest,
   1607         MemberReadTest,
   1608         GNUReadTest,
   1609         PaxReadTest,
   1610         WriteTest,
   1611         StreamWriteTest,
   1612         GNUWriteTest,
   1613         PaxWriteTest,
   1614         UstarUnicodeTest,
   1615         GNUUnicodeTest,
   1616         PaxUnicodeTest,
   1617         AppendTest,
   1618         LimitsTest,
   1619         ContextManagerTest,
   1620     ]
   1621 
   1622     if hasattr(os, "link"):
   1623         tests.append(HardlinkTest)
   1624     else:
   1625         tests.append(LinkEmulationTest)
   1626 
   1627     fobj = open(tarname, "rb")
   1628     data = fobj.read()
   1629     fobj.close()
   1630 
   1631     if gzip:
   1632         # Create testtar.tar.gz and add gzip-specific tests.
   1633         tar = gzip.open(gzipname, "wb")
   1634         tar.write(data)
   1635         tar.close()
   1636 
   1637         tests += [
   1638             GzipMiscReadTest,
   1639             GzipUstarReadTest,
   1640             GzipStreamReadTest,
   1641             GzipWriteTest,
   1642             GzipStreamWriteTest,
   1643         ]
   1644 
   1645     if bz2:
   1646         # Create testtar.tar.bz2 and add bz2-specific tests.
   1647         tar = bz2.BZ2File(bz2name, "wb")
   1648         tar.write(data)
   1649         tar.close()
   1650 
   1651         tests += [
   1652             Bz2MiscReadTest,
   1653             Bz2UstarReadTest,
   1654             Bz2StreamReadTest,
   1655             Bz2WriteTest,
   1656             Bz2StreamWriteTest,
   1657             Bz2PartialReadTest,
   1658         ]
   1659 
   1660     try:
   1661         test_support.run_unittest(*tests)
   1662     finally:
   1663         if os.path.exists(TEMPDIR):
   1664             shutil.rmtree(TEMPDIR)
   1665 
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()
   1668