"""Tests for events.py."""

import collections.abc
import concurrent.futures
import functools
import gc
import io
import os
import platform
import re
import signal
import socket
try:
    import ssl
except ImportError:
    ssl = None
import subprocess
import sys
import threading
import time
import errno
import unittest
from unittest import mock
import weakref

if sys.platform != 'win32':
    import tty

import asyncio
from asyncio import coroutines
from asyncio import proactor_events
from asyncio import selector_events
from asyncio import sslproto
from asyncio import test_utils
try:
    from test import support
except ImportError:
    from asyncio import test_support as support


def data_file(filename):
    """Return the full path of the test data file *filename*.

    The file is looked up first in ``support.TEST_HOME_DIR`` (when the
    support module defines it) and then in the directory of this module.

    Raises:
        FileNotFoundError: if the file exists in neither location.
    """
    if hasattr(support, 'TEST_HOME_DIR'):
        fullname = os.path.join(support.TEST_HOME_DIR, filename)
        if os.path.isfile(fullname):
            return fullname
    fullname = os.path.join(os.path.dirname(__file__), filename)
    if os.path.isfile(fullname):
        return fullname
    raise FileNotFoundError(filename)


def osx_tiger():
    """Return True if the platform is Mac OS 10.4 or older."""
    if sys.platform != 'darwin':
        return False
    release = platform.mac_ver()[0]
    try:
        version = tuple(map(int, release.split('.')))
    except ValueError:
        # mac_ver() reported an empty or non-numeric release string; an
        # unknown version is not treated as Tiger.
        return False
    return version < (10, 5)


def _test_get_event_loop_new_process__sub_proc():
    """Run a trivial coroutine on a freshly created event loop.

    A new loop is created, installed as the current event loop and used to
    run a coroutine to completion; the coroutine's result, 'hello', is
    returned.
    """
    async def doit():
        return 'hello'

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    return loop.run_until_complete(doit())


ONLYCERT = data_file('ssl_cert.pem')
ONLYKEY = data_file('ssl_key.pem')
SIGNED_CERTFILE = data_file('keycert3.pem')
SIGNING_CA = data_file('pycacert.pem')
# NOTE(review): certificate dates pad a single-digit day with a space
# ('Jan  4'); the two spaces in 'notBefore' are significant -- confirm
# against the certificate in keycert3.pem.
PEERCERT = {'serialNumber': 'B09264B1F2DA21D1',
            'version': 1,
            'subject': ((('countryName', 'XY'),),
                        (('localityName', 'Castle Anthrax'),),
                        (('organizationName', 'Python Software Foundation'),),
                        (('commonName', 'localhost'),)),
            'issuer': ((('countryName', 'XY'),),
                       (('organizationName', 'Python Software Foundation CA'),),
                       (('commonName', 'our-ca-server'),)),
            'notAfter': 'Nov 13 19:47:07 2022 GMT',
            'notBefore': 'Jan  4 19:47:07 2013 GMT'}


class MyBaseProto(asyncio.Protocol):
    """Stream protocol that records its life cycle for the tests.

    ``state`` moves INITIAL -> CONNECTED -> (EOF) -> CLOSED and every
    callback asserts that it is invoked in a legal state.  ``nbytes`` counts
    the bytes received.  When a loop is passed to the constructor,
    ``connected`` and ``done`` are futures completed by connection_made()
    and connection_lost() respectively; otherwise both stay None.
    """
    connected = None
    done = None

    def __init__(self, loop=None):
        self.transport = None
        self.state = 'INITIAL'
        self.nbytes = 0
        if loop is not None:
            self.connected = asyncio.Future(loop=loop)
            self.done = asyncio.Future(loop=loop)

    def connection_made(self, transport):
        self.transport = transport
        assert self.state == 'INITIAL', self.state
        self.state = 'CONNECTED'
        if self.connected is not None:
            self.connected.set_result(None)

    def data_received(self, data):
        assert self.state == 'CONNECTED', self.state
        self.nbytes += len(data)

    def eof_received(self):
        assert self.state == 'CONNECTED', self.state
        self.state = 'EOF'

    def connection_lost(self, exc):
        assert self.state in ('CONNECTED', 'EOF'), self.state
        self.state = 'CLOSED'
        if self.done is not None:
            self.done.set_result(None)


class MyProto(MyBaseProto):
    """MyBaseProto that writes an HTTP/1.0 GET request once connected."""

    def connection_made(self, transport):
        super().connection_made(transport)
        transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n')


class MyDatagramProto(asyncio.DatagramProtocol):
    """Datagram protocol that records its state and received byte count.

    ``state`` moves INITIAL -> INITIALIZED -> CLOSED.  When a loop is passed
    to the constructor, ``done`` is a future completed by connection_lost().
    """
    done = None

    def __init__(self, loop=None):
        # Initialised here, like the other test protocols, so the attribute
        # exists even before connection_made() is called.
        self.transport = None
        self.state = 'INITIAL'
        self.nbytes = 0
        if loop is not None:
            self.done = asyncio.Future(loop=loop)

    def connection_made(self, transport):
        self.transport = transport
        assert self.state == 'INITIAL', self.state
        self.state = 'INITIALIZED'

    def datagram_received(self, data, addr):
        assert self.state == 'INITIALIZED', self.state
        self.nbytes += len(data)

    def error_received(self, exc):
        assert self.state == 'INITIALIZED', self.state

    def connection_lost(self, exc):
        assert self.state == 'INITIALIZED', self.state
        self.state = 'CLOSED'
        if self.done is not None:
            self.done.set_result(None)


class MyReadPipeProto(asyncio.Protocol):
    """Read-pipe protocol that keeps the full history of its states.

    Unlike the other test protocols, ``state`` is a list to which each
    transition is appended: ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'].
    """
    done = None

    def __init__(self, loop=None):
        self.state = ['INITIAL']
        self.nbytes = 0
        self.transport = None
        if loop is not None:
            self.done = asyncio.Future(loop=loop)

    def connection_made(self, transport):
        self.transport = transport
        assert self.state == ['INITIAL'], self.state
        self.state.append('CONNECTED')

    def data_received(self, data):
        assert self.state == ['INITIAL', 'CONNECTED'], self.state
        self.nbytes += len(data)

    def eof_received(self):
        assert self.state == ['INITIAL', 'CONNECTED'], self.state
        self.state.append('EOF')

    def connection_lost(self, exc):
        if 'EOF' not in self.state:
            self.state.append('EOF')  # It is okay if EOF is missed.
        assert self.state == ['INITIAL', 'CONNECTED', 'EOF'], self.state
        self.state.append('CLOSED')
        if self.done is not None:
            self.done.set_result(None)


class MyWritePipeProto(asyncio.BaseProtocol):
    """Write-pipe protocol tracking INITIAL -> CONNECTED -> CLOSED."""
    done = None

    def __init__(self, loop=None):
        self.state = 'INITIAL'
        self.transport = None
        if loop is not None:
            self.done = asyncio.Future(loop=loop)

    def connection_made(self, transport):
        self.transport = transport
        assert self.state == 'INITIAL', self.state
        self.state = 'CONNECTED'

    def connection_lost(self, exc):
        assert self.state == 'CONNECTED', self.state
        self.state = 'CLOSED'
        if self.done is not None:
            self.done.set_result(None)


class MySubprocessProtocol(asyncio.SubprocessProtocol):
    """Subprocess protocol exposing futures and events for each callback.

    * ``connected`` / ``completed``: futures completed by connection_made()
      and connection_lost().
    * ``disconnects``: one future per fd 0-2, completed (or failed with the
      reported exception) by pipe_connection_lost().
    * ``data`` / ``got_data``: bytes accumulated per output fd (1 and 2) and
      an event set whenever data arrives on that fd.
    * ``returncode``: read from the transport in process_exited().
    """

    def __init__(self, loop):
        self.state = 'INITIAL'
        self.transport = None
        self.connected = asyncio.Future(loop=loop)
        self.completed = asyncio.Future(loop=loop)
        self.disconnects = {fd: asyncio.Future(loop=loop) for fd in range(3)}
        self.data = {1: b'', 2: b''}
        self.returncode = None
        self.got_data = {1: asyncio.Event(loop=loop),
                         2: asyncio.Event(loop=loop)}

    def connection_made(self, transport):
        self.transport = transport
        assert self.state == 'INITIAL', self.state
        self.state = 'CONNECTED'
        self.connected.set_result(None)

    def connection_lost(self, exc):
        assert self.state == 'CONNECTED', self.state
        self.state = 'CLOSED'
        self.completed.set_result(None)

    def pipe_data_received(self, fd, data):
        assert self.state == 'CONNECTED', self.state
        self.data[fd] += data
        self.got_data[fd].set()

    def pipe_connection_lost(self, fd, exc):
        assert self.state == 'CONNECTED', self.state
        if exc:
            self.disconnects[fd].set_exception(exc)
        else:
            self.disconnects[fd].set_result(exc)

    def process_exited(self):
        assert self.state == 'CONNECTED', self.state
        self.returncode = self.transport.get_returncode()


class EventLoopTestsMixin:
    # Event loop tests shared by the concrete loop test cases.
    # create_event_loop() and set_event_loop() are not defined in this
    # mixin; presumably the concrete test class / its base provides them.

    def setUp(self):
        super().setUp()
        self.loop = self.create_event_loop()
        self.set_event_loop(self.loop)

    def tearDown(self):
        # just in case if we have transport close callbacks
        if not self.loop.is_closed():
            test_utils.run_briefly(self.loop)

        self.loop.close()
        gc.collect()
        super().tearDown()

    def test_run_until_complete_nesting(self):
        # Calling run_until_complete() from inside a running loop must
        # raise RuntimeError.
        @asyncio.coroutine
        def coro1():
            yield

        @asyncio.coroutine
        def coro2():
            self.assertTrue(self.loop.is_running())
            self.loop.run_until_complete(coro1())

        self.assertRaises(
            RuntimeError, self.loop.run_until_complete, coro2())

    # Note: because of the default Windows timing granularity of
    # 15.6 msec, we use fairly long sleep times here (~100 msec).
