Home | History | Annotate | Download | only in test
      1 import unittest
      2 from unittest import mock
      3 from test import support
      4 import subprocess
      5 import sys
      6 import platform
      7 import signal
      8 import io
      9 import os
     10 import errno
     11 import tempfile
     12 import time
     13 import selectors
     14 import sysconfig
     15 import select
     16 import shutil
     17 import gc
     18 import textwrap
     19 
     20 try:
     21     import ctypes
     22 except ImportError:
     23     ctypes = None
     24 
     25 try:
     26     import threading
     27 except ImportError:
     28     threading = None
     29 
     30 if support.PGO:
     31     raise unittest.SkipTest("test is not helpful for PGO")
     32 
     33 mswindows = (sys.platform == "win32")
     34 
     35 #
     36 # Depends on the following external programs: Python
     37 #
     38 
     39 if mswindows:
     40     SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), '
     41                                                 'os.O_BINARY);')
     42 else:
     43     SETBINARY = ''
     44 
     45 
     46 class BaseTestCase(unittest.TestCase):
     47     def setUp(self):
     48         # Try to minimize the number of children we have so this test
     49         # doesn't crash on some buildbots (Alphas in particular).
     50         support.reap_children()
     51 
     52     def tearDown(self):
     53         for inst in subprocess._active:
     54             inst.wait()
     55         subprocess._cleanup()
     56         self.assertFalse(subprocess._active, "subprocess._active not empty")
     57 
     58     def assertStderrEqual(self, stderr, expected, msg=None):
     59         # In a debug build, stuff like "[6580 refs]" is printed to stderr at
     60         # shutdown time.  That frustrates tests trying to check stderr produced
     61         # from a spawned Python process.
     62         actual = support.strip_python_stderr(stderr)
     63         # strip_python_stderr also strips whitespace, so we do too.
     64         expected = expected.strip()
     65         self.assertEqual(actual, expected, msg)
     66 
     67 
     68 class PopenTestException(Exception):
     69     pass
     70 
     71 
     72 class PopenExecuteChildRaises(subprocess.Popen):
     73     """Popen subclass for testing cleanup of subprocess.PIPE filehandles when
     74     _execute_child fails.
     75     """
     76     def _execute_child(self, *args, **kwargs):
     77         raise PopenTestException("Forced Exception for Test")
     78 
     79 
     80 class ProcessTestCase(BaseTestCase):
     81 
     82     def test_io_buffered_by_default(self):
     83         p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
     84                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
     85                              stderr=subprocess.PIPE)
     86         try:
     87             self.assertIsInstance(p.stdin, io.BufferedIOBase)
     88             self.assertIsInstance(p.stdout, io.BufferedIOBase)
     89             self.assertIsInstance(p.stderr, io.BufferedIOBase)
     90         finally:
     91             p.stdin.close()
     92             p.stdout.close()
     93             p.stderr.close()
     94             p.wait()
     95 
     96     def test_io_unbuffered_works(self):
     97         p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
     98                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
     99                              stderr=subprocess.PIPE, bufsize=0)
    100         try:
    101             self.assertIsInstance(p.stdin, io.RawIOBase)
    102             self.assertIsInstance(p.stdout, io.RawIOBase)
    103             self.assertIsInstance(p.stderr, io.RawIOBase)
    104         finally:
    105             p.stdin.close()
    106             p.stdout.close()
    107             p.stderr.close()
    108             p.wait()
    109 
    110     def test_call_seq(self):
    111         # call() function with sequence argument
    112         rc = subprocess.call([sys.executable, "-c",
    113                               "import sys; sys.exit(47)"])
    114         self.assertEqual(rc, 47)
    115 
    116     def test_call_timeout(self):
    117         # call() function with timeout argument; we want to test that the child
    118         # process gets killed when the timeout expires.  If the child isn't
    119         # killed, this call will deadlock since subprocess.call waits for the
    120         # child.
    121         self.assertRaises(subprocess.TimeoutExpired, subprocess.call,
    122                           [sys.executable, "-c", "while True: pass"],
    123                           timeout=0.1)
    124 
    125     def test_check_call_zero(self):
    126         # check_call() function with zero return code
    127         rc = subprocess.check_call([sys.executable, "-c",
    128                                     "import sys; sys.exit(0)"])
    129         self.assertEqual(rc, 0)
    130 
    131     def test_check_call_nonzero(self):
    132         # check_call() function with non-zero return code
    133         with self.assertRaises(subprocess.CalledProcessError) as c:
    134             subprocess.check_call([sys.executable, "-c",
    135                                    "import sys; sys.exit(47)"])
    136         self.assertEqual(c.exception.returncode, 47)
    137 
    138     def test_check_output(self):
    139         # check_output() function with zero return code
    140         output = subprocess.check_output(
    141                 [sys.executable, "-c", "print('BDFL')"])
    142         self.assertIn(b'BDFL', output)
    143 
    144     def test_check_output_nonzero(self):
    145         # check_call() function with non-zero return code
    146         with self.assertRaises(subprocess.CalledProcessError) as c:
    147             subprocess.check_output(
    148                     [sys.executable, "-c", "import sys; sys.exit(5)"])
    149         self.assertEqual(c.exception.returncode, 5)
    150 
    151     def test_check_output_stderr(self):
    152         # check_output() function stderr redirected to stdout
    153         output = subprocess.check_output(
    154                 [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"],
    155                 stderr=subprocess.STDOUT)
    156         self.assertIn(b'BDFL', output)
    157 
    158     def test_check_output_stdin_arg(self):
    159         # check_output() can be called with stdin set to a file
    160         tf = tempfile.TemporaryFile()
    161         self.addCleanup(tf.close)
    162         tf.write(b'pear')
    163         tf.seek(0)
    164         output = subprocess.check_output(
    165                 [sys.executable, "-c",
    166                  "import sys; sys.stdout.write(sys.stdin.read().upper())"],
    167                 stdin=tf)
    168         self.assertIn(b'PEAR', output)
    169 
    170     def test_check_output_input_arg(self):
    171         # check_output() can be called with input set to a string
    172         output = subprocess.check_output(
    173                 [sys.executable, "-c",
    174                  "import sys; sys.stdout.write(sys.stdin.read().upper())"],
    175                 input=b'pear')
    176         self.assertIn(b'PEAR', output)
    177 
    178     def test_check_output_stdout_arg(self):
    179         # check_output() refuses to accept 'stdout' argument
    180         with self.assertRaises(ValueError) as c:
    181             output = subprocess.check_output(
    182                     [sys.executable, "-c", "print('will not be run')"],
    183                     stdout=sys.stdout)
    184             self.fail("Expected ValueError when stdout arg supplied.")
    185         self.assertIn('stdout', c.exception.args[0])
    186 
    187     def test_check_output_stdin_with_input_arg(self):
    188         # check_output() refuses to accept 'stdin' with 'input'
    189         tf = tempfile.TemporaryFile()
    190         self.addCleanup(tf.close)
    191         tf.write(b'pear')
    192         tf.seek(0)
    193         with self.assertRaises(ValueError) as c:
    194             output = subprocess.check_output(
    195                     [sys.executable, "-c", "print('will not be run')"],
    196                     stdin=tf, input=b'hare')
    197             self.fail("Expected ValueError when stdin and input args supplied.")
    198         self.assertIn('stdin', c.exception.args[0])
    199         self.assertIn('input', c.exception.args[0])
    200 
    201     def test_check_output_timeout(self):
    202         # check_output() function with timeout arg
    203         with self.assertRaises(subprocess.TimeoutExpired) as c:
    204             output = subprocess.check_output(
    205                     [sys.executable, "-c",
    206                      "import sys, time\n"
    207                      "sys.stdout.write('BDFL')\n"
    208                      "sys.stdout.flush()\n"
    209                      "time.sleep(3600)"],
    210                     # Some heavily loaded buildbots (sparc Debian 3.x) require
    211                     # this much time to start and print.
    212                     timeout=3)
    213             self.fail("Expected TimeoutExpired.")
    214         self.assertEqual(c.exception.output, b'BDFL')
    215 
    216     def test_call_kwargs(self):
    217         # call() function with keyword args
    218         newenv = os.environ.copy()
    219         newenv["FRUIT"] = "banana"
    220         rc = subprocess.call([sys.executable, "-c",
    221                               'import sys, os;'
    222                               'sys.exit(os.getenv("FRUIT")=="banana")'],
    223                              env=newenv)
    224         self.assertEqual(rc, 1)
    225 
    226     def test_invalid_args(self):
    227         # Popen() called with invalid arguments should raise TypeError
    228         # but Popen.__del__ should not complain (issue #12085)
    229         with support.captured_stderr() as s:
    230             self.assertRaises(TypeError, subprocess.Popen, invalid_arg_name=1)
    231             argcount = subprocess.Popen.__init__.__code__.co_argcount
    232             too_many_args = [0] * (argcount + 1)
    233             self.assertRaises(TypeError, subprocess.Popen, *too_many_args)
    234         self.assertEqual(s.getvalue(), '')
    235 
    236     def test_stdin_none(self):
    237         # .stdin is None when not redirected
    238         p = subprocess.Popen([sys.executable, "-c", 'print("banana")'],
    239                          stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    240         self.addCleanup(p.stdout.close)
    241         self.addCleanup(p.stderr.close)
    242         p.wait()
    243         self.assertEqual(p.stdin, None)
    244 
    245     def test_stdout_none(self):
    246         # .stdout is None when not redirected, and the child's stdout will
    247         # be inherited from the parent.  In order to test this we run a
    248         # subprocess in a subprocess:
    249         # this_test
    250         #   \-- subprocess created by this test (parent)
    251         #          \-- subprocess created by the parent subprocess (child)
    252         # The parent doesn't specify stdout, so the child will use the
    253         # parent's stdout.  This test checks that the message printed by the
    254         # child goes to the parent stdout.  The parent also checks that the
    255         # child's stdout is None.  See #11963.
    256         code = ('import sys; from subprocess import Popen, PIPE;'
    257                 'p = Popen([sys.executable, "-c", "print(\'test_stdout_none\')"],'
    258                 '          stdin=PIPE, stderr=PIPE);'
    259                 'p.wait(); assert p.stdout is None;')
    260         p = subprocess.Popen([sys.executable, "-c", code],
    261                              stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    262         self.addCleanup(p.stdout.close)
    263         self.addCleanup(p.stderr.close)
    264         out, err = p.communicate()
    265         self.assertEqual(p.returncode, 0, err)
    266         self.assertEqual(out.rstrip(), b'test_stdout_none')
    267 
    268     def test_stderr_none(self):
    269         # .stderr is None when not redirected
    270         p = subprocess.Popen([sys.executable, "-c", 'print("banana")'],
    271                          stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    272         self.addCleanup(p.stdout.close)
    273         self.addCleanup(p.stdin.close)
    274         p.wait()
    275         self.assertEqual(p.stderr, None)
    276 
    277     def _assert_python(self, pre_args, **kwargs):
    278         # We include sys.exit() to prevent the test runner from hanging
    279         # whenever python is found.
    280         args = pre_args + ["import sys; sys.exit(47)"]
    281         p = subprocess.Popen(args, **kwargs)
    282         p.wait()
    283         self.assertEqual(47, p.returncode)
    284 
    285     def test_executable(self):
    286         # Check that the executable argument works.
    287         #
    288         # On Unix (non-Mac and non-Windows), Python looks at args[0] to
    289         # determine where its standard library is, so we need the directory
    290         # of args[0] to be valid for the Popen() call to Python to succeed.
    291         # See also issue #16170 and issue #7774.
    292         doesnotexist = os.path.join(os.path.dirname(sys.executable),
    293                                     "doesnotexist")
    294         self._assert_python([doesnotexist, "-c"], executable=sys.executable)
    295 
    296     def test_executable_takes_precedence(self):
    297         # Check that the executable argument takes precedence over args[0].
    298         #
    299         # Verify first that the call succeeds without the executable arg.
    300         pre_args = [sys.executable, "-c"]
    301         self._assert_python(pre_args)
    302         self.assertRaises((FileNotFoundError, PermissionError),
    303                           self._assert_python, pre_args,
    304                           executable="doesnotexist")
    305 
    306     @unittest.skipIf(mswindows, "executable argument replaces shell")
    307     def test_executable_replaces_shell(self):
    308         # Check that the executable argument replaces the default shell
    309         # when shell=True.
    310         self._assert_python([], executable=sys.executable, shell=True)
    311 
    312     # For use in the test_cwd* tests below.
    313     def _normalize_cwd(self, cwd):
    314         # Normalize an expected cwd (for Tru64 support).
    315         # We can't use os.path.realpath since it doesn't expand Tru64 {memb}
    316         # strings.  See bug #1063571.
    317         with support.change_cwd(cwd):
    318             return os.getcwd()
    319 
    320     # For use in the test_cwd* tests below.
    321     def _split_python_path(self):
    322         # Return normalized (python_dir, python_base).
    323         python_path = os.path.realpath(sys.executable)
    324         return os.path.split(python_path)
    325 
    326     # For use in the test_cwd* tests below.
    327     def _assert_cwd(self, expected_cwd, python_arg, **kwargs):
    328         # Invoke Python via Popen, and assert that (1) the call succeeds,
    329         # and that (2) the current working directory of the child process
    330         # matches *expected_cwd*.
    331         p = subprocess.Popen([python_arg, "-c",
    332                               "import os, sys; "
    333                               "sys.stdout.write(os.getcwd()); "
    334                               "sys.exit(47)"],
    335                               stdout=subprocess.PIPE,
    336                               **kwargs)
    337         self.addCleanup(p.stdout.close)
    338         p.wait()
    339         self.assertEqual(47, p.returncode)
    340         normcase = os.path.normcase
    341         self.assertEqual(normcase(expected_cwd),
    342                          normcase(p.stdout.read().decode("utf-8")))
    343 
    344     def test_cwd(self):
    345         # Check that cwd changes the cwd for the child process.
    346         temp_dir = tempfile.gettempdir()
    347         temp_dir = self._normalize_cwd(temp_dir)
    348         self._assert_cwd(temp_dir, sys.executable, cwd=temp_dir)
    349 
    350     def test_cwd_with_pathlike(self):
    351         temp_dir = tempfile.gettempdir()
    352         temp_dir = self._normalize_cwd(temp_dir)
    353 
    354         class _PathLikeObj:
    355             def __fspath__(self):
    356                 return temp_dir
    357 
    358         self._assert_cwd(temp_dir, sys.executable, cwd=_PathLikeObj())
    359 
    360     @unittest.skipIf(mswindows, "pending resolution of issue #15533")
    361     def test_cwd_with_relative_arg(self):
    362         # Check that Popen looks for args[0] relative to cwd if args[0]
    363         # is relative.
    364         python_dir, python_base = self._split_python_path()
    365         rel_python = os.path.join(os.curdir, python_base)
    366         with support.temp_cwd() as wrong_dir:
    367             # Before calling with the correct cwd, confirm that the call fails
    368             # without cwd and with the wrong cwd.
    369             self.assertRaises(FileNotFoundError, subprocess.Popen,
    370                               [rel_python])
    371             self.assertRaises(FileNotFoundError, subprocess.Popen,
    372                               [rel_python], cwd=wrong_dir)
    373             python_dir = self._normalize_cwd(python_dir)
    374             self._assert_cwd(python_dir, rel_python, cwd=python_dir)
    375 
    376     @unittest.skipIf(mswindows, "pending resolution of issue #15533")
    377     def test_cwd_with_relative_executable(self):
    378         # Check that Popen looks for executable relative to cwd if executable
    379         # is relative (and that executable takes precedence over args[0]).
    380         python_dir, python_base = self._split_python_path()
    381         rel_python = os.path.join(os.curdir, python_base)
    382         doesntexist = "somethingyoudonthave"
    383         with support.temp_cwd() as wrong_dir:
    384             # Before calling with the correct cwd, confirm that the call fails
    385             # without cwd and with the wrong cwd.
    386             self.assertRaises(FileNotFoundError, subprocess.Popen,
    387                               [doesntexist], executable=rel_python)
    388             self.assertRaises(FileNotFoundError, subprocess.Popen,
    389                               [doesntexist], executable=rel_python,
    390                               cwd=wrong_dir)
    391             python_dir = self._normalize_cwd(python_dir)
    392             self._assert_cwd(python_dir, doesntexist, executable=rel_python,
    393                              cwd=python_dir)
    394 
    395     def test_cwd_with_absolute_arg(self):
    396         # Check that Popen can find the executable when the cwd is wrong
    397         # if args[0] is an absolute path.
    398         python_dir, python_base = self._split_python_path()
    399         abs_python = os.path.join(python_dir, python_base)
    400         rel_python = os.path.join(os.curdir, python_base)
    401         with support.temp_dir() as wrong_dir:
    402             # Before calling with an absolute path, confirm that using a
    403             # relative path fails.
    404             self.assertRaises(FileNotFoundError, subprocess.Popen,
    405                               [rel_python], cwd=wrong_dir)
    406             wrong_dir = self._normalize_cwd(wrong_dir)
    407             self._assert_cwd(wrong_dir, abs_python, cwd=wrong_dir)
    408 
    409     @unittest.skipIf(sys.base_prefix != sys.prefix,
    410                      'Test is not venv-compatible')
    411     def test_executable_with_cwd(self):
    412         python_dir, python_base = self._split_python_path()
    413         python_dir = self._normalize_cwd(python_dir)
    414         self._assert_cwd(python_dir, "somethingyoudonthave",
    415                          executable=sys.executable, cwd=python_dir)
    416 
    417     @unittest.skipIf(sys.base_prefix != sys.prefix,
    418                      'Test is not venv-compatible')
    419     @unittest.skipIf(sysconfig.is_python_build(),
    420                      "need an installed Python. See #7774")
    421     def test_executable_without_cwd(self):
    422         # For a normal installation, it should work without 'cwd'
    423         # argument.  For test runs in the build directory, see #7774.
    424         self._assert_cwd(os.getcwd(), "somethingyoudonthave",
    425                          executable=sys.executable)
    426 
    427     def test_stdin_pipe(self):
    428         # stdin redirection
    429         p = subprocess.Popen([sys.executable, "-c",
    430                          'import sys; sys.exit(sys.stdin.read() == "pear")'],
    431                         stdin=subprocess.PIPE)
    432         p.stdin.write(b"pear")
    433         p.stdin.close()
    434         p.wait()
    435         self.assertEqual(p.returncode, 1)
    436 
    437     def test_stdin_filedes(self):
    438         # stdin is set to open file descriptor
    439         tf = tempfile.TemporaryFile()
    440         self.addCleanup(tf.close)
    441         d = tf.fileno()
    442         os.write(d, b"pear")
    443         os.lseek(d, 0, 0)
    444         p = subprocess.Popen([sys.executable, "-c",
    445                          'import sys; sys.exit(sys.stdin.read() == "pear")'],
    446                          stdin=d)
    447         p.wait()
    448         self.assertEqual(p.returncode, 1)
    449 
    450     def test_stdin_fileobj(self):
    451         # stdin is set to open file object
    452         tf = tempfile.TemporaryFile()
    453         self.addCleanup(tf.close)
    454         tf.write(b"pear")
    455         tf.seek(0)
    456         p = subprocess.Popen([sys.executable, "-c",
    457                          'import sys; sys.exit(sys.stdin.read() == "pear")'],
    458                          stdin=tf)
    459         p.wait()
    460         self.assertEqual(p.returncode, 1)
    461 
    462     def test_stdout_pipe(self):
    463         # stdout redirection
    464         p = subprocess.Popen([sys.executable, "-c",
    465                           'import sys; sys.stdout.write("orange")'],
    466                          stdout=subprocess.PIPE)
    467         with p:
    468             self.assertEqual(p.stdout.read(), b"orange")
    469 
    470     def test_stdout_filedes(self):
    471         # stdout is set to open file descriptor
    472         tf = tempfile.TemporaryFile()
    473         self.addCleanup(tf.close)
    474         d = tf.fileno()
    475         p = subprocess.Popen([sys.executable, "-c",
    476                           'import sys; sys.stdout.write("orange")'],
    477                          stdout=d)
    478         p.wait()
    479         os.lseek(d, 0, 0)
    480         self.assertEqual(os.read(d, 1024), b"orange")
    481 
    482     def test_stdout_fileobj(self):
    483         # stdout is set to open file object
    484         tf = tempfile.TemporaryFile()
    485         self.addCleanup(tf.close)
    486         p = subprocess.Popen([sys.executable, "-c",
    487                           'import sys; sys.stdout.write("orange")'],
    488                          stdout=tf)
    489         p.wait()
    490         tf.seek(0)
    491         self.assertEqual(tf.read(), b"orange")
    492 
    493     def test_stderr_pipe(self):
    494         # stderr redirection
    495         p = subprocess.Popen([sys.executable, "-c",
    496                           'import sys; sys.stderr.write("strawberry")'],
    497                          stderr=subprocess.PIPE)
    498         with p:
    499             self.assertStderrEqual(p.stderr.read(), b"strawberry")
    500 
    501     def test_stderr_filedes(self):
    502         # stderr is set to open file descriptor
    503         tf = tempfile.TemporaryFile()
    504         self.addCleanup(tf.close)
    505         d = tf.fileno()
    506         p = subprocess.Popen([sys.executable, "-c",
    507                           'import sys; sys.stderr.write("strawberry")'],
    508                          stderr=d)
    509         p.wait()
    510         os.lseek(d, 0, 0)
    511         self.assertStderrEqual(os.read(d, 1024), b"strawberry")
    512 
    513     def test_stderr_fileobj(self):
    514         # stderr is set to open file object
    515         tf = tempfile.TemporaryFile()
    516         self.addCleanup(tf.close)
    517         p = subprocess.Popen([sys.executable, "-c",
    518                           'import sys; sys.stderr.write("strawberry")'],
    519                          stderr=tf)
    520         p.wait()
    521         tf.seek(0)
    522         self.assertStderrEqual(tf.read(), b"strawberry")
    523 
    524     def test_stderr_redirect_with_no_stdout_redirect(self):
    525         # test stderr=STDOUT while stdout=None (not set)
    526 
    527         # - grandchild prints to stderr
    528         # - child redirects grandchild's stderr to its stdout
    529         # - the parent should get grandchild's stderr in child's stdout
    530         p = subprocess.Popen([sys.executable, "-c",
    531                               'import sys, subprocess;'
    532                               'rc = subprocess.call([sys.executable, "-c",'
    533                               '    "import sys;"'
    534                               '    "sys.stderr.write(\'42\')"],'
    535                               '    stderr=subprocess.STDOUT);'
    536                               'sys.exit(rc)'],
    537                              stdout=subprocess.PIPE,
    538                              stderr=subprocess.PIPE)
    539         stdout, stderr = p.communicate()
    540         #NOTE: stdout should get stderr from grandchild
    541         self.assertStderrEqual(stdout, b'42')
    542         self.assertStderrEqual(stderr, b'') # should be empty
    543         self.assertEqual(p.returncode, 0)
    544 
    545     def test_stdout_stderr_pipe(self):
    546         # capture stdout and stderr to the same pipe
    547         p = subprocess.Popen([sys.executable, "-c",
    548                               'import sys;'
    549                               'sys.stdout.write("apple");'
    550                               'sys.stdout.flush();'
    551                               'sys.stderr.write("orange")'],
    552                              stdout=subprocess.PIPE,
    553                              stderr=subprocess.STDOUT)
    554         with p:
    555             self.assertStderrEqual(p.stdout.read(), b"appleorange")
    556 
    557     def test_stdout_stderr_file(self):
    558         # capture stdout and stderr to the same open file
    559         tf = tempfile.TemporaryFile()
    560         self.addCleanup(tf.close)
    561         p = subprocess.Popen([sys.executable, "-c",
    562                               'import sys;'
    563                               'sys.stdout.write("apple");'
    564                               'sys.stdout.flush();'
    565                               'sys.stderr.write("orange")'],
    566                              stdout=tf,
    567                              stderr=tf)
    568         p.wait()
    569         tf.seek(0)
    570         self.assertStderrEqual(tf.read(), b"appleorange")
    571 
    572     def test_stdout_filedes_of_stdout(self):
    573         # stdout is set to 1 (#1531862).
    574         # To avoid printing the text on stdout, we do something similar to
    575         # test_stdout_none (see above).  The parent subprocess calls the child
    576         # subprocess passing stdout=1, and this test uses stdout=PIPE in
    577         # order to capture and check the output of the parent. See #11963.
    578         code = ('import sys, subprocess; '
    579                 'rc = subprocess.call([sys.executable, "-c", '
    580                 '    "import os, sys; sys.exit(os.write(sys.stdout.fileno(), '
    581                      'b\'test with stdout=1\'))"], stdout=1); '
    582                 'assert rc == 18')
    583         p = subprocess.Popen([sys.executable, "-c", code],
    584                              stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    585         self.addCleanup(p.stdout.close)
    586         self.addCleanup(p.stderr.close)
    587         out, err = p.communicate()
    588         self.assertEqual(p.returncode, 0, err)
    589         self.assertEqual(out.rstrip(), b'test with stdout=1')
    590 
    591     def test_stdout_devnull(self):
    592         p = subprocess.Popen([sys.executable, "-c",
    593                               'for i in range(10240):'
    594                               'print("x" * 1024)'],
    595                               stdout=subprocess.DEVNULL)
    596         p.wait()
    597         self.assertEqual(p.stdout, None)
    598 
    599     def test_stderr_devnull(self):
    600         p = subprocess.Popen([sys.executable, "-c",
    601                               'import sys\n'
    602                               'for i in range(10240):'
    603                               'sys.stderr.write("x" * 1024)'],
    604                               stderr=subprocess.DEVNULL)
    605         p.wait()
    606         self.assertEqual(p.stderr, None)
    607 
    608     def test_stdin_devnull(self):
    609         p = subprocess.Popen([sys.executable, "-c",
    610                               'import sys;'
    611                               'sys.stdin.read(1)'],
    612                               stdin=subprocess.DEVNULL)
    613         p.wait()
    614         self.assertEqual(p.stdin, None)
    615 
    616     def test_env(self):
    617         newenv = os.environ.copy()
    618         newenv["FRUIT"] = "orange"
    619         with subprocess.Popen([sys.executable, "-c",
    620                                'import sys,os;'
    621                                'sys.stdout.write(os.getenv("FRUIT"))'],
    622                               stdout=subprocess.PIPE,
    623                               env=newenv) as p:
    624             stdout, stderr = p.communicate()
    625             self.assertEqual(stdout, b"orange")
    626 
    627     # Windows requires at least the SYSTEMROOT environment variable to start
    628     # Python
    629     @unittest.skipIf(sys.platform == 'win32',
    630                      'cannot test an empty env on Windows')
    631     @unittest.skipIf(sysconfig.get_config_var('Py_ENABLE_SHARED') is not None,
    632                      'the python library cannot be loaded '
    633                      'with an empty environment')
    634     def test_empty_env(self):
    635         with subprocess.Popen([sys.executable, "-c",
    636                                'import os; '
    637                                'print(list(os.environ.keys()))'],
    638                               stdout=subprocess.PIPE,
    639                               env={}) as p:
    640             stdout, stderr = p.communicate()
    641             self.assertIn(stdout.strip(),
    642                 (b"[]",
    643                  # Mac OS X adds __CF_USER_TEXT_ENCODING variable to an empty
    644                  # environment
    645                  b"['__CF_USER_TEXT_ENCODING']"))
    646 
    647     def test_communicate_stdin(self):
    648         p = subprocess.Popen([sys.executable, "-c",
    649                               'import sys;'
    650                               'sys.exit(sys.stdin.read() == "pear")'],
    651                              stdin=subprocess.PIPE)
    652         p.communicate(b"pear")
    653         self.assertEqual(p.returncode, 1)
    654 
    655     def test_communicate_stdout(self):
    656         p = subprocess.Popen([sys.executable, "-c",
    657                               'import sys; sys.stdout.write("pineapple")'],
    658                              stdout=subprocess.PIPE)
    659         (stdout, stderr) = p.communicate()
    660         self.assertEqual(stdout, b"pineapple")
    661         self.assertEqual(stderr, None)
    662 
    663     def test_communicate_stderr(self):
    664         p = subprocess.Popen([sys.executable, "-c",
    665                               'import sys; sys.stderr.write("pineapple")'],
    666                              stderr=subprocess.PIPE)
    667         (stdout, stderr) = p.communicate()
    668         self.assertEqual(stdout, None)
    669         self.assertStderrEqual(stderr, b"pineapple")
    670 
    671     def test_communicate(self):
    672         p = subprocess.Popen([sys.executable, "-c",
    673                               'import sys,os;'
    674                               'sys.stderr.write("pineapple");'
    675                               'sys.stdout.write(sys.stdin.read())'],
    676                              stdin=subprocess.PIPE,
    677                              stdout=subprocess.PIPE,
    678                              stderr=subprocess.PIPE)
    679         self.addCleanup(p.stdout.close)
    680         self.addCleanup(p.stderr.close)
    681         self.addCleanup(p.stdin.close)
    682         (stdout, stderr) = p.communicate(b"banana")
    683         self.assertEqual(stdout, b"banana")
    684         self.assertStderrEqual(stderr, b"pineapple")
    685 
    686     def test_communicate_timeout(self):
    687         p = subprocess.Popen([sys.executable, "-c",
    688                               'import sys,os,time;'
    689                               'sys.stderr.write("pineapple\\n");'
    690                               'time.sleep(1);'
    691                               'sys.stderr.write("pear\\n");'
    692                               'sys.stdout.write(sys.stdin.read())'],
    693                              universal_newlines=True,
    694                              stdin=subprocess.PIPE,
    695                              stdout=subprocess.PIPE,
    696                              stderr=subprocess.PIPE)
    697         self.assertRaises(subprocess.TimeoutExpired, p.communicate, "banana",
    698                           timeout=0.3)
    699         # Make sure we can keep waiting for it, and that we get the whole output
    700         # after it completes.
    701         (stdout, stderr) = p.communicate()
    702         self.assertEqual(stdout, "banana")
    703         self.assertStderrEqual(stderr.encode(), b"pineapple\npear\n")
    704 
    705     def test_communicate_timeout_large_output(self):
    706         # Test an expiring timeout while the child is outputting lots of data.
    707         p = subprocess.Popen([sys.executable, "-c",
    708                               'import sys,os,time;'
    709                               'sys.stdout.write("a" * (64 * 1024));'
    710                               'time.sleep(0.2);'
    711                               'sys.stdout.write("a" * (64 * 1024));'
    712                               'time.sleep(0.2);'
    713                               'sys.stdout.write("a" * (64 * 1024));'
    714                               'time.sleep(0.2);'
    715                               'sys.stdout.write("a" * (64 * 1024));'],
    716                              stdout=subprocess.PIPE)
    717         self.assertRaises(subprocess.TimeoutExpired, p.communicate, timeout=0.4)
    718         (stdout, _) = p.communicate()
    719         self.assertEqual(len(stdout), 4 * 64 * 1024)
    720 
    721     # Test for the fd leak reported in http://bugs.python.org/issue2791.
    722     def test_communicate_pipe_fd_leak(self):
    723         for stdin_pipe in (False, True):
    724             for stdout_pipe in (False, True):
    725                 for stderr_pipe in (False, True):
    726                     options = {}
    727                     if stdin_pipe:
    728                         options['stdin'] = subprocess.PIPE
    729                     if stdout_pipe:
    730                         options['stdout'] = subprocess.PIPE
    731                     if stderr_pipe:
    732                         options['stderr'] = subprocess.PIPE
    733                     if not options:
    734                         continue
    735                     p = subprocess.Popen((sys.executable, "-c", "pass"), **options)
    736                     p.communicate()
    737                     if p.stdin is not None:
    738                         self.assertTrue(p.stdin.closed)
    739                     if p.stdout is not None:
    740                         self.assertTrue(p.stdout.closed)
    741                     if p.stderr is not None:
    742                         self.assertTrue(p.stderr.closed)
    743 
    744     def test_communicate_returns(self):
    745         # communicate() should return None if no redirection is active
    746         p = subprocess.Popen([sys.executable, "-c",
    747                               "import sys; sys.exit(47)"])
    748         (stdout, stderr) = p.communicate()
    749         self.assertEqual(stdout, None)
    750         self.assertEqual(stderr, None)
    751 
    752     def test_communicate_pipe_buf(self):
    753         # communicate() with writes larger than pipe_buf
    754         # This test will probably deadlock rather than fail, if
    755         # communicate() does not work properly.
    756         x, y = os.pipe()
    757         os.close(x)
    758         os.close(y)
    759         p = subprocess.Popen([sys.executable, "-c",
    760                               'import sys,os;'
    761                               'sys.stdout.write(sys.stdin.read(47));'
    762                               'sys.stderr.write("x" * %d);'
    763                               'sys.stdout.write(sys.stdin.read())' %
    764                               support.PIPE_MAX_SIZE],
    765                              stdin=subprocess.PIPE,
    766                              stdout=subprocess.PIPE,
    767                              stderr=subprocess.PIPE)
    768         self.addCleanup(p.stdout.close)
    769         self.addCleanup(p.stderr.close)
    770         self.addCleanup(p.stdin.close)
    771         string_to_write = b"a" * support.PIPE_MAX_SIZE
    772         (stdout, stderr) = p.communicate(string_to_write)
    773         self.assertEqual(stdout, string_to_write)
    774 
    775     def test_writes_before_communicate(self):
    776         # stdin.write before communicate()
    777         p = subprocess.Popen([sys.executable, "-c",
    778                               'import sys,os;'
    779                               'sys.stdout.write(sys.stdin.read())'],
    780                              stdin=subprocess.PIPE,
    781                              stdout=subprocess.PIPE,
    782                              stderr=subprocess.PIPE)
    783         self.addCleanup(p.stdout.close)
    784         self.addCleanup(p.stderr.close)
    785         self.addCleanup(p.stdin.close)
    786         p.stdin.write(b"banana")
    787         (stdout, stderr) = p.communicate(b"split")
    788         self.assertEqual(stdout, b"bananasplit")
    789         self.assertStderrEqual(stderr, b"")
    790 
    791     def test_universal_newlines(self):
    792         p = subprocess.Popen([sys.executable, "-c",
    793                               'import sys,os;' + SETBINARY +
    794                               'buf = sys.stdout.buffer;'
    795                               'buf.write(sys.stdin.readline().encode());'
    796                               'buf.flush();'
    797                               'buf.write(b"line2\\n");'
    798                               'buf.flush();'
    799                               'buf.write(sys.stdin.read().encode());'
    800                               'buf.flush();'
    801                               'buf.write(b"line4\\n");'
    802                               'buf.flush();'
    803                               'buf.write(b"line5\\r\\n");'
    804                               'buf.flush();'
    805                               'buf.write(b"line6\\r");'
    806                               'buf.flush();'
    807                               'buf.write(b"\\nline7");'
    808                               'buf.flush();'
    809                               'buf.write(b"\\nline8");'],
    810                              stdin=subprocess.PIPE,
    811                              stdout=subprocess.PIPE,
    812                              universal_newlines=1)
    813         with p:
    814             p.stdin.write("line1\n")
    815             p.stdin.flush()
    816             self.assertEqual(p.stdout.readline(), "line1\n")
    817             p.stdin.write("line3\n")
    818             p.stdin.close()
    819             self.addCleanup(p.stdout.close)
    820             self.assertEqual(p.stdout.readline(),
    821                              "line2\n")
    822             self.assertEqual(p.stdout.read(6),
    823                              "line3\n")
    824             self.assertEqual(p.stdout.read(),
    825                              "line4\nline5\nline6\nline7\nline8")
    826 
    827     def test_universal_newlines_communicate(self):
    828         # universal newlines through communicate()
    829         p = subprocess.Popen([sys.executable, "-c",
    830                               'import sys,os;' + SETBINARY +
    831                               'buf = sys.stdout.buffer;'
    832                               'buf.write(b"line2\\n");'
    833                               'buf.flush();'
    834                               'buf.write(b"line4\\n");'
    835                               'buf.flush();'
    836                               'buf.write(b"line5\\r\\n");'
    837                               'buf.flush();'
    838                               'buf.write(b"line6\\r");'
    839                               'buf.flush();'
    840                               'buf.write(b"\\nline7");'
    841                               'buf.flush();'
    842                               'buf.write(b"\\nline8");'],
    843                              stderr=subprocess.PIPE,
    844                              stdout=subprocess.PIPE,
    845                              universal_newlines=1)
    846         self.addCleanup(p.stdout.close)
    847         self.addCleanup(p.stderr.close)
    848         (stdout, stderr) = p.communicate()
    849         self.assertEqual(stdout,
    850                          "line2\nline4\nline5\nline6\nline7\nline8")
    851 
    852     def test_universal_newlines_communicate_stdin(self):
    853         # universal newlines through communicate(), with only stdin
    854         p = subprocess.Popen([sys.executable, "-c",
    855                               'import sys,os;' + SETBINARY + textwrap.dedent('''
    856                                s = sys.stdin.readline()
    857                                assert s == "line1\\n", repr(s)
    858                                s = sys.stdin.read()
    859                                assert s == "line3\\n", repr(s)
    860                               ''')],
    861                              stdin=subprocess.PIPE,
    862                              universal_newlines=1)
    863         (stdout, stderr) = p.communicate("line1\nline3\n")
    864         self.assertEqual(p.returncode, 0)
    865 
    866     def test_universal_newlines_communicate_input_none(self):
    867         # Test communicate(input=None) with universal newlines.
    868         #
    869         # We set stdout to PIPE because, as of this writing, a different
    870         # code path is tested when the number of pipes is zero or one.
    871         p = subprocess.Popen([sys.executable, "-c", "pass"],
    872                              stdin=subprocess.PIPE,
    873                              stdout=subprocess.PIPE,
    874                              universal_newlines=True)
    875         p.communicate()
    876         self.assertEqual(p.returncode, 0)
    877 
    878     def test_universal_newlines_communicate_stdin_stdout_stderr(self):
    879         # universal newlines through communicate(), with stdin, stdout, stderr
    880         p = subprocess.Popen([sys.executable, "-c",
    881                               'import sys,os;' + SETBINARY + textwrap.dedent('''
    882                                s = sys.stdin.buffer.readline()
    883                                sys.stdout.buffer.write(s)
    884                                sys.stdout.buffer.write(b"line2\\r")
    885                                sys.stderr.buffer.write(b"eline2\\n")
    886                                s = sys.stdin.buffer.read()
    887                                sys.stdout.buffer.write(s)
    888                                sys.stdout.buffer.write(b"line4\\n")
    889                                sys.stdout.buffer.write(b"line5\\r\\n")
    890                                sys.stderr.buffer.write(b"eline6\\r")
    891                                sys.stderr.buffer.write(b"eline7\\r\\nz")
    892                               ''')],
    893                              stdin=subprocess.PIPE,
    894                              stderr=subprocess.PIPE,
    895                              stdout=subprocess.PIPE,
    896                              universal_newlines=True)
    897         self.addCleanup(p.stdout.close)
    898         self.addCleanup(p.stderr.close)
    899         (stdout, stderr) = p.communicate("line1\nline3\n")
    900         self.assertEqual(p.returncode, 0)
    901         self.assertEqual("line1\nline2\nline3\nline4\nline5\n", stdout)
    902         # Python debug build push something like "[42442 refs]\n"
    903         # to stderr at exit of subprocess.
    904         # Don't use assertStderrEqual because it strips CR and LF from output.
    905         self.assertTrue(stderr.startswith("eline2\neline6\neline7\n"))
    906 
    907     def test_universal_newlines_communicate_encodings(self):
    908         # Check that universal newlines mode works for various encodings,
    909         # in particular for encodings in the UTF-16 and UTF-32 families.
    910         # See issue #15595.
    911         #
    912         # UTF-16 and UTF-32-BE are sufficient to check both with BOM and
    913         # without, and UTF-16 and UTF-32.
    914         for encoding in ['utf-16', 'utf-32-be']:
    915             code = ("import sys; "
    916                     r"sys.stdout.buffer.write('1\r\n2\r3\n4'.encode('%s'))" %
    917                     encoding)
    918             args = [sys.executable, '-c', code]
    919             # We set stdin to be non-None because, as of this writing,
    920             # a different code path is used when the number of pipes is
    921             # zero or one.
    922             popen = subprocess.Popen(args,
    923                                      stdin=subprocess.PIPE,
    924                                      stdout=subprocess.PIPE,
    925                                      encoding=encoding)
    926             stdout, stderr = popen.communicate(input='')
    927             self.assertEqual(stdout, '1\n2\n3\n4')
    928 
    929     def test_communicate_errors(self):
    930         for errors, expected in [
    931             ('ignore', ''),
    932             ('replace', '\ufffd\ufffd'),
    933             ('surrogateescape', '\udc80\udc80'),
    934             ('backslashreplace', '\\x80\\x80'),
    935         ]:
    936             code = ("import sys; "
    937                     r"sys.stdout.buffer.write(b'[\x80\x80]')")
    938             args = [sys.executable, '-c', code]
    939             # We set stdin to be non-None because, as of this writing,
    940             # a different code path is used when the number of pipes is
    941             # zero or one.
    942             popen = subprocess.Popen(args,
    943                                      stdin=subprocess.PIPE,
    944                                      stdout=subprocess.PIPE,
    945                                      encoding='utf-8',
    946                                      errors=errors)
    947             stdout, stderr = popen.communicate(input='')
    948             self.assertEqual(stdout, '[{}]'.format(expected))
    949 
    950     def test_no_leaking(self):
    951         # Make sure we leak no resources
    952         if not mswindows:
    953             max_handles = 1026 # too much for most UNIX systems
    954         else:
    955             max_handles = 2050 # too much for (at least some) Windows setups
    956         handles = []
    957         tmpdir = tempfile.mkdtemp()
    958         try:
    959             for i in range(max_handles):
    960                 try:
    961                     tmpfile = os.path.join(tmpdir, support.TESTFN)
    962                     handles.append(os.open(tmpfile, os.O_WRONLY|os.O_CREAT))
    963                 except OSError as e:
    964                     if e.errno != errno.EMFILE:
    965                         raise
    966                     break
    967             else:
    968                 self.skipTest("failed to reach the file descriptor limit "
    969                     "(tried %d)" % max_handles)
    970             # Close a couple of them (should be enough for a subprocess)
    971             for i in range(10):
    972                 os.close(handles.pop())
    973             # Loop creating some subprocesses. If one of them leaks some fds,
    974             # the next loop iteration will fail by reaching the max fd limit.
    975             for i in range(15):
    976                 p = subprocess.Popen([sys.executable, "-c",
    977                                       "import sys;"
    978                                       "sys.stdout.write(sys.stdin.read())"],
    979                                      stdin=subprocess.PIPE,
    980                                      stdout=subprocess.PIPE,
    981                                      stderr=subprocess.PIPE)
    982                 data = p.communicate(b"lime")[0]
    983                 self.assertEqual(data, b"lime")
    984         finally:
    985             for h in handles:
    986                 os.close(h)
    987             shutil.rmtree(tmpdir)
    988 
    989     def test_list2cmdline(self):
    990         self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
    991                          '"a b c" d e')
    992         self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
    993                          'ab\\"c \\ d')
    994         self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']),
    995                          'ab\\"c " \\\\" d')
    996         self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
    997                          'a\\\\\\b "de fg" h')
    998         self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
    999                          'a\\\\\\"b c d')
   1000         self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
   1001                          '"a\\\\b c" d e')
   1002         self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
   1003                          '"a\\\\b\\ c" d e')
   1004         self.assertEqual(subprocess.list2cmdline(['ab', '']),
   1005                          'ab ""')
   1006 
   1007     def test_poll(self):
   1008         p = subprocess.Popen([sys.executable, "-c",
   1009                               "import os; os.read(0, 1)"],
   1010                              stdin=subprocess.PIPE)
   1011         self.addCleanup(p.stdin.close)
   1012         self.assertIsNone(p.poll())
   1013         os.write(p.stdin.fileno(), b'A')
   1014         p.wait()
   1015         # Subsequent invocations should just return the returncode
   1016         self.assertEqual(p.poll(), 0)
   1017 
   1018     def test_wait(self):
   1019         p = subprocess.Popen([sys.executable, "-c", "pass"])
   1020         self.assertEqual(p.wait(), 0)
   1021         # Subsequent invocations should just return the returncode
   1022         self.assertEqual(p.wait(), 0)
   1023 
   1024     def test_wait_timeout(self):
   1025         p = subprocess.Popen([sys.executable,
   1026                               "-c", "import time; time.sleep(0.3)"])
   1027         with self.assertRaises(subprocess.TimeoutExpired) as c:
   1028             p.wait(timeout=0.0001)
   1029         self.assertIn("0.0001", str(c.exception))  # For coverage of __str__.
   1030         # Some heavily loaded buildbots (sparc Debian 3.x) require this much
   1031         # time to start.
   1032         self.assertEqual(p.wait(timeout=3), 0)
   1033 
   1034     def test_wait_endtime(self):
   1035         """Confirm that the deprecated endtime parameter warns."""
   1036         p = subprocess.Popen([sys.executable, "-c", "pass"])
   1037         try:
   1038             with self.assertWarns(DeprecationWarning) as warn_cm:
   1039                 p.wait(endtime=time.time()+0.01)
   1040         except subprocess.TimeoutExpired:
   1041             pass  # We're not testing endtime timeout behavior.
   1042         finally:
   1043             p.kill()
   1044         self.assertIn('test_subprocess.py', warn_cm.filename)
   1045         self.assertIn('endtime', str(warn_cm.warning))
   1046 
   1047     def test_invalid_bufsize(self):
   1048         # an invalid type of the bufsize argument should raise
   1049         # TypeError.
   1050         with self.assertRaises(TypeError):
   1051             subprocess.Popen([sys.executable, "-c", "pass"], "orange")
   1052 
   1053     def test_bufsize_is_none(self):
   1054         # bufsize=None should be the same as bufsize=0.
   1055         p = subprocess.Popen([sys.executable, "-c", "pass"], None)
   1056         self.assertEqual(p.wait(), 0)
   1057         # Again with keyword arg
   1058         p = subprocess.Popen([sys.executable, "-c", "pass"], bufsize=None)
   1059         self.assertEqual(p.wait(), 0)
   1060 
   1061     def _test_bufsize_equal_one(self, line, expected, universal_newlines):
   1062         # subprocess may deadlock with bufsize=1, see issue #21332
   1063         with subprocess.Popen([sys.executable, "-c", "import sys;"
   1064                                "sys.stdout.write(sys.stdin.readline());"
   1065                                "sys.stdout.flush()"],
   1066                               stdin=subprocess.PIPE,
   1067                               stdout=subprocess.PIPE,
   1068                               stderr=subprocess.DEVNULL,
   1069                               bufsize=1,
   1070                               universal_newlines=universal_newlines) as p:
   1071             p.stdin.write(line) # expect that it flushes the line in text mode
   1072             os.close(p.stdin.fileno()) # close it without flushing the buffer
   1073             read_line = p.stdout.readline()
   1074             try:
   1075                 p.stdin.close()
   1076             except OSError:
   1077                 pass
   1078             p.stdin = None
   1079         self.assertEqual(p.returncode, 0)
   1080         self.assertEqual(read_line, expected)
   1081 
   1082     def test_bufsize_equal_one_text_mode(self):
   1083         # line is flushed in text mode with bufsize=1.
   1084         # we should get the full line in return
   1085         line = "line\n"
   1086         self._test_bufsize_equal_one(line, line, universal_newlines=True)
   1087 
   1088     def test_bufsize_equal_one_binary_mode(self):
   1089         # line is not flushed in binary mode with bufsize=1.
   1090         # we should get empty response
   1091         line = b'line' + os.linesep.encode() # assume ascii-based locale
   1092         self._test_bufsize_equal_one(line, b'', universal_newlines=False)
   1093 
   1094     def test_leaking_fds_on_error(self):
   1095         # see bug #5179: Popen leaks file descriptors to PIPEs if
   1096         # the child fails to execute; this will eventually exhaust
   1097         # the maximum number of open fds. 1024 seems a very common
   1098         # value for that limit, but Windows has 2048, so we loop
   1099         # 1024 times (each call leaked two fds).
   1100         for i in range(1024):
   1101             with self.assertRaises(OSError) as c:
   1102                 subprocess.Popen(['nonexisting_i_hope'],
   1103                                  stdout=subprocess.PIPE,
   1104                                  stderr=subprocess.PIPE)
   1105             # ignore errors that indicate the command was not found
   1106             if c.exception.errno not in (errno.ENOENT, errno.EACCES):
   1107                 raise c.exception
   1108 
   1109     @unittest.skipIf(threading is None, "threading required")
   1110     def test_double_close_on_error(self):
   1111         # Issue #18851
   1112         fds = []
   1113         def open_fds():
   1114             for i in range(20):
   1115                 fds.extend(os.pipe())
   1116                 time.sleep(0.001)
   1117         t = threading.Thread(target=open_fds)
   1118         t.start()
   1119         try:
   1120             with self.assertRaises(EnvironmentError):
   1121                 subprocess.Popen(['nonexisting_i_hope'],
   1122                                  stdin=subprocess.PIPE,
   1123                                  stdout=subprocess.PIPE,
   1124                                  stderr=subprocess.PIPE)
   1125         finally:
   1126             t.join()
   1127             exc = None
   1128             for fd in fds:
   1129                 # If a double close occurred, some of those fds will
   1130                 # already have been closed by mistake, and os.close()
   1131                 # here will raise.
   1132                 try:
   1133                     os.close(fd)
   1134                 except OSError as e:
   1135                     exc = e
   1136             if exc is not None:
   1137                 raise exc
   1138 
   1139     @unittest.skipIf(threading is None, "threading required")
   1140     def test_threadsafe_wait(self):
   1141         """Issue21291: Popen.wait() needs to be threadsafe for returncode."""
   1142         proc = subprocess.Popen([sys.executable, '-c',
   1143                                  'import time; time.sleep(12)'])
   1144         self.assertEqual(proc.returncode, None)
   1145         results = []
   1146 
   1147         def kill_proc_timer_thread():
   1148             results.append(('thread-start-poll-result', proc.poll()))
   1149             # terminate it from the thread and wait for the result.
   1150             proc.kill()
   1151             proc.wait()
   1152             results.append(('thread-after-kill-and-wait', proc.returncode))
   1153             # this wait should be a no-op given the above.
   1154             proc.wait()
   1155             results.append(('thread-after-second-wait', proc.returncode))
   1156 
   1157         # This is a timing sensitive test, the failure mode is
   1158         # triggered when both the main thread and this thread are in
   1159         # the wait() call at once.  The delay here is to allow the
   1160         # main thread to most likely be blocked in its wait() call.
   1161         t = threading.Timer(0.2, kill_proc_timer_thread)
   1162         t.start()
   1163 
   1164         if mswindows:
   1165             expected_errorcode = 1
   1166         else:
   1167             # Should be -9 because of the proc.kill() from the thread.
   1168             expected_errorcode = -9
   1169 
   1170         # Wait for the process to finish; the thread should kill it
   1171         # long before it finishes on its own.  Supplying a timeout
   1172         # triggers a different code path for better coverage.
   1173         proc.wait(timeout=20)
   1174         self.assertEqual(proc.returncode, expected_errorcode,
   1175                          msg="unexpected result in wait from main thread")
   1176 
   1177         # This should be a no-op with no change in returncode.
   1178         proc.wait()
   1179         self.assertEqual(proc.returncode, expected_errorcode,
   1180                          msg="unexpected result in second main wait.")
   1181 
   1182         t.join()
   1183         # Ensure that all of the thread results are as expected.
   1184         # When a race condition occurs in wait(), the returncode could
   1185         # be set by the wrong thread that doesn't actually have it
   1186         # leading to an incorrect value.
   1187         self.assertEqual([('thread-start-poll-result', None),
   1188                           ('thread-after-kill-and-wait', expected_errorcode),
   1189                           ('thread-after-second-wait', expected_errorcode)],
   1190                          results)
   1191 
   1192     def test_issue8780(self):
   1193         # Ensure that stdout is inherited from the parent
   1194         # if stdout=PIPE is not used
   1195         code = ';'.join((
   1196             'import subprocess, sys',
   1197             'retcode = subprocess.call('
   1198                 "[sys.executable, '-c', 'print(\"Hello World!\")'])",
   1199             'assert retcode == 0'))
   1200         output = subprocess.check_output([sys.executable, '-c', code])
   1201         self.assertTrue(output.startswith(b'Hello World!'), ascii(output))
   1202 
   1203     def test_handles_closed_on_exception(self):
   1204         # If CreateProcess exits with an error, ensure the
   1205         # duplicate output handles are released
   1206         ifhandle, ifname = tempfile.mkstemp()
   1207         ofhandle, ofname = tempfile.mkstemp()
   1208         efhandle, efname = tempfile.mkstemp()
   1209         try:
   1210             subprocess.Popen (["*"], stdin=ifhandle, stdout=ofhandle,
   1211               stderr=efhandle)
   1212         except OSError:
   1213             os.close(ifhandle)
   1214             os.remove(ifname)
   1215             os.close(ofhandle)
   1216             os.remove(ofname)
   1217             os.close(efhandle)
   1218             os.remove(efname)
   1219         self.assertFalse(os.path.exists(ifname))
   1220         self.assertFalse(os.path.exists(ofname))
   1221         self.assertFalse(os.path.exists(efname))
   1222 
   1223     def test_communicate_epipe(self):
   1224         # Issue 10963: communicate() should hide EPIPE
   1225         p = subprocess.Popen([sys.executable, "-c", 'pass'],
   1226                              stdin=subprocess.PIPE,
   1227                              stdout=subprocess.PIPE,
   1228                              stderr=subprocess.PIPE)
   1229         self.addCleanup(p.stdout.close)
   1230         self.addCleanup(p.stderr.close)
   1231         self.addCleanup(p.stdin.close)
   1232         p.communicate(b"x" * 2**20)
   1233 
   1234     def test_communicate_epipe_only_stdin(self):
   1235         # Issue 10963: communicate() should hide EPIPE
   1236         p = subprocess.Popen([sys.executable, "-c", 'pass'],
   1237                              stdin=subprocess.PIPE)
   1238         self.addCleanup(p.stdin.close)
   1239         p.wait()
   1240         p.communicate(b"x" * 2**20)
   1241 
   1242     @unittest.skipUnless(hasattr(signal, 'SIGUSR1'),
   1243                          "Requires signal.SIGUSR1")
   1244     @unittest.skipUnless(hasattr(os, 'kill'),
   1245                          "Requires os.kill")
   1246     @unittest.skipUnless(hasattr(os, 'getppid'),
   1247                          "Requires os.getppid")
   1248     def test_communicate_eintr(self):
   1249         # Issue #12493: communicate() should handle EINTR
   1250         def handler(signum, frame):
   1251             pass
   1252         old_handler = signal.signal(signal.SIGUSR1, handler)
   1253         self.addCleanup(signal.signal, signal.SIGUSR1, old_handler)
   1254 
   1255         args = [sys.executable, "-c",
   1256                 'import os, signal;'
   1257                 'os.kill(os.getppid(), signal.SIGUSR1)']
   1258         for stream in ('stdout', 'stderr'):
   1259             kw = {stream: subprocess.PIPE}
   1260             with subprocess.Popen(args, **kw) as process:
   1261                 # communicate() will be interrupted by SIGUSR1
   1262                 process.communicate()
   1263 
   1264 
   1265     # This test is Linux-ish specific for simplicity to at least have
   1266     # some coverage.  It is not a platform specific bug.
   1267     @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),
   1268                          "Linux specific")
   1269     def test_failed_child_execute_fd_leak(self):
   1270         """Test for the fork() failure fd leak reported in issue16327."""
   1271         fd_directory = '/proc/%d/fd' % os.getpid()
   1272         fds_before_popen = os.listdir(fd_directory)
   1273         with self.assertRaises(PopenTestException):
   1274             PopenExecuteChildRaises(
   1275                     [sys.executable, '-c', 'pass'], stdin=subprocess.PIPE,
   1276                     stdout=subprocess.PIPE, stderr=subprocess.PIPE)
   1277 
   1278         # NOTE: This test doesn't verify that the real _execute_child
   1279         # does not close the file descriptors itself on the way out
   1280         # during an exception.  Code inspection has confirmed that.
   1281 
   1282         fds_after_exception = os.listdir(fd_directory)
   1283         self.assertEqual(fds_before_popen, fds_after_exception)
   1284 
   1285 
   1286 class RunFuncTestCase(BaseTestCase):
   1287     def run_python(self, code, **kwargs):
   1288         """Run Python code in a subprocess using subprocess.run"""
   1289         argv = [sys.executable, "-c", code]
   1290         return subprocess.run(argv, **kwargs)
   1291 
   1292     def test_returncode(self):
   1293         # call() function with sequence argument
   1294         cp = self.run_python("import sys; sys.exit(47)")
   1295         self.assertEqual(cp.returncode, 47)
   1296         with self.assertRaises(subprocess.CalledProcessError):
   1297             cp.check_returncode()
   1298 
   1299     def test_check(self):
   1300         with self.assertRaises(subprocess.CalledProcessError) as c:
   1301             self.run_python("import sys; sys.exit(47)", check=True)
   1302         self.assertEqual(c.exception.returncode, 47)
   1303 
   1304     def test_check_zero(self):
   1305         # check_returncode shouldn't raise when returncode is zero
   1306         cp = self.run_python("import sys; sys.exit(0)", check=True)
   1307         self.assertEqual(cp.returncode, 0)
   1308 
   1309     def test_timeout(self):
   1310         # run() function with timeout argument; we want to test that the child
   1311         # process gets killed when the timeout expires.  If the child isn't
   1312         # killed, this call will deadlock since subprocess.run waits for the
   1313         # child.
   1314         with self.assertRaises(subprocess.TimeoutExpired):
   1315             self.run_python("while True: pass", timeout=0.0001)
   1316 
   1317     def test_capture_stdout(self):
   1318         # capture stdout with zero return code
   1319         cp = self.run_python("print('BDFL')", stdout=subprocess.PIPE)
   1320         self.assertIn(b'BDFL', cp.stdout)
   1321 
   1322     def test_capture_stderr(self):
   1323         cp = self.run_python("import sys; sys.stderr.write('BDFL')",
   1324                              stderr=subprocess.PIPE)
   1325         self.assertIn(b'BDFL', cp.stderr)
   1326 
   1327     def test_check_output_stdin_arg(self):
   1328         # run() can be called with stdin set to a file
   1329         tf = tempfile.TemporaryFile()
   1330         self.addCleanup(tf.close)
   1331         tf.write(b'pear')
   1332         tf.seek(0)
   1333         cp = self.run_python(
   1334                  "import sys; sys.stdout.write(sys.stdin.read().upper())",
   1335                 stdin=tf, stdout=subprocess.PIPE)
   1336         self.assertIn(b'PEAR', cp.stdout)
   1337 
   1338     def test_check_output_input_arg(self):
   1339         # check_output() can be called with input set to a string
   1340         cp = self.run_python(
   1341                 "import sys; sys.stdout.write(sys.stdin.read().upper())",
   1342                 input=b'pear', stdout=subprocess.PIPE)
   1343         self.assertIn(b'PEAR', cp.stdout)
   1344 
   1345     def test_check_output_stdin_with_input_arg(self):
   1346         # run() refuses to accept 'stdin' with 'input'
   1347         tf = tempfile.TemporaryFile()
   1348         self.addCleanup(tf.close)
   1349         tf.write(b'pear')
   1350         tf.seek(0)
   1351         with self.assertRaises(ValueError,
   1352               msg="Expected ValueError when stdin and input args supplied.") as c:
   1353             output = self.run_python("print('will not be run')",
   1354                                      stdin=tf, input=b'hare')
   1355         self.assertIn('stdin', c.exception.args[0])
   1356         self.assertIn('input', c.exception.args[0])
   1357 
   1358     def test_check_output_timeout(self):
   1359         with self.assertRaises(subprocess.TimeoutExpired) as c:
   1360             cp = self.run_python((
   1361                      "import sys, time\n"
   1362                      "sys.stdout.write('BDFL')\n"
   1363                      "sys.stdout.flush()\n"
   1364                      "time.sleep(3600)"),
   1365                     # Some heavily loaded buildbots (sparc Debian 3.x) require
   1366                     # this much time to start and print.
   1367                     timeout=3, stdout=subprocess.PIPE)
   1368         self.assertEqual(c.exception.output, b'BDFL')
   1369         # output is aliased to stdout
   1370         self.assertEqual(c.exception.stdout, b'BDFL')
   1371 
   1372     def test_run_kwargs(self):
   1373         newenv = os.environ.copy()
   1374         newenv["FRUIT"] = "banana"
   1375         cp = self.run_python(('import sys, os;'
   1376                       'sys.exit(33 if os.getenv("FRUIT")=="banana" else 31)'),
   1377                              env=newenv)
   1378         self.assertEqual(cp.returncode, 33)
   1379 
   1380 
   1381 @unittest.skipIf(mswindows, "POSIX specific tests")
   1382 class POSIXProcessTestCase(BaseTestCase):
   1383 
   1384     def setUp(self):
   1385         super().setUp()
   1386         self._nonexistent_dir = "/_this/pa.th/does/not/exist"
   1387 
   1388     def _get_chdir_exception(self):
   1389         try:
   1390             os.chdir(self._nonexistent_dir)
   1391         except OSError as e:
   1392             # This avoids hard coding the errno value or the OS perror()
   1393             # string and instead capture the exception that we want to see
   1394             # below for comparison.
   1395             desired_exception = e
   1396             desired_exception.strerror += ': ' + repr(self._nonexistent_dir)
   1397         else:
   1398             self.fail("chdir to nonexistent directory %s succeeded." %
   1399                       self._nonexistent_dir)
   1400         return desired_exception
   1401 
   1402     def test_exception_cwd(self):
   1403         """Test error in the child raised in the parent for a bad cwd."""
   1404         desired_exception = self._get_chdir_exception()
   1405         try:
   1406             p = subprocess.Popen([sys.executable, "-c", ""],
   1407                                  cwd=self._nonexistent_dir)
   1408         except OSError as e:
   1409             # Test that the child process chdir failure actually makes
   1410             # it up to the parent process as the correct exception.
   1411             self.assertEqual(desired_exception.errno, e.errno)
   1412             self.assertEqual(desired_exception.strerror, e.strerror)
   1413         else:
   1414             self.fail("Expected OSError: %s" % desired_exception)
   1415 
   1416     def test_exception_bad_executable(self):
   1417         """Test error in the child raised in the parent for a bad executable."""
   1418         desired_exception = self._get_chdir_exception()
   1419         try:
   1420             p = subprocess.Popen([sys.executable, "-c", ""],
   1421                                  executable=self._nonexistent_dir)
   1422         except OSError as e:
   1423             # Test that the child process exec failure actually makes
   1424             # it up to the parent process as the correct exception.
   1425             self.assertEqual(desired_exception.errno, e.errno)
   1426             self.assertEqual(desired_exception.strerror, e.strerror)
   1427         else:
   1428             self.fail("Expected OSError: %s" % desired_exception)
   1429 
   1430     def test_exception_bad_args_0(self):
   1431         """Test error in the child raised in the parent for a bad args[0]."""
   1432         desired_exception = self._get_chdir_exception()
   1433         try:
   1434             p = subprocess.Popen([self._nonexistent_dir, "-c", ""])
   1435         except OSError as e:
   1436             # Test that the child process exec failure actually makes
   1437             # it up to the parent process as the correct exception.
   1438             self.assertEqual(desired_exception.errno, e.errno)
   1439             self.assertEqual(desired_exception.strerror, e.strerror)
   1440         else:
   1441             self.fail("Expected OSError: %s" % desired_exception)
   1442 
   1443     def test_restore_signals(self):
   1444         # Code coverage for both values of restore_signals to make sure it
   1445         # at least does not blow up.
   1446         # A test for behavior would be complex.  Contributions welcome.
   1447         subprocess.call([sys.executable, "-c", ""], restore_signals=True)
   1448         subprocess.call([sys.executable, "-c", ""], restore_signals=False)
   1449 
   1450     def test_start_new_session(self):
   1451         # For code coverage of calling setsid().  We don't care if we get an
   1452         # EPERM error from it depending on the test execution environment, that
   1453         # still indicates that it was called.
   1454         try:
   1455             output = subprocess.check_output(
   1456                     [sys.executable, "-c",
   1457                      "import os; print(os.getpgid(os.getpid()))"],
   1458                     start_new_session=True)
   1459         except OSError as e:
   1460             if e.errno != errno.EPERM:
   1461                 raise
   1462         else:
   1463             parent_pgid = os.getpgid(os.getpid())
   1464             child_pgid = int(output)
   1465             self.assertNotEqual(parent_pgid, child_pgid)
   1466 
   1467     def test_run_abort(self):
   1468         # returncode handles signal termination
   1469         with support.SuppressCrashReport():
   1470             p = subprocess.Popen([sys.executable, "-c",
   1471                                   'import os; os.abort()'])
   1472             p.wait()
   1473         self.assertEqual(-p.returncode, signal.SIGABRT)
   1474 
   1475     def test_CalledProcessError_str_signal(self):
   1476         err = subprocess.CalledProcessError(-int(signal.SIGABRT), "fake cmd")
   1477         error_string = str(err)
   1478         # We're relying on the repr() of the signal.Signals intenum to provide
   1479         # the word signal, the signal name and the numeric value.
   1480         self.assertIn("signal", error_string.lower())
   1481         # We're not being specific about the signal name as some signals have
   1482         # multiple names and which name is revealed can vary.
   1483         self.assertIn("SIG", error_string)
   1484         self.assertIn(str(signal.SIGABRT), error_string)
   1485 
   1486     def test_CalledProcessError_str_unknown_signal(self):
   1487         err = subprocess.CalledProcessError(-9876543, "fake cmd")
   1488         error_string = str(err)
   1489         self.assertIn("unknown signal 9876543.", error_string)
   1490 
   1491     def test_CalledProcessError_str_non_zero(self):
   1492         err = subprocess.CalledProcessError(2, "fake cmd")
   1493         error_string = str(err)
   1494         self.assertIn("non-zero exit status 2.", error_string)
   1495 
   1496     def test_preexec(self):
   1497         # DISCLAIMER: Setting environment variables is *not* a good use
   1498         # of a preexec_fn.  This is merely a test.
   1499         p = subprocess.Popen([sys.executable, "-c",
   1500                               'import sys,os;'
   1501                               'sys.stdout.write(os.getenv("FRUIT"))'],
   1502                              stdout=subprocess.PIPE,
   1503                              preexec_fn=lambda: os.putenv("FRUIT", "apple"))
   1504         with p:
   1505             self.assertEqual(p.stdout.read(), b"apple")
   1506 
   1507     def test_preexec_exception(self):
   1508         def raise_it():
   1509             raise ValueError("What if two swallows carried a coconut?")
   1510         try:
   1511             p = subprocess.Popen([sys.executable, "-c", ""],
   1512                                  preexec_fn=raise_it)
   1513         except subprocess.SubprocessError as e:
   1514             self.assertTrue(
   1515                     subprocess._posixsubprocess,
   1516                     "Expected a ValueError from the preexec_fn")
   1517         except ValueError as e:
   1518             self.assertIn("coconut", e.args[0])
   1519         else:
   1520             self.fail("Exception raised by preexec_fn did not make it "
   1521                       "to the parent process.")
   1522 
   1523     class _TestExecuteChildPopen(subprocess.Popen):
   1524         """Used to test behavior at the end of _execute_child."""
   1525         def __init__(self, testcase, *args, **kwargs):
   1526             self._testcase = testcase
   1527             subprocess.Popen.__init__(self, *args, **kwargs)
   1528 
   1529         def _execute_child(self, *args, **kwargs):
   1530             try:
   1531                 subprocess.Popen._execute_child(self, *args, **kwargs)
   1532             finally:
   1533                 # Open a bunch of file descriptors and verify that
   1534                 # none of them are the same as the ones the Popen
   1535                 # instance is using for stdin/stdout/stderr.
   1536                 devzero_fds = [os.open("/dev/zero", os.O_RDONLY)
   1537                                for _ in range(8)]
   1538                 try:
   1539                     for fd in devzero_fds:
   1540                         self._testcase.assertNotIn(
   1541                                 fd, (self.stdin.fileno(), self.stdout.fileno(),
   1542                                      self.stderr.fileno()),
   1543                                 msg="At least one fd was closed early.")
   1544                 finally:
   1545                     for fd in devzero_fds:
   1546                         os.close(fd)
   1547 
   1548     @unittest.skipIf(not os.path.exists("/dev/zero"), "/dev/zero required.")
   1549     def test_preexec_errpipe_does_not_double_close_pipes(self):
   1550         """Issue16140: Don't double close pipes on preexec error."""
   1551 
   1552         def raise_it():
   1553             raise subprocess.SubprocessError(
   1554                     "force the _execute_child() errpipe_data path.")
   1555 
   1556         with self.assertRaises(subprocess.SubprocessError):
   1557             self._TestExecuteChildPopen(
   1558                         self, [sys.executable, "-c", "pass"],
   1559                         stdin=subprocess.PIPE, stdout=subprocess.PIPE,
   1560                         stderr=subprocess.PIPE, preexec_fn=raise_it)
   1561 
   1562     def test_preexec_gc_module_failure(self):
   1563         # This tests the code that disables garbage collection if the child
   1564         # process will execute any Python.
   1565         def raise_runtime_error():
   1566             raise RuntimeError("this shouldn't escape")
   1567         enabled = gc.isenabled()
   1568         orig_gc_disable = gc.disable
   1569         orig_gc_isenabled = gc.isenabled
   1570         try:
   1571             gc.disable()
   1572             self.assertFalse(gc.isenabled())
   1573             subprocess.call([sys.executable, '-c', ''],
   1574                             preexec_fn=lambda: None)
   1575             self.assertFalse(gc.isenabled(),
   1576                              "Popen enabled gc when it shouldn't.")
   1577 
   1578             gc.enable()
   1579             self.assertTrue(gc.isenabled())
   1580             subprocess.call([sys.executable, '-c', ''],
   1581                             preexec_fn=lambda: None)
   1582             self.assertTrue(gc.isenabled(), "Popen left gc disabled.")
   1583 
   1584             gc.disable = raise_runtime_error
   1585             self.assertRaises(RuntimeError, subprocess.Popen,
   1586                               [sys.executable, '-c', ''],
   1587                               preexec_fn=lambda: None)
   1588 
   1589             del gc.isenabled  # force an AttributeError
   1590             self.assertRaises(AttributeError, subprocess.Popen,
   1591                               [sys.executable, '-c', ''],
   1592                               preexec_fn=lambda: None)
   1593         finally:
   1594             gc.disable = orig_gc_disable
   1595             gc.isenabled = orig_gc_isenabled
   1596             if not enabled:
   1597                 gc.disable()
   1598 
   1599     @unittest.skipIf(
   1600         sys.platform == 'darwin', 'setrlimit() seems to fail on OS X')
   1601     def test_preexec_fork_failure(self):
   1602         # The internal code did not preserve the previous exception when
   1603         # re-enabling garbage collection
   1604         try:
   1605             from resource import getrlimit, setrlimit, RLIMIT_NPROC
   1606         except ImportError as err:
   1607             self.skipTest(err)  # RLIMIT_NPROC is specific to Linux and BSD
   1608         limits = getrlimit(RLIMIT_NPROC)
   1609         [_, hard] = limits
   1610         setrlimit(RLIMIT_NPROC, (0, hard))
   1611         self.addCleanup(setrlimit, RLIMIT_NPROC, limits)
   1612         try:
   1613             subprocess.call([sys.executable, '-c', ''],
   1614                             preexec_fn=lambda: None)
   1615         except BlockingIOError:
   1616             # Forking should raise EAGAIN, translated to BlockingIOError
   1617             pass
   1618         else:
   1619             self.skipTest('RLIMIT_NPROC had no effect; probably superuser')
   1620 
   1621     def test_args_string(self):
   1622         # args is a string
   1623         fd, fname = tempfile.mkstemp()
   1624         # reopen in text mode
   1625         with open(fd, "w", errors="surrogateescape") as fobj:
   1626             fobj.write("#!%s\n" % support.unix_shell)
   1627             fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
   1628                        sys.executable)
   1629         os.chmod(fname, 0o700)
   1630         p = subprocess.Popen(fname)
   1631         p.wait()
   1632         os.remove(fname)
   1633         self.assertEqual(p.returncode, 47)
   1634 
   1635     def test_invalid_args(self):
   1636         # invalid arguments should raise ValueError
   1637         self.assertRaises(ValueError, subprocess.call,
   1638                           [sys.executable, "-c",
   1639                            "import sys; sys.exit(47)"],
   1640                           startupinfo=47)
   1641         self.assertRaises(ValueError, subprocess.call,
   1642                           [sys.executable, "-c",
   1643                            "import sys; sys.exit(47)"],
   1644                           creationflags=47)
   1645 
   1646     def test_shell_sequence(self):
   1647         # Run command through the shell (sequence)
   1648         newenv = os.environ.copy()
   1649         newenv["FRUIT"] = "apple"
   1650         p = subprocess.Popen(["echo $FRUIT"], shell=1,
   1651                              stdout=subprocess.PIPE,
   1652                              env=newenv)
   1653         with p:
   1654             self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
   1655 
   1656     def test_shell_string(self):
   1657         # Run command through the shell (string)
   1658         newenv = os.environ.copy()
   1659         newenv["FRUIT"] = "apple"
   1660         p = subprocess.Popen("echo $FRUIT", shell=1,
   1661                              stdout=subprocess.PIPE,
   1662                              env=newenv)
   1663         with p:
   1664             self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
   1665 
   1666     def test_call_string(self):
   1667         # call() function with string argument on UNIX
   1668         fd, fname = tempfile.mkstemp()
   1669         # reopen in text mode
   1670         with open(fd, "w", errors="surrogateescape") as fobj:
   1671             fobj.write("#!%s\n" % support.unix_shell)
   1672             fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
   1673                        sys.executable)
   1674         os.chmod(fname, 0o700)
   1675         rc = subprocess.call(fname)
   1676         os.remove(fname)
   1677         self.assertEqual(rc, 47)
   1678 
   1679     def test_specific_shell(self):
   1680         # Issue #9265: Incorrect name passed as arg[0].
   1681         shells = []
   1682         for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']:
   1683             for name in ['bash', 'ksh']:
   1684                 sh = os.path.join(prefix, name)
   1685                 if os.path.isfile(sh):
   1686                     shells.append(sh)
   1687         if not shells: # Will probably work for any shell but csh.
   1688             self.skipTest("bash or ksh required for this test")
   1689         sh = '/bin/sh'
   1690         if os.path.isfile(sh) and not os.path.islink(sh):
   1691             # Test will fail if /bin/sh is a symlink to csh.
   1692             shells.append(sh)
   1693         for sh in shells:
   1694             p = subprocess.Popen("echo $0", executable=sh, shell=True,
   1695                                  stdout=subprocess.PIPE)
   1696             with p:
   1697                 self.assertEqual(p.stdout.read().strip(), bytes(sh, 'ascii'))
   1698 
   1699     def _kill_process(self, method, *args):
   1700         # Do not inherit file handles from the parent.
   1701         # It should fix failures on some platforms.
   1702         # Also set the SIGINT handler to the default to make sure it's not
   1703         # being ignored (some tests rely on that.)
   1704         old_handler = signal.signal(signal.SIGINT, signal.default_int_handler)
   1705         try:
   1706             p = subprocess.Popen([sys.executable, "-c", """if 1:
   1707                                  import sys, time
   1708                                  sys.stdout.write('x\\n')
   1709                                  sys.stdout.flush()
   1710                                  time.sleep(30)
   1711                                  """],
   1712                                  close_fds=True,
   1713                                  stdin=subprocess.PIPE,
   1714                                  stdout=subprocess.PIPE,
   1715                                  stderr=subprocess.PIPE)
   1716         finally:
   1717             signal.signal(signal.SIGINT, old_handler)
   1718         # Wait for the interpreter to be completely initialized before
   1719         # sending any signal.
   1720         p.stdout.read(1)
   1721         getattr(p, method)(*args)
   1722         return p
   1723 
   1724     @unittest.skipIf(sys.platform.startswith(('netbsd', 'openbsd')),
   1725                      "Due to known OS bug (issue #16762)")
   1726     def _kill_dead_process(self, method, *args):
   1727         # Do not inherit file handles from the parent.
   1728         # It should fix failures on some platforms.
   1729         p = subprocess.Popen([sys.executable, "-c", """if 1:
   1730                              import sys, time
   1731                              sys.stdout.write('x\\n')
   1732                              sys.stdout.flush()
   1733                              """],
   1734                              close_fds=True,
   1735                              stdin=subprocess.PIPE,
   1736                              stdout=subprocess.PIPE,
   1737                              stderr=subprocess.PIPE)
   1738         # Wait for the interpreter to be completely initialized before
   1739         # sending any signal.
   1740         p.stdout.read(1)
   1741         # The process should end after this
   1742         time.sleep(1)
   1743         # This shouldn't raise even though the child is now dead
   1744         getattr(p, method)(*args)
   1745         p.communicate()
   1746 
   1747     def test_send_signal(self):
   1748         p = self._kill_process('send_signal', signal.SIGINT)
   1749         _, stderr = p.communicate()
   1750         self.assertIn(b'KeyboardInterrupt', stderr)
   1751         self.assertNotEqual(p.wait(), 0)
   1752 
   1753     def test_kill(self):
   1754         p = self._kill_process('kill')
   1755         _, stderr = p.communicate()
   1756         self.assertStderrEqual(stderr, b'')
   1757         self.assertEqual(p.wait(), -signal.SIGKILL)
   1758 
   1759     def test_terminate(self):
   1760         p = self._kill_process('terminate')
   1761         _, stderr = p.communicate()
   1762         self.assertStderrEqual(stderr, b'')
   1763         self.assertEqual(p.wait(), -signal.SIGTERM)
   1764 
   1765     def test_send_signal_dead(self):
   1766         # Sending a signal to a dead process
   1767         self._kill_dead_process('send_signal', signal.SIGINT)
   1768 
   1769     def test_kill_dead(self):
   1770         # Killing a dead process
   1771         self._kill_dead_process('kill')
   1772 
   1773     def test_terminate_dead(self):
   1774         # Terminating a dead process
   1775         self._kill_dead_process('terminate')
   1776 
   1777     def _save_fds(self, save_fds):
   1778         fds = []
   1779         for fd in save_fds:
   1780             inheritable = os.get_inheritable(fd)
   1781             saved = os.dup(fd)
   1782             fds.append((fd, saved, inheritable))
   1783         return fds
   1784 
   1785     def _restore_fds(self, fds):
   1786         for fd, saved, inheritable in fds:
   1787             os.dup2(saved, fd, inheritable=inheritable)
   1788             os.close(saved)
   1789 
   1790     def check_close_std_fds(self, fds):
   1791         # Issue #9905: test that subprocess pipes still work properly with
   1792         # some standard fds closed
   1793         stdin = 0
   1794         saved_fds = self._save_fds(fds)
   1795         for fd, saved, inheritable in saved_fds:
   1796             if fd == 0:
   1797                 stdin = saved
   1798                 break
   1799         try:
   1800             for fd in fds:
   1801                 os.close(fd)
   1802             out, err = subprocess.Popen([sys.executable, "-c",
   1803                               'import sys;'
   1804                               'sys.stdout.write("apple");'
   1805                               'sys.stdout.flush();'
   1806                               'sys.stderr.write("orange")'],
   1807                        stdin=stdin,
   1808                        stdout=subprocess.PIPE,
   1809                        stderr=subprocess.PIPE).communicate()
   1810             err = support.strip_python_stderr(err)
   1811             self.assertEqual((out, err), (b'apple', b'orange'))
   1812         finally:
   1813             self._restore_fds(saved_fds)
   1814 
   1815     def test_close_fd_0(self):
   1816         self.check_close_std_fds([0])
   1817 
   1818     def test_close_fd_1(self):
   1819         self.check_close_std_fds([1])
   1820 
   1821     def test_close_fd_2(self):
   1822         self.check_close_std_fds([2])
   1823 
   1824     def test_close_fds_0_1(self):
   1825         self.check_close_std_fds([0, 1])
   1826 
   1827     def test_close_fds_0_2(self):
   1828         self.check_close_std_fds([0, 2])
   1829 
   1830     def test_close_fds_1_2(self):
   1831         self.check_close_std_fds([1, 2])
   1832 
   1833     def test_close_fds_0_1_2(self):
   1834         # Issue #10806: test that subprocess pipes still work properly with
   1835         # all standard fds closed.
   1836         self.check_close_std_fds([0, 1, 2])
   1837 
   1838     def test_small_errpipe_write_fd(self):
   1839         """Issue #15798: Popen should work when stdio fds are available."""
   1840         new_stdin = os.dup(0)
   1841         new_stdout = os.dup(1)
   1842         try:
   1843             os.close(0)
   1844             os.close(1)
   1845 
   1846             # Side test: if errpipe_write fails to have its CLOEXEC
   1847             # flag set this should cause the parent to think the exec
   1848             # failed.  Extremely unlikely: everyone supports CLOEXEC.
   1849             subprocess.Popen([
   1850                     sys.executable, "-c",
   1851                     "print('AssertionError:0:CLOEXEC failure.')"]).wait()
   1852         finally:
   1853             # Restore original stdin and stdout
   1854             os.dup2(new_stdin, 0)
   1855             os.dup2(new_stdout, 1)
   1856             os.close(new_stdin)
   1857             os.close(new_stdout)
   1858 
   1859     def test_remapping_std_fds(self):
   1860         # open up some temporary files
   1861         temps = [tempfile.mkstemp() for i in range(3)]
   1862         try:
   1863             temp_fds = [fd for fd, fname in temps]
   1864 
   1865             # unlink the files -- we won't need to reopen them
   1866             for fd, fname in temps:
   1867                 os.unlink(fname)
   1868 
   1869             # write some data to what will become stdin, and rewind
   1870             os.write(temp_fds[1], b"STDIN")
   1871             os.lseek(temp_fds[1], 0, 0)
   1872 
   1873             # move the standard file descriptors out of the way
   1874             saved_fds = self._save_fds(range(3))
   1875             try:
   1876                 # duplicate the file objects over the standard fd's
   1877                 for fd, temp_fd in enumerate(temp_fds):
   1878                     os.dup2(temp_fd, fd)
   1879 
   1880                 # now use those files in the "wrong" order, so that subprocess
   1881                 # has to rearrange them in the child
   1882                 p = subprocess.Popen([sys.executable, "-c",
   1883                     'import sys; got = sys.stdin.read();'
   1884                     'sys.stdout.write("got %s"%got); sys.stderr.write("err")'],
   1885                     stdin=temp_fds[1],
   1886                     stdout=temp_fds[2],
   1887                     stderr=temp_fds[0])
   1888                 p.wait()
   1889             finally:
   1890                 self._restore_fds(saved_fds)
   1891 
   1892             for fd in temp_fds:
   1893                 os.lseek(fd, 0, 0)
   1894 
   1895             out = os.read(temp_fds[2], 1024)
   1896             err = support.strip_python_stderr(os.read(temp_fds[0], 1024))
   1897             self.assertEqual(out, b"got STDIN")
   1898             self.assertEqual(err, b"err")
   1899 
   1900         finally:
   1901             for fd in temp_fds:
   1902                 os.close(fd)
   1903 
   1904     def check_swap_fds(self, stdin_no, stdout_no, stderr_no):
   1905         # open up some temporary files
   1906         temps = [tempfile.mkstemp() for i in range(3)]
   1907         temp_fds = [fd for fd, fname in temps]
   1908         try:
   1909             # unlink the files -- we won't need to reopen them
   1910             for fd, fname in temps:
   1911                 os.unlink(fname)
   1912 
   1913             # save a copy of the standard file descriptors
   1914             saved_fds = self._save_fds(range(3))
   1915             try:
   1916                 # duplicate the temp files over the standard fd's 0, 1, 2
   1917                 for fd, temp_fd in enumerate(temp_fds):
   1918                     os.dup2(temp_fd, fd)
   1919 
   1920                 # write some data to what will become stdin, and rewind
   1921                 os.write(stdin_no, b"STDIN")
   1922                 os.lseek(stdin_no, 0, 0)
   1923 
   1924                 # now use those files in the given order, so that subprocess
   1925                 # has to rearrange them in the child
   1926                 p = subprocess.Popen([sys.executable, "-c",
   1927                     'import sys; got = sys.stdin.read();'
   1928                     'sys.stdout.write("got %s"%got); sys.stderr.write("err")'],
   1929                     stdin=stdin_no,
   1930                     stdout=stdout_no,
   1931                     stderr=stderr_no)
   1932                 p.wait()
   1933 
   1934                 for fd in temp_fds:
   1935                     os.lseek(fd, 0, 0)
   1936 
   1937                 out = os.read(stdout_no, 1024)
   1938                 err = support.strip_python_stderr(os.read(stderr_no, 1024))
   1939             finally:
   1940                 self._restore_fds(saved_fds)
   1941 
   1942             self.assertEqual(out, b"got STDIN")
   1943             self.assertEqual(err, b"err")
   1944 
   1945         finally:
   1946             for fd in temp_fds:
   1947                 os.close(fd)
   1948 
   1949     # When duping fds, if there arises a situation where one of the fds is
   1950     # either 0, 1 or 2, it is possible that it is overwritten (#12607).
   1951     # This tests all combinations of this.
   1952     def test_swap_fds(self):
   1953         self.check_swap_fds(0, 1, 2)
   1954         self.check_swap_fds(0, 2, 1)
   1955         self.check_swap_fds(1, 0, 2)
   1956         self.check_swap_fds(1, 2, 0)
   1957         self.check_swap_fds(2, 0, 1)
   1958         self.check_swap_fds(2, 1, 0)
   1959 
   1960     def test_surrogates_error_message(self):
   1961         def prepare():
   1962             raise ValueError("surrogate:\uDCff")
   1963 
   1964         try:
   1965             subprocess.call(
   1966                 [sys.executable, "-c", "pass"],
   1967                 preexec_fn=prepare)
   1968         except ValueError as err:
   1969             # Pure Python implementations keeps the message
   1970             self.assertIsNone(subprocess._posixsubprocess)
   1971             self.assertEqual(str(err), "surrogate:\uDCff")
   1972         except subprocess.SubprocessError as err:
   1973             # _posixsubprocess uses a default message
   1974             self.assertIsNotNone(subprocess._posixsubprocess)
   1975             self.assertEqual(str(err), "Exception occurred in preexec_fn.")
   1976         else:
   1977             self.fail("Expected ValueError or subprocess.SubprocessError")
   1978 
   1979     def test_undecodable_env(self):
   1980         for key, value in (('test', 'abc\uDCFF'), ('test\uDCFF', '42')):
   1981             encoded_value = value.encode("ascii", "surrogateescape")
   1982 
   1983             # test str with surrogates
   1984             script = "import os; print(ascii(os.getenv(%s)))" % repr(key)
   1985             env = os.environ.copy()
   1986             env[key] = value
   1987             # Use C locale to get ASCII for the locale encoding to force
   1988             # surrogate-escaping of \xFF in the child process; otherwise it can
   1989             # be decoded as-is if the default locale is latin-1.
   1990             env['LC_ALL'] = 'C'
   1991             if sys.platform.startswith("aix"):
   1992                 # On AIX, the C locale uses the Latin1 encoding
   1993                 decoded_value = encoded_value.decode("latin1", "surrogateescape")
   1994             else:
   1995                 # On other UNIXes, the C locale uses the ASCII encoding
   1996                 decoded_value = value
   1997             stdout = subprocess.check_output(
   1998                 [sys.executable, "-c", script],
   1999                 env=env)
   2000             stdout = stdout.rstrip(b'\n\r')
   2001             self.assertEqual(stdout.decode('ascii'), ascii(decoded_value))
   2002 
   2003             # test bytes
   2004             key = key.encode("ascii", "surrogateescape")
   2005             script = "import os; print(ascii(os.getenvb(%s)))" % repr(key)
   2006             env = os.environ.copy()
   2007             env[key] = encoded_value
   2008             stdout = subprocess.check_output(
   2009                 [sys.executable, "-c", script],
   2010                 env=env)
   2011             stdout = stdout.rstrip(b'\n\r')
   2012             self.assertEqual(stdout.decode('ascii'), ascii(encoded_value))
   2013 
   2014     def test_bytes_program(self):
   2015         abs_program = os.fsencode(sys.executable)
   2016         path, program = os.path.split(sys.executable)
   2017         program = os.fsencode(program)
   2018 
   2019         # absolute bytes path
   2020         exitcode = subprocess.call([abs_program, "-c", "pass"])
   2021         self.assertEqual(exitcode, 0)
   2022 
   2023         # absolute bytes path as a string
   2024         cmd = b"'" + abs_program + b"' -c pass"
   2025         exitcode = subprocess.call(cmd, shell=True)
   2026         self.assertEqual(exitcode, 0)
   2027 
   2028         # bytes program, unicode PATH
   2029         env = os.environ.copy()
   2030         env["PATH"] = path
   2031         exitcode = subprocess.call([program, "-c", "pass"], env=env)
   2032         self.assertEqual(exitcode, 0)
   2033 
   2034         # bytes program, bytes PATH
   2035         envb = os.environb.copy()
   2036         envb[b"PATH"] = os.fsencode(path)
   2037         exitcode = subprocess.call([program, "-c", "pass"], env=envb)
   2038         self.assertEqual(exitcode, 0)
   2039 
   2040     def test_pipe_cloexec(self):
   2041         sleeper = support.findfile("input_reader.py", subdir="subprocessdata")
   2042         fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
   2043 
   2044         p1 = subprocess.Popen([sys.executable, sleeper],
   2045                               stdin=subprocess.PIPE, stdout=subprocess.PIPE,
   2046                               stderr=subprocess.PIPE, close_fds=False)
   2047 
   2048         self.addCleanup(p1.communicate, b'')
   2049 
   2050         p2 = subprocess.Popen([sys.executable, fd_status],
   2051                               stdout=subprocess.PIPE, close_fds=False)
   2052 
   2053         output, error = p2.communicate()
   2054         result_fds = set(map(int, output.split(b',')))
   2055         unwanted_fds = set([p1.stdin.fileno(), p1.stdout.fileno(),
   2056                             p1.stderr.fileno()])
   2057 
   2058         self.assertFalse(result_fds & unwanted_fds,
   2059                          "Expected no fds from %r to be open in child, "
   2060                          "found %r" %
   2061                               (unwanted_fds, result_fds & unwanted_fds))
   2062 
   2063     def test_pipe_cloexec_real_tools(self):
   2064         qcat = support.findfile("qcat.py", subdir="subprocessdata")
   2065         qgrep = support.findfile("qgrep.py", subdir="subprocessdata")
   2066 
   2067         subdata = b'zxcvbn'
   2068         data = subdata * 4 + b'\n'
   2069 
   2070         p1 = subprocess.Popen([sys.executable, qcat],
   2071                               stdin=subprocess.PIPE, stdout=subprocess.PIPE,
   2072                               close_fds=False)
   2073 
   2074         p2 = subprocess.Popen([sys.executable, qgrep, subdata],
   2075                               stdin=p1.stdout, stdout=subprocess.PIPE,
   2076                               close_fds=False)
   2077 
   2078         self.addCleanup(p1.wait)
   2079         self.addCleanup(p2.wait)
   2080         def kill_p1():
   2081             try:
   2082                 p1.terminate()
   2083             except ProcessLookupError:
   2084                 pass
   2085         def kill_p2():
   2086             try:
   2087                 p2.terminate()
   2088             except ProcessLookupError:
   2089                 pass
   2090         self.addCleanup(kill_p1)
   2091         self.addCleanup(kill_p2)
   2092 
   2093         p1.stdin.write(data)
   2094         p1.stdin.close()
   2095 
   2096         readfiles, ignored1, ignored2 = select.select([p2.stdout], [], [], 10)
   2097 
   2098         self.assertTrue(readfiles, "The child hung")
   2099         self.assertEqual(p2.stdout.read(), data)
   2100 
   2101         p1.stdout.close()
   2102         p2.stdout.close()
   2103 
   2104     def test_close_fds(self):
   2105         fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
   2106 
   2107         fds = os.pipe()
   2108         self.addCleanup(os.close, fds[0])
   2109         self.addCleanup(os.close, fds[1])
   2110 
   2111         open_fds = set(fds)
   2112         # add a bunch more fds
   2113         for _ in range(9):
   2114             fd = os.open(os.devnull, os.O_RDONLY)
   2115             self.addCleanup(os.close, fd)
   2116             open_fds.add(fd)
   2117 
   2118         for fd in open_fds:
   2119             os.set_inheritable(fd, True)
   2120 
   2121         p = subprocess.Popen([sys.executable, fd_status],
   2122                              stdout=subprocess.PIPE, close_fds=False)
   2123         output, ignored = p.communicate()
   2124         remaining_fds = set(map(int, output.split(b',')))
   2125 
   2126         self.assertEqual(remaining_fds & open_fds, open_fds,
   2127                          "Some fds were closed")
   2128 
   2129         p = subprocess.Popen([sys.executable, fd_status],
   2130                              stdout=subprocess.PIPE, close_fds=True)
   2131         output, ignored = p.communicate()
   2132         remaining_fds = set(map(int, output.split(b',')))
   2133 
   2134         self.assertFalse(remaining_fds & open_fds,
   2135                          "Some fds were left open")
   2136         self.assertIn(1, remaining_fds, "Subprocess failed")
   2137 
   2138         # Keep some of the fd's we opened open in the subprocess.
   2139         # This tests _posixsubprocess.c's proper handling of fds_to_keep.
   2140         fds_to_keep = set(open_fds.pop() for _ in range(8))
   2141         p = subprocess.Popen([sys.executable, fd_status],
   2142                              stdout=subprocess.PIPE, close_fds=True,
   2143                              pass_fds=())
   2144         output, ignored = p.communicate()
   2145         remaining_fds = set(map(int, output.split(b',')))
   2146 
   2147         self.assertFalse(remaining_fds & fds_to_keep & open_fds,
   2148                          "Some fds not in pass_fds were left open")
   2149         self.assertIn(1, remaining_fds, "Subprocess failed")
   2150 
   2151 
   2152     @unittest.skipIf(sys.platform.startswith("freebsd") and
   2153                      os.stat("/dev").st_dev == os.stat("/dev/fd").st_dev,
   2154                      "Requires fdescfs mounted on /dev/fd on FreeBSD.")
   2155     def test_close_fds_when_max_fd_is_lowered(self):
   2156         """Confirm that issue21618 is fixed (may fail under valgrind)."""
   2157         fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
   2158 
   2159         # This launches the meat of the test in a child process to
   2160         # avoid messing with the larger unittest processes maximum
   2161         # number of file descriptors.
   2162         #  This process launches:
   2163         #  +--> Process that lowers its RLIMIT_NOFILE aftr setting up
   2164         #    a bunch of high open fds above the new lower rlimit.
   2165         #    Those are reported via stdout before launching a new
   2166         #    process with close_fds=False to run the actual test:
   2167         #    +--> The TEST: This one launches a fd_status.py
   2168         #      subprocess with close_fds=True so we can find out if
   2169         #      any of the fds above the lowered rlimit are still open.
   2170         p = subprocess.Popen([sys.executable, '-c', textwrap.dedent(
   2171         '''
   2172         import os, resource, subprocess, sys, textwrap
   2173         open_fds = set()
   2174         # Add a bunch more fds to pass down.
   2175         for _ in range(40):
   2176             fd = os.open(os.devnull, os.O_RDONLY)
   2177             open_fds.add(fd)
   2178 
   2179         # Leave a two pairs of low ones available for use by the
   2180         # internal child error pipe and the stdout pipe.
   2181         # We also leave 10 more open as some Python buildbots run into
   2182         # "too many open files" errors during the test if we do not.
   2183         for fd in sorted(open_fds)[:14]:
   2184             os.close(fd)
   2185             open_fds.remove(fd)
   2186 
   2187         for fd in open_fds:
   2188             #self.addCleanup(os.close, fd)
   2189             os.set_inheritable(fd, True)
   2190 
   2191         max_fd_open = max(open_fds)
   2192 
   2193         # Communicate the open_fds to the parent unittest.TestCase process.
   2194         print(','.join(map(str, sorted(open_fds))))
   2195         sys.stdout.flush()
   2196 
   2197         rlim_cur, rlim_max = resource.getrlimit(resource.RLIMIT_NOFILE)
   2198         try:
   2199             # 29 is lower than the highest fds we are leaving open.
   2200             resource.setrlimit(resource.RLIMIT_NOFILE, (29, rlim_max))
   2201             # Launch a new Python interpreter with our low fd rlim_cur that
   2202             # inherits open fds above that limit.  It then uses subprocess
   2203             # with close_fds=True to get a report of open fds in the child.
   2204             # An explicit list of fds to check is passed to fd_status.py as
   2205             # letting fd_status rely on its default logic would miss the
   2206             # fds above rlim_cur as it normally only checks up to that limit.
   2207             subprocess.Popen(
   2208                 [sys.executable, '-c',
   2209                  textwrap.dedent("""
   2210                      import subprocess, sys
   2211                      subprocess.Popen([sys.executable, %r] +
   2212                                       [str(x) for x in range({max_fd})],
   2213                                       close_fds=True).wait()
   2214                      """.format(max_fd=max_fd_open+1))],
   2215                 close_fds=False).wait()
   2216         finally:
   2217             resource.setrlimit(resource.RLIMIT_NOFILE, (rlim_cur, rlim_max))
   2218         ''' % fd_status)], stdout=subprocess.PIPE)
   2219 
   2220         output, unused_stderr = p.communicate()
   2221         output_lines = output.splitlines()
   2222         self.assertEqual(len(output_lines), 2,
   2223                          msg="expected exactly two lines of output:\n%r" % output)
   2224         opened_fds = set(map(int, output_lines[0].strip().split(b',')))
   2225         remaining_fds = set(map(int, output_lines[1].strip().split(b',')))
   2226 
   2227         self.assertFalse(remaining_fds & opened_fds,
   2228                          msg="Some fds were left open.")
   2229 
   2230 
   2231     # Mac OS X Tiger (10.4) has a kernel bug: sometimes, the file
   2232     # descriptor of a pipe closed in the parent process is valid in the
   2233     # child process according to fstat(), but the mode of the file
   2234     # descriptor is invalid, and read or write raise an error.
   2235     @support.requires_mac_ver(10, 5)
   2236     def test_pass_fds(self):
   2237         fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
   2238 
   2239         open_fds = set()
   2240 
   2241         for x in range(5):
   2242             fds = os.pipe()
   2243             self.addCleanup(os.close, fds[0])
   2244             self.addCleanup(os.close, fds[1])
   2245             os.set_inheritable(fds[0], True)
   2246             os.set_inheritable(fds[1], True)
   2247             open_fds.update(fds)
   2248 
   2249         for fd in open_fds:
   2250             p = subprocess.Popen([sys.executable, fd_status],
   2251                                  stdout=subprocess.PIPE, close_fds=True,
   2252                                  pass_fds=(fd, ))
   2253             output, ignored = p.communicate()
   2254 
   2255             remaining_fds = set(map(int, output.split(b',')))
   2256             to_be_closed = open_fds - {fd}
   2257 
   2258             self.assertIn(fd, remaining_fds, "fd to be passed not passed")
   2259             self.assertFalse(remaining_fds & to_be_closed,
   2260                              "fd to be closed passed")
   2261 
   2262             # pass_fds overrides close_fds with a warning.
   2263             with self.assertWarns(RuntimeWarning) as context:
   2264                 self.assertFalse(subprocess.call(
   2265                         [sys.executable, "-c", "import sys; sys.exit(0)"],
   2266                         close_fds=False, pass_fds=(fd, )))
   2267             self.assertIn('overriding close_fds', str(context.warning))
   2268 
   2269     def test_pass_fds_inheritable(self):
   2270         script = support.findfile("fd_status.py", subdir="subprocessdata")
   2271 
   2272         inheritable, non_inheritable = os.pipe()
   2273         self.addCleanup(os.close, inheritable)
   2274         self.addCleanup(os.close, non_inheritable)
   2275         os.set_inheritable(inheritable, True)
   2276         os.set_inheritable(non_inheritable, False)
   2277         pass_fds = (inheritable, non_inheritable)
   2278         args = [sys.executable, script]
   2279         args += list(map(str, pass_fds))
   2280 
   2281         p = subprocess.Popen(args,
   2282                              stdout=subprocess.PIPE, close_fds=True,
   2283                              pass_fds=pass_fds)
   2284         output, ignored = p.communicate()
   2285         fds = set(map(int, output.split(b',')))
   2286 
   2287         # the inheritable file descriptor must be inherited, so its inheritable
   2288         # flag must be set in the child process after fork() and before exec()
   2289         self.assertEqual(fds, set(pass_fds), "output=%a" % output)
   2290 
   2291         # inheritable flag must not be changed in the parent process
   2292         self.assertEqual(os.get_inheritable(inheritable), True)
   2293         self.assertEqual(os.get_inheritable(non_inheritable), False)
   2294 
   2295     def test_stdout_stdin_are_single_inout_fd(self):
   2296         with io.open(os.devnull, "r+") as inout:
   2297             p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
   2298                                  stdout=inout, stdin=inout)
   2299             p.wait()
   2300 
   2301     def test_stdout_stderr_are_single_inout_fd(self):
   2302         with io.open(os.devnull, "r+") as inout:
   2303             p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
   2304                                  stdout=inout, stderr=inout)
   2305             p.wait()
   2306 
   2307     def test_stderr_stdin_are_single_inout_fd(self):
   2308         with io.open(os.devnull, "r+") as inout:
   2309             p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
   2310                                  stderr=inout, stdin=inout)
   2311             p.wait()
   2312 
   2313     def test_wait_when_sigchild_ignored(self):
   2314         # NOTE: sigchild_ignore.py may not be an effective test on all OSes.
   2315         sigchild_ignore = support.findfile("sigchild_ignore.py",
   2316                                            subdir="subprocessdata")
   2317         p = subprocess.Popen([sys.executable, sigchild_ignore],
   2318                              stdout=subprocess.PIPE, stderr=subprocess.PIPE)
   2319         stdout, stderr = p.communicate()
   2320         self.assertEqual(0, p.returncode, "sigchild_ignore.py exited"
   2321                          " non-zero with this error:\n%s" %
   2322                          stderr.decode('utf-8'))
   2323 
   2324     def test_select_unbuffered(self):
   2325         # Issue #11459: bufsize=0 should really set the pipes as
   2326         # unbuffered (and therefore let select() work properly).
   2327         select = support.import_module("select")
   2328         p = subprocess.Popen([sys.executable, "-c",
   2329                               'import sys;'
   2330                               'sys.stdout.write("apple")'],
   2331                              stdout=subprocess.PIPE,
   2332                              bufsize=0)
   2333         f = p.stdout
   2334         self.addCleanup(f.close)
   2335         try:
   2336             self.assertEqual(f.read(4), b"appl")
   2337             self.assertIn(f, select.select([f], [], [], 0.0)[0])
   2338         finally:
   2339             p.wait()
   2340 
   2341     def test_zombie_fast_process_del(self):
   2342         # Issue #12650: on Unix, if Popen.__del__() was called before the
   2343         # process exited, it wouldn't be added to subprocess._active, and would
   2344         # remain a zombie.
   2345         # spawn a Popen, and delete its reference before it exits
   2346         p = subprocess.Popen([sys.executable, "-c",
   2347                               'import sys, time;'
   2348                               'time.sleep(0.2)'],
   2349                              stdout=subprocess.PIPE,
   2350                              stderr=subprocess.PIPE)
   2351         self.addCleanup(p.stdout.close)
   2352         self.addCleanup(p.stderr.close)
   2353         ident = id(p)
   2354         pid = p.pid
   2355         with support.check_warnings(('', ResourceWarning)):
   2356             p = None
   2357 
   2358         # check that p is in the active processes list
   2359         self.assertIn(ident, [id(o) for o in subprocess._active])
   2360 
   2361     def test_leak_fast_process_del_killed(self):
   2362         # Issue #12650: on Unix, if Popen.__del__() was called before the
   2363         # process exited, and the process got killed by a signal, it would never
   2364         # be removed from subprocess._active, which triggered a FD and memory
   2365         # leak.
   2366         # spawn a Popen, delete its reference and kill it
   2367         p = subprocess.Popen([sys.executable, "-c",
   2368                               'import time;'
   2369                               'time.sleep(3)'],
   2370                              stdout=subprocess.PIPE,
   2371                              stderr=subprocess.PIPE)
   2372         self.addCleanup(p.stdout.close)
   2373         self.addCleanup(p.stderr.close)
   2374         ident = id(p)
   2375         pid = p.pid
   2376         with support.check_warnings(('', ResourceWarning)):
   2377             p = None
   2378 
   2379         os.kill(pid, signal.SIGKILL)
   2380         # check that p is in the active processes list
   2381         self.assertIn(ident, [id(o) for o in subprocess._active])
   2382 
   2383         # let some time for the process to exit, and create a new Popen: this
   2384         # should trigger the wait() of p
   2385         time.sleep(0.2)
   2386         with self.assertRaises(OSError) as c:
   2387             with subprocess.Popen(['nonexisting_i_hope'],
   2388                                   stdout=subprocess.PIPE,
   2389                                   stderr=subprocess.PIPE) as proc:
   2390                 pass
   2391         # p should have been wait()ed on, and removed from the _active list
   2392         self.assertRaises(OSError, os.waitpid, pid, 0)
   2393         self.assertNotIn(ident, [id(o) for o in subprocess._active])
   2394 
   2395     def test_close_fds_after_preexec(self):
   2396         fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
   2397 
   2398         # this FD is used as dup2() target by preexec_fn, and should be closed
   2399         # in the child process
   2400         fd = os.dup(1)
   2401         self.addCleanup(os.close, fd)
   2402 
   2403         p = subprocess.Popen([sys.executable, fd_status],
   2404                              stdout=subprocess.PIPE, close_fds=True,
   2405                              preexec_fn=lambda: os.dup2(1, fd))
   2406         output, ignored = p.communicate()
   2407 
   2408         remaining_fds = set(map(int, output.split(b',')))
   2409 
   2410         self.assertNotIn(fd, remaining_fds)
   2411 
   2412     @support.cpython_only
   2413     def test_fork_exec(self):
   2414         # Issue #22290: fork_exec() must not crash on memory allocation failure
   2415         # or other errors
   2416         import _posixsubprocess
   2417         gc_enabled = gc.isenabled()
   2418         try:
   2419             # Use a preexec function and enable the garbage collector
   2420             # to force fork_exec() to re-enable the garbage collector
   2421             # on error.
   2422             func = lambda: None
   2423             gc.enable()
   2424 
   2425             for args, exe_list, cwd, env_list in (
   2426                 (123,      [b"exe"], None, [b"env"]),
   2427                 ([b"arg"], 123,      None, [b"env"]),
   2428                 ([b"arg"], [b"exe"], 123,  [b"env"]),
   2429                 ([b"arg"], [b"exe"], None, 123),
   2430             ):
   2431                 with self.assertRaises(TypeError):
   2432                     _posixsubprocess.fork_exec(
   2433                         args, exe_list,
   2434                         True, [], cwd, env_list,
   2435                         -1, -1, -1, -1,
   2436                         1, 2, 3, 4,
   2437                         True, True, func)
   2438         finally:
   2439             if not gc_enabled:
   2440                 gc.disable()
   2441 
   2442     @support.cpython_only
   2443     def test_fork_exec_sorted_fd_sanity_check(self):
   2444         # Issue #23564: sanity check the fork_exec() fds_to_keep sanity check.
   2445         import _posixsubprocess
   2446         gc_enabled = gc.isenabled()
   2447         try:
   2448             gc.enable()
   2449 
   2450             for fds_to_keep in (
   2451                 (-1, 2, 3, 4, 5),  # Negative number.
   2452                 ('str', 4),  # Not an int.
   2453                 (18, 23, 42, 2**63),  # Out of range.
   2454                 (5, 4),  # Not sorted.
   2455                 (6, 7, 7, 8),  # Duplicate.
   2456             ):
   2457                 with self.assertRaises(
   2458                         ValueError,
   2459                         msg='fds_to_keep={}'.format(fds_to_keep)) as c:
   2460                     _posixsubprocess.fork_exec(
   2461                         [b"false"], [b"false"],
   2462                         True, fds_to_keep, None, [b"env"],
   2463                         -1, -1, -1, -1,
   2464                         1, 2, 3, 4,
   2465                         True, True, None)
   2466                 self.assertIn('fds_to_keep', str(c.exception))
   2467         finally:
   2468             if not gc_enabled:
   2469                 gc.disable()
   2470 
   2471     def test_communicate_BrokenPipeError_stdin_close(self):
   2472         # By not setting stdout or stderr or a timeout we force the fast path
   2473         # that just calls _stdin_write() internally due to our mock.
   2474         proc = subprocess.Popen([sys.executable, '-c', 'pass'])
   2475         with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
   2476             mock_proc_stdin.close.side_effect = BrokenPipeError
   2477             proc.communicate()  # Should swallow BrokenPipeError from close.
   2478             mock_proc_stdin.close.assert_called_with()
   2479 
   2480     def test_communicate_BrokenPipeError_stdin_write(self):
   2481         # By not setting stdout or stderr or a timeout we force the fast path
   2482         # that just calls _stdin_write() internally due to our mock.
   2483         proc = subprocess.Popen([sys.executable, '-c', 'pass'])
   2484         with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
   2485             mock_proc_stdin.write.side_effect = BrokenPipeError
   2486             proc.communicate(b'stuff')  # Should swallow the BrokenPipeError.
   2487             mock_proc_stdin.write.assert_called_once_with(b'stuff')
   2488             mock_proc_stdin.close.assert_called_once_with()
   2489 
   2490     def test_communicate_BrokenPipeError_stdin_flush(self):
   2491         # Setting stdin and stdout forces the ._communicate() code path.
   2492         # python -h exits faster than python -c pass (but spams stdout).
   2493         proc = subprocess.Popen([sys.executable, '-h'],
   2494                                 stdin=subprocess.PIPE,
   2495                                 stdout=subprocess.PIPE)
   2496         with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin, \
   2497                 open(os.devnull, 'wb') as dev_null:
   2498             mock_proc_stdin.flush.side_effect = BrokenPipeError
   2499             # because _communicate registers a selector using proc.stdin...
   2500             mock_proc_stdin.fileno.return_value = dev_null.fileno()
   2501             # _communicate() should swallow BrokenPipeError from flush.
   2502             proc.communicate(b'stuff')
   2503             mock_proc_stdin.flush.assert_called_once_with()
   2504 
   2505     def test_communicate_BrokenPipeError_stdin_close_with_timeout(self):
   2506         # Setting stdin and stdout forces the ._communicate() code path.
   2507         # python -h exits faster than python -c pass (but spams stdout).
   2508         proc = subprocess.Popen([sys.executable, '-h'],
   2509                                 stdin=subprocess.PIPE,
   2510                                 stdout=subprocess.PIPE)
   2511         with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin:
   2512             mock_proc_stdin.close.side_effect = BrokenPipeError
   2513             # _communicate() should swallow BrokenPipeError from close.
   2514             proc.communicate(timeout=999)
   2515             mock_proc_stdin.close.assert_called_once_with()
   2516 
   2517     _libc_file_extensions = {
   2518       'Linux': 'so.6',
   2519       'Darwin': 'dylib',
   2520     }
   2521     @unittest.skipIf(not ctypes, 'ctypes module required.')
   2522     @unittest.skipIf(platform.uname()[0] not in _libc_file_extensions,
   2523                      'Test requires a libc this code can load with ctypes.')
   2524     @unittest.skipIf(not sys.executable, 'Test requires sys.executable.')
   2525     def test_child_terminated_in_stopped_state(self):
   2526         """Test wait() behavior when waitpid returns WIFSTOPPED; issue29335."""
   2527         PTRACE_TRACEME = 0  # From glibc and MacOS (PT_TRACE_ME).
   2528         libc_name = 'libc.' + self._libc_file_extensions[platform.uname()[0]]
   2529         libc = ctypes.CDLL(libc_name)
   2530         if not hasattr(libc, 'ptrace'):
   2531             raise unittest.SkipTest('ptrace() required.')
   2532         test_ptrace = subprocess.Popen(
   2533             [sys.executable, '-c', """if True:
   2534              import ctypes
   2535              libc = ctypes.CDLL({libc_name!r})
   2536              libc.ptrace({PTRACE_TRACEME}, 0, 0)
   2537              """.format(libc_name=libc_name, PTRACE_TRACEME=PTRACE_TRACEME)
   2538             ])
   2539         if test_ptrace.wait() != 0:
   2540             raise unittest.SkipTest('ptrace() failed - unable to test.')
   2541         child = subprocess.Popen(
   2542             [sys.executable, '-c', """if True:
   2543              import ctypes
   2544              libc = ctypes.CDLL({libc_name!r})
   2545              libc.ptrace({PTRACE_TRACEME}, 0, 0)
   2546              libc.printf(ctypes.c_char_p(0xdeadbeef))  # Crash the process.
   2547              """.format(libc_name=libc_name, PTRACE_TRACEME=PTRACE_TRACEME)
   2548             ])
   2549         try:
   2550             returncode = child.wait()
   2551         except Exception as e:
   2552             child.kill()  # Clean up the hung stopped process.
   2553             raise e
   2554         self.assertNotEqual(0, returncode)
   2555         self.assertLess(returncode, 0)  # signal death, likely SIGSEGV.
   2556 
   2557 
   2558 @unittest.skipUnless(mswindows, "Windows specific tests")
   2559 class Win32ProcessTestCase(BaseTestCase):
   2560 
   2561     def test_startupinfo(self):
   2562         # startupinfo argument
   2563         # We uses hardcoded constants, because we do not want to
   2564         # depend on win32all.
   2565         STARTF_USESHOWWINDOW = 1
   2566         SW_MAXIMIZE = 3
   2567         startupinfo = subprocess.STARTUPINFO()
   2568         startupinfo.dwFlags = STARTF_USESHOWWINDOW
   2569         startupinfo.wShowWindow = SW_MAXIMIZE
   2570         # Since Python is a console process, it won't be affected
   2571         # by wShowWindow, but the argument should be silently
   2572         # ignored
   2573         subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
   2574                         startupinfo=startupinfo)
   2575 
   2576     def test_creationflags(self):
   2577         # creationflags argument
   2578         CREATE_NEW_CONSOLE = 16
   2579         sys.stderr.write("    a DOS box should flash briefly ...\n")
   2580         subprocess.call(sys.executable +
   2581                         ' -c "import time; time.sleep(0.25)"',
   2582                         creationflags=CREATE_NEW_CONSOLE)
   2583 
   2584     def test_invalid_args(self):
   2585         # invalid arguments should raise ValueError
   2586         self.assertRaises(ValueError, subprocess.call,
   2587                           [sys.executable, "-c",
   2588                            "import sys; sys.exit(47)"],
   2589                           preexec_fn=lambda: 1)
   2590         self.assertRaises(ValueError, subprocess.call,
   2591                           [sys.executable, "-c",
   2592                            "import sys; sys.exit(47)"],
   2593                           stdout=subprocess.PIPE,
   2594                           close_fds=True)
   2595 
   2596     def test_close_fds(self):
   2597         # close file descriptors
   2598         rc = subprocess.call([sys.executable, "-c",
   2599                               "import sys; sys.exit(47)"],
   2600                               close_fds=True)
   2601         self.assertEqual(rc, 47)
   2602 
   2603     def test_shell_sequence(self):
   2604         # Run command through the shell (sequence)
   2605         newenv = os.environ.copy()
   2606         newenv["FRUIT"] = "physalis"
   2607         p = subprocess.Popen(["set"], shell=1,
   2608                              stdout=subprocess.PIPE,
   2609                              env=newenv)
   2610         with p:
   2611             self.assertIn(b"physalis", p.stdout.read())
   2612 
   2613     def test_shell_string(self):
   2614         # Run command through the shell (string)
   2615         newenv = os.environ.copy()
   2616         newenv["FRUIT"] = "physalis"
   2617         p = subprocess.Popen("set", shell=1,
   2618                              stdout=subprocess.PIPE,
   2619                              env=newenv)
   2620         with p:
   2621             self.assertIn(b"physalis", p.stdout.read())
   2622 
   2623     def test_shell_encodings(self):
   2624         # Run command through the shell (string)
   2625         for enc in ['ansi', 'oem']:
   2626             newenv = os.environ.copy()
   2627             newenv["FRUIT"] = "physalis"
   2628             p = subprocess.Popen("set", shell=1,
   2629                                  stdout=subprocess.PIPE,
   2630                                  env=newenv,
   2631                                  encoding=enc)
   2632             with p:
   2633                 self.assertIn("physalis", p.stdout.read(), enc)
   2634 
   2635     def test_call_string(self):
   2636         # call() function with string argument on Windows
   2637         rc = subprocess.call(sys.executable +
   2638                              ' -c "import sys; sys.exit(47)"')
   2639         self.assertEqual(rc, 47)
   2640 
   2641     def _kill_process(self, method, *args):
   2642         # Some win32 buildbot raises EOFError if stdin is inherited
   2643         p = subprocess.Popen([sys.executable, "-c", """if 1:
   2644                              import sys, time
   2645                              sys.stdout.write('x\\n')
   2646                              sys.stdout.flush()
   2647                              time.sleep(30)
   2648                              """],
   2649                              stdin=subprocess.PIPE,
   2650                              stdout=subprocess.PIPE,
   2651                              stderr=subprocess.PIPE)
   2652         with p:
   2653             # Wait for the interpreter to be completely initialized before
   2654             # sending any signal.
   2655             p.stdout.read(1)
   2656             getattr(p, method)(*args)
   2657             _, stderr = p.communicate()
   2658             self.assertStderrEqual(stderr, b'')
   2659             returncode = p.wait()
   2660         self.assertNotEqual(returncode, 0)
   2661 
   2662     def _kill_dead_process(self, method, *args):
   2663         p = subprocess.Popen([sys.executable, "-c", """if 1:
   2664                              import sys, time
   2665                              sys.stdout.write('x\\n')
   2666                              sys.stdout.flush()
   2667                              sys.exit(42)
   2668                              """],
   2669                              stdin=subprocess.PIPE,
   2670                              stdout=subprocess.PIPE,
   2671                              stderr=subprocess.PIPE)
   2672         with p:
   2673             # Wait for the interpreter to be completely initialized before
   2674             # sending any signal.
   2675             p.stdout.read(1)
   2676             # The process should end after this
   2677             time.sleep(1)
   2678             # This shouldn't raise even though the child is now dead
   2679             getattr(p, method)(*args)
   2680             _, stderr = p.communicate()
   2681             self.assertStderrEqual(stderr, b'')
   2682             rc = p.wait()
   2683         self.assertEqual(rc, 42)
   2684 
   2685     def test_send_signal(self):
   2686         self._kill_process('send_signal', signal.SIGTERM)
   2687 
   2688     def test_kill(self):
   2689         self._kill_process('kill')
   2690 
   2691     def test_terminate(self):
   2692         self._kill_process('terminate')
   2693 
   2694     def test_send_signal_dead(self):
   2695         self._kill_dead_process('send_signal', signal.SIGTERM)
   2696 
   2697     def test_kill_dead(self):
   2698         self._kill_dead_process('kill')
   2699 
   2700     def test_terminate_dead(self):
   2701         self._kill_dead_process('terminate')
   2702 
   2703 class MiscTests(unittest.TestCase):
   2704     def test_getoutput(self):
   2705         self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy')
   2706         self.assertEqual(subprocess.getstatusoutput('echo xyzzy'),
   2707                          (0, 'xyzzy'))
   2708 
   2709         # we use mkdtemp in the next line to create an empty directory
   2710         # under our exclusive control; from that, we can invent a pathname
   2711         # that we _know_ won't exist.  This is guaranteed to fail.
   2712         dir = None
   2713         try:
   2714             dir = tempfile.mkdtemp()
   2715             name = os.path.join(dir, "foo")
   2716             status, output = subprocess.getstatusoutput(
   2717                 ("type " if mswindows else "cat ") + name)
   2718             self.assertNotEqual(status, 0)
   2719         finally:
   2720             if dir is not None:
   2721                 os.rmdir(dir)
   2722 
   2723     def test__all__(self):
   2724         """Ensure that __all__ is populated properly."""
   2725         intentionally_excluded = {"list2cmdline", "Handle"}
   2726         exported = set(subprocess.__all__)
   2727         possible_exports = set()
   2728         import types
   2729         for name, value in subprocess.__dict__.items():
   2730             if name.startswith('_'):
   2731                 continue
   2732             if isinstance(value, (types.ModuleType,)):
   2733                 continue
   2734             possible_exports.add(name)
   2735         self.assertEqual(exported, possible_exports - intentionally_excluded)
   2736 
   2737 
   2738 @unittest.skipUnless(hasattr(selectors, 'PollSelector'),
   2739                      "Test needs selectors.PollSelector")
   2740 class ProcessTestCaseNoPoll(ProcessTestCase):
   2741     def setUp(self):
   2742         self.orig_selector = subprocess._PopenSelector
   2743         subprocess._PopenSelector = selectors.SelectSelector
   2744         ProcessTestCase.setUp(self)
   2745 
   2746     def tearDown(self):
   2747         subprocess._PopenSelector = self.orig_selector
   2748         ProcessTestCase.tearDown(self)
   2749 
   2750 
   2751 @unittest.skipUnless(mswindows, "Windows-specific tests")
   2752 class CommandsWithSpaces (BaseTestCase):
   2753 
   2754     def setUp(self):
   2755         super().setUp()
   2756         f, fname = tempfile.mkstemp(".py", "te st")
   2757         self.fname = fname.lower ()
   2758         os.write(f, b"import sys;"
   2759                     b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))"
   2760         )
   2761         os.close(f)
   2762 
   2763     def tearDown(self):
   2764         os.remove(self.fname)
   2765         super().tearDown()
   2766 
   2767     def with_spaces(self, *args, **kwargs):
   2768         kwargs['stdout'] = subprocess.PIPE
   2769         p = subprocess.Popen(*args, **kwargs)
   2770         with p:
   2771             self.assertEqual(
   2772               p.stdout.read ().decode("mbcs"),
   2773               "2 [%r, 'ab cd']" % self.fname
   2774             )
   2775 
   2776     def test_shell_string_with_spaces(self):
   2777         # call() function with string argument with spaces on Windows
   2778         self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
   2779                                              "ab cd"), shell=1)
   2780 
   2781     def test_shell_sequence_with_spaces(self):
   2782         # call() function with sequence argument with spaces on Windows
   2783         self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1)
   2784 
   2785     def test_noshell_string_with_spaces(self):
   2786         # call() function with string argument with spaces on Windows
   2787         self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
   2788                              "ab cd"))
   2789 
   2790     def test_noshell_sequence_with_spaces(self):
   2791         # call() function with sequence argument with spaces on Windows
   2792         self.with_spaces([sys.executable, self.fname, "ab cd"])
   2793 
   2794 
   2795 class ContextManagerTests(BaseTestCase):
   2796 
   2797     def test_pipe(self):
   2798         with subprocess.Popen([sys.executable, "-c",
   2799                                "import sys;"
   2800                                "sys.stdout.write('stdout');"
   2801                                "sys.stderr.write('stderr');"],
   2802                               stdout=subprocess.PIPE,
   2803                               stderr=subprocess.PIPE) as proc:
   2804             self.assertEqual(proc.stdout.read(), b"stdout")
   2805             self.assertStderrEqual(proc.stderr.read(), b"stderr")
   2806 
   2807         self.assertTrue(proc.stdout.closed)
   2808         self.assertTrue(proc.stderr.closed)
   2809 
   2810     def test_returncode(self):
   2811         with subprocess.Popen([sys.executable, "-c",
   2812                                "import sys; sys.exit(100)"]) as proc:
   2813             pass
   2814         # __exit__ calls wait(), so the returncode should be set
   2815         self.assertEqual(proc.returncode, 100)
   2816 
   2817     def test_communicate_stdin(self):
   2818         with subprocess.Popen([sys.executable, "-c",
   2819                               "import sys;"
   2820                               "sys.exit(sys.stdin.read() == 'context')"],
   2821                              stdin=subprocess.PIPE) as proc:
   2822             proc.communicate(b"context")
   2823             self.assertEqual(proc.returncode, 1)
   2824 
   2825     def test_invalid_args(self):
   2826         with self.assertRaises((FileNotFoundError, PermissionError)) as c:
   2827             with subprocess.Popen(['nonexisting_i_hope'],
   2828                                   stdout=subprocess.PIPE,
   2829                                   stderr=subprocess.PIPE) as proc:
   2830                 pass
   2831 
   2832     def test_broken_pipe_cleanup(self):
   2833         """Broken pipe error should not prevent wait() (Issue 21619)"""
   2834         proc = subprocess.Popen([sys.executable, '-c', 'pass'],
   2835                                 stdin=subprocess.PIPE,
   2836                                 bufsize=support.PIPE_MAX_SIZE*2)
   2837         proc = proc.__enter__()
   2838         # Prepare to send enough data to overflow any OS pipe buffering and
   2839         # guarantee a broken pipe error. Data is held in BufferedWriter
   2840         # buffer until closed.
   2841         proc.stdin.write(b'x' * support.PIPE_MAX_SIZE)
   2842         self.assertIsNone(proc.returncode)
   2843         # EPIPE expected under POSIX; EINVAL under Windows
   2844         self.assertRaises(OSError, proc.__exit__, None, None, None)
   2845         self.assertEqual(proc.returncode, 0)
   2846         self.assertTrue(proc.stdin.closed)
   2847 
   2848 
   2849 if __name__ == "__main__":
   2850     unittest.main()
   2851