# Copyright (C) 2011 Google Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import logging
import math
import threading
import time

from webkitpy.common import message_pool
from webkitpy.layout_tests.controllers import single_test_runner
from webkitpy.layout_tests.models.test_run_results import TestRunResults
from webkitpy.layout_tests.models import test_expectations
from webkitpy.layout_tests.models import test_failures
from webkitpy.layout_tests.models import test_results
from webkitpy.tool import grammar


_log = logging.getLogger(__name__)


TestExpectations = test_expectations.TestExpectations

# Export this so callers don't need to know about message pools.
WorkerException = message_pool.WorkerException


class TestRunInterruptedException(Exception):
    """Raised when a test run should be stopped immediately.

    The human-readable explanation is stored on both `reason` and `msg`.
    """

    def __init__(self, reason):
        Exception.__init__(self)
        self.reason = reason
        self.msg = reason

    def __reduce__(self):
        # Rebuild the exception from `reason` when it is pickled; the default
        # reduction would call the constructor with the (empty) Exception args.
        return self.__class__, (self.reason,)


class LayoutTestRunner(object):
    """Shards test inputs, runs them on a pool of Workers and collects results.

    Worker messages are delivered to handle(), which dispatches them to the
    matching `_handle_<message name>` method.
    """

    def __init__(self, options, port, printer, results_directory, test_is_slow_fn):
        """Store the collaborators used for every run.

        Args:
            options: parsed options object; this class reads attributes such as
                max_locked_shards, child_processes, fully_parallel, run_singly,
                batch_size, dry_run and the exit_after_n_* limits from it.
            port: port object; provides split_test and host.filesystem.
            printer: object used to report progress.
            results_directory: directory passed on to the workers.
            test_is_slow_fn: callable taking a test name; its result is passed
                as `test_is_slow` when results are recorded.
        """
        self._options = options
        self._port = port
        self._printer = printer
        self._results_directory = results_directory
        self._test_is_slow = test_is_slow_fn
        self._sharder = Sharder(self._port.split_test, self._options.max_locked_shards)
        self._filesystem = self._port.host.filesystem

        self._expectations = None
        self._test_inputs = []
        self._retrying = False
        # Shards whose worker reported a device failure; reset by run_tests().
        self._shards_to_redo = []

        self._current_run_results = None

    def run_tests(self, expectations, test_inputs, tests_to_skip, num_workers, retrying):
        """Run `test_inputs` and return the populated TestRunResults.

        Args:
            expectations: expectations object used to classify results.
            test_inputs: list of test inputs to run.
            tests_to_skip: test names that are recorded as SKIP without running.
            num_workers: upper bound on the number of workers; it is capped at
                the number of shards.
            retrying: True when this is a retry pass (workers then write into a
                'retries' subdirectory and expected counts are not re-printed).

        Returns:
            The TestRunResults for this run. With options.dry_run set, it is
            returned after sharding, before any worker is started.

        Raises:
            Any exception other than TestRunInterruptedException and
            KeyboardInterrupt raised while the pool runs is logged and re-raised.
        """
        self._expectations = expectations
        self._test_inputs = test_inputs
        self._retrying = retrying
        self._shards_to_redo = []

        # FIXME: rename all variables to test_run_results or some such ...
        run_results = TestRunResults(self._expectations, len(test_inputs) + len(tests_to_skip))
        self._current_run_results = run_results
        self._printer.num_tests = len(test_inputs)
        self._printer.num_completed = 0

        if not retrying:
            self._printer.print_expected(run_results, self._expectations.get_tests_with_result_type)

        for test_name in set(tests_to_skip):
            result = test_results.TestResult(test_name)
            result.type = test_expectations.SKIP
            run_results.add(result, expected=True, test_is_slow=self._test_is_slow(test_name))

        self._printer.write_update('Sharding tests ...')
        locked_shards, unlocked_shards = self._sharder.shard_tests(
            test_inputs,
            int(self._options.child_processes),
            self._options.fully_parallel,
            self._options.run_singly or (self._options.batch_size == 1))

        # We don't have a good way to coordinate the workers so that they don't
        # try to run the shards that need a lock. The easiest solution is to
        # run all of the locked shards first.
        all_shards = locked_shards + unlocked_shards
        num_workers = min(num_workers, len(all_shards))
        self._printer.print_workers_and_shards(num_workers, len(all_shards), len(locked_shards))

        if self._options.dry_run:
            return run_results

        self._printer.write_update('Starting %s ...' % grammar.pluralize('worker', num_workers))

        start_time = time.time()
        try:
            with message_pool.get(self, self._worker_factory, num_workers, self._port.host) as pool:
                pool.run(('test_list', shard.name, shard.test_inputs) for shard in all_shards)

            if self._shards_to_redo:
                # One shard is queued for redo per 'device_failed' message, so
                # the second pass runs with that many fewer workers.
                num_workers -= len(self._shards_to_redo)
                if num_workers > 0:
                    with message_pool.get(self, self._worker_factory, num_workers, self._port.host) as pool:
                        pool.run(('test_list', shard.name, shard.test_inputs) for shard in self._shards_to_redo)
        except TestRunInterruptedException as e:
            _log.warning(e.reason)
            run_results.interrupted = True
        except KeyboardInterrupt:
            self._printer.flush()
            self._printer.writeln('Interrupted, exiting ...')
            run_results.keyboard_interrupted = True
        except Exception as e:
            _log.debug('%s("%s") raised, exiting' % (e.__class__.__name__, str(e)))
            raise
        finally:
            run_results.run_time = time.time() - start_time

        return run_results

    def _worker_factory(self, worker_connection):
        """Create a Worker for `worker_connection`.

        When retrying, results go to a 'retries' subdirectory of the results
        directory, which is created if necessary.
        """
        results_directory = self._results_directory
        if self._retrying:
            results_directory = self._filesystem.join(self._results_directory, 'retries')
            self._filesystem.maybe_make_directory(results_directory)
        return Worker(worker_connection, results_directory, self._options)

    def _mark_interrupted_tests_as_skipped(self, run_results):
        """Record a FailureEarlyExit result for every test that has no result yet."""
        for test_input in self._test_inputs:
            if test_input.test_name not in run_results.results_by_name:
                result = test_results.TestResult(test_input.test_name, [test_failures.FailureEarlyExit()])
                # FIXME: We probably need to loop here if there are multiple iterations.
                # FIXME: Also, these results are really neither expected nor unexpected. We probably
                # need a third type of result.
                run_results.add(result, expected=False, test_is_slow=self._test_is_slow(test_input.test_name))

    def _interrupt_if_at_failure_limits(self, run_results):
        """Raise TestRunInterruptedException once a configured failure limit is hit.

        Checks options.exit_after_n_failures against the unexpected failure
        count, then options.exit_after_n_crashes_or_timeouts against the sum of
        unexpected crashes and timeouts. A falsy limit disables the check.
        """
        # Note: The messages in this method are constructed to match old-run-webkit-tests
        # so that existing buildbot grep rules work.
        def interrupt_if_at_failure_limit(limit, failure_count, run_results, message):
            if limit and failure_count >= limit:
                # The count is taken before the remaining tests are marked, so
                # it only reflects tests that actually ran.
                message += " %d tests run." % (run_results.expected + run_results.unexpected)
                self._mark_interrupted_tests_as_skipped(run_results)
                raise TestRunInterruptedException(message)

        interrupt_if_at_failure_limit(
            self._options.exit_after_n_failures,
            run_results.unexpected_failures,
            run_results,
            "Exiting early after %d failures." % run_results.unexpected_failures)
        interrupt_if_at_failure_limit(
            self._options.exit_after_n_crashes_or_timeouts,
            run_results.unexpected_crashes + run_results.unexpected_timeouts,
            run_results,
            # This differs from ORWT because it does not include WebProcess crashes.
            "Exiting early after %d crashes and %d timeouts." % (run_results.unexpected_crashes, run_results.unexpected_timeouts))

    def _update_summary_with_result(self, run_results, result):
        """Fold one finished test result into `run_results` and report it.

        A result flagged `device_failed` is only printed as "Aborted"; it is not
        added to `run_results` and does not count towards the failure limits.
        """
        expected = self._expectations.matches_an_expected_result(
            result.test_name, result.type,
            self._options.pixel_tests or result.reftest_type,
            self._options.enable_sanitizer)
        exp_str = self._expectations.get_expectations_string(result.test_name)
        got_str = self._expectations.expectation_to_string(result.type)

        if result.device_failed:
            self._printer.print_finished_test(result, False, exp_str, "Aborted")
            return

        run_results.add(result, expected, self._test_is_slow(result.test_name))
        self._printer.print_finished_test(result, expected, exp_str, got_str)
        self._interrupt_if_at_failure_limits(run_results)

    def handle(self, name, source, *args):
        """Dispatch the message `name` from `source` to `_handle_<name>`.

        Returns:
            Whatever the handler returns.

        Raises:
            AssertionError: no `_handle_<name>` method exists.
        """
        # The default of None keeps an unknown message from surfacing as a bare
        # AttributeError, so the descriptive error below is actually reachable.
        method = getattr(self, '_handle_' + name, None)
        if method is None:
            raise AssertionError('unknown message %s received from %s, args=%s' % (name, source, repr(args)))
        return method(source, *args)

    def _handle_started_test(self, worker_name, test_input, test_timeout_sec):
        """Report that a worker started `test_input`; the timeout is unused here."""
        self._printer.print_started_test(test_input.test_name)

    def _handle_finished_test_list(self, worker_name, list_name):
        """Nothing to do when a worker finishes a shard."""
        pass

    def _handle_finished_test(self, worker_name, result, log_messages=None):
        """Record `result` in the current run; `log_messages` is accepted but unused."""
        self._update_summary_with_result(self._current_run_results, result)

    def _handle_device_failed(self, worker_name, list_name, remaining_tests):
        """Queue the unfinished tests of a failed worker to be run again."""
        _log.warning("%s has failed" % worker_name)
        if remaining_tests:
            self._shards_to_redo.append(TestShard(list_name, remaining_tests))


