# Some simple queue module tests, plus some failure conditions
# to ensure the Queue locks remain stable.
import queue
import time
import unittest
from test import support

# test.support.import_module() was moved to test.support.import_helper in
# Python 3.10, so calling it here breaks the import of this module on current
# interpreters.  Import threading directly and reproduce the helper's
# behaviour of skipping the tests when the module is unavailable.
try:
    import threading
except ImportError as exc:
    raise unittest.SkipTest(str(exc))

QUEUE_SIZE = 5


def qfull(q):
    """Return whether *q* currently holds exactly ``q.maxsize`` items.

    A queue whose ``maxsize`` is not positive is never reported as full.
    """
    return q.maxsize > 0 and q.qsize() == q.maxsize


# A thread to run a function that unclogs a blocked Queue.
class _TriggerThread(threading.Thread):
    """Thread that calls ``fn(*args)`` after a short delay.

    ``startedEvent`` is set immediately before the call, so another thread
    can tell whether the trigger had already fired at a given moment.
    """

    def __init__(self, fn, args):
        self.fn = fn
        self.args = args
        self.startedEvent = threading.Event()
        super().__init__()

    def run(self):
        # The sleep isn't necessary, but is intended to give the blocking
        # function in the main thread a chance at actually blocking before
        # we unclog it.  But if the sleep is longer than the timeout-based
        # tests wait in their blocking functions, those tests will fail.
        # So we give them much longer timeout values compared to the
        # sleep here (I aimed at 10 seconds for blocking functions --
        # they should never actually wait that long - they should make
        # progress as soon as we call self.fn()).
        time.sleep(0.1)
        self.startedEvent.set()
        self.fn(*self.args)


# Execute a function that blocks, and in a separate thread, a function that
# triggers the release.  Returns the result of the blocking function.  Caution:
# block_func must guarantee to block until trigger_func is called, and
# trigger_func must guarantee to change queue state so that block_func can make
# enough progress to return.  In particular, a block_func that just raises an
# exception regardless of whether trigger_func is called will lead to
# timing-dependent sporadic failures, and one of those went rarely seen but
# undiagnosed for years.  Now block_func must be unexceptional.  If block_func
# is supposed to raise an exception, call do_exceptional_blocking_test()
# instead.

class BlockingTestMixin:
    """Helpers that run a blocking call while a _TriggerThread unblocks it."""

    def tearDown(self):
        # Drop the reference to the last trigger thread.
        self.t = None

    def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
        """Run ``block_func(*block_args)`` while a thread runs the trigger.

        Fails the test if block_func returns before the trigger thread has
        fired, or if the trigger thread is still alive 10 seconds after
        block_func finished.  Returns the result of block_func.  Any
        exception raised by block_func propagates to the caller.
        """
        self.t = _TriggerThread(trigger_func, trigger_args)
        self.t.start()
        try:
            self.result = block_func(*block_args)
            # If block_func returned before our thread made the call, we failed!
            if not self.t.startedEvent.is_set():
                self.fail("blocking function '%r' appeared not to block" %
                          block_func)
        finally:
            # Always reap the trigger thread, including when block_func
            # raised or the check above failed, so that it cannot outlive
            # this call.
            self.t.join(10)  # make sure the thread terminates
        if self.t.is_alive():
            self.fail("trigger function '%r' appeared to not return" %
                      trigger_func)
        return self.result

    # Call this instead if block_func is supposed to raise an exception.
    def do_exceptional_blocking_test(self, block_func, block_args, trigger_func,
                                     trigger_args, expected_exception_class):
        """Run block_func, which must raise, while a thread runs the trigger.

        The expected exception is re-raised to the caller.  The test fails
        if block_func returns normally, if the trigger thread is still alive
        after a 10 second join, or if the trigger thread never set its
        started event.
        """
        self.t = _TriggerThread(trigger_func, trigger_args)
        self.t.start()
        try:
            try:
                block_func(*block_args)
            except expected_exception_class:
                raise
            else:
                self.fail("expected exception of kind %r" %
                          expected_exception_class)
        finally:
            self.t.join(10)  # make sure the thread terminates
            if self.t.is_alive():
                self.fail("trigger function '%r' appeared to not return" %
                          trigger_func)
            if not self.t.startedEvent.is_set():
                self.fail("trigger thread ended but event never set")


