Home | History | Annotate | Download | only in scheduler
      1 #!/usr/bin/python
      2 
      3 import gc, time
      4 import common
      5 from autotest_lib.frontend import setup_django_environment
      6 from autotest_lib.frontend.afe import frontend_test_utils
      7 from autotest_lib.client.common_lib import global_config
      8 from autotest_lib.client.common_lib.test_utils import mock
      9 from autotest_lib.client.common_lib.test_utils import unittest
     10 from autotest_lib.database import database_connection
     11 from autotest_lib.frontend.afe import models
     12 from autotest_lib.scheduler import agent_task
     13 from autotest_lib.scheduler import monitor_db, drone_manager
     14 from autotest_lib.scheduler import pidfile_monitor
     15 from autotest_lib.scheduler import scheduler_config, gc_stats
     16 from autotest_lib.scheduler import scheduler_lib
     17 from autotest_lib.scheduler import scheduler_models
     18 
# Copied onto the test database connection's `debug` attribute by
# BaseSchedulerTest._set_monitor_stubs().
_DEBUG = False
     20 
     21 
     22 class DummyAgentTask(object):
     23     num_processes = 1
     24     owner_username = 'my_user'
     25 
     26     def get_drone_hostnames_allowed(self):
     27         return None
     28 
     29 
     30 class DummyAgent(object):
     31     started = False
     32     _is_done = False
     33     host_ids = ()
     34     hostnames = {}
     35     queue_entry_ids = ()
     36 
     37     def __init__(self):
     38         self.task = DummyAgentTask()
     39 
     40 
     41     def tick(self):
     42         self.started = True
     43 
     44 
     45     def is_done(self):
     46         return self._is_done
     47 
     48 
     49     def set_done(self, done):
     50         self._is_done = done
     51 
     52 
     53 class IsRow(mock.argument_comparator):
     54     def __init__(self, row_id):
     55         self.row_id = row_id
     56 
     57 
     58     def is_satisfied_by(self, parameter):
     59         return list(parameter)[0] == self.row_id
     60 
     61 
     62     def __str__(self):
     63         return 'row with id %s' % self.row_id
     64 
     65 
     66 class IsAgentWithTask(mock.argument_comparator):
     67     def __init__(self, task):
     68         self._task = task
     69 
     70 
     71     def is_satisfied_by(self, parameter):
     72         if not isinstance(parameter, monitor_db.Agent):
     73             return False
     74         tasks = list(parameter.queue.queue)
     75         if len(tasks) != 1:
     76             return False
     77         return tasks[0] == self._task
     78 
     79 
     80 def _set_host_and_qe_ids(agent_or_task, id_list=None):
     81     if id_list is None:
     82         id_list = []
     83     agent_or_task.host_ids = agent_or_task.queue_entry_ids = id_list
     84     agent_or_task.hostnames = dict((host_id, '192.168.1.1')
     85                                    for host_id in id_list)
     86 
     87 
     88 class BaseSchedulerTest(unittest.TestCase,
     89                         frontend_test_utils.FrontendTestMixin):
     90     _config_section = 'AUTOTEST_WEB'
     91 
     92     def _do_query(self, sql):
     93         self._database.execute(sql)
     94 
     95 
     96     def _set_monitor_stubs(self):
     97         # Clear the instance cache as this is a brand new database.
     98         scheduler_models.DBObject._clear_instance_cache()
     99 
    100         self._database = (
    101             database_connection.TranslatingDatabase.get_test_database(
    102                 translators=scheduler_lib._DB_TRANSLATORS))
    103         self._database.connect(db_type='django')
    104         self._database.debug = _DEBUG
    105 
    106         connection_manager = scheduler_lib.ConnectionManager(autocommit=False)
    107         self.god.stub_with(connection_manager, 'db_connection', self._database)
    108         self.god.stub_with(monitor_db, '_db_manager', connection_manager)
    109         self.god.stub_with(monitor_db, '_db', self._database)
    110 
    111         # These tests only make sense if hosts are acquired inline with the
    112         # rest of the tick.
    113         self.god.stub_with(monitor_db, '_inline_host_acquisition', True)
    114         self.god.stub_with(monitor_db.BaseDispatcher,
    115                            '_get_pending_queue_entries',
    116                            self._get_pending_hqes)
    117         self.god.stub_with(scheduler_models, '_db', self._database)
    118         self.god.stub_with(drone_manager.instance(), '_results_dir',
    119                            '/test/path')
    120         self.god.stub_with(drone_manager.instance(), '_temporary_directory',
    121                            '/test/path/tmp')
    122         self.god.stub_with(drone_manager.instance(), 'initialize',
    123                            lambda *args: None)
    124         self.god.stub_with(drone_manager.instance(), 'execute_actions',
    125                            lambda *args: None)
    126 
    127         monitor_db.initialize_globals()
    128         scheduler_models.initialize_globals()
    129 
    130 
    131     def setUp(self):
    132         self._frontend_common_setup()
    133         self._set_monitor_stubs()
    134         self._dispatcher = monitor_db.Dispatcher()
    135 
    136 
    137     def tearDown(self):
    138         self._database.disconnect()
    139         self._frontend_common_teardown()
    140 
    141 
    142     def _update_hqe(self, set, where=''):
    143         query = 'UPDATE afe_host_queue_entries SET ' + set
    144         if where:
    145             query += ' WHERE ' + where
    146         self._do_query(query)
    147 
    148 
    149     def _get_pending_hqes(self):
    150         query_string=('afe_jobs.priority DESC, '
    151                       'ifnull(nullif(host_id, NULL), host_id) DESC, '
    152                       'ifnull(nullif(meta_host, NULL), meta_host) DESC, '
    153                       'job_id')
    154         return list(scheduler_models.HostQueueEntry.fetch(
    155             joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
    156             where='NOT complete AND NOT active AND status="Queued"',
    157             order_by=query_string))
    158 
    159 
