Home | History | Annotate | Download | only in afe
      1 #!/usr/bin/python
      2 #
      3 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      4 # Use of this source code is governed by a BSD-style license that can be
      5 # found in the LICENSE file.
      6 
      7 """Unit tests for frontend/afe/site_rpc_interface.py."""
      8 
      9 
     10 import __builtin__
     11 import datetime
     12 import mox
     13 import StringIO
     14 import unittest
     15 
     16 import common
     17 
     18 from autotest_lib.frontend import setup_django_environment
     19 from autotest_lib.frontend.afe import frontend_test_utils
     20 from autotest_lib.frontend.afe import models, model_logic, rpc_utils
     21 from autotest_lib.client.common_lib import control_data, error
     22 from autotest_lib.client.common_lib import global_config
     23 from autotest_lib.client.common_lib import lsbrelease_utils
     24 from autotest_lib.client.common_lib import priorities
     25 from autotest_lib.client.common_lib.cros import dev_server
     26 from autotest_lib.frontend.afe import rpc_interface, site_rpc_interface
     27 from autotest_lib.server import utils
     28 from autotest_lib.server.cros.dynamic_suite import control_file_getter
     29 from autotest_lib.server.cros.dynamic_suite import constants
     30 from autotest_lib.server.hosts import moblab_host
     31 
     32 
     33 CLIENT = control_data.CONTROL_TYPE_NAMES.CLIENT
     34 SERVER = control_data.CONTROL_TYPE_NAMES.SERVER
     35 
     36 
     37 class SiteRpcInterfaceTest(mox.MoxTestBase,
     38                            frontend_test_utils.FrontendTestMixin):
     39     """Unit tests for functions in site_rpc_interface.py.
     40 
     41     @var _NAME: fake suite name.
     42     @var _BOARD: fake board to reimage.
     43     @var _BUILD: fake build with which to reimage.
     44     @var _PRIORITY: fake priority with which to reimage.
     45     """
     46     _NAME = 'name'
     47     _BOARD = 'link'
     48     _BUILD = 'link-release/R36-5812.0.0'
     49     _PRIORITY = priorities.Priority.DEFAULT
     50     _TIMEOUT = 24
     51 
     52 
     53     def setUp(self):
     54         super(SiteRpcInterfaceTest, self).setUp()
     55         self._SUITE_NAME = site_rpc_interface.canonicalize_suite_name(
     56             self._NAME)
     57         self.dev_server = self.mox.CreateMock(dev_server.ImageServer)
     58         self._frontend_common_setup(fill_data=False)
     59 
     60 
     61     def tearDown(self):
     62         self._frontend_common_teardown()
     63 
     64 
     65     def _setupDevserver(self):
     66         self.mox.StubOutClassWithMocks(dev_server, 'ImageServer')
     67         dev_server.ImageServer.resolve(self._BUILD).AndReturn(self.dev_server)
     68 
     69 
     70     def _mockDevServerGetter(self, get_control_file=True):
     71         self._setupDevserver()
     72         if get_control_file:
     73           self.getter = self.mox.CreateMock(
     74               control_file_getter.DevServerGetter)
     75           self.mox.StubOutWithMock(control_file_getter.DevServerGetter,
     76                                    'create')
     77           control_file_getter.DevServerGetter.create(
     78               mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(self.getter)
     79 
     80 
     81     def _mockRpcUtils(self, to_return, control_file_substring=''):
     82         """Fake out the autotest rpc_utils module with a mockable class.
     83 
     84         @param to_return: the value that rpc_utils.create_job_common() should
     85                           be mocked out to return.
     86         @param control_file_substring: A substring that is expected to appear
     87                                        in the control file output string that
     88                                        is passed to create_job_common.
     89                                        Default: ''
     90         """
     91         download_started_time = constants.DOWNLOAD_STARTED_TIME
     92         payload_finished_time = constants.PAYLOAD_FINISHED_TIME
     93         self.mox.StubOutWithMock(rpc_utils, 'create_job_common')
     94         rpc_utils.create_job_common(mox.And(mox.StrContains(self._NAME),
     95                                     mox.StrContains(self._BUILD)),
     96                             priority=self._PRIORITY,
     97                             timeout_mins=self._TIMEOUT*60,
     98                             max_runtime_mins=self._TIMEOUT*60,
     99                             control_type='Server',
    100                             control_file=mox.And(mox.StrContains(self._BOARD),
    101                                                  mox.StrContains(self._BUILD),
    102                                                  mox.StrContains(
    103                                                      control_file_substring)),
    104                             hostless=True,
    105                             keyvals=mox.And(mox.In(download_started_time),
    106                                             mox.In(payload_finished_time))
    107                             ).AndReturn(to_return)
    108 
    109 
    110     def testStageBuildFail(self):
    111         """Ensure that a failure to stage the desired build fails the RPC."""
    112         self._setupDevserver()
    113 
    114         self.dev_server.url().AndReturn('mox_url')
    115         self.dev_server.get_server_name(mox.IgnoreArg()).AndReturn('mox_url')
    116         self.dev_server.stage_artifacts(
    117             self._BUILD, ['test_suites']).AndRaise(
    118                 dev_server.DevServerException())
    119         self.mox.ReplayAll()
    120         self.assertRaises(error.StageControlFileFailure,
    121                           site_rpc_interface.create_suite_job,
    122                           name=self._NAME,
    123                           board=self._BOARD,
    124                           build=self._BUILD,
    125                           pool=None)
    126 
    127 
    128     def testGetControlFileFail(self):
    129         """Ensure that a failure to get needed control file fails the RPC."""
    130         self._mockDevServerGetter()
    131 
    132         self.dev_server.url().AndReturn('mox_url')
    133         self.dev_server.get_server_name(mox.IgnoreArg()).AndReturn('mox_url')
    134         self.dev_server.stage_artifacts(self._BUILD,
    135                                         ['test_suites']).AndReturn(True)
    136 
    137         self.dev_server.url().AndReturn('mox_url')
    138         self.dev_server.get_server_name(mox.IgnoreArg()).AndReturn('mox_url')
    139         self.getter.get_control_file_contents_by_name(
    140             self._SUITE_NAME).AndReturn(None)
    141         self.mox.ReplayAll()
    142         self.assertRaises(error.ControlFileEmpty,
    143                           site_rpc_interface.create_suite_job,
    144                           name=self._NAME,
    145                           board=self._BOARD,
    146                           build=self._BUILD,
    147                           pool=None)
    148 
    149 
    150     def testGetControlFileListFail(self):
    151         """Ensure that a failure to get needed control file fails the RPC."""
    152         self._mockDevServerGetter()
    153 
    154         self.dev_server.url().AndReturn('mox_url')
    155         self.dev_server.get_server_name(mox.IgnoreArg()).AndReturn('mox_url')
    156         self.dev_server.stage_artifacts(self._BUILD,
    157                                         ['test_suites']).AndReturn(True)
    158 
    159         self.dev_server.url().AndReturn('mox_url')
    160         self.dev_server.get_server_name(mox.IgnoreArg()).AndReturn('mox_url')
    161         self.getter.get_control_file_contents_by_name(
    162             self._SUITE_NAME).AndRaise(error.NoControlFileList())
    163         self.mox.ReplayAll()
    164         self.assertRaises(error.NoControlFileList,
    165                           site_rpc_interface.create_suite_job,
    166                           name=self._NAME,
    167                           board=self._BOARD,
    168                           build=self._BUILD,
    169                           pool=None)
    170 
    171 
    172     def testBadNumArgument(self):
    173         """Ensure we handle bad values for the |num| argument."""
    174         self.assertRaises(error.SuiteArgumentException,
    175                           site_rpc_interface.create_suite_job,
    176                           name=self._NAME,
    177                           board=self._BOARD,
    178                           build=self._BUILD,
    179                           pool=None,
    180                           num='goo')
    181         self.assertRaises(error.SuiteArgumentException,
    182                           site_rpc_interface.create_suite_job,
    183                           name=self._NAME,
    184                           board=self._BOARD,
    185                           build=self._BUILD,
    186                           pool=None,
    187                           num=[])
    188         self.assertRaises(error.SuiteArgumentException,
    189                           site_rpc_interface.create_suite_job,
    190                           name=self._NAME,
    191                           board=self._BOARD,
    192                           build=self._BUILD,
    193                           pool=None,
    194                           num='5')
    195 
    196 
    197 
    198     def testCreateSuiteJobFail(self):
    199         """Ensure that failure to schedule the suite job fails the RPC."""
    200         self._mockDevServerGetter()
    201 
    202         self.dev_server.url().AndReturn('mox_url')
    203         self.dev_server.get_server_name(mox.IgnoreArg()).AndReturn('mox_url')
    204         self.dev_server.stage_artifacts(self._BUILD,
    205                                         ['test_suites']).AndReturn(True)
    206 
    207         self.dev_server.url().AndReturn('mox_url')
    208         self.dev_server.get_server_name(mox.IgnoreArg()).AndReturn('mox_url')
    209         self.getter.get_control_file_contents_by_name(
    210             self._SUITE_NAME).AndReturn('f')
    211 
    212         self.dev_server.url().AndReturn('mox_url')
    213         self._mockRpcUtils(-1)
    214         self.mox.ReplayAll()
    215         self.assertEquals(
    216             site_rpc_interface.create_suite_job(name=self._NAME,
    217                                                 board=self._BOARD,
    218                                                 build=self._BUILD, pool=None),
    219             -1)
    220 
    221 
    222     def testCreateSuiteJobSuccess(self):
    223         """Ensures that success results in a successful RPC."""
    224         self._mockDevServerGetter()
    225 
    226         self.dev_server.url().AndReturn('mox_url')
    227         self.dev_server.get_server_name(mox.IgnoreArg()).AndReturn('mox_url')
    228         self.dev_server.stage_artifacts(self._BUILD,
    229                                         ['test_suites']).AndReturn(True)
    230 
    231         self.dev_server.url().AndReturn('mox_url')
    232         self.dev_server.get_server_name(mox.IgnoreArg()).AndReturn('mox_url')
    233         self.getter.get_control_file_contents_by_name(
    234             self._SUITE_NAME).AndReturn('f')
    235 
    236         self.dev_server.url().AndReturn('mox_url')
    237         job_id = 5
    238         self._mockRpcUtils(job_id)
    239         self.mox.ReplayAll()
    240         self.assertEquals(
    241             site_rpc_interface.create_suite_job(name=self._NAME,
    242                                                 board=self._BOARD,
    243                                                 build=self._BUILD,
    244                                                 pool=None),
    245             job_id)
    246 
    247 
    248     def testCreateSuiteJobNoHostCheckSuccess(self):
    249         """Ensures that success results in a successful RPC."""
    250         self._mockDevServerGetter()
    251 
    252         self.dev_server.url().AndReturn('mox_url')
    253         self.dev_server.get_server_name(mox.IgnoreArg()).AndReturn('mox_url')
    254         self.dev_server.stage_artifacts(self._BUILD,
    255                                         ['test_suites']).AndReturn(True)
    256 
    257         self.dev_server.url().AndReturn('mox_url')
    258         self.dev_server.get_server_name(mox.IgnoreArg()).AndReturn('mox_url')
    259         self.getter.get_control_file_contents_by_name(
    260             self._SUITE_NAME).AndReturn('f')
    261 
    262         self.dev_server.url().AndReturn('mox_url')
    263         job_id = 5
    264         self._mockRpcUtils(job_id)
    265         self.mox.ReplayAll()
    266         self.assertEquals(
    267           site_rpc_interface.create_suite_job(name=self._NAME,
    268                                               board=self._BOARD,
    269                                               build=self._BUILD,
    270                                               pool=None, check_hosts=False),
    271           job_id)
    272 
    273     def testCreateSuiteIntegerNum(self):
    274         """Ensures that success results in a successful RPC."""
    275         self._mockDevServerGetter()
    276 
    277         self.dev_server.url().AndReturn('mox_url')
    278         self.dev_server.get_server_name(mox.IgnoreArg()).AndReturn('mox_url')
    279         self.dev_server.stage_artifacts(self._BUILD,
    280                                         ['test_suites']).AndReturn(True)
    281 
    282         self.dev_server.url().AndReturn('mox_url')
    283         self.dev_server.get_server_name(mox.IgnoreArg()).AndReturn('mox_url')
    284         self.getter.get_control_file_contents_by_name(
    285             self._SUITE_NAME).AndReturn('f')
    286 
    287         self.dev_server.url().AndReturn('mox_url')
    288         job_id = 5
    289         self._mockRpcUtils(job_id, control_file_substring='num=17')
    290         self.mox.ReplayAll()
    291         self.assertEquals(
    292             site_rpc_interface.create_suite_job(name=self._NAME,
    293                                                 board=self._BOARD,
    294                                                 build=self._BUILD,
    295                                                 pool=None,
    296                                                 check_hosts=False,
    297                                                 num=17),
    298             job_id)
    299 
    300 
    301     def testCreateSuiteJobControlFileSupplied(self):
    302         """Ensure we can supply the control file to create_suite_job."""
    303         self._mockDevServerGetter(get_control_file=False)
    304 
    305         self.dev_server.url().AndReturn('mox_url')
    306         self.dev_server.get_server_name(mox.IgnoreArg()).AndReturn('mox_url')
    307         self.dev_server.stage_artifacts(self._BUILD,
    308                                         ['test_suites']).AndReturn(True)
    309         self.dev_server.url().AndReturn('mox_url')
    310         job_id = 5
    311         self._mockRpcUtils(job_id)
    312         self.mox.ReplayAll()
    313         self.assertEquals(
    314             site_rpc_interface.create_suite_job(name='%s/%s' % (self._NAME,
    315                                                                 self._BUILD),
    316                                                 board=None,
    317                                                 build=self._BUILD,
    318                                                 pool=None,
    319                                                 control_file='CONTROL FILE'),
    320             job_id)
    321 
    322 
    323     def setIsMoblab(self, is_moblab):
    324         """Set utils.is_moblab result.
    325 
    326         @param is_moblab: Value to have utils.is_moblab to return.
    327         """
    328         self.mox.StubOutWithMock(utils, 'is_moblab')
    329         utils.is_moblab().AndReturn(is_moblab)
    330 
    331 
    332     def testMoblabOnlyDecorator(self):
    333         """Ensure the moblab only decorator gates functions properly."""
    334         self.setIsMoblab(False)
    335         self.mox.ReplayAll()
    336         self.assertRaises(error.RPCException,
    337                           site_rpc_interface.get_config_values)
    338 
    339 
    340     def testGetConfigValues(self):
    341         """Ensure that the config object is properly converted to a dict."""
    342         self.setIsMoblab(True)
    343         config_mock = self.mox.CreateMockAnything()
    344         site_rpc_interface._CONFIG = config_mock
    345         config_mock.get_sections().AndReturn(['section1', 'section2'])
    346         config_mock.config = self.mox.CreateMockAnything()
    347         config_mock.config.items('section1').AndReturn([('item1', 'value1'),
    348                                                         ('item2', 'value2')])
    349         config_mock.config.items('section2').AndReturn([('item3', 'value3'),
    350                                                         ('item4', 'value4')])
    351 
    352         rpc_utils.prepare_for_serialization(
    353             {'section1' : [('item1', 'value1'),
    354                            ('item2', 'value2')],
    355              'section2' : [('item3', 'value3'),
    356                            ('item4', 'value4')]})
    357         self.mox.ReplayAll()
    358         site_rpc_interface.get_config_values()
    359 
    360 
    361     def _mockReadFile(self, path, lines=[]):
    362         """Mock out reading a file line by line.
    363 
    364         @param path: Path of the file we are mock reading.
    365         @param lines: lines of the mock file that will be returned when
    366                       readLine() is called.
    367         """
    368         mockFile = self.mox.CreateMockAnything()
    369         for line in lines:
    370             mockFile.readline().AndReturn(line)
    371         mockFile.readline()
    372         mockFile.close()
    373         open(path).AndReturn(mockFile)
    374 
    375 
    376     def testUpdateConfig(self):
    377         """Ensure that updating the config works as expected."""
    378         self.setIsMoblab(True)
    379         site_rpc_interface.os = self.mox.CreateMockAnything()
    380 
    381         self.mox.StubOutWithMock(__builtin__, 'open')
    382         self._mockReadFile(global_config.DEFAULT_CONFIG_FILE)
    383 
    384         self.mox.StubOutWithMock(lsbrelease_utils, 'is_moblab')
    385         lsbrelease_utils.is_moblab().AndReturn(True)
    386 
    387         self._mockReadFile(global_config.DEFAULT_MOBLAB_FILE,
    388                            ['[section1]', 'item1: value1'])
    389 
    390         site_rpc_interface.os = self.mox.CreateMockAnything()
    391         site_rpc_interface.os.path = self.mox.CreateMockAnything()
    392         site_rpc_interface.os.path.exists(
    393                 site_rpc_interface._CONFIG.shadow_file).AndReturn(
    394                 True)
    395         mockShadowFile = self.mox.CreateMockAnything()
    396         mockShadowFileContents = StringIO.StringIO()
    397         mockShadowFile.__enter__().AndReturn(mockShadowFileContents)
    398         mockShadowFile.__exit__(mox.IgnoreArg(), mox.IgnoreArg(),
    399                                 mox.IgnoreArg())
    400         open(site_rpc_interface._CONFIG.shadow_file,
    401              'w').AndReturn(mockShadowFile)
    402         site_rpc_interface.os.system('sudo reboot')
    403 
    404         self.mox.ReplayAll()
    405         site_rpc_interface.update_config_handler(
    406                 {'section1' : [('item1', 'value1'),
    407                                ('item2', 'value2')],
    408                  'section2' : [('item3', 'value3'),
    409                                ('item4', 'value4')]})
    410 
    411         # item1 should not be in the new shadow config as its updated value
    412         # matches the original config's value.
    413         self.assertEquals(
    414                 mockShadowFileContents.getvalue(),
    415                 '[section2]\nitem3 = value3\nitem4 = value4\n\n'
    416                 '[section1]\nitem2 = value2\n\n')
    417 
    418 
    419     def testResetConfig(self):
    420         """Ensure that reset opens the shadow_config file for writing."""
    421         self.setIsMoblab(True)
    422         config_mock = self.mox.CreateMockAnything()
    423         site_rpc_interface._CONFIG = config_mock
    424         config_mock.shadow_file = 'shadow_config.ini'
    425         self.mox.StubOutWithMock(__builtin__, 'open')
    426         mockFile = self.mox.CreateMockAnything()
    427         file_contents = self.mox.CreateMockAnything()
    428         mockFile.__enter__().AndReturn(file_contents)
    429         mockFile.__exit__(mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg())
    430         open(config_mock.shadow_file, 'w').AndReturn(mockFile)
    431         site_rpc_interface.os = self.mox.CreateMockAnything()
    432         site_rpc_interface.os.system('sudo reboot')
    433         self.mox.ReplayAll()
    434         site_rpc_interface.reset_config_settings()
    435 
    436 
    437     def testSetBotoKey(self):
    438         """Ensure that the botokey path supplied is copied correctly."""
    439         self.setIsMoblab(True)
    440         boto_key = '/tmp/boto'
    441         site_rpc_interface.os.path = self.mox.CreateMockAnything()
    442         site_rpc_interface.os.path.exists(boto_key).AndReturn(
    443                 True)
    444         site_rpc_interface.shutil = self.mox.CreateMockAnything()
    445         site_rpc_interface.shutil.copyfile(
    446                 boto_key, site_rpc_interface.MOBLAB_BOTO_LOCATION)
    447         self.mox.ReplayAll()
    448         site_rpc_interface.set_boto_key(boto_key)
    449 
    450 
    451     def testSetLaunchControlKey(self):
    452         """Ensure that the Launch Control key path supplied is copied correctly.
    453         """
    454         self.setIsMoblab(True)
    455         launch_control_key = '/tmp/launch_control'
    456         site_rpc_interface.os = self.mox.CreateMockAnything()
    457         site_rpc_interface.os.path = self.mox.CreateMockAnything()
    458         site_rpc_interface.os.path.exists(launch_control_key).AndReturn(
    459                 True)
    460         site_rpc_interface.shutil = self.mox.CreateMockAnything()
    461         site_rpc_interface.shutil.copyfile(
    462                 launch_control_key,
    463                 moblab_host.MOBLAB_LAUNCH_CONTROL_KEY_LOCATION)
    464         site_rpc_interface.os.system('sudo restart moblab-devserver-init')
    465         self.mox.ReplayAll()
    466         site_rpc_interface.set_launch_control_key(launch_control_key)
    467 
    468 
    469     def _get_records_for_sending_to_master(self):
    470         return [{'control_file': 'foo',
    471                  'control_type': 1,
    472                  'created_on': datetime.datetime(2014, 8, 21),
    473                  'drone_set': None,
    474                  'email_list': '',
    475                  'max_runtime_hrs': 72,
    476                  'max_runtime_mins': 1440,
    477                  'name': 'dummy',
    478                  'owner': 'autotest_system',
    479                  'parse_failed_repair': True,
    480                  'priority': 40,
    481                  'reboot_after': 0,
    482                  'reboot_before': 1,
    483                  'run_reset': True,
    484                  'run_verify': False,
    485                  'synch_count': 0,
    486                  'test_retry': 10,
    487                  'timeout': 24,
    488                  'timeout_mins': 1440,
    489                  'id': 1
    490                  }], [{
    491                     'aborted': False,
    492                     'active': False,
    493                     'complete': False,
    494                     'deleted': False,
    495                     'execution_subdir': '',
    496                     'finished_on': None,
    497                     'started_on': None,
    498                     'status': 'Queued',
    499                     'id': 1
    500                 }]
    501 
    502 
    503     def _do_heartbeat_and_assert_response(self, shard_hostname='shard1',
    504                                           upload_jobs=(), upload_hqes=(),
    505                                           known_jobs=(), known_hosts=(),
    506                                           **kwargs):
    507         known_job_ids = [job.id for job in known_jobs]
    508         known_host_ids = [host.id for host in known_hosts]
    509         known_host_statuses = [host.status for host in known_hosts]
    510 
    511         retval = site_rpc_interface.shard_heartbeat(
    512             shard_hostname=shard_hostname,
    513             jobs=upload_jobs, hqes=upload_hqes,
    514             known_job_ids=known_job_ids, known_host_ids=known_host_ids,
    515             known_host_statuses=known_host_statuses)
    516 
    517         self._assert_shard_heartbeat_response(shard_hostname, retval,
    518                                               **kwargs)
    519 
    520         return shard_hostname
    521 
    522 
    523     def _assert_shard_heartbeat_response(self, shard_hostname, retval, jobs=[],
    524                                          hosts=[], hqes=[]):
    525 
    526         retval_hosts, retval_jobs = retval['hosts'], retval['jobs']
    527 
    528         expected_jobs = [
    529             (job.id, job.name, shard_hostname) for job in jobs]
    530         returned_jobs = [(job['id'], job['name'], job['shard']['hostname'])
    531                          for job in retval_jobs]
    532         self.assertEqual(returned_jobs, expected_jobs)
    533 
    534         expected_hosts = [(host.id, host.hostname) for host in hosts]
    535         returned_hosts = [(host['id'], host['hostname'])
    536                           for host in retval_hosts]
    537         self.assertEqual(returned_hosts, expected_hosts)
    538 
    539         retval_hqes = []
    540         for job in retval_jobs:
    541             retval_hqes += job['hostqueueentry_set']
    542 
    543         expected_hqes = [(hqe.id) for hqe in hqes]
    544         returned_hqes = [(hqe['id']) for hqe in retval_hqes]
    545         self.assertEqual(returned_hqes, expected_hqes)
    546 
    547 
    548     def _send_records_to_master_helper(
    549         self, jobs, hqes, shard_hostname='host1',
    550         exception_to_throw=error.UnallowedRecordsSentToMaster, aborted=False):
    551         job_id = rpc_interface.create_job(name='dummy', priority='Medium',
    552                                           control_file='foo',
    553                                           control_type=SERVER,
    554                                           test_retry=10, hostless=True)
    555         job = models.Job.objects.get(pk=job_id)
    556         shard = models.Shard.objects.create(hostname='host1')
    557         job.shard = shard
    558         job.save()
    559 
    560         if aborted:
    561             job.hostqueueentry_set.update(aborted=True)
    562             job.shard = None
    563             job.save()
    564 
    565         hqe = job.hostqueueentry_set.all()[0]
    566         if not exception_to_throw:
    567             self._do_heartbeat_and_assert_response(
    568                 shard_hostname=shard_hostname,
    569                 upload_jobs=jobs, upload_hqes=hqes)
    570         else:
    571             self.assertRaises(
    572                 exception_to_throw,
    573                 self._do_heartbeat_and_assert_response,
    574                 shard_hostname=shard_hostname,
    575                 upload_jobs=jobs, upload_hqes=hqes)
    576 
    577 
    578     def testSendingRecordsToMaster(self):
    579         """Send records to the master and ensure they are persisted."""
    580         jobs, hqes = self._get_records_for_sending_to_master()
    581         hqes[0]['status'] = 'Completed'
    582         self._send_records_to_master_helper(
    583             jobs=jobs, hqes=hqes, exception_to_throw=None)
    584 
    585         # Check the entry was actually written to db
    586         self.assertEqual(models.HostQueueEntry.objects.all()[0].status,
    587                          'Completed')
    588 
    589 
    590     def testSendingRecordsToMasterAbortedOnMaster(self):
    591         """Send records to the master and ensure they are persisted."""
    592         jobs, hqes = self._get_records_for_sending_to_master()
    593         hqes[0]['status'] = 'Completed'
    594         self._send_records_to_master_helper(
    595             jobs=jobs, hqes=hqes, exception_to_throw=None, aborted=True)
    596 
    597         # Check the entry was actually written to db
    598         self.assertEqual(models.HostQueueEntry.objects.all()[0].status,
    599                          'Completed')
    600 
    601 
    602     def testSendingRecordsToMasterJobAssignedToDifferentShard(self):
    603         """Ensure records that belong to a different shard are rejected."""
    604         jobs, hqes = self._get_records_for_sending_to_master()
    605         models.Shard.objects.create(hostname='other_shard')
    606         self._send_records_to_master_helper(
    607             jobs=jobs, hqes=hqes, shard_hostname='other_shard')
    608 
    609 
    610     def testSendingRecordsToMasterJobHqeWithoutJob(self):
    611         """Ensure update for hqe without update for it's job gets rejected."""
    612         _, hqes = self._get_records_for_sending_to_master()
    613         self._send_records_to_master_helper(
    614             jobs=[], hqes=hqes)
    615 
    616 
    617     def testSendingRecordsToMasterNotExistingJob(self):
    618         """Ensure update for non existing job gets rejected."""
    619         jobs, hqes = self._get_records_for_sending_to_master()
    620         jobs[0]['id'] = 3
    621 
    622         self._send_records_to_master_helper(
    623             jobs=jobs, hqes=hqes)
    624 
    625 
    626     def _createShardAndHostWithLabel(self, shard_hostname='shard1',
    627                                      host_hostname='host1',
    628                                      label_name='board:lumpy'):
    629         label = models.Label.objects.create(name=label_name)
    630 
    631         shard = models.Shard.objects.create(hostname=shard_hostname)
    632         shard.labels.add(label)
    633 
    634         host = models.Host.objects.create(hostname=host_hostname, leased=False)
    635         host.labels.add(label)
    636 
    637         return shard, host, label
    638 
    639 
    640     def _createJobForLabel(self, label):
    641         job_id = rpc_interface.create_job(name='dummy', priority='Medium',
    642                                           control_file='foo',
    643                                           control_type=CLIENT,
    644                                           meta_hosts=[label.name],
    645                                           dependencies=(label.name,))
    646         return models.Job.objects.get(id=job_id)
    647 
    648 
    649     def testShardHeartbeatFetchHostlessJob(self):
    650         """Create a hostless job and ensure it's not assigned to a shard."""
    651         shard1, host1, lumpy_label = self._createShardAndHostWithLabel(
    652             'shard1', 'host1', 'board:lumpy')
    653 
    654         label2 = models.Label.objects.create(name='bluetooth', platform=False)
    655 
    656         job1 = self._create_job(hostless=True)
    657 
    658         # Hostless jobs should be executed by the global scheduler.
    659         self._do_heartbeat_and_assert_response(hosts=[host1])
    660 
    661 
    662     def testShardRetrieveJobs(self):
    663         """Create jobs and retrieve them."""
    664         # should never be returned by heartbeat
    665         leased_host = models.Host.objects.create(hostname='leased_host',
    666                                                  leased=True)
    667 
    668         shard1, host1, lumpy_label = self._createShardAndHostWithLabel()
    669         shard2, host2, grumpy_label = self._createShardAndHostWithLabel(
    670             'shard2', 'host2', 'board:grumpy')
    671 
    672         leased_host.labels.add(lumpy_label)
    673 
    674         job1 = self._createJobForLabel(lumpy_label)
    675 
    676         job2 = self._createJobForLabel(grumpy_label)
    677 
    678         job_completed = self._createJobForLabel(lumpy_label)
    679         # Job is already being run, so don't sync it
    680         job_completed.hostqueueentry_set.update(complete=True)
    681         job_completed.hostqueueentry_set.create(complete=False)
    682 
    683         job_active = self._createJobForLabel(lumpy_label)
    684         # Job is already started, so don't sync it
    685         job_active.hostqueueentry_set.update(active=True)
    686         job_active.hostqueueentry_set.create(complete=False, active=False)
    687 
    688         self._do_heartbeat_and_assert_response(
    689             jobs=[job1], hosts=[host1], hqes=job1.hostqueueentry_set.all())
    690 
    691         self._do_heartbeat_and_assert_response(
    692             shard_hostname=shard2.hostname,
    693             jobs=[job2], hosts=[host2], hqes=job2.hostqueueentry_set.all())
    694 
    695         host3 = models.Host.objects.create(hostname='host3', leased=False)
    696         host3.labels.add(lumpy_label)
    697 
    698         self._do_heartbeat_and_assert_response(
    699             known_jobs=[job1], known_hosts=[host1], hosts=[host3])
    700 
    701 
    702     def testResendJobsAfterFailedHeartbeat(self):
    703         """Create jobs, retrieve them, fail on client, fetch them again."""
    704         shard1, host1, lumpy_label = self._createShardAndHostWithLabel()
    705 
    706         job1 = self._createJobForLabel(lumpy_label)
    707 
    708         self._do_heartbeat_and_assert_response(
    709             jobs=[job1],
    710             hqes=job1.hostqueueentry_set.all(), hosts=[host1])
    711 
    712         # Make sure it's resubmitted by sending last_job=None again
    713         self._do_heartbeat_and_assert_response(
    714             known_hosts=[host1],
    715             jobs=[job1], hqes=job1.hostqueueentry_set.all(), hosts=[])
    716 
    717         # Now it worked, make sure it's not sent again
    718         self._do_heartbeat_and_assert_response(
    719             known_jobs=[job1], known_hosts=[host1])
    720 
    721         job1 = models.Job.objects.get(pk=job1.id)
    722         job1.hostqueueentry_set.all().update(complete=True)
    723 
    724         # Job is completed, make sure it's not sent again
    725         self._do_heartbeat_and_assert_response(
    726             known_hosts=[host1])
    727 
    728         job2 = self._createJobForLabel(lumpy_label)
    729 
    730         # job2's creation was later, it should be returned now.
    731         self._do_heartbeat_and_assert_response(
    732             known_hosts=[host1],
    733             jobs=[job2], hqes=job2.hostqueueentry_set.all())
    734 
    735         self._do_heartbeat_and_assert_response(
    736             known_jobs=[job2], known_hosts=[host1])
    737 
    738         job2 = models.Job.objects.get(pk=job2.pk)
    739         job2.hostqueueentry_set.update(aborted=True)
    740         # Setting a job to a complete status will set the shard_id to None in
    741         # scheduler_models. We have to emulate that here, because we use Django
    742         # models in tests.
    743         job2.shard = None
    744         job2.save()
    745 
    746         self._do_heartbeat_and_assert_response(
    747             known_jobs=[job2], known_hosts=[host1],
    748             jobs=[job2],
    749             hqes=job2.hostqueueentry_set.all())
    750 
    751         site_rpc_interface.delete_shard(hostname=shard1.hostname)
    752 
    753         self.assertRaises(
    754             models.Shard.DoesNotExist, models.Shard.objects.get, pk=shard1.id)
    755 
    756         job1 = models.Job.objects.get(pk=job1.id)
    757         lumpy_label = models.Label.objects.get(pk=lumpy_label.id)
    758         host1 = models.Host.objects.get(pk=host1.id)
    759 
    760         self.assertIsNone(job1.shard)
    761         self.assertEqual(len(lumpy_label.shard_set.all()), 0)
    762         self.assertIsNone(host1.shard)
    763         self.assertEqual([s.task for s in host1.specialtask_set.all()],
    764                          ['Repair'])
    765 
    766 
    767     def testCreateListShard(self):
    768         """Retrieve a list of all shards."""
    769         lumpy_label = models.Label.objects.create(name='board:lumpy',
    770                                                   platform=True)
    771         stumpy_label = models.Label.objects.create(name='board:stumpy',
    772                                                   platform=True)
    773 
    774         shard_id = site_rpc_interface.add_shard(
    775             hostname='host1', labels='board:lumpy,board:stumpy')
    776         self.assertRaises(model_logic.ValidationError,
    777                           site_rpc_interface.add_shard,
    778                           hostname='host1', labels='board:lumpy,board:stumpy')
    779         shard = models.Shard.objects.get(pk=shard_id)
    780         self.assertEqual(shard.hostname, 'host1')
    781         self.assertEqual(shard.labels.values_list('pk')[0], (lumpy_label.id,))
    782         self.assertEqual(shard.labels.values_list('pk')[1], (stumpy_label.id,))
    783 
    784         self.assertEqual(site_rpc_interface.get_shards(),
    785                          [{'labels': ['board:lumpy','board:stumpy'],
    786                            'hostname': 'host1',
    787                            'id': 1}])
    788 
    789 
    790     def testResendHostsAfterFailedHeartbeat(self):
    791         """Check that master accepts resending updated records after failure."""
    792         shard1, host1, lumpy_label = self._createShardAndHostWithLabel()
    793 
    794         # Send the host
    795         self._do_heartbeat_and_assert_response(hosts=[host1])
    796 
    797         # Send it again because previous one didn't persist correctly
    798         self._do_heartbeat_and_assert_response(hosts=[host1])
    799 
    800         # Now it worked, make sure it isn't sent again
    801         self._do_heartbeat_and_assert_response(known_hosts=[host1])
    802 
    803 
    804 if __name__ == '__main__':
    805   unittest.main()
    806