Home | History | Annotate | Download | only in test_asyncio
      1 """Tests for unix_events.py."""
      2 
      3 import collections
      4 import errno
      5 import io
      6 import os
      7 import pathlib
      8 import signal
      9 import socket
     10 import stat
     11 import sys
     12 import tempfile
     13 import threading
     14 import unittest
     15 import warnings
     16 from unittest import mock
     17 
     18 if sys.platform == 'win32':
     19     raise unittest.SkipTest('UNIX only')
     20 
     21 
     22 import asyncio
     23 from asyncio import log
     24 from asyncio import test_utils
     25 from asyncio import unix_events
     26 
     27 
     28 MOCK_ANY = mock.ANY
     29 
     30 
     31 def close_pipe_transport(transport):
     32     # Don't call transport.close() because the event loop and the selector
     33     # are mocked
     34     if transport._pipe is None:
     35         return
     36     transport._pipe.close()
     37     transport._pipe = None
     38 
     39 
     40 @unittest.skipUnless(signal, 'Signals are not supported')
     41 class SelectorEventLoopSignalTests(test_utils.TestCase):
     42 
     43     def setUp(self):
     44         super().setUp()
     45         self.loop = asyncio.SelectorEventLoop()
     46         self.set_event_loop(self.loop)
     47 
     48     def test_check_signal(self):
     49         self.assertRaises(
     50             TypeError, self.loop._check_signal, '1')
     51         self.assertRaises(
     52             ValueError, self.loop._check_signal, signal.NSIG + 1)
     53 
     54     def test_handle_signal_no_handler(self):
     55         self.loop._handle_signal(signal.NSIG + 1)
     56 
     57     def test_handle_signal_cancelled_handler(self):
     58         h = asyncio.Handle(mock.Mock(), (),
     59                            loop=mock.Mock())
     60         h.cancel()
     61         self.loop._signal_handlers[signal.NSIG + 1] = h
     62         self.loop.remove_signal_handler = mock.Mock()
     63         self.loop._handle_signal(signal.NSIG + 1)
     64         self.loop.remove_signal_handler.assert_called_with(signal.NSIG + 1)
     65 
     66     @mock.patch('asyncio.unix_events.signal')
     67     def test_add_signal_handler_setup_error(self, m_signal):
     68         m_signal.NSIG = signal.NSIG
     69         m_signal.set_wakeup_fd.side_effect = ValueError
     70 
     71         self.assertRaises(
     72             RuntimeError,
     73             self.loop.add_signal_handler,
     74             signal.SIGINT, lambda: True)
     75 
     76     @mock.patch('asyncio.unix_events.signal')
     77     def test_add_signal_handler_coroutine_error(self, m_signal):
     78         m_signal.NSIG = signal.NSIG
     79 
     80         @asyncio.coroutine
     81         def simple_coroutine():
     82             yield from []
     83 
     84         # callback must not be a coroutine function
     85         coro_func = simple_coroutine
     86         coro_obj = coro_func()
     87         self.addCleanup(coro_obj.close)
     88         for func in (coro_func, coro_obj):
     89             self.assertRaisesRegex(
     90                 TypeError, 'coroutines cannot be used with add_signal_handler',
     91                 self.loop.add_signal_handler,
     92                 signal.SIGINT, func)
     93 
     94     @mock.patch('asyncio.unix_events.signal')
     95     def test_add_signal_handler(self, m_signal):
     96         m_signal.NSIG = signal.NSIG
     97 
     98         cb = lambda: True
     99         self.loop.add_signal_handler(signal.SIGHUP, cb)
    100         h = self.loop._signal_handlers.get(signal.SIGHUP)
    101         self.assertIsInstance(h, asyncio.Handle)
    102         self.assertEqual(h._callback, cb)
    103 
    104     @mock.patch('asyncio.unix_events.signal')
    105     def test_add_signal_handler_install_error(self, m_signal):
    106         m_signal.NSIG = signal.NSIG
    107 
    108         def set_wakeup_fd(fd):
    109             if fd == -1:
    110                 raise ValueError()
    111         m_signal.set_wakeup_fd = set_wakeup_fd
    112 
    113         class Err(OSError):
    114             errno = errno.EFAULT
    115         m_signal.signal.side_effect = Err
    116 
    117         self.assertRaises(
    118             Err,
    119             self.loop.add_signal_handler,
    120             signal.SIGINT, lambda: True)
    121 
    122     @mock.patch('asyncio.unix_events.signal')
    123     @mock.patch('asyncio.base_events.logger')
    124     def test_add_signal_handler_install_error2(self, m_logging, m_signal):
    125         m_signal.NSIG = signal.NSIG
    126 
    127         class Err(OSError):
    128             errno = errno.EINVAL
    129         m_signal.signal.side_effect = Err
    130 
    131         self.loop._signal_handlers[signal.SIGHUP] = lambda: True
    132         self.assertRaises(
    133             RuntimeError,
    134             self.loop.add_signal_handler,
    135             signal.SIGINT, lambda: True)
    136         self.assertFalse(m_logging.info.called)
    137         self.assertEqual(1, m_signal.set_wakeup_fd.call_count)
    138 
    139     @mock.patch('asyncio.unix_events.signal')
    140     @mock.patch('asyncio.base_events.logger')
    141     def test_add_signal_handler_install_error3(self, m_logging, m_signal):
    142         class Err(OSError):
    143             errno = errno.EINVAL
    144         m_signal.signal.side_effect = Err
    145         m_signal.NSIG = signal.NSIG
    146 
    147         self.assertRaises(
    148             RuntimeError,
    149             self.loop.add_signal_handler,
    150             signal.SIGINT, lambda: True)
    151         self.assertFalse(m_logging.info.called)
    152         self.assertEqual(2, m_signal.set_wakeup_fd.call_count)
    153 
    154     @mock.patch('asyncio.unix_events.signal')
    155     def test_remove_signal_handler(self, m_signal):
    156         m_signal.NSIG = signal.NSIG
    157 
    158         self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
    159 
    160         self.assertTrue(
    161             self.loop.remove_signal_handler(signal.SIGHUP))
    162         self.assertTrue(m_signal.set_wakeup_fd.called)
    163         self.assertTrue(m_signal.signal.called)
    164         self.assertEqual(
    165             (signal.SIGHUP, m_signal.SIG_DFL), m_signal.signal.call_args[0])
    166 
    167     @mock.patch('asyncio.unix_events.signal')
    168     def test_remove_signal_handler_2(self, m_signal):
    169         m_signal.NSIG = signal.NSIG
    170         m_signal.SIGINT = signal.SIGINT
    171 
    172         self.loop.add_signal_handler(signal.SIGINT, lambda: True)
    173         self.loop._signal_handlers[signal.SIGHUP] = object()
    174         m_signal.set_wakeup_fd.reset_mock()
    175 
    176         self.assertTrue(
    177             self.loop.remove_signal_handler(signal.SIGINT))
    178         self.assertFalse(m_signal.set_wakeup_fd.called)
    179         self.assertTrue(m_signal.signal.called)
    180         self.assertEqual(
    181             (signal.SIGINT, m_signal.default_int_handler),
    182             m_signal.signal.call_args[0])
    183 
    184     @mock.patch('asyncio.unix_events.signal')
    185     @mock.patch('asyncio.base_events.logger')
    186     def test_remove_signal_handler_cleanup_error(self, m_logging, m_signal):
    187         m_signal.NSIG = signal.NSIG
    188         self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
    189 
    190         m_signal.set_wakeup_fd.side_effect = ValueError
    191 
    192         self.loop.remove_signal_handler(signal.SIGHUP)
    193         self.assertTrue(m_logging.info)
    194 
    195     @mock.patch('asyncio.unix_events.signal')
    196     def test_remove_signal_handler_error(self, m_signal):
    197         m_signal.NSIG = signal.NSIG
    198         self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
    199 
    200         m_signal.signal.side_effect = OSError
    201 
    202         self.assertRaises(
    203             OSError, self.loop.remove_signal_handler, signal.SIGHUP)
    204 
    205     @mock.patch('asyncio.unix_events.signal')
    206     def test_remove_signal_handler_error2(self, m_signal):
    207         m_signal.NSIG = signal.NSIG
    208         self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
    209 
    210         class Err(OSError):
    211             errno = errno.EINVAL
    212         m_signal.signal.side_effect = Err
    213 
    214         self.assertRaises(
    215             RuntimeError, self.loop.remove_signal_handler, signal.SIGHUP)
    216 
    217     @mock.patch('asyncio.unix_events.signal')
    218     def test_close(self, m_signal):
    219         m_signal.NSIG = signal.NSIG
    220 
    221         self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
    222         self.loop.add_signal_handler(signal.SIGCHLD, lambda: True)
    223 
    224         self.assertEqual(len(self.loop._signal_handlers), 2)
    225 
    226         m_signal.set_wakeup_fd.reset_mock()
    227 
    228         self.loop.close()
    229 
    230         self.assertEqual(len(self.loop._signal_handlers), 0)
    231         m_signal.set_wakeup_fd.assert_called_once_with(-1)
    232 
    233 
    234 @unittest.skipUnless(hasattr(socket, 'AF_UNIX'),
    235                      'UNIX Sockets are not supported')
    236 class SelectorEventLoopUnixSocketTests(test_utils.TestCase):
    237 
    238     def setUp(self):
    239         super().setUp()
    240         self.loop = asyncio.SelectorEventLoop()
    241         self.set_event_loop(self.loop)
    242 
    243     def test_create_unix_server_existing_path_sock(self):
    244         with test_utils.unix_socket_path() as path:
    245             sock = socket.socket(socket.AF_UNIX)
    246             sock.bind(path)
    247             sock.listen(1)
    248             sock.close()
    249 
    250             coro = self.loop.create_unix_server(lambda: None, path)
    251             srv = self.loop.run_until_complete(coro)
    252             srv.close()
    253             self.loop.run_until_complete(srv.wait_closed())
    254 
    255     @unittest.skipUnless(hasattr(os, 'fspath'), 'no os.fspath')
    256     def test_create_unix_server_pathlib(self):
    257         with test_utils.unix_socket_path() as path:
    258             path = pathlib.Path(path)
    259             srv_coro = self.loop.create_unix_server(lambda: None, path)
    260             srv = self.loop.run_until_complete(srv_coro)
    261             srv.close()
    262             self.loop.run_until_complete(srv.wait_closed())
    263 
    264     def test_create_unix_server_existing_path_nonsock(self):
    265         with tempfile.NamedTemporaryFile() as file:
    266             coro = self.loop.create_unix_server(lambda: None, file.name)
    267             with self.assertRaisesRegex(OSError,
    268                                         'Address.*is already in use'):
    269                 self.loop.run_until_complete(coro)
    270 
    271     def test_create_unix_server_ssl_bool(self):
    272         coro = self.loop.create_unix_server(lambda: None, path='spam',
    273                                             ssl=True)
    274         with self.assertRaisesRegex(TypeError,
    275                                     'ssl argument must be an SSLContext'):
    276             self.loop.run_until_complete(coro)
    277 
    278     def test_create_unix_server_nopath_nosock(self):
    279         coro = self.loop.create_unix_server(lambda: None, path=None)
    280         with self.assertRaisesRegex(ValueError,
    281                                     'path was not specified, and no sock'):
    282             self.loop.run_until_complete(coro)
    283 
    284     def test_create_unix_server_path_inetsock(self):
    285         sock = socket.socket()
    286         with sock:
    287             coro = self.loop.create_unix_server(lambda: None, path=None,
    288                                                 sock=sock)
    289             with self.assertRaisesRegex(ValueError,
    290                                         'A UNIX Domain Stream.*was expected'):
    291                 self.loop.run_until_complete(coro)
    292 
    293     def test_create_unix_server_path_dgram(self):
    294         sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    295         with sock:
    296             coro = self.loop.create_unix_server(lambda: None, path=None,
    297                                                 sock=sock)
    298             with self.assertRaisesRegex(ValueError,
    299                                         'A UNIX Domain Stream.*was expected'):
    300                 self.loop.run_until_complete(coro)
    301 
    302     @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
    303                          'no socket.SOCK_NONBLOCK (linux only)')
    304     def test_create_unix_server_path_stream_bittype(self):
    305         sock = socket.socket(
    306             socket.AF_UNIX, socket.SOCK_STREAM | socket.SOCK_NONBLOCK)
    307         with tempfile.NamedTemporaryFile() as file:
    308             fn = file.name
    309         try:
    310             with sock:
    311                 sock.bind(fn)
    312                 coro = self.loop.create_unix_server(lambda: None, path=None,
    313                                                     sock=sock)
    314                 srv = self.loop.run_until_complete(coro)
    315                 srv.close()
    316                 self.loop.run_until_complete(srv.wait_closed())
    317         finally:
    318             os.unlink(fn)
    319 
    320     def test_create_unix_connection_path_inetsock(self):
    321         sock = socket.socket()
    322         with sock:
    323             coro = self.loop.create_unix_connection(lambda: None, path=None,
    324                                                     sock=sock)
    325             with self.assertRaisesRegex(ValueError,
    326                                         'A UNIX Domain Stream.*was expected'):
    327                 self.loop.run_until_complete(coro)
    328 
    329     @mock.patch('asyncio.unix_events.socket')
    330     def test_create_unix_server_bind_error(self, m_socket):
    331         # Ensure that the socket is closed on any bind error
    332         sock = mock.Mock()
    333         m_socket.socket.return_value = sock
    334 
    335         sock.bind.side_effect = OSError
    336         coro = self.loop.create_unix_server(lambda: None, path="/test")
    337         with self.assertRaises(OSError):
    338             self.loop.run_until_complete(coro)
    339         self.assertTrue(sock.close.called)
    340 
    341         sock.bind.side_effect = MemoryError
    342         coro = self.loop.create_unix_server(lambda: None, path="/test")
    343         with self.assertRaises(MemoryError):
    344             self.loop.run_until_complete(coro)
    345         self.assertTrue(sock.close.called)
    346 
    347     def test_create_unix_connection_path_sock(self):
    348         coro = self.loop.create_unix_connection(
    349             lambda: None, os.devnull, sock=object())
    350         with self.assertRaisesRegex(ValueError, 'path and sock can not be'):
    351             self.loop.run_until_complete(coro)
    352 
    353     def test_create_unix_connection_nopath_nosock(self):
    354         coro = self.loop.create_unix_connection(
    355             lambda: None, None)
    356         with self.assertRaisesRegex(ValueError,
    357                                     'no path and sock were specified'):
    358             self.loop.run_until_complete(coro)
    359 
    360     def test_create_unix_connection_nossl_serverhost(self):
    361         coro = self.loop.create_unix_connection(
    362             lambda: None, os.devnull, server_hostname='spam')
    363         with self.assertRaisesRegex(ValueError,
    364                                     'server_hostname is only meaningful'):
    365             self.loop.run_until_complete(coro)
    366 
    367     def test_create_unix_connection_ssl_noserverhost(self):
    368         coro = self.loop.create_unix_connection(
    369             lambda: None, os.devnull, ssl=True)
    370 
    371         with self.assertRaisesRegex(
    372             ValueError, 'you have to pass server_hostname when using ssl'):
    373 
    374             self.loop.run_until_complete(coro)
    375 
    376 
    377 class UnixReadPipeTransportTests(test_utils.TestCase):
    378 
    379     def setUp(self):
    380         super().setUp()
    381         self.loop = self.new_test_loop()
    382         self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
    383         self.pipe = mock.Mock(spec_set=io.RawIOBase)
    384         self.pipe.fileno.return_value = 5
    385 
    386         blocking_patcher = mock.patch('asyncio.unix_events._set_nonblocking')
    387         blocking_patcher.start()
    388         self.addCleanup(blocking_patcher.stop)
    389 
    390         fstat_patcher = mock.patch('os.fstat')
    391         m_fstat = fstat_patcher.start()
    392         st = mock.Mock()
    393         st.st_mode = stat.S_IFIFO
    394         m_fstat.return_value = st
    395         self.addCleanup(fstat_patcher.stop)
    396 
    397     def read_pipe_transport(self, waiter=None):
    398         transport = unix_events._UnixReadPipeTransport(self.loop, self.pipe,
    399                                                        self.protocol,
    400                                                        waiter=waiter)
    401         self.addCleanup(close_pipe_transport, transport)
    402         return transport
    403 
    404     def test_ctor(self):
    405         waiter = asyncio.Future(loop=self.loop)
    406         tr = self.read_pipe_transport(waiter=waiter)
    407         self.loop.run_until_complete(waiter)
    408 
    409         self.protocol.connection_made.assert_called_with(tr)
    410         self.loop.assert_reader(5, tr._read_ready)
    411         self.assertIsNone(waiter.result())
    412 
    413     @mock.patch('os.read')
    414     def test__read_ready(self, m_read):
    415         tr = self.read_pipe_transport()
    416         m_read.return_value = b'data'
    417         tr._read_ready()
    418 
    419         m_read.assert_called_with(5, tr.max_size)
    420         self.protocol.data_received.assert_called_with(b'data')
    421 
    422     @mock.patch('os.read')
    423     def test__read_ready_eof(self, m_read):
    424         tr = self.read_pipe_transport()
    425         m_read.return_value = b''
    426         tr._read_ready()
    427 
    428         m_read.assert_called_with(5, tr.max_size)
    429         self.assertFalse(self.loop.readers)
    430         test_utils.run_briefly(self.loop)
    431         self.protocol.eof_received.assert_called_with()
    432         self.protocol.connection_lost.assert_called_with(None)
    433 
    434     @mock.patch('os.read')
    435     def test__read_ready_blocked(self, m_read):
    436         tr = self.read_pipe_transport()
    437         m_read.side_effect = BlockingIOError
    438         tr._read_ready()
    439 
    440         m_read.assert_called_with(5, tr.max_size)
    441         test_utils.run_briefly(self.loop)
    442         self.assertFalse(self.protocol.data_received.called)
    443 
    444     @mock.patch('asyncio.log.logger.error')
    445     @mock.patch('os.read')
    446     def test__read_ready_error(self, m_read, m_logexc):
    447         tr = self.read_pipe_transport()
    448         err = OSError()
    449         m_read.side_effect = err
    450         tr._close = mock.Mock()
    451         tr._read_ready()
    452 
    453         m_read.assert_called_with(5, tr.max_size)
    454         tr._close.assert_called_with(err)
    455         m_logexc.assert_called_with(
    456             test_utils.MockPattern(
    457                 'Fatal read error on pipe transport'
    458                 '\nprotocol:.*\ntransport:.*'),
    459             exc_info=(OSError, MOCK_ANY, MOCK_ANY))
    460 
    461     @mock.patch('os.read')
    462     def test_pause_reading(self, m_read):
    463         tr = self.read_pipe_transport()
    464         m = mock.Mock()
    465         self.loop.add_reader(5, m)
    466         tr.pause_reading()
    467         self.assertFalse(self.loop.readers)
    468 
    469     @mock.patch('os.read')
    470     def test_resume_reading(self, m_read):
    471         tr = self.read_pipe_transport()
    472         tr.resume_reading()
    473         self.loop.assert_reader(5, tr._read_ready)
    474 
    475     @mock.patch('os.read')
    476     def test_close(self, m_read):
    477         tr = self.read_pipe_transport()
    478         tr._close = mock.Mock()
    479         tr.close()
    480         tr._close.assert_called_with(None)
    481 
    482     @mock.patch('os.read')
    483     def test_close_already_closing(self, m_read):
    484         tr = self.read_pipe_transport()
    485         tr._closing = True
    486         tr._close = mock.Mock()
    487         tr.close()
    488         self.assertFalse(tr._close.called)
    489 
    490     @mock.patch('os.read')
    491     def test__close(self, m_read):
    492         tr = self.read_pipe_transport()
    493         err = object()
    494         tr._close(err)
    495         self.assertTrue(tr.is_closing())
    496         self.assertFalse(self.loop.readers)
    497         test_utils.run_briefly(self.loop)
    498         self.protocol.connection_lost.assert_called_with(err)
    499 
    500     def test__call_connection_lost(self):
    501         tr = self.read_pipe_transport()
    502         self.assertIsNotNone(tr._protocol)
    503         self.assertIsNotNone(tr._loop)
    504 
    505         err = None
    506         tr._call_connection_lost(err)
    507         self.protocol.connection_lost.assert_called_with(err)
    508         self.pipe.close.assert_called_with()
    509 
    510         self.assertIsNone(tr._protocol)
    511         self.assertIsNone(tr._loop)
    512 
    513     def test__call_connection_lost_with_err(self):
    514         tr = self.read_pipe_transport()
    515         self.assertIsNotNone(tr._protocol)
    516         self.assertIsNotNone(tr._loop)
    517 
    518         err = OSError()
    519         tr._call_connection_lost(err)
    520         self.protocol.connection_lost.assert_called_with(err)
    521         self.pipe.close.assert_called_with()
    522 
    523         self.assertIsNone(tr._protocol)
    524         self.assertIsNone(tr._loop)
    525 
    526 
    527 class UnixWritePipeTransportTests(test_utils.TestCase):
    528 
    529     def setUp(self):
    530         super().setUp()
    531         self.loop = self.new_test_loop()
    532         self.protocol = test_utils.make_test_protocol(asyncio.BaseProtocol)
    533         self.pipe = mock.Mock(spec_set=io.RawIOBase)
    534         self.pipe.fileno.return_value = 5
    535 
    536         blocking_patcher = mock.patch('asyncio.unix_events._set_nonblocking')
    537         blocking_patcher.start()
    538         self.addCleanup(blocking_patcher.stop)
    539 
    540         fstat_patcher = mock.patch('os.fstat')
    541         m_fstat = fstat_patcher.start()
    542         st = mock.Mock()
    543         st.st_mode = stat.S_IFSOCK
    544         m_fstat.return_value = st
    545         self.addCleanup(fstat_patcher.stop)
    546 
    547     def write_pipe_transport(self, waiter=None):
    548         transport = unix_events._UnixWritePipeTransport(self.loop, self.pipe,
    549                                                         self.protocol,
    550                                                         waiter=waiter)
    551         self.addCleanup(close_pipe_transport, transport)
    552         return transport
    553 
    554     def test_ctor(self):
    555         waiter = asyncio.Future(loop=self.loop)
    556         tr = self.write_pipe_transport(waiter=waiter)
    557         self.loop.run_until_complete(waiter)
    558 
    559         self.protocol.connection_made.assert_called_with(tr)
    560         self.loop.assert_reader(5, tr._read_ready)
    561         self.assertEqual(None, waiter.result())
    562 
    563     def test_can_write_eof(self):
    564         tr = self.write_pipe_transport()
    565         self.assertTrue(tr.can_write_eof())
    566 
    567     @mock.patch('os.write')
    568     def test_write(self, m_write):
    569         tr = self.write_pipe_transport()
    570         m_write.return_value = 4
    571         tr.write(b'data')
    572         m_write.assert_called_with(5, b'data')
    573         self.assertFalse(self.loop.writers)
    574         self.assertEqual(bytearray(), tr._buffer)
    575 
    576     @mock.patch('os.write')
    577     def test_write_no_data(self, m_write):
    578         tr = self.write_pipe_transport()
    579         tr.write(b'')
    580         self.assertFalse(m_write.called)
    581         self.assertFalse(self.loop.writers)
    582         self.assertEqual(bytearray(b''), tr._buffer)
    583 
    584     @mock.patch('os.write')
    585     def test_write_partial(self, m_write):
    586         tr = self.write_pipe_transport()
    587         m_write.return_value = 2
    588         tr.write(b'data')
    589         self.loop.assert_writer(5, tr._write_ready)
    590         self.assertEqual(bytearray(b'ta'), tr._buffer)
    591 
    592     @mock.patch('os.write')
    593     def test_write_buffer(self, m_write):
    594         tr = self.write_pipe_transport()
    595         self.loop.add_writer(5, tr._write_ready)
    596         tr._buffer = bytearray(b'previous')
    597         tr.write(b'data')
    598         self.assertFalse(m_write.called)
    599         self.loop.assert_writer(5, tr._write_ready)
    600         self.assertEqual(bytearray(b'previousdata'), tr._buffer)
    601 
    602     @mock.patch('os.write')
    603     def test_write_again(self, m_write):
    604         tr = self.write_pipe_transport()
    605         m_write.side_effect = BlockingIOError()
    606         tr.write(b'data')
    607         m_write.assert_called_with(5, bytearray(b'data'))
    608         self.loop.assert_writer(5, tr._write_ready)
    609         self.assertEqual(bytearray(b'data'), tr._buffer)
    610 
    611     @mock.patch('asyncio.unix_events.logger')
    612     @mock.patch('os.write')
    613     def test_write_err(self, m_write, m_log):
    614         tr = self.write_pipe_transport()
    615         err = OSError()
    616         m_write.side_effect = err
    617         tr._fatal_error = mock.Mock()
    618         tr.write(b'data')
    619         m_write.assert_called_with(5, b'data')
    620         self.assertFalse(self.loop.writers)
    621         self.assertEqual(bytearray(), tr._buffer)
    622         tr._fatal_error.assert_called_with(
    623                             err,
    624                             'Fatal write error on pipe transport')
    625         self.assertEqual(1, tr._conn_lost)
    626 
    627         tr.write(b'data')
    628         self.assertEqual(2, tr._conn_lost)
    629         tr.write(b'data')
    630         tr.write(b'data')
    631         tr.write(b'data')
    632         tr.write(b'data')
    633         # This is a bit overspecified. :-(
    634         m_log.warning.assert_called_with(
    635             'pipe closed by peer or os.write(pipe, data) raised exception.')
    636         tr.close()
    637 
    638     @mock.patch('os.write')
    639     def test_write_close(self, m_write):
    640         tr = self.write_pipe_transport()
    641         tr._read_ready()  # pipe was closed by peer
    642 
    643         tr.write(b'data')
    644         self.assertEqual(tr._conn_lost, 1)
    645         tr.write(b'data')
    646         self.assertEqual(tr._conn_lost, 2)
    647 
    648     def test__read_ready(self):
    649         tr = self.write_pipe_transport()
    650         tr._read_ready()
    651         self.assertFalse(self.loop.readers)
    652         self.assertFalse(self.loop.writers)
    653         self.assertTrue(tr.is_closing())
    654         test_utils.run_briefly(self.loop)
    655         self.protocol.connection_lost.assert_called_with(None)
    656 
    657     @mock.patch('os.write')
    658     def test__write_ready(self, m_write):
    659         tr = self.write_pipe_transport()
    660         self.loop.add_writer(5, tr._write_ready)
    661         tr._buffer = bytearray(b'data')
    662         m_write.return_value = 4
    663         tr._write_ready()
    664         self.assertFalse(self.loop.writers)
    665         self.assertEqual(bytearray(), tr._buffer)
    666 
    667     @mock.patch('os.write')
    668     def test__write_ready_partial(self, m_write):
    669         tr = self.write_pipe_transport()
    670         self.loop.add_writer(5, tr._write_ready)
    671         tr._buffer = bytearray(b'data')
    672         m_write.return_value = 3
    673         tr._write_ready()
    674         self.loop.assert_writer(5, tr._write_ready)
    675         self.assertEqual(bytearray(b'a'), tr._buffer)
    676 
    677     @mock.patch('os.write')
    678     def test__write_ready_again(self, m_write):
    679         tr = self.write_pipe_transport()
    680         self.loop.add_writer(5, tr._write_ready)
    681         tr._buffer = bytearray(b'data')
    682         m_write.side_effect = BlockingIOError()
    683         tr._write_ready()
    684         m_write.assert_called_with(5, bytearray(b'data'))
    685         self.loop.assert_writer(5, tr._write_ready)
    686         self.assertEqual(bytearray(b'data'), tr._buffer)
    687 
    688     @mock.patch('os.write')
    689     def test__write_ready_empty(self, m_write):
    690         tr = self.write_pipe_transport()
    691         self.loop.add_writer(5, tr._write_ready)
    692         tr._buffer = bytearray(b'data')
    693         m_write.return_value = 0
    694         tr._write_ready()
    695         m_write.assert_called_with(5, bytearray(b'data'))
    696         self.loop.assert_writer(5, tr._write_ready)
    697         self.assertEqual(bytearray(b'data'), tr._buffer)
    698 
    699     @mock.patch('asyncio.log.logger.error')
    700     @mock.patch('os.write')
    701     def test__write_ready_err(self, m_write, m_logexc):
    702         tr = self.write_pipe_transport()
    703         self.loop.add_writer(5, tr._write_ready)
    704         tr._buffer = bytearray(b'data')
    705         m_write.side_effect = err = OSError()
    706         tr._write_ready()
    707         self.assertFalse(self.loop.writers)
    708         self.assertFalse(self.loop.readers)
    709         self.assertEqual(bytearray(), tr._buffer)
    710         self.assertTrue(tr.is_closing())
    711         m_logexc.assert_called_with(
    712             test_utils.MockPattern(
    713                 'Fatal write error on pipe transport'
    714                 '\nprotocol:.*\ntransport:.*'),
    715             exc_info=(OSError, MOCK_ANY, MOCK_ANY))
    716         self.assertEqual(1, tr._conn_lost)
    717         test_utils.run_briefly(self.loop)
    718         self.protocol.connection_lost.assert_called_with(err)
    719 
    720     @mock.patch('os.write')
    721     def test__write_ready_closing(self, m_write):
    722         tr = self.write_pipe_transport()
    723         self.loop.add_writer(5, tr._write_ready)
    724         tr._closing = True
    725         tr._buffer = bytearray(b'data')
    726         m_write.return_value = 4
    727         tr._write_ready()
    728         self.assertFalse(self.loop.writers)
    729         self.assertFalse(self.loop.readers)
    730         self.assertEqual(bytearray(), tr._buffer)
    731         self.protocol.connection_lost.assert_called_with(None)
    732         self.pipe.close.assert_called_with()
    733 
    734     @mock.patch('os.write')
    735     def test_abort(self, m_write):
    736         tr = self.write_pipe_transport()
    737         self.loop.add_writer(5, tr._write_ready)
    738         self.loop.add_reader(5, tr._read_ready)
    739         tr._buffer = [b'da', b'ta']
    740         tr.abort()
    741         self.assertFalse(m_write.called)
    742         self.assertFalse(self.loop.readers)
    743         self.assertFalse(self.loop.writers)
    744         self.assertEqual([], tr._buffer)
    745         self.assertTrue(tr.is_closing())
    746         test_utils.run_briefly(self.loop)
    747         self.protocol.connection_lost.assert_called_with(None)
    748 
    749     def test__call_connection_lost(self):
    750         tr = self.write_pipe_transport()
    751         self.assertIsNotNone(tr._protocol)
    752         self.assertIsNotNone(tr._loop)
    753 
    754         err = None
    755         tr._call_connection_lost(err)
    756         self.protocol.connection_lost.assert_called_with(err)
    757         self.pipe.close.assert_called_with()
    758 
    759         self.assertIsNone(tr._protocol)
    760         self.assertIsNone(tr._loop)
    761 
    762     def test__call_connection_lost_with_err(self):
    763         tr = self.write_pipe_transport()
    764         self.assertIsNotNone(tr._protocol)
    765         self.assertIsNotNone(tr._loop)
    766 
    767         err = OSError()
    768         tr._call_connection_lost(err)
    769         self.protocol.connection_lost.assert_called_with(err)
    770         self.pipe.close.assert_called_with()
    771 
    772         self.assertIsNone(tr._protocol)
    773         self.assertIsNone(tr._loop)
    774 
    775     def test_close(self):
    776         tr = self.write_pipe_transport()
    777         tr.write_eof = mock.Mock()
    778         tr.close()
    779         tr.write_eof.assert_called_with()
    780 
    781         # closing the transport twice must not fail
    782         tr.close()
    783 
    784     def test_close_closing(self):
    785         tr = self.write_pipe_transport()
    786         tr.write_eof = mock.Mock()
    787         tr._closing = True
    788         tr.close()
    789         self.assertFalse(tr.write_eof.called)
    790 
    791     def test_write_eof(self):
    792         tr = self.write_pipe_transport()
    793         tr.write_eof()
    794         self.assertTrue(tr.is_closing())
    795         self.assertFalse(self.loop.readers)
    796         test_utils.run_briefly(self.loop)
    797         self.protocol.connection_lost.assert_called_with(None)
    798 
    799     def test_write_eof_pending(self):
    800         tr = self.write_pipe_transport()
    801         tr._buffer = [b'data']
    802         tr.write_eof()
    803         self.assertTrue(tr.is_closing())
    804         self.assertFalse(self.protocol.connection_lost.called)
    805 
    806 
    807 class AbstractChildWatcherTests(unittest.TestCase):
    808 
    809     def test_not_implemented(self):
    810         f = mock.Mock()
    811         watcher = asyncio.AbstractChildWatcher()
    812         self.assertRaises(
    813             NotImplementedError, watcher.add_child_handler, f, f)
    814         self.assertRaises(
    815             NotImplementedError, watcher.remove_child_handler, f)
    816         self.assertRaises(
    817             NotImplementedError, watcher.attach_loop, f)
    818         self.assertRaises(
    819             NotImplementedError, watcher.close)
    820         self.assertRaises(
    821             NotImplementedError, watcher.__enter__)
    822         self.assertRaises(
    823             NotImplementedError, watcher.__exit__, f, f, f)
    824 
    825 
    826 class BaseChildWatcherTests(unittest.TestCase):
    827 
    828     def test_not_implemented(self):
    829         f = mock.Mock()
    830         watcher = unix_events.BaseChildWatcher()
    831         self.assertRaises(
    832             NotImplementedError, watcher._do_waitpid, f)
    833 
    834 
    835 WaitPidMocks = collections.namedtuple("WaitPidMocks",
    836                                       ("waitpid",
    837                                        "WIFEXITED",
    838                                        "WIFSIGNALED",
    839                                        "WEXITSTATUS",
    840                                        "WTERMSIG",
    841                                        ))
    842 
    843 
    844 class ChildWatcherTestsMixin:
    845 
    846     ignore_warnings = mock.patch.object(log.logger, "warning")
    847 
    848     def setUp(self):
    849         super().setUp()
    850         self.loop = self.new_test_loop()
    851         self.running = False
    852         self.zombies = {}
    853 
    854         with mock.patch.object(
    855                 self.loop, "add_signal_handler") as self.m_add_signal_handler:
    856             self.watcher = self.create_watcher()
    857             self.watcher.attach_loop(self.loop)
    858 
    859     def waitpid(self, pid, flags):
    860         if isinstance(self.watcher, asyncio.SafeChildWatcher) or pid != -1:
    861             self.assertGreater(pid, 0)
    862         try:
    863             if pid < 0:
    864                 return self.zombies.popitem()
    865             else:
    866                 return pid, self.zombies.pop(pid)
    867         except KeyError:
    868             pass
    869         if self.running:
    870             return 0, 0
    871         else:
    872             raise ChildProcessError()
    873 
    874     def add_zombie(self, pid, returncode):
    875         self.zombies[pid] = returncode + 32768
    876 
    877     def WIFEXITED(self, status):
    878         return status >= 32768
    879 
    880     def WIFSIGNALED(self, status):
    881         return 32700 < status < 32768
    882 
    883     def WEXITSTATUS(self, status):
    884         self.assertTrue(self.WIFEXITED(status))
    885         return status - 32768
    886 
    887     def WTERMSIG(self, status):
    888         self.assertTrue(self.WIFSIGNALED(status))
    889         return 32768 - status
    890 
    891     def test_create_watcher(self):
    892         self.m_add_signal_handler.assert_called_once_with(
    893             signal.SIGCHLD, self.watcher._sig_chld)
    894 
    895     def waitpid_mocks(func):
    896         def wrapped_func(self):
    897             def patch(target, wrapper):
    898                 return mock.patch(target, wraps=wrapper,
    899                                   new_callable=mock.Mock)
    900 
    901             with patch('os.WTERMSIG', self.WTERMSIG) as m_WTERMSIG, \
    902                  patch('os.WEXITSTATUS', self.WEXITSTATUS) as m_WEXITSTATUS, \
    903                  patch('os.WIFSIGNALED', self.WIFSIGNALED) as m_WIFSIGNALED, \
    904                  patch('os.WIFEXITED', self.WIFEXITED) as m_WIFEXITED, \
    905                  patch('os.waitpid', self.waitpid) as m_waitpid:
    906                 func(self, WaitPidMocks(m_waitpid,
    907                                         m_WIFEXITED, m_WIFSIGNALED,
    908                                         m_WEXITSTATUS, m_WTERMSIG,
    909                                         ))
    910         return wrapped_func
    911 
    912     @waitpid_mocks
    913     def test_sigchld(self, m):
    914         # register a child
    915         callback = mock.Mock()
    916 
    917         with self.watcher:
    918             self.running = True
    919             self.watcher.add_child_handler(42, callback, 9, 10, 14)
    920 
    921         self.assertFalse(callback.called)
    922         self.assertFalse(m.WIFEXITED.called)
    923         self.assertFalse(m.WIFSIGNALED.called)
    924         self.assertFalse(m.WEXITSTATUS.called)
    925         self.assertFalse(m.WTERMSIG.called)
    926 
    927         # child is running
    928         self.watcher._sig_chld()
    929 
    930         self.assertFalse(callback.called)
    931         self.assertFalse(m.WIFEXITED.called)
    932         self.assertFalse(m.WIFSIGNALED.called)
    933         self.assertFalse(m.WEXITSTATUS.called)
    934         self.assertFalse(m.WTERMSIG.called)
    935 
    936         # child terminates (returncode 12)
    937         self.running = False
    938         self.add_zombie(42, 12)
    939         self.watcher._sig_chld()
    940 
    941         self.assertTrue(m.WIFEXITED.called)
    942         self.assertTrue(m.WEXITSTATUS.called)
    943         self.assertFalse(m.WTERMSIG.called)
    944         callback.assert_called_once_with(42, 12, 9, 10, 14)
    945 
    946         m.WIFSIGNALED.reset_mock()
    947         m.WIFEXITED.reset_mock()
    948         m.WEXITSTATUS.reset_mock()
    949         callback.reset_mock()
    950 
    951         # ensure that the child is effectively reaped
    952         self.add_zombie(42, 13)
    953         with self.ignore_warnings:
    954             self.watcher._sig_chld()
    955 
    956         self.assertFalse(callback.called)
    957         self.assertFalse(m.WTERMSIG.called)
    958 
    959         m.WIFSIGNALED.reset_mock()
    960         m.WIFEXITED.reset_mock()
    961         m.WEXITSTATUS.reset_mock()
    962 
    963         # sigchld called again
    964         self.zombies.clear()
    965         self.watcher._sig_chld()
    966 
    967         self.assertFalse(callback.called)
    968         self.assertFalse(m.WIFEXITED.called)
    969         self.assertFalse(m.WIFSIGNALED.called)
    970         self.assertFalse(m.WEXITSTATUS.called)
    971         self.assertFalse(m.WTERMSIG.called)
    972 
    973     @waitpid_mocks
    974     def test_sigchld_two_children(self, m):
    975         callback1 = mock.Mock()
    976         callback2 = mock.Mock()
    977 
    978         # register child 1
    979         with self.watcher:
    980             self.running = True
    981             self.watcher.add_child_handler(43, callback1, 7, 8)
    982 
    983         self.assertFalse(callback1.called)
    984         self.assertFalse(callback2.called)
    985         self.assertFalse(m.WIFEXITED.called)
    986         self.assertFalse(m.WIFSIGNALED.called)
    987         self.assertFalse(m.WEXITSTATUS.called)
    988         self.assertFalse(m.WTERMSIG.called)
    989 
    990         # register child 2
    991         with self.watcher:
    992             self.watcher.add_child_handler(44, callback2, 147, 18)
    993 
    994         self.assertFalse(callback1.called)
    995         self.assertFalse(callback2.called)
    996         self.assertFalse(m.WIFEXITED.called)
    997         self.assertFalse(m.WIFSIGNALED.called)
    998         self.assertFalse(m.WEXITSTATUS.called)
    999         self.assertFalse(m.WTERMSIG.called)
   1000 
   1001         # children are running
   1002         self.watcher._sig_chld()
   1003 
   1004         self.assertFalse(callback1.called)
   1005         self.assertFalse(callback2.called)
   1006         self.assertFalse(m.WIFEXITED.called)
   1007         self.assertFalse(m.WIFSIGNALED.called)
   1008         self.assertFalse(m.WEXITSTATUS.called)
   1009         self.assertFalse(m.WTERMSIG.called)
   1010 
   1011         # child 1 terminates (signal 3)
   1012         self.add_zombie(43, -3)
   1013         self.watcher._sig_chld()
   1014 
   1015         callback1.assert_called_once_with(43, -3, 7, 8)
   1016         self.assertFalse(callback2.called)
   1017         self.assertTrue(m.WIFSIGNALED.called)
   1018         self.assertFalse(m.WEXITSTATUS.called)
   1019         self.assertTrue(m.WTERMSIG.called)
   1020 
   1021         m.WIFSIGNALED.reset_mock()
   1022         m.WIFEXITED.reset_mock()
   1023         m.WTERMSIG.reset_mock()
   1024         callback1.reset_mock()
   1025 
   1026         # child 2 still running
   1027         self.watcher._sig_chld()
   1028 
   1029         self.assertFalse(callback1.called)
   1030         self.assertFalse(callback2.called)
   1031         self.assertFalse(m.WIFEXITED.called)
   1032         self.assertFalse(m.WIFSIGNALED.called)
   1033         self.assertFalse(m.WEXITSTATUS.called)
   1034         self.assertFalse(m.WTERMSIG.called)
   1035 
   1036         # child 2 terminates (code 108)
   1037         self.add_zombie(44, 108)
   1038         self.running = False
   1039         self.watcher._sig_chld()
   1040 
   1041         callback2.assert_called_once_with(44, 108, 147, 18)
   1042         self.assertFalse(callback1.called)
   1043         self.assertTrue(m.WIFEXITED.called)
   1044         self.assertTrue(m.WEXITSTATUS.called)
   1045         self.assertFalse(m.WTERMSIG.called)
   1046 
   1047         m.WIFSIGNALED.reset_mock()
   1048         m.WIFEXITED.reset_mock()
   1049         m.WEXITSTATUS.reset_mock()
   1050         callback2.reset_mock()
   1051 
   1052         # ensure that the children are effectively reaped
   1053         self.add_zombie(43, 14)
   1054         self.add_zombie(44, 15)
   1055         with self.ignore_warnings:
   1056             self.watcher._sig_chld()
   1057 
   1058         self.assertFalse(callback1.called)
   1059         self.assertFalse(callback2.called)
   1060         self.assertFalse(m.WTERMSIG.called)
   1061 
   1062         m.WIFSIGNALED.reset_mock()
   1063         m.WIFEXITED.reset_mock()
   1064         m.WEXITSTATUS.reset_mock()
   1065 
   1066         # sigchld called again
   1067         self.zombies.clear()
   1068         self.watcher._sig_chld()
   1069 
   1070         self.assertFalse(callback1.called)
   1071         self.assertFalse(callback2.called)
   1072         self.assertFalse(m.WIFEXITED.called)
   1073         self.assertFalse(m.WIFSIGNALED.called)
   1074         self.assertFalse(m.WEXITSTATUS.called)
   1075         self.assertFalse(m.WTERMSIG.called)
   1076 
   1077     @waitpid_mocks
   1078     def test_sigchld_two_children_terminating_together(self, m):
   1079         callback1 = mock.Mock()
   1080         callback2 = mock.Mock()
   1081 
   1082         # register child 1
   1083         with self.watcher:
   1084             self.running = True
   1085             self.watcher.add_child_handler(45, callback1, 17, 8)
   1086 
   1087         self.assertFalse(callback1.called)
   1088         self.assertFalse(callback2.called)
   1089         self.assertFalse(m.WIFEXITED.called)
   1090         self.assertFalse(m.WIFSIGNALED.called)
   1091         self.assertFalse(m.WEXITSTATUS.called)
   1092         self.assertFalse(m.WTERMSIG.called)
   1093 
   1094         # register child 2
   1095         with self.watcher:
   1096             self.watcher.add_child_handler(46, callback2, 1147, 18)
   1097 
   1098         self.assertFalse(callback1.called)
   1099         self.assertFalse(callback2.called)
   1100         self.assertFalse(m.WIFEXITED.called)
   1101         self.assertFalse(m.WIFSIGNALED.called)
   1102         self.assertFalse(m.WEXITSTATUS.called)
   1103         self.assertFalse(m.WTERMSIG.called)
   1104 
   1105         # children are running
   1106         self.watcher._sig_chld()
   1107 
   1108         self.assertFalse(callback1.called)
   1109         self.assertFalse(callback2.called)
   1110         self.assertFalse(m.WIFEXITED.called)
   1111         self.assertFalse(m.WIFSIGNALED.called)
   1112         self.assertFalse(m.WEXITSTATUS.called)
   1113         self.assertFalse(m.WTERMSIG.called)
   1114 
   1115         # child 1 terminates (code 78)
   1116         # child 2 terminates (signal 5)
   1117         self.add_zombie(45, 78)
   1118         self.add_zombie(46, -5)
   1119         self.running = False
   1120         self.watcher._sig_chld()
   1121 
   1122         callback1.assert_called_once_with(45, 78, 17, 8)
   1123         callback2.assert_called_once_with(46, -5, 1147, 18)
   1124         self.assertTrue(m.WIFSIGNALED.called)
   1125         self.assertTrue(m.WIFEXITED.called)
   1126         self.assertTrue(m.WEXITSTATUS.called)
   1127         self.assertTrue(m.WTERMSIG.called)
   1128 
   1129         m.WIFSIGNALED.reset_mock()
   1130         m.WIFEXITED.reset_mock()
   1131         m.WTERMSIG.reset_mock()
   1132         m.WEXITSTATUS.reset_mock()
   1133         callback1.reset_mock()
   1134         callback2.reset_mock()
   1135 
   1136         # ensure that the children are effectively reaped
   1137         self.add_zombie(45, 14)
   1138         self.add_zombie(46, 15)
   1139         with self.ignore_warnings:
   1140             self.watcher._sig_chld()
   1141 
   1142         self.assertFalse(callback1.called)
   1143         self.assertFalse(callback2.called)
   1144         self.assertFalse(m.WTERMSIG.called)
   1145 
   1146     @waitpid_mocks
   1147     def test_sigchld_race_condition(self, m):
   1148         # register a child
   1149         callback = mock.Mock()
   1150 
   1151         with self.watcher:
   1152             # child terminates before being registered
   1153             self.add_zombie(50, 4)
   1154             self.watcher._sig_chld()
   1155 
   1156             self.watcher.add_child_handler(50, callback, 1, 12)
   1157 
   1158         callback.assert_called_once_with(50, 4, 1, 12)
   1159         callback.reset_mock()
   1160 
   1161         # ensure that the child is effectively reaped
   1162         self.add_zombie(50, -1)
   1163         with self.ignore_warnings:
   1164             self.watcher._sig_chld()
   1165 
   1166         self.assertFalse(callback.called)
   1167 
   1168     @waitpid_mocks
   1169     def test_sigchld_replace_handler(self, m):
   1170         callback1 = mock.Mock()
   1171         callback2 = mock.Mock()
   1172 
   1173         # register a child
   1174         with self.watcher:
   1175             self.running = True
   1176             self.watcher.add_child_handler(51, callback1, 19)
   1177 
   1178         self.assertFalse(callback1.called)
   1179         self.assertFalse(callback2.called)
   1180         self.assertFalse(m.WIFEXITED.called)
   1181         self.assertFalse(m.WIFSIGNALED.called)
   1182         self.assertFalse(m.WEXITSTATUS.called)
   1183         self.assertFalse(m.WTERMSIG.called)
   1184 
   1185         # register the same child again
   1186         with self.watcher:
   1187             self.watcher.add_child_handler(51, callback2, 21)
   1188 
   1189         self.assertFalse(callback1.called)
   1190         self.assertFalse(callback2.called)
   1191         self.assertFalse(m.WIFEXITED.called)
   1192         self.assertFalse(m.WIFSIGNALED.called)
   1193         self.assertFalse(m.WEXITSTATUS.called)
   1194         self.assertFalse(m.WTERMSIG.called)
   1195 
   1196         # child terminates (signal 8)
   1197         self.running = False
   1198         self.add_zombie(51, -8)
   1199         self.watcher._sig_chld()
   1200 
   1201         callback2.assert_called_once_with(51, -8, 21)
   1202         self.assertFalse(callback1.called)
   1203         self.assertTrue(m.WIFSIGNALED.called)
   1204         self.assertFalse(m.WEXITSTATUS.called)
   1205         self.assertTrue(m.WTERMSIG.called)
   1206 
   1207         m.WIFSIGNALED.reset_mock()
   1208         m.WIFEXITED.reset_mock()
   1209         m.WTERMSIG.reset_mock()
   1210         callback2.reset_mock()
   1211 
   1212         # ensure that the child is effectively reaped
   1213         self.add_zombie(51, 13)
   1214         with self.ignore_warnings:
   1215             self.watcher._sig_chld()
   1216 
   1217         self.assertFalse(callback1.called)
   1218         self.assertFalse(callback2.called)
   1219         self.assertFalse(m.WTERMSIG.called)
   1220 
   1221     @waitpid_mocks
   1222     def test_sigchld_remove_handler(self, m):
   1223         callback = mock.Mock()
   1224 
   1225         # register a child
   1226         with self.watcher:
   1227             self.running = True
   1228             self.watcher.add_child_handler(52, callback, 1984)
   1229 
   1230         self.assertFalse(callback.called)
   1231         self.assertFalse(m.WIFEXITED.called)
   1232         self.assertFalse(m.WIFSIGNALED.called)
   1233         self.assertFalse(m.WEXITSTATUS.called)
   1234         self.assertFalse(m.WTERMSIG.called)
   1235 
   1236         # unregister the child
   1237         self.watcher.remove_child_handler(52)
   1238 
   1239         self.assertFalse(callback.called)
   1240         self.assertFalse(m.WIFEXITED.called)
   1241         self.assertFalse(m.WIFSIGNALED.called)
   1242         self.assertFalse(m.WEXITSTATUS.called)
   1243         self.assertFalse(m.WTERMSIG.called)
   1244 
   1245         # child terminates (code 99)
   1246         self.running = False
   1247         self.add_zombie(52, 99)
   1248         with self.ignore_warnings:
   1249             self.watcher._sig_chld()
   1250 
   1251         self.assertFalse(callback.called)
   1252 
   1253     @waitpid_mocks
   1254     def test_sigchld_unknown_status(self, m):
   1255         callback = mock.Mock()
   1256 
   1257         # register a child
   1258         with self.watcher:
   1259             self.running = True
   1260             self.watcher.add_child_handler(53, callback, -19)
   1261 
   1262         self.assertFalse(callback.called)
   1263         self.assertFalse(m.WIFEXITED.called)
   1264         self.assertFalse(m.WIFSIGNALED.called)
   1265         self.assertFalse(m.WEXITSTATUS.called)
   1266         self.assertFalse(m.WTERMSIG.called)
   1267 
   1268         # terminate with unknown status
   1269         self.zombies[53] = 1178
   1270         self.running = False
   1271         self.watcher._sig_chld()
   1272 
   1273         callback.assert_called_once_with(53, 1178, -19)
   1274         self.assertTrue(m.WIFEXITED.called)
   1275         self.assertTrue(m.WIFSIGNALED.called)
   1276         self.assertFalse(m.WEXITSTATUS.called)
   1277         self.assertFalse(m.WTERMSIG.called)
   1278 
   1279         callback.reset_mock()
   1280         m.WIFEXITED.reset_mock()
   1281         m.WIFSIGNALED.reset_mock()
   1282 
   1283         # ensure that the child is effectively reaped
   1284         self.add_zombie(53, 101)
   1285         with self.ignore_warnings:
   1286             self.watcher._sig_chld()
   1287 
   1288         self.assertFalse(callback.called)
   1289 
   1290     @waitpid_mocks
   1291     def test_remove_child_handler(self, m):
   1292         callback1 = mock.Mock()
   1293         callback2 = mock.Mock()
   1294         callback3 = mock.Mock()
   1295 
   1296         # register children
   1297         with self.watcher:
   1298             self.running = True
   1299             self.watcher.add_child_handler(54, callback1, 1)
   1300             self.watcher.add_child_handler(55, callback2, 2)
   1301             self.watcher.add_child_handler(56, callback3, 3)
   1302 
   1303         # remove child handler 1
   1304         self.assertTrue(self.watcher.remove_child_handler(54))
   1305 
   1306         # remove child handler 2 multiple times
   1307         self.assertTrue(self.watcher.remove_child_handler(55))
   1308         self.assertFalse(self.watcher.remove_child_handler(55))
   1309         self.assertFalse(self.watcher.remove_child_handler(55))
   1310 
   1311         # all children terminate
   1312         self.add_zombie(54, 0)
   1313         self.add_zombie(55, 1)
   1314         self.add_zombie(56, 2)
   1315         self.running = False
   1316         with self.ignore_warnings:
   1317             self.watcher._sig_chld()
   1318 
   1319         self.assertFalse(callback1.called)
   1320         self.assertFalse(callback2.called)
   1321         callback3.assert_called_once_with(56, 2, 3)
   1322 
   1323     @waitpid_mocks
   1324     def test_sigchld_unhandled_exception(self, m):
   1325         callback = mock.Mock()
   1326 
   1327         # register a child
   1328         with self.watcher:
   1329             self.running = True
   1330             self.watcher.add_child_handler(57, callback)
   1331 
   1332         # raise an exception
   1333         m.waitpid.side_effect = ValueError
   1334 
   1335         with mock.patch.object(log.logger,
   1336                                'error') as m_error:
   1337 
   1338             self.assertEqual(self.watcher._sig_chld(), None)
   1339             self.assertTrue(m_error.called)
   1340 
   1341     @waitpid_mocks
   1342     def test_sigchld_child_reaped_elsewhere(self, m):
   1343         # register a child
   1344         callback = mock.Mock()
   1345 
   1346         with self.watcher:
   1347             self.running = True
   1348             self.watcher.add_child_handler(58, callback)
   1349 
   1350         self.assertFalse(callback.called)
   1351         self.assertFalse(m.WIFEXITED.called)
   1352         self.assertFalse(m.WIFSIGNALED.called)
   1353         self.assertFalse(m.WEXITSTATUS.called)
   1354         self.assertFalse(m.WTERMSIG.called)
   1355 
   1356         # child terminates
   1357         self.running = False
   1358         self.add_zombie(58, 4)
   1359 
   1360         # waitpid is called elsewhere
   1361         os.waitpid(58, os.WNOHANG)
   1362 
   1363         m.waitpid.reset_mock()
   1364 
   1365         # sigchld
   1366         with self.ignore_warnings:
   1367             self.watcher._sig_chld()
   1368 
   1369         if isinstance(self.watcher, asyncio.FastChildWatcher):
   1370             # here the FastChildWatche enters a deadlock
   1371             # (there is no way to prevent it)
   1372             self.assertFalse(callback.called)
   1373         else:
   1374             callback.assert_called_once_with(58, 255)
   1375 
   1376     @waitpid_mocks
   1377     def test_sigchld_unknown_pid_during_registration(self, m):
   1378         # register two children
   1379         callback1 = mock.Mock()
   1380         callback2 = mock.Mock()
   1381 
   1382         with self.ignore_warnings, self.watcher:
   1383             self.running = True
   1384             # child 1 terminates
   1385             self.add_zombie(591, 7)
   1386             # an unknown child terminates
   1387             self.add_zombie(593, 17)
   1388 
   1389             self.watcher._sig_chld()
   1390 
   1391             self.watcher.add_child_handler(591, callback1)
   1392             self.watcher.add_child_handler(592, callback2)
   1393 
   1394         callback1.assert_called_once_with(591, 7)
   1395         self.assertFalse(callback2.called)
   1396 
   1397     @waitpid_mocks
   1398     def test_set_loop(self, m):
   1399         # register a child
   1400         callback = mock.Mock()
   1401 
   1402         with self.watcher:
   1403             self.running = True
   1404             self.watcher.add_child_handler(60, callback)
   1405 
   1406         # attach a new loop
   1407         old_loop = self.loop
   1408         self.loop = self.new_test_loop()
   1409         patch = mock.patch.object
   1410 
   1411         with patch(old_loop, "remove_signal_handler") as m_old_remove, \
   1412              patch(self.loop, "add_signal_handler") as m_new_add:
   1413 
   1414             self.watcher.attach_loop(self.loop)
   1415 
   1416             m_old_remove.assert_called_once_with(
   1417                 signal.SIGCHLD)
   1418             m_new_add.assert_called_once_with(
   1419                 signal.SIGCHLD, self.watcher._sig_chld)
   1420 
   1421         # child terminates
   1422         self.running = False
   1423         self.add_zombie(60, 9)
   1424         self.watcher._sig_chld()
   1425 
   1426         callback.assert_called_once_with(60, 9)
   1427 
   1428     @waitpid_mocks
   1429     def test_set_loop_race_condition(self, m):
   1430         # register 3 children
   1431         callback1 = mock.Mock()
   1432         callback2 = mock.Mock()
   1433         callback3 = mock.Mock()
   1434 
   1435         with self.watcher:
   1436             self.running = True
   1437             self.watcher.add_child_handler(61, callback1)
   1438             self.watcher.add_child_handler(62, callback2)
   1439             self.watcher.add_child_handler(622, callback3)
   1440 
   1441         # detach the loop
   1442         old_loop = self.loop
   1443         self.loop = None
   1444 
   1445         with mock.patch.object(
   1446                 old_loop, "remove_signal_handler") as m_remove_signal_handler:
   1447 
   1448             with self.assertWarnsRegex(
   1449                     RuntimeWarning, 'A loop is being detached'):
   1450                 self.watcher.attach_loop(None)
   1451 
   1452             m_remove_signal_handler.assert_called_once_with(
   1453                 signal.SIGCHLD)
   1454 
   1455         # child 1 & 2 terminate
   1456         self.add_zombie(61, 11)
   1457         self.add_zombie(62, -5)
   1458 
   1459         # SIGCHLD was not caught
   1460         self.assertFalse(callback1.called)
   1461         self.assertFalse(callback2.called)
   1462         self.assertFalse(callback3.called)
   1463 
   1464         # attach a new loop
   1465         self.loop = self.new_test_loop()
   1466 
   1467         with mock.patch.object(
   1468                 self.loop, "add_signal_handler") as m_add_signal_handler:
   1469 
   1470             self.watcher.attach_loop(self.loop)
   1471 
   1472             m_add_signal_handler.assert_called_once_with(
   1473                 signal.SIGCHLD, self.watcher._sig_chld)
   1474             callback1.assert_called_once_with(61, 11)  # race condition!
   1475             callback2.assert_called_once_with(62, -5)  # race condition!
   1476             self.assertFalse(callback3.called)
   1477 
   1478         callback1.reset_mock()
   1479         callback2.reset_mock()
   1480 
   1481         # child 3 terminates
   1482         self.running = False
   1483         self.add_zombie(622, 19)
   1484         self.watcher._sig_chld()
   1485 
   1486         self.assertFalse(callback1.called)
   1487         self.assertFalse(callback2.called)
   1488         callback3.assert_called_once_with(622, 19)
   1489 
   1490     @waitpid_mocks
   1491     def test_close(self, m):
   1492         # register two children
   1493         callback1 = mock.Mock()
   1494 
   1495         with self.watcher:
   1496             self.running = True
   1497             # child 1 terminates
   1498             self.add_zombie(63, 9)
   1499             # other child terminates
   1500             self.add_zombie(65, 18)
   1501             self.watcher._sig_chld()
   1502 
   1503             self.watcher.add_child_handler(63, callback1)
   1504             self.watcher.add_child_handler(64, callback1)
   1505 
   1506             self.assertEqual(len(self.watcher._callbacks), 1)
   1507             if isinstance(self.watcher, asyncio.FastChildWatcher):
   1508                 self.assertEqual(len(self.watcher._zombies), 1)
   1509 
   1510             with mock.patch.object(
   1511                     self.loop,
   1512                     "remove_signal_handler") as m_remove_signal_handler:
   1513 
   1514                 self.watcher.close()
   1515 
   1516                 m_remove_signal_handler.assert_called_once_with(
   1517                     signal.SIGCHLD)
   1518                 self.assertFalse(self.watcher._callbacks)
   1519                 if isinstance(self.watcher, asyncio.FastChildWatcher):
   1520                     self.assertFalse(self.watcher._zombies)
   1521 
   1522     @waitpid_mocks
   1523     def test_add_child_handler_with_no_loop_attached(self, m):
   1524         callback = mock.Mock()
   1525         with self.create_watcher() as watcher:
   1526             with self.assertRaisesRegex(
   1527                     RuntimeError,
   1528                     'the child watcher does not have a loop attached'):
   1529                 watcher.add_child_handler(100, callback)
   1530 
   1531 
   1532 class SafeChildWatcherTests (ChildWatcherTestsMixin, test_utils.TestCase):
   1533     def create_watcher(self):
   1534         return asyncio.SafeChildWatcher()
   1535 
   1536 
   1537 class FastChildWatcherTests (ChildWatcherTestsMixin, test_utils.TestCase):
   1538     def create_watcher(self):
   1539         return asyncio.FastChildWatcher()
   1540 
   1541 
   1542 class PolicyTests(unittest.TestCase):
   1543 
   1544     def create_policy(self):
   1545         return asyncio.DefaultEventLoopPolicy()
   1546 
   1547     def test_get_child_watcher(self):
   1548         policy = self.create_policy()
   1549         self.assertIsNone(policy._watcher)
   1550 
   1551         watcher = policy.get_child_watcher()
   1552         self.assertIsInstance(watcher, asyncio.SafeChildWatcher)
   1553 
   1554         self.assertIs(policy._watcher, watcher)
   1555 
   1556         self.assertIs(watcher, policy.get_child_watcher())
   1557         self.assertIsNone(watcher._loop)
   1558 
   1559     def test_get_child_watcher_after_set(self):
   1560         policy = self.create_policy()
   1561         watcher = asyncio.FastChildWatcher()
   1562 
   1563         policy.set_child_watcher(watcher)
   1564         self.assertIs(policy._watcher, watcher)
   1565         self.assertIs(watcher, policy.get_child_watcher())
   1566 
   1567     def test_get_child_watcher_with_mainloop_existing(self):
   1568         policy = self.create_policy()
   1569         loop = policy.get_event_loop()
   1570 
   1571         self.assertIsNone(policy._watcher)
   1572         watcher = policy.get_child_watcher()
   1573 
   1574         self.assertIsInstance(watcher, asyncio.SafeChildWatcher)
   1575         self.assertIs(watcher._loop, loop)
   1576 
   1577         loop.close()
   1578 
   1579     def test_get_child_watcher_thread(self):
   1580 
   1581         def f():
   1582             policy.set_event_loop(policy.new_event_loop())
   1583 
   1584             self.assertIsInstance(policy.get_event_loop(),
   1585                                   asyncio.AbstractEventLoop)
   1586             watcher = policy.get_child_watcher()
   1587 
   1588             self.assertIsInstance(watcher, asyncio.SafeChildWatcher)
   1589             self.assertIsNone(watcher._loop)
   1590 
   1591             policy.get_event_loop().close()
   1592 
   1593         policy = self.create_policy()
   1594 
   1595         th = threading.Thread(target=f)
   1596         th.start()
   1597         th.join()
   1598 
   1599     def test_child_watcher_replace_mainloop_existing(self):
   1600         policy = self.create_policy()
   1601         loop = policy.get_event_loop()
   1602 
   1603         watcher = policy.get_child_watcher()
   1604 
   1605         self.assertIs(watcher._loop, loop)
   1606 
   1607         new_loop = policy.new_event_loop()
   1608         policy.set_event_loop(new_loop)
   1609 
   1610         self.assertIs(watcher._loop, new_loop)
   1611 
   1612         policy.set_event_loop(None)
   1613 
   1614         self.assertIs(watcher._loop, None)
   1615 
   1616         loop.close()
   1617         new_loop.close()
   1618 
   1619 
   1620 if __name__ == '__main__':
   1621     unittest.main()
   1622