# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Rdb server module.

This module is organized in three layers:
    1. Query managers, which wrap the host database queries used by the rdb.
    2. Request handlers, which use a query manager to satisfy requests and
       record the outcome in a response_map keyed on the request.
    3. Request dispatchers, module level functions that create a handler,
       feed it a list of requests and return its response_map.
"""

import logging

import common

from django.core import exceptions as django_exceptions
from django.db.models import fields
from django.db.models import Q
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import rdb_cache_manager
from autotest_lib.scheduler import rdb_hosts
from autotest_lib.scheduler import rdb_requests
from autotest_lib.scheduler import rdb_utils
from autotest_lib.server import utils


_timer = autotest_stats.Timer(rdb_utils.RDB_STATS_KEY)
# Evaluated once at import time; True when this process is not a shard.
_is_master = not utils.is_shard()


# Query managers: Provide a layer of abstraction over the database by
# encapsulating common query patterns used by the rdb.
class BaseHostQueryManager(object):
    """Base manager for host queries on all hosts.
    """

    # The django manager all queries in this class are issued through.
    # Subclasses override this to restrict the set of hosts they can see.
    host_objects = models.Host.objects


    def update_hosts(self, host_ids, **kwargs):
        """Update fields on a set of hosts.

        @param host_ids: A list of ids of hosts to update.
        @param kwargs: A key value dictionary corresponding to column, value
            in the host database.
        """
        self.host_objects.filter(id__in=host_ids).update(**kwargs)


    @rdb_hosts.return_rdb_host
    def get_hosts(self, ids):
        """Get host objects for the given ids.

        @param ids: The ids for which we need host objects.

        @returns: A list of RDBServerHostWrapper objects, ordered by host_id.
        """
        return self.host_objects.filter(id__in=ids).order_by('id')


    @rdb_hosts.return_rdb_host
    def find_hosts(self, deps, acls):
        """Finds valid hosts matching deps, acls.

        A host matches if it is not invalid, has every label in deps and
        belongs to at least one of the acl groups in acls.

        @param deps: A list of dependencies to match.
        @param acls: A list of acls, at least one of which must coincide with
            an acl group the chosen host is in.

        @return: A set of matching hosts available.
        """
        hosts_available = self.host_objects.filter(invalid=0)
        # Each dependency becomes its own chained filter() call so a host
        # must carry all of them; the acls are matched with a single
        # __in filter, so any one of them is sufficient.
        queries = [Q(labels__id=dep) for dep in deps]
        queries.append(Q(aclgroup__id__in=acls))
        for query in queries:
            hosts_available = hosts_available.filter(query)
        return set(hosts_available)


class AvailableHostQueryManager(BaseHostQueryManager):
    """Query manager for requests on un-leased, un-locked hosts.
    """

    # NOTE(review): despite its name this manager is presumably the one that
    # filters out leased/locked hosts; confirm against models.Host.
    host_objects = models.Host.leased_objects


# Request Handlers: Used in conjunction with requests in rdb_utils, these
# handlers acquire hosts for a request and record the acquisition in
# a response_map dictionary keyed on the request itself, with the host/hosts
# as values.
class BaseHostRequestHandler(object):
    """Handler for requests related to hosts, leased or unleased.

    This class is only capable of blindly returning host information.
    """

    def __init__(self):
        self.host_query_manager = BaseHostQueryManager()
        # Maps a request to a non-empty list of either RDBHosts or
        # RDBExceptions, see update_response_map.
        self.response_map = {}


    def update_response_map(self, request, response, append=False):
        """Record a response for a request.

        The response_map only contains requests that were either satisfied, or
        that ran into an exception. Often this translates to reserving hosts
        against a request. If the rdb hit an exception processing a request, the
        exception gets recorded in the map for the client to reraise.

        @param response: A response for the request.
        @param request: The request that has reserved these hosts.
        @param append: Boolean, whether to append new hosts in
                       |response| for existing request.
                       Will not append if existing response is
                       a list of exceptions.

        @raises RDBException: If an empty value is added to the map, or if
            the request already has a response and append is False.
        """
        if not response:
            raise rdb_utils.RDBException('response_map dict can only contain '
                    'valid responses. Request %s, response %s is invalid.' %
                    (request, response))
        exist_response = self.response_map.setdefault(request, [])
        if exist_response:
            if not append:
                # Report the response the request already holds, that is
                # what the message claims to show.
                raise rdb_utils.RDBException('Request %s already has response %s '
                                             'the rdb cannot return multiple '
                                             'responses for the same request.' %
                                             (request, exist_response))
            if not isinstance(exist_response[0], rdb_hosts.RDBHost):
                # Do not append if existing response contains exception.
                return
        exist_response.extend(response)


    def _check_response_map(self):
        """Verify that we never give the same host to different requests.

        @raises RDBException: If the same host is assigned to multiple requests.
        """
        unique_hosts = set()
        for request, response in self.response_map.iteritems():
            # Each value in the response map can only either be a list of
            # RDBHosts or a list of RDBExceptions, not a mix of both, so
            # inspecting the first element is enough to tell them apart.
            if not isinstance(response[0], rdb_hosts.RDBHost):
                continue
            if any(host in unique_hosts for host in response):
                raise rdb_utils.RDBException(
                        'Assigning the same host to multiple requests. New '
                        'hosts %s, request %s, response_map: %s' %
                        (response, request, self.response_map))
            unique_hosts.update(response)


    def _record_exceptions(self, request, exceptions):
        """Record a list of exceptions for a request.

        Each exception is wrapped in an RDBException before it is stored.

        @param request: The request for which the exceptions were hit.
        @param exceptions: The exceptions hit while processing the request.
        """
        rdb_exceptions = [rdb_utils.RDBException(ex) for ex in exceptions]
        self.update_response_map(request, rdb_exceptions)


    def get_response(self):
        """Convert all RDBServerHostWrapper objects to host info dictionaries.

        The conversion happens in place: every value in the response_map is
        replaced by the list of wire_format() results of its elements.

        @return: A dictionary mapping requests to a list of matching host_infos.

        @raises RDBException: If the same host is assigned to multiple requests.
        """
        self._check_response_map()
        # Only the values of existing keys are rebound, so the size of the
        # dictionary does not change while it is being iterated.
        for request, response in self.response_map.iteritems():
            self.response_map[request] = [reply.wire_format()
                                          for reply in response]
        return self.response_map


    def update_hosts(self, update_requests):
        """Updates host tables with a payload.

        Payloads for the same host are merged, hosts with equal payloads are
        updated through a single query, and failures to apply a payload are
        recorded in the response_map instead of being raised.

        @param update_requests: A list of update requests, as defined in
            rdb_requests.UpdateHostRequest. The payloads must be hashable
            mappings, since they are used as dictionary keys below.
        """
        # Last payload for a host_id wins in the case of conflicting requests.
        # NOTE(review): the merge updates the payload object of the first
        # request seen for a host in place; the exception handling below
        # rebuilds the request from that merged payload.
        unique_host_requests = {}
        for request in update_requests:
            if unique_host_requests.get(request.host_id):
                unique_host_requests[request.host_id].update(request.payload)
            else:
                unique_host_requests[request.host_id] = request.payload

        # Batch similar payloads so we can do them in one table scan.
        similar_requests = {}
        for host_id, payload in unique_host_requests.iteritems():
            similar_requests.setdefault(payload, []).append(host_id)

        # If fields of the update don't match columns in the database,
        # record the exception in the response map. This also means later
        # updates will get applied even if previous updates fail.
        for payload, hosts in similar_requests.iteritems():
            try:
                self.host_query_manager.update_hosts(hosts, **payload)
            except (django_exceptions.FieldError,
                    fields.FieldDoesNotExist, ValueError) as e:
                for host in hosts:
                    # Since update requests have a consistent hash this will map
                    # to the same key as the original request.
                    request = rdb_requests.UpdateHostRequest(
                            host_id=host, payload=payload).get_request()
                    self._record_exceptions(request, [e])


    def batch_get_hosts(self, host_requests):
        """Get hosts matching the requests.

        This method does not acquire the hosts, i.e it reserves hosts against
        requests leaving their leased state untouched.

        @param host_requests: A list of requests, as defined in
            rdb_utils.BaseHostRequest.
        """
        host_ids = set(request.host_id for request in host_requests)
        host_map = {}

        # This list will only contain available (un-leased, un-locked) hosts
        # if executed using an AvailableHostQueryManager.
        for host in self.host_query_manager.get_hosts(host_ids):
            host_map[host.id] = host
        for request in host_requests:
            if request.host_id in host_map:
                self.update_response_map(request, [host_map[request.host_id]])
            else:
                logging.warning('rdb could not get host for request: %s, it '
                                'is already leased or locked', request)


class AvailableHostRequestHandler(BaseHostRequestHandler):
    """Handler for requests related to available (unleased and unlocked) hosts.

    This class is capable of acquiring or validating hosts for requests.
    """


    def __init__(self):
        self.host_query_manager = AvailableHostQueryManager()
        self.cache = rdb_cache_manager.RDBHostCacheManager()
        self.response_map = {}
        # Counters reported as stats at the end of batch_acquire_hosts.
        self.unsatisfied_requests = 0
        self.leased_hosts_count = 0
        # Set by batch_acquire_hosts before any host is acquired.
        self.request_accountant = None


    @_timer.decorate
    def lease_hosts(self, hosts):
        """Leases a list of hosts.

        Hosts that raise an RDBException while leasing are logged and left
        out of the result. Duplicates in hosts are leased only once.

        @param hosts: A list of RDBServerHostWrapper instances to lease.

        @return: The list of RDBServerHostWrappers that were successfully
            leased. The order of the list is unspecified.
        """
        #TODO(beeps): crbug.com/353183.
        unleased_hosts = set(hosts)
        leased_hosts = set()
        for host in unleased_hosts:
            try:
                host.lease()
            except rdb_utils.RDBException as e:
                logging.error('Unable to lease host %s: %s', host.hostname, e)
            else:
                leased_hosts.add(host)
        return list(leased_hosts)


    @classmethod
    def valid_host_assignment(cls, request, host):
        """Check if a host, request pairing is valid.

        @param request: The request to match against the host.
        @param host: An RDBServerHostWrapper instance.

        @return: A truthy value if the host, request assignment is valid,
            a falsy value otherwise.

        @raises RDBException: If the request already has another host_id
            associated with it.
        """
        if request.host_id and request.host_id != host.id:
            raise rdb_utils.RDBException(
                    'Cannot assign a different host for request: %s, it '
                    'already has one: %s ' % (request, host.id))

        # Getting all labels and acls might result in large queries, so
        # bail early if the host is already leased.
        if host.leased:
            return False
        # If a host is invalid it must be a one time host added to the
        # afe specifically for this purpose, so it doesn't require acl checking.
        acl_match = (request.acls.intersection(host.acls) or host.invalid)
        # Every dependency of the request must be among the host's labels.
        label_match = (request.deps.intersection(host.labels) == request.deps)
        return acl_match and label_match


    @classmethod
    def _sort_hosts_by_preferred_deps(cls, hosts, preferred_deps):
        """Sort hosts in the order of how many preferred deps it has.

        This allows rdb always choose the hosts with the most preferred deps
        for a request. One important use case is including cros-version as
        a preferred dependence. By choosing a host with the same cros-version,
        we can save the time on provisioning it. Note this is not guaranteed
        if preferred_deps contains other labels as well.

        @param hosts: A list of hosts to sort.
        @param preferred_deps: A list of deps that are preferred.

        @return: A new list of hosts, those with the most preferred deps
            first.

        """
        hosts = sorted(
                hosts,
                key=lambda host: len(set(preferred_deps) & set(host.labels)),
                reverse=True)
        return hosts


    @rdb_cache_manager.memoize_hosts
    def _acquire_hosts(self, request, hosts_required, is_acquire_min_duts=False,
                       **kwargs):
        """Acquire hosts for a group of similar requests.

        Find and acquire hosts that can satisfy a group of requests.
        1. If the caching decorator doesn't pass in a list of matching hosts
           via the MEMOIZE_KEY this method will directly check the database for
           matching hosts.
        2. If all matching hosts are not leased for this request, the remaining
           hosts are returned to the caching decorator, to place in the cache.

        @param hosts_required: Number of hosts required to satisfy request.
        @param request: The request for hosts.
        @param is_acquire_min_duts: Boolean. Indicate whether this is to
                                    acquire minimum required duts, only used
                                    for stats purpose.

        @return: The list of excess matching hosts.
        """
        hosts = kwargs.get(rdb_cache_manager.MEMOIZE_KEY, [])
        if not hosts:
            hosts = self.host_query_manager.find_hosts(
                            request.deps, request.acls)

        # <-----[:attempt_lease_hosts](evicted)--------> <-(returned, cached)->
        # |   -leased_hosts-  |   -stale cached hosts-  | -unleased matching- |
        # --used this request---used by earlier request----------unused--------
        hosts = self._sort_hosts_by_preferred_deps(
                hosts, request.preferred_deps)
        attempt_lease_hosts = min(len(hosts), hosts_required)
        leased_host_count = 0
        if attempt_lease_hosts:
            leased_hosts = self.lease_hosts(hosts[:attempt_lease_hosts])
            if leased_hosts:
                self.update_response_map(request, leased_hosts, append=True)

            # [:attempt_leased_hosts] - leased_hosts will include hosts that
            # failed leasing, most likely because they're already leased, so
            # don't cache them again.
            leased_host_count = len(leased_hosts)
            failed_leasing = attempt_lease_hosts - leased_host_count
            if failed_leasing > 0:
                # For the sake of simplicity this calculation assumes that
                # leasing only fails if there's a stale cached host already
                # leased by a previous request, ergo, we can only get here
                # through a cache hit.
                # The staleness is recorded as a percentage of the hosts
                # that matched; hosts cannot be empty in this branch.
                line_length = len(hosts)
                self.cache.stale_entries.append(
                        (float(failed_leasing)/line_length) * 100)
            self.leased_hosts_count += leased_host_count
        if is_acquire_min_duts:
            self.request_accountant.record_acquire_min_duts(
                    request, hosts_required, leased_host_count)
        self.unsatisfied_requests += max(hosts_required - leased_host_count, 0)
        # Cache the unleased matching hosts against the request.
        return hosts[attempt_lease_hosts:]


    @_timer.decorate
    def batch_acquire_hosts(self, host_requests):
        """Acquire hosts for a list of requests.

        The act of acquisition involves finding and leasing a set of hosts
        that match the parameters of a request. Each acquired host is added
        to the response_map dictionary as an RDBServerHostWrapper.

        Acquisition happens in two passes over the distinct requests, and
        the resulting counters are logged and sent as stats.

        @param host_requests: A list of requests to acquire hosts.
        """
        distinct_requests = 0

        logging.debug('Processing %s host acquisition requests',
                      len(host_requests))

        self.request_accountant = rdb_utils.RequestAccountant(host_requests)
        # First pass tries to satisfy min_duts for each suite.
        for request in self.request_accountant.requests:
            to_acquire = self.request_accountant.get_min_duts(request)
            if to_acquire > 0:
                self._acquire_hosts(request, to_acquire,
                                    is_acquire_min_duts=True)
            distinct_requests += 1

        # Second pass tries to allocate duts to the remaining unsatisfied
        # requests.
        for request in self.request_accountant.requests:
            to_acquire = self.request_accountant.get_duts(request)
            if to_acquire > 0:
                self._acquire_hosts(request, to_acquire,
                                    is_acquire_min_duts=False)

        self.cache.record_stats()
        logging.debug('Host acquisition stats: distinct requests: %s, leased '
                      'hosts: %s, unsatisfied requests: %s', distinct_requests,
                      self.leased_hosts_count, self.unsatisfied_requests)
        autotest_stats.Gauge(rdb_utils.RDB_STATS_KEY).send(
                'leased_hosts', self.leased_hosts_count)
        autotest_stats.Gauge(rdb_utils.RDB_STATS_KEY).send(
                'unsatisfied_requests', self.unsatisfied_requests)


    @_timer.decorate
    def batch_validate_hosts(self, requests):
        """Validate requests with hosts.

        Reserve all hosts, check each one for validity and discard invalid
        request-host pairings. Lease the remaining hosts.

        @param requests: A list of requests to validate.

        @raises RDBException: If multiple hosts or the wrong host is returned
            for a response.
        """
        # The following cases are possible for frontend requests:
        # 1. Multiple requests for 1 host, with different acls/deps/priority:
        #    These form distinct requests because they hash differently.
        #    The response map will contain entries like: {r1: h1, r2: h1}
        #    after the batch_get_hosts call. There are 2 sub-cases:
        #        a. Same deps/acls, different priority:
        #           Since we sort the requests based on priority, the
        #           higher priority request r1, will lease h1. The
        #           validation of r2, h1 will fail because of the r1 lease.
        #        b. Different deps/acls, only one of which matches the host:
        #           The matching request will lease h1. The other host
        #           pairing will get dropped from the response map.
        # 2. Multiple requests with the same acls/deps/priority and 1 host:
        #    These all have the same request hash, so the response map will
        #    contain: {r: h}, regardless of the number of r's. If this is not
        #    a valid host assignment it will get dropped from the response.
        self.batch_get_hosts(set(requests))
        # sorted() works on a snapshot of the keys, so entries can be
        # deleted from the response_map inside the loop.
        for request in sorted(self.response_map.keys(),
                key=lambda request: request.priority, reverse=True):
            hosts = self.response_map[request]
            if len(hosts) > 1:
                raise rdb_utils.RDBException('Got multiple hosts for a single '
                        'request. Hosts: %s, request %s.' % (hosts, request))
            # Job-shard is 1:1 mapping. Because a job can only belongs
            # to one shard, or belongs to master, we disallow frontend job
            # that spans hosts on and off shards or across multiple shards,
            # which would otherwise break the 1:1 mapping.
            # As such, on master, if a request asks for multiple hosts and
            # if any host is found on shard, we assume other requested hosts
            # would also be on the same shard.  We can safely drop this request.
            ignore_request = _is_master and any(
                    host.shard_id for host in hosts)
            # The host is only leased once the pairing has been validated.
            if (not ignore_request and
                    (self.valid_host_assignment(request, hosts[0]) and
                        self.lease_hosts(hosts))):
                continue
            del self.response_map[request]
            logging.warning('Request %s was not able to lease host %s',
                            request, hosts[0])


# Request dispatchers: Create the appropriate request handler, send a list
# of requests to one of its methods. The corresponding request handler in
# rdb_lib must understand how to match each request with a response from a
# dispatcher, the easiest way to achieve this is to return the response_map
# attribute of the request handler, after making the appropriate requests.
def get_hosts(host_requests):
    """Get host information about the requested hosts.

    @param host_requests: A list of requests as defined in BaseHostRequest.
    @return: A dictionary mapping each request to a list of hosts.
    """
    rdb_handler = BaseHostRequestHandler()
    rdb_handler.batch_get_hosts(host_requests)
    return rdb_handler.get_response()


def update_hosts(update_requests):
    """Update hosts.

    @param update_requests: A list of updates to host tables
        as defined in UpdateHostRequest.
    @return: The response map of the handler, which only holds entries for
        updates that hit an exception.
    """
    rdb_handler = BaseHostRequestHandler()
    rdb_handler.update_hosts(update_requests)
    return rdb_handler.get_response()


def rdb_host_request_dispatcher(host_requests):
    """Dispatcher for all host acquisition queries.

    @param host_requests: A list of requests for acquiring hosts, as defined in
        AcquireHostRequest.
    @return: A dictionary mapping each request to a list of hosts, or
        an empty list if none could satisfy the request. Eg:
        {AcquireHostRequest.template: [host_info_dictionaries]}
    """
    validation_requests = []
    require_hosts_requests = []

    # Validation requests are made by a job scheduled against a
    # specific host (eg: through the frontend) and only require the rdb to
    # match the parameters of the host against the request. Acquisition
    # requests are made by jobs that need hosts (eg: suites) and the rdb needs
    # to find hosts matching the parameters of the request.
    for request in host_requests:
        if request.host_id:
            validation_requests.append(request)
        else:
            require_hosts_requests.append(request)

    # Validation runs first, on the same handler, so both kinds of requests
    # share one response_map.
    rdb_handler = AvailableHostRequestHandler()
    rdb_handler.batch_validate_hosts(validation_requests)
    rdb_handler.batch_acquire_hosts(require_hosts_requests)
    return rdb_handler.get_response()