1 # As a test suite for the os module, this is woefully inadequate, but this 2 # does add tests for a few functions which have been determined to be more 3 # portable than they had been thought to be. 4 5 import asynchat 6 import asyncore 7 import codecs 8 import contextlib 9 import decimal 10 import errno 11 import fractions 12 import getpass 13 import itertools 14 import locale 15 import mmap 16 import os 17 import pickle 18 import re 19 import shutil 20 import signal 21 import socket 22 import stat 23 import subprocess 24 import sys 25 import sysconfig 26 import time 27 import unittest 28 import uuid 29 import warnings 30 from test import support 31 try: 32 import threading 33 except ImportError: 34 threading = None 35 try: 36 import resource 37 except ImportError: 38 resource = None 39 try: 40 import fcntl 41 except ImportError: 42 fcntl = None 43 try: 44 import _winapi 45 except ImportError: 46 _winapi = None 47 try: 48 import grp 49 groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem] 50 if hasattr(os, 'getgid'): 51 process_gid = os.getgid() 52 if process_gid not in groups: 53 groups.append(process_gid) 54 except ImportError: 55 groups = [] 56 try: 57 import pwd 58 all_users = [u.pw_uid for u in pwd.getpwall()] 59 except (ImportError, AttributeError): 60 all_users = [] 61 try: 62 from _testcapi import INT_MAX, PY_SSIZE_T_MAX 63 except ImportError: 64 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize 65 66 from test.support.script_helper import assert_python_ok 67 from test.support import unix_shell 68 69 70 root_in_posix = False 71 if hasattr(os, 'geteuid'): 72 root_in_posix = (os.geteuid() == 0) 73 74 # Detect whether we're on a Linux system that uses the (now outdated 75 # and unmaintained) linuxthreads threading library. There's an issue 76 # when combining linuxthreads with a failed execv call: see 77 # http://bugs.python.org/issue4970. 
if hasattr(sys, 'thread_info') and sys.thread_info.version:
    USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
else:
    USING_LINUXTHREADS = False

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0


@contextlib.contextmanager
def ignore_deprecation_warnings(msg_regex, quiet=False):
    """Run the enclosed block under support.check_warnings().

    The filter passed is ``(msg_regex, DeprecationWarning)``; *quiet* is
    forwarded unchanged to check_warnings().
    """
    with support.check_warnings((msg_regex, DeprecationWarning), quiet=quiet):
        yield


def requires_os_func(name):
    """Return a decorator skipping the test unless ``os`` has attribute *name*."""
    return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)


class _PathLike(os.PathLike):
    """Minimal os.PathLike wrapper used to feed path-like objects to os APIs.

    If the wrapped value is an exception instance, __fspath__() raises it
    instead of returning it.
    """

    def __init__(self, path=""):
        self.path = path

    def __str__(self):
        return str(self.path)

    def __fspath__(self):
        if isinstance(self.path, BaseException):
            raise self.path
        return self.path


def create_file(filename, content=b'content'):
    """Create *filename* and write *content* (bytes) to it.

    The file is opened unbuffered in exclusive-creation mode ("xb"), so the
    call fails if the file already exists.
    """
    with open(filename, "xb", 0) as fp:
        fp.write(content)


# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for basic file-descriptor level functions of the os module."""

    def setUp(self):
        # Start (and finish, see tearDown) without a leftover TESTFN.
        if os.path.lexists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        """A freshly created file is reported as writable by os.access()."""
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        """os.closerange() closes open fds and tolerates closed ones."""
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        """A failing os.rename() must not leak a reference to its argument."""
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        """os.read() returns exactly the bytes written, as a bytes object."""
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b'test')

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Always release the descriptor, even when a check above fails.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run *args* as a command in a new console; it must exit with 0."""
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        """Open TESTFN read-only, wrap the fd with os.fdopen(*args), close it."""
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        """os.fdopen() accepts no extra args, a mode, and a mode + bufsize."""
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        """os.replace() overwrites an existing destination file."""
        TESTFN2 = support.TESTFN + ".2"
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(support.unlink, TESTFN2)

        create_file(support.TESTFN, b"1")
        create_file(TESTFN2, b"2")

        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        """os.open() accepts all of its parameters as keywords."""
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
                    dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        """os.symlink() accepts all of its parameters as keywords."""
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=support.TESTFN,
                    target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user


# Test attributes on return values from os.*stat* family.
259 class StatAttributeTests(unittest.TestCase): 260 def setUp(self): 261 self.fname = support.TESTFN 262 self.addCleanup(support.unlink, self.fname) 263 create_file(self.fname, b"ABC") 264 265 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()') 266 def check_stat_attributes(self, fname): 267 result = os.stat(fname) 268 269 # Make sure direct access works 270 self.assertEqual(result[stat.ST_SIZE], 3) 271 self.assertEqual(result.st_size, 3) 272 273 # Make sure all the attributes are there 274 members = dir(result) 275 for name in dir(stat): 276 if name[:3] == 'ST_': 277 attr = name.lower() 278 if name.endswith("TIME"): 279 def trunc(x): return int(x) 280 else: 281 def trunc(x): return x 282 self.assertEqual(trunc(getattr(result, attr)), 283 result[getattr(stat, name)]) 284 self.assertIn(attr, members) 285 286 # Make sure that the st_?time and st_?time_ns fields roughly agree 287 # (they should always agree up to around tens-of-microseconds) 288 for name in 'st_atime st_mtime st_ctime'.split(): 289 floaty = int(getattr(result, name) * 100000) 290 nanosecondy = getattr(result, name + "_ns") // 10000 291 self.assertAlmostEqual(floaty, nanosecondy, delta=2) 292 293 try: 294 result[200] 295 self.fail("No exception raised") 296 except IndexError: 297 pass 298 299 # Make sure that assignment fails 300 try: 301 result.st_mode = 1 302 self.fail("No exception raised") 303 except AttributeError: 304 pass 305 306 try: 307 result.st_rdev = 1 308 self.fail("No exception raised") 309 except (AttributeError, TypeError): 310 pass 311 312 try: 313 result.parrot = 1 314 self.fail("No exception raised") 315 except AttributeError: 316 pass 317 318 # Use the stat_result constructor with a too-short tuple. 319 try: 320 result2 = os.stat_result((10,)) 321 self.fail("No exception raised") 322 except TypeError: 323 pass 324 325 # Use the constructor with a too-long tuple. 
326 try: 327 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14)) 328 except TypeError: 329 pass 330 331 def test_stat_attributes(self): 332 self.check_stat_attributes(self.fname) 333 334 def test_stat_attributes_bytes(self): 335 try: 336 fname = self.fname.encode(sys.getfilesystemencoding()) 337 except UnicodeEncodeError: 338 self.skipTest("cannot encode %a for the filesystem" % self.fname) 339 self.check_stat_attributes(fname) 340 341 def test_stat_result_pickle(self): 342 result = os.stat(self.fname) 343 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 344 p = pickle.dumps(result, proto) 345 self.assertIn(b'stat_result', p) 346 if proto < 4: 347 self.assertIn(b'cos\nstat_result\n', p) 348 unpickled = pickle.loads(p) 349 self.assertEqual(result, unpickled) 350 351 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()') 352 def test_statvfs_attributes(self): 353 try: 354 result = os.statvfs(self.fname) 355 except OSError as e: 356 # On AtheOS, glibc always returns ENOSYS 357 if e.errno == errno.ENOSYS: 358 self.skipTest('os.statvfs() failed with ENOSYS') 359 360 # Make sure direct access works 361 self.assertEqual(result.f_bfree, result[3]) 362 363 # Make sure all the attributes are there. 364 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files', 365 'ffree', 'favail', 'flag', 'namemax') 366 for value, member in enumerate(members): 367 self.assertEqual(getattr(result, 'f_' + member), result[value]) 368 369 # Make sure that assignment really fails 370 try: 371 result.f_bfree = 1 372 self.fail("No exception raised") 373 except AttributeError: 374 pass 375 376 try: 377 result.parrot = 1 378 self.fail("No exception raised") 379 except AttributeError: 380 pass 381 382 # Use the constructor with a too-short tuple. 383 try: 384 result2 = os.statvfs_result((10,)) 385 self.fail("No exception raised") 386 except TypeError: 387 pass 388 389 # Use the constructor with a too-long tuple. 
390 try: 391 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14)) 392 except TypeError: 393 pass 394 395 @unittest.skipUnless(hasattr(os, 'statvfs'), 396 "need os.statvfs()") 397 def test_statvfs_result_pickle(self): 398 try: 399 result = os.statvfs(self.fname) 400 except OSError as e: 401 # On AtheOS, glibc always returns ENOSYS 402 if e.errno == errno.ENOSYS: 403 self.skipTest('os.statvfs() failed with ENOSYS') 404 405 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 406 p = pickle.dumps(result, proto) 407 self.assertIn(b'statvfs_result', p) 408 if proto < 4: 409 self.assertIn(b'cos\nstatvfs_result\n', p) 410 unpickled = pickle.loads(p) 411 self.assertEqual(result, unpickled) 412 413 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 414 def test_1686475(self): 415 # Verify that an open file can be stat'ed 416 try: 417 os.stat(r"c:\pagefile.sys") 418 except FileNotFoundError: 419 self.skipTest(r'c:\pagefile.sys does not exist') 420 except OSError as e: 421 self.fail("Could not stat pagefile.sys") 422 423 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 424 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") 425 def test_15261(self): 426 # Verify that stat'ing a closed fd does not cause crash 427 r, w = os.pipe() 428 try: 429 os.stat(r) # should not raise error 430 finally: 431 os.close(r) 432 os.close(w) 433 with self.assertRaises(OSError) as ctx: 434 os.stat(r) 435 self.assertEqual(ctx.exception.errno, errno.EBADF) 436 437 def check_file_attributes(self, result): 438 self.assertTrue(hasattr(result, 'st_file_attributes')) 439 self.assertTrue(isinstance(result.st_file_attributes, int)) 440 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF) 441 442 @unittest.skipUnless(sys.platform == "win32", 443 "st_file_attributes is Win32 specific") 444 def test_file_attributes(self): 445 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set) 446 result = os.stat(self.fname) 447 
self.check_file_attributes(result) 448 self.assertEqual( 449 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY, 450 0) 451 452 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set) 453 dirname = support.TESTFN + "dir" 454 os.mkdir(dirname) 455 self.addCleanup(os.rmdir, dirname) 456 457 result = os.stat(dirname) 458 self.check_file_attributes(result) 459 self.assertEqual( 460 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY, 461 stat.FILE_ATTRIBUTE_DIRECTORY) 462 463 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 464 def test_access_denied(self): 465 # Default to FindFirstFile WIN32_FIND_DATA when access is 466 # denied. See issue 28075. 467 # os.environ['TEMP'] should be located on a volume that 468 # supports file ACLs. 469 fname = os.path.join(os.environ['TEMP'], self.fname) 470 self.addCleanup(support.unlink, fname) 471 create_file(fname, b'ABC') 472 # Deny the right to [S]YNCHRONIZE on the file to 473 # force CreateFile to fail with ERROR_ACCESS_DENIED. 
474 DETACHED_PROCESS = 8 475 subprocess.check_call( 476 ['icacls.exe', fname, '/deny', 'Users:(S)'], 477 creationflags=DETACHED_PROCESS 478 ) 479 result = os.stat(fname) 480 self.assertNotEqual(result.st_size, 0) 481 482 483 class UtimeTests(unittest.TestCase): 484 def setUp(self): 485 self.dirname = support.TESTFN 486 self.fname = os.path.join(self.dirname, "f1") 487 488 self.addCleanup(support.rmtree, self.dirname) 489 os.mkdir(self.dirname) 490 create_file(self.fname) 491 492 def restore_float_times(state): 493 with ignore_deprecation_warnings('stat_float_times'): 494 os.stat_float_times(state) 495 496 # ensure that st_atime and st_mtime are float 497 with ignore_deprecation_warnings('stat_float_times'): 498 old_float_times = os.stat_float_times(-1) 499 self.addCleanup(restore_float_times, old_float_times) 500 501 os.stat_float_times(True) 502 503 def support_subsecond(self, filename): 504 # Heuristic to check if the filesystem supports timestamp with 505 # subsecond resolution: check if float and int timestamps are different 506 st = os.stat(filename) 507 return ((st.st_atime != st[7]) 508 or (st.st_mtime != st[8]) 509 or (st.st_ctime != st[9])) 510 511 def _test_utime(self, set_time, filename=None): 512 if not filename: 513 filename = self.fname 514 515 support_subsecond = self.support_subsecond(filename) 516 if support_subsecond: 517 # Timestamp with a resolution of 1 microsecond (10^-6). 518 # 519 # The resolution of the C internal function used by os.utime() 520 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable 521 # test with a resolution of 1 ns requires more work: 522 # see the issue #15745. 
523 atime_ns = 1002003000 # 1.002003 seconds 524 mtime_ns = 4005006000 # 4.005006 seconds 525 else: 526 # use a resolution of 1 second 527 atime_ns = 5 * 10**9 528 mtime_ns = 8 * 10**9 529 530 set_time(filename, (atime_ns, mtime_ns)) 531 st = os.stat(filename) 532 533 if support_subsecond: 534 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6) 535 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6) 536 else: 537 self.assertEqual(st.st_atime, atime_ns * 1e-9) 538 self.assertEqual(st.st_mtime, mtime_ns * 1e-9) 539 self.assertEqual(st.st_atime_ns, atime_ns) 540 self.assertEqual(st.st_mtime_ns, mtime_ns) 541 542 def test_utime(self): 543 def set_time(filename, ns): 544 # test the ns keyword parameter 545 os.utime(filename, ns=ns) 546 self._test_utime(set_time) 547 548 @staticmethod 549 def ns_to_sec(ns): 550 # Convert a number of nanosecond (int) to a number of seconds (float). 551 # Round towards infinity by adding 0.5 nanosecond to avoid rounding 552 # issue, os.utime() rounds towards minus infinity. 
553 return (ns * 1e-9) + 0.5e-9 554 555 def test_utime_by_indexed(self): 556 # pass times as floating point seconds as the second indexed parameter 557 def set_time(filename, ns): 558 atime_ns, mtime_ns = ns 559 atime = self.ns_to_sec(atime_ns) 560 mtime = self.ns_to_sec(mtime_ns) 561 # test utimensat(timespec), utimes(timeval), utime(utimbuf) 562 # or utime(time_t) 563 os.utime(filename, (atime, mtime)) 564 self._test_utime(set_time) 565 566 def test_utime_by_times(self): 567 def set_time(filename, ns): 568 atime_ns, mtime_ns = ns 569 atime = self.ns_to_sec(atime_ns) 570 mtime = self.ns_to_sec(mtime_ns) 571 # test the times keyword parameter 572 os.utime(filename, times=(atime, mtime)) 573 self._test_utime(set_time) 574 575 @unittest.skipUnless(os.utime in os.supports_follow_symlinks, 576 "follow_symlinks support for utime required " 577 "for this test.") 578 def test_utime_nofollow_symlinks(self): 579 def set_time(filename, ns): 580 # use follow_symlinks=False to test utimensat(timespec) 581 # or lutimes(timeval) 582 os.utime(filename, ns=ns, follow_symlinks=False) 583 self._test_utime(set_time) 584 585 @unittest.skipUnless(os.utime in os.supports_fd, 586 "fd support for utime required for this test.") 587 def test_utime_fd(self): 588 def set_time(filename, ns): 589 with open(filename, 'wb', 0) as fp: 590 # use a file descriptor to test futimens(timespec) 591 # or futimes(timeval) 592 os.utime(fp.fileno(), ns=ns) 593 self._test_utime(set_time) 594 595 @unittest.skipUnless(os.utime in os.supports_dir_fd, 596 "dir_fd support for utime required for this test.") 597 def test_utime_dir_fd(self): 598 def set_time(filename, ns): 599 dirname, name = os.path.split(filename) 600 dirfd = os.open(dirname, os.O_RDONLY) 601 try: 602 # pass dir_fd to test utimensat(timespec) or futimesat(timeval) 603 os.utime(name, dir_fd=dirfd, ns=ns) 604 finally: 605 os.close(dirfd) 606 self._test_utime(set_time) 607 608 def test_utime_directory(self): 609 def set_time(filename, ns): 610 # 
test calling os.utime() on a directory 611 os.utime(filename, ns=ns) 612 self._test_utime(set_time, filename=self.dirname) 613 614 def _test_utime_current(self, set_time): 615 # Get the system clock 616 current = time.time() 617 618 # Call os.utime() to set the timestamp to the current system clock 619 set_time(self.fname) 620 621 if not self.support_subsecond(self.fname): 622 delta = 1.0 623 else: 624 # On Windows, the usual resolution of time.time() is 15.6 ms 625 delta = 0.020 626 st = os.stat(self.fname) 627 msg = ("st_time=%r, current=%r, dt=%r" 628 % (st.st_mtime, current, st.st_mtime - current)) 629 self.assertAlmostEqual(st.st_mtime, current, 630 delta=delta, msg=msg) 631 632 def test_utime_current(self): 633 def set_time(filename): 634 # Set to the current time in the new way 635 os.utime(self.fname) 636 self._test_utime_current(set_time) 637 638 def test_utime_current_old(self): 639 def set_time(filename): 640 # Set to the current time in the old explicit way. 641 os.utime(self.fname, None) 642 self._test_utime_current(set_time) 643 644 def get_file_system(self, path): 645 if sys.platform == 'win32': 646 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\' 647 import ctypes 648 kernel32 = ctypes.windll.kernel32 649 buf = ctypes.create_unicode_buffer("", 100) 650 ok = kernel32.GetVolumeInformationW(root, None, 0, 651 None, None, None, 652 buf, len(buf)) 653 if ok: 654 return buf.value 655 # return None if the filesystem is unknown 656 657 def test_large_time(self): 658 # Many filesystems are limited to the year 2038. At least, the test 659 # pass with NTFS filesystem. 
660 if self.get_file_system(self.dirname) != "NTFS": 661 self.skipTest("requires NTFS") 662 663 large = 5000000000 # some day in 2128 664 os.utime(self.fname, (large, large)) 665 self.assertEqual(os.stat(self.fname).st_mtime, large) 666 667 def test_utime_invalid_arguments(self): 668 # seconds and nanoseconds parameters are mutually exclusive 669 with self.assertRaises(ValueError): 670 os.utime(self.fname, (5, 5), ns=(5, 5)) 671 672 673 from test import mapping_tests 674 675 class EnvironTests(mapping_tests.BasicTestMappingProtocol): 676 """check that os.environ object conform to mapping protocol""" 677 type2test = None 678 679 def setUp(self): 680 self.__save = dict(os.environ) 681 if os.supports_bytes_environ: 682 self.__saveb = dict(os.environb) 683 for key, value in self._reference().items(): 684 os.environ[key] = value 685 686 def tearDown(self): 687 os.environ.clear() 688 os.environ.update(self.__save) 689 if os.supports_bytes_environ: 690 os.environb.clear() 691 os.environb.update(self.__saveb) 692 693 def _reference(self): 694 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"} 695 696 def _empty_mapping(self): 697 os.environ.clear() 698 return os.environ 699 700 # Bug 1110478 701 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell), 702 'requires a shell') 703 def test_update2(self): 704 os.environ.clear() 705 os.environ.update(HELLO="World") 706 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen: 707 value = popen.read().strip() 708 self.assertEqual(value, "World") 709 710 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell), 711 'requires a shell') 712 def test_os_popen_iter(self): 713 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'" 714 % unix_shell) as popen: 715 it = iter(popen) 716 self.assertEqual(next(it), "line1\n") 717 self.assertEqual(next(it), "line2\n") 718 self.assertEqual(next(it), "line3\n") 719 self.assertRaises(StopIteration, next, it) 720 721 # Verify environ keys and values from the OS are of the 
722 # correct str type. 723 def test_keyvalue_types(self): 724 for key, val in os.environ.items(): 725 self.assertEqual(type(key), str) 726 self.assertEqual(type(val), str) 727 728 def test_items(self): 729 for key, value in self._reference().items(): 730 self.assertEqual(os.environ.get(key), value) 731 732 # Issue 7310 733 def test___repr__(self): 734 """Check that the repr() of os.environ looks like environ({...}).""" 735 env = os.environ 736 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join( 737 '{!r}: {!r}'.format(key, value) 738 for key, value in env.items()))) 739 740 def test_get_exec_path(self): 741 defpath_list = os.defpath.split(os.pathsep) 742 test_path = ['/monty', '/python', '', '/flying/circus'] 743 test_env = {'PATH': os.pathsep.join(test_path)} 744 745 saved_environ = os.environ 746 try: 747 os.environ = dict(test_env) 748 # Test that defaulting to os.environ works. 749 self.assertSequenceEqual(test_path, os.get_exec_path()) 750 self.assertSequenceEqual(test_path, os.get_exec_path(env=None)) 751 finally: 752 os.environ = saved_environ 753 754 # No PATH environment variable 755 self.assertSequenceEqual(defpath_list, os.get_exec_path({})) 756 # Empty PATH environment variable 757 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''})) 758 # Supplied PATH environment variable 759 self.assertSequenceEqual(test_path, os.get_exec_path(test_env)) 760 761 if os.supports_bytes_environ: 762 # env cannot contain 'PATH' and b'PATH' keys 763 try: 764 # ignore BytesWarning warning 765 with warnings.catch_warnings(record=True): 766 mixed_env = {'PATH': '1', b'PATH': b'2'} 767 except BytesWarning: 768 # mixed_env cannot be created with python -bb 769 pass 770 else: 771 self.assertRaises(ValueError, os.get_exec_path, mixed_env) 772 773 # bytes key and/or value 774 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}), 775 ['abc']) 776 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}), 777 ['abc']) 778 
self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}), 779 ['abc']) 780 781 @unittest.skipUnless(os.supports_bytes_environ, 782 "os.environb required for this test.") 783 def test_environb(self): 784 # os.environ -> os.environb 785 value = 'euro\u20ac' 786 try: 787 value_bytes = value.encode(sys.getfilesystemencoding(), 788 'surrogateescape') 789 except UnicodeEncodeError: 790 msg = "U+20AC character is not encodable to %s" % ( 791 sys.getfilesystemencoding(),) 792 self.skipTest(msg) 793 os.environ['unicode'] = value 794 self.assertEqual(os.environ['unicode'], value) 795 self.assertEqual(os.environb[b'unicode'], value_bytes) 796 797 # os.environb -> os.environ 798 value = b'\xff' 799 os.environb[b'bytes'] = value 800 self.assertEqual(os.environb[b'bytes'], value) 801 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape') 802 self.assertEqual(os.environ['bytes'], value_str) 803 804 # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue 805 # #13415). 
806 @support.requires_freebsd_version(7) 807 @support.requires_mac_ver(10, 6) 808 def test_unset_error(self): 809 if sys.platform == "win32": 810 # an environment variable is limited to 32,767 characters 811 key = 'x' * 50000 812 self.assertRaises(ValueError, os.environ.__delitem__, key) 813 else: 814 # "=" is not allowed in a variable name 815 key = 'key=' 816 self.assertRaises(OSError, os.environ.__delitem__, key) 817 818 def test_key_type(self): 819 missing = 'missingkey' 820 self.assertNotIn(missing, os.environ) 821 822 with self.assertRaises(KeyError) as cm: 823 os.environ[missing] 824 self.assertIs(cm.exception.args[0], missing) 825 self.assertTrue(cm.exception.__suppress_context__) 826 827 with self.assertRaises(KeyError) as cm: 828 del os.environ[missing] 829 self.assertIs(cm.exception.args[0], missing) 830 self.assertTrue(cm.exception.__suppress_context__) 831 832 833 class WalkTests(unittest.TestCase): 834 """Tests for os.walk().""" 835 836 # Wrapper to hide minor differences between os.walk and os.fwalk 837 # to tests both functions with the same code base 838 def walk(self, top, **kwargs): 839 if 'follow_symlinks' in kwargs: 840 kwargs['followlinks'] = kwargs.pop('follow_symlinks') 841 return os.walk(top, **kwargs) 842 843 def setUp(self): 844 join = os.path.join 845 self.addCleanup(support.rmtree, support.TESTFN) 846 847 # Build: 848 # TESTFN/ 849 # TEST1/ a file kid and two directory kids 850 # tmp1 851 # SUB1/ a file kid and a directory kid 852 # tmp2 853 # SUB11/ no kids 854 # SUB2/ a file kid and a dirsymlink kid 855 # tmp3 856 # SUB21/ not readable 857 # tmp5 858 # link/ a symlink to TESTFN.2 859 # broken_link 860 # broken_link2 861 # broken_link3 862 # TEST2/ 863 # tmp4 a lone file 864 self.walk_path = join(support.TESTFN, "TEST1") 865 self.sub1_path = join(self.walk_path, "SUB1") 866 self.sub11_path = join(self.sub1_path, "SUB11") 867 sub2_path = join(self.walk_path, "SUB2") 868 sub21_path = join(sub2_path, "SUB21") 869 tmp1_path = 
join(self.walk_path, "tmp1") 870 tmp2_path = join(self.sub1_path, "tmp2") 871 tmp3_path = join(sub2_path, "tmp3") 872 tmp5_path = join(sub21_path, "tmp3") 873 self.link_path = join(sub2_path, "link") 874 t2_path = join(support.TESTFN, "TEST2") 875 tmp4_path = join(support.TESTFN, "TEST2", "tmp4") 876 broken_link_path = join(sub2_path, "broken_link") 877 broken_link2_path = join(sub2_path, "broken_link2") 878 broken_link3_path = join(sub2_path, "broken_link3") 879 880 # Create stuff. 881 os.makedirs(self.sub11_path) 882 os.makedirs(sub2_path) 883 os.makedirs(sub21_path) 884 os.makedirs(t2_path) 885 886 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path: 887 with open(path, "x") as f: 888 f.write("I'm " + path + " and proud of it. Blame test_os.\n") 889 890 if support.can_symlink(): 891 os.symlink(os.path.abspath(t2_path), self.link_path) 892 os.symlink('broken', broken_link_path, True) 893 os.symlink(join('tmp3', 'broken'), broken_link2_path, True) 894 os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True) 895 self.sub2_tree = (sub2_path, ["SUB21", "link"], 896 ["broken_link", "broken_link2", "broken_link3", 897 "tmp3"]) 898 else: 899 self.sub2_tree = (sub2_path, [], ["tmp3"]) 900 901 os.chmod(sub21_path, 0) 902 try: 903 os.listdir(sub21_path) 904 except PermissionError: 905 self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU) 906 else: 907 os.chmod(sub21_path, stat.S_IRWXU) 908 os.unlink(tmp5_path) 909 os.rmdir(sub21_path) 910 del self.sub2_tree[1][:1] 911 912 def test_walk_topdown(self): 913 # Walk top-down. 914 all = list(self.walk(self.walk_path)) 915 916 self.assertEqual(len(all), 4) 917 # We can't know which order SUB1 and SUB2 will appear in. 
918 # Not flipped: TESTFN, SUB1, SUB11, SUB2 919 # flipped: TESTFN, SUB2, SUB1, SUB11 920 flipped = all[0][1][0] != "SUB1" 921 all[0][1].sort() 922 all[3 - 2 * flipped][-1].sort() 923 all[3 - 2 * flipped][1].sort() 924 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"])) 925 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"])) 926 self.assertEqual(all[2 + flipped], (self.sub11_path, [], [])) 927 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree) 928 929 def test_walk_prune(self, walk_path=None): 930 if walk_path is None: 931 walk_path = self.walk_path 932 # Prune the search. 933 all = [] 934 for root, dirs, files in self.walk(walk_path): 935 all.append((root, dirs, files)) 936 # Don't descend into SUB1. 937 if 'SUB1' in dirs: 938 # Note that this also mutates the dirs we appended to all! 939 dirs.remove('SUB1') 940 941 self.assertEqual(len(all), 2) 942 self.assertEqual(all[0], 943 (str(walk_path), ["SUB2"], ["tmp1"])) 944 945 all[1][-1].sort() 946 all[1][1].sort() 947 self.assertEqual(all[1], self.sub2_tree) 948 949 def test_file_like_path(self): 950 self.test_walk_prune(_PathLike(self.walk_path)) 951 952 def test_walk_bottom_up(self): 953 # Walk bottom-up. 954 all = list(self.walk(self.walk_path, topdown=False)) 955 956 self.assertEqual(len(all), 4, all) 957 # We can't know which order SUB1 and SUB2 will appear in. 
958 # Not flipped: SUB11, SUB1, SUB2, TESTFN 959 # flipped: SUB2, SUB11, SUB1, TESTFN 960 flipped = all[3][1][0] != "SUB1" 961 all[3][1].sort() 962 all[2 - 2 * flipped][-1].sort() 963 all[2 - 2 * flipped][1].sort() 964 self.assertEqual(all[3], 965 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"])) 966 self.assertEqual(all[flipped], 967 (self.sub11_path, [], [])) 968 self.assertEqual(all[flipped + 1], 969 (self.sub1_path, ["SUB11"], ["tmp2"])) 970 self.assertEqual(all[2 - 2 * flipped], 971 self.sub2_tree) 972 973 def test_walk_symlink(self): 974 if not support.can_symlink(): 975 self.skipTest("need symlink support") 976 977 # Walk, following symlinks. 978 walk_it = self.walk(self.walk_path, follow_symlinks=True) 979 for root, dirs, files in walk_it: 980 if root == self.link_path: 981 self.assertEqual(dirs, []) 982 self.assertEqual(files, ["tmp4"]) 983 break 984 else: 985 self.fail("Didn't follow symlink with followlinks=True") 986 987 def test_walk_bad_dir(self): 988 # Walk top-down. 989 errors = [] 990 walk_it = self.walk(self.walk_path, onerror=errors.append) 991 root, dirs, files = next(walk_it) 992 self.assertEqual(errors, []) 993 dir1 = 'SUB1' 994 path1 = os.path.join(root, dir1) 995 path1new = os.path.join(root, dir1 + '.new') 996 os.rename(path1, path1new) 997 try: 998 roots = [r for r, d, f in walk_it] 999 self.assertTrue(errors) 1000 self.assertNotIn(path1, roots) 1001 self.assertNotIn(path1new, roots) 1002 for dir2 in dirs: 1003 if dir2 != dir1: 1004 self.assertIn(os.path.join(root, dir2), roots) 1005 finally: 1006 os.rename(path1new, path1) 1007 1008 1009 @unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()") 1010 class FwalkTests(WalkTests): 1011 """Tests for os.fwalk().""" 1012 1013 def walk(self, top, **kwargs): 1014 for root, dirs, files, root_fd in os.fwalk(top, **kwargs): 1015 yield (root, dirs, files) 1016 1017 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs): 1018 """ 1019 compare with walk() results. 
def test_dir_fd(self):
    # os.fwalk() given ``dir_fd`` (a descriptor for the current directory)
    # must produce the same results as os.walk() on the same relative top.
    walk_kwargs = {'top': support.TESTFN}
    fwalk_kwargs = walk_kwargs.copy()
    # Open the descriptor *before* entering the try block: if os.open()
    # itself fails there is nothing to close, and a ``finally`` that
    # referenced the unbound ``fd`` would hide the real error.
    fd = os.open(".", os.O_RDONLY)
    try:
        fwalk_kwargs['dir_fd'] = fd
        self._compare_to_walk(walk_kwargs, fwalk_kwargs)
    finally:
        os.close(fd)
