Home | History | Annotate | Download | only in test
      1 "Test posix functions"
      2 
      3 from test import test_support
      4 
      5 # Skip these tests if there is no posix module.
      6 posix = test_support.import_module('posix')
      7 
      8 import errno
      9 import sys
     10 import time
     11 import os
     12 import platform
     13 import pwd
     14 import shutil
     15 import stat
     16 import sys
     17 import tempfile
     18 import unittest
     19 import warnings
     20 
     21 _DUMMY_SYMLINK = os.path.join(tempfile.gettempdir(),
     22                               test_support.TESTFN + '-dummy-symlink')
     23 
     24 warnings.filterwarnings('ignore', '.* potential security risk .*',
     25                         RuntimeWarning)
     26 
     27 class PosixTester(unittest.TestCase):
     28 
     29     def setUp(self):
     30         # create empty file
     31         fp = open(test_support.TESTFN, 'w+')
     32         fp.close()
     33         self.teardown_files = [ test_support.TESTFN ]
     34 
     35     def tearDown(self):
     36         for teardown_file in self.teardown_files:
     37             os.unlink(teardown_file)
     38 
     39     def testNoArgFunctions(self):
     40         # test posix functions which take no arguments and have
     41         # no side-effects which we need to cleanup (e.g., fork, wait, abort)
     42         NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdu", "uname",
     43                              "times", "getloadavg", "tmpnam",
     44                              "getegid", "geteuid", "getgid", "getgroups",
     45                              "getpid", "getpgrp", "getppid", "getuid",
     46                            ]
     47 
     48         with warnings.catch_warnings():
     49             warnings.filterwarnings("ignore", "", DeprecationWarning)
     50             for name in NO_ARG_FUNCTIONS:
     51                 posix_func = getattr(posix, name, None)
     52                 if posix_func is not None:
     53                     posix_func()
     54                     self.assertRaises(TypeError, posix_func, 1)
     55 
     56     if hasattr(posix, 'getresuid'):
     57         def test_getresuid(self):
     58             user_ids = posix.getresuid()
     59             self.assertEqual(len(user_ids), 3)
     60             for val in user_ids:
     61                 self.assertGreaterEqual(val, 0)
     62 
     63     if hasattr(posix, 'getresgid'):
     64         def test_getresgid(self):
     65             group_ids = posix.getresgid()
     66             self.assertEqual(len(group_ids), 3)
     67             for val in group_ids:
     68                 self.assertGreaterEqual(val, 0)
     69 
     70     if hasattr(posix, 'setresuid'):
     71         def test_setresuid(self):
     72             current_user_ids = posix.getresuid()
     73             self.assertIsNone(posix.setresuid(*current_user_ids))
     74             # -1 means don't change that value.
     75             self.assertIsNone(posix.setresuid(-1, -1, -1))
     76 
     77         def test_setresuid_exception(self):
     78             # Don't do this test if someone is silly enough to run us as root.
     79             current_user_ids = posix.getresuid()
     80             if 0 not in current_user_ids:
     81                 new_user_ids = (current_user_ids[0]+1, -1, -1)
     82                 self.assertRaises(OSError, posix.setresuid, *new_user_ids)
     83 
     84     if hasattr(posix, 'setresgid'):
     85         def test_setresgid(self):
     86             current_group_ids = posix.getresgid()
     87             self.assertIsNone(posix.setresgid(*current_group_ids))
     88             # -1 means don't change that value.
     89             self.assertIsNone(posix.setresgid(-1, -1, -1))
     90 
     91         def test_setresgid_exception(self):
     92             # Don't do this test if someone is silly enough to run us as root.
     93             current_group_ids = posix.getresgid()
     94             if 0 not in current_group_ids:
     95                 new_group_ids = (current_group_ids[0]+1, -1, -1)
     96                 self.assertRaises(OSError, posix.setresgid, *new_group_ids)
     97 
     98     @unittest.skipUnless(hasattr(posix, 'initgroups'),
     99                          "test needs os.initgroups()")
    100     def test_initgroups(self):
    101         # It takes a string and an integer; check that it raises a TypeError
    102         # for other argument lists.
    103         self.assertRaises(TypeError, posix.initgroups)
    104         self.assertRaises(TypeError, posix.initgroups, None)
    105         self.assertRaises(TypeError, posix.initgroups, 3, "foo")
    106         self.assertRaises(TypeError, posix.initgroups, "foo", 3, object())
    107 
    108         # If a non-privileged user invokes it, it should fail with OSError
    109         # EPERM.
    110         if os.getuid() != 0:
    111             try:
    112                 name = pwd.getpwuid(posix.getuid()).pw_name
    113             except KeyError:
    114                 # the current UID may not have a pwd entry
    115                 raise unittest.SkipTest("need a pwd entry")
    116             try:
    117                 posix.initgroups(name, 13)
    118             except OSError as e:
    119                 self.assertEqual(e.errno, errno.EPERM)
    120             else:
    121                 self.fail("Expected OSError to be raised by initgroups")
    122 
    123     def test_statvfs(self):
    124         if hasattr(posix, 'statvfs'):
    125             self.assertTrue(posix.statvfs(os.curdir))
    126 
    127     def test_fstatvfs(self):
    128         if hasattr(posix, 'fstatvfs'):
    129             fp = open(test_support.TESTFN)
    130             try:
    131                 self.assertTrue(posix.fstatvfs(fp.fileno()))
    132             finally:
    133                 fp.close()
    134 
    135     def test_ftruncate(self):
    136         if hasattr(posix, 'ftruncate'):
    137             fp = open(test_support.TESTFN, 'w+')
    138             try:
    139                 # we need to have some data to truncate
    140                 fp.write('test')
    141                 fp.flush()
    142                 posix.ftruncate(fp.fileno(), 0)
    143             finally:
    144                 fp.close()
    145 
    146     def test_dup(self):
    147         if hasattr(posix, 'dup'):
    148             fp = open(test_support.TESTFN)
    149             try:
    150                 fd = posix.dup(fp.fileno())
    151                 self.assertIsInstance(fd, int)
    152                 os.close(fd)
    153             finally:
    154                 fp.close()
    155 
    156     def test_confstr(self):
    157         if hasattr(posix, 'confstr'):
    158             self.assertRaises(ValueError, posix.confstr, "CS_garbage")
    159             self.assertEqual(len(posix.confstr("CS_PATH")) > 0, True)
    160 
    161     def test_dup2(self):
    162         if hasattr(posix, 'dup2'):
    163             fp1 = open(test_support.TESTFN)
    164             fp2 = open(test_support.TESTFN)
    165             try:
    166                 posix.dup2(fp1.fileno(), fp2.fileno())
    167             finally:
    168                 fp1.close()
    169                 fp2.close()
    170 
    171     def fdopen_helper(self, *args):
    172         fd = os.open(test_support.TESTFN, os.O_RDONLY)
    173         fp2 = posix.fdopen(fd, *args)
    174         fp2.close()
    175 
    176     def test_fdopen(self):
    177         if hasattr(posix, 'fdopen'):
    178             self.fdopen_helper()
    179             self.fdopen_helper('r')
    180             self.fdopen_helper('r', 100)
    181 
    182     def test_osexlock(self):
    183         if hasattr(posix, "O_EXLOCK"):
    184             fd = os.open(test_support.TESTFN,
    185                          os.O_WRONLY|os.O_EXLOCK|os.O_CREAT)
    186             self.assertRaises(OSError, os.open, test_support.TESTFN,
    187                               os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)
    188             os.close(fd)
    189 
    190             if hasattr(posix, "O_SHLOCK"):
    191                 fd = os.open(test_support.TESTFN,
    192                              os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
    193                 self.assertRaises(OSError, os.open, test_support.TESTFN,
    194                                   os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)
    195                 os.close(fd)
    196 
    197     def test_osshlock(self):
    198         if hasattr(posix, "O_SHLOCK"):
    199             fd1 = os.open(test_support.TESTFN,
    200                          os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
    201             fd2 = os.open(test_support.TESTFN,
    202                           os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
    203             os.close(fd2)
    204             os.close(fd1)
    205 
    206             if hasattr(posix, "O_EXLOCK"):
    207                 fd = os.open(test_support.TESTFN,
    208                              os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
    209                 self.assertRaises(OSError, os.open, test_support.TESTFN,
    210                                   os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK)
    211                 os.close(fd)
    212 
    213     def test_fstat(self):
    214         if hasattr(posix, 'fstat'):
    215             fp = open(test_support.TESTFN)
    216             try:
    217                 self.assertTrue(posix.fstat(fp.fileno()))
    218             finally:
    219                 fp.close()
    220 
    221     def test_stat(self):
    222         if hasattr(posix, 'stat'):
    223             self.assertTrue(posix.stat(test_support.TESTFN))
    224 
    225     def _test_all_chown_common(self, chown_func, first_param, stat_func):
    226         """Common code for chown, fchown and lchown tests."""
    227         def check_stat(uid, gid):
    228             if stat_func is not None:
    229                 stat = stat_func(first_param)
    230                 self.assertEqual(stat.st_uid, uid)
    231                 self.assertEqual(stat.st_gid, gid)
    232         uid = os.getuid()
    233         gid = os.getgid()
    234         # test a successful chown call
    235         chown_func(first_param, uid, gid)
    236         check_stat(uid, gid)
    237         chown_func(first_param, -1, gid)
    238         check_stat(uid, gid)
    239         chown_func(first_param, uid, -1)
    240         check_stat(uid, gid)
    241 
    242         if uid == 0:
    243             # Try an amusingly large uid/gid to make sure we handle
    244             # large unsigned values.  (chown lets you use any
    245             # uid/gid you like, even if they aren't defined.)
    246             #
    247             # This problem keeps coming up:
    248             #   http://bugs.python.org/issue1747858
    249             #   http://bugs.python.org/issue4591
    250             #   http://bugs.python.org/issue15301
    251             # Hopefully the fix in 4591 fixes it for good!
    252             #
    253             # This part of the test only runs when run as root.
    254             # Only scary people run their tests as root.
    255 
    256             big_value = 2**31
    257             chown_func(first_param, big_value, big_value)
    258             check_stat(big_value, big_value)
    259             chown_func(first_param, -1, -1)
    260             check_stat(big_value, big_value)
    261             chown_func(first_param, uid, gid)
    262             check_stat(uid, gid)
    263         elif platform.system() in ('HP-UX', 'SunOS'):
    264             # HP-UX and Solaris can allow a non-root user to chown() to root
    265             # (issue #5113)
    266             raise unittest.SkipTest("Skipping because of non-standard chown() "
    267                                     "behavior")
    268         else:
    269             # non-root cannot chown to root, raises OSError
    270             self.assertRaises(OSError, chown_func, first_param, 0, 0)
    271             check_stat(uid, gid)
    272             self.assertRaises(OSError, chown_func, first_param, 0, -1)
    273             check_stat(uid, gid)
    274             if 0 not in os.getgroups():
    275                 self.assertRaises(OSError, chown_func, first_param, -1, 0)
    276                 check_stat(uid, gid)
    277         # test illegal types
    278         for t in str, float:
    279             self.assertRaises(TypeError, chown_func, first_param, t(uid), gid)
    280             check_stat(uid, gid)
    281             self.assertRaises(TypeError, chown_func, first_param, uid, t(gid))
    282             check_stat(uid, gid)
    283 
    284     @unittest.skipUnless(hasattr(posix, 'chown'), "test needs os.chown()")
    285     def test_chown(self):
    286         # raise an OSError if the file does not exist
    287         os.unlink(test_support.TESTFN)
    288         self.assertRaises(OSError, posix.chown, test_support.TESTFN, -1, -1)
    289 
    290         # re-create the file
    291         open(test_support.TESTFN, 'w').close()
    292         self._test_all_chown_common(posix.chown, test_support.TESTFN,
    293                                     getattr(posix, 'stat', None))
    294 
    295     @unittest.skipUnless(hasattr(posix, 'fchown'), "test needs os.fchown()")
    296     def test_fchown(self):
    297         os.unlink(test_support.TESTFN)
    298 
    299         # re-create the file
    300         test_file = open(test_support.TESTFN, 'w')
    301         try:
    302             fd = test_file.fileno()
    303             self._test_all_chown_common(posix.fchown, fd,
    304                                         getattr(posix, 'fstat', None))
    305         finally:
    306             test_file.close()
    307 
    308     @unittest.skipUnless(hasattr(posix, 'lchown'), "test needs os.lchown()")
    309     def test_lchown(self):
    310         os.unlink(test_support.TESTFN)
    311         # create a symlink
    312         os.symlink(_DUMMY_SYMLINK, test_support.TESTFN)
    313         self._test_all_chown_common(posix.lchown, test_support.TESTFN,
    314                                     getattr(posix, 'lstat', None))
    315 
    316     def test_chdir(self):
    317         if hasattr(posix, 'chdir'):
    318             posix.chdir(os.curdir)
    319             self.assertRaises(OSError, posix.chdir, test_support.TESTFN)
    320 
    321     def test_lsdir(self):
    322         if hasattr(posix, 'lsdir'):
    323             self.assertIn(test_support.TESTFN, posix.lsdir(os.curdir))
    324 
    325     def test_access(self):
    326         if hasattr(posix, 'access'):
    327             self.assertTrue(posix.access(test_support.TESTFN, os.R_OK))
    328 
    329     def test_umask(self):
    330         if hasattr(posix, 'umask'):
    331             old_mask = posix.umask(0)
    332             self.assertIsInstance(old_mask, int)
    333             posix.umask(old_mask)
    334 
    335     def test_strerror(self):
    336         if hasattr(posix, 'strerror'):
    337             self.assertTrue(posix.strerror(0))
    338 
    339     def test_pipe(self):
    340         if hasattr(posix, 'pipe'):
    341             reader, writer = posix.pipe()
    342             os.close(reader)
    343             os.close(writer)
    344 
    345     def test_tempnam(self):
    346         if hasattr(posix, 'tempnam'):
    347             with warnings.catch_warnings():
    348                 warnings.filterwarnings("ignore", "tempnam", DeprecationWarning)
    349                 self.assertTrue(posix.tempnam())
    350                 self.assertTrue(posix.tempnam(os.curdir))
    351                 self.assertTrue(posix.tempnam(os.curdir, 'blah'))
    352 
    353     def test_tmpfile(self):
    354         if hasattr(posix, 'tmpfile'):
    355             with warnings.catch_warnings():
    356                 warnings.filterwarnings("ignore", "tmpfile", DeprecationWarning)
    357                 fp = posix.tmpfile()
    358                 fp.close()
    359 
    360     def test_utime(self):
    361         if hasattr(posix, 'utime'):
    362             now = time.time()
    363             posix.utime(test_support.TESTFN, None)
    364             self.assertRaises(TypeError, posix.utime, test_support.TESTFN, (None, None))
    365             self.assertRaises(TypeError, posix.utime, test_support.TESTFN, (now, None))
    366             self.assertRaises(TypeError, posix.utime, test_support.TESTFN, (None, now))
    367             posix.utime(test_support.TESTFN, (int(now), int(now)))
    368             posix.utime(test_support.TESTFN, (now, now))
    369 
    370     def _test_chflags_regular_file(self, chflags_func, target_file):
    371         st = os.stat(target_file)
    372         self.assertTrue(hasattr(st, 'st_flags'))
    373 
    374         # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
    375         try:
    376             chflags_func(target_file, st.st_flags | stat.UF_IMMUTABLE)
    377         except OSError as err:
    378             if err.errno != errno.EOPNOTSUPP:
    379                 raise
    380             msg = 'chflag UF_IMMUTABLE not supported by underlying fs'
    381             self.skipTest(msg)
    382 
    383         try:
    384             new_st = os.stat(target_file)
    385             self.assertEqual(st.st_flags | stat.UF_IMMUTABLE, new_st.st_flags)
    386             try:
    387                 fd = open(target_file, 'w+')
    388             except IOError as e:
    389                 self.assertEqual(e.errno, errno.EPERM)
    390         finally:
    391             posix.chflags(target_file, st.st_flags)
    392 
    393     @unittest.skipUnless(hasattr(posix, 'chflags'), 'test needs os.chflags()')
    394     def test_chflags(self):
    395         self._test_chflags_regular_file(posix.chflags, test_support.TESTFN)
    396 
    397     @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()')
    398     def test_lchflags_regular_file(self):
    399         self._test_chflags_regular_file(posix.lchflags, test_support.TESTFN)
    400 
    401     @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()')
    402     def test_lchflags_symlink(self):
    403         testfn_st = os.stat(test_support.TESTFN)
    404 
    405         self.assertTrue(hasattr(testfn_st, 'st_flags'))
    406 
    407         os.symlink(test_support.TESTFN, _DUMMY_SYMLINK)
    408         self.teardown_files.append(_DUMMY_SYMLINK)
    409         dummy_symlink_st = os.lstat(_DUMMY_SYMLINK)
    410 
    411         # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
    412         try:
    413             posix.lchflags(_DUMMY_SYMLINK,
    414                            dummy_symlink_st.st_flags | stat.UF_IMMUTABLE)
    415         except OSError as err:
    416             if err.errno != errno.EOPNOTSUPP:
    417                 raise
    418             msg = 'chflag UF_IMMUTABLE not supported by underlying fs'
    419             self.skipTest(msg)
    420 
    421         try:
    422             new_testfn_st = os.stat(test_support.TESTFN)
    423             new_dummy_symlink_st = os.lstat(_DUMMY_SYMLINK)
    424 
    425             self.assertEqual(testfn_st.st_flags, new_testfn_st.st_flags)
    426             self.assertEqual(dummy_symlink_st.st_flags | stat.UF_IMMUTABLE,
    427                              new_dummy_symlink_st.st_flags)
    428         finally:
    429             posix.lchflags(_DUMMY_SYMLINK, dummy_symlink_st.st_flags)
    430 
    431     def test_getcwd_long_pathnames(self):
    432         if hasattr(posix, 'getcwd'):
    433             dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef'
    434             curdir = os.getcwd()
    435             base_path = os.path.abspath(test_support.TESTFN) + '.getcwd'
    436 
    437             try:
    438                 os.mkdir(base_path)
    439                 os.chdir(base_path)
    440             except:
    441 #               Just returning nothing instead of the SkipTest exception,
    442 #               because the test results in Error in that case.
    443 #               Is that ok?
    444 #                raise unittest.SkipTest, "cannot create directory for testing"
    445                 return
    446 
    447             try:
    448                 def _create_and_do_getcwd(dirname, current_path_length = 0):
    449                     try:
    450                         os.mkdir(dirname)
    451                     except:
    452                         raise unittest.SkipTest, "mkdir cannot create directory sufficiently deep for getcwd test"
    453 
    454                     os.chdir(dirname)
    455                     try:
    456                         os.getcwd()
    457                         if current_path_length < 4099:
    458                             _create_and_do_getcwd(dirname, current_path_length + len(dirname) + 1)
    459                     except OSError as e:
    460                         expected_errno = errno.ENAMETOOLONG
    461                         # The following platforms have quirky getcwd()
    462                         # behaviour -- see issue 9185 and 15765 for
    463                         # more information.
    464                         quirky_platform = (
    465                             'sunos' in sys.platform or
    466                             'netbsd' in sys.platform or
    467                             'openbsd' in sys.platform
    468                         )
    469                         if quirky_platform:
    470                             expected_errno = errno.ERANGE
    471                         self.assertEqual(e.errno, expected_errno)
    472                     finally:
    473                         os.chdir('..')
    474                         os.rmdir(dirname)
    475 
    476                 _create_and_do_getcwd(dirname)
    477 
    478             finally:
    479                 os.chdir(curdir)
    480                 shutil.rmtree(base_path)
    481 
    482     @unittest.skipUnless(hasattr(os, 'getegid'), "test needs os.getegid()")
    483     def test_getgroups(self):
    484         with os.popen('id -G') as idg:
    485             groups = idg.read().strip()
    486             ret = idg.close()
    487 
    488         if ret != None or not groups:
    489             raise unittest.SkipTest("need working 'id -G'")
    490 
    491         # Issues 16698: OS X ABIs prior to 10.6 have limits on getgroups()
    492         if sys.platform == 'darwin':
    493             import sysconfig
    494             dt = sysconfig.get_config_var('MACOSX_DEPLOYMENT_TARGET') or '10.0'
    495             if float(dt) < 10.6:
    496                 raise unittest.SkipTest("getgroups(2) is broken prior to 10.6")
    497 
    498         # 'id -G' and 'os.getgroups()' should return the same
    499         # groups, ignoring order and duplicates.
    500         # #10822 - it is implementation defined whether posix.getgroups()
    501         # includes the effective gid so we include it anyway, since id -G does
    502         self.assertEqual(
    503                 set([int(x) for x in groups.split()]),
    504                 set(posix.getgroups() + [posix.getegid()]))
    505 
    506 class PosixGroupsTester(unittest.TestCase):
    507 
    508     def setUp(self):
    509         if posix.getuid() != 0:
    510             raise unittest.SkipTest("not enough privileges")
    511         if not hasattr(posix, 'getgroups'):
    512             raise unittest.SkipTest("need posix.getgroups")
    513         if sys.platform == 'darwin':
    514             raise unittest.SkipTest("getgroups(2) is broken on OSX")
    515         self.saved_groups = posix.getgroups()
    516 
    517     def tearDown(self):
    518         if hasattr(posix, 'setgroups'):
    519             posix.setgroups(self.saved_groups)
    520         elif hasattr(posix, 'initgroups'):
    521             name = pwd.getpwuid(posix.getuid()).pw_name
    522             posix.initgroups(name, self.saved_groups[0])
    523 
    524     @unittest.skipUnless(hasattr(posix, 'initgroups'),
    525                          "test needs posix.initgroups()")
    526     def test_initgroups(self):
    527         # find missing group
    528 
    529         g = max(self.saved_groups) + 1
    530         name = pwd.getpwuid(posix.getuid()).pw_name
    531         posix.initgroups(name, g)
    532         self.assertIn(g, posix.getgroups())
    533 
    534     @unittest.skipUnless(hasattr(posix, 'setgroups'),
    535                          "test needs posix.setgroups()")
    536     def test_setgroups(self):
    537         for groups in [[0], range(16)]:
    538             posix.setgroups(groups)
    539             self.assertListEqual(groups, posix.getgroups())
    540 
    541 
    542 def test_main():
    543     test_support.run_unittest(PosixTester, PosixGroupsTester)
    544 
    545 if __name__ == '__main__':
    546     test_main()
    547