Home | History | Annotate | Download | only in asyncio
# Names exported by a star-import of this module.
__all__ = ['create_subprocess_exec', 'create_subprocess_shell']
      2 
      3 import subprocess
      4 
      5 from . import events
      6 from . import protocols
      7 from . import streams
      8 from . import tasks
      9 from .coroutines import coroutine
     10 from .log import logger
     11 
     12 
# Aliases for the subprocess module's special stream constants, so they can
# be referenced through this module as well.
PIPE = subprocess.PIPE
STDOUT = subprocess.STDOUT
DEVNULL = subprocess.DEVNULL
     16 
     17 
     18 class SubprocessStreamProtocol(streams.FlowControlMixin,
     19                                protocols.SubprocessProtocol):
     20     """Like StreamReaderProtocol, but for a subprocess."""
     21 
     22     def __init__(self, limit, loop):
     23         super().__init__(loop=loop)
     24         self._limit = limit
     25         self.stdin = self.stdout = self.stderr = None
     26         self._transport = None
     27         self._process_exited = False
     28         self._pipe_fds = []
     29 
     30     def __repr__(self):
     31         info = [self.__class__.__name__]
     32         if self.stdin is not None:
     33             info.append('stdin=%r' % self.stdin)
     34         if self.stdout is not None:
     35             info.append('stdout=%r' % self.stdout)
     36         if self.stderr is not None:
     37             info.append('stderr=%r' % self.stderr)
     38         return '<%s>' % ' '.join(info)
     39 
     40     def connection_made(self, transport):
     41         self._transport = transport
     42 
     43         stdout_transport = transport.get_pipe_transport(1)
     44         if stdout_transport is not None:
     45             self.stdout = streams.StreamReader(limit=self._limit,
     46                                                loop=self._loop)
     47             self.stdout.set_transport(stdout_transport)
     48             self._pipe_fds.append(1)
     49 
     50         stderr_transport = transport.get_pipe_transport(2)
     51         if stderr_transport is not None:
     52             self.stderr = streams.StreamReader(limit=self._limit,
     53                                                loop=self._loop)
     54             self.stderr.set_transport(stderr_transport)
     55             self._pipe_fds.append(2)
     56 
     57         stdin_transport = transport.get_pipe_transport(0)
     58         if stdin_transport is not None:
     59             self.stdin = streams.StreamWriter(stdin_transport,
     60                                               protocol=self,
     61                                               reader=None,
     62                                               loop=self._loop)
     63 
     64     def pipe_data_received(self, fd, data):
     65         if fd == 1:
     66             reader = self.stdout
     67         elif fd == 2:
     68             reader = self.stderr
     69         else:
     70             reader = None
     71         if reader is not None:
     72             reader.feed_data(data)
     73 
     74     def pipe_connection_lost(self, fd, exc):
     75         if fd == 0:
     76             pipe = self.stdin
     77             if pipe is not None:
     78                 pipe.close()
     79             self.connection_lost(exc)
     80             return
     81         if fd == 1:
     82             reader = self.stdout
     83         elif fd == 2:
     84             reader = self.stderr
     85         else:
     86             reader = None
     87         if reader != None:
     88             if exc is None:
     89                 reader.feed_eof()
     90             else:
     91                 reader.set_exception(exc)
     92 
     93         if fd in self._pipe_fds:
     94             self._pipe_fds.remove(fd)
     95         self._maybe_close_transport()
     96 
     97     def process_exited(self):
     98         self._process_exited = True
     99         self._maybe_close_transport()
    100 
    101     def _maybe_close_transport(self):
    102         if len(self._pipe_fds) == 0 and self._process_exited:
    103             self._transport.close()
    104             self._transport = None
    105 
    106 
    107 class Process:
    108     def __init__(self, transport, protocol, loop):
    109         self._transport = transport
    110         self._protocol = protocol
    111         self._loop = loop
    112         self.stdin = protocol.stdin
    113         self.stdout = protocol.stdout
    114         self.stderr = protocol.stderr
    115         self.pid = transport.get_pid()
    116 
    117     def __repr__(self):
    118         return '<%s %s>' % (self.__class__.__name__, self.pid)
    119 
    120     @property
    121     def returncode(self):
    122         return self._transport.get_returncode()
    123 
    124     @coroutine
    125     def wait(self):
    126         """Wait until the process exit and return the process return code.
    127 
    128         This method is a coroutine."""
    129         return (yield from self._transport._wait())
    130 
    131     def send_signal(self, signal):
    132         self._transport.send_signal(signal)
    133 
    134     def terminate(self):
    135         self._transport.terminate()
    136 
    137     def kill(self):
    138         self._transport.kill()
    139 
    140     @coroutine
    141     def _feed_stdin(self, input):
    142         debug = self._loop.get_debug()
    143         self.stdin.write(input)
    144         if debug:
    145             logger.debug('%r communicate: feed stdin (%s bytes)',
    146                         self, len(input))
    147         try:
    148             yield from self.stdin.drain()
    149         except (BrokenPipeError, ConnectionResetError) as exc:
    150             # communicate() ignores BrokenPipeError and ConnectionResetError
    151             if debug:
    152                 logger.debug('%r communicate: stdin got %r', self, exc)
    153 
    154         if debug:
    155             logger.debug('%r communicate: close stdin', self)
    156         self.stdin.close()
    157 
    158     @coroutine
    159     def _noop(self):
    160         return None
    161 
    162     @coroutine
    163     def _read_stream(self, fd):
    164         transport = self._transport.get_pipe_transport(fd)
    165         if fd == 2:
    166             stream = self.stderr
    167         else:
    168             assert fd == 1
    169             stream = self.stdout
    170         if self._loop.get_debug():
    171             name = 'stdout' if fd == 1 else 'stderr'
    172             logger.debug('%r communicate: read %s', self, name)
    173         output = yield from stream.read()
    174         if self._loop.get_debug():
    175             name = 'stdout' if fd == 1 else 'stderr'
    176             logger.debug('%r communicate: close %s', self, name)
    177         transport.close()
    178         return output
    179 
    180     @coroutine
    181     def communicate(self, input=None):
    182         if input is not None:
    183             stdin = self._feed_stdin(input)
    184         else:
    185             stdin = self._noop()
    186         if self.stdout is not None:
    187             stdout = self._read_stream(1)
    188         else:
    189             stdout = self._noop()
    190         if self.stderr is not None:
    191             stderr = self._read_stream(2)
    192         else:
    193             stderr = self._noop()
    194         stdin, stdout, stderr = yield from tasks.gather(stdin, stdout, stderr,
    195                                                         loop=self._loop)
    196         yield from self.wait()
    197         return (stdout, stderr)
    198 
    199 
    200 @coroutine
    201 def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
    202                             loop=None, limit=streams._DEFAULT_LIMIT, **kwds):
    203     if loop is None:
    204         loop = events.get_event_loop()
    205     protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
    206                                                         loop=loop)
    207     transport, protocol = yield from loop.subprocess_shell(
    208                                             protocol_factory,
    209                                             cmd, stdin=stdin, stdout=stdout,
    210                                             stderr=stderr, **kwds)
    211     return Process(transport, protocol, loop)
    212 
    213 @coroutine
    214 def create_subprocess_exec(program, *args, stdin=None, stdout=None,
    215                            stderr=None, loop=None,
    216                            limit=streams._DEFAULT_LIMIT, **kwds):
    217     if loop is None:
    218         loop = events.get_event_loop()
    219     protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
    220                                                         loop=loop)
    221     transport, protocol = yield from loop.subprocess_exec(
    222                                             protocol_factory,
    223                                             program, *args,
    224                                             stdin=stdin, stdout=stdout,
    225                                             stderr=stderr, **kwds)
    226     return Process(transport, protocol, loop)
    227