#!/usr/bin/python
#
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Tests for the drone manager's thread queue."""

import cPickle
import logging
import Queue
import unittest

import common
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.test_utils import mock
from autotest_lib.scheduler import drone_task_queue
from autotest_lib.scheduler import drones
from autotest_lib.scheduler import thread_lib
from autotest_lib.server.hosts import ssh_host


class DroneThreadLibTest(unittest.TestCase):
    """Threaded task queue drone library tests."""

    def create_remote_drone(self, hostname):
        """Create and initialize a Remote Drone.

        Records the mock expectations needed to construct the drone: one
        create_host call returning the shared mock host, and one is_up call
        on that host returning True.

        @param hostname: The name of the host for the remote drone.

        @return: A remote drone instance.
        """
        drones.drone_utility.create_host.expect_call(hostname).and_return(
                self._mock_host)
        self._mock_host.is_up.expect_call().and_return(True)
        return drones._RemoteDrone(hostname, timestamp_remote_calls=False)


    def _expect_drone_utility_run(self, drone):
        """Expect one host.run call carrying the drone's queued calls.

        The expected call runs the stubbed drone utility path with the
        pickled output of drone.get_calls() on stdin, and returns a CmdResult
        whose stdout is the pickled self.mock_return.

        @param drone: The drone whose currently queued calls are expected.
        """
        mock_result = utils.CmdResult(stdout=cPickle.dumps(self.mock_return))
        self._mock_host.run.expect_call(
                'python %s' % self.drone_utility_path,
                stdin=cPickle.dumps(drone.get_calls()), stdout_tee=None,
                connect_timeout=mock.is_instance_comparator(int)).and_return(
                        mock_result)


    def setUp(self):
        """Stub out host creation and the drone utility path."""
        self.god = mock.mock_god()
        self._mock_host = self.god.create_mock_class(ssh_host.SSHHost,
                                                     'mock SSHHost')
        self.god.stub_function(drones.drone_utility, 'create_host')
        self.drone_utility_path = 'mock-drone-utility-path'
        self.mock_return = {'results': ['mock results'],
                            'warnings': []}
        self.god.stub_with(drones._RemoteDrone, '_drone_utility_path',
                           self.drone_utility_path)


    def tearDown(self):
        """Remove every stub installed through the mock god."""
        self.god.unstub_all()


    def test_worker(self):
        """Test the worker method of a ThreadedTaskQueue."""
        # Invoke the worker method with a drone that has a queued call and
        # check that the drone's host.run method is invoked for the call, and
        # the results queue contains the expected results.
        drone = self.create_remote_drone('fakehostname')
        task_queue = thread_lib.ThreadedTaskQueue()

        drone.queue_call('foo')
        self._expect_drone_utility_run(drone)
        task_queue.worker(drone, task_queue.results_queue)
        result = task_queue.results_queue.get()

        # Separate assertions so a failure names the condition that broke.
        self.assertTrue(task_queue.results_queue.empty())
        self.assertEqual(result.drone, drone)
        self.assertEqual(result.results, self.mock_return['results'])
        self.god.check_playback()


    def test_wait_on_drones(self):
        """Test waiting on drone threads."""

        def waiting_func(queue):
            """Spin until the queue holds 2 items, then put 3 on it."""
            while len(queue.queue) < 2:
                continue
            logging.warning('Consuming thread finished.')
            queue.put(3)

        def exception_func(queue):
            """Spin until the queue is non-empty, put 2 on it, then raise."""
            while queue.empty():
                continue
            queue.put(2)
            logging.warning('Failing thread raising error.')
            raise ValueError('Value error')

        def quick_func():
            """Return immediately."""
            return

        # Create 2 threads, one of which raises an exception while the other
        # just exits normally. Insert both threads into the thread_queue
        # against mock drones and confirm that:
        # a. The task queue waits for both threads, though the first one
        #    fails.
        # b. The task queue records the right DroneTaskQueueException, which
        #    contains the original exception.
        # c. The failing thread records its own exception instead of raising
        #    it.
        task_queue = thread_lib.ThreadedTaskQueue()
        drone1 = self.create_remote_drone('fakehostname1')
        drone2 = self.create_remote_drone('fakehostname2')
        sync_queue = Queue.Queue()

        waiting_worker = thread_lib.ExceptionRememberingThread(
                target=waiting_func, args=(sync_queue,))
        failing_worker = thread_lib.ExceptionRememberingThread(
                target=exception_func, args=(sync_queue,))
        task_queue.drone_threads[drone1] = waiting_worker
        task_queue.drone_threads[drone2] = failing_worker
        master_thread = thread_lib.ExceptionRememberingThread(
                target=task_queue.wait_on_drones)

        thread_list = [failing_worker, waiting_worker, master_thread]
        for thread in thread_list:
            thread.setDaemon(True)
            thread.start()
        # The first item releases exception_func; the 2 it puts on the queue
        # in turn releases waiting_func.
        sync_queue.put(1)
        master_thread.join()

        self.assertTrue(isinstance(master_thread.err,
                                   drone_task_queue.DroneTaskQueueException))
        self.assertTrue(isinstance(failing_worker.err, ValueError))
        self.assertTrue(str(failing_worker.err) in str(master_thread.err))
        self.assertTrue(3 in list(sync_queue.queue))
        self.assertEqual(task_queue.drone_threads, {})

        # Call wait_on_drones after the child thread has exited. join()
        # blocks until the thread is done without spinning on isAlive().
        quick_worker = thread_lib.ExceptionRememberingThread(target=quick_func)
        task_queue.drone_threads[drone1] = quick_worker
        quick_worker.start()
        quick_worker.join()
        task_queue.wait_on_drones()
        self.assertEqual(task_queue.drone_threads, {})


    def test_get_results(self):
        """Test retrieving results from the results queue."""

        # Insert results for the same drone twice into the results queue
        # and confirm that an exception is raised.
        task_queue = thread_lib.ThreadedTaskQueue()
        drone1 = self.create_remote_drone('fakehostname1')
        drone2 = self.create_remote_drone('fakehostname2')
        task_queue.results_queue.put(
                thread_lib.ThreadedTaskQueue.result(drone1, self.mock_return))
        task_queue.results_queue.put(
                thread_lib.ThreadedTaskQueue.result(drone1, self.mock_return))
        self.god.stub_function(task_queue, 'wait_on_drones')
        task_queue.wait_on_drones.expect_call()
        self.assertRaises(drone_task_queue.DroneTaskQueueException,
                          task_queue.get_results)

        # Insert results for different drones and check that they're returned
        # in a drone results dict.
        self.assertTrue(task_queue.results_queue.empty())
        task_queue.results_queue.put(
                thread_lib.ThreadedTaskQueue.result(drone1, self.mock_return))
        task_queue.results_queue.put(
                thread_lib.ThreadedTaskQueue.result(drone2, self.mock_return))
        task_queue.wait_on_drones.expect_call()
        results = task_queue.get_results()
        self.assertEqual(results[drone1], self.mock_return)
        self.assertEqual(results[drone2], self.mock_return)
        self.god.check_playback()


    def test_execute(self):
        """Test task queue execute."""
        drone1 = self.create_remote_drone('fakehostname1')
        drone2 = self.create_remote_drone('fakehostname2')
        drone3 = self.create_remote_drone('fakehostname3')

        # Check task queue exception conditions: execute must raise while
        # the results queue is non-empty, and while drone_threads still has
        # an entry.
        task_queue = thread_lib.ThreadedTaskQueue()
        task_queue.results_queue.put(1)
        self.assertRaises(drone_task_queue.DroneTaskQueueException,
                          task_queue.execute, [])
        task_queue.results_queue.get()
        task_queue.drone_threads[drone1] = None
        self.assertRaises(drone_task_queue.DroneTaskQueueException,
                          task_queue.execute, [])
        task_queue.drone_threads = {}

        # Queue 2 calls against each drone, and confirm that the host's
        # run method is called 3 times. Then check the threads created,
        # and finally compare results returned by the task queue against
        # the mock results.
        # Named drone_list so the imported drones module is not shadowed.
        drone_list = [drone1, drone2, drone3]
        for drone in drone_list:
            drone.queue_call('foo')
            drone.queue_call('bar')
            self._expect_drone_utility_run(drone)
        task_queue.execute(drone_list, wait=False)
        self.assertEqual(set(task_queue.drone_threads.keys()),
                         set(drone_list))
        for drone, thread in task_queue.drone_threads.iteritems():
            self.assertTrue(drone.hostname in thread.getName())
            self.assertTrue(thread.isDaemon())
            # A second start() must fail, i.e. execute already started it.
            self.assertRaises(RuntimeError, thread.start)
        results = task_queue.get_results()
        for result in results.itervalues():
            self.assertEqual(result, self.mock_return['results'])

        # Test synchronous execute
        drone1.queue_call('foo')
        self._expect_drone_utility_run(drone1)
        self.assertEqual(task_queue.execute(drone_list, wait=True)[drone1],
                         self.mock_return['results'])
        self.god.check_playback()


if __name__ == '__main__':
    unittest.main()