# pylint: disable=missing-docstring

"""Database model classes for the scheduler.

Contains model classes abstracting the various DB tables used by the scheduler.
These overlap the Django models in basic functionality, but were written before
the Django models existed and have not yet been phased out.  Some of them
(particularly HostQueueEntry and Job) have considerable scheduler-specific logic
which would probably be ill-suited for inclusion in the general Django model
classes.

Globals:
_notify_email_statuses: list of HQE statuses.  Each time a single HQE reaches
        one of these statuses, an email is sent to the job's email_list.
        Comes from global_config.
_base_url: URL to the local AFE server, used to construct URLs for emails.
_db: DatabaseConnection for this module.
_drone_manager: reference to the global DroneManager instance.
"""

import datetime
import itertools
import logging
import re
import time
import weakref

from autotest_lib.client.common_lib import global_config, host_protections
from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib import utils
from autotest_lib.frontend.afe import models, model_attributes
from autotest_lib.scheduler import drone_manager, email_manager
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import scheduler_config
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.server import afe_urls
from autotest_lib.server.cros import provision

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


_notify_email_statuses = []
_base_url = None

_db = None
_drone_manager = None


def initialize():
    global _db
    _db = scheduler_lib.ConnectionManager().get_connection()

    notify_statuses_list = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, "notify_email_statuses",
            default='')
    global _notify_email_statuses
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'base_url', default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        _base_url = afe_urls.ROOT_URL

    initialize_globals()

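# For illustration, a hypothetical notify_email_statuses config value such as
#
#     notify_email_statuses: Completed,Failed;Aborted
#
# is lowercased by initialize() and split on whitespace, commas, semicolons
# and colons, yielding:
#
#     _notify_email_statuses == ['completed', 'failed', 'aborted']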

def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def get_job_metadata(job):
    """Get a dictionary of the job information.

    The return value is a dictionary that includes job information like id,
    name and parent job information.  The value will be stored in the
    metadata database.

    @param job: A Job object.
    @return: A dictionary containing the job id, owner, name and parent
            job id.
    """
    if not job:
        logging.error('Job is None, no metadata returned.')
        return {}
    try:
        return {'job_id': job.id,
                'owner': job.owner,
                'job_name': job.name,
                'parent_job_id': job.parent_job_id}
    except AttributeError as e:
        logging.error('Job has missing attribute: %s', e)
        return {}


class DelayedCallTask(object):
    """
    A task object, like AgentTask, for an Agent to run that waits for the
    specified amount of time to have elapsed before calling the supplied
    callback once and finishing.  If the callback returns anything, it is
    assumed to be a new Agent instance and will be added to the dispatcher.

    @attribute end_time: The absolute posix time after which this task will
            call its callback when it is polled and be finished.

    Also has all attributes required by the Agent class.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: The delay in seconds from now that this task
                will call the supplied callback and be done.
        @param callback: A callable to be called by this task once after at
                least delay_seconds time has elapsed.  It must return None
                or a new Agent instance.
        @param now_func: A time.time like function.  Default: time.time.
                Used for testing.
        """
        assert delay_seconds > 0
        assert callable(callback)
        if not now_func:
            now_func = time.time
        self._now_func = now_func
        self._callback = callback

        self.end_time = self._now_func() + delay_seconds

        # These attributes are required by Agent.
        self.aborted = False
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        self.num_processes = 0


    def poll(self):
        if not self.is_done() and self._now_func() >= self.end_time:
            self._callback()
            self.success = True


    def is_done(self):
        return self.success or self.aborted


    def abort(self):
        self.aborted = True

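# A minimal usage sketch for DelayedCallTask; the callback and polling loop
# below are illustrative only (in the scheduler, an Agent polls the task once
# per tick):
#
#     def _deadline_reached():
#         logging.info('delay elapsed')
#         return None  # or return a new Agent for the dispatcher
#
#     task = DelayedCallTask(delay_seconds=30, callback=_deadline_reached)
#     while not task.is_done():
#         time.sleep(1)
#         task.poll()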

class DBError(Exception):
    """Raised by the DBObject constructor when its select fails."""


class DBObject(object):
    """A miniature object relational model for the database."""

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id.  This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate, as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        assert bool(id) or bool(row)
        if id is not None and row is not None:
            assert id == row[0]
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warning(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing
        in-memory fields.  Fractional seconds are stripped from datetime values
        before comparison.

        @param row - A sequence of values corresponding to fields named in
                the class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if (isinstance(current_value, datetime.datetime)
                and isinstance(row_value, datetime.datetime)):
                current_value = current_value.strftime(time_utils.TIME_FMT)
                row_value = row_value.strftime(time_utils.TIME_FMT)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        self._valid_fields.remove('id')


    def update_from_database(self):
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table=None):
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        if self.__new_record:
            keys = self._fields[1:] # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]

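    # For illustration, saving a new IneligibleHostQueue built from the
    # hypothetical row [0, 42, 7] issues roughly:
    #
    #     INSERT INTO afe_ineligible_host_queues (job_id,host_id)
    #     VALUES ("42","7")
    #
    # Values are quoted naively rather than parameterized, so save() is only
    # suitable for trusted, scheduler-generated values.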

    def delete(self):
        # Drop this instance from the cache so a future constructor call
        # re-queries rather than returning the deleted object.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch_rows(cls, where='', params=(), joins='', order_by=''):
        """
        Fetch the rows based on the given database query.

        @returns The rows fetched by the given query.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        return rows


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @returns A list with one class instance for each row fetched.
        """
        rows = cls.fetch_rows(where=where, params=params, joins=joins,
                              order_by=order_by)
        return [cls(id=row[0], row=row) for row in rows]

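    # A usage sketch: subclasses inherit fetch(), so retrieving all unlocked,
    # valid hosts might look like this (illustrative query, not one used by
    # this module):
    #
    #     for host in Host.fetch(where='locked = %s AND invalid = 0',
    #                            params=(0,)):
    #         logging.info('%s is %s', host.hostname, host.status)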

class IneligibleHostQueue(DBObject):
    _table_name = 'afe_ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')


class AtomicGroup(DBObject):
    _table_name = 'afe_atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')


class Label(DBObject):
    _table_name = 'afe_labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
                self.name, self.id, self.atomic_group_id)


class Host(DBObject):
    _table_name = 'afe_hosts'
    # TODO(ayatane): synch_id is not used, remove after fixing DB.
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty',
               'leased', 'shard_id', 'lock_reason')


    def set_status(self, status):
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT afe_labels.name, afe_labels.platform
                FROM afe_labels
                INNER JOIN afe_hosts_labels ON
                        afe_labels.id = afe_hosts_labels.label_id
                WHERE afe_hosts_labels.host_id = %s
                ORDER BY afe_labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number.  If either hostname does not match this pattern, they are
        simply compared as lower-case strings.

        Example of how hostnames will be sorted:

          alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfies most people's hostname sorting needs
        regardless of their exact naming schemes.  Nobody sane should have
        both a host10 and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            # int() already ignores leading zeros; stripping them first would
            # crash on an all-zero suffix such as 'host0'.
            number_a = int(number_a_str)
            number_b = int(number_b_str)
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)

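    # A usage sketch (hypothetical host list; Python 2's list.sort accepts a
    # cmp function):
    #
    #     hosts.sort(cmp=Host.cmp_for_sort)
    #
    # e.g. hostnames would order as: host1, host2, host09, host10, yolkfolk.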

class HostQueueEntry(DBObject):
    _table_name = 'afe_host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on', 'finished_on')

    _COMPLETION_COUNT_METRIC = metrics.Counter(
        'chromeos/autotest/scheduler/hqe_completion_count')

    def __init__(self, id=None, row=None, job_row=None, **kwargs):
        """
        @param id: ID field from afe_host_queue_entries table.
                   Either id or row should be specified for initialization.
        @param row: The DB row for a particular HostQueueEntry.
                    Either id or row should be specified for initialization.
        @param job_row: The DB row for the job of this HostQueueEntry.
        """
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id, row=job_row)

        if self.host_id:
            self.host = rdb_lib.get_hosts([self.host_id])[0]
            self.host.dbg_str = self.get_dbg_str()
            self.host.metadata = get_job_metadata(self.job)
        else:
            self.host = None


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @returns A list with one class instance for each row fetched.
        """
        # Override the original fetch method to pre-fetch the jobs from the DB
        # in order to prevent each HQE making separate DB queries.
        rows = cls.fetch_rows(where=where, params=params, joins=joins,
                              order_by=order_by)
        if len(rows) <= 1:
            return [cls(id=row[0], row=row) for row in rows]

        job_params = ', '.join([str(row[1]) for row in rows])
        job_rows = Job.fetch_rows(where='id IN (%s)' % (job_params))
        # Create a job_id to job_row mapping to match each HQE
        # to its corresponding job.
        job_dict = {job_row[0]: job_row for job_row in job_rows}
        return [cls(id=row[0], row=row, job_row=job_dict.get(row[1]))
                for row in rows]


    def _view_job_url(self):
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label).  The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN afe_jobs_dependency_labels AS deps "
                      "ON (afe_labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


    def set_host(self, host):
        if host:
            logging.info('Assigning host %s to entry %s', host.hostname, self)
            self.update_field('host_id', host.id)
            self.block_host(host.id)
        else:
            logging.info('Releasing host from %s', self)
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def block_host(self, host_id):
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        if subdir is None:
            assert self.host
            subdir = self.host.hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        if self.host:
            return self.host.hostname
        return 'no host'


    def get_dbg_str(self):
        """Get a debug string to identify this HQE.

        @return: A string containing the hqe and job id.
        """
        try:
            return 'HQE: %s, for job: %s' % (self.id, self.job_id)
        except AttributeError as e:
            return 'HQE has not been initialized yet: %s' % e


    def __str__(self):
        flags = []
        if self.active:
            flags.append('active')
        if self.complete:
            flags.append('complete')
        if self.deleted:
            flags.append('deleted')
        if self.aborted:
            flags.append('aborted')
        flags_str = ','.join(flags)
        if flags_str:
            flags_str = ' [%s]' % flags_str
        return ("%s and host: %s has status:%s%s" %
                (self.get_dbg_str(), self._get_hostname(), self.status,
                 flags_str))


    def set_status(self, status):
        logging.info("%s -> %s", self, status)

        self.update_field('status', status)

        active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
        complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)
        assert not (active and complete)

        self.update_field('active', active)

        # The ordering of these operations is important. Once we set the
        # complete bit this job will become indistinguishable from all
        # the other complete jobs, unless we first set shard_id to NULL
        # to signal to the shard_client that we need to upload it. However,
        # we can only set both these after we've updated finished_on etc
        # within _on_complete or the job will get synced in an intermediate
        # state. This means that if someone sigkills the scheduler between
        # setting finished_on and complete, we will have inconsistent jobs.
        # This should be fine, because nothing critical checks finished_on,
        # and the scheduler should never be killed mid-tick.
        if complete:
            self._on_complete(status)
            self._email_on_job_complete()

        self.update_field('complete', complete)

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)
        logging.debug('HQE Set Status Complete')


    def _on_complete(self, status):
        metric_fields = {'status': status.lower()}
        if self.host:
            metric_fields['board'] = self.host.board or ''
            if len(self.host.pools) == 1:
                metric_fields['pool'] = self.host.pools[0]
            else:
                metric_fields['pool'] = 'MULTIPLE'
        else:
            metric_fields['board'] = 'NO_HOST'
            metric_fields['pool'] = 'NO_HOST'
        self._COMPLETION_COUNT_METRIC.increment(fields=metric_fields)
        # Compare by equality, not identity: Status values are plain strings.
        if status != models.HostQueueEntry.Status.ABORTED:
            self.job.stop_if_necessary()
        if self.started_on:
            self.set_finished_on_now()
        if self.job.shard_id is not None:
            # If shard_id is None, the job will be synced back to the master.
            self.job.update_field('shard_id', None)
        if not self.execution_subdir:
            return
        # Unregister any possible pidfiles associated with this queue entry.
        for pidfile_name in drone_manager.ALL_PIDFILE_NAMES:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                    self.execution_path(), pidfile_name=pidfile_name)
            _drone_manager.unregister_pidfile(pidfile_id)


    def _get_status_email_contents(self, status, summary=None, hostname=None):
        """
        Gather info for the status notification e-mails.

        If needed, we could start using the Django templating engine to create
        the subject and the e-mail body, but that doesn't seem necessary right
        now.

        @param status: Job status text. Mandatory.
        @param summary: Job summary text. Optional.
        @param hostname: A hostname for the job. Optional.

        @return: Tuple (subject, body) for the notification e-mail.
        """
        job_stats = Job(id=self.job.id).get_execution_details()

        subject = ('Autotest | Job ID: %s "%s" | Status: %s ' %
                   (self.job.id, self.job.name, status))

        if hostname is not None:
            subject += '| Hostname: %s ' % hostname

        if status not in ["1 Failed", "Failed"]:
            subject += '| Success Rate: %.2f %%' % job_stats['success_rate']

        body = "Job ID: %s\n" % self.job.id
        body += "Job name: %s\n" % self.job.name
        if hostname is not None:
            body += "Host: %s\n" % hostname
        if summary is not None:
            body += "Summary: %s\n" % summary
        body += "Status: %s\n" % status
        body += "Results interface URL: %s\n" % self._view_job_url()
        body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time']
        if int(job_stats['total_executed']) > 0:
            body += "User tests executed: %s\n" % job_stats['total_executed']
            body += "User tests passed: %s\n" % job_stats['total_passed']
            body += "User tests failed: %s\n" % job_stats['total_failed']
            body += ("User tests success rate: %.2f %%\n" %
                     job_stats['success_rate'])

        if job_stats['failed_rows']:
            body += "Failures:\n"
            body += job_stats['failed_rows']

        return subject, body


    def _email_on_status(self, status):
        hostname = self._get_hostname()
        subject, body = self._get_status_email_contents(status, None, hostname)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        if not self.job.is_finished():
            return

        summary = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary.append("Host: %s Status: %s" %
                                (queue_entry._get_hostname(),
                                 queue_entry.status))

        summary = "\n".join(summary)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                    in status_counts.iteritems())

        subject, body = self._get_status_email_contents(status, summary, None)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def schedule_pre_job_tasks(self):
        logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.job.id, self.id, self.host.hostname, self.status)

        self._do_schedule_pre_job_tasks()


    def _do_schedule_pre_job_tasks(self):
        self.job.schedule_pre_job_tasks(queue_entry=self)


    def requeue(self):
        assert self.host
        self.set_status(models.HostQueueEntry.Status.QUEUED)
        self.update_field('started_on', None)
        self.update_field('finished_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    @property
    def aborted_by(self):
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT afe_users.login,
                        afe_aborted_host_queue_entries.aborted_on
                FROM afe_aborted_host_queue_entries
                INNER JOIN afe_users
                ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id
                WHERE afe_aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, sets the entries to STARTING. Otherwise, it leaves
        them in PENDING.
        """
        self.set_status(models.HostQueueEntry.Status.PENDING)
        self.host.set_status(models.Host.Status.PENDING)

        # Some debug code here: sends an email if an asynchronous job does not
        # immediately enter Starting.
        # TODO: Remove this once we figure out why asynchronous jobs are
        # getting stuck in Pending.
        self.job.run_if_ready(queue_entry=self)
        if (self.job.synch_count == 1 and
                self.status == models.HostQueueEntry.Status.PENDING):
            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
            message = 'Asynchronous job stuck in Pending'
            email_manager.manager.enqueue_notify_email(subject, message)


    def abort(self, dispatcher):
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in {Status.GATHERING, Status.PARSING}:
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in {Status.STARTING, Status.PENDING, Status.RUNNING}:
            # If the hqe is in any of these statuses, it should not have any
            # unfinished agent before it can be aborted.
            agents = dispatcher.get_agents_for_entry(self)
            # Agents with finished tasks can be left behind. This is added to
            # handle the special case of aborting a hostless job in STARTING
            # status, in which the agent has only a HostlessQueueTask
            # associated. The finished HostlessQueueTask will be cleaned up in
            # the next tick, so it's safe to leave the agent there. Without
            # filtering out finished agents, HQE abort won't be able to
            # proceed.
            assert all([agent.is_done() for agent in agents])
            # If the hqe is still in STARTING status, it may not have a host
            # assigned yet.
            if self.host:
                self.host.set_status(models.Host.Status.READY)
        elif (self.status == Status.VERIFYING or
              self.status == Status.RESETTING):
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())
        elif self.status == Status.PROVISIONING:
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.REPAIR,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())

        self.set_status(Status.ABORTED)


    def execution_tag(self):
        SQL_SUSPECT_ENTRIES = ('SELECT * FROM afe_host_queue_entries WHERE '
                               'complete!=1 AND execution_subdir="" AND '
                               'status!="Queued";')
        SQL_FIX_SUSPECT_ENTRY = ('UPDATE afe_host_queue_entries SET '
                                 'status="Aborted" WHERE id=%s;')
        try:
            assert self.execution_subdir
        except AssertionError:
            # TODO(scottz): Remove temporary fix/info gathering pathway for
            # crosbug.com/31595 once issue is root caused.
            logging.error('No execution_subdir for host queue id:%s.', self.id)
            logging.error('====DB DEBUG====\n%s', SQL_SUSPECT_ENTRIES)
            for row in _db.execute(SQL_SUSPECT_ENTRIES):
                logging.error(row)
            logging.error('====DB DEBUG====\n')
            fix_query = SQL_FIX_SUSPECT_ENTRY % self.id
            logging.error('EXECUTING: %s', fix_query)
            _db.execute(SQL_FIX_SUSPECT_ENTRY % self.id)
            raise AssertionError(('self.execution_subdir not found. '
                                  'See log for details.'))

        return "%s/%s" % (self.job.tag(), self.execution_subdir)

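    # For illustration: a job with id 123 owned by 'debug_user' (hypothetical
    # values) that ran on host 'host1' gets the execution tag
    # '123-debug_user/host1', which also serves as the results path below.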

    def execution_path(self):
        return self.execution_tag()


    def set_started_on_now(self):
        self.update_field('started_on', datetime.datetime.now())


    def set_finished_on_now(self):
        self.update_field('finished_on', datetime.datetime.now())


    def is_hostless(self):
        return (self.host_id is None
                and self.meta_host is None)


class Job(DBObject):
    _table_name = 'afe_jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',
               'parameterized_job_id', 'max_runtime_mins', 'parent_job_id',
               'test_retry', 'run_reset', 'timeout_mins', 'shard_id',
               'require_ssp')

    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending()
    # on all status='Pending' atomic group HQEs in case a delay was running
    # when the scheduler was restarted and no more hosts ever successfully
    # exit Verify.

    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)
        self._owner_model = None # caches model instance of owner
        self.update_image_path = None # path of OS image to install


    def model(self):
        return models.Job.objects.get(id=self.id)


    def owner_model(self):
        # work around the fact that the Job owner field is a string, not a
        # foreign key
        if not self._owner_model:
            self._owner_model = models.User.objects.get(login=self.owner)
        return self._owner_model


    def tag(self):
        return "%s-%s" % (self.id, self.owner)


    def get_execution_details(self):
        """
        Get test execution details for this job.

        @return: Dictionary with test execution details
        """
        def _find_test_jobs(rows):
            """
            Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*
            Those are autotest 'internal job' tests, so they should not be
            counted when evaluating the test stats.

            @param rows: List of rows (matrix) with database results.
            """
            job_test_pattern = re.compile(r'SERVER_JOB|CLIENT_JOB\.\d')
            n_test_jobs = 0
            for r in rows:
                test_name = r[0]
                if job_test_pattern.match(test_name):
                    n_test_jobs += 1

            return n_test_jobs

        stats = {}

        rows = _db.execute("""
                SELECT t.test, s.word, t.reason
                FROM tko_tests AS t, tko_jobs AS j, tko_status AS s
                WHERE t.job_idx = j.job_idx
                AND s.status_idx = t.status
                AND j.afe_job_id = %s
                ORDER BY t.reason
                """ % self.id)

        failed_rows = [r for r in rows if not r[1] == 'GOOD']

        n_test_jobs = _find_test_jobs(rows)
        n_test_jobs_failed = _find_test_jobs(failed_rows)

        total_executed = len(rows) - n_test_jobs
        total_failed = len(failed_rows) - n_test_jobs_failed

        if total_executed > 0:
            success_rate = 100 - ((total_failed / float(total_executed)) * 100)
        else:
            success_rate = 0

        stats['total_executed'] = total_executed
        stats['total_failed'] = total_failed
        stats['total_passed'] = total_executed - total_failed
        stats['success_rate'] = success_rate

        status_header = ("Test Name", "Status", "Reason")
        if failed_rows:
            stats['failed_rows'] = utils.matrix_to_string(failed_rows,
                                                          status_header)
        else:
            stats['failed_rows'] = ''

        time_row = _db.execute("""
                   SELECT started_time, finished_time
                   FROM tko_jobs
                   WHERE afe_job_id = %s
                   """ % self.id)

        if time_row:
            t_begin, t_end = time_row[0]
            try:
                delta = t_end - t_begin
                minutes, seconds = divmod(delta.seconds, 60)
                hours, minutes = divmod(minutes, 60)
                stats['execution_time'] = ("%02d:%02d:%02d" %
                                           (hours, minutes, seconds))
            # One of t_end or t_begin is None
            except TypeError:
                stats['execution_time'] = '(could not determine)'
        else:
            stats['execution_time'] = '(none)'

        return stats

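    # For illustration, a hypothetical job with four user tests, one failed,
    # might return:
    #
    #     {'total_executed': 4, 'total_failed': 1, 'total_passed': 3,
    #      'success_rate': 75.0, 'execution_time': '00:42:10',
    #      'failed_rows': '<matrix of (Test Name, Status, Reason) rows>'}
    #
    # where success_rate = 100 - (1 / 4.0) * 100 == 75.0.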

    def keyval_dict(self):
        return self.model().keyval_dict()


    def _pending_count(self):
        """The number of HostQueueEntries for this job in the Pending state."""
        pending_entries = models.HostQueueEntry.objects.filter(
                job=self.id, status=models.HostQueueEntry.Status.PENDING)
        return pending_entries.count()


    def is_ready(self):
        pending_count = self._pending_count()
        ready = (pending_count >= self.synch_count)

        if not ready:
            logging.info(
                    'Job %s not ready: %s pending, %s required',
                    self, pending_count, self.synch_count)

        return ready


    def num_machines(self, clause=None):
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='afe_host_queue_entries')


    def num_queued(self):
        return self.num_machines('not complete')


    def num_active(self):
        return self.num_machines('active')


    def num_complete(self):
        return self.num_machines('complete')


    def is_finished(self):
        return self.num_complete() == self.num_machines()


    def _not_yet_run_entries(self, include_active=True):
        if include_active:
            statuses = list(models.HostQueueEntry.PRE_JOB_STATUSES)
        else:
            statuses = list(models.HostQueueEntry.IDLE_PRE_JOB_STATUSES)
        return models.HostQueueEntry.objects.filter(job=self.id,
                                                    status__in=statuses)


    def _stop_all_entries(self):
        """Stops the job's inactive pre-job HQEs."""
        entries_to_stop = self._not_yet_run_entries(
            include_active=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()


    def stop_if_necessary(self):
        not_yet_run = self._not_yet_run_entries()
        if not_yet_run.count() < self.synch_count:
            self._stop_all_entries()


    def _next_group_name(self):
        """@returns a directory name to use for the next host group results."""
        group_name = ''
        group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
        query = models.HostQueueEntry.objects.filter(
            job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        group_matches = (group_count_re.match(subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in group_matches if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return '%sgroup%d' % (group_name, next_id)

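    # For illustration: if this job's existing execution_subdirs include
    # 'group0' and 'group1' (hypothetical values), the next call returns
    # 'group2'; with no matching subdirs it returns 'group0'.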

    def get_group_entries(self, queue_entry_from_group):
        """
        @param queue_entry_from_group: A HostQueueEntry instance to find other
                group entries on this job for.

        @returns A list of HostQueueEntry objects all executing this job as
                part of the same group as the one supplied (having the same
                execution_subdir).
        """
        execution_subdir = queue_entry_from_group.execution_subdir
        return list(HostQueueEntry.fetch(
            where='job_id=%s AND execution_subdir=%s',
            params=(self.id, execution_subdir)))


    def _should_run_cleanup(self, queue_entry):
        if self.reboot_before == model_attributes.RebootBefore.ALWAYS:
            return True
        elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:
            return queue_entry.host.dirty
        return False


    def _should_run_verify(self, queue_entry):
        do_not_verify = (queue_entry.host.protection ==
                         host_protections.Protection.DO_NOT_VERIFY)
        if do_not_verify:
            return False
        # If RebootBefore is set to NEVER, then we won't run reset because
        # we can't cleanup, so we need to weaken a Reset into a Verify.
        weaker_reset = (self.run_reset and
                self.reboot_before == model_attributes.RebootBefore.NEVER)
        return self.run_verify or weaker_reset


    def _should_run_reset(self, queue_entry):
        can_verify = (queue_entry.host.protection !=
                         host_protections.Protection.DO_NOT_VERIFY)
        can_reboot = self.reboot_before != model_attributes.RebootBefore.NEVER
        return (can_reboot and can_verify and (self.run_reset or
                (self._should_run_cleanup(queue_entry) and
                 self._should_run_verify(queue_entry))))


    def _should_run_provision(self, queue_entry):
        """
        Determine if the queue_entry needs to have a provision task run before
        it to provision queue_entry.host.

        @param queue_entry: The host queue entry in question.
        @returns: True if we should schedule a provision task, False otherwise.

        """
        # If we get to this point, it means that the scheduler has already
        # vetted that all the unprovisionable labels match, so we can just
        # find all labels on the job that aren't on the host to get the list
        # of what we need to provision.  (See the scheduling logic in
        # host_scheduler.py:is_host_eligable_for_job() where we discard all
        # actionable labels when assigning jobs to hosts.)
        job_labels = {x.name for x in queue_entry.get_labels()}
        # Skip provision if `skip_provision` is listed in the job labels.
        if provision.SKIP_PROVISION in job_labels:
            return False
        _, host_labels = queue_entry.host.platform_and_labels()
        # If there are any labels on the job that are not on the host and they
        # are labels that provisioning knows how to change, then that means
        # there is provisioning work to do.  If there's no provisioning work
        # to do, then obviously we have no reason to schedule a provision
        # task!
        diff = job_labels - set(host_labels)
        if any([provision.Provision.acts_on(x) for x in diff]):
            return True
        return False

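    # For illustration (hypothetical labels): a job depending on
    # 'cros-version:lumpy-release/R37-5000.0.0' that lands on a host carrying
    # only 'board:lumpy' leaves the cros-version label in the diff; assuming
    # provisioning acts on cros-version labels, a provision task is queued.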

    def _queue_special_task(self, queue_entry, task):
        """
        Create a special task and associate it with a host queue entry.

        @param queue_entry: The queue entry this special task should be
                            associated with.
        @param task: One of the members of the enum models.SpecialTask.Task.
        @returns: None

        """
        models.SpecialTask.objects.create(
                host=models.Host.objects.get(id=queue_entry.host_id),
                queue_entry=queue_entry, task=task)


    def schedule_pre_job_tasks(self, queue_entry):
        """
        Queue all of the special tasks that need to be run before a host
        queue entry may run.

        If no special tasks need to be scheduled, then |on_pending| will be
        called directly.

        @returns None

        """
        task_queued = False
        hqe_model = models.HostQueueEntry.objects.get(id=queue_entry.id)

        if self._should_run_provision(queue_entry):
            self._queue_special_task(hqe_model,
                                     models.SpecialTask.Task.PROVISION)
            task_queued = True
        elif self._should_run_reset(queue_entry):
            self._queue_special_task(hqe_model, models.SpecialTask.Task.RESET)
            task_queued = True
        else:
            if self._should_run_cleanup(queue_entry):
                self._queue_special_task(hqe_model,
                                         models.SpecialTask.Task.CLEANUP)
                task_queued = True
            if self._should_run_verify(queue_entry):
                self._queue_special_task(hqe_model,
                                         models.SpecialTask.Task.VERIFY)
                task_queued = True

        if not task_queued:
            queue_entry.on_pending()


    def _assign_new_group(self, queue_entries):
        if len(queue_entries) == 1:
            group_subdir_name = queue_entries[0].host.hostname
        else:
            group_subdir_name = self._next_group_name()
            logging.info('Running synchronous job %d hosts %s as %s',
                self.id, [entry.host.hostname for entry in queue_entries],
                group_subdir_name)

        for queue_entry in queue_entries:
            queue_entry.set_execution_subdir(group_subdir_name)


    def _choose_group_to_run(self, include_queue_entry):
        """
        @returns A list of HostQueueEntry instances to be used to run this
                Job.  The entries are assigned a shared execution_subdir via
                _assign_new_group before being returned.
        """
        chosen_entries = [include_queue_entry]
        num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        if len(chosen_entries) < self.synch_count:
            message = ('job %s got less than %s chosen entries: %s' % (
                    self.id, self.synch_count, chosen_entries))
            logging.error(message)
            email_manager.manager.enqueue_notify_email(
                    'Job not started, too few chosen entries', message)
            return []

        self._assign_new_group(chosen_entries)
        return chosen_entries


    def run_if_ready(self, queue_entry):
        """
        Run this job by kicking its HQEs into status='Starting' if enough
        hosts are ready for it to run.

        Cleans up by kicking HQEs into status='Stopped' if this Job is not
        ready to run.
        """
        if not self.is_ready():
            self.stop_if_necessary()
        else:
            self.run(queue_entry)


    def request_abort(self):
        """Request that this Job be aborted on the next scheduler cycle."""
        self.model().abort()


    def run(self, queue_entry):
        """
        @param queue_entry: The HostQueueEntry instance calling this method.
        """
        queue_entries = self._choose_group_to_run(queue_entry)
        if queue_entries:
            self._finish_run(queue_entries)


    def _finish_run(self, queue_entries):
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def __str__(self):
        return '%s-%s' % (self.id, self.owner)
   1344