Home | History | Annotate | Download | only in bestflags
      1 # Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 """Steering stage unittest.
      5 
      6 Part of the Chrome build flags optimization.
      7 """
      8 
      9 __author__ = 'yuhenglong (at] google.com (Yuheng Long)'
     10 
     11 import multiprocessing
     12 import unittest
     13 
     14 from generation import Generation
     15 from mock_task import IdentifierMockTask
     16 import pipeline_process
     17 import steering
     18 
# Arbitrary stage identifier handed to every IdentifierMockTask built by these
# tests; the original author picked the integer at random.
STEERING_TEST_STAGE = -8

# The number of generations chained together in testSteering.
NUMBER_OF_GENERATIONS = 20

# The number of tasks placed in each generation under test.
NUMBER_OF_TASKS = 20

# The stride of the permutation used to shuffle the input list of tasks. Should
# be relatively prime with NUMBER_OF_TASKS.
# NOTE(review): STRIDE is not referenced by any test visible in this module --
# confirm whether it is still needed.
STRIDE = 7
     31 
     32 
     33 class MockGeneration(Generation):
     34   """This class emulates an actual generation.
     35 
     36   It will output the next_generations when the method Next is called. The
     37   next_generations is initiated when the MockGeneration instance is constructed.
     38   """
     39 
     40   def __init__(self, tasks, next_generations):
     41     """Set up the next generations for this task.
     42 
     43     Args:
     44       tasks: A set of tasks to be run.
     45       next_generations: A list of generations as the next generation of the
     46         current generation.
     47     """
     48     Generation.__init__(self, tasks, None)
     49     self._next_generations = next_generations
     50 
     51   def Next(self, _):
     52     return self._next_generations
     53 
     54   def IsImproved(self):
     55     if self._next_generations:
     56       return True
     57     return False
     58 
     59 
     60 class SteeringTest(unittest.TestCase):
     61   """This class test the steering method.
     62 
     63   The steering algorithm should return if there is no new task in the initial
     64   generation. The steering algorithm should send all the tasks to the next stage
     65   and should terminate once there is no pending generation. A generation is
     66   pending if it contains pending task. A task is pending if its (test) result
     67   is not ready.
     68   """
     69 
     70   def testSteering(self):
     71     """Test that the steering algorithm processes all the tasks properly.
     72 
     73     Test that the steering algorithm sends all the tasks to the next stage. Test
     74     that the steering algorithm terminates once all the tasks have been
     75     processed, i.e., the results for the tasks are all ready.
     76     """
     77 
     78     # A list of generations used to test the steering stage.
     79     generations = []
     80 
     81     task_index = 0
     82     previous_generations = None
     83 
     84     # Generate a sequence of generations to be tested. Each generation will
     85     # output the next generation in reverse order of the list when the "Next"
     86     # method is called.
     87     for _ in range(NUMBER_OF_GENERATIONS):
     88       # Use a consecutive sequence of numbers as identifiers for the set of
     89       # tasks put into a generation.
     90       test_ranges = range(task_index, task_index + NUMBER_OF_TASKS)
     91       tasks = [IdentifierMockTask(STEERING_TEST_STAGE, t) for t in test_ranges]
     92       steering_tasks = set(tasks)
     93 
     94       # Let the previous generation as the offspring generation of the current
     95       # generation.
     96       current_generation = MockGeneration(steering_tasks, previous_generations)
     97       generations.insert(0, current_generation)
     98       previous_generations = [current_generation]
     99 
    100       task_index += NUMBER_OF_TASKS
    101 
    102     # If there is no generation at all, the unittest returns right away.
    103     if not current_generation:
    104       return
    105 
    106     # Set up the input and result queue for the steering method.
    107     manager = multiprocessing.Manager()
    108     input_queue = manager.Queue()
    109     result_queue = manager.Queue()
    110 
    111     steering_process = multiprocessing.Process(
    112         target=steering.Steering,
    113         args=(set(), [current_generation], input_queue, result_queue))
    114     steering_process.start()
    115 
    116     # Test that each generation is processed properly. I.e., the generations are
    117     # processed in order.
    118     while generations:
    119       generation = generations.pop(0)
    120       tasks = [task for task in generation.Pool()]
    121 
    122       # Test that all the tasks are processed once and only once.
    123       while tasks:
    124         task = result_queue.get()
    125 
    126         assert task in tasks
    127         tasks.remove(task)
    128 
    129         input_queue.put(task)
    130 
    131     task = result_queue.get()
    132 
    133     # Test that the steering algorithm returns properly after processing all
    134     # the generations.
    135     assert task == pipeline_process.POISONPILL
    136 
    137     steering_process.join()
    138 
    139   def testCache(self):
    140     """The steering algorithm returns immediately if there is no new tasks.
    141 
    142     If all the new tasks have been cached before, the steering algorithm does
    143     not have to execute these tasks again and thus can terminate right away.
    144     """
    145 
    146     # Put a set of tasks in the cache and add this set to initial generation.
    147     test_ranges = range(NUMBER_OF_TASKS)
    148     tasks = [IdentifierMockTask(STEERING_TEST_STAGE, t) for t in test_ranges]
    149     steering_tasks = set(tasks)
    150 
    151     current_generation = MockGeneration(steering_tasks, None)
    152 
    153     # Set up the input and result queue for the steering method.
    154     manager = multiprocessing.Manager()
    155     input_queue = manager.Queue()
    156     result_queue = manager.Queue()
    157 
    158     steering_process = multiprocessing.Process(
    159         target=steering.Steering,
    160         args=(steering_tasks, [current_generation], input_queue, result_queue))
    161 
    162     steering_process.start()
    163 
    164     # Test that the steering method returns right away.
    165     assert result_queue.get() == pipeline_process.POISONPILL
    166     steering_process.join()
    167 
    168 
# Run every test case in this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()
    171