#!/usr/bin/python
#pylint: disable-msg=C0111

# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import argparse
import httplib
import logging
import os
import signal
import socket
import time
import urllib2

import common
from autotest_lib.frontend import setup_django_environment
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import email_manager
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
from autotest_lib.server import utils as server_utils
from chromite.lib import timeout_util
from django.db import transaction

"""
Autotest shard client

The shard client can be run as a standalone service. It periodically polls
the master in a heartbeat, retrieves new jobs and hosts, and inserts them
into the local database.

A shard is set up (by a human) and pointed to the global AFE (cautotest).
On the shard, this script periodically makes so-called heartbeat requests to
the global AFE, which will then complete the following actions:

1. Find the previously created (with atest) record for the shard. Shards are
   identified by their hostnames, specified in the shadow_config.
2. Take the records that were sent in the heartbeat and insert them into the
   global database.
   - This is to set the status of jobs to completed in the master database
     after they were run by a slave. This is necessary so one can just look
     at the master's AFE to see the statuses of all jobs. Otherwise one would
     have to check the tko tables or the individual slave AFEs.
3. Find labels that have been assigned to this shard.
4. Assign hosts that:
   - have the specified label
   - aren't leased
   - have an id which is not in the known_host_ids which were sent in the
     heartbeat request.
5. Assign jobs that:
   - depend on the specified label
   - haven't been assigned before
   - aren't started yet
   - aren't completed yet
   - have an id which is not in the known_job_ids which were sent in the
     heartbeat request.
6. Serialize the chosen jobs and hosts.
   - Find objects that the Host/Job objects depend on: Labels, AclGroups,
     Users, and many more. Details about this can be found around
     model_logic.serialize().
7. Send these objects to the slave.


On the client side, this will happen:
1. Deserialize the objects sent from the master and persist them to the local
   database.
2. monitor_db on the shard will pick up these jobs and schedule them on the
   available hosts (which were retrieved from a heartbeat).
3. Once a job is finished, its shard_id is set to NULL.
4. The shard_client will pick up all jobs where shard_id=NULL and will
   send them to the master in the next heartbeat request.
   - The master will persist them as described earlier.
   - The shard_id will be set back to the shard's id, so the record won't be
     uploaded again.
   The heartbeat request will also contain the ids of incomplete jobs and the
   ids of all hosts. This is used to avoid sending objects repeatedly. For
   more information on this and alternatives considered,
   see site_rpc_interface.shard_heartbeat.
"""


HEARTBEAT_AFE_ENDPOINT = 'shard_heartbeat'

RPC_TIMEOUT_MIN = 5
RPC_DELAY_SEC = 5

STATS_KEY = 'shard_client.%s' % socket.gethostname()
timer = autotest_stats.Timer(STATS_KEY)
_heartbeat_client = None


class ShardClient(object):
    """Performs client side tasks of sharding, i.e. the heartbeat.

    This class contains the logic to do periodic heartbeats to a global AFE,
    to retrieve new jobs from it and to report completed jobs back.
    """

    def __init__(self, global_afe_hostname, shard_hostname, tick_pause_sec):
        self.afe = frontend_wrappers.RetryingAFE(server=global_afe_hostname,
                                                 timeout_min=RPC_TIMEOUT_MIN,
                                                 delay_sec=RPC_DELAY_SEC)
        self.hostname = shard_hostname
        self.tick_pause_sec = tick_pause_sec
        self._shutdown = False
        self._shard = None


    def _deserialize_many(self, serialized_list, djmodel, message):
        """Deserialize JSON-formatted data into the local database.

        Deserializes a list of JSON-formatted objects into the database using
        Django.

        @param serialized_list: A list of JSON-formatted data.
        @param djmodel: Django model type.
        @param message: A string to be used in logging messages.
        """
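        # Note: each object is deserialized in its own transaction, and
        # failures are logged and counted rather than raised, so one bad
        # record doesn't stop the remaining ones from being saved.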
        for serialized in serialized_list:
            with transaction.commit_on_success():
                try:
                    djmodel.deserialize(serialized)
                except Exception as e:
                    logging.error('Deserializing a %s failed: %s, Error: %s',
                                  message, serialized, e)
                    autotest_stats.Counter(STATS_KEY).increment(
                            'deserialization_failures')


    @timer.decorate
    def process_heartbeat_response(self, heartbeat_response):
        """Save objects returned by a heartbeat to the local database.

        This deserializes hosts and jobs, including their dependencies, and
        saves them to the local database.

        @param heartbeat_response: A dictionary with keys 'hosts', 'jobs' and
                                   'suite_keyvals', as returned by the
                                   `shard_heartbeat` RPC call.
        """
        hosts_serialized = heartbeat_response['hosts']
        jobs_serialized = heartbeat_response['jobs']
        suite_keyvals_serialized = heartbeat_response['suite_keyvals']

        autotest_stats.Gauge(STATS_KEY).send(
                'hosts_received', len(hosts_serialized))
        autotest_stats.Gauge(STATS_KEY).send(
                'jobs_received', len(jobs_serialized))
        autotest_stats.Gauge(STATS_KEY).send(
                'suite_keyvals_received', len(suite_keyvals_serialized))

        self._deserialize_many(hosts_serialized, models.Host, 'host')
        self._deserialize_many(jobs_serialized, models.Job, 'job')
        self._deserialize_many(suite_keyvals_serialized, models.JobKeyval,
                               'jobkeyval')

        host_ids = [h['id'] for h in hosts_serialized]
        logging.info('Heartbeat response contains hosts %s', host_ids)
        job_ids = [j['id'] for j in jobs_serialized]
        logging.info('Heartbeat response contains jobs %s', job_ids)
        parent_jobs_with_keyval = set([kv['job_id']
                                       for kv in suite_keyvals_serialized])
        logging.info('Heartbeat response contains suite keyvals for jobs %s',
                     list(parent_jobs_with_keyval))

        # If the master has just sent any jobs that we think have completed,
        # re-sync them with the master. This is especially useful when a
        # heartbeat or job is silently dropped, as the next heartbeat will
        # have a disagreement. Updating the shard_id to NULL will mark these
        # jobs for upload on the next heartbeat.
        job_models = models.Job.objects.filter(
                id__in=job_ids, hostqueueentry__complete=True)
        if job_models:
            job_models.update(shard=None)
            job_ids_repr = ', '.join([str(job.id) for job in job_models])
            logging.warn('The following completed jobs had their shard_id '
                         'reset to NULL to be uploaded to the master again: '
                         '%s', job_ids_repr)


    @property
    def shard(self):
        """Return this shard's own shard object, fetched from the database.

        The shard object is fetched from the master along with the first jobs;
        it does not exist before then.

        @returns: The shard object if it already exists, otherwise None.
        """
        if self._shard is None:
            try:
                self._shard = models.Shard.smart_get(self.hostname)
            except models.Shard.DoesNotExist:
                # This might happen before any jobs are assigned to this shard.
                # This is okay because then there is nothing to offload anyway.
                pass
        return self._shard


    def _get_jobs_to_upload(self):
        jobs = []
        # The scheduler sets shard to None upon completion of the job.
        # For more information on the shard field's semantics see
        # models.Job.shard. We need to be careful to wait for both the
        # shard_id and the complete bit here, or we will end up syncing
        # the job without ever setting the complete bit.
        job_ids = list(models.Job.objects.filter(
            shard=None,
            hostqueueentry__complete=True).values_list('pk', flat=True))

        for job_to_upload in models.Job.objects.filter(pk__in=job_ids).all():
            jobs.append(job_to_upload)
        return jobs


    def _mark_jobs_as_uploaded(self, job_ids):
        # self.shard might be None if no jobs were downloaded yet.
        # But then job_ids is empty, so this is harmless.
        # Even if there were jobs, we would at worst upload them twice.
        models.Job.objects.filter(pk__in=job_ids).update(shard=self.shard)


    def _get_hqes_for_jobs(self, jobs):
        hqes = []
        for job in jobs:
            hqes.extend(job.hostqueueentry_set.all())
        return hqes


    def _get_known_jobs_and_hosts(self):
        """Returns lists of host and job info to send in a heartbeat.

        The host and job ids are ids of objects that are already present on the
        shard and therefore don't need to be sent again.

        For jobs, only incomplete jobs are sent, as the master won't send
        already completed jobs anyway. This helps keep the list of ids
        reasonably small.

        For hosts, host statuses are sent in addition to host ids so the
        master can sync the host status.

        @returns: Tuple of three lists. The first one contains job ids, the
                  second one host ids, and the third one host statuses.
        """
        job_ids = list(models.Job.objects.filter(
                hostqueueentry__complete=False).values_list('id', flat=True))
        host_models = models.Host.objects.filter(invalid=0)
        host_ids = []
        host_statuses = []
        for h in host_models:
            host_ids.append(h.id)
            host_statuses.append(h.status)
        return job_ids, host_ids, host_statuses


    def _heartbeat_packet(self):
        """Construct the heartbeat packet.

        See site_rpc_interface for a more detailed description of the heartbeat.

        @return: A heartbeat packet.
        """
        known_job_ids, known_host_ids, known_host_statuses = (
                self._get_known_jobs_and_hosts())
        logging.info('Known jobs: %s', known_job_ids)

        job_objs = self._get_jobs_to_upload()
        hqes = [hqe.serialize(include_dependencies=False)
                for hqe in self._get_hqes_for_jobs(job_objs)]
        jobs = [job.serialize(include_dependencies=False) for job in job_objs]
        logging.info('Uploading jobs %s', [j['id'] for j in jobs])

        return {'shard_hostname': self.hostname,
                'known_job_ids': known_job_ids,
                'known_host_ids': known_host_ids,
                'known_host_statuses': known_host_statuses,
                'jobs': jobs, 'hqes': hqes}


    def _heartbeat_failure(self, log_message):
        logging.error("Heartbeat failed. %s", log_message)
        autotest_stats.Counter(STATS_KEY).increment('heartbeat_failures')


    @timer.decorate
    def do_heartbeat(self):
        """Perform a heartbeat: Retrieve new jobs.

        This function executes a `shard_heartbeat` RPC. It retrieves the
        response of this call and processes the response by storing the
        returned objects in the local database.
        """
        logging.info("Performing heartbeat.")
        packet = self._heartbeat_packet()
        autotest_stats.Gauge(STATS_KEY).send(
                'heartbeat.request_size', len(str(packet)))

        try:
            response = self.afe.run(HEARTBEAT_AFE_ENDPOINT, **packet)
        except urllib2.HTTPError as e:
            self._heartbeat_failure("HTTPError %d: %s" % (e.code, e.reason))
            return
        except urllib2.URLError as e:
            self._heartbeat_failure("URLError: %s" % e.reason)
            return
        except httplib.HTTPException as e:
            self._heartbeat_failure("HTTPException: %s" % e)
            return
        except timeout_util.TimeoutError as e:
            self._heartbeat_failure("TimeoutError: %s" % e)
            return

        autotest_stats.Gauge(STATS_KEY).send(
                'heartbeat.response_size', len(str(response)))
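        # Jobs are marked as uploaded only after the RPC succeeded; on any of
        # the errors above we returned early, so those jobs will simply be
        # retried with the next heartbeat.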
        self._mark_jobs_as_uploaded([job['id'] for job in packet['jobs']])
        self.process_heartbeat_response(response)
        logging.info("Heartbeat completed.")


    def tick(self):
        """Performs all tasks the shard client needs to do periodically."""
        self.do_heartbeat()


    def loop(self):
        """Calls tick() until shutdown() is called."""
        while not self._shutdown:
            self.tick()
            time.sleep(self.tick_pause_sec)


    def shutdown(self):
        """Stops the shard client after the current tick."""
        logging.info("Shutdown request received.")
        self._shutdown = True


def handle_signal(signum, frame):
    """SIGINT/SIGTERM handler so we don't crash mid-tick."""
    _heartbeat_client.shutdown()


def _get_shard_hostname_and_ensure_running_on_shard():
    """Read the hostname of the local shard from the global configuration.

    Raise an exception if run anywhere other than on a shard.

    @raises error.HeartbeatOnlyAllowedInShardModeException if run anywhere
            other than on a shard.
    """
    hostname = global_config.global_config.get_config_value(
        'SHARD', 'shard_hostname', default=None)
    if not hostname:
        raise error.HeartbeatOnlyAllowedInShardModeException(
            'To run the shard client, shard_hostname must neither be None nor '
            'empty.')
    return hostname


def _get_tick_pause_sec():
    """Read the pause between two ticks from the global configuration."""
    return global_config.global_config.get_config_value(
        'SHARD', 'heartbeat_pause_sec', type=float)
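
# For reference, the shard-related shadow_config entries read above and in
# _get_shard_hostname_and_ensure_running_on_shard() look roughly like this
# (the values are illustrative, not real defaults):
#
#   [SHARD]
#   shard_hostname: shard1.example.com
#   heartbeat_pause_sec: 60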


def get_shard_client():
    """Instantiate a shard client instance.

    Configuration values will be read from the global configuration.

    @returns A shard client instance.
    """
    global_afe_hostname = server_utils.get_global_afe_hostname()
    shard_hostname = _get_shard_hostname_and_ensure_running_on_shard()
    tick_pause_sec = _get_tick_pause_sec()
    return ShardClient(global_afe_hostname, shard_hostname, tick_pause_sec)


def main():
    try:
        autotest_stats.Counter(STATS_KEY).increment('starts')
        main_without_exception_handling()
    except Exception:
        message = 'Uncaught exception. Terminating shard_client.'
        email_manager.manager.log_stacktrace(message)
        logging.exception(message)
        autotest_stats.Counter(STATS_KEY).increment('uncaught_exceptions')
        raise
    finally:
        email_manager.manager.send_queued_emails()


def main_without_exception_handling():
    parser = argparse.ArgumentParser(description='Shard client.')
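    # No command line options are defined yet; argparse is still used so that
    # --help works and unexpected arguments are rejected.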
    options = parser.parse_args()

    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            None, timestamped_logfile_prefix='shard_client')

    logging.info("Setting signal handler.")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    logging.info("Starting shard client.")
    global _heartbeat_client
    _heartbeat_client = get_shard_client()
    _heartbeat_client.loop()


if __name__ == '__main__':
    main()
    417