class BytesWalkTests(WalkTests):
    """Run the WalkTests checks against os.walk() called with a bytes path."""

    def setUp(self):
        super().setUp()
        self.stack = contextlib.ExitStack()

    def tearDown(self):
        self.stack.close()
        super().tearDown()

    def walk(self, top, **kwargs):
        # The shared tests spell the option ``follow_symlinks``; os.walk()
        # calls it ``followlinks``.
        try:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        except KeyError:
            pass

        for raw_root, raw_dirs, raw_files in os.walk(os.fsencode(top), **kwargs):
            root = os.fsdecode(raw_root)
            dirs = [os.fsdecode(name) for name in raw_dirs]
            files = [os.fsdecode(name) for name in raw_files]
            yield (root, dirs, files)
            # The consumer may have edited the decoded lists (e.g. to
            # prune the walk); copy its changes back into the bytes lists
            # that os.walk() itself is working from.
            raw_dirs[:] = [os.fsencode(name) for name in dirs]
            raw_files[:] = [os.fsencode(name) for name in files]
def test_exist_ok_existing_directory(self):
    # os.makedirs() on an existing directory must raise OSError unless
    # exist_ok=True is given, in which case it succeeds whatever mode is
    # requested.
    path = os.path.join(support.TESTFN, 'dir1')
    mode = 0o777
    old_mask = os.umask(0o022)
    try:
        os.makedirs(path, mode)
        self.assertRaises(OSError, os.makedirs, path, mode)
        self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
        os.makedirs(path, 0o776, exist_ok=True)
        os.makedirs(path, mode=mode, exist_ok=True)
    finally:
        # Always restore the process umask, even when a check above
        # fails, so that later tests are not affected.
        os.umask(old_mask)

    # Issue #25583: A drive root could raise PermissionError on Windows
    os.makedirs(os.path.abspath('/'), exist_ok=True)
