Home | History | Annotate | Download | only in site_utils
      1 #!/usr/bin/env python
      2 # Copyright 2017 The Chromium OS Authors. All rights reserved.
      3 # Use of this source code is governed by a BSD-style license that can be
      4 # found in the LICENSE file.
      5 
      6 """Standalone service to monitor AFE servers and report to ts_mon"""
      7 import sys
      8 import time
      9 import multiprocessing
     10 import urllib2
     11 
     12 import common
     13 from autotest_lib.client.common_lib import global_config
     14 from autotest_lib.frontend.afe.json_rpc import proxy
     15 from autotest_lib.server import frontend
     16 # import needed to setup host_attributes
     17 # pylint: disable=unused-import
     18 from autotest_lib.server import site_host_attributes
     19 from autotest_lib.site_utils import server_manager_utils
     20 from chromite.lib import commandline
     21 from chromite.lib import cros_logging as logging
     22 from chromite.lib import metrics
     23 from chromite.lib import ts_mon_config
     24 
     25 METRIC_ROOT = 'chromeos/autotest/blackbox/afe_rpc'
     26 METRIC_RPC_CALL_DURATIONS = METRIC_ROOT + '/rpc_call_durations'
     27 METRIC_TICK = METRIC_ROOT + '/tick'
     28 METRIC_MONITOR_ERROR = METRIC_ROOT + '/afe_monitor_error'
     29 
     30 FAILURE_REASONS = {
     31         proxy.JSONRPCException: 'JSONRPCException',
     32         }
     33 
     34 def afe_rpc_call(hostname):
     35     """Perform one rpc call set on server
     36 
     37     @param hostname: server's hostname to poll
     38     """
     39     afe_monitor = AfeMonitor(hostname)
     40     try:
     41         afe_monitor.run()
     42     except Exception as e:
     43         metrics.Counter(METRIC_MONITOR_ERROR).increment(
     44                 fields={'target_hostname': hostname})
     45         logging.exception(e)
     46 
     47 
     48 def update_shards(shards, shards_lock, period=600, stop_event=None):
     49     """Updates dict of shards
     50 
     51     @param shards: list of shards to be updated
     52     @param shards_lock: shared lock for accessing shards
     53     @param period: time between polls
     54     @param stop_event: Event that can be set to stop polling
     55     """
     56     while(not stop_event or not stop_event.is_set()):
     57         start_time = time.time()
     58 
     59         logging.debug('Updating Shards')
     60         new_shards = set(server_manager_utils.get_shards())
     61 
     62         with shards_lock:
     63             current_shards = set(shards)
     64             rm_shards = current_shards - new_shards
     65             add_shards = new_shards - current_shards
     66 
     67             if rm_shards:
     68                 for s in rm_shards:
     69                     shards.remove(s)
     70 
     71             if add_shards:
     72                 shards.extend(add_shards)
     73 
     74         if rm_shards:
     75             logging.info('Servers left production: %s', str(rm_shards))
     76 
     77         if add_shards:
     78             logging.info('Servers entered production: %s',
     79                     str(add_shards))
     80 
     81         wait_time = (start_time + period) - time.time()
     82         if wait_time > 0:
     83             time.sleep(wait_time)
     84 
     85 
     86 def poll_rpc_servers(servers, servers_lock, shards=None, period=60,
     87                      stop_event=None):
     88     """Blocking function that polls all servers and shards
     89 
     90     @param servers: list of servers to poll
     91     @param servers_lock: lock to be used when accessing servers or shards
     92     @param shards: list of shards to poll
     93     @param period: time between polls
     94     @param stop_event: Event that can be set to stop polling
     95     """
     96     pool = multiprocessing.Pool(processes=multiprocessing.cpu_count() * 4)
     97 
     98     while(not stop_event or not stop_event.is_set()):
     99         start_time = time.time()
    100         with servers_lock:
    101             all_servers = set(servers).union(shards)
    102 
    103         logging.debug('Starting Server Polling: %s', ', '.join(all_servers))
    104         pool.map(afe_rpc_call, all_servers)
    105 
    106         logging.debug('Finished Server Polling')
    107 
    108         metrics.Counter(METRIC_TICK).increment()
    109 
    110         wait_time = (start_time + period) - time.time()
    111         if wait_time > 0:
    112             time.sleep(wait_time)
    113 
    114 
    115 class RpcFlightRecorder(object):
    116     """Monitors a list of AFE"""
    117     def __init__(self, servers, with_shards=True, poll_period=60):
    118         """
    119         @param servers: list of afe services to monitor
    120         @param with_shards: also record status on shards
    121         @param poll_period: frequency to poll all services, in seconds
    122         """
    123         self._manager = multiprocessing.Manager()
    124 
    125         self._poll_period = poll_period
    126 
    127         self._servers = self._manager.list(servers)
    128         self._servers_lock = self._manager.RLock()
    129 
    130         self._with_shards = with_shards
    131         self._shards = self._manager.list()
    132         self._update_shards_ps = None
    133         self._poll_rpc_server_ps = None
    134 
    135         self._stop_event = multiprocessing.Event()
    136 
    137     def start(self):
    138         """Call to start recorder"""
    139         if(self._with_shards):
    140             shard_args = [self._shards, self._servers_lock]
    141             shard_kwargs = {'stop_event': self._stop_event}
    142             self._update_shards_ps = multiprocessing.Process(
    143                     name='update_shards',
    144                     target=update_shards,
    145                     args=shard_args,
    146                     kwargs=shard_kwargs)
    147 
    148             self._update_shards_ps.start()
    149 
    150         poll_args = [self._servers, self._servers_lock]
    151         poll_kwargs= {'shards':self._shards,
    152                      'period':self._poll_period,
    153                      'stop_event':self._stop_event}
    154         self._poll_rpc_server_ps = multiprocessing.Process(
    155                 name='poll_rpc_servers',
    156                 target=poll_rpc_servers,
    157                 args=poll_args,
    158                 kwargs=poll_kwargs)
    159 
    160         self._poll_rpc_server_ps.start()
    161 
    162     def close(self):
    163         """Send close event to all sub processes"""
    164         self._stop_event.set()
    165 
    166 
    167     def termitate(self):
    168         """Terminate processes"""
    169         self.close()
    170         if self._poll_rpc_server_ps:
    171             self._poll_rpc_server_ps.terminate()
    172 
    173         if self._update_shards_ps:
    174             self._update_shards_ps.terminate()
    175 
    176         if self._manager:
    177             self._manager.shutdown()
    178 
    179 
    180     def join(self, timeout=None):
    181         """Blocking call until closed and processes complete
    182 
    183         @param timeout: passed to each process, so could be >timeout"""
    184         if self._poll_rpc_server_ps:
    185             self._poll_rpc_server_ps.join(timeout)
    186 
    187         if self._update_shards_ps:
    188             self._update_shards_ps.join(timeout)
    189 
    190 def _failed(fields, msg_str, reason, err=None):
    191     """Mark current run failed
    192 
    193     @param fields, ts_mon fields to mark as failed
    194     @param msg_str, message string to be filled
    195     @param reason: why it failed
    196     @param err: optional error to log more debug info
    197     """
    198     fields['success'] = False
    199     fields['failure_reason'] = reason
    200     logging.warning("%s failed - %s", msg_str, reason)
    201     if err:
    202         logging.debug("%s fail_err - %s", msg_str, str(err))
    203 