class Worker(object):
    """Runs the tests of one shard at a time through a single driver."""

    def __init__(self, caller, results_directory, options):
        """Store construction-time state; start() initializes the rest.

        Args:
            caller: connection object; provides worker_number, name, host,
                post() and stop_running().
            results_directory: directory passed to the single-test runner.
            options: parsed options object (platform, batch_size, ...).
        """
        self._caller = caller
        self._worker_number = caller.worker_number
        self._name = caller.name
        self._results_directory = results_directory
        self._options = options

        # The remaining fields are initialized in start()
        self._host = None
        self._port = None
        self._batch_size = None
        self._batch_count = None
        self._filesystem = None
        self._driver = None
        self._num_tests = 0

    def __del__(self):
        self.stop()

    def start(self):
        """This method is called when the object is starting to be used and it is safe
        for the object to create state that does not need to be pickled (usually this means
        it is called in a child process)."""
        self._host = self._caller.host
        self._filesystem = self._host.filesystem
        self._port = self._host.port_factory.get(self._options.platform, self._options)

        self._batch_count = 0
        self._batch_size = self._options.batch_size or 0

    def handle(self, name, source, test_list_name, test_inputs):
        """Run every test of the 'test_list' message, in order.

        If a test reports a device failure, the tests from that one onwards are
        posted back in a 'device_failed' message and this worker stops running.
        Otherwise 'finished_test_list' is posted once all tests have run.
        """
        assert name == 'test_list'
        for i, test_input in enumerate(test_inputs):
            device_failed = self._run_test(test_input, test_list_name)
            if device_failed:
                # test_inputs[i:] deliberately includes the test that failed.
                self._caller.post('device_failed', test_list_name, test_inputs[i:])
                self._caller.stop_running()
                return

        self._caller.post('finished_test_list', test_list_name)

    def _update_test_input(self, test_input):
        """Fill in `reference_files` (if unset) and `should_run_pixel_test`."""
        if test_input.reference_files is None:
            # Lazy initialization.
            test_input.reference_files = self._port.reference_files(test_input.test_name)
        if test_input.reference_files:
            test_input.should_run_pixel_test = True
        else:
            test_input.should_run_pixel_test = self._port.should_run_as_pixel_test(test_input)

    def _run_test(self, test_input, shard_name):
        """Run one test and post its result.

        Returns:
            True if no driver could be created; otherwise the result's
            `device_failed` flag.
        """
        self._batch_count += 1

        # Once batch_size tests have run, ask the runner to stop the driver
        # after this test.
        stop_when_done = False
        if self._batch_size > 0 and self._batch_count >= self._batch_size:
            self._batch_count = 0
            stop_when_done = True

        self._update_test_input(test_input)
        test_timeout_sec = self._timeout(test_input)
        start = time.time()

        if self._driver and self._driver.has_crashed():
            self._kill_driver()
        if not self._driver:
            self._driver = self._port.create_driver(self._worker_number)

        if not self._driver:
            # FIXME: Is this the best way to handle a device crashing in the middle of the test, or should we create
            # a new failure type?
            return True

        self._caller.post('started_test', test_input, test_timeout_sec)
        result = single_test_runner.run_single_test(
            self._port, self._options, self._results_directory,
            self._name, self._driver, test_input, stop_when_done)

        result.shard_name = shard_name
        result.worker_name = self._name
        result.total_run_time = time.time() - start
        result.test_number = self._num_tests
        self._num_tests += 1
        self._caller.post('finished_test', result)
        self._clean_up_after_test(test_input, result)
        return result.device_failed

    def stop(self):
        """Log the shutdown and stop the driver, if any."""
        _log.debug("%s cleaning up" % self._name)
        self._kill_driver()

    def _timeout(self, test_input):
        """Compute the appropriate timeout value for a test.

        Returns:
            The timeout in seconds as a float: three times `test_input.timeout`,
            which is given in milliseconds.
        """
        # The driver watchdog uses 2.5x the timeout; we want to be
        # larger than that.
        #
        # Note that we need to convert the test timeout from a
        # string value in milliseconds to a float for Python.

        # FIXME: Can we just return the test_input.timeout now?
        driver_timeout_sec = 3.0 * float(test_input.timeout) / 1000.0
        return driver_timeout_sec

    def _kill_driver(self):
        """Stop the current driver, if any, and forget it."""
        # Be careful about how and when we kill the driver; if driver.stop()
        # raises an exception, this routine may get re-entered via __del__.
        driver = self._driver
        self._driver = None
        if driver:
            _log.debug("%s killing driver" % self._name)
            driver.stop()

    def _clean_up_after_test(self, test_input, result):
        """Restart the driver if a failure requires it and log the outcome."""
        test_name = test_input.test_name

        if result.failures:
            # Check and kill the driver if we need to.
            if any(f.driver_needs_restart() for f in result.failures):
                self._kill_driver()
                # Reset the batch count since the shell just bounced.
                self._batch_count = 0

            # Print the error message(s).
            _log.debug("%s %s failed:" % (self._name, test_name))
            for f in result.failures:
                _log.debug("%s %s" % (self._name, f.message()))
        elif result.type == test_expectations.SKIP:
            _log.debug("%s %s skipped" % (self._name, test_name))
        else:
            _log.debug("%s %s passed" % (self._name, test_name))


