1 # As a test suite for the os module, this is woefully inadequate, but this 2 # does add tests for a few functions which have been determined to be more 3 # portable than they had been thought to be. 4 5 import os 6 import errno 7 import unittest 8 import warnings 9 import sys 10 import signal 11 import subprocess 12 import sysconfig 13 import textwrap 14 import time 15 try: 16 import resource 17 except ImportError: 18 resource = None 19 20 from test import test_support 21 from test.script_helper import assert_python_ok 22 import mmap 23 import uuid 24 25 warnings.filterwarnings("ignore", "tempnam", RuntimeWarning, __name__) 26 warnings.filterwarnings("ignore", "tmpnam", RuntimeWarning, __name__) 27 28 # Tests creating TESTFN 29 class FileTests(unittest.TestCase): 30 def setUp(self): 31 if os.path.exists(test_support.TESTFN): 32 os.unlink(test_support.TESTFN) 33 tearDown = setUp 34 35 def test_access(self): 36 f = os.open(test_support.TESTFN, os.O_CREAT|os.O_RDWR) 37 os.close(f) 38 self.assertTrue(os.access(test_support.TESTFN, os.W_OK)) 39 40 def test_closerange(self): 41 first = os.open(test_support.TESTFN, os.O_CREAT|os.O_RDWR) 42 # We must allocate two consecutive file descriptors, otherwise 43 # it will mess up other file descriptors (perhaps even the three 44 # standard ones). 45 second = os.dup(first) 46 try: 47 retries = 0 48 while second != first + 1: 49 os.close(first) 50 retries += 1 51 if retries > 10: 52 # XXX test skipped 53 self.skipTest("couldn't allocate two consecutive fds") 54 first, second = second, os.dup(second) 55 finally: 56 os.close(second) 57 # close a fd that is open, and one that isn't 58 os.closerange(first, first + 2) 59 self.assertRaises(OSError, os.write, first, "a") 60 61 @test_support.cpython_only 62 def test_rename(self): 63 path = unicode(test_support.TESTFN) 64 old = sys.getrefcount(path) 65 self.assertRaises(TypeError, os.rename, path, 0) 66 new = sys.getrefcount(path) 67 self.assertEqual(old, new) 68 69 70 class TemporaryFileTests(unittest.TestCase): 71 def setUp(self): 72 self.files = [] 73 os.mkdir(test_support.TESTFN) 74 75 def tearDown(self): 76 for name in self.files: 77 os.unlink(name) 78 os.rmdir(test_support.TESTFN) 79 80 def check_tempfile(self, name): 81 # make sure it doesn't already exist: 82 self.assertFalse(os.path.exists(name), 83 "file already exists for temporary file") 84 # make sure we can create the file 85 open(name, "w") 86 self.files.append(name) 87 88 @unittest.skipUnless(hasattr(os, 'tempnam'), 'test needs os.tempnam()') 89 def test_tempnam(self): 90 with warnings.catch_warnings(): 91 warnings.filterwarnings("ignore", "tempnam", RuntimeWarning, 92 r"test_os$") 93 warnings.filterwarnings("ignore", "tempnam", DeprecationWarning) 94 self.check_tempfile(os.tempnam()) 95 96 name = os.tempnam(test_support.TESTFN) 97 self.check_tempfile(name) 98 99 name = os.tempnam(test_support.TESTFN, "pfx") 100 self.assertTrue(os.path.basename(name)[:3] == "pfx") 101 self.check_tempfile(name) 102 103 @unittest.skipUnless(hasattr(os, 'tmpfile'), 'test needs os.tmpfile()') 104 def test_tmpfile(self): 105 # As with test_tmpnam() below, the Windows implementation of tmpfile() 106 # attempts to create a file in the root directory of the current drive. 107 # On Vista and Server 2008, this test will always fail for normal users 108 # as writing to the root directory requires elevated privileges. With 109 # XP and below, the semantics of tmpfile() are the same, but the user 110 # running the test is more likely to have administrative privileges on 111 # their account already. If that's the case, then os.tmpfile() should 112 # work. In order to make this test as useful as possible, rather than 113 # trying to detect Windows versions or whether or not the user has the 114 # right permissions, just try and create a file in the root directory 115 # and see if it raises a 'Permission denied' OSError. If it does, then 116 # test that a subsequent call to os.tmpfile() raises the same error. If 117 # it doesn't, assume we're on XP or below and the user running the test 118 # has administrative privileges, and proceed with the test as normal. 119 with warnings.catch_warnings(): 120 warnings.filterwarnings("ignore", "tmpfile", DeprecationWarning) 121 122 if sys.platform == 'win32': 123 name = '\\python_test_os_test_tmpfile.txt' 124 if os.path.exists(name): 125 os.remove(name) 126 try: 127 fp = open(name, 'w') 128 except IOError, first: 129 # open() failed, assert tmpfile() fails in the same way. 130 # Although open() raises an IOError and os.tmpfile() raises an 131 # OSError(), 'args' will be (13, 'Permission denied') in both 132 # cases. 133 try: 134 fp = os.tmpfile() 135 except OSError, second: 136 self.assertEqual(first.args, second.args) 137 else: 138 self.fail("expected os.tmpfile() to raise OSError") 139 return 140 else: 141 # open() worked, therefore, tmpfile() should work. Close our 142 # dummy file and proceed with the test as normal. 143 fp.close() 144 os.remove(name) 145 146 fp = os.tmpfile() 147 fp.write("foobar") 148 fp.seek(0,0) 149 s = fp.read() 150 fp.close() 151 self.assertTrue(s == "foobar") 152 153 @unittest.skipUnless(hasattr(os, 'tmpnam'), 'test needs os.tmpnam()') 154 def test_tmpnam(self): 155 with warnings.catch_warnings(): 156 warnings.filterwarnings("ignore", "tmpnam", RuntimeWarning, 157 r"test_os$") 158 warnings.filterwarnings("ignore", "tmpnam", DeprecationWarning) 159 160 name = os.tmpnam() 161 if sys.platform in ("win32",): 162 # The Windows tmpnam() seems useless. From the MS docs: 163 # 164 # The character string that tmpnam creates consists of 165 # the path prefix, defined by the entry P_tmpdir in the 166 # file STDIO.H, followed by a sequence consisting of the 167 # digit characters '0' through '9'; the numerical value 168 # of this string is in the range 1 - 65,535. Changing the 169 # definitions of L_tmpnam or P_tmpdir in STDIO.H does not 170 # change the operation of tmpnam. 171 # 172 # The really bizarre part is that, at least under MSVC6, 173 # P_tmpdir is "\\". That is, the path returned refers to 174 # the root of the current drive. That's a terrible place to 175 # put temp files, and, depending on privileges, the user 176 # may not even be able to open a file in the root directory. 177 self.assertFalse(os.path.exists(name), 178 "file already exists for temporary file") 179 else: 180 self.check_tempfile(name) 181 182 # Test attributes on return values from os.*stat* family. 183 class StatAttributeTests(unittest.TestCase): 184 def setUp(self): 185 os.mkdir(test_support.TESTFN) 186 self.fname = os.path.join(test_support.TESTFN, "f1") 187 f = open(self.fname, 'wb') 188 f.write("ABC") 189 f.close() 190 191 def tearDown(self): 192 os.unlink(self.fname) 193 os.rmdir(test_support.TESTFN) 194 195 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()') 196 def test_stat_attributes(self): 197 import stat 198 result = os.stat(self.fname) 199 200 # Make sure direct access works 201 self.assertEqual(result[stat.ST_SIZE], 3) 202 self.assertEqual(result.st_size, 3) 203 204 # Make sure all the attributes are there 205 members = dir(result) 206 for name in dir(stat): 207 if name[:3] == 'ST_': 208 attr = name.lower() 209 if name.endswith("TIME"): 210 def trunc(x): return int(x) 211 else: 212 def trunc(x): return x 213 self.assertEqual(trunc(getattr(result, attr)), 214 result[getattr(stat, name)]) 215 self.assertIn(attr, members) 216 217 try: 218 result[200] 219 self.fail("No exception raised") 220 except IndexError: 221 pass 222 223 # Make sure that assignment fails 224 try: 225 result.st_mode = 1 226 self.fail("No exception raised") 227 except (AttributeError, TypeError): 228 pass 229 230 try: 231 result.st_rdev = 1 232 self.fail("No exception raised") 233 except (AttributeError, TypeError): 234 pass 235 236 try: 237 result.parrot = 1 238 self.fail("No exception raised") 239 except AttributeError: 240 pass 241 242 # Use the stat_result constructor with a too-short tuple. 243 try: 244 result2 = os.stat_result((10,)) 245 self.fail("No exception raised") 246 except TypeError: 247 pass 248 249 # Use the constructor with a too-long tuple. 250 try: 251 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14)) 252 except TypeError: 253 pass 254 255 256 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()') 257 def test_statvfs_attributes(self): 258 try: 259 result = os.statvfs(self.fname) 260 except OSError, e: 261 # On AtheOS, glibc always returns ENOSYS 262 if e.errno == errno.ENOSYS: 263 self.skipTest('glibc always returns ENOSYS on AtheOS') 264 265 # Make sure direct access works 266 self.assertEqual(result.f_bfree, result[3]) 267 268 # Make sure all the attributes are there. 269 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files', 270 'ffree', 'favail', 'flag', 'namemax') 271 for value, member in enumerate(members): 272 self.assertEqual(getattr(result, 'f_' + member), result[value]) 273 274 # Make sure that assignment really fails 275 try: 276 result.f_bfree = 1 277 self.fail("No exception raised") 278 except TypeError: 279 pass 280 281 try: 282 result.parrot = 1 283 self.fail("No exception raised") 284 except AttributeError: 285 pass 286 287 # Use the constructor with a too-short tuple. 288 try: 289 result2 = os.statvfs_result((10,)) 290 self.fail("No exception raised") 291 except TypeError: 292 pass 293 294 # Use the constructor with a too-long tuple. 295 try: 296 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14)) 297 except TypeError: 298 pass 299 300 def test_utime_dir(self): 301 delta = 1000000 302 st = os.stat(test_support.TESTFN) 303 # round to int, because some systems may support sub-second 304 # time stamps in stat, but not in utime. 305 os.utime(test_support.TESTFN, (st.st_atime, int(st.st_mtime-delta))) 306 st2 = os.stat(test_support.TESTFN) 307 self.assertEqual(st2.st_mtime, int(st.st_mtime-delta)) 308 309 # Restrict tests to Win32, since there is no guarantee other 310 # systems support centiseconds 311 def get_file_system(path): 312 if sys.platform == 'win32': 313 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\' 314 import ctypes 315 kernel32 = ctypes.windll.kernel32 316 buf = ctypes.create_string_buffer("", 100) 317 if kernel32.GetVolumeInformationA(root, None, 0, None, None, None, buf, len(buf)): 318 return buf.value 319 320 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 321 @unittest.skipUnless(get_file_system(test_support.TESTFN) == "NTFS", 322 "requires NTFS") 323 def test_1565150(self): 324 t1 = 1159195039.25 325 os.utime(self.fname, (t1, t1)) 326 self.assertEqual(os.stat(self.fname).st_mtime, t1) 327 328 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 329 @unittest.skipUnless(get_file_system(test_support.TESTFN) == "NTFS", 330 "requires NTFS") 331 def test_large_time(self): 332 t1 = 5000000000 # some day in 2128 333 os.utime(self.fname, (t1, t1)) 334 self.assertEqual(os.stat(self.fname).st_mtime, t1) 335 336 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 337 def test_1686475(self): 338 # Verify that an open file can be stat'ed 339 try: 340 os.stat(r"c:\pagefile.sys") 341 except WindowsError, e: 342 if e.errno == 2: # file does not exist; cannot run test 343 self.skipTest(r'c:\pagefile.sys does not exist') 344 self.fail("Could not stat pagefile.sys") 345 346 from test import mapping_tests 347 348 class EnvironTests(mapping_tests.BasicTestMappingProtocol): 349 """check that os.environ object conform to mapping protocol""" 350 type2test = None 351 def _reference(self): 352 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"} 353 def _empty_mapping(self): 354 os.environ.clear() 355 return os.environ 356 def setUp(self): 357 self.__save = dict(os.environ) 358 os.environ.clear() 359 def tearDown(self): 360 os.environ.clear() 361 os.environ.update(self.__save) 362 363 # Bug 1110478 364 def test_update2(self): 365 if os.path.exists("/bin/sh"): 366 os.environ.update(HELLO="World") 367 with os.popen("/bin/sh -c 'echo $HELLO'") as popen: 368 value = popen.read().strip() 369 self.assertEqual(value, "World") 370 371 # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue 372 # #13415). 373 @unittest.skipIf(sys.platform.startswith(('freebsd', 'darwin')), 374 "due to known OS bug: see issue #13415") 375 def test_unset_error(self): 376 if sys.platform == "win32": 377 # an environment variable is limited to 32,767 characters 378 key = 'x' * 50000 379 self.assertRaises(ValueError, os.environ.__delitem__, key) 380 else: 381 # "=" is not allowed in a variable name 382 key = 'key=' 383 self.assertRaises(OSError, os.environ.__delitem__, key) 384 385 class WalkTests(unittest.TestCase): 386 """Tests for os.walk().""" 387 388 def test_traversal(self): 389 import os 390 from os.path import join 391 392 # Build: 393 # TESTFN/ 394 # TEST1/ a file kid and two directory kids 395 # tmp1 396 # SUB1/ a file kid and a directory kid 397 # tmp2 398 # SUB11/ no kids 399 # SUB2/ a file kid and a dirsymlink kid 400 # tmp3 401 # link/ a symlink to TESTFN.2 402 # TEST2/ 403 # tmp4 a lone file 404 walk_path = join(test_support.TESTFN, "TEST1") 405 sub1_path = join(walk_path, "SUB1") 406 sub11_path = join(sub1_path, "SUB11") 407 sub2_path = join(walk_path, "SUB2") 408 tmp1_path = join(walk_path, "tmp1") 409 tmp2_path = join(sub1_path, "tmp2") 410 tmp3_path = join(sub2_path, "tmp3") 411 link_path = join(sub2_path, "link") 412 t2_path = join(test_support.TESTFN, "TEST2") 413 tmp4_path = join(test_support.TESTFN, "TEST2", "tmp4") 414 415 # Create stuff. 416 os.makedirs(sub11_path) 417 os.makedirs(sub2_path) 418 os.makedirs(t2_path) 419 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path: 420 f = file(path, "w") 421 f.write("I'm " + path + " and proud of it. Blame test_os.\n") 422 f.close() 423 if hasattr(os, "symlink"): 424 os.symlink(os.path.abspath(t2_path), link_path) 425 sub2_tree = (sub2_path, ["link"], ["tmp3"]) 426 else: 427 sub2_tree = (sub2_path, [], ["tmp3"]) 428 429 # Walk top-down. 430 all = list(os.walk(walk_path)) 431 self.assertEqual(len(all), 4) 432 # We can't know which order SUB1 and SUB2 will appear in. 433 # Not flipped: TESTFN, SUB1, SUB11, SUB2 434 # flipped: TESTFN, SUB2, SUB1, SUB11 435 flipped = all[0][1][0] != "SUB1" 436 all[0][1].sort() 437 self.assertEqual(all[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"])) 438 self.assertEqual(all[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"])) 439 self.assertEqual(all[2 + flipped], (sub11_path, [], [])) 440 self.assertEqual(all[3 - 2 * flipped], sub2_tree) 441 442 # Prune the search. 443 all = [] 444 for root, dirs, files in os.walk(walk_path): 445 all.append((root, dirs, files)) 446 # Don't descend into SUB1. 447 if 'SUB1' in dirs: 448 # Note that this also mutates the dirs we appended to all! 449 dirs.remove('SUB1') 450 self.assertEqual(len(all), 2) 451 self.assertEqual(all[0], (walk_path, ["SUB2"], ["tmp1"])) 452 self.assertEqual(all[1], sub2_tree) 453 454 # Walk bottom-up. 455 all = list(os.walk(walk_path, topdown=False)) 456 self.assertEqual(len(all), 4) 457 # We can't know which order SUB1 and SUB2 will appear in. 458 # Not flipped: SUB11, SUB1, SUB2, TESTFN 459 # flipped: SUB2, SUB11, SUB1, TESTFN 460 flipped = all[3][1][0] != "SUB1" 461 all[3][1].sort() 462 self.assertEqual(all[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"])) 463 self.assertEqual(all[flipped], (sub11_path, [], [])) 464 self.assertEqual(all[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"])) 465 self.assertEqual(all[2 - 2 * flipped], sub2_tree) 466 467 if hasattr(os, "symlink"): 468 # Walk, following symlinks. 469 for root, dirs, files in os.walk(walk_path, followlinks=True): 470 if root == link_path: 471 self.assertEqual(dirs, []) 472 self.assertEqual(files, ["tmp4"]) 473 break 474 else: 475 self.fail("Didn't follow symlink with followlinks=True") 476 477 def tearDown(self): 478 # Tear everything down. This is a decent use for bottom-up on 479 # Windows, which doesn't have a recursive delete command. The 480 # (not so) subtlety is that rmdir will fail unless the dir's 481 # kids are removed first, so bottom up is essential. 482 for root, dirs, files in os.walk(test_support.TESTFN, topdown=False): 483 for name in files: 484 os.remove(os.path.join(root, name)) 485 for name in dirs: 486 dirname = os.path.join(root, name) 487 if not os.path.islink(dirname): 488 os.rmdir(dirname) 489 else: 490 os.remove(dirname) 491 os.rmdir(test_support.TESTFN) 492 493 class MakedirTests (unittest.TestCase): 494 def setUp(self): 495 os.mkdir(test_support.TESTFN) 496 497 def test_makedir(self): 498 base = test_support.TESTFN 499 path = os.path.join(base, 'dir1', 'dir2', 'dir3') 500 os.makedirs(path) # Should work 501 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4') 502 os.makedirs(path) 503 504 # Try paths with a '.' in them 505 self.assertRaises(OSError, os.makedirs, os.curdir) 506 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir) 507 os.makedirs(path) 508 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4', 509 'dir5', 'dir6') 510 os.makedirs(path) 511 512 513 514 515 def tearDown(self): 516 path = os.path.join(test_support.TESTFN, 'dir1', 'dir2', 'dir3', 517 'dir4', 'dir5', 'dir6') 518 # If the tests failed, the bottom-most directory ('../dir6') 519 # may not have been created, so we look for the outermost directory 520 # that exists. 521 while not os.path.exists(path) and path != test_support.TESTFN: 522 path = os.path.dirname(path) 523 524 os.removedirs(path) 525 526 class DevNullTests (unittest.TestCase): 527 def test_devnull(self): 528 f = file(os.devnull, 'w') 529 f.write('hello') 530 f.close() 531 f = file(os.devnull, 'r') 532 self.assertEqual(f.read(), '') 533 f.close() 534 535 class URandomTests (unittest.TestCase): 536 537 def test_urandom_length(self): 538 self.assertEqual(len(os.urandom(0)), 0) 539 self.assertEqual(len(os.urandom(1)), 1) 540 self.assertEqual(len(os.urandom(10)), 10) 541 self.assertEqual(len(os.urandom(100)), 100) 542 self.assertEqual(len(os.urandom(1000)), 1000) 543 544 def test_urandom_value(self): 545 data1 = os.urandom(16) 546 data2 = os.urandom(16) 547 self.assertNotEqual(data1, data2) 548 549 def get_urandom_subprocess(self, count): 550 # We need to use repr() and eval() to avoid line ending conversions 551 # under Windows. 552 code = '\n'.join(( 553 'import os, sys', 554 'data = os.urandom(%s)' % count, 555 'sys.stdout.write(repr(data))', 556 'sys.stdout.flush()', 557 'print >> sys.stderr, (len(data), data)')) 558 cmd_line = [sys.executable, '-c', code] 559 p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE, 560 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 561 out, err = p.communicate() 562 self.assertEqual(p.wait(), 0, (p.wait(), err)) 563 out = eval(out) 564 self.assertEqual(len(out), count, err) 565 return out 566 567 def test_urandom_subprocess(self): 568 data1 = self.get_urandom_subprocess(16) 569 data2 = self.get_urandom_subprocess(16) 570 self.assertNotEqual(data1, data2) 571 572 573 HAVE_GETENTROPY = (sysconfig.get_config_var('HAVE_GETENTROPY') == 1) 574 575 @unittest.skipIf(HAVE_GETENTROPY, 576 "getentropy() does not use a file descriptor") 577 class URandomFDTests(unittest.TestCase): 578 @unittest.skipUnless(resource, "test requires the resource module") 579 def test_urandom_failure(self): 580 # Check urandom() failing when it is not able to open /dev/random. 581 # We spawn a new process to make the test more robust (if getrlimit() 582 # failed to restore the file descriptor limit after this, the whole 583 # test suite would crash; this actually happened on the OS X Tiger 584 # buildbot). 585 code = """if 1: 586 import errno 587 import os 588 import resource 589 590 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE) 591 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit)) 592 try: 593 os.urandom(16) 594 except OSError as e: 595 assert e.errno == errno.EMFILE, e.errno 596 else: 597 raise AssertionError("OSError not raised") 598 """ 599 assert_python_ok('-c', code) 600 601 602 class ExecvpeTests(unittest.TestCase): 603 604 def test_execvpe_with_bad_arglist(self): 605 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None) 606 607 608 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 609 class Win32ErrorTests(unittest.TestCase): 610 def test_rename(self): 611 self.assertRaises(WindowsError, os.rename, test_support.TESTFN, test_support.TESTFN+".bak") 612 613 def test_remove(self): 614 self.assertRaises(WindowsError, os.remove, test_support.TESTFN) 615 616 def test_chdir(self): 617 self.assertRaises(WindowsError, os.chdir, test_support.TESTFN) 618 619 def test_mkdir(self): 620 f = open(test_support.TESTFN, "w") 621 try: 622 self.assertRaises(WindowsError, os.mkdir, test_support.TESTFN) 623 finally: 624 f.close() 625 os.unlink(test_support.TESTFN) 626 627 def test_utime(self): 628 self.assertRaises(WindowsError, os.utime, test_support.TESTFN, None) 629 630 def test_chmod(self): 631 self.assertRaises(WindowsError, os.chmod, test_support.TESTFN, 0) 632 633 class TestInvalidFD(unittest.TestCase): 634 singles = ["fchdir", "fdopen", "dup", "fdatasync", "fstat", 635 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"] 636 #singles.append("close") 637 #We omit close because it doesn't raise an exception on some platforms 638 def get_single(f): 639 def helper(self): 640 if hasattr(os, f): 641 self.check(getattr(os, f)) 642 return helper 643 for f in singles: 644 locals()["test_"+f] = get_single(f) 645 646 def check(self, f, *args): 647 try: 648 f(test_support.make_bad_fd(), *args) 649 except OSError as e: 650 self.assertEqual(e.errno, errno.EBADF) 651 else: 652 self.fail("%r didn't raise an OSError with a bad file descriptor" 653 % f) 654 655 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()') 656 def test_isatty(self): 657 self.assertEqual(os.isatty(test_support.make_bad_fd()), False) 658 659 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()') 660 def test_closerange(self): 661 fd = test_support.make_bad_fd() 662 # Make sure none of the descriptors we are about to close are 663 # currently valid (issue 6542). 664 for i in range(10): 665 try: os.fstat(fd+i) 666 except OSError: 667 pass 668 else: 669 break 670 if i < 2: 671 raise unittest.SkipTest( 672 "Unable to acquire a range of invalid file descriptors") 673 self.assertEqual(os.closerange(fd, fd + i-1), None) 674 675 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()') 676 def test_dup2(self): 677 self.check(os.dup2, 20) 678 679 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()') 680 def test_fchmod(self): 681 self.check(os.fchmod, 0) 682 683 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()') 684 def test_fchown(self): 685 self.check(os.fchown, -1, -1) 686 687 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()') 688 def test_fpathconf(self): 689 self.check(os.fpathconf, "PC_NAME_MAX") 690 691 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()') 692 def test_ftruncate(self): 693 self.check(os.ftruncate, 0) 694 695 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()') 696 def test_lseek(self): 697 self.check(os.lseek, 0, 0) 698 699 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()') 700 def test_read(self): 701 self.check(os.read, 1) 702 703 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()') 704 def test_tcsetpgrpt(self): 705 self.check(os.tcsetpgrp, 0) 706 707 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()') 708 def test_write(self): 709 self.check(os.write, " ") 710 711 @unittest.skipIf(sys.platform == "win32", "Posix specific tests") 712 class PosixUidGidTests(unittest.TestCase): 713 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()') 714 def test_setuid(self): 715 if os.getuid() != 0: 716 self.assertRaises(os.error, os.setuid, 0) 717 self.assertRaises(OverflowError, os.setuid, 1<<32) 718 719 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()') 720 def test_setgid(self): 721 if os.getuid() != 0: 722 self.assertRaises(os.error, os.setgid, 0) 723 self.assertRaises(OverflowError, os.setgid, 1<<32) 724 725 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()') 726 def test_seteuid(self): 727 if os.getuid() != 0: 728 self.assertRaises(os.error, os.seteuid, 0) 729 self.assertRaises(OverflowError, os.seteuid, 1<<32) 730 731 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()') 732 def test_setegid(self): 733 if os.getuid() != 0: 734 self.assertRaises(os.error, os.setegid, 0) 735 self.assertRaises(OverflowError, os.setegid, 1<<32) 736 737 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()') 738 def test_setreuid(self): 739 if os.getuid() != 0: 740 self.assertRaises(os.error, os.setreuid, 0, 0) 741 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0) 742 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32) 743 744 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()') 745 def test_setreuid_neg1(self): 746 # Needs to accept -1. We run this in a subprocess to avoid 747 # altering the test runner's process state (issue8045). 748 subprocess.check_call([ 749 sys.executable, '-c', 750 'import os,sys;os.setreuid(-1,-1);sys.exit(0)']) 751 752 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()') 753 def test_setregid(self): 754 if os.getuid() != 0: 755 self.assertRaises(os.error, os.setregid, 0, 0) 756 self.assertRaises(OverflowError, os.setregid, 1<<32, 0) 757 self.assertRaises(OverflowError, os.setregid, 0, 1<<32) 758 759 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()') 760 def test_setregid_neg1(self): 761 # Needs to accept -1. We run this in a subprocess to avoid 762 # altering the test runner's process state (issue8045). 763 subprocess.check_call([ 764 sys.executable, '-c', 765 'import os,sys;os.setregid(-1,-1);sys.exit(0)']) 766 767 768 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 769 class Win32KillTests(unittest.TestCase): 770 def _kill(self, sig): 771 # Start sys.executable as a subprocess and communicate from the 772 # subprocess to the parent that the interpreter is ready. When it 773 # becomes ready, send *sig* via os.kill to the subprocess and check 774 # that the return code is equal to *sig*. 775 import ctypes 776 from ctypes import wintypes 777 import msvcrt 778 779 # Since we can't access the contents of the process' stdout until the 780 # process has exited, use PeekNamedPipe to see what's inside stdout 781 # without waiting. This is done so we can tell that the interpreter 782 # is started and running at a point where it could handle a signal. 783 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe 784 PeekNamedPipe.restype = wintypes.BOOL 785 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle 786 ctypes.POINTER(ctypes.c_char), # stdout buf 787 wintypes.DWORD, # Buffer size 788 ctypes.POINTER(wintypes.DWORD), # bytes read 789 ctypes.POINTER(wintypes.DWORD), # bytes avail 790 ctypes.POINTER(wintypes.DWORD)) # bytes left 791 msg = "running" 792 proc = subprocess.Popen([sys.executable, "-c", 793 "import sys;" 794 "sys.stdout.write('{}');" 795 "sys.stdout.flush();" 796 "input()".format(msg)], 797 stdout=subprocess.PIPE, 798 stderr=subprocess.PIPE, 799 stdin=subprocess.PIPE) 800 self.addCleanup(proc.stdout.close) 801 self.addCleanup(proc.stderr.close) 802 self.addCleanup(proc.stdin.close) 803 804 count, max = 0, 100 805 while count < max and proc.poll() is None: 806 # Create a string buffer to store the result of stdout from the pipe 807 buf = ctypes.create_string_buffer(len(msg)) 808 # Obtain the text currently in proc.stdout 809 # Bytes read/avail/left are left as NULL and unused 810 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()), 811 buf, ctypes.sizeof(buf), None, None, None) 812 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed") 813 if buf.value: 814 self.assertEqual(msg, buf.value) 815 break 816 time.sleep(0.1) 817 count += 1 818 else: 819 self.fail("Did not receive communication from the subprocess") 820 821 os.kill(proc.pid, sig) 822 self.assertEqual(proc.wait(), sig) 823 824 def test_kill_sigterm(self): 825 # SIGTERM doesn't mean anything special, but make sure it works 826 self._kill(signal.SIGTERM) 827 828 def test_kill_int(self): 829 # os.kill on Windows can take an int which gets set as the exit code 830 self._kill(100) 831 832 def _kill_with_event(self, event, name): 833 tagname = "test_os_%s" % uuid.uuid1() 834 m = mmap.mmap(-1, 1, tagname) 835 m[0] = '0' 836 # Run a script which has console control handling enabled. 837 proc = subprocess.Popen([sys.executable, 838 os.path.join(os.path.dirname(__file__), 839 "win_console_handler.py"), tagname], 840 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP) 841 # Let the interpreter startup before we send signals. See #3137. 842 count, max = 0, 20 843 while count < max and proc.poll() is None: 844 if m[0] == '1': 845 break 846 time.sleep(0.5) 847 count += 1 848 else: 849 self.fail("Subprocess didn't finish initialization") 850 os.kill(proc.pid, event) 851 # proc.send_signal(event) could also be done here. 852 # Allow time for the signal to be passed and the process to exit. 853 time.sleep(0.5) 854 if not proc.poll(): 855 # Forcefully kill the process if we weren't able to signal it. 856 os.kill(proc.pid, signal.SIGINT) 857 self.fail("subprocess did not stop on {}".format(name)) 858 859 @unittest.skip("subprocesses aren't inheriting Ctrl+C property") 860 def test_CTRL_C_EVENT(self): 861 from ctypes import wintypes 862 import ctypes 863 864 # Make a NULL value by creating a pointer with no argument. 865 NULL = ctypes.POINTER(ctypes.c_int)() 866 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler 867 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int), 868 wintypes.BOOL) 869 SetConsoleCtrlHandler.restype = wintypes.BOOL 870 871 # Calling this with NULL and FALSE causes the calling process to 872 # handle Ctrl+C, rather than ignore it. This property is inherited 873 # by subprocesses. 874 SetConsoleCtrlHandler(NULL, 0) 875 876 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT") 877 878 def test_CTRL_BREAK_EVENT(self): 879 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT") 880 881 882 def test_main(): 883 test_support.run_unittest( 884 FileTests, 885 TemporaryFileTests, 886 StatAttributeTests, 887 EnvironTests, 888 WalkTests, 889 MakedirTests, 890 DevNullTests, 891 URandomTests, 892 URandomFDTests, 893 ExecvpeTests, 894 Win32ErrorTests, 895 TestInvalidFD, 896 PosixUidGidTests, 897 Win32KillTests 898 ) 899 900 if __name__ == "__main__": 901 test_main() 902