      1 #!/usr/bin/python
      2 """
      3 Autotest scheduler
      4 """
      5 
      6 import datetime
      7 import gc
      8 import logging
      9 import optparse
     10 import os
     11 import signal
     12 import sys
     13 import time
     14 
     15 import common
     16 from autotest_lib.frontend import setup_django_environment
     17 
     18 import django.db
     19 
     20 from autotest_lib.client.common_lib import control_data
     21 from autotest_lib.client.common_lib import global_config
     22 from autotest_lib.client.common_lib import utils
     23 from autotest_lib.client.common_lib.cros.graphite import autotest_stats
     24 from autotest_lib.frontend.afe import models, rpc_utils
     25 from autotest_lib.scheduler import agent_task, drone_manager
     26 from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
     27 from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
     28 from autotest_lib.scheduler import postjob_task
     29 from autotest_lib.scheduler import query_managers
     30 from autotest_lib.scheduler import scheduler_lib
     31 from autotest_lib.scheduler import scheduler_models
     32 from autotest_lib.scheduler import status_server, scheduler_config
     33 from autotest_lib.server import autoserv_utils
     34 from autotest_lib.server import system_utils
     35 from autotest_lib.server import utils as server_utils
     36 from autotest_lib.site_utils import metadata_reporter
     37 from autotest_lib.site_utils import server_manager_utils
     38 
     39 
     40 BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
     41 PID_FILE_PREFIX = 'monitor_db'
     42 
     43 RESULTS_DIR = '.'
     44 AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
     45 
      46 if 'AUTOTEST_DIR' in os.environ:
     47     AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
     48 AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
     49 AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')
     50 
     51 if AUTOTEST_SERVER_DIR not in sys.path:
     52     sys.path.insert(0, AUTOTEST_SERVER_DIR)
     53 
     54 # error message to leave in results dir when an autoserv process disappears
     55 # mysteriously
     56 _LOST_PROCESS_ERROR = """\
     57 Autoserv failed abnormally during execution for this job, probably due to a
     58 system error on the Autotest server.  Full results may not be available.  Sorry.
     59 """
     60 
     61 _db_manager = None
     62 _db = None
     63 _shutdown = False
     64 
     65 # These 2 globals are replaced for testing
     66 _autoserv_directory = autoserv_utils.autoserv_directory
     67 _autoserv_path = autoserv_utils.autoserv_path
     68 _testing_mode = False
     69 _drone_manager = None
     70 _inline_host_acquisition = global_config.global_config.get_config_value(
     71         scheduler_config.CONFIG_SECTION, 'inline_host_acquisition', type=bool,
     72         default=True)
     73 
     74 _enable_ssp_container = global_config.global_config.get_config_value(
     75         'AUTOSERV', 'enable_ssp_container', type=bool,
     76         default=True)
     77 
     78 def _site_init_monitor_db_dummy():
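             """Default site hook, used when no site_monitor_db module is available."""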
     79     return {}
     80 
     81 
     82 def _verify_default_drone_set_exists():
     83     if (models.DroneSet.drone_sets_enabled() and
     84             not models.DroneSet.default_drone_set_name()):
     85         raise scheduler_lib.SchedulerError(
     86                 'Drone sets are enabled, but no default is set')
     87 
     88 
     89 def _sanity_check():
     90     """Make sure the configs are consistent before starting the scheduler"""
     91     _verify_default_drone_set_exists()
     92 
     93 
     94 def main():
     95     try:
     96         try:
     97             main_without_exception_handling()
     98         except SystemExit:
     99             raise
    100         except:
    101             logging.exception('Exception escaping in monitor_db')
    102             raise
    103     finally:
    104         utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
    105 
    106 
    107 def main_without_exception_handling():
    108     scheduler_lib.setup_logging(
    109             os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
    110             os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    111     usage = 'usage: %prog [options] results_dir'
    112     parser = optparse.OptionParser(usage)
    113     parser.add_option('--recover-hosts', help='Try to recover dead hosts',
    114                       action='store_true')
    115     parser.add_option('--test', help='Indicate that scheduler is under ' +
    116                       'test and should use dummy autoserv and no parsing',
    117                       action='store_true')
    118     parser.add_option('--production',
     119                       help=('Indicate that the scheduler is running in a '
     120                             'production environment and may use a database '
     121                             'that is not hosted on localhost. If not set, '
     122                             'the scheduler will fail if the database is not '
     123                             'on localhost.'),
    124                       action='store_true', default=False)
    125     (options, args) = parser.parse_args()
    126     if len(args) != 1:
    127         parser.print_usage()
    128         return
    129 
    130     scheduler_lib.check_production_settings(options)
    131 
    132     scheduler_enabled = global_config.global_config.get_config_value(
    133         scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)
    134 
    135     if not scheduler_enabled:
     136         logging.error("Scheduler is not enabled. Set enable_scheduler to "
     137                       "true in the SCHEDULER section of the global config "
     138                       "to enable it. Exiting.")
    139         sys.exit(1)
    140 
    141     global RESULTS_DIR
    142     RESULTS_DIR = args[0]
    143 
    144     site_init = utils.import_site_function(__file__,
    145         "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
    146         _site_init_monitor_db_dummy)
    147     site_init()
    148 
     149     # Change the cwd while running to avoid issues in case we were launched from
    150     # somewhere odd (such as a random NFS home directory of the person running
    151     # sudo to launch us as the appropriate user).
    152     os.chdir(RESULTS_DIR)
    153 
     154     # This is helpful for debugging why a process the scheduler launches is
     155     # misbehaving.
    156     logging.info('os.environ: %s', os.environ)
    157 
    158     if options.test:
    159         global _autoserv_path
    160         _autoserv_path = 'autoserv_dummy'
    161         global _testing_mode
    162         _testing_mode = True
    163 
    164     server = status_server.StatusServer()
    165     server.start()
    166 
    167     # Start the thread to report metadata.
    168     metadata_reporter.start()
    169 
    170     try:
    171         initialize()
    172         dispatcher = Dispatcher()
    173         dispatcher.initialize(recover_hosts=options.recover_hosts)
    174         minimum_tick_sec = global_config.global_config.get_config_value(
    175                 scheduler_config.CONFIG_SECTION, 'minimum_tick_sec', type=float)
    176 
    177         while not _shutdown and not server._shutdown_scheduler:
    178             start = time.time()
    179             dispatcher.tick()
    180             curr_tick_sec = time.time() - start
    181             if (minimum_tick_sec > curr_tick_sec):
    182                 time.sleep(minimum_tick_sec - curr_tick_sec)
    183             else:
    184                 time.sleep(0.0001)
    185     except Exception:
    186         email_manager.manager.log_stacktrace(
    187             "Uncaught exception; terminating monitor_db")
    188 
    189     metadata_reporter.abort()
    190     email_manager.manager.send_queued_emails()
    191     server.shutdown()
    192     _drone_manager.shutdown()
    193     _db_manager.disconnect()
    194 
    195 
    196 def handle_signal(signum, frame):
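             """Signal handler: set the global _shutdown flag so the main loop exits."""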
    197     global _shutdown
    198     _shutdown = True
    199     logging.info("Shutdown request received.")
    200 
    201 
    202 def initialize():
    203     logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    204     logging.info("My PID is %d", os.getpid())
    205 
    206     if utils.program_is_alive(PID_FILE_PREFIX):
    207         logging.critical("monitor_db already running, aborting!")
    208         sys.exit(1)
    209     utils.write_pid(PID_FILE_PREFIX)
    210 
    211     if _testing_mode:
    212         global_config.global_config.override_config_value(
    213             scheduler_lib.DB_CONFIG_SECTION, 'database',
    214             'stresstest_autotest_web')
    215 
     216     # If the server database is enabled, check whether this server has the
     217     # `scheduler` role. If it does not, an exception will be raised and the
     218     # scheduler will not continue to run.
    219     if server_manager_utils.use_server_db():
    220         server_manager_utils.confirm_server_has_role(hostname='localhost',
    221                                                      role='scheduler')
    222 
    223     os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    224     global _db_manager
    225     _db_manager = scheduler_lib.ConnectionManager()
    226     global _db
    227     _db = _db_manager.get_connection()
    228     logging.info("Setting signal handler")
    229     signal.signal(signal.SIGINT, handle_signal)
    230     signal.signal(signal.SIGTERM, handle_signal)
    231 
    232     initialize_globals()
    233     scheduler_models.initialize()
    234 
    235     drone_list = system_utils.get_drones()
    236     results_host = global_config.global_config.get_config_value(
    237         scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    238     _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)
    239 
    240     logging.info("Connected! Running...")
    241 
    242 
    243 def initialize_globals():
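             """Bind the module-level _drone_manager to the drone manager singleton."""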
    244     global _drone_manager
    245     _drone_manager = drone_manager.instance()
    246 
    247 
    248 def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
    249                            verbose=True):
    250     """
    251     @returns The autoserv command line as a list of executable + parameters.
    252 
    253     @param machines - string - A machine or comma separated list of machines
    254             for the (-m) flag.
    255     @param extra_args - list - Additional arguments to pass to autoserv.
    256     @param job - Job object - If supplied, -u owner, -l name, --test-retry,
    257             and client -c or server -s parameters will be added.
    258     @param queue_entry - A HostQueueEntry object - If supplied and no Job
     259             object was supplied, this will be used to look up the Job object.
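         @param verbose - bool - Passed straight through to autoserv_run_job_command.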
    260     """
    261     command = autoserv_utils.autoserv_run_job_command(_autoserv_directory,
    262             machines, results_directory=drone_manager.WORKING_DIRECTORY,
    263             extra_args=extra_args, job=job, queue_entry=queue_entry,
    264             verbose=verbose, in_lab=True)
    265     return command
    266 
    267 
    268 class BaseDispatcher(object):
    269 
    270 
    271     def __init__(self):
    272         self._agents = []
    273         self._last_clean_time = time.time()
    274         user_cleanup_time = scheduler_config.config.clean_interval_minutes
    275         self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
    276                 _db, user_cleanup_time)
    277         self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
    278                 _db, _drone_manager)
    279         self._host_agents = {}
    280         self._queue_entry_agents = {}
    281         self._tick_count = 0
    282         self._last_garbage_stats_time = time.time()
    283         self._seconds_between_garbage_stats = 60 * (
    284                 global_config.global_config.get_config_value(
    285                         scheduler_config.CONFIG_SECTION,
    286                         'gc_stats_interval_mins', type=int, default=6*60))
    287         self._tick_debug = global_config.global_config.get_config_value(
    288                 scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
    289                 default=False)
    290         self._extra_debugging = global_config.global_config.get_config_value(
    291                 scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
    292                 default=False)
    293 
    294         # If _inline_host_acquisition is set the scheduler will acquire and
    295         # release hosts against jobs inline, with the tick. Otherwise the
    296         # scheduler will only focus on jobs that already have hosts, and
    297         # will not explicitly unlease a host when a job finishes using it.
    298         self._job_query_manager = query_managers.AFEJobQueryManager()
    299         self._host_scheduler = (host_scheduler.BaseHostScheduler()
    300                                 if _inline_host_acquisition else
    301                                 host_scheduler.DummyHostScheduler())
    302 
    303 
    304     def initialize(self, recover_hosts=True):
    305         self._periodic_cleanup.initialize()
    306         self._24hr_upkeep.initialize()
    307         # Execute all actions queued in the cleanup tasks. Scheduler tick will
    308         # run a refresh task first. If there is any action in the queue, refresh
    309         # will raise an exception.
    310         _drone_manager.execute_actions()
    311 
    312         # always recover processes
    313         self._recover_processes()
    314 
    315         if recover_hosts:
    316             self._recover_hosts()
    317 
    318 
    319     def _log_tick_msg(self, msg):
    320         if self._tick_debug:
    321             logging.debug(msg)
    322 
    323 
    324     def _log_extra_msg(self, msg):
    325         if self._extra_debugging:
    326             logging.debug(msg)
    327 
    328 
    329     def tick(self):
    330         """
     331         Run one scheduler cycle. We keep track of when each major step begins
     332         so we can try to figure out where we are spending most of the tick
     333         time.
    334         """
    335         timer = autotest_stats.Timer('scheduler.tick')
    336         system_utils.DroneCache.refresh()
     337         self._log_tick_msg('Starting new tick; calling _garbage_collection().')
    338         self._garbage_collection()
    339         self._log_tick_msg('Calling _drone_manager.trigger_refresh().')
    340         _drone_manager.trigger_refresh()
    341         self._log_tick_msg('Calling _process_recurring_runs().')
    342         self._process_recurring_runs()
    343         self._log_tick_msg('Calling _schedule_delay_tasks().')
    344         self._schedule_delay_tasks()
    345         self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
    346         self._schedule_running_host_queue_entries()
    347         self._log_tick_msg('Calling _schedule_special_tasks().')
    348         self._schedule_special_tasks()
    349         self._log_tick_msg('Calling _schedule_new_jobs().')
    350         self._schedule_new_jobs()
    351         self._log_tick_msg('Calling _drone_manager.sync_refresh().')
    352         _drone_manager.sync_refresh()
     353         # _run_cleanup must be called between drone_manager.sync_refresh and
     354         # drone_manager.execute_actions, as sync_refresh will clear the calls
     355         # queued in drones. Therefore, any action that calls drone.queue_call
     356         # to add calls to drone._calls should happen after the drone refresh
     357         # has completed and before drone_manager.execute_actions at the end of
     358         # the tick.
    359         self._log_tick_msg('Calling _run_cleanup().')
    360         self._run_cleanup()
    361         self._log_tick_msg('Calling _find_aborting().')
    362         self._find_aborting()
    363         self._log_tick_msg('Calling _find_aborted_special_tasks().')
    364         self._find_aborted_special_tasks()
    365         self._log_tick_msg('Calling _handle_agents().')
    366         self._handle_agents()
    367         self._log_tick_msg('Calling _host_scheduler.tick().')
    368         self._host_scheduler.tick()
    369         self._log_tick_msg('Calling _drone_manager.execute_actions().')
    370         _drone_manager.execute_actions()
    371         self._log_tick_msg('Calling '
    372                            'email_manager.manager.send_queued_emails().')
    373         with timer.get_client('email_manager_send_queued_emails'):
    374             email_manager.manager.send_queued_emails()
    375         self._log_tick_msg('Calling django.db.reset_queries().')
    376         with timer.get_client('django_db_reset_queries'):
    377             django.db.reset_queries()
    378         self._tick_count += 1
    379 
    380 
    381     def _run_cleanup(self):
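                 """Run the periodic and 24-hour cleanup tasks if they are due."""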
    382         self._periodic_cleanup.run_cleanup_maybe()
    383         self._24hr_upkeep.run_cleanup_maybe()
    384 
    385 
    386     def _garbage_collection(self):
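                 """Periodically force a full collection and log garbage collector stats."""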
    387         threshold_time = time.time() - self._seconds_between_garbage_stats
    388         if threshold_time < self._last_garbage_stats_time:
    389             # Don't generate these reports very often.
    390             return
    391 
    392         self._last_garbage_stats_time = time.time()
     393         # Force a full collection (gc.collect() with no arguments collects all
     394         # generations); at this interval it doesn't hurt.
    395         gc.collect()
    396         logging.info('Logging garbage collector stats on tick %d.',
    397                      self._tick_count)
    398         gc_stats._log_garbage_collector_stats()
    399 
    400 
    401     def _register_agent_for_ids(self, agent_dict, object_ids, agent):
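                 """Add the agent to agent_dict under every id in object_ids.

                 For example, registering agent A for object_ids {1, 2} in an empty
                 dict leaves agent_dict as {1: set([A]), 2: set([A])}.
                 """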
    402         for object_id in object_ids:
    403             agent_dict.setdefault(object_id, set()).add(agent)
    404 
    405 
    406     def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
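                 """Remove the agent from agent_dict for every id in object_ids."""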
    407         for object_id in object_ids:
    408             assert object_id in agent_dict
    409             agent_dict[object_id].remove(agent)
     410             # If an ID has no more active agents associated with it, there is
     411             # no need to keep it in the dictionary. Otherwise the scheduler will
     412             # keep an unnecessarily big dictionary until it is restarted.
    413             if not agent_dict[object_id]:
    414                 agent_dict.pop(object_id)
    415 
    416 
    417     def add_agent_task(self, agent_task):
    418         """
     419         Creates an Agent for the given task and adds it to the dispatcher's list.
     420 
     421         In creating the agent we also pass on all the queue_entry_ids and
     422         host_ids from the agent task. Every agent we create is registered in
     423         two dicts: one keyed by the queue_entry_ids given to it, and one keyed
     424         by the host_ids given to it. So theoretically a host can have any
     425         number of agents associated with it, and each of them can have any
     426         agent task, though in practice we never see more than one agent/task
     427         per host at any time.
     428 
     429         @param agent_task: An AgentTask for the agent to manage.
    430         """
    431         agent = Agent(agent_task)
    432         self._agents.append(agent)
    433         agent.dispatcher = self
    434         self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
    435         self._register_agent_for_ids(self._queue_entry_agents,
    436                                      agent.queue_entry_ids, agent)
    437 
    438 
    439     def get_agents_for_entry(self, queue_entry):
    440         """
    441         Find agents corresponding to the specified queue_entry.
    442         """
    443         return list(self._queue_entry_agents.get(queue_entry.id, set()))
    444 
    445 
    446     def host_has_agent(self, host):
    447         """
    448         Determine if there is currently an Agent present using this host.
    449         """
    450         return bool(self._host_agents.get(host.id, None))
    451 
    452 
    453     def remove_agent(self, agent):
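                 """Remove an agent from the dispatcher and its host/queue-entry indexes."""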
    454         self._agents.remove(agent)
    455         self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
    456                                        agent)
    457         self._unregister_agent_for_ids(self._queue_entry_agents,
    458                                        agent.queue_entry_ids, agent)
    459 
    460 
    461     def _host_has_scheduled_special_task(self, host):
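                 """Return True if the host has a queued (inactive, incomplete) SpecialTask."""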
    462         return bool(models.SpecialTask.objects.filter(host__id=host.id,
    463                                                       is_active=False,
    464                                                       is_complete=False))
    465 
    466 
    467     def _recover_processes(self):
    468         agent_tasks = self._create_recovery_agent_tasks()
    469         self._register_pidfiles(agent_tasks)
    470         _drone_manager.refresh()
    471         self._recover_tasks(agent_tasks)
    472         self._recover_pending_entries()
    473         self._check_for_unrecovered_verifying_entries()
    474         self._reverify_remaining_hosts()
    475         # reinitialize drones after killing orphaned processes, since they can
    476         # leave around files when they die
    477         _drone_manager.execute_actions()
    478         _drone_manager.reinitialize_drones()
    479 
    480 
    481     def _create_recovery_agent_tasks(self):
    482         return (self._get_queue_entry_agent_tasks()
    483                 + self._get_special_task_agent_tasks(is_active=True))
    484 
    485 
    486     def _get_queue_entry_agent_tasks(self):
    487         """
     488         Get agent tasks for all hqes in the specified states.
     489 
     490         Loosely this translates to taking an hqe in one of the specified states,
    491         say parsing, and getting an AgentTask for it, like the FinalReparseTask,
    492         through _get_agent_task_for_queue_entry. Each queue entry can only have
    493         one agent task at a time, but there might be multiple queue entries in
    494         the group.
    495 
    496         @return: A list of AgentTasks.
    497         """
    498         # host queue entry statuses handled directly by AgentTasks (Verifying is
    499         # handled through SpecialTasks, so is not listed here)
    500         statuses = (models.HostQueueEntry.Status.STARTING,
    501                     models.HostQueueEntry.Status.RUNNING,
    502                     models.HostQueueEntry.Status.GATHERING,
    503                     models.HostQueueEntry.Status.PARSING,
    504                     models.HostQueueEntry.Status.ARCHIVING)
    505         status_list = ','.join("'%s'" % status for status in statuses)
    506         queue_entries = scheduler_models.HostQueueEntry.fetch(
    507                 where='status IN (%s)' % status_list)
    508         autotest_stats.Gauge('scheduler.jobs_per_tick').send(
    509                 'running', len(queue_entries))
    510 
    511         agent_tasks = []
    512         used_queue_entries = set()
    513         for entry in queue_entries:
    514             if self.get_agents_for_entry(entry):
    515                 # already being handled
    516                 continue
    517             if entry in used_queue_entries:
    518                 # already picked up by a synchronous job
    519                 continue
    520             agent_task = self._get_agent_task_for_queue_entry(entry)
    521             agent_tasks.append(agent_task)
    522             used_queue_entries.update(agent_task.queue_entries)
    523         return agent_tasks
    524 
    525 
    526     def _get_special_task_agent_tasks(self, is_active=False):
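                 """Return AgentTasks for all incomplete SpecialTasks matching is_active."""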
    527         special_tasks = models.SpecialTask.objects.filter(
    528                 is_active=is_active, is_complete=False)
    529         return [self._get_agent_task_for_special_task(task)
    530                 for task in special_tasks]
    531 
    532 
    533     def _get_agent_task_for_queue_entry(self, queue_entry):
    534         """
    535         Construct an AgentTask instance for the given active HostQueueEntry.
    536 
    537         @param queue_entry: a HostQueueEntry
    538         @return: an AgentTask to run the queue entry
    539         """
    540         task_entries = queue_entry.job.get_group_entries(queue_entry)
    541         self._check_for_duplicate_host_entries(task_entries)
    542 
    543         if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
    544                                   models.HostQueueEntry.Status.RUNNING):
    545             if queue_entry.is_hostless():
    546                 return HostlessQueueTask(queue_entry=queue_entry)
    547             return QueueTask(queue_entries=task_entries)
    548         if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
    549             return postjob_task.GatherLogsTask(queue_entries=task_entries)
    550         if queue_entry.status == models.HostQueueEntry.Status.PARSING:
    551             return postjob_task.FinalReparseTask(queue_entries=task_entries)
    552         if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
    553             return postjob_task.ArchiveResultsTask(queue_entries=task_entries)
    554 
    555         raise scheduler_lib.SchedulerError(
    556                 '_get_agent_task_for_queue_entry got entry with '
    557                 'invalid status %s: %s' % (queue_entry.status, queue_entry))
    558 
    559 
    560     def _check_for_duplicate_host_entries(self, task_entries):
    561         non_host_statuses = (models.HostQueueEntry.Status.PARSING,
    562                              models.HostQueueEntry.Status.ARCHIVING)
    563         for task_entry in task_entries:
    564             using_host = (task_entry.host is not None
    565                           and task_entry.status not in non_host_statuses)
    566             if using_host:
    567                 self._assert_host_has_no_agent(task_entry)
    568 
    569 
    570     def _assert_host_has_no_agent(self, entry):
    571         """
    572         @param entry: a HostQueueEntry or a SpecialTask
    573         """
    574         if self.host_has_agent(entry.host):
    575             agent = tuple(self._host_agents.get(entry.host.id))[0]
    576             raise scheduler_lib.SchedulerError(
    577                     'While scheduling %s, host %s already has a host agent %s'
    578                     % (entry, entry.host, agent.task))
    579 
    580 
    581     def _get_agent_task_for_special_task(self, special_task):
    582         """
     583         Construct an AgentTask instance to run the given SpecialTask. (The
     584         caller is responsible for adding the returned task to the dispatcher.)
     585 
     586         A special task is created through _schedule_special_tasks, but only if
     587         the host doesn't already have an agent. This happens through
     588         add_agent_task. All special agent tasks are given a host on creation,
     589         and a null hqe. To create a SpecialAgentTask object, you need a
     590         models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
     591         object contains an hqe, it is passed on to the special agent task, which
     592         creates a HostQueueEntry and saves it as its queue_entry.
    593 
    594         @param special_task: a models.SpecialTask instance
    595         @returns an AgentTask to run this SpecialTask
    596         """
    597         self._assert_host_has_no_agent(special_task)
    598 
    599         special_agent_task_classes = (prejob_task.CleanupTask,
    600                                       prejob_task.VerifyTask,
    601                                       prejob_task.RepairTask,
    602                                       prejob_task.ResetTask,
    603                                       prejob_task.ProvisionTask)
    604 
    605         for agent_task_class in special_agent_task_classes:
    606             if agent_task_class.TASK_TYPE == special_task.task:
    607                 return agent_task_class(task=special_task)
    608 
    609         raise scheduler_lib.SchedulerError(
    610                 'No AgentTask class for task', str(special_task))
    611 
    612 
    613     def _register_pidfiles(self, agent_tasks):
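                 """Have each recovery agent task register the pidfiles it needs to monitor."""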
    614         for agent_task in agent_tasks:
    615             agent_task.register_necessary_pidfiles()
    616 
    617 
    618     def _recover_tasks(self, agent_tasks):
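                 """Recover the given agent tasks and report any orphaned autoserv processes."""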
    619         orphans = _drone_manager.get_orphaned_autoserv_processes()
    620 
    621         for agent_task in agent_tasks:
    622             agent_task.recover()
    623             if agent_task.monitor and agent_task.monitor.has_process():
    624                 orphans.discard(agent_task.monitor.get_process())
    625             self.add_agent_task(agent_task)
    626 
    627         self._check_for_remaining_orphan_processes(orphans)
    628 
    629 
    630     def _get_unassigned_entries(self, status):
    631         for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
    632                                                            % status):
    633             if entry.status == status and not self.get_agents_for_entry(entry):
    634                 # The status can change during iteration, e.g., if job.run()
    635                 # sets a group of queue entries to Starting
    636                 yield entry
    637 
    638 
    639     def _check_for_remaining_orphan_processes(self, orphans):
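                 """Email a report of unrecovered orphan processes, optionally raising."""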
    640         if not orphans:
    641             return
    642         subject = 'Unrecovered orphan autoserv processes remain'
    643         message = '\n'.join(str(process) for process in orphans)
    644         email_manager.manager.enqueue_notify_email(subject, message)
    645 
    646         die_on_orphans = global_config.global_config.get_config_value(
    647             scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
    648 
    649         if die_on_orphans:
    650             raise RuntimeError(subject + '\n' + message)
    651 
    652 
    653     def _recover_pending_entries(self):
    654         for entry in self._get_unassigned_entries(
    655                 models.HostQueueEntry.Status.PENDING):
    656             logging.info('Recovering Pending entry %s', entry)
    657             entry.on_pending()
    658 
    659 
    660     def _check_for_unrecovered_verifying_entries(self):
    661         queue_entries = scheduler_models.HostQueueEntry.fetch(
    662                 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
    663         unrecovered_hqes = []
    664         for queue_entry in queue_entries:
    665             special_tasks = models.SpecialTask.objects.filter(
    666                     task__in=(models.SpecialTask.Task.CLEANUP,
    667                               models.SpecialTask.Task.VERIFY),
    668                     queue_entry__id=queue_entry.id,
    669                     is_complete=False)
    670             if special_tasks.count() == 0:
    671                 unrecovered_hqes.append(queue_entry)
    672 
    673         if unrecovered_hqes:
    674             message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
    675             raise scheduler_lib.SchedulerError(
    676                     '%d unrecovered verifying host queue entries:\n%s' %
    677                     (len(unrecovered_hqes), message))
    678 
    679 
    680     def _schedule_special_tasks(self):
    681         """
    682         Execute queued SpecialTasks that are ready to run on idle hosts.
    683 
    684         Special tasks include PreJobTasks like verify, reset and cleanup.
     685         They are created through _schedule_new_jobs and associated with an hqe.
     686         This method translates SpecialTasks to the appropriate AgentTask and
     687         adds them to the dispatcher's agents list, so _handle_agents can execute
    688         them.
    689         """
    690         # When the host scheduler is responsible for acquisition we only want
    691         # to run tasks with leased hosts. All hqe tasks will already have
    692         # leased hosts, and we don't want to run frontend tasks till the host
    693         # scheduler has vetted the assignment. Note that this doesn't include
    694         # frontend tasks with hosts leased by other active hqes.
    695         for task in self._job_query_manager.get_prioritized_special_tasks(
    696                 only_tasks_with_leased_hosts=not _inline_host_acquisition):
    697             if self.host_has_agent(task.host):
    698                 continue
    699             self.add_agent_task(self._get_agent_task_for_special_task(task))
    700 
    701 
    702     def _reverify_remaining_hosts(self):
    703         # recover active hosts that have not yet been recovered, although this
    704         # should never happen
    705         message = ('Recovering active host %s - this probably indicates a '
    706                    'scheduler bug')
    707         self._reverify_hosts_where(
    708                 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
    709                 print_message=message)
    710 
    711 
    712     def _reverify_hosts_where(self, where,
    713                               print_message='Reverifying host %s'):
     714         full_where = 'locked = 0 AND invalid = 0 AND ' + where
    715         for host in scheduler_models.Host.fetch(where=full_where):
    716             if self.host_has_agent(host):
    717                 # host has already been recovered in some way
    718                 continue
    719             if self._host_has_scheduled_special_task(host):
    720                 # host will have a special task scheduled on the next tick
    721                 continue
    722             if print_message:
    723                 logging.info(print_message, host.hostname)
    724             models.SpecialTask.objects.create(
    725                     task=models.SpecialTask.Task.CLEANUP,
    726                     host=models.Host.objects.get(id=host.id))
    727 
    728 
    729     def _recover_hosts(self):
    730         # recover "Repair Failed" hosts
    731         message = 'Reverifying dead host %s'
    732         self._reverify_hosts_where("status = 'Repair Failed'",
    733                                    print_message=message)
    734 
    735 
    736     def _refresh_pending_queue_entries(self):
    737         """
     738         Look up the pending HostQueueEntries from the job query manager and
     739         return them.
     740 
    741         @returns A list of pending HostQueueEntries sorted in priority order.
    742         """
    743         queue_entries = self._job_query_manager.get_pending_queue_entries(
    744                 only_hostless=not _inline_host_acquisition)
    745         if not queue_entries:
    746             return []
    747         return queue_entries
    748 
    749 
    750     def _schedule_hostless_job(self, queue_entry):
    751         """Schedule a hostless (suite) job.
    752 
    753         @param queue_entry: The queue_entry representing the hostless job.
    754         """
    755         self.add_agent_task(HostlessQueueTask(queue_entry))
    756 
    757         # Need to set execution_subdir before setting the status:
    758         # After a restart of the scheduler, agents will be restored for HQEs in
    759         # Starting, Running, Gathering, Parsing or Archiving. To do this, the
    760         # execution_subdir is needed. Therefore it must be set before entering
    761         # one of these states.
    762         # Otherwise, if the scheduler was interrupted between setting the status
     763         # and the execution_subdir, restoring agents upon its restart would
     764         # fail.
     765         # Is there a way to get a status in one of these states without going
     766         # through this code? The following cases are possible:
    767         # - If it's aborted before being started:
    768         #     active bit will be 0, so there's nothing to parse, it will just be
    769         #     set to completed by _find_aborting. Critical statuses are skipped.
    770         # - If it's aborted or it fails after being started:
    771         #     It was started, so this code was executed.
    772         queue_entry.update_field('execution_subdir', 'hostless')
    773         queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
    774 
    775 
    776     def _schedule_host_job(self, host, queue_entry):
    777         """Schedules a job on the given host.
    778 
    779         1. Assign the host to the hqe, if it isn't already assigned.
    780         2. Create a SpecialAgentTask for the hqe.
    781         3. Activate the hqe.
    782 
    783         @param queue_entry: The job to schedule.
    784         @param host: The host to schedule the job on.
    785         """
    786         if self.host_has_agent(host):
    787             host_agent_task = list(self._host_agents.get(host.id))[0].task
    788             subject = 'Host with agents assigned to an HQE'
    789             message = ('HQE: %s assigned host %s, but the host has '
    790                        'agent: %s for queue_entry %s. The HQE '
     791                        'will have to try to acquire a host next tick.' %
    792                        (queue_entry, host.hostname, host_agent_task,
    793                         host_agent_task.queue_entry))
    794             email_manager.manager.enqueue_notify_email(subject, message)
    795         else:
    796             self._host_scheduler.schedule_host_job(host, queue_entry)
    797 
    798 
    799     def _schedule_new_jobs(self):
    800         """
     801         Find any new HQEs and call schedule_pre_job_tasks for each of them.
     802 
     803         This involves setting the status of the HQE and creating a row in the
     804         db corresponding to the special task, through
    805         scheduler_models._queue_special_task. The new db row is then added as
    806         an agent to the dispatcher through _schedule_special_tasks and
    807         scheduled for execution on the drone through _handle_agents.
    808         """
    809         queue_entries = self._refresh_pending_queue_entries()
    810 
    811         key = 'scheduler.jobs_per_tick'
    812         new_hostless_jobs = 0
    813         new_jobs_with_hosts = 0
    814         new_jobs_need_hosts = 0
    815         host_jobs = []
    816         logging.debug('Processing %d queue_entries', len(queue_entries))
    817 
    818         for queue_entry in queue_entries:
    819             if queue_entry.is_hostless():
    820                 self._schedule_hostless_job(queue_entry)
    821                 new_hostless_jobs = new_hostless_jobs + 1
    822             else:
    823                 host_jobs.append(queue_entry)
    824                 new_jobs_need_hosts = new_jobs_need_hosts + 1
    825 
    826         autotest_stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
    827         if not host_jobs:
    828             return
    829         if not _inline_host_acquisition:
    830             message = ('Found %s jobs that need hosts though '
    831                        '_inline_host_acquisition=%s. Will acquire hosts.' %
    832                        ([str(job) for job in host_jobs],
    833                         _inline_host_acquisition))
    834             email_manager.manager.enqueue_notify_email(
    835                     'Processing unexpected host acquisition requests', message)
    836         jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
    837         for host_assignment in jobs_with_hosts:
    838             self._schedule_host_job(host_assignment.host, host_assignment.job)
    839             new_jobs_with_hosts = new_jobs_with_hosts + 1
    840 
    841         autotest_stats.Gauge(key).send('new_jobs_with_hosts',
    842                                        new_jobs_with_hosts)
    843         autotest_stats.Gauge(key).send('new_jobs_without_hosts',
    844                                        new_jobs_need_hosts -
    845                                        new_jobs_with_hosts)
    846 
    847 
    848     def _schedule_running_host_queue_entries(self):
    849         """
    850         Adds agents to the dispatcher.
    851 
    852         Any AgentTask, like the QueueTask, is wrapped in an Agent. The
    853         QueueTask for example, will have a job with a control file, and
     854         QueueTask, for example, will have a job with a control file, and
     855         the agent will have methods that poll, abort and check whether the
     856         queue task is finished. The dispatcher runs the agent_task, as well as
     857         other agents in its _agents member, through _handle_agents, by
     858         calling each Agent's tick().
    859         This method creates an agent for each HQE in one of (starting, running,
    860         gathering, parsing, archiving) states, and adds it to the dispatcher so
    861         it is handled by _handle_agents.
    862         """
    863         for agent_task in self._get_queue_entry_agent_tasks():
    864             self.add_agent_task(agent_task)
    865 
    866 
    867     def _schedule_delay_tasks(self):
    868         for entry in scheduler_models.HostQueueEntry.fetch(
    869                 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
    870             task = entry.job.schedule_delayed_callback_task(entry)
    871             if task:
    872                 self.add_agent_task(task)
    873 
    874 
    875     def _find_aborting(self):
    876         """
     877         Looks through the afe_host_queue_entries for aborted entries.
    878 
    879         The aborted bit is set on an HQE in many ways, the most common
    880         being when a user requests an abort through the frontend, which
     881         results in an RPC from the AFE to abort_host_queue_entries.
    882         """
    883         jobs_to_stop = set()
    884         for entry in scheduler_models.HostQueueEntry.fetch(
    885                 where='aborted=1 and complete=0'):
    886 
    887             # If the job is running on a shard, let the shard handle aborting
    888             # it and sync back the right status.
    889             if entry.job.shard_id is not None and not server_utils.is_shard():
    890                 logging.info('Waiting for shard %s to abort hqe %s',
    891                         entry.job.shard_id, entry)
    892                 continue
    893 
    894             logging.info('Aborting %s', entry)
    895 
    896             # The task would have started off with both is_complete and
    897             # is_active = False. Aborted tasks are neither active nor complete.
    898             # For all currently active tasks this will happen through the agent,
    899             # but we need to manually update the special tasks that haven't
    900             # started yet, because they don't have agents.
    901             models.SpecialTask.objects.filter(is_active=False,
    902                 queue_entry_id=entry.id).update(is_complete=True)
    903 
    904             for agent in self.get_agents_for_entry(entry):
    905                 agent.abort()
    906             entry.abort(self)
    907             jobs_to_stop.add(entry.job)
    908         logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
    909         for job in jobs_to_stop:
    910             job.stop_if_necessary()
    911 
    912 
    913     def _find_aborted_special_tasks(self):
    914         """
     915         Find SpecialTasks that have been marked to be aborted.
     916 
     917         Poll the database looking for SpecialTasks that are active
     918         and have been marked to be aborted, then abort them.
    919         """
    920 
    921         # The completed and active bits are very important when it comes
    922         # to scheduler correctness. The active bit is set through the prolog
    923         # of a special task, and reset through the cleanup method of the
    924         # SpecialAgentTask. The cleanup is called both through the abort and
    925         # epilog. The complete bit is set in several places, and in general
    926         # a hanging job will have is_active=1 is_complete=0, while a special
    927         # task which completed will have is_active=0 is_complete=1. To check
    928         # aborts we directly check active because the complete bit is set in
    929         # several places, including the epilog of agent tasks.
    930         aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
    931                                                           is_aborted=True)
    932         for task in aborted_tasks:
    933             # There are 2 ways to get the agent associated with a task,
    934             # through the host and through the hqe. A special task
    935             # always needs a host, but doesn't always need a hqe.
    936             for agent in self._host_agents.get(task.host.id, []):
    937                 if isinstance(agent.task, agent_task.SpecialAgentTask):
    938 
     939                     # The epilog performs critical actions such as
     940                     # queueing the next SpecialTask and requeuing the
     941                     # hqe; however, it doesn't actually kill the
    942                     # monitor process and set the 'done' bit. Epilogs
    943                     # assume that the job failed, and that the monitor
    944                     # process has already written an exit code. The
    945                     # done bit is a necessary condition for
    946                     # _handle_agents to schedule any more special
    947                     # tasks against the host, and it must be set
    948                     # in addition to is_active, is_complete and success.
    949                     agent.task.epilog()
    950                     agent.task.abort()
    951 
    952 
    953     def _can_start_agent(self, agent, have_reached_limit):
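                 """Return True if the agent may start this tick under process throttling."""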
    954         # always allow zero-process agents to run
    955         if agent.task.num_processes == 0:
    956             return True
    957         # don't allow any nonzero-process agents to run after we've reached a
    958         # limit (this avoids starvation of many-process agents)
    959         if have_reached_limit:
    960             return False
    961         # total process throttling
    962         max_runnable_processes = _drone_manager.max_runnable_processes(
    963                 agent.task.owner_username,
    964                 agent.task.get_drone_hostnames_allowed())
    965         if agent.task.num_processes > max_runnable_processes:
    966             return False
    967         return True
    968 
    969 
    970     def _handle_agents(self):
    971         """
    972         Handles agents of the dispatcher.
    973 
    974         Appropriate Agents are added to the dispatcher through
    975         _schedule_running_host_queue_entries. These agents each
     976         have a task. This method runs each agent's task through
     977         agent.tick(), leading to:
    978             agent.start
    979                 prolog -> AgentTasks prolog
    980                           For each queue entry:
    981                             sets host status/status to Running
    982                             set started_on in afe_host_queue_entries
    983                 run    -> AgentTasks run
    984                           Creates PidfileRunMonitor
    985                           Queues the autoserv command line for this AgentTask
    986                           via the drone manager. These commands are executed
     987                           through the drone manager's execute_actions.
    988                 poll   -> AgentTasks/BaseAgentTask poll
     989                           checks the monitor's exit_code.
    990                           Executes epilog if task is finished.
    991                           Executes AgentTasks _finish_task
    992                 finish_task is usually responsible for setting the status
     993                 of the HQE/host, and updating its active and complete fields.
    994 
    995             agent.is_done
     996                 Removes the agent from the dispatcher's _agents queue.
     997                 is_done checks the finished bit on the agent, which is
     998                 set based on the Agent's task. During the agent's poll
     999                 we check whether the monitor process has exited in
    1000                 its finish method, and set the success member of the
   1001                 task based on this exit code.
   1002         """
   1003         num_started_this_tick = 0
   1004         num_finished_this_tick = 0
   1005         have_reached_limit = False
   1006         # iterate over copy, so we can remove agents during iteration
   1007         logging.debug('Handling %d Agents', len(self._agents))
   1008         for agent in list(self._agents):
   1009             self._log_extra_msg('Processing Agent with Host Ids: %s and '
   1010                                 'queue_entry ids:%s' % (agent.host_ids,
   1011                                 agent.queue_entry_ids))
   1012             if not agent.started:
   1013                 if not self._can_start_agent(agent, have_reached_limit):
   1014                     have_reached_limit = True
   1015                     logging.debug('Reached Limit of allowed running Agents.')
   1016                     continue
   1017                 num_started_this_tick += agent.task.num_processes
   1018                 self._log_extra_msg('Starting Agent')
   1019             agent.tick()
   1020             self._log_extra_msg('Agent tick completed.')
   1021             if agent.is_done():
   1022                 num_finished_this_tick += agent.task.num_processes
   1023                 self._log_extra_msg("Agent finished")
   1024                 self.remove_agent(agent)
   1025         autotest_stats.Gauge('scheduler.jobs_per_tick').send(
   1026                 'agents_started', num_started_this_tick)
   1027         autotest_stats.Gauge('scheduler.jobs_per_tick').send(
   1028                 'agents_finished', num_finished_this_tick)
   1029         logging.info('%d running processes. %d added this tick.',
   1030                      _drone_manager.total_running_processes(),
   1031                      num_started_this_tick)
   1032 
   1033 
   1034     def _process_recurring_runs(self):
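                 """Create jobs for due RecurringRuns, then reschedule or delete the runs."""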
   1035         recurring_runs = models.RecurringRun.objects.filter(
   1036             start_date__lte=datetime.datetime.now())
   1037         for rrun in recurring_runs:
   1038             # Create job from template
   1039             job = rrun.job
   1040             info = rpc_utils.get_job_info(job)
   1041             options = job.get_object_dict()
   1042 
   1043             host_objects = info['hosts']
   1044             one_time_hosts = info['one_time_hosts']
   1045             metahost_objects = info['meta_hosts']
   1046             dependencies = info['dependencies']
   1047             atomic_group = info['atomic_group']
   1048 
   1049             for host in one_time_hosts or []:
   1050                 this_host = models.Host.create_one_time_host(host.hostname)
   1051                 host_objects.append(this_host)
   1052 
   1053             try:
   1054                 rpc_utils.create_new_job(owner=rrun.owner.login,
   1055                                          options=options,
   1056                                          host_objects=host_objects,
   1057                                          metahost_objects=metahost_objects,
   1058                                          atomic_group=atomic_group)
   1059 
    1060             except Exception as ex:
   1061                 logging.exception(ex)
   1062                 #TODO send email
   1063 
   1064             if rrun.loop_count == 1:
   1065                 rrun.delete()
   1066             else:
   1067                 if rrun.loop_count != 0: # if not infinite loop
   1068                     # calculate new start_date
   1069                     difference = datetime.timedelta(seconds=rrun.loop_period)
   1070                     rrun.start_date = rrun.start_date + difference
   1071                     rrun.loop_count -= 1
   1072                     rrun.save()
   1073 
   1074 
   1075 SiteDispatcher = utils.import_site_class(
   1076     __file__, 'autotest_lib.scheduler.site_monitor_db',
   1077     'SiteDispatcher', BaseDispatcher)
   1078 
   1079 class Dispatcher(SiteDispatcher):
   1080     pass
   1081 
   1082 
   1083 class Agent(object):
   1084     """
   1085     An agent for use by the Dispatcher class to perform a task.  An agent wraps
   1086     around an AgentTask mainly to associate the AgentTask with the queue_entry
   1087     and host ids.
   1088 
   1089     The following methods are required on all task objects:
   1090         poll() - Called periodically to let the task check its status and
    1091                 update its internal state.
   1092         is_done() - Returns True if the task is finished.
   1093         abort() - Called when an abort has been requested.  The task must
   1094                 set its aborted attribute to True if it actually aborted.
   1095 
   1096     The following attributes are required on all task objects:
   1097         aborted - bool, True if this task was aborted.
   1098         success - bool, True if this task succeeded.
   1099         queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
   1100         host_ids - A sequence of Host ids this task represents.
   1101     """
   1102 
   1103 
   1104     def __init__(self, task):
   1105         """
   1106         @param task: An instance of an AgentTask.
   1107         """
   1108         self.task = task
   1109 
    1110         # This is filled in by Dispatcher.add_agent_task()
   1111         self.dispatcher = None
   1112 
   1113         self.queue_entry_ids = task.queue_entry_ids
   1114         self.host_ids = task.host_ids
   1115 
   1116         self.started = False
   1117         self.finished = False
   1118 
   1119 
   1120     def tick(self):
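                 """Poll the underlying task and mark this agent finished once the task is done."""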
   1121         self.started = True
   1122         if not self.finished:
   1123             self.task.poll()
   1124             if self.task.is_done():
   1125                 self.finished = True
   1126 
   1127 
   1128     def is_done(self):
   1129         return self.finished
   1130 
   1131 
   1132     def abort(self):
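                 """Ask the task to abort; mark this agent finished if the task honors it."""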
   1133         if self.task:
   1134             self.task.abort()
   1135             if self.task.aborted:
   1136                 # tasks can choose to ignore aborts
   1137                 self.finished = True
   1138 
   1139 
   1140 class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
   1141     """
   1142     Common functionality for QueueTask and HostlessQueueTask
   1143     """
   1144     def __init__(self, queue_entries):
   1145         super(AbstractQueueTask, self).__init__()
   1146         self.job = queue_entries[0].job
   1147         self.queue_entries = queue_entries
   1148 
   1149 
   1150     def _keyval_path(self):
   1151         return os.path.join(self._working_directory(), self._KEYVAL_FILE)
   1152 
   1153 
   1154     def _write_control_file(self, execution_path):
   1155         control_path = _drone_manager.attach_file_to_execution(
   1156                 execution_path, self.job.control_file)
   1157         return control_path
   1158 
   1159 
   1160     # TODO: Refactor into autoserv_utils. crbug.com/243090
   1161     def _command_line(self):
   1162         execution_path = self.queue_entries[0].execution_path()
   1163         control_path = self._write_control_file(execution_path)
   1164         hostnames = ','.join(entry.host.hostname
   1165                              for entry in self.queue_entries
   1166                              if not entry.is_hostless())
   1167 
   1168         execution_tag = self.queue_entries[0].execution_tag()
   1169         params = _autoserv_command_line(
   1170             hostnames,
   1171             ['-P', execution_tag, '-n',
   1172              _drone_manager.absolute_path(control_path)],
   1173             job=self.job, verbose=False)
   1174         if self.job.is_image_update_job():
   1175             params += ['--image', self.job.update_image_path]
   1176 
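                # For illustration only: with hypothetical values, the list
                # built above corresponds to an invocation roughly like
                #   autoserv -P 123-someuser/group0 \
                #            -n /path/to/results/123-someuser/group0/control.srv \
                #            -m host1,host2 ...
                # (the exact flags and ordering come from
                # _autoserv_command_line()/autoserv_utils and are an assumption
                # here).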
   1177         return params
   1178 
   1179 
   1180     @property
   1181     def num_processes(self):
   1182         return len(self.queue_entries)
   1183 
   1184 
   1185     @property
   1186     def owner_username(self):
   1187         return self.job.owner
   1188 
   1189 
   1190     def _working_directory(self):
   1191         return self._get_consistent_execution_path(self.queue_entries)
   1192 
   1193 
   1194     def prolog(self):
   1195         queued_key, queued_time = self._job_queued_keyval(self.job)
   1196         keyval_dict = self.job.keyval_dict()
   1197         keyval_dict[queued_key] = queued_time
   1198         group_name = self.queue_entries[0].get_group_name()
   1199         if group_name:
   1200             keyval_dict['host_group_name'] = group_name
   1201         self._write_keyvals_before_job(keyval_dict)
   1202         for queue_entry in self.queue_entries:
   1203             queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
   1204             queue_entry.set_started_on_now()
   1205 
   1206 
   1207     def _write_lost_process_error_file(self):
   1208         error_file_path = os.path.join(self._working_directory(), 'job_failure')
   1209         _drone_manager.write_lines_to_file(error_file_path,
   1210                                            [_LOST_PROCESS_ERROR])
   1211 
   1212 
   1213     def _finish_task(self):
   1214         if not self.monitor:
   1215             return
   1216 
   1217         self._write_job_finished()
   1218 
   1219         if self.monitor.lost_process:
   1220             self._write_lost_process_error_file()
   1221 
   1222 
   1223     def _write_status_comment(self, comment):
   1224         _drone_manager.write_lines_to_file(
   1225             os.path.join(self._working_directory(), 'status.log'),
   1226             ['INFO\t----\t----\t' + comment],
   1227             paired_with_process=self.monitor.get_process())
   1228 
   1229 
   1230     def _log_abort(self):
   1231         if not self.monitor or not self.monitor.has_process():
   1232             return
   1233 
   1234         # build up sets of all the aborted_by and aborted_on values
   1235         aborted_by, aborted_on = set(), set()
   1236         for queue_entry in self.queue_entries:
   1237             if queue_entry.aborted_by:
   1238                 aborted_by.add(queue_entry.aborted_by)
   1239                 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
   1240                 aborted_on.add(t)
   1241 
   1242         # extract a single, unique aborted_by value and write it out
   1243         # TODO(showard): this conditional is now obsolete, we just need to leave
   1244         # it in temporarily for backwards compatibility over upgrades.  delete
   1245         # soon.
   1246         assert len(aborted_by) <= 1
   1247         if len(aborted_by) == 1:
   1248             aborted_by_value = aborted_by.pop()
   1249             aborted_on_value = max(aborted_on)
   1250         else:
   1251             aborted_by_value = 'autotest_system'
   1252             aborted_on_value = int(time.time())
   1253 
   1254         self._write_keyval_after_job("aborted_by", aborted_by_value)
   1255         self._write_keyval_after_job("aborted_on", aborted_on_value)
   1256 
   1257         aborted_on_string = str(datetime.datetime.fromtimestamp(
   1258             aborted_on_value))
   1259         self._write_status_comment('Job aborted by %s on %s' %
   1260                                    (aborted_by_value, aborted_on_string))
   1261 
   1262 
   1263     def abort(self):
   1264         super(AbstractQueueTask, self).abort()
   1265         self._log_abort()
   1266         self._finish_task()
   1267 
   1268 
   1269     def epilog(self):
   1270         super(AbstractQueueTask, self).epilog()
   1271         self._finish_task()
   1272 
   1273 
   1274 class QueueTask(AbstractQueueTask):
   1275     def __init__(self, queue_entries):
   1276         super(QueueTask, self).__init__(queue_entries)
   1277         self._set_ids(queue_entries=queue_entries)
   1278 
   1279 
   1280     def prolog(self):
   1281         self._check_queue_entry_statuses(
   1282                 self.queue_entries,
   1283                 allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
   1284                                       models.HostQueueEntry.Status.RUNNING),
   1285                 allowed_host_statuses=(models.Host.Status.PENDING,
   1286                                        models.Host.Status.RUNNING))
   1287 
   1288         super(QueueTask, self).prolog()
   1289 
   1290         for queue_entry in self.queue_entries:
   1291             self._write_host_keyvals(queue_entry.host)
   1292             queue_entry.host.set_status(models.Host.Status.RUNNING)
   1293             queue_entry.host.update_field('dirty', 1)
   1294 
   1295 
   1296     def _finish_task(self):
   1297         super(QueueTask, self)._finish_task()
   1298 
   1299         for queue_entry in self.queue_entries:
   1300             queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
   1301             queue_entry.host.set_status(models.Host.Status.RUNNING)
   1302 
   1303 
   1304     def _command_line(self):
   1305         invocation = super(QueueTask, self)._command_line()
   1306         # Check if server-side packaging is needed.
   1307         if (_enable_ssp_container and
   1308             self.job.control_type == control_data.CONTROL_TYPE.SERVER and
   1309             self.job.require_ssp is not False):
   1310             invocation += ['--require-ssp']
   1311             keyval_dict = self.job.keyval_dict()
   1312             test_source_build = keyval_dict.get('test_source_build', None)
   1313             if test_source_build:
   1314                 invocation += ['--test_source_build', test_source_build]
   1315         if self.job.parent_job_id:
   1316             invocation += ['--parent_job_id', str(self.job.parent_job_id)]
   1317         return invocation + ['--verify_job_repo_url']
   1318 
   1319 
   1320 class HostlessQueueTask(AbstractQueueTask):
   1321     def __init__(self, queue_entry):
   1322         super(HostlessQueueTask, self).__init__([queue_entry])
   1323         self.queue_entry_ids = [queue_entry.id]
   1324 
   1325 
   1326     def prolog(self):
   1327         super(HostlessQueueTask, self).prolog()
   1328 
   1329 
   1330     def _finish_task(self):
   1331         super(HostlessQueueTask, self)._finish_task()
   1332 
   1333         # When a job is added to the database, its initial status is always
   1334         # Starting. In a scheduler tick, the scheduler finds all jobs in
   1335         # Starting status and checks whether any of them can be started. If
   1336         # the scheduler hits a limit, e.g. max_hostless_jobs_per_drone, it
   1337         # leaves these jobs in Starting status. Otherwise, the jobs'
   1338         # status is changed to Running, and an autoserv process
   1339         # is started on a drone for each of these jobs.
   1340         # If the entry is still in status Starting, the process has not started
   1341         # yet, so there is no log to parse or collect. Without
   1342         # this check, the scheduler would raise an exception because
   1343         # execution_subdir for this queue entry does not have a value yet.
   1344         hqe = self.queue_entries[0]
   1345         if hqe.status != models.HostQueueEntry.Status.STARTING:
   1346             hqe.set_status(models.HostQueueEntry.Status.PARSING)
   1347 
   1348 
   1349 if __name__ == '__main__':
   1350     main()
   1351