class BaseQueueTestMixin(BlockingTestMixin):
    """Tests shared by every queue class; subclasses provide ``type2test``."""

    def setUp(self):
        self.cum = 0
        self.cumlock = threading.Lock()

    def simple_queue_test(self, q):
        """Check ordering, full/empty detection and blocking put/get on *q*.

        *q* must be empty on entry (RuntimeError otherwise).  The fullness
        checks put QUEUE_SIZE items, so they expect q.maxsize == QUEUE_SIZE.
        """
        if q.qsize():
            raise RuntimeError("Call this function with an empty queue")
        self.assertTrue(q.empty())
        self.assertFalse(q.full())
        # I guess we better check things actually queue correctly a little :)
        q.put(111)
        q.put(333)
        q.put(222)
        target_order = dict(Queue=[111, 333, 222],
                            LifoQueue=[222, 333, 111],
                            PriorityQueue=[111, 222, 333])
        actual_order = [q.get(), q.get(), q.get()]
        self.assertEqual(actual_order, target_order[q.__class__.__name__],
                         "Didn't seem to queue the correct data!")
        for i in range(QUEUE_SIZE-1):
            q.put(i)
            self.assertTrue(q.qsize(), "Queue should not be empty")
        self.assertFalse(qfull(q), "Queue should not be full")
        last = 2 * QUEUE_SIZE
        full = 3 * 2 * QUEUE_SIZE
        q.put(last)
        self.assertTrue(qfull(q), "Queue should be full")
        self.assertFalse(q.empty())
        self.assertTrue(q.full())
        try:
            q.put(full, block=0)
        except queue.Full:
            pass
        else:
            self.fail("Didn't appear to block with a full queue")
        try:
            q.put(full, timeout=0.01)
        except queue.Full:
            pass
        else:
            self.fail("Didn't appear to time-out with a full queue")
        # Test a blocking put
        self.do_blocking_test(q.put, (full,), q.get, ())
        self.do_blocking_test(q.put, (full, True, 10), q.get, ())
        # Empty it
        for i in range(QUEUE_SIZE):
            q.get()
        self.assertFalse(q.qsize(), "Queue should be empty")
        try:
            q.get(block=0)
        except queue.Empty:
            pass
        else:
            self.fail("Didn't appear to block with an empty queue")
        try:
            q.get(timeout=0.01)
        except queue.Empty:
            pass
        else:
            self.fail("Didn't appear to time-out with an empty queue")
        # Test a blocking get
        self.do_blocking_test(q.get, (), q.put, ('empty',))
        self.do_blocking_test(q.get, (True, 10), q.put, ('empty',))

    def worker(self, q):
        """Add items taken from *q* to self.cum until a negative item arrives.

        Every item, including the negative sentinel, is acknowledged with
        q.task_done().
        """
        while True:
            x = q.get()
            if x < 0:
                q.task_done()
                return
            with self.cumlock:
                self.cum += x
            q.task_done()

    def queue_join_test(self, q):
        """Check that q.join() waits until two workers have handled all items."""
        self.cum = 0
        workers = [threading.Thread(target=self.worker, args=(q,))
                   for _ in range(2)]
        for thread in workers:
            thread.start()
        for i in range(100):
            q.put(i)
        q.join()
        self.assertEqual(self.cum, sum(range(100)),
                         "q.join() did not block until all tasks were done")
        for _ in workers:
            q.put(-1)         # instruct the threads to close
        q.join()                # verify that you can join twice
        # Each worker has acknowledged its sentinel and is returning; wait
        # for it so that no thread is left running after the test.
        for thread in workers:
            thread.join()

    def test_queue_task_done(self):
        # Test to make sure a queue task completed successfully.
        q = self.type2test()
        try:
            q.task_done()
        except ValueError:
            pass
        else:
            self.fail("Did not detect task count going negative")

    def test_queue_join(self):
        # Test that a queue join()s successfully, and before anything else
        # (done twice for insurance).
        q = self.type2test()
        self.queue_join_test(q)
        self.queue_join_test(q)
        try:
            q.task_done()
        except ValueError:
            pass
        else:
            self.fail("Did not detect task count going negative")

    def test_simple_queue(self):
        # Do it a couple of times on the same queue.
        # Done twice to make sure works with same instance reused.
        q = self.type2test(QUEUE_SIZE)
        self.simple_queue_test(q)
        self.simple_queue_test(q)

    def test_negative_timeout_raises_exception(self):
        q = self.type2test(QUEUE_SIZE)
        with self.assertRaises(ValueError):
            q.put(1, timeout=-1)
        with self.assertRaises(ValueError):
            q.get(1, timeout=-1)

    def test_nowait(self):
        q = self.type2test(QUEUE_SIZE)
        for i in range(QUEUE_SIZE):
            q.put_nowait(1)
        with self.assertRaises(queue.Full):
            q.put_nowait(1)

        for i in range(QUEUE_SIZE):
            q.get_nowait()
        with self.assertRaises(queue.Empty):
            q.get_nowait()

    def test_shrinking_queue(self):
        # issue 10110
        q = self.type2test(3)
        q.put(1)
        q.put(2)
        q.put(3)
        with self.assertRaises(queue.Full):
            q.put_nowait(4)
        self.assertEqual(q.qsize(), 3)
        q.maxsize = 2                       # shrink the queue
        with self.assertRaises(queue.Full):
            q.put_nowait(4)


