Home | History | Annotate | Download | only in test
      1 import unittest
      2 from test import test_support
      3 import subprocess
      4 import sys
      5 import platform
      6 import signal
      7 import os
      8 import errno
      9 import tempfile
     10 import time
     11 import re
     12 import sysconfig
     13 import textwrap
     14 
     15 try:
     16     import ctypes
     17 except ImportError:
     18     ctypes = None
     19 else:
     20     import ctypes.util
     21 
     22 try:
     23     import resource
     24 except ImportError:
     25     resource = None
     26 try:
     27     import threading
     28 except ImportError:
     29     threading = None
     30 
     31 try:
     32     import _testcapi
     33 except ImportError:
     34     _testcapi = None
     35 
     36 mswindows = (sys.platform == "win32")
     37 
     38 #
     39 # Depends on the following external programs: Python
     40 #
     41 
     42 if mswindows:
     43     SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), '
     44                                                 'os.O_BINARY);')
     45 else:
     46     SETBINARY = ''
     47 
     48 
     49 class BaseTestCase(unittest.TestCase):
     50     def setUp(self):
     51         # Try to minimize the number of children we have so this test
     52         # doesn't crash on some buildbots (Alphas in particular).
     53         test_support.reap_children()
     54 
     55     def tearDown(self):
     56         for inst in subprocess._active:
     57             inst.wait()
     58         subprocess._cleanup()
     59         self.assertFalse(subprocess._active, "subprocess._active not empty")
     60         self.doCleanups()
     61         test_support.reap_children()
     62 
     63     def assertStderrEqual(self, stderr, expected, msg=None):
     64         # In a debug build, stuff like "[6580 refs]" is printed to stderr at
     65         # shutdown time.  That frustrates tests trying to check stderr produced
     66         # from a spawned Python process.
     67         actual = re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr)
     68         self.assertEqual(actual, expected, msg)
     69 
     70 
     71 class PopenTestException(Exception):
     72     pass
     73 
     74 
     75 class PopenExecuteChildRaises(subprocess.Popen):
     76     """Popen subclass for testing cleanup of subprocess.PIPE filehandles when
     77     _execute_child fails.
     78     """
     79     def _execute_child(self, *args, **kwargs):
     80         raise PopenTestException("Forced Exception for Test")
     81 
     82 
     83 class ProcessTestCase(BaseTestCase):
     84 
     85     def test_call_seq(self):
     86         # call() function with sequence argument
     87         rc = subprocess.call([sys.executable, "-c",
     88                               "import sys; sys.exit(47)"])
     89         self.assertEqual(rc, 47)
     90 
     91     def test_check_call_zero(self):
     92         # check_call() function with zero return code
     93         rc = subprocess.check_call([sys.executable, "-c",
     94                                     "import sys; sys.exit(0)"])
     95         self.assertEqual(rc, 0)
     96 
     97     def test_check_call_nonzero(self):
     98         # check_call() function with non-zero return code
     99         with self.assertRaises(subprocess.CalledProcessError) as c:
    100             subprocess.check_call([sys.executable, "-c",
    101                                    "import sys; sys.exit(47)"])
    102         self.assertEqual(c.exception.returncode, 47)
    103 
    def test_check_output(self):
        # check_output() returns the child's stdout when it exits with
        # status zero
        argv = [sys.executable, "-c", "print 'BDFL'"]
        captured = subprocess.check_output(argv)
        self.assertIn('BDFL', captured)
    109 
    def test_check_output_nonzero(self):
        # check_output() raises CalledProcessError carrying the child's
        # non-zero exit status
        argv = [sys.executable, "-c", "import sys; sys.exit(5)"]
        with self.assertRaises(subprocess.CalledProcessError) as caught:
            subprocess.check_output(argv)
        self.assertEqual(caught.exception.returncode, 5)
    116 
    def test_check_output_stderr(self):
        # check_output() with the child's stderr redirected to stdout
        argv = [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"]
        captured = subprocess.check_output(argv, stderr=subprocess.STDOUT)
        self.assertIn('BDFL', captured)
    123 
    124     def test_check_output_stdout_arg(self):
    125         # check_output() function stderr redirected to stdout
    126         with self.assertRaises(ValueError) as c:
    127             output = subprocess.check_output(
    128                     [sys.executable, "-c", "print 'will not be run'"],
    129                     stdout=sys.stdout)
    130             self.fail("Expected ValueError when stdout arg supplied.")
    131         self.assertIn('stdout', c.exception.args[0])
    132 
    def test_call_kwargs(self):
        # call() with keyword args: the child must see the custom env
        child_env = os.environ.copy()
        child_env["FRUIT"] = "banana"
        argv = [sys.executable, "-c",
                'import sys, os;'
                'sys.exit(os.getenv("FRUIT")=="banana")']
        exit_status = subprocess.call(argv, env=child_env)
        # The child exits with True (1) only when FRUIT equals "banana".
        self.assertEqual(exit_status, 1)
    142 
    def test_invalid_args(self):
        # Popen() must reject invalid arguments with TypeError, and
        # Popen.__del__ must not complain while doing so (issue #12085)
        with test_support.captured_stderr() as captured:
            # Unknown keyword argument.
            self.assertRaises(TypeError, subprocess.Popen, invalid_arg_name=1)
            # More positional arguments than __init__ declares.
            declared = subprocess.Popen.__init__.__code__.co_argcount
            surplus_args = [0] * (declared + 1)
            self.assertRaises(TypeError, subprocess.Popen, *surplus_args)
        # Nothing may have been written to stderr in the meantime.
        self.assertEqual(captured.getvalue(), '')
    152 
    def test_stdin_none(self):
        # .stdin is None when not redirected
        child = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        for pipe in (child.stdout, child.stderr):
            self.addCleanup(pipe.close)
        child.wait()
        self.assertEqual(child.stdin, None)
    161 
    def test_stdout_none(self):
        # .stdout is None when not redirected, and the child then inherits
        # the stdout of its parent.  To observe that, a subprocess is run
        # inside a subprocess:
        # this_test
        #   \-- subprocess created by this test (parent)
        #          \-- subprocess created by the parent subprocess (child)
        # The parent leaves stdout unspecified, so the child writes to the
        # parent's stdout; this test captures that and checks the message.
        # The parent itself asserts that the child's .stdout is None.
        # See #11963.
        parent_script = (
            'import sys; from subprocess import Popen, PIPE;'
            'p = Popen([sys.executable, "-c", "print \'test_stdout_none\'"],'
            '          stdin=PIPE, stderr=PIPE);'
            'p.wait(); assert p.stdout is None;')
        parent = subprocess.Popen([sys.executable, "-c", parent_script],
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE)
        for pipe in (parent.stdout, parent.stderr):
            self.addCleanup(pipe.close)
        out, err = parent.communicate()
        self.assertEqual(parent.returncode, 0, err)
        self.assertEqual(out.rstrip(), 'test_stdout_none')
    184 
    def test_stderr_none(self):
        # .stderr is None when not redirected
        child = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
                                 stdin=subprocess.PIPE,
                                 stdout=subprocess.PIPE)
        for pipe in (child.stdout, child.stdin):
            self.addCleanup(pipe.close)
        child.wait()
        self.assertEqual(child.stderr, None)
    193 
    def test_executable_with_cwd(self):
        # args[0] is a bogus program name; the program actually run is the
        # one passed as executable=, started from Python's own directory.
        interpreter_dir = os.path.dirname(os.path.realpath(sys.executable))
        argv = ["somethingyoudonthave", "-c", "import sys; sys.exit(47)"]
        child = subprocess.Popen(argv, executable=sys.executable,
                                 cwd=interpreter_dir)
        child.wait()
        self.assertEqual(child.returncode, 47)
    201 
    @unittest.skipIf(sysconfig.is_python_build(),
                     "need an installed Python. See #7774")
    def test_executable_without_cwd(self):
        # With a normal installation the executable= argument should work
        # without 'cwd'.  Runs from the build directory are skipped,
        # see #7774.
        argv = ["somethingyoudonthave", "-c", "import sys; sys.exit(47)"]
        child = subprocess.Popen(argv, executable=sys.executable)
        child.wait()
        self.assertEqual(child.returncode, 47)
    212 
    def test_stdin_pipe(self):
        # stdin redirection
        argv = [sys.executable, "-c",
                'import sys; sys.exit(sys.stdin.read() == "pear")']
        child = subprocess.Popen(argv, stdin=subprocess.PIPE)
        child.stdin.write("pear")
        child.stdin.close()
        child.wait()
        # The child exits with True (1) when it read "pear" from stdin.
        self.assertEqual(child.returncode, 1)
    222 
    def test_stdin_filedes(self):
        # stdin is set to open file descriptor
        tf = tempfile.TemporaryFile()
        # Close the temporary file even when an assertion below fails.
        self.addCleanup(tf.close)
        d = tf.fileno()
        os.write(d, "pear")
        # Rewind so the child reads the data from the beginning.
        os.lseek(d, 0, 0)
        p = subprocess.Popen([sys.executable, "-c",
                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
                         stdin=d)
        p.wait()
        # The child exits with True (1) when it read "pear" from stdin.
        self.assertEqual(p.returncode, 1)
    234 
    def test_stdin_fileobj(self):
        # stdin is set to open file object
        tf = tempfile.TemporaryFile()
        # Close the temporary file even when an assertion below fails.
        self.addCleanup(tf.close)
        tf.write("pear")
        # Rewind so the child reads the data from the beginning.
        tf.seek(0)
        p = subprocess.Popen([sys.executable, "-c",
                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
                         stdin=tf)
        p.wait()
        # The child exits with True (1) when it read "pear" from stdin.
        self.assertEqual(p.returncode, 1)
    245 
    def test_stdout_pipe(self):
        # stdout redirection
        argv = [sys.executable, "-c",
                'import sys; sys.stdout.write("orange")']
        child = subprocess.Popen(argv, stdout=subprocess.PIPE)
        self.addCleanup(child.stdout.close)
        received = child.stdout.read()
        self.assertEqual(received, "orange")
    253 
    def test_stdout_filedes(self):
        # stdout is set to open file descriptor
        tf = tempfile.TemporaryFile()
        # Close the temporary file even when an assertion below fails.
        self.addCleanup(tf.close)
        d = tf.fileno()
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys; sys.stdout.write("orange")'],
                         stdout=d)
        p.wait()
        # Rewind before reading back what the child wrote.
        os.lseek(d, 0, 0)
        self.assertEqual(os.read(d, 1024), "orange")
    264 
    def test_stdout_fileobj(self):
        # stdout is set to open file object
        tf = tempfile.TemporaryFile()
        # Close the temporary file even when an assertion below fails.
        self.addCleanup(tf.close)
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys; sys.stdout.write("orange")'],
                         stdout=tf)
        p.wait()
        # Rewind before reading back what the child wrote.
        tf.seek(0)
        self.assertEqual(tf.read(), "orange")
    274 
    def test_stderr_pipe(self):
        # stderr redirection
        argv = [sys.executable, "-c",
                'import sys; sys.stderr.write("strawberry")']
        child = subprocess.Popen(argv, stderr=subprocess.PIPE)
        self.addCleanup(child.stderr.close)
        received = child.stderr.read()
        self.assertStderrEqual(received, "strawberry")
    282 
    def test_stderr_filedes(self):
        # stderr is set to open file descriptor
        tf = tempfile.TemporaryFile()
        # Close the temporary file even when an assertion below fails.
        self.addCleanup(tf.close)
        d = tf.fileno()
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys; sys.stderr.write("strawberry")'],
                         stderr=d)
        p.wait()
        # Rewind before reading back what the child wrote.
        os.lseek(d, 0, 0)
        self.assertStderrEqual(os.read(d, 1024), "strawberry")
    293 
    def test_stderr_fileobj(self):
        # stderr is set to open file object
        tf = tempfile.TemporaryFile()
        # Close the temporary file even when an assertion below fails.
        self.addCleanup(tf.close)
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys; sys.stderr.write("strawberry")'],
                         stderr=tf)
        p.wait()
        # Rewind before reading back what the child wrote.
        tf.seek(0)
        self.assertStderrEqual(tf.read(), "strawberry")
    303 
    def test_stderr_redirect_with_no_stdout_redirect(self):
        # stderr=STDOUT while stdout is left as None (not set):
        # - the grandchild prints to stderr
        # - the child redirects the grandchild's stderr to its own stdout
        # - this test should find the grandchild's stderr text in the
        #   child's stdout
        child_script = ('import sys, subprocess;'
                        'rc = subprocess.call([sys.executable, "-c",'
                        '    "import sys;"'
                        '    "sys.stderr.write(\'42\')"],'
                        '    stderr=subprocess.STDOUT);'
                        'sys.exit(rc)')
        child = subprocess.Popen([sys.executable, "-c", child_script],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        out, err = child.communicate()
        # NOTE: stdout should carry the grandchild's stderr output
        self.assertStderrEqual(out, b'42')
        self.assertStderrEqual(err, b'') # should be empty
        self.assertEqual(child.returncode, 0)
    324 
    def test_stdout_stderr_pipe(self):
        # capture stdout and stderr to the same pipe
        script = ('import sys;'
                  'sys.stdout.write("apple");'
                  'sys.stdout.flush();'
                  'sys.stderr.write("orange")')
        child = subprocess.Popen([sys.executable, "-c", script],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.STDOUT)
        self.addCleanup(child.stdout.close)
        combined = child.stdout.read()
        self.assertStderrEqual(combined, "appleorange")
    336 
    def test_stdout_stderr_file(self):
        # capture stdout and stderr to the same open file
        tf = tempfile.TemporaryFile()
        # Close the temporary file even when an assertion below fails.
        self.addCleanup(tf.close)
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys;'
                          'sys.stdout.write("apple");'
                          'sys.stdout.flush();'
                          'sys.stderr.write("orange")'],
                         stdout=tf,
                         stderr=tf)
        p.wait()
        # Rewind before reading back what the child wrote.
        tf.seek(0)
        self.assertStderrEqual(tf.read(), "appleorange")
    350 
    def test_stdout_filedes_of_stdout(self):
        # stdout is set to 1 (#1531862).
        # To keep the text off this process's stdout, a parent subprocess
        # is started with stdout=PIPE and it in turn calls the child with
        # stdout=1; the parent's captured output is then checked here.
        # See #11963.
        parent_script = (
                'import sys, subprocess; '
                'rc = subprocess.call([sys.executable, "-c", '
                '    "import os, sys; sys.exit(os.write(sys.stdout.fileno(), '
                     '\'test with stdout=1\'))"], stdout=1); '
                'assert rc == 18')
        parent = subprocess.Popen([sys.executable, "-c", parent_script],
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE)
        for pipe in (parent.stdout, parent.stderr):
            self.addCleanup(pipe.close)
        out, err = parent.communicate()
        self.assertEqual(parent.returncode, 0, err)
        self.assertEqual(out.rstrip(), 'test with stdout=1')
    369 
    def test_cwd(self):
        # The child must start in the directory passed as cwd=.
        tmpdir = tempfile.gettempdir()
        # We cannot use os.path.realpath to canonicalize the path,
        # since it doesn't expand Tru64 {memb} strings. See bug 1063571.
        # Instead, chdir into it and ask the OS for the resulting name.
        cwd = os.getcwd()
        os.chdir(tmpdir)
        try:
            tmpdir = os.getcwd()
        finally:
            # Always restore the original directory so that a failure here
            # cannot leave the test process in the temp directory.
            os.chdir(cwd)
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys,os;'
                          'sys.stdout.write(os.getcwd())'],
                         stdout=subprocess.PIPE,
                         cwd=tmpdir)
        self.addCleanup(p.stdout.close)
        normcase = os.path.normcase
        self.assertEqual(normcase(p.stdout.read()), normcase(tmpdir))
    386 
    def test_env(self):
        # The child must see a variable added through env=.
        child_env = os.environ.copy()
        child_env["FRUIT"] = "orange"
        script = ('import sys,os;'
                  'sys.stdout.write(os.getenv("FRUIT"))')
        child = subprocess.Popen([sys.executable, "-c", script],
                                 stdout=subprocess.PIPE,
                                 env=child_env)
        self.addCleanup(child.stdout.close)
        reported = child.stdout.read()
        self.assertEqual(reported, "orange")
    397 
    def test_invalid_cmd(self):
        # null character in the command name
        bad_program = sys.executable + '\0'
        with self.assertRaises(TypeError):
            subprocess.Popen([bad_program, "-c", "pass"])

        # null character in the command argument
        bad_argument = "pass#\0"
        with self.assertRaises(TypeError):
            subprocess.Popen([sys.executable, "-c", bad_argument])
    407 
    def test_invalid_env(self):
        noop_argv = [sys.executable, "-c", "pass"]

        # null character in the environment variable name
        env = os.environ.copy()
        env["FRUIT\0VEGETABLE"] = "cabbage"
        with self.assertRaises(TypeError):
            subprocess.Popen(noop_argv, env=env)

        # null character in the environment variable value
        env = os.environ.copy()
        env["FRUIT"] = "orange\0VEGETABLE=cabbage"
        with self.assertRaises(TypeError):
            subprocess.Popen(noop_argv, env=env)

        # equal character in the environment variable name
        env = os.environ.copy()
        env["FRUIT=ORANGE"] = "lemon"
        with self.assertRaises(ValueError):
            subprocess.Popen(noop_argv, env=env)

        # equal character in the environment variable value: this one is
        # accepted and must reach the child unchanged
        env = os.environ.copy()
        env["FRUIT"] = "orange=lemon"
        script = ('import sys, os;'
                  'sys.stdout.write(os.getenv("FRUIT"))')
        child = subprocess.Popen([sys.executable, "-c", script],
                                 stdout=subprocess.PIPE,
                                 env=env)
        out, err = child.communicate()
        self.assertEqual(out, "orange=lemon")
    437 
    def test_communicate_stdin(self):
        # communicate() feeds its argument to the child's stdin
        script = ('import sys;'
                  'sys.exit(sys.stdin.read() == "pear")')
        child = subprocess.Popen([sys.executable, "-c", script],
                                 stdin=subprocess.PIPE)
        child.communicate("pear")
        # The child exits with True (1) when it read "pear" from stdin.
        self.assertEqual(child.returncode, 1)
    445 
    def test_communicate_stdout(self):
        # Only stdout is piped, so communicate() returns (data, None).
        argv = [sys.executable, "-c",
                'import sys; sys.stdout.write("pineapple")']
        child = subprocess.Popen(argv, stdout=subprocess.PIPE)
        out, err = child.communicate()
        self.assertEqual(out, "pineapple")
        self.assertEqual(err, None)
    453 
    def test_communicate_stderr(self):
        # Only stderr is piped, so communicate() returns (None, data).
        argv = [sys.executable, "-c",
                'import sys; sys.stderr.write("pineapple")']
        child = subprocess.Popen(argv, stderr=subprocess.PIPE)
        out, err = child.communicate()
        self.assertEqual(out, None)
        self.assertStderrEqual(err, "pineapple")
    461 
    def test_communicate(self):
        # All three streams piped: stdin is echoed to stdout while a fixed
        # string goes to stderr.
        script = ('import sys,os;'
                  'sys.stderr.write("pineapple");'
                  'sys.stdout.write(sys.stdin.read())')
        child = subprocess.Popen([sys.executable, "-c", script],
                                 stdin=subprocess.PIPE,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        for pipe in (child.stdout, child.stderr, child.stdin):
            self.addCleanup(pipe.close)
        out, err = child.communicate("banana")
        self.assertEqual(out, "banana")
        self.assertStderrEqual(err, "pineapple")
    476 
    # This test is Linux specific for simplicity to at least have
    # some coverage.  It is not a platform specific bug.
    @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),
                         "Linux specific")
    # Test for the fd leak reported in http://bugs.python.org/issue2791.
    def test_communicate_pipe_fd_leak(self):
        fd_directory = '/proc/%d/fd' % os.getpid()

        def count_open_fds():
            # Number of entries currently listed for this process.
            return len(os.listdir(fd_directory))

        fds_before = count_open_fds()
        child = subprocess.Popen([sys.executable, "-c", "print('')"],
                                 stdout=subprocess.PIPE)
        child.communicate()
        fds_after_communicate = count_open_fds()
        del child
        fds_after_destruction = count_open_fds()
        self.assertEqual(fds_before, fds_after_destruction)
        self.assertEqual(fds_before, fds_after_communicate)
    493 
    def test_communicate_returns(self):
        # communicate() should return None if no redirection is active
        argv = [sys.executable, "-c", "import sys; sys.exit(47)"]
        child = subprocess.Popen(argv)
        out, err = child.communicate()
        self.assertEqual(out, None)
        self.assertEqual(err, None)
    501 
    def test_communicate_pipe_buf(self):
        # communicate() with writes larger than pipe_buf
        # This test will probably deadlock rather than fail, if
        # communicate() does not work properly.
        # A throwaway pipe is opened only to query the pipe buffer size.
        x, y = os.pipe()
        try:
            if mswindows:
                pipe_buf = 512
            else:
                pipe_buf = os.fpathconf(x, "PC_PIPE_BUF")
        finally:
            # Release both ends even if the fpathconf() query raises.
            os.close(x)
            os.close(y)
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys,os;'
                          'sys.stdout.write(sys.stdin.read(47));'
                          'sys.stderr.write("xyz"*%d);'
                          'sys.stdout.write(sys.stdin.read())' % pipe_buf],
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        self.addCleanup(p.stdin.close)
        # Three bytes per unit, so the payload exceeds pipe_buf.
        string_to_write = "abc"*pipe_buf
        (stdout, stderr) = p.communicate(string_to_write)
        self.assertEqual(stdout, string_to_write)
    527 
    def test_writes_before_communicate(self):
        # stdin.write before communicate()
        script = ('import sys,os;'
                  'sys.stdout.write(sys.stdin.read())')
        child = subprocess.Popen([sys.executable, "-c", script],
                                 stdin=subprocess.PIPE,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        for pipe in (child.stdout, child.stderr, child.stdin):
            self.addCleanup(pipe.close)
        child.stdin.write("banana")
        out, err = child.communicate("split")
        # Both the direct write and the communicate() input must arrive.
        self.assertEqual(out, "bananasplit")
        self.assertStderrEqual(err, "")
    543 
    def test_universal_newlines(self):
        # universal newlines when reading the child's stdout pipe directly
        script = ('import sys,os;' + SETBINARY +
                  'sys.stdout.write("line1\\n");'
                  'sys.stdout.flush();'
                  'sys.stdout.write("line2\\r");'
                  'sys.stdout.flush();'
                  'sys.stdout.write("line3\\r\\n");'
                  'sys.stdout.flush();'
                  'sys.stdout.write("line4\\r");'
                  'sys.stdout.flush();'
                  'sys.stdout.write("\\nline5");'
                  'sys.stdout.flush();'
                  'sys.stdout.write("\\nline6");')
        child = subprocess.Popen([sys.executable, "-c", script],
                                 stdout=subprocess.PIPE,
                                 universal_newlines=1)
        self.addCleanup(child.stdout.close)
        received = child.stdout.read()
        if hasattr(file, 'newlines'):
            # Interpreter with universal newline support
            expected = "line1\nline2\nline3\nline4\nline5\nline6"
        else:
            # Interpreter without universal newline support
            expected = "line1\nline2\rline3\r\nline4\r\nline5\nline6"
        self.assertEqual(received, expected)
    570 
    def test_universal_newlines_communicate(self):
        # universal newlines through communicate()
        script = ('import sys,os;' + SETBINARY +
                  'sys.stdout.write("line1\\n");'
                  'sys.stdout.flush();'
                  'sys.stdout.write("line2\\r");'
                  'sys.stdout.flush();'
                  'sys.stdout.write("line3\\r\\n");'
                  'sys.stdout.flush();'
                  'sys.stdout.write("line4\\r");'
                  'sys.stdout.flush();'
                  'sys.stdout.write("\\nline5");'
                  'sys.stdout.flush();'
                  'sys.stdout.write("\\nline6");')
        child = subprocess.Popen([sys.executable, "-c", script],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE,
                                 universal_newlines=1)
        for pipe in (child.stdout, child.stderr):
            self.addCleanup(pipe.close)
        out, err = child.communicate()
        if hasattr(file, 'newlines'):
            # Interpreter with universal newline support
            expected = "line1\nline2\nline3\nline4\nline5\nline6"
        else:
            # Interpreter without universal newline support
            expected = "line1\nline2\rline3\r\nline4\r\nline5\nline6"
        self.assertEqual(out, expected)
    599 
    def test_no_leaking(self):
        # Make sure we leak no resources
        if mswindows:
            max_handles = 2050 # too much for (at least some) Windows setups
        else:
            max_handles = 1026 # too much for most UNIX systems
        open_fds = []
        try:
            # Open files until the descriptor limit (EMFILE) is hit.
            for _attempt in range(max_handles):
                try:
                    new_fd = os.open(test_support.TESTFN,
                                     os.O_WRONLY | os.O_CREAT)
                except OSError as exc:
                    if exc.errno == errno.EMFILE:
                        break
                    raise
                open_fds.append(new_fd)
            else:
                self.skipTest("failed to reach the file descriptor limit "
                    "(tried %d)" % max_handles)
            # Close a couple of them (should be enough for a subprocess)
            for _attempt in range(10):
                os.close(open_fds.pop())
            # Loop creating some subprocesses. If one of them leaks some fds,
            # the next loop iteration will fail by reaching the max fd limit.
            echo_script = ("import sys;"
                           "sys.stdout.write(sys.stdin.read())")
            for _attempt in range(15):
                child = subprocess.Popen([sys.executable, "-c", echo_script],
                                         stdin=subprocess.PIPE,
                                         stdout=subprocess.PIPE,
                                         stderr=subprocess.PIPE)
                echoed = child.communicate(b"lime")[0]
                self.assertEqual(echoed, b"lime")
        finally:
            for leftover_fd in open_fds:
                os.close(leftover_fd)
            test_support.unlink(test_support.TESTFN)
    637 
    638     def test_list2cmdline(self):
    639         self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
    640                          '"a b c" d e')
    641         self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
    642                          'ab\\"c \\ d')
    643         self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']),
    644                          'ab\\"c " \\\\" d')
    645         self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
    646                          'a\\\\\\b "de fg" h')
    647         self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
    648                          'a\\\\\\"b c d')
    649         self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
    650                          '"a\\\\b c" d e')
    651         self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
    652                          '"a\\\\b\\ c" d e')
    653         self.assertEqual(subprocess.list2cmdline(['ab', '']),
    654                          'ab ""')
    655 
    656 
    def test_poll(self):
        # poll() returns None while the child is still running and the
        # exit status once it has finished.
        child = subprocess.Popen(
            [sys.executable, "-c", "import time; time.sleep(1)"])
        rounds = 0
        while True:
            if child.poll() is not None:
                break
            time.sleep(0.1)
            rounds += 1
        # We expect that the poll loop probably went around about 10 times,
        # but, based on system scheduling we can't control, it's possible
        # poll() never returned None.  It "should be" very rare that it
        # didn't go around at least twice.
        self.assertGreaterEqual(rounds, 2)
        # Subsequent invocations should just return the returncode
        self.assertEqual(child.poll(), 0)
    671 
    672 
    def test_wait(self):
        # wait() returns the exit status; a second call should just
        # return the same returncode again.
        child = subprocess.Popen(
            [sys.executable, "-c", "import time; time.sleep(2)"])
        for _ in range(2):
            self.assertEqual(child.wait(), 0)
    679 
    680 
    681     def test_invalid_bufsize(self):
    682         # an invalid type of the bufsize argument should raise
    683         # TypeError.
    684         with self.assertRaises(TypeError):
    685             subprocess.Popen([sys.executable, "-c", "pass"], "orange")
    686 
    def test_leaking_fds_on_error(self):
        # see bug #5179: Popen leaks file descriptors to PIPEs if
        # the child fails to execute; this will eventually exhaust
        # the maximum number of open fds. 1024 seems a very common
        # value for that limit, but Windows has 2048, so we loop
        # 1024 times (each call leaked two fds).
        # errno values that merely say "the command was not found".
        not_found = (errno.ENOENT, errno.ENOTDIR, errno.EACCES)
        for _ in range(1024):
            # Windows raises IOError.  Others raise OSError.
            with self.assertRaises(EnvironmentError) as caught:
                subprocess.Popen(['nonexisting_i_hope'],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
            error = caught.exception
            if error.errno in not_found:
                continue
            # Anything else is unexpected: surface it.
            raise error
    702 
    @unittest.skipIf(threading is None, "threading required")
    def test_double_close_on_error(self):
        # Issue #18851
        opened = []

        def churn_pipes():
            # Allocate descriptors in a background thread while the
            # main thread runs the failing Popen call.
            for _ in range(20):
                opened.extend(os.pipe())
                time.sleep(0.001)

        worker = threading.Thread(target=churn_pipes)
        worker.start()
        try:
            with self.assertRaises(EnvironmentError):
                subprocess.Popen(['nonexisting_i_hope'],
                                 stdin=subprocess.PIPE,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        finally:
            worker.join()
            last_error = None
            for fd in opened:
                # If a double close occurred, some of those fds will
                # already have been closed by mistake, and os.close()
                # here will raise.
                try:
                    os.close(fd)
                except OSError as err:
                    last_error = err
            if last_error is not None:
                raise last_error
    732 
    733     def test_handles_closed_on_exception(self):
    734         # If CreateProcess exits with an error, ensure the
    735         # duplicate output handles are released
    736         ifhandle, ifname = tempfile.mkstemp()
    737         ofhandle, ofname = tempfile.mkstemp()
    738         efhandle, efname = tempfile.mkstemp()
    739         try:
    740             subprocess.Popen (["*"], stdin=ifhandle, stdout=ofhandle,
    741               stderr=efhandle)
    742         except OSError:
    743             os.close(ifhandle)
    744             os.remove(ifname)
    745             os.close(ofhandle)
    746             os.remove(ofname)
    747             os.close(efhandle)
    748             os.remove(efname)
    749         self.assertFalse(os.path.exists(ifname))
    750         self.assertFalse(os.path.exists(ofname))
    751         self.assertFalse(os.path.exists(efname))
    752 
    def test_communicate_epipe(self):
        # Issue 10963: communicate() should hide EPIPE
        child = subprocess.Popen([sys.executable, "-c", 'pass'],
                                 stdin=subprocess.PIPE,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        for stream in (child.stdout, child.stderr, child.stdin):
            self.addCleanup(stream.close)
        # The child never reads its stdin, so this is 1 MiB of unread input.
        payload = "x" * 2**20
        child.communicate(payload)
    763 
    def test_communicate_epipe_only_stdin(self):
        # Issue 10963: communicate() should hide EPIPE
        argv = [sys.executable, "-c", 'pass']
        child = subprocess.Popen(argv, stdin=subprocess.PIPE)
        self.addCleanup(child.stdin.close)
        # NOTE(review): presumably the pause lets the child exit before
        # anything is written to its stdin -- confirm.
        time.sleep(2)
        payload = "x" * 2**20
        child.communicate(payload)
    771 
    772     # This test is Linux-ish specific for simplicity to at least have
    773     # some coverage.  It is not a platform specific bug.
    @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),
                         "Linux specific")
    def test_failed_child_execute_fd_leak(self):
        """Test for the fork() failure fd leak reported in issue16327."""
        proc_fd_dir = '/proc/%d/fd' % os.getpid()
        open_before = os.listdir(proc_fd_dir)
        all_pipes = dict(stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
        with self.assertRaises(PopenTestException):
            PopenExecuteChildRaises([sys.executable, '-c', 'pass'],
                                    **all_pipes)

        # NOTE: This test doesn't verify that the real _execute_child
        # does not close the file descriptors itself on the way out
        # during an exception.  Code inspection has confirmed that.

        # The set of open descriptors must be unchanged by the failure.
        open_after = os.listdir(proc_fd_dir)
        self.assertEqual(open_before, open_after)
    791 
    792 
    793 # context manager
    794 class _SuppressCoreFiles(object):
    795     """Try to prevent core files from being created."""
    796     old_limit = None
    797 
    798     def __enter__(self):
    799         """Try to save previous ulimit, then set it to (0, 0)."""
    800         if resource is not None:
    801             try:
    802                 self.old_limit = resource.getrlimit(resource.RLIMIT_CORE)
    803                 resource.setrlimit(resource.RLIMIT_CORE, (0, 0))
    804             except (ValueError, resource.error):
    805                 pass
    806 
    807         if sys.platform == 'darwin':
    808             # Check if the 'Crash Reporter' on OSX was configured
    809             # in 'Developer' mode and warn that it will get triggered
    810             # when it is.
    811             #
    812             # This assumes that this context manager is used in tests
    813             # that might trigger the next manager.
    814             value = subprocess.Popen(['/usr/bin/defaults', 'read',
    815                     'com.apple.CrashReporter', 'DialogType'],
    816                     stdout=subprocess.PIPE).communicate()[0]
    817             if value.strip() == b'developer':
    818                 print "this tests triggers the Crash Reporter, that is intentional"
    819                 sys.stdout.flush()
    820 
    821     def __exit__(self, *args):
    822         """Return core file behavior to default."""
    823         if self.old_limit is None:
    824             return
    825         if resource is not None:
    826             try:
    827                 resource.setrlimit(resource.RLIMIT_CORE, self.old_limit)
    828             except (ValueError, resource.error):
    829                 pass
    830 
    831     @unittest.skipUnless(hasattr(signal, 'SIGALRM'),
    832                          "Requires signal.SIGALRM")
    833     def test_communicate_eintr(self):
    834         # Issue #12493: communicate() should handle EINTR
    835         def handler(signum, frame):
    836             pass
    837         old_handler = signal.signal(signal.SIGALRM, handler)
    838         self.addCleanup(signal.signal, signal.SIGALRM, old_handler)
    839 
    840         # the process is running for 2 seconds
    841         args = [sys.executable, "-c", 'import time; time.sleep(2)']
    842         for stream in ('stdout', 'stderr'):
    843             kw = {stream: subprocess.PIPE}
    844             with subprocess.Popen(args, **kw) as process:
    845                 signal.alarm(1)
    846                 try:
    847                     # communicate() will be interrupted by SIGALRM
    848                     process.communicate()
    849                 finally:
    850                     signal.alarm(0)
    851 
    852 
    853 @unittest.skipIf(mswindows, "POSIX specific tests")
    854 class POSIXProcessTestCase(BaseTestCase):
    855 
    def test_exceptions(self):
        # caught & re-raised exceptions
        missing_dir = "/this/path/does/not/exist"
        with self.assertRaises(OSError) as caught:
            child = subprocess.Popen([sys.executable, "-c", ""],
                                     cwd=missing_dir)
        # The attribute child_traceback should contain "os.chdir" somewhere.
        child_tb = caught.exception.child_traceback
        self.assertIn("os.chdir", child_tb)
    863 
    def test_run_abort(self):
        # returncode handles signal termination
        argv = [sys.executable, "-c", "import os; os.abort()"]
        with _SuppressCoreFiles():
            child = subprocess.Popen(argv)
            child.wait()
        # The negated returncode is compared with the signal number.
        self.assertEqual(-child.returncode, signal.SIGABRT)
    871 
    def test_preexec(self):
        # preexec function
        def set_fruit():
            # Passed as preexec_fn; the child then reads FRUIT back.
            os.putenv("FRUIT", "apple")

        child = subprocess.Popen(
            [sys.executable, "-c",
             "import sys, os;"
             "sys.stdout.write(os.getenv('FRUIT'))"],
            stdout=subprocess.PIPE,
            preexec_fn=set_fruit)
        self.addCleanup(child.stdout.close)
        output = child.stdout.read()
        self.assertEqual(output, "apple")
    881 
    class _TestExecuteChildPopen(subprocess.Popen):
        """Popen subclass that checks descriptor reuse after _execute_child.

        The first constructor argument is the running test case; its
        assertion methods are used to report problems.
        """
        def __init__(self, testcase, *args, **kwargs):
            self._testcase = testcase
            subprocess.Popen.__init__(self, *args, **kwargs)

        def _execute_child(
                self, args, executable, preexec_fn, close_fds, cwd, env,
                universal_newlines, startupinfo, creationflags, shell, to_close,
                p2cread, p2cwrite,
                c2pread, c2pwrite,
                errread, errwrite):
            try:
                subprocess.Popen._execute_child(
                        self, args, executable, preexec_fn, close_fds,
                        cwd, env, universal_newlines,
                        startupinfo, creationflags, shell, to_close,
                        p2cread, p2cwrite,
                        c2pread, c2pwrite,
                        errread, errwrite)
            finally:
                # Open a bunch of file descriptors and verify that
                # none of them are the same as the ones the Popen
                # instance is using for stdin/stdout/stderr.
                parent_ends = (p2cwrite, c2pread, errread)
                probe_fds = [os.open("/dev/zero", os.O_RDONLY)
                             for _ in range(8)]
                try:
                    for probe in probe_fds:
                        self._testcase.assertNotIn(probe, parent_ends)
                finally:
                    for probe in probe_fds:
                        os.close(probe)
    915 
    @unittest.skipUnless(os.path.exists("/dev/zero"), "/dev/zero required.")
    def test_preexec_errpipe_does_not_double_close_pipes(self):
        """Issue16140: Don't double close pipes on preexec error."""

        def failing_preexec():
            raise RuntimeError("force the _execute_child() errpipe_data path.")

        argv = [sys.executable, "-c", "pass"]
        # The descriptor checks run inside _TestExecuteChildPopen; here we
        # only require that the preexec failure reaches the caller.
        self.assertRaises(RuntimeError,
                          self._TestExecuteChildPopen,
                          self, argv,
                          stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE,
                          preexec_fn=failing_preexec)
    928 
    def test_args_string(self):
        # args is a string
        f, fname = tempfile.mkstemp()
        try:
            try:
                os.write(f, "#!/bin/sh\n")
                os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
                            sys.executable)
            finally:
                # Close the descriptor even if writing the script failed.
                os.close(f)
            os.chmod(fname, 0o700)
            p = subprocess.Popen(fname)
            p.wait()
        finally:
            # Remove the script even when a step above raised, so a
            # failing run does not leave a temporary file behind.
            os.remove(fname)
        self.assertEqual(p.returncode, 47)
    941 
    def test_invalid_args(self):
        # invalid arguments should raise ValueError
        argv = [sys.executable, "-c", "import sys; sys.exit(47)"]
        # Each of these keyword arguments is rejected on its own.
        for bad_kwargs in ({'startupinfo': 47}, {'creationflags': 47}):
            with self.assertRaises(ValueError):
                subprocess.call(argv, **bad_kwargs)
    952 
    def test_shell_sequence(self):
        # Run command through the shell (sequence)
        child_env = os.environ.copy()
        child_env["FRUIT"] = "apple"
        child = subprocess.Popen(["echo $FRUIT"],
                                 shell=True,
                                 stdout=subprocess.PIPE,
                                 env=child_env)
        self.addCleanup(child.stdout.close)
        output = child.stdout.read()
        self.assertEqual(output.strip(), "apple")
    962 
    def test_shell_string(self):
        # Run command through the shell (string)
        child_env = os.environ.copy()
        child_env["FRUIT"] = "apple"
        child = subprocess.Popen("echo $FRUIT",
                                 shell=True,
                                 stdout=subprocess.PIPE,
                                 env=child_env)
        self.addCleanup(child.stdout.close)
        output = child.stdout.read()
        self.assertEqual(output.strip(), "apple")
    972 
    def test_call_string(self):
        # call() function with string argument on UNIX
        f, fname = tempfile.mkstemp()
        try:
            try:
                os.write(f, "#!/bin/sh\n")
                os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
                            sys.executable)
            finally:
                # Close the descriptor even if writing the script failed.
                os.close(f)
            # 0o700 is the same mode as the legacy literal 0700, written
            # the way test_args_string writes it.
            os.chmod(fname, 0o700)
            rc = subprocess.call(fname)
        finally:
            # Remove the script even when a step above raised.
            os.remove(fname)
        self.assertEqual(rc, 47)
    984 
    def test_specific_shell(self):
        # Issue #9265: Incorrect name passed as arg[0].
        candidates = [os.path.join(prefix, name)
                      for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']
                      for name in ['bash', 'ksh']]
        shells = [path for path in candidates if os.path.isfile(path)]
        if not shells: # Will probably work for any shell but csh.
            self.skipTest("bash or ksh required for this test")
        bin_sh = '/bin/sh'
        if os.path.isfile(bin_sh) and not os.path.islink(bin_sh):
            # Test will fail if /bin/sh is a symlink to csh.
            shells.append(bin_sh)
        for shell_path in shells:
            child = subprocess.Popen("echo $0", executable=shell_path,
                                     shell=True, stdout=subprocess.PIPE)
            self.addCleanup(child.stdout.close)
            reported = child.stdout.read().strip()
            self.assertEqual(reported, shell_path)
   1004 
   1005     def _kill_process(self, method, *args):
   1006         # Do not inherit file handles from the parent.
   1007         # It should fix failures on some platforms.
   1008         p = subprocess.Popen([sys.executable, "-c", """if 1:
   1009                              import sys, time
   1010                              sys.stdout.write('x\\n')
   1011                              sys.stdout.flush()
   1012                              time.sleep(30)
   1013                              """],
   1014                              close_fds=True,
   1015                              stdin=subprocess.PIPE,
   1016                              stdout=subprocess.PIPE,
   1017                              stderr=subprocess.PIPE)
   1018         # Wait for the interpreter to be completely initialized before
   1019         # sending any signal.
   1020         p.stdout.read(1)
   1021         getattr(p, method)(*args)
   1022         return p
   1023 
   1024     @unittest.skipIf(sys.platform.startswith(('netbsd', 'openbsd')),
   1025                      "Due to known OS bug (issue #16762)")
   1026     def _kill_dead_process(self, method, *args):
   1027         # Do not inherit file handles from the parent.
   1028         # It should fix failures on some platforms.
   1029         p = subprocess.Popen([sys.executable, "-c", """if 1:
   1030                              import sys, time
   1031                              sys.stdout.write('x\\n')
   1032                              sys.stdout.flush()
   1033                              """],
   1034                              close_fds=True,
   1035                              stdin=subprocess.PIPE,
   1036                              stdout=subprocess.PIPE,
   1037                              stderr=subprocess.PIPE)
   1038         # Wait for the interpreter to be completely initialized before
   1039         # sending any signal.
   1040         p.stdout.read(1)
   1041         # The process should end after this
   1042         time.sleep(1)
   1043         # This shouldn't raise even though the child is now dead
   1044         getattr(p, method)(*args)
   1045         p.communicate()
   1046 
   1047     def test_send_signal(self):
   1048         p = self._kill_process('send_signal', signal.SIGINT)
   1049         _, stderr = p.communicate()
   1050         self.assertIn('KeyboardInterrupt', stderr)
   1051         self.assertNotEqual(p.wait(), 0)
   1052 
   1053     def test_kill(self):
   1054         p = self._kill_process('kill')
   1055         _, stderr = p.communicate()
   1056         self.assertStderrEqual(stderr, '')
   1057         self.assertEqual(p.wait(), -signal.SIGKILL)
   1058 
   1059     def test_terminate(self):
   1060         p = self._kill_process('terminate')
   1061         _, stderr = p.communicate()
   1062         self.assertStderrEqual(stderr, '')
   1063         self.assertEqual(p.wait(), -signal.SIGTERM)
   1064 
   1065     def test_send_signal_dead(self):
   1066         # Sending a signal to a dead process
   1067         self._kill_dead_process('send_signal', signal.SIGINT)
   1068 
   1069     def test_kill_dead(self):
   1070         # Killing a dead process
   1071         self._kill_dead_process('kill')
   1072 
   1073     def test_terminate_dead(self):
   1074         # Terminating a dead process
   1075         self._kill_dead_process('terminate')
   1076 
   1077     def check_close_std_fds(self, fds):
   1078         # Issue #9905: test that subprocess pipes still work properly with
   1079         # some standard fds closed
   1080         stdin = 0
   1081         newfds = []
   1082         for a in fds:
   1083             b = os.dup(a)
   1084             newfds.append(b)
   1085             if a == 0:
   1086                 stdin = b
   1087         try:
   1088             for fd in fds:
   1089                 os.close(fd)
   1090             out, err = subprocess.Popen([sys.executable, "-c",
   1091                               'import sys;'
   1092                               'sys.stdout.write("apple");'
   1093                               'sys.stdout.flush();'
   1094                               'sys.stderr.write("orange")'],
   1095                        stdin=stdin,
   1096                        stdout=subprocess.PIPE,
   1097                        stderr=subprocess.PIPE).communicate()
   1098             err = test_support.strip_python_stderr(err)
   1099             self.assertEqual((out, err), (b'apple', b'orange'))
   1100         finally:
   1101             for b, a in zip(newfds, fds):
   1102                 os.dup2(b, a)
   1103             for b in newfds:
   1104                 os.close(b)
   1105 
   1106     def test_close_fd_0(self):
   1107         self.check_close_std_fds([0])
   1108 
   1109     def test_close_fd_1(self):
   1110         self.check_close_std_fds([1])
   1111 
   1112     def test_close_fd_2(self):
   1113         self.check_close_std_fds([2])
   1114 
   1115     def test_close_fds_0_1(self):
   1116         self.check_close_std_fds([0, 1])
   1117 
   1118     def test_close_fds_0_2(self):
   1119         self.check_close_std_fds([0, 2])
   1120 
   1121     def test_close_fds_1_2(self):
   1122         self.check_close_std_fds([1, 2])
   1123 
   1124     def test_close_fds_0_1_2(self):
   1125         # Issue #10806: test that subprocess pipes still work properly with
   1126         # all standard fds closed.
   1127         self.check_close_std_fds([0, 1, 2])
   1128 
   1129     def check_swap_fds(self, stdin_no, stdout_no, stderr_no):
   1130         # open up some temporary files
   1131         temps = [tempfile.mkstemp() for i in range(3)]
   1132         temp_fds = [fd for fd, fname in temps]
   1133         try:
   1134             # unlink the files -- we won't need to reopen them
   1135             for fd, fname in temps:
   1136                 os.unlink(fname)
   1137 
   1138             # save a copy of the standard file descriptors
   1139             saved_fds = [os.dup(fd) for fd in range(3)]
   1140             try:
   1141                 # duplicate the temp files over the standard fd's 0, 1, 2
   1142                 for fd, temp_fd in enumerate(temp_fds):
   1143                     os.dup2(temp_fd, fd)
   1144 
   1145                 # write some data to what will become stdin, and rewind
   1146                 os.write(stdin_no, b"STDIN")
   1147                 os.lseek(stdin_no, 0, 0)
   1148 
   1149                 # now use those files in the given order, so that subprocess
   1150                 # has to rearrange them in the child
   1151                 p = subprocess.Popen([sys.executable, "-c",
   1152                     'import sys; got = sys.stdin.read();'
   1153                     'sys.stdout.write("got %s"%got); sys.stderr.write("err")'],
   1154                     stdin=stdin_no,
   1155                     stdout=stdout_no,
   1156                     stderr=stderr_no)
   1157                 p.wait()
   1158 
   1159                 for fd in temp_fds:
   1160                     os.lseek(fd, 0, 0)
   1161 
   1162                 out = os.read(stdout_no, 1024)
   1163                 err = test_support.strip_python_stderr(os.read(stderr_no, 1024))
   1164             finally:
   1165                 for std, saved in enumerate(saved_fds):
   1166                     os.dup2(saved, std)
   1167                     os.close(saved)
   1168 
   1169             self.assertEqual(out, b"got STDIN")
   1170             self.assertEqual(err, b"err")
   1171 
   1172         finally:
   1173             for fd in temp_fds:
   1174                 os.close(fd)
   1175 
   1176     # When duping fds, if there arises a situation where one of the fds is
   1177     # either 0, 1 or 2, it is possible that it is overwritten (#12607).
   1178     # This tests all combinations of this.
   1179     def test_swap_fds(self):
   1180         self.check_swap_fds(0, 1, 2)
   1181         self.check_swap_fds(0, 2, 1)
   1182         self.check_swap_fds(1, 0, 2)
   1183         self.check_swap_fds(1, 2, 0)
   1184         self.check_swap_fds(2, 0, 1)
   1185         self.check_swap_fds(2, 1, 0)
   1186 
   1187     def test_wait_when_sigchild_ignored(self):
   1188         # NOTE: sigchild_ignore.py may not be an effective test on all OSes.
   1189         sigchild_ignore = test_support.findfile("sigchild_ignore.py",
   1190                                                 subdir="subprocessdata")
   1191         p = subprocess.Popen([sys.executable, sigchild_ignore],
   1192                              stdout=subprocess.PIPE, stderr=subprocess.PIPE)
   1193         stdout, stderr = p.communicate()
   1194         self.assertEqual(0, p.returncode, "sigchild_ignore.py exited"
   1195                          " non-zero with this error:\n%s" % stderr)
   1196 
   1197     def test_zombie_fast_process_del(self):
   1198         # Issue #12650: on Unix, if Popen.__del__() was called before the
   1199         # process exited, it wouldn't be added to subprocess._active, and would
   1200         # remain a zombie.
   1201         # spawn a Popen, and delete its reference before it exits
   1202         p = subprocess.Popen([sys.executable, "-c",
   1203                               'import sys, time;'
   1204                               'time.sleep(0.2)'],
   1205                              stdout=subprocess.PIPE,
   1206                              stderr=subprocess.PIPE)
   1207         self.addCleanup(p.stdout.close)
   1208         self.addCleanup(p.stderr.close)
   1209         ident = id(p)
   1210         pid = p.pid
   1211         del p
   1212         # check that p is in the active processes list
   1213         self.assertIn(ident, [id(o) for o in subprocess._active])
   1214 
   1215     def test_leak_fast_process_del_killed(self):
   1216         # Issue #12650: on Unix, if Popen.__del__() was called before the
   1217         # process exited, and the process got killed by a signal, it would never
   1218         # be removed from subprocess._active, which triggered a FD and memory
   1219         # leak.
   1220         # spawn a Popen, delete its reference and kill it
   1221         p = subprocess.Popen([sys.executable, "-c",
   1222                               'import time;'
   1223                               'time.sleep(3)'],
   1224                              stdout=subprocess.PIPE,
   1225                              stderr=subprocess.PIPE)
   1226         self.addCleanup(p.stdout.close)
   1227         self.addCleanup(p.stderr.close)
   1228         ident = id(p)
   1229         pid = p.pid
   1230         del p
   1231         os.kill(pid, signal.SIGKILL)
   1232         # check that p is in the active processes list
   1233         self.assertIn(ident, [id(o) for o in subprocess._active])
   1234 
   1235         # let some time for the process to exit, and create a new Popen: this
   1236         # should trigger the wait() of p
   1237         time.sleep(0.2)
   1238         with self.assertRaises(EnvironmentError) as c:
   1239             with subprocess.Popen(['nonexisting_i_hope'],
   1240                                   stdout=subprocess.PIPE,
   1241                                   stderr=subprocess.PIPE) as proc:
   1242                 pass
   1243         # p should have been wait()ed on, and removed from the _active list
   1244         self.assertRaises(OSError, os.waitpid, pid, 0)
   1245         self.assertNotIn(ident, [id(o) for o in subprocess._active])
   1246 
   1247     def test_pipe_cloexec(self):
   1248         # Issue 12786: check that the communication pipes' FDs are set CLOEXEC,
   1249         # and are not inherited by another child process.
   1250         p1 = subprocess.Popen([sys.executable, "-c",
   1251                                'import os;'
   1252                                'os.read(0, 1)'
   1253                               ],
   1254                               stdin=subprocess.PIPE, stdout=subprocess.PIPE,
   1255                               stderr=subprocess.PIPE)
   1256 
   1257         p2 = subprocess.Popen([sys.executable, "-c", """if True:
   1258                                import os, errno, sys
   1259                                for fd in %r:
   1260                                    try:
   1261                                        os.close(fd)
   1262                                    except OSError as e:
   1263                                        if e.errno != errno.EBADF:
   1264                                            raise
   1265                                    else:
   1266                                        sys.exit(1)
   1267                                sys.exit(0)
   1268                                """ % [f.fileno() for f in (p1.stdin, p1.stdout,
   1269                                                            p1.stderr)]
   1270                               ],
   1271                               stdin=subprocess.PIPE, stdout=subprocess.PIPE,
   1272                               stderr=subprocess.PIPE, close_fds=False)
   1273         p1.communicate('foo')
   1274         _, stderr = p2.communicate()
   1275 
   1276         self.assertEqual(p2.returncode, 0, "Unexpected error: " + repr(stderr))
   1277 
   1278     @unittest.skipUnless(_testcapi is not None
   1279                          and hasattr(_testcapi, 'W_STOPCODE'),
   1280                          'need _testcapi.W_STOPCODE')
   1281     def test_stopped(self):
   1282         """Test wait() behavior when waitpid returns WIFSTOPPED; issue29335."""
   1283         args = [sys.executable, '-c', 'pass']
   1284         proc = subprocess.Popen(args)
   1285 
   1286         # Wait until the real process completes to avoid zombie process
   1287         pid = proc.pid
   1288         pid, status = os.waitpid(pid, 0)
   1289         self.assertEqual(status, 0)
   1290 
   1291         status = _testcapi.W_STOPCODE(3)
   1292 
   1293         def mock_waitpid(pid, flags):
   1294             return (pid, status)
   1295 
   1296         with test_support.swap_attr(os, 'waitpid', mock_waitpid):
   1297             returncode = proc.wait()
   1298 
   1299         self.assertEqual(returncode, -3)
   1300 
   1301 
   1302 @unittest.skipUnless(mswindows, "Windows specific tests")
   1303 class Win32ProcessTestCase(BaseTestCase):
   1304 
   1305     def test_startupinfo(self):
   1306         # startupinfo argument
   1307         # We uses hardcoded constants, because we do not want to
   1308         # depend on win32all.
   1309         STARTF_USESHOWWINDOW = 1
   1310         SW_MAXIMIZE = 3
   1311         startupinfo = subprocess.STARTUPINFO()
   1312         startupinfo.dwFlags = STARTF_USESHOWWINDOW
   1313         startupinfo.wShowWindow = SW_MAXIMIZE
   1314         # Since Python is a console process, it won't be affected
   1315         # by wShowWindow, but the argument should be silently
   1316         # ignored
   1317         subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
   1318                         startupinfo=startupinfo)
   1319 
   1320     def test_creationflags(self):
   1321         # creationflags argument
   1322         CREATE_NEW_CONSOLE = 16
   1323         sys.stderr.write("    a DOS box should flash briefly ...\n")
   1324         subprocess.call(sys.executable +
   1325                         ' -c "import time; time.sleep(0.25)"',
   1326                         creationflags=CREATE_NEW_CONSOLE)
   1327 
   1328     def test_invalid_args(self):
   1329         # invalid arguments should raise ValueError
   1330         self.assertRaises(ValueError, subprocess.call,
   1331                           [sys.executable, "-c",
   1332                            "import sys; sys.exit(47)"],
   1333                           preexec_fn=lambda: 1)
   1334         self.assertRaises(ValueError, subprocess.call,
   1335                           [sys.executable, "-c",
   1336                            "import sys; sys.exit(47)"],
   1337                           stdout=subprocess.PIPE,
   1338                           close_fds=True)
   1339 
   1340     def test_close_fds(self):
   1341         # close file descriptors
   1342         rc = subprocess.call([sys.executable, "-c",
   1343                               "import sys; sys.exit(47)"],
   1344                               close_fds=True)
   1345         self.assertEqual(rc, 47)
   1346 
   1347     def test_shell_sequence(self):
   1348         # Run command through the shell (sequence)
   1349         newenv = os.environ.copy()
   1350         newenv["FRUIT"] = "physalis"
   1351         p = subprocess.Popen(["set"], shell=1,
   1352                              stdout=subprocess.PIPE,
   1353                              env=newenv)
   1354         self.addCleanup(p.stdout.close)
   1355         self.assertIn("physalis", p.stdout.read())
   1356 
   1357     def test_shell_string(self):
   1358         # Run command through the shell (string)
   1359         newenv = os.environ.copy()
   1360         newenv["FRUIT"] = "physalis"
   1361         p = subprocess.Popen("set", shell=1,
   1362                              stdout=subprocess.PIPE,
   1363                              env=newenv)
   1364         self.addCleanup(p.stdout.close)
   1365         self.assertIn("physalis", p.stdout.read())
   1366 
   1367     def test_call_string(self):
   1368         # call() function with string argument on Windows
   1369         rc = subprocess.call(sys.executable +
   1370                              ' -c "import sys; sys.exit(47)"')
   1371         self.assertEqual(rc, 47)
   1372 
   1373     def _kill_process(self, method, *args):
   1374         # Some win32 buildbot raises EOFError if stdin is inherited
   1375         p = subprocess.Popen([sys.executable, "-c", """if 1:
   1376                              import sys, time
   1377                              sys.stdout.write('x\\n')
   1378                              sys.stdout.flush()
   1379                              time.sleep(30)
   1380                              """],
   1381                              stdin=subprocess.PIPE,
   1382                              stdout=subprocess.PIPE,
   1383                              stderr=subprocess.PIPE)
   1384         self.addCleanup(p.stdout.close)
   1385         self.addCleanup(p.stderr.close)
   1386         self.addCleanup(p.stdin.close)
   1387         # Wait for the interpreter to be completely initialized before
   1388         # sending any signal.
   1389         p.stdout.read(1)
   1390         getattr(p, method)(*args)
   1391         _, stderr = p.communicate()
   1392         self.assertStderrEqual(stderr, '')
   1393         returncode = p.wait()
   1394         self.assertNotEqual(returncode, 0)
   1395 
   1396     def _kill_dead_process(self, method, *args):
   1397         p = subprocess.Popen([sys.executable, "-c", """if 1:
   1398                              import sys, time
   1399                              sys.stdout.write('x\\n')
   1400                              sys.stdout.flush()
   1401                              sys.exit(42)
   1402                              """],
   1403                              stdin=subprocess.PIPE,
   1404                              stdout=subprocess.PIPE,
   1405                              stderr=subprocess.PIPE)
   1406         self.addCleanup(p.stdout.close)
   1407         self.addCleanup(p.stderr.close)
   1408         self.addCleanup(p.stdin.close)
   1409         # Wait for the interpreter to be completely initialized before
   1410         # sending any signal.
   1411         p.stdout.read(1)
   1412         # The process should end after this
   1413         time.sleep(1)
   1414         # This shouldn't raise even though the child is now dead
   1415         getattr(p, method)(*args)
   1416         _, stderr = p.communicate()
   1417         self.assertStderrEqual(stderr, b'')
   1418         rc = p.wait()
   1419         self.assertEqual(rc, 42)
   1420 
   1421     def test_send_signal(self):
   1422         self._kill_process('send_signal', signal.SIGTERM)
   1423 
   1424     def test_kill(self):
   1425         self._kill_process('kill')
   1426 
   1427     def test_terminate(self):
   1428         self._kill_process('terminate')
   1429 
   1430     def test_send_signal_dead(self):
   1431         self._kill_dead_process('send_signal', signal.SIGTERM)
   1432 
   1433     def test_kill_dead(self):
   1434         self._kill_dead_process('kill')
   1435 
   1436     def test_terminate_dead(self):
   1437         self._kill_dead_process('terminate')
   1438 
   1439 
   1440 @unittest.skipUnless(getattr(subprocess, '_has_poll', False),
   1441                      "poll system call not supported")
   1442 class ProcessTestCaseNoPoll(ProcessTestCase):
   1443     def setUp(self):
   1444         subprocess._has_poll = False
   1445         ProcessTestCase.setUp(self)
   1446 
   1447     def tearDown(self):
   1448         subprocess._has_poll = True
   1449         ProcessTestCase.tearDown(self)
   1450 
   1451 
   1452 class HelperFunctionTests(unittest.TestCase):
   1453     @unittest.skipIf(mswindows, "errno and EINTR make no sense on windows")
   1454     def test_eintr_retry_call(self):
   1455         record_calls = []
   1456         def fake_os_func(*args):
   1457             record_calls.append(args)
   1458             if len(record_calls) == 2:
   1459                 raise OSError(errno.EINTR, "fake interrupted system call")
   1460             return tuple(reversed(args))
   1461 
   1462         self.assertEqual((999, 256),
   1463                          subprocess._eintr_retry_call(fake_os_func, 256, 999))
   1464         self.assertEqual([(256, 999)], record_calls)
   1465         # This time there will be an EINTR so it will loop once.
   1466         self.assertEqual((666,),
   1467                          subprocess._eintr_retry_call(fake_os_func, 666))
   1468         self.assertEqual([(256, 999), (666,), (666,)], record_calls)
   1469 
   1470 @unittest.skipUnless(mswindows, "mswindows only")
   1471 class CommandsWithSpaces (BaseTestCase):
   1472 
   1473     def setUp(self):
   1474         super(CommandsWithSpaces, self).setUp()
   1475         f, fname = tempfile.mkstemp(".py", "te st")
   1476         self.fname = fname.lower ()
   1477         os.write(f, b"import sys;"
   1478                     b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))"
   1479         )
   1480         os.close(f)
   1481 
   1482     def tearDown(self):
   1483         os.remove(self.fname)
   1484         super(CommandsWithSpaces, self).tearDown()
   1485 
   1486     def with_spaces(self, *args, **kwargs):
   1487         kwargs['stdout'] = subprocess.PIPE
   1488         p = subprocess.Popen(*args, **kwargs)
   1489         self.addCleanup(p.stdout.close)
   1490         self.assertEqual(
   1491           p.stdout.read ().decode("mbcs"),
   1492           "2 [%r, 'ab cd']" % self.fname
   1493         )
   1494 
   1495     def test_shell_string_with_spaces(self):
   1496         # call() function with string argument with spaces on Windows
   1497         self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
   1498                                              "ab cd"), shell=1)
   1499 
   1500     def test_shell_sequence_with_spaces(self):
   1501         # call() function with sequence argument with spaces on Windows
   1502         self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1)
   1503 
   1504     def test_noshell_string_with_spaces(self):
   1505         # call() function with string argument with spaces on Windows
   1506         self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
   1507                              "ab cd"))
   1508 
   1509     def test_noshell_sequence_with_spaces(self):
   1510         # call() function with sequence argument with spaces on Windows
   1511         self.with_spaces([sys.executable, self.fname, "ab cd"])
   1512 
   1513 def test_main():
   1514     unit_tests = (ProcessTestCase,
   1515                   POSIXProcessTestCase,
   1516                   Win32ProcessTestCase,
   1517                   ProcessTestCaseNoPoll,
   1518                   HelperFunctionTests,
   1519                   CommandsWithSpaces)
   1520 
   1521     test_support.run_unittest(*unit_tests)
   1522     test_support.reap_children()
   1523 
# Run the full suite via test_main() when this file is executed as a script.
if __name__ == "__main__":
    test_main()
   1526