#!/usr/bin/python
#pylint: disable-msg=C0111

# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import argparse
import datetime
import httplib
import logging
import os
import random
import signal
import time
import urllib2

import common

from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe.json_rpc import proxy
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import global_config
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import email_manager
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
from autotest_lib.server import utils as server_utils
from chromite.lib import timeout_util
from django.core.exceptions import MultipleObjectsReturned
from django.db import transaction

try:
    from chromite.lib import metrics
    from chromite.lib import ts_mon_config
    from infra_libs import ts_mon
except ImportError:
    metrics = server_utils.metrics_mock
    ts_mon_config = server_utils.metrics_mock


     42 """
     43 Autotest shard client
     44 
     45 The shard client can be run as standalone service. It periodically polls the
     46 master in a heartbeat, retrieves new jobs and hosts and inserts them into the
     47 local database.
     48 
     49 A shard is set up (by a human) and pointed to the global AFE (cautotest).
     50 On the shard, this script periodically makes so called heartbeat requests to the
     51 global AFE, which will then complete the following actions:
     52 
     53 1. Find the previously created (with atest) record for the shard. Shards are
     54    identified by their hostnames, specified in the shadow_config.
     55 2. Take the records that were sent in the heartbeat and insert them into the
     56    global database.
     57    - This is to set the status of jobs to completed in the master database after
     58      they were run by a slave. This is necessary so one can just look at the
     59      master's afe to see the statuses of all jobs. Otherwise one would have to
     60      check the tko tables or the individual slave AFEs.
     61 3. Find labels that have been assigned to this shard.
     62 4. Assign hosts that:
     63    - have the specified label
     64    - aren't leased
     65    - have an id which is not in the known_host_ids which were sent in the
     66      heartbeat request.
     67 5. Assign jobs that:
     68    - depend on the specified label
     69    - haven't been assigned before
     70    - aren't started yet
     71    - aren't completed yet
     72    - have an id which is not in the jobs_known_ids which were sent in the
     73      heartbeat request.
     74 6. Serialize the chosen jobs and hosts.
     75    - Find objects that the Host/Job objects depend on: Labels, AclGroups, Users,
     76      and many more. Details about this can be found around
     77      model_logic.serialize()
     78 7. Send these objects to the slave.
     79 
     80 
     81 On the client side, this will happen:
     82 1. Deserialize the objects sent from the master and persist them to the local
     83    database.
     84 2. monitor_db on the shard will pick up these jobs and schedule them on the
     85    available hosts (which were retrieved from a heartbeat).
     86 3. Once a job is finished, it's shard_id is set to NULL
     87 4. The shard_client will pick up all jobs where shard_id=NULL and will
     88    send them to the master in the request of the next heartbeat.
     89    - The master will persist them as described earlier.
     90    - the shard_id will be set back to the shard's id, so the record won't be
     91      uploaded again.
     92    The heartbeat request will also contain the ids of incomplete jobs and the
     93    ids of all hosts. This is used to not send objects repeatedly. For more
     94    information on this and alternatives considered
     95    see rpc_interface.shard_heartbeat.
     96 """


HEARTBEAT_AFE_ENDPOINT = 'shard_heartbeat'
_METRICS_PREFIX = 'chromeos/autotest/shard_client/heartbeat/'

RPC_TIMEOUT_MIN = 30
RPC_DELAY_SEC = 5

# The maximum number of jobs to attempt to upload in a single heartbeat.
MAX_UPLOAD_JOBS = 1000

_heartbeat_client = None


class ShardClient(object):
    """Performs the client-side tasks of sharding, i.e. the heartbeat.

    This class contains the logic to do periodic heartbeats to a global AFE,
    to retrieve new jobs from it and to report completed jobs back.
    """

    def __init__(self, global_afe_hostname, shard_hostname, tick_pause_sec):
        self.afe = frontend_wrappers.RetryingAFE(server=global_afe_hostname,
                                                 timeout_min=RPC_TIMEOUT_MIN,
                                                 delay_sec=RPC_DELAY_SEC)
        self.hostname = shard_hostname
        self.tick_pause_sec = tick_pause_sec
        self._shutdown_requested = False
        self._shard = None


    def _deserialize_many(self, serialized_list, djmodel, message):
        """Deserialize a list of JSON-formatted data into the database.

        Each entry is deserialized into the local database using Django.

        @param serialized_list: A list of JSON-formatted data or python dict
                                literals.
        @param djmodel: Django model type.
        @param message: A string to be used in logging messages.
        """
        logging.info('Deserializing %s %ss', len(serialized_list), message)
        for i, serialized in enumerate(serialized_list, start=1):
            if i % 100 == 1:
                logging.info('Progress: at entry %s', i)
            with transaction.commit_on_success():
                try:
                    djmodel.deserialize(serialized)
                except Exception as e:
                    logging.error('Failed to deserialize a %s: %s, Error: %s',
                                  message, serialized, e)
                    metrics.Counter(
                        'chromeos/autotest/shard_client/deserialization_failed'
                        ).increment()
        logging.info('Done deserializing %ss', message)


    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/shard_client/heartbeat_response_duration')
    def process_heartbeat_response(self, heartbeat_response):
        """Save objects returned by a heartbeat to the local database.

        This deserializes hosts and jobs, including their dependencies, and
        saves them to the local database.

        @param heartbeat_response: A dictionary with keys 'hosts' and 'jobs',
                                   as returned by the `shard_heartbeat` rpc
                                   call.
        """
        hosts_serialized = heartbeat_response['hosts']
        jobs_serialized = heartbeat_response['jobs']
        suite_keyvals_serialized = heartbeat_response['suite_keyvals']
        incorrect_host_ids = heartbeat_response.get('incorrect_host_ids', [])

        metrics.Gauge('chromeos/autotest/shard_client/hosts_received'
                      ).set(len(hosts_serialized))
        metrics.Gauge('chromeos/autotest/shard_client/jobs_received'
                      ).set(len(jobs_serialized))
        metrics.Gauge('chromeos/autotest/shard_client/suite_keyvals_received'
                      ).set(len(suite_keyvals_serialized))

        self._deserialize_many(hosts_serialized, models.Host, 'host')
        self._deserialize_many(jobs_serialized, models.Job, 'job')
        self._deserialize_many(suite_keyvals_serialized, models.JobKeyval,
                               'jobkeyval')

        host_ids = [h['id'] for h in hosts_serialized]
        logging.info('Heartbeat response contains hosts %s', host_ids)
        job_ids = [j['id'] for j in jobs_serialized]
        logging.info('Heartbeat response contains jobs %s', job_ids)
        parent_jobs_with_keyval = set([kv['job_id']
                                       for kv in suite_keyvals_serialized])
        logging.info('Heartbeat response contains suite_keyvals for jobs %s',
                     list(parent_jobs_with_keyval))
        if incorrect_host_ids:
            logging.info('Heartbeat response contains incorrect_host_ids %s '
                         'which will be deleted.', incorrect_host_ids)
            self._remove_incorrect_hosts(incorrect_host_ids)

        # If the master has just sent any jobs that we think have completed,
        # re-sync them with the master. This is especially useful when a
        # heartbeat or job is silently dropped, as the next heartbeat will
        # have a disagreement. Updating the shard_id to NULL will mark these
        # jobs for upload on the next heartbeat.
        job_models = models.Job.objects.filter(
                id__in=job_ids, hostqueueentry__complete=True)
        if job_models:
            job_models.update(shard=None)
            job_ids_repr = ', '.join([str(job.id) for job in job_models])
            logging.warn('The following completed jobs had shard_id reset to '
                         'NULL so they will be uploaded to the master again: '
                         '%s', job_ids_repr)


    def _remove_incorrect_hosts(self, incorrect_host_ids=None):
        """Remove from the local database any hosts that should not exist.

        Entries of |incorrect_host_ids| that are absent from the database are
        silently ignored.

        @param incorrect_host_ids: a list of ids (as integers) to remove.
        """
        if not incorrect_host_ids:
            return

        try:
            models.Host.objects.filter(id__in=incorrect_host_ids).delete()
        except MultipleObjectsReturned:
            logging.exception('Failed to remove incorrect hosts %s',
                              incorrect_host_ids)


    @property
    def shard(self):
        """Return this shard's own shard object, fetched from the database.

        The shard object is fetched from the master along with the first
        jobs; it does not exist before that time.

        @returns: The shard object if it already exists, otherwise None.
        """
        if self._shard is None:
            try:
                self._shard = models.Shard.smart_get(self.hostname)
            except models.Shard.DoesNotExist:
                # This might happen before any jobs are assigned to this shard.
                # This is okay because then there is nothing to offload anyway.
                pass
        return self._shard


    def _get_jobs_to_upload(self):
        # The scheduler sets shard to None upon completion of a job. For more
        # information on the shard field's semantics see models.Job.shard. We
        # need to be careful to wait for both the shard_id and the complete
        # bit here, or we will end up syncing the job without ever setting
        # the complete bit.
        job_ids = list(models.Job.objects.filter(
            shard=None,
            hostqueueentry__complete=True).values_list('pk', flat=True))
        return list(models.Job.objects.filter(pk__in=job_ids))


    def _mark_jobs_as_uploaded(self, job_ids):
        # self.shard might be None if no jobs were downloaded yet. But then
        # job_ids is empty, so this is harmless. Even if there were jobs, the
        # worst case is that we upload them twice.
        models.Job.objects.filter(pk__in=job_ids).update(shard=self.shard)


    def _get_hqes_for_jobs(self, jobs):
        hqes = []
        for job in jobs:
            hqes.extend(job.hostqueueentry_set.all())
        return hqes


    def _get_known_jobs_and_hosts(self):
        """Returns lists of host and job info to send in a heartbeat.

        The host and job ids are ids of objects that are already present on
        the shard and therefore don't need to be sent again.

        For jobs, only incomplete jobs are sent, as the master won't send
        already completed jobs anyway. This helps keep the list of ids
        reasonably small.

        For hosts, the host status is sent in addition to the host id, so
        the master can sync the host status.

        @returns: Tuple of three lists. The first one contains job ids, the
                  second one host ids, and the third one host statuses.
        """
        jobs = models.Job.objects.filter(hostqueueentry__complete=False)
        job_ids = list(jobs.values_list('id', flat=True))
        self._report_job_time_distribution(jobs)

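        # Note: deleting a host in the AFE marks it invalid rather than
        # removing the row (see ModelWithInvalid in the frontend models), so
        # invalid=0 restricts this to hosts that still exist.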
        host_models = models.Host.objects.filter(invalid=0)
        host_ids = []
        host_statuses = []
        for h in host_models:
            host_ids.append(h.id)
            host_statuses.append(h.status)
        return job_ids, host_ids, host_statuses


    def _heartbeat_packet(self):
        """Construct the heartbeat packet.

        See rpc_interface for a more detailed description of the heartbeat.

        @return: A heartbeat packet.
        """
        known_job_ids, known_host_ids, known_host_statuses = (
                self._get_known_jobs_and_hosts())
        max_print = 100
        logging.info('Known jobs (first %s): %s', max_print,
                     known_job_ids[:max_print])
        logging.info('Total known jobs: %s', len(known_job_ids))

        job_objs = self._get_jobs_to_upload()
        hqes = [hqe.serialize(include_dependencies=False)
                for hqe in self._get_hqes_for_jobs(job_objs)]

        jobs = [job.serialize(include_dependencies=False) for job in job_objs]
        if len(jobs) > MAX_UPLOAD_JOBS:
            logging.info('Throttling number of jobs to upload from %s to %s.',
                         len(jobs), MAX_UPLOAD_JOBS)
            jobs = jobs[:MAX_UPLOAD_JOBS]
        logging.info('Uploading jobs %s', [j['id'] for j in jobs])

        return {'shard_hostname': self.hostname,
                'known_job_ids': known_job_ids,
                'known_host_ids': known_host_ids,
                'known_host_statuses': known_host_statuses,
                'jobs': jobs,
                'hqes': hqes}


    def _report_job_time_distribution(self, jobs):
        """Report the distribution of job durations to monarch."""
        jobs_time_distribution = metrics.Distribution(
                _METRICS_PREFIX + 'known_jobs_durations')
        now = datetime.datetime.now()

        # metrics.Distribution.set(...) expects a ts_mon distribution value,
        # so accumulate the durations into one before setting it.
        dist = ts_mon.Distribution(ts_mon.GeometricBucketer())
        for job in jobs:
            duration = int(
                    max(0, (now - job.created_on).total_seconds()))
            dist.add(duration)
        jobs_time_distribution.set(dist)


    def _report_packet_metrics(self, packet):
        """Report stats about the outgoing packet to monarch."""
        metrics.Gauge(_METRICS_PREFIX + 'known_job_ids_count').set(
                len(packet['known_job_ids']))
        metrics.Gauge(_METRICS_PREFIX + 'jobs_upload_count').set(
                len(packet['jobs']))
        metrics.Gauge(_METRICS_PREFIX + 'known_host_ids_count').set(
                len(packet['known_host_ids']))


    def _heartbeat_failure(self, log_message, failure_type_str=''):
        logging.error('Heartbeat failed. %s', log_message)
        metrics.Counter('chromeos/autotest/shard_client/heartbeat_failure'
                        ).increment(fields={'failure_type': failure_type_str})


    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/shard_client/do_heatbeat_duration')
    def do_heartbeat(self):
        """Perform a heartbeat: retrieve new jobs.

        This function executes a `shard_heartbeat` RPC, retrieves the
        response, and processes it by storing the returned objects in the
        local database.

        @returns: True if the heartbeat ran successfully, False otherwise.
        """
        logging.info('Performing heartbeat.')
        packet = self._heartbeat_packet()
        self._report_packet_metrics(packet)
        metrics.Gauge(_METRICS_PREFIX + 'request_size').set(
            len(str(packet)))

        try:
            response = self.afe.run(HEARTBEAT_AFE_ENDPOINT, **packet)
            logging.info('Finished heartbeat upload.')
        except urllib2.HTTPError as e:
            self._heartbeat_failure('HTTPError %d: %s' % (e.code, e.reason),
                                    'HTTPError')
            return False
        except urllib2.URLError as e:
            self._heartbeat_failure('URLError: %s' % e.reason,
                                    'URLError')
            return False
        except httplib.HTTPException as e:
            self._heartbeat_failure('HTTPException: %s' % e,
                                    'HTTPException')
            return False
        except timeout_util.TimeoutError as e:
            self._heartbeat_failure('TimeoutError: %s' % e,
                                    'TimeoutError')
            return False
        except proxy.JSONRPCException as e:
            self._heartbeat_failure('JSONRPCException: %s' % e,
                                    'JSONRPCException')
            return False

        metrics.Gauge(_METRICS_PREFIX + 'response_size').set(
            len(str(response)))
        logging.info('Marking jobs as uploaded.')
        self._mark_jobs_as_uploaded([job['id'] for job in packet['jobs']])
        logging.info('Processing heartbeat response.')
        self.process_heartbeat_response(response)
        logging.info('Heartbeat completed.')
        return True


    def tick(self):
        """Performs all tasks the shard client needs to do periodically."""
        success = self.do_heartbeat()
        if success:
            metrics.Counter('chromeos/autotest/shard_client/tick').increment()


    def loop(self, lifetime_hours):
        """Calls tick() until shutdown() is called or the lifetime expires.

        @param lifetime_hours: (float) hours to loop for, or None for no
                deadline.
        """
        loop_start_time = time.time()
        while self._continue_looping(lifetime_hours, loop_start_time):
            self.tick()
            # Sleep with +/- 10% fuzzing to avoid phase lock of shards.
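            # (Illustrative numbers, not from the original code: with
            # tick_pause_sec=60 the fuzz term below is uniform in [-6, +6),
            # so each sleep lands in [54, 66) seconds.)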
            tick_fuzz = self.tick_pause_sec * 0.2 * (random.random() - 0.5)
            time.sleep(self.tick_pause_sec + tick_fuzz)


    def shutdown(self):
        """Stops the shard client after the current tick."""
        logging.info('Shutdown request received.')
        self._shutdown_requested = True


    def _continue_looping(self, lifetime_hours, loop_start_time):
        """Determines if we should continue with the next mainloop iteration.

        @param lifetime_hours: (float) number of hours to loop for. None
                implies no deadline.
        @param loop_start_time: Time when we started looping.
        @returns: True if we should continue looping, False otherwise.
        """
        if self._shutdown_requested:
            return False

        if (lifetime_hours is None
                or time.time() - loop_start_time < lifetime_hours * 3600):
            return True
        logging.info('Process lifetime %0.3f hours exceeded. Shutting down.',
                     lifetime_hours)
        return False


def handle_signal(signum, frame):
    """SIGINT/SIGTERM handler so we don't crash mid-tick."""
    _heartbeat_client.shutdown()


def _get_shard_hostname_and_ensure_running_on_shard():
    """Read the hostname of the local shard from the global configuration.

    @raises error.HeartbeatOnlyAllowedInShardModeException if run anywhere
            other than on a shard.
    """
    hostname = global_config.global_config.get_config_value(
        'SHARD', 'shard_hostname', default=None)
    if not hostname:
        raise error.HeartbeatOnlyAllowedInShardModeException(
            'To run the shard client, shard_hostname must neither be None nor '
            'empty.')
    return hostname


def _get_tick_pause_sec():
    """Read the pause between two ticks from the global configuration."""
    return global_config.global_config.get_config_value(
        'SHARD', 'heartbeat_pause_sec', type=float)
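
# For illustration only: the shard-related entries in the shadow_config read
# by the two helpers above might look like this (both values are invented
# examples):
#
#   [SHARD]
#   shard_hostname: shard1.example.com
#   heartbeat_pause_sec: 60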


def get_shard_client():
    """Instantiate a shard client instance.

    Configuration values will be read from the global configuration.

    @returns: A shard client instance.
    """
    global_afe_hostname = server_utils.get_global_afe_hostname()
    shard_hostname = _get_shard_hostname_and_ensure_running_on_shard()
    tick_pause_sec = _get_tick_pause_sec()
    return ShardClient(global_afe_hostname, shard_hostname, tick_pause_sec)


def main():
    parser = argparse.ArgumentParser(description='Shard client.')
    parser.add_argument(
            '--lifetime-hours',
            type=float,
            default=None,
            help='If provided, number of hours we should run for. '
                 'At the expiry of this time, the process will exit '
                 'gracefully.',
    )
    parser.add_argument(
            '--metrics-file',
            help='If provided, drop metrics to this local file instead of '
                 'reporting to ts_mon',
            type=str,
            default=None,
    )
    options = parser.parse_args()

    with ts_mon_config.SetupTsMonGlobalState(
            'shard_client',
            indirect=True,
            debug_file=options.metrics_file,
    ):
        try:
            metrics.Counter('chromeos/autotest/shard_client/start').increment()
            main_without_exception_handling(options)
        except Exception:
            metrics.Counter('chromeos/autotest/shard_client/uncaught_exception'
                            ).increment()
            message = 'Uncaught exception. Terminating shard_client.'
            email_manager.manager.log_stacktrace(message)
            logging.exception(message)
            raise
        finally:
            email_manager.manager.send_queued_emails()


def main_without_exception_handling(options):
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            None, timestamped_logfile_prefix='shard_client')

    logging.info('Setting signal handler.')
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    logging.info('Starting shard client.')
    global _heartbeat_client
    _heartbeat_client = get_shard_client()
    _heartbeat_client.loop(options.lifetime_hours)


if __name__ == '__main__':
    main()