1 # Copyright (C) 2003 Python Software Foundation 2 3 import unittest 4 import unittest.mock 5 import shutil 6 import tempfile 7 import sys 8 import stat 9 import os 10 import os.path 11 import errno 12 import functools 13 import subprocess 14 from contextlib import ExitStack 15 from shutil import (make_archive, 16 register_archive_format, unregister_archive_format, 17 get_archive_formats, Error, unpack_archive, 18 register_unpack_format, RegistryError, 19 unregister_unpack_format, get_unpack_formats, 20 SameFileError) 21 import tarfile 22 import zipfile 23 import warnings 24 25 from test import support 26 from test.support import (TESTFN, check_warnings, captured_stdout, 27 android_not_root) 28 29 TESTFN2 = TESTFN + "2" 30 31 try: 32 import grp 33 import pwd 34 UID_GID_SUPPORT = True 35 except ImportError: 36 UID_GID_SUPPORT = False 37 38 def _fake_rename(*args, **kwargs): 39 # Pretend the destination path is on a different filesystem. 40 raise OSError(getattr(errno, 'EXDEV', 18), "Invalid cross-device link") 41 42 def mock_rename(func): 43 @functools.wraps(func) 44 def wrap(*args, **kwargs): 45 try: 46 builtin_rename = os.rename 47 os.rename = _fake_rename 48 return func(*args, **kwargs) 49 finally: 50 os.rename = builtin_rename 51 return wrap 52 53 def write_file(path, content, binary=False): 54 """Write *content* to a file located at *path*. 55 56 If *path* is a tuple instead of a string, os.path.join will be used to 57 make a path. If *binary* is true, the file will be opened in binary 58 mode. 59 """ 60 if isinstance(path, tuple): 61 path = os.path.join(*path) 62 with open(path, 'wb' if binary else 'w') as fp: 63 fp.write(content) 64 65 def read_file(path, binary=False): 66 """Return contents from a file located at *path*. 67 68 If *path* is a tuple instead of a string, os.path.join will be used to 69 make a path. If *binary* is true, the file will be opened in binary 70 mode. 71 """ 72 if isinstance(path, tuple): 73 path = os.path.join(*path) 74 with open(path, 'rb' if binary else 'r') as fp: 75 return fp.read() 76 77 def rlistdir(path): 78 res = [] 79 for name in sorted(os.listdir(path)): 80 p = os.path.join(path, name) 81 if os.path.isdir(p) and not os.path.islink(p): 82 res.append(name + '/') 83 for n in rlistdir(p): 84 res.append(name + '/' + n) 85 else: 86 res.append(name) 87 return res 88 89 90 class TestShutil(unittest.TestCase): 91 92 def setUp(self): 93 super(TestShutil, self).setUp() 94 self.tempdirs = [] 95 96 def tearDown(self): 97 super(TestShutil, self).tearDown() 98 while self.tempdirs: 99 d = self.tempdirs.pop() 100 shutil.rmtree(d, os.name in ('nt', 'cygwin')) 101 102 103 def mkdtemp(self): 104 """Create a temporary directory that will be cleaned up. 105 106 Returns the path of the directory. 107 """ 108 d = tempfile.mkdtemp() 109 self.tempdirs.append(d) 110 return d 111 112 def test_rmtree_works_on_bytes(self): 113 tmp = self.mkdtemp() 114 victim = os.path.join(tmp, 'killme') 115 os.mkdir(victim) 116 write_file(os.path.join(victim, 'somefile'), 'foo') 117 victim = os.fsencode(victim) 118 self.assertIsInstance(victim, bytes) 119 shutil.rmtree(victim) 120 121 @support.skip_unless_symlink 122 def test_rmtree_fails_on_symlink(self): 123 tmp = self.mkdtemp() 124 dir_ = os.path.join(tmp, 'dir') 125 os.mkdir(dir_) 126 link = os.path.join(tmp, 'link') 127 os.symlink(dir_, link) 128 self.assertRaises(OSError, shutil.rmtree, link) 129 self.assertTrue(os.path.exists(dir_)) 130 self.assertTrue(os.path.lexists(link)) 131 errors = [] 132 def onerror(*args): 133 errors.append(args) 134 shutil.rmtree(link, onerror=onerror) 135 self.assertEqual(len(errors), 1) 136 self.assertIs(errors[0][0], os.path.islink) 137 self.assertEqual(errors[0][1], link) 138 self.assertIsInstance(errors[0][2][1], OSError) 139 140 @support.skip_unless_symlink 141 def test_rmtree_works_on_symlinks(self): 142 tmp = self.mkdtemp() 143 dir1 = os.path.join(tmp, 'dir1') 144 dir2 = os.path.join(dir1, 'dir2') 145 dir3 = os.path.join(tmp, 'dir3') 146 for d in dir1, dir2, dir3: 147 os.mkdir(d) 148 file1 = os.path.join(tmp, 'file1') 149 write_file(file1, 'foo') 150 link1 = os.path.join(dir1, 'link1') 151 os.symlink(dir2, link1) 152 link2 = os.path.join(dir1, 'link2') 153 os.symlink(dir3, link2) 154 link3 = os.path.join(dir1, 'link3') 155 os.symlink(file1, link3) 156 # make sure symlinks are removed but not followed 157 shutil.rmtree(dir1) 158 self.assertFalse(os.path.exists(dir1)) 159 self.assertTrue(os.path.exists(dir3)) 160 self.assertTrue(os.path.exists(file1)) 161 162 def test_rmtree_errors(self): 163 # filename is guaranteed not to exist 164 filename = tempfile.mktemp() 165 self.assertRaises(FileNotFoundError, shutil.rmtree, filename) 166 # test that ignore_errors option is honored 167 shutil.rmtree(filename, ignore_errors=True) 168 169 # existing file 170 tmpdir = self.mkdtemp() 171 write_file((tmpdir, "tstfile"), "") 172 filename = os.path.join(tmpdir, "tstfile") 173 with self.assertRaises(NotADirectoryError) as cm: 174 shutil.rmtree(filename) 175 # The reason for this rather odd construct is that Windows sprinkles 176 # a \*.* at the end of file names. But only sometimes on some buildbots 177 possible_args = [filename, os.path.join(filename, '*.*')] 178 self.assertIn(cm.exception.filename, possible_args) 179 self.assertTrue(os.path.exists(filename)) 180 # test that ignore_errors option is honored 181 shutil.rmtree(filename, ignore_errors=True) 182 self.assertTrue(os.path.exists(filename)) 183 errors = [] 184 def onerror(*args): 185 errors.append(args) 186 shutil.rmtree(filename, onerror=onerror) 187 self.assertEqual(len(errors), 2) 188 self.assertIs(errors[0][0], os.listdir) 189 self.assertEqual(errors[0][1], filename) 190 self.assertIsInstance(errors[0][2][1], NotADirectoryError) 191 self.assertIn(errors[0][2][1].filename, possible_args) 192 self.assertIs(errors[1][0], os.rmdir) 193 self.assertEqual(errors[1][1], filename) 194 self.assertIsInstance(errors[1][2][1], NotADirectoryError) 195 self.assertIn(errors[1][2][1].filename, possible_args) 196 197 198 @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod()') 199 @unittest.skipIf(sys.platform[:6] == 'cygwin', 200 "This test can't be run on Cygwin (issue #1071513).") 201 @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0, 202 "This test can't be run reliably as root (issue #1076467).") 203 def test_on_error(self): 204 self.errorState = 0 205 os.mkdir(TESTFN) 206 self.addCleanup(shutil.rmtree, TESTFN) 207 208 self.child_file_path = os.path.join(TESTFN, 'a') 209 self.child_dir_path = os.path.join(TESTFN, 'b') 210 support.create_empty_file(self.child_file_path) 211 os.mkdir(self.child_dir_path) 212 old_dir_mode = os.stat(TESTFN).st_mode 213 old_child_file_mode = os.stat(self.child_file_path).st_mode 214 old_child_dir_mode = os.stat(self.child_dir_path).st_mode 215 # Make unwritable. 216 new_mode = stat.S_IREAD|stat.S_IEXEC 217 os.chmod(self.child_file_path, new_mode) 218 os.chmod(self.child_dir_path, new_mode) 219 os.chmod(TESTFN, new_mode) 220 221 self.addCleanup(os.chmod, TESTFN, old_dir_mode) 222 self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode) 223 self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode) 224 225 shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror) 226 # Test whether onerror has actually been called. 227 self.assertEqual(self.errorState, 3, 228 "Expected call to onerror function did not happen.") 229 230 def check_args_to_onerror(self, func, arg, exc): 231 # test_rmtree_errors deliberately runs rmtree 232 # on a directory that is chmod 500, which will fail. 233 # This function is run when shutil.rmtree fails. 234 # 99.9% of the time it initially fails to remove 235 # a file in the directory, so the first time through 236 # func is os.remove. 237 # However, some Linux machines running ZFS on 238 # FUSE experienced a failure earlier in the process 239 # at os.listdir. The first failure may legally 240 # be either. 241 if self.errorState < 2: 242 if func is os.unlink: 243 self.assertEqual(arg, self.child_file_path) 244 elif func is os.rmdir: 245 self.assertEqual(arg, self.child_dir_path) 246 else: 247 self.assertIs(func, os.listdir) 248 self.assertIn(arg, [TESTFN, self.child_dir_path]) 249 self.assertTrue(issubclass(exc[0], OSError)) 250 self.errorState += 1 251 else: 252 self.assertEqual(func, os.rmdir) 253 self.assertEqual(arg, TESTFN) 254 self.assertTrue(issubclass(exc[0], OSError)) 255 self.errorState = 3 256 257 def test_rmtree_does_not_choke_on_failing_lstat(self): 258 try: 259 orig_lstat = os.lstat 260 def raiser(fn, *args, **kwargs): 261 if fn != TESTFN: 262 raise OSError() 263 else: 264 return orig_lstat(fn) 265 os.lstat = raiser 266 267 os.mkdir(TESTFN) 268 write_file((TESTFN, 'foo'), 'foo') 269 shutil.rmtree(TESTFN) 270 finally: 271 os.lstat = orig_lstat 272 273 @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod') 274 @support.skip_unless_symlink 275 def test_copymode_follow_symlinks(self): 276 tmp_dir = self.mkdtemp() 277 src = os.path.join(tmp_dir, 'foo') 278 dst = os.path.join(tmp_dir, 'bar') 279 src_link = os.path.join(tmp_dir, 'baz') 280 dst_link = os.path.join(tmp_dir, 'quux') 281 write_file(src, 'foo') 282 write_file(dst, 'foo') 283 os.symlink(src, src_link) 284 os.symlink(dst, dst_link) 285 os.chmod(src, stat.S_IRWXU|stat.S_IRWXG) 286 # file to file 287 os.chmod(dst, stat.S_IRWXO) 288 self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 289 shutil.copymode(src, dst) 290 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 291 # On Windows, os.chmod does not follow symlinks (issue #15411) 292 if os.name != 'nt': 293 # follow src link 294 os.chmod(dst, stat.S_IRWXO) 295 shutil.copymode(src_link, dst) 296 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 297 # follow dst link 298 os.chmod(dst, stat.S_IRWXO) 299 shutil.copymode(src, dst_link) 300 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 301 # follow both links 302 os.chmod(dst, stat.S_IRWXO) 303 shutil.copymode(src_link, dst_link) 304 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 305 306 @unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod') 307 @support.skip_unless_symlink 308 def test_copymode_symlink_to_symlink(self): 309 tmp_dir = self.mkdtemp() 310 src = os.path.join(tmp_dir, 'foo') 311 dst = os.path.join(tmp_dir, 'bar') 312 src_link = os.path.join(tmp_dir, 'baz') 313 dst_link = os.path.join(tmp_dir, 'quux') 314 write_file(src, 'foo') 315 write_file(dst, 'foo') 316 os.symlink(src, src_link) 317 os.symlink(dst, dst_link) 318 os.chmod(src, stat.S_IRWXU|stat.S_IRWXG) 319 os.chmod(dst, stat.S_IRWXU) 320 os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG) 321 # link to link 322 os.lchmod(dst_link, stat.S_IRWXO) 323 shutil.copymode(src_link, dst_link, follow_symlinks=False) 324 self.assertEqual(os.lstat(src_link).st_mode, 325 os.lstat(dst_link).st_mode) 326 self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 327 # src link - use chmod 328 os.lchmod(dst_link, stat.S_IRWXO) 329 shutil.copymode(src_link, dst, follow_symlinks=False) 330 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 331 # dst link - use chmod 332 os.lchmod(dst_link, stat.S_IRWXO) 333 shutil.copymode(src, dst_link, follow_symlinks=False) 334 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 335 336 @unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing') 337 @support.skip_unless_symlink 338 def test_copymode_symlink_to_symlink_wo_lchmod(self): 339 tmp_dir = self.mkdtemp() 340 src = os.path.join(tmp_dir, 'foo') 341 dst = os.path.join(tmp_dir, 'bar') 342 src_link = os.path.join(tmp_dir, 'baz') 343 dst_link = os.path.join(tmp_dir, 'quux') 344 write_file(src, 'foo') 345 write_file(dst, 'foo') 346 os.symlink(src, src_link) 347 os.symlink(dst, dst_link) 348 shutil.copymode(src_link, dst_link, follow_symlinks=False) # silent fail 349 350 @support.skip_unless_symlink 351 def test_copystat_symlinks(self): 352 tmp_dir = self.mkdtemp() 353 src = os.path.join(tmp_dir, 'foo') 354 dst = os.path.join(tmp_dir, 'bar') 355 src_link = os.path.join(tmp_dir, 'baz') 356 dst_link = os.path.join(tmp_dir, 'qux') 357 write_file(src, 'foo') 358 src_stat = os.stat(src) 359 os.utime(src, (src_stat.st_atime, 360 src_stat.st_mtime - 42.0)) # ensure different mtimes 361 write_file(dst, 'bar') 362 self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime) 363 os.symlink(src, src_link) 364 os.symlink(dst, dst_link) 365 if hasattr(os, 'lchmod'): 366 os.lchmod(src_link, stat.S_IRWXO) 367 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'): 368 os.lchflags(src_link, stat.UF_NODUMP) 369 src_link_stat = os.lstat(src_link) 370 # follow 371 if hasattr(os, 'lchmod'): 372 shutil.copystat(src_link, dst_link, follow_symlinks=True) 373 self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode) 374 # don't follow 375 shutil.copystat(src_link, dst_link, follow_symlinks=False) 376 dst_link_stat = os.lstat(dst_link) 377 if os.utime in os.supports_follow_symlinks: 378 for attr in 'st_atime', 'st_mtime': 379 # The modification times may be truncated in the new file. 380 self.assertLessEqual(getattr(src_link_stat, attr), 381 getattr(dst_link_stat, attr) + 1) 382 if hasattr(os, 'lchmod'): 383 self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode) 384 if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'): 385 self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags) 386 # tell to follow but dst is not a link 387 shutil.copystat(src_link, dst, follow_symlinks=False) 388 self.assertTrue(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime) < 389 00000.1) 390 391 @unittest.skipUnless(hasattr(os, 'chflags') and 392 hasattr(errno, 'EOPNOTSUPP') and 393 hasattr(errno, 'ENOTSUP'), 394 "requires os.chflags, EOPNOTSUPP & ENOTSUP") 395 def test_copystat_handles_harmless_chflags_errors(self): 396 tmpdir = self.mkdtemp() 397 file1 = os.path.join(tmpdir, 'file1') 398 file2 = os.path.join(tmpdir, 'file2') 399 write_file(file1, 'xxx') 400 write_file(file2, 'xxx') 401 402 def make_chflags_raiser(err): 403 ex = OSError() 404 405 def _chflags_raiser(path, flags, *, follow_symlinks=True): 406 ex.errno = err 407 raise ex 408 return _chflags_raiser 409 old_chflags = os.chflags 410 try: 411 for err in errno.EOPNOTSUPP, errno.ENOTSUP: 412 os.chflags = make_chflags_raiser(err) 413 shutil.copystat(file1, file2) 414 # assert others errors break it 415 os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP) 416 self.assertRaises(OSError, shutil.copystat, file1, file2) 417 finally: 418 os.chflags = old_chflags 419 420 @support.skip_unless_xattr 421 def test_copyxattr(self): 422 tmp_dir = self.mkdtemp() 423 src = os.path.join(tmp_dir, 'foo') 424 write_file(src, 'foo') 425 dst = os.path.join(tmp_dir, 'bar') 426 write_file(dst, 'bar') 427 428 # no xattr == no problem 429 shutil._copyxattr(src, dst) 430 # common case 431 os.setxattr(src, 'user.foo', b'42') 432 os.setxattr(src, 'user.bar', b'43') 433 shutil._copyxattr(src, dst) 434 self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst))) 435 self.assertEqual( 436 os.getxattr(src, 'user.foo'), 437 os.getxattr(dst, 'user.foo')) 438 # check errors don't affect other attrs 439 os.remove(dst) 440 write_file(dst, 'bar') 441 os_error = OSError(errno.EPERM, 'EPERM') 442 443 def _raise_on_user_foo(fname, attr, val, **kwargs): 444 if attr == 'user.foo': 445 raise os_error 446 else: 447 orig_setxattr(fname, attr, val, **kwargs) 448 try: 449 orig_setxattr = os.setxattr 450 os.setxattr = _raise_on_user_foo 451 shutil._copyxattr(src, dst) 452 self.assertIn('user.bar', os.listxattr(dst)) 453 finally: 454 os.setxattr = orig_setxattr 455 # the source filesystem not supporting xattrs should be ok, too. 456 def _raise_on_src(fname, *, follow_symlinks=True): 457 if fname == src: 458 raise OSError(errno.ENOTSUP, 'Operation not supported') 459 return orig_listxattr(fname, follow_symlinks=follow_symlinks) 460 try: 461 orig_listxattr = os.listxattr 462 os.listxattr = _raise_on_src 463 shutil._copyxattr(src, dst) 464 finally: 465 os.listxattr = orig_listxattr 466 467 # test that shutil.copystat copies xattrs 468 src = os.path.join(tmp_dir, 'the_original') 469 write_file(src, src) 470 os.setxattr(src, 'user.the_value', b'fiddly') 471 dst = os.path.join(tmp_dir, 'the_copy') 472 write_file(dst, dst) 473 shutil.copystat(src, dst) 474 self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly') 475 476 @support.skip_unless_symlink 477 @support.skip_unless_xattr 478 @unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0, 479 'root privileges required') 480 def test_copyxattr_symlinks(self): 481 # On Linux, it's only possible to access non-user xattr for symlinks; 482 # which in turn require root privileges. This test should be expanded 483 # as soon as other platforms gain support for extended attributes. 484 tmp_dir = self.mkdtemp() 485 src = os.path.join(tmp_dir, 'foo') 486 src_link = os.path.join(tmp_dir, 'baz') 487 write_file(src, 'foo') 488 os.symlink(src, src_link) 489 os.setxattr(src, 'trusted.foo', b'42') 490 os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False) 491 dst = os.path.join(tmp_dir, 'bar') 492 dst_link = os.path.join(tmp_dir, 'qux') 493 write_file(dst, 'bar') 494 os.symlink(dst, dst_link) 495 shutil._copyxattr(src_link, dst_link, follow_symlinks=False) 496 self.assertEqual(os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False), b'43') 497 self.assertRaises(OSError, os.getxattr, dst, 'trusted.foo') 498 shutil._copyxattr(src_link, dst, follow_symlinks=False) 499 self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43') 500 501 @support.skip_unless_symlink 502 def test_copy_symlinks(self): 503 tmp_dir = self.mkdtemp() 504 src = os.path.join(tmp_dir, 'foo') 505 dst = os.path.join(tmp_dir, 'bar') 506 src_link = os.path.join(tmp_dir, 'baz') 507 write_file(src, 'foo') 508 os.symlink(src, src_link) 509 if hasattr(os, 'lchmod'): 510 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO) 511 # don't follow 512 shutil.copy(src_link, dst, follow_symlinks=True) 513 self.assertFalse(os.path.islink(dst)) 514 self.assertEqual(read_file(src), read_file(dst)) 515 os.remove(dst) 516 # follow 517 shutil.copy(src_link, dst, follow_symlinks=False) 518 self.assertTrue(os.path.islink(dst)) 519 self.assertEqual(os.readlink(dst), os.readlink(src_link)) 520 if hasattr(os, 'lchmod'): 521 self.assertEqual(os.lstat(src_link).st_mode, 522 os.lstat(dst).st_mode) 523 524 @support.skip_unless_symlink 525 def test_copy2_symlinks(self): 526 tmp_dir = self.mkdtemp() 527 src = os.path.join(tmp_dir, 'foo') 528 dst = os.path.join(tmp_dir, 'bar') 529 src_link = os.path.join(tmp_dir, 'baz') 530 write_file(src, 'foo') 531 os.symlink(src, src_link) 532 if hasattr(os, 'lchmod'): 533 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO) 534 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'): 535 os.lchflags(src_link, stat.UF_NODUMP) 536 src_stat = os.stat(src) 537 src_link_stat = os.lstat(src_link) 538 # follow 539 shutil.copy2(src_link, dst, follow_symlinks=True) 540 self.assertFalse(os.path.islink(dst)) 541 self.assertEqual(read_file(src), read_file(dst)) 542 os.remove(dst) 543 # don't follow 544 shutil.copy2(src_link, dst, follow_symlinks=False) 545 self.assertTrue(os.path.islink(dst)) 546 self.assertEqual(os.readlink(dst), os.readlink(src_link)) 547 dst_stat = os.lstat(dst) 548 if os.utime in os.supports_follow_symlinks: 549 for attr in 'st_atime', 'st_mtime': 550 # The modification times may be truncated in the new file. 551 self.assertLessEqual(getattr(src_link_stat, attr), 552 getattr(dst_stat, attr) + 1) 553 if hasattr(os, 'lchmod'): 554 self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode) 555 self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode) 556 if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'): 557 self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags) 558 559 @support.skip_unless_xattr 560 def test_copy2_xattr(self): 561 tmp_dir = self.mkdtemp() 562 src = os.path.join(tmp_dir, 'foo') 563 dst = os.path.join(tmp_dir, 'bar') 564 write_file(src, 'foo') 565 os.setxattr(src, 'user.foo', b'42') 566 shutil.copy2(src, dst) 567 self.assertEqual( 568 os.getxattr(src, 'user.foo'), 569 os.getxattr(dst, 'user.foo')) 570 os.remove(dst) 571 572 @support.skip_unless_symlink 573 def test_copyfile_symlinks(self): 574 tmp_dir = self.mkdtemp() 575 src = os.path.join(tmp_dir, 'src') 576 dst = os.path.join(tmp_dir, 'dst') 577 dst_link = os.path.join(tmp_dir, 'dst_link') 578 link = os.path.join(tmp_dir, 'link') 579 write_file(src, 'foo') 580 os.symlink(src, link) 581 # don't follow 582 shutil.copyfile(link, dst_link, follow_symlinks=False) 583 self.assertTrue(os.path.islink(dst_link)) 584 self.assertEqual(os.readlink(link), os.readlink(dst_link)) 585 # follow 586 shutil.copyfile(link, dst) 587 self.assertFalse(os.path.islink(dst)) 588 589 def test_rmtree_uses_safe_fd_version_if_available(self): 590 _use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <= 591 os.supports_dir_fd and 592 os.listdir in os.supports_fd and 593 os.stat in os.supports_follow_symlinks) 594 if _use_fd_functions: 595 self.assertTrue(shutil._use_fd_functions) 596 self.assertTrue(shutil.rmtree.avoids_symlink_attacks) 597 tmp_dir = self.mkdtemp() 598 d = os.path.join(tmp_dir, 'a') 599 os.mkdir(d) 600 try: 601 real_rmtree = shutil._rmtree_safe_fd 602 class Called(Exception): pass 603 def _raiser(*args, **kwargs): 604 raise Called 605 shutil._rmtree_safe_fd = _raiser 606 self.assertRaises(Called, shutil.rmtree, d) 607 finally: 608 shutil._rmtree_safe_fd = real_rmtree 609 else: 610 self.assertFalse(shutil._use_fd_functions) 611 self.assertFalse(shutil.rmtree.avoids_symlink_attacks) 612 613 def test_rmtree_dont_delete_file(self): 614 # When called on a file instead of a directory, don't delete it. 615 handle, path = tempfile.mkstemp() 616 os.close(handle) 617 self.assertRaises(NotADirectoryError, shutil.rmtree, path) 618 os.remove(path) 619 620 def test_copytree_simple(self): 621 src_dir = tempfile.mkdtemp() 622 dst_dir = os.path.join(tempfile.mkdtemp(), 'destination') 623 self.addCleanup(shutil.rmtree, src_dir) 624 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir)) 625 write_file((src_dir, 'test.txt'), '123') 626 os.mkdir(os.path.join(src_dir, 'test_dir')) 627 write_file((src_dir, 'test_dir', 'test.txt'), '456') 628 629 shutil.copytree(src_dir, dst_dir) 630 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt'))) 631 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir'))) 632 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir', 633 'test.txt'))) 634 actual = read_file((dst_dir, 'test.txt')) 635 self.assertEqual(actual, '123') 636 actual = read_file((dst_dir, 'test_dir', 'test.txt')) 637 self.assertEqual(actual, '456') 638 639 @support.skip_unless_symlink 640 def test_copytree_symlinks(self): 641 tmp_dir = self.mkdtemp() 642 src_dir = os.path.join(tmp_dir, 'src') 643 dst_dir = os.path.join(tmp_dir, 'dst') 644 sub_dir = os.path.join(src_dir, 'sub') 645 os.mkdir(src_dir) 646 os.mkdir(sub_dir) 647 write_file((src_dir, 'file.txt'), 'foo') 648 src_link = os.path.join(sub_dir, 'link') 649 dst_link = os.path.join(dst_dir, 'sub/link') 650 os.symlink(os.path.join(src_dir, 'file.txt'), 651 src_link) 652 if hasattr(os, 'lchmod'): 653 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO) 654 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'): 655 os.lchflags(src_link, stat.UF_NODUMP) 656 src_stat = os.lstat(src_link) 657 shutil.copytree(src_dir, dst_dir, symlinks=True) 658 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'sub', 'link'))) 659 self.assertEqual(os.readlink(os.path.join(dst_dir, 'sub', 'link')), 660 os.path.join(src_dir, 'file.txt')) 661 dst_stat = os.lstat(dst_link) 662 if hasattr(os, 'lchmod'): 663 self.assertEqual(dst_stat.st_mode, src_stat.st_mode) 664 if hasattr(os, 'lchflags'): 665 self.assertEqual(dst_stat.st_flags, src_stat.st_flags) 666 667 def test_copytree_with_exclude(self): 668 # creating data 669 join = os.path.join 670 exists = os.path.exists 671 src_dir = tempfile.mkdtemp() 672 try: 673 dst_dir = join(tempfile.mkdtemp(), 'destination') 674 write_file((src_dir, 'test.txt'), '123') 675 write_file((src_dir, 'test.tmp'), '123') 676 os.mkdir(join(src_dir, 'test_dir')) 677 write_file((src_dir, 'test_dir', 'test.txt'), '456') 678 os.mkdir(join(src_dir, 'test_dir2')) 679 write_file((src_dir, 'test_dir2', 'test.txt'), '456') 680 os.mkdir(join(src_dir, 'test_dir2', 'subdir')) 681 os.mkdir(join(src_dir, 'test_dir2', 'subdir2')) 682 write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456') 683 write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456') 684 685 # testing glob-like patterns 686 try: 687 patterns = shutil.ignore_patterns('*.tmp', 'test_dir2') 688 shutil.copytree(src_dir, dst_dir, ignore=patterns) 689 # checking the result: some elements should not be copied 690 self.assertTrue(exists(join(dst_dir, 'test.txt'))) 691 self.assertFalse(exists(join(dst_dir, 'test.tmp'))) 692 self.assertFalse(exists(join(dst_dir, 'test_dir2'))) 693 finally: 694 shutil.rmtree(dst_dir) 695 try: 696 patterns = shutil.ignore_patterns('*.tmp', 'subdir*') 697 shutil.copytree(src_dir, dst_dir, ignore=patterns) 698 # checking the result: some elements should not be copied 699 self.assertFalse(exists(join(dst_dir, 'test.tmp'))) 700 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2'))) 701 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir'))) 702 finally: 703 shutil.rmtree(dst_dir) 704 705 # testing callable-style 706 try: 707 def _filter(src, names): 708 res = [] 709 for name in names: 710 path = os.path.join(src, name) 711 712 if (os.path.isdir(path) and 713 path.split()[-1] == 'subdir'): 714 res.append(name) 715 elif os.path.splitext(path)[-1] in ('.py'): 716 res.append(name) 717 return res 718 719 shutil.copytree(src_dir, dst_dir, ignore=_filter) 720 721 # checking the result: some elements should not be copied 722 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2', 723 'test.py'))) 724 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir'))) 725 726 finally: 727 shutil.rmtree(dst_dir) 728 finally: 729 shutil.rmtree(src_dir) 730 shutil.rmtree(os.path.dirname(dst_dir)) 731 732 def test_copytree_retains_permissions(self): 733 tmp_dir = tempfile.mkdtemp() 734 src_dir = os.path.join(tmp_dir, 'source') 735 os.mkdir(src_dir) 736 dst_dir = os.path.join(tmp_dir, 'destination') 737 self.addCleanup(shutil.rmtree, tmp_dir) 738 739 os.chmod(src_dir, 0o777) 740 write_file((src_dir, 'permissive.txt'), '123') 741 os.chmod(os.path.join(src_dir, 'permissive.txt'), 0o777) 742 write_file((src_dir, 'restrictive.txt'), '456') 743 os.chmod(os.path.join(src_dir, 'restrictive.txt'), 0o600) 744 restrictive_subdir = tempfile.mkdtemp(dir=src_dir) 745 os.chmod(restrictive_subdir, 0o600) 746 747 shutil.copytree(src_dir, dst_dir) 748 self.assertEqual(os.stat(src_dir).st_mode, os.stat(dst_dir).st_mode) 749 self.assertEqual(os.stat(os.path.join(src_dir, 'permissive.txt')).st_mode, 750 os.stat(os.path.join(dst_dir, 'permissive.txt')).st_mode) 751 self.assertEqual(os.stat(os.path.join(src_dir, 'restrictive.txt')).st_mode, 752 os.stat(os.path.join(dst_dir, 'restrictive.txt')).st_mode) 753 restrictive_subdir_dst = os.path.join(dst_dir, 754 os.path.split(restrictive_subdir)[1]) 755 self.assertEqual(os.stat(restrictive_subdir).st_mode, 756 os.stat(restrictive_subdir_dst).st_mode) 757 758 @unittest.mock.patch('os.chmod') 759 def test_copytree_winerror(self, mock_patch): 760 # When copying to VFAT, copystat() raises OSError. On Windows, the 761 # exception object has a meaningful 'winerror' attribute, but not 762 # on other operating systems. Do not assume 'winerror' is set. 763 src_dir = tempfile.mkdtemp() 764 dst_dir = os.path.join(tempfile.mkdtemp(), 'destination') 765 self.addCleanup(shutil.rmtree, src_dir) 766 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir)) 767 768 mock_patch.side_effect = PermissionError('ka-boom') 769 with self.assertRaises(shutil.Error): 770 shutil.copytree(src_dir, dst_dir) 771 772 @unittest.skipIf(os.name == 'nt', 'temporarily disabled on Windows') 773 @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link') 774 @unittest.skipIf(android_not_root, "hard links not allowed, non root user") 775 def test_dont_copy_file_onto_link_to_itself(self): 776 # bug 851123. 777 os.mkdir(TESTFN) 778 src = os.path.join(TESTFN, 'cheese') 779 dst = os.path.join(TESTFN, 'shop') 780 try: 781 with open(src, 'w') as f: 782 f.write('cheddar') 783 os.link(src, dst) 784 self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst) 785 with open(src, 'r') as f: 786 self.assertEqual(f.read(), 'cheddar') 787 os.remove(dst) 788 finally: 789 shutil.rmtree(TESTFN, ignore_errors=True) 790 791 @support.skip_unless_symlink 792 def test_dont_copy_file_onto_symlink_to_itself(self): 793 # bug 851123. 794 os.mkdir(TESTFN) 795 src = os.path.join(TESTFN, 'cheese') 796 dst = os.path.join(TESTFN, 'shop') 797 try: 798 with open(src, 'w') as f: 799 f.write('cheddar') 800 # Using `src` here would mean we end up with a symlink pointing 801 # to TESTFN/TESTFN/cheese, while it should point at 802 # TESTFN/cheese. 803 os.symlink('cheese', dst) 804 self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst) 805 with open(src, 'r') as f: 806 self.assertEqual(f.read(), 'cheddar') 807 os.remove(dst) 808 finally: 809 shutil.rmtree(TESTFN, ignore_errors=True) 810 811 @support.skip_unless_symlink 812 def test_rmtree_on_symlink(self): 813 # bug 1669. 814 os.mkdir(TESTFN) 815 try: 816 src = os.path.join(TESTFN, 'cheese') 817 dst = os.path.join(TESTFN, 'shop') 818 os.mkdir(src) 819 os.symlink(src, dst) 820 self.assertRaises(OSError, shutil.rmtree, dst) 821 shutil.rmtree(dst, ignore_errors=True) 822 finally: 823 shutil.rmtree(TESTFN, ignore_errors=True) 824 825 # Issue #3002: copyfile and copytree block indefinitely on named pipes 826 @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()') 827 @unittest.skipIf(android_not_root, "mkfifo not allowed, non root user") 828 def test_copyfile_named_pipe(self): 829 os.mkfifo(TESTFN) 830 try: 831 self.assertRaises(shutil.SpecialFileError, 832 shutil.copyfile, TESTFN, TESTFN2) 833 self.assertRaises(shutil.SpecialFileError, 834 shutil.copyfile, __file__, TESTFN) 835 finally: 836 os.remove(TESTFN) 837 838 @unittest.skipIf(android_not_root, "mkfifo not allowed, non root user") 839 @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()') 840 @support.skip_unless_symlink 841 def test_copytree_named_pipe(self): 842 os.mkdir(TESTFN) 843 try: 844 subdir = os.path.join(TESTFN, "subdir") 845 os.mkdir(subdir) 846 pipe = os.path.join(subdir, "mypipe") 847 os.mkfifo(pipe) 848 try: 849 shutil.copytree(TESTFN, TESTFN2) 850 except shutil.Error as e: 851 errors = e.args[0] 852 self.assertEqual(len(errors), 1) 853 src, dst, error_msg = errors[0] 854 self.assertEqual("`%s` is a named pipe" % pipe, error_msg) 855 else: 856 self.fail("shutil.Error should have been raised") 857 finally: 858 shutil.rmtree(TESTFN, ignore_errors=True) 859 shutil.rmtree(TESTFN2, ignore_errors=True) 860 861 def test_copytree_special_func(self): 862 863 src_dir = self.mkdtemp() 864 dst_dir = os.path.join(self.mkdtemp(), 'destination') 865 write_file((src_dir, 'test.txt'), '123') 866 os.mkdir(os.path.join(src_dir, 'test_dir')) 867 write_file((src_dir, 'test_dir', 'test.txt'), '456') 868 869 copied = [] 870 def _copy(src, dst): 871 copied.append((src, dst)) 872 873 shutil.copytree(src_dir, dst_dir, copy_function=_copy) 874 self.assertEqual(len(copied), 2) 875 876 @support.skip_unless_symlink 877 def test_copytree_dangling_symlinks(self): 878 879 # a dangling symlink raises an error at the end 880 src_dir = self.mkdtemp() 881 dst_dir = os.path.join(self.mkdtemp(), 'destination') 882 os.symlink('IDONTEXIST', os.path.join(src_dir, 'test.txt')) 883 os.mkdir(os.path.join(src_dir, 'test_dir')) 884 write_file((src_dir, 'test_dir', 'test.txt'), '456') 885 self.assertRaises(Error, shutil.copytree, src_dir, dst_dir) 886 887 # a dangling symlink is ignored with the proper flag 888 dst_dir = os.path.join(self.mkdtemp(), 'destination2') 889 shutil.copytree(src_dir, dst_dir, ignore_dangling_symlinks=True) 890 self.assertNotIn('test.txt', os.listdir(dst_dir)) 891 892 # a dangling symlink is copied if symlinks=True 893 dst_dir = os.path.join(self.mkdtemp(), 'destination3') 894 shutil.copytree(src_dir, dst_dir, symlinks=True) 895 self.assertIn('test.txt', os.listdir(dst_dir)) 896 897 @support.skip_unless_symlink 898 def test_copytree_symlink_dir(self): 899 src_dir = self.mkdtemp() 900 dst_dir = os.path.join(self.mkdtemp(), 'destination') 901 os.mkdir(os.path.join(src_dir, 'real_dir')) 902 with open(os.path.join(src_dir, 'real_dir', 'test.txt'), 'w'): 903 pass 904 os.symlink(os.path.join(src_dir, 'real_dir'), 905 os.path.join(src_dir, 'link_to_dir'), 906 target_is_directory=True) 907 908 shutil.copytree(src_dir, dst_dir, symlinks=False) 909 self.assertFalse(os.path.islink(os.path.join(dst_dir, 'link_to_dir'))) 910 self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir'))) 911 912 dst_dir = os.path.join(self.mkdtemp(), 'destination2') 913 shutil.copytree(src_dir, dst_dir, symlinks=True) 914 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'link_to_dir'))) 915 self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir'))) 916 917 def _copy_file(self, method): 918 fname = 'test.txt' 919 tmpdir = self.mkdtemp() 920 write_file((tmpdir, fname), 'xxx') 921 file1 = os.path.join(tmpdir, fname) 922 tmpdir2 = self.mkdtemp() 923 method(file1, tmpdir2) 924 file2 = os.path.join(tmpdir2, fname) 925 return (file1, file2) 926 927 @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod') 928 def test_copy(self): 929 # Ensure that the copied file exists and has the same mode bits. 930 file1, file2 = self._copy_file(shutil.copy) 931 self.assertTrue(os.path.exists(file2)) 932 self.assertEqual(os.stat(file1).st_mode, os.stat(file2).st_mode) 933 934 @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod') 935 @unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime') 936 def test_copy2(self): 937 # Ensure that the copied file exists and has the same mode and 938 # modification time bits. 939 file1, file2 = self._copy_file(shutil.copy2) 940 self.assertTrue(os.path.exists(file2)) 941 file1_stat = os.stat(file1) 942 file2_stat = os.stat(file2) 943 self.assertEqual(file1_stat.st_mode, file2_stat.st_mode) 944 for attr in 'st_atime', 'st_mtime': 945 # The modification times may be truncated in the new file. 946 self.assertLessEqual(getattr(file1_stat, attr), 947 getattr(file2_stat, attr) + 1) 948 if hasattr(os, 'chflags') and hasattr(file1_stat, 'st_flags'): 949 self.assertEqual(getattr(file1_stat, 'st_flags'), 950 getattr(file2_stat, 'st_flags')) 951 952 @support.requires_zlib 953 def test_make_tarball(self): 954 # creating something to tar 955 root_dir, base_dir = self._create_files('') 956 957 tmpdir2 = self.mkdtemp() 958 # force shutil to create the directory 959 os.rmdir(tmpdir2) 960 # working with relative paths 961 work_dir = os.path.dirname(tmpdir2) 962 rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive') 963 964 with support.change_cwd(work_dir): 965 base_name = os.path.abspath(rel_base_name) 966 tarball = make_archive(rel_base_name, 'gztar', root_dir, '.') 967 968 # check if the compressed tarball was created 969 self.assertEqual(tarball, base_name + '.tar.gz') 970 self.assertTrue(os.path.isfile(tarball)) 971 self.assertTrue(tarfile.is_tarfile(tarball)) 972 with tarfile.open(tarball, 'r:gz') as tf: 973 self.assertCountEqual(tf.getnames(), 974 ['.', './sub', './sub2', 975 './file1', './file2', './sub/file3']) 976 977 # trying an uncompressed one 978 with support.change_cwd(work_dir): 979 tarball = make_archive(rel_base_name, 'tar', root_dir, '.') 980 self.assertEqual(tarball, base_name + '.tar') 981 self.assertTrue(os.path.isfile(tarball)) 982 self.assertTrue(tarfile.is_tarfile(tarball)) 983 with tarfile.open(tarball, 'r') as tf: 984 self.assertCountEqual(tf.getnames(), 985 ['.', './sub', './sub2', 986 './file1', './file2', './sub/file3']) 987 988 def _tarinfo(self, path): 989 with tarfile.open(path) as tar: 990 names = tar.getnames() 991 names.sort() 992 return tuple(names) 993 994 def _create_files(self, base_dir='dist'): 995 # creating something to tar 996 root_dir = self.mkdtemp() 997 dist = os.path.join(root_dir, base_dir) 998 os.makedirs(dist, exist_ok=True) 999 write_file((dist, 'file1'), 'xxx') 1000 write_file((dist, 'file2'), 'xxx') 1001 os.mkdir(os.path.join(dist, 'sub')) 1002 write_file((dist, 'sub', 'file3'), 'xxx') 1003 os.mkdir(os.path.join(dist, 'sub2')) 1004 if base_dir: 1005 write_file((root_dir, 'outer'), 'xxx') 1006 return root_dir, base_dir 1007 1008 @support.requires_zlib 1009 @unittest.skipUnless(shutil.which('tar'), 1010 'Need the tar command to run') 1011 def test_tarfile_vs_tar(self): 1012 root_dir, base_dir = self._create_files() 1013 base_name = os.path.join(self.mkdtemp(), 'archive') 1014 tarball = make_archive(base_name, 'gztar', root_dir, base_dir) 1015 1016 # check if the compressed tarball was created 1017 self.assertEqual(tarball, base_name + '.tar.gz') 1018 self.assertTrue(os.path.isfile(tarball)) 1019 1020 # now create another tarball using `tar` 1021 tarball2 = os.path.join(root_dir, 'archive2.tar') 1022 tar_cmd = ['tar', '-cf', 'archive2.tar', base_dir] 1023 subprocess.check_call(tar_cmd, cwd=root_dir, 1024 stdout=subprocess.DEVNULL) 1025 1026 self.assertTrue(os.path.isfile(tarball2)) 1027 # let's compare both tarballs 1028 self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2)) 1029 1030 # trying an uncompressed one 1031 tarball = make_archive(base_name, 'tar', root_dir, base_dir) 1032 self.assertEqual(tarball, base_name + '.tar') 1033 self.assertTrue(os.path.isfile(tarball)) 1034 1035 # now for a dry_run 1036 tarball = make_archive(base_name, 'tar', root_dir, base_dir, 1037 dry_run=True) 1038 self.assertEqual(tarball, base_name + '.tar') 1039 self.assertTrue(os.path.isfile(tarball)) 1040 1041 @support.requires_zlib 1042 def test_make_zipfile(self): 1043 # creating something to zip 1044 root_dir, base_dir = self._create_files() 1045 1046 tmpdir2 = self.mkdtemp() 1047 # force shutil to create the directory 1048 os.rmdir(tmpdir2) 1049 # working with relative paths 1050 work_dir = os.path.dirname(tmpdir2) 1051 rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive') 1052 1053 with support.change_cwd(work_dir): 1054 base_name = os.path.abspath(rel_base_name) 1055 res = make_archive(rel_base_name, 'zip', root_dir) 1056 1057 self.assertEqual(res, base_name + '.zip') 1058 self.assertTrue(os.path.isfile(res)) 1059 self.assertTrue(zipfile.is_zipfile(res)) 1060 with zipfile.ZipFile(res) as zf: 1061 self.assertCountEqual(zf.namelist(), 1062 ['dist/', 'dist/sub/', 'dist/sub2/', 1063 'dist/file1', 'dist/file2', 'dist/sub/file3', 1064 'outer']) 1065 1066 with support.change_cwd(work_dir): 1067 base_name = os.path.abspath(rel_base_name) 1068 res = make_archive(rel_base_name, 'zip', root_dir, base_dir) 1069 1070 self.assertEqual(res, base_name + '.zip') 1071 self.assertTrue(os.path.isfile(res)) 1072 self.assertTrue(zipfile.is_zipfile(res)) 1073 with zipfile.ZipFile(res) as zf: 1074 self.assertCountEqual(zf.namelist(), 1075 ['dist/', 'dist/sub/', 'dist/sub2/', 1076 'dist/file1', 'dist/file2', 'dist/sub/file3']) 1077 1078 @support.requires_zlib 1079 @unittest.skipUnless(shutil.which('zip'), 1080 'Need the zip command to run') 1081 def test_zipfile_vs_zip(self): 1082 root_dir, base_dir = self._create_files() 1083 base_name = os.path.join(self.mkdtemp(), 'archive') 1084 archive = make_archive(base_name, 'zip', root_dir, base_dir) 1085 1086 # check if ZIP file was created 1087 self.assertEqual(archive, base_name + '.zip') 1088 self.assertTrue(os.path.isfile(archive)) 1089 1090 # now create another ZIP file using `zip` 1091 archive2 = os.path.join(root_dir, 'archive2.zip') 1092 zip_cmd = ['zip', '-q', '-r', 'archive2.zip', base_dir] 1093 subprocess.check_call(zip_cmd, cwd=root_dir, 1094 stdout=subprocess.DEVNULL) 1095 1096 self.assertTrue(os.path.isfile(archive2)) 1097 # let's compare both ZIP files 1098 with zipfile.ZipFile(archive) as zf: 1099 names = zf.namelist() 1100 with zipfile.ZipFile(archive2) as zf: 1101 names2 = zf.namelist() 1102 self.assertEqual(sorted(names), sorted(names2)) 1103 1104 @support.requires_zlib 1105 @unittest.skipUnless(shutil.which('unzip'), 1106 'Need the unzip command to run') 1107 def test_unzip_zipfile(self): 1108 root_dir, base_dir = self._create_files() 1109 base_name = os.path.join(self.mkdtemp(), 'archive') 1110 archive = make_archive(base_name, 'zip', root_dir, base_dir) 1111 1112 # check if ZIP file was created 1113 self.assertEqual(archive, base_name + '.zip') 1114 self.assertTrue(os.path.isfile(archive)) 1115 1116 # now check the ZIP file using `unzip -t` 1117 zip_cmd = ['unzip', '-t', archive] 1118 with support.change_cwd(root_dir): 1119 try: 1120 subprocess.check_output(zip_cmd, stderr=subprocess.STDOUT) 1121 except subprocess.CalledProcessError as exc: 1122 details = exc.output.decode(errors="replace") 1123 msg = "{}\n\n**Unzip Output**\n{}" 1124 self.fail(msg.format(exc, details)) 1125 1126 def test_make_archive(self): 1127 tmpdir = self.mkdtemp() 1128 base_name = os.path.join(tmpdir, 'archive') 1129 self.assertRaises(ValueError, make_archive, base_name, 'xxx') 1130 1131 @support.requires_zlib 1132 def test_make_archive_owner_group(self): 1133 # testing make_archive with owner and group, with various combinations 1134 # this works even if there's not gid/uid support 1135 if UID_GID_SUPPORT: 1136 group = grp.getgrgid(0)[0] 1137 owner = pwd.getpwuid(0)[0] 1138 else: 1139 group = owner = 'root' 1140 1141 root_dir, base_dir = self._create_files() 1142 base_name = os.path.join(self.mkdtemp(), 'archive') 1143 res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner, 1144 group=group) 1145 self.assertTrue(os.path.isfile(res)) 1146 1147 res = make_archive(base_name, 'zip', root_dir, base_dir) 1148 self.assertTrue(os.path.isfile(res)) 1149 1150 res = make_archive(base_name, 'tar', root_dir, base_dir, 1151 owner=owner, group=group) 1152 self.assertTrue(os.path.isfile(res)) 1153 1154 res = make_archive(base_name, 'tar', root_dir, base_dir, 1155 owner='kjhkjhkjg', group='oihohoh') 1156 self.assertTrue(os.path.isfile(res)) 1157 1158 1159 @support.requires_zlib 1160 @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support") 1161 def test_tarfile_root_owner(self): 1162 root_dir, base_dir = self._create_files() 1163 base_name = os.path.join(self.mkdtemp(), 'archive') 1164 group = grp.getgrgid(0)[0] 1165 owner = pwd.getpwuid(0)[0] 1166 with support.change_cwd(root_dir): 1167 archive_name = make_archive(base_name, 'gztar', root_dir, 'dist', 1168 owner=owner, group=group) 1169 1170 # check if the compressed tarball was created 1171 self.assertTrue(os.path.isfile(archive_name)) 1172 1173 # now checks the rights 1174 archive = tarfile.open(archive_name) 1175 try: 1176 for member in archive.getmembers(): 1177 self.assertEqual(member.uid, 0) 1178 self.assertEqual(member.gid, 0) 1179 finally: 1180 archive.close() 1181 1182 def test_make_archive_cwd(self): 1183 current_dir = os.getcwd() 1184 def _breaks(*args, **kw): 1185 raise RuntimeError() 1186 1187 register_archive_format('xxx', _breaks, [], 'xxx file') 1188 try: 1189 try: 1190 make_archive('xxx', 'xxx', root_dir=self.mkdtemp()) 1191 except Exception: 1192 pass 1193 self.assertEqual(os.getcwd(), current_dir) 1194 finally: 1195 unregister_archive_format('xxx') 1196 1197 def test_make_tarfile_in_curdir(self): 1198 # Issue #21280 1199 root_dir = self.mkdtemp() 1200 with support.change_cwd(root_dir): 1201 self.assertEqual(make_archive('test', 'tar'), 'test.tar') 1202 self.assertTrue(os.path.isfile('test.tar')) 1203 1204 @support.requires_zlib 1205 def test_make_zipfile_in_curdir(self): 1206 # Issue #21280 1207 root_dir = self.mkdtemp() 1208 with support.change_cwd(root_dir): 1209 self.assertEqual(make_archive('test', 'zip'), 'test.zip') 1210 self.assertTrue(os.path.isfile('test.zip')) 1211 1212 def test_register_archive_format(self): 1213 1214 self.assertRaises(TypeError, register_archive_format, 'xxx', 1) 1215 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x, 1216 1) 1217 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x, 1218 [(1, 2), (1, 2, 3)]) 1219 1220 register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file') 1221 formats = [name for name, params in get_archive_formats()] 1222 self.assertIn('xxx', formats) 1223 1224 unregister_archive_format('xxx') 1225 formats = [name for name, params in get_archive_formats()] 1226 self.assertNotIn('xxx', formats) 1227 1228 def check_unpack_archive(self, format): 1229 root_dir, base_dir = self._create_files() 1230 expected = rlistdir(root_dir) 1231 expected.remove('outer') 1232 1233 base_name = os.path.join(self.mkdtemp(), 'archive') 1234 filename = make_archive(base_name, format, root_dir, base_dir) 1235 1236 # let's try to unpack it now 1237 tmpdir2 = self.mkdtemp() 1238 unpack_archive(filename, tmpdir2) 1239 self.assertEqual(rlistdir(tmpdir2), expected) 1240 1241 # and again, this time with the format specified 1242 tmpdir3 = self.mkdtemp() 1243 unpack_archive(filename, tmpdir3, format=format) 1244 self.assertEqual(rlistdir(tmpdir3), expected) 1245 1246 self.assertRaises(shutil.ReadError, unpack_archive, TESTFN) 1247 self.assertRaises(ValueError, unpack_archive, TESTFN, format='xxx') 1248 1249 def test_unpack_archive_tar(self): 1250 self.check_unpack_archive('tar') 1251 1252 @support.requires_zlib 1253 def test_unpack_archive_gztar(self): 1254 self.check_unpack_archive('gztar') 1255 1256 @support.requires_bz2 1257 def test_unpack_archive_bztar(self): 1258 self.check_unpack_archive('bztar') 1259 1260 @support.requires_lzma 1261 def test_unpack_archive_xztar(self): 1262 self.check_unpack_archive('xztar') 1263 1264 @support.requires_zlib 1265 def test_unpack_archive_zip(self): 1266 self.check_unpack_archive('zip') 1267 1268 def test_unpack_registry(self): 1269 1270 formats = get_unpack_formats() 1271 1272 def _boo(filename, extract_dir, extra): 1273 self.assertEqual(extra, 1) 1274 self.assertEqual(filename, 'stuff.boo') 1275 self.assertEqual(extract_dir, 'xx') 1276 1277 register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)]) 1278 unpack_archive('stuff.boo', 'xx') 1279 1280 # trying to register a .boo unpacker again 1281 self.assertRaises(RegistryError, register_unpack_format, 'Boo2', 1282 ['.boo'], _boo) 1283 1284 # should work now 1285 unregister_unpack_format('Boo') 1286 register_unpack_format('Boo2', ['.boo'], _boo) 1287 self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats()) 1288 self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats()) 1289 1290 # let's leave a clean state 1291 unregister_unpack_format('Boo2') 1292 self.assertEqual(get_unpack_formats(), formats) 1293 1294 @unittest.skipUnless(hasattr(shutil, 'disk_usage'), 1295 "disk_usage not available on this platform") 1296 def test_disk_usage(self): 1297 usage = shutil.disk_usage(os.getcwd()) 1298 self.assertGreater(usage.total, 0) 1299 self.assertGreater(usage.used, 0) 1300 self.assertGreaterEqual(usage.free, 0) 1301 self.assertGreaterEqual(usage.total, usage.used) 1302 self.assertGreater(usage.total, usage.free) 1303 1304 @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support") 1305 @unittest.skipUnless(hasattr(os, 'chown'), 'requires os.chown') 1306 def test_chown(self): 1307 1308 # cleaned-up automatically by TestShutil.tearDown method 1309 dirname = self.mkdtemp() 1310 filename = tempfile.mktemp(dir=dirname) 1311 write_file(filename, 'testing chown function') 1312 1313 with self.assertRaises(ValueError): 1314 shutil.chown(filename) 1315 1316 with self.assertRaises(LookupError): 1317 shutil.chown(filename, user='non-existing username') 1318 1319 with self.assertRaises(LookupError): 1320 shutil.chown(filename, group='non-existing groupname') 1321 1322 with self.assertRaises(TypeError): 1323 shutil.chown(filename, b'spam') 1324 1325 with self.assertRaises(TypeError): 1326 shutil.chown(filename, 3.14) 1327 1328 uid = os.getuid() 1329 gid = os.getgid() 1330 1331 def check_chown(path, uid=None, gid=None): 1332 s = os.stat(filename) 1333 if uid is not None: 1334 self.assertEqual(uid, s.st_uid) 1335 if gid is not None: 1336 self.assertEqual(gid, s.st_gid) 1337 1338 shutil.chown(filename, uid, gid) 1339 check_chown(filename, uid, gid) 1340 shutil.chown(filename, uid) 1341 check_chown(filename, uid) 1342 shutil.chown(filename, user=uid) 1343 check_chown(filename, uid) 1344 shutil.chown(filename, group=gid) 1345 check_chown(filename, gid=gid) 1346 1347 shutil.chown(dirname, uid, gid) 1348 check_chown(dirname, uid, gid) 1349 shutil.chown(dirname, uid) 1350 check_chown(dirname, uid) 1351 shutil.chown(dirname, user=uid) 1352 check_chown(dirname, uid) 1353 shutil.chown(dirname, group=gid) 1354 check_chown(dirname, gid=gid) 1355 1356 user = pwd.getpwuid(uid)[0] 1357 group = grp.getgrgid(gid)[0] 1358 shutil.chown(filename, user, group) 1359 check_chown(filename, uid, gid) 1360 shutil.chown(dirname, user, group) 1361 check_chown(dirname, uid, gid) 1362 1363 def test_copy_return_value(self): 1364 # copy and copy2 both return their destination path. 1365 for fn in (shutil.copy, shutil.copy2): 1366 src_dir = self.mkdtemp() 1367 dst_dir = self.mkdtemp() 1368 src = os.path.join(src_dir, 'foo') 1369 write_file(src, 'foo') 1370 rv = fn(src, dst_dir) 1371 self.assertEqual(rv, os.path.join(dst_dir, 'foo')) 1372 rv = fn(src, os.path.join(dst_dir, 'bar')) 1373 self.assertEqual(rv, os.path.join(dst_dir, 'bar')) 1374 1375 def test_copyfile_return_value(self): 1376 # copytree returns its destination path. 1377 src_dir = self.mkdtemp() 1378 dst_dir = self.mkdtemp() 1379 dst_file = os.path.join(dst_dir, 'bar') 1380 src_file = os.path.join(src_dir, 'foo') 1381 write_file(src_file, 'foo') 1382 rv = shutil.copyfile(src_file, dst_file) 1383 self.assertTrue(os.path.exists(rv)) 1384 self.assertEqual(read_file(src_file), read_file(dst_file)) 1385 1386 def test_copyfile_same_file(self): 1387 # copyfile() should raise SameFileError if the source and destination 1388 # are the same. 1389 src_dir = self.mkdtemp() 1390 src_file = os.path.join(src_dir, 'foo') 1391 write_file(src_file, 'foo') 1392 self.assertRaises(SameFileError, shutil.copyfile, src_file, src_file) 1393 # But Error should work too, to stay backward compatible. 1394 self.assertRaises(Error, shutil.copyfile, src_file, src_file) 1395 1396 def test_copytree_return_value(self): 1397 # copytree returns its destination path. 1398 src_dir = self.mkdtemp() 1399 dst_dir = src_dir + "dest" 1400 self.addCleanup(shutil.rmtree, dst_dir, True) 1401 src = os.path.join(src_dir, 'foo') 1402 write_file(src, 'foo') 1403 rv = shutil.copytree(src_dir, dst_dir) 1404 self.assertEqual(['foo'], os.listdir(rv)) 1405 1406 1407 class TestWhich(unittest.TestCase): 1408 1409 def setUp(self): 1410 self.temp_dir = tempfile.mkdtemp(prefix="Tmp") 1411 self.addCleanup(shutil.rmtree, self.temp_dir, True) 1412 # Give the temp_file an ".exe" suffix for all. 1413 # It's needed on Windows and not harmful on other platforms. 1414 self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir, 1415 prefix="Tmp", 1416 suffix=".Exe") 1417 os.chmod(self.temp_file.name, stat.S_IXUSR) 1418 self.addCleanup(self.temp_file.close) 1419 self.dir, self.file = os.path.split(self.temp_file.name) 1420 1421 def test_basic(self): 1422 # Given an EXE in a directory, it should be returned. 1423 rv = shutil.which(self.file, path=self.dir) 1424 self.assertEqual(rv, self.temp_file.name) 1425 1426 def test_absolute_cmd(self): 1427 # When given the fully qualified path to an executable that exists, 1428 # it should be returned. 1429 rv = shutil.which(self.temp_file.name, path=self.temp_dir) 1430 self.assertEqual(rv, self.temp_file.name) 1431 1432 def test_relative_cmd(self): 1433 # When given the relative path with a directory part to an executable 1434 # that exists, it should be returned. 1435 base_dir, tail_dir = os.path.split(self.dir) 1436 relpath = os.path.join(tail_dir, self.file) 1437 with support.change_cwd(path=base_dir): 1438 rv = shutil.which(relpath, path=self.temp_dir) 1439 self.assertEqual(rv, relpath) 1440 # But it shouldn't be searched in PATH directories (issue #16957). 1441 with support.change_cwd(path=self.dir): 1442 rv = shutil.which(relpath, path=base_dir) 1443 self.assertIsNone(rv) 1444 1445 def test_cwd(self): 1446 # Issue #16957 1447 base_dir = os.path.dirname(self.dir) 1448 with support.change_cwd(path=self.dir): 1449 rv = shutil.which(self.file, path=base_dir) 1450 if sys.platform == "win32": 1451 # Windows: current directory implicitly on PATH 1452 self.assertEqual(rv, os.path.join(os.curdir, self.file)) 1453 else: 1454 # Other platforms: shouldn't match in the current directory. 1455 self.assertIsNone(rv) 1456 1457 @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0, 1458 'non-root user required') 1459 def test_non_matching_mode(self): 1460 # Set the file read-only and ask for writeable files. 1461 os.chmod(self.temp_file.name, stat.S_IREAD) 1462 if os.access(self.temp_file.name, os.W_OK): 1463 self.skipTest("can't set the file read-only") 1464 rv = shutil.which(self.file, path=self.dir, mode=os.W_OK) 1465 self.assertIsNone(rv) 1466 1467 def test_relative_path(self): 1468 base_dir, tail_dir = os.path.split(self.dir) 1469 with support.change_cwd(path=base_dir): 1470 rv = shutil.which(self.file, path=tail_dir) 1471 self.assertEqual(rv, os.path.join(tail_dir, self.file)) 1472 1473 def test_nonexistent_file(self): 1474 # Return None when no matching executable file is found on the path. 1475 rv = shutil.which("foo.exe", path=self.dir) 1476 self.assertIsNone(rv) 1477 1478 @unittest.skipUnless(sys.platform == "win32", 1479 "pathext check is Windows-only") 1480 def test_pathext_checking(self): 1481 # Ask for the file without the ".exe" extension, then ensure that 1482 # it gets found properly with the extension. 1483 rv = shutil.which(self.file[:-4], path=self.dir) 1484 self.assertEqual(rv, self.temp_file.name[:-4] + ".EXE") 1485 1486 def test_environ_path(self): 1487 with support.EnvironmentVarGuard() as env: 1488 env['PATH'] = self.dir 1489 rv = shutil.which(self.file) 1490 self.assertEqual(rv, self.temp_file.name) 1491 1492 def test_empty_path(self): 1493 base_dir = os.path.dirname(self.dir) 1494 with support.change_cwd(path=self.dir), \ 1495 support.EnvironmentVarGuard() as env: 1496 env['PATH'] = self.dir 1497 rv = shutil.which(self.file, path='') 1498 self.assertIsNone(rv) 1499 1500 def test_empty_path_no_PATH(self): 1501 with support.EnvironmentVarGuard() as env: 1502 env.pop('PATH', None) 1503 rv = shutil.which(self.file) 1504 self.assertIsNone(rv) 1505 1506 1507 class TestMove(unittest.TestCase): 1508 1509 def setUp(self): 1510 filename = "foo" 1511 self.src_dir = tempfile.mkdtemp() 1512 self.dst_dir = tempfile.mkdtemp() 1513 self.src_file = os.path.join(self.src_dir, filename) 1514 self.dst_file = os.path.join(self.dst_dir, filename) 1515 with open(self.src_file, "wb") as f: 1516 f.write(b"spam") 1517 1518 def tearDown(self): 1519 for d in (self.src_dir, self.dst_dir): 1520 try: 1521 if d: 1522 shutil.rmtree(d) 1523 except: 1524 pass 1525 1526 def _check_move_file(self, src, dst, real_dst): 1527 with open(src, "rb") as f: 1528 contents = f.read() 1529 shutil.move(src, dst) 1530 with open(real_dst, "rb") as f: 1531 self.assertEqual(contents, f.read()) 1532 self.assertFalse(os.path.exists(src)) 1533 1534 def _check_move_dir(self, src, dst, real_dst): 1535 contents = sorted(os.listdir(src)) 1536 shutil.move(src, dst) 1537 self.assertEqual(contents, sorted(os.listdir(real_dst))) 1538 self.assertFalse(os.path.exists(src)) 1539 1540 def test_move_file(self): 1541 # Move a file to another location on the same filesystem. 1542 self._check_move_file(self.src_file, self.dst_file, self.dst_file) 1543 1544 def test_move_file_to_dir(self): 1545 # Move a file inside an existing dir on the same filesystem. 1546 self._check_move_file(self.src_file, self.dst_dir, self.dst_file) 1547 1548 @mock_rename 1549 def test_move_file_other_fs(self): 1550 # Move a file to an existing dir on another filesystem. 1551 self.test_move_file() 1552 1553 @mock_rename 1554 def test_move_file_to_dir_other_fs(self): 1555 # Move a file to another location on another filesystem. 1556 self.test_move_file_to_dir() 1557 1558 def test_move_dir(self): 1559 # Move a dir to another location on the same filesystem. 1560 dst_dir = tempfile.mktemp() 1561 try: 1562 self._check_move_dir(self.src_dir, dst_dir, dst_dir) 1563 finally: 1564 try: 1565 shutil.rmtree(dst_dir) 1566 except: 1567 pass 1568 1569 @mock_rename 1570 def test_move_dir_other_fs(self): 1571 # Move a dir to another location on another filesystem. 1572 self.test_move_dir() 1573 1574 def test_move_dir_to_dir(self): 1575 # Move a dir inside an existing dir on the same filesystem. 1576 self._check_move_dir(self.src_dir, self.dst_dir, 1577 os.path.join(self.dst_dir, os.path.basename(self.src_dir))) 1578 1579 @mock_rename 1580 def test_move_dir_to_dir_other_fs(self): 1581 # Move a dir inside an existing dir on another filesystem. 1582 self.test_move_dir_to_dir() 1583 1584 def test_move_dir_sep_to_dir(self): 1585 self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir, 1586 os.path.join(self.dst_dir, os.path.basename(self.src_dir))) 1587 1588 @unittest.skipUnless(os.path.altsep, 'requires os.path.altsep') 1589 def test_move_dir_altsep_to_dir(self): 1590 self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir, 1591 os.path.join(self.dst_dir, os.path.basename(self.src_dir))) 1592 1593 def test_existing_file_inside_dest_dir(self): 1594 # A file with the same name inside the destination dir already exists. 1595 with open(self.dst_file, "wb"): 1596 pass 1597 self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir) 1598 1599 def test_dont_move_dir_in_itself(self): 1600 # Moving a dir inside itself raises an Error. 1601 dst = os.path.join(self.src_dir, "bar") 1602 self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst) 1603 1604 def test_destinsrc_false_negative(self): 1605 os.mkdir(TESTFN) 1606 try: 1607 for src, dst in [('srcdir', 'srcdir/dest')]: 1608 src = os.path.join(TESTFN, src) 1609 dst = os.path.join(TESTFN, dst) 1610 self.assertTrue(shutil._destinsrc(src, dst), 1611 msg='_destinsrc() wrongly concluded that ' 1612 'dst (%s) is not in src (%s)' % (dst, src)) 1613 finally: 1614 shutil.rmtree(TESTFN, ignore_errors=True) 1615 1616 def test_destinsrc_false_positive(self): 1617 os.mkdir(TESTFN) 1618 try: 1619 for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]: 1620 src = os.path.join(TESTFN, src) 1621 dst = os.path.join(TESTFN, dst) 1622 self.assertFalse(shutil._destinsrc(src, dst), 1623 msg='_destinsrc() wrongly concluded that ' 1624 'dst (%s) is in src (%s)' % (dst, src)) 1625 finally: 1626 shutil.rmtree(TESTFN, ignore_errors=True) 1627 1628 @support.skip_unless_symlink 1629 @mock_rename 1630 def test_move_file_symlink(self): 1631 dst = os.path.join(self.src_dir, 'bar') 1632 os.symlink(self.src_file, dst) 1633 shutil.move(dst, self.dst_file) 1634 self.assertTrue(os.path.islink(self.dst_file)) 1635 self.assertTrue(os.path.samefile(self.src_file, self.dst_file)) 1636 1637 @support.skip_unless_symlink 1638 @mock_rename 1639 def test_move_file_symlink_to_dir(self): 1640 filename = "bar" 1641 dst = os.path.join(self.src_dir, filename) 1642 os.symlink(self.src_file, dst) 1643 shutil.move(dst, self.dst_dir) 1644 final_link = os.path.join(self.dst_dir, filename) 1645 self.assertTrue(os.path.islink(final_link)) 1646 self.assertTrue(os.path.samefile(self.src_file, final_link)) 1647 1648 @support.skip_unless_symlink 1649 @mock_rename 1650 def test_move_dangling_symlink(self): 1651 src = os.path.join(self.src_dir, 'baz') 1652 dst = os.path.join(self.src_dir, 'bar') 1653 os.symlink(src, dst) 1654 dst_link = os.path.join(self.dst_dir, 'quux') 1655 shutil.move(dst, dst_link) 1656 self.assertTrue(os.path.islink(dst_link)) 1657 # On Windows, os.path.realpath does not follow symlinks (issue #9949) 1658 if os.name == 'nt': 1659 self.assertEqual(os.path.realpath(src), os.readlink(dst_link)) 1660 else: 1661 self.assertEqual(os.path.realpath(src), os.path.realpath(dst_link)) 1662 1663 @support.skip_unless_symlink 1664 @mock_rename 1665 def test_move_dir_symlink(self): 1666 src = os.path.join(self.src_dir, 'baz') 1667 dst = os.path.join(self.src_dir, 'bar') 1668 os.mkdir(src) 1669 os.symlink(src, dst) 1670 dst_link = os.path.join(self.dst_dir, 'quux') 1671 shutil.move(dst, dst_link) 1672 self.assertTrue(os.path.islink(dst_link)) 1673 self.assertTrue(os.path.samefile(src, dst_link)) 1674 1675 def test_move_return_value(self): 1676 rv = shutil.move(self.src_file, self.dst_dir) 1677 self.assertEqual(rv, 1678 os.path.join(self.dst_dir, os.path.basename(self.src_file))) 1679 1680 def test_move_as_rename_return_value(self): 1681 rv = shutil.move(self.src_file, os.path.join(self.dst_dir, 'bar')) 1682 self.assertEqual(rv, os.path.join(self.dst_dir, 'bar')) 1683 1684 @mock_rename 1685 def test_move_file_special_function(self): 1686 moved = [] 1687 def _copy(src, dst): 1688 moved.append((src, dst)) 1689 shutil.move(self.src_file, self.dst_dir, copy_function=_copy) 1690 self.assertEqual(len(moved), 1) 1691 1692 @mock_rename 1693 def test_move_dir_special_function(self): 1694 moved = [] 1695 def _copy(src, dst): 1696 moved.append((src, dst)) 1697 support.create_empty_file(os.path.join(self.src_dir, 'child')) 1698 support.create_empty_file(os.path.join(self.src_dir, 'child1')) 1699 shutil.move(self.src_dir, self.dst_dir, copy_function=_copy) 1700 self.assertEqual(len(moved), 3) 1701 1702 1703 class TestCopyFile(unittest.TestCase): 1704 1705 _delete = False 1706 1707 class Faux(object): 1708 _entered = False 1709 _exited_with = None 1710 _raised = False 1711 def __init__(self, raise_in_exit=False, suppress_at_exit=True): 1712 self._raise_in_exit = raise_in_exit 1713 self._suppress_at_exit = suppress_at_exit 1714 def read(self, *args): 1715 return '' 1716 def __enter__(self): 1717 self._entered = True 1718 def __exit__(self, exc_type, exc_val, exc_tb): 1719 self._exited_with = exc_type, exc_val, exc_tb 1720 if self._raise_in_exit: 1721 self._raised = True 1722 raise OSError("Cannot close") 1723 return self._suppress_at_exit 1724 1725 def tearDown(self): 1726 if self._delete: 1727 del shutil.open 1728 1729 def _set_shutil_open(self, func): 1730 shutil.open = func 1731 self._delete = True 1732 1733 def test_w_source_open_fails(self): 1734 def _open(filename, mode='r'): 1735 if filename == 'srcfile': 1736 raise OSError('Cannot open "srcfile"') 1737 assert 0 # shouldn't reach here. 1738 1739 self._set_shutil_open(_open) 1740 1741 self.assertRaises(OSError, shutil.copyfile, 'srcfile', 'destfile') 1742 1743 def test_w_dest_open_fails(self): 1744 1745 srcfile = self.Faux() 1746 1747 def _open(filename, mode='r'): 1748 if filename == 'srcfile': 1749 return srcfile 1750 if filename == 'destfile': 1751 raise OSError('Cannot open "destfile"') 1752 assert 0 # shouldn't reach here. 1753 1754 self._set_shutil_open(_open) 1755 1756 shutil.copyfile('srcfile', 'destfile') 1757 self.assertTrue(srcfile._entered) 1758 self.assertTrue(srcfile._exited_with[0] is OSError) 1759 self.assertEqual(srcfile._exited_with[1].args, 1760 ('Cannot open "destfile"',)) 1761 1762 def test_w_dest_close_fails(self): 1763 1764 srcfile = self.Faux() 1765 destfile = self.Faux(True) 1766 1767 def _open(filename, mode='r'): 1768 if filename == 'srcfile': 1769 return srcfile 1770 if filename == 'destfile': 1771 return destfile 1772 assert 0 # shouldn't reach here. 1773 1774 self._set_shutil_open(_open) 1775 1776 shutil.copyfile('srcfile', 'destfile') 1777 self.assertTrue(srcfile._entered) 1778 self.assertTrue(destfile._entered) 1779 self.assertTrue(destfile._raised) 1780 self.assertTrue(srcfile._exited_with[0] is OSError) 1781 self.assertEqual(srcfile._exited_with[1].args, 1782 ('Cannot close',)) 1783 1784 def test_w_source_close_fails(self): 1785 1786 srcfile = self.Faux(True) 1787 destfile = self.Faux() 1788 1789 def _open(filename, mode='r'): 1790 if filename == 'srcfile': 1791 return srcfile 1792 if filename == 'destfile': 1793 return destfile 1794 assert 0 # shouldn't reach here. 1795 1796 self._set_shutil_open(_open) 1797 1798 self.assertRaises(OSError, 1799 shutil.copyfile, 'srcfile', 'destfile') 1800 self.assertTrue(srcfile._entered) 1801 self.assertTrue(destfile._entered) 1802 self.assertFalse(destfile._raised) 1803 self.assertTrue(srcfile._exited_with[0] is None) 1804 self.assertTrue(srcfile._raised) 1805 1806 def test_move_dir_caseinsensitive(self): 1807 # Renames a folder to the same name 1808 # but a different case. 1809 1810 self.src_dir = tempfile.mkdtemp() 1811 self.addCleanup(shutil.rmtree, self.src_dir, True) 1812 dst_dir = os.path.join( 1813 os.path.dirname(self.src_dir), 1814 os.path.basename(self.src_dir).upper()) 1815 self.assertNotEqual(self.src_dir, dst_dir) 1816 1817 try: 1818 shutil.move(self.src_dir, dst_dir) 1819 self.assertTrue(os.path.isdir(dst_dir)) 1820 finally: 1821 os.rmdir(dst_dir) 1822 1823 class TermsizeTests(unittest.TestCase): 1824 def test_does_not_crash(self): 1825 """Check if get_terminal_size() returns a meaningful value. 1826 1827 There's no easy portable way to actually check the size of the 1828 terminal, so let's check if it returns something sensible instead. 1829 """ 1830 size = shutil.get_terminal_size() 1831 self.assertGreaterEqual(size.columns, 0) 1832 self.assertGreaterEqual(size.lines, 0) 1833 1834 def test_os_environ_first(self): 1835 "Check if environment variables have precedence" 1836 1837 with support.EnvironmentVarGuard() as env: 1838 env['COLUMNS'] = '777' 1839 del env['LINES'] 1840 size = shutil.get_terminal_size() 1841 self.assertEqual(size.columns, 777) 1842 1843 with support.EnvironmentVarGuard() as env: 1844 del env['COLUMNS'] 1845 env['LINES'] = '888' 1846 size = shutil.get_terminal_size() 1847 self.assertEqual(size.lines, 888) 1848 1849 def test_bad_environ(self): 1850 with support.EnvironmentVarGuard() as env: 1851 env['COLUMNS'] = 'xxx' 1852 env['LINES'] = 'yyy' 1853 size = shutil.get_terminal_size() 1854 self.assertGreaterEqual(size.columns, 0) 1855 self.assertGreaterEqual(size.lines, 0) 1856 1857 @unittest.skipUnless(os.isatty(sys.__stdout__.fileno()), "not on tty") 1858 @unittest.skipUnless(hasattr(os, 'get_terminal_size'), 1859 'need os.get_terminal_size()') 1860 def test_stty_match(self): 1861 """Check if stty returns the same results ignoring env 1862 1863 This test will fail if stdin and stdout are connected to 1864 different terminals with different sizes. Nevertheless, such 1865 situations should be pretty rare. 1866 """ 1867 try: 1868 size = subprocess.check_output(['stty', 'size']).decode().split() 1869 except (FileNotFoundError, PermissionError, 1870 subprocess.CalledProcessError): 1871 self.skipTest("stty invocation failed") 1872 expected = (int(size[1]), int(size[0])) # reversed order 1873 1874 with support.EnvironmentVarGuard() as env: 1875 del env['LINES'] 1876 del env['COLUMNS'] 1877 actual = shutil.get_terminal_size() 1878 1879 self.assertEqual(expected, actual) 1880 1881 def test_fallback(self): 1882 with support.EnvironmentVarGuard() as env: 1883 del env['LINES'] 1884 del env['COLUMNS'] 1885 1886 # sys.__stdout__ has no fileno() 1887 with support.swap_attr(sys, '__stdout__', None): 1888 size = shutil.get_terminal_size(fallback=(10, 20)) 1889 self.assertEqual(size.columns, 10) 1890 self.assertEqual(size.lines, 20) 1891 1892 # sys.__stdout__ is not a terminal on Unix 1893 # or fileno() not in (0, 1, 2) on Windows 1894 with open(os.devnull, 'w') as f, \ 1895 support.swap_attr(sys, '__stdout__', f): 1896 size = shutil.get_terminal_size(fallback=(30, 40)) 1897 self.assertEqual(size.columns, 30) 1898 self.assertEqual(size.lines, 40) 1899 1900 1901 class PublicAPITests(unittest.TestCase): 1902 """Ensures that the correct values are exposed in the public API.""" 1903 1904 def test_module_all_attribute(self): 1905 self.assertTrue(hasattr(shutil, '__all__')) 1906 target_api = ['copyfileobj', 'copyfile', 'copymode', 'copystat', 1907 'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error', 1908 'SpecialFileError', 'ExecError', 'make_archive', 1909 'get_archive_formats', 'register_archive_format', 1910 'unregister_archive_format', 'get_unpack_formats', 1911 'register_unpack_format', 'unregister_unpack_format', 1912 'unpack_archive', 'ignore_patterns', 'chown', 'which', 1913 'get_terminal_size', 'SameFileError'] 1914 if hasattr(os, 'statvfs') or os.name == 'nt': 1915 target_api.append('disk_usage') 1916 self.assertEqual(set(shutil.__all__), set(target_api)) 1917 1918 1919 if __name__ == '__main__': 1920 unittest.main() 1921