# pylint: disable=missing-docstring

      3 """Database model classes for the scheduler.
      4 
      5 Contains model classes abstracting the various DB tables used by the scheduler.
      6 These overlap the Django models in basic functionality, but were written before
      7 the Django models existed and have not yet been phased out.  Some of them
      8 (particularly HostQueueEntry and Job) have considerable scheduler-specific logic
      9 which would probably be ill-suited for inclusion in the general Django model
     10 classes.
     11 
     12 Globals:
     13 _notify_email_statuses: list of HQE statuses.  each time a single HQE reaches
     14         one of these statuses, an email will be sent to the job's email_list.
     15         comes from global_config.
     16 _base_url: URL to the local AFE server, used to construct URLs for emails.
     17 _db: DatabaseConnection for this module.
     18 _drone_manager: reference to global DroneManager instance.
     19 """

import base64
import datetime
import errno
import itertools
import logging
import re
import weakref

import google.protobuf.internal.well_known_types as types

from autotest_lib.client.common_lib import global_config, host_protections
from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib import utils
from autotest_lib.frontend.afe import models, model_attributes
from autotest_lib.scheduler import drone_manager, email_manager
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import scheduler_config
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.server import afe_urls
from autotest_lib.server.cros import provision

try:
    from chromite.lib import metrics
    from chromite.lib import cloud_trace
except ImportError:
    metrics = utils.metrics_mock
    import mock
    cloud_trace = mock.Mock()


_notify_email_statuses = []
_base_url = None

_db = None
_drone_manager = None

RESPECT_STATIC_LABELS = global_config.global_config.get_config_value(
        'SKYLAB', 'respect_static_labels', type=bool, default=False)


def initialize():
    global _db
    _db = scheduler_lib.ConnectionManager().get_connection()

    notify_statuses_list = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, "notify_email_statuses",
            default='')
    global _notify_email_statuses
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'base_url', default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        _base_url = afe_urls.ROOT_URL

    initialize_globals()


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()

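# A minimal usage sketch (illustrative; assumes a configured global_config
# and a running drone manager).  Callers are expected to run initialize()
# once before instantiating any of the model classes below:
#
#     >>> import scheduler_models  # hypothetical import name for this module
#     >>> scheduler_models.initialize()
#     >>> job = scheduler_models.Job(id=42)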

def get_job_metadata(job):
    """Get a dictionary of the job information.

    The return value is a dictionary with job information such as the id,
    owner, name and parent job id.  The value is stored in the metadata
    database.

    @param job: A Job object.
    @return: A dictionary containing the job id, owner and name.
    """
    if not job:
        logging.error('Job is None, no metadata returned.')
        return {}
    try:
        return {'job_id': job.id,
                'owner': job.owner,
                'job_name': job.name,
                'parent_job_id': job.parent_job_id}
    except AttributeError as e:
        logging.error('Job has missing attribute: %s', e)
        return {}


class DBError(Exception):
    """Raised by the DBObject constructor when its select fails."""


class DBObject(object):
    """A miniature object relational model for the database."""

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id.  This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)

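    # Illustrative sketch of the caching above (assumes an initialized _db
    # and an existing row with this id): constructing the same (class, id)
    # twice returns the very same object rather than a duplicate.
    #
    #     >>> a = Host(id=10)
    #     >>> b = Host(id=10)
    #     >>> a is b
    #     True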

    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        assert bool(id) or bool(row)
        if id is not None and row is not None:
            assert id == row[0]
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warning(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        fields = ', '.join(self._fields)
        sql = 'SELECT %s FROM %s WHERE ID=%%s' % (fields, self.__table)
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))

    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing
        in-memory fields.  Fractional seconds are stripped from datetime
        values before comparison.

        @param row - A sequence of values corresponding to fields named in
                the class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if (isinstance(current_value, datetime.datetime)
                and isinstance(row_value, datetime.datetime)):
                current_value = current_value.strftime(time_utils.TIME_FMT)
                row_value = row_value.strftime(time_utils.TIME_FMT)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences

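    # Example of the returned differences dict (illustrative values): a
    # requery that observed a status change would produce
    #
    #     {'status': ('Running', 'Completed')}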

    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        self._valid_fields.remove('id')


    def update_from_database(self):
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table=None):
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        if self.__new_record:
            keys = self._fields[1:] # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]

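    # Typical use of save() (illustrative): build an object from an
    # in-memory row with new_record=True, then persist it.
    # IneligibleHostQueue in HostQueueEntry.block_host() below follows this
    # pattern:
    #
    #     >>> block = IneligibleHostQueue(row=[0, job_id, host_id],
    #     ...                             new_record=True)
    #     >>> block.save()  # INSERTs the row and fills in block.id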

    def delete(self):
        # Drop this instance from the cache; note the key must use self.id,
        # not the id() builtin.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch_rows(cls, where='', params=(), joins='', order_by=''):
        """
        Fetch the rows based on the given database query.

        @return: The rows fetched by the given query.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        fields = []
        for field in cls._fields:
            fields.append('%s.%s' % (cls._table_name, field))

        query = ('SELECT %(fields)s FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'fields' : ', '.join(fields),
                                             'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        return rows

    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @return: A list of class instances, one for each row fetched.
        """
        rows = cls.fetch_rows(where=where, params=params, joins=joins,
                              order_by=order_by)
        return [cls(id=row[0], row=row) for row in rows]

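    # Example use of fetch() (illustrative; assumes an initialized _db).
    # Values should be passed through params so the driver escapes them,
    # rather than being interpolated into the where string:
    #
    #     >>> ready_hosts = Host.fetch(where='status = %s AND invalid = 0',
    #     ...                          params=('Ready',))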

class IneligibleHostQueue(DBObject):
    _table_name = 'afe_ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')


class AtomicGroup(DBObject):
    _table_name = 'afe_atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')


class Label(DBObject):
    _table_name = 'afe_labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
                self.name, self.id, self.atomic_group_id)


class Host(DBObject):
    _table_name = 'afe_hosts'
    # TODO(ayatane): synch_id is not used, remove after fixing DB.
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty',
               'leased', 'shard_id', 'lock_reason')


    def set_status(self, status):
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)


    def _get_labels_with_platform(self, non_static_rows, static_rows):
        """Helper function to fetch labels & platform for a host."""
        if not RESPECT_STATIC_LABELS:
            return non_static_rows

        combined_rows = []
        replaced_labels = _db.execute(
                'SELECT label_id FROM afe_replaced_labels')
        replaced_label_ids = {l[0] for l in replaced_labels}

        # We respect afe_labels more, which means:
        #   * if a non-static label has been replaced, we look up its
        #   replacement in afe_static_labels by label name.
        #   * if a non-static label has not been replaced, we keep it.
        #   * static labels without a corresponding non-static label are
        #   dropped.
        static_label_names = []
        for label_id, label_name, is_platform in non_static_rows:
            if label_id not in replaced_label_ids:
                combined_rows.append((label_id, label_name, is_platform))
            else:
                static_label_names.append(label_name)

        # Only keep static labels that replace non-static labels.
        for label_id, label_name, is_platform in static_rows:
            if label_name in static_label_names:
                combined_rows.append((label_id, label_name, is_platform))

        return combined_rows


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        template = ('SELECT %(label_table)s.id, %(label_table)s.name, '
                    '%(label_table)s.platform FROM %(label_table)s INNER '
                    'JOIN %(host_label_table)s '
                    'ON %(label_table)s.id = %(host_label_table)s.%(column)s '
                    'WHERE %(host_label_table)s.host_id = %(host_id)s '
                    'ORDER BY %(label_table)s.name')
        static_query = template % {
                'host_label_table': 'afe_static_hosts_labels',
                'label_table': 'afe_static_labels',
                'column': 'staticlabel_id',
                'host_id': self.id
        }
        non_static_query = template % {
                'host_label_table': 'afe_hosts_labels',
                'label_table': 'afe_labels',
                'column': 'label_id',
                'host_id': self.id
        }
        non_static_rows = _db.execute(non_static_query)
        static_rows = _db.execute(static_query)

        rows = self._get_labels_with_platform(non_static_rows, static_rows)
        platform = None
        all_labels = []
        for _, label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This splits a hostname into a leading name and trailing digits,
        ignores leading 0s in the digits, and compares hostnames by the
        name and the trailing digits as a number.  If either hostname does
        not match this pattern, the two are simply compared as lower case
        strings.

        Example of how hostnames will be sorted:

          alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfies most people's hostname sorting needs
        regardless of their exact naming schemes.  Nobody sane should have
        both a host10 and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            # int() already ignores leading zeros; stripping them first
            # would turn an all-zero string like '0' into '', which int()
            # rejects.
            number_a = int(number_a_str)
            number_b = int(number_b_str)
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)

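    # Example (illustrative): sorting a list of Host objects with the
    # comparison function above, matching the ordering in the docstring.
    #
    #     >>> hosts.sort(cmp=Host.cmp_for_sort)
    #     >>> [h.hostname for h in hosts]
    #     ['alice', 'host1', 'host2', 'host09', 'host010', 'host10']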

class HostQueueEntry(DBObject):
    _table_name = 'afe_host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on', 'finished_on')

    _COMPLETION_COUNT_METRIC = metrics.Counter(
        'chromeos/autotest/scheduler/hqe_completion_count')

    def __init__(self, id=None, row=None, job_row=None, **kwargs):
        """
        @param id: ID field from afe_host_queue_entries table.
                   Either id or row should be specified for initialization.
        @param row: The DB row for a particular HostQueueEntry.
                    Either id or row should be specified for initialization.
        @param job_row: The DB row for the job of this HostQueueEntry.
        """
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id, row=job_row)

        if self.host_id:
            self.host = rdb_lib.get_hosts([self.host_id])[0]
            self.host.dbg_str = self.get_dbg_str()
            self.host.metadata = get_job_metadata(self.job)
        else:
            self.host = None


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @return: A list of class instances, one for each row fetched.
        """
        # Override the original fetch method to pre-fetch the jobs from the DB
        # in order to prevent each HQE making separate DB queries.
        rows = cls.fetch_rows(where=where, params=params, joins=joins,
                              order_by=order_by)
        if len(rows) <= 1:
            return [cls(id=row[0], row=row) for row in rows]

        job_params = ', '.join([str(row[1]) for row in rows])
        job_rows = Job.fetch_rows(where='id IN (%s)' % (job_params))
        # Create a job_id to job_row mapping to match each HQE
        # to its corresponding job.
        job_dict = {job_row[0]: job_row for job_row in job_rows}
        return [cls(id=row[0], row=row, job_row=job_dict.get(row[1]))
                for row in rows]


    def _view_job_url(self):
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label).  The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN afe_jobs_dependency_labels AS deps "
                      "ON (afe_labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label

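    # Example (illustrative): get_labels() may yield duplicates, so a set
    # is a natural consumer when only the names matter.
    #
    #     >>> names = {label.name for label in entry.get_labels()}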

    def set_host(self, host):
        if host:
            logging.info('Assigning host %s to entry %s', host.hostname, self)
            self.update_field('host_id', host.id)
            self.block_host(host.id)
        else:
            logging.info('Releasing host from %s', self)
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def block_host(self, host_id):
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        if subdir is None:
            assert self.host
            subdir = self.host.hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        if self.host:
            return self.host.hostname
        return 'no host'


    def get_dbg_str(self):
        """Get a debug string identifying this HQE.

        @return: A string containing the HQE and job id.
        """
        try:
            return 'HQE: %s, for job: %s' % (self.id, self.job_id)
        except AttributeError as e:
            return 'HQE has not been initialized yet: %s' % e


    def __str__(self):
        flags = []
        if self.active:
            flags.append('active')
        if self.complete:
            flags.append('complete')
        if self.deleted:
            flags.append('deleted')
        if self.aborted:
            flags.append('aborted')
        flags_str = ','.join(flags)
        if flags_str:
            flags_str = ' [%s]' % flags_str
        return ("%s and host: %s has status:%s%s" %
                (self.get_dbg_str(), self._get_hostname(), self.status,
                 flags_str))


    def set_status(self, status):
        logging.info("%s -> %s", self, status)

        self.update_field('status', status)

        active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
        complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)

        self.update_field('active', active)

        # The ordering of these operations is important. Once we set the
        # complete bit this job will become indistinguishable from all
        # the other complete jobs, unless we first set shard_id to NULL
        # to signal to the shard_client that we need to upload it. However,
        # we can only set both these after we've updated finished_on etc
        # within _on_complete or the job will get synced in an intermediate
        # state. This means that if someone sigkills the scheduler between
        # setting finished_on and complete, we will have inconsistent jobs.
        # This should be fine, because nothing critical checks finished_on,
        # and the scheduler should never be killed mid-tick.
        if complete:
            self._on_complete(status)
            self._email_on_job_complete()

        self.update_field('complete', complete)

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)
        logging.debug('HQE Set Status Complete')


    def _on_complete(self, status):
        metric_fields = {'status': status.lower()}
        if self.host:
            metric_fields['board'] = self.host.board or ''
            if len(self.host.pools) == 1:
                metric_fields['pool'] = self.host.pools[0]
            else:
                metric_fields['pool'] = 'MULTIPLE'
        else:
            metric_fields['board'] = 'NO_HOST'
            metric_fields['pool'] = 'NO_HOST'
        self._COMPLETION_COUNT_METRIC.increment(fields=metric_fields)
        # Status values are strings, so compare with != rather than identity.
        if status != models.HostQueueEntry.Status.ABORTED:
            self.job.stop_if_necessary()
        if self.started_on:
            self.set_finished_on_now()
            self._log_trace()
        if self.job.shard_id is not None:
            # Clear shard_id to signal the shard client that this job needs
            # to be synced back to the master.
            self.job.update_field('shard_id', None)
        if not self.execution_subdir:
            return
        # unregister any possible pidfiles associated with this queue entry
        for pidfile_name in drone_manager.ALL_PIDFILE_NAMES:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                    self.execution_path(), pidfile_name=pidfile_name)
            _drone_manager.unregister_pidfile(pidfile_id)

    def _log_trace(self):
        """Emits a Cloud Trace span for the HQE's duration."""
        if self.started_on and self.finished_on:
            span = cloud_trace.Span('HQE', spanId='0',
                                    traceId=hqe_trace_id(self.id))
            # TODO(phobbs) make a .SetStart() and .SetEnd() helper method
            span.startTime = types.Timestamp()
            span.startTime.FromDatetime(self.started_on)
            span.endTime = types.Timestamp()
            span.endTime.FromDatetime(self.finished_on)
            # TODO(phobbs) any LogSpan calls need to be wrapped in this for
            # safety during tests, so this should be caught within LogSpan.
            try:
                cloud_trace.LogSpan(span)
            except IOError as e:
                if e.errno == errno.ENOENT:
                    logging.warning('Error writing to cloud trace results '
                                    'directory: %s', e)


    def _get_status_email_contents(self, status, summary=None, hostname=None):
        """
        Gather info for the status notification e-mails.

        If needed, we could start using the Django templating engine to create
        the subject and the e-mail body, but that doesn't seem necessary right
        now.

        @param status: Job status text. Mandatory.
        @param summary: Job summary text. Optional.
        @param hostname: A hostname for the job. Optional.

        @return: Tuple (subject, body) for the notification e-mail.
        """
        job_stats = Job(id=self.job.id).get_execution_details()

        subject = ('Autotest | Job ID: %s "%s" | Status: %s ' %
                   (self.job.id, self.job.name, status))

        if hostname is not None:
            subject += '| Hostname: %s ' % hostname

        if status not in ["1 Failed", "Failed"]:
            subject += '| Success Rate: %.2f %%' % job_stats['success_rate']

        body = "Job ID: %s\n" % self.job.id
        body += "Job name: %s\n" % self.job.name
        if hostname is not None:
            body += "Host: %s\n" % hostname
        if summary is not None:
            body += "Summary: %s\n" % summary
        body += "Status: %s\n" % status
        body += "Results interface URL: %s\n" % self._view_job_url()
        body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time']
        if int(job_stats['total_executed']) > 0:
            body += "User tests executed: %s\n" % job_stats['total_executed']
            body += "User tests passed: %s\n" % job_stats['total_passed']
            body += "User tests failed: %s\n" % job_stats['total_failed']
            body += ("User tests success rate: %.2f %%\n" %
                     job_stats['success_rate'])

        if job_stats['failed_rows']:
            body += "Failures:\n"
            body += job_stats['failed_rows']

        return subject, body


    def _email_on_status(self, status):
        hostname = self._get_hostname()
        subject, body = self._get_status_email_contents(status, None, hostname)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        if not self.job.is_finished():
            return

        summary = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary.append("Host: %s Status: %s" %
                                (queue_entry._get_hostname(),
                                 queue_entry.status))

        summary = "\n".join(summary)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                    in status_counts.iteritems())

        subject, body = self._get_status_email_contents(status, summary, None)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def schedule_pre_job_tasks(self):
        logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.job.id, self.id, self.host.hostname, self.status)

        self._do_schedule_pre_job_tasks()


    def _do_schedule_pre_job_tasks(self):
        self.job.schedule_pre_job_tasks(queue_entry=self)


    def requeue(self):
        assert self.host
        self.set_status(models.HostQueueEntry.Status.QUEUED)
        self.update_field('started_on', None)
        self.update_field('finished_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    @property
    def aborted_by(self):
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT afe_users.login,
                        afe_aborted_host_queue_entries.aborted_on
                FROM afe_aborted_host_queue_entries
                INNER JOIN afe_users
                ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id
                WHERE afe_aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None

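    # Example (illustrative): the aborted_by / aborted_on properties above
    # lazily run the query in _load_abort_info() exactly once per instance.
    #
    #     >>> entry.aborted_by   # login of the aborting user, or None
    #     >>> entry.aborted_on   # datetime of the abort, or None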

    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, sets the entries to STARTING. Otherwise, it leaves
        them in PENDING.
        """
        self.set_status(models.HostQueueEntry.Status.PENDING)
        if not self.host:
            raise scheduler_lib.NoHostIdError(
                    'Failed to recover a job whose host_queue_entry_id=%r due'
                    ' to no host_id.'
                    % self.id)
        self.host.set_status(models.Host.Status.PENDING)

        # Some debug code here: sends an email if an asynchronous job does not
        # immediately enter Starting.
        # TODO: Remove this once we figure out why asynchronous jobs are getting
        # stuck in Pending.
        self.job.run_if_ready(queue_entry=self)
        if (self.job.synch_count == 1 and
                self.status == models.HostQueueEntry.Status.PENDING):
            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
            message = 'Asynchronous job stuck in Pending'
            email_manager.manager.enqueue_notify_email(subject, message)


    def abort(self, dispatcher):
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in {Status.GATHERING, Status.PARSING}:
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in {Status.STARTING, Status.PENDING, Status.RUNNING}:
            # If the HQE is in any of these statuses, it should not have any
            # unfinished agents before it can be aborted.
            agents = dispatcher.get_agents_for_entry(self)
            # An agent with a finished task can be left behind. This is added
            # to handle the special case of aborting a hostless job in
            # STARTING status, in which the agent has only a HostlessQueueTask
            # associated. The finished HostlessQueueTask will be cleaned up in
            # the next tick, so it's safe to leave the agent there. Without
            # filtering out finished agents, HQE abort won't be able to
            # proceed.
            assert all([agent.is_done() for agent in agents])

        if self.host:
            if self.status in {Status.STARTING, Status.PENDING, Status.RUNNING}:
                self.host.set_status(models.Host.Status.READY)
            elif self.status in {Status.VERIFYING, Status.RESETTING}:
                models.SpecialTask.objects.create(
                        task=models.SpecialTask.Task.CLEANUP,
                        host=models.Host.objects.get(id=self.host.id),
                        requested_by=self.job.owner_model())
            elif self.status == Status.PROVISIONING:
                models.SpecialTask.objects.create(
                        task=models.SpecialTask.Task.REPAIR,
                        host=models.Host.objects.get(id=self.host.id),
                        requested_by=self.job.owner_model())

        self.set_status(Status.ABORTED)


    def execution_tag(self):
        SQL_SUSPECT_ENTRIES = ('SELECT * FROM afe_host_queue_entries WHERE '
                               'complete!=1 AND execution_subdir="" AND '
                               'status!="Queued";')
        SQL_FIX_SUSPECT_ENTRY = ('UPDATE afe_host_queue_entries SET '
                                 'status="Aborted" WHERE id=%s;')
        try:
            assert self.execution_subdir
        except AssertionError:
            # TODO(scottz): Remove temporary fix/info gathering pathway for
            # crosbug.com/31595 once issue is root caused.
            logging.error('No execution_subdir for host queue id:%s.', self.id)
            logging.error('====DB DEBUG====\n%s', SQL_SUSPECT_ENTRIES)
            for row in _db.execute(SQL_SUSPECT_ENTRIES):
                logging.error(row)
            logging.error('====DB DEBUG====\n')
            fix_query = SQL_FIX_SUSPECT_ENTRY % self.id
            logging.error('EXECUTING: %s', fix_query)
            _db.execute(fix_query)
            raise AssertionError(('self.execution_subdir not found. '
                                  'See log for details.'))

        return "%s/%s" % (self.job.tag(), self.execution_subdir)


    def execution_path(self):
        return self.execution_tag()


    def set_started_on_now(self):
        self.update_field('started_on', datetime.datetime.now())


    def set_finished_on_now(self):
        self.update_field('finished_on', datetime.datetime.now())


    def is_hostless(self):
        return (self.host_id is None
                and self.meta_host is None)


def hqe_trace_id(hqe_id):
    """Constructs the canonical trace id based on the HQE's id.

    Encodes 'HQE' in base16 and concatenates with the hex representation
    of the HQE's id.

    @param hqe_id: The HostQueueEntry's id.

    Returns:
        A trace id (in hex format)
    """
    return base64.b16encode('HQE') + hex(hqe_id)[2:]

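# For example, base16('HQE') is '485145' and hex(10) is '0xa', so:
#
#     >>> hqe_trace_id(10)
#     '485145a'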

class Job(DBObject):
    _table_name = 'afe_jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',
               'parameterized_job_id', 'max_runtime_mins', 'parent_job_id',
               'test_retry', 'run_reset', 'timeout_mins', 'shard_id',
               'require_ssp')

    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending()
    # on all status='Pending' atomic group HQEs in case a delay was running
    # when the scheduler was restarted and no more hosts ever successfully
    # exit Verify.

    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)
        self._owner_model = None # caches model instance of owner
        self.update_image_path = None # path of OS image to install


    def model(self):
        return models.Job.objects.get(id=self.id)


    def owner_model(self):
        # work around the fact that the Job owner field is a string, not a
        # foreign key
        if not self._owner_model:
            self._owner_model = models.User.objects.get(login=self.owner)
        return self._owner_model


    def tag(self):
        return "%s-%s" % (self.id, self.owner)


    def get_execution_details(self):
        """
        Get test execution details for this job.

        @return: Dictionary with test execution details
        """
        def _find_test_jobs(rows):
            """
            Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*
            Those are autotest 'internal job' tests, so they should not be
            counted when evaluating the test stats.

            @param rows: List of rows (matrix) with database results.
            """
            # Match SERVER_JOB and CLIENT_JOB.<n> explicitly; the original
            # pattern ('SERVER|CLIENT\\_JOB\.[\d]') also matched any test
            # name merely starting with 'SERVER'.
            job_test_pattern = re.compile(r'SERVER_JOB|CLIENT_JOB\.\d+')
            n_test_jobs = 0
            for r in rows:
                test_name = r[0]
                if job_test_pattern.match(test_name):
                    n_test_jobs += 1

            return n_test_jobs

        stats = {}

        rows = _db.execute("""
                SELECT t.test, s.word, t.reason
                FROM tko_tests AS t, tko_jobs AS j, tko_status AS s
                WHERE t.job_idx = j.job_idx
                AND s.status_idx = t.status
                AND j.afe_job_id = %s
                ORDER BY t.reason
                """ % self.id)

        failed_rows = [r for r in rows if not r[1] == 'GOOD']

        n_test_jobs = _find_test_jobs(rows)
        n_test_jobs_failed = _find_test_jobs(failed_rows)

        total_executed = len(rows) - n_test_jobs
        total_failed = len(failed_rows) - n_test_jobs_failed

        if total_executed > 0:
            success_rate = 100 - ((total_failed / float(total_executed)) * 100)
        else:
            success_rate = 0

        stats['total_executed'] = total_executed
        stats['total_failed'] = total_failed
        stats['total_passed'] = total_executed - total_failed
        stats['success_rate'] = success_rate

        status_header = ("Test Name", "Status", "Reason")
        if failed_rows:
            stats['failed_rows'] = utils.matrix_to_string(failed_rows,
                                                          status_header)
        else:
            stats['failed_rows'] = ''

        time_row = _db.execute("""
                   SELECT started_time, finished_time
                   FROM tko_jobs
                   WHERE afe_job_id = %s
                   """ % self.id)

        if time_row:
            t_begin, t_end = time_row[0]
            try:
                delta = t_end - t_begin
                minutes, seconds = divmod(delta.seconds, 60)
                hours, minutes = divmod(minutes, 60)
                stats['execution_time'] = ("%02d:%02d:%02d" %
                                           (hours, minutes, seconds))
            # One of t_end or t_begin is None
            except TypeError:
                stats['execution_time'] = '(could not determine)'
        else:
            stats['execution_time'] = '(none)'

        return stats

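    # Shape of the dict returned by get_execution_details() (illustrative
    # values):
    #
    #     {'total_executed': 10, 'total_passed': 9, 'total_failed': 1,
    #      'success_rate': 90.0, 'failed_rows': '...',
    #      'execution_time': '01:02:03'}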

    def keyval_dict(self):
        return self.model().keyval_dict()


    def _pending_count(self):
        """The number of HostQueueEntries for this job in the Pending state."""
        pending_entries = models.HostQueueEntry.objects.filter(
                job=self.id, status=models.HostQueueEntry.Status.PENDING)
        return pending_entries.count()


    def is_ready(self):
        pending_count = self._pending_count()
        ready = (pending_count >= self.synch_count)

        if not ready:
            logging.info(
                    'Job %s not ready: %s pending, %s required',
                    self, pending_count, self.synch_count)

        return ready


    def num_machines(self, clause=None):
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='afe_host_queue_entries')


    def num_queued(self):
        return self.num_machines('not complete')


    def num_active(self):
        return self.num_machines('active')


    def num_complete(self):
        return self.num_machines('complete')


    def is_finished(self):
        return self.num_complete() == self.num_machines()


    def _not_yet_run_entries(self, include_active=True):
        if include_active:
            statuses = list(models.HostQueueEntry.PRE_JOB_STATUSES)
        else:
            statuses = list(models.HostQueueEntry.IDLE_PRE_JOB_STATUSES)
        return models.HostQueueEntry.objects.filter(job=self.id,
                                                    status__in=statuses)


    def _stop_all_entries(self):
        """Stops the job's inactive pre-job HQEs."""
        entries_to_stop = self._not_yet_run_entries(
            include_active=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()


    def stop_if_necessary(self):
        not_yet_run = self._not_yet_run_entries()
        if not_yet_run.count() < self.synch_count:
            self._stop_all_entries()


    def _next_group_name(self):
        """@returns a directory name to use for the next host group results."""
        group_count_re = re.compile(r'group(\d+)')
        query = models.HostQueueEntry.objects.filter(
            job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        group_matches = (group_count_re.match(subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in group_matches if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return 'group%d' % (next_id,)

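    # Example (illustrative): if the job's entries already use the subdirs
    # 'group0', 'group3' and 'host1', the ids found above are [0, 3] and
    # _next_group_name() returns 'group4'.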

    def get_group_entries(self, queue_entry_from_group):
        """
        @param queue_entry_from_group: A HostQueueEntry instance whose group
                (entries on this job sharing its execution_subdir) should be
                fetched.

        @returns A list of HostQueueEntry objects all executing this job as
                part of the same group as the one supplied (having the same
                execution_subdir).
        """
        execution_subdir = queue_entry_from_group.execution_subdir
        return list(HostQueueEntry.fetch(
            where='job_id=%s AND execution_subdir=%s',
            params=(self.id, execution_subdir)))


    def _should_run_cleanup(self, queue_entry):
        if self.reboot_before == model_attributes.RebootBefore.ALWAYS:
            return True
        elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:
            return queue_entry.host.dirty
        return False


    def _should_run_verify(self, queue_entry):
        do_not_verify = (queue_entry.host.protection ==
                         host_protections.Protection.DO_NOT_VERIFY)
        if do_not_verify:
            return False
        # If RebootBefore is set to NEVER, then we won't run reset because
        # we can't cleanup, so we need to weaken a Reset into a Verify.
        weaker_reset = (self.run_reset and
                self.reboot_before == model_attributes.RebootBefore.NEVER)
        return self.run_verify or weaker_reset


    def _should_run_reset(self, queue_entry):
        can_verify = (queue_entry.host.protection !=
                         host_protections.Protection.DO_NOT_VERIFY)
        can_reboot = self.reboot_before != model_attributes.RebootBefore.NEVER
        return (can_reboot and can_verify
                and (self.run_reset
                     or (self._should_run_cleanup(queue_entry)
                         and self._should_run_verify(queue_entry))))


    def _should_run_provision(self, queue_entry):
        """
        Determine if the queue_entry needs to have a provision task run before
        it to provision queue_entry.host.

        @param queue_entry: The host queue entry in question.
        @returns: True if we should schedule a provision task, False otherwise.

        """
        # If we get to this point, it means that the scheduler has already
        # vetted that all the unprovisionable labels match, so we can just
        # find all labels on the job that aren't on the host to get the list
        # of what we need to provision.  (See the scheduling logic in
        # host_scheduler.py:is_host_eligable_for_job() where we discard all
        # actionable labels when assigning jobs to hosts.)
        job_labels = {x.name for x in queue_entry.get_labels()}
        # Skip provision if `skip_provision` is listed in the job labels.
        if provision.SKIP_PROVISION in job_labels:
            return False
        _, host_labels = queue_entry.host.platform_and_labels()
        # If there are any labels on the job that are not on the host and they
        # are labels that provisioning knows how to change, then that means
        # there is provisioning work to do.  If there's no provisioning work to
        # do, then obviously we have no reason to schedule a provision task!
        diff = job_labels - set(host_labels)
        if any([provision.Provision.acts_on(x) for x in diff]):
            return True
        return False

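    # Sketch of the diff logic above with hypothetical label names: a job
    # with labels {'board:lumpy', 'cros-version:R55-1234.0.0'} on a host
    # that only has 'board:lumpy' leaves a diff of
    # {'cros-version:R55-1234.0.0'}; provisioning acts on cros-version
    # labels, so a provision task gets scheduled.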

    def _queue_special_task(self, queue_entry, task):
        """
        Create a special task and associate it with a host queue entry.

        @param queue_entry: The queue entry this special task should be
                            associated with.
        @param task: One of the members of the enum models.SpecialTask.Task.
        @returns: None

        """
        models.SpecialTask.objects.create(
                host=models.Host.objects.get(id=queue_entry.host_id),
                queue_entry=queue_entry, task=task)


    def schedule_pre_job_tasks(self, queue_entry):
        """
        Queue all of the special tasks that need to be run before a host
        queue entry may run.

        If no special tasks need to be scheduled, then |on_pending| will be
        called directly.

        @returns None

        """
        task_queued = False
        hqe_model = models.HostQueueEntry.objects.get(id=queue_entry.id)

        if self._should_run_provision(queue_entry):
            self._queue_special_task(hqe_model,
                                     models.SpecialTask.Task.PROVISION)
            task_queued = True
        elif self._should_run_reset(queue_entry):
            self._queue_special_task(hqe_model, models.SpecialTask.Task.RESET)
            task_queued = True
        else:
            if self._should_run_cleanup(queue_entry):
                self._queue_special_task(hqe_model,
                                         models.SpecialTask.Task.CLEANUP)
                task_queued = True
            if self._should_run_verify(queue_entry):
                self._queue_special_task(hqe_model,
                                         models.SpecialTask.Task.VERIFY)
                task_queued = True

        if not task_queued:
            queue_entry.on_pending()


    def _assign_new_group(self, queue_entries):
        if len(queue_entries) == 1:
            group_subdir_name = queue_entries[0].host.hostname
        else:
            group_subdir_name = self._next_group_name()
            logging.info('Running synchronous job %d hosts %s as %s',
                self.id, [entry.host.hostname for entry in queue_entries],
                group_subdir_name)

        for queue_entry in queue_entries:
            queue_entry.set_execution_subdir(group_subdir_name)


    def _choose_group_to_run(self, include_queue_entry):
        """
        @returns A list of HostQueueEntry instances to be used to run this
                Job, all assigned a shared group name for the results
                database; an empty list is returned if too few Pending
                entries are available.
        """
        chosen_entries = [include_queue_entry]
        num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        if len(chosen_entries) < self.synch_count:
            message = ('job %s got fewer than %s chosen entries: %s' % (
                    self.id, self.synch_count, chosen_entries))
            logging.error(message)
            email_manager.manager.enqueue_notify_email(
                    'Job not started, too few chosen entries', message)
            return []

        self._assign_new_group(chosen_entries)
        return chosen_entries


    def run_if_ready(self, queue_entry):
        """
        Run this job by kicking its HQEs into status='Starting' if enough
        hosts are ready for it to run.

        Cleans up by kicking HQEs into status='Stopped' if this Job is not
        ready to run.
        """
        if not self.is_ready():
            self.stop_if_necessary()
        else:
            self.run(queue_entry)


    def request_abort(self):
        """Request that this Job be aborted on the next scheduler cycle."""
        self.model().abort()


    def run(self, queue_entry):
        """
        @param queue_entry: The HostQueueEntry instance calling this method.
        """
        queue_entries = self._choose_group_to_run(queue_entry)
        if queue_entries:
            self._finish_run(queue_entries)


    def _finish_run(self, queue_entries):
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def __str__(self):
        return '%s-%s' % (self.id, self.owner)