# Copyright (C) 2011 Google Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import logging
import math
import threading
import time

from webkitpy.common import message_pool
from webkitpy.layout_tests.controllers import single_test_runner
from webkitpy.layout_tests.models.test_run_results import TestRunResults
from webkitpy.layout_tests.models import test_expectations
from webkitpy.layout_tests.models import test_failures
from webkitpy.layout_tests.models import test_results
from webkitpy.tool import grammar


_log = logging.getLogger(__name__)


TestExpectations = test_expectations.TestExpectations

# Export this so callers don't need to know about message pools.
WorkerException = message_pool.WorkerException


class TestRunInterruptedException(Exception):
    """Raised when a test run should be stopped immediately."""
    def __init__(self, reason):
        Exception.__init__(self)
        self.reason = reason
        self.msg = reason

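    # Exceptions raised in a worker travel back to the manager through the
    # message pool via pickle; __reduce__ tells pickle how to rebuild this
    # exception from its reason.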
    def __reduce__(self):
        return self.__class__, (self.reason,)


class LayoutTestRunner(object):
    def __init__(self, options, port, printer, results_directory, test_is_slow_fn):
        self._options = options
        self._port = port
        self._printer = printer
        self._results_directory = results_directory
        self._test_is_slow = test_is_slow_fn
        self._sharder = Sharder(self._port.split_test, self._options.max_locked_shards)
        self._filesystem = self._port.host.filesystem

        self._expectations = None
        self._test_inputs = []
        self._retrying = False

        self._current_run_results = None

    def run_tests(self, expectations, test_inputs, tests_to_skip, num_workers, retrying):
        self._expectations = expectations
        self._test_inputs = test_inputs
        self._retrying = retrying

        # FIXME: rename all variables to test_run_results or some such ...
        run_results = TestRunResults(self._expectations, len(test_inputs) + len(tests_to_skip))
        self._current_run_results = run_results
        self._printer.num_tests = len(test_inputs)
        self._printer.num_completed = 0

        if not retrying:
            self._printer.print_expected(run_results, self._expectations.get_tests_with_result_type)

        for test_name in set(tests_to_skip):
            result = test_results.TestResult(test_name)
            result.type = test_expectations.SKIP
            run_results.add(result, expected=True, test_is_slow=self._test_is_slow(test_name))

        self._printer.write_update('Sharding tests ...')
        locked_shards, unlocked_shards = self._sharder.shard_tests(test_inputs, int(self._options.child_processes), self._options.fully_parallel)

        # We don't have a good way to coordinate the workers so that they don't
        # try to run the shards that need a lock. The easiest solution is to
        # run all of the locked shards first.
        all_shards = locked_shards + unlocked_shards
        num_workers = min(num_workers, len(all_shards))
        self._printer.print_workers_and_shards(num_workers, len(all_shards), len(locked_shards))

        if self._options.dry_run:
            return run_results

        self._printer.write_update('Starting %s ...' % grammar.pluralize('worker', num_workers))

        start_time = time.time()
        try:
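            # Each queued message is a (name, *args) tuple; the message pool
            # delivers it to Worker.handle() in a worker process.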
            with message_pool.get(self, self._worker_factory, num_workers, self._port.worker_startup_delay_secs(), self._port.host) as pool:
                pool.run(('test_list', shard.name, shard.test_inputs) for shard in all_shards)
        except TestRunInterruptedException, e:
            _log.warning(e.reason)
            run_results.interrupted = True
        except KeyboardInterrupt:
            self._printer.flush()
            self._printer.writeln('Interrupted, exiting ...')
            raise
        except Exception, e:
            _log.debug('%s("%s") raised, exiting' % (e.__class__.__name__, str(e)))
            raise
        finally:
            run_results.run_time = time.time() - start_time

        return run_results

    def _worker_factory(self, worker_connection):
        results_directory = self._results_directory
        if self._retrying:
            self._filesystem.maybe_make_directory(self._filesystem.join(self._results_directory, 'retries'))
            results_directory = self._filesystem.join(self._results_directory, 'retries')
        return Worker(worker_connection, results_directory, self._options)

    def _mark_interrupted_tests_as_skipped(self, run_results):
        for test_input in self._test_inputs:
            if test_input.test_name not in run_results.results_by_name:
                result = test_results.TestResult(test_input.test_name, [test_failures.FailureEarlyExit()])
                # FIXME: We probably need to loop here if there are multiple iterations.
                # FIXME: Also, these results are really neither expected nor unexpected. We probably
                # need a third type of result.
                run_results.add(result, expected=False, test_is_slow=self._test_is_slow(test_input.test_name))

    def _interrupt_if_at_failure_limits(self, run_results):
        # Note: The messages in this method are constructed to match old-run-webkit-tests
        # so that existing buildbot grep rules work.
        def interrupt_if_at_failure_limit(limit, failure_count, run_results, message):
            if limit and failure_count >= limit:
                message += " %d tests run." % (run_results.expected + run_results.unexpected)
                self._mark_interrupted_tests_as_skipped(run_results)
                raise TestRunInterruptedException(message)

        interrupt_if_at_failure_limit(
            self._options.exit_after_n_failures,
            run_results.unexpected_failures,
            run_results,
            "Exiting early after %d failures." % run_results.unexpected_failures)
        interrupt_if_at_failure_limit(
            self._options.exit_after_n_crashes_or_timeouts,
            run_results.unexpected_crashes + run_results.unexpected_timeouts,
            run_results,
            # This differs from ORWT because it does not include WebProcess crashes.
            "Exiting early after %d crashes and %d timeouts." % (run_results.unexpected_crashes, run_results.unexpected_timeouts))

    def _update_summary_with_result(self, run_results, result):
        expected = self._expectations.matches_an_expected_result(result.test_name, result.type, self._options.pixel_tests or result.reftest_type)
        exp_str = self._expectations.get_expectations_string(result.test_name)
        got_str = self._expectations.expectation_to_string(result.type)

        run_results.add(result, expected, self._test_is_slow(result.test_name))

        self._printer.print_finished_test(result, expected, exp_str, got_str)

        self._interrupt_if_at_failure_limits(run_results)

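    # Messages posted by workers are dispatched by name: a message posted as
    # 'started_test' is routed to _handle_started_test(), and so on.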
    def handle(self, name, source, *args):
        method = getattr(self, '_handle_' + name, None)
        if method:
            return method(source, *args)
        raise AssertionError('unknown message %s received from %s, args=%s' % (name, source, repr(args)))

    def _handle_started_test(self, worker_name, test_input, test_timeout_sec):
        self._printer.print_started_test(test_input.test_name)

    def _handle_finished_test_list(self, worker_name, list_name):
        pass

    def _handle_finished_test(self, worker_name, result, log_messages=None):
        self._update_summary_with_result(self._current_run_results, result)


class Worker(object):
    def __init__(self, caller, results_directory, options):
        self._caller = caller
        self._worker_number = caller.worker_number
        self._name = caller.name
        self._results_directory = results_directory
        self._options = options

        # The remaining fields are initialized in start()
        self._host = None
        self._port = None
        self._batch_size = None
        self._batch_count = None
        self._filesystem = None
        self._driver = None
        self._num_tests = 0

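    # Last-resort cleanup: _kill_driver() is written to tolerate being
    # re-entered from __del__ if driver.stop() raises.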
    def __del__(self):
        self.stop()

    def start(self):
        """Called when the object is about to start being used.

        At this point it is safe for the object to create state that does not
        need to be pickled (usually this means we are running in a child
        process)."""
        self._host = self._caller.host
        self._filesystem = self._host.filesystem
        self._port = self._host.port_factory.get(self._options.platform, self._options)

        self._batch_count = 0
        self._batch_size = self._options.batch_size or 0

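    # Counterpart of LayoutTestRunner.handle(): the messages this worker posts
    # ('started_test', 'finished_test', 'finished_test_list') are handled there.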
    def handle(self, name, source, test_list_name, test_inputs):
        assert name == 'test_list'
        for test_input in test_inputs:
            self._run_test(test_input, test_list_name)
        self._caller.post('finished_test_list', test_list_name)

    def _update_test_input(self, test_input):
        if test_input.reference_files is None:
            # Lazy initialization.
            test_input.reference_files = self._port.reference_files(test_input.test_name)
        if test_input.reference_files:
            test_input.should_run_pixel_test = True
        else:
            test_input.should_run_pixel_test = self._port.should_run_as_pixel_test(test_input)

    def _run_test(self, test_input, shard_name):
        self._batch_count += 1

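        # Ask the driver to stop after every |batch_size| tests so the next
        # batch starts with a fresh driver; a batch_size of 0 disables these
        # periodic restarts.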
        stop_when_done = False
        if self._batch_size > 0 and self._batch_count >= self._batch_size:
            self._batch_count = 0
            stop_when_done = True

        self._update_test_input(test_input)
        test_timeout_sec = self._timeout(test_input)
        start = time.time()
        self._caller.post('started_test', test_input, test_timeout_sec)

        result = self._run_test_with_timeout(test_input, test_timeout_sec, stop_when_done)
        result.shard_name = shard_name
        result.worker_name = self._name
        result.total_run_time = time.time() - start
        result.test_number = self._num_tests
        self._num_tests += 1

        self._caller.post('finished_test', result)

        self._clean_up_after_test(test_input, result)

    def stop(self):
        _log.debug("%s cleaning up" % self._name)
        self._kill_driver()

    def _timeout(self, test_input):
        """Compute the appropriate timeout value for a test."""
        # The driver watchdog uses 2.5x the timeout; we want to be
        # larger than that. We also add a little more padding if we're
        # running tests in a separate thread.
        #
        # Note that we need to convert the test timeout from a
        # string value in milliseconds to a float for Python.
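        # For example, a test_input.timeout of '6000' (ms) yields a
        # driver_timeout_sec of 3.0 * 6.0 = 18.0 seconds.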
        driver_timeout_sec = 3.0 * float(test_input.timeout) / 1000.0
        if not self._options.run_singly:
            return driver_timeout_sec

        thread_padding_sec = 1.0
        thread_timeout_sec = driver_timeout_sec + thread_padding_sec
        return thread_timeout_sec

    def _kill_driver(self):
        # Be careful about how and when we kill the driver; if driver.stop()
        # raises an exception, this routine may get re-entered via __del__.
        driver = self._driver
        self._driver = None
        if driver:
            _log.debug("%s killing driver" % self._name)
            driver.stop()

    def _run_test_with_timeout(self, test_input, timeout, stop_when_done):
        if self._options.run_singly:
            return self._run_test_in_another_thread(test_input, timeout, stop_when_done)
        return self._run_test_in_this_thread(test_input, stop_when_done)

    def _clean_up_after_test(self, test_input, result):
        test_name = test_input.test_name

        if result.failures:
            # Check and kill the driver if we need to.
            if any(f.driver_needs_restart() for f in result.failures):
                self._kill_driver()
                # Reset the batch count since the shell just bounced.
                self._batch_count = 0

            # Print the error message(s).
            _log.debug("%s %s failed:" % (self._name, test_name))
            for f in result.failures:
                _log.debug("%s  %s" % (self._name, f.message()))
        elif result.type == test_expectations.SKIP:
            _log.debug("%s %s skipped" % (self._name, test_name))
        else:
            _log.debug("%s %s passed" % (self._name, test_name))

    def _run_test_in_another_thread(self, test_input, thread_timeout_sec, stop_when_done):
        """Run a test in a separate thread, enforcing a hard time limit.

        Since we can only detect the termination of a thread, not any internal
        state or progress, we can only run per-test timeouts when running test
        files singly.

        Args:
          test_input: Object containing the test filename and timeout
          thread_timeout_sec: time to wait before killing the driver process.
        Returns:
          A TestResult
        """
        worker = self

        driver = self._port.create_driver(self._worker_number)

        class SingleTestThread(threading.Thread):
            def __init__(self):
                threading.Thread.__init__(self)
                self.result = None

            def run(self):
                self.result = worker._run_single_test(driver, test_input, stop_when_done)

        thread = SingleTestThread()
        thread.start()
        thread.join(thread_timeout_sec)
        result = thread.result
        failures = []
        if thread.is_alive():
            # If join() returned with the thread still running, the
            # driver is completely hung and there's nothing
            # more we can do with it.  We have to kill all the
            # drivers to free it up. If we're running more than
            # one driver thread, we'll end up killing the other
            # drivers too, introducing spurious crashes. We accept
            # that tradeoff in order to avoid losing the rest of this
            # thread's results.
            _log.error('Test thread hung: killing all drivers')
            failures = [test_failures.FailureTimeout()]

        driver.stop()

        if not result:
            result = test_results.TestResult(test_input.test_name, failures=failures, test_run_time=0)
        return result

    def _run_test_in_this_thread(self, test_input, stop_when_done):
        """Run a single test file using a shared driver process.

        Args:
          test_input: Object containing the test filename, uri and timeout

        Returns: a TestResult object.
        """
        if self._driver and self._driver.has_crashed():
            self._kill_driver()
        if not self._driver:
            self._driver = self._port.create_driver(self._worker_number)
        return self._run_single_test(self._driver, test_input, stop_when_done)

    def _run_single_test(self, driver, test_input, stop_when_done):
        return single_test_runner.run_single_test(self._port, self._options, self._results_directory,
            self._name, driver, test_input, stop_when_done)


class TestShard(object):
    """A test shard is a named list of TestInputs."""

    def __init__(self, name, test_inputs):
        self.name = name
        self.test_inputs = test_inputs
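        # Assumes every input in a shard shares the same lock requirement
        # (the Sharder groups them that way), so the first is representative.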
        self.requires_lock = test_inputs[0].requires_lock

    def __repr__(self):
        return "TestShard(name='%s', test_inputs=%s, requires_lock=%s)" % (self.name, self.test_inputs, self.requires_lock)

    def __eq__(self, other):
        return self.name == other.name and self.test_inputs == other.test_inputs


class Sharder(object):
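    """Splits a list of TestInputs into locked and unlocked TestShards."""
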
    def __init__(self, test_split_fn, max_locked_shards):
        self._split = test_split_fn
        self._max_locked_shards = max_locked_shards

    def shard_tests(self, test_inputs, num_workers, fully_parallel):
        """Groups tests into batches.

        This helps ensure that tests that depend on each other (aka bad tests!)
        continue to run together, as most cross-test dependencies tend to
        occur within the same directory.

        Returns:
            Two lists of TestShards. The first contains tests that must be
            run under the server lock; the second can be run whenever.
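
            For example (hypothetical names), the inputs ['fast/a/x.html',
            'fast/a/y.html', 'http/tests/z.html'] would, with multiple
            workers and fully_parallel=False, yield one unlocked shard named
            'fast/a' and, assuming the http tests require the server lock,
            one locked shard named 'http/tests'.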
    414         """

        # FIXME: Move all of the sharding logic out of manager into its
        # own class or module. Consider grouping it with the chunking logic
        # in prepare_lists as well.
        if num_workers == 1:
            return self._shard_in_two(test_inputs)
        elif fully_parallel:
            return self._shard_every_file(test_inputs)
        return self._shard_by_directory(test_inputs, num_workers)

    def _shard_in_two(self, test_inputs):
        """Returns two lists of shards, one with all the tests requiring a lock and one with the rest.

        This is used when there's only one worker, to minimize the per-shard overhead."""
        locked_inputs = []
        unlocked_inputs = []
        for test_input in test_inputs:
            if test_input.requires_lock:
                locked_inputs.append(test_input)
            else:
                unlocked_inputs.append(test_input)

        locked_shards = []
        unlocked_shards = []
        if locked_inputs:
            locked_shards = [TestShard('locked_tests', locked_inputs)]
        if unlocked_inputs:
            unlocked_shards = [TestShard('unlocked_tests', unlocked_inputs)]

        return locked_shards, unlocked_shards

    def _shard_every_file(self, test_inputs):
        """Returns two lists of shards, each shard containing a single test file.

        This mode gets maximal parallelism at the cost of much higher flakiness."""
        locked_shards = []
        unlocked_shards = []
        for test_input in test_inputs:
            # Note that we use a '.' for the shard name; the name doesn't really
            # matter, and the only other meaningful value would be the filename,
            # which would be really redundant.
            if test_input.requires_lock:
                locked_shards.append(TestShard('.', [test_input]))
            else:
                unlocked_shards.append(TestShard('.', [test_input]))

        return locked_shards, unlocked_shards

    def _shard_by_directory(self, test_inputs, num_workers):
        """Returns two lists of shards, each shard containing all the files in a directory.

        This is the default mode, and gets as much parallelism as we can while
        minimizing flakiness caused by inter-test dependencies."""
        locked_shards = []
        unlocked_shards = []
        unlocked_slow_shards = []
        tests_by_dir = {}
        # FIXME: Given that the tests are already sorted by directory,
        # we can probably rewrite this to be clearer and faster.
        for test_input in test_inputs:
            directory = self._split(test_input.test_name)[0]
            tests_by_dir.setdefault(directory, []).append(test_input)

        for directory, test_inputs in tests_by_dir.iteritems():
            shard = TestShard(directory, test_inputs)
            if test_inputs[0].requires_lock:
                locked_shards.append(shard)
            # In practice, virtual test suites are slow to run. It's a bit hacky, but
            # put them first since they're the long-tail of test runtime.
            elif directory.startswith('virtual'):
                unlocked_slow_shards.append(shard)
            else:
                unlocked_shards.append(shard)

        # Sort the shards by directory name.
        locked_shards.sort(key=lambda shard: shard.name)
        unlocked_slow_shards.sort(key=lambda shard: shard.name)
        unlocked_shards.sort(key=lambda shard: shard.name)

        # Put a ceiling on the number of locked shards, so that we
        # don't hammer the servers too badly.

        # FIXME: For now, limit to one shard or set it
        # with the --max-locked-shards. After testing to make sure we
        # can handle multiple shards, we should probably do something like
        # limit this to no more than a quarter of all workers, e.g.:
        # return max(math.ceil(num_workers / 4.0), 1)
        return (self._resize_shards(locked_shards, self._max_locked_shards, 'locked_shard'),
                unlocked_slow_shards + unlocked_shards)

    def _resize_shards(self, old_shards, max_new_shards, shard_name_prefix):
        """Takes a list of shards and redistributes the tests into no more
        than |max_new_shards| new shards."""

        # This implementation assumes that each input shard only contains tests from a
        # single directory, and that tests in each shard must remain together; as a
        # result, a given input shard is never split between output shards.
        #
        # Each output shard contains the tests from one or more input shards and
        # hence may contain tests from multiple directories.
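        #
        # For example, resizing ten old shards down to max_new_shards=4 packs
        # ceil(10 / 4) == 3 old shards into each new shard, yielding four new
        # shards containing 3, 3, 3, and 1 old shards respectively.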

        def divide_and_round_up(numerator, divisor):
            return int(math.ceil(float(numerator) / divisor))

        def extract_and_flatten(shards):
            test_inputs = []
            for shard in shards:
                test_inputs.extend(shard.test_inputs)
            return test_inputs

        def split_at(seq, index):
            return (seq[:index], seq[index:])

        num_old_per_new = divide_and_round_up(len(old_shards), max_new_shards)
        new_shards = []
        remaining_shards = old_shards
        while remaining_shards:
            some_shards, remaining_shards = split_at(remaining_shards, num_old_per_new)
            new_shards.append(TestShard('%s_%d' % (shard_name_prefix, len(new_shards) + 1), extract_and_flatten(some_shards)))
        return new_shards