Home | History | Annotate | Download | only in base
      1 # Copyright 2013 The Chromium Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """Dispatches tests, either sharding or replicating them.
      6 
      7 To dispatch, performs the following steps:
      8 * Create a test collection factory, using the given tests
      9   - If sharding: test collection factory returns the same shared test collection
     10     to all test runners
     11   - If replicating: test collection factory returns a unique test collection to
     12     each test runner, with the same set of tests in each.
     13 * Get the list of devices to run on
     14 * Create test runners
     15 * Run each test runner in its own thread, pulling tests from the test collection
     16   generated from the test collection factory until there are no tests left.
     17 """
     18 
     19 import logging
     20 import threading
     21 
     22 from pylib import android_commands
     23 from pylib import constants
     24 from pylib.utils import reraiser_thread
     25 from pylib.utils import watchdog_timer
     26 
     27 import base_test_result
     28 
     29 
     30 DEFAULT_TIMEOUT = 7 * 60  # seven minutes
     31 
     32 
     33 class _ThreadSafeCounter(object):
     34   """A threadsafe counter."""
     35 
     36   def __init__(self):
     37     self._lock = threading.Lock()
     38     self._value = 0
     39 
     40   def GetAndIncrement(self):
     41     """Get the current value and increment it atomically.
     42 
     43     Returns:
     44       The value before incrementing.
     45     """
     46     with self._lock:
     47       pre_increment = self._value
     48       self._value += 1
     49       return pre_increment
     50 
     51 
     52 class _Test(object):
     53   """Holds a test with additional metadata."""
     54 
     55   def __init__(self, test, tries=0):
     56     """Initializes the _Test object.
     57 
     58     Args:
     59       test: The test.
     60       tries: Number of tries so far.
     61     """
     62     self.test = test
     63     self.tries = tries
     64 
     65 
     66 class _TestCollection(object):
     67   """A threadsafe collection of tests.
     68 
     69   Args:
     70     tests: List of tests to put in the collection.
     71   """
     72 
     73   def __init__(self, tests=[]):
     74     self._lock = threading.Lock()
     75     self._tests = []
     76     self._tests_in_progress = 0
     77     # Used to signal that an item is avaliable or all items have been handled.
     78     self._item_avaliable_or_all_done = threading.Event()
     79     for t in tests:
     80       self.add(t)
     81 
     82   def _pop(self):
     83     """Pop a test from the collection.
     84 
     85     Waits until a test is avaliable or all tests have been handled.
     86 
     87     Returns:
     88       A test or None if all tests have been handled.
     89     """
     90     while True:
     91       # Wait for a test to be avaliable or all tests to have been handled.
     92       self._item_avaliable_or_all_done.wait()
     93       with self._lock:
     94         # Check which of the two conditions triggered the signal.
     95         if self._tests_in_progress == 0:
     96           return None
     97         try:
     98           return self._tests.pop(0)
     99         except IndexError:
    100           # Another thread beat us to the avaliable test, wait again.
    101           self._item_avaliable_or_all_done.clear()
    102 
    103   def add(self, test):
    104     """Add an test to the collection.
    105 
    106     Args:
    107       test: A test to add.
    108     """
    109     with self._lock:
    110       self._tests.append(test)
    111       self._item_avaliable_or_all_done.set()
    112       self._tests_in_progress += 1
    113 
    114   def test_completed(self):
    115     """Indicate that a test has been fully handled."""
    116     with self._lock:
    117       self._tests_in_progress -= 1
    118       if self._tests_in_progress == 0:
    119         # All tests have been handled, signal all waiting threads.
    120         self._item_avaliable_or_all_done.set()
    121 
    122   def __iter__(self):
    123     """Iterate through tests in the collection until all have been handled."""
    124     while True:
    125       r = self._pop()
    126       if r is None:
    127         break
    128       yield r
    129 
    130 
    131 def _RunTestsFromQueue(runner, test_collection, out_results, watcher,
    132                        num_retries, tag_results_with_device=False):
    133   """Runs tests from the test_collection until empty using the given runner.
    134 
    135   Adds TestRunResults objects to the out_results list and may add tests to the
    136   out_retry list.
    137 
    138   Args:
    139     runner: A TestRunner object used to run the tests.
    140     test_collection: A _TestCollection from which to get _Test objects to run.
    141     out_results: A list to add TestRunResults to.
    142     watcher: A watchdog_timer.WatchdogTimer object, used as a shared timeout.
    143     num_retries: Number of retries for a test.
    144     tag_results_with_device: If True, appends the name of the device on which
    145         the test was run to the test name. Used when replicating to identify
    146         which device ran each copy of the test, and to ensure each copy of the
    147         test is recorded separately.
    148   """
    149 
    150   def TagTestRunResults(test_run_results):
    151     """Tags all results with the last 4 digits of the device id.
    152 
    153     Used when replicating tests to distinguish the same tests run on different
    154     devices. We use a set to store test results, so the hash (generated from
    155     name and tag) must be unique to be considered different results.
    156     """
    157     new_test_run_results = base_test_result.TestRunResults()
    158     for test_result in test_run_results.GetAll():
    159       test_result.SetName('%s_%s' % (runner.device[-4:], test_result.GetName()))
    160       new_test_run_results.AddResult(test_result)
    161     return new_test_run_results
    162 
    163   for test in test_collection:
    164     watcher.Reset()
    165     try:
    166       if not android_commands.IsDeviceAttached(runner.device):
    167         # Device is unresponsive, stop handling tests on this device.
    168         msg = 'Device %s is unresponsive.' % runner.device
    169         logging.warning(msg)
    170         raise android_commands.errors.DeviceUnresponsiveError(msg)
    171       result, retry = runner.RunTest(test.test)
    172       if tag_results_with_device:
    173         result = TagTestRunResults(result)
    174       test.tries += 1
    175       if retry and test.tries <= num_retries:
    176         # Retry non-passing results, only record passing results.
    177         pass_results = base_test_result.TestRunResults()
    178         pass_results.AddResults(result.GetPass())
    179         out_results.append(pass_results)
    180         logging.warning('Will retry test, try #%s.' % test.tries)
    181         test_collection.add(_Test(test=retry, tries=test.tries))
    182       else:
    183         # All tests passed or retry limit reached. Either way, record results.
    184         out_results.append(result)
    185     except:
    186       # An unhandleable exception, ensure tests get run by another device and
    187       # reraise this exception on the main thread.
    188       test_collection.add(test)
    189       raise
    190     finally:
    191       # Retries count as separate tasks so always mark the popped test as done.
    192       test_collection.test_completed()
    193 
    194 
    195 def _SetUp(runner_factory, device, out_runners, threadsafe_counter):
    196   """Creates a test runner for each device and calls SetUp() in parallel.
    197 
    198   Note: if a device is unresponsive the corresponding TestRunner will not be
    199     added to out_runners.
    200 
    201   Args:
    202     runner_factory: Callable that takes a device and index and returns a
    203       TestRunner object.
    204     device: The device serial number to set up.
    205     out_runners: List to add the successfully set up TestRunner object.
    206     threadsafe_counter: A _ThreadSafeCounter object used to get shard indices.
    207   """
    208   try:
    209     index = threadsafe_counter.GetAndIncrement()
    210     logging.warning('Creating shard %s for device %s.', index, device)
    211     runner = runner_factory(device, index)
    212     runner.SetUp()
    213     out_runners.append(runner)
    214   except android_commands.errors.DeviceUnresponsiveError as e:
    215     logging.warning('Failed to create shard for %s: [%s]', device, e)
    216 
    217 
    218 def _RunAllTests(runners, test_collection_factory, num_retries, timeout=None,
    219                  tag_results_with_device=False):
    220   """Run all tests using the given TestRunners.
    221 
    222   Args:
    223     runners: A list of TestRunner objects.
    224     test_collection_factory: A callable to generate a _TestCollection object for
    225         each test runner.
    226     num_retries: Number of retries for a test.
    227     timeout: Watchdog timeout in seconds.
    228     tag_results_with_device: If True, appends the name of the device on which
    229         the test was run to the test name. Used when replicating to identify
    230         which device ran each copy of the test, and to ensure each copy of the
    231         test is recorded separately.
    232 
    233   Returns:
    234     A tuple of (TestRunResults object, exit code)
    235   """
    236   logging.warning('Running tests with %s test runners.' % (len(runners)))
    237   results = []
    238   exit_code = 0
    239   watcher = watchdog_timer.WatchdogTimer(timeout)
    240 
    241   workers = reraiser_thread.ReraiserThreadGroup(
    242       [reraiser_thread.ReraiserThread(
    243           _RunTestsFromQueue,
    244           [r, test_collection_factory(), results, watcher, num_retries,
    245            tag_results_with_device],
    246           name=r.device[-4:])
    247        for r in runners])
    248   run_results = base_test_result.TestRunResults()
    249   workers.StartAll()
    250 
    251   # Catch DeviceUnresponsiveErrors and set a warning exit code
    252   try:
    253     workers.JoinAll(watcher)
    254   except android_commands.errors.DeviceUnresponsiveError as e:
    255     logging.error(e)
    256     exit_code = constants.WARNING_EXIT_CODE
    257 
    258   for r in results:
    259     run_results.AddTestRunResults(r)
    260   if not run_results.DidRunPass():
    261     exit_code = constants.ERROR_EXIT_CODE
    262   return (run_results, exit_code)
    263 
    264 
    265 def _CreateRunners(runner_factory, devices, timeout=None):
    266   """Creates a test runner for each device and calls SetUp() in parallel.
    267 
    268   Note: if a device is unresponsive the corresponding TestRunner will not be
    269     included in the returned list.
    270 
    271   Args:
    272     runner_factory: Callable that takes a device and index and returns a
    273       TestRunner object.
    274     devices: List of device serial numbers as strings.
    275     timeout: Watchdog timeout in seconds, defaults to the default timeout.
    276 
    277   Returns:
    278     A list of TestRunner objects.
    279   """
    280   logging.warning('Creating %s test runners.' % len(devices))
    281   runners = []
    282   counter = _ThreadSafeCounter()
    283   threads = reraiser_thread.ReraiserThreadGroup(
    284       [reraiser_thread.ReraiserThread(_SetUp,
    285                                       [runner_factory, d, runners, counter],
    286                                       name=d[-4:])
    287        for d in devices])
    288   threads.StartAll()
    289   threads.JoinAll(watchdog_timer.WatchdogTimer(timeout))
    290   return runners
    291 
    292 
    293 def _TearDownRunners(runners, timeout=None):
    294   """Calls TearDown() for each test runner in parallel.
    295 
    296   Args:
    297     runners: A list of TestRunner objects.
    298     timeout: Watchdog timeout in seconds, defaults to the default timeout.
    299   """
    300   threads = reraiser_thread.ReraiserThreadGroup(
    301       [reraiser_thread.ReraiserThread(r.TearDown, name=r.device[-4:])
    302        for r in runners])
    303   threads.StartAll()
    304   threads.JoinAll(watchdog_timer.WatchdogTimer(timeout))
    305 
    306 
    307 
    308 def _GetAttachedDevices(wait_for_debugger=False, test_device=None):
    309   """Get all attached devices.
    310 
    311   If we are using a debugger, limit to only one device.
    312 
    313   Args:
    314     wait_for_debugger: True if this run will use a debugger.
    315     test_device: Name of a specific device to use.
    316 
    317   Returns:
    318     A list of attached devices.
    319   """
    320   attached_devices = []
    321 
    322   attached_devices = android_commands.GetAttachedDevices()
    323   if test_device:
    324     assert test_device in attached_devices, (
    325         'Did not find device %s among attached device. Attached devices: %s'
    326         % (test_device, ', '.join(attached_devices)))
    327     attached_devices = [test_device]
    328 
    329   if len(attached_devices) > 1 and wait_for_debugger:
    330     logging.warning('Debugger can not be sharded, using first available device')
    331     attached_devices = attached_devices[:1]
    332 
    333   return attached_devices
    334 
    335 
    336 def RunTests(tests, runner_factory, wait_for_debugger, test_device,
    337              shard=True,
    338              build_type='Debug',
    339              test_timeout=DEFAULT_TIMEOUT,
    340              setup_timeout=DEFAULT_TIMEOUT,
    341              num_retries=2):
    342   """Run all tests on attached devices, retrying tests that don't pass.
    343 
    344   Args:
    345     tests: List of tests to run.
    346     runner_factory: Callable that takes a device and index and returns a
    347         TestRunner object.
    348     wait_for_debugger: True if this test is using a debugger.
    349     test_device: A specific device to run tests on, or None.
    350     shard: True if we should shard, False if we should replicate tests.
    351       - Sharding tests will distribute tests across all test runners through a
    352         shared test collection.
    353       - Replicating tests will copy all tests to each test runner through a
    354         unique test collection for each test runner.
    355     build_type: Either 'Debug' or 'Release'.
    356     test_timeout: Watchdog timeout in seconds for running tests.
    357     setup_timeout: Watchdog timeout in seconds for creating and cleaning up
    358         test runners.
    359     num_retries: Number of retries for a test.
    360 
    361   Returns:
    362     A tuple of (base_test_result.TestRunResults object, exit code).
    363   """
    364   if not tests:
    365     logging.error('No tests to run.')
    366     return (base_test_result.TestRunResults(), constants.ERROR_EXIT_CODE)
    367 
    368   if shard:
    369     # Generate a shared _TestCollection object for all test runners, so they
    370     # draw from a common pool of tests.
    371     shared_test_collection = _TestCollection([_Test(t) for t in tests])
    372     test_collection_factory = lambda: shared_test_collection
    373     tag_results_with_device = False
    374     log_string = 'sharded across devices'
    375   else:
    376     # Generate a unique _TestCollection object for each test runner, but use
    377     # the same set of tests.
    378     test_collection_factory = lambda: _TestCollection([_Test(t) for t in tests])
    379     tag_results_with_device = True
    380     log_string = 'replicated on each device'
    381 
    382   devices = _GetAttachedDevices(wait_for_debugger, test_device)
    383 
    384   logging.info('Will run %d tests (%s): %s', len(tests), log_string, str(tests))
    385   runners = _CreateRunners(runner_factory, devices, setup_timeout)
    386   try:
    387     return _RunAllTests(runners, test_collection_factory,
    388                         num_retries, test_timeout, tag_results_with_device)
    389   finally:
    390     try:
    391       _TearDownRunners(runners, setup_timeout)
    392     except android_commands.errors.DeviceUnresponsiveError as e:
    393       logging.warning('Device unresponsive during TearDown: [%s]', e)
    394