Home | History | Annotate | Download | only in lucifer
      1 # Copyright 2017 The Chromium Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """Event handlers."""
      6 
      7 from __future__ import absolute_import
      8 from __future__ import division
      9 from __future__ import print_function
     10 
     11 import datetime
     12 import logging
     13 import time
     14 
     15 from lucifer import autotest
     16 from lucifer import jobx
     17 
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
     19 
     20 
     21 class EventHandler(object):
     22     """Event handling dispatcher.
     23 
     24     Event handlers are implemented as methods named _handle_<event value>.
     25 
     26     Each handler method must handle its exceptions accordingly.  If an
     27     exception escapes, the job dies on the spot.
     28 
     29     Instances have one public attribute completed.  completed is set to
     30     True once the final COMPLETED event is received and the handler
     31     finishes.
     32     """
     33 
     34     def __init__(self, metrics, job, autoserv_exit, results_dir):
     35         """Initialize instance.
     36 
     37         @param metrics: Metrics instance
     38         @param job: frontend.afe.models.Job instance to own
     39         @param hqes: list of HostQueueEntry instances for the job
     40         @param autoserv_exit: autoserv exit status
     41         @param results_dir: Job results directory
     42         """
     43         self.completed = False
     44         self._metrics = metrics
     45         self._job = job
     46         # TODO(crbug.com/748234): autoserv not implemented yet.
     47         self._autoserv_exit = autoserv_exit
     48         self._results_dir = results_dir
     49 
     50     def __call__(self, event, msg):
     51         logger.debug('Received event %r with message %r', event.name, msg)
     52         method_name = '_handle_%s' % event.value
     53         try:
     54             handler = getattr(self, method_name)
     55         except AttributeError:
     56             raise NotImplementedError('%s is not implemented for handling %s',
     57                                       method_name, event.name)
     58         _retry_db_errors(lambda: handler(msg))
     59 
     60     def _handle_starting(self, msg):
     61         # TODO(crbug.com/748234): No event update needed yet.
     62         pass
     63 
     64     def _handle_running(self, _msg):
     65         models = autotest.load('frontend.afe.models')
     66         self._job.hostqueueentry_set.all().update(
     67                 status=models.HostQueueEntry.Status.RUNNING,
     68                 started_on=datetime.datetime.now())
     69 
     70     def _handle_gathering(self, _msg):
     71         models = autotest.load('frontend.afe.models')
     72         self._job.hostqueueentry_set.all().update(
     73                 status=models.HostQueueEntry.Status.GATHERING)
     74 
     75     def _handle_parsing(self, _msg):
     76         models = autotest.load('frontend.afe.models')
     77         self._job.hostqueueentry_set.all().update(
     78                 status=models.HostQueueEntry.Status.PARSING)
     79 
     80     def _handle_aborted(self, _msg):
     81         for hqe in self._job.hostqueueentry_set.all().prefetch_related('host'):
     82             _mark_hqe_aborted(hqe)
     83         jobx.write_aborted_keyvals_and_status(self._job, self._results_dir)
     84 
     85     def _handle_completed(self, _msg):
     86         self._mark_job_complete()
     87         self.completed = True
     88 
     89     def _handle_test_passed(self, msg):
     90         if msg == 'autoserv':
     91             self._autoserv_exit = 0
     92 
     93     def _handle_test_failed(self, msg):
     94         if msg == 'autoserv':
     95             self._autoserv_exit = 1
     96 
     97     def _handle_host_running(self, msg):
     98         models = autotest.load('frontend.afe.models')
     99         host = models.Host.objects.get(hostname=msg)
    100         host.status = models.Host.Status.RUNNING
    101         host.dirty = 1
    102         host.save(update_fields=['status', 'dirty'])
    103         self._metrics.send_host_status(host)
    104 
    105     def _handle_host_ready(self, msg):
    106         models = autotest.load('frontend.afe.models')
    107         host = models.Host.objects.get(hostname=msg)
    108         host.status = models.Host.Status.READY
    109         host.save(update_fields=['status'])
    110         self._metrics.send_host_status(host)
    111 
    112     def _handle_host_needs_cleanup(self, msg):
    113         models = autotest.load('frontend.afe.models')
    114         host = models.Host.objects.get(hostname=msg)
    115         models.SpecialTask.objects.create(
    116                 host_id=host.id,
    117                 task=models.SpecialTask.Task.CLEANUP,
    118                 requested_by=models.User.objects.get(login=self._job.owner))
    119 
    120     def _handle_host_needs_reset(self, msg):
    121         models = autotest.load('frontend.afe.models')
    122         host = models.Host.objects.get(hostname=msg)
    123         models.SpecialTask.objects.create(
    124                 host_id=host.id,
    125                 task=models.SpecialTask.Task.RESET,
    126                 requested_by=models.User.objects.get(login=self._job.owner))
    127 
    128     def _handle_x_tests_done(self, msg):
    129         """Taken from GatherLogsTask.epilog."""
    130         autoserv_exit, failures = (int(x) for x in msg.split(','))
    131         logger.debug('Got autoserv_exit=%d, failures=%d',
    132                      autoserv_exit, failures)
    133         success = (autoserv_exit == 0 and failures == 0)
    134         reset_after_failure = not self._job.run_reset and not success
    135         hqes = self._job.hostqueueentry_set.all().prefetch_related('host')
    136         if self._should_reboot_duts(autoserv_exit, failures,
    137                                     reset_after_failure):
    138             logger.debug('Creating cleanup jobs for hosts')
    139             for entry in hqes:
    140                 self._handle_host_needs_cleanup(entry.host.hostname)
    141         else:
    142             logger.debug('Not creating cleanup jobs for hosts')
    143             for entry in hqes:
    144                 self._handle_host_ready(entry.host.hostname)
    145         if not reset_after_failure:
    146             logger.debug('Skipping reset because reset_after_failure is False')
    147             return
    148         logger.debug('Creating reset jobs for hosts')
    149         self._metrics.send_reset_after_failure(autoserv_exit, failures)
    150         for entry in hqes:
    151             self._handle_host_needs_reset(entry.host.hostname)
    152 
    153     def _should_reboot_duts(self, autoserv_exit, failures, reset_after_failure):
    154         models = autotest.load('frontend.afe.models')
    155         reboot_after = self._job.reboot_after
    156         if self._final_status() == models.HostQueueEntry.Status.ABORTED:
    157             logger.debug('Should reboot because reboot_after=ABORTED')
    158             return True
    159         elif reboot_after == models.Job.RebootAfter.ALWAYS:
    160             logger.debug('Should reboot because reboot_after=ALWAYS')
    161             return True
    162         elif (reboot_after == models.Job.RebootAfter.IF_ALL_TESTS_PASSED
    163               and autoserv_exit == 0 and failures == 0):
    164             logger.debug('Should reboot because'
    165                          ' reboot_after=IF_ALL_TESTS_PASSED')
    166             return True
    167         else:
    168             return failures > 0 and not reset_after_failure
    169 
    170     def _mark_job_complete(self):
    171         """Perform Autotest operations needed for job completion."""
    172         final_status = self._final_status()
    173         self._mark_hqes_complete(final_status)
    174         self._stop_job_if_necessary(final_status)
    175         self._release_job_if_sharded()
    176 
    177     def _mark_hqes_complete(self, final_status):
    178         """Perform Autotest HQE operations needed for job completion."""
    179         for hqe in self._job.hostqueueentry_set.all():
    180             self._set_completed_status(hqe, final_status)
    181 
    182     def _stop_job_if_necessary(self, final_status):
    183         """Equivalent to scheduler.modes.Job.stop_if_necessary().
    184 
    185         The name isn't informative, but this will stop pre-job tasks as
    186         necessary.
    187         """
    188         models = autotest.load('frontend.afe.models')
    189         if final_status is not models.HostQueueEntry.Status.ABORTED:
    190             _stop_prejob_hqes(self._job)
    191 
    192     def _release_job_if_sharded(self):
    193         if self._job.shard_id is not None:
    194             # If shard_id is None, the job will be synced back to the master
    195             self._job.shard_id = None
    196             self._job.save(update_fields=['shard_id'])
    197 
    198     def _final_status(self):
    199         models = autotest.load('frontend.afe.models')
    200         Status = models.HostQueueEntry.Status
    201         if jobx.is_aborted(self._job):
    202             return Status.ABORTED
    203         if self._autoserv_exit == 0:
    204             return Status.COMPLETED
    205         return Status.FAILED
    206 
    207     def _set_completed_status(self, hqe, status):
    208         """Set completed status of HQE.
    209 
    210         This is a cleaned up version of the one in scheduler_models to work
    211         with Django models.
    212         """
    213         hqe.status = status
    214         hqe.active = False
    215         hqe.complete = True
    216         if hqe.started_on:
    217             hqe.finished_on = datetime.datetime.now()
    218         hqe.save(update_fields=['status', 'active', 'complete', 'finished_on'])
    219         self._metrics.send_hqe_completion(hqe)
    220         self._metrics.send_hqe_duration(hqe)
    221 
    222 
    223 class Metrics(object):
    224 
    225     """Class for sending job metrics."""
    226 
    227     def __init__(self):
    228         # Metrics
    229         metrics = autotest.chromite_load('metrics')
    230         self._hqe_completion_metric = metrics.Counter(
    231                 'chromeos/autotest/scheduler/hqe_completion_count')
    232         self._reset_after_failure_metric = metrics.Counter(
    233                 'chromeos/autotest/scheduler/postjob_tasks/'
    234                 'reset_after_failure')
    235         self._host_status_metric = metrics.Boolean(
    236                 'chromeos/autotest/dut_status', reset_after=True)
    237 
    238     def send_host_status(self, host):
    239         """Send ts_mon metrics for host status.
    240 
    241         @param host: frontend.afe.models.Host instance
    242         """
    243         labellib = autotest.load('utils.labellib')
    244         labels = labellib.LabelsMapping.from_host(host)
    245         fields = {
    246                 'dut_host_name': host.hostname,
    247                 'board': labels['board'],
    248                 'model': labels['model'],
    249         }
    250         # As each device switches state, indicate that it is not in any
    251         # other state.  This allows Monarch queries to avoid double counting
    252         # when additional points are added by the Window Align operation.
    253         for s in host.Status.names:
    254             fields['status'] = s
    255             self._host_status_metric.set(s == host.status, fields=fields)
    256 
    257     def send_hqe_completion(self, hqe):
    258         """Send ts_mon metrics for HQE completion."""
    259         fields = {
    260                 'status': hqe.status.lower(),
    261                 'board': 'NO_HOST',
    262                 'pool': 'NO_HOST',
    263         }
    264         if hqe.host:
    265             labellib = autotest.load('utils.labellib')
    266             labels = labellib.LabelsMapping.from_host(hqe.host)
    267             fields['board'] = labels.get('board', '')
    268             fields['pool'] = labels.get('pool', '')
    269         self._hqe_completion_metric.increment(fields=fields)
    270 
    271     def send_hqe_duration(self, hqe):
    272         """Send CloudTrace metrics for HQE duration."""
    273         if not (hqe.started_on and hqe.finished_on):
    274             return
    275         scheduler_models = autotest.load('scheduler.scheduler_models')
    276         cloud_trace = autotest.chromite_load('cloud_trace')
    277         types = autotest.deps_load('google.protobuf.internal.well_known_types')
    278         hqe_trace_id = scheduler_models.hqe_trace_id
    279 
    280         span = cloud_trace.Span(
    281                 'HQE', spanId='0', traceId=hqe_trace_id(hqe.id))
    282         span.startTime = types.Timestamp()
    283         span.startTime.FromDatetime(hqe.started_on)
    284         span.endTime = types.Timestamp()
    285         span.endTime.FromDatetime(hqe.finished_on)
    286         cloud_trace.LogSpan(span)
    287 
    288     def send_reset_after_failure(self, autoserv_exit, failures):
    289         """Send reset_after_failure metric."""
    290         self._reset_after_failure_metric.increment(
    291                 fields={'autoserv_process_success': autoserv_exit == 0,
    292                         # Yes, this is a boolean
    293                         'num_tests_failed': failures > 0})
    294 
    295 
    296 def _mark_hqe_aborted(hqe):
    297     """Perform Autotest operations needed for HQE abortion.
    298 
    299     This also operates on the HQE's host, so prefetch it when possible.
    300 
    301     This logic is from scheduler_models.HostQueueEntry.abort().
    302     """
    303     models = autotest.load('frontend.afe.models')
    304     transaction = autotest.deps_load('django.db.transaction')
    305     Status = models.HostQueueEntry.Status
    306     with transaction.commit_on_success():
    307         if hqe.status in (Status.GATHERING, Status.PARSING):
    308             return
    309         if hqe.status in (Status.STARTING, Status.PENDING, Status.RUNNING):
    310             if hqe.host is None:
    311                 return
    312             hqe.host.status = models.Host.Status.READY
    313             hqe.host.save(update_fields=['status'])
    314         hqe.status = Status.ABORTED
    315         hqe.save(update_fields=['status'])
    316 
    317 
    318 def _stop_prejob_hqes(job):
    319     """Stop pending HQEs for a job (for synch_count)."""
    320     models = autotest.load('frontend.afe.models')
    321     HQEStatus = models.HostQueueEntry.Status
    322     HostStatus = models.Host.Status
    323     not_yet_run = _get_prejob_hqes(job)
    324     if not_yet_run.count() == job.synch_count:
    325         return
    326     entries_to_stop = _get_prejob_hqes(job, include_active=False)
    327     for hqe in entries_to_stop:
    328         if hqe.status == HQEStatus.PENDING:
    329             hqe.host.status = HostStatus.READY
    330             hqe.host.save(update_fields=['status'])
    331         hqe.status = HQEStatus.STOPPED
    332         hqe.save(update_fields=['status'])
    333 
    334 
    335 def _get_prejob_hqes(job, include_active=True):
    336     """Return a queryset of not run HQEs for the job (for synch_count)."""
    337     models = autotest.load('frontend.afe.models')
    338     if include_active:
    339         statuses = list(models.HostQueueEntry.PRE_JOB_STATUSES)
    340     else:
    341         statuses = list(models.HostQueueEntry.IDLE_PRE_JOB_STATUSES)
    342     return models.HostQueueEntry.objects.filter(
    343             job=job, status__in=statuses)
    344 
    345 
    346 def _retry_db_errors(func):
    347     """Call func, retrying multiple times if database errors are raised.
    348 
    349     crbug.com/863504
    350     """
    351     django = autotest.deps_load('django')
    352     MySQLdb = autotest.deps_load('MySQLdb')
    353     max_retries = 10
    354     # n ... 0 means n + 1 tries, or 1 try plus n retries
    355     for i in xrange(max_retries, -1, -1):
    356         try:
    357             func()
    358         except (django.db.utils.DatabaseError, MySQLdb.OperationalError) as e:
    359             if i == 0:
    360                 raise
    361             logger.debug('Got database error %s, retrying', e)
    362             django.db.close_connection()
    363             time.sleep(5)
    364         else:
    365             break
    366