.. currentmodule:: asyncio


.. _asyncio-transports-protocols:


========================
Transports and Protocols
========================

.. rubric:: Preface

Transports and Protocols are used by the **low-level** event loop
APIs such as :meth:`loop.create_connection`.  They use a
callback-based programming style and enable high-performance
implementations of network or IPC protocols (e.g. HTTP).

Essentially, transports and protocols should only be used in
libraries and frameworks and never in high-level asyncio
applications.

This documentation page covers both `Transports`_ and `Protocols`_.

.. rubric:: Introduction

At the highest level, the transport is concerned with *how* bytes
are transmitted, while the protocol determines *which* bytes to
transmit (and to some extent when).

A different way of saying the same thing: a transport is an
abstraction for a socket (or similar I/O endpoint) while a protocol
is an abstraction for an application, from the transport's point
of view.

Yet another view is that the transport and protocol interfaces
together define an abstract interface for using network I/O and
interprocess I/O.

There is always a 1:1 relationship between transport and protocol
objects: the protocol calls transport methods to send data,
while the transport calls protocol methods to pass it data that
has been received.

Most connection-oriented event loop methods
(such as :meth:`loop.create_connection`) accept a
*protocol_factory* argument used to create a *Protocol* object
for an accepted connection, represented by a *Transport* object.
Such methods usually return a tuple of ``(transport, protocol)``.

.. rubric:: Contents

This documentation page contains the following sections:

* The `Transports`_ section documents asyncio :class:`BaseTransport`,
  :class:`ReadTransport`, :class:`WriteTransport`, :class:`Transport`,
  :class:`DatagramTransport`, and :class:`SubprocessTransport`
  classes.

* The `Protocols`_ section documents asyncio :class:`BaseProtocol`,
  :class:`Protocol`, :class:`BufferedProtocol`,
  :class:`DatagramProtocol`, and :class:`SubprocessProtocol` classes.

* The `Examples`_ section showcases how to work with transports,
  protocols, and low-level event loop APIs.


.. _asyncio-transport:

Transports
==========

Transports are classes provided by :mod:`asyncio` in order to abstract
various kinds of communication channels.

Transport objects are always instantiated by an
:ref:`asyncio event loop <asyncio-event-loop>`.

asyncio implements transports for TCP, UDP, SSL, and subprocess pipes.
The methods available on a transport depend on the transport's kind.

The transport classes are :ref:`not thread safe <asyncio-multithreading>`.
     82 
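As a minimal sketch of how the event loop, rather than user code, creates
the transport and pairs it with a protocol, the following snippet passes a
protocol factory to :meth:`loop.create_connection` and receives the
``(transport, protocol)`` tuple back.  The ``GreeterProtocol`` class and the
use of :func:`socket.socketpair` (so that no real server is needed) are
assumptions made purely for illustration.

.. code-block:: python

    import asyncio
    import socket


    class GreeterProtocol(asyncio.Protocol):
        """Hypothetical protocol that keeps a reference to its transport."""

        def __init__(self, on_data):
            self.transport = None
            self.on_data = on_data

        def connection_made(self, transport):
            # The event loop built the transport; the protocol only stores it.
            self.transport = transport

        def data_received(self, data):
            if not self.on_data.done():
                self.on_data.set_result(data)


    async def main():
        loop = asyncio.get_running_loop()
        on_data = loop.create_future()
        ours, theirs = socket.socketpair()
        try:
            transport, protocol = await loop.create_connection(
                lambda: GreeterProtocol(on_data), sock=ours)
            # 1:1 relationship: each object can reach the other.
            paired = (protocol.transport is transport
                      and transport.get_protocol() is protocol)
            theirs.send(b'hello')
            data = await asyncio.wait_for(on_data, timeout=5)
            transport.close()
            return paired, data
        finally:
            theirs.close()


    if __name__ == '__main__':
        print(asyncio.run(main()))

The factory is called without arguments, which is why a ``lambda`` is used
here to bind the future to the protocol instance.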
     83 
Transports Hierarchy
--------------------

.. class:: BaseTransport

   Base class for all transports.  Contains methods that all
   asyncio transports share.

.. class:: WriteTransport(BaseTransport)

   A base transport for write-only connections.

   Instances of the *WriteTransport* class are returned from
   the :meth:`loop.connect_write_pipe` event loop method and
   are also used by subprocess-related methods like
   :meth:`loop.subprocess_exec`.

.. class:: ReadTransport(BaseTransport)

   A base transport for read-only connections.

   Instances of the *ReadTransport* class are returned from
   the :meth:`loop.connect_read_pipe` event loop method and
   are also used by subprocess-related methods like
   :meth:`loop.subprocess_exec`.

.. class:: Transport(WriteTransport, ReadTransport)

   Interface representing a bidirectional transport, such as a
   TCP connection.

   The user does not instantiate a transport directly; they call a
   utility function, passing it a protocol factory and other
   information necessary to create the transport and protocol.

   Instances of the *Transport* class are returned from or used by
   event loop methods like :meth:`loop.create_connection`,
   :meth:`loop.create_unix_connection`,
   :meth:`loop.create_server`, :meth:`loop.sendfile`, etc.


.. class:: DatagramTransport(BaseTransport)

   A transport for datagram (UDP) connections.

   Instances of the *DatagramTransport* class are returned from
   the :meth:`loop.create_datagram_endpoint` event loop method.


.. class:: SubprocessTransport(BaseTransport)

   An abstraction to represent a connection between a parent and its
   child OS process.

   Instances of the *SubprocessTransport* class are returned from
   event loop methods :meth:`loop.subprocess_shell` and
   :meth:`loop.subprocess_exec`.


Base Transport
--------------

.. method:: BaseTransport.close()

   Close the transport.

   If the transport has a buffer for outgoing
   data, buffered data will be flushed asynchronously.  No more data
   will be received.  After all buffered data is flushed, the
   protocol's :meth:`protocol.connection_lost()
   <BaseProtocol.connection_lost>` method will be called with
   :const:`None` as its argument.

.. method:: BaseTransport.is_closing()

   Return ``True`` if the transport is closing or is closed.

.. method:: BaseTransport.get_extra_info(name, default=None)

   Return information about the transport or underlying resources
   it uses.

   *name* is a string representing the piece of transport-specific
   information to get.

   *default* is the value to return if the information is not
   available, or if the transport does not support querying it
   with the given third-party event loop implementation or on the
   current platform.

   For example, the following code attempts to get the underlying
   socket object of the transport::

      sock = transport.get_extra_info('socket')
      if sock is not None:
          print(sock.getsockopt(...))

   Categories of information that can be queried on some transports:

   * socket:

     - ``'peername'``: the remote address to which the socket is
       connected, result of :meth:`socket.socket.getpeername`
       (``None`` on error)

     - ``'socket'``: :class:`socket.socket` instance

     - ``'sockname'``: the socket's own address,
       result of :meth:`socket.socket.getsockname`

   * SSL socket:

     - ``'compression'``: the compression algorithm being used as a
       string, or ``None`` if the connection isn't compressed; result
       of :meth:`ssl.SSLSocket.compression`

     - ``'cipher'``: a three-value tuple containing the name of the
       cipher being used, the version of the SSL protocol that defines
       its use, and the number of secret bits being used; result of
       :meth:`ssl.SSLSocket.cipher`

     - ``'peercert'``: peer certificate; result of
       :meth:`ssl.SSLSocket.getpeercert`

     - ``'sslcontext'``: :class:`ssl.SSLContext` instance

     - ``'ssl_object'``: :class:`ssl.SSLObject` or
       :class:`ssl.SSLSocket` instance

   * pipe:

     - ``'pipe'``: pipe object

   * subprocess:

     - ``'subprocess'``: :class:`subprocess.Popen` instance

.. method:: BaseTransport.set_protocol(protocol)

   Set a new protocol.

   Switching protocol should only be done when both
   protocols are documented to support the switch.

.. method:: BaseTransport.get_protocol()

   Return the current protocol.
    231 
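The base transport methods can be tried out without a network, as in the
sketch below.  It assumes a plain socket transport created over
:func:`socket.socketpair`; the ``inspect_transport`` helper and the
``'no-such-name'`` key are hypothetical and exist only to show how
*default* is applied.

.. code-block:: python

    import asyncio
    import socket


    async def inspect_transport():
        loop = asyncio.get_running_loop()
        ours, theirs = socket.socketpair()
        try:
            transport, _ = await loop.create_connection(
                asyncio.Protocol, sock=ours)
            info = {
                # Socket transports expose the underlying socket.
                'has_socket': transport.get_extra_info('socket') is not None,
                # Not an SSL transport, so the supplied default comes back.
                'cipher': transport.get_extra_info('cipher', default='n/a'),
                # Unknown names fall back to None when no default is given.
                'unknown': transport.get_extra_info('no-such-name'),
                'closing_before': transport.is_closing(),
            }
            transport.close()
            # close() flushes asynchronously, but is_closing() flips at once.
            info['closing_after'] = transport.is_closing()
            return info
        finally:
            theirs.close()


    if __name__ == '__main__':
        print(asyncio.run(inspect_transport()))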
    232 
Read-only Transports
--------------------

.. method:: ReadTransport.is_reading()

   Return ``True`` if the transport is receiving new data.

   .. versionadded:: 3.7

.. method:: ReadTransport.pause_reading()

   Pause the receiving end of the transport.  No data will be passed to
   the protocol's :meth:`protocol.data_received() <Protocol.data_received>`
   method until :meth:`resume_reading` is called.

   .. versionchanged:: 3.7
      The method is idempotent, i.e. it can be called when the
      transport is already paused or closed.

.. method:: ReadTransport.resume_reading()

   Resume the receiving end.  The protocol's
   :meth:`protocol.data_received() <Protocol.data_received>` method
   will be called once again if some data is available for reading.

   .. versionchanged:: 3.7
      The method is idempotent, i.e. it can be called when the
      transport is already reading.
    261 
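A small sketch of pausing and resuming the receiving end follows.  It
assumes a socket transport built over :func:`socket.socketpair`; the
``Collector`` protocol and the short sleeps that give the event loop time to
run are illustrative choices, not part of the asyncio API.

.. code-block:: python

    import asyncio
    import socket


    class Collector(asyncio.Protocol):
        """Hypothetical protocol that stores every chunk it is given."""

        def __init__(self):
            self.chunks = []

        def data_received(self, data):
            self.chunks.append(data)


    async def main():
        loop = asyncio.get_running_loop()
        ours, theirs = socket.socketpair()
        try:
            transport, protocol = await loop.create_connection(
                Collector, sock=ours)
            states = [transport.is_reading()]
            transport.pause_reading()
            transport.pause_reading()          # idempotent since Python 3.7
            states.append(transport.is_reading())

            theirs.send(b'queued')             # arrives while paused
            await asyncio.sleep(0.05)
            while_paused = b''.join(protocol.chunks)

            transport.resume_reading()
            states.append(transport.is_reading())
            await asyncio.sleep(0.05)
            after_resume = b''.join(protocol.chunks)

            transport.close()
            return states, while_paused, after_resume
        finally:
            theirs.close()


    if __name__ == '__main__':
        print(asyncio.run(main()))

Data sent while the transport is paused is not lost; it is delivered to
``data_received()`` after ``resume_reading()`` is called.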
    262 
Write-only Transports
---------------------

.. method:: WriteTransport.abort()

   Close the transport immediately, without waiting for pending operations
   to complete.  Buffered data will be lost.  No more data will be received.
   The protocol's :meth:`protocol.connection_lost()
   <BaseProtocol.connection_lost>` method will eventually be
   called with :const:`None` as its argument.

.. method:: WriteTransport.can_write_eof()

   Return :const:`True` if the transport supports
   :meth:`~WriteTransport.write_eof`, :const:`False` if not.

.. method:: WriteTransport.get_write_buffer_size()

   Return the current size of the output buffer used by the transport.

.. method:: WriteTransport.get_write_buffer_limits()

   Get the *high* and *low* watermarks for write flow control. Return a
   tuple ``(low, high)`` where *low* and *high* are non-negative numbers
   of bytes.

   Use :meth:`set_write_buffer_limits` to set the limits.

   .. versionadded:: 3.4.2

.. method:: WriteTransport.set_write_buffer_limits(high=None, low=None)

   Set the *high* and *low* watermarks for write flow control.

   These two values (measured in number of
   bytes) control when the protocol's
   :meth:`protocol.pause_writing() <BaseProtocol.pause_writing>`
   and :meth:`protocol.resume_writing() <BaseProtocol.resume_writing>`
   methods are called. If specified, the low watermark must be less
   than or equal to the high watermark.  Neither *high* nor *low*
   can be negative.

   :meth:`~BaseProtocol.pause_writing` is called when the buffer size
   becomes strictly greater than the *high* value. If writing has
   been paused, :meth:`~BaseProtocol.resume_writing` is called when
   the buffer size becomes less than or equal to the *low* value.

   The defaults are implementation-specific.  If only the
   high watermark is given, the low watermark defaults to an
   implementation-specific value less than or equal to the
   high watermark.  Setting *high* to zero forces *low* to zero as
   well, and causes :meth:`~BaseProtocol.pause_writing` to be called
   whenever the buffer becomes non-empty.  Setting *low* to zero causes
   :meth:`~BaseProtocol.resume_writing` to be called only once the
   buffer is empty. Use of zero for either limit is generally
   sub-optimal as it reduces opportunities for doing I/O and
   computation concurrently.

   Use :meth:`~WriteTransport.get_write_buffer_limits`
   to get the limits.

.. method:: WriteTransport.write(data)

   Write some *data* bytes to the transport.

   This method does not block; it buffers the data and arranges for it
   to be sent out asynchronously.

.. method:: WriteTransport.writelines(list_of_data)

   Write a list (or any iterable) of data bytes to the transport.
   This is functionally equivalent to calling :meth:`write` on each
   element yielded by the iterable, but may be implemented more
   efficiently.

.. method:: WriteTransport.write_eof()

   Close the write end of the transport after flushing all buffered data.
   Data may still be received.

   This method can raise :exc:`NotImplementedError` if the transport
   (e.g. SSL) doesn't support half-closed connections.
    345 
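The watermark rules can be checked with a short sketch such as the one
below.  It assumes a socket transport created over
:func:`socket.socketpair`; the ``show_write_limits`` helper and the chosen
byte counts are hypothetical.  Note that the getter returns ``(low, high)``
while the setter takes *high* first.

.. code-block:: python

    import asyncio
    import socket


    async def show_write_limits():
        loop = asyncio.get_running_loop()
        ours, theirs = socket.socketpair()
        try:
            transport, _ = await loop.create_connection(
                asyncio.Protocol, sock=ours)

            transport.set_write_buffer_limits(high=1024, low=256)
            explicit = transport.get_write_buffer_limits()

            # Setting high to zero forces low to zero as well.
            transport.set_write_buffer_limits(high=0)
            zeroed = transport.get_write_buffer_limits()

            # The low watermark may not exceed the high watermark.
            try:
                transport.set_write_buffer_limits(high=10, low=20)
                rejected = False
            except ValueError:
                rejected = True

            transport.close()
            return explicit, zeroed, rejected
        finally:
            theirs.close()


    if __name__ == '__main__':
        print(asyncio.run(show_write_limits()))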
    346 
Datagram Transports
-------------------

.. method:: DatagramTransport.sendto(data, addr=None)

   Send the *data* bytes to the remote peer given by *addr* (a
   transport-dependent target address).  If *addr* is :const:`None`,
   the data is sent to the target address given on transport
   creation.

   This method does not block; it buffers the data and arranges
   for it to be sent out asynchronously.

.. method:: DatagramTransport.abort()

   Close the transport immediately, without waiting for pending
   operations to complete.  Buffered data will be lost.
   No more data will be received.  The protocol's
   :meth:`protocol.connection_lost() <BaseProtocol.connection_lost>`
   method will eventually be called with :const:`None` as its argument.


.. _asyncio-subprocess-transports:

Subprocess Transports
---------------------

.. method:: SubprocessTransport.get_pid()

   Return the subprocess process id as an integer.

.. method:: SubprocessTransport.get_pipe_transport(fd)

   Return the transport for the communication pipe corresponding to the
   integer file descriptor *fd*:

   * ``0``: writable streaming transport of the standard input (*stdin*),
     or :const:`None` if the subprocess was not created with ``stdin=PIPE``
   * ``1``: readable streaming transport of the standard output (*stdout*),
     or :const:`None` if the subprocess was not created with ``stdout=PIPE``
   * ``2``: readable streaming transport of the standard error (*stderr*),
     or :const:`None` if the subprocess was not created with ``stderr=PIPE``
   * other *fd*: :const:`None`

.. method:: SubprocessTransport.get_returncode()

   Return the subprocess return code as an integer or :const:`None`
   if it hasn't returned, which is similar to the
   :attr:`subprocess.Popen.returncode` attribute.

.. method:: SubprocessTransport.kill()

   Kill the subprocess.

   On POSIX systems, this method sends SIGKILL to the subprocess.
   On Windows, this method is an alias for :meth:`terminate`.

   See also :meth:`subprocess.Popen.kill`.

.. method:: SubprocessTransport.send_signal(signal)

   Send the *signal* number to the subprocess, as in
   :meth:`subprocess.Popen.send_signal`.

.. method:: SubprocessTransport.terminate()

   Stop the subprocess.

   On POSIX systems, this method sends SIGTERM to the subprocess.
   On Windows, the Windows API function TerminateProcess() is called to
   stop the subprocess.

   See also :meth:`subprocess.Popen.terminate`.

.. method:: SubprocessTransport.close()

   Kill the subprocess by calling the :meth:`kill` method if it
   has not returned yet, and close the transports of the
   *stdin*, *stdout*, and *stderr* pipes.
    427 
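The sketch below shows one way the subprocess transport methods might be
combined: the parent writes to the child's *stdin* through the pipe
transport for ``fd`` 0 and reads the reply from *stdout*.  The
``CollectStdout`` protocol, the ``shout`` helper, and the one-line child
program are assumptions made for illustration.

.. code-block:: python

    import asyncio
    import sys


    class CollectStdout(asyncio.SubprocessProtocol):
        """Hypothetical protocol: gathers stdout and signals completion."""

        def __init__(self, done):
            self.done = done
            self.output = bytearray()
            self.stdout_closed = False
            self.exited = False

        def pipe_data_received(self, fd, data):
            if fd == 1:
                self.output.extend(data)

        def pipe_connection_lost(self, fd, exc):
            if fd == 1:
                self.stdout_closed = True
                self._maybe_finish()

        def process_exited(self):
            self.exited = True
            self._maybe_finish()

        def _maybe_finish(self):
            # Wait for both events so that no output is missed.
            if self.stdout_closed and self.exited and not self.done.done():
                self.done.set_result(None)


    async def shout(text):
        loop = asyncio.get_running_loop()
        done = loop.create_future()
        code = 'import sys; sys.stdout.write(sys.stdin.read().upper())'
        # stdin and stdout default to pipes; stderr is inherited here.
        transport, protocol = await loop.subprocess_exec(
            lambda: CollectStdout(done), sys.executable, '-c', code,
            stderr=None)
        try:
            stdin = transport.get_pipe_transport(0)   # writable from the parent
            stdin.write(text.encode())
            stdin.close()                             # flush, then signal EOF
            await done
            return (bytes(protocol.output),
                    transport.get_returncode(),
                    transport.get_pipe_transport(42))  # no such pipe
        finally:
            transport.close()


    if __name__ == '__main__':
        print(asyncio.run(shout('hello')))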
    428 
.. _asyncio-protocol:

Protocols
=========

asyncio provides a set of abstract base classes that should be used
to implement network protocols.  Those classes are meant to be used
together with :ref:`transports <asyncio-transport>`.

Subclasses of abstract base protocol classes may implement some or
all methods.  All these methods are callbacks: they are called by
transports on certain events, for example when some data is received.
A base protocol method should be called by the corresponding transport.


Base Protocols
--------------

.. class:: BaseProtocol

   Base protocol with methods that all protocols share.

.. class:: Protocol(BaseProtocol)

   The base class for implementing streaming protocols
   (TCP, Unix sockets, etc).

.. class:: BufferedProtocol(BaseProtocol)

   A base class for implementing streaming protocols with manual
   control of the receive buffer.

.. class:: DatagramProtocol(BaseProtocol)

   The base class for implementing datagram (UDP) protocols.

.. class:: SubprocessProtocol(BaseProtocol)

   The base class for implementing protocols communicating with child
   processes (unidirectional pipes).


Base Protocol
-------------

All asyncio protocols can implement Base Protocol callbacks.

.. rubric:: Connection Callbacks

Connection callbacks are called on all protocols, exactly once per
successful connection.  All other protocol callbacks can only be
called between those two methods.

.. method:: BaseProtocol.connection_made(transport)

   Called when a connection is made.

   The *transport* argument is the transport representing the
   connection.  The protocol is responsible for storing the reference
   to its transport.

.. method:: BaseProtocol.connection_lost(exc)

   Called when the connection is lost or closed.

   The argument is either an exception object or :const:`None`.
   The latter means a regular EOF is received, or the connection was
   aborted or closed by this side of the connection.


.. rubric:: Flow Control Callbacks

Flow control callbacks can be called by transports to pause or
resume writing performed by the protocol.

See the documentation of the :meth:`~WriteTransport.set_write_buffer_limits`
method for more details.

.. method:: BaseProtocol.pause_writing()

   Called when the transport's buffer goes over the high watermark.

.. method:: BaseProtocol.resume_writing()

   Called when the transport's buffer drains below the low watermark.

If the buffer size equals the high watermark,
:meth:`~BaseProtocol.pause_writing` is not called: the buffer size must
go strictly over.

Conversely, :meth:`~BaseProtocol.resume_writing` is called when the
buffer size is equal to or lower than the low watermark.  These end
conditions are important to ensure that things go as expected when
either mark is zero.
    523 
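The flow control callbacks can be observed with a sketch like the one
below, which writes far more data than a socket buffer can hold.  The
``Producer`` and ``Consumer`` classes, the payload size, and the watermark
values are assumptions chosen for illustration.  The callback order shown
is what a selector-based event loop is expected to produce; other event
loop implementations may behave differently.

.. code-block:: python

    import asyncio
    import socket

    PAYLOAD = b'x' * (4 * 1024 * 1024)   # much larger than a socket buffer


    class Producer(asyncio.Protocol):
        """Hypothetical writer that records flow control callbacks."""

        def __init__(self):
            self.events = []

        def pause_writing(self):
            self.events.append('pause_writing')

        def resume_writing(self):
            self.events.append('resume_writing')


    class Consumer(asyncio.Protocol):
        """Hypothetical reader that counts bytes until all have arrived."""

        def __init__(self, expected, done):
            self.remaining = expected
            self.done = done

        def data_received(self, data):
            self.remaining -= len(data)
            if self.remaining <= 0 and not self.done.done():
                self.done.set_result(None)


    async def main():
        loop = asyncio.get_running_loop()
        done = loop.create_future()
        left, right = socket.socketpair()
        writer, producer = await loop.create_connection(Producer, sock=left)
        reader, _ = await loop.create_connection(
            lambda: Consumer(len(PAYLOAD), done), sock=right)
        try:
            writer.set_write_buffer_limits(high=64 * 1024, low=16 * 1024)
            # write() returns at once; whatever the socket cannot take is
            # buffered, which pushes the buffer over the high watermark.
            writer.write(PAYLOAD)
            await asyncio.wait_for(done, timeout=30)
            return producer.events
        finally:
            writer.close()
            reader.close()


    if __name__ == '__main__':
        print(asyncio.run(main()))

A real protocol would stop producing data in ``pause_writing()`` and
continue in ``resume_writing()`` instead of only recording the calls.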
    524 
Streaming Protocols
-------------------

Event loop methods, such as :meth:`loop.create_server`,
:meth:`loop.create_unix_server`, :meth:`loop.create_connection`,
:meth:`loop.create_unix_connection`, :meth:`loop.connect_accepted_socket`,
:meth:`loop.connect_read_pipe`, and :meth:`loop.connect_write_pipe`
accept factories that return streaming protocols.

.. method:: Protocol.data_received(data)

   Called when some data is received.  *data* is a non-empty bytes
   object containing the incoming data.

   Whether the data is buffered, chunked or reassembled depends on
   the transport.  In general, you shouldn't rely on specific semantics
   and instead make your parsing generic and flexible. However,
   data is always received in the correct order.

   The method can be called an arbitrary number of times while
   a connection is open.

   However, :meth:`protocol.eof_received() <Protocol.eof_received>`
   is called at most once.  Once ``eof_received()`` is called,
   ``data_received()`` is not called anymore.

.. method:: Protocol.eof_received()

   Called when the other end signals it won't send any more data
   (for example by calling :meth:`transport.write_eof()
   <WriteTransport.write_eof>`, if the other end also uses
   asyncio).

   This method may return a false value (including ``None``), in which case
   the transport will close itself.  Conversely, if this method returns a
   true value, the protocol used determines whether to close the transport.
   Since the default implementation returns ``None``, it implicitly closes the
   connection.

   Some transports, including SSL, don't support half-closed connections,
   in which case returning true from this method will result in the connection
   being closed.


State machine:

.. code-block:: none

    start -> connection_made
        [-> data_received]*
        [-> eof_received]?
    -> connection_lost -> end
    577 
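The state machine can be traced with a protocol that records each callback,
as in the following sketch.  The ``Recorder`` class is hypothetical, and the
peer is a plain socket from :func:`socket.socketpair` that sends one small
message and then shuts down its write side.

.. code-block:: python

    import asyncio
    import socket


    class Recorder(asyncio.Protocol):
        """Hypothetical protocol that logs the name of each callback."""

        def __init__(self, closed):
            self.closed = closed
            self.calls = []
            self.data = bytearray()

        def connection_made(self, transport):
            self.calls.append('connection_made')

        def data_received(self, data):
            self.calls.append('data_received')
            self.data.extend(data)

        def eof_received(self):
            self.calls.append('eof_received')
            # Returning None (a false value) lets the transport close itself.

        def connection_lost(self, exc):
            self.calls.append('connection_lost')
            if not self.closed.done():
                self.closed.set_result(exc)


    async def main():
        loop = asyncio.get_running_loop()
        closed = loop.create_future()
        ours, theirs = socket.socketpair()
        transport, recorder = await loop.create_connection(
            lambda: Recorder(closed), sock=ours)
        try:
            theirs.sendall(b'ping')
            theirs.shutdown(socket.SHUT_WR)    # the peer signals EOF
            exc = await asyncio.wait_for(closed, timeout=5)
            return recorder.calls, bytes(recorder.data), exc
        finally:
            transport.close()
            theirs.close()


    if __name__ == '__main__':
        print(asyncio.run(main()))

Because ``eof_received()`` returns ``None`` here, ``connection_lost()`` is
called with ``None`` rather than with an exception.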
    578 
Buffered Streaming Protocols
----------------------------

.. versionadded:: 3.7
   **Important:** this has been added to asyncio in Python 3.7
   *on a provisional basis*!  This is an experimental API that
   might be changed or removed completely in Python 3.8.

Buffered Protocols can be used with any event loop method
that supports `Streaming Protocols`_.

``BufferedProtocol`` implementations allow explicit manual allocation
and control of the receive buffer.  Event loops can then use the buffer
provided by the protocol to avoid unnecessary data copies.  This
can result in noticeable performance improvement for protocols that
receive large amounts of data.  Sophisticated protocol implementations
can significantly reduce the number of buffer allocations.

The following callbacks are called on :class:`BufferedProtocol`
instances:

.. method:: BufferedProtocol.get_buffer(sizehint)

   Called to allocate a new receive buffer.

   *sizehint* is the recommended minimum size for the returned
   buffer.  It is acceptable to return smaller or larger buffers
   than what *sizehint* suggests.  When set to -1, the buffer size
   can be arbitrary. It is an error to return a buffer with a zero size.

   ``get_buffer()`` must return an object implementing the
   :ref:`buffer protocol <bufferobjects>`.

.. method:: BufferedProtocol.buffer_updated(nbytes)

   Called when the buffer was updated with the received data.

   *nbytes* is the total number of bytes that were written to the buffer.

.. method:: BufferedProtocol.eof_received()

   See the documentation of the :meth:`protocol.eof_received()
   <Protocol.eof_received>` method.


:meth:`~BufferedProtocol.get_buffer` can be called an arbitrary number
of times during a connection.  However, :meth:`protocol.eof_received()
<Protocol.eof_received>` is called at most once
and, if called, :meth:`~BufferedProtocol.get_buffer` and
:meth:`~BufferedProtocol.buffer_updated` won't be called after it.

State machine:

.. code-block:: none

    start -> connection_made
        [-> get_buffer
            [-> buffer_updated]?
        ]*
        [-> eof_received]?
    -> connection_lost -> end
    640 
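A minimal sketch of a buffered protocol that reuses one preallocated buffer
is shown below.  The ``FixedBufferProtocol`` class and its 64-byte buffer
are assumptions for illustration; a real implementation would size and
manage its buffer according to its own parsing needs.

.. code-block:: python

    import asyncio
    import socket


    class FixedBufferProtocol(asyncio.BufferedProtocol):
        """Hypothetical protocol that reuses a single receive buffer."""

        def __init__(self, closed):
            self.closed = closed
            self.buffer = bytearray(64)    # allocated once, reused per read
            self.received = bytearray()

        def get_buffer(self, sizehint):
            # sizehint is only a recommendation; any non-empty buffer works.
            return self.buffer

        def buffer_updated(self, nbytes):
            # Only the first nbytes bytes of the buffer hold fresh data.
            self.received.extend(self.buffer[:nbytes])

        def connection_lost(self, exc):
            if not self.closed.done():
                self.closed.set_result(exc)


    async def main(payload):
        loop = asyncio.get_running_loop()
        closed = loop.create_future()
        ours, theirs = socket.socketpair()
        transport, protocol = await loop.create_connection(
            lambda: FixedBufferProtocol(closed), sock=ours)
        try:
            theirs.sendall(payload)
            theirs.shutdown(socket.SHUT_WR)    # EOF ends the connection
            await asyncio.wait_for(closed, timeout=5)
            return bytes(protocol.received)
        finally:
            transport.close()
            theirs.close()


    if __name__ == '__main__':
        print(asyncio.run(main(b'abc' * 50)))

With a payload larger than the buffer, ``get_buffer()`` and
``buffer_updated()`` are simply called several times in turn.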
    641 
Datagram Protocols
------------------

Datagram Protocol instances should be constructed by protocol
factories passed to the :meth:`loop.create_datagram_endpoint` method.

.. method:: DatagramProtocol.datagram_received(data, addr)

   Called when a datagram is received.  *data* is a bytes object containing
   the incoming data.  *addr* is the address of the peer sending the data;
   the exact format depends on the transport.

.. method:: DatagramProtocol.error_received(exc)

   Called when a previous send or receive operation raises an
   :class:`OSError`.  *exc* is the :class:`OSError` instance.

   This method is called in rare conditions, when the transport (e.g. UDP)
   detects that a datagram could not be delivered to its recipient.
   In many conditions though, undeliverable datagrams will be silently
   dropped.

.. note::

   On BSD systems (macOS, FreeBSD, etc.) flow control is not supported
   for datagram protocols, because there is no reliable way to detect send
   failures caused by writing too many packets.

   The socket always appears 'ready' and excess packets are dropped. An
   :class:`OSError` with ``errno`` set to :const:`errno.ENOBUFS` may
   or may not be raised; if it is raised, it will be reported to
   :meth:`DatagramProtocol.error_received` but otherwise ignored.
    674 
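As a rough sketch of both datagram callbacks working together, the snippet
below runs an echo endpoint and a one-shot client in the same event loop.
The ``EchoServer`` and ``OneShotClient`` classes are hypothetical, and the
example assumes that UDP traffic on the loopback interface is available.

.. code-block:: python

    import asyncio


    class EchoServer(asyncio.DatagramProtocol):
        """Hypothetical server protocol that returns each datagram."""

        def connection_made(self, transport):
            self.transport = transport

        def datagram_received(self, data, addr):
            self.transport.sendto(data, addr)    # reply to the sender


    class OneShotClient(asyncio.DatagramProtocol):
        """Hypothetical client that sends one message and awaits the echo."""

        def __init__(self, message, reply):
            self.message = message
            self.reply = reply

        def connection_made(self, transport):
            transport.sendto(self.message)       # addr=None: use remote_addr

        def datagram_received(self, data, addr):
            if not self.reply.done():
                self.reply.set_result(data)

        def error_received(self, exc):
            if not self.reply.done():
                self.reply.set_exception(exc)


    async def main():
        loop = asyncio.get_running_loop()
        reply = loop.create_future()
        # Port 0 asks the operating system for any free port.
        server, _ = await loop.create_datagram_endpoint(
            EchoServer, local_addr=('127.0.0.1', 0))
        port = server.get_extra_info('sockname')[1]
        client, _ = await loop.create_datagram_endpoint(
            lambda: OneShotClient(b'ping', reply),
            remote_addr=('127.0.0.1', port))
        try:
            return await asyncio.wait_for(reply, timeout=5)
        finally:
            client.close()
            server.close()


    if __name__ == '__main__':
        print(asyncio.run(main()))

The timeout matters because a lost datagram is usually dropped silently
rather than reported through ``error_received()``.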
    675 
.. _asyncio-subprocess-protocols:

Subprocess Protocols
--------------------

Subprocess Protocol instances should be constructed by protocol
factories passed to the :meth:`loop.subprocess_exec` and
:meth:`loop.subprocess_shell` methods.

.. method:: SubprocessProtocol.pipe_data_received(fd, data)

   Called when the child process writes data into its stdout or stderr
   pipe.

   *fd* is the integer file descriptor of the pipe.

   *data* is a non-empty bytes object containing the received data.

.. method:: SubprocessProtocol.pipe_connection_lost(fd, exc)

   Called when one of the pipes communicating with the child process
   is closed.

   *fd* is the integer file descriptor that was closed.

.. method:: SubprocessProtocol.process_exited()

   Called when the child process has exited.
    704 
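The three subprocess callbacks can be combined as in the sketch below,
which keeps the output of each pipe separate.  The ``PerPipeCollector``
class and the one-line child program are assumptions for illustration.
The protocol waits for both the pipes and the process, since
``process_exited()`` may be called before the pipes have been drained.

.. code-block:: python

    import asyncio
    import sys


    class PerPipeCollector(asyncio.SubprocessProtocol):
        """Hypothetical protocol that stores output per file descriptor."""

        def __init__(self, done):
            self.done = done
            self.data = {1: bytearray(), 2: bytearray()}
            self.open_pipes = {1, 2}
            self.exited = False

        def pipe_data_received(self, fd, data):
            self.data[fd].extend(data)

        def pipe_connection_lost(self, fd, exc):
            self.open_pipes.discard(fd)
            self._check()

        def process_exited(self):
            self.exited = True
            self._check()

        def _check(self):
            # Finished only when the process is gone and both pipes closed.
            if self.exited and not self.open_pipes and not self.done.done():
                self.done.set_result(None)


    async def main():
        loop = asyncio.get_running_loop()
        done = loop.create_future()
        code = "import sys; sys.stdout.write('out'); sys.stderr.write('err')"
        # stdout and stderr default to pipes; stdin is inherited here.
        transport, protocol = await loop.subprocess_exec(
            lambda: PerPipeCollector(done), sys.executable, '-c', code,
            stdin=None)
        try:
            await done
            return {fd: bytes(buf) for fd, buf in protocol.data.items()}
        finally:
            transport.close()


    if __name__ == '__main__':
        print(asyncio.run(main()))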
    705 
    706 Examples
    707 ========
    708 
    709 .. _asyncio_example_tcp_echo_server_protocol:
    710 
    711 TCP Echo Server
    712 ---------------
    713 
    714 Create a TCP echo server using the :meth:`loop.create_server` method, send back
    715 received data, and close the connection::
    716 
    717     import asyncio
    718 
    719 
    720     class EchoServerProtocol(asyncio.Protocol):
    721         def connection_made(self, transport):
    722             peername = transport.get_extra_info('peername')
    723             print('Connection from {}'.format(peername))
    724             self.transport = transport
    725 
    726         def data_received(self, data):
    727             message = data.decode()
    728             print('Data received: {!r}'.format(message))
    729 
    730             print('Send: {!r}'.format(message))
    731             self.transport.write(data)
    732 
    733             print('Close the client socket')
    734             self.transport.close()
    735 
    736 
    737     async def main():
    738         # Get a reference to the event loop as we plan to use
    739         # low-level APIs.
    740         loop = asyncio.get_running_loop()
    741 
    742         server = await loop.create_server(
    743             lambda: EchoServerProtocol(),
    744             '127.0.0.1', 8888)
    745 
    746         async with server:
    747             await server.serve_forever()
    748 
    749 
    750     asyncio.run(main())
    751 
    752 
    753 .. seealso::
    754 
    755    The :ref:`TCP echo server using streams <asyncio-tcp-echo-server-streams>`
    756    example uses the high-level :func:`asyncio.start_server` function.
    757 
.. _asyncio_example_tcp_echo_client_protocol:

TCP Echo Client
---------------

A TCP echo client that connects with the :meth:`loop.create_connection`
method, sends data, and waits until the connection is closed::

    import asyncio


    class EchoClientProtocol(asyncio.Protocol):
        def __init__(self, message, on_con_lost):
            self.message = message
            self.on_con_lost = on_con_lost

        def connection_made(self, transport):
            transport.write(self.message.encode())
            print('Data sent: {!r}'.format(self.message))

        def data_received(self, data):
            print('Data received: {!r}'.format(data.decode()))

        def connection_lost(self, exc):
            print('The server closed the connection')
            self.on_con_lost.set_result(True)


    async def main():
        # Get a reference to the event loop as we plan to use
        # low-level APIs.
        loop = asyncio.get_running_loop()

        on_con_lost = loop.create_future()
        message = 'Hello World!'

        transport, protocol = await loop.create_connection(
            lambda: EchoClientProtocol(message, on_con_lost),
            '127.0.0.1', 8888)

        # Wait until the protocol signals that the connection
        # is lost and close the transport.
        try:
            await on_con_lost
        finally:
            transport.close()


    asyncio.run(main())


.. seealso::

   The :ref:`TCP echo client using streams <asyncio-tcp-echo-client-streams>`
   example uses the high-level :func:`asyncio.open_connection` function.


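The TCP server and client protocols can also be exercised together in a
single process, which is convenient for quick testing.  The following is
only a minimal sketch under stated assumptions: ``EchoServer``,
``EchoClient`` and ``round_trip`` are hypothetical names that do not
appear in the examples above, the server closes the connection after
echoing the first chunk, and port ``0`` is used so that the operating
system picks a free port instead of the fixed ``8888``.

```python
import asyncio


class EchoServer(asyncio.Protocol):
    """Hypothetical server protocol: echo the first chunk, then close."""

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        self.transport.write(data)
        # close() flushes buffered data before closing the socket.
        self.transport.close()


class EchoClient(asyncio.Protocol):
    """Hypothetical client protocol: send one message, collect the reply."""

    def __init__(self, message, on_con_lost):
        self.message = message
        self.on_con_lost = on_con_lost
        self.received = bytearray()

    def connection_made(self, transport):
        transport.write(self.message)

    def data_received(self, data):
        self.received.extend(data)

    def connection_lost(self, exc):
        if not self.on_con_lost.done():
            self.on_con_lost.set_result(True)


async def round_trip(message):
    loop = asyncio.get_running_loop()

    # Port 0 is an assumption of this sketch: the OS chooses a free port.
    server = await loop.create_server(EchoServer, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]

    on_con_lost = loop.create_future()
    async with server:
        transport, protocol = await loop.create_connection(
            lambda: EchoClient(message, on_con_lost), '127.0.0.1', port)
        try:
            # Resolved by EchoClient.connection_lost().
            await on_con_lost
        finally:
            transport.close()
    return bytes(protocol.received)


reply = asyncio.run(round_trip(b'Hello World!'))
print(reply)
```

Reading the bound port from ``server.sockets`` keeps the sketch
independent of any fixed port number.
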
.. _asyncio-udp-echo-server-protocol:

UDP Echo Server
---------------

A UDP echo server, using the :meth:`loop.create_datagram_endpoint`
method, sends back received data::

    import asyncio


    # Subclassing DatagramProtocol supplies no-op defaults for the
    # callbacks that are not overridden here, such as connection_lost().
    class EchoServerProtocol(asyncio.DatagramProtocol):
        def connection_made(self, transport):
            self.transport = transport

        def datagram_received(self, data, addr):
            message = data.decode()
            print('Received %r from %s' % (message, addr))
            print('Send %r to %s' % (message, addr))
            self.transport.sendto(data, addr)


    async def main():
        print("Starting UDP server")

        # Get a reference to the event loop as we plan to use
        # low-level APIs.
        loop = asyncio.get_running_loop()

        # One protocol instance will be created to serve all
        # client requests.
        transport, protocol = await loop.create_datagram_endpoint(
            lambda: EchoServerProtocol(),
            local_addr=('127.0.0.1', 9999))

        try:
            await asyncio.sleep(3600)  # Serve for 1 hour.
        finally:
            transport.close()


    asyncio.run(main())


.. _asyncio-udp-echo-client-protocol:

UDP Echo Client
---------------

A UDP echo client, using the :meth:`loop.create_datagram_endpoint`
method, sends data and closes the transport when it receives the answer::

    import asyncio


    class EchoClientProtocol(asyncio.DatagramProtocol):
        def __init__(self, message, loop):
            self.message = message
            self.transport = None
            self.on_con_lost = loop.create_future()

        def connection_made(self, transport):
            self.transport = transport
            print('Send:', self.message)
            self.transport.sendto(self.message.encode())

        def datagram_received(self, data, addr):
            print("Received:", data.decode())

            print("Close the socket")
            self.transport.close()

        def error_received(self, exc):
            print('Error received:', exc)

        def connection_lost(self, exc):
            print("Connection closed")
            self.on_con_lost.set_result(True)


    async def main():
        # Get a reference to the event loop as we plan to use
        # low-level APIs.
        loop = asyncio.get_running_loop()

        message = "Hello World!"
        transport, protocol = await loop.create_datagram_endpoint(
            lambda: EchoClientProtocol(message, loop),
            remote_addr=('127.0.0.1', 9999))

        try:
            await protocol.on_con_lost
        finally:
            transport.close()


    asyncio.run(main())


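The UDP server and client can likewise be run side by side in one
process.  This is a minimal sketch rather than part of the examples
above: ``UdpEchoServer``, ``UdpEchoClient`` and ``udp_round_trip`` are
hypothetical names, port ``0`` lets the operating system choose a free
port, and a timeout is added on the assumption that a datagram may be
lost and the reply may never arrive.

```python
import asyncio


class UdpEchoServer(asyncio.DatagramProtocol):
    """Hypothetical server protocol: send each datagram back to its sender."""

    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        self.transport.sendto(data, addr)


class UdpEchoClient(asyncio.DatagramProtocol):
    """Hypothetical client protocol: send one datagram, capture the reply."""

    def __init__(self, message, reply):
        self.message = message
        self.reply = reply  # Future resolved with the echoed payload.

    def connection_made(self, transport):
        transport.sendto(self.message)

    def datagram_received(self, data, addr):
        if not self.reply.done():
            self.reply.set_result(data)

    def error_received(self, exc):
        if not self.reply.done():
            self.reply.set_exception(exc)


async def udp_round_trip(message, timeout=5.0):
    loop = asyncio.get_running_loop()

    # Bind the server to an OS-chosen port and read the port back.
    server_transport, _ = await loop.create_datagram_endpoint(
        UdpEchoServer, local_addr=('127.0.0.1', 0))
    port = server_transport.get_extra_info('sockname')[1]

    reply = loop.create_future()
    client_transport, _ = await loop.create_datagram_endpoint(
        lambda: UdpEchoClient(message, reply),
        remote_addr=('127.0.0.1', port))
    try:
        # UDP gives no delivery guarantee, so do not wait forever.
        return await asyncio.wait_for(reply, timeout)
    finally:
        client_transport.close()
        server_transport.close()


udp_reply = asyncio.run(udp_round_trip(b'Hello World!'))
print(udp_reply)
```

Unlike the TCP case, a single server protocol instance handles every
peer, which is why the sender address is passed to
``datagram_received()`` and reused in ``sendto()``.
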
.. _asyncio_example_create_connection:

Connecting Existing Sockets
---------------------------

Wait until a socket receives data using the
:meth:`loop.create_connection` method with a protocol::

    import asyncio
    import socket


    class MyProtocol(asyncio.Protocol):

        def __init__(self, loop):
            self.transport = None
            self.on_con_lost = loop.create_future()

        def connection_made(self, transport):
            self.transport = transport

        def data_received(self, data):
            print("Received:", data.decode())

            # We are done: close the transport;
            # connection_lost() will be called automatically.
            self.transport.close()

        def connection_lost(self, exc):
            # The socket has been closed
            self.on_con_lost.set_result(True)


    async def main():
        # Get a reference to the event loop as we plan to use
        # low-level APIs.
        loop = asyncio.get_running_loop()

        # Create a pair of connected sockets
        rsock, wsock = socket.socketpair()

        # Register the socket to wait for data.
        transport, protocol = await loop.create_connection(
            lambda: MyProtocol(loop), sock=rsock)

        # Simulate the reception of data from the network.
        loop.call_soon(wsock.send, 'abc'.encode())

        try:
            await protocol.on_con_lost
        finally:
            transport.close()
            wsock.close()


    asyncio.run(main())

.. seealso::

   The :ref:`watch a file descriptor for read events
   <asyncio_example_watch_fd>` example uses the low-level
   :meth:`loop.add_reader` method to register an FD.

   The :ref:`register an open socket to wait for data using streams
   <asyncio_example_create_connection-streams>` example uses high-level streams
   created by the :func:`open_connection` function in a coroutine.

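For comparison, the same wait can be written without a protocol by
registering the socket with :meth:`loop.add_reader`.  This is a rough
sketch and not the referenced example itself: ``wait_for_data`` and
``on_readable`` are hypothetical names, and it assumes a selector-based
event loop (the default on Unix), since ``add_reader()`` is not
supported by the Windows proactor event loop.

```python
import asyncio
import socket


async def wait_for_data():
    loop = asyncio.get_running_loop()

    # Create a pair of connected sockets, as in the protocol version.
    rsock, wsock = socket.socketpair()
    rsock.setblocking(False)
    received = loop.create_future()

    def on_readable():
        # Called by the event loop once rsock has data to read.
        loop.remove_reader(rsock)
        received.set_result(rsock.recv(100))

    loop.add_reader(rsock, on_readable)

    # Simulate the reception of data from the network.
    loop.call_soon(wsock.send, b'abc')
    try:
        return await received
    finally:
        rsock.close()
        wsock.close()


fd_data = asyncio.run(wait_for_data())
print("Received:", fd_data.decode())
```

With ``add_reader()`` the callback must read from the socket and
unregister itself, whereas the transport in the protocol version does
both on the protocol's behalf.
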
.. _asyncio_example_subprocess_proto:

loop.subprocess_exec() and SubprocessProtocol
---------------------------------------------

An example of a subprocess protocol used to get the output of a
subprocess and to wait for the subprocess to exit.

The subprocess is created by the :meth:`loop.subprocess_exec` method::

    import asyncio
    import sys

    class DateProtocol(asyncio.SubprocessProtocol):
        def __init__(self, exit_future):
            self.exit_future = exit_future
            self.output = bytearray()
            self.pipe_closed = False
            self.exited = False

        def pipe_data_received(self, fd, data):
            self.output.extend(data)

        def pipe_connection_lost(self, fd, exc):
            self.pipe_closed = True
            self.check_for_exit()

        def process_exited(self):
            self.exited = True
            # process_exited() can be called before
            # pipe_connection_lost(): wait until both have been
            # called so that no output is missed.
            self.check_for_exit()

        def check_for_exit(self):
            if self.pipe_closed and self.exited:
                self.exit_future.set_result(True)

    async def get_date():
        # Get a reference to the event loop as we plan to use
        # low-level APIs.
        loop = asyncio.get_running_loop()

        code = 'import datetime; print(datetime.datetime.now())'
        exit_future = loop.create_future()

        # Create the subprocess controlled by DateProtocol;
        # redirect the standard output into a pipe.
        transport, protocol = await loop.subprocess_exec(
            lambda: DateProtocol(exit_future),
            sys.executable, '-c', code,
            stdin=None, stderr=None)

        # Wait until the subprocess has exited and its stdout pipe
        # has been closed; the protocol then resolves exit_future.
        await exit_future

        # Close the stdout pipe.
        transport.close()

        # Read the output which was collected by the
        # pipe_data_received() method of the protocol.
        data = bytes(protocol.output)
        return data.decode('ascii').rstrip()

    if sys.platform == "win32":
        asyncio.set_event_loop_policy(
            asyncio.WindowsProactorEventLoopPolicy())

    date = asyncio.run(get_date())
    print(f"Current date: {date}")

See also the :ref:`same example <asyncio_example_create_subprocess_exec>`
written using high-level APIs.

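As a rough illustration of what the high-level version can look like,
the sketch below uses :func:`asyncio.create_subprocess_exec` and
``Process.communicate()`` in place of a protocol class.  It is an
approximation written for this section and not a copy of the referenced
example; ``get_date_high_level`` is a hypothetical name.

```python
import asyncio
import sys


async def get_date_high_level():
    code = 'import datetime; print(datetime.datetime.now())'

    # Create the subprocess; redirect the standard output into a pipe.
    proc = await asyncio.create_subprocess_exec(
        sys.executable, '-c', code,
        stdout=asyncio.subprocess.PIPE)

    # communicate() reads stdout until EOF and waits for the
    # subprocess to exit, so no exit future is needed.
    stdout, _ = await proc.communicate()
    return stdout.decode('ascii').rstrip()


high_level_date = asyncio.run(get_date_high_level())
print(f"Current date: {high_level_date}")
```

The bookkeeping done by the protocol's callbacks in the low-level
version is handled here by ``communicate()``.
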