Home | History | Annotate | Download | only in asyncio
      1 """Selector event loop for Unix with signal handling."""
      2 
      3 import errno
      4 import os
      5 import signal
      6 import socket
      7 import stat
      8 import subprocess
      9 import sys
     10 import threading
     11 import warnings
     12 
     13 
     14 from . import base_events
     15 from . import base_subprocess
     16 from . import compat
     17 from . import constants
     18 from . import coroutines
     19 from . import events
     20 from . import futures
     21 from . import selector_events
     22 from . import selectors
     23 from . import transports
     24 from .coroutines import coroutine
     25 from .log import logger
     26 
     27 
# Public names exported by a star-import of this module.
__all__ = ['SelectorEventLoop',
           'AbstractChildWatcher', 'SafeChildWatcher',
           'FastChildWatcher', 'DefaultEventLoopPolicy',
           ]

# Import-time guard: importing this module on Windows fails with ImportError.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
     35 
     36 
     37 def _sighandler_noop(signum, frame):
     38     """Dummy signal handler."""
     39     pass
     40 
     41 
     42 try:
     43     _fspath = os.fspath
     44 except AttributeError:
     45     # Python 3.5 or earlier
     46     _fspath = lambda path: path
     47 
     48 
     49 class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
     50     """Unix event loop.
     51 
     52     Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
     53     """
     54 
     55     def __init__(self, selector=None):
     56         super().__init__(selector)
     57         self._signal_handlers = {}
     58 
     59     def _socketpair(self):
     60         return socket.socketpair()
     61 
     62     def close(self):
     63         super().close()
     64         for sig in list(self._signal_handlers):
     65             self.remove_signal_handler(sig)
     66 
     67     def _process_self_data(self, data):
     68         for signum in data:
     69             if not signum:
     70                 # ignore null bytes written by _write_to_self()
     71                 continue
     72             self._handle_signal(signum)
     73 
     74     def add_signal_handler(self, sig, callback, *args):
     75         """Add a handler for a signal.  UNIX only.
     76 
     77         Raise ValueError if the signal number is invalid or uncatchable.
     78         Raise RuntimeError if there is a problem setting up the handler.
     79         """
     80         if (coroutines.iscoroutine(callback)
     81         or coroutines.iscoroutinefunction(callback)):
     82             raise TypeError("coroutines cannot be used "
     83                             "with add_signal_handler()")
     84         self._check_signal(sig)
     85         self._check_closed()
     86         try:
     87             # set_wakeup_fd() raises ValueError if this is not the
     88             # main thread.  By calling it early we ensure that an
     89             # event loop running in another thread cannot add a signal
     90             # handler.
     91             signal.set_wakeup_fd(self._csock.fileno())
     92         except (ValueError, OSError) as exc:
     93             raise RuntimeError(str(exc))
     94 
     95         handle = events.Handle(callback, args, self)
     96         self._signal_handlers[sig] = handle
     97 
     98         try:
     99             # Register a dummy signal handler to ask Python to write the signal
    100             # number in the wakup file descriptor. _process_self_data() will
    101             # read signal numbers from this file descriptor to handle signals.
    102             signal.signal(sig, _sighandler_noop)
    103 
    104             # Set SA_RESTART to limit EINTR occurrences.
    105             signal.siginterrupt(sig, False)
    106         except OSError as exc:
    107             del self._signal_handlers[sig]
    108             if not self._signal_handlers:
    109                 try:
    110                     signal.set_wakeup_fd(-1)
    111                 except (ValueError, OSError) as nexc:
    112                     logger.info('set_wakeup_fd(-1) failed: %s', nexc)
    113 
    114             if exc.errno == errno.EINVAL:
    115                 raise RuntimeError('sig {} cannot be caught'.format(sig))
    116             else:
    117                 raise
    118 
    119     def _handle_signal(self, sig):
    120         """Internal helper that is the actual signal handler."""
    121         handle = self._signal_handlers.get(sig)
    122         if handle is None:
    123             return  # Assume it's some race condition.
    124         if handle._cancelled:
    125             self.remove_signal_handler(sig)  # Remove it properly.
    126         else:
    127             self._add_callback_signalsafe(handle)
    128 
    129     def remove_signal_handler(self, sig):
    130         """Remove a handler for a signal.  UNIX only.
    131 
    132         Return True if a signal handler was removed, False if not.
    133         """
    134         self._check_signal(sig)
    135         try:
    136             del self._signal_handlers[sig]
    137         except KeyError:
    138             return False
    139 
    140         if sig == signal.SIGINT:
    141             handler = signal.default_int_handler
    142         else:
    143             handler = signal.SIG_DFL
    144 
    145         try:
    146             signal.signal(sig, handler)
    147         except OSError as exc:
    148             if exc.errno == errno.EINVAL:
    149                 raise RuntimeError('sig {} cannot be caught'.format(sig))
    150             else:
    151                 raise
    152 
    153         if not self._signal_handlers:
    154             try:
    155                 signal.set_wakeup_fd(-1)
    156             except (ValueError, OSError) as exc:
    157                 logger.info('set_wakeup_fd(-1) failed: %s', exc)
    158 
    159         return True
    160 
    161     def _check_signal(self, sig):
    162         """Internal helper to validate a signal.
    163 
    164         Raise ValueError if the signal number is invalid or uncatchable.
    165         Raise RuntimeError if there is a problem setting up the handler.
    166         """
    167         if not isinstance(sig, int):
    168             raise TypeError('sig must be an int, not {!r}'.format(sig))
    169 
    170         if not (1 <= sig < signal.NSIG):
    171             raise ValueError(
    172                 'sig {} out of range(1, {})'.format(sig, signal.NSIG))
    173 
    174     def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
    175                                   extra=None):
    176         return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
    177 
    178     def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
    179                                    extra=None):
    180         return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
    181 
    182     @coroutine
    183     def _make_subprocess_transport(self, protocol, args, shell,
    184                                    stdin, stdout, stderr, bufsize,
    185                                    extra=None, **kwargs):
    186         with events.get_child_watcher() as watcher:
    187             waiter = self.create_future()
    188             transp = _UnixSubprocessTransport(self, protocol, args, shell,
    189                                               stdin, stdout, stderr, bufsize,
    190                                               waiter=waiter, extra=extra,
    191                                               **kwargs)
    192 
    193             watcher.add_child_handler(transp.get_pid(),
    194                                       self._child_watcher_callback, transp)
    195             try:
    196                 yield from waiter
    197             except Exception as exc:
    198                 # Workaround CPython bug #23353: using yield/yield-from in an
    199                 # except block of a generator doesn't clear properly
    200                 # sys.exc_info()
    201                 err = exc
    202             else:
    203                 err = None
    204 
    205             if err is not None:
    206                 transp.close()
    207                 yield from transp._wait()
    208                 raise err
    209 
    210         return transp
    211 
    212     def _child_watcher_callback(self, pid, returncode, transp):
    213         self.call_soon_threadsafe(transp._process_exited, returncode)
    214 
    215     @coroutine
    216     def create_unix_connection(self, protocol_factory, path, *,
    217                                ssl=None, sock=None,
    218                                server_hostname=None):
    219         assert server_hostname is None or isinstance(server_hostname, str)
    220         if ssl:
    221             if server_hostname is None:
    222                 raise ValueError(
    223                     'you have to pass server_hostname when using ssl')
    224         else:
    225             if server_hostname is not None:
    226                 raise ValueError('server_hostname is only meaningful with ssl')
    227 
    228         if path is not None:
    229             if sock is not None:
    230                 raise ValueError(
    231                     'path and sock can not be specified at the same time')
    232 
    233             sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
    234             try:
    235                 sock.setblocking(False)
    236                 yield from self.sock_connect(sock, path)
    237             except:
    238                 sock.close()
    239                 raise
    240 
    241         else:
    242             if sock is None:
    243                 raise ValueError('no path and sock were specified')
    244             if (sock.family != socket.AF_UNIX or
    245                     not base_events._is_stream_socket(sock)):
    246                 raise ValueError(
    247                     'A UNIX Domain Stream Socket was expected, got {!r}'
    248                     .format(sock))
    249             sock.setblocking(False)
    250 
    251         transport, protocol = yield from self._create_connection_transport(
    252             sock, protocol_factory, ssl, server_hostname)
    253         return transport, protocol
    254 
    255     @coroutine
    256     def create_unix_server(self, protocol_factory, path=None, *,
    257                            sock=None, backlog=100, ssl=None):
    258         if isinstance(ssl, bool):
    259             raise TypeError('ssl argument must be an SSLContext or None')
    260 
    261         if path is not None:
    262             if sock is not None:
    263                 raise ValueError(
    264                     'path and sock can not be specified at the same time')
    265 
    266             path = _fspath(path)
    267             sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    268 
    269             # Check for abstract socket. `str` and `bytes` paths are supported.
    270             if path[0] not in (0, '\x00'):
    271                 try:
    272                     if stat.S_ISSOCK(os.stat(path).st_mode):
    273                         os.remove(path)
    274                 except FileNotFoundError:
    275                     pass
    276                 except OSError as err:
    277                     # Directory may have permissions only to create socket.
    278                     logger.error('Unable to check or remove stale UNIX socket %r: %r', path, err)
    279 
    280             try:
    281                 sock.bind(path)
    282             except OSError as exc:
    283                 sock.close()
    284                 if exc.errno == errno.EADDRINUSE:
    285                     # Let's improve the error message by adding
    286                     # with what exact address it occurs.
    287                     msg = 'Address {!r} is already in use'.format(path)
    288                     raise OSError(errno.EADDRINUSE, msg) from None
    289                 else:
    290                     raise
    291             except:
    292                 sock.close()
    293                 raise
    294         else:
    295             if sock is None:
    296                 raise ValueError(
    297                     'path was not specified, and no sock specified')
    298 
    299             if (sock.family != socket.AF_UNIX or
    300                     not base_events._is_stream_socket(sock)):
    301                 raise ValueError(
    302                     'A UNIX Domain Stream Socket was expected, got {!r}'
    303                     .format(sock))
    304 
    305         server = base_events.Server(self, [sock])
    306         sock.listen(backlog)
    307         sock.setblocking(False)
    308         self._start_serving(protocol_factory, sock, ssl, server)
    309         return server
    310 
    311 
    312 if hasattr(os, 'set_blocking'):
    313     def _set_nonblocking(fd):
    314         os.set_blocking(fd, False)
    315 else:
    316     import fcntl
    317 
    318     def _set_nonblocking(fd):
    319         flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    320         flags = flags | os.O_NONBLOCK
    321         fcntl.fcntl(fd, fcntl.F_SETFL, flags)
    322 
    323 
    324 class _UnixReadPipeTransport(transports.ReadTransport):
    325 
    326     max_size = 256 * 1024  # max bytes we read in one event loop iteration
    327 
    328     def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
    329         super().__init__(extra)
    330         self._extra['pipe'] = pipe
    331         self._loop = loop
    332         self._pipe = pipe
    333         self._fileno = pipe.fileno()
    334         self._protocol = protocol
    335         self._closing = False
    336 
    337         mode = os.fstat(self._fileno).st_mode
    338         if not (stat.S_ISFIFO(mode) or
    339                 stat.S_ISSOCK(mode) or
    340                 stat.S_ISCHR(mode)):
    341             self._pipe = None
    342             self._fileno = None
    343             self._protocol = None
    344             raise ValueError("Pipe transport is for pipes/sockets only.")
    345 
    346         _set_nonblocking(self._fileno)
    347 
    348         self._loop.call_soon(self._protocol.connection_made, self)
    349         # only start reading when connection_made() has been called
    350         self._loop.call_soon(self._loop._add_reader,
    351                              self._fileno, self._read_ready)
    352         if waiter is not None:
    353             # only wake up the waiter when connection_made() has been called
    354             self._loop.call_soon(futures._set_result_unless_cancelled,
    355                                  waiter, None)
    356 
    357     def __repr__(self):
    358         info = [self.__class__.__name__]
    359         if self._pipe is None:
    360             info.append('closed')
    361         elif self._closing:
    362             info.append('closing')
    363         info.append('fd=%s' % self._fileno)
    364         selector = getattr(self._loop, '_selector', None)
    365         if self._pipe is not None and selector is not None:
    366             polling = selector_events._test_selector_event(
    367                           selector,
    368                           self._fileno, selectors.EVENT_READ)
    369             if polling:
    370                 info.append('polling')
    371             else:
    372                 info.append('idle')
    373         elif self._pipe is not None:
    374             info.append('open')
    375         else:
    376             info.append('closed')
    377         return '<%s>' % ' '.join(info)
    378 
    379     def _read_ready(self):
    380         try:
    381             data = os.read(self._fileno, self.max_size)
    382         except (BlockingIOError, InterruptedError):
    383             pass
    384         except OSError as exc:
    385             self._fatal_error(exc, 'Fatal read error on pipe transport')
    386         else:
    387             if data:
    388                 self._protocol.data_received(data)
    389             else:
    390                 if self._loop.get_debug():
    391                     logger.info("%r was closed by peer", self)
    392                 self._closing = True
    393                 self._loop._remove_reader(self._fileno)
    394                 self._loop.call_soon(self._protocol.eof_received)
    395                 self._loop.call_soon(self._call_connection_lost, None)
    396 
    397     def pause_reading(self):
    398         self._loop._remove_reader(self._fileno)
    399 
    400     def resume_reading(self):
    401         self._loop._add_reader(self._fileno, self._read_ready)
    402 
    403     def set_protocol(self, protocol):
    404         self._protocol = protocol
    405 
    406     def get_protocol(self):
    407         return self._protocol
    408 
    409     def is_closing(self):
    410         return self._closing
    411 
    412     def close(self):
    413         if not self._closing:
    414             self._close(None)
    415 
    416     # On Python 3.3 and older, objects with a destructor part of a reference
    417     # cycle are never destroyed. It's not more the case on Python 3.4 thanks
    418     # to the PEP 442.
    419     if compat.PY34:
    420         def __del__(self):
    421             if self._pipe is not None:
    422                 warnings.warn("unclosed transport %r" % self, ResourceWarning,
    423                               source=self)
    424                 self._pipe.close()
    425 
    426     def _fatal_error(self, exc, message='Fatal error on pipe transport'):
    427         # should be called by exception handler only
    428         if (isinstance(exc, OSError) and exc.errno == errno.EIO):
    429             if self._loop.get_debug():
    430                 logger.debug("%r: %s", self, message, exc_info=True)
    431         else:
    432             self._loop.call_exception_handler({
    433                 'message': message,
    434                 'exception': exc,
    435                 'transport': self,
    436                 'protocol': self._protocol,
    437             })
    438         self._close(exc)
    439 
    440     def _close(self, exc):
    441         self._closing = True
    442         self._loop._remove_reader(self._fileno)
    443         self._loop.call_soon(self._call_connection_lost, exc)
    444 
    445     def _call_connection_lost(self, exc):
    446         try:
    447             self._protocol.connection_lost(exc)
    448         finally:
    449             self._pipe.close()
    450             self._pipe = None
    451             self._protocol = None
    452             self._loop = None
    453 
    454 
