1 # pylint: disable-msg=C0111 2 3 """Database model classes for the scheduler. 4 5 Contains model classes abstracting the various DB tables used by the scheduler. 6 These overlap the Django models in basic functionality, but were written before 7 the Django models existed and have not yet been phased out. Some of them 8 (particularly HostQueueEntry and Job) have considerable scheduler-specific logic 9 which would probably be ill-suited for inclusion in the general Django model 10 classes. 11 12 Globals: 13 _notify_email_statuses: list of HQE statuses. each time a single HQE reaches 14 one of these statuses, an email will be sent to the job's email_list. 15 comes from global_config. 16 _base_url: URL to the local AFE server, used to construct URLs for emails. 17 _db: DatabaseConnection for this module. 18 _drone_manager: reference to global DroneManager instance. 19 """ 20 21 import datetime, itertools, logging, os, re, sys, time, weakref 22 from autotest_lib.client.common_lib import control_data 23 from autotest_lib.client.common_lib import global_config, host_protections 24 from autotest_lib.client.common_lib import time_utils 25 from autotest_lib.client.common_lib import utils 26 from autotest_lib.client.common_lib.cros.graphite import autotest_es 27 from autotest_lib.client.common_lib.cros.graphite import autotest_stats 28 from autotest_lib.frontend.afe import models, model_attributes 29 from autotest_lib.scheduler import drone_manager, email_manager 30 from autotest_lib.scheduler import rdb_lib 31 from autotest_lib.scheduler import scheduler_config 32 from autotest_lib.scheduler import scheduler_lib 33 from autotest_lib.server.cros import provision 34 35 36 _notify_email_statuses = [] 37 _base_url = None 38 39 _db = None 40 _drone_manager = None 41 42 def initialize(): 43 global _db 44 _db = scheduler_lib.ConnectionManager().get_connection() 45 46 notify_statuses_list = global_config.global_config.get_config_value( 47 scheduler_config.CONFIG_SECTION, "notify_email_statuses", 48 default='') 49 global _notify_email_statuses 50 _notify_email_statuses = [status for status in 51 re.split(r'[\s,;:]', notify_statuses_list.lower()) 52 if status] 53 54 # AUTOTEST_WEB.base_url is still a supported config option as some people 55 # may wish to override the entire url. 56 global _base_url 57 config_base_url = global_config.global_config.get_config_value( 58 scheduler_config.CONFIG_SECTION, 'base_url', default='') 59 if config_base_url: 60 _base_url = config_base_url 61 else: 62 # For the common case of everything running on a single server you 63 # can just set the hostname in a single place in the config file. 64 server_name = global_config.global_config.get_config_value( 65 'SERVER', 'hostname') 66 if not server_name: 67 logging.critical('[SERVER] hostname missing from the config file.') 68 sys.exit(1) 69 _base_url = 'http://%s/afe/' % server_name 70 71 initialize_globals() 72 73 74 def initialize_globals(): 75 global _drone_manager 76 _drone_manager = drone_manager.instance() 77 78 79 def get_job_metadata(job): 80 """Get a dictionary of the job information. 81 82 The return value is a dictionary that includes job information like id, 83 name and parent job information. The value will be stored in metadata 84 database. 85 86 @param job: A Job object. 87 @return: A dictionary containing the job id, owner and name. 88 """ 89 if not job: 90 logging.error('Job is None, no metadata returned.') 91 return {} 92 try: 93 return {'job_id': job.id, 94 'owner': job.owner, 95 'job_name': job.name, 96 'parent_job_id': job.parent_job_id} 97 except AttributeError as e: 98 logging.error('Job has missing attribute: %s', e) 99 return {} 100 101 102 class DelayedCallTask(object): 103 """ 104 A task object like AgentTask for an Agent to run that waits for the 105 specified amount of time to have elapsed before calling the supplied 106 callback once and finishing. If the callback returns anything, it is 107 assumed to be a new Agent instance and will be added to the dispatcher. 108 109 @attribute end_time: The absolute posix time after which this task will 110 call its callback when it is polled and be finished. 111 112 Also has all attributes required by the Agent class. 113 """ 114 def __init__(self, delay_seconds, callback, now_func=None): 115 """ 116 @param delay_seconds: The delay in seconds from now that this task 117 will call the supplied callback and be done. 118 @param callback: A callable to be called by this task once after at 119 least delay_seconds time has elapsed. It must return None 120 or a new Agent instance. 121 @param now_func: A time.time like function. Default: time.time. 122 Used for testing. 123 """ 124 assert delay_seconds > 0 125 assert callable(callback) 126 if not now_func: 127 now_func = time.time 128 self._now_func = now_func 129 self._callback = callback 130 131 self.end_time = self._now_func() + delay_seconds 132 133 # These attributes are required by Agent. 134 self.aborted = False 135 self.host_ids = () 136 self.success = False 137 self.queue_entry_ids = () 138 self.num_processes = 0 139 140 141 def poll(self): 142 if not self.is_done() and self._now_func() >= self.end_time: 143 self._callback() 144 self.success = True 145 146 147 def is_done(self): 148 return self.success or self.aborted 149 150 151 def abort(self): 152 self.aborted = True 153 154 155 class DBError(Exception): 156 """Raised by the DBObject constructor when its select fails.""" 157 158 159 class DBObject(object): 160 """A miniature object relational model for the database.""" 161 162 # Subclasses MUST override these: 163 _table_name = '' 164 _fields = () 165 166 # A mapping from (type, id) to the instance of the object for that 167 # particular id. This prevents us from creating new Job() and Host() 168 # instances for every HostQueueEntry object that we instantiate as 169 # multiple HQEs often share the same Job. 170 _instances_by_type_and_id = weakref.WeakValueDictionary() 171 _initialized = False 172 173 174 def __new__(cls, id=None, **kwargs): 175 """ 176 Look to see if we already have an instance for this particular type 177 and id. If so, use it instead of creating a duplicate instance. 178 """ 179 if id is not None: 180 instance = cls._instances_by_type_and_id.get((cls, id)) 181 if instance: 182 return instance 183 return super(DBObject, cls).__new__(cls, id=id, **kwargs) 184 185 186 def __init__(self, id=None, row=None, new_record=False, always_query=True): 187 assert bool(id) or bool(row) 188 if id is not None and row is not None: 189 assert id == row[0] 190 assert self._table_name, '_table_name must be defined in your class' 191 assert self._fields, '_fields must be defined in your class' 192 if not new_record: 193 if self._initialized and not always_query: 194 return # We've already been initialized. 195 if id is None: 196 id = row[0] 197 # Tell future constructors to use us instead of re-querying while 198 # this instance is still around. 199 self._instances_by_type_and_id[(type(self), id)] = self 200 201 self.__table = self._table_name 202 203 self.__new_record = new_record 204 205 if row is None: 206 row = self._fetch_row_from_db(id) 207 208 if self._initialized: 209 differences = self._compare_fields_in_row(row) 210 if differences: 211 logging.warning( 212 'initialized %s %s instance requery is updating: %s', 213 type(self), self.id, differences) 214 self._update_fields_from_row(row) 215 self._initialized = True 216 217 218 @classmethod 219 def _clear_instance_cache(cls): 220 """Used for testing, clear the internal instance cache.""" 221 cls._instances_by_type_and_id.clear() 222 223 224 def _fetch_row_from_db(self, row_id): 225 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table 226 rows = _db.execute(sql, (row_id,)) 227 if not rows: 228 raise DBError("row not found (table=%s, row id=%s)" 229 % (self.__table, row_id)) 230 return rows[0] 231 232 233 def _assert_row_length(self, row): 234 assert len(row) == len(self._fields), ( 235 "table = %s, row = %s/%d, fields = %s/%d" % ( 236 self.__table, row, len(row), self._fields, len(self._fields))) 237 238 239 def _compare_fields_in_row(self, row): 240 """ 241 Given a row as returned by a SELECT query, compare it to our existing in 242 memory fields. Fractional seconds are stripped from datetime values 243 before comparison. 244 245 @param row - A sequence of values corresponding to fields named in 246 The class attribute _fields. 247 248 @returns A dictionary listing the differences keyed by field name 249 containing tuples of (current_value, row_value). 250 """ 251 self._assert_row_length(row) 252 differences = {} 253 for field, row_value in itertools.izip(self._fields, row): 254 current_value = getattr(self, field) 255 if (isinstance(current_value, datetime.datetime) 256 and isinstance(row_value, datetime.datetime)): 257 current_value = current_value.strftime(time_utils.TIME_FMT) 258 row_value = row_value.strftime(time_utils.TIME_FMT) 259 if current_value != row_value: 260 differences[field] = (current_value, row_value) 261 return differences 262 263 264 def _update_fields_from_row(self, row): 265 """ 266 Update our field attributes using a single row returned by SELECT. 267 268 @param row - A sequence of values corresponding to fields named in 269 the class fields list. 270 """ 271 self._assert_row_length(row) 272 273 self._valid_fields = set() 274 for field, value in itertools.izip(self._fields, row): 275 setattr(self, field, value) 276 self._valid_fields.add(field) 277 278 self._valid_fields.remove('id') 279 280 281 def update_from_database(self): 282 assert self.id is not None 283 row = self._fetch_row_from_db(self.id) 284 self._update_fields_from_row(row) 285 286 287 def count(self, where, table = None): 288 if not table: 289 table = self.__table 290 291 rows = _db.execute(""" 292 SELECT count(*) FROM %s 293 WHERE %s 294 """ % (table, where)) 295 296 assert len(rows) == 1 297 298 return int(rows[0][0]) 299 300 301 def update_field(self, field, value): 302 assert field in self._valid_fields 303 304 if getattr(self, field) == value: 305 return 306 307 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field) 308 _db.execute(query, (value, self.id)) 309 310 setattr(self, field, value) 311 312 313 def save(self): 314 if self.__new_record: 315 keys = self._fields[1:] # avoid id 316 columns = ','.join([str(key) for key in keys]) 317 values = [] 318 for key in keys: 319 value = getattr(self, key) 320 if value is None: 321 values.append('NULL') 322 else: 323 values.append('"%s"' % value) 324 values_str = ','.join(values) 325 query = ('INSERT INTO %s (%s) VALUES (%s)' % 326 (self.__table, columns, values_str)) 327 _db.execute(query) 328 # Update our id to the one the database just assigned to us. 329 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0] 330 331 332 def delete(self): 333 self._instances_by_type_and_id.pop((type(self), id), None) 334 self._initialized = False 335 self._valid_fields.clear() 336 query = 'DELETE FROM %s WHERE id=%%s' % self.__table 337 _db.execute(query, (self.id,)) 338 339 340 @staticmethod 341 def _prefix_with(string, prefix): 342 if string: 343 string = prefix + string 344 return string 345 346 347 @classmethod 348 def fetch(cls, where='', params=(), joins='', order_by=''): 349 """ 350 Construct instances of our class based on the given database query. 351 352 @yields One class instance for each row fetched. 353 """ 354 order_by = cls._prefix_with(order_by, 'ORDER BY ') 355 where = cls._prefix_with(where, 'WHERE ') 356 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s ' 357 '%(where)s %(order_by)s' % {'table' : cls._table_name, 358 'joins' : joins, 359 'where' : where, 360 'order_by' : order_by}) 361 rows = _db.execute(query, params) 362 return [cls(id=row[0], row=row) for row in rows] 363 364 365 class IneligibleHostQueue(DBObject): 366 _table_name = 'afe_ineligible_host_queues' 367 _fields = ('id', 'job_id', 'host_id') 368 369 370 class AtomicGroup(DBObject): 371 _table_name = 'afe_atomic_groups' 372 _fields = ('id', 'name', 'description', 'max_number_of_machines', 373 'invalid') 374 375 376 class Label(DBObject): 377 _table_name = 'afe_labels' 378 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid', 379 'only_if_needed', 'atomic_group_id') 380 381 382 def __repr__(self): 383 return 'Label(name=%r, id=%d, atomic_group_id=%r)' % ( 384 self.name, self.id, self.atomic_group_id) 385 386 387 class Host(DBObject): 388 _table_name = 'afe_hosts' 389 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status', 390 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty', 391 'leased', 'shard_id', 'lock_reason') 392 _timer = autotest_stats.Timer("scheduler_models.Host") 393 394 395 @_timer.decorate 396 def set_status(self,status): 397 logging.info('%s -> %s', self.hostname, status) 398 self.update_field('status',status) 399 # Noticed some time jumps after the last log message. 400 logging.debug('Host Set Status Complete') 401 402 403 def platform_and_labels(self): 404 """ 405 Returns a tuple (platform_name, list_of_all_label_names). 406 """ 407 rows = _db.execute(""" 408 SELECT afe_labels.name, afe_labels.platform 409 FROM afe_labels 410 INNER JOIN afe_hosts_labels ON 411 afe_labels.id = afe_hosts_labels.label_id 412 WHERE afe_hosts_labels.host_id = %s 413 ORDER BY afe_labels.name 414 """, (self.id,)) 415 platform = None 416 all_labels = [] 417 for label_name, is_platform in rows: 418 if is_platform: 419 platform = label_name 420 all_labels.append(label_name) 421 return platform, all_labels 422 423 424 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE) 425 426 427 @classmethod 428 def cmp_for_sort(cls, a, b): 429 """ 430 A comparison function for sorting Host objects by hostname. 431 432 This strips any trailing numeric digits, ignores leading 0s and 433 compares hostnames by the leading name and the trailing digits as a 434 number. If both hostnames do not match this pattern, they are simply 435 compared as lower case strings. 436 437 Example of how hostnames will be sorted: 438 439 alice, host1, host2, host09, host010, host10, host11, yolkfolk 440 441 This hopefully satisfy most people's hostname sorting needs regardless 442 of their exact naming schemes. Nobody sane should have both a host10 443 and host010 (but the algorithm works regardless). 444 """ 445 lower_a = a.hostname.lower() 446 lower_b = b.hostname.lower() 447 match_a = cls._ALPHANUM_HOST_RE.match(lower_a) 448 match_b = cls._ALPHANUM_HOST_RE.match(lower_b) 449 if match_a and match_b: 450 name_a, number_a_str = match_a.groups() 451 name_b, number_b_str = match_b.groups() 452 number_a = int(number_a_str.lstrip('0')) 453 number_b = int(number_b_str.lstrip('0')) 454 result = cmp((name_a, number_a), (name_b, number_b)) 455 if result == 0 and lower_a != lower_b: 456 # If they compared equal above but the lower case names are 457 # indeed different, don't report equality. abc012 != abc12. 458 return cmp(lower_a, lower_b) 459 return result 460 else: 461 return cmp(lower_a, lower_b) 462 463 464 class HostQueueEntry(DBObject): 465 _table_name = 'afe_host_queue_entries' 466 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host', 467 'active', 'complete', 'deleted', 'execution_subdir', 468 'atomic_group_id', 'aborted', 'started_on', 'finished_on') 469 _timer = autotest_stats.Timer('scheduler_models.HostQueueEntry') 470 471 472 def __init__(self, id=None, row=None, **kwargs): 473 assert id or row 474 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs) 475 self.job = Job(self.job_id) 476 477 if self.host_id: 478 self.host = rdb_lib.get_hosts([self.host_id])[0] 479 self.host.dbg_str = self.get_dbg_str() 480 self.host.metadata = get_job_metadata(self.job) 481 else: 482 self.host = None 483 484 if self.atomic_group_id: 485 self.atomic_group = AtomicGroup(self.atomic_group_id, 486 always_query=False) 487 else: 488 self.atomic_group = None 489 490 491 @classmethod 492 def clone(cls, template): 493 """ 494 Creates a new row using the values from a template instance. 495 496 The new instance will not exist in the database or have a valid 497 id attribute until its save() method is called. 498 """ 499 assert isinstance(template, cls) 500 new_row = [getattr(template, field) for field in cls._fields] 501 clone = cls(row=new_row, new_record=True) 502 clone.id = None 503 return clone 504 505 506 def _view_job_url(self): 507 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id) 508 509 510 def get_labels(self): 511 """ 512 Get all labels associated with this host queue entry (either via the 513 meta_host or as a job dependency label). The labels yielded are not 514 guaranteed to be unique. 515 516 @yields Label instances associated with this host_queue_entry. 517 """ 518 if self.meta_host: 519 yield Label(id=self.meta_host, always_query=False) 520 labels = Label.fetch( 521 joins="JOIN afe_jobs_dependency_labels AS deps " 522 "ON (afe_labels.id = deps.label_id)", 523 where="deps.job_id = %d" % self.job.id) 524 for label in labels: 525 yield label 526 527 528 def set_host(self, host): 529 if host: 530 logging.info('Assigning host %s to entry %s', host.hostname, self) 531 self.update_field('host_id', host.id) 532 self.block_host(host.id) 533 else: 534 logging.info('Releasing host from %s', self) 535 self.unblock_host(self.host.id) 536 self.update_field('host_id', None) 537 538 self.host = host 539 540 541 def block_host(self, host_id): 542 logging.info("creating block %s/%s", self.job.id, host_id) 543 row = [0, self.job.id, host_id] 544 block = IneligibleHostQueue(row=row, new_record=True) 545 block.save() 546 547 548 def unblock_host(self, host_id): 549 logging.info("removing block %s/%s", self.job.id, host_id) 550 blocks = IneligibleHostQueue.fetch( 551 'job_id=%d and host_id=%d' % (self.job.id, host_id)) 552 for block in blocks: 553 block.delete() 554 555 556 def set_execution_subdir(self, subdir=None): 557 if subdir is None: 558 assert self.host 559 subdir = self.host.hostname 560 self.update_field('execution_subdir', subdir) 561 562 563 def _get_hostname(self): 564 if self.host: 565 return self.host.hostname 566 return 'no host' 567 568 569 def get_dbg_str(self): 570 """Get a debug string to identify this host. 571 572 @return: A string containing the hqe and job id. 573 """ 574 try: 575 return 'HQE: %s, for job: %s' % (self.id, self.job_id) 576 except AttributeError as e: 577 return 'HQE has not been initialized yet: %s' % e 578 579 580 def __str__(self): 581 flags = [] 582 if self.active: 583 flags.append('active') 584 if self.complete: 585 flags.append('complete') 586 if self.deleted: 587 flags.append('deleted') 588 if self.aborted: 589 flags.append('aborted') 590 flags_str = ','.join(flags) 591 if flags_str: 592 flags_str = ' [%s]' % flags_str 593 return ("%s and host: %s has status:%s%s" % 594 (self.get_dbg_str(), self._get_hostname(), self.status, 595 flags_str)) 596 597 598 def record_state(self, type_str, state, value): 599 """Record metadata in elasticsearch. 600 601 If ES configured to use http, then we will time that http request. 602 Otherwise, it uses UDP, so we will not need to time it. 603 604 @param type_str: sets the _type field in elasticsearch db. 605 @param state: string representing what state we are recording, 606 e.g. 'status' 607 @param value: value of the state, e.g. 'verifying' 608 """ 609 metadata = { 610 'time_changed': time.time(), 611 state: value, 612 'job_id': self.job_id, 613 } 614 if self.host: 615 metadata['hostname'] = self.host.hostname 616 autotest_es.post(type_str=type_str, metadata=metadata) 617 618 619 @_timer.decorate 620 def set_status(self, status): 621 logging.info("%s -> %s", self, status) 622 623 self.update_field('status', status) 624 # Noticed some time jumps after last logging message. 625 logging.debug('Update Field Complete') 626 627 active = (status in models.HostQueueEntry.ACTIVE_STATUSES) 628 complete = (status in models.HostQueueEntry.COMPLETE_STATUSES) 629 assert not (active and complete) 630 631 self.update_field('active', active) 632 633 # The ordering of these operations is important. Once we set the 634 # complete bit this job will become indistinguishable from all 635 # the other complete jobs, unless we first set shard_id to NULL 636 # to signal to the shard_client that we need to upload it. However, 637 # we can only set both these after we've updated finished_on etc 638 # within _on_complete or the job will get synced in an intermediate 639 # state. This means that if someone sigkills the scheduler between 640 # setting finished_on and complete, we will have inconsistent jobs. 641 # This should be fine, because nothing critical checks finished_on, 642 # and the scheduler should never be killed mid-tick. 643 if complete: 644 self._on_complete(status) 645 if self.job.shard_id is not None: 646 # If shard_id is None, the job will be synced back to the master 647 self.job.update_field('shard_id', None) 648 self._email_on_job_complete() 649 650 self.update_field('complete', complete) 651 652 should_email_status = (status.lower() in _notify_email_statuses or 653 'all' in _notify_email_statuses) 654 if should_email_status: 655 self._email_on_status(status) 656 logging.debug('HQE Set Status Complete') 657 self.record_state('hqe_status', 'status', status) 658 659 660 661 def _on_complete(self, status): 662 if status is not models.HostQueueEntry.Status.ABORTED: 663 self.job.stop_if_necessary() 664 665 if self.started_on: 666 self.set_finished_on_now() 667 if not self.execution_subdir: 668 return 669 # unregister any possible pidfiles associated with this queue entry 670 for pidfile_name in drone_manager.ALL_PIDFILE_NAMES: 671 pidfile_id = _drone_manager.get_pidfile_id_from( 672 self.execution_path(), pidfile_name=pidfile_name) 673 _drone_manager.unregister_pidfile(pidfile_id) 674 675 676 def _get_status_email_contents(self, status, summary=None, hostname=None): 677 """ 678 Gather info for the status notification e-mails. 679 680 If needed, we could start using the Django templating engine to create 681 the subject and the e-mail body, but that doesn't seem necessary right 682 now. 683 684 @param status: Job status text. Mandatory. 685 @param summary: Job summary text. Optional. 686 @param hostname: A hostname for the job. Optional. 687 688 @return: Tuple (subject, body) for the notification e-mail. 689 """ 690 job_stats = Job(id=self.job.id).get_execution_details() 691 692 subject = ('Autotest | Job ID: %s "%s" | Status: %s ' % 693 (self.job.id, self.job.name, status)) 694 695 if hostname is not None: 696 subject += '| Hostname: %s ' % hostname 697 698 if status not in ["1 Failed", "Failed"]: 699 subject += '| Success Rate: %.2f %%' % job_stats['success_rate'] 700 701 body = "Job ID: %s\n" % self.job.id 702 body += "Job name: %s\n" % self.job.name 703 if hostname is not None: 704 body += "Host: %s\n" % hostname 705 if summary is not None: 706 body += "Summary: %s\n" % summary 707 body += "Status: %s\n" % status 708 body += "Results interface URL: %s\n" % self._view_job_url() 709 body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time'] 710 if int(job_stats['total_executed']) > 0: 711 body += "User tests executed: %s\n" % job_stats['total_executed'] 712 body += "User tests passed: %s\n" % job_stats['total_passed'] 713 body += "User tests failed: %s\n" % job_stats['total_failed'] 714 body += ("User tests success rate: %.2f %%\n" % 715 job_stats['success_rate']) 716 717 if job_stats['failed_rows']: 718 body += "Failures:\n" 719 body += job_stats['failed_rows'] 720 721 return subject, body 722 723 724 def _email_on_status(self, status): 725 hostname = self._get_hostname() 726 subject, body = self._get_status_email_contents(status, None, hostname) 727 email_manager.manager.send_email(self.job.email_list, subject, body) 728 729 730 def _email_on_job_complete(self): 731 if not self.job.is_finished(): 732 return 733 734 summary = [] 735 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id) 736 for queue_entry in hosts_queue: 737 summary.append("Host: %s Status: %s" % 738 (queue_entry._get_hostname(), 739 queue_entry.status)) 740 741 summary = "\n".join(summary) 742 status_counts = models.Job.objects.get_status_counts( 743 [self.job.id])[self.job.id] 744 status = ', '.join('%d %s' % (count, status) for status, count 745 in status_counts.iteritems()) 746 747 subject, body = self._get_status_email_contents(status, summary, None) 748 email_manager.manager.send_email(self.job.email_list, subject, body) 749 750 751 def schedule_pre_job_tasks(self): 752 logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s", 753 self.job.name, self.meta_host, self.atomic_group_id, 754 self.job.id, self.id, self.host.hostname, self.status) 755 756 self._do_schedule_pre_job_tasks() 757 758 759 def _do_schedule_pre_job_tasks(self): 760 self.job.schedule_pre_job_tasks(queue_entry=self) 761 762 763 def requeue(self): 764 assert self.host 765 self.set_status(models.HostQueueEntry.Status.QUEUED) 766 self.update_field('started_on', None) 767 self.update_field('finished_on', None) 768 # verify/cleanup failure sets the execution subdir, so reset it here 769 self.set_execution_subdir('') 770 if self.meta_host: 771 self.set_host(None) 772 773 774 @property 775 def aborted_by(self): 776 self._load_abort_info() 777 return self._aborted_by 778 779 780 @property 781 def aborted_on(self): 782 self._load_abort_info() 783 return self._aborted_on 784 785 786 def _load_abort_info(self): 787 """ Fetch info about who aborted the job. """ 788 if hasattr(self, "_aborted_by"): 789 return 790 rows = _db.execute(""" 791 SELECT afe_users.login, 792 afe_aborted_host_queue_entries.aborted_on 793 FROM afe_aborted_host_queue_entries 794 INNER JOIN afe_users 795 ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id 796 WHERE afe_aborted_host_queue_entries.queue_entry_id = %s 797 """, (self.id,)) 798 if rows: 799 self._aborted_by, self._aborted_on = rows[0] 800 else: 801 self._aborted_by = self._aborted_on = None 802 803 804 def on_pending(self): 805 """ 806 Called when an entry in a synchronous job has passed verify. If the 807 job is ready to run, sets the entries to STARTING. Otherwise, it leaves 808 them in PENDING. 809 """ 810 self.set_status(models.HostQueueEntry.Status.PENDING) 811 self.host.set_status(models.Host.Status.PENDING) 812 813 # Some debug code here: sends an email if an asynchronous job does not 814 # immediately enter Starting. 815 # TODO: Remove this once we figure out why asynchronous jobs are getting 816 # stuck in Pending. 817 self.job.run_if_ready(queue_entry=self) 818 if (self.job.synch_count == 1 and 819 self.status == models.HostQueueEntry.Status.PENDING): 820 subject = 'Job %s (id %s)' % (self.job.name, self.job.id) 821 message = 'Asynchronous job stuck in Pending' 822 email_manager.manager.enqueue_notify_email(subject, message) 823 824 825 def abort(self, dispatcher): 826 assert self.aborted and not self.complete 827 828 Status = models.HostQueueEntry.Status 829 if self.status in (Status.GATHERING, Status.PARSING, Status.ARCHIVING): 830 # do nothing; post-job tasks will finish and then mark this entry 831 # with status "Aborted" and take care of the host 832 return 833 834 if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING, 835 Status.WAITING): 836 # If hqe is in any of these status, it should not have any 837 # unfinished agent before it can be aborted. 838 agents = dispatcher.get_agents_for_entry(self) 839 # Agent with finished task can be left behind. This is added to 840 # handle the special case of aborting hostless job in STARTING 841 # status, in which the agent has only a HostlessQueueTask 842 # associated. The finished HostlessQueueTask will be cleaned up in 843 # the next tick, so it's safe to leave the agent there. Without 844 # filtering out finished agent, HQE abort won't be able to proceed. 845 assert all([agent.is_done() for agent in agents]) 846 # If hqe is still in STARTING status, it may not have assigned a 847 # host yet. 848 if self.host: 849 self.host.set_status(models.Host.Status.READY) 850 elif (self.status == Status.VERIFYING or 851 self.status == Status.RESETTING): 852 models.SpecialTask.objects.create( 853 task=models.SpecialTask.Task.CLEANUP, 854 host=models.Host.objects.get(id=self.host.id), 855 requested_by=self.job.owner_model()) 856 elif self.status == Status.PROVISIONING: 857 models.SpecialTask.objects.create( 858 task=models.SpecialTask.Task.REPAIR, 859 host=models.Host.objects.get(id=self.host.id), 860 requested_by=self.job.owner_model()) 861 862 self.set_status(Status.ABORTED) 863 self.job.abort_delay_ready_task() 864 865 866 def get_group_name(self): 867 atomic_group = self.atomic_group 868 if not atomic_group: 869 return '' 870 871 # Look at any meta_host and dependency labels and pick the first 872 # one that also specifies this atomic group. Use that label name 873 # as the group name if possible (it is more specific). 874 for label in self.get_labels(): 875 if label.atomic_group_id: 876 assert label.atomic_group_id == atomic_group.id 877 return label.name 878 return atomic_group.name 879 880 881 def execution_tag(self): 882 SQL_SUSPECT_ENTRIES = ('SELECT * FROM afe_host_queue_entries WHERE ' 883 'complete!=1 AND execution_subdir="" AND ' 884 'status!="Queued";') 885 SQL_FIX_SUSPECT_ENTRY = ('UPDATE afe_host_queue_entries SET ' 886 'status="Aborted" WHERE id=%s;') 887 try: 888 assert self.execution_subdir 889 except AssertionError: 890 # TODO(scottz): Remove temporary fix/info gathering pathway for 891 # crosbug.com/31595 once issue is root caused. 892 logging.error('No execution_subdir for host queue id:%s.', self.id) 893 logging.error('====DB DEBUG====\n%s', SQL_SUSPECT_ENTRIES) 894 for row in _db.execute(SQL_SUSPECT_ENTRIES): 895 logging.error(row) 896 logging.error('====DB DEBUG====\n') 897 fix_query = SQL_FIX_SUSPECT_ENTRY % self.id 898 logging.error('EXECUTING: %s', fix_query) 899 _db.execute(SQL_FIX_SUSPECT_ENTRY % self.id) 900 raise AssertionError(('self.execution_subdir not found. ' 901 'See log for details.')) 902 903 return "%s/%s" % (self.job.tag(), self.execution_subdir) 904 905 906 def execution_path(self): 907 return self.execution_tag() 908 909 910 def set_started_on_now(self): 911 self.update_field('started_on', datetime.datetime.now()) 912 913 914 def set_finished_on_now(self): 915 self.update_field('finished_on', datetime.datetime.now()) 916 917 918 def is_hostless(self): 919 return (self.host_id is None 920 and self.meta_host is None 921 and self.atomic_group_id is None) 922 923 924 class Job(DBObject): 925 _table_name = 'afe_jobs' 926 _fields = ('id', 'owner', 'name', 'priority', 'control_file', 927 'control_type', 'created_on', 'synch_count', 'timeout', 928 'run_verify', 'email_list', 'reboot_before', 'reboot_after', 929 'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id', 930 'parameterized_job_id', 'max_runtime_mins', 'parent_job_id', 931 'test_retry', 'run_reset', 'timeout_mins', 'shard_id', 932 'require_ssp') 933 _timer = autotest_stats.Timer("scheduler_models.Job") 934 935 # This does not need to be a column in the DB. The delays are likely to 936 # be configured short. If the scheduler is stopped and restarted in 937 # the middle of a job's delay cycle, the delay cycle will either be 938 # repeated or skipped depending on the number of Pending machines found 939 # when the restarted scheduler recovers to track it. Not a problem. 940 # 941 # A reference to the DelayedCallTask that will wake up the job should 942 # no other HQEs change state in time. Its end_time attribute is used 943 # by our run_with_ready_delay() method to determine if the wait is over. 944 _delay_ready_task = None 945 946 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on 947 # all status='Pending' atomic group HQEs incase a delay was running when the 948 # scheduler was restarted and no more hosts ever successfully exit Verify. 949 950 def __init__(self, id=None, row=None, **kwargs): 951 assert id or row 952 super(Job, self).__init__(id=id, row=row, **kwargs) 953 self._owner_model = None # caches model instance of owner 954 self.update_image_path = None # path of OS image to install 955 956 957 def model(self): 958 return models.Job.objects.get(id=self.id) 959 960 961 def owner_model(self): 962 # work around the fact that the Job owner field is a string, not a 963 # foreign key 964 if not self._owner_model: 965 self._owner_model = models.User.objects.get(login=self.owner) 966 return self._owner_model 967 968 969 def is_server_job(self): 970 return self.control_type == control_data.CONTROL_TYPE.SERVER 971 972 973 def tag(self): 974 return "%s-%s" % (self.id, self.owner) 975 976 977 def get_host_queue_entries(self): 978 rows = _db.execute(""" 979 SELECT * FROM afe_host_queue_entries 980 WHERE job_id= %s 981 """, (self.id,)) 982 entries = [HostQueueEntry(row=i) for i in rows] 983 984 assert len(entries)>0 985 986 return entries 987 988 989 def is_image_update_job(self): 990 """ 991 Discover if the current job requires an OS update. 992 993 @return: True/False if OS should be updated before job is run. 994 """ 995 # All image update jobs have the parameterized_job_id set. 996 if not self.parameterized_job_id: 997 return False 998 999 # Retrieve the ID of the ParameterizedJob this job is an instance of. 1000 rows = _db.execute(""" 1001 SELECT test_id 1002 FROM afe_parameterized_jobs 1003 WHERE id = %s 1004 """, (self.parameterized_job_id,)) 1005 if not rows: 1006 return False 1007 test_id = rows[0][0] 1008 1009 # Retrieve the ID of the known autoupdate_ParameterizedJob. 1010 rows = _db.execute(""" 1011 SELECT id 1012 FROM afe_autotests 1013 WHERE name = 'autoupdate_ParameterizedJob' 1014 """) 1015 if not rows: 1016 return False 1017 update_id = rows[0][0] 1018 1019 # If the IDs are the same we've found an image update job. 1020 if test_id == update_id: 1021 # Finally, get the path to the OS image to install. 1022 rows = _db.execute(""" 1023 SELECT parameter_value 1024 FROM afe_parameterized_job_parameters 1025 WHERE parameterized_job_id = %s 1026 """, (self.parameterized_job_id,)) 1027 if rows: 1028 # Save the path in update_image_path to use later as a command 1029 # line parameter to autoserv. 1030 self.update_image_path = rows[0][0] 1031 return True 1032 1033 return False 1034 1035 1036 def get_execution_details(self): 1037 """ 1038 Get test execution details for this job. 1039 1040 @return: Dictionary with test execution details 1041 """ 1042 def _find_test_jobs(rows): 1043 """ 1044 Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.* 1045 Those are autotest 'internal job' tests, so they should not be 1046 counted when evaluating the test stats. 1047 1048 @param rows: List of rows (matrix) with database results. 1049 """ 1050 job_test_pattern = re.compile('SERVER|CLIENT\\_JOB\.[\d]') 1051 n_test_jobs = 0 1052 for r in rows: 1053 test_name = r[0] 1054 if job_test_pattern.match(test_name): 1055 n_test_jobs += 1 1056 1057 return n_test_jobs 1058 1059 stats = {} 1060 1061 rows = _db.execute(""" 1062 SELECT t.test, s.word, t.reason 1063 FROM tko_tests AS t, tko_jobs AS j, tko_status AS s 1064 WHERE t.job_idx = j.job_idx 1065 AND s.status_idx = t.status 1066 AND j.afe_job_id = %s 1067 ORDER BY t.reason 1068 """ % self.id) 1069 1070 failed_rows = [r for r in rows if not r[1] == 'GOOD'] 1071 1072 n_test_jobs = _find_test_jobs(rows) 1073 n_test_jobs_failed = _find_test_jobs(failed_rows) 1074 1075 total_executed = len(rows) - n_test_jobs 1076 total_failed = len(failed_rows) - n_test_jobs_failed 1077 1078 if total_executed > 0: 1079 success_rate = 100 - ((total_failed / float(total_executed)) * 100) 1080 else: 1081 success_rate = 0 1082 1083 stats['total_executed'] = total_executed 1084 stats['total_failed'] = total_failed 1085 stats['total_passed'] = total_executed - total_failed 1086 stats['success_rate'] = success_rate 1087 1088 status_header = ("Test Name", "Status", "Reason") 1089 if failed_rows: 1090 stats['failed_rows'] = utils.matrix_to_string(failed_rows, 1091 status_header) 1092 else: 1093 stats['failed_rows'] = '' 1094 1095 time_row = _db.execute(""" 1096 SELECT started_time, finished_time 1097 FROM tko_jobs 1098 WHERE afe_job_id = %s 1099 """ % self.id) 1100 1101 if time_row: 1102 t_begin, t_end = time_row[0] 1103 try: 1104 delta = t_end - t_begin 1105 minutes, seconds = divmod(delta.seconds, 60) 1106 hours, minutes = divmod(minutes, 60) 1107 stats['execution_time'] = ("%02d:%02d:%02d" % 1108 (hours, minutes, seconds)) 1109 # One of t_end or t_begin are None 1110 except TypeError: 1111 stats['execution_time'] = '(could not determine)' 1112 else: 1113 stats['execution_time'] = '(none)' 1114 1115 return stats 1116 1117 1118 @_timer.decorate 1119 def set_status(self, status, update_queues=False): 1120 self.update_field('status',status) 1121 1122 if update_queues: 1123 for queue_entry in self.get_host_queue_entries(): 1124 queue_entry.set_status(status) 1125 1126 1127 def keyval_dict(self): 1128 return self.model().keyval_dict() 1129 1130 1131 def _atomic_and_has_started(self): 1132 """ 1133 @returns True if any of the HostQueueEntries associated with this job 1134 have entered the Status.STARTING state or beyond. 1135 """ 1136 atomic_entries = models.HostQueueEntry.objects.filter( 1137 job=self.id, atomic_group__isnull=False) 1138 if atomic_entries.count() <= 0: 1139 return False 1140 1141 # These states may *only* be reached if Job.run() has been called. 1142 started_statuses = (models.HostQueueEntry.Status.STARTING, 1143 models.HostQueueEntry.Status.RUNNING, 1144 models.HostQueueEntry.Status.COMPLETED) 1145 1146 started_entries = atomic_entries.filter(status__in=started_statuses) 1147 return started_entries.count() > 0 1148 1149 1150 def _hosts_assigned_count(self): 1151 """The number of HostQueueEntries assigned a Host for this job.""" 1152 entries = models.HostQueueEntry.objects.filter(job=self.id, 1153 host__isnull=False) 1154 return entries.count() 1155 1156 1157 def _pending_count(self): 1158 """The number of HostQueueEntries for this job in the Pending state.""" 1159 pending_entries = models.HostQueueEntry.objects.filter( 1160 job=self.id, status=models.HostQueueEntry.Status.PENDING) 1161 return pending_entries.count() 1162 1163 1164 def _max_hosts_needed_to_run(self, atomic_group): 1165 """ 1166 @param atomic_group: The AtomicGroup associated with this job that we 1167 are using to set an upper bound on the threshold. 1168 @returns The maximum number of HostQueueEntries assigned a Host before 1169 this job can run. 1170 """ 1171 return min(self._hosts_assigned_count(), 1172 atomic_group.max_number_of_machines) 1173 1174 1175 def _min_hosts_needed_to_run(self): 1176 """Return the minumum number of hsots needed to run this job.""" 1177 return self.synch_count 1178 1179 1180 def is_ready(self): 1181 # NOTE: Atomic group jobs stop reporting ready after they have been 1182 # started to avoid launching multiple copies of one atomic job. 1183 # Only possible if synch_count is less than than half the number of 1184 # machines in the atomic group. 1185 pending_count = self._pending_count() 1186 atomic_and_has_started = self._atomic_and_has_started() 1187 ready = (pending_count >= self.synch_count 1188 and not atomic_and_has_started) 1189 1190 if not ready: 1191 logging.info( 1192 'Job %s not ready: %s pending, %s required ' 1193 '(Atomic and started: %s)', 1194 self, pending_count, self.synch_count, 1195 atomic_and_has_started) 1196 1197 return ready 1198 1199 1200 def num_machines(self, clause = None): 1201 sql = "job_id=%s" % self.id 1202 if clause: 1203 sql += " AND (%s)" % clause 1204 return self.count(sql, table='afe_host_queue_entries') 1205 1206 1207 def num_queued(self): 1208 return self.num_machines('not complete') 1209 1210 1211 def num_active(self): 1212 return self.num_machines('active') 1213 1214 1215 def num_complete(self): 1216 return self.num_machines('complete') 1217 1218 1219 def is_finished(self): 1220 return self.num_complete() == self.num_machines() 1221 1222 1223 def _not_yet_run_entries(self, include_verifying=True): 1224 statuses = [models.HostQueueEntry.Status.QUEUED, 1225 models.HostQueueEntry.Status.PENDING] 1226 if include_verifying: 1227 statuses.append(models.HostQueueEntry.Status.VERIFYING) 1228 return models.HostQueueEntry.objects.filter(job=self.id, 1229 status__in=statuses) 1230 1231 1232 def _stop_all_entries(self): 1233 entries_to_stop = self._not_yet_run_entries( 1234 include_verifying=False) 1235 for child_entry in entries_to_stop: 1236 assert not child_entry.complete, ( 1237 '%s status=%s, active=%s, complete=%s' % 1238 (child_entry.id, child_entry.status, child_entry.active, 1239 child_entry.complete)) 1240 if child_entry.status == models.HostQueueEntry.Status.PENDING: 1241 child_entry.host.status = models.Host.Status.READY 1242 child_entry.host.save() 1243 child_entry.status = models.HostQueueEntry.Status.STOPPED 1244 child_entry.save() 1245 1246 1247 def stop_if_necessary(self): 1248 not_yet_run = self._not_yet_run_entries() 1249 if not_yet_run.count() < self.synch_count: 1250 self._stop_all_entries() 1251 1252 1253 def _next_group_name(self, group_name=''): 1254 """@returns a directory name to use for the next host group results.""" 1255 if group_name: 1256 # Sanitize for use as a pathname. 1257 group_name = group_name.replace(os.path.sep, '_') 1258 if group_name.startswith('.'): 1259 group_name = '_' + group_name[1:] 1260 # Add a separator between the group name and 'group%d'. 1261 group_name += '.' 1262 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name)) 1263 query = models.HostQueueEntry.objects.filter( 1264 job=self.id).values('execution_subdir').distinct() 1265 subdirs = (entry['execution_subdir'] for entry in query) 1266 group_matches = (group_count_re.match(subdir) for subdir in subdirs) 1267 ids = [int(match.group(1)) for match in group_matches if match] 1268 if ids: 1269 next_id = max(ids) + 1 1270 else: 1271 next_id = 0 1272 return '%sgroup%d' % (group_name, next_id) 1273 1274 1275 def get_group_entries(self, queue_entry_from_group): 1276 """ 1277 @param queue_entry_from_group: A HostQueueEntry instance to find other 1278 group entries on this job for. 1279 1280 @returns A list of HostQueueEntry objects all executing this job as 1281 part of the same group as the one supplied (having the same 1282 execution_subdir). 1283 """ 1284 execution_subdir = queue_entry_from_group.execution_subdir 1285 return list(HostQueueEntry.fetch( 1286 where='job_id=%s AND execution_subdir=%s', 1287 params=(self.id, execution_subdir))) 1288 1289 1290 def _should_run_cleanup(self, queue_entry): 1291 if self.reboot_before == model_attributes.RebootBefore.ALWAYS: 1292 return True 1293 elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY: 1294 return queue_entry.host.dirty 1295 return False 1296 1297 1298 def _should_run_verify(self, queue_entry): 1299 do_not_verify = (queue_entry.host.protection == 1300 host_protections.Protection.DO_NOT_VERIFY) 1301 if do_not_verify: 1302 return False 1303 # If RebootBefore is set to NEVER, then we won't run reset because 1304 # we can't cleanup, so we need to weaken a Reset into a Verify. 1305 weaker_reset = (self.run_reset and 1306 self.reboot_before == model_attributes.RebootBefore.NEVER) 1307 return self.run_verify or weaker_reset 1308 1309 1310 def _should_run_reset(self, queue_entry): 1311 can_verify = (queue_entry.host.protection != 1312 host_protections.Protection.DO_NOT_VERIFY) 1313 can_reboot = self.reboot_before != model_attributes.RebootBefore.NEVER 1314 return (can_reboot and can_verify and (self.run_reset or 1315 (self._should_run_cleanup(queue_entry) and 1316 self._should_run_verify(queue_entry)))) 1317 1318 1319 def _should_run_provision(self, queue_entry): 1320 """ 1321 Determine if the queue_entry needs to have a provision task run before 1322 it to provision queue_entry.host. 1323 1324 @param queue_entry: The host queue entry in question. 1325 @returns: True if we should schedule a provision task, False otherwise. 1326 1327 """ 1328 # If we get to this point, it means that the scheduler has already 1329 # vetted that all the unprovisionable labels match, so we can just 1330 # find all labels on the job that aren't on the host to get the list 1331 # of what we need to provision. (See the scheduling logic in 1332 # host_scheduler.py:is_host_eligable_for_job() where we discard all 1333 # actionable labels when assigning jobs to hosts.) 1334 job_labels = {x.name for x in queue_entry.get_labels()} 1335 _, host_labels = queue_entry.host.platform_and_labels() 1336 # If there are any labels on the job that are not on the host and they 1337 # are labels that provisioning knows how to change, then that means 1338 # there is provisioning work to do. If there's no provisioning work to 1339 # do, then obviously we have no reason to schedule a provision task! 1340 diff = job_labels - set(host_labels) 1341 if any([provision.Provision.acts_on(x) for x in diff]): 1342 return True 1343 return False 1344 1345 1346 def _queue_special_task(self, queue_entry, task): 1347 """ 1348 Create a special task and associate it with a host queue entry. 1349 1350 @param queue_entry: The queue entry this special task should be 1351 associated with. 1352 @param task: One of the members of the enum models.SpecialTask.Task. 1353 @returns: None 1354 1355 """ 1356 models.SpecialTask.objects.create( 1357 host=models.Host.objects.get(id=queue_entry.host_id), 1358 queue_entry=queue_entry, task=task) 1359 1360 1361 def schedule_pre_job_tasks(self, queue_entry): 1362 """ 1363 Queue all of the special tasks that need to be run before a host 1364 queue entry may run. 1365 1366 If no special taskes need to be scheduled, then |on_pending| will be 1367 called directly. 1368 1369 @returns None 1370 1371 """ 1372 task_queued = False 1373 hqe_model = models.HostQueueEntry.objects.get(id=queue_entry.id) 1374 1375 if self._should_run_provision(queue_entry): 1376 self._queue_special_task(hqe_model, 1377 models.SpecialTask.Task.PROVISION) 1378 task_queued = True 1379 elif self._should_run_reset(queue_entry): 1380 self._queue_special_task(hqe_model, models.SpecialTask.Task.RESET) 1381 task_queued = True 1382 else: 1383 if self._should_run_cleanup(queue_entry): 1384 self._queue_special_task(hqe_model, 1385 models.SpecialTask.Task.CLEANUP) 1386 task_queued = True 1387 if self._should_run_verify(queue_entry): 1388 self._queue_special_task(hqe_model, 1389 models.SpecialTask.Task.VERIFY) 1390 task_queued = True 1391 1392 if not task_queued: 1393 queue_entry.on_pending() 1394 1395 1396 def _assign_new_group(self, queue_entries, group_name=''): 1397 if len(queue_entries) == 1: 1398 group_subdir_name = queue_entries[0].host.hostname 1399 else: 1400 group_subdir_name = self._next_group_name(group_name) 1401 logging.info('Running synchronous job %d hosts %s as %s', 1402 self.id, [entry.host.hostname for entry in queue_entries], 1403 group_subdir_name) 1404 1405 for queue_entry in queue_entries: 1406 queue_entry.set_execution_subdir(group_subdir_name) 1407 1408 1409 def _choose_group_to_run(self, include_queue_entry): 1410 """ 1411 @returns A tuple containing a list of HostQueueEntry instances to be 1412 used to run this Job, a string group name to suggest giving 1413 to this job in the results database. 1414 """ 1415 atomic_group = include_queue_entry.atomic_group 1416 chosen_entries = [include_queue_entry] 1417 if atomic_group: 1418 num_entries_wanted = atomic_group.max_number_of_machines 1419 else: 1420 num_entries_wanted = self.synch_count 1421 num_entries_wanted -= len(chosen_entries) 1422 1423 if num_entries_wanted > 0: 1424 where_clause = 'job_id = %s AND status = "Pending" AND id != %s' 1425 pending_entries = list(HostQueueEntry.fetch( 1426 where=where_clause, 1427 params=(self.id, include_queue_entry.id))) 1428 1429 # Sort the chosen hosts by hostname before slicing. 1430 def cmp_queue_entries_by_hostname(entry_a, entry_b): 1431 return Host.cmp_for_sort(entry_a.host, entry_b.host) 1432 pending_entries.sort(cmp=cmp_queue_entries_by_hostname) 1433 chosen_entries += pending_entries[:num_entries_wanted] 1434 1435 # Sanity check. We'll only ever be called if this can be met. 1436 if len(chosen_entries) < self.synch_count: 1437 message = ('job %s got less than %s chosen entries: %s' % ( 1438 self.id, self.synch_count, chosen_entries)) 1439 logging.error(message) 1440 email_manager.manager.enqueue_notify_email( 1441 'Job not started, too few chosen entries', message) 1442 return [] 1443 1444 group_name = include_queue_entry.get_group_name() 1445 1446 self._assign_new_group(chosen_entries, group_name=group_name) 1447 return chosen_entries 1448 1449 1450 def run_if_ready(self, queue_entry): 1451 """ 1452 Run this job by kicking its HQEs into status='Starting' if enough 1453 hosts are ready for it to run. 1454 1455 Cleans up by kicking HQEs into status='Stopped' if this Job is not 1456 ready to run. 1457 """ 1458 if not self.is_ready(): 1459 self.stop_if_necessary() 1460 elif queue_entry.atomic_group: 1461 self.run_with_ready_delay(queue_entry) 1462 else: 1463 self.run(queue_entry) 1464 1465 1466 def run_with_ready_delay(self, queue_entry): 1467 """ 1468 Start a delay to wait for more hosts to enter Pending state before 1469 launching an atomic group job. Once set, the a delay cannot be reset. 1470 1471 @param queue_entry: The HostQueueEntry object to get atomic group 1472 info from and pass to run_if_ready when the delay is up. 1473 1474 @returns An Agent to run the job as appropriate or None if a delay 1475 has already been set. 1476 """ 1477 assert queue_entry.job_id == self.id 1478 assert queue_entry.atomic_group 1479 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts 1480 over_max_threshold = (self._pending_count() >= 1481 self._max_hosts_needed_to_run(queue_entry.atomic_group)) 1482 delay_expired = (self._delay_ready_task and 1483 time.time() >= self._delay_ready_task.end_time) 1484 1485 # Delay is disabled or we already have enough? Do not wait to run. 1486 if not delay or over_max_threshold or delay_expired: 1487 self.run(queue_entry) 1488 else: 1489 queue_entry.set_status(models.HostQueueEntry.Status.WAITING) 1490 1491 1492 def request_abort(self): 1493 """Request that this Job be aborted on the next scheduler cycle.""" 1494 self.model().abort() 1495 1496 1497 def schedule_delayed_callback_task(self, queue_entry): 1498 queue_entry.set_status(models.HostQueueEntry.Status.PENDING) 1499 1500 if self._delay_ready_task: 1501 return None 1502 1503 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts 1504 1505 def run_job_after_delay(): 1506 logging.info('Job %s done waiting for extra hosts.', self) 1507 # Check to see if the job is still relevant. It could have aborted 1508 # while we were waiting or hosts could have disappearred, etc. 1509 if self._pending_count() < self._min_hosts_needed_to_run(): 1510 logging.info('Job %s had too few Pending hosts after waiting ' 1511 'for extras. Not running.', self) 1512 self.request_abort() 1513 return 1514 return self.run(queue_entry) 1515 1516 logging.info('Job %s waiting up to %s seconds for more hosts.', 1517 self.id, delay) 1518 self._delay_ready_task = DelayedCallTask(delay_seconds=delay, 1519 callback=run_job_after_delay) 1520 return self._delay_ready_task 1521 1522 1523 def run(self, queue_entry): 1524 """ 1525 @param queue_entry: The HostQueueEntry instance calling this method. 1526 """ 1527 if queue_entry.atomic_group and self._atomic_and_has_started(): 1528 logging.error('Job.run() called on running atomic Job %d ' 1529 'with HQE %s.', self.id, queue_entry) 1530 return 1531 queue_entries = self._choose_group_to_run(queue_entry) 1532 if queue_entries: 1533 self._finish_run(queue_entries) 1534 1535 1536 def _finish_run(self, queue_entries): 1537 for queue_entry in queue_entries: 1538 queue_entry.set_status(models.HostQueueEntry.Status.STARTING) 1539 self.abort_delay_ready_task() 1540 1541 1542 def abort_delay_ready_task(self): 1543 """Abort the delayed task associated with this job, if any.""" 1544 if self._delay_ready_task: 1545 # Cancel any pending callback that would try to run again 1546 # as we are already running. 1547 self._delay_ready_task.abort() 1548 1549 1550 def __str__(self): 1551 return '%s-%s' % (self.id, self.owner) 1552