Home | History | Annotate | Download | only in scheduler
      1 # Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import unittest
      6 
      7 import common
      8 from autotest_lib.client.common_lib.test_utils import unittest
      9 from autotest_lib.frontend import setup_django_environment
     10 from autotest_lib.frontend.afe import frontend_test_utils
     11 from autotest_lib.scheduler import rdb
     12 from autotest_lib.scheduler import rdb_cache_manager
     13 from autotest_lib.scheduler import rdb_lib
     14 from autotest_lib.scheduler import rdb_testing_utils as test_utils
     15 from autotest_lib.scheduler import rdb_utils
     16 
     17 
     18 def get_line_with_labels(required_labels, cache_lines):
     19     """Get the cache line with the hosts that match given labels.
     20 
     21     Confirm that all hosts have matching labels within a line,
     22     then return the lines with the requested labels. There can
     23     be more than one, since we use acls in the cache key.
     24 
     25     @param labels: A list of label names.
     26     @cache_lines: A list of cache lines to look through.
     27 
     28     @return: A list of the cache lines with the requested labels.
     29     """
     30     label_lines = []
     31     for line in cache_lines:
     32         if not line:
     33             continue
     34         labels = list(line)[0].labels.get_label_names()
     35         if any(host.labels.get_label_names() != labels for host in line):
     36             raise AssertionError('Mismatch in deps within a cache line')
     37         if required_labels == labels:
     38             label_lines.append(line)
     39     return label_lines
     40 
     41 
     42 def get_hosts_for_request(
     43         response_map, deps=test_utils.DEFAULT_DEPS,
     44         acls=test_utils.DEFAULT_ACLS, priority=0, parent_job_id=0, **kwargs):
     45     """Get the hosts for a request matching kwargs from the response map.
     46 
     47     @param response_map: A response map from an rdb request_handler.
     48     """
     49     return response_map[
     50             test_utils.AbstractBaseRDBTester.get_request(
     51                     deps, acls, priority, parent_job_id)]
     52 
     53 
     54 class RDBCacheTest(test_utils.AbstractBaseRDBTester, unittest.TestCase):
     55     """Unittests for RDBHost objects."""
     56 
     57 
     58     def testCachingBasic(self):
     59         """Test that different requests will hit the database."""
     60 
     61         # r1 should cache h2 and use h1; r2 should cach [] and use h2
     62         # at the end the cache should contain one stale line, with
     63         # h2 in it, and one empty line since r2 acquired h2.
     64         default_params = test_utils.get_default_job_params()
     65         self.create_job(**default_params)
     66         default_params['deps'] = default_params['deps'][0]
     67         self.create_job(**default_params)
     68         for i in range(0, 2):
     69             self.db_helper.create_host(
     70                     'h%s'%i, **test_utils.get_default_host_params())
     71         queue_entries = self._dispatcher._refresh_pending_queue_entries()
     72 
     73         def local_get_response(self):
     74             """ Local rdb.get_response handler."""
     75             requests = self.response_map.keys()
     76             if not (self.cache.hits == 0 and self.cache.misses == 2):
     77                 raise AssertionError('Neither request should have hit the '
     78                         'cache, but both should have inserted into it.')
     79 
     80             lines = get_line_with_labels(
     81                     test_utils.DEFAULT_DEPS,
     82                     self.cache._cache_backend._cache.values())
     83             if len(lines) > 1:
     84                 raise AssertionError('Caching was too agressive, '
     85                         'the second request should not have cached anything '
     86                         'because it used the one free host.')
     87 
     88             cached_host = lines[0].pop()
     89             default_params = test_utils.get_default_job_params()
     90             job1_host = get_hosts_for_request(
     91                     self.response_map, **default_params)[0]
     92             default_params['deps'] = default_params['deps'][0]
     93             job2_host = get_hosts_for_request(
     94                     self.response_map, **default_params)[0]
     95             if (job2_host.hostname == job1_host.hostname or
     96                 cached_host.hostname not in
     97                 [job2_host.hostname, job1_host.hostname]):
     98                 raise AssertionError('Wrong host cached %s. The first job '
     99                         'should have cached the host used by the second.' %
    100                         cached_host.hostname)
    101 
    102             # Shouldn't be able to lease this host since r2 used it.
    103             try:
    104                 cached_host.lease()
    105             except rdb_utils.RDBException:
    106                 pass
    107             else:
    108                 raise AssertionError('Was able to lease a stale host. The '
    109                         'second request should have leased it.')
    110             return test_utils.wire_format_response_map(self.response_map)
    111 
    112         self.god.stub_with(rdb.AvailableHostRequestHandler,
    113                            'get_response', local_get_response)
    114         self.check_hosts(rdb_lib.acquire_hosts(queue_entries))
    115 
    116 
    117     def testCachingPriority(self):
    118         """Test requests with the same deps but different priorities."""
    119         # All 3 jobs should find hosts, and there should be one host left
    120         # behind in the cache. The first job will take one host and cache 3,
    121         # the second will take one and cache 2, while the last will take one.
    122         # The remaining host in the cache should not be stale.
    123         default_job_params = test_utils.get_default_job_params()
    124         for i in range(0, 3):
    125             default_job_params['priority'] = i
    126             job = self.create_job(**default_job_params)
    127 
    128         default_host_params = test_utils.get_default_host_params()
    129         for i in range(0, 4):
    130             self.db_helper.create_host('h%s'%i, **default_host_params)
    131         queue_entries = self._dispatcher._refresh_pending_queue_entries()
    132 
    133         def local_get_response(self):
    134             """ Local rdb.get_response handler."""
    135             if not (self.cache.hits == 2 and self.cache.misses ==1):
    136                 raise AssertionError('The first request should have populated '
    137                         'the cache for the others.')
    138 
    139             default_job_params = test_utils.get_default_job_params()
    140             lines = get_line_with_labels(
    141                     default_job_params['deps'],
    142                     self.cache._cache_backend._cache.values())
    143             if len(lines) > 1:
    144                 raise AssertionError('Should only be one cache line left.')
    145 
    146             # Make sure that all the jobs got different hosts, and that
    147             # the host cached isn't being used by a job.
    148             cached_host = lines[0].pop()
    149             cached_host.lease()
    150 
    151             job_hosts = []
    152             default_job_params = test_utils.get_default_job_params()
    153             for i in range(0, 3):
    154                 default_job_params['priority'] = i
    155                 hosts = get_hosts_for_request(self.response_map,
    156                                               **default_job_params)
    157                 assert(len(hosts) == 1)
    158                 host = hosts[0]
    159                 assert(host.id not in job_hosts and cached_host.id != host.id)
    160             return test_utils.wire_format_response_map(self.response_map)
    161 
    162         self.god.stub_with(rdb.AvailableHostRequestHandler,
    163                            'get_response', local_get_response)
    164         self.check_hosts(rdb_lib.acquire_hosts(queue_entries))
    165 
    166 
    167     def testCachingEmptyList(self):
    168         """Test that the 'no available hosts' condition isn't a cache miss."""
    169         default_params = test_utils.get_default_job_params()
    170         for i in range(0 ,3):
    171             default_params['parent_job_id'] = i
    172             self.create_job(**default_params)
    173 
    174         default_host_params = test_utils.get_default_host_params()
    175         self.db_helper.create_host('h1', **default_host_params)
    176 
    177         def local_get_response(self):
    178             """ Local rdb.get_response handler."""
    179             if not (self.cache.misses == 1 and self.cache.hits == 2):
    180                 raise AssertionError('The first request should have taken h1 '
    181                         'while the other 2 should have hit the cache.')
    182 
    183             request = test_utils.AbstractBaseRDBTester.get_request(
    184                     test_utils.DEFAULT_DEPS, test_utils.DEFAULT_ACLS)
    185             key = self.cache.get_key(deps=request.deps, acls=request.acls)
    186             if self.cache._cache_backend.get(key) != []:
    187                 raise AssertionError('A request with no hosts does not get '
    188                         'cached corrrectly.')
    189             return test_utils.wire_format_response_map(self.response_map)
    190 
    191         queue_entries = self._dispatcher._refresh_pending_queue_entries()
    192         self.god.stub_with(rdb.AvailableHostRequestHandler,
    193                            'get_response', local_get_response)
    194         self.check_hosts(rdb_lib.acquire_hosts(queue_entries))
    195 
    196 
    197     def testStaleCacheLine(self):
    198         """Test that a stale cache line doesn't satisfy a request."""
    199 
    200         # Create 3 jobs, all of which can use the same hosts. The first
    201         # will cache the only remaining host after taking one, the second
    202         # will also take a host, but not cache anything, while the third
    203         # will try to use the host cached by the first job but fail because
    204         # it is already leased.
    205         default_params = test_utils.get_default_job_params()
    206         default_params['priority'] = 2
    207         self.create_job(**default_params)
    208         default_params['priority'] = 1
    209         default_params['deps'] = default_params['deps'][0]
    210         self.create_job(**default_params)
    211         default_params['priority'] = 0
    212         default_params['deps'] = test_utils.DEFAULT_DEPS
    213         self.create_job(**default_params)
    214 
    215         host_1 = self.db_helper.create_host(
    216                 'h1', **test_utils.get_default_host_params())
    217         host_2 = self.db_helper.create_host(
    218                 'h2', **test_utils.get_default_host_params())
    219         queue_entries = self._dispatcher._refresh_pending_queue_entries()
    220 
    221         def local_get_response(self):
    222             """ Local rdb.get_response handler."""
    223             default_job_params = test_utils.get_default_job_params()
    224 
    225             # Confirm that even though the third job hit the cache, it wasn't
    226             # able to use the cached host because it was already leased, and
    227             # that it doesn't add it back to the cache.
    228             assert(self.cache.misses == 2 and self.cache.hits == 1)
    229             lines = get_line_with_labels(
    230                         default_job_params['deps'],
    231                         self.cache._cache_backend._cache.values())
    232             assert(len(lines) == 0)
    233             assert(int(self.cache.mean_staleness()) == 100)
    234             return test_utils.wire_format_response_map(self.response_map)
    235 
    236         self.god.stub_with(rdb.AvailableHostRequestHandler,
    237                            'get_response', local_get_response)
    238         acquired_hosts = list(rdb_lib.acquire_hosts(queue_entries))
    239         self.assertTrue(acquired_hosts[0].id == host_1.id and
    240                         acquired_hosts[1].id == host_2.id and
    241                         acquired_hosts[2] is None)
    242 
    243 
    244     def testCacheAPI(self):
    245         """Test the cache managers api."""
    246         cache = rdb_cache_manager.RDBHostCacheManager()
    247         key = cache.get_key(
    248                 deps=test_utils.DEFAULT_DEPS, acls=test_utils.DEFAULT_ACLS)
    249 
    250         # Cannot set None, it's reserved for cache expiration.
    251         self.assertRaises(rdb_utils.RDBException, cache.set_line, *(key, None))
    252 
    253         # Setting an empty list indicates a query with no results.
    254         cache.set_line(key, [])
    255         self.assertTrue(cache.get_line(key) == [])
    256 
    257         # Getting a value will delete the key, leading to a miss on subsequent
    258         # gets before a set.
    259         self.assertRaises(rdb_utils.CacheMiss, cache.get_line, *(key,))
    260 
    261         # Caching a leased host is just a waste of cache space.
    262         host = test_utils.FakeHost(
    263                 'h1', 1, labels=test_utils.DEFAULT_DEPS,
    264                 acls=test_utils.DEFAULT_ACLS, leased=1)
    265         cache.set_line(key, [host])
    266         self.assertRaises(
    267                 rdb_utils.CacheMiss, cache.get_line, *(key,))
    268 
    269         # Retrieving an unleased cached host shouldn't mutate it, even if the
    270         # key is reconstructed.
    271         host.leased=0
    272         cache.set_line(cache.get_key(host.labels, host.acls), [host])
    273         self.assertTrue(
    274                 cache.get_line(cache.get_key(host.labels, host.acls)) == [host])
    275 
    276         # Caching different hosts under the same key isn't allowed.
    277         different_host = test_utils.FakeHost(
    278                 'h2', 2, labels=[test_utils.DEFAULT_DEPS[0]],
    279                 acls=test_utils.DEFAULT_ACLS, leased=0)
    280         cache.set_line(key, [host, different_host])
    281         self.assertRaises(
    282                 rdb_utils.CacheMiss, cache.get_line, *(key,))
    283 
    284         # Caching hosts with the same deps but different acls under the
    285         # same key is allowed, as long as the acls match the key.
    286         different_host = test_utils.FakeHost(
    287                 'h2', 2, labels=test_utils.DEFAULT_DEPS,
    288                 acls=[test_utils.DEFAULT_ACLS[1]], leased=0)
    289         cache.set_line(key, [host, different_host])
    290         self.assertTrue(set(cache.get_line(key)) == set([host, different_host]))
    291 
    292         # Make sure we don't divide by zero while calculating hit ratio
    293         cache.misses = 0
    294         cache.hits = 0
    295         self.assertTrue(cache.hit_ratio() == 0)
    296         cache.hits = 1
    297         hit_ratio = cache.hit_ratio()
    298         self.assertTrue(type(hit_ratio) == float and hit_ratio == 100)
    299 
    300 
    301     def testDummyCache(self):
    302         """Test that the dummy cache doesn't save hosts."""
    303 
    304         # Create 2 jobs and 3 hosts. Both the jobs should not hit the cache,
    305         # nor should they cache anything, but both jobs should acquire hosts.
    306         default_params = test_utils.get_default_job_params()
    307         default_host_params = test_utils.get_default_host_params()
    308         for i in range(0, 2):
    309             default_params['parent_job_id'] = i
    310             self.create_job(**default_params)
    311             self.db_helper.create_host('h%s'%i, **default_host_params)
    312         self.db_helper.create_host('h2', **default_host_params)
    313         queue_entries = self._dispatcher._refresh_pending_queue_entries()
    314         self.god.stub_with(
    315                 rdb_cache_manager.RDBHostCacheManager, 'use_cache', False)
    316 
    317         def local_get_response(self):
    318             """ Local rdb.get_response handler."""
    319             requests = self.response_map.keys()
    320             if not (self.cache.hits == 0 and self.cache.misses == 2):
    321                 raise AssertionError('Neither request should have hit the '
    322                         'cache, but both should have inserted into it.')
    323 
    324             # Make sure both requests actually found a host
    325             default_params = test_utils.get_default_job_params()
    326             job1_host = get_hosts_for_request(
    327                     self.response_map, **default_params)[0]
    328             default_params['parent_job_id'] = 1
    329             job2_host = get_hosts_for_request(
    330                     self.response_map, **default_params)[0]
    331             if (not job1_host or not job2_host or
    332                 job2_host.hostname == job1_host.hostname):
    333                 raise AssertionError('Excected acquisitions did not occur.')
    334 
    335             assert(hasattr(self.cache._cache_backend, '_cache') == False)
    336             return test_utils.wire_format_response_map(self.response_map)
    337 
    338         self.god.stub_with(rdb.AvailableHostRequestHandler,
    339                            'get_response', local_get_response)
    340         self.check_hosts(rdb_lib.acquire_hosts(queue_entries))
    341 
    342