Home | History | Annotate | Download | only in scheduler
      1 #!/usr/bin/python
      2 
      3 #pylint: disable=C0111
      4 
      5 """
      6 Autotest scheduler
      7 """
      8 
      9 import datetime
     10 import functools
     11 import logging
     12 import optparse
     13 import os
     14 import signal
     15 import sys
     16 import time
     17 
     18 import common
     19 from autotest_lib.frontend import setup_django_environment
     20 
     21 import django.db
     22 
     23 from autotest_lib.client.common_lib import control_data
     24 from autotest_lib.client.common_lib import global_config
     25 from autotest_lib.client.common_lib import utils
     26 from autotest_lib.frontend.afe import models
     27 from autotest_lib.scheduler import agent_task, drone_manager
     28 from autotest_lib.scheduler import email_manager, host_scheduler
     29 from autotest_lib.scheduler import luciferlib
     30 from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
     31 from autotest_lib.scheduler import postjob_task
     32 from autotest_lib.scheduler import query_managers
     33 from autotest_lib.scheduler import scheduler_lib
     34 from autotest_lib.scheduler import scheduler_models
     35 from autotest_lib.scheduler import scheduler_config
     36 from autotest_lib.server import autoserv_utils
     37 from autotest_lib.server import system_utils
     38 from autotest_lib.server import utils as server_utils
     39 from autotest_lib.site_utils import server_manager_utils
     40 
     41 try:
     42     from chromite.lib import metrics
     43     from chromite.lib import ts_mon_config
     44 except ImportError:
     45     metrics = utils.metrics_mock
     46     ts_mon_config = utils.metrics_mock
     47 
     48 
     49 PID_FILE_PREFIX = 'monitor_db'
     50 
     51 RESULTS_DIR = '.'
     52 AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
     53 
     54 if os.environ.has_key('AUTOTEST_DIR'):
     55     AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
     56 AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
     57 AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')
     58 
     59 if AUTOTEST_SERVER_DIR not in sys.path:
     60     sys.path.insert(0, AUTOTEST_SERVER_DIR)
     61 
     62 # error message to leave in results dir when an autoserv process disappears
     63 # mysteriously
     64 _LOST_PROCESS_ERROR = """\
     65 Autoserv failed abnormally during execution for this job, probably due to a
     66 system error on the Autotest server.  Full results may not be available.  Sorry.
     67 """
     68 
     69 _db_manager = None
     70 _db = None
     71 _shutdown = False
     72 
     73 # These 2 globals are replaced for testing
     74 _autoserv_directory = autoserv_utils.autoserv_directory
     75 _autoserv_path = autoserv_utils.autoserv_path
     76 _testing_mode = False
     77 _drone_manager = None
     78 
     79 
     80 def _verify_default_drone_set_exists():
     81     if (models.DroneSet.drone_sets_enabled() and
     82             not models.DroneSet.default_drone_set_name()):
     83         raise scheduler_lib.SchedulerError(
     84                 'Drone sets are enabled, but no default is set')
     85 
     86 
     87 def _sanity_check():
     88     """Make sure the configs are consistent before starting the scheduler"""
     89     _verify_default_drone_set_exists()
     90 
     91 
     92 def main():
     93     try:
     94         try:
     95             main_without_exception_handling()
     96         except SystemExit:
     97             raise
     98         except:
     99             logging.exception('Exception escaping in monitor_db')
    100             raise
    101     finally:
    102         utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
    103 
    104 
    105 def main_without_exception_handling():
    106     scheduler_lib.setup_logging(
    107             os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
    108             os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    109     usage = 'usage: %prog [options] results_dir'
    110     parser = optparse.OptionParser(usage)
    111     parser.add_option('--recover-hosts', help='Try to recover dead hosts',
    112                       action='store_true')
    113     parser.add_option('--test', help='Indicate that scheduler is under ' +
    114                       'test and should use dummy autoserv and no parsing',
    115                       action='store_true')
    116     parser.add_option(
    117             '--metrics-file',
    118             help='If provided, drop metrics to this local file instead of '
    119                  'reporting to ts_mon',
    120             type=str,
    121             default=None,
    122     )
    123     parser.add_option(
    124             '--lifetime-hours',
    125             type=float,
    126             default=None,
    127             help='If provided, number of hours the scheduler should run for. '
    128                  'At the expiry of this time, the process will exit '
    129                  'gracefully.',
    130     )
    131     parser.add_option('--production',
    132                       help=('Indicate that scheduler is running in production '
    133                             'environment and it can use database that is not '
    134                             'hosted in localhost. If it is set to False, '
    135                             'scheduler will fail if database is not in '
    136                             'localhost.'),
    137                       action='store_true', default=False)
    138     (options, args) = parser.parse_args()
    139     if len(args) != 1:
    140         parser.print_usage()
    141         return
    142 
    143     scheduler_lib.check_production_settings(options)
    144 
    145     scheduler_enabled = global_config.global_config.get_config_value(
    146         scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)
    147 
    148     if not scheduler_enabled:
    149         logging.error("Scheduler not enabled, set enable_scheduler to true in "
    150                       "the global_config's SCHEDULER section to enable it. "
    151                       "Exiting.")
    152         sys.exit(1)
    153 
    154     global RESULTS_DIR
    155     RESULTS_DIR = args[0]
    156 
    157     # Change the cwd while running to avoid issues incase we were launched from
    158     # somewhere odd (such as a random NFS home directory of the person running
    159     # sudo to launch us as the appropriate user).
    160     os.chdir(RESULTS_DIR)
    161 
    162     # This is helpful for debugging why stuff a scheduler launches is
    163     # misbehaving.
    164     logging.info('os.environ: %s', os.environ)
    165 
    166     if options.test:
    167         global _autoserv_path
    168         _autoserv_path = 'autoserv_dummy'
    169         global _testing_mode
    170         _testing_mode = True
    171 
    172     with ts_mon_config.SetupTsMonGlobalState('autotest_scheduler',
    173                                              indirect=True,
    174                                              debug_file=options.metrics_file):
    175       try:
    176           metrics.Counter('chromeos/autotest/scheduler/start').increment()
    177           process_start_time = time.time()
    178           initialize()
    179           dispatcher = Dispatcher()
    180           dispatcher.initialize(recover_hosts=options.recover_hosts)
    181           minimum_tick_sec = global_config.global_config.get_config_value(
    182                   scheduler_config.CONFIG_SECTION, 'minimum_tick_sec', type=float)
    183 
    184           # TODO(crbug.com/837680): Force creating the current user.
    185           # This is a dirty hack to work around a race; see bug.
    186           models.User.current_user()
    187 
    188           while not _shutdown:
    189               if _lifetime_expired(options.lifetime_hours, process_start_time):
    190                   break
    191 
    192               start = time.time()
    193               dispatcher.tick()
    194               curr_tick_sec = time.time() - start
    195               if minimum_tick_sec > curr_tick_sec:
    196                   time.sleep(minimum_tick_sec - curr_tick_sec)
    197               else:
    198                   time.sleep(0.0001)
    199       except server_manager_utils.ServerActionError as e:
    200           # This error is expected when the server is not in primary status
    201           # for scheduler role. Thus do not send email for it.
    202           logging.exception(e)
    203       except Exception:
    204           logging.exception('Uncaught exception, terminating monitor_db.')
    205           metrics.Counter('chromeos/autotest/scheduler/uncaught_exception'
    206                           ).increment()
    207 
    208     email_manager.manager.send_queued_emails()
    209     _drone_manager.shutdown()
    210     _db_manager.disconnect()
    211 
    212 
    213 def handle_signal(signum, frame):
    214     global _shutdown
    215     _shutdown = True
    216     logging.info("Shutdown request received.")
    217 
    218 
    219 def _lifetime_expired(lifetime_hours, process_start_time):
    220     """Returns True if we've expired the process lifetime, False otherwise.
    221 
    222     Also sets the global _shutdown so that any background processes also take
    223     the cue to exit.
    224     """
    225     if lifetime_hours is None:
    226         return False
    227     if time.time() - process_start_time > lifetime_hours * 3600:
    228         logging.info('Process lifetime %0.3f hours exceeded. Shutting down.',
    229                      lifetime_hours)
    230         global _shutdown
    231         _shutdown = True
    232         return True
    233     return False
    234 
    235 
    236 def initialize():
    237     logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    238     logging.info("My PID is %d", os.getpid())
    239 
    240     if utils.program_is_alive(PID_FILE_PREFIX):
    241         logging.critical("monitor_db already running, aborting!")
    242         sys.exit(1)
    243     utils.write_pid(PID_FILE_PREFIX)
    244 
    245     if _testing_mode:
    246         global_config.global_config.override_config_value(
    247             scheduler_lib.DB_CONFIG_SECTION, 'database',
    248             'stresstest_autotest_web')
    249 
    250     # If server database is enabled, check if the server has role `scheduler`.
    251     # If the server does not have scheduler role, exception will be raised and
    252     # scheduler will not continue to run.
    253     if server_manager_utils.use_server_db():
    254         server_manager_utils.confirm_server_has_role(hostname='localhost',
    255                                                      role='scheduler')
    256 
    257     os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    258     global _db_manager
    259     _db_manager = scheduler_lib.ConnectionManager()
    260     global _db
    261     _db = _db_manager.get_connection()
    262     logging.info("Setting signal handler")
    263     signal.signal(signal.SIGINT, handle_signal)
    264     signal.signal(signal.SIGTERM, handle_signal)
    265 
    266     initialize_globals()
    267     scheduler_models.initialize()
    268 
    269     drone_list = system_utils.get_drones()
    270     results_host = global_config.global_config.get_config_value(
    271         scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    272     _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)
    273 
    274     logging.info("Connected! Running...")
    275 
    276 
    277 def initialize_globals():
    278     global _drone_manager
    279     _drone_manager = drone_manager.instance()
    280 
    281 
    282 def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
    283                            verbose=True):
    284     """
    285     @returns The autoserv command line as a list of executable + parameters.
    286 
    287     @param machines - string - A machine or comma separated list of machines
    288             for the (-m) flag.
    289     @param extra_args - list - Additional arguments to pass to autoserv.
    290     @param job - Job object - If supplied, -u owner, -l name and client -c or
    291             server -s parameters will be added.
    292     @param queue_entry - A HostQueueEntry object - If supplied and no Job
    293             object was supplied, this will be used to lookup the Job object.
    294     """
    295     command = autoserv_utils.autoserv_run_job_command(_autoserv_directory,
    296             machines, results_directory=drone_manager.WORKING_DIRECTORY,
    297             extra_args=extra_args, job=job, queue_entry=queue_entry,
    298             verbose=verbose, in_lab=True)
    299     return command
    300 
    301 def _calls_log_tick_msg(func):
    302     """Used to trace functions called by Dispatcher.tick."""
    303     @functools.wraps(func)
    304     def wrapper(self, *args, **kwargs):
    305         self._log_tick_msg('Starting %s' % func.__name__)
    306         return func(self, *args, **kwargs)
    307 
    308     return wrapper
    309 
    310 
    311 class Dispatcher(object):
    312 
    313 
    314     def __init__(self):
    315         self._agents = []
    316         self._last_clean_time = time.time()
    317         user_cleanup_time = scheduler_config.config.clean_interval_minutes
    318         self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
    319                 _db, user_cleanup_time)
    320         self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
    321                 _db, _drone_manager)
    322         self._host_agents = {}
    323         self._queue_entry_agents = {}
    324         self._tick_count = 0
    325         self._tick_debug = global_config.global_config.get_config_value(
    326                 scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
    327                 default=False)
    328         self._extra_debugging = global_config.global_config.get_config_value(
    329                 scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
    330                 default=False)
    331         self._inline_host_acquisition = (
    332                 global_config.global_config.get_config_value(
    333                         scheduler_config.CONFIG_SECTION,
    334                         'inline_host_acquisition', type=bool, default=True))
    335 
    336         # If _inline_host_acquisition is set the scheduler will acquire and
    337         # release hosts against jobs inline, with the tick. Otherwise the
    338         # scheduler will only focus on jobs that already have hosts, and
    339         # will not explicitly unlease a host when a job finishes using it.
    340         self._job_query_manager = query_managers.AFEJobQueryManager()
    341         self._host_scheduler = (host_scheduler.BaseHostScheduler()
    342                                 if self._inline_host_acquisition else
    343                                 host_scheduler.DummyHostScheduler())
    344 
    345 
    346     def initialize(self, recover_hosts=True):
    347         self._periodic_cleanup.initialize()
    348         self._24hr_upkeep.initialize()
    349         # Execute all actions queued in the cleanup tasks. Scheduler tick will
    350         # run a refresh task first. If there is any action in the queue, refresh
    351         # will raise an exception.
    352         _drone_manager.execute_actions()
    353 
    354         # always recover processes
    355         self._recover_processes()
    356 
    357         if recover_hosts:
    358             self._recover_hosts()
    359 
    360 
    361     # TODO(pprabhu) Drop this metric once tick_times has been verified.
    362     @metrics.SecondsTimerDecorator(
    363             'chromeos/autotest/scheduler/tick_durations/tick')
    364     def tick(self):
    365         """
    366         This is an altered version of tick() where we keep track of when each
    367         major step begins so we can try to figure out where we are using most
    368         of the tick time.
    369         """
    370         with metrics.RuntimeBreakdownTimer(
    371             'chromeos/autotest/scheduler/tick_times') as breakdown_timer:
    372             self._log_tick_msg('New tick')
    373             system_utils.DroneCache.refresh()
    374 
    375             with breakdown_timer.Step('trigger_refresh'):
    376                 self._log_tick_msg('Starting _drone_manager.trigger_refresh')
    377                 _drone_manager.trigger_refresh()
    378             with breakdown_timer.Step('schedule_running_host_queue_entries'):
    379                 self._schedule_running_host_queue_entries()
    380             with breakdown_timer.Step('schedule_special_tasks'):
    381                 self._schedule_special_tasks()
    382             with breakdown_timer.Step('schedule_new_jobs'):
    383                 self._schedule_new_jobs()
    384             with breakdown_timer.Step('gather_tick_metrics'):
    385                 self._gather_tick_metrics()
    386             with breakdown_timer.Step('sync_refresh'):
    387                 self._log_tick_msg('Starting _drone_manager.sync_refresh')
    388                 _drone_manager.sync_refresh()
    389             if luciferlib.is_lucifer_enabled():
    390                 with breakdown_timer.Step('send_to_lucifer'):
    391                     self._send_to_lucifer()
    392             # _run_cleanup must be called between drone_manager.sync_refresh,
    393             # and drone_manager.execute_actions, as sync_refresh will clear the
    394             # calls queued in drones. Therefore, any action that calls
    395             # drone.queue_call to add calls to the drone._calls, should be after
    396             # drone refresh is completed and before
    397             # drone_manager.execute_actions at the end of the tick.
    398             with breakdown_timer.Step('run_cleanup'):
    399                 self._run_cleanup()
    400             with breakdown_timer.Step('find_aborting'):
    401                 self._find_aborting()
    402             with breakdown_timer.Step('find_aborted_special_tasks'):
    403                 self._find_aborted_special_tasks()
    404             with breakdown_timer.Step('handle_agents'):
    405                 self._handle_agents()
    406             with breakdown_timer.Step('host_scheduler_tick'):
    407                 self._log_tick_msg('Starting _host_scheduler.tick')
    408                 self._host_scheduler.tick()
    409             with breakdown_timer.Step('drones_execute_actions'):
    410                 self._log_tick_msg('Starting _drone_manager.execute_actions')
    411                 _drone_manager.execute_actions()
    412             with breakdown_timer.Step('send_queued_emails'):
    413                 self._log_tick_msg(
    414                     'Starting email_manager.manager.send_queued_emails')
    415                 email_manager.manager.send_queued_emails()
    416             with breakdown_timer.Step('db_reset_queries'):
    417                 self._log_tick_msg('Starting django.db.reset_queries')
    418                 django.db.reset_queries()
    419 
    420             self._tick_count += 1
    421             metrics.Counter('chromeos/autotest/scheduler/tick').increment()
    422 
    423 
    424     @_calls_log_tick_msg
    425     def _run_cleanup(self):
    426         self._periodic_cleanup.run_cleanup_maybe()
    427         self._24hr_upkeep.run_cleanup_maybe()
    428 
    429 
    430     def _gather_tick_metrics(self):
    431         """Gather metrics during tick, after all tasks have been scheduled."""
    432         metrics.Gauge(
    433             'chromeos/autotest/scheduler/agent_count'
    434         ).set(len(self._agents))
    435 
    436 
    437     def _register_agent_for_ids(self, agent_dict, object_ids, agent):
    438         for object_id in object_ids:
    439             agent_dict.setdefault(object_id, set()).add(agent)
    440 
    441 
    442     def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
    443         for object_id in object_ids:
    444             assert object_id in agent_dict
    445             agent_dict[object_id].remove(agent)
    446             # If an ID has no more active agent associated, there is no need to
    447             # keep it in the dictionary. Otherwise, scheduler will keep an
    448             # unnecessarily big dictionary until being restarted.
    449             if not agent_dict[object_id]:
    450                 agent_dict.pop(object_id)
    451 
    452 
    453     def add_agent_task(self, agent_task):
    454         """
    455         Creates and adds an agent to the dispatchers list.
    456 
    457         In creating the agent we also pass on all the queue_entry_ids and
    458         host_ids from the special agent task. For every agent we create, we
    459         add it to 1. a dict against the queue_entry_ids given to it 2. A dict
    460         against the host_ids given to it. So theoritically, a host can have any
    461         number of agents associated with it, and each of them can have any
    462         special agent task, though in practice we never see > 1 agent/task per
    463         host at any time.
    464 
    465         @param agent_task: A SpecialTask for the agent to manage.
    466         """
    467         if luciferlib.is_enabled_for('STARTING'):
    468             # TODO(crbug.com/810141): Transition code.  After running at
    469             # STARTING for a while, these tasks should no longer exist.
    470             if (isinstance(agent_task, postjob_task.GatherLogsTask)
    471                 # TODO(crbug.com/811877): Don't skip split HQE parsing.
    472                 or (isinstance(agent_task, postjob_task.FinalReparseTask)
    473                     and not luciferlib.is_split_job(
    474                             agent_task.queue_entries[0].id))):
    475                 return
    476             if isinstance(agent_task, AbstractQueueTask):
    477                 # If Lucifer already owns the job, ignore the agent.
    478                 if luciferlib.is_lucifer_owned_by_id(agent_task.job.id):
    479                     return
    480                 # If the job isn't started yet, let Lucifer own it.
    481                 if not agent_task.started:
    482                     return
    483                 # Otherwise, this is a STARTING job that Autotest owned
    484                 # before Lucifer was enabled for STARTING.  Allow the
    485                 # scheduler to recover the agent task normally.
    486 
    487         agent = Agent(agent_task)
    488         self._agents.append(agent)
    489         agent.dispatcher = self
    490         self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
    491         self._register_agent_for_ids(self._queue_entry_agents,
    492                                      agent.queue_entry_ids, agent)
    493 
    494 
    495     def get_agents_for_entry(self, queue_entry):
    496         """
    497         Find agents corresponding to the specified queue_entry.
    498         """
    499         return list(self._queue_entry_agents.get(queue_entry.id, set()))
    500 
    501 
    502     def host_has_agent(self, host):
    503         """
    504         Determine if there is currently an Agent present using this host.
    505         """
    506         return bool(self._host_agents.get(host.id, None))
    507 
    508 
    509     def remove_agent(self, agent):
    510         self._agents.remove(agent)
    511         self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
    512                                        agent)
    513         self._unregister_agent_for_ids(self._queue_entry_agents,
    514                                        agent.queue_entry_ids, agent)
    515 
    516 
    517     def _host_has_scheduled_special_task(self, host):
    518         return bool(models.SpecialTask.objects.filter(host__id=host.id,
    519                                                       is_active=False,
    520                                                       is_complete=False))
    521 
    522 
    523     def _recover_processes(self):
    524         agent_tasks = self._create_recovery_agent_tasks()
    525         self._register_pidfiles(agent_tasks)
    526         _drone_manager.refresh()
    527         self._recover_tasks(agent_tasks)
    528         self._recover_pending_entries()
    529         self._check_for_unrecovered_verifying_entries()
    530         self._reverify_remaining_hosts()
    531         # reinitialize drones after killing orphaned processes, since they can
    532         # leave around files when they die
    533         _drone_manager.execute_actions()
    534         _drone_manager.reinitialize_drones()
    535 
    536 
    537     def _create_recovery_agent_tasks(self):
    538         return (self._get_queue_entry_agent_tasks()
    539                 + self._get_special_task_agent_tasks(is_active=True))
    540 
    541 
    542     def _get_queue_entry_agent_tasks(self):
    543         """
    544         Get agent tasks for all hqe in the specified states.
    545 
    546         Loosely this translates to taking a hqe in one of the specified states,
    547         say parsing, and getting an AgentTask for it, like the FinalReparseTask,
    548         through _get_agent_task_for_queue_entry. Each queue entry can only have
    549         one agent task at a time, but there might be multiple queue entries in
    550         the group.
    551 
    552         @return: A list of AgentTasks.
    553         """
    554         # host queue entry statuses handled directly by AgentTasks
    555         # (Verifying is handled through SpecialTasks, so is not
    556         # listed here)
    557         statuses = (models.HostQueueEntry.Status.STARTING,
    558                     models.HostQueueEntry.Status.RUNNING,
    559                     models.HostQueueEntry.Status.GATHERING,
    560                     models.HostQueueEntry.Status.PARSING)
    561         status_list = ','.join("'%s'" % status for status in statuses)
    562         queue_entries = scheduler_models.HostQueueEntry.fetch(
    563                 where='status IN (%s)' % status_list)
    564 
    565         agent_tasks = []
    566         used_queue_entries = set()
    567         hqe_count_by_status = {}
    568         for entry in queue_entries:
    569             try:
    570                 hqe_count_by_status[entry.status] = (
    571                     hqe_count_by_status.get(entry.status, 0) + 1)
    572                 if self.get_agents_for_entry(entry):
    573                     # already being handled
    574                     continue
    575                 if entry in used_queue_entries:
    576                     # already picked up by a synchronous job
    577                     continue
    578                 try:
    579                     agent_task = self._get_agent_task_for_queue_entry(entry)
    580                 except scheduler_lib.SchedulerError:
    581                     # Probably being handled by lucifer crbug.com/809773
    582                     continue
    583                 agent_tasks.append(agent_task)
    584                 used_queue_entries.update(agent_task.queue_entries)
    585             except scheduler_lib.MalformedRecordError as e:
    586                 logging.exception('Skipping agent task for a malformed hqe.')
    587                 # TODO(akeshet): figure out a way to safely permanently discard
    588                 # this errant HQE. It appears that calling entry.abort() is not
    589                 # sufficient, as that already makes some assumptions about
    590                 # record sanity that may be violated. See crbug.com/739530 for
    591                 # context.
    592                 m = 'chromeos/autotest/scheduler/skipped_malformed_hqe'
    593                 metrics.Counter(m).increment()
    594 
    595         for status, count in hqe_count_by_status.iteritems():
    596             metrics.Gauge(
    597                 'chromeos/autotest/scheduler/active_host_queue_entries'
    598             ).set(count, fields={'status': status})
    599 
    600         return agent_tasks
    601 
    602 
    603     def _get_special_task_agent_tasks(self, is_active=False):
    604         special_tasks = models.SpecialTask.objects.filter(
    605                 is_active=is_active, is_complete=False)
    606         agent_tasks = []
    607         for task in special_tasks:
    608           try:
    609               agent_tasks.append(self._get_agent_task_for_special_task(task))
    610           except scheduler_lib.MalformedRecordError as e:
    611               logging.exception('Skipping agent task for malformed special '
    612                                 'task.')
    613               m = 'chromeos/autotest/scheduler/skipped_malformed_special_task'
    614               metrics.Counter(m).increment()
    615         return agent_tasks
    616 
    617 
    618     def _get_agent_task_for_queue_entry(self, queue_entry):
    619         """
    620         Construct an AgentTask instance for the given active HostQueueEntry.
    621 
    622         @param queue_entry: a HostQueueEntry
    623         @return: an AgentTask to run the queue entry
    624         """
    625         task_entries = queue_entry.job.get_group_entries(queue_entry)
    626         self._check_for_duplicate_host_entries(task_entries)
    627 
    628         if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
    629                                   models.HostQueueEntry.Status.RUNNING):
    630             if queue_entry.is_hostless():
    631                 return HostlessQueueTask(queue_entry=queue_entry)
    632             return QueueTask(queue_entries=task_entries)
    633         if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
    634             return postjob_task.GatherLogsTask(queue_entries=task_entries)
    635         if queue_entry.status == models.HostQueueEntry.Status.PARSING:
    636             return postjob_task.FinalReparseTask(queue_entries=task_entries)
    637 
    638         raise scheduler_lib.MalformedRecordError(
    639                 '_get_agent_task_for_queue_entry got entry with '
    640                 'invalid status %s: %s' % (queue_entry.status, queue_entry))
    641 
    642 
    643     def _check_for_duplicate_host_entries(self, task_entries):
    644         non_host_statuses = {models.HostQueueEntry.Status.PARSING}
    645         for task_entry in task_entries:
    646             using_host = (task_entry.host is not None
    647                           and task_entry.status not in non_host_statuses)
    648             if using_host:
    649                 self._assert_host_has_no_agent(task_entry)
    650 
    651 
    652     def _assert_host_has_no_agent(self, entry):
    653         """
    654         @param entry: a HostQueueEntry or a SpecialTask
    655         """
    656         if self.host_has_agent(entry.host):
    657             agent = tuple(self._host_agents.get(entry.host.id))[0]
    658             raise scheduler_lib.MalformedRecordError(
    659                     'While scheduling %s, host %s already has a host agent %s'
    660                     % (entry, entry.host, agent.task))
    661 
    662 
    663     def _get_agent_task_for_special_task(self, special_task):
    664         """
    665         Construct an AgentTask class to run the given SpecialTask and add it
    666         to this dispatcher.
    667 
    668         A special task is created through schedule_special_tasks, but only if
    669         the host doesn't already have an agent. This happens through
    670         add_agent_task. All special agent tasks are given a host on creation,
    671         and a Null hqe. To create a SpecialAgentTask object, you need a
    672         models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
    673         object contains a hqe it's passed on to the special agent task, which
    674         creates a HostQueueEntry and saves it as it's queue_entry.
    675 
    676         @param special_task: a models.SpecialTask instance
    677         @returns an AgentTask to run this SpecialTask
    678         """
    679         self._assert_host_has_no_agent(special_task)
    680 
    681         special_agent_task_classes = (prejob_task.CleanupTask,
    682                                       prejob_task.VerifyTask,
    683                                       prejob_task.RepairTask,
    684                                       prejob_task.ResetTask,
    685                                       prejob_task.ProvisionTask)
    686 
    687         for agent_task_class in special_agent_task_classes:
    688             if agent_task_class.TASK_TYPE == special_task.task:
    689                 return agent_task_class(task=special_task)
    690 
    691         raise scheduler_lib.MalformedRecordError(
    692                 'No AgentTask class for task', str(special_task))
    693 
    694 
    695     def _register_pidfiles(self, agent_tasks):
    696         for agent_task in agent_tasks:
    697             agent_task.register_necessary_pidfiles()
    698 
    699 
    700     def _recover_tasks(self, agent_tasks):
    701         orphans = _drone_manager.get_orphaned_autoserv_processes()
    702 
    703         for agent_task in agent_tasks:
    704             agent_task.recover()
    705             if agent_task.monitor and agent_task.monitor.has_process():
    706                 orphans.discard(agent_task.monitor.get_process())
    707             self.add_agent_task(agent_task)
    708 
    709         self._check_for_remaining_orphan_processes(orphans)
    710 
    711 
    712     def _get_unassigned_entries(self, status):
    713         for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
    714                                                            % status):
    715             if entry.status == status and not self.get_agents_for_entry(entry):
    716                 # The status can change during iteration, e.g., if job.run()
    717                 # sets a group of queue entries to Starting
    718                 yield entry
    719 
    720 
    721     def _check_for_remaining_orphan_processes(self, orphans):
    722         m = 'chromeos/autotest/errors/unrecovered_orphan_processes'
    723         metrics.Gauge(m).set(len(orphans))
    724 
    725         if not orphans:
    726             return
    727         subject = 'Unrecovered orphan autoserv processes remain'
    728         message = '\n'.join(str(process) for process in orphans)
    729         die_on_orphans = global_config.global_config.get_config_value(
    730             scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
    731 
    732         if die_on_orphans:
    733             raise RuntimeError(subject + '\n' + message)
    734 
    735 
    736     def _recover_pending_entries(self):
    737         for entry in self._get_unassigned_entries(
    738                 models.HostQueueEntry.Status.PENDING):
    739             logging.info('Recovering Pending entry %s', entry)
    740             try:
    741                 entry.on_pending()
    742             except scheduler_lib.MalformedRecordError as e:
    743                 logging.exception(
    744                         'Skipping agent task for malformed special task.')
    745                 m = 'chromeos/autotest/scheduler/skipped_malformed_special_task'
    746                 metrics.Counter(m).increment()
    747 
    748 
    749     def _check_for_unrecovered_verifying_entries(self):
    750         # Verify is replaced by Reset.
    751         queue_entries = scheduler_models.HostQueueEntry.fetch(
    752                 where='status = "%s"' % models.HostQueueEntry.Status.RESETTING)
    753         for queue_entry in queue_entries:
    754             special_tasks = models.SpecialTask.objects.filter(
    755                     task__in=(models.SpecialTask.Task.CLEANUP,
    756                               models.SpecialTask.Task.VERIFY,
    757                               models.SpecialTask.Task.RESET),
    758                     queue_entry__id=queue_entry.id,
    759                     is_complete=False)
    760             if special_tasks.count() == 0:
    761                 logging.error('Unrecovered Resetting host queue entry: %s. ',
    762                               str(queue_entry))
    763                 # Essentially this host queue entry was set to be Verifying
    764                 # however no special task exists for entry. This occurs if the
    765                 # scheduler dies between changing the status and creating the
    766                 # special task. By setting it to queued, the job can restart
    767                 # from the beginning and proceed correctly. This is much more
    768                 # preferable than having monitor_db not launching.
    769                 logging.info('Setting host status for %s to Ready',
    770                              str(queue_entry.host))
    771                 # Let's at least run a cleanup/reset before reusing this DUT.
    772                 queue_entry.host.update_field('dirty', 1)
    773                 queue_entry.host.set_status(models.Host.Status.READY)
    774                 logging.info('Setting status for HQE %s to Queued.',
    775                              str(queue_entry))
    776                 queue_entry.set_status('Queued')
    777 
    778 
    779     @_calls_log_tick_msg
    780     def _schedule_special_tasks(self):
    781         """
    782         Execute queued SpecialTasks that are ready to run on idle hosts.
    783 
    784         Special tasks include PreJobTasks like verify, reset and cleanup.
    785         They are created through _schedule_new_jobs and associated with a hqe
    786         This method translates SpecialTasks to the appropriate AgentTask and
    787         adds them to the dispatchers agents list, so _handle_agents can execute
    788         them.
    789         """
    790         # When the host scheduler is responsible for acquisition we only want
    791         # to run tasks with leased hosts. All hqe tasks will already have
    792         # leased hosts, and we don't want to run frontend tasks till the host
    793         # scheduler has vetted the assignment. Note that this doesn't include
    794         # frontend tasks with hosts leased by other active hqes.
    795         for task in self._job_query_manager.get_prioritized_special_tasks(
    796                 only_tasks_with_leased_hosts=not self._inline_host_acquisition):
    797             if self.host_has_agent(task.host):
    798                 continue
    799             try:
    800                 self.add_agent_task(self._get_agent_task_for_special_task(task))
    801             except scheduler_lib.MalformedRecordError:
    802                 logging.exception('Skipping schedule for malformed '
    803                                   'special task.')
    804                 m = 'chromeos/autotest/scheduler/skipped_schedule_special_task'
    805                 metrics.Counter(m).increment()
    806 
    807 
    808     def _reverify_remaining_hosts(self):
    809         # recover active hosts that have not yet been recovered, although this
    810         # should never happen
    811         message = ('Recovering active host %s - this probably indicates a '
    812                    'scheduler bug')
    813         self._reverify_hosts_where(
    814                 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
    815                 print_message=message)
    816 
    817 
    818     DEFAULT_REQUESTED_BY_USER_ID = 1
    819 
    820 
    821     def _reverify_hosts_where(self, where,
    822                               print_message='Reverifying host %s'):
    823         full_where = 'locked = 0 AND invalid = 0 AND %s' % where
    824         for host in scheduler_models.Host.fetch(where=full_where):
    825             if self.host_has_agent(host):
    826                 # host has already been recovered in some way
    827                 continue
    828             if self._host_has_scheduled_special_task(host):
    829                 # host will have a special task scheduled on the next cycle
    830                 continue
    831             if host.shard_id is not None and not server_utils.is_shard():
    832                 # I am master and host is owned by a shard, ignore it.
    833                 continue
    834             if print_message:
    835                 logging.error(print_message, host.hostname)
    836             try:
    837                 user = models.User.objects.get(login='autotest_system')
    838             except models.User.DoesNotExist:
    839                 user = models.User.objects.get(
    840                         id=self.DEFAULT_REQUESTED_BY_USER_ID)
    841             models.SpecialTask.objects.create(
    842                     task=models.SpecialTask.Task.RESET,
    843                     host=models.Host.objects.get(id=host.id),
    844                     requested_by=user)
    845 
    846 
    847     def _recover_hosts(self):
    848         # recover "Repair Failed" hosts
    849         message = 'Reverifying dead host %s'
    850         self._reverify_hosts_where("status = 'Repair Failed'",
    851                                    print_message=message)
    852 
    853 
    854     def _refresh_pending_queue_entries(self):
    855         """
    856         Lookup the pending HostQueueEntries and call our HostScheduler
    857         refresh() method given that list.  Return the list.
    858 
    859         @returns A list of pending HostQueueEntries sorted in priority order.
    860         """
    861         queue_entries = self._job_query_manager.get_pending_queue_entries(
    862                 only_hostless=not self._inline_host_acquisition)
    863         if not queue_entries:
    864             return []
    865         return queue_entries
    866 
    867 
    868     def _schedule_hostless_job(self, queue_entry):
    869         """Schedule a hostless (suite) job.
    870 
    871         @param queue_entry: The queue_entry representing the hostless job.
    872         """
    873         if not luciferlib.is_enabled_for('STARTING'):
    874             self.add_agent_task(HostlessQueueTask(queue_entry))
    875 
    876         # Need to set execution_subdir before setting the status:
    877         # After a restart of the scheduler, agents will be restored for HQEs in
    878         # Starting, Running, Gathering, Parsing or Archiving. To do this, the
    879         # execution_subdir is needed. Therefore it must be set before entering
    880         # one of these states.
    881         # Otherwise, if the scheduler was interrupted between setting the status
    882         # and the execution_subdir, upon it's restart restoring agents would
    883         # fail.
    884         # Is there a way to get a status in one of these states without going
    885         # through this code? Following cases are possible:
    886         # - If it's aborted before being started:
    887         #     active bit will be 0, so there's nothing to parse, it will just be
    888         #     set to completed by _find_aborting. Critical statuses are skipped.
    889         # - If it's aborted or it fails after being started:
    890         #     It was started, so this code was executed.
    891         queue_entry.update_field('execution_subdir', 'hostless')
    892         queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
    893 
    894 
    895     def _schedule_host_job(self, host, queue_entry):
    896         """Schedules a job on the given host.
    897 
    898         1. Assign the host to the hqe, if it isn't already assigned.
    899         2. Create a SpecialAgentTask for the hqe.
    900         3. Activate the hqe.
    901 
    902         @param queue_entry: The job to schedule.
    903         @param host: The host to schedule the job on.
    904         """
    905         if self.host_has_agent(host):
    906             host_agent_task = list(self._host_agents.get(host.id))[0].task
    907         else:
    908             self._host_scheduler.schedule_host_job(host, queue_entry)
    909 
    910 
    911     @_calls_log_tick_msg
    912     def _schedule_new_jobs(self):
    913         """
    914         Find any new HQEs and call schedule_pre_job_tasks for it.
    915 
    916         This involves setting the status of the HQE and creating a row in the
    917         db corresponding the the special task, through
    918         scheduler_models._queue_special_task. The new db row is then added as
    919         an agent to the dispatcher through _schedule_special_tasks and
    920         scheduled for execution on the drone through _handle_agents.
    921         """
    922         queue_entries = self._refresh_pending_queue_entries()
    923 
    924         key = 'scheduler.jobs_per_tick'
    925         new_hostless_jobs = 0
    926         new_jobs_with_hosts = 0
    927         new_jobs_need_hosts = 0
    928         host_jobs = []
    929         logging.debug('Processing %d queue_entries', len(queue_entries))
    930 
    931         for queue_entry in queue_entries:
    932             if queue_entry.is_hostless():
    933                 self._schedule_hostless_job(queue_entry)
    934                 new_hostless_jobs = new_hostless_jobs + 1
    935             else:
    936                 host_jobs.append(queue_entry)
    937                 new_jobs_need_hosts = new_jobs_need_hosts + 1
    938 
    939         metrics.Counter(
    940             'chromeos/autotest/scheduler/scheduled_jobs_hostless'
    941         ).increment_by(new_hostless_jobs)
    942 
    943         if not host_jobs:
    944             return
    945 
    946         if not self._inline_host_acquisition:
    947           # In this case, host_scheduler is responsible for scheduling
    948           # host_jobs. Scheduling the jobs ourselves can lead to DB corruption
    949           # since host_scheduler assumes it is the single process scheduling
    950           # host jobs.
    951           metrics.Gauge(
    952               'chromeos/autotest/errors/scheduler/unexpected_host_jobs').set(
    953                   len(host_jobs))
    954           return
    955 
    956         jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
    957         for host_assignment in jobs_with_hosts:
    958             self._schedule_host_job(host_assignment.host, host_assignment.job)
    959             new_jobs_with_hosts = new_jobs_with_hosts + 1
    960 
    961         metrics.Counter(
    962             'chromeos/autotest/scheduler/scheduled_jobs_with_hosts'
    963         ).increment_by(new_jobs_with_hosts)
    964 
    965 
    966     @_calls_log_tick_msg
    967     def _send_to_lucifer(self):
    968         """
    969         Hand off ownership of a job to lucifer component.
    970         """
    971         self._send_starting_to_lucifer()
    972         self._send_parsing_to_lucifer()
    973 
    974 
    975     # TODO(crbug.com/748234): This is temporary to enable toggling
    976     # lucifer rollouts with an option.
    977     def _send_starting_to_lucifer(self):
    978         Status = models.HostQueueEntry.Status
    979         queue_entries_qs = (models.HostQueueEntry.objects
    980                             .filter(status=Status.STARTING))
    981         for queue_entry in queue_entries_qs:
    982             if self.get_agents_for_entry(queue_entry):
    983                 continue
    984             job = queue_entry.job
    985             if luciferlib.is_lucifer_owned(job):
    986                 continue
    987             try:
    988                 drone = luciferlib.spawn_starting_job_handler(
    989                         manager=_drone_manager,
    990                         job=job)
    991             except Exception:
    992                 logging.exception('Error when sending job to Lucifer')
    993                 models.HostQueueEntry.abort_host_queue_entries(
    994                         job.hostqueueentry_set.all())
    995             else:
    996                 models.JobHandoff.objects.create(
    997                         job=job, drone=drone.hostname())
    998 
    999 
   1000     # TODO(crbug.com/748234): This is temporary to enable toggling
   1001     # lucifer rollouts with an option.
   1002     def _send_parsing_to_lucifer(self):
   1003         Status = models.HostQueueEntry.Status
   1004         queue_entries_qs = (models.HostQueueEntry.objects
   1005                             .filter(status=Status.PARSING))
   1006         for queue_entry in queue_entries_qs:
   1007             # If this HQE already has an agent, let monitor_db continue
   1008             # owning it.
   1009             if self.get_agents_for_entry(queue_entry):
   1010                 continue
   1011             job = queue_entry.job
   1012             if luciferlib.is_lucifer_owned(job):
   1013                 continue
   1014             # TODO(crbug.com/811877): Ignore split HQEs.
   1015             if luciferlib.is_split_job(queue_entry.id):
   1016                 continue
   1017             task = postjob_task.PostJobTask(
   1018                     [queue_entry], log_file_name='/dev/null')
   1019             pidfile_id = task._autoserv_monitor.pidfile_id
   1020             autoserv_exit = task._autoserv_monitor.exit_code()
   1021             try:
   1022                 drone = luciferlib.spawn_parsing_job_handler(
   1023                         manager=_drone_manager,
   1024                         job=job,
   1025                         autoserv_exit=autoserv_exit,
   1026                         pidfile_id=pidfile_id)
   1027                 models.JobHandoff.objects.create(job=job,
   1028                                                  drone=drone.hostname())
   1029             except drone_manager.DroneManagerError as e:
   1030                 logging.warning(
   1031                     'Fail to get drone for job %s, skipping lucifer. Error: %s',
   1032                     job.id, e)
   1033 
   1034 
   1035 
   1036     @_calls_log_tick_msg
   1037     def _schedule_running_host_queue_entries(self):
   1038         """
   1039         Adds agents to the dispatcher.
   1040 
   1041         Any AgentTask, like the QueueTask, is wrapped in an Agent. The
   1042         QueueTask for example, will have a job with a control file, and
   1043         the agent will have methods that poll, abort and check if the queue
   1044         task is finished. The dispatcher runs the agent_task, as well as
   1045         other agents in it's _agents member, through _handle_agents, by
   1046         calling the Agents tick().
   1047 
   1048         This method creates an agent for each HQE in one of (starting, running,
   1049         gathering, parsing) states, and adds it to the dispatcher so
   1050         it is handled by _handle_agents.
   1051         """
   1052         for agent_task in self._get_queue_entry_agent_tasks():
   1053             self.add_agent_task(agent_task)
   1054 
   1055 
   1056     @_calls_log_tick_msg
   1057     def _find_aborting(self):
   1058         """
   1059         Looks through the afe_host_queue_entries for an aborted entry.
   1060 
   1061         The aborted bit is set on an HQE in many ways, the most common
   1062         being when a user requests an abort through the frontend, which
   1063         results in an rpc from the afe to abort_host_queue_entries.
   1064         """
   1065         jobs_to_stop = set()
   1066         for entry in scheduler_models.HostQueueEntry.fetch(
   1067                 where='aborted=1 and complete=0'):
   1068             if (luciferlib.is_enabled_for('STARTING')
   1069                 and luciferlib.is_lucifer_owned_by_id(entry.job.id)):
   1070                 continue
   1071 
   1072             # If the job is running on a shard, let the shard handle aborting
   1073             # it and sync back the right status.
   1074             if entry.job.shard_id is not None and not server_utils.is_shard():
   1075                 # Due to crbug.com/894162, we abort jobs that 1hr beyond
   1076                 # timeout on master.
   1077                 create_on = time.mktime(entry.job.created_on.timetuple())
   1078                 wait_threshold = entry.job.timeout_mins * 60 + 3600
   1079                 abort_anyway = wait_threshold < time.time() - create_on
   1080                 if abort_anyway:
   1081                     logging.info('Aborting %s on master due to '
   1082                                  'the job 1 hour beyond timeout.', entry)
   1083                 else:
   1084                     logging.info('Waiting for shard %s to abort hqe %s',
   1085                             entry.job.shard_id, entry)
   1086                     continue
   1087 
   1088             logging.info('Aborting %s', entry)
   1089 
   1090             # The task would have started off with both is_complete and
   1091             # is_active = False. Aborted tasks are neither active nor complete.
   1092             # For all currently active tasks this will happen through the agent,
   1093             # but we need to manually update the special tasks that haven't
   1094             # started yet, because they don't have agents.
   1095             models.SpecialTask.objects.filter(is_active=False,
   1096                 queue_entry_id=entry.id).update(is_complete=True)
   1097 
   1098             for agent in self.get_agents_for_entry(entry):
   1099                 agent.abort()
   1100             entry.abort(self)
   1101             jobs_to_stop.add(entry.job)
   1102         logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
   1103         for job in jobs_to_stop:
   1104             job.stop_if_necessary()
   1105 
   1106 
   1107     @_calls_log_tick_msg
   1108     def _find_aborted_special_tasks(self):
   1109         """
   1110         Find SpecialTasks that have been marked for abortion.
   1111 
   1112         Poll the database looking for SpecialTasks that are active
   1113         and have been marked for abortion, then abort them.
   1114         """
   1115 
   1116         # The completed and active bits are very important when it comes
   1117         # to scheduler correctness. The active bit is set through the prolog
   1118         # of a special task, and reset through the cleanup method of the
   1119         # SpecialAgentTask. The cleanup is called both through the abort and
   1120         # epilog. The complete bit is set in several places, and in general
   1121         # a hanging job will have is_active=1 is_complete=0, while a special
   1122         # task which completed will have is_active=0 is_complete=1. To check
   1123         # aborts we directly check active because the complete bit is set in
   1124         # several places, including the epilog of agent tasks.
   1125         aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
   1126                                                           is_aborted=True)
   1127         for task in aborted_tasks:
   1128             # There are 2 ways to get the agent associated with a task,
   1129             # through the host and through the hqe. A special task
   1130             # always needs a host, but doesn't always need a hqe.
   1131             for agent in self._host_agents.get(task.host.id, []):
   1132                 if isinstance(agent.task, agent_task.SpecialAgentTask):
   1133 
   1134                     # The epilog preforms critical actions such as
   1135                     # queueing the next SpecialTask, requeuing the
   1136                     # hqe etc, however it doesn't actually kill the
   1137                     # monitor process and set the 'done' bit. Epilogs
   1138                     # assume that the job failed, and that the monitor
   1139                     # process has already written an exit code. The
   1140                     # done bit is a necessary condition for
   1141                     # _handle_agents to schedule any more special
   1142                     # tasks against the host, and it must be set
   1143                     # in addition to is_active, is_complete and success.
   1144                     agent.task.epilog()
   1145                     agent.task.abort()
   1146 
   1147 
   1148     def _can_start_agent(self, agent, have_reached_limit):
   1149         # always allow zero-process agents to run
   1150         if agent.task.num_processes == 0:
   1151             return True
   1152         # don't allow any nonzero-process agents to run after we've reached a
   1153         # limit (this avoids starvation of many-process agents)
   1154         if have_reached_limit:
   1155             return False
   1156         # total process throttling
   1157         max_runnable_processes = _drone_manager.max_runnable_processes(
   1158                 agent.task.owner_username,
   1159                 agent.task.get_drone_hostnames_allowed())
   1160         if agent.task.num_processes > max_runnable_processes:
   1161             return False
   1162         return True
   1163 
   1164 
   1165     @_calls_log_tick_msg
   1166     def _handle_agents(self):
   1167         """
   1168         Handles agents of the dispatcher.
   1169 
   1170         Appropriate Agents are added to the dispatcher through
   1171         _schedule_running_host_queue_entries. These agents each
   1172         have a task. This method runs the agents task through
   1173         agent.tick() leading to:
   1174             agent.start
   1175                 prolog -> AgentTasks prolog
   1176                           For each queue entry:
   1177                             sets host status/status to Running
   1178                             set started_on in afe_host_queue_entries
   1179                 run    -> AgentTasks run
   1180                           Creates PidfileRunMonitor
   1181                           Queues the autoserv command line for this AgentTask
   1182                           via the drone manager. These commands are executed
   1183                           through the drone managers execute actions.
   1184                 poll   -> AgentTasks/BaseAgentTask poll
   1185                           checks the monitors exit_code.
   1186                           Executes epilog if task is finished.
   1187                           Executes AgentTasks _finish_task
   1188                 finish_task is usually responsible for setting the status
   1189                 of the HQE/host, and updating it's active and complete fileds.
   1190 
   1191             agent.is_done
   1192                 Removed the agent from the dispatchers _agents queue.
   1193                 Is_done checks the finished bit on the agent, that is
   1194                 set based on the Agents task. During the agents poll
   1195                 we check to see if the monitor process has exited in
   1196                 it's finish method, and set the success member of the
   1197                 task based on this exit code.
   1198         """
   1199         num_started_this_tick = 0
   1200         num_finished_this_tick = 0
   1201         have_reached_limit = False
   1202         # iterate over copy, so we can remove agents during iteration
   1203         logging.debug('Handling %d Agents', len(self._agents))
   1204         for agent in list(self._agents):
   1205             self._log_extra_msg('Processing Agent with Host Ids: %s and '
   1206                                 'queue_entry ids:%s' % (agent.host_ids,
   1207                                 agent.queue_entry_ids))
   1208             if not agent.started:
   1209                 if not self._can_start_agent(agent, have_reached_limit):
   1210                     have_reached_limit = True
   1211                     logging.debug('Reached Limit of allowed running Agents.')
   1212                     continue
   1213                 num_started_this_tick += agent.task.num_processes
   1214                 self._log_extra_msg('Starting Agent')
   1215             agent.tick()
   1216             self._log_extra_msg('Agent tick completed.')
   1217             if agent.is_done():
   1218                 num_finished_this_tick += agent.task.num_processes
   1219                 self._log_extra_msg("Agent finished")
   1220                 self.remove_agent(agent)
   1221 
   1222         metrics.Counter(
   1223             'chromeos/autotest/scheduler/agent_processes_started'
   1224         ).increment_by(num_started_this_tick)
   1225         metrics.Counter(
   1226             'chromeos/autotest/scheduler/agent_processes_finished'
   1227         ).increment_by(num_finished_this_tick)
   1228         num_agent_processes = _drone_manager.total_running_processes()
   1229         metrics.Gauge(
   1230             'chromeos/autotest/scheduler/agent_processes'
   1231         ).set(num_agent_processes)
   1232         logging.info('%d running processes. %d added this tick.',
   1233                      num_agent_processes, num_started_this_tick)
   1234 
   1235 
   1236     def _log_tick_msg(self, msg):
   1237         if self._tick_debug:
   1238             logging.debug(msg)
   1239 
   1240 
   1241     def _log_extra_msg(self, msg):
   1242         if self._extra_debugging:
   1243             logging.debug(msg)
   1244 
   1245 
   1246 class Agent(object):
   1247     """
   1248     An agent for use by the Dispatcher class to perform a task.  An agent wraps
   1249     around an AgentTask mainly to associate the AgentTask with the queue_entry
   1250     and host ids.
   1251 
   1252     The following methods are required on all task objects:
   1253         poll() - Called periodically to let the task check its status and
   1254                 update its internal state.  If the task succeeded.
   1255         is_done() - Returns True if the task is finished.
   1256         abort() - Called when an abort has been requested.  The task must
   1257                 set its aborted attribute to True if it actually aborted.
   1258 
   1259     The following attributes are required on all task objects:
   1260         aborted - bool, True if this task was aborted.
   1261         success - bool, True if this task succeeded.
   1262         queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
   1263         host_ids - A sequence of Host ids this task represents.
   1264     """
   1265 
   1266 
   1267     def __init__(self, task):
   1268         """
   1269         @param task: An instance of an AgentTask.
   1270         """
   1271         self.task = task
   1272 
   1273         # This is filled in by Dispatcher.add_agent()
   1274         self.dispatcher = None
   1275 
   1276         self.queue_entry_ids = task.queue_entry_ids
   1277         self.host_ids = task.host_ids
   1278 
   1279         self.started = False
   1280         self.finished = False
   1281 
   1282 
   1283     def tick(self):
   1284         self.started = True
   1285         if not self.finished:
   1286             self.task.poll()
   1287             if self.task.is_done():
   1288                 self.finished = True
   1289 
   1290 
   1291     def is_done(self):
   1292         return self.finished
   1293 
   1294 
   1295     def abort(self):
   1296         if self.task:
   1297             self.task.abort()
   1298             if self.task.aborted:
   1299                 # tasks can choose to ignore aborts
   1300                 self.finished = True
   1301 
   1302 
   1303 class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
   1304     """
   1305     Common functionality for QueueTask and HostlessQueueTask
   1306     """
   1307     def __init__(self, queue_entries):
   1308         super(AbstractQueueTask, self).__init__()
   1309         self.job = queue_entries[0].job
   1310         self.queue_entries = queue_entries
   1311 
   1312 
   1313     def _keyval_path(self):
   1314         return os.path.join(self._working_directory(), self._KEYVAL_FILE)
   1315 
   1316 
   1317     def _write_control_file(self, execution_path):
   1318         control_path = _drone_manager.attach_file_to_execution(
   1319                 execution_path, self.job.control_file)
   1320         return control_path
   1321 
   1322 
   1323     # TODO: Refactor into autoserv_utils. crbug.com/243090
   1324     def _command_line(self):
   1325         execution_path = self.queue_entries[0].execution_path()
   1326         control_path = self._write_control_file(execution_path)
   1327         hostnames = ','.join(entry.host.hostname
   1328                              for entry in self.queue_entries
   1329                              if not entry.is_hostless())
   1330 
   1331         execution_tag = self.queue_entries[0].execution_tag()
   1332         params = _autoserv_command_line(
   1333             hostnames,
   1334             ['-P', execution_tag, '-n',
   1335              _drone_manager.absolute_path(control_path)],
   1336             job=self.job, verbose=False)
   1337 
   1338         return params
   1339 
   1340 
   1341     @property
   1342     def num_processes(self):
   1343         return len(self.queue_entries)
   1344 
   1345 
   1346     @property
   1347     def owner_username(self):
   1348         return self.job.owner
   1349 
   1350 
   1351     def _working_directory(self):
   1352         return self._get_consistent_execution_path(self.queue_entries)
   1353 
   1354 
   1355     def prolog(self):
   1356         queued_key, queued_time = self._job_queued_keyval(self.job)
   1357         keyval_dict = self.job.keyval_dict()
   1358         keyval_dict[queued_key] = queued_time
   1359         self._write_keyvals_before_job(keyval_dict)
   1360         for queue_entry in self.queue_entries:
   1361             queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
   1362             queue_entry.set_started_on_now()
   1363 
   1364 
   1365     def _write_lost_process_error_file(self):
   1366         error_file_path = os.path.join(self._working_directory(), 'job_failure')
   1367         _drone_manager.write_lines_to_file(error_file_path,
   1368                                            [_LOST_PROCESS_ERROR])
   1369 
   1370 
   1371     def _finish_task(self):
   1372         if not self.monitor:
   1373             return
   1374 
   1375         self._write_job_finished()
   1376 
   1377         if self.monitor.lost_process:
   1378             self._write_lost_process_error_file()
   1379 
   1380 
   1381     def _write_status_comment(self, comment):
   1382         _drone_manager.write_lines_to_file(
   1383             os.path.join(self._working_directory(), 'status.log'),
   1384             ['INFO\t----\t----\t' + comment],
   1385             paired_with_process=self.monitor.get_process())
   1386 
   1387 
   1388     def _log_abort(self):
   1389         if not self.monitor or not self.monitor.has_process():
   1390             return
   1391 
   1392         # build up sets of all the aborted_by and aborted_on values
   1393         aborted_by, aborted_on = set(), set()
   1394         for queue_entry in self.queue_entries:
   1395             if queue_entry.aborted_by:
   1396                 aborted_by.add(queue_entry.aborted_by)
   1397                 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
   1398                 aborted_on.add(t)
   1399 
   1400         # extract some actual, unique aborted by value and write it out
   1401         # TODO(showard): this conditional is now obsolete, we just need to leave
   1402         # it in temporarily for backwards compatibility over upgrades.  delete
   1403         # soon.
   1404         assert len(aborted_by) <= 1
   1405         if len(aborted_by) == 1:
   1406             aborted_by_value = aborted_by.pop()
   1407             aborted_on_value = max(aborted_on)
   1408         else:
   1409             aborted_by_value = 'autotest_system'
   1410             aborted_on_value = int(time.time())
   1411 
   1412         self._write_keyval_after_job("aborted_by", aborted_by_value)
   1413         self._write_keyval_after_job("aborted_on", aborted_on_value)
   1414 
   1415         aborted_on_string = str(datetime.datetime.fromtimestamp(
   1416             aborted_on_value))
   1417         self._write_status_comment('Job aborted by %s on %s' %
   1418                                    (aborted_by_value, aborted_on_string))
   1419 
   1420 
   1421     def abort(self):
   1422         super(AbstractQueueTask, self).abort()
   1423         self._log_abort()
   1424         self._finish_task()
   1425 
   1426 
   1427     def epilog(self):
   1428         super(AbstractQueueTask, self).epilog()
   1429         self._finish_task()
   1430 
   1431 
   1432 class QueueTask(AbstractQueueTask):
   1433     def __init__(self, queue_entries):
   1434         super(QueueTask, self).__init__(queue_entries)
   1435         self._set_ids(queue_entries=queue_entries)
   1436         self._enable_ssp_container = (
   1437                 global_config.global_config.get_config_value(
   1438                         'AUTOSERV', 'enable_ssp_container', type=bool,
   1439                         default=True))
   1440 
   1441 
   1442     def prolog(self):
   1443         self._check_queue_entry_statuses(
   1444                 self.queue_entries,
   1445                 allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
   1446                                       models.HostQueueEntry.Status.RUNNING),
   1447                 allowed_host_statuses=(models.Host.Status.PENDING,
   1448                                        models.Host.Status.RUNNING))
   1449 
   1450         super(QueueTask, self).prolog()
   1451 
   1452         for queue_entry in self.queue_entries:
   1453             self._write_host_keyvals(queue_entry.host)
   1454             queue_entry.host.set_status(models.Host.Status.RUNNING)
   1455             queue_entry.host.update_field('dirty', 1)
   1456 
   1457 
   1458     def _finish_task(self):
   1459         super(QueueTask, self)._finish_task()
   1460 
   1461         for queue_entry in self.queue_entries:
   1462             queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
   1463             queue_entry.host.set_status(models.Host.Status.RUNNING)
   1464 
   1465 
   1466     def _command_line(self):
   1467         invocation = super(QueueTask, self)._command_line()
   1468         # Check if server-side packaging is needed.
   1469         if (self._enable_ssp_container and
   1470             self.job.control_type == control_data.CONTROL_TYPE.SERVER and
   1471             self.job.require_ssp != False):
   1472             invocation += ['--require-ssp']
   1473             keyval_dict = self.job.keyval_dict()
   1474             test_source_build = keyval_dict.get('test_source_build', None)
   1475             if test_source_build:
   1476                 invocation += ['--test_source_build', test_source_build]
   1477         if self.job.parent_job_id:
   1478             invocation += ['--parent_job_id', str(self.job.parent_job_id)]
   1479         return invocation + ['--verify_job_repo_url']
   1480 
   1481 
   1482 class HostlessQueueTask(AbstractQueueTask):
   1483     def __init__(self, queue_entry):
   1484         super(HostlessQueueTask, self).__init__([queue_entry])
   1485         self.queue_entry_ids = [queue_entry.id]
   1486 
   1487 
   1488     def prolog(self):
   1489         super(HostlessQueueTask, self).prolog()
   1490 
   1491 
   1492     def _finish_task(self):
   1493         super(HostlessQueueTask, self)._finish_task()
   1494 
   1495         # When a job is added to database, its initial status is always
   1496         # Starting. In a scheduler tick, scheduler finds all jobs in Starting
   1497         # status, check if any of them can be started. If scheduler hits some
   1498         # limit, e.g., max_hostless_jobs_per_drone, scheduler will
   1499         # leave these jobs in Starting status. Otherwise, the jobs'
   1500         # status will be changed to Running, and an autoserv process
   1501         # will be started in drone for each of these jobs.
   1502         # If the entry is still in status Starting, the process has not started
   1503         # yet. Therefore, there is no need to parse and collect log. Without
   1504         # this check, exception will be raised by scheduler as execution_subdir
   1505         # for this queue entry does not have a value yet.
   1506         hqe = self.queue_entries[0]
   1507         if hqe.status != models.HostQueueEntry.Status.STARTING:
   1508             hqe.set_status(models.HostQueueEntry.Status.PARSING)
   1509 
   1510 
   1511 if __name__ == '__main__':
   1512     main()
   1513