Home | History | Annotate | Download | only in test
      1 # Copyright (C) 2003 Python Software Foundation
      2 
      3 import unittest
      4 import unittest.mock
      5 import shutil
      6 import tempfile
      7 import sys
      8 import stat
      9 import os
     10 import os.path
     11 import errno
     12 import functools
     13 import subprocess
     14 from contextlib import ExitStack
     15 from shutil import (make_archive,
     16                     register_archive_format, unregister_archive_format,
     17                     get_archive_formats, Error, unpack_archive,
     18                     register_unpack_format, RegistryError,
     19                     unregister_unpack_format, get_unpack_formats,
     20                     SameFileError)
     21 import tarfile
     22 import zipfile
     23 import warnings
     24 
     25 from test import support
     26 from test.support import (TESTFN, check_warnings, captured_stdout,
     27                           android_not_root)
     28 
# A second scratch name, derived from the TESTFN name imported above.
TESTFN2 = TESTFN + "2"

# grp and pwd cannot be imported everywhere; UID_GID_SUPPORT records
# whether both imports succeeded.
try:
    import grp
    import pwd
    UID_GID_SUPPORT = True
except ImportError:
    UID_GID_SUPPORT = False
     37 
     38 def _fake_rename(*args, **kwargs):
     39     # Pretend the destination path is on a different filesystem.
     40     raise OSError(getattr(errno, 'EXDEV', 18), "Invalid cross-device link")
     41 
     42 def mock_rename(func):
     43     @functools.wraps(func)
     44     def wrap(*args, **kwargs):
     45         try:
     46             builtin_rename = os.rename
     47             os.rename = _fake_rename
     48             return func(*args, **kwargs)
     49         finally:
     50             os.rename = builtin_rename
     51     return wrap
     52 
     53 def write_file(path, content, binary=False):
     54     """Write *content* to a file located at *path*.
     55 
     56     If *path* is a tuple instead of a string, os.path.join will be used to
     57     make a path.  If *binary* is true, the file will be opened in binary
     58     mode.
     59     """
     60     if isinstance(path, tuple):
     61         path = os.path.join(*path)
     62     with open(path, 'wb' if binary else 'w') as fp:
     63         fp.write(content)
     64 
     65 def read_file(path, binary=False):
     66     """Return contents from a file located at *path*.
     67 
     68     If *path* is a tuple instead of a string, os.path.join will be used to
     69     make a path.  If *binary* is true, the file will be opened in binary
     70     mode.
     71     """
     72     if isinstance(path, tuple):
     73         path = os.path.join(*path)
     74     with open(path, 'rb' if binary else 'r') as fp:
     75         return fp.read()
     76 
     77 def rlistdir(path):
     78     res = []
     79     for name in sorted(os.listdir(path)):
     80         p = os.path.join(path, name)
     81         if os.path.isdir(p) and not os.path.islink(p):
     82             res.append(name + '/')
     83             for n in rlistdir(p):
     84                 res.append(name + '/' + n)
     85         else:
     86             res.append(name)
     87     return res
     88 
     89 
     90 class TestShutil(unittest.TestCase):
     91 
     92     def setUp(self):
     93         super(TestShutil, self).setUp()
     94         self.tempdirs = []
     95 
     96     def tearDown(self):
     97         super(TestShutil, self).tearDown()
     98         while self.tempdirs:
     99             d = self.tempdirs.pop()
    100             shutil.rmtree(d, os.name in ('nt', 'cygwin'))
    101 
    102 
    def mkdtemp(self):
        """Return the path of a new temporary directory.

        The path is also appended to self.tempdirs, the list that
        tearDown() empties by removing each directory in it.
        """
        new_dir = tempfile.mkdtemp()
        self.tempdirs.append(new_dir)
        return new_dir
    111 
    def test_rmtree_works_on_bytes(self):
        # rmtree() must accept a bytes path for a non-empty directory.
        base = self.mkdtemp()
        target = os.path.join(base, 'killme')
        os.mkdir(target)
        write_file((target, 'somefile'), 'foo')
        target_bytes = os.fsencode(target)
        self.assertIsInstance(target_bytes, bytes)
        shutil.rmtree(target_bytes)
    120 
    121     @support.skip_unless_symlink
    def test_rmtree_fails_on_symlink(self):
        # Calling rmtree() on a symlink to a directory raises OSError and
        # leaves both the link and its target in place.
        tmp = self.mkdtemp()
        real_dir = os.path.join(tmp, 'dir')
        symlink = os.path.join(tmp, 'link')
        os.mkdir(real_dir)
        os.symlink(real_dir, symlink)
        with self.assertRaises(OSError):
            shutil.rmtree(symlink)
        self.assertTrue(os.path.exists(real_dir))
        self.assertTrue(os.path.lexists(symlink))
        # With an onerror callback the failure is reported exactly once,
        # as (os.path.islink, <link path>, <exc_info triple>).
        reported = []
        shutil.rmtree(symlink, onerror=lambda *args: reported.append(args))
        self.assertEqual(len(reported), 1)
        only_call = reported[0]
        self.assertIs(only_call[0], os.path.islink)
        self.assertEqual(only_call[1], symlink)
        self.assertIsInstance(only_call[2][1], OSError)
    139 
    140     @support.skip_unless_symlink
    def test_rmtree_works_on_symlinks(self):
        # Layout: dir1 holds dir2 plus three symlinks, pointing at dir2,
        # at dir3 (outside dir1) and at file1 (outside dir1).
        tmp = self.mkdtemp()
        outer = os.path.join(tmp, 'dir1')
        inner = os.path.join(outer, 'dir2')
        outside_dir = os.path.join(tmp, 'dir3')
        for directory in (outer, inner, outside_dir):
            os.mkdir(directory)
        outside_file = os.path.join(tmp, 'file1')
        write_file(outside_file, 'foo')
        links = (('link1', inner), ('link2', outside_dir),
                 ('link3', outside_file))
        for link_name, target in links:
            os.symlink(target, os.path.join(outer, link_name))
        # make sure symlinks are removed but not followed
        shutil.rmtree(outer)
        self.assertFalse(os.path.exists(outer))
        self.assertTrue(os.path.exists(outside_dir))
        self.assertTrue(os.path.exists(outside_file))
    161 
    def test_rmtree_errors(self):
        # Part 1: a path that does not exist.
        # filename is guaranteed not to exist
        filename = tempfile.mktemp()
        with self.assertRaises(FileNotFoundError):
            shutil.rmtree(filename)
        # test that ignore_errors option is honored
        shutil.rmtree(filename, ignore_errors=True)

        # Part 2: a path that is an existing regular file.
        tmpdir = self.mkdtemp()
        filename = os.path.join(tmpdir, "tstfile")
        write_file(filename, "")
        with self.assertRaises(NotADirectoryError) as cm:
            shutil.rmtree(filename)
        # The reason for this rather odd construct is that Windows sprinkles
        # a \*.* at the end of file names. But only sometimes on some buildbots
        possible_args = [filename, os.path.join(filename, '*.*')]
        self.assertIn(cm.exception.filename, possible_args)
        self.assertTrue(os.path.exists(filename))
        # test that ignore_errors option is honored
        shutil.rmtree(filename, ignore_errors=True)
        self.assertTrue(os.path.exists(filename))

        # Part 3: with onerror, two failures are reported in order,
        # first from os.listdir and then from os.rmdir.
        reported = []
        shutil.rmtree(filename,
                      onerror=lambda *args: reported.append(args))
        self.assertEqual(len(reported), 2)
        for index, expected_func in enumerate((os.listdir, os.rmdir)):
            call = reported[index]
            self.assertIs(call[0], expected_func)
            self.assertEqual(call[1], filename)
            self.assertIsInstance(call[2][1], NotADirectoryError)
            self.assertIn(call[2][1].filename, possible_args)
    196 
    197 
    198     @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod()')
    199     @unittest.skipIf(sys.platform[:6] == 'cygwin',
    200                      "This test can't be run on Cygwin (issue #1071513).")
    201     @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
    202                      "This test can't be run reliably as root (issue #1076467).")
    def test_on_error(self):
        # Build TESTFN/{a (file), b (dir)}, make all three unwritable and
        # check that rmtree() reports its failures to the onerror
        # callback (check_args_to_onerror).
        self.errorState = 0
        os.mkdir(TESTFN)
        self.addCleanup(shutil.rmtree, TESTFN)

        self.child_file_path = os.path.join(TESTFN, 'a')
        self.child_dir_path = os.path.join(TESTFN, 'b')
        support.create_empty_file(self.child_file_path)
        os.mkdir(self.child_dir_path)
        # Record the current modes so the cleanups can restore them.
        original_modes = [
            (path, os.stat(path).st_mode)
            for path in (TESTFN, self.child_file_path, self.child_dir_path)
        ]
        # Make unwritable.
        read_exec_only = stat.S_IREAD|stat.S_IEXEC
        for path in (self.child_file_path, self.child_dir_path, TESTFN):
            os.chmod(path, read_exec_only)

        for path, mode in original_modes:
            self.addCleanup(os.chmod, path, mode)

        shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
        # Test whether onerror has actually been called.
        self.assertEqual(self.errorState, 3,
                         "Expected call to onerror function did not happen.")
    229 
    def check_args_to_onerror(self, func, arg, exc):
        """onerror callback used by test_on_error.

        *func* is the os function that failed, *arg* the path it was
        called with and *exc* the exc_info triple of the failure.
        self.errorState counts the calls: the first two may come from
        os.unlink, os.rmdir or os.listdir, and the last one must be
        os.rmdir on TESTFN, which sets the state to 3.
        """
        # test_on_error deliberately runs rmtree
        # on a directory that is chmod 500, which will fail.
        # This function is run when shutil.rmtree fails.
        # 99.9% of the time it initially fails to remove
        # a file in the directory, so the first time through
        # func is os.unlink.
        # However, some Linux machines running ZFS on
        # FUSE experienced a failure earlier in the process
        # at os.listdir.  The first failure may legally
        # be either.
        if self.errorState < 2:
            if func is os.unlink:
                self.assertEqual(arg, self.child_file_path)
            elif func is os.rmdir:
                self.assertEqual(arg, self.child_dir_path)
            else:
                self.assertIs(func, os.listdir)
                self.assertIn(arg, [TESTFN, self.child_dir_path])
            next_state = self.errorState + 1
        else:
            # Functions are compared by identity, as in the branch above.
            self.assertIs(func, os.rmdir)
            self.assertEqual(arg, TESTFN)
            next_state = 3
        # The state only advances once every check has passed.
        self.assertTrue(issubclass(exc[0], OSError))
        self.errorState = next_state
    256 
    def test_rmtree_does_not_choke_on_failing_lstat(self):
        # Replace os.lstat with a version that only works for TESTFN
        # itself and raises OSError for every other path, then check
        # that rmtree() still completes.
        try:
            orig_lstat = os.lstat

            def selective_lstat(fn, *args, **kwargs):
                if fn == TESTFN:
                    return orig_lstat(fn)
                raise OSError()

            os.lstat = selective_lstat

            os.mkdir(TESTFN)
            write_file((TESTFN, 'foo'), 'foo')
            shutil.rmtree(TESTFN)
        finally:
            # Put the real os.lstat back whatever happened.
            os.lstat = orig_lstat
    272 
    273     @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod')
    274     @support.skip_unless_symlink
    def test_copymode_follow_symlinks(self):
        # copymode() with its default arguments copies the mode of the
        # file behind src onto the file behind dst, whether either one
        # is named directly or through a symlink.
        tmp_dir = self.mkdtemp()
        src, dst, src_link, dst_link = (
            os.path.join(tmp_dir, name)
            for name in ('foo', 'bar', 'baz', 'quux'))
        for regular_file in (src, dst):
            write_file(regular_file, 'foo')
        os.symlink(src, src_link)
        os.symlink(dst, dst_link)
        os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
        # file to file
        os.chmod(dst, stat.S_IRWXO)
        self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
        shutil.copymode(src, dst)
        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
        # On Windows, os.chmod does not follow symlinks (issue #15411)
        if os.name == 'nt':
            return
        # In turn: follow src link, follow dst link, follow both links.
        link_cases = ((src_link, dst), (src, dst_link), (src_link, dst_link))
        for copy_from, copy_to in link_cases:
            os.chmod(dst, stat.S_IRWXO)
            shutil.copymode(copy_from, copy_to)
            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
    305 
    306     @unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod')
    307     @support.skip_unless_symlink
    def test_copymode_symlink_to_symlink(self):
        # copymode() with follow_symlinks=False: link-to-link copies the
        # mode of the link itself; the mixed cases end with the real
        # files having equal modes.
        tmp_dir = self.mkdtemp()
        src, dst, src_link, dst_link = (
            os.path.join(tmp_dir, name)
            for name in ('foo', 'bar', 'baz', 'quux'))
        for regular_file in (src, dst):
            write_file(regular_file, 'foo')
        os.symlink(src, src_link)
        os.symlink(dst, dst_link)
        os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
        os.chmod(dst, stat.S_IRWXU)
        os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG)
        # link to link
        os.lchmod(dst_link, stat.S_IRWXO)
        shutil.copymode(src_link, dst_link, follow_symlinks=False)
        src_link_mode = os.lstat(src_link).st_mode
        dst_link_mode = os.lstat(dst_link).st_mode
        self.assertEqual(src_link_mode, dst_link_mode)
        self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
        # First only src is a link, then only dst is a link - use chmod
        for copy_from, copy_to in ((src_link, dst), (src, dst_link)):
            os.lchmod(dst_link, stat.S_IRWXO)
            shutil.copymode(copy_from, copy_to, follow_symlinks=False)
            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
    335 
    336     @unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing')
    337     @support.skip_unless_symlink
    def test_copymode_symlink_to_symlink_wo_lchmod(self):
        # Without os.lchmod, a link-to-link copymode() with
        # follow_symlinks=False must simply not raise.
        tmp_dir = self.mkdtemp()
        src, dst, src_link, dst_link = (
            os.path.join(tmp_dir, name)
            for name in ('foo', 'bar', 'baz', 'quux'))
        for regular_file in (src, dst):
            write_file(regular_file, 'foo')
        os.symlink(src, src_link)
        os.symlink(dst, dst_link)
        shutil.copymode(src_link, dst_link, follow_symlinks=False)  # silent fail
    349 
    350     @support.skip_unless_symlink
    def test_copystat_symlinks(self):
        # copystat() between symlinks: with follow_symlinks=False the
        # metadata of the links themselves is copied, as far as the
        # platform supports changing it.
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        src_link = os.path.join(tmp_dir, 'baz')
        dst_link = os.path.join(tmp_dir, 'qux')
        write_file(src, 'foo')
        src_stat = os.stat(src)
        os.utime(src, (src_stat.st_atime,
                       src_stat.st_mtime - 42.0))  # ensure different mtimes
        write_file(dst, 'bar')
        self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime)
        os.symlink(src, src_link)
        os.symlink(dst, dst_link)
        if hasattr(os, 'lchmod'):
            os.lchmod(src_link, stat.S_IRWXO)
        if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
            os.lchflags(src_link, stat.UF_NODUMP)
        src_link_stat = os.lstat(src_link)
        # follow
        if hasattr(os, 'lchmod'):
            shutil.copystat(src_link, dst_link, follow_symlinks=True)
            self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode)
        # don't follow
        shutil.copystat(src_link, dst_link, follow_symlinks=False)
        dst_link_stat = os.lstat(dst_link)
        if os.utime in os.supports_follow_symlinks:
            for attr in 'st_atime', 'st_mtime':
                # The modification times may be truncated in the new file.
                self.assertLessEqual(getattr(src_link_stat, attr),
                                     getattr(dst_link_stat, attr) + 1)
        if hasattr(os, 'lchmod'):
            self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode)
        if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
            self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags)
        # tell not to follow, but dst is not a link: the check below
        # expects dst to end up with the mtime of the real src file.
        shutil.copystat(src_link, dst, follow_symlinks=False)
        # assertLess reports both values when the check fails.
        self.assertLess(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime),
                        0.1)
    390 
    391     @unittest.skipUnless(hasattr(os, 'chflags') and
    392                          hasattr(errno, 'EOPNOTSUPP') and
    393                          hasattr(errno, 'ENOTSUP'),
    394                          "requires os.chflags, EOPNOTSUPP & ENOTSUP")
    def test_copystat_handles_harmless_chflags_errors(self):
        # copystat() must not fail when os.chflags raises OSError with
        # EOPNOTSUPP or ENOTSUP, but must fail for another errno.
        tmpdir = self.mkdtemp()
        file1 = os.path.join(tmpdir, 'file1')
        file2 = os.path.join(tmpdir, 'file2')
        for path in (file1, file2):
            write_file(path, 'xxx')

        def make_chflags_raiser(err):
            # Build an os.chflags replacement that always raises an
            # OSError whose errno is *err*.
            failure = OSError()

            def _chflags_raiser(path, flags, *, follow_symlinks=True):
                failure.errno = err
                raise failure
            return _chflags_raiser

        harmless = (errno.EOPNOTSUPP, errno.ENOTSUP)
        # The sum of the two harmless codes stands in for "some other
        # error".
        other_error = errno.EOPNOTSUPP + errno.ENOTSUP
        old_chflags = os.chflags
        try:
            for err in harmless:
                os.chflags = make_chflags_raiser(err)
                shutil.copystat(file1, file2)
            # assert others errors break it
            os.chflags = make_chflags_raiser(other_error)
            with self.assertRaises(OSError):
                shutil.copystat(file1, file2)
        finally:
            os.chflags = old_chflags
    419 
    420     @support.skip_unless_xattr
    def test_copyxattr(self):
        # Exercise shutil._copyxattr() directly, including two simulated
        # failures, and finally check that copystat() copies xattrs.
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        write_file(src, 'foo')
        write_file(dst, 'bar')

        # no xattr == no problem
        shutil._copyxattr(src, dst)
        # common case
        for attr_name, attr_value in (('user.foo', b'42'),
                                      ('user.bar', b'43')):
            os.setxattr(src, attr_name, attr_value)
        shutil._copyxattr(src, dst)
        self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst)))
        self.assertEqual(os.getxattr(src, 'user.foo'),
                         os.getxattr(dst, 'user.foo'))
        # check errors don't affect other attrs
        os.remove(dst)
        write_file(dst, 'bar')
        os_error = OSError(errno.EPERM, 'EPERM')

        def _raise_on_user_foo(fname, attr, val, **kwargs):
            # Fail only for 'user.foo'; delegate everything else.
            if attr != 'user.foo':
                orig_setxattr(fname, attr, val, **kwargs)
                return
            raise os_error

        try:
            orig_setxattr = os.setxattr
            os.setxattr = _raise_on_user_foo
            shutil._copyxattr(src, dst)
            self.assertIn('user.bar', os.listxattr(dst))
        finally:
            os.setxattr = orig_setxattr

        # the source filesystem not supporting xattrs should be ok, too.
        def _raise_on_src(fname, *, follow_symlinks=True):
            # Fail only when listing the xattrs of src.
            if fname != src:
                return orig_listxattr(fname, follow_symlinks=follow_symlinks)
            raise OSError(errno.ENOTSUP, 'Operation not supported')

        try:
            orig_listxattr = os.listxattr
            os.listxattr = _raise_on_src
            shutil._copyxattr(src, dst)
        finally:
            os.listxattr = orig_listxattr

        # test that shutil.copystat copies xattrs
        src = os.path.join(tmp_dir, 'the_original')
        dst = os.path.join(tmp_dir, 'the_copy')
        # Each file's content is simply its own path.
        write_file(src, src)
        os.setxattr(src, 'user.the_value', b'fiddly')
        write_file(dst, dst)
        shutil.copystat(src, dst)
        self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly')
    475 
    476     @support.skip_unless_symlink
    477     @support.skip_unless_xattr
    478     @unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0,
    479                          'root privileges required')
    def test_copyxattr_symlinks(self):
        # On Linux, it's only possible to access non-user xattr for symlinks;
        # which in turn require root privileges. This test should be expanded
        # as soon as other platforms gain support for extended attributes.
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        src_link = os.path.join(tmp_dir, 'baz')
        dst = os.path.join(tmp_dir, 'bar')
        dst_link = os.path.join(tmp_dir, 'qux')
        attr = 'trusted.foo'
        # The file and the link pointing at it carry different values.
        write_file(src, 'foo')
        os.symlink(src, src_link)
        os.setxattr(src, attr, b'42')
        os.setxattr(src_link, attr, b'43', follow_symlinks=False)
        write_file(dst, 'bar')
        os.symlink(dst, dst_link)
        # link to link: only the link receives the attribute
        shutil._copyxattr(src_link, dst_link, follow_symlinks=False)
        link_value = os.getxattr(dst_link, attr, follow_symlinks=False)
        self.assertEqual(link_value, b'43')
        with self.assertRaises(OSError):
            os.getxattr(dst, attr)
        # link to regular file: the link's own value is copied
        shutil._copyxattr(src_link, dst, follow_symlinks=False)
        self.assertEqual(os.getxattr(dst, attr), b'43')
    500 
    501     @support.skip_unless_symlink
    def test_copy_symlinks(self):
        # copy() of a symlink: with follow_symlinks=True the target's
        # content is copied into a regular file; with
        # follow_symlinks=False the link itself is recreated.
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        src_link = os.path.join(tmp_dir, 'baz')
        write_file(src, 'foo')
        os.symlink(src, src_link)
        if hasattr(os, 'lchmod'):
            os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
        # follow
        shutil.copy(src_link, dst, follow_symlinks=True)
        self.assertFalse(os.path.islink(dst))
        self.assertEqual(read_file(src), read_file(dst))
        os.remove(dst)
        # don't follow
        shutil.copy(src_link, dst, follow_symlinks=False)
        self.assertTrue(os.path.islink(dst))
        self.assertEqual(os.readlink(dst), os.readlink(src_link))
        if hasattr(os, 'lchmod'):
            # The mode of the link itself must have been copied too.
            self.assertEqual(os.lstat(src_link).st_mode,
                             os.lstat(dst).st_mode)
    523 
    524     @support.skip_unless_symlink
    def test_copy2_symlinks(self):
        # copy2() of a symlink: following copies the target's content;
        # not following recreates the link and the metadata of the link
        # as far as the platform supports it.
        tmp_dir = self.mkdtemp()
        src, dst, src_link = (os.path.join(tmp_dir, name)
                              for name in ('foo', 'bar', 'baz'))
        write_file(src, 'foo')
        os.symlink(src, src_link)
        if hasattr(os, 'lchmod'):
            os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
        if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
            os.lchflags(src_link, stat.UF_NODUMP)
        src_stat = os.stat(src)
        src_link_stat = os.lstat(src_link)
        # follow
        shutil.copy2(src_link, dst, follow_symlinks=True)
        self.assertFalse(os.path.islink(dst))
        self.assertEqual(read_file(src), read_file(dst))
        os.remove(dst)
        # don't follow
        shutil.copy2(src_link, dst, follow_symlinks=False)
        self.assertTrue(os.path.islink(dst))
        self.assertEqual(os.readlink(dst), os.readlink(src_link))
        dst_stat = os.lstat(dst)
        if os.utime in os.supports_follow_symlinks:
            for time_attr in ('st_atime', 'st_mtime'):
                # The modification times may be truncated in the new file.
                original = getattr(src_link_stat, time_attr)
                copied = getattr(dst_stat, time_attr)
                self.assertLessEqual(original, copied + 1)
        if hasattr(os, 'lchmod'):
            self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode)
            self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode)
        if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
            self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags)
    558 
    559     @support.skip_unless_xattr
    def test_copy2_xattr(self):
        # copy2() must carry an extended attribute over to the copy.
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        attr = 'user.foo'
        write_file(src, 'foo')
        os.setxattr(src, attr, b'42')
        shutil.copy2(src, dst)
        expected = os.getxattr(src, attr)
        self.assertEqual(expected, os.getxattr(dst, attr))
        os.remove(dst)
    571 
    572     @support.skip_unless_symlink
    def test_copyfile_symlinks(self):
        # copyfile() of a symlink recreates the link when
        # follow_symlinks=False and writes a regular file by default.
        tmp_dir = self.mkdtemp()
        src, dst, dst_link, link = (
            os.path.join(tmp_dir, name)
            for name in ('src', 'dst', 'dst_link', 'link'))
        write_file(src, 'foo')
        os.symlink(src, link)
        # don't follow
        shutil.copyfile(link, dst_link, follow_symlinks=False)
        self.assertTrue(os.path.islink(dst_link))
        self.assertEqual(os.readlink(link), os.readlink(dst_link))
        # follow
        shutil.copyfile(link, dst)
        self.assertFalse(os.path.islink(dst))
    588 
    def test_rmtree_uses_safe_fd_version_if_available(self):
        # Work out from the os module whether the fd-based rmtree should
        # be in use, then check that shutil agrees and, if so, that
        # rmtree() really dispatches to shutil._rmtree_safe_fd.
        dir_fd_functions = {os.open, os.stat, os.unlink, os.rmdir}
        _use_fd_functions = (dir_fd_functions <= os.supports_dir_fd and
                             os.listdir in os.supports_fd and
                             os.stat in os.supports_follow_symlinks)
        if not _use_fd_functions:
            self.assertFalse(shutil._use_fd_functions)
            self.assertFalse(shutil.rmtree.avoids_symlink_attacks)
            return

        self.assertTrue(shutil._use_fd_functions)
        self.assertTrue(shutil.rmtree.avoids_symlink_attacks)
        tmp_dir = self.mkdtemp()
        d = os.path.join(tmp_dir, 'a')
        os.mkdir(d)

        class Called(Exception):
            pass

        def _raiser(*args, **kwargs):
            raise Called

        try:
            real_rmtree = shutil._rmtree_safe_fd
            # If rmtree() uses the safe version, Called comes out of it.
            shutil._rmtree_safe_fd = _raiser
            with self.assertRaises(Called):
                shutil.rmtree(d)
        finally:
            shutil._rmtree_safe_fd = real_rmtree
    612 
    def test_rmtree_dont_delete_file(self):
        # When called on a file instead of a directory, don't delete it.
        handle, path = tempfile.mkstemp()
        os.close(handle)
        # Registered up front so the file is removed even if a check
        # below fails.
        self.addCleanup(support.unlink, path)
        self.assertRaises(NotADirectoryError, shutil.rmtree, path)
        # State the expectation explicitly instead of relying on a later
        # os.remove() succeeding.
        self.assertTrue(os.path.isfile(path))
    619 
    def test_copytree_simple(self):
        # Copy a tree holding one file and one sub-directory with a file,
        # then check the layout and the contents of the copy.
        src_dir = tempfile.mkdtemp()
        dst_parent = tempfile.mkdtemp()
        dst_dir = os.path.join(dst_parent, 'destination')
        self.addCleanup(shutil.rmtree, src_dir)
        self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
        write_file((src_dir, 'test.txt'), '123')
        os.mkdir(os.path.join(src_dir, 'test_dir'))
        write_file((src_dir, 'test_dir', 'test.txt'), '456')

        shutil.copytree(src_dir, dst_dir)
        top_file = os.path.join(dst_dir, 'test.txt')
        sub_dir = os.path.join(dst_dir, 'test_dir')
        sub_file = os.path.join(dst_dir, 'test_dir', 'test.txt')
        self.assertTrue(os.path.isfile(top_file))
        self.assertTrue(os.path.isdir(sub_dir))
        self.assertTrue(os.path.isfile(sub_file))
        expected_contents = (
            (('test.txt',), '123'),
            (('test_dir', 'test.txt'), '456'),
        )
        for parts, expected in expected_contents:
            actual = read_file((dst_dir,) + parts)
            self.assertEqual(actual, expected)
    638 
    639     @support.skip_unless_symlink
    def test_copytree_symlinks(self):
        # copytree(symlinks=True) must recreate a symlink found in the
        # source tree, together with the metadata of the link itself as
        # far as the platform supports it.
        tmp_dir = self.mkdtemp()
        src_dir = os.path.join(tmp_dir, 'src')
        dst_dir = os.path.join(tmp_dir, 'dst')
        sub_dir = os.path.join(src_dir, 'sub')
        os.mkdir(src_dir)
        os.mkdir(sub_dir)
        write_file((src_dir, 'file.txt'), 'foo')
        src_link = os.path.join(sub_dir, 'link')
        # Built from components, not a hard-coded 'sub/link' string.
        dst_link = os.path.join(dst_dir, 'sub', 'link')
        os.symlink(os.path.join(src_dir, 'file.txt'),
                   src_link)
        if hasattr(os, 'lchmod'):
            os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
        if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
            os.lchflags(src_link, stat.UF_NODUMP)
        src_stat = os.lstat(src_link)
        shutil.copytree(src_dir, dst_dir, symlinks=True)
        self.assertTrue(os.path.islink(dst_link))
        self.assertEqual(os.readlink(dst_link),
                         os.path.join(src_dir, 'file.txt'))
        dst_stat = os.lstat(dst_link)
        if hasattr(os, 'lchmod'):
            self.assertEqual(dst_stat.st_mode, src_stat.st_mode)
        # Same guard as the other symlink tests in this class: only
        # compare st_flags when the stat result actually has it.
        if hasattr(os, 'lchflags') and hasattr(src_stat, 'st_flags'):
            self.assertEqual(dst_stat.st_flags, src_stat.st_flags)
    666 
    def test_copytree_with_exclude(self):
        """Check that copytree() skips the entries selected by *ignore*.

        The same source tree is copied three times: twice filtered by
        shutil.ignore_patterns() and once by a hand-written callable.
        """
        join = os.path.join
        exists = os.path.exists

        # creating data; cleanups are registered right away so nothing is
        # leaked if a later setup step fails
        src_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, src_dir)
        dst_root = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, dst_root)
        dst_dir = join(dst_root, 'destination')

        write_file((src_dir, 'test.txt'), '123')
        write_file((src_dir, 'test.tmp'), '123')
        os.mkdir(join(src_dir, 'test_dir'))
        write_file((src_dir, 'test_dir', 'test.txt'), '456')
        os.mkdir(join(src_dir, 'test_dir2'))
        write_file((src_dir, 'test_dir2', 'test.txt'), '456')
        os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
        os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
        write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
        write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')

        # testing glob-like patterns
        try:
            patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
            shutil.copytree(src_dir, dst_dir, ignore=patterns)
            # checking the result: some elements should not be copied
            self.assertTrue(exists(join(dst_dir, 'test.txt')))
            self.assertFalse(exists(join(dst_dir, 'test.tmp')))
            self.assertFalse(exists(join(dst_dir, 'test_dir2')))
        finally:
            # ignore_errors: a copytree() failure that left no destination
            # behind must not be masked by the cleanup
            shutil.rmtree(dst_dir, ignore_errors=True)
        try:
            patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
            shutil.copytree(src_dir, dst_dir, ignore=patterns)
            # checking the result: some elements should not be copied
            self.assertFalse(exists(join(dst_dir, 'test.tmp')))
            self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2')))
            self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
        finally:
            shutil.rmtree(dst_dir, ignore_errors=True)

        # testing callable-style
        def _filter(src, names):
            # Return the members of *names* (children of directory *src*)
            # to skip: directories called 'subdir' and '.py' files.
            res = []
            for name in names:
                path = join(src, name)
                # Compare the last path component (str.split() would split
                # on whitespace) and test the extension for equality
                # (``in ('.py')`` is a substring test that also matches '').
                if os.path.isdir(path) and os.path.basename(path) == 'subdir':
                    res.append(name)
                elif os.path.splitext(path)[-1] == '.py':
                    res.append(name)
            return res

        try:
            shutil.copytree(src_dir, dst_dir, ignore=_filter)

            # checking the result: some elements should not be copied
            self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2',
                                         'test.py')))
            self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
            # ... while everything the filter does not name is copied
            self.assertTrue(exists(join(dst_dir, 'test_dir2', 'subdir2')))
            self.assertTrue(exists(join(dst_dir, 'test_dir', 'test.txt')))
        finally:
            shutil.rmtree(dst_dir, ignore_errors=True)
    731 
    def test_copytree_retains_permissions(self):
        """copytree() gives each copied entry the same st_mode as its source."""
        tmp_dir = tempfile.mkdtemp()
        src_dir = os.path.join(tmp_dir, 'source')
        os.mkdir(src_dir)
        dst_dir = os.path.join(tmp_dir, 'destination')
        self.addCleanup(shutil.rmtree, tmp_dir)

        # One permissive and one restrictive file, plus a restrictive subdir.
        os.chmod(src_dir, 0o777)
        file_specs = (('permissive.txt', '123', 0o777),
                      ('restrictive.txt', '456', 0o600))
        for name, content, mode in file_specs:
            write_file((src_dir, name), content)
            os.chmod(os.path.join(src_dir, name), mode)
        restrictive_subdir = tempfile.mkdtemp(dir=src_dir)
        os.chmod(restrictive_subdir, 0o600)

        shutil.copytree(src_dir, dst_dir)

        def mode_of(path):
            return os.stat(path).st_mode

        self.assertEqual(mode_of(src_dir), mode_of(dst_dir))
        for name, _content, _mode in file_specs:
            self.assertEqual(mode_of(os.path.join(src_dir, name)),
                             mode_of(os.path.join(dst_dir, name)))
        restrictive_subdir_dst = os.path.join(
            dst_dir, os.path.basename(restrictive_subdir))
        self.assertEqual(mode_of(restrictive_subdir),
                         mode_of(restrictive_subdir_dst))
    757 
    @unittest.mock.patch('os.chmod')
    def test_copytree_winerror(self, mock_patch):
        """copytree() wraps a PermissionError from os.chmod in shutil.Error."""
        # When copying to VFAT, copystat() raises OSError. Only on Windows
        # does that exception carry a meaningful 'winerror' attribute, so
        # the code under test must not assume 'winerror' is set.
        src_dir = tempfile.mkdtemp()
        dst_root = tempfile.mkdtemp()
        dst_dir = os.path.join(dst_root, 'destination')
        for path in (src_dir, dst_root):
            self.addCleanup(shutil.rmtree, path)

        # Every os.chmod call made during the copy now fails.
        mock_patch.side_effect = PermissionError('ka-boom')
        self.assertRaises(shutil.Error, shutil.copytree, src_dir, dst_dir)
    771 
    @unittest.skipIf(os.name == 'nt', 'temporarily disabled on Windows')
    @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link')
    @unittest.skipIf(android_not_root, "hard links not allowed, non root user")
    def test_dont_copy_file_onto_link_to_itself(self):
        """copyfile() onto a hard link of its source raises SameFileError."""
        # bug 851123.
        os.mkdir(TESTFN)
        src = os.path.join(TESTFN, 'cheese')
        dst = os.path.join(TESTFN, 'shop')
        try:
            with open(src, 'w') as f:
                f.write('cheddar')
            os.link(src, dst)
            with self.assertRaises(shutil.SameFileError):
                shutil.copyfile(src, dst)
            # The refused copy must leave the source content untouched.
            with open(src, 'r') as f:
                content = f.read()
            self.assertEqual(content, 'cheddar')
            os.remove(dst)
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)
    790 
    @support.skip_unless_symlink
    def test_dont_copy_file_onto_symlink_to_itself(self):
        """copyfile() onto a symlink to its source raises SameFileError."""
        # bug 851123.
        os.mkdir(TESTFN)
        src = os.path.join(TESTFN, 'cheese')
        dst = os.path.join(TESTFN, 'shop')
        try:
            with open(src, 'w') as f:
                f.write('cheddar')
            # The link target is the bare file name: passing `src` would
            # make the link point to TESTFN/TESTFN/cheese instead of
            # TESTFN/cheese.
            os.symlink('cheese', dst)
            with self.assertRaises(shutil.SameFileError):
                shutil.copyfile(src, dst)
            # The refused copy must leave the source content untouched.
            with open(src, 'r') as f:
                content = f.read()
            self.assertEqual(content, 'cheddar')
            os.remove(dst)
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)
    810 
    @support.skip_unless_symlink
    def test_rmtree_on_symlink(self):
        """rmtree() on a symlink raises OSError unless ignore_errors is set."""
        # bug 1669.
        os.mkdir(TESTFN)
        try:
            src = os.path.join(TESTFN, 'cheese')
            dst = os.path.join(TESTFN, 'shop')
            os.mkdir(src)
            os.symlink(src, dst)
            with self.assertRaises(OSError):
                shutil.rmtree(dst)
            # The same call must stay silent when errors are ignored.
            shutil.rmtree(dst, ignore_errors=True)
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)
    824 
    # Issue #3002: copyfile and copytree block indefinitely on named pipes
    @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
    @unittest.skipIf(android_not_root, "mkfifo not allowed, non root user")
    def test_copyfile_named_pipe(self):
        """copyfile() raises SpecialFileError when either end is a FIFO."""
        os.mkfifo(TESTFN)
        try:
            # The pipe is used first as the source, then as the destination.
            for src, dst in ((TESTFN, TESTFN2), (__file__, TESTFN)):
                with self.assertRaises(shutil.SpecialFileError):
                    shutil.copyfile(src, dst)
        finally:
            os.remove(TESTFN)
    837 
    @unittest.skipIf(android_not_root, "mkfifo not allowed, non root user")
    @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
    @support.skip_unless_symlink
    def test_copytree_named_pipe(self):
        """copytree() reports a FIFO inside the tree through shutil.Error."""
        os.mkdir(TESTFN)
        try:
            subdir = os.path.join(TESTFN, "subdir")
            os.mkdir(subdir)
            pipe = os.path.join(subdir, "mypipe")
            os.mkfifo(pipe)

            raised = None
            try:
                shutil.copytree(TESTFN, TESTFN2)
            except shutil.Error as e:
                raised = e
            if raised is None:
                self.fail("shutil.Error should have been raised")

            # args[0] holds the collected (src, dst, message) triples;
            # exactly one is expected, for the pipe.
            errors = raised.args[0]
            self.assertEqual(len(errors), 1)
            src, dst, error_msg = errors[0]
            self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
        finally:
            for tree in (TESTFN, TESTFN2):
                shutil.rmtree(tree, ignore_errors=True)
    860 
    def test_copytree_special_func(self):
        """copytree() calls a custom copy_function once for each file."""
        src_dir = self.mkdtemp()
        dst_dir = os.path.join(self.mkdtemp(), 'destination')
        # Two files in total: one at the top level, one in a subdirectory.
        write_file((src_dir, 'test.txt'), '123')
        os.mkdir(os.path.join(src_dir, 'test_dir'))
        write_file((src_dir, 'test_dir', 'test.txt'), '456')

        calls = []

        def _record(src, dst):
            # Stand-in copy function: note the request, copy nothing.
            calls.append((src, dst))

        shutil.copytree(src_dir, dst_dir, copy_function=_record)
        self.assertEqual(len(calls), 2)
    875 
    @support.skip_unless_symlink
    def test_copytree_dangling_symlinks(self):
        """Cover the three ways copytree() can treat a dangling symlink."""
        src_dir = self.mkdtemp()

        def new_destination(name):
            # A not-yet-existing path inside a fresh temporary directory.
            return os.path.join(self.mkdtemp(), name)

        dst_dir = new_destination('destination')
        os.symlink('IDONTEXIST', os.path.join(src_dir, 'test.txt'))
        os.mkdir(os.path.join(src_dir, 'test_dir'))
        write_file((src_dir, 'test_dir', 'test.txt'), '456')

        # a dangling symlink raises an error at the end
        with self.assertRaises(Error):
            shutil.copytree(src_dir, dst_dir)

        # a dangling symlink is ignored with the proper flag
        dst_dir = new_destination('destination2')
        shutil.copytree(src_dir, dst_dir, ignore_dangling_symlinks=True)
        self.assertNotIn('test.txt', os.listdir(dst_dir))

        # a dangling symlink is copied if symlinks=True
        dst_dir = new_destination('destination3')
        shutil.copytree(src_dir, dst_dir, symlinks=True)
        self.assertIn('test.txt', os.listdir(dst_dir))
    896 
    @support.skip_unless_symlink
    def test_copytree_symlink_dir(self):
        """A symlink to a directory is followed or kept, depending on *symlinks*."""
        src_dir = self.mkdtemp()
        dst_dir = os.path.join(self.mkdtemp(), 'destination')
        real_dir = os.path.join(src_dir, 'real_dir')
        os.mkdir(real_dir)
        # An empty file inside the real directory.
        open(os.path.join(real_dir, 'test.txt'), 'w').close()
        os.symlink(real_dir,
                   os.path.join(src_dir, 'link_to_dir'),
                   target_is_directory=True)

        # symlinks=False: the link becomes a real directory with the content
        shutil.copytree(src_dir, dst_dir, symlinks=False)
        copied = os.path.join(dst_dir, 'link_to_dir')
        self.assertFalse(os.path.islink(copied))
        self.assertIn('test.txt', os.listdir(copied))

        # symlinks=True: the link stays a link and still leads to the content
        dst_dir = os.path.join(self.mkdtemp(), 'destination2')
        shutil.copytree(src_dir, dst_dir, symlinks=True)
        copied = os.path.join(dst_dir, 'link_to_dir')
        self.assertTrue(os.path.islink(copied))
        self.assertIn('test.txt', os.listdir(copied))
    916 
    def _copy_file(self, method):
        """Copy a small file into a fresh directory using *method*.

        *method* is called as ``method(source_file, destination_dir)``.
        Returns the ``(source_file, expected_destination_file)`` paths.
        """
        fname = 'test.txt'
        src_dir = self.mkdtemp()
        write_file((src_dir, fname), 'xxx')
        source = os.path.join(src_dir, fname)
        dst_dir = self.mkdtemp()
        method(source, dst_dir)
        return (source, os.path.join(dst_dir, fname))
    926 
    @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod')
    def test_copy(self):
        """shutil.copy() creates the file and keeps the source's mode bits."""
        source, copied = self._copy_file(shutil.copy)
        self.assertTrue(os.path.exists(copied))
        source_mode = os.stat(source).st_mode
        copied_mode = os.stat(copied).st_mode
        self.assertEqual(source_mode, copied_mode)
    933 
    @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod')
    @unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime')
    def test_copy2(self):
        """shutil.copy2() keeps mode bits, timestamps and (if any) flags."""
        source, copied = self._copy_file(shutil.copy2)
        self.assertTrue(os.path.exists(copied))
        source_stat = os.stat(source)
        copied_stat = os.stat(copied)
        self.assertEqual(source_stat.st_mode, copied_stat.st_mode)
        # The times may be truncated in the new file, hence the one-second
        # tolerance.
        self.assertLessEqual(source_stat.st_atime, copied_stat.st_atime + 1)
        self.assertLessEqual(source_stat.st_mtime, copied_stat.st_mtime + 1)
        if hasattr(os, 'chflags') and hasattr(source_stat, 'st_flags'):
            self.assertEqual(source_stat.st_flags, copied_stat.st_flags)
    951 
    @support.requires_zlib
    def test_make_tarball(self):
        """make_archive() builds gztar and tar archives from a relative base name."""
        # creating something to tar
        root_dir, base_dir = self._create_files('')

        tmpdir2 = self.mkdtemp()
        # force shutil to create the directory
        os.rmdir(tmpdir2)
        # working with relative paths
        work_dir = os.path.dirname(tmpdir2)
        rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')

        expected_names = ['.', './sub', './sub2',
                          './file1', './file2', './sub/file3']
        # (archive format, expected file suffix, tarfile read mode):
        # first the compressed tarball, then the uncompressed one
        variants = (('gztar', '.tar.gz', 'r:gz'),
                    ('tar', '.tar', 'r'))
        for fmt, suffix, read_mode in variants:
            with support.change_cwd(work_dir):
                base_name = os.path.abspath(rel_base_name)
                tarball = make_archive(rel_base_name, fmt, root_dir, '.')

            # check that the tarball was created with the expected content
            self.assertEqual(tarball, base_name + suffix)
            self.assertTrue(os.path.isfile(tarball))
            self.assertTrue(tarfile.is_tarfile(tarball))
            with tarfile.open(tarball, read_mode) as tf:
                self.assertCountEqual(tf.getnames(), expected_names)
    987 
    def _tarinfo(self, path):
        """Return the member names of the tar archive at *path* as a sorted tuple."""
        with tarfile.open(path) as tar:
            return tuple(sorted(tar.getnames()))
    993 
    def _create_files(self, base_dir='dist'):
        """Create a small tree to archive and return ``(root_dir, base_dir)``.

        The tree lives under ``root_dir/base_dir`` and holds file1, file2,
        sub/file3 and an empty sub2 directory.  When *base_dir* is not
        empty, a file named 'outer' is also written directly in root_dir.
        """
        root_dir = self.mkdtemp()
        dist = os.path.join(root_dir, base_dir)
        os.makedirs(dist, exist_ok=True)
        for name in ('file1', 'file2'):
            write_file((dist, name), 'xxx')
        os.mkdir(os.path.join(dist, 'sub'))
        write_file((dist, 'sub', 'file3'), 'xxx')
        os.mkdir(os.path.join(dist, 'sub2'))
        if base_dir:
            write_file((root_dir, 'outer'), 'xxx')
        return root_dir, base_dir
   1007 
   1008     @support.requires_zlib
   1009     @unittest.skipUnless(shutil.which('tar'),
   1010                          'Need the tar command to run')
   1011     def test_tarfile_vs_tar(self):
   1012         root_dir, base_dir = self._create_files()
   1013         base_name = os.path.join(self.mkdtemp(), 'archive')
   1014         tarball = make_archive(base_name, 'gztar', root_dir, base_dir)
   1015 
   1016         # check if the compressed tarball was created
   1017         self.assertEqual(tarball, base_name + '.tar.gz')
   1018         self.assertTrue(os.path.isfile(tarball))
   1019 
   1020         # now create another tarball using `tar`
   1021         tarball2 = os.path.join(root_dir, 'archive2.tar')
   1022         tar_cmd = ['tar', '-cf', 'archive2.tar', base_dir]
   1023         subprocess.check_call(tar_cmd, cwd=root_dir,
   1024                               stdout=subprocess.DEVNULL)
   1025 
   1026         self.assertTrue(os.path.isfile(tarball2))
   1027         # let's compare both tarballs
   1028         self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))
   1029 
   1030         # trying an uncompressed one
   1031         tarball = make_archive(base_name, 'tar', root_dir, base_dir)
   1032         self.assertEqual(tarball, base_name + '.tar')
   1033         self.assertTrue(os.path.isfile(tarball))
   1034 
   1035         # now for a dry_run
   1036         tarball = make_archive(base_name, 'tar', root_dir, base_dir,
   1037                                dry_run=True)
   1038         self.assertEqual(tarball, base_name + '.tar')
   1039         self.assertTrue(os.path.isfile(tarball))
   1040 
   1041     @support.requires_zlib
   1042     def test_make_zipfile(self):
   1043         # creating something to zip
   1044         root_dir, base_dir = self._create_files()
   1045 
   1046         tmpdir2 = self.mkdtemp()
   1047         # force shutil to create the directory
   1048         os.rmdir(tmpdir2)
   1049         # working with relative paths
   1050         work_dir = os.path.dirname(tmpdir2)
   1051         rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
   1052 
   1053         with support.change_cwd(work_dir):
   1054             base_name = os.path.abspath(rel_base_name)
   1055             res = make_archive(rel_base_name, 'zip', root_dir)
   1056 
   1057         self.assertEqual(res, base_name + '.zip')
   1058         self.assertTrue(os.path.isfile(res))
   1059         self.assertTrue(zipfile.is_zipfile(res))
   1060         with zipfile.ZipFile(res) as zf:
   1061             self.assertCountEqual(zf.namelist(),
   1062                     ['dist/', 'dist/sub/', 'dist/sub2/',
   1063                      'dist/file1', 'dist/file2', 'dist/sub/file3',
   1064                      'outer'])
   1065 
   1066         with support.change_cwd(work_dir):
   1067             base_name = os.path.abspath(rel_base_name)
   1068             res = make_archive(rel_base_name, 'zip', root_dir, base_dir)
   1069 
   1070         self.assertEqual(res, base_name + '.zip')
   1071         self.assertTrue(os.path.isfile(res))
   1072         self.assertTrue(zipfile.is_zipfile(res))
   1073         with zipfile.ZipFile(res) as zf:
   1074             self.assertCountEqual(zf.namelist(),
   1075                     ['dist/', 'dist/sub/', 'dist/sub2/',
   1076                      'dist/file1', 'dist/file2', 'dist/sub/file3'])
   1077 
   1078     @support.requires_zlib
   1079     @unittest.skipUnless(shutil.which('zip'),
   1080                          'Need the zip command to run')
   1081     def test_zipfile_vs_zip(self):
   1082         root_dir, base_dir = self._create_files()
   1083         base_name = os.path.join(self.mkdtemp(), 'archive')
   1084         archive = make_archive(base_name, 'zip', root_dir, base_dir)
   1085 
   1086         # check if ZIP file  was created
   1087         self.assertEqual(archive, base_name + '.zip')
   1088         self.assertTrue(os.path.isfile(archive))
   1089 
   1090         # now create another ZIP file using `zip`
   1091         archive2 = os.path.join(root_dir, 'archive2.zip')
   1092         zip_cmd = ['zip', '-q', '-r', 'archive2.zip', base_dir]
   1093         subprocess.check_call(zip_cmd, cwd=root_dir,
   1094                               stdout=subprocess.DEVNULL)
   1095 
   1096         self.assertTrue(os.path.isfile(archive2))
   1097         # let's compare both ZIP files
   1098         with zipfile.ZipFile(archive) as zf:
   1099             names = zf.namelist()
   1100         with zipfile.ZipFile(archive2) as zf:
   1101             names2 = zf.namelist()
   1102         self.assertEqual(sorted(names), sorted(names2))
   1103 
   1104     @support.requires_zlib
   1105     @unittest.skipUnless(shutil.which('unzip'),
   1106                          'Need the unzip command to run')
   1107     def test_unzip_zipfile(self):
   1108         root_dir, base_dir = self._create_files()
   1109         base_name = os.path.join(self.mkdtemp(), 'archive')
   1110         archive = make_archive(base_name, 'zip', root_dir, base_dir)
   1111 
   1112         # check if ZIP file  was created
   1113         self.assertEqual(archive, base_name + '.zip')
   1114         self.assertTrue(os.path.isfile(archive))
   1115 
   1116         # now check the ZIP file using `unzip -t`
   1117         zip_cmd = ['unzip', '-t', archive]
   1118         with support.change_cwd(root_dir):
   1119             try:
   1120                 subprocess.check_output(zip_cmd, stderr=subprocess.STDOUT)
   1121             except subprocess.CalledProcessError as exc:
   1122                 details = exc.output.decode(errors="replace")
   1123                 msg = "{}\n\n**Unzip Output**\n{}"
   1124                 self.fail(msg.format(exc, details))
   1125 
   1126     def test_make_archive(self):
   1127         tmpdir = self.mkdtemp()
   1128         base_name = os.path.join(tmpdir, 'archive')
   1129         self.assertRaises(ValueError, make_archive, base_name, 'xxx')
   1130 
   1131     @support.requires_zlib
   1132     def test_make_archive_owner_group(self):
   1133         # testing make_archive with owner and group, with various combinations
   1134         # this works even if there's not gid/uid support
   1135         if UID_GID_SUPPORT:
   1136             group = grp.getgrgid(0)[0]
   1137             owner = pwd.getpwuid(0)[0]
   1138         else:
   1139             group = owner = 'root'
   1140 
   1141         root_dir, base_dir = self._create_files()
   1142         base_name = os.path.join(self.mkdtemp(), 'archive')
   1143         res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
   1144                            group=group)
   1145         self.assertTrue(os.path.isfile(res))
   1146 
   1147         res = make_archive(base_name, 'zip', root_dir, base_dir)
   1148         self.assertTrue(os.path.isfile(res))
   1149 
   1150         res = make_archive(base_name, 'tar', root_dir, base_dir,
   1151                            owner=owner, group=group)
   1152         self.assertTrue(os.path.isfile(res))
   1153 
   1154         res = make_archive(base_name, 'tar', root_dir, base_dir,
   1155                            owner='kjhkjhkjg', group='oihohoh')
   1156         self.assertTrue(os.path.isfile(res))
   1157 
   1158 
   1159     @support.requires_zlib
   1160     @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
   1161     def test_tarfile_root_owner(self):
   1162         root_dir, base_dir = self._create_files()
   1163         base_name = os.path.join(self.mkdtemp(), 'archive')
   1164         group = grp.getgrgid(0)[0]
   1165         owner = pwd.getpwuid(0)[0]
   1166         with support.change_cwd(root_dir):
   1167             archive_name = make_archive(base_name, 'gztar', root_dir, 'dist',
   1168                                         owner=owner, group=group)
   1169 
   1170         # check if the compressed tarball was created
   1171         self.assertTrue(os.path.isfile(archive_name))
   1172 
   1173         # now checks the rights
   1174         archive = tarfile.open(archive_name)
   1175         try:
   1176             for member in archive.getmembers():
   1177                 self.assertEqual(member.uid, 0)
   1178                 self.assertEqual(member.gid, 0)
   1179         finally:
   1180             archive.close()
   1181 
   1182     def test_make_archive_cwd(self):
   1183         current_dir = os.getcwd()
   1184         def _breaks(*args, **kw):
   1185             raise RuntimeError()
   1186 
   1187         register_archive_format('xxx', _breaks, [], 'xxx file')
   1188         try:
   1189             try:
   1190                 make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
   1191             except Exception:
   1192                 pass
   1193             self.assertEqual(os.getcwd(), current_dir)
   1194         finally:
   1195             unregister_archive_format('xxx')
   1196 
   1197     def test_make_tarfile_in_curdir(self):
   1198         # Issue #21280
   1199         root_dir = self.mkdtemp()
   1200         with support.change_cwd(root_dir):
   1201             self.assertEqual(make_archive('test', 'tar'), 'test.tar')
   1202             self.assertTrue(os.path.isfile('test.tar'))
   1203 
   1204     @support.requires_zlib
   1205     def test_make_zipfile_in_curdir(self):
   1206         # Issue #21280
   1207         root_dir = self.mkdtemp()
   1208         with support.change_cwd(root_dir):
   1209             self.assertEqual(make_archive('test', 'zip'), 'test.zip')
   1210             self.assertTrue(os.path.isfile('test.zip'))
   1211 
   1212     def test_register_archive_format(self):
   1213 
   1214         self.assertRaises(TypeError, register_archive_format, 'xxx', 1)
   1215         self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
   1216                           1)
   1217         self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
   1218                           [(1, 2), (1, 2, 3)])
   1219 
   1220         register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
   1221         formats = [name for name, params in get_archive_formats()]
   1222         self.assertIn('xxx', formats)
   1223 
   1224         unregister_archive_format('xxx')
   1225         formats = [name for name, params in get_archive_formats()]
   1226         self.assertNotIn('xxx', formats)
   1227 
   1228     def check_unpack_archive(self, format):
   1229         root_dir, base_dir = self._create_files()
   1230         expected = rlistdir(root_dir)
   1231         expected.remove('outer')
   1232 
   1233         base_name = os.path.join(self.mkdtemp(), 'archive')
   1234         filename = make_archive(base_name, format, root_dir, base_dir)
   1235 
   1236         # let's try to unpack it now
   1237         tmpdir2 = self.mkdtemp()
   1238         unpack_archive(filename, tmpdir2)
   1239         self.assertEqual(rlistdir(tmpdir2), expected)
   1240 
   1241         # and again, this time with the format specified
   1242         tmpdir3 = self.mkdtemp()
   1243         unpack_archive(filename, tmpdir3, format=format)
   1244         self.assertEqual(rlistdir(tmpdir3), expected)
   1245 
   1246         self.assertRaises(shutil.ReadError, unpack_archive, TESTFN)
   1247         self.assertRaises(ValueError, unpack_archive, TESTFN, format='xxx')
   1248 
   1249     def test_unpack_archive_tar(self):
   1250         self.check_unpack_archive('tar')
   1251 
   1252     @support.requires_zlib
   1253     def test_unpack_archive_gztar(self):
   1254         self.check_unpack_archive('gztar')
   1255 
   1256     @support.requires_bz2
   1257     def test_unpack_archive_bztar(self):
   1258         self.check_unpack_archive('bztar')
   1259 
   1260     @support.requires_lzma
   1261     def test_unpack_archive_xztar(self):
   1262         self.check_unpack_archive('xztar')
   1263 
   1264     @support.requires_zlib
   1265     def test_unpack_archive_zip(self):
   1266         self.check_unpack_archive('zip')
   1267 
   1268     def test_unpack_registry(self):
   1269 
   1270         formats = get_unpack_formats()
   1271 
   1272         def _boo(filename, extract_dir, extra):
   1273             self.assertEqual(extra, 1)
   1274             self.assertEqual(filename, 'stuff.boo')
   1275             self.assertEqual(extract_dir, 'xx')
   1276 
   1277         register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)])
   1278         unpack_archive('stuff.boo', 'xx')
   1279 
   1280         # trying to register a .boo unpacker again
   1281         self.assertRaises(RegistryError, register_unpack_format, 'Boo2',
   1282                           ['.boo'], _boo)
   1283 
   1284         # should work now
   1285         unregister_unpack_format('Boo')
   1286         register_unpack_format('Boo2', ['.boo'], _boo)
   1287         self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats())
   1288         self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats())
   1289 
   1290         # let's leave a clean state
   1291         unregister_unpack_format('Boo2')
   1292         self.assertEqual(get_unpack_formats(), formats)
   1293 
   1294     @unittest.skipUnless(hasattr(shutil, 'disk_usage'),
   1295                          "disk_usage not available on this platform")
   1296     def test_disk_usage(self):
   1297         usage = shutil.disk_usage(os.getcwd())
   1298         self.assertGreater(usage.total, 0)
   1299         self.assertGreater(usage.used, 0)
   1300         self.assertGreaterEqual(usage.free, 0)
   1301         self.assertGreaterEqual(usage.total, usage.used)
   1302         self.assertGreater(usage.total, usage.free)
   1303 
   1304     @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
   1305     @unittest.skipUnless(hasattr(os, 'chown'), 'requires os.chown')
   1306     def test_chown(self):
   1307 
   1308         # cleaned-up automatically by TestShutil.tearDown method
   1309         dirname = self.mkdtemp()
   1310         filename = tempfile.mktemp(dir=dirname)
   1311         write_file(filename, 'testing chown function')
   1312 
   1313         with self.assertRaises(ValueError):
   1314             shutil.chown(filename)
   1315 
   1316         with self.assertRaises(LookupError):
   1317             shutil.chown(filename, user='non-existing username')
   1318 
   1319         with self.assertRaises(LookupError):
   1320             shutil.chown(filename, group='non-existing groupname')
   1321 
   1322         with self.assertRaises(TypeError):
   1323             shutil.chown(filename, b'spam')
   1324 
   1325         with self.assertRaises(TypeError):
   1326             shutil.chown(filename, 3.14)
   1327 
   1328         uid = os.getuid()
   1329         gid = os.getgid()
   1330 
   1331         def check_chown(path, uid=None, gid=None):
   1332             s = os.stat(filename)
   1333             if uid is not None:
   1334                 self.assertEqual(uid, s.st_uid)
   1335             if gid is not None:
   1336                 self.assertEqual(gid, s.st_gid)
   1337 
   1338         shutil.chown(filename, uid, gid)
   1339         check_chown(filename, uid, gid)
   1340         shutil.chown(filename, uid)
   1341         check_chown(filename, uid)
   1342         shutil.chown(filename, user=uid)
   1343         check_chown(filename, uid)
   1344         shutil.chown(filename, group=gid)
   1345         check_chown(filename, gid=gid)
   1346 
   1347         shutil.chown(dirname, uid, gid)
   1348         check_chown(dirname, uid, gid)
   1349         shutil.chown(dirname, uid)
   1350         check_chown(dirname, uid)
   1351         shutil.chown(dirname, user=uid)
   1352         check_chown(dirname, uid)
   1353         shutil.chown(dirname, group=gid)
   1354         check_chown(dirname, gid=gid)
   1355 
   1356         user = pwd.getpwuid(uid)[0]
   1357         group = grp.getgrgid(gid)[0]
   1358         shutil.chown(filename, user, group)
   1359         check_chown(filename, uid, gid)
   1360         shutil.chown(dirname, user, group)
   1361         check_chown(dirname, uid, gid)
   1362 
   1363     def test_copy_return_value(self):
   1364         # copy and copy2 both return their destination path.
   1365         for fn in (shutil.copy, shutil.copy2):
   1366             src_dir = self.mkdtemp()
   1367             dst_dir = self.mkdtemp()
   1368             src = os.path.join(src_dir, 'foo')
   1369             write_file(src, 'foo')
   1370             rv = fn(src, dst_dir)
   1371             self.assertEqual(rv, os.path.join(dst_dir, 'foo'))
   1372             rv = fn(src, os.path.join(dst_dir, 'bar'))
   1373             self.assertEqual(rv, os.path.join(dst_dir, 'bar'))
   1374 
   1375     def test_copyfile_return_value(self):
   1376         # copytree returns its destination path.
   1377         src_dir = self.mkdtemp()
   1378         dst_dir = self.mkdtemp()
   1379         dst_file = os.path.join(dst_dir, 'bar')
   1380         src_file = os.path.join(src_dir, 'foo')
   1381         write_file(src_file, 'foo')
   1382         rv = shutil.copyfile(src_file, dst_file)
   1383         self.assertTrue(os.path.exists(rv))
   1384         self.assertEqual(read_file(src_file), read_file(dst_file))
   1385 
   1386     def test_copyfile_same_file(self):
   1387         # copyfile() should raise SameFileError if the source and destination
   1388         # are the same.
   1389         src_dir = self.mkdtemp()
   1390         src_file = os.path.join(src_dir, 'foo')
   1391         write_file(src_file, 'foo')
   1392         self.assertRaises(SameFileError, shutil.copyfile, src_file, src_file)
   1393         # But Error should work too, to stay backward compatible.
   1394         self.assertRaises(Error, shutil.copyfile, src_file, src_file)
   1395 
   1396     def test_copytree_return_value(self):
   1397         # copytree returns its destination path.
   1398         src_dir = self.mkdtemp()
   1399         dst_dir = src_dir + "dest"
   1400         self.addCleanup(shutil.rmtree, dst_dir, True)
   1401         src = os.path.join(src_dir, 'foo')
   1402         write_file(src, 'foo')
   1403         rv = shutil.copytree(src_dir, dst_dir)
   1404         self.assertEqual(['foo'], os.listdir(rv))
   1405 
   1406 
   1407 class TestWhich(unittest.TestCase):
   1408 
   1409     def setUp(self):
   1410         self.temp_dir = tempfile.mkdtemp(prefix="Tmp")
   1411         self.addCleanup(shutil.rmtree, self.temp_dir, True)
   1412         # Give the temp_file an ".exe" suffix for all.
   1413         # It's needed on Windows and not harmful on other platforms.
   1414         self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir,
   1415                                                      prefix="Tmp",
   1416                                                      suffix=".Exe")
   1417         os.chmod(self.temp_file.name, stat.S_IXUSR)
   1418         self.addCleanup(self.temp_file.close)
   1419         self.dir, self.file = os.path.split(self.temp_file.name)
   1420 
   1421     def test_basic(self):
   1422         # Given an EXE in a directory, it should be returned.
   1423         rv = shutil.which(self.file, path=self.dir)
   1424         self.assertEqual(rv, self.temp_file.name)
   1425 
   1426     def test_absolute_cmd(self):
   1427         # When given the fully qualified path to an executable that exists,
   1428         # it should be returned.
   1429         rv = shutil.which(self.temp_file.name, path=self.temp_dir)
   1430         self.assertEqual(rv, self.temp_file.name)
   1431 
   1432     def test_relative_cmd(self):
   1433         # When given the relative path with a directory part to an executable
   1434         # that exists, it should be returned.
   1435         base_dir, tail_dir = os.path.split(self.dir)
   1436         relpath = os.path.join(tail_dir, self.file)
   1437         with support.change_cwd(path=base_dir):
   1438             rv = shutil.which(relpath, path=self.temp_dir)
   1439             self.assertEqual(rv, relpath)
   1440         # But it shouldn't be searched in PATH directories (issue #16957).
   1441         with support.change_cwd(path=self.dir):
   1442             rv = shutil.which(relpath, path=base_dir)
   1443             self.assertIsNone(rv)
   1444 
   1445     def test_cwd(self):
   1446         # Issue #16957
   1447         base_dir = os.path.dirname(self.dir)
   1448         with support.change_cwd(path=self.dir):
   1449             rv = shutil.which(self.file, path=base_dir)
   1450             if sys.platform == "win32":
   1451                 # Windows: current directory implicitly on PATH
   1452                 self.assertEqual(rv, os.path.join(os.curdir, self.file))
   1453             else:
   1454                 # Other platforms: shouldn't match in the current directory.
   1455                 self.assertIsNone(rv)
   1456 
   1457     @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
   1458                      'non-root user required')
   1459     def test_non_matching_mode(self):
   1460         # Set the file read-only and ask for writeable files.
   1461         os.chmod(self.temp_file.name, stat.S_IREAD)
   1462         if os.access(self.temp_file.name, os.W_OK):
   1463             self.skipTest("can't set the file read-only")
   1464         rv = shutil.which(self.file, path=self.dir, mode=os.W_OK)
   1465         self.assertIsNone(rv)
   1466 
   1467     def test_relative_path(self):
   1468         base_dir, tail_dir = os.path.split(self.dir)
   1469         with support.change_cwd(path=base_dir):
   1470             rv = shutil.which(self.file, path=tail_dir)
   1471             self.assertEqual(rv, os.path.join(tail_dir, self.file))
   1472 
   1473     def test_nonexistent_file(self):
   1474         # Return None when no matching executable file is found on the path.
   1475         rv = shutil.which("foo.exe", path=self.dir)
   1476         self.assertIsNone(rv)
   1477 
   1478     @unittest.skipUnless(sys.platform == "win32",
   1479                          "pathext check is Windows-only")
   1480     def test_pathext_checking(self):
   1481         # Ask for the file without the ".exe" extension, then ensure that
   1482         # it gets found properly with the extension.
   1483         rv = shutil.which(self.file[:-4], path=self.dir)
   1484         self.assertEqual(rv, self.temp_file.name[:-4] + ".EXE")
   1485 
   1486     def test_environ_path(self):
   1487         with support.EnvironmentVarGuard() as env:
   1488             env['PATH'] = self.dir
   1489             rv = shutil.which(self.file)
   1490             self.assertEqual(rv, self.temp_file.name)
   1491 
   1492     def test_empty_path(self):
   1493         base_dir = os.path.dirname(self.dir)
   1494         with support.change_cwd(path=self.dir), \
   1495              support.EnvironmentVarGuard() as env:
   1496             env['PATH'] = self.dir
   1497             rv = shutil.which(self.file, path='')
   1498             self.assertIsNone(rv)
   1499 
   1500     def test_empty_path_no_PATH(self):
   1501         with support.EnvironmentVarGuard() as env:
   1502             env.pop('PATH', None)
   1503             rv = shutil.which(self.file)
   1504             self.assertIsNone(rv)
   1505 
   1506 
   1507 class TestMove(unittest.TestCase):
   1508 
   1509     def setUp(self):
   1510         filename = "foo"
   1511         self.src_dir = tempfile.mkdtemp()
   1512         self.dst_dir = tempfile.mkdtemp()
   1513         self.src_file = os.path.join(self.src_dir, filename)
   1514         self.dst_file = os.path.join(self.dst_dir, filename)
   1515         with open(self.src_file, "wb") as f:
   1516             f.write(b"spam")
   1517 
   1518     def tearDown(self):
   1519         for d in (self.src_dir, self.dst_dir):
   1520             try:
   1521                 if d:
   1522                     shutil.rmtree(d)
   1523             except:
   1524                 pass
   1525 
   1526     def _check_move_file(self, src, dst, real_dst):
   1527         with open(src, "rb") as f:
   1528             contents = f.read()
   1529         shutil.move(src, dst)
   1530         with open(real_dst, "rb") as f:
   1531             self.assertEqual(contents, f.read())
   1532         self.assertFalse(os.path.exists(src))
   1533 
   1534     def _check_move_dir(self, src, dst, real_dst):
   1535         contents = sorted(os.listdir(src))
   1536         shutil.move(src, dst)
   1537         self.assertEqual(contents, sorted(os.listdir(real_dst)))
   1538         self.assertFalse(os.path.exists(src))
   1539 
   1540     def test_move_file(self):
   1541         # Move a file to another location on the same filesystem.
   1542         self._check_move_file(self.src_file, self.dst_file, self.dst_file)
   1543 
   1544     def test_move_file_to_dir(self):
   1545         # Move a file inside an existing dir on the same filesystem.
   1546         self._check_move_file(self.src_file, self.dst_dir, self.dst_file)
   1547 
   1548     @mock_rename
   1549     def test_move_file_other_fs(self):
   1550         # Move a file to an existing dir on another filesystem.
   1551         self.test_move_file()
   1552 
   1553     @mock_rename
   1554     def test_move_file_to_dir_other_fs(self):
   1555         # Move a file to another location on another filesystem.
   1556         self.test_move_file_to_dir()
   1557 
   1558     def test_move_dir(self):
   1559         # Move a dir to another location on the same filesystem.
   1560         dst_dir = tempfile.mktemp()
   1561         try:
   1562             self._check_move_dir(self.src_dir, dst_dir, dst_dir)
   1563         finally:
   1564             try:
   1565                 shutil.rmtree(dst_dir)
   1566             except:
   1567                 pass
   1568 
   1569     @mock_rename
   1570     def test_move_dir_other_fs(self):
   1571         # Move a dir to another location on another filesystem.
   1572         self.test_move_dir()
   1573 
   1574     def test_move_dir_to_dir(self):
   1575         # Move a dir inside an existing dir on the same filesystem.
   1576         self._check_move_dir(self.src_dir, self.dst_dir,
   1577             os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
   1578 
   1579     @mock_rename
   1580     def test_move_dir_to_dir_other_fs(self):
   1581         # Move a dir inside an existing dir on another filesystem.
   1582         self.test_move_dir_to_dir()
   1583 
   1584     def test_move_dir_sep_to_dir(self):
   1585         self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir,
   1586             os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
   1587 
   1588     @unittest.skipUnless(os.path.altsep, 'requires os.path.altsep')
   1589     def test_move_dir_altsep_to_dir(self):
   1590         self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir,
   1591             os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
   1592 
   1593     def test_existing_file_inside_dest_dir(self):
   1594         # A file with the same name inside the destination dir already exists.
   1595         with open(self.dst_file, "wb"):
   1596             pass
   1597         self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir)
   1598 
   1599     def test_dont_move_dir_in_itself(self):
   1600         # Moving a dir inside itself raises an Error.
   1601         dst = os.path.join(self.src_dir, "bar")
   1602         self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst)
   1603 
   1604     def test_destinsrc_false_negative(self):
   1605         os.mkdir(TESTFN)
   1606         try:
   1607             for src, dst in [('srcdir', 'srcdir/dest')]:
   1608                 src = os.path.join(TESTFN, src)
   1609                 dst = os.path.join(TESTFN, dst)
   1610                 self.assertTrue(shutil._destinsrc(src, dst),
   1611                              msg='_destinsrc() wrongly concluded that '
   1612                              'dst (%s) is not in src (%s)' % (dst, src))
   1613         finally:
   1614             shutil.rmtree(TESTFN, ignore_errors=True)
   1615 
   1616     def test_destinsrc_false_positive(self):
   1617         os.mkdir(TESTFN)
   1618         try:
   1619             for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]:
   1620                 src = os.path.join(TESTFN, src)
   1621                 dst = os.path.join(TESTFN, dst)
   1622                 self.assertFalse(shutil._destinsrc(src, dst),
   1623                             msg='_destinsrc() wrongly concluded that '
   1624                             'dst (%s) is in src (%s)' % (dst, src))
   1625         finally:
   1626             shutil.rmtree(TESTFN, ignore_errors=True)
   1627 
   1628     @support.skip_unless_symlink
   1629     @mock_rename
   1630     def test_move_file_symlink(self):
   1631         dst = os.path.join(self.src_dir, 'bar')
   1632         os.symlink(self.src_file, dst)
   1633         shutil.move(dst, self.dst_file)
   1634         self.assertTrue(os.path.islink(self.dst_file))
   1635         self.assertTrue(os.path.samefile(self.src_file, self.dst_file))
   1636 
   1637     @support.skip_unless_symlink
   1638     @mock_rename
   1639     def test_move_file_symlink_to_dir(self):
   1640         filename = "bar"
   1641         dst = os.path.join(self.src_dir, filename)
   1642         os.symlink(self.src_file, dst)
   1643         shutil.move(dst, self.dst_dir)
   1644         final_link = os.path.join(self.dst_dir, filename)
   1645         self.assertTrue(os.path.islink(final_link))
   1646         self.assertTrue(os.path.samefile(self.src_file, final_link))
   1647 
   1648     @support.skip_unless_symlink
   1649     @mock_rename
   1650     def test_move_dangling_symlink(self):
   1651         src = os.path.join(self.src_dir, 'baz')
   1652         dst = os.path.join(self.src_dir, 'bar')
   1653         os.symlink(src, dst)
   1654         dst_link = os.path.join(self.dst_dir, 'quux')
   1655         shutil.move(dst, dst_link)
   1656         self.assertTrue(os.path.islink(dst_link))
   1657         # On Windows, os.path.realpath does not follow symlinks (issue #9949)
   1658         if os.name == 'nt':
   1659             self.assertEqual(os.path.realpath(src), os.readlink(dst_link))
   1660         else:
   1661             self.assertEqual(os.path.realpath(src), os.path.realpath(dst_link))
   1662 
   1663     @support.skip_unless_symlink
   1664     @mock_rename
   1665     def test_move_dir_symlink(self):
   1666         src = os.path.join(self.src_dir, 'baz')
   1667         dst = os.path.join(self.src_dir, 'bar')
   1668         os.mkdir(src)
   1669         os.symlink(src, dst)
   1670         dst_link = os.path.join(self.dst_dir, 'quux')
   1671         shutil.move(dst, dst_link)
   1672         self.assertTrue(os.path.islink(dst_link))
   1673         self.assertTrue(os.path.samefile(src, dst_link))
   1674 
   1675     def test_move_return_value(self):
   1676         rv = shutil.move(self.src_file, self.dst_dir)
   1677         self.assertEqual(rv,
   1678                 os.path.join(self.dst_dir, os.path.basename(self.src_file)))
   1679 
   1680     def test_move_as_rename_return_value(self):
   1681         rv = shutil.move(self.src_file, os.path.join(self.dst_dir, 'bar'))
   1682         self.assertEqual(rv, os.path.join(self.dst_dir, 'bar'))
   1683 
   1684     @mock_rename
   1685     def test_move_file_special_function(self):
   1686         moved = []
   1687         def _copy(src, dst):
   1688             moved.append((src, dst))
   1689         shutil.move(self.src_file, self.dst_dir, copy_function=_copy)
   1690         self.assertEqual(len(moved), 1)
   1691 
   1692     @mock_rename
   1693     def test_move_dir_special_function(self):
   1694         moved = []
   1695         def _copy(src, dst):
   1696             moved.append((src, dst))
   1697         support.create_empty_file(os.path.join(self.src_dir, 'child'))
   1698         support.create_empty_file(os.path.join(self.src_dir, 'child1'))
   1699         shutil.move(self.src_dir, self.dst_dir, copy_function=_copy)
   1700         self.assertEqual(len(moved), 3)
   1701 
   1702 
   1703 class TestCopyFile(unittest.TestCase):
   1704 
   1705     _delete = False
   1706 
   1707     class Faux(object):
   1708         _entered = False
   1709         _exited_with = None
   1710         _raised = False
   1711         def __init__(self, raise_in_exit=False, suppress_at_exit=True):
   1712             self._raise_in_exit = raise_in_exit
   1713             self._suppress_at_exit = suppress_at_exit
   1714         def read(self, *args):
   1715             return ''
   1716         def __enter__(self):
   1717             self._entered = True
   1718         def __exit__(self, exc_type, exc_val, exc_tb):
   1719             self._exited_with = exc_type, exc_val, exc_tb
   1720             if self._raise_in_exit:
   1721                 self._raised = True
   1722                 raise OSError("Cannot close")
   1723             return self._suppress_at_exit
   1724 
   1725     def tearDown(self):
   1726         if self._delete:
   1727             del shutil.open
   1728 
   1729     def _set_shutil_open(self, func):
   1730         shutil.open = func
   1731         self._delete = True
   1732 
   1733     def test_w_source_open_fails(self):
   1734         def _open(filename, mode='r'):
   1735             if filename == 'srcfile':
   1736                 raise OSError('Cannot open "srcfile"')
   1737             assert 0  # shouldn't reach here.
   1738 
   1739         self._set_shutil_open(_open)
   1740 
   1741         self.assertRaises(OSError, shutil.copyfile, 'srcfile', 'destfile')
   1742 
   1743     def test_w_dest_open_fails(self):
   1744 
   1745         srcfile = self.Faux()
   1746 
   1747         def _open(filename, mode='r'):
   1748             if filename == 'srcfile':
   1749                 return srcfile
   1750             if filename == 'destfile':
   1751                 raise OSError('Cannot open "destfile"')
   1752             assert 0  # shouldn't reach here.
   1753 
   1754         self._set_shutil_open(_open)
   1755 
   1756         shutil.copyfile('srcfile', 'destfile')
   1757         self.assertTrue(srcfile._entered)
   1758         self.assertTrue(srcfile._exited_with[0] is OSError)
   1759         self.assertEqual(srcfile._exited_with[1].args,
   1760                          ('Cannot open "destfile"',))
   1761 
   1762     def test_w_dest_close_fails(self):
   1763 
   1764         srcfile = self.Faux()
   1765         destfile = self.Faux(True)
   1766 
   1767         def _open(filename, mode='r'):
   1768             if filename == 'srcfile':
   1769                 return srcfile
   1770             if filename == 'destfile':
   1771                 return destfile
   1772             assert 0  # shouldn't reach here.
   1773 
   1774         self._set_shutil_open(_open)
   1775 
   1776         shutil.copyfile('srcfile', 'destfile')
   1777         self.assertTrue(srcfile._entered)
   1778         self.assertTrue(destfile._entered)
   1779         self.assertTrue(destfile._raised)
   1780         self.assertTrue(srcfile._exited_with[0] is OSError)
   1781         self.assertEqual(srcfile._exited_with[1].args,
   1782                          ('Cannot close',))
   1783 
   1784     def test_w_source_close_fails(self):
   1785 
   1786         srcfile = self.Faux(True)
   1787         destfile = self.Faux()
   1788 
   1789         def _open(filename, mode='r'):
   1790             if filename == 'srcfile':
   1791                 return srcfile
   1792             if filename == 'destfile':
   1793                 return destfile
   1794             assert 0  # shouldn't reach here.
   1795 
   1796         self._set_shutil_open(_open)
   1797 
   1798         self.assertRaises(OSError,
   1799                           shutil.copyfile, 'srcfile', 'destfile')
   1800         self.assertTrue(srcfile._entered)
   1801         self.assertTrue(destfile._entered)
   1802         self.assertFalse(destfile._raised)
   1803         self.assertTrue(srcfile._exited_with[0] is None)
   1804         self.assertTrue(srcfile._raised)
   1805 
   1806     def test_move_dir_caseinsensitive(self):
   1807         # Renames a folder to the same name
   1808         # but a different case.
   1809 
   1810         self.src_dir = tempfile.mkdtemp()
   1811         self.addCleanup(shutil.rmtree, self.src_dir, True)
   1812         dst_dir = os.path.join(
   1813                 os.path.dirname(self.src_dir),
   1814                 os.path.basename(self.src_dir).upper())
   1815         self.assertNotEqual(self.src_dir, dst_dir)
   1816 
   1817         try:
   1818             shutil.move(self.src_dir, dst_dir)
   1819             self.assertTrue(os.path.isdir(dst_dir))
   1820         finally:
   1821             os.rmdir(dst_dir)
   1822 
   1823 class TermsizeTests(unittest.TestCase):
   1824     def test_does_not_crash(self):
   1825         """Check if get_terminal_size() returns a meaningful value.
   1826 
   1827         There's no easy portable way to actually check the size of the
   1828         terminal, so let's check if it returns something sensible instead.
   1829         """
   1830         size = shutil.get_terminal_size()
   1831         self.assertGreaterEqual(size.columns, 0)
   1832         self.assertGreaterEqual(size.lines, 0)
   1833 
   1834     def test_os_environ_first(self):
   1835         "Check if environment variables have precedence"
   1836 
   1837         with support.EnvironmentVarGuard() as env:
   1838             env['COLUMNS'] = '777'
   1839             del env['LINES']
   1840             size = shutil.get_terminal_size()
   1841         self.assertEqual(size.columns, 777)
   1842 
   1843         with support.EnvironmentVarGuard() as env:
   1844             del env['COLUMNS']
   1845             env['LINES'] = '888'
   1846             size = shutil.get_terminal_size()
   1847         self.assertEqual(size.lines, 888)
   1848 
   1849     def test_bad_environ(self):
   1850         with support.EnvironmentVarGuard() as env:
   1851             env['COLUMNS'] = 'xxx'
   1852             env['LINES'] = 'yyy'
   1853             size = shutil.get_terminal_size()
   1854         self.assertGreaterEqual(size.columns, 0)
   1855         self.assertGreaterEqual(size.lines, 0)
   1856 
   1857     @unittest.skipUnless(os.isatty(sys.__stdout__.fileno()), "not on tty")
   1858     @unittest.skipUnless(hasattr(os, 'get_terminal_size'),
   1859                          'need os.get_terminal_size()')
   1860     def test_stty_match(self):
   1861         """Check if stty returns the same results ignoring env
   1862 
   1863         This test will fail if stdin and stdout are connected to
   1864         different terminals with different sizes. Nevertheless, such
   1865         situations should be pretty rare.
   1866         """
   1867         try:
   1868             size = subprocess.check_output(['stty', 'size']).decode().split()
   1869         except (FileNotFoundError, PermissionError,
   1870                 subprocess.CalledProcessError):
   1871             self.skipTest("stty invocation failed")
   1872         expected = (int(size[1]), int(size[0])) # reversed order
   1873 
   1874         with support.EnvironmentVarGuard() as env:
   1875             del env['LINES']
   1876             del env['COLUMNS']
   1877             actual = shutil.get_terminal_size()
   1878 
   1879         self.assertEqual(expected, actual)
   1880 
   1881     def test_fallback(self):
   1882         with support.EnvironmentVarGuard() as env:
   1883             del env['LINES']
   1884             del env['COLUMNS']
   1885 
   1886             # sys.__stdout__ has no fileno()
   1887             with support.swap_attr(sys, '__stdout__', None):
   1888                 size = shutil.get_terminal_size(fallback=(10, 20))
   1889             self.assertEqual(size.columns, 10)
   1890             self.assertEqual(size.lines, 20)
   1891 
   1892             # sys.__stdout__ is not a terminal on Unix
   1893             # or fileno() not in (0, 1, 2) on Windows
   1894             with open(os.devnull, 'w') as f, \
   1895                  support.swap_attr(sys, '__stdout__', f):
   1896                 size = shutil.get_terminal_size(fallback=(30, 40))
   1897             self.assertEqual(size.columns, 30)
   1898             self.assertEqual(size.lines, 40)
   1899 
   1900 
   1901 class PublicAPITests(unittest.TestCase):
   1902     """Ensures that the correct values are exposed in the public API."""
   1903 
   1904     def test_module_all_attribute(self):
   1905         self.assertTrue(hasattr(shutil, '__all__'))
   1906         target_api = ['copyfileobj', 'copyfile', 'copymode', 'copystat',
   1907                       'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error',
   1908                       'SpecialFileError', 'ExecError', 'make_archive',
   1909                       'get_archive_formats', 'register_archive_format',
   1910                       'unregister_archive_format', 'get_unpack_formats',
   1911                       'register_unpack_format', 'unregister_unpack_format',
   1912                       'unpack_archive', 'ignore_patterns', 'chown', 'which',
   1913                       'get_terminal_size', 'SameFileError']
   1914         if hasattr(os, 'statvfs') or os.name == 'nt':
   1915             target_api.append('disk_usage')
   1916         self.assertEqual(set(shutil.__all__), set(target_api))
   1917 
   1918 
# Run every test in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
   1921