Home | History | Annotate | Download | only in wardmodem
      1 #!/usr/bin/env python
      2 
      3 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      4 # Use of this source code is governed by a BSD-style license that can be
      5 # found in the LICENSE file.
      6 
      7 import task_loop
      8 
      9 import glib
     10 import logging
     11 import mox
     12 import time
     13 import unittest
     14 
     15 class TaskLoopTestCase(unittest.TestCase):
     16     """
     17     Test fixture for TaskLoop class.
     18 
     19     These unit-tests have a trade-off between speed and stability. The whole
     20     suite takes about half a minute to run, and probably could be speeded up a
     21     little bit. But, making the numbers too small might make the tests flaky.
     22     """
     23 
     24     def setUp(self):
     25         self._mox = mox.Mox()
     26         self._callback_mocker = self._mox.CreateMock(TaskLoopTestCase)
     27         self._task_loop = task_loop.get_instance()
     28 
     29     # ##########################################################################
     30     # Tests
     31 
     32     def test_post_task_simple(self):
     33         """Post a simple task and expect it gets dispatched."""
     34         self._callback_mocker._callback()
     35 
     36         self._mox.ReplayAll()
     37         self._task_loop.post_task(self._callback_mocker._callback)
     38         self._run_task_loop(2)
     39         self._mox.VerifyAll()
     40 
     41 
     42     def test_post_task_set_attribute(self):
     43         """Post a task that accesses an attribute from the context object."""
     44         self.flag = False
     45         self._task_loop.post_task(self._callback_set_attribute)
     46         self._run_task_loop(2)
     47         self.assertTrue(self.flag)
     48 
     49 
     50     def test_post_task_with_argument(self):
     51         """Post task with some argument."""
     52         arg = True
     53         self._callback_mocker._callback_with_arguments(arg)
     54 
     55         self._mox.ReplayAll()
     56         self._task_loop.post_task(
     57                 self._callback_mocker._callback_with_arguments, arg)
     58         self._run_task_loop(2)
     59         self._mox.VerifyAll()
     60 
     61 
     62     def test_post_task_after_delay(self):
     63         """Post a task with some delay and check that the delay is respected."""
     64         start_time = time.time()
     65         self.time = start_time
     66         self._task_loop.post_task_after_delay(self._callback_set_time, 3000)
     67         self._run_task_loop(5)
     68         delayed_time = self.time - start_time
     69         self.assertGreaterEqual(delayed_time, 3)
     70 
     71 
     72     def test_post_repeated_task(self):
     73         """Post a repeated task and check it gets dispatched multiple times."""
     74         self.count = 0
     75         self._task_loop.post_repeated_task(self._callback_increment_count, 1000)
     76         self._run_task_loop(5)
     77         self.assertGreaterEqual(self.count, 3)
     78 
     79 
     80     def test_ignore_delays(self):
     81         """Post a task and test ignore_delays mode."""
     82         self._task_loop.ignore_delays = False
     83 
     84         self._task_loop.post_task_after_delay(self._callback_mocker._callback,
     85                                               10000)
     86         # Not enough time to dispatch the posted task
     87         self._run_task_loop(1)
     88         self._mox.VerifyAll()
     89 
     90 
     91     def test_cancel_posted_task(self):
     92         """Test that a cancelled task is not dispatched."""
     93         post_id = self._task_loop.post_task_after_delay(
     94                 self._callback_mocker._callback,
     95                 2000)
     96         self._task_loop.post_task(self._callback_cancel_task, post_id)
     97         self._run_task_loop(3)
     98         self._mox.VerifyAll()
     99 
    100 
    101     def test_multiple_cancels(self):
    102         """Test that successive cancels after a successful cancel fail."""
    103         post_id = self._task_loop.post_task_after_delay(
    104                 self._callback_mocker._callback,
    105                 2000)
    106         self._task_loop.post_task(self._callback_cancel_task, post_id)
    107         self._task_loop.post_task(self._callback_cancel_cancelled_task, post_id)
    108         self._run_task_loop(3)
    109         self._mox.VerifyAll()
    110 
    111 
    112     def test_random_delays(self):
    113         """Test that random delays works (sort of). This test could be flaky."""
    114         # Warning: This test could be flaky. Add more differences?
    115         self.count = 0
    116         self.times = {}
    117         self._task_loop.random_delays = True
    118         self._task_loop.max_random_delay_ms = 1000
    119         self._task_loop.post_repeated_task(self._callback_record_times, 500)
    120         self._run_task_loop(5)
    121         self.assertGreaterEqual(self.count, 4)
    122         # Test that not all time gaps are almost the same
    123         diff1 = round(self.times[1] - self.times[0], 3)
    124         diff2 = round(self.times[2] - self.times[1], 3)
    125         diff3 = round(self.times[3] - self.times[2], 3)
    126         self.assertTrue(diff1 != diff2 or diff2 != diff3 or diff3 != diff1)
    127 
    128     # ##########################################################################
    129     # Helper functions
    130 
    131     def _stop_task_loop(self):
    132         print('Stopping task_loop.')
    133         self._task_loop.stop()
    134 
    135     def _run_task_loop(self, run_for_seconds):
    136         """
    137         Runs the task loop for |run_for_seconds| seconds. This function is
    138         blocking, so the main thread will return only after |run_for_seconds|.
    139         """
    140         # post a task to stop the task loop.
    141         glib.timeout_add(run_for_seconds*1000, self._stop_task_loop)
    142         self._task_loop.start()
    143         # We will continue only when the stop task has been executed.
    144 
    145     # ##########################################################################
    146     # Callbacks for tests
    147 
    148     def _callback(self):
    149         print('Actual TaskLoopTestCase._callback called!')
    150 
    151 
    152     def _callback_set_attribute(self):
    153         self.flag = True
    154 
    155 
    156     def _callback_set_time(self):
    157         self.time = time.time()
    158 
    159 
    160     def _callback_increment_count(self):
    161         self.count = self.count + 1
    162 
    163 
    164     def _callback_record_times(self):
    165         self.times[self.count] = time.time()
    166         self.count = self.count + 1
    167 
    168 
    169     def _callback_with_arguments(self, arg):
    170         pass
    171 
    172 
    173     def _callback_cancel_task(self, post_id):
    174         self._task_loop.cancel_posted_task(post_id)
    175 
    176 
    177     def _callback_cancel_cancelled_task(self, post_id):
    178         self.assertFalse(self._task_loop.cancel_posted_task(post_id))
    179 
    180 
    181 if __name__ == '__main__':
    182     logging.basicConfig(level=logging.DEBUG)
    183     unittest.main()
    184