Home | History | Annotate | Download | only in scheduler
      1 #pylint: disable-msg=C0111
      2 
      3 # Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
      4 # Use of this source code is governed by a BSD-style license that can be
      5 # found in the LICENSE file.
      6 
      7 """Scheduler library classes.
      8 """
      9 
     10 import collections
     11 import logging
     12 
     13 import common
     14 
     15 from autotest_lib.frontend import setup_django_environment
     16 
     17 from autotest_lib.client.common_lib import utils
     18 from autotest_lib.frontend.afe import models
     19 from autotest_lib.server.cros.dynamic_suite import constants
     20 from autotest_lib.scheduler import scheduler_models
     21 from autotest_lib.scheduler import scheduler_lib
     22 
     23 try:
     24     from chromite.lib import metrics
     25 except ImportError:
     26     metrics = utils.metrics_mock
     27 
     28 
# Template for the metric names of the timers wrapped around the
# AFEJobQueryManager queries below; '%s' is filled with the query name.
_job_timer_name = 'chromeos/autotest/scheduler/job_query_durations/%s'
     30 class AFEJobQueryManager(object):
     31     """Query manager for AFE Jobs."""
     32 
     33     # A subquery to only get inactive hostless jobs.
     34     hostless_query = 'host_id IS NULL AND meta_host IS NULL'
     35 
     36 
     37     @metrics.SecondsTimerDecorator(
     38             _job_timer_name % 'get_pending_queue_entries')
     39     def get_pending_queue_entries(self, only_hostless=False):
     40         """
     41         Fetch a list of new host queue entries.
     42 
     43         The ordering of this list is important, as every new agent
     44         we schedule can potentially contribute to the process count
     45         on the drone, which has a static limit. The sort order
     46         prioritizes jobs as follows:
     47         1. High priority jobs: Based on the afe_job's priority
     48         2. With hosts and metahosts: This will only happen if we don't
     49             activate the hqe after assigning a host to it in
     50             schedule_new_jobs.
     51         3. With hosts but without metahosts: When tests are scheduled
     52             through the frontend the owner of the job would have chosen
     53             a host for it.
     54         4. Without hosts but with metahosts: This is the common case of
     55             a new test that needs a DUT. We assign a host and set it to
     56             active so it shouldn't show up in case 2 on the next tick.
     57         5. Without hosts and without metahosts: Hostless suite jobs, that
     58             will result in new jobs that fall under category 4.
     59 
     60         A note about the ordering of cases 3 and 4:
     61         Prioritizing one case above the other leads to earlier acquisition
     62         of the following resources: 1. process slots on the drone 2. machines.
     63         - When a user schedules a job through the afe they choose a specific
     64           host for it. Jobs with metahost can utilize any host that satisfies
     65           the metahost criterion. This means that if we had scheduled 4 before
     66           3 there is a good chance that a job which could've used another host,
     67           will now use the host assigned to a metahost-less job. Given the
     68           availability of machines in pool:suites, this almost guarantees
     69           starvation for jobs scheduled through the frontend.
     70         - Scheduling 4 before 3 also has its pros however, since a suite
     71           has the concept of a time out, whereas users can wait. If we hit the
     72           process count on the drone a suite can timeout waiting on the test,
     73           but a user job generally has a much longer timeout, and relatively
     74           harmless consequences.
     75         The current ordering was chosed because it is more likely that we will
     76         run out of machines in pool:suites than processes on the drone.
     77 
     78         @returns A list of HQEs ordered according to sort_order.
     79         """
     80         sort_order = ('afe_jobs.priority DESC, '
     81                       'ISNULL(host_id), '
     82                       'ISNULL(meta_host), '
     83                       'parent_job_id, '
     84                       'job_id')
     85         # Don't execute jobs that should be executed by a shard in the global
     86         # scheduler.
     87         # This won't prevent the shard scheduler to run this, as the shard db
     88         # doesn't have an an entry in afe_shards_labels.
     89         query=('NOT complete AND NOT active AND status="Queued"'
     90                'AND NOT aborted AND afe_shards_labels.id IS NULL')
     91 
     92         # TODO(jakobjuelich, beeps): Optimize this query. Details:
     93         # Compressed output of EXPLAIN <query>:
     94         # +------------------------+--------+-------------------------+-------+
     95         # | table                  | type   | key                     | rows  |
     96         # +------------------------+--------+-------------------------+-------+
     97         # | afe_host_queue_entries | ref    | host_queue_entry_status | 30536 |
     98         # | afe_shards_labels      | ref    | shard_label_id_fk       |     1 |
     99         # | afe_jobs               | eq_ref | PRIMARY                 |     1 |
    100         # +------------------------+--------+-------------------------+-------+
    101         # This shows the first part of the query fetches a lot of objects, that
    102         # are then filtered. The joins are comparably fast: There's usually just
    103         # one or none shard mapping that can be answered fully using an index
    104         # (shard_label_id_fk), similar thing applies to the job.
    105         #
    106         # This works for now, but once O(#Jobs in shard) << O(#Jobs in Queued),
    107         # it might be more efficient to filter on the meta_host first, instead
    108         # of the status.
    109         if only_hostless:
    110             query = '%s AND (%s)' % (query, self.hostless_query)
    111         return list(scheduler_models.HostQueueEntry.fetch(
    112             joins=('INNER JOIN afe_jobs ON (job_id=afe_jobs.id) '
    113                    'LEFT JOIN afe_shards_labels ON ('
    114                    'meta_host=afe_shards_labels.label_id)'),
    115             where=query, order_by=sort_order))
    116 
    117 
    @metrics.SecondsTimerDecorator(
            _job_timer_name % 'get_prioritized_special_tasks')
    def get_prioritized_special_tasks(self, only_tasks_with_leased_hosts=False):
        """
        Return the queued SpecialTasks in the order they should be handled.

        A task is considered queued when it is neither active nor complete
        and its host is not locked. Tasks are ranked by type: repair,
        cleanup, verify, reset, then provision. Tasks of the same type keep
        the order in which the query returned them.

        @param only_tasks_with_leased_hosts: If true, this method only returns
            tasks with leased hosts.

        @return: list of afe.models.SpecialTasks sorted according to priority.
        """
        special_tasks = models.SpecialTask.objects
        pending = special_tasks.filter(
                is_active=False, is_complete=False, host__locked=False)

        # Left-join the active queue entries of each task's host, then keep
        # only tasks whose host has no active entry or whose own queue entry
        # is that active entry.
        pending = special_tasks.add_join(
                pending, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        host_not_taken = ('(afe_host_queue_entries.id IS NULL OR '
                          'afe_host_queue_entries.id = '
                          'afe_special_tasks.queue_entry_id)')
        pending = pending.extra(where=[host_not_taken])

        if only_tasks_with_leased_hosts:
            pending = pending.filter(host__leased=True)

        # Position in this list is the sort rank of a task type.
        task_types = models.SpecialTask.Task
        ranking = [task_types.REPAIR,
                   task_types.CLEANUP,
                   task_types.VERIFY,
                   task_types.RESET,
                   task_types.PROVISION]
        return sorted(pending, key=lambda task: ranking.index(task.task))
    155 
    156 
    @classmethod
    def get_overlapping_jobs(cls):
        """Report active queue entries whose host is claimed more than once.

        A host counts as claimed once for every active, incomplete HQE on it
        and once for every active, incomplete special task on it that is not
        tied to a queue entry.

        @return: A list of dictionaries with the hqe id, job_id and host_id
            of the currently overlapping jobs.
        """
        # Only stand alone special tasks are counted: a special task that
        # belongs to a queue entry is not treated as a second user of the
        # host here.
        active_hqes = models.HostQueueEntry.objects.filter(
                active=1, complete=0, host_id__isnull=False)
        standalone_tasks = models.SpecialTask.objects.filter(
                is_active=1, is_complete=0, host_id__isnull=False,
                queue_entry_id__isnull=True)

        claimed_host_ids = list(active_hqes.values_list('host_id', flat=True))
        claimed_host_ids += list(
                standalone_tasks.values_list('host_id', flat=True))

        usage = collections.Counter(claimed_host_ids).most_common()
        shared_host_ids = [host_id for host_id, claims in usage if claims > 1]

        overlapping = models.HostQueueEntry.objects.filter(
                host_id__in=shared_host_ids, active=True)
        return list(overlapping.values('id', 'job_id', 'host_id'))
    180 
    181 
    @metrics.SecondsTimerDecorator(
            _job_timer_name % 'get_suite_host_assignment')
    def get_suite_host_assignment(self):
        """Summarize which hosts are currently held on behalf of suites.

        Looks at every active, incomplete queue entry that has a host and
        whose job has a parent job.

        @return: Two dictionaries (suite_host_num, hosts_to_suites)
                 suite_host_num maps a suite (parent) job id to the number
                 of such queue entries among its child jobs.
                 hosts_to_suites maps the id of each host held this way to
                 the parent_job_id of the job holding it.
        """
        child_entries = models.HostQueueEntry.objects.filter(
                host_id__isnull=False, complete=0, active=1,
                job__parent_job_id__isnull=False)
        suite_host_num = {}
        hosts_to_suites = {}
        for entry in child_entries:
            suite_id = entry.job.parent_job_id
            suite_host_num[suite_id] = suite_host_num.get(suite_id, 0) + 1
            hosts_to_suites[entry.host_id] = suite_id
        return suite_host_num, hosts_to_suites
    205 
    206 
    @metrics.SecondsTimerDecorator(_job_timer_name % 'get_min_duts_of_suites')
    def get_min_duts_of_suites(self, suite_job_ids):
        """Look up the suite_min_duts job keyval of the given suites.

        Suites without a non-null keyval for constants.SUITE_MIN_DUTS_KEY
        are absent from the result.

        @param suite_job_ids: A set of suite job ids.

        @return: A dictionary where the key is a suite job id,
                 the value is the value of 'suite_min_duts' as an int.
        """
        min_duts_keyvals = models.JobKeyval.objects.filter(
                job_id__in=suite_job_ids,
                key=constants.SUITE_MIN_DUTS_KEY, value__isnull=False)
        return {keyval.job_id: int(keyval.value)
                for keyval in min_duts_keyvals}
    220 
    221 
# Template for the metric names of the timers wrapped around the
# AFEHostQueryManager queries below; '%s' is filled with the query name.
_host_timer_name = 'chromeos/autotest/scheduler/host_query_durations/%s'
    223 class AFEHostQueryManager(object):
    224     """Query manager for AFE Hosts."""
    225 
    def __init__(self):
        """Create an AFEHostQueryManager.

        Takes no arguments: the database connection used for the raw SQL
        queries of this class is obtained from
        scheduler_lib.ConnectionManager and kept in self._db.
        """
        connection_manager = scheduler_lib.ConnectionManager()
        self._db = connection_manager.get_connection()
    232 
    233 
    234     def _process_many2many_dict(self, rows, flip=False):
    235         result = {}
    236         for row in rows:
    237             left_id, right_id = int(row[0]), int(row[1])
    238             if flip:
    239                 left_id, right_id = right_id, left_id
    240             result.setdefault(left_id, set()).add(right_id)
    241         return result
    242 
    243 
    244     def _get_sql_id_list(self, id_list):
    245         return ','.join(str(item_id) for item_id in id_list)
    246 
    247 
    248     def _get_many2many_dict(self, query, id_list, flip=False):
    249         if not id_list:
    250             return {}
    251         query %= self._get_sql_id_list(id_list)
    252         rows = self._db.execute(query)
    253         return self._process_many2many_dict(rows, flip)
    254 
    255 
    def _get_ready_hosts(self):
        """Fetch the hosts that are not leased, not locked and ready.

        A host counts as ready when its status is NULL or 'Ready'.

        @return: A dict mapping host id to the fetched host object.
        """
        # Hosts are released under these same conditions, so the checks are
        # redundant today, but repeating them costs nothing. Other clients
        # may one day release and/or lock hosts independently of the
        # scheduler tick.
        ready_clause = ("NOT afe_hosts.leased "
                        "AND NOT afe_hosts.locked "
                        "AND (afe_hosts.status IS NULL "
                        "OR afe_hosts.status = 'Ready')")
        return {host.id: host
                for host in scheduler_models.Host.fetch(where=ready_clause)}
    268 
    269 
    @metrics.SecondsTimerDecorator(_host_timer_name % 'get_job_acl_groups')
    def _get_job_acl_groups(self, job_ids):
        """Map each job to the ACL groups its owner belongs to.

        @param job_ids: The ids of the jobs to look up.

        @return: A dict mapping job id to a set of aclgroup ids.
        """
        acl_groups_sql = """
        SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
        FROM afe_jobs
        INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
        INNER JOIN afe_acl_groups_users ON
                afe_acl_groups_users.user_id = afe_users.id
        WHERE afe_jobs.id IN (%s)
        """
        return self._get_many2many_dict(acl_groups_sql, job_ids)
    281 
    282 
    283     def _get_job_ineligible_hosts(self, job_ids):
    284         query = """
    285         SELECT job_id, host_id
    286         FROM afe_ineligible_host_queues
    287         WHERE job_id IN (%s)
    288         """
    289         return self._get_many2many_dict(query, job_ids)
    290 
    291 
    @metrics.SecondsTimerDecorator(_host_timer_name % 'get_job_dependencies')
    def _get_job_dependencies(self, job_ids):
        """Map each job to the labels it depends on.

        @param job_ids: The ids of the jobs to look up.

        @return: A dict mapping job id to a set of label ids.
        """
        dependencies_sql = """
        SELECT job_id, label_id
        FROM afe_jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return self._get_many2many_dict(dependencies_sql, job_ids)
    300 
    301     def _get_host_acls(self, host_ids):
    302         query = """
    303         SELECT host_id, aclgroup_id
    304         FROM afe_acl_groups_hosts
    305         WHERE host_id IN (%s)
    306         """
    307         return self._get_many2many_dict(query, host_ids)
    308 
    309 
    310     def _get_label_hosts(self, host_ids):
    311         if not host_ids:
    312             return {}, {}
    313         query = """
    314         SELECT label_id, host_id
    315         FROM afe_hosts_labels
    316         WHERE host_id IN (%s)
    317         """ % self._get_sql_id_list(host_ids)
    318         rows = self._db.execute(query)
    319         labels_to_hosts = self._process_many2many_dict(rows)
    320         hosts_to_labels = self._process_many2many_dict(rows, flip=True)
    321         return labels_to_hosts, hosts_to_labels
    322 
    323 
    @classmethod
    def find_unused_healty_hosts(cls):
        """Get leased hosts that are currently unused and in the READY state.

        A host qualifies when it is leased, not locked, has a status of NULL
        or 'Ready', has no active queue entry and has no incomplete special
        task.

        @return: The result of scheduler_models.Host.fetch for that query,
            i.e. one host object for each unused healthy host.
        """
        # Left join that only matches when the host has an active queue
        # entry; such hosts are filtered out in the where clause.
        active_hqe_join = ('LEFT JOIN afe_host_queue_entries AS active_hqe '
                           'ON (afe_hosts.id = active_hqe.host_id AND '
                           'active_hqe.active)')

        # Same idea for hosts with a new special task against them. There
        # are 2 cases when an inactive but incomplete special task will not
        # use the host this tick: 1. When the host is locked 2. When an
        # active hqe already has special tasks for the same host. In both
        # these cases this host will not be in the ready hosts list anyway.
        # In all other cases, an incomplete special task will grab the host
        # before a new job does by assigning an agent to it.
        pending_task_join = ('LEFT JOIN afe_special_tasks as new_tasks '
                             'ON (afe_hosts.id = new_tasks.host_id AND '
                             'new_tasks.is_complete=0)')

        unused_and_healthy = (
                "active_hqe.host_id IS NULL AND new_tasks.host_id IS NULL "
                "AND afe_hosts.leased "
                "AND NOT afe_hosts.locked "
                "AND (afe_hosts.status IS NULL "
                "OR afe_hosts.status = 'Ready')")

        return scheduler_models.Host.fetch(
            joins=' '.join((active_hqe_join, pending_task_join)),
            where=unused_and_healthy)
    353 
    @metrics.SecondsTimerDecorator(_host_timer_name % 'set_leased')
    def set_leased(self, leased_value, **kwargs):
        """Set the leased bit on every host matching the given filters.

        @param leased_value: The True/False value to write to the leased
            column of the matching hosts.
        @param kwargs: Filter arguments selecting the hosts to update; they
            are passed as-is to models.Host.objects.filter.
        """
        logging.info('Setting leased = %s for the hosts that match %s',
                     leased_value, kwargs)
        matching_hosts = models.Host.objects.filter(**kwargs)
        matching_hosts.update(leased=leased_value)
    365 
    366 
    @metrics.SecondsTimerDecorator(_host_timer_name % 'get_labels')
    def _get_labels(self, job_dependencies):
        """
        Build a label id -> label object cache, so callers do not have to
        round trip to the database every time they need a label.

        The cache holds every label attached to some host plus every label
        that one of the given jobs depends on.

        @param job_dependencies: A dict mapping an integer job id to a list of
            integer label id's.  ie. {job_id: [label_id]}
        @return: A dict mapping an integer label id to a scheduler model label
            object.  ie. {label_id: label_object}

        """
        # Start with all the labels on hosts we might look at ...
        id_to_label = {
                label.id: label
                for label in scheduler_models.Label.fetch(
                        where="id IN (SELECT label_id FROM afe_hosts_labels)")}

        # ... then add all the labels on jobs we might look at.
        wanted_label_ids = set()
        for dependency_ids in job_dependencies.values():
            wanted_label_ids.update(dependency_ids)

        # Nothing to fetch on the rare/impossible chance that no jobs have
        # any labels.
        if not wanted_label_ids:
            return id_to_label

        id_csv = ','.join(map(str, wanted_label_ids))
        for label in scheduler_models.Label.fetch(
                where="id IN (%s)" % id_csv):
            id_to_label[label.id] = label
        return id_to_label
    396 
    397 
    def refresh(self, pending_queue_entries):
        """Reload the cached host and job information.

        Caches, as attributes of this manager, the ready hosts, their ACLs
        and labels, and the ACLs, ineligible hosts and label dependencies of
        the jobs behind the given queue entries, so clients can avoid
        expensive round trips during host acquisition.

        @param pending_queue_entries: A list of queue entries about which we
            need information.
        """
        self._hosts_available = self._get_ready_hosts()

        # Per-job information for the jobs of the pending entries.
        job_ids = [entry.job_id for entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(job_ids)
        self._ineligible_hosts = self._get_job_ineligible_hosts(job_ids)
        self._job_dependencies = self._get_job_dependencies(job_ids)

        # Per-host information for the hosts that are ready right now.
        ready_host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(ready_host_ids)
        labels_to_hosts, hosts_to_labels = self._get_label_hosts(
                ready_host_ids)
        self._label_hosts = labels_to_hosts
        self._host_labels = hosts_to_labels

        self._labels = self._get_labels(self._job_dependencies)
    418         self._labels = self._get_labels(self._job_dependencies)
    419