Home | History | Annotate | Download | only in scheduler
      1 #!/usr/bin/python
      2 
      3 import cPickle
      4 import os, unittest
      5 import common
      6 from autotest_lib.client.bin import local_host
      7 from autotest_lib.client.common_lib import global_config
      8 from autotest_lib.client.common_lib import utils
      9 from autotest_lib.client.common_lib.test_utils import mock
     10 from autotest_lib.frontend import setup_django_lite_environment
     11 from autotest_lib.scheduler import drone_manager, drone_utility, drones
     12 from autotest_lib.scheduler import scheduler_config, site_drone_manager
     13 from autotest_lib.scheduler import thread_lib
     14 from autotest_lib.scheduler import pidfile_monitor
     15 from autotest_lib.server.hosts import ssh_host
     16 
     17 
     18 class MockDrone(drones._AbstractDrone):
     19     def __init__(self, name, active_processes=0, max_processes=10,
     20                  allowed_users=None, support_ssp=False):
     21         super(MockDrone, self).__init__()
     22         self.name = name
     23         self.hostname = name
     24         self.active_processes = active_processes
     25         self.max_processes = max_processes
     26         self.allowed_users = allowed_users
     27         self._host = 'mock_drone'
     28         self._support_ssp = support_ssp
     29         # maps method names list of tuples containing method arguments
     30         self._recorded_calls = {'queue_call': [],
     31                                 'send_file_to': []}
     32 
     33 
     34     def queue_call(self, method, *args, **kwargs):
     35         self._recorded_calls['queue_call'].append((method, args, kwargs))
     36 
     37 
     38     def call(self, method, *args, **kwargs):
     39         # don't bother differentiating between call() and queue_call()
     40         return self.queue_call(method, *args, **kwargs)
     41 
     42 
     43     def send_file_to(self, drone, source_path, destination_path,
     44                      can_fail=False):
     45         self._recorded_calls['send_file_to'].append(
     46                 (drone, source_path, destination_path))
     47 
     48 
     49     # method for use by tests
     50     def _check_for_recorded_call(self, method_name, arguments):
     51         recorded_arg_list = self._recorded_calls[method_name]
     52         was_called = arguments in recorded_arg_list
     53         if not was_called:
     54             print 'Recorded args:', recorded_arg_list
     55             print 'Expected:', arguments
     56         return was_called
     57 
     58 
     59     def was_call_queued(self, method, *args, **kwargs):
     60         return self._check_for_recorded_call('queue_call',
     61                                              (method, args, kwargs))
     62 
     63 
     64     def was_file_sent(self, drone, source_path, destination_path):
     65         return self._check_for_recorded_call('send_file_to',
     66                                              (drone, source_path,
     67                                               destination_path))
     68 
     69 
     70 class DroneManager(unittest.TestCase):
     71     _DRONE_INSTALL_DIR = '/drone/install/dir'
     72     _DRONE_RESULTS_DIR = os.path.join(_DRONE_INSTALL_DIR, 'results')
     73     _RESULTS_DIR = '/results/dir'
     74     _SOURCE_PATH = 'source/path'
     75     _DESTINATION_PATH = 'destination/path'
     76     _WORKING_DIRECTORY = 'working/directory'
     77     _USERNAME = 'my_user'
     78 
     79     def setUp(self):
     80         self.god = mock.mock_god()
     81         self.god.stub_with(drones, 'AUTOTEST_INSTALL_DIR',
     82                            self._DRONE_INSTALL_DIR)
     83         self.manager = drone_manager.DroneManager()
     84         self.god.stub_with(self.manager, '_results_dir', self._RESULTS_DIR)
     85 
     86         # we don't want this to ever actually get called
     87         self.god.stub_function(drones, 'get_drone')
     88         # we don't want the DroneManager to go messing with global config
     89         def do_nothing():
     90             pass
     91         self.god.stub_with(self.manager, 'refresh_drone_configs', do_nothing)
     92 
     93         # set up some dummy drones
     94         self.mock_drone = MockDrone('mock_drone')
     95         self.manager._drones[self.mock_drone.name] = self.mock_drone
     96         self.results_drone = MockDrone('results_drone', 0, 10)
     97         self.manager._results_drone = self.results_drone
     98 
     99         self.mock_drone_process = drone_manager.Process(self.mock_drone.name, 0)
    100 
    101 
    102     def tearDown(self):
    103         self.god.unstub_all()
    104 
    105 
    106     def _test_choose_drone_for_execution_helper(self, processes_info_list,
    107                                                 requested_processes,
    108                                                 require_ssp=False):
    109         for index, process_info in enumerate(processes_info_list):
    110             if len(process_info) == 2:
    111                 active_processes, max_processes = process_info
    112                 support_ssp = False
    113             else:
    114                 active_processes, max_processes, support_ssp = process_info
    115             self.manager._enqueue_drone(MockDrone(
    116                     index, active_processes, max_processes, allowed_users=None,
    117                     support_ssp=support_ssp))
    118 
    119         return self.manager._choose_drone_for_execution(
    120                 requested_processes, self._USERNAME, None, require_ssp)
    121 
    122 
    123     def test_choose_drone_for_execution(self):
    124         drone = self._test_choose_drone_for_execution_helper([(1, 2), (0, 2)],
    125                                                              1)
    126         self.assertEquals(drone.name, 1)
    127 
    128 
    129     def test_choose_drone_for_execution_some_full(self):
    130         drone = self._test_choose_drone_for_execution_helper([(0, 1), (1, 3)],
    131                                                              2)
    132         self.assertEquals(drone.name, 1)
    133 
    134 
    135     def test_choose_drone_for_execution_all_full(self):
    136         drone = self._test_choose_drone_for_execution_helper([(2, 1), (3, 2)],
    137                                                              1)
    138         self.assertEquals(drone.name, 1)
    139 
    140 
    141     def test_choose_drone_for_execution_all_full_same_percentage_capacity(self):
    142         drone = self._test_choose_drone_for_execution_helper([(5, 3), (10, 6)],
    143                                                              1)
    144         self.assertEquals(drone.name, 1)
    145 
    146 
    147     def test_choose_drone_for_execution_no_ssp_support(self):
    148         drone = self._test_choose_drone_for_execution_helper(
    149                 [(0, 1), (1, 3)], 1, True)
    150         self.assertEquals(drone.name, 0)
    151 
    152 
    153     def test_choose_drone_for_execution_with_ssp_support(self):
    154         self.mock_drone._support_ssp = True
    155         drone = self._test_choose_drone_for_execution_helper(
    156                 [(0, 1), (1, 3, True)], 1, True)
    157         self.assertEquals(drone.name, 1)
    158 
    159 
    160     def test_user_restrictions(self):
    161         # this drone is restricted to a different user
    162         self.manager._enqueue_drone(MockDrone(1, max_processes=10,
    163                                               allowed_users=['fakeuser']))
    164         # this drone is allowed but has lower capacity
    165         self.manager._enqueue_drone(MockDrone(2, max_processes=2,
    166                                               allowed_users=[self._USERNAME]))
    167 
    168         self.assertEquals(2,
    169                           self.manager.max_runnable_processes(self._USERNAME,
    170                                                               None))
    171         drone = self.manager._choose_drone_for_execution(
    172                 1, username=self._USERNAME, drone_hostnames_allowed=None)
    173         self.assertEquals(drone.name, 2)
    174 
    175 
    176     def test_user_restrictions_with_full_drone(self):
    177         # this drone is restricted to a different user
    178         self.manager._enqueue_drone(MockDrone(1, max_processes=10,
    179                                               allowed_users=['fakeuser']))
    180         # this drone is allowed but is full
    181         self.manager._enqueue_drone(MockDrone(2, active_processes=3,
    182                                               max_processes=2,
    183                                               allowed_users=[self._USERNAME]))
    184 
    185         self.assertEquals(0,
    186                           self.manager.max_runnable_processes(self._USERNAME,
    187                                                               None))
    188         drone = self.manager._choose_drone_for_execution(
    189                 1, username=self._USERNAME, drone_hostnames_allowed=None)
    190         self.assertEquals(drone.name, 2)
    191 
    192 
    193     def _setup_test_drone_restrictions(self, active_processes=0):
    194         self.manager._enqueue_drone(MockDrone(
    195                 1, active_processes=active_processes, max_processes=10))
    196         self.manager._enqueue_drone(MockDrone(
    197                 2, active_processes=active_processes, max_processes=5))
    198         self.manager._enqueue_drone(MockDrone(
    199                 3, active_processes=active_processes, max_processes=2))
    200 
    201 
    202     def test_drone_restrictions_allow_any(self):
    203         self._setup_test_drone_restrictions()
    204         self.assertEquals(10,
    205                           self.manager.max_runnable_processes(self._USERNAME,
    206                                                               None))
    207         drone = self.manager._choose_drone_for_execution(
    208                 1, username=self._USERNAME, drone_hostnames_allowed=None)
    209         self.assertEqual(drone.name, 1)
    210 
    211 
    212     def test_drone_restrictions_under_capacity(self):
    213         self._setup_test_drone_restrictions()
    214         drone_hostnames_allowed = (2, 3)
    215         self.assertEquals(
    216                 5, self.manager.max_runnable_processes(self._USERNAME,
    217                                                        drone_hostnames_allowed))
    218         drone = self.manager._choose_drone_for_execution(
    219                 1, username=self._USERNAME,
    220                 drone_hostnames_allowed=drone_hostnames_allowed)
    221 
    222         self.assertEqual(drone.name, 2)
    223 
    224 
    225     def test_drone_restrictions_over_capacity(self):
    226         self._setup_test_drone_restrictions(active_processes=6)
    227         drone_hostnames_allowed = (2, 3)
    228         self.assertEquals(
    229                 0, self.manager.max_runnable_processes(self._USERNAME,
    230                                                        drone_hostnames_allowed))
    231         drone = self.manager._choose_drone_for_execution(
    232                 7, username=self._USERNAME,
    233                 drone_hostnames_allowed=drone_hostnames_allowed)
    234         self.assertEqual(drone.name, 2)
    235 
    236 
    237     def test_drone_restrictions_allow_none(self):
    238         self._setup_test_drone_restrictions()
    239         drone_hostnames_allowed = ()
    240         self.assertEquals(
    241                 0, self.manager.max_runnable_processes(self._USERNAME,
    242                                                        drone_hostnames_allowed))
    243         drone = self.manager._choose_drone_for_execution(
    244                 1, username=self._USERNAME,
    245                 drone_hostnames_allowed=drone_hostnames_allowed)
    246         self.assertEqual(drone, None)
    247 
    248 
    249     def test_initialize(self):
    250         results_hostname = 'results_repo'
    251         results_install_dir = '/results/install'
    252         global_config.global_config.override_config_value(
    253                 scheduler_config.CONFIG_SECTION,
    254                 'results_host_installation_directory', results_install_dir)
    255 
    256         (drones.get_drone.expect_call(self.mock_drone.name)
    257          .and_return(self.mock_drone))
    258 
    259         results_drone = MockDrone('results_drone')
    260         self.god.stub_function(results_drone, 'set_autotest_install_dir')
    261         drones.get_drone.expect_call(results_hostname).and_return(results_drone)
    262         results_drone.set_autotest_install_dir.expect_call(results_install_dir)
    263 
    264         self.manager.initialize(base_results_dir=self._RESULTS_DIR,
    265                                 drone_hostnames=[self.mock_drone.name],
    266                                 results_repository_hostname=results_hostname)
    267 
    268         self.assert_(self.mock_drone.was_call_queued(
    269                 'initialize', self._DRONE_RESULTS_DIR + '/'))
    270         self.god.check_playback()
    271 
    272 
    273     def test_execute_command(self):
    274         self.manager._enqueue_drone(self.mock_drone)
    275 
    276         pidfile_name = 'my_pidfile'
    277         log_file = 'log_file'
    278 
    279         pidfile_id = self.manager.execute_command(
    280                 command=['test', drone_manager.WORKING_DIRECTORY],
    281                 working_directory=self._WORKING_DIRECTORY,
    282                 pidfile_name=pidfile_name,
    283                 num_processes=1,
    284                 log_file=log_file)
    285 
    286         full_working_directory = os.path.join(self._DRONE_RESULTS_DIR,
    287                                               self._WORKING_DIRECTORY)
    288         self.assertEquals(pidfile_id.path,
    289                           os.path.join(full_working_directory, pidfile_name))
    290         self.assert_(self.mock_drone.was_call_queued(
    291                 'execute_command', ['test', full_working_directory],
    292                 full_working_directory,
    293                 os.path.join(self._DRONE_RESULTS_DIR, log_file), pidfile_name))
    294 
    295 
    296     def test_attach_file_to_execution(self):
    297         self.manager._enqueue_drone(self.mock_drone)
    298 
    299         contents = 'my\ncontents'
    300         attached_path = self.manager.attach_file_to_execution(
    301                 self._WORKING_DIRECTORY, contents)
    302         self.manager.execute_command(command=['test'],
    303                                      working_directory=self._WORKING_DIRECTORY,
    304                                      pidfile_name='mypidfile',
    305                                      num_processes=1,
    306                                      drone_hostnames_allowed=None)
    307 
    308         self.assert_(self.mock_drone.was_call_queued(
    309                 'write_to_file',
    310                 os.path.join(self._DRONE_RESULTS_DIR, attached_path),
    311                 contents))
    312 
    313 
    314     def test_copy_results_on_drone(self):
    315         self.manager.copy_results_on_drone(self.mock_drone_process,
    316                                            self._SOURCE_PATH,
    317                                            self._DESTINATION_PATH)
    318         self.assert_(self.mock_drone.was_call_queued(
    319                 'copy_file_or_directory',
    320                 os.path.join(self._DRONE_RESULTS_DIR, self._SOURCE_PATH),
    321                 os.path.join(self._DRONE_RESULTS_DIR, self._DESTINATION_PATH)))
    322 
    323 
    324     def test_copy_to_results_repository(self):
    325         site_drone_manager.ENABLE_ARCHIVING = True
    326         self.manager.copy_to_results_repository(self.mock_drone_process,
    327                                                 self._SOURCE_PATH)
    328         self.assert_(self.mock_drone.was_file_sent(
    329                 self.results_drone,
    330                 os.path.join(self._DRONE_RESULTS_DIR, self._SOURCE_PATH),
    331                 os.path.join(self._RESULTS_DIR, self._SOURCE_PATH)))
    332 
    333 
    334     def test_write_lines_to_file(self):
    335         file_path = 'file/path'
    336         lines = ['line1', 'line2']
    337         written_data = 'line1\nline2\n'
    338 
    339         # write to results repository
    340         self.manager.write_lines_to_file(file_path, lines)
    341         self.assert_(self.results_drone.was_call_queued(
    342                 'write_to_file', os.path.join(self._RESULTS_DIR, file_path),
    343                 written_data))
    344 
    345         # write to a drone
    346         self.manager.write_lines_to_file(
    347                 file_path, lines, paired_with_process=self.mock_drone_process)
    348         self.assert_(self.mock_drone.was_call_queued(
    349                 'write_to_file',
    350                 os.path.join(self._DRONE_RESULTS_DIR, file_path), written_data))
    351 
    352 
    353     def test_pidfile_expiration(self):
    354         self.god.stub_with(self.manager, '_get_max_pidfile_refreshes',
    355                            lambda: 0)
    356         pidfile_id = self.manager.get_pidfile_id_from('tag', 'name')
    357         self.manager.register_pidfile(pidfile_id)
    358         self.manager._drop_old_pidfiles()
    359         self.manager._drop_old_pidfiles()
    360         self.assertFalse(self.manager._registered_pidfile_info)
    361 
    362 
    363 class ThreadedDroneTest(unittest.TestCase):
    364     _DRONE_INSTALL_DIR = '/drone/install/dir'
    365     _RESULTS_DIR = '/results/dir'
    366     _DRONE_CLASS = drones._RemoteDrone
    367     _DRONE_HOST = ssh_host.SSHHost
    368 
    369 
    370     def create_drone(self, drone_hostname, mock_hostname,
    371                      timestamp_remote_calls=False):
    372         """Create and initialize a Remote Drone.
    373 
    374         @return: A remote drone instance.
    375         """
    376         mock_host = self.god.create_mock_class(self._DRONE_HOST, mock_hostname)
    377         self.god.stub_function(drones.drone_utility, 'create_host')
    378         drones.drone_utility.create_host.expect_call(drone_hostname).and_return(
    379                 mock_host)
    380         mock_host.is_up.expect_call().and_return(True)
    381         return self._DRONE_CLASS(drone_hostname,
    382                                  timestamp_remote_calls=timestamp_remote_calls)
    383 
    384 
    385     def create_fake_pidfile_info(self, tag='tag', name='name'):
    386         pidfile_id = self.manager.get_pidfile_id_from(tag, name)
    387         self.manager.register_pidfile(pidfile_id)
    388         return self.manager._registered_pidfile_info
    389 
    390 
    391     def setUp(self):
    392         self.god = mock.mock_god()
    393         self.god.stub_with(drones, 'AUTOTEST_INSTALL_DIR',
    394                            self._DRONE_INSTALL_DIR)
    395         self.manager = drone_manager.DroneManager()
    396         self.god.stub_with(self.manager, '_results_dir', self._RESULTS_DIR)
    397 
    398         # we don't want this to ever actually get called
    399         self.god.stub_function(drones, 'get_drone')
    400         # we don't want the DroneManager to go messing with global config
    401         def do_nothing():
    402             pass
    403         self.god.stub_with(self.manager, 'refresh_drone_configs', do_nothing)
    404 
    405         self.results_drone = MockDrone('results_drone', 0, 10)
    406         self.manager._results_drone = self.results_drone
    407         self.drone_utility_path = 'mock-drone-utility-path'
    408         self.mock_return = {'results': ['mock results'],
    409                             'warnings': []}
    410 
    411 
    412     def tearDown(self):
    413         self.god.unstub_all()
    414 
    415     def test_trigger_refresh(self):
    416         """Test drone manager trigger refresh."""
    417         self.god.stub_with(self._DRONE_CLASS, '_drone_utility_path',
    418                            self.drone_utility_path)
    419         mock_drone = self.create_drone('fakedrone1', 'fakehost1')
    420         self.manager._drones[mock_drone.hostname] = mock_drone
    421 
    422         # Create some fake pidfiles and confirm that a refresh call is
    423         # executed on each drone host, with the same pidfile paths. Then
    424         # check that each drone gets a key in the returned results dictionary.
    425         for i in range(0, 1):
    426             pidfile_info = self.create_fake_pidfile_info(
    427                     'tag%s' % i, 'name%s' %i)
    428         pidfile_paths = [pidfile.path for pidfile in pidfile_info.keys()]
    429         refresh_call = drone_utility.call('refresh', pidfile_paths)
    430         expected_results = {}
    431         mock_result = utils.CmdResult(
    432                 stdout=cPickle.dumps(self.mock_return))
    433         for drone in self.manager.get_drones():
    434             drone._host.run.expect_call(
    435                     'python %s' % self.drone_utility_path,
    436                     stdin=cPickle.dumps([refresh_call]), stdout_tee=None,
    437                     connect_timeout=mock.is_instance_comparator(int)
    438                 ).and_return(mock_result)
    439             expected_results[drone] = self.mock_return['results']
    440         self.manager.trigger_refresh()
    441         self.assertTrue(self.manager._refresh_task_queue.get_results() ==
    442                         expected_results)
    443         self.god.check_playback()
    444 
    445 
    446     def test_sync_refresh(self):
    447         """Test drone manager sync refresh."""
    448 
    449         mock_drone = self.create_drone('fakedrone1', 'fakehost1')
    450         self.manager._drones[mock_drone.hostname] = mock_drone
    451 
    452         # Insert some drone_utility results into the results queue, then
    453         # check that get_results returns it in the right format, and that
    454         # the rest of sync_refresh populates the right datastructures for
    455         # correct handling of agents. Also confirm that this method of
    456         # syncing is sufficient for the monitor to pick up the exit status
    457         # of the process in the same way it would in handle_agents.
    458         pidfile_path = 'results/hosts/host_id/job_id-name/.autoserv_execute'
    459         pidfiles = {pidfile_path: '123\n12\n0\n'}
    460         drone_utility_results = {
    461                 'pidfiles': pidfiles,
    462                 'autoserv_processes':{},
    463                 'all_processes':{},
    464                 'parse_processes':{},
    465                 'pidfiles_second_read':pidfiles,
    466         }
    467         # Our manager instance isn't the drone manager singletone that the
    468         # pidfile_monitor will use by default, becuase setUp doesn't call
    469         # drone_manager.instance().
    470         self.god.stub_with(drone_manager, '_the_instance', self.manager)
    471         monitor = pidfile_monitor.PidfileRunMonitor()
    472         monitor.pidfile_id = drone_manager.PidfileId(pidfile_path)
    473         self.manager.register_pidfile(monitor.pidfile_id)
    474         self.assertTrue(monitor._state.exit_status == None)
    475 
    476         self.manager._refresh_task_queue.results_queue.put(
    477                 thread_lib.ThreadedTaskQueue.result(
    478                     mock_drone, [drone_utility_results]))
    479         self.manager.sync_refresh()
    480         pidfiles = self.manager._pidfiles
    481         pidfile_id = pidfiles.keys()[0]
    482         pidfile_contents = pidfiles[pidfile_id]
    483 
    484         self.assertTrue(
    485                 pidfile_id.path == pidfile_path and
    486                 pidfile_contents.process.pid == 123 and
    487                 pidfile_contents.process.hostname ==
    488                         mock_drone.hostname and
    489                 pidfile_contents.exit_status == 12 and
    490                 pidfile_contents.num_tests_failed == 0)
    491         self.assertTrue(monitor.exit_code() == 12)
    492         self.god.check_playback()
    493 
    494 
    495 class ThreadedLocalhostDroneTest(ThreadedDroneTest):
    496     _DRONE_CLASS = drones._LocalDrone
    497     _DRONE_HOST = local_host.LocalHost
    498 
    499 
    500     def create_drone(self, drone_hostname, mock_hostname,
    501                      timestamp_remote_calls=False):
    502         """Create and initialize a Remote Drone.
    503 
    504         @return: A remote drone instance.
    505         """
    506         mock_host = self.god.create_mock_class(self._DRONE_HOST, mock_hostname)
    507         self.god.stub_function(drones.drone_utility, 'create_host')
    508         local_drone = self._DRONE_CLASS(
    509                 timestamp_remote_calls=timestamp_remote_calls)
    510         self.god.stub_with(local_drone, '_host', mock_host)
    511         return local_drone
    512 
    513 
# Run the test cases in this module when it is executed as a script.
if __name__ == '__main__':
    unittest.main()
    516