#!/usr/bin/python
#pylint: disable-msg=C0111

# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

      8 """Host scheduler.
      9 
     10 If run as a standalone service, the host scheduler ensures the following:
     11     1. Hosts will not be assigned to multiple hqes simultaneously. The process
     12        of assignment in this case refers to the modification of the host_id
     13        column of a row in the afe_host_queue_entries table, to reflect the host
     14        id of a leased host matching the dependencies of the job.
     15     2. Hosts that are not being used by active hqes or incomplete special tasks
     16        will be released back to the available hosts pool, for acquisition by
     17        subsequent hqes.
     18 In addition to these guarantees, the host scheduler also confirms that no 2
     19 active hqes/special tasks are assigned the same host, and sets the leased bit
     20 for hosts needed by frontend special tasks. The need for the latter is only
     21 apparent when viewed in the context of the job-scheduler (monitor_db), which
     22 runs special tasks only after their hosts have been leased.
     23 
** Support minimum duts requirement for suites (non-inline mode) **

Each suite can specify the minimum number of duts it requires by
dropping a 'suite_min_duts' job keyval, which defaults to 0.

When suites are competing for duts, if any suite has not yet received the
minimum number of duts it requires, the host scheduler will try to meet that
requirement first, even if other suites have higher priority or earlier
timestamps. Once every suite's minimum duts requirement has been fulfilled,
the host scheduler allocates the remaining duts based on job priority and
suite job id. This prevents low priority suites from starving when sharing
a pool with high priority suites.
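
For example (illustrative numbers, not from a real deployment): suppose
suites A and B each set suite_min_duts=2, A has higher priority, and five
duts are free. A and B are first topped up to two duts each to satisfy
their minimums; the fifth dut then goes to A, since the remaining duts are
allocated by priority.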

Note:
    1. Prevent potential starvation:
       We need to carefully choose |suite_min_duts| for both low and high
       priority suites. If a high priority suite doesn't specify it but a
       low priority one does, the high priority suite can be starved!
    2. Restart requirement:
       Restart the host scheduler if you manually release a host by setting
       leased=0 in the db. This is needed because the host scheduler
       maintains internal state of host assignment for suites.
    3. Exchanging duts triggers provisioning:
       TODO(fdeng): There is a chance two suites can exchange duts. If the
       two suites are for different builds, the exchange will trigger
       provisioning. This can be optimized by preferring to get hosts with
       the same build.
"""

import argparse
import collections
import datetime
import logging
import os
import signal
import sys
import time

import common
from autotest_lib.frontend import setup_django_environment

# utils must be imported before the try block below: the ImportError
# fallback references utils.metrics_mock.
from autotest_lib.client.common_lib import utils

# This import needs to come earlier to avoid using autotest's version of
# httplib2, which is out of date.
try:
    from chromite.lib import metrics
    from chromite.lib import ts_mon_config
except ImportError:
    metrics = utils.metrics_mock
    ts_mon_config = utils.metrics_mock

from autotest_lib.client.common_lib import global_config
from autotest_lib.scheduler import email_manager
from autotest_lib.scheduler import query_managers
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import rdb_utils
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.scheduler import scheduler_models
from autotest_lib.site_utils import job_overhead
from autotest_lib.site_utils import metadata_reporter
from autotest_lib.site_utils import server_manager_utils


_db_manager = None
_shutdown = False
_tick_pause_sec = global_config.global_config.get_config_value(
        'SCHEDULER', 'tick_pause_sec', type=int, default=5)
_monitor_db_host_acquisition = global_config.global_config.get_config_value(
        'SCHEDULER', 'inline_host_acquisition', type=bool, default=True)
_METRICS_PREFIX = 'chromeos/autotest/host_scheduler'

class SuiteRecorder(object):
    """Records the host assignments of suites.

    The recorder holds two things:
        * suite_host_num, which records how many duts a suite is holding;
          it is a map <suite_job_id -> num_of_hosts>
        * hosts_to_suites, which records which host is assigned to which
          suite; it is a map <host_id -> suite_job_id>
    The two data structures are updated whenever a host is assigned to or
    released by a job.

    The reason to maintain hosts_to_suites is that, when a host is released,
    we need to know which suite it was leased to. Querying the db for the
    latest completed job that has run on a host is slow. Therefore, we go
    with an alternative: keeping a <host id, suite job id> map in memory
    (for 10K hosts, the map should take less than 1M of memory on a 64-bit
    machine with Python 2.7).

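    Example state (hypothetical ids): after suite job 42 is assigned hosts
    7 and 9, suite_host_num is {42: 2} and hosts_to_suites is
    {7: 42, 9: 42}; releasing host 9 updates them to {42: 1} and {7: 42}.
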
    """


    def __init__(self, job_query_manager):
        """Initialize.

        @param job_query_manager: An AFEJobQueryManager object.
        """
        self.job_query_manager = job_query_manager
        self.suite_host_num, self.hosts_to_suites = (
                self.job_query_manager.get_suite_host_assignment())


    def record_assignment(self, queue_entry):
        """Record that the hqe has been assigned a host.

        @param queue_entry: A scheduler_models.HostQueueEntry object that has
                            been assigned a host.
        """
        parent_id = queue_entry.job.parent_job_id
        if not parent_id:
            return
        if self.hosts_to_suites.get(queue_entry.host_id, None) == parent_id:
            logging.error('HQE (id: %d, parent_job_id: %d, host: %s) '
                          'seems already recorded', queue_entry.id,
                          parent_id, queue_entry.host.hostname)
            return
        num_hosts = self.suite_host_num.get(parent_id, 0)
        self.suite_host_num[parent_id] = num_hosts + 1
        self.hosts_to_suites[queue_entry.host_id] = parent_id
        logging.debug('Suite %d got host %s, currently holding %d hosts',
                      parent_id, queue_entry.host.hostname,
                      self.suite_host_num[parent_id])


    def record_release(self, hosts):
        """Update the record with host release events.

        @param hosts: A list of scheduler_models.Host objects.
        """
        for host in hosts:
            if host.id in self.hosts_to_suites:
                parent_job_id = self.hosts_to_suites.pop(host.id)
                count = self.suite_host_num[parent_job_id] - 1
                if count == 0:
                    del self.suite_host_num[parent_job_id]
                else:
                    self.suite_host_num[parent_job_id] = count
                logging.debug(
                        'Suite %d releases host %s, currently holding %d hosts',
                        parent_job_id, host.hostname, count)


    def get_min_duts(self, suite_job_ids):
        """Figure out the minimum duts to request.

        Given a set of suite job ids, figure out the minimum number of duts
        to request for each suite. It is determined by two factors: the
        min_duts specified for each suite in its job keyvals, and how many
        duts a suite is currently holding.

        @param suite_job_ids: A set of suite job ids.

        @returns: A dictionary; the key is suite_job_id, the value
                  is the minimum number of duts to request.
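
        Example (illustrative): if a suite declares suite_min_duts=4 and is
        currently holding 3 duts, the value computed for it below is
        max(0, 4 - 3) == 1.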
        """
        suite_min_duts = self.job_query_manager.get_min_duts_of_suites(
                suite_job_ids)
        for parent_id in suite_job_ids:
            min_duts = suite_min_duts.get(parent_id, 0)
            cur_duts = self.suite_host_num.get(parent_id, 0)
            suite_min_duts[parent_id] = max(0, min_duts - cur_duts)
        logging.debug('Minimum duts to get for suites (suite_id: min_duts): %s',
                      suite_min_duts)
        return suite_min_duts


class BaseHostScheduler(object):
    """Base class containing host acquisition logic.

    This class contains all the core host acquisition logic needed by the
    scheduler to run jobs on hosts. It is only capable of releasing hosts
    back to the rdb through its tick; any other action must be instigated
    by the job scheduler.
    """


    host_assignment = collections.namedtuple('host_assignment', ['host', 'job'])


    def __init__(self):
        self.host_query_manager = query_managers.AFEHostQueryManager()


    def _release_hosts(self):
        """Release hosts to the RDB.

        Release all hosts that are ready and are currently not being used by
        an active hqe, and don't have a new special task scheduled against
        them.

        @return: A list of hosts that were released.
        """
        release_hosts = self.host_query_manager.find_unused_healty_hosts()
        release_hostnames = [host.hostname for host in release_hosts]
        if release_hostnames:
            self.host_query_manager.set_leased(
                    False, hostname__in=release_hostnames)
        return release_hosts


    @classmethod
    def schedule_host_job(cls, host, queue_entry):
        """Schedule a job on a host.

        Scheduling a job involves:
            1. Setting the active bit on the queue_entry.
            2. Scheduling a special task on behalf of the queue_entry.
        Performing these actions will lead the job scheduler through a chain
        of events, culminating in running the test and collecting results
        from the host.

        @param host: The host against which to schedule the job.
        @param queue_entry: The queue_entry to schedule.
        """
        if queue_entry.host_id is None:
            queue_entry.set_host(host)
        elif host.id != queue_entry.host_id:
            raise rdb_utils.RDBException('The rdb returned host: %s '
                    'but the job: %s was already assigned a host: %s. ' %
                    (host.hostname, queue_entry.job_id,
                     queue_entry.host.hostname))
        queue_entry.update_field('active', True)

        # TODO: crbug.com/373936. The host scheduler should only be assigning
        # jobs to hosts, but the criterion we use to release hosts depends
        # on it not being used by an active hqe. Since we're activating the
        # hqe here, we also need to schedule its first prejob task. OTOH,
        # we could converge to having the host scheduler manage all special
        # tasks, since their only use today is to verify/cleanup/reset a host.
        logging.info('Scheduling pre job tasks for entry: %s', queue_entry)
        queue_entry.schedule_pre_job_tasks()


    def acquire_hosts(self, host_jobs):
        """Acquire hosts for the given jobs.

        This method sends jobs that need hosts to the rdb.
        Child classes can override this method to pipe more args
        to the rdb.

        @param host_jobs: A list of queue entries that either require hosts,
            or require host assignment validation through the rdb.

        @return: A generator that yields an rdb_hosts.RDBClientHostWrapper
                 for each host acquired on behalf of a queue_entry,
                 or None if a host wasn't found.
        """
        return rdb_lib.acquire_hosts(host_jobs)


    def find_hosts_for_jobs(self, host_jobs):
        """Find and verify hosts for a list of jobs.

        @param host_jobs: A list of queue entries that either require hosts,
            or require host assignment validation through the rdb.
        @return: A generator of tuples of the form (host, queue_entry) for
            each valid host-queue_entry assignment.
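
        Example (mirrors the caller in HostScheduler._schedule_jobs):
            for acquisition in self.find_hosts_for_jobs(unverified_host_jobs):
                self.schedule_host_job(acquisition.host, acquisition.job)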
        """
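        # acquire_hosts yields one result (a host wrapper or None) per entry
        # in host_jobs, in order, which is what lets zip() pair each host
        # with its queue entry here.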
        hosts = self.acquire_hosts(host_jobs)
        for host, job in zip(hosts, host_jobs):
            if host:
                yield self.host_assignment(host, job)


    def tick(self):
        """Schedule core host management activities."""
        self._release_hosts()


class HostScheduler(BaseHostScheduler):
    """A scheduler capable of managing host acquisition for new jobs."""


    def __init__(self):
        super(HostScheduler, self).__init__()
        self.job_query_manager = query_managers.AFEJobQueryManager()
        # Keep track of how many hosts each suite is holding:
        # {suite_job_id: num_hosts}
        self._suite_recorder = SuiteRecorder(self.job_query_manager)


    def _record_host_assignment(self, host, queue_entry):
        """Record that |host| is assigned to |queue_entry|.

        Record:
            1. How long it took to assign a host to the job, in the
               metadata db.
            2. The host assignment of the suite.

        @param host: A Host object.
        @param queue_entry: A HostQueueEntry object.
        """
        secs_in_queued = (datetime.datetime.now() -
                          queue_entry.job.created_on).total_seconds()
        job_overhead.record_state_duration(
                queue_entry.job_id, host.hostname,
                job_overhead.STATUS.QUEUED, secs_in_queued)
        self._suite_recorder.record_assignment(queue_entry)


    @metrics.SecondsTimerDecorator(
            '%s/schedule_jobs_duration' % _METRICS_PREFIX)
    def _schedule_jobs(self):
        """Schedule new jobs against hosts."""

        new_jobs_with_hosts = 0
        queue_entries = self.job_query_manager.get_pending_queue_entries(
                only_hostless=False)
        unverified_host_jobs = [job for job in queue_entries
                                if not job.is_hostless()]
        if unverified_host_jobs:
            for acquisition in self.find_hosts_for_jobs(unverified_host_jobs):
                self.schedule_host_job(acquisition.host, acquisition.job)
                self._record_host_assignment(acquisition.host, acquisition.job)
                new_jobs_with_hosts += 1
            metrics.Counter('%s/new_jobs_with_hosts' % _METRICS_PREFIX
                            ).increment_by(new_jobs_with_hosts)

        num_jobs_without_hosts = (len(unverified_host_jobs) -
                                  new_jobs_with_hosts)
        metrics.Gauge('%s/current_jobs_without_hosts' % _METRICS_PREFIX
                      ).set(num_jobs_without_hosts)

        metrics.Counter('%s/tick' % _METRICS_PREFIX).increment()

    @metrics.SecondsTimerDecorator('%s/lease_hosts_duration' % _METRICS_PREFIX)
    def _lease_hosts_of_frontend_tasks(self):
        """Lease hosts of tasks scheduled through the frontend."""
        # We really don't need to get all the special tasks here, just the
        # ones without hqes, but reusing the method used by the scheduler
        # ensures we prioritize the same way.
        lease_hostnames = [
                task.host.hostname for task in
                self.job_query_manager.get_prioritized_special_tasks(
                    only_tasks_with_leased_hosts=False)
                if task.queue_entry_id is None and not task.host.leased]
        # Leasing an already-leased host here shouldn't be a problem:
        # 1. The only way a host can be leased is if it's been assigned to
        #    an active hqe or another similar frontend task, but doing so will
        #    have already precluded it from the list of tasks returned by the
        #    job_query_manager.
        # 2. The unleasing is done based on global conditions. Eg: Even if a
        #    task has already leased a host and we lease it again, the host
        #    scheduler won't release the host until both tasks are complete.
        if lease_hostnames:
            self.host_query_manager.set_leased(
                    True, hostname__in=lease_hostnames)


    def acquire_hosts(self, host_jobs):
        """Override acquire_hosts.

        This method overrides the method in the parent class.
        It figures out the set of suites that |host_jobs| belong to,
        gets the min_duts requirement for each suite, and pipes the
        min_duts for each suite to the rdb.
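
        Example (illustrative ids): if host_jobs contains entries whose
        parent_job_id values are {10, 11}, the minimum duts are computed
        for suites 10 and 11 and passed to rdb_lib.acquire_hosts.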

        """
        parent_job_ids = set([q.job.parent_job_id
                              for q in host_jobs if q.job.parent_job_id])
        suite_min_duts = self._suite_recorder.get_min_duts(parent_job_ids)
        return rdb_lib.acquire_hosts(host_jobs, suite_min_duts)


    @metrics.SecondsTimerDecorator('%s/tick_time' % _METRICS_PREFIX)
    def tick(self):
        logging.info('Calling new tick.')
        logging.info('Leasing hosts for frontend tasks.')
        self._lease_hosts_of_frontend_tasks()
        logging.info('Finding hosts for new jobs.')
        self._schedule_jobs()
        logging.info('Releasing unused hosts.')
        released_hosts = self._release_hosts()
        logging.info('Updating suite assignment with released hosts')
        self._suite_recorder.record_release(released_hosts)
        logging.info('Calling email_manager.')
        email_manager.manager.send_queued_emails()


class DummyHostScheduler(BaseHostScheduler):
    """A dummy host scheduler that doesn't acquire or release hosts."""

    def __init__(self):
        pass


    def tick(self):
        pass


def handle_signal(signum, frame):
    """SIGINT/SIGTERM handler so we don't crash mid-tick."""
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize(testing=False):
    """Initialize the host scheduler."""
    if testing:
        # Don't import testing utilities unless we're in testing mode,
        # as the database imports have side effects.
        from autotest_lib.scheduler import rdb_testing_utils
        rdb_testing_utils.FileDatabaseHelper().initialize_database_for_testing(
                db_file_path=rdb_testing_utils.FileDatabaseHelper.DB_FILE)
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            None, timestamped_logfile_prefix='host_scheduler')
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)
    scheduler_models.initialize()


def parse_arguments(argv):
    """
    Parse command line arguments.

    @param argv: Argument list to parse.
    @returns:    Parsed arguments.
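
    Example (illustrative):
        options = parse_arguments(['--testing'])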
    """
    parser = argparse.ArgumentParser(description='Host scheduler.')
    parser.add_argument('--testing', action='store_true', default=False,
                        help='Start the host scheduler in testing mode.')
    parser.add_argument('--production',
                        help=('Indicate that the scheduler is running in a '
                              'production environment and can use a database '
                              'that is not hosted on localhost. If not set, '
                              'the scheduler will fail if the database is '
                              'not on localhost.'),
                        action='store_true', default=False)
    options = parser.parse_args(argv)

    return options


def main():
    if _monitor_db_host_acquisition:
        logging.info('Please set inline_host_acquisition=False in the shadow '
                     'config before starting the host scheduler.')
        # The upstart job for the host scheduler understands exit(0) to mean
        # 'don't respawn'. This is desirable when the job scheduler is
        # acquiring hosts inline.
        sys.exit(0)
    try:
        options = parse_arguments(sys.argv[1:])
        scheduler_lib.check_production_settings(options)

        # If the server database is enabled, check that this server has the
        # `host_scheduler` role. If it does not, an exception will be raised
        # and the host scheduler will not continue to run.
        if server_manager_utils.use_server_db():
            server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                         role='host_scheduler')

        initialize(options.testing)

        # Start the thread to report metadata.
        metadata_reporter.start()

        ts_mon_config.SetupTsMonGlobalState('autotest_host_scheduler')

        host_scheduler = HostScheduler()
        minimum_tick_sec = global_config.global_config.get_config_value(
                'SCHEDULER', 'host_scheduler_minimum_tick_sec', type=float)
        while not _shutdown:
            start = time.time()
            host_scheduler.tick()
            curr_tick_sec = time.time() - start
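            # Pace the loop: if the tick finished early, sleep out the
            # remainder of the minimum tick; otherwise sleep a token amount
            # before starting the next tick.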
            if minimum_tick_sec > curr_tick_sec:
                time.sleep(minimum_tick_sec - curr_tick_sec)
            else:
                time.sleep(0.0001)
    except server_manager_utils.ServerActionError:
        # This error is expected when the server is not in primary status
        # for the host-scheduler role. Thus do not send email for it.
        raise
    except Exception:
        metrics.Counter('%s/uncaught_exception' % _METRICS_PREFIX).increment()
        raise
    finally:
        email_manager.manager.send_queued_emails()
        if _db_manager:
            _db_manager.disconnect()
        metadata_reporter.abort()


if __name__ == '__main__':
    main()
    513