"""
Tests for kqueue wrapper.
"""
import socket
import errno
import time
import select
import sys
import unittest

from test import test_support
if not hasattr(select, "kqueue"):
    raise unittest.SkipTest("test works only on BSD")


class TestKQueue(unittest.TestCase):
    """Tests for the select.kqueue and select.kevent objects."""

    def _assert_events(self, events, expected):
        """Assert that *events* match *expected*, ignoring order.

        *events* is the list of kevent objects returned by
        kqueue.control(); *expected* is a list of
        (ident, filter, flags) tuples.  Both sides are sorted before
        comparing so the check does not depend on the order in which the
        kernel reports events or on which socket received the lower
        file descriptor number.
        """
        observed = [(e.ident, e.filter, e.flags) for e in events]
        self.assertEqual(sorted(observed), sorted(expected))

    def test_create_queue(self):
        """A new kqueue has a valid fd and refuses fileno() once closed."""
        kq = select.kqueue()
        # close() on an already-closed kqueue is harmless, so this only
        # matters when one of the assertions below fails early.
        self.addCleanup(kq.close)
        self.assertTrue(kq.fileno() > 0, kq.fileno())
        self.assertTrue(not kq.closed)
        kq.close()
        self.assertTrue(kq.closed)
        self.assertRaises(ValueError, kq.fileno)

    def test_create_event(self):
        """kevent stores its constructor arguments and supports ordering."""
        fd = sys.stderr.fileno()
        ev = select.kevent(fd)
        other = select.kevent(1000)
        # Only the ident was given: the remaining fields take defaults.
        self.assertEqual(ev.ident, fd)
        self.assertEqual(ev.filter, select.KQ_FILTER_READ)
        self.assertEqual(ev.flags, select.KQ_EV_ADD)
        self.assertEqual(ev.fflags, 0)
        self.assertEqual(ev.data, 0)
        self.assertEqual(ev.udata, 0)
        self.assertEqual(ev, ev)
        self.assertNotEqual(ev, other)
        self.assertEqual(cmp(ev, other), -1)
        self.assertTrue(ev < other)
        self.assertTrue(other >= ev)
        # Comparing a kevent with a non-kevent must raise TypeError.
        self.assertRaises(TypeError, cmp, ev, None)
        self.assertRaises(TypeError, cmp, ev, 1)
        self.assertRaises(TypeError, cmp, ev, "ev")

        ev = select.kevent(fd, select.KQ_FILTER_WRITE)
        self.assertEqual(ev.ident, fd)
        self.assertEqual(ev.filter, select.KQ_FILTER_WRITE)
        self.assertEqual(ev.flags, select.KQ_EV_ADD)
        self.assertEqual(ev.fflags, 0)
        self.assertEqual(ev.data, 0)
        self.assertEqual(ev.udata, 0)
        self.assertEqual(ev, ev)
        self.assertNotEqual(ev, other)

        ev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT)
        self.assertEqual(ev.ident, fd)
        self.assertEqual(ev.filter, select.KQ_FILTER_WRITE)
        self.assertEqual(ev.flags, select.KQ_EV_ONESHOT)
        self.assertEqual(ev.fflags, 0)
        self.assertEqual(ev.data, 0)
        self.assertEqual(ev.udata, 0)
        self.assertEqual(ev, ev)
        self.assertNotEqual(ev, other)

        # All six fields given positionally.
        ev = select.kevent(1, 2, 3, 4, 5, 6)
        self.assertEqual(ev.ident, 1)
        self.assertEqual(ev.filter, 2)
        self.assertEqual(ev.flags, 3)
        self.assertEqual(ev.fflags, 4)
        self.assertEqual(ev.data, 5)
        self.assertEqual(ev.udata, 6)
        self.assertEqual(ev, ev)
        self.assertNotEqual(ev, other)

        # Values above sys.maxsize must round-trip through ident and udata.
        bignum = sys.maxsize * 2 + 1
        ev = select.kevent(bignum, 1, 2, 3, sys.maxsize, bignum)
        self.assertEqual(ev.ident, bignum)
        self.assertEqual(ev.filter, 1)
        self.assertEqual(ev.flags, 2)
        self.assertEqual(ev.fflags, 3)
        self.assertEqual(ev.data, sys.maxsize)
        self.assertEqual(ev.udata, bignum)
        self.assertEqual(ev, ev)
        self.assertNotEqual(ev, other)

    def test_queue_event(self):
        """Register, poll and delete read/write events on a TCP pair."""
        serverSocket = socket.socket()
        self.addCleanup(serverSocket.close)
        serverSocket.bind(('127.0.0.1', 0))
        serverSocket.listen(1)
        client = socket.socket()
        self.addCleanup(client.close)
        client.setblocking(False)
        try:
            client.connect(('127.0.0.1', serverSocket.getsockname()[1]))
        except socket.error as e:
            self.assertEqual(e.args[0], errno.EINPROGRESS)
        # No "else: fail" here: FreeBSD doesn't raise an exception for a
        # non-blocking connect in this situation.
        server, addr = serverSocket.accept()
        self.addCleanup(server.close)

        # Flags reported back with each event differ between platforms.
        if sys.platform.startswith("darwin"):
            flags = select.KQ_EV_ADD | select.KQ_EV_ENABLE
        else:
            flags = 0

        kq = select.kqueue()
        # kq2 wraps the very same descriptor as kq, so events registered
        # through kq2 are reported when polling kq below.
        kq2 = select.kqueue.fromfd(kq.fileno())

        ev = select.kevent(server.fileno(),
                           select.KQ_FILTER_WRITE,
                           select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        kq.control([ev], 0)
        ev = select.kevent(server.fileno(),
                           select.KQ_FILTER_READ,
                           select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        kq.control([ev], 0)
        ev = select.kevent(client.fileno(),
                           select.KQ_FILTER_WRITE,
                           select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        kq2.control([ev], 0)
        ev = select.kevent(client.fileno(),
                           select.KQ_FILTER_READ,
                           select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        kq2.control([ev], 0)

        # Nothing has been sent yet, so only the write filters fire.
        events = kq.control(None, 4, 1)
        self._assert_events(events, [
            (client.fileno(), select.KQ_FILTER_WRITE, flags),
            (server.fileno(), select.KQ_FILTER_WRITE, flags)])

        client.send("Hello!")
        server.send("world!!!")

        # We may need to call it several times
        for i in range(10):
            events = kq.control(None, 4, 1)
            if len(events) == 4:
                break
            time.sleep(1.0)
        else:
            self.fail('timeout waiting for event notifications')

        self._assert_events(events, [
            (client.fileno(), select.KQ_FILTER_WRITE, flags),
            (client.fileno(), select.KQ_FILTER_READ, flags),
            (server.fileno(), select.KQ_FILTER_WRITE, flags),
            (server.fileno(), select.KQ_FILTER_READ, flags)])

        # Remove completely client, and server read part
        ev = select.kevent(client.fileno(),
                           select.KQ_FILTER_WRITE,
                           select.KQ_EV_DELETE)
        kq.control([ev], 0)
        ev = select.kevent(client.fileno(),
                           select.KQ_FILTER_READ,
                           select.KQ_EV_DELETE)
        kq.control([ev], 0)
        ev = select.kevent(server.fileno(),
                           select.KQ_FILTER_READ,
                           select.KQ_EV_DELETE)
        kq.control([ev], 0, 0)

        # Only the server write filter is still registered.
        events = kq.control([], 4, 0.99)
        self._assert_events(events, [
            (server.fileno(), select.KQ_FILTER_WRITE, flags)])

    def testPair(self):
        """kevent accepts socket objects; data reports the readable bytes."""
        kq = select.kqueue()
        self.addCleanup(kq.close)
        a, b = socket.socketpair()
        self.addCleanup(a.close)
        self.addCleanup(b.close)

        a.send(b'foo')
        # The sockets themselves (not their fileno()) are passed as ident.
        event1 = select.kevent(a, select.KQ_FILTER_READ,
                               select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        event2 = select.kevent(b, select.KQ_FILTER_READ,
                               select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        r = kq.control([event1, event2], 1, 1)
        self.assertTrue(r)
        self.assertFalse(r[0].flags & select.KQ_EV_ERROR)
        # The event's data field is used as the number of bytes to read.
        self.assertEqual(b.recv(r[0].data), b'foo')


def test_main():
    """Run the kqueue test case through the regrtest support helper."""
    test_support.run_unittest(TestKQueue)


if __name__ == "__main__":
    test_main()