Home | History | Annotate | Download | only in scheduler
      1 #!/usr/bin/python
      2 
      3 #pylint: disable=C0111
      4 
      5 """
      6 Autotest scheduler
      7 """
      8 
      9 import datetime
     10 import functools
     11 import gc
     12 import logging
     13 import optparse
     14 import os
     15 import signal
     16 import sys
     17 import time
     18 
     19 import common
     20 from autotest_lib.frontend import setup_django_environment
     21 
     22 import django.db
     23 
     24 from autotest_lib.client.common_lib import control_data
     25 from autotest_lib.client.common_lib import global_config
     26 from autotest_lib.client.common_lib import utils
     27 from autotest_lib.frontend.afe import models
     28 from autotest_lib.scheduler import agent_task, drone_manager
     29 from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
     30 from autotest_lib.scheduler import luciferlib
     31 from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
     32 from autotest_lib.scheduler import postjob_task
     33 from autotest_lib.scheduler import query_managers
     34 from autotest_lib.scheduler import scheduler_lib
     35 from autotest_lib.scheduler import scheduler_models
     36 from autotest_lib.scheduler import scheduler_config
     37 from autotest_lib.server import autoserv_utils
     38 from autotest_lib.server import system_utils
     39 from autotest_lib.server import utils as server_utils
     40 from autotest_lib.site_utils import server_manager_utils
     41 
     42 try:
     43     from chromite.lib import metrics
     44     from chromite.lib import ts_mon_config
     45 except ImportError:
     46     metrics = utils.metrics_mock
     47     ts_mon_config = utils.metrics_mock
     48 
     49 
     50 PID_FILE_PREFIX = 'monitor_db'
     51 
     52 RESULTS_DIR = '.'
     53 AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
     54 
     55 if os.environ.has_key('AUTOTEST_DIR'):
     56     AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
     57 AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
     58 AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')
     59 
     60 if AUTOTEST_SERVER_DIR not in sys.path:
     61     sys.path.insert(0, AUTOTEST_SERVER_DIR)
     62 
     63 # error message to leave in results dir when an autoserv process disappears
     64 # mysteriously
     65 _LOST_PROCESS_ERROR = """\
     66 Autoserv failed abnormally during execution for this job, probably due to a
     67 system error on the Autotest server.  Full results may not be available.  Sorry.
     68 """
     69 
     70 _db_manager = None
     71 _db = None
     72 _shutdown = False
     73 
     74 # These 2 globals are replaced for testing
     75 _autoserv_directory = autoserv_utils.autoserv_directory
     76 _autoserv_path = autoserv_utils.autoserv_path
     77 _testing_mode = False
     78 _drone_manager = None
     79 
     80 
     81 def _verify_default_drone_set_exists():
     82     if (models.DroneSet.drone_sets_enabled() and
     83             not models.DroneSet.default_drone_set_name()):
     84         raise scheduler_lib.SchedulerError(
     85                 'Drone sets are enabled, but no default is set')
     86 
     87 
     88 def _sanity_check():
     89     """Make sure the configs are consistent before starting the scheduler"""
     90     _verify_default_drone_set_exists()
     91 
     92 
     93 def main():
     94     try:
     95         try:
     96             main_without_exception_handling()
     97         except SystemExit:
     98             raise
     99         except:
    100             logging.exception('Exception escaping in monitor_db')
    101             raise
    102     finally:
    103         utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
    104 
    105 
    106 def main_without_exception_handling():
    107     scheduler_lib.setup_logging(
    108             os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
    109             os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    110     usage = 'usage: %prog [options] results_dir'
    111     parser = optparse.OptionParser(usage)
    112     parser.add_option('--recover-hosts', help='Try to recover dead hosts',
    113                       action='store_true')
    114     parser.add_option('--test', help='Indicate that scheduler is under ' +
    115                       'test and should use dummy autoserv and no parsing',
    116                       action='store_true')
    117     parser.add_option(
    118             '--metrics-file',
    119             help='If provided, drop metrics to this local file instead of '
    120                  'reporting to ts_mon',
    121             type=str,
    122             default=None,
    123     )
    124     parser.add_option(
    125             '--lifetime-hours',
    126             type=float,
    127             default=None,
    128             help='If provided, number of hours the scheduler should run for. '
    129                  'At the expiry of this time, the process will exit '
    130                  'gracefully.',
    131     )
    132     parser.add_option('--production',
    133                       help=('Indicate that scheduler is running in production '
    134                             'environment and it can use database that is not '
    135                             'hosted in localhost. If it is set to False, '
    136                             'scheduler will fail if database is not in '
    137                             'localhost.'),
    138                       action='store_true', default=False)
    139     (options, args) = parser.parse_args()
    140     if len(args) != 1:
    141         parser.print_usage()
    142         return
    143 
    144     scheduler_lib.check_production_settings(options)
    145 
    146     scheduler_enabled = global_config.global_config.get_config_value(
    147         scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)
    148 
    149     if not scheduler_enabled:
    150         logging.error("Scheduler not enabled, set enable_scheduler to true in "
    151                       "the global_config's SCHEDULER section to enable it. "
    152                       "Exiting.")
    153         sys.exit(1)
    154 
    155     global RESULTS_DIR
    156     RESULTS_DIR = args[0]
    157 
    158     # Change the cwd while running to avoid issues incase we were launched from
    159     # somewhere odd (such as a random NFS home directory of the person running
    160     # sudo to launch us as the appropriate user).
    161     os.chdir(RESULTS_DIR)
    162 
    163     # This is helpful for debugging why stuff a scheduler launches is
    164     # misbehaving.
    165     logging.info('os.environ: %s', os.environ)
    166 
    167     if options.test:
    168         global _autoserv_path
    169         _autoserv_path = 'autoserv_dummy'
    170         global _testing_mode
    171         _testing_mode = True
    172 
    173     with ts_mon_config.SetupTsMonGlobalState('autotest_scheduler',
    174                                              indirect=True,
    175                                              debug_file=options.metrics_file):
    176       try:
    177           metrics.Counter('chromeos/autotest/scheduler/start').increment()
    178           process_start_time = time.time()
    179           initialize()
    180           dispatcher = Dispatcher()
    181           dispatcher.initialize(recover_hosts=options.recover_hosts)
    182           minimum_tick_sec = global_config.global_config.get_config_value(
    183                   scheduler_config.CONFIG_SECTION, 'minimum_tick_sec', type=float)
    184 
    185           while not _shutdown:
    186               if _lifetime_expired(options.lifetime_hours, process_start_time):
    187                   break
    188 
    189               start = time.time()
    190               dispatcher.tick()
    191               curr_tick_sec = time.time() - start
    192               if minimum_tick_sec > curr_tick_sec:
    193                   time.sleep(minimum_tick_sec - curr_tick_sec)
    194               else:
    195                   time.sleep(0.0001)
    196       except server_manager_utils.ServerActionError as e:
    197           # This error is expected when the server is not in primary status
    198           # for scheduler role. Thus do not send email for it.
    199           logging.exception(e)
    200       except Exception:
    201           logging.exception('Uncaught exception, terminating monitor_db.')
    202           metrics.Counter('chromeos/autotest/scheduler/uncaught_exception'
    203                           ).increment()
    204 
    205     email_manager.manager.send_queued_emails()
    206     _drone_manager.shutdown()
    207     _db_manager.disconnect()
    208 
    209 
    210 def handle_signal(signum, frame):
    211     global _shutdown
    212     _shutdown = True
    213     logging.info("Shutdown request received.")
    214 
    215 
    216 def _lifetime_expired(lifetime_hours, process_start_time):
    217     """Returns True if we've expired the process lifetime, False otherwise.
    218 
    219     Also sets the global _shutdown so that any background processes also take
    220     the cue to exit.
    221     """
    222     if lifetime_hours is None:
    223         return False
    224     if time.time() - process_start_time > lifetime_hours * 3600:
    225         logging.info('Process lifetime %0.3f hours exceeded. Shutting down.',
    226                      lifetime_hours)
    227         global _shutdown
    228         _shutdown = True
    229         return True
    230     return False
    231 
    232 
    233 def initialize():
    234     logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    235     logging.info("My PID is %d", os.getpid())
    236 
    237     if utils.program_is_alive(PID_FILE_PREFIX):
    238         logging.critical("monitor_db already running, aborting!")
    239         sys.exit(1)
    240     utils.write_pid(PID_FILE_PREFIX)
    241 
    242     if _testing_mode:
    243         global_config.global_config.override_config_value(
    244             scheduler_lib.DB_CONFIG_SECTION, 'database',
    245             'stresstest_autotest_web')
    246 
    247     # If server database is enabled, check if the server has role `scheduler`.
    248     # If the server does not have scheduler role, exception will be raised and
    249     # scheduler will not continue to run.
    250     if server_manager_utils.use_server_db():
    251         server_manager_utils.confirm_server_has_role(hostname='localhost',
    252                                                      role='scheduler')
    253 
    254     os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    255     global _db_manager
    256     _db_manager = scheduler_lib.ConnectionManager()
    257     global _db
    258     _db = _db_manager.get_connection()
    259     logging.info("Setting signal handler")
    260     signal.signal(signal.SIGINT, handle_signal)
    261     signal.signal(signal.SIGTERM, handle_signal)
    262 
    263     initialize_globals()
    264     scheduler_models.initialize()
    265 
    266     drone_list = system_utils.get_drones()
    267     results_host = global_config.global_config.get_config_value(
    268         scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    269     _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)
    270 
    271     logging.info("Connected! Running...")
    272 
    273 
    274 def initialize_globals():
    275     global _drone_manager
    276     _drone_manager = drone_manager.instance()
    277 
    278 
    279 def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
    280                            verbose=True):
    281     """
    282     @returns The autoserv command line as a list of executable + parameters.
    283 
    284     @param machines - string - A machine or comma separated list of machines
    285             for the (-m) flag.
    286     @param extra_args - list - Additional arguments to pass to autoserv.
    287     @param job - Job object - If supplied, -u owner, -l name, --test-retry,
    288             and client -c or server -s parameters will be added.
    289     @param queue_entry - A HostQueueEntry object - If supplied and no Job
    290             object was supplied, this will be used to lookup the Job object.
    291     """
    292     command = autoserv_utils.autoserv_run_job_command(_autoserv_directory,
    293             machines, results_directory=drone_manager.WORKING_DIRECTORY,
    294             extra_args=extra_args, job=job, queue_entry=queue_entry,
    295             verbose=verbose, in_lab=True)
    296     return command
    297 
    298 def _calls_log_tick_msg(func):
    299     """Used to trace functions called by Dispatcher.tick."""
    300     @functools.wraps(func)
    301     def wrapper(self, *args, **kwargs):
    302         self._log_tick_msg('Starting %s' % func.__name__)
    303         return func(self, *args, **kwargs)
    304 
    305     return wrapper
    306 
    307 
    308 class Dispatcher(object):
    309 
    310 
    311     def __init__(self):
    312         self._agents = []
    313         self._last_clean_time = time.time()
    314         user_cleanup_time = scheduler_config.config.clean_interval_minutes
    315         self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
    316                 _db, user_cleanup_time)
    317         self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
    318                 _db, _drone_manager)
    319         self._host_agents = {}
    320         self._queue_entry_agents = {}
    321         self._tick_count = 0
    322         self._last_garbage_stats_time = time.time()
    323         self._seconds_between_garbage_stats = 60 * (
    324                 global_config.global_config.get_config_value(
    325                         scheduler_config.CONFIG_SECTION,
    326                         'gc_stats_interval_mins', type=int, default=6*60))
    327         self._tick_debug = global_config.global_config.get_config_value(
    328                 scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
    329                 default=False)
    330         self._extra_debugging = global_config.global_config.get_config_value(
    331                 scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
    332                 default=False)
    333         self._inline_host_acquisition = (
    334                 global_config.global_config.get_config_value(
    335                         scheduler_config.CONFIG_SECTION,
    336                         'inline_host_acquisition', type=bool, default=True))
    337 
    338         # If _inline_host_acquisition is set the scheduler will acquire and
    339         # release hosts against jobs inline, with the tick. Otherwise the
    340         # scheduler will only focus on jobs that already have hosts, and
    341         # will not explicitly unlease a host when a job finishes using it.
    342         self._job_query_manager = query_managers.AFEJobQueryManager()
    343         self._host_scheduler = (host_scheduler.BaseHostScheduler()
    344                                 if self._inline_host_acquisition else
    345                                 host_scheduler.DummyHostScheduler())
    346 
    347 
    348     def initialize(self, recover_hosts=True):
    349         self._periodic_cleanup.initialize()
    350         self._24hr_upkeep.initialize()
    351         # Execute all actions queued in the cleanup tasks. Scheduler tick will
    352         # run a refresh task first. If there is any action in the queue, refresh
    353         # will raise an exception.
    354         _drone_manager.execute_actions()
    355 
    356         # always recover processes
    357         self._recover_processes()
    358 
    359         if recover_hosts:
    360             self._recover_hosts()
    361 
    362 
    363     # TODO(pprabhu) Drop this metric once tick_times has been verified.
    364     @metrics.SecondsTimerDecorator(
    365             'chromeos/autotest/scheduler/tick_durations/tick')
    366     def tick(self):
    367         """
    368         This is an altered version of tick() where we keep track of when each
    369         major step begins so we can try to figure out where we are using most
    370         of the tick time.
    371         """
    372         with metrics.RuntimeBreakdownTimer(
    373             'chromeos/autotest/scheduler/tick_times') as breakdown_timer:
    374             self._log_tick_msg('New tick')
    375             system_utils.DroneCache.refresh()
    376 
    377             with breakdown_timer.Step('garbage_collection'):
    378                 self._garbage_collection()
    379             with breakdown_timer.Step('trigger_refresh'):
    380                 self._log_tick_msg('Starting _drone_manager.trigger_refresh')
    381                 _drone_manager.trigger_refresh()
    382             with breakdown_timer.Step('schedule_running_host_queue_entries'):
    383                 self._schedule_running_host_queue_entries()
    384             with breakdown_timer.Step('schedule_special_tasks'):
    385                 self._schedule_special_tasks()
    386             with breakdown_timer.Step('schedule_new_jobs'):
    387                 self._schedule_new_jobs()
    388             with breakdown_timer.Step('gather_tick_metrics'):
    389                 self._gather_tick_metrics()
    390             with breakdown_timer.Step('sync_refresh'):
    391                 self._log_tick_msg('Starting _drone_manager.sync_refresh')
    392                 _drone_manager.sync_refresh()
    393             if luciferlib.is_lucifer_enabled():
    394                 with breakdown_timer.Step('send_to_lucifer'):
    395                     self._send_to_lucifer()
    396             # _run_cleanup must be called between drone_manager.sync_refresh,
    397             # and drone_manager.execute_actions, as sync_refresh will clear the
    398             # calls queued in drones. Therefore, any action that calls
    399             # drone.queue_call to add calls to the drone._calls, should be after
    400             # drone refresh is completed and before
    401             # drone_manager.execute_actions at the end of the tick.
    402             with breakdown_timer.Step('run_cleanup'):
    403                 self._run_cleanup()
    404             with breakdown_timer.Step('find_aborting'):
    405                 self._find_aborting()
    406             with breakdown_timer.Step('find_aborted_special_tasks'):
    407                 self._find_aborted_special_tasks()
    408             with breakdown_timer.Step('handle_agents'):
    409                 self._handle_agents()
    410             with breakdown_timer.Step('host_scheduler_tick'):
    411                 self._log_tick_msg('Starting _host_scheduler.tick')
    412                 self._host_scheduler.tick()
    413             with breakdown_timer.Step('drones_execute_actions'):
    414                 self._log_tick_msg('Starting _drone_manager.execute_actions')
    415                 _drone_manager.execute_actions()
    416             with breakdown_timer.Step('send_queued_emails'):
    417                 self._log_tick_msg(
    418                     'Starting email_manager.manager.send_queued_emails')
    419                 email_manager.manager.send_queued_emails()
    420             with breakdown_timer.Step('db_reset_queries'):
    421                 self._log_tick_msg('Starting django.db.reset_queries')
    422                 django.db.reset_queries()
    423 
    424             self._tick_count += 1
    425             metrics.Counter('chromeos/autotest/scheduler/tick').increment()
    426 
    427 
    428     @_calls_log_tick_msg
    429     def _run_cleanup(self):
    430         self._periodic_cleanup.run_cleanup_maybe()
    431         self._24hr_upkeep.run_cleanup_maybe()
    432 
    433 
    434     @_calls_log_tick_msg
    435     def _garbage_collection(self):
    436         threshold_time = time.time() - self._seconds_between_garbage_stats
    437         if threshold_time < self._last_garbage_stats_time:
    438             # Don't generate these reports very often.
    439             return
    440 
    441         self._last_garbage_stats_time = time.time()
    442         # Force a full level 0 collection (because we can, it doesn't hurt
    443         # at this interval).
    444         gc.collect()
    445         logging.info('Logging garbage collector stats on tick %d.',
    446                      self._tick_count)
    447         gc_stats._log_garbage_collector_stats()
    448 
    449 
    450     def _gather_tick_metrics(self):
    451         """Gather metrics during tick, after all tasks have been scheduled."""
    452         metrics.Gauge(
    453             'chromeos/autotest/scheduler/agent_count'
    454         ).set(len(self._agents))
    455 
    456 
    457     def _register_agent_for_ids(self, agent_dict, object_ids, agent):
    458         for object_id in object_ids:
    459             agent_dict.setdefault(object_id, set()).add(agent)
    460 
    461 
    462     def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
    463         for object_id in object_ids:
    464             assert object_id in agent_dict
    465             agent_dict[object_id].remove(agent)
    466             # If an ID has no more active agent associated, there is no need to
    467             # keep it in the dictionary. Otherwise, scheduler will keep an
    468             # unnecessarily big dictionary until being restarted.
    469             if not agent_dict[object_id]:
    470                 agent_dict.pop(object_id)
    471 
    472 
    473     def add_agent_task(self, agent_task):
    474         """
    475         Creates and adds an agent to the dispatchers list.
    476 
    477         In creating the agent we also pass on all the queue_entry_ids and
    478         host_ids from the special agent task. For every agent we create, we
    479         add it to 1. a dict against the queue_entry_ids given to it 2. A dict
    480         against the host_ids given to it. So theoritically, a host can have any
    481         number of agents associated with it, and each of them can have any
    482         special agent task, though in practice we never see > 1 agent/task per
    483         host at any time.
    484 
    485         @param agent_task: A SpecialTask for the agent to manage.
    486         """
    487         # These are owned by lucifer; don't manage these tasks.
    488         if (luciferlib.is_enabled_for('GATHERING')
    489             and (isinstance(agent_task, postjob_task.GatherLogsTask)
    490                  # TODO(crbug.com/811877): Don't skip split HQE parsing.
    491                  or (isinstance(agent_task, postjob_task.FinalReparseTask)
    492                      and not luciferlib.is_split_job(
    493                              agent_task.queue_entries[0].id)))):
    494             return
    495         if luciferlib.is_enabled_for('STARTING'):
    496             # TODO(crbug.com/810141): Transition code.  After running at
    497             # STARTING for a while, these tasks should no longer exist.
    498             if (isinstance(agent_task, postjob_task.GatherLogsTask)
    499                 # TODO(crbug.com/811877): Don't skip split HQE parsing.
    500                 or (isinstance(agent_task, postjob_task.FinalReparseTask)
    501                     and not luciferlib.is_split_job(
    502                             agent_task.queue_entries[0].id))):
    503                 return
    504             # If this AgentTask is already started (i.e., recovered from
    505             # the scheduler running previously not at STARTING lucifer
    506             # level), we want to use the AgentTask to run the test to
    507             # completion.
    508             if (isinstance(agent_task, postjob_task.AbstractQueueTask)
    509                 and not agent_task.started):
    510                 return
    511 
    512         agent = Agent(agent_task)
    513         self._agents.append(agent)
    514         agent.dispatcher = self
    515         self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
    516         self._register_agent_for_ids(self._queue_entry_agents,
    517                                      agent.queue_entry_ids, agent)
    518 
    519 
    520     def get_agents_for_entry(self, queue_entry):
    521         """
    522         Find agents corresponding to the specified queue_entry.
    523         """
    524         return list(self._queue_entry_agents.get(queue_entry.id, set()))
    525 
    526 
    527     def host_has_agent(self, host):
    528         """
    529         Determine if there is currently an Agent present using this host.
    530         """
    531         return bool(self._host_agents.get(host.id, None))
    532 
    533 
    534     def remove_agent(self, agent):
    535         self._agents.remove(agent)
    536         self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
    537                                        agent)
    538         self._unregister_agent_for_ids(self._queue_entry_agents,
    539                                        agent.queue_entry_ids, agent)
    540 
    541 
    542     def _host_has_scheduled_special_task(self, host):
    543         return bool(models.SpecialTask.objects.filter(host__id=host.id,
    544                                                       is_active=False,
    545                                                       is_complete=False))
    546 
    547 
    548     def _recover_processes(self):
    549         agent_tasks = self._create_recovery_agent_tasks()
    550         self._register_pidfiles(agent_tasks)
    551         _drone_manager.refresh()
    552         self._recover_tasks(agent_tasks)
    553         self._recover_pending_entries()
    554         self._check_for_unrecovered_verifying_entries()
    555         self._reverify_remaining_hosts()
    556         # reinitialize drones after killing orphaned processes, since they can
    557         # leave around files when they die
    558         _drone_manager.execute_actions()
    559         _drone_manager.reinitialize_drones()
    560 
    561 
    562     def _create_recovery_agent_tasks(self):
    563         return (self._get_queue_entry_agent_tasks()
    564                 + self._get_special_task_agent_tasks(is_active=True))
    565 
    566 
    567     def _get_queue_entry_agent_tasks(self):
    568         """
    569         Get agent tasks for all hqe in the specified states.
    570 
    571         Loosely this translates to taking a hqe in one of the specified states,
    572         say parsing, and getting an AgentTask for it, like the FinalReparseTask,
    573         through _get_agent_task_for_queue_entry. Each queue entry can only have
    574         one agent task at a time, but there might be multiple queue entries in
    575         the group.
    576 
    577         @return: A list of AgentTasks.
    578         """
    579         # host queue entry statuses handled directly by AgentTasks
    580         # (Verifying is handled through SpecialTasks, so is not
    581         # listed here)
    582         statuses = (models.HostQueueEntry.Status.STARTING,
    583                     models.HostQueueEntry.Status.RUNNING,
    584                     models.HostQueueEntry.Status.GATHERING,
    585                     models.HostQueueEntry.Status.PARSING)
    586         status_list = ','.join("'%s'" % status for status in statuses)
    587         queue_entries = scheduler_models.HostQueueEntry.fetch(
    588                 where='status IN (%s)' % status_list)
    589 
    590         agent_tasks = []
    591         used_queue_entries = set()
    592         hqe_count_by_status = {}
    593         for entry in queue_entries:
    594             try:
    595                 hqe_count_by_status[entry.status] = (
    596                     hqe_count_by_status.get(entry.status, 0) + 1)
    597                 if self.get_agents_for_entry(entry):
    598                     # already being handled
    599                     continue
    600                 if entry in used_queue_entries:
    601                     # already picked up by a synchronous job
    602                     continue
    603                 try:
    604                     agent_task = self._get_agent_task_for_queue_entry(entry)
    605                 except scheduler_lib.SchedulerError:
    606                     # Probably being handled by lucifer crbug.com/809773
    607                     continue
    608                 agent_tasks.append(agent_task)
    609                 used_queue_entries.update(agent_task.queue_entries)
    610             except scheduler_lib.MalformedRecordError as e:
    611                 logging.exception('Skipping agent task for a malformed hqe.')
    612                 # TODO(akeshet): figure out a way to safely permanently discard
    613                 # this errant HQE. It appears that calling entry.abort() is not
    614                 # sufficient, as that already makes some assumptions about
    615                 # record sanity that may be violated. See crbug.com/739530 for
    616                 # context.
    617                 m = 'chromeos/autotest/scheduler/skipped_malformed_hqe'
    618                 metrics.Counter(m).increment()
    619 
    620         for status, count in hqe_count_by_status.iteritems():
    621             metrics.Gauge(
    622                 'chromeos/autotest/scheduler/active_host_queue_entries'
    623             ).set(count, fields={'status': status})
    624 
    625         return agent_tasks
    626 
    627 
    628     def _get_special_task_agent_tasks(self, is_active=False):
    629         special_tasks = models.SpecialTask.objects.filter(
    630                 is_active=is_active, is_complete=False)
    631         agent_tasks = []
    632         for task in special_tasks:
    633           try:
    634               agent_tasks.append(self._get_agent_task_for_special_task(task))
    635           except scheduler_lib.MalformedRecordError as e:
    636               logging.exception('Skipping agent task for malformed special '
    637                                 'task.')
    638               m = 'chromeos/autotest/scheduler/skipped_malformed_special_task'
    639               metrics.Counter(m).increment()
    640         return agent_tasks
    641 
    642 
    643     def _get_agent_task_for_queue_entry(self, queue_entry):
    644         """
    645         Construct an AgentTask instance for the given active HostQueueEntry.
    646 
    647         @param queue_entry: a HostQueueEntry
    648         @return: an AgentTask to run the queue entry
    649         """
    650         task_entries = queue_entry.job.get_group_entries(queue_entry)
    651         self._check_for_duplicate_host_entries(task_entries)
    652 
    653         if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
    654                                   models.HostQueueEntry.Status.RUNNING):
    655             if queue_entry.is_hostless():
    656                 return HostlessQueueTask(queue_entry=queue_entry)
    657             return QueueTask(queue_entries=task_entries)
    658         if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
    659             return postjob_task.GatherLogsTask(queue_entries=task_entries)
    660         if queue_entry.status == models.HostQueueEntry.Status.PARSING:
    661             return postjob_task.FinalReparseTask(queue_entries=task_entries)
    662 
    663         raise scheduler_lib.MalformedRecordError(
    664                 '_get_agent_task_for_queue_entry got entry with '
    665                 'invalid status %s: %s' % (queue_entry.status, queue_entry))
    666 
    667 
    668     def _check_for_duplicate_host_entries(self, task_entries):
    669         non_host_statuses = {models.HostQueueEntry.Status.PARSING}
    670         for task_entry in task_entries:
    671             using_host = (task_entry.host is not None
    672                           and task_entry.status not in non_host_statuses)
    673             if using_host:
    674                 self._assert_host_has_no_agent(task_entry)
    675 
    676 
    677     def _assert_host_has_no_agent(self, entry):
    678         """
    679         @param entry: a HostQueueEntry or a SpecialTask
    680         """
    681         if self.host_has_agent(entry.host):
    682             agent = tuple(self._host_agents.get(entry.host.id))[0]
    683             raise scheduler_lib.MalformedRecordError(
    684                     'While scheduling %s, host %s already has a host agent %s'
    685                     % (entry, entry.host, agent.task))
    686 
    687 
    688     def _get_agent_task_for_special_task(self, special_task):
    689         """
    690         Construct an AgentTask class to run the given SpecialTask and add it
    691         to this dispatcher.
    692 
    693         A special task is created through schedule_special_tasks, but only if
    694         the host doesn't already have an agent. This happens through
    695         add_agent_task. All special agent tasks are given a host on creation,
    696         and a Null hqe. To create a SpecialAgentTask object, you need a
    697         models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
    698         object contains a hqe it's passed on to the special agent task, which
    699         creates a HostQueueEntry and saves it as it's queue_entry.
    700 
    701         @param special_task: a models.SpecialTask instance
    702         @returns an AgentTask to run this SpecialTask
    703         """
    704         self._assert_host_has_no_agent(special_task)
    705 
    706         special_agent_task_classes = (prejob_task.CleanupTask,
    707                                       prejob_task.VerifyTask,
    708                                       prejob_task.RepairTask,
    709                                       prejob_task.ResetTask,
    710                                       prejob_task.ProvisionTask)
    711 
    712         for agent_task_class in special_agent_task_classes:
    713             if agent_task_class.TASK_TYPE == special_task.task:
    714                 return agent_task_class(task=special_task)
    715 
    716         raise scheduler_lib.MalformedRecordError(
    717                 'No AgentTask class for task', str(special_task))
    718 
    719 
    720     def _register_pidfiles(self, agent_tasks):
    721         for agent_task in agent_tasks:
    722             agent_task.register_necessary_pidfiles()
    723 
    724 
    725     def _recover_tasks(self, agent_tasks):
    726         orphans = _drone_manager.get_orphaned_autoserv_processes()
    727 
    728         for agent_task in agent_tasks:
    729             agent_task.recover()
    730             if agent_task.monitor and agent_task.monitor.has_process():
    731                 orphans.discard(agent_task.monitor.get_process())
    732             self.add_agent_task(agent_task)
    733 
    734         self._check_for_remaining_orphan_processes(orphans)
    735 
    736 
    737     def _get_unassigned_entries(self, status):
    738         for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
    739                                                            % status):
    740             if entry.status == status and not self.get_agents_for_entry(entry):
    741                 # The status can change during iteration, e.g., if job.run()
    742                 # sets a group of queue entries to Starting
    743                 yield entry
    744 
    745 
    746     def _check_for_remaining_orphan_processes(self, orphans):
    747         m = 'chromeos/autotest/errors/unrecovered_orphan_processes'
    748         metrics.Gauge(m).set(len(orphans))
    749 
    750         if not orphans:
    751             return
    752         subject = 'Unrecovered orphan autoserv processes remain'
    753         message = '\n'.join(str(process) for process in orphans)
    754         die_on_orphans = global_config.global_config.get_config_value(
    755             scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
    756 
    757         if die_on_orphans:
    758             raise RuntimeError(subject + '\n' + message)
    759 
    760 
    761     def _recover_pending_entries(self):
    762         for entry in self._get_unassigned_entries(
    763                 models.HostQueueEntry.Status.PENDING):
    764             logging.info('Recovering Pending entry %s', entry)
    765             try:
    766                 entry.on_pending()
    767             except scheduler_lib.MalformedRecordError as e:
    768                 logging.exception(
    769                         'Skipping agent task for malformed special task.')
    770                 m = 'chromeos/autotest/scheduler/skipped_malformed_special_task'
    771                 metrics.Counter(m).increment()
    772 
    773 
    774     def _check_for_unrecovered_verifying_entries(self):
    775         # Verify is replaced by Reset.
    776         queue_entries = scheduler_models.HostQueueEntry.fetch(
    777                 where='status = "%s"' % models.HostQueueEntry.Status.RESETTING)
    778         for queue_entry in queue_entries:
    779             special_tasks = models.SpecialTask.objects.filter(
    780                     task__in=(models.SpecialTask.Task.CLEANUP,
    781                               models.SpecialTask.Task.VERIFY,
    782                               models.SpecialTask.Task.RESET),
    783                     queue_entry__id=queue_entry.id,
    784                     is_complete=False)
    785             if special_tasks.count() == 0:
    786                 logging.error('Unrecovered Resetting host queue entry: %s. '
    787                               'Setting status to Queued.', str(queue_entry))
    788                 # Essentially this host queue entry was set to be Verifying
    789                 # however no special task exists for entry. This occurs if the
    790                 # scheduler dies between changing the status and creating the
    791                 # special task. By setting it to queued, the job can restart
    792                 # from the beginning and proceed correctly. This is much more
    793                 # preferable than having monitor_db not launching.
    794                 queue_entry.set_status('Queued')
    795 
    796 
    797     @_calls_log_tick_msg
    798     def _schedule_special_tasks(self):
    799         """
    800         Execute queued SpecialTasks that are ready to run on idle hosts.
    801 
    802         Special tasks include PreJobTasks like verify, reset and cleanup.
    803         They are created through _schedule_new_jobs and associated with a hqe
    804         This method translates SpecialTasks to the appropriate AgentTask and
    805         adds them to the dispatchers agents list, so _handle_agents can execute
    806         them.
    807         """
    808         # When the host scheduler is responsible for acquisition we only want
    809         # to run tasks with leased hosts. All hqe tasks will already have
    810         # leased hosts, and we don't want to run frontend tasks till the host
    811         # scheduler has vetted the assignment. Note that this doesn't include
    812         # frontend tasks with hosts leased by other active hqes.
    813         for task in self._job_query_manager.get_prioritized_special_tasks(
    814                 only_tasks_with_leased_hosts=not self._inline_host_acquisition):
    815             if self.host_has_agent(task.host):
    816                 continue
    817             try:
    818                 self.add_agent_task(self._get_agent_task_for_special_task(task))
    819             except scheduler_lib.MalformedRecordError:
    820                 logging.exception('Skipping schedule for malformed '
    821                                   'special task.')
    822                 m = 'chromeos/autotest/scheduler/skipped_schedule_special_task'
    823                 metrics.Counter(m).increment()
    824 
    825 
    826     def _reverify_remaining_hosts(self):
    827         # recover active hosts that have not yet been recovered, although this
    828         # should never happen
    829         message = ('Recovering active host %s - this probably indicates a '
    830                    'scheduler bug')
    831         self._reverify_hosts_where(
    832                 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
    833                 print_message=message)
    834 
    835 
    836     DEFAULT_REQUESTED_BY_USER_ID = 1
    837 
    838 
    839     def _reverify_hosts_where(self, where,
    840                               print_message='Reverifying host %s'):
    841         full_where='locked = 0 AND invalid = 0 AND ' + where
    842         for host in scheduler_models.Host.fetch(where=full_where):
    843             if self.host_has_agent(host):
    844                 # host has already been recovered in some way
    845                 continue
    846             if self._host_has_scheduled_special_task(host):
    847                 # host will have a special task scheduled on the next cycle
    848                 continue
    849             if print_message:
    850                 logging.error(print_message, host.hostname)
    851             try:
    852                 user = models.User.objects.get(login='autotest_system')
    853             except models.User.DoesNotExist:
    854                 user = models.User.objects.get(
    855                         id=self.DEFAULT_REQUESTED_BY_USER_ID)
    856             models.SpecialTask.objects.create(
    857                     task=models.SpecialTask.Task.RESET,
    858                     host=models.Host.objects.get(id=host.id),
    859                     requested_by=user)
    860 
    861 
    862     def _recover_hosts(self):
    863         # recover "Repair Failed" hosts
    864         message = 'Reverifying dead host %s'
    865         self._reverify_hosts_where("status = 'Repair Failed'",
    866                                    print_message=message)
    867 
    868 
    869     def _refresh_pending_queue_entries(self):
    870         """
    871         Lookup the pending HostQueueEntries and call our HostScheduler
    872         refresh() method given that list.  Return the list.
    873 
    874         @returns A list of pending HostQueueEntries sorted in priority order.
    875         """
    876         queue_entries = self._job_query_manager.get_pending_queue_entries(
    877                 only_hostless=not self._inline_host_acquisition)
    878         if not queue_entries:
    879             return []
    880         return queue_entries
    881 
    882 
    883     def _schedule_hostless_job(self, queue_entry):
    884         """Schedule a hostless (suite) job.
    885 
    886         @param queue_entry: The queue_entry representing the hostless job.
    887         """
    888         if not luciferlib.is_enabled_for('STARTING'):
    889             self.add_agent_task(HostlessQueueTask(queue_entry))
    890 
    891         # Need to set execution_subdir before setting the status:
    892         # After a restart of the scheduler, agents will be restored for HQEs in
    893         # Starting, Running, Gathering, Parsing or Archiving. To do this, the
    894         # execution_subdir is needed. Therefore it must be set before entering
    895         # one of these states.
    896         # Otherwise, if the scheduler was interrupted between setting the status
    897         # and the execution_subdir, upon it's restart restoring agents would
    898         # fail.
    899         # Is there a way to get a status in one of these states without going
    900         # through this code? Following cases are possible:
    901         # - If it's aborted before being started:
    902         #     active bit will be 0, so there's nothing to parse, it will just be
    903         #     set to completed by _find_aborting. Critical statuses are skipped.
    904         # - If it's aborted or it fails after being started:
    905         #     It was started, so this code was executed.
    906         queue_entry.update_field('execution_subdir', 'hostless')
    907         queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
    908 
    909 
    910     def _schedule_host_job(self, host, queue_entry):
    911         """Schedules a job on the given host.
    912 
    913         1. Assign the host to the hqe, if it isn't already assigned.
    914         2. Create a SpecialAgentTask for the hqe.
    915         3. Activate the hqe.
    916 
    917         @param queue_entry: The job to schedule.
    918         @param host: The host to schedule the job on.
    919         """
    920         if self.host_has_agent(host):
    921             host_agent_task = list(self._host_agents.get(host.id))[0].task
    922         else:
    923             self._host_scheduler.schedule_host_job(host, queue_entry)
    924 
    925 
    926     @_calls_log_tick_msg
    927     def _schedule_new_jobs(self):
    928         """
    929         Find any new HQEs and call schedule_pre_job_tasks for it.
    930 
    931         This involves setting the status of the HQE and creating a row in the
    932         db corresponding the the special task, through
    933         scheduler_models._queue_special_task. The new db row is then added as
    934         an agent to the dispatcher through _schedule_special_tasks and
    935         scheduled for execution on the drone through _handle_agents.
    936         """
    937         queue_entries = self._refresh_pending_queue_entries()
    938 
    939         key = 'scheduler.jobs_per_tick'
    940         new_hostless_jobs = 0
    941         new_jobs_with_hosts = 0
    942         new_jobs_need_hosts = 0
    943         host_jobs = []
    944         logging.debug('Processing %d queue_entries', len(queue_entries))
    945 
    946         for queue_entry in queue_entries:
    947             if queue_entry.is_hostless():
    948                 self._schedule_hostless_job(queue_entry)
    949                 new_hostless_jobs = new_hostless_jobs + 1
    950             else:
    951                 host_jobs.append(queue_entry)
    952                 new_jobs_need_hosts = new_jobs_need_hosts + 1
    953 
    954         metrics.Counter(
    955             'chromeos/autotest/scheduler/scheduled_jobs_hostless'
    956         ).increment_by(new_hostless_jobs)
    957 
    958         if not host_jobs:
    959             return
    960 
    961         if not self._inline_host_acquisition:
    962           # In this case, host_scheduler is responsible for scheduling
    963           # host_jobs. Scheduling the jobs ourselves can lead to DB corruption
    964           # since host_scheduler assumes it is the single process scheduling
    965           # host jobs.
    966           metrics.Gauge(
    967               'chromeos/autotest/errors/scheduler/unexpected_host_jobs').set(
    968                   len(host_jobs))
    969           return
    970 
    971         jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
    972         for host_assignment in jobs_with_hosts:
    973             self._schedule_host_job(host_assignment.host, host_assignment.job)
    974             new_jobs_with_hosts = new_jobs_with_hosts + 1
    975 
    976         metrics.Counter(
    977             'chromeos/autotest/scheduler/scheduled_jobs_with_hosts'
    978         ).increment_by(new_jobs_with_hosts)
    979 
    980 
    981     @_calls_log_tick_msg
    982     def _send_to_lucifer(self):
    983         """
    984         Hand off ownership of a job to lucifer component.
    985         """
    986         if luciferlib.is_enabled_for('starting'):
    987             self._send_starting_to_lucifer()
    988         # TODO(crbug.com/810141): Older states need to be supported when
    989         # STARTING is toggled; some jobs may be in an intermediate state
    990         # at that moment.
    991         self._send_gathering_to_lucifer()
    992         self._send_parsing_to_lucifer()
    993 
    994 
    995     # TODO(crbug.com/748234): This is temporary to enable toggling
    996     # lucifer rollouts with an option.
    997     def _send_starting_to_lucifer(self):
    998         Status = models.HostQueueEntry.Status
    999         queue_entries_qs = (models.HostQueueEntry.objects
   1000                             .filter(status=Status.STARTING))
   1001         for queue_entry in queue_entries_qs:
   1002             if self.get_agents_for_entry(queue_entry):
   1003                 continue
   1004             job = queue_entry.job
   1005             if luciferlib.is_lucifer_owned(job):
   1006                 continue
   1007             drone = luciferlib.spawn_starting_job_handler(
   1008                     manager=_drone_manager,
   1009                     job=job)
   1010             models.JobHandoff.objects.create(job=job, drone=drone.hostname())
   1011 
   1012 
   1013     # TODO(crbug.com/748234): This is temporary to enable toggling
   1014     # lucifer rollouts with an option.
   1015     def _send_gathering_to_lucifer(self):
   1016         Status = models.HostQueueEntry.Status
   1017         queue_entries_qs = (models.HostQueueEntry.objects
   1018                             .filter(status=Status.GATHERING))
   1019         for queue_entry in queue_entries_qs:
   1020             # If this HQE already has an agent, let monitor_db continue
   1021             # owning it.
   1022             if self.get_agents_for_entry(queue_entry):
   1023                 continue
   1024 
   1025             job = queue_entry.job
   1026             if luciferlib.is_lucifer_owned(job):
   1027                 continue
   1028             task = postjob_task.PostJobTask(
   1029                     [queue_entry], log_file_name='/dev/null')
   1030             pidfile_id = task._autoserv_monitor.pidfile_id
   1031             autoserv_exit = task._autoserv_monitor.exit_code()
   1032             try:
   1033                 drone = luciferlib.spawn_gathering_job_handler(
   1034                         manager=_drone_manager,
   1035                         job=job,
   1036                         autoserv_exit=autoserv_exit,
   1037                         pidfile_id=pidfile_id)
   1038                 models.JobHandoff.objects.create(job=job,
   1039                                                  drone=drone.hostname())
   1040             except drone_manager.DroneManagerError as e:
   1041                 logging.warning(
   1042                     'Fail to get drone for job %s, skipping lucifer. Error: %s',
   1043                     job.id, e)
   1044 
   1045 
   1046     # TODO(crbug.com/748234): This is temporary to enable toggling
   1047     # lucifer rollouts with an option.
   1048     def _send_parsing_to_lucifer(self):
   1049         Status = models.HostQueueEntry.Status
   1050         queue_entries_qs = (models.HostQueueEntry.objects
   1051                             .filter(status=Status.PARSING))
   1052         for queue_entry in queue_entries_qs:
   1053             # If this HQE already has an agent, let monitor_db continue
   1054             # owning it.
   1055             if self.get_agents_for_entry(queue_entry):
   1056                 continue
   1057             job = queue_entry.job
   1058             if luciferlib.is_lucifer_owned(job):
   1059                 continue
   1060             # TODO(crbug.com/811877): Ignore split HQEs.
   1061             if luciferlib.is_split_job(queue_entry.id):
   1062                 continue
   1063             task = postjob_task.PostJobTask(
   1064                     [queue_entry], log_file_name='/dev/null')
   1065             pidfile_id = task._autoserv_monitor.pidfile_id
   1066             autoserv_exit = task._autoserv_monitor.exit_code()
   1067             try:
   1068                 drone = luciferlib.spawn_parsing_job_handler(
   1069                         manager=_drone_manager,
   1070                         job=job,
   1071                         autoserv_exit=autoserv_exit,
   1072                         pidfile_id=pidfile_id)
   1073                 models.JobHandoff.objects.create(job=job,
   1074                                                  drone=drone.hostname())
   1075             except drone_manager.DroneManagerError as e:
   1076                 logging.warning(
   1077                     'Fail to get drone for job %s, skipping lucifer. Error: %s',
   1078                     job.id, e)
   1079 
   1080 
   1081 
   1082     @_calls_log_tick_msg
   1083     def _schedule_running_host_queue_entries(self):
   1084         """
   1085         Adds agents to the dispatcher.
   1086 
   1087         Any AgentTask, like the QueueTask, is wrapped in an Agent. The
   1088         QueueTask for example, will have a job with a control file, and
   1089         the agent will have methods that poll, abort and check if the queue
   1090         task is finished. The dispatcher runs the agent_task, as well as
   1091         other agents in it's _agents member, through _handle_agents, by
   1092         calling the Agents tick().
   1093 
   1094         This method creates an agent for each HQE in one of (starting, running,
   1095         gathering, parsing) states, and adds it to the dispatcher so
   1096         it is handled by _handle_agents.
   1097         """
   1098         for agent_task in self._get_queue_entry_agent_tasks():
   1099             self.add_agent_task(agent_task)
   1100 
   1101 
   1102     @_calls_log_tick_msg
   1103     def _find_aborting(self):
   1104         """
   1105         Looks through the afe_host_queue_entries for an aborted entry.
   1106 
   1107         The aborted bit is set on an HQE in many ways, the most common
   1108         being when a user requests an abort through the frontend, which
   1109         results in an rpc from the afe to abort_host_queue_entries.
   1110         """
   1111         jobs_to_stop = set()
   1112         for entry in scheduler_models.HostQueueEntry.fetch(
   1113                 where='aborted=1 and complete=0'):
   1114 
   1115             # If the job is running on a shard, let the shard handle aborting
   1116             # it and sync back the right status.
   1117             if entry.job.shard_id is not None and not server_utils.is_shard():
   1118                 logging.info('Waiting for shard %s to abort hqe %s',
   1119                         entry.job.shard_id, entry)
   1120                 continue
   1121 
   1122             logging.info('Aborting %s', entry)
   1123 
   1124             # The task would have started off with both is_complete and
   1125             # is_active = False. Aborted tasks are neither active nor complete.
   1126             # For all currently active tasks this will happen through the agent,
   1127             # but we need to manually update the special tasks that haven't
   1128             # started yet, because they don't have agents.
   1129             models.SpecialTask.objects.filter(is_active=False,
   1130                 queue_entry_id=entry.id).update(is_complete=True)
   1131 
   1132             for agent in self.get_agents_for_entry(entry):
   1133                 agent.abort()
   1134             entry.abort(self)
   1135             jobs_to_stop.add(entry.job)
   1136         logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
   1137         for job in jobs_to_stop:
   1138             job.stop_if_necessary()
   1139 
   1140 
   1141     @_calls_log_tick_msg
   1142     def _find_aborted_special_tasks(self):
   1143         """
   1144         Find SpecialTasks that have been marked for abortion.
   1145 
   1146         Poll the database looking for SpecialTasks that are active
   1147         and have been marked for abortion, then abort them.
   1148         """
   1149 
   1150         # The completed and active bits are very important when it comes
   1151         # to scheduler correctness. The active bit is set through the prolog
   1152         # of a special task, and reset through the cleanup method of the
   1153         # SpecialAgentTask. The cleanup is called both through the abort and
   1154         # epilog. The complete bit is set in several places, and in general
   1155         # a hanging job will have is_active=1 is_complete=0, while a special
   1156         # task which completed will have is_active=0 is_complete=1. To check
   1157         # aborts we directly check active because the complete bit is set in
   1158         # several places, including the epilog of agent tasks.
   1159         aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
   1160                                                           is_aborted=True)
   1161         for task in aborted_tasks:
   1162             # There are 2 ways to get the agent associated with a task,
   1163             # through the host and through the hqe. A special task
   1164             # always needs a host, but doesn't always need a hqe.
   1165             for agent in self._host_agents.get(task.host.id, []):
   1166                 if isinstance(agent.task, agent_task.SpecialAgentTask):
   1167 
   1168                     # The epilog preforms critical actions such as
   1169                     # queueing the next SpecialTask, requeuing the
   1170                     # hqe etc, however it doesn't actually kill the
   1171                     # monitor process and set the 'done' bit. Epilogs
   1172                     # assume that the job failed, and that the monitor
   1173                     # process has already written an exit code. The
   1174                     # done bit is a necessary condition for
   1175                     # _handle_agents to schedule any more special
   1176                     # tasks against the host, and it must be set
   1177                     # in addition to is_active, is_complete and success.
   1178                     agent.task.epilog()
   1179                     agent.task.abort()
   1180 
   1181 
   1182     def _can_start_agent(self, agent, have_reached_limit):
   1183         # always allow zero-process agents to run
   1184         if agent.task.num_processes == 0:
   1185             return True
   1186         # don't allow any nonzero-process agents to run after we've reached a
   1187         # limit (this avoids starvation of many-process agents)
   1188         if have_reached_limit:
   1189             return False
   1190         # total process throttling
   1191         max_runnable_processes = _drone_manager.max_runnable_processes(
   1192                 agent.task.owner_username,
   1193                 agent.task.get_drone_hostnames_allowed())
   1194         if agent.task.num_processes > max_runnable_processes:
   1195             return False
   1196         return True
   1197 
   1198 
   1199     @_calls_log_tick_msg
   1200     def _handle_agents(self):
   1201         """
   1202         Handles agents of the dispatcher.
   1203 
   1204         Appropriate Agents are added to the dispatcher through
   1205         _schedule_running_host_queue_entries. These agents each
   1206         have a task. This method runs the agents task through
   1207         agent.tick() leading to:
   1208             agent.start
   1209                 prolog -> AgentTasks prolog
   1210                           For each queue entry:
   1211                             sets host status/status to Running
   1212                             set started_on in afe_host_queue_entries
   1213                 run    -> AgentTasks run
   1214                           Creates PidfileRunMonitor
   1215                           Queues the autoserv command line for this AgentTask
   1216                           via the drone manager. These commands are executed
   1217                           through the drone managers execute actions.
   1218                 poll   -> AgentTasks/BaseAgentTask poll
   1219                           checks the monitors exit_code.
   1220                           Executes epilog if task is finished.
   1221                           Executes AgentTasks _finish_task
   1222                 finish_task is usually responsible for setting the status
   1223                 of the HQE/host, and updating it's active and complete fileds.
   1224 
   1225             agent.is_done
   1226                 Removed the agent from the dispatchers _agents queue.
   1227                 Is_done checks the finished bit on the agent, that is
   1228                 set based on the Agents task. During the agents poll
   1229                 we check to see if the monitor process has exited in
   1230                 it's finish method, and set the success member of the
   1231                 task based on this exit code.
   1232         """
   1233         num_started_this_tick = 0
   1234         num_finished_this_tick = 0
   1235         have_reached_limit = False
   1236         # iterate over copy, so we can remove agents during iteration
   1237         logging.debug('Handling %d Agents', len(self._agents))
   1238         for agent in list(self._agents):
   1239             self._log_extra_msg('Processing Agent with Host Ids: %s and '
   1240                                 'queue_entry ids:%s' % (agent.host_ids,
   1241                                 agent.queue_entry_ids))
   1242             if not agent.started:
   1243                 if not self._can_start_agent(agent, have_reached_limit):
   1244                     have_reached_limit = True
   1245                     logging.debug('Reached Limit of allowed running Agents.')
   1246                     continue
   1247                 num_started_this_tick += agent.task.num_processes
   1248                 self._log_extra_msg('Starting Agent')
   1249             agent.tick()
   1250             self._log_extra_msg('Agent tick completed.')
   1251             if agent.is_done():
   1252                 num_finished_this_tick += agent.task.num_processes
   1253                 self._log_extra_msg("Agent finished")
   1254                 self.remove_agent(agent)
   1255 
   1256         metrics.Counter(
   1257             'chromeos/autotest/scheduler/agent_processes_started'
   1258         ).increment_by(num_started_this_tick)
   1259         metrics.Counter(
   1260             'chromeos/autotest/scheduler/agent_processes_finished'
   1261         ).increment_by(num_finished_this_tick)
   1262         num_agent_processes = _drone_manager.total_running_processes()
   1263         metrics.Gauge(
   1264             'chromeos/autotest/scheduler/agent_processes'
   1265         ).set(num_agent_processes)
   1266         logging.info('%d running processes. %d added this tick.',
   1267                      num_agent_processes, num_started_this_tick)
   1268 
   1269 
   1270     def _log_tick_msg(self, msg):
   1271         if self._tick_debug:
   1272             logging.debug(msg)
   1273 
   1274 
   1275     def _log_extra_msg(self, msg):
   1276         if self._extra_debugging:
   1277             logging.debug(msg)
   1278 
   1279 
   1280 class Agent(object):
   1281     """
   1282     An agent for use by the Dispatcher class to perform a task.  An agent wraps
   1283     around an AgentTask mainly to associate the AgentTask with the queue_entry
   1284     and host ids.
   1285 
   1286     The following methods are required on all task objects:
   1287         poll() - Called periodically to let the task check its status and
   1288                 update its internal state.  If the task succeeded.
   1289         is_done() - Returns True if the task is finished.
   1290         abort() - Called when an abort has been requested.  The task must
   1291                 set its aborted attribute to True if it actually aborted.
   1292 
   1293     The following attributes are required on all task objects:
   1294         aborted - bool, True if this task was aborted.
   1295         success - bool, True if this task succeeded.
   1296         queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
   1297         host_ids - A sequence of Host ids this task represents.
   1298     """
   1299 
   1300 
   1301     def __init__(self, task):
   1302         """
   1303         @param task: An instance of an AgentTask.
   1304         """
   1305         self.task = task
   1306 
   1307         # This is filled in by Dispatcher.add_agent()
   1308         self.dispatcher = None
   1309 
   1310         self.queue_entry_ids = task.queue_entry_ids
   1311         self.host_ids = task.host_ids
   1312 
   1313         self.started = False
   1314         self.finished = False
   1315 
   1316 
   1317     def tick(self):
   1318         self.started = True
   1319         if not self.finished:
   1320             self.task.poll()
   1321             if self.task.is_done():
   1322                 self.finished = True
   1323 
   1324 
   1325     def is_done(self):
   1326         return self.finished
   1327 
   1328 
   1329     def abort(self):
   1330         if self.task:
   1331             self.task.abort()
   1332             if self.task.aborted:
   1333                 # tasks can choose to ignore aborts
   1334                 self.finished = True
   1335 
   1336 
   1337 class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
   1338     """
   1339     Common functionality for QueueTask and HostlessQueueTask
   1340     """
   1341     def __init__(self, queue_entries):
   1342         super(AbstractQueueTask, self).__init__()
   1343         self.job = queue_entries[0].job
   1344         self.queue_entries = queue_entries
   1345 
   1346 
   1347     def _keyval_path(self):
   1348         return os.path.join(self._working_directory(), self._KEYVAL_FILE)
   1349 
   1350 
   1351     def _write_control_file(self, execution_path):
   1352         control_path = _drone_manager.attach_file_to_execution(
   1353                 execution_path, self.job.control_file)
   1354         return control_path
   1355 
   1356 
   1357     # TODO: Refactor into autoserv_utils. crbug.com/243090
   1358     def _command_line(self):
   1359         execution_path = self.queue_entries[0].execution_path()
   1360         control_path = self._write_control_file(execution_path)
   1361         hostnames = ','.join(entry.host.hostname
   1362                              for entry in self.queue_entries
   1363                              if not entry.is_hostless())
   1364 
   1365         execution_tag = self.queue_entries[0].execution_tag()
   1366         params = _autoserv_command_line(
   1367             hostnames,
   1368             ['-P', execution_tag, '-n',
   1369              _drone_manager.absolute_path(control_path)],
   1370             job=self.job, verbose=False)
   1371 
   1372         return params
   1373 
   1374 
   1375     @property
   1376     def num_processes(self):
   1377         return len(self.queue_entries)
   1378 
   1379 
   1380     @property
   1381     def owner_username(self):
   1382         return self.job.owner
   1383 
   1384 
   1385     def _working_directory(self):
   1386         return self._get_consistent_execution_path(self.queue_entries)
   1387 
   1388 
   1389     def prolog(self):
   1390         queued_key, queued_time = self._job_queued_keyval(self.job)
   1391         keyval_dict = self.job.keyval_dict()
   1392         keyval_dict[queued_key] = queued_time
   1393         self._write_keyvals_before_job(keyval_dict)
   1394         for queue_entry in self.queue_entries:
   1395             queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
   1396             queue_entry.set_started_on_now()
   1397 
   1398 
   1399     def _write_lost_process_error_file(self):
   1400         error_file_path = os.path.join(self._working_directory(), 'job_failure')
   1401         _drone_manager.write_lines_to_file(error_file_path,
   1402                                            [_LOST_PROCESS_ERROR])
   1403 
   1404 
   1405     def _finish_task(self):
   1406         if not self.monitor:
   1407             return
   1408 
   1409         self._write_job_finished()
   1410 
   1411         if self.monitor.lost_process:
   1412             self._write_lost_process_error_file()
   1413 
   1414 
   1415     def _write_status_comment(self, comment):
   1416         _drone_manager.write_lines_to_file(
   1417             os.path.join(self._working_directory(), 'status.log'),
   1418             ['INFO\t----\t----\t' + comment],
   1419             paired_with_process=self.monitor.get_process())
   1420 
   1421 
   1422     def _log_abort(self):
   1423         if not self.monitor or not self.monitor.has_process():
   1424             return
   1425 
   1426         # build up sets of all the aborted_by and aborted_on values
   1427         aborted_by, aborted_on = set(), set()
   1428         for queue_entry in self.queue_entries:
   1429             if queue_entry.aborted_by:
   1430                 aborted_by.add(queue_entry.aborted_by)
   1431                 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
   1432                 aborted_on.add(t)
   1433 
   1434         # extract some actual, unique aborted by value and write it out
   1435         # TODO(showard): this conditional is now obsolete, we just need to leave
   1436         # it in temporarily for backwards compatibility over upgrades.  delete
   1437         # soon.
   1438         assert len(aborted_by) <= 1
   1439         if len(aborted_by) == 1:
   1440             aborted_by_value = aborted_by.pop()
   1441             aborted_on_value = max(aborted_on)
   1442         else:
   1443             aborted_by_value = 'autotest_system'
   1444             aborted_on_value = int(time.time())
   1445 
   1446         self._write_keyval_after_job("aborted_by", aborted_by_value)
   1447         self._write_keyval_after_job("aborted_on", aborted_on_value)
   1448 
   1449         aborted_on_string = str(datetime.datetime.fromtimestamp(
   1450             aborted_on_value))
   1451         self._write_status_comment('Job aborted by %s on %s' %
   1452                                    (aborted_by_value, aborted_on_string))
   1453 
   1454 
   1455     def abort(self):
   1456         super(AbstractQueueTask, self).abort()
   1457         self._log_abort()
   1458         self._finish_task()
   1459 
   1460 
   1461     def epilog(self):
   1462         super(AbstractQueueTask, self).epilog()
   1463         self._finish_task()
   1464 
   1465 
   1466 class QueueTask(AbstractQueueTask):
   1467     def __init__(self, queue_entries):
   1468         super(QueueTask, self).__init__(queue_entries)
   1469         self._set_ids(queue_entries=queue_entries)
   1470         self._enable_ssp_container = (
   1471                 global_config.global_config.get_config_value(
   1472                         'AUTOSERV', 'enable_ssp_container', type=bool,
   1473                         default=True))
   1474 
   1475 
   1476     def prolog(self):
   1477         self._check_queue_entry_statuses(
   1478                 self.queue_entries,
   1479                 allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
   1480                                       models.HostQueueEntry.Status.RUNNING),
   1481                 allowed_host_statuses=(models.Host.Status.PENDING,
   1482                                        models.Host.Status.RUNNING))
   1483 
   1484         super(QueueTask, self).prolog()
   1485 
   1486         for queue_entry in self.queue_entries:
   1487             self._write_host_keyvals(queue_entry.host)
   1488             queue_entry.host.set_status(models.Host.Status.RUNNING)
   1489             queue_entry.host.update_field('dirty', 1)
   1490 
   1491 
   1492     def _finish_task(self):
   1493         super(QueueTask, self)._finish_task()
   1494 
   1495         for queue_entry in self.queue_entries:
   1496             queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
   1497             queue_entry.host.set_status(models.Host.Status.RUNNING)
   1498 
   1499 
   1500     def _command_line(self):
   1501         invocation = super(QueueTask, self)._command_line()
   1502         # Check if server-side packaging is needed.
   1503         if (self._enable_ssp_container and
   1504             self.job.control_type == control_data.CONTROL_TYPE.SERVER and
   1505             self.job.require_ssp != False):
   1506             invocation += ['--require-ssp']
   1507             keyval_dict = self.job.keyval_dict()
   1508             test_source_build = keyval_dict.get('test_source_build', None)
   1509             if test_source_build:
   1510                 invocation += ['--test_source_build', test_source_build]
   1511         if self.job.parent_job_id:
   1512             invocation += ['--parent_job_id', str(self.job.parent_job_id)]
   1513         return invocation + ['--verify_job_repo_url']
   1514 
   1515 
   1516 class HostlessQueueTask(AbstractQueueTask):
   1517     def __init__(self, queue_entry):
   1518         super(HostlessQueueTask, self).__init__([queue_entry])
   1519         self.queue_entry_ids = [queue_entry.id]
   1520 
   1521 
   1522     def prolog(self):
   1523         super(HostlessQueueTask, self).prolog()
   1524 
   1525 
   1526     def _finish_task(self):
   1527         super(HostlessQueueTask, self)._finish_task()
   1528 
   1529         # When a job is added to database, its initial status is always
   1530         # Starting. In a scheduler tick, scheduler finds all jobs in Starting
   1531         # status, check if any of them can be started. If scheduler hits some
   1532         # limit, e.g., max_hostless_jobs_per_drone, scheduler will
   1533         # leave these jobs in Starting status. Otherwise, the jobs'
   1534         # status will be changed to Running, and an autoserv process
   1535         # will be started in drone for each of these jobs.
   1536         # If the entry is still in status Starting, the process has not started
   1537         # yet. Therefore, there is no need to parse and collect log. Without
   1538         # this check, exception will be raised by scheduler as execution_subdir
   1539         # for this queue entry does not have a value yet.
   1540         hqe = self.queue_entries[0]
   1541         if hqe.status != models.HostQueueEntry.Status.STARTING:
   1542             hqe.set_status(models.HostQueueEntry.Status.PARSING)
   1543 
   1544 
   1545 if __name__ == '__main__':
   1546     main()
   1547