# pylint: disable=missing-docstring

"""Database model classes for the scheduler.

Contains model classes abstracting the various DB tables used by the scheduler.
These overlap the Django models in basic functionality, but were written before
the Django models existed and have not yet been phased out.  Some of them
(particularly HostQueueEntry and Job) have considerable scheduler-specific logic
which would probably be ill-suited for inclusion in the general Django model
classes.

Globals:
_notify_email_statuses: list of HQE statuses.  Each time a single HQE reaches
        one of these statuses, an email will be sent to the job's email_list.
        Comes from global_config.
_base_url: URL to the local AFE server, used to construct URLs for emails.
_db: DatabaseConnection for this module.
_drone_manager: reference to global DroneManager instance.
"""

import datetime
import itertools
import logging
import re
import time
import weakref

from autotest_lib.client.common_lib import global_config, host_protections
from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.cros.graphite import autotest_es
from autotest_lib.frontend.afe import models, model_attributes
from autotest_lib.scheduler import drone_manager, email_manager
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import scheduler_config
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.server import afe_urls
from autotest_lib.server.cros import provision

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


_notify_email_statuses = []
_base_url = None

_db = None
_drone_manager = None

def initialize():
    global _db
    _db = scheduler_lib.ConnectionManager().get_connection()

    notify_statuses_list = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, "notify_email_statuses",
            default='')
    global _notify_email_statuses
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'base_url', default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        _base_url = afe_urls.ROOT_URL

    initialize_globals()


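# A hypothetical example of the notify_email_statuses option parsed above
# (assuming scheduler_config.CONFIG_SECTION names the scheduler section of
# the global config):
#
#     [SCHEDULER]
#     notify_email_statuses = Failed, Aborted; Completed
#
# initialize() lowercases the raw value and splits it on whitespace, commas,
# semicolons and colons, so _notify_email_statuses becomes
# ['failed', 'aborted', 'completed'].
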
def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def get_job_metadata(job):
    """Get a dictionary of the job information.

    The return value is a dictionary that includes job information like id,
    name and parent job information.  The value is stored in the metadata
    database.

    @param job: A Job object.
    @return: A dictionary containing the job id, owner, name and parent job
            id.
    """
    if not job:
        logging.error('Job is None, no metadata returned.')
        return {}
    try:
        return {'job_id': job.id,
                'owner': job.owner,
                'job_name': job.name,
                'parent_job_id': job.parent_job_id}
    except AttributeError as e:
        logging.error('Job has missing attribute: %s', e)
        return {}

class DelayedCallTask(object):
    """
    A task object like AgentTask for an Agent to run that waits for the
    specified amount of time to have elapsed before calling the supplied
    callback once and finishing.  If the callback returns anything, it is
    assumed to be a new Agent instance and will be added to the dispatcher.

    @attribute end_time: The absolute posix time after which this task will
            call its callback when it is polled and be finished.

    Also has all attributes required by the Agent class.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: The delay in seconds from now that this task
                will call the supplied callback and be done.
        @param callback: A callable to be called by this task once after at
                least delay_seconds time has elapsed.  It must return None
                or a new Agent instance.
        @param now_func: A time.time like function.  Default: time.time.
                Used for testing.
        """
        assert delay_seconds > 0
        assert callable(callback)
        if not now_func:
            now_func = time.time
        self._now_func = now_func
        self._callback = callback

        self.end_time = self._now_func() + delay_seconds

        # These attributes are required by Agent.
        self.aborted = False
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        self.num_processes = 0


    def poll(self):
        if not self.is_done() and self._now_func() >= self.end_time:
            self._callback()
            self.success = True


    def is_done(self):
        return self.success or self.aborted


    def abort(self):
        self.aborted = True


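# A minimal usage sketch (hypothetical; `dispatcher` and `Agent` stand in for
# the monitor_db dispatcher and its Agent class): wrap a DelayedCallTask in an
# Agent so the dispatcher, which polls its agents every tick, fires the
# callback once roughly five minutes from now.
#
#     def _send_reminder():
#         email_manager.manager.enqueue_notify_email(
#                 'Reminder', 'Five minutes have elapsed.')
#
#     task = DelayedCallTask(delay_seconds=5 * 60, callback=_send_reminder)
#     dispatcher.add_agent(Agent(task))
#
# In unit tests, now_func can be a fake clock so the delay is stepped
# deterministically instead of slept through.
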
class DBError(Exception):
    """Raised by the DBObject constructor when its select fails."""


class DBObject(object):
    """A miniature object relational model for the database."""

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id.  This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


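    # An illustrative sketch of the caching above (the id is hypothetical and
    # assumes such a row exists): constructing the same type/id twice yields
    # the same object while a strong reference is alive, because __new__
    # consults _instances_by_type_and_id first.
    #
    #     job_a = Job(42)
    #     job_b = Job(42)
    #     assert job_a is job_b
    #
    # The cache holds weak references, so once all strong references are
    # dropped the entry is evicted and a later lookup re-queries the DB.
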
    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        assert bool(id) or bool(row)
        if id is not None and row is not None:
            assert id == row[0]
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warning(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing
        in-memory fields.  Fractional seconds are stripped from datetime values
        before comparison.

        @param row - A sequence of values corresponding to fields named in
                the class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if (isinstance(current_value, datetime.datetime)
                and isinstance(row_value, datetime.datetime)):
                current_value = current_value.strftime(time_utils.TIME_FMT)
                row_value = row_value.strftime(time_utils.TIME_FMT)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        self._valid_fields.remove('id')


    def update_from_database(self):
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table=None):
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        if self.__new_record:
            keys = self._fields[1:] # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @returns A list with one class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        return [cls(id=row[0], row=row) for row in rows]


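# A minimal usage sketch for fetch() (illustrative only; assumes initialize()
# has set up _db and that matching rows exist):
#
#     pool_labels = Label.fetch(
#             where='invalid = 0 AND name LIKE %s',
#             params=('pool:%',),
#             order_by='name')
#     for label in pool_labels:
#         print label.id, label.name
#
# The where/joins/order_by fragments are pasted into the SQL verbatim, so any
# caller-supplied values must be passed via params for the driver to escape.
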
class IneligibleHostQueue(DBObject):
    _table_name = 'afe_ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')


class AtomicGroup(DBObject):
    _table_name = 'afe_atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')


class Label(DBObject):
    _table_name = 'afe_labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
                self.name, self.id, self.atomic_group_id)


class Host(DBObject):
    _table_name = 'afe_hosts'
    # TODO(ayatane): synch_id is not used, remove after fixing DB.
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty',
               'leased', 'shard_id', 'lock_reason')


    def set_status(self, status):
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT afe_labels.name, afe_labels.platform
                FROM afe_labels
                INNER JOIN afe_hosts_labels ON
                        afe_labels.id = afe_hosts_labels.label_id
                WHERE afe_hosts_labels.host_id = %s
                ORDER BY afe_labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number.  If both hostnames do not match this pattern, they are simply
        compared as lower case strings.

        Example of how hostnames will be sorted:

          alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfies most people's hostname sorting needs
        regardless of their exact naming schemes.  Nobody sane should have
        both a host10 and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            # int() ignores leading zeros and, unlike lstrip('0'), also
            # handles an all-zero suffix such as 'host000'.
            number_a = int(number_a_str)
            number_b = int(number_b_str)
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)


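# A short sorting sketch using the comparator above (hostnames hypothetical;
# assumes the rows exist):
#
#     hosts = Host.fetch(where='invalid = 0')
#     hosts.sort(cmp=Host.cmp_for_sort)
#     print [h.hostname for h in hosts]
#     # e.g. ['alice', 'host1', 'host2', 'host09', 'host10', 'yolkfolk']
#
# Python 2's list.sort accepts a cmp function directly; under Python 3 this
# would need functools.cmp_to_key(Host.cmp_for_sort) as a key instead.
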
class HostQueueEntry(DBObject):
    _table_name = 'afe_host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on', 'finished_on')

    _COMPLETION_COUNT_METRIC = metrics.Counter(
        'chromeos/autotest/scheduler/hqe_completion_count')

    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = rdb_lib.get_hosts([self.host_id])[0]
            self.host.dbg_str = self.get_dbg_str()
            self.host.metadata = get_job_metadata(self.job)
        else:
            self.host = None


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label).  The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN afe_jobs_dependency_labels AS deps "
                      "ON (afe_labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


    def set_host(self, host):
        if host:
            logging.info('Assigning host %s to entry %s', host.hostname, self)
            self.update_field('host_id', host.id)
            self.block_host(host.id)
        else:
            logging.info('Releasing host from %s', self)
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def block_host(self, host_id):
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        if subdir is None:
            assert self.host
            subdir = self.host.hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        if self.host:
            return self.host.hostname
        return 'no host'


    def get_dbg_str(self):
        """Get a debug string to identify this HQE.

        @return: A string containing the hqe and job id.
        """
        try:
            return 'HQE: %s, for job: %s' % (self.id, self.job_id)
        except AttributeError as e:
            return 'HQE has not been initialized yet: %s' % e


    def __str__(self):
        flags = []
        if self.active:
            flags.append('active')
        if self.complete:
            flags.append('complete')
        if self.deleted:
            flags.append('deleted')
        if self.aborted:
            flags.append('aborted')
        flags_str = ','.join(flags)
        if flags_str:
            flags_str = ' [%s]' % flags_str
        return ("%s and host: %s has status:%s%s" %
                (self.get_dbg_str(), self._get_hostname(), self.status,
                 flags_str))


    def record_state(self, type_str, state, value):
        """Record metadata in elasticsearch.

        If ES is configured to use HTTP, we time that HTTP request; if it
        uses UDP, no timing is needed.

        @param type_str: sets the _type field in the elasticsearch db.
        @param state: string representing what state we are recording,
                      e.g. 'status'
        @param value: value of the state, e.g. 'verifying'
        """
        metadata = {
            'time_changed': time.time(),
            state: value,
            'job_id': self.job_id,
        }
        if self.host:
            metadata['hostname'] = self.host.hostname
        autotest_es.post(type_str=type_str, metadata=metadata)


    def set_status(self, status):
        logging.info("%s -> %s", self, status)

        self.update_field('status', status)

        active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
        complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)
        assert not (active and complete)

        self.update_field('active', active)

        # The ordering of these operations is important. Once we set the
        # complete bit this job will become indistinguishable from all
        # the other complete jobs, unless we first set shard_id to NULL
        # to signal to the shard_client that we need to upload it. However,
        # we can only set both these after we've updated finished_on etc
        # within _on_complete or the job will get synced in an intermediate
        # state. This means that if someone sigkills the scheduler between
        # setting finished_on and complete, we will have inconsistent jobs.
        # This should be fine, because nothing critical checks finished_on,
        # and the scheduler should never be killed mid-tick.
        if complete:
            self._on_complete(status)
            self._email_on_job_complete()

        self.update_field('complete', complete)

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)
        logging.debug('HQE Set Status Complete')
        self.record_state('hqe_status', 'status', status)


    def _on_complete(self, status):
        metric_fields = {'status': status.lower()}
        if self.host:
            metric_fields['board'] = self.host.board or ''
            if len(self.host.pools) == 1:
                metric_fields['pool'] = self.host.pools[0]
            else:
                metric_fields['pool'] = 'MULTIPLE'
        else:
            metric_fields['board'] = 'NO_HOST'
            metric_fields['pool'] = 'NO_HOST'
        self._COMPLETION_COUNT_METRIC.increment(fields=metric_fields)
        if status != models.HostQueueEntry.Status.ABORTED:
            self.job.stop_if_necessary()
        if self.started_on:
            self.set_finished_on_now()
        if self.job.shard_id is not None:
            # If shard_id is None, the job will be synced back to the master.
            self.job.update_field('shard_id', None)
        if not self.execution_subdir:
            return
        # Unregister any possible pidfiles associated with this queue entry.
        for pidfile_name in drone_manager.ALL_PIDFILE_NAMES:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                    self.execution_path(), pidfile_name=pidfile_name)
            _drone_manager.unregister_pidfile(pidfile_id)


    def _get_status_email_contents(self, status, summary=None, hostname=None):
        """
        Gather info for the status notification e-mails.

        If needed, we could start using the Django templating engine to create
        the subject and the e-mail body, but that doesn't seem necessary right
        now.

        @param status: Job status text. Mandatory.
        @param summary: Job summary text. Optional.
        @param hostname: A hostname for the job. Optional.

        @return: Tuple (subject, body) for the notification e-mail.
        """
        job_stats = Job(id=self.job.id).get_execution_details()

        subject = ('Autotest | Job ID: %s "%s" | Status: %s ' %
                   (self.job.id, self.job.name, status))

        if hostname is not None:
            subject += '| Hostname: %s ' % hostname

        if status not in ["1 Failed", "Failed"]:
            subject += '| Success Rate: %.2f %%' % job_stats['success_rate']

        body = "Job ID: %s\n" % self.job.id
        body += "Job name: %s\n" % self.job.name
        if hostname is not None:
            body += "Host: %s\n" % hostname
        if summary is not None:
            body += "Summary: %s\n" % summary
        body += "Status: %s\n" % status
        body += "Results interface URL: %s\n" % self._view_job_url()
        body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time']
        if int(job_stats['total_executed']) > 0:
            body += "User tests executed: %s\n" % job_stats['total_executed']
            body += "User tests passed: %s\n" % job_stats['total_passed']
            body += "User tests failed: %s\n" % job_stats['total_failed']
            body += ("User tests success rate: %.2f %%\n" %
                     job_stats['success_rate'])

        if job_stats['failed_rows']:
            body += "Failures:\n"
            body += job_stats['failed_rows']

        return subject, body


    def _email_on_status(self, status):
        hostname = self._get_hostname()
        subject, body = self._get_status_email_contents(status, None, hostname)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        if not self.job.is_finished():
            return

        summary = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary.append("Host: %s Status: %s" %
                                (queue_entry._get_hostname(),
                                 queue_entry.status))

        summary = "\n".join(summary)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                    in status_counts.iteritems())

        subject, body = self._get_status_email_contents(status, summary, None)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def schedule_pre_job_tasks(self):
        logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.job.id, self.id, self.host.hostname, self.status)

        self._do_schedule_pre_job_tasks()


    def _do_schedule_pre_job_tasks(self):
        self.job.schedule_pre_job_tasks(queue_entry=self)


    def requeue(self):
        assert self.host
        self.set_status(models.HostQueueEntry.Status.QUEUED)
        self.update_field('started_on', None)
        self.update_field('finished_on', None)
        # Verify/cleanup failure sets the execution subdir, so reset it here.
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    @property
    def aborted_by(self):
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """Fetch info about who aborted the job."""
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT afe_users.login,
                        afe_aborted_host_queue_entries.aborted_on
                FROM afe_aborted_host_queue_entries
                INNER JOIN afe_users
                ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id
                WHERE afe_aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, sets the entries to STARTING.  Otherwise, it
        leaves them in PENDING.
        """
        self.set_status(models.HostQueueEntry.Status.PENDING)
        self.host.set_status(models.Host.Status.PENDING)

        # Some debug code here: sends an email if an asynchronous job does not
        # immediately enter Starting.
        # TODO: Remove this once we figure out why asynchronous jobs are
        # getting stuck in Pending.
        self.job.run_if_ready(queue_entry=self)
        if (self.job.synch_count == 1 and
                self.status == models.HostQueueEntry.Status.PENDING):
            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
            message = 'Asynchronous job stuck in Pending'
            email_manager.manager.enqueue_notify_email(subject, message)


    def abort(self, dispatcher):
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in (Status.GATHERING, Status.PARSING, Status.ARCHIVING):
            # Do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host.
            return

        if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING,
                           Status.WAITING):
            # If the hqe is in any of these statuses, it should not have any
            # unfinished agents before it can be aborted.
            agents = dispatcher.get_agents_for_entry(self)
            # Agents with finished tasks can be left behind.  This is added to
            # handle the special case of aborting a hostless job in STARTING
            # status, in which the agent has only a HostlessQueueTask
            # associated.  The finished HostlessQueueTask will be cleaned up
            # in the next tick, so it's safe to leave the agent there.
            # Without filtering out finished agents, HQE abort won't be able
            # to proceed.
            assert all([agent.is_done() for agent in agents])
            # If the hqe is still in STARTING status, it may not have been
            # assigned a host yet.
            if self.host:
                self.host.set_status(models.Host.Status.READY)
        elif (self.status == Status.VERIFYING or
              self.status == Status.RESETTING):
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())
        elif self.status == Status.PROVISIONING:
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.REPAIR,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())

        self.set_status(Status.ABORTED)


    def execution_tag(self):
        SQL_SUSPECT_ENTRIES = ('SELECT * FROM afe_host_queue_entries WHERE '
                               'complete!=1 AND execution_subdir="" AND '
                               'status!="Queued";')
        SQL_FIX_SUSPECT_ENTRY = ('UPDATE afe_host_queue_entries SET '
                                 'status="Aborted" WHERE id=%s;')
        try:
            assert self.execution_subdir
        except AssertionError:
            # TODO(scottz): Remove temporary fix/info gathering pathway for
            # crosbug.com/31595 once issue is root caused.
            logging.error('No execution_subdir for host queue id:%s.', self.id)
            logging.error('====DB DEBUG====\n%s', SQL_SUSPECT_ENTRIES)
            for row in _db.execute(SQL_SUSPECT_ENTRIES):
                logging.error(row)
            logging.error('====DB DEBUG====\n')
            fix_query = SQL_FIX_SUSPECT_ENTRY % self.id
            logging.error('EXECUTING: %s', fix_query)
            _db.execute(SQL_FIX_SUSPECT_ENTRY % self.id)
            raise AssertionError(('self.execution_subdir not found. '
                                  'See log for details.'))

        return "%s/%s" % (self.job.tag(), self.execution_subdir)


    def execution_path(self):
        return self.execution_tag()


    def set_started_on_now(self):
        self.update_field('started_on', datetime.datetime.now())


    def set_finished_on_now(self):
        self.update_field('finished_on', datetime.datetime.now())


    def is_hostless(self):
        return (self.host_id is None
                and self.meta_host is None)


class Job(DBObject):
    _table_name = 'afe_jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',
               'parameterized_job_id', 'max_runtime_mins', 'parent_job_id',
               'test_retry', 'run_reset', 'timeout_mins', 'shard_id',
               'require_ssp')

    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending()
    # on all status='Pending' atomic group HQEs in case a delay was running
    # when the scheduler was restarted and no more hosts ever successfully
    # exit Verify.

    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)
        self._owner_model = None # caches model instance of owner
        self.update_image_path = None # path of OS image to install


    def model(self):
        return models.Job.objects.get(id=self.id)


    def owner_model(self):
        # Work around the fact that the Job owner field is a string, not a
        # foreign key.
        if not self._owner_model:
            self._owner_model = models.User.objects.get(login=self.owner)
        return self._owner_model


    def tag(self):
        return "%s-%s" % (self.id, self.owner)


    def is_image_update_job(self):
        """
        Discover if the current job requires an OS update.

        @return: True/False if OS should be updated before job is run.
        """
        # All image update jobs have the parameterized_job_id set.
        if not self.parameterized_job_id:
            return False

        # Retrieve the ID of the ParameterizedJob this job is an instance of.
        rows = _db.execute("""
                SELECT test_id
                FROM afe_parameterized_jobs
                WHERE id = %s
                """, (self.parameterized_job_id,))
        if not rows:
            return False
        test_id = rows[0][0]

        # Retrieve the ID of the known autoupdate_ParameterizedJob.
        rows = _db.execute("""
                SELECT id
                FROM afe_autotests
                WHERE name = 'autoupdate_ParameterizedJob'
                """)
        if not rows:
            return False
        update_id = rows[0][0]

        # If the IDs are the same we've found an image update job.
        if test_id == update_id:
            # Finally, get the path to the OS image to install.
            rows = _db.execute("""
                    SELECT parameter_value
                    FROM afe_parameterized_job_parameters
                    WHERE parameterized_job_id = %s
                    """, (self.parameterized_job_id,))
            if rows:
                # Save the path in update_image_path to use later as a command
                # line parameter to autoserv.
                self.update_image_path = rows[0][0]
                return True

        return False


    def get_execution_details(self):
        """
        Get test execution details for this job.

        @return: Dictionary with test execution details
        """
        def _find_test_jobs(rows):
            """
            Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*
            Those are autotest 'internal job' tests, so they should not be
            counted when evaluating the test stats.

            @param rows: List of rows (matrix) with database results.
            """
            job_test_pattern = re.compile(r'SERVER|CLIENT_JOB\.\d')
            n_test_jobs = 0
            for r in rows:
                test_name = r[0]
                if job_test_pattern.match(test_name):
                    n_test_jobs += 1

            return n_test_jobs

        stats = {}

        rows = _db.execute("""
                SELECT t.test, s.word, t.reason
                FROM tko_tests AS t, tko_jobs AS j, tko_status AS s
                WHERE t.job_idx = j.job_idx
                AND s.status_idx = t.status
                AND j.afe_job_id = %s
                ORDER BY t.reason
                """ % self.id)

        failed_rows = [r for r in rows if not r[1] == 'GOOD']

        n_test_jobs = _find_test_jobs(rows)
        n_test_jobs_failed = _find_test_jobs(failed_rows)

        total_executed = len(rows) - n_test_jobs
        total_failed = len(failed_rows) - n_test_jobs_failed

        if total_executed > 0:
            success_rate = 100 - ((total_failed / float(total_executed)) * 100)
        else:
            success_rate = 0

        stats['total_executed'] = total_executed
        stats['total_failed'] = total_failed
        stats['total_passed'] = total_executed - total_failed
        stats['success_rate'] = success_rate

        status_header = ("Test Name", "Status", "Reason")
        if failed_rows:
            stats['failed_rows'] = utils.matrix_to_string(failed_rows,
                                                          status_header)
        else:
            stats['failed_rows'] = ''

        time_row = _db.execute("""
                   SELECT started_time, finished_time
                   FROM tko_jobs
                   WHERE afe_job_id = %s
                   """ % self.id)

        if time_row:
            t_begin, t_end = time_row[0]
            try:
                delta = t_end - t_begin
                minutes, seconds = divmod(delta.seconds, 60)
                hours, minutes = divmod(minutes, 60)
                stats['execution_time'] = ("%02d:%02d:%02d" %
                                           (hours, minutes, seconds))
            # One of t_end or t_begin is None.
            except TypeError:
                stats['execution_time'] = '(could not determine)'
        else:
            stats['execution_time'] = '(none)'

        return stats


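    # A worked sketch of the stats computed above (numbers hypothetical):
    # given 12 result rows of which 2 are internal SERVER_JOB/CLIENT_JOB.*
    # entries, and 3 failed rows of which 1 is internal:
    #
    #     total_executed = 12 - 2                # == 10
    #     total_failed = 3 - 1                   # == 2
    #     total_passed = 10 - 2                  # == 8
    #     success_rate = 100 - (2 / 10.0) * 100  # == 80.0
    #
    # A run from 01:02:03 to 02:04:05 yields execution_time '01:02:02'.
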
    def keyval_dict(self):
        return self.model().keyval_dict()


    def _pending_count(self):
        """The number of HostQueueEntries for this job in the Pending state."""
        pending_entries = models.HostQueueEntry.objects.filter(
                job=self.id, status=models.HostQueueEntry.Status.PENDING)
        return pending_entries.count()


    def is_ready(self):
        pending_count = self._pending_count()
        ready = (pending_count >= self.synch_count)

        if not ready:
            logging.info(
                    'Job %s not ready: %s pending, %s required',
                    self, pending_count, self.synch_count)

        return ready


    def num_machines(self, clause=None):
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='afe_host_queue_entries')


    def num_queued(self):
        return self.num_machines('not complete')


    def num_active(self):
        return self.num_machines('active')


    def num_complete(self):
        return self.num_machines('complete')


    def is_finished(self):
        return self.num_complete() == self.num_machines()


    def _not_yet_run_entries(self, include_active=True):
        if include_active:
            statuses = list(models.HostQueueEntry.PRE_JOB_STATUSES)
        else:
            statuses = list(models.HostQueueEntry.IDLE_PRE_JOB_STATUSES)
        return models.HostQueueEntry.objects.filter(job=self.id,
                                                    status__in=statuses)


    def _stop_all_entries(self):
        """Stops the job's inactive pre-job HQEs."""
        entries_to_stop = self._not_yet_run_entries(
            include_active=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()


    def stop_if_necessary(self):
        not_yet_run = self._not_yet_run_entries()
        if not_yet_run.count() < self.synch_count:
            self._stop_all_entries()


    def _next_group_name(self):
        """@returns a directory name to use for the next host group results."""
        group_name = ''
        group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
        query = models.HostQueueEntry.objects.filter(
            job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        group_matches = (group_count_re.match(subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in group_matches if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return '%sgroup%d' % (group_name, next_id)


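    # A short sketch of the naming scheme above (subdirs hypothetical): if
    # this job's HQEs already use execution_subdirs 'group0' and 'group1',
    # the regex harvests ids [0, 1] and the next synchronous group directory
    # is 'group2'; a job with no 'groupN' subdirs starts at 'group0'.
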
    def get_group_entries(self, queue_entry_from_group):
        """
        @param queue_entry_from_group: A HostQueueEntry instance to find other
                group entries on this job for.

        @returns A list of HostQueueEntry objects all executing this job as
                part of the same group as the one supplied (having the same
                execution_subdir).
        """
        execution_subdir = queue_entry_from_group.execution_subdir
        return list(HostQueueEntry.fetch(
            where='job_id=%s AND execution_subdir=%s',
            params=(self.id, execution_subdir)))


    def _should_run_cleanup(self, queue_entry):
        if self.reboot_before == model_attributes.RebootBefore.ALWAYS:
            return True
        elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:
            return queue_entry.host.dirty
        return False


    def _should_run_verify(self, queue_entry):
        do_not_verify = (queue_entry.host.protection ==
                         host_protections.Protection.DO_NOT_VERIFY)
        if do_not_verify:
            return False
        # If RebootBefore is set to NEVER, then we won't run reset because
        # we can't cleanup, so we need to weaken a Reset into a Verify.
        weaker_reset = (self.run_reset and
                self.reboot_before == model_attributes.RebootBefore.NEVER)
        return self.run_verify or weaker_reset


    def _should_run_reset(self, queue_entry):
        can_verify = (queue_entry.host.protection !=
                         host_protections.Protection.DO_NOT_VERIFY)
        can_reboot = self.reboot_before != model_attributes.RebootBefore.NEVER
        return (can_reboot and can_verify and (self.run_reset or
                (self._should_run_cleanup(queue_entry) and
                 self._should_run_verify(queue_entry))))


    def _should_run_provision(self, queue_entry):
        """
        Determine whether a provision task must run before this queue_entry
        to provision queue_entry.host.

        @param queue_entry: The host queue entry in question.
        @returns: True if we should schedule a provision task, False otherwise.

        """
        # If we get to this point, it means that the scheduler has already
        # vetted that all the unprovisionable labels match, so we can just
        # find all labels on the job that aren't on the host to get the list
        # of what we need to provision.  (See the scheduling logic in
        # host_scheduler.py:is_host_eligable_for_job() where we discard all
        # actionable labels when assigning jobs to hosts.)
        job_labels = {x.name for x in queue_entry.get_labels()}
        # Skip provision if `skip_provision` is listed in the job labels.
        if provision.SKIP_PROVISION in job_labels:
            return False
        _, host_labels = queue_entry.host.platform_and_labels()
        # If there are any labels on the job that are not on the host and they
        # are labels that provisioning knows how to change, then that means
        # there is provisioning work to do.  If there's no provisioning work
        # to do, then obviously we have no reason to schedule a provision
        # task!
        diff = job_labels - set(host_labels)
        if any([provision.Provision.acts_on(x) for x in diff]):
            return True
        return False

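    # A worked sketch of the label diff above (label names hypothetical): if
    # the job wants {'board:lumpy', 'cros-version:R33-5000.0.0'} and the host
    # has {'board:lumpy', 'cros-version:R32-4900.0.0'}, the diff is
    # {'cros-version:R33-5000.0.0'}.  provision.Provision.acts_on() treats
    # the cros-version prefix as provisionable, so a PROVISION special task
    # is scheduled; labels it cannot act on are simply ignored here.
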
    def _queue_special_task(self, queue_entry, task):
        """
        Create a special task and associate it with a host queue entry.

        @param queue_entry: The queue entry this special task should be
                            associated with.
        @param task: One of the members of the enum models.SpecialTask.Task.
        @returns: None

        """
        models.SpecialTask.objects.create(
                host=models.Host.objects.get(id=queue_entry.host_id),
                queue_entry=queue_entry, task=task)


    def schedule_pre_job_tasks(self, queue_entry):
        """
        Queue all of the special tasks that need to be run before a host
        queue entry may run.

        If no special tasks need to be scheduled, then |on_pending| will be
        called directly.

        @returns None

        """
        task_queued = False
        hqe_model = models.HostQueueEntry.objects.get(id=queue_entry.id)

        if self._should_run_provision(queue_entry):
            self._queue_special_task(hqe_model,
                                     models.SpecialTask.Task.PROVISION)
            task_queued = True
        elif self._should_run_reset(queue_entry):
            self._queue_special_task(hqe_model, models.SpecialTask.Task.RESET)
            task_queued = True
        else:
            if self._should_run_cleanup(queue_entry):
                self._queue_special_task(hqe_model,
                                         models.SpecialTask.Task.CLEANUP)
                task_queued = True
            if self._should_run_verify(queue_entry):
                self._queue_special_task(hqe_model,
                                         models.SpecialTask.Task.VERIFY)
                task_queued = True

        if not task_queued:
            queue_entry.on_pending()


    def _assign_new_group(self, queue_entries):
        if len(queue_entries) == 1:
            group_subdir_name = queue_entries[0].host.hostname
        else:
            group_subdir_name = self._next_group_name()
            logging.info('Running synchronous job %d hosts %s as %s',
                self.id, [entry.host.hostname for entry in queue_entries],
                group_subdir_name)

        for queue_entry in queue_entries:
            queue_entry.set_execution_subdir(group_subdir_name)


    def _choose_group_to_run(self, include_queue_entry):
        """
        @returns A list of HostQueueEntry instances to be used to run this
                Job, or an empty list if the job cannot be started.
        """
        chosen_entries = [include_queue_entry]
        num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        if len(chosen_entries) < self.synch_count:
            message = ('job %s got fewer than %s chosen entries: %s' % (
                    self.id, self.synch_count, chosen_entries))
            logging.error(message)
            email_manager.manager.enqueue_notify_email(
                    'Job not started, too few chosen entries', message)
            return []

        self._assign_new_group(chosen_entries)
        return chosen_entries


    def run_if_ready(self, queue_entry):
        """
        Run this job by kicking its HQEs into status='Starting' if enough
        hosts are ready for it to run.

        Cleans up by kicking HQEs into status='Stopped' if this Job is not
        ready to run.
        """
        if not self.is_ready():
            self.stop_if_necessary()
        else:
            self.run(queue_entry)


    def request_abort(self):
        """Request that this Job be aborted on the next scheduler cycle."""
        self.model().abort()


    def run(self, queue_entry):
        """
        @param queue_entry: The HostQueueEntry instance calling this method.
        """
        queue_entries = self._choose_group_to_run(queue_entry)
        if queue_entries:
            self._finish_run(queue_entries)


    def _finish_run(self, queue_entries):
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def __str__(self):
        return '%s-%s' % (self.id, self.owner)