Home | History | Annotate | Download | only in scheduler
      1 #!/usr/bin/python
      2 
      3 import gc
      4 import time
      5 import unittest
      6 
      7 import common
      8 from autotest_lib.frontend import setup_django_environment
      9 from autotest_lib.frontend.afe import frontend_test_utils
     10 from autotest_lib.client.common_lib import global_config
     11 from autotest_lib.client.common_lib.test_utils import mock
     12 from autotest_lib.database import database_connection
     13 from autotest_lib.frontend.afe import models
     14 from autotest_lib.scheduler import agent_task
     15 from autotest_lib.scheduler import monitor_db, drone_manager
     16 from autotest_lib.scheduler import pidfile_monitor
     17 from autotest_lib.scheduler import scheduler_config, gc_stats
     18 from autotest_lib.scheduler import scheduler_lib
     19 from autotest_lib.scheduler import scheduler_models
     20 
     21 _DEBUG = False
     22 
     23 
     24 class DummyAgentTask(object):
     25     num_processes = 1
     26     owner_username = 'my_user'
     27 
     28     def get_drone_hostnames_allowed(self):
     29         return None
     30 
     31 
     32 class DummyAgent(object):
     33     started = False
     34     _is_done = False
     35     host_ids = ()
     36     hostnames = {}
     37     queue_entry_ids = ()
     38 
     39     def __init__(self):
     40         self.task = DummyAgentTask()
     41 
     42 
     43     def tick(self):
     44         self.started = True
     45 
     46 
     47     def is_done(self):
     48         return self._is_done
     49 
     50 
     51     def set_done(self, done):
     52         self._is_done = done
     53 
     54 
     55 class IsRow(mock.argument_comparator):
     56     def __init__(self, row_id):
     57         self.row_id = row_id
     58 
     59 
     60     def is_satisfied_by(self, parameter):
     61         return list(parameter)[0] == self.row_id
     62 
     63 
     64     def __str__(self):
     65         return 'row with id %s' % self.row_id
     66 
     67 
     68 class IsAgentWithTask(mock.argument_comparator):
     69     def __init__(self, task):
     70         self._task = task
     71 
     72 
     73     def is_satisfied_by(self, parameter):
     74         if not isinstance(parameter, monitor_db.Agent):
     75             return False
     76         tasks = list(parameter.queue.queue)
     77         if len(tasks) != 1:
     78             return False
     79         return tasks[0] == self._task
     80 
     81 
     82 def _set_host_and_qe_ids(agent_or_task, id_list=None):
     83     if id_list is None:
     84         id_list = []
     85     agent_or_task.host_ids = agent_or_task.queue_entry_ids = id_list
     86     agent_or_task.hostnames = dict((host_id, '192.168.1.1')
     87                                    for host_id in id_list)
     88 
     89 
     90 class BaseSchedulerTest(unittest.TestCase,
     91                         frontend_test_utils.FrontendTestMixin):
     92     _config_section = 'AUTOTEST_WEB'
     93 
     94     def _do_query(self, sql):
     95         self._database.execute(sql)
     96 
     97 
     98     def _set_monitor_stubs(self):
     99         self.mock_config = global_config.FakeGlobalConfig()
    100         self.god.stub_with(global_config, 'global_config', self.mock_config)
    101 
    102         # Clear the instance cache as this is a brand new database.
    103         scheduler_models.DBObject._clear_instance_cache()
    104 
    105         self._database = (
    106             database_connection.TranslatingDatabase.get_test_database(
    107                 translators=scheduler_lib._DB_TRANSLATORS))
    108         self._database.connect(db_type='django')
    109         self._database.debug = _DEBUG
    110 
    111         connection_manager = scheduler_lib.ConnectionManager(autocommit=False)
    112         self.god.stub_with(connection_manager, 'db_connection', self._database)
    113         self.god.stub_with(monitor_db, '_db_manager', connection_manager)
    114         self.god.stub_with(monitor_db, '_db', self._database)
    115 
    116         self.god.stub_with(monitor_db.Dispatcher,
    117                            '_get_pending_queue_entries',
    118                            self._get_pending_hqes)
    119         self.god.stub_with(scheduler_models, '_db', self._database)
    120         self.god.stub_with(drone_manager.instance(), '_results_dir',
    121                            '/test/path')
    122         self.god.stub_with(drone_manager.instance(), '_temporary_directory',
    123                            '/test/path/tmp')
    124         self.god.stub_with(drone_manager.instance(), 'initialize',
    125                            lambda *args: None)
    126         self.god.stub_with(drone_manager.instance(), 'execute_actions',
    127                            lambda *args: None)
    128 
    129         monitor_db.initialize_globals()
    130         scheduler_models.initialize_globals()
    131 
    132 
    133     def setUp(self):
    134         self._frontend_common_setup()
    135         self._set_monitor_stubs()
    136         self._set_global_config_values()
    137         self._dispatcher = monitor_db.Dispatcher()
    138 
    139 
    140     def tearDown(self):
    141         self._database.disconnect()
    142         self._frontend_common_teardown()
    143 
    144 
    145     def _set_global_config_values(self):
    146         """Set global_config values to suit unittest needs."""
    147         self.mock_config.set_config_value(
    148                 'SCHEDULER', 'inline_host_acquisition', True)
    149 
    150 
    151     def _update_hqe(self, set, where=''):
    152         query = 'UPDATE afe_host_queue_entries SET ' + set
    153         if where:
    154             query += ' WHERE ' + where
    155         self._do_query(query)
    156 
    157 
    158     def _get_pending_hqes(self):
    159         query_string=('afe_jobs.priority DESC, '
    160                       'ifnull(nullif(host_id, NULL), host_id) DESC, '
    161                       'ifnull(nullif(meta_host, NULL), meta_host) DESC, '
    162                       'job_id')
    163         return list(scheduler_models.HostQueueEntry.fetch(
    164             joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
    165             where='NOT complete AND NOT active AND status="Queued"',
    166             order_by=query_string))
    167 
    168 
    169 class DispatcherSchedulingTest(BaseSchedulerTest):
    170     _jobs_scheduled = []
    171 
    172 
    173     def tearDown(self):
    174         super(DispatcherSchedulingTest, self).tearDown()
    175 
    176 
    177     def _set_monitor_stubs(self):
    178         super(DispatcherSchedulingTest, self)._set_monitor_stubs()
    179 
    180         def hqe__do_schedule_pre_job_tasks_stub(queue_entry):
    181             """Called by HostQueueEntry.run()."""
    182             self._record_job_scheduled(queue_entry.job.id, queue_entry.host.id)
    183             queue_entry.set_status('Starting')
    184 
    185         self.god.stub_with(scheduler_models.HostQueueEntry,
    186                            '_do_schedule_pre_job_tasks',
    187                            hqe__do_schedule_pre_job_tasks_stub)
    188 
    189 
    190     def _record_job_scheduled(self, job_id, host_id):
    191         record = (job_id, host_id)
    192         self.assert_(record not in self._jobs_scheduled,
    193                      'Job %d scheduled on host %d twice' %
    194                      (job_id, host_id))
    195         self._jobs_scheduled.append(record)
    196 
    197 
    198     def _assert_job_scheduled_on(self, job_id, host_id):
    199         record = (job_id, host_id)
    200         self.assert_(record in self._jobs_scheduled,
    201                      'Job %d not scheduled on host %d as expected\n'
    202                      'Jobs scheduled: %s' %
    203                      (job_id, host_id, self._jobs_scheduled))
    204         self._jobs_scheduled.remove(record)
    205 
    206 
    207     def _assert_job_scheduled_on_number_of(self, job_id, host_ids, number):
    208         """Assert job was scheduled on exactly number hosts out of a set."""
    209         found = []
    210         for host_id in host_ids:
    211             record = (job_id, host_id)
    212             if record in self._jobs_scheduled:
    213                 found.append(record)
    214                 self._jobs_scheduled.remove(record)
    215         if len(found) < number:
    216             self.fail('Job %d scheduled on fewer than %d hosts in %s.\n'
    217                       'Jobs scheduled: %s' % (job_id, number, host_ids, found))
    218         elif len(found) > number:
    219             self.fail('Job %d scheduled on more than %d hosts in %s.\n'
    220                       'Jobs scheduled: %s' % (job_id, number, host_ids, found))
    221 
    222 
    223     def _check_for_extra_schedulings(self):
    224         if len(self._jobs_scheduled) != 0:
    225             self.fail('Extra jobs scheduled: ' +
    226                       str(self._jobs_scheduled))
    227 
    228 
    229     def _convert_jobs_to_metahosts(self, *job_ids):
    230         sql_tuple = '(' + ','.join(str(i) for i in job_ids) + ')'
    231         self._do_query('UPDATE afe_host_queue_entries SET '
    232                        'meta_host=host_id, host_id=NULL '
    233                        'WHERE job_id IN ' + sql_tuple)
    234 
    235 
    236     def _lock_host(self, host_id):
    237         self._do_query('UPDATE afe_hosts SET locked=1 WHERE id=' +
    238                        str(host_id))
    239 
    240 
    241     def setUp(self):
    242         super(DispatcherSchedulingTest, self).setUp()
    243         self._jobs_scheduled = []
    244 
    245 
    246     def _run_scheduler(self):
    247         self._dispatcher._host_scheduler.tick()
    248         for _ in xrange(2): # metahost scheduling can take two ticks
    249             self._dispatcher._schedule_new_jobs()
    250 
    251 
    252     def _test_basic_scheduling_helper(self, use_metahosts):
    253         'Basic nonmetahost scheduling'
    254         self._create_job_simple([1], use_metahosts)
    255         self._create_job_simple([2], use_metahosts)
    256         self._run_scheduler()
    257         self._assert_job_scheduled_on(1, 1)
    258         self._assert_job_scheduled_on(2, 2)
    259         self._check_for_extra_schedulings()
    260 
    261 
    262     def _test_priorities_helper(self, use_metahosts):
    263         'Test prioritization ordering'
    264         self._create_job_simple([1], use_metahosts)
    265         self._create_job_simple([2], use_metahosts)
    266         self._create_job_simple([1,2], use_metahosts)
    267         self._create_job_simple([1], use_metahosts, priority=1)
    268         self._run_scheduler()
    269         self._assert_job_scheduled_on(4, 1) # higher priority
    270         self._assert_job_scheduled_on(2, 2) # earlier job over later
    271         self._check_for_extra_schedulings()
    272 
    273 
    274     def _test_hosts_ready_helper(self, use_metahosts):
    275         """
    276         Only hosts that are status=Ready, unlocked and not invalid get
    277         scheduled.
    278         """
    279         self._create_job_simple([1], use_metahosts)
    280         self._do_query('UPDATE afe_hosts SET status="Running" WHERE id=1')
    281         self._run_scheduler()
    282         self._check_for_extra_schedulings()
    283 
    284         self._do_query('UPDATE afe_hosts SET status="Ready", locked=1 '
    285                        'WHERE id=1')
    286         self._run_scheduler()
    287         self._check_for_extra_schedulings()
    288 
    289         self._do_query('UPDATE afe_hosts SET locked=0, invalid=1 '
    290                        'WHERE id=1')
    291         self._run_scheduler()
    292         if not use_metahosts:
    293             self._assert_job_scheduled_on(1, 1)
    294         self._check_for_extra_schedulings()
    295 
    296 
    297     def _test_hosts_idle_helper(self, use_metahosts):
    298         'Only idle hosts get scheduled'
    299         self._create_job(hosts=[1], active=True)
    300         self._create_job_simple([1], use_metahosts)
    301         self._run_scheduler()
    302         self._check_for_extra_schedulings()
    303 
    304 
    305     def _test_obey_ACLs_helper(self, use_metahosts):
    306         self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')
    307         self._create_job_simple([1], use_metahosts)
    308         self._run_scheduler()
    309         self._check_for_extra_schedulings()
    310 
    311 
    312     def test_basic_scheduling(self):
    313         self._test_basic_scheduling_helper(False)
    314 
    315 
    316     def test_priorities(self):
    317         self._test_priorities_helper(False)
    318 
    319 
    320     def test_hosts_ready(self):
    321         self._test_hosts_ready_helper(False)
    322 
    323 
    324     def test_hosts_idle(self):
    325         self._test_hosts_idle_helper(False)
    326 
    327 
    328     def test_obey_ACLs(self):
    329         self._test_obey_ACLs_helper(False)
    330 
    331 
    332     def test_one_time_hosts_ignore_ACLs(self):
    333         self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')
    334         self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=1')
    335         self._create_job_simple([1])
    336         self._run_scheduler()
    337         self._assert_job_scheduled_on(1, 1)
    338         self._check_for_extra_schedulings()
    339 
    340 
    341     def test_non_metahost_on_invalid_host(self):
    342         """
    343         Non-metahost entries can get scheduled on invalid hosts (this is how
    344         one-time hosts work).
    345         """
    346         self._do_query('UPDATE afe_hosts SET invalid=1')
    347         self._test_basic_scheduling_helper(False)
    348 
    349 
    350     def test_metahost_scheduling(self):
    351         """
    352         Basic metahost scheduling
    353         """
    354         self._test_basic_scheduling_helper(True)
    355 
    356 
    357     def test_metahost_priorities(self):
    358         self._test_priorities_helper(True)
    359 
    360 
    361     def test_metahost_hosts_ready(self):
    362         self._test_hosts_ready_helper(True)
    363 
    364 
    365     def test_metahost_hosts_idle(self):
    366         self._test_hosts_idle_helper(True)
    367 
    368 
    369     def test_metahost_obey_ACLs(self):
    370         self._test_obey_ACLs_helper(True)
    371 
    372 
    373     def test_nonmetahost_over_metahost(self):
    374         """
    375         Non-metahost entries should take priority over metahost entries
    376         for the same host
    377         """
    378         self._create_job(metahosts=[1])
    379         self._create_job(hosts=[1])
    380         self._run_scheduler()
    381         self._assert_job_scheduled_on(2, 1)
    382         self._check_for_extra_schedulings()
    383 
    384 
    385     def test_no_execution_subdir_not_found(self):
    386         """Reproduce bug crosbug.com/334353 and recover from it."""
    387 
    388         self.mock_config.set_config_value('SCHEDULER', 'drones', 'localhost')
    389 
    390         job = self._create_job(hostless=True)
    391 
    392         # Ensure execution_subdir is set before status
    393         original_set_status = scheduler_models.HostQueueEntry.set_status
    394         def fake_set_status(hqe, *args, **kwargs):
    395             self.assertEqual(hqe.execution_subdir, 'hostless')
    396             original_set_status(hqe, *args, **kwargs)
    397 
    398         self.god.stub_with(scheduler_models.HostQueueEntry, 'set_status',
    399                            fake_set_status)
    400 
    401         self._dispatcher._schedule_new_jobs()
    402 
    403         hqe = job.hostqueueentry_set.all()[0]
    404         self.assertEqual(models.HostQueueEntry.Status.STARTING, hqe.status)
    405         self.assertEqual('hostless', hqe.execution_subdir)
    406 
    407 
    408     def test_only_schedule_queued_entries(self):
    409         self._create_job(metahosts=[1])
    410         self._update_hqe(set='active=1, host_id=2')
    411         self._run_scheduler()
    412         self._check_for_extra_schedulings()
    413 
    414 
    415     def test_no_ready_hosts(self):
    416         self._create_job(hosts=[1])
    417         self._do_query('UPDATE afe_hosts SET status="Repair Failed"')
    418         self._run_scheduler()
    419         self._check_for_extra_schedulings()
    420 
    421 
    422     def test_garbage_collection(self):
    423         self.god.stub_with(self._dispatcher, '_seconds_between_garbage_stats',
    424                            999999)
    425         self.god.stub_function(gc, 'collect')
    426         self.god.stub_function(gc_stats, '_log_garbage_collector_stats')
    427         gc.collect.expect_call().and_return(0)
    428         gc_stats._log_garbage_collector_stats.expect_call()
    429         # Force a garbage collection run
    430         self._dispatcher._last_garbage_stats_time = 0
    431         self._dispatcher._garbage_collection()
    432         # The previous call should have reset the time, it won't do anything
    433         # the second time.  If it does, we'll get an unexpected call.
    434         self._dispatcher._garbage_collection()
    435 
    436 
    437 class DispatcherThrottlingTest(BaseSchedulerTest):
    438     """
    439     Test that the dispatcher throttles:
    440      * total number of running processes
    441      * number of processes started per cycle
    442     """
    443     _MAX_RUNNING = 3
    444     _MAX_STARTED = 2
    445 
    446     def setUp(self):
    447         super(DispatcherThrottlingTest, self).setUp()
    448         scheduler_config.config.max_processes_per_drone = self._MAX_RUNNING
    449 
    450         def fake_max_runnable_processes(fake_self, username,
    451                                         drone_hostnames_allowed):
    452             running = sum(agent.task.num_processes
    453                           for agent in self._agents
    454                           if agent.started and not agent.is_done())
    455             return self._MAX_RUNNING - running
    456         self.god.stub_with(drone_manager.DroneManager, 'max_runnable_processes',
    457                            fake_max_runnable_processes)
    458 
    459 
    460     def _setup_some_agents(self, num_agents):
    461         self._agents = [DummyAgent() for i in xrange(num_agents)]
    462         self._dispatcher._agents = list(self._agents)
    463 
    464 
    465     def _run_a_few_ticks(self):
    466         for i in xrange(4):
    467             self._dispatcher._handle_agents()
    468 
    469 
    470     def _assert_agents_started(self, indexes, is_started=True):
    471         for i in indexes:
    472             self.assert_(self._agents[i].started == is_started,
    473                          'Agent %d %sstarted' %
    474                          (i, is_started and 'not ' or ''))
    475 
    476 
    477     def _assert_agents_not_started(self, indexes):
    478         self._assert_agents_started(indexes, False)
    479 
    480 
    481     def test_throttle_total(self):
    482         self._setup_some_agents(4)
    483         self._run_a_few_ticks()
    484         self._assert_agents_started([0, 1, 2])
    485         self._assert_agents_not_started([3])
    486 
    487 
    488     def test_throttle_with_synchronous(self):
    489         self._setup_some_agents(2)
    490         self._agents[0].task.num_processes = 3
    491         self._run_a_few_ticks()
    492         self._assert_agents_started([0])
    493         self._assert_agents_not_started([1])
    494 
    495 
    496     def test_large_agent_starvation(self):
    497         """
    498         Ensure large agents don't get starved by lower-priority agents.
    499         """
    500         self._setup_some_agents(3)
    501         self._agents[1].task.num_processes = 3
    502         self._run_a_few_ticks()
    503         self._assert_agents_started([0])
    504         self._assert_agents_not_started([1, 2])
    505 
    506         self._agents[0].set_done(True)
    507         self._run_a_few_ticks()
    508         self._assert_agents_started([1])
    509         self._assert_agents_not_started([2])
    510 
    511 
    512     def test_zero_process_agent(self):
    513         self._setup_some_agents(5)
    514         self._agents[4].task.num_processes = 0
    515         self._run_a_few_ticks()
    516         self._assert_agents_started([0, 1, 2, 4])
    517         self._assert_agents_not_started([3])
    518 
    519 
    520 class PidfileRunMonitorTest(unittest.TestCase):
    521     execution_tag = 'test_tag'
    522     pid = 12345
    523     process = drone_manager.Process('myhost', pid)
    524     num_tests_failed = 1
    525 
    526     def setUp(self):
    527         self.god = mock.mock_god()
    528         self.mock_drone_manager = self.god.create_mock_class(
    529             drone_manager.DroneManager, 'drone_manager')
    530         self.god.stub_with(drone_manager, '_the_instance',
    531                            self.mock_drone_manager)
    532         self.god.stub_with(pidfile_monitor, '_get_pidfile_timeout_secs',
    533                            self._mock_get_pidfile_timeout_secs)
    534 
    535         self.pidfile_id = object()
    536 
    537         (self.mock_drone_manager.get_pidfile_id_from
    538              .expect_call(self.execution_tag,
    539                           pidfile_name=drone_manager.AUTOSERV_PID_FILE)
    540              .and_return(self.pidfile_id))
    541 
    542         self.monitor = pidfile_monitor.PidfileRunMonitor()
    543         self.monitor.attach_to_existing_process(self.execution_tag)
    544 
    545     def tearDown(self):
    546         self.god.unstub_all()
    547 
    548 
    549     def _mock_get_pidfile_timeout_secs(self):
    550         return 300
    551 
    552 
    553     def setup_pidfile(self, pid=None, exit_code=None, tests_failed=None,
    554                       use_second_read=False):
    555         contents = drone_manager.PidfileContents()
    556         if pid is not None:
    557             contents.process = drone_manager.Process('myhost', pid)
    558         contents.exit_status = exit_code
    559         contents.num_tests_failed = tests_failed
    560         self.mock_drone_manager.get_pidfile_contents.expect_call(
    561             self.pidfile_id, use_second_read=use_second_read).and_return(
    562             contents)
    563 
    564 
    565     def set_not_yet_run(self):
    566         self.setup_pidfile()
    567 
    568 
    569     def set_empty_pidfile(self):
    570         self.setup_pidfile()
    571 
    572 
    573     def set_running(self, use_second_read=False):
    574         self.setup_pidfile(self.pid, use_second_read=use_second_read)
    575 
    576 
    577     def set_complete(self, error_code, use_second_read=False):
    578         self.setup_pidfile(self.pid, error_code, self.num_tests_failed,
    579                            use_second_read=use_second_read)
    580 
    581 
    582     def _check_monitor(self, expected_pid, expected_exit_status,
    583                        expected_num_tests_failed):
    584         if expected_pid is None:
    585             self.assertEquals(self.monitor._state.process, None)
    586         else:
    587             self.assertEquals(self.monitor._state.process.pid, expected_pid)
    588         self.assertEquals(self.monitor._state.exit_status, expected_exit_status)
    589         self.assertEquals(self.monitor._state.num_tests_failed,
    590                           expected_num_tests_failed)
    591 
    592 
    593         self.god.check_playback()
    594 
    595 
    596     def _test_read_pidfile_helper(self, expected_pid, expected_exit_status,
    597                                   expected_num_tests_failed):
    598         self.monitor._read_pidfile()
    599         self._check_monitor(expected_pid, expected_exit_status,
    600                             expected_num_tests_failed)
    601 
    602 
    603     def _get_expected_tests_failed(self, expected_exit_status):
    604         if expected_exit_status is None:
    605             expected_tests_failed = None
    606         else:
    607             expected_tests_failed = self.num_tests_failed
    608         return expected_tests_failed
    609 
    610 
    611     def test_read_pidfile(self):
    612         self.set_not_yet_run()
    613         self._test_read_pidfile_helper(None, None, None)
    614 
    615         self.set_empty_pidfile()
    616         self._test_read_pidfile_helper(None, None, None)
    617 
    618         self.set_running()
    619         self._test_read_pidfile_helper(self.pid, None, None)
    620 
    621         self.set_complete(123)
    622         self._test_read_pidfile_helper(self.pid, 123, self.num_tests_failed)
    623 
    624 
    625     def test_read_pidfile_error(self):
    626         self.mock_drone_manager.get_pidfile_contents.expect_call(
    627             self.pidfile_id, use_second_read=False).and_return(
    628             drone_manager.InvalidPidfile('error'))
    629         self.assertRaises(pidfile_monitor.PidfileRunMonitor._PidfileException,
    630                           self.monitor._read_pidfile)
    631         self.god.check_playback()
    632 
    633 
    634     def setup_is_running(self, is_running):
    635         self.mock_drone_manager.is_process_running.expect_call(
    636             self.process).and_return(is_running)
    637 
    638 
    639     def _test_get_pidfile_info_helper(self, expected_pid, expected_exit_status,
    640                                       expected_num_tests_failed):
    641         self.monitor._get_pidfile_info()
    642         self._check_monitor(expected_pid, expected_exit_status,
    643                             expected_num_tests_failed)
    644 
    645 
    646     def test_get_pidfile_info(self):
    647         """
    648         normal cases for get_pidfile_info
    649         """
    650         # running
    651         self.set_running()
    652         self.setup_is_running(True)
    653         self._test_get_pidfile_info_helper(self.pid, None, None)
    654 
    655         # exited during check
    656         self.set_running()
    657         self.setup_is_running(False)
    658         self.set_complete(123, use_second_read=True) # pidfile gets read again
    659         self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)
    660 
    661         # completed
    662         self.set_complete(123)
    663         self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)
    664 
    665 
    666     def test_get_pidfile_info_running_no_proc(self):
    667         """
    668         pidfile shows process running, but no proc exists
    669         """
    670         # running but no proc
    671         self.set_running()
    672         self.setup_is_running(False)
    673         self.set_running(use_second_read=True)
    674         self._test_get_pidfile_info_helper(self.pid, 1, 0)
    675         self.assertTrue(self.monitor.lost_process)
    676 
    677 
    678     def test_get_pidfile_info_not_yet_run(self):
    679         """
    680         pidfile hasn't been written yet
    681         """
    682         self.set_not_yet_run()
    683         self._test_get_pidfile_info_helper(None, None, None)
    684 
    685 
    686     def test_process_failed_to_write_pidfile(self):
    687         self.set_not_yet_run()
    688         self.monitor._start_time = (time.time() -
    689                                     pidfile_monitor._get_pidfile_timeout_secs() - 1)
    690         self._test_get_pidfile_info_helper(None, 1, 0)
    691         self.assertTrue(self.monitor.lost_process)
    692 
    693 
    694 class AgentTest(unittest.TestCase):
    695     def setUp(self):
    696         self.god = mock.mock_god()
    697         self._dispatcher = self.god.create_mock_class(monitor_db.Dispatcher,
    698                                                       'dispatcher')
    699 
    700 
    701     def tearDown(self):
    702         self.god.unstub_all()
    703 
    704 
    705     def _create_mock_task(self, name):
    706         task = self.god.create_mock_class(agent_task.AgentTask, name)
    707         task.num_processes = 1
    708         _set_host_and_qe_ids(task)
    709         return task
    710 
    711     def _create_agent(self, task):
    712         agent = monitor_db.Agent(task)
    713         agent.dispatcher = self._dispatcher
    714         return agent
    715 
    716 
    717     def _finish_agent(self, agent):
    718         while not agent.is_done():
    719             agent.tick()
    720 
    721 
    722     def test_agent_abort(self):
    723         task = self._create_mock_task('task')
    724         task.poll.expect_call()
    725         task.is_done.expect_call().and_return(False)
    726         task.abort.expect_call()
    727         task.aborted = True
    728 
    729         agent = self._create_agent(task)
    730         agent.tick()
    731         agent.abort()
    732         self._finish_agent(agent)
    733         self.god.check_playback()
    734 
    735 
    736     def _test_agent_abort_before_started_helper(self, ignore_abort=False):
    737         task = self._create_mock_task('task')
    738         task.abort.expect_call()
    739         if ignore_abort:
    740             task.aborted = False
    741             task.poll.expect_call()
    742             task.is_done.expect_call().and_return(True)
    743             task.success = True
    744         else:
    745             task.aborted = True
    746 
    747         agent = self._create_agent(task)
    748         agent.abort()
    749         self._finish_agent(agent)
    750         self.god.check_playback()
    751 
    752 
    753     def test_agent_abort_before_started(self):
    754         self._test_agent_abort_before_started_helper()
    755         self._test_agent_abort_before_started_helper(True)
    756 
    757 
    758 class JobSchedulingTest(BaseSchedulerTest):
    759     def _test_run_helper(self, expect_agent=True, expect_starting=False,
    760                          expect_pending=False):
    761         if expect_starting:
    762             expected_status = models.HostQueueEntry.Status.STARTING
    763         elif expect_pending:
    764             expected_status = models.HostQueueEntry.Status.PENDING
    765         else:
    766             expected_status = models.HostQueueEntry.Status.VERIFYING
    767         job = scheduler_models.Job.fetch('id = 1')[0]
    768         queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]
    769         assert queue_entry.job is job
    770         job.run_if_ready(queue_entry)
    771 
    772         self.god.check_playback()
    773 
    774         self._dispatcher._schedule_running_host_queue_entries()
    775         agent = self._dispatcher._agents[0]
    776 
    777         actual_status = models.HostQueueEntry.smart_get(1).status
    778         self.assertEquals(expected_status, actual_status)
    779 
    780         if not expect_agent:
    781             self.assertEquals(agent, None)
    782             return
    783 
    784         self.assert_(isinstance(agent, monitor_db.Agent))
    785         self.assert_(agent.task)
    786         return agent.task
    787 
    788 
    789     def test_run_synchronous_ready(self):
    790         self._create_job(hosts=[1, 2], synchronous=True)
    791         self._update_hqe("status='Pending', execution_subdir=''")
    792 
    793         queue_task = self._test_run_helper(expect_starting=True)
    794 
    795         self.assert_(isinstance(queue_task, monitor_db.QueueTask))
    796         self.assertEquals(queue_task.job.id, 1)
    797         hqe_ids = [hqe.id for hqe in queue_task.queue_entries]
    798         self.assertEquals(hqe_ids, [1, 2])
    799 
    800 
    801     def test_schedule_running_host_queue_entries_fail(self):
    802         self._create_job(hosts=[2])
    803         self._update_hqe("status='%s', execution_subdir=''" %
    804                          models.HostQueueEntry.Status.PENDING)
    805         job = scheduler_models.Job.fetch('id = 1')[0]
    806         queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]
    807         assert queue_entry.job is job
    808         job.run_if_ready(queue_entry)
    809         self.assertEqual(queue_entry.status,
    810                          models.HostQueueEntry.Status.STARTING)
    811         self.assert_(queue_entry.execution_subdir)
    812         self.god.check_playback()
    813 
    814         class dummy_test_agent(object):
    815             task = 'dummy_test_agent'
    816         self._dispatcher._register_agent_for_ids(
    817                 self._dispatcher._host_agents, [queue_entry.host.id],
    818                 dummy_test_agent)
    819 
    820         # Attempted to schedule on a host that already has an agent.
    821         self.assertRaises(scheduler_lib.SchedulerError,
    822                           self._dispatcher._schedule_running_host_queue_entries)
    823 
    824 
    825     def test_schedule_hostless_job(self):
    826         job = self._create_job(hostless=True)
    827         self.assertEqual(1, job.hostqueueentry_set.count())
    828         hqe_query = scheduler_models.HostQueueEntry.fetch(
    829                 'id = %s' % job.hostqueueentry_set.all()[0].id)
    830         self.assertEqual(1, len(hqe_query))
    831         hqe = hqe_query[0]
    832 
    833         self.assertEqual(models.HostQueueEntry.Status.QUEUED, hqe.status)
    834         self.assertEqual(0, len(self._dispatcher._agents))
    835 
    836         self._dispatcher._schedule_new_jobs()
    837 
    838         self.assertEqual(models.HostQueueEntry.Status.STARTING, hqe.status)
    839         self.assertEqual(1, len(self._dispatcher._agents))
    840 
    841         self._dispatcher._schedule_new_jobs()
    842 
    843         # No change to previously schedule hostless job, and no additional agent
    844         self.assertEqual(models.HostQueueEntry.Status.STARTING, hqe.status)
    845         self.assertEqual(1, len(self._dispatcher._agents))
    846 
    847 
    848 class TopLevelFunctionsTest(unittest.TestCase):
    849     def setUp(self):
    850         self.god = mock.mock_god()
    851 
    852 
    853     def tearDown(self):
    854         self.god.unstub_all()
    855 
    856 
    857     def test_autoserv_command_line(self):
    858         machines = 'abcd12,efgh34'
    859         extra_args = ['-Z', 'hello']
    860         expected_command_line_base = set((monitor_db._autoserv_path, '-p',
    861                                           '-m', machines, '-r',
    862                                           '--lab', 'True',
    863                                           drone_manager.WORKING_DIRECTORY))
    864 
    865         expected_command_line = expected_command_line_base.union(
    866                 ['--verbose']).union(extra_args)
    867         command_line = set(
    868                 monitor_db._autoserv_command_line(machines, extra_args))
    869         self.assertEqual(expected_command_line, command_line)
    870 
    871         class FakeJob(object):
    872             owner = 'Bob'
    873             name = 'fake job name'
    874             test_retry = 0
    875             id = 1337
    876 
    877         class FakeHQE(object):
    878             job = FakeJob
    879 
    880         expected_command_line = expected_command_line_base.union(
    881                 ['-u', FakeJob.owner, '-l', FakeJob.name])
    882         command_line = set(monitor_db._autoserv_command_line(
    883                 machines, extra_args=[], queue_entry=FakeHQE, verbose=False))
    884         self.assertEqual(expected_command_line, command_line)
    885 
    886 
    887 class AgentTaskTest(unittest.TestCase,
    888                     frontend_test_utils.FrontendTestMixin):
    889     def setUp(self):
    890         self._frontend_common_setup()
    891 
    892 
    893     def tearDown(self):
    894         self._frontend_common_teardown()
    895 
    896 
    897     def _setup_drones(self):
    898         self.god.stub_function(models.DroneSet, 'drone_sets_enabled')
    899         models.DroneSet.drone_sets_enabled.expect_call().and_return(True)
    900 
    901         drones = []
    902         for x in xrange(4):
    903             drones.append(models.Drone.objects.create(hostname=str(x)))
    904 
    905         drone_set_1 = models.DroneSet.objects.create(name='1')
    906         drone_set_1.drones.add(*drones[0:2])
    907         drone_set_2 = models.DroneSet.objects.create(name='2')
    908         drone_set_2.drones.add(*drones[2:4])
    909         drone_set_3 = models.DroneSet.objects.create(name='3')
    910 
    911         job_1 = self._create_job_simple([self.hosts[0].id],
    912                                         drone_set=drone_set_1)
    913         job_2 = self._create_job_simple([self.hosts[0].id],
    914                                         drone_set=drone_set_2)
    915         job_3 = self._create_job_simple([self.hosts[0].id],
    916                                         drone_set=drone_set_3)
    917 
    918         job_4 = self._create_job_simple([self.hosts[0].id])
    919         job_4.drone_set = None
    920         job_4.save()
    921 
    922         hqe_1 = job_1.hostqueueentry_set.all()[0]
    923         hqe_2 = job_2.hostqueueentry_set.all()[0]
    924         hqe_3 = job_3.hostqueueentry_set.all()[0]
    925         hqe_4 = job_4.hostqueueentry_set.all()[0]
    926 
    927         return (hqe_1, hqe_2, hqe_3, hqe_4), agent_task.AgentTask()
    928 
    929 
    930     def test_get_drone_hostnames_allowed_no_drones_in_set(self):
    931         hqes, task = self._setup_drones()
    932         task.queue_entry_ids = (hqes[2].id,)
    933         self.assertEqual(set(), task.get_drone_hostnames_allowed())
    934         self.god.check_playback()
    935 
    936 
    937     def test_get_drone_hostnames_allowed_no_drone_set(self):
    938         hqes, task = self._setup_drones()
    939         hqe = hqes[3]
    940         task.queue_entry_ids = (hqe.id,)
    941 
    942         result = object()
    943 
    944         self.god.stub_function(task, '_user_or_global_default_drone_set')
    945         task._user_or_global_default_drone_set.expect_call(
    946                 hqe.job, hqe.job.user()).and_return(result)
    947 
    948         self.assertEqual(result, task.get_drone_hostnames_allowed())
    949         self.god.check_playback()
    950 
    951 
    952     def test_get_drone_hostnames_allowed_success(self):
    953         hqes, task = self._setup_drones()
    954         task.queue_entry_ids = (hqes[0].id,)
    955         self.assertEqual(set(('0','1')), task.get_drone_hostnames_allowed([]))
    956         self.god.check_playback()
    957 
    958 
    959     def test_get_drone_hostnames_allowed_multiple_jobs(self):
    960         hqes, task = self._setup_drones()
    961         task.queue_entry_ids = (hqes[0].id, hqes[1].id)
    962         self.assertRaises(AssertionError,
    963                           task.get_drone_hostnames_allowed)
    964         self.god.check_playback()
    965 
    966 
    967     def test_get_drone_hostnames_allowed_no_hqe(self):
    968         class MockSpecialTask(object):
    969             requested_by = object()
    970 
    971         class MockSpecialAgentTask(agent_task.SpecialAgentTask):
    972             task = MockSpecialTask()
    973             queue_entry_ids = []
    974             def __init__(self, *args, **kwargs):
    975                 super(agent_task.SpecialAgentTask, self).__init__()
    976 
    977         task = MockSpecialAgentTask()
    978         self.god.stub_function(models.DroneSet, 'drone_sets_enabled')
    979         self.god.stub_function(task, '_user_or_global_default_drone_set')
    980 
    981         result = object()
    982         models.DroneSet.drone_sets_enabled.expect_call().and_return(True)
    983         task._user_or_global_default_drone_set.expect_call(
    984                 task.task, MockSpecialTask.requested_by).and_return(result)
    985 
    986         self.assertEqual(result, task.get_drone_hostnames_allowed())
    987         self.god.check_playback()
    988 
    989 
    990     def _setup_test_user_or_global_default_drone_set(self):
    991         result = object()
    992         class MockDroneSet(object):
    993             def get_drone_hostnames(self):
    994                 return result
    995 
    996         self.god.stub_function(models.DroneSet, 'get_default')
    997         models.DroneSet.get_default.expect_call().and_return(MockDroneSet())
    998         return result
    999 
   1000 
   1001     def test_user_or_global_default_drone_set(self):
   1002         expected = object()
   1003         class MockDroneSet(object):
   1004             def get_drone_hostnames(self):
   1005                 return expected
   1006         class MockUser(object):
   1007             drone_set = MockDroneSet()
   1008 
   1009         self._setup_test_user_or_global_default_drone_set()
   1010 
   1011         actual = agent_task.AgentTask()._user_or_global_default_drone_set(
   1012                 None, MockUser())
   1013 
   1014         self.assertEqual(expected, actual)
   1015         self.god.check_playback()
   1016 
   1017 
   1018     def test_user_or_global_default_drone_set_no_user(self):
   1019         expected = self._setup_test_user_or_global_default_drone_set()
   1020         actual = agent_task.AgentTask()._user_or_global_default_drone_set(
   1021                 None, None)
   1022 
   1023         self.assertEqual(expected, actual)
   1024         self.god.check_playback()
   1025 
   1026 
   1027     def test_user_or_global_default_drone_set_no_user_drone_set(self):
   1028         class MockUser(object):
   1029             drone_set = None
   1030             login = None
   1031 
   1032         expected = self._setup_test_user_or_global_default_drone_set()
   1033         actual = agent_task.AgentTask()._user_or_global_default_drone_set(
   1034                 None, MockUser())
   1035 
   1036         self.assertEqual(expected, actual)
   1037         self.god.check_playback()
   1038 
   1039 
   1040     def test_abort_HostlessQueueTask(self):
   1041         hqe = self.god.create_mock_class(scheduler_models.HostQueueEntry,
   1042                                          'HostQueueEntry')
   1043         # If hqe is still in STARTING status, aborting the task should finish
   1044         # without changing hqe's status.
   1045         hqe.status = models.HostQueueEntry.Status.STARTING
   1046         hqe.job = None
   1047         hqe.id = 0
   1048         task = monitor_db.HostlessQueueTask(hqe)
   1049         task.abort()
   1050 
   1051         # If hqe is in RUNNING status, aborting the task should change hqe's
   1052         # status to Parsing, so FinalReparseTask can be scheduled.
   1053         hqe.set_status.expect_call('Parsing')
   1054         hqe.status = models.HostQueueEntry.Status.RUNNING
   1055         hqe.job = None
   1056         hqe.id = 0
   1057         task = monitor_db.HostlessQueueTask(hqe)
   1058         task.abort()
   1059 
   1060 
   1061 if __name__ == '__main__':
   1062     unittest.main()
   1063