# NOTE(review): removed code-viewer navigation chrome ("Home | History |
# Annotate | Download | only in scheduler") that is not valid Python.
      1 """Autotest AFE Cleanup used by the scheduler"""
      2 
      3 import contextlib
      4 import logging
      5 import random
      6 import time
      7 
      8 from autotest_lib.client.common_lib import utils
      9 from autotest_lib.frontend.afe import models
     10 from autotest_lib.scheduler import scheduler_config
     11 from autotest_lib.client.common_lib import global_config
     12 from autotest_lib.client.common_lib import host_protections
     13 
     14 try:
     15     from chromite.lib import metrics
     16 except ImportError:
     17     metrics = utils.metrics_mock
     18 
     19 
     20 _METRICS_PREFIX = 'chromeos/autotest/scheduler/cleanup'
     21 
     22 
     23 class PeriodicCleanup(object):
     24     """Base class to schedule periodical cleanup work.
     25     """
     26 
     27     def __init__(self, db, clean_interval_minutes, run_at_initialize=False):
     28         self._db = db
     29         self.clean_interval_minutes = clean_interval_minutes
     30         self._last_clean_time = time.time()
     31         self._run_at_initialize = run_at_initialize
     32 
     33 
     34     def initialize(self):
     35         """Method called by scheduler at the startup.
     36         """
     37         if self._run_at_initialize:
     38             self._cleanup()
     39 
     40 
     41     def run_cleanup_maybe(self):
     42         """Test if cleanup method should be called.
     43         """
     44         should_cleanup = (self._last_clean_time +
     45                           self.clean_interval_minutes * 60
     46                           < time.time())
     47         if should_cleanup:
     48             self._cleanup()
     49             self._last_clean_time = time.time()
     50 
     51 
     52     def _cleanup(self):
     53         """Abrstract cleanup method."""
     54         raise NotImplementedError
     55 
     56 
