# Copyright (C) 2011 Google Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import logging
import math
import threading
import time

from webkitpy.common import message_pool
from webkitpy.layout_tests.controllers import single_test_runner
from webkitpy.layout_tests.models.test_run_results import TestRunResults
from webkitpy.layout_tests.models import test_expectations
from webkitpy.layout_tests.models import test_failures
from webkitpy.layout_tests.models import test_results
from webkitpy.tool import grammar


_log = logging.getLogger(__name__)


TestExpectations = test_expectations.TestExpectations

# Export this so callers don't need to know about message pools.
WorkerException = message_pool.WorkerException


class TestRunInterruptedException(Exception):
    """Raised when a test run should be stopped immediately."""
    def __init__(self, reason):
        Exception.__init__(self)
        self.reason = reason
        self.msg = reason

    def __reduce__(self):
        return self.__class__, (self.reason,)
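
# A note on __reduce__ above: the message pool pickles exceptions raised in
# worker processes so they can be re-raised in the manager, and default
# pickling fails for Exception subclasses whose __init__ takes required
# arguments (Exception.__init__ is called with no args, so self.args is
# empty). A minimal sanity-check sketch, stdlib only, not run by the harness:
#
#   import pickle
#   err = TestRunInterruptedException('hit the failure limit')
#   assert pickle.loads(pickle.dumps(err)).reason == 'hit the failure limit'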


class LayoutTestRunner(object):
    def __init__(self, options, port, printer, results_directory, test_is_slow_fn):
        self._options = options
        self._port = port
        self._printer = printer
        self._results_directory = results_directory
        self._test_is_slow = test_is_slow_fn
        self._sharder = Sharder(self._port.split_test, self._options.max_locked_shards)
        self._filesystem = self._port.host.filesystem

        self._expectations = None
        self._test_inputs = []
        self._retrying = False

        self._current_run_results = None

    def run_tests(self, expectations, test_inputs, tests_to_skip, num_workers, retrying):
        self._expectations = expectations
        self._test_inputs = test_inputs
        self._retrying = retrying

        # FIXME: rename all variables to test_run_results or some such ...
        run_results = TestRunResults(self._expectations, len(test_inputs) + len(tests_to_skip))
        self._current_run_results = run_results
        self._printer.num_tests = len(test_inputs)
        self._printer.num_completed = 0

        if not retrying:
            self._printer.print_expected(run_results, self._expectations.get_tests_with_result_type)

        for test_name in set(tests_to_skip):
            result = test_results.TestResult(test_name)
            result.type = test_expectations.SKIP
            run_results.add(result, expected=True, test_is_slow=self._test_is_slow(test_name))

        self._printer.write_update('Sharding tests ...')
        locked_shards, unlocked_shards = self._sharder.shard_tests(test_inputs, int(self._options.child_processes), self._options.fully_parallel)

        # We don't have a good way to coordinate the workers so that they don't
        # try to run the shards that need a lock. The easiest solution is to
        # run all of the locked shards first.
        all_shards = locked_shards + unlocked_shards
        num_workers = min(num_workers, len(all_shards))
        self._printer.print_workers_and_shards(num_workers, len(all_shards), len(locked_shards))

        if self._options.dry_run:
            return run_results

        self._printer.write_update('Starting %s ...' % grammar.pluralize('worker', num_workers))

        start_time = time.time()
        try:
            with message_pool.get(self, self._worker_factory, num_workers, self._port.worker_startup_delay_secs(), self._port.host) as pool:
                pool.run(('test_list', shard.name, shard.test_inputs) for shard in all_shards)
        except TestRunInterruptedException as e:
            _log.warning(e.reason)
            run_results.interrupted = True
        except KeyboardInterrupt:
            self._printer.flush()
            self._printer.writeln('Interrupted, exiting ...')
            raise
        except Exception as e:
            _log.debug('%s("%s") raised, exiting' % (e.__class__.__name__, str(e)))
            raise
        finally:
            run_results.run_time = time.time() - start_time

        return run_results
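
    # For example, if sharding yields [http/tests] (locked) and [fast/dom,
    # fast/js] (unlocked), all_shards above is ordered
    # [http/tests, fast/dom, fast/js], so the shards that need the server
    # lock are handed to workers first and the lock is held for as short a
    # window as possible. (The directory names here are invented for
    # illustration.)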

    def _worker_factory(self, worker_connection):
        results_directory = self._results_directory
        if self._retrying:
            self._filesystem.maybe_make_directory(self._filesystem.join(self._results_directory, 'retries'))
            results_directory = self._filesystem.join(self._results_directory, 'retries')
        return Worker(worker_connection, results_directory, self._options)

    def _mark_interrupted_tests_as_skipped(self, run_results):
        for test_input in self._test_inputs:
            if test_input.test_name not in run_results.results_by_name:
                result = test_results.TestResult(test_input.test_name, [test_failures.FailureEarlyExit()])
                # FIXME: We probably need to loop here if there are multiple iterations.
                # FIXME: Also, these results are really neither expected nor unexpected. We probably
                # need a third type of result.
                run_results.add(result, expected=False, test_is_slow=self._test_is_slow(test_input.test_name))

    def _interrupt_if_at_failure_limits(self, run_results):
        # Note: The messages in this method are constructed to match old-run-webkit-tests
        # so that existing buildbot grep rules work.
        def interrupt_if_at_failure_limit(limit, failure_count, run_results, message):
            if limit and failure_count >= limit:
                message += " %d tests run." % (run_results.expected + run_results.unexpected)
                self._mark_interrupted_tests_as_skipped(run_results)
                raise TestRunInterruptedException(message)

        interrupt_if_at_failure_limit(
            self._options.exit_after_n_failures,
            run_results.unexpected_failures,
            run_results,
            "Exiting early after %d failures." % run_results.unexpected_failures)
        interrupt_if_at_failure_limit(
            self._options.exit_after_n_crashes_or_timeouts,
            run_results.unexpected_crashes + run_results.unexpected_timeouts,
            run_results,
            # This differs from ORWT because it does not include WebProcess crashes.
            "Exiting early after %d crashes and %d timeouts." % (run_results.unexpected_crashes, run_results.unexpected_timeouts))

    def _update_summary_with_result(self, run_results, result):
        expected = self._expectations.matches_an_expected_result(result.test_name, result.type, self._options.pixel_tests or result.reftest_type)
        exp_str = self._expectations.get_expectations_string(result.test_name)
        got_str = self._expectations.expectation_to_string(result.type)

        run_results.add(result, expected, self._test_is_slow(result.test_name))

        self._printer.print_finished_test(result, expected, exp_str, got_str)

        self._interrupt_if_at_failure_limits(run_results)

    def handle(self, name, source, *args):
        # Pass a default to getattr so an unknown message raises the
        # AssertionError below instead of an opaque AttributeError.
        method = getattr(self, '_handle_' + name, None)
        if method:
            return method(source, *args)
        raise AssertionError('unknown message %s received from %s, args=%s' % (name, source, repr(args)))

    def _handle_started_test(self, worker_name, test_input, test_timeout_sec):
        self._printer.print_started_test(test_input.test_name)

    def _handle_finished_test_list(self, worker_name, list_name):
        pass

    def _handle_finished_test(self, worker_name, result, log_messages=None):
        self._update_summary_with_result(self._current_run_results, result)
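
# A sketch of the manager/worker message traffic that the two handle()
# methods dispatch on. The message names are the real ones used in this
# file; the payload values are invented for illustration:
#
#   manager -> worker:  ('test_list', 'fast/dom', [test_input, ...])
#   worker  -> manager: ('started_test', test_input, 18.0)
#   worker  -> manager: ('finished_test', result)
#   worker  -> manager: ('finished_test_list', 'fast/dom')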

class Worker(object):
    def __init__(self, caller, results_directory, options):
        self._caller = caller
        self._worker_number = caller.worker_number
        self._name = caller.name
        self._results_directory = results_directory
        self._options = options

        # The remaining fields are initialized in start()
        self._host = None
        self._port = None
        self._batch_size = None
        self._batch_count = None
        self._filesystem = None
        self._driver = None
        self._num_tests = 0

    def __del__(self):
        self.stop()

    def start(self):
        """This method is called when the object is starting to be used and it is safe
        for the object to create state that does not need to be pickled (usually this means
        it is called in a child process)."""
        self._host = self._caller.host
        self._filesystem = self._host.filesystem
        self._port = self._host.port_factory.get(self._options.platform, self._options)

        self._batch_count = 0
        self._batch_size = self._options.batch_size or 0

    def handle(self, name, source, test_list_name, test_inputs):
        assert name == 'test_list'
        for test_input in test_inputs:
            self._run_test(test_input, test_list_name)
        self._caller.post('finished_test_list', test_list_name)

    def _update_test_input(self, test_input):
        if test_input.reference_files is None:
            # Lazy initialization.
            test_input.reference_files = self._port.reference_files(test_input.test_name)
        if test_input.reference_files:
            test_input.should_run_pixel_test = True
        else:
            test_input.should_run_pixel_test = self._port.should_run_as_pixel_test(test_input)

    def _run_test(self, test_input, shard_name):
        self._batch_count += 1

        stop_when_done = False
        if self._batch_size > 0 and self._batch_count >= self._batch_size:
            self._batch_count = 0
            stop_when_done = True

        self._update_test_input(test_input)
        test_timeout_sec = self._timeout(test_input)
        start = time.time()
        self._caller.post('started_test', test_input, test_timeout_sec)

        result = self._run_test_with_timeout(test_input, test_timeout_sec, stop_when_done)
        result.shard_name = shard_name
        result.worker_name = self._name
        result.total_run_time = time.time() - start
        result.test_number = self._num_tests
        self._num_tests += 1

        self._caller.post('finished_test', result)

        self._clean_up_after_test(test_input, result)

    def stop(self):
        _log.debug("%s cleaning up" % self._name)
        self._kill_driver()

    def _timeout(self, test_input):
        """Compute the appropriate timeout value for a test."""
        # The driver watchdog uses 2.5x the timeout; we want to be
        # larger than that. We also add a little more padding if we're
        # running tests in a separate thread.
        #
        # Note that we need to convert the test timeout from a string
        # value in milliseconds to a float value in seconds.
        driver_timeout_sec = 3.0 * float(test_input.timeout) / 1000.0
        if not self._options.run_singly:
            return driver_timeout_sec

        thread_padding_sec = 1.0
        thread_timeout_sec = driver_timeout_sec + thread_padding_sec
        return thread_timeout_sec
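
    # Worked example: with a test_input.timeout of '6000' (6000 ms), the
    # driver watchdog fires at 2.5 * 6.0 = 15.0 seconds, _timeout() returns
    # 3.0 * 6.0 = 18.0 seconds, and under --run-singly it returns
    # 18.0 + 1.0 = 19.0 seconds, so the watching thread always outlasts
    # the watchdog.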

    def _kill_driver(self):
        # Be careful about how and when we kill the driver; if driver.stop()
        # raises an exception, this routine may get re-entered via __del__.
        driver = self._driver
        self._driver = None
        if driver:
            _log.debug("%s killing driver" % self._name)
            driver.stop()

    def _run_test_with_timeout(self, test_input, timeout, stop_when_done):
        if self._options.run_singly:
            return self._run_test_in_another_thread(test_input, timeout, stop_when_done)
        return self._run_test_in_this_thread(test_input, stop_when_done)

    def _clean_up_after_test(self, test_input, result):
        test_name = test_input.test_name

        if result.failures:
            # Check and kill the driver if we need to.
            if any(f.driver_needs_restart() for f in result.failures):
                self._kill_driver()
                # Reset the batch count since the shell just bounced.
                self._batch_count = 0

            # Print the error message(s).
            _log.debug("%s %s failed:" % (self._name, test_name))
            for f in result.failures:
                _log.debug("%s %s" % (self._name, f.message()))
        elif result.type == test_expectations.SKIP:
            _log.debug("%s %s skipped" % (self._name, test_name))
        else:
            _log.debug("%s %s passed" % (self._name, test_name))

    def _run_test_in_another_thread(self, test_input, thread_timeout_sec, stop_when_done):
        """Run a test in a separate thread, enforcing a hard time limit.

        Since we can only detect the termination of a thread, not any internal
        state or progress, we can only run per-test timeouts when running test
        files singly.

        Args:
            test_input: Object containing the test filename and timeout.
            thread_timeout_sec: Time to wait before killing the driver process.
        Returns:
            A TestResult.
        """
        worker = self

        driver = self._port.create_driver(self._worker_number)

        class SingleTestThread(threading.Thread):
            def __init__(self):
                threading.Thread.__init__(self)
                self.result = None

            def run(self):
                self.result = worker._run_single_test(driver, test_input, stop_when_done)

        thread = SingleTestThread()
        thread.start()
        thread.join(thread_timeout_sec)
        result = thread.result
        failures = []
        if thread.isAlive():
            # If join() returned with the thread still running, the
            # driver is completely hung and there's nothing
            # more we can do with it. We have to kill all the
            # drivers to free it up. If we're running more than
            # one driver thread, we'll end up killing the other
            # drivers too, introducing spurious crashes. We accept
            # that tradeoff in order to avoid losing the rest of this
            # thread's results.
            _log.error('Test thread hung: killing all drivers')
            failures = [test_failures.FailureTimeout()]

        driver.stop()

        if not result:
            result = test_results.TestResult(test_input.test_name, failures=failures, test_run_time=0)
        return result

    def _run_test_in_this_thread(self, test_input, stop_when_done):
        """Run a single test file using a shared driver process.

        Args:
            test_input: Object containing the test filename, uri and timeout.

        Returns: a TestResult object.
        """
        if self._driver and self._driver.has_crashed():
            self._kill_driver()
        if not self._driver:
            self._driver = self._port.create_driver(self._worker_number)
        return self._run_single_test(self._driver, test_input, stop_when_done)

    def _run_single_test(self, driver, test_input, stop_when_done):
        return single_test_runner.run_single_test(self._port, self._options, self._results_directory,
            self._name, driver, test_input, stop_when_done)


class TestShard(object):
    """A test shard is a named list of TestInputs."""

    def __init__(self, name, test_inputs):
        self.name = name
        self.test_inputs = test_inputs
        self.requires_lock = test_inputs[0].requires_lock

    def __repr__(self):
        return "TestShard(name='%s', test_inputs=%s, requires_lock=%s)" % (self.name, self.test_inputs, self.requires_lock)

    def __eq__(self, other):
        return self.name == other.name and self.test_inputs == other.test_inputs


class Sharder(object):
    def __init__(self, test_split_fn, max_locked_shards):
        self._split = test_split_fn
        self._max_locked_shards = max_locked_shards

    def shard_tests(self, test_inputs, num_workers, fully_parallel):
        """Groups tests into batches.

        This helps ensure that tests that depend on each other (aka bad tests!)
        continue to run together, as most cross-test dependencies tend to
        occur within the same directory.

        Returns:
            Two lists of TestShards. The first contains tests that must only be
            run under the server lock; the second can be run whenever.
        """
        # FIXME: Move all of the sharding logic out of manager into its
        # own class or module. Consider grouping it with the chunking logic
        # in prepare_lists as well.
        if num_workers == 1:
            return self._shard_in_two(test_inputs)
        elif fully_parallel:
            return self._shard_every_file(test_inputs)
        return self._shard_by_directory(test_inputs, num_workers)
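
    # For example, with tests under fast/dom/ and http/tests/ (the HTTP tests
    # being the ones that typically require the server lock): a single worker
    # gets the two coarse shards from _shard_in_two(); --fully-parallel yields
    # one shard per test file; and the default _shard_by_directory() puts all
    # of fast/dom/ into a single 'fast/dom' shard. (Directory names invented
    # for illustration.)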

    def _shard_in_two(self, test_inputs):
        """Returns two lists of shards, one with all the tests requiring a lock and one with the rest.

        This is used when there's only one worker, to minimize the per-shard overhead."""
        locked_inputs = []
        unlocked_inputs = []
        for test_input in test_inputs:
            if test_input.requires_lock:
                locked_inputs.append(test_input)
            else:
                unlocked_inputs.append(test_input)

        locked_shards = []
        unlocked_shards = []
        if locked_inputs:
            locked_shards = [TestShard('locked_tests', locked_inputs)]
        if unlocked_inputs:
            unlocked_shards = [TestShard('unlocked_tests', unlocked_inputs)]

        return locked_shards, unlocked_shards

    def _shard_every_file(self, test_inputs):
        """Returns two lists of shards, each shard containing a single test file.

        This mode gets maximal parallelism at the cost of much higher flakiness."""
        locked_shards = []
        unlocked_shards = []
        for test_input in test_inputs:
            # Note that we use a '.' for the shard name; the name doesn't really
            # matter, and the only other meaningful value would be the filename,
            # which would be really redundant.
            if test_input.requires_lock:
                locked_shards.append(TestShard('.', [test_input]))
            else:
                unlocked_shards.append(TestShard('.', [test_input]))

        return locked_shards, unlocked_shards

    def _shard_by_directory(self, test_inputs, num_workers):
        """Returns two lists of shards, each shard containing all the files in a directory.

        This is the default mode, and gets as much parallelism as we can while
        minimizing flakiness caused by inter-test dependencies."""
        locked_shards = []
        unlocked_shards = []
        unlocked_slow_shards = []
        tests_by_dir = {}
        # FIXME: Given that the tests are already sorted by directory,
        # we can probably rewrite this to be clearer and faster.
        for test_input in test_inputs:
            directory = self._split(test_input.test_name)[0]
            tests_by_dir.setdefault(directory, [])
            tests_by_dir[directory].append(test_input)

        for directory, test_inputs in tests_by_dir.iteritems():
            shard = TestShard(directory, test_inputs)
            if test_inputs[0].requires_lock:
                locked_shards.append(shard)
            # In practice, virtual test suites are slow to run. It's a bit hacky, but
            # put them first since they're the long-tail of test runtime.
            elif directory.startswith('virtual'):
                unlocked_slow_shards.append(shard)
            else:
                unlocked_shards.append(shard)

        # Sort the shards by directory name.
        locked_shards.sort(key=lambda shard: shard.name)
        unlocked_slow_shards.sort(key=lambda shard: shard.name)
        unlocked_shards.sort(key=lambda shard: shard.name)

        # Put a ceiling on the number of locked shards, so that we
        # don't hammer the servers too badly.
        #
        # FIXME: For now, limit to one shard, or to the value set with
        # --max-locked-shards. After testing to make sure we can handle
        # multiple shards, we should probably do something like limit this
        # to no more than a quarter of all workers, e.g.:
        # return max(math.ceil(num_workers / 4.0), 1)
        return (self._resize_shards(locked_shards, self._max_locked_shards, 'locked_shard'),
                unlocked_slow_shards + unlocked_shards)

    def _resize_shards(self, old_shards, max_new_shards, shard_name_prefix):
        """Takes a list of shards and redistributes the tests into no more
        than |max_new_shards| new shards."""

        # This implementation assumes that each input shard only contains tests from a
        # single directory, and that tests in each shard must remain together; as a
        # result, a given input shard is never split between output shards.
        #
        # Each output shard contains the tests from one or more input shards and
        # hence may contain tests from multiple directories.

        def divide_and_round_up(numerator, divisor):
            return int(math.ceil(float(numerator) / divisor))

        def extract_and_flatten(shards):
            test_inputs = []
            for shard in shards:
                test_inputs.extend(shard.test_inputs)
            return test_inputs

        def split_at(seq, index):
            return (seq[:index], seq[index:])

        num_old_per_new = divide_and_round_up(len(old_shards), max_new_shards)
        new_shards = []
        remaining_shards = old_shards
        while remaining_shards:
            some_shards, remaining_shards = split_at(remaining_shards, num_old_per_new)
            new_shards.append(TestShard('%s_%d' % (shard_name_prefix, len(new_shards) + 1), extract_and_flatten(some_shards)))
        return new_shards
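

# A rough, self-contained sketch of how _resize_shards() redistributes work.
# This demo block is illustrative only and is never run by the harness; the
# fake test inputs below are invented for the example.
if __name__ == '__main__':
    import collections

    _FakeInput = collections.namedtuple('_FakeInput', ('test_name', 'requires_lock'))
    _shards = [TestShard('dir%d' % i, [_FakeInput('dir%d/test.html' % i, True)]) for i in range(5)]
    # Five single-directory shards squeezed into at most two locked shards:
    # ceil(5 / 2) = 3 input shards per output shard, so we expect
    # 'locked_shard_1: 3 tests' and 'locked_shard_2: 2 tests'.
    _sharder = Sharder(lambda name: name.split('/'), max_locked_shards=2)
    for _shard in _sharder._resize_shards(_shards, _sharder._max_locked_shards, 'locked_shard'):
        print '%s: %d tests' % (_shard.name, len(_shard.test_inputs))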