Home | History | Annotate | Download | only in scheduler
      1 #!/usr/bin/python
      2 #pylint: disable-msg=C0111
      3 
      4 import datetime
      5 import common
      6 from autotest_lib.frontend import setup_django_environment
      7 from autotest_lib.frontend.afe import frontend_test_utils
      8 from autotest_lib.client.common_lib import host_queue_entry_states
      9 from autotest_lib.client.common_lib.test_utils import mock
     10 from autotest_lib.client.common_lib.test_utils import unittest
     11 from autotest_lib.database import database_connection
     12 from autotest_lib.frontend.afe import models, model_attributes
     13 from autotest_lib.scheduler import monitor_db
     14 from autotest_lib.scheduler import scheduler_lib
     15 from autotest_lib.scheduler import scheduler_models
     16 
     17 _DEBUG = False
     18 
     19 
     20 class BaseSchedulerModelsTest(unittest.TestCase,
     21                               frontend_test_utils.FrontendTestMixin):
     22     _config_section = 'AUTOTEST_WEB'
     23 
     24     def _do_query(self, sql):
     25         self._database.execute(sql)
     26 
     27 
     28     def _set_monitor_stubs(self):
     29         # Clear the instance cache as this is a brand new database.
     30         scheduler_models.DBObject._clear_instance_cache()
     31 
     32         self._database = (
     33             database_connection.TranslatingDatabase.get_test_database(
     34                 translators=scheduler_lib._DB_TRANSLATORS))
     35         self._database.connect(db_type='django')
     36         self._database.debug = _DEBUG
     37 
     38         self.god.stub_with(scheduler_models, '_db', self._database)
     39 
     40 
     41     def setUp(self):
     42         self._frontend_common_setup()
     43         self._set_monitor_stubs()
     44 
     45 
     46     def tearDown(self):
     47         self._database.disconnect()
     48         self._frontend_common_teardown()
     49 
     50 
     51     def _update_hqe(self, set, where=''):
     52         query = 'UPDATE afe_host_queue_entries SET ' + set
     53         if where:
     54             query += ' WHERE ' + where
     55         self._do_query(query)
     56 
     57 
     58 class DelayedCallTaskTest(unittest.TestCase):
     59     def setUp(self):
     60         self.god = mock.mock_god()
     61 
     62 
     63     def tearDown(self):
     64         self.god.unstub_all()
     65 
     66 
     67     def test_delayed_call(self):
     68         test_time = self.god.create_mock_function('time')
     69         test_time.expect_call().and_return(33)
     70         test_time.expect_call().and_return(34.01)
     71         test_time.expect_call().and_return(34.99)
     72         test_time.expect_call().and_return(35.01)
     73         def test_callback():
     74             test_callback.calls += 1
     75         test_callback.calls = 0
     76         delay_task = scheduler_models.DelayedCallTask(
     77                 delay_seconds=2, callback=test_callback,
     78                 now_func=test_time)  # time 33
     79         self.assertEqual(35, delay_task.end_time)
     80         delay_task.poll()  # activates the task and polls it once, time 34.01
     81         self.assertEqual(0, test_callback.calls, "callback called early")
     82         delay_task.poll()  # time 34.99
     83         self.assertEqual(0, test_callback.calls, "callback called early")
     84         delay_task.poll()  # time 35.01
     85         self.assertEqual(1, test_callback.calls)
     86         self.assert_(delay_task.is_done())
     87         self.assert_(delay_task.success)
     88         self.assert_(not delay_task.aborted)
     89         self.god.check_playback()
     90 
     91 
     92     def test_delayed_call_abort(self):
     93         delay_task = scheduler_models.DelayedCallTask(
     94                 delay_seconds=987654, callback=lambda : None)
     95         delay_task.abort()
     96         self.assert_(delay_task.aborted)
     97         self.assert_(delay_task.is_done())
     98         self.assert_(not delay_task.success)
     99         self.god.check_playback()
    100 
    101 
    102 class DBObjectTest(BaseSchedulerModelsTest):
    103     def test_compare_fields_in_row(self):
    104         host = scheduler_models.Host(id=1)
    105         fields = list(host._fields)
    106         row_data = [getattr(host, fieldname) for fieldname in fields]
    107         self.assertEqual({}, host._compare_fields_in_row(row_data))
    108         row_data[fields.index('hostname')] = 'spam'
    109         self.assertEqual({'hostname': ('host1', 'spam')},
    110                          host._compare_fields_in_row(row_data))
    111         row_data[fields.index('id')] = 23
    112         self.assertEqual({'hostname': ('host1', 'spam'), 'id': (1, 23)},
    113                          host._compare_fields_in_row(row_data))
    114 
    115 
    116     def test_compare_fields_in_row_datetime_ignores_microseconds(self):
    117         datetime_with_us = datetime.datetime(2009, 10, 07, 12, 34, 56, 7890)
    118         datetime_without_us = datetime.datetime(2009, 10, 07, 12, 34, 56, 0)
    119         class TestTable(scheduler_models.DBObject):
    120             _table_name = 'test_table'
    121             _fields = ('id', 'test_datetime')
    122         tt = TestTable(row=[1, datetime_without_us])
    123         self.assertEqual({}, tt._compare_fields_in_row([1, datetime_with_us]))
    124 
    125 
    126     def test_always_query(self):
    127         host_a = scheduler_models.Host(id=2)
    128         self.assertEqual(host_a.hostname, 'host2')
    129         self._do_query('UPDATE afe_hosts SET hostname="host2-updated" '
    130                        'WHERE id=2')
    131         host_b = scheduler_models.Host(id=2, always_query=True)
    132         self.assert_(host_a is host_b, 'Cached instance not returned.')
    133         self.assertEqual(host_a.hostname, 'host2-updated',
    134                          'Database was not re-queried')
    135 
    136         # If either of these are called, a query was made when it shouldn't be.
    137         host_a._compare_fields_in_row = lambda _: self.fail('eek! a query!')
    138         host_a._update_fields_from_row = host_a._compare_fields_in_row
    139         host_c = scheduler_models.Host(id=2, always_query=False)
    140         self.assert_(host_a is host_c, 'Cached instance not returned')
    141 
    142 
    143     def test_delete(self):
    144         host = scheduler_models.Host(id=3)
    145         host.delete()
    146         host = self.assertRaises(scheduler_models.DBError, scheduler_models.Host, id=3,
    147                                  always_query=False)
    148         host = self.assertRaises(scheduler_models.DBError, scheduler_models.Host, id=3,
    149                                  always_query=True)
    150 
    151     def test_save(self):
    152         # Dummy Job to avoid creating a one in the HostQueueEntry __init__.
    153         class MockJob(object):
    154             def __init__(self, id):
    155                 pass
    156             def tag(self):
    157                 return 'MockJob'
    158         self.god.stub_with(scheduler_models, 'Job', MockJob)
    159         hqe = scheduler_models.HostQueueEntry(
    160                 new_record=True,
    161                 row=[0, 1, 2, 'Queued', None, 0, 0, 0, '.', None, False, None,
    162                      None])
    163         hqe.save()
    164         new_id = hqe.id
    165         # Force a re-query and verify that the correct data was stored.
    166         scheduler_models.DBObject._clear_instance_cache()
    167         hqe = scheduler_models.HostQueueEntry(id=new_id)
    168         self.assertEqual(hqe.id, new_id)
    169         self.assertEqual(hqe.job_id, 1)
    170         self.assertEqual(hqe.host_id, 2)
    171         self.assertEqual(hqe.status, 'Queued')
    172         self.assertEqual(hqe.meta_host, None)
    173         self.assertEqual(hqe.active, False)
    174         self.assertEqual(hqe.complete, False)
    175         self.assertEqual(hqe.deleted, False)
    176         self.assertEqual(hqe.execution_subdir, '.')
    177         self.assertEqual(hqe.atomic_group_id, None)
    178         self.assertEqual(hqe.started_on, None)
    179         self.assertEqual(hqe.finished_on, None)
    180 
    181 
    182 class HostTest(BaseSchedulerModelsTest):
    183     def test_cmp_for_sort(self):
    184         expected_order = [
    185                 'alice', 'Host1', 'host2', 'host3', 'host09', 'HOST010',
    186                 'host10', 'host11', 'yolkfolk']
    187         hostname_idx = list(scheduler_models.Host._fields).index('hostname')
    188         row = [None] * len(scheduler_models.Host._fields)
    189         hosts = []
    190         for hostname in expected_order:
    191             row[hostname_idx] = hostname
    192             hosts.append(scheduler_models.Host(row=row, new_record=True))
    193 
    194         host1 = hosts[expected_order.index('Host1')]
    195         host010 = hosts[expected_order.index('HOST010')]
    196         host10 = hosts[expected_order.index('host10')]
    197         host3 = hosts[expected_order.index('host3')]
    198         alice = hosts[expected_order.index('alice')]
    199         self.assertEqual(0, scheduler_models.Host.cmp_for_sort(host10, host10))
    200         self.assertEqual(1, scheduler_models.Host.cmp_for_sort(host10, host010))
    201         self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(host010, host10))
    202         self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(host1, host10))
    203         self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(host1, host010))
    204         self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(host3, host10))
    205         self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(host3, host010))
    206         self.assertEqual(1, scheduler_models.Host.cmp_for_sort(host3, host1))
    207         self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(host1, host3))
    208         self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(alice, host3))
    209         self.assertEqual(1, scheduler_models.Host.cmp_for_sort(host3, alice))
    210         self.assertEqual(0, scheduler_models.Host.cmp_for_sort(alice, alice))
    211 
    212         hosts.sort(cmp=scheduler_models.Host.cmp_for_sort)
    213         self.assertEqual(expected_order, [h.hostname for h in hosts])
    214 
    215         hosts.reverse()
    216         hosts.sort(cmp=scheduler_models.Host.cmp_for_sort)
    217         self.assertEqual(expected_order, [h.hostname for h in hosts])
    218 
    219 
    220 class HostQueueEntryTest(BaseSchedulerModelsTest):
    221     def _create_hqe(self, dependency_labels=(), **create_job_kwargs):
    222         job = self._create_job(**create_job_kwargs)
    223         for label in dependency_labels:
    224             job.dependency_labels.add(label)
    225         hqes = list(scheduler_models.HostQueueEntry.fetch(where='job_id=%d' % job.id))
    226         self.assertEqual(1, len(hqes))
    227         return hqes[0]
    228 
    229 
    230     def _check_hqe_labels(self, hqe, expected_labels):
    231         expected_labels = set(expected_labels)
    232         label_names = set(label.name for label in hqe.get_labels())
    233         self.assertEqual(expected_labels, label_names)
    234 
    235 
    236     def test_get_labels_empty(self):
    237         hqe = self._create_hqe(hosts=[1])
    238         labels = list(hqe.get_labels())
    239         self.assertEqual([], labels)
    240 
    241 
    242     def test_get_labels_metahost(self):
    243         hqe = self._create_hqe(metahosts=[2])
    244         self._check_hqe_labels(hqe, ['label2'])
    245 
    246 
    247     def test_get_labels_dependancies(self):
    248         hqe = self._create_hqe(dependency_labels=(self.label3, self.label4),
    249                                metahosts=[1])
    250         self._check_hqe_labels(hqe, ['label1', 'label3', 'label4'])
    251 
    252 
    253     def setup_abort_test(self, agent_finished=True):
    254         """Setup the variables for testing abort method.
    255 
    256         @param agent_finished: True to mock agent is finished before aborting
    257                                the hqe.
    258         @return hqe, dispatcher: Mock object of hqe and dispatcher to be used
    259                                to test abort method.
    260         """
    261         hqe = self._create_hqe(hosts=[1])
    262         hqe.aborted = True
    263         hqe.complete = False
    264         hqe.status = models.HostQueueEntry.Status.STARTING
    265         hqe.started_on = datetime.datetime.now()
    266 
    267         dispatcher = self.god.create_mock_class(monitor_db.BaseDispatcher,
    268                                                 'BaseDispatcher')
    269         agent = self.god.create_mock_class(monitor_db.Agent, 'Agent')
    270         dispatcher.get_agents_for_entry.expect_call(hqe).and_return([agent])
    271         agent.is_done.expect_call().and_return(agent_finished)
    272         return hqe, dispatcher
    273 
    274 
    275     def test_abort_fail_with_unfinished_agent(self):
    276         """abort should fail if the hqe still has agent not finished.
    277         """
    278         hqe, dispatcher = self.setup_abort_test(agent_finished=False)
    279         self.assertIsNone(hqe.finished_on)
    280         with self.assertRaises(AssertionError):
    281             hqe.abort(dispatcher)
    282         self.god.check_playback()
    283         # abort failed, finished_on should not be set
    284         self.assertIsNone(hqe.finished_on)
    285 
    286 
    287     def test_abort_success(self):
    288         """abort should succeed if all agents for the hqe are finished.
    289         """
    290         hqe, dispatcher = self.setup_abort_test(agent_finished=True)
    291         self.assertIsNone(hqe.finished_on)
    292         hqe.abort(dispatcher)
    293         self.god.check_playback()
    294         self.assertIsNotNone(hqe.finished_on)
    295 
    296 
    297     def test_set_finished_on(self):
    298         """Test that finished_on is set when hqe completes."""
    299         for status in host_queue_entry_states.Status.values:
    300             hqe = self._create_hqe(hosts=[1])
    301             hqe.started_on = datetime.datetime.now()
    302             hqe.job.update_field('shard_id', 3)
    303             self.assertIsNone(hqe.finished_on)
    304             hqe.set_status(status)
    305             if status in host_queue_entry_states.COMPLETE_STATUSES:
    306                 self.assertIsNotNone(hqe.finished_on)
    307                 self.assertIsNone(hqe.job.shard_id)
    308             else:
    309                 self.assertIsNone(hqe.finished_on)
    310                 self.assertEquals(hqe.job.shard_id, 3)
    311 
    312 
    313 class JobTest(BaseSchedulerModelsTest):
    314     def setUp(self):
    315         super(JobTest, self).setUp()
    316 
    317         def _mock_create(**kwargs):
    318             task = models.SpecialTask(**kwargs)
    319             task.save()
    320             self._tasks.append(task)
    321         self.god.stub_with(models.SpecialTask.objects, 'create', _mock_create)
    322 
    323 
    324     def _test_pre_job_tasks_helper(self,
    325                             reboot_before=model_attributes.RebootBefore.ALWAYS):
    326         """
    327         Calls HQE._do_schedule_pre_job_tasks() and returns the created special
    328         task
    329         """
    330         self._tasks = []
    331         queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]
    332         queue_entry.job.reboot_before = reboot_before
    333         queue_entry._do_schedule_pre_job_tasks()
    334         return self._tasks
    335 
    336 
    337     def test_job_request_abort(self):
    338         django_job = self._create_job(hosts=[5, 6], atomic_group=1)
    339         job = scheduler_models.Job(django_job.id)
    340         job.request_abort()
    341         django_hqes = list(models.HostQueueEntry.objects.filter(job=job.id))
    342         for hqe in django_hqes:
    343             self.assertTrue(hqe.aborted)
    344 
    345 
    346     def test__atomic_and_has_started__on_atomic(self):
    347         self._create_job(hosts=[5, 6], atomic_group=1)
    348         job = scheduler_models.Job.fetch('id = 1')[0]
    349         self.assertFalse(job._atomic_and_has_started())
    350 
    351         self._update_hqe("status='Pending'")
    352         self.assertFalse(job._atomic_and_has_started())
    353         self._update_hqe("status='Verifying'")
    354         self.assertFalse(job._atomic_and_has_started())
    355         self.assertFalse(job._atomic_and_has_started())
    356         self._update_hqe("status='Failed'")
    357         self.assertFalse(job._atomic_and_has_started())
    358         self._update_hqe("status='Stopped'")
    359         self.assertFalse(job._atomic_and_has_started())
    360 
    361         self._update_hqe("status='Starting'")
    362         self.assertTrue(job._atomic_and_has_started())
    363         self._update_hqe("status='Completed'")
    364         self.assertTrue(job._atomic_and_has_started())
    365         self._update_hqe("status='Aborted'")
    366 
    367 
    368     def test__atomic_and_has_started__not_atomic(self):
    369         self._create_job(hosts=[1, 2])
    370         job = scheduler_models.Job.fetch('id = 1')[0]
    371         self.assertFalse(job._atomic_and_has_started())
    372         self._update_hqe("status='Starting'")
    373         self.assertFalse(job._atomic_and_has_started())
    374 
    375 
    376     def _check_special_tasks(self, tasks, task_types):
    377         self.assertEquals(len(tasks), len(task_types))
    378         for task, (task_type, queue_entry_id) in zip(tasks, task_types):
    379             self.assertEquals(task.task, task_type)
    380             self.assertEquals(task.host.id, 1)
    381             if queue_entry_id:
    382                 self.assertEquals(task.queue_entry.id, queue_entry_id)
    383 
    384 
    385     def test_run_asynchronous(self):
    386         self._create_job(hosts=[1, 2])
    387 
    388         tasks = self._test_pre_job_tasks_helper()
    389 
    390         self._check_special_tasks(tasks, [(models.SpecialTask.Task.RESET, 1)])
    391 
    392 
    393     def test_run_asynchronous_skip_verify(self):
    394         job = self._create_job(hosts=[1, 2])
    395         job.run_verify = False
    396         job.save()
    397 
    398         tasks = self._test_pre_job_tasks_helper()
    399 
    400         self._check_special_tasks(tasks, [(models.SpecialTask.Task.RESET, 1)])
    401 
    402 
    403     def test_run_synchronous_verify(self):
    404         self._create_job(hosts=[1, 2], synchronous=True)
    405 
    406         tasks = self._test_pre_job_tasks_helper()
    407 
    408         self._check_special_tasks(tasks, [(models.SpecialTask.Task.RESET, 1)])
    409 
    410 
    411     def test_run_synchronous_skip_verify(self):
    412         job = self._create_job(hosts=[1, 2], synchronous=True)
    413         job.run_verify = False
    414         job.save()
    415 
    416         tasks = self._test_pre_job_tasks_helper()
    417 
    418         self._check_special_tasks(tasks, [(models.SpecialTask.Task.RESET, 1)])
    419 
    420 
    421     def test_run_asynchronous_do_not_reset(self):
    422         job = self._create_job(hosts=[1, 2])
    423         job.run_reset = False
    424         job.run_verify = False
    425         job.save()
    426 
    427         tasks = self._test_pre_job_tasks_helper()
    428 
    429         self.assertEquals(tasks, [])
    430 
    431 
    432     def test_run_synchronous_do_not_reset_no_RebootBefore(self):
    433         job = self._create_job(hosts=[1, 2], synchronous=True)
    434         job.reboot_before = model_attributes.RebootBefore.NEVER
    435         job.save()
    436 
    437         tasks = self._test_pre_job_tasks_helper(
    438                             reboot_before=model_attributes.RebootBefore.NEVER)
    439 
    440         self._check_special_tasks(tasks, [(models.SpecialTask.Task.VERIFY, 1)])
    441 
    442 
    443     def test_run_asynchronous_do_not_reset(self):
    444         job = self._create_job(hosts=[1, 2], synchronous=False)
    445         job.reboot_before = model_attributes.RebootBefore.NEVER
    446         job.save()
    447 
    448         tasks = self._test_pre_job_tasks_helper(
    449                             reboot_before=model_attributes.RebootBefore.NEVER)
    450 
    451         self._check_special_tasks(tasks, [(models.SpecialTask.Task.VERIFY, 1)])
    452 
    453 
    454     def test_run_atomic_group_already_started(self):
    455         self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
    456         self._update_hqe("status='Starting', execution_subdir=''")
    457 
    458         job = scheduler_models.Job.fetch('id = 1')[0]
    459         queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]
    460         assert queue_entry.job is job
    461         self.assertEqual(None, job.run(queue_entry))
    462 
    463         self.god.check_playback()
    464 
    465 
    466     def test_reboot_before_always(self):
    467         job = self._create_job(hosts=[1])
    468         job.reboot_before = model_attributes.RebootBefore.ALWAYS
    469         job.save()
    470 
    471         tasks = self._test_pre_job_tasks_helper()
    472 
    473         self._check_special_tasks(tasks, [
    474                 (models.SpecialTask.Task.RESET, None)
    475             ])
    476 
    477 
    478     def _test_reboot_before_if_dirty_helper(self):
    479         job = self._create_job(hosts=[1])
    480         job.reboot_before = model_attributes.RebootBefore.IF_DIRTY
    481         job.save()
    482 
    483         tasks = self._test_pre_job_tasks_helper()
    484         task_types = [(models.SpecialTask.Task.RESET, None)]
    485 
    486         self._check_special_tasks(tasks, task_types)
    487 
    488 
    489     def test_reboot_before_if_dirty(self):
    490         models.Host.smart_get(1).update_object(dirty=True)
    491         self._test_reboot_before_if_dirty_helper()
    492 
    493 
    494     def test_reboot_before_not_dirty(self):
    495         models.Host.smart_get(1).update_object(dirty=False)
    496         self._test_reboot_before_if_dirty_helper()
    497 
    498 
    499     def test_next_group_name(self):
    500         django_job = self._create_job(metahosts=[1])
    501         job = scheduler_models.Job(id=django_job.id)
    502         self.assertEqual('group0', job._next_group_name())
    503 
    504         for hqe in django_job.hostqueueentry_set.filter():
    505             hqe.execution_subdir = 'my_rack.group0'
    506             hqe.save()
    507         self.assertEqual('my_rack.group1', job._next_group_name('my/rack'))
    508 
    509 
    510 if __name__ == '__main__':
    511     unittest.main()
    512