Home | History | Annotate | Download | only in test
      1 # Some simple queue module tests, plus some failure conditions
      2 # to ensure the Queue locks remain stable.
      3 import queue
      4 import time
      5 import unittest
      6 from test import support
      7 threading = support.import_module('threading')
      8 
      9 QUEUE_SIZE = 5
     10 
     11 def qfull(q):
     12     return q.maxsize > 0 and q.qsize() == q.maxsize
     13 
     14 # A thread to run a function that unclogs a blocked Queue.
     15 class _TriggerThread(threading.Thread):
     16     def __init__(self, fn, args):
     17         self.fn = fn
     18         self.args = args
     19         self.startedEvent = threading.Event()
     20         threading.Thread.__init__(self)
     21 
     22     def run(self):
     23         # The sleep isn't necessary, but is intended to give the blocking
     24         # function in the main thread a chance at actually blocking before
     25         # we unclog it.  But if the sleep is longer than the timeout-based
     26         # tests wait in their blocking functions, those tests will fail.
     27         # So we give them much longer timeout values compared to the
     28         # sleep here (I aimed at 10 seconds for blocking functions --
     29         # they should never actually wait that long - they should make
     30         # progress as soon as we call self.fn()).
     31         time.sleep(0.1)
     32         self.startedEvent.set()
     33         self.fn(*self.args)
     34 
     35 
     36 # Execute a function that blocks, and in a separate thread, a function that
     37 # triggers the release.  Returns the result of the blocking function.  Caution:
     38 # block_func must guarantee to block until trigger_func is called, and
     39 # trigger_func must guarantee to change queue state so that block_func can make
     40 # enough progress to return.  In particular, a block_func that just raises an
     41 # exception regardless of whether trigger_func is called will lead to
     42 # timing-dependent sporadic failures, and one of those went rarely seen but
     43 # undiagnosed for years.  Now block_func must be unexceptional.  If block_func
     44 # is supposed to raise an exception, call do_exceptional_blocking_test()
     45 # instead.
     46 
     47 class BlockingTestMixin:
     48 
     49     def tearDown(self):
     50         self.t = None
     51 
     52     def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
     53         self.t = _TriggerThread(trigger_func, trigger_args)
     54         self.t.start()
     55         self.result = block_func(*block_args)
     56         # If block_func returned before our thread made the call, we failed!
     57         if not self.t.startedEvent.is_set():
     58             self.fail("blocking function '%r' appeared not to block" %
     59                       block_func)
     60         self.t.join(10) # make sure the thread terminates
     61         if self.t.is_alive():
     62             self.fail("trigger function '%r' appeared to not return" %
     63                       trigger_func)
     64         return self.result
     65 
     66     # Call this instead if block_func is supposed to raise an exception.
     67     def do_exceptional_blocking_test(self,block_func, block_args, trigger_func,
     68                                    trigger_args, expected_exception_class):
     69         self.t = _TriggerThread(trigger_func, trigger_args)
     70         self.t.start()
     71         try:
     72             try:
     73                 block_func(*block_args)
     74             except expected_exception_class:
     75                 raise
     76             else:
     77                 self.fail("expected exception of kind %r" %
     78                                  expected_exception_class)
     79         finally:
     80             self.t.join(10) # make sure the thread terminates
     81             if self.t.is_alive():
     82                 self.fail("trigger function '%r' appeared to not return" %
     83                                  trigger_func)
     84             if not self.t.startedEvent.is_set():
     85                 self.fail("trigger thread ended but event never set")
     86 
     87 
     88 class BaseQueueTestMixin(BlockingTestMixin):
     89     def setUp(self):
     90         self.cum = 0
     91         self.cumlock = threading.Lock()
     92 
     93     def simple_queue_test(self, q):
     94         if q.qsize():
     95             raise RuntimeError("Call this function with an empty queue")
     96         self.assertTrue(q.empty())
     97         self.assertFalse(q.full())
     98         # I guess we better check things actually queue correctly a little :)
     99         q.put(111)
    100         q.put(333)
    101         q.put(222)
    102         target_order = dict(Queue = [111, 333, 222],
    103                             LifoQueue = [222, 333, 111],
    104                             PriorityQueue = [111, 222, 333])
    105         actual_order = [q.get(), q.get(), q.get()]
    106         self.assertEqual(actual_order, target_order[q.__class__.__name__],
    107                          "Didn't seem to queue the correct data!")
    108         for i in range(QUEUE_SIZE-1):
    109             q.put(i)
    110             self.assertTrue(q.qsize(), "Queue should not be empty")
    111         self.assertTrue(not qfull(q), "Queue should not be full")
    112         last = 2 * QUEUE_SIZE
    113         full = 3 * 2 * QUEUE_SIZE
    114         q.put(last)
    115         self.assertTrue(qfull(q), "Queue should be full")
    116         self.assertFalse(q.empty())
    117         self.assertTrue(q.full())
    118         try:
    119             q.put(full, block=0)
    120             self.fail("Didn't appear to block with a full queue")
    121         except queue.Full:
    122             pass
    123         try:
    124             q.put(full, timeout=0.01)
    125             self.fail("Didn't appear to time-out with a full queue")
    126         except queue.Full:
    127             pass
    128         # Test a blocking put
    129         self.do_blocking_test(q.put, (full,), q.get, ())
    130         self.do_blocking_test(q.put, (full, True, 10), q.get, ())
    131         # Empty it
    132         for i in range(QUEUE_SIZE):
    133             q.get()
    134         self.assertTrue(not q.qsize(), "Queue should be empty")
    135         try:
    136             q.get(block=0)
    137             self.fail("Didn't appear to block with an empty queue")
    138         except queue.Empty:
    139             pass
    140         try:
    141             q.get(timeout=0.01)
    142             self.fail("Didn't appear to time-out with an empty queue")
    143         except queue.Empty:
    144             pass
    145         # Test a blocking get
    146         self.do_blocking_test(q.get, (), q.put, ('empty',))
    147         self.do_blocking_test(q.get, (True, 10), q.put, ('empty',))
    148 
    149 
    150     def worker(self, q):
    151         while True:
    152             x = q.get()
    153             if x < 0:
    154                 q.task_done()
    155                 return
    156             with self.cumlock:
    157                 self.cum += x
    158             q.task_done()
    159 
    160     def queue_join_test(self, q):
    161         self.cum = 0
    162         for i in (0,1):
    163             threading.Thread(target=self.worker, args=(q,)).start()
    164         for i in range(100):
    165             q.put(i)
    166         q.join()
    167         self.assertEqual(self.cum, sum(range(100)),
    168                          "q.join() did not block until all tasks were done")
    169         for i in (0,1):
    170             q.put(-1)         # instruct the threads to close
    171         q.join()                # verify that you can join twice
    172 
    173     def test_queue_task_done(self):
    174         # Test to make sure a queue task completed successfully.
    175         q = self.type2test()
    176         try:
    177             q.task_done()
    178         except ValueError:
    179             pass
    180         else:
    181             self.fail("Did not detect task count going negative")
    182 
    183     def test_queue_join(self):
    184         # Test that a queue join()s successfully, and before anything else
    185         # (done twice for insurance).
    186         q = self.type2test()
    187         self.queue_join_test(q)
    188         self.queue_join_test(q)
    189         try:
    190             q.task_done()
    191         except ValueError:
    192             pass
    193         else:
    194             self.fail("Did not detect task count going negative")
    195 
    196     def test_simple_queue(self):
    197         # Do it a couple of times on the same queue.
    198         # Done twice to make sure works with same instance reused.
    199         q = self.type2test(QUEUE_SIZE)
    200         self.simple_queue_test(q)
    201         self.simple_queue_test(q)
    202 
    203     def test_negative_timeout_raises_exception(self):
    204         q = self.type2test(QUEUE_SIZE)
    205         with self.assertRaises(ValueError):
    206             q.put(1, timeout=-1)
    207         with self.assertRaises(ValueError):
    208             q.get(1, timeout=-1)
    209 
    210     def test_nowait(self):
    211         q = self.type2test(QUEUE_SIZE)
    212         for i in range(QUEUE_SIZE):
    213             q.put_nowait(1)
    214         with self.assertRaises(queue.Full):
    215             q.put_nowait(1)
    216 
    217         for i in range(QUEUE_SIZE):
    218             q.get_nowait()
    219         with self.assertRaises(queue.Empty):
    220             q.get_nowait()
    221 
    222     def test_shrinking_queue(self):
    223         # issue 10110
    224         q = self.type2test(3)
    225         q.put(1)
    226         q.put(2)
    227         q.put(3)
    228         with self.assertRaises(queue.Full):
    229             q.put_nowait(4)
    230         self.assertEqual(q.qsize(), 3)
    231         q.maxsize = 2                       # shrink the queue
    232         with self.assertRaises(queue.Full):
    233             q.put_nowait(4)
    234 
    235 class QueueTest(BaseQueueTestMixin, unittest.TestCase):
    236     type2test = queue.Queue
    237 
    238 class LifoQueueTest(BaseQueueTestMixin, unittest.TestCase):
    239     type2test = queue.LifoQueue
    240 
    241 class PriorityQueueTest(BaseQueueTestMixin, unittest.TestCase):
    242     type2test = queue.PriorityQueue
    243 
    244 
    245 
    246 # A Queue subclass that can provoke failure at a moment's notice :)
    247 class FailingQueueException(Exception):
    248     pass
    249 
    250 class FailingQueue(queue.Queue):
    251     def __init__(self, *args):
    252         self.fail_next_put = False
    253         self.fail_next_get = False
    254         queue.Queue.__init__(self, *args)
    255     def _put(self, item):
    256         if self.fail_next_put:
    257             self.fail_next_put = False
    258             raise FailingQueueException("You Lose")
    259         return queue.Queue._put(self, item)
    260     def _get(self):
    261         if self.fail_next_get:
    262             self.fail_next_get = False
    263             raise FailingQueueException("You Lose")
    264         return queue.Queue._get(self)
    265 
    266 class FailingQueueTest(BlockingTestMixin, unittest.TestCase):
    267 
    268     def failing_queue_test(self, q):
    269         if q.qsize():
    270             raise RuntimeError("Call this function with an empty queue")
    271         for i in range(QUEUE_SIZE-1):
    272             q.put(i)
    273         # Test a failing non-blocking put.
    274         q.fail_next_put = True
    275         try:
    276             q.put("oops", block=0)
    277             self.fail("The queue didn't fail when it should have")
    278         except FailingQueueException:
    279             pass
    280         q.fail_next_put = True
    281         try:
    282             q.put("oops", timeout=0.1)
    283             self.fail("The queue didn't fail when it should have")
    284         except FailingQueueException:
    285             pass
    286         q.put("last")
    287         self.assertTrue(qfull(q), "Queue should be full")
    288         # Test a failing blocking put
    289         q.fail_next_put = True
    290         try:
    291             self.do_blocking_test(q.put, ("full",), q.get, ())
    292             self.fail("The queue didn't fail when it should have")
    293         except FailingQueueException:
    294             pass
    295         # Check the Queue isn't damaged.
    296         # put failed, but get succeeded - re-add
    297         q.put("last")
    298         # Test a failing timeout put
    299         q.fail_next_put = True
    300         try:
    301             self.do_exceptional_blocking_test(q.put, ("full", True, 10), q.get, (),
    302                                               FailingQueueException)
    303             self.fail("The queue didn't fail when it should have")
    304         except FailingQueueException:
    305             pass
    306         # Check the Queue isn't damaged.
    307         # put failed, but get succeeded - re-add
    308         q.put("last")
    309         self.assertTrue(qfull(q), "Queue should be full")
    310         q.get()
    311         self.assertTrue(not qfull(q), "Queue should not be full")
    312         q.put("last")
    313         self.assertTrue(qfull(q), "Queue should be full")
    314         # Test a blocking put
    315         self.do_blocking_test(q.put, ("full",), q.get, ())
    316         # Empty it
    317         for i in range(QUEUE_SIZE):
    318             q.get()
    319         self.assertTrue(not q.qsize(), "Queue should be empty")
    320         q.put("first")
    321         q.fail_next_get = True
    322         try:
    323             q.get()
    324             self.fail("The queue didn't fail when it should have")
    325         except FailingQueueException:
    326             pass
    327         self.assertTrue(q.qsize(), "Queue should not be empty")
    328         q.fail_next_get = True
    329         try:
    330             q.get(timeout=0.1)
    331             self.fail("The queue didn't fail when it should have")
    332         except FailingQueueException:
    333             pass
    334         self.assertTrue(q.qsize(), "Queue should not be empty")
    335         q.get()
    336         self.assertTrue(not q.qsize(), "Queue should be empty")
    337         q.fail_next_get = True
    338         try:
    339             self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',),
    340                                               FailingQueueException)
    341             self.fail("The queue didn't fail when it should have")
    342         except FailingQueueException:
    343             pass
    344         # put succeeded, but get failed.
    345         self.assertTrue(q.qsize(), "Queue should not be empty")
    346         q.get()
    347         self.assertTrue(not q.qsize(), "Queue should be empty")
    348 
    349     def test_failing_queue(self):
    350         # Test to make sure a queue is functioning correctly.
    351         # Done twice to the same instance.
    352         q = FailingQueue(QUEUE_SIZE)
    353         self.failing_queue_test(q)
    354         self.failing_queue_test(q)
    355 
    356 
    357 if __name__ == "__main__":
    358     unittest.main()
    359