Home | History | Annotate | Download | only in scheduler
      1 #!/usr/bin/python
      2 #
      3 # Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
      4 # Use of this source code is governed by a BSD-style license that can be
      5 # found in the LICENSE file.
      6 
      7 """Tests for the drone managers thread queue."""
      8 
      9 import cPickle
     10 import logging
     11 import Queue
     12 import unittest
     13 
     14 import common
     15 from autotest_lib.client.common_lib import utils
     16 from autotest_lib.client.common_lib.test_utils import mock
     17 from autotest_lib.scheduler import drone_task_queue
     18 from autotest_lib.scheduler import drones
     19 from autotest_lib.scheduler import thread_lib
     20 from autotest_lib.server.hosts import ssh_host
     21 
     22 
     23 class DroneThreadLibTest(unittest.TestCase):
     24     """Threaded task queue drone library tests."""
     25 
     26     def create_remote_drone(self, hostname):
     27         """Create and initialize a Remote Drone.
     28 
     29         @param hostname: The name of the host for the remote drone.
     30 
     31         @return: A remote drone instance.
     32         """
     33         drones.drone_utility.create_host.expect_call(hostname).and_return(
     34                 self._mock_host)
     35         self._mock_host.is_up.expect_call().and_return(True)
     36         return drones._RemoteDrone(hostname, timestamp_remote_calls=False)
     37 
     38 
     39     def setUp(self):
     40         self.god = mock.mock_god()
     41         self._mock_host = self.god.create_mock_class(ssh_host.SSHHost,
     42                                                      'mock SSHHost')
     43         self.god.stub_function(drones.drone_utility, 'create_host')
     44         self.drone_utility_path = 'mock-drone-utility-path'
     45         self.mock_return = {'results': ['mock results'],
     46                             'warnings': []}
     47         self.god.stub_with(drones._RemoteDrone, '_drone_utility_path',
     48                 self.drone_utility_path)
     49 
     50 
     51     def tearDown(self):
     52         self.god.unstub_all()
     53 
     54 
     55     def test_worker(self):
     56         """Test the worker method of a ThreadedTaskQueue."""
     57         # Invoke the worker method with a drone that has a queued call and check
     58         # that the drones host.run method is invoked for the call, and the
     59         # results queue contains the expected results.
     60         drone = self.create_remote_drone('fakehostname')
     61         task_queue = thread_lib.ThreadedTaskQueue()
     62 
     63         drone.queue_call('foo')
     64         mock_result = utils.CmdResult(stdout=cPickle.dumps(self.mock_return))
     65         self._mock_host.run.expect_call(
     66                 'python %s' % self.drone_utility_path,
     67                 stdin=cPickle.dumps(drone.get_calls()), stdout_tee=None,
     68                 connect_timeout=mock.is_instance_comparator(int)).and_return(
     69                         mock_result)
     70         task_queue.worker(drone, task_queue.results_queue)
     71         result = task_queue.results_queue.get()
     72 
     73         self.assertTrue(task_queue.results_queue.empty() and
     74                         result.drone == drone and
     75                         result.results == self.mock_return['results'])
     76         self.god.check_playback()
     77 
     78 
     79     def test_wait_on_drones(self):
     80         """Test waiting on drone threads."""
     81 
     82         def waiting_func(queue):
     83             while len(queue.queue) < 2:
     84                 continue
     85             logging.warning('Consuming thread finished.')
     86             queue.put(3)
     87 
     88         def exception_func(queue):
     89             while queue.empty():
     90                 continue
     91             queue.put(2)
     92             logging.warning('Failing thread raising error.')
     93             raise ValueError('Value error')
     94 
     95         def quick_func():
     96             return
     97 
     98         # Create 2 threads, one of which raises an exception while the other
     99         # just exits normally. Insert both threads into the thread_queue against
    100         # mock drones and confirm that:
    101         # a. The task queue waits for both threads, though the first one fails.
    102         # b. The task queue records the right DroneTaskQueueException, which
    103         #       contains the original exception.
    104         # c. The failing thread records its own exception instead of raising it.
    105         task_queue = thread_lib.ThreadedTaskQueue()
    106         drone1 = self.create_remote_drone('fakehostname1')
    107         drone2 = self.create_remote_drone('fakehostname2')
    108         sync_queue = Queue.Queue()
    109 
    110         waiting_worker = thread_lib.ExceptionRememberingThread(
    111                 target=waiting_func, args=(sync_queue,))
    112         failing_worker = thread_lib.ExceptionRememberingThread(
    113                 target=exception_func, args=(sync_queue,))
    114         task_queue.drone_threads[drone1] = waiting_worker
    115         task_queue.drone_threads[drone2] = failing_worker
    116         master_thread = thread_lib.ExceptionRememberingThread(
    117                 target=task_queue.wait_on_drones)
    118 
    119         thread_list = [failing_worker, waiting_worker, master_thread]
    120         for thread in thread_list:
    121             thread.setDaemon(True)
    122             thread.start()
    123         sync_queue.put(1)
    124         master_thread.join()
    125 
    126         self.assertTrue(isinstance(master_thread.err,
    127                                    drone_task_queue.DroneTaskQueueException))
    128         self.assertTrue(isinstance(failing_worker.err, ValueError))
    129         self.assertTrue(str(failing_worker.err) in str(master_thread.err))
    130         self.assertTrue(3 in list(sync_queue.queue))
    131         self.assertTrue(task_queue.drone_threads == {})
    132 
    133         # Call wait_on_drones after the child thread has exited.
    134         quick_worker = thread_lib.ExceptionRememberingThread(target=quick_func)
    135         task_queue.drone_threads[drone1] = quick_worker
    136         quick_worker.start()
    137         while quick_worker.isAlive():
    138             continue
    139         task_queue.wait_on_drones()
    140         self.assertTrue(task_queue.drone_threads == {})
    141 
    142 
    143     def test_get_results(self):
    144         """Test retrieving results from the results queue."""
    145 
    146         # Insert results for the same drone twice into the results queue
    147         # and confirm that an exception is raised.
    148         task_queue = thread_lib.ThreadedTaskQueue()
    149         drone1 = self.create_remote_drone('fakehostname1')
    150         drone2 = self.create_remote_drone('fakehostname2')
    151         task_queue.results_queue.put(
    152                 thread_lib.ThreadedTaskQueue.result(drone1, self.mock_return))
    153         task_queue.results_queue.put(
    154                 thread_lib.ThreadedTaskQueue.result(drone1, self.mock_return))
    155         self.god.stub_function(task_queue, 'wait_on_drones')
    156         task_queue.wait_on_drones.expect_call()
    157         self.assertRaises(drone_task_queue.DroneTaskQueueException,
    158                           task_queue.get_results)
    159 
    160         # Insert results for different drones and check that they're returned
    161         # in a drone results dict.
    162         self.assertTrue(task_queue.results_queue.empty())
    163         task_queue.results_queue.put(
    164                 thread_lib.ThreadedTaskQueue.result(drone1, self.mock_return))
    165         task_queue.results_queue.put(
    166                 thread_lib.ThreadedTaskQueue.result(drone2, self.mock_return))
    167         task_queue.wait_on_drones.expect_call()
    168         results = task_queue.get_results()
    169         self.assertTrue(results[drone1] == self.mock_return and
    170                         results[drone2] == self.mock_return)
    171         self.god.check_playback()
    172 
    173 
    174     def test_execute(self):
    175         """Test task queue execute."""
    176         drone1 = self.create_remote_drone('fakehostname1')
    177         drone2 = self.create_remote_drone('fakehostname2')
    178         drone3 = self.create_remote_drone('fakehostname3')
    179 
    180         # Check task queue exception conditions.
    181         task_queue = thread_lib.ThreadedTaskQueue()
    182         task_queue.results_queue.put(1)
    183         self.assertRaises(drone_task_queue.DroneTaskQueueException,
    184                           task_queue.execute, [])
    185         task_queue.results_queue.get()
    186         task_queue.drone_threads[drone1] = None
    187         self.assertRaises(drone_task_queue.DroneTaskQueueException,
    188                           task_queue.execute, [])
    189         task_queue.drone_threads = {}
    190 
    191         # Queue 2 calls against each drone, and confirm that the host's
    192         # run method is called 3 times. Then check the threads created,
    193         # and finally compare results returned by the task queue against
    194         # the mock results.
    195         drones = [drone1, drone2, drone3]
    196         for drone in drones:
    197             drone.queue_call('foo')
    198             drone.queue_call('bar')
    199             mock_result = utils.CmdResult(
    200                     stdout=cPickle.dumps(self.mock_return))
    201             self._mock_host.run.expect_call(
    202                     'python %s' % self.drone_utility_path,
    203                     stdin=cPickle.dumps(drone.get_calls()), stdout_tee=None,
    204                     connect_timeout=mock.is_instance_comparator(int)
    205                     ).and_return(mock_result)
    206         task_queue.execute(drones, wait=False)
    207         self.assertTrue(set(task_queue.drone_threads.keys()) == set(drones))
    208         for drone, thread in task_queue.drone_threads.iteritems():
    209             self.assertTrue(drone.hostname in thread.getName())
    210             self.assertTrue(thread.isDaemon())
    211             self.assertRaises(RuntimeError, thread.start)
    212         results = task_queue.get_results()
    213         for drone, result in results.iteritems():
    214             self.assertTrue(result == self.mock_return['results'])
    215 
    216         # Test synchronous execute
    217         drone1.queue_call('foo')
    218         mock_result = utils.CmdResult(stdout=cPickle.dumps(self.mock_return))
    219         self._mock_host.run.expect_call(
    220                 'python %s' % self.drone_utility_path,
    221                 stdin=cPickle.dumps(drone1.get_calls()), stdout_tee=None,
    222                 connect_timeout=mock.is_instance_comparator(int)).and_return(
    223                         mock_result)
    224         self.assertTrue(task_queue.execute(drones, wait=True)[drone1] ==
    225                         self.mock_return['results'])
    226         self.god.check_playback()
    227 
    228 
    229 if __name__ == '__main__':
    230     unittest.main()
    231