1 # Some simple queue module tests, plus some failure conditions 2 # to ensure the Queue locks remain stable. 3 import Queue 4 import time 5 import unittest 6 from test import test_support 7 threading = test_support.import_module('threading') 8 9 QUEUE_SIZE = 5 10 11 # A thread to run a function that unclogs a blocked Queue. 12 class _TriggerThread(threading.Thread): 13 def __init__(self, fn, args): 14 self.fn = fn 15 self.args = args 16 self.startedEvent = threading.Event() 17 threading.Thread.__init__(self) 18 19 def run(self): 20 # The sleep isn't necessary, but is intended to give the blocking 21 # function in the main thread a chance at actually blocking before 22 # we unclog it. But if the sleep is longer than the timeout-based 23 # tests wait in their blocking functions, those tests will fail. 24 # So we give them much longer timeout values compared to the 25 # sleep here (I aimed at 10 seconds for blocking functions -- 26 # they should never actually wait that long - they should make 27 # progress as soon as we call self.fn()). 28 time.sleep(0.1) 29 self.startedEvent.set() 30 self.fn(*self.args) 31 32 33 # Execute a function that blocks, and in a separate thread, a function that 34 # triggers the release. Returns the result of the blocking function. Caution: 35 # block_func must guarantee to block until trigger_func is called, and 36 # trigger_func must guarantee to change queue state so that block_func can make 37 # enough progress to return. In particular, a block_func that just raises an 38 # exception regardless of whether trigger_func is called will lead to 39 # timing-dependent sporadic failures, and one of those went rarely seen but 40 # undiagnosed for years. Now block_func must be unexceptional. If block_func 41 # is supposed to raise an exception, call do_exceptional_blocking_test() 42 # instead. 43 44 class BlockingTestMixin: 45 46 def tearDown(self): 47 self.t = None 48 49 def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args): 50 self.t = _TriggerThread(trigger_func, trigger_args) 51 self.t.start() 52 self.result = block_func(*block_args) 53 # If block_func returned before our thread made the call, we failed! 54 if not self.t.startedEvent.is_set(): 55 self.fail("blocking function '%r' appeared not to block" % 56 block_func) 57 self.t.join(10) # make sure the thread terminates 58 if self.t.is_alive(): 59 self.fail("trigger function '%r' appeared to not return" % 60 trigger_func) 61 return self.result 62 63 # Call this instead if block_func is supposed to raise an exception. 64 def do_exceptional_blocking_test(self,block_func, block_args, trigger_func, 65 trigger_args, expected_exception_class): 66 self.t = _TriggerThread(trigger_func, trigger_args) 67 self.t.start() 68 try: 69 try: 70 block_func(*block_args) 71 except expected_exception_class: 72 raise 73 else: 74 self.fail("expected exception of kind %r" % 75 expected_exception_class) 76 finally: 77 self.t.join(10) # make sure the thread terminates 78 if self.t.is_alive(): 79 self.fail("trigger function '%r' appeared to not return" % 80 trigger_func) 81 if not self.t.startedEvent.is_set(): 82 self.fail("trigger thread ended but event never set") 83 84 85 class BaseQueueTest(BlockingTestMixin): 86 def setUp(self): 87 self.cum = 0 88 self.cumlock = threading.Lock() 89 90 def simple_queue_test(self, q): 91 if not q.empty(): 92 raise RuntimeError, "Call this function with an empty queue" 93 # I guess we better check things actually queue correctly a little :) 94 q.put(111) 95 q.put(333) 96 q.put(222) 97 target_order = dict(Queue = [111, 333, 222], 98 LifoQueue = [222, 333, 111], 99 PriorityQueue = [111, 222, 333]) 100 actual_order = [q.get(), q.get(), q.get()] 101 self.assertEqual(actual_order, target_order[q.__class__.__name__], 102 "Didn't seem to queue the correct data!") 103 for i in range(QUEUE_SIZE-1): 104 q.put(i) 105 self.assertTrue(not q.empty(), "Queue should not be empty") 106 self.assertTrue(not q.full(), "Queue should not be full") 107 last = 2 * QUEUE_SIZE 108 full = 3 * 2 * QUEUE_SIZE 109 q.put(last) 110 self.assertTrue(q.full(), "Queue should be full") 111 try: 112 q.put(full, block=0) 113 self.fail("Didn't appear to block with a full queue") 114 except Queue.Full: 115 pass 116 try: 117 q.put(full, timeout=0.01) 118 self.fail("Didn't appear to time-out with a full queue") 119 except Queue.Full: 120 pass 121 # Test a blocking put 122 self.do_blocking_test(q.put, (full,), q.get, ()) 123 self.do_blocking_test(q.put, (full, True, 10), q.get, ()) 124 # Empty it 125 for i in range(QUEUE_SIZE): 126 q.get() 127 self.assertTrue(q.empty(), "Queue should be empty") 128 try: 129 q.get(block=0) 130 self.fail("Didn't appear to block with an empty queue") 131 except Queue.Empty: 132 pass 133 try: 134 q.get(timeout=0.01) 135 self.fail("Didn't appear to time-out with an empty queue") 136 except Queue.Empty: 137 pass 138 # Test a blocking get 139 self.do_blocking_test(q.get, (), q.put, ('empty',)) 140 self.do_blocking_test(q.get, (True, 10), q.put, ('empty',)) 141 142 143 def worker(self, q): 144 while True: 145 x = q.get() 146 if x is None: 147 q.task_done() 148 return 149 with self.cumlock: 150 self.cum += x 151 q.task_done() 152 153 def queue_join_test(self, q): 154 self.cum = 0 155 for i in (0,1): 156 threading.Thread(target=self.worker, args=(q,)).start() 157 for i in xrange(100): 158 q.put(i) 159 q.join() 160 self.assertEqual(self.cum, sum(range(100)), 161 "q.join() did not block until all tasks were done") 162 for i in (0,1): 163 q.put(None) # instruct the threads to close 164 q.join() # verify that you can join twice 165 166 def test_queue_task_done(self): 167 # Test to make sure a queue task completed successfully. 168 q = self.type2test() 169 try: 170 q.task_done() 171 except ValueError: 172 pass 173 else: 174 self.fail("Did not detect task count going negative") 175 176 def test_queue_join(self): 177 # Test that a queue join()s successfully, and before anything else 178 # (done twice for insurance). 179 q = self.type2test() 180 self.queue_join_test(q) 181 self.queue_join_test(q) 182 try: 183 q.task_done() 184 except ValueError: 185 pass 186 else: 187 self.fail("Did not detect task count going negative") 188 189 def test_simple_queue(self): 190 # Do it a couple of times on the same queue. 191 # Done twice to make sure works with same instance reused. 192 q = self.type2test(QUEUE_SIZE) 193 self.simple_queue_test(q) 194 self.simple_queue_test(q) 195 196 197 class QueueTest(BaseQueueTest, unittest.TestCase): 198 type2test = Queue.Queue 199 200 class LifoQueueTest(BaseQueueTest, unittest.TestCase): 201 type2test = Queue.LifoQueue 202 203 class PriorityQueueTest(BaseQueueTest, unittest.TestCase): 204 type2test = Queue.PriorityQueue 205 206 207 208 # A Queue subclass that can provoke failure at a moment's notice :) 209 class FailingQueueException(Exception): 210 pass 211 212 class FailingQueue(Queue.Queue): 213 def __init__(self, *args): 214 self.fail_next_put = False 215 self.fail_next_get = False 216 Queue.Queue.__init__(self, *args) 217 def _put(self, item): 218 if self.fail_next_put: 219 self.fail_next_put = False 220 raise FailingQueueException, "You Lose" 221 return Queue.Queue._put(self, item) 222 def _get(self): 223 if self.fail_next_get: 224 self.fail_next_get = False 225 raise FailingQueueException, "You Lose" 226 return Queue.Queue._get(self) 227 228 class FailingQueueTest(BlockingTestMixin, unittest.TestCase): 229 230 def failing_queue_test(self, q): 231 if not q.empty(): 232 raise RuntimeError, "Call this function with an empty queue" 233 for i in range(QUEUE_SIZE-1): 234 q.put(i) 235 # Test a failing non-blocking put. 236 q.fail_next_put = True 237 try: 238 q.put("oops", block=0) 239 self.fail("The queue didn't fail when it should have") 240 except FailingQueueException: 241 pass 242 q.fail_next_put = True 243 try: 244 q.put("oops", timeout=0.1) 245 self.fail("The queue didn't fail when it should have") 246 except FailingQueueException: 247 pass 248 q.put("last") 249 self.assertTrue(q.full(), "Queue should be full") 250 # Test a failing blocking put 251 q.fail_next_put = True 252 try: 253 self.do_blocking_test(q.put, ("full",), q.get, ()) 254 self.fail("The queue didn't fail when it should have") 255 except FailingQueueException: 256 pass 257 # Check the Queue isn't damaged. 258 # put failed, but get succeeded - re-add 259 q.put("last") 260 # Test a failing timeout put 261 q.fail_next_put = True 262 try: 263 self.do_exceptional_blocking_test(q.put, ("full", True, 10), q.get, (), 264 FailingQueueException) 265 self.fail("The queue didn't fail when it should have") 266 except FailingQueueException: 267 pass 268 # Check the Queue isn't damaged. 269 # put failed, but get succeeded - re-add 270 q.put("last") 271 self.assertTrue(q.full(), "Queue should be full") 272 q.get() 273 self.assertTrue(not q.full(), "Queue should not be full") 274 q.put("last") 275 self.assertTrue(q.full(), "Queue should be full") 276 # Test a blocking put 277 self.do_blocking_test(q.put, ("full",), q.get, ()) 278 # Empty it 279 for i in range(QUEUE_SIZE): 280 q.get() 281 self.assertTrue(q.empty(), "Queue should be empty") 282 q.put("first") 283 q.fail_next_get = True 284 try: 285 q.get() 286 self.fail("The queue didn't fail when it should have") 287 except FailingQueueException: 288 pass 289 self.assertTrue(not q.empty(), "Queue should not be empty") 290 q.fail_next_get = True 291 try: 292 q.get(timeout=0.1) 293 self.fail("The queue didn't fail when it should have") 294 except FailingQueueException: 295 pass 296 self.assertTrue(not q.empty(), "Queue should not be empty") 297 q.get() 298 self.assertTrue(q.empty(), "Queue should be empty") 299 q.fail_next_get = True 300 try: 301 self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',), 302 FailingQueueException) 303 self.fail("The queue didn't fail when it should have") 304 except FailingQueueException: 305 pass 306 # put succeeded, but get failed. 307 self.assertTrue(not q.empty(), "Queue should not be empty") 308 q.get() 309 self.assertTrue(q.empty(), "Queue should be empty") 310 311 def test_failing_queue(self): 312 # Test to make sure a queue is functioning correctly. 313 # Done twice to the same instance. 314 q = FailingQueue(QUEUE_SIZE) 315 self.failing_queue_test(q) 316 self.failing_queue_test(q) 317 318 319 def test_main(): 320 test_support.run_unittest(QueueTest, LifoQueueTest, PriorityQueueTest, 321 FailingQueueTest) 322 323 324 if __name__ == "__main__": 325 test_main() 326