# Home | History | Annotate | Download | only in common
      1 # Copyright (C) 2011 Google Inc. All rights reserved.
      2 #
      3 # Redistribution and use in source and binary forms, with or without
      4 # modification, are permitted provided that the following conditions are
      5 # met:
      6 #
      7 #     * Redistributions of source code must retain the above copyright
      8 # notice, this list of conditions and the following disclaimer.
      9 #     * Redistributions in binary form must reproduce the above
     10 # copyright notice, this list of conditions and the following disclaimer
     11 # in the documentation and/or other materials provided with the
     12 # distribution.
     13 #     * Neither the name of Google Inc. nor the names of its
     14 # contributors may be used to endorse or promote products derived from
     15 # this software without specific prior written permission.
     16 #
     17 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     18 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     19 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     20 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     21 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     22 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     23 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     24 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     25 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     26 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     27 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     28 
     29 """Module for handling messages and concurrency for run-webkit-tests
     30 and test-webkitpy. This module follows the design for multiprocessing.Pool
and concurrent.futures.ProcessPoolExecutor, with the following differences:
     32 
     33 * Tasks are executed in stateful subprocesses via objects that implement the
     34   Worker interface - this allows the workers to share state across tasks.
     35 * The pool provides an asynchronous event-handling interface so the caller
     36   may receive events as tasks are processed.
     37 
If you don't need these features, use multiprocessing.Pool or concurrent.futures
instead.
     40 
     41 """
     42 
     43 import cPickle
     44 import logging
     45 import multiprocessing
     46 import Queue
     47 import sys
     48 import time
     49 import traceback
     50 
     51 
     52 from webkitpy.common.host import Host
     53 from webkitpy.common.system import stack_utils
     54 
     55 
     56 _log = logging.getLogger(__name__)
     57 
     58 
     59 def get(caller, worker_factory, num_workers, host=None):
     60     """Returns an object that exposes a run() method that takes a list of test shards and runs them in parallel."""
     61     return _MessagePool(caller, worker_factory, num_workers, host)
     62 
     63 
