Home | History | Annotate | Download | only in patman
      1 # Copyright (c) 2012 The Chromium OS Authors.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 #
      5 # Copyright (c) 2003-2005 by Peter Astrand <astrand (at] lysator.liu.se>
      6 # Licensed to PSF under a Contributor Agreement.
      7 # See http://www.python.org/2.4/license for licensing details.
      8 
      9 """Subprocress execution
     10 
     11 This module holds a subclass of subprocess.Popen with our own required
     12 features, mainly that we get access to the subprocess output while it
     13 is running rather than just at the end. This makes it easiler to show
     14 progress information and filter output in real time.
     15 """
     16 
     17 import errno
     18 import os
     19 import pty
     20 import select
     21 import subprocess
     22 import sys
     23 import unittest
     24 
     25 
     26 # Import these here so the caller does not need to import subprocess also.
     27 PIPE = subprocess.PIPE
     28 STDOUT = subprocess.STDOUT
     29 PIPE_PTY = -3     # Pipe output through a pty
     30 stay_alive = True
     31 
     32 
     33 class Popen(subprocess.Popen):
     34     """Like subprocess.Popen with ptys and incremental output
     35 
     36     This class deals with running a child process and filtering its output on
     37     both stdout and stderr while it is running. We do this so we can monitor
     38     progress, and possibly relay the output to the user if requested.
     39 
     40     The class is similar to subprocess.Popen, the equivalent is something like:
     41 
     42         Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
     43 
     44     But this class has many fewer features, and two enhancement:
     45 
     46     1. Rather than getting the output data only at the end, this class sends it
     47          to a provided operation as it arrives.
     48     2. We use pseudo terminals so that the child will hopefully flush its output
     49          to us as soon as it is produced, rather than waiting for the end of a
     50          line.
     51 
     52     Use CommunicateFilter() to handle output from the subprocess.
     53 
     54     """
     55 
     56     def __init__(self, args, stdin=None, stdout=PIPE_PTY, stderr=PIPE_PTY,
     57                  shell=False, cwd=None, env=None, **kwargs):
     58         """Cut-down constructor
     59 
     60         Args:
     61             args: Program and arguments for subprocess to execute.
     62             stdin: See subprocess.Popen()
     63             stdout: See subprocess.Popen(), except that we support the sentinel
     64                     value of cros_subprocess.PIPE_PTY.
     65             stderr: See subprocess.Popen(), except that we support the sentinel
     66                     value of cros_subprocess.PIPE_PTY.
     67             shell: See subprocess.Popen()
     68             cwd: Working directory to change to for subprocess, or None if none.
     69             env: Environment to use for this subprocess, or None to inherit parent.
     70             kwargs: No other arguments are supported at the moment.    Passing other
     71                     arguments will cause a ValueError to be raised.
     72         """
     73         stdout_pty = None
     74         stderr_pty = None
     75 
     76         if stdout == PIPE_PTY:
     77             stdout_pty = pty.openpty()
     78             stdout = os.fdopen(stdout_pty[1])
     79         if stderr == PIPE_PTY:
     80             stderr_pty = pty.openpty()
     81             stderr = os.fdopen(stderr_pty[1])
     82 
     83         super(Popen, self).__init__(args, stdin=stdin,
     84                 stdout=stdout, stderr=stderr, shell=shell, cwd=cwd, env=env,
     85                 **kwargs)
     86 
     87         # If we're on a PTY, we passed the slave half of the PTY to the subprocess.
     88         # We want to use the master half on our end from now on.    Setting this here
     89         # does make some assumptions about the implementation of subprocess, but
     90         # those assumptions are pretty minor.
     91 
     92         # Note that if stderr is STDOUT, then self.stderr will be set to None by
     93         # this constructor.
     94         if stdout_pty is not None:
     95             self.stdout = os.fdopen(stdout_pty[0])
     96         if stderr_pty is not None:
     97             self.stderr = os.fdopen(stderr_pty[0])
     98 
     99         # Insist that unit tests exist for other arguments we don't support.
    100         if kwargs:
    101             raise ValueError("Unit tests do not test extra args - please add tests")
    102 
    103     def CommunicateFilter(self, output):
    104         """Interact with process: Read data from stdout and stderr.
    105 
    106         This method runs until end-of-file is reached, then waits for the
    107         subprocess to terminate.
    108 
    109         The output function is sent all output from the subprocess and must be
    110         defined like this:
    111 
    112             def Output([self,] stream, data)
    113             Args:
    114                 stream: the stream the output was received on, which will be
    115                         sys.stdout or sys.stderr.
    116                 data: a string containing the data
    117 
    118         Note: The data read is buffered in memory, so do not use this
    119         method if the data size is large or unlimited.
    120 
    121         Args:
    122             output: Function to call with each fragment of output.
    123 
    124         Returns:
    125             A tuple (stdout, stderr, combined) which is the data received on
    126             stdout, stderr and the combined data (interleaved stdout and stderr).
    127 
    128             Note that the interleaved output will only be sensible if you have
    129             set both stdout and stderr to PIPE or PIPE_PTY. Even then it depends on
    130             the timing of the output in the subprocess. If a subprocess flips
    131             between stdout and stderr quickly in succession, by the time we come to
    132             read the output from each we may see several lines in each, and will read
    133             all the stdout lines, then all the stderr lines. So the interleaving
    134             may not be correct. In this case you might want to pass
    135             stderr=cros_subprocess.STDOUT to the constructor.
    136 
    137             This feature is still useful for subprocesses where stderr is
    138             rarely used and indicates an error.
    139 
    140             Note also that if you set stderr to STDOUT, then stderr will be empty
    141             and the combined output will just be the same as stdout.
    142         """
    143 
    144         read_set = []
    145         write_set = []
    146         stdout = None # Return
    147         stderr = None # Return
    148 
    149         if self.stdin:
    150             # Flush stdio buffer.    This might block, if the user has
    151             # been writing to .stdin in an uncontrolled fashion.
    152             self.stdin.flush()
    153             if input:
    154                 write_set.append(self.stdin)
    155             else:
    156                 self.stdin.close()
    157         if self.stdout:
    158             read_set.append(self.stdout)
    159             stdout = []
    160         if self.stderr and self.stderr != self.stdout:
    161             read_set.append(self.stderr)
    162             stderr = []
    163         combined = []
    164 
    165         input_offset = 0
    166         while read_set or write_set:
    167             try:
    168                 rlist, wlist, _ = select.select(read_set, write_set, [], 0.2)
    169             except select.error as e:
    170                 if e.args[0] == errno.EINTR:
    171                     continue
    172                 raise
    173 
    174             if not stay_alive:
    175                     self.terminate()
    176 
    177             if self.stdin in wlist:
    178                 # When select has indicated that the file is writable,
    179                 # we can write up to PIPE_BUF bytes without risk
    180                 # blocking.    POSIX defines PIPE_BUF >= 512
    181                 chunk = input[input_offset : input_offset + 512]
    182                 bytes_written = os.write(self.stdin.fileno(), chunk)
    183                 input_offset += bytes_written
    184                 if input_offset >= len(input):
    185                     self.stdin.close()
    186                     write_set.remove(self.stdin)
    187 
    188             if self.stdout in rlist:
    189                 data = ""
    190                 # We will get an error on read if the pty is closed
    191                 try:
    192                     data = os.read(self.stdout.fileno(), 1024)
    193                 except OSError:
    194                     pass
    195                 if data == "":
    196                     self.stdout.close()
    197                     read_set.remove(self.stdout)
    198                 else:
    199                     stdout.append(data)
    200                     combined.append(data)
    201                     if output:
    202                         output(sys.stdout, data)
    203             if self.stderr in rlist:
    204                 data = ""
    205                 # We will get an error on read if the pty is closed
    206                 try:
    207                     data = os.read(self.stderr.fileno(), 1024)
    208                 except OSError:
    209                     pass
    210                 if data == "":
    211                     self.stderr.close()
    212                     read_set.remove(self.stderr)
    213                 else:
    214                     stderr.append(data)
    215                     combined.append(data)
    216                     if output:
    217                         output(sys.stderr, data)
    218 
    219         # All data exchanged.    Translate lists into strings.
    220         if stdout is not None:
    221             stdout = ''.join(stdout)
    222         else:
    223             stdout = ''
    224         if stderr is not None:
    225             stderr = ''.join(stderr)
    226         else:
    227             stderr = ''
    228         combined = ''.join(combined)
    229 
    230         # Translate newlines, if requested.    We cannot let the file
    231         # object do the translation: It is based on stdio, which is
    232         # impossible to combine with select (unless forcing no
    233         # buffering).
    234         if self.universal_newlines and hasattr(file, 'newlines'):
    235             if stdout:
    236                 stdout = self._translate_newlines(stdout)
    237             if stderr:
    238                 stderr = self._translate_newlines(stderr)
    239 
    240         self.wait()
    241         return (stdout, stderr, combined)
    242 
    243 
    244 # Just being a unittest.TestCase gives us 14 public methods.    Unless we
    245 # disable this, we can only have 6 tests in a TestCase.    That's not enough.
    246 #
    247 # pylint: disable=R0904
    248 
    249 class TestSubprocess(unittest.TestCase):
    250     """Our simple unit test for this module"""
    251 
    252     class MyOperation:
    253         """Provides a operation that we can pass to Popen"""
    254         def __init__(self, input_to_send=None):
    255             """Constructor to set up the operation and possible input.
    256 
    257             Args:
    258                 input_to_send: a text string to send when we first get input. We will
    259                     add \r\n to the string.
    260             """
    261             self.stdout_data = ''
    262             self.stderr_data = ''
    263             self.combined_data = ''
    264             self.stdin_pipe = None
    265             self._input_to_send = input_to_send
    266             if input_to_send:
    267                 pipe = os.pipe()
    268                 self.stdin_read_pipe = pipe[0]
    269                 self._stdin_write_pipe = os.fdopen(pipe[1], 'w')
    270 
    271         def Output(self, stream, data):
    272             """Output handler for Popen. Stores the data for later comparison"""
    273             if stream == sys.stdout:
    274                 self.stdout_data += data
    275             if stream == sys.stderr:
    276                 self.stderr_data += data
    277             self.combined_data += data
    278 
    279             # Output the input string if we have one.
    280             if self._input_to_send:
    281                 self._stdin_write_pipe.write(self._input_to_send + '\r\n')
    282                 self._stdin_write_pipe.flush()
    283 
    284     def _BasicCheck(self, plist, oper):
    285         """Basic checks that the output looks sane."""
    286         self.assertEqual(plist[0], oper.stdout_data)
    287         self.assertEqual(plist[1], oper.stderr_data)
    288         self.assertEqual(plist[2], oper.combined_data)
    289 
    290         # The total length of stdout and stderr should equal the combined length
    291         self.assertEqual(len(plist[0]) + len(plist[1]), len(plist[2]))
    292 
    293     def test_simple(self):
    294         """Simple redirection: Get process list"""
    295         oper = TestSubprocess.MyOperation()
    296         plist = Popen(['ps']).CommunicateFilter(oper.Output)
    297         self._BasicCheck(plist, oper)
    298 
    299     def test_stderr(self):
    300         """Check stdout and stderr"""
    301         oper = TestSubprocess.MyOperation()
    302         cmd = 'echo fred >/dev/stderr && false || echo bad'
    303         plist = Popen([cmd], shell=True).CommunicateFilter(oper.Output)
    304         self._BasicCheck(plist, oper)
    305         self.assertEqual(plist [0], 'bad\r\n')
    306         self.assertEqual(plist [1], 'fred\r\n')
    307 
    308     def test_shell(self):
    309         """Check with and without shell works"""
    310         oper = TestSubprocess.MyOperation()
    311         cmd = 'echo test >/dev/stderr'
    312         self.assertRaises(OSError, Popen, [cmd], shell=False)
    313         plist = Popen([cmd], shell=True).CommunicateFilter(oper.Output)
    314         self._BasicCheck(plist, oper)
    315         self.assertEqual(len(plist [0]), 0)
    316         self.assertEqual(plist [1], 'test\r\n')
    317 
    318     def test_list_args(self):
    319         """Check with and without shell works using list arguments"""
    320         oper = TestSubprocess.MyOperation()
    321         cmd = ['echo', 'test', '>/dev/stderr']
    322         plist = Popen(cmd, shell=False).CommunicateFilter(oper.Output)
    323         self._BasicCheck(plist, oper)
    324         self.assertEqual(plist [0], ' '.join(cmd[1:]) + '\r\n')
    325         self.assertEqual(len(plist [1]), 0)
    326 
    327         oper = TestSubprocess.MyOperation()
    328 
    329         # this should be interpreted as 'echo' with the other args dropped
    330         cmd = ['echo', 'test', '>/dev/stderr']
    331         plist = Popen(cmd, shell=True).CommunicateFilter(oper.Output)
    332         self._BasicCheck(plist, oper)
    333         self.assertEqual(plist [0], '\r\n')
    334 
    335     def test_cwd(self):
    336         """Check we can change directory"""
    337         for shell in (False, True):
    338             oper = TestSubprocess.MyOperation()
    339             plist = Popen('pwd', shell=shell, cwd='/tmp').CommunicateFilter(oper.Output)
    340             self._BasicCheck(plist, oper)
    341             self.assertEqual(plist [0], '/tmp\r\n')
    342 
    343     def test_env(self):
    344         """Check we can change environment"""
    345         for add in (False, True):
    346             oper = TestSubprocess.MyOperation()
    347             env = os.environ
    348             if add:
    349                 env ['FRED'] = 'fred'
    350             cmd = 'echo $FRED'
    351             plist = Popen(cmd, shell=True, env=env).CommunicateFilter(oper.Output)
    352             self._BasicCheck(plist, oper)
    353             self.assertEqual(plist [0], add and 'fred\r\n' or '\r\n')
    354 
    355     def test_extra_args(self):
    356         """Check we can't add extra arguments"""
    357         self.assertRaises(ValueError, Popen, 'true', close_fds=False)
    358 
    359     def test_basic_input(self):
    360         """Check that incremental input works
    361 
    362         We set up a subprocess which will prompt for name. When we see this prompt
    363         we send the name as input to the process. It should then print the name
    364         properly to stdout.
    365         """
    366         oper = TestSubprocess.MyOperation('Flash')
    367         prompt = 'What is your name?: '
    368         cmd = 'echo -n "%s"; read name; echo Hello $name' % prompt
    369         plist = Popen([cmd], stdin=oper.stdin_read_pipe,
    370                 shell=True).CommunicateFilter(oper.Output)
    371         self._BasicCheck(plist, oper)
    372         self.assertEqual(len(plist [1]), 0)
    373         self.assertEqual(plist [0], prompt + 'Hello Flash\r\r\n')
    374 
    375     def test_isatty(self):
    376         """Check that ptys appear as terminals to the subprocess"""
    377         oper = TestSubprocess.MyOperation()
    378         cmd = ('if [ -t %d ]; then echo "terminal %d" >&%d; '
    379                 'else echo "not %d" >&%d; fi;')
    380         both_cmds = ''
    381         for fd in (1, 2):
    382             both_cmds += cmd % (fd, fd, fd, fd, fd)
    383         plist = Popen(both_cmds, shell=True).CommunicateFilter(oper.Output)
    384         self._BasicCheck(plist, oper)
    385         self.assertEqual(plist [0], 'terminal 1\r\n')
    386         self.assertEqual(plist [1], 'terminal 2\r\n')
    387 
    388         # Now try with PIPE and make sure it is not a terminal
    389         oper = TestSubprocess.MyOperation()
    390         plist = Popen(both_cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
    391                 shell=True).CommunicateFilter(oper.Output)
    392         self._BasicCheck(plist, oper)
    393         self.assertEqual(plist [0], 'not 1\n')
    394         self.assertEqual(plist [1], 'not 2\n')
    395 
    396 if __name__ == '__main__':
    397     unittest.main()
    398