# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Event handlers."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import datetime
import logging
import time

from lucifer import autotest
from lucifer import jobx

logger = logging.getLogger(__name__)


class EventHandler(object):
    """Event handling dispatcher.

    Event handlers are implemented as methods named _handle_<event value>.

    Each handler method must handle its exceptions accordingly.  If an
    exception escapes, the job dies on the spot.

    Instances have one public attribute completed.  completed is set to
    True once the final COMPLETED event is received and the handler
    finishes.
    """

    def __init__(self, metrics, job, autoserv_exit, results_dir):
        """Initialize instance.

        @param metrics: Metrics instance
        @param job: frontend.afe.models.Job instance to own
        @param autoserv_exit: autoserv exit status
        @param results_dir: Job results directory
        """
        self.completed = False
        self._metrics = metrics
        self._job = job
        # TODO(crbug.com/748234): autoserv not implemented yet.
        self._autoserv_exit = autoserv_exit
        self._results_dir = results_dir

    def __call__(self, event, msg):
        """Dispatch an event to its _handle_<event value> method.

        The handler is invoked through _retry_db_errors(), so it may be
        called more than once if database errors are raised.

        @param event: event object; its name and value attributes are used
        @param msg: message passed through to the handler
        @raises NotImplementedError: if no handler method exists for event
        """
        logger.debug('Received event %r with message %r', event.name, msg)
        method_name = '_handle_%s' % event.value
        try:
            handler = getattr(self, method_name)
        except AttributeError:
            # Interpolate explicitly; exception constructors do not apply
            # %-formatting to their arguments the way logging calls do.
            raise NotImplementedError(
                    '%s is not implemented for handling %s'
                    % (method_name, event.name))
        _retry_db_errors(lambda: handler(msg))

    def _handle_starting(self, msg):
        """Handle the starting event (currently a no-op)."""
        # TODO(crbug.com/748234): No event update needed yet.
        pass

    def _handle_running(self, _msg):
        """Mark all of the job's HQEs as RUNNING and record the start time."""
        models = autotest.load('frontend.afe.models')
        self._job.hostqueueentry_set.all().update(
                status=models.HostQueueEntry.Status.RUNNING,
                started_on=datetime.datetime.now())

    def _handle_gathering(self, _msg):
        """Mark all of the job's HQEs as GATHERING."""
        models = autotest.load('frontend.afe.models')
        self._job.hostqueueentry_set.all().update(
                status=models.HostQueueEntry.Status.GATHERING)

    def _handle_parsing(self, _msg):
        """Mark all of the job's HQEs as PARSING."""
        models = autotest.load('frontend.afe.models')
        self._job.hostqueueentry_set.all().update(
                status=models.HostQueueEntry.Status.PARSING)

    def _handle_aborted(self, _msg):
        """Abort each of the job's HQEs and write the aborted keyvals."""
        for hqe in self._job.hostqueueentry_set.all().prefetch_related('host'):
            _mark_hqe_aborted(hqe)
        jobx.write_aborted_keyvals_and_status(self._job, self._results_dir)

    def _handle_completed(self, _msg):
        """Finalize the job and set the public completed flag."""
        self._mark_job_complete()
        self.completed = True

    def _handle_test_passed(self, msg):
        """Record a successful autoserv exit when msg is 'autoserv'."""
        if msg == 'autoserv':
            self._autoserv_exit = 0

    def _handle_test_failed(self, msg):
        """Record a failed autoserv exit when msg is 'autoserv'."""
        if msg == 'autoserv':
            self._autoserv_exit = 1

    def _handle_host_running(self, msg):
        """Mark the host named by msg as RUNNING and dirty.

        @param msg: hostname of the host
        """
        models = autotest.load('frontend.afe.models')
        host = models.Host.objects.get(hostname=msg)
        host.status = models.Host.Status.RUNNING
        host.dirty = 1
        host.save(update_fields=['status', 'dirty'])
        self._metrics.send_host_status(host)

    def _handle_host_ready(self, msg):
        """Mark the host named by msg as READY.

        @param msg: hostname of the host
        """
        models = autotest.load('frontend.afe.models')
        host = models.Host.objects.get(hostname=msg)
        host.status = models.Host.Status.READY
        host.save(update_fields=['status'])
        self._metrics.send_host_status(host)

    def _handle_host_needs_cleanup(self, msg):
        """Create a CLEANUP special task for the host named by msg.

        @param msg: hostname of the host
        """
        models = autotest.load('frontend.afe.models')
        host = models.Host.objects.get(hostname=msg)
        models.SpecialTask.objects.create(
                host_id=host.id,
                task=models.SpecialTask.Task.CLEANUP,
                requested_by=models.User.objects.get(login=self._job.owner))

    def _handle_host_needs_reset(self, msg):
        """Create a RESET special task for the host named by msg.

        @param msg: hostname of the host
        """
        models = autotest.load('frontend.afe.models')
        host = models.Host.objects.get(hostname=msg)
        models.SpecialTask.objects.create(
                host_id=host.id,
                task=models.SpecialTask.Task.RESET,
                requested_by=models.User.objects.get(login=self._job.owner))

    def _handle_x_tests_done(self, msg):
        """Taken from GatherLogsTask.epilog.

        @param msg: string of the form '<autoserv_exit>,<failures>', both
                    integers
        """
        autoserv_exit, failures = (int(x) for x in msg.split(','))
        logger.debug('Got autoserv_exit=%d, failures=%d',
                     autoserv_exit, failures)
        success = (autoserv_exit == 0 and failures == 0)
        reset_after_failure = not self._job.run_reset and not success
        hqes = self._job.hostqueueentry_set.all().prefetch_related('host')
        if self._should_reboot_duts(autoserv_exit, failures,
                                    reset_after_failure):
            logger.debug('Creating cleanup jobs for hosts')
            for entry in hqes:
                self._handle_host_needs_cleanup(entry.host.hostname)
        else:
            logger.debug('Not creating cleanup jobs for hosts')
            for entry in hqes:
                self._handle_host_ready(entry.host.hostname)
        if not reset_after_failure:
            logger.debug('Skipping reset because reset_after_failure is False')
            return
        logger.debug('Creating reset jobs for hosts')
        self._metrics.send_reset_after_failure(autoserv_exit, failures)
        for entry in hqes:
            self._handle_host_needs_reset(entry.host.hostname)

    def _should_reboot_duts(self, autoserv_exit, failures, reset_after_failure):
        """Return whether cleanup tasks should be created for the hosts.

        @param autoserv_exit: autoserv exit status (int)
        @param failures: number of failed tests (int)
        @param reset_after_failure: whether a reset will be scheduled
                                    because the job failed
        """
        models = autotest.load('frontend.afe.models')
        reboot_after = self._job.reboot_after
        if self._final_status() == models.HostQueueEntry.Status.ABORTED:
            # This branch is taken when the job's final status is ABORTED,
            # regardless of the job's reboot_after setting.
            logger.debug('Should reboot because reboot_after=ABORTED')
            return True
        elif reboot_after == models.Job.RebootAfter.ALWAYS:
            logger.debug('Should reboot because reboot_after=ALWAYS')
            return True
        elif (reboot_after == models.Job.RebootAfter.IF_ALL_TESTS_PASSED
              and autoserv_exit == 0 and failures == 0):
            logger.debug('Should reboot because'
                         ' reboot_after=IF_ALL_TESTS_PASSED')
            return True
        else:
            return failures > 0 and not reset_after_failure

    def _mark_job_complete(self):
        """Perform Autotest operations needed for job completion."""
        final_status = self._final_status()
        self._mark_hqes_complete(final_status)
        self._stop_job_if_necessary(final_status)
        self._release_job_if_sharded()

    def _mark_hqes_complete(self, final_status):
        """Perform Autotest HQE operations needed for job completion."""
        for hqe in self._job.hostqueueentry_set.all():
            self._set_completed_status(hqe, final_status)

    def _stop_job_if_necessary(self, final_status):
        """Equivalent to scheduler.modes.Job.stop_if_necessary().

        The name isn't informative, but this will stop pre-job tasks as
        necessary.
        """
        models = autotest.load('frontend.afe.models')
        # Statuses are strings; compare by value rather than by identity.
        if final_status != models.HostQueueEntry.Status.ABORTED:
            _stop_prejob_hqes(self._job)

    def _release_job_if_sharded(self):
        """Clear the job's shard_id if it is set."""
        if self._job.shard_id is not None:
            # If shard_id is None, the job will be synced back to the master
            self._job.shard_id = None
            self._job.save(update_fields=['shard_id'])

    def _final_status(self):
        """Return the final HQE status for the job.

        ABORTED if the job is aborted, COMPLETED if the recorded autoserv
        exit status is 0, FAILED otherwise.
        """
        models = autotest.load('frontend.afe.models')
        Status = models.HostQueueEntry.Status
        if jobx.is_aborted(self._job):
            return Status.ABORTED
        if self._autoserv_exit == 0:
            return Status.COMPLETED
        return Status.FAILED

    def _set_completed_status(self, hqe, status):
        """Set completed status of HQE.

        This is a cleaned up version of the one in scheduler_models to work
        with Django models.

        @param hqe: HostQueueEntry instance to update
        @param status: final status to store on the HQE
        """
        hqe.status = status
        hqe.active = False
        hqe.complete = True
        # Only HQEs that actually started get a finish time.
        if hqe.started_on:
            hqe.finished_on = datetime.datetime.now()
        hqe.save(update_fields=['status', 'active', 'complete', 'finished_on'])
        self._metrics.send_hqe_completion(hqe)
        self._metrics.send_hqe_duration(hqe)


