1 #!/usr/bin/python 2 3 #pylint: disable=C0111 4 5 """ 6 Autotest scheduler 7 """ 8 9 import datetime 10 import functools 11 import gc 12 import logging 13 import optparse 14 import os 15 import signal 16 import sys 17 import time 18 19 import common 20 from autotest_lib.frontend import setup_django_environment 21 22 import django.db 23 24 from autotest_lib.client.common_lib import control_data 25 from autotest_lib.client.common_lib import global_config 26 from autotest_lib.client.common_lib import utils 27 from autotest_lib.frontend.afe import models 28 from autotest_lib.scheduler import agent_task, drone_manager 29 from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler 30 from autotest_lib.scheduler import luciferlib 31 from autotest_lib.scheduler import monitor_db_cleanup, prejob_task 32 from autotest_lib.scheduler import postjob_task 33 from autotest_lib.scheduler import query_managers 34 from autotest_lib.scheduler import scheduler_lib 35 from autotest_lib.scheduler import scheduler_models 36 from autotest_lib.scheduler import scheduler_config 37 from autotest_lib.server import autoserv_utils 38 from autotest_lib.server import system_utils 39 from autotest_lib.server import utils as server_utils 40 from autotest_lib.site_utils import server_manager_utils 41 42 try: 43 from chromite.lib import metrics 44 from chromite.lib import ts_mon_config 45 except ImportError: 46 metrics = utils.metrics_mock 47 ts_mon_config = utils.metrics_mock 48 49 50 PID_FILE_PREFIX = 'monitor_db' 51 52 RESULTS_DIR = '.' 53 AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..') 54 55 if os.environ.has_key('AUTOTEST_DIR'): 56 AUTOTEST_PATH = os.environ['AUTOTEST_DIR'] 57 AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server') 58 AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko') 59 60 if AUTOTEST_SERVER_DIR not in sys.path: 61 sys.path.insert(0, AUTOTEST_SERVER_DIR) 62 63 # error message to leave in results dir when an autoserv process disappears 64 # mysteriously 65 _LOST_PROCESS_ERROR = """\ 66 Autoserv failed abnormally during execution for this job, probably due to a 67 system error on the Autotest server. Full results may not be available. Sorry. 68 """ 69 70 _db_manager = None 71 _db = None 72 _shutdown = False 73 74 # These 2 globals are replaced for testing 75 _autoserv_directory = autoserv_utils.autoserv_directory 76 _autoserv_path = autoserv_utils.autoserv_path 77 _testing_mode = False 78 _drone_manager = None 79 80 81 def _verify_default_drone_set_exists(): 82 if (models.DroneSet.drone_sets_enabled() and 83 not models.DroneSet.default_drone_set_name()): 84 raise scheduler_lib.SchedulerError( 85 'Drone sets are enabled, but no default is set') 86 87 88 def _sanity_check(): 89 """Make sure the configs are consistent before starting the scheduler""" 90 _verify_default_drone_set_exists() 91 92 93 def main(): 94 try: 95 try: 96 main_without_exception_handling() 97 except SystemExit: 98 raise 99 except: 100 logging.exception('Exception escaping in monitor_db') 101 raise 102 finally: 103 utils.delete_pid_file_if_exists(PID_FILE_PREFIX) 104 105 106 def main_without_exception_handling(): 107 scheduler_lib.setup_logging( 108 os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None), 109 os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)) 110 usage = 'usage: %prog [options] results_dir' 111 parser = optparse.OptionParser(usage) 112 parser.add_option('--recover-hosts', help='Try to recover dead hosts', 113 action='store_true') 114 parser.add_option('--test', help='Indicate that scheduler is under ' + 115 'test and should use dummy autoserv and no parsing', 116 action='store_true') 117 parser.add_option( 118 '--metrics-file', 119 help='If provided, drop metrics to this local file instead of ' 120 'reporting to ts_mon', 121 type=str, 122 default=None, 123 ) 124 parser.add_option( 125 '--lifetime-hours', 126 type=float, 127 default=None, 128 help='If provided, number of hours the scheduler should run for. ' 129 'At the expiry of this time, the process will exit ' 130 'gracefully.', 131 ) 132 parser.add_option('--production', 133 help=('Indicate that scheduler is running in production ' 134 'environment and it can use database that is not ' 135 'hosted in localhost. If it is set to False, ' 136 'scheduler will fail if database is not in ' 137 'localhost.'), 138 action='store_true', default=False) 139 (options, args) = parser.parse_args() 140 if len(args) != 1: 141 parser.print_usage() 142 return 143 144 scheduler_lib.check_production_settings(options) 145 146 scheduler_enabled = global_config.global_config.get_config_value( 147 scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool) 148 149 if not scheduler_enabled: 150 logging.error("Scheduler not enabled, set enable_scheduler to true in " 151 "the global_config's SCHEDULER section to enable it. " 152 "Exiting.") 153 sys.exit(1) 154 155 global RESULTS_DIR 156 RESULTS_DIR = args[0] 157 158 # Change the cwd while running to avoid issues incase we were launched from 159 # somewhere odd (such as a random NFS home directory of the person running 160 # sudo to launch us as the appropriate user). 161 os.chdir(RESULTS_DIR) 162 163 # This is helpful for debugging why stuff a scheduler launches is 164 # misbehaving. 165 logging.info('os.environ: %s', os.environ) 166 167 if options.test: 168 global _autoserv_path 169 _autoserv_path = 'autoserv_dummy' 170 global _testing_mode 171 _testing_mode = True 172 173 with ts_mon_config.SetupTsMonGlobalState('autotest_scheduler', 174 indirect=True, 175 debug_file=options.metrics_file): 176 try: 177 metrics.Counter('chromeos/autotest/scheduler/start').increment() 178 process_start_time = time.time() 179 initialize() 180 dispatcher = Dispatcher() 181 dispatcher.initialize(recover_hosts=options.recover_hosts) 182 minimum_tick_sec = global_config.global_config.get_config_value( 183 scheduler_config.CONFIG_SECTION, 'minimum_tick_sec', type=float) 184 185 while not _shutdown: 186 if _lifetime_expired(options.lifetime_hours, process_start_time): 187 break 188 189 start = time.time() 190 dispatcher.tick() 191 curr_tick_sec = time.time() - start 192 if minimum_tick_sec > curr_tick_sec: 193 time.sleep(minimum_tick_sec - curr_tick_sec) 194 else: 195 time.sleep(0.0001) 196 except server_manager_utils.ServerActionError as e: 197 # This error is expected when the server is not in primary status 198 # for scheduler role. Thus do not send email for it. 199 logging.exception(e) 200 except Exception: 201 logging.exception('Uncaught exception, terminating monitor_db.') 202 metrics.Counter('chromeos/autotest/scheduler/uncaught_exception' 203 ).increment() 204 205 email_manager.manager.send_queued_emails() 206 _drone_manager.shutdown() 207 _db_manager.disconnect() 208 209 210 def handle_signal(signum, frame): 211 global _shutdown 212 _shutdown = True 213 logging.info("Shutdown request received.") 214 215 216 def _lifetime_expired(lifetime_hours, process_start_time): 217 """Returns True if we've expired the process lifetime, False otherwise. 218 219 Also sets the global _shutdown so that any background processes also take 220 the cue to exit. 221 """ 222 if lifetime_hours is None: 223 return False 224 if time.time() - process_start_time > lifetime_hours * 3600: 225 logging.info('Process lifetime %0.3f hours exceeded. Shutting down.', 226 lifetime_hours) 227 global _shutdown 228 _shutdown = True 229 return True 230 return False 231 232 233 def initialize(): 234 logging.info("%s> dispatcher starting", time.strftime("%X %x")) 235 logging.info("My PID is %d", os.getpid()) 236 237 if utils.program_is_alive(PID_FILE_PREFIX): 238 logging.critical("monitor_db already running, aborting!") 239 sys.exit(1) 240 utils.write_pid(PID_FILE_PREFIX) 241 242 if _testing_mode: 243 global_config.global_config.override_config_value( 244 scheduler_lib.DB_CONFIG_SECTION, 'database', 245 'stresstest_autotest_web') 246 247 # If server database is enabled, check if the server has role `scheduler`. 248 # If the server does not have scheduler role, exception will be raised and 249 # scheduler will not continue to run. 250 if server_manager_utils.use_server_db(): 251 server_manager_utils.confirm_server_has_role(hostname='localhost', 252 role='scheduler') 253 254 os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH'] 255 global _db_manager 256 _db_manager = scheduler_lib.ConnectionManager() 257 global _db 258 _db = _db_manager.get_connection() 259 logging.info("Setting signal handler") 260 signal.signal(signal.SIGINT, handle_signal) 261 signal.signal(signal.SIGTERM, handle_signal) 262 263 initialize_globals() 264 scheduler_models.initialize() 265 266 drone_list = system_utils.get_drones() 267 results_host = global_config.global_config.get_config_value( 268 scheduler_config.CONFIG_SECTION, 'results_host', default='localhost') 269 _drone_manager.initialize(RESULTS_DIR, drone_list, results_host) 270 271 logging.info("Connected! Running...") 272 273 274 def initialize_globals(): 275 global _drone_manager 276 _drone_manager = drone_manager.instance() 277 278 279 def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None, 280 verbose=True): 281 """ 282 @returns The autoserv command line as a list of executable + parameters. 283 284 @param machines - string - A machine or comma separated list of machines 285 for the (-m) flag. 286 @param extra_args - list - Additional arguments to pass to autoserv. 287 @param job - Job object - If supplied, -u owner, -l name, --test-retry, 288 and client -c or server -s parameters will be added. 289 @param queue_entry - A HostQueueEntry object - If supplied and no Job 290 object was supplied, this will be used to lookup the Job object. 291 """ 292 command = autoserv_utils.autoserv_run_job_command(_autoserv_directory, 293 machines, results_directory=drone_manager.WORKING_DIRECTORY, 294 extra_args=extra_args, job=job, queue_entry=queue_entry, 295 verbose=verbose, in_lab=True) 296 return command 297 298 def _calls_log_tick_msg(func): 299 """Used to trace functions called by Dispatcher.tick.""" 300 @functools.wraps(func) 301 def wrapper(self, *args, **kwargs): 302 self._log_tick_msg('Starting %s' % func.__name__) 303 return func(self, *args, **kwargs) 304 305 return wrapper 306 307 308 class Dispatcher(object): 309 310 311 def __init__(self): 312 self._agents = [] 313 self._last_clean_time = time.time() 314 user_cleanup_time = scheduler_config.config.clean_interval_minutes 315 self._periodic_cleanup = monitor_db_cleanup.UserCleanup( 316 _db, user_cleanup_time) 317 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep( 318 _db, _drone_manager) 319 self._host_agents = {} 320 self._queue_entry_agents = {} 321 self._tick_count = 0 322 self._last_garbage_stats_time = time.time() 323 self._seconds_between_garbage_stats = 60 * ( 324 global_config.global_config.get_config_value( 325 scheduler_config.CONFIG_SECTION, 326 'gc_stats_interval_mins', type=int, default=6*60)) 327 self._tick_debug = global_config.global_config.get_config_value( 328 scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool, 329 default=False) 330 self._extra_debugging = global_config.global_config.get_config_value( 331 scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool, 332 default=False) 333 self._inline_host_acquisition = ( 334 global_config.global_config.get_config_value( 335 scheduler_config.CONFIG_SECTION, 336 'inline_host_acquisition', type=bool, default=True)) 337 338 # If _inline_host_acquisition is set the scheduler will acquire and 339 # release hosts against jobs inline, with the tick. Otherwise the 340 # scheduler will only focus on jobs that already have hosts, and 341 # will not explicitly unlease a host when a job finishes using it. 342 self._job_query_manager = query_managers.AFEJobQueryManager() 343 self._host_scheduler = (host_scheduler.BaseHostScheduler() 344 if self._inline_host_acquisition else 345 host_scheduler.DummyHostScheduler()) 346 347 348 def initialize(self, recover_hosts=True): 349 self._periodic_cleanup.initialize() 350 self._24hr_upkeep.initialize() 351 # Execute all actions queued in the cleanup tasks. Scheduler tick will 352 # run a refresh task first. If there is any action in the queue, refresh 353 # will raise an exception. 354 _drone_manager.execute_actions() 355 356 # always recover processes 357 self._recover_processes() 358 359 if recover_hosts: 360 self._recover_hosts() 361 362 363 # TODO(pprabhu) Drop this metric once tick_times has been verified. 364 @metrics.SecondsTimerDecorator( 365 'chromeos/autotest/scheduler/tick_durations/tick') 366 def tick(self): 367 """ 368 This is an altered version of tick() where we keep track of when each 369 major step begins so we can try to figure out where we are using most 370 of the tick time. 371 """ 372 with metrics.RuntimeBreakdownTimer( 373 'chromeos/autotest/scheduler/tick_times') as breakdown_timer: 374 self._log_tick_msg('New tick') 375 system_utils.DroneCache.refresh() 376 377 with breakdown_timer.Step('garbage_collection'): 378 self._garbage_collection() 379 with breakdown_timer.Step('trigger_refresh'): 380 self._log_tick_msg('Starting _drone_manager.trigger_refresh') 381 _drone_manager.trigger_refresh() 382 with breakdown_timer.Step('schedule_running_host_queue_entries'): 383 self._schedule_running_host_queue_entries() 384 with breakdown_timer.Step('schedule_special_tasks'): 385 self._schedule_special_tasks() 386 with breakdown_timer.Step('schedule_new_jobs'): 387 self._schedule_new_jobs() 388 with breakdown_timer.Step('gather_tick_metrics'): 389 self._gather_tick_metrics() 390 with breakdown_timer.Step('sync_refresh'): 391 self._log_tick_msg('Starting _drone_manager.sync_refresh') 392 _drone_manager.sync_refresh() 393 if luciferlib.is_lucifer_enabled(): 394 with breakdown_timer.Step('send_to_lucifer'): 395 self._send_to_lucifer() 396 # _run_cleanup must be called between drone_manager.sync_refresh, 397 # and drone_manager.execute_actions, as sync_refresh will clear the 398 # calls queued in drones. Therefore, any action that calls 399 # drone.queue_call to add calls to the drone._calls, should be after 400 # drone refresh is completed and before 401 # drone_manager.execute_actions at the end of the tick. 402 with breakdown_timer.Step('run_cleanup'): 403 self._run_cleanup() 404 with breakdown_timer.Step('find_aborting'): 405 self._find_aborting() 406 with breakdown_timer.Step('find_aborted_special_tasks'): 407 self._find_aborted_special_tasks() 408 with breakdown_timer.Step('handle_agents'): 409 self._handle_agents() 410 with breakdown_timer.Step('host_scheduler_tick'): 411 self._log_tick_msg('Starting _host_scheduler.tick') 412 self._host_scheduler.tick() 413 with breakdown_timer.Step('drones_execute_actions'): 414 self._log_tick_msg('Starting _drone_manager.execute_actions') 415 _drone_manager.execute_actions() 416 with breakdown_timer.Step('send_queued_emails'): 417 self._log_tick_msg( 418 'Starting email_manager.manager.send_queued_emails') 419 email_manager.manager.send_queued_emails() 420 with breakdown_timer.Step('db_reset_queries'): 421 self._log_tick_msg('Starting django.db.reset_queries') 422 django.db.reset_queries() 423 424 self._tick_count += 1 425 metrics.Counter('chromeos/autotest/scheduler/tick').increment() 426 427 428 @_calls_log_tick_msg 429 def _run_cleanup(self): 430 self._periodic_cleanup.run_cleanup_maybe() 431 self._24hr_upkeep.run_cleanup_maybe() 432 433 434 @_calls_log_tick_msg 435 def _garbage_collection(self): 436 threshold_time = time.time() - self._seconds_between_garbage_stats 437 if threshold_time < self._last_garbage_stats_time: 438 # Don't generate these reports very often. 439 return 440 441 self._last_garbage_stats_time = time.time() 442 # Force a full level 0 collection (because we can, it doesn't hurt 443 # at this interval). 444 gc.collect() 445 logging.info('Logging garbage collector stats on tick %d.', 446 self._tick_count) 447 gc_stats._log_garbage_collector_stats() 448 449 450 def _gather_tick_metrics(self): 451 """Gather metrics during tick, after all tasks have been scheduled.""" 452 metrics.Gauge( 453 'chromeos/autotest/scheduler/agent_count' 454 ).set(len(self._agents)) 455 456 457 def _register_agent_for_ids(self, agent_dict, object_ids, agent): 458 for object_id in object_ids: 459 agent_dict.setdefault(object_id, set()).add(agent) 460 461 462 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent): 463 for object_id in object_ids: 464 assert object_id in agent_dict 465 agent_dict[object_id].remove(agent) 466 # If an ID has no more active agent associated, there is no need to 467 # keep it in the dictionary. Otherwise, scheduler will keep an 468 # unnecessarily big dictionary until being restarted. 469 if not agent_dict[object_id]: 470 agent_dict.pop(object_id) 471 472 473 def add_agent_task(self, agent_task): 474 """ 475 Creates and adds an agent to the dispatchers list. 476 477 In creating the agent we also pass on all the queue_entry_ids and 478 host_ids from the special agent task. For every agent we create, we 479 add it to 1. a dict against the queue_entry_ids given to it 2. A dict 480 against the host_ids given to it. So theoritically, a host can have any 481 number of agents associated with it, and each of them can have any 482 special agent task, though in practice we never see > 1 agent/task per 483 host at any time. 484 485 @param agent_task: A SpecialTask for the agent to manage. 486 """ 487 # These are owned by lucifer; don't manage these tasks. 488 if (luciferlib.is_enabled_for('GATHERING') 489 and (isinstance(agent_task, postjob_task.GatherLogsTask) 490 # TODO(crbug.com/811877): Don't skip split HQE parsing. 491 or (isinstance(agent_task, postjob_task.FinalReparseTask) 492 and not luciferlib.is_split_job( 493 agent_task.queue_entries[0].id)))): 494 return 495 if luciferlib.is_enabled_for('STARTING'): 496 # TODO(crbug.com/810141): Transition code. After running at 497 # STARTING for a while, these tasks should no longer exist. 498 if (isinstance(agent_task, postjob_task.GatherLogsTask) 499 # TODO(crbug.com/811877): Don't skip split HQE parsing. 500 or (isinstance(agent_task, postjob_task.FinalReparseTask) 501 and not luciferlib.is_split_job( 502 agent_task.queue_entries[0].id))): 503 return 504 # If this AgentTask is already started (i.e., recovered from 505 # the scheduler running previously not at STARTING lucifer 506 # level), we want to use the AgentTask to run the test to 507 # completion. 508 if (isinstance(agent_task, postjob_task.AbstractQueueTask) 509 and not agent_task.started): 510 return 511 512 agent = Agent(agent_task) 513 self._agents.append(agent) 514 agent.dispatcher = self 515 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent) 516 self._register_agent_for_ids(self._queue_entry_agents, 517 agent.queue_entry_ids, agent) 518 519 520 def get_agents_for_entry(self, queue_entry): 521 """ 522 Find agents corresponding to the specified queue_entry. 523 """ 524 return list(self._queue_entry_agents.get(queue_entry.id, set())) 525 526 527 def host_has_agent(self, host): 528 """ 529 Determine if there is currently an Agent present using this host. 530 """ 531 return bool(self._host_agents.get(host.id, None)) 532 533 534 def remove_agent(self, agent): 535 self._agents.remove(agent) 536 self._unregister_agent_for_ids(self._host_agents, agent.host_ids, 537 agent) 538 self._unregister_agent_for_ids(self._queue_entry_agents, 539 agent.queue_entry_ids, agent) 540 541 542 def _host_has_scheduled_special_task(self, host): 543 return bool(models.SpecialTask.objects.filter(host__id=host.id, 544 is_active=False, 545 is_complete=False)) 546 547 548 def _recover_processes(self): 549 agent_tasks = self._create_recovery_agent_tasks() 550 self._register_pidfiles(agent_tasks) 551 _drone_manager.refresh() 552 self._recover_tasks(agent_tasks) 553 self._recover_pending_entries() 554 self._check_for_unrecovered_verifying_entries() 555 self._reverify_remaining_hosts() 556 # reinitialize drones after killing orphaned processes, since they can 557 # leave around files when they die 558 _drone_manager.execute_actions() 559 _drone_manager.reinitialize_drones() 560 561 562 def _create_recovery_agent_tasks(self): 563 return (self._get_queue_entry_agent_tasks() 564 + self._get_special_task_agent_tasks(is_active=True)) 565 566 567 def _get_queue_entry_agent_tasks(self): 568 """ 569 Get agent tasks for all hqe in the specified states. 570 571 Loosely this translates to taking a hqe in one of the specified states, 572 say parsing, and getting an AgentTask for it, like the FinalReparseTask, 573 through _get_agent_task_for_queue_entry. Each queue entry can only have 574 one agent task at a time, but there might be multiple queue entries in 575 the group. 576 577 @return: A list of AgentTasks. 578 """ 579 # host queue entry statuses handled directly by AgentTasks 580 # (Verifying is handled through SpecialTasks, so is not 581 # listed here) 582 statuses = (models.HostQueueEntry.Status.STARTING, 583 models.HostQueueEntry.Status.RUNNING, 584 models.HostQueueEntry.Status.GATHERING, 585 models.HostQueueEntry.Status.PARSING) 586 status_list = ','.join("'%s'" % status for status in statuses) 587 queue_entries = scheduler_models.HostQueueEntry.fetch( 588 where='status IN (%s)' % status_list) 589 590 agent_tasks = [] 591 used_queue_entries = set() 592 hqe_count_by_status = {} 593 for entry in queue_entries: 594 try: 595 hqe_count_by_status[entry.status] = ( 596 hqe_count_by_status.get(entry.status, 0) + 1) 597 if self.get_agents_for_entry(entry): 598 # already being handled 599 continue 600 if entry in used_queue_entries: 601 # already picked up by a synchronous job 602 continue 603 try: 604 agent_task = self._get_agent_task_for_queue_entry(entry) 605 except scheduler_lib.SchedulerError: 606 # Probably being handled by lucifer crbug.com/809773 607 continue 608 agent_tasks.append(agent_task) 609 used_queue_entries.update(agent_task.queue_entries) 610 except scheduler_lib.MalformedRecordError as e: 611 logging.exception('Skipping agent task for a malformed hqe.') 612 # TODO(akeshet): figure out a way to safely permanently discard 613 # this errant HQE. It appears that calling entry.abort() is not 614 # sufficient, as that already makes some assumptions about 615 # record sanity that may be violated. See crbug.com/739530 for 616 # context. 617 m = 'chromeos/autotest/scheduler/skipped_malformed_hqe' 618 metrics.Counter(m).increment() 619 620 for status, count in hqe_count_by_status.iteritems(): 621 metrics.Gauge( 622 'chromeos/autotest/scheduler/active_host_queue_entries' 623 ).set(count, fields={'status': status}) 624 625 return agent_tasks 626 627 628 def _get_special_task_agent_tasks(self, is_active=False): 629 special_tasks = models.SpecialTask.objects.filter( 630 is_active=is_active, is_complete=False) 631 agent_tasks = [] 632 for task in special_tasks: 633 try: 634 agent_tasks.append(self._get_agent_task_for_special_task(task)) 635 except scheduler_lib.MalformedRecordError as e: 636 logging.exception('Skipping agent task for malformed special ' 637 'task.') 638 m = 'chromeos/autotest/scheduler/skipped_malformed_special_task' 639 metrics.Counter(m).increment() 640 return agent_tasks 641 642 643 def _get_agent_task_for_queue_entry(self, queue_entry): 644 """ 645 Construct an AgentTask instance for the given active HostQueueEntry. 646 647 @param queue_entry: a HostQueueEntry 648 @return: an AgentTask to run the queue entry 649 """ 650 task_entries = queue_entry.job.get_group_entries(queue_entry) 651 self._check_for_duplicate_host_entries(task_entries) 652 653 if queue_entry.status in (models.HostQueueEntry.Status.STARTING, 654 models.HostQueueEntry.Status.RUNNING): 655 if queue_entry.is_hostless(): 656 return HostlessQueueTask(queue_entry=queue_entry) 657 return QueueTask(queue_entries=task_entries) 658 if queue_entry.status == models.HostQueueEntry.Status.GATHERING: 659 return postjob_task.GatherLogsTask(queue_entries=task_entries) 660 if queue_entry.status == models.HostQueueEntry.Status.PARSING: 661 return postjob_task.FinalReparseTask(queue_entries=task_entries) 662 663 raise scheduler_lib.MalformedRecordError( 664 '_get_agent_task_for_queue_entry got entry with ' 665 'invalid status %s: %s' % (queue_entry.status, queue_entry)) 666 667 668 def _check_for_duplicate_host_entries(self, task_entries): 669 non_host_statuses = {models.HostQueueEntry.Status.PARSING} 670 for task_entry in task_entries: 671 using_host = (task_entry.host is not None 672 and task_entry.status not in non_host_statuses) 673 if using_host: 674 self._assert_host_has_no_agent(task_entry) 675 676 677 def _assert_host_has_no_agent(self, entry): 678 """ 679 @param entry: a HostQueueEntry or a SpecialTask 680 """ 681 if self.host_has_agent(entry.host): 682 agent = tuple(self._host_agents.get(entry.host.id))[0] 683 raise scheduler_lib.MalformedRecordError( 684 'While scheduling %s, host %s already has a host agent %s' 685 % (entry, entry.host, agent.task)) 686 687 688 def _get_agent_task_for_special_task(self, special_task): 689 """ 690 Construct an AgentTask class to run the given SpecialTask and add it 691 to this dispatcher. 692 693 A special task is created through schedule_special_tasks, but only if 694 the host doesn't already have an agent. This happens through 695 add_agent_task. All special agent tasks are given a host on creation, 696 and a Null hqe. To create a SpecialAgentTask object, you need a 697 models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask 698 object contains a hqe it's passed on to the special agent task, which 699 creates a HostQueueEntry and saves it as it's queue_entry. 700 701 @param special_task: a models.SpecialTask instance 702 @returns an AgentTask to run this SpecialTask 703 """ 704 self._assert_host_has_no_agent(special_task) 705 706 special_agent_task_classes = (prejob_task.CleanupTask, 707 prejob_task.VerifyTask, 708 prejob_task.RepairTask, 709 prejob_task.ResetTask, 710 prejob_task.ProvisionTask) 711 712 for agent_task_class in special_agent_task_classes: 713 if agent_task_class.TASK_TYPE == special_task.task: 714 return agent_task_class(task=special_task) 715 716 raise scheduler_lib.MalformedRecordError( 717 'No AgentTask class for task', str(special_task)) 718 719 720 def _register_pidfiles(self, agent_tasks): 721 for agent_task in agent_tasks: 722 agent_task.register_necessary_pidfiles() 723 724 725 def _recover_tasks(self, agent_tasks): 726 orphans = _drone_manager.get_orphaned_autoserv_processes() 727 728 for agent_task in agent_tasks: 729 agent_task.recover() 730 if agent_task.monitor and agent_task.monitor.has_process(): 731 orphans.discard(agent_task.monitor.get_process()) 732 self.add_agent_task(agent_task) 733 734 self._check_for_remaining_orphan_processes(orphans) 735 736 737 def _get_unassigned_entries(self, status): 738 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'" 739 % status): 740 if entry.status == status and not self.get_agents_for_entry(entry): 741 # The status can change during iteration, e.g., if job.run() 742 # sets a group of queue entries to Starting 743 yield entry 744 745 746 def _check_for_remaining_orphan_processes(self, orphans): 747 m = 'chromeos/autotest/errors/unrecovered_orphan_processes' 748 metrics.Gauge(m).set(len(orphans)) 749 750 if not orphans: 751 return 752 subject = 'Unrecovered orphan autoserv processes remain' 753 message = '\n'.join(str(process) for process in orphans) 754 die_on_orphans = global_config.global_config.get_config_value( 755 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool) 756 757 if die_on_orphans: 758 raise RuntimeError(subject + '\n' + message) 759 760 761 def _recover_pending_entries(self): 762 for entry in self._get_unassigned_entries( 763 models.HostQueueEntry.Status.PENDING): 764 logging.info('Recovering Pending entry %s', entry) 765 try: 766 entry.on_pending() 767 except scheduler_lib.MalformedRecordError as e: 768 logging.exception( 769 'Skipping agent task for malformed special task.') 770 m = 'chromeos/autotest/scheduler/skipped_malformed_special_task' 771 metrics.Counter(m).increment() 772 773 774 def _check_for_unrecovered_verifying_entries(self): 775 # Verify is replaced by Reset. 776 queue_entries = scheduler_models.HostQueueEntry.fetch( 777 where='status = "%s"' % models.HostQueueEntry.Status.RESETTING) 778 for queue_entry in queue_entries: 779 special_tasks = models.SpecialTask.objects.filter( 780 task__in=(models.SpecialTask.Task.CLEANUP, 781 models.SpecialTask.Task.VERIFY, 782 models.SpecialTask.Task.RESET), 783 queue_entry__id=queue_entry.id, 784 is_complete=False) 785 if special_tasks.count() == 0: 786 logging.error('Unrecovered Resetting host queue entry: %s. ' 787 'Setting status to Queued.', str(queue_entry)) 788 # Essentially this host queue entry was set to be Verifying 789 # however no special task exists for entry. This occurs if the 790 # scheduler dies between changing the status and creating the 791 # special task. By setting it to queued, the job can restart 792 # from the beginning and proceed correctly. This is much more 793 # preferable than having monitor_db not launching. 794 queue_entry.set_status('Queued') 795 796 797 @_calls_log_tick_msg 798 def _schedule_special_tasks(self): 799 """ 800 Execute queued SpecialTasks that are ready to run on idle hosts. 801 802 Special tasks include PreJobTasks like verify, reset and cleanup. 803 They are created through _schedule_new_jobs and associated with a hqe 804 This method translates SpecialTasks to the appropriate AgentTask and 805 adds them to the dispatchers agents list, so _handle_agents can execute 806 them. 807 """ 808 # When the host scheduler is responsible for acquisition we only want 809 # to run tasks with leased hosts. All hqe tasks will already have 810 # leased hosts, and we don't want to run frontend tasks till the host 811 # scheduler has vetted the assignment. Note that this doesn't include 812 # frontend tasks with hosts leased by other active hqes. 813 for task in self._job_query_manager.get_prioritized_special_tasks( 814 only_tasks_with_leased_hosts=not self._inline_host_acquisition): 815 if self.host_has_agent(task.host): 816 continue 817 try: 818 self.add_agent_task(self._get_agent_task_for_special_task(task)) 819 except scheduler_lib.MalformedRecordError: 820 logging.exception('Skipping schedule for malformed ' 821 'special task.') 822 m = 'chromeos/autotest/scheduler/skipped_schedule_special_task' 823 metrics.Counter(m).increment() 824 825 826 def _reverify_remaining_hosts(self): 827 # recover active hosts that have not yet been recovered, although this 828 # should never happen 829 message = ('Recovering active host %s - this probably indicates a ' 830 'scheduler bug') 831 self._reverify_hosts_where( 832 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')", 833 print_message=message) 834 835 836 DEFAULT_REQUESTED_BY_USER_ID = 1 837 838 839 def _reverify_hosts_where(self, where, 840 print_message='Reverifying host %s'): 841 full_where='locked = 0 AND invalid = 0 AND ' + where 842 for host in scheduler_models.Host.fetch(where=full_where): 843 if self.host_has_agent(host): 844 # host has already been recovered in some way 845 continue 846 if self._host_has_scheduled_special_task(host): 847 # host will have a special task scheduled on the next cycle 848 continue 849 if print_message: 850 logging.error(print_message, host.hostname) 851 try: 852 user = models.User.objects.get(login='autotest_system') 853 except models.User.DoesNotExist: 854 user = models.User.objects.get( 855 id=self.DEFAULT_REQUESTED_BY_USER_ID) 856 models.SpecialTask.objects.create( 857 task=models.SpecialTask.Task.RESET, 858 host=models.Host.objects.get(id=host.id), 859 requested_by=user) 860 861 862 def _recover_hosts(self): 863 # recover "Repair Failed" hosts 864 message = 'Reverifying dead host %s' 865 self._reverify_hosts_where("status = 'Repair Failed'", 866 print_message=message) 867 868 869 def _refresh_pending_queue_entries(self): 870 """ 871 Lookup the pending HostQueueEntries and call our HostScheduler 872 refresh() method given that list. Return the list. 873 874 @returns A list of pending HostQueueEntries sorted in priority order. 875 """ 876 queue_entries = self._job_query_manager.get_pending_queue_entries( 877 only_hostless=not self._inline_host_acquisition) 878 if not queue_entries: 879 return [] 880 return queue_entries 881 882 883 def _schedule_hostless_job(self, queue_entry): 884 """Schedule a hostless (suite) job. 885 886 @param queue_entry: The queue_entry representing the hostless job. 887 """ 888 if not luciferlib.is_enabled_for('STARTING'): 889 self.add_agent_task(HostlessQueueTask(queue_entry)) 890 891 # Need to set execution_subdir before setting the status: 892 # After a restart of the scheduler, agents will be restored for HQEs in 893 # Starting, Running, Gathering, Parsing or Archiving. To do this, the 894 # execution_subdir is needed. Therefore it must be set before entering 895 # one of these states. 896 # Otherwise, if the scheduler was interrupted between setting the status 897 # and the execution_subdir, upon it's restart restoring agents would 898 # fail. 899 # Is there a way to get a status in one of these states without going 900 # through this code? Following cases are possible: 901 # - If it's aborted before being started: 902 # active bit will be 0, so there's nothing to parse, it will just be 903 # set to completed by _find_aborting. Critical statuses are skipped. 904 # - If it's aborted or it fails after being started: 905 # It was started, so this code was executed. 906 queue_entry.update_field('execution_subdir', 'hostless') 907 queue_entry.set_status(models.HostQueueEntry.Status.STARTING) 908 909 910 def _schedule_host_job(self, host, queue_entry): 911 """Schedules a job on the given host. 912 913 1. Assign the host to the hqe, if it isn't already assigned. 914 2. Create a SpecialAgentTask for the hqe. 915 3. Activate the hqe. 916 917 @param queue_entry: The job to schedule. 918 @param host: The host to schedule the job on. 919 """ 920 if self.host_has_agent(host): 921 host_agent_task = list(self._host_agents.get(host.id))[0].task 922 else: 923 self._host_scheduler.schedule_host_job(host, queue_entry) 924 925 926 @_calls_log_tick_msg 927 def _schedule_new_jobs(self): 928 """ 929 Find any new HQEs and call schedule_pre_job_tasks for it. 930 931 This involves setting the status of the HQE and creating a row in the 932 db corresponding the the special task, through 933 scheduler_models._queue_special_task. The new db row is then added as 934 an agent to the dispatcher through _schedule_special_tasks and 935 scheduled for execution on the drone through _handle_agents. 936 """ 937 queue_entries = self._refresh_pending_queue_entries() 938 939 key = 'scheduler.jobs_per_tick' 940 new_hostless_jobs = 0 941 new_jobs_with_hosts = 0 942 new_jobs_need_hosts = 0 943 host_jobs = [] 944 logging.debug('Processing %d queue_entries', len(queue_entries)) 945 946 for queue_entry in queue_entries: 947 if queue_entry.is_hostless(): 948 self._schedule_hostless_job(queue_entry) 949 new_hostless_jobs = new_hostless_jobs + 1 950 else: 951 host_jobs.append(queue_entry) 952 new_jobs_need_hosts = new_jobs_need_hosts + 1 953 954 metrics.Counter( 955 'chromeos/autotest/scheduler/scheduled_jobs_hostless' 956 ).increment_by(new_hostless_jobs) 957 958 if not host_jobs: 959 return 960 961 if not self._inline_host_acquisition: 962 # In this case, host_scheduler is responsible for scheduling 963 # host_jobs. Scheduling the jobs ourselves can lead to DB corruption 964 # since host_scheduler assumes it is the single process scheduling 965 # host jobs. 966 metrics.Gauge( 967 'chromeos/autotest/errors/scheduler/unexpected_host_jobs').set( 968 len(host_jobs)) 969 return 970 971 jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs) 972 for host_assignment in jobs_with_hosts: 973 self._schedule_host_job(host_assignment.host, host_assignment.job) 974 new_jobs_with_hosts = new_jobs_with_hosts + 1 975 976 metrics.Counter( 977 'chromeos/autotest/scheduler/scheduled_jobs_with_hosts' 978 ).increment_by(new_jobs_with_hosts) 979 980 981 @_calls_log_tick_msg 982 def _send_to_lucifer(self): 983 """ 984 Hand off ownership of a job to lucifer component. 985 """ 986 if luciferlib.is_enabled_for('starting'): 987 self._send_starting_to_lucifer() 988 # TODO(crbug.com/810141): Older states need to be supported when 989 # STARTING is toggled; some jobs may be in an intermediate state 990 # at that moment. 991 self._send_gathering_to_lucifer() 992 self._send_parsing_to_lucifer() 993 994 995 # TODO(crbug.com/748234): This is temporary to enable toggling 996 # lucifer rollouts with an option. 997 def _send_starting_to_lucifer(self): 998 Status = models.HostQueueEntry.Status 999 queue_entries_qs = (models.HostQueueEntry.objects 1000 .filter(status=Status.STARTING)) 1001 for queue_entry in queue_entries_qs: 1002 if self.get_agents_for_entry(queue_entry): 1003 continue 1004 job = queue_entry.job 1005 if luciferlib.is_lucifer_owned(job): 1006 continue 1007 drone = luciferlib.spawn_starting_job_handler( 1008 manager=_drone_manager, 1009 job=job) 1010 models.JobHandoff.objects.create(job=job, drone=drone.hostname()) 1011 1012 1013 # TODO(crbug.com/748234): This is temporary to enable toggling 1014 # lucifer rollouts with an option. 1015 def _send_gathering_to_lucifer(self): 1016 Status = models.HostQueueEntry.Status 1017 queue_entries_qs = (models.HostQueueEntry.objects 1018 .filter(status=Status.GATHERING)) 1019 for queue_entry in queue_entries_qs: 1020 # If this HQE already has an agent, let monitor_db continue 1021 # owning it. 1022 if self.get_agents_for_entry(queue_entry): 1023 continue 1024 1025 job = queue_entry.job 1026 if luciferlib.is_lucifer_owned(job): 1027 continue 1028 task = postjob_task.PostJobTask( 1029 [queue_entry], log_file_name='/dev/null') 1030 pidfile_id = task._autoserv_monitor.pidfile_id 1031 autoserv_exit = task._autoserv_monitor.exit_code() 1032 try: 1033 drone = luciferlib.spawn_gathering_job_handler( 1034 manager=_drone_manager, 1035 job=job, 1036 autoserv_exit=autoserv_exit, 1037 pidfile_id=pidfile_id) 1038 models.JobHandoff.objects.create(job=job, 1039 drone=drone.hostname()) 1040 except drone_manager.DroneManagerError as e: 1041 logging.warning( 1042 'Fail to get drone for job %s, skipping lucifer. Error: %s', 1043 job.id, e) 1044 1045 1046 # TODO(crbug.com/748234): This is temporary to enable toggling 1047 # lucifer rollouts with an option. 1048 def _send_parsing_to_lucifer(self): 1049 Status = models.HostQueueEntry.Status 1050 queue_entries_qs = (models.HostQueueEntry.objects 1051 .filter(status=Status.PARSING)) 1052 for queue_entry in queue_entries_qs: 1053 # If this HQE already has an agent, let monitor_db continue 1054 # owning it. 1055 if self.get_agents_for_entry(queue_entry): 1056 continue 1057 job = queue_entry.job 1058 if luciferlib.is_lucifer_owned(job): 1059 continue 1060 # TODO(crbug.com/811877): Ignore split HQEs. 1061 if luciferlib.is_split_job(queue_entry.id): 1062 continue 1063 task = postjob_task.PostJobTask( 1064 [queue_entry], log_file_name='/dev/null') 1065 pidfile_id = task._autoserv_monitor.pidfile_id 1066 autoserv_exit = task._autoserv_monitor.exit_code() 1067 try: 1068 drone = luciferlib.spawn_parsing_job_handler( 1069 manager=_drone_manager, 1070 job=job, 1071 autoserv_exit=autoserv_exit, 1072 pidfile_id=pidfile_id) 1073 models.JobHandoff.objects.create(job=job, 1074 drone=drone.hostname()) 1075 except drone_manager.DroneManagerError as e: 1076 logging.warning( 1077 'Fail to get drone for job %s, skipping lucifer. Error: %s', 1078 job.id, e) 1079 1080 1081 1082 @_calls_log_tick_msg 1083 def _schedule_running_host_queue_entries(self): 1084 """ 1085 Adds agents to the dispatcher. 1086 1087 Any AgentTask, like the QueueTask, is wrapped in an Agent. The 1088 QueueTask for example, will have a job with a control file, and 1089 the agent will have methods that poll, abort and check if the queue 1090 task is finished. The dispatcher runs the agent_task, as well as 1091 other agents in it's _agents member, through _handle_agents, by 1092 calling the Agents tick(). 1093 1094 This method creates an agent for each HQE in one of (starting, running, 1095 gathering, parsing) states, and adds it to the dispatcher so 1096 it is handled by _handle_agents. 1097 """ 1098 for agent_task in self._get_queue_entry_agent_tasks(): 1099 self.add_agent_task(agent_task) 1100 1101 1102 @_calls_log_tick_msg 1103 def _find_aborting(self): 1104 """ 1105 Looks through the afe_host_queue_entries for an aborted entry. 1106 1107 The aborted bit is set on an HQE in many ways, the most common 1108 being when a user requests an abort through the frontend, which 1109 results in an rpc from the afe to abort_host_queue_entries. 1110 """ 1111 jobs_to_stop = set() 1112 for entry in scheduler_models.HostQueueEntry.fetch( 1113 where='aborted=1 and complete=0'): 1114 1115 # If the job is running on a shard, let the shard handle aborting 1116 # it and sync back the right status. 1117 if entry.job.shard_id is not None and not server_utils.is_shard(): 1118 logging.info('Waiting for shard %s to abort hqe %s', 1119 entry.job.shard_id, entry) 1120 continue 1121 1122 logging.info('Aborting %s', entry) 1123 1124 # The task would have started off with both is_complete and 1125 # is_active = False. Aborted tasks are neither active nor complete. 1126 # For all currently active tasks this will happen through the agent, 1127 # but we need to manually update the special tasks that haven't 1128 # started yet, because they don't have agents. 1129 models.SpecialTask.objects.filter(is_active=False, 1130 queue_entry_id=entry.id).update(is_complete=True) 1131 1132 for agent in self.get_agents_for_entry(entry): 1133 agent.abort() 1134 entry.abort(self) 1135 jobs_to_stop.add(entry.job) 1136 logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop)) 1137 for job in jobs_to_stop: 1138 job.stop_if_necessary() 1139 1140 1141 @_calls_log_tick_msg 1142 def _find_aborted_special_tasks(self): 1143 """ 1144 Find SpecialTasks that have been marked for abortion. 1145 1146 Poll the database looking for SpecialTasks that are active 1147 and have been marked for abortion, then abort them. 1148 """ 1149 1150 # The completed and active bits are very important when it comes 1151 # to scheduler correctness. The active bit is set through the prolog 1152 # of a special task, and reset through the cleanup method of the 1153 # SpecialAgentTask. The cleanup is called both through the abort and 1154 # epilog. The complete bit is set in several places, and in general 1155 # a hanging job will have is_active=1 is_complete=0, while a special 1156 # task which completed will have is_active=0 is_complete=1. To check 1157 # aborts we directly check active because the complete bit is set in 1158 # several places, including the epilog of agent tasks. 1159 aborted_tasks = models.SpecialTask.objects.filter(is_active=True, 1160 is_aborted=True) 1161 for task in aborted_tasks: 1162 # There are 2 ways to get the agent associated with a task, 1163 # through the host and through the hqe. A special task 1164 # always needs a host, but doesn't always need a hqe. 1165 for agent in self._host_agents.get(task.host.id, []): 1166 if isinstance(agent.task, agent_task.SpecialAgentTask): 1167 1168 # The epilog preforms critical actions such as 1169 # queueing the next SpecialTask, requeuing the 1170 # hqe etc, however it doesn't actually kill the 1171 # monitor process and set the 'done' bit. Epilogs 1172 # assume that the job failed, and that the monitor 1173 # process has already written an exit code. The 1174 # done bit is a necessary condition for 1175 # _handle_agents to schedule any more special 1176 # tasks against the host, and it must be set 1177 # in addition to is_active, is_complete and success. 1178 agent.task.epilog() 1179 agent.task.abort() 1180 1181 1182 def _can_start_agent(self, agent, have_reached_limit): 1183 # always allow zero-process agents to run 1184 if agent.task.num_processes == 0: 1185 return True 1186 # don't allow any nonzero-process agents to run after we've reached a 1187 # limit (this avoids starvation of many-process agents) 1188 if have_reached_limit: 1189 return False 1190 # total process throttling 1191 max_runnable_processes = _drone_manager.max_runnable_processes( 1192 agent.task.owner_username, 1193 agent.task.get_drone_hostnames_allowed()) 1194 if agent.task.num_processes > max_runnable_processes: 1195 return False 1196 return True 1197 1198 1199 @_calls_log_tick_msg 1200 def _handle_agents(self): 1201 """ 1202 Handles agents of the dispatcher. 1203 1204 Appropriate Agents are added to the dispatcher through 1205 _schedule_running_host_queue_entries. These agents each 1206 have a task. This method runs the agents task through 1207 agent.tick() leading to: 1208 agent.start 1209 prolog -> AgentTasks prolog 1210 For each queue entry: 1211 sets host status/status to Running 1212 set started_on in afe_host_queue_entries 1213 run -> AgentTasks run 1214 Creates PidfileRunMonitor 1215 Queues the autoserv command line for this AgentTask 1216 via the drone manager. These commands are executed 1217 through the drone managers execute actions. 1218 poll -> AgentTasks/BaseAgentTask poll 1219 checks the monitors exit_code. 1220 Executes epilog if task is finished. 1221 Executes AgentTasks _finish_task 1222 finish_task is usually responsible for setting the status 1223 of the HQE/host, and updating it's active and complete fileds. 1224 1225 agent.is_done 1226 Removed the agent from the dispatchers _agents queue. 1227 Is_done checks the finished bit on the agent, that is 1228 set based on the Agents task. During the agents poll 1229 we check to see if the monitor process has exited in 1230 it's finish method, and set the success member of the 1231 task based on this exit code. 1232 """ 1233 num_started_this_tick = 0 1234 num_finished_this_tick = 0 1235 have_reached_limit = False 1236 # iterate over copy, so we can remove agents during iteration 1237 logging.debug('Handling %d Agents', len(self._agents)) 1238 for agent in list(self._agents): 1239 self._log_extra_msg('Processing Agent with Host Ids: %s and ' 1240 'queue_entry ids:%s' % (agent.host_ids, 1241 agent.queue_entry_ids)) 1242 if not agent.started: 1243 if not self._can_start_agent(agent, have_reached_limit): 1244 have_reached_limit = True 1245 logging.debug('Reached Limit of allowed running Agents.') 1246 continue 1247 num_started_this_tick += agent.task.num_processes 1248 self._log_extra_msg('Starting Agent') 1249 agent.tick() 1250 self._log_extra_msg('Agent tick completed.') 1251 if agent.is_done(): 1252 num_finished_this_tick += agent.task.num_processes 1253 self._log_extra_msg("Agent finished") 1254 self.remove_agent(agent) 1255 1256 metrics.Counter( 1257 'chromeos/autotest/scheduler/agent_processes_started' 1258 ).increment_by(num_started_this_tick) 1259 metrics.Counter( 1260 'chromeos/autotest/scheduler/agent_processes_finished' 1261 ).increment_by(num_finished_this_tick) 1262 num_agent_processes = _drone_manager.total_running_processes() 1263 metrics.Gauge( 1264 'chromeos/autotest/scheduler/agent_processes' 1265 ).set(num_agent_processes) 1266 logging.info('%d running processes. %d added this tick.', 1267 num_agent_processes, num_started_this_tick) 1268 1269 1270 def _log_tick_msg(self, msg): 1271 if self._tick_debug: 1272 logging.debug(msg) 1273 1274 1275 def _log_extra_msg(self, msg): 1276 if self._extra_debugging: 1277 logging.debug(msg) 1278 1279 1280 class Agent(object): 1281 """ 1282 An agent for use by the Dispatcher class to perform a task. An agent wraps 1283 around an AgentTask mainly to associate the AgentTask with the queue_entry 1284 and host ids. 1285 1286 The following methods are required on all task objects: 1287 poll() - Called periodically to let the task check its status and 1288 update its internal state. If the task succeeded. 1289 is_done() - Returns True if the task is finished. 1290 abort() - Called when an abort has been requested. The task must 1291 set its aborted attribute to True if it actually aborted. 1292 1293 The following attributes are required on all task objects: 1294 aborted - bool, True if this task was aborted. 1295 success - bool, True if this task succeeded. 1296 queue_entry_ids - A sequence of HostQueueEntry ids this task handles. 1297 host_ids - A sequence of Host ids this task represents. 1298 """ 1299 1300 1301 def __init__(self, task): 1302 """ 1303 @param task: An instance of an AgentTask. 1304 """ 1305 self.task = task 1306 1307 # This is filled in by Dispatcher.add_agent() 1308 self.dispatcher = None 1309 1310 self.queue_entry_ids = task.queue_entry_ids 1311 self.host_ids = task.host_ids 1312 1313 self.started = False 1314 self.finished = False 1315 1316 1317 def tick(self): 1318 self.started = True 1319 if not self.finished: 1320 self.task.poll() 1321 if self.task.is_done(): 1322 self.finished = True 1323 1324 1325 def is_done(self): 1326 return self.finished 1327 1328 1329 def abort(self): 1330 if self.task: 1331 self.task.abort() 1332 if self.task.aborted: 1333 # tasks can choose to ignore aborts 1334 self.finished = True 1335 1336 1337 class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals): 1338 """ 1339 Common functionality for QueueTask and HostlessQueueTask 1340 """ 1341 def __init__(self, queue_entries): 1342 super(AbstractQueueTask, self).__init__() 1343 self.job = queue_entries[0].job 1344 self.queue_entries = queue_entries 1345 1346 1347 def _keyval_path(self): 1348 return os.path.join(self._working_directory(), self._KEYVAL_FILE) 1349 1350 1351 def _write_control_file(self, execution_path): 1352 control_path = _drone_manager.attach_file_to_execution( 1353 execution_path, self.job.control_file) 1354 return control_path 1355 1356 1357 # TODO: Refactor into autoserv_utils. crbug.com/243090 1358 def _command_line(self): 1359 execution_path = self.queue_entries[0].execution_path() 1360 control_path = self._write_control_file(execution_path) 1361 hostnames = ','.join(entry.host.hostname 1362 for entry in self.queue_entries 1363 if not entry.is_hostless()) 1364 1365 execution_tag = self.queue_entries[0].execution_tag() 1366 params = _autoserv_command_line( 1367 hostnames, 1368 ['-P', execution_tag, '-n', 1369 _drone_manager.absolute_path(control_path)], 1370 job=self.job, verbose=False) 1371 1372 return params 1373 1374 1375 @property 1376 def num_processes(self): 1377 return len(self.queue_entries) 1378 1379 1380 @property 1381 def owner_username(self): 1382 return self.job.owner 1383 1384 1385 def _working_directory(self): 1386 return self._get_consistent_execution_path(self.queue_entries) 1387 1388 1389 def prolog(self): 1390 queued_key, queued_time = self._job_queued_keyval(self.job) 1391 keyval_dict = self.job.keyval_dict() 1392 keyval_dict[queued_key] = queued_time 1393 self._write_keyvals_before_job(keyval_dict) 1394 for queue_entry in self.queue_entries: 1395 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING) 1396 queue_entry.set_started_on_now() 1397 1398 1399 def _write_lost_process_error_file(self): 1400 error_file_path = os.path.join(self._working_directory(), 'job_failure') 1401 _drone_manager.write_lines_to_file(error_file_path, 1402 [_LOST_PROCESS_ERROR]) 1403 1404 1405 def _finish_task(self): 1406 if not self.monitor: 1407 return 1408 1409 self._write_job_finished() 1410 1411 if self.monitor.lost_process: 1412 self._write_lost_process_error_file() 1413 1414 1415 def _write_status_comment(self, comment): 1416 _drone_manager.write_lines_to_file( 1417 os.path.join(self._working_directory(), 'status.log'), 1418 ['INFO\t----\t----\t' + comment], 1419 paired_with_process=self.monitor.get_process()) 1420 1421 1422 def _log_abort(self): 1423 if not self.monitor or not self.monitor.has_process(): 1424 return 1425 1426 # build up sets of all the aborted_by and aborted_on values 1427 aborted_by, aborted_on = set(), set() 1428 for queue_entry in self.queue_entries: 1429 if queue_entry.aborted_by: 1430 aborted_by.add(queue_entry.aborted_by) 1431 t = int(time.mktime(queue_entry.aborted_on.timetuple())) 1432 aborted_on.add(t) 1433 1434 # extract some actual, unique aborted by value and write it out 1435 # TODO(showard): this conditional is now obsolete, we just need to leave 1436 # it in temporarily for backwards compatibility over upgrades. delete 1437 # soon. 1438 assert len(aborted_by) <= 1 1439 if len(aborted_by) == 1: 1440 aborted_by_value = aborted_by.pop() 1441 aborted_on_value = max(aborted_on) 1442 else: 1443 aborted_by_value = 'autotest_system' 1444 aborted_on_value = int(time.time()) 1445 1446 self._write_keyval_after_job("aborted_by", aborted_by_value) 1447 self._write_keyval_after_job("aborted_on", aborted_on_value) 1448 1449 aborted_on_string = str(datetime.datetime.fromtimestamp( 1450 aborted_on_value)) 1451 self._write_status_comment('Job aborted by %s on %s' % 1452 (aborted_by_value, aborted_on_string)) 1453 1454 1455 def abort(self): 1456 super(AbstractQueueTask, self).abort() 1457 self._log_abort() 1458 self._finish_task() 1459 1460 1461 def epilog(self): 1462 super(AbstractQueueTask, self).epilog() 1463 self._finish_task() 1464 1465 1466 class QueueTask(AbstractQueueTask): 1467 def __init__(self, queue_entries): 1468 super(QueueTask, self).__init__(queue_entries) 1469 self._set_ids(queue_entries=queue_entries) 1470 self._enable_ssp_container = ( 1471 global_config.global_config.get_config_value( 1472 'AUTOSERV', 'enable_ssp_container', type=bool, 1473 default=True)) 1474 1475 1476 def prolog(self): 1477 self._check_queue_entry_statuses( 1478 self.queue_entries, 1479 allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING, 1480 models.HostQueueEntry.Status.RUNNING), 1481 allowed_host_statuses=(models.Host.Status.PENDING, 1482 models.Host.Status.RUNNING)) 1483 1484 super(QueueTask, self).prolog() 1485 1486 for queue_entry in self.queue_entries: 1487 self._write_host_keyvals(queue_entry.host) 1488 queue_entry.host.set_status(models.Host.Status.RUNNING) 1489 queue_entry.host.update_field('dirty', 1) 1490 1491 1492 def _finish_task(self): 1493 super(QueueTask, self)._finish_task() 1494 1495 for queue_entry in self.queue_entries: 1496 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING) 1497 queue_entry.host.set_status(models.Host.Status.RUNNING) 1498 1499 1500 def _command_line(self): 1501 invocation = super(QueueTask, self)._command_line() 1502 # Check if server-side packaging is needed. 1503 if (self._enable_ssp_container and 1504 self.job.control_type == control_data.CONTROL_TYPE.SERVER and 1505 self.job.require_ssp != False): 1506 invocation += ['--require-ssp'] 1507 keyval_dict = self.job.keyval_dict() 1508 test_source_build = keyval_dict.get('test_source_build', None) 1509 if test_source_build: 1510 invocation += ['--test_source_build', test_source_build] 1511 if self.job.parent_job_id: 1512 invocation += ['--parent_job_id', str(self.job.parent_job_id)] 1513 return invocation + ['--verify_job_repo_url'] 1514 1515 1516 class HostlessQueueTask(AbstractQueueTask): 1517 def __init__(self, queue_entry): 1518 super(HostlessQueueTask, self).__init__([queue_entry]) 1519 self.queue_entry_ids = [queue_entry.id] 1520 1521 1522 def prolog(self): 1523 super(HostlessQueueTask, self).prolog() 1524 1525 1526 def _finish_task(self): 1527 super(HostlessQueueTask, self)._finish_task() 1528 1529 # When a job is added to database, its initial status is always 1530 # Starting. In a scheduler tick, scheduler finds all jobs in Starting 1531 # status, check if any of them can be started. If scheduler hits some 1532 # limit, e.g., max_hostless_jobs_per_drone, scheduler will 1533 # leave these jobs in Starting status. Otherwise, the jobs' 1534 # status will be changed to Running, and an autoserv process 1535 # will be started in drone for each of these jobs. 1536 # If the entry is still in status Starting, the process has not started 1537 # yet. Therefore, there is no need to parse and collect log. Without 1538 # this check, exception will be raised by scheduler as execution_subdir 1539 # for this queue entry does not have a value yet. 1540 hqe = self.queue_entries[0] 1541 if hqe.status != models.HostQueueEntry.Status.STARTING: 1542 hqe.set_status(models.HostQueueEntry.Status.PARSING) 1543 1544 1545 if __name__ == '__main__': 1546 main() 1547