.. currentmodule:: asyncio

.. _asyncio-streams:

=======
Streams
=======

Streams are high-level async/await-ready primitives to work with
network connections.  Streams allow sending and receiving data without
using callbacks or low-level protocols and transports.

.. _asyncio_example_stream:

Here is an example of a TCP echo client written using asyncio
streams::

    import asyncio

    async def tcp_echo_client(message):
        reader, writer = await asyncio.open_connection(
            '127.0.0.1', 8888)

        print(f'Send: {message!r}')
        writer.write(message.encode())
        await writer.drain()

        data = await reader.read(100)
        print(f'Received: {data.decode()!r}')

        print('Close the connection')
        writer.close()
        await writer.wait_closed()

    asyncio.run(tcp_echo_client('Hello World!'))


See also the `Examples`_ section below.


.. rubric:: Stream Functions

The following top-level asyncio functions can be used to create
and work with streams:


.. coroutinefunction:: open_connection(host=None, port=None, \*, \
                          loop=None, limit=None, ssl=None, family=0, \
                          proto=0, flags=0, sock=None, local_addr=None, \
                          server_hostname=None, ssl_handshake_timeout=None)

   Establish a network connection and return a pair of
   ``(reader, writer)`` objects.

   The returned *reader* and *writer* objects are instances of
   :class:`StreamReader` and :class:`StreamWriter` classes.

   The *loop* argument is optional and can always be determined
   automatically when this function is awaited from a coroutine.

   *limit* determines the buffer size limit used by the
   returned :class:`StreamReader` instance.  By default the *limit*
   is set to 64 KiB.

   The rest of the arguments are passed directly to
   :meth:`loop.create_connection`.

   .. versionadded:: 3.7

      The *ssl_handshake_timeout* parameter.

.. coroutinefunction:: start_server(client_connected_cb, host=None, \
                          port=None, \*, loop=None, limit=None, \
                          family=socket.AF_UNSPEC, \
                          flags=socket.AI_PASSIVE, sock=None, \
                          backlog=100, ssl=None, reuse_address=None, \
                          reuse_port=None, ssl_handshake_timeout=None, \
                          start_serving=True)

   Start a socket server.

   The *client_connected_cb* callback is called whenever a new client
   connection is established.  It receives a ``(reader, writer)`` pair
   as two arguments, instances of the :class:`StreamReader` and
   :class:`StreamWriter` classes.

   *client_connected_cb* can be a plain callable or a
   :ref:`coroutine function <coroutine>`; if it is a coroutine function,
   it will be automatically scheduled as a :class:`Task`.

   The *loop* argument is optional and can always be determined
   automatically when this function is awaited from a coroutine.

   *limit* determines the buffer size limit used by the
   :class:`StreamReader` instance passed to *client_connected_cb*.
   By default the *limit* is set to 64 KiB.

   The rest of the arguments are passed directly to
   :meth:`loop.create_server`.

   .. versionadded:: 3.7

      The *ssl_handshake_timeout* and *start_serving* parameters.

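As a minimal sketch of how :func:`start_server` and :func:`open_connection`
fit together, the following runs a server and a client in the same event
loop.  The ``handle_upper`` and ``roundtrip`` names are hypothetical, and
binding to port ``0`` (so that the operating system picks a free port) is an
assumption of this example, not something either function requires.

```python
import asyncio


async def handle_upper(reader, writer):
    # Coroutine callback: asyncio schedules it as a task per connection.
    line = await reader.readline()
    writer.write(line.upper())
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def roundtrip(message):
    # Port 0 lets the OS choose a free port (assumption of this sketch).
    server = await asyncio.start_server(handle_upper, '127.0.0.1', 0)
    host, port = server.sockets[0].getsockname()[:2]

    async with server:
        reader, writer = await asyncio.open_connection(host, port)
        writer.write(message.encode() + b'\n')
        await writer.drain()
        reply = await reader.readline()
        writer.close()
        await writer.wait_closed()
    return reply.decode().rstrip('\n')


if __name__ == '__main__':
    print(asyncio.run(roundtrip('hello')))
```

Because ``handle_upper`` is a coroutine function, it runs as its own task
for each connection, so the client code in ``roundtrip`` can keep running
in the same loop while the request is served.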

.. rubric:: Unix Sockets

.. coroutinefunction:: open_unix_connection(path=None, \*, loop=None, \
                        limit=None, ssl=None, sock=None, \
                        server_hostname=None, ssl_handshake_timeout=None)

   Establish a Unix socket connection and return a pair of
   ``(reader, writer)``.

   Similar to :func:`open_connection` but operates on Unix sockets.

   See also the documentation of :meth:`loop.create_unix_connection`.

   .. availability:: Unix.

   .. versionadded:: 3.7

      The *ssl_handshake_timeout* parameter.

   .. versionchanged:: 3.7

      The *path* parameter can now be a :term:`path-like object`.


.. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \
                          \*, loop=None, limit=None, sock=None, \
                          backlog=100, ssl=None, ssl_handshake_timeout=None, \
                          start_serving=True)

   Start a Unix socket server.

   Similar to :func:`start_server` but works with Unix sockets.

   See also the documentation of :meth:`loop.create_unix_server`.

   .. availability:: Unix.

   .. versionadded:: 3.7

      The *ssl_handshake_timeout* and *start_serving* parameters.

   .. versionchanged:: 3.7

      The *path* parameter can now be a :term:`path-like object`.

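The Unix socket variants can be sketched in the same way.  This example is
Unix-only and passes a :class:`pathlib.Path` as the *path* argument to
illustrate the path-like support noted above.  The ``unix_roundtrip``
helper, the ``echo.sock`` file name and the use of a temporary directory are
assumptions made for illustration.

```python
import asyncio
import pathlib
import tempfile


async def handle_echo(reader, writer):
    # Send back whatever the client wrote (up to 100 bytes).
    data = await reader.read(100)
    writer.write(data)
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def unix_roundtrip(payload):
    with tempfile.TemporaryDirectory() as tmp:
        # Hypothetical socket location inside a throwaway directory.
        path = pathlib.Path(tmp) / 'echo.sock'
        server = await asyncio.start_unix_server(handle_echo, path=path)
        async with server:
            reader, writer = await asyncio.open_unix_connection(path)
            writer.write(payload)
            await writer.drain()
            data = await reader.read(100)
            writer.close()
            await writer.wait_closed()
        return data


if __name__ == '__main__':
    print(asyncio.run(unix_roundtrip(b'ping')))
```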

---------


StreamReader
============

.. class:: StreamReader

   Represents a reader object that provides APIs to read data
   from the IO stream.

   It is not recommended to instantiate *StreamReader* objects
   directly; use :func:`open_connection` and :func:`start_server`
   instead.

   .. coroutinemethod:: read(n=-1)

      Read up to *n* bytes.  If *n* is not provided, or set to ``-1``,
      read until EOF and return all read bytes.

      If EOF was received and the internal buffer is empty,
      return an empty ``bytes`` object.

   .. coroutinemethod:: readline()

      Read one line, where "line" is a sequence of bytes
      ending with ``\n``.

      If EOF is received and ``\n`` was not found, the method
      returns partially read data.

      If EOF is received and the internal buffer is empty,
      return an empty ``bytes`` object.

   .. coroutinemethod:: readexactly(n)

      Read exactly *n* bytes.

      Raise an :exc:`IncompleteReadError` if EOF is reached before *n*
      bytes can be read.  Use the :attr:`IncompleteReadError.partial`
      attribute to get the partially read data.

   .. coroutinemethod:: readuntil(separator=b'\\n')

      Read data from the stream until *separator* is found.

      On success, the data and separator will be removed from the
      internal buffer (consumed). Returned data will include the
      separator at the end.

      If the amount of data read exceeds the configured stream limit, a
      :exc:`LimitOverrunError` exception is raised, and the data
      is left in the internal buffer and can be read again.

      If EOF is reached before the complete separator is found,
      an :exc:`IncompleteReadError` exception is raised, and the internal
      buffer is reset.  The :attr:`IncompleteReadError.partial` attribute
      may contain a portion of the separator.

      .. versionadded:: 3.5.2

   .. method:: at_eof()

      Return ``True`` if the buffer is empty and :meth:`feed_eof`
      was called.

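The error behaviour of :meth:`StreamReader.readexactly` and
:meth:`StreamReader.readuntil` described above can be illustrated with a
connected socket pair, in the spirit of the socket example at the end of
this page.  This is only a sketch: the ``open_pair``, ``read_frames`` and
``read_records`` helpers, the 2-byte length prefix and the ``;`` separator
are all assumptions chosen for the demonstration.

```python
import asyncio
import socket
import struct


async def open_pair(limit=2 ** 16):
    # Wrap one end of a socket pair in streams; return the raw peer too.
    rsock, wsock = socket.socketpair()
    reader, writer = await asyncio.open_connection(sock=rsock, limit=limit)
    return reader, writer, wsock


async def read_frames():
    # Length-prefixed frames read with readexactly().
    reader, writer, peer = await open_pair()
    peer.sendall(struct.pack('>H', 2) + b'hi')
    peer.sendall(struct.pack('>H', 5) + b'world')
    # Truncated frame: announces 10 bytes but only 3 are sent before EOF.
    peer.sendall(struct.pack('>H', 10) + b'abc')
    peer.close()

    frames, partial = [], None
    while not reader.at_eof():
        try:
            (size,) = struct.unpack('>H', await reader.readexactly(2))
            frames.append(await reader.readexactly(size))
        except asyncio.IncompleteReadError as exc:
            partial = exc.partial  # bytes that arrived before EOF
    writer.close()
    await writer.wait_closed()
    return frames, partial


async def read_records():
    # Separator-delimited records read with readuntil() and a small limit.
    reader, writer, peer = await open_pair(limit=16)
    peer.sendall(b'one;two;' + b'x' * 32)
    peer.close()

    records = [await reader.readuntil(b';'), await reader.readuntil(b';')]
    leftover = None
    try:
        await reader.readuntil(b';')
    except asyncio.LimitOverrunError:
        # No separator within the limit: the data is still buffered,
        # so it can be fetched with a plain read() until EOF.
        leftover = await reader.read()
    writer.close()
    await writer.wait_closed()
    return records, leftover


if __name__ == '__main__':
    print(asyncio.run(read_frames()))
    print(asyncio.run(read_records()))
```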

StreamWriter
============

.. class:: StreamWriter

   Represents a writer object that provides APIs to write data
   to the IO stream.

   It is not recommended to instantiate *StreamWriter* objects
   directly; use :func:`open_connection` and :func:`start_server`
   instead.

   .. method:: can_write_eof()

      Return ``True`` if the underlying transport supports
      the :meth:`write_eof` method, ``False`` otherwise.

   .. method:: write_eof()

      Close the write end of the stream after the buffered write
      data is flushed.

   .. attribute:: transport

      Return the underlying asyncio transport.

   .. method:: get_extra_info(name, default=None)

      Access optional transport information; see
      :meth:`BaseTransport.get_extra_info` for details.

   .. method:: write(data)

      Write *data* to the stream.

      This method is not subject to flow control.  Calls to ``write()`` should
      be followed by :meth:`drain`.

   .. method:: writelines(data)

      Write a list (or any iterable) of bytes to the stream.

      This method is not subject to flow control. Calls to ``writelines()``
      should be followed by :meth:`drain`.

   .. coroutinemethod:: drain()

      Wait until it is appropriate to resume writing to the stream.
      Example::

          writer.write(data)
          await writer.drain()

      This is a flow control method that interacts with the underlying
      IO write buffer.  When the size of the buffer reaches
      the high watermark, ``drain()`` blocks until the size of the
      buffer is drained down to the low watermark and writing can
      be resumed.  When there is nothing to wait for, ``drain()``
      returns immediately.

   .. method:: close()

      Close the stream.

   .. method:: is_closing()

      Return ``True`` if the stream is closed or in the process of
      being closed.

      .. versionadded:: 3.7

   .. coroutinemethod:: wait_closed()

      Wait until the stream is closed.

      Should be called after :meth:`close` to wait until the underlying
      connection is closed.

      .. versionadded:: 3.7

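The following sketch combines several :class:`StreamWriter` methods:
``writelines()`` and ``write()`` followed by ``drain()``, a half-close with
``write_eof()``, and ``close()`` followed by ``wait_closed()``.  Both ends
of a socket pair are wrapped in streams so the example needs no network.
The ``half_close_demo`` name and the byte-count reply are hypothetical
choices made for illustration.

```python
import asyncio
import socket


async def half_close_demo():
    a, b = socket.socketpair()
    reader_a, writer_a = await asyncio.open_connection(sock=a)
    reader_b, writer_b = await asyncio.open_connection(sock=b)

    # Side A: queue several chunks, then wait for the buffer to flush.
    writer_a.writelines([b'alpha\n', b'beta\n'])
    writer_a.write(b'gamma\n')
    await writer_a.drain()
    # Signal "no more data" while keeping the read direction open.
    if writer_a.can_write_eof():
        writer_a.write_eof()

    # Side B: read until EOF, answer with the number of bytes seen.
    received = await reader_b.read()
    writer_b.write(str(len(received)).encode())
    await writer_b.drain()
    writer_b.close()
    await writer_b.wait_closed()

    # Side A can still read the reply after its own write_eof().
    reply = await reader_a.read()
    writer_a.close()
    await writer_a.wait_closed()
    return received, reply


if __name__ == '__main__':
    print(asyncio.run(half_close_demo()))
```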

Examples
========

.. _asyncio-tcp-echo-client-streams:

TCP echo client using streams
-----------------------------

TCP echo client using the :func:`asyncio.open_connection` function::

    import asyncio

    async def tcp_echo_client(message):
        reader, writer = await asyncio.open_connection(
            '127.0.0.1', 8888)

        print(f'Send: {message!r}')
        writer.write(message.encode())
        await writer.drain()

        data = await reader.read(100)
        print(f'Received: {data.decode()!r}')

        print('Close the connection')
        writer.close()
        await writer.wait_closed()

    asyncio.run(tcp_echo_client('Hello World!'))


.. seealso::

   The :ref:`TCP echo client protocol <asyncio_example_tcp_echo_client_protocol>`
   example uses the low-level :meth:`loop.create_connection` method.


.. _asyncio-tcp-echo-server-streams:

TCP echo server using streams
-----------------------------

TCP echo server using the :func:`asyncio.start_server` function::

    import asyncio

    async def handle_echo(reader, writer):
        data = await reader.read(100)
        message = data.decode()
        addr = writer.get_extra_info('peername')

        print(f"Received {message!r} from {addr!r}")

        print(f"Send: {message!r}")
        writer.write(data)
        await writer.drain()

        print("Close the connection")
        writer.close()
        await writer.wait_closed()

    async def main():
        server = await asyncio.start_server(
            handle_echo, '127.0.0.1', 8888)

        addr = server.sockets[0].getsockname()
        print(f'Serving on {addr}')

        async with server:
            await server.serve_forever()

    asyncio.run(main())


.. seealso::

   The :ref:`TCP echo server protocol <asyncio_example_tcp_echo_server_protocol>`
   example uses the :meth:`loop.create_server` method.


Get HTTP headers
----------------

Simple example querying HTTP headers of the URL passed on the command line::

    import asyncio
    import urllib.parse
    import sys

    async def print_http_headers(url):
        url = urllib.parse.urlsplit(url)
        if url.scheme == 'https':
            reader, writer = await asyncio.open_connection(
                url.hostname, 443, ssl=True)
        else:
            reader, writer = await asyncio.open_connection(
                url.hostname, 80)

        query = (
            f"HEAD {url.path or '/'} HTTP/1.0\r\n"
            f"Host: {url.hostname}\r\n"
            f"\r\n"
        )

        writer.write(query.encode('latin-1'))
        while True:
            line = await reader.readline()
            if not line:
                break

            line = line.decode('latin-1').rstrip()
            if line:
                print(f'HTTP header> {line}')

        # Ignore the body, close the socket
        writer.close()
        await writer.wait_closed()

    url = sys.argv[1]
    asyncio.run(print_http_headers(url))


Usage::

    python example.py http://example.com/path/page.html

or with HTTPS::

    python example.py https://example.com/path/page.html


.. _asyncio_example_create_connection-streams:

Register an open socket to wait for data using streams
------------------------------------------------------

Coroutine waiting until a socket receives data using the
:func:`open_connection` function::

    import asyncio
    import socket

    async def wait_for_data():
        # Get a reference to the current event loop because
        # we want to access low-level APIs.
        loop = asyncio.get_running_loop()

        # Create a pair of connected sockets.
        rsock, wsock = socket.socketpair()

        # Register the open socket to wait for data.
        reader, writer = await asyncio.open_connection(sock=rsock)

        # Simulate the reception of data from the network
        loop.call_soon(wsock.send, 'abc'.encode())

        # Wait for data
        data = await reader.read(100)

        # Got data, we are done: close the socket
        print("Received:", data.decode())
        writer.close()
        await writer.wait_closed()

        # Close the second socket
        wsock.close()

    asyncio.run(wait_for_data())

.. seealso::

   The :ref:`register an open socket to wait for data using a protocol
   <asyncio_example_create_connection>` example uses a low-level protocol and
   the :meth:`loop.create_connection` method.

   The :ref:`watch a file descriptor for read events
   <asyncio_example_watch_fd>` example uses the low-level
   :meth:`loop.add_reader` method to watch a file descriptor.

    471