Home | History | Annotate | Download | only in layout_package
      1 #!/usr/bin/env python
      2 # Copyright (C) 2011 Google Inc. All rights reserved.
      3 #
      4 # Redistribution and use in source and binary forms, with or without
      5 # modification, are permitted provided that the following conditions are
      6 # met:
      7 #
      8 #     * Redistributions of source code must retain the above copyright
      9 # notice, this list of conditions and the following disclaimer.
     10 #     * Redistributions in binary form must reproduce the above
     11 # copyright notice, this list of conditions and the following disclaimer
     12 # in the documentation and/or other materials provided with the
     13 # distribution.
     14 #     * Neither the name of Google Inc. nor the names of its
     15 # contributors may be used to endorse or promote products derived from
     16 # this software without specific prior written permission.
     17 #
     18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     19 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     20 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     21 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     22 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     25 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     26 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     28 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     29 
     30 """
     31 The TestRunner2 package is an alternate implementation of the TestRunner
     32 class that uses the manager_worker_broker module to send sets of tests to
     33 workers and receive their completion messages accordingly.
     34 """
     35 
     36 import logging
     37 import time
     38 
     39 from webkitpy.tool import grammar
     40 
     41 from webkitpy.layout_tests.layout_package import manager_worker_broker
     42 from webkitpy.layout_tests.layout_package import test_runner
     43 from webkitpy.layout_tests.layout_package import worker
     44 
     45 
