# Copyright (C) 2011 Google Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import logging
import math
import threading
import time

from webkitpy.common import message_pool
from webkitpy.layout_tests.controllers import single_test_runner
from webkitpy.layout_tests.models.test_run_results import TestRunResults
from webkitpy.layout_tests.models import test_expectations
from webkitpy.layout_tests.models import test_failures
from webkitpy.layout_tests.models import test_results
from webkitpy.tool import grammar


_log = logging.getLogger(__name__)


TestExpectations = test_expectations.TestExpectations

# Export this so callers don't need to know about message pools.
WorkerException = message_pool.WorkerException


class TestRunInterruptedException(Exception):
    """Raised when a test run should be stopped immediately."""
    def __init__(self, reason):
        Exception.__init__(self)
        self.reason = reason
        self.msg = reason

    def __reduce__(self):
        return self.__class__, (self.reason,)


class LayoutTestRunner(object):
    def __init__(self, options, port, printer, results_directory, test_is_slow_fn):
        self._options = options
        self._port = port
        self._printer = printer
        self._results_directory = results_directory
        self._test_is_slow = test_is_slow_fn
        self._sharder = Sharder(self._port.split_test, self._options.max_locked_shards)
        self._filesystem = self._port.host.filesystem

        self._expectations = None
        self._test_inputs = []
        self._retrying = False

        self._current_run_results = None

    def run_tests(self, expectations, test_inputs, tests_to_skip, num_workers, retrying):
        self._expectations = expectations
        self._test_inputs = test_inputs
        self._retrying = retrying
        self._shards_to_redo = []

        # FIXME: rename all variables to test_run_results or some such ...
        run_results = TestRunResults(self._expectations, len(test_inputs) + len(tests_to_skip))
        self._current_run_results = run_results
        self._printer.num_tests = len(test_inputs)
        self._printer.num_completed = 0

        if not retrying:
            self._printer.print_expected(run_results, self._expectations.get_tests_with_result_type)

        for test_name in set(tests_to_skip):
            result = test_results.TestResult(test_name)
            result.type = test_expectations.SKIP
            run_results.add(result, expected=True, test_is_slow=self._test_is_slow(test_name))

        self._printer.write_update('Sharding tests ...')
        locked_shards, unlocked_shards = self._sharder.shard_tests(test_inputs, int(self._options.child_processes), self._options.fully_parallel)

        # We don't have a good way to coordinate the workers so that they don't
        # try to run the shards that need a lock. The easiest solution is to
        # run all of the locked shards first.
        all_shards = locked_shards + unlocked_shards
        num_workers = min(num_workers, len(all_shards))
        self._printer.print_workers_and_shards(num_workers, len(all_shards), len(locked_shards))

        if self._options.dry_run:
            return run_results

        self._printer.write_update('Starting %s ...' % grammar.pluralize('worker', num_workers))

        start_time = time.time()
        try:
            with message_pool.get(self, self._worker_factory, num_workers, self._port.host) as pool:
                pool.run(('test_list', shard.name, shard.test_inputs) for shard in all_shards)

            if self._shards_to_redo:
                num_workers -= len(self._shards_to_redo)
                if num_workers > 0:
                    with message_pool.get(self, self._worker_factory, num_workers, self._port.host) as pool:
                        pool.run(('test_list', shard.name, shard.test_inputs) for shard in self._shards_to_redo)
        except TestRunInterruptedException as e:
            _log.warning(e.reason)
            run_results.interrupted = True
        except KeyboardInterrupt:
            self._printer.flush()
            self._printer.writeln('Interrupted, exiting ...')
            run_results.keyboard_interrupted = True
        except Exception as e:
            _log.debug('%s("%s") raised, exiting' % (e.__class__.__name__, str(e)))
            raise
        finally:
            run_results.run_time = time.time() - start_time

        return run_results

    def _worker_factory(self, worker_connection):
        results_directory = self._results_directory
        if self._retrying:
            self._filesystem.maybe_make_directory(self._filesystem.join(self._results_directory, 'retries'))
            results_directory = self._filesystem.join(self._results_directory, 'retries')
        return Worker(worker_connection, results_directory, self._options)

    def _mark_interrupted_tests_as_skipped(self, run_results):
        for test_input in self._test_inputs:
            if test_input.test_name not in run_results.results_by_name:
                result = test_results.TestResult(test_input.test_name, [test_failures.FailureEarlyExit()])
                # FIXME: We probably need to loop here if there are multiple iterations.
                # FIXME: Also, these results are really neither expected nor unexpected. We probably
                # need a third type of result.
                run_results.add(result, expected=False, test_is_slow=self._test_is_slow(test_input.test_name))

    def _interrupt_if_at_failure_limits(self, run_results):
        # Note: The messages in this method are constructed to match old-run-webkit-tests
        # so that existing buildbot grep rules work.
        def interrupt_if_at_failure_limit(limit, failure_count, run_results, message):
            if limit and failure_count >= limit:
                message += " %d tests run." % (run_results.expected + run_results.unexpected)
                self._mark_interrupted_tests_as_skipped(run_results)
                raise TestRunInterruptedException(message)

        interrupt_if_at_failure_limit(
            self._options.exit_after_n_failures,
            run_results.unexpected_failures,
            run_results,
            "Exiting early after %d failures." % run_results.unexpected_failures)
        interrupt_if_at_failure_limit(
            self._options.exit_after_n_crashes_or_timeouts,
            run_results.unexpected_crashes + run_results.unexpected_timeouts,
            run_results,
            # This differs from ORWT because it does not include WebProcess crashes.
            "Exiting early after %d crashes and %d timeouts." % (run_results.unexpected_crashes, run_results.unexpected_timeouts))

    def _update_summary_with_result(self, run_results, result):
        expected = self._expectations.matches_an_expected_result(result.test_name, result.type, self._options.pixel_tests or result.reftest_type)
        exp_str = self._expectations.get_expectations_string(result.test_name)
        got_str = self._expectations.expectation_to_string(result.type)

        if result.device_failed:
            self._printer.print_finished_test(result, False, exp_str, "Aborted")
            return

        run_results.add(result, expected, self._test_is_slow(result.test_name))
        self._printer.print_finished_test(result, expected, exp_str, got_str)
        self._interrupt_if_at_failure_limits(run_results)

    def handle(self, name, source, *args):
        method = getattr(self, '_handle_' + name, None)
        if method:
            return method(source, *args)
        raise AssertionError('unknown message %s received from %s, args=%s' % (name, source, repr(args)))
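    # For example: when a Worker posts ('finished_test', result) (see
    # Worker._run_test below), it arrives here as name='finished_test', so
    # the getattr lookup resolves to self._handle_finished_test and the call
    # becomes self._handle_finished_test(source, result).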

    def _handle_started_test(self, worker_name, test_input, test_timeout_sec):
        self._printer.print_started_test(test_input.test_name)

    def _handle_finished_test_list(self, worker_name, list_name):
        pass

    def _handle_finished_test(self, worker_name, result, log_messages=None):
        self._update_summary_with_result(self._current_run_results, result)

    def _handle_device_failed(self, worker_name, list_name, remaining_tests):
        _log.warning("%s has failed" % worker_name)
        if remaining_tests:
            self._shards_to_redo.append(TestShard(list_name, remaining_tests))


class Worker(object):
    def __init__(self, caller, results_directory, options):
        self._caller = caller
        self._worker_number = caller.worker_number
        self._name = caller.name
        self._results_directory = results_directory
        self._options = options

        # The remaining fields are initialized in start()
        self._host = None
        self._port = None
        self._batch_size = None
        self._batch_count = None
        self._filesystem = None
        self._driver = None
        self._num_tests = 0

    def __del__(self):
        self.stop()

    def start(self):
        """This method is called when the object is starting to be used and it is safe
        for the object to create state that does not need to be pickled (usually this means
        it is called in a child process)."""
        self._host = self._caller.host
        self._filesystem = self._host.filesystem
        self._port = self._host.port_factory.get(self._options.platform, self._options)

        self._batch_count = 0
        self._batch_size = self._options.batch_size or 0

    def handle(self, name, source, test_list_name, test_inputs):
        assert name == 'test_list'
        for i, test_input in enumerate(test_inputs):
            device_failed = self._run_test(test_input, test_list_name)
            if device_failed:
                self._caller.post('device_failed', test_list_name, test_inputs[i:])
                self._caller.stop_running()
                return

        self._caller.post('finished_test_list', test_list_name)

    def _update_test_input(self, test_input):
        if test_input.reference_files is None:
            # Lazy initialization.
            test_input.reference_files = self._port.reference_files(test_input.test_name)
        if test_input.reference_files:
            test_input.should_run_pixel_test = True
        else:
            test_input.should_run_pixel_test = self._port.should_run_as_pixel_test(test_input)

    def _run_test(self, test_input, shard_name):
        self._batch_count += 1

        stop_when_done = False
        if self._batch_size > 0 and self._batch_count >= self._batch_size:
            self._batch_count = 0
            stop_when_done = True
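        # Worked example (assuming --batch-size=3): _batch_count reaches 3 on
        # every third test, so stop_when_done is True for tests 3, 6, 9, ...,
        # and the driver is shut down after each of those tests; the count
        # then resets to 0.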

        self._update_test_input(test_input)
        test_timeout_sec = self._timeout(test_input)
        start = time.time()
        device_failed = False

        if self._driver and self._driver.has_crashed():
            self._kill_driver()
        if not self._driver:
            self._driver = self._port.create_driver(self._worker_number)

        if not self._driver:
            # FIXME: Is this the best way to handle a device crashing in the middle of the test, or should we create
            # a new failure type?
            device_failed = True
            return device_failed

        self._caller.post('started_test', test_input, test_timeout_sec)
        result = single_test_runner.run_single_test(self._port, self._options, self._results_directory,
            self._name, self._driver, test_input, stop_when_done)

        result.shard_name = shard_name
        result.worker_name = self._name
        result.total_run_time = time.time() - start
        result.test_number = self._num_tests
        self._num_tests += 1
        self._caller.post('finished_test', result)
        self._clean_up_after_test(test_input, result)
        return result.device_failed

    def stop(self):
        _log.debug("%s cleaning up" % self._name)
        self._kill_driver()

    def _timeout(self, test_input):
        """Compute the appropriate timeout value for a test."""
        # The driver watchdog uses 2.5x the timeout; we want to be
        # larger than that. We also add a little more padding if we're
        # running tests in a separate thread.
        #
        # Note that we need to convert the test timeout from a
        # string value in milliseconds to a float for Python.

        # FIXME: Can we just return the test_input.timeout now?
        driver_timeout_sec = 3.0 * float(test_input.timeout) / 1000.0
        return driver_timeout_sec
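        # Worked example (hypothetical input): if test_input.timeout is the
        # string '6000' (i.e. 6 seconds in milliseconds), the returned value
        # is 3.0 * 6000 / 1000.0 == 18.0 seconds, comfortably above the
        # watchdog's 2.5x multiplier (15 seconds).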

    def _kill_driver(self):
        # Be careful about how and when we kill the driver; if driver.stop()
        # raises an exception, this routine may get re-entered via __del__.
        driver = self._driver
        self._driver = None
        if driver:
            _log.debug("%s killing driver" % self._name)
            driver.stop()

    def _clean_up_after_test(self, test_input, result):
        test_name = test_input.test_name

        if result.failures:
            # Check and kill the driver if we need to.
            if any(f.driver_needs_restart() for f in result.failures):
                self._kill_driver()
                # Reset the batch count since the shell just bounced.
                self._batch_count = 0

            # Print the error message(s).
            _log.debug("%s %s failed:" % (self._name, test_name))
            for f in result.failures:
                _log.debug("%s  %s" % (self._name, f.message()))
        elif result.type == test_expectations.SKIP:
            _log.debug("%s %s skipped" % (self._name, test_name))
        else:
            _log.debug("%s %s passed" % (self._name, test_name))


class TestShard(object):
    """A test shard is a named list of TestInputs."""

    def __init__(self, name, test_inputs):
        self.name = name
        self.test_inputs = test_inputs
        self.requires_lock = test_inputs[0].requires_lock

    def __repr__(self):
        return "TestShard(name='%s', test_inputs=%s, requires_lock=%s)" % (self.name, self.test_inputs, self.requires_lock)

    def __eq__(self, other):
        return self.name == other.name and self.test_inputs == other.test_inputs


class Sharder(object):
    def __init__(self, test_split_fn, max_locked_shards):
        self._split = test_split_fn
        self._max_locked_shards = max_locked_shards

    def shard_tests(self, test_inputs, num_workers, fully_parallel):
        """Groups tests into batches.

        This helps ensure that tests that depend on each other (aka bad tests!)
        continue to run together, as most cross-test dependencies tend to
        occur within the same directory.

        Returns:
            Two lists of TestShards. The first contains tests that must only be
            run under the server lock, the second can be run whenever.
        """

        # FIXME: Move all of the sharding logic out of manager into its
        # own class or module. Consider grouping it with the chunking logic
        # in prepare_lists as well.
        if num_workers == 1:
            return self._shard_in_two(test_inputs)
        elif fully_parallel:
            return self._shard_every_file(test_inputs)
        return self._shard_by_directory(test_inputs)
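    # Illustrative usage (values made up; split_test comes from the Port):
    #
    #   sharder = Sharder(port.split_test, max_locked_shards=1)
    #   locked, unlocked = sharder.shard_tests(inputs, num_workers=4,
    #                                          fully_parallel=False)
    #
    # With more than one worker and fully_parallel off, this takes the
    # _shard_by_directory path below.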

    def _shard_in_two(self, test_inputs):
        """Returns two lists of shards, one with all the tests requiring a lock and one with the rest.

        This is used when there's only one worker, to minimize the per-shard overhead."""
        locked_inputs = []
        unlocked_inputs = []
        for test_input in test_inputs:
            if test_input.requires_lock:
                locked_inputs.append(test_input)
            else:
                unlocked_inputs.append(test_input)

        locked_shards = []
        unlocked_shards = []
        if locked_inputs:
            locked_shards = [TestShard('locked_tests', locked_inputs)]
        if unlocked_inputs:
            unlocked_shards = [TestShard('unlocked_tests', unlocked_inputs)]

        return locked_shards, unlocked_shards

    def _shard_every_file(self, test_inputs):
        """Returns two lists of shards, each shard containing a single test file.

        This mode gets maximal parallelism at the cost of much higher flakiness."""
        locked_shards = []
        unlocked_shards = []
        virtual_inputs = []

        for test_input in test_inputs:
            # Note that we use a '.' for the shard name; the name doesn't really
            # matter, and the only other meaningful value would be the filename,
            # which would be really redundant.
            if test_input.requires_lock:
                locked_shards.append(TestShard('.', [test_input]))
            elif test_input.test_name.startswith('virtual'):
                # This violates the spirit of sharding every file, but in practice, since the
                # virtual test suites require a different command-line flag and thus a restart
                # of content_shell, it's too slow to shard them fully.
                virtual_inputs.append(test_input)
            else:
                unlocked_shards.append(TestShard('.', [test_input]))

        locked_virtual_shards, unlocked_virtual_shards = self._shard_by_directory(virtual_inputs)

        # The locked shards still need to be limited to self._max_locked_shards in order to not
        # overload the http server for the http tests.
        return (self._resize_shards(locked_virtual_shards + locked_shards, self._max_locked_shards, 'locked_shard'),
            unlocked_virtual_shards + unlocked_shards)
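    # Sketch of the result (hypothetical inputs): three plain tests and one
    # test requiring the lock become four single-test shards, each named '.';
    # virtual/* tests are instead grouped per directory via
    # _shard_by_directory and then folded into the locked/unlocked lists.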

    def _shard_by_directory(self, test_inputs):
        """Returns two lists of shards, each shard containing all the files in a directory.

        This is the default mode, and gets as much parallelism as we can while
        minimizing flakiness caused by inter-test dependencies."""
        locked_shards = []
        unlocked_shards = []
        unlocked_slow_shards = []
        tests_by_dir = {}
        # FIXME: Given that the tests are already sorted by directory,
        # we can probably rewrite this to be clearer and faster.
        for test_input in test_inputs:
            directory = self._split(test_input.test_name)[0]
            tests_by_dir.setdefault(directory, [])
            tests_by_dir[directory].append(test_input)

        for directory, test_inputs in tests_by_dir.iteritems():
            shard = TestShard(directory, test_inputs)
            if test_inputs[0].requires_lock:
                locked_shards.append(shard)
            # In practice, virtual test suites are slow to run. It's a bit hacky, but
            # put them first since they're the long tail of test runtime.
            elif directory.startswith('virtual'):
                unlocked_slow_shards.append(shard)
            else:
                unlocked_shards.append(shard)

        # Sort the shards by directory name.
        locked_shards.sort(key=lambda shard: shard.name)
        unlocked_slow_shards.sort(key=lambda shard: shard.name)
        unlocked_shards.sort(key=lambda shard: shard.name)

        # Put a ceiling on the number of locked shards, so that we
        # don't hammer the servers too badly.

        # FIXME: For now, limit to one shard or set it
        # with the --max-locked-shards. After testing to make sure we
        # can handle multiple shards, we should probably do something like
        # limit this to no more than a quarter of all workers, e.g.:
        # return max(math.ceil(num_workers / 4.0), 1)
        return (self._resize_shards(locked_shards, self._max_locked_shards, 'locked_shard'),
                unlocked_slow_shards + unlocked_shards)
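    # For example (assuming self._split('fast/css/a.html') returns
    # ('fast/css', 'a.html')): fast/css/a.html and fast/css/b.html both land
    # in a shard named 'fast/css', so any dependency between them still holds.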

    def _resize_shards(self, old_shards, max_new_shards, shard_name_prefix):
        """Takes a list of shards and redistributes the tests into no more
        than |max_new_shards| new shards."""

        # This implementation assumes that each input shard only contains tests from a
        # single directory, and that tests in each shard must remain together; as a
        # result, a given input shard is never split between output shards.
        #
        # Each output shard contains the tests from one or more input shards and
        # hence may contain tests from multiple directories.

        def divide_and_round_up(numerator, divisor):
            return int(math.ceil(float(numerator) / divisor))

        def extract_and_flatten(shards):
            test_inputs = []
            for shard in shards:
                test_inputs.extend(shard.test_inputs)
            return test_inputs

        def split_at(seq, index):
            return (seq[:index], seq[index:])

        num_old_per_new = divide_and_round_up(len(old_shards), max_new_shards)
        new_shards = []
        remaining_shards = old_shards
        while remaining_shards:
            some_shards, remaining_shards = split_at(remaining_shards, num_old_per_new)
            new_shards.append(TestShard('%s_%d' % (shard_name_prefix, len(new_shards) + 1), extract_and_flatten(some_shards)))
        return new_shards
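    # Worked example (hypothetical counts): resizing 10 single-directory
    # shards with max_new_shards=4 gives divide_and_round_up(10, 4) == 3, so
    # old shards are consumed three at a time, yielding four new shards
    # (locked_shard_1 .. locked_shard_4) that hold the tests of 3, 3, 3, and
    # 1 input shards respectively.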
    505