1 """ 2 Tests for kqueue wrapper. 3 """ 4 import socket 5 import errno 6 import time 7 import select 8 import sys 9 import unittest 10 11 from test import test_support 12 if not hasattr(select, "kqueue"): 13 raise unittest.SkipTest("test works only on BSD") 14 15 class TestKQueue(unittest.TestCase): 16 def test_create_queue(self): 17 kq = select.kqueue() 18 self.assertTrue(kq.fileno() > 0, kq.fileno()) 19 self.assertTrue(not kq.closed) 20 kq.close() 21 self.assertTrue(kq.closed) 22 self.assertRaises(ValueError, kq.fileno) 23 24 def test_create_event(self): 25 fd = sys.stderr.fileno() 26 ev = select.kevent(fd) 27 other = select.kevent(1000) 28 self.assertEqual(ev.ident, fd) 29 self.assertEqual(ev.filter, select.KQ_FILTER_READ) 30 self.assertEqual(ev.flags, select.KQ_EV_ADD) 31 self.assertEqual(ev.fflags, 0) 32 self.assertEqual(ev.data, 0) 33 self.assertEqual(ev.udata, 0) 34 self.assertEqual(ev, ev) 35 self.assertNotEqual(ev, other) 36 self.assertEqual(cmp(ev, other), -1) 37 self.assertTrue(ev < other) 38 self.assertTrue(other >= ev) 39 self.assertRaises(TypeError, cmp, ev, None) 40 self.assertRaises(TypeError, cmp, ev, 1) 41 self.assertRaises(TypeError, cmp, ev, "ev") 42 43 ev = select.kevent(fd, select.KQ_FILTER_WRITE) 44 self.assertEqual(ev.ident, fd) 45 self.assertEqual(ev.filter, select.KQ_FILTER_WRITE) 46 self.assertEqual(ev.flags, select.KQ_EV_ADD) 47 self.assertEqual(ev.fflags, 0) 48 self.assertEqual(ev.data, 0) 49 self.assertEqual(ev.udata, 0) 50 self.assertEqual(ev, ev) 51 self.assertNotEqual(ev, other) 52 53 ev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT) 54 self.assertEqual(ev.ident, fd) 55 self.assertEqual(ev.filter, select.KQ_FILTER_WRITE) 56 self.assertEqual(ev.flags, select.KQ_EV_ONESHOT) 57 self.assertEqual(ev.fflags, 0) 58 self.assertEqual(ev.data, 0) 59 self.assertEqual(ev.udata, 0) 60 self.assertEqual(ev, ev) 61 self.assertNotEqual(ev, other) 62 63 ev = select.kevent(1, 2, 3, 4, 5, 6) 64 self.assertEqual(ev.ident, 1) 65 self.assertEqual(ev.filter, 2) 66 self.assertEqual(ev.flags, 3) 67 self.assertEqual(ev.fflags, 4) 68 self.assertEqual(ev.data, 5) 69 self.assertEqual(ev.udata, 6) 70 self.assertEqual(ev, ev) 71 self.assertNotEqual(ev, other) 72 73 bignum = sys.maxsize * 2 + 1 74 ev = select.kevent(bignum, 1, 2, 3, sys.maxsize, bignum) 75 self.assertEqual(ev.ident, bignum) 76 self.assertEqual(ev.filter, 1) 77 self.assertEqual(ev.flags, 2) 78 self.assertEqual(ev.fflags, 3) 79 self.assertEqual(ev.data, sys.maxsize) 80 self.assertEqual(ev.udata, bignum) 81 self.assertEqual(ev, ev) 82 self.assertNotEqual(ev, other) 83 84 def test_queue_event(self): 85 serverSocket = socket.socket() 86 serverSocket.bind(('127.0.0.1', 0)) 87 serverSocket.listen(1) 88 client = socket.socket() 89 client.setblocking(False) 90 try: 91 client.connect(('127.0.0.1', serverSocket.getsockname()[1])) 92 except socket.error, e: 93 self.assertEqual(e.args[0], errno.EINPROGRESS) 94 else: 95 #raise AssertionError("Connect should have raised EINPROGRESS") 96 pass # FreeBSD doesn't raise an exception here 97 server, addr = serverSocket.accept() 98 99 kq = select.kqueue() 100 kq2 = select.kqueue.fromfd(kq.fileno()) 101 102 ev = select.kevent(server.fileno(), 103 select.KQ_FILTER_WRITE, 104 select.KQ_EV_ADD | select.KQ_EV_ENABLE) 105 kq.control([ev], 0) 106 ev = select.kevent(server.fileno(), 107 select.KQ_FILTER_READ, 108 select.KQ_EV_ADD | select.KQ_EV_ENABLE) 109 kq.control([ev], 0) 110 ev = select.kevent(client.fileno(), 111 select.KQ_FILTER_WRITE, 112 select.KQ_EV_ADD | select.KQ_EV_ENABLE) 113 kq2.control([ev], 0) 114 ev = select.kevent(client.fileno(), 115 select.KQ_FILTER_READ, 116 select.KQ_EV_ADD | select.KQ_EV_ENABLE) 117 kq2.control([ev], 0) 118 119 events = kq.control(None, 4, 1) 120 events = set((e.ident, e.filter) for e in events) 121 self.assertEqual(events, set([ 122 (client.fileno(), select.KQ_FILTER_WRITE), 123 (server.fileno(), select.KQ_FILTER_WRITE)])) 124 125 client.send("Hello!") 126 server.send("world!!!") 127 128 # We may need to call it several times 129 for i in range(10): 130 events = kq.control(None, 4, 1) 131 if len(events) == 4: 132 break 133 time.sleep(1.0) 134 else: 135 self.fail('timeout waiting for event notifications') 136 137 events = set((e.ident, e.filter) for e in events) 138 self.assertEqual(events, set([ 139 (client.fileno(), select.KQ_FILTER_WRITE), 140 (client.fileno(), select.KQ_FILTER_READ), 141 (server.fileno(), select.KQ_FILTER_WRITE), 142 (server.fileno(), select.KQ_FILTER_READ)])) 143 144 # Remove completely client, and server read part 145 ev = select.kevent(client.fileno(), 146 select.KQ_FILTER_WRITE, 147 select.KQ_EV_DELETE) 148 kq.control([ev], 0) 149 ev = select.kevent(client.fileno(), 150 select.KQ_FILTER_READ, 151 select.KQ_EV_DELETE) 152 kq.control([ev], 0) 153 ev = select.kevent(server.fileno(), 154 select.KQ_FILTER_READ, 155 select.KQ_EV_DELETE) 156 kq.control([ev], 0, 0) 157 158 events = kq.control([], 4, 0.99) 159 events = set((e.ident, e.filter) for e in events) 160 self.assertEqual(events, set([ 161 (server.fileno(), select.KQ_FILTER_WRITE)])) 162 163 client.close() 164 server.close() 165 serverSocket.close() 166 167 def testPair(self): 168 kq = select.kqueue() 169 a, b = socket.socketpair() 170 171 a.send(b'foo') 172 event1 = select.kevent(a, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE) 173 event2 = select.kevent(b, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE) 174 r = kq.control([event1, event2], 1, 1) 175 self.assertTrue(r) 176 self.assertFalse(r[0].flags & select.KQ_EV_ERROR) 177 self.assertEqual(b.recv(r[0].data), b'foo') 178 179 a.close() 180 b.close() 181 kq.close() 182 183 def test_main(): 184 test_support.run_unittest(TestKQueue) 185 186 if __name__ == "__main__": 187 test_main() 188