Home | History | Annotate | Download | only in dynamic_suite
      1 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging
      6 import math
      7 import threading
      8 
      9 import common
     10 from autotest_lib.client.common_lib import error
     11 from autotest_lib.client.common_lib.cros import retry
     12 from autotest_lib.frontend.afe.json_rpc import proxy
     13 from autotest_lib.server import frontend
     14 try:
     15     from chromite.lib import retry_util
     16     from chromite.lib import timeout_util
     17 except ImportError:
     18     logging.warn('Unable to import chromite.')
     19     retry_util = None
     20     timeout_util = None
     21 
     22 
     23 def convert_timeout_to_retry(backoff, timeout_min, delay_sec):
     24     """Compute the number of retry attempts for use with chromite.retry_util.
     25 
     26     @param backoff: The exponential backoff factor.
     27     @param timeout_min: The maximum amount of time (in minutes) to sleep.
     28     @param delay_sec: The amount to sleep (in seconds) between each attempt.
     29 
     30     @return: The number of retry attempts in the case of exponential backoff.
     31     """
     32     # Estimate the max_retry in the case of exponential backoff:
     33     # => total_sleep = sleep*sum(r=0..max_retry-1, backoff^r)
     34     # => total_sleep = sleep( (1-backoff^max_retry) / (1-backoff) )
     35     # => max_retry*ln(backoff) = ln(1-(total_sleep/sleep)*(1-backoff))
     36     # => max_retry = ln(1-(total_sleep/sleep)*(1-backoff))/ln(backoff)
     37     total_sleep = timeout_min * 60
     38     numerator = math.log10(1-(total_sleep/delay_sec)*(1-backoff))
     39     denominator = math.log10(backoff)
     40     return int(math.ceil(numerator/denominator))
     41 
     42 
     43 class RetryingAFE(frontend.AFE):
     44     """Wrapper around frontend.AFE that retries all RPCs.
     45 
     46     Timeout for retries and delay between retries are configurable.
     47     """
     48     def __init__(self, timeout_min=30, delay_sec=10, **dargs):
     49         """Constructor
     50 
     51         @param timeout_min: timeout in minutes until giving up.
     52         @param delay_sec: pre-jittered delay between retries in seconds.
     53         """
     54         self.timeout_min = timeout_min
     55         self.delay_sec = delay_sec
     56         super(RetryingAFE, self).__init__(**dargs)
     57 
     58 
     59     def set_timeout(self, timeout_min):
     60         """Set timeout minutes for the AFE server.
     61 
     62         @param timeout_min: The timeout minutes for AFE server.
     63         """
     64         self.timeout_min = timeout_min
     65 
     66 
     67     def run(self, call, **dargs):
     68         if retry_util is None:
     69             raise ImportError('Unable to import chromite. Please consider to '
     70                               'run build_externals to build site packages.')
     71         # exc_retry: We retry if this exception is raised.
     72         # blacklist: Exceptions that we raise immediately if caught.
     73         exc_retry = Exception
     74         blacklist = (ImportError, error.RPCException, proxy.JSONRPCException,
     75                      timeout_util.TimeoutError)
     76         backoff = 2
     77         max_retry = convert_timeout_to_retry(backoff, self.timeout_min,
     78                                              self.delay_sec)
     79 
     80         def _run(self, call, **dargs):
     81             return super(RetryingAFE, self).run(call, **dargs)
     82 
     83         def handler(exc):
     84             """Check if exc is an exc_retry or if it's blacklisted.
     85 
     86             @param exc: An exception.
     87 
     88             @return: True if exc is an exc_retry and is not
     89                      blacklisted. False otherwise.
     90             """
     91             is_exc_to_check = isinstance(exc, exc_retry)
     92             is_blacklisted = isinstance(exc, blacklist)
     93             return is_exc_to_check and not is_blacklisted
     94 
     95         # If the call is not in main thread, signal can't be used to abort the
     96         # call. In that case, use a basic retry which does not enforce timeout
     97         # if the process hangs.
     98         @retry.retry(Exception, timeout_min=self.timeout_min,
     99                      delay_sec=self.delay_sec,
    100                      blacklist=[ImportError, error.RPCException,
    101                                 proxy.ValidationError])
    102         def _run_in_child_thread(self, call, **dargs):
    103             return super(RetryingAFE, self).run(call, **dargs)
    104 
    105         if isinstance(threading.current_thread(), threading._MainThread):
    106             # Set the keyword argument for GenericRetry
    107             dargs['sleep'] = self.delay_sec
    108             dargs['backoff_factor'] = backoff
    109             with timeout_util.Timeout(self.timeout_min * 60):
    110                 return retry_util.GenericRetry(handler, max_retry, _run,
    111                                                self, call, **dargs)
    112         else:
    113             return _run_in_child_thread(self, call, **dargs)
    114 
    115 
    116 class RetryingTKO(frontend.TKO):
    117     """Wrapper around frontend.TKO that retries all RPCs.
    118 
    119     Timeout for retries and delay between retries are configurable.
    120     """
    121     def __init__(self, timeout_min=30, delay_sec=10, **dargs):
    122         """Constructor
    123 
    124         @param timeout_min: timeout in minutes until giving up.
    125         @param delay_sec: pre-jittered delay between retries in seconds.
    126         """
    127         self.timeout_min = timeout_min
    128         self.delay_sec = delay_sec
    129         super(RetryingTKO, self).__init__(**dargs)
    130 
    131 
    132     def run(self, call, **dargs):
    133         @retry.retry(Exception, timeout_min=self.timeout_min,
    134                      delay_sec=self.delay_sec,
    135                      blacklist=[ImportError, error.RPCException,
    136                                 proxy.ValidationError])
    137         def _run(self, call, **dargs):
    138             return super(RetryingTKO, self).run(call, **dargs)
    139         return _run(self, call, **dargs)
    140