.. currentmodule:: asyncio


.. _asyncio-transports-protocols:


========================
Transports and Protocols
========================

.. rubric:: Preface

Transports and Protocols are used by the **low-level** event loop
APIs such as :meth:`loop.create_connection`.  They use a
callback-based programming style and enable high-performance
implementations of network or IPC protocols (e.g. HTTP).

Essentially, transports and protocols should only be used in
libraries and frameworks and never in high-level asyncio
applications.

This documentation page covers both `Transports`_ and `Protocols`_.

.. rubric:: Introduction

At the highest level, the transport is concerned with *how* bytes
are transmitted, while the protocol determines *which* bytes to
transmit (and to some extent when).

A different way of saying the same thing: a transport is an
abstraction for a socket (or similar I/O endpoint) while a protocol
is an abstraction for an application, from the transport's point
of view.

Yet another view is that the transport and protocol interfaces
together define an abstract interface for using network I/O and
interprocess I/O.

There is always a 1:1 relationship between transport and protocol
objects: the protocol calls transport methods to send data,
while the transport calls protocol methods to pass it data that
has been received.

Most connection-oriented event loop methods
(such as :meth:`loop.create_connection`) accept a
*protocol_factory* argument used to create a *Protocol* object
for an accepted connection, represented by a *Transport* object.
Such methods usually return a tuple of ``(transport, protocol)``.

.. rubric:: Contents

This documentation page contains the following sections:

* The `Transports`_ section documents asyncio :class:`BaseTransport`,
  :class:`ReadTransport`, :class:`WriteTransport`, :class:`Transport`,
  :class:`DatagramTransport`, and :class:`SubprocessTransport`
  classes.

* The `Protocols`_ section documents asyncio :class:`BaseProtocol`,
  :class:`Protocol`, :class:`BufferedProtocol`,
  :class:`DatagramProtocol`, and :class:`SubprocessProtocol` classes.

* The `Examples`_ section showcases how to work with transports,
  protocols, and low-level event loop APIs.


.. _asyncio-transport:

Transports
==========

Transports are classes provided by :mod:`asyncio` in order to abstract
various kinds of communication channels.

Transport objects are always instantiated by an
:ref:`asyncio event loop <asyncio-event-loop>`.

asyncio implements transports for TCP, UDP, SSL, and subprocess pipes.
The methods available on a transport depend on the transport's kind.

The transport classes are :ref:`not thread safe <asyncio-multithreading>`.


Transports Hierarchy
--------------------

.. class:: BaseTransport

   Base class for all transports.  Contains methods that all
   asyncio transports share.

.. class:: WriteTransport(BaseTransport)

   A base transport for write-only connections.

   Instances of the *WriteTransport* class are returned from
   the :meth:`loop.connect_write_pipe` event loop method and
   are also used by subprocess-related methods like
   :meth:`loop.subprocess_exec`.

.. class:: ReadTransport(BaseTransport)

   A base transport for read-only connections.

   Instances of the *ReadTransport* class are returned from
   the :meth:`loop.connect_read_pipe` event loop method and
   are also used by subprocess-related methods like
   :meth:`loop.subprocess_exec`.

.. class:: Transport(WriteTransport, ReadTransport)

   Interface representing a bidirectional transport, such as a
   TCP connection.

   The user does not instantiate a transport directly; they call a
   utility function, passing it a protocol factory and other
   information necessary to create the transport and protocol.

   Instances of the *Transport* class are returned from or used by
   event loop methods like :meth:`loop.create_connection`,
   :meth:`loop.create_unix_connection`,
   :meth:`loop.create_server`, :meth:`loop.sendfile`, etc.


.. class:: DatagramTransport(BaseTransport)

   A transport for datagram (UDP) connections.

   Instances of the *DatagramTransport* class are returned from
   the :meth:`loop.create_datagram_endpoint` event loop method.


.. class:: SubprocessTransport(BaseTransport)

   An abstraction to represent a connection between a parent and its
   child OS process.

   Instances of the *SubprocessTransport* class are returned from
   event loop methods :meth:`loop.subprocess_shell` and
   :meth:`loop.subprocess_exec`.


Base Transport
--------------

.. method:: BaseTransport.close()

   Close the transport.

   If the transport has a buffer for outgoing
   data, buffered data will be flushed asynchronously.  No more data
   will be received.  After all buffered data is flushed, the
   protocol's :meth:`protocol.connection_lost()
   <BaseProtocol.connection_lost>` method will be called with
   :const:`None` as its argument.

.. method:: BaseTransport.is_closing()

   Return ``True`` if the transport is closing or is closed.

.. method:: BaseTransport.get_extra_info(name, default=None)

   Return information about the transport or underlying resources
   it uses.

   *name* is a string representing the piece of transport-specific
   information to get.

   *default* is the value to return if the information is not
   available, or if the transport does not support querying it
   with the given third-party event loop implementation or on the
   current platform.

   For example, the following code attempts to get the underlying
   socket object of the transport::

      sock = transport.get_extra_info('socket')
      if sock is not None:
          print(sock.getsockopt(...))

   Categories of information that can be queried on some transports:

   * socket:

     - ``'peername'``: the remote address to which the socket is
       connected, result of :meth:`socket.socket.getpeername`
       (``None`` on error)

     - ``'socket'``: :class:`socket.socket` instance

     - ``'sockname'``: the socket's own address,
       result of :meth:`socket.socket.getsockname`

   * SSL socket:

     - ``'compression'``: the compression algorithm being used as a
       string, or ``None`` if the connection isn't compressed; result
       of :meth:`ssl.SSLSocket.compression`

     - ``'cipher'``: a three-value tuple containing the name of the
       cipher being used, the version of the SSL protocol that defines
       its use, and the number of secret bits being used; result of
       :meth:`ssl.SSLSocket.cipher`

     - ``'peercert'``: peer certificate; result of
       :meth:`ssl.SSLSocket.getpeercert`

     - ``'sslcontext'``: :class:`ssl.SSLContext` instance

     - ``'ssl_object'``: :class:`ssl.SSLObject` or
       :class:`ssl.SSLSocket` instance

   * pipe:

     - ``'pipe'``: pipe object

   * subprocess:

     - ``'subprocess'``: :class:`subprocess.Popen` instance

.. method:: BaseTransport.set_protocol(protocol)

   Set a new protocol.

   Switching protocol should only be done when both
   protocols are documented to support the switch.

.. method:: BaseTransport.get_protocol()

   Return the current protocol.


Read-only Transports
--------------------

.. method:: ReadTransport.is_reading()

   Return ``True`` if the transport is receiving new data.

   .. versionadded:: 3.7

.. method:: ReadTransport.pause_reading()

   Pause the receiving end of the transport.  No data will be passed to
   the protocol's :meth:`protocol.data_received() <Protocol.data_received>`
   method until :meth:`resume_reading` is called.

   .. versionchanged:: 3.7
      The method is idempotent, i.e. it can be called when the
      transport is already paused or closed.

.. method:: ReadTransport.resume_reading()

   Resume the receiving end.  The protocol's
   :meth:`protocol.data_received() <Protocol.data_received>` method
   will be called once again if some data is available for reading.

   .. versionchanged:: 3.7
      The method is idempotent, i.e. it can be called when the
      transport is already reading.


Write-only Transports
---------------------

.. method:: WriteTransport.abort()

   Close the transport immediately, without waiting for pending operations
   to complete.  Buffered data will be lost.  No more data will be received.
   The protocol's :meth:`protocol.connection_lost()
   <BaseProtocol.connection_lost>` method will eventually be
   called with :const:`None` as its argument.

.. method:: WriteTransport.can_write_eof()

   Return :const:`True` if the transport supports
   :meth:`~WriteTransport.write_eof`, :const:`False` if not.

.. method:: WriteTransport.get_write_buffer_size()

   Return the current size of the output buffer used by the transport.

.. method:: WriteTransport.get_write_buffer_limits()

   Get the *high* and *low* watermarks for write flow control.  Return a
   tuple ``(low, high)`` where *low* and *high* are non-negative numbers
   of bytes.

   Use :meth:`set_write_buffer_limits` to set the limits.

   .. versionadded:: 3.4.2

.. method:: WriteTransport.set_write_buffer_limits(high=None, low=None)

   Set the *high* and *low* watermarks for write flow control.

   These two values (measured in number of
   bytes) control when the protocol's
   :meth:`protocol.pause_writing() <BaseProtocol.pause_writing>`
   and :meth:`protocol.resume_writing() <BaseProtocol.resume_writing>`
   methods are called.  If specified, the low watermark must be less
   than or equal to the high watermark.  Neither *high* nor *low*
   can be negative.

   :meth:`~BaseProtocol.pause_writing` is called when the buffer size
   becomes greater than the *high* value.  If writing has
   been paused, :meth:`~BaseProtocol.resume_writing` is called when
   the buffer size becomes less than or equal to the *low* value.

   The defaults are implementation-specific.  If only the
   high watermark is given, the low watermark defaults to an
   implementation-specific value less than or equal to the
   high watermark.  Setting *high* to zero forces *low* to zero as
   well, and causes :meth:`~BaseProtocol.pause_writing` to be called
   whenever the buffer becomes non-empty.  Setting *low* to zero causes
   :meth:`~BaseProtocol.resume_writing` to be called only once the
   buffer is empty.  Use of zero for either limit is generally
   sub-optimal as it reduces opportunities for doing I/O and
   computation concurrently.

   Use :meth:`~WriteTransport.get_write_buffer_limits`
   to get the limits.

.. method:: WriteTransport.write(data)

   Write some *data* bytes to the transport.

   This method does not block; it buffers the data and arranges for it
   to be sent out asynchronously.

.. method:: WriteTransport.writelines(list_of_data)

   Write a list (or any iterable) of data bytes to the transport.
   This is functionally equivalent to calling :meth:`write` on each
   element yielded by the iterable, but may be implemented more
   efficiently.

.. method:: WriteTransport.write_eof()

   Close the write end of the transport after flushing all buffered data.
   Data may still be received.

   This method can raise :exc:`NotImplementedError` if the transport
   (e.g. SSL) doesn't support half-closed connections.


Datagram Transports
-------------------

.. method:: DatagramTransport.sendto(data, addr=None)

   Send the *data* bytes to the remote peer given by *addr* (a
   transport-dependent target address).  If *addr* is :const:`None`,
   the data is sent to the target address given on transport
   creation.

   This method does not block; it buffers the data and arranges
   for it to be sent out asynchronously.

.. method:: DatagramTransport.abort()

   Close the transport immediately, without waiting for pending
   operations to complete.  Buffered data will be lost.
   No more data will be received.  The protocol's
   :meth:`protocol.connection_lost() <BaseProtocol.connection_lost>`
   method will eventually be called with :const:`None` as its argument.


.. _asyncio-subprocess-transports:

Subprocess Transports
---------------------

.. method:: SubprocessTransport.get_pid()

   Return the subprocess process id as an integer.

.. method:: SubprocessTransport.get_pipe_transport(fd)

   Return the transport for the communication pipe corresponding to the
   integer file descriptor *fd*:

   * ``0``: writable streaming transport of the standard input (*stdin*),
     or :const:`None` if the subprocess was not created with ``stdin=PIPE``
   * ``1``: readable streaming transport of the standard output (*stdout*),
     or :const:`None` if the subprocess was not created with ``stdout=PIPE``
   * ``2``: readable streaming transport of the standard error (*stderr*),
     or :const:`None` if the subprocess was not created with ``stderr=PIPE``
   * other *fd*: :const:`None`

.. method:: SubprocessTransport.get_returncode()

   Return the subprocess return code as an integer or :const:`None`
   if it hasn't returned, which is similar to the
   :attr:`subprocess.Popen.returncode` attribute.

.. method:: SubprocessTransport.kill()

   Kill the subprocess.

   On POSIX systems, this method sends SIGKILL to the subprocess.
   On Windows, this method is an alias for :meth:`terminate`.

   See also :meth:`subprocess.Popen.kill`.

.. method:: SubprocessTransport.send_signal(signal)

   Send the *signal* number to the subprocess, as in
   :meth:`subprocess.Popen.send_signal`.

.. method:: SubprocessTransport.terminate()

   Stop the subprocess.

   On POSIX systems, this method sends SIGTERM to the subprocess.
   On Windows, the Windows API function TerminateProcess() is called to
   stop the subprocess.

   See also :meth:`subprocess.Popen.terminate`.

.. method:: SubprocessTransport.close()

   Kill the subprocess by calling the :meth:`kill` method
   if the subprocess hasn't returned yet, and close the transports of
   the *stdin*, *stdout*, and *stderr* pipes.


.. _asyncio-protocol:

Protocols
=========

asyncio provides a set of abstract base classes that should be used
to implement network protocols.  Those classes are meant to be used
together with :ref:`transports <asyncio-transport>`.

Subclasses of abstract base protocol classes may implement some or
all methods.  All these methods are callbacks: they are called by
transports on certain events, for example when some data is received.
A base protocol method should be called by the corresponding transport.


Base Protocols
--------------

.. class:: BaseProtocol

   Base protocol with methods that all protocols share.

.. class:: Protocol(BaseProtocol)

   The base class for implementing streaming protocols
   (TCP, Unix sockets, etc).

.. class:: BufferedProtocol(BaseProtocol)

   A base class for implementing streaming protocols with manual
   control of the receive buffer.

.. class:: DatagramProtocol(BaseProtocol)

   The base class for implementing datagram (UDP) protocols.

.. class:: SubprocessProtocol(BaseProtocol)

   The base class for implementing protocols communicating with child
   processes (unidirectional pipes).


Base Protocol
-------------

All asyncio protocols can implement Base Protocol callbacks.

.. rubric:: Connection Callbacks

Connection callbacks are called on all protocols, exactly once per
successful connection.  All other protocol callbacks can only be
called between those two methods.

.. method:: BaseProtocol.connection_made(transport)

   Called when a connection is made.

   The *transport* argument is the transport representing the
   connection.  The protocol is responsible for storing the reference
   to its transport.

.. method:: BaseProtocol.connection_lost(exc)

   Called when the connection is lost or closed.

   The argument is either an exception object or :const:`None`.
   The latter means a regular EOF is received, or the connection was
   aborted or closed by this side of the connection.


.. rubric:: Flow Control Callbacks

Flow control callbacks can be called by transports to pause or
resume writing performed by the protocol.

See the documentation of the :meth:`~WriteTransport.set_write_buffer_limits`
method for more details.

.. method:: BaseProtocol.pause_writing()

   Called when the transport's buffer goes over the high watermark.

.. method:: BaseProtocol.resume_writing()

   Called when the transport's buffer drains below the low watermark.
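
As a minimal sketch of when the two flow control callbacks fire, the
following code records them while one large write drains.  It assumes a
Unix-like platform where :func:`socket.socketpair` can stand in for a real
connection; ``FlowControlProtocol``, the payload size, and the watermark
values are illustrative choices, not part of asyncio.

```python
import asyncio
import socket


class FlowControlProtocol(asyncio.Protocol):
    """Hypothetical protocol that only records flow control callbacks."""

    def __init__(self):
        self.events = []

    def pause_writing(self):
        # The transport's write buffer went over the high watermark.
        self.events.append('pause')

    def resume_writing(self):
        # The transport's write buffer drained to the low watermark.
        self.events.append('resume')


async def main():
    loop = asyncio.get_running_loop()

    # A connected socket pair: the transport writes to wsock,
    # and this coroutine plays the slow peer reading from rsock.
    wsock, rsock = socket.socketpair()
    rsock.setblocking(False)

    transport, protocol = await loop.create_connection(
        FlowControlProtocol, sock=wsock)
    transport.set_write_buffer_limits(high=64 * 1024, low=16 * 1024)

    # Much more data than the socket can accept at once, so most of it
    # stays in the transport's buffer and pause_writing() is called.
    payload = b'x' * (4 * 1024 * 1024)
    transport.write(payload)

    # Drain the peer side; once the transport's buffer shrinks to the
    # low watermark, resume_writing() is called.
    remaining = len(payload)
    while remaining:
        chunk = await loop.sock_recv(rsock, 64 * 1024)
        remaining -= len(chunk)

    transport.close()
    rsock.close()
    return protocol.events


events = asyncio.run(main())
print('Flow control events:', events)
```

A real protocol would typically stop producing data in ``pause_writing()``
and start again in ``resume_writing()``, rather than just recording the calls.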

If the buffer size equals the high watermark,
:meth:`~BaseProtocol.pause_writing` is not called: the buffer size must
go strictly over.

Conversely, :meth:`~BaseProtocol.resume_writing` is called when the
buffer size is equal to or lower than the low watermark.  These end
conditions are important to ensure that things go as expected when
either mark is zero.


Streaming Protocols
-------------------

Event loop methods, such as :meth:`loop.create_server`,
:meth:`loop.create_unix_server`, :meth:`loop.create_connection`,
:meth:`loop.create_unix_connection`, :meth:`loop.connect_accepted_socket`,
:meth:`loop.connect_read_pipe`, and :meth:`loop.connect_write_pipe`
accept factories that return streaming protocols.

.. method:: Protocol.data_received(data)

   Called when some data is received.  *data* is a non-empty bytes
   object containing the incoming data.

   Whether the data is buffered, chunked or reassembled depends on
   the transport.  In general, you shouldn't rely on specific semantics
   and instead make your parsing generic and flexible.  However,
   data is always received in the correct order.

   The method can be called an arbitrary number of times while
   a connection is open.

   However, :meth:`protocol.eof_received() <Protocol.eof_received>`
   is called at most once.  Once ``eof_received()`` is called,
   ``data_received()`` is not called anymore.

.. method:: Protocol.eof_received()

   Called when the other end signals it won't send any more data
   (for example by calling :meth:`transport.write_eof()
   <WriteTransport.write_eof>`, if the other end also uses
   asyncio).

   This method may return a false value (including ``None``), in which case
   the transport will close itself.  Conversely, if this method returns a
   true value, the protocol used determines whether to close the transport.
   Since the default implementation returns ``None``, it implicitly closes the
   connection.

   Some transports, including SSL, don't support half-closed connections,
   in which case returning true from this method will result in the connection
   being closed.


State machine:

.. code-block:: none

    start -> connection_made
        [-> data_received]*
        [-> eof_received]?
    -> connection_lost -> end


Buffered Streaming Protocols
----------------------------

.. versionadded:: 3.7
   **Important:** this has been added to asyncio in Python 3.7
   *on a provisional basis*!  This is an experimental API that
   might be changed or removed completely in Python 3.8.

Buffered Protocols can be used with any event loop method
that supports `Streaming Protocols`_.

``BufferedProtocol`` implementations allow explicit manual allocation
and control of the receive buffer.  Event loops can then use the buffer
provided by the protocol to avoid unnecessary data copies.  This
can result in noticeable performance improvement for protocols that
receive large amounts of data.  Sophisticated protocol implementations
can significantly reduce the number of buffer allocations.

The following callbacks are called on :class:`BufferedProtocol`
instances:

.. method:: BufferedProtocol.get_buffer(sizehint)

   Called to allocate a new receive buffer.

   *sizehint* is the recommended minimum size for the returned
   buffer.  It is acceptable to return smaller or larger buffers
   than what *sizehint* suggests.  When set to -1, the buffer size
   can be arbitrary.  It is an error to return a buffer with a zero size.

   ``get_buffer()`` must return an object implementing the
   :ref:`buffer protocol <bufferobjects>`.

.. method:: BufferedProtocol.buffer_updated(nbytes)

   Called when the buffer was updated with the received data.

   *nbytes* is the total number of bytes that were written to the buffer.

.. method:: BufferedProtocol.eof_received()

   See the documentation of the :meth:`protocol.eof_received()
   <Protocol.eof_received>` method.


:meth:`~BufferedProtocol.get_buffer` can be called an arbitrary number
of times during a connection.  However, :meth:`protocol.eof_received()
<Protocol.eof_received>` is called at most once
and, if called, :meth:`~BufferedProtocol.get_buffer` and
:meth:`~BufferedProtocol.buffer_updated` won't be called after it.

State machine:

.. code-block:: none

    start -> connection_made
        [-> get_buffer
            [-> buffer_updated]?
        ]*
        [-> eof_received]?
    -> connection_lost -> end


Datagram Protocols
------------------

Datagram Protocol instances should be constructed by protocol
factories passed to the :meth:`loop.create_datagram_endpoint` method.

.. method:: DatagramProtocol.datagram_received(data, addr)

   Called when a datagram is received.  *data* is a bytes object containing
   the incoming data.  *addr* is the address of the peer sending the data;
   the exact format depends on the transport.

.. method:: DatagramProtocol.error_received(exc)

   Called when a previous send or receive operation raises an
   :class:`OSError`.  *exc* is the :class:`OSError` instance.

   This method is called in rare conditions, when the transport (e.g. UDP)
   detects that a datagram could not be delivered to its recipient.
   In many conditions though, undeliverable datagrams will be silently
   dropped.

.. note::

   On BSD systems (macOS, FreeBSD, etc.) flow control is not supported
   for datagram protocols, because there is no reliable way to detect send
   failures caused by writing too many packets.

   The socket always appears 'ready' and excess packets are dropped.  An
   :class:`OSError` with ``errno`` set to :const:`errno.ENOBUFS` may
   or may not be raised; if it is raised, it will be reported to
   :meth:`DatagramProtocol.error_received` but otherwise ignored.


.. _asyncio-subprocess-protocols:

Subprocess Protocols
--------------------

Subprocess Protocol instances should be constructed by protocol
factories passed to the :meth:`loop.subprocess_exec` and
:meth:`loop.subprocess_shell` methods.

.. method:: SubprocessProtocol.pipe_data_received(fd, data)

   Called when the child process writes data into its stdout or stderr
   pipe.

   *fd* is the integer file descriptor of the pipe.

   *data* is a non-empty bytes object containing the received data.

.. method:: SubprocessProtocol.pipe_connection_lost(fd, exc)

   Called when one of the pipes communicating with the child process
   is closed.

   *fd* is the integer file descriptor that was closed.

.. method:: SubprocessProtocol.process_exited()

   Called when the child process has exited.


Examples
========

.. _asyncio_example_tcp_echo_server_protocol:

TCP Echo Server
---------------

Create a TCP echo server using the :meth:`loop.create_server` method, send back
received data, and close the connection::

    import asyncio


    class EchoServerProtocol(asyncio.Protocol):
        def connection_made(self, transport):
            peername = transport.get_extra_info('peername')
            print('Connection from {}'.format(peername))
            self.transport = transport

        def data_received(self, data):
            message = data.decode()
            print('Data received: {!r}'.format(message))

            print('Send: {!r}'.format(message))
            self.transport.write(data)

            print('Close the client socket')
            self.transport.close()


    async def main():
        # Get a reference to the event loop as we plan to use
        # low-level APIs.
        loop = asyncio.get_running_loop()

        server = await loop.create_server(
            lambda: EchoServerProtocol(),
            '127.0.0.1', 8888)

        async with server:
            await server.serve_forever()


    asyncio.run(main())


.. seealso::

   The :ref:`TCP echo server using streams <asyncio-tcp-echo-server-streams>`
   example uses the high-level :func:`asyncio.start_server` function.

.. _asyncio_example_tcp_echo_client_protocol:

TCP Echo Client
---------------

A TCP echo client using the :meth:`loop.create_connection` method, sends
data, and waits until the connection is closed::

    import asyncio


    class EchoClientProtocol(asyncio.Protocol):
        def __init__(self, message, on_con_lost, loop):
            self.message = message
            self.loop = loop
            self.on_con_lost = on_con_lost

        def connection_made(self, transport):
            transport.write(self.message.encode())
            print('Data sent: {!r}'.format(self.message))

        def data_received(self, data):
            print('Data received: {!r}'.format(data.decode()))

        def connection_lost(self, exc):
            print('The server closed the connection')
            self.on_con_lost.set_result(True)


    async def main():
        # Get a reference to the event loop as we plan to use
        # low-level APIs.
        loop = asyncio.get_running_loop()

        on_con_lost = loop.create_future()
        message = 'Hello World!'

        transport, protocol = await loop.create_connection(
            lambda: EchoClientProtocol(message, on_con_lost, loop),
            '127.0.0.1', 8888)

        # Wait until the protocol signals that the connection
        # is lost and close the transport.
        try:
            await on_con_lost
        finally:
            transport.close()


    asyncio.run(main())


.. seealso::

   The :ref:`TCP echo client using streams <asyncio-tcp-echo-client-streams>`
   example uses the high-level :func:`asyncio.open_connection` function.


.. _asyncio-udp-echo-server-protocol:

UDP Echo Server
---------------

A UDP echo server, using the :meth:`loop.create_datagram_endpoint`
method, sends back received data::

    import asyncio


    class EchoServerProtocol:
        def connection_made(self, transport):
            self.transport = transport

        def datagram_received(self, data, addr):
            message = data.decode()
            print('Received %r from %s' % (message, addr))
            print('Send %r to %s' % (message, addr))
            self.transport.sendto(data, addr)


    async def main():
        print("Starting UDP server")

        # Get a reference to the event loop as we plan to use
        # low-level APIs.
        loop = asyncio.get_running_loop()

        # One protocol instance will be created to serve all
        # client requests.
        transport, protocol = await loop.create_datagram_endpoint(
            lambda: EchoServerProtocol(),
            local_addr=('127.0.0.1', 9999))

        try:
            await asyncio.sleep(3600)  # Serve for 1 hour.
        finally:
            transport.close()


    asyncio.run(main())


.. _asyncio-udp-echo-client-protocol:

UDP Echo Client
---------------

A UDP echo client, using the :meth:`loop.create_datagram_endpoint`
method, sends data and closes the transport when it receives the answer::

    import asyncio


    class EchoClientProtocol:
        def __init__(self, message, loop):
            self.message = message
            self.loop = loop
            self.transport = None
            self.on_con_lost = loop.create_future()

        def connection_made(self, transport):
            self.transport = transport
            print('Send:', self.message)
            self.transport.sendto(self.message.encode())

        def datagram_received(self, data, addr):
            print("Received:", data.decode())

            print("Close the socket")
            self.transport.close()

        def error_received(self, exc):
            print('Error received:', exc)

        def connection_lost(self, exc):
            print("Connection closed")
            self.on_con_lost.set_result(True)


    async def main():
        # Get a reference to the event loop as we plan to use
        # low-level APIs.
        loop = asyncio.get_running_loop()

        message = "Hello World!"
        transport, protocol = await loop.create_datagram_endpoint(
            lambda: EchoClientProtocol(message, loop),
            remote_addr=('127.0.0.1', 9999))

        try:
            await protocol.on_con_lost
        finally:
            transport.close()


    asyncio.run(main())


.. _asyncio_example_create_connection:

Connecting Existing Sockets
---------------------------

Wait until a socket receives data using the
:meth:`loop.create_connection` method with a protocol::

    import asyncio
    import socket


    class MyProtocol(asyncio.Protocol):

        def __init__(self, loop):
            self.transport = None
            self.on_con_lost = loop.create_future()

        def connection_made(self, transport):
            self.transport = transport

        def data_received(self, data):
            print("Received:", data.decode())

            # We are done: close the transport;
            # connection_lost() will be called automatically.
            self.transport.close()

        def connection_lost(self, exc):
            # The socket has been closed
            self.on_con_lost.set_result(True)


    async def main():
        # Get a reference to the event loop as we plan to use
        # low-level APIs.
        loop = asyncio.get_running_loop()

        # Create a pair of connected sockets
        rsock, wsock = socket.socketpair()

        # Register the socket to wait for data.
        transport, protocol = await loop.create_connection(
            lambda: MyProtocol(loop), sock=rsock)

        # Simulate the reception of data from the network.
        loop.call_soon(wsock.send, 'abc'.encode())

        try:
            await protocol.on_con_lost
        finally:
            transport.close()
            wsock.close()

    asyncio.run(main())

.. seealso::

   The :ref:`watch a file descriptor for read events
   <asyncio_example_watch_fd>` example uses the low-level
   :meth:`loop.add_reader` method to register an FD.

   The :ref:`register an open socket to wait for data using streams
   <asyncio_example_create_connection-streams>` example uses high-level streams
   created by the :func:`open_connection` function in a coroutine.

.. _asyncio_example_subprocess_proto:

loop.subprocess_exec() and SubprocessProtocol
---------------------------------------------

An example of a subprocess protocol used to get the output of a
subprocess and to wait for the subprocess exit.

The subprocess is created by the :meth:`loop.subprocess_exec` method::

    import asyncio
    import sys

    class DateProtocol(asyncio.SubprocessProtocol):
        def __init__(self, exit_future):
            self.exit_future = exit_future
            self.output = bytearray()

        def pipe_data_received(self, fd, data):
            self.output.extend(data)

        def process_exited(self):
            self.exit_future.set_result(True)

    async def get_date():
        # Get a reference to the event loop as we plan to use
        # low-level APIs.
        loop = asyncio.get_running_loop()

        code = 'import datetime; print(datetime.datetime.now())'
        exit_future = loop.create_future()

        # Create the subprocess controlled by DateProtocol;
        # redirect the standard output into a pipe.
        transport, protocol = await loop.subprocess_exec(
            lambda: DateProtocol(exit_future),
            sys.executable, '-c', code,
            stdin=None, stderr=None)

        # Wait for the subprocess exit using the process_exited()
        # method of the protocol.
        await exit_future

        # Close the stdout pipe.
        transport.close()

        # Read the output which was collected by the
        # pipe_data_received() method of the protocol.
        data = bytes(protocol.output)
        return data.decode('ascii').rstrip()

    if sys.platform == "win32":
        asyncio.set_event_loop_policy(
            asyncio.WindowsProactorEventLoopPolicy())

    date = asyncio.run(get_date())
    print(f"Current date: {date}")

See also the :ref:`same example <asyncio_example_create_subprocess_exec>`
written using high-level APIs.
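

Receiving Data with a Buffered Protocol
---------------------------------------

The :class:`BufferedProtocol` callbacks can be illustrated with a small
sketch that receives data into one preallocated buffer.  It assumes a
connected socket pair in place of a real network peer; the
``CollectingProtocol`` name and the 1024-byte buffer size are hypothetical
choices made for this illustration.

```python
import asyncio
import socket


class CollectingProtocol(asyncio.BufferedProtocol):
    """Hypothetical protocol that receives into one reusable buffer."""

    def __init__(self, on_con_lost):
        self.on_con_lost = on_con_lost
        self.buffer = bytearray(1024)  # Allocated once, reused per read.
        self.received = bytearray()    # Copy of everything received.

    def get_buffer(self, sizehint):
        # Hand the event loop the same preallocated buffer every time;
        # sizehint is only a recommendation and is ignored here.
        return self.buffer

    def buffer_updated(self, nbytes):
        # Only the first nbytes of the buffer hold newly received data.
        self.received.extend(self.buffer[:nbytes])

    def eof_received(self):
        # Returning None lets the transport close itself.
        return None

    def connection_lost(self, exc):
        if not self.on_con_lost.done():
            self.on_con_lost.set_result(True)


async def main():
    loop = asyncio.get_running_loop()
    on_con_lost = loop.create_future()

    # Create a pair of connected sockets.
    rsock, wsock = socket.socketpair()

    transport, protocol = await loop.create_connection(
        lambda: CollectingProtocol(on_con_lost), sock=rsock)

    # Simulate a peer that sends some data and then closes.
    wsock.send(b'abc')
    wsock.close()

    try:
        await on_con_lost
    finally:
        transport.close()

    return bytes(protocol.received)


received = asyncio.run(main())
print('Received:', received)
```

Reusing a single buffer avoids allocating a new bytes object for every read;
a protocol that parses data in place could skip the copy into ``received``
as well.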