Home | History | Annotate | Download | only in scheduler
      1 #!/usr/bin/python
      2 # pylint: disable=missing-docstring
      3 
      4 import time
      5 import unittest
      6 
      7 import common
      8 from autotest_lib.frontend import setup_django_environment
      9 from autotest_lib.frontend.afe import frontend_test_utils
     10 from autotest_lib.client.common_lib import global_config
     11 from autotest_lib.client.common_lib.test_utils import mock
     12 from autotest_lib.database import database_connection
     13 from autotest_lib.frontend.afe import models
     14 from autotest_lib.scheduler import agent_task
     15 from autotest_lib.scheduler import luciferlib
     16 from autotest_lib.scheduler import monitor_db, drone_manager
     17 from autotest_lib.scheduler import pidfile_monitor
     18 from autotest_lib.scheduler import scheduler_config
     19 from autotest_lib.scheduler import scheduler_lib
     20 from autotest_lib.scheduler import scheduler_models
     21 
# Assigned to the test database connection's `debug` attribute in
# BaseSchedulerTest._set_monitor_stubs(); set to True when debugging tests
# (presumably enables query logging on the connection -- confirm in
# database_connection).
_DEBUG = False
     23 
     24 
     25 class DummyAgentTask(object):
     26     num_processes = 1
     27     owner_username = 'my_user'
     28 
     29     def get_drone_hostnames_allowed(self):
     30         return None
     31 
     32 
     33 class DummyAgent(object):
     34     started = False
     35     _is_done = False
     36     host_ids = ()
     37     hostnames = {}
     38     queue_entry_ids = ()
     39 
     40     def __init__(self):
     41         self.task = DummyAgentTask()
     42 
     43 
     44     def tick(self):
     45         self.started = True
     46 
     47 
     48     def is_done(self):
     49         return self._is_done
     50 
     51 
     52     def set_done(self, done):
     53         self._is_done = done
     54 
     55 
     56 class IsRow(mock.argument_comparator):
     57     def __init__(self, row_id):
     58         self.row_id = row_id
     59 
     60 
     61     def is_satisfied_by(self, parameter):
     62         return list(parameter)[0] == self.row_id
     63 
     64 
     65     def __str__(self):
     66         return 'row with id %s' % self.row_id
     67 
     68 
     69 class IsAgentWithTask(mock.argument_comparator):
     70     def __init__(self, task):
     71         self._task = task
     72 
     73 
     74     def is_satisfied_by(self, parameter):
     75         if not isinstance(parameter, monitor_db.Agent):
     76             return False
     77         tasks = list(parameter.queue.queue)
     78         if len(tasks) != 1:
     79             return False
     80         return tasks[0] == self._task
     81 
     82 
     83 def _set_host_and_qe_ids(agent_or_task, id_list=None):
     84     if id_list is None:
     85         id_list = []
     86     agent_or_task.host_ids = agent_or_task.queue_entry_ids = id_list
     87     agent_or_task.hostnames = dict((host_id, '192.168.1.1')
     88                                    for host_id in id_list)
     89 
     90 
     91 class BaseSchedulerTest(unittest.TestCase,
     92                         frontend_test_utils.FrontendTestMixin):
     93     _config_section = 'AUTOTEST_WEB'
     94 
     95     def _do_query(self, sql):
     96         self._database.execute(sql)
     97 
     98 
     99     def _set_monitor_stubs(self):
    100         self.mock_config = global_config.FakeGlobalConfig()
    101         self.god.stub_with(global_config, 'global_config', self.mock_config)
    102 
    103         # Clear the instance cache as this is a brand new database.
    104         scheduler_models.DBObject._clear_instance_cache()
    105 
    106         self._database = (
    107             database_connection.TranslatingDatabase.get_test_database(
    108                 translators=scheduler_lib._DB_TRANSLATORS))
    109         self._database.connect(db_type='django')
    110         self._database.debug = _DEBUG
    111 
    112         connection_manager = scheduler_lib.ConnectionManager(autocommit=False)
    113         self.god.stub_with(connection_manager, 'db_connection', self._database)
    114         self.god.stub_with(monitor_db, '_db_manager', connection_manager)
    115         self.god.stub_with(monitor_db, '_db', self._database)
    116 
    117         self.god.stub_with(monitor_db.Dispatcher,
    118                            '_get_pending_queue_entries',
    119                            self._get_pending_hqes)
    120         self.god.stub_with(scheduler_models, '_db', self._database)
    121         self.god.stub_with(drone_manager.instance(), '_results_dir',
    122                            '/test/path')
    123         self.god.stub_with(drone_manager.instance(), '_temporary_directory',
    124                            '/test/path/tmp')
    125         self.god.stub_with(drone_manager.instance(), 'initialize',
    126                            lambda *args: None)
    127         self.god.stub_with(drone_manager.instance(), 'execute_actions',
    128                            lambda *args: None)
    129 
    130         monitor_db.initialize_globals()
    131         scheduler_models.initialize_globals()
    132 
    133 
    134     def setUp(self):
    135         self._frontend_common_setup()
    136         self._set_monitor_stubs()
    137         self._set_global_config_values()
    138         self._dispatcher = monitor_db.Dispatcher()
    139 
    140 
    141     def tearDown(self):
    142         self._database.disconnect()
    143         self._frontend_common_teardown()
    144 
    145 
    146     def _set_global_config_values(self):
    147         """Set global_config values to suit unittest needs."""
    148         self.mock_config.set_config_value(
    149                 'SCHEDULER', 'inline_host_acquisition', True)
    150 
    151 
    152     def _update_hqe(self, set, where=''):
    153         query = 'UPDATE afe_host_queue_entries SET ' + set
    154         if where:
    155             query += ' WHERE ' + where
    156         self._do_query(query)
    157 
    158 
    159     def _get_pending_hqes(self):
    160         query_string=('afe_jobs.priority DESC, '
    161                       'ifnull(nullif(host_id, NULL), host_id) DESC, '
    162                       'ifnull(nullif(meta_host, NULL), meta_host) DESC, '
    163                       'job_id')
    164         return list(scheduler_models.HostQueueEntry.fetch(
    165             joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
    166             where='NOT complete AND NOT active AND status="Queued"',
    167             order_by=query_string))
    168 
    169 
    170 class DispatcherSchedulingTest(BaseSchedulerTest):
    171     _jobs_scheduled = []
    172 
    173 
    174     def tearDown(self):
    175         super(DispatcherSchedulingTest, self).tearDown()
    176 
    177 
    178     def _set_monitor_stubs(self):
    179         super(DispatcherSchedulingTest, self)._set_monitor_stubs()
    180 
    181         def hqe__do_schedule_pre_job_tasks_stub(queue_entry):
    182             """Called by HostQueueEntry.run()."""
    183             self._record_job_scheduled(queue_entry.job.id, queue_entry.host.id)
    184             queue_entry.set_status('Starting')
    185 
    186         self.god.stub_with(scheduler_models.HostQueueEntry,
    187                            '_do_schedule_pre_job_tasks',
    188                            hqe__do_schedule_pre_job_tasks_stub)
    189 
    190 
    191     def _record_job_scheduled(self, job_id, host_id):
    192         record = (job_id, host_id)
    193         self.assert_(record not in self._jobs_scheduled,
    194                      'Job %d scheduled on host %d twice' %
    195                      (job_id, host_id))
    196         self._jobs_scheduled.append(record)
    197 
    198 
    199     def _assert_job_scheduled_on(self, job_id, host_id):
    200         record = (job_id, host_id)
    201         self.assert_(record in self._jobs_scheduled,
    202                      'Job %d not scheduled on host %d as expected\n'
    203                      'Jobs scheduled: %s' %
    204                      (job_id, host_id, self._jobs_scheduled))
    205         self._jobs_scheduled.remove(record)
    206 
    207 
    208     def _assert_job_scheduled_on_number_of(self, job_id, host_ids, number):
    209         """Assert job was scheduled on exactly number hosts out of a set."""
    210         found = []
    211         for host_id in host_ids:
    212             record = (job_id, host_id)
    213             if record in self._jobs_scheduled:
    214                 found.append(record)
    215                 self._jobs_scheduled.remove(record)
    216         if len(found) < number:
    217             self.fail('Job %d scheduled on fewer than %d hosts in %s.\n'
    218                       'Jobs scheduled: %s' % (job_id, number, host_ids, found))
    219         elif len(found) > number:
    220             self.fail('Job %d scheduled on more than %d hosts in %s.\n'
    221                       'Jobs scheduled: %s' % (job_id, number, host_ids, found))
    222 
    223 
    224     def _check_for_extra_schedulings(self):
    225         if len(self._jobs_scheduled) != 0:
    226             self.fail('Extra jobs scheduled: ' +
    227                       str(self._jobs_scheduled))
    228 
    229 
    230     def _convert_jobs_to_metahosts(self, *job_ids):
    231         sql_tuple = '(' + ','.join(str(i) for i in job_ids) + ')'
    232         self._do_query('UPDATE afe_host_queue_entries SET '
    233                        'meta_host=host_id, host_id=NULL '
    234                        'WHERE job_id IN ' + sql_tuple)
    235 
    236 
    237     def _lock_host(self, host_id):
    238         self._do_query('UPDATE afe_hosts SET locked=1 WHERE id=' +
    239                        str(host_id))
    240 
    241 
    242     def setUp(self):
    243         super(DispatcherSchedulingTest, self).setUp()
    244         self._jobs_scheduled = []
    245 
    246 
    247     def _run_scheduler(self):
    248         self._dispatcher._host_scheduler.tick()
    249         for _ in xrange(2): # metahost scheduling can take two ticks
    250             self._dispatcher._schedule_new_jobs()
    251 
    252 
    253     def _test_basic_scheduling_helper(self, use_metahosts):
    254         'Basic nonmetahost scheduling'
    255         self._create_job_simple([1], use_metahosts)
    256         self._create_job_simple([2], use_metahosts)
    257         self._run_scheduler()
    258         self._assert_job_scheduled_on(1, 1)
    259         self._assert_job_scheduled_on(2, 2)
    260         self._check_for_extra_schedulings()
    261 
    262 
    263     def _test_priorities_helper(self, use_metahosts):
    264         'Test prioritization ordering'
    265         self._create_job_simple([1], use_metahosts)
    266         self._create_job_simple([2], use_metahosts)
    267         self._create_job_simple([1,2], use_metahosts)
    268         self._create_job_simple([1], use_metahosts, priority=1)
    269         self._run_scheduler()
    270         self._assert_job_scheduled_on(4, 1) # higher priority
    271         self._assert_job_scheduled_on(2, 2) # earlier job over later
    272         self._check_for_extra_schedulings()
    273 
    274 
    275     def _test_hosts_ready_helper(self, use_metahosts):
    276         """
    277         Only hosts that are status=Ready, unlocked and not invalid get
    278         scheduled.
    279         """
    280         self._create_job_simple([1], use_metahosts)
    281         self._do_query('UPDATE afe_hosts SET status="Running" WHERE id=1')
    282         self._run_scheduler()
    283         self._check_for_extra_schedulings()
    284 
    285         self._do_query('UPDATE afe_hosts SET status="Ready", locked=1 '
    286                        'WHERE id=1')
    287         self._run_scheduler()
    288         self._check_for_extra_schedulings()
    289 
    290         self._do_query('UPDATE afe_hosts SET locked=0, invalid=1 '
    291                        'WHERE id=1')
    292         self._run_scheduler()
    293         if not use_metahosts:
    294             self._assert_job_scheduled_on(1, 1)
    295         self._check_for_extra_schedulings()
    296 
    297 
    298     def _test_hosts_idle_helper(self, use_metahosts):
    299         'Only idle hosts get scheduled'
    300         self._create_job(hosts=[1], active=True)
    301         self._create_job_simple([1], use_metahosts)
    302         self._run_scheduler()
    303         self._check_for_extra_schedulings()
    304 
    305 
    306     def _test_obey_ACLs_helper(self, use_metahosts):
    307         self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')
    308         self._create_job_simple([1], use_metahosts)
    309         self._run_scheduler()
    310         self._check_for_extra_schedulings()
    311 
    312 
    313     def test_basic_scheduling(self):
    314         self._test_basic_scheduling_helper(False)
    315 
    316 
    317     def test_priorities(self):
    318         self._test_priorities_helper(False)
    319 
    320 
    321     def test_hosts_ready(self):
    322         self._test_hosts_ready_helper(False)
    323 
    324 
    325     def test_hosts_idle(self):
    326         self._test_hosts_idle_helper(False)
    327 
    328 
    329     def test_obey_ACLs(self):
    330         self._test_obey_ACLs_helper(False)
    331 
    332 
    333     def test_one_time_hosts_ignore_ACLs(self):
    334         self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')
    335         self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=1')
    336         self._create_job_simple([1])
    337         self._run_scheduler()
    338         self._assert_job_scheduled_on(1, 1)
    339         self._check_for_extra_schedulings()
    340 
    341 
    342     def test_non_metahost_on_invalid_host(self):
    343         """
    344         Non-metahost entries can get scheduled on invalid hosts (this is how
    345         one-time hosts work).
    346         """
    347         self._do_query('UPDATE afe_hosts SET invalid=1')
    348         self._test_basic_scheduling_helper(False)
    349 
    350 
    351     def test_metahost_scheduling(self):
    352         """
    353         Basic metahost scheduling
    354         """
    355         self._test_basic_scheduling_helper(True)
    356 
    357 
    358     def test_metahost_priorities(self):
    359         self._test_priorities_helper(True)
    360 
    361 
    362     def test_metahost_hosts_ready(self):
    363         self._test_hosts_ready_helper(True)
    364 
    365 
    366     def test_metahost_hosts_idle(self):
    367         self._test_hosts_idle_helper(True)
    368 
    369 
    370     def test_metahost_obey_ACLs(self):
    371         self._test_obey_ACLs_helper(True)
    372 
    373 
    374     def test_nonmetahost_over_metahost(self):
    375         """
    376         Non-metahost entries should take priority over metahost entries
    377         for the same host
    378         """
    379         self._create_job(metahosts=[1])
    380         self._create_job(hosts=[1])
    381         self._run_scheduler()
    382         self._assert_job_scheduled_on(2, 1)
    383         self._check_for_extra_schedulings()
    384 
    385 
    386     def test_no_execution_subdir_not_found(self):
    387         """Reproduce bug crosbug.com/334353 and recover from it."""
    388 
    389         self.mock_config.set_config_value('SCHEDULER', 'drones', 'localhost')
    390 
    391         job = self._create_job(hostless=True)
    392 
    393         # Ensure execution_subdir is set before status
    394         original_set_status = scheduler_models.HostQueueEntry.set_status
    395         def fake_set_status(hqe, *args, **kwargs):
    396             self.assertEqual(hqe.execution_subdir, 'hostless')
    397             original_set_status(hqe, *args, **kwargs)
    398 
    399         self.god.stub_with(scheduler_models.HostQueueEntry, 'set_status',
    400                            fake_set_status)
    401 
    402         self._dispatcher._schedule_new_jobs()
    403 
    404         hqe = job.hostqueueentry_set.all()[0]
    405         self.assertEqual(models.HostQueueEntry.Status.STARTING, hqe.status)
    406         self.assertEqual('hostless', hqe.execution_subdir)
    407 
    408 
    409     def test_only_schedule_queued_entries(self):
    410         self._create_job(metahosts=[1])
    411         self._update_hqe(set='active=1, host_id=2')
    412         self._run_scheduler()
    413         self._check_for_extra_schedulings()
    414 
    415 
    416     def test_no_ready_hosts(self):
    417         self._create_job(hosts=[1])
    418         self._do_query('UPDATE afe_hosts SET status="Repair Failed"')
    419         self._run_scheduler()
    420         self._check_for_extra_schedulings()
    421 
    422 
    423 class DispatcherThrottlingTest(BaseSchedulerTest):
    424     """
    425     Test that the dispatcher throttles:
    426      * total number of running processes
    427      * number of processes started per cycle
    428     """
    429     _MAX_RUNNING = 3
    430     _MAX_STARTED = 2
    431 
    432     def setUp(self):
    433         super(DispatcherThrottlingTest, self).setUp()
    434         scheduler_config.config.max_processes_per_drone = self._MAX_RUNNING
    435 
    436         def fake_max_runnable_processes(fake_self, username,
    437                                         drone_hostnames_allowed):
    438             running = sum(agent.task.num_processes
    439                           for agent in self._agents
    440                           if agent.started and not agent.is_done())
    441             return self._MAX_RUNNING - running
    442         self.god.stub_with(drone_manager.DroneManager, 'max_runnable_processes',
    443                            fake_max_runnable_processes)
    444 
    445 
    446     def _setup_some_agents(self, num_agents):
    447         self._agents = [DummyAgent() for i in xrange(num_agents)]
    448         self._dispatcher._agents = list(self._agents)
    449 
    450 
    451     def _run_a_few_ticks(self):
    452         for i in xrange(4):
    453             self._dispatcher._handle_agents()
    454 
    455 
    456     def _assert_agents_started(self, indexes, is_started=True):
    457         for i in indexes:
    458             self.assert_(self._agents[i].started == is_started,
    459                          'Agent %d %sstarted' %
    460                          (i, is_started and 'not ' or ''))
    461 
    462 
    463     def _assert_agents_not_started(self, indexes):
    464         self._assert_agents_started(indexes, False)
    465 
    466 
    467     def test_throttle_total(self):
    468         self._setup_some_agents(4)
    469         self._run_a_few_ticks()
    470         self._assert_agents_started([0, 1, 2])
    471         self._assert_agents_not_started([3])
    472 
    473 
    474     def test_throttle_with_synchronous(self):
    475         self._setup_some_agents(2)
    476         self._agents[0].task.num_processes = 3
    477         self._run_a_few_ticks()
    478         self._assert_agents_started([0])
    479         self._assert_agents_not_started([1])
    480 
    481 
    482     def test_large_agent_starvation(self):
    483         """
    484         Ensure large agents don't get starved by lower-priority agents.
    485         """
    486         self._setup_some_agents(3)
    487         self._agents[1].task.num_processes = 3
    488         self._run_a_few_ticks()
    489         self._assert_agents_started([0])
    490         self._assert_agents_not_started([1, 2])
    491 
    492         self._agents[0].set_done(True)
    493         self._run_a_few_ticks()
    494         self._assert_agents_started([1])
    495         self._assert_agents_not_started([2])
    496 
    497 
    498     def test_zero_process_agent(self):
    499         self._setup_some_agents(5)
    500         self._agents[4].task.num_processes = 0
    501         self._run_a_few_ticks()
    502         self._assert_agents_started([0, 1, 2, 4])
    503         self._assert_agents_not_started([3])
    504 
    505 
    506 class PidfileRunMonitorTest(unittest.TestCase):
    507     execution_tag = 'test_tag'
    508     pid = 12345
    509     process = drone_manager.Process('myhost', pid)
    510     num_tests_failed = 1
    511 
    512     def setUp(self):
    513         self.god = mock.mock_god()
    514         self.mock_drone_manager = self.god.create_mock_class(
    515             drone_manager.DroneManager, 'drone_manager')
    516         self.god.stub_with(drone_manager, '_the_instance',
    517                            self.mock_drone_manager)
    518         self.god.stub_with(pidfile_monitor, '_get_pidfile_timeout_secs',
    519                            self._mock_get_pidfile_timeout_secs)
    520 
    521         self.pidfile_id = object()
    522 
    523         (self.mock_drone_manager.get_pidfile_id_from
    524              .expect_call(self.execution_tag,
    525                           pidfile_name=drone_manager.AUTOSERV_PID_FILE)
    526              .and_return(self.pidfile_id))
    527 
    528         self.monitor = pidfile_monitor.PidfileRunMonitor()
    529         self.monitor.attach_to_existing_process(self.execution_tag)
    530 
    531     def tearDown(self):
    532         self.god.unstub_all()
    533 
    534 
    535     def _mock_get_pidfile_timeout_secs(self):
    536         return 300
    537 
    538 
    539     def setup_pidfile(self, pid=None, exit_code=None, tests_failed=None,
    540                       use_second_read=False):
    541         contents = drone_manager.PidfileContents()
    542         if pid is not None:
    543             contents.process = drone_manager.Process('myhost', pid)
    544         contents.exit_status = exit_code
    545         contents.num_tests_failed = tests_failed
    546         self.mock_drone_manager.get_pidfile_contents.expect_call(
    547             self.pidfile_id, use_second_read=use_second_read).and_return(
    548             contents)
    549 
    550 
    551     def set_not_yet_run(self):
    552         self.setup_pidfile()
    553 
    554 
    555     def set_empty_pidfile(self):
    556         self.setup_pidfile()
    557 
    558 
    559     def set_running(self, use_second_read=False):
    560         self.setup_pidfile(self.pid, use_second_read=use_second_read)
    561 
    562 
    563     def set_complete(self, error_code, use_second_read=False):
    564         self.setup_pidfile(self.pid, error_code, self.num_tests_failed,
    565                            use_second_read=use_second_read)
    566 
    567 
    568     def _check_monitor(self, expected_pid, expected_exit_status,
    569                        expected_num_tests_failed):
    570         if expected_pid is None:
    571             self.assertEquals(self.monitor._state.process, None)
    572         else:
    573             self.assertEquals(self.monitor._state.process.pid, expected_pid)
    574         self.assertEquals(self.monitor._state.exit_status, expected_exit_status)
    575         self.assertEquals(self.monitor._state.num_tests_failed,
    576                           expected_num_tests_failed)
    577 
    578 
    579         self.god.check_playback()
    580 
    581 
    582     def _test_read_pidfile_helper(self, expected_pid, expected_exit_status,
    583                                   expected_num_tests_failed):
    584         self.monitor._read_pidfile()
    585         self._check_monitor(expected_pid, expected_exit_status,
    586                             expected_num_tests_failed)
    587 
    588 
    589     def _get_expected_tests_failed(self, expected_exit_status):
    590         if expected_exit_status is None:
    591             expected_tests_failed = None
    592         else:
    593             expected_tests_failed = self.num_tests_failed
    594         return expected_tests_failed
    595 
    596 
    597     def test_read_pidfile(self):
    598         self.set_not_yet_run()
    599         self._test_read_pidfile_helper(None, None, None)
    600 
    601         self.set_empty_pidfile()
    602         self._test_read_pidfile_helper(None, None, None)
    603 
    604         self.set_running()
    605         self._test_read_pidfile_helper(self.pid, None, None)
    606 
    607         self.set_complete(123)
    608         self._test_read_pidfile_helper(self.pid, 123, self.num_tests_failed)
    609 
    610 
    611     def test_read_pidfile_error(self):
    612         self.mock_drone_manager.get_pidfile_contents.expect_call(
    613             self.pidfile_id, use_second_read=False).and_return(
    614             drone_manager.InvalidPidfile('error'))
    615         self.assertRaises(pidfile_monitor.PidfileRunMonitor._PidfileException,
    616                           self.monitor._read_pidfile)
    617         self.god.check_playback()
    618 
    619 
    620     def setup_is_running(self, is_running):
    621         self.mock_drone_manager.is_process_running.expect_call(
    622             self.process).and_return(is_running)
    623 
    624 
    625     def _test_get_pidfile_info_helper(self, expected_pid, expected_exit_status,
    626                                       expected_num_tests_failed):
    627         self.monitor._get_pidfile_info()
    628         self._check_monitor(expected_pid, expected_exit_status,
    629                             expected_num_tests_failed)
    630 
    631 
    632     def test_get_pidfile_info(self):
    633         """
    634         normal cases for get_pidfile_info
    635         """
    636         # running
    637         self.set_running()
    638         self.setup_is_running(True)
    639         self._test_get_pidfile_info_helper(self.pid, None, None)
    640 
    641         # exited during check
    642         self.set_running()
    643         self.setup_is_running(False)
    644         self.set_complete(123, use_second_read=True) # pidfile gets read again
    645         self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)
    646 
    647         # completed
    648         self.set_complete(123)
    649         self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)
    650 
    651 
    652     def test_get_pidfile_info_running_no_proc(self):
    653         """
    654         pidfile shows process running, but no proc exists
    655         """
    656         # running but no proc
    657         self.set_running()
    658         self.setup_is_running(False)
    659         self.set_running(use_second_read=True)
    660         self._test_get_pidfile_info_helper(self.pid, 1, 0)
    661         self.assertTrue(self.monitor.lost_process)
    662 
    663 
    664     def test_get_pidfile_info_not_yet_run(self):
    665         """
    666         pidfile hasn't been written yet
    667         """
    668         self.set_not_yet_run()
    669         self._test_get_pidfile_info_helper(None, None, None)
    670 
    671 
    672     def test_process_failed_to_write_pidfile(self):
    673         self.set_not_yet_run()
    674         self.monitor._start_time = (time.time() -
    675                                     pidfile_monitor._get_pidfile_timeout_secs() - 1)
    676         self._test_get_pidfile_info_helper(None, 1, 0)
    677         self.assertTrue(self.monitor.lost_process)
    678 
    679 
    680 class AgentTest(unittest.TestCase):
    681     def setUp(self):
    682         self.god = mock.mock_god()
    683         self._dispatcher = self.god.create_mock_class(monitor_db.Dispatcher,
    684                                                       'dispatcher')
    685 
    686 
    687     def tearDown(self):
    688         self.god.unstub_all()
    689 
    690 
    691     def _create_mock_task(self, name):
    692         task = self.god.create_mock_class(agent_task.AgentTask, name)
    693         task.num_processes = 1
    694         _set_host_and_qe_ids(task)
    695         return task
    696 
    697     def _create_agent(self, task):
    698         agent = monitor_db.Agent(task)
    699         agent.dispatcher = self._dispatcher
    700         return agent
    701 
    702 
    703     def _finish_agent(self, agent):
    704         while not agent.is_done():
    705             agent.tick()
    706 
    707 
    708     def test_agent_abort(self):
    709         task = self._create_mock_task('task')
    710         task.poll.expect_call()
    711         task.is_done.expect_call().and_return(False)
    712         task.abort.expect_call()
    713         task.aborted = True
    714 
    715         agent = self._create_agent(task)
    716         agent.tick()
    717         agent.abort()
    718         self._finish_agent(agent)
    719         self.god.check_playback()
    720 
    721 
    722     def _test_agent_abort_before_started_helper(self, ignore_abort=False):
    723         task = self._create_mock_task('task')
    724         task.abort.expect_call()
    725         if ignore_abort:
    726             task.aborted = False
    727             task.poll.expect_call()
    728             task.is_done.expect_call().and_return(True)
    729             task.success = True
    730         else:
    731             task.aborted = True
    732 
    733         agent = self._create_agent(task)
    734         agent.abort()
    735         self._finish_agent(agent)
    736         self.god.check_playback()
    737 
    738 
    739     def test_agent_abort_before_started(self):
    740         self._test_agent_abort_before_started_helper()
    741         self._test_agent_abort_before_started_helper(True)
    742 
    743 
    744 class JobSchedulingTest(BaseSchedulerTest):
    def _test_run_helper(self, expect_agent=True, expect_starting=False,
                         expect_pending=False, lucifer=False):
        """Run job 1 / queue entry 1 and check the resulting HQE state.

        @param expect_agent: whether the dispatcher should end up with an
                agent for the entry.
        @param expect_starting: expect status Starting (takes precedence).
        @param expect_pending: expect status Pending.  When neither flag is
                set, Verifying is expected.
        @param lucifer: skip the agent checks (Lucifer creates no agents).
        @returns the task of the dispatcher's first agent, or None when the
                agent checks are skipped or no agent is expected.
        """
        if expect_starting:
            expected_status = models.HostQueueEntry.Status.STARTING
        elif expect_pending:
            expected_status = models.HostQueueEntry.Status.PENDING
        else:
            expected_status = models.HostQueueEntry.Status.VERIFYING
        job = scheduler_models.Job.fetch('id = 1')[0]
        queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]
        # A TestCase assertion, unlike a bare assert, is not stripped by -O.
        self.assertIs(queue_entry.job, job)
        job.run_if_ready(queue_entry)

        self.god.check_playback()

        self._dispatcher._schedule_running_host_queue_entries()

        actual_status = models.HostQueueEntry.smart_get(1).status
        self.assertEqual(expected_status, actual_status)

        if lucifer:
            # TODO(ayatane): Lucifer ruins the fragile expectations
            # here.  In particular, there won't be any agents.
            return
        agents = self._dispatcher._agents
        # Indexing an empty agent list would raise IndexError instead of
        # letting the "no agent" expectation below be checked.
        agent = agents[0] if agents else None
        if not expect_agent:
            self.assertIsNone(agent)
            return

        self.assertTrue(isinstance(agent, monitor_db.Agent))
        self.assertTrue(agent.task)
        return agent.task
    777 
    778 
    def test_run_synchronous_ready(self):
        """Pending entries of a two-host synchronous job reach Starting.

        The Lucifer call is then verified through _assert_lucifer_called_with.
        """
        sync_job = self._create_job(hosts=[1, 2], synchronous=True)
        self._update_hqe("status='Pending', execution_subdir=''")
        self._test_run_helper(lucifer=True, expect_starting=True)
        self._assert_lucifer_called_with(sync_job)
    784 
    785 
    786     def test_schedule_running_host_queue_entries_fail(self):
    787         self._create_job(hosts=[2])
    788         self._update_hqe("status='%s', execution_subdir=''" %
    789                          models.HostQueueEntry.Status.PENDING)
    790         job = scheduler_models.Job.fetch('id = 1')[0]
    791         queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]
    792         assert queue_entry.job is job
    793         job.run_if_ready(queue_entry)
    794         self.assertEqual(queue_entry.status,
    795                          models.HostQueueEntry.Status.STARTING)
    796         self.assert_(queue_entry.execution_subdir)
    797         self.god.check_playback()
    798 
    799         class dummy_test_agent(object):
    800             task = 'dummy_test_agent'
    801         self._dispatcher._register_agent_for_ids(
    802                 self._dispatcher._host_agents, [queue_entry.host.id],
    803                 dummy_test_agent)
    804 
    805         # Attempted to schedule on a host that already has an agent.
    806         # Verify that it doesn't raise any error.
    807         self._dispatcher._schedule_running_host_queue_entries()
    808 
    809 
    810     def test_schedule_hostless_job(self):
    811         job = self._create_job(hostless=True)
    812         self.assertEqual(1, job.hostqueueentry_set.count())
    813         hqe_query = scheduler_models.HostQueueEntry.fetch(
    814                 'id = %s' % job.hostqueueentry_set.all()[0].id)
    815         self.assertEqual(1, len(hqe_query))
    816         hqe = hqe_query[0]
    817 
    818         self.assertEqual(models.HostQueueEntry.Status.QUEUED, hqe.status)
    819         self.assertEqual(0, len(self._dispatcher._agents))
    820 
    821         self._dispatcher._schedule_new_jobs()
    822 
    823         self.assertEqual(models.HostQueueEntry.Status.STARTING, hqe.status)
    824         self._assert_lucifer_called_with(job)
    825 
    826 
    827     def _assert_lucifer_called_with(self, job):
    828         calls = []
    829         # Create a thing we can assign attributes on.
    830         fake_drone = lambda: None
    831         fake_drone.hostname = lambda: 'localhost'
    832         def fake(manager, job):
    833             calls.append((manager, job))
    834             return fake_drone
    835         old = luciferlib.spawn_starting_job_handler
    836         try:
    837             luciferlib.spawn_starting_job_handler = fake
    838             self._dispatcher._send_to_lucifer()
    839         finally:
    840             luciferlib.spawn_starting_job_handler = old
    841         self.assertEqual(calls[0][1], job)
    842 
    843 
    844 class TopLevelFunctionsTest(unittest.TestCase):
    845     def setUp(self):
    846         self.god = mock.mock_god()
    847 
    848 
    849     def tearDown(self):
    850         self.god.unstub_all()
    851 
    852 
    853     def test_autoserv_command_line(self):
    854         machines = 'abcd12,efgh34'
    855         extra_args = ['-Z', 'hello']
    856         expected_command_line_base = set((monitor_db._autoserv_path, '-p',
    857                                           '-m', machines, '-r',
    858                                           '--lab', 'True',
    859                                           drone_manager.WORKING_DIRECTORY))
    860 
    861         expected_command_line = expected_command_line_base.union(
    862                 ['--verbose']).union(extra_args)
    863         command_line = set(
    864                 monitor_db._autoserv_command_line(machines, extra_args))
    865         self.assertEqual(expected_command_line, command_line)
    866 
    867         class FakeJob(object):
    868             owner = 'Bob'
    869             name = 'fake job name'
    870             test_retry = 0
    871             id = 1337
    872 
    873         class FakeHQE(object):
    874             job = FakeJob
    875 
    876         expected_command_line = expected_command_line_base.union(
    877                 ['-u', FakeJob.owner, '-l', FakeJob.name])
    878         command_line = set(monitor_db._autoserv_command_line(
    879                 machines, extra_args=[], queue_entry=FakeHQE, verbose=False))
    880         self.assertEqual(expected_command_line, command_line)
    881 
    882 
    883 class AgentTaskTest(unittest.TestCase,
    884                     frontend_test_utils.FrontendTestMixin):
    885     def setUp(self):
    886         self._frontend_common_setup()
    887 
    888 
    889     def tearDown(self):
    890         self._frontend_common_teardown()
    891 
    892 
    893     def _setup_drones(self):
    894         self.god.stub_function(models.DroneSet, 'drone_sets_enabled')
    895         models.DroneSet.drone_sets_enabled.expect_call().and_return(True)
    896 
    897         drones = []
    898         for x in xrange(4):
    899             drones.append(models.Drone.objects.create(hostname=str(x)))
    900 
    901         drone_set_1 = models.DroneSet.objects.create(name='1')
    902         drone_set_1.drones.add(*drones[0:2])
    903         drone_set_2 = models.DroneSet.objects.create(name='2')
    904         drone_set_2.drones.add(*drones[2:4])
    905         drone_set_3 = models.DroneSet.objects.create(name='3')
    906 
    907         job_1 = self._create_job_simple([self.hosts[0].id],
    908                                         drone_set=drone_set_1)
    909         job_2 = self._create_job_simple([self.hosts[0].id],
    910                                         drone_set=drone_set_2)
    911         job_3 = self._create_job_simple([self.hosts[0].id],
    912                                         drone_set=drone_set_3)
    913 
    914         job_4 = self._create_job_simple([self.hosts[0].id])
    915         job_4.drone_set = None
    916         job_4.save()
    917 
    918         hqe_1 = job_1.hostqueueentry_set.all()[0]
    919         hqe_2 = job_2.hostqueueentry_set.all()[0]
    920         hqe_3 = job_3.hostqueueentry_set.all()[0]
    921         hqe_4 = job_4.hostqueueentry_set.all()[0]
    922 
    923         return (hqe_1, hqe_2, hqe_3, hqe_4), agent_task.AgentTask()
    924 
    925 
    926     def test_get_drone_hostnames_allowed_no_drones_in_set(self):
    927         hqes, task = self._setup_drones()
    928         task.queue_entry_ids = (hqes[2].id,)
    929         self.assertEqual(set(), task.get_drone_hostnames_allowed())
    930         self.god.check_playback()
    931 
    932 
    933     def test_get_drone_hostnames_allowed_no_drone_set(self):
    934         hqes, task = self._setup_drones()
    935         hqe = hqes[3]
    936         task.queue_entry_ids = (hqe.id,)
    937 
    938         result = object()
    939 
    940         self.god.stub_function(task, '_user_or_global_default_drone_set')
    941         task._user_or_global_default_drone_set.expect_call(
    942                 hqe.job, hqe.job.user()).and_return(result)
    943 
    944         self.assertEqual(result, task.get_drone_hostnames_allowed())
    945         self.god.check_playback()
    946 
    947 
    948     def test_get_drone_hostnames_allowed_success(self):
    949         hqes, task = self._setup_drones()
    950         task.queue_entry_ids = (hqes[0].id,)
    951         self.assertEqual(set(('0','1')), task.get_drone_hostnames_allowed([]))
    952         self.god.check_playback()
    953 
    954 
    955     def test_get_drone_hostnames_allowed_multiple_jobs(self):
    956         hqes, task = self._setup_drones()
    957         task.queue_entry_ids = (hqes[0].id, hqes[1].id)
    958         self.assertRaises(AssertionError,
    959                           task.get_drone_hostnames_allowed)
    960         self.god.check_playback()
    961 
    962 
    963     def test_get_drone_hostnames_allowed_no_hqe(self):
    964         class MockSpecialTask(object):
    965             requested_by = object()
    966 
    967         class MockSpecialAgentTask(agent_task.SpecialAgentTask):
    968             task = MockSpecialTask()
    969             queue_entry_ids = []
    970             def __init__(self, *args, **kwargs):
    971                 super(agent_task.SpecialAgentTask, self).__init__()
    972 
    973         task = MockSpecialAgentTask()
    974         self.god.stub_function(models.DroneSet, 'drone_sets_enabled')
    975         self.god.stub_function(task, '_user_or_global_default_drone_set')
    976 
    977         result = object()
    978         models.DroneSet.drone_sets_enabled.expect_call().and_return(True)
    979         task._user_or_global_default_drone_set.expect_call(
    980                 task.task, MockSpecialTask.requested_by).and_return(result)
    981 
    982         self.assertEqual(result, task.get_drone_hostnames_allowed())
    983         self.god.check_playback()
    984 
    985 
    986     def _setup_test_user_or_global_default_drone_set(self):
    987         result = object()
    988         class MockDroneSet(object):
    989             def get_drone_hostnames(self):
    990                 return result
    991 
    992         self.god.stub_function(models.DroneSet, 'get_default')
    993         models.DroneSet.get_default.expect_call().and_return(MockDroneSet())
    994         return result
    995 
    996 
    997     def test_user_or_global_default_drone_set(self):
    998         expected = object()
    999         class MockDroneSet(object):
   1000             def get_drone_hostnames(self):
   1001                 return expected
   1002         class MockUser(object):
   1003             drone_set = MockDroneSet()
   1004 
   1005         self._setup_test_user_or_global_default_drone_set()
   1006 
   1007         actual = agent_task.AgentTask()._user_or_global_default_drone_set(
   1008                 None, MockUser())
   1009 
   1010         self.assertEqual(expected, actual)
   1011         self.god.check_playback()
   1012 
   1013 
   1014     def test_user_or_global_default_drone_set_no_user(self):
   1015         expected = self._setup_test_user_or_global_default_drone_set()
   1016         actual = agent_task.AgentTask()._user_or_global_default_drone_set(
   1017                 None, None)
   1018 
   1019         self.assertEqual(expected, actual)
   1020         self.god.check_playback()
   1021 
   1022 
   1023     def test_user_or_global_default_drone_set_no_user_drone_set(self):
   1024         class MockUser(object):
   1025             drone_set = None
   1026             login = None
   1027 
   1028         expected = self._setup_test_user_or_global_default_drone_set()
   1029         actual = agent_task.AgentTask()._user_or_global_default_drone_set(
   1030                 None, MockUser())
   1031 
   1032         self.assertEqual(expected, actual)
   1033         self.god.check_playback()
   1034 
   1035 
   1036     def test_abort_HostlessQueueTask(self):
   1037         hqe = self.god.create_mock_class(scheduler_models.HostQueueEntry,
   1038                                          'HostQueueEntry')
   1039         # If hqe is still in STARTING status, aborting the task should finish
   1040         # without changing hqe's status.
   1041         hqe.status = models.HostQueueEntry.Status.STARTING
   1042         hqe.job = None
   1043         hqe.id = 0
   1044         task = monitor_db.HostlessQueueTask(hqe)
   1045         task.abort()
   1046 
   1047         # If hqe is in RUNNING status, aborting the task should change hqe's
   1048         # status to Parsing, so FinalReparseTask can be scheduled.
   1049         hqe.set_status.expect_call('Parsing')
   1050         hqe.status = models.HostQueueEntry.Status.RUNNING
   1051         hqe.job = None
   1052         hqe.id = 0
   1053         task = monitor_db.HostlessQueueTask(hqe)
   1054         task.abort()
   1055 
   1056 
   1057 if __name__ == '__main__':
   1058     unittest.main()
   1059