class _MessagePool(object):
    """Manages a set of _Worker processes and routes _Message objects
    between them and the caller.

    Messages flow over two queues: _messages_to_worker (manager -> workers)
    and _messages_to_manager (workers -> manager).  With num_workers == 1
    the single worker runs inline in this process and plain Queue.Queue
    objects are used; otherwise the workers are child processes and real
    multiprocessing.Queue objects are used.
    """

    def __init__(self, caller, worker_factory, num_workers, host=None):
        # caller must expose a handle(name, src, *args) method; it receives
        # every from_user message the workers post back (see _loop).
        self._caller = caller
        self._worker_factory = worker_factory
        self._num_workers = num_workers
        self._workers = []
        self._workers_stopped = set()
        self._host = host
        self._name = 'manager'
        # A single worker is run inline (in this process) rather than in a
        # child process.
        self._running_inline = (self._num_workers == 1)
        if self._running_inline:
            self._messages_to_worker = Queue.Queue()
            self._messages_to_manager = Queue.Queue()
        else:
            self._messages_to_worker = multiprocessing.Queue()
            self._messages_to_manager = multiprocessing.Queue()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        # Always tear the workers down; returning False never suppresses
        # the caller's exception.
        self._close()
        return False

    def run(self, shards):
        """Posts a list of messages to the pool and waits for them to complete.

        Each shard is a sequence whose first element is the message name and
        whose remaining elements are the message arguments.
        """
        for message in shards:
            self._messages_to_worker.put(_Message(self._name, message[0], message[1:], from_user=True, logs=()))

        # Post one 'stop' sentinel per worker so that every worker
        # eventually exits its message loop (see _Worker.run).
        for _ in xrange(self._num_workers):
            self._messages_to_worker.put(_Message(self._name, 'stop', message_args=(), from_user=False, logs=()))

        self.wait()

    def _start_workers(self):
        # Creates and starts self._num_workers _Worker objects.
        assert not self._workers
        self._workers_stopped = set()
        host = None
        # Only pass the host through to child processes when it can be
        # pickled; otherwise each worker builds its own Host in run().
        if self._running_inline or self._can_pickle(self._host):
            host = self._host

        for worker_number in xrange(self._num_workers):
            worker = _Worker(host, self._messages_to_manager, self._messages_to_worker, self._worker_factory, worker_number, self._running_inline, self if self._running_inline else None, self._worker_log_level())
            self._workers.append(worker)
            worker.start()

    def _worker_log_level(self):
        # Returns the most verbose (numerically lowest) level set on any
        # root handler, so workers capture everything a handler might want.
        log_level = logging.NOTSET
        for handler in logging.root.handlers:
            if handler.level != logging.NOTSET:
                if log_level == logging.NOTSET:
                    log_level = handler.level
                else:
                    log_level = min(log_level, handler.level)
        return log_level

    def wait(self):
        """Starts the workers and processes messages until they all stop."""
        try:
            self._start_workers()
            if self._running_inline:
                # The inline worker drains its whole queue synchronously;
                # afterwards drain whatever it posted back to us.
                self._workers[0].run()
                self._loop(block=False)
            else:
                self._loop(block=True)
        finally:
            self._close()

    def _close(self):
        # Terminates any live worker processes and tears down the queues.
        for worker in self._workers:
            if worker.is_alive():
                worker.terminate()
                worker.join()
        self._workers = []
        if not self._running_inline:
            # FIXME: This is a hack to get multiprocessing to not log tracebacks during shutdown :(.
            multiprocessing.util._exiting = True
            if self._messages_to_worker:
                self._messages_to_worker.close()
                self._messages_to_worker = None
            if self._messages_to_manager:
                self._messages_to_manager.close()
                self._messages_to_manager = None

    def _log_messages(self, messages):
        # Replays log records captured in a worker (see _WorkerLogHandler)
        # through this process's root logger.
        for message in messages:
            logging.root.handle(message)

    def _handle_done(self, source):
        # Workers post 'done' as they exit their run() loop; _loop uses the
        # size of this set to know when to stop blocking.
        self._workers_stopped.add(source)

    @staticmethod
    def _handle_worker_exception(source, exception_type, exception_value, _):
        # Re-raises an exception forwarded from a worker in the manager
        # process.  The worker sends an extracted stack rather than a
        # traceback (tracebacks aren't picklable), so the 4th arg is unused.
        # NOTE(review): '==' matches the exact class only; unlike
        # issubclass() it would miss KeyboardInterrupt subclasses.
        if exception_type == KeyboardInterrupt:
            raise exception_type(exception_value)
        raise WorkerException(str(exception_value))

    def _can_pickle(self, host):
        # Returns True if host survives pickling and so can cross the
        # process boundary to a child worker.
        # NOTE(review): only TypeError is caught here; cPickle can also
        # raise PicklingError for unpicklable objects -- confirm that case
        # can't occur for Host before relying on this.
        try:
            cPickle.dumps(host)
            return True
        except TypeError:
            return False

    def _loop(self, block):
        # Dispatches messages from the workers.  from_user messages go to
        # the caller; pool-internal ones ('done', 'worker_exception') go to
        # the matching _handle_* method on self.
        try:
            while True:
                # Once every worker has reported 'done', switch to
                # non-blocking gets so Queue.Empty ends the loop after the
                # queue drains.
                if len(self._workers_stopped) == len(self._workers):
                    block = False
                message = self._messages_to_manager.get(block)
                self._log_messages(message.logs)
                if message.from_user:
                    self._caller.handle(message.name, message.src, *message.args)
                    continue
                method = getattr(self, '_handle_' + message.name)
                assert method, 'bad message %s' % repr(message)
                method(message.src, *message.args)
        except Queue.Empty:
            pass
    182 
    183 
    184 class WorkerException(BaseException):
    185     """Raised when we receive an unexpected/unknown exception from a worker."""
    186     pass
    187 
    188 
    189 class _Message(object):
    190     def __init__(self, src, message_name, message_args, from_user, logs):
    191         self.src = src
    192         self.name = message_name
    193         self.args = message_args
    194         self.from_user = from_user
    195         self.logs = logs
    196 
    197     def __repr__(self):
    198         return '_Message(src=%s, name=%s, args=%s, from_user=%s, logs=%s)' % (self.src, self.name, self.args, self.from_user, self.logs)
    199 
    200 
