# Part of the scheduler package (code-browser navigation header removed).
      1 #pylint: disable-msg=C0111
      2 
      3 # Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
      4 # Use of this source code is governed by a BSD-style license that can be
      5 # found in the LICENSE file.
      6 
      7 """Scheduler library classes.
      8 """
      9 
     10 import collections
     11 import logging
     12 
     13 import common
     14 
     15 from autotest_lib.client.common_lib.cros.graphite import autotest_stats
     16 from autotest_lib.frontend import setup_django_environment
     17 from autotest_lib.frontend.afe import models
     18 from autotest_lib.server.cros.dynamic_suite import constants
     19 from autotest_lib.scheduler import scheduler_models
     20 from autotest_lib.scheduler import scheduler_lib
     21 
     22 
     23 _job_timer = autotest_stats.Timer('scheduler.job_query_manager')
     24 class AFEJobQueryManager(object):
     25     """Query manager for AFE Jobs."""
     26 
     27     # A subquery to only get inactive hostless jobs.
     28     hostless_query = 'host_id IS NULL AND meta_host IS NULL'
     29 
     30 
     31     @_job_timer.decorate
     32     def get_pending_queue_entries(self, only_hostless=False):
     33         """
     34         Fetch a list of new host queue entries.
     35 
     36         The ordering of this list is important, as every new agent
     37         we schedule can potentially contribute to the process count
     38         on the drone, which has a static limit. The sort order
     39         prioritizes jobs as follows:
     40         1. High priority jobs: Based on the afe_job's priority
     41         2. With hosts and metahosts: This will only happen if we don't
     42             activate the hqe after assigning a host to it in
     43             schedule_new_jobs.
     44         3. With hosts but without metahosts: When tests are scheduled
     45             through the frontend the owner of the job would have chosen
     46             a host for it.
     47         4. Without hosts but with metahosts: This is the common case of
     48             a new test that needs a DUT. We assign a host and set it to
     49             active so it shouldn't show up in case 2 on the next tick.
     50         5. Without hosts and without metahosts: Hostless suite jobs, that
     51             will result in new jobs that fall under category 4.
     52 
     53         A note about the ordering of cases 3 and 4:
     54         Prioritizing one case above the other leads to earlier acquisition
     55         of the following resources: 1. process slots on the drone 2. machines.
     56         - When a user schedules a job through the afe they choose a specific
     57           host for it. Jobs with metahost can utilize any host that satisfies
     58           the metahost criterion. This means that if we had scheduled 4 before
     59           3 there is a good chance that a job which could've used another host,
     60           will now use the host assigned to a metahost-less job. Given the
     61           availability of machines in pool:suites, this almost guarantees
     62           starvation for jobs scheduled through the frontend.
     63         - Scheduling 4 before 3 also has its pros however, since a suite
     64           has the concept of a time out, whereas users can wait. If we hit the
     65           process count on the drone a suite can timeout waiting on the test,
     66           but a user job generally has a much longer timeout, and relatively
     67           harmless consequences.
     68         The current ordering was chosed because it is more likely that we will
     69         run out of machines in pool:suites than processes on the drone.
     70 
     71         @returns A list of HQEs ordered according to sort_order.
     72         """
     73         sort_order = ('afe_jobs.priority DESC, '
     74                       'ISNULL(host_id), '
     75                       'ISNULL(meta_host), '
     76                       'parent_job_id, '
     77                       'job_id')
     78         # Don't execute jobs that should be executed by a shard in the global
     79         # scheduler.
     80         # This won't prevent the shard scheduler to run this, as the shard db
     81         # doesn't have an an entry in afe_shards_labels.
     82         query=('NOT complete AND NOT active AND status="Queued"'
     83                'AND NOT aborted AND afe_shards_labels.id IS NULL')
     84 
     85         # TODO(jakobjuelich, beeps): Optimize this query. Details:
     86         # Compressed output of EXPLAIN <query>:
     87         # +------------------------+--------+-------------------------+-------+
     88         # | table                  | type   | key                     | rows  |
     89         # +------------------------+--------+-------------------------+-------+
     90         # | afe_host_queue_entries | ref    | host_queue_entry_status | 30536 |
     91         # | afe_shards_labels      | ref    | shard_label_id_fk       |     1 |
     92         # | afe_jobs               | eq_ref | PRIMARY                 |     1 |
     93         # +------------------------+--------+-------------------------+-------+
     94         # This shows the first part of the query fetches a lot of objects, that
     95         # are then filtered. The joins are comparably fast: There's usually just
     96         # one or none shard mapping that can be answered fully using an index
     97         # (shard_label_id_fk), similar thing applies to the job.
     98         #
     99         # This works for now, but once O(#Jobs in shard) << O(#Jobs in Queued),
    100         # it might be more efficient to filter on the meta_host first, instead
    101         # of the status.
    102         if only_hostless:
    103             query = '%s AND (%s)' % (query, self.hostless_query)
    104         return list(scheduler_models.HostQueueEntry.fetch(
    105             joins=('INNER JOIN afe_jobs ON (job_id=afe_jobs.id) '
    106                    'LEFT JOIN afe_shards_labels ON ('
    107                    'meta_host=afe_shards_labels.label_id)'),
    108             where=query, order_by=sort_order))
    109 
    110 
    111     @_job_timer.decorate
    112     def get_prioritized_special_tasks(self, only_tasks_with_leased_hosts=False):
    113         """
    114         Returns all queued SpecialTasks prioritized for repair first, then
    115         cleanup, then verify.
    116 
    117         @param only_tasks_with_leased_hosts: If true, this method only returns
    118             tasks with leased hosts.
    119 
    120         @return: list of afe.models.SpecialTasks sorted according to priority.
    121         """
    122         queued_tasks = models.SpecialTask.objects.filter(is_active=False,
    123                                                          is_complete=False,
    124                                                          host__locked=False)
    125         # exclude hosts with active queue entries unless the SpecialTask is for
    126         # that queue entry
    127         queued_tasks = models.SpecialTask.objects.add_join(
    128                 queued_tasks, 'afe_host_queue_entries', 'host_id',
    129                 join_condition='afe_host_queue_entries.active',
    130                 join_from_key='host_id', force_left_join=True)
    131         queued_tasks = queued_tasks.extra(
    132                 where=['(afe_host_queue_entries.id IS NULL OR '
    133                        'afe_host_queue_entries.id = '
    134                                'afe_special_tasks.queue_entry_id)'])
    135         if only_tasks_with_leased_hosts:
    136             queued_tasks = queued_tasks.filter(host__leased=True)
    137 
    138         # reorder tasks by priority
    139         task_priority_order = [models.SpecialTask.Task.REPAIR,
    140                                models.SpecialTask.Task.CLEANUP,
    141                                models.SpecialTask.Task.VERIFY,
    142                                models.SpecialTask.Task.RESET,
    143                                models.SpecialTask.Task.PROVISION]
    144         def task_priority_key(task):
    145             return task_priority_order.index(task.task)
    146         return sorted(queued_tasks, key=task_priority_key)
    147 
    148 
    149     @classmethod
    150     def get_overlapping_jobs(cls):
    151         """A helper method to get all active jobs using the same host.
    152 
    153         @return: A list of dictionaries with the hqe id, job_id and host_id
    154             of the currently overlapping jobs.
    155         """
    156         # Filter all active hqes and stand alone special tasks to make sure
    157         # a host isn't being used by two jobs at the same time. An incomplete
    158         # stand alone special task can share a host with an active hqe, an
    159         # example of this is the cleanup scheduled in gathering.
    160         hqe_hosts = list(models.HostQueueEntry.objects.filter(
    161                 active=1, complete=0, host_id__isnull=False).values_list(
    162                 'host_id', flat=True))
    163         special_task_hosts = list(models.SpecialTask.objects.filter(
    164                 is_active=1, is_complete=0, host_id__isnull=False,
    165                 queue_entry_id__isnull=True).values_list('host_id', flat=True))
    166         host_counts = collections.Counter(
    167                 hqe_hosts + special_task_hosts).most_common()
    168         multiple_hosts = [count[0] for count in host_counts if count[1] > 1]
    169         return list(models.HostQueueEntry.objects.filter(
    170                 host_id__in=multiple_hosts, active=True).values(
    171                         'id', 'job_id', 'host_id'))
    172 
    173 
    174     @_job_timer.decorate
    175     def get_suite_host_assignment(self):
    176         """A helper method to get how many hosts each suite is holding.
    177 
    178         @return: Two dictionaries (suite_host_num, hosts_to_suites)
    179                  suite_host_num maps suite job id to number of hosts
    180                  holding by its child jobs.
    181                  hosts_to_suites contains current hosts held by
    182                  any suites, and maps the host id to its parent_job_id.
    183         """
    184         query = models.HostQueueEntry.objects.filter(
    185                 host_id__isnull=False, complete=0, active=1,
    186                 job__parent_job_id__isnull=False)
    187         suite_host_num = {}
    188         hosts_to_suites = {}
    189         for hqe in query:
    190             host_id = hqe.host_id
    191             parent_job_id = hqe.job.parent_job_id
    192             count = suite_host_num.get(parent_job_id, 0)
    193             suite_host_num[parent_job_id] = count + 1
    194             hosts_to_suites[host_id] = parent_job_id
    195         return suite_host_num, hosts_to_suites
    196 
    197 
    198     @_job_timer.decorate
    199     def get_min_duts_of_suites(self, suite_job_ids):
    200         """Load suite_min_duts job keyval for a set of suites.
    201 
    202         @param suite_job_ids: A set of suite job ids.
    203 
    204         @return: A dictionary where the key is a suite job id,
    205                  the value is the value of 'suite_min_duts'.
    206         """
    207         query = models.JobKeyval.objects.filter(
    208                 job_id__in=suite_job_ids,
    209                 key=constants.SUITE_MIN_DUTS_KEY, value__isnull=False)
    210         return dict((keyval.job_id, int(keyval.value)) for keyval in query)
    211 
    212 
    213 _host_timer = autotest_stats.Timer('scheduler.host_query_manager')
    214 class AFEHostQueryManager(object):
    215     """Query manager for AFE Hosts."""
    216 
    217     def __init__(self):
    218         """Create an AFEHostQueryManager.
    219 
    220         @param db: A connection to the database with the afe_hosts table.
    221         """
    222         self._db = scheduler_lib.ConnectionManager().get_connection()
    223 
    224 
    225     def _process_many2many_dict(self, rows, flip=False):
    226         result = {}
    227         for row in rows:
    228             left_id, right_id = int(row[0]), int(row[1])
    229             if flip:
    230                 left_id, right_id = right_id, left_id
    231             result.setdefault(left_id, set()).add(right_id)
    232         return result
    233 
    234 
    235     def _get_sql_id_list(self, id_list):
    236         return ','.join(str(item_id) for item_id in id_list)
    237 
    238 
    239     def _get_many2many_dict(self, query, id_list, flip=False):
    240         if not id_list:
    241             return {}
    242         query %= self._get_sql_id_list(id_list)
    243         rows = self._db.execute(query)
    244         return self._process_many2many_dict(rows, flip)
    245 
    246 
    247     @_host_timer.decorate
    248     def _get_ready_hosts(self):
    249         # We don't lose anything by re-doing these checks
    250         # even though we release hosts on the same conditions.
    251         # In the future we might have multiple clients that
    252         # release_hosts and/or lock them independent of the
    253         # scheduler tick.
    254         hosts = scheduler_models.Host.fetch(
    255             where="NOT afe_hosts.leased "
    256                   "AND NOT afe_hosts.locked "
    257                   "AND (afe_hosts.status IS NULL "
    258                       "OR afe_hosts.status = 'Ready')")
    259         return dict((host.id, host) for host in hosts)
    260 
    261 
    262     @_host_timer.decorate
    263     def _get_job_acl_groups(self, job_ids):
    264         query = """
    265         SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
    266         FROM afe_jobs
    267         INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
    268         INNER JOIN afe_acl_groups_users ON
    269                 afe_acl_groups_users.user_id = afe_users.id
    270         WHERE afe_jobs.id IN (%s)
    271         """
    272         return self._get_many2many_dict(query, job_ids)
    273 
    274 
    275     @_host_timer.decorate
    276     def _get_job_ineligible_hosts(self, job_ids):
    277         query = """
    278         SELECT job_id, host_id
    279         FROM afe_ineligible_host_queues
    280         WHERE job_id IN (%s)
    281         """
    282         return self._get_many2many_dict(query, job_ids)
    283 
    284 
    285     @_host_timer.decorate
    286     def _get_job_dependencies(self, job_ids):
    287         query = """
    288         SELECT job_id, label_id
    289         FROM afe_jobs_dependency_labels
    290         WHERE job_id IN (%s)
    291         """
    292         return self._get_many2many_dict(query, job_ids)
    293 
    294     @_host_timer.decorate
    295     def _get_host_acls(self, host_ids):
    296         query = """
    297         SELECT host_id, aclgroup_id
    298         FROM afe_acl_groups_hosts
    299         WHERE host_id IN (%s)
    300         """
    301         return self._get_many2many_dict(query, host_ids)
    302 
    303 
    304     @_host_timer.decorate
    305     def _get_label_hosts(self, host_ids):
    306         if not host_ids:
    307             return {}, {}
    308         query = """
    309         SELECT label_id, host_id
    310         FROM afe_hosts_labels
    311         WHERE host_id IN (%s)
    312         """ % self._get_sql_id_list(host_ids)
    313         rows = self._db.execute(query)
    314         labels_to_hosts = self._process_many2many_dict(rows)
    315         hosts_to_labels = self._process_many2many_dict(rows, flip=True)
    316         return labels_to_hosts, hosts_to_labels
    317 
    318 
    319     @classmethod
    320     def find_unused_healty_hosts(cls):
    321         """Get hosts that are currently unused and in the READY state.
    322 
    323         @return: A list of host objects, one for each unused healthy host.
    324         """
    325         # Avoid any host with a currently active queue entry against it.
    326         hqe_join = ('LEFT JOIN afe_host_queue_entries AS active_hqe '
    327                     'ON (afe_hosts.id = active_hqe.host_id AND '
    328                     'active_hqe.active)')
    329 
    330         # Avoid any host with a new special task against it. There are 2 cases
    331         # when an inactive but incomplete special task will not use the host
    332         # this tick: 1. When the host is locked 2. When an active hqe already
    333         # has special tasks for the same host. In both these cases this host
    334         # will not be in the ready hosts list anyway. In all other cases,
    335         # an incomplete special task will grab the host before a new job does
    336         # by assigning an agent to it.
    337         special_task_join = ('LEFT JOIN afe_special_tasks as new_tasks '
    338                              'ON (afe_hosts.id = new_tasks.host_id AND '
    339                              'new_tasks.is_complete=0)')
    340 
    341         return scheduler_models.Host.fetch(
    342             joins='%s %s' % (hqe_join, special_task_join),
    343             where="active_hqe.host_id IS NULL AND new_tasks.host_id IS NULL "
    344                   "AND afe_hosts.leased "
    345                   "AND NOT afe_hosts.locked "
    346                   "AND (afe_hosts.status IS NULL "
    347                           "OR afe_hosts.status = 'Ready')")
    348 
    349 
    350     @_host_timer.decorate
    351     def set_leased(self, leased_value, **kwargs):
    352         """Modify the leased bit on the hosts with ids in host_ids.
    353 
    354         @param leased_value: The True/False value of the leased column for
    355             the hosts with ids in host_ids.
    356         @param kwargs: The args to use in finding matching hosts.
    357         """
    358         logging.info('Setting leased = %s for the hosts that match %s',
    359                      leased_value, kwargs)
    360         models.Host.objects.filter(**kwargs).update(leased=leased_value)
    361 
    362 
    363     @_host_timer.decorate
    364     def _get_labels(self, job_dependencies):
    365         """
    366         Calculate a dict mapping label id to label object so that we don't
    367         frequently round trip to the database every time we need a label.
    368 
    369         @param job_dependencies: A dict mapping an integer job id to a list of
    370             integer label id's.  ie. {job_id: [label_id]}
    371         @return: A dict mapping an integer label id to a scheduler model label
    372             object.  ie. {label_id: label_object}
    373 
    374         """
    375         id_to_label = dict()
    376         # Pull all the labels on hosts we might look at
    377         host_labels = scheduler_models.Label.fetch(
    378                 where="id IN (SELECT label_id FROM afe_hosts_labels)")
    379         id_to_label.update([(label.id, label) for label in host_labels])
    380         # and pull all the labels on jobs we might look at.
    381         job_label_set = set()
    382         for job_deps in job_dependencies.values():
    383             job_label_set.update(job_deps)
    384         # On the rare/impossible chance that no jobs have any labels, we
    385         # can skip this.
    386         if job_label_set:
    387             job_string_label_list = ','.join([str(x) for x in job_label_set])
    388             job_labels = scheduler_models.Label.fetch(
    389                     where="id IN (%s)" % job_string_label_list)
    390             id_to_label.update([(label.id, label) for label in job_labels])
    391         return id_to_label
    392 
    393 
    394     @_host_timer.decorate
    395     def refresh(self, pending_queue_entries):
    396         """Update the query manager.
    397 
    398         Cache information about a list of queue entries and eligible hosts
    399         from the database so clients can avoid expensive round trips during
    400         host acquisition.
    401 
    402         @param pending_queue_entries: A list of queue entries about which we
    403             need information.
    404         """
    405         self._hosts_available = self._get_ready_hosts()
    406         relevant_jobs = [queue_entry.job_id
    407                          for queue_entry in pending_queue_entries]
    408         self._job_acls = self._get_job_acl_groups(relevant_jobs)
    409         self._ineligible_hosts = (self._get_job_ineligible_hosts(relevant_jobs))
    410         self._job_dependencies = (self._get_job_dependencies(relevant_jobs))
    411         host_ids = self._hosts_available.keys()
    412         self._host_acls = self._get_host_acls(host_ids)
    413         self._label_hosts, self._host_labels = (
    414                 self._get_label_hosts(host_ids))
    415         self._labels = self._get_labels(self._job_dependencies)
    416