Home | History | Annotate | Download | only in test
      1 # Some simple queue module tests, plus some failure conditions
      2 # to ensure the Queue locks remain stable.
      3 import Queue
      4 import time
      5 import unittest
      6 from test import test_support
      7 threading = test_support.import_module('threading')
      8 
# Size passed to the constructor of every bounded queue built by the tests
# below; simple_queue_test() expects a queue to report full at this many items.
QUEUE_SIZE = 5
     10 
     11 # A thread to run a function that unclogs a blocked Queue.
     12 class _TriggerThread(threading.Thread):
     13     def __init__(self, fn, args):
     14         self.fn = fn
     15         self.args = args
     16         self.startedEvent = threading.Event()
     17         threading.Thread.__init__(self)
     18 
     19     def run(self):
     20         # The sleep isn't necessary, but is intended to give the blocking
     21         # function in the main thread a chance at actually blocking before
     22         # we unclog it.  But if the sleep is longer than the timeout-based
     23         # tests wait in their blocking functions, those tests will fail.
     24         # So we give them much longer timeout values compared to the
     25         # sleep here (I aimed at 10 seconds for blocking functions --
     26         # they should never actually wait that long - they should make
     27         # progress as soon as we call self.fn()).
     28         time.sleep(0.1)
     29         self.startedEvent.set()
     30         self.fn(*self.args)
     31 
     32 
     33 # Execute a function that blocks, and in a separate thread, a function that
     34 # triggers the release.  Returns the result of the blocking function.  Caution:
     35 # block_func must guarantee to block until trigger_func is called, and
     36 # trigger_func must guarantee to change queue state so that block_func can make
     37 # enough progress to return.  In particular, a block_func that just raises an
     38 # exception regardless of whether trigger_func is called will lead to
     39 # timing-dependent sporadic failures, and one of those went rarely seen but
     40 # undiagnosed for years.  Now block_func must be unexceptional.  If block_func
     41 # is supposed to raise an exception, call do_exceptional_blocking_test()
     42 # instead.
     43 
     44 class BlockingTestMixin:
     45 
     46     def tearDown(self):
     47         self.t = None
     48 
     49     def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
     50         self.t = _TriggerThread(trigger_func, trigger_args)
     51         self.t.start()
     52         self.result = block_func(*block_args)
     53         # If block_func returned before our thread made the call, we failed!
     54         if not self.t.startedEvent.is_set():
     55             self.fail("blocking function '%r' appeared not to block" %
     56                       block_func)
     57         self.t.join(10) # make sure the thread terminates
     58         if self.t.is_alive():
     59             self.fail("trigger function '%r' appeared to not return" %
     60                       trigger_func)
     61         return self.result
     62 
     63     # Call this instead if block_func is supposed to raise an exception.
     64     def do_exceptional_blocking_test(self,block_func, block_args, trigger_func,
     65                                    trigger_args, expected_exception_class):
     66         self.t = _TriggerThread(trigger_func, trigger_args)
     67         self.t.start()
     68         try:
     69             try:
     70                 block_func(*block_args)
     71             except expected_exception_class:
     72                 raise
     73             else:
     74                 self.fail("expected exception of kind %r" %
     75                                  expected_exception_class)
     76         finally:
     77             self.t.join(10) # make sure the thread terminates
     78             if self.t.is_alive():
     79                 self.fail("trigger function '%r' appeared to not return" %
     80                                  trigger_func)
     81             if not self.t.startedEvent.is_set():
     82                 self.fail("trigger thread ended but event never set")
     83 
     84 
     85 class BaseQueueTest(BlockingTestMixin):
     86     def setUp(self):
     87         self.cum = 0
     88         self.cumlock = threading.Lock()
     89 
     90     def simple_queue_test(self, q):
     91         if not q.empty():
     92             raise RuntimeError, "Call this function with an empty queue"
     93         # I guess we better check things actually queue correctly a little :)
     94         q.put(111)
     95         q.put(333)
     96         q.put(222)
     97         target_order = dict(Queue = [111, 333, 222],
     98                             LifoQueue = [222, 333, 111],
     99                             PriorityQueue = [111, 222, 333])
    100         actual_order = [q.get(), q.get(), q.get()]
    101         self.assertEqual(actual_order, target_order[q.__class__.__name__],
    102                          "Didn't seem to queue the correct data!")
    103         for i in range(QUEUE_SIZE-1):
    104             q.put(i)
    105             self.assertTrue(not q.empty(), "Queue should not be empty")
    106         self.assertTrue(not q.full(), "Queue should not be full")
    107         last = 2 * QUEUE_SIZE
    108         full = 3 * 2 * QUEUE_SIZE
    109         q.put(last)
    110         self.assertTrue(q.full(), "Queue should be full")
    111         try:
    112             q.put(full, block=0)
    113             self.fail("Didn't appear to block with a full queue")
    114         except Queue.Full:
    115             pass
    116         try:
    117             q.put(full, timeout=0.01)
    118             self.fail("Didn't appear to time-out with a full queue")
    119         except Queue.Full:
    120             pass
    121         # Test a blocking put
    122         self.do_blocking_test(q.put, (full,), q.get, ())
    123         self.do_blocking_test(q.put, (full, True, 10), q.get, ())
    124         # Empty it
    125         for i in range(QUEUE_SIZE):
    126             q.get()
    127         self.assertTrue(q.empty(), "Queue should be empty")
    128         try:
    129             q.get(block=0)
    130             self.fail("Didn't appear to block with an empty queue")
    131         except Queue.Empty:
    132             pass
    133         try:
    134             q.get(timeout=0.01)
    135             self.fail("Didn't appear to time-out with an empty queue")
    136         except Queue.Empty:
    137             pass
    138         # Test a blocking get
    139         self.do_blocking_test(q.get, (), q.put, ('empty',))
    140         self.do_blocking_test(q.get, (True, 10), q.put, ('empty',))
    141 
    142 
    143     def worker(self, q):
    144         while True:
    145             x = q.get()
    146             if x is None:
    147                 q.task_done()
    148                 return
    149             with self.cumlock:
    150                 self.cum += x
    151             q.task_done()
    152 
    153     def queue_join_test(self, q):
    154         self.cum = 0
    155         for i in (0,1):
    156             threading.Thread(target=self.worker, args=(q,)).start()
    157         for i in xrange(100):
    158             q.put(i)
    159         q.join()
    160         self.assertEqual(self.cum, sum(range(100)),
    161                          "q.join() did not block until all tasks were done")
    162         for i in (0,1):
    163             q.put(None)         # instruct the threads to close
    164         q.join()                # verify that you can join twice
    165 
    166     def test_queue_task_done(self):
    167         # Test to make sure a queue task completed successfully.
    168         q = self.type2test()
    169         try:
    170             q.task_done()
    171         except ValueError:
    172             pass
    173         else:
    174             self.fail("Did not detect task count going negative")
    175 
    176     def test_queue_join(self):
    177         # Test that a queue join()s successfully, and before anything else
    178         # (done twice for insurance).
    179         q = self.type2test()
    180         self.queue_join_test(q)
    181         self.queue_join_test(q)
    182         try:
    183             q.task_done()
    184         except ValueError:
    185             pass
    186         else:
    187             self.fail("Did not detect task count going negative")
    188 
    189     def test_simple_queue(self):
    190         # Do it a couple of times on the same queue.
    191         # Done twice to make sure works with same instance reused.
    192         q = self.type2test(QUEUE_SIZE)
    193         self.simple_queue_test(q)
    194         self.simple_queue_test(q)
    195 
    196 
