Home | History | Annotate | Download | only in base
      1 # Copyright 2013 The Chromium Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """Dispatches tests, either sharding or replicating them.
      6 
      7 Performs the following steps:
      8 * Create a test collection factory, using the given tests
      9   - If sharding: test collection factory returns the same shared test collection
     10     to all test runners
     11   - If replicating: test collection factory returns a unique test collection to
     12     each test runner, with the same set of tests in each.
     13 * Create a test runner for each device.
     14 * Run each test runner in its own thread, grabbing tests from the test
     15   collection until there are no tests left.
     16 """
     17 
     18 import logging
     19 import threading
     20 
     21 from pylib import android_commands
     22 from pylib import constants
     23 from pylib.base import base_test_result
     24 from pylib.device import device_errors
     25 from pylib.utils import reraiser_thread
     26 from pylib.utils import watchdog_timer
     27 
     28 
# Default watchdog timeout, in seconds. RunTests() uses it as the default for
# both its test_timeout and its setup_timeout arguments.
DEFAULT_TIMEOUT = 7 * 60  # seven minutes
     30 
     31 
     32 class _ThreadSafeCounter(object):
     33   """A threadsafe counter."""
     34 
     35   def __init__(self):
     36     self._lock = threading.Lock()
     37     self._value = 0
     38 
     39   def GetAndIncrement(self):
     40     """Get the current value and increment it atomically.
     41 
     42     Returns:
     43       The value before incrementing.
     44     """
     45     with self._lock:
     46       pre_increment = self._value
     47       self._value += 1
     48       return pre_increment
     49 
     50 
     51 class _Test(object):
     52   """Holds a test with additional metadata."""
     53 
     54   def __init__(self, test, tries=0):
     55     """Initializes the _Test object.
     56 
     57     Args:
     58       test: The test.
     59       tries: Number of tries so far.
     60     """
     61     self.test = test
     62     self.tries = tries
     63 
     64 
     65 class _TestCollection(object):
     66   """A threadsafe collection of tests.
     67 
     68   Args:
     69     tests: List of tests to put in the collection.
     70   """
     71 
     72   def __init__(self, tests=None):
     73     if not tests:
     74       tests = []
     75     self._lock = threading.Lock()
     76     self._tests = []
     77     self._tests_in_progress = 0
     78     # Used to signal that an item is available or all items have been handled.
     79     self._item_available_or_all_done = threading.Event()
     80     for t in tests:
     81       self.add(t)
     82 
     83   def _pop(self):
     84     """Pop a test from the collection.
     85 
     86     Waits until a test is available or all tests have been handled.
     87 
     88     Returns:
     89       A test or None if all tests have been handled.
     90     """
     91     while True:
     92       # Wait for a test to be available or all tests to have been handled.
     93       self._item_available_or_all_done.wait()
     94       with self._lock:
     95         # Check which of the two conditions triggered the signal.
     96         if self._tests_in_progress == 0:
     97           return None
     98         try:
     99           return self._tests.pop(0)
    100         except IndexError:
    101           # Another thread beat us to the available test, wait again.
    102           self._item_available_or_all_done.clear()
    103 
    104   def add(self, test):
    105     """Add an test to the collection.
    106 
    107     Args:
    108       test: A test to add.
    109     """
    110     with self._lock:
    111       self._tests.append(test)
    112       self._item_available_or_all_done.set()
    113       self._tests_in_progress += 1
    114 
    115   def test_completed(self):
    116     """Indicate that a test has been fully handled."""
    117     with self._lock:
    118       self._tests_in_progress -= 1
    119       if self._tests_in_progress == 0:
    120         # All tests have been handled, signal all waiting threads.
    121         self._item_available_or_all_done.set()
    122 
    123   def __iter__(self):
    124     """Iterate through tests in the collection until all have been handled."""
    125     while True:
    126       r = self._pop()
    127       if r is None:
    128         break
    129       yield r
    130 
    131   def __len__(self):
    132     """Return the number of tests currently in the collection."""
    133     return len(self._tests)
    134 
    135   def test_names(self):
    136     """Return a list of the names of the tests currently in the collection."""
    137     with self._lock:
    138       return list(t.test for t in self._tests)
    139 
    140 
    141 def _RunTestsFromQueue(runner, test_collection, out_results, watcher,
    142                        num_retries, tag_results_with_device=False):
    143   """Runs tests from the test_collection until empty using the given runner.
    144 
    145   Adds TestRunResults objects to the out_results list and may add tests to the
    146   out_retry list.
    147 
    148   Args:
    149     runner: A TestRunner object used to run the tests.
    150     test_collection: A _TestCollection from which to get _Test objects to run.
    151     out_results: A list to add TestRunResults to.
    152     watcher: A watchdog_timer.WatchdogTimer object, used as a shared timeout.
    153     num_retries: Number of retries for a test.
    154     tag_results_with_device: If True, appends the name of the device on which
    155         the test was run to the test name. Used when replicating to identify
    156         which device ran each copy of the test, and to ensure each copy of the
    157         test is recorded separately.
    158   """
    159 
    160   def TagTestRunResults(test_run_results):
    161     """Tags all results with the last 4 digits of the device id.
    162 
    163     Used when replicating tests to distinguish the same tests run on different
    164     devices. We use a set to store test results, so the hash (generated from
    165     name and tag) must be unique to be considered different results.
    166     """
    167     new_test_run_results = base_test_result.TestRunResults()
    168     for test_result in test_run_results.GetAll():
    169       test_result.SetName('%s_%s' % (runner.device_serial[-4:],
    170                                      test_result.GetName()))
    171       new_test_run_results.AddResult(test_result)
    172     return new_test_run_results
    173 
    174   for test in test_collection:
    175     watcher.Reset()
    176     try:
    177       if runner.device_serial not in android_commands.GetAttachedDevices():
    178         # Device is unresponsive, stop handling tests on this device.
    179         msg = 'Device %s is unresponsive.' % runner.device_serial
    180         logging.warning(msg)
    181         raise device_errors.DeviceUnreachableError(msg)
    182       result, retry = runner.RunTest(test.test)
    183       if tag_results_with_device:
    184         result = TagTestRunResults(result)
    185       test.tries += 1
    186       if retry and test.tries <= num_retries:
    187         # Retry non-passing results, only record passing results.
    188         pass_results = base_test_result.TestRunResults()
    189         pass_results.AddResults(result.GetPass())
    190         out_results.append(pass_results)
    191         logging.warning('Will retry test, try #%s.' % test.tries)
    192         test_collection.add(_Test(test=retry, tries=test.tries))
    193       else:
    194         # All tests passed or retry limit reached. Either way, record results.
    195         out_results.append(result)
    196     except:
    197       # An unhandleable exception, ensure tests get run by another device and
    198       # reraise this exception on the main thread.
    199       test_collection.add(test)
    200       raise
    201     finally:
    202       # Retries count as separate tasks so always mark the popped test as done.
    203       test_collection.test_completed()
    204 
    205 
    206 def _SetUp(runner_factory, device, out_runners, threadsafe_counter):
    207   """Creates a test runner for each device and calls SetUp() in parallel.
    208 
    209   Note: if a device is unresponsive the corresponding TestRunner will not be
    210     added to out_runners.
    211 
    212   Args:
    213     runner_factory: Callable that takes a device and index and returns a
    214       TestRunner object.
    215     device: The device serial number to set up.
    216     out_runners: List to add the successfully set up TestRunner object.
    217     threadsafe_counter: A _ThreadSafeCounter object used to get shard indices.
    218   """
    219   try:
    220     index = threadsafe_counter.GetAndIncrement()
    221     logging.warning('Creating shard %s for device %s.', index, device)
    222     runner = runner_factory(device, index)
    223     runner.SetUp()
    224     out_runners.append(runner)
    225   except (device_errors.DeviceUnreachableError,
    226           # TODO(jbudorick) Remove this once the underlying implementations
    227           #                 for the above are switched or wrapped.
    228           android_commands.errors.DeviceUnresponsiveError) as e:
    229     logging.warning('Failed to create shard for %s: [%s]', device, e)
    230 
    231 
    232 def _RunAllTests(runners, test_collection_factory, num_retries, timeout=None,
    233                  tag_results_with_device=False):
    234   """Run all tests using the given TestRunners.
    235 
    236   Args:
    237     runners: A list of TestRunner objects.
    238     test_collection_factory: A callable to generate a _TestCollection object for
    239         each test runner.
    240     num_retries: Number of retries for a test.
    241     timeout: Watchdog timeout in seconds.
    242     tag_results_with_device: If True, appends the name of the device on which
    243         the test was run to the test name. Used when replicating to identify
    244         which device ran each copy of the test, and to ensure each copy of the
    245         test is recorded separately.
    246 
    247   Returns:
    248     A tuple of (TestRunResults object, exit code)
    249   """
    250   logging.warning('Running tests with %s test runners.' % (len(runners)))
    251   results = []
    252   exit_code = 0
    253   run_results = base_test_result.TestRunResults()
    254   watcher = watchdog_timer.WatchdogTimer(timeout)
    255   test_collections = [test_collection_factory() for _ in runners]
    256 
    257   threads = [
    258       reraiser_thread.ReraiserThread(
    259           _RunTestsFromQueue,
    260           [r, tc, results, watcher, num_retries, tag_results_with_device],
    261           name=r.device_serial[-4:])
    262       for r, tc in zip(runners, test_collections)]
    263 
    264   workers = reraiser_thread.ReraiserThreadGroup(threads)
    265   workers.StartAll()
    266 
    267   # Catch DeviceUnreachableErrors and set a warning exit code
    268   try:
    269     workers.JoinAll(watcher)
    270   except (device_errors.DeviceUnreachableError,
    271           # TODO(jbudorick) Remove this once the underlying implementations
    272           #                 for the above are switched or wrapped.
    273           android_commands.errors.DeviceUnresponsiveError) as e:
    274     logging.error(e)
    275     exit_code = constants.WARNING_EXIT_CODE
    276 
    277   if not all((len(tc) == 0 for tc in test_collections)):
    278     logging.error('Only ran %d tests (all devices are likely offline).' %
    279                   len(results))
    280     for tc in test_collections:
    281       run_results.AddResults(base_test_result.BaseTestResult(
    282           t, base_test_result.ResultType.UNKNOWN) for t in tc.test_names())
    283 
    284   for r in results:
    285     run_results.AddTestRunResults(r)
    286   if not run_results.DidRunPass():
    287     exit_code = constants.ERROR_EXIT_CODE
    288   return (run_results, exit_code)
    289 
    290 
    291 def _CreateRunners(runner_factory, devices, timeout=None):
    292   """Creates a test runner for each device and calls SetUp() in parallel.
    293 
    294   Note: if a device is unresponsive the corresponding TestRunner will not be
    295     included in the returned list.
    296 
    297   Args:
    298     runner_factory: Callable that takes a device and index and returns a
    299       TestRunner object.
    300     devices: List of device serial numbers as strings.
    301     timeout: Watchdog timeout in seconds, defaults to the default timeout.
    302 
    303   Returns:
    304     A list of TestRunner objects.
    305   """
    306   logging.warning('Creating %s test runners.' % len(devices))
    307   runners = []
    308   counter = _ThreadSafeCounter()
    309   threads = reraiser_thread.ReraiserThreadGroup(
    310       [reraiser_thread.ReraiserThread(_SetUp,
    311                                       [runner_factory, d, runners, counter],
    312                                       name=d[-4:])
    313        for d in devices])
    314   threads.StartAll()
    315   threads.JoinAll(watchdog_timer.WatchdogTimer(timeout))
    316   return runners
    317 
    318 
    319 def _TearDownRunners(runners, timeout=None):
    320   """Calls TearDown() for each test runner in parallel.
    321 
    322   Args:
    323     runners: A list of TestRunner objects.
    324     timeout: Watchdog timeout in seconds, defaults to the default timeout.
    325   """
    326   threads = reraiser_thread.ReraiserThreadGroup(
    327       [reraiser_thread.ReraiserThread(r.TearDown, name=r.device_serial[-4:])
    328        for r in runners])
    329   threads.StartAll()
    330   threads.JoinAll(watchdog_timer.WatchdogTimer(timeout))
    331 
    332 
    333 def ApplyMaxPerRun(tests, max_per_run):
    334   """Rearrange the tests so that no group contains more than max_per_run tests.
    335 
    336   Args:
    337     tests:
    338     max_per_run:
    339 
    340   Returns:
    341     A list of tests with no more than max_per_run per run.
    342   """
    343   tests_expanded = []
    344   for test_group in tests:
    345     if type(test_group) != str:
    346       # Do not split test objects which are not strings.
    347       tests_expanded.append(test_group)
    348     else:
    349       test_split = test_group.split(':')
    350       for i in range(0, len(test_split), max_per_run):
    351         tests_expanded.append(':'.join(test_split[i:i+max_per_run]))
    352   return tests_expanded
    353 
    354 
    355 def RunTests(tests, runner_factory, devices, shard=True,
    356              test_timeout=DEFAULT_TIMEOUT, setup_timeout=DEFAULT_TIMEOUT,
    357              num_retries=2, max_per_run=256):
    358   """Run all tests on attached devices, retrying tests that don't pass.
    359 
    360   Args:
    361     tests: List of tests to run.
    362     runner_factory: Callable that takes a device and index and returns a
    363         TestRunner object.
    364     devices: List of attached devices.
    365     shard: True if we should shard, False if we should replicate tests.
    366       - Sharding tests will distribute tests across all test runners through a
    367         shared test collection.
    368       - Replicating tests will copy all tests to each test runner through a
    369         unique test collection for each test runner.
    370     test_timeout: Watchdog timeout in seconds for running tests.
    371     setup_timeout: Watchdog timeout in seconds for creating and cleaning up
    372         test runners.
    373     num_retries: Number of retries for a test.
    374     max_per_run: Maximum number of tests to run in any group.
    375 
    376   Returns:
    377     A tuple of (base_test_result.TestRunResults object, exit code).
    378   """
    379   if not tests:
    380     logging.critical('No tests to run.')
    381     return (base_test_result.TestRunResults(), constants.ERROR_EXIT_CODE)
    382 
    383   tests_expanded = ApplyMaxPerRun(tests, max_per_run)
    384   if shard:
    385     # Generate a shared _TestCollection object for all test runners, so they
    386     # draw from a common pool of tests.
    387     shared_test_collection = _TestCollection([_Test(t) for t in tests_expanded])
    388     test_collection_factory = lambda: shared_test_collection
    389     tag_results_with_device = False
    390     log_string = 'sharded across devices'
    391   else:
    392     # Generate a unique _TestCollection object for each test runner, but use
    393     # the same set of tests.
    394     test_collection_factory = lambda: _TestCollection(
    395         [_Test(t) for t in tests_expanded])
    396     tag_results_with_device = True
    397     log_string = 'replicated on each device'
    398 
    399   logging.info('Will run %d tests (%s): %s',
    400                len(tests_expanded), log_string, str(tests_expanded))
    401   runners = _CreateRunners(runner_factory, devices, setup_timeout)
    402   try:
    403     return _RunAllTests(runners, test_collection_factory,
    404                         num_retries, test_timeout, tag_results_with_device)
    405   finally:
    406     try:
    407       _TearDownRunners(runners, setup_timeout)
    408     except (device_errors.DeviceUnreachableError,
    409             # TODO(jbudorick) Remove this once the underlying implementations
    410             #                 for the above are switched or wrapped.
    411             android_commands.errors.DeviceUnresponsiveError) as e:
    412       logging.warning('Device unresponsive during TearDown: [%s]', e)
    413     except Exception as e:
    414       logging.error('Unexpected exception caught during TearDown: %s' % str(e))
    415