class Metrics(object):

    """Class for sending job metrics."""

    def __init__(self):
        """Create the metric objects used by the send_* methods."""
        # Metrics
        metrics = autotest.chromite_load('metrics')
        self._hqe_completion_metric = metrics.Counter(
                'chromeos/autotest/scheduler/hqe_completion_count')
        self._reset_after_failure_metric = metrics.Counter(
                'chromeos/autotest/scheduler/postjob_tasks/'
                'reset_after_failure')
        self._host_status_metric = metrics.Boolean(
                'chromeos/autotest/dut_status', reset_after=True)

    def send_host_status(self, host):
        """Send ts_mon metrics for host status.

        @param host: frontend.afe.models.Host instance
        """
        labellib = autotest.load('utils.labellib')
        labels = labellib.LabelsMapping.from_host(host)
        # Use .get() so a host missing a board or model label does not
        # raise KeyError; this matches send_hqe_completion().
        fields = {
                'dut_host_name': host.hostname,
                'board': labels.get('board', ''),
                'model': labels.get('model', ''),
        }
        # As each device switches state, indicate that it is not in any
        # other state.  This allows Monarch queries to avoid double counting
        # when additional points are added by the Window Align operation.
        for s in host.Status.names:
            fields['status'] = s
            self._host_status_metric.set(s == host.status, fields=fields)

    def send_hqe_completion(self, hqe):
        """Send ts_mon metrics for HQE completion."""
        fields = {
                'status': hqe.status.lower(),
                'board': 'NO_HOST',
                'pool': 'NO_HOST',
        }
        if hqe.host:
            labellib = autotest.load('utils.labellib')
            labels = labellib.LabelsMapping.from_host(hqe.host)
            fields['board'] = labels.get('board', '')
            fields['pool'] = labels.get('pool', '')
        self._hqe_completion_metric.increment(fields=fields)

    def send_hqe_duration(self, hqe):
        """Send CloudTrace metrics for HQE duration."""
        # Nothing to report unless both timestamps are set.
        if not (hqe.started_on and hqe.finished_on):
            return
        scheduler_models = autotest.load('scheduler.scheduler_models')
        cloud_trace = autotest.chromite_load('cloud_trace')
        types = autotest.deps_load('google.protobuf.internal.well_known_types')
        hqe_trace_id = scheduler_models.hqe_trace_id

        span = cloud_trace.Span(
                'HQE', spanId='0', traceId=hqe_trace_id(hqe.id))
        span.startTime = types.Timestamp()
        span.startTime.FromDatetime(hqe.started_on)
        span.endTime = types.Timestamp()
        span.endTime.FromDatetime(hqe.finished_on)
        cloud_trace.LogSpan(span)

    def send_reset_after_failure(self, autoserv_exit, failures):
        """Send reset_after_failure metric."""
        self._reset_after_failure_metric.increment(
                fields={'autoserv_process_success': autoserv_exit == 0,
                        # Yes, this is a boolean
                        'num_tests_failed': failures > 0})


