Home | History | Annotate | Download | only in test_asyncio
      1 """Tests for proactor_events.py"""
      2 
      3 import io
      4 import socket
      5 import unittest
      6 import sys
      7 from unittest import mock
      8 
      9 import asyncio
     10 from asyncio import events
     11 from asyncio.proactor_events import BaseProactorEventLoop
     12 from asyncio.proactor_events import _ProactorSocketTransport
     13 from asyncio.proactor_events import _ProactorWritePipeTransport
     14 from asyncio.proactor_events import _ProactorDuplexPipeTransport
     15 from test import support
     16 from test.test_asyncio import utils as test_utils
     17 
     18 
     19 def close_transport(transport):
     20     # Don't call transport.close() because the event loop and the IOCP proactor
     21     # are mocked
     22     if transport._sock is None:
     23         return
     24     transport._sock.close()
     25     transport._sock = None
     26 
     27 
     28 class ProactorSocketTransportTests(test_utils.TestCase):
     29 
     30     def setUp(self):
     31         super().setUp()
     32         self.loop = self.new_test_loop()
     33         self.addCleanup(self.loop.close)
     34         self.proactor = mock.Mock()
     35         self.loop._proactor = self.proactor
     36         self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
     37         self.sock = mock.Mock(socket.socket)
     38 
     39     def socket_transport(self, waiter=None):
     40         transport = _ProactorSocketTransport(self.loop, self.sock,
     41                                              self.protocol, waiter=waiter)
     42         self.addCleanup(close_transport, transport)
     43         return transport
     44 
     45     def test_ctor(self):
     46         fut = asyncio.Future(loop=self.loop)
     47         tr = self.socket_transport(waiter=fut)
     48         test_utils.run_briefly(self.loop)
     49         self.assertIsNone(fut.result())
     50         self.protocol.connection_made(tr)
     51         self.proactor.recv.assert_called_with(self.sock, 32768)
     52 
     53     def test_loop_reading(self):
     54         tr = self.socket_transport()
     55         tr._loop_reading()
     56         self.loop._proactor.recv.assert_called_with(self.sock, 32768)
     57         self.assertFalse(self.protocol.data_received.called)
     58         self.assertFalse(self.protocol.eof_received.called)
     59 
     60     def test_loop_reading_data(self):
     61         res = asyncio.Future(loop=self.loop)
     62         res.set_result(b'data')
     63 
     64         tr = self.socket_transport()
     65         tr._read_fut = res
     66         tr._loop_reading(res)
     67         self.loop._proactor.recv.assert_called_with(self.sock, 32768)
     68         self.protocol.data_received.assert_called_with(b'data')
     69 
     70     def test_loop_reading_no_data(self):
     71         res = asyncio.Future(loop=self.loop)
     72         res.set_result(b'')
     73 
     74         tr = self.socket_transport()
     75         self.assertRaises(AssertionError, tr._loop_reading, res)
     76 
     77         tr.close = mock.Mock()
     78         tr._read_fut = res
     79         tr._loop_reading(res)
     80         self.assertFalse(self.loop._proactor.recv.called)
     81         self.assertTrue(self.protocol.eof_received.called)
     82         self.assertTrue(tr.close.called)
     83 
     84     def test_loop_reading_aborted(self):
     85         err = self.loop._proactor.recv.side_effect = ConnectionAbortedError()
     86 
     87         tr = self.socket_transport()
     88         tr._fatal_error = mock.Mock()
     89         tr._loop_reading()
     90         tr._fatal_error.assert_called_with(
     91                             err,
     92                             'Fatal read error on pipe transport')
     93 
     94     def test_loop_reading_aborted_closing(self):
     95         self.loop._proactor.recv.side_effect = ConnectionAbortedError()
     96 
     97         tr = self.socket_transport()
     98         tr._closing = True
     99         tr._fatal_error = mock.Mock()
    100         tr._loop_reading()
    101         self.assertFalse(tr._fatal_error.called)
    102 
    103     def test_loop_reading_aborted_is_fatal(self):
    104         self.loop._proactor.recv.side_effect = ConnectionAbortedError()
    105         tr = self.socket_transport()
    106         tr._closing = False
    107         tr._fatal_error = mock.Mock()
    108         tr._loop_reading()
    109         self.assertTrue(tr._fatal_error.called)
    110 
    111     def test_loop_reading_conn_reset_lost(self):
    112         err = self.loop._proactor.recv.side_effect = ConnectionResetError()
    113 
    114         tr = self.socket_transport()
    115         tr._closing = False
    116         tr._fatal_error = mock.Mock()
    117         tr._force_close = mock.Mock()
    118         tr._loop_reading()
    119         self.assertFalse(tr._fatal_error.called)
    120         tr._force_close.assert_called_with(err)
    121 
    122     def test_loop_reading_exception(self):
    123         err = self.loop._proactor.recv.side_effect = (OSError())
    124 
    125         tr = self.socket_transport()
    126         tr._fatal_error = mock.Mock()
    127         tr._loop_reading()
    128         tr._fatal_error.assert_called_with(
    129                             err,
    130                             'Fatal read error on pipe transport')
    131 
    132     def test_write(self):
    133         tr = self.socket_transport()
    134         tr._loop_writing = mock.Mock()
    135         tr.write(b'data')
    136         self.assertEqual(tr._buffer, None)
    137         tr._loop_writing.assert_called_with(data=b'data')
    138 
    139     def test_write_no_data(self):
    140         tr = self.socket_transport()
    141         tr.write(b'')
    142         self.assertFalse(tr._buffer)
    143 
    144     def test_write_more(self):
    145         tr = self.socket_transport()
    146         tr._write_fut = mock.Mock()
    147         tr._loop_writing = mock.Mock()
    148         tr.write(b'data')
    149         self.assertEqual(tr._buffer, b'data')
    150         self.assertFalse(tr._loop_writing.called)
    151 
    152     def test_loop_writing(self):
    153         tr = self.socket_transport()
    154         tr._buffer = bytearray(b'data')
    155         tr._loop_writing()
    156         self.loop._proactor.send.assert_called_with(self.sock, b'data')
    157         self.loop._proactor.send.return_value.add_done_callback.\
    158             assert_called_with(tr._loop_writing)
    159 
    160     @mock.patch('asyncio.proactor_events.logger')
    161     def test_loop_writing_err(self, m_log):
    162         err = self.loop._proactor.send.side_effect = OSError()
    163         tr = self.socket_transport()
    164         tr._fatal_error = mock.Mock()
    165         tr._buffer = [b'da', b'ta']
    166         tr._loop_writing()
    167         tr._fatal_error.assert_called_with(
    168                             err,
    169                             'Fatal write error on pipe transport')
    170         tr._conn_lost = 1
    171 
    172         tr.write(b'data')
    173         tr.write(b'data')
    174         tr.write(b'data')
    175         tr.write(b'data')
    176         tr.write(b'data')
    177         self.assertEqual(tr._buffer, None)
    178         m_log.warning.assert_called_with('socket.send() raised exception.')
    179 
    180     def test_loop_writing_stop(self):
    181         fut = asyncio.Future(loop=self.loop)
    182         fut.set_result(b'data')
    183 
    184         tr = self.socket_transport()
    185         tr._write_fut = fut
    186         tr._loop_writing(fut)
    187         self.assertIsNone(tr._write_fut)
    188 
    189     def test_loop_writing_closing(self):
    190         fut = asyncio.Future(loop=self.loop)
    191         fut.set_result(1)
    192 
    193         tr = self.socket_transport()
    194         tr._write_fut = fut
    195         tr.close()
    196         tr._loop_writing(fut)
    197         self.assertIsNone(tr._write_fut)
    198         test_utils.run_briefly(self.loop)
    199         self.protocol.connection_lost.assert_called_with(None)
    200 
    201     def test_abort(self):
    202         tr = self.socket_transport()
    203         tr._force_close = mock.Mock()
    204         tr.abort()
    205         tr._force_close.assert_called_with(None)
    206 
    207     def test_close(self):
    208         tr = self.socket_transport()
    209         tr.close()
    210         test_utils.run_briefly(self.loop)
    211         self.protocol.connection_lost.assert_called_with(None)
    212         self.assertTrue(tr.is_closing())
    213         self.assertEqual(tr._conn_lost, 1)
    214 
    215         self.protocol.connection_lost.reset_mock()
    216         tr.close()
    217         test_utils.run_briefly(self.loop)
    218         self.assertFalse(self.protocol.connection_lost.called)
    219 
    220     def test_close_write_fut(self):
    221         tr = self.socket_transport()
    222         tr._write_fut = mock.Mock()
    223         tr.close()
    224         test_utils.run_briefly(self.loop)
    225         self.assertFalse(self.protocol.connection_lost.called)
    226 
    227     def test_close_buffer(self):
    228         tr = self.socket_transport()
    229         tr._buffer = [b'data']
    230         tr.close()
    231         test_utils.run_briefly(self.loop)
    232         self.assertFalse(self.protocol.connection_lost.called)
    233 
    234     @mock.patch('asyncio.base_events.logger')
    235     def test_fatal_error(self, m_logging):
    236         tr = self.socket_transport()
    237         tr._force_close = mock.Mock()
    238         tr._fatal_error(None)
    239         self.assertTrue(tr._force_close.called)
    240         self.assertTrue(m_logging.error.called)
    241 
    242     def test_force_close(self):
    243         tr = self.socket_transport()
    244         tr._buffer = [b'data']
    245         read_fut = tr._read_fut = mock.Mock()
    246         write_fut = tr._write_fut = mock.Mock()
    247         tr._force_close(None)
    248 
    249         read_fut.cancel.assert_called_with()
    250         write_fut.cancel.assert_called_with()
    251         test_utils.run_briefly(self.loop)
    252         self.protocol.connection_lost.assert_called_with(None)
    253         self.assertEqual(None, tr._buffer)
    254         self.assertEqual(tr._conn_lost, 1)
    255 
    256     def test_loop_writing_force_close(self):
    257         exc_handler = mock.Mock()
    258         self.loop.set_exception_handler(exc_handler)
    259         fut = asyncio.Future(loop=self.loop)
    260         fut.set_result(1)
    261         self.proactor.send.return_value = fut
    262 
    263         tr = self.socket_transport()
    264         tr.write(b'data')
    265         tr._force_close(None)
    266         test_utils.run_briefly(self.loop)
    267         exc_handler.assert_not_called()
    268 
    269     def test_force_close_idempotent(self):
    270         tr = self.socket_transport()
    271         tr._closing = True
    272         tr._force_close(None)
    273         test_utils.run_briefly(self.loop)
    274         self.assertFalse(self.protocol.connection_lost.called)
    275 
    276     def test_fatal_error_2(self):
    277         tr = self.socket_transport()
    278         tr._buffer = [b'data']
    279         tr._force_close(None)
    280 
    281         test_utils.run_briefly(self.loop)
    282         self.protocol.connection_lost.assert_called_with(None)
    283         self.assertEqual(None, tr._buffer)
    284 
    285     def test_call_connection_lost(self):
    286         tr = self.socket_transport()
    287         tr._call_connection_lost(None)
    288         self.assertTrue(self.protocol.connection_lost.called)
    289         self.assertTrue(self.sock.close.called)
    290 
    291     def test_write_eof(self):
    292         tr = self.socket_transport()
    293         self.assertTrue(tr.can_write_eof())
    294         tr.write_eof()
    295         self.sock.shutdown.assert_called_with(socket.SHUT_WR)
    296         tr.write_eof()
    297         self.assertEqual(self.sock.shutdown.call_count, 1)
    298         tr.close()
    299 
    300     def test_write_eof_buffer(self):
    301         tr = self.socket_transport()
    302         f = asyncio.Future(loop=self.loop)
    303         tr._loop._proactor.send.return_value = f
    304         tr.write(b'data')
    305         tr.write_eof()
    306         self.assertTrue(tr._eof_written)
    307         self.assertFalse(self.sock.shutdown.called)
    308         tr._loop._proactor.send.assert_called_with(self.sock, b'data')
    309         f.set_result(4)
    310         self.loop._run_once()
    311         self.sock.shutdown.assert_called_with(socket.SHUT_WR)
    312         tr.close()
    313 
    314     def test_write_eof_write_pipe(self):
    315         tr = _ProactorWritePipeTransport(
    316             self.loop, self.sock, self.protocol)
    317         self.assertTrue(tr.can_write_eof())
    318         tr.write_eof()
    319         self.assertTrue(tr.is_closing())
    320         self.loop._run_once()
    321         self.assertTrue(self.sock.close.called)
    322         tr.close()
    323 
    324     def test_write_eof_buffer_write_pipe(self):
    325         tr = _ProactorWritePipeTransport(self.loop, self.sock, self.protocol)
    326         f = asyncio.Future(loop=self.loop)
    327         tr._loop._proactor.send.return_value = f
    328         tr.write(b'data')
    329         tr.write_eof()
    330         self.assertTrue(tr.is_closing())
    331         self.assertFalse(self.sock.shutdown.called)
    332         tr._loop._proactor.send.assert_called_with(self.sock, b'data')
    333         f.set_result(4)
    334         self.loop._run_once()
    335         self.loop._run_once()
    336         self.assertTrue(self.sock.close.called)
    337         tr.close()
    338 
    339     def test_write_eof_duplex_pipe(self):
    340         tr = _ProactorDuplexPipeTransport(
    341             self.loop, self.sock, self.protocol)
    342         self.assertFalse(tr.can_write_eof())
    343         with self.assertRaises(NotImplementedError):
    344             tr.write_eof()
    345         close_transport(tr)
    346 
    347     def test_pause_resume_reading(self):
    348         tr = self.socket_transport()
    349         futures = []
    350         for msg in [b'data1', b'data2', b'data3', b'data4', b'data5', b'']:
    351             f = asyncio.Future(loop=self.loop)
    352             f.set_result(msg)
    353             futures.append(f)
    354 
    355         self.loop._proactor.recv.side_effect = futures
    356         self.loop._run_once()
    357         self.assertFalse(tr._paused)
    358         self.assertTrue(tr.is_reading())
    359         self.loop._run_once()
    360         self.protocol.data_received.assert_called_with(b'data1')
    361         self.loop._run_once()
    362         self.protocol.data_received.assert_called_with(b'data2')
    363 
    364         tr.pause_reading()
    365         tr.pause_reading()
    366         self.assertTrue(tr._paused)
    367         self.assertFalse(tr.is_reading())
    368         for i in range(10):
    369             self.loop._run_once()
    370         self.protocol.data_received.assert_called_with(b'data2')
    371 
    372         tr.resume_reading()
    373         tr.resume_reading()
    374         self.assertFalse(tr._paused)
    375         self.assertTrue(tr.is_reading())
    376         self.loop._run_once()
    377         self.protocol.data_received.assert_called_with(b'data3')
    378         self.loop._run_once()
    379         self.protocol.data_received.assert_called_with(b'data4')
    380 
    381         tr.pause_reading()
    382         tr.resume_reading()
    383         self.loop.call_exception_handler = mock.Mock()
    384         self.loop._run_once()
    385         self.loop.call_exception_handler.assert_not_called()
    386         self.protocol.data_received.assert_called_with(b'data5')
    387         tr.close()
    388 
    389         self.assertFalse(tr.is_reading())
    390 
    391 
    392     def pause_writing_transport(self, high):
    393         tr = self.socket_transport()
    394         tr.set_write_buffer_limits(high=high)
    395 
    396         self.assertEqual(tr.get_write_buffer_size(), 0)
    397         self.assertFalse(self.protocol.pause_writing.called)
    398         self.assertFalse(self.protocol.resume_writing.called)
    399         return tr
    400 
    401     def test_pause_resume_writing(self):
    402         tr = self.pause_writing_transport(high=4)
    403 
    404         # write a large chunk, must pause writing
    405         fut = asyncio.Future(loop=self.loop)
    406         self.loop._proactor.send.return_value = fut
    407         tr.write(b'large data')
    408         self.loop._run_once()
    409         self.assertTrue(self.protocol.pause_writing.called)
    410 
    411         # flush the buffer
    412         fut.set_result(None)
    413         self.loop._run_once()
    414         self.assertEqual(tr.get_write_buffer_size(), 0)
    415         self.assertTrue(self.protocol.resume_writing.called)
    416 
    417     def test_pause_writing_2write(self):
    418         tr = self.pause_writing_transport(high=4)
    419 
    420         # first short write, the buffer is not full (3 <= 4)
    421         fut1 = asyncio.Future(loop=self.loop)
    422         self.loop._proactor.send.return_value = fut1
    423         tr.write(b'123')
    424         self.loop._run_once()
    425         self.assertEqual(tr.get_write_buffer_size(), 3)
    426         self.assertFalse(self.protocol.pause_writing.called)
    427 
    428         # fill the buffer, must pause writing (6 > 4)
    429         tr.write(b'abc')
    430         self.loop._run_once()
    431         self.assertEqual(tr.get_write_buffer_size(), 6)
    432         self.assertTrue(self.protocol.pause_writing.called)
    433 
    434     def test_pause_writing_3write(self):
    435         tr = self.pause_writing_transport(high=4)
    436 
    437         # first short write, the buffer is not full (1 <= 4)
    438         fut = asyncio.Future(loop=self.loop)
    439         self.loop._proactor.send.return_value = fut
    440         tr.write(b'1')
    441         self.loop._run_once()
    442         self.assertEqual(tr.get_write_buffer_size(), 1)
    443         self.assertFalse(self.protocol.pause_writing.called)
    444 
    445         # second short write, the buffer is not full (3 <= 4)
    446         tr.write(b'23')
    447         self.loop._run_once()
    448         self.assertEqual(tr.get_write_buffer_size(), 3)
    449         self.assertFalse(self.protocol.pause_writing.called)
    450 
    451         # fill the buffer, must pause writing (6 > 4)
    452         tr.write(b'abc')
    453         self.loop._run_once()
    454         self.assertEqual(tr.get_write_buffer_size(), 6)
    455         self.assertTrue(self.protocol.pause_writing.called)
    456 
    457     def test_dont_pause_writing(self):
    458         tr = self.pause_writing_transport(high=4)
    459 
    460         # write a large chunk which completes immediately,
    461         # it should not pause writing
    462         fut = asyncio.Future(loop=self.loop)
    463         fut.set_result(None)
    464         self.loop._proactor.send.return_value = fut
    465         tr.write(b'very large data')
    466         self.loop._run_once()
    467         self.assertEqual(tr.get_write_buffer_size(), 0)
    468         self.assertFalse(self.protocol.pause_writing.called)
    469 
    470 
    471 @unittest.skip('FIXME: bpo-33694: these tests are too close '
    472                'to the implementation and should be refactored or removed')
    473 class ProactorSocketTransportBufferedProtoTests(test_utils.TestCase):
    474 
    475     def setUp(self):
    476         super().setUp()
    477         self.loop = self.new_test_loop()
    478         self.addCleanup(self.loop.close)
    479         self.proactor = mock.Mock()
    480         self.loop._proactor = self.proactor
    481 
    482         self.protocol = test_utils.make_test_protocol(asyncio.BufferedProtocol)
    483         self.buf = bytearray(1)
    484         self.protocol.get_buffer.side_effect = lambda hint: self.buf
    485 
    486         self.sock = mock.Mock(socket.socket)
    487 
    488     def socket_transport(self, waiter=None):
    489         transport = _ProactorSocketTransport(self.loop, self.sock,
    490                                              self.protocol, waiter=waiter)
    491         self.addCleanup(close_transport, transport)
    492         return transport
    493 
    494     def test_ctor(self):
    495         fut = asyncio.Future(loop=self.loop)
    496         tr = self.socket_transport(waiter=fut)
    497         test_utils.run_briefly(self.loop)
    498         self.assertIsNone(fut.result())
    499         self.protocol.connection_made(tr)
    500         self.proactor.recv_into.assert_called_with(self.sock, self.buf)
    501 
    502     def test_loop_reading(self):
    503         tr = self.socket_transport()
    504         tr._loop_reading()
    505         self.loop._proactor.recv_into.assert_called_with(self.sock, self.buf)
    506         self.assertTrue(self.protocol.get_buffer.called)
    507         self.assertFalse(self.protocol.buffer_updated.called)
    508         self.assertFalse(self.protocol.eof_received.called)
    509 
    510     def test_get_buffer_error(self):
    511         transport = self.socket_transport()
    512         transport._fatal_error = mock.Mock()
    513 
    514         self.loop.call_exception_handler = mock.Mock()
    515         self.protocol.get_buffer.side_effect = LookupError()
    516 
    517         transport._loop_reading()
    518 
    519         self.assertTrue(transport._fatal_error.called)
    520         self.assertTrue(self.protocol.get_buffer.called)
    521         self.assertFalse(self.protocol.buffer_updated.called)
    522 
    523     def test_get_buffer_zerosized(self):
    524         transport = self.socket_transport()
    525         transport._fatal_error = mock.Mock()
    526 
    527         self.loop.call_exception_handler = mock.Mock()
    528         self.protocol.get_buffer.side_effect = lambda hint: bytearray(0)
    529 
    530         transport._loop_reading()
    531 
    532         self.assertTrue(transport._fatal_error.called)
    533         self.assertTrue(self.protocol.get_buffer.called)
    534         self.assertFalse(self.protocol.buffer_updated.called)
    535 
    536     def test_proto_type_switch(self):
    537         self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
    538         tr = self.socket_transport()
    539 
    540         res = asyncio.Future(loop=self.loop)
    541         res.set_result(b'data')
    542 
    543         tr = self.socket_transport()
    544         tr._read_fut = res
    545         tr._loop_reading(res)
    546         self.loop._proactor.recv.assert_called_with(self.sock, 32768)
    547         self.protocol.data_received.assert_called_with(b'data')
    548 
    549         # switch protocol to a BufferedProtocol
    550 
    551         buf_proto = test_utils.make_test_protocol(asyncio.BufferedProtocol)
    552         buf = bytearray(4)
    553         buf_proto.get_buffer.side_effect = lambda hint: buf
    554 
    555         tr.set_protocol(buf_proto)
    556         test_utils.run_briefly(self.loop)
    557         res = asyncio.Future(loop=self.loop)
    558         res.set_result(4)
    559 
    560         tr._read_fut = res
    561         tr._loop_reading(res)
    562         self.loop._proactor.recv_into.assert_called_with(self.sock, buf)
    563         buf_proto.buffer_updated.assert_called_with(4)
    564 
    565     @unittest.skip('FIXME: bpo-33694: this test is too close to the '
    566                    'implementation and should be refactored or removed')
    567     def test_proto_buf_switch(self):
    568         tr = self.socket_transport()
    569         test_utils.run_briefly(self.loop)
    570         self.protocol.get_buffer.assert_called_with(-1)
    571 
    572         # switch protocol to *another* BufferedProtocol
    573 
    574         buf_proto = test_utils.make_test_protocol(asyncio.BufferedProtocol)
    575         buf = bytearray(4)
    576         buf_proto.get_buffer.side_effect = lambda hint: buf
    577         tr._read_fut.done.side_effect = lambda: False
    578         tr.set_protocol(buf_proto)
    579         self.assertFalse(buf_proto.get_buffer.called)
    580         test_utils.run_briefly(self.loop)
    581         buf_proto.get_buffer.assert_called_with(-1)
    582 
    583     def test_buffer_updated_error(self):
    584         transport = self.socket_transport()
    585         transport._fatal_error = mock.Mock()
    586 
    587         self.loop.call_exception_handler = mock.Mock()
    588         self.protocol.buffer_updated.side_effect = LookupError()
    589 
    590         res = asyncio.Future(loop=self.loop)
    591         res.set_result(10)
    592         transport._read_fut = res
    593         transport._loop_reading(res)
    594 
    595         self.assertTrue(transport._fatal_error.called)
    596         self.assertFalse(self.protocol.get_buffer.called)
    597         self.assertTrue(self.protocol.buffer_updated.called)
    598 
    599     def test_loop_eof_received_error(self):
    600         res = asyncio.Future(loop=self.loop)
    601         res.set_result(0)
    602 
    603         self.protocol.eof_received.side_effect = LookupError()
    604 
    605         tr = self.socket_transport()
    606         tr._fatal_error = mock.Mock()
    607 
    608         tr.close = mock.Mock()
    609         tr._read_fut = res
    610         tr._loop_reading(res)
    611         self.assertFalse(self.loop._proactor.recv_into.called)
    612         self.assertTrue(self.protocol.eof_received.called)
    613         self.assertTrue(tr._fatal_error.called)
    614 
    615     def test_loop_reading_data(self):
    616         res = asyncio.Future(loop=self.loop)
    617         res.set_result(4)
    618 
    619         tr = self.socket_transport()
    620         tr._read_fut = res
    621         tr._loop_reading(res)
    622         self.loop._proactor.recv_into.assert_called_with(self.sock, self.buf)
    623         self.protocol.buffer_updated.assert_called_with(4)
    624 
    625     def test_loop_reading_no_data(self):
    626         res = asyncio.Future(loop=self.loop)
    627         res.set_result(0)
    628 
    629         tr = self.socket_transport()
    630         self.assertRaises(AssertionError, tr._loop_reading, res)
    631 
    632         tr.close = mock.Mock()
    633         tr._read_fut = res
    634         tr._loop_reading(res)
    635         self.assertFalse(self.loop._proactor.recv_into.called)
    636         self.assertTrue(self.protocol.eof_received.called)
    637         self.assertTrue(tr.close.called)
    638 
    639     def test_loop_reading_aborted(self):
    640         err = self.loop._proactor.recv_into.side_effect = \
    641             ConnectionAbortedError()
    642 
    643         tr = self.socket_transport()
    644         tr._fatal_error = mock.Mock()
    645         tr._loop_reading()
    646         tr._fatal_error.assert_called_with(
    647             err, 'Fatal read error on pipe transport')
    648 
    649     def test_loop_reading_aborted_closing(self):
    650         self.loop._proactor.recv.side_effect = ConnectionAbortedError()
    651 
    652         tr = self.socket_transport()
    653         tr._closing = True
    654         tr._fatal_error = mock.Mock()
    655         tr._loop_reading()
    656         self.assertFalse(tr._fatal_error.called)
    657 
    658     def test_loop_reading_aborted_is_fatal(self):
    659         self.loop._proactor.recv_into.side_effect = ConnectionAbortedError()
    660         tr = self.socket_transport()
    661         tr._closing = False
    662         tr._fatal_error = mock.Mock()
    663         tr._loop_reading()
    664         self.assertTrue(tr._fatal_error.called)
    665 
    666     def test_loop_reading_conn_reset_lost(self):
    667         err = self.loop._proactor.recv_into.side_effect = ConnectionResetError()
    668 
    669         tr = self.socket_transport()
    670         tr._closing = False
    671         tr._fatal_error = mock.Mock()
    672         tr._force_close = mock.Mock()
    673         tr._loop_reading()
    674         self.assertFalse(tr._fatal_error.called)
    675         tr._force_close.assert_called_with(err)
    676 
    677     def test_loop_reading_exception(self):
    678         err = self.loop._proactor.recv_into.side_effect = OSError()
    679 
    680         tr = self.socket_transport()
    681         tr._fatal_error = mock.Mock()
    682         tr._loop_reading()
    683         tr._fatal_error.assert_called_with(
    684             err, 'Fatal read error on pipe transport')
    685 
    686     def test_pause_resume_reading(self):
    687         tr = self.socket_transport()
    688         futures = []
    689         for msg in [10, 20, 30, 40, 0]:
    690             f = asyncio.Future(loop=self.loop)
    691             f.set_result(msg)
    692             futures.append(f)
    693 
    694         self.loop._proactor.recv_into.side_effect = futures
    695         self.loop._run_once()
    696         self.assertFalse(tr._paused)
    697         self.assertTrue(tr.is_reading())
    698         self.loop._run_once()
    699         self.protocol.buffer_updated.assert_called_with(10)
    700         self.loop._run_once()
    701         self.protocol.buffer_updated.assert_called_with(20)
    702 
    703         tr.pause_reading()
    704         tr.pause_reading()
    705         self.assertTrue(tr._paused)
    706         self.assertFalse(tr.is_reading())
    707         for i in range(10):
    708             self.loop._run_once()
    709         self.protocol.buffer_updated.assert_called_with(20)
    710 
    711         tr.resume_reading()
    712         tr.resume_reading()
    713         self.assertFalse(tr._paused)
    714         self.assertTrue(tr.is_reading())
    715         self.loop._run_once()
    716         self.protocol.buffer_updated.assert_called_with(30)
    717         self.loop._run_once()
    718         self.protocol.buffer_updated.assert_called_with(40)
    719         tr.close()
    720 
    721         self.assertFalse(tr.is_reading())
    722 
    723 
    724 class BaseProactorEventLoopTests(test_utils.TestCase):
    725 
    def setUp(self):
        # Build a BaseProactorEventLoop around a mocked proactor.  While the
        # loop is constructed, socket.socketpair() is patched so that the
        # loop receives the two mocks kept in self.ssock / self.csock.
        super().setUp()

        self.sock = test_utils.mock_nonblocking_socket()
        self.proactor = mock.Mock()
        self.ssock = mock.Mock()
        self.csock = mock.Mock()

        fake_pair = (self.ssock, self.csock)
        socketpair_patch = mock.patch(
            'asyncio.proactor_events.socket.socketpair',
            return_value=fake_pair)
        with socketpair_patch:
            self.loop = BaseProactorEventLoop(self.proactor)
        self.set_event_loop(self.loop)
    738 
    @mock.patch.object(BaseProactorEventLoop, 'call_soon')
    @mock.patch('asyncio.proactor_events.socket.socketpair')
    def test_ctor(self, socketpair, call_soon):
        # The constructor stores both ends returned by socketpair(), counts
        # one internal fd and schedules _loop_self_reading via call_soon().
        ssock = mock.Mock()
        csock = mock.Mock()
        socketpair.return_value = (ssock, csock)

        loop = BaseProactorEventLoop(self.proactor)

        self.assertIs(loop._ssock, ssock)
        self.assertIs(loop._csock, csock)
        self.assertEqual(loop._internal_fds, 1)
        call_soon.assert_called_with(loop._loop_self_reading)
        loop.close()
    750 
    751     def test_close_self_pipe(self):
    752         self.loop._close_self_pipe()
    753         self.assertEqual(self.loop._internal_fds, 0)
    754         self.assertTrue(self.ssock.close.called)
    755         self.assertTrue(self.csock.close.called)
    756         self.assertIsNone(self.loop._ssock)
    757         self.assertIsNone(self.loop._csock)
    758 
    759         # Don't call close(): _close_self_pipe() cannot be called twice
    760         self.loop._closed = True
    761 
    762     def test_close(self):
    763         self.loop._close_self_pipe = mock.Mock()
    764         self.loop.close()
    765         self.assertTrue(self.loop._close_self_pipe.called)
    766         self.assertTrue(self.proactor.close.called)
    767         self.assertIsNone(self.loop._proactor)
    768 
    769         self.loop._close_self_pipe.reset_mock()
    770         self.loop.close()
    771         self.assertFalse(self.loop._close_self_pipe.called)
    772 
    773     def test_make_socket_transport(self):
    774         tr = self.loop._make_socket_transport(self.sock, asyncio.Protocol())
    775         self.assertIsInstance(tr, _ProactorSocketTransport)
    776         close_transport(tr)
    777 
    778     def test_loop_self_reading(self):
    779         self.loop._loop_self_reading()
    780         self.proactor.recv.assert_called_with(self.ssock, 4096)
    781         self.proactor.recv.return_value.add_done_callback.assert_called_with(
    782             self.loop._loop_self_reading)
    783 
    784     def test_loop_self_reading_fut(self):
    785         fut = mock.Mock()
    786         self.loop._loop_self_reading(fut)
    787         self.assertTrue(fut.result.called)
    788         self.proactor.recv.assert_called_with(self.ssock, 4096)
    789         self.proactor.recv.return_value.add_done_callback.assert_called_with(
    790             self.loop._loop_self_reading)
    791 
    792     def test_loop_self_reading_exception(self):
    793         self.loop.call_exception_handler = mock.Mock()
    794         self.proactor.recv.side_effect = OSError()
    795         self.loop._loop_self_reading()
    796         self.assertTrue(self.loop.call_exception_handler.called)
    797 
    798     def test_write_to_self(self):
    799         self.loop._write_to_self()
    800         self.csock.send.assert_called_with(b'\0')
    801 
    802     def test_process_events(self):
    803         self.loop._process_events([])
    804 
    805     @mock.patch('asyncio.base_events.logger')
    806     def test_create_server(self, m_log):
    807         pf = mock.Mock()
    808         call_soon = self.loop.call_soon = mock.Mock()
    809 
    810         self.loop._start_serving(pf, self.sock)
    811         self.assertTrue(call_soon.called)
    812 
    813         # callback
    814         loop = call_soon.call_args[0][0]
    815         loop()
    816         self.proactor.accept.assert_called_with(self.sock)
    817 
    818         # conn
    819         fut = mock.Mock()
    820         fut.result.return_value = (mock.Mock(), mock.Mock())
    821 
    822         make_tr = self.loop._make_socket_transport = mock.Mock()
    823         loop(fut)
    824         self.assertTrue(fut.result.called)
    825         self.assertTrue(make_tr.called)
    826 
    827         # exception
    828         fut.result.side_effect = OSError()
    829         loop(fut)
    830         self.assertTrue(self.sock.close.called)
    831         self.assertTrue(m_log.error.called)
    832 
    833     def test_create_server_cancel(self):
    834         pf = mock.Mock()
    835         call_soon = self.loop.call_soon = mock.Mock()
    836 
    837         self.loop._start_serving(pf, self.sock)
    838         loop = call_soon.call_args[0][0]
    839 
    840         # cancelled
    841         fut = asyncio.Future(loop=self.loop)
    842         fut.cancel()
    843         loop(fut)
    844         self.assertTrue(self.sock.close.called)
    845 
    846     def test_stop_serving(self):
    847         sock1 = mock.Mock()
    848         future1 = mock.Mock()
    849         sock2 = mock.Mock()
    850         future2 = mock.Mock()
    851         self.loop._accept_futures = {
    852             sock1.fileno(): future1,
    853             sock2.fileno(): future2
    854         }
    855 
    856         self.loop._stop_serving(sock1)
    857         self.assertTrue(sock1.close.called)
    858         self.assertTrue(future1.cancel.called)
    859         self.proactor._stop_serving.assert_called_with(sock1)
    860         self.assertFalse(sock2.close.called)
    861         self.assertFalse(future2.cancel.called)
    862 
    863 
    864 @unittest.skipIf(sys.platform != 'win32',
    865                  'Proactor is supported on Windows only')
    866 class ProactorEventLoopUnixSockSendfileTests(test_utils.TestCase):
    867     DATA = b"12345abcde" * 16 * 1024  # 160 KiB
    868 
    869     class MyProto(asyncio.Protocol):
    870 
    871         def __init__(self, loop):
    872             self.started = False
    873             self.closed = False
    874             self.data = bytearray()
    875             self.fut = loop.create_future()
    876             self.transport = None
    877 
    878         def connection_made(self, transport):
    879             self.started = True
    880             self.transport = transport
    881 
    882         def data_received(self, data):
    883             self.data.extend(data)
    884 
    885         def connection_lost(self, exc):
    886             self.closed = True
    887             self.fut.set_result(None)
    888 
    889         async def wait_closed(self):
    890             await self.fut
    891 
    892     @classmethod
    893     def setUpClass(cls):
    894         with open(support.TESTFN, 'wb') as fp:
    895             fp.write(cls.DATA)
    896         super().setUpClass()
    897 
    898     @classmethod
    899     def tearDownClass(cls):
    900         support.unlink(support.TESTFN)
    901         super().tearDownClass()
    902 
    903     def setUp(self):
    904         self.loop = asyncio.ProactorEventLoop()
    905         self.set_event_loop(self.loop)
    906         self.addCleanup(self.loop.close)
    907         self.file = open(support.TESTFN, 'rb')
    908         self.addCleanup(self.file.close)
    909         super().setUp()
    910 
    911     def make_socket(self, cleanup=True):
    912         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    913         sock.setblocking(False)
    914         sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
    915         sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024)
    916         if cleanup:
    917             self.addCleanup(sock.close)
    918         return sock
    919 
    920     def run_loop(self, coro):
    921         return self.loop.run_until_complete(coro)
    922 
    923     def prepare(self):
    924         sock = self.make_socket()
    925         proto = self.MyProto(self.loop)
    926         port = support.find_unused_port()
    927         srv_sock = self.make_socket(cleanup=False)
    928         srv_sock.bind(('127.0.0.1', port))
    929         server = self.run_loop(self.loop.create_server(
    930             lambda: proto, sock=srv_sock))
    931         self.run_loop(self.loop.sock_connect(sock, srv_sock.getsockname()))
    932 
    933         def cleanup():
    934             if proto.transport is not None:
    935                 # can be None if the task was cancelled before
    936                 # connection_made callback
    937                 proto.transport.close()
    938                 self.run_loop(proto.wait_closed())
    939 
    940             server.close()
    941             self.run_loop(server.wait_closed())
    942 
    943         self.addCleanup(cleanup)
    944 
    945         return sock, proto
    946 
    947     def test_sock_sendfile_not_a_file(self):
    948         sock, proto = self.prepare()
    949         f = object()
    950         with self.assertRaisesRegex(events.SendfileNotAvailableError,
    951                                     "not a regular file"):
    952             self.run_loop(self.loop._sock_sendfile_native(sock, f,
    953                                                           0, None))
    954         self.assertEqual(self.file.tell(), 0)
    955 
    956     def test_sock_sendfile_iobuffer(self):
    957         sock, proto = self.prepare()
    958         f = io.BytesIO()
    959         with self.assertRaisesRegex(events.SendfileNotAvailableError,
    960                                     "not a regular file"):
    961             self.run_loop(self.loop._sock_sendfile_native(sock, f,
    962                                                           0, None))
    963         self.assertEqual(self.file.tell(), 0)
    964 
    965     def test_sock_sendfile_not_regular_file(self):
    966         sock, proto = self.prepare()
    967         f = mock.Mock()
    968         f.fileno.return_value = -1
    969         with self.assertRaisesRegex(events.SendfileNotAvailableError,
    970                                     "not a regular file"):
    971             self.run_loop(self.loop._sock_sendfile_native(sock, f,
    972                                                           0, None))
    973         self.assertEqual(self.file.tell(), 0)
    974 
    975 
    976 if __name__ == '__main__':
    977     unittest.main()
    978