1 """Tests for window_utils""" 2 3 import socket 4 import sys 5 import unittest 6 import warnings 7 from unittest import mock 8 9 if sys.platform != 'win32': 10 raise unittest.SkipTest('Windows only') 11 12 import _winapi 13 14 from asyncio import _overlapped 15 from asyncio import windows_utils 16 try: 17 from test import support 18 except ImportError: 19 from asyncio import test_support as support 20 21 22 class WinsocketpairTests(unittest.TestCase): 23 24 def check_winsocketpair(self, ssock, csock): 25 csock.send(b'xxx') 26 self.assertEqual(b'xxx', ssock.recv(1024)) 27 csock.close() 28 ssock.close() 29 30 def test_winsocketpair(self): 31 ssock, csock = windows_utils.socketpair() 32 self.check_winsocketpair(ssock, csock) 33 34 @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled') 35 def test_winsocketpair_ipv6(self): 36 ssock, csock = windows_utils.socketpair(family=socket.AF_INET6) 37 self.check_winsocketpair(ssock, csock) 38 39 @unittest.skipIf(hasattr(socket, 'socketpair'), 40 'socket.socketpair is available') 41 @mock.patch('asyncio.windows_utils.socket') 42 def test_winsocketpair_exc(self, m_socket): 43 m_socket.AF_INET = socket.AF_INET 44 m_socket.SOCK_STREAM = socket.SOCK_STREAM 45 m_socket.socket.return_value.getsockname.return_value = ('', 12345) 46 m_socket.socket.return_value.accept.return_value = object(), object() 47 m_socket.socket.return_value.connect.side_effect = OSError() 48 49 self.assertRaises(OSError, windows_utils.socketpair) 50 51 def test_winsocketpair_invalid_args(self): 52 self.assertRaises(ValueError, 53 windows_utils.socketpair, family=socket.AF_UNSPEC) 54 self.assertRaises(ValueError, 55 windows_utils.socketpair, type=socket.SOCK_DGRAM) 56 self.assertRaises(ValueError, 57 windows_utils.socketpair, proto=1) 58 59 @unittest.skipIf(hasattr(socket, 'socketpair'), 60 'socket.socketpair is available') 61 @mock.patch('asyncio.windows_utils.socket') 62 def test_winsocketpair_close(self, m_socket): 63 m_socket.AF_INET = socket.AF_INET 64 m_socket.SOCK_STREAM = socket.SOCK_STREAM 65 sock = mock.Mock() 66 m_socket.socket.return_value = sock 67 sock.bind.side_effect = OSError 68 self.assertRaises(OSError, windows_utils.socketpair) 69 self.assertTrue(sock.close.called) 70 71 72 class PipeTests(unittest.TestCase): 73 74 def test_pipe_overlapped(self): 75 h1, h2 = windows_utils.pipe(overlapped=(True, True)) 76 try: 77 ov1 = _overlapped.Overlapped() 78 self.assertFalse(ov1.pending) 79 self.assertEqual(ov1.error, 0) 80 81 ov1.ReadFile(h1, 100) 82 self.assertTrue(ov1.pending) 83 self.assertEqual(ov1.error, _winapi.ERROR_IO_PENDING) 84 ERROR_IO_INCOMPLETE = 996 85 try: 86 ov1.getresult() 87 except OSError as e: 88 self.assertEqual(e.winerror, ERROR_IO_INCOMPLETE) 89 else: 90 raise RuntimeError('expected ERROR_IO_INCOMPLETE') 91 92 ov2 = _overlapped.Overlapped() 93 self.assertFalse(ov2.pending) 94 self.assertEqual(ov2.error, 0) 95 96 ov2.WriteFile(h2, b"hello") 97 self.assertIn(ov2.error, {0, _winapi.ERROR_IO_PENDING}) 98 99 res = _winapi.WaitForMultipleObjects([ov2.event], False, 100) 100 self.assertEqual(res, _winapi.WAIT_OBJECT_0) 101 102 self.assertFalse(ov1.pending) 103 self.assertEqual(ov1.error, ERROR_IO_INCOMPLETE) 104 self.assertFalse(ov2.pending) 105 self.assertIn(ov2.error, {0, _winapi.ERROR_IO_PENDING}) 106 self.assertEqual(ov1.getresult(), b"hello") 107 finally: 108 _winapi.CloseHandle(h1) 109 _winapi.CloseHandle(h2) 110 111 def test_pipe_handle(self): 112 h, _ = windows_utils.pipe(overlapped=(True, True)) 113 _winapi.CloseHandle(_) 114 p = windows_utils.PipeHandle(h) 115 self.assertEqual(p.fileno(), h) 116 self.assertEqual(p.handle, h) 117 118 # check garbage collection of p closes handle 119 with warnings.catch_warnings(): 120 warnings.filterwarnings("ignore", "", ResourceWarning) 121 del p 122 support.gc_collect() 123 try: 124 _winapi.CloseHandle(h) 125 except OSError as e: 126 self.assertEqual(e.winerror, 6) # ERROR_INVALID_HANDLE 127 else: 128 raise RuntimeError('expected ERROR_INVALID_HANDLE') 129 130 131 class PopenTests(unittest.TestCase): 132 133 def test_popen(self): 134 command = r"""if 1: 135 import sys 136 s = sys.stdin.readline() 137 sys.stdout.write(s.upper()) 138 sys.stderr.write('stderr') 139 """ 140 msg = b"blah\n" 141 142 p = windows_utils.Popen([sys.executable, '-c', command], 143 stdin=windows_utils.PIPE, 144 stdout=windows_utils.PIPE, 145 stderr=windows_utils.PIPE) 146 147 for f in [p.stdin, p.stdout, p.stderr]: 148 self.assertIsInstance(f, windows_utils.PipeHandle) 149 150 ovin = _overlapped.Overlapped() 151 ovout = _overlapped.Overlapped() 152 overr = _overlapped.Overlapped() 153 154 ovin.WriteFile(p.stdin.handle, msg) 155 ovout.ReadFile(p.stdout.handle, 100) 156 overr.ReadFile(p.stderr.handle, 100) 157 158 events = [ovin.event, ovout.event, overr.event] 159 # Super-long timeout for slow buildbots. 160 res = _winapi.WaitForMultipleObjects(events, True, 10000) 161 self.assertEqual(res, _winapi.WAIT_OBJECT_0) 162 self.assertFalse(ovout.pending) 163 self.assertFalse(overr.pending) 164 self.assertFalse(ovin.pending) 165 166 self.assertEqual(ovin.getresult(), len(msg)) 167 out = ovout.getresult().rstrip() 168 err = overr.getresult().rstrip() 169 170 self.assertGreater(len(out), 0) 171 self.assertGreater(len(err), 0) 172 # allow for partial reads... 173 self.assertTrue(msg.upper().rstrip().startswith(out)) 174 self.assertTrue(b"stderr".startswith(err)) 175 176 # The context manager calls wait() and closes resources 177 with p: 178 pass 179 180 181 if __name__ == '__main__': 182 unittest.main() 183