#!/usr/bin/python
#pylint: disable-msg=C0111

# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Host scheduler.

If run as a standalone service, the host scheduler ensures the following:
    1. Hosts will not be assigned to multiple hqes simultaneously. The process
       of assignment in this case refers to the modification of the host_id
       column of a row in the afe_host_queue_entries table, to reflect the host
       id of a leased host matching the dependencies of the job.
    2. Hosts that are not being used by active hqes or incomplete special tasks
       will be released back to the available hosts pool, for acquisition by
       subsequent hqes.
In addition to these guarantees, the host scheduler also confirms that no two
active hqes/special tasks are assigned the same host, and sets the leased bit
for hosts needed by frontend special tasks. The need for the latter is only
apparent when viewed in the context of the job scheduler (monitor_db), which
runs special tasks only after their hosts have been leased.

** Support minimum duts requirement for suites (non-inline mode) **

Each suite can specify the minimum number of duts it requires by
dropping a 'suite_min_duts' job keyval, which defaults to 0.

When suites are competing for duts, if any suite has not yet got the minimum
number of duts it requires, the host scheduler will try to meet that
requirement first, even if other suites have higher priority or earlier
timestamps. Once all suites' minimum duts requirements have been fulfilled,
the host scheduler will allocate the rest of the duts based on job priority
and suite job id. This is to prevent low priority suites from starving when
sharing a pool with high-priority suites.
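
Worked example (illustrative numbers): suppose suite A sets suite_min_duts=6
and currently holds 2 duts, while a higher-priority suite B holds 10 duts and
sets no minimum. The host scheduler first tries to acquire max(0, 6 - 2) = 4
more duts for suite A, and only then hands any remaining duts out by job
priority and suite job id (so suite B gets them next).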

Note:
    1. Prevent potential starvation:
       We need to carefully choose |suite_min_duts| for both low and high
       priority suites. If a high priority suite doesn't specify it but a low
       priority one does, the high priority suite can be starved!
    2. Restart requirement:
       Restart the host scheduler if you manually release a host by setting
       leased=0 in the db. This is needed because the host scheduler maintains
       internal state of host assignments for suites.
    3. Exchanging duts triggers provisioning:
       TODO(fdeng): There is a chance two suites can exchange duts. If the
       two suites are for different builds, the exchange will trigger
       provisioning. This can be optimized by preferring hosts with the
       same build.
"""

import argparse
import collections
import datetime
import logging
import os
import signal
import sys
import time

import common
from autotest_lib.client.common_lib import utils
from autotest_lib.frontend import setup_django_environment

# This import needs to come earlier to avoid using autotest's version of
# httplib2, which is out of date.
try:
    from chromite.lib import metrics
    from chromite.lib import ts_mon_config
except ImportError:
    metrics = utils.metrics_mock
    ts_mon_config = utils.metrics_mock

from autotest_lib.client.common_lib import global_config
from autotest_lib.scheduler import email_manager
from autotest_lib.scheduler import query_managers
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import rdb_utils
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.scheduler import scheduler_models
from autotest_lib.site_utils import server_manager_utils


_db_manager = None
_shutdown = False
_tick_pause_sec = global_config.global_config.get_config_value(
        'SCHEDULER', 'tick_pause_sec', type=int, default=5)
_monitor_db_host_acquisition = global_config.global_config.get_config_value(
        'SCHEDULER', 'inline_host_acquisition', type=bool, default=True)
_METRICS_PREFIX = 'chromeos/autotest/host_scheduler'
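
# The SCHEDULER options read above, plus host_scheduler_minimum_tick_sec read
# in main(), can be set in the global/shadow config. A minimal sketch for
# running this service standalone (the values are illustrative only):
#
#   [SCHEDULER]
#   inline_host_acquisition: False
#   tick_pause_sec: 5
#   host_scheduler_minimum_tick_sec: 0.2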

class SuiteRecorder(object):
    """Records the host assignments for suites.

    The recorder holds two things:
        * suite_host_num, which records how many duts a suite is holding;
          it is a map <suite_job_id -> num_of_hosts>
        * hosts_to_suites, which records which host is assigned to which
          suite; it is a map <host_id -> suite_job_id>
    The two data structures are updated when a host is assigned to or released
    by a job.

    The reason to maintain hosts_to_suites is that, when a host is released,
    we need to know which suite it was leased to. Querying the db for the
    latest completed job that has run on a host is slow. Therefore, we go with
    an alternative: keeping a <host id, suite job id> map
    in memory (for 10K hosts, the map should take less than 1M of memory on a
    64-bit machine with Python 2.7).

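    For example (illustrative ids and counts), with three hosts leased to
    suite job 1234 and one host leased to suite job 1240, the recorder holds:

        suite_host_num:  {1234: 3, 1240: 1}
        hosts_to_suites: {7: 1234, 9: 1234, 21: 1234, 33: 1240}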
    """


    def __init__(self, job_query_manager):
        """Initialize.

        @param job_query_manager: A query_managers.AFEJobQueryManager object.
        """
        self.job_query_manager = job_query_manager
        self.suite_host_num, self.hosts_to_suites = (
                self.job_query_manager.get_suite_host_assignment())


    def record_assignment(self, queue_entry):
        """Record that the hqe has got a host.

        @param queue_entry: A scheduler_models.HostQueueEntry object which has
                            got a host.
        """
        parent_id = queue_entry.job.parent_job_id
        if not parent_id:
            return
        if self.hosts_to_suites.get(queue_entry.host_id, None) == parent_id:
            logging.error('HQE (id: %d, parent_job_id: %d, host: %s) '
                          'seems already recorded', queue_entry.id,
                          parent_id, queue_entry.host.hostname)
            return
        num_hosts = self.suite_host_num.get(parent_id, 0)
        self.suite_host_num[parent_id] = num_hosts + 1
        self.hosts_to_suites[queue_entry.host_id] = parent_id
        logging.debug('Suite %d got host %s, currently holding %d hosts',
                      parent_id, queue_entry.host.hostname,
                      self.suite_host_num[parent_id])


    def record_release(self, hosts):
        """Update the record with a host release event.

        @param hosts: A list of scheduler_models.Host objects.
        """
        for host in hosts:
            if host.id in self.hosts_to_suites:
                parent_job_id = self.hosts_to_suites.pop(host.id)
                count = self.suite_host_num[parent_job_id] - 1
                if count == 0:
                    del self.suite_host_num[parent_job_id]
                else:
                    self.suite_host_num[parent_job_id] = count
                logging.debug(
                        'Suite %d releases host %s, currently holding %d hosts',
                        parent_job_id, host.hostname, count)


    def get_min_duts(self, suite_job_ids):
        """Figure out the minimum duts to request.

        Given a set of suite job ids, figure out the minimum duts to request
        for each suite. It is determined by two factors: min_duts specified
        for each suite in its job keyvals, and how many duts a suite is
        currently holding.

        @param suite_job_ids: A set of suite job ids.

        @returns: A dictionary; the key is suite_job_id, the value
                  is the minimum number of duts to request.
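
        For example (illustrative values): if a suite's keyvals ask for
        min_duts=6 and suite_host_num shows it currently holds 2 duts, the
        returned value for that suite is max(0, 6 - 2) = 4.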
        """
        suite_min_duts = self.job_query_manager.get_min_duts_of_suites(
                suite_job_ids)
        for parent_id in suite_job_ids:
            min_duts = suite_min_duts.get(parent_id, 0)
            cur_duts = self.suite_host_num.get(parent_id, 0)
            suite_min_duts[parent_id] = max(0, min_duts - cur_duts)
        logging.debug('Minimum duts to get for suites (suite_id: min_duts): %s',
                      suite_min_duts)
        return suite_min_duts


class BaseHostScheduler(object):
    """Base class containing host acquisition logic.

    This class contains all the core host acquisition logic needed by the
    scheduler to run jobs on hosts. It is only capable of releasing hosts
    back to the rdb through its tick; any other action must be instigated by
    the job scheduler.
    """


    host_assignment = collections.namedtuple('host_assignment', ['host', 'job'])


    def __init__(self):
        self.host_query_manager = query_managers.AFEHostQueryManager()


    def _release_hosts(self):
        """Release hosts to the RDB.

        Release all hosts that are ready and are currently not being used by an
        active hqe, and don't have a new special task scheduled against them.

        @return: A list of hosts that were released.
        """
        release_hosts = self.host_query_manager.find_unused_healty_hosts()
        release_hostnames = [host.hostname for host in release_hosts]
        if release_hostnames:
            self.host_query_manager.set_leased(
                    False, hostname__in=release_hostnames)
        return release_hosts


    @classmethod
    def schedule_host_job(cls, host, queue_entry):
        """Schedule a job on a host.

        Scheduling a job involves:
            1. Setting the active bit on the queue_entry.
            2. Scheduling a special task on behalf of the queue_entry.
        Performing these actions will lead the job scheduler through a chain of
        events, culminating in running the test and collecting results from
        the host.

        @param host: The host against which to schedule the job.
        @param queue_entry: The queue_entry to schedule.
        """
        if queue_entry.host_id is None:
            queue_entry.set_host(host)
        elif host.id != queue_entry.host_id:
            raise rdb_utils.RDBException(
                    'The rdb returned host: %s but the job: %s was already '
                    'assigned a host: %s.' %
                    (host.hostname, queue_entry.job_id,
                     queue_entry.host.hostname))
        queue_entry.update_field('active', True)

        # TODO: crbug.com/373936. The host scheduler should only be assigning
        # jobs to hosts, but the criterion we use to release hosts depends
        # on it not being used by an active hqe. Since we're activating the
        # hqe here, we also need to schedule its first prejob task. OTOH,
        # we could converge to having the host scheduler manage all special
        # tasks, since their only use today is to verify/cleanup/reset a host.
        logging.info('Scheduling pre job tasks for entry: %s', queue_entry)
        queue_entry.schedule_pre_job_tasks()


    def acquire_hosts(self, host_jobs):
        """Acquire hosts for the given jobs.

        This method sends jobs that need hosts to the rdb.
        A child class can override this method to pipe more args
        to the rdb.

        @param host_jobs: A list of queue entries that either require hosts,
            or require host assignment validation through the rdb.

        @return: A generator that yields an rdb_hosts.RDBClientHostWrapper
                 for each host acquired on behalf of a queue_entry,
                 or None if a host wasn't found.
        """
        return rdb_lib.acquire_hosts(host_jobs)


    def find_hosts_for_jobs(self, host_jobs):
        """Find and verify hosts for a list of jobs.

        @param host_jobs: A list of queue entries that either require hosts,
            or require host assignment validation through the rdb.
        @return: A generator of tuples of the form (host, queue_entry) for each
            valid host-queue_entry assignment.
        """
        hosts = self.acquire_hosts(host_jobs)
        for host, job in zip(hosts, host_jobs):
            if host:
                yield self.host_assignment(host, job)


    def tick(self):
        """Schedule core host management activities."""
        self._release_hosts()


class HostScheduler(BaseHostScheduler):
    """A scheduler capable of managing host acquisition for new jobs."""


    def __init__(self):
        super(HostScheduler, self).__init__()
        self.job_query_manager = query_managers.AFEJobQueryManager()
        # Keep track of how many hosts each suite is holding:
        # {suite_job_id: num_hosts}
        self._suite_recorder = SuiteRecorder(self.job_query_manager)


    def _record_host_assignment(self, host, queue_entry):
        """Record that |host| is assigned to |queue_entry|.

        Record:
            1. How long it takes to assign a host to a job in metadata db.
            2. Record host assignment of a suite.

        @param host: A Host object.
        @param queue_entry: A HostQueueEntry object.
        """
        # Time from job creation to host assignment. Note: this value is
        # currently computed but not reported anywhere in this function.
        secs_in_queued = (datetime.datetime.now() -
                          queue_entry.job.created_on).total_seconds()
        self._suite_recorder.record_assignment(queue_entry)


    @metrics.SecondsTimerDecorator(
            '%s/schedule_jobs_duration' % _METRICS_PREFIX)
    def _schedule_jobs(self):
        """Schedule new jobs against hosts."""

        new_jobs_with_hosts = 0
        queue_entries = self.job_query_manager.get_pending_queue_entries(
                only_hostless=False)
        unverified_host_jobs = [job for job in queue_entries
                                if not job.is_hostless()]
        if unverified_host_jobs:
            for acquisition in self.find_hosts_for_jobs(unverified_host_jobs):
                self.schedule_host_job(acquisition.host, acquisition.job)
                self._record_host_assignment(acquisition.host, acquisition.job)
                new_jobs_with_hosts += 1
            metrics.Counter('%s/new_jobs_with_hosts' % _METRICS_PREFIX
                            ).increment_by(new_jobs_with_hosts)

        num_jobs_without_hosts = (len(unverified_host_jobs) -
                                  new_jobs_with_hosts)
        metrics.Gauge('%s/current_jobs_without_hosts' % _METRICS_PREFIX
                      ).set(num_jobs_without_hosts)

        metrics.Counter('%s/tick' % _METRICS_PREFIX).increment()

    @metrics.SecondsTimerDecorator('%s/lease_hosts_duration' % _METRICS_PREFIX)
    def _lease_hosts_of_frontend_tasks(self):
        """Lease hosts of tasks scheduled through the frontend."""
        # We really don't need to get all the special tasks here, just the ones
        # without hqes, but reusing the method used by the scheduler ensures
        # we prioritize the same way.
        lease_hostnames = [
                task.host.hostname for task in
                self.job_query_manager.get_prioritized_special_tasks(
                    only_tasks_with_leased_hosts=False)
                if task.queue_entry_id is None and not task.host.leased]
        # Leasing an already-leased host here shouldn't be a problem:
        # 1. The only way a host can be leased is if it's been assigned to
        #    an active hqe or another similar frontend task, but doing so will
        #    have already precluded it from the list of tasks returned by the
        #    job_query_manager.
        # 2. The unleasing is done based on global conditions. E.g., even if a
        #    task has already leased a host and we lease it again, the
        #    host scheduler won't release the host till both tasks are complete.
        if lease_hostnames:
            self.host_query_manager.set_leased(
                    True, hostname__in=lease_hostnames)


    def acquire_hosts(self, host_jobs):
        """Override acquire_hosts.

        This method overrides the method in the parent class.
        It figures out the set of suites that |host_jobs| belong to and
        gets the min_duts requirement for each suite, then pipes min_duts
        for each suite to the rdb.

        @param host_jobs: A list of queue entries that either require hosts,
            or require host assignment validation through the rdb.

        @return: A generator of acquired hosts, as returned by
                 rdb_lib.acquire_hosts.
        """
        parent_job_ids = set([q.job.parent_job_id
                              for q in host_jobs if q.job.parent_job_id])
        suite_min_duts = self._suite_recorder.get_min_duts(parent_job_ids)
        return rdb_lib.acquire_hosts(host_jobs, suite_min_duts)


    @metrics.SecondsTimerDecorator('%s/tick_time' % _METRICS_PREFIX)
    def tick(self):
        """Lease, assign and release hosts, once per tick."""
        logging.info('Calling new tick.')
        logging.info('Leasing hosts for frontend tasks.')
        self._lease_hosts_of_frontend_tasks()
        logging.info('Finding hosts for new jobs.')
        self._schedule_jobs()
        logging.info('Releasing unused hosts.')
        released_hosts = self._release_hosts()
        logging.info('Updating suite assignment with released hosts')
        self._suite_recorder.record_release(released_hosts)
        logging.info('Calling email_manager.')
        email_manager.manager.send_queued_emails()


class DummyHostScheduler(BaseHostScheduler):
    """A dummy host scheduler that doesn't acquire or release hosts."""

    def __init__(self):
        pass


    def tick(self):
        pass


def handle_signal(signum, frame):
    """Signal handler (SIGINT/SIGTERM) so we don't crash mid-tick."""
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize(testing=False):
    """Initialize the host scheduler."""
    if testing:
        # Don't import testing utilities unless we're in testing mode,
        # as the database imports have side effects.
        from autotest_lib.scheduler import rdb_testing_utils
        rdb_testing_utils.FileDatabaseHelper().initialize_database_for_testing(
                db_file_path=rdb_testing_utils.FileDatabaseHelper.DB_FILE)
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            None, timestamped_logfile_prefix='host_scheduler')
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)
    scheduler_models.initialize()


def parse_arguments(argv):
    """
    Parse command line arguments.

    @param argv: argument list to parse
    @returns:    parsed arguments.
    """
    parser = argparse.ArgumentParser(description='Host scheduler.')
    parser.add_argument('--testing', action='store_true', default=False,
                        help='Start the host scheduler in testing mode.')
    parser.add_argument('--production',
                        help=('Indicate that the scheduler is running in a '
                              'production environment and can use a database '
                              'that is not hosted on localhost. If not set, '
                              'the scheduler will fail if the database is not '
                              'on localhost.'),
                        action='store_true', default=False)
    parser.add_argument(
            '--lifetime-hours',
            type=float,
            default=None,
            help='If provided, number of hours the scheduler should run for. '
                 'At the expiry of this time, the process will exit '
                 'gracefully.',
    )
    parser.add_argument(
            '--metrics-file',
            help='If provided, drop metrics to this local file instead of '
                 'reporting to ts_mon.',
            type=str,
            default=None,
    )
    options = parser.parse_args(argv)

    return options


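# Example invocation (flags as defined in parse_arguments above; the path and
# values are illustrative only):
#
#   ./host_scheduler.py --production --lifetime-hours 24 \
#       --metrics-file /tmp/host_scheduler_metrics.txt
#
# Without --lifetime-hours the process runs until it receives SIGINT/SIGTERM,
# and it refuses to start unless inline_host_acquisition is disabled in the
# shadow config (see main() below).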
def main():
    if _monitor_db_host_acquisition:
        logging.info('Please set inline_host_acquisition=False in the shadow '
                     'config before starting the host scheduler.')
        sys.exit(0)
    try:
        options = parse_arguments(sys.argv[1:])
        scheduler_lib.check_production_settings(options)

        # If the server database is enabled, check if the server has the
        # `host_scheduler` role. If the server does not have that role, an
        # exception will be raised and the host scheduler will not continue
        # to run.
        if server_manager_utils.use_server_db():
            server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                         role='host_scheduler')

        initialize(options.testing)

        with ts_mon_config.SetupTsMonGlobalState(
                'autotest_host_scheduler',
                indirect=True,
                debug_file=options.metrics_file,
        ):
            metrics.Counter('%s/start' % _METRICS_PREFIX).increment()
            process_start_time = time.time()
            host_scheduler = HostScheduler()
            minimum_tick_sec = global_config.global_config.get_config_value(
                    'SCHEDULER', 'host_scheduler_minimum_tick_sec', type=float)
            while not _shutdown:
                if _lifetime_expired(options.lifetime_hours,
                                     process_start_time):
                    break
                start = time.time()
                host_scheduler.tick()
                curr_tick_sec = time.time() - start
                if minimum_tick_sec > curr_tick_sec:
                    time.sleep(minimum_tick_sec - curr_tick_sec)
                else:
                    time.sleep(0.0001)
            logging.info('Shutdown request received. Bye! Bye!')
    except server_manager_utils.ServerActionError:
        # This error is expected when the server is not in primary status
        # for the host-scheduler role, so do not send email for it.
        raise
    except Exception:
        metrics.Counter('%s/uncaught_exception' % _METRICS_PREFIX).increment()
        raise
    finally:
        email_manager.manager.send_queued_emails()
        if _db_manager:
            _db_manager.disconnect()


def _lifetime_expired(lifetime_hours, process_start_time):
    """Returns True if we've exceeded the process lifetime, False otherwise.

    Also sets the global _shutdown so that any background processes also take
    the cue to exit.
    """
    if lifetime_hours is None:
        return False
    if time.time() - process_start_time > lifetime_hours * 3600:
        logging.info('Process lifetime %0.3f hours exceeded. Shutting down.',
                     lifetime_hours)
        global _shutdown
        _shutdown = True
        return True
    return False


if __name__ == '__main__':
    main()