Home | History | Annotate | Download | only in test
      1 import unittest
      2 from test import test_support
      3 import subprocess
      4 import sys
      5 import signal
      6 import os
      7 import errno
      8 import tempfile
      9 import time
     10 import re
     11 import sysconfig
     12 
     13 try:
     14     import resource
     15 except ImportError:
     16     resource = None
     17 try:
     18     import threading
     19 except ImportError:
     20     threading = None
     21 
     22 mswindows = (sys.platform == "win32")
     23 
     24 #
     25 # Depends on the following external programs: Python
     26 #
     27 
     28 if mswindows:
     29     SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), '
     30                                                 'os.O_BINARY);')
     31 else:
     32     SETBINARY = ''
     33 
     34 
     35 class BaseTestCase(unittest.TestCase):
     36     def setUp(self):
     37         # Try to minimize the number of children we have so this test
     38         # doesn't crash on some buildbots (Alphas in particular).
     39         test_support.reap_children()
     40 
     41     def tearDown(self):
     42         for inst in subprocess._active:
     43             inst.wait()
     44         subprocess._cleanup()
     45         self.assertFalse(subprocess._active, "subprocess._active not empty")
     46 
     47     def assertStderrEqual(self, stderr, expected, msg=None):
     48         # In a debug build, stuff like "[6580 refs]" is printed to stderr at
     49         # shutdown time.  That frustrates tests trying to check stderr produced
     50         # from a spawned Python process.
     51         actual = re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr)
     52         self.assertEqual(actual, expected, msg)
     53 
     54 
     55 class PopenTestException(Exception):
     56     pass
     57 
     58 
     59 class PopenExecuteChildRaises(subprocess.Popen):
     60     """Popen subclass for testing cleanup of subprocess.PIPE filehandles when
     61     _execute_child fails.
     62     """
     63     def _execute_child(self, *args, **kwargs):
     64         raise PopenTestException("Forced Exception for Test")
     65 
     66 
     67 class ProcessTestCase(BaseTestCase):
     68 
     69     def test_call_seq(self):
     70         # call() function with sequence argument
     71         rc = subprocess.call([sys.executable, "-c",
     72                               "import sys; sys.exit(47)"])
     73         self.assertEqual(rc, 47)
     74 
     75     def test_check_call_zero(self):
     76         # check_call() function with zero return code
     77         rc = subprocess.check_call([sys.executable, "-c",
     78                                     "import sys; sys.exit(0)"])
     79         self.assertEqual(rc, 0)
     80 
     81     def test_check_call_nonzero(self):
     82         # check_call() function with non-zero return code
     83         with self.assertRaises(subprocess.CalledProcessError) as c:
     84             subprocess.check_call([sys.executable, "-c",
     85                                    "import sys; sys.exit(47)"])
     86         self.assertEqual(c.exception.returncode, 47)
     87 
     88     def test_check_output(self):
     89         # check_output() function with zero return code
     90         output = subprocess.check_output(
     91                 [sys.executable, "-c", "print 'BDFL'"])
     92         self.assertIn('BDFL', output)
     93 
     94     def test_check_output_nonzero(self):
     95         # check_call() function with non-zero return code
     96         with self.assertRaises(subprocess.CalledProcessError) as c:
     97             subprocess.check_output(
     98                     [sys.executable, "-c", "import sys; sys.exit(5)"])
     99         self.assertEqual(c.exception.returncode, 5)
    100 
    def test_check_output_stderr(self):
        # With stderr=STDOUT, text the child writes to stderr shows up in
        # the value returned by check_output().
        command = [sys.executable, "-c",
                   "import sys; sys.stderr.write('BDFL')"]
        captured = subprocess.check_output(command, stderr=subprocess.STDOUT)
        self.assertIn('BDFL', captured)
    107 
    def test_check_output_stdout_arg(self):
        # check_output() is expected to reject an explicit stdout argument
        # with a ValueError whose message mentions 'stdout'.
        with self.assertRaises(ValueError) as c:
            subprocess.check_output(
                    [sys.executable, "-c", "print 'will not be run'"],
                    stdout=sys.stdout)
            # Only reached when no exception was raised above.  The
            # AssertionError from fail() is not a ValueError, so it
            # propagates out of the assertRaises block with this message.
            self.fail("Expected ValueError when stdout arg supplied.")
        self.assertIn('stdout', c.exception.args[0])
    116 
    def test_call_kwargs(self):
        # call() passes keyword arguments (here: env) through to the child
        child_env = os.environ.copy()
        child_env["FRUIT"] = "banana"
        command = [sys.executable, "-c",
                   'import sys, os;'
                   'sys.exit(os.getenv("FRUIT")=="banana")']
        exit_status = subprocess.call(command, env=child_env)
        # The child exits with the comparison result, so status 1 means
        # it saw FRUIT=banana.
        self.assertEqual(exit_status, 1)
    126 
    def test_invalid_args(self):
        # Invalid arguments to Popen() should raise TypeError, and
        # Popen.__del__ should stay silent about it (issue #12085): nothing
        # may be written to stderr while the failed objects are discarded.
        with test_support.captured_stderr() as captured:
            self.assertRaises(TypeError, subprocess.Popen, invalid_arg_name=1)
            # One positional argument more than __init__ declares.
            declared = subprocess.Popen.__init__.__code__.co_argcount
            self.assertRaises(TypeError, subprocess.Popen,
                              *([0] * (declared + 1)))
        self.assertEqual(captured.getvalue(), '')
    136 
    def test_stdin_none(self):
        # Popen.stdin stays None when stdin is not redirected
        command = [sys.executable, "-c", 'print "banana"']
        proc = subprocess.Popen(command,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        proc.wait()
        self.assertEqual(proc.stdin, None)
    145 
    def test_stdout_none(self):
        # Popen.stdout is None when stdout is not redirected, and the
        # child then inherits stdout from its parent.  To observe that,
        # a subprocess is run inside a subprocess:
        # this_test
        #   \-- subprocess created by this test (parent)
        #          \-- subprocess created by the parent subprocess (child)
        # The parent leaves stdout unspecified, so the child writes to the
        # parent's stdout, which this test captures through a pipe.  The
        # parent itself asserts that the child's stdout attribute is None.
        # See #11963.
        parent_code = ('import sys; from subprocess import Popen, PIPE;'
                       'p = Popen([sys.executable, "-c", "print \'test_stdout_none\'"],'
                       '          stdin=PIPE, stderr=PIPE);'
                       'p.wait(); assert p.stdout is None;')
        parent = subprocess.Popen([sys.executable, "-c", parent_code],
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE)
        self.addCleanup(parent.stdout.close)
        self.addCleanup(parent.stderr.close)
        parent_out, parent_err = parent.communicate()
        # The parent's stderr doubles as the failure message.
        self.assertEqual(parent.returncode, 0, parent_err)
        self.assertEqual(parent_out.rstrip(), 'test_stdout_none')
    168 
    def test_stderr_none(self):
        # Popen.stderr stays None when stderr is not redirected
        command = [sys.executable, "-c", 'print "banana"']
        proc = subprocess.Popen(command,
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stdin.close)
        proc.wait()
        self.assertEqual(proc.stderr, None)
    177 
    def test_executable_with_cwd(self):
        # args[0] names a program that does not exist; the executable
        # argument supplies the real interpreter, and cwd is set to the
        # directory that contains it.
        interpreter_dir = os.path.dirname(os.path.realpath(sys.executable))
        bogus_argv = ["somethingyoudonthave", "-c",
                      "import sys; sys.exit(47)"]
        proc = subprocess.Popen(bogus_argv,
                                executable=sys.executable,
                                cwd=interpreter_dir)
        proc.wait()
        self.assertEqual(proc.returncode, 47)
    185 
    @unittest.skipIf(sysconfig.is_python_build(),
                     "need an installed Python. See #7774")
    def test_executable_without_cwd(self):
        # With a normal installation the executable argument should work
        # without 'cwd'.  Runs from the build directory are skipped, see
        # #7774.
        bogus_argv = ["somethingyoudonthave", "-c",
                      "import sys; sys.exit(47)"]
        proc = subprocess.Popen(bogus_argv, executable=sys.executable)
        proc.wait()
        self.assertEqual(proc.returncode, 47)
    196 
    def test_stdin_pipe(self):
        # Feed the child through a stdin pipe
        child_code = 'import sys; sys.exit(sys.stdin.read() == "pear")'
        proc = subprocess.Popen([sys.executable, "-c", child_code],
                                stdin=subprocess.PIPE)
        proc.stdin.write("pear")
        proc.stdin.close()
        proc.wait()
        # The child exits with the comparison result: 1 means it read "pear"
        self.assertEqual(proc.returncode, 1)
    206 
    def test_stdin_filedes(self):
        # stdin is set to open file descriptor
        tf = tempfile.TemporaryFile()
        # Close the temporary file deterministically, even when an
        # assertion below fails, instead of relying on garbage collection.
        self.addCleanup(tf.close)
        d = tf.fileno()
        os.write(d, "pear")
        # Rewind so the child reads the data from the beginning.
        os.lseek(d, 0, 0)
        p = subprocess.Popen([sys.executable, "-c",
                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
                         stdin=d)
        p.wait()
        # The child exits with the comparison result: 1 means it read "pear"
        self.assertEqual(p.returncode, 1)
    218 
    def test_stdin_fileobj(self):
        # stdin is set to open file object
        tf = tempfile.TemporaryFile()
        # Close the temporary file deterministically, even when an
        # assertion below fails, instead of relying on garbage collection.
        self.addCleanup(tf.close)
        tf.write("pear")
        # Rewind so the child reads the data from the beginning.
        tf.seek(0)
        p = subprocess.Popen([sys.executable, "-c",
                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
                         stdin=tf)
        p.wait()
        # The child exits with the comparison result: 1 means it read "pear"
        self.assertEqual(p.returncode, 1)
    229 
    def test_stdout_pipe(self):
        # Capture the child's stdout through a pipe
        child_code = 'import sys; sys.stdout.write("orange")'
        proc = subprocess.Popen([sys.executable, "-c", child_code],
                                stdout=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        captured = proc.stdout.read()
        self.assertEqual(captured, "orange")
    237 
    def test_stdout_filedes(self):
        # stdout is set to open file descriptor
        tf = tempfile.TemporaryFile()
        # Close the temporary file deterministically, even when an
        # assertion below fails, instead of relying on garbage collection.
        self.addCleanup(tf.close)
        d = tf.fileno()
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys; sys.stdout.write("orange")'],
                         stdout=d)
        p.wait()
        # Rewind before reading back what the child wrote.
        os.lseek(d, 0, 0)
        self.assertEqual(os.read(d, 1024), "orange")
    248 
    def test_stdout_fileobj(self):
        # stdout is set to open file object
        tf = tempfile.TemporaryFile()
        # Close the temporary file deterministically, even when an
        # assertion below fails, instead of relying on garbage collection.
        self.addCleanup(tf.close)
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys; sys.stdout.write("orange")'],
                         stdout=tf)
        p.wait()
        # Rewind before reading back what the child wrote.
        tf.seek(0)
        self.assertEqual(tf.read(), "orange")
    258 
    def test_stderr_pipe(self):
        # Capture the child's stderr through a pipe
        child_code = 'import sys; sys.stderr.write("strawberry")'
        proc = subprocess.Popen([sys.executable, "-c", child_code],
                                stderr=subprocess.PIPE)
        self.addCleanup(proc.stderr.close)
        captured = proc.stderr.read()
        self.assertStderrEqual(captured, "strawberry")
    266 
    def test_stderr_filedes(self):
        # stderr is set to open file descriptor
        tf = tempfile.TemporaryFile()
        # Close the temporary file deterministically, even when an
        # assertion below fails, instead of relying on garbage collection.
        self.addCleanup(tf.close)
        d = tf.fileno()
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys; sys.stderr.write("strawberry")'],
                         stderr=d)
        p.wait()
        # Rewind before reading back what the child wrote.
        os.lseek(d, 0, 0)
        self.assertStderrEqual(os.read(d, 1024), "strawberry")
    277 
    def test_stderr_fileobj(self):
        # stderr is set to open file object
        tf = tempfile.TemporaryFile()
        # Close the temporary file deterministically, even when an
        # assertion below fails, instead of relying on garbage collection.
        self.addCleanup(tf.close)
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys; sys.stderr.write("strawberry")'],
                         stderr=tf)
        p.wait()
        # Rewind before reading back what the child wrote.
        tf.seek(0)
        self.assertStderrEqual(tf.read(), "strawberry")
    287 
    def test_stderr_redirect_with_no_stdout_redirect(self):
        # Exercise stderr=STDOUT while stdout is left unset (None).
        #
        # - the grandchild writes to stderr
        # - the child sends the grandchild's stderr to its own stdout
        # - so this test should find the grandchild's stderr text on the
        #   child's stdout
        child_code = ('import sys, subprocess;'
                      'rc = subprocess.call([sys.executable, "-c",'
                      '    "import sys;"'
                      '    "sys.stderr.write(\'42\')"],'
                      '    stderr=subprocess.STDOUT);'
                      'sys.exit(rc)')
        child = subprocess.Popen([sys.executable, "-c", child_code],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        child_out, child_err = child.communicate()
        # The child's stdout carries the grandchild's stderr ...
        self.assertStderrEqual(child_out, b'42')
        # ... while the child's own stderr should be empty.
        self.assertStderrEqual(child_err, b'')
        self.assertEqual(child.returncode, 0)
    308 
    def test_stdout_stderr_pipe(self):
        # stdout and stderr are both captured through one pipe
        child_code = ('import sys;'
                      'sys.stdout.write("apple");'
                      'sys.stdout.flush();'
                      'sys.stderr.write("orange")')
        proc = subprocess.Popen([sys.executable, "-c", child_code],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT)
        self.addCleanup(proc.stdout.close)
        merged = proc.stdout.read()
        self.assertStderrEqual(merged, "appleorange")
    320 
    def test_stdout_stderr_file(self):
        # capture stdout and stderr to the same open file
        tf = tempfile.TemporaryFile()
        # Close the temporary file deterministically, even when an
        # assertion below fails, instead of relying on garbage collection.
        self.addCleanup(tf.close)
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys;'
                          'sys.stdout.write("apple");'
                          'sys.stdout.flush();'
                          'sys.stderr.write("orange")'],
                         stdout=tf,
                         stderr=tf)
        p.wait()
        # Rewind before reading back what the child wrote.
        tf.seek(0)
        self.assertStderrEqual(tf.read(), "appleorange")
    334 
    def test_stdout_filedes_of_stdout(self):
        # stdout is given as file descriptor 1 (#1531862).
        # To keep the text off the real stdout, this follows the same
        # nesting idea as test_stdout_none: the parent subprocess starts
        # the child with stdout=1, and this test runs the parent with
        # stdout=PIPE so the output can be captured and checked.
        # See #11963.
        #
        # The child exits with the return value of os.write(), i.e. the
        # 18 bytes of 'test with stdout=1', which the parent asserts on.
        parent_code = ('import sys, subprocess; '
                       'rc = subprocess.call([sys.executable, "-c", '
                       '    "import os, sys; sys.exit(os.write(sys.stdout.fileno(), '
                            '\'test with stdout=1\'))"], stdout=1); '
                       'assert rc == 18')
        parent = subprocess.Popen([sys.executable, "-c", parent_code],
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE)
        self.addCleanup(parent.stdout.close)
        self.addCleanup(parent.stderr.close)
        parent_out, parent_err = parent.communicate()
        # The parent's stderr doubles as the failure message.
        self.assertEqual(parent.returncode, 0, parent_err)
        self.assertEqual(parent_out.rstrip(), 'test with stdout=1')
    353 
    def test_cwd(self):
        # A child started with cwd=<dir> should report <dir> from
        # os.getcwd().
        tmpdir = tempfile.gettempdir()
        # We cannot use os.path.realpath to canonicalize the path,
        # since it doesn't expand Tru64 {memb} strings. See bug 1063571.
        # Instead, chdir into the directory and ask the OS what it is called.
        cwd = os.getcwd()
        try:
            os.chdir(tmpdir)
            tmpdir = os.getcwd()
        finally:
            # Always return to the original directory, even if one of the
            # calls above raises, so later tests are not affected.
            os.chdir(cwd)
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys,os;'
                          'sys.stdout.write(os.getcwd())'],
                         stdout=subprocess.PIPE,
                         cwd=tmpdir)
        self.addCleanup(p.stdout.close)
        # Compare case-normalized paths.
        normcase = os.path.normcase
        self.assertEqual(normcase(p.stdout.read()), normcase(tmpdir))
    370 
    def test_env(self):
        # The env argument becomes the child's environment
        child_env = os.environ.copy()
        child_env["FRUIT"] = "orange"
        child_code = ('import sys,os;'
                      'sys.stdout.write(os.getenv("FRUIT"))')
        proc = subprocess.Popen([sys.executable, "-c", child_code],
                                stdout=subprocess.PIPE,
                                env=child_env)
        self.addCleanup(proc.stdout.close)
        reported = proc.stdout.read()
        self.assertEqual(reported, "orange")
    381 
    def test_communicate_stdin(self):
        # communicate() delivers its argument on the child's stdin
        child_code = ('import sys;'
                      'sys.exit(sys.stdin.read() == "pear")')
        proc = subprocess.Popen([sys.executable, "-c", child_code],
                                stdin=subprocess.PIPE)
        proc.communicate("pear")
        # The child exits with the comparison result: 1 means it read "pear"
        self.assertEqual(proc.returncode, 1)
    389 
    def test_communicate_stdout(self):
        # Only stdout is piped, so communicate() yields (data, None)
        child_code = 'import sys; sys.stdout.write("pineapple")'
        proc = subprocess.Popen([sys.executable, "-c", child_code],
                                stdout=subprocess.PIPE)
        out_data, err_data = proc.communicate()
        self.assertEqual(out_data, "pineapple")
        self.assertEqual(err_data, None)
    397 
    def test_communicate_stderr(self):
        # Only stderr is piped, so communicate() yields (None, data)
        child_code = 'import sys; sys.stderr.write("pineapple")'
        proc = subprocess.Popen([sys.executable, "-c", child_code],
                                stderr=subprocess.PIPE)
        out_data, err_data = proc.communicate()
        self.assertEqual(out_data, None)
        self.assertStderrEqual(err_data, "pineapple")
    405 
    def test_communicate(self):
        # All three streams piped: the child echoes stdin to stdout and
        # writes a fixed word to stderr.
        child_code = ('import sys,os;'
                      'sys.stderr.write("pineapple");'
                      'sys.stdout.write(sys.stdin.read())')
        proc = subprocess.Popen([sys.executable, "-c", child_code],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)
        out_data, err_data = proc.communicate("banana")
        self.assertEqual(out_data, "banana")
        self.assertStderrEqual(err_data, "pineapple")
    420 
    # This test is Linux specific for simplicity to at least have
    # some coverage.  It is not a platform specific bug.
    @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),
                         "Linux specific")
    # Test for the fd leak reported in http://bugs.python.org/issue2791.
    def test_communicate_pipe_fd_leak(self):
        # The number of entries in /proc/<pid>/fd is used as the count of
        # file descriptors this process currently has open.
        fd_directory = '/proc/%d/fd' % os.getpid()
        num_fds_before_popen = len(os.listdir(fd_directory))
        p = subprocess.Popen([sys.executable, "-c", "print('')"],
                             stdout=subprocess.PIPE)
        p.communicate()
        # Sample once while the Popen object is still alive ...
        num_fds_after_communicate = len(os.listdir(fd_directory))
        # ... and once more after this function's reference to it is gone.
        del p
        num_fds_after_destruction = len(os.listdir(fd_directory))
        # Both samples must be back at the starting count: neither
        # communicate() nor dropping the object may leave descriptors open.
        self.assertEqual(num_fds_before_popen, num_fds_after_destruction)
        self.assertEqual(num_fds_before_popen, num_fds_after_communicate)
    437 
    def test_communicate_returns(self):
        # Without any redirection communicate() should return (None, None)
        command = [sys.executable, "-c", "import sys; sys.exit(47)"]
        proc = subprocess.Popen(command)
        out_data, err_data = proc.communicate()
        self.assertEqual(out_data, None)
        self.assertEqual(err_data, None)
    445 
    def test_communicate_pipe_buf(self):
        # communicate() with writes larger than pipe_buf
        # This test will probably deadlock rather than fail, if
        # communicate() does not work properly.
        #
        # A throwaway pipe is opened only to query PC_PIPE_BUF; on Windows
        # a fixed value of 512 is used instead.
        x, y = os.pipe()
        try:
            if mswindows:
                pipe_buf = 512
            else:
                pipe_buf = os.fpathconf(x, "PC_PIPE_BUF")
        finally:
            # Release the probe pipe even if fpathconf() raises, so its
            # descriptors are not leaked into the following tests.
            os.close(x)
            os.close(y)
        # The child reads a little, writes 3 * pipe_buf bytes to stderr,
        # then echoes the rest of stdin to stdout.
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys,os;'
                          'sys.stdout.write(sys.stdin.read(47));'
                          'sys.stderr.write("xyz"*%d);'
                          'sys.stdout.write(sys.stdin.read())' % pipe_buf],
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        self.addCleanup(p.stdin.close)
        string_to_write = "abc"*pipe_buf
        # Only stdout is checked; the stderr data is read and discarded.
        stdout = p.communicate(string_to_write)[0]
        self.assertEqual(stdout, string_to_write)
    471 
    def test_writes_before_communicate(self):
        # Data written to stdin directly, followed by more data passed to
        # communicate(), should reach the child as one stream.
        child_code = ('import sys,os;'
                      'sys.stdout.write(sys.stdin.read())')
        proc = subprocess.Popen([sys.executable, "-c", child_code],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)
        proc.stdin.write("banana")
        out_data, err_data = proc.communicate("split")
        self.assertEqual(out_data, "bananasplit")
        self.assertStderrEqual(err_data, "")
    487 
    def test_universal_newlines(self):
        # Read a mix of \n, \r and \r\n line endings from a stdout pipe
        # opened with universal_newlines.
        child_code = ('import sys,os;' + SETBINARY +
                      'sys.stdout.write("line1\\n");'
                      'sys.stdout.flush();'
                      'sys.stdout.write("line2\\r");'
                      'sys.stdout.flush();'
                      'sys.stdout.write("line3\\r\\n");'
                      'sys.stdout.flush();'
                      'sys.stdout.write("line4\\r");'
                      'sys.stdout.flush();'
                      'sys.stdout.write("\\nline5");'
                      'sys.stdout.flush();'
                      'sys.stdout.write("\\nline6");')
        proc = subprocess.Popen([sys.executable, "-c", child_code],
                                stdout=subprocess.PIPE,
                                universal_newlines=1)
        self.addCleanup(proc.stdout.close)
        received = proc.stdout.read()
        if hasattr(file, 'newlines'):
            # Interpreter with universal newline support
            expected = "line1\nline2\nline3\nline4\nline5\nline6"
        else:
            # Interpreter without universal newline support
            expected = "line1\nline2\rline3\r\nline4\r\nline5\nline6"
        self.assertEqual(received, expected)
    514 
    def test_universal_newlines_communicate(self):
        # Same mix of line endings as test_universal_newlines, but the
        # output is collected through communicate().
        child_code = ('import sys,os;' + SETBINARY +
                      'sys.stdout.write("line1\\n");'
                      'sys.stdout.flush();'
                      'sys.stdout.write("line2\\r");'
                      'sys.stdout.flush();'
                      'sys.stdout.write("line3\\r\\n");'
                      'sys.stdout.flush();'
                      'sys.stdout.write("line4\\r");'
                      'sys.stdout.flush();'
                      'sys.stdout.write("\\nline5");'
                      'sys.stdout.flush();'
                      'sys.stdout.write("\\nline6");')
        proc = subprocess.Popen([sys.executable, "-c", child_code],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                universal_newlines=1)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        received, _unused_err = proc.communicate()
        if hasattr(file, 'newlines'):
            # Interpreter with universal newline support
            expected = "line1\nline2\nline3\nline4\nline5\nline6"
        else:
            # Interpreter without universal newline support
            expected = "line1\nline2\rline3\r\nline4\r\nline5\nline6"
        self.assertEqual(received, expected)
    543 
    def test_no_leaking(self):
        # Make sure we leak no resources
        #
        # Strategy: open files until the process hits its descriptor limit
        # (EMFILE), free a handful, then run several subprocesses in a
        # row.  If any of them leaked descriptors, a later iteration would
        # run out again and fail.
        if not mswindows:
            max_handles = 1026 # too much for most UNIX systems
        else:
            max_handles = 2050 # too much for (at least some) Windows setups
        handles = []
        try:
            for i in range(max_handles):
                try:
                    handles.append(os.open(test_support.TESTFN,
                                           os.O_WRONLY | os.O_CREAT))
                except OSError as e:
                    # Only "too many open files" ends the loop; any other
                    # error is a real failure.
                    if e.errno != errno.EMFILE:
                        raise
                    break
            else:
                # The loop finished without ever hitting EMFILE, so the
                # precondition for this test cannot be established.
                self.skipTest("failed to reach the file descriptor limit "
                    "(tried %d)" % max_handles)
            # Close a couple of them (should be enough for a subprocess)
            for i in range(10):
                os.close(handles.pop())
            # Loop creating some subprocesses. If one of them leaks some fds,
            # the next loop iteration will fail by reaching the max fd limit.
            for i in range(15):
                p = subprocess.Popen([sys.executable, "-c",
                                      "import sys;"
                                      "sys.stdout.write(sys.stdin.read())"],
                                     stdin=subprocess.PIPE,
                                     stdout=subprocess.PIPE,
                                     stderr=subprocess.PIPE)
                data = p.communicate(b"lime")[0]
                self.assertEqual(data, b"lime")
        finally:
            # Release every descriptor still held and remove the scratch
            # file, whether the test passed, failed or was skipped.
            for h in handles:
                os.close(h)
            test_support.unlink(test_support.TESTFN)
    581 
    582     def test_list2cmdline(self):
    583         self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
    584                          '"a b c" d e')
    585         self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
    586                          'ab\\"c \\ d')
    587         self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']),
    588                          'ab\\"c " \\\\" d')
    589         self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
    590                          'a\\\\\\b "de fg" h')
    591         self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
    592                          'a\\\\\\"b c d')
    593         self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
    594                          '"a\\\\b c" d e')
    595         self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
    596                          '"a\\\\b\\ c" d e')
    597         self.assertEqual(subprocess.list2cmdline(['ab', '']),
    598                          'ab ""')
    599 
    600 
    def test_poll(self):
        # poll() returns None while the child runs and its return code
        # once it has finished.
        sleeper = subprocess.Popen(
            [sys.executable, "-c", "import time; time.sleep(1)"])
        rounds = 0
        while sleeper.poll() is None:
            time.sleep(0.1)
            rounds += 1
        # The loop probably went around about 10 times, but scheduling is
        # outside our control, so in principle poll() might never have
        # returned None.  It "should be" very rare for the loop to run
        # fewer than two times.
        self.assertGreaterEqual(rounds, 2)
        # Later calls simply report the return code again
        self.assertEqual(sleeper.poll(), 0)
    615 
    616 
    def test_wait(self):
        # wait() blocks until the child exits and returns its return code
        sleeper = subprocess.Popen(
            [sys.executable, "-c", "import time; time.sleep(2)"])
        first_result = sleeper.wait()
        self.assertEqual(first_result, 0)
        # Later calls simply report the return code again
        second_result = sleeper.wait()
        self.assertEqual(second_result, 0)
    623 
    624 
    625     def test_invalid_bufsize(self):
    626         # an invalid type of the bufsize argument should raise
    627         # TypeError.
    628         with self.assertRaises(TypeError):
    629             subprocess.Popen([sys.executable, "-c", "pass"], "orange")
    630 
    def test_leaking_fds_on_error(self):
        # See bug #5179: Popen leaked the file descriptors of its PIPEs
        # when the child failed to execute, which eventually exhausts the
        # maximum number of open fds.  1024 seems a very common value for
        # that limit, but Windows has 2048, so the loop runs 1024 times
        # (each call leaked two fds).
        not_found_errnos = (errno.ENOENT, errno.EACCES)
        for _ in range(1024):
            # Windows raises IOError.  Others raise OSError.
            with self.assertRaises(EnvironmentError) as caught:
                subprocess.Popen(['nonexisting_i_hope'],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
            # Errors meaning "command not found" are expected; anything
            # else is re-raised.
            if caught.exception.errno not in not_found_errnos:
                raise caught.exception
    646 
    @unittest.skipIf(threading is None, "threading required")
    def test_double_close_on_error(self):
        # Issue #18851
        grabbed = []

        def grab_pipes():
            # Keep allocating pipe fds while the Popen below is failing.
            for _ in range(20):
                grabbed.extend(os.pipe())
                time.sleep(0.001)

        worker = threading.Thread(target=grab_pipes)
        worker.start()
        try:
            with self.assertRaises(EnvironmentError):
                subprocess.Popen(['nonexisting_i_hope'],
                                 stdin=subprocess.PIPE,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        finally:
            worker.join()
            last_error = None
            for fd in grabbed:
                # If a double close occurred, some of those fds will
                # already have been closed by mistake, and os.close()
                # here will raise.
                try:
                    os.close(fd)
                except OSError as err:
                    last_error = err
            # Every fd has been attempted; now surface the last failure.
            if last_error is not None:
                raise last_error
    676 
    def test_handles_closed_on_exception(self):
        # If CreateProcess exits with an error, ensure the
        # duplicate output handles are released
        temps = [tempfile.mkstemp() for _ in range(3)]
        (ifhandle, _), (ofhandle, _), (efhandle, _) = temps
        try:
            subprocess.Popen(["*"], stdin=ifhandle, stdout=ofhandle,
                             stderr=efhandle)
        except OSError:
            # Close and delete each temp file in turn: stdin, stdout, stderr.
            for handle, name in temps:
                os.close(handle)
                os.remove(name)
        for _, name in temps:
            self.assertFalse(os.path.exists(name))
    696 
    def test_communicate_epipe(self):
        # Issue 10963: communicate() should hide EPIPE
        child = subprocess.Popen([sys.executable, "-c", 'pass'],
                                 stdin=subprocess.PIPE,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        for stream in (child.stdout, child.stderr, child.stdin):
            self.addCleanup(stream.close)
        # The child never reads its stdin, so this large write is what
        # may run into a broken pipe.
        payload = "x" * 2**20
        child.communicate(payload)
    707 
    def test_communicate_epipe_only_stdin(self):
        # Issue 10963: communicate() should hide EPIPE
        p = subprocess.Popen([sys.executable, "-c", 'pass'],
                             stdin=subprocess.PIPE)
        self.addCleanup(p.stdin.close)
        # Wait for the child to exit rather than sleeping for a fixed
        # two seconds.  The child runs 'pass' and never reads stdin, so
        # wait() cannot deadlock; once it returns the child no longer
        # holds the read end of the pipe, so the write below exercises
        # the broken-pipe path deterministically and without the delay.
        p.wait()
        p.communicate("x" * 2**20)
    715 
    716     # This test is Linux-ish specific for simplicity to at least have
    717     # some coverage.  It is not a platform specific bug.
    @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),
                         "Linux specific")
    def test_failed_child_execute_fd_leak(self):
        """Test for the fork() failure fd leak reported in issue16327."""
        own_fd_dir = '/proc/%d/fd' % os.getpid()
        listing_before = os.listdir(own_fd_dir)
        command = [sys.executable, '-c', 'pass']
        with self.assertRaises(PopenTestException):
            PopenExecuteChildRaises(
                    command, stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        # NOTE: This test doesn't verify that the real _execute_child
        # does not close the file descriptors itself on the way out
        # during an exception.  Code inspection has confirmed that.

        # The set of open descriptors must be unchanged by the failed Popen.
        listing_after = os.listdir(own_fd_dir)
        self.assertEqual(listing_before, listing_after)
    735 
    736 
    737 # context manager
class _SuppressCoreFiles(object):
    """Try to prevent core files from being created.

    Used as a context manager: on entry the RLIMIT_CORE resource limit is
    saved and set to (0, 0); on exit the saved limit is restored.  When the
    ``resource`` module is unavailable, or the limit cannot be read or
    changed, the manager silently does nothing.
    """
    # Limit saved by __enter__; stays None when nothing was saved, which
    # tells __exit__ that there is nothing to restore.
    old_limit = None

    def __enter__(self):
        """Try to save previous ulimit, then set it to (0, 0)."""
        if resource is not None:
            try:
                self.old_limit = resource.getrlimit(resource.RLIMIT_CORE)
                resource.setrlimit(resource.RLIMIT_CORE, (0, 0))
            except (ValueError, resource.error):
                # Best effort only: failing to change the limit is ignored.
                pass

        if sys.platform == 'darwin':
            # Check if the 'Crash Reporter' on OSX was configured
            # in 'Developer' mode and warn that it will get triggered
            # when it is.
            #
            # This assumes that this context manager is used in tests
            # that might trigger the next manager.
            value = subprocess.Popen(['/usr/bin/defaults', 'read',
                    'com.apple.CrashReporter', 'DialogType'],
                    stdout=subprocess.PIPE).communicate()[0]
            if value.strip() == b'developer':
                print "this tests triggers the Crash Reporter, that is intentional"
                sys.stdout.flush()

    def __exit__(self, *args):
        """Restore the core file limit saved by __enter__, if any."""
        if self.old_limit is None:
            return
        if resource is not None:
            try:
                resource.setrlimit(resource.RLIMIT_CORE, self.old_limit)
            except (ValueError, resource.error):
                # Best effort only, mirroring __enter__.
                pass

    # NOTE(review): this method is defined inside _SuppressCoreFiles, which
    # derives from object, yet it calls self.addCleanup() and carries a
    # unittest skip decorator.  It looks like it was meant to live in a
    # TestCase subclass; as written, unittest presumably never collects
    # it -- confirm and relocate if so.
    @unittest.skipUnless(hasattr(signal, 'SIGALRM'),
                         "Requires signal.SIGALRM")
    def test_communicate_eintr(self):
        # Issue #12493: communicate() should handle EINTR
        def handler(signum, frame):
            pass
        # Install a do-nothing SIGALRM handler so the alarm interrupts
        # system calls instead of terminating the process; the previous
        # handler is restored on cleanup.
        old_handler = signal.signal(signal.SIGALRM, handler)
        self.addCleanup(signal.signal, signal.SIGALRM, old_handler)

        # the process is running for 2 seconds
        args = [sys.executable, "-c", 'import time; time.sleep(2)']
        for stream in ('stdout', 'stderr'):
            kw = {stream: subprocess.PIPE}
            # NOTE(review): this relies on Popen supporting the context
            # manager protocol -- verify that the subprocess module under
            # test provides it.
            with subprocess.Popen(args, **kw) as process:
                signal.alarm(1)
                # communicate() will be interrupted by SIGALRM
                process.communicate()
    792 
    793 
    794 @unittest.skipIf(mswindows, "POSIX specific tests")
    795 class POSIXProcessTestCase(BaseTestCase):
    796 
    def test_exceptions(self):
        # caught & re-raised exceptions
        missing_dir = "/this/path/does/not/exist"
        with self.assertRaises(OSError) as caught:
            subprocess.Popen([sys.executable, "-c", ""], cwd=missing_dir)
        # The attribute child_traceback should contain "os.chdir" somewhere.
        child_traceback = caught.exception.child_traceback
        self.assertIn("os.chdir", child_traceback)
    804 
    def test_run_abort(self):
        # returncode handles signal termination
        aborting_cmd = [sys.executable, "-c", "import os; os.abort()"]
        with _SuppressCoreFiles():
            child = subprocess.Popen(aborting_cmd)
            child.wait()
        # The returncode is the negated number of the terminating signal.
        self.assertEqual(-child.returncode, signal.SIGABRT)
    812 
    def test_preexec(self):
        # preexec function
        def plant_fruit():
            os.putenv("FRUIT", "apple")

        # The child prints FRUIT, which only the preexec function has set.
        child_code = "import sys, os;sys.stdout.write(os.getenv('FRUIT'))"
        child = subprocess.Popen([sys.executable, "-c", child_code],
                                 stdout=subprocess.PIPE,
                                 preexec_fn=plant_fruit)
        self.addCleanup(child.stdout.close)
        printed = child.stdout.read()
        self.assertEqual(printed, "apple")
    822 
    class _TestExecuteChildPopen(subprocess.Popen):
        """Used to test behavior at the end of _execute_child."""
        def __init__(self, testcase, *args, **kwargs):
            # Remember the running test case so that _execute_child can
            # make assertions through it.
            self._testcase = testcase
            subprocess.Popen.__init__(self, *args, **kwargs)

        def _execute_child(
                self, args, executable, preexec_fn, close_fds, cwd, env,
                universal_newlines, startupinfo, creationflags, shell, to_close,
                p2cread, p2cwrite,
                c2pread, c2pwrite,
                errread, errwrite):
            try:
                subprocess.Popen._execute_child(
                        self, args, executable, preexec_fn, close_fds,
                        cwd, env, universal_newlines,
                        startupinfo, creationflags, shell, to_close,
                        p2cread, p2cwrite,
                        c2pread, c2pwrite,
                        errread, errwrite)
            finally:
                # Open a bunch of file descriptors and verify that
                # none of them are the same as the ones the Popen
                # instance is using for stdin/stdout/stderr.
                probe_fds = []
                for _ in range(8):
                    probe_fds.append(os.open("/dev/zero", os.O_RDONLY))
                parent_pipe_fds = (p2cwrite, c2pread, errread)
                try:
                    for probe in probe_fds:
                        self._testcase.assertNotIn(probe, parent_pipe_fds)
                finally:
                    for probe in probe_fds:
                        os.close(probe)
    856 
    @unittest.skipIf(not os.path.exists("/dev/zero"), "/dev/zero required.")
    def test_preexec_errpipe_does_not_double_close_pipes(self):
        """Issue16140: Don't double close pipes on preexec error."""

        def failing_preexec():
            raise RuntimeError("force the _execute_child() errpipe_data path.")

        command = [sys.executable, "-c", "pass"]
        # The Popen subclass performs the fd checks; here we only require
        # that the preexec error surfaces as RuntimeError.
        with self.assertRaises(RuntimeError):
            self._TestExecuteChildPopen(
                    self, command,
                    stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE, preexec_fn=failing_preexec)
    869 
    def test_args_string(self):
        # args is a string
        f, fname = tempfile.mkstemp()
        try:
            # Write a small shell script that re-executes this interpreter
            # and exits with status 47.  The descriptor is closed even if
            # a write fails.
            try:
                os.write(f, "#!/bin/sh\n")
                os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
                            sys.executable)
            finally:
                os.close(f)
            os.chmod(fname, 0o700)
            p = subprocess.Popen(fname)
            p.wait()
        finally:
            # Always delete the temporary script, also when Popen() or
            # wait() raises, so a failing run leaves nothing behind.
            os.remove(fname)
        self.assertEqual(p.returncode, 47)
    882 
    def test_invalid_args(self):
        # invalid arguments should raise ValueError
        command = [sys.executable, "-c", "import sys; sys.exit(47)"]
        # Each option is passed on its own, in this order.
        rejected_options = ({'startupinfo': 47}, {'creationflags': 47})
        for option in rejected_options:
            self.assertRaises(ValueError, subprocess.call, command, **option)
    893 
    def test_shell_sequence(self):
        # Run command through the shell (sequence)
        child_env = os.environ.copy()
        child_env["FRUIT"] = "apple"
        child = subprocess.Popen(["echo $FRUIT"], shell=1,
                                 stdout=subprocess.PIPE,
                                 env=child_env)
        self.addCleanup(child.stdout.close)
        # The shell expands $FRUIT from the environment we passed in.
        echoed = child.stdout.read().strip()
        self.assertEqual(echoed, "apple")
    903 
    def test_shell_string(self):
        # Run command through the shell (string)
        child_env = os.environ.copy()
        child_env["FRUIT"] = "apple"
        child = subprocess.Popen("echo $FRUIT", shell=1,
                                 stdout=subprocess.PIPE,
                                 env=child_env)
        self.addCleanup(child.stdout.close)
        # The shell expands $FRUIT from the environment we passed in.
        echoed = child.stdout.read().strip()
        self.assertEqual(echoed, "apple")
    913 
    def test_call_string(self):
        # call() function with string argument on UNIX
        f, fname = tempfile.mkstemp()
        try:
            # Write a small shell script that re-executes this interpreter
            # and exits with status 47.  The descriptor is closed even if
            # a write fails.
            try:
                os.write(f, "#!/bin/sh\n")
                os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
                            sys.executable)
            finally:
                os.close(f)
            # 0o700 spells the same mode as the legacy literal 0700 and
            # matches the spelling used by test_args_string.
            os.chmod(fname, 0o700)
            rc = subprocess.call(fname)
        finally:
            # Always delete the temporary script, also when call() raises.
            os.remove(fname)
        self.assertEqual(rc, 47)
    925 
    def test_specific_shell(self):
        # Issue #9265: Incorrect name passed as arg[0].
        candidates = [os.path.join(bindir, shell_name)
                      for bindir in ['/bin', '/usr/bin/', '/usr/local/bin']
                      for shell_name in ['bash', 'ksh']]
        shells = [path for path in candidates if os.path.isfile(path)]
        if not shells: # Will probably work for any shell but csh.
            self.skipTest("bash or ksh required for this test")
        default_shell = '/bin/sh'
        is_real_file = (os.path.isfile(default_shell)
                        and not os.path.islink(default_shell))
        if is_real_file:
            # Test will fail if /bin/sh is a symlink to csh.
            shells.append(default_shell)
        for shell_path in shells:
            child = subprocess.Popen("echo $0", executable=shell_path,
                                     shell=True, stdout=subprocess.PIPE)
            self.addCleanup(child.stdout.close)
            # $0 must echo back exactly the executable we asked for.
            reported = child.stdout.read().strip()
            self.assertEqual(reported, shell_path)
    945 
    def _kill_process(self, method, *args):
        """Start a long-sleeping child, invoke the named Popen method on it
        with *args*, and return the Popen object for the caller to inspect.
        """
        # Do not inherit file handles from the parent.
        # It should fix failures on some platforms.
        child = subprocess.Popen([sys.executable, "-c", """if 1:
                             import sys, time
                             sys.stdout.write('x\\n')
                             sys.stdout.flush()
                             time.sleep(30)
                             """],
                             close_fds=True,
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        # Wait for the interpreter to be completely initialized before
        # sending any signal.
        child.stdout.read(1)
        action = getattr(child, method)
        action(*args)
        return child
    964 
    @unittest.skipIf(sys.platform.startswith(('netbsd', 'openbsd')),
                     "Due to known OS bug (issue #16762)")
    def _kill_dead_process(self, method, *args):
        """Invoke the named Popen method with *args* on a child that has
        already exited, then collect it with communicate().
        """
        # Do not inherit file handles from the parent.
        # It should fix failures on some platforms.
        child = subprocess.Popen([sys.executable, "-c", """if 1:
                             import sys, time
                             sys.stdout.write('x\\n')
                             sys.stdout.flush()
                             """],
                             close_fds=True,
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        # Wait for the interpreter to be completely initialized before
        # sending any signal.
        child.stdout.read(1)
        # The process should end after this
        time.sleep(1)
        # This shouldn't raise even though the child is now dead
        action = getattr(child, method)
        action(*args)
        child.communicate()
    987 
    def test_send_signal(self):
        # SIGINT must surface in the child as KeyboardInterrupt and give
        # a non-zero exit status.
        child = self._kill_process('send_signal', signal.SIGINT)
        captured_err = child.communicate()[1]
        self.assertIn('KeyboardInterrupt', captured_err)
        exit_status = child.wait()
        self.assertNotEqual(exit_status, 0)
    993 
    def test_kill(self):
        # kill() must end the child with SIGKILL and leave stderr empty.
        child = self._kill_process('kill')
        captured_err = child.communicate()[1]
        self.assertStderrEqual(captured_err, '')
        expected_status = -signal.SIGKILL
        self.assertEqual(child.wait(), expected_status)
    999 
   1000     def test_terminate(self):
   1001         p = self._kill_process('terminate')
   1002         _, stderr = p.communicate()
   1003         self.assertStderrEqual(stderr, '')
   1004         self.assertEqual(p.wait(), -signal.SIGTERM)
   1005 
   1006     def test_send_signal_dead(self):
   1007         # Sending a signal to a dead process
   1008         self._kill_dead_process('send_signal', signal.SIGINT)
   1009 
   1010     def test_kill_dead(self):
   1011         # Killing a dead process
   1012         self._kill_dead_process('kill')
   1013 
   1014     def test_terminate_dead(self):
   1015         # Terminating a dead process
   1016         self._kill_dead_process('terminate')
   1017 
   1018     def check_close_std_fds(self, fds):
   1019         # Issue #9905: test that subprocess pipes still work properly with
   1020         # some standard fds closed
   1021         stdin = 0
   1022         newfds = []
   1023         for a in fds:
   1024             b = os.dup(a)
   1025             newfds.append(b)
   1026             if a == 0:
   1027                 stdin = b
   1028         try:
   1029             for fd in fds:
   1030                 os.close(fd)
   1031             out, err = subprocess.Popen([sys.executable, "-c",
   1032                               'import sys;'
   1033                               'sys.stdout.write("apple");'
   1034                               'sys.stdout.flush();'
   1035                               'sys.stderr.write("orange")'],
   1036                        stdin=stdin,
   1037                        stdout=subprocess.PIPE,
   1038                        stderr=subprocess.PIPE).communicate()
   1039             err = test_support.strip_python_stderr(err)
   1040             self.assertEqual((out, err), (b'apple', b'orange'))
   1041         finally:
   1042             for b, a in zip(newfds, fds):
   1043                 os.dup2(b, a)
   1044             for b in newfds:
   1045                 os.close(b)
   1046 
   1047     def test_close_fd_0(self):
   1048         self.check_close_std_fds([0])
   1049 
   1050     def test_close_fd_1(self):
   1051         self.check_close_std_fds([1])
   1052 
   1053     def test_close_fd_2(self):
   1054         self.check_close_std_fds([2])
   1055 
   1056     def test_close_fds_0_1(self):
   1057         self.check_close_std_fds([0, 1])
   1058 
   1059     def test_close_fds_0_2(self):
   1060         self.check_close_std_fds([0, 2])
   1061 
   1062     def test_close_fds_1_2(self):
   1063         self.check_close_std_fds([1, 2])
   1064 
   1065     def test_close_fds_0_1_2(self):
   1066         # Issue #10806: test that subprocess pipes still work properly with
   1067         # all standard fds closed.
   1068         self.check_close_std_fds([0, 1, 2])
   1069 
    def check_swap_fds(self, stdin_no, stdout_no, stderr_no):
        """Run a child whose stdin/stdout/stderr are the given descriptors.

        Descriptors 0, 1 and 2 are temporarily replaced by three temp
        files.  stdin_no, stdout_no and stderr_no (each one of 0, 1, 2)
        select which of them the child gets as stdin, stdout and stderr.
        The child must read "STDIN" from its stdin, write "got STDIN" to
        its stdout and "err" to its stderr.
        """
        # open up some temporary files
        temps = [tempfile.mkstemp() for i in range(3)]
        temp_fds = [fd for fd, fname in temps]
        try:
            # unlink the files -- we won't need to reopen them
            for fd, fname in temps:
                os.unlink(fname)

            # save a copy of the standard file descriptors
            saved_fds = [os.dup(fd) for fd in range(3)]
            try:
                # duplicate the temp files over the standard fd's 0, 1, 2
                for fd, temp_fd in enumerate(temp_fds):
                    os.dup2(temp_fd, fd)

                # write some data to what will become stdin, and rewind
                os.write(stdin_no, b"STDIN")
                os.lseek(stdin_no, 0, 0)

                # now use those files in the given order, so that subprocess
                # has to rearrange them in the child
                p = subprocess.Popen([sys.executable, "-c",
                    'import sys; got = sys.stdin.read();'
                    'sys.stdout.write("got %s"%got); sys.stderr.write("err")'],
                    stdin=stdin_no,
                    stdout=stdout_no,
                    stderr=stderr_no)
                p.wait()

                # rewind every temp file so the child's output can be read
                # back from the start
                for fd in temp_fds:
                    os.lseek(fd, 0, 0)

                out = os.read(stdout_no, 1024)
                err = test_support.strip_python_stderr(os.read(stderr_no, 1024))
            finally:
                # restore the real standard descriptors and drop the copies
                for std, saved in enumerate(saved_fds):
                    os.dup2(saved, std)
                    os.close(saved)

            # the assertions run only after fds 0-2 have been restored
            self.assertEqual(out, b"got STDIN")
            self.assertEqual(err, b"err")

        finally:
            for fd in temp_fds:
                os.close(fd)
   1116 
   1117     # When duping fds, if there arises a situation where one of the fds is
   1118     # either 0, 1 or 2, it is possible that it is overwritten (#12607).
   1119     # This tests all combinations of this.
   1120     def test_swap_fds(self):
   1121         self.check_swap_fds(0, 1, 2)
   1122         self.check_swap_fds(0, 2, 1)
   1123         self.check_swap_fds(1, 0, 2)
   1124         self.check_swap_fds(1, 2, 0)
   1125         self.check_swap_fds(2, 0, 1)
   1126         self.check_swap_fds(2, 1, 0)
   1127 
   1128     def test_wait_when_sigchild_ignored(self):
   1129         # NOTE: sigchild_ignore.py may not be an effective test on all OSes.
   1130         sigchild_ignore = test_support.findfile("sigchild_ignore.py",
   1131                                                 subdir="subprocessdata")
   1132         p = subprocess.Popen([sys.executable, sigchild_ignore],
   1133                              stdout=subprocess.PIPE, stderr=subprocess.PIPE)
   1134         stdout, stderr = p.communicate()
   1135         self.assertEqual(0, p.returncode, "sigchild_ignore.py exited"
   1136                          " non-zero with this error:\n%s" % stderr)
   1137 
    def test_zombie_fast_process_del(self):
        # Issue #12650: on Unix, if Popen.__del__() was called before the
        # process exited, it wouldn't be added to subprocess._active, and would
        # remain a zombie.
        # spawn a Popen, and delete its reference before it exits
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys, time;'
                              'time.sleep(0.2)'],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        # The cleanups reference the pipe objects' close methods, not p.
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        # Remember the object's identity so it can be recognised in
        # subprocess._active after the name p has been deleted.
        ident = id(p)
        # NOTE: pid is not used again in this test.
        pid = p.pid
        del p
        # check that p is in the active processes list
        self.assertIn(ident, [id(o) for o in subprocess._active])
   1155 
    def test_leak_fast_process_del_killed(self):
        # Issue #12650: on Unix, if Popen.__del__() was called before the
        # process exited, and the process got killed by a signal, it would never
        # be removed from subprocess._active, which triggered a FD and memory
        # leak.
        # spawn a Popen, delete its reference and kill it
        p = subprocess.Popen([sys.executable, "-c",
                              'import time;'
                              'time.sleep(3)'],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        # The cleanups reference the pipe objects' close methods, not p.
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        # Keep the identity and pid: both are needed after p is deleted.
        ident = id(p)
        pid = p.pid
        del p
        os.kill(pid, signal.SIGKILL)
        # check that p is in the active processes list
        self.assertIn(ident, [id(o) for o in subprocess._active])

        # let some time for the process to exit, and create a new Popen: this
        # should trigger the wait() of p
        time.sleep(0.2)
        # NOTE(review): the inner "with ... as proc" body is only reached if
        # Popen() succeeds, which this test expects not to happen; whether
        # Popen supports the context manager protocol here is not visible
        # from this file -- confirm.
        with self.assertRaises(EnvironmentError) as c:
            with subprocess.Popen(['nonexisting_i_hope'],
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE) as proc:
                pass
        # p should have been wait()ed on, and removed from the _active list
        # (so waiting on its pid again is expected to raise OSError)
        self.assertRaises(OSError, os.waitpid, pid, 0)
        self.assertNotIn(ident, [id(o) for o in subprocess._active])
   1187 
    def test_pipe_cloexec(self):
        # Issue 12786: check that the communication pipes' FDs are set CLOEXEC,
        # and are not inherited by another child process.
        # p1 blocks reading one byte from its stdin, which keeps it (and the
        # parent's ends of its pipes) around while p2 is started.
        p1 = subprocess.Popen([sys.executable, "-c",
                               'import os;'
                               'os.read(0, 1)'
                              ],
                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)

        # p2 tries to close each of the parent's fds for p1's pipes.  A
        # close that succeeds means the fd was inherited, and p2 exits with
        # status 1; EBADF means it was not inherited.  p2 exits with 0 only
        # if none of the fds were open.  close_fds=False ensures that
        # subprocess itself does not close them in the child.
        p2 = subprocess.Popen([sys.executable, "-c", """if True:
                               import os, errno, sys
                               for fd in %r:
                                   try:
                                       os.close(fd)
                                   except OSError as e:
                                       if e.errno != errno.EBADF:
                                           raise
                                   else:
                                       sys.exit(1)
                               sys.exit(0)
                               """ % [f.fileno() for f in (p1.stdin, p1.stdout,
                                                           p1.stderr)]
                              ],
                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE, close_fds=False)
        # Feed p1 the input it is waiting for so that it can finish.
        p1.communicate('foo')
        _, stderr = p2.communicate()

        self.assertEqual(p2.returncode, 0, "Unexpected error: " + repr(stderr))
   1218 
   1219 
   1220 @unittest.skipUnless(mswindows, "Windows specific tests")
   1221 class Win32ProcessTestCase(BaseTestCase):
   1222 
   1223     def test_startupinfo(self):
   1224         # startupinfo argument
   1225         # We uses hardcoded constants, because we do not want to
   1226         # depend on win32all.
   1227         STARTF_USESHOWWINDOW = 1
   1228         SW_MAXIMIZE = 3
   1229         startupinfo = subprocess.STARTUPINFO()
   1230         startupinfo.dwFlags = STARTF_USESHOWWINDOW
   1231         startupinfo.wShowWindow = SW_MAXIMIZE
   1232         # Since Python is a console process, it won't be affected
   1233         # by wShowWindow, but the argument should be silently
   1234         # ignored
   1235         subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
   1236                         startupinfo=startupinfo)
   1237 
   1238     def test_creationflags(self):
   1239         # creationflags argument
   1240         CREATE_NEW_CONSOLE = 16
   1241         sys.stderr.write("    a DOS box should flash briefly ...\n")
   1242         subprocess.call(sys.executable +
   1243                         ' -c "import time; time.sleep(0.25)"',
   1244                         creationflags=CREATE_NEW_CONSOLE)
   1245 
   1246     def test_invalid_args(self):
   1247         # invalid arguments should raise ValueError
   1248         self.assertRaises(ValueError, subprocess.call,
   1249                           [sys.executable, "-c",
   1250                            "import sys; sys.exit(47)"],
   1251                           preexec_fn=lambda: 1)
   1252         self.assertRaises(ValueError, subprocess.call,
   1253                           [sys.executable, "-c",
   1254                            "import sys; sys.exit(47)"],
   1255                           stdout=subprocess.PIPE,
   1256                           close_fds=True)
   1257 
   1258     def test_close_fds(self):
   1259         # close file descriptors
   1260         rc = subprocess.call([sys.executable, "-c",
   1261                               "import sys; sys.exit(47)"],
   1262                               close_fds=True)
   1263         self.assertEqual(rc, 47)
   1264 
   1265     def test_shell_sequence(self):
   1266         # Run command through the shell (sequence)
   1267         newenv = os.environ.copy()
   1268         newenv["FRUIT"] = "physalis"
   1269         p = subprocess.Popen(["set"], shell=1,
   1270                              stdout=subprocess.PIPE,
   1271                              env=newenv)
   1272         self.addCleanup(p.stdout.close)
   1273         self.assertIn("physalis", p.stdout.read())
   1274 
   1275     def test_shell_string(self):
   1276         # Run command through the shell (string)
   1277         newenv = os.environ.copy()
   1278         newenv["FRUIT"] = "physalis"
   1279         p = subprocess.Popen("set", shell=1,
   1280                              stdout=subprocess.PIPE,
   1281                              env=newenv)
   1282         self.addCleanup(p.stdout.close)
   1283         self.assertIn("physalis", p.stdout.read())
   1284 
   1285     def test_call_string(self):
   1286         # call() function with string argument on Windows
   1287         rc = subprocess.call(sys.executable +
   1288                              ' -c "import sys; sys.exit(47)"')
   1289         self.assertEqual(rc, 47)
   1290 
   1291     def _kill_process(self, method, *args):
   1292         # Some win32 buildbot raises EOFError if stdin is inherited
   1293         p = subprocess.Popen([sys.executable, "-c", """if 1:
   1294                              import sys, time
   1295                              sys.stdout.write('x\\n')
   1296                              sys.stdout.flush()
   1297                              time.sleep(30)
   1298                              """],
   1299                              stdin=subprocess.PIPE,
   1300                              stdout=subprocess.PIPE,
   1301                              stderr=subprocess.PIPE)
   1302         self.addCleanup(p.stdout.close)
   1303         self.addCleanup(p.stderr.close)
   1304         self.addCleanup(p.stdin.close)
   1305         # Wait for the interpreter to be completely initialized before
   1306         # sending any signal.
   1307         p.stdout.read(1)
   1308         getattr(p, method)(*args)
   1309         _, stderr = p.communicate()
   1310         self.assertStderrEqual(stderr, '')
   1311         returncode = p.wait()
   1312         self.assertNotEqual(returncode, 0)
   1313 
   1314     def _kill_dead_process(self, method, *args):
   1315         p = subprocess.Popen([sys.executable, "-c", """if 1:
   1316                              import sys, time
   1317                              sys.stdout.write('x\\n')
   1318                              sys.stdout.flush()
   1319                              sys.exit(42)
   1320                              """],
   1321                              stdin=subprocess.PIPE,
   1322                              stdout=subprocess.PIPE,
   1323                              stderr=subprocess.PIPE)
   1324         self.addCleanup(p.stdout.close)
   1325         self.addCleanup(p.stderr.close)
   1326         self.addCleanup(p.stdin.close)
   1327         # Wait for the interpreter to be completely initialized before
   1328         # sending any signal.
   1329         p.stdout.read(1)
   1330         # The process should end after this
   1331         time.sleep(1)
   1332         # This shouldn't raise even though the child is now dead
   1333         getattr(p, method)(*args)
   1334         _, stderr = p.communicate()
   1335         self.assertStderrEqual(stderr, b'')
   1336         rc = p.wait()
   1337         self.assertEqual(rc, 42)
   1338 
   1339     def test_send_signal(self):
   1340         self._kill_process('send_signal', signal.SIGTERM)
   1341 
   1342     def test_kill(self):
   1343         self._kill_process('kill')
   1344 
   1345     def test_terminate(self):
   1346         self._kill_process('terminate')
   1347 
   1348     def test_send_signal_dead(self):
   1349         self._kill_dead_process('send_signal', signal.SIGTERM)
   1350 
   1351     def test_kill_dead(self):
   1352         self._kill_dead_process('kill')
   1353 
   1354     def test_terminate_dead(self):
   1355         self._kill_dead_process('terminate')
   1356 
   1357 
   1358 @unittest.skipUnless(getattr(subprocess, '_has_poll', False),
   1359                      "poll system call not supported")
   1360 class ProcessTestCaseNoPoll(ProcessTestCase):
   1361     def setUp(self):
   1362         subprocess._has_poll = False
   1363         ProcessTestCase.setUp(self)
   1364 
   1365     def tearDown(self):
   1366         subprocess._has_poll = True
   1367         ProcessTestCase.tearDown(self)
   1368 
   1369 
   1370 class HelperFunctionTests(unittest.TestCase):
   1371     @unittest.skipIf(mswindows, "errno and EINTR make no sense on windows")
   1372     def test_eintr_retry_call(self):
   1373         record_calls = []
   1374         def fake_os_func(*args):
   1375             record_calls.append(args)
   1376             if len(record_calls) == 2:
   1377                 raise OSError(errno.EINTR, "fake interrupted system call")
   1378             return tuple(reversed(args))
   1379 
   1380         self.assertEqual((999, 256),
   1381                          subprocess._eintr_retry_call(fake_os_func, 256, 999))
   1382         self.assertEqual([(256, 999)], record_calls)
   1383         # This time there will be an EINTR so it will loop once.
   1384         self.assertEqual((666,),
   1385                          subprocess._eintr_retry_call(fake_os_func, 666))
   1386         self.assertEqual([(256, 999), (666,), (666,)], record_calls)
   1387 
   1388 @unittest.skipUnless(mswindows, "mswindows only")
   1389 class CommandsWithSpaces (BaseTestCase):
   1390 
   1391     def setUp(self):
   1392         super(CommandsWithSpaces, self).setUp()
   1393         f, fname = tempfile.mkstemp(".py", "te st")
   1394         self.fname = fname.lower ()
   1395         os.write(f, b"import sys;"
   1396                     b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))"
   1397         )
   1398         os.close(f)
   1399 
   1400     def tearDown(self):
   1401         os.remove(self.fname)
   1402         super(CommandsWithSpaces, self).tearDown()
   1403 
   1404     def with_spaces(self, *args, **kwargs):
   1405         kwargs['stdout'] = subprocess.PIPE
   1406         p = subprocess.Popen(*args, **kwargs)
   1407         self.addCleanup(p.stdout.close)
   1408         self.assertEqual(
   1409           p.stdout.read ().decode("mbcs"),
   1410           "2 [%r, 'ab cd']" % self.fname
   1411         )
   1412 
   1413     def test_shell_string_with_spaces(self):
   1414         # call() function with string argument with spaces on Windows
   1415         self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
   1416                                              "ab cd"), shell=1)
   1417 
   1418     def test_shell_sequence_with_spaces(self):
   1419         # call() function with sequence argument with spaces on Windows
   1420         self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1)
   1421 
   1422     def test_noshell_string_with_spaces(self):
   1423         # call() function with string argument with spaces on Windows
   1424         self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
   1425                              "ab cd"))
   1426 
   1427     def test_noshell_sequence_with_spaces(self):
   1428         # call() function with sequence argument with spaces on Windows
   1429         self.with_spaces([sys.executable, self.fname, "ab cd"])
   1430 
   1431 def test_main():
   1432     unit_tests = (ProcessTestCase,
   1433                   POSIXProcessTestCase,
   1434                   Win32ProcessTestCase,
   1435                   ProcessTestCaseNoPoll,
   1436                   HelperFunctionTests,
   1437                   CommandsWithSpaces)
   1438 
   1439     test_support.run_unittest(*unit_tests)
   1440     test_support.reap_children()
   1441 
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()
   1444