#!/usr/bin/env python
# Copyright 2014 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from Queue import Empty
from multiprocessing import Event, Process, Queue
import traceback


# Wrappers for the values that worker processes put on the done queue.
class NormalResult():
  def __init__(self, result):
    self.result = result
    self.exception = False
    self.break_now = False


class ExceptionResult():
  def __init__(self):
    self.exception = True
    self.break_now = False


class BreakResult():
  def __init__(self):
    self.exception = False
    self.break_now = True


class MaybeResult():
  """Wrapper yielded by Pool.imap_unordered: either a heartbeat (the runner is
  still waiting for a result) or a wrapper around an actual result value."""

  def __init__(self, heartbeat, value):
    self.heartbeat = heartbeat
    self.value = value

  @staticmethod
  def create_heartbeat():
    return MaybeResult(True, None)

  @staticmethod
  def create_result(value):
    return MaybeResult(False, value)


def Worker(fn, work_queue, done_queue, done,
           process_context_fn=None, process_context_args=None):
  """Worker to be run in a child process.
  The worker stops on two conditions: 1. when the poison pill "STOP" is
  received or 2. when the event "done" is set."""
  try:
    kwargs = {}
    if process_context_fn and process_context_args is not None:
      kwargs.update(process_context=process_context_fn(*process_context_args))
    for args in iter(work_queue.get, "STOP"):
      if done.is_set():
        break
      try:
        done_queue.put(NormalResult(fn(*args, **kwargs)))
      except Exception as e:
        traceback.print_exc()
        print(">>> EXCEPTION: %s" % e)
        done_queue.put(ExceptionResult())
  except KeyboardInterrupt:
    done_queue.put(BreakResult())


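# A minimal sketch, not part of the original module, showing how Worker is
# driven via the poison-pill protocol described above: the parent puts argument
# tuples on the work queue, appends one "STOP" pill per worker, and reads the
# wrapped results from the done queue. The helper names example_square and
# example_worker_run are assumptions made purely for illustration.
def example_square(x):
  return x * x


def example_worker_run():
  work_queue = Queue()
  done_queue = Queue()
  done = Event()
  for x in (1, 2, 3):
    work_queue.put((x,))   # each work item is a tuple of arguments for fn
  work_queue.put("STOP")   # poison pill: one per worker process
  p = Process(target=Worker,
              args=(example_square, work_queue, done_queue, done))
  p.start()
  # Three NormalResult objects are expected; read them before joining.
  results = sorted(done_queue.get().result for _ in range(3))
  p.join()
  return results           # [1, 4, 9]

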
class Pool():
  """Distributes tasks to a number of worker processes.
  New tasks can be added dynamically even after the workers have been started.
  Requirement: Tasks can only be added from the parent process, e.g. while
  consuming the results generator. (See the usage sketch at the end of this
  file.)"""

  # Factor to calculate the maximum number of items in the work/done queue.
  # Necessary to avoid overflowing the queue's pipe if a keyboard interrupt
  # happens.
  BUFFER_FACTOR = 4

  def __init__(self, num_workers, heartbeat_timeout=30):
    self.num_workers = num_workers
    self.processes = []
    self.terminated = False

    # Invariant: count >= #work_queue + #done_queue. It is greater when a
    # worker has taken an item from the work_queue but has not yet submitted
    # the result to the done_queue. It is equal when no worker is working,
    # e.g. when all workers have finished and no results are being processed.
    # Count is only accessed by the parent process. Only the parent process is
    # allowed to remove items from the done_queue and to add items to the
    # work_queue.
    self.count = 0
    self.work_queue = Queue()
    self.done_queue = Queue()
    self.done = Event()
    self.heartbeat_timeout = heartbeat_timeout

  def imap_unordered(self, fn, gen,
                     process_context_fn=None, process_context_args=None):
    """Maps function "fn" to items in generator "gen" on the worker processes
    in an arbitrary order. The items are expected to be lists of arguments to
    the function. Returns a results iterator. Each result is of type
    MaybeResult and either signals a heartbeat, i.e. the runner is still
    waiting for the next result to be computed, or wraps a real result.

    Args:
      process_context_fn: Function executed once by each worker. Expected to
          return a process-context object. If present, this object is passed
          as the keyword argument "process_context" to each call of fn.
      process_context_args: List of arguments for the invocation of
          process_context_fn. All arguments will be pickled and sent across
          the process boundary.
    """
    try:
      internal_error = False
      gen = iter(gen)
      self.advance = self._advance_more

      for w in xrange(self.num_workers):
        p = Process(target=Worker, args=(fn,
                                         self.work_queue,
                                         self.done_queue,
                                         self.done,
                                         process_context_fn,
                                         process_context_args))
        self.processes.append(p)
        p.start()

      self.advance(gen)
      while self.count > 0:
        while True:
          try:
            result = self.done_queue.get(timeout=self.heartbeat_timeout)
            break
          except Empty:
            # Indicate a heartbeat. The iterator will continue fetching the
            # next result.
            yield MaybeResult.create_heartbeat()
        self.count -= 1
        if result.exception:
          # TODO(machenbach): Handle a few known types of internal errors
          # gracefully, e.g. missing test files.
          internal_error = True
          continue
        elif result.break_now:
          # A keyboard interrupt happened in one of the worker processes.
          raise KeyboardInterrupt
        else:
          yield MaybeResult.create_result(result.result)
        self.advance(gen)
    finally:
      self.terminate()
    if internal_error:
      raise Exception("Internal error in a worker process.")

  def _advance_more(self, gen):
    # Keep the work queue filled up to BUFFER_FACTOR items per worker until
    # the generator is exhausted.
    while self.count < self.num_workers * self.BUFFER_FACTOR:
      try:
        self.work_queue.put(gen.next())
        self.count += 1
      except StopIteration:
        self.advance = self._advance_empty
        break

  def _advance_empty(self, gen):
    pass

  def add(self, args):
    """Adds an item to the work queue. Can be called dynamically while
    processing the results from imap_unordered."""
    self.work_queue.put(args)
    self.count += 1

  def terminate(self):
    if self.terminated:
      return
    self.terminated = True

    # For exceptional tear down, set the "done" event to stop the workers
    # before they empty the queue buffer.
    self.done.set()

    for p in self.processes:
      # During normal tear down the workers block on get(). Feed a poison pill
      # per worker to make them stop.
      self.work_queue.put("STOP")

    for p in self.processes:
      p.join()

    # Drain the queues to prevent failures when the queues are garbage
    # collected.
    try:
      while True: self.work_queue.get(False)
    except Empty:
      pass
    try:
      while True: self.done_queue.get(False)
    except Empty:
      pass
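

# A minimal usage sketch, not part of the original module, tying the pieces
# together: imap_unordered yields MaybeResult objects, heartbeats are skipped,
# and Pool.add() queues follow-up work while results are being consumed. The
# helper name example_double and the task values are assumptions made purely
# for illustration.
def example_double(x):
  return 2 * x


if __name__ == "__main__":
  pool = Pool(num_workers=2, heartbeat_timeout=1)
  # Each work item is a list of arguments for example_double.
  for maybe_result in pool.imap_unordered(example_double, [[1], [2], [3]]):
    if maybe_result.heartbeat:
      # The runner is still waiting for the next result.
      continue
    print("result: %d" % maybe_result.value)
    if maybe_result.value == 2:
      # Work can be added dynamically while results are being consumed.
      pool.add([10])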
    197