Home | History | Annotate | Download | only in layout_package
      1 # Copyright (C) 2011 Google Inc. All rights reserved.
      2 #
      3 # Redistribution and use in source and binary forms, with or without
      4 # modification, are permitted provided that the following conditions are
      5 # met:
      6 #
      7 #     * Redistributions of source code must retain the above copyright
      8 # notice, this list of conditions and the following disclaimer.
      9 #     * Redistributions in binary form must reproduce the above
     10 # copyright notice, this list of conditions and the following disclaimer
     11 # in the documentation and/or other materials provided with the
     12 # distribution.
     13 #     * Neither the name of Google Inc. nor the names of its
     14 # contributors may be used to endorse or promote products derived from
     15 # this software without specific prior written permission.
     16 #
     17 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     18 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     19 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     20 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     21 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     22 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     23 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     24 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     25 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     26 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     27 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     28 
     29 """Module for handling messaging for run-webkit-tests.
     30 
     31 This module implements a simple message broker abstraction that will be
     32 used to coordinate messages between the main run-webkit-tests thread
     33 (aka TestRunner) and the individual worker threads (previously known as
     34 dump_render_tree_threads).
     35 
     36 The broker simply distributes messages onto topics (named queues); the actual
     37 queues themselves are provided by the caller, as the queue's implementation
     38 requirements vary depending on the desired concurrency model
     39 (none/threads/processes).
     40 
     41 In order for shared-nothing messaging between processes to be possible,
     42 Messages must be picklable.
     43 
     44 The module defines one interface and two classes. Callers of this package
     45 must implement the BrokerClient interface, and most callers will create
     46 BrokerConnections as well as Brokers.
     47 
     48 The classes relate to each other as:
     49 
     50     BrokerClient   ------>    BrokerConnection
     51          ^                         |
     52          |                         v
     53          \----------------      Broker
     54 
     55 (The BrokerClient never calls broker directly after it is created, only
     56 BrokerConnection.  BrokerConnection passes a reference to BrokerClient to
     57 Broker, and Broker only invokes that reference, never talking directly to
     58 BrokerConnection).
     59 """
     60 
     61 import cPickle
     62 import logging
     63 import Queue
     64 import time
     65 
     66 
# Module-level logger, named after this module.
_log = logging.getLogger(__name__)
     68 
     69 
     70 class BrokerClient(object):
     71     """Abstract base class / interface that all message broker clients must
     72     implement. In addition to the methods below, by convention clients
     73     implement routines of the signature type
     74 
     75         handle_MESSAGE_NAME(self, src, ...):
     76 
     77     where MESSAGE_NAME matches the string passed to post_message(), and
     78     src indicates the name of the sender. If the message contains values in
     79     the message body, those will be provided as optparams."""
     80 
     81     def __init__(self, *optargs, **kwargs):
     82         raise NotImplementedError
     83 
     84     def is_done(self):
     85         """Called from inside run_message_loop() to indicate whether to exit."""
     86         raise NotImplementedError
     87 
     88     def name(self):
     89         """Return a name that identifies the client."""
     90         raise NotImplementedError
     91 
     92 
     93 class Broker(object):
     94     """Brokers provide the basic model of a set of topics. Clients can post a
     95     message to any topic using post_message(), and can process messages on one
     96     topic at a time using run_message_loop()."""
     97 
     98     def __init__(self, options, queue_maker):
     99         """Args:
    100             options: a runtime option class from optparse
    101             queue_maker: a factory method that returns objects implementing a
    102                 Queue interface (put()/get()).
    103         """
    104         self._options = options
    105         self._queue_maker = queue_maker
    106         self._topics = {}
    107 
    108     def add_topic(self, topic_name):
    109         if topic_name not in self._topics:
    110             self._topics[topic_name] = self._queue_maker()
    111 
    112     def _get_queue_for_topic(self, topic_name):
    113         return self._topics[topic_name]
    114 
    115     def post_message(self, client, topic_name, message_name, *message_args):
    116         """Post a message to the appropriate topic name.
    117 
    118         Messages have a name and a tuple of optional arguments. Both must be picklable."""
    119         message = _Message(client.name(), topic_name, message_name, message_args)
    120         queue = self._get_queue_for_topic(topic_name)
    121         queue.put(_Message.dumps(message))
    122 
    123     def run_message_loop(self, topic_name, client, delay_secs=None):
    124         """Loop processing messages until client.is_done() or delay passes.
    125 
    126         To run indefinitely, set delay_secs to None."""
    127         assert delay_secs is None or delay_secs > 0
    128         self._run_loop(topic_name, client, block=True, delay_secs=delay_secs)
    129 
    130     def run_all_pending(self, topic_name, client):
    131         """Process messages until client.is_done() or caller would block."""
    132         self._run_loop(topic_name, client, block=False, delay_secs=None)
    133 
    134     def _run_loop(self, topic_name, client, block, delay_secs):
    135         queue = self._get_queue_for_topic(topic_name)
    136         while not client.is_done():
    137             try:
    138                 s = queue.get(block, delay_secs)
    139             except Queue.Empty:
    140                 return
    141             msg = _Message.loads(s)
    142             self._dispatch_message(msg, client)
    143 
    144     def _dispatch_message(self, message, client):
    145         if not hasattr(client, 'handle_' + message.name):
    146             raise ValueError(
    147                "%s: received message '%s' it couldn't handle" %
    148                (client.name(), message.name))
    149         optargs = message.args
    150         message_handler = getattr(client, 'handle_' + message.name)
    151         message_handler(message.src, *optargs)
    152 
    153 
    154 class _Message(object):
    155     @staticmethod
    156     def loads(str):
    157         obj = cPickle.loads(str)
    158         assert(isinstance(obj, _Message))
    159         return obj
    160 
    161     def __init__(self, src, topic_name, message_name, message_args):
    162         self.src = src
    163         self.topic_name = topic_name
    164         self.name = message_name
    165         self.args = message_args
    166 
    167     def dumps(self):
    168         return cPickle.dumps(self)
    169 
    170     def __repr__(self):
    171         return ("_Message(from='%s', topic_name='%s', message_name='%s')" %
    172                 (self.src, self.topic_name, self.name))
    173 
    174 
    175 class BrokerConnection(object):
    176     """BrokerConnection provides a connection-oriented facade on top of a
    177     Broker, so that callers don't have to repeatedly pass the same topic
    178     names over and over."""
    179 
    180     def __init__(self, broker, client, run_topic, post_topic):
    181         """Create a BrokerConnection on top of a Broker. Note that the Broker
    182         is passed in rather than created so that a single Broker can be used
    183         by multiple BrokerConnections."""
    184         self._broker = broker
    185         self._client = client
    186         self._post_topic = post_topic
    187         self._run_topic = run_topic
    188         broker.add_topic(run_topic)
    189         broker.add_topic(post_topic)
    190 
    191     def run_message_loop(self, delay_secs=None):
    192         self._broker.run_message_loop(self._run_topic, self._client, delay_secs)
    193 
    194     def post_message(self, message_name, *message_args):
    195         self._broker.post_message(self._client, self._post_topic,
    196                                   message_name, *message_args)
    197