#!/usr/bin/python
#pylint: disable-msg=C0111

# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Host scheduler.

If run as a standalone service, the host scheduler ensures the following:
    1. Hosts will not be assigned to multiple hqes simultaneously. The process
       of assignment in this case refers to the modification of the host_id
       column of a row in the afe_host_queue_entries table, to reflect the
       host id of a leased host matching the dependencies of the job.
    2. Hosts that are not being used by active hqes or incomplete special
       tasks will be released back to the available hosts pool, for
       acquisition by subsequent hqes.
In addition to these guarantees, the host scheduler also confirms that no 2
active hqes/special tasks are assigned the same host, and sets the leased bit
for hosts needed by frontend special tasks. The need for the latter is only
apparent when viewed in the context of the job-scheduler (monitor_db), which
runs special tasks only after their hosts have been leased.

** Support minimum duts requirement for suites (non-inline mode) **

Each suite can specify the minimum number of duts it requires by
dropping a 'suite_min_duts' job keyval which defaults to 0.

When suites are competing for duts, if any suite has not got the minimum duts
it requires, the host scheduler will try to meet the requirement first,
even if other suites may have higher priority or an earlier timestamp. Once
all suites' minimum duts requirements have been fulfilled, the host
scheduler will allocate the rest of the duts based on job priority and suite
job id. This is to prevent low priority suites from starving when sharing a
pool with high-priority suites.

Note:
    1. Prevent potential starvation:
       We need to carefully choose |suite_min_duts| for both low and high
       priority suites. If a high priority suite didn't specify it but a low
       priority one does, the high priority suite can be starved!
    2. Restart requirement:
       Restart host scheduler if you manually released a host by setting
       leased=0 in db. This is needed because host scheduler maintains
       internal state of host assignment for suites.
    3. Exchanging duts triggers provisioning:
       TODO(fdeng): There is a chance two suites can exchange duts,
       if the two suites are for different builds, the exchange
       will trigger provisioning. This can be optimized by preferring getting
       hosts with the same build.
