Home | History | Annotate | Download | only in asyncio
      1 import collections
      2 import subprocess
      3 import warnings
      4 
      5 from . import compat
      6 from . import protocols
      7 from . import transports
      8 from .coroutines import coroutine
      9 from .log import logger
     10 
     11 
     12 class BaseSubprocessTransport(transports.SubprocessTransport):
     13 
     14     def __init__(self, loop, protocol, args, shell,
     15                  stdin, stdout, stderr, bufsize,
     16                  waiter=None, extra=None, **kwargs):
     17         super().__init__(extra)
     18         self._closed = False
     19         self._protocol = protocol
     20         self._loop = loop
     21         self._proc = None
     22         self._pid = None
     23         self._returncode = None
     24         self._exit_waiters = []
     25         self._pending_calls = collections.deque()
     26         self._pipes = {}
     27         self._finished = False
     28 
     29         if stdin == subprocess.PIPE:
     30             self._pipes[0] = None
     31         if stdout == subprocess.PIPE:
     32             self._pipes[1] = None
     33         if stderr == subprocess.PIPE:
     34             self._pipes[2] = None
     35 
     36         # Create the child process: set the _proc attribute
     37         try:
     38             self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
     39                         stderr=stderr, bufsize=bufsize, **kwargs)
     40         except:
     41             self.close()
     42             raise
     43 
     44         self._pid = self._proc.pid
     45         self._extra['subprocess'] = self._proc
     46 
     47         if self._loop.get_debug():
     48             if isinstance(args, (bytes, str)):
     49                 program = args
     50             else:
     51                 program = args[0]
     52             logger.debug('process %r created: pid %s',
     53                          program, self._pid)
     54 
     55         self._loop.create_task(self._connect_pipes(waiter))
     56 
     57     def __repr__(self):
     58         info = [self.__class__.__name__]
     59         if self._closed:
     60             info.append('closed')
     61         if self._pid is not None:
     62             info.append('pid=%s' % self._pid)
     63         if self._returncode is not None:
     64             info.append('returncode=%s' % self._returncode)
     65         elif self._pid is not None:
     66             info.append('running')
     67         else:
     68             info.append('not started')
     69 
     70         stdin = self._pipes.get(0)
     71         if stdin is not None:
     72             info.append('stdin=%s' % stdin.pipe)
     73 
     74         stdout = self._pipes.get(1)
     75         stderr = self._pipes.get(2)
     76         if stdout is not None and stderr is stdout:
     77             info.append('stdout=stderr=%s' % stdout.pipe)
     78         else:
     79             if stdout is not None:
     80                 info.append('stdout=%s' % stdout.pipe)
     81             if stderr is not None:
     82                 info.append('stderr=%s' % stderr.pipe)
     83 
     84         return '<%s>' % ' '.join(info)
     85 
     86     def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
     87         raise NotImplementedError
     88 
     89     def set_protocol(self, protocol):
     90         self._protocol = protocol
     91 
     92     def get_protocol(self):
     93         return self._protocol
     94 
     95     def is_closing(self):
     96         return self._closed
     97 
     98     def close(self):
     99         if self._closed:
    100             return
    101         self._closed = True
    102 
    103         for proto in self._pipes.values():
    104             if proto is None:
    105                 continue
    106             proto.pipe.close()
    107 
    108         if (self._proc is not None
    109         # the child process finished?
    110         and self._returncode is None
    111         # the child process finished but the transport was not notified yet?
    112         and self._proc.poll() is None
    113         ):
    114             if self._loop.get_debug():
    115                 logger.warning('Close running child process: kill %r', self)
    116 
    117             try:
    118                 self._proc.kill()
    119             except ProcessLookupError:
    120                 pass
    121 
    122             # Don't clear the _proc reference yet: _post_init() may still run
    123 
    124     # On Python 3.3 and older, objects with a destructor part of a reference
    125     # cycle are never destroyed. It's not more the case on Python 3.4 thanks
    126     # to the PEP 442.
    127     if compat.PY34:
    128         def __del__(self):
    129             if not self._closed:
    130                 warnings.warn("unclosed transport %r" % self, ResourceWarning,
    131                               source=self)
    132                 self.close()
    133 
    134     def get_pid(self):
    135         return self._pid
    136 
    137     def get_returncode(self):
    138         return self._returncode
    139 
    140     def get_pipe_transport(self, fd):
    141         if fd in self._pipes:
    142             return self._pipes[fd].pipe
    143         else:
    144             return None
    145 
    146     def _check_proc(self):
    147         if self._proc is None:
    148             raise ProcessLookupError()
    149 
    150     def send_signal(self, signal):
    151         self._check_proc()
    152         self._proc.send_signal(signal)
    153 
    154     def terminate(self):
    155         self._check_proc()
    156         self._proc.terminate()
    157 
    158     def kill(self):
    159         self._check_proc()
    160         self._proc.kill()
    161 
    162     @coroutine
    163     def _connect_pipes(self, waiter):
    164         try:
    165             proc = self._proc
    166             loop = self._loop
    167 
    168             if proc.stdin is not None:
    169                 _, pipe = yield from loop.connect_write_pipe(
    170                     lambda: WriteSubprocessPipeProto(self, 0),
    171                     proc.stdin)
    172                 self._pipes[0] = pipe
    173 
    174             if proc.stdout is not None:
    175                 _, pipe = yield from loop.connect_read_pipe(
    176                     lambda: ReadSubprocessPipeProto(self, 1),
    177                     proc.stdout)
    178                 self._pipes[1] = pipe
    179 
    180             if proc.stderr is not None:
    181                 _, pipe = yield from loop.connect_read_pipe(
    182                     lambda: ReadSubprocessPipeProto(self, 2),
    183                     proc.stderr)
    184                 self._pipes[2] = pipe
    185 
    186             assert self._pending_calls is not None
    187 
    188             loop.call_soon(self._protocol.connection_made, self)
    189             for callback, data in self._pending_calls:
    190                 loop.call_soon(callback, *data)
    191             self._pending_calls = None
    192         except Exception as exc:
    193             if waiter is not None and not waiter.cancelled():
    194                 waiter.set_exception(exc)
    195         else:
    196             if waiter is not None and not waiter.cancelled():
    197                 waiter.set_result(None)
    198 
    199     def _call(self, cb, *data):
    200         if self._pending_calls is not None:
    201             self._pending_calls.append((cb, data))
    202         else:
    203             self._loop.call_soon(cb, *data)
    204 
    205     def _pipe_connection_lost(self, fd, exc):
    206         self._call(self._protocol.pipe_connection_lost, fd, exc)
    207         self._try_finish()
    208 
    209     def _pipe_data_received(self, fd, data):
    210         self._call(self._protocol.pipe_data_received, fd, data)
    211 
    212     def _process_exited(self, returncode):
    213         assert returncode is not None, returncode
    214         assert self._returncode is None, self._returncode
    215         if self._loop.get_debug():
    216             logger.info('%r exited with return code %r',
    217                         self, returncode)
    218         self._returncode = returncode
    219         if self._proc.returncode is None:
    220             # asyncio uses a child watcher: copy the status into the Popen
    221             # object. On Python 3.6, it is required to avoid a ResourceWarning.
    222             self._proc.returncode = returncode
    223         self._call(self._protocol.process_exited)
    224         self._try_finish()
    225 
    226         # wake up futures waiting for wait()
    227         for waiter in self._exit_waiters:
    228             if not waiter.cancelled():
    229                 waiter.set_result(returncode)
    230         self._exit_waiters = None
    231 
    232     @coroutine
    233     def _wait(self):
    234         """Wait until the process exit and return the process return code.
    235 
    236         This method is a coroutine."""
    237         if self._returncode is not None:
    238             return self._returncode
    239 
    240         waiter = self._loop.create_future()
    241         self._exit_waiters.append(waiter)
    242         return (yield from waiter)
    243 
    244     def _try_finish(self):
    245         assert not self._finished
    246         if self._returncode is None:
    247             return
    248         if all(p is not None and p.disconnected
    249                for p in self._pipes.values()):
    250             self._finished = True
    251             self._call(self._call_connection_lost, None)
    252 
    253     def _call_connection_lost(self, exc):
    254         try:
    255             self._protocol.connection_lost(exc)
    256         finally:
    257             self._loop = None
    258             self._proc = None
    259             self._protocol = None
    260 
    261 
    262 class WriteSubprocessPipeProto(protocols.BaseProtocol):
    263 
    264     def __init__(self, proc, fd):
    265         self.proc = proc
    266         self.fd = fd
    267         self.pipe = None
    268         self.disconnected = False
    269 
    270     def connection_made(self, transport):
    271         self.pipe = transport
    272 
    273     def __repr__(self):
    274         return ('<%s fd=%s pipe=%r>'
    275                 % (self.__class__.__name__, self.fd, self.pipe))
    276 
    277     def connection_lost(self, exc):
    278         self.disconnected = True
    279         self.proc._pipe_connection_lost(self.fd, exc)
    280         self.proc = None
    281 
    282     def pause_writing(self):
    283         self.proc._protocol.pause_writing()
    284 
    285     def resume_writing(self):
    286         self.proc._protocol.resume_writing()
    287 
    288 
    289 class ReadSubprocessPipeProto(WriteSubprocessPipeProto,
    290                               protocols.Protocol):
    291 
    292     def data_received(self, data):
    293         self.proc._pipe_data_received(self.fd, data)
    294