#!/usr/bin/python
#pylint: disable-msg=C0111

# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import argparse
import httplib
import logging
import os
import random
import signal
import time
import urllib2

import common

from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe.json_rpc import proxy
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import global_config
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import email_manager
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
from autotest_lib.server import utils as server_utils
from chromite.lib import timeout_util
from django.db import transaction

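# chromite's metrics libraries may be unavailable in some environments; in
# that case fall back to the mock object so all metrics calls become no-ops.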
try:
    from chromite.lib import metrics
    from chromite.lib import ts_mon_config
except ImportError:
    metrics = server_utils.metrics_mock
    ts_mon_config = server_utils.metrics_mock


"""
Autotest shard client

The shard client can be run as a standalone service. It periodically polls
the master in a heartbeat, retrieves new jobs and hosts, and inserts them
into the local database.

A shard is set up (by a human) and pointed to the global AFE (cautotest).
On the shard, this script periodically makes so-called heartbeat requests to
the global AFE, which will then complete the following actions:

1. Find the previously created (with atest) record for the shard. Shards are
   identified by their hostnames, specified in the shadow_config.
2. Take the records that were sent in the heartbeat and insert them into the
   global database.
   - This is to set the status of jobs to completed in the master database
     after they were run by a slave. This is necessary so one can just look
     at the master's AFE to see the statuses of all jobs. Otherwise one would
     have to check the TKO tables or the individual slave AFEs.
3. Find labels that have been assigned to this shard.
4. Assign hosts that:
   - have the specified label
   - aren't leased
   - have an id which is not in the known_host_ids which were sent in the
     heartbeat request.
5. Assign jobs that:
   - depend on the specified label
   - haven't been assigned before
   - aren't started yet
   - aren't completed yet
   - have an id which is not in the known_job_ids which were sent in the
     heartbeat request.
6. Serialize the chosen jobs and hosts.
   - Find objects that the Host/Job objects depend on: Labels, AclGroups,
     Users, and many more. Details about this can be found around
     model_logic.serialize()
7. Send these objects to the slave.


On the client side, this will happen:
1. Deserialize the objects sent from the master and persist them to the local
   database.
2. monitor_db on the shard will pick up these jobs and schedule them on the
   available hosts (which were retrieved from a heartbeat).
3. Once a job is finished, its shard_id is set to NULL.
4. The shard client will pick up all jobs where shard_id=NULL and send them
   to the master in the request of the next heartbeat.
   - The master will persist them as described earlier.
   - The shard_id will be set back to the shard's id, so the record won't be
     uploaded again.
   The heartbeat request also contains the ids of incomplete jobs and the
   ids of all hosts. This is used to avoid sending objects repeatedly. For
   more information on this and the alternatives considered, see
   rpc_interface.shard_heartbeat.
"""
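
# For illustration only: a heartbeat exchange has roughly the shape sketched
# below. All field values here are made up; the authoritative request is
# built in ShardClient._heartbeat_packet below, and the response keys are
# the ones consumed by ShardClient.process_heartbeat_response.
#
# Request:
#     {'shard_hostname': 'shard1.example.com',
#      'known_job_ids': [1, 2, 3],
#      'known_host_ids': [10, 11],
#      'known_host_statuses': ['Ready', 'Running'],
#      'jobs': [...], 'hqes': [...]}
#
# Response:
#     {'hosts': [...], 'jobs': [...], 'suite_keyvals': [...],
#      'incorrect_host_ids': [...]}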


HEARTBEAT_AFE_ENDPOINT = 'shard_heartbeat'
_METRICS_PREFIX = 'chromeos/autotest/shard_client/heartbeat/'

RPC_TIMEOUT_MIN = 5
RPC_DELAY_SEC = 5

_heartbeat_client = None


class ShardClient(object):
    """Performs client side tasks of sharding, i.e. the heartbeat.

    This class contains the logic to do periodic heartbeats to a global AFE,
    to retrieve new jobs from it and to report completed jobs back.
    """

    def __init__(self, global_afe_hostname, shard_hostname, tick_pause_sec):
        self.afe = frontend_wrappers.RetryingAFE(server=global_afe_hostname,
                                                 timeout_min=RPC_TIMEOUT_MIN,
                                                 delay_sec=RPC_DELAY_SEC)
        self.hostname = shard_hostname
        self.tick_pause_sec = tick_pause_sec
        self._shutdown_requested = False
        self._shard = None


    def _deserialize_many(self, serialized_list, djmodel, message):
        """Deserialize a list of JSON-formatted objects into the database.

        Each object is deserialized via Django in its own transaction, so a
        single bad record does not abort the rest of the list.

        @param serialized_list: A list of JSON-formatted data.
        @param djmodel: Django model type.
        @param message: A string to be used in a logging message.
        """
        for serialized in serialized_list:
            with transaction.commit_on_success():
                try:
                    djmodel.deserialize(serialized)
                except Exception as e:
                    logging.error('Deserializing a %s failed: %s, Error: %s',
                                  message, serialized, e)
                    metrics.Counter(
                        'chromeos/autotest/shard_client/deserialization_failed'
                        ).increment()


    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/shard_client/heartbeat_response_duration')
    def process_heartbeat_response(self, heartbeat_response):
        """Save objects returned by a heartbeat to the local database.

        This deserializes hosts and jobs, including their dependencies, and
        saves them to the local database.

        @param heartbeat_response: A dictionary with keys 'hosts', 'jobs',
                                   'suite_keyvals' and (optionally)
                                   'incorrect_host_ids', as returned by the
                                   `shard_heartbeat` rpc call.
        """
        hosts_serialized = heartbeat_response['hosts']
        jobs_serialized = heartbeat_response['jobs']
        suite_keyvals_serialized = heartbeat_response['suite_keyvals']
        incorrect_host_ids = heartbeat_response.get('incorrect_host_ids', [])

        metrics.Gauge('chromeos/autotest/shard_client/hosts_received'
                      ).set(len(hosts_serialized))
        metrics.Gauge('chromeos/autotest/shard_client/jobs_received'
                      ).set(len(jobs_serialized))
        metrics.Gauge('chromeos/autotest/shard_client/suite_keyvals_received'
                      ).set(len(suite_keyvals_serialized))

        self._deserialize_many(hosts_serialized, models.Host, 'host')
        self._deserialize_many(jobs_serialized, models.Job, 'job')
        self._deserialize_many(suite_keyvals_serialized, models.JobKeyval,
                               'jobkeyval')

        host_ids = [h['id'] for h in hosts_serialized]
        logging.info('Heartbeat response contains hosts %s', host_ids)
        job_ids = [j['id'] for j in jobs_serialized]
        logging.info('Heartbeat response contains jobs %s', job_ids)
        parent_jobs_with_keyval = set([kv['job_id']
                                       for kv in suite_keyvals_serialized])
        logging.info('Heartbeat response contains suite_keyvals for jobs %s',
                     list(parent_jobs_with_keyval))
        if incorrect_host_ids:
            logging.info('Heartbeat response contains incorrect_host_ids %s '
                         'which will be deleted.', incorrect_host_ids)
            self._remove_incorrect_hosts(incorrect_host_ids)

        # If the master has just sent any jobs that we think have completed,
        # re-sync them with the master. This is especially useful when a
        # heartbeat or job is silently dropped, as the next heartbeat will
        # have a disagreement. Updating the shard_id to NULL will mark these
        # jobs for upload on the next heartbeat.
        job_models = models.Job.objects.filter(
                id__in=job_ids, hostqueueentry__complete=True)
        if job_models:
            job_models.update(shard=None)
            job_ids_repr = ', '.join([str(job.id) for job in job_models])
            logging.warn('The following completed jobs had their shard_id '
                         'reset to NULL so they will be uploaded to the '
                         'master again: %s', job_ids_repr)


    def _remove_incorrect_hosts(self, incorrect_host_ids=None):
        """Remove from the local database any hosts that should not exist.

        Entries of |incorrect_host_ids| that are absent from the database
        will be silently ignored.

        @param incorrect_host_ids: a list of ids (as integers) to remove.
        """
        if not incorrect_host_ids:
            return

        models.Host.objects.filter(id__in=incorrect_host_ids).delete()


    @property
    def shard(self):
        """Return this shard's own shard object, fetched from the database.

        The shard object is fetched from the master along with the first
        jobs; it does not exist locally before then.

        @returns: The shard object if it already exists, otherwise None.
        """
        if self._shard is None:
            try:
                self._shard = models.Shard.smart_get(self.hostname)
            except models.Shard.DoesNotExist:
                # This might happen before any jobs are assigned to this shard.
                # This is okay because then there is nothing to offload anyway.
                pass
        return self._shard


    def _get_jobs_to_upload(self):
        jobs = []
        # The scheduler sets shard to None upon completion of the job.
        # For more information on the shard field's semantics see
        # models.Job.shard. We need to be careful to wait for both the
        # shard_id and the complete bit here, or we will end up syncing
        # the job without ever setting the complete bit.
        job_ids = list(models.Job.objects.filter(
            shard=None,
            hostqueueentry__complete=True).values_list('pk', flat=True))

        for job_to_upload in models.Job.objects.filter(pk__in=job_ids).all():
            jobs.append(job_to_upload)
        return jobs


    def _mark_jobs_as_uploaded(self, job_ids):
        # self.shard might be None if no jobs were downloaded yet.
        # But then job_ids is empty, so this is harmless.
        # Even if there were jobs, in the worst case we'd upload them twice.
        models.Job.objects.filter(pk__in=job_ids).update(shard=self.shard)


    def _get_hqes_for_jobs(self, jobs):
        hqes = []
        for job in jobs:
            hqes.extend(job.hostqueueentry_set.all())
        return hqes


    def _get_known_jobs_and_hosts(self):
        """Returns lists of host and job info to send in a heartbeat.

        The host and job ids are ids of objects that are already present on
        the shard and therefore don't need to be sent again.

        For jobs, only incomplete jobs are sent, as the master won't send
        already completed jobs anyway. This helps keep the list of ids
        small.

        For hosts, the host status is sent to the master in addition to the
        host id, so the master can sync the host status.

        @returns: Tuple of three lists. The first one contains job ids, the
                  second one host ids, and the third one host statuses.
        """
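        # Illustrative return value (ids and statuses are made up):
        #     ([1, 5], [10, 11], ['Ready', 'Running'])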
        job_ids = list(models.Job.objects.filter(
                hostqueueentry__complete=False).values_list('id', flat=True))
        host_models = models.Host.objects.filter(invalid=0)
        host_ids = []
        host_statuses = []
        for h in host_models:
            host_ids.append(h.id)
            host_statuses.append(h.status)
        return job_ids, host_ids, host_statuses


    def _heartbeat_packet(self):
        """Construct the heartbeat packet.

        See rpc_interface for a more detailed description of the heartbeat.

        @return: A heartbeat packet.
        """
        known_job_ids, known_host_ids, known_host_statuses = (
                self._get_known_jobs_and_hosts())
        logging.info('Known jobs: %s', known_job_ids)

        job_objs = self._get_jobs_to_upload()
        hqes = [hqe.serialize(include_dependencies=False)
                for hqe in self._get_hqes_for_jobs(job_objs)]
        jobs = [job.serialize(include_dependencies=False) for job in job_objs]
        logging.info('Uploading jobs %s', [j['id'] for j in jobs])

        return {'shard_hostname': self.hostname,
                'known_job_ids': known_job_ids,
                'known_host_ids': known_host_ids,
                'known_host_statuses': known_host_statuses,
                'jobs': jobs, 'hqes': hqes}


    def _report_packet_metrics(self, packet):
        """Report stats about the outgoing packet to monarch."""
        metrics.Gauge(_METRICS_PREFIX + 'known_job_ids_count').set(
            len(packet['known_job_ids']))
        metrics.Gauge(_METRICS_PREFIX + 'jobs_upload_count').set(
            len(packet['jobs']))
        metrics.Gauge(_METRICS_PREFIX + 'known_host_ids_count').set(
            len(packet['known_host_ids']))


    def _heartbeat_failure(self, log_message, failure_type_str=''):
        logging.error("Heartbeat failed. %s", log_message)
        metrics.Counter('chromeos/autotest/shard_client/heartbeat_failure'
                        ).increment(fields={'failure_type': failure_type_str})


    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/shard_client/do_heartbeat_duration')
    def do_heartbeat(self):
        """Perform a heartbeat: Retrieve new jobs.

        This function executes a `shard_heartbeat` RPC. It retrieves the
        response of this call and processes the response by storing the
        returned objects in the local database.

        @returns: True if the heartbeat ran successfully, False otherwise.
        """

        logging.info("Performing heartbeat.")
        packet = self._heartbeat_packet()
        self._report_packet_metrics(packet)
        metrics.Gauge(_METRICS_PREFIX + 'request_size').set(
            len(str(packet)))

        try:
            response = self.afe.run(HEARTBEAT_AFE_ENDPOINT, **packet)
        except urllib2.HTTPError as e:
            self._heartbeat_failure('HTTPError %d: %s' % (e.code, e.reason),
                                    'HTTPError')
            return False
        except urllib2.URLError as e:
            self._heartbeat_failure('URLError: %s' % e.reason,
                                    'URLError')
            return False
        except httplib.HTTPException as e:
            self._heartbeat_failure('HTTPException: %s' % e,
                                    'HTTPException')
            return False
        except timeout_util.TimeoutError as e:
            self._heartbeat_failure('TimeoutError: %s' % e,
                                    'TimeoutError')
            return False
        except proxy.JSONRPCException as e:
            self._heartbeat_failure('JSONRPCException: %s' % e,
                                    'JSONRPCException')
            return False

        metrics.Gauge(_METRICS_PREFIX + 'response_size').set(
            len(str(response)))
        self._mark_jobs_as_uploaded([job['id'] for job in packet['jobs']])
        self.process_heartbeat_response(response)
        logging.info("Heartbeat completed.")
        return True


    def tick(self):
        """Performs all tasks the shard client needs to do periodically."""
        success = self.do_heartbeat()
        if success:
            metrics.Counter('chromeos/autotest/shard_client/tick').increment()


    def loop(self, lifetime_hours):
        """Calls tick() until shutdown() is called or the lifetime expires.

        @param lifetime_hours: (float) hours to loop for, or None to loop
                indefinitely.
        """
        loop_start_time = time.time()
        while self._continue_looping(lifetime_hours, loop_start_time):
            self.tick()
            # Sleep with +/- 10% fuzzing to avoid phaselock of shards.
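            # 0.2 * (random.random() - 0.5) is uniform in [-0.1, 0.1], so
            # the pause deviates by at most 10% of tick_pause_sec.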
            tick_fuzz = self.tick_pause_sec * 0.2 * (random.random() - 0.5)
            time.sleep(self.tick_pause_sec + tick_fuzz)


    def shutdown(self):
        """Stops the shard client after the current tick."""
        logging.info("Shutdown request received.")
        self._shutdown_requested = True


    def _continue_looping(self, lifetime_hours, loop_start_time):
        """Determines if we should continue with the next mainloop iteration.

        @param lifetime_hours: (float) number of hours to loop for. None
                implies no deadline.
        @param loop_start_time: Time when we started looping.
        @returns True if we should continue looping, False otherwise.
        """
        if self._shutdown_requested:
            return False

        if (lifetime_hours is None
            or time.time() - loop_start_time < lifetime_hours * 3600):
            return True
        logging.info('Process lifetime %0.3f hours exceeded. Shutting down.',
                     lifetime_hours)
        return False


def handle_signal(signum, frame):
    """SIGINT/SIGTERM handler so we don't crash mid-tick."""
    _heartbeat_client.shutdown()


def _get_shard_hostname_and_ensure_running_on_shard():
    """Read the hostname of the local shard from the global configuration.

    Raise an exception if run anywhere other than on a shard.

    @raises error.HeartbeatOnlyAllowedInShardModeException if run anywhere
            other than on a shard.
    """
    hostname = global_config.global_config.get_config_value(
        'SHARD', 'shard_hostname', default=None)
    if not hostname:
        raise error.HeartbeatOnlyAllowedInShardModeException(
            'To run the shard client, shard_hostname must not be None or '
            'empty.')
    return hostname


def _get_tick_pause_sec():
    """Read the pause between two ticks from the global configuration."""
    return global_config.global_config.get_config_value(
        'SHARD', 'heartbeat_pause_sec', type=float)
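

# Both helpers above read the [SHARD] section of the global configuration.
# An illustrative shadow_config snippet (values are made up):
#
#     [SHARD]
#     shard_hostname: shard1.example.com
#     heartbeat_pause_sec: 60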


def get_shard_client():
    """Instantiate a shard client.

    Configuration values will be read from the global configuration.

    @returns A shard client instance.
    """
    global_afe_hostname = server_utils.get_global_afe_hostname()
    shard_hostname = _get_shard_hostname_and_ensure_running_on_shard()
    tick_pause_sec = _get_tick_pause_sec()
    return ShardClient(global_afe_hostname, shard_hostname, tick_pause_sec)


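# Example invocation, assuming this script is installed as shard_client.py
# (both flags are optional; see the argparse setup in main() below):
#
#     ./shard_client.py --lifetime-hours 24 --metrics-file /tmp/shard_metrics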
def main():
    parser = argparse.ArgumentParser(description='Shard client.')
    parser.add_argument(
            '--lifetime-hours',
            type=float,
            default=None,
            help='If provided, number of hours we should run for. '
                 'At the expiry of this time, the process will exit '
                 'gracefully.',
    )
    parser.add_argument(
            '--metrics-file',
            help='If provided, drop metrics to this local file instead of '
                 'reporting to ts_mon',
            type=str,
            default=None,
    )
    options = parser.parse_args()

    with ts_mon_config.SetupTsMonGlobalState(
          'shard_client',
          indirect=True,
          debug_file=options.metrics_file,
    ):
        try:
            metrics.Counter('chromeos/autotest/shard_client/start').increment()
            main_without_exception_handling(options)
        except Exception as e:
            metrics.Counter('chromeos/autotest/shard_client/uncaught_exception'
                            ).increment()
            message = 'Uncaught exception. Terminating shard_client.'
            email_manager.manager.log_stacktrace(message)
            logging.exception(message)
            raise
        finally:
            email_manager.manager.send_queued_emails()


def main_without_exception_handling(options):
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            None, timestamped_logfile_prefix='shard_client')

    logging.info("Setting signal handler.")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    logging.info("Starting shard client.")
    global _heartbeat_client
    _heartbeat_client = get_shard_client()
    _heartbeat_client.loop(options.lifetime_hours)


if __name__ == '__main__':
    main()