def _mark_hqe_aborted(hqe):
    """Perform Autotest operations needed for HQE abortion.

    This also operates on the HQE's host, so prefetch it when possible.

    This logic is from scheduler_models.HostQueueEntry.abort().
    """
    models = autotest.load('frontend.afe.models')
    transaction = autotest.deps_load('django.db.transaction')
    Status = models.HostQueueEntry.Status
    with transaction.commit_on_success():
        # HQEs that are gathering or parsing are left untouched.
        if hqe.status in (Status.GATHERING, Status.PARSING):
            return
        if hqe.status in (Status.STARTING, Status.PENDING, Status.RUNNING):
            if hqe.host is None:
                return
            hqe.host.status = models.Host.Status.READY
            hqe.host.save(update_fields=['status'])
        hqe.status = Status.ABORTED
        hqe.save(update_fields=['status'])


def _stop_prejob_hqes(job):
    """Stop pending HQEs for a job (for synch_count)."""
    models = autotest.load('frontend.afe.models')
    HQEStatus = models.HostQueueEntry.Status
    HostStatus = models.Host.Status
    not_yet_run = _get_prejob_hqes(job)
    # Nothing is stopped when the number of not-yet-run HQEs equals the
    # job's synch_count.
    if not_yet_run.count() == job.synch_count:
        return
    entries_to_stop = _get_prejob_hqes(job, include_active=False)
    for hqe in entries_to_stop:
        if hqe.status == HQEStatus.PENDING:
            hqe.host.status = HostStatus.READY
            hqe.host.save(update_fields=['status'])
        hqe.status = HQEStatus.STOPPED
        hqe.save(update_fields=['status'])