class DispatcherSchedulingTest(BaseSchedulerTest):
    """
    Tests of how the dispatcher assigns queued jobs to hosts.

    HostQueueEntry._do_schedule_pre_job_tasks is stubbed so that each
    scheduling is recorded in self._jobs_scheduled as a (job_id, host_id)
    tuple.  Assertions consume the records they match, which lets
    _check_for_extra_schedulings() detect unexpected leftovers.

    NOTE(review): helpers such as _create_job and _create_job_simple are not
    defined in this file; presumably they come from FrontendTestMixin.
    """
    # (job_id, host_id) records; replaced by a fresh list in setUp().
    _jobs_scheduled = []


    def tearDown(self):
        """Delegate to the base class teardown."""
        super(DispatcherSchedulingTest, self).tearDown()


    def _set_monitor_stubs(self):
        """Install the base stubs plus the scheduling-recording stub."""
        super(DispatcherSchedulingTest, self)._set_monitor_stubs()

        def hqe__do_schedule_pre_job_tasks_stub(queue_entry):
            """Called by HostQueueEntry.run()."""
            self._record_job_scheduled(queue_entry.job.id, queue_entry.host.id)
            queue_entry.set_status('Starting')

        self.god.stub_with(scheduler_models.HostQueueEntry,
                           '_do_schedule_pre_job_tasks',
                           hqe__do_schedule_pre_job_tasks_stub)


    def _record_job_scheduled(self, job_id, host_id):
        """Record a scheduling; fail if the same pair was recorded already."""
        record = (job_id, host_id)
        self.assert_(record not in self._jobs_scheduled,
                     'Job %d scheduled on host %d twice' %
                     (job_id, host_id))
        self._jobs_scheduled.append(record)


    def _assert_job_scheduled_on(self, job_id, host_id):
        """Assert the pair was recorded, then remove it from the records."""
        record = (job_id, host_id)
        self.assert_(record in self._jobs_scheduled,
                     'Job %d not scheduled on host %d as expected\n'
                     'Jobs scheduled: %s' %
                     (job_id, host_id, self._jobs_scheduled))
        self._jobs_scheduled.remove(record)


    def _assert_job_scheduled_on_number_of(self, job_id, host_ids, number):
        """Assert job was scheduled on exactly number hosts out of a set."""
        found = []
        for host_id in host_ids:
            record = (job_id, host_id)
            if record in self._jobs_scheduled:
                found.append(record)
                # Consume matched records so they do not count as extras.
                self._jobs_scheduled.remove(record)
        if len(found) < number:
            self.fail('Job %d scheduled on fewer than %d hosts in %s.\n'
                      'Jobs scheduled: %s' % (job_id, number, host_ids, found))
        elif len(found) > number:
            self.fail('Job %d scheduled on more than %d hosts in %s.\n'
                      'Jobs scheduled: %s' % (job_id, number, host_ids, found))


    def _check_for_extra_schedulings(self):
        """Fail if any recorded scheduling has not been consumed."""
        if len(self._jobs_scheduled) != 0:
            self.fail('Extra jobs scheduled: ' +
                      str(self._jobs_scheduled))


    def _convert_jobs_to_metahosts(self, *job_ids):
        """
        Turn the queue entries of the given jobs into metahost entries by
        moving host_id into meta_host and clearing host_id.
        """
        sql_tuple = '(' + ','.join(str(i) for i in job_ids) + ')'
        self._do_query('UPDATE afe_host_queue_entries SET '
                       'meta_host=host_id, host_id=NULL '
                       'WHERE job_id IN ' + sql_tuple)


    def _lock_host(self, host_id):
        """Set locked=1 on the afe_hosts row with the given id."""
        self._do_query('UPDATE afe_hosts SET locked=1 WHERE id=' +
                       str(host_id))


    def setUp(self):
        """Run the base setup and start with an empty scheduling record."""
        super(DispatcherSchedulingTest, self).setUp()
        self._jobs_scheduled = []


    def _run_scheduler(self):
        """Tick the host scheduler once, then schedule new jobs twice."""
        self._dispatcher._host_scheduler.tick()
        for _ in xrange(2): # metahost scheduling can take two ticks
            self._dispatcher._schedule_new_jobs()


    def _test_basic_scheduling_helper(self, use_metahosts):
        """
        Basic scheduling: two single-host jobs each land on their own host.

        NOTE(review): assumes created jobs get ids 1, 2, ... in creation
        order -- confirm against the fixture helpers.
        """
        self._create_job_simple([1], use_metahosts)
        self._create_job_simple([2], use_metahosts)
        self._run_scheduler()
        self._assert_job_scheduled_on(1, 1)
        self._assert_job_scheduled_on(2, 2)
        self._check_for_extra_schedulings()


    def _test_priorities_helper(self, use_metahosts):
        """Test prioritization ordering."""
        self._create_job_simple([1], use_metahosts)
        self._create_job_simple([2], use_metahosts)
        self._create_job_simple([1,2], use_metahosts)
        self._create_job_simple([1], use_metahosts, priority=1)
        self._run_scheduler()
        self._assert_job_scheduled_on(4, 1) # higher priority
        self._assert_job_scheduled_on(2, 2) # earlier job over later
        self._check_for_extra_schedulings()


    def _test_hosts_ready_helper(self, use_metahosts):
        """
        Only hosts that are status=Ready, unlocked and not invalid get
        scheduled.
        """
        # Stage 1: host is not Ready, so nothing may be scheduled.
        self._create_job_simple([1], use_metahosts)
        self._do_query('UPDATE afe_hosts SET status="Running" WHERE id=1')
        self._run_scheduler()
        self._check_for_extra_schedulings()

        # Stage 2: host is Ready but locked, so still nothing.
        self._do_query('UPDATE afe_hosts SET status="Ready", locked=1 '
                       'WHERE id=1')
        self._run_scheduler()
        self._check_for_extra_schedulings()

        # Stage 3: host is unlocked but invalid; only the non-metahost
        # variant is expected to be scheduled on it.
        self._do_query('UPDATE afe_hosts SET locked=0, invalid=1 '
                       'WHERE id=1')
        self._run_scheduler()
        if not use_metahosts:
            self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()


    def _test_hosts_idle_helper(self, use_metahosts):
        """Only idle hosts get scheduled."""
        self._create_job(hosts=[1], active=True)
        self._create_job_simple([1], use_metahosts)
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def _test_obey_ACLs_helper(self, use_metahosts):
        """With host 1 removed from all ACL groups, nothing is scheduled."""
        self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')
        self._create_job_simple([1], use_metahosts)
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def test_basic_scheduling(self):
        """Basic scheduling with non-metahost entries."""
        self._test_basic_scheduling_helper(False)


    def test_priorities(self):
        """Priority ordering with non-metahost entries."""
        self._test_priorities_helper(False)


    def test_hosts_ready(self):
        """Host readiness rules with non-metahost entries."""
        self._test_hosts_ready_helper(False)


    def test_hosts_idle(self):
        """Idle-host rule with non-metahost entries."""
        self._test_hosts_idle_helper(False)


    def test_obey_ACLs(self):
        """ACL rule with non-metahost entries."""
        self._test_obey_ACLs_helper(False)


    def test_one_time_hosts_ignore_ACLs(self):
        """A job on an invalid host with no ACL rows is still scheduled."""
        self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')
        self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=1')
        self._create_job_simple([1])
        self._run_scheduler()
        self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()


    def test_non_metahost_on_invalid_host(self):
        """
        Non-metahost entries can get scheduled on invalid hosts (this is how
        one-time hosts work).
        """
        self._do_query('UPDATE afe_hosts SET invalid=1')
        self._test_basic_scheduling_helper(False)


    def test_metahost_scheduling(self):
        """
        Basic metahost scheduling
        """
        self._test_basic_scheduling_helper(True)


    def test_metahost_priorities(self):
        """Priority ordering with metahost entries."""
        self._test_priorities_helper(True)


    def test_metahost_hosts_ready(self):
        """Host readiness rules with metahost entries."""
        self._test_hosts_ready_helper(True)


    def test_metahost_hosts_idle(self):
        """Idle-host rule with metahost entries."""
        self._test_hosts_idle_helper(True)


    def test_metahost_obey_ACLs(self):
        """ACL rule with metahost entries."""
        self._test_obey_ACLs_helper(True)


    def test_nonmetahost_over_metahost(self):
        """
        Non-metahost entries should take priority over metahost entries
        for the same host
        """
        self._create_job(metahosts=[1])
        self._create_job(hosts=[1])
        self._run_scheduler()
        self._assert_job_scheduled_on(2, 1)
        self._check_for_extra_schedulings()


