Home | History | Annotate | Download | only in test
      1 # As a test suite for the os module, this is woefully inadequate, but this
      2 # does add tests for a few functions which have been determined to be more
      3 # portable than they had been thought to be.
      4 
      5 import asynchat
      6 import asyncore
      7 import codecs
      8 import contextlib
      9 import decimal
     10 import errno
     11 import fractions
     12 import getpass
     13 import itertools
     14 import locale
     15 import mmap
     16 import os
     17 import pickle
     18 import re
     19 import shutil
     20 import signal
     21 import socket
     22 import stat
     23 import subprocess
     24 import sys
     25 import sysconfig
     26 import time
     27 import unittest
     28 import uuid
     29 import warnings
     30 from test import support
     31 try:
     32     import threading
     33 except ImportError:
     34     threading = None
     35 try:
     36     import resource
     37 except ImportError:
     38     resource = None
     39 try:
     40     import fcntl
     41 except ImportError:
     42     fcntl = None
     43 try:
     44     import _winapi
     45 except ImportError:
     46     _winapi = None
     47 try:
     48     import grp
     49     groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem]
     50     if hasattr(os, 'getgid'):
     51         process_gid = os.getgid()
     52         if process_gid not in groups:
     53             groups.append(process_gid)
     54 except ImportError:
     55     groups = []
     56 try:
     57     import pwd
     58     all_users = [u.pw_uid for u in pwd.getpwall()]
     59 except (ImportError, AttributeError):
     60     all_users = []
     61 try:
     62     from _testcapi import INT_MAX, PY_SSIZE_T_MAX
     63 except ImportError:
     64     INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
     65 
     66 from test.support.script_helper import assert_python_ok
     67 from test.support import unix_shell
     68 
     69 
     70 root_in_posix = False
     71 if hasattr(os, 'geteuid'):
     72     root_in_posix = (os.geteuid() == 0)
     73 
     74 # Detect whether we're on a Linux system that uses the (now outdated
     75 # and unmaintained) linuxthreads threading library.  There's an issue
     76 # when combining linuxthreads with a failed execv call: see
     77 # http://bugs.python.org/issue4970.
     78 if hasattr(sys, 'thread_info') and sys.thread_info.version:
     79     USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
     80 else:
     81     USING_LINUXTHREADS = False
     82 
     83 # Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
     84 HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
     85 
     86 
     87 @contextlib.contextmanager
     88 def ignore_deprecation_warnings(msg_regex, quiet=False):
     89     with support.check_warnings((msg_regex, DeprecationWarning), quiet=quiet):
     90         yield
     91 
     92 
     93 def requires_os_func(name):
     94     return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
     95 
     96 
     97 class _PathLike(os.PathLike):
     98 
     99     def __init__(self, path=""):
    100         self.path = path
    101 
    102     def __str__(self):
    103         return str(self.path)
    104 
    105     def __fspath__(self):
    106         if isinstance(self.path, BaseException):
    107             raise self.path
    108         else:
    109             return self.path
    110 
    111 
    112 def create_file(filename, content=b'content'):
    113     with open(filename, "xb", 0) as fp:
    114         fp.write(content)
    115 
    116 
    117 # Tests creating TESTFN
    118 class FileTests(unittest.TestCase):
    119     def setUp(self):
    120         if os.path.lexists(support.TESTFN):
    121             os.unlink(support.TESTFN)
    122     tearDown = setUp
    123 
    124     def test_access(self):
    125         f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
    126         os.close(f)
    127         self.assertTrue(os.access(support.TESTFN, os.W_OK))
    128 
    129     def test_closerange(self):
    130         first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
    131         # We must allocate two consecutive file descriptors, otherwise
    132         # it will mess up other file descriptors (perhaps even the three
    133         # standard ones).
    134         second = os.dup(first)
    135         try:
    136             retries = 0
    137             while second != first + 1:
    138                 os.close(first)
    139                 retries += 1
    140                 if retries > 10:
    141                     # XXX test skipped
    142                     self.skipTest("couldn't allocate two consecutive fds")
    143                 first, second = second, os.dup(second)
    144         finally:
    145             os.close(second)
    146         # close a fd that is open, and one that isn't
    147         os.closerange(first, first + 2)
    148         self.assertRaises(OSError, os.write, first, b"a")
    149 
    150     @support.cpython_only
    151     def test_rename(self):
    152         path = support.TESTFN
    153         old = sys.getrefcount(path)
    154         self.assertRaises(TypeError, os.rename, path, 0)
    155         new = sys.getrefcount(path)
    156         self.assertEqual(old, new)
    157 
    158     def test_read(self):
    159         with open(support.TESTFN, "w+b") as fobj:
    160             fobj.write(b"spam")
    161             fobj.flush()
    162             fd = fobj.fileno()
    163             os.lseek(fd, 0, 0)
    164             s = os.read(fd, 4)
    165             self.assertEqual(type(s), bytes)
    166             self.assertEqual(s, b"spam")
    167 
    168     @support.cpython_only
    169     # Skip the test on 32-bit platforms: the number of bytes must fit in a
    170     # Py_ssize_t type
    171     @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
    172                          "needs INT_MAX < PY_SSIZE_T_MAX")
    173     @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    174     def test_large_read(self, size):
    175         self.addCleanup(support.unlink, support.TESTFN)
    176         create_file(support.TESTFN, b'test')
    177 
    178         # Issue #21932: Make sure that os.read() does not raise an
    179         # OverflowError for size larger than INT_MAX
    180         with open(support.TESTFN, "rb") as fp:
    181             data = os.read(fp.fileno(), size)
    182 
    183         # The test does not try to read more than 2 GB at once because the
    184         # operating system is free to return less bytes than requested.
    185         self.assertEqual(data, b'test')
    186 
    187     def test_write(self):
    188         # os.write() accepts bytes- and buffer-like objects but not strings
    189         fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
    190         self.assertRaises(TypeError, os.write, fd, "beans")
    191         os.write(fd, b"bacon\n")
    192         os.write(fd, bytearray(b"eggs\n"))
    193         os.write(fd, memoryview(b"spam\n"))
    194         os.close(fd)
    195         with open(support.TESTFN, "rb") as fobj:
    196             self.assertEqual(fobj.read().splitlines(),
    197                 [b"bacon", b"eggs", b"spam"])
    198 
    199     def write_windows_console(self, *args):
    200         retcode = subprocess.call(args,
    201             # use a new console to not flood the test output
    202             creationflags=subprocess.CREATE_NEW_CONSOLE,
    203             # use a shell to hide the console window (SW_HIDE)
    204             shell=True)
    205         self.assertEqual(retcode, 0)
    206 
    207     @unittest.skipUnless(sys.platform == 'win32',
    208                          'test specific to the Windows console')
    209     def test_write_windows_console(self):
    210         # Issue #11395: the Windows console returns an error (12: not enough
    211         # space error) on writing into stdout if stdout mode is binary and the
    212         # length is greater than 66,000 bytes (or less, depending on heap
    213         # usage).
    214         code = "print('x' * 100000)"
    215         self.write_windows_console(sys.executable, "-c", code)
    216         self.write_windows_console(sys.executable, "-u", "-c", code)
    217 
    218     def fdopen_helper(self, *args):
    219         fd = os.open(support.TESTFN, os.O_RDONLY)
    220         f = os.fdopen(fd, *args)
    221         f.close()
    222 
    223     def test_fdopen(self):
    224         fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
    225         os.close(fd)
    226 
    227         self.fdopen_helper()
    228         self.fdopen_helper('r')
    229         self.fdopen_helper('r', 100)
    230 
    231     def test_replace(self):
    232         TESTFN2 = support.TESTFN + ".2"
    233         self.addCleanup(support.unlink, support.TESTFN)
    234         self.addCleanup(support.unlink, TESTFN2)
    235 
    236         create_file(support.TESTFN, b"1")
    237         create_file(TESTFN2, b"2")
    238 
    239         os.replace(support.TESTFN, TESTFN2)
    240         self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
    241         with open(TESTFN2, 'r') as f:
    242             self.assertEqual(f.read(), "1")
    243 
    244     def test_open_keywords(self):
    245         f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
    246             dir_fd=None)
    247         os.close(f)
    248 
    249     def test_symlink_keywords(self):
    250         symlink = support.get_attribute(os, "symlink")
    251         try:
    252             symlink(src='target', dst=support.TESTFN,
    253                 target_is_directory=False, dir_fd=None)
    254         except (NotImplementedError, OSError):
    255             pass  # No OS support or unprivileged user
    256 
    257 
    258 # Test attributes on return values from os.*stat* family.
    259 class StatAttributeTests(unittest.TestCase):
    260     def setUp(self):
    261         self.fname = support.TESTFN
    262         self.addCleanup(support.unlink, self.fname)
    263         create_file(self.fname, b"ABC")
    264 
    265     @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
    266     def check_stat_attributes(self, fname):
    267         result = os.stat(fname)
    268 
    269         # Make sure direct access works
    270         self.assertEqual(result[stat.ST_SIZE], 3)
    271         self.assertEqual(result.st_size, 3)
    272 
    273         # Make sure all the attributes are there
    274         members = dir(result)
    275         for name in dir(stat):
    276             if name[:3] == 'ST_':
    277                 attr = name.lower()
    278                 if name.endswith("TIME"):
    279                     def trunc(x): return int(x)
    280                 else:
    281                     def trunc(x): return x
    282                 self.assertEqual(trunc(getattr(result, attr)),
    283                                   result[getattr(stat, name)])
    284                 self.assertIn(attr, members)
    285 
    286         # Make sure that the st_?time and st_?time_ns fields roughly agree
    287         # (they should always agree up to around tens-of-microseconds)
    288         for name in 'st_atime st_mtime st_ctime'.split():
    289             floaty = int(getattr(result, name) * 100000)
    290             nanosecondy = getattr(result, name + "_ns") // 10000
    291             self.assertAlmostEqual(floaty, nanosecondy, delta=2)
    292 
    293         try:
    294             result[200]
    295             self.fail("No exception raised")
    296         except IndexError:
    297             pass
    298 
    299         # Make sure that assignment fails
    300         try:
    301             result.st_mode = 1
    302             self.fail("No exception raised")
    303         except AttributeError:
    304             pass
    305 
    306         try:
    307             result.st_rdev = 1
    308             self.fail("No exception raised")
    309         except (AttributeError, TypeError):
    310             pass
    311 
    312         try:
    313             result.parrot = 1
    314             self.fail("No exception raised")
    315         except AttributeError:
    316             pass
    317 
    318         # Use the stat_result constructor with a too-short tuple.
    319         try:
    320             result2 = os.stat_result((10,))
    321             self.fail("No exception raised")
    322         except TypeError:
    323             pass
    324 
    325         # Use the constructor with a too-long tuple.
    326         try:
    327             result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
    328         except TypeError:
    329             pass
    330 
    331     def test_stat_attributes(self):
    332         self.check_stat_attributes(self.fname)
    333 
    334     def test_stat_attributes_bytes(self):
    335         try:
    336             fname = self.fname.encode(sys.getfilesystemencoding())
    337         except UnicodeEncodeError:
    338             self.skipTest("cannot encode %a for the filesystem" % self.fname)
    339         self.check_stat_attributes(fname)
    340 
    341     def test_stat_result_pickle(self):
    342         result = os.stat(self.fname)
    343         for proto in range(pickle.HIGHEST_PROTOCOL + 1):
    344             p = pickle.dumps(result, proto)
    345             self.assertIn(b'stat_result', p)
    346             if proto < 4:
    347                 self.assertIn(b'cos\nstat_result\n', p)
    348             unpickled = pickle.loads(p)
    349             self.assertEqual(result, unpickled)
    350 
    351     @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    352     def test_statvfs_attributes(self):
    353         try:
    354             result = os.statvfs(self.fname)
    355         except OSError as e:
    356             # On AtheOS, glibc always returns ENOSYS
    357             if e.errno == errno.ENOSYS:
    358                 self.skipTest('os.statvfs() failed with ENOSYS')
    359 
    360         # Make sure direct access works
    361         self.assertEqual(result.f_bfree, result[3])
    362 
    363         # Make sure all the attributes are there.
    364         members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
    365                     'ffree', 'favail', 'flag', 'namemax')
    366         for value, member in enumerate(members):
    367             self.assertEqual(getattr(result, 'f_' + member), result[value])
    368 
    369         # Make sure that assignment really fails
    370         try:
    371             result.f_bfree = 1
    372             self.fail("No exception raised")
    373         except AttributeError:
    374             pass
    375 
    376         try:
    377             result.parrot = 1
    378             self.fail("No exception raised")
    379         except AttributeError:
    380             pass
    381 
    382         # Use the constructor with a too-short tuple.
    383         try:
    384             result2 = os.statvfs_result((10,))
    385             self.fail("No exception raised")
    386         except TypeError:
    387             pass
    388 
    389         # Use the constructor with a too-long tuple.
    390         try:
    391             result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
    392         except TypeError:
    393             pass
    394 
    395     @unittest.skipUnless(hasattr(os, 'statvfs'),
    396                          "need os.statvfs()")
    397     def test_statvfs_result_pickle(self):
    398         try:
    399             result = os.statvfs(self.fname)
    400         except OSError as e:
    401             # On AtheOS, glibc always returns ENOSYS
    402             if e.errno == errno.ENOSYS:
    403                 self.skipTest('os.statvfs() failed with ENOSYS')
    404 
    405         for proto in range(pickle.HIGHEST_PROTOCOL + 1):
    406             p = pickle.dumps(result, proto)
    407             self.assertIn(b'statvfs_result', p)
    408             if proto < 4:
    409                 self.assertIn(b'cos\nstatvfs_result\n', p)
    410             unpickled = pickle.loads(p)
    411             self.assertEqual(result, unpickled)
    412 
    413     @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    414     def test_1686475(self):
    415         # Verify that an open file can be stat'ed
    416         try:
    417             os.stat(r"c:\pagefile.sys")
    418         except FileNotFoundError:
    419             self.skipTest(r'c:\pagefile.sys does not exist')
    420         except OSError as e:
    421             self.fail("Could not stat pagefile.sys")
    422 
    423     @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    424     @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    425     def test_15261(self):
    426         # Verify that stat'ing a closed fd does not cause crash
    427         r, w = os.pipe()
    428         try:
    429             os.stat(r)          # should not raise error
    430         finally:
    431             os.close(r)
    432             os.close(w)
    433         with self.assertRaises(OSError) as ctx:
    434             os.stat(r)
    435         self.assertEqual(ctx.exception.errno, errno.EBADF)
    436 
    437     def check_file_attributes(self, result):
    438         self.assertTrue(hasattr(result, 'st_file_attributes'))
    439         self.assertTrue(isinstance(result.st_file_attributes, int))
    440         self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
    441 
    442     @unittest.skipUnless(sys.platform == "win32",
    443                          "st_file_attributes is Win32 specific")
    444     def test_file_attributes(self):
    445         # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
    446         result = os.stat(self.fname)
    447         self.check_file_attributes(result)
    448         self.assertEqual(
    449             result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
    450             0)
    451 
    452         # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
    453         dirname = support.TESTFN + "dir"
    454         os.mkdir(dirname)
    455         self.addCleanup(os.rmdir, dirname)
    456 
    457         result = os.stat(dirname)
    458         self.check_file_attributes(result)
    459         self.assertEqual(
    460             result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
    461             stat.FILE_ATTRIBUTE_DIRECTORY)
    462 
    463     @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    464     def test_access_denied(self):
    465         # Default to FindFirstFile WIN32_FIND_DATA when access is
    466         # denied. See issue 28075.
    467         # os.environ['TEMP'] should be located on a volume that
    468         # supports file ACLs.
    469         fname = os.path.join(os.environ['TEMP'], self.fname)
    470         self.addCleanup(support.unlink, fname)
    471         create_file(fname, b'ABC')
    472         # Deny the right to [S]YNCHRONIZE on the file to
    473         # force CreateFile to fail with ERROR_ACCESS_DENIED.
    474         DETACHED_PROCESS = 8
    475         subprocess.check_call(
    476             ['icacls.exe', fname, '/deny', 'Users:(S)'],
    477             creationflags=DETACHED_PROCESS
    478         )
    479         result = os.stat(fname)
    480         self.assertNotEqual(result.st_size, 0)
    481 
    482 
    483 class UtimeTests(unittest.TestCase):
    484     def setUp(self):
    485         self.dirname = support.TESTFN
    486         self.fname = os.path.join(self.dirname, "f1")
    487 
    488         self.addCleanup(support.rmtree, self.dirname)
    489         os.mkdir(self.dirname)
    490         create_file(self.fname)
    491 
    492         def restore_float_times(state):
    493             with ignore_deprecation_warnings('stat_float_times'):
    494                 os.stat_float_times(state)
    495 
    496         # ensure that st_atime and st_mtime are float
    497         with ignore_deprecation_warnings('stat_float_times'):
    498             old_float_times = os.stat_float_times(-1)
    499             self.addCleanup(restore_float_times, old_float_times)
    500 
    501             os.stat_float_times(True)
    502 
    503     def support_subsecond(self, filename):
    504         # Heuristic to check if the filesystem supports timestamp with
    505         # subsecond resolution: check if float and int timestamps are different
    506         st = os.stat(filename)
    507         return ((st.st_atime != st[7])
    508                 or (st.st_mtime != st[8])
    509                 or (st.st_ctime != st[9]))
    510 
    511     def _test_utime(self, set_time, filename=None):
    512         if not filename:
    513             filename = self.fname
    514 
    515         support_subsecond = self.support_subsecond(filename)
    516         if support_subsecond:
    517             # Timestamp with a resolution of 1 microsecond (10^-6).
    518             #
    519             # The resolution of the C internal function used by os.utime()
    520             # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
    521             # test with a resolution of 1 ns requires more work:
    522             # see the issue #15745.
    523             atime_ns = 1002003000   # 1.002003 seconds
    524             mtime_ns = 4005006000   # 4.005006 seconds
    525         else:
    526             # use a resolution of 1 second
    527             atime_ns = 5 * 10**9
    528             mtime_ns = 8 * 10**9
    529 
    530         set_time(filename, (atime_ns, mtime_ns))
    531         st = os.stat(filename)
    532 
    533         if support_subsecond:
    534             self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
    535             self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
    536         else:
    537             self.assertEqual(st.st_atime, atime_ns * 1e-9)
    538             self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
    539         self.assertEqual(st.st_atime_ns, atime_ns)
    540         self.assertEqual(st.st_mtime_ns, mtime_ns)
    541 
    542     def test_utime(self):
    543         def set_time(filename, ns):
    544             # test the ns keyword parameter
    545             os.utime(filename, ns=ns)
    546         self._test_utime(set_time)
    547 
    548     @staticmethod
    549     def ns_to_sec(ns):
    550         # Convert a number of nanosecond (int) to a number of seconds (float).
    551         # Round towards infinity by adding 0.5 nanosecond to avoid rounding
    552         # issue, os.utime() rounds towards minus infinity.
    553         return (ns * 1e-9) + 0.5e-9
    554 
    555     def test_utime_by_indexed(self):
    556         # pass times as floating point seconds as the second indexed parameter
    557         def set_time(filename, ns):
    558             atime_ns, mtime_ns = ns
    559             atime = self.ns_to_sec(atime_ns)
    560             mtime = self.ns_to_sec(mtime_ns)
    561             # test utimensat(timespec), utimes(timeval), utime(utimbuf)
    562             # or utime(time_t)
    563             os.utime(filename, (atime, mtime))
    564         self._test_utime(set_time)
    565 
    566     def test_utime_by_times(self):
    567         def set_time(filename, ns):
    568             atime_ns, mtime_ns = ns
    569             atime = self.ns_to_sec(atime_ns)
    570             mtime = self.ns_to_sec(mtime_ns)
    571             # test the times keyword parameter
    572             os.utime(filename, times=(atime, mtime))
    573         self._test_utime(set_time)
    574 
    575     @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
    576                          "follow_symlinks support for utime required "
    577                          "for this test.")
    578     def test_utime_nofollow_symlinks(self):
    579         def set_time(filename, ns):
    580             # use follow_symlinks=False to test utimensat(timespec)
    581             # or lutimes(timeval)
    582             os.utime(filename, ns=ns, follow_symlinks=False)
    583         self._test_utime(set_time)
    584 
    585     @unittest.skipUnless(os.utime in os.supports_fd,
    586                          "fd support for utime required for this test.")
    587     def test_utime_fd(self):
    588         def set_time(filename, ns):
    589             with open(filename, 'wb', 0) as fp:
    590                 # use a file descriptor to test futimens(timespec)
    591                 # or futimes(timeval)
    592                 os.utime(fp.fileno(), ns=ns)
    593         self._test_utime(set_time)
    594 
    595     @unittest.skipUnless(os.utime in os.supports_dir_fd,
    596                          "dir_fd support for utime required for this test.")
    597     def test_utime_dir_fd(self):
    598         def set_time(filename, ns):
    599             dirname, name = os.path.split(filename)
    600             dirfd = os.open(dirname, os.O_RDONLY)
    601             try:
    602                 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
    603                 os.utime(name, dir_fd=dirfd, ns=ns)
    604             finally:
    605                 os.close(dirfd)
    606         self._test_utime(set_time)
    607 
    608     def test_utime_directory(self):
    609         def set_time(filename, ns):
    610             # test calling os.utime() on a directory
    611             os.utime(filename, ns=ns)
    612         self._test_utime(set_time, filename=self.dirname)
    613 
    614     def _test_utime_current(self, set_time):
    615         # Get the system clock
    616         current = time.time()
    617 
    618         # Call os.utime() to set the timestamp to the current system clock
    619         set_time(self.fname)
    620 
    621         if not self.support_subsecond(self.fname):
    622             delta = 1.0
    623         else:
    624             # On Windows, the usual resolution of time.time() is 15.6 ms
    625             delta = 0.020
    626         st = os.stat(self.fname)
    627         msg = ("st_time=%r, current=%r, dt=%r"
    628                % (st.st_mtime, current, st.st_mtime - current))
    629         self.assertAlmostEqual(st.st_mtime, current,
    630                                delta=delta, msg=msg)
    631 
    632     def test_utime_current(self):
    633         def set_time(filename):
    634             # Set to the current time in the new way
    635             os.utime(self.fname)
    636         self._test_utime_current(set_time)
    637 
    638     def test_utime_current_old(self):
    639         def set_time(filename):
    640             # Set to the current time in the old explicit way.
    641             os.utime(self.fname, None)
    642         self._test_utime_current(set_time)
    643 
    644     def get_file_system(self, path):
    645         if sys.platform == 'win32':
    646             root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
    647             import ctypes
    648             kernel32 = ctypes.windll.kernel32
    649             buf = ctypes.create_unicode_buffer("", 100)
    650             ok = kernel32.GetVolumeInformationW(root, None, 0,
    651                                                 None, None, None,
    652                                                 buf, len(buf))
    653             if ok:
    654                 return buf.value
    655         # return None if the filesystem is unknown
    656 
    657     def test_large_time(self):
    658         # Many filesystems are limited to the year 2038. At least, the test
    659         # pass with NTFS filesystem.
    660         if self.get_file_system(self.dirname) != "NTFS":
    661             self.skipTest("requires NTFS")
    662 
    663         large = 5000000000   # some day in 2128
    664         os.utime(self.fname, (large, large))
    665         self.assertEqual(os.stat(self.fname).st_mtime, large)
    666 
    667     def test_utime_invalid_arguments(self):
    668         # seconds and nanoseconds parameters are mutually exclusive
    669         with self.assertRaises(ValueError):
    670             os.utime(self.fname, (5, 5), ns=(5, 5))
    671 
    672 
    673 from test import mapping_tests
    674 
    675 class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    676     """check that os.environ object conform to mapping protocol"""
    677     type2test = None
    678 
    679     def setUp(self):
    680         self.__save = dict(os.environ)
    681         if os.supports_bytes_environ:
    682             self.__saveb = dict(os.environb)
    683         for key, value in self._reference().items():
    684             os.environ[key] = value
    685 
    686     def tearDown(self):
    687         os.environ.clear()
    688         os.environ.update(self.__save)
    689         if os.supports_bytes_environ:
    690             os.environb.clear()
    691             os.environb.update(self.__saveb)
    692 
    693     def _reference(self):
    694         return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
    695 
    696     def _empty_mapping(self):
    697         os.environ.clear()
    698         return os.environ
    699 
    700     # Bug 1110478
    701     @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
    702                          'requires a shell')
    703     def test_update2(self):
    704         os.environ.clear()
    705         os.environ.update(HELLO="World")
    706         with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
    707             value = popen.read().strip()
    708             self.assertEqual(value, "World")
    709 
    710     @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
    711                          'requires a shell')
    712     def test_os_popen_iter(self):
    713         with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
    714                       % unix_shell) as popen:
    715             it = iter(popen)
    716             self.assertEqual(next(it), "line1\n")
    717             self.assertEqual(next(it), "line2\n")
    718             self.assertEqual(next(it), "line3\n")
    719             self.assertRaises(StopIteration, next, it)
    720 
    721     # Verify environ keys and values from the OS are of the
    722     # correct str type.
    723     def test_keyvalue_types(self):
    724         for key, val in os.environ.items():
    725             self.assertEqual(type(key), str)
    726             self.assertEqual(type(val), str)
    727 
    728     def test_items(self):
    729         for key, value in self._reference().items():
    730             self.assertEqual(os.environ.get(key), value)
    731 
    732     # Issue 7310
    733     def test___repr__(self):
    734         """Check that the repr() of os.environ looks like environ({...})."""
    735         env = os.environ
    736         self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
    737             '{!r}: {!r}'.format(key, value)
    738             for key, value in env.items())))
    739 
    740     def test_get_exec_path(self):
    741         defpath_list = os.defpath.split(os.pathsep)
    742         test_path = ['/monty', '/python', '', '/flying/circus']
    743         test_env = {'PATH': os.pathsep.join(test_path)}
    744 
    745         saved_environ = os.environ
    746         try:
    747             os.environ = dict(test_env)
    748             # Test that defaulting to os.environ works.
    749             self.assertSequenceEqual(test_path, os.get_exec_path())
    750             self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
    751         finally:
    752             os.environ = saved_environ
    753 
    754         # No PATH environment variable
    755         self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
    756         # Empty PATH environment variable
    757         self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
    758         # Supplied PATH environment variable
    759         self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
    760 
    761         if os.supports_bytes_environ:
    762             # env cannot contain 'PATH' and b'PATH' keys
    763             try:
    764                 # ignore BytesWarning warning
    765                 with warnings.catch_warnings(record=True):
    766                     mixed_env = {'PATH': '1', b'PATH': b'2'}
    767             except BytesWarning:
    768                 # mixed_env cannot be created with python -bb
    769                 pass
    770             else:
    771                 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
    772 
    773             # bytes key and/or value
    774             self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
    775                 ['abc'])
    776             self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
    777                 ['abc'])
    778             self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
    779                 ['abc'])
    780 
    781     @unittest.skipUnless(os.supports_bytes_environ,
    782                          "os.environb required for this test.")
    783     def test_environb(self):
    784         # os.environ -> os.environb
    785         value = 'euro\u20ac'
    786         try:
    787             value_bytes = value.encode(sys.getfilesystemencoding(),
    788                                        'surrogateescape')
    789         except UnicodeEncodeError:
    790             msg = "U+20AC character is not encodable to %s" % (
    791                 sys.getfilesystemencoding(),)
    792             self.skipTest(msg)
    793         os.environ['unicode'] = value
    794         self.assertEqual(os.environ['unicode'], value)
    795         self.assertEqual(os.environb[b'unicode'], value_bytes)
    796 
    797         # os.environb -> os.environ
    798         value = b'\xff'
    799         os.environb[b'bytes'] = value
    800         self.assertEqual(os.environb[b'bytes'], value)
    801         value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
    802         self.assertEqual(os.environ['bytes'], value_str)
    803 
    804     # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
    805     # #13415).
    806     @support.requires_freebsd_version(7)
    807     @support.requires_mac_ver(10, 6)
    808     def test_unset_error(self):
    809         if sys.platform == "win32":
    810             # an environment variable is limited to 32,767 characters
    811             key = 'x' * 50000
    812             self.assertRaises(ValueError, os.environ.__delitem__, key)
    813         else:
    814             # "=" is not allowed in a variable name
    815             key = 'key='
    816             self.assertRaises(OSError, os.environ.__delitem__, key)
    817 
    818     def test_key_type(self):
    819         missing = 'missingkey'
    820         self.assertNotIn(missing, os.environ)
    821 
    822         with self.assertRaises(KeyError) as cm:
    823             os.environ[missing]
    824         self.assertIs(cm.exception.args[0], missing)
    825         self.assertTrue(cm.exception.__suppress_context__)
    826 
    827         with self.assertRaises(KeyError) as cm:
    828             del os.environ[missing]
    829         self.assertIs(cm.exception.args[0], missing)
    830         self.assertTrue(cm.exception.__suppress_context__)
    831 
    832 
    833 class WalkTests(unittest.TestCase):
    834     """Tests for os.walk()."""
    835 
    836     # Wrapper to hide minor differences between os.walk and os.fwalk
    837     # to tests both functions with the same code base
    838     def walk(self, top, **kwargs):
    839         if 'follow_symlinks' in kwargs:
    840             kwargs['followlinks'] = kwargs.pop('follow_symlinks')
    841         return os.walk(top, **kwargs)
    842 
    843     def setUp(self):
    844         join = os.path.join
    845         self.addCleanup(support.rmtree, support.TESTFN)
    846 
    847         # Build:
    848         #     TESTFN/
    849         #       TEST1/              a file kid and two directory kids
    850         #         tmp1
    851         #         SUB1/             a file kid and a directory kid
    852         #           tmp2
    853         #           SUB11/          no kids
    854         #         SUB2/             a file kid and a dirsymlink kid
    855         #           tmp3
    856         #           SUB21/          not readable
    857         #             tmp5
    858         #           link/           a symlink to TESTFN.2
    859         #           broken_link
    860         #           broken_link2
    861         #           broken_link3
    862         #       TEST2/
    863         #         tmp4              a lone file
    864         self.walk_path = join(support.TESTFN, "TEST1")
    865         self.sub1_path = join(self.walk_path, "SUB1")
    866         self.sub11_path = join(self.sub1_path, "SUB11")
    867         sub2_path = join(self.walk_path, "SUB2")
    868         sub21_path = join(sub2_path, "SUB21")
    869         tmp1_path = join(self.walk_path, "tmp1")
    870         tmp2_path = join(self.sub1_path, "tmp2")
    871         tmp3_path = join(sub2_path, "tmp3")
    872         tmp5_path = join(sub21_path, "tmp3")
    873         self.link_path = join(sub2_path, "link")
    874         t2_path = join(support.TESTFN, "TEST2")
    875         tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
    876         broken_link_path = join(sub2_path, "broken_link")
    877         broken_link2_path = join(sub2_path, "broken_link2")
    878         broken_link3_path = join(sub2_path, "broken_link3")
    879 
    880         # Create stuff.
    881         os.makedirs(self.sub11_path)
    882         os.makedirs(sub2_path)
    883         os.makedirs(sub21_path)
    884         os.makedirs(t2_path)
    885 
    886         for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
    887             with open(path, "x") as f:
    888                 f.write("I'm " + path + " and proud of it.  Blame test_os.\n")
    889 
    890         if support.can_symlink():
    891             os.symlink(os.path.abspath(t2_path), self.link_path)
    892             os.symlink('broken', broken_link_path, True)
    893             os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
    894             os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
    895             self.sub2_tree = (sub2_path, ["SUB21", "link"],
    896                               ["broken_link", "broken_link2", "broken_link3",
    897                                "tmp3"])
    898         else:
    899             self.sub2_tree = (sub2_path, [], ["tmp3"])
    900 
    901         os.chmod(sub21_path, 0)
    902         try:
    903             os.listdir(sub21_path)
    904         except PermissionError:
    905             self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
    906         else:
    907             os.chmod(sub21_path, stat.S_IRWXU)
    908             os.unlink(tmp5_path)
    909             os.rmdir(sub21_path)
    910             del self.sub2_tree[1][:1]
    911 
    912     def test_walk_topdown(self):
    913         # Walk top-down.
    914         all = list(self.walk(self.walk_path))
    915 
    916         self.assertEqual(len(all), 4)
    917         # We can't know which order SUB1 and SUB2 will appear in.
    918         # Not flipped:  TESTFN, SUB1, SUB11, SUB2
    919         #     flipped:  TESTFN, SUB2, SUB1, SUB11
    920         flipped = all[0][1][0] != "SUB1"
    921         all[0][1].sort()
    922         all[3 - 2 * flipped][-1].sort()
    923         all[3 - 2 * flipped][1].sort()
    924         self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
    925         self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
    926         self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
    927         self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
    928 
    929     def test_walk_prune(self, walk_path=None):
    930         if walk_path is None:
    931             walk_path = self.walk_path
    932         # Prune the search.
    933         all = []
    934         for root, dirs, files in self.walk(walk_path):
    935             all.append((root, dirs, files))
    936             # Don't descend into SUB1.
    937             if 'SUB1' in dirs:
    938                 # Note that this also mutates the dirs we appended to all!
    939                 dirs.remove('SUB1')
    940 
    941         self.assertEqual(len(all), 2)
    942         self.assertEqual(all[0],
    943                          (str(walk_path), ["SUB2"], ["tmp1"]))
    944 
    945         all[1][-1].sort()
    946         all[1][1].sort()
    947         self.assertEqual(all[1], self.sub2_tree)
    948 
    949     def test_file_like_path(self):
    950         self.test_walk_prune(_PathLike(self.walk_path))
    951 
    952     def test_walk_bottom_up(self):
    953         # Walk bottom-up.
    954         all = list(self.walk(self.walk_path, topdown=False))
    955 
    956         self.assertEqual(len(all), 4, all)
    957         # We can't know which order SUB1 and SUB2 will appear in.
    958         # Not flipped:  SUB11, SUB1, SUB2, TESTFN
    959         #     flipped:  SUB2, SUB11, SUB1, TESTFN
    960         flipped = all[3][1][0] != "SUB1"
    961         all[3][1].sort()
    962         all[2 - 2 * flipped][-1].sort()
    963         all[2 - 2 * flipped][1].sort()
    964         self.assertEqual(all[3],
    965                          (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
    966         self.assertEqual(all[flipped],
    967                          (self.sub11_path, [], []))
    968         self.assertEqual(all[flipped + 1],
    969                          (self.sub1_path, ["SUB11"], ["tmp2"]))
    970         self.assertEqual(all[2 - 2 * flipped],
    971                          self.sub2_tree)
    972 
    973     def test_walk_symlink(self):
    974         if not support.can_symlink():
    975             self.skipTest("need symlink support")
    976 
    977         # Walk, following symlinks.
    978         walk_it = self.walk(self.walk_path, follow_symlinks=True)
    979         for root, dirs, files in walk_it:
    980             if root == self.link_path:
    981                 self.assertEqual(dirs, [])
    982                 self.assertEqual(files, ["tmp4"])
    983                 break
    984         else:
    985             self.fail("Didn't follow symlink with followlinks=True")
    986 
    987     def test_walk_bad_dir(self):
    988         # Walk top-down.
    989         errors = []
    990         walk_it = self.walk(self.walk_path, onerror=errors.append)
    991         root, dirs, files = next(walk_it)
    992         self.assertEqual(errors, [])
    993         dir1 = 'SUB1'
    994         path1 = os.path.join(root, dir1)
    995         path1new = os.path.join(root, dir1 + '.new')
    996         os.rename(path1, path1new)
    997         try:
    998             roots = [r for r, d, f in walk_it]
    999             self.assertTrue(errors)
   1000             self.assertNotIn(path1, roots)
   1001             self.assertNotIn(path1new, roots)
   1002             for dir2 in dirs:
   1003                 if dir2 != dir1:
   1004                     self.assertIn(os.path.join(root, dir2), roots)
   1005         finally:
   1006             os.rename(path1new, path1)
   1007 
   1008 
   1009 @unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
   1010 class FwalkTests(WalkTests):
   1011     """Tests for os.fwalk()."""
   1012 
   1013     def walk(self, top, **kwargs):
   1014         for root, dirs, files, root_fd in os.fwalk(top, **kwargs):
   1015             yield (root, dirs, files)
   1016 
   1017     def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
   1018         """
   1019         compare with walk() results.
   1020         """
   1021         walk_kwargs = walk_kwargs.copy()
   1022         fwalk_kwargs = fwalk_kwargs.copy()
   1023         for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
   1024             walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
   1025             fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
   1026 
   1027             expected = {}
   1028             for root, dirs, files in os.walk(**walk_kwargs):
   1029                 expected[root] = (set(dirs), set(files))
   1030 
   1031             for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
   1032                 self.assertIn(root, expected)
   1033                 self.assertEqual(expected[root], (set(dirs), set(files)))
   1034 
   1035     def test_compare_to_walk(self):
   1036         kwargs = {'top': support.TESTFN}
   1037         self._compare_to_walk(kwargs, kwargs)
   1038 
   1039     def test_dir_fd(self):
   1040         try:
   1041             fd = os.open(".", os.O_RDONLY)
   1042             walk_kwargs = {'top': support.TESTFN}
   1043             fwalk_kwargs = walk_kwargs.copy()
   1044             fwalk_kwargs['dir_fd'] = fd
   1045             self._compare_to_walk(walk_kwargs, fwalk_kwargs)
   1046         finally:
   1047             os.close(fd)
   1048 
   1049     def test_yields_correct_dir_fd(self):
   1050         # check returned file descriptors
   1051         for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
   1052             args = support.TESTFN, topdown, None
   1053             for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
   1054                 # check that the FD is valid
   1055                 os.fstat(rootfd)
   1056                 # redundant check
   1057                 os.stat(rootfd)
   1058                 # check that listdir() returns consistent information
   1059                 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
   1060 
   1061     def test_fd_leak(self):
   1062         # Since we're opening a lot of FDs, we must be careful to avoid leaks:
   1063         # we both check that calling fwalk() a large number of times doesn't
   1064         # yield EMFILE, and that the minimum allocated FD hasn't changed.
   1065         minfd = os.dup(1)
   1066         os.close(minfd)
   1067         for i in range(256):
   1068             for x in os.fwalk(support.TESTFN):
   1069                 pass
   1070         newfd = os.dup(1)
   1071         self.addCleanup(os.close, newfd)
   1072         self.assertEqual(newfd, minfd)
   1073 
   1074 class BytesWalkTests(WalkTests):
   1075     """Tests for os.walk() with bytes."""
   1076     def setUp(self):
   1077         super().setUp()
   1078         self.stack = contextlib.ExitStack()
   1079 
   1080     def tearDown(self):
   1081         self.stack.close()
   1082         super().tearDown()
   1083 
   1084     def walk(self, top, **kwargs):
   1085         if 'follow_symlinks' in kwargs:
   1086             kwargs['followlinks'] = kwargs.pop('follow_symlinks')
   1087         for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
   1088             root = os.fsdecode(broot)
   1089             dirs = list(map(os.fsdecode, bdirs))
   1090             files = list(map(os.fsdecode, bfiles))
   1091             yield (root, dirs, files)
   1092             bdirs[:] = list(map(os.fsencode, dirs))
   1093             bfiles[:] = list(map(os.fsencode, files))
   1094 
   1095 
   1096 class MakedirTests(unittest.TestCase):
   1097     def setUp(self):
   1098         os.mkdir(support.TESTFN)
   1099 
   1100     def test_makedir(self):
   1101         base = support.TESTFN
   1102         path = os.path.join(base, 'dir1', 'dir2', 'dir3')
   1103         os.makedirs(path)             # Should work
   1104         path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
   1105         os.makedirs(path)
   1106 
   1107         # Try paths with a '.' in them
   1108         self.assertRaises(OSError, os.makedirs, os.curdir)
   1109         path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
   1110         os.makedirs(path)
   1111         path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
   1112                             'dir5', 'dir6')
   1113         os.makedirs(path)
   1114 
   1115     def test_exist_ok_existing_directory(self):
   1116         path = os.path.join(support.TESTFN, 'dir1')
   1117         mode = 0o777
   1118         old_mask = os.umask(0o022)
   1119         os.makedirs(path, mode)
   1120         self.assertRaises(OSError, os.makedirs, path, mode)
   1121         self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
   1122         os.makedirs(path, 0o776, exist_ok=True)
   1123         os.makedirs(path, mode=mode, exist_ok=True)
   1124         os.umask(old_mask)
   1125 
   1126         # Issue #25583: A drive root could raise PermissionError on Windows
   1127         os.makedirs(os.path.abspath('/'), exist_ok=True)
   1128 
   1129     def test_exist_ok_s_isgid_directory(self):
   1130         path = os.path.join(support.TESTFN, 'dir1')
   1131         S_ISGID = stat.S_ISGID
   1132         mode = 0o777
   1133         old_mask = os.umask(0o022)
   1134         try:
   1135             existing_testfn_mode = stat.S_IMODE(
   1136                     os.lstat(support.TESTFN).st_mode)
   1137             try:
   1138                 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
   1139             except PermissionError:
   1140                 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
   1141             if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
   1142                 raise unittest.SkipTest('No support for S_ISGID dir mode.')
   1143             # The os should apply S_ISGID from the parent dir for us, but
   1144             # this test need not depend on that behavior.  Be explicit.
   1145             os.makedirs(path, mode | S_ISGID)
   1146             # http://bugs.python.org/issue14992
   1147             # Should not fail when the bit is already set.
   1148             os.makedirs(path, mode, exist_ok=True)
   1149             # remove the bit.
   1150             os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
   1151             # May work even when the bit is not already set when demanded.
   1152             os.makedirs(path, mode | S_ISGID, exist_ok=True)
   1153         finally:
   1154             os.umask(old_mask)
   1155 
   1156     def test_exist_ok_existing_regular_file(self):
   1157         base = support.TESTFN
   1158         path = os.path.join(support.TESTFN, 'dir1')
   1159         f = open(path, 'w')
   1160         f.write('abc')
   1161         f.close()
   1162         self.assertRaises(OSError, os.makedirs, path)
   1163         self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
   1164         self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
   1165         os.remove(path)
   1166 
   1167     def tearDown(self):
   1168         path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
   1169                             'dir4', 'dir5', 'dir6')
   1170         # If the tests failed, the bottom-most directory ('../dir6')
   1171         # may not have been created, so we look for the outermost directory
   1172         # that exists.
   1173         while not os.path.exists(path) and path != support.TESTFN:
   1174             path = os.path.dirname(path)
   1175 
   1176         os.removedirs(path)
   1177 
   1178 
   1179 @unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
   1180 class ChownFileTests(unittest.TestCase):
   1181 
   1182     @classmethod
   1183     def setUpClass(cls):
   1184         os.mkdir(support.TESTFN)
   1185 
   1186     def test_chown_uid_gid_arguments_must_be_index(self):
   1187         stat = os.stat(support.TESTFN)
   1188         uid = stat.st_uid
   1189         gid = stat.st_gid
   1190         for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
   1191             self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
   1192             self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
   1193         self.assertIsNone(os.chown(support.TESTFN, uid, gid))
   1194         self.assertIsNone(os.chown(support.TESTFN, -1, -1))
   1195 
   1196     @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
   1197     def test_chown(self):
   1198         gid_1, gid_2 = groups[:2]
   1199         uid = os.stat(support.TESTFN).st_uid
   1200         os.chown(support.TESTFN, uid, gid_1)
   1201         gid = os.stat(support.TESTFN).st_gid
   1202         self.assertEqual(gid, gid_1)
   1203         os.chown(support.TESTFN, uid, gid_2)
   1204         gid = os.stat(support.TESTFN).st_gid
   1205         self.assertEqual(gid, gid_2)
   1206 
   1207     @unittest.skipUnless(root_in_posix and len(all_users) > 1,
   1208                          "test needs root privilege and more than one user")
   1209     def test_chown_with_root(self):
   1210         uid_1, uid_2 = all_users[:2]
   1211         gid = os.stat(support.TESTFN).st_gid
   1212         os.chown(support.TESTFN, uid_1, gid)
   1213         uid = os.stat(support.TESTFN).st_uid
   1214         self.assertEqual(uid, uid_1)
   1215         os.chown(support.TESTFN, uid_2, gid)
   1216         uid = os.stat(support.TESTFN).st_uid
   1217         self.assertEqual(uid, uid_2)
   1218 
   1219     @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
   1220                          "test needs non-root account and more than one user")
   1221     def test_chown_without_permission(self):
   1222         uid_1, uid_2 = all_users[:2]
   1223         gid = os.stat(support.TESTFN).st_gid
   1224         with self.assertRaises(PermissionError):
   1225             os.chown(support.TESTFN, uid_1, gid)
   1226             os.chown(support.TESTFN, uid_2, gid)
   1227 
   1228     @classmethod
   1229     def tearDownClass(cls):
   1230         os.rmdir(support.TESTFN)
   1231 
   1232 
   1233 class RemoveDirsTests(unittest.TestCase):
   1234     def setUp(self):
   1235         os.makedirs(support.TESTFN)
   1236 
   1237     def tearDown(self):
   1238         support.rmtree(support.TESTFN)
   1239 
   1240     def test_remove_all(self):
   1241         dira = os.path.join(support.TESTFN, 'dira')
   1242         os.mkdir(dira)
   1243         dirb = os.path.join(dira, 'dirb')
   1244         os.mkdir(dirb)
   1245         os.removedirs(dirb)
   1246         self.assertFalse(os.path.exists(dirb))
   1247         self.assertFalse(os.path.exists(dira))
   1248         self.assertFalse(os.path.exists(support.TESTFN))
   1249 
   1250     def test_remove_partial(self):
   1251         dira = os.path.join(support.TESTFN, 'dira')
   1252         os.mkdir(dira)
   1253         dirb = os.path.join(dira, 'dirb')
   1254         os.mkdir(dirb)
   1255         create_file(os.path.join(dira, 'file.txt'))
   1256         os.removedirs(dirb)
   1257         self.assertFalse(os.path.exists(dirb))
   1258         self.assertTrue(os.path.exists(dira))
   1259         self.assertTrue(os.path.exists(support.TESTFN))
   1260 
   1261     def test_remove_nothing(self):
   1262         dira = os.path.join(support.TESTFN, 'dira')
   1263         os.mkdir(dira)
   1264         dirb = os.path.join(dira, 'dirb')
   1265         os.mkdir(dirb)
   1266         create_file(os.path.join(dirb, 'file.txt'))
   1267         with self.assertRaises(OSError):
   1268             os.removedirs(dirb)
   1269         self.assertTrue(os.path.exists(dirb))
   1270         self.assertTrue(os.path.exists(dira))
   1271         self.assertTrue(os.path.exists(support.TESTFN))
   1272 
   1273 
   1274 class DevNullTests(unittest.TestCase):
   1275     def test_devnull(self):
   1276         with open(os.devnull, 'wb', 0) as f:
   1277             f.write(b'hello')
   1278             f.close()
   1279         with open(os.devnull, 'rb') as f:
   1280             self.assertEqual(f.read(), b'')
   1281 
   1282 
   1283 class URandomTests(unittest.TestCase):
   1284     def test_urandom_length(self):
   1285         self.assertEqual(len(os.urandom(0)), 0)
   1286         self.assertEqual(len(os.urandom(1)), 1)
   1287         self.assertEqual(len(os.urandom(10)), 10)
   1288         self.assertEqual(len(os.urandom(100)), 100)
   1289         self.assertEqual(len(os.urandom(1000)), 1000)
   1290 
   1291     def test_urandom_value(self):
   1292         data1 = os.urandom(16)
   1293         self.assertIsInstance(data1, bytes)
   1294         data2 = os.urandom(16)
   1295         self.assertNotEqual(data1, data2)
   1296 
   1297     def get_urandom_subprocess(self, count):
   1298         code = '\n'.join((
   1299             'import os, sys',
   1300             'data = os.urandom(%s)' % count,
   1301             'sys.stdout.buffer.write(data)',
   1302             'sys.stdout.buffer.flush()'))
   1303         out = assert_python_ok('-c', code)
   1304         stdout = out[1]
   1305         self.assertEqual(len(stdout), 16)
   1306         return stdout
   1307 
   1308     def test_urandom_subprocess(self):
   1309         data1 = self.get_urandom_subprocess(16)
   1310         data2 = self.get_urandom_subprocess(16)
   1311         self.assertNotEqual(data1, data2)
   1312 
   1313 
   1314 @unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
   1315 class GetRandomTests(unittest.TestCase):
   1316     @classmethod
   1317     def setUpClass(cls):
   1318         try:
   1319             os.getrandom(1)
   1320         except OSError as exc:
   1321             if exc.errno == errno.ENOSYS:
   1322                 # Python compiled on a more recent Linux version
   1323                 # than the current Linux kernel
   1324                 raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")
   1325             else:
   1326                 raise
   1327 
   1328     def test_getrandom_type(self):
   1329         data = os.getrandom(16)
   1330         self.assertIsInstance(data, bytes)
   1331         self.assertEqual(len(data), 16)
   1332 
   1333     def test_getrandom0(self):
   1334         empty = os.getrandom(0)
   1335         self.assertEqual(empty, b'')
   1336 
   1337     def test_getrandom_random(self):
   1338         self.assertTrue(hasattr(os, 'GRND_RANDOM'))
   1339 
   1340         # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
   1341         # resource /dev/random
   1342 
   1343     def test_getrandom_nonblock(self):
   1344         # The call must not fail. Check also that the flag exists
   1345         try:
   1346             os.getrandom(1, os.GRND_NONBLOCK)
   1347         except BlockingIOError:
   1348             # System urandom is not initialized yet
   1349             pass
   1350 
   1351     def test_getrandom_value(self):
   1352         data1 = os.getrandom(16)
   1353         data2 = os.getrandom(16)
   1354         self.assertNotEqual(data1, data2)
   1355 
   1356 
   1357 # os.urandom() doesn't use a file descriptor when it is implemented with the
   1358 # getentropy() function, the getrandom() function or the getrandom() syscall
   1359 OS_URANDOM_DONT_USE_FD = (
   1360     sysconfig.get_config_var('HAVE_GETENTROPY') == 1
   1361     or sysconfig.get_config_var('HAVE_GETRANDOM') == 1
   1362     or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
   1363 
   1364 @unittest.skipIf(OS_URANDOM_DONT_USE_FD ,
   1365                  "os.random() does not use a file descriptor")
   1366 class URandomFDTests(unittest.TestCase):
   1367     @unittest.skipUnless(resource, "test requires the resource module")
   1368     def test_urandom_failure(self):
   1369         # Check urandom() failing when it is not able to open /dev/random.
   1370         # We spawn a new process to make the test more robust (if getrlimit()
   1371         # failed to restore the file descriptor limit after this, the whole
   1372         # test suite would crash; this actually happened on the OS X Tiger
   1373         # buildbot).
   1374         code = """if 1:
   1375             import errno
   1376             import os
   1377             import resource
   1378 
   1379             soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
   1380             resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
   1381             try:
   1382                 os.urandom(16)
   1383             except OSError as e:
   1384                 assert e.errno == errno.EMFILE, e.errno
   1385             else:
   1386                 raise AssertionError("OSError not raised")
   1387             """
   1388         assert_python_ok('-c', code)
   1389 
   1390     def test_urandom_fd_closed(self):
   1391         # Issue #21207: urandom() should reopen its fd to /dev/urandom if
   1392         # closed.
   1393         code = """if 1:
   1394             import os
   1395             import sys
   1396             import test.support
   1397             os.urandom(4)
   1398             with test.support.SuppressCrashReport():
   1399                 os.closerange(3, 256)
   1400             sys.stdout.buffer.write(os.urandom(4))
   1401             """
   1402         rc, out, err = assert_python_ok('-Sc', code)
   1403 
   1404     def test_urandom_fd_reopened(self):
   1405         # Issue #21207: urandom() should detect its fd to /dev/urandom
   1406         # changed to something else, and reopen it.
   1407         self.addCleanup(support.unlink, support.TESTFN)
   1408         create_file(support.TESTFN, b"x" * 256)
   1409 
   1410         code = """if 1:
   1411             import os
   1412             import sys
   1413             import test.support
   1414             os.urandom(4)
   1415             with test.support.SuppressCrashReport():
   1416                 for fd in range(3, 256):
   1417                     try:
   1418                         os.close(fd)
   1419                     except OSError:
   1420                         pass
   1421                     else:
   1422                         # Found the urandom fd (XXX hopefully)
   1423                         break
   1424                 os.closerange(3, 256)
   1425             with open({TESTFN!r}, 'rb') as f:
   1426                 new_fd = f.fileno()
   1427                 # Issue #26935: posix allows new_fd and fd to be equal but
   1428                 # some libc implementations have dup2 return an error in this
   1429                 # case.
   1430                 if new_fd != fd:
   1431                     os.dup2(new_fd, fd)
   1432                 sys.stdout.buffer.write(os.urandom(4))
   1433                 sys.stdout.buffer.write(os.urandom(4))
   1434             """.format(TESTFN=support.TESTFN)
   1435         rc, out, err = assert_python_ok('-Sc', code)
   1436         self.assertEqual(len(out), 8)
   1437         self.assertNotEqual(out[0:4], out[4:8])
   1438         rc, out2, err2 = assert_python_ok('-Sc', code)
   1439         self.assertEqual(len(out2), 8)
   1440         self.assertNotEqual(out2, out)
   1441 
   1442 
   1443 @contextlib.contextmanager
   1444 def _execvpe_mockup(defpath=None):
   1445     """
   1446     Stubs out execv and execve functions when used as context manager.
   1447     Records exec calls. The mock execv and execve functions always raise an
   1448     exception as they would normally never return.
   1449     """
   1450     # A list of tuples containing (function name, first arg, args)
   1451     # of calls to execv or execve that have been made.
   1452     calls = []
   1453 
   1454     def mock_execv(name, *args):
   1455         calls.append(('execv', name, args))
   1456         raise RuntimeError("execv called")
   1457 
   1458     def mock_execve(name, *args):
   1459         calls.append(('execve', name, args))
   1460         raise OSError(errno.ENOTDIR, "execve called")
   1461 
   1462     try:
   1463         orig_execv = os.execv
   1464         orig_execve = os.execve
   1465         orig_defpath = os.defpath
   1466         os.execv = mock_execv
   1467         os.execve = mock_execve
   1468         if defpath is not None:
   1469             os.defpath = defpath
   1470         yield calls
   1471     finally:
   1472         os.execv = orig_execv
   1473         os.execve = orig_execve
   1474         os.defpath = orig_defpath
   1475 
   1476 
   1477 class ExecTests(unittest.TestCase):
   1478     @unittest.skipIf(USING_LINUXTHREADS,
   1479                      "avoid triggering a linuxthreads bug: see issue #4970")
   1480     def test_execvpe_with_bad_program(self):
   1481         self.assertRaises(OSError, os.execvpe, 'no such app-',
   1482                           ['no such app-'], None)
   1483 
   1484     def test_execv_with_bad_arglist(self):
   1485         self.assertRaises(ValueError, os.execv, 'notepad', ())
   1486         self.assertRaises(ValueError, os.execv, 'notepad', [])
   1487         self.assertRaises(ValueError, os.execv, 'notepad', ('',))
   1488         self.assertRaises(ValueError, os.execv, 'notepad', [''])
   1489 
   1490     def test_execvpe_with_bad_arglist(self):
   1491         self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
   1492         self.assertRaises(ValueError, os.execvpe, 'notepad', [], {})
   1493         self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {})
   1494 
   1495     @unittest.skipUnless(hasattr(os, '_execvpe'),
   1496                          "No internal os._execvpe function to test.")
   1497     def _test_internal_execvpe(self, test_type):
   1498         program_path = os.sep + 'absolutepath'
   1499         if test_type is bytes:
   1500             program = b'executable'
   1501             fullpath = os.path.join(os.fsencode(program_path), program)
   1502             native_fullpath = fullpath
   1503             arguments = [b'progname', 'arg1', 'arg2']
   1504         else:
   1505             program = 'executable'
   1506             arguments = ['progname', 'arg1', 'arg2']
   1507             fullpath = os.path.join(program_path, program)
   1508             if os.name != "nt":
   1509                 native_fullpath = os.fsencode(fullpath)
   1510             else:
   1511                 native_fullpath = fullpath
   1512         env = {'spam': 'beans'}
   1513 
   1514         # test os._execvpe() with an absolute path
   1515         with _execvpe_mockup() as calls:
   1516             self.assertRaises(RuntimeError,
   1517                 os._execvpe, fullpath, arguments)
   1518             self.assertEqual(len(calls), 1)
   1519             self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
   1520 
   1521         # test os._execvpe() with a relative path:
   1522         # os.get_exec_path() returns defpath
   1523         with _execvpe_mockup(defpath=program_path) as calls:
   1524             self.assertRaises(OSError,
   1525                 os._execvpe, program, arguments, env=env)
   1526             self.assertEqual(len(calls), 1)
   1527             self.assertSequenceEqual(calls[0],
   1528                 ('execve', native_fullpath, (arguments, env)))
   1529 
   1530         # test os._execvpe() with a relative path:
   1531         # os.get_exec_path() reads the 'PATH' variable
   1532         with _execvpe_mockup() as calls:
   1533             env_path = env.copy()
   1534             if test_type is bytes:
   1535                 env_path[b'PATH'] = program_path
   1536             else:
   1537                 env_path['PATH'] = program_path
   1538             self.assertRaises(OSError,
   1539                 os._execvpe, program, arguments, env=env_path)
   1540             self.assertEqual(len(calls), 1)
   1541             self.assertSequenceEqual(calls[0],
   1542                 ('execve', native_fullpath, (arguments, env_path)))
   1543 
   1544     def test_internal_execvpe_str(self):
   1545         self._test_internal_execvpe(str)
   1546         if os.name != "nt":
   1547             self._test_internal_execvpe(bytes)
   1548 
   1549 
   1550 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
   1551 class Win32ErrorTests(unittest.TestCase):
   1552     def setUp(self):
   1553         try:
   1554             os.stat(support.TESTFN)
   1555         except FileNotFoundError:
   1556             exists = False
   1557         except OSError as exc:
   1558             exists = True
   1559             self.fail("file %s must not exist; os.stat failed with %s"
   1560                       % (support.TESTFN, exc))
   1561         else:
   1562             self.fail("file %s must not exist" % support.TESTFN)
   1563 
   1564     def test_rename(self):
   1565         self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
   1566 
   1567     def test_remove(self):
   1568         self.assertRaises(OSError, os.remove, support.TESTFN)
   1569 
   1570     def test_chdir(self):
   1571         self.assertRaises(OSError, os.chdir, support.TESTFN)
   1572 
   1573     def test_mkdir(self):
   1574         self.addCleanup(support.unlink, support.TESTFN)
   1575 
   1576         with open(support.TESTFN, "x") as f:
   1577             self.assertRaises(OSError, os.mkdir, support.TESTFN)
   1578 
   1579     def test_utime(self):
   1580         self.assertRaises(OSError, os.utime, support.TESTFN, None)
   1581 
   1582     def test_chmod(self):
   1583         self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
   1584 
   1585 
   1586 class TestInvalidFD(unittest.TestCase):
   1587     singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
   1588                "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
   1589     #singles.append("close")
   1590     #We omit close because it doesn't raise an exception on some platforms
   1591     def get_single(f):
   1592         def helper(self):
   1593             if  hasattr(os, f):
   1594                 self.check(getattr(os, f))
   1595         return helper
   1596     for f in singles:
   1597         locals()["test_"+f] = get_single(f)
   1598 
   1599     def check(self, f, *args):
   1600         try:
   1601             f(support.make_bad_fd(), *args)
   1602         except OSError as e:
   1603             self.assertEqual(e.errno, errno.EBADF)
   1604         else:
   1605             self.fail("%r didn't raise an OSError with a bad file descriptor"
   1606                       % f)
   1607 
   1608     @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
   1609     def test_isatty(self):
   1610         self.assertEqual(os.isatty(support.make_bad_fd()), False)
   1611 
   1612     @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
   1613     def test_closerange(self):
   1614         fd = support.make_bad_fd()
   1615         # Make sure none of the descriptors we are about to close are
   1616         # currently valid (issue 6542).
   1617         for i in range(10):
   1618             try: os.fstat(fd+i)
   1619             except OSError:
   1620                 pass
   1621             else:
   1622                 break
   1623         if i < 2:
   1624             raise unittest.SkipTest(
   1625                 "Unable to acquire a range of invalid file descriptors")
   1626         self.assertEqual(os.closerange(fd, fd + i-1), None)
   1627 
   1628     @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
   1629     def test_dup2(self):
   1630         self.check(os.dup2, 20)
   1631 
   1632     @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
   1633     def test_fchmod(self):
   1634         self.check(os.fchmod, 0)
   1635 
   1636     @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
   1637     def test_fchown(self):
   1638         self.check(os.fchown, -1, -1)
   1639 
   1640     @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
   1641     def test_fpathconf(self):
   1642         self.check(os.pathconf, "PC_NAME_MAX")
   1643         self.check(os.fpathconf, "PC_NAME_MAX")
   1644 
   1645     @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
   1646     def test_ftruncate(self):
   1647         self.check(os.truncate, 0)
   1648         self.check(os.ftruncate, 0)
   1649 
   1650     @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
   1651     def test_lseek(self):
   1652         self.check(os.lseek, 0, 0)
   1653 
   1654     @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
   1655     def test_read(self):
   1656         self.check(os.read, 1)
   1657 
   1658     @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
   1659     def test_readv(self):
   1660         buf = bytearray(10)
   1661         self.check(os.readv, [buf])
   1662 
   1663     @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
   1664     def test_tcsetpgrpt(self):
   1665         self.check(os.tcsetpgrp, 0)
   1666 
   1667     @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
   1668     def test_write(self):
   1669         self.check(os.write, b" ")
   1670 
   1671     @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
   1672     def test_writev(self):
   1673         self.check(os.writev, [b'abc'])
   1674 
   1675     def test_inheritable(self):
   1676         self.check(os.get_inheritable)
   1677         self.check(os.set_inheritable, True)
   1678 
   1679     @unittest.skipUnless(hasattr(os, 'get_blocking'),
   1680                          'needs os.get_blocking() and os.set_blocking()')
   1681     def test_blocking(self):
   1682         self.check(os.get_blocking)
   1683         self.check(os.set_blocking, True)
   1684 
   1685 
   1686 class LinkTests(unittest.TestCase):
   1687     def setUp(self):
   1688         self.file1 = support.TESTFN
   1689         self.file2 = os.path.join(support.TESTFN + "2")
   1690 
   1691     def tearDown(self):
   1692         for file in (self.file1, self.file2):
   1693             if os.path.exists(file):
   1694                 os.unlink(file)
   1695 
   1696     def _test_link(self, file1, file2):
   1697         create_file(file1)
   1698 
   1699         os.link(file1, file2)
   1700         with open(file1, "r") as f1, open(file2, "r") as f2:
   1701             self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
   1702 
   1703     def test_link(self):
   1704         self._test_link(self.file1, self.file2)
   1705 
   1706     def test_link_bytes(self):
   1707         self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
   1708                         bytes(self.file2, sys.getfilesystemencoding()))
   1709 
   1710     def test_unicode_name(self):
   1711         try:
   1712             os.fsencode("\xf1")
   1713         except UnicodeError:
   1714             raise unittest.SkipTest("Unable to encode for this platform.")
   1715 
   1716         self.file1 += "\xf1"
   1717         self.file2 = self.file1 + "2"
   1718         self._test_link(self.file1, self.file2)
   1719 
   1720 @unittest.skipIf(sys.platform == "win32", "Posix specific tests")
   1721 class PosixUidGidTests(unittest.TestCase):
   1722     @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
   1723     def test_setuid(self):
   1724         if os.getuid() != 0:
   1725             self.assertRaises(OSError, os.setuid, 0)
   1726         self.assertRaises(OverflowError, os.setuid, 1<<32)
   1727 
   1728     @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
   1729     def test_setgid(self):
   1730         if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
   1731             self.assertRaises(OSError, os.setgid, 0)
   1732         self.assertRaises(OverflowError, os.setgid, 1<<32)
   1733 
   1734     @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
   1735     def test_seteuid(self):
   1736         if os.getuid() != 0:
   1737             self.assertRaises(OSError, os.seteuid, 0)
   1738         self.assertRaises(OverflowError, os.seteuid, 1<<32)
   1739 
   1740     @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
   1741     def test_setegid(self):
   1742         if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
   1743             self.assertRaises(OSError, os.setegid, 0)
   1744         self.assertRaises(OverflowError, os.setegid, 1<<32)
   1745 
   1746     @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
   1747     def test_setreuid(self):
   1748         if os.getuid() != 0:
   1749             self.assertRaises(OSError, os.setreuid, 0, 0)
   1750         self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
   1751         self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
   1752 
   1753     @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
   1754     def test_setreuid_neg1(self):
   1755         # Needs to accept -1.  We run this in a subprocess to avoid
   1756         # altering the test runner's process state (issue8045).
   1757         subprocess.check_call([
   1758                 sys.executable, '-c',
   1759                 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
   1760 
   1761     @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
   1762     def test_setregid(self):
   1763         if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
   1764             self.assertRaises(OSError, os.setregid, 0, 0)
   1765         self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
   1766         self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
   1767 
   1768     @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
   1769     def test_setregid_neg1(self):
   1770         # Needs to accept -1.  We run this in a subprocess to avoid
   1771         # altering the test runner's process state (issue8045).
   1772         subprocess.check_call([
   1773                 sys.executable, '-c',
   1774                 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
   1775 
   1776 @unittest.skipIf(sys.platform == "win32", "Posix specific tests")
   1777 class Pep383Tests(unittest.TestCase):
   1778     def setUp(self):
   1779         if support.TESTFN_UNENCODABLE:
   1780             self.dir = support.TESTFN_UNENCODABLE
   1781         elif support.TESTFN_NONASCII:
   1782             self.dir = support.TESTFN_NONASCII
   1783         else:
   1784             self.dir = support.TESTFN
   1785         self.bdir = os.fsencode(self.dir)
   1786 
   1787         bytesfn = []
   1788         def add_filename(fn):
   1789             try:
   1790                 fn = os.fsencode(fn)
   1791             except UnicodeEncodeError:
   1792                 return
   1793             bytesfn.append(fn)
   1794         add_filename(support.TESTFN_UNICODE)
   1795         if support.TESTFN_UNENCODABLE:
   1796             add_filename(support.TESTFN_UNENCODABLE)
   1797         if support.TESTFN_NONASCII:
   1798             add_filename(support.TESTFN_NONASCII)
   1799         if not bytesfn:
   1800             self.skipTest("couldn't create any non-ascii filename")
   1801 
   1802         self.unicodefn = set()
   1803         os.mkdir(self.dir)
   1804         try:
   1805             for fn in bytesfn:
   1806                 support.create_empty_file(os.path.join(self.bdir, fn))
   1807                 fn = os.fsdecode(fn)
   1808                 if fn in self.unicodefn:
   1809                     raise ValueError("duplicate filename")
   1810                 self.unicodefn.add(fn)
   1811         except:
   1812             shutil.rmtree(self.dir)
   1813             raise
   1814 
   1815     def tearDown(self):
   1816         shutil.rmtree(self.dir)
   1817 
   1818     def test_listdir(self):
   1819         expected = self.unicodefn
   1820         found = set(os.listdir(self.dir))
   1821         self.assertEqual(found, expected)
   1822         # test listdir without arguments
   1823         current_directory = os.getcwd()
   1824         try:
   1825             os.chdir(os.sep)
   1826             self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
   1827         finally:
   1828             os.chdir(current_directory)
   1829 
   1830     def test_open(self):
   1831         for fn in self.unicodefn:
   1832             f = open(os.path.join(self.dir, fn), 'rb')
   1833             f.close()
   1834 
   1835     @unittest.skipUnless(hasattr(os, 'statvfs'),
   1836                             "need os.statvfs()")
   1837     def test_statvfs(self):
   1838         # issue #9645
   1839         for fn in self.unicodefn:
   1840             # should not fail with file not found error
   1841             fullname = os.path.join(self.dir, fn)
   1842             os.statvfs(fullname)
   1843 
   1844     def test_stat(self):
   1845         for fn in self.unicodefn:
   1846             os.stat(os.path.join(self.dir, fn))
   1847 
   1848 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
   1849 class Win32KillTests(unittest.TestCase):
   1850     def _kill(self, sig):
   1851         # Start sys.executable as a subprocess and communicate from the
   1852         # subprocess to the parent that the interpreter is ready. When it
   1853         # becomes ready, send *sig* via os.kill to the subprocess and check
   1854         # that the return code is equal to *sig*.
   1855         import ctypes
   1856         from ctypes import wintypes
   1857         import msvcrt
   1858 
   1859         # Since we can't access the contents of the process' stdout until the
   1860         # process has exited, use PeekNamedPipe to see what's inside stdout
   1861         # without waiting. This is done so we can tell that the interpreter
   1862         # is started and running at a point where it could handle a signal.
   1863         PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
   1864         PeekNamedPipe.restype = wintypes.BOOL
   1865         PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
   1866                                   ctypes.POINTER(ctypes.c_char), # stdout buf
   1867                                   wintypes.DWORD, # Buffer size
   1868                                   ctypes.POINTER(wintypes.DWORD), # bytes read
   1869                                   ctypes.POINTER(wintypes.DWORD), # bytes avail
   1870                                   ctypes.POINTER(wintypes.DWORD)) # bytes left
   1871         msg = "running"
   1872         proc = subprocess.Popen([sys.executable, "-c",
   1873                                  "import sys;"
   1874                                  "sys.stdout.write('{}');"
   1875                                  "sys.stdout.flush();"
   1876                                  "input()".format(msg)],
   1877                                 stdout=subprocess.PIPE,
   1878                                 stderr=subprocess.PIPE,
   1879                                 stdin=subprocess.PIPE)
   1880         self.addCleanup(proc.stdout.close)
   1881         self.addCleanup(proc.stderr.close)
   1882         self.addCleanup(proc.stdin.close)
   1883 
   1884         count, max = 0, 100
   1885         while count < max and proc.poll() is None:
   1886             # Create a string buffer to store the result of stdout from the pipe
   1887             buf = ctypes.create_string_buffer(len(msg))
   1888             # Obtain the text currently in proc.stdout
   1889             # Bytes read/avail/left are left as NULL and unused
   1890             rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
   1891                                  buf, ctypes.sizeof(buf), None, None, None)
   1892             self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
   1893             if buf.value:
   1894                 self.assertEqual(msg, buf.value.decode())
   1895                 break
   1896             time.sleep(0.1)
   1897             count += 1
   1898         else:
   1899             self.fail("Did not receive communication from the subprocess")
   1900 
   1901         os.kill(proc.pid, sig)
   1902         self.assertEqual(proc.wait(), sig)
   1903 
   1904     def test_kill_sigterm(self):
   1905         # SIGTERM doesn't mean anything special, but make sure it works
   1906         self._kill(signal.SIGTERM)
   1907 
   1908     def test_kill_int(self):
   1909         # os.kill on Windows can take an int which gets set as the exit code
   1910         self._kill(100)
   1911 
   1912     def _kill_with_event(self, event, name):
   1913         tagname = "test_os_%s" % uuid.uuid1()
   1914         m = mmap.mmap(-1, 1, tagname)
   1915         m[0] = 0
   1916         # Run a script which has console control handling enabled.
   1917         proc = subprocess.Popen([sys.executable,
   1918                    os.path.join(os.path.dirname(__file__),
   1919                                 "win_console_handler.py"), tagname],
   1920                    creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
   1921         # Let the interpreter startup before we send signals. See #3137.
   1922         count, max = 0, 100
   1923         while count < max and proc.poll() is None:
   1924             if m[0] == 1:
   1925                 break
   1926             time.sleep(0.1)
   1927             count += 1
   1928         else:
   1929             # Forcefully kill the process if we weren't able to signal it.
   1930             os.kill(proc.pid, signal.SIGINT)
   1931             self.fail("Subprocess didn't finish initialization")
   1932         os.kill(proc.pid, event)
   1933         # proc.send_signal(event) could also be done here.
   1934         # Allow time for the signal to be passed and the process to exit.
   1935         time.sleep(0.5)
   1936         if not proc.poll():
   1937             # Forcefully kill the process if we weren't able to signal it.
   1938             os.kill(proc.pid, signal.SIGINT)
   1939             self.fail("subprocess did not stop on {}".format(name))
   1940 
   1941     @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
   1942     def test_CTRL_C_EVENT(self):
   1943         from ctypes import wintypes
   1944         import ctypes
   1945 
   1946         # Make a NULL value by creating a pointer with no argument.
   1947         NULL = ctypes.POINTER(ctypes.c_int)()
   1948         SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
   1949         SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
   1950                                           wintypes.BOOL)
   1951         SetConsoleCtrlHandler.restype = wintypes.BOOL
   1952 
   1953         # Calling this with NULL and FALSE causes the calling process to
   1954         # handle Ctrl+C, rather than ignore it. This property is inherited
   1955         # by subprocesses.
   1956         SetConsoleCtrlHandler(NULL, 0)
   1957 
   1958         self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
   1959 
   1960     def test_CTRL_BREAK_EVENT(self):
   1961         self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
   1962 
   1963 
   1964 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
   1965 class Win32ListdirTests(unittest.TestCase):
   1966     """Test listdir on Windows."""
   1967 
   1968     def setUp(self):
   1969         self.created_paths = []
   1970         for i in range(2):
   1971             dir_name = 'SUB%d' % i
   1972             dir_path = os.path.join(support.TESTFN, dir_name)
   1973             file_name = 'FILE%d' % i
   1974             file_path = os.path.join(support.TESTFN, file_name)
   1975             os.makedirs(dir_path)
   1976             with open(file_path, 'w') as f:
   1977                 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
   1978             self.created_paths.extend([dir_name, file_name])
   1979         self.created_paths.sort()
   1980 
   1981     def tearDown(self):
   1982         shutil.rmtree(support.TESTFN)
   1983 
   1984     def test_listdir_no_extended_path(self):
   1985         """Test when the path is not an "extended" path."""
   1986         # unicode
   1987         self.assertEqual(
   1988                 sorted(os.listdir(support.TESTFN)),
   1989                 self.created_paths)
   1990 
   1991         # bytes
   1992         self.assertEqual(
   1993                 sorted(os.listdir(os.fsencode(support.TESTFN))),
   1994                 [os.fsencode(path) for path in self.created_paths])
   1995 
   1996     def test_listdir_extended_path(self):
   1997         """Test when the path starts with '\\\\?\\'."""
   1998         # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
   1999         # unicode
   2000         path = '\\\\?\\' + os.path.abspath(support.TESTFN)
   2001         self.assertEqual(
   2002                 sorted(os.listdir(path)),
   2003                 self.created_paths)
   2004 
   2005         # bytes
   2006         path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
   2007         self.assertEqual(
   2008                 sorted(os.listdir(path)),
   2009                 [os.fsencode(path) for path in self.created_paths])
   2010 
   2011 
   2012 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
   2013 @support.skip_unless_symlink
   2014 class Win32SymlinkTests(unittest.TestCase):
   2015     filelink = 'filelinktest'
   2016     filelink_target = os.path.abspath(__file__)
   2017     dirlink = 'dirlinktest'
   2018     dirlink_target = os.path.dirname(filelink_target)
   2019     missing_link = 'missing link'
   2020 
   2021     def setUp(self):
   2022         assert os.path.exists(self.dirlink_target)
   2023         assert os.path.exists(self.filelink_target)
   2024         assert not os.path.exists(self.dirlink)
   2025         assert not os.path.exists(self.filelink)
   2026         assert not os.path.exists(self.missing_link)
   2027 
   2028     def tearDown(self):
   2029         if os.path.exists(self.filelink):
   2030             os.remove(self.filelink)
   2031         if os.path.exists(self.dirlink):
   2032             os.rmdir(self.dirlink)
   2033         if os.path.lexists(self.missing_link):
   2034             os.remove(self.missing_link)
   2035 
   2036     def test_directory_link(self):
   2037         os.symlink(self.dirlink_target, self.dirlink)
   2038         self.assertTrue(os.path.exists(self.dirlink))
   2039         self.assertTrue(os.path.isdir(self.dirlink))
   2040         self.assertTrue(os.path.islink(self.dirlink))
   2041         self.check_stat(self.dirlink, self.dirlink_target)
   2042 
   2043     def test_file_link(self):
   2044         os.symlink(self.filelink_target, self.filelink)
   2045         self.assertTrue(os.path.exists(self.filelink))
   2046         self.assertTrue(os.path.isfile(self.filelink))
   2047         self.assertTrue(os.path.islink(self.filelink))
   2048         self.check_stat(self.filelink, self.filelink_target)
   2049 
   2050     def _create_missing_dir_link(self):
   2051         'Create a "directory" link to a non-existent target'
   2052         linkname = self.missing_link
   2053         if os.path.lexists(linkname):
   2054             os.remove(linkname)
   2055         target = r'c:\\target does not exist.29r3c740'
   2056         assert not os.path.exists(target)
   2057         target_is_dir = True
   2058         os.symlink(target, linkname, target_is_dir)
   2059 
   2060     def test_remove_directory_link_to_missing_target(self):
   2061         self._create_missing_dir_link()
   2062         # For compatibility with Unix, os.remove will check the
   2063         #  directory status and call RemoveDirectory if the symlink
   2064         #  was created with target_is_dir==True.
   2065         os.remove(self.missing_link)
   2066 
   2067     @unittest.skip("currently fails; consider for improvement")
   2068     def test_isdir_on_directory_link_to_missing_target(self):
   2069         self._create_missing_dir_link()
   2070         # consider having isdir return true for directory links
   2071         self.assertTrue(os.path.isdir(self.missing_link))
   2072 
   2073     @unittest.skip("currently fails; consider for improvement")
   2074     def test_rmdir_on_directory_link_to_missing_target(self):
   2075         self._create_missing_dir_link()
   2076         # consider allowing rmdir to remove directory links
   2077         os.rmdir(self.missing_link)
   2078 
   2079     def check_stat(self, link, target):
   2080         self.assertEqual(os.stat(link), os.stat(target))
   2081         self.assertNotEqual(os.lstat(link), os.stat(link))
   2082 
   2083         bytes_link = os.fsencode(link)
   2084         self.assertEqual(os.stat(bytes_link), os.stat(target))
   2085         self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
   2086 
   2087     def test_12084(self):
   2088         level1 = os.path.abspath(support.TESTFN)
   2089         level2 = os.path.join(level1, "level2")
   2090         level3 = os.path.join(level2, "level3")
   2091         self.addCleanup(support.rmtree, level1)
   2092 
   2093         os.mkdir(level1)
   2094         os.mkdir(level2)
   2095         os.mkdir(level3)
   2096 
   2097         file1 = os.path.abspath(os.path.join(level1, "file1"))
   2098         create_file(file1)
   2099 
   2100         orig_dir = os.getcwd()
   2101         try:
   2102             os.chdir(level2)
   2103             link = os.path.join(level2, "link")
   2104             os.symlink(os.path.relpath(file1), "link")
   2105             self.assertIn("link", os.listdir(os.getcwd()))
   2106 
   2107             # Check os.stat calls from the same dir as the link
   2108             self.assertEqual(os.stat(file1), os.stat("link"))
   2109 
   2110             # Check os.stat calls from a dir below the link
   2111             os.chdir(level1)
   2112             self.assertEqual(os.stat(file1),
   2113                              os.stat(os.path.relpath(link)))
   2114 
   2115             # Check os.stat calls from a dir above the link
   2116             os.chdir(level3)
   2117             self.assertEqual(os.stat(file1),
   2118                              os.stat(os.path.relpath(link)))
   2119         finally:
   2120             os.chdir(orig_dir)
   2121 
   2122 
   2123 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
   2124 class Win32JunctionTests(unittest.TestCase):
   2125     junction = 'junctiontest'
   2126     junction_target = os.path.dirname(os.path.abspath(__file__))
   2127 
   2128     def setUp(self):
   2129         assert os.path.exists(self.junction_target)
   2130         assert not os.path.exists(self.junction)
   2131 
   2132     def tearDown(self):
   2133         if os.path.exists(self.junction):
   2134             # os.rmdir delegates to Windows' RemoveDirectoryW,
   2135             # which removes junction points safely.
   2136             os.rmdir(self.junction)
   2137 
   2138     def test_create_junction(self):
   2139         _winapi.CreateJunction(self.junction_target, self.junction)
   2140         self.assertTrue(os.path.exists(self.junction))
   2141         self.assertTrue(os.path.isdir(self.junction))
   2142 
   2143         # Junctions are not recognized as links.
   2144         self.assertFalse(os.path.islink(self.junction))
   2145 
   2146     def test_unlink_removes_junction(self):
   2147         _winapi.CreateJunction(self.junction_target, self.junction)
   2148         self.assertTrue(os.path.exists(self.junction))
   2149 
   2150         os.unlink(self.junction)
   2151         self.assertFalse(os.path.exists(self.junction))
   2152 
   2153 
   2154 @support.skip_unless_symlink
   2155 class NonLocalSymlinkTests(unittest.TestCase):
   2156 
   2157     def setUp(self):
   2158         r"""
   2159         Create this structure:
   2160 
   2161         base
   2162          \___ some_dir
   2163         """
   2164         os.makedirs('base/some_dir')
   2165 
   2166     def tearDown(self):
   2167         shutil.rmtree('base')
   2168 
   2169     def test_directory_link_nonlocal(self):
   2170         """
   2171         The symlink target should resolve relative to the link, not relative
   2172         to the current directory.
   2173 
   2174         Then, link base/some_link -> base/some_dir and ensure that some_link
   2175         is resolved as a directory.
   2176 
   2177         In issue13772, it was discovered that directory detection failed if
   2178         the symlink target was not specified relative to the current
   2179         directory, which was a defect in the implementation.
   2180         """
   2181         src = os.path.join('base', 'some_link')
   2182         os.symlink('some_dir', src)
   2183         assert os.path.isdir(src)
   2184 
   2185 
   2186 class FSEncodingTests(unittest.TestCase):
   2187     def test_nop(self):
   2188         self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
   2189         self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
   2190 
   2191     def test_identity(self):
   2192         # assert fsdecode(fsencode(x)) == x
   2193         for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
   2194             try:
   2195                 bytesfn = os.fsencode(fn)
   2196             except UnicodeEncodeError:
   2197                 continue
   2198             self.assertEqual(os.fsdecode(bytesfn), fn)
   2199 
   2200 
   2201 
   2202 class DeviceEncodingTests(unittest.TestCase):
   2203 
   2204     def test_bad_fd(self):
   2205         # Return None when an fd doesn't actually exist.
   2206         self.assertIsNone(os.device_encoding(123456))
   2207 
   2208     @unittest.skipUnless(os.isatty(0) and (sys.platform.startswith('win') or
   2209             (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
   2210             'test requires a tty and either Windows or nl_langinfo(CODESET)')
   2211     def test_device_encoding(self):
   2212         encoding = os.device_encoding(0)
   2213         self.assertIsNotNone(encoding)
   2214         self.assertTrue(codecs.lookup(encoding))
   2215 
   2216 
   2217 class PidTests(unittest.TestCase):
   2218     @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
   2219     def test_getppid(self):
   2220         p = subprocess.Popen([sys.executable, '-c',
   2221                               'import os; print(os.getppid())'],
   2222                              stdout=subprocess.PIPE)
   2223         stdout, _ = p.communicate()
   2224         # We are the parent of our subprocess
   2225         self.assertEqual(int(stdout), os.getpid())
   2226 
   2227     def test_waitpid(self):
   2228         args = [sys.executable, '-c', 'pass']
   2229         # Add an implicit test for PyUnicode_FSConverter().
   2230         pid = os.spawnv(os.P_NOWAIT, _PathLike(args[0]), args)
   2231         status = os.waitpid(pid, 0)
   2232         self.assertEqual(status, (pid, 0))
   2233 
   2234 
   2235 class SpawnTests(unittest.TestCase):
   2236     def create_args(self, *, with_env=False, use_bytes=False):
   2237         self.exitcode = 17
   2238 
   2239         filename = support.TESTFN
   2240         self.addCleanup(support.unlink, filename)
   2241 
   2242         if not with_env:
   2243             code = 'import sys; sys.exit(%s)' % self.exitcode
   2244         else:
   2245             self.env = dict(os.environ)
   2246             # create an unique key
   2247             self.key = str(uuid.uuid4())
   2248             self.env[self.key] = self.key
   2249             # read the variable from os.environ to check that it exists
   2250             code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
   2251                     % (self.key, self.exitcode))
   2252 
   2253         with open(filename, "w") as fp:
   2254             fp.write(code)
   2255 
   2256         args = [sys.executable, filename]
   2257         if use_bytes:
   2258             args = [os.fsencode(a) for a in args]
   2259             self.env = {os.fsencode(k): os.fsencode(v)
   2260                         for k, v in self.env.items()}
   2261 
   2262         return args
   2263 
   2264     @requires_os_func('spawnl')
   2265     def test_spawnl(self):
   2266         args = self.create_args()
   2267         exitcode = os.spawnl(os.P_WAIT, args[0], *args)
   2268         self.assertEqual(exitcode, self.exitcode)
   2269 
   2270     @requires_os_func('spawnle')
   2271     def test_spawnle(self):
   2272         args = self.create_args(with_env=True)
   2273         exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
   2274         self.assertEqual(exitcode, self.exitcode)
   2275 
   2276     @requires_os_func('spawnlp')
   2277     def test_spawnlp(self):
   2278         args = self.create_args()
   2279         exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
   2280         self.assertEqual(exitcode, self.exitcode)
   2281 
   2282     @requires_os_func('spawnlpe')
   2283     def test_spawnlpe(self):
   2284         args = self.create_args(with_env=True)
   2285         exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
   2286         self.assertEqual(exitcode, self.exitcode)
   2287 
   2288     @requires_os_func('spawnv')
   2289     def test_spawnv(self):
   2290         args = self.create_args()
   2291         exitcode = os.spawnv(os.P_WAIT, args[0], args)
   2292         self.assertEqual(exitcode, self.exitcode)
   2293 
   2294     @requires_os_func('spawnve')
   2295     def test_spawnve(self):
   2296         args = self.create_args(with_env=True)
   2297         exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
   2298         self.assertEqual(exitcode, self.exitcode)
   2299 
   2300     @requires_os_func('spawnvp')
   2301     def test_spawnvp(self):
   2302         args = self.create_args()
   2303         exitcode = os.spawnvp(os.P_WAIT, args[0], args)
   2304         self.assertEqual(exitcode, self.exitcode)
   2305 
   2306     @requires_os_func('spawnvpe')
   2307     def test_spawnvpe(self):
   2308         args = self.create_args(with_env=True)
   2309         exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
   2310         self.assertEqual(exitcode, self.exitcode)
   2311 
   2312     @requires_os_func('spawnv')
   2313     def test_nowait(self):
   2314         args = self.create_args()
   2315         pid = os.spawnv(os.P_NOWAIT, args[0], args)
   2316         result = os.waitpid(pid, 0)
   2317         self.assertEqual(result[0], pid)
   2318         status = result[1]
   2319         if hasattr(os, 'WIFEXITED'):
   2320             self.assertTrue(os.WIFEXITED(status))
   2321             self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
   2322         else:
   2323             self.assertEqual(status, self.exitcode << 8)
   2324 
   2325     @requires_os_func('spawnve')
   2326     def test_spawnve_bytes(self):
   2327         # Test bytes handling in parse_arglist and parse_envlist (#28114)
   2328         args = self.create_args(with_env=True, use_bytes=True)
   2329         exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
   2330         self.assertEqual(exitcode, self.exitcode)
   2331 
   2332     @requires_os_func('spawnl')
   2333     def test_spawnl_noargs(self):
   2334         args = self.create_args()
   2335         self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
   2336         self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')
   2337 
   2338     @requires_os_func('spawnle')
   2339     def test_spawnle_noargs(self):
   2340         args = self.create_args()
   2341         self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
   2342         self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})
   2343 
   2344     @requires_os_func('spawnv')
   2345     def test_spawnv_noargs(self):
   2346         args = self.create_args()
   2347         self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
   2348         self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
   2349         self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
   2350         self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])
   2351 
   2352     @requires_os_func('spawnve')
   2353     def test_spawnve_noargs(self):
   2354         args = self.create_args()
   2355         self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
   2356         self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
   2357         self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
   2358         self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
   2359 
   2360 # The introduction of this TestCase caused at least two different errors on
   2361 # *nix buildbots. Temporarily skip this to let the buildbots move along.
   2362 @unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
   2363 @unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
   2364 class LoginTests(unittest.TestCase):
   2365     def test_getlogin(self):
   2366         user_name = os.getlogin()
   2367         self.assertNotEqual(len(user_name), 0)
   2368 
   2369 
   2370 @unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
   2371                      "needs os.getpriority and os.setpriority")
   2372 class ProgramPriorityTests(unittest.TestCase):
   2373     """Tests for os.getpriority() and os.setpriority()."""
   2374 
   2375     def test_set_get_priority(self):
   2376 
   2377         base = os.getpriority(os.PRIO_PROCESS, os.getpid())
   2378         os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
   2379         try:
   2380             new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
   2381             if base >= 19 and new_prio <= 19:
   2382                 raise unittest.SkipTest("unable to reliably test setpriority "
   2383                                         "at current nice level of %s" % base)
   2384             else:
   2385                 self.assertEqual(new_prio, base + 1)
   2386         finally:
   2387             try:
   2388                 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
   2389             except OSError as err:
   2390                 if err.errno != errno.EACCES:
   2391                     raise
   2392 
   2393 
   2394 if threading is not None:
   2395     class SendfileTestServer(asyncore.dispatcher, threading.Thread):
   2396 
   2397         class Handler(asynchat.async_chat):
   2398 
   2399             def __init__(self, conn):
   2400                 asynchat.async_chat.__init__(self, conn)
   2401                 self.in_buffer = []
   2402                 self.closed = False
   2403                 self.push(b"220 ready\r\n")
   2404 
   2405             def handle_read(self):
   2406                 data = self.recv(4096)
   2407                 self.in_buffer.append(data)
   2408 
   2409             def get_data(self):
   2410                 return b''.join(self.in_buffer)
   2411 
   2412             def handle_close(self):
   2413                 self.close()
   2414                 self.closed = True
   2415 
   2416             def handle_error(self):
   2417                 raise
   2418 
   2419         def __init__(self, address):
   2420             threading.Thread.__init__(self)
   2421             asyncore.dispatcher.__init__(self)
   2422             self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
   2423             self.bind(address)
   2424             self.listen(5)
   2425             self.host, self.port = self.socket.getsockname()[:2]
   2426             self.handler_instance = None
   2427             self._active = False
   2428             self._active_lock = threading.Lock()
   2429 
   2430         # --- public API
   2431 
   2432         @property
   2433         def running(self):
   2434             return self._active
   2435 
   2436         def start(self):
   2437             assert not self.running
   2438             self.__flag = threading.Event()
   2439             threading.Thread.start(self)
   2440             self.__flag.wait()
   2441 
   2442         def stop(self):
   2443             assert self.running
   2444             self._active = False
   2445             self.join()
   2446 
   2447         def wait(self):
   2448             # wait for handler connection to be closed, then stop the server
   2449             while not getattr(self.handler_instance, "closed", False):
   2450                 time.sleep(0.001)
   2451             self.stop()
   2452 
   2453         # --- internals
   2454 
   2455         def run(self):
   2456             self._active = True
   2457             self.__flag.set()
   2458             while self._active and asyncore.socket_map:
   2459                 self._active_lock.acquire()
   2460                 asyncore.loop(timeout=0.001, count=1)
   2461                 self._active_lock.release()
   2462             asyncore.close_all()
   2463 
   2464         def handle_accept(self):
   2465             conn, addr = self.accept()
   2466             self.handler_instance = self.Handler(conn)
   2467 
   2468         def handle_connect(self):
   2469             self.close()
   2470         handle_read = handle_connect
   2471 
   2472         def writable(self):
   2473             return 0
   2474 
   2475         def handle_error(self):
   2476             raise
   2477 
   2478 
   2479 @unittest.skipUnless(threading is not None, "test needs threading module")
   2480 @unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
   2481 class TestSendfile(unittest.TestCase):
   2482 
   2483     DATA = b"12345abcde" * 16 * 1024  # 160 KB
   2484     SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
   2485                                not sys.platform.startswith("solaris") and \
   2486                                not sys.platform.startswith("sunos")
   2487     requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
   2488             'requires headers and trailers support')
   2489 
   2490     @classmethod
   2491     def setUpClass(cls):
   2492         cls.key = support.threading_setup()
   2493         create_file(support.TESTFN, cls.DATA)
   2494 
   2495     @classmethod
   2496     def tearDownClass(cls):
   2497         support.threading_cleanup(*cls.key)
   2498         support.unlink(support.TESTFN)
   2499 
   2500     def setUp(self):
   2501         self.server = SendfileTestServer((support.HOST, 0))
   2502         self.server.start()
   2503         self.client = socket.socket()
   2504         self.client.connect((self.server.host, self.server.port))
   2505         self.client.settimeout(1)
   2506         # synchronize by waiting for "220 ready" response
   2507         self.client.recv(1024)
   2508         self.sockno = self.client.fileno()
   2509         self.file = open(support.TESTFN, 'rb')
   2510         self.fileno = self.file.fileno()
   2511 
   2512     def tearDown(self):
   2513         self.file.close()
   2514         self.client.close()
   2515         if self.server.running:
   2516             self.server.stop()
   2517 
   2518     def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
   2519         """A higher level wrapper representing how an application is
   2520         supposed to use sendfile().
   2521         """
   2522         while 1:
   2523             try:
   2524                 if self.SUPPORT_HEADERS_TRAILERS:
   2525                     return os.sendfile(sock, file, offset, nbytes, headers,
   2526                                        trailers)
   2527                 else:
   2528                     return os.sendfile(sock, file, offset, nbytes)
   2529             except OSError as err:
   2530                 if err.errno == errno.ECONNRESET:
   2531                     # disconnected
   2532                     raise
   2533                 elif err.errno in (errno.EAGAIN, errno.EBUSY):
   2534                     # we have to retry send data
   2535                     continue
   2536                 else:
   2537                     raise
   2538 
   2539     def test_send_whole_file(self):
   2540         # normal send
   2541         total_sent = 0
   2542         offset = 0
   2543         nbytes = 4096
   2544         while total_sent < len(self.DATA):
   2545             sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
   2546             if sent == 0:
   2547                 break
   2548             offset += sent
   2549             total_sent += sent
   2550             self.assertTrue(sent <= nbytes)
   2551             self.assertEqual(offset, total_sent)
   2552 
   2553         self.assertEqual(total_sent, len(self.DATA))
   2554         self.client.shutdown(socket.SHUT_RDWR)
   2555         self.client.close()
   2556         self.server.wait()
   2557         data = self.server.handler_instance.get_data()
   2558         self.assertEqual(len(data), len(self.DATA))
   2559         self.assertEqual(data, self.DATA)
   2560 
   2561     def test_send_at_certain_offset(self):
   2562         # start sending a file at a certain offset
   2563         total_sent = 0
   2564         offset = len(self.DATA) // 2
   2565         must_send = len(self.DATA) - offset
   2566         nbytes = 4096
   2567         while total_sent < must_send:
   2568             sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
   2569             if sent == 0:
   2570                 break
   2571             offset += sent
   2572             total_sent += sent
   2573             self.assertTrue(sent <= nbytes)
   2574 
   2575         self.client.shutdown(socket.SHUT_RDWR)
   2576         self.client.close()
   2577         self.server.wait()
   2578         data = self.server.handler_instance.get_data()
   2579         expected = self.DATA[len(self.DATA) // 2:]
   2580         self.assertEqual(total_sent, len(expected))
   2581         self.assertEqual(len(data), len(expected))
   2582         self.assertEqual(data, expected)
   2583 
   2584     def test_offset_overflow(self):
   2585         # specify an offset > file size
   2586         offset = len(self.DATA) + 4096
   2587         try:
   2588             sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
   2589         except OSError as e:
   2590             # Solaris can raise EINVAL if offset >= file length, ignore.
   2591             if e.errno != errno.EINVAL:
   2592                 raise
   2593         else:
   2594             self.assertEqual(sent, 0)
   2595         self.client.shutdown(socket.SHUT_RDWR)
   2596         self.client.close()
   2597         self.server.wait()
   2598         data = self.server.handler_instance.get_data()
   2599         self.assertEqual(data, b'')
   2600 
   2601     def test_invalid_offset(self):
   2602         with self.assertRaises(OSError) as cm:
   2603             os.sendfile(self.sockno, self.fileno, -1, 4096)
   2604         self.assertEqual(cm.exception.errno, errno.EINVAL)
   2605 
   2606     def test_keywords(self):
   2607         # Keyword arguments should be supported
   2608         os.sendfile(out=self.sockno, offset=0, count=4096,
   2609             **{'in': self.fileno})
   2610         if self.SUPPORT_HEADERS_TRAILERS:
   2611             os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
   2612                 headers=(), trailers=(), flags=0)
   2613 
   2614     # --- headers / trailers tests
   2615 
   2616     @requires_headers_trailers
   2617     def test_headers(self):
   2618         total_sent = 0
   2619         sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
   2620                             headers=[b"x" * 512])
   2621         total_sent += sent
   2622         offset = 4096
   2623         nbytes = 4096
   2624         while 1:
   2625             sent = self.sendfile_wrapper(self.sockno, self.fileno,
   2626                                                     offset, nbytes)
   2627             if sent == 0:
   2628                 break
   2629             total_sent += sent
   2630             offset += sent
   2631 
   2632         expected_data = b"x" * 512 + self.DATA
   2633         self.assertEqual(total_sent, len(expected_data))
   2634         self.client.close()
   2635         self.server.wait()
   2636         data = self.server.handler_instance.get_data()
   2637         self.assertEqual(hash(data), hash(expected_data))
   2638 
   2639     @requires_headers_trailers
   2640     def test_trailers(self):
   2641         TESTFN2 = support.TESTFN + "2"
   2642         file_data = b"abcdef"
   2643 
   2644         self.addCleanup(support.unlink, TESTFN2)
   2645         create_file(TESTFN2, file_data)
   2646 
   2647         with open(TESTFN2, 'rb') as f:
   2648             os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
   2649                         trailers=[b"1234"])
   2650             self.client.close()
   2651             self.server.wait()
   2652             data = self.server.handler_instance.get_data()
   2653             self.assertEqual(data, b"abcdef1234")
   2654 
   2655     @requires_headers_trailers
   2656     @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
   2657                          'test needs os.SF_NODISKIO')
   2658     def test_flags(self):
   2659         try:
   2660             os.sendfile(self.sockno, self.fileno, 0, 4096,
   2661                         flags=os.SF_NODISKIO)
   2662         except OSError as err:
   2663             if err.errno not in (errno.EBUSY, errno.EAGAIN):
   2664                 raise
   2665 
   2666 
   2667 def supports_extended_attributes():
   2668     if not hasattr(os, "setxattr"):
   2669         return False
   2670 
   2671     try:
   2672         with open(support.TESTFN, "xb", 0) as fp:
   2673             try:
   2674                 os.setxattr(fp.fileno(), b"user.test", b"")
   2675             except OSError:
   2676                 return False
   2677     finally:
   2678         support.unlink(support.TESTFN)
   2679 
   2680     return True
   2681 
   2682 
   2683 @unittest.skipUnless(supports_extended_attributes(),
   2684                      "no non-broken extended attribute support")
   2685 # Kernels < 2.6.39 don't respect setxattr flags.
   2686 @support.requires_linux_version(2, 6, 39)
   2687 class ExtendedAttributeTests(unittest.TestCase):
   2688 
   2689     def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
   2690         fn = support.TESTFN
   2691         self.addCleanup(support.unlink, fn)
   2692         create_file(fn)
   2693 
   2694         with self.assertRaises(OSError) as cm:
   2695             getxattr(fn, s("user.test"), **kwargs)
   2696         self.assertEqual(cm.exception.errno, errno.ENODATA)
   2697 
   2698         init_xattr = listxattr(fn)
   2699         self.assertIsInstance(init_xattr, list)
   2700 
   2701         setxattr(fn, s("user.test"), b"", **kwargs)
   2702         xattr = set(init_xattr)
   2703         xattr.add("user.test")
   2704         self.assertEqual(set(listxattr(fn)), xattr)
   2705         self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
   2706         setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
   2707         self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
   2708 
   2709         with self.assertRaises(OSError) as cm:
   2710             setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
   2711         self.assertEqual(cm.exception.errno, errno.EEXIST)
   2712 
   2713         with self.assertRaises(OSError) as cm:
   2714             setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
   2715         self.assertEqual(cm.exception.errno, errno.ENODATA)
   2716 
   2717         setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
   2718         xattr.add("user.test2")
   2719         self.assertEqual(set(listxattr(fn)), xattr)
   2720         removexattr(fn, s("user.test"), **kwargs)
   2721 
   2722         with self.assertRaises(OSError) as cm:
   2723             getxattr(fn, s("user.test"), **kwargs)
   2724         self.assertEqual(cm.exception.errno, errno.ENODATA)
   2725 
   2726         xattr.remove("user.test")
   2727         self.assertEqual(set(listxattr(fn)), xattr)
   2728         self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
   2729         setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
   2730         self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
   2731         removexattr(fn, s("user.test"), **kwargs)
   2732         many = sorted("user.test{}".format(i) for i in range(100))
   2733         for thing in many:
   2734             setxattr(fn, thing, b"x", **kwargs)
   2735         self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
   2736 
   2737     def _check_xattrs(self, *args, **kwargs):
   2738         self._check_xattrs_str(str, *args, **kwargs)
   2739         support.unlink(support.TESTFN)
   2740 
   2741         self._check_xattrs_str(os.fsencode, *args, **kwargs)
   2742         support.unlink(support.TESTFN)
   2743 
   2744     def test_simple(self):
   2745         self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
   2746                            os.listxattr)
   2747 
   2748     def test_lpath(self):
   2749         self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
   2750                            os.listxattr, follow_symlinks=False)
   2751 
   2752     def test_fds(self):
   2753         def getxattr(path, *args):
   2754             with open(path, "rb") as fp:
   2755                 return os.getxattr(fp.fileno(), *args)
   2756         def setxattr(path, *args):
   2757             with open(path, "wb", 0) as fp:
   2758                 os.setxattr(fp.fileno(), *args)
   2759         def removexattr(path, *args):
   2760             with open(path, "wb", 0) as fp:
   2761                 os.removexattr(fp.fileno(), *args)
   2762         def listxattr(path, *args):
   2763             with open(path, "rb") as fp:
   2764                 return os.listxattr(fp.fileno(), *args)
   2765         self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
   2766 
   2767 
   2768 @unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
   2769 class TermsizeTests(unittest.TestCase):
   2770     def test_does_not_crash(self):
   2771         """Check if get_terminal_size() returns a meaningful value.
   2772 
   2773         There's no easy portable way to actually check the size of the
   2774         terminal, so let's check if it returns something sensible instead.
   2775         """
   2776         try:
   2777             size = os.get_terminal_size()
   2778         except OSError as e:
   2779             if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
   2780                 # Under win32 a generic OSError can be thrown if the
   2781                 # handle cannot be retrieved
   2782                 self.skipTest("failed to query terminal size")
   2783             raise
   2784 
   2785         self.assertGreaterEqual(size.columns, 0)
   2786         self.assertGreaterEqual(size.lines, 0)
   2787 
   2788     def test_stty_match(self):
   2789         """Check if stty returns the same results
   2790 
   2791         stty actually tests stdin, so get_terminal_size is invoked on
   2792         stdin explicitly. If stty succeeded, then get_terminal_size()
   2793         should work too.
   2794         """
   2795         try:
   2796             size = subprocess.check_output(['stty', 'size']).decode().split()
   2797         except (FileNotFoundError, subprocess.CalledProcessError):
   2798             self.skipTest("stty invocation failed")
   2799         expected = (int(size[1]), int(size[0])) # reversed order
   2800 
   2801         try:
   2802             actual = os.get_terminal_size(sys.__stdin__.fileno())
   2803         except OSError as e:
   2804             if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
   2805                 # Under win32 a generic OSError can be thrown if the
   2806                 # handle cannot be retrieved
   2807                 self.skipTest("failed to query terminal size")
   2808             raise
   2809         self.assertEqual(expected, actual)
   2810 
   2811 
   2812 class OSErrorTests(unittest.TestCase):
   2813     def setUp(self):
   2814         class Str(str):
   2815             pass
   2816 
   2817         self.bytes_filenames = []
   2818         self.unicode_filenames = []
   2819         if support.TESTFN_UNENCODABLE is not None:
   2820             decoded = support.TESTFN_UNENCODABLE
   2821         else:
   2822             decoded = support.TESTFN
   2823         self.unicode_filenames.append(decoded)
   2824         self.unicode_filenames.append(Str(decoded))
   2825         if support.TESTFN_UNDECODABLE is not None:
   2826             encoded = support.TESTFN_UNDECODABLE
   2827         else:
   2828             encoded = os.fsencode(support.TESTFN)
   2829         self.bytes_filenames.append(encoded)
   2830         self.bytes_filenames.append(bytearray(encoded))
   2831         self.bytes_filenames.append(memoryview(encoded))
   2832 
   2833         self.filenames = self.bytes_filenames + self.unicode_filenames
   2834 
   2835     def test_oserror_filename(self):
   2836         funcs = [
   2837             (self.filenames, os.chdir,),
   2838             (self.filenames, os.chmod, 0o777),
   2839             (self.filenames, os.lstat,),
   2840             (self.filenames, os.open, os.O_RDONLY),
   2841             (self.filenames, os.rmdir,),
   2842             (self.filenames, os.stat,),
   2843             (self.filenames, os.unlink,),
   2844         ]
   2845         if sys.platform == "win32":
   2846             funcs.extend((
   2847                 (self.bytes_filenames, os.rename, b"dst"),
   2848                 (self.bytes_filenames, os.replace, b"dst"),
   2849                 (self.unicode_filenames, os.rename, "dst"),
   2850                 (self.unicode_filenames, os.replace, "dst"),
   2851                 (self.unicode_filenames, os.listdir, ),
   2852             ))
   2853         else:
   2854             funcs.extend((
   2855                 (self.filenames, os.listdir,),
   2856                 (self.filenames, os.rename, "dst"),
   2857                 (self.filenames, os.replace, "dst"),
   2858             ))
   2859         if hasattr(os, "chown"):
   2860             funcs.append((self.filenames, os.chown, 0, 0))
   2861         if hasattr(os, "lchown"):
   2862             funcs.append((self.filenames, os.lchown, 0, 0))
   2863         if hasattr(os, "truncate"):
   2864             funcs.append((self.filenames, os.truncate, 0))
   2865         if hasattr(os, "chflags"):
   2866             funcs.append((self.filenames, os.chflags, 0))
   2867         if hasattr(os, "lchflags"):
   2868             funcs.append((self.filenames, os.lchflags, 0))
   2869         if hasattr(os, "chroot"):
   2870             funcs.append((self.filenames, os.chroot,))
   2871         if hasattr(os, "link"):
   2872             if sys.platform == "win32":
   2873                 funcs.append((self.bytes_filenames, os.link, b"dst"))
   2874                 funcs.append((self.unicode_filenames, os.link, "dst"))
   2875             else:
   2876                 funcs.append((self.filenames, os.link, "dst"))
   2877         if hasattr(os, "listxattr"):
   2878             funcs.extend((
   2879                 (self.filenames, os.listxattr,),
   2880                 (self.filenames, os.getxattr, "user.test"),
   2881                 (self.filenames, os.setxattr, "user.test", b'user'),
   2882                 (self.filenames, os.removexattr, "user.test"),
   2883             ))
   2884         if hasattr(os, "lchmod"):
   2885             funcs.append((self.filenames, os.lchmod, 0o777))
   2886         if hasattr(os, "readlink"):
   2887             if sys.platform == "win32":
   2888                 funcs.append((self.unicode_filenames, os.readlink,))
   2889             else:
   2890                 funcs.append((self.filenames, os.readlink,))
   2891 
   2892 
   2893         for filenames, func, *func_args in funcs:
   2894             for name in filenames:
   2895                 try:
   2896                     if isinstance(name, (str, bytes)):
   2897                         func(name, *func_args)
   2898                     else:
   2899                         with self.assertWarnsRegex(DeprecationWarning, 'should be'):
   2900                             func(name, *func_args)
   2901                 except OSError as err:
   2902                     self.assertIs(err.filename, name, str(func))
   2903                 except UnicodeDecodeError:
   2904                     pass
   2905                 else:
   2906                     self.fail("No exception thrown by {}".format(func))
   2907 
   2908 class CPUCountTests(unittest.TestCase):
   2909     def test_cpu_count(self):
   2910         cpus = os.cpu_count()
   2911         if cpus is not None:
   2912             self.assertIsInstance(cpus, int)
   2913             self.assertGreater(cpus, 0)
   2914         else:
   2915             self.skipTest("Could not determine the number of CPUs")
   2916 
   2917 
   2918 class FDInheritanceTests(unittest.TestCase):
   2919     def test_get_set_inheritable(self):
   2920         fd = os.open(__file__, os.O_RDONLY)
   2921         self.addCleanup(os.close, fd)
   2922         self.assertEqual(os.get_inheritable(fd), False)
   2923 
   2924         os.set_inheritable(fd, True)
   2925         self.assertEqual(os.get_inheritable(fd), True)
   2926 
   2927     @unittest.skipIf(fcntl is None, "need fcntl")
   2928     def test_get_inheritable_cloexec(self):
   2929         fd = os.open(__file__, os.O_RDONLY)
   2930         self.addCleanup(os.close, fd)
   2931         self.assertEqual(os.get_inheritable(fd), False)
   2932 
   2933         # clear FD_CLOEXEC flag
   2934         flags = fcntl.fcntl(fd, fcntl.F_GETFD)
   2935         flags &= ~fcntl.FD_CLOEXEC
   2936         fcntl.fcntl(fd, fcntl.F_SETFD, flags)
   2937 
   2938         self.assertEqual(os.get_inheritable(fd), True)
   2939 
   2940     @unittest.skipIf(fcntl is None, "need fcntl")
   2941     def test_set_inheritable_cloexec(self):
   2942         fd = os.open(__file__, os.O_RDONLY)
   2943         self.addCleanup(os.close, fd)
   2944         self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
   2945                          fcntl.FD_CLOEXEC)
   2946 
   2947         os.set_inheritable(fd, True)
   2948         self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
   2949                          0)
   2950 
   2951     def test_open(self):
   2952         fd = os.open(__file__, os.O_RDONLY)
   2953         self.addCleanup(os.close, fd)
   2954         self.assertEqual(os.get_inheritable(fd), False)
   2955 
   2956     @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
   2957     def test_pipe(self):
   2958         rfd, wfd = os.pipe()
   2959         self.addCleanup(os.close, rfd)
   2960         self.addCleanup(os.close, wfd)
   2961         self.assertEqual(os.get_inheritable(rfd), False)
   2962         self.assertEqual(os.get_inheritable(wfd), False)
   2963 
   2964     def test_dup(self):
   2965         fd1 = os.open(__file__, os.O_RDONLY)
   2966         self.addCleanup(os.close, fd1)
   2967 
   2968         fd2 = os.dup(fd1)
   2969         self.addCleanup(os.close, fd2)
   2970         self.assertEqual(os.get_inheritable(fd2), False)
   2971 
   2972     @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
   2973     def test_dup2(self):
   2974         fd = os.open(__file__, os.O_RDONLY)
   2975         self.addCleanup(os.close, fd)
   2976 
   2977         # inheritable by default
   2978         fd2 = os.open(__file__, os.O_RDONLY)
   2979         try:
   2980             os.dup2(fd, fd2)
   2981             self.assertEqual(os.get_inheritable(fd2), True)
   2982         finally:
   2983             os.close(fd2)
   2984 
   2985         # force non-inheritable
   2986         fd3 = os.open(__file__, os.O_RDONLY)
   2987         try:
   2988             os.dup2(fd, fd3, inheritable=False)
   2989             self.assertEqual(os.get_inheritable(fd3), False)
   2990         finally:
   2991             os.close(fd3)
   2992 
   2993     @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
   2994     def test_openpty(self):
   2995         master_fd, slave_fd = os.openpty()
   2996         self.addCleanup(os.close, master_fd)
   2997         self.addCleanup(os.close, slave_fd)
   2998         self.assertEqual(os.get_inheritable(master_fd), False)
   2999         self.assertEqual(os.get_inheritable(slave_fd), False)
   3000 
   3001 
   3002 class PathTConverterTests(unittest.TestCase):
   3003     # tuples of (function name, allows fd arguments, additional arguments to
   3004     # function, cleanup function)
   3005     functions = [
   3006         ('stat', True, (), None),
   3007         ('lstat', False, (), None),
   3008         ('access', False, (os.F_OK,), None),
   3009         ('chflags', False, (0,), None),
   3010         ('lchflags', False, (0,), None),
   3011         ('open', False, (0,), getattr(os, 'close', None)),
   3012     ]
   3013 
   3014     def test_path_t_converter(self):
   3015         str_filename = support.TESTFN
   3016         if os.name == 'nt':
   3017             bytes_fspath = bytes_filename = None
   3018         else:
   3019             bytes_filename = support.TESTFN.encode('ascii')
   3020             bytes_fspath = _PathLike(bytes_filename)
   3021         fd = os.open(_PathLike(str_filename), os.O_WRONLY|os.O_CREAT)
   3022         self.addCleanup(support.unlink, support.TESTFN)
   3023         self.addCleanup(os.close, fd)
   3024 
   3025         int_fspath = _PathLike(fd)
   3026         str_fspath = _PathLike(str_filename)
   3027 
   3028         for name, allow_fd, extra_args, cleanup_fn in self.functions:
   3029             with self.subTest(name=name):
   3030                 try:
   3031                     fn = getattr(os, name)
   3032                 except AttributeError:
   3033                     continue
   3034 
   3035                 for path in (str_filename, bytes_filename, str_fspath,
   3036                              bytes_fspath):
   3037                     if path is None:
   3038                         continue
   3039                     with self.subTest(name=name, path=path):
   3040                         result = fn(path, *extra_args)
   3041                         if cleanup_fn is not None:
   3042                             cleanup_fn(result)
   3043 
   3044                 with self.assertRaisesRegex(
   3045                         TypeError, 'should be string, bytes'):
   3046                     fn(int_fspath, *extra_args)
   3047 
   3048                 if allow_fd:
   3049                     result = fn(fd, *extra_args)  # should not fail
   3050                     if cleanup_fn is not None:
   3051                         cleanup_fn(result)
   3052                 else:
   3053                     with self.assertRaisesRegex(
   3054                             TypeError,
   3055                             'os.PathLike'):
   3056                         fn(fd, *extra_args)
   3057 
   3058 
   3059 @unittest.skipUnless(hasattr(os, 'get_blocking'),
   3060                      'needs os.get_blocking() and os.set_blocking()')
   3061 class BlockingTests(unittest.TestCase):
   3062     def test_blocking(self):
   3063         fd = os.open(__file__, os.O_RDONLY)
   3064         self.addCleanup(os.close, fd)
   3065         self.assertEqual(os.get_blocking(fd), True)
   3066 
   3067         os.set_blocking(fd, False)
   3068         self.assertEqual(os.get_blocking(fd), False)
   3069 
   3070         os.set_blocking(fd, True)
   3071         self.assertEqual(os.get_blocking(fd), True)
   3072 
   3073 
   3074 
   3075 class ExportsTests(unittest.TestCase):
   3076     def test_os_all(self):
   3077         self.assertIn('open', os.__all__)
   3078         self.assertIn('walk', os.__all__)
   3079 
   3080 
   3081 class TestScandir(unittest.TestCase):
   3082     check_no_resource_warning = support.check_no_resource_warning
   3083 
   3084     def setUp(self):
   3085         self.path = os.path.realpath(support.TESTFN)
   3086         self.bytes_path = os.fsencode(self.path)
   3087         self.addCleanup(support.rmtree, self.path)
   3088         os.mkdir(self.path)
   3089 
   3090     def create_file(self, name="file.txt"):
   3091         path = self.bytes_path if isinstance(name, bytes) else self.path
   3092         filename = os.path.join(path, name)
   3093         create_file(filename, b'python')
   3094         return filename
   3095 
   3096     def get_entries(self, names):
   3097         entries = dict((entry.name, entry)
   3098                        for entry in os.scandir(self.path))
   3099         self.assertEqual(sorted(entries.keys()), names)
   3100         return entries
   3101 
   3102     def assert_stat_equal(self, stat1, stat2, skip_fields):
   3103         if skip_fields:
   3104             for attr in dir(stat1):
   3105                 if not attr.startswith("st_"):
   3106                     continue
   3107                 if attr in ("st_dev", "st_ino", "st_nlink"):
   3108                     continue
   3109                 self.assertEqual(getattr(stat1, attr),
   3110                                  getattr(stat2, attr),
   3111                                  (stat1, stat2, attr))
   3112         else:
   3113             self.assertEqual(stat1, stat2)
   3114 
   3115     def check_entry(self, entry, name, is_dir, is_file, is_symlink):
   3116         self.assertIsInstance(entry, os.DirEntry)
   3117         self.assertEqual(entry.name, name)
   3118         self.assertEqual(entry.path, os.path.join(self.path, name))
   3119         self.assertEqual(entry.inode(),
   3120                          os.stat(entry.path, follow_symlinks=False).st_ino)
   3121 
   3122         entry_stat = os.stat(entry.path)
   3123         self.assertEqual(entry.is_dir(),
   3124                          stat.S_ISDIR(entry_stat.st_mode))
   3125         self.assertEqual(entry.is_file(),
   3126                          stat.S_ISREG(entry_stat.st_mode))
   3127         self.assertEqual(entry.is_symlink(),
   3128                          os.path.islink(entry.path))
   3129 
   3130         entry_lstat = os.stat(entry.path, follow_symlinks=False)
   3131         self.assertEqual(entry.is_dir(follow_symlinks=False),
   3132                          stat.S_ISDIR(entry_lstat.st_mode))
   3133         self.assertEqual(entry.is_file(follow_symlinks=False),
   3134                          stat.S_ISREG(entry_lstat.st_mode))
   3135 
   3136         self.assert_stat_equal(entry.stat(),
   3137                                entry_stat,
   3138                                os.name == 'nt' and not is_symlink)
   3139         self.assert_stat_equal(entry.stat(follow_symlinks=False),
   3140                                entry_lstat,
   3141                                os.name == 'nt')
   3142 
   3143     def test_attributes(self):
   3144         link = hasattr(os, 'link')
   3145         symlink = support.can_symlink()
   3146 
   3147         dirname = os.path.join(self.path, "dir")
   3148         os.mkdir(dirname)
   3149         filename = self.create_file("file.txt")
   3150         if link:
   3151             os.link(filename, os.path.join(self.path, "link_file.txt"))
   3152         if symlink:
   3153             os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
   3154                        target_is_directory=True)
   3155             os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
   3156 
   3157         names = ['dir', 'file.txt']
   3158         if link:
   3159             names.append('link_file.txt')
   3160         if symlink:
   3161             names.extend(('symlink_dir', 'symlink_file.txt'))
   3162         entries = self.get_entries(names)
   3163 
   3164         entry = entries['dir']
   3165         self.check_entry(entry, 'dir', True, False, False)
   3166 
   3167         entry = entries['file.txt']
   3168         self.check_entry(entry, 'file.txt', False, True, False)
   3169 
   3170         if link:
   3171             entry = entries['link_file.txt']
   3172             self.check_entry(entry, 'link_file.txt', False, True, False)
   3173 
   3174         if symlink:
   3175             entry = entries['symlink_dir']
   3176             self.check_entry(entry, 'symlink_dir', True, False, True)
   3177 
   3178             entry = entries['symlink_file.txt']
   3179             self.check_entry(entry, 'symlink_file.txt', False, True, True)
   3180 
   3181     def get_entry(self, name):
   3182         path = self.bytes_path if isinstance(name, bytes) else self.path
   3183         entries = list(os.scandir(path))
   3184         self.assertEqual(len(entries), 1)
   3185 
   3186         entry = entries[0]
   3187         self.assertEqual(entry.name, name)
   3188         return entry
   3189 
   3190     def create_file_entry(self, name='file.txt'):
   3191         filename = self.create_file(name=name)
   3192         return self.get_entry(os.path.basename(filename))
   3193 
   3194     def test_current_directory(self):
   3195         filename = self.create_file()
   3196         old_dir = os.getcwd()
   3197         try:
   3198             os.chdir(self.path)
   3199 
   3200             # call scandir() without parameter: it must list the content
   3201             # of the current directory
   3202             entries = dict((entry.name, entry) for entry in os.scandir())
   3203             self.assertEqual(sorted(entries.keys()),
   3204                              [os.path.basename(filename)])
   3205         finally:
   3206             os.chdir(old_dir)
   3207 
   3208     def test_repr(self):
   3209         entry = self.create_file_entry()
   3210         self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
   3211 
   3212     def test_fspath_protocol(self):
   3213         entry = self.create_file_entry()
   3214         self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))
   3215 
   3216     def test_fspath_protocol_bytes(self):
   3217         bytes_filename = os.fsencode('bytesfile.txt')
   3218         bytes_entry = self.create_file_entry(name=bytes_filename)
   3219         fspath = os.fspath(bytes_entry)
   3220         self.assertIsInstance(fspath, bytes)
   3221         self.assertEqual(fspath,
   3222                          os.path.join(os.fsencode(self.path),bytes_filename))
   3223 
   3224     def test_removed_dir(self):
   3225         path = os.path.join(self.path, 'dir')
   3226 
   3227         os.mkdir(path)
   3228         entry = self.get_entry('dir')
   3229         os.rmdir(path)
   3230 
   3231         # On POSIX, is_dir() result depends if scandir() filled d_type or not
   3232         if os.name == 'nt':
   3233             self.assertTrue(entry.is_dir())
   3234         self.assertFalse(entry.is_file())
   3235         self.assertFalse(entry.is_symlink())
   3236         if os.name == 'nt':
   3237             self.assertRaises(FileNotFoundError, entry.inode)
   3238             # don't fail
   3239             entry.stat()
   3240             entry.stat(follow_symlinks=False)
   3241         else:
   3242             self.assertGreater(entry.inode(), 0)
   3243             self.assertRaises(FileNotFoundError, entry.stat)
   3244             self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
   3245 
   3246     def test_removed_file(self):
   3247         entry = self.create_file_entry()
   3248         os.unlink(entry.path)
   3249 
   3250         self.assertFalse(entry.is_dir())
   3251         # On POSIX, is_dir() result depends if scandir() filled d_type or not
   3252         if os.name == 'nt':
   3253             self.assertTrue(entry.is_file())
   3254         self.assertFalse(entry.is_symlink())
   3255         if os.name == 'nt':
   3256             self.assertRaises(FileNotFoundError, entry.inode)
   3257             # don't fail
   3258             entry.stat()
   3259             entry.stat(follow_symlinks=False)
   3260         else:
   3261             self.assertGreater(entry.inode(), 0)
   3262             self.assertRaises(FileNotFoundError, entry.stat)
   3263             self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
   3264 
   3265     def test_broken_symlink(self):
   3266         if not support.can_symlink():
   3267             return self.skipTest('cannot create symbolic link')
   3268 
   3269         filename = self.create_file("file.txt")
   3270         os.symlink(filename,
   3271                    os.path.join(self.path, "symlink.txt"))
   3272         entries = self.get_entries(['file.txt', 'symlink.txt'])
   3273         entry = entries['symlink.txt']
   3274         os.unlink(filename)
   3275 
   3276         self.assertGreater(entry.inode(), 0)
   3277         self.assertFalse(entry.is_dir())
   3278         self.assertFalse(entry.is_file())  # broken symlink returns False
   3279         self.assertFalse(entry.is_dir(follow_symlinks=False))
   3280         self.assertFalse(entry.is_file(follow_symlinks=False))
   3281         self.assertTrue(entry.is_symlink())
   3282         self.assertRaises(FileNotFoundError, entry.stat)
   3283         # don't fail
   3284         entry.stat(follow_symlinks=False)
   3285 
   3286     def test_bytes(self):
   3287         self.create_file("file.txt")
   3288 
   3289         path_bytes = os.fsencode(self.path)
   3290         entries = list(os.scandir(path_bytes))
   3291         self.assertEqual(len(entries), 1, entries)
   3292         entry = entries[0]
   3293 
   3294         self.assertEqual(entry.name, b'file.txt')
   3295         self.assertEqual(entry.path,
   3296                          os.fsencode(os.path.join(self.path, 'file.txt')))
   3297 
   3298     def test_empty_path(self):
   3299         self.assertRaises(FileNotFoundError, os.scandir, '')
   3300 
   3301     def test_consume_iterator_twice(self):
   3302         self.create_file("file.txt")
   3303         iterator = os.scandir(self.path)
   3304 
   3305         entries = list(iterator)
   3306         self.assertEqual(len(entries), 1, entries)
   3307 
   3308         # check than consuming the iterator twice doesn't raise exception
   3309         entries2 = list(iterator)
   3310         self.assertEqual(len(entries2), 0, entries2)
   3311 
   3312     def test_bad_path_type(self):
   3313         for obj in [1234, 1.234, {}, []]:
   3314             self.assertRaises(TypeError, os.scandir, obj)
   3315 
   3316     def test_close(self):
   3317         self.create_file("file.txt")
   3318         self.create_file("file2.txt")
   3319         iterator = os.scandir(self.path)
   3320         next(iterator)
   3321         iterator.close()
   3322         # multiple closes
   3323         iterator.close()
   3324         with self.check_no_resource_warning():
   3325             del iterator
   3326 
   3327     def test_context_manager(self):
   3328         self.create_file("file.txt")
   3329         self.create_file("file2.txt")
   3330         with os.scandir(self.path) as iterator:
   3331             next(iterator)
   3332         with self.check_no_resource_warning():
   3333             del iterator
   3334 
   3335     def test_context_manager_close(self):
   3336         self.create_file("file.txt")
   3337         self.create_file("file2.txt")
   3338         with os.scandir(self.path) as iterator:
   3339             next(iterator)
   3340             iterator.close()
   3341 
   3342     def test_context_manager_exception(self):
   3343         self.create_file("file.txt")
   3344         self.create_file("file2.txt")
   3345         with self.assertRaises(ZeroDivisionError):
   3346             with os.scandir(self.path) as iterator:
   3347                 next(iterator)
   3348                 1/0
   3349         with self.check_no_resource_warning():
   3350             del iterator
   3351 
   3352     def test_resource_warning(self):
   3353         self.create_file("file.txt")
   3354         self.create_file("file2.txt")
   3355         iterator = os.scandir(self.path)
   3356         next(iterator)
   3357         with self.assertWarns(ResourceWarning):
   3358             del iterator
   3359             support.gc_collect()
   3360         # exhausted iterator
   3361         iterator = os.scandir(self.path)
   3362         list(iterator)
   3363         with self.check_no_resource_warning():
   3364             del iterator
   3365 
   3366 
   3367 class TestPEP519(unittest.TestCase):
   3368 
   3369     # Abstracted so it can be overridden to test pure Python implementation
   3370     # if a C version is provided.
   3371     fspath = staticmethod(os.fspath)
   3372 
   3373     def test_return_bytes(self):
   3374         for b in b'hello', b'goodbye', b'some/path/and/file':
   3375             self.assertEqual(b, self.fspath(b))
   3376 
   3377     def test_return_string(self):
   3378         for s in 'hello', 'goodbye', 'some/path/and/file':
   3379             self.assertEqual(s, self.fspath(s))
   3380 
   3381     def test_fsencode_fsdecode(self):
   3382         for p in "path/like/object", b"path/like/object":
   3383             pathlike = _PathLike(p)
   3384 
   3385             self.assertEqual(p, self.fspath(pathlike))
   3386             self.assertEqual(b"path/like/object", os.fsencode(pathlike))
   3387             self.assertEqual("path/like/object", os.fsdecode(pathlike))
   3388 
   3389     def test_pathlike(self):
   3390         self.assertEqual('#feelthegil', self.fspath(_PathLike('#feelthegil')))
   3391         self.assertTrue(issubclass(_PathLike, os.PathLike))
   3392         self.assertTrue(isinstance(_PathLike(), os.PathLike))
   3393 
   3394     def test_garbage_in_exception_out(self):
   3395         vapor = type('blah', (), {})
   3396         for o in int, type, os, vapor():
   3397             self.assertRaises(TypeError, self.fspath, o)
   3398 
   3399     def test_argument_required(self):
   3400         self.assertRaises(TypeError, self.fspath)
   3401 
   3402     def test_bad_pathlike(self):
   3403         # __fspath__ returns a value other than str or bytes.
   3404         self.assertRaises(TypeError, self.fspath, _PathLike(42))
   3405         # __fspath__ attribute that is not callable.
   3406         c = type('foo', (), {})
   3407         c.__fspath__ = 1
   3408         self.assertRaises(TypeError, self.fspath, c())
   3409         # __fspath__ raises an exception.
   3410         self.assertRaises(ZeroDivisionError, self.fspath,
   3411                           _PathLike(ZeroDivisionError()))
   3412 
   3413 # Only test if the C version is provided, otherwise TestPEP519 already tested
   3414 # the pure Python implementation.
   3415 if hasattr(os, "_fspath"):
   3416     class TestPEP519PurePython(TestPEP519):
   3417 
   3418         """Explicitly test the pure Python implementation of os.fspath()."""
   3419 
   3420         fspath = staticmethod(os._fspath)
   3421 
   3422 
   3423 if __name__ == "__main__":
   3424     unittest.main()
   3425