Home | History | Annotate | Download | only in test
      1 # Copyright (C) 2003 Python Software Foundation
      2 
      3 import unittest
      4 import shutil
      5 import tempfile
      6 import sys
      7 import stat
      8 import os
      9 import os.path
     10 import errno
     11 from os.path import splitdrive
     12 from distutils.spawn import find_executable, spawn
     13 from shutil import (_make_tarball, _make_zipfile, make_archive,
     14                     register_archive_format, unregister_archive_format,
     15                     get_archive_formats)
     16 import tarfile
     17 import warnings
     18 
     19 from test import test_support
     20 from test.test_support import TESTFN, check_warnings, captured_stdout
     21 
     22 TESTFN2 = TESTFN + "2"
     23 
     24 try:
     25     import grp
     26     import pwd
     27     UID_GID_SUPPORT = True
     28 except ImportError:
     29     UID_GID_SUPPORT = False
     30 
     31 try:
     32     import zlib
     33 except ImportError:
     34     zlib = None
     35 
     36 try:
     37     import zipfile
     38     ZIP_SUPPORT = True
     39 except ImportError:
     40     ZIP_SUPPORT = find_executable('zip')
     41 
     42 class TestShutil(unittest.TestCase):
     43 
     44     def setUp(self):
     45         super(TestShutil, self).setUp()
     46         self.tempdirs = []
     47 
     48     def tearDown(self):
     49         super(TestShutil, self).tearDown()
     50         while self.tempdirs:
     51             d = self.tempdirs.pop()
     52             shutil.rmtree(d, os.name in ('nt', 'cygwin'))
     53 
     54     def write_file(self, path, content='xxx'):
     55         """Writes a file in the given path.
     56 
     57 
     58         path can be a string or a sequence.
     59         """
     60         if isinstance(path, (list, tuple)):
     61             path = os.path.join(*path)
     62         f = open(path, 'w')
     63         try:
     64             f.write(content)
     65         finally:
     66             f.close()
     67 
     68     def mkdtemp(self):
     69         """Create a temporary directory that will be cleaned up.
     70 
     71         Returns the path of the directory.
     72         """
     73         d = tempfile.mkdtemp()
     74         self.tempdirs.append(d)
     75         return d
     76     def test_rmtree_errors(self):
     77         # filename is guaranteed not to exist
     78         filename = tempfile.mktemp()
     79         self.assertRaises(OSError, shutil.rmtree, filename)
     80 
     81     # See bug #1071513 for why we don't run this on cygwin
     82     # and bug #1076467 for why we don't run this as root.
     83     if (hasattr(os, 'chmod') and sys.platform[:6] != 'cygwin'
     84         and not (hasattr(os, 'geteuid') and os.geteuid() == 0)):
     85         def test_on_error(self):
     86             self.errorState = 0
     87             os.mkdir(TESTFN)
     88             self.childpath = os.path.join(TESTFN, 'a')
     89             f = open(self.childpath, 'w')
     90             f.close()
     91             old_dir_mode = os.stat(TESTFN).st_mode
     92             old_child_mode = os.stat(self.childpath).st_mode
     93             # Make unwritable.
     94             os.chmod(self.childpath, stat.S_IREAD)
     95             os.chmod(TESTFN, stat.S_IREAD)
     96 
     97             shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
     98             # Test whether onerror has actually been called.
     99             self.assertEqual(self.errorState, 2,
    100                              "Expected call to onerror function did not happen.")
    101 
    102             # Make writable again.
    103             os.chmod(TESTFN, old_dir_mode)
    104             os.chmod(self.childpath, old_child_mode)
    105 
    106             # Clean up.
    107             shutil.rmtree(TESTFN)
    108 
    109     def check_args_to_onerror(self, func, arg, exc):
    110         # test_rmtree_errors deliberately runs rmtree
    111         # on a directory that is chmod 400, which will fail.
    112         # This function is run when shutil.rmtree fails.
    113         # 99.9% of the time it initially fails to remove
    114         # a file in the directory, so the first time through
    115         # func is os.remove.
    116         # However, some Linux machines running ZFS on
    117         # FUSE experienced a failure earlier in the process
    118         # at os.listdir.  The first failure may legally
    119         # be either.
    120         if self.errorState == 0:
    121             if func is os.remove:
    122                 self.assertEqual(arg, self.childpath)
    123             else:
    124                 self.assertIs(func, os.listdir,
    125                               "func must be either os.remove or os.listdir")
    126                 self.assertEqual(arg, TESTFN)
    127             self.assertTrue(issubclass(exc[0], OSError))
    128             self.errorState = 1
    129         else:
    130             self.assertEqual(func, os.rmdir)
    131             self.assertEqual(arg, TESTFN)
    132             self.assertTrue(issubclass(exc[0], OSError))
    133             self.errorState = 2
    134 
    135     def test_rmtree_dont_delete_file(self):
    136         # When called on a file instead of a directory, don't delete it.
    137         handle, path = tempfile.mkstemp()
    138         os.fdopen(handle).close()
    139         self.assertRaises(OSError, shutil.rmtree, path)
    140         os.remove(path)
    141 
    142     def test_copytree_simple(self):
    143         def write_data(path, data):
    144             f = open(path, "w")
    145             f.write(data)
    146             f.close()
    147 
    148         def read_data(path):
    149             f = open(path)
    150             data = f.read()
    151             f.close()
    152             return data
    153 
    154         src_dir = tempfile.mkdtemp()
    155         dst_dir = os.path.join(tempfile.mkdtemp(), 'destination')
    156 
    157         write_data(os.path.join(src_dir, 'test.txt'), '123')
    158 
    159         os.mkdir(os.path.join(src_dir, 'test_dir'))
    160         write_data(os.path.join(src_dir, 'test_dir', 'test.txt'), '456')
    161 
    162         try:
    163             shutil.copytree(src_dir, dst_dir)
    164             self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt')))
    165             self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir')))
    166             self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir',
    167                                                         'test.txt')))
    168             actual = read_data(os.path.join(dst_dir, 'test.txt'))
    169             self.assertEqual(actual, '123')
    170             actual = read_data(os.path.join(dst_dir, 'test_dir', 'test.txt'))
    171             self.assertEqual(actual, '456')
    172         finally:
    173             for path in (
    174                     os.path.join(src_dir, 'test.txt'),
    175                     os.path.join(dst_dir, 'test.txt'),
    176                     os.path.join(src_dir, 'test_dir', 'test.txt'),
    177                     os.path.join(dst_dir, 'test_dir', 'test.txt'),
    178                 ):
    179                 if os.path.exists(path):
    180                     os.remove(path)
    181             for path in (src_dir,
    182                     os.path.dirname(dst_dir)
    183                 ):
    184                 if os.path.exists(path):
    185                     shutil.rmtree(path)
    186 
    187     def test_copytree_with_exclude(self):
    188 
    189         def write_data(path, data):
    190             f = open(path, "w")
    191             f.write(data)
    192             f.close()
    193 
    194         def read_data(path):
    195             f = open(path)
    196             data = f.read()
    197             f.close()
    198             return data
    199 
    200         # creating data
    201         join = os.path.join
    202         exists = os.path.exists
    203         src_dir = tempfile.mkdtemp()
    204         try:
    205             dst_dir = join(tempfile.mkdtemp(), 'destination')
    206             write_data(join(src_dir, 'test.txt'), '123')
    207             write_data(join(src_dir, 'test.tmp'), '123')
    208             os.mkdir(join(src_dir, 'test_dir'))
    209             write_data(join(src_dir, 'test_dir', 'test.txt'), '456')
    210             os.mkdir(join(src_dir, 'test_dir2'))
    211             write_data(join(src_dir, 'test_dir2', 'test.txt'), '456')
    212             os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
    213             os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
    214             write_data(join(src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
    215             write_data(join(src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')
    216 
    217 
    218             # testing glob-like patterns
    219             try:
    220                 patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
    221                 shutil.copytree(src_dir, dst_dir, ignore=patterns)
    222                 # checking the result: some elements should not be copied
    223                 self.assertTrue(exists(join(dst_dir, 'test.txt')))
    224                 self.assertTrue(not exists(join(dst_dir, 'test.tmp')))
    225                 self.assertTrue(not exists(join(dst_dir, 'test_dir2')))
    226             finally:
    227                 if os.path.exists(dst_dir):
    228                     shutil.rmtree(dst_dir)
    229             try:
    230                 patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
    231                 shutil.copytree(src_dir, dst_dir, ignore=patterns)
    232                 # checking the result: some elements should not be copied
    233                 self.assertTrue(not exists(join(dst_dir, 'test.tmp')))
    234                 self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir2')))
    235                 self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir')))
    236             finally:
    237                 if os.path.exists(dst_dir):
    238                     shutil.rmtree(dst_dir)
    239 
    240             # testing callable-style
    241             try:
    242                 def _filter(src, names):
    243                     res = []
    244                     for name in names:
    245                         path = os.path.join(src, name)
    246 
    247                         if (os.path.isdir(path) and
    248                             path.split()[-1] == 'subdir'):
    249                             res.append(name)
    250                         elif os.path.splitext(path)[-1] in ('.py'):
    251                             res.append(name)
    252                     return res
    253 
    254                 shutil.copytree(src_dir, dst_dir, ignore=_filter)
    255 
    256                 # checking the result: some elements should not be copied
    257                 self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir2',
    258                                         'test.py')))
    259                 self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir')))
    260 
    261             finally:
    262                 if os.path.exists(dst_dir):
    263                     shutil.rmtree(dst_dir)
    264         finally:
    265             shutil.rmtree(src_dir)
    266             shutil.rmtree(os.path.dirname(dst_dir))
    267 
    268     if hasattr(os, "symlink"):
    269         def test_dont_copy_file_onto_link_to_itself(self):
    270             # bug 851123.
    271             os.mkdir(TESTFN)
    272             src = os.path.join(TESTFN, 'cheese')
    273             dst = os.path.join(TESTFN, 'shop')
    274             try:
    275                 f = open(src, 'w')
    276                 f.write('cheddar')
    277                 f.close()
    278 
    279                 os.link(src, dst)
    280                 self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
    281                 with open(src, 'r') as f:
    282                     self.assertEqual(f.read(), 'cheddar')
    283                 os.remove(dst)
    284 
    285                 # Using `src` here would mean we end up with a symlink pointing
    286                 # to TESTFN/TESTFN/cheese, while it should point at
    287                 # TESTFN/cheese.
    288                 os.symlink('cheese', dst)
    289                 self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
    290                 with open(src, 'r') as f:
    291                     self.assertEqual(f.read(), 'cheddar')
    292                 os.remove(dst)
    293             finally:
    294                 try:
    295                     shutil.rmtree(TESTFN)
    296                 except OSError:
    297                     pass
    298 
    299         def test_rmtree_on_symlink(self):
    300             # bug 1669.
    301             os.mkdir(TESTFN)
    302             try:
    303                 src = os.path.join(TESTFN, 'cheese')
    304                 dst = os.path.join(TESTFN, 'shop')
    305                 os.mkdir(src)
    306                 os.symlink(src, dst)
    307                 self.assertRaises(OSError, shutil.rmtree, dst)
    308             finally:
    309                 shutil.rmtree(TESTFN, ignore_errors=True)
    310 
    311     if hasattr(os, "mkfifo"):
    312         # Issue #3002: copyfile and copytree block indefinitely on named pipes
    313         def test_copyfile_named_pipe(self):
    314             os.mkfifo(TESTFN)
    315             try:
    316                 self.assertRaises(shutil.SpecialFileError,
    317                                   shutil.copyfile, TESTFN, TESTFN2)
    318                 self.assertRaises(shutil.SpecialFileError,
    319                                   shutil.copyfile, __file__, TESTFN)
    320             finally:
    321                 os.remove(TESTFN)
    322 
    323         def test_copytree_named_pipe(self):
    324             os.mkdir(TESTFN)
    325             try:
    326                 subdir = os.path.join(TESTFN, "subdir")
    327                 os.mkdir(subdir)
    328                 pipe = os.path.join(subdir, "mypipe")
    329                 os.mkfifo(pipe)
    330                 try:
    331                     shutil.copytree(TESTFN, TESTFN2)
    332                 except shutil.Error as e:
    333                     errors = e.args[0]
    334                     self.assertEqual(len(errors), 1)
    335                     src, dst, error_msg = errors[0]
    336                     self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
    337                 else:
    338                     self.fail("shutil.Error should have been raised")
    339             finally:
    340                 shutil.rmtree(TESTFN, ignore_errors=True)
    341                 shutil.rmtree(TESTFN2, ignore_errors=True)
    342 
    343     @unittest.skipUnless(hasattr(os, 'chflags') and
    344                          hasattr(errno, 'EOPNOTSUPP') and
    345                          hasattr(errno, 'ENOTSUP'),
    346                          "requires os.chflags, EOPNOTSUPP & ENOTSUP")
    347     def test_copystat_handles_harmless_chflags_errors(self):
    348         tmpdir = self.mkdtemp()
    349         file1 = os.path.join(tmpdir, 'file1')
    350         file2 = os.path.join(tmpdir, 'file2')
    351         self.write_file(file1, 'xxx')
    352         self.write_file(file2, 'xxx')
    353 
    354         def make_chflags_raiser(err):
    355             ex = OSError()
    356 
    357             def _chflags_raiser(path, flags):
    358                 ex.errno = err
    359                 raise ex
    360             return _chflags_raiser
    361         old_chflags = os.chflags
    362         try:
    363             for err in errno.EOPNOTSUPP, errno.ENOTSUP:
    364                 os.chflags = make_chflags_raiser(err)
    365                 shutil.copystat(file1, file2)
    366             # assert others errors break it
    367             os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP)
    368             self.assertRaises(OSError, shutil.copystat, file1, file2)
    369         finally:
    370             os.chflags = old_chflags
    371 
    372     @unittest.skipUnless(zlib, "requires zlib")
    373     def test_make_tarball(self):
    374         # creating something to tar
    375         tmpdir = self.mkdtemp()
    376         self.write_file([tmpdir, 'file1'], 'xxx')
    377         self.write_file([tmpdir, 'file2'], 'xxx')
    378         os.mkdir(os.path.join(tmpdir, 'sub'))
    379         self.write_file([tmpdir, 'sub', 'file3'], 'xxx')
    380 
    381         tmpdir2 = self.mkdtemp()
    382         # force shutil to create the directory
    383         os.rmdir(tmpdir2)
    384         unittest.skipUnless(splitdrive(tmpdir)[0] == splitdrive(tmpdir2)[0],
    385                             "source and target should be on same drive")
    386 
    387         base_name = os.path.join(tmpdir2, 'archive')
    388 
    389         # working with relative paths to avoid tar warnings
    390         old_dir = os.getcwd()
    391         os.chdir(tmpdir)
    392         try:
    393             _make_tarball(splitdrive(base_name)[1], '.')
    394         finally:
    395             os.chdir(old_dir)
    396 
    397         # check if the compressed tarball was created
    398         tarball = base_name + '.tar.gz'
    399         self.assertTrue(os.path.exists(tarball))
    400 
    401         # trying an uncompressed one
    402         base_name = os.path.join(tmpdir2, 'archive')
    403         old_dir = os.getcwd()
    404         os.chdir(tmpdir)
    405         try:
    406             _make_tarball(splitdrive(base_name)[1], '.', compress=None)
    407         finally:
    408             os.chdir(old_dir)
    409         tarball = base_name + '.tar'
    410         self.assertTrue(os.path.exists(tarball))
    411 
    412     def _tarinfo(self, path):
    413         tar = tarfile.open(path)
    414         try:
    415             names = tar.getnames()
    416             names.sort()
    417             return tuple(names)
    418         finally:
    419             tar.close()
    420 
    421     def _create_files(self):
    422         # creating something to tar
    423         tmpdir = self.mkdtemp()
    424         dist = os.path.join(tmpdir, 'dist')
    425         os.mkdir(dist)
    426         self.write_file([dist, 'file1'], 'xxx')
    427         self.write_file([dist, 'file2'], 'xxx')
    428         os.mkdir(os.path.join(dist, 'sub'))
    429         self.write_file([dist, 'sub', 'file3'], 'xxx')
    430         os.mkdir(os.path.join(dist, 'sub2'))
    431         tmpdir2 = self.mkdtemp()
    432         base_name = os.path.join(tmpdir2, 'archive')
    433         return tmpdir, tmpdir2, base_name
    434 
    435     @unittest.skipUnless(zlib, "Requires zlib")
    436     @unittest.skipUnless(find_executable('tar') and find_executable('gzip'),
    437                          'Need the tar command to run')
    438     def test_tarfile_vs_tar(self):
    439         tmpdir, tmpdir2, base_name =  self._create_files()
    440         old_dir = os.getcwd()
    441         os.chdir(tmpdir)
    442         try:
    443             _make_tarball(base_name, 'dist')
    444         finally:
    445             os.chdir(old_dir)
    446 
    447         # check if the compressed tarball was created
    448         tarball = base_name + '.tar.gz'
    449         self.assertTrue(os.path.exists(tarball))
    450 
    451         # now create another tarball using `tar`
    452         tarball2 = os.path.join(tmpdir, 'archive2.tar.gz')
    453         tar_cmd = ['tar', '-cf', 'archive2.tar', 'dist']
    454         gzip_cmd = ['gzip', '-f9', 'archive2.tar']
    455         old_dir = os.getcwd()
    456         os.chdir(tmpdir)
    457         try:
    458             with captured_stdout() as s:
    459                 spawn(tar_cmd)
    460                 spawn(gzip_cmd)
    461         finally:
    462             os.chdir(old_dir)
    463 
    464         self.assertTrue(os.path.exists(tarball2))
    465         # let's compare both tarballs
    466         self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))
    467 
    468         # trying an uncompressed one
    469         base_name = os.path.join(tmpdir2, 'archive')
    470         old_dir = os.getcwd()
    471         os.chdir(tmpdir)
    472         try:
    473             _make_tarball(base_name, 'dist', compress=None)
    474         finally:
    475             os.chdir(old_dir)
    476         tarball = base_name + '.tar'
    477         self.assertTrue(os.path.exists(tarball))
    478 
    479         # now for a dry_run
    480         base_name = os.path.join(tmpdir2, 'archive')
    481         old_dir = os.getcwd()
    482         os.chdir(tmpdir)
    483         try:
    484             _make_tarball(base_name, 'dist', compress=None, dry_run=True)
    485         finally:
    486             os.chdir(old_dir)
    487         tarball = base_name + '.tar'
    488         self.assertTrue(os.path.exists(tarball))
    489 
    490     @unittest.skipUnless(zlib, "Requires zlib")
    491     @unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run')
    492     def test_make_zipfile(self):
    493         # creating something to tar
    494         tmpdir = self.mkdtemp()
    495         self.write_file([tmpdir, 'file1'], 'xxx')
    496         self.write_file([tmpdir, 'file2'], 'xxx')
    497 
    498         tmpdir2 = self.mkdtemp()
    499         # force shutil to create the directory
    500         os.rmdir(tmpdir2)
    501         base_name = os.path.join(tmpdir2, 'archive')
    502         _make_zipfile(base_name, tmpdir)
    503 
    504         # check if the compressed tarball was created
    505         tarball = base_name + '.zip'
    506         self.assertTrue(os.path.exists(tarball))
    507 
    508 
    509     def test_make_archive(self):
    510         tmpdir = self.mkdtemp()
    511         base_name = os.path.join(tmpdir, 'archive')
    512         self.assertRaises(ValueError, make_archive, base_name, 'xxx')
    513 
    514     @unittest.skipUnless(zlib, "Requires zlib")
    515     def test_make_archive_owner_group(self):
    516         # testing make_archive with owner and group, with various combinations
    517         # this works even if there's not gid/uid support
    518         if UID_GID_SUPPORT:
    519             group = grp.getgrgid(0)[0]
    520             owner = pwd.getpwuid(0)[0]
    521         else:
    522             group = owner = 'root'
    523 
    524         base_dir, root_dir, base_name =  self._create_files()
    525         base_name = os.path.join(self.mkdtemp() , 'archive')
    526         res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
    527                            group=group)
    528         self.assertTrue(os.path.exists(res))
    529 
    530         res = make_archive(base_name, 'zip', root_dir, base_dir)
    531         self.assertTrue(os.path.exists(res))
    532 
    533         res = make_archive(base_name, 'tar', root_dir, base_dir,
    534                            owner=owner, group=group)
    535         self.assertTrue(os.path.exists(res))
    536 
    537         res = make_archive(base_name, 'tar', root_dir, base_dir,
    538                            owner='kjhkjhkjg', group='oihohoh')
    539         self.assertTrue(os.path.exists(res))
    540 
    541     @unittest.skipUnless(zlib, "Requires zlib")
    542     @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
    543     def test_tarfile_root_owner(self):
    544         tmpdir, tmpdir2, base_name =  self._create_files()
    545         old_dir = os.getcwd()
    546         os.chdir(tmpdir)
    547         group = grp.getgrgid(0)[0]
    548         owner = pwd.getpwuid(0)[0]
    549         try:
    550             archive_name = _make_tarball(base_name, 'dist', compress=None,
    551                                          owner=owner, group=group)
    552         finally:
    553             os.chdir(old_dir)
    554 
    555         # check if the compressed tarball was created
    556         self.assertTrue(os.path.exists(archive_name))
    557 
    558         # now checks the rights
    559         archive = tarfile.open(archive_name)
    560         try:
    561             for member in archive.getmembers():
    562                 self.assertEqual(member.uid, 0)
    563                 self.assertEqual(member.gid, 0)
    564         finally:
    565             archive.close()
    566 
    567     def test_make_archive_cwd(self):
    568         current_dir = os.getcwd()
    569         def _breaks(*args, **kw):
    570             raise RuntimeError()
    571 
    572         register_archive_format('xxx', _breaks, [], 'xxx file')
    573         try:
    574             try:
    575                 make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
    576             except Exception:
    577                 pass
    578             self.assertEqual(os.getcwd(), current_dir)
    579         finally:
    580             unregister_archive_format('xxx')
    581 
    582     def test_register_archive_format(self):
    583 
    584         self.assertRaises(TypeError, register_archive_format, 'xxx', 1)
    585         self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
    586                           1)
    587         self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
    588                           [(1, 2), (1, 2, 3)])
    589 
    590         register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
    591         formats = [name for name, params in get_archive_formats()]
    592         self.assertIn('xxx', formats)
    593 
    594         unregister_archive_format('xxx')
    595         formats = [name for name, params in get_archive_formats()]
    596         self.assertNotIn('xxx', formats)
    597 
    598 
    599 class TestMove(unittest.TestCase):
    600 
    601     def setUp(self):
    602         filename = "foo"
    603         self.src_dir = tempfile.mkdtemp()
    604         self.dst_dir = tempfile.mkdtemp()
    605         self.src_file = os.path.join(self.src_dir, filename)
    606         self.dst_file = os.path.join(self.dst_dir, filename)
    607         # Try to create a dir in the current directory, hoping that it is
    608         # not located on the same filesystem as the system tmp dir.
    609         try:
    610             self.dir_other_fs = tempfile.mkdtemp(
    611                 dir=os.path.dirname(__file__))
    612             self.file_other_fs = os.path.join(self.dir_other_fs,
    613                 filename)
    614         except OSError:
    615             self.dir_other_fs = None
    616         with open(self.src_file, "wb") as f:
    617             f.write("spam")
    618 
    619     def tearDown(self):
    620         for d in (self.src_dir, self.dst_dir, self.dir_other_fs):
    621             try:
    622                 if d:
    623                     shutil.rmtree(d)
    624             except:
    625                 pass
    626 
    627     def _check_move_file(self, src, dst, real_dst):
    628         with open(src, "rb") as f:
    629             contents = f.read()
    630         shutil.move(src, dst)
    631         with open(real_dst, "rb") as f:
    632             self.assertEqual(contents, f.read())
    633         self.assertFalse(os.path.exists(src))
    634 
    635     def _check_move_dir(self, src, dst, real_dst):
    636         contents = sorted(os.listdir(src))
    637         shutil.move(src, dst)
    638         self.assertEqual(contents, sorted(os.listdir(real_dst)))
    639         self.assertFalse(os.path.exists(src))
    640 
    641     def test_move_file(self):
    642         # Move a file to another location on the same filesystem.
    643         self._check_move_file(self.src_file, self.dst_file, self.dst_file)
    644 
    645     def test_move_file_to_dir(self):
    646         # Move a file inside an existing dir on the same filesystem.
    647         self._check_move_file(self.src_file, self.dst_dir, self.dst_file)
    648 
    649     def test_move_file_other_fs(self):
    650         # Move a file to an existing dir on another filesystem.
    651         if not self.dir_other_fs:
    652             # skip
    653             return
    654         self._check_move_file(self.src_file, self.file_other_fs,
    655             self.file_other_fs)
    656 
    657     def test_move_file_to_dir_other_fs(self):
    658         # Move a file to another location on another filesystem.
    659         if not self.dir_other_fs:
    660             # skip
    661             return
    662         self._check_move_file(self.src_file, self.dir_other_fs,
    663             self.file_other_fs)
    664 
    665     def test_move_dir(self):
    666         # Move a dir to another location on the same filesystem.
    667         dst_dir = tempfile.mktemp()
    668         try:
    669             self._check_move_dir(self.src_dir, dst_dir, dst_dir)
    670         finally:
    671             try:
    672                 shutil.rmtree(dst_dir)
    673             except:
    674                 pass
    675 
    676     def test_move_dir_other_fs(self):
    677         # Move a dir to another location on another filesystem.
    678         if not self.dir_other_fs:
    679             # skip
    680             return
    681         dst_dir = tempfile.mktemp(dir=self.dir_other_fs)
    682         try:
    683             self._check_move_dir(self.src_dir, dst_dir, dst_dir)
    684         finally:
    685             try:
    686                 shutil.rmtree(dst_dir)
    687             except:
    688                 pass
    689 
    690     def test_move_dir_to_dir(self):
    691         # Move a dir inside an existing dir on the same filesystem.
    692         self._check_move_dir(self.src_dir, self.dst_dir,
    693             os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
    694 
    695     def test_move_dir_to_dir_other_fs(self):
    696         # Move a dir inside an existing dir on another filesystem.
    697         if not self.dir_other_fs:
    698             # skip
    699             return
    700         self._check_move_dir(self.src_dir, self.dir_other_fs,
    701             os.path.join(self.dir_other_fs, os.path.basename(self.src_dir)))
    702 
    703     def test_existing_file_inside_dest_dir(self):
    704         # A file with the same name inside the destination dir already exists.
    705         with open(self.dst_file, "wb"):
    706             pass
    707         self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir)
    708 
    709     def test_dont_move_dir_in_itself(self):
    710         # Moving a dir inside itself raises an Error.
    711         dst = os.path.join(self.src_dir, "bar")
    712         self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst)
    713 
    714     def test_destinsrc_false_negative(self):
    715         os.mkdir(TESTFN)
    716         try:
    717             for src, dst in [('srcdir', 'srcdir/dest')]:
    718                 src = os.path.join(TESTFN, src)
    719                 dst = os.path.join(TESTFN, dst)
    720                 self.assertTrue(shutil._destinsrc(src, dst),
    721                              msg='_destinsrc() wrongly concluded that '
    722                              'dst (%s) is not in src (%s)' % (dst, src))
    723         finally:
    724             shutil.rmtree(TESTFN, ignore_errors=True)
    725 
    726     def test_destinsrc_false_positive(self):
    727         os.mkdir(TESTFN)
    728         try:
    729             for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]:
    730                 src = os.path.join(TESTFN, src)
    731                 dst = os.path.join(TESTFN, dst)
    732                 self.assertFalse(shutil._destinsrc(src, dst),
    733                             msg='_destinsrc() wrongly concluded that '
    734                             'dst (%s) is in src (%s)' % (dst, src))
    735         finally:
    736             shutil.rmtree(TESTFN, ignore_errors=True)
    737 
    738 
    739 class TestCopyFile(unittest.TestCase):
    740 
    741     _delete = False
    742 
    743     class Faux(object):
    744         _entered = False
    745         _exited_with = None
    746         _raised = False
    747         def __init__(self, raise_in_exit=False, suppress_at_exit=True):
    748             self._raise_in_exit = raise_in_exit
    749             self._suppress_at_exit = suppress_at_exit
    750         def read(self, *args):
    751             return ''
    752         def __enter__(self):
    753             self._entered = True
    754         def __exit__(self, exc_type, exc_val, exc_tb):
    755             self._exited_with = exc_type, exc_val, exc_tb
    756             if self._raise_in_exit:
    757                 self._raised = True
    758                 raise IOError("Cannot close")
    759             return self._suppress_at_exit
    760 
    761     def tearDown(self):
    762         if self._delete:
    763             del shutil.open
    764 
    765     def _set_shutil_open(self, func):
    766         shutil.open = func
    767         self._delete = True
    768 
    769     def test_w_source_open_fails(self):
    770         def _open(filename, mode='r'):
    771             if filename == 'srcfile':
    772                 raise IOError('Cannot open "srcfile"')
    773             assert 0  # shouldn't reach here.
    774 
    775         self._set_shutil_open(_open)
    776 
    777         self.assertRaises(IOError, shutil.copyfile, 'srcfile', 'destfile')
    778 
    779     def test_w_dest_open_fails(self):
    780 
    781         srcfile = self.Faux()
    782 
    783         def _open(filename, mode='r'):
    784             if filename == 'srcfile':
    785                 return srcfile
    786             if filename == 'destfile':
    787                 raise IOError('Cannot open "destfile"')
    788             assert 0  # shouldn't reach here.
    789 
    790         self._set_shutil_open(_open)
    791 
    792         shutil.copyfile('srcfile', 'destfile')
    793         self.assertTrue(srcfile._entered)
    794         self.assertTrue(srcfile._exited_with[0] is IOError)
    795         self.assertEqual(srcfile._exited_with[1].args,
    796                          ('Cannot open "destfile"',))
    797 
    798     def test_w_dest_close_fails(self):
    799 
    800         srcfile = self.Faux()
    801         destfile = self.Faux(True)
    802 
    803         def _open(filename, mode='r'):
    804             if filename == 'srcfile':
    805                 return srcfile
    806             if filename == 'destfile':
    807                 return destfile
    808             assert 0  # shouldn't reach here.
    809 
    810         self._set_shutil_open(_open)
    811 
    812         shutil.copyfile('srcfile', 'destfile')
    813         self.assertTrue(srcfile._entered)
    814         self.assertTrue(destfile._entered)
    815         self.assertTrue(destfile._raised)
    816         self.assertTrue(srcfile._exited_with[0] is IOError)
    817         self.assertEqual(srcfile._exited_with[1].args,
    818                          ('Cannot close',))
    819 
    820     def test_w_source_close_fails(self):
    821 
    822         srcfile = self.Faux(True)
    823         destfile = self.Faux()
    824 
    825         def _open(filename, mode='r'):
    826             if filename == 'srcfile':
    827                 return srcfile
    828             if filename == 'destfile':
    829                 return destfile
    830             assert 0  # shouldn't reach here.
    831 
    832         self._set_shutil_open(_open)
    833 
    834         self.assertRaises(IOError,
    835                           shutil.copyfile, 'srcfile', 'destfile')
    836         self.assertTrue(srcfile._entered)
    837         self.assertTrue(destfile._entered)
    838         self.assertFalse(destfile._raised)
    839         self.assertTrue(srcfile._exited_with[0] is None)
    840         self.assertTrue(srcfile._raised)
    841 
    842     def test_move_dir_caseinsensitive(self):
    843         # Renames a folder to the same name
    844         # but a different case.
    845 
    846         self.src_dir = tempfile.mkdtemp()
    847         dst_dir = os.path.join(
    848                 os.path.dirname(self.src_dir),
    849                 os.path.basename(self.src_dir).upper())
    850         self.assertNotEqual(self.src_dir, dst_dir)
    851 
    852         try:
    853             shutil.move(self.src_dir, dst_dir)
    854             self.assertTrue(os.path.isdir(dst_dir))
    855         finally:
    856             if os.path.exists(dst_dir):
    857                 os.rmdir(dst_dir)
    858 
    859 
    860 
    861 def test_main():
    862     test_support.run_unittest(TestShutil, TestMove, TestCopyFile)
    863 
    864 if __name__ == '__main__':
    865     test_main()
    866