Home | History | Annotate | Download | only in asyncio
      1 """Event loop using a proactor and related classes.
      2 
      3 A proactor is a "notify-on-completion" multiplexer.  Currently a
      4 proactor is only implemented on Windows with IOCP.
      5 """
      6 
      7 __all__ = ['BaseProactorEventLoop']
      8 
      9 import socket
     10 import warnings
     11 
     12 from . import base_events
     13 from . import compat
     14 from . import constants
     15 from . import futures
     16 from . import sslproto
     17 from . import transports
     18 from .log import logger
     19 
     20 
     21 class _ProactorBasePipeTransport(transports._FlowControlMixin,
     22                                  transports.BaseTransport):
     23     """Base class for pipe and socket transports."""
     24 
     25     def __init__(self, loop, sock, protocol, waiter=None,
     26                  extra=None, server=None):
     27         super().__init__(extra, loop)
     28         self._set_extra(sock)
     29         self._sock = sock
     30         self._protocol = protocol
     31         self._server = server
     32         self._buffer = None  # None or bytearray.
     33         self._read_fut = None
     34         self._write_fut = None
     35         self._pending_write = 0
     36         self._conn_lost = 0
     37         self._closing = False  # Set when close() called.
     38         self._eof_written = False
     39         if self._server is not None:
     40             self._server._attach()
     41         self._loop.call_soon(self._protocol.connection_made, self)
     42         if waiter is not None:
     43             # only wake up the waiter when connection_made() has been called
     44             self._loop.call_soon(futures._set_result_unless_cancelled,
     45                                  waiter, None)
     46 
     47     def __repr__(self):
     48         info = [self.__class__.__name__]
     49         if self._sock is None:
     50             info.append('closed')
     51         elif self._closing:
     52             info.append('closing')
     53         if self._sock is not None:
     54             info.append('fd=%s' % self._sock.fileno())
     55         if self._read_fut is not None:
     56             info.append('read=%s' % self._read_fut)
     57         if self._write_fut is not None:
     58             info.append("write=%r" % self._write_fut)
     59         if self._buffer:
     60             bufsize = len(self._buffer)
     61             info.append('write_bufsize=%s' % bufsize)
     62         if self._eof_written:
     63             info.append('EOF written')
     64         return '<%s>' % ' '.join(info)
     65 
     66     def _set_extra(self, sock):
     67         self._extra['pipe'] = sock
     68 
     69     def set_protocol(self, protocol):
     70         self._protocol = protocol
     71 
     72     def get_protocol(self):
     73         return self._protocol
     74 
     75     def is_closing(self):
     76         return self._closing
     77 
     78     def close(self):
     79         if self._closing:
     80             return
     81         self._closing = True
     82         self._conn_lost += 1
     83         if not self._buffer and self._write_fut is None:
     84             self._loop.call_soon(self._call_connection_lost, None)
     85         if self._read_fut is not None:
     86             self._read_fut.cancel()
     87             self._read_fut = None
     88 
     89     # On Python 3.3 and older, objects with a destructor part of a reference
     90     # cycle are never destroyed. It's not more the case on Python 3.4 thanks
     91     # to the PEP 442.
     92     if compat.PY34:
     93         def __del__(self):
     94             if self._sock is not None:
     95                 warnings.warn("unclosed transport %r" % self, ResourceWarning,
     96                               source=self)
     97                 self.close()
     98 
     99     def _fatal_error(self, exc, message='Fatal error on pipe transport'):
    100         if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
    101             if self._loop.get_debug():
    102                 logger.debug("%r: %s", self, message, exc_info=True)
    103         else:
    104             self._loop.call_exception_handler({
    105                 'message': message,
    106                 'exception': exc,
    107                 'transport': self,
    108                 'protocol': self._protocol,
    109             })
    110         self._force_close(exc)
    111 
    112     def _force_close(self, exc):
    113         if self._closing:
    114             return
    115         self._closing = True
    116         self._conn_lost += 1
    117         if self._write_fut:
    118             self._write_fut.cancel()
    119             self._write_fut = None
    120         if self._read_fut:
    121             self._read_fut.cancel()
    122             self._read_fut = None
    123         self._pending_write = 0
    124         self._buffer = None
    125         self._loop.call_soon(self._call_connection_lost, exc)
    126 
    127     def _call_connection_lost(self, exc):
    128         try:
    129             self._protocol.connection_lost(exc)
    130         finally:
    131             # XXX If there is a pending overlapped read on the other
    132             # end then it may fail with ERROR_NETNAME_DELETED if we
    133             # just close our end.  First calling shutdown() seems to
    134             # cure it, but maybe using DisconnectEx() would be better.
    135             if hasattr(self._sock, 'shutdown'):
    136                 self._sock.shutdown(socket.SHUT_RDWR)
    137             self._sock.close()
    138             self._sock = None
    139             server = self._server
    140             if server is not None:
    141                 server._detach()
    142                 self._server = None
    143 
    144     def get_write_buffer_size(self):
    145         size = self._pending_write
    146         if self._buffer is not None:
    147             size += len(self._buffer)
    148         return size
    149 
    150 
    151 class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
    152                                  transports.ReadTransport):
    153     """Transport for read pipes."""
    154 
    155     def __init__(self, loop, sock, protocol, waiter=None,
    156                  extra=None, server=None):
    157         super().__init__(loop, sock, protocol, waiter, extra, server)
    158         self._paused = False
    159         self._loop.call_soon(self._loop_reading)
    160 
    161     def pause_reading(self):
    162         if self._closing:
    163             raise RuntimeError('Cannot pause_reading() when closing')
    164         if self._paused:
    165             raise RuntimeError('Already paused')
    166         self._paused = True
    167         if self._loop.get_debug():
    168             logger.debug("%r pauses reading", self)
    169 
    170     def resume_reading(self):
    171         if not self._paused:
    172             raise RuntimeError('Not paused')
    173         self._paused = False
    174         if self._closing:
    175             return
    176         self._loop.call_soon(self._loop_reading, self._read_fut)
    177         if self._loop.get_debug():
    178             logger.debug("%r resumes reading", self)
    179 
    180     def _loop_reading(self, fut=None):
    181         if self._paused:
    182             return
    183         data = None
    184 
    185         try:
    186             if fut is not None:
    187                 assert self._read_fut is fut or (self._read_fut is None and
    188                                                  self._closing)
    189                 self._read_fut = None
    190                 data = fut.result()  # deliver data later in "finally" clause
    191 
    192             if self._closing:
    193                 # since close() has been called we ignore any read data
    194                 data = None
    195                 return
    196 
    197             if data == b'':
    198                 # we got end-of-file so no need to reschedule a new read
    199                 return
    200 
    201             # reschedule a new read
    202             self._read_fut = self._loop._proactor.recv(self._sock, 4096)
    203         except ConnectionAbortedError as exc:
    204             if not self._closing:
    205                 self._fatal_error(exc, 'Fatal read error on pipe transport')
    206             elif self._loop.get_debug():
    207                 logger.debug("Read error on pipe transport while closing",
    208                              exc_info=True)
    209         except ConnectionResetError as exc:
    210             self._force_close(exc)
    211         except OSError as exc:
    212             self._fatal_error(exc, 'Fatal read error on pipe transport')
    213         except futures.CancelledError:
    214             if not self._closing:
    215                 raise
    216         else:
    217             self._read_fut.add_done_callback(self._loop_reading)
    218         finally:
    219             if data:
    220                 self._protocol.data_received(data)
    221             elif data is not None:
    222                 if self._loop.get_debug():
    223                     logger.debug("%r received EOF", self)
    224                 keep_open = self._protocol.eof_received()
    225                 if not keep_open:
    226                     self.close()
    227 
    228 
    229 class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
    230                                       transports.WriteTransport):
    231     """Transport for write pipes."""
    232 
    233     def write(self, data):
    234         if not isinstance(data, (bytes, bytearray, memoryview)):
    235             raise TypeError('data argument must be byte-ish (%r)',
    236                             type(data))
    237         if self._eof_written:
    238             raise RuntimeError('write_eof() already called')
    239 
    240         if not data:
    241             return
    242 
    243         if self._conn_lost:
    244             if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
    245                 logger.warning('socket.send() raised exception.')
    246             self._conn_lost += 1
    247             return
    248 
    249         # Observable states:
    250         # 1. IDLE: _write_fut and _buffer both None
    251         # 2. WRITING: _write_fut set; _buffer None
    252         # 3. BACKED UP: _write_fut set; _buffer a bytearray
    253         # We always copy the data, so the caller can't modify it
    254         # while we're still waiting for the I/O to happen.
    255         if self._write_fut is None:  # IDLE -> WRITING
    256             assert self._buffer is None
    257             # Pass a copy, except if it's already immutable.
    258             self._loop_writing(data=bytes(data))
    259         elif not self._buffer:  # WRITING -> BACKED UP
    260             # Make a mutable copy which we can extend.
    261             self._buffer = bytearray(data)
    262             self._maybe_pause_protocol()
    263         else:  # BACKED UP
    264             # Append to buffer (also copies).
    265             self._buffer.extend(data)
    266             self._maybe_pause_protocol()
    267 
    268     def _loop_writing(self, f=None, data=None):
    269         try:
    270             assert f is self._write_fut
    271             self._write_fut = None
    272             self._pending_write = 0
    273             if f:
    274                 f.result()
    275             if data is None:
    276                 data = self._buffer
    277                 self._buffer = None
    278             if not data:
    279                 if self._closing:
    280                     self._loop.call_soon(self._call_connection_lost, None)
    281                 if self._eof_written:
    282                     self._sock.shutdown(socket.SHUT_WR)
    283                 # Now that we've reduced the buffer size, tell the
    284                 # protocol to resume writing if it was paused.  Note that
    285                 # we do this last since the callback is called immediately
    286                 # and it may add more data to the buffer (even causing the
    287                 # protocol to be paused again).
    288                 self._maybe_resume_protocol()
    289             else:
    290                 self._write_fut = self._loop._proactor.send(self._sock, data)
    291                 if not self._write_fut.done():
    292                     assert self._pending_write == 0
    293                     self._pending_write = len(data)
    294                     self._write_fut.add_done_callback(self._loop_writing)
    295                     self._maybe_pause_protocol()
    296                 else:
    297                     self._write_fut.add_done_callback(self._loop_writing)
    298         except ConnectionResetError as exc:
    299             self._force_close(exc)
    300         except OSError as exc:
    301             self._fatal_error(exc, 'Fatal write error on pipe transport')
    302 
    303     def can_write_eof(self):
    304         return True
    305 
    306     def write_eof(self):
    307         self.close()
    308 
    309     def abort(self):
    310         self._force_close(None)
    311 
    312 
    313 class _ProactorWritePipeTransport(_ProactorBaseWritePipeTransport):
    314     def __init__(self, *args, **kw):
    315         super().__init__(*args, **kw)
    316         self._read_fut = self._loop._proactor.recv(self._sock, 16)
    317         self._read_fut.add_done_callback(self._pipe_closed)
    318 
    319     def _pipe_closed(self, fut):
    320         if fut.cancelled():
    321             # the transport has been closed
    322             return
    323         assert fut.result() == b''
    324         if self._closing:
    325             assert self._read_fut is None
    326             return
    327         assert fut is self._read_fut, (fut, self._read_fut)
    328         self._read_fut = None
    329         if self._write_fut is not None:
    330             self._force_close(BrokenPipeError())
    331         else:
    332             self.close()
    333 
    334 
    335 class _ProactorDuplexPipeTransport(_ProactorReadPipeTransport,
    336                                    _ProactorBaseWritePipeTransport,
    337                                    transports.Transport):
    338     """Transport for duplex pipes."""
    339 
    340     def can_write_eof(self):
    341         return False
    342 
    343     def write_eof(self):
    344         raise NotImplementedError
    345 
    346 
    347 class _ProactorSocketTransport(_ProactorReadPipeTransport,
    348                                _ProactorBaseWritePipeTransport,
    349                                transports.Transport):
    350     """Transport for connected sockets."""
    351 
    352     def _set_extra(self, sock):
    353         self._extra['socket'] = sock
    354         try:
    355             self._extra['sockname'] = sock.getsockname()
    356         except (socket.error, AttributeError):
    357             if self._loop.get_debug():
    358                 logger.warning("getsockname() failed on %r",
    359                              sock, exc_info=True)
    360         if 'peername' not in self._extra:
    361             try:
    362                 self._extra['peername'] = sock.getpeername()
    363             except (socket.error, AttributeError):
    364                 if self._loop.get_debug():
    365                     logger.warning("getpeername() failed on %r",
    366                                    sock, exc_info=True)
    367 
    368     def can_write_eof(self):
    369         return True
    370 
    371     def write_eof(self):
    372         if self._closing or self._eof_written:
    373             return
    374         self._eof_written = True
    375         if self._write_fut is None:
    376             self._sock.shutdown(socket.SHUT_WR)
    377 
    378 
    379 class BaseProactorEventLoop(base_events.BaseEventLoop):
    380 
    381     def __init__(self, proactor):
    382         super().__init__()
    383         logger.debug('Using proactor: %s', proactor.__class__.__name__)
    384         self._proactor = proactor
    385         self._selector = proactor   # convenient alias
    386         self._self_reading_future = None
    387         self._accept_futures = {}   # socket file descriptor => Future
    388         proactor.set_loop(self)
    389         self._make_self_pipe()
    390 
    391     def _make_socket_transport(self, sock, protocol, waiter=None,
    392                                extra=None, server=None):
    393         return _ProactorSocketTransport(self, sock, protocol, waiter,
    394                                         extra, server)
    395 
    396     def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter=None,
    397                             *, server_side=False, server_hostname=None,
    398                             extra=None, server=None):
    399         if not sslproto._is_sslproto_available():
    400             raise NotImplementedError("Proactor event loop requires Python 3.5"
    401                                       " or newer (ssl.MemoryBIO) to support "
    402                                       "SSL")
    403 
    404         ssl_protocol = sslproto.SSLProtocol(self, protocol, sslcontext, waiter,
    405                                             server_side, server_hostname)
    406         _ProactorSocketTransport(self, rawsock, ssl_protocol,
    407                                  extra=extra, server=server)
    408         return ssl_protocol._app_transport
    409 
    410     def _make_duplex_pipe_transport(self, sock, protocol, waiter=None,
    411                                     extra=None):
    412         return _ProactorDuplexPipeTransport(self,
    413                                             sock, protocol, waiter, extra)
    414 
    415     def _make_read_pipe_transport(self, sock, protocol, waiter=None,
    416                                   extra=None):
    417         return _ProactorReadPipeTransport(self, sock, protocol, waiter, extra)
    418 
    419     def _make_write_pipe_transport(self, sock, protocol, waiter=None,
    420                                    extra=None):
    421         # We want connection_lost() to be called when other end closes
    422         return _ProactorWritePipeTransport(self,
    423                                            sock, protocol, waiter, extra)
    424 
    425     def close(self):
    426         if self.is_running():
    427             raise RuntimeError("Cannot close a running event loop")
    428         if self.is_closed():
    429             return
    430 
    431         # Call these methods before closing the event loop (before calling
    432         # BaseEventLoop.close), because they can schedule callbacks with
    433         # call_soon(), which is forbidden when the event loop is closed.
    434         self._stop_accept_futures()
    435         self._close_self_pipe()
    436         self._proactor.close()
    437         self._proactor = None
    438         self._selector = None
    439 
    440         # Close the event loop
    441         super().close()
    442 
    443     def sock_recv(self, sock, n):
    444         return self._proactor.recv(sock, n)
    445 
    446     def sock_sendall(self, sock, data):
    447         return self._proactor.send(sock, data)
    448 
    449     def sock_connect(self, sock, address):
    450         return self._proactor.connect(sock, address)
    451 
    452     def sock_accept(self, sock):
    453         return self._proactor.accept(sock)
    454 
    455     def _socketpair(self):
    456         raise NotImplementedError
    457 
    458     def _close_self_pipe(self):
    459         if self._self_reading_future is not None:
    460             self._self_reading_future.cancel()
    461             self._self_reading_future = None
    462         self._ssock.close()
    463         self._ssock = None
    464         self._csock.close()
    465         self._csock = None
    466         self._internal_fds -= 1
    467 
    468     def _make_self_pipe(self):
    469         # A self-socket, really. :-)
    470         self._ssock, self._csock = self._socketpair()
    471         self._ssock.setblocking(False)
    472         self._csock.setblocking(False)
    473         self._internal_fds += 1
    474         self.call_soon(self._loop_self_reading)
    475 
    476     def _loop_self_reading(self, f=None):
    477         try:
    478             if f is not None:
    479                 f.result()  # may raise
    480             f = self._proactor.recv(self._ssock, 4096)
    481         except futures.CancelledError:
    482             # _close_self_pipe() has been called, stop waiting for data
    483             return
    484         except Exception as exc:
    485             self.call_exception_handler({
    486                 'message': 'Error on reading from the event loop self pipe',
    487                 'exception': exc,
    488                 'loop': self,
    489             })
    490         else:
    491             self._self_reading_future = f
    492             f.add_done_callback(self._loop_self_reading)
    493 
    494     def _write_to_self(self):
    495         self._csock.send(b'\0')
    496 
    497     def _start_serving(self, protocol_factory, sock,
    498                        sslcontext=None, server=None, backlog=100):
    499 
    500         def loop(f=None):
    501             try:
    502                 if f is not None:
    503                     conn, addr = f.result()
    504                     if self._debug:
    505                         logger.debug("%r got a new connection from %r: %r",
    506                                      server, addr, conn)
    507                     protocol = protocol_factory()
    508                     if sslcontext is not None:
    509                         self._make_ssl_transport(
    510                             conn, protocol, sslcontext, server_side=True,
    511                             extra={'peername': addr}, server=server)
    512                     else:
    513                         self._make_socket_transport(
    514                             conn, protocol,
    515                             extra={'peername': addr}, server=server)
    516                 if self.is_closed():
    517                     return
    518                 f = self._proactor.accept(sock)
    519             except OSError as exc:
    520                 if sock.fileno() != -1:
    521                     self.call_exception_handler({
    522                         'message': 'Accept failed on a socket',
    523                         'exception': exc,
    524                         'socket': sock,
    525                     })
    526                     sock.close()
    527                 elif self._debug:
    528                     logger.debug("Accept failed on socket %r",
    529                                  sock, exc_info=True)
    530             except futures.CancelledError:
    531                 sock.close()
    532             else:
    533                 self._accept_futures[sock.fileno()] = f
    534                 f.add_done_callback(loop)
    535 
    536         self.call_soon(loop)
    537 
    538     def _process_events(self, event_list):
    539         # Events are processed in the IocpProactor._poll() method
    540         pass
    541 
    542     def _stop_accept_futures(self):
    543         for future in self._accept_futures.values():
    544             future.cancel()
    545         self._accept_futures.clear()
    546 
    547     def _stop_serving(self, sock):
    548         self._stop_accept_futures()
    549         self._proactor._stop_serving(sock)
    550         sock.close()
    551