class QueueTest(BaseQueueTest, unittest.TestCase):
    """Run the shared BaseQueueTest tests against Queue.Queue."""
    type2test = Queue.Queue
    199 
class LifoQueueTest(BaseQueueTest, unittest.TestCase):
    """Run the shared BaseQueueTest tests against Queue.LifoQueue."""
    type2test = Queue.LifoQueue
    202 
class PriorityQueueTest(BaseQueueTest, unittest.TestCase):
    """Run the shared BaseQueueTest tests against Queue.PriorityQueue."""
    type2test = Queue.PriorityQueue
    205 
    206 
    207 
    208 # A Queue subclass that can provoke failure at a moment's notice :)
    209 class FailingQueueException(Exception):
    210     pass
    211 
    212 class FailingQueue(Queue.Queue):
    213     def __init__(self, *args):
    214         self.fail_next_put = False
    215         self.fail_next_get = False
    216         Queue.Queue.__init__(self, *args)
    217     def _put(self, item):
    218         if self.fail_next_put:
    219             self.fail_next_put = False
    220             raise FailingQueueException, "You Lose"
    221         return Queue.Queue._put(self, item)
    222     def _get(self):
    223         if self.fail_next_get:
    224             self.fail_next_get = False
    225             raise FailingQueueException, "You Lose"
    226         return Queue.Queue._get(self)
    227 
    228 class FailingQueueTest(BlockingTestMixin, unittest.TestCase):
    229 
    230     def failing_queue_test(self, q):
    231         if not q.empty():
    232             raise RuntimeError, "Call this function with an empty queue"
    233         for i in range(QUEUE_SIZE-1):
    234             q.put(i)
    235         # Test a failing non-blocking put.
    236         q.fail_next_put = True
    237         try:
    238             q.put("oops", block=0)
    239             self.fail("The queue didn't fail when it should have")
    240         except FailingQueueException:
    241             pass
    242         q.fail_next_put = True
    243         try:
    244             q.put("oops", timeout=0.1)
    245             self.fail("The queue didn't fail when it should have")
    246         except FailingQueueException:
    247             pass
    248         q.put("last")
    249         self.assertTrue(q.full(), "Queue should be full")
    250         # Test a failing blocking put
    251         q.fail_next_put = True
    252         try:
    253             self.do_blocking_test(q.put, ("full",), q.get, ())
    254             self.fail("The queue didn't fail when it should have")
    255         except FailingQueueException:
    256             pass
    257         # Check the Queue isn't damaged.
    258         # put failed, but get succeeded - re-add
    259         q.put("last")
    260         # Test a failing timeout put
    261         q.fail_next_put = True
    262         try:
    263             self.do_exceptional_blocking_test(q.put, ("full", True, 10), q.get, (),
    264                                               FailingQueueException)
    265             self.fail("The queue didn't fail when it should have")
    266         except FailingQueueException:
    267             pass
    268         # Check the Queue isn't damaged.
    269         # put failed, but get succeeded - re-add
    270         q.put("last")
    271         self.assertTrue(q.full(), "Queue should be full")
    272         q.get()
    273         self.assertTrue(not q.full(), "Queue should not be full")
    274         q.put("last")
    275         self.assertTrue(q.full(), "Queue should be full")
    276         # Test a blocking put
    277         self.do_blocking_test(q.put, ("full",), q.get, ())
    278         # Empty it
    279         for i in range(QUEUE_SIZE):
    280             q.get()
    281         self.assertTrue(q.empty(), "Queue should be empty")
    282         q.put("first")
    283         q.fail_next_get = True
    284         try:
    285             q.get()
    286             self.fail("The queue didn't fail when it should have")
    287         except FailingQueueException:
    288             pass
    289         self.assertTrue(not q.empty(), "Queue should not be empty")
    290         q.fail_next_get = True
    291         try:
    292             q.get(timeout=0.1)
    293             self.fail("The queue didn't fail when it should have")
    294         except FailingQueueException:
    295             pass
    296         self.assertTrue(not q.empty(), "Queue should not be empty")
    297         q.get()
    298         self.assertTrue(q.empty(), "Queue should be empty")
    299         q.fail_next_get = True
    300         try:
    301             self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',),
    302                                               FailingQueueException)
    303             self.fail("The queue didn't fail when it should have")
    304         except FailingQueueException:
    305             pass
    306         # put succeeded, but get failed.
    307         self.assertTrue(not q.empty(), "Queue should not be empty")
    308         q.get()
    309         self.assertTrue(q.empty(), "Queue should be empty")
    310 
    311     def test_failing_queue(self):
    312         # Test to make sure a queue is functioning correctly.
    313         # Done twice to the same instance.
    314         q = FailingQueue(QUEUE_SIZE)
    315         self.failing_queue_test(q)
    316         self.failing_queue_test(q)
    317 
    318 
    319 def test_main():
    320     test_support.run_unittest(QueueTest, LifoQueueTest, PriorityQueueTest,
    321                               FailingQueueTest)
    322 
    323 
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()
    326