.. currentmodule:: asyncio

.. _asyncio-streams:

=======
Streams
=======

Streams are high-level async/await-ready primitives to work with
network connections.  Streams allow sending and receiving data without
using callbacks or low-level protocols and transports.

.. _asyncio_example_stream:

Here is an example of a TCP echo client written using asyncio
streams::

    import asyncio

    async def tcp_echo_client(message):
        reader, writer = await asyncio.open_connection(
            '127.0.0.1', 8888)

        print(f'Send: {message!r}')
        writer.write(message.encode())

        data = await reader.read(100)
        print(f'Received: {data.decode()!r}')

        print('Close the connection')
        writer.close()
        await writer.wait_closed()

    asyncio.run(tcp_echo_client('Hello World!'))


See also the `Examples`_ section below.


.. rubric:: Stream Functions

The following top-level asyncio functions can be used to create
and work with streams:


.. coroutinefunction:: open_connection(host=None, port=None, \*, \
                          loop=None, limit=None, ssl=None, family=0, \
                          proto=0, flags=0, sock=None, local_addr=None, \
                          server_hostname=None, ssl_handshake_timeout=None)

   Establish a network connection and return a pair of
   ``(reader, writer)`` objects.

   The returned *reader* and *writer* objects are instances of the
   :class:`StreamReader` and :class:`StreamWriter` classes.

   The *loop* argument is optional and can always be determined
   automatically when this function is awaited from a coroutine.

   *limit* determines the buffer size limit used by the
   returned :class:`StreamReader` instance.  By default the *limit*
   is set to 64 KiB.

   The rest of the arguments are passed directly to
   :meth:`loop.create_connection`.

   .. versionadded:: 3.7

      The *ssl_handshake_timeout* parameter.

.. coroutinefunction:: start_server(client_connected_cb, host=None, \
                          port=None, \*, loop=None, limit=None, \
                          family=socket.AF_UNSPEC, \
                          flags=socket.AI_PASSIVE, sock=None, \
                          backlog=100, ssl=None, reuse_address=None, \
                          reuse_port=None, ssl_handshake_timeout=None, \
                          start_serving=True)

   Start a socket server.

   The *client_connected_cb* callback is called whenever a new client
   connection is established.  It receives a ``(reader, writer)`` pair
   as two arguments, instances of the :class:`StreamReader` and
   :class:`StreamWriter` classes.

   *client_connected_cb* can be a plain callable or a
   :ref:`coroutine function <coroutine>`; if it is a coroutine function,
   it will be automatically scheduled as a :class:`Task`.

   The *loop* argument is optional and can always be determined
   automatically when this function is awaited from a coroutine.

   *limit* determines the buffer size limit used by the
   :class:`StreamReader` instance passed to *client_connected_cb*.
   By default the *limit* is set to 64 KiB.

   The rest of the arguments are passed directly to
   :meth:`loop.create_server`.

   .. versionadded:: 3.7

      The *ssl_handshake_timeout* and *start_serving* parameters.


.. rubric:: Unix Sockets

.. coroutinefunction:: open_unix_connection(path=None, \*, loop=None, \
                        limit=None, ssl=None, sock=None, \
                        server_hostname=None, ssl_handshake_timeout=None)

   Establish a Unix socket connection and return a pair of
   ``(reader, writer)`` objects.

   Similar to :func:`open_connection` but operates on Unix sockets.

   See also the documentation of :meth:`loop.create_unix_connection`.

   .. availability:: Unix.

   .. versionadded:: 3.7

      The *ssl_handshake_timeout* parameter.

   .. versionchanged:: 3.7

      The *path* parameter can now be a :term:`path-like object`.


.. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \
                          \*, loop=None, limit=None, sock=None, \
                          backlog=100, ssl=None, ssl_handshake_timeout=None, \
                          start_serving=True)

   Start a Unix socket server.

   Similar to :func:`start_server` but works with Unix sockets.

   See also the documentation of :meth:`loop.create_unix_server`.

   .. availability:: Unix.

   .. versionadded:: 3.7

      The *ssl_handshake_timeout* and *start_serving* parameters.

   .. versionchanged:: 3.7

      The *path* parameter can now be a :term:`path-like object`.


---------


StreamReader
============

.. class:: StreamReader

   Represents a reader object that provides APIs to read data
   from the IO stream.

   It is not recommended to instantiate *StreamReader* objects
   directly; use :func:`open_connection` and :func:`start_server`
   instead.

   .. coroutinemethod:: read(n=-1)

      Read up to *n* bytes.  If *n* is not provided, or set to ``-1``,
      read until EOF and return all read bytes.

      If EOF was received and the internal buffer is empty,
      return an empty ``bytes`` object.

   .. coroutinemethod:: readline()

      Read one line, where "line" is a sequence of bytes
      ending with ``\n``.

      If EOF is received and ``\n`` was not found, the method
      returns partially read data.

      If EOF is received and the internal buffer is empty,
      return an empty ``bytes`` object.

   .. coroutinemethod:: readexactly(n)

      Read exactly *n* bytes.

      Raise an :exc:`IncompleteReadError` if EOF is reached before *n*
      bytes can be read.  Use the :attr:`IncompleteReadError.partial`
      attribute to get the partially read data.

   .. coroutinemethod:: readuntil(separator=b'\\n')

      Read data from the stream until *separator* is found.

      On success, the data and separator will be removed from the
      internal buffer (consumed).  Returned data will include the
      separator at the end.

      If the amount of data read exceeds the configured stream limit, a
      :exc:`LimitOverrunError` exception is raised, and the data
      is left in the internal buffer and can be read again.

      If EOF is reached before the complete separator is found,
      an :exc:`IncompleteReadError` exception is raised, and the internal
      buffer is reset.  The :attr:`IncompleteReadError.partial` attribute
      may contain a portion of the separator.

      .. versionadded:: 3.5.2

   .. method:: at_eof()

      Return ``True`` if the buffer is empty and :meth:`feed_eof`
      was called.


StreamWriter
============

.. class:: StreamWriter

   Represents a writer object that provides APIs to write data
   to the IO stream.

   It is not recommended to instantiate *StreamWriter* objects
   directly; use :func:`open_connection` and :func:`start_server`
   instead.

   .. method:: can_write_eof()

      Return ``True`` if the underlying transport supports
      the :meth:`write_eof` method, ``False`` otherwise.

   .. method:: write_eof()

      Close the write end of the stream after the buffered write
      data is flushed.

   .. attribute:: transport

      Return the underlying asyncio transport.

   .. method:: get_extra_info(name, default=None)

      Access optional transport information; see
      :meth:`BaseTransport.get_extra_info` for details.

   .. method:: write(data)

      Write *data* to the stream.

      This method is not subject to flow control.  Calls to ``write()`` should
      be followed by :meth:`drain`.

   .. method:: writelines(data)

      Write a list (or any iterable) of bytes to the stream.

      This method is not subject to flow control.  Calls to ``writelines()``
      should be followed by :meth:`drain`.

   .. coroutinemethod:: drain()

      Wait until it is appropriate to resume writing to the stream.
      Example::

          writer.write(data)
          await writer.drain()

      This is a flow control method that interacts with the underlying
      IO write buffer.  When the size of the buffer reaches
      the high watermark, *drain()* blocks until the size of the
      buffer is drained down to the low watermark and writing can
      be resumed.  When there is nothing to wait for, :meth:`drain`
      returns immediately.

   .. method:: close()

      Close the stream.

   .. method:: is_closing()

      Return ``True`` if the stream is closed or in the process of
      being closed.

      .. versionadded:: 3.7

   .. coroutinemethod:: wait_closed()

      Wait until the stream is closed.

      Should be called after :meth:`close` to wait until the underlying
      connection is closed.

      .. versionadded:: 3.7


Examples
========

.. _asyncio-tcp-echo-client-streams:

TCP echo client using streams
-----------------------------

TCP echo client using the :func:`asyncio.open_connection` function::

    import asyncio

    async def tcp_echo_client(message):
        reader, writer = await asyncio.open_connection(
            '127.0.0.1', 8888)

        print(f'Send: {message!r}')
        writer.write(message.encode())

        data = await reader.read(100)
        print(f'Received: {data.decode()!r}')

        print('Close the connection')
        writer.close()
        await writer.wait_closed()

    asyncio.run(tcp_echo_client('Hello World!'))


.. seealso::

   The :ref:`TCP echo client protocol <asyncio_example_tcp_echo_client_protocol>`
   example uses the low-level :meth:`loop.create_connection` method.


.. _asyncio-tcp-echo-server-streams:

TCP echo server using streams
-----------------------------

TCP echo server using the :func:`asyncio.start_server` function::

    import asyncio

    async def handle_echo(reader, writer):
        data = await reader.read(100)
        message = data.decode()
        addr = writer.get_extra_info('peername')

        print(f"Received {message!r} from {addr!r}")

        print(f"Send: {message!r}")
        writer.write(data)
        await writer.drain()

        print("Close the connection")
        writer.close()
        await writer.wait_closed()

    async def main():
        server = await asyncio.start_server(
            handle_echo, '127.0.0.1', 8888)

        addr = server.sockets[0].getsockname()
        print(f'Serving on {addr}')

        async with server:
            await server.serve_forever()

    asyncio.run(main())


.. seealso::

   The :ref:`TCP echo server protocol <asyncio_example_tcp_echo_server_protocol>`
   example uses the :meth:`loop.create_server` method.


Get HTTP headers
----------------

Simple example querying HTTP headers of the URL passed on the command line::

    import asyncio
    import urllib.parse
    import sys

    async def print_http_headers(url):
        url = urllib.parse.urlsplit(url)
        if url.scheme == 'https':
            reader, writer = await asyncio.open_connection(
                url.hostname, 443, ssl=True)
        else:
            reader, writer = await asyncio.open_connection(
                url.hostname, 80)

        query = (
            f"HEAD {url.path or '/'} HTTP/1.0\r\n"
            f"Host: {url.hostname}\r\n"
            f"\r\n"
        )

        writer.write(query.encode('latin-1'))
        while True:
            line = await reader.readline()
            if not line:
                break

            line = line.decode('latin-1').rstrip()
            if line:
                print(f'HTTP header> {line}')

        # Ignore the body, close the socket
        writer.close()
        await writer.wait_closed()

    url = sys.argv[1]
    asyncio.run(print_http_headers(url))


Usage::

    python example.py http://example.com/path/page.html

or with HTTPS::

    python example.py https://example.com/path/page.html


.. _asyncio_example_create_connection-streams:

Register an open socket to wait for data using streams
------------------------------------------------------

Coroutine waiting until a socket receives data using the
:func:`open_connection` function::

    import asyncio
    import socket

    async def wait_for_data():
        # Get a reference to the current event loop because
        # we want to access low-level APIs.
        loop = asyncio.get_running_loop()

        # Create a pair of connected sockets.
        rsock, wsock = socket.socketpair()

        # Register the open socket to wait for data.
        reader, writer = await asyncio.open_connection(sock=rsock)

        # Simulate the reception of data from the network
        loop.call_soon(wsock.send, 'abc'.encode())

        # Wait for data
        data = await reader.read(100)

        # Got data, we are done: close the socket
        print("Received:", data.decode())
        writer.close()
        await writer.wait_closed()

        # Close the second socket
        wsock.close()

    asyncio.run(wait_for_data())

.. seealso::

   The :ref:`register an open socket to wait for data using a protocol
   <asyncio_example_create_connection>` example uses a low-level protocol and
   the :meth:`loop.create_connection` method.

   The :ref:`watch a file descriptor for read events
   <asyncio_example_watch_fd>` example uses the low-level
   :meth:`loop.add_reader` method to watch a file descriptor.
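

Length-prefixed messages using streams
--------------------------------------

The :meth:`StreamReader.readexactly` and :meth:`StreamWriter.drain`
methods described above are often combined to exchange discrete messages
over a byte stream.  The following is a minimal sketch rather than a
definitive implementation: the 4-byte big-endian length prefix and the
helper names ``send_msg``, ``recv_msg`` and ``handle_client`` are
assumptions made for illustration and are not part of asyncio::

    import asyncio
    import struct

    async def send_msg(writer, payload):
        # Assumed framing: a 4-byte big-endian length, then the payload.
        writer.write(struct.pack('>I', len(payload)) + payload)
        # write() is not subject to flow control, so follow it with drain().
        await writer.drain()

    async def recv_msg(reader):
        # readexactly() raises IncompleteReadError if EOF arrives
        # before the requested number of bytes has been read.
        header = await reader.readexactly(4)
        (length,) = struct.unpack('>I', header)
        return await reader.readexactly(length)

    async def handle_client(reader, writer):
        try:
            while True:
                payload = await recv_msg(reader)
                # Reply with the upper-cased payload.
                await send_msg(writer, payload.upper())
        except asyncio.IncompleteReadError:
            # The peer closed the connection; stop serving it.
            pass
        finally:
            writer.close()
            await writer.wait_closed()

    async def main():
        # Port 0 lets the operating system pick a free port.
        server = await asyncio.start_server(handle_client, '127.0.0.1', 0)
        port = server.sockets[0].getsockname()[1]

        async with server:
            reader, writer = await asyncio.open_connection(
                '127.0.0.1', port)

            replies = []
            for text in ('hello', 'streams'):
                await send_msg(writer, text.encode())
                replies.append((await recv_msg(reader)).decode())

            writer.close()
            await writer.wait_closed()

        return replies

    print(asyncio.run(main()))

Because the server and the client run in the same event loop and the port
is chosen by the operating system, the sketch can be run on its own
without starting a separate server.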