Home | History | Annotate | Download | only in test
      1 # -*- coding: iso-8859-15 -*-

      2 
      3 import sys
      4 import os
      5 import shutil
      6 import StringIO
      7 from hashlib import md5
      8 import errno
      9 
     10 import unittest
     11 import tarfile
     12 
     13 from test import test_support
     14 
     15 # Check for our compression modules.

     16 try:
     17     import gzip
     18     gzip.GzipFile
     19 except (ImportError, AttributeError):
     20     gzip = None
     21 try:
     22     import bz2
     23 except ImportError:
     24     bz2 = None
     25 
     26 def md5sum(data):
     27     return md5(data).hexdigest()
     28 
# Absolute path of the scratch directory that tests extract into and
# create temporary files in.
TEMPDIR = os.path.abspath(test_support.TESTFN)
# Reference archive read by the tests, located via test_support.findfile().
tarname = test_support.findfile("testtar.tar")
# Paths inside TEMPDIR: compressed archives and a scratch archive.
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
tmpname = os.path.join(TEMPDIR, "tmp.tar")

# Expected MD5 hex digests that tests compare extracted member data against
# (e.g. "ustar/regtype" and "ustar/sparse" respectively).
md5_regtype = "65f477c818ad9e15f7feab0c6d37742f"
md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6"
     37 
     38 
     39 class ReadTest(unittest.TestCase):
     40 
     41     tarname = tarname
     42     mode = "r:"
     43 
     44     def setUp(self):
     45         self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1")
     46 
     47     def tearDown(self):
     48         self.tar.close()
     49 
     50 
     51 class UstarReadTest(ReadTest):
     52 
     53     def test_fileobj_regular_file(self):
     54         tarinfo = self.tar.getmember("ustar/regtype")
     55         fobj = self.tar.extractfile(tarinfo)
     56         data = fobj.read()
     57         self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype),
     58                 "regular file extraction failed")
     59 
     60     def test_fileobj_readlines(self):
     61         self.tar.extract("ustar/regtype", TEMPDIR)
     62         tarinfo = self.tar.getmember("ustar/regtype")
     63         fobj1 = open(os.path.join(TEMPDIR, "ustar/regtype"), "rU")
     64         fobj2 = self.tar.extractfile(tarinfo)
     65 
     66         lines1 = fobj1.readlines()
     67         lines2 = fobj2.readlines()
     68         self.assertTrue(lines1 == lines2,
     69                 "fileobj.readlines() failed")
     70         self.assertTrue(len(lines2) == 114,
     71                 "fileobj.readlines() failed")
     72         self.assertTrue(lines2[83] ==
     73                 "I will gladly admit that Python is not the fastest running scripting language.\n",
     74                 "fileobj.readlines() failed")
     75 
     76     def test_fileobj_iter(self):
     77         self.tar.extract("ustar/regtype", TEMPDIR)
     78         tarinfo = self.tar.getmember("ustar/regtype")
     79         fobj1 = open(os.path.join(TEMPDIR, "ustar/regtype"), "rU")
     80         fobj2 = self.tar.extractfile(tarinfo)
     81         lines1 = fobj1.readlines()
     82         lines2 = [line for line in fobj2]
     83         self.assertTrue(lines1 == lines2,
     84                      "fileobj.__iter__() failed")
     85 
     86     def test_fileobj_seek(self):
     87         self.tar.extract("ustar/regtype", TEMPDIR)
     88         fobj = open(os.path.join(TEMPDIR, "ustar/regtype"), "rb")
     89         data = fobj.read()
     90         fobj.close()
     91 
     92         tarinfo = self.tar.getmember("ustar/regtype")
     93         fobj = self.tar.extractfile(tarinfo)
     94 
     95         text = fobj.read()
     96         fobj.seek(0)
     97         self.assertTrue(0 == fobj.tell(),
     98                      "seek() to file's start failed")
     99         fobj.seek(2048, 0)
    100         self.assertTrue(2048 == fobj.tell(),
    101                      "seek() to absolute position failed")
    102         fobj.seek(-1024, 1)
    103         self.assertTrue(1024 == fobj.tell(),
    104                      "seek() to negative relative position failed")
    105         fobj.seek(1024, 1)
    106         self.assertTrue(2048 == fobj.tell(),
    107                      "seek() to positive relative position failed")
    108         s = fobj.read(10)
    109         self.assertTrue(s == data[2048:2058],
    110                      "read() after seek failed")
    111         fobj.seek(0, 2)
    112         self.assertTrue(tarinfo.size == fobj.tell(),
    113                      "seek() to file's end failed")
    114         self.assertTrue(fobj.read() == "",
    115                      "read() at file's end did not return empty string")
    116         fobj.seek(-tarinfo.size, 2)
    117         self.assertTrue(0 == fobj.tell(),
    118                      "relative seek() to file's start failed")
    119         fobj.seek(512)
    120         s1 = fobj.readlines()
    121         fobj.seek(512)
    122         s2 = fobj.readlines()
    123         self.assertTrue(s1 == s2,
    124                      "readlines() after seek failed")
    125         fobj.seek(0)
    126         self.assertTrue(len(fobj.readline()) == fobj.tell(),
    127                      "tell() after readline() failed")
    128         fobj.seek(512)
    129         self.assertTrue(len(fobj.readline()) + 512 == fobj.tell(),
    130                      "tell() after seek() and readline() failed")
    131         fobj.seek(0)
    132         line = fobj.readline()
    133         self.assertTrue(fobj.read() == data[len(line):],
    134                      "read() after readline() failed")
    135         fobj.close()
    136 
    137     # Test if symbolic and hard links are resolved by extractfile().  The

    138     # test link members each point to a regular member whose data is

    139     # supposed to be exported.

    140     def _test_fileobj_link(self, lnktype, regtype):
    141         a = self.tar.extractfile(lnktype)
    142         b = self.tar.extractfile(regtype)
    143         self.assertEqual(a.name, b.name)
    144 
    145     def test_fileobj_link1(self):
    146         self._test_fileobj_link("ustar/lnktype", "ustar/regtype")
    147 
    148     def test_fileobj_link2(self):
    149         self._test_fileobj_link("./ustar/linktest2/lnktype", "ustar/linktest1/regtype")
    150 
    151     def test_fileobj_symlink1(self):
    152         self._test_fileobj_link("ustar/symtype", "ustar/regtype")
    153 
    154     def test_fileobj_symlink2(self):
    155         self._test_fileobj_link("./ustar/linktest2/symtype", "ustar/linktest1/regtype")
    156 
    157 
    158 class CommonReadTest(ReadTest):
    159 
    160     def test_empty_tarfile(self):
    161         # Test for issue6123: Allow opening empty archives.

    162         # This test checks if tarfile.open() is able to open an empty tar

    163         # archive successfully. Note that an empty tar archive is not the

    164         # same as an empty file!

    165         tarfile.open(tmpname, self.mode.replace("r", "w")).close()
    166         try:
    167             tar = tarfile.open(tmpname, self.mode)
    168             tar.getnames()
    169         except tarfile.ReadError:
    170             self.fail("tarfile.open() failed on empty archive")
    171         self.assertListEqual(tar.getmembers(), [])
    172 
    173     def test_null_tarfile(self):
    174         # Test for issue6123: Allow opening empty archives.

    175         # This test guarantees that tarfile.open() does not treat an empty

    176         # file as an empty tar archive.

    177         open(tmpname, "wb").close()
    178         self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
    179         self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)
    180 
    181     def test_ignore_zeros(self):
    182         # Test TarFile's ignore_zeros option.

    183         if self.mode.endswith(":gz"):
    184             _open = gzip.GzipFile
    185         elif self.mode.endswith(":bz2"):
    186             _open = bz2.BZ2File
    187         else:
    188             _open = open
    189 
    190         for char in ('\0', 'a'):
    191             # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')

    192             # are ignored correctly.

    193             fobj = _open(tmpname, "wb")
    194             fobj.write(char * 1024)
    195             fobj.write(tarfile.TarInfo("foo").tobuf())
    196             fobj.close()
    197 
    198             tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
    199             self.assertListEqual(tar.getnames(), ["foo"],
    200                     "ignore_zeros=True should have skipped the %r-blocks" % char)
    201             tar.close()
    202 
    203 
    204 class MiscReadTest(CommonReadTest):
    205 
    206     def test_no_name_argument(self):
    207         fobj = open(self.tarname, "rb")
    208         tar = tarfile.open(fileobj=fobj, mode=self.mode)
    209         self.assertEqual(tar.name, os.path.abspath(fobj.name))
    210 
    211     def test_no_name_attribute(self):
    212         data = open(self.tarname, "rb").read()
    213         fobj = StringIO.StringIO(data)
    214         self.assertRaises(AttributeError, getattr, fobj, "name")
    215         tar = tarfile.open(fileobj=fobj, mode=self.mode)
    216         self.assertEqual(tar.name, None)
    217 
    218     def test_empty_name_attribute(self):
    219         data = open(self.tarname, "rb").read()
    220         fobj = StringIO.StringIO(data)
    221         fobj.name = ""
    222         tar = tarfile.open(fileobj=fobj, mode=self.mode)
    223         self.assertEqual(tar.name, None)
    224 
    225     def test_fileobj_with_offset(self):
    226         # Skip the first member and store values from the second member

    227         # of the testtar.

    228         tar = tarfile.open(self.tarname, mode=self.mode)
    229         tar.next()
    230         t = tar.next()
    231         name = t.name
    232         offset = t.offset
    233         data = tar.extractfile(t).read()
    234         tar.close()
    235 
    236         # Open the testtar and seek to the offset of the second member.

    237         if self.mode.endswith(":gz"):
    238             _open = gzip.GzipFile
    239         elif self.mode.endswith(":bz2"):
    240             _open = bz2.BZ2File
    241         else:
    242             _open = open
    243         fobj = _open(self.tarname, "rb")
    244         fobj.seek(offset)
    245 
    246         # Test if the tarfile starts with the second member.

    247         tar = tar.open(self.tarname, mode="r:", fileobj=fobj)
    248         t = tar.next()
    249         self.assertEqual(t.name, name)
    250         # Read to the end of fileobj and test if seeking back to the

    251         # beginning works.

    252         tar.getmembers()
    253         self.assertEqual(tar.extractfile(t).read(), data,
    254                 "seek back did not work")
    255         tar.close()
    256 
    257     def test_fail_comp(self):
    258         # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.

    259         if self.mode == "r:":
    260             return
    261         self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
    262         fobj = open(tarname, "rb")
    263         self.assertRaises(tarfile.ReadError, tarfile.open, fileobj=fobj, mode=self.mode)
    264 
    265     def test_v7_dirtype(self):
    266         # Test old style dirtype member (bug #1336623):

    267         # Old V7 tars create directory members using an AREGTYPE

    268         # header with a "/" appended to the filename field.

    269         tarinfo = self.tar.getmember("misc/dirtype-old-v7")
    270         self.assertTrue(tarinfo.type == tarfile.DIRTYPE,
    271                 "v7 dirtype failed")
    272 
    273     def test_xstar_type(self):
    274         # The xstar format stores extra atime and ctime fields inside the

    275         # space reserved for the prefix field. The prefix field must be

    276         # ignored in this case, otherwise it will mess up the name.

    277         try:
    278             self.tar.getmember("misc/regtype-xstar")
    279         except KeyError:
    280             self.fail("failed to find misc/regtype-xstar (mangled prefix?)")
    281 
    282     def test_check_members(self):
    283         for tarinfo in self.tar:
    284             self.assertTrue(int(tarinfo.mtime) == 07606136617,
    285                     "wrong mtime for %s" % tarinfo.name)
    286             if not tarinfo.name.startswith("ustar/"):
    287                 continue
    288             self.assertTrue(tarinfo.uname == "tarfile",
    289                     "wrong uname for %s" % tarinfo.name)
    290 
    291     def test_find_members(self):
    292         self.assertTrue(self.tar.getmembers()[-1].name == "misc/eof",
    293                 "could not find all members")
    294 
    295     def test_extract_hardlink(self):
    296         # Test hardlink extraction (e.g. bug #857297).

    297         tar = tarfile.open(tarname, errorlevel=1, encoding="iso8859-1")
    298 
    299         tar.extract("ustar/regtype", TEMPDIR)
    300         try:
    301             tar.extract("ustar/lnktype", TEMPDIR)
    302         except EnvironmentError, e:
    303             if e.errno == errno.ENOENT:
    304                 self.fail("hardlink not extracted properly")
    305 
    306         data = open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb").read()
    307         self.assertEqual(md5sum(data), md5_regtype)
    308 
    309         try:
    310             tar.extract("ustar/symtype", TEMPDIR)
    311         except EnvironmentError, e:
    312             if e.errno == errno.ENOENT:
    313                 self.fail("symlink not extracted properly")
    314 
    315         data = open(os.path.join(TEMPDIR, "ustar/symtype"), "rb").read()
    316         self.assertEqual(md5sum(data), md5_regtype)
    317 
    318     def test_extractall(self):
    319         # Test if extractall() correctly restores directory permissions

    320         # and times (see issue1735).

    321         tar = tarfile.open(tarname, encoding="iso8859-1")
    322         directories = [t for t in tar if t.isdir()]
    323         tar.extractall(TEMPDIR, directories)
    324         for tarinfo in directories:
    325             path = os.path.join(TEMPDIR, tarinfo.name)
    326             if sys.platform != "win32":
    327                 # Win32 has no support for fine grained permissions.

    328                 self.assertEqual(tarinfo.mode & 0777, os.stat(path).st_mode & 0777)
    329             self.assertEqual(tarinfo.mtime, os.path.getmtime(path))
    330         tar.close()
    331 
    332     def test_init_close_fobj(self):
    333         # Issue #7341: Close the internal file object in the TarFile

    334         # constructor in case of an error. For the test we rely on

    335         # the fact that opening an empty file raises a ReadError.

    336         empty = os.path.join(TEMPDIR, "empty")
    337         open(empty, "wb").write("")
    338 
    339         try:
    340             tar = object.__new__(tarfile.TarFile)
    341             try:
    342                 tar.__init__(empty)
    343             except tarfile.ReadError:
    344                 self.assertTrue(tar.fileobj.closed)
    345             else:
    346                 self.fail("ReadError not raised")
    347         finally:
    348             os.remove(empty)
    349 
    350 
    351 class StreamReadTest(CommonReadTest):
    352 
    353     mode="r|"
    354 
    355     def test_fileobj_regular_file(self):
    356         tarinfo = self.tar.next() # get "regtype" (can't use getmember)

    357         fobj = self.tar.extractfile(tarinfo)
    358         data = fobj.read()
    359         self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype),
    360                 "regular file extraction failed")
    361 
    362     def test_provoke_stream_error(self):
    363         tarinfos = self.tar.getmembers()
    364         f = self.tar.extractfile(tarinfos[0]) # read the first member

    365         self.assertRaises(tarfile.StreamError, f.read)
    366 
    367     def test_compare_members(self):
    368         tar1 = tarfile.open(tarname, encoding="iso8859-1")
    369         tar2 = self.tar
    370 
    371         while True:
    372             t1 = tar1.next()
    373             t2 = tar2.next()
    374             if t1 is None:
    375                 break
    376             self.assertTrue(t2 is not None, "stream.next() failed.")
    377 
    378             if t2.islnk() or t2.issym():
    379                 self.assertRaises(tarfile.StreamError, tar2.extractfile, t2)
    380                 continue
    381 
    382             v1 = tar1.extractfile(t1)
    383             v2 = tar2.extractfile(t2)
    384             if v1 is None:
    385                 continue
    386             self.assertTrue(v2 is not None, "stream.extractfile() failed")
    387             self.assertTrue(v1.read() == v2.read(), "stream extraction failed")
    388 
    389         tar1.close()
    390 
    391 
    392 class DetectReadTest(unittest.TestCase):
    393 
    394     def _testfunc_file(self, name, mode):
    395         try:
    396             tarfile.open(name, mode)
    397         except tarfile.ReadError:
    398             self.fail()
    399 
    400     def _testfunc_fileobj(self, name, mode):
    401         try:
    402             tarfile.open(name, mode, fileobj=open(name, "rb"))
    403         except tarfile.ReadError:
    404             self.fail()
    405 
    406     def _test_modes(self, testfunc):
    407         testfunc(tarname, "r")
    408         testfunc(tarname, "r:")
    409         testfunc(tarname, "r:*")
    410         testfunc(tarname, "r|")
    411         testfunc(tarname, "r|*")
    412 
    413         if gzip:
    414             self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:gz")
    415             self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|gz")
    416             self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r:")
    417             self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r|")
    418 
    419             testfunc(gzipname, "r")
    420             testfunc(gzipname, "r:*")
    421             testfunc(gzipname, "r:gz")
    422             testfunc(gzipname, "r|*")
    423             testfunc(gzipname, "r|gz")
    424 
    425         if bz2:
    426             self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:bz2")
    427             self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|bz2")
    428             self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r:")
    429             self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r|")
    430 
    431             testfunc(bz2name, "r")
    432             testfunc(bz2name, "r:*")
    433             testfunc(bz2name, "r:bz2")
    434             testfunc(bz2name, "r|*")
    435             testfunc(bz2name, "r|bz2")
    436 
    437     def test_detect_file(self):
    438         self._test_modes(self._testfunc_file)
    439 
    440     def test_detect_fileobj(self):
    441         self._test_modes(self._testfunc_fileobj)
    442 
    443 
    444 class MemberReadTest(ReadTest):
    445 
    446     def _test_member(self, tarinfo, chksum=None, **kwargs):
    447         if chksum is not None:
    448             self.assertTrue(md5sum(self.tar.extractfile(tarinfo).read()) == chksum,
    449                     "wrong md5sum for %s" % tarinfo.name)
    450 
    451         kwargs["mtime"] = 07606136617
    452         kwargs["uid"] = 1000
    453         kwargs["gid"] = 100
    454         if "old-v7" not in tarinfo.name:
    455             # V7 tar can't handle alphabetic owners.

    456             kwargs["uname"] = "tarfile"
    457             kwargs["gname"] = "tarfile"
    458         for k, v in kwargs.iteritems():
    459             self.assertTrue(getattr(tarinfo, k) == v,
    460                     "wrong value in %s field of %s" % (k, tarinfo.name))
    461 
    462     def test_find_regtype(self):
    463         tarinfo = self.tar.getmember("ustar/regtype")
    464         self._test_member(tarinfo, size=7011, chksum=md5_regtype)
    465 
    466     def test_find_conttype(self):
    467         tarinfo = self.tar.getmember("ustar/conttype")
    468         self._test_member(tarinfo, size=7011, chksum=md5_regtype)
    469 
    470     def test_find_dirtype(self):
    471         tarinfo = self.tar.getmember("ustar/dirtype")
    472         self._test_member(tarinfo, size=0)
    473 
    474     def test_find_dirtype_with_size(self):
    475         tarinfo = self.tar.getmember("ustar/dirtype-with-size")
    476         self._test_member(tarinfo, size=255)
    477 
    478     def test_find_lnktype(self):
    479         tarinfo = self.tar.getmember("ustar/lnktype")
    480         self._test_member(tarinfo, size=0, linkname="ustar/regtype")
    481 
    482     def test_find_symtype(self):
    483         tarinfo = self.tar.getmember("ustar/symtype")
    484         self._test_member(tarinfo, size=0, linkname="regtype")
    485 
    486     def test_find_blktype(self):
    487         tarinfo = self.tar.getmember("ustar/blktype")
    488         self._test_member(tarinfo, size=0, devmajor=3, devminor=0)
    489 
    490     def test_find_chrtype(self):
    491         tarinfo = self.tar.getmember("ustar/chrtype")
    492         self._test_member(tarinfo, size=0, devmajor=1, devminor=3)
    493 
    494     def test_find_fifotype(self):
    495         tarinfo = self.tar.getmember("ustar/fifotype")
    496         self._test_member(tarinfo, size=0)
    497 
    498     def test_find_sparse(self):
    499         tarinfo = self.tar.getmember("ustar/sparse")
    500         self._test_member(tarinfo, size=86016, chksum=md5_sparse)
    501 
    502     def test_find_umlauts(self):
    503         tarinfo = self.tar.getmember("ustar/umlauts-")
    504         self._test_member(tarinfo, size=7011, chksum=md5_regtype)
    505 
    506     def test_find_ustar_longname(self):
    507         name = "ustar/" + "12345/" * 39 + "1234567/longname"
    508         self.assertIn(name, self.tar.getnames())
    509 
    510     def test_find_regtype_oldv7(self):
    511         tarinfo = self.tar.getmember("misc/regtype-old-v7")
    512         self._test_member(tarinfo, size=7011, chksum=md5_regtype)
    513 
    514     def test_find_pax_umlauts(self):
    515         self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1")
    516         tarinfo = self.tar.getmember("pax/umlauts-")
    517         self._test_member(tarinfo, size=7011, chksum=md5_regtype)
    518 
    519 
    520 class LongnameTest(ReadTest):
    521 
    522     def test_read_longname(self):
    523         # Test reading of longname (bug #1471427).

    524         longname = self.subdir + "/" + "123/" * 125 + "longname"
    525         try:
    526             tarinfo = self.tar.getmember(longname)
    527         except KeyError:
    528             self.fail("longname not found")
    529         self.assertTrue(tarinfo.type != tarfile.DIRTYPE, "read longname as dirtype")
    530 
    531     def test_read_longlink(self):
    532         longname = self.subdir + "/" + "123/" * 125 + "longname"
    533         longlink = self.subdir + "/" + "123/" * 125 + "longlink"
    534         try:
    535             tarinfo = self.tar.getmember(longlink)
    536         except KeyError:
    537             self.fail("longlink not found")
    538         self.assertTrue(tarinfo.linkname == longname, "linkname wrong")
    539 
    540     def test_truncated_longname(self):
    541         longname = self.subdir + "/" + "123/" * 125 + "longname"
    542         tarinfo = self.tar.getmember(longname)
    543         offset = tarinfo.offset
    544         self.tar.fileobj.seek(offset)
    545         fobj = StringIO.StringIO(self.tar.fileobj.read(3 * 512))
    546         self.assertRaises(tarfile.ReadError, tarfile.open, name="foo.tar", fileobj=fobj)
    547 
    548     def test_header_offset(self):
    549         # Test if the start offset of the TarInfo object includes

    550         # the preceding extended header.

    551         longname = self.subdir + "/" + "123/" * 125 + "longname"
    552         offset = self.tar.getmember(longname).offset
    553         fobj = open(tarname)
    554         fobj.seek(offset)
    555         tarinfo = tarfile.TarInfo.frombuf(fobj.read(512))
    556         self.assertEqual(tarinfo.type, self.longnametype)
    557 
    558 
    559 class GNUReadTest(LongnameTest):
    560 
    561     subdir = "gnu"
    562     longnametype = tarfile.GNUTYPE_LONGNAME
    563 
    564     def test_sparse_file(self):
    565         tarinfo1 = self.tar.getmember("ustar/sparse")
    566         fobj1 = self.tar.extractfile(tarinfo1)
    567         tarinfo2 = self.tar.getmember("gnu/sparse")
    568         fobj2 = self.tar.extractfile(tarinfo2)
    569         self.assertTrue(fobj1.read() == fobj2.read(),
    570                 "sparse file extraction failed")
    571 
    572 
    573 class PaxReadTest(LongnameTest):
    574 
    575     subdir = "pax"
    576     longnametype = tarfile.XHDTYPE
    577 
    578     def test_pax_global_headers(self):
    579         tar = tarfile.open(tarname, encoding="iso8859-1")
    580 
    581         tarinfo = tar.getmember("pax/regtype1")
    582         self.assertEqual(tarinfo.uname, "foo")
    583         self.assertEqual(tarinfo.gname, "bar")
    584         self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"")
    585 
    586         tarinfo = tar.getmember("pax/regtype2")
    587         self.assertEqual(tarinfo.uname, "")
    588         self.assertEqual(tarinfo.gname, "bar")
    589         self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"")
    590 
    591         tarinfo = tar.getmember("pax/regtype3")
    592         self.assertEqual(tarinfo.uname, "tarfile")
    593         self.assertEqual(tarinfo.gname, "tarfile")
    594         self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"")
    595 
    596     def test_pax_number_fields(self):
    597         # All following number fields are read from the pax header.

    598         tar = tarfile.open(tarname, encoding="iso8859-1")
    599         tarinfo = tar.getmember("pax/regtype4")
    600         self.assertEqual(tarinfo.size, 7011)
    601         self.assertEqual(tarinfo.uid, 123)
    602         self.assertEqual(tarinfo.gid, 123)
    603         self.assertEqual(tarinfo.mtime, 1041808783.0)
    604         self.assertEqual(type(tarinfo.mtime), float)
    605         self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0)
    606         self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0)
    607 
    608 
    609 class WriteTestBase(unittest.TestCase):
    610     # Put all write tests in here that are supposed to be tested

    611     # in all possible mode combinations.

    612 
    613     def test_fileobj_no_close(self):
    614         fobj = StringIO.StringIO()
    615         tar = tarfile.open(fileobj=fobj, mode=self.mode)
    616         tar.addfile(tarfile.TarInfo("foo"))
    617         tar.close()
    618         self.assertTrue(fobj.closed is False, "external fileobjs must never closed")
    619 
    620 
    621 class WriteTest(WriteTestBase):
    622 
    623     mode = "w:"
    624 
    def test_100_char_name(self):
        # The name field in a tar header stores strings of at most 100 chars.
        # If a string is shorter than 100 chars it has to be padded with '\0',
        # which implies that a string of exactly 100 chars is stored without
        # a trailing '\0'.
        name = "0123456789" * 10
        tar = tarfile.open(tmpname, self.mode)
        try:
            t = tarfile.TarInfo(name)
            tar.addfile(t)
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            self.assertEqual(tar.getnames()[0], name,
                    "failed to store 100 char filename")
        finally:
            # Close the archive even when the assertion fails.
            tar.close()
    640 
    def test_tar_size(self):
        # Test for bug #1013882.
        tar = tarfile.open(tmpname, self.mode)
        try:
            path = os.path.join(TEMPDIR, "file")
            with open(path, "wb") as fobj:
                fobj.write("aaa")
            tar.add(path)
        finally:
            # Close (and thereby flush) the archive before measuring it.
            tar.close()
        self.assertGreater(os.path.getsize(tmpname), 0,
                "tarfile is empty")
    652 
    653     # The test_*_size tests test for bug #1167128.

    def test_file_size(self):
        tar = tarfile.open(tmpname, self.mode)
        try:
            path = os.path.join(TEMPDIR, "file")

            # An empty file must be reported with size 0 ...
            open(path, "wb").close()
            tarinfo = tar.gettarinfo(path)
            self.assertEqual(tarinfo.size, 0)

            # ... and after writing three bytes, with size 3.
            with open(path, "wb") as fobj:
                fobj.write("aaa")
            tarinfo = tar.gettarinfo(path)
            self.assertEqual(tarinfo.size, 3)
        finally:
            tar.close()
    670 
    def test_directory_size(self):
        path = os.path.join(TEMPDIR, "directory")
        os.mkdir(path)
        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 0)
            finally:
                # Release the archive handle before cleaning up.
                tar.close()
        finally:
            os.rmdir(path)
    680 
    def test_link_size(self):
        # Nothing to check on platforms without os.link().
        if not hasattr(os, "link"):
            return

        link = os.path.join(TEMPDIR, "link")
        target = os.path.join(TEMPDIR, "link_target")
        with open(target, "wb") as fobj:
            fobj.write("aaa")
        os.link(target, link)
        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                # Record the link target in the inodes list.
                tar.gettarinfo(target)
                tarinfo = tar.gettarinfo(link)
                self.assertEqual(tarinfo.size, 0)
            finally:
                tar.close()
        finally:
            os.remove(target)
            os.remove(link)
    698 
    def test_symlink_size(self):
        # Nothing to check on platforms without os.symlink().
        if not hasattr(os, "symlink"):
            return

        path = os.path.join(TEMPDIR, "symlink")
        os.symlink("link_target", path)
        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 0)
            finally:
                tar.close()
        finally:
            os.remove(path)
    709 
    def test_add_self(self):
        # Test for #1257255.
        dstname = os.path.abspath(tmpname)

        tar = tarfile.open(tmpname, self.mode)
        try:
            self.assertEqual(tar.name, dstname, "archive name must be absolute")

            tar.add(dstname)
            self.assertEqual(tar.getnames(), [], "added the archive to itself")

            cwd = os.getcwd()
            os.chdir(TEMPDIR)
            try:
                tar.add(dstname)
            finally:
                # Always restore the working directory so a failure here
                # cannot affect the tests that run afterwards.
                os.chdir(cwd)
            self.assertEqual(tar.getnames(), [], "added the archive to itself")
        finally:
            tar.close()
    725 
    def test_exclude(self):
        # Adding a directory with exclude=os.path.isfile must store only
        # the directory entry itself.
        tempdir = os.path.join(TEMPDIR, "exclude")
        os.mkdir(tempdir)
        try:
            for name in ("foo", "bar", "baz"):
                name = os.path.join(tempdir, name)
                open(name, "wb").close()

            exclude = os.path.isfile

            tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
            try:
                # The exclude argument is expected to emit a
                # DeprecationWarning pointing at the filter argument.
                with test_support.check_warnings(("use the filter argument",
                                                  DeprecationWarning)):
                    tar.add(tempdir, arcname="empty_dir", exclude=exclude)
            finally:
                tar.close()

            tar = tarfile.open(tmpname, "r")
            try:
                self.assertEqual(len(tar.getmembers()), 1)
                self.assertEqual(tar.getnames()[0], "empty_dir")
            finally:
                # The reader was previously left open.
                tar.close()
        finally:
            shutil.rmtree(tempdir)
    747 
    def test_filter(self):
        # The filter callback may drop a member (by returning None) or
        # modify the TarInfo that gets stored.
        tempdir = os.path.join(TEMPDIR, "filter")
        os.mkdir(tempdir)
        try:
            for name in ("foo", "bar", "baz"):
                name = os.path.join(tempdir, name)
                open(name, "wb").close()

            # Named so that it does not shadow the builtin filter().
            def member_filter(tarinfo):
                if os.path.basename(tarinfo.name) == "bar":
                    return
                tarinfo.uid = 123
                tarinfo.uname = "foo"
                return tarinfo

            tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
            try:
                tar.add(tempdir, arcname="empty_dir", filter=member_filter)
            finally:
                tar.close()

            tar = tarfile.open(tmpname, "r")
            try:
                for tarinfo in tar:
                    self.assertEqual(tarinfo.uid, 123)
                    self.assertEqual(tarinfo.uname, "foo")
                # Directory entry plus "foo" and "baz"; "bar" was dropped.
                self.assertEqual(len(tar.getmembers()), 3)
            finally:
                tar.close()
        finally:
            shutil.rmtree(tempdir)
    775 
    # Guarantee that stored pathnames are not modified. Don't
    # remove ./ or ../ or double slashes. Still make absolute
    # pathnames relative.
    # For details see bug #6054.
    def _test_pathname(self, path, cmp_path=None, dir=False):
        # Create a tarfile with an empty member named path
        # and compare the stored name with the original.
        #
        # path     -- arcname to store the member under
        # cmp_path -- expected stored name; defaults to path with os.sep
        #             replaced by "/"
        # dir      -- store an empty directory instead of an empty file
        foo = os.path.join(TEMPDIR, "foo")
        if not dir:
            open(foo, "w").close()
        else:
            os.mkdir(foo)

        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                tar.add(foo, arcname=path)
            finally:
                tar.close()

            tar = tarfile.open(tmpname, "r")
            try:
                t = tar.next()
            finally:
                tar.close()
        finally:
            # Always remove the scratch entry so a failure here cannot
            # poison later calls that recreate "foo".
            if not dir:
                os.remove(foo)
            else:
                os.rmdir(foo)

        self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))
    803 
    def test_pathnames(self):
        # Relative names containing "." and ".." components must be
        # stored unchanged.  The sequence (including the repeated entry)
        # mirrors the original list of checks one for one.
        component_lists = (
            ("foo",),
            ("foo", ".", "bar"),
            ("foo", "..", "bar"),
            (".", "foo"),
            (".", "foo", "."),
            (".", "foo", ".", "bar"),
            (".", "foo", "..", "bar"),
            (".", "foo", "..", "bar"),
            ("..", "foo"),
            ("..", "foo", ".."),
            ("..", "foo", ".", "bar"),
            ("..", "foo", "..", "bar"),
        )
        for components in component_lists:
            self._test_pathname(os.path.join(*components))

        # Doubled separators: kept inside a name, and a directory named
        # with trailing doubled separators is expected back as "foo".
        doubled = os.sep + os.sep
        self._test_pathname("foo" + doubled + "bar")
        self._test_pathname("foo" + doubled, "foo", dir=True)
    820 
    def test_abs_pathnames(self):
        # Absolute names are expected to be stored as the relative "foo".
        if sys.platform != "win32":
            for absolute in ("/foo", "///foo"):
                self._test_pathname(absolute, "foo")
        else:
            self._test_pathname("C:\\foo", "foo")
    827 
    def test_cwd(self):
        # Test adding the current working directory: every stored name
        # must be "." or start with "./".
        cwd = os.getcwd()
        os.chdir(TEMPDIR)
        try:
            open("foo", "w").close()

            tar = tarfile.open(tmpname, self.mode)
            try:
                tar.add(".")
            finally:
                tar.close()

            tar = tarfile.open(tmpname, "r")
            try:
                for t in tar:
                    # assertTrue replaces the deprecated assert_ alias.
                    self.assertTrue(t.name == "." or t.name.startswith("./"))
            finally:
                tar.close()
        finally:
            os.chdir(cwd)
    845 
    @unittest.skipUnless(hasattr(os, 'symlink'), "needs os.symlink")
    def test_extractall_symlinks(self):
        # extractall() must work when the archive holds a symlink and is
        # unpacked over the directory that already contains that symlink.
        workdir = os.path.join(TEMPDIR, "testsymlinks")
        archive = os.path.join(TEMPDIR, "testsymlinks.tar")
        os.mkdir(workdir)
        try:
            regular = os.path.join(workdir, 'source')
            symlink = os.path.join(workdir, 'symlink')
            with open(regular, 'w') as out:
                out.write('something\n')
            os.symlink(regular, symlink)

            # Store both the file and the symlink under their base names.
            writer = tarfile.open(archive, 'w')
            for member in (regular, symlink):
                writer.add(member, arcname=os.path.basename(member))
            writer.close()

            # Extract into the location which still contains the symlink;
            # this should not raise OSError: [Errno 17] File exists.
            reader = tarfile.open(archive, 'r')
            try:
                reader.extractall(path=workdir)
            except OSError:
                self.fail("extractall failed with symlinked files")
            finally:
                reader.close()
        finally:
            os.unlink(archive)
            shutil.rmtree(workdir)
    874 
    @unittest.skipUnless(hasattr(os, 'symlink'), "needs os.symlink")
    def test_extractall_broken_symlinks(self):
        # extractall() must work when the archive holds a symlink whose
        # target no longer exists at extraction time.
        workdir = os.path.join(TEMPDIR, "testsymlinks")
        archive = os.path.join(TEMPDIR, "testsymlinks.tar")
        os.mkdir(workdir)
        try:
            regular = os.path.join(workdir, 'source')
            symlink = os.path.join(workdir, 'symlink')
            with open(regular, 'w') as out:
                out.write('something\n')
            os.symlink(regular, symlink)

            # Only the symlink goes into the archive.
            writer = tarfile.open(archive, 'w')
            writer.add(symlink, arcname=os.path.basename(symlink))
            writer.close()

            # Remove the real file so the symlink on disk is dangling.
            os.unlink(regular)

            # Extract into the location which still contains the symlink;
            # this should not raise OSError: [Errno 17] File exists.
            reader = tarfile.open(archive, 'r')
            try:
                reader.extractall(path=workdir)
            except OSError:
                self.fail("extractall failed with broken symlinked files")
            finally:
                reader.close()
        finally:
            os.unlink(archive)
            shutil.rmtree(workdir)
    905 
    @unittest.skipUnless(hasattr(os, 'link'), "needs os.link")
    def test_extractall_hardlinks(self):
        # extractall() must work when the archive holds a hard link and
        # is unpacked over the directory that already contains it.
        workdir = os.path.join(TEMPDIR, "testsymlinks")
        archive = os.path.join(TEMPDIR, "testsymlinks.tar")
        os.mkdir(workdir)
        try:
            regular = os.path.join(workdir, 'source')
            # The second name is still called 'symlink' on disk, but it
            # is created with os.link(), i.e. it is a hard link.
            hardlink = os.path.join(workdir, 'symlink')
            with open(regular, 'w') as out:
                out.write('something\n')
            os.link(regular, hardlink)

            # Store both names under their base names.
            writer = tarfile.open(archive, 'w')
            for member in (regular, hardlink):
                writer.add(member, arcname=os.path.basename(member))
            writer.close()

            # Extract into the location which still contains the link;
            # this should not raise OSError: [Errno 17] File exists.
            reader = tarfile.open(archive, 'r')
            try:
                reader.extractall(path=workdir)
            except OSError:
                self.fail("extractall failed with linked files")
            finally:
                reader.close()
        finally:
            os.unlink(archive)
            shutil.rmtree(workdir)
    934 
    935 class StreamWriteTest(WriteTestBase):
    936 
    937     mode = "w|"
    938 
    939     def test_stream_padding(self):
    940         # Test for bug #1543303.

    941         tar = tarfile.open(tmpname, self.mode)
    942         tar.close()
    943 
    944         if self.mode.endswith("gz"):
    945             fobj = gzip.GzipFile(tmpname)
    946             data = fobj.read()
    947             fobj.close()
    948         elif self.mode.endswith("bz2"):
    949             dec = bz2.BZ2Decompressor()
    950             data = open(tmpname, "rb").read()
    951             data = dec.decompress(data)
    952             self.assertTrue(len(dec.unused_data) == 0,
    953                     "found trailing data")
    954         else:
    955             fobj = open(tmpname, "rb")
    956             data = fobj.read()
    957             fobj.close()
    958 
    959         self.assertTrue(data.count("\0") == tarfile.RECORDSIZE,
    960                          "incorrect zero padding")
    961 
    962     def test_file_mode(self):
    963         # Test for issue #8464: Create files with correct

    964         # permissions.

    965         if sys.platform == "win32" or not hasattr(os, "umask"):
    966             return
    967 
    968         if os.path.exists(tmpname):
    969             os.remove(tmpname)
    970 
    971         original_umask = os.umask(0022)
    972         try:
    973             tar = tarfile.open(tmpname, self.mode)
    974             tar.close()
    975             mode = os.stat(tmpname).st_mode & 0777
    976             self.assertEqual(mode, 0644, "wrong file permissions")
    977         finally:
    978             os.umask(original_umask)
    979 
    980 
    981 class GNUWriteTest(unittest.TestCase):
    982     # This testcase checks for correct creation of GNU Longname

    983     # and Longlink extended headers (cp. bug #812325).

    984 
    985     def _length(self, s):
    986         blocks, remainder = divmod(len(s) + 1, 512)
    987         if remainder:
    988             blocks += 1
    989         return blocks * 512
    990 
    991     def _calc_size(self, name, link=None):
    992         # Initial tar header

    993         count = 512
    994 
    995         if len(name) > tarfile.LENGTH_NAME:
    996             # GNU longname extended header + longname

    997             count += 512
    998             count += self._length(name)
    999         if link is not None and len(link) > tarfile.LENGTH_LINK:
   1000             # GNU longlink extended header + longlink

   1001             count += 512
   1002             count += self._length(link)
   1003         return count
   1004 
   1005     def _test(self, name, link=None):
   1006         tarinfo = tarfile.TarInfo(name)
   1007         if link:
   1008             tarinfo.linkname = link
   1009             tarinfo.type = tarfile.LNKTYPE
   1010 
   1011         tar = tarfile.open(tmpname, "w")
   1012         tar.format = tarfile.GNU_FORMAT
   1013         tar.addfile(tarinfo)
   1014 
   1015         v1 = self._calc_size(name, link)
   1016         v2 = tar.offset
   1017         self.assertTrue(v1 == v2, "GNU longname/longlink creation failed")
   1018 
   1019         tar.close()
   1020 
   1021         tar = tarfile.open(tmpname)
   1022         member = tar.next()
   1023         self.assertIsNotNone(member,
   1024                 "unable to read longname member")
   1025         self.assertEqual(tarinfo.name, member.name,
   1026                 "unable to read longname member")
   1027         self.assertEqual(tarinfo.linkname, member.linkname,
   1028                 "unable to read longname member")
   1029 
   1030     def test_longname_1023(self):
   1031         self._test(("longnam/" * 127) + "longnam")
   1032 
   1033     def test_longname_1024(self):
   1034         self._test(("longnam/" * 127) + "longname")
   1035 
   1036     def test_longname_1025(self):
   1037         self._test(("longnam/" * 127) + "longname_")
   1038 
   1039     def test_longlink_1023(self):
   1040         self._test("name", ("longlnk/" * 127) + "longlnk")
   1041 
   1042     def test_longlink_1024(self):
   1043         self._test("name", ("longlnk/" * 127) + "longlink")
   1044 
   1045     def test_longlink_1025(self):
   1046         self._test("name", ("longlnk/" * 127) + "longlink_")
   1047 
   1048     def test_longnamelink_1023(self):
   1049         self._test(("longnam/" * 127) + "longnam",
   1050                    ("longlnk/" * 127) + "longlnk")
   1051 
   1052     def test_longnamelink_1024(self):
   1053         self._test(("longnam/" * 127) + "longname",
   1054                    ("longlnk/" * 127) + "longlink")
   1055 
   1056     def test_longnamelink_1025(self):
   1057         self._test(("longnam/" * 127) + "longname_",
   1058                    ("longlnk/" * 127) + "longlink_")
   1059 
   1060 
   1061 class HardlinkTest(unittest.TestCase):
   1062     # Test the creation of LNKTYPE (hardlink) members in an archive.

   1063 
   1064     def setUp(self):
   1065         self.foo = os.path.join(TEMPDIR, "foo")
   1066         self.bar = os.path.join(TEMPDIR, "bar")
   1067 
   1068         fobj = open(self.foo, "wb")
   1069         fobj.write("foo")
   1070         fobj.close()
   1071 
   1072         os.link(self.foo, self.bar)
   1073 
   1074         self.tar = tarfile.open(tmpname, "w")
   1075         self.tar.add(self.foo)
   1076 
   1077     def tearDown(self):
   1078         self.tar.close()
   1079         os.remove(self.foo)
   1080         os.remove(self.bar)
   1081 
   1082     def test_add_twice(self):
   1083         # The same name will be added as a REGTYPE every

   1084         # time regardless of st_nlink.

   1085         tarinfo = self.tar.gettarinfo(self.foo)
   1086         self.assertTrue(tarinfo.type == tarfile.REGTYPE,
   1087                 "add file as regular failed")
   1088 
   1089     def test_add_hardlink(self):
   1090         tarinfo = self.tar.gettarinfo(self.bar)
   1091         self.assertTrue(tarinfo.type == tarfile.LNKTYPE,
   1092                 "add file as hardlink failed")
   1093 
   1094     def test_dereference_hardlink(self):
   1095         self.tar.dereference = True
   1096         tarinfo = self.tar.gettarinfo(self.bar)
   1097         self.assertTrue(tarinfo.type == tarfile.REGTYPE,
   1098                 "dereferencing hardlink failed")
   1099 
   1100 
   1101 class PaxWriteTest(GNUWriteTest):
   1102 
   1103     def _test(self, name, link=None):
   1104         # See GNUWriteTest.

   1105         tarinfo = tarfile.TarInfo(name)
   1106         if link:
   1107             tarinfo.linkname = link
   1108             tarinfo.type = tarfile.LNKTYPE
   1109 
   1110         tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
   1111         tar.addfile(tarinfo)
   1112         tar.close()
   1113 
   1114         tar = tarfile.open(tmpname)
   1115         if link:
   1116             l = tar.getmembers()[0].linkname
   1117             self.assertTrue(link == l, "PAX longlink creation failed")
   1118         else:
   1119             n = tar.getmembers()[0].name
   1120             self.assertTrue(name == n, "PAX longname creation failed")
   1121 
   1122     def test_pax_global_header(self):
   1123         pax_headers = {
   1124                 u"foo": u"bar",
   1125                 u"uid": u"0",
   1126                 u"mtime": u"1.23",
   1127                 u"test": u"",
   1128                 u"": u"test"}
   1129 
   1130         tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
   1131                 pax_headers=pax_headers)
   1132         tar.addfile(tarfile.TarInfo("test"))
   1133         tar.close()
   1134 
   1135         # Test if the global header was written correctly.

   1136         tar = tarfile.open(tmpname, encoding="iso8859-1")
   1137         self.assertEqual(tar.pax_headers, pax_headers)
   1138         self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers)
   1139 
   1140         # Test if all the fields are unicode.

   1141         for key, val in tar.pax_headers.iteritems():
   1142             self.assertTrue(type(key) is unicode)
   1143             self.assertTrue(type(val) is unicode)
   1144             if key in tarfile.PAX_NUMBER_FIELDS:
   1145                 try:
   1146                     tarfile.PAX_NUMBER_FIELDS[key](val)
   1147                 except (TypeError, ValueError):
   1148                     self.fail("unable to convert pax header field")
   1149 
   1150     def test_pax_extended_header(self):
   1151         # The fields from the pax header have priority over the

   1152         # TarInfo.

   1153         pax_headers = {u"path": u"foo", u"uid": u"123"}
   1154 
   1155         tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1")
   1156         t = tarfile.TarInfo()
   1157         t.name = u""     # non-ASCII

   1158         t.uid = 8**8        # too large

   1159         t.pax_headers = pax_headers
   1160         tar.addfile(t)
   1161         tar.close()
   1162 
   1163         tar = tarfile.open(tmpname, encoding="iso8859-1")
   1164         t = tar.getmembers()[0]
   1165         self.assertEqual(t.pax_headers, pax_headers)
   1166         self.assertEqual(t.name, "foo")
   1167         self.assertEqual(t.uid, 123)
   1168 
   1169 
   1170 class UstarUnicodeTest(unittest.TestCase):
   1171     # All *UnicodeTests FIXME

   1172 
   1173     format = tarfile.USTAR_FORMAT
   1174 
   1175     def test_iso8859_1_filename(self):
   1176         self._test_unicode_filename("iso8859-1")
   1177 
   1178     def test_utf7_filename(self):
   1179         self._test_unicode_filename("utf7")
   1180 
   1181     def test_utf8_filename(self):
   1182         self._test_unicode_filename("utf8")
   1183 
   1184     def _test_unicode_filename(self, encoding):
   1185         tar = tarfile.open(tmpname, "w", format=self.format, encoding=encoding, errors="strict")
   1186         name = u""
   1187         tar.addfile(tarfile.TarInfo(name))
   1188         tar.close()
   1189 
   1190         tar = tarfile.open(tmpname, encoding=encoding)
   1191         self.assertTrue(type(tar.getnames()[0]) is not unicode)
   1192         self.assertEqual(tar.getmembers()[0].name, name.encode(encoding))
   1193         tar.close()
   1194 
   1195     def test_unicode_filename_error(self):
   1196         tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict")
   1197         tarinfo = tarfile.TarInfo()
   1198 
   1199         tarinfo.name = ""
   1200         if self.format == tarfile.PAX_FORMAT:
   1201             self.assertRaises(UnicodeError, tar.addfile, tarinfo)
   1202         else:
   1203             tar.addfile(tarinfo)
   1204 
   1205         tarinfo.name = u""
   1206         self.assertRaises(UnicodeError, tar.addfile, tarinfo)
   1207 
   1208         tarinfo.name = "foo"
   1209         tarinfo.uname = u""
   1210         self.assertRaises(UnicodeError, tar.addfile, tarinfo)
   1211 
   1212     def test_unicode_argument(self):
   1213         tar = tarfile.open(tarname, "r", encoding="iso8859-1", errors="strict")
   1214         for t in tar:
   1215             self.assertTrue(type(t.name) is str)
   1216             self.assertTrue(type(t.linkname) is str)
   1217             self.assertTrue(type(t.uname) is str)
   1218             self.assertTrue(type(t.gname) is str)
   1219         tar.close()
   1220 
   1221     def test_uname_unicode(self):
   1222         for name in (u"", ""):
   1223             t = tarfile.TarInfo("foo")
   1224             t.uname = name
   1225             t.gname = name
   1226 
   1227             fobj = StringIO.StringIO()
   1228             tar = tarfile.open("foo.tar", mode="w", fileobj=fobj, format=self.format, encoding="iso8859-1")
   1229             tar.addfile(t)
   1230             tar.close()
   1231             fobj.seek(0)
   1232 
   1233             tar = tarfile.open("foo.tar", fileobj=fobj, encoding="iso8859-1")
   1234             t = tar.getmember("foo")
   1235             self.assertEqual(t.uname, "")
   1236             self.assertEqual(t.gname, "")
   1237 
   1238 
   1239 class GNUUnicodeTest(UstarUnicodeTest):
   1240 
   1241     format = tarfile.GNU_FORMAT
   1242 
   1243 
   1244 class PaxUnicodeTest(UstarUnicodeTest):
   1245 
   1246     format = tarfile.PAX_FORMAT
   1247 
   1248     def _create_unicode_name(self, name):
   1249         tar = tarfile.open(tmpname, "w", format=self.format)
   1250         t = tarfile.TarInfo()
   1251         t.pax_headers["path"] = name
   1252         tar.addfile(t)
   1253         tar.close()
   1254 
   1255     def test_error_handlers(self):
   1256         # Test if the unicode error handlers work correctly for characters

   1257         # that cannot be expressed in a given encoding.

   1258         self._create_unicode_name(u"")
   1259 
   1260         for handler, name in (("utf-8", u"".encode("utf8")),
   1261                     ("replace", "???"), ("ignore", "")):
   1262             tar = tarfile.open(tmpname, format=self.format, encoding="ascii",
   1263                     errors=handler)
   1264             self.assertEqual(tar.getnames()[0], name)
   1265 
   1266         self.assertRaises(UnicodeError, tarfile.open, tmpname,
   1267                 encoding="ascii", errors="strict")
   1268 
   1269     def test_error_handler_utf8(self):
   1270         # Create a pathname that has one component representable using

   1271         # iso8859-1 and the other only in iso8859-15.

   1272         self._create_unicode_name(u"/")
   1273 
   1274         tar = tarfile.open(tmpname, format=self.format, encoding="iso8859-1",
   1275                 errors="utf-8")
   1276         self.assertEqual(tar.getnames()[0], "/" + u"".encode("utf8"))
   1277 
   1278 
   1279 class AppendTest(unittest.TestCase):
   1280     # Test append mode (cp. patch #1652681).

   1281 
   1282     def setUp(self):
   1283         self.tarname = tmpname
   1284         if os.path.exists(self.tarname):
   1285             os.remove(self.tarname)
   1286 
   1287     def _add_testfile(self, fileobj=None):
   1288         tar = tarfile.open(self.tarname, "a", fileobj=fileobj)
   1289         tar.addfile(tarfile.TarInfo("bar"))
   1290         tar.close()
   1291 
   1292     def _create_testtar(self, mode="w:"):
   1293         src = tarfile.open(tarname, encoding="iso8859-1")
   1294         t = src.getmember("ustar/regtype")
   1295         t.name = "foo"
   1296         f = src.extractfile(t)
   1297         tar = tarfile.open(self.tarname, mode)
   1298         tar.addfile(t, f)
   1299         tar.close()
   1300 
   1301     def _test(self, names=["bar"], fileobj=None):
   1302         tar = tarfile.open(self.tarname, fileobj=fileobj)
   1303         self.assertEqual(tar.getnames(), names)
   1304 
   1305     def test_non_existing(self):
   1306         self._add_testfile()
   1307         self._test()
   1308 
   1309     def test_empty(self):
   1310         tarfile.open(self.tarname, "w:").close()
   1311         self._add_testfile()
   1312         self._test()
   1313 
   1314     def test_empty_fileobj(self):
   1315         fobj = StringIO.StringIO("\0" * 1024)
   1316         self._add_testfile(fobj)
   1317         fobj.seek(0)
   1318         self._test(fileobj=fobj)
   1319 
   1320     def test_fileobj(self):
   1321         self._create_testtar()
   1322         data = open(self.tarname).read()
   1323         fobj = StringIO.StringIO(data)
   1324         self._add_testfile(fobj)
   1325         fobj.seek(0)
   1326         self._test(names=["foo", "bar"], fileobj=fobj)
   1327 
   1328     def test_existing(self):
   1329         self._create_testtar()
   1330         self._add_testfile()
   1331         self._test(names=["foo", "bar"])
   1332 
   1333     def test_append_gz(self):
   1334         if gzip is None:
   1335             return
   1336         self._create_testtar("w:gz")
   1337         self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a")
   1338 
   1339     def test_append_bz2(self):
   1340         if bz2 is None:
   1341             return
   1342         self._create_testtar("w:bz2")
   1343         self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a")
   1344 
   1345     # Append mode is supposed to fail if the tarfile to append to

   1346     # does not end with a zero block.

   1347     def _test_error(self, data):
   1348         open(self.tarname, "wb").write(data)
   1349         self.assertRaises(tarfile.ReadError, self._add_testfile)
   1350 
   1351     def test_null(self):
   1352         self._test_error("")
   1353 
   1354     def test_incomplete(self):
   1355         self._test_error("\0" * 13)
   1356 
   1357     def test_premature_eof(self):
   1358         data = tarfile.TarInfo("foo").tobuf()
   1359         self._test_error(data)
   1360 
   1361     def test_trailing_garbage(self):
   1362         data = tarfile.TarInfo("foo").tobuf()
   1363         self._test_error(data + "\0" * 13)
   1364 
   1365     def test_invalid(self):
   1366         self._test_error("a" * 512)
   1367 
   1368 
   1369 class LimitsTest(unittest.TestCase):
   1370 
   1371     def test_ustar_limits(self):
   1372         # 100 char name

   1373         tarinfo = tarfile.TarInfo("0123456789" * 10)
   1374         tarinfo.tobuf(tarfile.USTAR_FORMAT)
   1375 
   1376         # 101 char name that cannot be stored

   1377         tarinfo = tarfile.TarInfo("0123456789" * 10 + "0")
   1378         self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
   1379 
   1380         # 256 char name with a slash at pos 156

   1381         tarinfo = tarfile.TarInfo("123/" * 62 + "longname")
   1382         tarinfo.tobuf(tarfile.USTAR_FORMAT)
   1383 
   1384         # 256 char name that cannot be stored

   1385         tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname")
   1386         self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
   1387 
   1388         # 512 char name

   1389         tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
   1390         self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
   1391 
   1392         # 512 char linkname

   1393         tarinfo = tarfile.TarInfo("longlink")
   1394         tarinfo.linkname = "123/" * 126 + "longname"
   1395         self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
   1396 
   1397         # uid > 8 digits

   1398         tarinfo = tarfile.TarInfo("name")
   1399         tarinfo.uid = 010000000
   1400         self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
   1401 
   1402     def test_gnu_limits(self):
   1403         tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
   1404         tarinfo.tobuf(tarfile.GNU_FORMAT)
   1405 
   1406         tarinfo = tarfile.TarInfo("longlink")
   1407         tarinfo.linkname = "123/" * 126 + "longname"
   1408         tarinfo.tobuf(tarfile.GNU_FORMAT)
   1409 
   1410         # uid >= 256 ** 7

   1411         tarinfo = tarfile.TarInfo("name")
   1412         tarinfo.uid = 04000000000000000000L
   1413         self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT)
   1414 
   1415     def test_pax_limits(self):
   1416         tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
   1417         tarinfo.tobuf(tarfile.PAX_FORMAT)
   1418 
   1419         tarinfo = tarfile.TarInfo("longlink")
   1420         tarinfo.linkname = "123/" * 126 + "longname"
   1421         tarinfo.tobuf(tarfile.PAX_FORMAT)
   1422 
   1423         tarinfo = tarfile.TarInfo("name")
   1424         tarinfo.uid = 04000000000000000000L
   1425         tarinfo.tobuf(tarfile.PAX_FORMAT)
   1426 
   1427 
   1428 class ContextManagerTest(unittest.TestCase):
   1429 
   1430     def test_basic(self):
   1431         with tarfile.open(tarname) as tar:
   1432             self.assertFalse(tar.closed, "closed inside runtime context")
   1433         self.assertTrue(tar.closed, "context manager failed")
   1434 
   1435     def test_closed(self):
   1436         # The __enter__() method is supposed to raise IOError

   1437         # if the TarFile object is already closed.

   1438         tar = tarfile.open(tarname)
   1439         tar.close()
   1440         with self.assertRaises(IOError):
   1441             with tar:
   1442                 pass
   1443 
   1444     def test_exception(self):
   1445         # Test if the IOError exception is passed through properly.

   1446         with self.assertRaises(Exception) as exc:
   1447             with tarfile.open(tarname) as tar:
   1448                 raise IOError
   1449         self.assertIsInstance(exc.exception, IOError,
   1450                               "wrong exception raised in context manager")
   1451         self.assertTrue(tar.closed, "context manager failed")
   1452 
   1453     def test_no_eof(self):
   1454         # __exit__() must not write end-of-archive blocks if an

   1455         # exception was raised.

   1456         try:
   1457             with tarfile.open(tmpname, "w") as tar:
   1458                 raise Exception
   1459         except:
   1460             pass
   1461         self.assertEqual(os.path.getsize(tmpname), 0,
   1462                 "context manager wrote an end-of-archive block")
   1463         self.assertTrue(tar.closed, "context manager failed")
   1464 
   1465     def test_eof(self):
   1466         # __exit__() must write end-of-archive blocks, i.e. call

   1467         # TarFile.close() if there was no error.

   1468         with tarfile.open(tmpname, "w"):
   1469             pass
   1470         self.assertNotEqual(os.path.getsize(tmpname), 0,
   1471                 "context manager wrote no end-of-archive block")
   1472 
   1473     def test_fileobj(self):
   1474         # Test that __exit__() did not close the external file

   1475         # object.

   1476         fobj = open(tmpname, "wb")
   1477         try:
   1478             with tarfile.open(fileobj=fobj, mode="w") as tar:
   1479                 raise Exception
   1480         except:
   1481             pass
   1482         self.assertFalse(fobj.closed, "external file object was closed")
   1483         self.assertTrue(tar.closed, "context manager failed")
   1484         fobj.close()
   1485 
   1486 
   1487 class LinkEmulationTest(ReadTest):
   1488 
   1489     # Test for issue #8741 regression. On platforms that do not support

   1490     # symbolic or hard links tarfile tries to extract these types of members as

   1491     # the regular files they point to.

   1492     def _test_link_extraction(self, name):
   1493         self.tar.extract(name, TEMPDIR)
   1494         data = open(os.path.join(TEMPDIR, name), "rb").read()
   1495         self.assertEqual(md5sum(data), md5_regtype)
   1496 
   1497     def test_hardlink_extraction1(self):
   1498         self._test_link_extraction("ustar/lnktype")
   1499 
   1500     def test_hardlink_extraction2(self):
   1501         self._test_link_extraction("./ustar/linktest2/lnktype")
   1502 
   1503     def test_symlink_extraction1(self):
   1504         self._test_link_extraction("ustar/symtype")
   1505 
   1506     def test_symlink_extraction2(self):
   1507         self._test_link_extraction("./ustar/linktest2/symtype")
   1508 
   1509 
   1510 class GzipMiscReadTest(MiscReadTest):
   1511     tarname = gzipname
   1512     mode = "r:gz"
   1513 class GzipUstarReadTest(UstarReadTest):
   1514     tarname = gzipname
   1515     mode = "r:gz"
   1516 class GzipStreamReadTest(StreamReadTest):
   1517     tarname = gzipname
   1518     mode = "r|gz"
   1519 class GzipWriteTest(WriteTest):
   1520     mode = "w:gz"
   1521 class GzipStreamWriteTest(StreamWriteTest):
   1522     mode = "w|gz"
   1523 
   1524 
   1525 class Bz2MiscReadTest(MiscReadTest):
   1526     tarname = bz2name
   1527     mode = "r:bz2"
   1528 class Bz2UstarReadTest(UstarReadTest):
   1529     tarname = bz2name
   1530     mode = "r:bz2"
   1531 class Bz2StreamReadTest(StreamReadTest):
   1532     tarname = bz2name
   1533     mode = "r|bz2"
   1534 class Bz2WriteTest(WriteTest):
   1535     mode = "w:bz2"
   1536 class Bz2StreamWriteTest(StreamWriteTest):
   1537     mode = "w|bz2"
   1538 
   1539 class Bz2PartialReadTest(unittest.TestCase):
   1540     # Issue5068: The _BZ2Proxy.read() method loops forever

   1541     # on an empty or partial bzipped file.

   1542 
   1543     def _test_partial_input(self, mode):
   1544         class MyStringIO(StringIO.StringIO):
   1545             hit_eof = False
   1546             def read(self, n):
   1547                 if self.hit_eof:
   1548                     raise AssertionError("infinite loop detected in tarfile.open()")
   1549                 self.hit_eof = self.pos == self.len
   1550                 return StringIO.StringIO.read(self, n)
   1551             def seek(self, *args):
   1552                 self.hit_eof = False
   1553                 return StringIO.StringIO.seek(self, *args)
   1554 
   1555         data = bz2.compress(tarfile.TarInfo("foo").tobuf())
   1556         for x in range(len(data) + 1):
   1557             try:
   1558                 tarfile.open(fileobj=MyStringIO(data[:x]), mode=mode)
   1559             except tarfile.ReadError:
   1560                 pass # we have no interest in ReadErrors

   1561 
   1562     def test_partial_input(self):
   1563         self._test_partial_input("r")
   1564 
   1565     def test_partial_input_bz2(self):
   1566         self._test_partial_input("r:bz2")
   1567 
   1568 
   1569 def test_main():
   1570     os.makedirs(TEMPDIR)
   1571 
   1572     tests = [
   1573         UstarReadTest,
   1574         MiscReadTest,
   1575         StreamReadTest,
   1576         DetectReadTest,
   1577         MemberReadTest,
   1578         GNUReadTest,
   1579         PaxReadTest,
   1580         WriteTest,
   1581         StreamWriteTest,
   1582         GNUWriteTest,
   1583         PaxWriteTest,
   1584         UstarUnicodeTest,
   1585         GNUUnicodeTest,
   1586         PaxUnicodeTest,
   1587         AppendTest,
   1588         LimitsTest,
   1589         ContextManagerTest,
   1590     ]
   1591 
   1592     if hasattr(os, "link"):
   1593         tests.append(HardlinkTest)
   1594     else:
   1595         tests.append(LinkEmulationTest)
   1596 
   1597     fobj = open(tarname, "rb")
   1598     data = fobj.read()
   1599     fobj.close()
   1600 
   1601     if gzip:
   1602         # Create testtar.tar.gz and add gzip-specific tests.

   1603         tar = gzip.open(gzipname, "wb")
   1604         tar.write(data)
   1605         tar.close()
   1606 
   1607         tests += [
   1608             GzipMiscReadTest,
   1609             GzipUstarReadTest,
   1610             GzipStreamReadTest,
   1611             GzipWriteTest,
   1612             GzipStreamWriteTest,
   1613         ]
   1614 
   1615     if bz2:
   1616         # Create testtar.tar.bz2 and add bz2-specific tests.

   1617         tar = bz2.BZ2File(bz2name, "wb")
   1618         tar.write(data)
   1619         tar.close()
   1620 
   1621         tests += [
   1622             Bz2MiscReadTest,
   1623             Bz2UstarReadTest,
   1624             Bz2StreamReadTest,
   1625             Bz2WriteTest,
   1626             Bz2StreamWriteTest,
   1627             Bz2PartialReadTest,
   1628         ]
   1629 
   1630     try:
   1631         test_support.run_unittest(*tests)
   1632     finally:
   1633         if os.path.exists(TEMPDIR):
   1634             shutil.rmtree(TEMPDIR)
   1635 
   1636 if __name__ == "__main__":
   1637     test_main()
   1638