# Copyright (C) 2011 Google Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import logging
import math
import time

from webkitpy.common import message_pool
from webkitpy.layout_tests.controllers import single_test_runner
from webkitpy.layout_tests.models.test_run_results import TestRunResults
from webkitpy.layout_tests.models import test_expectations
from webkitpy.layout_tests.models import test_failures
from webkitpy.layout_tests.models import test_results
from webkitpy.tool import grammar


_log = logging.getLogger(__name__)


TestExpectations = test_expectations.TestExpectations

# Export this so callers don't need to know about message pools.
WorkerException = message_pool.WorkerException


class TestRunInterruptedException(Exception):
    """Raised when a test run should be stopped immediately."""
    def __init__(self, reason):
        Exception.__init__(self)
        self.reason = reason
        self.msg = reason

    def __reduce__(self):
        return self.__class__, (self.reason,)
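    # Note: default pickling would try to reconstruct the exception with no
    # arguments (self.args is empty here) and fail, so __reduce__ keeps it
    # picklable across the message pool's process boundary. Illustrative check:
    #   pickle.loads(pickle.dumps(TestRunInterruptedException('why'))).reason == 'why'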


class LayoutTestRunner(object):
    def __init__(self, options, port, printer, results_directory, test_is_slow_fn):
        self._options = options
        self._port = port
        self._printer = printer
        self._results_directory = results_directory
        self._test_is_slow = test_is_slow_fn
        self._sharder = Sharder(self._port.split_test, self._options.max_locked_shards)
        self._filesystem = self._port.host.filesystem

        self._expectations = None
        self._test_inputs = []
        self._retrying = False

        self._current_run_results = None

    def run_tests(self, expectations, test_inputs, tests_to_skip, num_workers, retrying):
        self._expectations = expectations
        self._test_inputs = test_inputs
        self._retrying = retrying
        self._shards_to_redo = []

        # FIXME: rename all variables to test_run_results or some such ...
        run_results = TestRunResults(self._expectations, len(test_inputs) + len(tests_to_skip))
        self._current_run_results = run_results
        self._printer.num_tests = len(test_inputs)
        self._printer.num_completed = 0

        if not retrying:
            self._printer.print_expected(run_results, self._expectations.get_tests_with_result_type)

        for test_name in set(tests_to_skip):
            result = test_results.TestResult(test_name)
            result.type = test_expectations.SKIP
            run_results.add(result, expected=True, test_is_slow=self._test_is_slow(test_name))

        self._printer.write_update('Sharding tests ...')
        locked_shards, unlocked_shards = self._sharder.shard_tests(test_inputs,
            int(self._options.child_processes), self._options.fully_parallel,
            self._options.run_singly or (self._options.batch_size == 1))

        # We don't have a good way to coordinate the workers so that they don't
        # try to run the shards that need a lock. The easiest solution is to
        # run all of the locked shards first.
        all_shards = locked_shards + unlocked_shards
        num_workers = min(num_workers, len(all_shards))
        self._printer.print_workers_and_shards(num_workers, len(all_shards), len(locked_shards))

        if self._options.dry_run:
            return run_results

        self._printer.write_update('Starting %s ...' % grammar.pluralize('worker', num_workers))

        start_time = time.time()
        try:
            with message_pool.get(self, self._worker_factory, num_workers, self._port.host) as pool:
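                # Each queued item is a ('test_list', shard_name, test_inputs)
                # message, consumed by a Worker's handle() method.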
                pool.run(('test_list', shard.name, shard.test_inputs) for shard in all_shards)

            if self._shards_to_redo:
                num_workers -= len(self._shards_to_redo)
                if num_workers > 0:
                    with message_pool.get(self, self._worker_factory, num_workers, self._port.host) as pool:
                        pool.run(('test_list', shard.name, shard.test_inputs) for shard in self._shards_to_redo)
        except TestRunInterruptedException as e:
            _log.warning(e.reason)
            run_results.interrupted = True
        except KeyboardInterrupt:
            self._printer.flush()
            self._printer.writeln('Interrupted, exiting ...')
            run_results.keyboard_interrupted = True
        except Exception as e:
            _log.debug('%s("%s") raised, exiting' % (e.__class__.__name__, str(e)))
            raise
        finally:
            run_results.run_time = time.time() - start_time

        return run_results

    def _worker_factory(self, worker_connection):
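        # Results from a retry pass go under <results>/retries so they don't
        # overwrite the output of the initial pass.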
        results_directory = self._results_directory
        if self._retrying:
            self._filesystem.maybe_make_directory(self._filesystem.join(self._results_directory, 'retries'))
            results_directory = self._filesystem.join(self._results_directory, 'retries')
        return Worker(worker_connection, results_directory, self._options)

    def _mark_interrupted_tests_as_skipped(self, run_results):
        for test_input in self._test_inputs:
            if test_input.test_name not in run_results.results_by_name:
                result = test_results.TestResult(test_input.test_name, [test_failures.FailureEarlyExit()])
                # FIXME: We probably need to loop here if there are multiple iterations.
                # FIXME: Also, these results are really neither expected nor unexpected. We probably
                # need a third type of result.
                run_results.add(result, expected=False, test_is_slow=self._test_is_slow(test_input.test_name))

    def _interrupt_if_at_failure_limits(self, run_results):
        # Note: The messages in this method are constructed to match old-run-webkit-tests
        # so that existing buildbot grep rules work.
        def interrupt_if_at_failure_limit(limit, failure_count, run_results, message):
            if limit and failure_count >= limit:
                message += " %d tests run." % (run_results.expected + run_results.unexpected)
                self._mark_interrupted_tests_as_skipped(run_results)
                raise TestRunInterruptedException(message)

        interrupt_if_at_failure_limit(
            self._options.exit_after_n_failures,
            run_results.unexpected_failures,
            run_results,
            "Exiting early after %d failures." % run_results.unexpected_failures)
        interrupt_if_at_failure_limit(
            self._options.exit_after_n_crashes_or_timeouts,
            run_results.unexpected_crashes + run_results.unexpected_timeouts,
            run_results,
            # This differs from ORWT because it does not include WebProcess crashes.
            "Exiting early after %d crashes and %d timeouts." % (run_results.unexpected_crashes, run_results.unexpected_timeouts))

    def _update_summary_with_result(self, run_results, result):
        expected = self._expectations.matches_an_expected_result(result.test_name, result.type, self._options.pixel_tests or result.reftest_type, self._options.enable_sanitizer)
        exp_str = self._expectations.get_expectations_string(result.test_name)
        got_str = self._expectations.expectation_to_string(result.type)

        if result.device_failed:
            self._printer.print_finished_test(result, False, exp_str, "Aborted")
            return

        run_results.add(result, expected, self._test_is_slow(result.test_name))
        self._printer.print_finished_test(result, expected, exp_str, got_str)
        self._interrupt_if_at_failure_limits(run_results)

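    # Messages posted by workers (e.g. 'started_test', 'finished_test') are
    # dispatched to the matching _handle_* method below.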
    def handle(self, name, source, *args):
        method = getattr(self, '_handle_' + name, None)
        if method:
            return method(source, *args)
        raise AssertionError('unknown message %s received from %s, args=%s' % (name, source, repr(args)))

    def _handle_started_test(self, worker_name, test_input, test_timeout_sec):
        self._printer.print_started_test(test_input.test_name)

    def _handle_finished_test_list(self, worker_name, list_name):
        pass

    def _handle_finished_test(self, worker_name, result, log_messages=None):
        self._update_summary_with_result(self._current_run_results, result)

    def _handle_device_failed(self, worker_name, list_name, remaining_tests):
        _log.warning("%s has failed" % worker_name)
        if remaining_tests:
            self._shards_to_redo.append(TestShard(list_name, remaining_tests))


class Worker(object):
    def __init__(self, caller, results_directory, options):
        self._caller = caller
        self._worker_number = caller.worker_number
        self._name = caller.name
        self._results_directory = results_directory
        self._options = options

        # The remaining fields are initialized in start()
        self._host = None
        self._port = None
        self._batch_size = None
        self._batch_count = None
        self._filesystem = None
        self._driver = None
        self._num_tests = 0

    def __del__(self):
        self.stop()

    def start(self):
        """This method is called when the object is starting to be used and it is safe
        for the object to create state that does not need to be pickled (usually this means
        it is called in a child process)."""
        self._host = self._caller.host
        self._filesystem = self._host.filesystem
        self._port = self._host.port_factory.get(self._options.platform, self._options)

        self._batch_count = 0
        self._batch_size = self._options.batch_size or 0

    def handle(self, name, source, test_list_name, test_inputs):
        assert name == 'test_list'
        for i, test_input in enumerate(test_inputs):
            device_failed = self._run_test(test_input, test_list_name)
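            # If the device went down mid-run, hand the unfinished tests back
            # to the manager so another worker can redo them.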
            if device_failed:
                self._caller.post('device_failed', test_list_name, test_inputs[i:])
                self._caller.stop_running()
                return

        self._caller.post('finished_test_list', test_list_name)

    def _update_test_input(self, test_input):
        if test_input.reference_files is None:
            # Lazy initialization.
            test_input.reference_files = self._port.reference_files(test_input.test_name)
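        # A reftest always needs pixel output so the test and reference
        # renderings can be compared.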
        if test_input.reference_files:
            test_input.should_run_pixel_test = True
        else:
            test_input.should_run_pixel_test = self._port.should_run_as_pixel_test(test_input)

    def _run_test(self, test_input, shard_name):
        self._batch_count += 1

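        # Restart the driver every |batch_size| tests: e.g. with --batch-size=100,
        # every 100th test asks the driver to shut down once it finishes.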
        stop_when_done = False
        if self._batch_size > 0 and self._batch_count >= self._batch_size:
            self._batch_count = 0
            stop_when_done = True

        self._update_test_input(test_input)
        test_timeout_sec = self._timeout(test_input)
        start = time.time()
        device_failed = False

        if self._driver and self._driver.has_crashed():
            self._kill_driver()
        if not self._driver:
            self._driver = self._port.create_driver(self._worker_number)

        if not self._driver:
            # FIXME: Is this the best way to handle a device crashing in the middle of the test, or should we create
            # a new failure type?
            device_failed = True
            return device_failed

        self._caller.post('started_test', test_input, test_timeout_sec)
        result = single_test_runner.run_single_test(self._port, self._options, self._results_directory,
            self._name, self._driver, test_input, stop_when_done)

        result.shard_name = shard_name
        result.worker_name = self._name
        result.total_run_time = time.time() - start
        result.test_number = self._num_tests
        self._num_tests += 1
        self._caller.post('finished_test', result)
        self._clean_up_after_test(test_input, result)
        return result.device_failed

    def stop(self):
        _log.debug("%s cleaning up" % self._name)
        self._kill_driver()

    def _timeout(self, test_input):
        """Compute the appropriate timeout value for a test."""
        # The driver watchdog uses 2.5x the timeout; we want to be
        # larger than that. We also add a little more padding if we're
        # running tests in a separate thread.
        #
        # Note that we need to convert the test timeout from a
        # string value in milliseconds to a float for Python.
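        # Worked example (illustrative): a test timeout of '6000' ms becomes
        # 3.0 * 6000 / 1000.0 = 18.0 seconds of driver timeout.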

        # FIXME: Can we just return the test_input.timeout now?
        driver_timeout_sec = 3.0 * float(test_input.timeout) / 1000.0
        return driver_timeout_sec

    def _kill_driver(self):
        # Be careful about how and when we kill the driver; if driver.stop()
        # raises an exception, this routine may get re-entered via __del__.
        driver = self._driver
        self._driver = None
        if driver:
            _log.debug("%s killing driver" % self._name)
            driver.stop()

    def _clean_up_after_test(self, test_input, result):
        test_name = test_input.test_name

        if result.failures:
            # Check and kill the driver if we need to.
            if any(f.driver_needs_restart() for f in result.failures):
                self._kill_driver()
                # Reset the batch count since the shell just bounced.
                self._batch_count = 0

            # Print the error message(s).
            _log.debug("%s %s failed:" % (self._name, test_name))
            for f in result.failures:
                _log.debug("%s  %s" % (self._name, f.message()))
        elif result.type == test_expectations.SKIP:
            _log.debug("%s %s skipped" % (self._name, test_name))
        else:
            _log.debug("%s %s passed" % (self._name, test_name))


class TestShard(object):
    """A test shard is a named list of TestInputs."""

    def __init__(self, name, test_inputs):
        self.name = name
        self.test_inputs = test_inputs
        self.requires_lock = test_inputs[0].requires_lock

    def __repr__(self):
        return "TestShard(name='%s', test_inputs=%s, requires_lock=%s)" % (self.name, self.test_inputs, self.requires_lock)

    def __eq__(self, other):
        return self.name == other.name and self.test_inputs == other.test_inputs


class Sharder(object):
    def __init__(self, test_split_fn, max_locked_shards):
        self._split = test_split_fn
        self._max_locked_shards = max_locked_shards

    def shard_tests(self, test_inputs, num_workers, fully_parallel, run_singly):
        """Groups tests into batches.

        This helps ensure that tests that depend on each other (aka bad tests!)
        continue to run together, as most cross-test dependencies tend to
        occur within the same directory.

        Return:
            Two lists of TestShards. The first contains tests that must only be
            run under the server lock; the second can be run whenever.
        """

        # FIXME: Move all of the sharding logic out of manager into its
        # own class or module. Consider grouping it with the chunking logic
        # in prepare_lists as well.
        if num_workers == 1:
            return self._shard_in_two(test_inputs)
        elif fully_parallel:
            return self._shard_every_file(test_inputs, run_singly)
        return self._shard_by_directory(test_inputs)
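    # Illustrative usage (a sketch; assumes TestInput objects carrying
    # test_name and requires_lock, and a split function such as Port.split_test):
    #   sharder = Sharder(port.split_test, max_locked_shards=1)
    #   locked, unlocked = sharder.shard_tests(
    #       inputs, num_workers=4, fully_parallel=False, run_singly=False)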

    def _shard_in_two(self, test_inputs):
        """Returns two lists of shards, one with all the tests requiring a lock and one with the rest.

        This is used when there's only one worker, to minimize the per-shard overhead."""
        locked_inputs = []
        unlocked_inputs = []
        for test_input in test_inputs:
            if test_input.requires_lock:
                locked_inputs.append(test_input)
            else:
                unlocked_inputs.append(test_input)

        locked_shards = []
        unlocked_shards = []
        if locked_inputs:
            locked_shards = [TestShard('locked_tests', locked_inputs)]
        if unlocked_inputs:
            unlocked_shards = [TestShard('unlocked_tests', unlocked_inputs)]

        return locked_shards, unlocked_shards

    def _shard_every_file(self, test_inputs, run_singly):
        """Returns two lists of shards, each shard containing a single test file.

        This mode gets maximal parallelism at the cost of much higher flakiness."""
        locked_shards = []
        unlocked_shards = []
        virtual_inputs = []

        for test_input in test_inputs:
            # Note that we use a '.' for the shard name; the name doesn't really
            # matter, and the only other meaningful value would be the filename,
            # which would be really redundant.
            if test_input.requires_lock:
                locked_shards.append(TestShard('.', [test_input]))
            elif test_input.test_name.startswith('virtual') and not run_singly:
                # This violates the spirit of sharding every file, but in practice, since the
                # virtual test suites require a different commandline flag and thus a restart
                # of content_shell, it's too slow to shard them fully.
                virtual_inputs.append(test_input)
            else:
                unlocked_shards.append(TestShard('.', [test_input]))

        locked_virtual_shards, unlocked_virtual_shards = self._shard_by_directory(virtual_inputs)

        # The locked shards still need to be limited to self._max_locked_shards in order to not
        # overload the http server for the http tests.
        return (self._resize_shards(locked_virtual_shards + locked_shards, self._max_locked_shards, 'locked_shard'),
            unlocked_virtual_shards + unlocked_shards)

    def _shard_by_directory(self, test_inputs):
        """Returns two lists of shards, each shard containing all the files in a directory.

        This is the default mode, and gets as much parallelism as we can while
        minimizing flakiness caused by inter-test dependencies."""
        locked_shards = []
        unlocked_shards = []
        unlocked_slow_shards = []
        tests_by_dir = {}
        # FIXME: Given that the tests are already sorted by directory,
        # we can probably rewrite this to be clearer and faster.
        for test_input in test_inputs:
            directory = self._split(test_input.test_name)[0]
            tests_by_dir.setdefault(directory, []).append(test_input)

        for directory, dir_test_inputs in tests_by_dir.iteritems():
            shard = TestShard(directory, dir_test_inputs)
            if dir_test_inputs[0].requires_lock:
                locked_shards.append(shard)
            # In practice, virtual test suites are slow to run. It's a bit hacky, but
            # put them first since they're the long-tail of test runtime.
            elif directory.startswith('virtual'):
                unlocked_slow_shards.append(shard)
            else:
                unlocked_shards.append(shard)

        # Sort the shards by directory name.
        locked_shards.sort(key=lambda shard: shard.name)
        unlocked_slow_shards.sort(key=lambda shard: shard.name)
        unlocked_shards.sort(key=lambda shard: shard.name)

        # Put a ceiling on the number of locked shards, so that we
        # don't hammer the servers too badly.

        # FIXME: For now, limit to one shard or set it
        # with the --max-locked-shards. After testing to make sure we
        # can handle multiple shards, we should probably do something like
        # limit this to no more than a quarter of all workers, e.g.:
        # return max(math.ceil(num_workers / 4.0), 1)
        return (self._resize_shards(locked_shards, self._max_locked_shards, 'locked_shard'),
                unlocked_slow_shards + unlocked_shards)

    def _resize_shards(self, old_shards, max_new_shards, shard_name_prefix):
        """Takes a list of shards and redistributes the tests into no more
        than |max_new_shards| new shards."""

        # This implementation assumes that each input shard only contains tests from a
        # single directory, and that tests in each shard must remain together; as a
        # result, a given input shard is never split between output shards.
        #
        # Each output shard contains the tests from one or more input shards and
        # hence may contain tests from multiple directories.
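        # Worked example (illustrative): resizing 10 single-directory shards
        # down to max_new_shards=4 gives num_old_per_new = ceil(10/4) = 3, so
        # the input shards are regrouped 3 + 3 + 3 + 1 into four new shards.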

        def divide_and_round_up(numerator, divisor):
            return int(math.ceil(float(numerator) / divisor))

        def extract_and_flatten(shards):
            test_inputs = []
            for shard in shards:
                test_inputs.extend(shard.test_inputs)
            return test_inputs

        def split_at(seq, index):
            return (seq[:index], seq[index:])

        num_old_per_new = divide_and_round_up(len(old_shards), max_new_shards)
        new_shards = []
        remaining_shards = old_shards
        while remaining_shards:
            some_shards, remaining_shards = split_at(remaining_shards, num_old_per_new)
            new_shards.append(TestShard('%s_%d' % (shard_name_prefix, len(new_shards) + 1), extract_and_flatten(some_shards)))
        return new_shards
    507