"""

import argparse
import collections
import datetime
import logging
import os
import signal
import sys
import time

import common
from autotest_lib.frontend import setup_django_environment

# This import needs to come earlier to avoid using autotest's version of
# httplib2, which is out of date.
try:
    from chromite.lib import metrics
    from chromite.lib import ts_mon_config
except ImportError:
    # The module-level import of utils further down has not executed yet when
    # this branch runs, so import it here; otherwise the fallback itself
    # fails with a NameError instead of installing the mock.
    from autotest_lib.client.common_lib import utils
    metrics = utils.metrics_mock
    ts_mon_config = utils.metrics_mock

from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.scheduler import email_manager
from autotest_lib.scheduler import query_managers
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import rdb_utils
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.scheduler import scheduler_models
from autotest_lib.site_utils import job_overhead
from autotest_lib.site_utils import metadata_reporter
from autotest_lib.site_utils import server_manager_utils


# Set by initialize(); disconnected in main()'s cleanup if it was created.
_db_manager = None
# Flipped to True by handle_signal() so the main loop exits between ticks.
_shutdown = False
_tick_pause_sec = global_config.global_config.get_config_value(
        'SCHEDULER', 'tick_pause_sec', type=int, default=5)
_monitor_db_host_acquisition = global_config.global_config.get_config_value(
        'SCHEDULER', 'inline_host_acquisition', type=bool, default=True)
_METRICS_PREFIX = 'chromeos/autotest/host_scheduler'


class SuiteRecorder(object):
    """Recording the host assignment for suites.

    The recorder holds two things:
        * suite_host_num, records how many duts a suite is holding,
          which is a map <suite_job_id -> num_of_hosts>
        * hosts_to_suites, records which host is assigned to which
          suite, it is a map <host_id -> suite_job_id>
    The two data structures get updated when a host is assigned to or
    released by a job.

    The reason to maintain hosts_to_suites is that, when a host is released,
    we need to know which suite it was leased to. Querying the db for the
    latest completed job that has run on a host is slow.  Therefore, we go
    with an alternative: keeping a <host id, suite job id> map
    in memory (for 10K hosts, the map should take less than 1M memory on
    64-bit machine with python 2.7)

    """


    def __init__(self, job_query_manager):
        """Initialize.

        The initial assignment state is loaded from
        job_query_manager.get_suite_host_assignment().

        @param job_query_manager: A JobQueryManager object.
        """
        self.job_query_manager = job_query_manager
        self.suite_host_num, self.hosts_to_suites = (
                self.job_query_manager.get_suite_host_assignment())


    def record_assignment(self, queue_entry):
        """Record that the hqe has got a host.

        Entries whose job has no parent job are ignored. If the host is
        already recorded against the same parent job, an error is logged and
        the counters are left untouched.

        @param queue_entry: A scheduler_models.HostQueueEntry object which has
                            got a host.
        """
        parent_id = queue_entry.job.parent_job_id
        if not parent_id:
            return
        if self.hosts_to_suites.get(queue_entry.host_id, None) == parent_id:
            logging.error('HQE (id: %d, parent_job_id: %d, host: %s) '
                          'seems already recorded', queue_entry.id,
                          parent_id, queue_entry.host.hostname)
            return
        num_hosts = self.suite_host_num.get(parent_id, 0)
        self.suite_host_num[parent_id] = num_hosts + 1
        self.hosts_to_suites[queue_entry.host_id] = parent_id
        logging.debug('Suite %d got host %s, currently holding %d hosts',
                      parent_id, queue_entry.host.hostname,
                      self.suite_host_num[parent_id])


    def record_release(self, hosts):
        """Update the record with host releasing event.

        Hosts that are not present in hosts_to_suites are skipped. A suite
        whose count drops to zero is removed from suite_host_num.

        @param hosts: A list of scheduler_models.Host objects.
        """
        for host in hosts:
            if host.id in self.hosts_to_suites:
                parent_job_id = self.hosts_to_suites.pop(host.id)
                count = self.suite_host_num[parent_job_id] - 1
                if count == 0:
                    del self.suite_host_num[parent_job_id]
                else:
                    self.suite_host_num[parent_job_id] = count
                logging.debug(
                        'Suite %d releases host %s, currently holding %d hosts',
                        parent_job_id, host.hostname, count)


    def get_min_duts(self, suite_job_ids):
        """Figure out min duts to request.

        Given a set ids of suite jobs, figure out minimum duts to request for
        each suite. It is determined by two factors: min_duts specified
        for each suite in its job keyvals, and how many duts a suite is
        currently holding.

        @param suite_job_ids: A set of suite job ids.

        @returns: A dictionary, the key is suite_job_id, the value
                  is the minimum number of duts to request.
        """
        suite_min_duts = self.job_query_manager.get_min_duts_of_suites(
                suite_job_ids)
        for parent_id in suite_job_ids:
            min_duts = suite_min_duts.get(parent_id, 0)
            cur_duts = self.suite_host_num.get(parent_id, 0)
            # Never request a negative number of duts for a suite that is
            # already holding more than its minimum.
            suite_min_duts[parent_id] = max(0, min_duts - cur_duts)
        logging.debug('Minimum duts to get for suites (suite_id: min_duts): %s',
                      suite_min_duts)
        return suite_min_duts


class BaseHostScheduler(object):
    """Base class containing host acquisition logic.

    This class contains all the core host acquisition logic needed by the
    scheduler to run jobs on hosts. It is only capable of releasing hosts
    back to the rdb through its tick, any other action must be instigated by
    the job scheduler.
    """


    host_assignment = collections.namedtuple('host_assignment', ['host', 'job'])


    def __init__(self):
        self.host_query_manager = query_managers.AFEHostQueryManager()


    def _release_hosts(self):
        """Release hosts to the RDB.

        Release all hosts that are ready and are currently not being used by an
        active hqe, and don't have a new special task scheduled against them.

        @return a list of hosts that are released.
        """
        release_hosts = self.host_query_manager.find_unused_healty_hosts()
        release_hostnames = [host.hostname for host in release_hosts]
        if release_hostnames:
            self.host_query_manager.set_leased(
                    False, hostname__in=release_hostnames)
        return release_hosts


    @classmethod
    def schedule_host_job(cls, host, queue_entry):
        """Schedule a job on a host.

        Scheduling a job involves:
            1. Setting the active bit on the queue_entry.
            2. Scheduling a special task on behalf of the queue_entry.
        Performing these actions will lead the job scheduler through a chain of
        events, culminating in running the test and collecting results from
        the host.

        @param host: The host against which to schedule the job.
        @param queue_entry: The queue_entry to schedule.

        @raises rdb_utils.RDBException: If the queue_entry already has a host
                and it differs from |host|.
        """
        if queue_entry.host_id is None:
            queue_entry.set_host(host)
        elif host.id != queue_entry.host_id:
            raise rdb_utils.RDBException('The rdb returned host: %s '
                    'but the job:%s was already assigned a host: %s. ' %
                    (host.hostname, queue_entry.job_id,
                     queue_entry.host.hostname))
        queue_entry.update_field('active', True)

        # TODO: crbug.com/373936. The host scheduler should only be assigning
        # jobs to hosts, but the criterion we use to release hosts depends
        # on it not being used by an active hqe. Since we're activating the
        # hqe here, we also need to schedule its first prejob task. OTOH,
        # we could converge to having the host scheduler manager all special
        # tasks, since their only use today is to verify/cleanup/reset a host.
        logging.info('Scheduling pre job tasks for entry: %s', queue_entry)
        queue_entry.schedule_pre_job_tasks()


    def acquire_hosts(self, host_jobs):
        """Acquire hosts for given jobs.

        This method sends jobs that need hosts to rdb.
        Child class can override this method to pipe more args
        to rdb.

        @param host_jobs: A list of queue entries that either require hosts,
            or require host assignment validation through the rdb.

        @return: A generator that yields an rdb_hosts.RDBClientHostWrapper
                 for each host acquired on behalf of a queue_entry,
                 or None if a host wasn't found.
        """
        return rdb_lib.acquire_hosts(host_jobs)


    def find_hosts_for_jobs(self, host_jobs):
        """Find and verify hosts for a list of jobs.

        @param host_jobs: A list of queue entries that either require hosts,
            or require host assignment validation through the rdb.
        @return: A generator of tuples of the form (host, queue_entry) for each
            valid host-queue_entry assignment.
        """
        hosts = self.acquire_hosts(host_jobs)
        # Results are paired with jobs by position; falsy hosts are dropped.
        for host, job in zip(hosts, host_jobs):
            if host:
                yield self.host_assignment(host, job)


    def tick(self):
        """Schedule core host management activities."""
        self._release_hosts()


class HostScheduler(BaseHostScheduler):
    """A scheduler capable managing host acquisition for new jobs."""


    def __init__(self):
        super(HostScheduler, self).__init__()
        self.job_query_manager = query_managers.AFEJobQueryManager()
        # Keeping track on how many hosts each suite is holding
        # {suite_job_id: num_hosts}
        self._suite_recorder = SuiteRecorder(self.job_query_manager)


    def _record_host_assignment(self, host, queue_entry):
        """Record that |host| is assigned to |queue_entry|.

        Record:
            1. How long it takes to assign a host to a job in metadata db.
            2. Record host assignment of a suite.

        @param host: A Host object.
        @param queue_entry: A HostQueueEntry object.
        """
        secs_in_queued = (datetime.datetime.now() -
                          queue_entry.job.created_on).total_seconds()
        job_overhead.record_state_duration(
                queue_entry.job_id, host.hostname,
                job_overhead.STATUS.QUEUED, secs_in_queued)
        self._suite_recorder.record_assignment(queue_entry)


    @metrics.SecondsTimerDecorator(
            '%s/schedule_jobs_duration' % _METRICS_PREFIX)
    def _schedule_jobs(self):
        """Schedule new jobs against hosts."""

        new_jobs_with_hosts = 0
        queue_entries = self.job_query_manager.get_pending_queue_entries(
                only_hostless=False)
        unverified_host_jobs = [job for job in queue_entries
                                if not job.is_hostless()]
        if unverified_host_jobs:
            for acquisition in self.find_hosts_for_jobs(unverified_host_jobs):
                self.schedule_host_job(acquisition.host, acquisition.job)
                self._record_host_assignment(acquisition.host, acquisition.job)
                new_jobs_with_hosts += 1
            metrics.Counter('%s/new_jobs_with_hosts' % _METRICS_PREFIX
                            ).increment_by(new_jobs_with_hosts)

        num_jobs_without_hosts = (len(unverified_host_jobs) -
                                  new_jobs_with_hosts)
        metrics.Gauge('%s/current_jobs_without_hosts' % _METRICS_PREFIX
                      ).set(num_jobs_without_hosts)

        metrics.Counter('%s/tick' % _METRICS_PREFIX).increment()


    @metrics.SecondsTimerDecorator('%s/lease_hosts_duration' % _METRICS_PREFIX)
    def _lease_hosts_of_frontend_tasks(self):
        """Lease hosts of tasks scheduled through the frontend."""
        # We really don't need to get all the special tasks here, just the ones
        # without hqes, but reusing the method used by the scheduler ensures
        # we prioritize the same way.
        lease_hostnames = [
                task.host.hostname for task in
                self.job_query_manager.get_prioritized_special_tasks(
                    only_tasks_with_leased_hosts=False)
                if task.queue_entry_id is None and not task.host.leased]
        # Leasing a leased hosts here shouldn't be a problem:
        # 1. The only way a host can be leased is if it's been assigned to
        #    an active hqe or another similar frontend task, but doing so will
        #    have already precluded it from the list of tasks returned by the
        #    job_query_manager.
        # 2. The unleasing is done based on global conditions. Eg: Even if a
        #    task has already leased a host and we lease it again, the
        #    host scheduler won't release the host till both tasks are complete.
        if lease_hostnames:
            self.host_query_manager.set_leased(
                    True, hostname__in=lease_hostnames)


    def acquire_hosts(self, host_jobs):
        """Override acquire_hosts.

        This method overrides the method in parent class.
        It figures out a set of suites that |host_jobs| belong to;
        and get min_duts requirement for each suite.
        It pipes min_duts for each suite to rdb.

        @param host_jobs: A list of queue entries that need hosts.

        @return: Whatever rdb_lib.acquire_hosts returns for |host_jobs| and
                 the computed per-suite minimum duts.
        """
        # Jobs without a parent job do not belong to a suite and are skipped.
        parent_job_ids = {q.job.parent_job_id
                          for q in host_jobs if q.job.parent_job_id}
        suite_min_duts = self._suite_recorder.get_min_duts(parent_job_ids)
        return rdb_lib.acquire_hosts(host_jobs, suite_min_duts)


    @metrics.SecondsTimerDecorator('%s/tick_time' % _METRICS_PREFIX)
    def tick(self):
        """Run one scheduling pass.

        Leases hosts for frontend tasks, schedules new jobs, releases unused
        hosts, updates the suite record with the released hosts and finally
        sends any queued emails.
        """
        logging.info('Calling new tick.')
        logging.info('Leasing hosts for frontend tasks.')
        self._lease_hosts_of_frontend_tasks()
        logging.info('Finding hosts for new jobs.')
        self._schedule_jobs()
        logging.info('Releasing unused hosts.')
        released_hosts = self._release_hosts()
        logging.info('Updating suite assignment with released hosts')
        self._suite_recorder.record_release(released_hosts)
        logging.info('Calling email_manager.')
        email_manager.manager.send_queued_emails()


class DummyHostScheduler(BaseHostScheduler):
    """A dummy host scheduler that doesn't acquire or release hosts."""

    def __init__(self):
        # Deliberately does not call the parent initializer, so no
        # host_query_manager is created for this scheduler.
        pass


    def tick(self):
        """Do nothing."""
        pass