#    TODO: Revive this test.
#    def test_HostScheduler_get_host_atomic_group_id(self):
#        job = self._create_job(metahosts=[self.label6.id])
#        queue_entry = scheduler_models.HostQueueEntry.fetch(
#                where='job_id=%d' % job.id)[0]
#        # Indirectly initialize the internal state of the host scheduler.
#        self._dispatcher._refresh_pending_queue_entries()
#
#        # Test the host scheduler
#        host_scheduler = self._dispatcher._host_scheduler
#
#
#        # Two labels each in a different atomic group.  This should log an
#        # error and continue.
#        orig_logging_error = logging.error
#        def mock_logging_error(message, *args):
#            mock_logging_error._num_calls += 1
#            # Test the logging call itself, we just wrapped it to count it.
#            orig_logging_error(message, *args)
#        mock_logging_error._num_calls = 0
#        self.god.stub_with(logging, 'error', mock_logging_error)
#        host_scheduler.refresh([])
#        self.assertNotEquals(None, host_scheduler._get_host_atomic_group_id(
#                [self.label4.id, self.label8.id], queue_entry))
#        self.assertTrue(mock_logging_error._num_calls > 0)
#        self.god.unstub(logging, 'error')
#
#        # Two labels both in the same atomic group, this should not raise an
#        # error, it will merely cause the job to schedule on the intersection.
#        self.assertEquals(1, host_scheduler._get_host_atomic_group_id(
#                [self.label4.id, self.label5.id]))
#
#        self.assertEquals(None, host_scheduler._get_host_atomic_group_id([]))
#        self.assertEquals(None, host_scheduler._get_host_atomic_group_id(
#                [self.label3.id, self.label7.id, self.label6.id]))
#        self.assertEquals(1, host_scheduler._get_host_atomic_group_id(
#                [self.label4.id, self.label7.id, self.label6.id]))
#        self.assertEquals(1, host_scheduler._get_host_atomic_group_id(
#                [self.label7.id, self.label5.id]))


    def test_no_execution_subdir_not_found(self):
        """Reproduce bug crosbug.com/334353 and recover from it."""

        global_config.global_config.override_config_value(
                'SCHEDULER', 'drones', 'localhost')

        job = self._create_job(hostless=True)

        # Ensure execution_subdir is set before status
        original_set_status = scheduler_models.HostQueueEntry.set_status
        def fake_set_status(hqe, *args, **kwargs):
            self.assertEqual(hqe.execution_subdir, 'hostless')
            original_set_status(hqe, *args, **kwargs)

        self.god.stub_with(scheduler_models.HostQueueEntry, 'set_status',
                           fake_set_status)

        self._dispatcher._schedule_new_jobs()

        hqe = job.hostqueueentry_set.all()[0]
        self.assertEqual(models.HostQueueEntry.Status.STARTING, hqe.status)
        self.assertEqual('hostless', hqe.execution_subdir)


    def test_only_schedule_queued_entries(self):
        """An entry marked active with a host assigned is not scheduled."""
        self._create_job(metahosts=[1])
        self._update_hqe(set='active=1, host_id=2')
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def test_no_ready_hosts(self):
        """Nothing is scheduled when every host is in "Repair Failed"."""
        self._create_job(hosts=[1])
        self._do_query('UPDATE afe_hosts SET status="Repair Failed"')
        self._run_scheduler()
        self._check_for_extra_schedulings()


    def test_garbage_collection(self):
        """Garbage stats are gathered once, then skipped until the interval."""
        self.god.stub_with(self._dispatcher, '_seconds_between_garbage_stats',
                           999999)
        self.god.stub_function(gc, 'collect')
        self.god.stub_function(gc_stats, '_log_garbage_collector_stats')
        gc.collect.expect_call().and_return(0)
        gc_stats._log_garbage_collector_stats.expect_call()
        # Force a garbage collection run
        self._dispatcher._last_garbage_stats_time = 0
        self._dispatcher._garbage_collection()
        # The previous call should have reset the time, it won't do anything
        # the second time.  If it does, we'll get an unexpected call.
        self._dispatcher._garbage_collection()
    468 
    469 
    470 class DispatcherThrottlingTest(BaseSchedulerTest):
    471     """
    472     Test that the dispatcher throttles:
    473      * total number of running processes
    474      * number of processes started per cycle
    475     """
    476     _MAX_RUNNING = 3
    477     _MAX_STARTED = 2
    478 
    479     def setUp(self):
    480         super(DispatcherThrottlingTest, self).setUp()
    481         scheduler_config.config.max_processes_per_drone = self._MAX_RUNNING
    482 
    483         def fake_max_runnable_processes(fake_self, username,
    484                                         drone_hostnames_allowed):
    485             running = sum(agent.task.num_processes
    486                           for agent in self._agents
    487                           if agent.started and not agent.is_done())
    488             return self._MAX_RUNNING - running
    489         self.god.stub_with(drone_manager.DroneManager, 'max_runnable_processes',
    490                            fake_max_runnable_processes)
    491 
    492 
    493     def _setup_some_agents(self, num_agents):
    494         self._agents = [DummyAgent() for i in xrange(num_agents)]
    495         self._dispatcher._agents = list(self._agents)
    496 
    497 
    498     def _run_a_few_ticks(self):
    499         for i in xrange(4):
    500             self._dispatcher._handle_agents()
    501 
    502 
    503     def _assert_agents_started(self, indexes, is_started=True):
    504         for i in indexes:
    505             self.assert_(self._agents[i].started == is_started,
    506                          'Agent %d %sstarted' %
    507                          (i, is_started and 'not ' or ''))
    508 
    509 
    510     def _assert_agents_not_started(self, indexes):
    511         self._assert_agents_started(indexes, False)
    512 
    513 
    514     def test_throttle_total(self):
    515         self._setup_some_agents(4)
    516         self._run_a_few_ticks()
    517         self._assert_agents_started([0, 1, 2])
    518         self._assert_agents_not_started([3])
    519 
    520 
    521     def test_throttle_with_synchronous(self):
    522         self._setup_some_agents(2)
    523         self._agents[0].task.num_processes = 3
    524         self._run_a_few_ticks()
    525         self._assert_agents_started([0])
    526         self._assert_agents_not_started([1])
    527 
    528 
    529     def test_large_agent_starvation(self):
    530         """
    531         Ensure large agents don't get starved by lower-priority agents.
    532         """
    533         self._setup_some_agents(3)
    534         self._agents[1].task.num_processes = 3
    535         self._run_a_few_ticks()
    536         self._assert_agents_started([0])
    537         self._assert_agents_not_started([1, 2])
    538 
    539         self._agents[0].set_done(True)
    540         self._run_a_few_ticks()
    541         self._assert_agents_started([1])
    542         self._assert_agents_not_started([2])
    543 
    544 
    545     def test_zero_process_agent(self):
    546         self._setup_some_agents(5)
    547         self._agents[4].task.num_processes = 0
    548         self._run_a_few_ticks()
    549         self._assert_agents_started([0, 1, 2, 4])
    550         self._assert_agents_not_started([3])
    551 
    552 
    553 class PidfileRunMonitorTest(unittest.TestCase):
    554     execution_tag = 'test_tag'
    555     pid = 12345
    556     process = drone_manager.Process('myhost', pid)
    557     num_tests_failed = 1
    558 
    559     def setUp(self):
    560         self.god = mock.mock_god()
    561         self.mock_drone_manager = self.god.create_mock_class(
    562             drone_manager.DroneManager, 'drone_manager')
    563         self.god.stub_with(drone_manager, '_the_instance',
    564                            self.mock_drone_manager)
    565         self.god.stub_with(pidfile_monitor, '_get_pidfile_timeout_secs',
    566                            self._mock_get_pidfile_timeout_secs)
    567 
    568         self.pidfile_id = object()
    569 
    570         (self.mock_drone_manager.get_pidfile_id_from
    571              .expect_call(self.execution_tag,
    572                           pidfile_name=drone_manager.AUTOSERV_PID_FILE)
    573              .and_return(self.pidfile_id))
    574 
    575         self.monitor = pidfile_monitor.PidfileRunMonitor()
    576         self.monitor.attach_to_existing_process(self.execution_tag)
    577 
    578     def tearDown(self):
    579         self.god.unstub_all()
    580 
    581 
    582     def _mock_get_pidfile_timeout_secs(self):
    583         return 300
    584 
    585 
    586     def setup_pidfile(self, pid=None, exit_code=None, tests_failed=None,
    587                       use_second_read=False):
    588         contents = drone_manager.PidfileContents()
    589         if pid is not None:
    590             contents.process = drone_manager.Process('myhost', pid)
    591         contents.exit_status = exit_code
    592         contents.num_tests_failed = tests_failed
    593         self.mock_drone_manager.get_pidfile_contents.expect_call(
    594             self.pidfile_id, use_second_read=use_second_read).and_return(
    595             contents)
    596 
    597 
    598     def set_not_yet_run(self):
    599         self.setup_pidfile()
    600 
    601 
    602     def set_empty_pidfile(self):
    603         self.setup_pidfile()
    604 
    605 
    606     def set_running(self, use_second_read=False):
    607         self.setup_pidfile(self.pid, use_second_read=use_second_read)
    608 
    609 
    610     def set_complete(self, error_code, use_second_read=False):
    611         self.setup_pidfile(self.pid, error_code, self.num_tests_failed,
    612                            use_second_read=use_second_read)
    613 
    614 
    615     def _check_monitor(self, expected_pid, expected_exit_status,
    616                        expected_num_tests_failed):
    617         if expected_pid is None:
    618             self.assertEquals(self.monitor._state.process, None)
    619         else:
    620             self.assertEquals(self.monitor._state.process.pid, expected_pid)
    621         self.assertEquals(self.monitor._state.exit_status, expected_exit_status)
    622         self.assertEquals(self.monitor._state.num_tests_failed,
    623                           expected_num_tests_failed)
    624 
    625 
    626         self.god.check_playback()
    627 
    628 
    629     def _test_read_pidfile_helper(self, expected_pid, expected_exit_status,
    630                                   expected_num_tests_failed):
    631         self.monitor._read_pidfile()
    632         self._check_monitor(expected_pid, expected_exit_status,
    633                             expected_num_tests_failed)
    634 
    635 
    636     def _get_expected_tests_failed(self, expected_exit_status):
    637         if expected_exit_status is None:
    638             expected_tests_failed = None
    639         else:
    640             expected_tests_failed = self.num_tests_failed
    641         return expected_tests_failed
    642 
    643 
    644     def test_read_pidfile(self):
    645         self.set_not_yet_run()
    646         self._test_read_pidfile_helper(None, None, None)
    647 
    648         self.set_empty_pidfile()
    649         self._test_read_pidfile_helper(None, None, None)
    650 
    651         self.set_running()
    652         self._test_read_pidfile_helper(self.pid, None, None)
    653 
    654         self.set_complete(123)
    655         self._test_read_pidfile_helper(self.pid, 123, self.num_tests_failed)
    656 
    657 
    658     def test_read_pidfile_error(self):
    659         self.mock_drone_manager.get_pidfile_contents.expect_call(
    660             self.pidfile_id, use_second_read=False).and_return(
    661             drone_manager.InvalidPidfile('error'))
    662         self.assertRaises(pidfile_monitor.PidfileRunMonitor._PidfileException,
    663                           self.monitor._read_pidfile)
    664         self.god.check_playback()
    665 
    666 
    667     def setup_is_running(self, is_running):
    668         self.mock_drone_manager.is_process_running.expect_call(
    669             self.process).and_return(is_running)
    670 
    671 
    672     def _test_get_pidfile_info_helper(self, expected_pid, expected_exit_status,
    673                                       expected_num_tests_failed):
    674         self.monitor._get_pidfile_info()
    675         self._check_monitor(expected_pid, expected_exit_status,
    676                             expected_num_tests_failed)
    677 
    678 
    679     def test_get_pidfile_info(self):
    680         """
    681         normal cases for get_pidfile_info
    682         """
    683         # running
    684         self.set_running()
    685         self.setup_is_running(True)
    686         self._test_get_pidfile_info_helper(self.pid, None, None)
    687 
    688         # exited during check
    689         self.set_running()
    690         self.setup_is_running(False)
    691         self.set_complete(123, use_second_read=True) # pidfile gets read again
    692         self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)
    693 
    694         # completed
    695         self.set_complete(123)
    696         self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)
    697 
    698 
    699     def test_get_pidfile_info_running_no_proc(self):
    700         """
    701         pidfile shows process running, but no proc exists
    702         """
    703         # running but no proc
    704         self.set_running()
    705         self.setup_is_running(False)
    706         self.set_running(use_second_read=True)
    707         self._test_get_pidfile_info_helper(self.pid, 1, 0)
    708         self.assertTrue(self.monitor.lost_process)
    709 
    710 
    711     def test_get_pidfile_info_not_yet_run(self):
    712         """
    713         pidfile hasn't been written yet
    714         """
    715         self.set_not_yet_run()
    716         self._test_get_pidfile_info_helper(None, None, None)
    717 
    718 
    719     def test_process_failed_to_write_pidfile(self):
    720         self.set_not_yet_run()
    721         self.monitor._start_time = (time.time() -
    722                                     pidfile_monitor._get_pidfile_timeout_secs() - 1)
    723         self._test_get_pidfile_info_helper(None, 1, 0)
    724         self.assertTrue(self.monitor.lost_process)
    725 
    726 
    727 class AgentTest(unittest.TestCase):
    728     def setUp(self):
    729         self.god = mock.mock_god()
    730         self._dispatcher = self.god.create_mock_class(monitor_db.Dispatcher,
    731                                                       'dispatcher')
    732 
    733 
    734     def tearDown(self):
    735         self.god.unstub_all()
    736 
    737 
    738     def _create_mock_task(self, name):
    739         task = self.god.create_mock_class(agent_task.AgentTask, name)
    740         task.num_processes = 1
    741         _set_host_and_qe_ids(task)
    742         return task
    743 
    744     def _create_agent(self, task):
    745         agent = monitor_db.Agent(task)
    746         agent.dispatcher = self._dispatcher
    747         return agent
    748 
    749 
    750     def _finish_agent(self, agent):
    751         while not agent.is_done():
    752             agent.tick()
    753 
    754 
    755     def test_agent_abort(self):
    756         task = self._create_mock_task('task')
    757         task.poll.expect_call()
    758         task.is_done.expect_call().and_return(False)
    759         task.abort.expect_call()
    760         task.aborted = True
    761 
    762         agent = self._create_agent(task)
    763         agent.tick()
    764         agent.abort()
    765         self._finish_agent(agent)
    766         self.god.check_playback()
    767 
    768 
    769     def _test_agent_abort_before_started_helper(self, ignore_abort=False):
    770         task = self._create_mock_task('task')
    771         task.abort.expect_call()
    772         if ignore_abort:
    773             task.aborted = False
    774             task.poll.expect_call()
    775             task.is_done.expect_call().and_return(True)
    776             task.success = True
    777         else:
    778             task.aborted = True
    779 
    780         agent = self._create_agent(task)
    781         agent.abort()
    782         self._finish_agent(agent)
    783         self.god.check_playback()
    784 
    785 
    786     def test_agent_abort_before_started(self):
    787         self._test_agent_abort_before_started_helper()
    788         self._test_agent_abort_before_started_helper(True)
    789 
    790 
    791 class JobSchedulingTest(BaseSchedulerTest):
    792     def _test_run_helper(self, expect_agent=True, expect_starting=False,
    793                          expect_pending=False):
    794         if expect_starting:
    795             expected_status = models.HostQueueEntry.Status.STARTING
    796         elif expect_pending:
    797             expected_status = models.HostQueueEntry.Status.PENDING
    798         else:
    799             expected_status = models.HostQueueEntry.Status.VERIFYING
    800         job = scheduler_models.Job.fetch('id = 1')[0]
    801         queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]
    802         assert queue_entry.job is job
    803         job.run_if_ready(queue_entry)
    804 
    805         self.god.check_playback()
    806 
    807         self._dispatcher._schedule_delay_tasks()
    808         self._dispatcher._schedule_running_host_queue_entries()
    809         agent = self._dispatcher._agents[0]
    810 
    811         actual_status = models.HostQueueEntry.smart_get(1).status
    812         self.assertEquals(expected_status, actual_status)
    813 
    814         if not expect_agent:
    815             self.assertEquals(agent, None)
    816             return
    817 
    818         self.assert_(isinstance(agent, monitor_db.Agent))
    819         self.assert_(agent.task)
    820         return agent.task
    821 
    822 
    823     def test_run_if_ready_delays(self):
    824         # Also tests Job.run_with_ready_delay() on atomic group jobs.
    825         django_job = self._create_job(hosts=[5, 6], atomic_group=1)
    826         job = scheduler_models.Job(django_job.id)
    827         self.assertEqual(1, job.synch_count)
    828         django_hqes = list(models.HostQueueEntry.objects.filter(job=job.id))
    829         self.assertEqual(2, len(django_hqes))
    830         self.assertEqual(2, django_hqes[0].atomic_group.max_number_of_machines)
    831 
    832         def set_hqe_status(django_hqe, status):
    833             django_hqe.status = status
    834             django_hqe.save()
    835             scheduler_models.HostQueueEntry(django_hqe.id).host.set_status(status)
    836 
    837         # An initial state, our synch_count is 1
    838         set_hqe_status(django_hqes[0], models.HostQueueEntry.Status.VERIFYING)
    839         set_hqe_status(django_hqes[1], models.HostQueueEntry.Status.PENDING)
    840 
    841         # So that we don't depend on the config file value during the test.
    842         self.assert_(scheduler_config.config
    843                      .secs_to_wait_for_atomic_group_hosts is not None)
    844         self.god.stub_with(scheduler_config.config,
    845                            'secs_to_wait_for_atomic_group_hosts', 123456)
    846 
    847         # Get the pending one as a scheduler_models.HostQueueEntry object.
    848         hqe = scheduler_models.HostQueueEntry(django_hqes[1].id)
    849         self.assert_(not job._delay_ready_task)
    850         self.assertTrue(job.is_ready())
    851 
    852         # Ready with one pending, one verifying and an atomic group should
    853         # result in a DelayCallTask to re-check if we're ready a while later.
    854         job.run_if_ready(hqe)
    855         self.assertEquals('Waiting', hqe.status)
    856         self._dispatcher._schedule_delay_tasks()
    857         self.assertEquals('Pending', hqe.status)
    858         agent = self._dispatcher._agents[0]
    859         self.assert_(job._delay_ready_task)
    860         self.assert_(isinstance(agent, monitor_db.Agent))
    861         self.assert_(agent.task)
    862         delay_task = agent.task
    863         self.assert_(isinstance(delay_task, scheduler_models.DelayedCallTask))
    864         self.assert_(not delay_task.is_done())
    865 
    866         self.god.stub_function(delay_task, 'abort')
    867 
    868         self.god.stub_function(job, 'run')
    869 
    870         self.god.stub_function(job, '_pending_count')
    871         self.god.stub_with(job, 'synch_count', 9)
    872         self.god.stub_function(job, 'request_abort')
    873 
    874         # Test that the DelayedCallTask's callback queued up above does the
    875         # correct thing and does not call run if there are not enough hosts
    876         # in pending after the delay.
    877         job._pending_count.expect_call().and_return(0)
    878         job.request_abort.expect_call()
    879         delay_task._callback()
    880         self.god.check_playback()
    881 
    882         # Test that the DelayedCallTask's callback queued up above does the
    883         # correct thing and returns the Agent returned by job.run() if
    884         # there are still enough hosts pending after the delay.
    885         job.synch_count = 4
    886         job._pending_count.expect_call().and_return(4)
    887         job.run.expect_call(hqe)
    888         delay_task._callback()
    889         self.god.check_playback()
    890 
    891         job._pending_count.expect_call().and_return(4)
    892 
    893         # Adjust the delay deadline so that enough time has passed.
    894         job._delay_ready_task.end_time = time.time() - 111111
    895         job.run.expect_call(hqe)
    896         # ...the delay_expired condition should cause us to call run()
    897         self._dispatcher._handle_agents()
    898         self.god.check_playback()
    899         delay_task.success = False
    900 
    901         # Adjust the delay deadline back so that enough time has not passed.
    902         job._delay_ready_task.end_time = time.time() + 111111
    903         self._dispatcher._handle_agents()
    904         self.god.check_playback()
    905 
    906         # Now max_number_of_machines HQEs are in pending state.  Remaining
    907         # delay will now be ignored.
    908         other_hqe = scheduler_models.HostQueueEntry(django_hqes[0].id)
    909         self.god.unstub(job, 'run')
    910         self.god.unstub(job, '_pending_count')
    911         self.god.unstub(job, 'synch_count')
    912         self.god.unstub(job, 'request_abort')
    913         # ...the over_max_threshold test should cause us to call run()
    914         delay_task.abort.expect_call()
    915         other_hqe.on_pending()
    916         self.assertEquals('Starting', other_hqe.status)
    917         self.assertEquals('Starting', hqe.status)
    918         self.god.stub_function(job, 'run')
    919         self.god.unstub(delay_task, 'abort')
    920 
    921         hqe.set_status('Pending')
    922         other_hqe.set_status('Pending')
    923         # Now we're not over the max for the atomic group.  But all assigned
    924         # hosts are in pending state.  over_max_threshold should make us run().
    925         hqe.atomic_group.max_number_of_machines += 1
    926         hqe.atomic_group.save()
    927         job.run.expect_call(hqe)
    928         hqe.on_pending()
    929         self.god.check_playback()
    930         hqe.atomic_group.max_number_of_machines -= 1
    931         hqe.atomic_group.save()
    932 
    933         other_hqe = scheduler_models.HostQueueEntry(django_hqes[0].id)
    934         self.assertTrue(hqe.job is other_hqe.job)
    935         # DBObject classes should reuse instances so these should be the same.
    936         self.assertEqual(job, other_hqe.job)
    937         self.assertEqual(other_hqe.job, hqe.job)
    938         # Be sure our delay was not lost during the other_hqe construction.
    939         self.assertEqual(job._delay_ready_task, delay_task)
    940         self.assert_(job._delay_ready_task)
    941         self.assertFalse(job._delay_ready_task.is_done())
    942         self.assertFalse(job._delay_ready_task.aborted)
    943 
    944         # We want the real run() to be called below.
    945         self.god.unstub(job, 'run')
    946 
    947         # We pass in the other HQE this time the same way it would happen
    948         # for real when one host finishes verifying and enters pending.
    949         job.run_if_ready(other_hqe)
    950 
    951         # The delayed task must be aborted by the actual run() call above.
    952         self.assertTrue(job._delay_ready_task.aborted)
    953         self.assertFalse(job._delay_ready_task.success)
    954         self.assertTrue(job._delay_ready_task.is_done())
    955 
    956         # Check that job run() and _finish_run() were called by the above:
    957         self._dispatcher._schedule_running_host_queue_entries()
    958         agent = self._dispatcher._agents[0]
    959         self.assert_(agent.task)
    960         task = agent.task
    961         self.assert_(isinstance(task, monitor_db.QueueTask))
    962         # Requery these hqes in order to verify the status from the DB.
    963         django_hqes = list(models.HostQueueEntry.objects.filter(job=job.id))
    964         for entry in django_hqes:
    965             self.assertEqual(models.HostQueueEntry.Status.STARTING,
    966                              entry.status)
    967 
    968         # We're already running, but more calls to run_with_ready_delay can
    969         # continue to come in due to straggler hosts enter Pending.  Make
    970         # sure we don't do anything.
    971         self.god.stub_function(job, 'run')
    972         job.run_with_ready_delay(hqe)
    973         self.god.check_playback()
    974         self.god.unstub(job, 'run')
    975 
    976 
    977     def test_run_synchronous_atomic_group_ready(self):
    978         self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
    979         self._update_hqe("status='Pending', execution_subdir=''")
    980 
    981         queue_task = self._test_run_helper(expect_starting=True)
    982 
    983         self.assert_(isinstance(queue_task, monitor_db.QueueTask))
    984         # Atomic group jobs that do not depend on a specific label in the
    985         # atomic group will use the atomic group name as their group name.
    986         self.assertEquals(queue_task.queue_entries[0].get_group_name(),
    987                           'atomic1')
    988 
    989 
    990     def test_run_synchronous_atomic_group_with_label_ready(self):
    991         job = self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
    992         job.dependency_labels.add(self.label4)
    993         self._update_hqe("status='Pending', execution_subdir=''")
    994 
    995         queue_task = self._test_run_helper(expect_starting=True)
    996 
    997         self.assert_(isinstance(queue_task, monitor_db.QueueTask))
    998         # Atomic group jobs that also specify a label in the atomic group
    999         # will use the label name as their group name.
   1000         self.assertEquals(queue_task.queue_entries[0].get_group_name(),
   1001                           'label4')
   1002 
   1003 
   1004     def test_run_synchronous_ready(self):
   1005         self._create_job(hosts=[1, 2], synchronous=True)
   1006         self._update_hqe("status='Pending', execution_subdir=''")
   1007 
   1008         queue_task = self._test_run_helper(expect_starting=True)
   1009 
   1010         self.assert_(isinstance(queue_task, monitor_db.QueueTask))
   1011         self.assertEquals(queue_task.job.id, 1)
   1012         hqe_ids = [hqe.id for hqe in queue_task.queue_entries]
   1013         self.assertEquals(hqe_ids, [1, 2])
   1014 
   1015 
   1016     def test_schedule_running_host_queue_entries_fail(self):
   1017         self._create_job(hosts=[2])
   1018         self._update_hqe("status='%s', execution_subdir=''" %
   1019                          models.HostQueueEntry.Status.PENDING)
   1020         job = scheduler_models.Job.fetch('id = 1')[0]
   1021         queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]
   1022         assert queue_entry.job is job
   1023         job.run_if_ready(queue_entry)
   1024         self.assertEqual(queue_entry.status,
   1025                          models.HostQueueEntry.Status.STARTING)
   1026         self.assert_(queue_entry.execution_subdir)
   1027         self.god.check_playback()
   1028 
   1029         class dummy_test_agent(object):
   1030             task = 'dummy_test_agent'
   1031         self._dispatcher._register_agent_for_ids(
   1032                 self._dispatcher._host_agents, [queue_entry.host.id],
   1033                 dummy_test_agent)
   1034 
   1035         # Attempted to schedule on a host that already has an agent.
   1036         self.assertRaises(scheduler_lib.SchedulerError,
   1037                           self._dispatcher._schedule_running_host_queue_entries)
   1038 
   1039 
   1040     def test_schedule_hostless_job(self):
   1041         job = self._create_job(hostless=True)
   1042         self.assertEqual(1, job.hostqueueentry_set.count())
   1043         hqe_query = scheduler_models.HostQueueEntry.fetch(
   1044                 'id = %s' % job.hostqueueentry_set.all()[0].id)
   1045         self.assertEqual(1, len(hqe_query))
   1046         hqe = hqe_query[0]
   1047 
   1048         self.assertEqual(models.HostQueueEntry.Status.QUEUED, hqe.status)
   1049         self.assertEqual(0, len(self._dispatcher._agents))
   1050 
   1051         self._dispatcher._schedule_new_jobs()
   1052 
   1053         self.assertEqual(models.HostQueueEntry.Status.STARTING, hqe.status)
   1054         self.assertEqual(1, len(self._dispatcher._agents))
   1055 
   1056         self._dispatcher._schedule_new_jobs()
   1057 
   1058         # No change to previously schedule hostless job, and no additional agent
   1059         self.assertEqual(models.HostQueueEntry.Status.STARTING, hqe.status)
   1060         self.assertEqual(1, len(self._dispatcher._agents))
   1061 
   1062 
   1063 class TopLevelFunctionsTest(unittest.TestCase):
   1064     def setUp(self):
   1065         self.god = mock.mock_god()
   1066 
   1067 
   1068     def tearDown(self):
   1069         self.god.unstub_all()
   1070 
   1071 
   1072     def test_autoserv_command_line(self):
   1073         machines = 'abcd12,efgh34'
   1074         extra_args = ['-Z', 'hello']
   1075         expected_command_line_base = set((monitor_db._autoserv_path, '-p',
   1076                                           '-m', machines, '-r',
   1077                                           '--lab', 'True',
   1078                                           drone_manager.WORKING_DIRECTORY))
   1079 
   1080         expected_command_line = expected_command_line_base.union(
   1081                 ['--verbose']).union(extra_args)
   1082         command_line = set(
   1083                 monitor_db._autoserv_command_line(machines, extra_args))
   1084         self.assertEqual(expected_command_line, command_line)
   1085 
   1086         class FakeJob(object):
   1087             owner = 'Bob'
   1088             name = 'fake job name'
   1089             test_retry = 0
   1090             id = 1337
   1091 
   1092         class FakeHQE(object):
   1093             job = FakeJob
   1094 
   1095         expected_command_line = expected_command_line_base.union(
   1096                 ['-u', FakeJob.owner, '-l', FakeJob.name])
   1097         command_line = set(monitor_db._autoserv_command_line(
   1098                 machines, extra_args=[], queue_entry=FakeHQE, verbose=False))
   1099         self.assertEqual(expected_command_line, command_line)
   1100 
   1101 
   1102 class AgentTaskTest(unittest.TestCase,
   1103                     frontend_test_utils.FrontendTestMixin):
   1104     def setUp(self):
   1105         self._frontend_common_setup()
   1106 
   1107 
   1108     def tearDown(self):
   1109         self._frontend_common_teardown()
   1110 
   1111 
   1112     def _setup_drones(self):
   1113         self.god.stub_function(models.DroneSet, 'drone_sets_enabled')
   1114         models.DroneSet.drone_sets_enabled.expect_call().and_return(True)
   1115 
   1116         drones = []
   1117         for x in xrange(4):
   1118             drones.append(models.Drone.objects.create(hostname=str(x)))
   1119 
   1120         drone_set_1 = models.DroneSet.objects.create(name='1')
   1121         drone_set_1.drones.add(*drones[0:2])
   1122         drone_set_2 = models.DroneSet.objects.create(name='2')
   1123         drone_set_2.drones.add(*drones[2:4])
   1124         drone_set_3 = models.DroneSet.objects.create(name='3')
   1125 
   1126         job_1 = self._create_job_simple([self.hosts[0].id],
   1127                                         drone_set=drone_set_1)
   1128         job_2 = self._create_job_simple([self.hosts[0].id],
   1129                                         drone_set=drone_set_2)
   1130         job_3 = self._create_job_simple([self.hosts[0].id],
   1131                                         drone_set=drone_set_3)
   1132 
   1133         job_4 = self._create_job_simple([self.hosts[0].id])
   1134         job_4.drone_set = None
   1135         job_4.save()
   1136 
   1137         hqe_1 = job_1.hostqueueentry_set.all()[0]
   1138         hqe_2 = job_2.hostqueueentry_set.all()[0]
   1139         hqe_3 = job_3.hostqueueentry_set.all()[0]
   1140         hqe_4 = job_4.hostqueueentry_set.all()[0]
   1141 
   1142         return (hqe_1, hqe_2, hqe_3, hqe_4), agent_task.AgentTask()
   1143 
   1144 
   1145     def test_get_drone_hostnames_allowed_no_drones_in_set(self):
   1146         hqes, task = self._setup_drones()
   1147         task.queue_entry_ids = (hqes[2].id,)
   1148         self.assertEqual(set(), task.get_drone_hostnames_allowed())
   1149         self.god.check_playback()
   1150 
   1151 
   1152     def test_get_drone_hostnames_allowed_no_drone_set(self):
   1153         hqes, task = self._setup_drones()
   1154         hqe = hqes[3]
   1155         task.queue_entry_ids = (hqe.id,)
   1156 
   1157         result = object()
   1158 
   1159         self.god.stub_function(task, '_user_or_global_default_drone_set')
   1160         task._user_or_global_default_drone_set.expect_call(
   1161                 hqe.job, hqe.job.user()).and_return(result)
   1162 
   1163         self.assertEqual(result, task.get_drone_hostnames_allowed())
   1164         self.god.check_playback()
   1165 
   1166 
   1167     def test_get_drone_hostnames_allowed_success(self):
   1168         hqes, task = self._setup_drones()
   1169         task.queue_entry_ids = (hqes[0].id,)
   1170         self.assertEqual(set(('0','1')), task.get_drone_hostnames_allowed([]))
   1171         self.god.check_playback()
   1172 
   1173 
   1174     def test_get_drone_hostnames_allowed_multiple_jobs(self):
   1175         hqes, task = self._setup_drones()
   1176         task.queue_entry_ids = (hqes[0].id, hqes[1].id)
   1177         self.assertRaises(AssertionError,
   1178                           task.get_drone_hostnames_allowed)
   1179         self.god.check_playback()
   1180 
   1181 
   1182     def test_get_drone_hostnames_allowed_no_hqe(self):
   1183         class MockSpecialTask(object):
   1184             requested_by = object()
   1185 
   1186         class MockSpecialAgentTask(agent_task.SpecialAgentTask):
   1187             task = MockSpecialTask()
   1188             queue_entry_ids = []
   1189             def __init__(self, *args, **kwargs):
   1190                 super(agent_task.SpecialAgentTask, self).__init__()
   1191 
   1192         task = MockSpecialAgentTask()
   1193         self.god.stub_function(models.DroneSet, 'drone_sets_enabled')
   1194         self.god.stub_function(task, '_user_or_global_default_drone_set')
   1195 
   1196         result = object()
   1197         models.DroneSet.drone_sets_enabled.expect_call().and_return(True)
   1198         task._user_or_global_default_drone_set.expect_call(
   1199                 task.task, MockSpecialTask.requested_by).and_return(result)
   1200 
   1201         self.assertEqual(result, task.get_drone_hostnames_allowed())
   1202         self.god.check_playback()
   1203 
   1204 
   1205     def _setup_test_user_or_global_default_drone_set(self):
   1206         result = object()
   1207         class MockDroneSet(object):
   1208             def get_drone_hostnames(self):
   1209                 return result
   1210 
   1211         self.god.stub_function(models.DroneSet, 'get_default')
   1212         models.DroneSet.get_default.expect_call().and_return(MockDroneSet())
   1213         return result
   1214 
   1215 
   1216     def test_user_or_global_default_drone_set(self):
   1217         expected = object()
   1218         class MockDroneSet(object):
   1219             def get_drone_hostnames(self):
   1220                 return expected
   1221         class MockUser(object):
   1222             drone_set = MockDroneSet()
   1223 
   1224         self._setup_test_user_or_global_default_drone_set()
   1225 
   1226         actual = agent_task.AgentTask()._user_or_global_default_drone_set(
   1227                 None, MockUser())
   1228 
   1229         self.assertEqual(expected, actual)
   1230         self.god.check_playback()
   1231 
   1232 
   1233     def test_user_or_global_default_drone_set_no_user(self):
   1234         expected = self._setup_test_user_or_global_default_drone_set()
   1235         actual = agent_task.AgentTask()._user_or_global_default_drone_set(
   1236                 None, None)
   1237 
   1238         self.assertEqual(expected, actual)
   1239         self.god.check_playback()
   1240 
   1241 
   1242     def test_user_or_global_default_drone_set_no_user_drone_set(self):
   1243         class MockUser(object):
   1244             drone_set = None
   1245             login = None
   1246 
   1247         expected = self._setup_test_user_or_global_default_drone_set()
   1248         actual = agent_task.AgentTask()._user_or_global_default_drone_set(
   1249                 None, MockUser())
   1250 
   1251         self.assertEqual(expected, actual)
   1252         self.god.check_playback()
   1253 
   1254 
   1255     def test_abort_HostlessQueueTask(self):
   1256         hqe = self.god.create_mock_class(scheduler_models.HostQueueEntry,
   1257                                          'HostQueueEntry')
   1258         # If hqe is still in STARTING status, aborting the task should finish
   1259         # without changing hqe's status.
   1260         hqe.status = models.HostQueueEntry.Status.STARTING
   1261         hqe.job = None
   1262         hqe.id = 0
   1263         task = monitor_db.HostlessQueueTask(hqe)
   1264         task.abort()
   1265 
   1266         # If hqe is in RUNNING status, aborting the task should change hqe's
   1267         # status to Parsing, so FinalReparseTask can be scheduled.
   1268         hqe.set_status.expect_call('Parsing')
   1269         hqe.status = models.HostQueueEntry.Status.RUNNING
   1270         hqe.job = None
   1271         hqe.id = 0
   1272         task = monitor_db.HostlessQueueTask(hqe)
   1273         task.abort()
   1274 
   1275 
   1276 if __name__ == '__main__':
   1277     unittest.main()
   1278