# pylint: disable=missing-docstring

"""Database model classes for the scheduler.

Contains model classes abstracting the various DB tables used by the scheduler.
These overlap the Django models in basic functionality, but were written before
the Django models existed and have not yet been phased out.  Some of them
(particularly HostQueueEntry and Job) have considerable scheduler-specific logic
which would probably be ill-suited for inclusion in the general Django model
classes.

Globals:
_notify_email_statuses: list of HQE statuses.  each time a single HQE reaches
        one of these statuses, an email will be sent to the job's email_list.
        comes from global_config.
_base_url: URL to the local AFE server, used to construct URLs for emails.
_db: DatabaseConnection for this module.
_drone_manager: reference to global DroneManager instance.
"""

import datetime
import itertools
import logging
import re
import time
import weakref

from autotest_lib.client.common_lib import global_config, host_protections
from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib import utils
from autotest_lib.frontend.afe import models, model_attributes
from autotest_lib.scheduler import drone_manager, email_manager
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import scheduler_config
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.server import afe_urls
from autotest_lib.server.cros import provision

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


_notify_email_statuses = []
_base_url = None

_db = None
_drone_manager = None

def initialize():
    """Initialize module globals from global_config.

    Sets up the DB connection (_db), the list of HQE statuses that trigger
    notification emails (_notify_email_statuses) and the AFE base URL
    (_base_url), then initializes the drone manager reference.
    """
    global _db
    _db = scheduler_lib.ConnectionManager().get_connection()

    notify_statuses_list = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, "notify_email_statuses",
            default='')
    global _notify_email_statuses
    # Statuses may be separated by whitespace, commas, semicolons or colons;
    # empty tokens are dropped.
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'base_url', default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        _base_url = afe_urls.ROOT_URL

    initialize_globals()
def initialize_globals():
    """Initialize the module-level DroneManager reference."""
    global _drone_manager
    _drone_manager = drone_manager.instance()


def get_job_metadata(job):
    """Get a dictionary of the job information.

    The return value is a dictionary that includes job information like id,
    name and parent job information. The value will be stored in metadata
    database.

    @param job: A Job object.
    @return: A dictionary containing the job id, owner and name; an empty
            dict if job is None or lacks the expected attributes.
    """
    if not job:
        logging.error('Job is None, no metadata returned.')
        return {}
    try:
        return {'job_id': job.id,
                'owner': job.owner,
                'job_name': job.name,
                'parent_job_id': job.parent_job_id}
    except AttributeError as e:
        logging.error('Job has missing attribute: %s', e)
        return {}


class DelayedCallTask(object):
    """
    A task object like AgentTask for an Agent to run that waits for the
    specified amount of time to have elapsed before calling the supplied
    callback once and finishing.  If the callback returns anything, it is
    assumed to be a new Agent instance and will be added to the dispatcher.

    @attribute end_time: The absolute posix time after which this task will
            call its callback when it is polled and be finished.

    Also has all attributes required by the Agent class.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: The delay in seconds from now that this task
                will call the supplied callback and be done.
        @param callback: A callable to be called by this task once after at
                least delay_seconds time has elapsed.  It must return None
                or a new Agent instance.
        @param now_func: A time.time like function.  Default: time.time.
                Used for testing.
        """
        assert delay_seconds > 0
        assert callable(callback)
        if not now_func:
            now_func = time.time
        self._now_func = now_func
        self._callback = callback

        self.end_time = self._now_func() + delay_seconds

        # These attributes are required by Agent.
        self.aborted = False
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        self.num_processes = 0


    def poll(self):
        # Fire the callback exactly once, the first time we are polled at or
        # after end_time; success marks the task done so it never fires again.
        if not self.is_done() and self._now_func() >= self.end_time:
            self._callback()
            self.success = True


    def is_done(self):
        return self.success or self.aborted


    def abort(self):
        self.aborted = True
class DBError(Exception):
    """Raised by the DBObject constructor when its select fails."""