def test_exist_ok_existing_regular_file(self):
    # os.makedirs() must raise OSError when the target path is an existing
    # regular file, whatever the value of exist_ok.
    base = support.TESTFN
    path = os.path.join(base, 'dir1')
    with open(path, 'w') as f:
        f.write('abc')
    try:
        self.assertRaises(OSError, os.makedirs, path)
        self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
        self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
    finally:
        # Remove the file even if a check failed: tearDown() calls
        # os.removedirs() on the deepest existing path and cannot cope
        # with a regular file being there.
        os.remove(path)
@unittest.skipUnless(root_in_posix and len(all_users) > 1,
                     "test needs root privilege and more than one user")
def test_chown_with_root(self):
    # Hand TESTFN to each of the first two known users in turn, keeping
    # the group unchanged, and check that the owner really changed.
    target = support.TESTFN
    first_uid, second_uid = all_users[:2]
    group = os.stat(target).st_gid
    for wanted_uid in (first_uid, second_uid):
        os.chown(target, wanted_uid, group)
        self.assertEqual(os.stat(target).st_uid, wanted_uid)
os.mkdir(dirb) 1266 create_file(os.path.join(dirb, 'file.txt')) 1267 with self.assertRaises(OSError): 1268 os.removedirs(dirb) 1269 self.assertTrue(os.path.exists(dirb)) 1270 self.assertTrue(os.path.exists(dira)) 1271 self.assertTrue(os.path.exists(support.TESTFN)) 1272 1273 1274 class DevNullTests(unittest.TestCase): 1275 def test_devnull(self): 1276 with open(os.devnull, 'wb', 0) as f: 1277 f.write(b'hello') 1278 f.close() 1279 with open(os.devnull, 'rb') as f: 1280 self.assertEqual(f.read(), b'') 1281 1282 1283 class URandomTests(unittest.TestCase): 1284 def test_urandom_length(self): 1285 self.assertEqual(len(os.urandom(0)), 0) 1286 self.assertEqual(len(os.urandom(1)), 1) 1287 self.assertEqual(len(os.urandom(10)), 10) 1288 self.assertEqual(len(os.urandom(100)), 100) 1289 self.assertEqual(len(os.urandom(1000)), 1000) 1290 1291 def test_urandom_value(self): 1292 data1 = os.urandom(16) 1293 self.assertIsInstance(data1, bytes) 1294 data2 = os.urandom(16) 1295 self.assertNotEqual(data1, data2) 1296 1297 def get_urandom_subprocess(self, count): 1298 code = '\n'.join(( 1299 'import os, sys', 1300 'data = os.urandom(%s)' % count, 1301 'sys.stdout.buffer.write(data)', 1302 'sys.stdout.buffer.flush()')) 1303 out = assert_python_ok('-c', code) 1304 stdout = out[1] 1305 self.assertEqual(len(stdout), 16) 1306 return stdout 1307 1308 def test_urandom_subprocess(self): 1309 data1 = self.get_urandom_subprocess(16) 1310 data2 = self.get_urandom_subprocess(16) 1311 self.assertNotEqual(data1, data2) 1312 1313 1314 @unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()') 1315 class GetRandomTests(unittest.TestCase): 1316 @classmethod 1317 def setUpClass(cls): 1318 try: 1319 os.getrandom(1) 1320 except OSError as exc: 1321 if exc.errno == errno.ENOSYS: 1322 # Python compiled on a more recent Linux version 1323 # than the current Linux kernel 1324 raise unittest.SkipTest("getrandom() syscall fails with ENOSYS") 1325 else: 1326 raise 1327 1328 def 
# os.urandom() doesn't use a file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall.
# True as soon as any one of the corresponding build-time flags is set to 1.
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(flag) == 1
    for flag in ('HAVE_GETENTROPY',
                 'HAVE_GETRANDOM',
                 'HAVE_GETRANDOM_SYSCALL'))
def test_urandom_fd_closed(self):
    # Issue #21207: urandom() should reopen its fd to /dev/urandom if
    # closed.
    #
    # The child script below calls os.urandom() once, closes descriptors
    # 3..255 with os.closerange() (inside SuppressCrashReport), then calls
    # os.urandom() again and writes that second result to stdout.
    code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
    # The returned values are not inspected here.
    # NOTE(review): presumably assert_python_ok() itself fails the test
    # when the child exits with an error -- confirm in
    # test.support.script_helper.
    rc, out, err = assert_python_ok('-Sc', code)
1430 if new_fd != fd: 1431 os.dup2(new_fd, fd) 1432 sys.stdout.buffer.write(os.urandom(4)) 1433 sys.stdout.buffer.write(os.urandom(4)) 1434 """.format(TESTFN=support.TESTFN) 1435 rc, out, err = assert_python_ok('-Sc', code) 1436 self.assertEqual(len(out), 8) 1437 self.assertNotEqual(out[0:4], out[4:8]) 1438 rc, out2, err2 = assert_python_ok('-Sc', code) 1439 self.assertEqual(len(out2), 8) 1440 self.assertNotEqual(out2, out) 1441 1442 1443 @contextlib.contextmanager 1444 def _execvpe_mockup(defpath=None): 1445 """ 1446 Stubs out execv and execve functions when used as context manager. 1447 Records exec calls. The mock execv and execve functions always raise an 1448 exception as they would normally never return. 1449 """ 1450 # A list of tuples containing (function name, first arg, args) 1451 # of calls to execv or execve that have been made. 1452 calls = [] 1453 1454 def mock_execv(name, *args): 1455 calls.append(('execv', name, args)) 1456 raise RuntimeError("execv called") 1457 1458 def mock_execve(name, *args): 1459 calls.append(('execve', name, args)) 1460 raise OSError(errno.ENOTDIR, "execve called") 1461 1462 try: 1463 orig_execv = os.execv 1464 orig_execve = os.execve 1465 orig_defpath = os.defpath 1466 os.execv = mock_execv 1467 os.execve = mock_execve 1468 if defpath is not None: 1469 os.defpath = defpath 1470 yield calls 1471 finally: 1472 os.execv = orig_execv 1473 os.execve = orig_execve 1474 os.defpath = orig_defpath 1475 1476 1477 class ExecTests(unittest.TestCase): 1478 @unittest.skipIf(USING_LINUXTHREADS, 1479 "avoid triggering a linuxthreads bug: see issue #4970") 1480 def test_execvpe_with_bad_program(self): 1481 self.assertRaises(OSError, os.execvpe, 'no such app-', 1482 ['no such app-'], None) 1483 1484 def test_execv_with_bad_arglist(self): 1485 self.assertRaises(ValueError, os.execv, 'notepad', ()) 1486 self.assertRaises(ValueError, os.execv, 'notepad', []) 1487 self.assertRaises(ValueError, os.execv, 'notepad', ('',)) 1488 
self.assertRaises(ValueError, os.execv, 'notepad', ['']) 1489 1490 def test_execvpe_with_bad_arglist(self): 1491 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None) 1492 self.assertRaises(ValueError, os.execvpe, 'notepad', [], {}) 1493 self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {}) 1494 1495 @unittest.skipUnless(hasattr(os, '_execvpe'), 1496 "No internal os._execvpe function to test.") 1497 def _test_internal_execvpe(self, test_type): 1498 program_path = os.sep + 'absolutepath' 1499 if test_type is bytes: 1500 program = b'executable' 1501 fullpath = os.path.join(os.fsencode(program_path), program) 1502 native_fullpath = fullpath 1503 arguments = [b'progname', 'arg1', 'arg2'] 1504 else: 1505 program = 'executable' 1506 arguments = ['progname', 'arg1', 'arg2'] 1507 fullpath = os.path.join(program_path, program) 1508 if os.name != "nt": 1509 native_fullpath = os.fsencode(fullpath) 1510 else: 1511 native_fullpath = fullpath 1512 env = {'spam': 'beans'} 1513 1514 # test os._execvpe() with an absolute path 1515 with _execvpe_mockup() as calls: 1516 self.assertRaises(RuntimeError, 1517 os._execvpe, fullpath, arguments) 1518 self.assertEqual(len(calls), 1) 1519 self.assertEqual(calls[0], ('execv', fullpath, (arguments,))) 1520 1521 # test os._execvpe() with a relative path: 1522 # os.get_exec_path() returns defpath 1523 with _execvpe_mockup(defpath=program_path) as calls: 1524 self.assertRaises(OSError, 1525 os._execvpe, program, arguments, env=env) 1526 self.assertEqual(len(calls), 1) 1527 self.assertSequenceEqual(calls[0], 1528 ('execve', native_fullpath, (arguments, env))) 1529 1530 # test os._execvpe() with a relative path: 1531 # os.get_exec_path() reads the 'PATH' variable 1532 with _execvpe_mockup() as calls: 1533 env_path = env.copy() 1534 if test_type is bytes: 1535 env_path[b'PATH'] = program_path 1536 else: 1537 env_path['PATH'] = program_path 1538 self.assertRaises(OSError, 1539 os._execvpe, program, arguments, env=env_path) 1540 
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Check that os functions raise OSError for bad paths on Windows."""

    def setUp(self):
        # Every test below relies on TESTFN *not* existing beforehand.
        try:
            os.stat(support.TESTFN)
        except FileNotFoundError:
            # Expected: nothing is in the way.
            pass
        except OSError as exc:
            self.fail("file %s must not exist; os.stat failed with %s"
                      % (support.TESTFN, exc))
        else:
            self.fail("file %s must not exist" % support.TESTFN)

    def test_rename(self):
        self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        self.assertRaises(OSError, os.remove, support.TESTFN)

    def test_chdir(self):
        self.assertRaises(OSError, os.chdir, support.TESTFN)

    def test_mkdir(self):
        self.addCleanup(support.unlink, support.TESTFN)

        # Creating a directory where a regular file already exists must
        # fail; the file is kept open while the attempt is made.
        with open(support.TESTFN, "x"):
            self.assertRaises(OSError, os.mkdir, support.TESTFN)

    def test_utime(self):
        self.assertRaises(OSError, os.utime, support.TESTFN, None)

    def test_chmod(self):
        self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
@unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
def test_closerange(self):
    # os.closerange() over a range of invalid descriptors must return
    # None without raising.
    first_fd = support.make_bad_fd()
    # Make sure none of the descriptors we are about to close are
    # currently valid (issue 6542): stop at the first offset whose
    # descriptor turns out to be open.
    offset = 0
    for offset in range(10):
        try:
            os.fstat(first_fd + offset)
        except OSError:
            continue
        break
    if offset < 2:
        self.skipTest(
            "Unable to acquire a range of invalid file descriptors")
    self.assertIsNone(os.closerange(first_fd, first_fd + offset - 1))
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        for path in (self.file1, self.file2):
            if os.path.exists(path):
                os.unlink(path)

    def _test_link(self, file1, file2):
        """Create *file1*, hard-link it as *file2*, and check that both
        names open the same underlying file."""
        create_file(file1)

        os.link(file1, file2)
        with open(file1, "r") as f1, open(file2, "r") as f2:
            self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        # os.fsencode() applies the filesystem encoding *and* its error
        # handler, which a plain bytes(name, encoding) call does not.
        self._test_link(os.fsencode(self.file1),
                        os.fsencode(self.file2))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        # tearDown() removes whatever file1/file2 name at that point, so
        # updating the attributes here is enough for cleanup.
        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
@unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()') 1723 def test_setuid(self): 1724 if os.getuid() != 0: 1725 self.assertRaises(OSError, os.setuid, 0) 1726 self.assertRaises(OverflowError, os.setuid, 1<<32) 1727 1728 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()') 1729 def test_setgid(self): 1730 if os.getuid() != 0 and not HAVE_WHEEL_GROUP: 1731 self.assertRaises(OSError, os.setgid, 0) 1732 self.assertRaises(OverflowError, os.setgid, 1<<32) 1733 1734 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()') 1735 def test_seteuid(self): 1736 if os.getuid() != 0: 1737 self.assertRaises(OSError, os.seteuid, 0) 1738 self.assertRaises(OverflowError, os.seteuid, 1<<32) 1739 1740 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()') 1741 def test_setegid(self): 1742 if os.getuid() != 0 and not HAVE_WHEEL_GROUP: 1743 self.assertRaises(OSError, os.setegid, 0) 1744 self.assertRaises(OverflowError, os.setegid, 1<<32) 1745 1746 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()') 1747 def test_setreuid(self): 1748 if os.getuid() != 0: 1749 self.assertRaises(OSError, os.setreuid, 0, 0) 1750 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0) 1751 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32) 1752 1753 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()') 1754 def test_setreuid_neg1(self): 1755 # Needs to accept -1. We run this in a subprocess to avoid 1756 # altering the test runner's process state (issue8045). 
1757 subprocess.check_call([ 1758 sys.executable, '-c', 1759 'import os,sys;os.setreuid(-1,-1);sys.exit(0)']) 1760 1761 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()') 1762 def test_setregid(self): 1763 if os.getuid() != 0 and not HAVE_WHEEL_GROUP: 1764 self.assertRaises(OSError, os.setregid, 0, 0) 1765 self.assertRaises(OverflowError, os.setregid, 1<<32, 0) 1766 self.assertRaises(OverflowError, os.setregid, 0, 1<<32) 1767 1768 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()') 1769 def test_setregid_neg1(self): 1770 # Needs to accept -1. We run this in a subprocess to avoid 1771 # altering the test runner's process state (issue8045). 1772 subprocess.check_call([ 1773 sys.executable, '-c', 1774 'import os,sys;os.setregid(-1,-1);sys.exit(0)']) 1775 1776 @unittest.skipIf(sys.platform == "win32", "Posix specific tests") 1777 class Pep383Tests(unittest.TestCase): 1778 def setUp(self): 1779 if support.TESTFN_UNENCODABLE: 1780 self.dir = support.TESTFN_UNENCODABLE 1781 elif support.TESTFN_NONASCII: 1782 self.dir = support.TESTFN_NONASCII 1783 else: 1784 self.dir = support.TESTFN 1785 self.bdir = os.fsencode(self.dir) 1786 1787 bytesfn = [] 1788 def add_filename(fn): 1789 try: 1790 fn = os.fsencode(fn) 1791 except UnicodeEncodeError: 1792 return 1793 bytesfn.append(fn) 1794 add_filename(support.TESTFN_UNICODE) 1795 if support.TESTFN_UNENCODABLE: 1796 add_filename(support.TESTFN_UNENCODABLE) 1797 if support.TESTFN_NONASCII: 1798 add_filename(support.TESTFN_NONASCII) 1799 if not bytesfn: 1800 self.skipTest("couldn't create any non-ascii filename") 1801 1802 self.unicodefn = set() 1803 os.mkdir(self.dir) 1804 try: 1805 for fn in bytesfn: 1806 support.create_empty_file(os.path.join(self.bdir, fn)) 1807 fn = os.fsdecode(fn) 1808 if fn in self.unicodefn: 1809 raise ValueError("duplicate filename") 1810 self.unicodefn.add(fn) 1811 except: 1812 shutil.rmtree(self.dir) 1813 raise 1814 1815 def tearDown(self): 1816 
shutil.rmtree(self.dir) 1817 1818 def test_listdir(self): 1819 expected = self.unicodefn 1820 found = set(os.listdir(self.dir)) 1821 self.assertEqual(found, expected) 1822 # test listdir without arguments 1823 current_directory = os.getcwd() 1824 try: 1825 os.chdir(os.sep) 1826 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep))) 1827 finally: 1828 os.chdir(current_directory) 1829 1830 def test_open(self): 1831 for fn in self.unicodefn: 1832 f = open(os.path.join(self.dir, fn), 'rb') 1833 f.close() 1834 1835 @unittest.skipUnless(hasattr(os, 'statvfs'), 1836 "need os.statvfs()") 1837 def test_statvfs(self): 1838 # issue #9645 1839 for fn in self.unicodefn: 1840 # should not fail with file not found error 1841 fullname = os.path.join(self.dir, fn) 1842 os.statvfs(fullname) 1843 1844 def test_stat(self): 1845 for fn in self.unicodefn: 1846 os.stat(os.path.join(self.dir, fn)) 1847 1848 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 1849 class Win32KillTests(unittest.TestCase): 1850 def _kill(self, sig): 1851 # Start sys.executable as a subprocess and communicate from the 1852 # subprocess to the parent that the interpreter is ready. When it 1853 # becomes ready, send *sig* via os.kill to the subprocess and check 1854 # that the return code is equal to *sig*. 1855 import ctypes 1856 from ctypes import wintypes 1857 import msvcrt 1858 1859 # Since we can't access the contents of the process' stdout until the 1860 # process has exited, use PeekNamedPipe to see what's inside stdout 1861 # without waiting. This is done so we can tell that the interpreter 1862 # is started and running at a point where it could handle a signal. 
def _kill_with_event(self, event, name):
    """Start win_console_handler.py in a new process group, send it
    *event* with os.kill() and fail the test if it does not exit.

    *name* is only used in the failure message.
    """
    tagname = "test_os_%s" % uuid.uuid1()
    # One byte of tagged shared memory, initialised to 0 here and polled
    # below until it reads 1.
    # NOTE(review): presumably the child sets it to 1 once its console
    # handler is installed -- confirm in win_console_handler.py.
    m = mmap.mmap(-1, 1, tagname)
    m[0] = 0
    # Run a script which has console control handling enabled.
    proc = subprocess.Popen([sys.executable,
               os.path.join(os.path.dirname(__file__),
                            "win_console_handler.py"), tagname],
               creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
    # Let the interpreter startup before we send signals. See #3137.
    attempts, max_attempts = 0, 100
    while attempts < max_attempts and proc.poll() is None:
        if m[0] == 1:
            break
        time.sleep(0.1)
        attempts += 1
    else:
        # Forcefully kill the process if we weren't able to signal it.
        os.kill(proc.pid, signal.SIGINT)
        self.fail("Subprocess didn't finish initialization")
    os.kill(proc.pid, event)
    # proc.send_signal(event) could also be done here.
    # Allow time for the signal to be passed and the process to exit.
    time.sleep(0.5)
    # poll() returns None only while the child is still running; an exit
    # status of 0 also means it has stopped, so compare against None
    # instead of testing truthiness.
    if proc.poll() is None:
        # Forcefully kill the process if we weren't able to signal it.
        os.kill(proc.pid, signal.SIGINT)
        self.fail("subprocess did not stop on {}".format(name))
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Check os.listdir() on Windows for plain and extended paths."""

    def setUp(self):
        # Populate TESTFN with SUB0/SUB1 directories and FILE0/FILE1 files
        # and remember the sorted entry names for the assertions.
        names = []
        for index in range(2):
            subdir = 'SUB%d' % index
            filename = 'FILE%d' % index
            os.makedirs(os.path.join(support.TESTFN, subdir))
            target = os.path.join(support.TESTFN, filename)
            with open(target, 'w') as stream:
                stream.write("I'm %s and proud of it. Blame test_os.\n" % target)
            names += [subdir, filename]
        self.created_paths = sorted(names)

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def _check_listing(self, directory, expected):
        # Directory order is unspecified, so compare sorted listings.
        found = os.listdir(directory)
        found.sort()
        self.assertEqual(found, expected)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        self._check_listing(support.TESTFN, self.created_paths)

        # bytes
        self._check_listing(os.fsencode(support.TESTFN),
                            list(map(os.fsencode, self.created_paths)))

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        absolute = os.path.abspath(support.TESTFN)

        # unicode
        self._check_listing('\\\\?\\' + absolute, self.created_paths)

        # bytes
        self._check_listing(b'\\\\?\\' + os.fsencode(absolute),
                            list(map(os.fsencode, self.created_paths)))
def tearDown(self):
    """Remove whichever of the three test links still exists.

    lexists() is used for every link: exists() follows the link and
    returns False for a link whose target is gone, which would leave the
    link on disk and make the next os.symlink() call fail.
    """
    if os.path.lexists(self.filelink):
        os.remove(self.filelink)
    if os.path.lexists(self.dirlink):
        # Directory links are removed with rmdir, not remove.
        os.rmdir(self.dirlink)
    if os.path.lexists(self.missing_link):
        os.remove(self.missing_link)
def check_stat(self, link, target):
    """Assert that *link* stats like *target* but lstats differently.

    The two assertions are made for *link* as passed in and again for its
    os.fsencode()d form.
    """
    def verify(path):
        followed = os.stat(path)
        # Following the link must give the target's stat result ...
        self.assertEqual(followed, os.stat(target))
        # ... whereas lstat() must describe something else (the link).
        self.assertNotEqual(os.lstat(path), followed)

    verify(link)
    verify(os.fsencode(link))
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):
    """Check a directory symlink whose target is relative to the link."""

    def setUp(self):
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # Use a unittest assertion: a bare ``assert`` is removed when Python
        # runs with -O, which would turn this test into a no-op.
        self.assertTrue(os.path.isdir(src))
def create_args(self, *, with_env=False, use_bytes=False):
    """Write a small exit-code script to TESTFN and return its argv.

    with_env  -- when true, copy os.environ into self.env, add a unique
                 key (stored as self.key), and make the script read that
                 key from its environment before exiting.
    use_bytes -- when true, return the arguments os.fsencode()d; self.env
                 is encoded as well, but only if it was built by this call.

    Sets self.exitcode to the status the script exits with and registers
    removal of the script file as a cleanup.
    """
    self.exitcode = 17

    filename = support.TESTFN
    self.addCleanup(support.unlink, filename)

    if not with_env:
        code = 'import sys; sys.exit(%s)' % self.exitcode
    else:
        self.env = dict(os.environ)
        # create an unique key
        self.key = str(uuid.uuid4())
        self.env[self.key] = self.key
        # read the variable from os.environ to check that it exists
        code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
                % (self.key, self.exitcode))

    with open(filename, "w") as fp:
        fp.write(code)

    args = [sys.executable, filename]
    if use_bytes:
        args = [os.fsencode(a) for a in args]
        # Without with_env there is no environment from this call to encode;
        # touching self.env would raise AttributeError or re-encode a stale
        # mapping.
        if with_env:
            self.env = {os.fsencode(k): os.fsencode(v)
                        for k, v in self.env.items()}

    return args
@requires_os_func('spawnv')
def test_nowait(self):
    """Spawn with P_NOWAIT, reap the child and check its exit status."""
    argv = self.create_args()
    child = os.spawnv(os.P_NOWAIT, argv[0], argv)
    reaped, status = os.waitpid(child, 0)
    self.assertEqual(reaped, child)
    if not hasattr(os, 'WIFEXITED'):
        # No status-decoding helpers: compare against the exit code
        # shifted into the high byte.
        self.assertEqual(status, self.exitcode << 8)
        return
    self.assertTrue(os.WIFEXITED(status))
    self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Round-trip check for os.setpriority() / os.getpriority()."""

    def test_set_get_priority(self):
        pid = os.getpid()
        original = os.getpriority(os.PRIO_PROCESS, pid)
        wanted = original + 1
        os.setpriority(os.PRIO_PROCESS, pid, wanted)
        try:
            observed = os.getpriority(os.PRIO_PROCESS, pid)
            if original >= 19 and observed <= 19:
                # Starting at 19 or above, the increment cannot be
                # observed reliably.
                self.skipTest("unable to reliably test setpriority "
                              "at current nice level of %s" % original)
            self.assertEqual(observed, wanted)
        finally:
            # Best effort restore; EACCES is tolerated, anything else is
            # re-raised.
            try:
                os.setpriority(os.PRIO_PROCESS, pid, original)
            except OSError as exc:
                if exc.errno != errno.EACCES:
                    raise
def sendfile_wrapper(self, sock, file, offset, nbytes, headers=(), trailers=()):
    """A higher level wrapper representing how an application is
    supposed to use sendfile().

    sock, file        -- output and input file descriptors.
    offset, nbytes    -- start position in *file* and maximum byte count.
    headers, trailers -- sequences of buffers; only forwarded when
                         self.SUPPORT_HEADERS_TRAILERS is true.

    Returns whatever os.sendfile() returns.  EAGAIN and EBUSY are retried
    until the call succeeds; every other OSError (ECONNRESET included)
    propagates to the caller.
    """
    # The defaults are immutable tuples so no list object is shared
    # between calls.
    while True:
        try:
            if self.SUPPORT_HEADERS_TRAILERS:
                return os.sendfile(sock, file, offset, nbytes, headers,
                                   trailers)
            return os.sendfile(sock, file, offset, nbytes)
        except OSError as err:
            if err.errno not in (errno.EAGAIN, errno.EBUSY):
                # disconnected, or any other real error
                raise
            # we have to retry send data
@requires_headers_trailers
def test_headers(self):
    """Send the file preceded by a 512-byte header and check the stream.

    The first os.sendfile() call carries the header; the rest of the file
    is pushed through sendfile_wrapper() until it reports 0 bytes sent.
    """
    header = b"x" * 512
    total_sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                             headers=[header])
    offset = 4096
    nbytes = 4096
    while True:
        sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                     offset, nbytes)
        if sent == 0:
            break
        total_sent += sent
        offset += sent

    expected_data = header + self.DATA
    self.assertEqual(total_sent, len(expected_data))
    self.client.close()
    self.server.wait()
    data = self.server.handler_instance.get_data()
    # Compare the bytes themselves, as the other sendfile tests do.  The
    # length check comes first so a size mismatch gives a short message.
    self.assertEqual(len(data), len(expected_data))
    self.assertEqual(data, expected_data)
