Home | History | Annotate | Download | only in scheduler
      1 # Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """Rdb server module.
      6 """
      7 
      8 import logging
      9 
     10 import common
     11 
     12 from django.core import exceptions as django_exceptions
     13 from django.db.models import fields
     14 from django.db.models import Q
     15 from autotest_lib.frontend.afe import models
     16 from autotest_lib.scheduler import rdb_cache_manager
     17 from autotest_lib.scheduler import rdb_hosts
     18 from autotest_lib.scheduler import rdb_requests
     19 from autotest_lib.scheduler import rdb_utils
     20 from autotest_lib.server import utils
     21 
     22 try:
     23     from chromite.lib import metrics
     24 except ImportError:
     25     metrics = utils.metrics_mock
     26 
     27 
     28 _rdb_timer_name = 'chromeos/autotest/scheduler/rdb/durations/%s'
     29 _is_master = not utils.is_shard()
     30 
# Query managers: Provide a layer of abstraction over the database by
# encapsulating common query patterns used by the rdb.
     33 class BaseHostQueryManager(object):
     34     """Base manager for host queries on all hosts.
     35     """
     36 
     37     host_objects = models.Host.objects
     38 
     39 
     40     def update_hosts(self, host_ids, **kwargs):
     41         """Update fields on a hosts.
     42 
     43         @param host_ids: A list of ids of hosts to update.
     44         @param kwargs: A key value dictionary corresponding to column, value
     45             in the host database.
     46         """
     47         self.host_objects.filter(id__in=host_ids).update(**kwargs)
     48 
     49 
     50     @rdb_hosts.return_rdb_host
     51     def get_hosts(self, ids):
     52         """Get host objects for the given ids.
     53 
     54         @param ids: The ids for which we need host objects.
     55 
     56         @returns: A list of RDBServerHostWrapper objects, ordered by host_id.
     57         """
     58         return self.host_objects.filter(id__in=ids).order_by('id')
     59 
     60 
     61     @rdb_hosts.return_rdb_host
     62     def find_hosts(self, deps, acls):
     63         """Finds valid hosts matching deps, acls.
     64 
     65         @param deps: A list/frozenset of dependencies (label id) to match.
     66         @param acls: A list/frozenset of acls, at least one of which must
     67             coincide with an acl group the chosen host is in.
     68 
     69         @return: A set of matching hosts available.
     70         """
     71         hosts_available = self.host_objects.filter(invalid=0)
     72         hosts_available = hosts_available.filter(Q(aclgroup__id__in=acls))
     73         hosts_available = models.Host.get_hosts_with_label_ids(
     74                 list(deps), hosts_available)
     75         return set(hosts_available)
     76 
     77 
     78 class AvailableHostQueryManager(BaseHostQueryManager):
     79     """Query manager for requests on un-leased, un-locked hosts.
     80     """
     81 
     82     host_objects = models.Host.leased_objects
     83 
     84 
# Request Handlers: Used in conjunction with requests in rdb_utils, these
# handlers acquire hosts for a request and record the acquisition in
# a response_map dictionary keyed on the request itself, with the host/hosts
# as values.
     89 class BaseHostRequestHandler(object):
     90     """Handler for requests related to hosts, leased or unleased.
     91 
     92     This class is only capable of blindly returning host information.
     93     """
     94 
     95     def __init__(self):
     96         self.host_query_manager = BaseHostQueryManager()
     97         self.response_map = {}
     98 
     99 
    100     def update_response_map(self, request, response, append=False):
    101         """Record a response for a request.
    102 
    103         The response_map only contains requests that were either satisfied, or
    104         that ran into an exception. Often this translates to reserving hosts
    105         against a request. If the rdb hit an exception processing a request, the
    106         exception gets recorded in the map for the client to reraise.
    107 
    108         @param response: A response for the request.
    109         @param request: The request that has reserved these hosts.
    110         @param append: Boolean, whether to append new hosts in
    111                        |response| for existing request.
    112                        Will not append if existing response is
    113                        a list of exceptions.
    114 
    115         @raises RDBException: If an empty values is added to the map.
    116         """
    117         if not response:
    118             raise rdb_utils.RDBException('response_map dict can only contain '
    119                     'valid responses. Request %s, response %s is invalid.' %
    120                      (request, response))
    121         exist_response = self.response_map.setdefault(request, [])
    122         if exist_response and not append:
    123             raise rdb_utils.RDBException('Request %s already has response %s '
    124                                          'the rdb cannot return multiple '
    125                                          'responses for the same request.' %
    126                                          (request, response))
    127         if exist_response and append and not isinstance(
    128                 exist_response[0], rdb_hosts.RDBHost):
    129             # Do not append if existing response contains exception.
    130             return
    131         exist_response.extend(response)
    132 
    133 
    134     def _check_response_map(self):
    135         """Verify that we never give the same host to different requests.
    136 
    137         @raises RDBException: If the same host is assigned to multiple requests.
    138         """
    139         unique_hosts = set([])
    140         for request, response in self.response_map.iteritems():
    141             # Each value in the response map can only either be a list of
    142             # RDBHosts or a list of RDBExceptions, not a mix of both.
    143             if isinstance(response[0], rdb_hosts.RDBHost):
    144                 if any([host in unique_hosts for host in response]):
    145                     raise rdb_utils.RDBException(
    146                             'Assigning the same host to multiple requests. New '
    147                             'hosts %s, request %s, response_map: %s' %
    148                             (response, request, self.response_map))
    149                 else:
    150                     unique_hosts = unique_hosts.union(response)
    151 
    152 
    153     def _record_exceptions(self, request, exceptions):
    154         """Record a list of exceptions for a request.
    155 
    156         @param request: The request for which the exceptions were hit.
    157         @param exceptions: The exceptions hit while processing the request.
    158         """
    159         rdb_exceptions = [rdb_utils.RDBException(ex) for ex in exceptions]
    160         self.update_response_map(request, rdb_exceptions)
    161 
    162 
    163     def get_response(self):
    164         """Convert all RDBServerHostWrapper objects to host info dictionaries.
    165 
    166         @return: A dictionary mapping requests to a list of matching host_infos.
    167 
    168         @raises RDBException: If the same host is assigned to multiple requests.
    169         """
    170         self._check_response_map()
    171         for request, response in self.response_map.iteritems():
    172             self.response_map[request] = [reply.wire_format()
    173                                           for reply in response]
    174         return self.response_map
    175 
    176 
    177     def update_hosts(self, update_requests):
    178         """Updates host tables with a payload.
    179 
    180         @param update_requests: A list of update requests, as defined in
    181             rdb_requests.UpdateHostRequest.
    182         """
    183         # Last payload for a host_id wins in the case of conflicting requests.
    184         unique_host_requests = {}
    185         for request in update_requests:
    186             if unique_host_requests.get(request.host_id):
    187                 unique_host_requests[request.host_id].update(request.payload)
    188             else:
    189                 unique_host_requests[request.host_id] = request.payload
    190 
    191         # Batch similar payloads so we can do them in one table scan.
    192         similar_requests = {}
    193         for host_id, payload in unique_host_requests.iteritems():
    194             similar_requests.setdefault(payload, []).append(host_id)
    195 
    196         # If fields of the update don't match columns in the database,
    197         # record the exception in the response map. This also means later
    198         # updates will get applied even if previous updates fail.
    199         for payload, hosts in similar_requests.iteritems():
    200             try:
    201                 response = self.host_query_manager.update_hosts(hosts, **payload)
    202             except (django_exceptions.FieldError,
    203                     fields.FieldDoesNotExist, ValueError) as e:
    204                 for host in hosts:
    205                     # Since update requests have a consistent hash this will map
    206                     # to the same key as the original request.
    207                     request = rdb_requests.UpdateHostRequest(
    208                             host_id=host, payload=payload).get_request()
    209                     self._record_exceptions(request, [e])
    210 
    211 
    212     def batch_get_hosts(self, host_requests):
    213         """Get hosts matching the requests.
    214 
    215         This method does not acquire the hosts, i.e it reserves hosts against
    216         requests leaving their leased state untouched.
    217 
    218         @param host_requests: A list of requests, as defined in
    219             rdb_utils.BaseHostRequest.
    220         """
    221         host_ids = set([request.host_id for request in host_requests])
    222         host_map = {}
    223 
    224         # This list will not contain available hosts if executed using
    225         # an AvailableHostQueryManager.
    226         for host in self.host_query_manager.get_hosts(host_ids):
    227             host_map[host.id] = host
    228         for request in host_requests:
    229             if request.host_id in host_map:
    230                 self.update_response_map(request, [host_map[request.host_id]])
    231             else:
    232                 logging.warning('rdb could not get host for request: %s, it '
    233                                 'is already leased or locked', request)
    234 
    235 
    236 class AvailableHostRequestHandler(BaseHostRequestHandler):
    237     """Handler for requests related to available (unleased and unlocked) hosts.
    238 
    239     This class is capable of acquiring or validating hosts for requests.
    240     """
    241 
    242 
    243     def __init__(self):
    244         self.host_query_manager = AvailableHostQueryManager()
    245         self.cache = rdb_cache_manager.RDBHostCacheManager()
    246         self.response_map = {}
    247         self.unsatisfied_requests = 0
    248         self.leased_hosts_count = 0
    249         self.request_accountant = None
    250 
    251 
    252     @metrics.SecondsTimerDecorator(_rdb_timer_name % 'lease_hosts')
    253     def lease_hosts(self, hosts):
    254         """Leases a list of hosts.
    255 
    256         @param hosts: A list of RDBServerHostWrapper instances to lease.
    257 
    258         @return: The list of RDBServerHostWrappers that were successfully
    259             leased.
    260         """
    261         #TODO(beeps): crbug.com/353183.
    262         unleased_hosts = set(hosts)
    263         leased_hosts = set([])
    264         for host in unleased_hosts:
    265             try:
    266                 host.lease()
    267             except rdb_utils.RDBException as e:
    268                 logging.error('Unable to lease host %s: %s', host.hostname, e)
    269             else:
    270                 leased_hosts.add(host)
    271         return list(leased_hosts)
    272 
    273 
    274     @classmethod
    275     def valid_host_assignment(cls, request, host):
    276         """Check if a host, request pairing is valid.
    277 
    278         @param request: The request to match against the host.
    279         @param host: An RDBServerHostWrapper instance.
    280 
    281         @return: True if the host, request assignment is valid.
    282 
    283         @raises RDBException: If the request already has another host_ids
    284             associated with it.
    285         """
    286         if request.host_id and request.host_id != host.id:
    287             raise rdb_utils.RDBException(
    288                     'Cannot assign a different host for request: %s, it '
    289                     'already has one: %s ' % (request, host.id))
    290 
    291         # Getting all labels and acls might result in large queries, so
    292         # bail early if the host is already leased.
    293         if host.leased:
    294             return False
    295         # If a host is invalid it must be a one time host added to the
    296         # afe specifically for this purpose, so it doesn't require acl checking.
    297         acl_match = (request.acls.intersection(host.acls) or host.invalid)
    298         label_match = (request.deps.intersection(host.labels) == request.deps)
    299         return acl_match and label_match
    300 
    301 
    302     @classmethod
    303     def _sort_hosts_by_preferred_deps(cls, hosts, preferred_deps):
    304         """Sort hosts in the order of how many preferred deps it has.
    305 
    306         This allows rdb always choose the hosts with the most preferred deps
    307         for a request. One important use case is including cros-version as
    308         a preferred dependence. By choosing a host with the same cros-version,
    309         we can save the time on provisioning it. Note this is not guaranteed
    310         if preferred_deps contains other labels as well.
    311 
    312         @param hosts: A list of hosts to sort.
    313         @param preferred_deps: A list of deps that are preferred.
    314 
    315         @return: A list of sorted hosts.
    316 
    317         """
    318         hosts = sorted(
    319                 hosts,
    320                 key=lambda host: len(set(preferred_deps) & set(host.labels)),
    321                 reverse=True)
    322         return hosts
    323 
    324 
    325     @rdb_cache_manager.memoize_hosts
    326     def _acquire_hosts(self, request, hosts_required, is_acquire_min_duts=False,
    327                        **kwargs):
    328         """Acquire hosts for a group of similar requests.
    329 
    330         Find and acquire hosts that can satisfy a group of requests.
    331         1. If the caching decorator doesn't pass in a list of matching hosts
    332            via the MEMOIZE_KEY this method will directly check the database for
    333            matching hosts.
    334         2. If all matching hosts are not leased for this request, the remaining
    335            hosts are returned to the caching decorator, to place in the cache.
    336 
    337         @param hosts_required: Number of hosts required to satisfy request.
    338         @param request: The request for hosts.
    339         @param is_acquire_min_duts: Boolean. Indicate whether this is to
    340                                     acquire minimum required duts, only used
    341                                     for stats purpose.
    342 
    343         @return: The list of excess matching hosts.
    344         """
    345         hosts = kwargs.get(rdb_cache_manager.MEMOIZE_KEY, [])
    346         if not hosts:
    347             hosts = self.host_query_manager.find_hosts(
    348                             request.deps, request.acls)
    349 
    350         # <-----[:attempt_lease_hosts](evicted)--------> <-(returned, cached)->
    351         # |   -leased_hosts-  |   -stale cached hosts-  | -unleased matching- |
    352         # --used this request---used by earlier request----------unused--------
    353         hosts = self._sort_hosts_by_preferred_deps(
    354                 hosts, request.preferred_deps)
    355         attempt_lease_hosts = min(len(hosts), hosts_required)
    356         leased_host_count = 0
    357         if attempt_lease_hosts:
    358             leased_hosts = self.lease_hosts(hosts[:attempt_lease_hosts])
    359             if leased_hosts:
    360                 self.update_response_map(request, leased_hosts, append=True)
    361 
    362             # [:attempt_leased_hosts] - leased_hosts will include hosts that
    363             # failed leasing, most likely because they're already leased, so
    364             # don't cache them again.
    365             leased_host_count = len(leased_hosts)
    366             failed_leasing = attempt_lease_hosts - leased_host_count
    367             if failed_leasing > 0:
    368                 # For the sake of simplicity this calculation assumes that
    369                 # leasing only fails if there's a stale cached host already
    370                 # leased by a previous request, ergo, we can only get here
    371                 # through a cache hit.
    372                 line_length = len(hosts)
    373                 self.cache.stale_entries.append(
    374                         (float(failed_leasing)/line_length) * 100)
    375             self.leased_hosts_count += leased_host_count
    376         if is_acquire_min_duts:
    377             self.request_accountant.record_acquire_min_duts(
    378                     request, hosts_required, leased_host_count)
    379         self.unsatisfied_requests += max(hosts_required - leased_host_count, 0)
    380         # Cache the unleased matching hosts against the request.
    381         return hosts[attempt_lease_hosts:]
    382 
    383 
    384     @metrics.SecondsTimerDecorator(_rdb_timer_name % 'batch_acquire_hosts')
    385     def batch_acquire_hosts(self, host_requests):
    386         """Acquire hosts for a list of requests.
    387 
    388         The act of acquisition involves finding and leasing a set of hosts
    389         that match the parameters of a request. Each acquired host is added
    390         to the response_map dictionary as an RDBServerHostWrapper.
    391 
    392         @param host_requests: A list of requests to acquire hosts.
    393         """
    394         distinct_requests = 0
    395 
    396         logging.debug('Processing %s host acquisition requests',
    397                       len(host_requests))
    398         metrics.Gauge('chromeos/autotest/scheduler/pending_host_acq_requests'
    399                       ).set(len(host_requests))
    400 
    401         self.request_accountant = rdb_utils.RequestAccountant(host_requests)
    402         # First pass tries to satisfy min_duts for each suite.
    403         for request in self.request_accountant.requests:
    404             to_acquire = self.request_accountant.get_min_duts(request)
    405             if to_acquire > 0:
    406                 self._acquire_hosts(request, to_acquire,
    407                                     is_acquire_min_duts=True)
    408             distinct_requests += 1
    409 
    410         # Second pass tries to allocate duts to the rest unsatisfied requests.
    411         for request in self.request_accountant.requests:
    412             to_acquire = self.request_accountant.get_duts(request)
    413             if to_acquire > 0:
    414                 self._acquire_hosts(request, to_acquire,
    415                                     is_acquire_min_duts=False)
    416 
    417         self.cache.record_stats()
    418         logging.debug('Host acquisition stats: distinct requests: %s, leased '
    419                       'hosts: %s, unsatisfied requests: %s', distinct_requests,
    420                       self.leased_hosts_count, self.unsatisfied_requests)
    421 
    422 
    423     @metrics.SecondsTimerDecorator(_rdb_timer_name % 'batch_validate_hosts')
    424     def batch_validate_hosts(self, requests):
    425         """Validate requests with hosts.
    426 
    427         Reserve all hosts, check each one for validity and discard invalid
    428         request-host pairings. Lease the remaining hsots.
    429 
    430         @param requests: A list of requests to validate.
    431 
    432         @raises RDBException: If multiple hosts or the wrong host is returned
    433             for a response.
    434         """
    435         # The following cases are possible for frontend requests:
    436         # 1. Multiple requests for 1 host, with different acls/deps/priority:
    437         #    These form distinct requests because they hash differently.
    438         #    The response map will contain entries like: {r1: h1, r2: h1}
    439         #    after the batch_get_hosts call. There are 2 sub-cases:
    440         #        a. Same deps/acls, different priority:
    441         #           Since we sort the requests based on priority, the
    442         #           higher priority request r1, will lease h1. The
    443         #           validation of r2, h1 will fail because of the r1 lease.
    444         #        b. Different deps/acls, only one of which matches the host:
    445         #           The matching request will lease h1. The other host
    446         #           pairing will get dropped from the response map.
    447         # 2. Multiple requests with the same acls/deps/priority and 1 host:
    448         #    These all have the same request hash, so the response map will
    449         #    contain: {r: h}, regardless of the number of r's. If this is not
    450         #    a valid host assignment it will get dropped from the response.
    451         self.batch_get_hosts(set(requests))
    452         for request in sorted(self.response_map.keys(),
    453                 key=lambda request: request.priority, reverse=True):
    454             hosts = self.response_map[request]
    455             if len(hosts) > 1:
    456                 raise rdb_utils.RDBException('Got multiple hosts for a single '
    457                         'request. Hosts: %s, request %s.' % (hosts, request))
    458             # Job-shard is 1:1 mapping. Because a job can only belongs
    459             # to one shard, or belongs to master, we disallow frontend job
    460             # that spans hosts on and off shards or across multiple shards,
    461             # which would otherwise break the 1:1 mapping.
    462             # As such, on master, if a request asks for multiple hosts and
    463             # if any host is found on shard, we assume other requested hosts
    464             # would also be on the same shard.  We can safely drop this request.
    465             ignore_request = _is_master and any(
    466                     [host.shard_id for host in hosts])
    467             if (not ignore_request and
    468                     (self.valid_host_assignment(request, hosts[0]) and
    469                         self.lease_hosts(hosts))):
    470                 continue
    471             del self.response_map[request]
    472             logging.warning('Request %s was not able to lease host %s',
    473                             request, hosts[0])
    474 
    475 
# Request dispatchers: Create the appropriate request handler, send a list
# of requests to one of its methods. The corresponding request handler in
# rdb_lib must understand how to match each request with a response from a
# dispatcher; the easiest way to achieve this is to return the response_map
# attribute of the request handler, after making the appropriate requests.
    481 def get_hosts(host_requests):
    482     """Get host information about the requested hosts.
    483 
    484     @param host_requests: A list of requests as defined in BaseHostRequest.
    485     @return: A dictionary mapping each request to a list of hosts.
    486     """
    487     rdb_handler = BaseHostRequestHandler()
    488     rdb_handler.batch_get_hosts(host_requests)
    489     return rdb_handler.get_response()
    490 
    491 
    492 def update_hosts(update_requests):
    493     """Update hosts.
    494 
    495     @param update_requests: A list of updates to host tables
    496         as defined in UpdateHostRequest.
    497     """
    498     rdb_handler = BaseHostRequestHandler()
    499     rdb_handler.update_hosts(update_requests)
    500     return rdb_handler.get_response()
    501 
    502 
    503 def rdb_host_request_dispatcher(host_requests):
    504     """Dispatcher for all host acquisition queries.
    505 
    506     @param host_requests: A list of requests for acquiring hosts, as defined in
    507         AcquireHostRequest.
    508     @return: A dictionary mapping each request to a list of hosts, or
    509         an empty list if none could satisfy the request. Eg:
    510         {AcquireHostRequest.template: [host_info_dictionaries]}
    511     """
    512     validation_requests = []
    513     require_hosts_requests = []
    514 
    515     # Validation requests are made by a job scheduled against a specific host
    516     # specific host (eg: through the frontend) and only require the rdb to
    517     # match the parameters of the host against the request. Acquisition
    518     # requests are made by jobs that need hosts (eg: suites) and the rdb needs
    519     # to find hosts matching the parameters of the request.
    520     for request in host_requests:
    521         if request.host_id:
    522             validation_requests.append(request)
    523         else:
    524             require_hosts_requests.append(request)
    525 
    526     rdb_handler = AvailableHostRequestHandler()
    527     rdb_handler.batch_validate_hosts(validation_requests)
    528     rdb_handler.batch_acquire_hosts(require_hosts_requests)
    529     return rdb_handler.get_response()
    530