#pylint: disable-msg=C0111

"""
Postjob task.

Postjob tasks are responsible for setting the final status of the HQE
and Host, and scheduling additional special agents such as cleanup,
if necessary.
"""
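
# Rough flow, as reflected in the epilog hooks below: GatherLogsTask's epilog
# hands the queue entries off for parsing (_parse_results), FinalReparseTask's
# epilog hands them off for archiving (_archive_results), and
# ArchiveResultsTask's epilog sets the final queue entry statuses.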

import os

from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.frontend.afe import models, model_attributes
from autotest_lib.scheduler import agent_task, drones, drone_manager
from autotest_lib.scheduler import email_manager, pidfile_monitor
from autotest_lib.scheduler import scheduler_config
from autotest_lib.server import autoserv_utils


_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')


class PostJobTask(agent_task.AgentTask):
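    """Base class for tasks that run after the main autoserv process exits.

    A PidfileRunMonitor is attached to the existing autoserv results
    directory so the final queue entry status can be derived from autoserv's
    exit code and the queue entries' abort state.  Subclasses provide the
    command to run via _generate_command() and the pidfile to monitor via
    _pidfile_name().
    """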
    def __init__(self, queue_entries, log_file_name):
        super(PostJobTask, self).__init__(log_file_name=log_file_name)

        self.queue_entries = queue_entries

        self._autoserv_monitor = pidfile_monitor.PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(
                self._working_directory())


    def _command_line(self):
        # Do we need testing_mode?
        return self._generate_command(
                self._drone_manager.absolute_path(self._working_directory()))


    def _generate_command(self, results_dir):
        raise NotImplementedError('Subclasses must override this')


    @property
    def owner_username(self):
        return self.queue_entries[0].job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def _paired_with_monitor(self):
        return self._autoserv_monitor


    def _job_was_aborted(self):
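        """Return whether the job was aborted.

        Uses the abort state of the first queue entry.  If the entries
        disagree, a notification email is sent and True is assumed.
        """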
        was_aborted = None
        for queue_entry in self.queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                entries = ['%s (aborted: %s)' % (entry, entry.aborted)
                           for entry in self.queue_entries]
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state:\n' +
                        '\n'.join(entries))
                # don't crash here, just assume true
                return True
        return was_aborted


    def _final_status(self):
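        """Return the final HQE status: ABORTED, COMPLETED or FAILED."""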
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def _set_all_statuses(self, status):
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task.  post-job tasks continue when the job is aborted.
        pass


    def _pidfile_label(self):
        # '.autoserv_execute' -> 'autoserv'
        return self._pidfile_name()[1:-len('_execute')]


class SelfThrottledPostJobTask(PostJobTask):
    """
    PostJobTask that maintains its own process limit.

    We throttle tasks like parsing because we don't want them to
    hold up tests. At the same time we don't wish to build up a
    backlog of work that will take forever to process.
    """
    _num_running_processes = 0
    # Last known limit of max processes, used to check whether
    # the max processes config has changed.
    _last_known_max_processes = 0
    # Whether an email should be sent to notify that the process limit has
    # been hit.
    _notification_on = True
    # Once the process limit is hit, an email will be sent.
    # To prevent spam, do not send another email until the number of
    # running processes drops below the following fraction of the limit.
    REVIVE_NOTIFICATION_THRESHOLD = 0.80
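    # Illustrative example (numbers hypothetical): with a limit of 10, an
    # email is sent when 10 processes are already running and another task
    # tries to start; further emails are suppressed until the running count
    # drops below 10 * REVIVE_NOTIFICATION_THRESHOLD = 8, or the configured
    # limit itself changes.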


    @classmethod
    def _increment_running_processes(cls):
        cls._num_running_processes += 1
        autotest_stats.Gauge('scheduler').send(
                '%s.num_running_processes' % cls.__name__,
                cls._num_running_processes)


    @classmethod
    def _decrement_running_processes(cls):
        cls._num_running_processes -= 1
        autotest_stats.Gauge('scheduler').send(
                '%s.num_running_processes' % cls.__name__,
                cls._num_running_processes)


    @classmethod
    def _max_processes(cls):
        raise NotImplementedError


    @classmethod
    def _can_run_new_process(cls):
        return cls._num_running_processes < cls._max_processes()


    def _process_started(self):
        return bool(self.monitor)


    def tick(self):
        # override tick to keep trying to start until the process count goes
        # down and we can, at which point we revert to default behavior
        if self._process_started():
            super(SelfThrottledPostJobTask, self).tick()
        else:
            self._try_starting_process()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_process()


    @classmethod
    def _notify_process_limit_hit(cls):
        """Send an email to notify that the process limit has been hit."""
        if cls._notification_on:
            subject = '%s: hitting max process limit.' % cls.__name__
            message = ('Running processes/Max processes: %d/%d'
                       % (cls._num_running_processes, cls._max_processes()))
            email_manager.manager.enqueue_notify_email(subject, message)
            cls._notification_on = False


    @classmethod
    def _reset_notification_switch_if_necessary(cls):
        """Reset _notification_on if necessary.

        Set _notification_on to True in the following cases:
        1) The max processes limit in the configuration has changed.
        2) _notification_on is False and the number of running processes
           has dropped below the level defined by REVIVE_NOTIFICATION_THRESHOLD.

        """
        if cls._last_known_max_processes != cls._max_processes():
            cls._notification_on = True
            cls._last_known_max_processes = cls._max_processes()
            return
        percentage = float(cls._num_running_processes) / cls._max_processes()
        if (not cls._notification_on and
            percentage < cls.REVIVE_NOTIFICATION_THRESHOLD):
            cls._notification_on = True


    def _try_starting_process(self):
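        """Start the process if we are under the per-class limit.

        If the limit has been reached, (at most) one notification email is
        sent and the task waits for a later tick() to try again.
        """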
        self._reset_notification_switch_if_necessary()
        if not self._can_run_new_process():
            self._notify_process_limit_hit()
            return

        # actually run the command
        super(SelfThrottledPostJobTask, self).run()
        if self._process_started():
            self._increment_running_processes()


    def finished(self, success):
        super(SelfThrottledPostJobTask, self).finished(success)
        if self._process_started():
            self._decrement_running_processes()


class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    * setting the final status of the host, directly or through a cleanup
    """
    def __init__(self, queue_entries, recover_run_monitor=None):
        self._job = queue_entries[0].job
        super(GatherLogsTask, self).__init__(
            queue_entries, log_file_name='.collect_crashinfo.log')
        self._set_ids(queue_entries=queue_entries)


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
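        """Build the autoserv command used to collect crash information.

        The resulting command is roughly of the form (arguments shown here
        are illustrative, derived from the list below):

            autoserv -p --pidfile-label=<label> --use-existing-results
                --collect-crashinfo -m host1,host2 -r <results_dir>
        """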
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self.queue_entries)
        return [autoserv_utils.autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(),
                '--use-existing-results', '--collect-crashinfo',
                '-m', host_list, '-r', results_dir]


    @property
    def num_processes(self):
        return len(self.queue_entries)


    def _pidfile_name(self):
        return drone_manager.CRASHINFO_PID_FILE


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
                allowed_host_statuses=(models.Host.Status.RUNNING,))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._parse_results(self.queue_entries)
        self._reboot_hosts()


    def _reboot_hosts(self):
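        """Schedule a cleanup (reboot) for each host, or mark it Ready.

        A cleanup SpecialTask is created when the job was aborted, when the
        job's reboot_after setting requires it, or when any test failed;
        otherwise the host is set back to Ready directly.
        """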
        if self._autoserv_monitor.has_process():
            final_success = (self._final_status() ==
                             models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
        else:
            final_success = False
            num_tests_failed = 0
        reboot_after = self._job.reboot_after
        do_reboot = (
                # always reboot after aborted jobs
                self._final_status() == models.HostQueueEntry.Status.ABORTED
                or reboot_after == model_attributes.RebootAfter.ALWAYS
                or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0)
                or num_tests_failed > 0)

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                        host=models.Host.objects.get(id=queue_entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP,
                        requested_by=self._job.owner_model())
            else:
                queue_entry.host.set_status(models.Host.Status.READY)


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)


class FinalReparseTask(SelfThrottledPostJobTask):
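    """Task that runs the TKO parser (tko/parse) on a job's results.

    Parsing is throttled via SelfThrottledPostJobTask, limited by
    max_parse_processes in the scheduler configuration.
    """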
    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _generate_command(self, results_dir):
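        """Return the command used to invoke the TKO parser on results_dir."""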
        return [_parser_path, '--write-pidfile', '--record-duration',
                '-l', '2', '-r', '-o', results_dir]


    @property
    def num_processes(self):
        return 0 # don't include parser processes in accounting


    def _pidfile_name(self):
        return drone_manager.PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))

        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._archive_results(self.queue_entries)


class ArchiveResultsTask(SelfThrottledPostJobTask):
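    """Task that archives a job's results.

    Archiving runs autoserv against scheduler/archive_results.control.srv on
    the existing results and is throttled, limited by max_transfer_processes
    in the scheduler configuration.  On failure, a marker file
    (_ARCHIVING_FAILED_FILE) is written into the results directory.
    """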
    _ARCHIVING_FAILED_FILE = '.archiver_failed'

    def __init__(self, queue_entries):
        super(ArchiveResultsTask, self).__init__(queue_entries,
                                                 log_file_name='.archiving.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _pidfile_name(self):
        return drone_manager.ARCHIVER_PID_FILE


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
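        """Build the autoserv command that runs the archiving control file.

        The command reuses the existing results in results_dir and runs the
        scheduler/archive_results.control.srv control file against them.
        """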
        return [autoserv_utils.autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
                '--use-existing-results', '--control-filename=control.archive',
                os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
                             'archive_results.control.srv')]


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_transfer_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))

        super(ArchiveResultsTask, self).prolog()


    def epilog(self):
        super(ArchiveResultsTask, self).epilog()
        if not self.success and self._paired_with_monitor().has_process():
            failed_file = os.path.join(self._working_directory(),
                                       self._ARCHIVING_FAILED_FILE)
            paired_process = self._paired_with_monitor().get_process()
            self._drone_manager.write_lines_to_file(
                    failed_file, ['Archiving failed with exit code %s'
                                  % self.monitor.exit_code()],
                    paired_with_process=paired_process)
        self._set_all_statuses(self._final_status())
    391