Home | History | Annotate | Download | only in scheduler
      1 #!/usr/bin/python
      2 #pylint: disable-msg=C0111
      3 
      4 # Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
      5 # Use of this source code is governed by a BSD-style license that can be
      6 # found in the LICENSE file.
      7 
      8 """Host scheduler.
      9 
     10 If run as a standalone service, the host scheduler ensures the following:
     11     1. Hosts will not be assigned to multiple hqes simultaneously. The process
     12        of assignment in this case refers to the modification of the host_id
     13        column of a row in the afe_host_queue_entries table, to reflect the host
     14        id of a leased host matching the dependencies of the job.
     15     2. Hosts that are not being used by active hqes or incomplete special tasks
     16        will be released back to the available hosts pool, for acquisition by
     17        subsequent hqes.
     18 In addition to these guarantees, the host scheduler also confirms that no 2
     19 active hqes/special tasks are assigned the same host, and sets the leased bit
     20 for hosts needed by frontend special tasks. The need for the latter is only
     21 apparent when viewed in the context of the job-scheduler (monitor_db), which
     22 runs special tasks only after their hosts have been leased.
     23 
     24 ** Support minimum duts requirement for suites (non-inline mode) **
     25 
     26 Each suite can specify the minimum number of duts it requires by
     27 dropping a 'suite_min_duts' job keyval which defaults to 0.
     28 
     29 When suites are competing for duts, if any suite has not yet got the minimum
     30 number of duts it requires, the host scheduler will try to meet that requirement
     31 first, even if other suites have higher priority or earlier timestamps. Once
     32 all suites' minimum duts requirements have been fulfilled, the host
     33 scheduler will allocate the rest of duts based on job priority and suite job id.
     34 This is to prevent low priority suites from starving when sharing pool with
     35 high-priority suites.
     36 
     37 Note:
     38     1. Prevent potential starvation:
     39        We need to carefully choose |suite_min_duts| for both low and high
     40        priority suites. If a high priority suite didn't specify it but a low
     41        priority one does, the high priority suite can be starved!
     42     2. Restart requirement:
     43        Restart host scheduler if you manually released a host by setting
     44        leased=0 in db. This is needed because host scheduler maintains internal
     45        state of host assignment for suites.
     46     3. Exchanging duts triggers provisioning:
     47        TODO(fdeng): There is a chance two suites can exchange duts,
     48        if the two suites are for different builds, the exchange
     49        will trigger provisioning. This can be optimized by preferring getting
     50        hosts with the same build.
     51 """
     52 
     53 import argparse
     54 import collections
     55 import datetime
     56 import logging
     57 import os
     58 import signal
     59 import sys
     60 import time
     61 
     62 import common
     63 from autotest_lib.frontend import setup_django_environment
     64 
     65 from autotest_lib.client.common_lib import global_config
     66 from autotest_lib.client.common_lib.cros.graphite import autotest_stats
     67 from autotest_lib.scheduler import email_manager
     68 from autotest_lib.scheduler import query_managers
     69 from autotest_lib.scheduler import rdb_lib
     70 from autotest_lib.scheduler import rdb_utils
     71 from autotest_lib.scheduler import scheduler_lib
     72 from autotest_lib.scheduler import scheduler_models
     73 from autotest_lib.site_utils import job_overhead
     74 from autotest_lib.site_utils import metadata_reporter
     75 from autotest_lib.site_utils import server_manager_utils
     76 
     77 _db_manager = None
     78 _shutdown = False
     79 _tick_pause_sec = global_config.global_config.get_config_value(
     80         'SCHEDULER', 'tick_pause_sec', type=int, default=5)
     81 _monitor_db_host_acquisition = global_config.global_config.get_config_value(
     82         'SCHEDULER', 'inline_host_acquisition', type=bool, default=True)
     83 
     84 
     85 class SuiteRecorder(object):
     86     """Recording the host assignment for suites.
     87 
     88     The recorder holds two things:
     89         * suite_host_num, records how many duts a suite is holding,
     90           which is a map <suite_job_id -> num_of_hosts>
     91         * hosts_to_suites, records which host is assigned to which
     92           suite, it is a map <host_id -> suite_job_id>
     93     The two datastructure got updated when a host is assigned to or released
     94     by a job.
     95 
     96     The reason to maintain hosts_to_suites is that, when a host is released,
     97     we need to know which suite it was leased to. Querying the db for the
     98     latest completed job that has run on a host is slow.  Therefore, we go with
     99     an alternative: keeping a <host id, suite job id> map
    100     in memory (for 10K hosts, the map should take less than 1M memory on
    101     64-bit machine with python 2.7)
    102 
    103     """
    104 
    105 
    106     _timer = autotest_stats.Timer('suite_recorder')
    107 
    108 
    109     def __init__(self, job_query_manager):
    110         """Initialize.
    111 
    112         @param job_queue_manager: A JobQueueryManager object.
    113         """
    114         self.job_query_manager = job_query_manager
    115         self.suite_host_num, self.hosts_to_suites = (
    116                 self.job_query_manager.get_suite_host_assignment())
    117 
    118 
    119     def record_assignment(self, queue_entry):
    120         """Record that the hqe has got a host.
    121 
    122         @param queue_entry: A scheduler_models.HostQueueEntry object which has
    123                             got a host.
    124         """
    125         parent_id = queue_entry.job.parent_job_id
    126         if not parent_id:
    127             return
    128         if self.hosts_to_suites.get(queue_entry.host_id, None) == parent_id:
    129             logging.error('HQE (id: %d, parent_job_id: %d, host: %s) '
    130                           'seems already recorded', queue_entry.id,
    131                           parent_id, queue_entry.host.hostname)
    132             return
    133         num_hosts = self.suite_host_num.get(parent_id, 0)
    134         self.suite_host_num[parent_id] = num_hosts + 1
    135         self.hosts_to_suites[queue_entry.host_id] = parent_id
    136         logging.debug('Suite %d got host %s, currently holding %d hosts',
    137                       parent_id, queue_entry.host.hostname,
    138                       self.suite_host_num[parent_id])
    139 
    140 
    141     def record_release(self, hosts):
    142         """Update the record with host releasing event.
    143 
    144         @param hosts: A list of scheduler_models.Host objects.
    145         """
    146         for host in hosts:
    147             if host.id in self.hosts_to_suites:
    148                 parent_job_id = self.hosts_to_suites.pop(host.id)
    149                 count = self.suite_host_num[parent_job_id] - 1
    150                 if count == 0:
    151                     del self.suite_host_num[parent_job_id]
    152                 else:
    153                     self.suite_host_num[parent_job_id] = count
    154                 logging.debug(
    155                         'Suite %d releases host %s, currently holding %d hosts',
    156                         parent_job_id, host.hostname, count)
    157 
    158 
    159     def get_min_duts(self, suite_job_ids):
    160         """Figure out min duts to request.
    161 
    162         Given a set ids of suite jobs, figure out minimum duts to request for
    163         each suite. It is determined by two factors: min_duts specified
    164         for each suite in its job keyvals, and how many duts a suite is
    165         currently holding.
    166 
    167         @param suite_job_ids: A set of suite job ids.
    168 
    169         @returns: A dictionary, the key is suite_job_id, the value
    170                   is the minimum number of duts to request.
    171         """
    172         suite_min_duts = self.job_query_manager.get_min_duts_of_suites(
    173                 suite_job_ids)
    174         for parent_id in suite_job_ids:
    175             min_duts = suite_min_duts.get(parent_id, 0)
    176             cur_duts = self.suite_host_num.get(parent_id, 0)
    177             suite_min_duts[parent_id] = max(0, min_duts - cur_duts)
    178         logging.debug('Minimum duts to get for suites (suite_id: min_duts): %s',
    179                       suite_min_duts)
    180         return suite_min_duts
    181 
    182 
    183 class BaseHostScheduler(object):
    184     """Base class containing host acquisition logic.
    185 
    186     This class contains all the core host acquisition logic needed by the
    187     scheduler to run jobs on hosts. It is only capable of releasing hosts
    188     back to the rdb through its tick, any other action must be instigated by
    189     the job scheduler.
    190     """
    191 
    192 
    193     _timer = autotest_stats.Timer('base_host_scheduler')
    194     host_assignment = collections.namedtuple('host_assignment', ['host', 'job'])
    195 
    196 
    197     def __init__(self):
    198         self.host_query_manager = query_managers.AFEHostQueryManager()
    199 
    200 
    201     @_timer.decorate
    202     def _release_hosts(self):
    203         """Release hosts to the RDB.
    204 
    205         Release all hosts that are ready and are currently not being used by an
    206         active hqe, and don't have a new special task scheduled against them.
    207 
    208         @return a list of hosts that are released.
    209         """
    210         release_hosts = self.host_query_manager.find_unused_healty_hosts()
    211         release_hostnames = [host.hostname for host in release_hosts]
    212         if release_hostnames:
    213             self.host_query_manager.set_leased(
    214                     False, hostname__in=release_hostnames)
    215         return release_hosts
    216 
    217 
    218     @classmethod
    219     def schedule_host_job(cls, host, queue_entry):
    220         """Schedule a job on a host.
    221 
    222         Scheduling a job involves:
    223             1. Setting the active bit on the queue_entry.
    224             2. Scheduling a special task on behalf of the queue_entry.
    225         Performing these actions will lead the job scheduler through a chain of
    226         events, culminating in running the test and collecting results from
    227         the host.
    228 
    229         @param host: The host against which to schedule the job.
    230         @param queue_entry: The queue_entry to schedule.
    231         """
    232         if queue_entry.host_id is None:
    233             queue_entry.set_host(host)
    234         elif host.id != queue_entry.host_id:
    235                 raise rdb_utils.RDBException('The rdb returned host: %s '
    236                         'but the job:%s was already assigned a host: %s. ' %
    237                         (host.hostname, queue_entry.job_id,
    238                          queue_entry.host.hostname))
    239         queue_entry.update_field('active', True)
    240 
    241         # TODO: crbug.com/373936. The host scheduler should only be assigning
    242         # jobs to hosts, but the criterion we use to release hosts depends
    243         # on it not being used by an active hqe. Since we're activating the
    244         # hqe here, we also need to schedule its first prejob task. OTOH,
    245         # we could converge to having the host scheduler manager all special
    246         # tasks, since their only use today is to verify/cleanup/reset a host.
    247         logging.info('Scheduling pre job tasks for entry: %s', queue_entry)
    248         queue_entry.schedule_pre_job_tasks()
    249 
    250 
    251     def acquire_hosts(self, host_jobs):
    252         """Accquire hosts for given jobs.
    253 
    254         This method sends jobs that need hosts to rdb.
    255         Child class can override this method to pipe more args
    256         to rdb.
    257 
    258         @param host_jobs: A list of queue entries that either require hosts,
    259             or require host assignment validation through the rdb.
    260 
    261         @param return: A generator that yields an rdb_hosts.RDBClientHostWrapper
    262                        for each host acquired on behalf of a queue_entry,
    263                        or None if a host wasn't found.
    264         """
    265         return rdb_lib.acquire_hosts(host_jobs)
    266 
    267 
    268     def find_hosts_for_jobs(self, host_jobs):
    269         """Find and verify hosts for a list of jobs.
    270 
    271         @param host_jobs: A list of queue entries that either require hosts,
    272             or require host assignment validation through the rdb.
    273         @return: A list of tuples of the form (host, queue_entry) for each
    274             valid host-queue_entry assignment.
    275         """
    276         jobs_with_hosts = []
    277         hosts = self.acquire_hosts(host_jobs)
    278         for host, job in zip(hosts, host_jobs):
    279             if host:
    280                 jobs_with_hosts.append(self.host_assignment(host, job))
    281         return jobs_with_hosts
    282 
    283 
    284     @_timer.decorate
    285     def tick(self):
    286         """Schedule core host management activities."""
    287         self._release_hosts()
    288 
    289 
    290 class HostScheduler(BaseHostScheduler):
    291     """A scheduler capable managing host acquisition for new jobs."""
    292 
    293     _timer = autotest_stats.Timer('host_scheduler')
    294 
    295 
    296     def __init__(self):
    297         super(HostScheduler, self).__init__()
    298         self.job_query_manager = query_managers.AFEJobQueryManager()
    299         # Keeping track on how many hosts each suite is holding
    300         # {suite_job_id: num_hosts}
    301         self._suite_recorder = SuiteRecorder(self.job_query_manager)
    302 
    303 
    304     def _record_host_assignment(self, host, queue_entry):
    305         """Record that |host| is assigned to |queue_entry|.
    306 
    307         Record:
    308             1. How long it takes to assign a host to a job in metadata db.
    309             2. Record host assignment of a suite.
    310 
    311         @param host: A Host object.
    312         @param queue_entry: A HostQueueEntry object.
    313         """
    314         secs_in_queued = (datetime.datetime.now() -
    315                           queue_entry.job.created_on).total_seconds()
    316         job_overhead.record_state_duration(
    317                 queue_entry.job_id, host.hostname,
    318                 job_overhead.STATUS.QUEUED, secs_in_queued)
    319         self._suite_recorder.record_assignment(queue_entry)
    320 
    321 
    322     @_timer.decorate
    323     def _schedule_jobs(self):
    324         """Schedule new jobs against hosts."""
    325 
    326         key = 'host_scheduler.jobs_per_tick'
    327         new_jobs_with_hosts = 0
    328         queue_entries = self.job_query_manager.get_pending_queue_entries(
    329                 only_hostless=False)
    330         unverified_host_jobs = [job for job in queue_entries
    331                                 if not job.is_hostless()]
    332         if not unverified_host_jobs:
    333             return
    334         for acquisition in self.find_hosts_for_jobs(unverified_host_jobs):
    335             self.schedule_host_job(acquisition.host, acquisition.job)
    336             self._record_host_assignment(acquisition.host, acquisition.job)
    337             new_jobs_with_hosts += 1
    338         autotest_stats.Gauge(key).send('new_jobs_with_hosts',
    339                                        new_jobs_with_hosts)
    340         autotest_stats.Gauge(key).send('new_jobs_without_hosts',
    341                                        len(unverified_host_jobs) -
    342                                        new_jobs_with_hosts)
    343 
    344 
    345     @_timer.decorate
    346     def _lease_hosts_of_frontend_tasks(self):
    347         """Lease hosts of tasks scheduled through the frontend."""
    348         # We really don't need to get all the special tasks here, just the ones
    349         # without hqes, but reusing the method used by the scheduler ensures
    350         # we prioritize the same way.
    351         lease_hostnames = [
    352                 task.host.hostname for task in
    353                 self.job_query_manager.get_prioritized_special_tasks(
    354                     only_tasks_with_leased_hosts=False)
    355                 if task.queue_entry_id is None and not task.host.leased]
    356         # Leasing a leased hosts here shouldn't be a problem:
    357         # 1. The only way a host can be leased is if it's been assigned to
    358         #    an active hqe or another similar frontend task, but doing so will
    359         #    have already precluded it from the list of tasks returned by the
    360         #    job_query_manager.
    361         # 2. The unleasing is done based on global conditions. Eg: Even if a
    362         #    task has already leased a host and we lease it again, the
    363         #    host scheduler won't release the host till both tasks are complete.
    364         if lease_hostnames:
    365             self.host_query_manager.set_leased(
    366                     True, hostname__in=lease_hostnames)
    367 
    368 
    369     def acquire_hosts(self, host_jobs):
    370         """Override acquire_hosts.
    371 
    372         This method overrides the method in parent class.
    373         It figures out a set of suites that |host_jobs| belong to;
    374         and get min_duts requirement for each suite.
    375         It pipes min_duts for each suite to rdb.
    376 
    377         """
    378         parent_job_ids = set([q.job.parent_job_id
    379                               for q in host_jobs if q.job.parent_job_id])
    380         suite_min_duts = self._suite_recorder.get_min_duts(parent_job_ids)
    381         return rdb_lib.acquire_hosts(host_jobs, suite_min_duts)
    382 
    383 
    384     @_timer.decorate
    385     def tick(self):
    386         logging.info('Calling new tick.')
    387         logging.info('Leasing hosts for frontend tasks.')
    388         self._lease_hosts_of_frontend_tasks()
    389         logging.info('Finding hosts for new jobs.')
    390         self._schedule_jobs()
    391         logging.info('Releasing unused hosts.')
    392         released_hosts = self._release_hosts()
    393         logging.info('Updating suite assignment with released hosts')
    394         self._suite_recorder.record_release(released_hosts)
    395         logging.info('Calling email_manager.')
    396         email_manager.manager.send_queued_emails()
    397 
    398 
    399 class DummyHostScheduler(BaseHostScheduler):
    400     """A dummy host scheduler that doesn't acquire or release hosts."""
    401 
    402     def __init__(self):
    403         pass
    404 
    405 
    406     def tick(self):
    407         pass
    408 
    409 
    410 def handle_signal(signum, frame):
    411     """Sigint handler so we don't crash mid-tick."""
    412     global _shutdown
    413     _shutdown = True
    414     logging.info("Shutdown request received.")
    415 
    416 
    417 def initialize(testing=False):
    418     """Initialize the host scheduler."""
    419     if testing:
    420         # Don't import testing utilities unless we're in testing mode,
    421         # as the database imports have side effects.
    422         from autotest_lib.scheduler import rdb_testing_utils
    423         rdb_testing_utils.FileDatabaseHelper().initialize_database_for_testing(
    424                 db_file_path=rdb_testing_utils.FileDatabaseHelper.DB_FILE)
    425     global _db_manager
    426     _db_manager = scheduler_lib.ConnectionManager()
    427     scheduler_lib.setup_logging(
    428             os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
    429             None, timestamped_logfile_prefix='host_scheduler')
    430     logging.info("Setting signal handler")
    431     signal.signal(signal.SIGINT, handle_signal)
    432     signal.signal(signal.SIGTERM, handle_signal)
    433     scheduler_models.initialize()
    434 
    435 
    436 def parse_arguments(argv):
    437     """
    438     Parse command line arguments
    439 
    440     @param argv: argument list to parse
    441     @returns:    parsed arguments.
    442     """
    443     parser = argparse.ArgumentParser(description='Host scheduler.')
    444     parser.add_argument('--testing', action='store_true', default=False,
    445                         help='Start the host scheduler in testing mode.')
    446     parser.add_argument('--production',
    447                         help=('Indicate that scheduler is running in production'
    448                               ' environment and it can use database that is not'
    449                               ' hosted in localhost. If it is set to False, '
    450                               'scheduler will fail if database is not in '
    451                               'localhost.'),
    452                         action='store_true', default=False)
    453     options = parser.parse_args(argv)
    454 
    455     return options
    456 
    457 
    458 def main():
    459     if _monitor_db_host_acquisition:
    460         logging.info('Please set inline_host_acquisition=False in the shadow '
    461                      'config before starting the host scheduler.')
    462         # The upstart job for the host scheduler understands exit(0) to mean
    463         # 'don't respawn'. This is desirable when the job scheduler is acquiring
    464         # hosts inline.
    465         sys.exit(0)
    466     try:
    467         options = parse_arguments(sys.argv[1:])
    468         scheduler_lib.check_production_settings(options)
    469 
    470         # If server database is enabled, check if the server has role
    471         # `host_scheduler`. If the server does not have host_scheduler role,
    472         # exception will be raised and host scheduler will not continue to run.
    473         if server_manager_utils.use_server_db():
    474             server_manager_utils.confirm_server_has_role(hostname='localhost',
    475                                                          role='host_scheduler')
    476 
    477         initialize(options.testing)
    478 
    479         # Start the thread to report metadata.
    480         metadata_reporter.start()
    481 
    482         host_scheduler = HostScheduler()
    483         minimum_tick_sec = global_config.global_config.get_config_value(
    484                 'SCHEDULER', 'minimum_tick_sec', type=float)
    485         while not _shutdown:
    486             start = time.time()
    487             host_scheduler.tick()
    488             curr_tick_sec = time.time() - start
    489             if (minimum_tick_sec > curr_tick_sec):
    490                 time.sleep(minimum_tick_sec - curr_tick_sec)
    491             else:
    492                 time.sleep(0.0001)
    493     except Exception:
    494         email_manager.manager.log_stacktrace(
    495                 'Uncaught exception; terminating host_scheduler.')
    496         raise
    497     finally:
    498         email_manager.manager.send_queued_emails()
    499         if _db_manager:
    500             _db_manager.disconnect()
    501         metadata_reporter.abort()
    502 
    503 
    504 if __name__ == '__main__':
    505     main()
    506