Home | History | Annotate | Download | only in lucifer
      1 # Copyright 2017 The Chromium Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """Event handlers."""
      6 
      7 from __future__ import absolute_import
      8 from __future__ import division
      9 from __future__ import print_function
     10 
     11 import datetime
     12 import logging
     13 
     14 from lucifer import autotest
     15 from lucifer import jobx
     16 
     17 logger = logging.getLogger(__name__)
     18 
     19 
     20 class EventHandler(object):
     21     """Event handling dispatcher.
     22 
     23     Event handlers are implemented as methods named _handle_<event value>.
     24 
     25     Each handler method must handle its exceptions accordingly.  If an
     26     exception escapes, the job dies on the spot.
     27 
     28     Instances have one public attribute completed.  completed is set to
     29     True once the final COMPLETED event is received and the handler
     30     finishes.
     31     """
     32 
     33     def __init__(self, metrics, job, autoserv_exit):
     34         """Initialize instance.
     35 
     36         @param metrics: Metrics instance
     37         @param job: frontend.afe.models.Job instance to own
     38         @param hqes: list of HostQueueEntry instances for the job
     39         @param autoserv_exit: autoserv exit status
     40         """
     41         self.completed = False
     42         self._metrics = metrics
     43         self._job = job
     44         # TODO(crbug.com/748234): autoserv not implemented yet.
     45         self._autoserv_exit = autoserv_exit
     46 
     47     def __call__(self, event, msg):
     48         logger.debug('Received event %r with message %r', event.name, msg)
     49         method_name = '_handle_%s' % event.value
     50         try:
     51             handler = getattr(self, method_name)
     52         except AttributeError:
     53             raise NotImplementedError('%s is not implemented for handling %s',
     54                                       method_name, event.name)
     55         handler(msg)
     56 
     57     def _handle_starting(self, msg):
     58         # TODO(crbug.com/748234): No event update needed yet.
     59         pass
     60 
     61     def _handle_gathering(self, msg):
     62         # TODO(crbug.com/794779): monitor_db leaves HQEs in GATHERING
     63         pass
     64 
     65     def _handle_x_tests_done(self, msg):
     66         """Taken from GatherLogsTask.epilog."""
     67         autoserv_exit, failures = (int(x) for x in msg.split(','))
     68         logger.debug('Got autoserv_exit=%d, failures=%d',
     69                      autoserv_exit, failures)
     70         success = (autoserv_exit == 0 and failures == 0)
     71         reset_after_failure = not self._job.run_reset and not success
     72         hqes = self._job.hostqueueentry_set.all().prefetch_related('host')
     73         if self._should_reboot_duts(autoserv_exit, failures,
     74                                     reset_after_failure):
     75             logger.debug('Creating cleanup jobs for hosts')
     76             for entry in hqes:
     77                 self._handle_host_needs_cleanup(entry.host.hostname)
     78         else:
     79             logger.debug('Not creating cleanup jobs for hosts')
     80             for entry in hqes:
     81                 self._handle_host_ready(entry.host.hostname)
     82         if not reset_after_failure:
     83             logger.debug('Skipping reset because reset_after_failure is False')
     84             return
     85         logger.debug('Creating reset jobs for hosts')
     86         self._metrics.send_reset_after_failure(autoserv_exit, failures)
     87         for entry in hqes:
     88             self._handle_host_needs_reset(entry.host.hostname)
     89 
     90     def _handle_parsing(self, _msg):
     91         models = autotest.load('frontend.afe.models')
     92         self._job.hostqueueentry_set.all().update(
     93                 status=models.HostQueueEntry.Status.PARSING)
     94 
     95     def _handle_completed(self, _msg):
     96         models = autotest.load('frontend.afe.models')
     97         final_status = self._final_status()
     98         for hqe in self._job.hostqueueentry_set.all():
     99             self._set_completed_status(hqe, final_status)
    100         if final_status is not models.HostQueueEntry.Status.ABORTED:
    101             _stop_prejob_hqes(self._job)
    102         if self._job.shard_id is not None:
    103             # If shard_id is None, the job will be synced back to the master
    104             self._job.shard_id = None
    105             self._job.save()
    106         self.completed = True
    107 
    108     def _handle_host_ready(self, msg):
    109         models = autotest.load('frontend.afe.models')
    110         (models.Host.objects.filter(hostname=msg)
    111          .update(status=models.Host.Status.READY))
    112 
    113     def _handle_host_needs_cleanup(self, msg):
    114         models = autotest.load('frontend.afe.models')
    115         host = models.Host.objects.get(hostname=msg)
    116         models.SpecialTask.objects.create(
    117                 host_id=host.id,
    118                 task=models.SpecialTask.Task.CLEANUP,
    119                 requested_by=models.User.objects.get(login=self._job.owner))
    120 
    121     def _handle_host_needs_reset(self, msg):
    122         models = autotest.load('frontend.afe.models')
    123         host = models.Host.objects.get(hostname=msg)
    124         models.SpecialTask.objects.create(
    125                 host_id=host.id,
    126                 task=models.SpecialTask.Task.RESET,
    127                 requested_by=models.User.objects.get(login=self._job.owner))
    128 
    129     def _should_reboot_duts(self, autoserv_exit, failures, reset_after_failure):
    130         models = autotest.load('frontend.afe.models')
    131         reboot_after = self._job.reboot_after
    132         if self._final_status() == models.HostQueueEntry.Status.ABORTED:
    133             logger.debug('Should reboot because reboot_after=ABORTED')
    134             return True
    135         elif reboot_after == models.Job.RebootAfter.ALWAYS:
    136             logger.debug('Should reboot because reboot_after=ALWAYS')
    137             return True
    138         elif (reboot_after == models.Job.RebootAfter.IF_ALL_TESTS_PASSED
    139               and autoserv_exit == 0 and failures == 0):
    140             logger.debug('Should reboot because'
    141                          ' reboot_after=IF_ALL_TESTS_PASSED')
    142             return True
    143         else:
    144             return failures > 0 and not reset_after_failure
    145 
    146     def _final_status(self):
    147         models = autotest.load('frontend.afe.models')
    148         Status = models.HostQueueEntry.Status
    149         if jobx.is_aborted(self._job):
    150             return Status.ABORTED
    151         if self._autoserv_exit == 0:
    152             return Status.COMPLETED
    153         return Status.FAILED
    154 
    155     def _set_completed_status(self, hqe, status):
    156         """Set completed status of HQE.
    157 
    158         This is a cleaned up version of the one in scheduler_models to work
    159         with Django models.
    160         """
    161         hqe.status = status
    162         hqe.active = False
    163         hqe.complete = True
    164         if hqe.started_on:
    165             hqe.finished_on = datetime.datetime.now()
    166         hqe.save()
    167         self._metrics.send_hqe_completion(hqe)
    168         self._metrics.send_hqe_duration(hqe)
    169 
    170 
    171 class Metrics(object):
    172 
    173     """Class for sending job metrics."""
    174 
    175     def __init__(self):
    176         # Metrics
    177         metrics = autotest.chromite_load('metrics')
    178         self._hqe_completion_metric = metrics.Counter(
    179                 'chromeos/autotest/scheduler/hqe_completion_count')
    180         self._reset_after_failure_metric = metrics.Counter(
    181                 'chromeos/autotest/scheduler/postjob_tasks/'
    182                 'reset_after_failure')
    183 
    184     def send_hqe_completion(self, hqe):
    185         """Send ts_mon metrics for HQE completion."""
    186         fields = {
    187                 'status': hqe.status.lower(),
    188                 'board': 'NO_HOST',
    189                 'pool': 'NO_HOST',
    190         }
    191         if hqe.host:
    192             labellib = autotest.load('utils.labellib')
    193             labels = labellib.LabelsMapping.from_host(hqe.host)
    194             fields['board'] = labels.get('board', '')
    195             fields['pool'] = labels.get('pool', '')
    196         self._hqe_completion_metric.increment(fields=fields)
    197 
    198     def send_hqe_duration(self, hqe):
    199         """Send CloudTrace metrics for HQE duration."""
    200         if not (hqe.started_on and hqe.finished_on):
    201             return
    202         scheduler_models = autotest.load('scheduler.scheduler_models')
    203         cloud_trace = autotest.chromite_load('cloud_trace')
    204         types = autotest.deps_load('google.protobuf.internal.well_known_types')
    205         hqe_trace_id = scheduler_models.hqe_trace_id
    206 
    207         span = cloud_trace.Span(
    208                 'HQE', spanId='0', traceId=hqe_trace_id(hqe.id))
    209         span.startTime = types.Timestamp()
    210         span.startTime.FromDatetime(hqe.started_on)
    211         span.endTime = types.Timestamp()
    212         span.endTime.FromDatetime(hqe.finished_on)
    213         cloud_trace.LogSpan(span)
    214 
    215     def send_reset_after_failure(self, autoserv_exit, failures):
    216         """Send reset_after_failure metric."""
    217         self._reset_after_failure_metric.increment(
    218                 fields={'autoserv_process_success': autoserv_exit == 0,
    219                         # Yes, this is a boolean
    220                         'num_tests_failed': failures > 0})
    221 
    222 
    223 def _stop_prejob_hqes(job):
    224     """Stop pending HQEs for a job (for synch_count)."""
    225     models = autotest.load('frontend.afe.models')
    226     HQEStatus = models.HostQueueEntry.Status
    227     HostStatus = models.Host.Status
    228     not_yet_run = _get_prejob_hqes(job)
    229     if not_yet_run.count() == job.synch_count:
    230         return
    231     entries_to_stop = _get_prejob_hqes(job, include_active=False)
    232     for hqe in entries_to_stop:
    233         if hqe.status == HQEStatus.PENDING:
    234             hqe.host.status = HostStatus.READY
    235             hqe.host.save()
    236         hqe.status = HQEStatus.STOPPED
    237         hqe.save()
    238 
    239 
    240 def _get_prejob_hqes(job, include_active=True):
    241     """Return a queryset of not run HQEs for the job (for synch_count)."""
    242     models = autotest.load('frontend.afe.models')
    243     if include_active:
    244         statuses = list(models.HostQueueEntry.PRE_JOB_STATUSES)
    245     else:
    246         statuses = list(models.HostQueueEntry.IDLE_PRE_JOB_STATUSES)
    247     return models.HostQueueEntry.objects.filter(
    248             job=job, status__in=statuses)
    249