1 "Test posix functions" 2 3 from test import test_support 4 5 # Skip these tests if there is no posix module. 6 posix = test_support.import_module('posix') 7 8 import errno 9 import sys 10 import time 11 import os 12 import platform 13 import pwd 14 import shutil 15 import stat 16 import sys 17 import tempfile 18 import unittest 19 import warnings 20 21 _DUMMY_SYMLINK = os.path.join(tempfile.gettempdir(), 22 test_support.TESTFN + '-dummy-symlink') 23 24 warnings.filterwarnings('ignore', '.* potential security risk .*', 25 RuntimeWarning) 26 27 class PosixTester(unittest.TestCase): 28 29 def setUp(self): 30 # create empty file 31 fp = open(test_support.TESTFN, 'w+') 32 fp.close() 33 self.teardown_files = [ test_support.TESTFN ] 34 35 def tearDown(self): 36 for teardown_file in self.teardown_files: 37 os.unlink(teardown_file) 38 39 def testNoArgFunctions(self): 40 # test posix functions which take no arguments and have 41 # no side-effects which we need to cleanup (e.g., fork, wait, abort) 42 NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdu", "uname", 43 "times", "getloadavg", "tmpnam", 44 "getegid", "geteuid", "getgid", "getgroups", 45 "getpid", "getpgrp", "getppid", "getuid", 46 ] 47 48 with warnings.catch_warnings(): 49 warnings.filterwarnings("ignore", "", DeprecationWarning) 50 for name in NO_ARG_FUNCTIONS: 51 posix_func = getattr(posix, name, None) 52 if posix_func is not None: 53 posix_func() 54 self.assertRaises(TypeError, posix_func, 1) 55 56 if hasattr(posix, 'getresuid'): 57 def test_getresuid(self): 58 user_ids = posix.getresuid() 59 self.assertEqual(len(user_ids), 3) 60 for val in user_ids: 61 self.assertGreaterEqual(val, 0) 62 63 if hasattr(posix, 'getresgid'): 64 def test_getresgid(self): 65 group_ids = posix.getresgid() 66 self.assertEqual(len(group_ids), 3) 67 for val in group_ids: 68 self.assertGreaterEqual(val, 0) 69 70 if hasattr(posix, 'setresuid'): 71 def test_setresuid(self): 72 current_user_ids = posix.getresuid() 73 self.assertIsNone(posix.setresuid(*current_user_ids)) 74 # -1 means don't change that value. 75 self.assertIsNone(posix.setresuid(-1, -1, -1)) 76 77 def test_setresuid_exception(self): 78 # Don't do this test if someone is silly enough to run us as root. 79 current_user_ids = posix.getresuid() 80 if 0 not in current_user_ids: 81 new_user_ids = (current_user_ids[0]+1, -1, -1) 82 self.assertRaises(OSError, posix.setresuid, *new_user_ids) 83 84 if hasattr(posix, 'setresgid'): 85 def test_setresgid(self): 86 current_group_ids = posix.getresgid() 87 self.assertIsNone(posix.setresgid(*current_group_ids)) 88 # -1 means don't change that value. 89 self.assertIsNone(posix.setresgid(-1, -1, -1)) 90 91 def test_setresgid_exception(self): 92 # Don't do this test if someone is silly enough to run us as root. 93 current_group_ids = posix.getresgid() 94 if 0 not in current_group_ids: 95 new_group_ids = (current_group_ids[0]+1, -1, -1) 96 self.assertRaises(OSError, posix.setresgid, *new_group_ids) 97 98 @unittest.skipUnless(hasattr(posix, 'initgroups'), 99 "test needs os.initgroups()") 100 def test_initgroups(self): 101 # It takes a string and an integer; check that it raises a TypeError 102 # for other argument lists. 103 self.assertRaises(TypeError, posix.initgroups) 104 self.assertRaises(TypeError, posix.initgroups, None) 105 self.assertRaises(TypeError, posix.initgroups, 3, "foo") 106 self.assertRaises(TypeError, posix.initgroups, "foo", 3, object()) 107 108 # If a non-privileged user invokes it, it should fail with OSError 109 # EPERM. 110 if os.getuid() != 0: 111 try: 112 name = pwd.getpwuid(posix.getuid()).pw_name 113 except KeyError: 114 # the current UID may not have a pwd entry 115 raise unittest.SkipTest("need a pwd entry") 116 try: 117 posix.initgroups(name, 13) 118 except OSError as e: 119 self.assertEqual(e.errno, errno.EPERM) 120 else: 121 self.fail("Expected OSError to be raised by initgroups") 122 123 def test_statvfs(self): 124 if hasattr(posix, 'statvfs'): 125 self.assertTrue(posix.statvfs(os.curdir)) 126 127 def test_fstatvfs(self): 128 if hasattr(posix, 'fstatvfs'): 129 fp = open(test_support.TESTFN) 130 try: 131 self.assertTrue(posix.fstatvfs(fp.fileno())) 132 finally: 133 fp.close() 134 135 def test_ftruncate(self): 136 if hasattr(posix, 'ftruncate'): 137 fp = open(test_support.TESTFN, 'w+') 138 try: 139 # we need to have some data to truncate 140 fp.write('test') 141 fp.flush() 142 posix.ftruncate(fp.fileno(), 0) 143 finally: 144 fp.close() 145 146 def test_dup(self): 147 if hasattr(posix, 'dup'): 148 fp = open(test_support.TESTFN) 149 try: 150 fd = posix.dup(fp.fileno()) 151 self.assertIsInstance(fd, int) 152 os.close(fd) 153 finally: 154 fp.close() 155 156 def test_confstr(self): 157 if hasattr(posix, 'confstr'): 158 self.assertRaises(ValueError, posix.confstr, "CS_garbage") 159 self.assertEqual(len(posix.confstr("CS_PATH")) > 0, True) 160 161 def test_dup2(self): 162 if hasattr(posix, 'dup2'): 163 fp1 = open(test_support.TESTFN) 164 fp2 = open(test_support.TESTFN) 165 try: 166 posix.dup2(fp1.fileno(), fp2.fileno()) 167 finally: 168 fp1.close() 169 fp2.close() 170 171 def fdopen_helper(self, *args): 172 fd = os.open(test_support.TESTFN, os.O_RDONLY) 173 fp2 = posix.fdopen(fd, *args) 174 fp2.close() 175 176 def test_fdopen(self): 177 if hasattr(posix, 'fdopen'): 178 self.fdopen_helper() 179 self.fdopen_helper('r') 180 self.fdopen_helper('r', 100) 181 182 def test_osexlock(self): 183 if hasattr(posix, "O_EXLOCK"): 184 fd = os.open(test_support.TESTFN, 185 os.O_WRONLY|os.O_EXLOCK|os.O_CREAT) 186 self.assertRaises(OSError, os.open, test_support.TESTFN, 187 os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK) 188 os.close(fd) 189 190 if hasattr(posix, "O_SHLOCK"): 191 fd = os.open(test_support.TESTFN, 192 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 193 self.assertRaises(OSError, os.open, test_support.TESTFN, 194 os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK) 195 os.close(fd) 196 197 def test_osshlock(self): 198 if hasattr(posix, "O_SHLOCK"): 199 fd1 = os.open(test_support.TESTFN, 200 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 201 fd2 = os.open(test_support.TESTFN, 202 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 203 os.close(fd2) 204 os.close(fd1) 205 206 if hasattr(posix, "O_EXLOCK"): 207 fd = os.open(test_support.TESTFN, 208 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 209 self.assertRaises(OSError, os.open, test_support.TESTFN, 210 os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK) 211 os.close(fd) 212 213 def test_fstat(self): 214 if hasattr(posix, 'fstat'): 215 fp = open(test_support.TESTFN) 216 try: 217 self.assertTrue(posix.fstat(fp.fileno())) 218 finally: 219 fp.close() 220 221 def test_stat(self): 222 if hasattr(posix, 'stat'): 223 self.assertTrue(posix.stat(test_support.TESTFN)) 224 225 def _test_all_chown_common(self, chown_func, first_param, stat_func): 226 """Common code for chown, fchown and lchown tests.""" 227 def check_stat(uid, gid): 228 if stat_func is not None: 229 stat = stat_func(first_param) 230 self.assertEqual(stat.st_uid, uid) 231 self.assertEqual(stat.st_gid, gid) 232 uid = os.getuid() 233 gid = os.getgid() 234 # test a successful chown call 235 chown_func(first_param, uid, gid) 236 check_stat(uid, gid) 237 chown_func(first_param, -1, gid) 238 check_stat(uid, gid) 239 chown_func(first_param, uid, -1) 240 check_stat(uid, gid) 241 242 if uid == 0: 243 # Try an amusingly large uid/gid to make sure we handle 244 # large unsigned values. (chown lets you use any 245 # uid/gid you like, even if they aren't defined.) 246 # 247 # This problem keeps coming up: 248 # http://bugs.python.org/issue1747858 249 # http://bugs.python.org/issue4591 250 # http://bugs.python.org/issue15301 251 # Hopefully the fix in 4591 fixes it for good! 252 # 253 # This part of the test only runs when run as root. 254 # Only scary people run their tests as root. 255 256 big_value = 2**31 257 chown_func(first_param, big_value, big_value) 258 check_stat(big_value, big_value) 259 chown_func(first_param, -1, -1) 260 check_stat(big_value, big_value) 261 chown_func(first_param, uid, gid) 262 check_stat(uid, gid) 263 elif platform.system() in ('HP-UX', 'SunOS'): 264 # HP-UX and Solaris can allow a non-root user to chown() to root 265 # (issue #5113) 266 raise unittest.SkipTest("Skipping because of non-standard chown() " 267 "behavior") 268 else: 269 # non-root cannot chown to root, raises OSError 270 self.assertRaises(OSError, chown_func, first_param, 0, 0) 271 check_stat(uid, gid) 272 self.assertRaises(OSError, chown_func, first_param, 0, -1) 273 check_stat(uid, gid) 274 if 0 not in os.getgroups(): 275 self.assertRaises(OSError, chown_func, first_param, -1, 0) 276 check_stat(uid, gid) 277 # test illegal types 278 for t in str, float: 279 self.assertRaises(TypeError, chown_func, first_param, t(uid), gid) 280 check_stat(uid, gid) 281 self.assertRaises(TypeError, chown_func, first_param, uid, t(gid)) 282 check_stat(uid, gid) 283 284 @unittest.skipUnless(hasattr(posix, 'chown'), "test needs os.chown()") 285 def test_chown(self): 286 # raise an OSError if the file does not exist 287 os.unlink(test_support.TESTFN) 288 self.assertRaises(OSError, posix.chown, test_support.TESTFN, -1, -1) 289 290 # re-create the file 291 open(test_support.TESTFN, 'w').close() 292 self._test_all_chown_common(posix.chown, test_support.TESTFN, 293 getattr(posix, 'stat', None)) 294 295 @unittest.skipUnless(hasattr(posix, 'fchown'), "test needs os.fchown()") 296 def test_fchown(self): 297 os.unlink(test_support.TESTFN) 298 299 # re-create the file 300 test_file = open(test_support.TESTFN, 'w') 301 try: 302 fd = test_file.fileno() 303 self._test_all_chown_common(posix.fchown, fd, 304 getattr(posix, 'fstat', None)) 305 finally: 306 test_file.close() 307 308 @unittest.skipUnless(hasattr(posix, 'lchown'), "test needs os.lchown()") 309 def test_lchown(self): 310 os.unlink(test_support.TESTFN) 311 # create a symlink 312 os.symlink(_DUMMY_SYMLINK, test_support.TESTFN) 313 self._test_all_chown_common(posix.lchown, test_support.TESTFN, 314 getattr(posix, 'lstat', None)) 315 316 def test_chdir(self): 317 if hasattr(posix, 'chdir'): 318 posix.chdir(os.curdir) 319 self.assertRaises(OSError, posix.chdir, test_support.TESTFN) 320 321 def test_lsdir(self): 322 if hasattr(posix, 'lsdir'): 323 self.assertIn(test_support.TESTFN, posix.lsdir(os.curdir)) 324 325 def test_access(self): 326 if hasattr(posix, 'access'): 327 self.assertTrue(posix.access(test_support.TESTFN, os.R_OK)) 328 329 def test_umask(self): 330 if hasattr(posix, 'umask'): 331 old_mask = posix.umask(0) 332 self.assertIsInstance(old_mask, int) 333 posix.umask(old_mask) 334 335 def test_strerror(self): 336 if hasattr(posix, 'strerror'): 337 self.assertTrue(posix.strerror(0)) 338 339 def test_pipe(self): 340 if hasattr(posix, 'pipe'): 341 reader, writer = posix.pipe() 342 os.close(reader) 343 os.close(writer) 344 345 def test_tempnam(self): 346 if hasattr(posix, 'tempnam'): 347 with warnings.catch_warnings(): 348 warnings.filterwarnings("ignore", "tempnam", DeprecationWarning) 349 self.assertTrue(posix.tempnam()) 350 self.assertTrue(posix.tempnam(os.curdir)) 351 self.assertTrue(posix.tempnam(os.curdir, 'blah')) 352 353 def test_tmpfile(self): 354 if hasattr(posix, 'tmpfile'): 355 with warnings.catch_warnings(): 356 warnings.filterwarnings("ignore", "tmpfile", DeprecationWarning) 357 fp = posix.tmpfile() 358 fp.close() 359 360 def test_utime(self): 361 if hasattr(posix, 'utime'): 362 now = time.time() 363 posix.utime(test_support.TESTFN, None) 364 self.assertRaises(TypeError, posix.utime, test_support.TESTFN, (None, None)) 365 self.assertRaises(TypeError, posix.utime, test_support.TESTFN, (now, None)) 366 self.assertRaises(TypeError, posix.utime, test_support.TESTFN, (None, now)) 367 posix.utime(test_support.TESTFN, (int(now), int(now))) 368 posix.utime(test_support.TESTFN, (now, now)) 369 370 def _test_chflags_regular_file(self, chflags_func, target_file): 371 st = os.stat(target_file) 372 self.assertTrue(hasattr(st, 'st_flags')) 373 374 # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE. 375 try: 376 chflags_func(target_file, st.st_flags | stat.UF_IMMUTABLE) 377 except OSError as err: 378 if err.errno != errno.EOPNOTSUPP: 379 raise 380 msg = 'chflag UF_IMMUTABLE not supported by underlying fs' 381 self.skipTest(msg) 382 383 try: 384 new_st = os.stat(target_file) 385 self.assertEqual(st.st_flags | stat.UF_IMMUTABLE, new_st.st_flags) 386 try: 387 fd = open(target_file, 'w+') 388 except IOError as e: 389 self.assertEqual(e.errno, errno.EPERM) 390 finally: 391 posix.chflags(target_file, st.st_flags) 392 393 @unittest.skipUnless(hasattr(posix, 'chflags'), 'test needs os.chflags()') 394 def test_chflags(self): 395 self._test_chflags_regular_file(posix.chflags, test_support.TESTFN) 396 397 @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()') 398 def test_lchflags_regular_file(self): 399 self._test_chflags_regular_file(posix.lchflags, test_support.TESTFN) 400 401 @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()') 402 def test_lchflags_symlink(self): 403 testfn_st = os.stat(test_support.TESTFN) 404 405 self.assertTrue(hasattr(testfn_st, 'st_flags')) 406 407 os.symlink(test_support.TESTFN, _DUMMY_SYMLINK) 408 self.teardown_files.append(_DUMMY_SYMLINK) 409 dummy_symlink_st = os.lstat(_DUMMY_SYMLINK) 410 411 # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE. 412 try: 413 posix.lchflags(_DUMMY_SYMLINK, 414 dummy_symlink_st.st_flags | stat.UF_IMMUTABLE) 415 except OSError as err: 416 if err.errno != errno.EOPNOTSUPP: 417 raise 418 msg = 'chflag UF_IMMUTABLE not supported by underlying fs' 419 self.skipTest(msg) 420 421 try: 422 new_testfn_st = os.stat(test_support.TESTFN) 423 new_dummy_symlink_st = os.lstat(_DUMMY_SYMLINK) 424 425 self.assertEqual(testfn_st.st_flags, new_testfn_st.st_flags) 426 self.assertEqual(dummy_symlink_st.st_flags | stat.UF_IMMUTABLE, 427 new_dummy_symlink_st.st_flags) 428 finally: 429 posix.lchflags(_DUMMY_SYMLINK, dummy_symlink_st.st_flags) 430 431 def test_getcwd_long_pathnames(self): 432 if hasattr(posix, 'getcwd'): 433 dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef' 434 curdir = os.getcwd() 435 base_path = os.path.abspath(test_support.TESTFN) + '.getcwd' 436 437 try: 438 os.mkdir(base_path) 439 os.chdir(base_path) 440 except: 441 # Just returning nothing instead of the SkipTest exception, 442 # because the test results in Error in that case. 443 # Is that ok? 444 # raise unittest.SkipTest, "cannot create directory for testing" 445 return 446 447 try: 448 def _create_and_do_getcwd(dirname, current_path_length = 0): 449 try: 450 os.mkdir(dirname) 451 except: 452 raise unittest.SkipTest, "mkdir cannot create directory sufficiently deep for getcwd test" 453 454 os.chdir(dirname) 455 try: 456 os.getcwd() 457 if current_path_length < 4099: 458 _create_and_do_getcwd(dirname, current_path_length + len(dirname) + 1) 459 except OSError as e: 460 expected_errno = errno.ENAMETOOLONG 461 # The following platforms have quirky getcwd() 462 # behaviour -- see issue 9185 and 15765 for 463 # more information. 464 quirky_platform = ( 465 'sunos' in sys.platform or 466 'netbsd' in sys.platform or 467 'openbsd' in sys.platform 468 ) 469 if quirky_platform: 470 expected_errno = errno.ERANGE 471 self.assertEqual(e.errno, expected_errno) 472 finally: 473 os.chdir('..') 474 os.rmdir(dirname) 475 476 _create_and_do_getcwd(dirname) 477 478 finally: 479 os.chdir(curdir) 480 shutil.rmtree(base_path) 481 482 @unittest.skipUnless(hasattr(os, 'getegid'), "test needs os.getegid()") 483 def test_getgroups(self): 484 with os.popen('id -G') as idg: 485 groups = idg.read().strip() 486 ret = idg.close() 487 488 if ret != None or not groups: 489 raise unittest.SkipTest("need working 'id -G'") 490 491 # Issues 16698: OS X ABIs prior to 10.6 have limits on getgroups() 492 if sys.platform == 'darwin': 493 import sysconfig 494 dt = sysconfig.get_config_var('MACOSX_DEPLOYMENT_TARGET') or '10.0' 495 if float(dt) < 10.6: 496 raise unittest.SkipTest("getgroups(2) is broken prior to 10.6") 497 498 # 'id -G' and 'os.getgroups()' should return the same 499 # groups, ignoring order and duplicates. 500 # #10822 - it is implementation defined whether posix.getgroups() 501 # includes the effective gid so we include it anyway, since id -G does 502 self.assertEqual( 503 set([int(x) for x in groups.split()]), 504 set(posix.getgroups() + [posix.getegid()])) 505 506 class PosixGroupsTester(unittest.TestCase): 507 508 def setUp(self): 509 if posix.getuid() != 0: 510 raise unittest.SkipTest("not enough privileges") 511 if not hasattr(posix, 'getgroups'): 512 raise unittest.SkipTest("need posix.getgroups") 513 if sys.platform == 'darwin': 514 raise unittest.SkipTest("getgroups(2) is broken on OSX") 515 self.saved_groups = posix.getgroups() 516 517 def tearDown(self): 518 if hasattr(posix, 'setgroups'): 519 posix.setgroups(self.saved_groups) 520 elif hasattr(posix, 'initgroups'): 521 name = pwd.getpwuid(posix.getuid()).pw_name 522 posix.initgroups(name, self.saved_groups[0]) 523 524 @unittest.skipUnless(hasattr(posix, 'initgroups'), 525 "test needs posix.initgroups()") 526 def test_initgroups(self): 527 # find missing group 528 529 g = max(self.saved_groups) + 1 530 name = pwd.getpwuid(posix.getuid()).pw_name 531 posix.initgroups(name, g) 532 self.assertIn(g, posix.getgroups()) 533 534 @unittest.skipUnless(hasattr(posix, 'setgroups'), 535 "test needs posix.setgroups()") 536 def test_setgroups(self): 537 for groups in [[0], range(16)]: 538 posix.setgroups(groups) 539 self.assertListEqual(groups, posix.getgroups()) 540 541 542 def test_main(): 543 test_support.run_unittest(PosixTester, PosixGroupsTester) 544 545 if __name__ == '__main__': 546 test_main() 547