#pylint: disable-msg=C0111

"""
Postjob task.

Postjob tasks are responsible for setting the final status of the HQE
and Host, and scheduling additional special agents such as cleanup,
if necessary.
"""

import os

from autotest_lib.client.common_lib import utils
from autotest_lib.frontend.afe import models, model_attributes
from autotest_lib.scheduler import agent_task, drones, drone_manager
from autotest_lib.scheduler import email_manager, pidfile_monitor
from autotest_lib.scheduler import scheduler_config
from autotest_lib.server import autoserv_utils

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')


class PostJobTask(agent_task.AgentTask):
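    """
    Base class for tasks that run after autoserv has finished for a job.

    A PostJobTask attaches a PidfileRunMonitor to the job's existing
    results directory so it can read the autoserv exit status, and uses
    that to compute the final HQE status (ABORTED, COMPLETED or FAILED).
    Subclasses must implement _generate_command() to supply the command
    to run against those results.
    """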
    def __init__(self, queue_entries, log_file_name):
        super(PostJobTask, self).__init__(log_file_name=log_file_name)

        self.queue_entries = queue_entries

        self._autoserv_monitor = pidfile_monitor.PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(
                self._working_directory())


    def _command_line(self):
        # Do we need testing_mode?
        return self._generate_command(
                self._drone_manager.absolute_path(self._working_directory()))


    def _generate_command(self, results_dir):
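        """Return the command to run, as a list of arguments.

        @param results_dir: absolute path to the job's results directory.
        """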
        raise NotImplementedError('Subclasses must override this')


    @property
    def owner_username(self):
        return self.queue_entries[0].job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def _paired_with_monitor(self):
        return self._autoserv_monitor


    def _job_was_aborted(self):
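        """Return True if any of this task's queue entries were aborted.

        If the queue entries disagree about their abort state, a notify
        email is enqueued and the job is treated as aborted.
        """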
        was_aborted = None
        for queue_entry in self.queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                entries = ['%s (aborted: %s)' % (entry, entry.aborted)
                           for entry in self.queue_entries]
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state:\n' +
                        '\n'.join(entries))
                # don't crash here, just assume true
                return True
        return was_aborted


    def _final_status(self):
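        """Return the final HQE status: ABORTED if the job was aborted,
        COMPLETED if autoserv exited 0, FAILED otherwise."""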
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def _set_all_statuses(self, status):
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task.  post-job tasks continue when the job is aborted.
        pass


    def _pidfile_label(self):
        # '.autoserv_execute' -> 'autoserv'
        return self._pidfile_name()[1:-len('_execute')]


class SelfThrottledPostJobTask(PostJobTask):
    """
    PostJobTask that maintains its own process limit.

    We throttle tasks like parsing because we don't want them to
    hold up tests. At the same time we don't want to build up a
    backlog that will take forever to parse.
    """
    _num_running_processes = 0
    # Last known limit on max processes, used to check whether the
    # max processes config has been changed.
    _last_known_max_processes = 0
    # Whether an email should be sent to notify that the process limit
    # has been hit.
    _notification_on = True
    # Once the process limit is hit, an email is sent. To prevent spam,
    # do not send another email until the number of running processes
    # drops below the following level.
    REVIVE_NOTIFICATION_THRESHOLD = 0.80
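    # For example, assuming a (purely illustrative) limit of 5 processes:
    # once the limit is hit and the notification email goes out, another
    # email is not sent until fewer than 0.80 * 5 = 4 such processes are
    # running and the limit is then hit again.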

    @classmethod
    def _gauge_metrics(cls):
        """Report to monarch the number of running processes."""
        m = metrics.Gauge('chromeos/autotest/scheduler/postjob_tasks')
        m.set(cls._num_running_processes, fields={'task_name': cls.__name__})


    @classmethod
    def _increment_running_processes(cls):
        cls._num_running_processes += 1
        cls._gauge_metrics()


    @classmethod
    def _decrement_running_processes(cls):
        cls._num_running_processes -= 1
        cls._gauge_metrics()


    @classmethod
    def _max_processes(cls):
        raise NotImplementedError


    @classmethod
    def _can_run_new_process(cls):
        return cls._num_running_processes < cls._max_processes()


    def _process_started(self):
        return bool(self.monitor)


    def tick(self):
        # override tick to keep trying to start until the process count drops
        # and we can start; once started, revert to the default behavior
        if self._process_started():
            super(SelfThrottledPostJobTask, self).tick()
        else:
            self._try_starting_process()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_process()


    @classmethod
    def _notify_process_limit_hit(cls):
        """Send an email to notify that the process limit has been hit."""
        if cls._notification_on:
            subject = '%s: hitting max process limit.' % cls.__name__
            message = ('Running processes/Max processes: %d/%d'
                       % (cls._num_running_processes, cls._max_processes()))
            email_manager.manager.enqueue_notify_email(subject, message)
            cls._notification_on = False


    @classmethod
    def _reset_notification_switch_if_necessary(cls):
        """Reset _notification_on if necessary.

        Set _notification_on to True in the following cases:
        1) The max processes configuration limit has changed;
        2) _notification_on is False and the fraction of running processes
           has dropped below REVIVE_NOTIFICATION_THRESHOLD.

        """
        if cls._last_known_max_processes != cls._max_processes():
            cls._notification_on = True
            cls._last_known_max_processes = cls._max_processes()
            return
        percentage = float(cls._num_running_processes) / cls._max_processes()
        if (not cls._notification_on and
            percentage < cls.REVIVE_NOTIFICATION_THRESHOLD):
            cls._notification_on = True


    def _try_starting_process(self):
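        """Start the underlying process only if the class-wide process
        limit has not been reached; otherwise send a (rate-limited)
        notification email and retry on a later tick."""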
        self._reset_notification_switch_if_necessary()
        if not self._can_run_new_process():
            self._notify_process_limit_hit()
            return

        # actually run the command
        super(SelfThrottledPostJobTask, self).run()
        if self._process_started():
            self._increment_running_processes()


    def finished(self, success):
        super(SelfThrottledPostJobTask, self).finished(success)
        if self._process_started():
            self._decrement_running_processes()


class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    * setting the final status of the host, directly or through a cleanup
    """
    def __init__(self, queue_entries, recover_run_monitor=None):
        self._job = queue_entries[0].job
        super(GatherLogsTask, self).__init__(
            queue_entries, log_file_name='.collect_crashinfo.log')
        self._set_ids(queue_entries=queue_entries)


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
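        """Build the autoserv command used to collect crash info.

        For example (hostnames illustrative), with two queue entries this
        yields roughly:

            [<autoserv path>, '-p', '--pidfile-label=<label>',
             '--use-existing-results', '--collect-crashinfo',
             '-m', 'host1,host2', '-r', results_dir]

        where <label> is derived from drone_manager.CRASHINFO_PID_FILE.
        """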
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self.queue_entries)
        return [autoserv_utils.autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(),
                '--use-existing-results', '--collect-crashinfo',
                '-m', host_list, '-r', results_dir]


    @property
    def num_processes(self):
        return len(self.queue_entries)


    def _pidfile_name(self):
        return drone_manager.CRASHINFO_PID_FILE


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
                allowed_host_statuses=(models.Host.Status.RUNNING,))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._parse_results(self.queue_entries)
        self._reboot_hosts()


    def _reboot_hosts(self):
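        """Schedule cleanup (reboot) or mark hosts READY.

        A cleanup SpecialTask is created for each host when the job was
        aborted, when the job's reboot_after setting calls for it, or when
        any test failed; otherwise the host is set back to READY.
        """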
        if self._autoserv_monitor.has_process():
            final_success = (self._final_status() ==
                             models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
        else:
            final_success = False
            num_tests_failed = 0
        reboot_after = self._job.reboot_after
        do_reboot = (
                # always reboot after aborted jobs
                self._final_status() == models.HostQueueEntry.Status.ABORTED
                or reboot_after == model_attributes.RebootAfter.ALWAYS
                or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0)
                or num_tests_failed > 0)

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                        host=models.Host.objects.get(id=queue_entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP,
                        requested_by=self._job.owner_model())
            else:
                queue_entry.host.set_status(models.Host.Status.READY)


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)


class FinalReparseTask(SelfThrottledPostJobTask):
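    """
    Task that runs the results parser over a job's results directory once
    log gathering is done.

    Parsing is throttled through SelfThrottledPostJobTask (limited by
    scheduler_config max_parse_processes), and on completion the final
    status is written to all of the job's queue entries.
    """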
    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _generate_command(self, results_dir):
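        """Return the tko/parse command line used to parse this job's
        results directory (see _parser_path at the top of this module)."""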
        return [_parser_path, '--write-pidfile', '--record-duration',
                '--suite-report', '-l', '2', '-r', '-o', results_dir]


    @property
    def num_processes(self):
        return 0 # don't include parser processes in accounting


    def _pidfile_name(self):
        return drone_manager.PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))

        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._set_all_statuses(self._final_status())
    349