# pylint: disable-msg=C0111

"""Database model classes for the scheduler.

Contains model classes abstracting the various DB tables used by the scheduler.
These overlap the Django models in basic functionality, but were written before
the Django models existed and have not yet been phased out.  Some of them
(particularly HostQueueEntry and Job) have considerable scheduler-specific logic
which would probably be ill-suited for inclusion in the general Django model
classes.

Globals:
_notify_email_statuses: list of HQE statuses.  Each time a single HQE reaches
        one of these statuses, an email is sent to the job's email_list.
        Comes from global_config.
_base_url: URL to the local AFE server, used to construct URLs for emails.
_db: DatabaseConnection for this module.
_drone_manager: reference to the global DroneManager instance.
"""

import datetime, itertools, logging, os, re, sys, time, weakref
from autotest_lib.client.common_lib import control_data
from autotest_lib.client.common_lib import global_config, host_protections
from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.cros.graphite import autotest_es
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.frontend.afe import models, model_attributes
from autotest_lib.scheduler import drone_manager, email_manager
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import scheduler_config
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.server.cros import provision


_notify_email_statuses = []
_base_url = None

_db = None
_drone_manager = None

def initialize():
    global _db
    _db = scheduler_lib.ConnectionManager().get_connection()

    notify_statuses_list = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, "notify_email_statuses",
            default='')
    global _notify_email_statuses
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]
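    # As an illustration (the value below is hypothetical, not a default): a
    # config entry of "notify_email_statuses: Failed, Aborted" parses to
    # ['failed', 'aborted']; any mix of whitespace, commas, semicolons and
    # colons may separate the statuses.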

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'base_url', default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = global_config.global_config.get_config_value(
                'SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    initialize_globals()


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def get_job_metadata(job):
    """Get a dictionary of the job information.

    The return value is a dictionary that includes job information like id,
    name and parent job information. The value will be stored in the metadata
    database.

    @param job: A Job object.
    @return: A dictionary containing the job id, owner and name.
    """
    if not job:
        logging.error('Job is None, no metadata returned.')
        return {}
    try:
        return {'job_id': job.id,
                'owner': job.owner,
                'job_name': job.name,
                'parent_job_id': job.parent_job_id}
    except AttributeError as e:
        logging.error('Job has missing attribute: %s', e)
        return {}


class DelayedCallTask(object):
    """
    A task object like AgentTask for an Agent to run that waits for the
    specified amount of time to have elapsed before calling the supplied
    callback once and finishing.  If the callback returns anything, it is
    assumed to be a new Agent instance and will be added to the dispatcher.

    @attribute end_time: The absolute posix time after which this task will
            call its callback when it is polled and be finished.

    Also has all attributes required by the Agent class.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: The delay in seconds from now that this task
                will call the supplied callback and be done.
        @param callback: A callable to be called by this task once after at
                least delay_seconds time has elapsed.  It must return None
                or a new Agent instance.
        @param now_func: A time.time like function.  Default: time.time.
                Used for testing.
        """
        assert delay_seconds > 0
        assert callable(callback)
        if not now_func:
            now_func = time.time
        self._now_func = now_func
        self._callback = callback

        self.end_time = self._now_func() + delay_seconds

        # These attributes are required by Agent.
        self.aborted = False
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        self.num_processes = 0


    def poll(self):
        if not self.is_done() and self._now_func() >= self.end_time:
            self._callback()
            self.success = True


    def is_done(self):
        return self.success or self.aborted


    def abort(self):
        self.aborted = True

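# A usage sketch (illustrative only; real callers are the scheduler's Agent/
# dispatcher machinery, and _wakeup is a hypothetical callback): the task is
# polled each tick and fires its callback exactly once after the delay.
#
#     def _wakeup():
#         logging.info('delay expired')  # may return a new Agent or None
#
#     task = DelayedCallTask(delay_seconds=30, callback=_wakeup)
#     while not task.is_done():
#         task.poll()  # the dispatcher calls this once per tick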

class DBError(Exception):
    """Raised by the DBObject constructor when its select fails."""


class DBObject(object):
    """A miniature object relational model for the database."""

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id.  This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)

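    # Illustration of the cache (the id 42 is hypothetical): constructing the
    # same row twice yields the same object, so updates made through one
    # reference are visible through the other.
    #
    #     job_a = Job(42)
    #     job_b = Job(42)
    #     assert job_a is job_b  # served from _instances_by_type_and_id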

    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        assert bool(id) or bool(row)
        if id is not None and row is not None:
            assert id == row[0]
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warning(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing
        in-memory fields.  Fractional seconds are stripped from datetime
        values before comparison.

        @param row - A sequence of values corresponding to fields named in
                the class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if (isinstance(current_value, datetime.datetime)
                and isinstance(row_value, datetime.datetime)):
                current_value = current_value.strftime(time_utils.TIME_FMT)
                row_value = row_value.strftime(time_utils.TIME_FMT)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        self._valid_fields.remove('id')


    def update_from_database(self):
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table=None):
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        if self.__new_record:
            keys = self._fields[1:] # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        # Evict ourselves from the instance cache; the previous code passed
        # the id() builtin here instead of self.id, so the cache entry was
        # never actually removed.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @returns A list of instances of cls, one for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        return [cls(id=row[0], row=row) for row in rows]

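# A typical call looks like the following sketch (the WHERE clause and the
# job_id variable are hypothetical):
#
#     entries = HostQueueEntry.fetch(where='job_id = %s', params=(job_id,))
#     for entry in entries:
#         logging.info('%s', entry)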

class IneligibleHostQueue(DBObject):
    _table_name = 'afe_ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')


class AtomicGroup(DBObject):
    _table_name = 'afe_atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')


class Label(DBObject):
    _table_name = 'afe_labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
                self.name, self.id, self.atomic_group_id)


class Host(DBObject):
    _table_name = 'afe_hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty',
               'leased', 'shard_id', 'lock_reason')
    _timer = autotest_stats.Timer("scheduler_models.Host")


    @_timer.decorate
    def set_status(self, status):
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)
        # Noticed some time jumps after the last log message.
        logging.debug('Host Set Status Complete')


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT afe_labels.name, afe_labels.platform
                FROM afe_labels
                INNER JOIN afe_hosts_labels ON
                        afe_labels.id = afe_hosts_labels.label_id
                WHERE afe_hosts_labels.host_id = %s
                ORDER BY afe_labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number.  If both hostnames do not match this pattern, they are simply
        compared as lower case strings.

        Example of how hostnames will be sorted:

          alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfies most people's hostname sorting needs
        regardless of their exact naming schemes.  Nobody sane should have
        both a host10 and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            # int() already ignores leading zeros; unlike lstrip('0'), it
            # also handles an all-zero string such as the "0" in "host0".
            number_a = int(number_a_str)
            number_b = int(number_b_str)
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)

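    # Illustrative use with Python 2's cmp-style sort (hosts is a hypothetical
    # list of Host objects):
    #
    #     hosts.sort(cmp=Host.cmp_for_sort)
    #     # e.g. alice, host1, host2, host09, host010, host10, host11, yolkfolk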

class HostQueueEntry(DBObject):
    _table_name = 'afe_host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on', 'finished_on')
    _timer = autotest_stats.Timer('scheduler_models.HostQueueEntry')


    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = rdb_lib.get_hosts([self.host_id])[0]
            self.host.dbg_str = self.get_dbg_str()
            self.host.metadata = get_job_metadata(self.job)
        else:
            self.host = None

        if self.atomic_group_id:
            self.atomic_group = AtomicGroup(self.atomic_group_id,
                                            always_query=False)
        else:
            self.atomic_group = None


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label).  The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN afe_jobs_dependency_labels AS deps "
                      "ON (afe_labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


    def set_host(self, host):
        if host:
            logging.info('Assigning host %s to entry %s', host.hostname, self)
            self.update_field('host_id', host.id)
            self.block_host(host.id)
        else:
            logging.info('Releasing host from %s', self)
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def block_host(self, host_id):
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        if subdir is None:
            assert self.host
            subdir = self.host.hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        if self.host:
            return self.host.hostname
        return 'no host'


    def get_dbg_str(self):
        """Get a debug string to identify this host.

        @return: A string containing the hqe and job id.
        """
        try:
            return 'HQE: %s, for job: %s' % (self.id, self.job_id)
        except AttributeError as e:
            return 'HQE has not been initialized yet: %s' % e


    def __str__(self):
        flags = []
        if self.active:
            flags.append('active')
        if self.complete:
            flags.append('complete')
        if self.deleted:
            flags.append('deleted')
        if self.aborted:
            flags.append('aborted')
        flags_str = ','.join(flags)
        if flags_str:
            flags_str = ' [%s]' % flags_str
        return ("%s and host: %s has status:%s%s" %
                (self.get_dbg_str(), self._get_hostname(), self.status,
                 flags_str))


    def record_state(self, type_str, state, value):
        """Record metadata in elasticsearch.

        If ES is configured to use http, then we will time that http request.
        Otherwise, it uses UDP, so we will not need to time it.

        @param type_str: sets the _type field in the elasticsearch db.
        @param state: string representing what state we are recording,
                      e.g. 'status'
        @param value: value of the state, e.g. 'verifying'
        """
        metadata = {
            'time_changed': time.time(),
            state: value,
            'job_id': self.job_id,
        }
        if self.host:
            metadata['hostname'] = self.host.hostname
        autotest_es.post(type_str=type_str, metadata=metadata)


    @_timer.decorate
    def set_status(self, status):
        logging.info("%s -> %s", self, status)

        self.update_field('status', status)
        # Noticed some time jumps after last logging message.
        logging.debug('Update Field Complete')

        active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
        complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)
        assert not (active and complete)

        self.update_field('active', active)

        # The ordering of these operations is important. Once we set the
        # complete bit this job will become indistinguishable from all
        # the other complete jobs, unless we first set shard_id to NULL
        # to signal to the shard_client that we need to upload it. However,
        # we can only set both these after we've updated finished_on etc
        # within _on_complete or the job will get synced in an intermediate
        # state. This means that if someone sigkills the scheduler between
        # setting finished_on and complete, we will have inconsistent jobs.
        # This should be fine, because nothing critical checks finished_on,
        # and the scheduler should never be killed mid-tick.
        if complete:
            self._on_complete(status)
            if self.job.shard_id is not None:
                # If shard_id is None, the job will be synced back to the master
                self.job.update_field('shard_id', None)
            self._email_on_job_complete()

        self.update_field('complete', complete)

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)
        logging.debug('HQE Set Status Complete')
        self.record_state('hqe_status', 'status', status)


    def _on_complete(self, status):
        if status is not models.HostQueueEntry.Status.ABORTED:
            self.job.stop_if_necessary()

        if self.started_on:
            self.set_finished_on_now()
        if not self.execution_subdir:
            return
        # unregister any possible pidfiles associated with this queue entry
        for pidfile_name in drone_manager.ALL_PIDFILE_NAMES:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                    self.execution_path(), pidfile_name=pidfile_name)
            _drone_manager.unregister_pidfile(pidfile_id)


    def _get_status_email_contents(self, status, summary=None, hostname=None):
        """
        Gather info for the status notification e-mails.

        If needed, we could start using the Django templating engine to create
        the subject and the e-mail body, but that doesn't seem necessary right
        now.

        @param status: Job status text. Mandatory.
        @param summary: Job summary text. Optional.
        @param hostname: A hostname for the job. Optional.

        @return: Tuple (subject, body) for the notification e-mail.
        """
        job_stats = Job(id=self.job.id).get_execution_details()

        subject = ('Autotest | Job ID: %s "%s" | Status: %s ' %
                   (self.job.id, self.job.name, status))

        if hostname is not None:
            subject += '| Hostname: %s ' % hostname

        if status not in ["1 Failed", "Failed"]:
            subject += '| Success Rate: %.2f %%' % job_stats['success_rate']

        body = "Job ID: %s\n" % self.job.id
        body += "Job name: %s\n" % self.job.name
        if hostname is not None:
            body += "Host: %s\n" % hostname
        if summary is not None:
            body += "Summary: %s\n" % summary
        body += "Status: %s\n" % status
        body += "Results interface URL: %s\n" % self._view_job_url()
        body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time']
        if int(job_stats['total_executed']) > 0:
            body += "User tests executed: %s\n" % job_stats['total_executed']
            body += "User tests passed: %s\n" % job_stats['total_passed']
            body += "User tests failed: %s\n" % job_stats['total_failed']
            body += ("User tests success rate: %.2f %%\n" %
                     job_stats['success_rate'])

        if job_stats['failed_rows']:
            body += "Failures:\n"
            body += job_stats['failed_rows']

        return subject, body


    def _email_on_status(self, status):
        hostname = self._get_hostname()
        subject, body = self._get_status_email_contents(status, None, hostname)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        if not self.job.is_finished():
            return

        summary = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary.append("Host: %s Status: %s" %
                                (queue_entry._get_hostname(),
                                 queue_entry.status))

        summary = "\n".join(summary)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject, body = self._get_status_email_contents(status, summary, None)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def schedule_pre_job_tasks(self):
        logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.job.id, self.id, self.host.hostname, self.status)

        self._do_schedule_pre_job_tasks()


    def _do_schedule_pre_job_tasks(self):
        self.job.schedule_pre_job_tasks(queue_entry=self)


    def requeue(self):
        assert self.host
        self.set_status(models.HostQueueEntry.Status.QUEUED)
        self.update_field('started_on', None)
        self.update_field('finished_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    @property
    def aborted_by(self):
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT afe_users.login,
                        afe_aborted_host_queue_entries.aborted_on
                FROM afe_aborted_host_queue_entries
                INNER JOIN afe_users
                ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id
                WHERE afe_aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, sets the entries to STARTING. Otherwise, it leaves
        them in PENDING.
        """
        self.set_status(models.HostQueueEntry.Status.PENDING)
        self.host.set_status(models.Host.Status.PENDING)

        # Some debug code here: sends an email if an asynchronous job does not
        # immediately enter Starting.
        # TODO: Remove this once we figure out why asynchronous jobs are
        # getting stuck in Pending.
        self.job.run_if_ready(queue_entry=self)
        if (self.job.synch_count == 1 and
                self.status == models.HostQueueEntry.Status.PENDING):
            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
            message = 'Asynchronous job stuck in Pending'
            email_manager.manager.enqueue_notify_email(subject, message)


    def abort(self, dispatcher):
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in (Status.GATHERING, Status.PARSING, Status.ARCHIVING):
            # Do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host.
            return

        if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING,
                           Status.WAITING):
            # If the hqe is in any of these statuses, it should not have any
            # unfinished agents before it can be aborted.
            agents = dispatcher.get_agents_for_entry(self)
            # An agent with a finished task can be left behind. This is added
            # to handle the special case of aborting a hostless job in
            # STARTING status, in which the agent has only a HostlessQueueTask
            # associated. The finished HostlessQueueTask will be cleaned up in
            # the next tick, so it's safe to leave the agent there. Without
            # filtering out finished agents, HQE abort won't be able to
            # proceed.
            assert all([agent.is_done() for agent in agents])
            # If the hqe is still in STARTING status, it may not have been
            # assigned a host yet.
            if self.host:
                self.host.set_status(models.Host.Status.READY)
        elif (self.status == Status.VERIFYING or
              self.status == Status.RESETTING):
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())
        elif self.status == Status.PROVISIONING:
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.REPAIR,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())

        self.set_status(Status.ABORTED)
        self.job.abort_delay_ready_task()


    def get_group_name(self):
        atomic_group = self.atomic_group
        if not atomic_group:
            return ''

        # Look at any meta_host and dependency labels and pick the first
        # one that also specifies this atomic group.  Use that label name
        # as the group name if possible (it is more specific).
        for label in self.get_labels():
            if label.atomic_group_id:
                assert label.atomic_group_id == atomic_group.id
                return label.name
        return atomic_group.name


    def execution_tag(self):
        SQL_SUSPECT_ENTRIES = ('SELECT * FROM afe_host_queue_entries WHERE '
                               'complete!=1 AND execution_subdir="" AND '
                               'status!="Queued";')
        SQL_FIX_SUSPECT_ENTRY = ('UPDATE afe_host_queue_entries SET '
                                 'status="Aborted" WHERE id=%s;')
        try:
            assert self.execution_subdir
        except AssertionError:
            # TODO(scottz): Remove temporary fix/info gathering pathway for
            # crosbug.com/31595 once issue is root caused.
            logging.error('No execution_subdir for host queue id:%s.', self.id)
            logging.error('====DB DEBUG====\n%s', SQL_SUSPECT_ENTRIES)
            for row in _db.execute(SQL_SUSPECT_ENTRIES):
                logging.error(row)
            logging.error('====DB DEBUG====\n')
            fix_query = SQL_FIX_SUSPECT_ENTRY % self.id
            logging.error('EXECUTING: %s', fix_query)
            _db.execute(SQL_FIX_SUSPECT_ENTRY % self.id)
            raise AssertionError(('self.execution_subdir not found. '
                                  'See log for details.'))

        return "%s/%s" % (self.job.tag(), self.execution_subdir)


    def execution_path(self):
        return self.execution_tag()


    def set_started_on_now(self):
        self.update_field('started_on', datetime.datetime.now())


    def set_finished_on_now(self):
        self.update_field('finished_on', datetime.datetime.now())


    def is_hostless(self):
        return (self.host_id is None
                and self.meta_host is None
                and self.atomic_group_id is None)


class Job(DBObject):
    _table_name = 'afe_jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',
               'parameterized_job_id', 'max_runtime_mins', 'parent_job_id',
               'test_retry', 'run_reset', 'timeout_mins', 'shard_id',
               'require_ssp')
    _timer = autotest_stats.Timer("scheduler_models.Job")

    # This does not need to be a column in the DB.  The delays are likely to
    # be configured short.  If the scheduler is stopped and restarted in
    # the middle of a job's delay cycle, the delay cycle will either be
    # repeated or skipped depending on the number of Pending machines found
    # when the restarted scheduler recovers to track it.  Not a problem.
    #
    # A reference to the DelayedCallTask that will wake up the job should
    # no other HQEs change state in time.  Its end_time attribute is used
    # by our run_with_ready_delay() method to determine if the wait is over.
    _delay_ready_task = None

    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending()
    # on all status='Pending' atomic group HQEs in case a delay was running
    # when the scheduler was restarted and no more hosts ever successfully
    # exit Verify.

    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)
        self._owner_model = None # caches model instance of owner
        self.update_image_path = None # path of OS image to install


    def model(self):
        return models.Job.objects.get(id=self.id)


    def owner_model(self):
        # work around the fact that the Job owner field is a string, not a
        # foreign key
        if not self._owner_model:
            self._owner_model = models.User.objects.get(login=self.owner)
        return self._owner_model


    def is_server_job(self):
        return self.control_type == control_data.CONTROL_TYPE.SERVER


    def tag(self):
        return "%s-%s" % (self.id, self.owner)


    def get_host_queue_entries(self):
        rows = _db.execute("""
                SELECT * FROM afe_host_queue_entries
                WHERE job_id= %s
        """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        assert len(entries) > 0

        return entries


    def is_image_update_job(self):
        """
        Discover if the current job requires an OS update.

        @return: True/False if OS should be updated before job is run.
        """
        # All image update jobs have the parameterized_job_id set.
        if not self.parameterized_job_id:
            return False

        # Retrieve the ID of the ParameterizedJob this job is an instance of.
        rows = _db.execute("""
                SELECT test_id
                FROM afe_parameterized_jobs
                WHERE id = %s
                """, (self.parameterized_job_id,))
        if not rows:
            return False
        test_id = rows[0][0]

        # Retrieve the ID of the known autoupdate_ParameterizedJob.
        rows = _db.execute("""
                SELECT id
                FROM afe_autotests
                WHERE name = 'autoupdate_ParameterizedJob'
                """)
        if not rows:
            return False
        update_id = rows[0][0]

        # If the IDs are the same we've found an image update job.
        if test_id == update_id:
            # Finally, get the path to the OS image to install.
            rows = _db.execute("""
                    SELECT parameter_value
                    FROM afe_parameterized_job_parameters
                    WHERE parameterized_job_id = %s
                    """, (self.parameterized_job_id,))
            if rows:
                # Save the path in update_image_path to use later as a command
                # line parameter to autoserv.
                self.update_image_path = rows[0][0]
                return True

        return False


    def get_execution_details(self):
        """
        Get test execution details for this job.

        @return: Dictionary with test execution details
        """
        def _find_test_jobs(rows):
            """
            Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*
            Those are autotest 'internal job' tests, so they should not be
            counted when evaluating the test stats.

            @param rows: List of rows (matrix) with database results.
            """
            job_test_pattern = re.compile(r'SERVER|CLIENT_JOB\.\d')
            n_test_jobs = 0
            for r in rows:
                test_name = r[0]
                if job_test_pattern.match(test_name):
                    n_test_jobs += 1

            return n_test_jobs

        stats = {}

        rows = _db.execute("""
                SELECT t.test, s.word, t.reason
                FROM tko_tests AS t, tko_jobs AS j, tko_status AS s
                WHERE t.job_idx = j.job_idx
                AND s.status_idx = t.status
                AND j.afe_job_id = %s
                ORDER BY t.reason
                """ % self.id)

        failed_rows = [r for r in rows if not r[1] == 'GOOD']

        n_test_jobs = _find_test_jobs(rows)
        n_test_jobs_failed = _find_test_jobs(failed_rows)

        total_executed = len(rows) - n_test_jobs
        total_failed = len(failed_rows) - n_test_jobs_failed

        if total_executed > 0:
            success_rate = 100 - ((total_failed / float(total_executed)) * 100)
        else:
            success_rate = 0

        stats['total_executed'] = total_executed
        stats['total_failed'] = total_failed
        stats['total_passed'] = total_executed - total_failed
        stats['success_rate'] = success_rate

        status_header = ("Test Name", "Status", "Reason")
        if failed_rows:
            stats['failed_rows'] = utils.matrix_to_string(failed_rows,
                                                          status_header)
        else:
            stats['failed_rows'] = ''

        time_row = _db.execute("""
                   SELECT started_time, finished_time
                   FROM tko_jobs
                   WHERE afe_job_id = %s
                   """ % self.id)

        if time_row:
            t_begin, t_end = time_row[0]
            try:
                delta = t_end - t_begin
                minutes, seconds = divmod(delta.seconds, 60)
                hours, minutes = divmod(minutes, 60)
                stats['execution_time'] = ("%02d:%02d:%02d" %
                                           (hours, minutes, seconds))
            # One of t_end or t_begin is None.
            except TypeError:
                stats['execution_time'] = '(could not determine)'
        else:
            stats['execution_time'] = '(none)'

        return stats

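    # Worked example with hypothetical numbers: if the query returns 12 rows
    # of which 2 are internal SERVER_JOB/CLIENT_JOB.* entries, and 4 rows are
    # not GOOD with 1 of those internal, then total_executed = 12 - 2 = 10,
    # total_failed = 4 - 1 = 3, total_passed = 7 and
    # success_rate = 100 - (3 / 10.0) * 100 = 70.0.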

    @_timer.decorate
    def set_status(self, status, update_queues=False):
        self.update_field('status', status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def keyval_dict(self):
        return self.model().keyval_dict()


    def _atomic_and_has_started(self):
        """
        @returns True if any of the HostQueueEntries associated with this job
        have entered the Status.STARTING state or beyond.
        """
        atomic_entries = models.HostQueueEntry.objects.filter(
                job=self.id, atomic_group__isnull=False)
        if atomic_entries.count() <= 0:
            return False

        # These states may *only* be reached if Job.run() has been called.
        started_statuses = (models.HostQueueEntry.Status.STARTING,
                            models.HostQueueEntry.Status.RUNNING,
                            models.HostQueueEntry.Status.COMPLETED)

        started_entries = atomic_entries.filter(status__in=started_statuses)
        return started_entries.count() > 0


    def _hosts_assigned_count(self):
        """The number of HostQueueEntries assigned a Host for this job."""
        entries = models.HostQueueEntry.objects.filter(job=self.id,
                                                       host__isnull=False)
        return entries.count()


    def _pending_count(self):
        """The number of HostQueueEntries for this job in the Pending state."""
        pending_entries = models.HostQueueEntry.objects.filter(
                job=self.id, status=models.HostQueueEntry.Status.PENDING)
        return pending_entries.count()


    def _max_hosts_needed_to_run(self, atomic_group):
        """
        @param atomic_group: The AtomicGroup associated with this job that we
                are using to set an upper bound on the threshold.
        @returns The maximum number of HostQueueEntries assigned a Host before
                this job can run.
        """
        return min(self._hosts_assigned_count(),
                   atomic_group.max_number_of_machines)


    def _min_hosts_needed_to_run(self):
        """Return the minimum number of hosts needed to run this job."""
        return self.synch_count


    def is_ready(self):
        # NOTE: Atomic group jobs stop reporting ready after they have been
        # started to avoid launching multiple copies of one atomic job.
        # Only possible if synch_count is less than half the number of
        # machines in the atomic group.
        pending_count = self._pending_count()
        atomic_and_has_started = self._atomic_and_has_started()
        ready = (pending_count >= self.synch_count
                 and not atomic_and_has_started)

        if not ready:
            logging.info(
                    'Job %s not ready: %s pending, %s required '
                    '(Atomic and started: %s)',
                    self, pending_count, self.synch_count,
                    atomic_and_has_started)

        return ready


    def num_machines(self, clause=None):
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='afe_host_queue_entries')


    def num_queued(self):
        return self.num_machines('not complete')


    def num_active(self):
        return self.num_machines('active')


    def num_complete(self):
        return self.num_machines('complete')


    def is_finished(self):
        return self.num_complete() == self.num_machines()


    def _not_yet_run_entries(self, include_verifying=True):
        statuses = [models.HostQueueEntry.Status.QUEUED,
                    models.HostQueueEntry.Status.PENDING]
        if include_verifying:
            statuses.append(models.HostQueueEntry.Status.VERIFYING)
        return models.HostQueueEntry.objects.filter(job=self.id,
                                                    status__in=statuses)


    def _stop_all_entries(self):
        entries_to_stop = self._not_yet_run_entries(
            include_verifying=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()


    def stop_if_necessary(self):
        not_yet_run = self._not_yet_run_entries()
        if not_yet_run.count() < self.synch_count:
            self._stop_all_entries()

   1252 
   1253     def _next_group_name(self, group_name=''):
   1254         """@returns a directory name to use for the next host group results."""
   1255         if group_name:
   1256             # Sanitize for use as a pathname.
   1257             group_name = group_name.replace(os.path.sep, '_')
   1258             if group_name.startswith('.'):
   1259                 group_name = '_' + group_name[1:]
   1260             # Add a separator between the group name and 'group%d'.
   1261             group_name += '.'
   1262         group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
   1263         query = models.HostQueueEntry.objects.filter(
   1264             job=self.id).values('execution_subdir').distinct()
   1265         subdirs = (entry['execution_subdir'] for entry in query)
   1266         group_matches = (group_count_re.match(subdir) for subdir in subdirs)
   1267         ids = [int(match.group(1)) for match in group_matches if match]
   1268         if ids:
   1269             next_id = max(ids) + 1
   1270         else:
   1271             next_id = 0
   1272         return '%sgroup%d' % (group_name, next_id)
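
    # Example (hypothetical data): with group_name='rack/0' and existing
    # execution_subdirs ['rack_0.group0', 'rack_0.group1'], the name is
    # sanitized to 'rack_0.' and the next directory returned is
    # 'rack_0.group2'; with no matching subdirs it would be 'rack_0.group0'.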


    def get_group_entries(self, queue_entry_from_group):
        """
        @param queue_entry_from_group: A HostQueueEntry instance whose fellow
                group entries on this job should be found.

        @returns A list of HostQueueEntry objects all executing this job as
                part of the same group as the one supplied (having the same
                execution_subdir).
        """
        execution_subdir = queue_entry_from_group.execution_subdir
        return list(HostQueueEntry.fetch(
            where='job_id=%s AND execution_subdir=%s',
            params=(self.id, execution_subdir)))


    def _should_run_cleanup(self, queue_entry):
        if self.reboot_before == model_attributes.RebootBefore.ALWAYS:
            return True
        elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:
            return queue_entry.host.dirty
        return False


    def _should_run_verify(self, queue_entry):
        do_not_verify = (queue_entry.host.protection ==
                         host_protections.Protection.DO_NOT_VERIFY)
        if do_not_verify:
            return False
        # If RebootBefore is set to NEVER, then we won't run reset because
        # we can't cleanup, so we need to weaken a Reset into a Verify.
        weaker_reset = (self.run_reset and
                self.reboot_before == model_attributes.RebootBefore.NEVER)
        return self.run_verify or weaker_reset


    def _should_run_reset(self, queue_entry):
        can_verify = (queue_entry.host.protection !=
                      host_protections.Protection.DO_NOT_VERIFY)
        can_reboot = self.reboot_before != model_attributes.RebootBefore.NEVER
        return (can_reboot and can_verify and (self.run_reset or
                (self._should_run_cleanup(queue_entry) and
                 self._should_run_verify(queue_entry))))
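
    # Putting the three predicates together (a sketch, not exhaustive): a host
    # protected with DO_NOT_VERIFY suppresses both verify and reset.
    # Otherwise, RebootBefore.NEVER blocks reset, so a requested reset is
    # weakened to a verify; conversely, a cleanup (RebootBefore.ALWAYS, or
    # IF_DIRTY on a dirty host) paired with a verify is upgraded to a single
    # reset.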


    def _should_run_provision(self, queue_entry):
        """
        Determine whether a provision task must run before this queue_entry
        to provision queue_entry.host.

        @param queue_entry: The host queue entry in question.
        @returns: True if we should schedule a provision task, False otherwise.

        """
        # If we get to this point, it means that the scheduler has already
        # vetted that all the unprovisionable labels match, so we can just
        # find all labels on the job that aren't on the host to get the list
        # of what we need to provision.  (See the scheduling logic in
        # host_scheduler.py:is_host_eligible_for_job() where we discard all
        # actionable labels when assigning jobs to hosts.)
        job_labels = {x.name for x in queue_entry.get_labels()}
        _, host_labels = queue_entry.host.platform_and_labels()
        # If any label on the job is missing from the host and provisioning
        # knows how to change it, there is provisioning work to do; otherwise
        # there is no reason to schedule a provision task.
        diff = job_labels - set(host_labels)
        return any(provision.Provision.acts_on(label) for label in diff)
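
    # Example (hypothetical labels): a job labelled {'board:lumpy',
    # 'cros-version:lumpy-release/R30-4000.0.0'} running on a host labelled
    # only {'board:lumpy'} leaves the cros-version label in the diff;
    # provisioning can act on cros-version, so a provision task is scheduled.
    # A diff containing only unactionable labels (say, 'pool:suites') would
    # schedule nothing.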


    def _queue_special_task(self, queue_entry, task):
        """
        Create a special task and associate it with a host queue entry.

        @param queue_entry: The queue entry this special task should be
                            associated with.
        @param task: One of the members of the enum models.SpecialTask.Task.
        @returns: None

        """
        models.SpecialTask.objects.create(
                host=models.Host.objects.get(id=queue_entry.host_id),
                queue_entry=queue_entry, task=task)


    def schedule_pre_job_tasks(self, queue_entry):
        """
        Queue all of the special tasks that need to be run before a host
        queue entry may run.

        If no special tasks need to be scheduled, then |on_pending| will be
        called directly.

        @returns None

        """
        task_queued = False
        hqe_model = models.HostQueueEntry.objects.get(id=queue_entry.id)

        if self._should_run_provision(queue_entry):
            self._queue_special_task(hqe_model,
                                     models.SpecialTask.Task.PROVISION)
            task_queued = True
        elif self._should_run_reset(queue_entry):
            self._queue_special_task(hqe_model, models.SpecialTask.Task.RESET)
            task_queued = True
        else:
            if self._should_run_cleanup(queue_entry):
                self._queue_special_task(hqe_model,
                                         models.SpecialTask.Task.CLEANUP)
                task_queued = True
            if self._should_run_verify(queue_entry):
                self._queue_special_task(hqe_model,
                                         models.SpecialTask.Task.VERIFY)
                task_queued = True

        if not task_queued:
            queue_entry.on_pending()
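
    # Precedence sketch: provision wins over reset, which wins over the
    # cleanup/verify pair; only in that last case can two special tasks be
    # queued for one entry.  For instance (hypothetical flags), a job with
    # run_reset=True on a host that needs no provisioning queues a single
    # RESET task instead of calling on_pending() directly.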


    def _assign_new_group(self, queue_entries, group_name=''):
        if len(queue_entries) == 1:
            group_subdir_name = queue_entries[0].host.hostname
        else:
            group_subdir_name = self._next_group_name(group_name)
            logging.info('Running synchronous job %d hosts %s as %s',
                self.id, [entry.host.hostname for entry in queue_entries],
                group_subdir_name)

        for queue_entry in queue_entries:
            queue_entry.set_execution_subdir(group_subdir_name)


    def _choose_group_to_run(self, include_queue_entry):
        """
        @returns A list of HostQueueEntry instances to be used to run this
                Job, or an empty list if the job cannot be started with the
                entries currently available.
        """
        atomic_group = include_queue_entry.atomic_group
        chosen_entries = [include_queue_entry]
        if atomic_group:
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        if len(chosen_entries) < self.synch_count:
            message = ('job %s got fewer than %s chosen entries: %s' % (
                    self.id, self.synch_count, chosen_entries))
            logging.error(message)
            email_manager.manager.enqueue_notify_email(
                    'Job not started, too few chosen entries', message)
            return []

        group_name = include_queue_entry.get_group_name()

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries
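
    # Sketch of the selection (hypothetical state): for a synch_count=3 job
    # whose triggering entry is on host 'host2', with Pending entries on
    # 'host10' and 'host3', the two extra entries are taken in hostname sort
    # order and all three receive one shared execution_subdir (e.g. 'group0')
    # via _assign_new_group().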


    def run_if_ready(self, queue_entry):
        """
        Run this job by kicking its HQEs into status='Starting' if enough
        hosts are ready for it to run.

        Cleans up by kicking HQEs into status='Stopped' if this Job is not
        ready to run.
        """
        if not self.is_ready():
            self.stop_if_necessary()
        elif queue_entry.atomic_group:
            self.run_with_ready_delay(queue_entry)
        else:
            self.run(queue_entry)


    def run_with_ready_delay(self, queue_entry):
        """
        Start a delay to wait for more hosts to enter Pending state before
        launching an atomic group job.  Once set, a delay cannot be reset.

        @param queue_entry: The HostQueueEntry object to get atomic group
                info from and pass to run_if_ready when the delay is up.

        @returns None
        """
        assert queue_entry.job_id == self.id
        assert queue_entry.atomic_group
        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
        over_max_threshold = (self._pending_count() >=
                self._max_hosts_needed_to_run(queue_entry.atomic_group))
        delay_expired = (self._delay_ready_task and
                         time.time() >= self._delay_ready_task.end_time)

        # Delay is disabled or we already have enough?  Do not wait to run.
        if not delay or over_max_threshold or delay_expired:
            self.run(queue_entry)
        else:
            queue_entry.set_status(models.HostQueueEntry.Status.WAITING)
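
    # Decision sketch (hypothetical config): with
    # secs_to_wait_for_atomic_group_hosts=120, the first Pending entry of an
    # atomic group typically parks in Status.WAITING while the delay runs;
    # the job starts immediately only if enough hosts are already Pending,
    # if the configured wait is zero, or once a previously set delay has
    # expired.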


    def request_abort(self):
        """Request that this Job be aborted on the next scheduler cycle."""
        self.model().abort()


    def schedule_delayed_callback_task(self, queue_entry):
        queue_entry.set_status(models.HostQueueEntry.Status.PENDING)

        if self._delay_ready_task:
            return None

        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts

        def run_job_after_delay():
            logging.info('Job %s done waiting for extra hosts.', self)
            # Check to see if the job is still relevant.  It could have
            # aborted while we were waiting or hosts could have disappeared,
            # etc.
            if self._pending_count() < self._min_hosts_needed_to_run():
                logging.info('Job %s had too few Pending hosts after waiting '
                             'for extras.  Not running.', self)
                self.request_abort()
                return
            return self.run(queue_entry)

        logging.info('Job %s waiting up to %s seconds for more hosts.',
                     self.id, delay)
        self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
                                                 callback=run_job_after_delay)
        return self._delay_ready_task
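
    # Usage note (an assumption about the caller, not enforced here): the
    # returned DelayedCallTask is expected to be wrapped in an Agent by the
    # dispatcher; when it fires, run_job_after_delay() either starts the job
    # or requests an abort if hosts vanished while waiting.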


    def run(self, queue_entry):
        """
        @param queue_entry: The HostQueueEntry instance calling this method.
        """
        if queue_entry.atomic_group and self._atomic_and_has_started():
            logging.error('Job.run() called on running atomic Job %d '
                          'with HQE %s.', self.id, queue_entry)
            return
        queue_entries = self._choose_group_to_run(queue_entry)
        if queue_entries:
            self._finish_run(queue_entries)


    def _finish_run(self, queue_entries):
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
        self.abort_delay_ready_task()


    def abort_delay_ready_task(self):
        """Abort the delayed task associated with this job, if any."""
        if self._delay_ready_task:
            # Cancel any pending callback that would try to run again
            # as we are already running.
            self._delay_ready_task.abort()


    def __str__(self):
        return '%s-%s' % (self.id, self.owner)