class UserCleanup(PeriodicCleanup):
    """User cleanup that is controlled by the global config variable
       clean_interval_minutes in the SCHEDULER section.
    """

    def __init__(self, db, clean_interval_minutes):
        """Initialize the user cleanup.

        @param db: Database connection object.
        @param clean_interval_minutes: Minutes between two cleanup passes.
        """
        super(UserCleanup, self).__init__(db, clean_interval_minutes)
        # Separate timer for host reverification, which runs on its own
        # period (reverify_period_minutes) independent of the cleanup cycle.
        self._last_reverify_time = time.time()


    @metrics.SecondsTimerDecorator(_METRICS_PREFIX + '/user/durations')
    def _cleanup(self):
        """Run one full periodic cleanup pass (PeriodicCleanup hook)."""
        logging.info('Running periodic cleanup')
        self._abort_timed_out_jobs()
        self._abort_jobs_past_max_runtime()
        self._clear_inactive_blocks()
        self._check_for_db_inconsistencies()
        self._reverify_dead_hosts()
        self._django_session_cleanup()


    def _abort_timed_out_jobs(self):
        """Abort incomplete jobs whose overall timeout has expired."""
        logging.info(
                'Aborting all jobs that have timed out and are not complete')
        # Evaluate created_on + timeout_mins against NOW() in SQL so the
        # cutoff uses the database clock, not this process's clock.
        query = models.Job.objects.filter(hostqueueentry__complete=False).extra(
            where=['created_on + INTERVAL timeout_mins MINUTE < NOW()'])
        jobs = query.distinct()
        if not jobs:
            return

        with _cleanup_warning_banner('timed out jobs', len(jobs)):
            for job in jobs:
                logging.warning('Aborting job %d due to job timeout', job.id)
                job.abort()
        _report_detected_errors('jobs_timed_out', len(jobs))


    def _abort_jobs_past_max_runtime(self):
        """
        Abort executions that have started and are past the job's max runtime.
        """
        logging.info('Aborting all jobs that have passed maximum runtime')
        # Select running (not complete, not already aborted) queue entries
        # whose started_on + max_runtime_mins is in the past per the DB clock.
        rows = self._db.execute("""
            SELECT hqe.id FROM afe_host_queue_entries AS hqe
            WHERE NOT hqe.complete AND NOT hqe.aborted AND EXISTS
            (select * from afe_jobs where hqe.job_id=afe_jobs.id and
             hqe.started_on + INTERVAL afe_jobs.max_runtime_mins MINUTE < NOW())
            """)
        query = models.HostQueueEntry.objects.filter(
            id__in=[row[0] for row in rows])
        hqes = query.distinct()
        if not hqes:
            return

        with _cleanup_warning_banner('hqes past max runtime', len(hqes)):
            for queue_entry in hqes:
                logging.warning('Aborting entry %s due to max runtime',
                                queue_entry)
                queue_entry.abort()
        _report_detected_errors('hqes_past_max_runtime', len(hqes))


    def _check_for_db_inconsistencies(self):
        """Detect and repair cleanable DB inconsistencies."""
        logging.info('Cleaning db inconsistencies')
        self._check_all_invalid_related_objects()


    def _check_invalid_related_objects_one_way(self, invalid_model,
                                               relation_field, valid_model):
        """Clear relations from invalid rows of one model to valid rows of
        another, logging and reporting what was found.

        @param invalid_model: Model whose invalid=True rows are inspected.
        @param relation_field: Name of the relation field on invalid_model
                linking it to valid_model; cleared when stale.
        @param valid_model: The related model on the other side.
        """
        # Models without an 'invalid' column cannot have invalid rows.
        if 'invalid' not in invalid_model.get_field_dict():
            return

        invalid_objects = list(invalid_model.objects.filter(invalid=True))
        invalid_model.objects.populate_relationships(
                invalid_objects, valid_model, 'related_objects')
        if not invalid_objects:
            return

        num_objects_with_invalid_relations = 0
        errors = []
        for invalid_object in invalid_objects:
            if invalid_object.related_objects:
                related_objects = invalid_object.related_objects
                related_list = ', '.join(str(x) for x in related_objects)
                num_objects_with_invalid_relations += 1
                errors.append('Invalid %s is related to: %s' %
                              (invalid_object, related_list))
                # Drop the stale relations so the invalid row no longer
                # references valid rows.
                related_manager = getattr(invalid_object, relation_field)
                related_manager.clear()

        # Only log warnings after we're sure we've seen at least one invalid
        # model with some valid relations to avoid empty banners from getting
        # printed.
        if errors:
            invalid_model_name = invalid_model.__name__
            valid_model_name = valid_model.__name__
            banner = 'invalid %s related to valid %s' % (invalid_model_name,
                                                         valid_model_name)
            with _cleanup_warning_banner(banner, len(errors)):
                for error in errors:
                    logging.warning(error)
            _report_detected_errors(
                    'invalid_related_objects',
                    num_objects_with_invalid_relations,
                    fields={'invalid_model': invalid_model_name,
                            'valid_model': valid_model_name})
            _report_detected_errors(
                    'invalid_related_objects_relations',
                    len(errors),
                    fields={'invalid_model': invalid_model_name,
                            'valid_model': valid_model_name})


    def _check_invalid_related_objects(self, first_model, first_field,
                                       second_model, second_field):
        """Check a model pair for invalid-to-valid relations in both
        directions."""
        self._check_invalid_related_objects_one_way(
                first_model,
                first_field,
                second_model,
        )
        self._check_invalid_related_objects_one_way(
                second_model,
                second_field,
                first_model,
        )


    def _check_all_invalid_related_objects(self):
        """Scan every known model pair for invalid-to-valid relations."""
        model_pairs = ((models.Host, 'labels', models.Label, 'host_set'),
                       (models.AclGroup, 'hosts', models.Host, 'aclgroup_set'),
                       (models.AclGroup, 'users', models.User, 'aclgroup_set'),
                       (models.Test, 'dependency_labels', models.Label,
                        'test_set'))
        for first_model, first_field, second_model, second_field in model_pairs:
            self._check_invalid_related_objects(
                    first_model,
                    first_field,
                    second_model,
                    second_field,
            )


    def _clear_inactive_blocks(self):
        """Delete ineligible-host rows for jobs with no incomplete HQEs."""
        logging.info('Clear out blocks for all completed jobs.')
        # this would be simpler using NOT IN (subquery), but MySQL
        # treats all IN subqueries as dependent, so this optimizes much
        # better
        self._db.execute("""
                DELETE ihq FROM afe_ineligible_host_queues ihq
                WHERE NOT EXISTS
                    (SELECT job_id FROM afe_host_queue_entries hqe
                     WHERE NOT hqe.complete AND hqe.job_id = ihq.job_id)""")


    def _should_reverify_hosts_now(self):
        """Return True when the reverify period has elapsed (0 disables)."""
        reverify_period_sec = (scheduler_config.config.reverify_period_minutes
                               * 60)
        if reverify_period_sec == 0:
            return False
        return (self._last_reverify_time + reverify_period_sec) <= time.time()


    def _choose_subset_of_hosts_to_reverify(self, hosts):
        """Given hosts needing verification, return a subset to reverify."""
        # reverify_max_hosts_at_once <= 0 means "no limit" here.
        max_at_once = scheduler_config.config.reverify_max_hosts_at_once
        if (max_at_once > 0 and len(hosts) > max_at_once):
            return random.sample(hosts, max_at_once)
        return sorted(hosts)


    def _reverify_dead_hosts(self):
        """Schedule VERIFY tasks for repair-failed hosts, rate limited by
        _should_reverify_hosts_now / _choose_subset_of_hosts_to_reverify."""
        if not self._should_reverify_hosts_now():
            return

        self._last_reverify_time = time.time()
        logging.info('Checking for dead hosts to reverify')
        hosts = models.Host.objects.filter(
                status=models.Host.Status.REPAIR_FAILED,
                locked=False,
                invalid=False)
        # Hosts explicitly protected from verification are left alone.
        hosts = hosts.exclude(
                protection=host_protections.Protection.DO_NOT_VERIFY)
        if not hosts:
            return

        hosts = list(hosts)
        total_hosts = len(hosts)
        hosts = self._choose_subset_of_hosts_to_reverify(hosts)
        logging.info('Reverifying dead hosts (%d of %d)', len(hosts),
                     total_hosts)
        with _cleanup_warning_banner('reverify dead hosts', len(hosts)):
            for host in hosts:
                logging.warning(host.hostname)
        _report_detected_errors('dead_hosts_triggered_reverify', len(hosts))
        _report_detected_errors('dead_hosts_require_reverify', total_hosts)
        for host in hosts:
            models.SpecialTask.schedule_special_task(
                    host=host, task=models.SpecialTask.Task.VERIFY)


    def _django_session_cleanup(self):
        """Clean up django_session since django doesn't for us.
           http://www.djangoproject.com/documentation/0.96/sessions/
        """
        logging.info('Deleting old sessions from django_session')
        sql = 'TRUNCATE TABLE django_session'
        self._db.execute(sql)
    264 
    265 
