# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Dispatches tests, either sharding or replicating them.

To dispatch, performs the following steps:
* Create a test collection factory, using the given tests
  - If sharding: test collection factory returns the same shared test collection
    to all test runners
  - If replicating: test collection factory returns a unique test collection to
    each test runner, with the same set of tests in each.
* Get the list of devices to run on
* Create test runners
* Run each test runner in its own thread, pulling tests from the test collection
  generated from the test collection factory until there are no tests left.
"""

import collections
import logging
import threading

from pylib import android_commands
from pylib import constants
from pylib.utils import reraiser_thread
from pylib.utils import watchdog_timer

import base_test_result


DEFAULT_TIMEOUT = 7 * 60  # seven minutes


class _ThreadSafeCounter(object):
  """A threadsafe counter."""

  def __init__(self):
    self._lock = threading.Lock()
    self._value = 0

  def GetAndIncrement(self):
    """Get the current value and increment it atomically.

    Returns:
      The value before incrementing.
    """
    with self._lock:
      pre_increment = self._value
      self._value += 1
      return pre_increment


class _Test(object):
  """Holds a test with additional metadata."""

  def __init__(self, test, tries=0):
    """Initializes the _Test object.

    Args:
      test: The test.
      tries: Number of tries so far.
    """
    self.test = test
    self.tries = tries


class _TestCollection(object):
  """A threadsafe collection of tests.

  Every test that is added counts as "in progress" until test_completed() is
  called for it. Iterating the collection yields tests until that count drops
  back to zero.

  Note: the internal event is only set by add() and test_completed(), so
  iterating a collection that never received a test blocks until one is added.

  Args:
    tests: Iterable of tests to put in the collection, or None for none.
  """

  def __init__(self, tests=None):
    self._lock = threading.Lock()
    # A deque gives O(1) removal from the front of the queue.
    self._tests = collections.deque()
    self._tests_in_progress = 0
    # Used to signal that an item is available or all items have been handled.
    self._item_available_or_all_done = threading.Event()
    if tests is not None:
      for t in tests:
        self.add(t)

  def _pop(self):
    """Pop a test from the collection.

    Waits until a test is available or all tests have been handled.

    Returns:
      A test or None if all tests have been handled.
    """
    while True:
      # Wait for a test to be available or all tests to have been handled.
      self._item_available_or_all_done.wait()
      with self._lock:
        # Check which of the two conditions triggered the signal.
        if self._tests_in_progress == 0:
          return None
        try:
          return self._tests.popleft()
        except IndexError:
          # Another thread beat us to the available test, wait again.
          self._item_available_or_all_done.clear()

  def add(self, test):
    """Add a test to the collection.

    Args:
      test: A test to add.
    """
    with self._lock:
      self._tests.append(test)
      self._item_available_or_all_done.set()
      self._tests_in_progress += 1

  def test_completed(self):
    """Indicate that a test has been fully handled."""
    with self._lock:
      self._tests_in_progress -= 1
      if self._tests_in_progress == 0:
        # All tests have been handled, signal all waiting threads.
        self._item_available_or_all_done.set()

  def __iter__(self):
    """Iterate through tests in the collection until all have been handled."""
    while True:
      r = self._pop()
      if r is None:
        break
      yield r


def _RunTestsFromQueue(runner, test_collection, out_results, watcher,
                       num_retries, tag_results_with_device=False):
  """Runs tests from the test_collection until empty using the given runner.

  Adds TestRunResults objects to the out_results list and may add tests that
  need to be retried back to test_collection.

  Args:
    runner: A TestRunner object used to run the tests.
    test_collection: A _TestCollection from which to get _Test objects to run.
    out_results: A list to add TestRunResults to.
    watcher: A watchdog_timer.WatchdogTimer object, used as a shared timeout.
    num_retries: Number of retries for a test.
    tag_results_with_device: If True, prefixes each test name with the last
        four characters of the serial of the device on which the test was run.
        Used when replicating to identify which device ran each copy of the
        test, and to ensure each copy of the test is recorded separately.

  Raises:
    android_commands.errors.DeviceUnresponsiveError: If the runner's device is
        no longer attached. Any exception raised while running a test is
        re-raised after the test has been put back into the collection.
  """

  def TagTestRunResults(test_run_results):
    """Tags all results with the last 4 characters of the device id.

    Used when replicating tests to distinguish the same tests run on different
    devices. We use a set to store test results, so the hash (generated from
    name and tag) must be unique to be considered different results.
    """
    new_test_run_results = base_test_result.TestRunResults()
    for test_result in test_run_results.GetAll():
      test_result.SetName('%s_%s' % (runner.device[-4:], test_result.GetName()))
      new_test_run_results.AddResult(test_result)
    return new_test_run_results

  for test in test_collection:
    watcher.Reset()
    try:
      if not android_commands.IsDeviceAttached(runner.device):
        # Device is unresponsive, stop handling tests on this device.
        msg = 'Device %s is unresponsive.' % runner.device
        logging.warning(msg)
        raise android_commands.errors.DeviceUnresponsiveError(msg)
      result, retry = runner.RunTest(test.test)
      if tag_results_with_device:
        result = TagTestRunResults(result)
      test.tries += 1
      if retry and test.tries <= num_retries:
        # Retry non-passing results, only record passing results.
        pass_results = base_test_result.TestRunResults()
        pass_results.AddResults(result.GetPass())
        out_results.append(pass_results)
        logging.warning('Will retry test, try #%s.', test.tries)
        test_collection.add(_Test(test=retry, tries=test.tries))
      else:
        # All tests passed or retry limit reached. Either way, record results.
        out_results.append(result)
    except:
      # Deliberately bare: whatever went wrong is always re-raised below.
      # An unhandleable exception, ensure tests get run by another device and
      # reraise this exception on the main thread.
      test_collection.add(test)
      raise
    finally:
      # Retries count as separate tasks so always mark the popped test as done.
      test_collection.test_completed()


def _SetUp(runner_factory, device, out_runners, threadsafe_counter):
  """Creates a test runner for the given device and calls SetUp() on it.

  Note: if the device is unresponsive the corresponding TestRunner will not be
  added to out_runners.

  Args:
    runner_factory: Callable that takes a device and index and returns a
        TestRunner object.
    device: The device serial number to set up.
    out_runners: List to add the successfully set up TestRunner object.
    threadsafe_counter: A _ThreadSafeCounter object used to get shard indices.
  """
  try:
    index = threadsafe_counter.GetAndIncrement()
    logging.warning('Creating shard %s for device %s.', index, device)
    runner = runner_factory(device, index)
    runner.SetUp()
    out_runners.append(runner)
  except android_commands.errors.DeviceUnresponsiveError as e:
    logging.warning('Failed to create shard for %s: [%s]', device, e)


def _RunAllTests(runners, test_collection_factory, num_retries, timeout=None,
                 tag_results_with_device=False):
  """Run all tests using the given TestRunners.

  Args:
    runners: A list of TestRunner objects.
    test_collection_factory: A callable to generate a _TestCollection object for
        each test runner.
    num_retries: Number of retries for a test.
    timeout: Watchdog timeout in seconds, handed to
        watchdog_timer.WatchdogTimer unchanged.
    tag_results_with_device: If True, prefixes each test name with the last
        four characters of the serial of the device on which the test was run.
        Used when replicating to identify which device ran each copy of the
        test, and to ensure each copy of the test is recorded separately.

  Returns:
    A tuple of (TestRunResults object, exit code). The exit code is
    constants.ERROR_EXIT_CODE if the combined results did not pass,
    constants.WARNING_EXIT_CODE if they passed but a device became
    unresponsive, and 0 otherwise.
  """
  logging.warning('Running tests with %s test runners.', len(runners))
  results = []
  exit_code = 0
  watcher = watchdog_timer.WatchdogTimer(timeout)

  workers = reraiser_thread.ReraiserThreadGroup(
      [reraiser_thread.ReraiserThread(
          _RunTestsFromQueue,
          [r, test_collection_factory(), results, watcher, num_retries,
           tag_results_with_device],
          name=r.device[-4:])
       for r in runners])
  run_results = base_test_result.TestRunResults()
  workers.StartAll()

  # Catch DeviceUnresponsiveErrors and set a warning exit code
  try:
    workers.JoinAll(watcher)
  except android_commands.errors.DeviceUnresponsiveError as e:
    logging.error(e)
    exit_code = constants.WARNING_EXIT_CODE

  for r in results:
    run_results.AddTestRunResults(r)
  if not run_results.DidRunPass():
    exit_code = constants.ERROR_EXIT_CODE
  return (run_results, exit_code)


def _CreateRunners(runner_factory, devices, timeout=None):
  """Creates a test runner for each device and calls SetUp() in parallel.

  Note: if a device is unresponsive the corresponding TestRunner will not be
  included in the returned list.

  Args:
    runner_factory: Callable that takes a device and index and returns a
        TestRunner object.
    devices: List of device serial numbers as strings.
    timeout: Watchdog timeout in seconds, handed to
        watchdog_timer.WatchdogTimer unchanged.

  Returns:
    A list of TestRunner objects.
  """
  logging.warning('Creating %s test runners.', len(devices))
  runners = []
  counter = _ThreadSafeCounter()
  threads = reraiser_thread.ReraiserThreadGroup(
      [reraiser_thread.ReraiserThread(_SetUp,
                                      [runner_factory, d, runners, counter],
                                      name=d[-4:])
       for d in devices])
  threads.StartAll()
  threads.JoinAll(watchdog_timer.WatchdogTimer(timeout))
  return runners


def _TearDownRunners(runners, timeout=None):
  """Calls TearDown() for each test runner in parallel.

  Args:
    runners: A list of TestRunner objects.
    timeout: Watchdog timeout in seconds, handed to
        watchdog_timer.WatchdogTimer unchanged.
  """
  threads = reraiser_thread.ReraiserThreadGroup(
      [reraiser_thread.ReraiserThread(r.TearDown, name=r.device[-4:])
       for r in runners])
  threads.StartAll()
  threads.JoinAll(watchdog_timer.WatchdogTimer(timeout))


def _GetAttachedDevices(wait_for_debugger=False, test_device=None):
  """Get all attached devices.

  If we are using a debugger, limit to only one device.

  Args:
    wait_for_debugger: True if this run will use a debugger.
    test_device: Name of a specific device to use.

  Returns:
    A list of attached devices. When test_device is given the list contains
    only that device.
  """
  attached_devices = android_commands.GetAttachedDevices()
  if test_device:
    assert test_device in attached_devices, (
        'Did not find device %s among attached device. Attached devices: %s'
        % (test_device, ', '.join(attached_devices)))
    attached_devices = [test_device]

  if len(attached_devices) > 1 and wait_for_debugger:
    logging.warning('Debugger can not be sharded, using first available device')
    attached_devices = attached_devices[:1]

  return attached_devices


def RunTests(tests, runner_factory, wait_for_debugger, test_device,
             shard=True,
             build_type='Debug',
             test_timeout=DEFAULT_TIMEOUT,
             setup_timeout=DEFAULT_TIMEOUT,
             num_retries=2):
  """Run all tests on attached devices, retrying tests that don't pass.

  Args:
    tests: List of tests to run.
    runner_factory: Callable that takes a device and index and returns a
        TestRunner object.
    wait_for_debugger: True if this test is using a debugger.
    test_device: A specific device to run tests on, or None.
    shard: True if we should shard, False if we should replicate tests.
      - Sharding tests will distribute tests across all test runners through a
        shared test collection.
      - Replicating tests will copy all tests to each test runner through a
        unique test collection for each test runner.
    build_type: Either 'Debug' or 'Release'. Not used by this function; kept
        for interface compatibility.
    test_timeout: Watchdog timeout in seconds for running tests.
    setup_timeout: Watchdog timeout in seconds for creating and cleaning up
        test runners.
    num_retries: Number of retries for a test.

  Returns:
    A tuple of (base_test_result.TestRunResults object, exit code). When there
    are no tests, or no test runner could be created, the results are empty
    and the exit code is constants.ERROR_EXIT_CODE.
  """
  if not tests:
    logging.error('No tests to run.')
    return (base_test_result.TestRunResults(), constants.ERROR_EXIT_CODE)

  if shard:
    # Generate a shared _TestCollection object for all test runners, so they
    # draw from a common pool of tests.
    shared_test_collection = _TestCollection([_Test(t) for t in tests])
    test_collection_factory = lambda: shared_test_collection
    tag_results_with_device = False
    log_string = 'sharded across devices'
  else:
    # Generate a unique _TestCollection object for each test runner, but use
    # the same set of tests.
    test_collection_factory = lambda: _TestCollection([_Test(t) for t in tests])
    tag_results_with_device = True
    log_string = 'replicated on each device'

  devices = _GetAttachedDevices(wait_for_debugger, test_device)

  logging.info('Will run %d tests (%s): %s', len(tests), log_string, str(tests))
  runners = _CreateRunners(runner_factory, devices, setup_timeout)
  if not runners:
    # Without a single runner no test can be executed; report that as an error
    # instead of handing back an empty result set as if the run had happened.
    logging.error('No test runners were created.')
    return (base_test_result.TestRunResults(), constants.ERROR_EXIT_CODE)
  try:
    return _RunAllTests(runners, test_collection_factory,
                        num_retries, test_timeout, tag_results_with_device)
  finally:
    try:
      _TearDownRunners(runners, setup_timeout)
    except android_commands.errors.DeviceUnresponsiveError as e:
      logging.warning('Device unresponsive during TearDown: [%s]', e)