Home | History | Annotate | Download | only in bestflags
      1 # Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 """Pipeline Process unittest.
      5 
      6 Part of the Chrome build flags optimization.
      7 """
      8 
      9 __author__ = 'yuhenglong (at] google.com (Yuheng Long)'
     10 
     11 import multiprocessing
     12 import unittest
     13 
     14 from mock_task import MockTask
     15 import pipeline_process
     16 
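# Arbitrary sentinel (any integer works): MockHelper puts it on the result
# queue when it receives a task that is already in done_dict.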
     18 ERROR = -334
# Arbitrary stage id (any integer works): handed to the PipelineProcess under
# test and asserted by MockHelper and MockWorker.
     20 TEST_STAGE = -8
     21 
     22 
     23 def MockHelper(stage, done_dict, helper_queue, _, result_queue):
     24   """This method echos input to the output."""
     25 
     26   assert stage == TEST_STAGE
     27   while True:
     28     if not helper_queue.empty():
     29       task = helper_queue.get()
     30       if task == pipeline_process.POISONPILL:
     31         # Poison pill means shutdown
     32         break
     33 
     34       if task in done_dict:
     35         # verify that it does not get duplicate "1"s in the test.
     36         result_queue.put(ERROR)
     37       else:
     38         result_queue.put(('helper', task.GetIdentifier(TEST_STAGE)))
     39 
     40 
     41 def MockWorker(stage, task, _, result_queue):
     42   assert stage == TEST_STAGE
     43   result_queue.put(('worker', task.GetIdentifier(TEST_STAGE)))
     44 
     45 
     46 class PipelineProcessTest(unittest.TestCase):
     47   """This class test the PipelineProcess.
     48 
     49   All the task inserted into the input queue should be taken out and hand to the
     50   actual pipeline handler, except for the POISON_PILL.  All these task should
     51   also be passed to the next pipeline stage via the output queue.
     52   """
     53 
     54   def testRun(self):
     55     """Test the run method.
     56 
     57     Ensure that all the tasks inserted into the queue are properly handled.
     58     """
     59 
     60     manager = multiprocessing.Manager()
     61     inp = manager.Queue()
     62     output = manager.Queue()
     63 
     64     process = pipeline_process.PipelineProcess(
     65         2, 'testing', {}, TEST_STAGE, inp, MockHelper, MockWorker, output)
     66 
     67     process.start()
     68     inp.put(MockTask(TEST_STAGE, 1))
     69     inp.put(MockTask(TEST_STAGE, 1))
     70     inp.put(MockTask(TEST_STAGE, 2))
     71     inp.put(pipeline_process.POISONPILL)
     72     process.join()
     73 
     74     # All tasks are processed once and only once.
     75     result = [('worker', 1), ('helper', 1), ('worker', 2),
     76               pipeline_process.POISONPILL]
     77     while result:
     78       task = output.get()
     79 
     80       # One "1"s is passed to the worker and one to the helper.
     81       self.assertNotEqual(task, ERROR)
     82 
     83       # The messages received should be exactly the same as the result.
     84       self.assertTrue(task in result)
     85       result.remove(task)
     86 
     87 
# Run every test in this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()
     90