#!/usr/bin/python

#pylint: disable=missing-docstring

import unittest

# NOTE(review): `common` and `setup_django_environment` are not referenced by
# name in this file; they are presumably imported for their side effects and
# must stay ahead of the other autotest_lib imports -- confirm before moving.
import common
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import monitor_db_unittest

_DEBUG = False


class OnlyIfNeededTest(monitor_db_unittest.DispatcherSchedulingTest):
    """Scheduler tests around only_if_needed labels and metahost blocking.

    The fixtures used here (self.label3, 'host1') and the helpers
    (_create_job, _create_job_simple, _run_scheduler, _update_hqe, _do_query,
    _assert_job_scheduled_on, _check_for_extra_schedulings) are not defined
    in this file; they are expected to come from DispatcherSchedulingTest.
    """

    def _setup_test_only_if_needed_labels(self):
        """Attach label3 to host1 and create a simple metahost job.

        @return: whatever _create_job_simple([1], use_metahost=True)
                returns; callers treat it as a job with a
                `dependency_labels` relation.
        """
        # apply only_if_needed label3 to host1
        models.Host.smart_get('host1').labels.add(self.label3)
        return self._create_job_simple([1], use_metahost=True)


    def test_only_if_needed_labels_avoids_host(self):
        """A job that does not depend on label3 must not be scheduled."""
        # Only the side effects of the setup matter here; the returned job is
        # deliberately not bound to a name because nothing reads it.
        self._setup_test_only_if_needed_labels()
        # if the job doesn't depend on label3, there should be no scheduling
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def test_only_if_needed_labels_schedules(self):
        """A job that depends on label3 gets scheduled."""
        job = self._setup_test_only_if_needed_labels()
        job.dependency_labels.add(self.label3)
        self._run_scheduler()
        self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()


    def test_only_if_needed_labels_via_metahost(self):
        """Requesting the only_if_needed label as the metahost also works."""
        job = self._setup_test_only_if_needed_labels()
        job.dependency_labels.add(self.label3)
        # should also work if the metahost is the only_if_needed label
        # NOTE(review): this DELETE has no WHERE clause, so it clears every
        # row of afe_jobs_dependency_labels, including the dependency added
        # just above -- presumably intentional so that only the metahost
        # request below can match; confirm.
        self._do_query('DELETE FROM afe_jobs_dependency_labels')
        self._create_job(metahosts=[3])
        self._run_scheduler()
        self._assert_job_scheduled_on(2, 1)
        self._check_for_extra_schedulings()


    def test_metahosts_obey_blocks(self):
        """
        Metahosts can't get scheduled on hosts already scheduled for
        that job.
        """
        self._create_job(metahosts=[1], hosts=[1])
        # make the nonmetahost entry complete, so the metahost can try
        # to get scheduled
        self._update_hqe(set='complete = 1', where='host_id=1')
        self._run_scheduler()
        # No scheduling is asserted above, so this check is what verifies
        # that the metahost entry was not placed.
        self._check_for_extra_schedulings()


if __name__ == '__main__':
    unittest.main()