Home | History | Annotate | Download | only in test_asyncio
      1 """Tests for events.py."""
      2 
      3 import collections.abc
      4 import concurrent.futures
      5 import functools
      6 import gc
      7 import io
      8 import os
      9 import platform
     10 import re
     11 import signal
     12 import socket
     13 try:
     14     import ssl
     15 except ImportError:
     16     ssl = None
     17 import subprocess
     18 import sys
     19 import threading
     20 import time
     21 import errno
     22 import unittest
     23 from unittest import mock
     24 import weakref
     25 
     26 if sys.platform != 'win32':
     27     import tty
     28 
     29 import asyncio
     30 from asyncio import coroutines
     31 from asyncio import proactor_events
     32 from asyncio import selector_events
     33 from asyncio import sslproto
     34 from asyncio import test_utils
     35 try:
     36     from test import support
     37 except ImportError:
     38     from asyncio import test_support as support
     39 
     40 
     41 def data_file(filename):
     42     if hasattr(support, 'TEST_HOME_DIR'):
     43         fullname = os.path.join(support.TEST_HOME_DIR, filename)
     44         if os.path.isfile(fullname):
     45             return fullname
     46     fullname = os.path.join(os.path.dirname(__file__), filename)
     47     if os.path.isfile(fullname):
     48         return fullname
     49     raise FileNotFoundError(filename)
     50 
     51 
     52 def osx_tiger():
     53     """Return True if the platform is Mac OS 10.4 or older."""
     54     if sys.platform != 'darwin':
     55         return False
     56     version = platform.mac_ver()[0]
     57     version = tuple(map(int, version.split('.')))
     58     return version < (10, 5)
     59 
     60 
     61 def _test_get_event_loop_new_process__sub_proc():
     62     async def doit():
     63         return 'hello'
     64 
     65     loop = asyncio.new_event_loop()
     66     asyncio.set_event_loop(loop)
     67     return loop.run_until_complete(doit())
     68 
     69 
