Home | History | Annotate | Download | only in cros
      1 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging
      6 import sys
      7 import threading
      8 import time
      9 from autotest_lib.client.common_lib import error
     10 
     11 
     12 class BaseStressor(threading.Thread):
     13     """
     14     Implements common functionality for *Stressor classes.
     15 
     16     @var stressor: callable which performs a single stress event.
     17     """
     18     def __init__(self, stressor, on_exit=None, escalate_exceptions=True):
     19         """
     20         Initialize the ControlledStressor.
     21 
     22         @param stressor: callable which performs a single stress event.
     23         @param on_exit: callable which will be called when the thread finishes.
     24         @param escalate_exceptions: whether to escalate exceptions to the parent
     25             thread; defaults to True.
     26         """
     27         super(BaseStressor, self).__init__()
     28         self.daemon = True
     29         self.stressor = stressor
     30         self.on_exit = on_exit
     31         self._escalate_exceptions = escalate_exceptions
     32         self._exc_info = None
     33 
     34 
     35     def start(self, start_condition=None, start_timeout_secs=None):
     36         """
     37         Creates a new thread which will call the run() method.
     38 
     39         Optionally takes a wait condition before the stressor loop. Returns
     40         immediately.
     41 
     42         @param start_condition: the new thread will wait until this optional
     43             callable returns True before running the stressor.
     44         @param start_timeout_secs: how long to wait for |start_condition| to
     45             become True, or None to wait forever.
     46         """
     47         self._start_condition = start_condition
     48         self._start_timeout_secs = start_timeout_secs
     49         super(BaseStressor, self).start()
     50 
     51 
     52     def run(self):
     53         """
     54         Wait for |_start_condition|, and then start the stressor loop.
     55 
     56         Overloaded from threading.Thread. This is run in a separate thread when
     57         start() is called.
     58         """
     59         try:
     60             self._wait_for_start_condition()
     61             self._loop_stressor()
     62         except Exception as e:
     63             if self._escalate_exceptions:
     64                 self._exc_info = sys.exc_info()
     65             raise  # Terminates this thread. Caller continues to run.
     66         finally:
     67             if self.on_exit:
     68               self.on_exit()
     69 
     70 
     71     def _wait_for_start_condition(self):
     72         """
     73         Loop until _start_condition() returns True, or _start_timeout_secs
     74         have elapsed.
     75 
     76         @raise error.TestFail if we time out waiting for the start condition
     77         """
     78         if self._start_condition is None:
     79             return
     80 
     81         elapsed_secs = 0
     82         while not self._start_condition():
     83             if (self._start_timeout_secs and
     84                     elapsed_secs >= self._start_timeout_secs):
     85                 raise error.TestFail('start condition did not become true '
     86                                      'within %d seconds' %
     87                                      self._start_timeout_secs)
     88             time.sleep(1)
     89             elapsed_secs += 1
     90 
     91 
     92     def _loop_stressor(self):
     93         """
     94         Apply stressor in a loop.
     95 
     96         Overloaded by the particular *Stressor.
     97         """
     98         raise NotImplementedError
     99 
    100 
    101     def reraise(self):
    102         """
    103         Reraise an exception raised in the thread's stress loop.
    104 
    105         This is a No-op if no exception was raised.
    106         """
    107         if self._exc_info:
    108             exc_info = self._exc_info
    109             self._exc_info = None
    110             raise exc_info[0], exc_info[1], exc_info[2]
    111 
    112 
    113 class ControlledStressor(BaseStressor):
    114     """
    115     Run a stressor in loop on a separate thread.
    116 
    117     Creates a new thread and calls |stressor| in a loop until stop() is called.
    118     """
    119     def __init__(self, stressor, on_exit=None, escalate_exceptions=True):
    120         """
    121         Initialize the ControlledStressor.
    122 
    123         @param stressor: callable which performs a single stress event.
    124         @param on_exit: callable which will be called when the thread finishes.
    125         @param escalate_exceptions: whether to escalate exceptions to the parent
    126             thread when stop() is called; defaults to True.
    127         """
    128         self._complete = threading.Event()
    129         super(ControlledStressor, self).__init__(stressor, on_exit,
    130                                                  escalate_exceptions)
    131 
    132 
    133     def _loop_stressor(self):
    134         """Overloaded from parent."""
    135         iteration_num = 0
    136         while not self._complete.is_set():
    137             iteration_num += 1
    138             logging.info('Stressor iteration: %d', iteration_num)
    139             self.stressor()
    140 
    141 
    142     def start(self, start_condition=None, start_timeout_secs=None):
    143         """Start applying the stressor.
    144 
    145         Overloaded from parent.
    146 
    147         @param start_condition: the new thread will wait to until this optional
    148             callable returns True before running the stressor.
    149         @param start_timeout_secs: how long to wait for |start_condition| to
    150             become True, or None to wait forever.
    151         """
    152         self._complete.clear()
    153         super(ControlledStressor, self).start(start_condition,
    154                                               start_timeout_secs)
    155 
    156 
    157     def stop(self, timeout=45):
    158         """
    159         Stop applying the stressor.
    160 
    161         @param timeout: maximum time to wait for a single run of the stressor to
    162             complete, defaults to 45 seconds.
    163         """
    164         self._complete.set()
    165         self.join(timeout)
    166         self.reraise()
    167 
    168 
    169 class CountedStressor(BaseStressor):
    170     """
    171     Run a stressor in a loop on a separate thread a given number of times.
    172 
    173     Creates a new thread and calls |stressor| in a loop |iterations| times. The
    174     calling thread can use wait() to block until the loop completes. If the
    175     stressor thread terminates with an exception, wait() will propagate that
    176     exception to the thread that called wait().
    177     """
    178     def _loop_stressor(self):
    179         """Overloaded from parent."""
    180         for iteration_num in xrange(1, self._iterations + 1):
    181             logging.info('Stressor iteration: %d of %d',
    182                          iteration_num, self._iterations)
    183             self.stressor()
    184 
    185 
    186     def start(self, iterations, start_condition=None, start_timeout_secs=None):
    187         """
    188         Apply the stressor a given number of times.
    189 
    190         Overloaded from parent.
    191 
    192         @param iterations: number of times to apply the stressor.
    193         @param start_condition: the new thread will wait to until this optional
    194             callable returns True before running the stressor.
    195         @param start_timeout_secs: how long to wait for |start_condition| to
    196             become True, or None to wait forever.
    197         """
    198         self._iterations = iterations
    199         super(CountedStressor, self).start(start_condition, start_timeout_secs)
    200 
    201 
    202     def wait(self, timeout=None):
    203         """Wait until the stressor completes.
    204 
    205         @param timeout: maximum time for the thread to complete, by default
    206             never times out.
    207         """
    208         self.join(timeout)
    209         self.reraise()
    210