class TwentyFourHourUpkeep(PeriodicCleanup):
    """Cleanup that runs at the startup of monitor_db and every subsequent
       twenty four hours.
    """


    def __init__(self, db, drone_manager, run_at_initialize=True):
        """Initialize TwentyFourHourUpkeep.

        @param db: Database connection object.
        @param drone_manager: DroneManager to access drones.
        @param run_at_initialize: True to run cleanup when scheduler starts.
                                  Default is set to True.

        """
        self.drone_manager = drone_manager
        clean_interval_minutes = 24 * 60 # 24 hours
        super(TwentyFourHourUpkeep, self).__init__(
            db, clean_interval_minutes, run_at_initialize=run_at_initialize)


    @metrics.SecondsTimerDecorator(_METRICS_PREFIX + '/daily/durations')
    def _cleanup(self):
        """Run the daily cleanup pass (PeriodicCleanup hook)."""
        logging.info('Running 24 hour clean up')
        self._check_for_uncleanable_db_inconsistencies()
        self._cleanup_orphaned_containers()


    def _check_for_uncleanable_db_inconsistencies(self):
        """Log DB inconsistencies that mostly cannot be auto-repaired."""
        logging.info('Checking for uncleanable DB inconsistencies')
        self._check_for_active_and_complete_queue_entries()
        self._check_for_multiple_platform_hosts()
        self._check_for_no_platform_hosts()


    def _check_for_active_and_complete_queue_entries(self):
        """Warn about HQEs marked both active and complete; recover the
        aborted ones by clearing their active flag."""
        query = models.HostQueueEntry.objects.filter(active=True, complete=True)
        num_bad_hqes = query.count()
        if num_bad_hqes == 0:
            return

        num_aborted = 0
        logging.warning('%d queue entries found with active=complete=1',
                        num_bad_hqes)
        with _cleanup_warning_banner('active and complete hqes', num_bad_hqes):
            for entry in query:
                if entry.status == 'Aborted':
                    # Aborted entries can safely be deactivated in place.
                    entry.active = False
                    entry.save()
                    recovery_path = 'was also aborted, set active to False'
                    num_aborted += 1
                else:
                    recovery_path = 'can not recover'
                logging.warning('%s (recovery: %s)', entry.get_object_dict(),
                                recovery_path)
        _report_detected_errors('hqes_active_and_complete', num_bad_hqes)
        _report_detected_errors('hqes_aborted_set_to_inactive', num_aborted)


    def _check_for_multiple_platform_hosts(self):
        """Warn about hosts carrying more than one platform label."""
        rows = self._db.execute("""
            SELECT afe_hosts.id, hostname, COUNT(1) AS platform_count,
                   GROUP_CONCAT(afe_labels.name)
            FROM afe_hosts
            INNER JOIN afe_hosts_labels ON
                    afe_hosts.id = afe_hosts_labels.host_id
            INNER JOIN afe_labels ON afe_hosts_labels.label_id = afe_labels.id
            WHERE afe_labels.platform
            GROUP BY afe_hosts.id
            HAVING platform_count > 1
            ORDER BY hostname""")

        if rows:
            logging.warning('Cleanup found hosts with multiple platforms')
            with _cleanup_warning_banner('hosts with multiple platforms',
                                         len(rows)):
                for row in rows:
                    logging.warning(' '.join(str(item) for item in row))
            _report_detected_errors('hosts_with_multiple_platforms', len(rows))


    def _check_for_no_platform_hosts(self):
        """Warn about valid hosts that have no platform label at all."""
        rows = self._db.execute("""
            SELECT hostname
            FROM afe_hosts
            LEFT JOIN afe_hosts_labels
              ON afe_hosts.id = afe_hosts_labels.host_id
              AND afe_hosts_labels.label_id IN (SELECT id FROM afe_labels
                                                WHERE platform)
            WHERE NOT afe_hosts.invalid AND afe_hosts_labels.host_id IS NULL""")
        if rows:
            with _cleanup_warning_banner('hosts with no platform', len(rows)):
                for row in rows:
                    logging.warning(row[0])
            _report_detected_errors('hosts_with_no_platform', len(rows))


    def _cleanup_orphaned_containers(self):
        """Cleanup orphaned containers in each drone.

        The function queues a lxc_cleanup call in each drone without waiting for
        the script to finish, as the cleanup procedure could take minutes and the
        script output is logged.

        """
        ssp_enabled = global_config.global_config.get_config_value(
                'AUTOSERV', 'enable_ssp_container')
        if not ssp_enabled:
            logging.info(
                    'Server-side packaging is not enabled, no need to clean '
                    'up orphaned containers.')
            return
        self.drone_manager.cleanup_orphaned_containers()
    379 
    380 
    381 def _report_detected_errors(metric_name, count, fields={}):
    382     """Reports a counter metric for recovered errors
    383 
    384     @param metric_name: Name of the metric to report about.
    385     @param count: How many "errors" were fixed this cycle.
    386     @param fields: Optional fields to include with the metric.
    387     """
    388     m = '%s/errors_recovered/%s' % (_METRICS_PREFIX, metric_name)
    389     metrics.Counter(m).increment_by(count, fields=fields)
    390 
    391 
    392 def _report_detected_errors(metric_name, gauge, fields={}):
    393     """Reports a gauge metric for errors detected
    394 
    395     @param metric_name: Name of the metric to report about.
    396     @param gauge: Outstanding number of unrecoverable errors of this type.
    397     @param fields: Optional fields to include with the metric.
    398     """
    399     m = '%s/errors_detected/%s' % (_METRICS_PREFIX, metric_name)
    400     metrics.Gauge(m).set(gauge, fields=fields)
    401 
    402 
    403 @contextlib.contextmanager
    404 def _cleanup_warning_banner(banner, error_count=None):
    405     """Put a clear context in the logs around list of errors
    406 
    407     @param: banner: The identifying header to print for context.
    408     @param: error_count: If not None, the number of errors detected.
    409     """
    410     if error_count is not None:
    411         banner += ' (total: %d)' % error_count
    412     logging.warning('#### START: %s ####', banner)
    413     try:
    414         yield
    415     finally:
    416         logging.warning('#### END: %s ####', banner)
    417