# Module-level logger, named after this module, used by the manager-side code
# below to report wedged workers, interruptions and unexpected exceptions.
_log = logging.getLogger(__name__)
     47 
     48 
     49 class _WorkerState(object):
     50     """A class for the TestRunner/manager to use to track the current state
     51     of the workers."""
     52     def __init__(self, number, worker_connection):
     53         self.worker_connection = worker_connection
     54         self.number = number
     55         self.done = False
     56         self.current_test_name = None
     57         self.next_timeout = None
     58         self.wedged = False
     59         self.stats = {}
     60         self.stats['name'] = worker_connection.name
     61         self.stats['num_tests'] = 0
     62         self.stats['total_time'] = 0
     63 
     64     def __repr__(self):
     65         return "_WorkerState(" + str(self.__dict__) + ")"
     66 
     67 
     68 class TestRunner2(test_runner.TestRunner):
     69     def __init__(self, port, options, printer):
     70         test_runner.TestRunner.__init__(self, port, options, printer)
     71         self._all_results = []
     72         self._group_stats = {}
     73         self._current_result_summary = None
     74 
     75         # This maps worker names to the state we are tracking for each of them.
     76         self._worker_states = {}
     77 
     78     def is_done(self):
     79         worker_states = self._worker_states.values()
     80         return worker_states and all(self._worker_is_done(worker_state) for worker_state in worker_states)
     81 
     82     def _worker_is_done(self, worker_state):
     83         t = time.time()
     84         if worker_state.done or worker_state.wedged:
     85             return True
     86 
     87         next_timeout = worker_state.next_timeout
     88         WEDGE_PADDING = 40.0
     89         if next_timeout and t > next_timeout + WEDGE_PADDING:
     90             _log.error('')
     91             worker_state.worker_connection.log_wedged_worker(worker_state.current_test_name)
     92             _log.error('')
     93             worker_state.wedged = True
     94             return True
     95         return False
     96 
     97     def name(self):
     98         return 'TestRunner2'
     99 
    100     def _run_tests(self, file_list, result_summary):
    101         """Runs the tests in the file_list.
    102 
    103         Return: A tuple (interrupted, keyboard_interrupted, thread_timings,
    104             test_timings, individual_test_timings)
    105             interrupted is whether the run was interrupted
    106             keyboard_interrupted is whether someone typed Ctrl^C
    107             thread_timings is a list of dicts with the total runtime
    108               of each thread with 'name', 'num_tests', 'total_time' properties
    109             test_timings is a list of timings for each sharded subdirectory
    110               of the form [time, directory_name, num_tests]
    111             individual_test_timings is a list of run times for each test
    112               in the form {filename:filename, test_run_time:test_run_time}
    113             result_summary: summary object to populate with the results
    114         """
    115         self._current_result_summary = result_summary
    116         self._all_results = []
    117         self._group_stats = {}
    118         self._worker_states = {}
    119 
    120         keyboard_interrupted = False
    121         interrupted = False
    122         thread_timings = []
    123 
    124         self._printer.print_update('Sharding tests ...')
    125         test_lists = self._shard_tests(file_list,
    126             (int(self._options.child_processes) > 1) and not self._options.experimental_fully_parallel)
    127 
    128         num_workers = self._num_workers(len(test_lists))
    129 
    130         manager_connection = manager_worker_broker.get(self._port, self._options,
    131                                                        self, worker.Worker)
    132 
    133         if self._options.dry_run:
    134             return (keyboard_interrupted, interrupted, thread_timings,
    135                     self._group_stats, self._all_results)
    136 
    137         self._printer.print_update('Starting %s ...' %
    138                                    grammar.pluralize('worker', num_workers))
    139         for worker_number in xrange(num_workers):
    140             worker_connection = manager_connection.start_worker(worker_number)
    141             worker_state = _WorkerState(worker_number, worker_connection)
    142             self._worker_states[worker_connection.name] = worker_state
    143 
    144             # FIXME: If we start workers up too quickly, DumpRenderTree appears
    145             # to thrash on something and time out its first few tests. Until
    146             # we can figure out what's going on, sleep a bit in between
    147             # workers.
    148             time.sleep(0.1)
    149 
    150         self._printer.print_update("Starting testing ...")
    151         for test_list in test_lists:
    152             manager_connection.post_message('test_list', test_list[0], test_list[1])
    153 
    154         # We post one 'stop' message for each worker. Because the stop message
    155         # are sent after all of the tests, and because each worker will stop
    156         # reading messsages after receiving a stop, we can be sure each
    157         # worker will get a stop message and hence they will all shut down.
    158         for i in xrange(num_workers):
    159             manager_connection.post_message('stop')
    160 
    161         try:
    162             while not self.is_done():
    163                 # We loop with a timeout in order to be able to detect wedged threads.
    164                 manager_connection.run_message_loop(delay_secs=1.0)
    165 
    166             if any(worker_state.wedged for worker_state in self._worker_states.values()):
    167                 _log.error('')
    168                 _log.error('Remaining workers are wedged, bailing out.')
    169                 _log.error('')
    170             else:
    171                 _log.debug('No wedged threads')
    172 
    173             # Make sure all of the workers have shut down (if possible).
    174             for worker_state in self._worker_states.values():
    175                 if not worker_state.wedged and worker_state.worker_connection.is_alive():
    176                     worker_state.worker_connection.join(0.5)
    177                     assert not worker_state.worker_connection.is_alive()
    178 
    179         except KeyboardInterrupt:
    180             _log.info("Interrupted, exiting")
    181             self.cancel_workers()
    182             keyboard_interrupted = True
    183         except test_runner.TestRunInterruptedException, e:
    184             _log.info(e.reason)
    185             self.cancel_workers()
    186             interrupted = True
    187         except:
    188             # Unexpected exception; don't try to clean up workers.
    189             _log.info("Exception raised, exiting")
    190             raise
    191 
    192         thread_timings = [worker_state.stats for worker_state in self._worker_states.values()]
    193 
    194         # FIXME: should this be a class instead of a tuple?
    195         return (interrupted, keyboard_interrupted, thread_timings,
    196                 self._group_stats, self._all_results)
    197 
    198     def cancel_workers(self):
    199         for worker_state in self._worker_states.values():
    200             worker_state.worker_connection.cancel()
    201 
    202     def handle_started_test(self, source, test_info, hang_timeout):
    203         worker_state = self._worker_states[source]
    204         worker_state.current_test_name = self._port.relative_test_filename(test_info.filename)
    205         worker_state.next_timeout = time.time() + hang_timeout
    206 
    207     def handle_done(self, source):
    208         worker_state = self._worker_states[source]
    209         worker_state.done = True
    210 
    211     def handle_exception(self, source, exception_info):
    212         exception_type, exception_value, exception_traceback = exception_info
    213         raise exception_type, exception_value, exception_traceback
    214 
    215     def handle_finished_list(self, source, list_name, num_tests, elapsed_time):
    216         self._group_stats[list_name] = (num_tests, elapsed_time)
    217 
    218     def handle_finished_test(self, source, result, elapsed_time):
    219         worker_state = self._worker_states[source]
    220         worker_state.next_timeout = None
    221         worker_state.current_test_name = None
    222         worker_state.stats['total_time'] += elapsed_time
    223         worker_state.stats['num_tests'] += 1
    224 
    225         if worker_state.wedged:
    226             # This shouldn't happen if we have our timeouts tuned properly.
    227             _log.error("%s unwedged", source)
    228 
    229         self._all_results.append(result)
    230         self._update_summary_with_result(self._current_result_summary, result)
    231