def supports_extended_attributes():
    """Return True if a "user.test" xattr can be set on a new TESTFN file.

    Returns False when os.setxattr is missing or when setting the
    attribute raises OSError.  The probe file is removed in every case.
    """
    if not hasattr(os, "setxattr"):
        return False

    usable = True
    try:
        with open(support.TESTFN, "xb", 0) as probe:
            try:
                os.setxattr(probe.fileno(), b"user.test", b"")
            except OSError:
                usable = False
    finally:
        support.unlink(support.TESTFN)

    return usable
def _check_xattrs(self, *args, **kwargs):
    """Run _check_xattrs_str() with str names, then with fsencoded names.

    TESTFN is removed after each pass.
    """
    for make_name in (str, os.fsencode):
        self._check_xattrs_str(make_name, *args, **kwargs)
        support.unlink(support.TESTFN)
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    """Sanity checks for os.get_terminal_size()."""

    def _terminal_size_or_skip(self, *args):
        """Return os.get_terminal_size(*args); skip if no terminal is found.

        EINVAL/ENOTTY, and any OSError on win32, are treated as "no
        terminal available" and skip the test.  Other errors are re-raised.
        """
        try:
            return os.get_terminal_size(*args)
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        size = self._terminal_size_or_skip()

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        if sys.__stdin__ is None:
            # No original stdin to query.
            self.skipTest("sys.__stdin__ is not available")
        try:
            # stderr is discarded so stty's complaint about a non-tty stdin
            # does not leak into the test output.
            size = subprocess.check_output(
                ['stty', 'size'], stderr=subprocess.DEVNULL).decode().split()
        except (FileNotFoundError, PermissionError,
                subprocess.CalledProcessError):
            # A missing, non-executable or failing stty all mean there is
            # nothing to compare against.
            self.skipTest("stty invocation failed")
        expected = (int(size[1]), int(size[0]))  # reversed order

        actual = self._terminal_size_or_skip(sys.__stdin__.fileno())
        self.assertEqual(expected, actual)
