Home | History | Annotate | Download | only in scheduler
      1 # Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """Rdb server module.
      6 """
      7 
      8 import logging
      9 
     10 import common
     11 
     12 from django.core import exceptions as django_exceptions
     13 from django.db.models import fields
     14 from django.db.models import Q
     15 from autotest_lib.client.common_lib.cros.graphite import autotest_stats
     16 from autotest_lib.frontend.afe import models
     17 from autotest_lib.scheduler import rdb_cache_manager
     18 from autotest_lib.scheduler import rdb_hosts
     19 from autotest_lib.scheduler import rdb_requests
     20 from autotest_lib.scheduler import rdb_utils
     21 from autotest_lib.server import utils
     22 
     23 
     24 _timer = autotest_stats.Timer(rdb_utils.RDB_STATS_KEY)
     25 _is_master = not utils.is_shard()
     26 
     27 
     28 # Qeury managers: Provide a layer of abstraction over the database by
     29 # encapsulating common query patterns used by the rdb.
     30 class BaseHostQueryManager(object):
     31     """Base manager for host queries on all hosts.
     32     """
     33 
     34     host_objects = models.Host.objects
     35 
     36 
     37     def update_hosts(self, host_ids, **kwargs):
     38         """Update fields on a hosts.
     39 
     40         @param host_ids: A list of ids of hosts to update.
     41         @param kwargs: A key value dictionary corresponding to column, value
     42             in the host database.
     43         """
     44         self.host_objects.filter(id__in=host_ids).update(**kwargs)
     45 
     46 
     47     @rdb_hosts.return_rdb_host
     48     def get_hosts(self, ids):
     49         """Get host objects for the given ids.
     50 
     51         @param ids: The ids for which we need host objects.
     52 
     53         @returns: A list of RDBServerHostWrapper objects, ordered by host_id.
     54         """
     55         return self.host_objects.filter(id__in=ids).order_by('id')
     56 
     57 
     58     @rdb_hosts.return_rdb_host
     59     def find_hosts(self, deps, acls):
     60         """Finds valid hosts matching deps, acls.
     61 
     62         @param deps: A list of dependencies to match.
     63         @param acls: A list of acls, at least one of which must coincide with
     64             an acl group the chosen host is in.
     65 
     66         @return: A set of matching hosts available.
     67         """
     68         hosts_available = self.host_objects.filter(invalid=0)
     69         queries = [Q(labels__id=dep) for dep in deps]
     70         queries += [Q(aclgroup__id__in=acls)]
     71         for query in queries:
     72             hosts_available = hosts_available.filter(query)
     73         return set(hosts_available)
     74 
     75 
     76 class AvailableHostQueryManager(BaseHostQueryManager):
     77     """Query manager for requests on un-leased, un-locked hosts.
     78     """
     79 
     80     host_objects = models.Host.leased_objects
     81 
     82 
     83 # Request Handlers: Used in conjunction with requests in rdb_utils, these
     84 # handlers acquire hosts for a request and record the acquisition in
     85 # an response_map dictionary keyed on the request itself, with the host/hosts
     86 # as values.
     87 class BaseHostRequestHandler(object):
     88     """Handler for requests related to hosts, leased or unleased.
     89 
     90     This class is only capable of blindly returning host information.
     91     """
     92 
     93     def __init__(self):
     94         self.host_query_manager = BaseHostQueryManager()
     95         self.response_map = {}
     96 
     97 
     98     def update_response_map(self, request, response, append=False):
     99         """Record a response for a request.
    100 
    101         The response_map only contains requests that were either satisfied, or
    102         that ran into an exception. Often this translates to reserving hosts
    103         against a request. If the rdb hit an exception processing a request, the
    104         exception gets recorded in the map for the client to reraise.
    105 
    106         @param response: A response for the request.
    107         @param request: The request that has reserved these hosts.
    108         @param append: Boolean, whether to append new hosts in
    109                        |response| for existing request.
    110                        Will not append if existing response is
    111                        a list of exceptions.
    112 
    113         @raises RDBException: If an empty values is added to the map.
    114         """
    115         if not response:
    116             raise rdb_utils.RDBException('response_map dict can only contain '
    117                     'valid responses. Request %s, response %s is invalid.' %
    118                      (request, response))
    119         exist_response = self.response_map.setdefault(request, [])
    120         if exist_response and not append:
    121             raise rdb_utils.RDBException('Request %s already has response %s '
    122                                          'the rdb cannot return multiple '
    123                                          'responses for the same request.' %
    124                                          (request, response))
    125         if exist_response and append and not isinstance(
    126                 exist_response[0], rdb_hosts.RDBHost):
    127             # Do not append if existing response contains exception.
    128             return
    129         exist_response.extend(response)
    130 
    131 
    132     def _check_response_map(self):
    133         """Verify that we never give the same host to different requests.
    134 
    135         @raises RDBException: If the same host is assigned to multiple requests.
    136         """
    137         unique_hosts = set([])
    138         for request, response in self.response_map.iteritems():
    139             # Each value in the response map can only either be a list of
    140             # RDBHosts or a list of RDBExceptions, not a mix of both.
    141             if isinstance(response[0], rdb_hosts.RDBHost):
    142                 if any([host in unique_hosts for host in response]):
    143                     raise rdb_utils.RDBException(
    144                             'Assigning the same host to multiple requests. New '
    145                             'hosts %s, request %s, response_map: %s' %
    146                             (response, request, self.response_map))
    147                 else:
    148                     unique_hosts = unique_hosts.union(response)
    149 
    150 
    151     def _record_exceptions(self, request, exceptions):
    152         """Record a list of exceptions for a request.
    153 
    154         @param request: The request for which the exceptions were hit.
    155         @param exceptions: The exceptions hit while processing the request.
    156         """
    157         rdb_exceptions = [rdb_utils.RDBException(ex) for ex in exceptions]
    158         self.update_response_map(request, rdb_exceptions)
    159 
    160 
    161     def get_response(self):
    162         """Convert all RDBServerHostWrapper objects to host info dictionaries.
    163 
    164         @return: A dictionary mapping requests to a list of matching host_infos.
    165 
    166         @raises RDBException: If the same host is assigned to multiple requests.
    167         """
    168         self._check_response_map()
    169         for request, response in self.response_map.iteritems():
    170             self.response_map[request] = [reply.wire_format()
    171                                           for reply in response]
    172         return self.response_map
    173 
    174 
    175     def update_hosts(self, update_requests):
    176         """Updates host tables with a payload.
    177 
    178         @param update_requests: A list of update requests, as defined in
    179             rdb_requests.UpdateHostRequest.
    180         """
    181         # Last payload for a host_id wins in the case of conflicting requests.
    182         unique_host_requests = {}
    183         for request in update_requests:
    184             if unique_host_requests.get(request.host_id):
    185                 unique_host_requests[request.host_id].update(request.payload)
    186             else:
    187                 unique_host_requests[request.host_id] = request.payload
    188 
    189         # Batch similar payloads so we can do them in one table scan.
    190         similar_requests = {}
    191         for host_id, payload in unique_host_requests.iteritems():
    192             similar_requests.setdefault(payload, []).append(host_id)
    193 
    194         # If fields of the update don't match columns in the database,
    195         # record the exception in the response map. This also means later
    196         # updates will get applied even if previous updates fail.
    197         for payload, hosts in similar_requests.iteritems():
    198             try:
    199                 response = self.host_query_manager.update_hosts(hosts, **payload)
    200             except (django_exceptions.FieldError,
    201                     fields.FieldDoesNotExist, ValueError) as e:
    202                 for host in hosts:
    203                     # Since update requests have a consistent hash this will map
    204                     # to the same key as the original request.
    205                     request = rdb_requests.UpdateHostRequest(
    206                             host_id=host, payload=payload).get_request()
    207                     self._record_exceptions(request, [e])
    208 
    209 
    210     def batch_get_hosts(self, host_requests):
    211         """Get hosts matching the requests.
    212 
    213         This method does not acquire the hosts, i.e it reserves hosts against
    214         requests leaving their leased state untouched.
    215 
    216         @param host_requests: A list of requests, as defined in
    217             rdb_utils.BaseHostRequest.
    218         """
    219         host_ids = set([request.host_id for request in host_requests])
    220         host_map = {}
    221 
    222         # This list will not contain available hosts if executed using
    223         # an AvailableHostQueryManager.
    224         for host in self.host_query_manager.get_hosts(host_ids):
    225             host_map[host.id] = host
    226         for request in host_requests:
    227             if request.host_id in host_map:
    228                 self.update_response_map(request, [host_map[request.host_id]])
    229             else:
    230                 logging.warning('rdb could not get host for request: %s, it '
    231                                 'is already leased or locked', request)
    232 
    233 
    234 class AvailableHostRequestHandler(BaseHostRequestHandler):
    235     """Handler for requests related to available (unleased and unlocked) hosts.
    236 
    237     This class is capable of acquiring or validating hosts for requests.
    238     """
    239 
    240 
    241     def __init__(self):
    242         self.host_query_manager = AvailableHostQueryManager()
    243         self.cache = rdb_cache_manager.RDBHostCacheManager()
    244         self.response_map = {}
    245         self.unsatisfied_requests = 0
    246         self.leased_hosts_count = 0
    247         self.request_accountant = None
    248 
    249 
    250     @_timer.decorate
    251     def lease_hosts(self, hosts):
    252         """Leases a list of hosts.
    253 
    254         @param hosts: A list of RDBServerHostWrapper instances to lease.
    255 
    256         @return: The list of RDBServerHostWrappers that were successfully
    257             leased.
    258         """
    259         #TODO(beeps): crbug.com/353183.
    260         unleased_hosts = set(hosts)
    261         leased_hosts = set([])
    262         for host in unleased_hosts:
    263             try:
    264                 host.lease()
    265             except rdb_utils.RDBException as e:
    266                 logging.error('Unable to lease host %s: %s', host.hostname, e)
    267             else:
    268                 leased_hosts.add(host)
    269         return list(leased_hosts)
    270 
    271 
    272     @classmethod
    273     def valid_host_assignment(cls, request, host):
    274         """Check if a host, request pairing is valid.
    275 
    276         @param request: The request to match against the host.
    277         @param host: An RDBServerHostWrapper instance.
    278 
    279         @return: True if the host, request assignment is valid.
    280 
    281         @raises RDBException: If the request already has another host_ids
    282             associated with it.
    283         """
    284         if request.host_id and request.host_id != host.id:
    285             raise rdb_utils.RDBException(
    286                     'Cannot assign a different host for request: %s, it '
    287                     'already has one: %s ' % (request, host.id))
    288 
    289         # Getting all labels and acls might result in large queries, so
    290         # bail early if the host is already leased.
    291         if host.leased:
    292             return False
    293         # If a host is invalid it must be a one time host added to the
    294         # afe specifically for this purpose, so it doesn't require acl checking.
    295         acl_match = (request.acls.intersection(host.acls) or host.invalid)
    296         label_match = (request.deps.intersection(host.labels) == request.deps)
    297         return acl_match and label_match
    298 
    299 
    300     @classmethod
    301     def _sort_hosts_by_preferred_deps(cls, hosts, preferred_deps):
    302         """Sort hosts in the order of how many preferred deps it has.
    303 
    304         This allows rdb always choose the hosts with the most preferred deps
    305         for a request. One important use case is including cros-version as
    306         a preferred dependence. By choosing a host with the same cros-version,
    307         we can save the time on provisioning it. Note this is not guaranteed
    308         if preferred_deps contains other labels as well.
    309 
    310         @param hosts: A list of hosts to sort.
    311         @param preferred_deps: A list of deps that are preferred.
    312 
    313         @return: A list of sorted hosts.
    314 
    315         """
    316         hosts = sorted(
    317                 hosts,
    318                 key=lambda host: len(set(preferred_deps) & set(host.labels)),
    319                 reverse=True)
    320         return hosts
    321 
    322 
    323     @rdb_cache_manager.memoize_hosts
    324     def _acquire_hosts(self, request, hosts_required, is_acquire_min_duts=False,
    325                        **kwargs):
    326         """Acquire hosts for a group of similar requests.
    327 
    328         Find and acquire hosts that can satisfy a group of requests.
    329         1. If the caching decorator doesn't pass in a list of matching hosts
    330            via the MEMOIZE_KEY this method will directly check the database for
    331            matching hosts.
    332         2. If all matching hosts are not leased for this request, the remaining
    333            hosts are returned to the caching decorator, to place in the cache.
    334 
    335         @param hosts_required: Number of hosts required to satisfy request.
    336         @param request: The request for hosts.
    337         @param is_acquire_min_duts: Boolean. Indicate whether this is to
    338                                     acquire minimum required duts, only used
    339                                     for stats purpose.
    340 
    341         @return: The list of excess matching hosts.
    342         """
    343         hosts = kwargs.get(rdb_cache_manager.MEMOIZE_KEY, [])
    344         if not hosts:
    345             hosts = self.host_query_manager.find_hosts(
    346                             request.deps, request.acls)
    347 
    348         # <-----[:attempt_lease_hosts](evicted)--------> <-(returned, cached)->
    349         # |   -leased_hosts-  |   -stale cached hosts-  | -unleased matching- |
    350         # --used this request---used by earlier request----------unused--------
    351         hosts = self._sort_hosts_by_preferred_deps(
    352                 hosts, request.preferred_deps)
    353         attempt_lease_hosts = min(len(hosts), hosts_required)
    354         leased_host_count = 0
    355         if attempt_lease_hosts:
    356             leased_hosts = self.lease_hosts(hosts[:attempt_lease_hosts])
    357             if leased_hosts:
    358                 self.update_response_map(request, leased_hosts, append=True)
    359 
    360             # [:attempt_leased_hosts] - leased_hosts will include hosts that
    361             # failed leasing, most likely because they're already leased, so
    362             # don't cache them again.
    363             leased_host_count = len(leased_hosts)
    364             failed_leasing = attempt_lease_hosts - leased_host_count
    365             if failed_leasing > 0:
    366                 # For the sake of simplicity this calculation assumes that
    367                 # leasing only fails if there's a stale cached host already
    368                 # leased by a previous request, ergo, we can only get here
    369                 # through a cache hit.
    370                 line_length = len(hosts)
    371                 self.cache.stale_entries.append(
    372                         (float(failed_leasing)/line_length) * 100)
    373             self.leased_hosts_count += leased_host_count
    374         if is_acquire_min_duts:
    375             self.request_accountant.record_acquire_min_duts(
    376                     request, hosts_required, leased_host_count)
    377         self.unsatisfied_requests += max(hosts_required - leased_host_count, 0)
    378         # Cache the unleased matching hosts against the request.
    379         return hosts[attempt_lease_hosts:]
    380 
    381 
    382     @_timer.decorate
    383     def batch_acquire_hosts(self, host_requests):
    384         """Acquire hosts for a list of requests.
    385 
    386         The act of acquisition involves finding and leasing a set of hosts
    387         that match the parameters of a request. Each acquired host is added
    388         to the response_map dictionary as an RDBServerHostWrapper.
    389 
    390         @param host_requests: A list of requests to acquire hosts.
    391         """
    392         distinct_requests = 0
    393 
    394         logging.debug('Processing %s host acquisition requests',
    395                       len(host_requests))
    396 
    397         self.request_accountant = rdb_utils.RequestAccountant(host_requests)
    398         # First pass tries to satisfy min_duts for each suite.
    399         for request in self.request_accountant.requests:
    400             to_acquire = self.request_accountant.get_min_duts(request)
    401             if to_acquire > 0:
    402                 self._acquire_hosts(request, to_acquire,
    403                                     is_acquire_min_duts=True)
    404             distinct_requests += 1
    405 
    406         # Second pass tries to allocate duts to the rest unsatisfied requests.
    407         for request in self.request_accountant.requests:
    408             to_acquire = self.request_accountant.get_duts(request)
    409             if to_acquire > 0:
    410                 self._acquire_hosts(request, to_acquire,
    411                                     is_acquire_min_duts=False)
    412 
    413         self.cache.record_stats()
    414         logging.debug('Host acquisition stats: distinct requests: %s, leased '
    415                       'hosts: %s, unsatisfied requests: %s', distinct_requests,
    416                       self.leased_hosts_count, self.unsatisfied_requests)
    417         autotest_stats.Gauge(rdb_utils.RDB_STATS_KEY).send(
    418                 'leased_hosts', self.leased_hosts_count)
    419         autotest_stats.Gauge(rdb_utils.RDB_STATS_KEY).send(
    420                 'unsatisfied_requests', self.unsatisfied_requests)
    421 
    422 
    423     @_timer.decorate
    424     def batch_validate_hosts(self, requests):
    425         """Validate requests with hosts.
    426 
    427         Reserve all hosts, check each one for validity and discard invalid
    428         request-host pairings. Lease the remaining hsots.
    429 
    430         @param requests: A list of requests to validate.
    431 
    432         @raises RDBException: If multiple hosts or the wrong host is returned
    433             for a response.
    434         """
    435         # The following cases are possible for frontend requests:
    436         # 1. Multiple requests for 1 host, with different acls/deps/priority:
    437         #    These form distinct requests because they hash differently.
    438         #    The response map will contain entries like: {r1: h1, r2: h1}
    439         #    after the batch_get_hosts call. There are 2 sub-cases:
    440         #        a. Same deps/acls, different priority:
    441         #           Since we sort the requests based on priority, the
    442         #           higher priority request r1, will lease h1. The
    443         #           validation of r2, h1 will fail because of the r1 lease.
    444         #        b. Different deps/acls, only one of which matches the host:
    445         #           The matching request will lease h1. The other host
    446         #           pairing will get dropped from the response map.
    447         # 2. Multiple requests with the same acls/deps/priority and 1 host:
    448         #    These all have the same request hash, so the response map will
    449         #    contain: {r: h}, regardless of the number of r's. If this is not
    450         #    a valid host assignment it will get dropped from the response.
    451         self.batch_get_hosts(set(requests))
    452         for request in sorted(self.response_map.keys(),
    453                 key=lambda request: request.priority, reverse=True):
    454             hosts = self.response_map[request]
    455             if len(hosts) > 1:
    456                 raise rdb_utils.RDBException('Got multiple hosts for a single '
    457                         'request. Hosts: %s, request %s.' % (hosts, request))
    458             # Job-shard is 1:1 mapping. Because a job can only belongs
    459             # to one shard, or belongs to master, we disallow frontend job
    460             # that spans hosts on and off shards or across multiple shards,
    461             # which would otherwise break the 1:1 mapping.
    462             # As such, on master, if a request asks for multiple hosts and
    463             # if any host is found on shard, we assume other requested hosts
    464             # would also be on the same shard.  We can safely drop this request.
    465             ignore_request = _is_master and any(
    466                     [host.shard_id for host in hosts])
    467             if (not ignore_request and
    468                     (self.valid_host_assignment(request, hosts[0]) and
    469                         self.lease_hosts(hosts))):
    470                 continue
    471             del self.response_map[request]
    472             logging.warning('Request %s was not able to lease host %s',
    473                             request, hosts[0])
    474 
    475 
    476 # Request dispatchers: Create the appropriate request handler, send a list
    477 # of requests to one of its methods. The corresponding request handler in
    478 # rdb_lib must understand how to match each request with a response from a
    479 # dispatcher, the easiest way to achieve this is to returned the response_map
    480 # attribute of the request handler, after making the appropriate requests.
    481 def get_hosts(host_requests):
    482     """Get host information about the requested hosts.
    483 
    484     @param host_requests: A list of requests as defined in BaseHostRequest.
    485     @return: A dictionary mapping each request to a list of hosts.
    486     """
    487     rdb_handler = BaseHostRequestHandler()
    488     rdb_handler.batch_get_hosts(host_requests)
    489     return rdb_handler.get_response()
    490 
    491 
    492 def update_hosts(update_requests):
    493     """Update hosts.
    494 
    495     @param update_requests: A list of updates to host tables
    496         as defined in UpdateHostRequest.
    497     """
    498     rdb_handler = BaseHostRequestHandler()
    499     rdb_handler.update_hosts(update_requests)
    500     return rdb_handler.get_response()
    501 
    502 
    503 def rdb_host_request_dispatcher(host_requests):
    504     """Dispatcher for all host acquisition queries.
    505 
    506     @param host_requests: A list of requests for acquiring hosts, as defined in
    507         AcquireHostRequest.
    508     @return: A dictionary mapping each request to a list of hosts, or
    509         an empty list if none could satisfy the request. Eg:
    510         {AcquireHostRequest.template: [host_info_dictionaries]}
    511     """
    512     validation_requests = []
    513     require_hosts_requests = []
    514 
    515     # Validation requests are made by a job scheduled against a specific host
    516     # specific host (eg: through the frontend) and only require the rdb to
    517     # match the parameters of the host against the request. Acquisition
    518     # requests are made by jobs that need hosts (eg: suites) and the rdb needs
    519     # to find hosts matching the parameters of the request.
    520     for request in host_requests:
    521         if request.host_id:
    522             validation_requests.append(request)
    523         else:
    524             require_hosts_requests.append(request)
    525 
    526     rdb_handler = AvailableHostRequestHandler()
    527     rdb_handler.batch_validate_hosts(validation_requests)
    528     rdb_handler.batch_acquire_hosts(require_hosts_requests)
    529     return rdb_handler.get_response()
    530