class AfeMonitor(object):
    """Object that runs rpc calls against the given afe frontend"""

    def __init__(self, hostname):
        """
        @param hostname: hostname of server to monitor, string
        """
        self._hostname = hostname
        self._afe = frontend.AFE(server=self._hostname)
        # Base ts_mon fields attached to every metric this monitor emits.
        self._metric_fields = {'target_hostname': self._hostname}


    def run_cmd(self, cmd, expected=None):
        """Runs rpc command and log metrics

        Times the call with metrics.SecondsTimer and records
        success/failure_reason into the timer's fields dict. Known
        exception types (FAILURE_REASONS) are swallowed after being
        recorded; unknown ones are recorded and then re-raised.

        @param cmd: string of rpc command to send
        @param expected: expected result of rpc; when given and the
                         actual result differs, the call is marked
                         'IncorrectResponse'
        """
        metric_fields = self._metric_fields.copy()
        metric_fields['command'] = cmd
        metric_fields['success'] = True
        metric_fields['failure_reason'] = ''

        # |f| is the timer's mutable fields dict; _failed() mutates it
        # so the failure is reflected in the emitted duration metric.
        with metrics.SecondsTimer(METRIC_RPC_CALL_DURATIONS,
                fields=dict(metric_fields), scale=0.001) as f:

            msg_str = "%s:%s" % (self._hostname, cmd)


            try:
                result = self._afe.run(cmd)
                logging.debug("%s result = %s", msg_str, result)
                if expected is not None and expected != result:
                    _failed(f, msg_str, 'IncorrectResponse')

            except urllib2.HTTPError as e:
                # HTTP-level failure: record the status code as reason.
                _failed(f, msg_str, 'HTTPError:%d' % e.code)

            except Exception as e:
                _failed(f, msg_str, FAILURE_REASONS.get(type(e), 'Unknown'),
                        err=e)

                # Only exception types listed in FAILURE_REASONS are
                # considered expected; anything else propagates.
                if type(e) not in FAILURE_REASONS:
                    raise

            if f['success']:
                logging.info("%s success", msg_str)


    def run(self):
        """Tests server and returns the result"""
        self.run_cmd('get_server_time')
        # ping_db is expected to return [True] on a healthy database.
        self.run_cmd('ping_db', [True])
    257 
    258 
    259 def get_parser():
    260     """Returns argparse parser"""
    261     parser = commandline.ArgumentParser(description=__doc__)
    262 
    263     parser.add_argument('-a', '--afe', action='append', default=[],
    264                         help='Autotest FrontEnd server to monitor')
    265 
    266     parser.add_argument('-p', '--poll-period', type=int, default=60,
    267                         help='Frequency to poll AFE servers')
    268 
    269     parser.add_argument('--no-shards', action='store_false', dest='with_shards',
    270                         help='Disable shard updating')
    271 
    272     return parser
    273 
    274 
    275 def main(argv):
    276     """Main function
    277 
    278     @param argv: commandline arguments passed
    279     """
    280     parser = get_parser()
    281     options = parser.parse_args(argv[1:])
    282 
    283 
    284     if not options.afe:
    285         options.afe = [global_config.global_config.get_config_value(
    286                         'SERVER', 'global_afe_hostname', default='cautotest')]
    287 
    288     with ts_mon_config.SetupTsMonGlobalState('rpc_flight_recorder',
    289                                              indirect=True):
    290         flight_recorder = RpcFlightRecorder(options.afe,
    291                                             with_shards=options.with_shards,
    292                                             poll_period=options.poll_period)
    293 
    294         flight_recorder.start()
    295         flight_recorder.join()
    296 
    297 
    298 if __name__ == '__main__':
    299     main(sys.argv)
    300