def _get_prejob_hqes(job, include_active=True):
    """Return a queryset of not run HQEs for the job (for synch_count).

    @param job: job whose HQEs are queried
    @param include_active: if True, filter on PRE_JOB_STATUSES; otherwise
                           filter on IDLE_PRE_JOB_STATUSES
    """
    models = autotest.load('frontend.afe.models')
    if include_active:
        statuses = list(models.HostQueueEntry.PRE_JOB_STATUSES)
    else:
        statuses = list(models.HostQueueEntry.IDLE_PRE_JOB_STATUSES)
    return models.HostQueueEntry.objects.filter(
            job=job, status__in=statuses)


def _retry_db_errors(func):
    """Call func, retrying multiple times if database errors are raised.

    After each failed attempt the Django database connection is closed and
    the call sleeps for 5 seconds.  The last error is re-raised once the
    retries are exhausted.  The return value of func is discarded.

    crbug.com/863504

    @param func: callable taking no arguments
    """
    django = autotest.deps_load('django')
    MySQLdb = autotest.deps_load('MySQLdb')
    max_retries = 10
    # n ... 0 means n + 1 tries, or 1 try plus n retries
    # range() rather than xrange() so this also runs on Python 3.
    for i in range(max_retries, -1, -1):
        try:
            func()
        except (django.db.utils.DatabaseError, MySQLdb.OperationalError) as e:
            if i == 0:
                raise
            logger.debug('Got database error %s, retrying', e)
            django.db.close_connection()
            time.sleep(5)
        else:
            break