#!/usr/bin/python
#pylint: disable-msg=C0111

# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

      8 """Host scheduler.
      9 
     10 If run as a standalone service, the host scheduler ensures the following:
     11     1. Hosts will not be assigned to multiple hqes simultaneously. The process
     12        of assignment in this case refers to the modification of the host_id
     13        column of a row in the afe_host_queue_entries table, to reflect the host
     14        id of a leased host matching the dependencies of the job.
     15     2. Hosts that are not being used by active hqes or incomplete special tasks
     16        will be released back to the available hosts pool, for acquisition by
     17        subsequent hqes.
In addition to these guarantees, the host scheduler also confirms that no two
active hqes/special tasks are assigned the same host, and sets the leased bit
for hosts needed by frontend special tasks. The need for the latter is only
apparent when viewed in the context of the job scheduler (monitor_db), which
runs special tasks only after their hosts have been leased.

** Support minimum dut requirements for suites (non-inline mode) **

Each suite can specify the minimum number of duts it requires by
dropping a 'suite_min_duts' job keyval, which defaults to 0.
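
For example (illustrative values), a suite job whose keyvals include
{'suite_min_duts': 4} asks the host scheduler to try to keep at least
4 duts assigned to that suite while it still has work pending.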

When suites are competing for duts, if any suite has not yet obtained the
minimum number of duts it requires, the host scheduler will try to meet that
requirement first, even if other suites have higher priority or earlier
timestamps. Once every suite's minimum dut requirement has been fulfilled,
the host scheduler allocates the remaining duts based on job priority and
suite job id. This prevents low priority suites from starving when sharing
a pool with high priority suites.
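
For example (hypothetical numbers): suites A and B share a pool of 10 duts,
B has higher priority, and A specifies suite_min_duts=3 while currently
holding only 1 dut. The host scheduler first tries to hand A 2 more duts,
and only then distributes the remaining duts by job priority and suite
job id.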

Note:
    1. Prevent potential starvation:
       We need to carefully choose |suite_min_duts| for both low and high
       priority suites. If a high priority suite doesn't specify it but a low
       priority one does, the high priority suite can be starved!
    2. Restart requirement:
       Restart the host scheduler if you manually release a host by setting
       leased=0 in the db. This is needed because the host scheduler maintains
       internal state of the host assignments of suites.
    3. Exchanging duts triggers provisioning:
       TODO(fdeng): There is a chance two suites can exchange duts;
       if the two suites are for different builds, the exchange
       will trigger provisioning. This can be optimized by preferring to
       acquire hosts with the same build.
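
Example invocation (flags are defined in parse_arguments below; the values
shown are illustrative):
    host_scheduler.py --production --lifetime-hours 24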
     51 """

import argparse
import collections
import datetime
import logging
import os
import signal
import sys
import time

import common
from autotest_lib.client.common_lib import utils
from autotest_lib.frontend import setup_django_environment

# This import needs to come earlier to avoid using autotest's version of
# httplib2, which is out of date.
try:
    from chromite.lib import metrics
    from chromite.lib import ts_mon_config
except ImportError:
    metrics = utils.metrics_mock
    ts_mon_config = utils.metrics_mock

from autotest_lib.client.common_lib import global_config
from autotest_lib.scheduler import email_manager
from autotest_lib.scheduler import query_managers
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import rdb_utils
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.scheduler import scheduler_models
from autotest_lib.site_utils import job_overhead
from autotest_lib.site_utils import server_manager_utils


_db_manager = None
_shutdown = False
_tick_pause_sec = global_config.global_config.get_config_value(
        'SCHEDULER', 'tick_pause_sec', type=int, default=5)
_monitor_db_host_acquisition = global_config.global_config.get_config_value(
        'SCHEDULER', 'inline_host_acquisition', type=bool, default=True)
_METRICS_PREFIX = 'chromeos/autotest/host_scheduler'
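
# A minimal config sketch for running this service standalone (the section and
# keys mirror the get_config_value calls in this file; the value shown for
# host_scheduler_minimum_tick_sec is illustrative):
#
#     [SCHEDULER]
#     inline_host_acquisition: False
#     host_scheduler_minimum_tick_sec: 1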
class SuiteRecorder(object):
    """Records host assignments for suites.

    The recorder holds two things:
        * suite_host_num, which records how many duts a suite is holding;
          it is a map <suite_job_id -> num_of_hosts>
        * hosts_to_suites, which records which host is assigned to which
          suite; it is a map <host_id -> suite_job_id>
    The two data structures are updated when a host is assigned to or released
    by a job.

    The reason to maintain hosts_to_suites is that, when a host is released,
    we need to know which suite it was leased to. Querying the db for the
    latest completed job that has run on a host is slow. Therefore, we go with
    an alternative: keeping a <host id, suite job id> map
    in memory (for 10K hosts, the map should take less than 1M of memory on
    a 64-bit machine with Python 2.7).
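
    An illustrative snapshot of the two maps (all ids hypothetical):
        suite_host_num:  {1000: 2}            # suite job 1000 holds 2 duts
        hosts_to_suites: {7: 1000, 9: 1000}   # hosts 7 and 9 belong to it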

    """


    def __init__(self, job_query_manager):
        """Initialize.

        @param job_query_manager: A query_managers.AFEJobQueryManager object.
        """
        self.job_query_manager = job_query_manager
        self.suite_host_num, self.hosts_to_suites = (
                self.job_query_manager.get_suite_host_assignment())


    def record_assignment(self, queue_entry):
        """Record that the hqe has acquired a host.

        @param queue_entry: A scheduler_models.HostQueueEntry object that has
                            acquired a host.
        """
        parent_id = queue_entry.job.parent_job_id
        if not parent_id:
            return
        if self.hosts_to_suites.get(queue_entry.host_id, None) == parent_id:
            logging.error('HQE (id: %d, parent_job_id: %d, host: %s) '
                          'seems already recorded', queue_entry.id,
                          parent_id, queue_entry.host.hostname)
            return
        num_hosts = self.suite_host_num.get(parent_id, 0)
        self.suite_host_num[parent_id] = num_hosts + 1
        self.hosts_to_suites[queue_entry.host_id] = parent_id
        logging.debug('Suite %d got host %s, currently holding %d hosts',
                      parent_id, queue_entry.host.hostname,
                      self.suite_host_num[parent_id])


    def record_release(self, hosts):
        """Update the records with a host release event.

        @param hosts: A list of scheduler_models.Host objects.
        """
        for host in hosts:
            if host.id in self.hosts_to_suites:
                parent_job_id = self.hosts_to_suites.pop(host.id)
                count = self.suite_host_num[parent_job_id] - 1
                if count == 0:
                    del self.suite_host_num[parent_job_id]
                else:
                    self.suite_host_num[parent_job_id] = count
                logging.debug(
                        'Suite %d releases host %s, currently holding %d hosts',
                        parent_job_id, host.hostname, count)


    def get_min_duts(self, suite_job_ids):
        """Figure out the min duts to request.

        Given a set of suite job ids, figure out the minimum number of duts to
        request for each suite. This is determined by two factors: the
        min_duts specified for each suite in its job keyvals, and how many
        duts the suite is currently holding.

        @param suite_job_ids: A set of suite job ids.

        @returns: A dictionary; the key is suite_job_id and the value
                  is the minimum number of duts to request.
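
        Example (hypothetical ids and keyvals): if suite 1000 specifies
        suite_min_duts=4 and currently holds 1 dut, while suite 2000
        specifies no keyval (so 0) and holds 3 duts, then
        get_min_duts({1000, 2000}) returns {1000: 3, 2000: 0}.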
        """
        suite_min_duts = self.job_query_manager.get_min_duts_of_suites(
                suite_job_ids)
        for parent_id in suite_job_ids:
            min_duts = suite_min_duts.get(parent_id, 0)
            cur_duts = self.suite_host_num.get(parent_id, 0)
            suite_min_duts[parent_id] = max(0, min_duts - cur_duts)
        logging.debug('Minimum duts to get for suites (suite_id: min_duts): %s',
                      suite_min_duts)
        return suite_min_duts


class BaseHostScheduler(object):
    """Base class containing host acquisition logic.

    This class contains all the core host acquisition logic needed by the
    scheduler to run jobs on hosts. It is only capable of releasing hosts
    back to the rdb through its tick; any other action must be instigated by
    the job scheduler.
    """


    host_assignment = collections.namedtuple('host_assignment', ['host', 'job'])


    def __init__(self):
        self.host_query_manager = query_managers.AFEHostQueryManager()


    def _release_hosts(self):
        """Release hosts to the RDB.

        Release all hosts that are ready and are currently not being used by an
        active hqe, and don't have a new special task scheduled against them.

        @return: A list of hosts that were released.
        """
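        # NOTE: 'healty' below is the method's actual (misspelled) name as
        # defined on the query manager; renaming it here would break the call.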
        release_hosts = self.host_query_manager.find_unused_healty_hosts()
        release_hostnames = [host.hostname for host in release_hosts]
        if release_hostnames:
            self.host_query_manager.set_leased(
                    False, hostname__in=release_hostnames)
        return release_hosts


    @classmethod
    def schedule_host_job(cls, host, queue_entry):
        """Schedule a job on a host.

        Scheduling a job involves:
            1. Setting the active bit on the queue_entry.
            2. Scheduling a special task on behalf of the queue_entry.
        Performing these actions will lead the job scheduler through a chain of
        events, culminating in running the test and collecting results from
        the host.

        @param host: The host against which to schedule the job.
        @param queue_entry: The queue_entry to schedule.
        """
        if queue_entry.host_id is None:
            queue_entry.set_host(host)
        elif host.id != queue_entry.host_id:
            raise rdb_utils.RDBException('The rdb returned host: %s '
                    'but the job:%s was already assigned a host: %s. ' %
                    (host.hostname, queue_entry.job_id,
                     queue_entry.host.hostname))
        queue_entry.update_field('active', True)

        # TODO: crbug.com/373936. The host scheduler should only be assigning
        # jobs to hosts, but the criterion we use to release hosts depends
        # on it not being used by an active hqe. Since we're activating the
        # hqe here, we also need to schedule its first prejob task. OTOH,
        # we could converge to having the host scheduler manage all special
        # tasks, since their only use today is to verify/cleanup/reset a host.
        logging.info('Scheduling pre job tasks for entry: %s', queue_entry)
        queue_entry.schedule_pre_job_tasks()


    def acquire_hosts(self, host_jobs):
        """Acquire hosts for the given jobs.

        This method sends jobs that need hosts to the rdb.
        A child class can override this method to pipe more args
        to the rdb.

        @param host_jobs: A list of queue entries that either require hosts,
            or require host assignment validation through the rdb.

        @returns: A generator that yields an rdb_hosts.RDBClientHostWrapper
                  for each host acquired on behalf of a queue_entry,
                  or None if a host wasn't found.
        """
        return rdb_lib.acquire_hosts(host_jobs)


    def find_hosts_for_jobs(self, host_jobs):
        """Find and verify hosts for a list of jobs.

        @param host_jobs: A list of queue entries that either require hosts,
            or require host assignment validation through the rdb.
        @return: A generator of tuples of the form (host, queue_entry) for each
            valid host-queue_entry assignment.
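
        Example (mirrors the call pattern in _schedule_jobs below):
            for acquisition in self.find_hosts_for_jobs(entries):
                self.schedule_host_job(acquisition.host, acquisition.job)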
        """
        hosts = self.acquire_hosts(host_jobs)
        for host, job in zip(hosts, host_jobs):
            if host:
                yield self.host_assignment(host, job)


    def tick(self):
        """Schedule core host management activities."""
        self._release_hosts()


class HostScheduler(BaseHostScheduler):
    """A scheduler capable of managing host acquisition for new jobs."""


    def __init__(self):
        super(HostScheduler, self).__init__()
        self.job_query_manager = query_managers.AFEJobQueryManager()
        # Keeps track of how many hosts each suite is holding:
        # {suite_job_id: num_hosts}
        self._suite_recorder = SuiteRecorder(self.job_query_manager)


    def _record_host_assignment(self, host, queue_entry):
        """Record that |host| is assigned to |queue_entry|.

        Record:
            1. How long it took to assign the host to the job, in the
               metadata db.
            2. The host assignment of the suite.

        @param host: A Host object.
        @param queue_entry: A HostQueueEntry object.
        """
        secs_in_queued = (datetime.datetime.now() -
                          queue_entry.job.created_on).total_seconds()
        job_overhead.record_state_duration(
                queue_entry.job_id, host.hostname,
                job_overhead.STATUS.QUEUED, secs_in_queued)
        self._suite_recorder.record_assignment(queue_entry)


    @metrics.SecondsTimerDecorator(
            '%s/schedule_jobs_duration' % _METRICS_PREFIX)
    def _schedule_jobs(self):
        """Schedule new jobs against hosts."""

        new_jobs_with_hosts = 0
        queue_entries = self.job_query_manager.get_pending_queue_entries(
                only_hostless=False)
        unverified_host_jobs = [job for job in queue_entries
                                if not job.is_hostless()]
        if unverified_host_jobs:
            for acquisition in self.find_hosts_for_jobs(unverified_host_jobs):
                self.schedule_host_job(acquisition.host, acquisition.job)
                self._record_host_assignment(acquisition.host, acquisition.job)
                new_jobs_with_hosts += 1
            metrics.Counter('%s/new_jobs_with_hosts' % _METRICS_PREFIX
                            ).increment_by(new_jobs_with_hosts)

        num_jobs_without_hosts = (len(unverified_host_jobs) -
                                  new_jobs_with_hosts)
        metrics.Gauge('%s/current_jobs_without_hosts' % _METRICS_PREFIX
                      ).set(num_jobs_without_hosts)

        metrics.Counter('%s/tick' % _METRICS_PREFIX).increment()

    @metrics.SecondsTimerDecorator('%s/lease_hosts_duration' % _METRICS_PREFIX)
    def _lease_hosts_of_frontend_tasks(self):
        """Lease hosts of tasks scheduled through the frontend."""
        # We really don't need to get all the special tasks here, just the ones
        # without hqes, but reusing the method used by the scheduler ensures
        # we prioritize the same way.
        lease_hostnames = [
                task.host.hostname for task in
                self.job_query_manager.get_prioritized_special_tasks(
                    only_tasks_with_leased_hosts=False)
                if task.queue_entry_id is None and not task.host.leased]
        # Leasing an already-leased host here shouldn't be a problem:
        # 1. The only way a host can be leased is if it's been assigned to
        #    an active hqe or another similar frontend task, but doing so will
        #    have already precluded it from the list of tasks returned by the
        #    job_query_manager.
        # 2. The unleasing is done based on global conditions. Eg: Even if a
        #    task has already leased a host and we lease it again, the host
        #    scheduler won't release the host until both tasks are complete.
        if lease_hostnames:
            self.host_query_manager.set_leased(
                    True, hostname__in=lease_hostnames)


    def acquire_hosts(self, host_jobs):
        """Override acquire_hosts.

        This method overrides the method in the parent class. It figures out
        the set of suites that |host_jobs| belong to, gets the min_duts
        requirement of each suite, and pipes the min_duts of each suite
        through to the rdb.

        @param host_jobs: A list of queue entries that either require hosts,
            or require host assignment validation through the rdb.
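
        Example (hypothetical ids): if every entry in |host_jobs| belongs to
        suite job 1000, and that suite still needs 2 duts to reach its
        suite_min_duts, this ends up calling
        rdb_lib.acquire_hosts(host_jobs, {1000: 2}).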
        """
        parent_job_ids = set([q.job.parent_job_id
                              for q in host_jobs if q.job.parent_job_id])
        suite_min_duts = self._suite_recorder.get_min_duts(parent_job_ids)
        return rdb_lib.acquire_hosts(host_jobs, suite_min_duts)


    @metrics.SecondsTimerDecorator('%s/tick_time' % _METRICS_PREFIX)
    def tick(self):
        logging.info('Calling new tick.')
        logging.info('Leasing hosts for frontend tasks.')
        self._lease_hosts_of_frontend_tasks()
        logging.info('Finding hosts for new jobs.')
        self._schedule_jobs()
        logging.info('Releasing unused hosts.')
        released_hosts = self._release_hosts()
        logging.info('Updating suite assignment with released hosts')
        self._suite_recorder.record_release(released_hosts)
        logging.info('Calling email_manager.')
        email_manager.manager.send_queued_emails()


class DummyHostScheduler(BaseHostScheduler):
    """A dummy host scheduler that doesn't acquire or release hosts."""

    def __init__(self):
        pass


    def tick(self):
        pass


def handle_signal(signum, frame):
    """SIGINT/SIGTERM handler so we don't crash mid-tick."""
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize(testing=False):
    """Initialize the host scheduler."""
    if testing:
        # Don't import testing utilities unless we're in testing mode,
        # as the database imports have side effects.
        from autotest_lib.scheduler import rdb_testing_utils
        rdb_testing_utils.FileDatabaseHelper().initialize_database_for_testing(
                db_file_path=rdb_testing_utils.FileDatabaseHelper.DB_FILE)
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            None, timestamped_logfile_prefix='host_scheduler')
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)
    scheduler_models.initialize()


def parse_arguments(argv):
    """
    Parse command line arguments.

    @param argv: argument list to parse
    @returns:    parsed arguments.
    """
    parser = argparse.ArgumentParser(description='Host scheduler.')
    parser.add_argument('--testing', action='store_true', default=False,
                        help='Start the host scheduler in testing mode.')
    parser.add_argument('--production',
                        help=('Indicate that the scheduler is running in a '
                              'production environment and can use a database '
                              'that is not hosted on localhost. If not set, '
                              'the scheduler will fail if the database is not '
                              'on localhost.'),
                        action='store_true', default=False)
    parser.add_argument(
            '--lifetime-hours',
            type=float,
            default=None,
            help='If provided, number of hours the scheduler should run for. '
                 'At the expiry of this time, the process will exit '
                 'gracefully.',
    )
    parser.add_argument(
            '--metrics-file',
            help='If provided, drop metrics to this local file instead of '
                 'reporting to ts_mon',
            type=str,
            default=None,
    )
    options = parser.parse_args(argv)

    return options


def main():
    if _monitor_db_host_acquisition:
        logging.info('Please set inline_host_acquisition=False in the shadow '
                     'config before starting the host scheduler.')
        sys.exit(0)
    try:
        options = parse_arguments(sys.argv[1:])
        scheduler_lib.check_production_settings(options)

        # If the server database is enabled, check that this server has the
        # `host_scheduler` role. If it does not, an exception is raised and
        # the host scheduler will not continue to run.
        if server_manager_utils.use_server_db():
            server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                         role='host_scheduler')

        initialize(options.testing)

        with ts_mon_config.SetupTsMonGlobalState(
                'autotest_host_scheduler',
                indirect=True,
                debug_file=options.metrics_file,
        ):
            metrics.Counter('%s/start' % _METRICS_PREFIX).increment()
            process_start_time = time.time()
            host_scheduler = HostScheduler()
            minimum_tick_sec = global_config.global_config.get_config_value(
                    'SCHEDULER', 'host_scheduler_minimum_tick_sec', type=float)
            while not _shutdown:
                if _lifetime_expired(options.lifetime_hours,
                                     process_start_time):
                    break
                start = time.time()
                host_scheduler.tick()
                curr_tick_sec = time.time() - start
                if minimum_tick_sec > curr_tick_sec:
                    time.sleep(minimum_tick_sec - curr_tick_sec)
                else:
                    time.sleep(0.0001)
            logging.info('Shutdown request received. Bye! Bye!')
    except server_manager_utils.ServerActionError:
        # This error is expected when the server is not in primary status
        # for the host-scheduler role, so do not send email for it.
        raise
    except Exception:
        metrics.Counter('%s/uncaught_exception' % _METRICS_PREFIX).increment()
        raise
    finally:
        email_manager.manager.send_queued_emails()
        if _db_manager:
            _db_manager.disconnect()


def _lifetime_expired(lifetime_hours, process_start_time):
    """Returns True if we've expired the process lifetime, False otherwise.

    Also sets the global _shutdown so that any background processes also take
    the cue to exit.
    """
    if lifetime_hours is None:
        return False
    if time.time() - process_start_time > lifetime_hours * 3600:
        logging.info('Process lifetime %0.3f hours exceeded. Shutting down.',
                     lifetime_hours)
        global _shutdown
        _shutdown = True
        return True
    return False


if __name__ == '__main__':
    main()