Home | History | Annotate | Download | only in test
      1 "Test posix functions"
      2 
      3 from test import support
      4 android_not_root = support.android_not_root
      5 
      6 # Skip these tests if there is no posix module.
      7 posix = support.import_module('posix')
      8 
      9 import errno
     10 import sys
     11 import time
     12 import os
     13 import platform
     14 import pwd
     15 import stat
     16 import tempfile
     17 import unittest
     18 import warnings
     19 
# Path in the temp directory, named after TESTFN, that test_lchown uses as
# the target of the symlink it creates.
_DUMMY_SYMLINK = os.path.join(tempfile.gettempdir(),
                              support.TESTFN + '-dummy-symlink')
     22 
     23 class PosixTester(unittest.TestCase):
     24 
     25     def setUp(self):
     26         # create empty file
     27         fp = open(support.TESTFN, 'w+')
     28         fp.close()
     29         self.teardown_files = [ support.TESTFN ]
     30         self._warnings_manager = support.check_warnings()
     31         self._warnings_manager.__enter__()
     32         warnings.filterwarnings('ignore', '.* potential security risk .*',
     33                                 RuntimeWarning)
     34 
     35     def tearDown(self):
     36         for teardown_file in self.teardown_files:
     37             support.unlink(teardown_file)
     38         self._warnings_manager.__exit__(None, None, None)
     39 
     40     def testNoArgFunctions(self):
     41         # test posix functions which take no arguments and have
     42         # no side-effects which we need to cleanup (e.g., fork, wait, abort)
     43         NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdb", "uname",
     44                              "times", "getloadavg",
     45                              "getegid", "geteuid", "getgid", "getgroups",
     46                              "getpid", "getpgrp", "getppid", "getuid", "sync",
     47                            ]
     48 
     49         for name in NO_ARG_FUNCTIONS:
     50             posix_func = getattr(posix, name, None)
     51             if posix_func is not None:
     52                 posix_func()
     53                 self.assertRaises(TypeError, posix_func, 1)
     54 
     55     @unittest.skipUnless(hasattr(posix, 'getresuid'),
     56                          'test needs posix.getresuid()')
     57     def test_getresuid(self):
     58         user_ids = posix.getresuid()
     59         self.assertEqual(len(user_ids), 3)
     60         for val in user_ids:
     61             self.assertGreaterEqual(val, 0)
     62 
     63     @unittest.skipUnless(hasattr(posix, 'getresgid'),
     64                          'test needs posix.getresgid()')
     65     def test_getresgid(self):
     66         group_ids = posix.getresgid()
     67         self.assertEqual(len(group_ids), 3)
     68         for val in group_ids:
     69             self.assertGreaterEqual(val, 0)
     70 
     71     @unittest.skipUnless(hasattr(posix, 'setresuid'),
     72                          'test needs posix.setresuid()')
     73     def test_setresuid(self):
     74         current_user_ids = posix.getresuid()
     75         self.assertIsNone(posix.setresuid(*current_user_ids))
     76         # -1 means don't change that value.
     77         self.assertIsNone(posix.setresuid(-1, -1, -1))
     78 
     79     @unittest.skipUnless(hasattr(posix, 'setresuid'),
     80                          'test needs posix.setresuid()')
     81     def test_setresuid_exception(self):
     82         # Don't do this test if someone is silly enough to run us as root.
     83         current_user_ids = posix.getresuid()
     84         if 0 not in current_user_ids:
     85             new_user_ids = (current_user_ids[0]+1, -1, -1)
     86             self.assertRaises(OSError, posix.setresuid, *new_user_ids)
     87 
     88     @unittest.skipUnless(hasattr(posix, 'setresgid'),
     89                          'test needs posix.setresgid()')
     90     def test_setresgid(self):
     91         current_group_ids = posix.getresgid()
     92         self.assertIsNone(posix.setresgid(*current_group_ids))
     93         # -1 means don't change that value.
     94         self.assertIsNone(posix.setresgid(-1, -1, -1))
     95 
     96     @unittest.skipUnless(hasattr(posix, 'setresgid'),
     97                          'test needs posix.setresgid()')
     98     def test_setresgid_exception(self):
     99         # Don't do this test if someone is silly enough to run us as root.
    100         current_group_ids = posix.getresgid()
    101         if 0 not in current_group_ids:
    102             new_group_ids = (current_group_ids[0]+1, -1, -1)
    103             self.assertRaises(OSError, posix.setresgid, *new_group_ids)
    104 
    @unittest.skipUnless(
        hasattr(posix, 'initgroups'), "test needs os.initgroups()")
    def test_initgroups(self):
        # initgroups() takes a string and an integer; every other argument
        # list must be rejected with TypeError.
        bad_arglists = (
            (),
            (None,),
            (3, "foo"),
            ("foo", 3, object()),
        )
        for arglist in bad_arglists:
            self.assertRaises(TypeError, posix.initgroups, *arglist)

        # The rest only applies to a non-privileged user, for whom the call
        # should fail with OSError EPERM.
        if os.getuid() == 0:
            return
        try:
            pw_entry = pwd.getpwuid(posix.getuid())
        except KeyError:
            # the current UID may not have a pwd entry
            raise unittest.SkipTest("need a pwd entry")
        username = pw_entry.pw_name
        try:
            posix.initgroups(username, 13)
        except OSError as exc:
            self.assertEqual(exc.errno, errno.EPERM)
        else:
            self.fail("Expected OSError to be raised by initgroups")
    129 
    @unittest.skipUnless(
        hasattr(posix, 'statvfs'), 'test needs posix.statvfs()')
    def test_statvfs(self):
        # statvfs() on the current directory must give a truthy result.
        vfs_info = posix.statvfs(os.curdir)
        self.assertTrue(vfs_info)
    134 
    @unittest.skipUnless(
        hasattr(posix, 'fstatvfs'), 'test needs posix.fstatvfs()')
    def test_fstatvfs(self):
        # Both fstatvfs() and statvfs() must accept a file descriptor and
        # give a truthy result.
        with open(support.TESTFN) as fp:
            self.assertTrue(posix.fstatvfs(fp.fileno()))
            self.assertTrue(posix.statvfs(fp.fileno()))
    144 
    @unittest.skipUnless(
        hasattr(posix, 'ftruncate'), 'test needs posix.ftruncate()')
    def test_ftruncate(self):
        with open(support.TESTFN, 'w+') as fp:
            # put some data in the file first so there is something to
            # truncate
            fp.write('test')
            fp.flush()
            posix.ftruncate(fp.fileno(), 0)
    156 
    @unittest.skipUnless(
        hasattr(posix, 'truncate'), "test needs posix.truncate()")
    def test_truncate(self):
        path = support.TESTFN
        # write some data, close the file, then truncate it by path
        with open(path, 'w') as fp:
            fp.write('test')
            fp.flush()
        posix.truncate(path, 0)
    163 
    @unittest.skipUnless(getattr(os, 'execve', None) in os.supports_fd,
                         "test needs execve() to support the fd parameter")
    @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
    @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
    def test_fexecve(self):
        # Fork a child that replaces itself, through a descriptor opened on
        # the interpreter binary, with "python -c pass"; the parent expects
        # waitpid() to report (pid, 0).
        exe_fd = os.open(sys.executable, os.O_RDONLY)
        try:
            child = os.fork()
            if child != 0:
                # parent
                self.assertEqual(os.waitpid(child, 0), (child, 0))
            else:
                # child
                exe_dir = os.path.split(sys.executable)[0]
                os.chdir(exe_dir)
                posix.execve(exe_fd, [sys.executable, '-c', 'pass'],
                             os.environ)
        finally:
            os.close(exe_fd)
    178 
    @unittest.skipUnless(hasattr(posix, 'waitid'),
                         "test needs posix.waitid()")
    @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
    def test_waitid(self):
        # Fork a child that execs "python -c pass" and check that waitid()
        # reports that child's pid.
        child = os.fork()
        if child != 0:
            # parent
            info = posix.waitid(posix.P_PID, child, posix.WEXITED)
            self.assertEqual(child, info.si_pid)
        else:
            # child
            exe_dir = os.path.split(sys.executable)[0]
            os.chdir(exe_dir)
            posix.execve(sys.executable, [sys.executable, '-c', 'pass'],
                         os.environ)
    189 
    @unittest.skipUnless(hasattr(posix, 'lockf'),
                         "test needs posix.lockf()")
    def test_lockf(self):
        data = b'test'
        fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT)
        try:
            os.write(fd, data)
            os.lseek(fd, 0, os.SEEK_SET)
            # lock the section holding the data just written, then unlock it
            posix.lockf(fd, posix.F_LOCK, len(data))
            posix.lockf(fd, posix.F_ULOCK, len(data))
        finally:
            os.close(fd)
    201 
    @unittest.skipUnless(hasattr(posix, 'pread'),
                         "test needs posix.pread()")
    def test_pread(self):
        open_flags = os.O_RDWR | os.O_CREAT
        fd = os.open(support.TESTFN, open_flags)
        try:
            os.write(fd, b'test')
            os.lseek(fd, 0, os.SEEK_SET)
            # two bytes starting at offset 1
            middle = posix.pread(fd, 2, 1)
            self.assertEqual(b'es', middle)
            # pread() shouldn't have moved the file offset, so a plain
            # read() still starts at the beginning
            head = posix.read(fd, 2)
            self.assertEqual(b'te', head)
        finally:
            os.close(fd)
    213 
    @unittest.skipUnless(hasattr(posix, 'pwrite'),
                         "test needs posix.pwrite()")
    def test_pwrite(self):
        open_flags = os.O_RDWR | os.O_CREAT
        fd = os.open(support.TESTFN, open_flags)
        try:
            os.write(fd, b'test')
            os.lseek(fd, 0, os.SEEK_SET)
            # overwrite the two middle bytes, then read everything back from
            # the current offset
            posix.pwrite(fd, b'xx', 1)
            content = posix.read(fd, 4)
            self.assertEqual(b'txxt', content)
        finally:
            os.close(fd)
    224 
    @unittest.skipUnless(
        hasattr(posix, 'posix_fallocate'),
        "test needs posix.posix_fallocate()")
    def test_posix_fallocate(self):
        fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT)
        try:
            posix.posix_fallocate(fd, 0, 10)
        except OSError as exc:
            # issue10812: ZFS doesn't appear to support posix_fallocate, so
            # EINVAL is tolerated on Solaris-based platforms, which are
            # likely to have ZFS.  Anything else is re-raised.
            tolerated = (exc.errno == errno.EINVAL
                         and sys.platform.startswith("sunos"))
            if not tolerated:
                raise
        finally:
            os.close(fd)
    238 
    @unittest.skipUnless(
        hasattr(posix, 'posix_fadvise'),
        "test needs posix.posix_fadvise()")
    def test_posix_fadvise(self):
        # The call only has to succeed on a descriptor opened read-only.
        fd = os.open(support.TESTFN, os.O_RDONLY)
        try:
            advice = posix.POSIX_FADV_WILLNEED
            posix.posix_fadvise(fd, 0, 0, advice)
        finally:
            os.close(fd)
    247 
    @unittest.skipUnless(os.utime in os.supports_fd,
                         "test needs fd support in os.utime")
    def test_utime_with_fd(self):
        now = time.time()
        whole = int(now)
        frac_ns = int((now - whole) * 1e9)
        fd = os.open(support.TESTFN, os.O_RDONLY)
        try:
            posix.utime(fd)
            posix.utime(fd, None)
            # a times tuple containing None is expected to raise TypeError
            for bad_times in ((None, None), (now, None), (None, now)):
                self.assertRaises(TypeError, posix.utime, fd, bad_times)
            posix.utime(fd, (whole, whole))
            posix.utime(fd, (now, now))
            # passing both a times tuple and ns= is expected to raise
            # ValueError
            conflicting = (
                ((now, now), (now, now)),
                ((now, 0), (None, None)),
                ((None, None), (now, 0)),
            )
            for times, ns in conflicting:
                self.assertRaises(ValueError, posix.utime, fd, times, ns=ns)
            posix.utime(fd, (whole, frac_ns))
            posix.utime(fd, ns=(whole, frac_ns))
        finally:
            os.close(fd)
    268 
    @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
                         "test needs follow_symlinks support in os.utime")
    def test_utime_nofollow_symlinks(self):
        now = time.time()
        path = support.TESTFN
        posix.utime(path, None, follow_symlinks=False)
        # a times tuple containing None is expected to raise TypeError
        for bad_times in ((None, None), (now, None), (None, now)):
            self.assertRaises(TypeError, posix.utime, path, bad_times,
                              follow_symlinks=False)
        # integer and float timestamps are both accepted
        for times in ((int(now), int(now)), (now, now)):
            posix.utime(path, times, follow_symlinks=False)
        posix.utime(path, follow_symlinks=False)
    279 
    @unittest.skipUnless(hasattr(posix, 'writev'),
                         "test needs posix.writev()")
    def test_writev(self):
        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
        try:
            # three buffers holding ten bytes in total
            written = os.writev(fd, (b'test1', b'tt2', b't3'))
            self.assertEqual(written, 10)

            # read them back as one contiguous run
            os.lseek(fd, 0, os.SEEK_SET)
            content = posix.read(fd, 10)
            self.assertEqual(b'test1tt2t3', content)

            # Issue #20113: empty list of buffers should not crash
            try:
                empty_written = posix.writev(fd, [])
            except OSError:
                # writev(fd, []) raises OSError(22, "Invalid argument")
                # on OpenIndiana
                pass
            else:
                self.assertEqual(empty_written, 0)
        finally:
            os.close(fd)
    301 
    @unittest.skipUnless(hasattr(posix, 'readv'),
                         "test needs posix.readv()")
    def test_readv(self):
        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
        try:
            os.write(fd, b'test1tt2t3')
            os.lseek(fd, 0, os.SEEK_SET)
            # scatter the ten bytes over buffers of 5, 3 and 2 bytes
            buffers = [bytearray(5), bytearray(3), bytearray(2)]
            self.assertEqual(posix.readv(fd, buffers), 10)
            filled = list(map(bytes, buffers))
            self.assertEqual([b'test1', b'tt2', b't3'], filled)

            # Issue #20113: empty list of buffers should not crash
            try:
                empty_read = posix.readv(fd, [])
            except OSError:
                # readv(fd, []) raises OSError(22, "Invalid argument")
                # on OpenIndiana
                pass
            else:
                self.assertEqual(empty_read, 0)
        finally:
            os.close(fd)
    323 
    @unittest.skipUnless(
        hasattr(posix, 'dup'), 'test needs posix.dup()')
    def test_dup(self):
        # dup() of an open file's descriptor must give back an int, which is
        # closed again straight away.
        with open(support.TESTFN) as fp:
            duplicate = posix.dup(fp.fileno())
            self.assertIsInstance(duplicate, int)
            os.close(duplicate)
    334 
    @unittest.skipUnless(hasattr(posix, 'confstr'),
                         'test needs posix.confstr()')
    def test_confstr(self):
        # An unknown configuration name must be rejected with ValueError ...
        self.assertRaises(ValueError, posix.confstr, "CS_garbage")
        # ... while CS_PATH must yield a non-empty value.  assertGreater
        # reports the actual length on failure rather than "False != True".
        self.assertGreater(len(posix.confstr("CS_PATH")), 0)
    340 
    @unittest.skipUnless(hasattr(posix, 'dup2'),
                         'test needs posix.dup2()')
    def test_dup2(self):
        # dup2() from one open descriptor onto another only has to succeed.
        # The with statement closes both files on every path: the first one
        # if opening the second fails, and the second one even if closing
        # the first raises.
        with open(support.TESTFN) as fp1, open(support.TESTFN) as fp2:
            posix.dup2(fp1.fileno(), fp2.fileno())
    351 
    @unittest.skipUnless(hasattr(os, 'O_CLOEXEC'), "needs os.O_CLOEXEC")
    @support.requires_linux_version(2, 6, 23)
    def test_oscloexec(self):
        # A descriptor opened with O_CLOEXEC must not be inheritable.
        open_flags = os.O_RDONLY | os.O_CLOEXEC
        fd = os.open(support.TESTFN, open_flags)
        self.addCleanup(os.close, fd)
        inheritable = os.get_inheritable(fd)
        self.assertFalse(inheritable)
    358 
    @unittest.skipUnless(hasattr(posix, 'O_EXLOCK'),
                         'test needs posix.O_EXLOCK')
    def test_osexlock(self):
        # While the file is held open with O_EXLOCK, a second non-blocking
        # O_EXLOCK open must fail with OSError.  The holding descriptor is
        # closed in a finally clause so it is not leaked when the assertion
        # fails.
        fd = os.open(support.TESTFN,
                     os.O_WRONLY|os.O_EXLOCK|os.O_CREAT)
        try:
            self.assertRaises(OSError, os.open, support.TESTFN,
                              os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)
        finally:
            os.close(fd)

        if hasattr(posix, "O_SHLOCK"):
            # Same check with the file held open with O_SHLOCK instead.
            fd = os.open(support.TESTFN,
                         os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
            try:
                self.assertRaises(OSError, os.open, support.TESTFN,
                                  os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)
            finally:
                os.close(fd)
    374 
    @unittest.skipUnless(hasattr(posix, 'O_SHLOCK'),
                         'test needs posix.O_SHLOCK')
    def test_osshlock(self):
        # Two O_SHLOCK opens of the same file must both succeed.  Each
        # descriptor is closed in a finally clause so that a failure further
        # down does not leak it.
        fd1 = os.open(support.TESTFN,
                      os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
        try:
            fd2 = os.open(support.TESTFN,
                          os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
            os.close(fd2)
        finally:
            os.close(fd1)

        if hasattr(posix, "O_EXLOCK"):
            # With the file held open with O_SHLOCK, a non-blocking O_EXLOCK
            # open must fail with OSError.
            fd = os.open(support.TESTFN,
                         os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
            try:
                self.assertRaises(OSError, os.open, support.TESTFN,
                                  os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK)
            finally:
                os.close(fd)
    391 
    @unittest.skipUnless(
        hasattr(posix, 'fstat'), 'test needs posix.fstat()')
    def test_fstat(self):
        with open(support.TESTFN) as fp:
            # fstat() and stat() both accept the descriptor
            self.assertTrue(posix.fstat(fp.fileno()))
            self.assertTrue(posix.stat(fp.fileno()))

            # a float is rejected, even one holding the descriptor's value
            as_float = float(fp.fileno())
            with self.assertRaisesRegex(
                    TypeError,
                    'should be string, bytes, os.PathLike or integer, not'):
                posix.stat(as_float)
    405 
    @unittest.skipUnless(
        hasattr(posix, 'stat'), 'test needs posix.stat()')
    def test_stat(self):
        type_msg = 'should be string, bytes, os.PathLike or integer, not'

        # str and bytes paths are accepted
        self.assertTrue(posix.stat(support.TESTFN))
        self.assertTrue(posix.stat(os.fsencode(support.TESTFN)))

        # a bytearray path is expected to emit a DeprecationWarning
        as_bytearray = bytearray(os.fsencode(support.TESTFN))
        with self.assertWarnsRegex(DeprecationWarning, type_msg):
            posix.stat(as_bytearray)

        # None and lists are expected to raise TypeError
        for make_bad_path in (
                lambda: None,
                lambda: list(support.TESTFN),
                lambda: list(os.fsencode(support.TESTFN))):
            bad_path = make_bad_path()
            with self.assertRaisesRegex(TypeError, type_msg):
                posix.stat(bad_path)
    424 
    @unittest.skipUnless(hasattr(posix, 'mkfifo'), "don't have mkfifo()")
    @unittest.skipIf(android_not_root, "mkfifo not allowed, non root user")
    def test_mkfifo(self):
        path = support.TESTFN
        # replace the regular file made by setUp() with a FIFO
        support.unlink(path)
        posix.mkfifo(path, stat.S_IRUSR | stat.S_IWUSR)
        mode = posix.stat(path).st_mode
        self.assertTrue(stat.S_ISFIFO(mode))
    431 
    @unittest.skipUnless(
        hasattr(posix, 'mknod') and hasattr(stat, 'S_IFIFO'),
        "don't have mknod()/S_IFIFO")
    @unittest.skipIf(android_not_root, "mknod not allowed, non root user")
    def test_mknod(self):
        # Use mknod() to create a FIFO, which is the only use POSIX
        # specifies.
        support.unlink(support.TESTFN)
        fifo_mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
        try:
            posix.mknod(support.TESTFN, fifo_mode, 0)
        except OSError as exc:
            # Some old systems don't allow unprivileged users to use
            # mknod(), or only support creating device nodes.
            self.assertIn(exc.errno, (errno.EPERM, errno.EINVAL))
        else:
            created_mode = posix.stat(support.TESTFN).st_mode
            self.assertTrue(stat.S_ISFIFO(created_mode))

        # The same call spelled with keyword arguments is supported too.
        support.unlink(support.TESTFN)
        try:
            posix.mknod(path=support.TESTFN, mode=fifo_mode, device=0,
                        dir_fd=None)
        except OSError as exc:
            self.assertIn(exc.errno, (errno.EPERM, errno.EINVAL))
    456 
    @unittest.skipUnless(hasattr(posix, 'stat'), 'test needs posix.stat()')
    @unittest.skipUnless(hasattr(posix, 'makedev'),
                         'test needs posix.makedev()')
    def test_makedev(self):
        dev = posix.stat(support.TESTFN).st_dev
        self.assertIsInstance(dev, int)
        self.assertGreaterEqual(dev, 0)

        # major() and minor() get the same checks, in that order: a stable
        # non-negative int for the device number, and rejection of a float,
        # of a missing argument and of -1.
        components = []
        for extract in (posix.major, posix.minor):
            component = extract(dev)
            self.assertIsInstance(component, int)
            self.assertGreaterEqual(component, 0)
            self.assertEqual(extract(dev), component)
            self.assertRaises(TypeError, extract, float(dev))
            self.assertRaises(TypeError, extract)
            self.assertRaises((ValueError, OverflowError), extract, -1)
            components.append(component)
        major, minor = components

        # makedev() must put the two components back together ...
        self.assertEqual(posix.makedev(major, minor), dev)
        # ... and reject float or missing arguments.
        bad_arglists = (
            (float(major), minor),
            (major, float(minor)),
            (major,),
            (),
        )
        for arglist in bad_arglists:
            self.assertRaises(TypeError, posix.makedev, *arglist)
    486 
    def _test_all_chown_common(self, chown_func, first_param, stat_func):
        """Shared checks for the chown, fchown and lchown tests.

        chown_func is called as chown_func(first_param, uid, gid).  When
        stat_func is not None, stat_func(first_param) is used after each
        call to confirm the owner and group.
        """
        def check_stat(uid, gid):
            if stat_func is None:
                return
            result = stat_func(first_param)
            self.assertEqual(result.st_uid, uid)
            self.assertEqual(result.st_gid, gid)

        uid = os.getuid()
        gid = os.getgid()
        # test successful chown calls: the current ids, then -1 for either
        # one; the owner and group are checked after each call
        for new_uid, new_gid in ((uid, gid), (-1, gid), (uid, -1)):
            chown_func(first_param, new_uid, new_gid)
            check_stat(uid, gid)

        if uid == 0:
            # Try an amusingly large uid/gid to make sure we handle
            # large unsigned values.  (chown lets you use any
            # uid/gid you like, even if they aren't defined.)
            #
            # This problem keeps coming up:
            #   http://bugs.python.org/issue1747858
            #   http://bugs.python.org/issue4591
            #   http://bugs.python.org/issue15301
            # Hopefully the fix in 4591 fixes it for good!
            #
            # This part of the test only runs when run as root.
            # Only scary people run their tests as root.

            big_value = 2**31
            root_steps = (
                # (ids passed to chown_func, ids expected afterwards)
                ((big_value, big_value), (big_value, big_value)),
                ((-1, -1), (big_value, big_value)),
                ((uid, gid), (uid, gid)),
            )
            for requested, expected in root_steps:
                chown_func(first_param, *requested)
                check_stat(*expected)
        elif platform.system() in ('HP-UX', 'SunOS'):
            # HP-UX and Solaris can allow a non-root user to chown() to root
            # (issue #5113)
            raise unittest.SkipTest("Skipping because of non-standard chown() "
                                    "behavior")
        else:
            # non-root cannot chown to root, raises OSError
            self.assertRaises(OSError, chown_func, first_param, 0, 0)
            check_stat(uid, gid)
            self.assertRaises(OSError, chown_func, first_param, 0, -1)
            check_stat(uid, gid)
            # changing only the group to 0 is checked just when 0 is not
            # among os.getgroups()
            if 0 not in os.getgroups():
                self.assertRaises(OSError, chown_func, first_param, -1, 0)
                check_stat(uid, gid)

        # test illegal types
        for convert in (str, float):
            self.assertRaises(TypeError, chown_func, first_param,
                              convert(uid), gid)
            check_stat(uid, gid)
            self.assertRaises(TypeError, chown_func, first_param,
                              uid, convert(gid))
            check_stat(uid, gid)
    545 
    @unittest.skipUnless(hasattr(posix, 'chown'), "test needs os.chown()")
    def test_chown(self):
        path = support.TESTFN

        # chown() on a file that does not exist must raise OSError
        os.unlink(path)
        self.assertRaises(OSError, posix.chown, path, -1, -1)

        # put the file back and run the shared chown checks on its path
        support.create_empty_file(path)
        stat_func = getattr(posix, 'stat', None)
        self._test_all_chown_common(posix.chown, path, stat_func)
    556 
    @unittest.skipUnless(hasattr(posix, 'fchown'),
                         "test needs os.fchown()")
    def test_fchown(self):
        os.unlink(support.TESTFN)

        # re-create the file and run the shared chown checks on its
        # descriptor
        with open(support.TESTFN, 'w') as test_file:
            fd = test_file.fileno()
            stat_func = getattr(posix, 'fstat', None)
            self._test_all_chown_common(posix.fchown, fd, stat_func)
    569 
    @unittest.skipUnless(hasattr(posix, 'lchown'),
                         "test needs os.lchown()")
    def test_lchown(self):
        link_path = support.TESTFN
        # swap the regular file for a symlink pointing at _DUMMY_SYMLINK
        os.unlink(link_path)
        os.symlink(_DUMMY_SYMLINK, link_path)
        stat_func = getattr(posix, 'lstat', None)
        self._test_all_chown_common(posix.lchown, link_path, stat_func)
    577 
    @unittest.skipUnless(
        hasattr(posix, 'chdir'), 'test needs posix.chdir()')
    def test_chdir(self):
        # changing to the current directory works ...
        posix.chdir(os.curdir)
        # ... but changing "into" TESTFN must raise OSError
        with self.assertRaises(OSError):
            posix.chdir(support.TESTFN)
    582 
    def test_listdir(self):
        # TESTFN must be listed in the current directory.  assertIn shows
        # the name and the listing on failure instead of "False is not
        # true".
        self.assertIn(support.TESTFN, posix.listdir(os.curdir))
    585 
    def test_listdir_default(self):
        # When listdir is called without argument,
        # it's the same as listdir(os.curdir).
        # assertIn shows the name and the listing on failure instead of
        # "False is not true".
        self.assertIn(support.TESTFN, posix.listdir())
    590 
    def test_listdir_bytes(self):
        # When listdir is called with a bytes object,
        # the returned strings are of type bytes.
        # assertIn shows the name and the listing on failure instead of
        # "False is not true".
        self.assertIn(os.fsencode(support.TESTFN), posix.listdir(b'.'))
    595 
    @unittest.skipUnless(
        posix.listdir in os.supports_fd,
        "test needs fd support for posix.listdir()")
    def test_listdir_fd(self):
        dir_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
        self.addCleanup(posix.close, dir_fd)
        # Listing by path and by descriptor must agree.  The comparison runs
        # twice: the second pass checks that the fd offset was reset
        # (issue #13739).
        for _ in range(2):
            by_path = sorted(posix.listdir('.'))
            by_fd = sorted(posix.listdir(dir_fd))
            self.assertEqual(by_path, by_fd)
    610 
    @unittest.skipUnless(
        hasattr(posix, 'access'), 'test needs posix.access()')
    def test_access(self):
        # TESTFN must be reported as accessible for R_OK.
        readable = posix.access(support.TESTFN, os.R_OK)
        self.assertTrue(readable)
    614 
    @unittest.skipUnless(
        hasattr(posix, 'umask'), 'test needs posix.umask()')
    def test_umask(self):
        # umask() hands back the previous mask, which must be an int.
        previous = posix.umask(0)
        self.assertIsInstance(previous, int)
        # put the previous mask back
        posix.umask(previous)
    620 
    @unittest.skipUnless(
        hasattr(posix, 'strerror'), 'test needs posix.strerror()')
    def test_strerror(self):
        # strerror(0) must give a truthy result.
        message = posix.strerror(0)
        self.assertTrue(message)
    625 
    @unittest.skipUnless(
        hasattr(posix, 'pipe'), 'test needs posix.pipe()')
    def test_pipe(self):
        # pipe() must return two descriptors, both of which can be closed.
        read_end, write_end = posix.pipe()
        for fd in (read_end, write_end):
            os.close(fd)
    631 
    @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()")
    @support.requires_linux_version(2, 6, 27)
    def test_pipe2(self):
        # a string, or two arguments, must be rejected with TypeError
        for bad_args in (('DEADBEEF',), (0, 0)):
            self.assertRaises(TypeError, os.pipe2, *bad_args)

        # try calling with flags = 0, like os.pipe()
        read_end, write_end = os.pipe2(0)
        for fd in (read_end, write_end):
            os.close(fd)

        # test flags
        read_end, write_end = os.pipe2(os.O_CLOEXEC|os.O_NONBLOCK)
        ends = (read_end, write_end)
        for fd in ends:
            self.addCleanup(os.close, fd)
        # neither end may be inheritable, and neither may be blocking
        for probe in (os.get_inheritable, os.get_blocking):
            for fd in ends:
                self.assertFalse(probe(fd))
        # try reading from an empty pipe: this should fail, not block
        self.assertRaises(OSError, os.read, read_end, 1)
        # try a write big enough to fill-up the pipe: this should either
        # fail or perform a partial write, not block
        oversized = b'x' * support.PIPE_MAX_SIZE
        try:
            os.write(write_end, oversized)
        except OSError:
            pass
    659 
    @support.cpython_only
    @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()")
    @support.requires_linux_version(2, 6, 27)
    def test_pipe2_c_limits(self):
        # Issue 15989: flag values one past INT_MAX and one past UINT_MAX
        # must raise OverflowError.
        import _testcapi
        for limit_name in ('INT_MAX', 'UINT_MAX'):
            too_big = getattr(_testcapi, limit_name) + 1
            self.assertRaises(OverflowError, os.pipe2, too_big)
    668 
    @unittest.skipUnless(hasattr(posix, 'utime'), 'test needs posix.utime()')
    def test_utime(self):
        """posix.utime() accepts None or a pair of numbers; None inside the pair is a TypeError."""
        path = support.TESTFN
        now = time.time()
        posix.utime(path, None)
        for bad_times in ((None, None), (now, None), (None, now)):
            with self.assertRaises(TypeError):
                posix.utime(path, bad_times)
        whole_seconds = int(now)
        posix.utime(path, (whole_seconds, whole_seconds))
        posix.utime(path, (now, now))
    678 
    def _test_chflags_regular_file(self, chflags_func, target_file, **kwargs):
        """Set UF_IMMUTABLE on target_file through chflags_func and verify it.

        chflags_func is invoked as chflags_func(target_file, flags, **kwargs).
        The test is skipped when that call fails with EOPNOTSUPP.  The
        original flags are always restored with posix.chflags() afterwards.
        """
        st = os.stat(target_file)
        self.assertTrue(hasattr(st, 'st_flags'))

        # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
        flags = st.st_flags | stat.UF_IMMUTABLE
        try:
            chflags_func(target_file, flags, **kwargs)
        except OSError as err:
            if err.errno != errno.EOPNOTSUPP:
                raise
            msg = 'chflag UF_IMMUTABLE not supported by underlying fs'
            self.skipTest(msg)

        try:
            new_st = os.stat(target_file)
            self.assertEqual(st.st_flags | stat.UF_IMMUTABLE, new_st.st_flags)
            try:
                fp = open(target_file, 'w+')
            except OSError as e:
                # If opening for writing fails, it must fail with EPERM.
                self.assertEqual(e.errno, errno.EPERM)
            else:
                # The open unexpectedly succeeded; close the file object
                # instead of leaking it (and triggering a ResourceWarning).
                fp.close()
        finally:
            # Clear the flag again so tearDown can remove the file.
            posix.chflags(target_file, st.st_flags)
    702 
    @unittest.skipUnless(hasattr(posix, 'chflags'), 'test needs os.chflags()')
    def test_chflags(self):
        """Run the shared regular-file flag check through posix.chflags()."""
        self._test_chflags_regular_file(chflags_func=posix.chflags,
                                        target_file=support.TESTFN)
    706 
    @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()')
    def test_lchflags_regular_file(self):
        """Run the regular-file flag check via lchflags() and chflags(follow_symlinks=False)."""
        check = self._test_chflags_regular_file
        check(posix.lchflags, support.TESTFN)
        check(posix.chflags, support.TESTFN, follow_symlinks=False)
    711 
    @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()')
    def test_lchflags_symlink(self):
        """Setting UF_IMMUTABLE on a symlink must leave the link target's flags alone."""
        target_before = os.stat(support.TESTFN)
        self.assertTrue(hasattr(target_before, 'st_flags'))

        os.symlink(support.TESTFN, _DUMMY_SYMLINK)
        self.teardown_files.append(_DUMMY_SYMLINK)
        link_before = os.lstat(_DUMMY_SYMLINK)
        immutable_flags = link_before.st_flags | stat.UF_IMMUTABLE

        # Both spellings of "change flags without following the link".
        setters = (
            posix.lchflags,
            lambda path, flags: posix.chflags(path, flags, follow_symlinks=False),
        )
        for set_flags in setters:
            # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
            try:
                set_flags(_DUMMY_SYMLINK, immutable_flags)
            except OSError as err:
                if err.errno == errno.EOPNOTSUPP:
                    self.skipTest('chflag UF_IMMUTABLE not supported by underlying fs')
                raise
            try:
                target_after = os.stat(support.TESTFN)
                link_after = os.lstat(_DUMMY_SYMLINK)

                self.assertEqual(target_before.st_flags, target_after.st_flags)
                self.assertEqual(immutable_flags, link_after.st_flags)
            finally:
                # Put the link's original flags back before the next round.
                set_flags(_DUMMY_SYMLINK, link_before.st_flags)
    744 
    def test_environ(self):
        """Every key and value in posix.environ is str when os.name is "nt", bytes otherwise."""
        expected_type = str if os.name == "nt" else bytes
        for key, value in posix.environ.items():
            self.assertEqual(type(key), expected_type)
            self.assertEqual(type(value), expected_type)
    753 
    @unittest.skipUnless(hasattr(posix, 'getcwd'), 'test needs posix.getcwd()')
    def test_getcwd_long_pathnames(self):
        """Call os.getcwd() from progressively deeper nested directories.

        Directories named ``dirname`` are nested below a scratch directory
        until the accumulated nested path length reaches 1027, calling
        os.getcwd() at every level.  The test is skipped when the scratch
        directory or one of the nested directories cannot be created.
        """
        dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef'
        curdir = os.getcwd()
        base_path = os.path.abspath(support.TESTFN) + '.getcwd'

        # Defined before the try block so that it is actually reachable;
        # previously it sat after a ``return`` and never ran.
        def _create_and_do_getcwd(dirname, current_path_length=0):
            try:
                os.mkdir(dirname)
            except OSError:
                raise unittest.SkipTest("mkdir cannot create directory sufficiently deep for getcwd test")

            os.chdir(dirname)
            try:
                os.getcwd()
                if current_path_length < 1027:
                    _create_and_do_getcwd(dirname, current_path_length + len(dirname) + 1)
            finally:
                # Unwind one level and remove the directory just created.
                os.chdir('..')
                os.rmdir(dirname)

        try:
            try:
                os.mkdir(base_path)
                os.chdir(base_path)
            except OSError:
                self.skipTest("cannot create directory for testing")

            _create_and_do_getcwd(dirname)
        finally:
            # Always return to the starting directory and drop the scratch tree.
            os.chdir(curdir)
            support.rmtree(base_path)
    789 
    @unittest.skipUnless(hasattr(posix, 'getgrouplist'), "test needs posix.getgrouplist()")
    @unittest.skipUnless(hasattr(pwd, 'getpwuid'), "test needs pwd.getpwuid()")
    @unittest.skipUnless(hasattr(os, 'getuid'), "test needs os.getuid()")
    def test_getgrouplist(self):
        """The group passed to posix.getgrouplist() must appear in its result."""
        pw_entry = pwd.getpwuid(os.getuid())
        # Fields 0 and 3 of the password entry are used as user and group.
        user, group = pw_entry[0], pw_entry[3]
        self.assertIn(group, posix.getgrouplist(user, group))
    797 
    798 
    @unittest.skipUnless(hasattr(os, 'getegid'), "test needs os.getegid()")
    def test_getgroups(self):
        """posix.getgroups() must match the output of 'id -G', modulo the effective gid."""
        with os.popen('id -G 2>/dev/null') as idg:
            groups = idg.read().strip()
            ret = idg.close()

        try:
            idg_groups = {int(g) for g in groups.split()}
        except ValueError:
            idg_groups = set()
        # Usable only if the command exited cleanly and printed some gids.
        if not (ret is None and idg_groups):
            self.skipTest("need working 'id -G'")

        # Issues 16698: OS X ABIs prior to 10.6 have limits on getgroups()
        if sys.platform == 'darwin':
            import sysconfig
            dt = sysconfig.get_config_var('MACOSX_DEPLOYMENT_TARGET') or '10.0'
            deployment_target = tuple(int(n) for n in dt.split('.')[0:2])
            if deployment_target < (10, 6):
                self.skipTest("getgroups(2) is broken prior to 10.6")

        # 'id -G' and 'os.getgroups()' should return the same
        # groups, ignoring order, duplicates, and the effective gid.
        # #10822/#26944 - It is implementation defined whether
        # posix.getgroups() includes the effective gid.
        difference = idg_groups.symmetric_difference(posix.getgroups())
        only_egid_differs = (difference == {posix.getegid()})
        self.assertTrue(not difference or only_egid_differs)
    825 
    826     # tests for the posix *at functions follow
    827 
    @unittest.skipUnless(os.access in os.supports_dir_fd, "test needs dir_fd support for os.access()")
    def test_access_dir_fd(self):
        """posix.access() reports TESTFN readable when given a dir_fd for the cwd."""
        dir_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
        try:
            readable = posix.access(support.TESTFN, os.R_OK, dir_fd=dir_fd)
            self.assertTrue(readable)
        finally:
            posix.close(dir_fd)
    835 
    @unittest.skipUnless(os.chmod in os.supports_dir_fd, "test needs dir_fd support in os.chmod()")
    def test_chmod_dir_fd(self):
        """posix.chmod(dir_fd=...) changes the owner permission bits of TESTFN."""
        # Start from owner-read-only so the change below is observable.
        os.chmod(support.TESTFN, stat.S_IRUSR)
        owner_rw = stat.S_IRUSR | stat.S_IWUSR

        dir_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
        try:
            posix.chmod(support.TESTFN, owner_rw, dir_fd=dir_fd)

            mode = posix.stat(support.TESTFN)[0]
            self.assertEqual(mode & stat.S_IRWXU, owner_rw)
        finally:
            posix.close(dir_fd)
    848 
    @unittest.skipUnless(os.chown in os.supports_dir_fd, "test needs dir_fd support in os.chown()")
    def test_chown_dir_fd(self):
        """posix.chown(dir_fd=...) to the current uid/gid must not raise."""
        path = support.TESTFN
        support.unlink(path)
        support.create_empty_file(path)

        dir_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
        try:
            uid, gid = os.getuid(), os.getgid()
            posix.chown(path, uid, gid, dir_fd=dir_fd)
        finally:
            posix.close(dir_fd)
    859 
    @unittest.skipUnless(os.stat in os.supports_dir_fd, "test needs dir_fd support in os.stat()")
    def test_stat_dir_fd(self):
        """posix.stat() gives the same result with and without dir_fd, and validates dir_fd."""
        path = support.TESTFN
        support.unlink(path)
        with open(path, 'w') as outfile:
            outfile.write("testline\n")

        dir_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
        try:
            expected = posix.stat(path)
            # An explicit directory fd and dir_fd=None must both match.
            for fd_arg in (dir_fd, None):
                self.assertEqual(expected, posix.stat(path, dir_fd=fd_arg))

            # A str or a float is not an acceptable dir_fd.
            for bad_fd in (posix.getcwd(), float(dir_fd)):
                with self.assertRaisesRegex(TypeError, 'should be integer or None, not'):
                    posix.stat(path, dir_fd=bad_fd)

            with self.assertRaises(OverflowError):
                posix.stat(path, dir_fd=10**20)
        finally:
            posix.close(dir_fd)
    881 
    @unittest.skipUnless(os.utime in os.supports_dir_fd, "test needs dir_fd support in os.utime()")
    def test_utime_dir_fd(self):
        """Exercise posix.utime() with dir_fd: valid and invalid ``times`` values."""
        path = support.TESTFN
        dir_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
        try:
            now = time.time()
            posix.utime(path, None, dir_fd=dir_fd)
            posix.utime(path, dir_fd=dir_fd)

            # A bare number, or a pair containing None or a str, is rejected.
            invalid_times = (now, (None, None), (now, None), (None, now), (now, "x"))
            for times in invalid_times:
                with self.assertRaises(TypeError):
                    posix.utime(path, times, dir_fd=dir_fd)

            whole = int(now)
            scaled_fraction = int((now - whole) * 1e9)
            posix.utime(path, (whole, whole), dir_fd=dir_fd)
            posix.utime(path, (now, now), dir_fd=dir_fd)
            posix.utime(path, (whole, scaled_fraction), dir_fd=dir_fd)
            posix.utime(path, dir_fd=dir_fd, times=(whole, scaled_fraction))

            # try dir_fd and follow_symlinks together
            if os.utime in os.supports_follow_symlinks:
                try:
                    posix.utime(path, follow_symlinks=False, dir_fd=dir_fd)
                except ValueError:
                    # whoops!  using both together not supported on this platform.
                    pass

        finally:
            posix.close(dir_fd)
    911 
    @unittest.skipUnless(os.link in os.supports_dir_fd, "test needs dir_fd support in os.link()")
    @unittest.skipIf(android_not_root, "hard link not allowed, non root user")
    def test_link_dir_fd(self):
        """A hard link made with src_dir_fd/dst_dir_fd has the same stat()[1] as its source."""
        source = support.TESTFN
        link_name = support.TESTFN + 'link'
        dir_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
        try:
            posix.link(source, link_name, src_dir_fd=dir_fd, dst_dir_fd=dir_fd)
            # should have same inodes
            source_ino = posix.stat(source)[1]
            link_ino = posix.stat(link_name)[1]
            self.assertEqual(source_ino, link_ino)
        finally:
            posix.close(dir_fd)
            support.unlink(link_name)
    924 
    @unittest.skipUnless(os.mkdir in os.supports_dir_fd, "test needs dir_fd support in os.mkdir()")
    def test_mkdir_dir_fd(self):
        """A directory created with posix.mkdir(dir_fd=...) can be stat()ed afterwards."""
        new_dir = support.TESTFN + 'dir'
        dir_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
        try:
            posix.mkdir(new_dir, dir_fd=dir_fd)
            posix.stat(new_dir)  # should not raise exception
        finally:
            posix.close(dir_fd)
            support.rmtree(new_dir)
    934 
    @unittest.skipUnless((os.mknod in os.supports_dir_fd) and hasattr(stat, 'S_IFIFO'),
                         "test requires both stat.S_IFIFO and dir_fd support for os.mknod()")
    @unittest.skipIf(android_not_root, "mknod not allowed, non root user")
    def test_mknod_dir_fd(self):
        """Create a FIFO at TESTFN with posix.mknod(dir_fd=...), tolerating EPERM/EINVAL."""
        # Test using mknodat() to create a FIFO (the only use specified
        # by POSIX).
        support.unlink(support.TESTFN)
        fifo_mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
        dir_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
        try:
            posix.mknod(support.TESTFN, fifo_mode, 0, dir_fd=dir_fd)
        except OSError as exc:
            # Some old systems don't allow unprivileged users to use
            # mknod(), or only support creating device nodes.
            self.assertIn(exc.errno, (errno.EPERM, errno.EINVAL))
        else:
            created = posix.stat(support.TESTFN)
            self.assertTrue(stat.S_ISFIFO(created.st_mode))
        finally:
            posix.close(dir_fd)
    954 
    @unittest.skipUnless(os.open in os.supports_dir_fd, "test needs dir_fd support in os.open()")
    def test_open_dir_fd(self):
        """A file opened with posix.open(dir_fd=...) yields the content written to it."""
        support.unlink(support.TESTFN)
        with open(support.TESTFN, 'w') as outfile:
            outfile.write("testline\n")
        a = posix.open(posix.getcwd(), posix.O_RDONLY)
        try:
            # Open inside the try block so that the directory descriptor is
            # still closed if this second open() raises.
            b = posix.open(support.TESTFN, posix.O_RDONLY, dir_fd=a)
            try:
                res = posix.read(b, 9).decode(encoding="utf-8")
                self.assertEqual("testline\n", res)
            finally:
                posix.close(b)
        finally:
            posix.close(a)
    968 
    @unittest.skipUnless(os.readlink in os.supports_dir_fd, "test needs dir_fd support in os.readlink()")
    def test_readlink_dir_fd(self):
        """posix.readlink() returns the same target with and without dir_fd."""
        link_name = support.TESTFN + 'link'
        os.symlink(support.TESTFN, link_name)
        dir_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
        try:
            plain = posix.readlink(link_name)
            via_dir_fd = posix.readlink(link_name, dir_fd=dir_fd)
            self.assertEqual(plain, via_dir_fd)
        finally:
            support.unlink(link_name)
            posix.close(dir_fd)
    979 
    @unittest.skipUnless(os.rename in os.supports_dir_fd, "test needs dir_fd support in os.rename()")
    def test_rename_dir_fd(self):
        """posix.rename() with src_dir_fd/dst_dir_fd moves TESTFN + 'ren' onto TESTFN."""
        source = support.TESTFN + 'ren'
        support.unlink(support.TESTFN)
        support.create_empty_file(source)
        dir_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
        try:
            try:
                posix.rename(source, support.TESTFN,
                             src_dir_fd=dir_fd, dst_dir_fd=dir_fd)
            except:
                # Fall back to a plain rename, then report the original failure.
                posix.rename(source, support.TESTFN)
                raise
            posix.stat(support.TESTFN)  # should not raise exception
        finally:
            posix.close(dir_fd)
    994 
    @unittest.skipUnless(os.symlink in os.supports_dir_fd, "test needs dir_fd support in os.symlink()")
    def test_symlink_dir_fd(self):
        """A symlink created with posix.symlink(dir_fd=...) reads back as TESTFN."""
        link_name = support.TESTFN + 'link'
        dir_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
        try:
            posix.symlink(support.TESTFN, link_name, dir_fd=dir_fd)
            target = posix.readlink(link_name)
            self.assertEqual(target, support.TESTFN)
        finally:
            posix.close(dir_fd)
            support.unlink(link_name)
   1004 
   1005     @unittest.skipUnless(os.unlink in os.supports_dir_fd, "test needs dir_fd support in os.unlink()")
   1006     def test_unlink_dir_fd(self):
   1007         f = posix.open(posix.getcwd(), posix.O_RDONLY)
   1008         support.create_empty_file(support.TESTFN + 'del')
   1009         posix.stat(support.TESTFN + 'del') # should not raise exception
   1010         try:
   1011             posix.unlink(support.TESTFN + 'del', dir_fd=f)
   1012         except:
   1013             support.unlink(support.TESTFN + 'del')
   1014             raise
   1015         else:
   1016             self.assertRaises(OSError, posix.stat, support.TESTFN + 'link')
   1017         finally:
   1018             posix.close(f)
   1019 
   1020     @unittest.skipUnless(os.mkfifo in os.supports_dir_fd, "test needs dir_fd support in os.mkfifo()")
   1021     @unittest.skipIf(android_not_root, "mkfifo not allowed, non root user")
   1022     def test_mkfifo_dir_fd(self):
   1023         support.unlink(support.TESTFN)
   1024         f = posix.open(posix.getcwd(), posix.O_RDONLY)
   1025         try:
   1026             posix.mkfifo(support.TESTFN, stat.S_IRUSR | stat.S_IWUSR, dir_fd=f)
   1027             self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))
   1028         finally:
   1029             posix.close(f)
   1030 
   1031     requires_sched_h = unittest.skipUnless(hasattr(posix, 'sched_yield'),
   1032                                            "don't have scheduling support")
   1033     requires_sched_affinity = unittest.skipUnless(hasattr(posix, 'sched_setaffinity'),
   1034                                                   "don't have sched affinity support")
   1035 
    @requires_sched_h
    def test_sched_yield(self):
        """Smoke test: posix.sched_yield() can be called without raising."""
        # This has no error conditions (at least on Linux).
        posix.sched_yield()
   1040 
   1041     @requires_sched_h
   1042     @unittest.skipUnless(hasattr(posix, 'sched_get_priority_max'),
   1043                          "requires sched_get_priority_max()")
   1044     def test_sched_priority(self):
   1045         # Round-robin usually has interesting priorities.
   1046         pol = posix.SCHED_RR
   1047         lo = posix.sched_get_priority_min(pol)
   1048         hi = posix.sched_get_priority_max(pol)
   1049         self.assertIsInstance(lo, int)
   1050         self.assertIsInstance(hi, int)
   1051         self.assertGreaterEqual(hi, lo)
   1052         # OSX evidently just returns 15 without checking the argument.
   1053         if sys.platform != "darwin":
   1054             self.assertRaises(OSError, posix.sched_get_priority_min, -23)
   1055             self.assertRaises(OSError, posix.sched_get_priority_max, -23)
   1056 
   1057     @unittest.skipUnless(hasattr(posix, 'sched_setscheduler'), "can't change scheduler")
   1058     def test_get_and_set_scheduler_and_param(self):
   1059         possible_schedulers = [sched for name, sched in posix.__dict__.items()
   1060                                if name.startswith("SCHED_")]
   1061         mine = posix.sched_getscheduler(0)
   1062         self.assertIn(mine, possible_schedulers)
   1063         try:
   1064             parent = posix.sched_getscheduler(os.getppid())
   1065         except OSError as e:
   1066             if e.errno != errno.EPERM:
   1067                 raise
   1068         else:
   1069             self.assertIn(parent, possible_schedulers)
   1070         self.assertRaises(OSError, posix.sched_getscheduler, -1)
   1071         self.assertRaises(OSError, posix.sched_getparam, -1)
   1072         param = posix.sched_getparam(0)
   1073         self.assertIsInstance(param.sched_priority, int)
   1074 
   1075         # POSIX states that calling sched_setparam() or sched_setscheduler() on
   1076         # a process with a scheduling policy other than SCHED_FIFO or SCHED_RR
   1077         # is implementation-defined: NetBSD and FreeBSD can return EINVAL.
   1078         if not sys.platform.startswith(('freebsd', 'netbsd')):
   1079             try:
   1080                 posix.sched_setscheduler(0, mine, param)
   1081                 posix.sched_setparam(0, param)
   1082             except OSError as e:
   1083                 if e.errno != errno.EPERM:
   1084                     raise
   1085             self.assertRaises(OSError, posix.sched_setparam, -1, param)
   1086 
   1087         self.assertRaises(OSError, posix.sched_setscheduler, -1, mine, param)
   1088         self.assertRaises(TypeError, posix.sched_setscheduler, 0, mine, None)
   1089         self.assertRaises(TypeError, posix.sched_setparam, 0, 43)
   1090         param = posix.sched_param(None)
   1091         self.assertRaises(TypeError, posix.sched_setparam, 0, param)
   1092         large = 214748364700
   1093         param = posix.sched_param(large)
   1094         self.assertRaises(OverflowError, posix.sched_setparam, 0, param)
   1095         param = posix.sched_param(sched_priority=-large)
   1096         self.assertRaises(OverflowError, posix.sched_setparam, 0, param)
   1097 
   1098     @unittest.skipUnless(hasattr(posix, "sched_rr_get_interval"), "no function")
   1099     def test_sched_rr_get_interval(self):
   1100         try:
   1101             interval = posix.sched_rr_get_interval(0)
   1102         except OSError as e:
   1103             # This likely means that sched_rr_get_interval is only valid for
   1104             # processes with the SCHED_RR scheduler in effect.
   1105             if e.errno != errno.EINVAL:
   1106                 raise
   1107             self.skipTest("only works on SCHED_RR processes")
   1108         self.assertIsInstance(interval, float)
   1109         # Reasonable constraints, I think.
   1110         self.assertGreaterEqual(interval, 0.)
   1111         self.assertLess(interval, 1.)
   1112 
   1113     @requires_sched_affinity
   1114     def test_sched_getaffinity(self):
   1115         mask = posix.sched_getaffinity(0)
   1116         self.assertIsInstance(mask, set)
   1117         self.assertGreaterEqual(len(mask), 1)
   1118         self.assertRaises(OSError, posix.sched_getaffinity, -1)
   1119         for cpu in mask:
   1120             self.assertIsInstance(cpu, int)
   1121             self.assertGreaterEqual(cpu, 0)
   1122             self.assertLess(cpu, 1 << 32)
   1123 
   1124     @requires_sched_affinity
   1125     def test_sched_setaffinity(self):
   1126         mask = posix.sched_getaffinity(0)
   1127         if len(mask) > 1:
   1128             # Empty masks are forbidden
   1129             mask.pop()
   1130         posix.sched_setaffinity(0, mask)
   1131         self.assertEqual(posix.sched_getaffinity(0), mask)
   1132         self.assertRaises(OSError, posix.sched_setaffinity, 0, [])
   1133         self.assertRaises(ValueError, posix.sched_setaffinity, 0, [-10])
   1134         self.assertRaises(OverflowError, posix.sched_setaffinity, 0, [1<<128])
   1135         self.assertRaises(OSError, posix.sched_setaffinity, -1, mask)
   1136 
   1137     def test_rtld_constants(self):
   1138         # check presence of major RTLD_* constants
   1139         posix.RTLD_LAZY
   1140         posix.RTLD_NOW
   1141         posix.RTLD_GLOBAL
   1142         posix.RTLD_LOCAL
   1143 
   1144     @unittest.skipUnless(hasattr(os, 'SEEK_HOLE'),
   1145                          "test needs an OS that reports file holes")
   1146     def test_fs_holes(self):
   1147         # Even if the filesystem doesn't report holes,
   1148         # if the OS supports it the SEEK_* constants
   1149         # will be defined and will have a consistent
   1150         # behaviour:
   1151         # os.SEEK_DATA = current position
   1152         # os.SEEK_HOLE = end of file position
   1153         with open(support.TESTFN, 'r+b') as fp:
   1154             fp.write(b"hello")
   1155             fp.flush()
   1156             size = fp.tell()
   1157             fno = fp.fileno()
   1158             try :
   1159                 for i in range(size):
   1160                     self.assertEqual(i, os.lseek(fno, i, os.SEEK_DATA))
   1161                     self.assertLessEqual(size, os.lseek(fno, i, os.SEEK_HOLE))
   1162                 self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_DATA)
   1163                 self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_HOLE)
   1164             except OSError :
   1165                 # Some OSs claim to support SEEK_HOLE/SEEK_DATA
   1166                 # but it is not true.
   1167                 # For instance:
   1168                 # http://lists.freebsd.org/pipermail/freebsd-amd64/2012-January/014332.html
   1169                 raise unittest.SkipTest("OSError raised!")
   1170 
   1171     def test_path_error2(self):
   1172         """
   1173         Test functions that call path_error2(), providing two filenames in their exceptions.
   1174         """
   1175         for name in ("rename", "replace", "link"):
   1176             function = getattr(os, name, None)
   1177             if function is None:
   1178                 continue
   1179 
   1180             for dst in ("noodly2", support.TESTFN):
   1181                 try:
   1182                     function('doesnotexistfilename', dst)
   1183                 except OSError as e:
   1184                     self.assertIn("'doesnotexistfilename' -> '{}'".format(dst), str(e))
   1185                     break
   1186             else:
   1187                 self.fail("No valid path_error2() test for os." + name)
   1188 
   1189     def test_path_with_null_character(self):
   1190         fn = support.TESTFN
   1191         fn_with_NUL = fn + '\0'
   1192         self.addCleanup(support.unlink, fn)
   1193         support.unlink(fn)
   1194         fd = None
   1195         try:
   1196             with self.assertRaises(ValueError):
   1197                 fd = os.open(fn_with_NUL, os.O_WRONLY | os.O_CREAT) # raises
   1198         finally:
   1199             if fd is not None:
   1200                 os.close(fd)
   1201         self.assertFalse(os.path.exists(fn))
   1202         self.assertRaises(ValueError, os.mkdir, fn_with_NUL)
   1203         self.assertFalse(os.path.exists(fn))
   1204         open(fn, 'wb').close()
   1205         self.assertRaises(ValueError, os.stat, fn_with_NUL)
   1206 
   1207     def test_path_with_null_byte(self):
   1208         fn = os.fsencode(support.TESTFN)
   1209         fn_with_NUL = fn + b'\0'
   1210         self.addCleanup(support.unlink, fn)
   1211         support.unlink(fn)
   1212         fd = None
   1213         try:
   1214             with self.assertRaises(ValueError):
   1215                 fd = os.open(fn_with_NUL, os.O_WRONLY | os.O_CREAT) # raises
   1216         finally:
   1217             if fd is not None:
   1218                 os.close(fd)
   1219         self.assertFalse(os.path.exists(fn))
   1220         self.assertRaises(ValueError, os.mkdir, fn_with_NUL)
   1221         self.assertFalse(os.path.exists(fn))
   1222         open(fn, 'wb').close()
   1223         self.assertRaises(ValueError, os.stat, fn_with_NUL)
   1224 
   1225 class PosixGroupsTester(unittest.TestCase):
   1226 
   1227     def setUp(self):
   1228         if posix.getuid() != 0:
   1229             raise unittest.SkipTest("not enough privileges")
   1230         if not hasattr(posix, 'getgroups'):
   1231             raise unittest.SkipTest("need posix.getgroups")
   1232         if sys.platform == 'darwin':
   1233             raise unittest.SkipTest("getgroups(2) is broken on OSX")
   1234         self.saved_groups = posix.getgroups()
   1235 
   1236     def tearDown(self):
   1237         if hasattr(posix, 'setgroups'):
   1238             posix.setgroups(self.saved_groups)
   1239         elif hasattr(posix, 'initgroups'):
   1240             name = pwd.getpwuid(posix.getuid()).pw_name
   1241             posix.initgroups(name, self.saved_groups[0])
   1242 
   1243     @unittest.skipUnless(hasattr(posix, 'initgroups'),
   1244                          "test needs posix.initgroups()")
   1245     def test_initgroups(self):
   1246         # find missing group
   1247 
   1248         g = max(self.saved_groups or [0]) + 1
   1249         name = pwd.getpwuid(posix.getuid()).pw_name
   1250         posix.initgroups(name, g)
   1251         self.assertIn(g, posix.getgroups())
   1252 
   1253     @unittest.skipUnless(hasattr(posix, 'setgroups'),
   1254                          "test needs posix.setgroups()")
   1255     def test_setgroups(self):
   1256         for groups in [[0], list(range(16))]:
   1257             posix.setgroups(groups)
   1258             self.assertListEqual(groups, posix.getgroups())
   1259 
   1260 def test_main():
   1261     try:
   1262         support.run_unittest(PosixTester, PosixGroupsTester)
   1263     finally:
   1264         support.reap_children()
   1265 
# Run the whole suite when this file is executed directly as a script.
if __name__ == '__main__':
    test_main()
   1268