280 281 def test_run_until_complete(self): 282 t0 = self.loop.time() 283 self.loop.run_until_complete(asyncio.sleep(0.1, loop=self.loop)) 284 t1 = self.loop.time() 285 self.assertTrue(0.08 <= t1-t0 <= 0.8, t1-t0) 286 287 def test_run_until_complete_stopped(self): 288 @asyncio.coroutine 289 def cb(): 290 self.loop.stop() 291 yield from asyncio.sleep(0.1, loop=self.loop) 292 task = cb() 293 self.assertRaises(RuntimeError, 294 self.loop.run_until_complete, task) 295 296 def test_call_later(self): 297 results = [] 298 299 def callback(arg): 300 results.append(arg) 301 self.loop.stop() 302 303 self.loop.call_later(0.1, callback, 'hello world') 304 t0 = time.monotonic() 305 self.loop.run_forever() 306 t1 = time.monotonic() 307 self.assertEqual(results, ['hello world']) 308 self.assertTrue(0.08 <= t1-t0 <= 0.8, t1-t0) 309 310 def test_call_soon(self): 311 results = [] 312 313 def callback(arg1, arg2): 314 results.append((arg1, arg2)) 315 self.loop.stop() 316 317 self.loop.call_soon(callback, 'hello', 'world') 318 self.loop.run_forever() 319 self.assertEqual(results, [('hello', 'world')]) 320 321 def test_call_soon_threadsafe(self): 322 results = [] 323 lock = threading.Lock() 324 325 def callback(arg): 326 results.append(arg) 327 if len(results) >= 2: 328 self.loop.stop() 329 330 def run_in_thread(): 331 self.loop.call_soon_threadsafe(callback, 'hello') 332 lock.release() 333 334 lock.acquire() 335 t = threading.Thread(target=run_in_thread) 336 t.start() 337 338 with lock: 339 self.loop.call_soon(callback, 'world') 340 self.loop.run_forever() 341 t.join() 342 self.assertEqual(results, ['hello', 'world']) 343 344 def test_call_soon_threadsafe_same_thread(self): 345 results = [] 346 347 def callback(arg): 348 results.append(arg) 349 if len(results) >= 2: 350 self.loop.stop() 351 352 self.loop.call_soon_threadsafe(callback, 'hello') 353 self.loop.call_soon(callback, 'world') 354 self.loop.run_forever() 355 self.assertEqual(results, ['hello', 'world']) 356 357 def 
test_run_in_executor(self): 358 def run(arg): 359 return (arg, threading.get_ident()) 360 f2 = self.loop.run_in_executor(None, run, 'yo') 361 res, thread_id = self.loop.run_until_complete(f2) 362 self.assertEqual(res, 'yo') 363 self.assertNotEqual(thread_id, threading.get_ident()) 364 365 def test_reader_callback(self): 366 r, w = test_utils.socketpair() 367 r.setblocking(False) 368 bytes_read = bytearray() 369 370 def reader(): 371 try: 372 data = r.recv(1024) 373 except BlockingIOError: 374 # Spurious readiness notifications are possible 375 # at least on Linux -- see man select. 376 return 377 if data: 378 bytes_read.extend(data) 379 else: 380 self.assertTrue(self.loop.remove_reader(r.fileno())) 381 r.close() 382 383 self.loop.add_reader(r.fileno(), reader) 384 self.loop.call_soon(w.send, b'abc') 385 test_utils.run_until(self.loop, lambda: len(bytes_read) >= 3) 386 self.loop.call_soon(w.send, b'def') 387 test_utils.run_until(self.loop, lambda: len(bytes_read) >= 6) 388 self.loop.call_soon(w.close) 389 self.loop.call_soon(self.loop.stop) 390 self.loop.run_forever() 391 self.assertEqual(bytes_read, b'abcdef') 392 393 def test_writer_callback(self): 394 r, w = test_utils.socketpair() 395 w.setblocking(False) 396 397 def writer(data): 398 w.send(data) 399 self.loop.stop() 400 401 data = b'x' * 1024 402 self.loop.add_writer(w.fileno(), writer, data) 403 self.loop.run_forever() 404 405 self.assertTrue(self.loop.remove_writer(w.fileno())) 406 self.assertFalse(self.loop.remove_writer(w.fileno())) 407 408 w.close() 409 read = r.recv(len(data) * 2) 410 r.close() 411 self.assertEqual(read, data) 412 413 def _basetest_sock_client_ops(self, httpd, sock): 414 if not isinstance(self.loop, proactor_events.BaseProactorEventLoop): 415 # in debug mode, socket operations must fail 416 # if the socket is not in blocking mode 417 self.loop.set_debug(True) 418 sock.setblocking(True) 419 with self.assertRaises(ValueError): 420 self.loop.run_until_complete( 421 
self.loop.sock_connect(sock, httpd.address)) 422 with self.assertRaises(ValueError): 423 self.loop.run_until_complete( 424 self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n')) 425 with self.assertRaises(ValueError): 426 self.loop.run_until_complete( 427 self.loop.sock_recv(sock, 1024)) 428 with self.assertRaises(ValueError): 429 self.loop.run_until_complete( 430 self.loop.sock_accept(sock)) 431 432 # test in non-blocking mode 433 sock.setblocking(False) 434 self.loop.run_until_complete( 435 self.loop.sock_connect(sock, httpd.address)) 436 self.loop.run_until_complete( 437 self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n')) 438 data = self.loop.run_until_complete( 439 self.loop.sock_recv(sock, 1024)) 440 # consume data 441 self.loop.run_until_complete( 442 self.loop.sock_recv(sock, 1024)) 443 sock.close() 444 self.assertTrue(data.startswith(b'HTTP/1.0 200 OK')) 445 446 def test_sock_client_ops(self): 447 with test_utils.run_test_server() as httpd: 448 sock = socket.socket() 449 self._basetest_sock_client_ops(httpd, sock) 450 451 @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') 452 def test_unix_sock_client_ops(self): 453 with test_utils.run_test_unix_server() as httpd: 454 sock = socket.socket(socket.AF_UNIX) 455 self._basetest_sock_client_ops(httpd, sock) 456 457 def test_sock_client_fail(self): 458 # Make sure that we will get an unused port 459 address = None 460 try: 461 s = socket.socket() 462 s.bind(('127.0.0.1', 0)) 463 address = s.getsockname() 464 finally: 465 s.close() 466 467 sock = socket.socket() 468 sock.setblocking(False) 469 with self.assertRaises(ConnectionRefusedError): 470 self.loop.run_until_complete( 471 self.loop.sock_connect(sock, address)) 472 sock.close() 473 474 def test_sock_accept(self): 475 listener = socket.socket() 476 listener.setblocking(False) 477 listener.bind(('127.0.0.1', 0)) 478 listener.listen(1) 479 client = socket.socket() 480 client.connect(listener.getsockname()) 481 482 f = 
self.loop.sock_accept(listener) 483 conn, addr = self.loop.run_until_complete(f) 484 self.assertEqual(conn.gettimeout(), 0) 485 self.assertEqual(addr, client.getsockname()) 486 self.assertEqual(client.getpeername(), listener.getsockname()) 487 client.close() 488 conn.close() 489 listener.close() 490 491 @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'No SIGKILL') 492 def test_add_signal_handler(self): 493 caught = 0 494 495 def my_handler(): 496 nonlocal caught 497 caught += 1 498 499 # Check error behavior first. 500 self.assertRaises( 501 TypeError, self.loop.add_signal_handler, 'boom', my_handler) 502 self.assertRaises( 503 TypeError, self.loop.remove_signal_handler, 'boom') 504 self.assertRaises( 505 ValueError, self.loop.add_signal_handler, signal.NSIG+1, 506 my_handler) 507 self.assertRaises( 508 ValueError, self.loop.remove_signal_handler, signal.NSIG+1) 509 self.assertRaises( 510 ValueError, self.loop.add_signal_handler, 0, my_handler) 511 self.assertRaises( 512 ValueError, self.loop.remove_signal_handler, 0) 513 self.assertRaises( 514 ValueError, self.loop.add_signal_handler, -1, my_handler) 515 self.assertRaises( 516 ValueError, self.loop.remove_signal_handler, -1) 517 self.assertRaises( 518 RuntimeError, self.loop.add_signal_handler, signal.SIGKILL, 519 my_handler) 520 # Removing SIGKILL doesn't raise, since we don't call signal(). 521 self.assertFalse(self.loop.remove_signal_handler(signal.SIGKILL)) 522 # Now set a handler and handle it. 523 self.loop.add_signal_handler(signal.SIGINT, my_handler) 524 525 os.kill(os.getpid(), signal.SIGINT) 526 test_utils.run_until(self.loop, lambda: caught) 527 528 # Removing it should restore the default handler. 529 self.assertTrue(self.loop.remove_signal_handler(signal.SIGINT)) 530 self.assertEqual(signal.getsignal(signal.SIGINT), 531 signal.default_int_handler) 532 # Removing again returns False. 
533 self.assertFalse(self.loop.remove_signal_handler(signal.SIGINT)) 534 535 @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM') 536 def test_signal_handling_while_selecting(self): 537 # Test with a signal actually arriving during a select() call. 538 caught = 0 539 540 def my_handler(): 541 nonlocal caught 542 caught += 1 543 self.loop.stop() 544 545 self.loop.add_signal_handler(signal.SIGALRM, my_handler) 546 547 signal.setitimer(signal.ITIMER_REAL, 0.01, 0) # Send SIGALRM once. 548 self.loop.run_forever() 549 self.assertEqual(caught, 1) 550 551 @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM') 552 def test_signal_handling_args(self): 553 some_args = (42,) 554 caught = 0 555 556 def my_handler(*args): 557 nonlocal caught 558 caught += 1 559 self.assertEqual(args, some_args) 560 561 self.loop.add_signal_handler(signal.SIGALRM, my_handler, *some_args) 562 563 signal.setitimer(signal.ITIMER_REAL, 0.1, 0) # Send SIGALRM once. 564 self.loop.call_later(0.5, self.loop.stop) 565 self.loop.run_forever() 566 self.assertEqual(caught, 1) 567 568 def _basetest_create_connection(self, connection_fut, check_sockname=True): 569 tr, pr = self.loop.run_until_complete(connection_fut) 570 self.assertIsInstance(tr, asyncio.Transport) 571 self.assertIsInstance(pr, asyncio.Protocol) 572 self.assertIs(pr.transport, tr) 573 if check_sockname: 574 self.assertIsNotNone(tr.get_extra_info('sockname')) 575 self.loop.run_until_complete(pr.done) 576 self.assertGreater(pr.nbytes, 0) 577 tr.close() 578 579 def test_create_connection(self): 580 with test_utils.run_test_server() as httpd: 581 conn_fut = self.loop.create_connection( 582 lambda: MyProto(loop=self.loop), *httpd.address) 583 self._basetest_create_connection(conn_fut) 584 585 @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') 586 def test_create_unix_connection(self): 587 # Issue #20682: On Mac OS X Tiger, getsockname() returns a 588 # zero-length address for UNIX socket. 
589 check_sockname = not osx_tiger() 590 591 with test_utils.run_test_unix_server() as httpd: 592 conn_fut = self.loop.create_unix_connection( 593 lambda: MyProto(loop=self.loop), httpd.address) 594 self._basetest_create_connection(conn_fut, check_sockname) 595 596 def test_create_connection_sock(self): 597 with test_utils.run_test_server() as httpd: 598 sock = None 599 infos = self.loop.run_until_complete( 600 self.loop.getaddrinfo( 601 *httpd.address, type=socket.SOCK_STREAM)) 602 for family, type, proto, cname, address in infos: 603 try: 604 sock = socket.socket(family=family, type=type, proto=proto) 605 sock.setblocking(False) 606 self.loop.run_until_complete( 607 self.loop.sock_connect(sock, address)) 608 except: 609 pass 610 else: 611 break 612 else: 613 assert False, 'Can not create socket.' 614 615 f = self.loop.create_connection( 616 lambda: MyProto(loop=self.loop), sock=sock) 617 tr, pr = self.loop.run_until_complete(f) 618 self.assertIsInstance(tr, asyncio.Transport) 619 self.assertIsInstance(pr, asyncio.Protocol) 620 self.loop.run_until_complete(pr.done) 621 self.assertGreater(pr.nbytes, 0) 622 tr.close() 623 624 def check_ssl_extra_info(self, client, check_sockname=True, 625 peername=None, peercert={}): 626 if check_sockname: 627 self.assertIsNotNone(client.get_extra_info('sockname')) 628 if peername: 629 self.assertEqual(peername, 630 client.get_extra_info('peername')) 631 else: 632 self.assertIsNotNone(client.get_extra_info('peername')) 633 self.assertEqual(peercert, 634 client.get_extra_info('peercert')) 635 636 # test SSL cipher 637 cipher = client.get_extra_info('cipher') 638 self.assertIsInstance(cipher, tuple) 639 self.assertEqual(len(cipher), 3, cipher) 640 self.assertIsInstance(cipher[0], str) 641 self.assertIsInstance(cipher[1], str) 642 self.assertIsInstance(cipher[2], int) 643 644 # test SSL object 645 sslobj = client.get_extra_info('ssl_object') 646 self.assertIsNotNone(sslobj) 647 self.assertEqual(sslobj.compression(), 648 
client.get_extra_info('compression')) 649 self.assertEqual(sslobj.cipher(), 650 client.get_extra_info('cipher')) 651 self.assertEqual(sslobj.getpeercert(), 652 client.get_extra_info('peercert')) 653 self.assertEqual(sslobj.compression(), 654 client.get_extra_info('compression')) 655 656 def _basetest_create_ssl_connection(self, connection_fut, 657 check_sockname=True, 658 peername=None): 659 tr, pr = self.loop.run_until_complete(connection_fut) 660 self.assertIsInstance(tr, asyncio.Transport) 661 self.assertIsInstance(pr, asyncio.Protocol) 662 self.assertTrue('ssl' in tr.__class__.__name__.lower()) 663 self.check_ssl_extra_info(tr, check_sockname, peername) 664 self.loop.run_until_complete(pr.done) 665 self.assertGreater(pr.nbytes, 0) 666 tr.close() 667 668 def _test_create_ssl_connection(self, httpd, create_connection, 669 check_sockname=True, peername=None): 670 conn_fut = create_connection(ssl=test_utils.dummy_ssl_context()) 671 self._basetest_create_ssl_connection(conn_fut, check_sockname, 672 peername) 673 674 # ssl.Purpose was introduced in Python 3.4 675 if hasattr(ssl, 'Purpose'): 676 def _dummy_ssl_create_context(purpose=ssl.Purpose.SERVER_AUTH, *, 677 cafile=None, capath=None, 678 cadata=None): 679 """ 680 A ssl.create_default_context() replacement that doesn't enable 681 cert validation. 
682 """ 683 self.assertEqual(purpose, ssl.Purpose.SERVER_AUTH) 684 return test_utils.dummy_ssl_context() 685 686 # With ssl=True, ssl.create_default_context() should be called 687 with mock.patch('ssl.create_default_context', 688 side_effect=_dummy_ssl_create_context) as m: 689 conn_fut = create_connection(ssl=True) 690 self._basetest_create_ssl_connection(conn_fut, check_sockname, 691 peername) 692 self.assertEqual(m.call_count, 1) 693 694 # With the real ssl.create_default_context(), certificate 695 # validation will fail 696 with self.assertRaises(ssl.SSLError) as cm: 697 conn_fut = create_connection(ssl=True) 698 # Ignore the "SSL handshake failed" log in debug mode 699 with test_utils.disable_logger(): 700 self._basetest_create_ssl_connection(conn_fut, check_sockname, 701 peername) 702 703 self.assertEqual(cm.exception.reason, 'CERTIFICATE_VERIFY_FAILED') 704 705 @unittest.skipIf(ssl is None, 'No ssl module') 706 def test_create_ssl_connection(self): 707 with test_utils.run_test_server(use_ssl=True) as httpd: 708 create_connection = functools.partial( 709 self.loop.create_connection, 710 lambda: MyProto(loop=self.loop), 711 *httpd.address) 712 self._test_create_ssl_connection(httpd, create_connection, 713 peername=httpd.address) 714 715 def test_legacy_create_ssl_connection(self): 716 with test_utils.force_legacy_ssl_support(): 717 self.test_create_ssl_connection() 718 719 @unittest.skipIf(ssl is None, 'No ssl module') 720 @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') 721 def test_create_ssl_unix_connection(self): 722 # Issue #20682: On Mac OS X Tiger, getsockname() returns a 723 # zero-length address for UNIX socket. 
724 check_sockname = not osx_tiger() 725 726 with test_utils.run_test_unix_server(use_ssl=True) as httpd: 727 create_connection = functools.partial( 728 self.loop.create_unix_connection, 729 lambda: MyProto(loop=self.loop), httpd.address, 730 server_hostname='127.0.0.1') 731 732 self._test_create_ssl_connection(httpd, create_connection, 733 check_sockname, 734 peername=httpd.address) 735 736 def test_legacy_create_ssl_unix_connection(self): 737 with test_utils.force_legacy_ssl_support(): 738 self.test_create_ssl_unix_connection() 739 740 def test_create_connection_local_addr(self): 741 with test_utils.run_test_server() as httpd: 742 port = support.find_unused_port() 743 f = self.loop.create_connection( 744 lambda: MyProto(loop=self.loop), 745 *httpd.address, local_addr=(httpd.address[0], port)) 746 tr, pr = self.loop.run_until_complete(f) 747 expected = pr.transport.get_extra_info('sockname')[1] 748 self.assertEqual(port, expected) 749 tr.close() 750 751 def test_create_connection_local_addr_in_use(self): 752 with test_utils.run_test_server() as httpd: 753 f = self.loop.create_connection( 754 lambda: MyProto(loop=self.loop), 755 *httpd.address, local_addr=httpd.address) 756 with self.assertRaises(OSError) as cm: 757 self.loop.run_until_complete(f) 758 self.assertEqual(cm.exception.errno, errno.EADDRINUSE) 759 self.assertIn(str(httpd.address), cm.exception.strerror) 760 761 def test_connect_accepted_socket(self, server_ssl=None, client_ssl=None): 762 loop = self.loop 763 764 class MyProto(MyBaseProto): 765 766 def connection_lost(self, exc): 767 super().connection_lost(exc) 768 loop.call_soon(loop.stop) 769 770 def data_received(self, data): 771 super().data_received(data) 772 self.transport.write(expected_response) 773 774 lsock = socket.socket() 775 lsock.bind(('127.0.0.1', 0)) 776 lsock.listen(1) 777 addr = lsock.getsockname() 778 779 message = b'test data' 780 response = None 781 expected_response = b'roger' 782 783 def client(): 784 nonlocal response 785 try: 
786 csock = socket.socket() 787 if client_ssl is not None: 788 csock = client_ssl.wrap_socket(csock) 789 csock.connect(addr) 790 csock.sendall(message) 791 response = csock.recv(99) 792 csock.close() 793 except Exception as exc: 794 print( 795 "Failure in client thread in test_connect_accepted_socket", 796 exc) 797 798 thread = threading.Thread(target=client, daemon=True) 799 thread.start() 800 801 conn, _ = lsock.accept() 802 proto = MyProto(loop=loop) 803 proto.loop = loop 804 loop.run_until_complete( 805 loop.connect_accepted_socket( 806 (lambda: proto), conn, ssl=server_ssl)) 807 loop.run_forever() 808 proto.transport.close() 809 lsock.close() 810 811 thread.join(1) 812 self.assertFalse(thread.is_alive()) 813 self.assertEqual(proto.state, 'CLOSED') 814 self.assertEqual(proto.nbytes, len(message)) 815 self.assertEqual(response, expected_response) 816 817 @unittest.skipIf(ssl is None, 'No ssl module') 818 def test_ssl_connect_accepted_socket(self): 819 if (sys.platform == 'win32' and 820 sys.version_info < (3, 5) and 821 isinstance(self.loop, proactor_events.BaseProactorEventLoop) 822 ): 823 raise unittest.SkipTest( 824 'SSL not supported with proactor event loops before Python 3.5' 825 ) 826 827 server_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 828 server_context.load_cert_chain(ONLYCERT, ONLYKEY) 829 if hasattr(server_context, 'check_hostname'): 830 server_context.check_hostname = False 831 server_context.verify_mode = ssl.CERT_NONE 832 833 client_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 834 if hasattr(server_context, 'check_hostname'): 835 client_context.check_hostname = False 836 client_context.verify_mode = ssl.CERT_NONE 837 838 self.test_connect_accepted_socket(server_context, client_context) 839 840 @mock.patch('asyncio.base_events.socket') 841 def create_server_multiple_hosts(self, family, hosts, mock_sock): 842 @asyncio.coroutine 843 def getaddrinfo(host, port, *args, **kw): 844 if family == socket.AF_INET: 845 return [(family, 
socket.SOCK_STREAM, 6, '', (host, port))] 846 else: 847 return [(family, socket.SOCK_STREAM, 6, '', (host, port, 0, 0))] 848 849 def getaddrinfo_task(*args, **kwds): 850 return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop) 851 852 unique_hosts = set(hosts) 853 854 if family == socket.AF_INET: 855 mock_sock.socket().getsockbyname.side_effect = [ 856 (host, 80) for host in unique_hosts] 857 else: 858 mock_sock.socket().getsockbyname.side_effect = [ 859 (host, 80, 0, 0) for host in unique_hosts] 860 self.loop.getaddrinfo = getaddrinfo_task 861 self.loop._start_serving = mock.Mock() 862 self.loop._stop_serving = mock.Mock() 863 f = self.loop.create_server(lambda: MyProto(self.loop), hosts, 80) 864 server = self.loop.run_until_complete(f) 865 self.addCleanup(server.close) 866 server_hosts = {sock.getsockbyname()[0] for sock in server.sockets} 867 self.assertEqual(server_hosts, unique_hosts) 868 869 def test_create_server_multiple_hosts_ipv4(self): 870 self.create_server_multiple_hosts(socket.AF_INET, 871 ['1.2.3.4', '5.6.7.8', '1.2.3.4']) 872 873 def test_create_server_multiple_hosts_ipv6(self): 874 self.create_server_multiple_hosts(socket.AF_INET6, 875 ['::1', '::2', '::1']) 876 877 def test_create_server(self): 878 proto = MyProto(self.loop) 879 f = self.loop.create_server(lambda: proto, '0.0.0.0', 0) 880 server = self.loop.run_until_complete(f) 881 self.assertEqual(len(server.sockets), 1) 882 sock = server.sockets[0] 883 host, port = sock.getsockname() 884 self.assertEqual(host, '0.0.0.0') 885 client = socket.socket() 886 client.connect(('127.0.0.1', port)) 887 client.sendall(b'xxx') 888 889 self.loop.run_until_complete(proto.connected) 890 self.assertEqual('CONNECTED', proto.state) 891 892 test_utils.run_until(self.loop, lambda: proto.nbytes > 0) 893 self.assertEqual(3, proto.nbytes) 894 895 # extra info is available 896 self.assertIsNotNone(proto.transport.get_extra_info('sockname')) 897 self.assertEqual('127.0.0.1', 898 
                         proto.transport.get_extra_info('peername')[0])

        # close connection
        proto.transport.close()
        self.loop.run_until_complete(proto.done)

        self.assertEqual('CLOSED', proto.state)

        # the client socket must be closed after to avoid ECONNRESET upon
        # recv()/send() on the serving socket
        client.close()

        # close server
        server.close()

    @unittest.skipUnless(hasattr(socket, 'SO_REUSEPORT'), 'No SO_REUSEPORT')
    def test_create_server_reuse_port(self):
        # SO_REUSEPORT must be unset on the listening socket by default and
        # set when create_server() is called with reuse_port=True.
        proto = MyProto(self.loop)
        f = self.loop.create_server(
            lambda: proto, '0.0.0.0', 0)
        server = self.loop.run_until_complete(f)
        self.assertEqual(len(server.sockets), 1)
        sock = server.sockets[0]
        self.assertFalse(
            sock.getsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEPORT))
        server.close()

        # let the loop run briefly between the two servers
        test_utils.run_briefly(self.loop)

        proto = MyProto(self.loop)
        f = self.loop.create_server(
            lambda: proto, '0.0.0.0', 0, reuse_port=True)
        server = self.loop.run_until_complete(f)
        self.assertEqual(len(server.sockets), 1)
        sock = server.sockets[0]
        self.assertTrue(
            sock.getsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEPORT))
        server.close()

    def _make_unix_server(self, factory, **kwargs):
        """Start a UNIX-socket server on a generated path.

        Extra keyword arguments are forwarded to create_unix_server().
        Returns a (server, path) tuple; a cleanup that unlinks the path
        (if it still exists) is registered on the test case.
        """
        path = test_utils.gen_unix_socket_path()
        self.addCleanup(lambda: os.path.exists(path) and os.unlink(path))

        f = self.loop.create_unix_server(factory, path, **kwargs)
        server = self.loop.run_until_complete(f)

        return server, path

    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
    def test_create_unix_server(self):
        # A plain blocking AF_UNIX client connects, sends 3 bytes and the
        # server-side protocol must go CONNECTED -> CLOSED and count them.
        proto = MyProto(loop=self.loop)
        server, path = self._make_unix_server(lambda: proto)
        self.assertEqual(len(server.sockets), 1)

        client = socket.socket(socket.AF_UNIX)
        client.connect(path)
        client.sendall(b'xxx')

        self.loop.run_until_complete(proto.connected)
        self.assertEqual('CONNECTED', proto.state)
        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
        self.assertEqual(3, proto.nbytes)

        # close connection
        proto.transport.close()
        self.loop.run_until_complete(proto.done)

        self.assertEqual('CLOSED', proto.state)

        # the client socket must be closed after to avoid ECONNRESET upon
        # recv()/send() on the serving socket
        client.close()

        # close server
        server.close()

    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
    def test_create_unix_server_path_socket_error(self):
        # Passing both a path and a sock to create_unix_server() must be
        # rejected with ValueError.
        proto = MyProto(loop=self.loop)
        sock = socket.socket()
        with sock:
            f = self.loop.create_unix_server(lambda: proto, '/test', sock=sock)
            with self.assertRaisesRegex(ValueError,
                                        'path and sock can not be specified '
                                        'at the same time'):
                self.loop.run_until_complete(f)

    def _create_ssl_context(self, certfile, keyfile=None):
        """Return an SSLContext (SSLv2 disabled) loaded with certfile/keyfile."""
        sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        sslcontext.options |= ssl.OP_NO_SSLv2
        sslcontext.load_cert_chain(certfile, keyfile)
        return sslcontext

    def _make_ssl_server(self, factory, certfile, keyfile=None):
        """Start an SSL TCP server on 127.0.0.1 with an OS-chosen port.

        Returns a (server, host, port) tuple taken from the first listening
        socket.
        """
        sslcontext = self._create_ssl_context(certfile, keyfile)

        f = self.loop.create_server(factory, '127.0.0.1', 0, ssl=sslcontext)
        server = self.loop.run_until_complete(f)

        sock = server.sockets[0]
        host, port = sock.getsockname()
        self.assertEqual(host, '127.0.0.1')
        return server, host, port

    def _make_ssl_unix_server(self, factory, certfile, keyfile=None):
        """Start an SSL server on a UNIX socket; returns (server, path)."""
        sslcontext = self._create_ssl_context(certfile, keyfile)
        return self._make_unix_server(factory, ssl=sslcontext)

    @unittest.skipIf(ssl is None, 'No ssl module')
    def test_create_server_ssl(self):
        # Round trip over SSL/TCP: the client (using the dummy SSL context
        # from test_utils) writes 3 bytes, the server protocol must see them.
        proto = MyProto(loop=self.loop)
        server, host, port = self._make_ssl_server(
            lambda: proto, ONLYCERT, ONLYKEY)

        f_c = self.loop.create_connection(MyBaseProto, host, port,
                                          ssl=test_utils.dummy_ssl_context())
        client, pr = self.loop.run_until_complete(f_c)

        client.write(b'xxx')
        self.loop.run_until_complete(proto.connected)
        self.assertEqual('CONNECTED', proto.state)

        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
        self.assertEqual(3, proto.nbytes)

        # extra info is available
        self.check_ssl_extra_info(client, peername=(host, port))

        # close connection
        proto.transport.close()
        self.loop.run_until_complete(proto.done)
        self.assertEqual('CLOSED', proto.state)

        # the client socket must be closed after to avoid ECONNRESET upon
        # recv()/send() on the serving socket
        client.close()

        # stop serving
        server.close()

    def test_legacy_create_server_ssl(self):
        # Same test, re-run inside test_utils.force_legacy_ssl_support().
        with test_utils.force_legacy_ssl_support():
            self.test_create_server_ssl()

    @unittest.skipIf(ssl is None, 'No ssl module')
    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
    def test_create_unix_server_ssl(self):
        # Round trip over SSL on a UNIX socket; server_hostname='' is passed
        # explicitly to create_unix_connection().
        proto = MyProto(loop=self.loop)
        server, path = self._make_ssl_unix_server(
            lambda: proto, ONLYCERT, ONLYKEY)

        f_c = self.loop.create_unix_connection(
            MyBaseProto, path, ssl=test_utils.dummy_ssl_context(),
            server_hostname='')

        client, pr = self.loop.run_until_complete(f_c)

        client.write(b'xxx')
        self.loop.run_until_complete(proto.connected)
        self.assertEqual('CONNECTED', proto.state)
        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
        self.assertEqual(3, proto.nbytes)

        # close connection
        proto.transport.close()
        self.loop.run_until_complete(proto.done)
        self.assertEqual('CLOSED', proto.state)

        # the client socket must be closed after to avoid ECONNRESET upon
        # recv()/send() on the serving socket
        client.close()

        # stop serving
        server.close()

    def test_legacy_create_unix_server_ssl(self):
        # Same test, re-run inside test_utils.force_legacy_ssl_support().
        with test_utils.force_legacy_ssl_support():
            self.test_create_unix_server_ssl()

    @unittest.skipIf(ssl is None, 'No ssl module')
    def test_create_server_ssl_verify_failed(self):
        # The client requires a verified certificate but loads no CA, so
        # the connection attempt must fail with an SSLError.
        proto = MyProto(loop=self.loop)
        server, host, port = self._make_ssl_server(
            lambda: proto, SIGNED_CERTFILE)

        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        sslcontext_client.options |= ssl.OP_NO_SSLv2
        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
        if hasattr(sslcontext_client, 'check_hostname'):
            sslcontext_client.check_hostname = True

        # no CA loaded
        f_c = self.loop.create_connection(MyProto, host, port,
                                          ssl=sslcontext_client)
        # the loop's exception handler is mocked out and logging disabled
        # while the expected failure happens
        with mock.patch.object(self.loop, 'call_exception_handler'):
            with test_utils.disable_logger():
                with self.assertRaisesRegex(ssl.SSLError,
                                            '(?i)certificate.verify.failed'):
                    self.loop.run_until_complete(f_c)

            # execute the loop to log the connection error
            test_utils.run_briefly(self.loop)

        # close connection
        self.assertIsNone(proto.transport)
        server.close()

    def test_legacy_create_server_ssl_verify_failed(self):
        # Same test, re-run inside test_utils.force_legacy_ssl_support().
        with test_utils.force_legacy_ssl_support():
            self.test_create_server_ssl_verify_failed()

    @unittest.skipIf(ssl is None, 'No ssl module')
    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
    def test_create_unix_server_ssl_verify_failed(self):
        # UNIX-socket variant of the verification failure: no CA is loaded
        # on the client context, so the connection must raise SSLError.
        proto = MyProto(loop=self.loop)
        server, path = self._make_ssl_unix_server(
            lambda: proto, SIGNED_CERTFILE)

        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        sslcontext_client.options |= ssl.OP_NO_SSLv2
        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
        if hasattr(sslcontext_client, 'check_hostname'):
            sslcontext_client.check_hostname = True

        # no CA loaded
        f_c = self.loop.create_unix_connection(MyProto, path,
                                               ssl=sslcontext_client,
                                               server_hostname='invalid')
        with mock.patch.object(self.loop, 'call_exception_handler'):
            with test_utils.disable_logger():
                with self.assertRaisesRegex(ssl.SSLError,
                                            '(?i)certificate.verify.failed'):
                    self.loop.run_until_complete(f_c)

            # execute the loop to log the connection error
            test_utils.run_briefly(self.loop)

        # close connection
        self.assertIsNone(proto.transport)
        server.close()

    def test_legacy_create_unix_server_ssl_verify_failed(self):
        # Same test, re-run inside test_utils.force_legacy_ssl_support().
        with test_utils.force_legacy_ssl_support():
            self.test_create_unix_server_ssl_verify_failed()

    @unittest.skipIf(ssl is None, 'No ssl module')
    def test_create_server_ssl_match_failed(self):
        # The CA is loaded, but the client connects by IP address, so the
        # hostname check must fail with CertificateError.
        proto = MyProto(loop=self.loop)
        server, host, port = self._make_ssl_server(
            lambda: proto, SIGNED_CERTFILE)

        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        sslcontext_client.options |= ssl.OP_NO_SSLv2
        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
        sslcontext_client.load_verify_locations(
            cafile=SIGNING_CA)
        if hasattr(sslcontext_client, 'check_hostname'):
            sslcontext_client.check_hostname = True

        # incorrect server_hostname
        f_c = self.loop.create_connection(MyProto, host, port,
                                          ssl=sslcontext_client)
        with mock.patch.object(self.loop, 'call_exception_handler'):
            with test_utils.disable_logger():
                with self.assertRaisesRegex(
                        ssl.CertificateError,
                        "hostname '127.0.0.1' doesn't match 'localhost'"):
                    self.loop.run_until_complete(f_c)

        # close connection
        proto.transport.close()
        server.close()

    def test_legacy_create_server_ssl_match_failed(self):
        # Same test, re-run inside test_utils.force_legacy_ssl_support().
        with test_utils.force_legacy_ssl_support():
            self.test_create_server_ssl_match_failed()

    @unittest.skipIf(ssl is None, 'No ssl module')
    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
    def test_create_unix_server_ssl_verified(self):
        # UNIX-socket variant of the fully verified SSL connection.
        proto = MyProto(loop=self.loop)
        server, path = self._make_ssl_unix_server(
            lambda: proto, SIGNED_CERTFILE)

        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        sslcontext_client.options |= ssl.OP_NO_SSLv2
        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
        sslcontext_client.load_verify_locations(cafile=SIGNING_CA)
        if hasattr(sslcontext_client, 'check_hostname'):
            sslcontext_client.check_hostname = True

        # Connection succeeds with correct CA and server hostname.
        f_c = self.loop.create_unix_connection(MyProto, path,
                                               ssl=sslcontext_client,
                                               server_hostname='localhost')
        client, pr = self.loop.run_until_complete(f_c)

        # close connection
        proto.transport.close()
        client.close()
        server.close()
        self.loop.run_until_complete(proto.done)

    def test_legacy_create_unix_server_ssl_verified(self):
        # Same test, re-run inside test_utils.force_legacy_ssl_support().
        with test_utils.force_legacy_ssl_support():
            self.test_create_unix_server_ssl_verified()

    @unittest.skipIf(ssl is None, 'No ssl module')
    def test_create_server_ssl_verified(self):
        # Fully verified SSL/TCP connection; the peer certificate reported
        # by the transport is compared against PEERCERT.
        proto = MyProto(loop=self.loop)
        server, host, port = self._make_ssl_server(
            lambda: proto, SIGNED_CERTFILE)

        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        sslcontext_client.options |= ssl.OP_NO_SSLv2
        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
        sslcontext_client.load_verify_locations(cafile=SIGNING_CA)
        if hasattr(sslcontext_client, 'check_hostname'):
            sslcontext_client.check_hostname = True

        # Connection succeeds with correct CA and server hostname.
        f_c = self.loop.create_connection(MyProto, host, port,
                                          ssl=sslcontext_client,
                                          server_hostname='localhost')
        client, pr = self.loop.run_until_complete(f_c)

        # extra info is available
        self.check_ssl_extra_info(client, peername=(host, port),
                                  peercert=PEERCERT)

        # close connection
        proto.transport.close()
        client.close()
        server.close()
        self.loop.run_until_complete(proto.done)

    def test_legacy_create_server_ssl_verified(self):
        # Same test, re-run inside test_utils.force_legacy_ssl_support().
        with test_utils.force_legacy_ssl_support():
            self.test_create_server_ssl_verified()

    def test_create_server_sock(self):
        # create_server(sock=...) must serve on the very socket object it
        # was given (checked with assertIs below).
        # NOTE(review): the `proto` future is resolved in connection_made()
        # but never awaited here - confirm that is intentional.
        proto = asyncio.Future(loop=self.loop)

        class TestMyProto(MyProto):
            def connection_made(self, transport):
                super().connection_made(transport)
                proto.set_result(self)

        sock_ob = socket.socket(type=socket.SOCK_STREAM)
        sock_ob.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock_ob.bind(('0.0.0.0', 0))

        f = self.loop.create_server(TestMyProto, sock=sock_ob)
        server = self.loop.run_until_complete(f)
        sock = server.sockets[0]
        self.assertIs(sock, sock_ob)

        host, port = sock.getsockname()
        self.assertEqual(host, '0.0.0.0')
        client = socket.socket()
        client.connect(('127.0.0.1', port))
        client.send(b'xxx')
        client.close()
        server.close()

    def test_create_server_addr_in_use(self):
        # Creating a second server on an address that is already being
        # served must fail with OSError(EADDRINUSE).
        sock_ob = socket.socket(type=socket.SOCK_STREAM)
        sock_ob.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock_ob.bind(('0.0.0.0', 0))

        f = self.loop.create_server(MyProto, sock=sock_ob)
        server = self.loop.run_until_complete(f)
        sock = server.sockets[0]
        host, port = sock.getsockname()

        f = self.loop.create_server(MyProto, host=host, port=port)
        with self.assertRaises(OSError) as cm:
            self.loop.run_until_complete(f)
        self.assertEqual(cm.exception.errno, errno.EADDRINUSE)

        server.close()
1283 1284 @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled') 1285 def test_create_server_dual_stack(self): 1286 f_proto = asyncio.Future(loop=self.loop) 1287 1288 class TestMyProto(MyProto): 1289 def connection_made(self, transport): 1290 super().connection_made(transport) 1291 f_proto.set_result(self) 1292 1293 try_count = 0 1294 while True: 1295 try: 1296 port = support.find_unused_port() 1297 f = self.loop.create_server(TestMyProto, host=None, port=port) 1298 server = self.loop.run_until_complete(f) 1299 except OSError as ex: 1300 if ex.errno == errno.EADDRINUSE: 1301 try_count += 1 1302 self.assertGreaterEqual(5, try_count) 1303 continue 1304 else: 1305 raise 1306 else: 1307 break 1308 client = socket.socket() 1309 client.connect(('127.0.0.1', port)) 1310 client.send(b'xxx') 1311 proto = self.loop.run_until_complete(f_proto) 1312 proto.transport.close() 1313 client.close() 1314 1315 f_proto = asyncio.Future(loop=self.loop) 1316 client = socket.socket(socket.AF_INET6) 1317 client.connect(('::1', port)) 1318 client.send(b'xxx') 1319 proto = self.loop.run_until_complete(f_proto) 1320 proto.transport.close() 1321 client.close() 1322 1323 server.close() 1324 1325 def test_server_close(self): 1326 f = self.loop.create_server(MyProto, '0.0.0.0', 0) 1327 server = self.loop.run_until_complete(f) 1328 sock = server.sockets[0] 1329 host, port = sock.getsockname() 1330 1331 client = socket.socket() 1332 client.connect(('127.0.0.1', port)) 1333 client.send(b'xxx') 1334 client.close() 1335 1336 server.close() 1337 1338 client = socket.socket() 1339 self.assertRaises( 1340 ConnectionRefusedError, client.connect, ('127.0.0.1', port)) 1341 client.close() 1342 1343 def test_create_datagram_endpoint(self): 1344 class TestMyDatagramProto(MyDatagramProto): 1345 def __init__(inner_self): 1346 super().__init__(loop=self.loop) 1347 1348 def datagram_received(self, data, addr): 1349 super().datagram_received(data, addr) 1350 self.transport.sendto(b'resp:'+data, 
addr) 1351 1352 coro = self.loop.create_datagram_endpoint( 1353 TestMyDatagramProto, local_addr=('127.0.0.1', 0)) 1354 s_transport, server = self.loop.run_until_complete(coro) 1355 host, port = s_transport.get_extra_info('sockname') 1356 1357 self.assertIsInstance(s_transport, asyncio.Transport) 1358 self.assertIsInstance(server, TestMyDatagramProto) 1359 self.assertEqual('INITIALIZED', server.state) 1360 self.assertIs(server.transport, s_transport) 1361 1362 coro = self.loop.create_datagram_endpoint( 1363 lambda: MyDatagramProto(loop=self.loop), 1364 remote_addr=(host, port)) 1365 transport, client = self.loop.run_until_complete(coro) 1366 1367 self.assertIsInstance(transport, asyncio.Transport) 1368 self.assertIsInstance(client, MyDatagramProto) 1369 self.assertEqual('INITIALIZED', client.state) 1370 self.assertIs(client.transport, transport) 1371 1372 transport.sendto(b'xxx') 1373 test_utils.run_until(self.loop, lambda: server.nbytes) 1374 self.assertEqual(3, server.nbytes) 1375 test_utils.run_until(self.loop, lambda: client.nbytes) 1376 1377 # received 1378 self.assertEqual(8, client.nbytes) 1379 1380 # extra info is available 1381 self.assertIsNotNone(transport.get_extra_info('sockname')) 1382 1383 # close connection 1384 transport.close() 1385 self.loop.run_until_complete(client.done) 1386 self.assertEqual('CLOSED', client.state) 1387 server.transport.close() 1388 1389 def test_create_datagram_endpoint_sock(self): 1390 if (sys.platform == 'win32' and 1391 isinstance(self.loop, proactor_events.BaseProactorEventLoop)): 1392 raise unittest.SkipTest( 1393 'UDP is not supported with proactor event loops') 1394 1395 sock = None 1396 local_address = ('127.0.0.1', 0) 1397 infos = self.loop.run_until_complete( 1398 self.loop.getaddrinfo( 1399 *local_address, type=socket.SOCK_DGRAM)) 1400 for family, type, proto, cname, address in infos: 1401 try: 1402 sock = socket.socket(family=family, type=type, proto=proto) 1403 sock.setblocking(False) 1404 sock.bind(address) 1405 
except: 1406 pass 1407 else: 1408 break 1409 else: 1410 assert False, 'Can not create socket.' 1411 1412 f = self.loop.create_datagram_endpoint( 1413 lambda: MyDatagramProto(loop=self.loop), sock=sock) 1414 tr, pr = self.loop.run_until_complete(f) 1415 self.assertIsInstance(tr, asyncio.Transport) 1416 self.assertIsInstance(pr, MyDatagramProto) 1417 tr.close() 1418 self.loop.run_until_complete(pr.done) 1419 1420 def test_internal_fds(self): 1421 loop = self.create_event_loop() 1422 if not isinstance(loop, selector_events.BaseSelectorEventLoop): 1423 loop.close() 1424 self.skipTest('loop is not a BaseSelectorEventLoop') 1425 1426 self.assertEqual(1, loop._internal_fds) 1427 loop.close() 1428 self.assertEqual(0, loop._internal_fds) 1429 self.assertIsNone(loop._csock) 1430 self.assertIsNone(loop._ssock) 1431 1432 @unittest.skipUnless(sys.platform != 'win32', 1433 "Don't support pipes for Windows") 1434 def test_read_pipe(self): 1435 proto = MyReadPipeProto(loop=self.loop) 1436 1437 rpipe, wpipe = os.pipe() 1438 pipeobj = io.open(rpipe, 'rb', 1024) 1439 1440 @asyncio.coroutine 1441 def connect(): 1442 t, p = yield from self.loop.connect_read_pipe( 1443 lambda: proto, pipeobj) 1444 self.assertIs(p, proto) 1445 self.assertIs(t, proto.transport) 1446 self.assertEqual(['INITIAL', 'CONNECTED'], proto.state) 1447 self.assertEqual(0, proto.nbytes) 1448 1449 self.loop.run_until_complete(connect()) 1450 1451 os.write(wpipe, b'1') 1452 test_utils.run_until(self.loop, lambda: proto.nbytes >= 1) 1453 self.assertEqual(1, proto.nbytes) 1454 1455 os.write(wpipe, b'2345') 1456 test_utils.run_until(self.loop, lambda: proto.nbytes >= 5) 1457 self.assertEqual(['INITIAL', 'CONNECTED'], proto.state) 1458 self.assertEqual(5, proto.nbytes) 1459 1460 os.close(wpipe) 1461 self.loop.run_until_complete(proto.done) 1462 self.assertEqual( 1463 ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state) 1464 # extra info is available 1465 self.assertIsNotNone(proto.transport.get_extra_info('pipe')) 1466 
    @unittest.skipUnless(sys.platform != 'win32',
                         "Don't support pipes for Windows")
    def test_unclosed_pipe_transport(self):
        # This test reproduces the issue #314 on GitHub
        loop = self.create_event_loop()
        read_proto = MyReadPipeProto(loop=loop)
        write_proto = MyWritePipeProto(loop=loop)

        rpipe, wpipe = os.pipe()
        rpipeobj = io.open(rpipe, 'rb', 1024)
        wpipeobj = io.open(wpipe, 'w', 1024)

        @asyncio.coroutine
        def connect():
            read_transport, _ = yield from loop.connect_read_pipe(
                lambda: read_proto, rpipeobj)
            write_transport, _ = yield from loop.connect_write_pipe(
                lambda: write_proto, wpipeobj)
            return read_transport, write_transport

        # Run and close the loop without closing the transports
        read_transport, write_transport = loop.run_until_complete(connect())
        loop.close()

        # These 'repr' calls used to raise an AttributeError
        # See Issue #314 on GitHub
        self.assertIn('open', repr(read_transport))
        self.assertIn('open', repr(write_transport))

        # Clean up (avoid ResourceWarning)
        rpipeobj.close()
        wpipeobj.close()
        read_transport._pipe = None
        write_transport._pipe = None

    @unittest.skipUnless(sys.platform != 'win32',
                         "Don't support pipes for Windows")
    # select, poll and kqueue don't support character devices (PTY) on Mac OS X
    # older than 10.6 (Snow Leopard)
    @support.requires_mac_ver(10, 6)
    # Issue #20495: The test hangs on FreeBSD 7.2 but pass on FreeBSD 9
    @support.requires_freebsd_version(8)
    def test_read_pty_output(self):
        # Bytes written to the slave side of a pty must reach the protocol
        # connected to the master side via connect_read_pipe().
        proto = MyReadPipeProto(loop=self.loop)

        master, slave = os.openpty()
        master_read_obj = io.open(master, 'rb', 0)

        @asyncio.coroutine
        def connect():
            t, p = yield from self.loop.connect_read_pipe(lambda: proto,
                                                          master_read_obj)
            self.assertIs(p, proto)
            self.assertIs(t, proto.transport)
            self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
            self.assertEqual(0, proto.nbytes)

        self.loop.run_until_complete(connect())

        os.write(slave, b'1')
        test_utils.run_until(self.loop, lambda: proto.nbytes)
        self.assertEqual(1, proto.nbytes)

        os.write(slave, b'2345')
        test_utils.run_until(self.loop, lambda: proto.nbytes >= 5)
        self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
        self.assertEqual(5, proto.nbytes)

        os.close(slave)
        self.loop.run_until_complete(proto.done)
        self.assertEqual(
            ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state)
        # extra info is available
        self.assertIsNotNone(proto.transport.get_extra_info('pipe'))

    @unittest.skipUnless(sys.platform != 'win32',
                         "Don't support pipes for Windows")
    def test_write_pipe(self):
        # Data written through the write-pipe transport must be readable
        # from the read end of the underlying os.pipe().
        rpipe, wpipe = os.pipe()
        pipeobj = io.open(wpipe, 'wb', 1024)

        proto = MyWritePipeProto(loop=self.loop)
        connect = self.loop.connect_write_pipe(lambda: proto, pipeobj)
        transport, p = self.loop.run_until_complete(connect)
        self.assertIs(p, proto)
        self.assertIs(transport, proto.transport)
        self.assertEqual('CONNECTED', proto.state)

        transport.write(b'1')

        data = bytearray()

        # Appends whatever os.read() returns to `data` in place and reports
        # the accumulated length, so it can serve as a run_until() predicate.
        def reader(data):
            chunk = os.read(rpipe, 1024)
            data += chunk
            return len(data)

        test_utils.run_until(self.loop, lambda: reader(data) >= 1)
        self.assertEqual(b'1', data)

        transport.write(b'2345')
        test_utils.run_until(self.loop, lambda: reader(data) >= 5)
        self.assertEqual(b'12345', data)
        self.assertEqual('CONNECTED', proto.state)

        os.close(rpipe)

        # extra info is available
        self.assertIsNotNone(proto.transport.get_extra_info('pipe'))

        # close connection
        proto.transport.close()
        self.loop.run_until_complete(proto.done)
        self.assertEqual('CLOSED', proto.state)

    @unittest.skipUnless(sys.platform != 'win32',
                         "Don't support pipes for Windows")
    def test_write_pipe_disconnect_on_close(self):
        # Closing the reading peer (rsock) must by itself take the write
        # protocol to CLOSED; the transport is never closed explicitly here.
        rsock, wsock = test_utils.socketpair()
        rsock.setblocking(False)
        pipeobj = io.open(wsock.detach(), 'wb', 1024)

        proto = MyWritePipeProto(loop=self.loop)
        connect = self.loop.connect_write_pipe(lambda: proto, pipeobj)
        transport, p = self.loop.run_until_complete(connect)
        self.assertIs(p, proto)
        self.assertIs(transport, proto.transport)
        self.assertEqual('CONNECTED', proto.state)

        transport.write(b'1')
        data = self.loop.run_until_complete(self.loop.sock_recv(rsock, 1024))
        self.assertEqual(b'1', data)

        rsock.close()

        self.loop.run_until_complete(proto.done)
        self.assertEqual('CLOSED', proto.state)

    @unittest.skipUnless(sys.platform != 'win32',
                         "Don't support pipes for Windows")
    # select, poll and kqueue don't support character devices (PTY) on Mac OS X
    # older than 10.6 (Snow Leopard)
    @support.requires_mac_ver(10, 6)
    def test_write_pty(self):
        # Data written through a write-pipe transport on the pty slave must
        # be readable from the pty master.
        master, slave = os.openpty()
        slave_write_obj = io.open(slave, 'wb', 0)

        proto = MyWritePipeProto(loop=self.loop)
        connect = self.loop.connect_write_pipe(lambda: proto, slave_write_obj)
        transport, p = self.loop.run_until_complete(connect)
        self.assertIs(p, proto)
        self.assertIs(transport, proto.transport)
        self.assertEqual('CONNECTED', proto.state)

        transport.write(b'1')

        data = bytearray()

        # Accumulates bytes read from the master into `data` in place and
        # returns the total length collected so far.
        def reader(data):
            chunk = os.read(master, 1024)
            data += chunk
            return len(data)

        test_utils.run_until(self.loop, lambda: reader(data) >= 1,
                             timeout=10)
        self.assertEqual(b'1', data)

        transport.write(b'2345')
        test_utils.run_until(self.loop, lambda: reader(data) >= 5,
                             timeout=10)
        self.assertEqual(b'12345', data)
        self.assertEqual('CONNECTED', proto.state)

        os.close(master)

        # extra info is available
        self.assertIsNotNone(proto.transport.get_extra_info('pipe'))

        # close connection
        proto.transport.close()
        self.loop.run_until_complete(proto.done)
        self.assertEqual('CLOSED', proto.state)

    @unittest.skipUnless(sys.platform != 'win32',
                         "Don't support pipes for Windows")
    # select, poll and kqueue don't support character devices (PTY) on Mac OS X
    # older than 10.6 (Snow Leopard)
    @support.requires_mac_ver(10, 6)
    def test_bidirectional_pty(self):
        # One pty slave is used in both directions at once: a read-pipe
        # transport on the slave fd and a write-pipe transport on a dup()
        # of it, with the test driving the master side directly.
        master, read_slave = os.openpty()
        write_slave = os.dup(read_slave)
        tty.setraw(read_slave)

        slave_read_obj = io.open(read_slave, 'rb', 0)
        read_proto = MyReadPipeProto(loop=self.loop)
        read_connect = self.loop.connect_read_pipe(lambda: read_proto,
                                                   slave_read_obj)
        read_transport, p = self.loop.run_until_complete(read_connect)
        self.assertIs(p, read_proto)
        self.assertIs(read_transport, read_proto.transport)
        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
        self.assertEqual(0, read_proto.nbytes)

        slave_write_obj = io.open(write_slave, 'wb', 0)
        write_proto = MyWritePipeProto(loop=self.loop)
        write_connect = self.loop.connect_write_pipe(lambda: write_proto,
                                                     slave_write_obj)
        write_transport, p = self.loop.run_until_complete(write_connect)
        self.assertIs(p, write_proto)
        self.assertIs(write_transport, write_proto.transport)
        self.assertEqual('CONNECTED', write_proto.state)

        data = bytearray()

        # Accumulates bytes read from the master into `data` in place and
        # returns the total length collected so far.
        def reader(data):
            chunk = os.read(master, 1024)
            data += chunk
            return len(data)

        # slave -> master
        write_transport.write(b'1')
        test_utils.run_until(self.loop, lambda: reader(data) >= 1, timeout=10)
        self.assertEqual(b'1', data)
        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
        self.assertEqual('CONNECTED', write_proto.state)

        # master -> slave
        os.write(master, b'a')
        test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 1,
                             timeout=10)
        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
        self.assertEqual(1, read_proto.nbytes)
        self.assertEqual('CONNECTED', write_proto.state)

        # slave -> master
        write_transport.write(b'2345')
        test_utils.run_until(self.loop, lambda: reader(data) >= 5, timeout=10)
        self.assertEqual(b'12345', data)
        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
        self.assertEqual('CONNECTED', write_proto.state)

        # master -> slave
        os.write(master, b'bcde')
        test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 5,
                             timeout=10)
        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
        self.assertEqual(5, read_proto.nbytes)
        self.assertEqual('CONNECTED', write_proto.state)

        os.close(master)

        read_transport.close()
        self.loop.run_until_complete(read_proto.done)
        self.assertEqual(
            ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], read_proto.state)

        write_transport.close()
        self.loop.run_until_complete(write_proto.done)
        self.assertEqual('CLOSED', write_proto.state)

    def test_prompt_cancellation(self):
        # Cancelling a pending sock_recv() future must take effect promptly
        # (the whole run is required to finish in under 0.1 seconds).
        r, w = test_utils.socketpair()
        r.setblocking(False)
        f = self.loop.sock_recv(r, 1)
        # Some futures carry an `ov` attribute; when present, its `pending`
        # flag is checked before and after the cancellation.
        ov = getattr(f, 'ov', None)
        if ov is not None:
            self.assertTrue(ov.pending)

        @asyncio.coroutine
        def main():
            try:
                self.loop.call_soon(f.cancel)
                yield from f
            except asyncio.CancelledError:
                res = 'cancelled'
            else:
                res = None
            finally:
                self.loop.stop()
            return res

        start = time.monotonic()
        t = asyncio.Task(main(), loop=self.loop)
        self.loop.run_forever()
        elapsed = time.monotonic() - start

        self.assertLess(elapsed, 0.1)
        self.assertEqual(t.result(), 'cancelled')
        self.assertRaises(asyncio.CancelledError, f.result)
        if ov is not None:
            self.assertFalse(ov.pending)
            self.loop._stop_serving(r)

        r.close()
        w.close()

    def test_timeout_rounding(self):
        # Counts the loop iterations needed to sleep through a series of
        # ever smaller timeouts; the loop must not spin on tiny timeouts.
        def _run_once():
            self.loop._run_once_counter += 1
            orig_run_once()

        # wrap the loop's _run_once with the counting version above
        orig_run_once = self.loop._run_once
        self.loop._run_once_counter = 0
        self.loop._run_once = _run_once

        @asyncio.coroutine
        def wait():
            loop = self.loop
            yield from asyncio.sleep(1e-2, loop=loop)
            yield from asyncio.sleep(1e-4, loop=loop)
            yield from asyncio.sleep(1e-6, loop=loop)
            yield from asyncio.sleep(1e-8, loop=loop)
            yield from asyncio.sleep(1e-10, loop=loop)

        self.loop.run_until_complete(wait())
        # The ideal number of calls is 12, but on some platforms, the selector
        # may sleep a little bit less than timeout depending on the resolution
        # of the clock used by the kernel. Tolerate a few useless calls on
        # these platforms.
        self.assertLessEqual(self.loop._run_once_counter, 20,
            {'clock_resolution': self.loop._clock_resolution,
             'selector': self.loop._selector.__class__.__name__})

    def test_remove_fds_after_closing(self):
        # remove_reader()/remove_writer() on a closed loop must return a
        # false value rather than raise.
        loop = self.create_event_loop()
        callback = lambda: None
        r, w = test_utils.socketpair()
        self.addCleanup(r.close)
        self.addCleanup(w.close)
        loop.add_reader(r, callback)
        loop.add_writer(w, callback)
        loop.close()
        self.assertFalse(loop.remove_reader(r))
        self.assertFalse(loop.remove_writer(w))

    def test_add_fds_after_closing(self):
        # add_reader()/add_writer() on a closed loop must raise RuntimeError.
        loop = self.create_event_loop()
        callback = lambda: None
        r, w = test_utils.socketpair()
        self.addCleanup(r.close)
        self.addCleanup(w.close)
        loop.close()
        with self.assertRaises(RuntimeError):
            loop.add_reader(r, callback)
        with self.assertRaises(RuntimeError):
            loop.add_writer(w, callback)

    def test_close_running_event_loop(self):
        # Closing the loop from a coroutine it is currently running must
        # raise RuntimeError.
        @asyncio.coroutine
        def close_loop(loop):
            self.loop.close()

        coro = close_loop(self.loop)
        with self.assertRaises(RuntimeError):
            self.loop.run_until_complete(coro)

    def test_close(self):
        # Once the loop is closed, every run/schedule entry point below
        # must raise RuntimeError.
        self.loop.close()

        @asyncio.coroutine
        def test():
            pass

        func = lambda: False
        coro = test()
        # the coroutine is never run; close it at cleanup time
        self.addCleanup(coro.close)

        # operation blocked when the loop is closed
        with self.assertRaises(RuntimeError):
            self.loop.run_forever()
        with self.assertRaises(RuntimeError):
            fut = asyncio.Future(loop=self.loop)
            self.loop.run_until_complete(fut)
        with self.assertRaises(RuntimeError):
            self.loop.call_soon(func)
        with self.assertRaises(RuntimeError):
            self.loop.call_soon_threadsafe(func)
        with self.assertRaises(RuntimeError):
            self.loop.call_later(1.0, func)
        with self.assertRaises(RuntimeError):
            self.loop.call_at(self.loop.time() + .0, func)
        with self.assertRaises(RuntimeError):
            self.loop.run_in_executor(None, func)
        with self.assertRaises(RuntimeError):
            self.loop.create_task(coro)
        with self.assertRaises(RuntimeError):
            self.loop.add_signal_handler(signal.SIGTERM, func)


class SubprocessTestsMixin:

    def check_terminated(self, returncode):
        # On Windows only the type of the return code is checked; elsewhere
        # it must be exactly -SIGTERM.
        if sys.platform == 'win32':
            self.assertIsInstance(returncode, int)
            # expect 1 but sometimes get 0
        else:
            self.assertEqual(-signal.SIGTERM, returncode)

    def check_killed(self, returncode):
        # On Windows only the type of the return code is checked; elsewhere
        # it must be exactly -SIGKILL.
        if sys.platform == 'win32':
            self.assertIsInstance(returncode, int)
            # expect 1 but sometimes get 0
        else:
            self.assertEqual(-signal.SIGKILL, returncode)

    def test_subprocess_exec(self):
        # Runs echo.py (located next to this file) and expects the bytes
        # written to its stdin to come back on fd 1; closing the transport
        # is then expected to kill the child (see check_killed).
        prog = os.path.join(os.path.dirname(__file__), 'echo.py')

        connect = self.loop.subprocess_exec(
            functools.partial(MySubprocessProtocol, self.loop),
            sys.executable, prog)
        transp, proto = self.loop.run_until_complete(connect)
        self.assertIsInstance(proto, MySubprocessProtocol)
        self.loop.run_until_complete(proto.connected)
        self.assertEqual('CONNECTED', proto.state)

        stdin = transp.get_pipe_transport(0)
        stdin.write(b'Python The Winner')
        self.loop.run_until_complete(proto.got_data[1].wait())
        with test_utils.disable_logger():
            transp.close()
        self.loop.run_until_complete(proto.completed)
        self.check_killed(proto.returncode)
        self.assertEqual(b'Python The Winner', proto.data[1])

    def test_subprocess_interactive(self):
        # Like test_subprocess_exec, but the input is written in two parts
        # with the fd-1 data event cleared in between.
        prog = os.path.join(os.path.dirname(__file__), 'echo.py')

        connect = self.loop.subprocess_exec(
            functools.partial(MySubprocessProtocol, self.loop),
            sys.executable, prog)
        transp, proto = self.loop.run_until_complete(connect)
        self.assertIsInstance(proto, MySubprocessProtocol)
        self.loop.run_until_complete(proto.connected)
        self.assertEqual('CONNECTED', proto.state)

        stdin = transp.get_pipe_transport(0)
        stdin.write(b'Python ')
        self.loop.run_until_complete(proto.got_data[1].wait())
        proto.got_data[1].clear()
        self.assertEqual(b'Python ', proto.data[1])

        stdin.write(b'The Winner')
        self.loop.run_until_complete(proto.got_data[1].wait())
        self.assertEqual(b'Python The Winner', proto.data[1])

        with test_utils.disable_logger():
            transp.close()
        self.loop.run_until_complete(proto.completed)
        self.check_killed(proto.returncode)

    def test_subprocess_shell(self):
        # 'echo Python' run through the shell must exit with 0, produce
        # b'Python' on fd 1 and nothing on fd 2.
        connect = self.loop.subprocess_shell(
            functools.partial(MySubprocessProtocol, self.loop),
            'echo Python')
        transp, proto = self.loop.run_until_complete(connect)
        self.assertIsInstance(proto, MySubprocessProtocol)
        self.loop.run_until_complete(proto.connected)

        transp.get_pipe_transport(0).close()
        self.loop.run_until_complete(proto.completed)
        self.assertEqual(0, proto.returncode)
        self.assertTrue(all(f.done() for f in proto.disconnects.values()))
        self.assertEqual(proto.data[1].rstrip(b'\r\n'), b'Python')
        self.assertEqual(proto.data[2], b'')
        transp.close()

    def test_subprocess_exitcode(self):
        # The exit status of the shell command ('exit 7') must be reported
        # as the return code; no pipes are requested.
        connect = self.loop.subprocess_shell(
            functools.partial(MySubprocessProtocol, self.loop),
            'exit 7', stdin=None, stdout=None, stderr=None)
        transp, proto = self.loop.run_until_complete(connect)
        self.assertIsInstance(proto, MySubprocessProtocol)
        self.loop.run_until_complete(proto.completed)
        self.assertEqual(7, proto.returncode)
        transp.close()

    def test_subprocess_close_after_finish(self):
        # With stdin/stdout/stderr all None there are no pipe transports,
        # and close() after the process finished must return None.
        connect = self.loop.subprocess_shell(
            functools.partial(MySubprocessProtocol, self.loop),
            'exit 7', stdin=None, stdout=None, stderr=None)
        transp, proto = self.loop.run_until_complete(connect)
        self.assertIsInstance(proto, MySubprocessProtocol)
        self.assertIsNone(transp.get_pipe_transport(0))
        self.assertIsNone(transp.get_pipe_transport(1))
        self.assertIsNone(transp.get_pipe_transport(2))
        self.loop.run_until_complete(proto.completed)
        self.assertEqual(7, proto.returncode)
        self.assertIsNone(transp.close())

    def test_subprocess_kill(self):
        # transp.kill() must end the child with the "killed" return code.
        prog = os.path.join(os.path.dirname(__file__), 'echo.py')

        connect = self.loop.subprocess_exec(
            functools.partial(MySubprocessProtocol, self.loop),
            sys.executable, prog)
        transp, proto = self.loop.run_until_complete(connect)
        self.assertIsInstance(proto, MySubprocessProtocol)
        self.loop.run_until_complete(proto.connected)

        transp.kill()
        self.loop.run_until_complete(proto.completed)
        self.check_killed(proto.returncode)
        transp.close()

    def test_subprocess_terminate(self):
        # transp.terminate() must end the child with the "terminated"
        # return code.
        prog = os.path.join(os.path.dirname(__file__), 'echo.py')

        connect = self.loop.subprocess_exec(
            functools.partial(MySubprocessProtocol, self.loop),
            sys.executable, prog)
        transp, proto = self.loop.run_until_complete(connect)
        self.assertIsInstance(proto, MySubprocessProtocol)
        self.loop.run_until_complete(proto.connected)

        transp.terminate()
        self.loop.run_until_complete(proto.completed)
        self.check_terminated(proto.returncode)
        transp.close()

    @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
    def test_subprocess_send_signal(self):
        # A signal sent via send_signal() must show up as the negative
        # signal number in the return code.
        prog = os.path.join(os.path.dirname(__file__), 'echo.py')

        connect = self.loop.subprocess_exec(
            functools.partial(MySubprocessProtocol, self.loop),
            sys.executable, prog)
        transp, proto = self.loop.run_until_complete(connect)
        self.assertIsInstance(proto, MySubprocessProtocol)
        self.loop.run_until_complete(proto.connected)

        transp.send_signal(signal.SIGHUP)
        self.loop.run_until_complete(proto.completed)
        self.assertEqual(-signal.SIGHUP, proto.returncode)
        transp.close()

    def test_subprocess_stderr(self):
        # echo2.py is expected to answer on both fd 1 ('OUT:' prefix) and
        # fd 2 ('ERR:' prefix) and then exit with 0.
        prog = os.path.join(os.path.dirname(__file__), 'echo2.py')

        connect = self.loop.subprocess_exec(
            functools.partial(MySubprocessProtocol, self.loop),
            sys.executable, prog)
        transp, proto = self.loop.run_until_complete(connect)
        self.assertIsInstance(proto, MySubprocessProtocol)
        self.loop.run_until_complete(proto.connected)

        stdin = transp.get_pipe_transport(0)
        stdin.write(b'test')

        self.loop.run_until_complete(proto.completed)

        transp.close()
        self.assertEqual(b'OUT:test', proto.data[1])
        self.assertTrue(proto.data[2].startswith(b'ERR:test'), proto.data[2])
        self.assertEqual(0, proto.returncode)

    def test_subprocess_stderr_redirect_to_stdout(self):
        # With stderr=subprocess.STDOUT there is no separate fd-2 pipe
        # transport and both outputs must arrive on fd 1.
        prog = os.path.join(os.path.dirname(__file__), 'echo2.py')

        connect = self.loop.subprocess_exec(
            functools.partial(MySubprocessProtocol, self.loop),
            sys.executable, prog, stderr=subprocess.STDOUT)
        transp, proto = self.loop.run_until_complete(connect)
        self.assertIsInstance(proto, MySubprocessProtocol)
        self.loop.run_until_complete(proto.connected)

        stdin = transp.get_pipe_transport(0)
        self.assertIsNotNone(transp.get_pipe_transport(1))
        self.assertIsNone(transp.get_pipe_transport(2))

        stdin.write(b'test')
        self.loop.run_until_complete(proto.completed)
        self.assertTrue(proto.data[1].startswith(b'OUT:testERR:test'),
                        proto.data[1])
        self.assertEqual(b'', proto.data[2])

        transp.close()
        self.assertEqual(0, proto.returncode)

    def test_subprocess_close_client_stream(self):
        prog = os.path.join(os.path.dirname(__file__), 'echo3.py')

        connect = self.loop.subprocess_exec(
            functools.partial(MySubprocessProtocol, self.loop),
            sys.executable, prog)
        transp, proto = self.loop.run_until_complete(connect)
        self.assertIsInstance(proto, MySubprocessProtocol)
        self.loop.run_until_complete(proto.connected)

        stdin = transp.get_pipe_transport(0)
        stdout = transp.get_pipe_transport(1)
        stdin.write(b'test')
        self.loop.run_until_complete(proto.got_data[1].wait())
        self.assertEqual(b'OUT:test', proto.data[1])

        stdout.close()
        self.loop.run_until_complete(proto.disconnects[1])
        stdin.write(b'xxx')
        self.loop.run_until_complete(proto.got_data[2].wait())
        if sys.platform != 'win32':
            self.assertEqual(b'ERR:BrokenPipeError', proto.data[2])
        else:
            # After closing the read-end of a pipe, writing to the
            # write-end using os.write() fails with errno==EINVAL and
            # GetLastError()==ERROR_INVALID_NAME on Windows!?!  (Using
            # WriteFile() we get ERROR_BROKEN_PIPE as expected.)
2067 self.assertEqual(b'ERR:OSError', proto.data[2]) 2068 with test_utils.disable_logger(): 2069 transp.close() 2070 self.loop.run_until_complete(proto.completed) 2071 self.check_killed(proto.returncode) 2072 2073 def test_subprocess_wait_no_same_group(self): 2074 # start the new process in a new session 2075 connect = self.loop.subprocess_shell( 2076 functools.partial(MySubprocessProtocol, self.loop), 2077 'exit 7', stdin=None, stdout=None, stderr=None, 2078 start_new_session=True) 2079 _, proto = yield self.loop.run_until_complete(connect) 2080 self.assertIsInstance(proto, MySubprocessProtocol) 2081 self.loop.run_until_complete(proto.completed) 2082 self.assertEqual(7, proto.returncode) 2083 2084 def test_subprocess_exec_invalid_args(self): 2085 @asyncio.coroutine 2086 def connect(**kwds): 2087 yield from self.loop.subprocess_exec( 2088 asyncio.SubprocessProtocol, 2089 'pwd', **kwds) 2090 2091 with self.assertRaises(ValueError): 2092 self.loop.run_until_complete(connect(universal_newlines=True)) 2093 with self.assertRaises(ValueError): 2094 self.loop.run_until_complete(connect(bufsize=4096)) 2095 with self.assertRaises(ValueError): 2096 self.loop.run_until_complete(connect(shell=True)) 2097 2098 def test_subprocess_shell_invalid_args(self): 2099 @asyncio.coroutine 2100 def connect(cmd=None, **kwds): 2101 if not cmd: 2102 cmd = 'pwd' 2103 yield from self.loop.subprocess_shell( 2104 asyncio.SubprocessProtocol, 2105 cmd, **kwds) 2106 2107 with self.assertRaises(ValueError): 2108 self.loop.run_until_complete(connect(['ls', '-l'])) 2109 with self.assertRaises(ValueError): 2110 self.loop.run_until_complete(connect(universal_newlines=True)) 2111 with self.assertRaises(ValueError): 2112 self.loop.run_until_complete(connect(bufsize=4096)) 2113 with self.assertRaises(ValueError): 2114 self.loop.run_until_complete(connect(shell=False)) 2115 2116 2117 if sys.platform == 'win32': 2118 2119 class SelectEventLoopTests(EventLoopTestsMixin, test_utils.TestCase): 2120 2121 def 
create_event_loop(self): 2122 return asyncio.SelectorEventLoop() 2123 2124 class ProactorEventLoopTests(EventLoopTestsMixin, 2125 SubprocessTestsMixin, 2126 test_utils.TestCase): 2127 2128 def create_event_loop(self): 2129 return asyncio.ProactorEventLoop() 2130 2131 if not sslproto._is_sslproto_available(): 2132 def test_create_ssl_connection(self): 2133 raise unittest.SkipTest("need python 3.5 (ssl.MemoryBIO)") 2134 2135 def test_create_server_ssl(self): 2136 raise unittest.SkipTest("need python 3.5 (ssl.MemoryBIO)") 2137 2138 def test_create_server_ssl_verify_failed(self): 2139 raise unittest.SkipTest("need python 3.5 (ssl.MemoryBIO)") 2140 2141 def test_create_server_ssl_match_failed(self): 2142 raise unittest.SkipTest("need python 3.5 (ssl.MemoryBIO)") 2143 2144 def test_create_server_ssl_verified(self): 2145 raise unittest.SkipTest("need python 3.5 (ssl.MemoryBIO)") 2146 2147 def test_legacy_create_ssl_connection(self): 2148 raise unittest.SkipTest("IocpEventLoop incompatible with legacy SSL") 2149 2150 def test_legacy_create_server_ssl(self): 2151 raise unittest.SkipTest("IocpEventLoop incompatible with legacy SSL") 2152 2153 def test_legacy_create_server_ssl_verify_failed(self): 2154 raise unittest.SkipTest("IocpEventLoop incompatible with legacy SSL") 2155 2156 def test_legacy_create_server_ssl_match_failed(self): 2157 raise unittest.SkipTest("IocpEventLoop incompatible with legacy SSL") 2158 2159 def test_legacy_create_server_ssl_verified(self): 2160 raise unittest.SkipTest("IocpEventLoop incompatible with legacy SSL") 2161 2162 def test_reader_callback(self): 2163 raise unittest.SkipTest("IocpEventLoop does not have add_reader()") 2164 2165 def test_reader_callback_cancel(self): 2166 raise unittest.SkipTest("IocpEventLoop does not have add_reader()") 2167 2168 def test_writer_callback(self): 2169 raise unittest.SkipTest("IocpEventLoop does not have add_writer()") 2170 2171 def test_writer_callback_cancel(self): 2172 raise unittest.SkipTest("IocpEventLoop 
does not have add_writer()") 2173 2174 def test_create_datagram_endpoint(self): 2175 raise unittest.SkipTest( 2176 "IocpEventLoop does not have create_datagram_endpoint()") 2177 2178 def test_remove_fds_after_closing(self): 2179 raise unittest.SkipTest("IocpEventLoop does not have add_reader()") 2180 else: 2181 from asyncio import selectors 2182 2183 class UnixEventLoopTestsMixin(EventLoopTestsMixin): 2184 def setUp(self): 2185 super().setUp() 2186 watcher = asyncio.SafeChildWatcher() 2187 watcher.attach_loop(self.loop) 2188 asyncio.set_child_watcher(watcher) 2189 2190 def tearDown(self): 2191 asyncio.set_child_watcher(None) 2192 super().tearDown() 2193 2194 def test_get_event_loop_new_process(self): 2195 async def main(): 2196 pool = concurrent.futures.ProcessPoolExecutor() 2197 return await self.loop.run_in_executor( 2198 pool, _test_get_event_loop_new_process__sub_proc) 2199 2200 self.unpatch_get_running_loop() 2201 2202 self.assertEqual( 2203 self.loop.run_until_complete(main()), 2204 'hello') 2205 2206 if hasattr(selectors, 'KqueueSelector'): 2207 class KqueueEventLoopTests(UnixEventLoopTestsMixin, 2208 SubprocessTestsMixin, 2209 test_utils.TestCase): 2210 2211 def create_event_loop(self): 2212 return asyncio.SelectorEventLoop( 2213 selectors.KqueueSelector()) 2214 2215 # kqueue doesn't support character devices (PTY) on Mac OS X older 2216 # than 10.9 (Maverick) 2217 @support.requires_mac_ver(10, 9) 2218 # Issue #20667: KqueueEventLoopTests.test_read_pty_output() 2219 # hangs on OpenBSD 5.5 2220 @unittest.skipIf(sys.platform.startswith('openbsd'), 2221 'test hangs on OpenBSD') 2222 def test_read_pty_output(self): 2223 super().test_read_pty_output() 2224 2225 # kqueue doesn't support character devices (PTY) on Mac OS X older 2226 # than 10.9 (Maverick) 2227 @support.requires_mac_ver(10, 9) 2228 def test_write_pty(self): 2229 super().test_write_pty() 2230 2231 if hasattr(selectors, 'EpollSelector'): 2232 class EPollEventLoopTests(UnixEventLoopTestsMixin, 2233 
SubprocessTestsMixin, 2234 test_utils.TestCase): 2235 2236 def create_event_loop(self): 2237 return asyncio.SelectorEventLoop(selectors.EpollSelector()) 2238 2239 if hasattr(selectors, 'PollSelector'): 2240 class PollEventLoopTests(UnixEventLoopTestsMixin, 2241 SubprocessTestsMixin, 2242 test_utils.TestCase): 2243 2244 def create_event_loop(self): 2245 return asyncio.SelectorEventLoop(selectors.PollSelector()) 2246 2247 # Should always exist. 2248 class SelectEventLoopTests(UnixEventLoopTestsMixin, 2249 SubprocessTestsMixin, 2250 test_utils.TestCase): 2251 2252 def create_event_loop(self): 2253 return asyncio.SelectorEventLoop(selectors.SelectSelector()) 2254 2255 2256 def noop(*args, **kwargs): 2257 pass 2258 2259 2260 class HandleTests(test_utils.TestCase): 2261 2262 def setUp(self): 2263 super().setUp() 2264 self.loop = mock.Mock() 2265 self.loop.get_debug.return_value = True 2266 2267 def test_handle(self): 2268 def callback(*args): 2269 return args 2270 2271 args = () 2272 h = asyncio.Handle(callback, args, self.loop) 2273 self.assertIs(h._callback, callback) 2274 self.assertIs(h._args, args) 2275 self.assertFalse(h._cancelled) 2276 2277 h.cancel() 2278 self.assertTrue(h._cancelled) 2279 2280 def test_callback_with_exception(self): 2281 def callback(): 2282 raise ValueError() 2283 2284 self.loop = mock.Mock() 2285 self.loop.call_exception_handler = mock.Mock() 2286 2287 h = asyncio.Handle(callback, (), self.loop) 2288 h._run() 2289 2290 self.loop.call_exception_handler.assert_called_with({ 2291 'message': test_utils.MockPattern('Exception in callback.*'), 2292 'exception': mock.ANY, 2293 'handle': h, 2294 'source_traceback': h._source_traceback, 2295 }) 2296 2297 def test_handle_weakref(self): 2298 wd = weakref.WeakValueDictionary() 2299 h = asyncio.Handle(lambda: None, (), self.loop) 2300 wd['h'] = h # Would fail without __weakref__ slot. 
2301 2302 def test_handle_repr(self): 2303 self.loop.get_debug.return_value = False 2304 2305 # simple function 2306 h = asyncio.Handle(noop, (1, 2), self.loop) 2307 filename, lineno = test_utils.get_function_source(noop) 2308 self.assertEqual(repr(h), 2309 '<Handle noop(1, 2) at %s:%s>' 2310 % (filename, lineno)) 2311 2312 # cancelled handle 2313 h.cancel() 2314 self.assertEqual(repr(h), 2315 '<Handle cancelled>') 2316 2317 # decorated function 2318 cb = asyncio.coroutine(noop) 2319 h = asyncio.Handle(cb, (), self.loop) 2320 self.assertEqual(repr(h), 2321 '<Handle noop() at %s:%s>' 2322 % (filename, lineno)) 2323 2324 # partial function 2325 cb = functools.partial(noop, 1, 2) 2326 h = asyncio.Handle(cb, (3,), self.loop) 2327 regex = (r'^<Handle noop\(1, 2\)\(3\) at %s:%s>$' 2328 % (re.escape(filename), lineno)) 2329 self.assertRegex(repr(h), regex) 2330 2331 # partial function with keyword args 2332 cb = functools.partial(noop, x=1) 2333 h = asyncio.Handle(cb, (2, 3), self.loop) 2334 regex = (r'^<Handle noop\(x=1\)\(2, 3\) at %s:%s>$' 2335 % (re.escape(filename), lineno)) 2336 self.assertRegex(repr(h), regex) 2337 2338 # partial method 2339 if sys.version_info >= (3, 4): 2340 method = HandleTests.test_handle_repr 2341 cb = functools.partialmethod(method) 2342 filename, lineno = test_utils.get_function_source(method) 2343 h = asyncio.Handle(cb, (), self.loop) 2344 2345 cb_regex = r'<function HandleTests.test_handle_repr .*>' 2346 cb_regex = (r'functools.partialmethod\(%s, , \)\(\)' % cb_regex) 2347 regex = (r'^<Handle %s at %s:%s>$' 2348 % (cb_regex, re.escape(filename), lineno)) 2349 self.assertRegex(repr(h), regex) 2350 2351 def test_handle_repr_debug(self): 2352 self.loop.get_debug.return_value = True 2353 2354 # simple function 2355 create_filename = __file__ 2356 create_lineno = sys._getframe().f_lineno + 1 2357 h = asyncio.Handle(noop, (1, 2), self.loop) 2358 filename, lineno = test_utils.get_function_source(noop) 2359 self.assertEqual(repr(h), 2360 '<Handle 
noop(1, 2) at %s:%s created at %s:%s>' 2361 % (filename, lineno, create_filename, create_lineno)) 2362 2363 # cancelled handle 2364 h.cancel() 2365 self.assertEqual( 2366 repr(h), 2367 '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>' 2368 % (filename, lineno, create_filename, create_lineno)) 2369 2370 # double cancellation won't overwrite _repr 2371 h.cancel() 2372 self.assertEqual( 2373 repr(h), 2374 '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>' 2375 % (filename, lineno, create_filename, create_lineno)) 2376 2377 def test_handle_source_traceback(self): 2378 loop = asyncio.get_event_loop_policy().new_event_loop() 2379 loop.set_debug(True) 2380 self.set_event_loop(loop) 2381 2382 def check_source_traceback(h): 2383 lineno = sys._getframe(1).f_lineno - 1 2384 self.assertIsInstance(h._source_traceback, list) 2385 self.assertEqual(h._source_traceback[-1][:3], 2386 (__file__, 2387 lineno, 2388 'test_handle_source_traceback')) 2389 2390 # call_soon 2391 h = loop.call_soon(noop) 2392 check_source_traceback(h) 2393 2394 # call_soon_threadsafe 2395 h = loop.call_soon_threadsafe(noop) 2396 check_source_traceback(h) 2397 2398 # call_later 2399 h = loop.call_later(0, noop) 2400 check_source_traceback(h) 2401 2402 # call_at 2403 h = loop.call_later(0, noop) 2404 check_source_traceback(h) 2405 2406 @unittest.skipUnless(hasattr(collections.abc, 'Coroutine'), 2407 'No collections.abc.Coroutine') 2408 def test_coroutine_like_object_debug_formatting(self): 2409 # Test that asyncio can format coroutines that are instances of 2410 # collections.abc.Coroutine, but lack cr_core or gi_code attributes 2411 # (such as ones compiled with Cython). 
2412 2413 class Coro: 2414 def send(self, v): 2415 pass 2416 2417 def throw(self, *exc): 2418 pass 2419 2420 def close(self): 2421 pass 2422 2423 def __await__(self): 2424 pass 2425 2426 coro = Coro() 2427 coro.__name__ = 'AAA' 2428 self.assertTrue(asyncio.iscoroutine(coro)) 2429 self.assertEqual(coroutines._format_coroutine(coro), 'AAA()') 2430 2431 coro.__qualname__ = 'BBB' 2432 self.assertEqual(coroutines._format_coroutine(coro), 'BBB()') 2433 2434 coro.cr_running = True 2435 self.assertEqual(coroutines._format_coroutine(coro), 'BBB() running') 2436 2437 coro = Coro() 2438 # Some coroutines might not have '__name__', such as 2439 # built-in async_gen.asend(). 2440 self.assertEqual(coroutines._format_coroutine(coro), 'Coro()') 2441 2442 2443 class TimerTests(unittest.TestCase): 2444 2445 def setUp(self): 2446 super().setUp() 2447 self.loop = mock.Mock() 2448 2449 def test_hash(self): 2450 when = time.monotonic() 2451 h = asyncio.TimerHandle(when, lambda: False, (), 2452 mock.Mock()) 2453 self.assertEqual(hash(h), hash(when)) 2454 2455 def test_timer(self): 2456 def callback(*args): 2457 return args 2458 2459 args = (1, 2, 3) 2460 when = time.monotonic() 2461 h = asyncio.TimerHandle(when, callback, args, mock.Mock()) 2462 self.assertIs(h._callback, callback) 2463 self.assertIs(h._args, args) 2464 self.assertFalse(h._cancelled) 2465 2466 # cancel 2467 h.cancel() 2468 self.assertTrue(h._cancelled) 2469 self.assertIsNone(h._callback) 2470 self.assertIsNone(h._args) 2471 2472 # when cannot be None 2473 self.assertRaises(AssertionError, 2474 asyncio.TimerHandle, None, callback, args, 2475 self.loop) 2476 2477 def test_timer_repr(self): 2478 self.loop.get_debug.return_value = False 2479 2480 # simple function 2481 h = asyncio.TimerHandle(123, noop, (), self.loop) 2482 src = test_utils.get_function_source(noop) 2483 self.assertEqual(repr(h), 2484 '<TimerHandle when=123 noop() at %s:%s>' % src) 2485 2486 # cancelled handle 2487 h.cancel() 2488 self.assertEqual(repr(h), 
2489 '<TimerHandle cancelled when=123>') 2490 2491 def test_timer_repr_debug(self): 2492 self.loop.get_debug.return_value = True 2493 2494 # simple function 2495 create_filename = __file__ 2496 create_lineno = sys._getframe().f_lineno + 1 2497 h = asyncio.TimerHandle(123, noop, (), self.loop) 2498 filename, lineno = test_utils.get_function_source(noop) 2499 self.assertEqual(repr(h), 2500 '<TimerHandle when=123 noop() ' 2501 'at %s:%s created at %s:%s>' 2502 % (filename, lineno, create_filename, create_lineno)) 2503 2504 # cancelled handle 2505 h.cancel() 2506 self.assertEqual(repr(h), 2507 '<TimerHandle cancelled when=123 noop() ' 2508 'at %s:%s created at %s:%s>' 2509 % (filename, lineno, create_filename, create_lineno)) 2510 2511 2512 def test_timer_comparison(self): 2513 def callback(*args): 2514 return args 2515 2516 when = time.monotonic() 2517 2518 h1 = asyncio.TimerHandle(when, callback, (), self.loop) 2519 h2 = asyncio.TimerHandle(when, callback, (), self.loop) 2520 # TODO: Use assertLess etc. 
2521 self.assertFalse(h1 < h2) 2522 self.assertFalse(h2 < h1) 2523 self.assertTrue(h1 <= h2) 2524 self.assertTrue(h2 <= h1) 2525 self.assertFalse(h1 > h2) 2526 self.assertFalse(h2 > h1) 2527 self.assertTrue(h1 >= h2) 2528 self.assertTrue(h2 >= h1) 2529 self.assertTrue(h1 == h2) 2530 self.assertFalse(h1 != h2) 2531 2532 h2.cancel() 2533 self.assertFalse(h1 == h2) 2534 2535 h1 = asyncio.TimerHandle(when, callback, (), self.loop) 2536 h2 = asyncio.TimerHandle(when + 10.0, callback, (), self.loop) 2537 self.assertTrue(h1 < h2) 2538 self.assertFalse(h2 < h1) 2539 self.assertTrue(h1 <= h2) 2540 self.assertFalse(h2 <= h1) 2541 self.assertFalse(h1 > h2) 2542 self.assertTrue(h2 > h1) 2543 self.assertFalse(h1 >= h2) 2544 self.assertTrue(h2 >= h1) 2545 self.assertFalse(h1 == h2) 2546 self.assertTrue(h1 != h2) 2547 2548 h3 = asyncio.Handle(callback, (), self.loop) 2549 self.assertIs(NotImplemented, h1.__eq__(h3)) 2550 self.assertIs(NotImplemented, h1.__ne__(h3)) 2551 2552 2553 class AbstractEventLoopTests(unittest.TestCase): 2554 2555 def test_not_implemented(self): 2556 f = mock.Mock() 2557 loop = asyncio.AbstractEventLoop() 2558 self.assertRaises( 2559 NotImplementedError, loop.run_forever) 2560 self.assertRaises( 2561 NotImplementedError, loop.run_until_complete, None) 2562 self.assertRaises( 2563 NotImplementedError, loop.stop) 2564 self.assertRaises( 2565 NotImplementedError, loop.is_running) 2566 self.assertRaises( 2567 NotImplementedError, loop.is_closed) 2568 self.assertRaises( 2569 NotImplementedError, loop.close) 2570 self.assertRaises( 2571 NotImplementedError, loop.create_task, None) 2572 self.assertRaises( 2573 NotImplementedError, loop.call_later, None, None) 2574 self.assertRaises( 2575 NotImplementedError, loop.call_at, f, f) 2576 self.assertRaises( 2577 NotImplementedError, loop.call_soon, None) 2578 self.assertRaises( 2579 NotImplementedError, loop.time) 2580 self.assertRaises( 2581 NotImplementedError, loop.call_soon_threadsafe, None) 2582 self.assertRaises( 
2583 NotImplementedError, loop.run_in_executor, f, f) 2584 self.assertRaises( 2585 NotImplementedError, loop.set_default_executor, f) 2586 self.assertRaises( 2587 NotImplementedError, loop.getaddrinfo, 'localhost', 8080) 2588 self.assertRaises( 2589 NotImplementedError, loop.getnameinfo, ('localhost', 8080)) 2590 self.assertRaises( 2591 NotImplementedError, loop.create_connection, f) 2592 self.assertRaises( 2593 NotImplementedError, loop.create_server, f) 2594 self.assertRaises( 2595 NotImplementedError, loop.create_datagram_endpoint, f) 2596 self.assertRaises( 2597 NotImplementedError, loop.add_reader, 1, f) 2598 self.assertRaises( 2599 NotImplementedError, loop.remove_reader, 1) 2600 self.assertRaises( 2601 NotImplementedError, loop.add_writer, 1, f) 2602 self.assertRaises( 2603 NotImplementedError, loop.remove_writer, 1) 2604 self.assertRaises( 2605 NotImplementedError, loop.sock_recv, f, 10) 2606 self.assertRaises( 2607 NotImplementedError, loop.sock_sendall, f, 10) 2608 self.assertRaises( 2609 NotImplementedError, loop.sock_connect, f, f) 2610 self.assertRaises( 2611 NotImplementedError, loop.sock_accept, f) 2612 self.assertRaises( 2613 NotImplementedError, loop.add_signal_handler, 1, f) 2614 self.assertRaises( 2615 NotImplementedError, loop.remove_signal_handler, 1) 2616 self.assertRaises( 2617 NotImplementedError, loop.remove_signal_handler, 1) 2618 self.assertRaises( 2619 NotImplementedError, loop.connect_read_pipe, f, 2620 mock.sentinel.pipe) 2621 self.assertRaises( 2622 NotImplementedError, loop.connect_write_pipe, f, 2623 mock.sentinel.pipe) 2624 self.assertRaises( 2625 NotImplementedError, loop.subprocess_shell, f, 2626 mock.sentinel) 2627 self.assertRaises( 2628 NotImplementedError, loop.subprocess_exec, f) 2629 self.assertRaises( 2630 NotImplementedError, loop.set_exception_handler, f) 2631 self.assertRaises( 2632 NotImplementedError, loop.default_exception_handler, f) 2633 self.assertRaises( 2634 NotImplementedError, loop.call_exception_handler, f) 
2635 self.assertRaises( 2636 NotImplementedError, loop.get_debug) 2637 self.assertRaises( 2638 NotImplementedError, loop.set_debug, f) 2639 2640 2641 class ProtocolsAbsTests(unittest.TestCase): 2642 2643 def test_empty(self): 2644 f = mock.Mock() 2645 p = asyncio.Protocol() 2646 self.assertIsNone(p.connection_made(f)) 2647 self.assertIsNone(p.connection_lost(f)) 2648 self.assertIsNone(p.data_received(f)) 2649 self.assertIsNone(p.eof_received()) 2650 2651 dp = asyncio.DatagramProtocol() 2652 self.assertIsNone(dp.connection_made(f)) 2653 self.assertIsNone(dp.connection_lost(f)) 2654 self.assertIsNone(dp.error_received(f)) 2655 self.assertIsNone(dp.datagram_received(f, f)) 2656 2657 sp = asyncio.SubprocessProtocol() 2658 self.assertIsNone(sp.connection_made(f)) 2659 self.assertIsNone(sp.connection_lost(f)) 2660 self.assertIsNone(sp.pipe_data_received(1, f)) 2661 self.assertIsNone(sp.pipe_connection_lost(1, f)) 2662 self.assertIsNone(sp.process_exited()) 2663 2664 2665 class PolicyTests(unittest.TestCase): 2666 2667 def test_event_loop_policy(self): 2668 policy = asyncio.AbstractEventLoopPolicy() 2669 self.assertRaises(NotImplementedError, policy.get_event_loop) 2670 self.assertRaises(NotImplementedError, policy.set_event_loop, object()) 2671 self.assertRaises(NotImplementedError, policy.new_event_loop) 2672 self.assertRaises(NotImplementedError, policy.get_child_watcher) 2673 self.assertRaises(NotImplementedError, policy.set_child_watcher, 2674 object()) 2675 2676 def test_get_event_loop(self): 2677 policy = asyncio.DefaultEventLoopPolicy() 2678 self.assertIsNone(policy._local._loop) 2679 2680 loop = policy.get_event_loop() 2681 self.assertIsInstance(loop, asyncio.AbstractEventLoop) 2682 2683 self.assertIs(policy._local._loop, loop) 2684 self.assertIs(loop, policy.get_event_loop()) 2685 loop.close() 2686 2687 def test_get_event_loop_calls_set_event_loop(self): 2688 policy = asyncio.DefaultEventLoopPolicy() 2689 2690 with mock.patch.object( 2691 policy, 
"set_event_loop", 2692 wraps=policy.set_event_loop) as m_set_event_loop: 2693 2694 loop = policy.get_event_loop() 2695 2696 # policy._local._loop must be set through .set_event_loop() 2697 # (the unix DefaultEventLoopPolicy needs this call to attach 2698 # the child watcher correctly) 2699 m_set_event_loop.assert_called_with(loop) 2700 2701 loop.close() 2702 2703 def test_get_event_loop_after_set_none(self): 2704 policy = asyncio.DefaultEventLoopPolicy() 2705 policy.set_event_loop(None) 2706 self.assertRaises(RuntimeError, policy.get_event_loop) 2707 2708 @mock.patch('asyncio.events.threading.current_thread') 2709 def test_get_event_loop_thread(self, m_current_thread): 2710 2711 def f(): 2712 policy = asyncio.DefaultEventLoopPolicy() 2713 self.assertRaises(RuntimeError, policy.get_event_loop) 2714 2715 th = threading.Thread(target=f) 2716 th.start() 2717 th.join() 2718 2719 def test_new_event_loop(self): 2720 policy = asyncio.DefaultEventLoopPolicy() 2721 2722 loop = policy.new_event_loop() 2723 self.assertIsInstance(loop, asyncio.AbstractEventLoop) 2724 loop.close() 2725 2726 def test_set_event_loop(self): 2727 policy = asyncio.DefaultEventLoopPolicy() 2728 old_loop = policy.get_event_loop() 2729 2730 self.assertRaises(AssertionError, policy.set_event_loop, object()) 2731 2732 loop = policy.new_event_loop() 2733 policy.set_event_loop(loop) 2734 self.assertIs(loop, policy.get_event_loop()) 2735 self.assertIsNot(old_loop, policy.get_event_loop()) 2736 loop.close() 2737 old_loop.close() 2738 2739 def test_get_event_loop_policy(self): 2740 policy = asyncio.get_event_loop_policy() 2741 self.assertIsInstance(policy, asyncio.AbstractEventLoopPolicy) 2742 self.assertIs(policy, asyncio.get_event_loop_policy()) 2743 2744 def test_set_event_loop_policy(self): 2745 self.assertRaises( 2746 AssertionError, asyncio.set_event_loop_policy, object()) 2747 2748 old_policy = asyncio.get_event_loop_policy() 2749 2750 policy = asyncio.DefaultEventLoopPolicy() 2751 
asyncio.set_event_loop_policy(policy) 2752 self.assertIs(policy, asyncio.get_event_loop_policy()) 2753 self.assertIsNot(policy, old_policy) 2754 2755 def test_get_event_loop_returns_running_loop(self): 2756 class Policy(asyncio.DefaultEventLoopPolicy): 2757 def get_event_loop(self): 2758 raise NotImplementedError 2759 2760 loop = None 2761 2762 old_policy = asyncio.get_event_loop_policy() 2763 try: 2764 asyncio.set_event_loop_policy(Policy()) 2765 loop = asyncio.new_event_loop() 2766 self.assertIs(asyncio._get_running_loop(), None) 2767 2768 async def func(): 2769 self.assertIs(asyncio.get_event_loop(), loop) 2770 self.assertIs(asyncio._get_running_loop(), loop) 2771 2772 loop.run_until_complete(func()) 2773 finally: 2774 asyncio.set_event_loop_policy(old_policy) 2775 if loop is not None: 2776 loop.close() 2777 2778 self.assertIs(asyncio._get_running_loop(), None) 2779 2780 2781 if __name__ == '__main__': 2782 unittest.main() 2783