def handle_signal(signum, frame):
    """Sigint handler so we don't crash mid-tick.

    @param signum: Signal number (unused).
    @param frame: Current stack frame (unused).
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize(testing=False):
    """Initialize the host scheduler.

    Sets up the db connection manager, logging, the SIGINT/SIGTERM handlers
    and the scheduler models.

    @param testing: If True, initialize a file database for testing first.
    """
    if testing:
        # Don't import testing utilities unless we're in testing mode,
        # as the database imports have side effects.
        from autotest_lib.scheduler import rdb_testing_utils
        rdb_testing_utils.FileDatabaseHelper().initialize_database_for_testing(
                db_file_path=rdb_testing_utils.FileDatabaseHelper.DB_FILE)
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            None, timestamped_logfile_prefix='host_scheduler')
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)
    scheduler_models.initialize()


def parse_arguments(argv):
    """
    Parse command line arguments

    @param argv: argument list to parse
    @returns:    parsed arguments.
    """
    parser = argparse.ArgumentParser(description='Host scheduler.')
    parser.add_argument('--testing', action='store_true', default=False,
                        help='Start the host scheduler in testing mode.')
    parser.add_argument('--production',
                        help=('Indicate that scheduler is running in production'
                              ' environment and it can use database that is not'
                              ' hosted in localhost. If it is set to False, '
                              'scheduler will fail if database is not in '
                              'localhost.'),
                        action='store_true', default=False)
    options = parser.parse_args(argv)

    return options


def main():
    """Run the host scheduler until a shutdown signal is received.

    Exits immediately with status 0 when inline host acquisition is enabled.
    Otherwise ticks a HostScheduler in a loop, sleeping so that each tick
    takes at least host_scheduler_minimum_tick_sec. Queued emails are sent,
    the db connection is closed and the metadata reporter is aborted on the
    way out, whether or not an exception occurred.
    """
    if _monitor_db_host_acquisition:
        logging.info('Please set inline_host_acquisition=False in the shadow '
                     'config before starting the host scheduler.')
        # The upstart job for the host scheduler understands exit(0) to mean
        # 'don't respawn'. This is desirable when the job scheduler is acquiring
        # hosts inline.
        sys.exit(0)
    try:
        options = parse_arguments(sys.argv[1:])
        scheduler_lib.check_production_settings(options)

        # If server database is enabled, check if the server has role
        # `host_scheduler`. If the server does not have host_scheduler role,
        # exception will be raised and host scheduler will not continue to run.
        if server_manager_utils.use_server_db():
            server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                         role='host_scheduler')

        initialize(options.testing)

        # Start the thread to report metadata.
        metadata_reporter.start()

        ts_mon_config.SetupTsMonGlobalState('autotest_host_scheduler')

        host_scheduler = HostScheduler()
        minimum_tick_sec = global_config.global_config.get_config_value(
                'SCHEDULER', 'host_scheduler_minimum_tick_sec', type=float)
        while not _shutdown:
            start = time.time()
            host_scheduler.tick()
            curr_tick_sec = time.time() - start
            if minimum_tick_sec > curr_tick_sec:
                # Pad short ticks up to the configured minimum duration.
                time.sleep(minimum_tick_sec - curr_tick_sec)
            else:
                time.sleep(0.0001)
    except server_manager_utils.ServerActionError:
        # This error is expected when the server is not in primary status
        # for host-scheduler role. Thus do not send email for it.
        raise
    except Exception:
        metrics.Counter('%s/uncaught_exception' % _METRICS_PREFIX).increment()
        raise
    finally:
        email_manager.manager.send_queued_emails()
        if _db_manager:
            _db_manager.disconnect()
        metadata_reporter.abort()


if __name__ == '__main__':
    main()