# pylint: disable=missing-docstring

"""Database model classes for the scheduler.

Contains model classes abstracting the various DB tables used by the scheduler.
These overlap the Django models in basic functionality, but were written before
the Django models existed and have not yet been phased out.  Some of them
(particularly HostQueueEntry and Job) have considerable scheduler-specific logic
which would probably be ill-suited for inclusion in the general Django model
classes.

Globals:
_notify_email_statuses: list of HQE statuses.  Each time a single HQE reaches
        one of these statuses, an email will be sent to the job's email_list.
        Comes from global_config.
_base_url: URL to the local AFE server, used to construct URLs for emails.
_db: DatabaseConnection for this module.
_drone_manager: reference to global DroneManager instance.
"""

import base64
import datetime
import errno
import itertools
import logging
import re
import weakref

import google.protobuf.internal.well_known_types as types

from autotest_lib.client.common_lib import global_config, host_protections
from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib import utils
from autotest_lib.frontend.afe import models, model_attributes
from autotest_lib.scheduler import drone_manager, email_manager
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import scheduler_config
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.server import afe_urls
from autotest_lib.server.cros import provision

try:
    from chromite.lib import metrics
    from chromite.lib import cloud_trace
except ImportError:
    metrics = utils.metrics_mock
    import mock
    cloud_trace = mock.Mock()


_notify_email_statuses = []
_base_url = None

_db = None
_drone_manager = None

RESPECT_STATIC_LABELS = global_config.global_config.get_config_value(
        'SKYLAB', 'respect_static_labels', type=bool, default=False)


def initialize():
    global _db
    _db = scheduler_lib.ConnectionManager().get_connection()

    notify_statuses_list = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, "notify_email_statuses",
            default='')
    global _notify_email_statuses
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'base_url', default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        _base_url = afe_urls.ROOT_URL

    initialize_globals()


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def get_job_metadata(job):
    """Get a dictionary of the job information.

    The return value is a dictionary containing job information such as the
    id, name, owner, and parent job id. The value will be stored in the
    metadata database.

    @param job: A Job object.
    @return: A dictionary containing the job id, owner, name and parent job id.
    """
    if not job:
        logging.error('Job is None, no metadata returned.')
        return {}
    try:
        return {'job_id': job.id,
                'owner': job.owner,
                'job_name': job.name,
                'parent_job_id': job.parent_job_id}
    except AttributeError as e:
        logging.error('Job has missing attribute: %s', e)
        return {}


class DBError(Exception):
    """Raised by the DBObject constructor when its select fails."""


class DBObject(object):
    """A miniature object relational model for the database."""
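
    # Illustrative usage (names hypothetical): a subclass declares its
    # table and fields, after which instances can be constructed from an
    # id (triggering a SELECT) or from an already-fetched row, e.g.:
    #
    #     class Widget(DBObject):
    #         _table_name = 'afe_widgets'
    #         _fields = ('id', 'name')
    #
    #     widget = Widget(id=42)
    #     widgets = Widget.fetch(where='name = %s', params=('gadget',))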

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id.  This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        assert bool(id) or bool(row)
        if id is not None and row is not None:
            assert id == row[0]
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warning(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        fields = ', '.join(self._fields)
        sql = 'SELECT %s FROM %s WHERE ID=%%s' % (fields, self.__table)
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing
        in-memory fields.  Fractional seconds are stripped from datetime values
        before comparison.

        @param row - A sequence of values corresponding to fields named in
                the class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if (isinstance(current_value, datetime.datetime)
                and isinstance(row_value, datetime.datetime)):
                current_value = current_value.strftime(time_utils.TIME_FMT)
                row_value = row_value.strftime(time_utils.TIME_FMT)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        self._valid_fields.remove('id')


    def update_from_database(self):
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table=None):
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        if self.__new_record:
            keys = self._fields[1:]  # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch_rows(cls, where='', params=(), joins='', order_by=''):
        """
        Fetch the rows based on the given database query.

        @returns The rows fetched by the given query.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        fields = []
        for field in cls._fields:
            fields.append('%s.%s' % (cls._table_name, field))

        query = ('SELECT %(fields)s FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'fields' : ', '.join(fields),
                                             'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        return rows

    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @returns A list of class instances, one per row fetched.
        """
        rows = cls.fetch_rows(where=where, params=params, joins=joins,
                              order_by=order_by)
        return [cls(id=row[0], row=row) for row in rows]


class IneligibleHostQueue(DBObject):
    _table_name = 'afe_ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')


class AtomicGroup(DBObject):
    _table_name = 'afe_atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')


class Label(DBObject):
    _table_name = 'afe_labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
                self.name, self.id, self.atomic_group_id)


class Host(DBObject):
    _table_name = 'afe_hosts'
    # TODO(ayatane): synch_id is not used, remove after fixing DB.
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty',
               'leased', 'shard_id', 'lock_reason')


    def set_status(self, status):
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)


    def _get_labels_with_platform(self, non_static_rows, static_rows):
        """Helper function to fetch labels & platform for a host."""
        if not RESPECT_STATIC_LABELS:
            return non_static_rows

        combined_rows = []
        replaced_labels = _db.execute(
                'SELECT label_id FROM afe_replaced_labels')
        replaced_label_ids = {l[0] for l in replaced_labels}

        # We respect afe_labels more, which means:
        #   * If a non-static label has been replaced, we look up its
        #     replacement static label in afe_static_labels by label name.
        #   * If a non-static label has not been replaced, we keep it.
        #   * Static labels which don't replace any non-static label are
        #     dropped.
        static_label_names = []
        for label_id, label_name, is_platform in non_static_rows:
            if label_id not in replaced_label_ids:
                combined_rows.append((label_id, label_name, is_platform))
            else:
                static_label_names.append(label_name)

        # Only keep static labels that replace non-static labels.
        for label_id, label_name, is_platform in static_rows:
            if label_name in static_label_names:
                combined_rows.append((label_id, label_name, is_platform))

        return combined_rows


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
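        For example (values illustrative): ('some_platform',
        ['some_platform', 'pool:suites']).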
        """
        template = ('SELECT %(label_table)s.id, %(label_table)s.name, '
                    '%(label_table)s.platform FROM %(label_table)s INNER '
                    'JOIN %(host_label_table)s '
                    'ON %(label_table)s.id = %(host_label_table)s.%(column)s '
                    'WHERE %(host_label_table)s.host_id = %(host_id)s '
                    'ORDER BY %(label_table)s.name')
        static_query = template % {
                'host_label_table': 'afe_static_hosts_labels',
                'label_table': 'afe_static_labels',
                'column': 'staticlabel_id',
                'host_id': self.id
        }
        non_static_query = template % {
                'host_label_table': 'afe_hosts_labels',
                'label_table': 'afe_labels',
                'column': 'label_id',
                'host_id': self.id
        }
        non_static_rows = _db.execute(non_static_query)
        static_rows = _db.execute(static_query)

        rows = self._get_labels_with_platform(non_static_rows, static_rows)
        platform = None
        all_labels = []
        for _, label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number.  If either hostname does not match this pattern, the two are
        simply compared as lower case strings.

        Example of how hostnames will be sorted:

          alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfies most people's hostname sorting needs
        regardless of their exact naming schemes.  Nobody sane should have
        both a host10 and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            number_a = int(number_a_str)
            number_b = int(number_b_str)
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)


class HostQueueEntry(DBObject):
    _table_name = 'afe_host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on', 'finished_on')

    _COMPLETION_COUNT_METRIC = metrics.Counter(
        'chromeos/autotest/scheduler/hqe_completion_count')

    def __init__(self, id=None, row=None, job_row=None, **kwargs):
        """
        @param id: ID field from afe_host_queue_entries table.
                   Either id or row should be specified for initialization.
        @param row: The DB row for a particular HostQueueEntry.
                    Either id or row should be specified for initialization.
        @param job_row: The DB row for the job of this HostQueueEntry.
        """
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id, row=job_row)

        if self.host_id:
            self.host = rdb_lib.get_hosts([self.host_id])[0]
            self.host.dbg_str = self.get_dbg_str()
            self.host.metadata = get_job_metadata(self.job)
        else:
            self.host = None


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @returns A list of class instances, one per row fetched.
        """
        # Override the original fetch method to pre-fetch the jobs from the DB
        # in order to prevent each HQE making separate DB queries.
        rows = cls.fetch_rows(where=where, params=params, joins=joins,
                              order_by=order_by)
        if len(rows) <= 1:
            return [cls(id=row[0], row=row) for row in rows]

        job_params = ', '.join([str(row[1]) for row in rows])
        job_rows = Job.fetch_rows(where='id IN (%s)' % (job_params))
        # Create a job_id to job_row mapping to match each HQE to its
        # corresponding job.
        job_dict = {job_row[0]: job_row for job_row in job_rows}
        return [cls(id=row[0], row=row, job_row=job_dict.get(row[1]))
                for row in rows]


    def _view_job_url(self):
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label).  The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN afe_jobs_dependency_labels AS deps "
                      "ON (afe_labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


    def set_host(self, host):
        if host:
            logging.info('Assigning host %s to entry %s', host.hostname, self)
            self.update_field('host_id', host.id)
            self.block_host(host.id)
        else:
            logging.info('Releasing host from %s', self)
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def block_host(self, host_id):
        logging.info("creating block %s/%s", self.job.id, host_id)
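        # The leading 0 is a placeholder for the id column; save() skips
        # the id field on INSERT and the database assigns the real id.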
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        if subdir is None:
            assert self.host
            subdir = self.host.hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        if self.host:
            return self.host.hostname
        return 'no host'


    def get_dbg_str(self):
        """Get a debug string to identify this host.

        @return: A string containing the hqe and job id.
        """
        try:
            return 'HQE: %s, for job: %s' % (self.id, self.job_id)
        except AttributeError as e:
            return 'HQE has not been initialized yet: %s' % e


    def __str__(self):
        flags = []
        if self.active:
            flags.append('active')
        if self.complete:
            flags.append('complete')
        if self.deleted:
            flags.append('deleted')
        if self.aborted:
            flags.append('aborted')
        flags_str = ','.join(flags)
        if flags_str:
            flags_str = ' [%s]' % flags_str
        return ("%s and host: %s has status:%s%s" %
                (self.get_dbg_str(), self._get_hostname(), self.status,
                 flags_str))


    def set_status(self, status):
        logging.info("%s -> %s", self, status)

        self.update_field('status', status)

        active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
        complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)

        self.update_field('active', active)

        # The ordering of these operations is important. Once we set the
        # complete bit this job will become indistinguishable from all
        # the other complete jobs, unless we first set shard_id to NULL
        # to signal to the shard_client that we need to upload it. However,
        # we can only set both these after we've updated finished_on etc
        # within _on_complete or the job will get synced in an intermediate
        # state. This means that if someone sigkills the scheduler between
        # setting finished_on and complete, we will have inconsistent jobs.
        # This should be fine, because nothing critical checks finished_on,
        # and the scheduler should never be killed mid-tick.
        if complete:
            self._on_complete(status)
            self._email_on_job_complete()

        self.update_field('complete', complete)

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)
        logging.debug('HQE Set Status Complete')


    def _on_complete(self, status):
        metric_fields = {'status': status.lower()}
        if self.host:
            metric_fields['board'] = self.host.board or ''
            if len(self.host.pools) == 1:
                metric_fields['pool'] = self.host.pools[0]
            else:
                metric_fields['pool'] = 'MULTIPLE'
        else:
            metric_fields['board'] = 'NO_HOST'
            metric_fields['pool'] = 'NO_HOST'
        self._COMPLETION_COUNT_METRIC.increment(fields=metric_fields)
        if status != models.HostQueueEntry.Status.ABORTED:
            self.job.stop_if_necessary()
        if self.started_on:
            self.set_finished_on_now()
            self._log_trace()
        if self.job.shard_id is not None:
            # If shard_id is None, the job will be synced back to the master.
            self.job.update_field('shard_id', None)
        if not self.execution_subdir:
            return
        # Unregister any possible pidfiles associated with this queue entry.
        for pidfile_name in drone_manager.ALL_PIDFILE_NAMES:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                    self.execution_path(), pidfile_name=pidfile_name)
            _drone_manager.unregister_pidfile(pidfile_id)

    def _log_trace(self):
        """Emits a Cloud Trace span for the HQE's duration."""
        if self.started_on and self.finished_on:
            span = cloud_trace.Span('HQE', spanId='0',
                                    traceId=hqe_trace_id(self.id))
            # TODO(phobbs) make a .SetStart() and .SetEnd() helper method
            span.startTime = types.Timestamp()
            span.startTime.FromDatetime(self.started_on)
            span.endTime = types.Timestamp()
            span.endTime.FromDatetime(self.finished_on)
            # TODO(phobbs) any LogSpan calls need to be wrapped in this for
            # safety during tests, so this should be caught within LogSpan.
            try:
                cloud_trace.LogSpan(span)
            except IOError as e:
                if e.errno == errno.ENOENT:
                    logging.warning('Error writing to cloud trace results '
                                    'directory: %s', e)


    def _get_status_email_contents(self, status, summary=None, hostname=None):
        """
        Gather info for the status notification e-mails.

        If needed, we could start using the Django templating engine to create
        the subject and the e-mail body, but that doesn't seem necessary right
        now.

        @param status: Job status text. Mandatory.
        @param summary: Job summary text. Optional.
        @param hostname: A hostname for the job. Optional.

        @return: Tuple (subject, body) for the notification e-mail.
        """
        job_stats = Job(id=self.job.id).get_execution_details()

        subject = ('Autotest | Job ID: %s "%s" | Status: %s ' %
                   (self.job.id, self.job.name, status))

        if hostname is not None:
            subject += '| Hostname: %s ' % hostname

        if status not in ["1 Failed", "Failed"]:
            subject += '| Success Rate: %.2f %%' % job_stats['success_rate']

        body = "Job ID: %s\n" % self.job.id
        body += "Job name: %s\n" % self.job.name
        if hostname is not None:
            body += "Host: %s\n" % hostname
        if summary is not None:
            body += "Summary: %s\n" % summary
        body += "Status: %s\n" % status
        body += "Results interface URL: %s\n" % self._view_job_url()
        body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time']
        if int(job_stats['total_executed']) > 0:
            body += "User tests executed: %s\n" % job_stats['total_executed']
            body += "User tests passed: %s\n" % job_stats['total_passed']
            body += "User tests failed: %s\n" % job_stats['total_failed']
            body += ("User tests success rate: %.2f %%\n" %
                     job_stats['success_rate'])

        if job_stats['failed_rows']:
            body += "Failures:\n"
            body += job_stats['failed_rows']

        return subject, body


    def _email_on_status(self, status):
        hostname = self._get_hostname()
        subject, body = self._get_status_email_contents(status, None, hostname)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        if not self.job.is_finished():
            return

        summary = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary.append("Host: %s Status: %s" %
                                (queue_entry._get_hostname(),
                                 queue_entry.status))

        summary = "\n".join(summary)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
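        # Renders counts as e.g. '2 Completed, 1 Failed' (illustrative).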
        status = ', '.join('%d %s' % (count, status) for status, count
                    in status_counts.iteritems())

        subject, body = self._get_status_email_contents(status, summary, None)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def schedule_pre_job_tasks(self):
        logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.job.id, self.id, self.host.hostname, self.status)

        self._do_schedule_pre_job_tasks()


    def _do_schedule_pre_job_tasks(self):
        self.job.schedule_pre_job_tasks(queue_entry=self)


    def requeue(self):
        assert self.host
        self.set_status(models.HostQueueEntry.Status.QUEUED)
        self.update_field('started_on', None)
        self.update_field('finished_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    @property
    def aborted_by(self):
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """Fetch info about who aborted the job."""
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT afe_users.login,
                        afe_aborted_host_queue_entries.aborted_on
                FROM afe_aborted_host_queue_entries
                INNER JOIN afe_users
                ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id
                WHERE afe_aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, sets the entries to STARTING. Otherwise, it leaves
        them in PENDING.
        """
        self.set_status(models.HostQueueEntry.Status.PENDING)
        if not self.host:
            raise scheduler_lib.NoHostIdError(
                    'Failed to recover a job whose host_queue_entry_id=%r due'
                    ' to no host_id.'
                    % self.id)
        self.host.set_status(models.Host.Status.PENDING)

        # Some debug code here: sends an email if an asynchronous job does not
        # immediately enter Starting.
        # TODO: Remove this once we figure out why asynchronous jobs are getting
        # stuck in Pending.
        self.job.run_if_ready(queue_entry=self)
        if (self.job.synch_count == 1 and
                self.status == models.HostQueueEntry.Status.PENDING):
            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
            message = 'Asynchronous job stuck in Pending'
            email_manager.manager.enqueue_notify_email(subject, message)


    def abort(self, dispatcher):
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in {Status.GATHERING, Status.PARSING}:
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in {Status.STARTING, Status.PENDING, Status.RUNNING}:
            # If the hqe is in any of these statuses, it should not have any
            # unfinished agents before it can be aborted.
            agents = dispatcher.get_agents_for_entry(self)
            # Agents with finished tasks can be left behind. This is added to
            # handle the special case of aborting a hostless job in STARTING
            # status, in which the agent has only a HostlessQueueTask
            # associated. The finished HostlessQueueTask will be cleaned up in
            # the next tick, so it's safe to leave the agent there. Without
            # filtering out finished agents, HQE abort won't be able to
            # proceed.
            assert all([agent.is_done() for agent in agents])
            # If the hqe is still in STARTING status, it may not have been
            # assigned a host yet.
            if self.host:
                self.host.set_status(models.Host.Status.READY)
        elif (self.status == Status.VERIFYING or
              self.status == Status.RESETTING):
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())
        elif self.status == Status.PROVISIONING:
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.REPAIR,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())

        self.set_status(Status.ABORTED)


    def execution_tag(self):
        SQL_SUSPECT_ENTRIES = ('SELECT * FROM afe_host_queue_entries WHERE '
                               'complete!=1 AND execution_subdir="" AND '
                               'status!="Queued";')
        SQL_FIX_SUSPECT_ENTRY = ('UPDATE afe_host_queue_entries SET '
                                 'status="Aborted" WHERE id=%s;')
        try:
            assert self.execution_subdir
        except AssertionError:
            # TODO(scottz): Remove temporary fix/info gathering pathway for
            # crosbug.com/31595 once issue is root caused.
            logging.error('No execution_subdir for host queue id:%s.', self.id)
            logging.error('====DB DEBUG====\n%s', SQL_SUSPECT_ENTRIES)
            for row in _db.execute(SQL_SUSPECT_ENTRIES):
                logging.error(row)
            logging.error('====DB DEBUG====\n')
            fix_query = SQL_FIX_SUSPECT_ENTRY % self.id
            logging.error('EXECUTING: %s', fix_query)
            _db.execute(fix_query)
            raise AssertionError('self.execution_subdir not found. '
                                 'See log for details.')

        return "%s/%s" % (self.job.tag(), self.execution_subdir)


    def execution_path(self):
        return self.execution_tag()


    def set_started_on_now(self):
        self.update_field('started_on', datetime.datetime.now())


    def set_finished_on_now(self):
        self.update_field('finished_on', datetime.datetime.now())


    def is_hostless(self):
        return (self.host_id is None
                and self.meta_host is None)


def hqe_trace_id(hqe_id):
    """Constructs the canonical trace id based on the HQE's id.

    Encodes 'HQE' in base16 and concatenates with the hex representation
    of the HQE's id.

    @param hqe_id: The HostQueueEntry's id.

    Returns:
        A trace id (in hex format)
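
    For example, base64.b16encode('HQE') is '485145', so an HQE id of 10
    yields the trace id '485145a'.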
    """
    return base64.b16encode('HQE') + hex(hqe_id)[2:]


class Job(DBObject):
    _table_name = 'afe_jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',
               'parameterized_job_id', 'max_runtime_mins', 'parent_job_id',
               'test_retry', 'run_reset', 'timeout_mins', 'shard_id',
               'require_ssp')

    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending()
    # on all status='Pending' atomic group HQEs in case a delay was running
    # when the scheduler was restarted and no more hosts ever successfully
    # exit Verify.

    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)
        self._owner_model = None  # caches model instance of owner
        self.update_image_path = None  # path of OS image to install


    def model(self):
        return models.Job.objects.get(id=self.id)


    def owner_model(self):
        # work around the fact that the Job owner field is a string, not a
        # foreign key
        if not self._owner_model:
            self._owner_model = models.User.objects.get(login=self.owner)
        return self._owner_model


    def tag(self):
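        """Returns the job's results-directory tag, e.g. '123-someuser'."""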
        return "%s-%s" % (self.id, self.owner)


    def get_execution_details(self):
        """
        Get test execution details for this job.

        @return: Dictionary with test execution details
        """
        def _find_test_jobs(rows):
            """
            Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*
            Those are autotest 'internal job' tests, so they should not be
            counted when evaluating the test stats.

            @param rows: List of rows (matrix) with database results.
            """
            job_test_pattern = re.compile(r'SERVER|CLIENT_JOB\.\d')
            n_test_jobs = 0
            for r in rows:
                test_name = r[0]
                if job_test_pattern.match(test_name):
                    n_test_jobs += 1

            return n_test_jobs

        stats = {}

        rows = _db.execute("""
                SELECT t.test, s.word, t.reason
                FROM tko_tests AS t, tko_jobs AS j, tko_status AS s
                WHERE t.job_idx = j.job_idx
                AND s.status_idx = t.status
                AND j.afe_job_id = %s
                ORDER BY t.reason
                """ % self.id)

        failed_rows = [r for r in rows if not r[1] == 'GOOD']

        n_test_jobs = _find_test_jobs(rows)
        n_test_jobs_failed = _find_test_jobs(failed_rows)

        total_executed = len(rows) - n_test_jobs
        total_failed = len(failed_rows) - n_test_jobs_failed

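        # Success rate as a percentage of executed tests, e.g. 2 failures
        # out of 8 executed gives 100 - (2 / 8.0) * 100 == 75.0.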
        if total_executed > 0:
            success_rate = 100 - ((total_failed / float(total_executed)) * 100)
        else:
            success_rate = 0

        stats['total_executed'] = total_executed
        stats['total_failed'] = total_failed
        stats['total_passed'] = total_executed - total_failed
        stats['success_rate'] = success_rate

        status_header = ("Test Name", "Status", "Reason")
        if failed_rows:
            stats['failed_rows'] = utils.matrix_to_string(failed_rows,
                                                          status_header)
        else:
            stats['failed_rows'] = ''

        time_row = _db.execute("""
                   SELECT started_time, finished_time
                   FROM tko_jobs
                   WHERE afe_job_id = %s
                   """ % self.id)

        if time_row:
            t_begin, t_end = time_row[0]
            try:
                delta = t_end - t_begin
                minutes, seconds = divmod(delta.seconds, 60)
                hours, minutes = divmod(minutes, 60)
                stats['execution_time'] = ("%02d:%02d:%02d" %
                                           (hours, minutes, seconds))
            # One of t_end or t_begin is None
            except TypeError:
                stats['execution_time'] = '(could not determine)'
        else:
            stats['execution_time'] = '(none)'

        return stats


    def keyval_dict(self):
        return self.model().keyval_dict()


    def _pending_count(self):
        """The number of HostQueueEntries for this job in the Pending state."""
        pending_entries = models.HostQueueEntry.objects.filter(
                job=self.id, status=models.HostQueueEntry.Status.PENDING)
        return pending_entries.count()


    def is_ready(self):
        pending_count = self._pending_count()
        ready = (pending_count >= self.synch_count)

        if not ready:
            logging.info(
                    'Job %s not ready: %s pending, %s required',
                    self, pending_count, self.synch_count)

        return ready


    def num_machines(self, clause=None):
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='afe_host_queue_entries')


    def num_queued(self):
        return self.num_machines('not complete')


    def num_active(self):
        return self.num_machines('active')


    def num_complete(self):
        return self.num_machines('complete')


    def is_finished(self):
        return self.num_complete() == self.num_machines()


    def _not_yet_run_entries(self, include_active=True):
        if include_active:
            statuses = list(models.HostQueueEntry.PRE_JOB_STATUSES)
        else:
            statuses = list(models.HostQueueEntry.IDLE_PRE_JOB_STATUSES)
        return models.HostQueueEntry.objects.filter(job=self.id,
                                                    status__in=statuses)


    def _stop_all_entries(self):
        """Stops the job's inactive pre-job HQEs."""
        entries_to_stop = self._not_yet_run_entries(
            include_active=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()


    def stop_if_necessary(self):
        not_yet_run = self._not_yet_run_entries()
        if not_yet_run.count() < self.synch_count:
            self._stop_all_entries()


    def _next_group_name(self):
        """@returns a directory name to use for the next host group results."""
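        # Scans this job's existing execution_subdirs named like 'group0',
        # 'group1', ... and returns one past the highest index seen
        # (e.g. 'group2' if 'group1' is the highest so far).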
        group_name = ''
        group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
        query = models.HostQueueEntry.objects.filter(
            job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        group_matches = (group_count_re.match(subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in group_matches if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return '%sgroup%d' % (group_name, next_id)


    def get_group_entries(self, queue_entry_from_group):
        """
        @param queue_entry_from_group: A HostQueueEntry instance to find other
                group entries on this job for.

        @returns A list of HostQueueEntry objects all executing this job as
                part of the same group as the one supplied (having the same
                execution_subdir).
        """
        execution_subdir = queue_entry_from_group.execution_subdir
        return list(HostQueueEntry.fetch(
            where='job_id=%s AND execution_subdir=%s',
            params=(self.id, execution_subdir)))


    def _should_run_cleanup(self, queue_entry):
        if self.reboot_before == model_attributes.RebootBefore.ALWAYS:
            return True
        elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:
            return queue_entry.host.dirty
        return False


    def _should_run_verify(self, queue_entry):
        do_not_verify = (queue_entry.host.protection ==
                         host_protections.Protection.DO_NOT_VERIFY)
        if do_not_verify:
            return False
        # If RebootBefore is set to NEVER, then we won't run reset because
        # we can't cleanup, so we need to weaken a Reset into a Verify.
        weaker_reset = (self.run_reset and
                self.reboot_before == model_attributes.RebootBefore.NEVER)
        return self.run_verify or weaker_reset


    def _should_run_reset(self, queue_entry):
        can_verify = (queue_entry.host.protection !=
                         host_protections.Protection.DO_NOT_VERIFY)
        can_reboot = self.reboot_before != model_attributes.RebootBefore.NEVER
        return (can_reboot and can_verify and (self.run_reset or
                (self._should_run_cleanup(queue_entry) and
                 self._should_run_verify(queue_entry))))


    def _should_run_provision(self, queue_entry):
        """
        Determine if the queue_entry needs to have a provision task run before
        it to provision queue_entry.host.

        @param queue_entry: The host queue entry in question.
        @returns: True if we should schedule a provision task, False otherwise.

        """
        # If we get to this point, it means that the scheduler has already
        # vetted that all the unprovisionable labels match, so we can just
        # find all labels on the job that aren't on the host to get the list
        # of what we need to provision.  (See the scheduling logic in
        # host_scheduler.py:is_host_eligable_for_job() where we discard all
        # actionable labels when assigning jobs to hosts.)
        job_labels = {x.name for x in queue_entry.get_labels()}
        # Skip provision if `skip_provision` is listed in the job labels.
        if provision.SKIP_PROVISION in job_labels:
            return False
        _, host_labels = queue_entry.host.platform_and_labels()
        # If there are any labels on the job that are not on the host and they
        # are labels that provisioning knows how to change, then that means
        # there is provisioning work to do.  If there's no provisioning work to
        # do, then obviously we have no reason to schedule a provision task!
        diff = job_labels - set(host_labels)
        if any([provision.Provision.acts_on(x) for x in diff]):
            return True
        return False


    def _queue_special_task(self, queue_entry, task):
        """
        Create a special task and associate it with a host queue entry.

        @param queue_entry: The queue entry this special task should be
                            associated with.
        @param task: One of the members of the enum models.SpecialTask.Task.
        @returns: None

        """
        models.SpecialTask.objects.create(
                host=models.Host.objects.get(id=queue_entry.host_id),
                queue_entry=queue_entry, task=task)


    def schedule_pre_job_tasks(self, queue_entry):
        """
        Queue all of the special tasks that need to be run before a host
        queue entry may run.

        If no special tasks need to be scheduled, then |on_pending| will be
        called directly.

        @returns None

        """
        task_queued = False
        hqe_model = models.HostQueueEntry.objects.get(id=queue_entry.id)

        if self._should_run_provision(queue_entry):
            self._queue_special_task(hqe_model,
                                     models.SpecialTask.Task.PROVISION)
            task_queued = True
        elif self._should_run_reset(queue_entry):
            self._queue_special_task(hqe_model, models.SpecialTask.Task.RESET)
            task_queued = True
        else:
            if self._should_run_cleanup(queue_entry):
                self._queue_special_task(hqe_model,
                                         models.SpecialTask.Task.CLEANUP)
                task_queued = True
            if self._should_run_verify(queue_entry):
                self._queue_special_task(hqe_model,
                                         models.SpecialTask.Task.VERIFY)
                task_queued = True

        if not task_queued:
            queue_entry.on_pending()


    def _assign_new_group(self, queue_entries):
        if len(queue_entries) == 1:
            group_subdir_name = queue_entries[0].host.hostname
        else:
            group_subdir_name = self._next_group_name()
            logging.info('Running synchronous job %d hosts %s as %s',
                self.id, [entry.host.hostname for entry in queue_entries],
                group_subdir_name)

        for queue_entry in queue_entries:
            queue_entry.set_execution_subdir(group_subdir_name)


    def _choose_group_to_run(self, include_queue_entry):
        """
        @returns A list of HostQueueEntry instances to be used to run this
                Job (assigned a shared execution_subdir group name), or an
                empty list if too few pending entries are available.
        """
        chosen_entries = [include_queue_entry]
        num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        if len(chosen_entries) < self.synch_count:
            message = ('job %s got fewer than %s chosen entries: %s' % (
                    self.id, self.synch_count, chosen_entries))
            logging.error(message)
            email_manager.manager.enqueue_notify_email(
                    'Job not started, too few chosen entries', message)
            return []

        self._assign_new_group(chosen_entries)
        return chosen_entries


    def run_if_ready(self, queue_entry):
        """
        Run this job by kicking its HQEs into status='Starting' if enough
        hosts are ready for it to run.

        Cleans up by kicking HQEs into status='Stopped' if this Job is not
        ready to run.
        """
        if not self.is_ready():
            self.stop_if_necessary()
        else:
            self.run(queue_entry)


    def request_abort(self):
        """Request that this Job be aborted on the next scheduler cycle."""
        self.model().abort()


    def run(self, queue_entry):
        """
        @param queue_entry: The HostQueueEntry instance calling this method.
        """
        queue_entries = self._choose_group_to_run(queue_entry)
        if queue_entries:
            self._finish_run(queue_entries)


    def _finish_run(self, queue_entries):
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def __str__(self):
        return '%s-%s' % (self.id, self.owner)
   1389