# Paths of the SSL test fixtures, resolved once at import time through
# data_file(); a missing fixture raises FileNotFoundError during import.
ONLYCERT = data_file('ssl_cert.pem')
ONLYKEY = data_file('ssl_key.pem')
SIGNED_CERTFILE = data_file('keycert3.pem')
SIGNING_CA = data_file('pycacert.pem')
# Peer certificate in dictionary form.
# NOTE(review): presumably the decoded form of SIGNED_CERTFILE as reported by
# getpeercert(); it is not used in the visible part of this file -- confirm
# against the tests that reference it.
PEERCERT = {'serialNumber': 'B09264B1F2DA21D1',
            'version': 1,
            'subject': ((('countryName', 'XY'),),
                    (('localityName', 'Castle Anthrax'),),
                    (('organizationName', 'Python Software Foundation'),),
                    (('commonName', 'localhost'),)),
            'issuer': ((('countryName', 'XY'),),
                    (('organizationName', 'Python Software Foundation CA'),),
                    (('commonName', 'our-ca-server'),)),
            'notAfter': 'Nov 13 19:47:07 2022 GMT',
            'notBefore': 'Jan  4 19:47:07 2013 GMT'}
     85 
     86 
     87 class MyBaseProto(asyncio.Protocol):
     88     connected = None
     89     done = None
     90 
     91     def __init__(self, loop=None):
     92         self.transport = None
     93         self.state = 'INITIAL'
     94         self.nbytes = 0
     95         if loop is not None:
     96             self.connected = asyncio.Future(loop=loop)
     97             self.done = asyncio.Future(loop=loop)
     98 
     99     def connection_made(self, transport):
    100         self.transport = transport
    101         assert self.state == 'INITIAL', self.state
    102         self.state = 'CONNECTED'
    103         if self.connected:
    104             self.connected.set_result(None)
    105 
    106     def data_received(self, data):
    107         assert self.state == 'CONNECTED', self.state
    108         self.nbytes += len(data)
    109 
    110     def eof_received(self):
    111         assert self.state == 'CONNECTED', self.state
    112         self.state = 'EOF'
    113 
    114     def connection_lost(self, exc):
    115         assert self.state in ('CONNECTED', 'EOF'), self.state
    116         self.state = 'CLOSED'
    117         if self.done:
    118             self.done.set_result(None)
    119 
    120 
    121 class MyProto(MyBaseProto):
    122     def connection_made(self, transport):
    123         super().connection_made(transport)
    124         transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n')
    125 
    126 
    127 class MyDatagramProto(asyncio.DatagramProtocol):
    128     done = None
    129 
    130     def __init__(self, loop=None):
    131         self.state = 'INITIAL'
    132         self.nbytes = 0
    133         if loop is not None:
    134             self.done = asyncio.Future(loop=loop)
    135 
    136     def connection_made(self, transport):
    137         self.transport = transport
    138         assert self.state == 'INITIAL', self.state
    139         self.state = 'INITIALIZED'
    140 
    141     def datagram_received(self, data, addr):
    142         assert self.state == 'INITIALIZED', self.state
    143         self.nbytes += len(data)
    144 
    145     def error_received(self, exc):
    146         assert self.state == 'INITIALIZED', self.state
    147 
    148     def connection_lost(self, exc):
    149         assert self.state == 'INITIALIZED', self.state
    150         self.state = 'CLOSED'
    151         if self.done:
    152             self.done.set_result(None)
    153 
    154 
    155 class MyReadPipeProto(asyncio.Protocol):
    156     done = None
    157 
    158     def __init__(self, loop=None):
    159         self.state = ['INITIAL']
    160         self.nbytes = 0
    161         self.transport = None
    162         if loop is not None:
    163             self.done = asyncio.Future(loop=loop)
    164 
    165     def connection_made(self, transport):
    166         self.transport = transport
    167         assert self.state == ['INITIAL'], self.state
    168         self.state.append('CONNECTED')
    169 
    170     def data_received(self, data):
    171         assert self.state == ['INITIAL', 'CONNECTED'], self.state
    172         self.nbytes += len(data)
    173 
    174     def eof_received(self):
    175         assert self.state == ['INITIAL', 'CONNECTED'], self.state
    176         self.state.append('EOF')
    177 
    178     def connection_lost(self, exc):
    179         if 'EOF' not in self.state:
    180             self.state.append('EOF')  # It is okay if EOF is missed.
    181         assert self.state == ['INITIAL', 'CONNECTED', 'EOF'], self.state
    182         self.state.append('CLOSED')
    183         if self.done:
    184             self.done.set_result(None)
    185 
    186 
    187 class MyWritePipeProto(asyncio.BaseProtocol):
    188     done = None
    189 
    190     def __init__(self, loop=None):
    191         self.state = 'INITIAL'
    192         self.transport = None
    193         if loop is not None:
    194             self.done = asyncio.Future(loop=loop)
    195 
    196     def connection_made(self, transport):
    197         self.transport = transport
    198         assert self.state == 'INITIAL', self.state
    199         self.state = 'CONNECTED'
    200 
    201     def connection_lost(self, exc):
    202         assert self.state == 'CONNECTED', self.state
    203         self.state = 'CLOSED'
    204         if self.done:
    205             self.done.set_result(None)
    206 
    207 
    208 class MySubprocessProtocol(asyncio.SubprocessProtocol):
    209 
    210     def __init__(self, loop):
    211         self.state = 'INITIAL'
    212         self.transport = None
    213         self.connected = asyncio.Future(loop=loop)
    214         self.completed = asyncio.Future(loop=loop)
    215         self.disconnects = {fd: asyncio.Future(loop=loop) for fd in range(3)}
    216         self.data = {1: b'', 2: b''}
    217         self.returncode = None
    218         self.got_data = {1: asyncio.Event(loop=loop),
    219                          2: asyncio.Event(loop=loop)}
    220 
    221     def connection_made(self, transport):
    222         self.transport = transport
    223         assert self.state == 'INITIAL', self.state
    224         self.state = 'CONNECTED'
    225         self.connected.set_result(None)
    226 
    227     def connection_lost(self, exc):
    228         assert self.state == 'CONNECTED', self.state
    229         self.state = 'CLOSED'
    230         self.completed.set_result(None)
    231 
    232     def pipe_data_received(self, fd, data):
    233         assert self.state == 'CONNECTED', self.state
    234         self.data[fd] += data
    235         self.got_data[fd].set()
    236 
    237     def pipe_connection_lost(self, fd, exc):
    238         assert self.state == 'CONNECTED', self.state
    239         if exc:
    240             self.disconnects[fd].set_exception(exc)
    241         else:
    242             self.disconnects[fd].set_result(exc)
    243 
    244     def process_exited(self):
    245         assert self.state == 'CONNECTED', self.state
    246         self.returncode = self.transport.get_returncode()
    247 
    248 
    249 class EventLoopTestsMixin:
    250 
    def setUp(self):
        # Create a new loop for every test and register it through
        # set_event_loop().
        super().setUp()
        loop = self.create_event_loop()
        self.loop = loop
        self.set_event_loop(loop)
    255 
    def tearDown(self):
        loop = self.loop
        # Give pending transport close callbacks a chance to run while the
        # loop is still open.
        if not loop.is_closed():
            test_utils.run_briefly(loop)

        loop.close()
        gc.collect()
        super().tearDown()
    264 
    def test_run_until_complete_nesting(self):
        # Calling run_until_complete() from a coroutine that the same loop
        # is already running must raise RuntimeError.
        @asyncio.coroutine
        def coro1():
            yield

        @asyncio.coroutine
        def coro2():
            # coro2 is driven by the loop, so the loop reports it is running.
            self.assertTrue(self.loop.is_running())
            # The nested call is the one expected to fail.
            self.loop.run_until_complete(coro1())

        self.assertRaises(
            RuntimeError, self.loop.run_until_complete, coro2())
    277 
    278     # Note: because of the default Windows timing granularity of
    279     # 15.6 msec, we use fairly long sleep times here (~100 msec).
    280 
    def test_run_until_complete(self):
        # Running a 0.1s sleep to completion should take roughly that long
        # on the loop's own clock; the bounds are deliberately generous.
        started = self.loop.time()
        self.loop.run_until_complete(asyncio.sleep(0.1, loop=self.loop))
        elapsed = self.loop.time() - started
        self.assertTrue(0.08 <= elapsed <= 0.8, elapsed)
    286 
    def test_run_until_complete_stopped(self):
        # If the loop is stopped from inside the coroutine before the
        # coroutine finishes, run_until_complete() must raise RuntimeError.
        @asyncio.coroutine
        def cb():
            self.loop.stop()
            # Still pending when the stop request takes effect.
            yield from asyncio.sleep(0.1, loop=self.loop)
        task = cb()
        self.assertRaises(RuntimeError,
                          self.loop.run_until_complete, task)
    295 
    def test_call_later(self):
        # A callback scheduled 0.1s ahead receives its argument and fires
        # after roughly that delay, measured with time.monotonic().
        seen = []

        def on_timer(value):
            seen.append(value)
            self.loop.stop()

        self.loop.call_later(0.1, on_timer, 'hello world')
        started = time.monotonic()
        self.loop.run_forever()
        elapsed = time.monotonic() - started
        self.assertEqual(seen, ['hello world'])
        self.assertTrue(0.08 <= elapsed <= 0.8, elapsed)
    309 
    def test_call_soon(self):
        # call_soon() forwards its positional arguments to the callback.
        calls = []

        def record(first, second):
            calls.append((first, second))
            self.loop.stop()

        self.loop.call_soon(record, 'hello', 'world')
        self.loop.run_forever()
        self.assertEqual(calls, [('hello', 'world')])
    320 
    def test_call_soon_threadsafe(self):
        # A callback scheduled from another thread through
        # call_soon_threadsafe() runs ahead of one scheduled afterwards
        # with call_soon().
        results = []
        lock = threading.Lock()

        def callback(arg):
            results.append(arg)
            # Stop the loop once both callbacks have run.
            if len(results) >= 2:
                self.loop.stop()

        def run_in_thread():
            self.loop.call_soon_threadsafe(callback, 'hello')
            # Tell the main thread that 'hello' has been scheduled.
            lock.release()

        # Take the lock up front so that "with lock" below blocks until the
        # worker thread releases it.
        lock.acquire()
        t = threading.Thread(target=run_in_thread)
        t.start()

        with lock:
            self.loop.call_soon(callback, 'world')
            self.loop.run_forever()
        t.join()
        self.assertEqual(results, ['hello', 'world'])
    343 
    def test_call_soon_threadsafe_same_thread(self):
        # call_soon_threadsafe() used from the loop's own thread keeps
        # scheduling order relative to call_soon().
        seen = []

        def record(value):
            seen.append(value)
            if len(seen) < 2:
                return
            self.loop.stop()

        self.loop.call_soon_threadsafe(record, 'hello')
        self.loop.call_soon(record, 'world')
        self.loop.run_forever()
        self.assertEqual(seen, ['hello', 'world'])
    356 
    def test_run_in_executor(self):
        # The function's return value must come back through the future,
        # and the function must have run in a thread other than this one.
        caller_ident = threading.get_ident()

        def run(arg):
            return (arg, threading.get_ident())

        fut = self.loop.run_in_executor(None, run, 'yo')
        value, worker_ident = self.loop.run_until_complete(fut)
        self.assertEqual(value, 'yo')
        self.assertNotEqual(worker_ident, caller_ident)
    364 
    def test_reader_callback(self):
        # Bytes sent on one end of a socket pair must reach the reader
        # callback registered for the other end, in order.
        r, w = test_utils.socketpair()
        r.setblocking(False)
        bytes_read = bytearray()

        def reader():
            try:
                data = r.recv(1024)
            except BlockingIOError:
                # Spurious readiness notifications are possible
                # at least on Linux -- see man select.
                return
            if data:
                bytes_read.extend(data)
            else:
                # Empty read means EOF: unregister the reader and close
                # the receiving socket.
                self.assertTrue(self.loop.remove_reader(r.fileno()))
                r.close()

        self.loop.add_reader(r.fileno(), reader)
        # Send in two batches, running the loop until each has arrived.
        self.loop.call_soon(w.send, b'abc')
        test_utils.run_until(self.loop, lambda: len(bytes_read) >= 3)
        self.loop.call_soon(w.send, b'def')
        test_utils.run_until(self.loop, lambda: len(bytes_read) >= 6)
        # Close the writing end, then stop the loop.
        self.loop.call_soon(w.close)
        self.loop.call_soon(self.loop.stop)
        self.loop.run_forever()
        self.assertEqual(bytes_read, b'abcdef')
    392 
    def test_writer_callback(self):
        # A writer callback registered with add_writer() is called with its
        # extra argument, and what it sends arrives on the other end.
        rsock, wsock = test_utils.socketpair()
        wsock.setblocking(False)
        payload = b'x' * 1024

        def writer(data):
            wsock.send(data)
            self.loop.stop()

        self.loop.add_writer(wsock.fileno(), writer, payload)
        self.loop.run_forever()

        # The first removal finds the writer; the second finds nothing.
        self.assertTrue(self.loop.remove_writer(wsock.fileno()))
        self.assertFalse(self.loop.remove_writer(wsock.fileno()))

        wsock.close()
        received = rsock.recv(len(payload) * 2)
        rsock.close()
        self.assertEqual(received, payload)
    412 
    def _basetest_sock_client_ops(self, httpd, sock):
        """Exercise the loop's sock_*() client operations against *httpd*.

        Unless the loop is a proactor event loop, first check that in debug
        mode sock_connect(), sock_sendall(), sock_recv() and sock_accept()
        raise ValueError for a blocking socket.  Then, with *sock* switched
        to non-blocking mode, connect to ``httpd.address``, send a GET
        request and check that the reply starts with b'HTTP/1.0 200 OK'.

        *sock* is closed before the final assertion.  Debug mode, once
        enabled here, is not switched off again.
        """
        if not isinstance(self.loop, proactor_events.BaseProactorEventLoop):
            # in debug mode, socket operations must fail
            # if the socket is not in blocking mode
            self.loop.set_debug(True)
            sock.setblocking(True)
            with self.assertRaises(ValueError):
                self.loop.run_until_complete(
                    self.loop.sock_connect(sock, httpd.address))
            with self.assertRaises(ValueError):
                self.loop.run_until_complete(
                    self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n'))
            with self.assertRaises(ValueError):
                self.loop.run_until_complete(
                    self.loop.sock_recv(sock, 1024))
            with self.assertRaises(ValueError):
                self.loop.run_until_complete(
                    self.loop.sock_accept(sock))

        # test in non-blocking mode
        sock.setblocking(False)
        self.loop.run_until_complete(
            self.loop.sock_connect(sock, httpd.address))
        self.loop.run_until_complete(
            self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n'))
        data = self.loop.run_until_complete(
            self.loop.sock_recv(sock, 1024))
        # consume data
        self.loop.run_until_complete(
            self.loop.sock_recv(sock, 1024))
        sock.close()
        self.assertTrue(data.startswith(b'HTTP/1.0 200 OK'))
    445 
    def test_sock_client_ops(self):
        # Run the shared sock_*() checks over a default (TCP) socket.
        with test_utils.run_test_server() as httpd:
            self._basetest_sock_client_ops(httpd, socket.socket())
    450 
    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
    def test_unix_sock_client_ops(self):
        # Run the shared sock_*() checks over an AF_UNIX socket.
        with test_utils.run_test_unix_server() as httpd:
            unix_sock = socket.socket(socket.AF_UNIX)
            self._basetest_sock_client_ops(httpd, unix_sock)
    456 
    def test_sock_client_fail(self):
        # sock_connect() to a port nobody listens on must raise
        # ConnectionRefusedError.

        # Make sure that we will get an unused port: bind to port 0, note
        # the address that was assigned, then close the socket again.  The
        # "with" block closes the probe even if bind() fails, and cannot
        # trip over an unbound name if socket creation itself fails.
        with socket.socket() as probe:
            probe.bind(('127.0.0.1', 0))
            address = probe.getsockname()

        # Close the client socket even when the assertion fails.
        with socket.socket() as sock:
            sock.setblocking(False)
            with self.assertRaises(ConnectionRefusedError):
                self.loop.run_until_complete(
                    self.loop.sock_connect(sock, address))
    473 
    def test_sock_accept(self):
        # sock_accept() must hand back a non-blocking connection whose peer
        # address is the client's address.
        # The "with" blocks close every socket even if an assertion fails.
        with socket.socket() as listener, socket.socket() as client:
            listener.setblocking(False)
            listener.bind(('127.0.0.1', 0))
            listener.listen(1)
            client.connect(listener.getsockname())

            f = self.loop.sock_accept(listener)
            conn, addr = self.loop.run_until_complete(f)
            with conn:
                # A timeout of 0 means the accepted socket is non-blocking.
                self.assertEqual(conn.gettimeout(), 0)
                self.assertEqual(addr, client.getsockname())
                self.assertEqual(client.getpeername(),
                                 listener.getsockname())
    490 
    @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'No SIGKILL')
    def test_add_signal_handler(self):
        # Covers argument validation of add_signal_handler() and
        # remove_signal_handler(), then a full install / deliver / remove
        # cycle using SIGINT sent to this process.
        caught = 0

        def my_handler():
            nonlocal caught
            caught += 1

        # Check error behavior first.
        # A non-integer signal is a TypeError ...
        self.assertRaises(
            TypeError, self.loop.add_signal_handler, 'boom', my_handler)
        self.assertRaises(
            TypeError, self.loop.remove_signal_handler, 'boom')
        # ... and out-of-range numbers (NSIG+1, 0, -1) are a ValueError.
        self.assertRaises(
            ValueError, self.loop.add_signal_handler, signal.NSIG+1,
            my_handler)
        self.assertRaises(
            ValueError, self.loop.remove_signal_handler, signal.NSIG+1)
        self.assertRaises(
            ValueError, self.loop.add_signal_handler, 0, my_handler)
        self.assertRaises(
            ValueError, self.loop.remove_signal_handler, 0)
        self.assertRaises(
            ValueError, self.loop.add_signal_handler, -1, my_handler)
        self.assertRaises(
            ValueError, self.loop.remove_signal_handler, -1)
        # Installing a handler for SIGKILL must be rejected.
        self.assertRaises(
            RuntimeError, self.loop.add_signal_handler, signal.SIGKILL,
            my_handler)
        # Removing SIGKILL doesn't raise, since we don't call signal().
        self.assertFalse(self.loop.remove_signal_handler(signal.SIGKILL))
        # Now set a handler and handle it.
        self.loop.add_signal_handler(signal.SIGINT, my_handler)

        # Send SIGINT to ourselves and run the loop until the handler ran.
        os.kill(os.getpid(), signal.SIGINT)
        test_utils.run_until(self.loop, lambda: caught)

        # Removing it should restore the default handler.
        self.assertTrue(self.loop.remove_signal_handler(signal.SIGINT))
        self.assertEqual(signal.getsignal(signal.SIGINT),
                         signal.default_int_handler)
        # Removing again returns False.
        self.assertFalse(self.loop.remove_signal_handler(signal.SIGINT))
    534 
    @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM')
    def test_signal_handling_while_selecting(self):
        # Test with a signal actually arriving during a select() call.
        caught = 0

        def my_handler():
            nonlocal caught
            caught += 1
            # The handler is what ends run_forever() below.
            self.loop.stop()

        self.loop.add_signal_handler(signal.SIGALRM, my_handler)

        signal.setitimer(signal.ITIMER_REAL, 0.01, 0)  # Send SIGALRM once.
        self.loop.run_forever()
        # The handler must have run exactly once.
        self.assertEqual(caught, 1)
    550 
    @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM')
    def test_signal_handling_args(self):
        # Extra positional arguments given to add_signal_handler() must be
        # passed on to the handler.
        some_args = (42,)
        caught = 0

        def my_handler(*args):
            nonlocal caught
            caught += 1
            self.assertEqual(args, some_args)

        self.loop.add_signal_handler(signal.SIGALRM, my_handler, *some_args)

        signal.setitimer(signal.ITIMER_REAL, 0.1, 0)  # Send SIGALRM once.
        # The handler does not stop the loop, so stop it on a timer that
        # fires after the alarm.
        self.loop.call_later(0.5, self.loop.stop)
        self.loop.run_forever()
        self.assertEqual(caught, 1)
    567 
    def _basetest_create_connection(self, connection_fut, check_sockname=True):
        """Complete *connection_fut* and check the resulting connection.

        The awaited result must be a (transport, protocol) pair of the
        expected asyncio types, with the protocol holding that transport.
        The 'sockname' extra info is only checked when *check_sockname* is
        true.  The helper then waits for the protocol's ``done`` future,
        checks that some bytes were received and closes the transport.
        """
        transport, protocol = self.loop.run_until_complete(connection_fut)
        self.assertIsInstance(transport, asyncio.Transport)
        self.assertIsInstance(protocol, asyncio.Protocol)
        self.assertIs(protocol.transport, transport)
        if check_sockname:
            sockname = transport.get_extra_info('sockname')
            self.assertIsNotNone(sockname)
        self.loop.run_until_complete(protocol.done)
        self.assertGreater(protocol.nbytes, 0)
        transport.close()
    578 
    def test_create_connection(self):
        # Connect to the test server by host and port, then run the shared
        # connection checks.
        def protocol_factory():
            return MyProto(loop=self.loop)

        with test_utils.run_test_server() as httpd:
            pending = self.loop.create_connection(
                protocol_factory, *httpd.address)
            self._basetest_create_connection(pending)
    584 
    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
    def test_create_unix_connection(self):
        # Issue #20682: On Mac OS X Tiger, getsockname() returns a
        # zero-length address for UNIX socket.
        sockname_reliable = not osx_tiger()

        def protocol_factory():
            return MyProto(loop=self.loop)

        with test_utils.run_test_unix_server() as httpd:
            pending = self.loop.create_unix_connection(
                protocol_factory, httpd.address)
            self._basetest_create_connection(pending, sockname_reliable)
    595 
    def test_create_connection_sock(self):
        # create_connection(sock=...) must accept a socket that is already
        # connected.
        with test_utils.run_test_server() as httpd:
            infos = self.loop.run_until_complete(
                self.loop.getaddrinfo(
                    *httpd.address, type=socket.SOCK_STREAM))

            # Try each resolved address until one connects.  Only OSError
            # counts as "this address does not work"; anything else is a
            # real failure and must propagate.
            sock = None
            for family, socktype, proto, _cname, address in infos:
                try:
                    candidate = socket.socket(
                        family=family, type=socktype, proto=proto)
                except OSError:
                    continue
                try:
                    candidate.setblocking(False)
                    self.loop.run_until_complete(
                        self.loop.sock_connect(candidate, address))
                except OSError:
                    # Do not leak the socket of a failed attempt.
                    candidate.close()
                    continue
                sock = candidate
                break
            if sock is None:
                # self.fail() still fires when asserts are disabled (-O).
                self.fail('Can not create socket.')

            f = self.loop.create_connection(
                lambda: MyProto(loop=self.loop), sock=sock)
            tr, pr = self.loop.run_until_complete(f)
            self.assertIsInstance(tr, asyncio.Transport)
            self.assertIsInstance(pr, asyncio.Protocol)
            self.loop.run_until_complete(pr.done)
            self.assertGreater(pr.nbytes, 0)
            tr.close()
    623 
    def check_ssl_extra_info(self, client, check_sockname=True,
                             peername=None, peercert={}):
        """Check the SSL-related extra info exposed by transport *client*.

        check_sockname -- when true, 'sockname' must not be None
        peername -- when truthy, 'peername' must equal it; otherwise
            'peername' merely has to be set
        peercert -- expected value of 'peercert'; the default dict is only
            compared against, never modified

        Also checks that 'cipher' is a (str, str, int) tuple and that the
        'ssl_object' reports the same compression, cipher and peer
        certificate as the transport's extra info.
        """
        if check_sockname:
            self.assertIsNotNone(client.get_extra_info('sockname'))
        if peername:
            self.assertEqual(peername,
                             client.get_extra_info('peername'))
        else:
            self.assertIsNotNone(client.get_extra_info('peername'))
        self.assertEqual(peercert,
                         client.get_extra_info('peercert'))

        # test SSL cipher
        cipher = client.get_extra_info('cipher')
        self.assertIsInstance(cipher, tuple)
        self.assertEqual(len(cipher), 3, cipher)
        self.assertIsInstance(cipher[0], str)
        self.assertIsInstance(cipher[1], str)
        self.assertIsInstance(cipher[2], int)

        # test SSL object
        sslobj = client.get_extra_info('ssl_object')
        self.assertIsNotNone(sslobj)
        self.assertEqual(sslobj.compression(),
                         client.get_extra_info('compression'))
        self.assertEqual(sslobj.cipher(),
                         client.get_extra_info('cipher'))
        self.assertEqual(sslobj.getpeercert(),
                         client.get_extra_info('peercert'))
    655 
    def _basetest_create_ssl_connection(self, connection_fut,
                                        check_sockname=True,
                                        peername=None):
        """Complete *connection_fut* and check the resulting SSL connection.

        The awaited result must be a (transport, protocol) pair of the
        expected asyncio types, the transport's class name must contain
        'ssl', and the transport must pass check_ssl_extra_info() with the
        given *check_sockname* and *peername*.  The helper then waits for
        the protocol's ``done`` future, checks that some bytes were received
        and closes the transport.
        """
        tr, pr = self.loop.run_until_complete(connection_fut)
        self.assertIsInstance(tr, asyncio.Transport)
        self.assertIsInstance(pr, asyncio.Protocol)
        # assertIn reports both operands when the check fails.
        self.assertIn('ssl', tr.__class__.__name__.lower())
        self.check_ssl_extra_info(tr, check_sockname, peername)
        self.loop.run_until_complete(pr.done)
        self.assertGreater(pr.nbytes, 0)
        tr.close()
    667 
    def _test_create_ssl_connection(self, httpd, create_connection,
                                    check_sockname=True, peername=None):
        """Exercise *create_connection* with three kinds of ``ssl`` argument.

        create_connection -- callable accepting an ``ssl`` keyword and
            returning something the loop can run to completion
        check_sockname, peername -- passed through to
            _basetest_create_ssl_connection()

        *httpd* is accepted but not used in this method's body.

        The steps are: an explicit dummy SSL context; ``ssl=True`` with
        ssl.create_default_context() patched to return a dummy context
        (only when ssl.Purpose exists); and ``ssl=True`` with the real
        default context, which must fail certificate verification.
        """
        conn_fut = create_connection(ssl=test_utils.dummy_ssl_context())
        self._basetest_create_ssl_connection(conn_fut, check_sockname,
                                             peername)

        # ssl.Purpose was introduced in Python 3.4
        if hasattr(ssl, 'Purpose'):
            def _dummy_ssl_create_context(purpose=ssl.Purpose.SERVER_AUTH, *,
                                          cafile=None, capath=None,
                                          cadata=None):
                """
                A ssl.create_default_context() replacement that doesn't enable
                cert validation.
                """
                self.assertEqual(purpose, ssl.Purpose.SERVER_AUTH)
                return test_utils.dummy_ssl_context()

            # With ssl=True, ssl.create_default_context() should be called
            with mock.patch('ssl.create_default_context',
                            side_effect=_dummy_ssl_create_context) as m:
                conn_fut = create_connection(ssl=True)
                self._basetest_create_ssl_connection(conn_fut, check_sockname,
                                                     peername)
                # The default context must be requested exactly once.
                self.assertEqual(m.call_count, 1)

        # With the real ssl.create_default_context(), certificate
        # validation will fail
        with self.assertRaises(ssl.SSLError) as cm:
            conn_fut = create_connection(ssl=True)
            # Ignore the "SSL handshake failed" log in debug mode
            with test_utils.disable_logger():
                self._basetest_create_ssl_connection(conn_fut, check_sockname,
                                                     peername)

        self.assertEqual(cm.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
    704 
    @unittest.skipIf(ssl is None, 'No ssl module')
    def test_create_ssl_connection(self):
        # Run the shared SSL checks over a host/port connection to an
        # SSL-enabled test server.
        def protocol_factory():
            return MyProto(loop=self.loop)

        with test_utils.run_test_server(use_ssl=True) as httpd:
            connect = functools.partial(
                self.loop.create_connection,
                protocol_factory,
                *httpd.address)
            self._test_create_ssl_connection(httpd, connect,
                                             peername=httpd.address)
    714 
    def test_legacy_create_ssl_connection(self):
        # Re-run test_create_ssl_connection() inside
        # test_utils.force_legacy_ssl_support().
        # NOTE(review): presumably this selects the older SSL transport
        # implementation -- confirm in test_utils.
        with test_utils.force_legacy_ssl_support():
            self.test_create_ssl_connection()
    718 
    @unittest.skipIf(ssl is None, 'No ssl module')
    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
    def test_create_ssl_unix_connection(self):
        # Issue #20682: On Mac OS X Tiger, getsockname() returns a
        # zero-length address for UNIX socket.
        sockname_reliable = not osx_tiger()

        def protocol_factory():
            return MyProto(loop=self.loop)

        with test_utils.run_test_unix_server(use_ssl=True) as httpd:
            connect = functools.partial(
                self.loop.create_unix_connection,
                protocol_factory, httpd.address,
                server_hostname='127.0.0.1')

            self._test_create_ssl_connection(httpd, connect,
                                             sockname_reliable,
                                             peername=httpd.address)
    735 
    def test_legacy_create_ssl_unix_connection(self):
        # Re-run test_create_ssl_unix_connection() inside
        # test_utils.force_legacy_ssl_support().
        # NOTE(review): presumably this selects the older SSL transport
        # implementation -- confirm in test_utils.
        with test_utils.force_legacy_ssl_support():
            self.test_create_ssl_unix_connection()
    739 
    def test_create_connection_local_addr(self):
        # The client end of the connection must be bound to the local port
        # requested through local_addr.
        with test_utils.run_test_server() as httpd:
            local_port = support.find_unused_port()
            server_host = httpd.address[0]
            connect = self.loop.create_connection(
                lambda: MyProto(loop=self.loop),
                *httpd.address, local_addr=(server_host, local_port))
            transport, protocol = self.loop.run_until_complete(connect)
            sockname = protocol.transport.get_extra_info('sockname')
            self.assertEqual(local_port, sockname[1])
            transport.close()
    750 
    def test_create_connection_local_addr_in_use(self):
        # Using the server's own address as local_addr must fail with
        # EADDRINUSE, and the error text must mention that address.
        with test_utils.run_test_server() as httpd:
            busy_addr = httpd.address
            connect = self.loop.create_connection(
                lambda: MyProto(loop=self.loop),
                *busy_addr, local_addr=busy_addr)
            with self.assertRaises(OSError) as caught:
                self.loop.run_until_complete(connect)
            error = caught.exception
            self.assertEqual(error.errno, errno.EADDRINUSE)
            self.assertIn(str(busy_addr), error.strerror)
    760 
    def test_connect_accepted_socket(self, server_ssl=None, client_ssl=None):
        # Accept a connection on a plain blocking listening socket, hand the
        # accepted socket to loop.connect_accepted_socket() and check that
        # the protocol exchanges data with a client running in a thread.
        #
        # server_ssl is passed as ssl= to connect_accepted_socket();
        # client_ssl, when given, wraps the client socket.  Both default to
        # None (no SSL).
        loop = self.loop

        class MyProto(MyBaseProto):

            def connection_lost(self, exc):
                super().connection_lost(exc)
                # Ends the run_forever() call below.
                loop.call_soon(loop.stop)

            def data_received(self, data):
                super().data_received(data)
                self.transport.write(expected_response)

        lsock = socket.socket()
        # Release the listening socket even if the test fails before the
        # explicit close below; closing a socket twice is harmless.
        self.addCleanup(lsock.close)
        lsock.bind(('127.0.0.1', 0))
        lsock.listen(1)
        addr = lsock.getsockname()

        message = b'test data'
        response = None
        expected_response = b'roger'

        def client():
            nonlocal response
            csock = None
            try:
                csock = socket.socket()
                if client_ssl is not None:
                    csock = client_ssl.wrap_socket(csock)
                csock.connect(addr)
                csock.sendall(message)
                response = csock.recv(99)
            except Exception as exc:
                print(
                    "Failure in client thread in test_connect_accepted_socket",
                    exc)
            finally:
                # Close the client socket on the failure path as well.
                if csock is not None:
                    csock.close()

        thread = threading.Thread(target=client, daemon=True)
        thread.start()

        conn, _ = lsock.accept()
        proto = MyProto(loop=loop)
        proto.loop = loop
        loop.run_until_complete(
            loop.connect_accepted_socket(
                (lambda: proto), conn, ssl=server_ssl))
        loop.run_forever()
        proto.transport.close()
        lsock.close()

        thread.join(1)
        self.assertFalse(thread.is_alive())
        self.assertEqual(proto.state, 'CLOSED')
        self.assertEqual(proto.nbytes, len(message))
        self.assertEqual(response, expected_response)
    816 
    @unittest.skipIf(ssl is None, 'No ssl module')
    def test_ssl_connect_accepted_socket(self):
        # Run test_connect_accepted_socket() with an SSL context on each
        # side.  Both contexts use CERT_NONE and have hostname checking
        # switched off where the attribute exists.
        if (sys.platform == 'win32' and
            sys.version_info < (3, 5) and
            isinstance(self.loop, proactor_events.BaseProactorEventLoop)
            ):
            raise unittest.SkipTest(
                'SSL not supported with proactor event loops before Python 3.5'
                )

        server_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        server_context.load_cert_chain(ONLYCERT, ONLYKEY)
        if hasattr(server_context, 'check_hostname'):
            server_context.check_hostname = False
        server_context.verify_mode = ssl.CERT_NONE

        client_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        # Probe the context that is actually being configured.
        if hasattr(client_context, 'check_hostname'):
            client_context.check_hostname = False
        client_context.verify_mode = ssl.CERT_NONE

        self.test_connect_accepted_socket(server_context, client_context)
    839 
    @mock.patch('asyncio.base_events.socket')
    def create_server_multiple_hosts(self, family, hosts, mock_sock):
        # Shared body of the multi-host create_server() tests: with the
        # socket module of base_events mocked out and address resolution
        # stubbed, the server must report one host per distinct entry.
        #
        # IPv4 socket addresses are (host, port); any other family gets the
        # two extra trailing zero fields.
        addr_tail = () if family == socket.AF_INET else (0, 0)

        @asyncio.coroutine
        def getaddrinfo(host, port, *args, **kw):
            sockaddr = (host, port) + addr_tail
            return [(family, socket.SOCK_STREAM, 6, '', sockaddr)]

        def getaddrinfo_task(*args, **kwds):
            return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop)

        unique_hosts = set(hosts)

        # NOTE(review): 'getsockbyname' is not a real socket method; it only
        # works because the same attribute name is configured here and read
        # back below on the mock.
        fake_sock = mock_sock.socket()
        fake_sock.getsockbyname.side_effect = [
            (host, 80) + addr_tail for host in unique_hosts]

        self.loop.getaddrinfo = getaddrinfo_task
        self.loop._start_serving = mock.Mock()
        self.loop._stop_serving = mock.Mock()
        server = self.loop.run_until_complete(
            self.loop.create_server(lambda: MyProto(self.loop), hosts, 80))
        self.addCleanup(server.close)

        server_hosts = set()
        for sock in server.sockets:
            server_hosts.add(sock.getsockbyname()[0])
        self.assertEqual(server_hosts, unique_hosts)
    868 
    def test_create_server_multiple_hosts_ipv4(self):
        # The first host appears twice in the list.
        hosts = ['1.2.3.4', '5.6.7.8', '1.2.3.4']
        self.create_server_multiple_hosts(socket.AF_INET, hosts)
    872 
    def test_create_server_multiple_hosts_ipv6(self):
        # The first host appears twice in the list.
        hosts = ['::1', '::2', '::1']
        self.create_server_multiple_hosts(socket.AF_INET6, hosts)
    876 
    def test_create_server(self):
        # Serve on 0.0.0.0, connect over loopback with a blocking socket and
        # follow the server-side protocol from CONNECTED to CLOSED.
        server_proto = MyProto(self.loop)
        server = self.loop.run_until_complete(
            self.loop.create_server(lambda: server_proto, '0.0.0.0', 0))
        listeners = server.sockets
        self.assertEqual(len(listeners), 1)
        bound_host, bound_port = listeners[0].getsockname()
        self.assertEqual(bound_host, '0.0.0.0')

        client = socket.socket()
        client.connect(('127.0.0.1', bound_port))
        client.sendall(b'xxx')

        self.loop.run_until_complete(server_proto.connected)
        self.assertEqual('CONNECTED', server_proto.state)

        test_utils.run_until(self.loop, lambda: server_proto.nbytes > 0)
        self.assertEqual(3, server_proto.nbytes)

        # extra info is available
        transport = server_proto.transport
        self.assertIsNotNone(transport.get_extra_info('sockname'))
        peer_host = transport.get_extra_info('peername')[0]
        self.assertEqual('127.0.0.1', peer_host)

        # close connection
        transport.close()
        self.loop.run_until_complete(server_proto.done)
        self.assertEqual('CLOSED', server_proto.state)

        # the client socket must be closed after to avoid ECONNRESET upon
        # recv()/send() on the serving socket
        client.close()

        # close server
        server.close()
    912 
    @unittest.skipUnless(hasattr(socket, 'SO_REUSEPORT'), 'No SO_REUSEPORT')
    def test_create_server_reuse_port(self):
        # SO_REUSEPORT must be off on the listening socket by default and on
        # when create_server() is called with reuse_port=True.
        def start_server(**options):
            proto = MyProto(self.loop)
            coro = self.loop.create_server(
                lambda: proto, '0.0.0.0', 0, **options)
            started = self.loop.run_until_complete(coro)
            self.assertEqual(len(started.sockets), 1)
            return started

        def reuse_port_flag(started):
            listener = started.sockets[0]
            return listener.getsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEPORT)

        server = start_server()
        self.assertFalse(reuse_port_flag(server))
        server.close()

        test_utils.run_briefly(self.loop)

        server = start_server(reuse_port=True)
        self.assertTrue(reuse_port_flag(server))
        server.close()
    938 
    def _make_unix_server(self, factory, **kwargs):
        """Start a UNIX-socket server on a freshly generated path.

        Extra keyword arguments are forwarded to create_unix_server().  A
        cleanup is registered that removes the socket file if it still
        exists.  Returns a (server, path) tuple.
        """
        path = test_utils.gen_unix_socket_path()

        def remove_socket_file():
            if os.path.exists(path):
                os.unlink(path)

        self.addCleanup(remove_socket_file)

        server = self.loop.run_until_complete(
            self.loop.create_unix_server(factory, path, **kwargs))
        return server, path
    947 
    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
    def test_create_unix_server(self):
        # Connect a blocking AF_UNIX client to the server and follow the
        # server-side protocol from CONNECTED to CLOSED.
        server_proto = MyProto(loop=self.loop)
        server, path = self._make_unix_server(lambda: server_proto)
        self.assertEqual(len(server.sockets), 1)

        client = socket.socket(socket.AF_UNIX)
        client.connect(path)
        client.sendall(b'xxx')

        self.loop.run_until_complete(server_proto.connected)
        self.assertEqual('CONNECTED', server_proto.state)
        test_utils.run_until(self.loop, lambda: server_proto.nbytes > 0)
        self.assertEqual(3, server_proto.nbytes)

        # close connection
        server_proto.transport.close()
        self.loop.run_until_complete(server_proto.done)
        self.assertEqual('CLOSED', server_proto.state)

        # the client socket must be closed after to avoid ECONNRESET upon
        # recv()/send() on the serving socket
        client.close()

        # close server
        server.close()
    975 
    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
    def test_create_unix_server_path_socket_error(self):
        # Passing both a path and a socket to create_unix_server() must be
        # rejected with ValueError.
        proto = MyProto(loop=self.loop)
        expected_message = ('path and sock can not be specified '
                            'at the same time')
        with socket.socket() as sock:
            coro = self.loop.create_unix_server(
                lambda: proto, '/test', sock=sock)
            with self.assertRaisesRegex(ValueError, expected_message):
                self.loop.run_until_complete(coro)
    986 
    def _create_ssl_context(self, certfile, keyfile=None):
        """Return a PROTOCOL_SSLv23 context with OP_NO_SSLv2 set and the
        given certificate chain (and optional key file) loaded."""
        context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        context.options = context.options | ssl.OP_NO_SSLv2
        context.load_cert_chain(certfile, keyfile)
        return context
    992 
    def _make_ssl_server(self, factory, certfile, keyfile=None):
        """Start an SSL server on 127.0.0.1 with a system-chosen port.

        The SSL context is built by _create_ssl_context().  Returns a
        (server, host, port) tuple taken from the first listening socket.
        """
        context = self._create_ssl_context(certfile, keyfile)

        server = self.loop.run_until_complete(
            self.loop.create_server(factory, '127.0.0.1', 0, ssl=context))

        host, port = server.sockets[0].getsockname()
        self.assertEqual(host, '127.0.0.1')
        return server, host, port
   1003 
   1004     def _make_ssl_unix_server(self, factory, certfile, keyfile=None):
   1005         sslcontext = self._create_ssl_context(certfile, keyfile)
   1006         return self._make_unix_server(factory, ssl=sslcontext)
   1007 
   1008     @unittest.skipIf(ssl is None, 'No ssl module')
   1009     def test_create_server_ssl(self):
   1010         proto = MyProto(loop=self.loop)
   1011         server, host, port = self._make_ssl_server(
   1012             lambda: proto, ONLYCERT, ONLYKEY)
   1013 
   1014         f_c = self.loop.create_connection(MyBaseProto, host, port,
   1015                                           ssl=test_utils.dummy_ssl_context())
   1016         client, pr = self.loop.run_until_complete(f_c)
   1017 
   1018         client.write(b'xxx')
   1019         self.loop.run_until_complete(proto.connected)
   1020         self.assertEqual('CONNECTED', proto.state)
   1021 
   1022         test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
   1023         self.assertEqual(3, proto.nbytes)
   1024 
   1025         # extra info is available
   1026         self.check_ssl_extra_info(client, peername=(host, port))
   1027 
   1028         # close connection
   1029         proto.transport.close()
   1030         self.loop.run_until_complete(proto.done)
   1031         self.assertEqual('CLOSED', proto.state)
   1032 
   1033         # the client socket must be closed after to avoid ECONNRESET upon
   1034         # recv()/send() on the serving socket
   1035         client.close()
   1036 
   1037         # stop serving
   1038         server.close()
   1039 
   1040     def test_legacy_create_server_ssl(self):
   1041         with test_utils.force_legacy_ssl_support():
   1042             self.test_create_server_ssl()
   1043 
   1044     @unittest.skipIf(ssl is None, 'No ssl module')
   1045     @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
   1046     def test_create_unix_server_ssl(self):
   1047         proto = MyProto(loop=self.loop)
   1048         server, path = self._make_ssl_unix_server(
   1049             lambda: proto, ONLYCERT, ONLYKEY)
   1050 
   1051         f_c = self.loop.create_unix_connection(
   1052             MyBaseProto, path, ssl=test_utils.dummy_ssl_context(),
   1053             server_hostname='')
   1054 
   1055         client, pr = self.loop.run_until_complete(f_c)
   1056 
   1057         client.write(b'xxx')
   1058         self.loop.run_until_complete(proto.connected)
   1059         self.assertEqual('CONNECTED', proto.state)
   1060         test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
   1061         self.assertEqual(3, proto.nbytes)
   1062 
   1063         # close connection
   1064         proto.transport.close()
   1065         self.loop.run_until_complete(proto.done)
   1066         self.assertEqual('CLOSED', proto.state)
   1067 
   1068         # the client socket must be closed after to avoid ECONNRESET upon
   1069         # recv()/send() on the serving socket
   1070         client.close()
   1071 
   1072         # stop serving
   1073         server.close()
   1074 
   1075     def test_legacy_create_unix_server_ssl(self):
   1076         with test_utils.force_legacy_ssl_support():
   1077             self.test_create_unix_server_ssl()
   1078 
   1079     @unittest.skipIf(ssl is None, 'No ssl module')
   1080     def test_create_server_ssl_verify_failed(self):
   1081         proto = MyProto(loop=self.loop)
   1082         server, host, port = self._make_ssl_server(
   1083             lambda: proto, SIGNED_CERTFILE)
   1084 
   1085         sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
   1086         sslcontext_client.options |= ssl.OP_NO_SSLv2
   1087         sslcontext_client.verify_mode = ssl.CERT_REQUIRED
   1088         if hasattr(sslcontext_client, 'check_hostname'):
   1089             sslcontext_client.check_hostname = True
   1090 
   1091 
   1092         # no CA loaded
   1093         f_c = self.loop.create_connection(MyProto, host, port,
   1094                                           ssl=sslcontext_client)
   1095         with mock.patch.object(self.loop, 'call_exception_handler'):
   1096             with test_utils.disable_logger():
   1097                 with self.assertRaisesRegex(ssl.SSLError,
   1098                                             '(?i)certificate.verify.failed'):
   1099                     self.loop.run_until_complete(f_c)
   1100 
   1101             # execute the loop to log the connection error
   1102             test_utils.run_briefly(self.loop)
   1103 
   1104         # close connection
   1105         self.assertIsNone(proto.transport)
   1106         server.close()
   1107 
   1108     def test_legacy_create_server_ssl_verify_failed(self):
   1109         with test_utils.force_legacy_ssl_support():
   1110             self.test_create_server_ssl_verify_failed()
   1111 
   1112     @unittest.skipIf(ssl is None, 'No ssl module')
   1113     @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
   1114     def test_create_unix_server_ssl_verify_failed(self):
   1115         proto = MyProto(loop=self.loop)
   1116         server, path = self._make_ssl_unix_server(
   1117             lambda: proto, SIGNED_CERTFILE)
   1118 
   1119         sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
   1120         sslcontext_client.options |= ssl.OP_NO_SSLv2
   1121         sslcontext_client.verify_mode = ssl.CERT_REQUIRED
   1122         if hasattr(sslcontext_client, 'check_hostname'):
   1123             sslcontext_client.check_hostname = True
   1124 
   1125         # no CA loaded
   1126         f_c = self.loop.create_unix_connection(MyProto, path,
   1127                                                ssl=sslcontext_client,
   1128                                                server_hostname='invalid')
   1129         with mock.patch.object(self.loop, 'call_exception_handler'):
   1130             with test_utils.disable_logger():
   1131                 with self.assertRaisesRegex(ssl.SSLError,
   1132                                             '(?i)certificate.verify.failed'):
   1133                     self.loop.run_until_complete(f_c)
   1134 
   1135             # execute the loop to log the connection error
   1136             test_utils.run_briefly(self.loop)
   1137 
   1138         # close connection
   1139         self.assertIsNone(proto.transport)
   1140         server.close()
   1141 
   1142 
   1143     def test_legacy_create_unix_server_ssl_verify_failed(self):
   1144         with test_utils.force_legacy_ssl_support():
   1145             self.test_create_unix_server_ssl_verify_failed()
   1146 
   1147     @unittest.skipIf(ssl is None, 'No ssl module')
   1148     def test_create_server_ssl_match_failed(self):
   1149         proto = MyProto(loop=self.loop)
   1150         server, host, port = self._make_ssl_server(
   1151             lambda: proto, SIGNED_CERTFILE)
   1152 
   1153         sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
   1154         sslcontext_client.options |= ssl.OP_NO_SSLv2
   1155         sslcontext_client.verify_mode = ssl.CERT_REQUIRED
   1156         sslcontext_client.load_verify_locations(
   1157             cafile=SIGNING_CA)
   1158         if hasattr(sslcontext_client, 'check_hostname'):
   1159             sslcontext_client.check_hostname = True
   1160 
   1161         # incorrect server_hostname
   1162         f_c = self.loop.create_connection(MyProto, host, port,
   1163                                           ssl=sslcontext_client)
   1164         with mock.patch.object(self.loop, 'call_exception_handler'):
   1165             with test_utils.disable_logger():
   1166                 with self.assertRaisesRegex(
   1167                         ssl.CertificateError,
   1168                         "hostname '127.0.0.1' doesn't match 'localhost'"):
   1169                     self.loop.run_until_complete(f_c)
   1170 
   1171         # close connection
   1172         proto.transport.close()
   1173         server.close()
   1174 
   1175     def test_legacy_create_server_ssl_match_failed(self):
   1176         with test_utils.force_legacy_ssl_support():
   1177             self.test_create_server_ssl_match_failed()
   1178 
   1179     @unittest.skipIf(ssl is None, 'No ssl module')
   1180     @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
   1181     def test_create_unix_server_ssl_verified(self):
   1182         proto = MyProto(loop=self.loop)
   1183         server, path = self._make_ssl_unix_server(
   1184             lambda: proto, SIGNED_CERTFILE)
   1185 
   1186         sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
   1187         sslcontext_client.options |= ssl.OP_NO_SSLv2
   1188         sslcontext_client.verify_mode = ssl.CERT_REQUIRED
   1189         sslcontext_client.load_verify_locations(cafile=SIGNING_CA)
   1190         if hasattr(sslcontext_client, 'check_hostname'):
   1191             sslcontext_client.check_hostname = True
   1192 
   1193         # Connection succeeds with correct CA and server hostname.
   1194         f_c = self.loop.create_unix_connection(MyProto, path,
   1195                                                ssl=sslcontext_client,
   1196                                                server_hostname='localhost')
   1197         client, pr = self.loop.run_until_complete(f_c)
   1198 
   1199         # close connection
   1200         proto.transport.close()
   1201         client.close()
   1202         server.close()
   1203         self.loop.run_until_complete(proto.done)
   1204 
   1205     def test_legacy_create_unix_server_ssl_verified(self):
   1206         with test_utils.force_legacy_ssl_support():
   1207             self.test_create_unix_server_ssl_verified()
   1208 
   1209     @unittest.skipIf(ssl is None, 'No ssl module')
   1210     def test_create_server_ssl_verified(self):
   1211         proto = MyProto(loop=self.loop)
   1212         server, host, port = self._make_ssl_server(
   1213             lambda: proto, SIGNED_CERTFILE)
   1214 
   1215         sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
   1216         sslcontext_client.options |= ssl.OP_NO_SSLv2
   1217         sslcontext_client.verify_mode = ssl.CERT_REQUIRED
   1218         sslcontext_client.load_verify_locations(cafile=SIGNING_CA)
   1219         if hasattr(sslcontext_client, 'check_hostname'):
   1220             sslcontext_client.check_hostname = True
   1221 
   1222         # Connection succeeds with correct CA and server hostname.
   1223         f_c = self.loop.create_connection(MyProto, host, port,
   1224                                           ssl=sslcontext_client,
   1225                                           server_hostname='localhost')
   1226         client, pr = self.loop.run_until_complete(f_c)
   1227 
   1228         # extra info is available
   1229         self.check_ssl_extra_info(client,peername=(host, port),
   1230                                   peercert=PEERCERT)
   1231 
   1232         # close connection
   1233         proto.transport.close()
   1234         client.close()
   1235         server.close()
   1236         self.loop.run_until_complete(proto.done)
   1237 
   1238     def test_legacy_create_server_ssl_verified(self):
   1239         with test_utils.force_legacy_ssl_support():
   1240             self.test_create_server_ssl_verified()
   1241 
   1242     def test_create_server_sock(self):
   1243         proto = asyncio.Future(loop=self.loop)
   1244 
   1245         class TestMyProto(MyProto):
   1246             def connection_made(self, transport):
   1247                 super().connection_made(transport)
   1248                 proto.set_result(self)
   1249 
   1250         sock_ob = socket.socket(type=socket.SOCK_STREAM)
   1251         sock_ob.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
   1252         sock_ob.bind(('0.0.0.0', 0))
   1253 
   1254         f = self.loop.create_server(TestMyProto, sock=sock_ob)
   1255         server = self.loop.run_until_complete(f)
   1256         sock = server.sockets[0]
   1257         self.assertIs(sock, sock_ob)
   1258 
   1259         host, port = sock.getsockname()
   1260         self.assertEqual(host, '0.0.0.0')
   1261         client = socket.socket()
   1262         client.connect(('127.0.0.1', port))
   1263         client.send(b'xxx')
   1264         client.close()
   1265         server.close()
   1266 
   1267     def test_create_server_addr_in_use(self):
   1268         sock_ob = socket.socket(type=socket.SOCK_STREAM)
   1269         sock_ob.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
   1270         sock_ob.bind(('0.0.0.0', 0))
   1271 
   1272         f = self.loop.create_server(MyProto, sock=sock_ob)
   1273         server = self.loop.run_until_complete(f)
   1274         sock = server.sockets[0]
   1275         host, port = sock.getsockname()
   1276 
   1277         f = self.loop.create_server(MyProto, host=host, port=port)
   1278         with self.assertRaises(OSError) as cm:
   1279             self.loop.run_until_complete(f)
   1280         self.assertEqual(cm.exception.errno, errno.EADDRINUSE)
   1281 
   1282         server.close()
   1283 
   1284     @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled')
   1285     def test_create_server_dual_stack(self):
   1286         f_proto = asyncio.Future(loop=self.loop)
   1287 
   1288         class TestMyProto(MyProto):
   1289             def connection_made(self, transport):
   1290                 super().connection_made(transport)
   1291                 f_proto.set_result(self)
   1292 
   1293         try_count = 0
   1294         while True:
   1295             try:
   1296                 port = support.find_unused_port()
   1297                 f = self.loop.create_server(TestMyProto, host=None, port=port)
   1298                 server = self.loop.run_until_complete(f)
   1299             except OSError as ex:
   1300                 if ex.errno == errno.EADDRINUSE:
   1301                     try_count += 1
   1302                     self.assertGreaterEqual(5, try_count)
   1303                     continue
   1304                 else:
   1305                     raise
   1306             else:
   1307                 break
   1308         client = socket.socket()
   1309         client.connect(('127.0.0.1', port))
   1310         client.send(b'xxx')
   1311         proto = self.loop.run_until_complete(f_proto)
   1312         proto.transport.close()
   1313         client.close()
   1314 
   1315         f_proto = asyncio.Future(loop=self.loop)
   1316         client = socket.socket(socket.AF_INET6)
   1317         client.connect(('::1', port))
   1318         client.send(b'xxx')
   1319         proto = self.loop.run_until_complete(f_proto)
   1320         proto.transport.close()
   1321         client.close()
   1322 
   1323         server.close()
   1324 
   1325     def test_server_close(self):
   1326         f = self.loop.create_server(MyProto, '0.0.0.0', 0)
   1327         server = self.loop.run_until_complete(f)
   1328         sock = server.sockets[0]
   1329         host, port = sock.getsockname()
   1330 
   1331         client = socket.socket()
   1332         client.connect(('127.0.0.1', port))
   1333         client.send(b'xxx')
   1334         client.close()
   1335 
   1336         server.close()
   1337 
   1338         client = socket.socket()
   1339         self.assertRaises(
   1340             ConnectionRefusedError, client.connect, ('127.0.0.1', port))
   1341         client.close()
   1342 
   1343     def test_create_datagram_endpoint(self):
   1344         class TestMyDatagramProto(MyDatagramProto):
   1345             def __init__(inner_self):
   1346                 super().__init__(loop=self.loop)
   1347 
   1348             def datagram_received(self, data, addr):
   1349                 super().datagram_received(data, addr)
   1350                 self.transport.sendto(b'resp:'+data, addr)
   1351 
   1352         coro = self.loop.create_datagram_endpoint(
   1353             TestMyDatagramProto, local_addr=('127.0.0.1', 0))
   1354         s_transport, server = self.loop.run_until_complete(coro)
   1355         host, port = s_transport.get_extra_info('sockname')
   1356 
   1357         self.assertIsInstance(s_transport, asyncio.Transport)
   1358         self.assertIsInstance(server, TestMyDatagramProto)
   1359         self.assertEqual('INITIALIZED', server.state)
   1360         self.assertIs(server.transport, s_transport)
   1361 
   1362         coro = self.loop.create_datagram_endpoint(
   1363             lambda: MyDatagramProto(loop=self.loop),
   1364             remote_addr=(host, port))
   1365         transport, client = self.loop.run_until_complete(coro)
   1366 
   1367         self.assertIsInstance(transport, asyncio.Transport)
   1368         self.assertIsInstance(client, MyDatagramProto)
   1369         self.assertEqual('INITIALIZED', client.state)
   1370         self.assertIs(client.transport, transport)
   1371 
   1372         transport.sendto(b'xxx')
   1373         test_utils.run_until(self.loop, lambda: server.nbytes)
   1374         self.assertEqual(3, server.nbytes)
   1375         test_utils.run_until(self.loop, lambda: client.nbytes)
   1376 
   1377         # received
   1378         self.assertEqual(8, client.nbytes)
   1379 
   1380         # extra info is available
   1381         self.assertIsNotNone(transport.get_extra_info('sockname'))
   1382 
   1383         # close connection
   1384         transport.close()
   1385         self.loop.run_until_complete(client.done)
   1386         self.assertEqual('CLOSED', client.state)
   1387         server.transport.close()
   1388 
   1389     def test_create_datagram_endpoint_sock(self):
   1390         if (sys.platform == 'win32' and
   1391                 isinstance(self.loop, proactor_events.BaseProactorEventLoop)):
   1392             raise unittest.SkipTest(
   1393                 'UDP is not supported with proactor event loops')
   1394 
   1395         sock = None
   1396         local_address = ('127.0.0.1', 0)
   1397         infos = self.loop.run_until_complete(
   1398             self.loop.getaddrinfo(
   1399                 *local_address, type=socket.SOCK_DGRAM))
   1400         for family, type, proto, cname, address in infos:
   1401             try:
   1402                 sock = socket.socket(family=family, type=type, proto=proto)
   1403                 sock.setblocking(False)
   1404                 sock.bind(address)
   1405             except:
   1406                 pass
   1407             else:
   1408                 break
   1409         else:
   1410             assert False, 'Can not create socket.'
   1411 
   1412         f = self.loop.create_datagram_endpoint(
   1413             lambda: MyDatagramProto(loop=self.loop), sock=sock)
   1414         tr, pr = self.loop.run_until_complete(f)
   1415         self.assertIsInstance(tr, asyncio.Transport)
   1416         self.assertIsInstance(pr, MyDatagramProto)
   1417         tr.close()
   1418         self.loop.run_until_complete(pr.done)
   1419 
   1420     def test_internal_fds(self):
   1421         loop = self.create_event_loop()
   1422         if not isinstance(loop, selector_events.BaseSelectorEventLoop):
   1423             loop.close()
   1424             self.skipTest('loop is not a BaseSelectorEventLoop')
   1425 
   1426         self.assertEqual(1, loop._internal_fds)
   1427         loop.close()
   1428         self.assertEqual(0, loop._internal_fds)
   1429         self.assertIsNone(loop._csock)
   1430         self.assertIsNone(loop._ssock)
   1431 
   1432     @unittest.skipUnless(sys.platform != 'win32',
   1433                          "Don't support pipes for Windows")
   1434     def test_read_pipe(self):
   1435         proto = MyReadPipeProto(loop=self.loop)
   1436 
   1437         rpipe, wpipe = os.pipe()
   1438         pipeobj = io.open(rpipe, 'rb', 1024)
   1439 
   1440         @asyncio.coroutine
   1441         def connect():
   1442             t, p = yield from self.loop.connect_read_pipe(
   1443                 lambda: proto, pipeobj)
   1444             self.assertIs(p, proto)
   1445             self.assertIs(t, proto.transport)
   1446             self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
   1447             self.assertEqual(0, proto.nbytes)
   1448 
   1449         self.loop.run_until_complete(connect())
   1450 
   1451         os.write(wpipe, b'1')
   1452         test_utils.run_until(self.loop, lambda: proto.nbytes >= 1)
   1453         self.assertEqual(1, proto.nbytes)
   1454 
   1455         os.write(wpipe, b'2345')
   1456         test_utils.run_until(self.loop, lambda: proto.nbytes >= 5)
   1457         self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
   1458         self.assertEqual(5, proto.nbytes)
   1459 
   1460         os.close(wpipe)
   1461         self.loop.run_until_complete(proto.done)
   1462         self.assertEqual(
   1463             ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state)
   1464         # extra info is available
   1465         self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
   1466 
   1467     @unittest.skipUnless(sys.platform != 'win32',
   1468                          "Don't support pipes for Windows")
   1469     def test_unclosed_pipe_transport(self):
   1470         # This test reproduces the issue #314 on GitHub
   1471         loop = self.create_event_loop()
   1472         read_proto = MyReadPipeProto(loop=loop)
   1473         write_proto = MyWritePipeProto(loop=loop)
   1474 
   1475         rpipe, wpipe = os.pipe()
   1476         rpipeobj = io.open(rpipe, 'rb', 1024)
   1477         wpipeobj = io.open(wpipe, 'w', 1024)
   1478 
   1479         @asyncio.coroutine
   1480         def connect():
   1481             read_transport, _ = yield from loop.connect_read_pipe(
   1482                 lambda: read_proto, rpipeobj)
   1483             write_transport, _ = yield from loop.connect_write_pipe(
   1484                 lambda: write_proto, wpipeobj)
   1485             return read_transport, write_transport
   1486 
   1487         # Run and close the loop without closing the transports
   1488         read_transport, write_transport = loop.run_until_complete(connect())
   1489         loop.close()
   1490 
   1491         # These 'repr' calls used to raise an AttributeError
   1492         # See Issue #314 on GitHub
   1493         self.assertIn('open', repr(read_transport))
   1494         self.assertIn('open', repr(write_transport))
   1495 
   1496         # Clean up (avoid ResourceWarning)
   1497         rpipeobj.close()
   1498         wpipeobj.close()
   1499         read_transport._pipe = None
   1500         write_transport._pipe = None
   1501 
   1502     @unittest.skipUnless(sys.platform != 'win32',
   1503                          "Don't support pipes for Windows")
   1504     # select, poll and kqueue don't support character devices (PTY) on Mac OS X
   1505     # older than 10.6 (Snow Leopard)
   1506     @support.requires_mac_ver(10, 6)
   1507     # Issue #20495: The test hangs on FreeBSD 7.2 but pass on FreeBSD 9
   1508     @support.requires_freebsd_version(8)
   1509     def test_read_pty_output(self):
   1510         proto = MyReadPipeProto(loop=self.loop)
   1511 
   1512         master, slave = os.openpty()
   1513         master_read_obj = io.open(master, 'rb', 0)
   1514 
   1515         @asyncio.coroutine
   1516         def connect():
   1517             t, p = yield from self.loop.connect_read_pipe(lambda: proto,
   1518                                                           master_read_obj)
   1519             self.assertIs(p, proto)
   1520             self.assertIs(t, proto.transport)
   1521             self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
   1522             self.assertEqual(0, proto.nbytes)
   1523 
   1524         self.loop.run_until_complete(connect())
   1525 
   1526         os.write(slave, b'1')
   1527         test_utils.run_until(self.loop, lambda: proto.nbytes)
   1528         self.assertEqual(1, proto.nbytes)
   1529 
   1530         os.write(slave, b'2345')
   1531         test_utils.run_until(self.loop, lambda: proto.nbytes >= 5)
   1532         self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
   1533         self.assertEqual(5, proto.nbytes)
   1534 
   1535         os.close(slave)
   1536         self.loop.run_until_complete(proto.done)
   1537         self.assertEqual(
   1538             ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state)
   1539         # extra info is available
   1540         self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
   1541 
   1542     @unittest.skipUnless(sys.platform != 'win32',
   1543                          "Don't support pipes for Windows")
   1544     def test_write_pipe(self):
   1545         rpipe, wpipe = os.pipe()
   1546         pipeobj = io.open(wpipe, 'wb', 1024)
   1547 
   1548         proto = MyWritePipeProto(loop=self.loop)
   1549         connect = self.loop.connect_write_pipe(lambda: proto, pipeobj)
   1550         transport, p = self.loop.run_until_complete(connect)
   1551         self.assertIs(p, proto)
   1552         self.assertIs(transport, proto.transport)
   1553         self.assertEqual('CONNECTED', proto.state)
   1554 
   1555         transport.write(b'1')
   1556 
   1557         data = bytearray()
   1558         def reader(data):
   1559             chunk = os.read(rpipe, 1024)
   1560             data += chunk
   1561             return len(data)
   1562 
   1563         test_utils.run_until(self.loop, lambda: reader(data) >= 1)
   1564         self.assertEqual(b'1', data)
   1565 
   1566         transport.write(b'2345')
   1567         test_utils.run_until(self.loop, lambda: reader(data) >= 5)
   1568         self.assertEqual(b'12345', data)
   1569         self.assertEqual('CONNECTED', proto.state)
   1570 
   1571         os.close(rpipe)
   1572 
   1573         # extra info is available
   1574         self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
   1575 
   1576         # close connection
   1577         proto.transport.close()
   1578         self.loop.run_until_complete(proto.done)
   1579         self.assertEqual('CLOSED', proto.state)
   1580 
   1581     @unittest.skipUnless(sys.platform != 'win32',
   1582                          "Don't support pipes for Windows")
   1583     def test_write_pipe_disconnect_on_close(self):
   1584         rsock, wsock = test_utils.socketpair()
   1585         rsock.setblocking(False)
   1586         pipeobj = io.open(wsock.detach(), 'wb', 1024)
   1587 
   1588         proto = MyWritePipeProto(loop=self.loop)
   1589         connect = self.loop.connect_write_pipe(lambda: proto, pipeobj)
   1590         transport, p = self.loop.run_until_complete(connect)
   1591         self.assertIs(p, proto)
   1592         self.assertIs(transport, proto.transport)
   1593         self.assertEqual('CONNECTED', proto.state)
   1594 
   1595         transport.write(b'1')
   1596         data = self.loop.run_until_complete(self.loop.sock_recv(rsock, 1024))
   1597         self.assertEqual(b'1', data)
   1598 
   1599         rsock.close()
   1600 
   1601         self.loop.run_until_complete(proto.done)
   1602         self.assertEqual('CLOSED', proto.state)
   1603 
   1604     @unittest.skipUnless(sys.platform != 'win32',
   1605                          "Don't support pipes for Windows")
   1606     # select, poll and kqueue don't support character devices (PTY) on Mac OS X
   1607     # older than 10.6 (Snow Leopard)
   1608     @support.requires_mac_ver(10, 6)
   1609     def test_write_pty(self):
   1610         master, slave = os.openpty()
   1611         slave_write_obj = io.open(slave, 'wb', 0)
   1612 
   1613         proto = MyWritePipeProto(loop=self.loop)
   1614         connect = self.loop.connect_write_pipe(lambda: proto, slave_write_obj)
   1615         transport, p = self.loop.run_until_complete(connect)
   1616         self.assertIs(p, proto)
   1617         self.assertIs(transport, proto.transport)
   1618         self.assertEqual('CONNECTED', proto.state)
   1619 
   1620         transport.write(b'1')
   1621 
   1622         data = bytearray()
   1623         def reader(data):
   1624             chunk = os.read(master, 1024)
   1625             data += chunk
   1626             return len(data)
   1627 
   1628         test_utils.run_until(self.loop, lambda: reader(data) >= 1,
   1629                              timeout=10)
   1630         self.assertEqual(b'1', data)
   1631 
   1632         transport.write(b'2345')
   1633         test_utils.run_until(self.loop, lambda: reader(data) >= 5,
   1634                              timeout=10)
   1635         self.assertEqual(b'12345', data)
   1636         self.assertEqual('CONNECTED', proto.state)
   1637 
   1638         os.close(master)
   1639 
   1640         # extra info is available
   1641         self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
   1642 
   1643         # close connection
   1644         proto.transport.close()
   1645         self.loop.run_until_complete(proto.done)
   1646         self.assertEqual('CLOSED', proto.state)
   1647 
   1648     @unittest.skipUnless(sys.platform != 'win32',
   1649                          "Don't support pipes for Windows")
   1650     # select, poll and kqueue don't support character devices (PTY) on Mac OS X
   1651     # older than 10.6 (Snow Leopard)
   1652     @support.requires_mac_ver(10, 6)
   1653     def test_bidirectional_pty(self):
   1654         master, read_slave = os.openpty()
   1655         write_slave = os.dup(read_slave)
   1656         tty.setraw(read_slave)
   1657 
   1658         slave_read_obj = io.open(read_slave, 'rb', 0)
   1659         read_proto = MyReadPipeProto(loop=self.loop)
   1660         read_connect = self.loop.connect_read_pipe(lambda: read_proto,
   1661                                                    slave_read_obj)
   1662         read_transport, p = self.loop.run_until_complete(read_connect)
   1663         self.assertIs(p, read_proto)
   1664         self.assertIs(read_transport, read_proto.transport)
   1665         self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
   1666         self.assertEqual(0, read_proto.nbytes)
   1667 
   1668 
   1669         slave_write_obj = io.open(write_slave, 'wb', 0)
   1670         write_proto = MyWritePipeProto(loop=self.loop)
   1671         write_connect = self.loop.connect_write_pipe(lambda: write_proto,
   1672                                                      slave_write_obj)
   1673         write_transport, p = self.loop.run_until_complete(write_connect)
   1674         self.assertIs(p, write_proto)
   1675         self.assertIs(write_transport, write_proto.transport)
   1676         self.assertEqual('CONNECTED', write_proto.state)
   1677 
   1678         data = bytearray()
   1679         def reader(data):
   1680             chunk = os.read(master, 1024)
   1681             data += chunk
   1682             return len(data)
   1683 
   1684         write_transport.write(b'1')
   1685         test_utils.run_until(self.loop, lambda: reader(data) >= 1, timeout=10)
   1686         self.assertEqual(b'1', data)
   1687         self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
   1688         self.assertEqual('CONNECTED', write_proto.state)
   1689 
   1690         os.write(master, b'a')
   1691         test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 1,
   1692                              timeout=10)
   1693         self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
   1694         self.assertEqual(1, read_proto.nbytes)
   1695         self.assertEqual('CONNECTED', write_proto.state)
   1696 
   1697         write_transport.write(b'2345')
   1698         test_utils.run_until(self.loop, lambda: reader(data) >= 5, timeout=10)
   1699         self.assertEqual(b'12345', data)
   1700         self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
   1701         self.assertEqual('CONNECTED', write_proto.state)
   1702 
   1703         os.write(master, b'bcde')
   1704         test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 5,
   1705                              timeout=10)
   1706         self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
   1707         self.assertEqual(5, read_proto.nbytes)
   1708         self.assertEqual('CONNECTED', write_proto.state)
   1709 
   1710         os.close(master)
   1711 
   1712         read_transport.close()
   1713         self.loop.run_until_complete(read_proto.done)
   1714         self.assertEqual(
   1715             ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], read_proto.state)
   1716 
   1717         write_transport.close()
   1718         self.loop.run_until_complete(write_proto.done)
   1719         self.assertEqual('CLOSED', write_proto.state)
   1720 
   1721     def test_prompt_cancellation(self):
   1722         r, w = test_utils.socketpair()
   1723         r.setblocking(False)
   1724         f = self.loop.sock_recv(r, 1)
   1725         ov = getattr(f, 'ov', None)
   1726         if ov is not None:
   1727             self.assertTrue(ov.pending)
   1728 
   1729         @asyncio.coroutine
   1730         def main():
   1731             try:
   1732                 self.loop.call_soon(f.cancel)
   1733                 yield from f
   1734             except asyncio.CancelledError:
   1735                 res = 'cancelled'
   1736             else:
   1737                 res = None
   1738             finally:
   1739                 self.loop.stop()
   1740             return res
   1741 
   1742         start = time.monotonic()
   1743         t = asyncio.Task(main(), loop=self.loop)
   1744         self.loop.run_forever()
   1745         elapsed = time.monotonic() - start
   1746 
   1747         self.assertLess(elapsed, 0.1)
   1748         self.assertEqual(t.result(), 'cancelled')
   1749         self.assertRaises(asyncio.CancelledError, f.result)
   1750         if ov is not None:
   1751             self.assertFalse(ov.pending)
   1752         self.loop._stop_serving(r)
   1753 
   1754         r.close()
   1755         w.close()
   1756 
   1757     def test_timeout_rounding(self):
   1758         def _run_once():
   1759             self.loop._run_once_counter += 1
   1760             orig_run_once()
   1761 
   1762         orig_run_once = self.loop._run_once
   1763         self.loop._run_once_counter = 0
   1764         self.loop._run_once = _run_once
   1765 
   1766         @asyncio.coroutine
   1767         def wait():
   1768             loop = self.loop
   1769             yield from asyncio.sleep(1e-2, loop=loop)
   1770             yield from asyncio.sleep(1e-4, loop=loop)
   1771             yield from asyncio.sleep(1e-6, loop=loop)
   1772             yield from asyncio.sleep(1e-8, loop=loop)
   1773             yield from asyncio.sleep(1e-10, loop=loop)
   1774 
   1775         self.loop.run_until_complete(wait())
   1776         # The ideal number of call is 12, but on some platforms, the selector
   1777         # may sleep at little bit less than timeout depending on the resolution
   1778         # of the clock used by the kernel. Tolerate a few useless calls on
   1779         # these platforms.
   1780         self.assertLessEqual(self.loop._run_once_counter, 20,
   1781             {'clock_resolution': self.loop._clock_resolution,
   1782              'selector': self.loop._selector.__class__.__name__})
   1783 
   1784     def test_remove_fds_after_closing(self):
   1785         loop = self.create_event_loop()
   1786         callback = lambda: None
   1787         r, w = test_utils.socketpair()
   1788         self.addCleanup(r.close)
   1789         self.addCleanup(w.close)
   1790         loop.add_reader(r, callback)
   1791         loop.add_writer(w, callback)
   1792         loop.close()
   1793         self.assertFalse(loop.remove_reader(r))
   1794         self.assertFalse(loop.remove_writer(w))
   1795 
   1796     def test_add_fds_after_closing(self):
   1797         loop = self.create_event_loop()
   1798         callback = lambda: None
   1799         r, w = test_utils.socketpair()
   1800         self.addCleanup(r.close)
   1801         self.addCleanup(w.close)
   1802         loop.close()
   1803         with self.assertRaises(RuntimeError):
   1804             loop.add_reader(r, callback)
   1805         with self.assertRaises(RuntimeError):
   1806             loop.add_writer(w, callback)
   1807 
   1808     def test_close_running_event_loop(self):
   1809         @asyncio.coroutine
   1810         def close_loop(loop):
   1811             self.loop.close()
   1812 
   1813         coro = close_loop(self.loop)
   1814         with self.assertRaises(RuntimeError):
   1815             self.loop.run_until_complete(coro)
   1816 
   1817     def test_close(self):
   1818         self.loop.close()
   1819 
   1820         @asyncio.coroutine
   1821         def test():
   1822             pass
   1823 
   1824         func = lambda: False
   1825         coro = test()
   1826         self.addCleanup(coro.close)
   1827 
   1828         # operation blocked when the loop is closed
   1829         with self.assertRaises(RuntimeError):
   1830             self.loop.run_forever()
   1831         with self.assertRaises(RuntimeError):
   1832             fut = asyncio.Future(loop=self.loop)
   1833             self.loop.run_until_complete(fut)
   1834         with self.assertRaises(RuntimeError):
   1835             self.loop.call_soon(func)
   1836         with self.assertRaises(RuntimeError):
   1837             self.loop.call_soon_threadsafe(func)
   1838         with self.assertRaises(RuntimeError):
   1839             self.loop.call_later(1.0, func)
   1840         with self.assertRaises(RuntimeError):
   1841             self.loop.call_at(self.loop.time() + .0, func)
   1842         with self.assertRaises(RuntimeError):
   1843             self.loop.run_in_executor(None, func)
   1844         with self.assertRaises(RuntimeError):
   1845             self.loop.create_task(coro)
   1846         with self.assertRaises(RuntimeError):
   1847             self.loop.add_signal_handler(signal.SIGTERM, func)
   1848 
   1849 
   1850 class SubprocessTestsMixin:
   1851 
   1852     def check_terminated(self, returncode):
   1853         if sys.platform == 'win32':
   1854             self.assertIsInstance(returncode, int)
   1855             # expect 1 but sometimes get 0
   1856         else:
   1857             self.assertEqual(-signal.SIGTERM, returncode)
   1858 
   1859     def check_killed(self, returncode):
   1860         if sys.platform == 'win32':
   1861             self.assertIsInstance(returncode, int)
   1862             # expect 1 but sometimes get 0
   1863         else:
   1864             self.assertEqual(-signal.SIGKILL, returncode)
   1865 
   1866     def test_subprocess_exec(self):
   1867         prog = os.path.join(os.path.dirname(__file__), 'echo.py')
   1868 
   1869         connect = self.loop.subprocess_exec(
   1870                         functools.partial(MySubprocessProtocol, self.loop),
   1871                         sys.executable, prog)
   1872         transp, proto = self.loop.run_until_complete(connect)
   1873         self.assertIsInstance(proto, MySubprocessProtocol)
   1874         self.loop.run_until_complete(proto.connected)
   1875         self.assertEqual('CONNECTED', proto.state)
   1876 
   1877         stdin = transp.get_pipe_transport(0)
   1878         stdin.write(b'Python The Winner')
   1879         self.loop.run_until_complete(proto.got_data[1].wait())
   1880         with test_utils.disable_logger():
   1881             transp.close()
   1882         self.loop.run_until_complete(proto.completed)
   1883         self.check_killed(proto.returncode)
   1884         self.assertEqual(b'Python The Winner', proto.data[1])
   1885 
   1886     def test_subprocess_interactive(self):
   1887         prog = os.path.join(os.path.dirname(__file__), 'echo.py')
   1888 
   1889         connect = self.loop.subprocess_exec(
   1890                         functools.partial(MySubprocessProtocol, self.loop),
   1891                         sys.executable, prog)
   1892         transp, proto = self.loop.run_until_complete(connect)
   1893         self.assertIsInstance(proto, MySubprocessProtocol)
   1894         self.loop.run_until_complete(proto.connected)
   1895         self.assertEqual('CONNECTED', proto.state)
   1896 
   1897         stdin = transp.get_pipe_transport(0)
   1898         stdin.write(b'Python ')
   1899         self.loop.run_until_complete(proto.got_data[1].wait())
   1900         proto.got_data[1].clear()
   1901         self.assertEqual(b'Python ', proto.data[1])
   1902 
   1903         stdin.write(b'The Winner')
   1904         self.loop.run_until_complete(proto.got_data[1].wait())
   1905         self.assertEqual(b'Python The Winner', proto.data[1])
   1906 
   1907         with test_utils.disable_logger():
   1908             transp.close()
   1909         self.loop.run_until_complete(proto.completed)
   1910         self.check_killed(proto.returncode)
   1911 
   1912     def test_subprocess_shell(self):
   1913         connect = self.loop.subprocess_shell(
   1914                         functools.partial(MySubprocessProtocol, self.loop),
   1915                         'echo Python')
   1916         transp, proto = self.loop.run_until_complete(connect)
   1917         self.assertIsInstance(proto, MySubprocessProtocol)
   1918         self.loop.run_until_complete(proto.connected)
   1919 
   1920         transp.get_pipe_transport(0).close()
   1921         self.loop.run_until_complete(proto.completed)
   1922         self.assertEqual(0, proto.returncode)
   1923         self.assertTrue(all(f.done() for f in proto.disconnects.values()))
   1924         self.assertEqual(proto.data[1].rstrip(b'\r\n'), b'Python')
   1925         self.assertEqual(proto.data[2], b'')
   1926         transp.close()
   1927 
   1928     def test_subprocess_exitcode(self):
   1929         connect = self.loop.subprocess_shell(
   1930                         functools.partial(MySubprocessProtocol, self.loop),
   1931                         'exit 7', stdin=None, stdout=None, stderr=None)
   1932         transp, proto = self.loop.run_until_complete(connect)
   1933         self.assertIsInstance(proto, MySubprocessProtocol)
   1934         self.loop.run_until_complete(proto.completed)
   1935         self.assertEqual(7, proto.returncode)
   1936         transp.close()
   1937 
   1938     def test_subprocess_close_after_finish(self):
   1939         connect = self.loop.subprocess_shell(
   1940                         functools.partial(MySubprocessProtocol, self.loop),
   1941                         'exit 7', stdin=None, stdout=None, stderr=None)
   1942         transp, proto = self.loop.run_until_complete(connect)
   1943         self.assertIsInstance(proto, MySubprocessProtocol)
   1944         self.assertIsNone(transp.get_pipe_transport(0))
   1945         self.assertIsNone(transp.get_pipe_transport(1))
   1946         self.assertIsNone(transp.get_pipe_transport(2))
   1947         self.loop.run_until_complete(proto.completed)
   1948         self.assertEqual(7, proto.returncode)
   1949         self.assertIsNone(transp.close())
   1950 
   1951     def test_subprocess_kill(self):
   1952         prog = os.path.join(os.path.dirname(__file__), 'echo.py')
   1953 
   1954         connect = self.loop.subprocess_exec(
   1955                         functools.partial(MySubprocessProtocol, self.loop),
   1956                         sys.executable, prog)
   1957         transp, proto = self.loop.run_until_complete(connect)
   1958         self.assertIsInstance(proto, MySubprocessProtocol)
   1959         self.loop.run_until_complete(proto.connected)
   1960 
   1961         transp.kill()
   1962         self.loop.run_until_complete(proto.completed)
   1963         self.check_killed(proto.returncode)
   1964         transp.close()
   1965 
   1966     def test_subprocess_terminate(self):
   1967         prog = os.path.join(os.path.dirname(__file__), 'echo.py')
   1968 
   1969         connect = self.loop.subprocess_exec(
   1970                         functools.partial(MySubprocessProtocol, self.loop),
   1971                         sys.executable, prog)
   1972         transp, proto = self.loop.run_until_complete(connect)
   1973         self.assertIsInstance(proto, MySubprocessProtocol)
   1974         self.loop.run_until_complete(proto.connected)
   1975 
   1976         transp.terminate()
   1977         self.loop.run_until_complete(proto.completed)
   1978         self.check_terminated(proto.returncode)
   1979         transp.close()
   1980 
   1981     @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
   1982     def test_subprocess_send_signal(self):
   1983         prog = os.path.join(os.path.dirname(__file__), 'echo.py')
   1984 
   1985         connect = self.loop.subprocess_exec(
   1986                         functools.partial(MySubprocessProtocol, self.loop),
   1987                         sys.executable, prog)
   1988         transp, proto = self.loop.run_until_complete(connect)
   1989         self.assertIsInstance(proto, MySubprocessProtocol)
   1990         self.loop.run_until_complete(proto.connected)
   1991 
   1992         transp.send_signal(signal.SIGHUP)
   1993         self.loop.run_until_complete(proto.completed)
   1994         self.assertEqual(-signal.SIGHUP, proto.returncode)
   1995         transp.close()
   1996 
   1997     def test_subprocess_stderr(self):
   1998         prog = os.path.join(os.path.dirname(__file__), 'echo2.py')
   1999 
   2000         connect = self.loop.subprocess_exec(
   2001                         functools.partial(MySubprocessProtocol, self.loop),
   2002                         sys.executable, prog)
   2003         transp, proto = self.loop.run_until_complete(connect)
   2004         self.assertIsInstance(proto, MySubprocessProtocol)
   2005         self.loop.run_until_complete(proto.connected)
   2006 
   2007         stdin = transp.get_pipe_transport(0)
   2008         stdin.write(b'test')
   2009 
   2010         self.loop.run_until_complete(proto.completed)
   2011 
   2012         transp.close()
   2013         self.assertEqual(b'OUT:test', proto.data[1])
   2014         self.assertTrue(proto.data[2].startswith(b'ERR:test'), proto.data[2])
   2015         self.assertEqual(0, proto.returncode)
   2016 
   2017     def test_subprocess_stderr_redirect_to_stdout(self):
   2018         prog = os.path.join(os.path.dirname(__file__), 'echo2.py')
   2019 
   2020         connect = self.loop.subprocess_exec(
   2021                         functools.partial(MySubprocessProtocol, self.loop),
   2022                         sys.executable, prog, stderr=subprocess.STDOUT)
   2023         transp, proto = self.loop.run_until_complete(connect)
   2024         self.assertIsInstance(proto, MySubprocessProtocol)
   2025         self.loop.run_until_complete(proto.connected)
   2026 
   2027         stdin = transp.get_pipe_transport(0)
   2028         self.assertIsNotNone(transp.get_pipe_transport(1))
   2029         self.assertIsNone(transp.get_pipe_transport(2))
   2030 
   2031         stdin.write(b'test')
   2032         self.loop.run_until_complete(proto.completed)
   2033         self.assertTrue(proto.data[1].startswith(b'OUT:testERR:test'),
   2034                         proto.data[1])
   2035         self.assertEqual(b'', proto.data[2])
   2036 
   2037         transp.close()
   2038         self.assertEqual(0, proto.returncode)
   2039 
   2040     def test_subprocess_close_client_stream(self):
   2041         prog = os.path.join(os.path.dirname(__file__), 'echo3.py')
   2042 
   2043         connect = self.loop.subprocess_exec(
   2044                         functools.partial(MySubprocessProtocol, self.loop),
   2045                         sys.executable, prog)
   2046         transp, proto = self.loop.run_until_complete(connect)
   2047         self.assertIsInstance(proto, MySubprocessProtocol)
   2048         self.loop.run_until_complete(proto.connected)
   2049 
   2050         stdin = transp.get_pipe_transport(0)
   2051         stdout = transp.get_pipe_transport(1)
   2052         stdin.write(b'test')
   2053         self.loop.run_until_complete(proto.got_data[1].wait())
   2054         self.assertEqual(b'OUT:test', proto.data[1])
   2055 
   2056         stdout.close()
   2057         self.loop.run_until_complete(proto.disconnects[1])
   2058         stdin.write(b'xxx')
   2059         self.loop.run_until_complete(proto.got_data[2].wait())
   2060         if sys.platform != 'win32':
   2061             self.assertEqual(b'ERR:BrokenPipeError', proto.data[2])
   2062         else:
   2063             # After closing the read-end of a pipe, writing to the
   2064             # write-end using os.write() fails with errno==EINVAL and
   2065             # GetLastError()==ERROR_INVALID_NAME on Windows!?!  (Using
   2066             # WriteFile() we get ERROR_BROKEN_PIPE as expected.)
   2067             self.assertEqual(b'ERR:OSError', proto.data[2])
   2068         with test_utils.disable_logger():
   2069             transp.close()
   2070         self.loop.run_until_complete(proto.completed)
   2071         self.check_killed(proto.returncode)
   2072 
   2073     def test_subprocess_wait_no_same_group(self):
   2074         # start the new process in a new session
   2075         connect = self.loop.subprocess_shell(
   2076                         functools.partial(MySubprocessProtocol, self.loop),
   2077                         'exit 7', stdin=None, stdout=None, stderr=None,
   2078                         start_new_session=True)
   2079         _, proto = yield self.loop.run_until_complete(connect)
   2080         self.assertIsInstance(proto, MySubprocessProtocol)
   2081         self.loop.run_until_complete(proto.completed)
   2082         self.assertEqual(7, proto.returncode)
   2083 
   2084     def test_subprocess_exec_invalid_args(self):
   2085         @asyncio.coroutine
   2086         def connect(**kwds):
   2087             yield from self.loop.subprocess_exec(
   2088                 asyncio.SubprocessProtocol,
   2089                 'pwd', **kwds)
   2090 
   2091         with self.assertRaises(ValueError):
   2092             self.loop.run_until_complete(connect(universal_newlines=True))
   2093         with self.assertRaises(ValueError):
   2094             self.loop.run_until_complete(connect(bufsize=4096))
   2095         with self.assertRaises(ValueError):
   2096             self.loop.run_until_complete(connect(shell=True))
   2097 
   2098     def test_subprocess_shell_invalid_args(self):
   2099         @asyncio.coroutine
   2100         def connect(cmd=None, **kwds):
   2101             if not cmd:
   2102                 cmd = 'pwd'
   2103             yield from self.loop.subprocess_shell(
   2104                 asyncio.SubprocessProtocol,
   2105                 cmd, **kwds)
   2106 
   2107         with self.assertRaises(ValueError):
   2108             self.loop.run_until_complete(connect(['ls', '-l']))
   2109         with self.assertRaises(ValueError):
   2110             self.loop.run_until_complete(connect(universal_newlines=True))
   2111         with self.assertRaises(ValueError):
   2112             self.loop.run_until_complete(connect(bufsize=4096))
   2113         with self.assertRaises(ValueError):
   2114             self.loop.run_until_complete(connect(shell=False))
   2115 
   2116 
   2117 if sys.platform == 'win32':
   2118 
   2119     class SelectEventLoopTests(EventLoopTestsMixin, test_utils.TestCase):
   2120 
   2121         def create_event_loop(self):
   2122             return asyncio.SelectorEventLoop()
   2123 
   2124     class ProactorEventLoopTests(EventLoopTestsMixin,
   2125                                  SubprocessTestsMixin,
   2126                                  test_utils.TestCase):
   2127 
   2128         def create_event_loop(self):
   2129             return asyncio.ProactorEventLoop()
   2130 
   2131         if not sslproto._is_sslproto_available():
   2132             def test_create_ssl_connection(self):
   2133                 raise unittest.SkipTest("need python 3.5 (ssl.MemoryBIO)")
   2134 
   2135             def test_create_server_ssl(self):
   2136                 raise unittest.SkipTest("need python 3.5 (ssl.MemoryBIO)")
   2137 
   2138             def test_create_server_ssl_verify_failed(self):
   2139                 raise unittest.SkipTest("need python 3.5 (ssl.MemoryBIO)")
   2140 
   2141             def test_create_server_ssl_match_failed(self):
   2142                 raise unittest.SkipTest("need python 3.5 (ssl.MemoryBIO)")
   2143 
   2144             def test_create_server_ssl_verified(self):
   2145                 raise unittest.SkipTest("need python 3.5 (ssl.MemoryBIO)")
   2146 
   2147         def test_legacy_create_ssl_connection(self):
   2148             raise unittest.SkipTest("IocpEventLoop incompatible with legacy SSL")
   2149 
   2150         def test_legacy_create_server_ssl(self):
   2151             raise unittest.SkipTest("IocpEventLoop incompatible with legacy SSL")
   2152 
   2153         def test_legacy_create_server_ssl_verify_failed(self):
   2154             raise unittest.SkipTest("IocpEventLoop incompatible with legacy SSL")
   2155 
   2156         def test_legacy_create_server_ssl_match_failed(self):
   2157             raise unittest.SkipTest("IocpEventLoop incompatible with legacy SSL")
   2158 
   2159         def test_legacy_create_server_ssl_verified(self):
   2160             raise unittest.SkipTest("IocpEventLoop incompatible with legacy SSL")
   2161 
   2162         def test_reader_callback(self):
   2163             raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
   2164 
   2165         def test_reader_callback_cancel(self):
   2166             raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
   2167 
   2168         def test_writer_callback(self):
   2169             raise unittest.SkipTest("IocpEventLoop does not have add_writer()")
   2170 
   2171         def test_writer_callback_cancel(self):
   2172             raise unittest.SkipTest("IocpEventLoop does not have add_writer()")
   2173 
   2174         def test_create_datagram_endpoint(self):
   2175             raise unittest.SkipTest(
   2176                 "IocpEventLoop does not have create_datagram_endpoint()")
   2177 
   2178         def test_remove_fds_after_closing(self):
   2179             raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
   2180 else:
   2181     from asyncio import selectors
   2182 
   2183     class UnixEventLoopTestsMixin(EventLoopTestsMixin):
   2184         def setUp(self):
   2185             super().setUp()
   2186             watcher = asyncio.SafeChildWatcher()
   2187             watcher.attach_loop(self.loop)
   2188             asyncio.set_child_watcher(watcher)
   2189 
   2190         def tearDown(self):
   2191             asyncio.set_child_watcher(None)
   2192             super().tearDown()
   2193 
   2194         def test_get_event_loop_new_process(self):
   2195             async def main():
   2196                 pool = concurrent.futures.ProcessPoolExecutor()
   2197                 return await self.loop.run_in_executor(
   2198                     pool, _test_get_event_loop_new_process__sub_proc)
   2199 
   2200             self.unpatch_get_running_loop()
   2201 
   2202             self.assertEqual(
   2203                 self.loop.run_until_complete(main()),
   2204                 'hello')
   2205 
   2206     if hasattr(selectors, 'KqueueSelector'):
   2207         class KqueueEventLoopTests(UnixEventLoopTestsMixin,
   2208                                    SubprocessTestsMixin,
   2209                                    test_utils.TestCase):
   2210 
   2211             def create_event_loop(self):
   2212                 return asyncio.SelectorEventLoop(
   2213                     selectors.KqueueSelector())
   2214 
   2215             # kqueue doesn't support character devices (PTY) on Mac OS X older
   2216             # than 10.9 (Maverick)
   2217             @support.requires_mac_ver(10, 9)
   2218             # Issue #20667: KqueueEventLoopTests.test_read_pty_output()
   2219             # hangs on OpenBSD 5.5
   2220             @unittest.skipIf(sys.platform.startswith('openbsd'),
   2221                              'test hangs on OpenBSD')
   2222             def test_read_pty_output(self):
   2223                 super().test_read_pty_output()
   2224 
   2225             # kqueue doesn't support character devices (PTY) on Mac OS X older
   2226             # than 10.9 (Maverick)
   2227             @support.requires_mac_ver(10, 9)
   2228             def test_write_pty(self):
   2229                 super().test_write_pty()
   2230 
   2231     if hasattr(selectors, 'EpollSelector'):
   2232         class EPollEventLoopTests(UnixEventLoopTestsMixin,
   2233                                   SubprocessTestsMixin,
   2234                                   test_utils.TestCase):
   2235 
   2236             def create_event_loop(self):
   2237                 return asyncio.SelectorEventLoop(selectors.EpollSelector())
   2238 
   2239     if hasattr(selectors, 'PollSelector'):
   2240         class PollEventLoopTests(UnixEventLoopTestsMixin,
   2241                                  SubprocessTestsMixin,
   2242                                  test_utils.TestCase):
   2243 
   2244             def create_event_loop(self):
   2245                 return asyncio.SelectorEventLoop(selectors.PollSelector())
   2246 
   2247     # Should always exist.
   2248     class SelectEventLoopTests(UnixEventLoopTestsMixin,
   2249                                SubprocessTestsMixin,
   2250                                test_utils.TestCase):
   2251 
   2252         def create_event_loop(self):
   2253             return asyncio.SelectorEventLoop(selectors.SelectSelector())
   2254 
   2255 
   2256 def noop(*args, **kwargs):
   2257     pass
   2258 
   2259 
   2260 class HandleTests(test_utils.TestCase):
   2261 
   2262     def setUp(self):
   2263         super().setUp()
   2264         self.loop = mock.Mock()
   2265         self.loop.get_debug.return_value = True
   2266 
   2267     def test_handle(self):
   2268         def callback(*args):
   2269             return args
   2270 
   2271         args = ()
   2272         h = asyncio.Handle(callback, args, self.loop)
   2273         self.assertIs(h._callback, callback)
   2274         self.assertIs(h._args, args)
   2275         self.assertFalse(h._cancelled)
   2276 
   2277         h.cancel()
   2278         self.assertTrue(h._cancelled)
   2279 
   2280     def test_callback_with_exception(self):
   2281         def callback():
   2282             raise ValueError()
   2283 
   2284         self.loop = mock.Mock()
   2285         self.loop.call_exception_handler = mock.Mock()
   2286 
   2287         h = asyncio.Handle(callback, (), self.loop)
   2288         h._run()
   2289 
   2290         self.loop.call_exception_handler.assert_called_with({
   2291             'message': test_utils.MockPattern('Exception in callback.*'),
   2292             'exception': mock.ANY,
   2293             'handle': h,
   2294             'source_traceback': h._source_traceback,
   2295         })
   2296 
   2297     def test_handle_weakref(self):
   2298         wd = weakref.WeakValueDictionary()
   2299         h = asyncio.Handle(lambda: None, (), self.loop)
   2300         wd['h'] = h  # Would fail without __weakref__ slot.
   2301 
   2302     def test_handle_repr(self):
   2303         self.loop.get_debug.return_value = False
   2304 
   2305         # simple function
   2306         h = asyncio.Handle(noop, (1, 2), self.loop)
   2307         filename, lineno = test_utils.get_function_source(noop)
   2308         self.assertEqual(repr(h),
   2309                         '<Handle noop(1, 2) at %s:%s>'
   2310                         % (filename, lineno))
   2311 
   2312         # cancelled handle
   2313         h.cancel()
   2314         self.assertEqual(repr(h),
   2315                         '<Handle cancelled>')
   2316 
   2317         # decorated function
   2318         cb = asyncio.coroutine(noop)
   2319         h = asyncio.Handle(cb, (), self.loop)
   2320         self.assertEqual(repr(h),
   2321                         '<Handle noop() at %s:%s>'
   2322                         % (filename, lineno))
   2323 
   2324         # partial function
   2325         cb = functools.partial(noop, 1, 2)
   2326         h = asyncio.Handle(cb, (3,), self.loop)
   2327         regex = (r'^<Handle noop\(1, 2\)\(3\) at %s:%s>$'
   2328                  % (re.escape(filename), lineno))
   2329         self.assertRegex(repr(h), regex)
   2330 
   2331         # partial function with keyword args
   2332         cb = functools.partial(noop, x=1)
   2333         h = asyncio.Handle(cb, (2, 3), self.loop)
   2334         regex = (r'^<Handle noop\(x=1\)\(2, 3\) at %s:%s>$'
   2335                  % (re.escape(filename), lineno))
   2336         self.assertRegex(repr(h), regex)
   2337 
   2338         # partial method
   2339         if sys.version_info >= (3, 4):
   2340             method = HandleTests.test_handle_repr
   2341             cb = functools.partialmethod(method)
   2342             filename, lineno = test_utils.get_function_source(method)
   2343             h = asyncio.Handle(cb, (), self.loop)
   2344 
   2345             cb_regex = r'<function HandleTests.test_handle_repr .*>'
   2346             cb_regex = (r'functools.partialmethod\(%s, , \)\(\)' % cb_regex)
   2347             regex = (r'^<Handle %s at %s:%s>$'
   2348                      % (cb_regex, re.escape(filename), lineno))
   2349             self.assertRegex(repr(h), regex)
   2350 
   2351     def test_handle_repr_debug(self):
   2352         self.loop.get_debug.return_value = True
   2353 
   2354         # simple function
   2355         create_filename = __file__
   2356         create_lineno = sys._getframe().f_lineno + 1
   2357         h = asyncio.Handle(noop, (1, 2), self.loop)
   2358         filename, lineno = test_utils.get_function_source(noop)
   2359         self.assertEqual(repr(h),
   2360                         '<Handle noop(1, 2) at %s:%s created at %s:%s>'
   2361                         % (filename, lineno, create_filename, create_lineno))
   2362 
   2363         # cancelled handle
   2364         h.cancel()
   2365         self.assertEqual(
   2366             repr(h),
   2367             '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>'
   2368             % (filename, lineno, create_filename, create_lineno))
   2369 
   2370         # double cancellation won't overwrite _repr
   2371         h.cancel()
   2372         self.assertEqual(
   2373             repr(h),
   2374             '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>'
   2375             % (filename, lineno, create_filename, create_lineno))
   2376 
   2377     def test_handle_source_traceback(self):
   2378         loop = asyncio.get_event_loop_policy().new_event_loop()
   2379         loop.set_debug(True)
   2380         self.set_event_loop(loop)
   2381 
   2382         def check_source_traceback(h):
   2383             lineno = sys._getframe(1).f_lineno - 1
   2384             self.assertIsInstance(h._source_traceback, list)
   2385             self.assertEqual(h._source_traceback[-1][:3],
   2386                              (__file__,
   2387                               lineno,
   2388                               'test_handle_source_traceback'))
   2389 
   2390         # call_soon
   2391         h = loop.call_soon(noop)
   2392         check_source_traceback(h)
   2393 
   2394         # call_soon_threadsafe
   2395         h = loop.call_soon_threadsafe(noop)
   2396         check_source_traceback(h)
   2397 
   2398         # call_later
   2399         h = loop.call_later(0, noop)
   2400         check_source_traceback(h)
   2401 
   2402         # call_at
   2403         h = loop.call_later(0, noop)
   2404         check_source_traceback(h)
   2405 
   2406     @unittest.skipUnless(hasattr(collections.abc, 'Coroutine'),
   2407                          'No collections.abc.Coroutine')
   2408     def test_coroutine_like_object_debug_formatting(self):
   2409         # Test that asyncio can format coroutines that are instances of
   2410         # collections.abc.Coroutine, but lack cr_core or gi_code attributes
   2411         # (such as ones compiled with Cython).
   2412 
   2413         class Coro:
   2414             def send(self, v):
   2415                 pass
   2416 
   2417             def throw(self, *exc):
   2418                 pass
   2419 
   2420             def close(self):
   2421                 pass
   2422 
   2423             def __await__(self):
   2424                 pass
   2425 
   2426         coro = Coro()
   2427         coro.__name__ = 'AAA'
   2428         self.assertTrue(asyncio.iscoroutine(coro))
   2429         self.assertEqual(coroutines._format_coroutine(coro), 'AAA()')
   2430 
   2431         coro.__qualname__ = 'BBB'
   2432         self.assertEqual(coroutines._format_coroutine(coro), 'BBB()')
   2433 
   2434         coro.cr_running = True
   2435         self.assertEqual(coroutines._format_coroutine(coro), 'BBB() running')
   2436 
   2437         coro = Coro()
   2438         # Some coroutines might not have '__name__', such as
   2439         # built-in async_gen.asend().
   2440         self.assertEqual(coroutines._format_coroutine(coro), 'Coro()')
   2441 
   2442 
   2443 class TimerTests(unittest.TestCase):
   2444 
   2445     def setUp(self):
   2446         super().setUp()
   2447         self.loop = mock.Mock()
   2448 
   2449     def test_hash(self):
   2450         when = time.monotonic()
   2451         h = asyncio.TimerHandle(when, lambda: False, (),
   2452                                 mock.Mock())
   2453         self.assertEqual(hash(h), hash(when))
   2454 
   2455     def test_timer(self):
   2456         def callback(*args):
   2457             return args
   2458 
   2459         args = (1, 2, 3)
   2460         when = time.monotonic()
   2461         h = asyncio.TimerHandle(when, callback, args, mock.Mock())
   2462         self.assertIs(h._callback, callback)
   2463         self.assertIs(h._args, args)
   2464         self.assertFalse(h._cancelled)
   2465 
   2466         # cancel
   2467         h.cancel()
   2468         self.assertTrue(h._cancelled)
   2469         self.assertIsNone(h._callback)
   2470         self.assertIsNone(h._args)
   2471 
   2472         # when cannot be None
   2473         self.assertRaises(AssertionError,
   2474                           asyncio.TimerHandle, None, callback, args,
   2475                           self.loop)
   2476 
   2477     def test_timer_repr(self):
   2478         self.loop.get_debug.return_value = False
   2479 
   2480         # simple function
   2481         h = asyncio.TimerHandle(123, noop, (), self.loop)
   2482         src = test_utils.get_function_source(noop)
   2483         self.assertEqual(repr(h),
   2484                         '<TimerHandle when=123 noop() at %s:%s>' % src)
   2485 
   2486         # cancelled handle
   2487         h.cancel()
   2488         self.assertEqual(repr(h),
   2489                         '<TimerHandle cancelled when=123>')
   2490 
   2491     def test_timer_repr_debug(self):
   2492         self.loop.get_debug.return_value = True
   2493 
   2494         # simple function
   2495         create_filename = __file__
   2496         create_lineno = sys._getframe().f_lineno + 1
   2497         h = asyncio.TimerHandle(123, noop, (), self.loop)
   2498         filename, lineno = test_utils.get_function_source(noop)
   2499         self.assertEqual(repr(h),
   2500                         '<TimerHandle when=123 noop() '
   2501                         'at %s:%s created at %s:%s>'
   2502                         % (filename, lineno, create_filename, create_lineno))
   2503 
   2504         # cancelled handle
   2505         h.cancel()
   2506         self.assertEqual(repr(h),
   2507                         '<TimerHandle cancelled when=123 noop() '
   2508                         'at %s:%s created at %s:%s>'
   2509                         % (filename, lineno, create_filename, create_lineno))
   2510 
   2511 
   2512     def test_timer_comparison(self):
   2513         def callback(*args):
   2514             return args
   2515 
   2516         when = time.monotonic()
   2517 
   2518         h1 = asyncio.TimerHandle(when, callback, (), self.loop)
   2519         h2 = asyncio.TimerHandle(when, callback, (), self.loop)
   2520         # TODO: Use assertLess etc.
   2521         self.assertFalse(h1 < h2)
   2522         self.assertFalse(h2 < h1)
   2523         self.assertTrue(h1 <= h2)
   2524         self.assertTrue(h2 <= h1)
   2525         self.assertFalse(h1 > h2)
   2526         self.assertFalse(h2 > h1)
   2527         self.assertTrue(h1 >= h2)
   2528         self.assertTrue(h2 >= h1)
   2529         self.assertTrue(h1 == h2)
   2530         self.assertFalse(h1 != h2)
   2531 
   2532         h2.cancel()
   2533         self.assertFalse(h1 == h2)
   2534 
   2535         h1 = asyncio.TimerHandle(when, callback, (), self.loop)
   2536         h2 = asyncio.TimerHandle(when + 10.0, callback, (), self.loop)
   2537         self.assertTrue(h1 < h2)
   2538         self.assertFalse(h2 < h1)
   2539         self.assertTrue(h1 <= h2)
   2540         self.assertFalse(h2 <= h1)
   2541         self.assertFalse(h1 > h2)
   2542         self.assertTrue(h2 > h1)
   2543         self.assertFalse(h1 >= h2)
   2544         self.assertTrue(h2 >= h1)
   2545         self.assertFalse(h1 == h2)
   2546         self.assertTrue(h1 != h2)
   2547 
   2548         h3 = asyncio.Handle(callback, (), self.loop)
   2549         self.assertIs(NotImplemented, h1.__eq__(h3))
   2550         self.assertIs(NotImplemented, h1.__ne__(h3))
   2551 
   2552 
   2553 class AbstractEventLoopTests(unittest.TestCase):
   2554 
   2555     def test_not_implemented(self):
   2556         f = mock.Mock()
   2557         loop = asyncio.AbstractEventLoop()
   2558         self.assertRaises(
   2559             NotImplementedError, loop.run_forever)
   2560         self.assertRaises(
   2561             NotImplementedError, loop.run_until_complete, None)
   2562         self.assertRaises(
   2563             NotImplementedError, loop.stop)
   2564         self.assertRaises(
   2565             NotImplementedError, loop.is_running)
   2566         self.assertRaises(
   2567             NotImplementedError, loop.is_closed)
   2568         self.assertRaises(
   2569             NotImplementedError, loop.close)
   2570         self.assertRaises(
   2571             NotImplementedError, loop.create_task, None)
   2572         self.assertRaises(
   2573             NotImplementedError, loop.call_later, None, None)
   2574         self.assertRaises(
   2575             NotImplementedError, loop.call_at, f, f)
   2576         self.assertRaises(
   2577             NotImplementedError, loop.call_soon, None)
   2578         self.assertRaises(
   2579             NotImplementedError, loop.time)
   2580         self.assertRaises(
   2581             NotImplementedError, loop.call_soon_threadsafe, None)
   2582         self.assertRaises(
   2583             NotImplementedError, loop.run_in_executor, f, f)
   2584         self.assertRaises(
   2585             NotImplementedError, loop.set_default_executor, f)
   2586         self.assertRaises(
   2587             NotImplementedError, loop.getaddrinfo, 'localhost', 8080)
   2588         self.assertRaises(
   2589             NotImplementedError, loop.getnameinfo, ('localhost', 8080))
   2590         self.assertRaises(
   2591             NotImplementedError, loop.create_connection, f)
   2592         self.assertRaises(
   2593             NotImplementedError, loop.create_server, f)
   2594         self.assertRaises(
   2595             NotImplementedError, loop.create_datagram_endpoint, f)
   2596         self.assertRaises(
   2597             NotImplementedError, loop.add_reader, 1, f)
   2598         self.assertRaises(
   2599             NotImplementedError, loop.remove_reader, 1)
   2600         self.assertRaises(
   2601             NotImplementedError, loop.add_writer, 1, f)
   2602         self.assertRaises(
   2603             NotImplementedError, loop.remove_writer, 1)
   2604         self.assertRaises(
   2605             NotImplementedError, loop.sock_recv, f, 10)
   2606         self.assertRaises(
   2607             NotImplementedError, loop.sock_sendall, f, 10)
   2608         self.assertRaises(
   2609             NotImplementedError, loop.sock_connect, f, f)
   2610         self.assertRaises(
   2611             NotImplementedError, loop.sock_accept, f)
   2612         self.assertRaises(
   2613             NotImplementedError, loop.add_signal_handler, 1, f)
   2614         self.assertRaises(
   2615             NotImplementedError, loop.remove_signal_handler, 1)
   2616         self.assertRaises(
   2617             NotImplementedError, loop.remove_signal_handler, 1)
   2618         self.assertRaises(
   2619             NotImplementedError, loop.connect_read_pipe, f,
   2620             mock.sentinel.pipe)
   2621         self.assertRaises(
   2622             NotImplementedError, loop.connect_write_pipe, f,
   2623             mock.sentinel.pipe)
   2624         self.assertRaises(
   2625             NotImplementedError, loop.subprocess_shell, f,
   2626             mock.sentinel)
   2627         self.assertRaises(
   2628             NotImplementedError, loop.subprocess_exec, f)
   2629         self.assertRaises(
   2630             NotImplementedError, loop.set_exception_handler, f)
   2631         self.assertRaises(
   2632             NotImplementedError, loop.default_exception_handler, f)
   2633         self.assertRaises(
   2634             NotImplementedError, loop.call_exception_handler, f)
   2635         self.assertRaises(
   2636             NotImplementedError, loop.get_debug)
   2637         self.assertRaises(
   2638             NotImplementedError, loop.set_debug, f)
   2639 
   2640 
   2641 class ProtocolsAbsTests(unittest.TestCase):
   2642 
   2643     def test_empty(self):
   2644         f = mock.Mock()
   2645         p = asyncio.Protocol()
   2646         self.assertIsNone(p.connection_made(f))
   2647         self.assertIsNone(p.connection_lost(f))
   2648         self.assertIsNone(p.data_received(f))
   2649         self.assertIsNone(p.eof_received())
   2650 
   2651         dp = asyncio.DatagramProtocol()
   2652         self.assertIsNone(dp.connection_made(f))
   2653         self.assertIsNone(dp.connection_lost(f))
   2654         self.assertIsNone(dp.error_received(f))
   2655         self.assertIsNone(dp.datagram_received(f, f))
   2656 
   2657         sp = asyncio.SubprocessProtocol()
   2658         self.assertIsNone(sp.connection_made(f))
   2659         self.assertIsNone(sp.connection_lost(f))
   2660         self.assertIsNone(sp.pipe_data_received(1, f))
   2661         self.assertIsNone(sp.pipe_connection_lost(1, f))
   2662         self.assertIsNone(sp.process_exited())
   2663 
   2664 
   2665 class PolicyTests(unittest.TestCase):
   2666 
   2667     def test_event_loop_policy(self):
   2668         policy = asyncio.AbstractEventLoopPolicy()
   2669         self.assertRaises(NotImplementedError, policy.get_event_loop)
   2670         self.assertRaises(NotImplementedError, policy.set_event_loop, object())
   2671         self.assertRaises(NotImplementedError, policy.new_event_loop)
   2672         self.assertRaises(NotImplementedError, policy.get_child_watcher)
   2673         self.assertRaises(NotImplementedError, policy.set_child_watcher,
   2674                           object())
   2675 
   2676     def test_get_event_loop(self):
   2677         policy = asyncio.DefaultEventLoopPolicy()
   2678         self.assertIsNone(policy._local._loop)
   2679 
   2680         loop = policy.get_event_loop()
   2681         self.assertIsInstance(loop, asyncio.AbstractEventLoop)
   2682 
   2683         self.assertIs(policy._local._loop, loop)
   2684         self.assertIs(loop, policy.get_event_loop())
   2685         loop.close()
   2686 
   2687     def test_get_event_loop_calls_set_event_loop(self):
   2688         policy = asyncio.DefaultEventLoopPolicy()
   2689 
   2690         with mock.patch.object(
   2691                 policy, "set_event_loop",
   2692                 wraps=policy.set_event_loop) as m_set_event_loop:
   2693 
   2694             loop = policy.get_event_loop()
   2695 
   2696             # policy._local._loop must be set through .set_event_loop()
   2697             # (the unix DefaultEventLoopPolicy needs this call to attach
   2698             # the child watcher correctly)
   2699             m_set_event_loop.assert_called_with(loop)
   2700 
   2701         loop.close()
   2702 
   2703     def test_get_event_loop_after_set_none(self):
   2704         policy = asyncio.DefaultEventLoopPolicy()
   2705         policy.set_event_loop(None)
   2706         self.assertRaises(RuntimeError, policy.get_event_loop)
   2707 
   2708     @mock.patch('asyncio.events.threading.current_thread')
   2709     def test_get_event_loop_thread(self, m_current_thread):
   2710 
   2711         def f():
   2712             policy = asyncio.DefaultEventLoopPolicy()
   2713             self.assertRaises(RuntimeError, policy.get_event_loop)
   2714 
   2715         th = threading.Thread(target=f)
   2716         th.start()
   2717         th.join()
   2718 
   2719     def test_new_event_loop(self):
   2720         policy = asyncio.DefaultEventLoopPolicy()
   2721 
   2722         loop = policy.new_event_loop()
   2723         self.assertIsInstance(loop, asyncio.AbstractEventLoop)
   2724         loop.close()
   2725 
   2726     def test_set_event_loop(self):
   2727         policy = asyncio.DefaultEventLoopPolicy()
   2728         old_loop = policy.get_event_loop()
   2729 
   2730         self.assertRaises(AssertionError, policy.set_event_loop, object())
   2731 
   2732         loop = policy.new_event_loop()
   2733         policy.set_event_loop(loop)
   2734         self.assertIs(loop, policy.get_event_loop())
   2735         self.assertIsNot(old_loop, policy.get_event_loop())
   2736         loop.close()
   2737         old_loop.close()
   2738 
   2739     def test_get_event_loop_policy(self):
   2740         policy = asyncio.get_event_loop_policy()
   2741         self.assertIsInstance(policy, asyncio.AbstractEventLoopPolicy)
   2742         self.assertIs(policy, asyncio.get_event_loop_policy())
   2743 
   2744     def test_set_event_loop_policy(self):
   2745         self.assertRaises(
   2746             AssertionError, asyncio.set_event_loop_policy, object())
   2747 
   2748         old_policy = asyncio.get_event_loop_policy()
   2749 
   2750         policy = asyncio.DefaultEventLoopPolicy()
   2751         asyncio.set_event_loop_policy(policy)
   2752         self.assertIs(policy, asyncio.get_event_loop_policy())
   2753         self.assertIsNot(policy, old_policy)
   2754 
   2755     def test_get_event_loop_returns_running_loop(self):
   2756         class Policy(asyncio.DefaultEventLoopPolicy):
   2757             def get_event_loop(self):
   2758                 raise NotImplementedError
   2759 
   2760         loop = None
   2761 
   2762         old_policy = asyncio.get_event_loop_policy()
   2763         try:
   2764             asyncio.set_event_loop_policy(Policy())
   2765             loop = asyncio.new_event_loop()
   2766             self.assertIs(asyncio._get_running_loop(), None)
   2767 
   2768             async def func():
   2769                 self.assertIs(asyncio.get_event_loop(), loop)
   2770                 self.assertIs(asyncio._get_running_loop(), loop)
   2771 
   2772             loop.run_until_complete(func())
   2773         finally:
   2774             asyncio.set_event_loop_policy(old_policy)
   2775             if loop is not None:
   2776                 loop.close()
   2777 
   2778         self.assertIs(asyncio._get_running_loop(), None)
   2779 
   2780 
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
   2783