os.listdir, ), 2852 )) 2853 else: 2854 funcs.extend(( 2855 (self.filenames, os.listdir,), 2856 (self.filenames, os.rename, "dst"), 2857 (self.filenames, os.replace, "dst"), 2858 )) 2859 if hasattr(os, "chown"): 2860 funcs.append((self.filenames, os.chown, 0, 0)) 2861 if hasattr(os, "lchown"): 2862 funcs.append((self.filenames, os.lchown, 0, 0)) 2863 if hasattr(os, "truncate"): 2864 funcs.append((self.filenames, os.truncate, 0)) 2865 if hasattr(os, "chflags"): 2866 funcs.append((self.filenames, os.chflags, 0)) 2867 if hasattr(os, "lchflags"): 2868 funcs.append((self.filenames, os.lchflags, 0)) 2869 if hasattr(os, "chroot"): 2870 funcs.append((self.filenames, os.chroot,)) 2871 if hasattr(os, "link"): 2872 if sys.platform == "win32": 2873 funcs.append((self.bytes_filenames, os.link, b"dst")) 2874 funcs.append((self.unicode_filenames, os.link, "dst")) 2875 else: 2876 funcs.append((self.filenames, os.link, "dst")) 2877 if hasattr(os, "listxattr"): 2878 funcs.extend(( 2879 (self.filenames, os.listxattr,), 2880 (self.filenames, os.getxattr, "user.test"), 2881 (self.filenames, os.setxattr, "user.test", b'user'), 2882 (self.filenames, os.removexattr, "user.test"), 2883 )) 2884 if hasattr(os, "lchmod"): 2885 funcs.append((self.filenames, os.lchmod, 0o777)) 2886 if hasattr(os, "readlink"): 2887 if sys.platform == "win32": 2888 funcs.append((self.unicode_filenames, os.readlink,)) 2889 else: 2890 funcs.append((self.filenames, os.readlink,)) 2891 2892 2893 for filenames, func, *func_args in funcs: 2894 for name in filenames: 2895 try: 2896 if isinstance(name, (str, bytes)): 2897 func(name, *func_args) 2898 else: 2899 with self.assertWarnsRegex(DeprecationWarning, 'should be'): 2900 func(name, *func_args) 2901 except OSError as err: 2902 self.assertIs(err.filename, name, str(func)) 2903 except UnicodeDecodeError: 2904 pass 2905 else: 2906 self.fail("No exception thrown by {}".format(func)) 2907 2908 class CPUCountTests(unittest.TestCase): 2909 def test_cpu_count(self): 2910 cpus 
= os.cpu_count() 2911 if cpus is not None: 2912 self.assertIsInstance(cpus, int) 2913 self.assertGreater(cpus, 0) 2914 else: 2915 self.skipTest("Could not determine the number of CPUs") 2916 2917 2918 class FDInheritanceTests(unittest.TestCase): 2919 def test_get_set_inheritable(self): 2920 fd = os.open(__file__, os.O_RDONLY) 2921 self.addCleanup(os.close, fd) 2922 self.assertEqual(os.get_inheritable(fd), False) 2923 2924 os.set_inheritable(fd, True) 2925 self.assertEqual(os.get_inheritable(fd), True) 2926 2927 @unittest.skipIf(fcntl is None, "need fcntl") 2928 def test_get_inheritable_cloexec(self): 2929 fd = os.open(__file__, os.O_RDONLY) 2930 self.addCleanup(os.close, fd) 2931 self.assertEqual(os.get_inheritable(fd), False) 2932 2933 # clear FD_CLOEXEC flag 2934 flags = fcntl.fcntl(fd, fcntl.F_GETFD) 2935 flags &= ~fcntl.FD_CLOEXEC 2936 fcntl.fcntl(fd, fcntl.F_SETFD, flags) 2937 2938 self.assertEqual(os.get_inheritable(fd), True) 2939 2940 @unittest.skipIf(fcntl is None, "need fcntl") 2941 def test_set_inheritable_cloexec(self): 2942 fd = os.open(__file__, os.O_RDONLY) 2943 self.addCleanup(os.close, fd) 2944 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, 2945 fcntl.FD_CLOEXEC) 2946 2947 os.set_inheritable(fd, True) 2948 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, 2949 0) 2950 2951 def test_open(self): 2952 fd = os.open(__file__, os.O_RDONLY) 2953 self.addCleanup(os.close, fd) 2954 self.assertEqual(os.get_inheritable(fd), False) 2955 2956 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()") 2957 def test_pipe(self): 2958 rfd, wfd = os.pipe() 2959 self.addCleanup(os.close, rfd) 2960 self.addCleanup(os.close, wfd) 2961 self.assertEqual(os.get_inheritable(rfd), False) 2962 self.assertEqual(os.get_inheritable(wfd), False) 2963 2964 def test_dup(self): 2965 fd1 = os.open(__file__, os.O_RDONLY) 2966 self.addCleanup(os.close, fd1) 2967 2968 fd2 = os.dup(fd1) 2969 self.addCleanup(os.close, fd2) 2970 
self.assertEqual(os.get_inheritable(fd2), False) 2971 2972 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()") 2973 def test_dup2(self): 2974 fd = os.open(__file__, os.O_RDONLY) 2975 self.addCleanup(os.close, fd) 2976 2977 # inheritable by default 2978 fd2 = os.open(__file__, os.O_RDONLY) 2979 try: 2980 os.dup2(fd, fd2) 2981 self.assertEqual(os.get_inheritable(fd2), True) 2982 finally: 2983 os.close(fd2) 2984 2985 # force non-inheritable 2986 fd3 = os.open(__file__, os.O_RDONLY) 2987 try: 2988 os.dup2(fd, fd3, inheritable=False) 2989 self.assertEqual(os.get_inheritable(fd3), False) 2990 finally: 2991 os.close(fd3) 2992 2993 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()") 2994 def test_openpty(self): 2995 master_fd, slave_fd = os.openpty() 2996 self.addCleanup(os.close, master_fd) 2997 self.addCleanup(os.close, slave_fd) 2998 self.assertEqual(os.get_inheritable(master_fd), False) 2999 self.assertEqual(os.get_inheritable(slave_fd), False) 3000 3001 3002 class PathTConverterTests(unittest.TestCase): 3003 # tuples of (function name, allows fd arguments, additional arguments to 3004 # function, cleanup function) 3005 functions = [ 3006 ('stat', True, (), None), 3007 ('lstat', False, (), None), 3008 ('access', False, (os.F_OK,), None), 3009 ('chflags', False, (0,), None), 3010 ('lchflags', False, (0,), None), 3011 ('open', False, (0,), getattr(os, 'close', None)), 3012 ] 3013 3014 def test_path_t_converter(self): 3015 str_filename = support.TESTFN 3016 if os.name == 'nt': 3017 bytes_fspath = bytes_filename = None 3018 else: 3019 bytes_filename = support.TESTFN.encode('ascii') 3020 bytes_fspath = _PathLike(bytes_filename) 3021 fd = os.open(_PathLike(str_filename), os.O_WRONLY|os.O_CREAT) 3022 self.addCleanup(support.unlink, support.TESTFN) 3023 self.addCleanup(os.close, fd) 3024 3025 int_fspath = _PathLike(fd) 3026 str_fspath = _PathLike(str_filename) 3027 3028 for name, allow_fd, extra_args, cleanup_fn in self.functions: 3029 with 
self.subTest(name=name): 3030 try: 3031 fn = getattr(os, name) 3032 except AttributeError: 3033 continue 3034 3035 for path in (str_filename, bytes_filename, str_fspath, 3036 bytes_fspath): 3037 if path is None: 3038 continue 3039 with self.subTest(name=name, path=path): 3040 result = fn(path, *extra_args) 3041 if cleanup_fn is not None: 3042 cleanup_fn(result) 3043 3044 with self.assertRaisesRegex( 3045 TypeError, 'should be string, bytes'): 3046 fn(int_fspath, *extra_args) 3047 3048 if allow_fd: 3049 result = fn(fd, *extra_args) # should not fail 3050 if cleanup_fn is not None: 3051 cleanup_fn(result) 3052 else: 3053 with self.assertRaisesRegex( 3054 TypeError, 3055 'os.PathLike'): 3056 fn(fd, *extra_args) 3057 3058 3059 @unittest.skipUnless(hasattr(os, 'get_blocking'), 3060 'needs os.get_blocking() and os.set_blocking()') 3061 class BlockingTests(unittest.TestCase): 3062 def test_blocking(self): 3063 fd = os.open(__file__, os.O_RDONLY) 3064 self.addCleanup(os.close, fd) 3065 self.assertEqual(os.get_blocking(fd), True) 3066 3067 os.set_blocking(fd, False) 3068 self.assertEqual(os.get_blocking(fd), False) 3069 3070 os.set_blocking(fd, True) 3071 self.assertEqual(os.get_blocking(fd), True) 3072 3073 3074 3075 class ExportsTests(unittest.TestCase): 3076 def test_os_all(self): 3077 self.assertIn('open', os.__all__) 3078 self.assertIn('walk', os.__all__) 3079 3080 3081 class TestScandir(unittest.TestCase): 3082 check_no_resource_warning = support.check_no_resource_warning 3083 3084 def setUp(self): 3085 self.path = os.path.realpath(support.TESTFN) 3086 self.bytes_path = os.fsencode(self.path) 3087 self.addCleanup(support.rmtree, self.path) 3088 os.mkdir(self.path) 3089 3090 def create_file(self, name="file.txt"): 3091 path = self.bytes_path if isinstance(name, bytes) else self.path 3092 filename = os.path.join(path, name) 3093 create_file(filename, b'python') 3094 return filename 3095 3096 def get_entries(self, names): 3097 entries = dict((entry.name, entry) 3098 for 
entry in os.scandir(self.path)) 3099 self.assertEqual(sorted(entries.keys()), names) 3100 return entries 3101 3102 def assert_stat_equal(self, stat1, stat2, skip_fields): 3103 if skip_fields: 3104 for attr in dir(stat1): 3105 if not attr.startswith("st_"): 3106 continue 3107 if attr in ("st_dev", "st_ino", "st_nlink"): 3108 continue 3109 self.assertEqual(getattr(stat1, attr), 3110 getattr(stat2, attr), 3111 (stat1, stat2, attr)) 3112 else: 3113 self.assertEqual(stat1, stat2) 3114 3115 def check_entry(self, entry, name, is_dir, is_file, is_symlink): 3116 self.assertIsInstance(entry, os.DirEntry) 3117 self.assertEqual(entry.name, name) 3118 self.assertEqual(entry.path, os.path.join(self.path, name)) 3119 self.assertEqual(entry.inode(), 3120 os.stat(entry.path, follow_symlinks=False).st_ino) 3121 3122 entry_stat = os.stat(entry.path) 3123 self.assertEqual(entry.is_dir(), 3124 stat.S_ISDIR(entry_stat.st_mode)) 3125 self.assertEqual(entry.is_file(), 3126 stat.S_ISREG(entry_stat.st_mode)) 3127 self.assertEqual(entry.is_symlink(), 3128 os.path.islink(entry.path)) 3129 3130 entry_lstat = os.stat(entry.path, follow_symlinks=False) 3131 self.assertEqual(entry.is_dir(follow_symlinks=False), 3132 stat.S_ISDIR(entry_lstat.st_mode)) 3133 self.assertEqual(entry.is_file(follow_symlinks=False), 3134 stat.S_ISREG(entry_lstat.st_mode)) 3135 3136 self.assert_stat_equal(entry.stat(), 3137 entry_stat, 3138 os.name == 'nt' and not is_symlink) 3139 self.assert_stat_equal(entry.stat(follow_symlinks=False), 3140 entry_lstat, 3141 os.name == 'nt') 3142 3143 def test_attributes(self): 3144 link = hasattr(os, 'link') 3145 symlink = support.can_symlink() 3146 3147 dirname = os.path.join(self.path, "dir") 3148 os.mkdir(dirname) 3149 filename = self.create_file("file.txt") 3150 if link: 3151 os.link(filename, os.path.join(self.path, "link_file.txt")) 3152 if symlink: 3153 os.symlink(dirname, os.path.join(self.path, "symlink_dir"), 3154 target_is_directory=True) 3155 os.symlink(filename, 
os.path.join(self.path, "symlink_file.txt")) 3156 3157 names = ['dir', 'file.txt'] 3158 if link: 3159 names.append('link_file.txt') 3160 if symlink: 3161 names.extend(('symlink_dir', 'symlink_file.txt')) 3162 entries = self.get_entries(names) 3163 3164 entry = entries['dir'] 3165 self.check_entry(entry, 'dir', True, False, False) 3166 3167 entry = entries['file.txt'] 3168 self.check_entry(entry, 'file.txt', False, True, False) 3169 3170 if link: 3171 entry = entries['link_file.txt'] 3172 self.check_entry(entry, 'link_file.txt', False, True, False) 3173 3174 if symlink: 3175 entry = entries['symlink_dir'] 3176 self.check_entry(entry, 'symlink_dir', True, False, True) 3177 3178 entry = entries['symlink_file.txt'] 3179 self.check_entry(entry, 'symlink_file.txt', False, True, True) 3180 3181 def get_entry(self, name): 3182 path = self.bytes_path if isinstance(name, bytes) else self.path 3183 entries = list(os.scandir(path)) 3184 self.assertEqual(len(entries), 1) 3185 3186 entry = entries[0] 3187 self.assertEqual(entry.name, name) 3188 return entry 3189 3190 def create_file_entry(self, name='file.txt'): 3191 filename = self.create_file(name=name) 3192 return self.get_entry(os.path.basename(filename)) 3193 3194 def test_current_directory(self): 3195 filename = self.create_file() 3196 old_dir = os.getcwd() 3197 try: 3198 os.chdir(self.path) 3199 3200 # call scandir() without parameter: it must list the content 3201 # of the current directory 3202 entries = dict((entry.name, entry) for entry in os.scandir()) 3203 self.assertEqual(sorted(entries.keys()), 3204 [os.path.basename(filename)]) 3205 finally: 3206 os.chdir(old_dir) 3207 3208 def test_repr(self): 3209 entry = self.create_file_entry() 3210 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>") 3211 3212 def test_fspath_protocol(self): 3213 entry = self.create_file_entry() 3214 self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt')) 3215 3216 def test_fspath_protocol_bytes(self): 3217 bytes_filename = 
os.fsencode('bytesfile.txt') 3218 bytes_entry = self.create_file_entry(name=bytes_filename) 3219 fspath = os.fspath(bytes_entry) 3220 self.assertIsInstance(fspath, bytes) 3221 self.assertEqual(fspath, 3222 os.path.join(os.fsencode(self.path),bytes_filename)) 3223 3224 def test_removed_dir(self): 3225 path = os.path.join(self.path, 'dir') 3226 3227 os.mkdir(path) 3228 entry = self.get_entry('dir') 3229 os.rmdir(path) 3230 3231 # On POSIX, is_dir() result depends if scandir() filled d_type or not 3232 if os.name == 'nt': 3233 self.assertTrue(entry.is_dir()) 3234 self.assertFalse(entry.is_file()) 3235 self.assertFalse(entry.is_symlink()) 3236 if os.name == 'nt': 3237 self.assertRaises(FileNotFoundError, entry.inode) 3238 # don't fail 3239 entry.stat() 3240 entry.stat(follow_symlinks=False) 3241 else: 3242 self.assertGreater(entry.inode(), 0) 3243 self.assertRaises(FileNotFoundError, entry.stat) 3244 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False) 3245 3246 def test_removed_file(self): 3247 entry = self.create_file_entry() 3248 os.unlink(entry.path) 3249 3250 self.assertFalse(entry.is_dir()) 3251 # On POSIX, is_dir() result depends if scandir() filled d_type or not 3252 if os.name == 'nt': 3253 self.assertTrue(entry.is_file()) 3254 self.assertFalse(entry.is_symlink()) 3255 if os.name == 'nt': 3256 self.assertRaises(FileNotFoundError, entry.inode) 3257 # don't fail 3258 entry.stat() 3259 entry.stat(follow_symlinks=False) 3260 else: 3261 self.assertGreater(entry.inode(), 0) 3262 self.assertRaises(FileNotFoundError, entry.stat) 3263 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False) 3264 3265 def test_broken_symlink(self): 3266 if not support.can_symlink(): 3267 return self.skipTest('cannot create symbolic link') 3268 3269 filename = self.create_file("file.txt") 3270 os.symlink(filename, 3271 os.path.join(self.path, "symlink.txt")) 3272 entries = self.get_entries(['file.txt', 'symlink.txt']) 3273 entry = entries['symlink.txt'] 
3274 os.unlink(filename) 3275 3276 self.assertGreater(entry.inode(), 0) 3277 self.assertFalse(entry.is_dir()) 3278 self.assertFalse(entry.is_file()) # broken symlink returns False 3279 self.assertFalse(entry.is_dir(follow_symlinks=False)) 3280 self.assertFalse(entry.is_file(follow_symlinks=False)) 3281 self.assertTrue(entry.is_symlink()) 3282 self.assertRaises(FileNotFoundError, entry.stat) 3283 # don't fail 3284 entry.stat(follow_symlinks=False) 3285 3286 def test_bytes(self): 3287 self.create_file("file.txt") 3288 3289 path_bytes = os.fsencode(self.path) 3290 entries = list(os.scandir(path_bytes)) 3291 self.assertEqual(len(entries), 1, entries) 3292 entry = entries[0] 3293 3294 self.assertEqual(entry.name, b'file.txt') 3295 self.assertEqual(entry.path, 3296 os.fsencode(os.path.join(self.path, 'file.txt'))) 3297 3298 def test_empty_path(self): 3299 self.assertRaises(FileNotFoundError, os.scandir, '') 3300 3301 def test_consume_iterator_twice(self): 3302 self.create_file("file.txt") 3303 iterator = os.scandir(self.path) 3304 3305 entries = list(iterator) 3306 self.assertEqual(len(entries), 1, entries) 3307 3308 # check than consuming the iterator twice doesn't raise exception 3309 entries2 = list(iterator) 3310 self.assertEqual(len(entries2), 0, entries2) 3311 3312 def test_bad_path_type(self): 3313 for obj in [1234, 1.234, {}, []]: 3314 self.assertRaises(TypeError, os.scandir, obj) 3315 3316 def test_close(self): 3317 self.create_file("file.txt") 3318 self.create_file("file2.txt") 3319 iterator = os.scandir(self.path) 3320 next(iterator) 3321 iterator.close() 3322 # multiple closes 3323 iterator.close() 3324 with self.check_no_resource_warning(): 3325 del iterator 3326 3327 def test_context_manager(self): 3328 self.create_file("file.txt") 3329 self.create_file("file2.txt") 3330 with os.scandir(self.path) as iterator: 3331 next(iterator) 3332 with self.check_no_resource_warning(): 3333 del iterator 3334 3335 def test_context_manager_close(self): 3336 
self.create_file("file.txt") 3337 self.create_file("file2.txt") 3338 with os.scandir(self.path) as iterator: 3339 next(iterator) 3340 iterator.close() 3341 3342 def test_context_manager_exception(self): 3343 self.create_file("file.txt") 3344 self.create_file("file2.txt") 3345 with self.assertRaises(ZeroDivisionError): 3346 with os.scandir(self.path) as iterator: 3347 next(iterator) 3348 1/0 3349 with self.check_no_resource_warning(): 3350 del iterator 3351 3352 def test_resource_warning(self): 3353 self.create_file("file.txt") 3354 self.create_file("file2.txt") 3355 iterator = os.scandir(self.path) 3356 next(iterator) 3357 with self.assertWarns(ResourceWarning): 3358 del iterator 3359 support.gc_collect() 3360 # exhausted iterator 3361 iterator = os.scandir(self.path) 3362 list(iterator) 3363 with self.check_no_resource_warning(): 3364 del iterator 3365 3366 3367 class TestPEP519(unittest.TestCase): 3368 3369 # Abstracted so it can be overridden to test pure Python implementation 3370 # if a C version is provided. 
3371 fspath = staticmethod(os.fspath) 3372 3373 def test_return_bytes(self): 3374 for b in b'hello', b'goodbye', b'some/path/and/file': 3375 self.assertEqual(b, self.fspath(b)) 3376 3377 def test_return_string(self): 3378 for s in 'hello', 'goodbye', 'some/path/and/file': 3379 self.assertEqual(s, self.fspath(s)) 3380 3381 def test_fsencode_fsdecode(self): 3382 for p in "path/like/object", b"path/like/object": 3383 pathlike = _PathLike(p) 3384 3385 self.assertEqual(p, self.fspath(pathlike)) 3386 self.assertEqual(b"path/like/object", os.fsencode(pathlike)) 3387 self.assertEqual("path/like/object", os.fsdecode(pathlike)) 3388 3389 def test_pathlike(self): 3390 self.assertEqual('#feelthegil', self.fspath(_PathLike('#feelthegil'))) 3391 self.assertTrue(issubclass(_PathLike, os.PathLike)) 3392 self.assertTrue(isinstance(_PathLike(), os.PathLike)) 3393 3394 def test_garbage_in_exception_out(self): 3395 vapor = type('blah', (), {}) 3396 for o in int, type, os, vapor(): 3397 self.assertRaises(TypeError, self.fspath, o) 3398 3399 def test_argument_required(self): 3400 self.assertRaises(TypeError, self.fspath) 3401 3402 def test_bad_pathlike(self): 3403 # __fspath__ returns a value other than str or bytes. 3404 self.assertRaises(TypeError, self.fspath, _PathLike(42)) 3405 # __fspath__ attribute that is not callable. 3406 c = type('foo', (), {}) 3407 c.__fspath__ = 1 3408 self.assertRaises(TypeError, self.fspath, c()) 3409 # __fspath__ raises an exception. 3410 self.assertRaises(ZeroDivisionError, self.fspath, 3411 _PathLike(ZeroDivisionError())) 3412 3413 # Only test if the C version is provided, otherwise TestPEP519 already tested 3414 # the pure Python implementation. 3415 if hasattr(os, "_fspath"): 3416 class TestPEP519PurePython(TestPEP519): 3417 3418 """Explicitly test the pure Python implementation of os.fspath().""" 3419 3420 fspath = staticmethod(os._fspath) 3421 3422 3423 if __name__ == "__main__": 3424 unittest.main() 3425