Home | History | Annotate | Download | only in test
      1 # As a test suite for the os module, this is woefully inadequate, but this

      2 # does add tests for a few functions which have been determined to be more

      3 # portable than they had been thought to be.

      4 
      5 import os
      6 import errno
      7 import unittest
      8 import warnings
      9 import sys
     10 import signal
     11 import subprocess
     12 import time
     13 from test import test_support
     14 import mmap
     15 import uuid
     16 
# Ignore RuntimeWarnings whose message starts with "tempnam" or "tmpnam"
# when they are attributed to this module (matched by __name__).
warnings.filterwarnings("ignore", "tempnam", RuntimeWarning, __name__)
warnings.filterwarnings("ignore", "tmpnam", RuntimeWarning, __name__)
     19 
     20 # Tests creating TESTFN

     21 class FileTests(unittest.TestCase):
     22     def setUp(self):
     23         if os.path.exists(test_support.TESTFN):
     24             os.unlink(test_support.TESTFN)
     25     tearDown = setUp
     26 
     27     def test_access(self):
     28         f = os.open(test_support.TESTFN, os.O_CREAT|os.O_RDWR)
     29         os.close(f)
     30         self.assertTrue(os.access(test_support.TESTFN, os.W_OK))
     31 
     32     def test_closerange(self):
     33         first = os.open(test_support.TESTFN, os.O_CREAT|os.O_RDWR)
     34         # We must allocate two consecutive file descriptors, otherwise

     35         # it will mess up other file descriptors (perhaps even the three

     36         # standard ones).

     37         second = os.dup(first)
     38         try:
     39             retries = 0
     40             while second != first + 1:
     41                 os.close(first)
     42                 retries += 1
     43                 if retries > 10:
     44                     # XXX test skipped

     45                     self.skipTest("couldn't allocate two consecutive fds")
     46                 first, second = second, os.dup(second)
     47         finally:
     48             os.close(second)
     49         # close a fd that is open, and one that isn't

     50         os.closerange(first, first + 2)
     51         self.assertRaises(OSError, os.write, first, "a")
     52 
     53     @test_support.cpython_only
     54     def test_rename(self):
     55         path = unicode(test_support.TESTFN)
     56         old = sys.getrefcount(path)
     57         self.assertRaises(TypeError, os.rename, path, 0)
     58         new = sys.getrefcount(path)
     59         self.assertEqual(old, new)
     60 
     61 
     62 class TemporaryFileTests(unittest.TestCase):
     63     def setUp(self):
     64         self.files = []
     65         os.mkdir(test_support.TESTFN)
     66 
     67     def tearDown(self):
     68         for name in self.files:
     69             os.unlink(name)
     70         os.rmdir(test_support.TESTFN)
     71 
     72     def check_tempfile(self, name):
     73         # make sure it doesn't already exist:

     74         self.assertFalse(os.path.exists(name),
     75                     "file already exists for temporary file")
     76         # make sure we can create the file

     77         open(name, "w")
     78         self.files.append(name)
     79 
     80     def test_tempnam(self):
     81         if not hasattr(os, "tempnam"):
     82             return
     83         with warnings.catch_warnings():
     84             warnings.filterwarnings("ignore", "tempnam", RuntimeWarning,
     85                                     r"test_os$")
     86             warnings.filterwarnings("ignore", "tempnam", DeprecationWarning)
     87             self.check_tempfile(os.tempnam())
     88 
     89             name = os.tempnam(test_support.TESTFN)
     90             self.check_tempfile(name)
     91 
     92             name = os.tempnam(test_support.TESTFN, "pfx")
     93             self.assertTrue(os.path.basename(name)[:3] == "pfx")
     94             self.check_tempfile(name)
     95 
     96     def test_tmpfile(self):
     97         if not hasattr(os, "tmpfile"):
     98             return
     99         # As with test_tmpnam() below, the Windows implementation of tmpfile()

    100         # attempts to create a file in the root directory of the current drive.

    101         # On Vista and Server 2008, this test will always fail for normal users

    102         # as writing to the root directory requires elevated privileges.  With

    103         # XP and below, the semantics of tmpfile() are the same, but the user

    104         # running the test is more likely to have administrative privileges on

    105         # their account already.  If that's the case, then os.tmpfile() should

    106         # work.  In order to make this test as useful as possible, rather than

    107         # trying to detect Windows versions or whether or not the user has the

    108         # right permissions, just try and create a file in the root directory

    109         # and see if it raises a 'Permission denied' OSError.  If it does, then

    110         # test that a subsequent call to os.tmpfile() raises the same error. If

    111         # it doesn't, assume we're on XP or below and the user running the test

    112         # has administrative privileges, and proceed with the test as normal.

    113         with warnings.catch_warnings():
    114             warnings.filterwarnings("ignore", "tmpfile", DeprecationWarning)
    115 
    116             if sys.platform == 'win32':
    117                 name = '\\python_test_os_test_tmpfile.txt'
    118                 if os.path.exists(name):
    119                     os.remove(name)
    120                 try:
    121                     fp = open(name, 'w')
    122                 except IOError, first:
    123                     # open() failed, assert tmpfile() fails in the same way.

    124                     # Although open() raises an IOError and os.tmpfile() raises an

    125                     # OSError(), 'args' will be (13, 'Permission denied') in both

    126                     # cases.

    127                     try:
    128                         fp = os.tmpfile()
    129                     except OSError, second:
    130                         self.assertEqual(first.args, second.args)
    131                     else:
    132                         self.fail("expected os.tmpfile() to raise OSError")
    133                     return
    134                 else:
    135                     # open() worked, therefore, tmpfile() should work.  Close our

    136                     # dummy file and proceed with the test as normal.

    137                     fp.close()
    138                     os.remove(name)
    139 
    140             fp = os.tmpfile()
    141             fp.write("foobar")
    142             fp.seek(0,0)
    143             s = fp.read()
    144             fp.close()
    145             self.assertTrue(s == "foobar")
    146 
    147     def test_tmpnam(self):
    148         if not hasattr(os, "tmpnam"):
    149             return
    150         with warnings.catch_warnings():
    151             warnings.filterwarnings("ignore", "tmpnam", RuntimeWarning,
    152                                     r"test_os$")
    153             warnings.filterwarnings("ignore", "tmpnam", DeprecationWarning)
    154 
    155             name = os.tmpnam()
    156             if sys.platform in ("win32",):
    157                 # The Windows tmpnam() seems useless.  From the MS docs:

    158                 #

    159                 #     The character string that tmpnam creates consists of

    160                 #     the path prefix, defined by the entry P_tmpdir in the

    161                 #     file STDIO.H, followed by a sequence consisting of the

    162                 #     digit characters '0' through '9'; the numerical value

    163                 #     of this string is in the range 1 - 65,535.  Changing the

    164                 #     definitions of L_tmpnam or P_tmpdir in STDIO.H does not

    165                 #     change the operation of tmpnam.

    166                 #

    167                 # The really bizarre part is that, at least under MSVC6,

    168                 # P_tmpdir is "\\".  That is, the path returned refers to

    169                 # the root of the current drive.  That's a terrible place to

    170                 # put temp files, and, depending on privileges, the user

    171                 # may not even be able to open a file in the root directory.

    172                 self.assertFalse(os.path.exists(name),
    173                             "file already exists for temporary file")
    174             else:
    175                 self.check_tempfile(name)
    176 
    177 # Test attributes on return values from os.*stat* family.

    178 class StatAttributeTests(unittest.TestCase):
    179     def setUp(self):
    180         os.mkdir(test_support.TESTFN)
    181         self.fname = os.path.join(test_support.TESTFN, "f1")
    182         f = open(self.fname, 'wb')
    183         f.write("ABC")
    184         f.close()
    185 
    186     def tearDown(self):
    187         os.unlink(self.fname)
    188         os.rmdir(test_support.TESTFN)
    189 
    190     def test_stat_attributes(self):
    191         if not hasattr(os, "stat"):
    192             return
    193 
    194         import stat
    195         result = os.stat(self.fname)
    196 
    197         # Make sure direct access works

    198         self.assertEqual(result[stat.ST_SIZE], 3)
    199         self.assertEqual(result.st_size, 3)
    200 
    201         # Make sure all the attributes are there

    202         members = dir(result)
    203         for name in dir(stat):
    204             if name[:3] == 'ST_':
    205                 attr = name.lower()
    206                 if name.endswith("TIME"):
    207                     def trunc(x): return int(x)
    208                 else:
    209                     def trunc(x): return x
    210                 self.assertEqual(trunc(getattr(result, attr)),
    211                                  result[getattr(stat, name)])
    212                 self.assertIn(attr, members)
    213 
    214         try:
    215             result[200]
    216             self.fail("No exception thrown")
    217         except IndexError:
    218             pass
    219 
    220         # Make sure that assignment fails

    221         try:
    222             result.st_mode = 1
    223             self.fail("No exception thrown")
    224         except (AttributeError, TypeError):
    225             pass
    226 
    227         try:
    228             result.st_rdev = 1
    229             self.fail("No exception thrown")
    230         except (AttributeError, TypeError):
    231             pass
    232 
    233         try:
    234             result.parrot = 1
    235             self.fail("No exception thrown")
    236         except AttributeError:
    237             pass
    238 
    239         # Use the stat_result constructor with a too-short tuple.

    240         try:
    241             result2 = os.stat_result((10,))
    242             self.fail("No exception thrown")
    243         except TypeError:
    244             pass
    245 
    246         # Use the constructor with a too-long tuple.

    247         try:
    248             result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
    249         except TypeError:
    250             pass
    251 
    252 
    253     def test_statvfs_attributes(self):
    254         if not hasattr(os, "statvfs"):
    255             return
    256 
    257         try:
    258             result = os.statvfs(self.fname)
    259         except OSError, e:
    260             # On AtheOS, glibc always returns ENOSYS

    261             if e.errno == errno.ENOSYS:
    262                 return
    263 
    264         # Make sure direct access works

    265         self.assertEqual(result.f_bfree, result[3])
    266 
    267         # Make sure all the attributes are there.

    268         members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
    269                     'ffree', 'favail', 'flag', 'namemax')
    270         for value, member in enumerate(members):
    271             self.assertEqual(getattr(result, 'f_' + member), result[value])
    272 
    273         # Make sure that assignment really fails

    274         try:
    275             result.f_bfree = 1
    276             self.fail("No exception thrown")
    277         except TypeError:
    278             pass
    279 
    280         try:
    281             result.parrot = 1
    282             self.fail("No exception thrown")
    283         except AttributeError:
    284             pass
    285 
    286         # Use the constructor with a too-short tuple.

    287         try:
    288             result2 = os.statvfs_result((10,))
    289             self.fail("No exception thrown")
    290         except TypeError:
    291             pass
    292 
    293         # Use the constructor with a too-long tuple.

    294         try:
    295             result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
    296         except TypeError:
    297             pass
    298 
    299     def test_utime_dir(self):
    300         delta = 1000000
    301         st = os.stat(test_support.TESTFN)
    302         # round to int, because some systems may support sub-second

    303         # time stamps in stat, but not in utime.

    304         os.utime(test_support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
    305         st2 = os.stat(test_support.TESTFN)
    306         self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))
    307 
    308     # Restrict test to Win32, since there is no guarantee other

    309     # systems support centiseconds

    310     if sys.platform == 'win32':
    311         def get_file_system(path):
    312             root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
    313             import ctypes
    314             kernel32 = ctypes.windll.kernel32
    315             buf = ctypes.create_string_buffer("", 100)
    316             if kernel32.GetVolumeInformationA(root, None, 0, None, None, None, buf, len(buf)):
    317                 return buf.value
    318 
    319         if get_file_system(test_support.TESTFN) == "NTFS":
    320             def test_1565150(self):
    321                 t1 = 1159195039.25
    322                 os.utime(self.fname, (t1, t1))
    323                 self.assertEqual(os.stat(self.fname).st_mtime, t1)
    324 
    325             def test_large_time(self):
    326                 t1 = 5000000000 # some day in 2128
    327                 os.utime(self.fname, (t1, t1))
    328                 self.assertEqual(os.stat(self.fname).st_mtime, t1)
    329 
    330         def test_1686475(self):
    331             # Verify that an open file can be stat'ed
    332             try:
    333                 os.stat(r"c:\pagefile.sys")
    334             except WindowsError, e:
    335                 if e.errno == 2: # file does not exist; cannot run test

    336                     return
    337                 self.fail("Could not stat pagefile.sys")
    338 
    339 from test import mapping_tests
    340 
    341 class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    342     """check that os.environ object conform to mapping protocol"""
    343     type2test = None
    344     def _reference(self):
    345         return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
    346     def _empty_mapping(self):
    347         os.environ.clear()
    348         return os.environ
    349     def setUp(self):
    350         self.__save = dict(os.environ)
    351         os.environ.clear()
    352     def tearDown(self):
    353         os.environ.clear()
    354         os.environ.update(self.__save)
    355 
    356     # Bug 1110478

    357     def test_update2(self):
    358         if os.path.exists("/bin/sh"):
    359             os.environ.update(HELLO="World")
    360             with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
    361                 value = popen.read().strip()
    362                 self.assertEqual(value, "World")
    363 
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    def test_traversal(self):
        # Builds a small directory tree under TESTFN, then checks os.walk()
        # top-down, with pruning, bottom-up and (where symlinks exist)
        # with followlinks=True.  The tree is removed by tearDown().
        import os
        from os.path import join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #       TEST2/
        #         tmp4              a lone file
        walk_path = join(test_support.TESTFN, "TEST1")
        sub1_path = join(walk_path, "SUB1")
        sub11_path = join(sub1_path, "SUB11")
        sub2_path = join(walk_path, "SUB2")
        tmp1_path = join(walk_path, "tmp1")
        tmp2_path = join(sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        link_path = join(sub2_path, "link")
        t2_path = join(test_support.TESTFN, "TEST2")
        tmp4_path = join(test_support.TESTFN, "TEST2", "tmp4")

        # Create stuff.
        os.makedirs(sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(t2_path)
        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            f = file(path, "w")
            f.write("I'm " + path + " and proud of it.  Blame test_os.\n")
            f.close()
        # sub2_tree is the (root, dirs, files) triple expected for SUB2; it
        # lists "link" as a directory only when the symlink could be created.
        if hasattr(os, "symlink"):
            os.symlink(os.path.abspath(t2_path), link_path)
            sub2_tree = (sub2_path, ["link"], ["tmp3"])
        else:
            sub2_tree = (sub2_path, [], ["tmp3"])

        # Walk top-down.
        all = list(os.walk(walk_path))
        self.assertEqual(len(all), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        # 'flipped' is a bool used as 0/1 in the index arithmetic below.
        flipped = all[0][1][0] != "SUB1"
        all[0][1].sort()
        self.assertEqual(all[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(all[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(all[2 + flipped], (sub11_path, [], []))
        self.assertEqual(all[3 - 2 * flipped], sub2_tree)

        # Prune the search.
        all = []
        for root, dirs, files in os.walk(walk_path):
            all.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to all!
                dirs.remove('SUB1')
        self.assertEqual(len(all), 2)
        self.assertEqual(all[0], (walk_path, ["SUB2"], ["tmp1"]))
        self.assertEqual(all[1], sub2_tree)

        # Walk bottom-up.
        all = list(os.walk(walk_path, topdown=False))
        self.assertEqual(len(all), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = all[3][1][0] != "SUB1"
        all[3][1].sort()
        self.assertEqual(all[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(all[flipped], (sub11_path, [], []))
        self.assertEqual(all[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(all[2 - 2 * flipped], sub2_tree)

        if hasattr(os, "symlink"):
            # Walk, following symlinks.
            for root, dirs, files in os.walk(walk_path, followlinks=True):
                if root == link_path:
                    self.assertEqual(dirs, [])
                    self.assertEqual(files, ["tmp4"])
                    break
            else:
                # The loop finished without ever visiting link_path.
                self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(test_support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                # A symlink to a directory is removed as a link, not with
                # rmdir().
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    os.remove(dirname)
        os.rmdir(test_support.TESTFN)
    471 
    472 class MakedirTests (unittest.TestCase):
    473     def setUp(self):
    474         os.mkdir(test_support.TESTFN)
    475 
    476     def test_makedir(self):
    477         base = test_support.TESTFN
    478         path = os.path.join(base, 'dir1', 'dir2', 'dir3')
    479         os.makedirs(path)             # Should work

    480         path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
    481         os.makedirs(path)
    482 
    483         # Try paths with a '.' in them

    484         self.assertRaises(OSError, os.makedirs, os.curdir)
    485         path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
    486         os.makedirs(path)
    487         path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
    488                             'dir5', 'dir6')
    489         os.makedirs(path)
    490 
    491 
    492 
    493 
    494     def tearDown(self):
    495         path = os.path.join(test_support.TESTFN, 'dir1', 'dir2', 'dir3',
    496                             'dir4', 'dir5', 'dir6')
    497         # If the tests failed, the bottom-most directory ('../dir6')

    498         # may not have been created, so we look for the outermost directory

    499         # that exists.

    500         while not os.path.exists(path) and path != test_support.TESTFN:
    501             path = os.path.dirname(path)
    502 
    503         os.removedirs(path)
    504 
    505 class DevNullTests (unittest.TestCase):
    506     def test_devnull(self):
    507         f = file(os.devnull, 'w')
    508         f.write('hello')
    509         f.close()
    510         f = file(os.devnull, 'r')
    511         self.assertEqual(f.read(), '')
    512         f.close()
    513 
    514 class URandomTests (unittest.TestCase):
    515     def test_urandom(self):
    516         try:
    517             self.assertEqual(len(os.urandom(1)), 1)
    518             self.assertEqual(len(os.urandom(10)), 10)
    519             self.assertEqual(len(os.urandom(100)), 100)
    520             self.assertEqual(len(os.urandom(1000)), 1000)
    521             # see http://bugs.python.org/issue3708

    522             self.assertRaises(TypeError, os.urandom, 0.9)
    523             self.assertRaises(TypeError, os.urandom, 1.1)
    524             self.assertRaises(TypeError, os.urandom, 2.0)
    525         except NotImplementedError:
    526             pass
    527 
    528     def test_execvpe_with_bad_arglist(self):
    529         self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
    530 
    531 class Win32ErrorTests(unittest.TestCase):
    532     def test_rename(self):
    533         self.assertRaises(WindowsError, os.rename, test_support.TESTFN, test_support.TESTFN+".bak")
    534 
    535     def test_remove(self):
    536         self.assertRaises(WindowsError, os.remove, test_support.TESTFN)
    537 
    538     def test_chdir(self):
    539         self.assertRaises(WindowsError, os.chdir, test_support.TESTFN)
    540 
    541     def test_mkdir(self):
    542         f = open(test_support.TESTFN, "w")
    543         try:
    544             self.assertRaises(WindowsError, os.mkdir, test_support.TESTFN)
    545         finally:
    546             f.close()
    547             os.unlink(test_support.TESTFN)
    548 
    549     def test_utime(self):
    550         self.assertRaises(WindowsError, os.utime, test_support.TESTFN, None)
    551 
    552     def test_chmod(self):
    553         self.assertRaises(WindowsError, os.chmod, test_support.TESTFN, 0)
    554 
class TestInvalidFD(unittest.TestCase):
    """Check how os functions behave when handed an invalid file descriptor."""

    # Names of os functions that are called with the bad descriptor as
    # their only argument; a test_<name> method is generated for each.
    singles = ["fchdir", "fdopen", "dup", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    #singles.append("close")
    #We omit close because it doesn't raise an exception on some platforms
    def get_single(f):
        # Factory run while the class body executes: returns a test method
        # bound to the function name *f* through a closure.
        def helper(self):
            # Functions missing from this platform's os module are ignored.
            if  hasattr(os, f):
                self.check(getattr(os, f))
        return helper
    for f in singles:
        # Writes the generated method into the class namespace being built.
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        # Call f(bad_fd, *args) and require an OSError whose errno is EBADF;
        # any successful return is a test failure.
        try:
            f(test_support.make_bad_fd(), *args)
        except OSError as e:
            self.assertEqual(e.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise a OSError with a bad file descriptor"
                      % f)

    def test_isatty(self):
        # isatty() is expected to return False rather than raise.
        if hasattr(os, "isatty"):
            self.assertEqual(os.isatty(test_support.make_bad_fd()), False)

    def test_closerange(self):
        if hasattr(os, "closerange"):
            fd = test_support.make_bad_fd()
            # Make sure none of the descriptors we are about to close are
            # currently valid (issue 6542).
            # The loop stops at the first offset i for which fstat() works,
            # or leaves i at 9 when every probed descriptor is invalid.
            for i in range(10):
                try: os.fstat(fd+i)
                except OSError:
                    pass
                else:
                    break
            if i < 2:
                raise unittest.SkipTest(
                    "Unable to acquire a range of invalid file descriptors")
            self.assertEqual(os.closerange(fd, fd + i-1), None)

    # Each test below passes the bad descriptor first, followed by the
    # extra arguments given to check().
    def test_dup2(self):
        if hasattr(os, "dup2"):
            self.check(os.dup2, 20)

    def test_fchmod(self):
        if hasattr(os, "fchmod"):
            self.check(os.fchmod, 0)

    def test_fchown(self):
        if hasattr(os, "fchown"):
            self.check(os.fchown, -1, -1)

    def test_fpathconf(self):
        if hasattr(os, "fpathconf"):
            self.check(os.fpathconf, "PC_NAME_MAX")

    def test_ftruncate(self):
        if hasattr(os, "ftruncate"):
            self.check(os.ftruncate, 0)

    def test_lseek(self):
        if hasattr(os, "lseek"):
            self.check(os.lseek, 0, 0)

    def test_read(self):
        if hasattr(os, "read"):
            self.check(os.read, 1)

    def test_tcsetpgrpt(self):
        if hasattr(os, "tcsetpgrp"):
            self.check(os.tcsetpgrp, 0)

    def test_write(self):
        if hasattr(os, "write"):
            self.check(os.write, " ")
    632 
    633 if sys.platform != 'win32':
    634     class Win32ErrorTests(unittest.TestCase):
    635         pass
    636 
    637     class PosixUidGidTests(unittest.TestCase):
    638         if hasattr(os, 'setuid'):
    639             def test_setuid(self):
    640                 if os.getuid() != 0:
    641                     self.assertRaises(os.error, os.setuid, 0)
    642                 self.assertRaises(OverflowError, os.setuid, 1<<32)
    643 
    644         if hasattr(os, 'setgid'):
    645             def test_setgid(self):
    646                 if os.getuid() != 0:
    647                     self.assertRaises(os.error, os.setgid, 0)
    648                 self.assertRaises(OverflowError, os.setgid, 1<<32)
    649 
    650         if hasattr(os, 'seteuid'):
    651             def test_seteuid(self):
    652                 if os.getuid() != 0:
    653                     self.assertRaises(os.error, os.seteuid, 0)
    654                 self.assertRaises(OverflowError, os.seteuid, 1<<32)
    655 
    656         if hasattr(os, 'setegid'):
    657             def test_setegid(self):
    658                 if os.getuid() != 0:
    659                     self.assertRaises(os.error, os.setegid, 0)
    660                 self.assertRaises(OverflowError, os.setegid, 1<<32)
    661 
    662         if hasattr(os, 'setreuid'):
    663             def test_setreuid(self):
    664                 if os.getuid() != 0:
    665                     self.assertRaises(os.error, os.setreuid, 0, 0)
    666                 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
    667                 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
    668 
    669             def test_setreuid_neg1(self):
    670                 # Needs to accept -1.  We run this in a subprocess to avoid

    671                 # altering the test runner's process state (issue8045).

    672                 subprocess.check_call([
    673                         sys.executable, '-c',
    674                         'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
    675 
    676         if hasattr(os, 'setregid'):
    677             def test_setregid(self):
    678                 if os.getuid() != 0:
    679                     self.assertRaises(os.error, os.setregid, 0, 0)
    680                 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
    681                 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
    682 
    683             def test_setregid_neg1(self):
    684                 # Needs to accept -1.  We run this in a subprocess to avoid

    685                 # altering the test runner's process state (issue8045).

    686                 subprocess.check_call([
    687                         sys.executable, '-c',
    688                         'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
    689 else:
    690     class PosixUidGidTests(unittest.TestCase):
    691         pass
    692 
    693 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    694 class Win32KillTests(unittest.TestCase):
    695     def _kill(self, sig):
    696         # Start sys.executable as a subprocess and communicate from the

    697         # subprocess to the parent that the interpreter is ready. When it

    698         # becomes ready, send *sig* via os.kill to the subprocess and check

    699         # that the return code is equal to *sig*.

    700         import ctypes
    701         from ctypes import wintypes
    702         import msvcrt
    703 
    704         # Since we can't access the contents of the process' stdout until the

    705         # process has exited, use PeekNamedPipe to see what's inside stdout

    706         # without waiting. This is done so we can tell that the interpreter

    707         # is started and running at a point where it could handle a signal.

    708         PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
    709         PeekNamedPipe.restype = wintypes.BOOL
    710         PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle

    711                                   ctypes.POINTER(ctypes.c_char), # stdout buf

    712                                   wintypes.DWORD, # Buffer size

    713                                   ctypes.POINTER(wintypes.DWORD), # bytes read

    714                                   ctypes.POINTER(wintypes.DWORD), # bytes avail

    715                                   ctypes.POINTER(wintypes.DWORD)) # bytes left

    716         msg = "running"
    717         proc = subprocess.Popen([sys.executable, "-c",
    718                                  "import sys;"
    719                                  "sys.stdout.write('{}');"
    720                                  "sys.stdout.flush();"
    721                                  "input()".format(msg)],
    722                                 stdout=subprocess.PIPE,
    723                                 stderr=subprocess.PIPE,
    724                                 stdin=subprocess.PIPE)
    725         self.addCleanup(proc.stdout.close)
    726         self.addCleanup(proc.stderr.close)
    727         self.addCleanup(proc.stdin.close)
    728 
    729         count, max = 0, 100
    730         while count < max and proc.poll() is None:
    731             # Create a string buffer to store the result of stdout from the pipe

    732             buf = ctypes.create_string_buffer(len(msg))
    733             # Obtain the text currently in proc.stdout

    734             # Bytes read/avail/left are left as NULL and unused

    735             rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
    736                                  buf, ctypes.sizeof(buf), None, None, None)
    737             self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
    738             if buf.value:
    739                 self.assertEqual(msg, buf.value)
    740                 break
    741             time.sleep(0.1)
    742             count += 1
    743         else:
    744             self.fail("Did not receive communication from the subprocess")
    745 
    746         os.kill(proc.pid, sig)
    747         self.assertEqual(proc.wait(), sig)
    748 
    749     def test_kill_sigterm(self):
    750         # SIGTERM doesn't mean anything special, but make sure it works

    751         self._kill(signal.SIGTERM)
    752 
    753     def test_kill_int(self):
    754         # os.kill on Windows can take an int which gets set as the exit code

    755         self._kill(100)
    756 
    757     def _kill_with_event(self, event, name):
    758         tagname = "test_os_%s" % uuid.uuid1()
    759         m = mmap.mmap(-1, 1, tagname)
    760         m[0] = '0'
    761         # Run a script which has console control handling enabled.

    762         proc = subprocess.Popen([sys.executable,
    763                    os.path.join(os.path.dirname(__file__),
    764                                 "win_console_handler.py"), tagname],
    765                    creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
    766         # Let the interpreter startup before we send signals. See #3137.

    767         count, max = 0, 20
    768         while count < max and proc.poll() is None:
    769             if m[0] == '1':
    770                 break
    771             time.sleep(0.5)
    772             count += 1
    773         else:
    774             self.fail("Subprocess didn't finish initialization")
    775         os.kill(proc.pid, event)
    776         # proc.send_signal(event) could also be done here.

    777         # Allow time for the signal to be passed and the process to exit.

    778         time.sleep(0.5)
    779         if not proc.poll():
    780             # Forcefully kill the process if we weren't able to signal it.

    781             os.kill(proc.pid, signal.SIGINT)
    782             self.fail("subprocess did not stop on {}".format(name))
    783 
    784     @unittest.skip("subprocesses aren't inheriting CTRL+C property")
    785     def test_CTRL_C_EVENT(self):
    786         from ctypes import wintypes
    787         import ctypes
    788 
    789         # Make a NULL value by creating a pointer with no argument.

    790         NULL = ctypes.POINTER(ctypes.c_int)()
    791         SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
    792         SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
    793                                           wintypes.BOOL)
    794         SetConsoleCtrlHandler.restype = wintypes.BOOL
    795 
    796         # Calling this with NULL and FALSE causes the calling process to

    797         # handle CTRL+C, rather than ignore it. This property is inherited

    798         # by subprocesses.

    799         SetConsoleCtrlHandler(NULL, 0)
    800 
    801         self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
    802 
    803     def test_CTRL_BREAK_EVENT(self):
    804         self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
    805 
    806 
    807 def test_main():
    808     test_support.run_unittest(
    809         FileTests,
    810         TemporaryFileTests,
    811         StatAttributeTests,
    812         EnvironTests,
    813         WalkTests,
    814         MakedirTests,
    815         DevNullTests,
    816         URandomTests,
    817         Win32ErrorTests,
    818         TestInvalidFD,
    819         PosixUidGidTests,
    820         Win32KillTests
    821     )
    822 
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()
    825