Home | History | Annotate | Download | only in asyncio
      1 import collections
      2 import subprocess
      3 import warnings
      4 
      5 from . import protocols
      6 from . import transports
      7 from .log import logger
      8 
      9 
     10 class BaseSubprocessTransport(transports.SubprocessTransport):
     11 
     12     def __init__(self, loop, protocol, args, shell,
     13                  stdin, stdout, stderr, bufsize,
     14                  waiter=None, extra=None, **kwargs):
     15         super().__init__(extra)
     16         self._closed = False
     17         self._protocol = protocol
     18         self._loop = loop
     19         self._proc = None
     20         self._pid = None
     21         self._returncode = None
     22         self._exit_waiters = []
     23         self._pending_calls = collections.deque()
     24         self._pipes = {}
     25         self._finished = False
     26 
     27         if stdin == subprocess.PIPE:
     28             self._pipes[0] = None
     29         if stdout == subprocess.PIPE:
     30             self._pipes[1] = None
     31         if stderr == subprocess.PIPE:
     32             self._pipes[2] = None
     33 
     34         # Create the child process: set the _proc attribute
     35         try:
     36             self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
     37                         stderr=stderr, bufsize=bufsize, **kwargs)
     38         except:
     39             self.close()
     40             raise
     41 
     42         self._pid = self._proc.pid
     43         self._extra['subprocess'] = self._proc
     44 
     45         if self._loop.get_debug():
     46             if isinstance(args, (bytes, str)):
     47                 program = args
     48             else:
     49                 program = args[0]
     50             logger.debug('process %r created: pid %s',
     51                          program, self._pid)
     52 
     53         self._loop.create_task(self._connect_pipes(waiter))
     54 
     55     def __repr__(self):
     56         info = [self.__class__.__name__]
     57         if self._closed:
     58             info.append('closed')
     59         if self._pid is not None:
     60             info.append(f'pid={self._pid}')
     61         if self._returncode is not None:
     62             info.append(f'returncode={self._returncode}')
     63         elif self._pid is not None:
     64             info.append('running')
     65         else:
     66             info.append('not started')
     67 
     68         stdin = self._pipes.get(0)
     69         if stdin is not None:
     70             info.append(f'stdin={stdin.pipe}')
     71 
     72         stdout = self._pipes.get(1)
     73         stderr = self._pipes.get(2)
     74         if stdout is not None and stderr is stdout:
     75             info.append(f'stdout=stderr={stdout.pipe}')
     76         else:
     77             if stdout is not None:
     78                 info.append(f'stdout={stdout.pipe}')
     79             if stderr is not None:
     80                 info.append(f'stderr={stderr.pipe}')
     81 
     82         return '<{}>'.format(' '.join(info))
     83 
     84     def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
     85         raise NotImplementedError
     86 
     87     def set_protocol(self, protocol):
     88         self._protocol = protocol
     89 
     90     def get_protocol(self):
     91         return self._protocol
     92 
     93     def is_closing(self):
     94         return self._closed
     95 
     96     def close(self):
     97         if self._closed:
     98             return
     99         self._closed = True
    100 
    101         for proto in self._pipes.values():
    102             if proto is None:
    103                 continue
    104             proto.pipe.close()
    105 
    106         if (self._proc is not None and
    107                 # has the child process finished?
    108                 self._returncode is None and
    109                 # the child process has finished, but the
    110                 # transport hasn't been notified yet?
    111                 self._proc.poll() is None):
    112 
    113             if self._loop.get_debug():
    114                 logger.warning('Close running child process: kill %r', self)
    115 
    116             try:
    117                 self._proc.kill()
    118             except ProcessLookupError:
    119                 pass
    120 
    121             # Don't clear the _proc reference yet: _post_init() may still run
    122 
    123     def __del__(self):
    124         if not self._closed:
    125             warnings.warn(f"unclosed transport {self!r}", ResourceWarning,
    126                           source=self)
    127             self.close()
    128 
    129     def get_pid(self):
    130         return self._pid
    131 
    132     def get_returncode(self):
    133         return self._returncode
    134 
    135     def get_pipe_transport(self, fd):
    136         if fd in self._pipes:
    137             return self._pipes[fd].pipe
    138         else:
    139             return None
    140 
    141     def _check_proc(self):
    142         if self._proc is None:
    143             raise ProcessLookupError()
    144 
    145     def send_signal(self, signal):
    146         self._check_proc()
    147         self._proc.send_signal(signal)
    148 
    149     def terminate(self):
    150         self._check_proc()
    151         self._proc.terminate()
    152 
    153     def kill(self):
    154         self._check_proc()
    155         self._proc.kill()
    156 
    157     async def _connect_pipes(self, waiter):
    158         try:
    159             proc = self._proc
    160             loop = self._loop
    161 
    162             if proc.stdin is not None:
    163                 _, pipe = await loop.connect_write_pipe(
    164                     lambda: WriteSubprocessPipeProto(self, 0),
    165                     proc.stdin)
    166                 self._pipes[0] = pipe
    167 
    168             if proc.stdout is not None:
    169                 _, pipe = await loop.connect_read_pipe(
    170                     lambda: ReadSubprocessPipeProto(self, 1),
    171                     proc.stdout)
    172                 self._pipes[1] = pipe
    173 
    174             if proc.stderr is not None:
    175                 _, pipe = await loop.connect_read_pipe(
    176                     lambda: ReadSubprocessPipeProto(self, 2),
    177                     proc.stderr)
    178                 self._pipes[2] = pipe
    179 
    180             assert self._pending_calls is not None
    181 
    182             loop.call_soon(self._protocol.connection_made, self)
    183             for callback, data in self._pending_calls:
    184                 loop.call_soon(callback, *data)
    185             self._pending_calls = None
    186         except Exception as exc:
    187             if waiter is not None and not waiter.cancelled():
    188                 waiter.set_exception(exc)
    189         else:
    190             if waiter is not None and not waiter.cancelled():
    191                 waiter.set_result(None)
    192 
    193     def _call(self, cb, *data):
    194         if self._pending_calls is not None:
    195             self._pending_calls.append((cb, data))
    196         else:
    197             self._loop.call_soon(cb, *data)
    198 
    199     def _pipe_connection_lost(self, fd, exc):
    200         self._call(self._protocol.pipe_connection_lost, fd, exc)
    201         self._try_finish()
    202 
    203     def _pipe_data_received(self, fd, data):
    204         self._call(self._protocol.pipe_data_received, fd, data)
    205 
    206     def _process_exited(self, returncode):
    207         assert returncode is not None, returncode
    208         assert self._returncode is None, self._returncode
    209         if self._loop.get_debug():
    210             logger.info('%r exited with return code %r', self, returncode)
    211         self._returncode = returncode
    212         if self._proc.returncode is None:
    213             # asyncio uses a child watcher: copy the status into the Popen
    214             # object. On Python 3.6, it is required to avoid a ResourceWarning.
    215             self._proc.returncode = returncode
    216         self._call(self._protocol.process_exited)
    217         self._try_finish()
    218 
    219         # wake up futures waiting for wait()
    220         for waiter in self._exit_waiters:
    221             if not waiter.cancelled():
    222                 waiter.set_result(returncode)
    223         self._exit_waiters = None
    224 
    225     async def _wait(self):
    226         """Wait until the process exit and return the process return code.
    227 
    228         This method is a coroutine."""
    229         if self._returncode is not None:
    230             return self._returncode
    231 
    232         waiter = self._loop.create_future()
    233         self._exit_waiters.append(waiter)
    234         return await waiter
    235 
    236     def _try_finish(self):
    237         assert not self._finished
    238         if self._returncode is None:
    239             return
    240         if all(p is not None and p.disconnected
    241                for p in self._pipes.values()):
    242             self._finished = True
    243             self._call(self._call_connection_lost, None)
    244 
    245     def _call_connection_lost(self, exc):
    246         try:
    247             self._protocol.connection_lost(exc)
    248         finally:
    249             self._loop = None
    250             self._proc = None
    251             self._protocol = None
    252 
    253 
    254 class WriteSubprocessPipeProto(protocols.BaseProtocol):
    255 
    256     def __init__(self, proc, fd):
    257         self.proc = proc
    258         self.fd = fd
    259         self.pipe = None
    260         self.disconnected = False
    261 
    262     def connection_made(self, transport):
    263         self.pipe = transport
    264 
    265     def __repr__(self):
    266         return f'<{self.__class__.__name__} fd={self.fd} pipe={self.pipe!r}>'
    267 
    268     def connection_lost(self, exc):
    269         self.disconnected = True
    270         self.proc._pipe_connection_lost(self.fd, exc)
    271         self.proc = None
    272 
    273     def pause_writing(self):
    274         self.proc._protocol.pause_writing()
    275 
    276     def resume_writing(self):
    277         self.proc._protocol.resume_writing()
    278 
    279 
    280 class ReadSubprocessPipeProto(WriteSubprocessPipeProto,
    281                               protocols.Protocol):
    282 
    283     def data_received(self, data):
    284         self.proc._pipe_data_received(self.fd, data)
    285