# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Dispatches tests, either sharding or replicating them.

Performs the following steps:
* Create a test collection factory, using the given tests
  - If sharding: test collection factory returns the same shared test collection
    to all test runners
  - If replicating: test collection factory returns a unique test collection to
    each test runner, with the same set of tests in each.
* Create a test runner for each device.
* Run each test runner in its own thread, grabbing tests from the test
  collection until there are no tests left.
"""

import logging
import threading

from pylib import android_commands
from pylib import constants
from pylib.utils import reraiser_thread
from pylib.utils import watchdog_timer

import base_test_result


DEFAULT_TIMEOUT = 7 * 60  # seven minutes


class _ThreadSafeCounter(object):
  """A threadsafe counter."""

  def __init__(self):
    self._lock = threading.Lock()
    self._value = 0

  def GetAndIncrement(self):
    """Get the current value and increment it atomically.

    Returns:
      The value before incrementing.
    """
    with self._lock:
      pre_increment = self._value
      self._value += 1
      return pre_increment


class _Test(object):
  """Holds a test with additional metadata."""

  def __init__(self, test, tries=0):
    """Initializes the _Test object.

    Args:
      test: The test.
      tries: Number of tries so far.
    """
    self.test = test
    self.tries = tries


class _TestCollection(object):
  """A threadsafe collection of tests.

  Every test added increments an in-progress counter that is only decremented
  by test_completed(), so iteration ends once every added test (including
  re-added retries) has been fully handled, not merely popped.

  Args:
    tests: List of tests to put in the collection. Defaults to no tests.
  """

  def __init__(self, tests=None):
    self._lock = threading.Lock()
    self._tests = []
    self._tests_in_progress = 0
    # Used to signal that an item is available or all items have been handled.
    self._item_available_or_all_done = threading.Event()
    for t in tests or []:
      self.add(t)
    if self._tests_in_progress == 0:
      # Nothing was added, so "all items have been handled" already holds.
      # Without this, _pop() would block forever on an empty collection.
      self._item_available_or_all_done.set()

  def _pop(self):
    """Pop a test from the collection.

    Waits until a test is available or all tests have been handled.

    Returns:
      A test or None if all tests have been handled.
    """
    while True:
      # Wait for a test to be available or all tests to have been handled.
      self._item_available_or_all_done.wait()
      with self._lock:
        # Check which of the two conditions triggered the signal.
        if self._tests_in_progress == 0:
          return None
        try:
          return self._tests.pop(0)
        except IndexError:
          # Another thread beat us to the available test, wait again.
          self._item_available_or_all_done.clear()

  def add(self, test):
    """Add a test to the collection.

    Args:
      test: A test to add.
    """
    with self._lock:
      self._tests.append(test)
      self._item_available_or_all_done.set()
      self._tests_in_progress += 1

  def test_completed(self):
    """Indicate that a test has been fully handled."""
    with self._lock:
      self._tests_in_progress -= 1
      if self._tests_in_progress == 0:
        # All tests have been handled, signal all waiting threads.
        self._item_available_or_all_done.set()

  def __iter__(self):
    """Iterate through tests in the collection until all have been handled."""
    while True:
      r = self._pop()
      if r is None:
        break
      yield r

  def __len__(self):
    """Return the number of tests currently waiting in the collection."""
    with self._lock:
      return len(self._tests)


def _RunTestsFromQueue(runner, test_collection, out_results, watcher,
                       num_retries, tag_results_with_device=False):
  """Runs tests from the test_collection until empty using the given runner.

  Adds TestRunResults objects to the out_results list. Tests that need a retry
  are added back to test_collection as new _Test objects.

  Args:
    runner: A TestRunner object used to run the tests.
    test_collection: A _TestCollection from which to get _Test objects to run.
    out_results: A list to add TestRunResults to.
    watcher: A watchdog_timer.WatchdogTimer object, used as a shared timeout.
    num_retries: Number of retries for a test.
    tag_results_with_device: If True, appends the name of the device on which
        the test was run to the test name. Used when replicating to identify
        which device ran each copy of the test, and to ensure each copy of the
        test is recorded separately.
  """

  def TagTestRunResults(test_run_results):
    """Tags all results with the last 4 digits of the device id.

    Used when replicating tests to distinguish the same tests run on different
    devices. We use a set to store test results, so the hash (generated from
    name and tag) must be unique to be considered different results.
    """
    new_test_run_results = base_test_result.TestRunResults()
    for test_result in test_run_results.GetAll():
      test_result.SetName('%s_%s' % (runner.device[-4:], test_result.GetName()))
      new_test_run_results.AddResult(test_result)
    return new_test_run_results

  for test in test_collection:
    watcher.Reset()
    try:
      if not android_commands.IsDeviceAttached(runner.device):
        # Device is unresponsive, stop handling tests on this device.
        msg = 'Device %s is unresponsive.' % runner.device
        logging.warning(msg)
        raise android_commands.errors.DeviceUnresponsiveError(msg)
      result, retry = runner.RunTest(test.test)
      if tag_results_with_device:
        result = TagTestRunResults(result)
      test.tries += 1
      if retry and test.tries <= num_retries:
        # Retry non-passing results, only record passing results.
        pass_results = base_test_result.TestRunResults()
        pass_results.AddResults(result.GetPass())
        out_results.append(pass_results)
        logging.warning('Will retry test, try #%s.', test.tries)
        test_collection.add(_Test(test=retry, tries=test.tries))
      else:
        # All tests passed or retry limit reached. Either way, record results.
        out_results.append(result)
    except:  # Intentionally broad: the exception is always re-raised below.
      # An unhandleable exception, ensure tests get run by another device and
      # reraise this exception on the main thread.
      test_collection.add(test)
      raise
    finally:
      # Retries count as separate tasks so always mark the popped test as done.
      test_collection.test_completed()


def _SetUp(runner_factory, device, out_runners, threadsafe_counter):
  """Creates a test runner for a single device and calls SetUp() on it.

  Intended to be run in one thread per device (see _CreateRunners).

  Note: if the device is unresponsive the corresponding TestRunner will not be
  added to out_runners.

  Args:
    runner_factory: Callable that takes a device and index and returns a
        TestRunner object.
    device: The device serial number to set up.
    out_runners: List to add the successfully set up TestRunner object.
    threadsafe_counter: A _ThreadSafeCounter object used to get shard indices.
  """
  try:
    index = threadsafe_counter.GetAndIncrement()
    logging.warning('Creating shard %s for device %s.', index, device)
    runner = runner_factory(device, index)
    runner.SetUp()
    out_runners.append(runner)
  except android_commands.errors.DeviceUnresponsiveError as e:
    logging.warning('Failed to create shard for %s: [%s]', device, e)


def _RunAllTests(runners, test_collection_factory, num_retries, timeout=None,
                 tag_results_with_device=False):
  """Run all tests using the given TestRunners.

  Args:
    runners: A list of TestRunner objects.
    test_collection_factory: A callable to generate a _TestCollection object for
        each test runner.
    num_retries: Number of retries for a test.
    timeout: Watchdog timeout in seconds.
    tag_results_with_device: If True, appends the name of the device on which
        the test was run to the test name. Used when replicating to identify
        which device ran each copy of the test, and to ensure each copy of the
        test is recorded separately.

  Returns:
    A tuple of (TestRunResults object, exit code)

  Raises:
    AssertionError: if some test collection still holds unrun tests.
  """
  logging.warning('Running tests with %s test runners.', len(runners))
  results = []
  exit_code = 0
  run_results = base_test_result.TestRunResults()
  watcher = watchdog_timer.WatchdogTimer(timeout)
  test_collections = [test_collection_factory() for _ in runners]

  threads = [
      reraiser_thread.ReraiserThread(
          _RunTestsFromQueue,
          [r, tc, results, watcher, num_retries, tag_results_with_device],
          name=r.device[-4:])
      for r, tc in zip(runners, test_collections)]

  workers = reraiser_thread.ReraiserThreadGroup(threads)
  workers.StartAll()

  # Catch DeviceUnresponsiveErrors and set a warning exit code
  try:
    workers.JoinAll(watcher)
  except android_commands.errors.DeviceUnresponsiveError as e:
    logging.error(e)
    exit_code = constants.WARNING_EXIT_CODE

  # Aggregate results before the completeness check so that its message
  # reports how many tests actually ran (previously it always said 0).
  for r in results:
    run_results.AddTestRunResults(r)

  assert all(len(tc) == 0 for tc in test_collections), (
      'Some tests were not run, all devices are likely offline (ran %d tests)' %
      len(run_results.GetAll()))

  if not run_results.DidRunPass():
    exit_code = constants.ERROR_EXIT_CODE
  return (run_results, exit_code)


def _CreateRunners(runner_factory, devices, timeout=None):
  """Creates a test runner for each device and calls SetUp() in parallel.

  Note: if a device is unresponsive the corresponding TestRunner will not be
  included in the returned list.

  Args:
    runner_factory: Callable that takes a device and index and returns a
        TestRunner object.
    devices: List of device serial numbers as strings.
    timeout: Watchdog timeout in seconds, defaults to the default timeout.

  Returns:
    A list of TestRunner objects.
  """
  logging.warning('Creating %s test runners.', len(devices))
  runners = []
  counter = _ThreadSafeCounter()
  threads = reraiser_thread.ReraiserThreadGroup(
      [reraiser_thread.ReraiserThread(_SetUp,
                                      [runner_factory, d, runners, counter],
                                      name=d[-4:])
       for d in devices])
  threads.StartAll()
  threads.JoinAll(watchdog_timer.WatchdogTimer(timeout))
  return runners


def _TearDownRunners(runners, timeout=None):
  """Calls TearDown() for each test runner in parallel.

  Args:
    runners: A list of TestRunner objects.
    timeout: Watchdog timeout in seconds, defaults to the default timeout.
  """
  threads = reraiser_thread.ReraiserThreadGroup(
      [reraiser_thread.ReraiserThread(r.TearDown, name=r.device[-4:])
       for r in runners])
  threads.StartAll()
  threads.JoinAll(watchdog_timer.WatchdogTimer(timeout))


def RunTests(tests, runner_factory, devices, shard=True,
             test_timeout=DEFAULT_TIMEOUT, setup_timeout=DEFAULT_TIMEOUT,
             num_retries=2):
  """Run all tests on attached devices, retrying tests that don't pass.

  Args:
    tests: List of tests to run.
    runner_factory: Callable that takes a device and index and returns a
        TestRunner object.
    devices: List of attached devices.
    shard: True if we should shard, False if we should replicate tests.
      - Sharding tests will distribute tests across all test runners through a
        shared test collection.
      - Replicating tests will copy all tests to each test runner through a
        unique test collection for each test runner.
    test_timeout: Watchdog timeout in seconds for running tests.
    setup_timeout: Watchdog timeout in seconds for creating and cleaning up
        test runners.
    num_retries: Number of retries for a test.

  Returns:
    A tuple of (base_test_result.TestRunResults object, exit code).
    The exit code is constants.ERROR_EXIT_CODE when there are no tests or no
    test runner could be set up.
  """
  if not tests:
    logging.critical('No tests to run.')
    return (base_test_result.TestRunResults(), constants.ERROR_EXIT_CODE)

  if shard:
    # Generate a shared _TestCollection object for all test runners, so they
    # draw from a common pool of tests.
    shared_test_collection = _TestCollection([_Test(t) for t in tests])
    test_collection_factory = lambda: shared_test_collection
    tag_results_with_device = False
    log_string = 'sharded across devices'
  else:
    # Generate a unique _TestCollection object for each test runner, but use
    # the same set of tests.
    test_collection_factory = lambda: _TestCollection([_Test(t) for t in tests])
    tag_results_with_device = True
    log_string = 'replicated on each device'

  logging.info('Will run %d tests (%s): %s', len(tests), log_string, str(tests))
  runners = _CreateRunners(runner_factory, devices, setup_timeout)
  try:
    if not runners:
      # Every device failed SetUp(). With no runners, _RunAllTests has no test
      # collections to check, so no test would run and the empty result could
      # be reported as a success.
      logging.critical('No test runners could be created; all devices are '
                       'likely offline.')
      return (base_test_result.TestRunResults(), constants.ERROR_EXIT_CODE)
    return _RunAllTests(runners, test_collection_factory,
                        num_retries, test_timeout, tag_results_with_device)
  finally:
    try:
      _TearDownRunners(runners, setup_timeout)
    except android_commands.errors.DeviceUnresponsiveError as e:
      logging.warning('Device unresponsive during TearDown: [%s]', e)