# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import math
import threading

import common
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros import retry
from autotest_lib.frontend.afe.json_rpc import proxy
from autotest_lib.server import frontend
try:
    from chromite.lib import retry_util
    from chromite.lib import timeout_util
except ImportError:
    # chromite is optional at import time; RetryingAFE.run() raises a
    # descriptive ImportError later if it is actually needed.
    logging.warning('Unable to import chromite.')
    retry_util = None
    timeout_util = None


def convert_timeout_to_retry(backoff, timeout_min, delay_sec):
    """Compute the number of retry attempts for use with chromite.retry_util.

    @param backoff: The exponential backoff factor. A value of 1 means a
                    constant delay between attempts.
    @param timeout_min: The maximum amount of time (in minutes) to sleep.
    @param delay_sec: The amount to sleep (in seconds) between each attempt.

    @return: The number of retry attempts in the case of exponential backoff.

    @raises ZeroDivisionError: if delay_sec is 0.
    @raises ValueError: if the logarithm arguments are not positive (for
                        example a non-positive backoff).
    """
    # Estimate the max_retry in the case of exponential backoff:
    # => total_sleep = sleep*sum(r=0..max_retry-1, backoff^r)
    # => total_sleep = sleep( (1-backoff^max_retry) / (1-backoff) )
    # => max_retry*ln(backoff) = ln(1-(total_sleep/sleep)*(1-backoff))
    # => max_retry = ln(1-(total_sleep/sleep)*(1-backoff))/ln(backoff)
    total_sleep = timeout_min * 60
    # Force true division: with integer arguments Python 2 would otherwise
    # truncate the ratio before it is fed into the logarithm.
    sleep_ratio = float(total_sleep) / delay_sec
    if backoff == 1:
        # The geometric-series formula degenerates here (log(1) == 0 would
        # divide by zero). With a constant delay the total sleep is simply
        # delay_sec * max_retry.
        return int(math.ceil(sleep_ratio))
    numerator = math.log10(1 - sleep_ratio * (1 - backoff))
    denominator = math.log10(backoff)
    return int(math.ceil(numerator / denominator))


class RetryingAFE(frontend.AFE):
    """Wrapper around frontend.AFE that retries all RPCs.

    Timeout for retries and delay between retries are configurable.
    """
    def __init__(self, timeout_min=30, delay_sec=10, **dargs):
        """Constructor

        @param timeout_min: timeout in minutes until giving up.
        @param delay_sec: pre-jittered delay between retries in seconds.
        @param dargs: remaining keyword arguments, forwarded unchanged to
                      frontend.AFE.__init__.
        """
        self.timeout_min = timeout_min
        self.delay_sec = delay_sec
        super(RetryingAFE, self).__init__(**dargs)


    def set_timeout(self, timeout_min):
        """Set timeout minutes for the AFE server.

        @param timeout_min: The timeout minutes for AFE server.
        """
        self.timeout_min = timeout_min


    def run(self, call, **dargs):
        """Run an RPC through frontend.AFE.run, retrying on failure.

        When called from the main thread the call is wrapped in
        timeout_util.Timeout(self.timeout_min * 60) and retried through
        retry_util.GenericRetry. From any other thread the retry.retry
        decorator is used instead.

        @param call: passed through to frontend.AFE.run.
        @param dargs: keyword arguments passed through to frontend.AFE.run.

        @return: whatever frontend.AFE.run returns.

        @raises ImportError: if chromite could not be imported.
        """
        if retry_util is None:
            raise ImportError('Unable to import chromite. Please consider to '
                              'run build_externals to build site packages.')
        # exc_retry: We retry if this exception is raised.
        # blacklist: Exceptions that we raise immediately if caught.
        exc_retry = Exception
        blacklist = (ImportError, error.RPCException, proxy.JSONRPCException,
                     timeout_util.TimeoutError)
        backoff = 2
        max_retry = convert_timeout_to_retry(backoff, self.timeout_min,
                                             self.delay_sec)

        def _run(self, call, **dargs):
            return super(RetryingAFE, self).run(call, **dargs)

        def handler(exc):
            """Check if exc is an exc_retry or if it's blacklisted.

            @param exc: An exception.

            @return: True if exc is an exc_retry and is not
                     blacklisted. False otherwise.
            """
            is_exc_to_check = isinstance(exc, exc_retry)
            is_blacklisted = isinstance(exc, blacklist)
            return is_exc_to_check and not is_blacklisted

        # If the call is not in main thread, signal can't be used to abort the
        # call. In that case, use a basic retry which does not enforce timeout
        # if the process hangs.
        @retry.retry(Exception, timeout_min=self.timeout_min,
                     delay_sec=self.delay_sec,
                     blacklist=[ImportError, error.RPCException,
                                proxy.ValidationError])
        def _run_in_child_thread(self, call, **dargs):
            return super(RetryingAFE, self).run(call, **dargs)

        if isinstance(threading.current_thread(), threading._MainThread):
            # Set the keyword argument for GenericRetry
            dargs['sleep'] = self.delay_sec
            dargs['backoff_factor'] = backoff
            with timeout_util.Timeout(self.timeout_min * 60):
                return retry_util.GenericRetry(handler, max_retry, _run,
                                               self, call, **dargs)
        else:
            return _run_in_child_thread(self, call, **dargs)


class RetryingTKO(frontend.TKO):
    """Wrapper around frontend.TKO that retries all RPCs.

    Timeout for retries and delay between retries are configurable.
    """
    def __init__(self, timeout_min=30, delay_sec=10, **dargs):
        """Constructor

        @param timeout_min: timeout in minutes until giving up.
        @param delay_sec: pre-jittered delay between retries in seconds.
        @param dargs: remaining keyword arguments, forwarded unchanged to
                      frontend.TKO.__init__.
        """
        self.timeout_min = timeout_min
        self.delay_sec = delay_sec
        super(RetryingTKO, self).__init__(**dargs)


    def run(self, call, **dargs):
        """Run an RPC through frontend.TKO.run under the retry.retry decorator.

        @param call: passed through to frontend.TKO.run.
        @param dargs: keyword arguments passed through to frontend.TKO.run.

        @return: whatever frontend.TKO.run returns.
        """
        @retry.retry(Exception, timeout_min=self.timeout_min,
                     delay_sec=self.delay_sec,
                     blacklist=[ImportError, error.RPCException,
                                proxy.ValidationError])
        def _run(self, call, **dargs):
            return super(RetryingTKO, self).run(call, **dargs)
        return _run(self, call, **dargs)