Home | History | Annotate | Download | only in bestflags
      1 # Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 """Pipeline process that encapsulates the actual content.
      5 
      6 Part of the Chrome build flags optimization.
      7 
      8 The actual stages include the builder and the executor.
      9 """
     10 
     11 __author__ = 'yuhenglong (at] google.com (Yuheng Long)'
     12 
     13 import multiprocessing
     14 
     15 # Pick an integer at random.
     16 POISONPILL = 975
     17 
     18 
     19 class PipelineProcess(multiprocessing.Process):
     20   """A process that encapsulates the actual content pipeline stage.
     21 
     22   The actual pipeline stage can be the builder or the tester.  This process
     23   continuously pull tasks from the queue until a poison pill is received.
     24   Once a job is received, it will hand it to the actual stage for processing.
     25 
     26   Each pipeline stage contains three modules.
     27   The first module continuously pulls task from the input queue. It searches the
     28   cache to check whether the task has encountered before. If so, duplicate
     29   computation can be avoided.
     30   The second module consists of a pool of workers that do the actual work, e.g.,
     31   the worker will compile the source code and get the image in the builder
     32   pipeline stage.
     33   The third module is a helper that put the result cost to the cost field of the
     34   duplicate tasks. For example, if two tasks are equivalent, only one task, say
     35   t1 will be executed and the other task, say t2 will not be executed. The third
     36   mode gets the result from t1, when it is available and set the cost of t2 to
     37   be the same as that of t1.
     38   """
     39 
     40   def __init__(self, num_processes, name, cache, stage, task_queue, helper,
     41                worker, result_queue):
     42     """Set up input/output queue and the actual method to be called.
     43 
     44     Args:
     45       num_processes: Number of helpers subprocessors this stage has.
     46       name: The name of this stage.
     47       cache: The computed tasks encountered before.
     48       stage: An int value that specifies the stage for this pipeline stage, for
     49         example, build stage or test stage. This value will be used to retrieve
     50         the keys in different stage. I.e., the flags set is the key in build
     51         stage and the checksum is the key in the test stage. The key is used to
     52         detect duplicates.
     53       task_queue: The input task queue for this pipeline stage.
     54       helper: The method hosted by the helper module to fill up the cost of the
     55         duplicate tasks.
     56       worker: The method hosted by the worker pools to do the actual work, e.g.,
     57         compile the image.
     58       result_queue: The output task queue for this pipeline stage.
     59     """
     60 
     61     multiprocessing.Process.__init__(self)
     62 
     63     self._name = name
     64     self._task_queue = task_queue
     65     self._result_queue = result_queue
     66 
     67     self._helper = helper
     68     self._worker = worker
     69 
     70     self._cache = cache
     71     self._stage = stage
     72     self._num_processes = num_processes
     73 
     74     # the queues used by the modules for communication
     75     manager = multiprocessing.Manager()
     76     self._helper_queue = manager.Queue()
     77     self._work_queue = manager.Queue()
     78 
     79   def run(self):
     80     """Busy pulling the next task from the queue for execution.
     81 
     82     Once a job is pulled, this stage invokes the actual stage method and submits
     83     the result to the next pipeline stage.
     84 
     85     The process will terminate on receiving the poison pill from previous stage.
     86     """
     87 
     88     # the worker pool
     89     work_pool = multiprocessing.Pool(self._num_processes)
     90 
     91     # the helper process
     92     helper_process = multiprocessing.Process(
     93         target=self._helper,
     94         args=(self._stage, self._cache, self._helper_queue, self._work_queue,
     95               self._result_queue))
     96     helper_process.start()
     97     mycache = self._cache.keys()
     98 
     99     while True:
    100       task = self._task_queue.get()
    101       if task == POISONPILL:
    102         # Poison pill means shutdown
    103         self._result_queue.put(POISONPILL)
    104         break
    105 
    106       task_key = task.GetIdentifier(self._stage)
    107       if task_key in mycache:
    108         # The task has been encountered before. It will be sent to the helper
    109         # module for further processing.
    110         self._helper_queue.put(task)
    111       else:
    112         # Let the workers do the actual work.
    113         work_pool.apply_async(
    114             self._worker,
    115             args=(self._stage, task, self._work_queue, self._result_queue))
    116         mycache.append(task_key)
    117 
    118     # Shutdown the workers pool and the helper process.
    119     work_pool.close()
    120     work_pool.join()
    121 
    122     self._helper_queue.put(POISONPILL)
    123     helper_process.join()
    124