#pylint: disable-msg=C0111

"""
Postjob task.

Postjob tasks are responsible for setting the final status of the HQE
and Host, and scheduling additional special agents such as cleanup,
if necessary.
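
The concrete tasks in this module are GatherLogsTask (gathering
uncollected logs and scheduling host cleanup or reset) and
FinalReparseTask (a throttled run of the tko/parse results parser).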
"""

import os

from autotest_lib.client.common_lib import utils
from autotest_lib.frontend.afe import models, model_attributes
from autotest_lib.scheduler import agent_task, drones, drone_manager
from autotest_lib.scheduler import email_manager, pidfile_monitor
from autotest_lib.scheduler import scheduler_config
from autotest_lib.server import autoserv_utils

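# chromite (and therefore chromite.lib.metrics) may not be available in every
# deployment; fall back to the mock metrics object from utils so that the
# Gauge/Counter calls below degrade gracefully.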
try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')


class PostJobTask(agent_task.AgentTask):
    def __init__(self, queue_entries, log_file_name):
        super(PostJobTask, self).__init__(log_file_name=log_file_name)

        self.queue_entries = queue_entries

        self._autoserv_monitor = pidfile_monitor.PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(
                self._working_directory())


    def _command_line(self):
        # Do we need testing_mode?
        return self._generate_command(
                self._drone_manager.absolute_path(self._working_directory()))


    def _generate_command(self, results_dir):
        raise NotImplementedError('Subclasses must override this')


    @property
    def owner_username(self):
        return self.queue_entries[0].job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def _paired_with_monitor(self):
        return self._autoserv_monitor


    def _job_was_aborted(self):
        was_aborted = None
        for queue_entry in self.queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                entries = ['%s (aborted: %s)' % (entry, entry.aborted)
                           for entry in self.queue_entries]
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state:\n' +
                        '\n'.join(entries))
                # don't crash here, just assume true
                return True
        return was_aborted


    def _final_status(self):
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def _set_all_statuses(self, status):
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task.  post-job tasks continue when the job is aborted.
        pass


    def _pidfile_label(self):
        # '.autoserv_execute' -> 'autoserv'
        return self._pidfile_name()[1:-len('_execute')]

class SelfThrottledPostJobTask(PostJobTask):
    """
    PostJobTask that maintains its own process limit.

    We throttle tasks like parsing so that they don't hold up tests,
    while also avoiding a backlog of results that would take forever
    to parse.
    """
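    # Throttling lifecycle, as implemented below: run() defers to
    # _try_starting_process(), which only launches the underlying command while
    # the class-wide running-process count is below _max_processes(); tick()
    # keeps retrying until the process starts, and finished() releases the slot
    # again.
    #
    # A subclass supplies the limit via _max_processes(). Illustrative sketch
    # only (a real subclass also overrides _generate_command()/_pidfile_name();
    # see FinalReparseTask below):
    #
    #   class ExampleThrottledTask(SelfThrottledPostJobTask):  # hypothetical
    #       @classmethod
    #       def _max_processes(cls):
    #           return 5  # hypothetical fixed cap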
    _num_running_processes = 0
    # Last known limit of max processes, used to check whether
    # the max processes config has been changed.
    _last_known_max_processes = 0
    # Whether an email should be sent to notify that the process limit has
    # been hit.
    _notification_on = True
    # Once the process limit is hit, an email will be sent.
    # To prevent spam, do not send another email until the running-process
    # count drops below the following fraction of the limit.
    REVIVE_NOTIFICATION_THRESHOLD = 0.80
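    # Worked example (hypothetical numbers): with _max_processes() == 100, the
    # first time the limit is reached a single email goes out; notification is
    # only re-armed once fewer than 80 processes (0.80 * 100) are running, or
    # the configured limit itself changes.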

    @classmethod
    def _gauge_metrics(cls):
        """Report the number of running processes to Monarch."""
        m = metrics.Gauge('chromeos/autotest/scheduler/postjob_tasks')
        m.set(cls._num_running_processes, fields={'task_name': cls.__name__})


    @classmethod
    def _increment_running_processes(cls):
        cls._num_running_processes += 1
        cls._gauge_metrics()


    @classmethod
    def _decrement_running_processes(cls):
        cls._num_running_processes -= 1
        cls._gauge_metrics()


    @classmethod
    def _max_processes(cls):
        raise NotImplementedError


    @classmethod
    def _can_run_new_process(cls):
        return cls._num_running_processes < cls._max_processes()


    def _process_started(self):
        return bool(self.monitor)


    def tick(self):
        # Override tick to keep trying to start until the running-process count
        # drops low enough that we can; once the process has started, revert to
        # the default behavior.
        if self._process_started():
            super(SelfThrottledPostJobTask, self).tick()
        else:
            self._try_starting_process()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_process()


    @classmethod
    def _notify_process_limit_hit(cls):
        """Send an email to notify that the process limit is hit."""
        if cls._notification_on:
            subject = '%s: hitting max process limit.' % cls.__name__
            message = ('Running processes/Max processes: %d/%d'
                       % (cls._num_running_processes, cls._max_processes()))
            email_manager.manager.enqueue_notify_email(subject, message)
            cls._notification_on = False


    @classmethod
    def _reset_notification_switch_if_necessary(cls):
        """Reset _notification_on if necessary.

        Set _notification_on to True in the following cases:
        1) If the max processes limit in the configuration changes;
        2) If _notification_on is False and the number of running processes
           drops below the level defined by REVIVE_NOTIFICATION_THRESHOLD.

        """
        if cls._last_known_max_processes != cls._max_processes():
            cls._notification_on = True
            cls._last_known_max_processes = cls._max_processes()
            return
        percentage = float(cls._num_running_processes) / cls._max_processes()
        if (not cls._notification_on and
            percentage < cls.REVIVE_NOTIFICATION_THRESHOLD):
            cls._notification_on = True


    def _try_starting_process(self):
        self._reset_notification_switch_if_necessary()
        if not self._can_run_new_process():
            self._notify_process_limit_hit()
            return

        # actually run the command
        super(SelfThrottledPostJobTask, self).run()
        if self._process_started():
            self._increment_running_processes()


    def finished(self, success):
        super(SelfThrottledPostJobTask, self).finished(success)
        if self._process_started():
            self._decrement_running_processes()


class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    * setting the final status of the host, directly or through a cleanup
    """
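    # In this class, log gathering itself happens in run() via the
    # --collect-crashinfo autoserv invocation built in _generate_command(),
    # while epilog() triggers result parsing (_parse_results), host cleanup or
    # READY status (_reboot_hosts) and, when reset_after_failure applies, a
    # RESET special task (_reset_after_failure).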
    def __init__(self, queue_entries, recover_run_monitor=None):
        self._job = queue_entries[0].job
        super(GatherLogsTask, self).__init__(
            queue_entries, log_file_name='.collect_crashinfo.log')
        self._set_ids(queue_entries=queue_entries)


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self.queue_entries)
        return [autoserv_utils.autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(),
                '--use-existing-results', '--collect-crashinfo',
                '-m', host_list, '-r', results_dir]
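        # Example of the resulting command line (hypothetical hosts/paths, and
        # assuming drone_manager.CRASHINFO_PID_FILE follows the
        # '.<label>_execute' naming convention):
        #
        #   autoserv -p --pidfile-label=collect_crashinfo \
        #       --use-existing-results --collect-crashinfo \
        #       -m host1,host2 -r /results/123-someuser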


    @property
    def num_processes(self):
        return len(self.queue_entries)


    def _pidfile_name(self):
        return drone_manager.CRASHINFO_PID_FILE


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
                allowed_host_statuses=(models.Host.Status.RUNNING,))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._parse_results(self.queue_entries)
        final_success, num_tests_failed = self._get_monitor_info()
        reset_after_failure = (
                not self._job.run_reset and (
                    not final_success or num_tests_failed > 0))
        self._reboot_hosts(final_success, num_tests_failed, reset_after_failure)
        if reset_after_failure:
            m = metrics.Counter('chromeos/autotest/scheduler/postjob_tasks/'
                                'reset_after_failure')
            m.increment(fields={'autoserv_process_success': final_success,
                                'num_tests_failed': num_tests_failed > 0})
            self._reset_after_failure()


    def _get_monitor_info(self):
        """Read monitor info from the pidfile.

        @returns: a tuple of
            final_success: whether the autoserv process finished successfully;
            num_tests_failed: the number of tests that failed.
        """
        if self._autoserv_monitor.has_process():
            final_success = (self._final_status() ==
                             models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
        else:
            final_success = False
            num_tests_failed = 0

        return final_success, num_tests_failed


    def _reboot_hosts(self, final_success, num_tests_failed,
                      reset_after_failure):
        """Reboot hosts by scheduling a CLEANUP task on each host, if needed.

        @param final_success: whether the autoserv process exited successfully.
        @param num_tests_failed: the total number of tests that failed.
        @param reset_after_failure: whether a RESET task will be scheduled
            later.
        """
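        # The decision below: always reboot after aborted jobs; otherwise
        # follow the job's reboot_after setting (ALWAYS, or IF_ALL_TESTS_PASSED
        # when everything passed), and also reboot when tests failed but no
        # RESET is coming. Hosts that are not rebooted are simply marked READY.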
        reboot_after = self._job.reboot_after
        do_reboot = (
                # always reboot after aborted jobs
                self._final_status() == models.HostQueueEntry.Status.ABORTED
                or reboot_after == model_attributes.RebootAfter.ALWAYS
                or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0)
                or (num_tests_failed > 0 and not reset_after_failure))

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                        host=models.Host.objects.get(id=queue_entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP,
                        requested_by=self._job.owner_model())
            else:
                queue_entry.host.set_status(models.Host.Status.READY)


    def _reset_after_failure(self):
        """Queue a RESET special task for each host if the job failed.

        The current HQE is not passed into the RESET task.
        """
        for queue_entry in self.queue_entries:
            models.SpecialTask.objects.create(
                        host=models.Host.objects.get(id=queue_entry.host.id),
                        task=models.SpecialTask.Task.RESET,
                        requested_by=self._job.owner_model())


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
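        # (The recorded exit code is treated as a wait()-style status:
        # os.WIFSIGNALED() is true when autoserv was killed by a signal, e.g.
        # SIGKILL, rather than exiting normally.)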
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)


class FinalReparseTask(SelfThrottledPostJobTask):
    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _generate_command(self, results_dir):
        return [_parser_path, '--detach', '--write-pidfile',
                '--record-duration', '--suite-report', '-l', '2', '-r', '-o',
                results_dir]
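        # Example of the resulting command line (illustrative only; the actual
        # prefix depends on drones.AUTOTEST_INSTALL_DIR and the results dir is
        # hypothetical):
        #
        #   /usr/local/autotest/tko/parse --detach --write-pidfile \
        #       --record-duration --suite-report -l 2 -r -o /results/123-someuser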


    @property
    def num_processes(self):
        return 0 # don't include parser processes in accounting


    def _pidfile_name(self):
        return drone_manager.PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))

        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._set_all_statuses(self._final_status())