1 .. currentmodule:: asyncio 2 3 ++++++++++++++++++++++++++++++++++++++++++++++ 4 Transports and protocols (callback based API) 5 ++++++++++++++++++++++++++++++++++++++++++++++ 6 7 .. _asyncio-transport: 8 9 Transports 10 ========== 11 12 Transports are classes provided by :mod:`asyncio` in order to abstract 13 various kinds of communication channels. You generally won't instantiate 14 a transport yourself; instead, you will call an :class:`AbstractEventLoop` method 15 which will create the transport and try to initiate the underlying 16 communication channel, calling you back when it succeeds. 17 18 Once the communication channel is established, a transport is always 19 paired with a :ref:`protocol <asyncio-protocol>` instance. The protocol can 20 then call the transport's methods for various purposes. 21 22 :mod:`asyncio` currently implements transports for TCP, UDP, SSL, and 23 subprocess pipes. The methods available on a transport depend on 24 the transport's kind. 25 26 The transport classes are :ref:`not thread safe <asyncio-multithreading>`. 27 28 .. versionchanged:: 3.6 29 The socket option ``TCP_NODELAY`` is now set by default. 30 31 32 BaseTransport 33 ------------- 34 35 .. class:: BaseTransport 36 37 Base class for transports. 38 39 .. method:: close() 40 41 Close the transport. If the transport has a buffer for outgoing 42 data, buffered data will be flushed asynchronously. No more data 43 will be received. After all buffered data is flushed, the 44 protocol's :meth:`connection_lost` method will be called with 45 :const:`None` as its argument. 46 47 .. method:: is_closing() 48 49 Return ``True`` if the transport is closing or is closed. 50 51 .. versionadded:: 3.5.1 52 53 .. method:: get_extra_info(name, default=None) 54 55 Return optional transport information. *name* is a string representing 56 the piece of transport-specific information to get, *default* is the 57 value to return if the information doesn't exist. 58 59 This method allows transport implementations to easily expose 60 channel-specific information. 61 62 * socket: 63 64 - ``'peername'``: the remote address to which the socket is connected, 65 result of :meth:`socket.socket.getpeername` (``None`` on error) 66 - ``'socket'``: :class:`socket.socket` instance 67 - ``'sockname'``: the socket's own address, 68 result of :meth:`socket.socket.getsockname` 69 70 * SSL socket: 71 72 - ``'compression'``: the compression algorithm being used as a string, 73 or ``None`` if the connection isn't compressed; result of 74 :meth:`ssl.SSLSocket.compression` 75 - ``'cipher'``: a three-value tuple containing the name of the cipher 76 being used, the version of the SSL protocol that defines its use, and 77 the number of secret bits being used; result of 78 :meth:`ssl.SSLSocket.cipher` 79 - ``'peercert'``: peer certificate; result of 80 :meth:`ssl.SSLSocket.getpeercert` 81 - ``'sslcontext'``: :class:`ssl.SSLContext` instance 82 - ``'ssl_object'``: :class:`ssl.SSLObject` or :class:`ssl.SSLSocket` 83 instance 84 85 * pipe: 86 87 - ``'pipe'``: pipe object 88 89 * subprocess: 90 91 - ``'subprocess'``: :class:`subprocess.Popen` instance 92 93 .. method:: set_protocol(protocol) 94 95 Set a new protocol. Switching protocol should only be done when both 96 protocols are documented to support the switch. 97 98 .. versionadded:: 3.5.3 99 100 .. method:: get_protocol 101 102 Return the current protocol. 103 104 .. versionadded:: 3.5.3 105 106 .. versionchanged:: 3.5.1 107 ``'ssl_object'`` info was added to SSL sockets. 108 109 110 ReadTransport 111 ------------- 112 113 .. class:: ReadTransport 114 115 Interface for read-only transports. 116 117 .. method:: pause_reading() 118 119 Pause the receiving end of the transport. No data will be passed to 120 the protocol's :meth:`data_received` method until :meth:`resume_reading` 121 is called. 122 123 .. method:: resume_reading() 124 125 Resume the receiving end. The protocol's :meth:`data_received` method 126 will be called once again if some data is available for reading. 127 128 129 WriteTransport 130 -------------- 131 132 .. class:: WriteTransport 133 134 Interface for write-only transports. 135 136 .. method:: abort() 137 138 Close the transport immediately, without waiting for pending operations 139 to complete. Buffered data will be lost. No more data will be received. 140 The protocol's :meth:`connection_lost` method will eventually be 141 called with :const:`None` as its argument. 142 143 .. method:: can_write_eof() 144 145 Return :const:`True` if the transport supports :meth:`write_eof`, 146 :const:`False` if not. 147 148 .. method:: get_write_buffer_size() 149 150 Return the current size of the output buffer used by the transport. 151 152 .. method:: get_write_buffer_limits() 153 154 Get the *high*- and *low*-water limits for write flow control. Return a 155 tuple ``(low, high)`` where *low* and *high* are positive number of 156 bytes. 157 158 Use :meth:`set_write_buffer_limits` to set the limits. 159 160 .. versionadded:: 3.4.2 161 162 .. method:: set_write_buffer_limits(high=None, low=None) 163 164 Set the *high*- and *low*-water limits for write flow control. 165 166 These two values control when call the protocol's 167 :meth:`pause_writing` and :meth:`resume_writing` methods are called. 168 If specified, the low-water limit must be less than or equal to the 169 high-water limit. Neither *high* nor *low* can be negative. 170 171 The defaults are implementation-specific. If only the 172 high-water limit is given, the low-water limit defaults to an 173 implementation-specific value less than or equal to the 174 high-water limit. Setting *high* to zero forces *low* to zero as 175 well, and causes :meth:`pause_writing` to be called whenever the 176 buffer becomes non-empty. Setting *low* to zero causes 177 :meth:`resume_writing` to be called only once the buffer is empty. 178 Use of zero for either limit is generally sub-optimal as it 179 reduces opportunities for doing I/O and computation 180 concurrently. 181 182 Use :meth:`get_write_buffer_limits` to get the limits. 183 184 .. method:: write(data) 185 186 Write some *data* bytes to the transport. 187 188 This method does not block; it buffers the data and arranges for it 189 to be sent out asynchronously. 190 191 .. method:: writelines(list_of_data) 192 193 Write a list (or any iterable) of data bytes to the transport. 194 This is functionally equivalent to calling :meth:`write` on each 195 element yielded by the iterable, but may be implemented more efficiently. 196 197 .. method:: write_eof() 198 199 Close the write end of the transport after flushing buffered data. 200 Data may still be received. 201 202 This method can raise :exc:`NotImplementedError` if the transport 203 (e.g. SSL) doesn't support half-closes. 204 205 206 DatagramTransport 207 ----------------- 208 209 .. method:: DatagramTransport.sendto(data, addr=None) 210 211 Send the *data* bytes to the remote peer given by *addr* (a 212 transport-dependent target address). If *addr* is :const:`None`, the 213 data is sent to the target address given on transport creation. 214 215 This method does not block; it buffers the data and arranges for it 216 to be sent out asynchronously. 217 218 .. method:: DatagramTransport.abort() 219 220 Close the transport immediately, without waiting for pending operations 221 to complete. Buffered data will be lost. No more data will be received. 222 The protocol's :meth:`connection_lost` method will eventually be 223 called with :const:`None` as its argument. 224 225 226 BaseSubprocessTransport 227 ----------------------- 228 229 .. class:: BaseSubprocessTransport 230 231 .. method:: get_pid() 232 233 Return the subprocess process id as an integer. 234 235 .. method:: get_pipe_transport(fd) 236 237 Return the transport for the communication pipe corresponding to the 238 integer file descriptor *fd*: 239 240 * ``0``: readable streaming transport of the standard input (*stdin*), 241 or :const:`None` if the subprocess was not created with ``stdin=PIPE`` 242 * ``1``: writable streaming transport of the standard output (*stdout*), 243 or :const:`None` if the subprocess was not created with ``stdout=PIPE`` 244 * ``2``: writable streaming transport of the standard error (*stderr*), 245 or :const:`None` if the subprocess was not created with ``stderr=PIPE`` 246 * other *fd*: :const:`None` 247 248 .. method:: get_returncode() 249 250 Return the subprocess returncode as an integer or :const:`None` 251 if it hasn't returned, similarly to the 252 :attr:`subprocess.Popen.returncode` attribute. 253 254 .. method:: kill() 255 256 Kill the subprocess, as in :meth:`subprocess.Popen.kill`. 257 258 On POSIX systems, the function sends SIGKILL to the subprocess. 259 On Windows, this method is an alias for :meth:`terminate`. 260 261 .. method:: send_signal(signal) 262 263 Send the *signal* number to the subprocess, as in 264 :meth:`subprocess.Popen.send_signal`. 265 266 .. method:: terminate() 267 268 Ask the subprocess to stop, as in :meth:`subprocess.Popen.terminate`. 269 This method is an alias for the :meth:`close` method. 270 271 On POSIX systems, this method sends SIGTERM to the subprocess. 272 On Windows, the Windows API function TerminateProcess() is called to 273 stop the subprocess. 274 275 .. method:: close() 276 277 Ask the subprocess to stop by calling the :meth:`terminate` method if the 278 subprocess hasn't returned yet, and close transports of all pipes 279 (*stdin*, *stdout* and *stderr*). 280 281 282 .. _asyncio-protocol: 283 284 Protocols 285 ========= 286 287 :mod:`asyncio` provides base classes that you can subclass to implement 288 your network protocols. Those classes are used in conjunction with 289 :ref:`transports <asyncio-transport>` (see below): the protocol parses incoming 290 data and asks for the writing of outgoing data, while the transport is 291 responsible for the actual I/O and buffering. 292 293 When subclassing a protocol class, it is recommended you override certain 294 methods. Those methods are callbacks: they will be called by the transport 295 on certain events (for example when some data is received); you shouldn't 296 call them yourself, unless you are implementing a transport. 297 298 .. note:: 299 All callbacks have default implementations, which are empty. Therefore, 300 you only need to implement the callbacks for the events in which you 301 are interested. 302 303 304 Protocol classes 305 ---------------- 306 307 .. class:: Protocol 308 309 The base class for implementing streaming protocols (for use with 310 e.g. TCP and SSL transports). 311 312 .. class:: DatagramProtocol 313 314 The base class for implementing datagram protocols (for use with 315 e.g. UDP transports). 316 317 .. class:: SubprocessProtocol 318 319 The base class for implementing protocols communicating with child 320 processes (through a set of unidirectional pipes). 321 322 323 Connection callbacks 324 -------------------- 325 326 These callbacks may be called on :class:`Protocol`, :class:`DatagramProtocol` 327 and :class:`SubprocessProtocol` instances: 328 329 .. method:: BaseProtocol.connection_made(transport) 330 331 Called when a connection is made. 332 333 The *transport* argument is the transport representing the 334 connection. You are responsible for storing it somewhere 335 (e.g. as an attribute) if you need to. 336 337 .. method:: BaseProtocol.connection_lost(exc) 338 339 Called when the connection is lost or closed. 340 341 The argument is either an exception object or :const:`None`. 342 The latter means a regular EOF is received, or the connection was 343 aborted or closed by this side of the connection. 344 345 :meth:`~BaseProtocol.connection_made` and :meth:`~BaseProtocol.connection_lost` 346 are called exactly once per successful connection. All other callbacks will be 347 called between those two methods, which allows for easier resource management 348 in your protocol implementation. 349 350 The following callbacks may be called only on :class:`SubprocessProtocol` 351 instances: 352 353 .. method:: SubprocessProtocol.pipe_data_received(fd, data) 354 355 Called when the child process writes data into its stdout or stderr pipe. 356 *fd* is the integer file descriptor of the pipe. *data* is a non-empty 357 bytes object containing the data. 358 359 .. method:: SubprocessProtocol.pipe_connection_lost(fd, exc) 360 361 Called when one of the pipes communicating with the child process 362 is closed. *fd* is the integer file descriptor that was closed. 363 364 .. method:: SubprocessProtocol.process_exited() 365 366 Called when the child process has exited. 367 368 369 Streaming protocols 370 ------------------- 371 372 The following callbacks are called on :class:`Protocol` instances: 373 374 .. method:: Protocol.data_received(data) 375 376 Called when some data is received. *data* is a non-empty bytes object 377 containing the incoming data. 378 379 .. note:: 380 Whether the data is buffered, chunked or reassembled depends on 381 the transport. In general, you shouldn't rely on specific semantics 382 and instead make your parsing generic and flexible enough. However, 383 data is always received in the correct order. 384 385 .. method:: Protocol.eof_received() 386 387 Calls when the other end signals it won't send any more data 388 (for example by calling :meth:`write_eof`, if the other end also uses 389 asyncio). 390 391 This method may return a false value (including ``None``), in which case 392 the transport will close itself. Conversely, if this method returns a 393 true value, closing the transport is up to the protocol. Since the 394 default implementation returns ``None``, it implicitly closes the connection. 395 396 .. note:: 397 Some transports such as SSL don't support half-closed connections, 398 in which case returning true from this method will not prevent closing 399 the connection. 400 401 :meth:`data_received` can be called an arbitrary number of times during 402 a connection. However, :meth:`eof_received` is called at most once 403 and, if called, :meth:`data_received` won't be called after it. 404 405 State machine: 406 407 start -> :meth:`~BaseProtocol.connection_made` 408 [-> :meth:`~Protocol.data_received` \*] 409 [-> :meth:`~Protocol.eof_received` ?] 410 -> :meth:`~BaseProtocol.connection_lost` -> end 411 412 413 Datagram protocols 414 ------------------ 415 416 The following callbacks are called on :class:`DatagramProtocol` instances. 417 418 .. method:: DatagramProtocol.datagram_received(data, addr) 419 420 Called when a datagram is received. *data* is a bytes object containing 421 the incoming data. *addr* is the address of the peer sending the data; 422 the exact format depends on the transport. 423 424 .. method:: DatagramProtocol.error_received(exc) 425 426 Called when a previous send or receive operation raises an 427 :class:`OSError`. *exc* is the :class:`OSError` instance. 428 429 This method is called in rare conditions, when the transport (e.g. UDP) 430 detects that a datagram couldn't be delivered to its recipient. 431 In many conditions though, undeliverable datagrams will be silently 432 dropped. 433 434 435 Flow control callbacks 436 ---------------------- 437 438 These callbacks may be called on :class:`Protocol`, 439 :class:`DatagramProtocol` and :class:`SubprocessProtocol` instances: 440 441 .. method:: BaseProtocol.pause_writing() 442 443 Called when the transport's buffer goes over the high-water mark. 444 445 .. method:: BaseProtocol.resume_writing() 446 447 Called when the transport's buffer drains below the low-water mark. 448 449 450 :meth:`pause_writing` and :meth:`resume_writing` calls are paired -- 451 :meth:`pause_writing` is called once when the buffer goes strictly over 452 the high-water mark (even if subsequent writes increases the buffer size 453 even more), and eventually :meth:`resume_writing` is called once when the 454 buffer size reaches the low-water mark. 455 456 .. note:: 457 If the buffer size equals the high-water mark, 458 :meth:`pause_writing` is not called -- it must go strictly over. 459 Conversely, :meth:`resume_writing` is called when the buffer size is 460 equal or lower than the low-water mark. These end conditions 461 are important to ensure that things go as expected when either 462 mark is zero. 463 464 .. note:: 465 On BSD systems (OS X, FreeBSD, etc.) flow control is not supported 466 for :class:`DatagramProtocol`, because send failures caused by 467 writing too many packets cannot be detected easily. The socket 468 always appears 'ready' and excess packets are dropped; an 469 :class:`OSError` with errno set to :const:`errno.ENOBUFS` may or 470 may not be raised; if it is raised, it will be reported to 471 :meth:`DatagramProtocol.error_received` but otherwise ignored. 472 473 474 Coroutines and protocols 475 ------------------------ 476 477 Coroutines can be scheduled in a protocol method using :func:`ensure_future`, 478 but there is no guarantee made about the execution order. Protocols are not 479 aware of coroutines created in protocol methods and so will not wait for them. 480 481 To have a reliable execution order, use :ref:`stream objects <asyncio-streams>` in a 482 coroutine with ``yield from``. For example, the :meth:`StreamWriter.drain` 483 coroutine can be used to wait until the write buffer is flushed. 484 485 486 Protocol examples 487 ================= 488 489 .. _asyncio-tcp-echo-client-protocol: 490 491 TCP echo client protocol 492 ------------------------ 493 494 TCP echo client using the :meth:`AbstractEventLoop.create_connection` method, send 495 data and wait until the connection is closed:: 496 497 import asyncio 498 499 class EchoClientProtocol(asyncio.Protocol): 500 def __init__(self, message, loop): 501 self.message = message 502 self.loop = loop 503 504 def connection_made(self, transport): 505 transport.write(self.message.encode()) 506 print('Data sent: {!r}'.format(self.message)) 507 508 def data_received(self, data): 509 print('Data received: {!r}'.format(data.decode())) 510 511 def connection_lost(self, exc): 512 print('The server closed the connection') 513 print('Stop the event loop') 514 self.loop.stop() 515 516 loop = asyncio.get_event_loop() 517 message = 'Hello World!' 518 coro = loop.create_connection(lambda: EchoClientProtocol(message, loop), 519 '127.0.0.1', 8888) 520 loop.run_until_complete(coro) 521 loop.run_forever() 522 loop.close() 523 524 The event loop is running twice. The 525 :meth:`~AbstractEventLoop.run_until_complete` method is preferred in this short 526 example to raise an exception if the server is not listening, instead of 527 having to write a short coroutine to handle the exception and stop the 528 running loop. At :meth:`~AbstractEventLoop.run_until_complete` exit, the loop is 529 no longer running, so there is no need to stop the loop in case of an error. 530 531 .. seealso:: 532 533 The :ref:`TCP echo client using streams <asyncio-tcp-echo-client-streams>` 534 example uses the :func:`asyncio.open_connection` function. 535 536 537 .. _asyncio-tcp-echo-server-protocol: 538 539 TCP echo server protocol 540 ------------------------ 541 542 TCP echo server using the :meth:`AbstractEventLoop.create_server` method, send back 543 received data and close the connection:: 544 545 import asyncio 546 547 class EchoServerClientProtocol(asyncio.Protocol): 548 def connection_made(self, transport): 549 peername = transport.get_extra_info('peername') 550 print('Connection from {}'.format(peername)) 551 self.transport = transport 552 553 def data_received(self, data): 554 message = data.decode() 555 print('Data received: {!r}'.format(message)) 556 557 print('Send: {!r}'.format(message)) 558 self.transport.write(data) 559 560 print('Close the client socket') 561 self.transport.close() 562 563 loop = asyncio.get_event_loop() 564 # Each client connection will create a new protocol instance 565 coro = loop.create_server(EchoServerClientProtocol, '127.0.0.1', 8888) 566 server = loop.run_until_complete(coro) 567 568 # Serve requests until Ctrl+C is pressed 569 print('Serving on {}'.format(server.sockets[0].getsockname())) 570 try: 571 loop.run_forever() 572 except KeyboardInterrupt: 573 pass 574 575 # Close the server 576 server.close() 577 loop.run_until_complete(server.wait_closed()) 578 loop.close() 579 580 :meth:`Transport.close` can be called immediately after 581 :meth:`WriteTransport.write` even if data are not sent yet on the socket: both 582 methods are asynchronous. ``yield from`` is not needed because these transport 583 methods are not coroutines. 584 585 .. seealso:: 586 587 The :ref:`TCP echo server using streams <asyncio-tcp-echo-server-streams>` 588 example uses the :func:`asyncio.start_server` function. 589 590 591 .. _asyncio-udp-echo-client-protocol: 592 593 UDP echo client protocol 594 ------------------------ 595 596 UDP echo client using the :meth:`AbstractEventLoop.create_datagram_endpoint` 597 method, send data and close the transport when we received the answer:: 598 599 import asyncio 600 601 class EchoClientProtocol: 602 def __init__(self, message, loop): 603 self.message = message 604 self.loop = loop 605 self.transport = None 606 607 def connection_made(self, transport): 608 self.transport = transport 609 print('Send:', self.message) 610 self.transport.sendto(self.message.encode()) 611 612 def datagram_received(self, data, addr): 613 print("Received:", data.decode()) 614 615 print("Close the socket") 616 self.transport.close() 617 618 def error_received(self, exc): 619 print('Error received:', exc) 620 621 def connection_lost(self, exc): 622 print("Socket closed, stop the event loop") 623 loop = asyncio.get_event_loop() 624 loop.stop() 625 626 loop = asyncio.get_event_loop() 627 message = "Hello World!" 628 connect = loop.create_datagram_endpoint( 629 lambda: EchoClientProtocol(message, loop), 630 remote_addr=('127.0.0.1', 9999)) 631 transport, protocol = loop.run_until_complete(connect) 632 loop.run_forever() 633 transport.close() 634 loop.close() 635 636 637 .. _asyncio-udp-echo-server-protocol: 638 639 UDP echo server protocol 640 ------------------------ 641 642 UDP echo server using the :meth:`AbstractEventLoop.create_datagram_endpoint` 643 method, send back received data:: 644 645 import asyncio 646 647 class EchoServerProtocol: 648 def connection_made(self, transport): 649 self.transport = transport 650 651 def datagram_received(self, data, addr): 652 message = data.decode() 653 print('Received %r from %s' % (message, addr)) 654 print('Send %r to %s' % (message, addr)) 655 self.transport.sendto(data, addr) 656 657 loop = asyncio.get_event_loop() 658 print("Starting UDP server") 659 # One protocol instance will be created to serve all client requests 660 listen = loop.create_datagram_endpoint( 661 EchoServerProtocol, local_addr=('127.0.0.1', 9999)) 662 transport, protocol = loop.run_until_complete(listen) 663 664 try: 665 loop.run_forever() 666 except KeyboardInterrupt: 667 pass 668 669 transport.close() 670 loop.close() 671 672 673 .. _asyncio-register-socket: 674 675 Register an open socket to wait for data using a protocol 676 --------------------------------------------------------- 677 678 Wait until a socket receives data using the 679 :meth:`AbstractEventLoop.create_connection` method with a protocol, and then close 680 the event loop :: 681 682 import asyncio 683 try: 684 from socket import socketpair 685 except ImportError: 686 from asyncio.windows_utils import socketpair 687 688 # Create a pair of connected sockets 689 rsock, wsock = socketpair() 690 loop = asyncio.get_event_loop() 691 692 class MyProtocol(asyncio.Protocol): 693 transport = None 694 695 def connection_made(self, transport): 696 self.transport = transport 697 698 def data_received(self, data): 699 print("Received:", data.decode()) 700 701 # We are done: close the transport (it will call connection_lost()) 702 self.transport.close() 703 704 def connection_lost(self, exc): 705 # The socket has been closed, stop the event loop 706 loop.stop() 707 708 # Register the socket to wait for data 709 connect_coro = loop.create_connection(MyProtocol, sock=rsock) 710 transport, protocol = loop.run_until_complete(connect_coro) 711 712 # Simulate the reception of data from the network 713 loop.call_soon(wsock.send, 'abc'.encode()) 714 715 # Run the event loop 716 loop.run_forever() 717 718 # We are done, close sockets and the event loop 719 rsock.close() 720 wsock.close() 721 loop.close() 722 723 .. seealso:: 724 725 The :ref:`watch a file descriptor for read events 726 <asyncio-watch-read-event>` example uses the low-level 727 :meth:`AbstractEventLoop.add_reader` method to register the file descriptor of a 728 socket. 729 730 The :ref:`register an open socket to wait for data using streams 731 <asyncio-register-socket-streams>` example uses high-level streams 732 created by the :func:`open_connection` function in a coroutine. 733