class QueueTest(BaseQueueTestMixin, unittest.TestCase):
    type2test = queue.Queue


class LifoQueueTest(BaseQueueTestMixin, unittest.TestCase):
    type2test = queue.LifoQueue


class PriorityQueueTest(BaseQueueTestMixin, unittest.TestCase):
    type2test = queue.PriorityQueue


# A Queue subclass that can provoke failure at a moment's notice :)
class FailingQueueException(Exception):
    pass


class FailingQueue(queue.Queue):
    """Queue whose next _put() or _get() can be made to raise on demand.

    Setting fail_next_put / fail_next_get to True makes the next matching
    call raise FailingQueueException; the flag is cleared before raising.
    """

    def __init__(self, *args):
        self.fail_next_put = False
        self.fail_next_get = False
        queue.Queue.__init__(self, *args)

    def _put(self, item):
        if self.fail_next_put:
            self.fail_next_put = False
            raise FailingQueueException("You Lose")
        return queue.Queue._put(self, item)

    def _get(self):
        if self.fail_next_get:
            self.fail_next_get = False
            raise FailingQueueException("You Lose")
        return queue.Queue._get(self)


class FailingQueueTest(BlockingTestMixin, unittest.TestCase):

    def failing_queue_test(self, q):
        """Check that *q* stays usable after injected put/get failures.

        *q* must be empty on entry (RuntimeError otherwise).  The fullness
        checks put QUEUE_SIZE items, so they expect q.maxsize == QUEUE_SIZE.
        """
        if q.qsize():
            raise RuntimeError("Call this function with an empty queue")
        for i in range(QUEUE_SIZE-1):
            q.put(i)
        # Test a failing non-blocking put.
        q.fail_next_put = True
        try:
            q.put("oops", block=0)
        except FailingQueueException:
            pass
        else:
            self.fail("The queue didn't fail when it should have")
        q.fail_next_put = True
        try:
            q.put("oops", timeout=0.1)
        except FailingQueueException:
            pass
        else:
            self.fail("The queue didn't fail when it should have")
        q.put("last")
        self.assertTrue(qfull(q), "Queue should be full")
        # Test a failing blocking put
        q.fail_next_put = True
        try:
            self.do_blocking_test(q.put, ("full",), q.get, ())
        except FailingQueueException:
            pass
        else:
            self.fail("The queue didn't fail when it should have")
        # Check the Queue isn't damaged.
        # put failed, but get succeeded - re-add
        q.put("last")
        # Test a failing timeout put
        q.fail_next_put = True
        try:
            self.do_exceptional_blocking_test(q.put, ("full", True, 10), q.get, (),
                                              FailingQueueException)
        except FailingQueueException:
            pass
        else:
            self.fail("The queue didn't fail when it should have")
        # Check the Queue isn't damaged.
        # put failed, but get succeeded - re-add
        q.put("last")
        self.assertTrue(qfull(q), "Queue should be full")
        q.get()
        self.assertFalse(qfull(q), "Queue should not be full")
        q.put("last")
        self.assertTrue(qfull(q), "Queue should be full")
        # Test a blocking put
        self.do_blocking_test(q.put, ("full",), q.get, ())
        # Empty it
        for i in range(QUEUE_SIZE):
            q.get()
        self.assertFalse(q.qsize(), "Queue should be empty")
        q.put("first")
        q.fail_next_get = True
        try:
            q.get()
        except FailingQueueException:
            pass
        else:
            self.fail("The queue didn't fail when it should have")
        self.assertTrue(q.qsize(), "Queue should not be empty")
        q.fail_next_get = True
        try:
            q.get(timeout=0.1)
        except FailingQueueException:
            pass
        else:
            self.fail("The queue didn't fail when it should have")
        self.assertTrue(q.qsize(), "Queue should not be empty")
        q.get()
        self.assertFalse(q.qsize(), "Queue should be empty")
        q.fail_next_get = True
        try:
            self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',),
                                              FailingQueueException)
        except FailingQueueException:
            pass
        else:
            self.fail("The queue didn't fail when it should have")
        # put succeeded, but get failed.
        self.assertTrue(q.qsize(), "Queue should not be empty")
        q.get()
        self.assertFalse(q.qsize(), "Queue should be empty")

    def test_failing_queue(self):
        # Test to make sure a queue is functioning correctly.
        # Done twice to the same instance.
        q = FailingQueue(QUEUE_SIZE)
        self.failing_queue_test(q)
        self.failing_queue_test(q)


if __name__ == "__main__":
    unittest.main()