"Test posix functions"

from test import test_support

# Skip these tests if there is no posix module.
posix = test_support.import_module('posix')

import errno
import sys
import time
import os
import pwd
import shutil
import sys
import unittest
import warnings


warnings.filterwarnings('ignore', '.* potential security risk .*',
                        RuntimeWarning)


class PosixTester(unittest.TestCase):
    # Smoke tests for the functions exported by the posix module.  Every
    # test operates on test_support.TESTFN, which setUp() creates as an
    # empty file and tearDown() removes again.

    def setUp(self):
        # create empty file
        with open(test_support.TESTFN, 'w+'):
            pass

    def tearDown(self):
        # Several tests unlink TESTFN themselves before re-creating it; if
        # one of them fails in between, the file is already gone.  Only
        # tolerate that case so the real failure is not masked by a second
        # error raised from here.
        try:
            os.unlink(test_support.TESTFN)
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise

    def testNoArgFunctions(self):
        # test posix functions which take no arguments and have
        # no side-effects which we need to cleanup (e.g., fork, wait, abort)
        NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdu", "uname",
                             "times", "getloadavg", "tmpnam",
                             "getegid", "geteuid", "getgid", "getgroups",
                             "getpid", "getpgrp", "getppid", "getuid",
                           ]

        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "", DeprecationWarning)
            for name in NO_ARG_FUNCTIONS:
                posix_func = getattr(posix, name, None)
                if posix_func is not None:
                    posix_func()
                    # Passing any argument must be rejected.
                    self.assertRaises(TypeError, posix_func, 1)

    @unittest.skipUnless(hasattr(posix, 'getresuid'),
                         "test needs posix.getresuid()")
    def test_getresuid(self):
        user_ids = posix.getresuid()
        self.assertEqual(len(user_ids), 3)
        for val in user_ids:
            self.assertGreaterEqual(val, 0)

    @unittest.skipUnless(hasattr(posix, 'getresgid'),
                         "test needs posix.getresgid()")
    def test_getresgid(self):
        group_ids = posix.getresgid()
        self.assertEqual(len(group_ids), 3)
        for val in group_ids:
            self.assertGreaterEqual(val, 0)

    @unittest.skipUnless(hasattr(posix, 'setresuid'),
                         "test needs posix.setresuid()")
    def test_setresuid(self):
        current_user_ids = posix.getresuid()
        self.assertIsNone(posix.setresuid(*current_user_ids))
        # -1 means don't change that value.
        self.assertIsNone(posix.setresuid(-1, -1, -1))

    @unittest.skipUnless(hasattr(posix, 'setresuid'),
                         "test needs posix.setresuid()")
    def test_setresuid_exception(self):
        # Don't do this test if someone is silly enough to run us as root.
        current_user_ids = posix.getresuid()
        if 0 not in current_user_ids:
            new_user_ids = (current_user_ids[0]+1, -1, -1)
            self.assertRaises(OSError, posix.setresuid, *new_user_ids)

    @unittest.skipUnless(hasattr(posix, 'setresgid'),
                         "test needs posix.setresgid()")
    def test_setresgid(self):
        current_group_ids = posix.getresgid()
        self.assertIsNone(posix.setresgid(*current_group_ids))
        # -1 means don't change that value.
        self.assertIsNone(posix.setresgid(-1, -1, -1))

    @unittest.skipUnless(hasattr(posix, 'setresgid'),
                         "test needs posix.setresgid()")
    def test_setresgid_exception(self):
        # Don't do this test if someone is silly enough to run us as root.
        current_group_ids = posix.getresgid()
        if 0 not in current_group_ids:
            new_group_ids = (current_group_ids[0]+1, -1, -1)
            self.assertRaises(OSError, posix.setresgid, *new_group_ids)

    @unittest.skipUnless(hasattr(posix, 'initgroups'),
                         "test needs os.initgroups()")
    def test_initgroups(self):
        # It takes a string and an integer; check that it raises a TypeError
        # for other argument lists.
        self.assertRaises(TypeError, posix.initgroups)
        self.assertRaises(TypeError, posix.initgroups, None)
        self.assertRaises(TypeError, posix.initgroups, 3, "foo")
        self.assertRaises(TypeError, posix.initgroups, "foo", 3, object())

        # If a non-privileged user invokes it, it should fail with OSError
        # EPERM.
        if os.getuid() != 0:
            name = pwd.getpwuid(posix.getuid()).pw_name
            try:
                posix.initgroups(name, 13)
            except OSError as e:
                self.assertEqual(e.errno, errno.EPERM)
            else:
                self.fail("Expected OSError to be raised by initgroups")

    @unittest.skipUnless(hasattr(posix, 'statvfs'),
                         "test needs posix.statvfs()")
    def test_statvfs(self):
        self.assertTrue(posix.statvfs(os.curdir))

    @unittest.skipUnless(hasattr(posix, 'fstatvfs'),
                         "test needs posix.fstatvfs()")
    def test_fstatvfs(self):
        with open(test_support.TESTFN) as fp:
            self.assertTrue(posix.fstatvfs(fp.fileno()))

    @unittest.skipUnless(hasattr(posix, 'ftruncate'),
                         "test needs posix.ftruncate()")
    def test_ftruncate(self):
        with open(test_support.TESTFN, 'w+') as fp:
            # we need to have some data to truncate
            fp.write('test')
            fp.flush()
            posix.ftruncate(fp.fileno(), 0)

    @unittest.skipUnless(hasattr(posix, 'dup'),
                         "test needs posix.dup()")
    def test_dup(self):
        with open(test_support.TESTFN) as fp:
            fd = posix.dup(fp.fileno())
            try:
                self.assertIsInstance(fd, int)
            finally:
                # Close the duplicate even when the assertion fails.
                os.close(fd)

    @unittest.skipUnless(hasattr(posix, 'confstr'),
                         "test needs posix.confstr()")
    def test_confstr(self):
        self.assertRaises(ValueError, posix.confstr, "CS_garbage")
        self.assertGreater(len(posix.confstr("CS_PATH")), 0)

    @unittest.skipUnless(hasattr(posix, 'dup2'),
                         "test needs posix.dup2()")
    def test_dup2(self):
        with open(test_support.TESTFN) as fp1:
            with open(test_support.TESTFN) as fp2:
                posix.dup2(fp1.fileno(), fp2.fileno())

    def fdopen_helper(self, *args):
        # Open TESTFN read-only at the descriptor level, wrap it with
        # posix.fdopen(fd, *args) and close the resulting file object.
        fd = os.open(test_support.TESTFN, os.O_RDONLY)
        try:
            fp2 = posix.fdopen(fd, *args)
        except Exception:
            # No file object was created to take over the descriptor, so
            # close it here before re-raising.
            os.close(fd)
            raise
        fp2.close()

    @unittest.skipUnless(hasattr(posix, 'fdopen'),
                         "test needs posix.fdopen()")
    def test_fdopen(self):
        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    @unittest.skipUnless(hasattr(posix, 'O_EXLOCK'),
                         "test needs posix.O_EXLOCK")
    def test_osexlock(self):
        # While TESTFN is open with O_EXLOCK, a second non-blocking
        # O_EXLOCK open is expected to raise OSError.
        fd = os.open(test_support.TESTFN,
                     os.O_WRONLY|os.O_EXLOCK|os.O_CREAT)
        try:
            self.assertRaises(OSError, os.open, test_support.TESTFN,
                              os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)
        finally:
            os.close(fd)

        if hasattr(posix, "O_SHLOCK"):
            # Same expectation while TESTFN is open with O_SHLOCK.
            fd = os.open(test_support.TESTFN,
                         os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
            try:
                self.assertRaises(OSError, os.open, test_support.TESTFN,
                                  os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)
            finally:
                os.close(fd)

    @unittest.skipUnless(hasattr(posix, 'O_SHLOCK'),
                         "test needs posix.O_SHLOCK")
    def test_osshlock(self):
        # Two O_SHLOCK opens of the same file must both succeed.
        fd1 = os.open(test_support.TESTFN,
                      os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
        try:
            fd2 = os.open(test_support.TESTFN,
                          os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
            os.close(fd2)
        finally:
            os.close(fd1)

        if hasattr(posix, "O_EXLOCK"):
            # While TESTFN is open with O_SHLOCK, a non-blocking O_EXLOCK
            # open is expected to raise OSError.
            fd = os.open(test_support.TESTFN,
                         os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
            try:
                self.assertRaises(OSError, os.open, test_support.TESTFN,
                                  os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK)
            finally:
                os.close(fd)

    @unittest.skipUnless(hasattr(posix, 'fstat'),
                         "test needs posix.fstat()")
    def test_fstat(self):
        with open(test_support.TESTFN) as fp:
            self.assertTrue(posix.fstat(fp.fileno()))

    @unittest.skipUnless(hasattr(posix, 'stat'),
                         "test needs posix.stat()")
    def test_stat(self):
        self.assertTrue(posix.stat(test_support.TESTFN))

    def _test_all_chown_common(self, chown_func, first_param):
        """Common code for chown, fchown and lchown tests."""
        if os.getuid() == 0:
            try:
                # Many linux distros have a nfsnobody user as MAX_UID-2
                # that makes a good test case for signedness issues.
                #   http://bugs.python.org/issue1747858
                # This part of the test only runs when run as root.
                # Only scary people run their tests as root.
                ent = pwd.getpwnam('nfsnobody')
                chown_func(first_param, ent.pw_uid, ent.pw_gid)
            except KeyError:
                pass
        else:
            # non-root cannot chown to root, raises OSError
            self.assertRaises(OSError, chown_func,
                              first_param, 0, 0)

        # test a successful chown call
        chown_func(first_param, os.getuid(), os.getgid())

    @unittest.skipUnless(hasattr(posix, 'chown'), "test needs os.chown()")
    def test_chown(self):
        # raise an OSError if the file does not exist
        os.unlink(test_support.TESTFN)
        self.assertRaises(OSError, posix.chown, test_support.TESTFN, -1, -1)

        # re-create the file
        open(test_support.TESTFN, 'w').close()
        self._test_all_chown_common(posix.chown, test_support.TESTFN)

    @unittest.skipUnless(hasattr(posix, 'fchown'), "test needs os.fchown()")
    def test_fchown(self):
        os.unlink(test_support.TESTFN)

        # re-create the file
        with open(test_support.TESTFN, 'w') as test_file:
            fd = test_file.fileno()
            self._test_all_chown_common(posix.fchown, fd)

    @unittest.skipUnless(hasattr(posix, 'lchown'), "test needs os.lchown()")
    def test_lchown(self):
        os.unlink(test_support.TESTFN)
        # create a symlink
        os.symlink('/tmp/dummy-symlink-target', test_support.TESTFN)
        self._test_all_chown_common(posix.lchown, test_support.TESTFN)

    @unittest.skipUnless(hasattr(posix, 'chdir'),
                         "test needs posix.chdir()")
    def test_chdir(self):
        posix.chdir(os.curdir)
        # TESTFN is a regular file, so chdir() into it must fail.
        self.assertRaises(OSError, posix.chdir, test_support.TESTFN)

    @unittest.skipUnless(hasattr(posix, 'listdir'),
                         "test needs posix.listdir()")
    def test_lsdir(self):
        # The posix module has no 'lsdir' function, so guarding on that
        # name meant this test never ran; the function being exercised is
        # posix.listdir().
        self.assertIn(test_support.TESTFN, posix.listdir(os.curdir))

    @unittest.skipUnless(hasattr(posix, 'access'),
                         "test needs posix.access()")
    def test_access(self):
        self.assertTrue(posix.access(test_support.TESTFN, os.R_OK))

    @unittest.skipUnless(hasattr(posix, 'umask'),
                         "test needs posix.umask()")
    def test_umask(self):
        old_mask = posix.umask(0)
        try:
            self.assertIsInstance(old_mask, int)
        finally:
            # Restore the mask that was in effect before the test.
            posix.umask(old_mask)

    @unittest.skipUnless(hasattr(posix, 'strerror'),
                         "test needs posix.strerror()")
    def test_strerror(self):
        self.assertTrue(posix.strerror(0))

    @unittest.skipUnless(hasattr(posix, 'pipe'),
                         "test needs posix.pipe()")
    def test_pipe(self):
        reader, writer = posix.pipe()
        os.close(reader)
        os.close(writer)

    @unittest.skipUnless(hasattr(posix, 'tempnam'),
                         "test needs posix.tempnam()")
    def test_tempnam(self):
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "tempnam", DeprecationWarning)
            self.assertTrue(posix.tempnam())
            self.assertTrue(posix.tempnam(os.curdir))
            self.assertTrue(posix.tempnam(os.curdir, 'blah'))

    @unittest.skipUnless(hasattr(posix, 'tmpfile'),
                         "test needs posix.tmpfile()")
    def test_tmpfile(self):
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "tmpfile", DeprecationWarning)
            fp = posix.tmpfile()
            fp.close()

    @unittest.skipUnless(hasattr(posix, 'utime'),
                         "test needs posix.utime()")
    def test_utime(self):
        now = time.time()
        posix.utime(test_support.TESTFN, None)
        # A None entry inside the times tuple must be rejected.
        self.assertRaises(TypeError, posix.utime, test_support.TESTFN,
                          (None, None))
        self.assertRaises(TypeError, posix.utime, test_support.TESTFN,
                          (now, None))
        self.assertRaises(TypeError, posix.utime, test_support.TESTFN,
                          (None, now))
        posix.utime(test_support.TESTFN, (int(now), int(now)))
        posix.utime(test_support.TESTFN, (now, now))

    @unittest.skipUnless(hasattr(posix, 'chflags'),
                         "test needs posix.chflags()")
    def test_chflags(self):
        st = os.stat(test_support.TESTFN)
        if hasattr(st, 'st_flags'):
            posix.chflags(test_support.TESTFN, st.st_flags)

    @unittest.skipUnless(hasattr(posix, 'lchflags'),
                         "test needs posix.lchflags()")
    def test_lchflags(self):
        st = os.stat(test_support.TESTFN)
        if hasattr(st, 'st_flags'):
            posix.lchflags(test_support.TESTFN, st.st_flags)

    @unittest.skipUnless(hasattr(posix, 'getcwd'),
                         "test needs posix.getcwd()")
    def test_getcwd_long_pathnames(self):
        # Descend through identically named nested directories, calling
        # os.getcwd() at each level, until the accumulated length passes
        # 4099 characters or getcwd() fails with the expected errno.
        dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef'
        curdir = os.getcwd()
        base_path = os.path.abspath(test_support.TESTFN) + '.getcwd'

        try:
            os.mkdir(base_path)
        except OSError:
            raise unittest.SkipTest("cannot create directory for testing")

        try:
            # Entered inside the try block so that base_path is removed
            # even when chdir() into it fails.
            try:
                os.chdir(base_path)
            except OSError:
                raise unittest.SkipTest(
                    "cannot create directory for testing")

            def _create_and_do_getcwd(dirname, current_path_length = 0):
                try:
                    os.mkdir(dirname)
                except OSError:
                    raise unittest.SkipTest(
                        "mkdir cannot create directory sufficiently deep "
                        "for getcwd test")

                os.chdir(dirname)
                try:
                    os.getcwd()
                    if current_path_length < 4099:
                        _create_and_do_getcwd(
                            dirname, current_path_length + len(dirname) + 1)
                except OSError as e:
                    expected_errno = errno.ENAMETOOLONG
                    if 'sunos' in sys.platform or 'openbsd' in sys.platform:
                        expected_errno = errno.ERANGE # Issue 9185
                    self.assertEqual(e.errno, expected_errno)
                finally:
                    # Unwind one level so the directory can be removed.
                    os.chdir('..')
                    os.rmdir(dirname)

            _create_and_do_getcwd(dirname)

        finally:
            os.chdir(curdir)
            shutil.rmtree(base_path)

    @unittest.skipUnless(hasattr(os, 'getegid'), "test needs os.getegid()")
    def test_getgroups(self):
        with os.popen('id -G') as idg:
            groups = idg.read().strip()

        if not groups:
            raise unittest.SkipTest("need working 'id -G'")

        # 'id -G' and 'os.getgroups()' should return the same
        # groups, ignoring order and duplicates.
        # #10822 - it is implementation defined whether posix.getgroups()
        # includes the effective gid so we include it anyway, since id -G does
        self.assertEqual(
                set([int(x) for x in groups.split()]),
                set(posix.getgroups() + [posix.getegid()]))


class PosixGroupsTester(unittest.TestCase):
    # Tests that modify the process's supplementary group list.  setUp()
    # skips them unless running as root; tearDown() restores the groups
    # saved by setUp().

    def setUp(self):
        if posix.getuid() != 0:
            raise unittest.SkipTest("not enough privileges")
        if not hasattr(posix, 'getgroups'):
            raise unittest.SkipTest("need posix.getgroups")
        if sys.platform == 'darwin':
            raise unittest.SkipTest("getgroups(2) is broken on OSX")
        self.saved_groups = posix.getgroups()

    def tearDown(self):
        if hasattr(posix, 'setgroups'):
            posix.setgroups(self.saved_groups)
        elif hasattr(posix, 'initgroups'):
            name = pwd.getpwuid(posix.getuid()).pw_name
            posix.initgroups(name, self.saved_groups[0])

    @unittest.skipUnless(hasattr(posix, 'initgroups'),
                         "test needs posix.initgroups()")
    def test_initgroups(self):
        # find missing group

        g = max(self.saved_groups) + 1
        name = pwd.getpwuid(posix.getuid()).pw_name
        posix.initgroups(name, g)
        self.assertIn(g, posix.getgroups())

    @unittest.skipUnless(hasattr(posix, 'setgroups'),
                         "test needs posix.setgroups()")
    def test_setgroups(self):
        # list() keeps the second case a real list for assertListEqual.
        for groups in [[0], list(range(16))]:
            posix.setgroups(groups)
            self.assertListEqual(groups, posix.getgroups())


def test_main():
    test_support.run_unittest(PosixTester, PosixGroupsTester)

if __name__ == '__main__':
    test_main()