# Home | History | Annotate | Download | only in asyncio  (code-browser navigation text; not part of the module)
      1 """Selector event loop for Unix with signal handling."""
      2 
      3 import errno
      4 import io
      5 import os
      6 import selectors
      7 import signal
      8 import socket
      9 import stat
     10 import subprocess
     11 import sys
     12 import threading
     13 import warnings
     14 
     15 
     16 from . import base_events
     17 from . import base_subprocess
     18 from . import constants
     19 from . import coroutines
     20 from . import events
     21 from . import futures
     22 from . import selector_events
     23 from . import tasks
     24 from . import transports
     25 from .log import logger
     26 
     27 
     28 __all__ = (
     29     'SelectorEventLoop',
     30     'AbstractChildWatcher', 'SafeChildWatcher',
     31     'FastChildWatcher', 'DefaultEventLoopPolicy',
     32 )
     33 
     34 
     35 if sys.platform == 'win32':  # pragma: no cover
     36     raise ImportError('Signals are not really supported on Windows')
     37 
     38 
     39 def _sighandler_noop(signum, frame):
     40     """Dummy signal handler."""
     41     pass
     42 
     43 
     44 class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
     45     """Unix event loop.
     46 
     47     Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
     48     """
     49 
     50     def __init__(self, selector=None):
     51         super().__init__(selector)
     52         self._signal_handlers = {}
     53 
     54     def close(self):
     55         super().close()
     56         if not sys.is_finalizing():
     57             for sig in list(self._signal_handlers):
     58                 self.remove_signal_handler(sig)
     59         else:
     60             if self._signal_handlers:
     61                 warnings.warn(f"Closing the loop {self!r} "
     62                               f"on interpreter shutdown "
     63                               f"stage, skipping signal handlers removal",
     64                               ResourceWarning,
     65                               source=self)
     66                 self._signal_handlers.clear()
     67 
     68     def _process_self_data(self, data):
     69         for signum in data:
     70             if not signum:
     71                 # ignore null bytes written by _write_to_self()
     72                 continue
     73             self._handle_signal(signum)
     74 
     75     def add_signal_handler(self, sig, callback, *args):
     76         """Add a handler for a signal.  UNIX only.
     77 
     78         Raise ValueError if the signal number is invalid or uncatchable.
     79         Raise RuntimeError if there is a problem setting up the handler.
     80         """
     81         if (coroutines.iscoroutine(callback) or
     82                 coroutines.iscoroutinefunction(callback)):
     83             raise TypeError("coroutines cannot be used "
     84                             "with add_signal_handler()")
     85         self._check_signal(sig)
     86         self._check_closed()
     87         try:
     88             # set_wakeup_fd() raises ValueError if this is not the
     89             # main thread.  By calling it early we ensure that an
     90             # event loop running in another thread cannot add a signal
     91             # handler.
     92             signal.set_wakeup_fd(self._csock.fileno())
     93         except (ValueError, OSError) as exc:
     94             raise RuntimeError(str(exc))
     95 
     96         handle = events.Handle(callback, args, self, None)
     97         self._signal_handlers[sig] = handle
     98 
     99         try:
    100             # Register a dummy signal handler to ask Python to write the signal
    101             # number in the wakup file descriptor. _process_self_data() will
    102             # read signal numbers from this file descriptor to handle signals.
    103             signal.signal(sig, _sighandler_noop)
    104 
    105             # Set SA_RESTART to limit EINTR occurrences.
    106             signal.siginterrupt(sig, False)
    107         except OSError as exc:
    108             del self._signal_handlers[sig]
    109             if not self._signal_handlers:
    110                 try:
    111                     signal.set_wakeup_fd(-1)
    112                 except (ValueError, OSError) as nexc:
    113                     logger.info('set_wakeup_fd(-1) failed: %s', nexc)
    114 
    115             if exc.errno == errno.EINVAL:
    116                 raise RuntimeError(f'sig {sig} cannot be caught')
    117             else:
    118                 raise
    119 
    120     def _handle_signal(self, sig):
    121         """Internal helper that is the actual signal handler."""
    122         handle = self._signal_handlers.get(sig)
    123         if handle is None:
    124             return  # Assume it's some race condition.
    125         if handle._cancelled:
    126             self.remove_signal_handler(sig)  # Remove it properly.
    127         else:
    128             self._add_callback_signalsafe(handle)
    129 
    130     def remove_signal_handler(self, sig):
    131         """Remove a handler for a signal.  UNIX only.
    132 
    133         Return True if a signal handler was removed, False if not.
    134         """
    135         self._check_signal(sig)
    136         try:
    137             del self._signal_handlers[sig]
    138         except KeyError:
    139             return False
    140 
    141         if sig == signal.SIGINT:
    142             handler = signal.default_int_handler
    143         else:
    144             handler = signal.SIG_DFL
    145 
    146         try:
    147             signal.signal(sig, handler)
    148         except OSError as exc:
    149             if exc.errno == errno.EINVAL:
    150                 raise RuntimeError(f'sig {sig} cannot be caught')
    151             else:
    152                 raise
    153 
    154         if not self._signal_handlers:
    155             try:
    156                 signal.set_wakeup_fd(-1)
    157             except (ValueError, OSError) as exc:
    158                 logger.info('set_wakeup_fd(-1) failed: %s', exc)
    159 
    160         return True
    161 
    162     def _check_signal(self, sig):
    163         """Internal helper to validate a signal.
    164 
    165         Raise ValueError if the signal number is invalid or uncatchable.
    166         Raise RuntimeError if there is a problem setting up the handler.
    167         """
    168         if not isinstance(sig, int):
    169             raise TypeError(f'sig must be an int, not {sig!r}')
    170 
    171         if not (1 <= sig < signal.NSIG):
    172             raise ValueError(f'sig {sig} out of range(1, {signal.NSIG})')
    173 
    174     def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
    175                                   extra=None):
    176         return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
    177 
    178     def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
    179                                    extra=None):
    180         return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
    181 
    182     async def _make_subprocess_transport(self, protocol, args, shell,
    183                                          stdin, stdout, stderr, bufsize,
    184                                          extra=None, **kwargs):
    185         with events.get_child_watcher() as watcher:
    186             waiter = self.create_future()
    187             transp = _UnixSubprocessTransport(self, protocol, args, shell,
    188                                               stdin, stdout, stderr, bufsize,
    189                                               waiter=waiter, extra=extra,
    190                                               **kwargs)
    191 
    192             watcher.add_child_handler(transp.get_pid(),
    193                                       self._child_watcher_callback, transp)
    194             try:
    195                 await waiter
    196             except Exception:
    197                 transp.close()
    198                 await transp._wait()
    199                 raise
    200 
    201         return transp
    202 
    203     def _child_watcher_callback(self, pid, returncode, transp):
    204         self.call_soon_threadsafe(transp._process_exited, returncode)
    205 
    206     async def create_unix_connection(
    207             self, protocol_factory, path=None, *,
    208             ssl=None, sock=None,
    209             server_hostname=None,
    210             ssl_handshake_timeout=None):
    211         assert server_hostname is None or isinstance(server_hostname, str)
    212         if ssl:
    213             if server_hostname is None:
    214                 raise ValueError(
    215                     'you have to pass server_hostname when using ssl')
    216         else:
    217             if server_hostname is not None:
    218                 raise ValueError('server_hostname is only meaningful with ssl')
    219             if ssl_handshake_timeout is not None:
    220                 raise ValueError(
    221                     'ssl_handshake_timeout is only meaningful with ssl')
    222 
    223         if path is not None:
    224             if sock is not None:
    225                 raise ValueError(
    226                     'path and sock can not be specified at the same time')
    227 
    228             path = os.fspath(path)
    229             sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
    230             try:
    231                 sock.setblocking(False)
    232                 await self.sock_connect(sock, path)
    233             except:
    234                 sock.close()
    235                 raise
    236 
    237         else:
    238             if sock is None:
    239                 raise ValueError('no path and sock were specified')
    240             if (sock.family != socket.AF_UNIX or
    241                     sock.type != socket.SOCK_STREAM):
    242                 raise ValueError(
    243                     f'A UNIX Domain Stream Socket was expected, got {sock!r}')
    244             sock.setblocking(False)
    245 
    246         transport, protocol = await self._create_connection_transport(
    247             sock, protocol_factory, ssl, server_hostname,
    248             ssl_handshake_timeout=ssl_handshake_timeout)
    249         return transport, protocol
    250 
    251     async def create_unix_server(
    252             self, protocol_factory, path=None, *,
    253             sock=None, backlog=100, ssl=None,
    254             ssl_handshake_timeout=None,
    255             start_serving=True):
    256         if isinstance(ssl, bool):
    257             raise TypeError('ssl argument must be an SSLContext or None')
    258 
    259         if ssl_handshake_timeout is not None and not ssl:
    260             raise ValueError(
    261                 'ssl_handshake_timeout is only meaningful with ssl')
    262 
    263         if path is not None:
    264             if sock is not None:
    265                 raise ValueError(
    266                     'path and sock can not be specified at the same time')
    267 
    268             path = os.fspath(path)
    269             sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    270 
    271             # Check for abstract socket. `str` and `bytes` paths are supported.
    272             if path[0] not in (0, '\x00'):
    273                 try:
    274                     if stat.S_ISSOCK(os.stat(path).st_mode):
    275                         os.remove(path)
    276                 except FileNotFoundError:
    277                     pass
    278                 except OSError as err:
    279                     # Directory may have permissions only to create socket.
    280                     logger.error('Unable to check or remove stale UNIX socket '
    281                                  '%r: %r', path, err)
    282 
    283             try:
    284                 sock.bind(path)
    285             except OSError as exc:
    286                 sock.close()
    287                 if exc.errno == errno.EADDRINUSE:
    288                     # Let's improve the error message by adding
    289                     # with what exact address it occurs.
    290                     msg = f'Address {path!r} is already in use'
    291                     raise OSError(errno.EADDRINUSE, msg) from None
    292                 else:
    293                     raise
    294             except:
    295                 sock.close()
    296                 raise
    297         else:
    298             if sock is None:
    299                 raise ValueError(
    300                     'path was not specified, and no sock specified')
    301 
    302             if (sock.family != socket.AF_UNIX or
    303                     sock.type != socket.SOCK_STREAM):
    304                 raise ValueError(
    305                     f'A UNIX Domain Stream Socket was expected, got {sock!r}')
    306 
    307         sock.setblocking(False)
    308         server = base_events.Server(self, [sock], protocol_factory,
    309                                     ssl, backlog, ssl_handshake_timeout)
    310         if start_serving:
    311             server._start_serving()
    312             # Skip one loop iteration so that all 'loop.add_reader'
    313             # go through.
    314             await tasks.sleep(0, loop=self)
    315 
    316         return server
    317 
    318     async def _sock_sendfile_native(self, sock, file, offset, count):
    319         try:
    320             os.sendfile
    321         except AttributeError as exc:
    322             raise events.SendfileNotAvailableError(
    323                 "os.sendfile() is not available")
    324         try:
    325             fileno = file.fileno()
    326         except (AttributeError, io.UnsupportedOperation) as err:
    327             raise events.SendfileNotAvailableError("not a regular file")
    328         try:
    329             fsize = os.fstat(fileno).st_size
    330         except OSError as err:
    331             raise events.SendfileNotAvailableError("not a regular file")
    332         blocksize = count if count else fsize
    333         if not blocksize:
    334             return 0  # empty file
    335 
    336         fut = self.create_future()
    337         self._sock_sendfile_native_impl(fut, None, sock, fileno,
    338                                         offset, count, blocksize, 0)
    339         return await fut
    340 
    341     def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
    342                                    offset, count, blocksize, total_sent):
    343         fd = sock.fileno()
    344         if registered_fd is not None:
    345             # Remove the callback early.  It should be rare that the
    346             # selector says the fd is ready but the call still returns
    347             # EAGAIN, and I am willing to take a hit in that case in
    348             # order to simplify the common case.
    349             self.remove_writer(registered_fd)
    350         if fut.cancelled():
    351             self._sock_sendfile_update_filepos(fileno, offset, total_sent)
    352             return
    353         if count:
    354             blocksize = count - total_sent
    355             if blocksize <= 0:
    356                 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
    357                 fut.set_result(total_sent)
    358                 return
    359 
    360         try:
    361             sent = os.sendfile(fd, fileno, offset, blocksize)
    362         except (BlockingIOError, InterruptedError):
    363             if registered_fd is None:
    364                 self._sock_add_cancellation_callback(fut, sock)
    365             self.add_writer(fd, self._sock_sendfile_native_impl, fut,
    366                             fd, sock, fileno,
    367                             offset, count, blocksize, total_sent)
    368         except OSError as exc:
    369             if (registered_fd is not None and
    370                     exc.errno == errno.ENOTCONN and
    371                     type(exc) is not ConnectionError):
    372                 # If we have an ENOTCONN and this isn't a first call to
    373                 # sendfile(), i.e. the connection was closed in the middle
    374                 # of the operation, normalize the error to ConnectionError
    375                 # to make it consistent across all Posix systems.
    376                 new_exc = ConnectionError(
    377                     "socket is not connected", errno.ENOTCONN)
    378                 new_exc.__cause__ = exc
    379                 exc = new_exc
    380             if total_sent == 0:
    381                 # We can get here for different reasons, the main
    382                 # one being 'file' is not a regular mmap(2)-like
    383                 # file, in which case we'll fall back on using
    384                 # plain send().
    385                 err = events.SendfileNotAvailableError(
    386                     "os.sendfile call failed")
    387                 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
    388                 fut.set_exception(err)
    389             else:
    390                 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
    391                 fut.set_exception(exc)
    392         except Exception as exc:
    393             self._sock_sendfile_update_filepos(fileno, offset, total_sent)
    394             fut.set_exception(exc)
    395         else:
    396             if sent == 0:
    397                 # EOF
    398                 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
    399                 fut.set_result(total_sent)
    400             else:
    401                 offset += sent
    402                 total_sent += sent
    403                 if registered_fd is None:
    404                     self._sock_add_cancellation_callback(fut, sock)
    405                 self.add_writer(fd, self._sock_sendfile_native_impl, fut,
    406                                 fd, sock, fileno,
    407                                 offset, count, blocksize, total_sent)
    408 
    409     def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
    410         if total_sent > 0:
    411             os.lseek(fileno, offset, os.SEEK_SET)
    412 
    413     def _sock_add_cancellation_callback(self, fut, sock):
    414         def cb(fut):
    415             if fut.cancelled():
    416                 fd = sock.fileno()
    417                 if fd != -1:
    418                     self.remove_writer(fd)
    419         fut.add_done_callback(cb)
    420 
    421 
    422 class _UnixReadPipeTransport(transports.ReadTransport):
    423 
    424     max_size = 256 * 1024  # max bytes we read in one event loop iteration
    425 
    426     def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
    427         super().__init__(extra)
    428         self._extra['pipe'] = pipe
    429         self._loop = loop
    430         self._pipe = pipe
    431         self._fileno = pipe.fileno()
    432         self._protocol = protocol
    433         self._closing = False
    434 
    435         mode = os.fstat(self._fileno).st_mode
    436         if not (stat.S_ISFIFO(mode) or
    437                 stat.S_ISSOCK(mode) or
    438                 stat.S_ISCHR(mode)):
    439             self._pipe = None
    440             self._fileno = None
    441             self._protocol = None
    442             raise ValueError("Pipe transport is for pipes/sockets only.")
    443 
    444         os.set_blocking(self._fileno, False)
    445 
    446         self._loop.call_soon(self._protocol.connection_made, self)
    447         # only start reading when connection_made() has been called
    448         self._loop.call_soon(self._loop._add_reader,
    449                              self._fileno, self._read_ready)
    450         if waiter is not None:
    451             # only wake up the waiter when connection_made() has been called
    452             self._loop.call_soon(futures._set_result_unless_cancelled,
    453                                  waiter, None)
    454 
    455     def __repr__(self):
    456         info = [self.__class__.__name__]
    457         if self._pipe is None:
    458             info.append('closed')
    459         elif self._closing:
    460             info.append('closing')
    461         info.append(f'fd={self._fileno}')
    462         selector = getattr(self._loop, '_selector', None)
    463         if self._pipe is not None and selector is not None:
    464             polling = selector_events._test_selector_event(
    465                 selector, self._fileno, selectors.EVENT_READ)
    466             if polling:
    467                 info.append('polling')
    468             else:
    469                 info.append('idle')
    470         elif self._pipe is not None:
    471             info.append('open')
    472         else:
    473             info.append('closed')
    474         return '<{}>'.format(' '.join(info))
    475 
    476     def _read_ready(self):
    477         try:
    478             data = os.read(self._fileno, self.max_size)
    479         except (BlockingIOError, InterruptedError):
    480             pass
    481         except OSError as exc:
    482             self._fatal_error(exc, 'Fatal read error on pipe transport')
    483         else:
    484             if data:
    485                 self._protocol.data_received(data)
    486             else:
    487                 if self._loop.get_debug():
    488                     logger.info("%r was closed by peer", self)
    489                 self._closing = True
    490                 self._loop._remove_reader(self._fileno)
    491                 self._loop.call_soon(self._protocol.eof_received)
    492                 self._loop.call_soon(self._call_connection_lost, None)
    493 
    494     def pause_reading(self):
    495         self._loop._remove_reader(self._fileno)
    496 
    497     def resume_reading(self):
    498         self._loop._add_reader(self._fileno, self._read_ready)
    499 
    500     def set_protocol(self, protocol):
    501         self._protocol = protocol
    502 
    503     def get_protocol(self):
    504         return self._protocol
    505 
    506     def is_closing(self):
    507         return self._closing
    508 
    509     def close(self):
    510         if not self._closing:
    511             self._close(None)
    512 
    513     def __del__(self):
    514         if self._pipe is not None:
    515             warnings.warn(f"unclosed transport {self!r}", ResourceWarning,
    516                           source=self)
    517             self._pipe.close()
    518 
    519     def _fatal_error(self, exc, message='Fatal error on pipe transport'):
    520         # should be called by exception handler only
    521         if (isinstance(exc, OSError) and exc.errno == errno.EIO):
    522             if self._loop.get_debug():
    523                 logger.debug("%r: %s", self, message, exc_info=True)
    524         else:
    525             self._loop.call_exception_handler({
    526                 'message': message,
    527                 'exception': exc,
    528                 'transport': self,
    529                 'protocol': self._protocol,
    530             })
    531         self._close(exc)
    532 
    533     def _close(self, exc):
    534         self._closing = True
    535         self._loop._remove_reader(self._fileno)
    536         self._loop.call_soon(self._call_connection_lost, exc)
    537 
    538     def _call_connection_lost(self, exc):
    539         try:
    540             self._protocol.connection_lost(exc)
    541         finally:
    542             self._pipe.close()
    543             self._pipe = None
    544             self._protocol = None
    545             self._loop = None
    546 
    547 
    548 class _UnixWritePipeTransport(transports._FlowControlMixin,
    549                               transports.WriteTransport):
    550 
    551     def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
    552         super().__init__(extra, loop)
    553         self._extra['pipe'] = pipe
    554         self._pipe = pipe
    555         self._fileno = pipe.fileno()
    556         self._protocol = protocol
    557         self._buffer = bytearray()
    558         self._conn_lost = 0
    559         self._closing = False  # Set when close() or write_eof() called.
    560 
    561         mode = os.fstat(self._fileno).st_mode
    562         is_char = stat.S_ISCHR(mode)
    563         is_fifo = stat.S_ISFIFO(mode)
    564         is_socket = stat.S_ISSOCK(mode)
    565         if not (is_char or is_fifo or is_socket):
    566             self._pipe = None
    567             self._fileno = None
    568             self._protocol = None
    569             raise ValueError("Pipe transport is only for "
    570                              "pipes, sockets and character devices")
    571 
    572         os.set_blocking(self._fileno, False)
    573         self._loop.call_soon(self._protocol.connection_made, self)
    574 
    575         # On AIX, the reader trick (to be notified when the read end of the
    576         # socket is closed) only works for sockets. On other platforms it
    577         # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
    578         if is_socket or (is_fifo and not sys.platform.startswith("aix")):
    579             # only start reading when connection_made() has been called
    580             self._loop.call_soon(self._loop._add_reader,
    581                                  self._fileno, self._read_ready)
    582 
    583         if waiter is not None:
    584             # only wake up the waiter when connection_made() has been called
    585             self._loop.call_soon(futures._set_result_unless_cancelled,
    586                                  waiter, None)
    587 
    588     def __repr__(self):
    589         info = [self.__class__.__name__]
    590         if self._pipe is None:
    591             info.append('closed')
    592         elif self._closing:
    593             info.append('closing')
    594         info.append(f'fd={self._fileno}')
    595         selector = getattr(self._loop, '_selector', None)
    596         if self._pipe is not None and selector is not None:
    597             polling = selector_events._test_selector_event(
    598                 selector, self._fileno, selectors.EVENT_WRITE)
    599             if polling:
    600                 info.append('polling')
    601             else:
    602                 info.append('idle')
    603 
    604             bufsize = self.get_write_buffer_size()
    605             info.append(f'bufsize={bufsize}')
    606         elif self._pipe is not None:
    607             info.append('open')
    608         else:
    609             info.append('closed')
    610         return '<{}>'.format(' '.join(info))
    611 
    612     def get_write_buffer_size(self):
    613         return len(self._buffer)
    614 
    615     def _read_ready(self):
    616         # Pipe was closed by peer.
    617         if self._loop.get_debug():
    618             logger.info("%r was closed by peer", self)
    619         if self._buffer:
    620             self._close(BrokenPipeError())
    621         else:
    622             self._close()
    623 
    624     def write(self, data):
    625         assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
    626         if isinstance(data, bytearray):
    627             data = memoryview(data)
    628         if not data:
    629             return
    630 
    631         if self._conn_lost or self._closing:
    632             if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
    633                 logger.warning('pipe closed by peer or '
    634                                'os.write(pipe, data) raised exception.')
    635             self._conn_lost += 1
    636             return
    637 
    638         if not self._buffer:
    639             # Attempt to send it right away first.
    640             try:
    641                 n = os.write(self._fileno, data)
    642             except (BlockingIOError, InterruptedError):
    643                 n = 0
    644             except Exception as exc:
    645                 self._conn_lost += 1
    646                 self._fatal_error(exc, 'Fatal write error on pipe transport')
    647                 return
    648             if n == len(data):
    649                 return
    650             elif n > 0:
    651                 data = memoryview(data)[n:]
    652             self._loop._add_writer(self._fileno, self._write_ready)
    653 
    654         self._buffer += data
    655         self._maybe_pause_protocol()
    656 
    657     def _write_ready(self):
    658         assert self._buffer, 'Data should not be empty'
    659 
    660         try:
    661             n = os.write(self._fileno, self._buffer)
    662         except (BlockingIOError, InterruptedError):
    663             pass
    664         except Exception as exc:
    665             self._buffer.clear()
    666             self._conn_lost += 1
    667             # Remove writer here, _fatal_error() doesn't it
    668             # because _buffer is empty.
    669             self._loop._remove_writer(self._fileno)
    670             self._fatal_error(exc, 'Fatal write error on pipe transport')
    671         else:
    672             if n == len(self._buffer):
    673                 self._buffer.clear()
    674                 self._loop._remove_writer(self._fileno)
    675                 self._maybe_resume_protocol()  # May append to buffer.
    676                 if self._closing:
    677                     self._loop._remove_reader(self._fileno)
    678                     self._call_connection_lost(None)
    679                 return
    680             elif n > 0:
    681                 del self._buffer[:n]
    682 
    683     def can_write_eof(self):
    684         return True
    685 
    686     def write_eof(self):
    687         if self._closing:
    688             return
    689         assert self._pipe
    690         self._closing = True
    691         if not self._buffer:
    692             self._loop._remove_reader(self._fileno)
    693             self._loop.call_soon(self._call_connection_lost, None)
    694 
    695     def set_protocol(self, protocol):
    696         self._protocol = protocol
    697 
    698     def get_protocol(self):
    699         return self._protocol
    700 
    701     def is_closing(self):
    702         return self._closing
    703 
    704     def close(self):
    705         if self._pipe is not None and not self._closing:
    706             # write_eof is all what we needed to close the write pipe
    707             self.write_eof()
    708 
    def __del__(self, _warn=warnings.warn):
        """Warn about, and close, a pipe that was never closed explicitly.

        If the transport still holds a pipe when it is finalized, a
        ResourceWarning is emitted and the pipe is closed.

        _warn is an implementation detail: binding warnings.warn as a
        default argument keeps the function reachable even if the module
        globals have already been torn down when the finalizer runs during
        interpreter shutdown.
        """
        if self._pipe is not None:
            _warn(f"unclosed transport {self!r}", ResourceWarning,
                  source=self)
            self._pipe.close()
    714 
    def abort(self):
        """Close the transport right away by calling _close() with no error."""
        self._close(None)
    717 
    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report a fatal error, then close the transport with that error.

        Should be called by an exception handler only.  Exceptions listed in
        base_events._FATAL_ERROR_IGNORE are only logged (and only when the
        loop is in debug mode); every other exception is passed to the
        loop's exception handler.  In both cases the transport is then
        closed with exc.
        """
        if not isinstance(exc, base_events._FATAL_ERROR_IGNORE):
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        elif self._loop.get_debug():
            logger.debug("%r: %s", self, message, exc_info=True)
        self._close(exc)
    731 
    def _close(self, exc=None):
        """Flag the transport as closing, drop buffered data and stop I/O.

        The writer is only removed when data was still buffered; the reader
        is always removed.  _call_connection_lost(exc) is scheduled on the
        loop rather than invoked directly.
        """
        loop = self._loop
        fd = self._fileno
        self._closing = True
        if self._buffer:
            loop._remove_writer(fd)
        self._buffer.clear()
        loop._remove_reader(fd)
        loop.call_soon(self._call_connection_lost, exc)
    739 
    def _call_connection_lost(self, exc):
        """Notify the protocol that the connection is lost and release state.

        The pipe is closed and the pipe, protocol and loop references are
        dropped even if the protocol's connection_lost() raises.
        """
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = self._protocol = self._loop = None
    748 
    749 
    750 class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    751 
    752     def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
    753         stdin_w = None
    754         if stdin == subprocess.PIPE:
    755             # Use a socket pair for stdin, since not all platforms
    756             # support selecting read events on the write end of a
    757             # socket (which we use in order to detect closing of the
    758             # other end).  Notably this is needed on AIX, and works
    759             # just fine on other platforms.
    760             stdin, stdin_w = socket.socketpair()
    761         self._proc = subprocess.Popen(
    762             args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
    763             universal_newlines=False, bufsize=bufsize, **kwargs)
    764         if stdin_w is not None:
    765             stdin.close()
    766             self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
    767 
    768 
    769 class AbstractChildWatcher:
    770     """Abstract base class for monitoring child processes.
    771 
    772     Objects derived from this class monitor a collection of subprocesses and
    773     report their termination or interruption by a signal.
    774 
    775     New callbacks are registered with .add_child_handler(). Starting a new
    776     process must be done within a 'with' block to allow the watcher to suspend
    777     its activity until the new process if fully registered (this is needed to
    778     prevent a race condition in some implementations).
    779 
    780     Example:
    781         with watcher:
    782             proc = subprocess.Popen("sleep 1")
    783             watcher.add_child_handler(proc.pid, callback)
    784 
    785     Notes:
    786         Implementations of this class must be thread-safe.
    787 
    788         Since child watcher objects may catch the SIGCHLD signal and call
    789         waitpid(-1), there should be only one active object per process.
    790     """
    791 
    792     def add_child_handler(self, pid, callback, *args):
    793         """Register a new child handler.
    794 
    795         Arrange for callback(pid, returncode, *args) to be called when
    796         process 'pid' terminates. Specifying another callback for the same
    797         process replaces the previous handler.
    798 
    799         Note: callback() must be thread-safe.
    800         """
    801         raise NotImplementedError()
    802 
    803     def remove_child_handler(self, pid):
    804         """Removes the handler for process 'pid'.
    805 
    806         The function returns True if the handler was successfully removed,
    807         False if there was nothing to remove."""
    808 
    809         raise NotImplementedError()
    810 
    811     def attach_loop(self, loop):
    812         """Attach the watcher to an event loop.
    813 
    814         If the watcher was previously attached to an event loop, then it is
    815         first detached before attaching to the new loop.
    816 
    817         Note: loop may be None.
    818         """
    819         raise NotImplementedError()
    820 
    821     def close(self):
    822         """Close the watcher.
    823 
    824         This must be called to make sure that any underlying resource is freed.
    825         """
    826         raise NotImplementedError()
    827 
    828     def __enter__(self):
    829         """Enter the watcher's context and allow starting new processes
    830 
    831         This function must return self"""
    832         raise NotImplementedError()
    833 
    834     def __exit__(self, a, b, c):
    835         """Exit the watcher's context"""
    836         raise NotImplementedError()
    837 
    838 
    839 class BaseChildWatcher(AbstractChildWatcher):
    840 
    841     def __init__(self):
    842         self._loop = None
    843         self._callbacks = {}
    844 
    845     def close(self):
    846         self.attach_loop(None)
    847 
    848     def _do_waitpid(self, expected_pid):
    849         raise NotImplementedError()
    850 
    851     def _do_waitpid_all(self):
    852         raise NotImplementedError()
    853 
    854     def attach_loop(self, loop):
    855         assert loop is None or isinstance(loop, events.AbstractEventLoop)
    856 
    857         if self._loop is not None and loop is None and self._callbacks:
    858             warnings.warn(
    859                 'A loop is being detached '
    860                 'from a child watcher with pending handlers',
    861                 RuntimeWarning)
    862 
    863         if self._loop is not None:
    864             self._loop.remove_signal_handler(signal.SIGCHLD)
    865 
    866         self._loop = loop
    867         if loop is not None:
    868             loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
    869 
    870             # Prevent a race condition in case a child terminated
    871             # during the switch.
    872             self._do_waitpid_all()
    873 
    874     def _sig_chld(self):
    875         try:
    876             self._do_waitpid_all()
    877         except Exception as exc:
    878             # self._loop should always be available here
    879             # as '_sig_chld' is added as a signal handler
    880             # in 'attach_loop'
    881             self._loop.call_exception_handler({
    882                 'message': 'Unknown exception in SIGCHLD handler',
    883                 'exception': exc,
    884             })
    885 
    886     def _compute_returncode(self, status):
    887         if os.WIFSIGNALED(status):
    888             # The child process died because of a signal.
    889             return -os.WTERMSIG(status)
    890         elif os.WIFEXITED(status):
    891             # The child process exited (e.g sys.exit()).
    892             return os.WEXITSTATUS(status)
    893         else:
    894             # The child exited, but we don't understand its status.
    895             # This shouldn't happen, but if it does, let's just
    896             # return that status; perhaps that helps debug it.
    897             return status
    898 
    899 
    900 class SafeChildWatcher(BaseChildWatcher):
    901     """'Safe' child watcher implementation.
    902 
    903     This implementation avoids disrupting other code spawning processes by
    904     polling explicitly each process in the SIGCHLD handler instead of calling
    905     os.waitpid(-1).
    906 
    907     This is a safe solution but it has a significant overhead when handling a
    908     big number of children (O(n) each time SIGCHLD is raised)
    909     """
    910 
    911     def close(self):
    912         self._callbacks.clear()
    913         super().close()
    914 
    915     def __enter__(self):
    916         return self
    917 
    918     def __exit__(self, a, b, c):
    919         pass
    920 
    921     def add_child_handler(self, pid, callback, *args):
    922         if self._loop is None:
    923             raise RuntimeError(
    924                 "Cannot add child handler, "
    925                 "the child watcher does not have a loop attached")
    926 
    927         self._callbacks[pid] = (callback, args)
    928 
    929         # Prevent a race condition in case the child is already terminated.
    930         self._do_waitpid(pid)
    931 
    932     def remove_child_handler(self, pid):
    933         try:
    934             del self._callbacks[pid]
    935             return True
    936         except KeyError:
    937             return False
    938 
    939     def _do_waitpid_all(self):
    940 
    941         for pid in list(self._callbacks):
    942             self._do_waitpid(pid)
    943 
    944     def _do_waitpid(self, expected_pid):
    945         assert expected_pid > 0
    946 
    947         try:
    948             pid, status = os.waitpid(expected_pid, os.WNOHANG)
    949         except ChildProcessError:
    950             # The child process is already reaped
    951             # (may happen if waitpid() is called elsewhere).
    952             pid = expected_pid
    953             returncode = 255
    954             logger.warning(
    955                 "Unknown child process pid %d, will report returncode 255",
    956                 pid)
    957         else:
    958             if pid == 0:
    959                 # The child process is still alive.
    960                 return
    961 
    962             returncode = self._compute_returncode(status)
    963             if self._loop.get_debug():
    964                 logger.debug('process %s exited with returncode %s',
    965                              expected_pid, returncode)
    966 
    967         try:
    968             callback, args = self._callbacks.pop(pid)
    969         except KeyError:  # pragma: no cover
    970             # May happen if .remove_child_handler() is called
    971             # after os.waitpid() returns.
    972             if self._loop.get_debug():
    973                 logger.warning("Child watcher got an unexpected pid: %r",
    974                                pid, exc_info=True)
    975         else:
    976             callback(pid, returncode, *args)
    977 
    978 
    979 class FastChildWatcher(BaseChildWatcher):
    980     """'Fast' child watcher implementation.
    981 
    982     This implementation reaps every terminated processes by calling
    983     os.waitpid(-1) directly, possibly breaking other code spawning processes
    984     and waiting for their termination.
    985 
    986     There is no noticeable overhead when handling a big number of children
    987     (O(1) each time a child terminates).
    988     """
    989     def __init__(self):
    990         super().__init__()
    991         self._lock = threading.Lock()
    992         self._zombies = {}
    993         self._forks = 0
    994 
    995     def close(self):
    996         self._callbacks.clear()
    997         self._zombies.clear()
    998         super().close()
    999 
   1000     def __enter__(self):
   1001         with self._lock:
   1002             self._forks += 1
   1003 
   1004             return self
   1005 
   1006     def __exit__(self, a, b, c):
   1007         with self._lock:
   1008             self._forks -= 1
   1009 
   1010             if self._forks or not self._zombies:
   1011                 return
   1012 
   1013             collateral_victims = str(self._zombies)
   1014             self._zombies.clear()
   1015 
   1016         logger.warning(
   1017             "Caught subprocesses termination from unknown pids: %s",
   1018             collateral_victims)
   1019 
   1020     def add_child_handler(self, pid, callback, *args):
   1021         assert self._forks, "Must use the context manager"
   1022 
   1023         if self._loop is None:
   1024             raise RuntimeError(
   1025                 "Cannot add child handler, "
   1026                 "the child watcher does not have a loop attached")
   1027 
   1028         with self._lock:
   1029             try:
   1030                 returncode = self._zombies.pop(pid)
   1031             except KeyError:
   1032                 # The child is running.
   1033                 self._callbacks[pid] = callback, args
   1034                 return
   1035 
   1036         # The child is dead already. We can fire the callback.
   1037         callback(pid, returncode, *args)
   1038 
   1039     def remove_child_handler(self, pid):
   1040         try:
   1041             del self._callbacks[pid]
   1042             return True
   1043         except KeyError:
   1044             return False
   1045 
   1046     def _do_waitpid_all(self):
   1047         # Because of signal coalescing, we must keep calling waitpid() as
   1048         # long as we're able to reap a child.
   1049         while True:
   1050             try:
   1051                 pid, status = os.waitpid(-1, os.WNOHANG)
   1052             except ChildProcessError:
   1053                 # No more child processes exist.
   1054                 return
   1055             else:
   1056                 if pid == 0:
   1057                     # A child process is still alive.
   1058                     return
   1059 
   1060                 returncode = self._compute_returncode(status)
   1061 
   1062             with self._lock:
   1063                 try:
   1064                     callback, args = self._callbacks.pop(pid)
   1065                 except KeyError:
   1066                     # unknown child
   1067                     if self._forks:
   1068                         # It may not be registered yet.
   1069                         self._zombies[pid] = returncode
   1070                         if self._loop.get_debug():
   1071                             logger.debug('unknown process %s exited '
   1072                                          'with returncode %s',
   1073                                          pid, returncode)
   1074                         continue
   1075                     callback = None
   1076                 else:
   1077                     if self._loop.get_debug():
   1078                         logger.debug('process %s exited with returncode %s',
   1079                                      pid, returncode)
   1080 
   1081             if callback is None:
   1082                 logger.warning(
   1083                     "Caught subprocess termination from unknown pid: "
   1084                     "%d -> %d", pid, returncode)
   1085             else:
   1086                 callback(pid, returncode, *args)
   1087 
   1088 
   1089 class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
   1090     """UNIX event loop policy with a watcher for child processes."""
   1091     _loop_factory = _UnixSelectorEventLoop
   1092 
   1093     def __init__(self):
   1094         super().__init__()
   1095         self._watcher = None
   1096 
   1097     def _init_watcher(self):
   1098         with events._lock:
   1099             if self._watcher is None:  # pragma: no branch
   1100                 self._watcher = SafeChildWatcher()
   1101                 if isinstance(threading.current_thread(),
   1102                               threading._MainThread):
   1103                     self._watcher.attach_loop(self._local._loop)
   1104 
   1105     def set_event_loop(self, loop):
   1106         """Set the event loop.
   1107 
   1108         As a side effect, if a child watcher was set before, then calling
   1109         .set_event_loop() from the main thread will call .attach_loop(loop) on
   1110         the child watcher.
   1111         """
   1112 
   1113         super().set_event_loop(loop)
   1114 
   1115         if (self._watcher is not None and
   1116                 isinstance(threading.current_thread(), threading._MainThread)):
   1117             self._watcher.attach_loop(loop)
   1118 
   1119     def get_child_watcher(self):
   1120         """Get the watcher for child processes.
   1121 
   1122         If not yet set, a SafeChildWatcher object is automatically created.
   1123         """
   1124         if self._watcher is None:
   1125             self._init_watcher()
   1126 
   1127         return self._watcher
   1128 
   1129     def set_child_watcher(self, watcher):
   1130         """Set the watcher for child processes."""
   1131 
   1132         assert watcher is None or isinstance(watcher, AbstractChildWatcher)
   1133 
   1134         if self._watcher is not None:
   1135             self._watcher.close()
   1136 
   1137         self._watcher = watcher
   1138 
   1139 
   1140 SelectorEventLoop = _UnixSelectorEventLoop
   1141 DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
   1142