Home | History | Annotate | Download | only in test
      1 # As a test suite for the os module, this is woefully inadequate, but this
      2 # does add tests for a few functions which have been determined to be more
      3 # portable than they had been thought to be.
      4 
      5 import os
      6 import errno
      7 import unittest
      8 import warnings
      9 import sys
     10 import signal
     11 import subprocess
     12 import sysconfig
     13 import textwrap
     14 import time
     15 try:
     16     import resource
     17 except ImportError:
     18     resource = None
     19 
     20 from test import test_support
     21 from test.script_helper import assert_python_ok
     22 import mmap
     23 import uuid
     24 
# Ignore RuntimeWarnings whose message matches "tempnam" / "tmpnam" when the
# warning is attributed to this module (the fourth argument is the module
# pattern).
warnings.filterwarnings("ignore", "tempnam", RuntimeWarning, __name__)
warnings.filterwarnings("ignore", "tmpnam", RuntimeWarning, __name__)
     27 
     28 # Tests creating TESTFN
     29 class FileTests(unittest.TestCase):
     30     def setUp(self):
     31         if os.path.exists(test_support.TESTFN):
     32             os.unlink(test_support.TESTFN)
     33     tearDown = setUp
     34 
     35     def test_access(self):
     36         f = os.open(test_support.TESTFN, os.O_CREAT|os.O_RDWR)
     37         os.close(f)
     38         self.assertTrue(os.access(test_support.TESTFN, os.W_OK))
     39 
     40     def test_closerange(self):
     41         first = os.open(test_support.TESTFN, os.O_CREAT|os.O_RDWR)
     42         # We must allocate two consecutive file descriptors, otherwise
     43         # it will mess up other file descriptors (perhaps even the three
     44         # standard ones).
     45         second = os.dup(first)
     46         try:
     47             retries = 0
     48             while second != first + 1:
     49                 os.close(first)
     50                 retries += 1
     51                 if retries > 10:
     52                     # XXX test skipped
     53                     self.skipTest("couldn't allocate two consecutive fds")
     54                 first, second = second, os.dup(second)
     55         finally:
     56             os.close(second)
     57         # close a fd that is open, and one that isn't
     58         os.closerange(first, first + 2)
     59         self.assertRaises(OSError, os.write, first, "a")
     60 
     61     @test_support.cpython_only
     62     def test_rename(self):
     63         path = unicode(test_support.TESTFN)
     64         old = sys.getrefcount(path)
     65         self.assertRaises(TypeError, os.rename, path, 0)
     66         new = sys.getrefcount(path)
     67         self.assertEqual(old, new)
     68 
     69 
     70 class TemporaryFileTests(unittest.TestCase):
     71     def setUp(self):
     72         self.files = []
     73         os.mkdir(test_support.TESTFN)
     74 
     75     def tearDown(self):
     76         for name in self.files:
     77             os.unlink(name)
     78         os.rmdir(test_support.TESTFN)
     79 
     80     def check_tempfile(self, name):
     81         # make sure it doesn't already exist:
     82         self.assertFalse(os.path.exists(name),
     83                     "file already exists for temporary file")
     84         # make sure we can create the file
     85         open(name, "w")
     86         self.files.append(name)
     87 
     88     @unittest.skipUnless(hasattr(os, 'tempnam'), 'test needs os.tempnam()')
     89     def test_tempnam(self):
     90         with warnings.catch_warnings():
     91             warnings.filterwarnings("ignore", "tempnam", RuntimeWarning,
     92                                     r"test_os$")
     93             warnings.filterwarnings("ignore", "tempnam", DeprecationWarning)
     94             self.check_tempfile(os.tempnam())
     95 
     96             name = os.tempnam(test_support.TESTFN)
     97             self.check_tempfile(name)
     98 
     99             name = os.tempnam(test_support.TESTFN, "pfx")
    100             self.assertTrue(os.path.basename(name)[:3] == "pfx")
    101             self.check_tempfile(name)
    102 
    103     @unittest.skipUnless(hasattr(os, 'tmpfile'), 'test needs os.tmpfile()')
    104     def test_tmpfile(self):
    105         # As with test_tmpnam() below, the Windows implementation of tmpfile()
    106         # attempts to create a file in the root directory of the current drive.
    107         # On Vista and Server 2008, this test will always fail for normal users
    108         # as writing to the root directory requires elevated privileges.  With
    109         # XP and below, the semantics of tmpfile() are the same, but the user
    110         # running the test is more likely to have administrative privileges on
    111         # their account already.  If that's the case, then os.tmpfile() should
    112         # work.  In order to make this test as useful as possible, rather than
    113         # trying to detect Windows versions or whether or not the user has the
    114         # right permissions, just try and create a file in the root directory
    115         # and see if it raises a 'Permission denied' OSError.  If it does, then
    116         # test that a subsequent call to os.tmpfile() raises the same error. If
    117         # it doesn't, assume we're on XP or below and the user running the test
    118         # has administrative privileges, and proceed with the test as normal.
    119         with warnings.catch_warnings():
    120             warnings.filterwarnings("ignore", "tmpfile", DeprecationWarning)
    121 
    122             if sys.platform == 'win32':
    123                 name = '\\python_test_os_test_tmpfile.txt'
    124                 if os.path.exists(name):
    125                     os.remove(name)
    126                 try:
    127                     fp = open(name, 'w')
    128                 except IOError, first:
    129                     # open() failed, assert tmpfile() fails in the same way.
    130                     # Although open() raises an IOError and os.tmpfile() raises an
    131                     # OSError(), 'args' will be (13, 'Permission denied') in both
    132                     # cases.
    133                     try:
    134                         fp = os.tmpfile()
    135                     except OSError, second:
    136                         self.assertEqual(first.args, second.args)
    137                     else:
    138                         self.fail("expected os.tmpfile() to raise OSError")
    139                     return
    140                 else:
    141                     # open() worked, therefore, tmpfile() should work.  Close our
    142                     # dummy file and proceed with the test as normal.
    143                     fp.close()
    144                     os.remove(name)
    145 
    146             fp = os.tmpfile()
    147             fp.write("foobar")
    148             fp.seek(0,0)
    149             s = fp.read()
    150             fp.close()
    151             self.assertTrue(s == "foobar")
    152 
    153     @unittest.skipUnless(hasattr(os, 'tmpnam'), 'test needs os.tmpnam()')
    154     def test_tmpnam(self):
    155         with warnings.catch_warnings():
    156             warnings.filterwarnings("ignore", "tmpnam", RuntimeWarning,
    157                                     r"test_os$")
    158             warnings.filterwarnings("ignore", "tmpnam", DeprecationWarning)
    159 
    160             name = os.tmpnam()
    161             if sys.platform in ("win32",):
    162                 # The Windows tmpnam() seems useless.  From the MS docs:
    163                 #
    164                 #     The character string that tmpnam creates consists of
    165                 #     the path prefix, defined by the entry P_tmpdir in the
    166                 #     file STDIO.H, followed by a sequence consisting of the
    167                 #     digit characters '0' through '9'; the numerical value
    168                 #     of this string is in the range 1 - 65,535.  Changing the
    169                 #     definitions of L_tmpnam or P_tmpdir in STDIO.H does not
    170                 #     change the operation of tmpnam.
    171                 #
    172                 # The really bizarre part is that, at least under MSVC6,
    173                 # P_tmpdir is "\\".  That is, the path returned refers to
    174                 # the root of the current drive.  That's a terrible place to
    175                 # put temp files, and, depending on privileges, the user
    176                 # may not even be able to open a file in the root directory.
    177                 self.assertFalse(os.path.exists(name),
    178                             "file already exists for temporary file")
    179             else:
    180                 self.check_tempfile(name)
    181 
    182 # Test attributes on return values from os.*stat* family.
    183 class StatAttributeTests(unittest.TestCase):
    184     def setUp(self):
    185         os.mkdir(test_support.TESTFN)
    186         self.fname = os.path.join(test_support.TESTFN, "f1")
    187         f = open(self.fname, 'wb')
    188         f.write("ABC")
    189         f.close()
    190 
    191     def tearDown(self):
    192         os.unlink(self.fname)
    193         os.rmdir(test_support.TESTFN)
    194 
    195     @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
    196     def test_stat_attributes(self):
    197         import stat
    198         result = os.stat(self.fname)
    199 
    200         # Make sure direct access works
    201         self.assertEqual(result[stat.ST_SIZE], 3)
    202         self.assertEqual(result.st_size, 3)
    203 
    204         # Make sure all the attributes are there
    205         members = dir(result)
    206         for name in dir(stat):
    207             if name[:3] == 'ST_':
    208                 attr = name.lower()
    209                 if name.endswith("TIME"):
    210                     def trunc(x): return int(x)
    211                 else:
    212                     def trunc(x): return x
    213                 self.assertEqual(trunc(getattr(result, attr)),
    214                                  result[getattr(stat, name)])
    215                 self.assertIn(attr, members)
    216 
    217         try:
    218             result[200]
    219             self.fail("No exception raised")
    220         except IndexError:
    221             pass
    222 
    223         # Make sure that assignment fails
    224         try:
    225             result.st_mode = 1
    226             self.fail("No exception raised")
    227         except (AttributeError, TypeError):
    228             pass
    229 
    230         try:
    231             result.st_rdev = 1
    232             self.fail("No exception raised")
    233         except (AttributeError, TypeError):
    234             pass
    235 
    236         try:
    237             result.parrot = 1
    238             self.fail("No exception raised")
    239         except AttributeError:
    240             pass
    241 
    242         # Use the stat_result constructor with a too-short tuple.
    243         try:
    244             result2 = os.stat_result((10,))
    245             self.fail("No exception raised")
    246         except TypeError:
    247             pass
    248 
    249         # Use the constructor with a too-long tuple.
    250         try:
    251             result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
    252         except TypeError:
    253             pass
    254 
    255 
    256     @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    257     def test_statvfs_attributes(self):
    258         try:
    259             result = os.statvfs(self.fname)
    260         except OSError, e:
    261             # On AtheOS, glibc always returns ENOSYS
    262             if e.errno == errno.ENOSYS:
    263                 self.skipTest('glibc always returns ENOSYS on AtheOS')
    264 
    265         # Make sure direct access works
    266         self.assertEqual(result.f_bfree, result[3])
    267 
    268         # Make sure all the attributes are there.
    269         members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
    270                     'ffree', 'favail', 'flag', 'namemax')
    271         for value, member in enumerate(members):
    272             self.assertEqual(getattr(result, 'f_' + member), result[value])
    273 
    274         # Make sure that assignment really fails
    275         try:
    276             result.f_bfree = 1
    277             self.fail("No exception raised")
    278         except TypeError:
    279             pass
    280 
    281         try:
    282             result.parrot = 1
    283             self.fail("No exception raised")
    284         except AttributeError:
    285             pass
    286 
    287         # Use the constructor with a too-short tuple.
    288         try:
    289             result2 = os.statvfs_result((10,))
    290             self.fail("No exception raised")
    291         except TypeError:
    292             pass
    293 
    294         # Use the constructor with a too-long tuple.
    295         try:
    296             result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
    297         except TypeError:
    298             pass
    299 
    300     def test_utime_dir(self):
    301         delta = 1000000
    302         st = os.stat(test_support.TESTFN)
    303         # round to int, because some systems may support sub-second
    304         # time stamps in stat, but not in utime.
    305         os.utime(test_support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
    306         st2 = os.stat(test_support.TESTFN)
    307         self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))
    308 
    309     # Restrict tests to Win32, since there is no guarantee other
    310     # systems support centiseconds
    311     def get_file_system(path):
    312         if sys.platform == 'win32':
    313             root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
    314             import ctypes
    315             kernel32 = ctypes.windll.kernel32
    316             buf = ctypes.create_string_buffer("", 100)
    317             if kernel32.GetVolumeInformationA(root, None, 0, None, None, None, buf, len(buf)):
    318                 return buf.value
    319 
    320     @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    321     @unittest.skipUnless(get_file_system(test_support.TESTFN) == "NTFS",
    322                          "requires NTFS")
    323     def test_1565150(self):
    324         t1 = 1159195039.25
    325         os.utime(self.fname, (t1, t1))
    326         self.assertEqual(os.stat(self.fname).st_mtime, t1)
    327 
    328     @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    329     @unittest.skipUnless(get_file_system(test_support.TESTFN) == "NTFS",
    330                          "requires NTFS")
    331     def test_large_time(self):
    332         t1 = 5000000000 # some day in 2128
    333         os.utime(self.fname, (t1, t1))
    334         self.assertEqual(os.stat(self.fname).st_mtime, t1)
    335 
    336     @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    337     def test_1686475(self):
    338         # Verify that an open file can be stat'ed
    339         try:
    340             os.stat(r"c:\pagefile.sys")
    341         except WindowsError, e:
    342             if e.errno == 2: # file does not exist; cannot run test
    343                 self.skipTest(r'c:\pagefile.sys does not exist')
    344             self.fail("Could not stat pagefile.sys")
    345 
    346 from test import mapping_tests
    347 
    348 class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    349     """check that os.environ object conform to mapping protocol"""
    350     type2test = None
    351     def _reference(self):
    352         return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
    353     def _empty_mapping(self):
    354         os.environ.clear()
    355         return os.environ
    356     def setUp(self):
    357         self.__save = dict(os.environ)
    358         os.environ.clear()
    359     def tearDown(self):
    360         os.environ.clear()
    361         os.environ.update(self.__save)
    362 
    363     # Bug 1110478
    364     def test_update2(self):
    365         if os.path.exists("/bin/sh"):
    366             os.environ.update(HELLO="World")
    367             with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
    368                 value = popen.read().strip()
    369                 self.assertEqual(value, "World")
    370 
    371     # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
    372     # #13415).
    373     @unittest.skipIf(sys.platform.startswith(('freebsd', 'darwin')),
    374                      "due to known OS bug: see issue #13415")
    375     def test_unset_error(self):
    376         if sys.platform == "win32":
    377             # an environment variable is limited to 32,767 characters
    378             key = 'x' * 50000
    379             self.assertRaises(ValueError, os.environ.__delitem__, key)
    380         else:
    381             # "=" is not allowed in a variable name
    382             key = 'key='
    383             self.assertRaises(OSError, os.environ.__delitem__, key)
    384 
    385 class WalkTests(unittest.TestCase):
    386     """Tests for os.walk()."""
    387 
    388     def test_traversal(self):
    389         import os
    390         from os.path import join
    391 
    392         # Build:
    393         #     TESTFN/
    394         #       TEST1/              a file kid and two directory kids
    395         #         tmp1
    396         #         SUB1/             a file kid and a directory kid
    397         #           tmp2
    398         #           SUB11/          no kids
    399         #         SUB2/             a file kid and a dirsymlink kid
    400         #           tmp3
    401         #           link/           a symlink to TESTFN.2
    402         #       TEST2/
    403         #         tmp4              a lone file
    404         walk_path = join(test_support.TESTFN, "TEST1")
    405         sub1_path = join(walk_path, "SUB1")
    406         sub11_path = join(sub1_path, "SUB11")
    407         sub2_path = join(walk_path, "SUB2")
    408         tmp1_path = join(walk_path, "tmp1")
    409         tmp2_path = join(sub1_path, "tmp2")
    410         tmp3_path = join(sub2_path, "tmp3")
    411         link_path = join(sub2_path, "link")
    412         t2_path = join(test_support.TESTFN, "TEST2")
    413         tmp4_path = join(test_support.TESTFN, "TEST2", "tmp4")
    414 
    415         # Create stuff.
    416         os.makedirs(sub11_path)
    417         os.makedirs(sub2_path)
    418         os.makedirs(t2_path)
    419         for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
    420             f = file(path, "w")
    421             f.write("I'm " + path + " and proud of it.  Blame test_os.\n")
    422             f.close()
    423         if hasattr(os, "symlink"):
    424             os.symlink(os.path.abspath(t2_path), link_path)
    425             sub2_tree = (sub2_path, ["link"], ["tmp3"])
    426         else:
    427             sub2_tree = (sub2_path, [], ["tmp3"])
    428 
    429         # Walk top-down.
    430         all = list(os.walk(walk_path))
    431         self.assertEqual(len(all), 4)
    432         # We can't know which order SUB1 and SUB2 will appear in.
    433         # Not flipped:  TESTFN, SUB1, SUB11, SUB2
    434         #     flipped:  TESTFN, SUB2, SUB1, SUB11
    435         flipped = all[0][1][0] != "SUB1"
    436         all[0][1].sort()
    437         self.assertEqual(all[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
    438         self.assertEqual(all[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
    439         self.assertEqual(all[2 + flipped], (sub11_path, [], []))
    440         self.assertEqual(all[3 - 2 * flipped], sub2_tree)
    441 
    442         # Prune the search.
    443         all = []
    444         for root, dirs, files in os.walk(walk_path):
    445             all.append((root, dirs, files))
    446             # Don't descend into SUB1.
    447             if 'SUB1' in dirs:
    448                 # Note that this also mutates the dirs we appended to all!
    449                 dirs.remove('SUB1')
    450         self.assertEqual(len(all), 2)
    451         self.assertEqual(all[0], (walk_path, ["SUB2"], ["tmp1"]))
    452         self.assertEqual(all[1], sub2_tree)
    453 
    454         # Walk bottom-up.
    455         all = list(os.walk(walk_path, topdown=False))
    456         self.assertEqual(len(all), 4)
    457         # We can't know which order SUB1 and SUB2 will appear in.
    458         # Not flipped:  SUB11, SUB1, SUB2, TESTFN
    459         #     flipped:  SUB2, SUB11, SUB1, TESTFN
    460         flipped = all[3][1][0] != "SUB1"
    461         all[3][1].sort()
    462         self.assertEqual(all[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
    463         self.assertEqual(all[flipped], (sub11_path, [], []))
    464         self.assertEqual(all[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
    465         self.assertEqual(all[2 - 2 * flipped], sub2_tree)
    466 
    467         if hasattr(os, "symlink"):
    468             # Walk, following symlinks.
    469             for root, dirs, files in os.walk(walk_path, followlinks=True):
    470                 if root == link_path:
    471                     self.assertEqual(dirs, [])
    472                     self.assertEqual(files, ["tmp4"])
    473                     break
    474             else:
    475                 self.fail("Didn't follow symlink with followlinks=True")
    476 
    477     def tearDown(self):
    478         # Tear everything down.  This is a decent use for bottom-up on
    479         # Windows, which doesn't have a recursive delete command.  The
    480         # (not so) subtlety is that rmdir will fail unless the dir's
    481         # kids are removed first, so bottom up is essential.
    482         for root, dirs, files in os.walk(test_support.TESTFN, topdown=False):
    483             for name in files:
    484                 os.remove(os.path.join(root, name))
    485             for name in dirs:
    486                 dirname = os.path.join(root, name)
    487                 if not os.path.islink(dirname):
    488                     os.rmdir(dirname)
    489                 else:
    490                     os.remove(dirname)
    491         os.rmdir(test_support.TESTFN)
    492 
    493 class MakedirTests (unittest.TestCase):
    494     def setUp(self):
    495         os.mkdir(test_support.TESTFN)
    496 
    497     def test_makedir(self):
    498         base = test_support.TESTFN
    499         path = os.path.join(base, 'dir1', 'dir2', 'dir3')
    500         os.makedirs(path)             # Should work
    501         path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
    502         os.makedirs(path)
    503 
    504         # Try paths with a '.' in them
    505         self.assertRaises(OSError, os.makedirs, os.curdir)
    506         path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
    507         os.makedirs(path)
    508         path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
    509                             'dir5', 'dir6')
    510         os.makedirs(path)
    511 
    512 
    513 
    514 
    515     def tearDown(self):
    516         path = os.path.join(test_support.TESTFN, 'dir1', 'dir2', 'dir3',
    517                             'dir4', 'dir5', 'dir6')
    518         # If the tests failed, the bottom-most directory ('../dir6')
    519         # may not have been created, so we look for the outermost directory
    520         # that exists.
    521         while not os.path.exists(path) and path != test_support.TESTFN:
    522             path = os.path.dirname(path)
    523 
    524         os.removedirs(path)
    525 
    526 class DevNullTests (unittest.TestCase):
    527     def test_devnull(self):
    528         f = file(os.devnull, 'w')
    529         f.write('hello')
    530         f.close()
    531         f = file(os.devnull, 'r')
    532         self.assertEqual(f.read(), '')
    533         f.close()
    534 
    535 class URandomTests (unittest.TestCase):
    536 
    537     def test_urandom_length(self):
    538         self.assertEqual(len(os.urandom(0)), 0)
    539         self.assertEqual(len(os.urandom(1)), 1)
    540         self.assertEqual(len(os.urandom(10)), 10)
    541         self.assertEqual(len(os.urandom(100)), 100)
    542         self.assertEqual(len(os.urandom(1000)), 1000)
    543 
    544     def test_urandom_value(self):
    545         data1 = os.urandom(16)
    546         data2 = os.urandom(16)
    547         self.assertNotEqual(data1, data2)
    548 
    549     def get_urandom_subprocess(self, count):
    550         # We need to use repr() and eval() to avoid line ending conversions
    551         # under Windows.
    552         code = '\n'.join((
    553             'import os, sys',
    554             'data = os.urandom(%s)' % count,
    555             'sys.stdout.write(repr(data))',
    556             'sys.stdout.flush()',
    557             'print >> sys.stderr, (len(data), data)'))
    558         cmd_line = [sys.executable, '-c', code]
    559         p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
    560                              stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    561         out, err = p.communicate()
    562         self.assertEqual(p.wait(), 0, (p.wait(), err))
    563         out = eval(out)
    564         self.assertEqual(len(out), count, err)
    565         return out
    566 
    567     def test_urandom_subprocess(self):
    568         data1 = self.get_urandom_subprocess(16)
    569         data2 = self.get_urandom_subprocess(16)
    570         self.assertNotEqual(data1, data2)
    571 
    572 
# True when the build configuration variable HAVE_GETENTROPY equals 1.
HAVE_GETENTROPY = (sysconfig.get_config_var('HAVE_GETENTROPY') == 1)
    574 
    575 @unittest.skipIf(HAVE_GETENTROPY,
    576                  "getentropy() does not use a file descriptor")
    577 class URandomFDTests(unittest.TestCase):
    578     @unittest.skipUnless(resource, "test requires the resource module")
    579     def test_urandom_failure(self):
    580         # Check urandom() failing when it is not able to open /dev/random.
    581         # We spawn a new process to make the test more robust (if getrlimit()
    582         # failed to restore the file descriptor limit after this, the whole
    583         # test suite would crash; this actually happened on the OS X Tiger
    584         # buildbot).
    585         code = """if 1:
    586             import errno
    587             import os
    588             import resource
    589 
    590             soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
    591             resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
    592             try:
    593                 os.urandom(16)
    594             except OSError as e:
    595                 assert e.errno == errno.EMFILE, e.errno
    596             else:
    597                 raise AssertionError("OSError not raised")
    598             """
    599         assert_python_ok('-c', code)
    600 
    601 
    602 class ExecTests(unittest.TestCase):
    603 
    604     def test_execvpe_with_bad_arglist(self):
    605         self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
    606 
    607     def test_execve_invalid_env(self):
    608         args = [sys.executable, '-c', 'pass']
    609 
    610         # null character in the enviroment variable name
    611         newenv = os.environ.copy()
    612         newenv["FRUIT\0VEGETABLE"] = "cabbage"
    613         with self.assertRaises(TypeError):
    614             os.execve(args[0], args, newenv)
    615 
    616         # null character in the enviroment variable value
    617         newenv = os.environ.copy()
    618         newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
    619         with self.assertRaises(TypeError):
    620             os.execve(args[0], args, newenv)
    621 
    622         # equal character in the enviroment variable name
    623         newenv = os.environ.copy()
    624         newenv["FRUIT=ORANGE"] = "lemon"
    625         with self.assertRaises(ValueError):
    626             os.execve(args[0], args, newenv)
    627 
    628 
    629 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    630 class Win32ErrorTests(unittest.TestCase):
    631     def test_rename(self):
    632         self.assertRaises(WindowsError, os.rename, test_support.TESTFN, test_support.TESTFN+".bak")
    633 
    634     def test_remove(self):
    635         self.assertRaises(WindowsError, os.remove, test_support.TESTFN)
    636 
    637     def test_chdir(self):
    638         self.assertRaises(WindowsError, os.chdir, test_support.TESTFN)
    639 
    640     def test_mkdir(self):
    641         f = open(test_support.TESTFN, "w")
    642         try:
    643             self.assertRaises(WindowsError, os.mkdir, test_support.TESTFN)
    644         finally:
    645             f.close()
    646             os.unlink(test_support.TESTFN)
    647 
    648     def test_utime(self):
    649         self.assertRaises(WindowsError, os.utime, test_support.TESTFN, None)
    650 
    651     def test_chmod(self):
    652         self.assertRaises(WindowsError, os.chmod, test_support.TESTFN, 0)
    653 
    654 class TestInvalidFD(unittest.TestCase):
    655     singles = ["fchdir", "fdopen", "dup", "fdatasync", "fstat",
    656                "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    657     #singles.append("close")
    658     #We omit close because it doesn't raise an exception on some platforms
    659     def get_single(f):
    660         def helper(self):
    661             if  hasattr(os, f):
    662                 self.check(getattr(os, f))
    663         return helper
    664     for f in singles:
    665         locals()["test_"+f] = get_single(f)
    666 
    667     def check(self, f, *args):
    668         try:
    669             f(test_support.make_bad_fd(), *args)
    670         except OSError as e:
    671             self.assertEqual(e.errno, errno.EBADF)
    672         else:
    673             self.fail("%r didn't raise an OSError with a bad file descriptor"
    674                       % f)
    675 
    676     @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
    677     def test_isatty(self):
    678         self.assertEqual(os.isatty(test_support.make_bad_fd()), False)
    679 
    680     @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
    681     def test_closerange(self):
    682         fd = test_support.make_bad_fd()
    683         # Make sure none of the descriptors we are about to close are
    684         # currently valid (issue 6542).
    685         for i in range(10):
    686             try: os.fstat(fd+i)
    687             except OSError:
    688                 pass
    689             else:
    690                 break
    691         if i < 2:
    692             raise unittest.SkipTest(
    693                 "Unable to acquire a range of invalid file descriptors")
    694         self.assertEqual(os.closerange(fd, fd + i-1), None)
    695 
    696     @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
    697     def test_dup2(self):
    698         self.check(os.dup2, 20)
    699 
    700     @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
    701     def test_fchmod(self):
    702         self.check(os.fchmod, 0)
    703 
    704     @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
    705     def test_fchown(self):
    706         self.check(os.fchown, -1, -1)
    707 
    708     @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
    709     def test_fpathconf(self):
    710         self.check(os.fpathconf, "PC_NAME_MAX")
    711 
    712     @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
    713     def test_ftruncate(self):
    714         self.check(os.ftruncate, 0)
    715 
    716     @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
    717     def test_lseek(self):
    718         self.check(os.lseek, 0, 0)
    719 
    720     @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
    721     def test_read(self):
    722         self.check(os.read, 1)
    723 
    724     @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
    725     def test_tcsetpgrpt(self):
    726         self.check(os.tcsetpgrp, 0)
    727 
    728     @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
    729     def test_write(self):
    730         self.check(os.write, " ")
    731 
    732 @unittest.skipIf(sys.platform == "win32", "Posix specific tests")
    733 class PosixUidGidTests(unittest.TestCase):
    734     @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
    735     def test_setuid(self):
    736         if os.getuid() != 0:
    737             self.assertRaises(os.error, os.setuid, 0)
    738         self.assertRaises(OverflowError, os.setuid, 1<<32)
    739 
    740     @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
    741     def test_setgid(self):
    742         if os.getuid() != 0:
    743             self.assertRaises(os.error, os.setgid, 0)
    744         self.assertRaises(OverflowError, os.setgid, 1<<32)
    745 
    746     @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
    747     def test_seteuid(self):
    748         if os.getuid() != 0:
    749             self.assertRaises(os.error, os.seteuid, 0)
    750         self.assertRaises(OverflowError, os.seteuid, 1<<32)
    751 
    752     @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
    753     def test_setegid(self):
    754         if os.getuid() != 0:
    755             self.assertRaises(os.error, os.setegid, 0)
    756         self.assertRaises(OverflowError, os.setegid, 1<<32)
    757 
    758     @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    759     def test_setreuid(self):
    760         if os.getuid() != 0:
    761             self.assertRaises(os.error, os.setreuid, 0, 0)
    762         self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
    763         self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
    764 
    765     @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    766     def test_setreuid_neg1(self):
    767         # Needs to accept -1.  We run this in a subprocess to avoid
    768         # altering the test runner's process state (issue8045).
    769         subprocess.check_call([
    770                 sys.executable, '-c',
    771                 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
    772 
    773     @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    774     def test_setregid(self):
    775         if os.getuid() != 0:
    776             self.assertRaises(os.error, os.setregid, 0, 0)
    777         self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
    778         self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
    779 
    780     @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    781     def test_setregid_neg1(self):
    782         # Needs to accept -1.  We run this in a subprocess to avoid
    783         # altering the test runner's process state (issue8045).
    784         subprocess.check_call([
    785                 sys.executable, '-c',
    786                 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
    787 
    788 
    789 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    790 class Win32KillTests(unittest.TestCase):
    791     def _kill(self, sig):
    792         # Start sys.executable as a subprocess and communicate from the
    793         # subprocess to the parent that the interpreter is ready. When it
    794         # becomes ready, send *sig* via os.kill to the subprocess and check
    795         # that the return code is equal to *sig*.
    796         import ctypes
    797         from ctypes import wintypes
    798         import msvcrt
    799 
    800         # Since we can't access the contents of the process' stdout until the
    801         # process has exited, use PeekNamedPipe to see what's inside stdout
    802         # without waiting. This is done so we can tell that the interpreter
    803         # is started and running at a point where it could handle a signal.
    804         PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
    805         PeekNamedPipe.restype = wintypes.BOOL
    806         PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
    807                                   ctypes.POINTER(ctypes.c_char), # stdout buf
    808                                   wintypes.DWORD, # Buffer size
    809                                   ctypes.POINTER(wintypes.DWORD), # bytes read
    810                                   ctypes.POINTER(wintypes.DWORD), # bytes avail
    811                                   ctypes.POINTER(wintypes.DWORD)) # bytes left
    812         msg = "running"
    813         proc = subprocess.Popen([sys.executable, "-c",
    814                                  "import sys;"
    815                                  "sys.stdout.write('{}');"
    816                                  "sys.stdout.flush();"
    817                                  "input()".format(msg)],
    818                                 stdout=subprocess.PIPE,
    819                                 stderr=subprocess.PIPE,
    820                                 stdin=subprocess.PIPE)
    821         self.addCleanup(proc.stdout.close)
    822         self.addCleanup(proc.stderr.close)
    823         self.addCleanup(proc.stdin.close)
    824 
    825         count, max = 0, 100
    826         while count < max and proc.poll() is None:
    827             # Create a string buffer to store the result of stdout from the pipe
    828             buf = ctypes.create_string_buffer(len(msg))
    829             # Obtain the text currently in proc.stdout
    830             # Bytes read/avail/left are left as NULL and unused
    831             rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
    832                                  buf, ctypes.sizeof(buf), None, None, None)
    833             self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
    834             if buf.value:
    835                 self.assertEqual(msg, buf.value)
    836                 break
    837             time.sleep(0.1)
    838             count += 1
    839         else:
    840             self.fail("Did not receive communication from the subprocess")
    841 
    842         os.kill(proc.pid, sig)
    843         self.assertEqual(proc.wait(), sig)
    844 
    845     def test_kill_sigterm(self):
    846         # SIGTERM doesn't mean anything special, but make sure it works
    847         self._kill(signal.SIGTERM)
    848 
    849     def test_kill_int(self):
    850         # os.kill on Windows can take an int which gets set as the exit code
    851         self._kill(100)
    852 
    853     def _kill_with_event(self, event, name):
    854         tagname = "test_os_%s" % uuid.uuid1()
    855         m = mmap.mmap(-1, 1, tagname)
    856         m[0] = '0'
    857         # Run a script which has console control handling enabled.
    858         proc = subprocess.Popen([sys.executable,
    859                    os.path.join(os.path.dirname(__file__),
    860                                 "win_console_handler.py"), tagname],
    861                    creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
    862         # Let the interpreter startup before we send signals. See #3137.
    863         count, max = 0, 20
    864         while count < max and proc.poll() is None:
    865             if m[0] == '1':
    866                 break
    867             time.sleep(0.5)
    868             count += 1
    869         else:
    870             self.fail("Subprocess didn't finish initialization")
    871         os.kill(proc.pid, event)
    872         # proc.send_signal(event) could also be done here.
    873         # Allow time for the signal to be passed and the process to exit.
    874         time.sleep(0.5)
    875         if not proc.poll():
    876             # Forcefully kill the process if we weren't able to signal it.
    877             os.kill(proc.pid, signal.SIGINT)
    878             self.fail("subprocess did not stop on {}".format(name))
    879 
    880     @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    881     def test_CTRL_C_EVENT(self):
    882         from ctypes import wintypes
    883         import ctypes
    884 
    885         # Make a NULL value by creating a pointer with no argument.
    886         NULL = ctypes.POINTER(ctypes.c_int)()
    887         SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
    888         SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
    889                                           wintypes.BOOL)
    890         SetConsoleCtrlHandler.restype = wintypes.BOOL
    891 
    892         # Calling this with NULL and FALSE causes the calling process to
    893         # handle Ctrl+C, rather than ignore it. This property is inherited
    894         # by subprocesses.
    895         SetConsoleCtrlHandler(NULL, 0)
    896 
    897         self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
    898 
    899     def test_CTRL_BREAK_EVENT(self):
    900         self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
    901 
    902 
    903 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    904 class Win32ListdirTests(unittest.TestCase):
    905     """Test listdir on Windows."""
    906 
    907     def setUp(self):
    908         self.created_paths = []
    909         for i in range(2):
    910             dir_name = 'SUB%d' % i
    911             dir_path = os.path.join(support.TESTFN, dir_name)
    912             file_name = 'FILE%d' % i
    913             file_path = os.path.join(support.TESTFN, file_name)
    914             os.makedirs(dir_path)
    915             with open(file_path, 'w') as f:
    916                 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
    917             self.created_paths.extend([dir_name, file_name])
    918         self.created_paths.sort()
    919 
    920     def tearDown(self):
    921         shutil.rmtree(support.TESTFN)
    922 
    923     def test_listdir_no_extended_path(self):
    924         """Test when the path is not an "extended" path."""
    925         # unicode
    926         fs_encoding = sys.getfilesystemencoding()
    927         self.assertEqual(
    928                 sorted(os.listdir(support.TESTFN.decode(fs_encoding))),
    929                 [path.decode(fs_encoding) for path in self.created_paths])
    930 
    931         # bytes
    932         self.assertEqual(
    933                 sorted(os.listdir(os.fsencode(support.TESTFN))),
    934                 self.created_paths)
    935 
    936     def test_listdir_extended_path(self):
    937         """Test when the path starts with '\\\\?\\'."""
    938         # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
    939         # unicode
    940         fs_encoding = sys.getfilesystemencoding()
    941         path = u'\\\\?\\' + os.path.abspath(support.TESTFN.decode(fs_encoding))
    942         self.assertEqual(
    943                 sorted(os.listdir(path)),
    944                 [path.decode(fs_encoding) for path in self.created_paths])
    945 
    946         # bytes
    947         path = b'\\\\?\\' + os.path.abspath(support.TESTFN)
    948         self.assertEqual(
    949                 sorted(os.listdir(path)),
    950                 self.created_paths)
    951 
    952 
    953 class SpawnTests(unittest.TestCase):
    954     def _test_invalid_env(self, spawn):
    955         args = [sys.executable, '-c', 'pass']
    956 
    957         # null character in the enviroment variable name
    958         newenv = os.environ.copy()
    959         newenv["FRUIT\0VEGETABLE"] = "cabbage"
    960         try:
    961             exitcode = spawn(os.P_WAIT, args[0], args, newenv)
    962         except TypeError:
    963             pass
    964         else:
    965             self.assertEqual(exitcode, 127)
    966 
    967         # null character in the enviroment variable value
    968         newenv = os.environ.copy()
    969         newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
    970         try:
    971             exitcode = spawn(os.P_WAIT, args[0], args, newenv)
    972         except TypeError:
    973             pass
    974         else:
    975             self.assertEqual(exitcode, 127)
    976 
    977         # equal character in the enviroment variable name
    978         newenv = os.environ.copy()
    979         newenv["FRUIT=ORANGE"] = "lemon"
    980         try:
    981             exitcode = spawn(os.P_WAIT, args[0], args, newenv)
    982         except ValueError:
    983             pass
    984         else:
    985             self.assertEqual(exitcode, 127)
    986 
    987         # equal character in the enviroment variable value
    988         filename = test_support.TESTFN
    989         self.addCleanup(test_support.unlink, filename)
    990         with open(filename, "w") as fp:
    991             fp.write('import sys, os\n'
    992                      'if os.getenv("FRUIT") != "orange=lemon":\n'
    993                      '    raise AssertionError')
    994         args = [sys.executable, filename]
    995         newenv = os.environ.copy()
    996         newenv["FRUIT"] = "orange=lemon"
    997         exitcode = spawn(os.P_WAIT, args[0], args, newenv)
    998         self.assertEqual(exitcode, 0)
    999 
   1000     @unittest.skipUnless(hasattr(os, 'spawnve'), 'test needs os.spawnve()')
   1001     def test_spawnve_invalid_env(self):
   1002         self._test_invalid_env(os.spawnve)
   1003 
   1004     @unittest.skipUnless(hasattr(os, 'spawnvpe'), 'test needs os.spawnvpe()')
   1005     def test_spawnvpe_invalid_env(self):
   1006         self._test_invalid_env(os.spawnvpe)
   1007 
   1008 
   1009 def test_main():
   1010     test_support.run_unittest(
   1011         FileTests,
   1012         TemporaryFileTests,
   1013         StatAttributeTests,
   1014         EnvironTests,
   1015         WalkTests,
   1016         MakedirTests,
   1017         DevNullTests,
   1018         URandomTests,
   1019         URandomFDTests,
   1020         ExecTests,
   1021         Win32ErrorTests,
   1022         TestInvalidFD,
   1023         PosixUidGidTests,
   1024         Win32KillTests,
   1025         SpawnTests,
   1026     )
   1027 
# Run the classes registered in test_main() when this file is executed
# directly as a script.
if __name__ == "__main__":
    test_main()
   1030