1 #!/usr/bin/env python 2 # Copyright (C) 2011 Google Inc. All rights reserved. 3 # 4 # Redistribution and use in source and binary forms, with or without 5 # modification, are permitted provided that the following conditions are 6 # met: 7 # 8 # * Redistributions of source code must retain the above copyright 9 # notice, this list of conditions and the following disclaimer. 10 # * Redistributions in binary form must reproduce the above 11 # copyright notice, this list of conditions and the following disclaimer 12 # in the documentation and/or other materials provided with the 13 # distribution. 14 # * Neither the name of Google Inc. nor the names of its 15 # contributors may be used to endorse or promote products derived from 16 # this software without specific prior written permission. 17 # 18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 19 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 20 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 21 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 22 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 25 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 26 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 28 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 29 30 """ 31 The TestRunner2 package is an alternate implementation of the TestRunner 32 class that uses the manager_worker_broker module to send sets of tests to 33 workers and receive their completion messages accordingly. 34 """ 35 36 import logging 37 import time 38 39 from webkitpy.tool import grammar 40 41 from webkitpy.layout_tests.layout_package import manager_worker_broker 42 from webkitpy.layout_tests.layout_package import test_runner 43 from webkitpy.layout_tests.layout_package import worker 44 45 46 _log = logging.getLogger(__name__) 47 48 49 class _WorkerState(object): 50 """A class for the TestRunner/manager to use to track the current state 51 of the workers.""" 52 def __init__(self, number, worker_connection): 53 self.worker_connection = worker_connection 54 self.number = number 55 self.done = False 56 self.current_test_name = None 57 self.next_timeout = None 58 self.wedged = False 59 self.stats = {} 60 self.stats['name'] = worker_connection.name 61 self.stats['num_tests'] = 0 62 self.stats['total_time'] = 0 63 64 def __repr__(self): 65 return "_WorkerState(" + str(self.__dict__) + ")" 66 67 68 class TestRunner2(test_runner.TestRunner): 69 def __init__(self, port, options, printer): 70 test_runner.TestRunner.__init__(self, port, options, printer) 71 self._all_results = [] 72 self._group_stats = {} 73 self._current_result_summary = None 74 75 # This maps worker names to the state we are tracking for each of them. 76 self._worker_states = {} 77 78 def is_done(self): 79 worker_states = self._worker_states.values() 80 return worker_states and all(self._worker_is_done(worker_state) for worker_state in worker_states) 81 82 def _worker_is_done(self, worker_state): 83 t = time.time() 84 if worker_state.done or worker_state.wedged: 85 return True 86 87 next_timeout = worker_state.next_timeout 88 WEDGE_PADDING = 40.0 89 if next_timeout and t > next_timeout + WEDGE_PADDING: 90 _log.error('') 91 worker_state.worker_connection.log_wedged_worker(worker_state.current_test_name) 92 _log.error('') 93 worker_state.wedged = True 94 return True 95 return False 96 97 def name(self): 98 return 'TestRunner2' 99 100 def _run_tests(self, file_list, result_summary): 101 """Runs the tests in the file_list. 102 103 Return: A tuple (interrupted, keyboard_interrupted, thread_timings, 104 test_timings, individual_test_timings) 105 interrupted is whether the run was interrupted 106 keyboard_interrupted is whether someone typed Ctrl^C 107 thread_timings is a list of dicts with the total runtime 108 of each thread with 'name', 'num_tests', 'total_time' properties 109 test_timings is a list of timings for each sharded subdirectory 110 of the form [time, directory_name, num_tests] 111 individual_test_timings is a list of run times for each test 112 in the form {filename:filename, test_run_time:test_run_time} 113 result_summary: summary object to populate with the results 114 """ 115 self._current_result_summary = result_summary 116 self._all_results = [] 117 self._group_stats = {} 118 self._worker_states = {} 119 120 keyboard_interrupted = False 121 interrupted = False 122 thread_timings = [] 123 124 self._printer.print_update('Sharding tests ...') 125 test_lists = self._shard_tests(file_list, 126 (int(self._options.child_processes) > 1) and not self._options.experimental_fully_parallel) 127 128 num_workers = self._num_workers(len(test_lists)) 129 130 manager_connection = manager_worker_broker.get(self._port, self._options, 131 self, worker.Worker) 132 133 if self._options.dry_run: 134 return (keyboard_interrupted, interrupted, thread_timings, 135 self._group_stats, self._all_results) 136 137 self._printer.print_update('Starting %s ...' % 138 grammar.pluralize('worker', num_workers)) 139 for worker_number in xrange(num_workers): 140 worker_connection = manager_connection.start_worker(worker_number) 141 worker_state = _WorkerState(worker_number, worker_connection) 142 self._worker_states[worker_connection.name] = worker_state 143 144 # FIXME: If we start workers up too quickly, DumpRenderTree appears 145 # to thrash on something and time out its first few tests. Until 146 # we can figure out what's going on, sleep a bit in between 147 # workers. 148 time.sleep(0.1) 149 150 self._printer.print_update("Starting testing ...") 151 for test_list in test_lists: 152 manager_connection.post_message('test_list', test_list[0], test_list[1]) 153 154 # We post one 'stop' message for each worker. Because the stop message 155 # are sent after all of the tests, and because each worker will stop 156 # reading messsages after receiving a stop, we can be sure each 157 # worker will get a stop message and hence they will all shut down. 158 for i in xrange(num_workers): 159 manager_connection.post_message('stop') 160 161 try: 162 while not self.is_done(): 163 # We loop with a timeout in order to be able to detect wedged threads. 164 manager_connection.run_message_loop(delay_secs=1.0) 165 166 if any(worker_state.wedged for worker_state in self._worker_states.values()): 167 _log.error('') 168 _log.error('Remaining workers are wedged, bailing out.') 169 _log.error('') 170 else: 171 _log.debug('No wedged threads') 172 173 # Make sure all of the workers have shut down (if possible). 174 for worker_state in self._worker_states.values(): 175 if not worker_state.wedged and worker_state.worker_connection.is_alive(): 176 worker_state.worker_connection.join(0.5) 177 assert not worker_state.worker_connection.is_alive() 178 179 except KeyboardInterrupt: 180 _log.info("Interrupted, exiting") 181 self.cancel_workers() 182 keyboard_interrupted = True 183 except test_runner.TestRunInterruptedException, e: 184 _log.info(e.reason) 185 self.cancel_workers() 186 interrupted = True 187 except: 188 # Unexpected exception; don't try to clean up workers. 189 _log.info("Exception raised, exiting") 190 raise 191 192 thread_timings = [worker_state.stats for worker_state in self._worker_states.values()] 193 194 # FIXME: should this be a class instead of a tuple? 195 return (interrupted, keyboard_interrupted, thread_timings, 196 self._group_stats, self._all_results) 197 198 def cancel_workers(self): 199 for worker_state in self._worker_states.values(): 200 worker_state.worker_connection.cancel() 201 202 def handle_started_test(self, source, test_info, hang_timeout): 203 worker_state = self._worker_states[source] 204 worker_state.current_test_name = self._port.relative_test_filename(test_info.filename) 205 worker_state.next_timeout = time.time() + hang_timeout 206 207 def handle_done(self, source): 208 worker_state = self._worker_states[source] 209 worker_state.done = True 210 211 def handle_exception(self, source, exception_info): 212 exception_type, exception_value, exception_traceback = exception_info 213 raise exception_type, exception_value, exception_traceback 214 215 def handle_finished_list(self, source, list_name, num_tests, elapsed_time): 216 self._group_stats[list_name] = (num_tests, elapsed_time) 217 218 def handle_finished_test(self, source, result, elapsed_time): 219 worker_state = self._worker_states[source] 220 worker_state.next_timeout = None 221 worker_state.current_test_name = None 222 worker_state.stats['total_time'] += elapsed_time 223 worker_state.stats['num_tests'] += 1 224 225 if worker_state.wedged: 226 # This shouldn't happen if we have our timeouts tuned properly. 227 _log.error("%s unwedged", source) 228 229 self._all_results.append(result) 230 self._update_summary_with_result(self._current_result_summary, result) 231