Home | History | Annotate | Download | only in scheduler
      1 # Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 
      6 """Thread library for drone management.
      7 
      8 This library contains a threaded task queue capable of starting, monitoring
      9 and syncing threads across remote and localhost drones asynchronously. It also
     10 contains a wrapper for standard python threads that records exceptions so they
     11 can be re-raised in the thread manager. The api exposed by the threaded task
     12 queue is as follows:
     13     1. worker: The staticmethod executed by all worker threads.
     14     2. execute: Takes a list of drones and invokes a worker thread per drone.
     15         This method assumes that all drones have a queue of pending calls
     16         for execution.
     17     3. wait_on_drones: Waits for all worker threads started by execute to finish
     18         and raises any exceptions as a consolidated DroneTaskQueueException.
     19     4. get_results: Returns the results of all threads as a dictionary keyed
     20         on the drones.
     21 """
     22 
     23 import collections
     24 import Queue
     25 import threading
     26 import logging
     27 
     28 import common
     29 from autotest_lib.scheduler import drone_task_queue
     30 
     31 
     32 class ExceptionRememberingThread(threading.Thread):
     33     """A wrapper around regular python threads that records exceptions."""
     34 
     35     def run(self):
     36         """Wrapper around the thread's run method."""
     37         try:
     38             super(ExceptionRememberingThread, self).run()
     39         except Exception as self.err:
     40             logging.error('%s raised an exception that will be re-raised by '
     41                           'the thread pool manager.', self.getName())
     42         else:
     43             self.err = None
     44 
     45 
     46 class ThreadedTaskQueue(drone_task_queue.DroneTaskQueue):
     47     """Threaded implementation of a drone task queue."""
     48 
     49     result = collections.namedtuple('task', ['drone', 'results'])
     50 
     51     def __init__(self, name='thread_queue'):
     52         self.results_queue = Queue.Queue()
     53         self.drone_threads = {}
     54         self.name = name
     55 
     56 
     57     @staticmethod
     58     def worker(drone, results_queue):
     59         """Worker for task execution.
     60 
     61         Execute calls queued against the given drone and place the return value
     62         in results_queue.
     63 
     64         @param drone: A drone with calls to execute.
     65         @param results_queue: A queue, into which the worker places
     66             ThreadedTaskQueue.result from the drone calls.
     67         """
     68         logging.info('(Worker.%s) starting.', drone.hostname)
     69         results_queue.put(ThreadedTaskQueue.result(
     70             drone, drone.execute_queued_calls()))
     71         logging.info('(Worker.%s) finished.', drone.hostname)
     72 
     73 
     74     def wait_on_drones(self):
     75         """Wait on all threads that are currently refreshing a drone.
     76 
     77         @raises DroneTaskQueueException: Consolidated exception for all
     78             drone thread exceptions.
     79         """
     80         if not self.drone_threads:
     81             return
     82         # TODO: Make this process more resilient. We can:
     83         # 1. Timeout the join.
     84         # 2. Kick out the exception/timeout drone.
     85         # 3. Selectively retry exceptions.
     86         # For now, it is compliant with the single threaded drone manager which
     87         # will raise all drone_utility, ssh and drone_manager exceptions.
     88         drone_exceptions = []
     89         for drone, thread in self.drone_threads.iteritems():
     90             tname = thread.getName()
     91             logging.info('(Task Queue) Waiting for %s', tname)
     92             thread.join()
     93             if thread.err:
     94                 drone_exceptions.append((drone, thread.err))
     95         logging.info('(Task Queue) All threads have returned, clearing map.')
     96         self.drone_threads = {}
     97         if not drone_exceptions:
     98             return
     99         exception_msg = ''
    100         for drone, err in drone_exceptions:
    101             exception_msg += ('Drone %s raised Exception %s\n' %
    102                               (drone.hostname, err))
    103         raise drone_task_queue.DroneTaskQueueException(exception_msg)
    104 
    105 
    106     def get_results(self):
    107         """Get a results dictionary keyed on the drones.
    108 
    109         This method synchronously waits till all drone threads have returned
    110         before checking for results. It is meant to be invoked in conjunction
    111         with the 'execute' method, which creates a thread per drone.
    112 
    113         @return: A dictionary of return values from the drones.
    114         """
    115         self.wait_on_drones()
    116         results = {}
    117         while not self.results_queue.empty():
    118             drone_results = self.results_queue.get()
    119             if drone_results.drone in results:
    120                 raise drone_task_queue.DroneTaskQueueException(
    121                         'Task queue has recorded results for drone %s: %s' %
    122                         (drone_results.drone, results))
    123             results[drone_results.drone] = drone_results.results
    124         return results
    125 
    126 
    127     def execute(self, drones, wait=True):
    128         """Invoke a thread per drone, to execute drone_utility in parallel.
    129 
    130         @param drones: A list of drones with calls to execute.
    131         @param wait: If True, this method will only return when all the drones
    132             have returned the result of their respective invocations of
    133             drone_utility. The results_queue and drone_threads will be cleared.
    134             If False, the caller must clear both the queue and the map before
    135             the next invocation of 'execute', by calling 'get_results'.
    136 
    137         @return: A dictionary keyed on the drones, containing a list of return
    138             values from the execution of drone_utility.
    139 
    140         @raises DroneManagerError: If the results queue or drone map isn't empty
    141             at the time of invocation.
    142         """
    143         if not self.results_queue.empty():
    144             raise drone_task_queue.DroneTaskQueueException(
    145                     'Cannot clobber results queue: %s, it should be cleared '
    146                     'through get_results.' % self.results_queue)
    147         if self.drone_threads:
    148             raise drone_task_queue.DroneTaskQueueException(
    149                     'Cannot clobber thread map: %s, it should be cleared '
    150                     'through wait_on_drones' % self.drone_threads)
    151         for drone in drones:
    152             if not drone.get_calls():
    153                 continue
    154             worker_thread = ExceptionRememberingThread(
    155                     target=ThreadedTaskQueue.worker,
    156                     args=(drone, self.results_queue))
    157             # None of these threads are allowed to survive past the tick they
    158             # were spawned in, and the scheduler won't die mid-tick, so none
    159             # of the threads need to be daemons. However, if the scheduler does
    160             # die unexpectedly we can just forsake the daemon threads.
    161             self.drone_threads[drone] = worker_thread
    162             # The name is only used for debugging
    163             worker_thread.setName('%s.%s' %
    164                                   (self.name, drone.hostname.replace('.', '_')))
    165             worker_thread.daemon = True
    166             worker_thread.start()
    167         return self.get_results() if wait else None
    168