Home | History | Annotate | Download | only in test_asyncio
      1 """Tests for window_utils"""
      2 
      3 import socket
      4 import sys
      5 import unittest
      6 import warnings
      7 from unittest import mock
      8 
      9 if sys.platform != 'win32':
     10     raise unittest.SkipTest('Windows only')
     11 
     12 import _winapi
     13 
     14 from asyncio import _overlapped
     15 from asyncio import windows_utils
     16 try:
     17     from test import support
     18 except ImportError:
     19     from asyncio import test_support as support
     20 
     21 
     22 class WinsocketpairTests(unittest.TestCase):
     23 
     24     def check_winsocketpair(self, ssock, csock):
     25         csock.send(b'xxx')
     26         self.assertEqual(b'xxx', ssock.recv(1024))
     27         csock.close()
     28         ssock.close()
     29 
     30     def test_winsocketpair(self):
     31         ssock, csock = windows_utils.socketpair()
     32         self.check_winsocketpair(ssock, csock)
     33 
     34     @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled')
     35     def test_winsocketpair_ipv6(self):
     36         ssock, csock = windows_utils.socketpair(family=socket.AF_INET6)
     37         self.check_winsocketpair(ssock, csock)
     38 
     39     @unittest.skipIf(hasattr(socket, 'socketpair'),
     40                      'socket.socketpair is available')
     41     @mock.patch('asyncio.windows_utils.socket')
     42     def test_winsocketpair_exc(self, m_socket):
     43         m_socket.AF_INET = socket.AF_INET
     44         m_socket.SOCK_STREAM = socket.SOCK_STREAM
     45         m_socket.socket.return_value.getsockname.return_value = ('', 12345)
     46         m_socket.socket.return_value.accept.return_value = object(), object()
     47         m_socket.socket.return_value.connect.side_effect = OSError()
     48 
     49         self.assertRaises(OSError, windows_utils.socketpair)
     50 
     51     def test_winsocketpair_invalid_args(self):
     52         self.assertRaises(ValueError,
     53                           windows_utils.socketpair, family=socket.AF_UNSPEC)
     54         self.assertRaises(ValueError,
     55                           windows_utils.socketpair, type=socket.SOCK_DGRAM)
     56         self.assertRaises(ValueError,
     57                           windows_utils.socketpair, proto=1)
     58 
     59     @unittest.skipIf(hasattr(socket, 'socketpair'),
     60                      'socket.socketpair is available')
     61     @mock.patch('asyncio.windows_utils.socket')
     62     def test_winsocketpair_close(self, m_socket):
     63         m_socket.AF_INET = socket.AF_INET
     64         m_socket.SOCK_STREAM = socket.SOCK_STREAM
     65         sock = mock.Mock()
     66         m_socket.socket.return_value = sock
     67         sock.bind.side_effect = OSError
     68         self.assertRaises(OSError, windows_utils.socketpair)
     69         self.assertTrue(sock.close.called)
     70 
     71 
     72 class PipeTests(unittest.TestCase):
     73 
     74     def test_pipe_overlapped(self):
     75         h1, h2 = windows_utils.pipe(overlapped=(True, True))
     76         try:
     77             ov1 = _overlapped.Overlapped()
     78             self.assertFalse(ov1.pending)
     79             self.assertEqual(ov1.error, 0)
     80 
     81             ov1.ReadFile(h1, 100)
     82             self.assertTrue(ov1.pending)
     83             self.assertEqual(ov1.error, _winapi.ERROR_IO_PENDING)
     84             ERROR_IO_INCOMPLETE = 996
     85             try:
     86                 ov1.getresult()
     87             except OSError as e:
     88                 self.assertEqual(e.winerror, ERROR_IO_INCOMPLETE)
     89             else:
     90                 raise RuntimeError('expected ERROR_IO_INCOMPLETE')
     91 
     92             ov2 = _overlapped.Overlapped()
     93             self.assertFalse(ov2.pending)
     94             self.assertEqual(ov2.error, 0)
     95 
     96             ov2.WriteFile(h2, b"hello")
     97             self.assertIn(ov2.error, {0, _winapi.ERROR_IO_PENDING})
     98 
     99             res = _winapi.WaitForMultipleObjects([ov2.event], False, 100)
    100             self.assertEqual(res, _winapi.WAIT_OBJECT_0)
    101 
    102             self.assertFalse(ov1.pending)
    103             self.assertEqual(ov1.error, ERROR_IO_INCOMPLETE)
    104             self.assertFalse(ov2.pending)
    105             self.assertIn(ov2.error, {0, _winapi.ERROR_IO_PENDING})
    106             self.assertEqual(ov1.getresult(), b"hello")
    107         finally:
    108             _winapi.CloseHandle(h1)
    109             _winapi.CloseHandle(h2)
    110 
    111     def test_pipe_handle(self):
    112         h, _ = windows_utils.pipe(overlapped=(True, True))
    113         _winapi.CloseHandle(_)
    114         p = windows_utils.PipeHandle(h)
    115         self.assertEqual(p.fileno(), h)
    116         self.assertEqual(p.handle, h)
    117 
    118         # check garbage collection of p closes handle
    119         with warnings.catch_warnings():
    120             warnings.filterwarnings("ignore", "",  ResourceWarning)
    121             del p
    122             support.gc_collect()
    123         try:
    124             _winapi.CloseHandle(h)
    125         except OSError as e:
    126             self.assertEqual(e.winerror, 6)     # ERROR_INVALID_HANDLE
    127         else:
    128             raise RuntimeError('expected ERROR_INVALID_HANDLE')
    129 
    130 
    131 class PopenTests(unittest.TestCase):
    132 
    133     def test_popen(self):
    134         command = r"""if 1:
    135             import sys
    136             s = sys.stdin.readline()
    137             sys.stdout.write(s.upper())
    138             sys.stderr.write('stderr')
    139             """
    140         msg = b"blah\n"
    141 
    142         p = windows_utils.Popen([sys.executable, '-c', command],
    143                                 stdin=windows_utils.PIPE,
    144                                 stdout=windows_utils.PIPE,
    145                                 stderr=windows_utils.PIPE)
    146 
    147         for f in [p.stdin, p.stdout, p.stderr]:
    148             self.assertIsInstance(f, windows_utils.PipeHandle)
    149 
    150         ovin = _overlapped.Overlapped()
    151         ovout = _overlapped.Overlapped()
    152         overr = _overlapped.Overlapped()
    153 
    154         ovin.WriteFile(p.stdin.handle, msg)
    155         ovout.ReadFile(p.stdout.handle, 100)
    156         overr.ReadFile(p.stderr.handle, 100)
    157 
    158         events = [ovin.event, ovout.event, overr.event]
    159         # Super-long timeout for slow buildbots.
    160         res = _winapi.WaitForMultipleObjects(events, True, 10000)
    161         self.assertEqual(res, _winapi.WAIT_OBJECT_0)
    162         self.assertFalse(ovout.pending)
    163         self.assertFalse(overr.pending)
    164         self.assertFalse(ovin.pending)
    165 
    166         self.assertEqual(ovin.getresult(), len(msg))
    167         out = ovout.getresult().rstrip()
    168         err = overr.getresult().rstrip()
    169 
    170         self.assertGreater(len(out), 0)
    171         self.assertGreater(len(err), 0)
    172         # allow for partial reads...
    173         self.assertTrue(msg.upper().rstrip().startswith(out))
    174         self.assertTrue(b"stderr".startswith(err))
    175 
    176         # The context manager calls wait() and closes resources
    177         with p:
    178             pass
    179 
    180 
    181 if __name__ == '__main__':
    182     unittest.main()
    183