class _Worker(multiprocessing.Process):
    """A child process (or inline stand-in) that runs tasks through a
    caller-supplied worker object and forwards results, exceptions, and
    buffered log records back to the manager."""

    def __init__(self, host, messages_to_manager, messages_to_worker, worker_factory, worker_number, running_inline, manager, log_level):
        super(_Worker, self).__init__()
        # host may be None when it couldn't be pickled; run() constructs a
        # fresh Host in that case.
        self.host = host
        self.worker_number = worker_number
        self.name = 'worker/%d' % worker_number
        # Log records buffered by _WorkerLogHandler; shipped to the manager
        # with the next _post() and then cleared.
        self.log_messages = []
        self.log_level = log_level
        self._running = False
        self._running_inline = running_inline
        # manager is non-None only when running inline, so this worker can
        # pump the manager's message loop directly (see _yield_to_manager).
        self._manager = manager

        self._messages_to_manager = messages_to_manager
        self._messages_to_worker = messages_to_worker
        self._worker = worker_factory(self)
        self._logger = None
        self._log_handler = None

    def terminate(self):
        # Gives the wrapped worker a chance to clean up before the process
        # is forcibly terminated.
        if self._worker:
            if hasattr(self._worker, 'stop'):
                self._worker.stop()
            self._worker = None
        if self.is_alive():
            super(_Worker, self).terminate()

    def _close(self):
        # Detaches the log handler installed by _set_up_logging().
        if self._log_handler and self._logger:
            self._logger.removeHandler(self._log_handler)
        self._log_handler = None
        self._logger = None

    def start(self):
        # Inline workers are driven synchronously via run(); only child
        # processes actually fork/spawn here.
        if not self._running_inline:
            super(_Worker, self).start()

    def run(self):
        """Consumes messages from the manager until a 'stop' sentinel arrives.

        Always posts a 'done' message back to the manager on exit, even when
        the loop terminates with an exception.
        """
        if not self.host:
            self.host = Host()
        if not self._running_inline:
            self._set_up_logging()

        worker = self._worker
        # NOTE(review): exception_msg is never read; looks like dead code.
        exception_msg = ""
        _log.debug("%s starting" % self.name)
        self._running = True

        try:
            if hasattr(worker, 'start'):
                worker.start()
            while self._running:
                message = self._messages_to_worker.get()
                if message.from_user:
                    worker.handle(message.name, message.src, *message.args)
                    # Inline mode: let the manager drain our replies now so
                    # the (unbounded) in-process queue doesn't just grow.
                    self._yield_to_manager()
                else:
                    assert message.name == 'stop', 'bad message %s' % repr(message)
                    break

            _log.debug("%s exiting" % self.name)
        except Queue.Empty:
            assert False, '%s: ran out of messages in worker queue.' % self.name
        # NOTE(review): 'e' is unused in both handlers below; _raise reads
        # sys.exc_info() directly.
        except KeyboardInterrupt, e:
            self._raise(sys.exc_info())
        except Exception, e:
            self._raise(sys.exc_info())
        finally:
            try:
                if hasattr(worker, 'stop'):
                    worker.stop()
            finally:
                # 'done' must always reach the manager or _loop would block
                # forever waiting for this worker to stop.
                self._post(name='done', args=(), from_user=False)
            self._close()

    def stop_running(self):
        # Lets the wrapped worker end the run() loop from inside handle().
        self._running = False

    def post(self, name, *args):
        """Posts a user-level message back to the manager/caller."""
        self._post(name, args, from_user=True)
        self._yield_to_manager()

    def _yield_to_manager(self):
        # Inline mode only: run the manager's dispatch loop so our queued
        # messages are handled immediately.
        if self._running_inline:
            self._manager._loop(block=False)

    def _post(self, name, args, from_user):
        # Atomically takes ownership of the buffered log records and ships
        # them along with the message.
        log_messages = self.log_messages
        self.log_messages = []
        self._messages_to_manager.put(_Message(self.name, name, args, from_user, log_messages))

    def _raise(self, exc_info):
        """Propagates an exception either directly (inline) or by message.

        In a child process the exception is logged here and forwarded to the
        manager as a 'worker_exception' message (see
        _MessagePool._handle_worker_exception).
        """
        exception_type, exception_value, exception_traceback = exc_info
        if self._running_inline:
            # Python 2 three-argument raise: re-raise with the original
            # traceback intact.
            raise exception_type, exception_value, exception_traceback

        if exception_type == KeyboardInterrupt:
            _log.debug("%s: interrupted, exiting" % self.name)
            stack_utils.log_traceback(_log.debug, exception_traceback)
        else:
            _log.error("%s: %s('%s') raised:" % (self.name, exception_value.__class__.__name__, str(exception_value)))
            stack_utils.log_traceback(_log.error, exception_traceback)
        # Since tracebacks aren't picklable, send the extracted stack instead.
        stack = traceback.extract_tb(exception_traceback)
        self._post(name='worker_exception', args=(exception_type, exception_value, stack), from_user=False)

    def _set_up_logging(self):
        # Redirects all of this child process's logging into
        # _WorkerLogHandler so records travel to the manager via _post().
        self._logger = logging.getLogger()

        # The unix multiprocessing implementation clones any log handlers into the child process,
        # so we remove them to avoid duplicate logging.
        # NOTE(review): this mutates the handlers list while iterating it;
        # if more than one handler is present some may be skipped -- verify.
        for h in self._logger.handlers:
            self._logger.removeHandler(h)

        self._log_handler = _WorkerLogHandler(self)
        self._logger.addHandler(self._log_handler)
        self._logger.setLevel(self.log_level)
    317 
    318 
    319 class _WorkerLogHandler(logging.Handler):
    320     def __init__(self, worker):
    321         logging.Handler.__init__(self)
    322         self._worker = worker
    323         self.setLevel(worker.log_level)
    324 
    325     def emit(self, record):
    326         self._worker.log_messages.append(record)
    327