class TestShard(object):
    """A test shard is a named list of TestInputs."""

    def __init__(self, name, test_inputs):
        """`test_inputs` must be non-empty: the first entry decides requires_lock."""
        self.name = name
        self.test_inputs = test_inputs
        self.requires_lock = test_inputs[0].requires_lock

    def __repr__(self):
        return "TestShard(name='%s', test_inputs=%s, requires_lock=%s)" % (self.name, self.test_inputs, self.requires_lock)

    def __eq__(self, other):
        return self.name == other.name and self.test_inputs == other.test_inputs

    def __ne__(self, other):
        # Python 2 does not derive != from ==; keep the two consistent.
        return not self == other


class Sharder(object):
    """Groups test inputs into locked and unlocked TestShards."""

    def __init__(self, test_split_fn, max_locked_shards):
        """
        Args:
            test_split_fn: callable applied to a test name; element 0 of its
                result is used as the test's directory.
            max_locked_shards: maximum number of locked shards to produce.
        """
        self._split = test_split_fn
        self._max_locked_shards = max_locked_shards

    def shard_tests(self, test_inputs, num_workers, fully_parallel, run_singly):
        """Groups tests into batches.
        This helps ensure that tests that depend on each other (aka bad tests!)
        continue to run together as most cross-tests dependencies tend to
        occur within the same directory.
        Return:
            Two lists of TestShards. The first contains tests that must only be
            run under the server lock, the second can be run whenever.
        """

        # FIXME: Move all of the sharding logic out of manager into its
        # own class or module. Consider grouping it with the chunking logic
        # in prepare_lists as well.
        if num_workers == 1:
            return self._shard_in_two(test_inputs)
        elif fully_parallel:
            return self._shard_every_file(test_inputs, run_singly)
        return self._shard_by_directory(test_inputs)

    def _shard_in_two(self, test_inputs):
        """Returns two lists of shards, one with all the tests requiring a lock and one with the rest.

        This is used when there's only one worker, to minimize the per-shard overhead."""
        locked_inputs = []
        unlocked_inputs = []
        for test_input in test_inputs:
            if test_input.requires_lock:
                locked_inputs.append(test_input)
            else:
                unlocked_inputs.append(test_input)

        locked_shards = []
        unlocked_shards = []
        if locked_inputs:
            locked_shards = [TestShard('locked_tests', locked_inputs)]
        if unlocked_inputs:
            unlocked_shards = [TestShard('unlocked_tests', unlocked_inputs)]

        return locked_shards, unlocked_shards

    def _shard_every_file(self, test_inputs, run_singly):
        """Returns two lists of shards, each shard containing a single test file.

        This mode gets maximal parallelism at the cost of much higher flakiness."""
        locked_shards = []
        unlocked_shards = []
        virtual_inputs = []

        for test_input in test_inputs:
            # Note that we use a '.' for the shard name; the name doesn't really
            # matter, and the only other meaningful value would be the filename,
            # which would be really redundant.
            if test_input.requires_lock:
                locked_shards.append(TestShard('.', [test_input]))
            elif test_input.test_name.startswith('virtual') and not run_singly:
                # This violates the spirit of sharding every file, but in practice, since the
                # virtual test suites require a different commandline flag and thus a restart
                # of content_shell, it's too slow to shard them fully.
                virtual_inputs.append(test_input)
            else:
                unlocked_shards.append(TestShard('.', [test_input]))

        locked_virtual_shards, unlocked_virtual_shards = self._shard_by_directory(virtual_inputs)

        # The locked shards still need to be limited to self._max_locked_shards in order to not
        # overload the http server for the http tests.
        return (self._resize_shards(locked_virtual_shards + locked_shards, self._max_locked_shards, 'locked_shard'),
                unlocked_virtual_shards + unlocked_shards)

    def _shard_by_directory(self, test_inputs):
        """Returns two lists of shards, each shard containing all the files in a directory.

        This is the default mode, and gets as much parallelism as we can while
        minimizing flakiness caused by inter-test dependencies."""
        locked_shards = []
        unlocked_shards = []
        unlocked_slow_shards = []
        tests_by_dir = {}
        # FIXME: Given that the tests are already sorted by directory,
        # we can probably rewrite this to be clearer and faster.
        for test_input in test_inputs:
            directory = self._split(test_input.test_name)[0]
            tests_by_dir.setdefault(directory, []).append(test_input)

        for directory, inputs_in_dir in tests_by_dir.items():
            shard = TestShard(directory, inputs_in_dir)
            if inputs_in_dir[0].requires_lock:
                locked_shards.append(shard)
            # In practice, virtual test suites are slow to run. It's a bit hacky, but
            # put them first since they're the long-tail of test runtime.
            elif directory.startswith('virtual'):
                unlocked_slow_shards.append(shard)
            else:
                unlocked_shards.append(shard)

        # Sort the shards by directory name.
        locked_shards.sort(key=lambda shard: shard.name)
        unlocked_slow_shards.sort(key=lambda shard: shard.name)
        unlocked_shards.sort(key=lambda shard: shard.name)

        # Put a ceiling on the number of locked shards, so that we
        # don't hammer the servers too badly.

        # FIXME: For now, limit to one shard or set it
        # with the --max-locked-shards. After testing to make sure we
        # can handle multiple shards, we should probably do something like
        # limit this to no more than a quarter of all workers, e.g.:
        # return max(math.ceil(num_workers / 4.0), 1)
        return (self._resize_shards(locked_shards, self._max_locked_shards, 'locked_shard'),
                unlocked_slow_shards + unlocked_shards)

    def _resize_shards(self, old_shards, max_new_shards, shard_name_prefix):
        """Takes a list of shards and redistributes the tests into no more
        than |max_new_shards| new shards.

        New shards are named '<shard_name_prefix>_<n>', counting from 1. An
        empty `old_shards` yields an empty list.
        """

        # This implementation assumes that each input shard only contains tests from a
        # single directory, and that tests in each shard must remain together; as a
        # result, a given input shard is never split between output shards.
        #
        # Each output shard contains the tests from one or more input shards and
        # hence may contain tests from multiple directories.

        def divide_and_round_up(numerator, divisor):
            return int(math.ceil(float(numerator) / divisor))

        def extract_and_flatten(shards):
            test_inputs = []
            for shard in shards:
                test_inputs.extend(shard.test_inputs)
            return test_inputs

        def split_at(seq, index):
            return (seq[:index], seq[index:])

        num_old_per_new = divide_and_round_up(len(old_shards), max_new_shards)
        new_shards = []
        remaining_shards = old_shards
        while remaining_shards:
            some_shards, remaining_shards = split_at(remaining_shards, num_old_per_new)
            new_shards.append(TestShard('%s_%d' % (shard_name_prefix, len(new_shards) + 1), extract_and_flatten(some_shards)))
        return new_shards