Home | History | Annotate | Download | only in layout_package
      1 # Copyright (C) 2011 Google Inc. All rights reserved.
      2 #
      3 # Redistribution and use in source and binary forms, with or without
      4 # modification, are permitted provided that the following conditions are
      5 # met:
      6 #
      7 #     * Redistributions of source code must retain the above copyright
      8 # notice, this list of conditions and the following disclaimer.
      9 #     * Redistributions in binary form must reproduce the above
     10 # copyright notice, this list of conditions and the following disclaimer
     11 # in the documentation and/or other materials provided with the
     12 # distribution.
     13 #     * Neither the name of Google Inc. nor the names of its
     14 # contributors may be used to endorse or promote products derived from
     15 # this software without specific prior written permission.
     16 #
     17 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     18 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     19 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     20 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     21 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     22 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     23 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     24 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     25 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     26 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     27 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     28 
     29 """Module for handling messages and concurrency for run-webkit-tests.
     30 
     31 This module implements a message broker that connects the manager
     32 (TestRunner2) to the workers: it provides a messaging abstraction and
     33 message loops (building on top of message_broker2), and handles starting
     34 workers by launching threads and/or processes depending on the
     35 requested configuration.
     36 
     37 There are a lot of classes and objects involved in a fully connected system.
     38 They interact more or less like:
     39 
     40 TestRunner2  --> _InlineManager ---> _InlineWorker <-> Worker
     41      ^                    \               /              ^
     42      |                     v             v               |
     43      \--------------------  MessageBroker   -------------/
     44 """
     45 
     46 import logging
     47 import optparse
     48 import printing
     49 import Queue
     50 import sys
     51 import thread
     52 import threading
     53 import time
     54 
     55 
     56 # Handle Python < 2.6 where multiprocessing isn't available.
     57 try:
     58     import multiprocessing
     59 except ImportError:
     60     multiprocessing = None
     61 
     62 
     63 from webkitpy.common.system import stack_utils
     64 from webkitpy.layout_tests import port
     65 from webkitpy.layout_tests.layout_package import message_broker2
     66 
     67 
# Logger used by the classes in this module.
_log = logging.getLogger(__name__)

