Home | History | Annotate | Download | only in scheduler
      1 #!/usr/bin/python
      2 #pylint: disable-msg=C0111
      3 
      4 # Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
      5 # Use of this source code is governed by a BSD-style license that can be
      6 # found in the LICENSE file.
      7 
      8 import mock
      9 import unittest
     10 
     11 import common
     12 from autotest_lib.frontend import setup_django_environment
     13 from autotest_lib.frontend.afe import frontend_test_utils
     14 from autotest_lib.frontend.afe import models
     15 from autotest_lib.server.cros.dynamic_suite import constants
     16 from autotest_lib.scheduler import host_scheduler
     17 from autotest_lib.scheduler import monitor_db
     18 from autotest_lib.scheduler import rdb
     19 from autotest_lib.scheduler import rdb_lib
     20 from autotest_lib.scheduler import rdb_testing_utils
     21 from autotest_lib.scheduler import scheduler_models
     22 
     23 
     24 class QueryManagerTests(rdb_testing_utils.AbstractBaseRDBTester,
     25                         unittest.TestCase):
     26     """Verify scheduler behavior when pending jobs are already given hosts."""
     27 
     28     _config_section = 'AUTOTEST_WEB'
     29 
     30 
     31     def testPendingQueueEntries(self):
     32         """Test retrieval of pending queue entries."""
     33         job = self.create_job(deps=set(['a']))
     34 
     35         # Check that we don't pull the job we just created with only_hostless.
     36         jobs_with_hosts = self.job_query_manager.get_pending_queue_entries(
     37                 only_hostless=True)
     38         self.assertTrue(len(jobs_with_hosts) == 0)
     39 
     40         # Check that only_hostless=False pulls new jobs, as always.
     41         jobs_without_hosts = self.job_query_manager.get_pending_queue_entries(
     42                 only_hostless=False)
     43         self.assertTrue(jobs_without_hosts[0].id == job.id and
     44                         jobs_without_hosts[0].host_id is None)
     45 
     46 
     47     def testPendingQueueEntriesForShard(self):
     48         """Test queue entries for shards aren't executed by master scheduler"""
     49         job1 = self.create_job(deps=set(['a']))
     50         job2 = self.create_job(deps=set(['b']))
     51         shard = models.Shard.objects.create()
     52         # Assign the job's label to a shard
     53         shard.labels.add(job1.dependency_labels.all()[0])
     54 
     55         # Check that we only pull jobs which are not assigned to a shard.
     56         jobs_with_hosts = self.job_query_manager.get_pending_queue_entries()
     57         self.assertTrue(len(jobs_with_hosts) == 1)
     58         self.assertEqual(jobs_with_hosts[0].id, job2.id)
     59 
     60 
     61     def testHostQueries(self):
     62         """Verify that the host query manager maintains its data structures."""
     63         # Create a job and use the host_query_managers internal datastructures
     64         # to retrieve its job info.
     65         job = self.create_job(
     66                 deps=rdb_testing_utils.DEFAULT_DEPS,
     67                 acls=rdb_testing_utils.DEFAULT_ACLS)
     68         queue_entries = self._dispatcher._refresh_pending_queue_entries()
     69         job_manager = rdb_lib.JobQueryManager(queue_entries)
     70         job_info = job_manager.get_job_info(queue_entries[0])
     71         default_dep_ids = set([label.id for label in self.db_helper.get_labels(
     72                 name__in=rdb_testing_utils.DEFAULT_DEPS)])
     73         default_acl_ids = set([acl.id for acl in self.db_helper.get_acls(
     74                 name__in=rdb_testing_utils.DEFAULT_ACLS)])
     75         self.assertTrue(set(job_info['deps']) == default_dep_ids)
     76         self.assertTrue(set(job_info['acls']) == default_acl_ids)
     77 
     78 
     79     def testNewJobsWithHosts(self):
     80         """Test that we handle inactive hqes with unleased hosts correctly."""
     81         # Create a job and assign it an unleased host, then check that the
     82         # HQE becomes active and the host remains assigned to it.
     83         job = self.create_job(deps=['a'])
     84         host = self.db_helper.create_host('h1', deps=['a'])
     85         self.db_helper.add_host_to_job(host, job.id)
     86 
     87         queue_entries = self._dispatcher._refresh_pending_queue_entries()
     88         self._dispatcher._schedule_new_jobs()
     89 
     90         host = self.db_helper.get_host(hostname='h1')[0]
     91         self.assertTrue(host.leased == True and
     92                         host.status == models.Host.Status.READY)
     93         hqes = list(self.db_helper.get_hqes(host_id=host.id))
     94         self.assertTrue(len(hqes) == 1 and hqes[0].active and
     95                         hqes[0].status == models.HostQueueEntry.Status.QUEUED)
     96 
     97 
     98     def testNewJobsWithInvalidHost(self):
     99         """Test handling of inactive hqes assigned invalid, unleased hosts."""
    100         # Create a job and assign it an unleased host, then check that the
    101         # HQE becomes DOES NOT become active, because we validate the
    102         # assignment again.
    103         job = self.create_job(deps=['a'])
    104         host = self.db_helper.create_host('h1', deps=['b'])
    105         self.db_helper.add_host_to_job(host, job.id)
    106 
    107         queue_entries = self._dispatcher._refresh_pending_queue_entries()
    108         self._dispatcher._schedule_new_jobs()
    109 
    110         host = self.db_helper.get_host(hostname='h1')[0]
    111         self.assertTrue(host.leased == False and
    112                         host.status == models.Host.Status.READY)
    113         hqes = list(self.db_helper.get_hqes(host_id=host.id))
    114         self.assertTrue(len(hqes) == 1 and not hqes[0].active and
    115                         hqes[0].status == models.HostQueueEntry.Status.QUEUED)
    116 
    117 
    118     def testNewJobsWithLeasedHost(self):
    119         """Test handling of inactive hqes assigned leased hosts."""
    120         # Create a job and assign it a leased host, then check that the
    121         # HQE does not become active through the scheduler, and that the
    122         # host gets released.
    123         job = self.create_job(deps=['a'])
    124         host = self.db_helper.create_host('h1', deps=['b'])
    125         self.db_helper.add_host_to_job(host, job.id)
    126         host.leased = 1
    127         host.save()
    128 
    129         rdb.batch_acquire_hosts = mock.MagicMock()
    130         queue_entries = self._dispatcher._refresh_pending_queue_entries()
    131         self._dispatcher._schedule_new_jobs()
    132         self.assertTrue(rdb.batch_acquire_hosts.call_count == 0)
    133         host = self.db_helper.get_host(hostname='h1')[0]
    134         self.assertTrue(host.leased == True and
    135                         host.status == models.Host.Status.READY)
    136         hqes = list(self.db_helper.get_hqes(host_id=host.id))
    137         self.assertTrue(len(hqes) == 1 and not hqes[0].active and
    138                         hqes[0].status == models.HostQueueEntry.Status.QUEUED)
    139         self.host_scheduler._release_hosts()
    140         self.assertTrue(self.db_helper.get_host(hostname='h1')[0].leased == 0)
    141 
    142 
    143     def testSpecialTaskOrdering(self):
    144         """Test priority ordering of special tasks."""
    145 
    146         # Create 2 special tasks, one with and one without an hqe.
    147         # Activate the hqe and make sure it gets scheduled before the other.
    148         host = self.db_helper.create_host('h1', deps=['a'])
    149         job1 = self.create_job(deps=['a'])
    150         self.db_helper.add_host_to_job(host, job1.id)
    151         task1 = self.db_helper.create_special_task(job1.id)
    152         hqe = self.db_helper.get_hqes(job=job1.id)[0]
    153 
    154         # This task has no queue entry.
    155         task2 = self.db_helper.create_special_task(host_id=host.id)
    156 
    157         # Since the hqe task isn't active we get both back.
    158         tasks = self.job_query_manager.get_prioritized_special_tasks()
    159         self.assertTrue(tasks[1].queue_entry_id is None and
    160                         tasks[0].queue_entry_id == hqe.id)
    161 
    162         # Activate the hqe and make sure the frontned task isn't returned.
    163         self.db_helper.update_hqe(hqe.id, active=True)
    164         tasks = self.job_query_manager.get_prioritized_special_tasks()
    165         self.assertTrue(tasks[0].id == task1.id)
    166 
    167 
    168 class HostSchedulerTests(rdb_testing_utils.AbstractBaseRDBTester,
    169                          unittest.TestCase):
    170     """Verify scheduler behavior when pending jobs are already given hosts."""
    171 
    172     _config_section = 'AUTOTEST_WEB'
    173 
    174 
    175     def setUp(self):
    176         super(HostSchedulerTests, self).setUp()
    177         self.host_scheduler = host_scheduler.HostScheduler()
    178 
    179 
    180     def testSpecialTaskLocking(self):
    181         """Test that frontend special tasks lock hosts."""
    182         # Create multiple tasks with hosts and make sure the hosts get locked.
    183         host = self.db_helper.create_host('h')
    184         host1 = self.db_helper.create_host('h1')
    185         task = self.db_helper.create_special_task(host_id=host.id)
    186         task1 = self.db_helper.create_special_task(host_id=host1.id)
    187         self.host_scheduler._lease_hosts_of_frontend_tasks()
    188         self.assertTrue(self.db_helper.get_host(hostname='h')[0].leased == 1 and
    189                         self.db_helper.get_host(hostname='h1')[0].leased == 1)
    190 
    191 
    192     def testJobScheduling(self):
    193         """Test new host acquisitions."""
    194         # Create a job that will find a host through the host scheduler, and
    195         # make sure the hqe is activated, and a special task is created.
    196         job = self.create_job(deps=set(['a']))
    197         host = self.db_helper.create_host('h1', deps=set(['a']))
    198         self.host_scheduler._schedule_jobs()
    199         hqe = self.db_helper.get_hqes(job_id=job.id)[0]
    200         self.assertTrue(hqe.active and hqe.host_id == host.id and
    201                         hqe.status == models.HostQueueEntry.Status.QUEUED)
    202         task = self.db_helper.get_tasks(queue_entry_id=hqe.id)[0]
    203         self.assertTrue(task.is_active == 0 and task.host_id == host.id)
    204 
    205 
    206     def _check_agent_invariants(self, host, agent):
    207         host_agents = list(self._dispatcher._host_agents[host.id])
    208         self.assertTrue(len(host_agents) == 1)
    209         self.assertTrue(host_agents[0].task.task.id == agent.id)
    210         return host_agents[0]
    211 
    212 
    213     def testLeasedFrontendTaskHost(self):
    214         """Check that we don't scheduler a special task on an unleased host."""
    215         # Create a special task without an hqe and make sure it isn't returned
    216         # for scheduling till its host is leased.
    217         host = self.db_helper.create_host('h1', deps=['a'])
    218         task = self.db_helper.create_special_task(host_id=host.id)
    219 
    220         tasks = self.job_query_manager.get_prioritized_special_tasks(
    221                 only_tasks_with_leased_hosts=True)
    222         self.assertTrue(tasks == [])
    223         tasks = self.job_query_manager.get_prioritized_special_tasks(
    224                 only_tasks_with_leased_hosts=False)
    225         self.assertTrue(tasks[0].id == task.id)
    226         self.host_scheduler._lease_hosts_of_frontend_tasks()
    227         tasks = self.job_query_manager.get_prioritized_special_tasks(
    228                 only_tasks_with_leased_hosts=True)
    229         self.assertTrue(tasks[0].id == task.id)
    230 
    231 
    232     def testTickLockStep(self):
    233         """Check that a frontend task and an hqe never run simultaneously."""
    234 
    235         self.god.stub_with(monitor_db, '_inline_host_acquisition', False)
    236 
    237         # Create a frontend special task against a host.
    238         host = self.db_helper.create_host('h1', deps=set(['a']))
    239         frontend_task = self.db_helper.create_special_task(host_id=host.id)
    240         self._dispatcher._schedule_special_tasks()
    241         # The frontend special task shouldn't get scheduled on the host till
    242         # the host is leased.
    243         self.assertFalse(self._dispatcher.host_has_agent(host))
    244 
    245         # Create a job for the same host and make the host scheduler lease the
    246         # host out to that job.
    247         job =  self.create_job(deps=set(['a']))
    248         self.host_scheduler._schedule_jobs()
    249         hqe = self.db_helper.get_hqes(job_id=job.id)[0]
    250         tasks = self.job_query_manager.get_prioritized_special_tasks(
    251                 only_tasks_with_leased_hosts=True)
    252         # We should not find the frontend special task, even though its host is
    253         # now leased, because its leased by an active hqe.
    254         self.assertTrue(len(tasks) == 1 and tasks[0].queue_entry_id == hqe.id)
    255         self._dispatcher._schedule_special_tasks()
    256         self.assertTrue(self._dispatcher.host_has_agent(host))
    257 
    258         # Deactivate the hqe task and make sure the frontend task gets the host.
    259         task = tasks[0]
    260         self._dispatcher.remove_agent(self._check_agent_invariants(host, task))
    261         task.is_complete = 1
    262         task.is_active = 0
    263         task.save()
    264         self.db_helper.update_hqe(hqe.id, active=False)
    265         self._dispatcher._schedule_special_tasks()
    266         self.assertTrue(self._dispatcher.host_has_agent(host))
    267         self._check_agent_invariants(host, frontend_task)
    268 
    269         # Make sure we don't release the host being used by the incomplete task.
    270         self.host_scheduler._release_hosts()
    271         host = self.db_helper.get_host(hostname='h1')[0]
    272         self.assertTrue(host.leased == True)
    273 
    274 
    275 class SuiteRecorderTest(rdb_testing_utils.AbstractBaseRDBTester,
    276                         unittest.TestCase):
    277     """Test the functionality of SuiteRecorder"""
    278 
    279     _config_section = 'AUTOTEST_WEB'
    280 
    281     def testGetSuiteHostAssignment(self):
    282         """Test the initialization of SuiteRecord."""
    283         hosts = []
    284         num = 4
    285         for i in range (0, num):
    286             hosts.append(self.db_helper.create_host(
    287                 'h%d' % i, deps=set(['board:lumpy'])))
    288         single_job =  self.create_job(deps=set(['a']))
    289         jobs_1 = self.create_suite(num=2, board='board:lumpy')
    290         jobs_2 = self.create_suite(num=2, board='board:lumpy')
    291         # We have 4 hosts, 5 jobs, one job in the second suite won't
    292         # get a host.
    293         all_jobs = ([single_job] +
    294                     [jobs_1[k] for k in jobs_1 if k !='parent_job'] +
    295                     [jobs_2[k] for k in jobs_2 if k !='parent_job'])
    296         for i in range(0, num):
    297             self.db_helper.add_host_to_job(hosts[i], all_jobs[i].id,
    298                                            activate=True)
    299         r = host_scheduler.SuiteRecorder(self.job_query_manager)
    300         self.assertEqual(r.suite_host_num,
    301                          {jobs_1['parent_job'].id:2,
    302                           jobs_2['parent_job'].id:1})
    303         self.assertEqual(r.hosts_to_suites,
    304                          {hosts[1].id: jobs_1['parent_job'].id,
    305                           hosts[2].id: jobs_1['parent_job'].id,
    306                           hosts[3].id: jobs_2['parent_job'].id})
    307 
    308 
    309     def verify_state(self, recorder, suite_host_num, hosts_to_suites):
    310         """Verify the suite, host information held by SuiteRecorder.
    311 
    312         @param recorder: A SuiteRecorder object.
    313         @param suite_host_num: a dict, expected value of suite_host_num.
    314         @param hosts_to_suites: a dict, expected value of hosts_to_suites.
    315         """
    316         self.assertEqual(recorder.suite_host_num, suite_host_num)
    317         self.assertEqual(recorder.hosts_to_suites, hosts_to_suites)
    318 
    319 
    320     def assign_host_to_job(self, host, job, recorder=None):
    321         """A helper function that adds a host to a job and record it.
    322 
    323         @param host: A Host object.
    324         @param job: A Job object.
    325         @param recorder: A SuiteRecorder object to record the assignment.
    326 
    327         @return a HostQueueEntry object that binds the host and job together.
    328         """
    329         self.db_helper.add_host_to_job(host, job)
    330         hqe = scheduler_models.HostQueueEntry.fetch(where='job_id=%s',
    331                                                      params=(job.id,))[0]
    332         if recorder:
    333             recorder.record_assignment(hqe)
    334         return hqe
    335 
    336 
    337     def testRecordAssignmentAndRelease(self):
    338         """Test when a host is assigned to suite"""
    339         r = host_scheduler.SuiteRecorder(self.job_query_manager)
    340         self.verify_state(r, {}, {})
    341         host1 = self.db_helper.create_host('h1')
    342         host2 = self.db_helper.create_host('h2')
    343         jobs = self.create_suite(num=2)
    344         hqe = scheduler_models.HostQueueEntry.fetch(where='job_id=%s',
    345                                                      params=(jobs[0].id,))[0]
    346         # HQE got a host.
    347         hqe = self.assign_host_to_job(host1, jobs[0], r)
    348         self.verify_state(r, {jobs['parent_job'].id:1},
    349                           {host1.id: jobs['parent_job'].id})
    350         # Tried to call record_assignment again, nothing should happen.
    351         r.record_assignment(hqe)
    352         self.verify_state(r, {jobs['parent_job'].id:1},
    353                           {host1.id: jobs['parent_job'].id})
    354         # Second hqe got a host
    355         self.assign_host_to_job(host2, jobs[1], r)
    356         self.verify_state(r, {jobs['parent_job'].id:2},
    357                           {host1.id: jobs['parent_job'].id,
    358                            host2.id: jobs['parent_job'].id})
    359         # Release host1
    360         r.record_release([host1])
    361         self.verify_state(r, {jobs['parent_job'].id:1},
    362                           {host2.id: jobs['parent_job'].id})
    363         # Release host2
    364         r.record_release([host2])
    365         self.verify_state(r, {}, {})
    366 
    367 
    368     def testGetMinDuts(self):
    369         """Test get min dut for suite."""
    370         host1 = self.db_helper.create_host('h1')
    371         host2 = self.db_helper.create_host('h2')
    372         host3 = self.db_helper.create_host('h3')
    373         jobs = self.create_suite(num=3)
    374         pid = jobs['parent_job'].id
    375         # Set min_dut=1 for the suite as a job keyval.
    376         keyval = models.JobKeyval(
    377                 job_id=pid, key=constants.SUITE_MIN_DUTS_KEY, value=2)
    378         keyval.save()
    379         r = host_scheduler.SuiteRecorder(self.job_query_manager)
    380         # Not job has got any host, min dut to request should equal to what's
    381         # specified in the job keyval.
    382         self.assertEqual(r.get_min_duts([pid]), {pid: 2})
    383         self.assign_host_to_job(host1, jobs[0], r)
    384         self.assertEqual(r.get_min_duts([pid]), {pid: 1})
    385         self.assign_host_to_job(host2, jobs[1], r)
    386         self.assertEqual(r.get_min_duts([pid]), {pid: 0})
    387         self.assign_host_to_job(host3, jobs[2], r)
    388         self.assertEqual(r.get_min_duts([pid]), {pid: 0})
    389         r.record_release([host1])
    390         self.assertEqual(r.get_min_duts([pid]), {pid: 0})
    391         r.record_release([host2])
    392         self.assertEqual(r.get_min_duts([pid]), {pid: 1})
    393         r.record_release([host3])
    394         self.assertEqual(r.get_min_duts([pid]), {pid: 2})
    395 
    396 if __name__ == '__main__':
    397     unittest.main()
    398 
    399