Home | History | Annotate | Download | only in base
      1 # Copyright 2013 The Chromium Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """Dispatches tests, either sharding or replicating them.
      6 
      7 Performs the following steps:
      8 * Create a test collection factory, using the given tests
      9   - If sharding: test collection factory returns the same shared test collection
     10     to all test runners
     11   - If replicating: test collection factory returns a unique test collection to
     12     each test runner, with the same set of tests in each.
     13 * Create a test runner for each device.
     14 * Run each test runner in its own thread, grabbing tests from the test
     15   collection until there are no tests left.
     16 """
     17 
     18 import logging
     19 import threading
     20 
     21 from pylib import android_commands
     22 from pylib import constants
     23 from pylib.utils import reraiser_thread
     24 from pylib.utils import watchdog_timer
     25 
     26 import base_test_result
     27 
     28 
     29 DEFAULT_TIMEOUT = 7 * 60  # seven minutes
     30 
     31 
     32 class _ThreadSafeCounter(object):
     33   """A threadsafe counter."""
     34 
     35   def __init__(self):
     36     self._lock = threading.Lock()
     37     self._value = 0
     38 
     39   def GetAndIncrement(self):
     40     """Get the current value and increment it atomically.
     41 
     42     Returns:
     43       The value before incrementing.
     44     """
     45     with self._lock:
     46       pre_increment = self._value
     47       self._value += 1
     48       return pre_increment
     49 
     50 
     51 class _Test(object):
     52   """Holds a test with additional metadata."""
     53 
     54   def __init__(self, test, tries=0):
     55     """Initializes the _Test object.
     56 
     57     Args:
     58       test: The test.
     59       tries: Number of tries so far.
     60     """
     61     self.test = test
     62     self.tries = tries
     63 
     64 
     65 class _TestCollection(object):
     66   """A threadsafe collection of tests.
     67 
     68   Args:
     69     tests: List of tests to put in the collection.
     70   """
     71 
     72   def __init__(self, tests=[]):
     73     self._lock = threading.Lock()
     74     self._tests = []
     75     self._tests_in_progress = 0
     76     # Used to signal that an item is avaliable or all items have been handled.
     77     self._item_avaliable_or_all_done = threading.Event()
     78     for t in tests:
     79       self.add(t)
     80 
     81   def _pop(self):
     82     """Pop a test from the collection.
     83 
     84     Waits until a test is avaliable or all tests have been handled.
     85 
     86     Returns:
     87       A test or None if all tests have been handled.
     88     """
     89     while True:
     90       # Wait for a test to be avaliable or all tests to have been handled.
     91       self._item_avaliable_or_all_done.wait()
     92       with self._lock:
     93         # Check which of the two conditions triggered the signal.
     94         if self._tests_in_progress == 0:
     95           return None
     96         try:
     97           return self._tests.pop(0)
     98         except IndexError:
     99           # Another thread beat us to the avaliable test, wait again.
    100           self._item_avaliable_or_all_done.clear()
    101 
    102   def add(self, test):
    103     """Add an test to the collection.
    104 
    105     Args:
    106       test: A test to add.
    107     """
    108     with self._lock:
    109       self._tests.append(test)
    110       self._item_avaliable_or_all_done.set()
    111       self._tests_in_progress += 1
    112 
    113   def test_completed(self):
    114     """Indicate that a test has been fully handled."""
    115     with self._lock:
    116       self._tests_in_progress -= 1
    117       if self._tests_in_progress == 0:
    118         # All tests have been handled, signal all waiting threads.
    119         self._item_avaliable_or_all_done.set()
    120 
    121   def __iter__(self):
    122     """Iterate through tests in the collection until all have been handled."""
    123     while True:
    124       r = self._pop()
    125       if r is None:
    126         break
    127       yield r
    128 
    129   def __len__(self):
    130     """Return the number of tests currently in the collection."""
    131     return len(self._tests)
    132 
    133 
    134 def _RunTestsFromQueue(runner, test_collection, out_results, watcher,
    135                        num_retries, tag_results_with_device=False):
    136   """Runs tests from the test_collection until empty using the given runner.
    137 
    138   Adds TestRunResults objects to the out_results list and may add tests to the
    139   out_retry list.
    140 
    141   Args:
    142     runner: A TestRunner object used to run the tests.
    143     test_collection: A _TestCollection from which to get _Test objects to run.
    144     out_results: A list to add TestRunResults to.
    145     watcher: A watchdog_timer.WatchdogTimer object, used as a shared timeout.
    146     num_retries: Number of retries for a test.
    147     tag_results_with_device: If True, appends the name of the device on which
    148         the test was run to the test name. Used when replicating to identify
    149         which device ran each copy of the test, and to ensure each copy of the
    150         test is recorded separately.
    151   """
    152 
    153   def TagTestRunResults(test_run_results):
    154     """Tags all results with the last 4 digits of the device id.
    155 
    156     Used when replicating tests to distinguish the same tests run on different
    157     devices. We use a set to store test results, so the hash (generated from
    158     name and tag) must be unique to be considered different results.
    159     """
    160     new_test_run_results = base_test_result.TestRunResults()
    161     for test_result in test_run_results.GetAll():
    162       test_result.SetName('%s_%s' % (runner.device[-4:], test_result.GetName()))
    163       new_test_run_results.AddResult(test_result)
    164     return new_test_run_results
    165 
    166   for test in test_collection:
    167     watcher.Reset()
    168     try:
    169       if not android_commands.IsDeviceAttached(runner.device):
    170         # Device is unresponsive, stop handling tests on this device.
    171         msg = 'Device %s is unresponsive.' % runner.device
    172         logging.warning(msg)
    173         raise android_commands.errors.DeviceUnresponsiveError(msg)
    174       result, retry = runner.RunTest(test.test)
    175       if tag_results_with_device:
    176         result = TagTestRunResults(result)
    177       test.tries += 1
    178       if retry and test.tries <= num_retries:
    179         # Retry non-passing results, only record passing results.
    180         pass_results = base_test_result.TestRunResults()
    181         pass_results.AddResults(result.GetPass())
    182         out_results.append(pass_results)
    183         logging.warning('Will retry test, try #%s.' % test.tries)
    184         test_collection.add(_Test(test=retry, tries=test.tries))
    185       else:
    186         # All tests passed or retry limit reached. Either way, record results.
    187         out_results.append(result)
    188     except:
    189       # An unhandleable exception, ensure tests get run by another device and
    190       # reraise this exception on the main thread.
    191       test_collection.add(test)
    192       raise
    193     finally:
    194       # Retries count as separate tasks so always mark the popped test as done.
    195       test_collection.test_completed()
    196 
    197 
    198 def _SetUp(runner_factory, device, out_runners, threadsafe_counter):
    199   """Creates a test runner for each device and calls SetUp() in parallel.
    200 
    201   Note: if a device is unresponsive the corresponding TestRunner will not be
    202     added to out_runners.
    203 
    204   Args:
    205     runner_factory: Callable that takes a device and index and returns a
    206       TestRunner object.
    207     device: The device serial number to set up.
    208     out_runners: List to add the successfully set up TestRunner object.
    209     threadsafe_counter: A _ThreadSafeCounter object used to get shard indices.
    210   """
    211   try:
    212     index = threadsafe_counter.GetAndIncrement()
    213     logging.warning('Creating shard %s for device %s.', index, device)
    214     runner = runner_factory(device, index)
    215     runner.SetUp()
    216     out_runners.append(runner)
    217   except android_commands.errors.DeviceUnresponsiveError as e:
    218     logging.warning('Failed to create shard for %s: [%s]', device, e)
    219 
    220 
    221 def _RunAllTests(runners, test_collection_factory, num_retries, timeout=None,
    222                  tag_results_with_device=False):
    223   """Run all tests using the given TestRunners.
    224 
    225   Args:
    226     runners: A list of TestRunner objects.
    227     test_collection_factory: A callable to generate a _TestCollection object for
    228         each test runner.
    229     num_retries: Number of retries for a test.
    230     timeout: Watchdog timeout in seconds.
    231     tag_results_with_device: If True, appends the name of the device on which
    232         the test was run to the test name. Used when replicating to identify
    233         which device ran each copy of the test, and to ensure each copy of the
    234         test is recorded separately.
    235 
    236   Returns:
    237     A tuple of (TestRunResults object, exit code)
    238   """
    239   logging.warning('Running tests with %s test runners.' % (len(runners)))
    240   results = []
    241   exit_code = 0
    242   run_results = base_test_result.TestRunResults()
    243   watcher = watchdog_timer.WatchdogTimer(timeout)
    244   test_collections = [test_collection_factory() for _ in runners]
    245 
    246   threads = [
    247       reraiser_thread.ReraiserThread(
    248           _RunTestsFromQueue,
    249           [r, tc, results, watcher, num_retries, tag_results_with_device],
    250           name=r.device[-4:])
    251       for r, tc in zip(runners, test_collections)]
    252 
    253   workers = reraiser_thread.ReraiserThreadGroup(threads)
    254   workers.StartAll()
    255 
    256   # Catch DeviceUnresponsiveErrors and set a warning exit code
    257   try:
    258     workers.JoinAll(watcher)
    259   except android_commands.errors.DeviceUnresponsiveError as e:
    260     logging.error(e)
    261     exit_code = constants.WARNING_EXIT_CODE
    262 
    263   assert all([len(tc) == 0 for tc in test_collections]), (
    264       'Some tests were not run, all devices are likely offline (ran %d tests)' %
    265       len(run_results.GetAll()))
    266 
    267   for r in results:
    268     run_results.AddTestRunResults(r)
    269   if not run_results.DidRunPass():
    270     exit_code = constants.ERROR_EXIT_CODE
    271   return (run_results, exit_code)
    272 
    273 
    274 def _CreateRunners(runner_factory, devices, timeout=None):
    275   """Creates a test runner for each device and calls SetUp() in parallel.
    276 
    277   Note: if a device is unresponsive the corresponding TestRunner will not be
    278     included in the returned list.
    279 
    280   Args:
    281     runner_factory: Callable that takes a device and index and returns a
    282       TestRunner object.
    283     devices: List of device serial numbers as strings.
    284     timeout: Watchdog timeout in seconds, defaults to the default timeout.
    285 
    286   Returns:
    287     A list of TestRunner objects.
    288   """
    289   logging.warning('Creating %s test runners.' % len(devices))
    290   runners = []
    291   counter = _ThreadSafeCounter()
    292   threads = reraiser_thread.ReraiserThreadGroup(
    293       [reraiser_thread.ReraiserThread(_SetUp,
    294                                       [runner_factory, d, runners, counter],
    295                                       name=d[-4:])
    296        for d in devices])
    297   threads.StartAll()
    298   threads.JoinAll(watchdog_timer.WatchdogTimer(timeout))
    299   return runners
    300 
    301 
    302 def _TearDownRunners(runners, timeout=None):
    303   """Calls TearDown() for each test runner in parallel.
    304 
    305   Args:
    306     runners: A list of TestRunner objects.
    307     timeout: Watchdog timeout in seconds, defaults to the default timeout.
    308   """
    309   threads = reraiser_thread.ReraiserThreadGroup(
    310       [reraiser_thread.ReraiserThread(r.TearDown, name=r.device[-4:])
    311        for r in runners])
    312   threads.StartAll()
    313   threads.JoinAll(watchdog_timer.WatchdogTimer(timeout))
    314 
    315 
    316 def RunTests(tests, runner_factory, devices, shard=True,
    317              test_timeout=DEFAULT_TIMEOUT, setup_timeout=DEFAULT_TIMEOUT,
    318              num_retries=2):
    319   """Run all tests on attached devices, retrying tests that don't pass.
    320 
    321   Args:
    322     tests: List of tests to run.
    323     runner_factory: Callable that takes a device and index and returns a
    324         TestRunner object.
    325     devices: List of attached devices.
    326     shard: True if we should shard, False if we should replicate tests.
    327       - Sharding tests will distribute tests across all test runners through a
    328         shared test collection.
    329       - Replicating tests will copy all tests to each test runner through a
    330         unique test collection for each test runner.
    331     test_timeout: Watchdog timeout in seconds for running tests.
    332     setup_timeout: Watchdog timeout in seconds for creating and cleaning up
    333         test runners.
    334     num_retries: Number of retries for a test.
    335 
    336   Returns:
    337     A tuple of (base_test_result.TestRunResults object, exit code).
    338   """
    339   if not tests:
    340     logging.critical('No tests to run.')
    341     return (base_test_result.TestRunResults(), constants.ERROR_EXIT_CODE)
    342 
    343   if shard:
    344     # Generate a shared _TestCollection object for all test runners, so they
    345     # draw from a common pool of tests.
    346     shared_test_collection = _TestCollection([_Test(t) for t in tests])
    347     test_collection_factory = lambda: shared_test_collection
    348     tag_results_with_device = False
    349     log_string = 'sharded across devices'
    350   else:
    351     # Generate a unique _TestCollection object for each test runner, but use
    352     # the same set of tests.
    353     test_collection_factory = lambda: _TestCollection([_Test(t) for t in tests])
    354     tag_results_with_device = True
    355     log_string = 'replicated on each device'
    356 
    357   logging.info('Will run %d tests (%s): %s', len(tests), log_string, str(tests))
    358   runners = _CreateRunners(runner_factory, devices, setup_timeout)
    359   try:
    360     return _RunAllTests(runners, test_collection_factory,
    361                         num_retries, test_timeout, tag_results_with_device)
    362   finally:
    363     try:
    364       _TearDownRunners(runners, setup_timeout)
    365     except android_commands.errors.DeviceUnresponsiveError as e:
    366       logging.warning('Device unresponsive during TearDown: [%s]', e)
    367