Home | History | Annotate | Download | only in test
      1 import unittest
      2 from test import test_support
      3 import subprocess
      4 import sys
      5 import signal
      6 import os
      7 import errno
      8 import tempfile
      9 import time
     10 import re
     11 import sysconfig
     12 
     13 try:
     14     import resource
     15 except ImportError:
     16     resource = None
     17 
     18 mswindows = (sys.platform == "win32")
     19 
     20 #
     21 # Depends on the following external programs: Python
     22 #
     23 
     24 if mswindows:
     25     SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), '
     26                                                 'os.O_BINARY);')
     27 else:
     28     SETBINARY = ''
     29 
     30 
     31 try:
     32     mkstemp = tempfile.mkstemp
     33 except AttributeError:
     34     # tempfile.mkstemp is not available
     35     def mkstemp():
     36         """Replacement for mkstemp, calling mktemp."""
     37         fname = tempfile.mktemp()
     38         return os.open(fname, os.O_RDWR|os.O_CREAT), fname
     39 
     40 
     41 class BaseTestCase(unittest.TestCase):
     42     def setUp(self):
     43         # Try to minimize the number of children we have so this test
     44         # doesn't crash on some buildbots (Alphas in particular).
     45         test_support.reap_children()
     46 
     47     def tearDown(self):
     48         for inst in subprocess._active:
     49             inst.wait()
     50         subprocess._cleanup()
     51         self.assertFalse(subprocess._active, "subprocess._active not empty")
     52 
     53     def assertStderrEqual(self, stderr, expected, msg=None):
     54         # In a debug build, stuff like "[6580 refs]" is printed to stderr at
     55         # shutdown time.  That frustrates tests trying to check stderr produced
     56         # from a spawned Python process.
     57         actual = re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr)
     58         self.assertEqual(actual, expected, msg)
     59 
     60 
     61 class PopenTestException(Exception):
     62     pass
     63 
     64 
     65 class PopenExecuteChildRaises(subprocess.Popen):
     66     """Popen subclass for testing cleanup of subprocess.PIPE filehandles when
     67     _execute_child fails.
     68     """
     69     def _execute_child(self, *args, **kwargs):
     70         raise PopenTestException("Forced Exception for Test")
     71 
     72 
     73 class ProcessTestCase(BaseTestCase):
     74 
     75     def test_call_seq(self):
     76         # call() function with sequence argument
     77         rc = subprocess.call([sys.executable, "-c",
     78                               "import sys; sys.exit(47)"])
     79         self.assertEqual(rc, 47)
     80 
     81     def test_check_call_zero(self):
     82         # check_call() function with zero return code
     83         rc = subprocess.check_call([sys.executable, "-c",
     84                                     "import sys; sys.exit(0)"])
     85         self.assertEqual(rc, 0)
     86 
     87     def test_check_call_nonzero(self):
     88         # check_call() function with non-zero return code
     89         with self.assertRaises(subprocess.CalledProcessError) as c:
     90             subprocess.check_call([sys.executable, "-c",
     91                                    "import sys; sys.exit(47)"])
     92         self.assertEqual(c.exception.returncode, 47)
     93 
     94     def test_check_output(self):
     95         # check_output() function with zero return code
     96         output = subprocess.check_output(
     97                 [sys.executable, "-c", "print 'BDFL'"])
     98         self.assertIn('BDFL', output)
     99 
    def test_check_output_nonzero(self):
        # check_output() raises CalledProcessError carrying the child's
        # non-zero exit status
        child_argv = [sys.executable, "-c", "import sys; sys.exit(5)"]
        with self.assertRaises(subprocess.CalledProcessError) as caught:
            subprocess.check_output(child_argv)
        self.assertEqual(caught.exception.returncode, 5)
    106 
    def test_check_output_stderr(self):
        # With stderr=STDOUT, text the child writes to stderr shows up in
        # check_output()'s return value
        child_argv = [sys.executable, "-c",
                      "import sys; sys.stderr.write('BDFL')"]
        captured = subprocess.check_output(child_argv,
                                           stderr=subprocess.STDOUT)
        self.assertIn('BDFL', captured)
    113 
    def test_check_output_stdout_arg(self):
        # check_output() must reject a caller-supplied 'stdout' argument by
        # raising ValueError with a message that mentions 'stdout'
        with self.assertRaises(ValueError) as c:
            subprocess.check_output(
                    [sys.executable, "-c", "print 'will not be run'"],
                    stdout=sys.stdout)
            # Only reached when the call above did not raise.
            self.fail("Expected ValueError when stdout arg supplied.")
        self.assertIn('stdout', c.exception.args[0])
    122 
    def test_call_kwargs(self):
        # call() forwards keyword arguments: the child sees the variable
        # added through env= and therefore exits with status 1 (True)
        child_env = os.environ.copy()
        child_env["FRUIT"] = "banana"
        child_code = ('import sys, os;'
                      'sys.exit(os.getenv("FRUIT")=="banana")')
        status = subprocess.call([sys.executable, "-c", child_code],
                                 env=child_env)
        self.assertEqual(status, 1)
    132 
    def test_invalid_args(self):
        # Popen() called with invalid arguments should raise TypeError
        # but Popen.__del__ should not complain (issue #12085)
        with test_support.captured_stderr() as captured:
            # An unknown keyword argument.
            self.assertRaises(TypeError, subprocess.Popen, invalid_arg_name=1)
            # More positional arguments than __init__ accepts.
            declared = subprocess.Popen.__init__.__code__.co_argcount
            surplus = [0 for _ in range(declared + 1)]
            self.assertRaises(TypeError, subprocess.Popen, *surplus)
        # Nothing may have been written to stderr while the calls failed.
        self.assertEqual(captured.getvalue(), '')
    142 
    def test_stdin_none(self):
        # Popen.stdin stays None when stdin is not redirected
        proc = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        for pipe in (proc.stdout, proc.stderr):
            self.addCleanup(pipe.close)
        proc.wait()
        self.assertEqual(proc.stdin, None)
    151 
    def test_stdout_none(self):
        # When stdout is not redirected, Popen.stdout is None and the child
        # inherits its parent's stdout.  To check that, a subprocess is run
        # inside a subprocess:
        # this_test
        #   \-- subprocess created by this test (parent)
        #          \-- subprocess created by the parent subprocess (child)
        # The parent leaves stdout unspecified and asserts that p.stdout is
        # None; the child's message must then arrive on the parent's stdout,
        # which this test captures through a pipe.  See #11963.
        parent_code = ('import sys; from subprocess import Popen, PIPE;'
                'p = Popen([sys.executable, "-c", "print \'test_stdout_none\'"],'
                '          stdin=PIPE, stderr=PIPE);'
                'p.wait(); assert p.stdout is None;')
        parent = subprocess.Popen([sys.executable, "-c", parent_code],
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE)
        self.addCleanup(parent.stdout.close)
        self.addCleanup(parent.stderr.close)
        captured_out, captured_err = parent.communicate()
        # The parent's stderr is used as the failure message.
        self.assertEqual(parent.returncode, 0, captured_err)
        self.assertEqual(captured_out.rstrip(), 'test_stdout_none')
    174 
    def test_stderr_none(self):
        # Popen.stderr stays None when stderr is not redirected
        proc = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE)
        for pipe in (proc.stdout, proc.stdin):
            self.addCleanup(pipe.close)
        proc.wait()
        self.assertEqual(proc.stderr, None)
    183 
    def test_executable_with_cwd(self):
        # executable= is what gets run even though argv[0] names a program
        # that does not exist; cwd= is the interpreter's own directory
        interpreter_dir = os.path.dirname(os.path.realpath(sys.executable))
        argv = ["somethingyoudonthave", "-c", "import sys; sys.exit(47)"]
        proc = subprocess.Popen(argv, executable=sys.executable,
                                cwd=interpreter_dir)
        proc.wait()
        self.assertEqual(proc.returncode, 47)
    191 
    @unittest.skipIf(sysconfig.is_python_build(),
                     "need an installed Python. See #7774")
    def test_executable_without_cwd(self):
        # With a normal installation, executable= works without the 'cwd'
        # argument.  For test runs in the build directory, see #7774.
        argv = ["somethingyoudonthave", "-c", "import sys; sys.exit(47)"]
        proc = subprocess.Popen(argv, executable=sys.executable)
        proc.wait()
        self.assertEqual(proc.returncode, 47)
    202 
    def test_stdin_pipe(self):
        # stdin redirection: data written to the stdin pipe reaches the
        # child, which exits with status 1 (True) when it read "pear"
        child_code = 'import sys; sys.exit(sys.stdin.read() == "pear")'
        proc = subprocess.Popen([sys.executable, "-c", child_code],
                                stdin=subprocess.PIPE)
        proc.stdin.write("pear")
        proc.stdin.close()
        proc.wait()
        self.assertEqual(proc.returncode, 1)
    212 
    def test_stdin_filedes(self):
        # stdin is set to open file descriptor
        tf = tempfile.TemporaryFile()
        # Close the temporary file even when an assertion below fails.
        self.addCleanup(tf.close)
        d = tf.fileno()
        os.write(d, "pear")
        # Rewind so the child reads the data from the start.
        os.lseek(d, 0, 0)
        p = subprocess.Popen([sys.executable, "-c",
                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
                         stdin=d)
        p.wait()
        # The child exits with True (1) when it read "pear".
        self.assertEqual(p.returncode, 1)
    224 
    def test_stdin_fileobj(self):
        # stdin is set to open file object
        tf = tempfile.TemporaryFile()
        # Close the temporary file even when an assertion below fails.
        self.addCleanup(tf.close)
        tf.write("pear")
        # Rewind so the child reads the data from the start.
        tf.seek(0)
        p = subprocess.Popen([sys.executable, "-c",
                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
                         stdin=tf)
        p.wait()
        # The child exits with True (1) when it read "pear".
        self.assertEqual(p.returncode, 1)
    235 
    def test_stdout_pipe(self):
        # stdout redirection: the child's output is readable from p.stdout
        child_code = 'import sys; sys.stdout.write("orange")'
        proc = subprocess.Popen([sys.executable, "-c", child_code],
                                stdout=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.assertEqual(proc.stdout.read(), "orange")
    243 
    def test_stdout_filedes(self):
        # stdout is set to open file descriptor
        tf = tempfile.TemporaryFile()
        # Close the temporary file even when an assertion below fails.
        self.addCleanup(tf.close)
        d = tf.fileno()
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys; sys.stdout.write("orange")'],
                         stdout=d)
        p.wait()
        # Rewind before reading back what the child wrote.
        os.lseek(d, 0, 0)
        self.assertEqual(os.read(d, 1024), "orange")
    254 
    def test_stdout_fileobj(self):
        # stdout is set to open file object
        tf = tempfile.TemporaryFile()
        # Close the temporary file even when an assertion below fails.
        self.addCleanup(tf.close)
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys; sys.stdout.write("orange")'],
                         stdout=tf)
        p.wait()
        # Rewind before reading back what the child wrote.
        tf.seek(0)
        self.assertEqual(tf.read(), "orange")
    264 
    def test_stderr_pipe(self):
        # stderr redirection: the child's error output is readable from
        # p.stderr
        child_code = 'import sys; sys.stderr.write("strawberry")'
        proc = subprocess.Popen([sys.executable, "-c", child_code],
                                stderr=subprocess.PIPE)
        self.addCleanup(proc.stderr.close)
        self.assertStderrEqual(proc.stderr.read(), "strawberry")
    272 
    def test_stderr_filedes(self):
        # stderr is set to open file descriptor
        tf = tempfile.TemporaryFile()
        # Close the temporary file even when an assertion below fails.
        self.addCleanup(tf.close)
        d = tf.fileno()
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys; sys.stderr.write("strawberry")'],
                         stderr=d)
        p.wait()
        # Rewind before reading back what the child wrote.
        os.lseek(d, 0, 0)
        self.assertStderrEqual(os.read(d, 1024), "strawberry")
    283 
    def test_stderr_fileobj(self):
        # stderr is set to open file object
        tf = tempfile.TemporaryFile()
        # Close the temporary file even when an assertion below fails.
        self.addCleanup(tf.close)
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys; sys.stderr.write("strawberry")'],
                         stderr=tf)
        p.wait()
        # Rewind before reading back what the child wrote.
        tf.seek(0)
        self.assertStderrEqual(tf.read(), "strawberry")
    293 
    def test_stdout_stderr_pipe(self):
        # capture stdout and stderr to the same pipe; the child flushes
        # stdout before writing to stderr
        child_code = ('import sys;'
                      'sys.stdout.write("apple");'
                      'sys.stdout.flush();'
                      'sys.stderr.write("orange")')
        proc = subprocess.Popen([sys.executable, "-c", child_code],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT)
        self.addCleanup(proc.stdout.close)
        combined = proc.stdout.read()
        self.assertStderrEqual(combined, "appleorange")
    305 
    def test_stdout_stderr_file(self):
        # capture stdout and stderr to the same open file
        tf = tempfile.TemporaryFile()
        # Close the temporary file even when an assertion below fails.
        self.addCleanup(tf.close)
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys;'
                          'sys.stdout.write("apple");'
                          'sys.stdout.flush();'
                          'sys.stderr.write("orange")'],
                         stdout=tf,
                         stderr=tf)
        p.wait()
        # Rewind before reading back what the child wrote.
        tf.seek(0)
        self.assertStderrEqual(tf.read(), "appleorange")
    319 
    def test_stdout_filedes_of_stdout(self):
        # stdout is set to 1 (#1531862).
        # To keep the text off this test's own stdout, the same nesting as
        # in test_stdout_none is used: the parent subprocess runs the child
        # with stdout=1, and this test captures the parent's output through
        # stdout=PIPE to check it.  See #11963.
        # The child exits with os.write()'s return value; the parent asserts
        # that it is 18.
        parent_code = ('import sys, subprocess; '
                'rc = subprocess.call([sys.executable, "-c", '
                '    "import os, sys; sys.exit(os.write(sys.stdout.fileno(), '
                     '\'test with stdout=1\'))"], stdout=1); '
                'assert rc == 18')
        parent = subprocess.Popen([sys.executable, "-c", parent_code],
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE)
        self.addCleanup(parent.stdout.close)
        self.addCleanup(parent.stderr.close)
        captured_out, captured_err = parent.communicate()
        # The parent's stderr is used as the failure message.
        self.assertEqual(parent.returncode, 0, captured_err)
        self.assertEqual(captured_out.rstrip(), 'test with stdout=1')
    338 
    def test_cwd(self):
        # The child started with cwd= reports that directory as its
        # working directory.
        tmpdir = tempfile.gettempdir()
        # We cannot use os.path.realpath to canonicalize the path,
        # since it doesn't expand Tru64 {memb} strings. See bug 1063571.
        cwd = os.getcwd()
        os.chdir(tmpdir)
        try:
            tmpdir = os.getcwd()
        finally:
            # Always restore the working directory for the following tests.
            os.chdir(cwd)
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys,os;'
                          'sys.stdout.write(os.getcwd())'],
                         stdout=subprocess.PIPE,
                         cwd=tmpdir)
        self.addCleanup(p.stdout.close)
        normcase = os.path.normcase
        self.assertEqual(normcase(p.stdout.read()), normcase(tmpdir))
    355 
    def test_env(self):
        # A variable added to the mapping passed as env= is visible in the
        # child's environment
        child_env = os.environ.copy()
        child_env["FRUIT"] = "orange"
        child_code = ('import sys,os;'
                      'sys.stdout.write(os.getenv("FRUIT"))')
        proc = subprocess.Popen([sys.executable, "-c", child_code],
                                stdout=subprocess.PIPE,
                                env=child_env)
        self.addCleanup(proc.stdout.close)
        self.assertEqual(proc.stdout.read(), "orange")
    366 
    def test_communicate_stdin(self):
        # communicate(input) delivers the data on the child's stdin; the
        # child exits with status 1 (True) when it read "pear"
        child_code = ('import sys;'
                      'sys.exit(sys.stdin.read() == "pear")')
        proc = subprocess.Popen([sys.executable, "-c", child_code],
                                stdin=subprocess.PIPE)
        proc.communicate("pear")
        self.assertEqual(proc.returncode, 1)
    374 
    def test_communicate_stdout(self):
        # With only stdout piped, communicate() returns the captured output
        # and None for stderr
        child_code = 'import sys; sys.stdout.write("pineapple")'
        proc = subprocess.Popen([sys.executable, "-c", child_code],
                                stdout=subprocess.PIPE)
        out, err = proc.communicate()
        self.assertEqual(out, "pineapple")
        self.assertEqual(err, None)
    382 
    def test_communicate_stderr(self):
        # With only stderr piped, communicate() returns None for stdout and
        # the captured error output
        child_code = 'import sys; sys.stderr.write("pineapple")'
        proc = subprocess.Popen([sys.executable, "-c", child_code],
                                stderr=subprocess.PIPE)
        out, err = proc.communicate()
        self.assertEqual(out, None)
        self.assertStderrEqual(err, "pineapple")
    390 
    def test_communicate(self):
        # communicate() feeds stdin and collects both output streams: the
        # child echoes stdin to stdout and writes a fixed word to stderr
        child_code = ('import sys,os;'
                      'sys.stderr.write("pineapple");'
                      'sys.stdout.write(sys.stdin.read())')
        proc = subprocess.Popen([sys.executable, "-c", child_code],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        for pipe in (proc.stdout, proc.stderr, proc.stdin):
            self.addCleanup(pipe.close)
        out, err = proc.communicate("banana")
        self.assertEqual(out, "banana")
        self.assertStderrEqual(err, "pineapple")
    405 
    # Test for the fd leak reported in http://bugs.python.org/issue2791.
    # This test is Linux specific for simplicity to at least have
    # some coverage.  It is not a platform specific bug.
    @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),
                         "Linux specific")
    def test_communicate_pipe_fd_leak(self):
        fd_directory = '/proc/%d/fd' % os.getpid()

        def count_open_fds():
            # Number of entries in this process's /proc fd directory.
            return len(os.listdir(fd_directory))

        baseline = count_open_fds()
        proc = subprocess.Popen([sys.executable, "-c", "print()"],
                                stdout=subprocess.PIPE)
        proc.communicate()
        after_communicate = count_open_fds()
        # Drop the only reference to the Popen object before counting again.
        del proc
        after_destruction = count_open_fds()
        self.assertEqual(baseline, after_destruction)
        self.assertEqual(baseline, after_communicate)
    422 
    def test_communicate_returns(self):
        # communicate() should return None for both streams if no
        # redirection is active
        child_argv = [sys.executable, "-c", "import sys; sys.exit(47)"]
        proc = subprocess.Popen(child_argv)
        out, err = proc.communicate()
        self.assertEqual(out, None)
        self.assertEqual(err, None)
    430 
    def test_communicate_pipe_buf(self):
        # communicate() with writes larger than pipe_buf
        # This test will probably deadlock rather than fail, if
        # communicate() does not work properly.
        # A throwaway pipe is opened only to ask the OS for PIPE_BUF.
        x, y = os.pipe()
        try:
            if mswindows:
                pipe_buf = 512
            else:
                pipe_buf = os.fpathconf(x, "PC_PIPE_BUF")
        finally:
            # Release both ends even if the query above raises.
            os.close(x)
            os.close(y)
        # The child echoes 47 bytes, writes 3*pipe_buf bytes to stderr, then
        # echoes the rest of stdin.
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys,os;'
                          'sys.stdout.write(sys.stdin.read(47));'
                          'sys.stderr.write("xyz"*%d);'
                          'sys.stdout.write(sys.stdin.read())' % pipe_buf],
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        self.addCleanup(p.stdin.close)
        string_to_write = "abc"*pipe_buf
        (stdout, stderr) = p.communicate(string_to_write)
        self.assertEqual(stdout, string_to_write)
    456 
    def test_writes_before_communicate(self):
        # Data written to p.stdin before communicate() is delivered ahead
        # of communicate()'s own input
        child_code = ('import sys,os;'
                      'sys.stdout.write(sys.stdin.read())')
        proc = subprocess.Popen([sys.executable, "-c", child_code],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        for pipe in (proc.stdout, proc.stderr, proc.stdin):
            self.addCleanup(pipe.close)
        proc.stdin.write("banana")
        out, err = proc.communicate("split")
        self.assertEqual(out, "bananasplit")
        self.assertStderrEqual(err, "")
    472 
    def test_universal_newlines(self):
        # The child writes six lines mixing \n, \r and \r\n terminators,
        # flushing between writes; the parent reads them from p.stdout
        # with universal_newlines enabled
        child_code = ''.join([
            'import sys,os;',
            SETBINARY,
            'sys.stdout.write("line1\\n");',
            'sys.stdout.flush();',
            'sys.stdout.write("line2\\r");',
            'sys.stdout.flush();',
            'sys.stdout.write("line3\\r\\n");',
            'sys.stdout.flush();',
            'sys.stdout.write("line4\\r");',
            'sys.stdout.flush();',
            'sys.stdout.write("\\nline5");',
            'sys.stdout.flush();',
            'sys.stdout.write("\\nline6");',
        ])
        proc = subprocess.Popen([sys.executable, "-c", child_code],
                                stdout=subprocess.PIPE,
                                universal_newlines=1)
        self.addCleanup(proc.stdout.close)
        text = proc.stdout.read()
        if hasattr(file, 'newlines'):
            # Interpreter with universal newline support
            expected = "line1\nline2\nline3\nline4\nline5\nline6"
        else:
            # Interpreter without universal newline support
            expected = "line1\nline2\rline3\r\nline4\r\nline5\nline6"
        self.assertEqual(text, expected)
    499 
    def test_universal_newlines_communicate(self):
        # universal newlines through communicate(): the child writes six
        # lines mixing \n, \r and \r\n terminators, flushing between writes
        child_code = ''.join([
            'import sys,os;',
            SETBINARY,
            'sys.stdout.write("line1\\n");',
            'sys.stdout.flush();',
            'sys.stdout.write("line2\\r");',
            'sys.stdout.flush();',
            'sys.stdout.write("line3\\r\\n");',
            'sys.stdout.flush();',
            'sys.stdout.write("line4\\r");',
            'sys.stdout.flush();',
            'sys.stdout.write("\\nline5");',
            'sys.stdout.flush();',
            'sys.stdout.write("\\nline6");',
        ])
        proc = subprocess.Popen([sys.executable, "-c", child_code],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                universal_newlines=1)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        text, _err = proc.communicate()
        if hasattr(file, 'newlines'):
            # Interpreter with universal newline support
            expected = "line1\nline2\nline3\nline4\nline5\nline6"
        else:
            # Interpreter without universal newline support
            expected = "line1\nline2\rline3\r\nline4\r\nline5\nline6"
        self.assertEqual(text, expected)
    528 
    def test_no_leaking(self):
        # Make sure we leak no resources
        if mswindows:
            max_handles = 2050 # too much for (at least some) Windows setups
        else:
            max_handles = 1026 # too much for most UNIX systems
        handles = []
        try:
            # Open descriptors until the process hits its limit (EMFILE).
            for _ in range(max_handles):
                try:
                    fd = os.open(test_support.TESTFN,
                                 os.O_WRONLY | os.O_CREAT)
                except OSError as e:
                    if e.errno == errno.EMFILE:
                        break
                    raise
                handles.append(fd)
            else:
                self.skipTest("failed to reach the file descriptor limit "
                    "(tried %d)" % max_handles)
            # Close a couple of them (should be enough for a subprocess)
            for _ in range(10):
                os.close(handles.pop())
            # Loop creating some subprocesses. If one of them leaks some fds,
            # the next loop iteration will fail by reaching the max fd limit.
            child_code = ("import sys;"
                          "sys.stdout.write(sys.stdin.read())")
            for _ in range(15):
                proc = subprocess.Popen([sys.executable, "-c", child_code],
                                        stdin=subprocess.PIPE,
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.PIPE)
                echoed = proc.communicate(b"lime")[0]
                self.assertEqual(echoed, b"lime")
        finally:
            # Release whatever descriptors are still held and remove the file.
            for fd in handles:
                os.close(fd)
            test_support.unlink(test_support.TESTFN)
    566 
    567     def test_list2cmdline(self):
    568         self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
    569                          '"a b c" d e')
    570         self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
    571                          'ab\\"c \\ d')
    572         self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']),
    573                          'ab\\"c " \\\\" d')
    574         self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
    575                          'a\\\\\\b "de fg" h')
    576         self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
    577                          'a\\\\\\"b c d')
    578         self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
    579                          '"a\\\\b c" d e')
    580         self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
    581                          '"a\\\\b\\ c" d e')
    582         self.assertEqual(subprocess.list2cmdline(['ab', '']),
    583                          'ab ""')
    584 
    585 
    def test_poll(self):
        # poll() returns None while the child (sleeping for one second) is
        # still running, and its exit status once it has finished
        proc = subprocess.Popen([sys.executable,
                                 "-c", "import time; time.sleep(1)"])
        polls = 0
        while True:
            if proc.poll() is not None:
                break
            time.sleep(0.1)
            polls += 1
        # We expect that the poll loop probably went around about 10 times,
        # but, based on system scheduling we can't control, it's possible
        # poll() never returned None.  It "should be" very rare that it
        # didn't go around at least twice.
        self.assertGreaterEqual(polls, 2)
        # Subsequent invocations should just return the returncode
        self.assertEqual(proc.poll(), 0)
    600 
    601 
    def test_wait(self):
        # wait() returns the exit status of the child (which sleeps for
        # two seconds) and keeps returning it afterwards
        proc = subprocess.Popen([sys.executable,
                                 "-c", "import time; time.sleep(2)"])
        first = proc.wait()
        self.assertEqual(first, 0)
        # Subsequent invocations should just return the returncode
        second = proc.wait()
        self.assertEqual(second, 0)
    608 
    609 
    def test_invalid_bufsize(self):
        # an invalid type of the bufsize argument (here a string passed as
        # the second positional argument) should raise TypeError.
        child_argv = [sys.executable, "-c", "pass"]
        self.assertRaises(TypeError, subprocess.Popen, child_argv, "orange")
    615 
    def test_leaking_fds_on_error(self):
        # see bug #5179: Popen leaks file descriptors to PIPEs if
        # the child fails to execute; this will eventually exhaust
        # the maximum number of open fds. 1024 seems a very common
        # value for that limit, but Windows has 2048, so we loop
        # 1024 times (each call leaked two fds).
        not_found_errnos = (errno.ENOENT, errno.EACCES)
        for _ in range(1024):
            # Windows raises IOError.  Others raise OSError.
            with self.assertRaises(EnvironmentError) as caught:
                subprocess.Popen(['nonexisting_i_hope'],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
            # ignore errors that indicate the command was not found
            if caught.exception.errno in not_found_errnos:
                continue
            raise caught.exception
    631 
    def test_handles_closed_on_exception(self):
        # If CreateProcess exits with an error, ensure the
        # duplicate output handles are released
        created = [mkstemp(), mkstemp(), mkstemp()]
        (ifhandle, ifname), (ofhandle, ofname), (efhandle, efname) = created
        try:
            subprocess.Popen(["*"], stdin=ifhandle, stdout=ofhandle,
                             stderr=efhandle)
        except OSError:
            # Close and delete each temporary file in creation order.
            for handle, name in created:
                os.close(handle)
                os.remove(name)
        # All three files must be gone.
        for _handle, name in created:
            self.assertFalse(os.path.exists(name))
    651 
    def test_communicate_epipe(self):
        # Issue 10963: communicate() should hide EPIPE
        #
        # The child runs 'pass' and exits without reading its stdin,
        # while the parent pushes a megabyte of data at it.
        pipe = subprocess.PIPE
        child = subprocess.Popen([sys.executable, "-c", 'pass'],
                                 stdin=pipe, stdout=pipe, stderr=pipe)
        for stream in (child.stdout, child.stderr, child.stdin):
            self.addCleanup(stream.close)
        payload = "x" * 2**20
        child.communicate(payload)
    662 
    def test_communicate_epipe_only_stdin(self):
        # Issue 10963: communicate() should hide EPIPE
        #
        # Same scenario as the three-pipe variant, but only stdin is
        # redirected.
        child = subprocess.Popen([sys.executable, "-c", 'pass'],
                                 stdin=subprocess.PIPE)
        self.addCleanup(child.stdin.close)
        # Presumably gives the child time to exit before the write.
        time.sleep(2)
        payload = "x" * 2**20
        child.communicate(payload)
    670 
    671     # This test is Linux-ish specific for simplicity to at least have
    672     # some coverage.  It is not a platform specific bug.
    @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),
                         "Linux specific")
    def test_failed_child_execute_fd_leak(self):
        """Test for the fork() failure fd leak reported in issue16327."""
        # Snapshot this process's open descriptors via /proc before and
        # after a Popen whose child execution raises, then compare.
        proc_fd_dir = '/proc/%d/fd' % os.getpid()
        open_fds_before = os.listdir(proc_fd_dir)
        pipe = subprocess.PIPE
        with self.assertRaises(PopenTestException):
            PopenExecuteChildRaises([sys.executable, '-c', 'pass'],
                                    stdin=pipe, stdout=pipe, stderr=pipe)

        # NOTE: This test doesn't verify that the real _execute_child
        # does not close the file descriptors itself on the way out
        # during an exception.  Code inspection has confirmed that.

        open_fds_after = os.listdir(proc_fd_dir)
        self.assertEqual(open_fds_before, open_fds_after)
    690 
    691 
    692 # context manager
    693 class _SuppressCoreFiles(object):
    694     """Try to prevent core files from being created."""
    695     old_limit = None
    696 
    697     def __enter__(self):
    698         """Try to save previous ulimit, then set it to (0, 0)."""
    699         if resource is not None:
    700             try:
    701                 self.old_limit = resource.getrlimit(resource.RLIMIT_CORE)
    702                 resource.setrlimit(resource.RLIMIT_CORE, (0, 0))
    703             except (ValueError, resource.error):
    704                 pass
    705 
    706         if sys.platform == 'darwin':
    707             # Check if the 'Crash Reporter' on OSX was configured
    708             # in 'Developer' mode and warn that it will get triggered
    709             # when it is.
    710             #
    711             # This assumes that this context manager is used in tests
    712             # that might trigger the next manager.
    713             value = subprocess.Popen(['/usr/bin/defaults', 'read',
    714                     'com.apple.CrashReporter', 'DialogType'],
    715                     stdout=subprocess.PIPE).communicate()[0]
    716             if value.strip() == b'developer':
    717                 print "this tests triggers the Crash Reporter, that is intentional"
    718                 sys.stdout.flush()
    719 
    720     def __exit__(self, *args):
    721         """Return core file behavior to default."""
    722         if self.old_limit is None:
    723             return
    724         if resource is not None:
    725             try:
    726                 resource.setrlimit(resource.RLIMIT_CORE, self.old_limit)
    727             except (ValueError, resource.error):
    728                 pass
    729 
    730     @unittest.skipUnless(hasattr(signal, 'SIGALRM'),
    731                          "Requires signal.SIGALRM")
    732     def test_communicate_eintr(self):
    733         # Issue #12493: communicate() should handle EINTR
    734         def handler(signum, frame):
    735             pass
    736         old_handler = signal.signal(signal.SIGALRM, handler)
    737         self.addCleanup(signal.signal, signal.SIGALRM, old_handler)
    738 
    739         # the process is running for 2 seconds
    740         args = [sys.executable, "-c", 'import time; time.sleep(2)']
    741         for stream in ('stdout', 'stderr'):
    742             kw = {stream: subprocess.PIPE}
    743             with subprocess.Popen(args, **kw) as process:
    744                 signal.alarm(1)
    745                 # communicate() will be interrupted by SIGALRM
    746                 process.communicate()
    747 
    748 
    749 @unittest.skipIf(mswindows, "POSIX specific tests")
    750 class POSIXProcessTestCase(BaseTestCase):
    751 
    752     def test_exceptions(self):
    753         # caught & re-raised exceptions
    754         with self.assertRaises(OSError) as c:
    755             p = subprocess.Popen([sys.executable, "-c", ""],
    756                                  cwd="/this/path/does/not/exist")
    757         # The attribute child_traceback should contain "os.chdir" somewhere.
    758         self.assertIn("os.chdir", c.exception.child_traceback)
    759 
    760     def test_run_abort(self):
    761         # returncode handles signal termination
    762         with _SuppressCoreFiles():
    763             p = subprocess.Popen([sys.executable, "-c",
    764                                   "import os; os.abort()"])
    765             p.wait()
    766         self.assertEqual(-p.returncode, signal.SIGABRT)
    767 
    768     def test_preexec(self):
    769         # preexec function
    770         p = subprocess.Popen([sys.executable, "-c",
    771                               "import sys, os;"
    772                               "sys.stdout.write(os.getenv('FRUIT'))"],
    773                              stdout=subprocess.PIPE,
    774                              preexec_fn=lambda: os.putenv("FRUIT", "apple"))
    775         self.addCleanup(p.stdout.close)
    776         self.assertEqual(p.stdout.read(), "apple")
    777 
    778     class _TestExecuteChildPopen(subprocess.Popen):
    779         """Used to test behavior at the end of _execute_child."""
    780         def __init__(self, testcase, *args, **kwargs):
    781             self._testcase = testcase
    782             subprocess.Popen.__init__(self, *args, **kwargs)
    783 
    784         def _execute_child(
    785                 self, args, executable, preexec_fn, close_fds, cwd, env,
    786                 universal_newlines, startupinfo, creationflags, shell,
    787                 p2cread, p2cwrite,
    788                 c2pread, c2pwrite,
    789                 errread, errwrite):
    790             try:
    791                 subprocess.Popen._execute_child(
    792                         self, args, executable, preexec_fn, close_fds,
    793                         cwd, env, universal_newlines,
    794                         startupinfo, creationflags, shell,
    795                         p2cread, p2cwrite,
    796                         c2pread, c2pwrite,
    797                         errread, errwrite)
    798             finally:
    799                 # Open a bunch of file descriptors and verify that
    800                 # none of them are the same as the ones the Popen
    801                 # instance is using for stdin/stdout/stderr.
    802                 devzero_fds = [os.open("/dev/zero", os.O_RDONLY)
    803                                for _ in range(8)]
    804                 try:
    805                     for fd in devzero_fds:
    806                         self._testcase.assertNotIn(
    807                                 fd, (p2cwrite, c2pread, errread))
    808                 finally:
    809                     map(os.close, devzero_fds)
    810 
    811     @unittest.skipIf(not os.path.exists("/dev/zero"), "/dev/zero required.")
    812     def test_preexec_errpipe_does_not_double_close_pipes(self):
    813         """Issue16140: Don't double close pipes on preexec error."""
    814 
    815         def raise_it():
    816             raise RuntimeError("force the _execute_child() errpipe_data path.")
    817 
    818         with self.assertRaises(RuntimeError):
    819             self._TestExecuteChildPopen(
    820                     self, [sys.executable, "-c", "pass"],
    821                     stdin=subprocess.PIPE, stdout=subprocess.PIPE,
    822                     stderr=subprocess.PIPE, preexec_fn=raise_it)
    823 
    824     def test_args_string(self):
    825         # args is a string
    826         f, fname = mkstemp()
    827         os.write(f, "#!/bin/sh\n")
    828         os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
    829                     sys.executable)
    830         os.close(f)
    831         os.chmod(fname, 0o700)
    832         p = subprocess.Popen(fname)
    833         p.wait()
    834         os.remove(fname)
    835         self.assertEqual(p.returncode, 47)
    836 
    837     def test_invalid_args(self):
    838         # invalid arguments should raise ValueError
    839         self.assertRaises(ValueError, subprocess.call,
    840                           [sys.executable, "-c",
    841                            "import sys; sys.exit(47)"],
    842                           startupinfo=47)
    843         self.assertRaises(ValueError, subprocess.call,
    844                           [sys.executable, "-c",
    845                            "import sys; sys.exit(47)"],
    846                           creationflags=47)
    847 
    848     def test_shell_sequence(self):
    849         # Run command through the shell (sequence)
    850         newenv = os.environ.copy()
    851         newenv["FRUIT"] = "apple"
    852         p = subprocess.Popen(["echo $FRUIT"], shell=1,
    853                              stdout=subprocess.PIPE,
    854                              env=newenv)
    855         self.addCleanup(p.stdout.close)
    856         self.assertEqual(p.stdout.read().strip(), "apple")
    857 
    858     def test_shell_string(self):
    859         # Run command through the shell (string)
    860         newenv = os.environ.copy()
    861         newenv["FRUIT"] = "apple"
    862         p = subprocess.Popen("echo $FRUIT", shell=1,
    863                              stdout=subprocess.PIPE,
    864                              env=newenv)
    865         self.addCleanup(p.stdout.close)
    866         self.assertEqual(p.stdout.read().strip(), "apple")
    867 
    868     def test_call_string(self):
    869         # call() function with string argument on UNIX
    870         f, fname = mkstemp()
    871         os.write(f, "#!/bin/sh\n")
    872         os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
    873                     sys.executable)
    874         os.close(f)
    875         os.chmod(fname, 0700)
    876         rc = subprocess.call(fname)
    877         os.remove(fname)
    878         self.assertEqual(rc, 47)
    879 
    880     def test_specific_shell(self):
    881         # Issue #9265: Incorrect name passed as arg[0].
    882         shells = []
    883         for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']:
    884             for name in ['bash', 'ksh']:
    885                 sh = os.path.join(prefix, name)
    886                 if os.path.isfile(sh):
    887                     shells.append(sh)
    888         if not shells: # Will probably work for any shell but csh.
    889             self.skipTest("bash or ksh required for this test")
    890         sh = '/bin/sh'
    891         if os.path.isfile(sh) and not os.path.islink(sh):
    892             # Test will fail if /bin/sh is a symlink to csh.
    893             shells.append(sh)
    894         for sh in shells:
    895             p = subprocess.Popen("echo $0", executable=sh, shell=True,
    896                                  stdout=subprocess.PIPE)
    897             self.addCleanup(p.stdout.close)
    898             self.assertEqual(p.stdout.read().strip(), sh)
    899 
    900     def _kill_process(self, method, *args):
    901         # Do not inherit file handles from the parent.
    902         # It should fix failures on some platforms.
    903         p = subprocess.Popen([sys.executable, "-c", """if 1:
    904                              import sys, time
    905                              sys.stdout.write('x\\n')
    906                              sys.stdout.flush()
    907                              time.sleep(30)
    908                              """],
    909                              close_fds=True,
    910                              stdin=subprocess.PIPE,
    911                              stdout=subprocess.PIPE,
    912                              stderr=subprocess.PIPE)
    913         # Wait for the interpreter to be completely initialized before
    914         # sending any signal.
    915         p.stdout.read(1)
    916         getattr(p, method)(*args)
    917         return p
    918 
    919     @unittest.skipIf(sys.platform.startswith(('netbsd', 'openbsd')),
    920                      "Due to known OS bug (issue #16762)")
    921     def _kill_dead_process(self, method, *args):
    922         # Do not inherit file handles from the parent.
    923         # It should fix failures on some platforms.
    924         p = subprocess.Popen([sys.executable, "-c", """if 1:
    925                              import sys, time
    926                              sys.stdout.write('x\\n')
    927                              sys.stdout.flush()
    928                              """],
    929                              close_fds=True,
    930                              stdin=subprocess.PIPE,
    931                              stdout=subprocess.PIPE,
    932                              stderr=subprocess.PIPE)
    933         # Wait for the interpreter to be completely initialized before
    934         # sending any signal.
    935         p.stdout.read(1)
    936         # The process should end after this
    937         time.sleep(1)
    938         # This shouldn't raise even though the child is now dead
    939         getattr(p, method)(*args)
    940         p.communicate()
    941 
    942     def test_send_signal(self):
    943         p = self._kill_process('send_signal', signal.SIGINT)
    944         _, stderr = p.communicate()
    945         self.assertIn('KeyboardInterrupt', stderr)
    946         self.assertNotEqual(p.wait(), 0)
    947 
    948     def test_kill(self):
    949         p = self._kill_process('kill')
    950         _, stderr = p.communicate()
    951         self.assertStderrEqual(stderr, '')
    952         self.assertEqual(p.wait(), -signal.SIGKILL)
    953 
    954     def test_terminate(self):
    955         p = self._kill_process('terminate')
    956         _, stderr = p.communicate()
    957         self.assertStderrEqual(stderr, '')
    958         self.assertEqual(p.wait(), -signal.SIGTERM)
    959 
    960     def test_send_signal_dead(self):
    961         # Sending a signal to a dead process
    962         self._kill_dead_process('send_signal', signal.SIGINT)
    963 
    964     def test_kill_dead(self):
    965         # Killing a dead process
    966         self._kill_dead_process('kill')
    967 
    968     def test_terminate_dead(self):
    969         # Terminating a dead process
    970         self._kill_dead_process('terminate')
    971 
    972     def check_close_std_fds(self, fds):
    973         # Issue #9905: test that subprocess pipes still work properly with
    974         # some standard fds closed
    975         stdin = 0
    976         newfds = []
    977         for a in fds:
    978             b = os.dup(a)
    979             newfds.append(b)
    980             if a == 0:
    981                 stdin = b
    982         try:
    983             for fd in fds:
    984                 os.close(fd)
    985             out, err = subprocess.Popen([sys.executable, "-c",
    986                               'import sys;'
    987                               'sys.stdout.write("apple");'
    988                               'sys.stdout.flush();'
    989                               'sys.stderr.write("orange")'],
    990                        stdin=stdin,
    991                        stdout=subprocess.PIPE,
    992                        stderr=subprocess.PIPE).communicate()
    993             err = test_support.strip_python_stderr(err)
    994             self.assertEqual((out, err), (b'apple', b'orange'))
    995         finally:
    996             for b, a in zip(newfds, fds):
    997                 os.dup2(b, a)
    998             for b in newfds:
    999                 os.close(b)
   1000 
   1001     def test_close_fd_0(self):
   1002         self.check_close_std_fds([0])
   1003 
   1004     def test_close_fd_1(self):
   1005         self.check_close_std_fds([1])
   1006 
   1007     def test_close_fd_2(self):
   1008         self.check_close_std_fds([2])
   1009 
   1010     def test_close_fds_0_1(self):
   1011         self.check_close_std_fds([0, 1])
   1012 
   1013     def test_close_fds_0_2(self):
   1014         self.check_close_std_fds([0, 2])
   1015 
   1016     def test_close_fds_1_2(self):
   1017         self.check_close_std_fds([1, 2])
   1018 
   1019     def test_close_fds_0_1_2(self):
   1020         # Issue #10806: test that subprocess pipes still work properly with
   1021         # all standard fds closed.
   1022         self.check_close_std_fds([0, 1, 2])
   1023 
   1024     def check_swap_fds(self, stdin_no, stdout_no, stderr_no):
   1025         # open up some temporary files
   1026         temps = [mkstemp() for i in range(3)]
   1027         temp_fds = [fd for fd, fname in temps]
   1028         try:
   1029             # unlink the files -- we won't need to reopen them
   1030             for fd, fname in temps:
   1031                 os.unlink(fname)
   1032 
   1033             # save a copy of the standard file descriptors
   1034             saved_fds = [os.dup(fd) for fd in range(3)]
   1035             try:
   1036                 # duplicate the temp files over the standard fd's 0, 1, 2
   1037                 for fd, temp_fd in enumerate(temp_fds):
   1038                     os.dup2(temp_fd, fd)
   1039 
   1040                 # write some data to what will become stdin, and rewind
   1041                 os.write(stdin_no, b"STDIN")
   1042                 os.lseek(stdin_no, 0, 0)
   1043 
   1044                 # now use those files in the given order, so that subprocess
   1045                 # has to rearrange them in the child
   1046                 p = subprocess.Popen([sys.executable, "-c",
   1047                     'import sys; got = sys.stdin.read();'
   1048                     'sys.stdout.write("got %s"%got); sys.stderr.write("err")'],
   1049                     stdin=stdin_no,
   1050                     stdout=stdout_no,
   1051                     stderr=stderr_no)
   1052                 p.wait()
   1053 
   1054                 for fd in temp_fds:
   1055                     os.lseek(fd, 0, 0)
   1056 
   1057                 out = os.read(stdout_no, 1024)
   1058                 err = test_support.strip_python_stderr(os.read(stderr_no, 1024))
   1059             finally:
   1060                 for std, saved in enumerate(saved_fds):
   1061                     os.dup2(saved, std)
   1062                     os.close(saved)
   1063 
   1064             self.assertEqual(out, b"got STDIN")
   1065             self.assertEqual(err, b"err")
   1066 
   1067         finally:
   1068             for fd in temp_fds:
   1069                 os.close(fd)
   1070 
   1071     # When duping fds, if there arises a situation where one of the fds is
   1072     # either 0, 1 or 2, it is possible that it is overwritten (#12607).
   1073     # This tests all combinations of this.
   1074     def test_swap_fds(self):
   1075         self.check_swap_fds(0, 1, 2)
   1076         self.check_swap_fds(0, 2, 1)
   1077         self.check_swap_fds(1, 0, 2)
   1078         self.check_swap_fds(1, 2, 0)
   1079         self.check_swap_fds(2, 0, 1)
   1080         self.check_swap_fds(2, 1, 0)
   1081 
   1082     def test_wait_when_sigchild_ignored(self):
   1083         # NOTE: sigchild_ignore.py may not be an effective test on all OSes.
   1084         sigchild_ignore = test_support.findfile("sigchild_ignore.py",
   1085                                                 subdir="subprocessdata")
   1086         p = subprocess.Popen([sys.executable, sigchild_ignore],
   1087                              stdout=subprocess.PIPE, stderr=subprocess.PIPE)
   1088         stdout, stderr = p.communicate()
   1089         self.assertEqual(0, p.returncode, "sigchild_ignore.py exited"
   1090                          " non-zero with this error:\n%s" % stderr)
   1091 
   1092     def test_zombie_fast_process_del(self):
   1093         # Issue #12650: on Unix, if Popen.__del__() was called before the
   1094         # process exited, it wouldn't be added to subprocess._active, and would
   1095         # remain a zombie.
   1096         # spawn a Popen, and delete its reference before it exits
   1097         p = subprocess.Popen([sys.executable, "-c",
   1098                               'import sys, time;'
   1099                               'time.sleep(0.2)'],
   1100                              stdout=subprocess.PIPE,
   1101                              stderr=subprocess.PIPE)
   1102         self.addCleanup(p.stdout.close)
   1103         self.addCleanup(p.stderr.close)
   1104         ident = id(p)
   1105         pid = p.pid
   1106         del p
   1107         # check that p is in the active processes list
   1108         self.assertIn(ident, [id(o) for o in subprocess._active])
   1109 
   1110     def test_leak_fast_process_del_killed(self):
   1111         # Issue #12650: on Unix, if Popen.__del__() was called before the
   1112         # process exited, and the process got killed by a signal, it would never
   1113         # be removed from subprocess._active, which triggered a FD and memory
   1114         # leak.
   1115         # spawn a Popen, delete its reference and kill it
   1116         p = subprocess.Popen([sys.executable, "-c",
   1117                               'import time;'
   1118                               'time.sleep(3)'],
   1119                              stdout=subprocess.PIPE,
   1120                              stderr=subprocess.PIPE)
   1121         self.addCleanup(p.stdout.close)
   1122         self.addCleanup(p.stderr.close)
   1123         ident = id(p)
   1124         pid = p.pid
   1125         del p
   1126         os.kill(pid, signal.SIGKILL)
   1127         # check that p is in the active processes list
   1128         self.assertIn(ident, [id(o) for o in subprocess._active])
   1129 
   1130         # let some time for the process to exit, and create a new Popen: this
   1131         # should trigger the wait() of p
   1132         time.sleep(0.2)
   1133         with self.assertRaises(EnvironmentError) as c:
   1134             with subprocess.Popen(['nonexisting_i_hope'],
   1135                                   stdout=subprocess.PIPE,
   1136                                   stderr=subprocess.PIPE) as proc:
   1137                 pass
   1138         # p should have been wait()ed on, and removed from the _active list
   1139         self.assertRaises(OSError, os.waitpid, pid, 0)
   1140         self.assertNotIn(ident, [id(o) for o in subprocess._active])
   1141 
   1142     def test_pipe_cloexec(self):
   1143         # Issue 12786: check that the communication pipes' FDs are set CLOEXEC,
   1144         # and are not inherited by another child process.
   1145         p1 = subprocess.Popen([sys.executable, "-c",
   1146                                'import os;'
   1147                                'os.read(0, 1)'
   1148                               ],
   1149                               stdin=subprocess.PIPE, stdout=subprocess.PIPE,
   1150                               stderr=subprocess.PIPE)
   1151 
   1152         p2 = subprocess.Popen([sys.executable, "-c", """if True:
   1153                                import os, errno, sys
   1154                                for fd in %r:
   1155                                    try:
   1156                                        os.close(fd)
   1157                                    except OSError as e:
   1158                                        if e.errno != errno.EBADF:
   1159                                            raise
   1160                                    else:
   1161                                        sys.exit(1)
   1162                                sys.exit(0)
   1163                                """ % [f.fileno() for f in (p1.stdin, p1.stdout,
   1164                                                            p1.stderr)]
   1165                               ],
   1166                               stdin=subprocess.PIPE, stdout=subprocess.PIPE,
   1167                               stderr=subprocess.PIPE, close_fds=False)
   1168         p1.communicate('foo')
   1169         _, stderr = p2.communicate()
   1170 
   1171         self.assertEqual(p2.returncode, 0, "Unexpected error: " + repr(stderr))
   1172 
   1173 
   1174 @unittest.skipUnless(mswindows, "Windows specific tests")
   1175 class Win32ProcessTestCase(BaseTestCase):
   1176 
   1177     def test_startupinfo(self):
   1178         # startupinfo argument
   1179         # We uses hardcoded constants, because we do not want to
   1180         # depend on win32all.
   1181         STARTF_USESHOWWINDOW = 1
   1182         SW_MAXIMIZE = 3
   1183         startupinfo = subprocess.STARTUPINFO()
   1184         startupinfo.dwFlags = STARTF_USESHOWWINDOW
   1185         startupinfo.wShowWindow = SW_MAXIMIZE
   1186         # Since Python is a console process, it won't be affected
   1187         # by wShowWindow, but the argument should be silently
   1188         # ignored
   1189         subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
   1190                         startupinfo=startupinfo)
   1191 
   1192     def test_creationflags(self):
   1193         # creationflags argument
   1194         CREATE_NEW_CONSOLE = 16
   1195         sys.stderr.write("    a DOS box should flash briefly ...\n")
   1196         subprocess.call(sys.executable +
   1197                         ' -c "import time; time.sleep(0.25)"',
   1198                         creationflags=CREATE_NEW_CONSOLE)
   1199 
   1200     def test_invalid_args(self):
   1201         # invalid arguments should raise ValueError
   1202         self.assertRaises(ValueError, subprocess.call,
   1203                           [sys.executable, "-c",
   1204                            "import sys; sys.exit(47)"],
   1205                           preexec_fn=lambda: 1)
   1206         self.assertRaises(ValueError, subprocess.call,
   1207                           [sys.executable, "-c",
   1208                            "import sys; sys.exit(47)"],
   1209                           stdout=subprocess.PIPE,
   1210                           close_fds=True)
   1211 
   1212     def test_close_fds(self):
   1213         # close file descriptors
   1214         rc = subprocess.call([sys.executable, "-c",
   1215                               "import sys; sys.exit(47)"],
   1216                               close_fds=True)
   1217         self.assertEqual(rc, 47)
   1218 
   1219     def test_shell_sequence(self):
   1220         # Run command through the shell (sequence)
   1221         newenv = os.environ.copy()
   1222         newenv["FRUIT"] = "physalis"
   1223         p = subprocess.Popen(["set"], shell=1,
   1224                              stdout=subprocess.PIPE,
   1225                              env=newenv)
   1226         self.addCleanup(p.stdout.close)
   1227         self.assertIn("physalis", p.stdout.read())
   1228 
   1229     def test_shell_string(self):
   1230         # Run command through the shell (string)
   1231         newenv = os.environ.copy()
   1232         newenv["FRUIT"] = "physalis"
   1233         p = subprocess.Popen("set", shell=1,
   1234                              stdout=subprocess.PIPE,
   1235                              env=newenv)
   1236         self.addCleanup(p.stdout.close)
   1237         self.assertIn("physalis", p.stdout.read())
   1238 
   1239     def test_call_string(self):
   1240         # call() function with string argument on Windows
   1241         rc = subprocess.call(sys.executable +
   1242                              ' -c "import sys; sys.exit(47)"')
   1243         self.assertEqual(rc, 47)
   1244 
   1245     def _kill_process(self, method, *args):
   1246         # Some win32 buildbot raises EOFError if stdin is inherited
   1247         p = subprocess.Popen([sys.executable, "-c", """if 1:
   1248                              import sys, time
   1249                              sys.stdout.write('x\\n')
   1250                              sys.stdout.flush()
   1251                              time.sleep(30)
   1252                              """],
   1253                              stdin=subprocess.PIPE,
   1254                              stdout=subprocess.PIPE,
   1255                              stderr=subprocess.PIPE)
   1256         self.addCleanup(p.stdout.close)
   1257         self.addCleanup(p.stderr.close)
   1258         self.addCleanup(p.stdin.close)
   1259         # Wait for the interpreter to be completely initialized before
   1260         # sending any signal.
   1261         p.stdout.read(1)
   1262         getattr(p, method)(*args)
   1263         _, stderr = p.communicate()
   1264         self.assertStderrEqual(stderr, '')
   1265         returncode = p.wait()
   1266         self.assertNotEqual(returncode, 0)
   1267 
   1268     def _kill_dead_process(self, method, *args):
   1269         p = subprocess.Popen([sys.executable, "-c", """if 1:
   1270                              import sys, time
   1271                              sys.stdout.write('x\\n')
   1272                              sys.stdout.flush()
   1273                              sys.exit(42)
   1274                              """],
   1275                              stdin=subprocess.PIPE,
   1276                              stdout=subprocess.PIPE,
   1277                              stderr=subprocess.PIPE)
   1278         self.addCleanup(p.stdout.close)
   1279         self.addCleanup(p.stderr.close)
   1280         self.addCleanup(p.stdin.close)
   1281         # Wait for the interpreter to be completely initialized before
   1282         # sending any signal.
   1283         p.stdout.read(1)
   1284         # The process should end after this
   1285         time.sleep(1)
   1286         # This shouldn't raise even though the child is now dead
   1287         getattr(p, method)(*args)
   1288         _, stderr = p.communicate()
   1289         self.assertStderrEqual(stderr, b'')
   1290         rc = p.wait()
   1291         self.assertEqual(rc, 42)
   1292 
   1293     def test_send_signal(self):
   1294         self._kill_process('send_signal', signal.SIGTERM)
   1295 
   1296     def test_kill(self):
   1297         self._kill_process('kill')
   1298 
   1299     def test_terminate(self):
   1300         self._kill_process('terminate')
   1301 
   1302     def test_send_signal_dead(self):
   1303         self._kill_dead_process('send_signal', signal.SIGTERM)
   1304 
   1305     def test_kill_dead(self):
   1306         self._kill_dead_process('kill')
   1307 
   1308     def test_terminate_dead(self):
   1309         self._kill_dead_process('terminate')
   1310 
   1311 
   1312 @unittest.skipUnless(getattr(subprocess, '_has_poll', False),
   1313                      "poll system call not supported")
   1314 class ProcessTestCaseNoPoll(ProcessTestCase):
   1315     def setUp(self):
   1316         subprocess._has_poll = False
   1317         ProcessTestCase.setUp(self)
   1318 
   1319     def tearDown(self):
   1320         subprocess._has_poll = True
   1321         ProcessTestCase.tearDown(self)
   1322 
   1323 
   1324 class HelperFunctionTests(unittest.TestCase):
   1325     @unittest.skipIf(mswindows, "errno and EINTR make no sense on windows")
   1326     def test_eintr_retry_call(self):
   1327         record_calls = []
   1328         def fake_os_func(*args):
   1329             record_calls.append(args)
   1330             if len(record_calls) == 2:
   1331                 raise OSError(errno.EINTR, "fake interrupted system call")
   1332             return tuple(reversed(args))
   1333 
   1334         self.assertEqual((999, 256),
   1335                          subprocess._eintr_retry_call(fake_os_func, 256, 999))
   1336         self.assertEqual([(256, 999)], record_calls)
   1337         # This time there will be an EINTR so it will loop once.
   1338         self.assertEqual((666,),
   1339                          subprocess._eintr_retry_call(fake_os_func, 666))
   1340         self.assertEqual([(256, 999), (666,), (666,)], record_calls)
   1341 
   1342 @unittest.skipUnless(mswindows, "mswindows only")
   1343 class CommandsWithSpaces (BaseTestCase):
   1344 
   1345     def setUp(self):
   1346         super(CommandsWithSpaces, self).setUp()
   1347         f, fname = mkstemp(".py", "te st")
   1348         self.fname = fname.lower ()
   1349         os.write(f, b"import sys;"
   1350                     b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))"
   1351         )
   1352         os.close(f)
   1353 
   1354     def tearDown(self):
   1355         os.remove(self.fname)
   1356         super(CommandsWithSpaces, self).tearDown()
   1357 
   1358     def with_spaces(self, *args, **kwargs):
   1359         kwargs['stdout'] = subprocess.PIPE
   1360         p = subprocess.Popen(*args, **kwargs)
   1361         self.addCleanup(p.stdout.close)
   1362         self.assertEqual(
   1363           p.stdout.read ().decode("mbcs"),
   1364           "2 [%r, 'ab cd']" % self.fname
   1365         )
   1366 
   1367     def test_shell_string_with_spaces(self):
   1368         # call() function with string argument with spaces on Windows
   1369         self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
   1370                                              "ab cd"), shell=1)
   1371 
   1372     def test_shell_sequence_with_spaces(self):
   1373         # call() function with sequence argument with spaces on Windows
   1374         self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1)
   1375 
   1376     def test_noshell_string_with_spaces(self):
   1377         # call() function with string argument with spaces on Windows
   1378         self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
   1379                              "ab cd"))
   1380 
   1381     def test_noshell_sequence_with_spaces(self):
   1382         # call() function with sequence argument with spaces on Windows
   1383         self.with_spaces([sys.executable, self.fname, "ab cd"])
   1384 
   1385 def test_main():
   1386     unit_tests = (ProcessTestCase,
   1387                   POSIXProcessTestCase,
   1388                   Win32ProcessTestCase,
   1389                   ProcessTestCaseNoPoll,
   1390                   HelperFunctionTests,
   1391                   CommandsWithSpaces)
   1392 
   1393     test_support.run_unittest(*unit_tests)
   1394     test_support.reap_children()
   1395 
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()
   1398