Home | History | Annotate | Download | only in bestflags
      1 # Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 """The pipeline_worker functions of the build and test stage of the framework.
      5 
      6 Part of the Chrome build flags optimization.
      7 
      8 This module defines the helper and the worker. If there are duplicate tasks, for
      9 example t1 and t2, needs to be built/tested, one of them, for example t1, will
     10 be built/tested and the helper waits for the result of t1 and set the results of
     11 the other task, t2 here, to be the same as that of t1. Setting the result of t2
     12 to be the same as t1 is referred to as resolving the result of t2.
     13 The worker invokes the work method of the tasks that are not duplicate.
     14 """
     15 
     16 __author__ = 'yuhenglong (at] google.com (Yuheng Long)'
     17 
     18 import pipeline_process
     19 
     20 
     21 def Helper(stage, done_dict, helper_queue, completed_queue, result_queue):
     22   """Helper that filters duplicate tasks.
     23 
     24   This method Continuously pulls duplicate tasks from the helper_queue. The
     25   duplicate tasks need not be compiled/tested. This method also pulls completed
     26   tasks from the worker queue and let the results of the duplicate tasks be the
     27   same as their corresponding finished task.
     28 
     29   Args:
     30     stage: The current stage of the pipeline, for example, build stage or test
     31       stage.
     32     done_dict: A dictionary of tasks that are done. The key of the dictionary is
     33       the identifier of the task. The value of the dictionary is the results of
     34       performing the corresponding task.
     35     helper_queue: A queue of duplicate tasks whose results need to be resolved.
     36       This is a communication channel between the pipeline_process and this
     37       helper process.
     38     completed_queue: A queue of tasks that have been built/tested. The results
     39       of these tasks are needed to resolve the results of the duplicate tasks.
     40       This is the communication channel between the workers and this helper
     41       process.
     42     result_queue: After the results of the duplicate tasks have been resolved,
     43       the duplicate tasks will be sent to the next stage via this queue.
     44   """
     45 
     46   # The list of duplicate tasks, the results of which need to be resolved.
     47   waiting_list = []
     48 
     49   while True:
     50     # Pull duplicate task from the helper queue.
     51     if not helper_queue.empty():
     52       task = helper_queue.get()
     53 
     54       if task == pipeline_process.POISONPILL:
     55         # Poison pill means no more duplicate task from the helper queue.
     56         break
     57 
     58       # The task has not been performed before.
     59       assert not task.Done(stage)
     60 
     61       # The identifier of this task.
     62       identifier = task.GetIdentifier(stage)
     63 
     64       # If a duplicate task comes before the corresponding resolved results from
     65       # the completed_queue, it will be put in the waiting list. If the result
     66       # arrives before the duplicate task, the duplicate task will be resolved
     67       # right away.
     68       if identifier in done_dict:
     69         # This task has been encountered before and the result is available. The
     70         # result can be resolved right away.
     71         task.SetResult(stage, done_dict[identifier])
     72         result_queue.put(task)
     73       else:
     74         waiting_list.append(task)
     75 
     76     # Check and get completed tasks from completed_queue.
     77     GetResultFromCompletedQueue(stage, completed_queue, done_dict, waiting_list,
     78                                 result_queue)
     79 
     80   # Wait to resolve the results of the remaining duplicate tasks.
     81   while waiting_list:
     82     GetResultFromCompletedQueue(stage, completed_queue, done_dict, waiting_list,
     83                                 result_queue)
     84 
     85 
     86 def GetResultFromCompletedQueue(stage, completed_queue, done_dict, waiting_list,
     87                                 result_queue):
     88   """Pull results from the completed queue and resolves duplicate tasks.
     89 
     90   Args:
     91     stage: The current stage of the pipeline, for example, build stage or test
     92       stage.
     93     completed_queue: A queue of tasks that have been performed. The results of
     94       these tasks are needed to resolve the results of the duplicate tasks. This
     95       is the communication channel between the workers and this method.
     96     done_dict: A dictionary of tasks that are done. The key of the dictionary is
     97       the optimization flags of the task. The value of the dictionary is the
     98       compilation results of the corresponding task.
     99     waiting_list: The list of duplicate tasks, the results of which need to be
    100       resolved.
    101     result_queue: After the results of the duplicate tasks have been resolved,
    102       the duplicate tasks will be sent to the next stage via this queue.
    103 
    104   This helper method tries to pull a completed task from the completed queue.
    105   If it gets a task from the queue, it resolves the results of all the relevant
    106   duplicate tasks in the waiting list. Relevant tasks are the tasks that have
    107   the same flags as the currently received results from the completed_queue.
    108   """
    109   # Pull completed task from the worker queue.
    110   if not completed_queue.empty():
    111     (identifier, result) = completed_queue.get()
    112     done_dict[identifier] = result
    113 
    114     tasks = [t for t in waiting_list if t.GetIdentifier(stage) == identifier]
    115     for duplicate_task in tasks:
    116       duplicate_task.SetResult(stage, result)
    117       result_queue.put(duplicate_task)
    118       waiting_list.remove(duplicate_task)
    119 
    120 
    121 def Worker(stage, task, helper_queue, result_queue):
    122   """Worker that performs the task.
    123 
    124   This method calls the work method of the input task and distribute the result
    125   to the helper and the next stage.
    126 
    127   Args:
    128     stage: The current stage of the pipeline, for example, build stage or test
    129       stage.
    130     task: Input task that needs to be performed.
    131     helper_queue: Queue that holds the completed tasks and the results. This is
    132       the communication channel between the worker and the helper.
    133     result_queue: Queue that holds the completed tasks and the results. This is
    134       the communication channel between the worker and the next stage.
    135   """
    136 
    137   # The task has not been completed before.
    138   assert not task.Done(stage)
    139 
    140   task.Work(stage)
    141   helper_queue.put((task.GetIdentifier(stage), task.GetResult(stage)))
    142   result_queue.put(task)
    143