#
# Topic names for Manager <-> Worker messaging
#
# _ManagerConnection passes (MANAGER_TOPIC, ANY_WORKER_TOPIC) to
# message_broker2.BrokerConnection and _WorkerConnection passes the same two
# names in the opposite order. Presumably the first is the topic a connection
# listens on and the second the topic it posts to -- TODO confirm against
# message_broker2.
MANAGER_TOPIC = 'managers'
ANY_WORKER_TOPIC = 'workers'
     75 
     76 
     77 def runtime_options():
     78     """Return a list of optparse.Option objects for any runtime values used
     79     by this module."""
     80     options = [
     81         optparse.make_option("--worker-model", action="store",
     82             help=("controls worker model. Valid values are "
     83             "'inline', 'threads', and 'processes'.")),
     84     ]
     85     return options
     86 
     87 
     88 def get(port, options, client, worker_class):
     89     """Return a connection to a manager/worker message_broker
     90 
     91     Args:
     92         port - handle to layout_tests/port object for port-specific stuff
     93         options - optparse argument for command-line options
     94         client - message_broker2.BrokerClient implementation to dispatch
     95             replies to.
     96         worker_class - type of workers to create. This class must implement
     97             the methods in AbstractWorker.
     98     Returns:
     99         A handle to an object that will talk to a message broker configured
    100         for the normal manager/worker communication.
    101     """
    102     worker_model = options.worker_model
    103     if worker_model == 'inline':
    104         queue_class = Queue.Queue
    105         manager_class = _InlineManager
    106     elif worker_model == 'threads':
    107         queue_class = Queue.Queue
    108         manager_class = _ThreadedManager
    109     elif worker_model == 'processes' and multiprocessing:
    110         queue_class = multiprocessing.Queue
    111         manager_class = _MultiProcessManager
    112     else:
    113         raise ValueError("unsupported value for --worker-model: %s" %
    114                          worker_model)
    115 
    116     broker = message_broker2.Broker(options, queue_class)
    117     return manager_class(broker, port, options, client, worker_class)
    118 
    119 
    120 class AbstractWorker(message_broker2.BrokerClient):
    121     def __init__(self, broker_connection, worker_number, options):
    122         """The constructor should be used to do any simple initialization
    123         necessary, but should not do anything that creates data structures
    124         that cannot be Pickled or sent across processes (like opening
    125         files or sockets). Complex initialization should be done at the
    126         start of the run() call.
    127 
    128         Args:
    129             broker_connection - handle to the BrokerConnection object creating
    130                 the worker and that can be used for messaging.
    131             worker_number - identifier for this particular worker
    132             options - command-line argument object from optparse"""
    133 
    134         raise NotImplementedError
    135 
    136     def run(self, port):
    137         """Callback for the worker to start executing. Typically does any
    138         remaining initialization and then calls broker_connection.run_message_loop()."""
    139         raise NotImplementedError
    140 
    141     def cancel(self):
    142         """Called when possible to indicate to the worker to stop processing
    143         messages and shut down. Note that workers may be stopped without this
    144         method being called, so clients should not rely solely on this."""
    145         raise NotImplementedError
    146 
    147 
    148 class _ManagerConnection(message_broker2.BrokerConnection):
    149     def __init__(self, broker, options, client, worker_class):
    150         """Base initialization for all Manager objects.
    151 
    152         Args:
    153             broker: handle to the message_broker2 object
    154             options: command line options object
    155             client: callback object (the caller)
    156             worker_class: class object to use to create workers.
    157         """
    158         message_broker2.BrokerConnection.__init__(self, broker, client,
    159             MANAGER_TOPIC, ANY_WORKER_TOPIC)
    160         self._options = options
    161         self._worker_class = worker_class
    162 
    163     def start_worker(self, worker_number):
    164         raise NotImplementedError
    165 
    166 
    167 class _InlineManager(_ManagerConnection):
    168     def __init__(self, broker, port, options, client, worker_class):
    169         _ManagerConnection.__init__(self, broker, options, client, worker_class)
    170         self._port = port
    171         self._inline_worker = None
    172 
    173     def start_worker(self, worker_number):
    174         self._inline_worker = _InlineWorkerConnection(self._broker, self._port,
    175             self._client, self._worker_class, worker_number)
    176         return self._inline_worker
    177 
    178     def run_message_loop(self, delay_secs=None):
    179         # Note that delay_secs is ignored in this case since we can't easily
    180         # implement it.
    181         self._inline_worker.run()
    182         self._broker.run_all_pending(MANAGER_TOPIC, self._client)
    183 
    184 
    185 class _ThreadedManager(_ManagerConnection):
    186     def __init__(self, broker, port, options, client, worker_class):
    187         _ManagerConnection.__init__(self, broker, options, client, worker_class)
    188         self._port = port
    189 
    190     def start_worker(self, worker_number):
    191         worker_connection = _ThreadedWorkerConnection(self._broker, self._port,
    192             self._worker_class, worker_number)
    193         worker_connection.start()
    194         return worker_connection
    195 
    196 
    197 class _MultiProcessManager(_ManagerConnection):
    198     def __init__(self, broker, port, options, client, worker_class):
    199         # Note that this class does not keep a handle to the actual port
    200         # object, because it isn't Picklable. Instead it keeps the port
    201         # name and recreates the port in the child process from the name
    202         # and options.
    203         _ManagerConnection.__init__(self, broker, options, client, worker_class)
    204         self._platform_name = port.real_name()
    205 
    206     def start_worker(self, worker_number):
    207         worker_connection = _MultiProcessWorkerConnection(self._broker, self._platform_name,
    208             self._worker_class, worker_number, self._options)
    209         worker_connection.start()
    210         return worker_connection
    211 
    212 
    213 class _WorkerConnection(message_broker2.BrokerConnection):
    214     def __init__(self, broker, worker_class, worker_number, options):
    215         self._client = worker_class(self, worker_number, options)
    216         self.name = self._client.name()
    217         message_broker2.BrokerConnection.__init__(self, broker, self._client,
    218                                                   ANY_WORKER_TOPIC, MANAGER_TOPIC)
    219 
    220     def cancel(self):
    221         raise NotImplementedError
    222 
    223     def is_alive(self):
    224         raise NotImplementedError
    225 
    226     def join(self, timeout):
    227         raise NotImplementedError
    228 
    229     def log_wedged_worker(self, test_name):
    230         raise NotImplementedError
    231 
    232     def yield_to_broker(self):
    233         pass
    234 
    235 
    236 class _InlineWorkerConnection(_WorkerConnection):
    237     def __init__(self, broker, port, manager_client, worker_class, worker_number):
    238         _WorkerConnection.__init__(self, broker, worker_class, worker_number, port._options)
    239         self._alive = False
    240         self._port = port
    241         self._manager_client = manager_client
    242 
    243     def cancel(self):
    244         self._client.cancel()
    245 
    246     def is_alive(self):
    247         return self._alive
    248 
    249     def join(self, timeout):
    250         assert not self._alive
    251 
    252     def log_wedged_worker(self, test_name):
    253         assert False, "_InlineWorkerConnection.log_wedged_worker() called"
    254 
    255     def run(self):
    256         self._alive = True
    257         self._client.run(self._port)
    258         self._alive = False
    259 
    260     def yield_to_broker(self):
    261         self._broker.run_all_pending(MANAGER_TOPIC, self._manager_client)
    262 
    263 
    264 class _Thread(threading.Thread):
    265     def __init__(self, worker_connection, port, client):
    266         threading.Thread.__init__(self)
    267         self._worker_connection = worker_connection
    268         self._port = port
    269         self._client = client
    270 
    271     def cancel(self):
    272         return self._client.cancel()
    273 
    274     def log_wedged_worker(self, test_name):
    275         stack_utils.log_thread_state(_log.error, self._client.name(), self.ident, " is wedged on test %s" % test_name)
    276 
    277     def run(self):
    278         # FIXME: We can remove this once everyone is on 2.6.
    279         if not hasattr(self, 'ident'):
    280             self.ident = thread.get_ident()
    281         self._client.run(self._port)
    282 
    283 
    284 class _ThreadedWorkerConnection(_WorkerConnection):
    285     def __init__(self, broker, port, worker_class, worker_number):
    286         _WorkerConnection.__init__(self, broker, worker_class, worker_number, port._options)
    287         self._thread = _Thread(self, port, self._client)
    288 
    289     def cancel(self):
    290         return self._thread.cancel()
    291 
    292     def is_alive(self):
    293         # FIXME: Change this to is_alive once everyone is on 2.6.
    294         return self._thread.isAlive()
    295 
    296     def join(self, timeout):
    297         return self._thread.join(timeout)
    298 
    299     def log_wedged_worker(self, test_name):
    300         return self._thread.log_wedged_worker(test_name)
    301 
    302     def start(self):
    303         self._thread.start()
    304 
    305 
    306 if multiprocessing:
    307 
    308     class _Process(multiprocessing.Process):
    309         def __init__(self, worker_connection, platform_name, options, client):
    310             multiprocessing.Process.__init__(self)
    311             self._worker_connection = worker_connection
    312             self._platform_name = platform_name
    313             self._options = options
    314             self._client = client
    315 
    316         def log_wedged_worker(self, test_name):
    317             _log.error("%s (pid %d) is wedged on test %s" % (self.name, self.pid, test_name))
    318 
    319         def run(self):
    320             options = self._options
    321             port_obj = port.get(self._platform_name, options)
    322             # FIXME: this won't work if the calling process is logging
    323             # somewhere other than sys.stderr and sys.stdout, but I'm not sure
    324             # if this will be an issue in practice.
    325             printer = printing.Printer(port_obj, options, sys.stderr, sys.stdout,
    326                 int(options.child_processes), options.experimental_fully_parallel)
    327             self._client.run(port_obj)
    328             printer.cleanup()
    329 
    330 
    331 class _MultiProcessWorkerConnection(_WorkerConnection):
    332     def __init__(self, broker, platform_name, worker_class, worker_number, options):
    333         _WorkerConnection.__init__(self, broker, worker_class, worker_number, options)
    334         self._proc = _Process(self, platform_name, options, self._client)
    335 
    336     def cancel(self):
    337         return self._proc.terminate()
    338 
    339     def is_alive(self):
    340         return self._proc.is_alive()
    341 
    342     def join(self, timeout):
    343         return self._proc.join(timeout)
    344 
    345     def log_wedged_worker(self, test_name):
    346         return self._proc.log_wedged_worker(test_name)
    347 
    348     def start(self):
    349         self._proc.start()
    350