Home | History | Annotate | Download | only in scheduler
      1 # Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 
      6 """Thread library for drone management.
      7 
      8 This library contains a threaded task queue capable of starting, monitoring
      9 and syncing threads across remote and localhost drones asynchronously. It also
     10 contains a wrapper for standard python threads that records exceptions so they
     11 can be re-raised in the thread manager. The api exposed by the threaded task
     12 queue is as follows:
     13     1. worker: The staticmethod executed by all worker threads.
     14     2. execute: Takes a list of drones and invokes a worker thread per drone.
     15         This method assumes that all drones have a queue of pending calls
     16         for execution.
     17     3. wait_on_drones: Waits for all worker threads started by execute to finish
     18         and raises any exceptions as a consolidated DroneTaskQueueException.
     19     4. get_results: Returns the results of all threads as a dictionary keyed
     20         on the drones.
     21 """
     22 
     23 import collections
     24 import Queue
     25 import threading
     26 import logging
     27 
     28 import common
     29 from autotest_lib.client.common_lib.cros.graphite import autotest_stats
     30 from autotest_lib.scheduler import drone_task_queue
     31 
     32 
     33 class ExceptionRememberingThread(threading.Thread):
     34     """A wrapper around regular python threads that records exceptions."""
     35 
     36     def run(self):
     37         """Wrapper around the thread's run method."""
     38         try:
     39             with autotest_stats.Timer(self.name):
     40                 super(ExceptionRememberingThread, self).run()
     41         except Exception as self.err:
     42             logging.error('%s raised an exception that will be re-raised by '
     43                           'the thread pool manager.', self.getName())
     44         else:
     45             self.err = None
     46 
     47 
     48 class PersistentTimer(object):
     49     """A class to handle timers across local scopes."""
     50 
     51     def __init__(self, name):
     52         """Initialize a persistent timer.
     53 
     54         @param name: The name/key to insert timings under.
     55         """
     56         self.name = name
     57         self.timer = None
     58 
     59 
     60     def start(self):
     61         """Create and start a new timer."""
     62         self.timer = autotest_stats.Timer(self.name)
     63         self.timer.start()
     64 
     65 
     66     def stop(self):
     67         """Stop a previously started timer."""
     68         try:
     69             self.timer.stop()
     70         except (AssertionError, AttributeError) as e:
     71             logging.info('Stopping timer %s failed: %s', self.name, e)
     72         finally:
     73             self.timer = None
     74 
     75 
     76 class ThreadedTaskQueue(drone_task_queue.DroneTaskQueue):
     77     """Threaded implementation of a drone task queue."""
     78 
     79     result = collections.namedtuple('task', ['drone', 'results'])
     80 
     81     def __init__(self, name='thread_queue'):
     82         self.results_queue = Queue.Queue()
     83         self.drone_threads = {}
     84         self.name = name
     85         # The persistent timer is used to measure net time spent
     86         # refreshing all drones across 'execute' and 'get_results'.
     87         self.timer = PersistentTimer(self.name)
     88 
     89 
     90     @staticmethod
     91     def worker(drone, results_queue):
     92         """Worker for task execution.
     93 
     94         Execute calls queued against the given drone and place the return value
     95         in results_queue.
     96 
     97         @param drone: A drone with calls to execute.
     98         @param results_queue: A queue, into which the worker places
     99             ThreadedTaskQueue.result from the drone calls.
    100         """
    101         logging.info('(Worker.%s) starting.', drone.hostname)
    102         results_queue.put(ThreadedTaskQueue.result(
    103             drone, drone.execute_queued_calls()))
    104         logging.info('(Worker.%s) finished.', drone.hostname)
    105 
    106 
    107     def wait_on_drones(self):
    108         """Wait on all threads that are currently refreshing a drone.
    109 
    110         @raises DroneTaskQueueException: Consolidated exception for all
    111             drone thread exceptions.
    112         """
    113         if not self.drone_threads:
    114             return
    115         # TODO: Make this process more resilient. We can:
    116         # 1. Timeout the join.
    117         # 2. Kick out the exception/timeout drone.
    118         # 3. Selectively retry exceptions.
    119         # For now, it is compliant with the single threaded drone manager which
    120         # will raise all drone_utility, ssh and drone_manager exceptions.
    121         drone_exceptions = []
    122         for drone, thread in self.drone_threads.iteritems():
    123             tname = thread.getName()
    124             logging.info('(Task Queue) Waiting for %s', tname)
    125             thread.join()
    126             if thread.err:
    127                 drone_exceptions.append((drone, thread.err))
    128         logging.info('(Task Queue) All threads have returned, clearing map.')
    129         self.drone_threads = {}
    130         if not drone_exceptions:
    131             return
    132         exception_msg = ''
    133         for drone, err in drone_exceptions:
    134             exception_msg += ('Drone %s raised Exception %s\n' %
    135                               (drone.hostname, err))
    136         raise drone_task_queue.DroneTaskQueueException(exception_msg)
    137 
    138 
    139     def get_results(self):
    140         """Get a results dictionary keyed on the drones.
    141 
    142         This method synchronously waits till all drone threads have returned
    143         before checking for results. It is meant to be invoked in conjunction
    144         with the 'execute' method, which creates a thread per drone.
    145 
    146         @return: A dictionary of return values from the drones.
    147         """
    148         self.wait_on_drones()
    149         self.timer.stop()
    150         results = {}
    151         while not self.results_queue.empty():
    152             drone_results = self.results_queue.get()
    153             if drone_results.drone in results:
    154                 raise drone_task_queue.DroneTaskQueueException(
    155                         'Task queue has recorded results for drone %s: %s' %
    156                         (drone_results.drone, results))
    157             results[drone_results.drone] = drone_results.results
    158         return results
    159 
    160 
    161     def execute(self, drones, wait=True):
    162         """Invoke a thread per drone, to execute drone_utility in parallel.
    163 
    164         @param drones: A list of drones with calls to execute.
    165         @param wait: If True, this method will only return when all the drones
    166             have returned the result of their respective invocations of
    167             drone_utility. The results_queue and drone_threads will be cleared.
    168             If False, the caller must clear both the queue and the map before
    169             the next invocation of 'execute', by calling 'get_results'.
    170 
    171         @return: A dictionary keyed on the drones, containing a list of return
    172             values from the execution of drone_utility.
    173 
    174         @raises DroneManagerError: If the results queue or drone map isn't empty
    175             at the time of invocation.
    176         """
    177         if not self.results_queue.empty():
    178             raise drone_task_queue.DroneTaskQueueException(
    179                     'Cannot clobber results queue: %s, it should be cleared '
    180                     'through get_results.' % self.results_queue)
    181         if self.drone_threads:
    182             raise drone_task_queue.DroneTaskQueueException(
    183                     'Cannot clobber thread map: %s, it should be cleared '
    184                     'through wait_on_drones' % self.drone_threads)
    185         self.timer.start()
    186         for drone in drones:
    187             if not drone.get_calls():
    188                 continue
    189             worker_thread = ExceptionRememberingThread(
    190                     target=ThreadedTaskQueue.worker,
    191                     args=(drone, self.results_queue))
    192             # None of these threads are allowed to survive past the tick they
    193             # were spawned in, and the scheduler won't die mid-tick, so none
    194             # of the threads need to be daemons. However, if the scheduler does
    195             # die unexpectedly we can just forsake the daemon threads.
    196             self.drone_threads[drone] = worker_thread
    197             # The name is only used for debugging
    198             worker_thread.setName('%s.%s' %
    199                                   (self.name, drone.hostname.replace('.', '_')))
    200             worker_thread.daemon = True
    201             worker_thread.start()
    202         return self.get_results() if wait else None
    203