#!/usr/bin/env python
# Copyright 2014 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from Queue import Empty
from contextlib import contextmanager
from multiprocessing import Process, Queue
import os
import signal
import time
import traceback

from . import command


def setup_testing():
  """For testing only: Use threading under the hood instead of multiprocessing
  to make coverage work.
  """
  global Queue
  global Process
  del Queue
  del Process
  from Queue import Queue
  from threading import Thread as Process
  # Monkeypatch the threading Queue to look like a multiprocessing Queue.
  Queue.cancel_join_thread = lambda self: None

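# A minimal sketch of a test-harness call site (hypothetical; not part of
# this module, and assuming it is importable as "pool"). setup_testing()
# must run before any Pool is created, since Pool.__init__ instantiates
# the queues:
#
#   import pool
#   pool.setup_testing()
#   p = pool.Pool(num_workers=1)
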
class NormalResult():
  """Wraps a regular return value of the worker function."""
  def __init__(self, result):
    self.result = result
    self.exception = None

class ExceptionResult():
  """Wraps an exception raised by the worker function."""
  def __init__(self, exception):
    self.exception = exception

class MaybeResult():
  """Wraps either a real result or a heartbeat, as yielded to consumers."""
  def __init__(self, heartbeat, value):
    self.heartbeat = heartbeat
    self.value = value

  @staticmethod
  def create_heartbeat():
    return MaybeResult(True, None)

  @staticmethod
  def create_result(value):
    return MaybeResult(False, value)

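# Consumers of Pool.imap_unordered (below) typically branch on the heartbeat
# flag, e.g. (illustrative sketch; report_progress and process are
# hypothetical):
#
#   for maybe_result in pool.imap_unordered(fn, gen):
#     if maybe_result.heartbeat:
#       report_progress()
#     else:
#       process(maybe_result.value)
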
def Worker(fn, work_queue, done_queue,
           process_context_fn=None, process_context_args=None):
  """Worker to be run in a child process.
  The worker stops when the poison pill "STOP" is read from the work queue.
  """
  try:
    kwargs = {}
    if process_context_fn and process_context_args is not None:
      kwargs.update(process_context=process_context_fn(*process_context_args))
    for args in iter(work_queue.get, "STOP"):
      try:
        done_queue.put(NormalResult(fn(*args, **kwargs)))
      except command.AbortException:
        # SIGINT, SIGTERM or internal hard timeout.
        break
      except Exception as e:
        traceback.print_exc()
        print(">>> EXCEPTION: %s" % e)
        done_queue.put(ExceptionResult(e))
    # When we reach here on normal tear down, all items have already been
    # pulled from the done_queue and this call has no effect. On fast abort,
    # a fast worker may have left items on the done_queue that will never be
    # pulled. This call purges those to avoid a deadlock.
    done_queue.cancel_join_thread()
  except KeyboardInterrupt:
    assert False, 'Unreachable'

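# The parent feeds argument tuples and finally one "STOP" pill per worker,
# mirroring what Pool._terminate does below (illustrative sketch):
#
#   work_queue.put((test,))   # one tuple of arguments per task
#   work_queue.put("STOP")    # one pill per worker process
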

@contextmanager
def without_sig():
  """Temporarily ignores SIGINT and SIGTERM in the calling process."""
  int_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
  term_handler = signal.signal(signal.SIGTERM, signal.SIG_IGN)
  try:
    yield
  finally:
    signal.signal(signal.SIGINT, int_handler)
    signal.signal(signal.SIGTERM, term_handler)

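# Usage sketch (illustrative): child processes spawned inside the block
# inherit SIG_IGN and thus never capture these signals themselves:
#
#   with without_sig():
#     p = Process(target=Worker, args=(fn, work_queue, done_queue))
#     p.start()
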
class Pool():
  """Distributes tasks to a number of worker processes.
  New tasks can be added dynamically even after the workers have been started.
  Requirement: Tasks can only be added from the parent process, e.g. while
  consuming the results generator."""

  # Factor to calculate the maximum number of items in the work/done queue.
  # Necessary to avoid overflowing the queue's pipe if a keyboard interrupt
  # happens.
  BUFFER_FACTOR = 4
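  # For example, with num_workers=8 at most 8 * 4 = 32 generator items are
  # in flight before _advance_more stops pulling from the generator.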

  def __init__(self, num_workers, heartbeat_timeout=1):
    self.num_workers = num_workers
    self.processes = []
    self.terminated = False
    self.abort_now = False

    # Invariant: processing_count >= #work_queue + #done_queue. It is greater
    # when a worker has taken an item from the work_queue but has not yet
    # submitted the result to the done_queue. It is equal when no worker is
    # working, e.g. when all workers have finished and no results are being
    # processed. The count is only accessed by the parent process. Only the
    # parent process is allowed to remove items from the done_queue and to
    # add items to the work_queue.
    self.processing_count = 0
    self.heartbeat_timeout = heartbeat_timeout

    # Disable sigint and sigterm to prevent subprocesses from capturing the
    # signals.
    with without_sig():
      self.work_queue = Queue()
      self.done_queue = Queue()

  def imap_unordered(self, fn, gen,
                     process_context_fn=None, process_context_args=None):
    """Maps function "fn" to items in generator "gen" on the worker processes
    in an arbitrary order. The items are expected to be lists of arguments to
    the function. Returns a results iterator. A result value of type
    MaybeResult either indicates a heartbeat of the runner, i.e. that the
    runner is still waiting for the result to be computed, or it wraps the
    real result.

    Args:
      process_context_fn: Function executed once by each worker. Expected to
          return a process-context object. If present, this object is passed
          as an additional argument to each call to fn.
      process_context_args: List of arguments for the invocation of
          process_context_fn. All arguments will be pickled and sent across
          the process boundary.
    """
    if self.terminated:
      return
    try:
      internal_error = False
      gen = iter(gen)
      self.advance = self._advance_more

      # Disable sigint and sigterm to prevent subprocesses from capturing the
      # signals.
      with without_sig():
        for w in xrange(self.num_workers):
          p = Process(target=Worker, args=(fn,
                                           self.work_queue,
                                           self.done_queue,
                                           process_context_fn,
                                           process_context_args))
          p.start()
          self.processes.append(p)

      self.advance(gen)
      while self.processing_count > 0:
        while True:
          try:
            # Read from the result queue in a responsive fashion. If
            # available, this returns a normal result immediately, or a
            # heartbeat on heartbeat timeout (default 1 second).
            result = self._get_result_from_queue()
          except:
            # TODO(machenbach): Handle a few known types of internal errors
            # gracefully, e.g. missing test files.
            internal_error = True
            continue

          if self.abort_now:
            # SIGINT, SIGTERM or internal hard timeout.
            return

          yield result
          break

        self.advance(gen)
    except KeyboardInterrupt:
      assert False, 'Unreachable'
    except Exception as e:
      traceback.print_exc()
      print(">>> EXCEPTION: %s" % e)
    finally:
      self._terminate()

    if internal_error:
      raise Exception("Internal error in a worker process.")

  def _advance_more(self, gen):
    while self.processing_count < self.num_workers * self.BUFFER_FACTOR:
      try:
        self.work_queue.put(gen.next())
        self.processing_count += 1
      except StopIteration:
        self.advance = self._advance_empty
        break

  def _advance_empty(self, gen):
    pass

  def add(self, args):
    """Adds an item to the work queue. Can be called dynamically while
    processing the results from imap_unordered."""
    assert not self.terminated

    self.work_queue.put(args)
    self.processing_count += 1

  def abort(self):
    """Schedules an abort on the next queue read.

    This is safe to call when handling SIGINT, SIGTERM or when an internal
    hard timeout is reached.
    """
    self.abort_now = True

  def _terminate(self):
    """Terminates execution and cleans up the queues.

    If abort() was called before termination, this also terminates the
    subprocesses and doesn't wait for ongoing tests.
    """
    if self.terminated:
      return
    self.terminated = True

    # Drain the work queue of any remaining tests.
    try:
      while True:
        self.work_queue.get(True, 0.1)
    except Empty:
      pass

    # Make sure all processes stop.
    for _ in self.processes:
      # During normal tear down the workers block on get(). Feed one poison
      # pill per worker to make them stop.
      self.work_queue.put("STOP")

    if self.abort_now:
      for p in self.processes:
        os.kill(p.pid, signal.SIGTERM)

    for p in self.processes:
      p.join()

    # Drain the queues to prevent stderr chatter when the queues are garbage
    # collected.
    try:
      while True: self.work_queue.get(False)
    except:
      pass
    try:
      while True: self.done_queue.get(False)
    except:
      pass

  def _get_result_from_queue(self):
    """Attempts to get the next result from the done queue.

    Returns: A wrapped result if one was available within the heartbeat
        timeout, a heartbeat result otherwise.
    Raises:
        Exception: If an exception occurred when processing the task on the
            worker side, it is reraised here.
    """
    while True:
      try:
        result = self.done_queue.get(timeout=self.heartbeat_timeout)
        self.processing_count -= 1
        if result.exception:
          raise result.exception
        return MaybeResult.create_result(result.result)
      except Empty:
        return MaybeResult.create_heartbeat()
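
# A minimal end-to-end usage sketch (illustrative only; run_test is a
# hypothetical module-level function so multiprocessing can pickle it, and
# handle is a hypothetical result consumer):
#
#   def run_test(test):
#     return test.execute()
#
#   pool = Pool(num_workers=4)
#   for maybe_result in pool.imap_unordered(
#       run_test, ((test,) for test in tests)):
#     if maybe_result.heartbeat:
#       continue  # Still waiting on a result; emit progress if desired.
#     handle(maybe_result.value)
#
# More items may be added while consuming results, from the parent process
# only, via pool.add((another_test,)).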