class DBObject(object):
    """A miniature object relational model for the database."""

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id.  This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id: The DB id of the row to load (mutually dependent on row).
        @param row: A pre-fetched DB row to initialize from instead of
                querying the DB.
        @param new_record: True if this object represents a row that does not
                yet exist in the DB.
        @param always_query: If False and this instance was already
                initialized (via the instance cache), skip re-querying.
        """
        assert bool(id) or bool(row)
        if id is not None and row is not None:
            assert id == row[0]
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warning(
                        'initialized %s %s instance requery is updating: %s',
                        type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """Fetch a single row by id; raises DBError if it does not exist."""
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        assert len(row) == len(self._fields), (
                "table = %s, row = %s/%d, fields = %s/%d" % (
                self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing
        in memory fields.  Fractional seconds are stripped from datetime
        values before comparison.

        @param row - A sequence of values corresponding to fields named in
                The class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if (isinstance(current_value, datetime.datetime)
                and isinstance(row_value, datetime.datetime)):
                # Format both values to strip sub-second precision before
                # comparing, since the DB does not store it.
                current_value = current_value.strftime(time_utils.TIME_FMT)
                row_value = row_value.strftime(time_utils.TIME_FMT)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # id is an immutable primary key, not a field updatable via
        # update_field().
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Re-read this row from the DB and refresh all field attributes."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table = None):
        """Return the number of rows in `table` matching the WHERE clause."""
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        """Set a single field both in the DB and on this instance."""
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """INSERT this object into the DB if it is a new record."""
        if self.__new_record:
            keys = self._fields[1:]  # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """Delete this row from the DB and invalidate this instance.

        Bug fix: the instance-cache eviction previously used the *builtin*
        id() instead of self.id as the cache key, so deleted rows were never
        evicted and a subsequent constructor call could return a stale,
        deleted instance.
        """
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        """Prepend prefix to string only if string is non-empty."""
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch_rows(cls, where='', params=(), joins='', order_by=''):
        """
        Fetch the rows based on the given database query.

        @yields the rows fetched by the given query.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        return rows

    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @yields One class instance for each row fetched.
        """
        rows = cls.fetch_rows(where=where, params=params, joins=joins,
                              order_by=order_by)
        return [cls(id=row[0], row=row) for row in rows]
class IneligibleHostQueue(DBObject):
    _table_name = 'afe_ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')


class AtomicGroup(DBObject):
    _table_name = 'afe_atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')


class Label(DBObject):
    _table_name = 'afe_labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
                self.name, self.id, self.atomic_group_id)


class Host(DBObject):
    _table_name = 'afe_hosts'
    # TODO(ayatane): synch_id is not used, remove after fixing DB.
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty',
               'leased', 'shard_id', 'lock_reason')


    def set_status(self, status):
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT afe_labels.name, afe_labels.platform
                FROM afe_labels
                INNER JOIN afe_hosts_labels ON
                        afe_labels.id = afe_hosts_labels.label_id
                WHERE afe_hosts_labels.host_id = %s
                ORDER BY afe_labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number.  If both hostnames do not match this pattern, they are simply
        compared as lower case strings.

        Example of how hostnames will be sorted:

            alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfy most people's hostname sorting needs regardless
        of their exact naming schemes.  Nobody sane should have both a host10
        and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            # Bug fix: an all-zero digit suffix (e.g. "host0") lstrips to ''
            # and int('') raises ValueError; fall back to '0' in that case.
            number_a = int(number_a_str.lstrip('0') or '0')
            number_b = int(number_b_str.lstrip('0') or '0')
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)


class HostQueueEntry(DBObject):
    _table_name = 'afe_host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on', 'finished_on')

    _COMPLETION_COUNT_METRIC = metrics.Counter(
        'chromeos/autotest/scheduler/hqe_completion_count')

    def __init__(self, id=None, row=None, job_row=None, **kwargs):
        """
        @param id: ID field from afe_host_queue_entries table.
                Either id or row should be specified for initialization.
        @param row: The DB row for a particular HostQueueEntry.
                Either id or row should be specified for initialization.
        @param job_row: The DB row for the job of this HostQueueEntry.
        """
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id, row=job_row)

        if self.host_id:
            self.host = rdb_lib.get_hosts([self.host_id])[0]
            self.host.dbg_str = self.get_dbg_str()
            self.host.metadata = get_job_metadata(self.job)
        else:
            self.host = None


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @yields One class instance for each row fetched.
        """
        # Override the original fetch method to pre-fetch the jobs from the DB
        # in order to prevent each HQE making separate DB queries.
        rows = cls.fetch_rows(where=where, params=params, joins=joins,
                              order_by=order_by)
        if len(rows) <= 1:
            return [cls(id=row[0], row=row) for row in rows]

        job_params = ', '.join([str(row[1]) for row in rows])
        job_rows = Job.fetch_rows(where='id IN (%s)' % (job_params))
        # Create a Job_id to Job_row match dictionary to match the HQE
        # to its corresponding job.
        job_dict = {job_row[0]: job_row for job_row in job_rows}
        return [cls(id=row[0], row=row, job_row=job_dict.get(row[1]))
                for row in rows]


    def _view_job_url(self):
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label).  The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN afe_jobs_dependency_labels AS deps "
                      "ON (afe_labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


    def set_host(self, host):
        if host:
            logging.info('Assigning host %s to entry %s', host.hostname, self)
            self.update_field('host_id', host.id)
            self.block_host(host.id)
        else:
            logging.info('Releasing host from %s', self)
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def block_host(self, host_id):
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        if subdir is None:
            assert self.host
            subdir = self.host.hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        if self.host:
            return self.host.hostname
        return 'no host'


    def get_dbg_str(self):
        """Get a debug string to identify this host.

        @return: A string containing the hqe and job id.
        """
        try:
            return 'HQE: %s, for job: %s' % (self.id, self.job_id)
        except AttributeError as e:
            return 'HQE has not been initialized yet: %s' % e


    def __str__(self):
        flags = []
        if self.active:
            flags.append('active')
        if self.complete:
            flags.append('complete')
        if self.deleted:
            flags.append('deleted')
        if self.aborted:
            flags.append('aborted')
        flags_str = ','.join(flags)
        if flags_str:
            flags_str = ' [%s]' % flags_str
        return ("%s and host: %s has status:%s%s" %
                (self.get_dbg_str(), self._get_hostname(), self.status,
                 flags_str))


    def set_status(self, status):
        logging.info("%s -> %s", self, status)

        self.update_field('status', status)

        active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
        complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)
        assert not (active and complete)

        self.update_field('active', active)

        # The ordering of these operations is important. Once we set the
        # complete bit this job will become indistinguishable from all
        # the other complete jobs, unless we first set shard_id to NULL
        # to signal to the shard_client that we need to upload it. However,
        # we can only set both these after we've updated finished_on etc
        # within _on_complete or the job will get synced in an intermediate
        # state. This means that if someone sigkills the scheduler between
        # setting finished_on and complete, we will have inconsistent jobs.
        # This should be fine, because nothing critical checks finished_on,
        # and the scheduler should never be killed mid-tick.
        if complete:
            self._on_complete(status)
            self._email_on_job_complete()

        self.update_field('complete', complete)

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)
        logging.debug('HQE Set Status Complete')


    def _on_complete(self, status):
        metric_fields = {'status': status.lower()}
        if self.host:
            metric_fields['board'] = self.host.board or ''
            if len(self.host.pools) == 1:
                metric_fields['pool'] = self.host.pools[0]
            else:
                metric_fields['pool'] = 'MULTIPLE'
        else:
            metric_fields['board'] = 'NO_HOST'
            metric_fields['pool'] = 'NO_HOST'
        self._COMPLETION_COUNT_METRIC.increment(fields=metric_fields)
        # Bug fix: statuses are strings; `is not` compared object identity and
        # only worked by virtue of CPython string interning.  Use != instead.
        if status != models.HostQueueEntry.Status.ABORTED:
            self.job.stop_if_necessary()
        if self.started_on:
            self.set_finished_on_now()
        if self.job.shard_id is not None:
            # If shard_id is None, the job will be synced back to the master
            self.job.update_field('shard_id', None)
        if not self.execution_subdir:
            return
        # unregister any possible pidfiles associated with this queue entry
        for pidfile_name in drone_manager.ALL_PIDFILE_NAMES:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                    self.execution_path(), pidfile_name=pidfile_name)
            _drone_manager.unregister_pidfile(pidfile_id)


    def _get_status_email_contents(self, status, summary=None, hostname=None):
        """
        Gather info for the status notification e-mails.

        If needed, we could start using the Django templating engine to create
        the subject and the e-mail body, but that doesn't seem necessary right
        now.

        @param status: Job status text. Mandatory.
        @param summary: Job summary text. Optional.
        @param hostname: A hostname for the job. Optional.

        @return: Tuple (subject, body) for the notification e-mail.
        """
        job_stats = Job(id=self.job.id).get_execution_details()

        subject = ('Autotest | Job ID: %s "%s" | Status: %s ' %
                   (self.job.id, self.job.name, status))

        if hostname is not None:
            subject += '| Hostname: %s ' % hostname

        if status not in ["1 Failed", "Failed"]:
            subject += '| Success Rate: %.2f %%' % job_stats['success_rate']

        body =  "Job ID: %s\n" % self.job.id
        body += "Job name: %s\n" % self.job.name
        if hostname is not None:
            body += "Host: %s\n" % hostname
        if summary is not None:
            body += "Summary: %s\n" % summary
        body += "Status: %s\n" % status
        body += "Results interface URL: %s\n" % self._view_job_url()
        body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time']
        if int(job_stats['total_executed']) > 0:
            body += "User tests executed: %s\n" % job_stats['total_executed']
            body += "User tests passed: %s\n" % job_stats['total_passed']
            body += "User tests failed: %s\n" % job_stats['total_failed']
            body += ("User tests success rate: %.2f %%\n" %
                     job_stats['success_rate'])

        if job_stats['failed_rows']:
            body += "Failures:\n"
            body += job_stats['failed_rows']

        return subject, body


    def _email_on_status(self, status):
        hostname = self._get_hostname()
        subject, body = self._get_status_email_contents(status, None, hostname)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        if not self.job.is_finished():
            return

        summary = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary.append("Host: %s Status: %s" %
                           (queue_entry._get_hostname(),
                            queue_entry.status))

        summary = "\n".join(summary)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject, body = self._get_status_email_contents(status, summary, None)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def schedule_pre_job_tasks(self):
        logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.job.id, self.id, self.host.hostname, self.status)

        self._do_schedule_pre_job_tasks()


    def _do_schedule_pre_job_tasks(self):
        self.job.schedule_pre_job_tasks(queue_entry=self)


    def requeue(self):
        assert self.host
        self.set_status(models.HostQueueEntry.Status.QUEUED)
        self.update_field('started_on', None)
        self.update_field('finished_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    @property
    def aborted_by(self):
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT afe_users.login,
                        afe_aborted_host_queue_entries.aborted_on
                FROM afe_aborted_host_queue_entries
                INNER JOIN afe_users
                ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id
                WHERE afe_aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, sets the entries to STARTING. Otherwise, it leaves
        them in PENDING.
        """
        self.set_status(models.HostQueueEntry.Status.PENDING)
        self.host.set_status(models.Host.Status.PENDING)

        # Some debug code here: sends an email if an asynchronous job does not
        # immediately enter Starting.
        # TODO: Remove this once we figure out why asynchronous jobs are getting
        # stuck in Pending.
        self.job.run_if_ready(queue_entry=self)
        if (self.job.synch_count == 1 and
                self.status == models.HostQueueEntry.Status.PENDING):
            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
            message = 'Asynchronous job stuck in Pending'
            email_manager.manager.enqueue_notify_email(subject, message)


    def abort(self, dispatcher):
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in {Status.GATHERING, Status.PARSING}:
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in {Status.STARTING, Status.PENDING, Status.RUNNING}:
            # If hqe is in any of these status, it should not have any
            # unfinished agent before it can be aborted.
            agents = dispatcher.get_agents_for_entry(self)
            # Agent with finished task can be left behind. This is added to
            # handle the special case of aborting hostless job in STARTING
            # status, in which the agent has only a HostlessQueueTask
            # associated. The finished HostlessQueueTask will be cleaned up in
            # the next tick, so it's safe to leave the agent there. Without
            # filtering out finished agent, HQE abort won't be able to proceed.
            assert all([agent.is_done() for agent in agents])
            # If hqe is still in STARTING status, it may not have assigned a
            # host yet.
            if self.host:
                self.host.set_status(models.Host.Status.READY)
        elif (self.status == Status.VERIFYING or
              self.status == Status.RESETTING):
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())
        elif self.status == Status.PROVISIONING:
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.REPAIR,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())

        self.set_status(Status.ABORTED)


    def execution_tag(self):
        SQL_SUSPECT_ENTRIES = ('SELECT * FROM afe_host_queue_entries WHERE '
                               'complete!=1 AND execution_subdir="" AND '
                               'status!="Queued";')
        SQL_FIX_SUSPECT_ENTRY = ('UPDATE afe_host_queue_entries SET '
                                 'status="Aborted" WHERE id=%s;')
        try:
            assert self.execution_subdir
        except AssertionError:
            # TODO(scottz): Remove temporary fix/info gathering pathway for
            # crosbug.com/31595 once issue is root caused.
            logging.error('No execution_subdir for host queue id:%s.', self.id)
            logging.error('====DB DEBUG====\n%s', SQL_SUSPECT_ENTRIES)
            for row in _db.execute(SQL_SUSPECT_ENTRIES):
                logging.error(row)
            logging.error('====DB DEBUG====\n')
            fix_query = SQL_FIX_SUSPECT_ENTRY % self.id
            logging.error('EXECUTING: %s', fix_query)
            _db.execute(SQL_FIX_SUSPECT_ENTRY % self.id)
            raise AssertionError(('self.execution_subdir not found. '
                                  'See log for details.'))

        return "%s/%s" % (self.job.tag(), self.execution_subdir)


    def execution_path(self):
        return self.execution_tag()


    def set_started_on_now(self):
        self.update_field('started_on', datetime.datetime.now())


    def set_finished_on_now(self):
        self.update_field('finished_on', datetime.datetime.now())


    def is_hostless(self):
        return (self.host_id is None
                and self.meta_host is None)
' 903 'See log for details.')) 904 905 return "%s/%s" % (self.job.tag(), self.execution_subdir) 906 907 908 def execution_path(self): 909 return self.execution_tag() 910 911 912 def set_started_on_now(self): 913 self.update_field('started_on', datetime.datetime.now()) 914 915 916 def set_finished_on_now(self): 917 self.update_field('finished_on', datetime.datetime.now()) 918 919 920 def is_hostless(self): 921 return (self.host_id is None 922 and self.meta_host is None) 923 924 925 class Job(DBObject): 926 _table_name = 'afe_jobs' 927 _fields = ('id', 'owner', 'name', 'priority', 'control_file', 928 'control_type', 'created_on', 'synch_count', 'timeout', 929 'run_verify', 'email_list', 'reboot_before', 'reboot_after', 930 'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id', 931 'parameterized_job_id', 'max_runtime_mins', 'parent_job_id', 932 'test_retry', 'run_reset', 'timeout_mins', 'shard_id', 933 'require_ssp') 934 935 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on 936 # all status='Pending' atomic group HQEs incase a delay was running when the 937 # scheduler was restarted and no more hosts ever successfully exit Verify. 938 939 def __init__(self, id=None, row=None, **kwargs): 940 assert id or row 941 super(Job, self).__init__(id=id, row=row, **kwargs) 942 self._owner_model = None # caches model instance of owner 943 self.update_image_path = None # path of OS image to install 944 945 946 def model(self): 947 return models.Job.objects.get(id=self.id) 948 949 950 def owner_model(self): 951 # work around the fact that the Job owner field is a string, not a 952 # foreign key 953 if not self._owner_model: 954 self._owner_model = models.User.objects.get(login=self.owner) 955 return self._owner_model 956 957 958 def tag(self): 959 return "%s-%s" % (self.id, self.owner) 960 961 962 def get_execution_details(self): 963 """ 964 Get test execution details for this job. 
965 966 @return: Dictionary with test execution details 967 """ 968 def _find_test_jobs(rows): 969 """ 970 Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.* 971 Those are autotest 'internal job' tests, so they should not be 972 counted when evaluating the test stats. 973 974 @param rows: List of rows (matrix) with database results. 975 """ 976 job_test_pattern = re.compile('SERVER|CLIENT\\_JOB\.[\d]') 977 n_test_jobs = 0 978 for r in rows: 979 test_name = r[0] 980 if job_test_pattern.match(test_name): 981 n_test_jobs += 1 982 983 return n_test_jobs 984 985 stats = {} 986 987 rows = _db.execute(""" 988 SELECT t.test, s.word, t.reason 989 FROM tko_tests AS t, tko_jobs AS j, tko_status AS s 990 WHERE t.job_idx = j.job_idx 991 AND s.status_idx = t.status 992 AND j.afe_job_id = %s 993 ORDER BY t.reason 994 """ % self.id) 995 996 failed_rows = [r for r in rows if not r[1] == 'GOOD'] 997 998 n_test_jobs = _find_test_jobs(rows) 999 n_test_jobs_failed = _find_test_jobs(failed_rows) 1000 1001 total_executed = len(rows) - n_test_jobs 1002 total_failed = len(failed_rows) - n_test_jobs_failed 1003 1004 if total_executed > 0: 1005 success_rate = 100 - ((total_failed / float(total_executed)) * 100) 1006 else: 1007 success_rate = 0 1008 1009 stats['total_executed'] = total_executed 1010 stats['total_failed'] = total_failed 1011 stats['total_passed'] = total_executed - total_failed 1012 stats['success_rate'] = success_rate 1013 1014 status_header = ("Test Name", "Status", "Reason") 1015 if failed_rows: 1016 stats['failed_rows'] = utils.matrix_to_string(failed_rows, 1017 status_header) 1018 else: 1019 stats['failed_rows'] = '' 1020 1021 time_row = _db.execute(""" 1022 SELECT started_time, finished_time 1023 FROM tko_jobs 1024 WHERE afe_job_id = %s 1025 """ % self.id) 1026 1027 if time_row: 1028 t_begin, t_end = time_row[0] 1029 try: 1030 delta = t_end - t_begin 1031 minutes, seconds = divmod(delta.seconds, 60) 1032 hours, minutes = divmod(minutes, 60) 1033 
stats['execution_time'] = ("%02d:%02d:%02d" % 1034 (hours, minutes, seconds)) 1035 # One of t_end or t_begin are None 1036 except TypeError: 1037 stats['execution_time'] = '(could not determine)' 1038 else: 1039 stats['execution_time'] = '(none)' 1040 1041 return stats 1042 1043 1044 def keyval_dict(self): 1045 return self.model().keyval_dict() 1046 1047 1048 def _pending_count(self): 1049 """The number of HostQueueEntries for this job in the Pending state.""" 1050 pending_entries = models.HostQueueEntry.objects.filter( 1051 job=self.id, status=models.HostQueueEntry.Status.PENDING) 1052 return pending_entries.count() 1053 1054 1055 def is_ready(self): 1056 pending_count = self._pending_count() 1057 ready = (pending_count >= self.synch_count) 1058 1059 if not ready: 1060 logging.info( 1061 'Job %s not ready: %s pending, %s required ', 1062 self, pending_count, self.synch_count) 1063 1064 return ready 1065 1066 1067 def num_machines(self, clause = None): 1068 sql = "job_id=%s" % self.id 1069 if clause: 1070 sql += " AND (%s)" % clause 1071 return self.count(sql, table='afe_host_queue_entries') 1072 1073 1074 def num_queued(self): 1075 return self.num_machines('not complete') 1076 1077 1078 def num_active(self): 1079 return self.num_machines('active') 1080 1081 1082 def num_complete(self): 1083 return self.num_machines('complete') 1084 1085 1086 def is_finished(self): 1087 return self.num_complete() == self.num_machines() 1088 1089 1090 def _not_yet_run_entries(self, include_active=True): 1091 if include_active: 1092 statuses = list(models.HostQueueEntry.PRE_JOB_STATUSES) 1093 else: 1094 statuses = list(models.HostQueueEntry.IDLE_PRE_JOB_STATUSES) 1095 return models.HostQueueEntry.objects.filter(job=self.id, 1096 status__in=statuses) 1097 1098 1099 def _stop_all_entries(self): 1100 """Stops the job's inactive pre-job HQEs.""" 1101 entries_to_stop = self._not_yet_run_entries( 1102 include_active=False) 1103 for child_entry in entries_to_stop: 1104 assert not 
child_entry.complete, ( 1105 '%s status=%s, active=%s, complete=%s' % 1106 (child_entry.id, child_entry.status, child_entry.active, 1107 child_entry.complete)) 1108 if child_entry.status == models.HostQueueEntry.Status.PENDING: 1109 child_entry.host.status = models.Host.Status.READY 1110 child_entry.host.save() 1111 child_entry.status = models.HostQueueEntry.Status.STOPPED 1112 child_entry.save() 1113 1114 1115 def stop_if_necessary(self): 1116 not_yet_run = self._not_yet_run_entries() 1117 if not_yet_run.count() < self.synch_count: 1118 self._stop_all_entries() 1119 1120 1121 def _next_group_name(self): 1122 """@returns a directory name to use for the next host group results.""" 1123 group_name = '' 1124 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name)) 1125 query = models.HostQueueEntry.objects.filter( 1126 job=self.id).values('execution_subdir').distinct() 1127 subdirs = (entry['execution_subdir'] for entry in query) 1128 group_matches = (group_count_re.match(subdir) for subdir in subdirs) 1129 ids = [int(match.group(1)) for match in group_matches if match] 1130 if ids: 1131 next_id = max(ids) + 1 1132 else: 1133 next_id = 0 1134 return '%sgroup%d' % (group_name, next_id) 1135 1136 1137 def get_group_entries(self, queue_entry_from_group): 1138 """ 1139 @param queue_entry_from_group: A HostQueueEntry instance to find other 1140 group entries on this job for. 1141 1142 @returns A list of HostQueueEntry objects all executing this job as 1143 part of the same group as the one supplied (having the same 1144 execution_subdir). 
1145 """ 1146 execution_subdir = queue_entry_from_group.execution_subdir 1147 return list(HostQueueEntry.fetch( 1148 where='job_id=%s AND execution_subdir=%s', 1149 params=(self.id, execution_subdir))) 1150 1151 1152 def _should_run_cleanup(self, queue_entry): 1153 if self.reboot_before == model_attributes.RebootBefore.ALWAYS: 1154 return True 1155 elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY: 1156 return queue_entry.host.dirty 1157 return False 1158 1159 1160 def _should_run_verify(self, queue_entry): 1161 do_not_verify = (queue_entry.host.protection == 1162 host_protections.Protection.DO_NOT_VERIFY) 1163 if do_not_verify: 1164 return False 1165 # If RebootBefore is set to NEVER, then we won't run reset because 1166 # we can't cleanup, so we need to weaken a Reset into a Verify. 1167 weaker_reset = (self.run_reset and 1168 self.reboot_before == model_attributes.RebootBefore.NEVER) 1169 return self.run_verify or weaker_reset 1170 1171 1172 def _should_run_reset(self, queue_entry): 1173 can_verify = (queue_entry.host.protection != 1174 host_protections.Protection.DO_NOT_VERIFY) 1175 can_reboot = self.reboot_before != model_attributes.RebootBefore.NEVER 1176 return (can_reboot and can_verify and (self.run_reset or 1177 (self._should_run_cleanup(queue_entry) and 1178 self._should_run_verify(queue_entry)))) 1179 1180 1181 def _should_run_provision(self, queue_entry): 1182 """ 1183 Determine if the queue_entry needs to have a provision task run before 1184 it to provision queue_entry.host. 1185 1186 @param queue_entry: The host queue entry in question. 1187 @returns: True if we should schedule a provision task, False otherwise. 1188 1189 """ 1190 # If we get to this point, it means that the scheduler has already 1191 # vetted that all the unprovisionable labels match, so we can just 1192 # find all labels on the job that aren't on the host to get the list 1193 # of what we need to provision. 
(See the scheduling logic in 1194 # host_scheduler.py:is_host_eligable_for_job() where we discard all 1195 # actionable labels when assigning jobs to hosts.) 1196 job_labels = {x.name for x in queue_entry.get_labels()} 1197 # Skip provision if `skip_provision` is listed in the job labels. 1198 if provision.SKIP_PROVISION in job_labels: 1199 return False 1200 _, host_labels = queue_entry.host.platform_and_labels() 1201 # If there are any labels on the job that are not on the host and they 1202 # are labels that provisioning knows how to change, then that means 1203 # there is provisioning work to do. If there's no provisioning work to 1204 # do, then obviously we have no reason to schedule a provision task! 1205 diff = job_labels - set(host_labels) 1206 if any([provision.Provision.acts_on(x) for x in diff]): 1207 return True 1208 return False 1209 1210 1211 def _queue_special_task(self, queue_entry, task): 1212 """ 1213 Create a special task and associate it with a host queue entry. 1214 1215 @param queue_entry: The queue entry this special task should be 1216 associated with. 1217 @param task: One of the members of the enum models.SpecialTask.Task. 1218 @returns: None 1219 1220 """ 1221 models.SpecialTask.objects.create( 1222 host=models.Host.objects.get(id=queue_entry.host_id), 1223 queue_entry=queue_entry, task=task) 1224 1225 1226 def schedule_pre_job_tasks(self, queue_entry): 1227 """ 1228 Queue all of the special tasks that need to be run before a host 1229 queue entry may run. 1230 1231 If no special taskes need to be scheduled, then |on_pending| will be 1232 called directly. 
1233 1234 @returns None 1235 1236 """ 1237 task_queued = False 1238 hqe_model = models.HostQueueEntry.objects.get(id=queue_entry.id) 1239 1240 if self._should_run_provision(queue_entry): 1241 self._queue_special_task(hqe_model, 1242 models.SpecialTask.Task.PROVISION) 1243 task_queued = True 1244 elif self._should_run_reset(queue_entry): 1245 self._queue_special_task(hqe_model, models.SpecialTask.Task.RESET) 1246 task_queued = True 1247 else: 1248 if self._should_run_cleanup(queue_entry): 1249 self._queue_special_task(hqe_model, 1250 models.SpecialTask.Task.CLEANUP) 1251 task_queued = True 1252 if self._should_run_verify(queue_entry): 1253 self._queue_special_task(hqe_model, 1254 models.SpecialTask.Task.VERIFY) 1255 task_queued = True 1256 1257 if not task_queued: 1258 queue_entry.on_pending() 1259 1260 1261 def _assign_new_group(self, queue_entries): 1262 if len(queue_entries) == 1: 1263 group_subdir_name = queue_entries[0].host.hostname 1264 else: 1265 group_subdir_name = self._next_group_name() 1266 logging.info('Running synchronous job %d hosts %s as %s', 1267 self.id, [entry.host.hostname for entry in queue_entries], 1268 group_subdir_name) 1269 1270 for queue_entry in queue_entries: 1271 queue_entry.set_execution_subdir(group_subdir_name) 1272 1273 1274 def _choose_group_to_run(self, include_queue_entry): 1275 """ 1276 @returns A tuple containing a list of HostQueueEntry instances to be 1277 used to run this Job, a string group name to suggest giving 1278 to this job in the results database. 1279 """ 1280 chosen_entries = [include_queue_entry] 1281 num_entries_wanted = self.synch_count 1282 num_entries_wanted -= len(chosen_entries) 1283 1284 if num_entries_wanted > 0: 1285 where_clause = 'job_id = %s AND status = "Pending" AND id != %s' 1286 pending_entries = list(HostQueueEntry.fetch( 1287 where=where_clause, 1288 params=(self.id, include_queue_entry.id))) 1289 1290 # Sort the chosen hosts by hostname before slicing. 
1291 def cmp_queue_entries_by_hostname(entry_a, entry_b): 1292 return Host.cmp_for_sort(entry_a.host, entry_b.host) 1293 pending_entries.sort(cmp=cmp_queue_entries_by_hostname) 1294 chosen_entries += pending_entries[:num_entries_wanted] 1295 1296 # Sanity check. We'll only ever be called if this can be met. 1297 if len(chosen_entries) < self.synch_count: 1298 message = ('job %s got less than %s chosen entries: %s' % ( 1299 self.id, self.synch_count, chosen_entries)) 1300 logging.error(message) 1301 email_manager.manager.enqueue_notify_email( 1302 'Job not started, too few chosen entries', message) 1303 return [] 1304 1305 self._assign_new_group(chosen_entries) 1306 return chosen_entries 1307 1308 1309 def run_if_ready(self, queue_entry): 1310 """ 1311 Run this job by kicking its HQEs into status='Starting' if enough 1312 hosts are ready for it to run. 1313 1314 Cleans up by kicking HQEs into status='Stopped' if this Job is not 1315 ready to run. 1316 """ 1317 if not self.is_ready(): 1318 self.stop_if_necessary() 1319 else: 1320 self.run(queue_entry) 1321 1322 1323 def request_abort(self): 1324 """Request that this Job be aborted on the next scheduler cycle.""" 1325 self.model().abort() 1326 1327 1328 def run(self, queue_entry): 1329 """ 1330 @param queue_entry: The HostQueueEntry instance calling this method. 1331 """ 1332 queue_entries = self._choose_group_to_run(queue_entry) 1333 if queue_entries: 1334 self._finish_run(queue_entries) 1335 1336 1337 def _finish_run(self, queue_entries): 1338 for queue_entry in queue_entries: 1339 queue_entry.set_status(models.HostQueueEntry.Status.STARTING) 1340 1341 1342 def __str__(self): 1343 return '%s-%s' % (self.id, self.owner) 1344