class _UnixWritePipeTransport(transports._FlowControlMixin,
                              transports.WriteTransport):
    """Write transport over a FIFO, socket or character device.

    Data that cannot be written immediately is kept in an internal buffer
    and flushed from _write_ready() when the loop reports the descriptor as
    writable.
    """

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe* and schedule the protocol's connection_made().

        Raise ValueError if *pipe* is not a FIFO, socket or character
        device.  If *waiter* is given, it is completed after
        connection_made() has been scheduled.
        """
        super().__init__(extra, loop)
        self._extra['pipe'] = pipe
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        # Data accepted by write() but not yet written to the descriptor.
        self._buffer = bytearray()
        # Counts failed writes and write() calls made while closing/lost.
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        mode = os.fstat(self._fileno).st_mode
        is_char = stat.S_ISCHR(mode)
        is_fifo = stat.S_ISFIFO(mode)
        is_socket = stat.S_ISSOCK(mode)
        if not (is_char or is_fifo or is_socket):
            # Drop the references so __del__ does not report the rejected
            # pipe as an unclosed transport.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")

        _set_nonblocking(self._fileno)
        self._loop.call_soon(self._protocol.connection_made, self)

        # On AIX, the reader trick (to be notified when the read end of the
        # socket is closed) only works for sockets. On other platforms it
        # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
        if is_socket or (is_fifo and not sys.platform.startswith("aix")):
            # only start reading when connection_made() has been called
            self._loop.call_soon(self._loop._add_reader,
                                 self._fileno, self._read_ready)

        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append('fd=%s' % self._fileno)
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is not None and selector is not None:
            polling = selector_events._test_selector_event(
                          selector,
                          self._fileno, selectors.EVENT_WRITE)
            if polling:
                info.append('polling')
            else:
                info.append('idle')

            bufsize = self.get_write_buffer_size()
            info.append('bufsize=%s' % bufsize)
        elif self._pipe is not None:
            info.append('open')
        else:
            info.append('closed')
        return '<%s>' % ' '.join(info)

    def get_write_buffer_size(self):
        """Return the number of bytes waiting in the write buffer."""
        return len(self._buffer)

    def _read_ready(self):
        """Handle a read event on the write end of the pipe.

        Registered as a reader in __init__ purely to detect the peer closing
        its end; pending data makes this a BrokenPipeError.
        """
        # Pipe was closed by peer.
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        if self._buffer:
            self._close(BrokenPipeError())
        else:
            self._close()

    def write(self, data):
        """Write *data* to the pipe, buffering whatever cannot be sent now.

        *data* must be bytes, bytearray or memoryview.  Empty data is
        ignored; data written after the connection is lost or closing is
        dropped (with a warning once the loss counter reaches
        constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES).
        """
        assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
        if isinstance(data, bytearray):
            data = memoryview(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                n = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                n = 0
            except Exception as exc:
                self._conn_lost += 1
                self._fatal_error(exc, 'Fatal write error on pipe transport')
                return
            if n == len(data):
                return
            elif n > 0:
                # Keep only the part that was not written.
                data = memoryview(data)[n:]
            # The rest is flushed from _write_ready() once writable.
            self._loop._add_writer(self._fileno, self._write_ready)

        self._buffer += data
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Flush as much of the buffer as the descriptor accepts."""
        assert self._buffer, 'Data should not be empty'

        try:
            n = os.write(self._fileno, self._buffer)
        except (BlockingIOError, InterruptedError):
            pass
        except Exception as exc:
            self._buffer.clear()
            self._conn_lost += 1
            # Remove the writer here: _fatal_error() -> _close() will not do
            # it, because _buffer is empty by then.
            self._loop._remove_writer(self._fileno)
            self._fatal_error(exc, 'Fatal write error on pipe transport')
        else:
            if n == len(self._buffer):
                self._buffer.clear()
                self._loop._remove_writer(self._fileno)
                self._maybe_resume_protocol()  # May append to buffer.
                if self._closing:
                    # A pending close was waiting for the buffer to drain.
                    self._loop._remove_reader(self._fileno)
                    self._call_connection_lost(None)
                return
            elif n > 0:
                # Partial write: drop what was sent, keep the remainder.
                del self._buffer[:n]

    def can_write_eof(self):
        return True

    def write_eof(self):
        """Mark the transport as closing once buffered data is flushed.

        With an empty buffer, connection_lost() is scheduled immediately;
        otherwise _write_ready() finishes the close after the last write.
        """
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if not self._buffer:
            self._loop._remove_reader(self._fileno)
            self._loop.call_soon(self._call_connection_lost, None)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        """Close the transport after flushing any buffered data."""
        if self._pipe is not None and not self._closing:
            # write_eof() is all that is needed to close the write pipe.
            self.write_eof()

    # On Python 3.3 and older, objects with a destructor part of a reference
    # cycle are never destroyed. It's not more the case on Python 3.4 thanks
    # to the PEP 442.
    if compat.PY34:
        def __del__(self):
            if self._pipe is not None:
                warnings.warn("unclosed transport %r" % self, ResourceWarning,
                              source=self)
                self._pipe.close()

    def abort(self):
        """Close the transport immediately, discarding buffered data."""
        self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and close the transport.

        Exceptions matching base_events._FATAL_ERROR_IGNORE are only logged
        (in debug mode); anything else goes to the loop's exception handler.
        """
        # should be called by exception handler only
        if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc=None):
        """Drop buffered data, stop watching the fd, schedule connection_lost."""
        self._closing = True
        # A writer is only registered while the buffer is non-empty.
        if self._buffer:
            self._loop._remove_writer(self._fileno)
        self._buffer.clear()
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
    660 
    661 
    662 if hasattr(os, 'set_inheritable'):
    663     # Python 3.4 and newer
    664     _set_inheritable = os.set_inheritable
    665 else:
    666     import fcntl
    667 
    668     def _set_inheritable(fd, inheritable):
    669         cloexec_flag = getattr(fcntl, 'FD_CLOEXEC', 1)
    670 
    671         old = fcntl.fcntl(fd, fcntl.F_GETFD)
    672         if not inheritable:
    673             fcntl.fcntl(fd, fcntl.F_SETFD, old | cloexec_flag)
    674         else:
    675             fcntl.fcntl(fd, fcntl.F_SETFD, old & ~cloexec_flag)
    676 
    677 
    678 class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    679 
    680     def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
    681         stdin_w = None
    682         if stdin == subprocess.PIPE:
    683             # Use a socket pair for stdin, since not all platforms
    684             # support selecting read events on the write end of a
    685             # socket (which we use in order to detect closing of the
    686             # other end).  Notably this is needed on AIX, and works
    687             # just fine on other platforms.
    688             stdin, stdin_w = self._loop._socketpair()
    689 
    690             # Mark the write end of the stdin pipe as non-inheritable,
    691             # needed by close_fds=False on Python 3.3 and older
    692             # (Python 3.4 implements the PEP 446, socketpair returns
    693             # non-inheritable sockets)
    694             _set_inheritable(stdin_w.fileno(), False)
    695         self._proc = subprocess.Popen(
    696             args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
    697             universal_newlines=False, bufsize=bufsize, **kwargs)
    698         if stdin_w is not None:
    699             stdin.close()
    700             self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
    701 
    702 
    703 class AbstractChildWatcher:
    704     """Abstract base class for monitoring child processes.
    705 
    706     Objects derived from this class monitor a collection of subprocesses and
    707     report their termination or interruption by a signal.
    708 
    709     New callbacks are registered with .add_child_handler(). Starting a new
    710     process must be done within a 'with' block to allow the watcher to suspend
    711     its activity until the new process if fully registered (this is needed to
    712     prevent a race condition in some implementations).
    713 
    714     Example:
    715         with watcher:
    716             proc = subprocess.Popen("sleep 1")
    717             watcher.add_child_handler(proc.pid, callback)
    718 
    719     Notes:
    720         Implementations of this class must be thread-safe.
    721 
    722         Since child watcher objects may catch the SIGCHLD signal and call
    723         waitpid(-1), there should be only one active object per process.
    724     """
    725 
    726     def add_child_handler(self, pid, callback, *args):
    727         """Register a new child handler.
    728 
    729         Arrange for callback(pid, returncode, *args) to be called when
    730         process 'pid' terminates. Specifying another callback for the same
    731         process replaces the previous handler.
    732 
    733         Note: callback() must be thread-safe.
    734         """
    735         raise NotImplementedError()
    736 
    737     def remove_child_handler(self, pid):
    738         """Removes the handler for process 'pid'.
    739 
    740         The function returns True if the handler was successfully removed,
    741         False if there was nothing to remove."""
    742 
    743         raise NotImplementedError()
    744 
    745     def attach_loop(self, loop):
    746         """Attach the watcher to an event loop.
    747 
    748         If the watcher was previously attached to an event loop, then it is
    749         first detached before attaching to the new loop.
    750 
    751         Note: loop may be None.
    752         """
    753         raise NotImplementedError()
    754 
    755     def close(self):
    756         """Close the watcher.
    757 
    758         This must be called to make sure that any underlying resource is freed.
    759         """
    760         raise NotImplementedError()
    761 
    762     def __enter__(self):
    763         """Enter the watcher's context and allow starting new processes
    764 
    765         This function must return self"""
    766         raise NotImplementedError()
    767 
    768     def __exit__(self, a, b, c):
    769         """Exit the watcher's context"""
    770         raise NotImplementedError()
    771 
    772 
    773 class BaseChildWatcher(AbstractChildWatcher):
    774 
    775     def __init__(self):
    776         self._loop = None
    777         self._callbacks = {}
    778 
    779     def close(self):
    780         self.attach_loop(None)
    781 
    782     def _do_waitpid(self, expected_pid):
    783         raise NotImplementedError()
    784 
    785     def _do_waitpid_all(self):
    786         raise NotImplementedError()
    787 
    788     def attach_loop(self, loop):
    789         assert loop is None or isinstance(loop, events.AbstractEventLoop)
    790 
    791         if self._loop is not None and loop is None and self._callbacks:
    792             warnings.warn(
    793                 'A loop is being detached '
    794                 'from a child watcher with pending handlers',
    795                 RuntimeWarning)
    796 
    797         if self._loop is not None:
    798             self._loop.remove_signal_handler(signal.SIGCHLD)
    799 
    800         self._loop = loop
    801         if loop is not None:
    802             loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
    803 
    804             # Prevent a race condition in case a child terminated
    805             # during the switch.
    806             self._do_waitpid_all()
    807 
    808     def _sig_chld(self):
    809         try:
    810             self._do_waitpid_all()
    811         except Exception as exc:
    812             # self._loop should always be available here
    813             # as '_sig_chld' is added as a signal handler
    814             # in 'attach_loop'
    815             self._loop.call_exception_handler({
    816                 'message': 'Unknown exception in SIGCHLD handler',
    817                 'exception': exc,
    818             })
    819 
    820     def _compute_returncode(self, status):
    821         if os.WIFSIGNALED(status):
    822             # The child process died because of a signal.
    823             return -os.WTERMSIG(status)
    824         elif os.WIFEXITED(status):
    825             # The child process exited (e.g sys.exit()).
    826             return os.WEXITSTATUS(status)
    827         else:
    828             # The child exited, but we don't understand its status.
    829             # This shouldn't happen, but if it does, let's just
    830             # return that status; perhaps that helps debug it.
    831             return status
    832 
    833 
    834 class SafeChildWatcher(BaseChildWatcher):
    835     """'Safe' child watcher implementation.
    836 
    837     This implementation avoids disrupting other code spawning processes by
    838     polling explicitly each process in the SIGCHLD handler instead of calling
    839     os.waitpid(-1).
    840 
    841     This is a safe solution but it has a significant overhead when handling a
    842     big number of children (O(n) each time SIGCHLD is raised)
    843     """
    844 
    845     def close(self):
    846         self._callbacks.clear()
    847         super().close()
    848 
    849     def __enter__(self):
    850         return self
    851 
    852     def __exit__(self, a, b, c):
    853         pass
    854 
    855     def add_child_handler(self, pid, callback, *args):
    856         if self._loop is None:
    857             raise RuntimeError(
    858                 "Cannot add child handler, "
    859                 "the child watcher does not have a loop attached")
    860 
    861         self._callbacks[pid] = (callback, args)
    862 
    863         # Prevent a race condition in case the child is already terminated.
    864         self._do_waitpid(pid)
    865 
    866     def remove_child_handler(self, pid):
    867         try:
    868             del self._callbacks[pid]
    869             return True
    870         except KeyError:
    871             return False
    872 
    873     def _do_waitpid_all(self):
    874 
    875         for pid in list(self._callbacks):
    876             self._do_waitpid(pid)
    877 
    878     def _do_waitpid(self, expected_pid):
    879         assert expected_pid > 0
    880 
    881         try:
    882             pid, status = os.waitpid(expected_pid, os.WNOHANG)
    883         except ChildProcessError:
    884             # The child process is already reaped
    885             # (may happen if waitpid() is called elsewhere).
    886             pid = expected_pid
    887             returncode = 255
    888             logger.warning(
    889                 "Unknown child process pid %d, will report returncode 255",
    890                 pid)
    891         else:
    892             if pid == 0:
    893                 # The child process is still alive.
    894                 return
    895 
    896             returncode = self._compute_returncode(status)
    897             if self._loop.get_debug():
    898                 logger.debug('process %s exited with returncode %s',
    899                              expected_pid, returncode)
    900 
    901         try:
    902             callback, args = self._callbacks.pop(pid)
    903         except KeyError:  # pragma: no cover
    904             # May happen if .remove_child_handler() is called
    905             # after os.waitpid() returns.
    906             if self._loop.get_debug():
    907                 logger.warning("Child watcher got an unexpected pid: %r",
    908                                pid, exc_info=True)
    909         else:
    910             callback(pid, returncode, *args)
    911 
    912 
    913 class FastChildWatcher(BaseChildWatcher):
    914     """'Fast' child watcher implementation.
    915 
    916     This implementation reaps every terminated processes by calling
    917     os.waitpid(-1) directly, possibly breaking other code spawning processes
    918     and waiting for their termination.
    919 
    920     There is no noticeable overhead when handling a big number of children
    921     (O(1) each time a child terminates).
    922     """
    923     def __init__(self):
    924         super().__init__()
    925         self._lock = threading.Lock()
    926         self._zombies = {}
    927         self._forks = 0
    928 
    929     def close(self):
    930         self._callbacks.clear()
    931         self._zombies.clear()
    932         super().close()
    933 
    934     def __enter__(self):
    935         with self._lock:
    936             self._forks += 1
    937 
    938             return self
    939 
    940     def __exit__(self, a, b, c):
    941         with self._lock:
    942             self._forks -= 1
    943 
    944             if self._forks or not self._zombies:
    945                 return
    946 
    947             collateral_victims = str(self._zombies)
    948             self._zombies.clear()
    949 
    950         logger.warning(
    951             "Caught subprocesses termination from unknown pids: %s",
    952             collateral_victims)
    953 
    954     def add_child_handler(self, pid, callback, *args):
    955         assert self._forks, "Must use the context manager"
    956 
    957         if self._loop is None:
    958             raise RuntimeError(
    959                 "Cannot add child handler, "
    960                 "the child watcher does not have a loop attached")
    961 
    962         with self._lock:
    963             try:
    964                 returncode = self._zombies.pop(pid)
    965             except KeyError:
    966                 # The child is running.
    967                 self._callbacks[pid] = callback, args
    968                 return
    969 
    970         # The child is dead already. We can fire the callback.
    971         callback(pid, returncode, *args)
    972 
    973     def remove_child_handler(self, pid):
    974         try:
    975             del self._callbacks[pid]
    976             return True
    977         except KeyError:
    978             return False
    979 
    980     def _do_waitpid_all(self):
    981         # Because of signal coalescing, we must keep calling waitpid() as
    982         # long as we're able to reap a child.
    983         while True:
    984             try:
    985                 pid, status = os.waitpid(-1, os.WNOHANG)
    986             except ChildProcessError:
    987                 # No more child processes exist.
    988                 return
    989             else:
    990                 if pid == 0:
    991                     # A child process is still alive.
    992                     return
    993 
    994                 returncode = self._compute_returncode(status)
    995 
    996             with self._lock:
    997                 try:
    998                     callback, args = self._callbacks.pop(pid)
    999                 except KeyError:
   1000                     # unknown child
   1001                     if self._forks:
   1002                         # It may not be registered yet.
   1003                         self._zombies[pid] = returncode
   1004                         if self._loop.get_debug():
   1005                             logger.debug('unknown process %s exited '
   1006                                          'with returncode %s',
   1007                                          pid, returncode)
   1008                         continue
   1009                     callback = None
   1010                 else:
   1011                     if self._loop.get_debug():
   1012                         logger.debug('process %s exited with returncode %s',
   1013                                      pid, returncode)
   1014 
   1015             if callback is None:
   1016                 logger.warning(
   1017                     "Caught subprocess termination from unknown pid: "
   1018                     "%d -> %d", pid, returncode)
   1019             else:
   1020                 callback(pid, returncode, *args)
   1021 
   1022 
   1023 class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
   1024     """UNIX event loop policy with a watcher for child processes."""
   1025     _loop_factory = _UnixSelectorEventLoop
   1026 
   1027     def __init__(self):
   1028         super().__init__()
   1029         self._watcher = None
   1030 
   1031     def _init_watcher(self):
   1032         with events._lock:
   1033             if self._watcher is None:  # pragma: no branch
   1034                 self._watcher = SafeChildWatcher()
   1035                 if isinstance(threading.current_thread(),
   1036                               threading._MainThread):
   1037                     self._watcher.attach_loop(self._local._loop)
   1038 
   1039     def set_event_loop(self, loop):
   1040         """Set the event loop.
   1041 
   1042         As a side effect, if a child watcher was set before, then calling
   1043         .set_event_loop() from the main thread will call .attach_loop(loop) on
   1044         the child watcher.
   1045         """
   1046 
   1047         super().set_event_loop(loop)
   1048 
   1049         if self._watcher is not None and \
   1050             isinstance(threading.current_thread(), threading._MainThread):
   1051             self._watcher.attach_loop(loop)
   1052 
   1053     def get_child_watcher(self):
   1054         """Get the watcher for child processes.
   1055 
   1056         If not yet set, a SafeChildWatcher object is automatically created.
   1057         """
   1058         if self._watcher is None:
   1059             self._init_watcher()
   1060 
   1061         return self._watcher
   1062 
   1063     def set_child_watcher(self, watcher):
   1064         """Set the watcher for child processes."""
   1065 
   1066         assert watcher is None or isinstance(watcher, AbstractChildWatcher)
   1067 
   1068         if self._watcher is not None:
   1069             self._watcher.close()
   1070 
   1071         self._watcher = watcher
   1072 
# Public aliases: the Unix implementations are exported under the generic
# names listed in __all__.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
   1075