Home | History | Annotate | Download | only in scheduler
      1 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 #pylint: disable-msg=C0111
      6 
      7 import os
      8 import logging
      9 import time
     10 
     11 from autotest_lib.client.common_lib import global_config
     12 from autotest_lib.client.common_lib.cros.graphite import autotest_stats
     13 from autotest_lib.frontend.afe import models
     14 from autotest_lib.scheduler import email_manager
     15 from autotest_lib.scheduler import scheduler_config, scheduler_models
     16 
     17 
     18 # Override default parser with our site parser.
     19 def parser_path(install_dir):
     20     """Return site implementation of parser.
     21 
     22     @param install_dir: installation directory.
     23     """
     24     return os.path.join(install_dir, 'tko', 'site_parse')
     25 
     26 
     27 class SiteAgentTask(object):
     28     """
     29     SiteAgentTask subclasses BaseAgentTask in monitor_db.
     30     """
     31 
     32 
     33     def _archive_results(self, queue_entries):
     34         """
     35         Set the status of queue_entries to ARCHIVING.
     36 
     37         This method sets the status of the queue_entries to ARCHIVING
     38         if the enable_archiving flag is true in global_config.ini.
     39         Otherwise, it bypasses the archiving step and sets the queue entries
     40         to the final status of current step.
     41         """
     42         enable_archiving = global_config.global_config.get_config_value(
     43             scheduler_config.CONFIG_SECTION, 'enable_archiving', type=bool)
     44         # Set the status of the queue entries to archiving or self final status
     45         if enable_archiving:
     46             status = models.HostQueueEntry.Status.ARCHIVING
     47         else:
     48             status = self._final_status()
     49 
     50         for queue_entry in self.queue_entries:
     51             queue_entry.set_status(status)
     52 
     53 
     54     def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
     55                                     allowed_host_statuses=None):
     56         """
     57         Forked from monitor_db.py
     58         """
     59         class_name = self.__class__.__name__
     60         for entry in queue_entries:
     61             if entry.status not in allowed_hqe_statuses:
     62                 # In the orignal code, here we raise an exception. In an
     63                 # effort to prevent downtime we will instead abort the job and
     64                 # send out an email notifying us this has occured.
     65                 error_message = ('%s attempting to start entry with invalid '
     66                                  'status %s: %s. Aborting Job: %s.'
     67                                  % (class_name, entry.status, entry,
     68                                     entry.job))
     69                 logging.error(error_message)
     70                 email_manager.manager.enqueue_notify_email(
     71                     'Job Aborted - Invalid Host Queue Entry Status',
     72                     error_message)
     73                 entry.job.request_abort()
     74             invalid_host_status = (
     75                     allowed_host_statuses is not None
     76                     and entry.host.status not in allowed_host_statuses)
     77             if invalid_host_status:
     78                 # In the orignal code, here we raise an exception. In an
     79                 # effort to prevent downtime we will instead abort the job and
     80                 # send out an email notifying us this has occured.
     81                 error_message = ('%s attempting to start on queue entry with '
     82                                  'invalid host status %s: %s. Aborting Job: %s'
     83                                  % (class_name, entry.host.status, entry,
     84                                     entry.job))
     85                 logging.error(error_message)
     86                 email_manager.manager.enqueue_notify_email(
     87                     'Job Aborted - Invalid Host Status', error_message)
     88                 entry.job.request_abort()
     89 
     90 
     91 class SiteDispatcher(object):
     92     """
     93     SiteDispatcher subclasses BaseDispatcher in monitor_db.
     94     """
     95     DEFAULT_REQUESTED_BY_USER_ID = 1
     96 
     97 
     98     _timer = autotest_stats.Timer('scheduler')
     99     _gauge = autotest_stats.Gauge('scheduler_rel')
    100     _tick_start = None
    101 
    102 
    103     @_timer.decorate
    104     def tick(self):
    105         self._tick_start = time.time()
    106         super(SiteDispatcher, self).tick()
    107         self._gauge.send('tick', time.time() - self._tick_start)
    108 
    109     @_timer.decorate
    110     def _garbage_collection(self):
    111         super(SiteDispatcher, self)._garbage_collection()
    112         if self._tick_start:
    113             self._gauge.send('_garbage_collection',
    114                              time.time() - self._tick_start)
    115 
    116     @_timer.decorate
    117     def _run_cleanup(self):
    118         super(SiteDispatcher, self)._run_cleanup()
    119         if self._tick_start:
    120             self._gauge.send('_run_cleanup', time.time() - self._tick_start)
    121 
    122     @_timer.decorate
    123     def _find_aborting(self):
    124         super(SiteDispatcher, self)._find_aborting()
    125         if self._tick_start:
    126             self._gauge.send('_find_aborting', time.time() - self._tick_start)
    127 
    128     @_timer.decorate
    129     def _process_recurring_runs(self):
    130         super(SiteDispatcher, self)._process_recurring_runs()
    131         if self._tick_start:
    132             self._gauge.send('_process_recurring_runs',
    133                              time.time() - self._tick_start)
    134 
    135     @_timer.decorate
    136     def _schedule_delay_tasks(self):
    137         super(SiteDispatcher, self)._schedule_delay_tasks()
    138         if self._tick_start:
    139             self._gauge.send('_schedule_delay_tasks',
    140                              time.time() - self._tick_start)
    141 
    142     @_timer.decorate
    143     def _schedule_running_host_queue_entries(self):
    144         super(SiteDispatcher, self)._schedule_running_host_queue_entries()
    145         if self._tick_start:
    146             self._gauge.send('_schedule_running_host_queue_entries',
    147                              time.time() - self._tick_start)
    148 
    149     @_timer.decorate
    150     def _schedule_special_tasks(self):
    151         super(SiteDispatcher, self)._schedule_special_tasks()
    152         if self._tick_start:
    153             self._gauge.send('_schedule_special_tasks',
    154                              time.time() - self._tick_start)
    155 
    156     @_timer.decorate
    157     def _schedule_new_jobs(self):
    158         super(SiteDispatcher, self)._schedule_new_jobs()
    159         if self._tick_start:
    160             self._gauge.send('_schedule_new_jobs',
    161                              time.time() - self._tick_start)
    162 
    163 
    164     @_timer.decorate
    165     def _handle_agents(self):
    166         super(SiteDispatcher, self)._handle_agents()
    167         if self._tick_start:
    168             self._gauge.send('_handle_agents', time.time() - self._tick_start)
    169 
    170 
    171     def _reverify_hosts_where(self, where,
    172                               print_message='Reverifying host %s'):
    173         """
    174         This is an altered version of _reverify_hosts_where the class to
    175         models.SpecialTask.objects.create passes in an argument for
    176         requested_by, in order to allow the Reset task to be created
    177         properly.
    178         """
    179         full_where='locked = 0 AND invalid = 0 AND ' + where
    180         for host in scheduler_models.Host.fetch(where=full_where):
    181             if self.host_has_agent(host):
    182                 # host has already been recovered in some way
    183                 continue
    184             if self._host_has_scheduled_special_task(host):
    185                 # host will have a special task scheduled on the next cycle
    186                 continue
    187             if print_message:
    188                 logging.error(print_message, host.hostname)
    189             try:
    190                 user = models.User.objects.get(login='autotest_system')
    191             except models.User.DoesNotExist:
    192                 user = models.User.objects.get(
    193                         id=SiteDispatcher.DEFAULT_REQUESTED_BY_USER_ID)
    194             models.SpecialTask.objects.create(
    195                     task=models.SpecialTask.Task.RESET,
    196                     host=models.Host.objects.get(id=host.id),
    197                     requested_by=user)
    198 
    199 
    200     def _check_for_unrecovered_verifying_entries(self):
    201         # Verify is replaced by Reset.
    202         queue_entries = scheduler_models.HostQueueEntry.fetch(
    203                 where='status = "%s"' % models.HostQueueEntry.Status.RESETTING)
    204         for queue_entry in queue_entries:
    205             special_tasks = models.SpecialTask.objects.filter(
    206                     task__in=(models.SpecialTask.Task.CLEANUP,
    207                               models.SpecialTask.Task.VERIFY,
    208                               models.SpecialTask.Task.RESET),
    209                     queue_entry__id=queue_entry.id,
    210                     is_complete=False)
    211             if special_tasks.count() == 0:
    212                 logging.error('Unrecovered Resetting host queue entry: %s. '
    213                               'Setting status to Queued.', str(queue_entry))
    214                 # Essentially this host queue entry was set to be Verifying
    215                 # however no special task exists for entry. This occurs if the
    216                 # scheduler dies between changing the status and creating the
    217                 # special task. By setting it to queued, the job can restart
    218                 # from the beginning and proceed correctly. This is much more
    219                 # preferable than having monitor_db not launching.
    220                 queue_entry.set_status('Queued')
    221