Home | History | Annotate | Download | only in local
      1 #!/usr/bin/env python
      2 # Copyright 2014 the V8 project authors. All rights reserved.
      3 # Use of this source code is governed by a BSD-style license that can be
      4 # found in the LICENSE file.
      5 
      6 from multiprocessing import Event, Process, Queue
      7 
      8 class NormalResult():
      9   def __init__(self, result):
     10     self.result = result
     11     self.exception = False
     12     self.break_now = False
     13 
     14 
     15 class ExceptionResult():
     16   def __init__(self):
     17     self.exception = True
     18     self.break_now = False
     19 
     20 
     21 class BreakResult():
     22   def __init__(self):
     23     self.exception = False
     24     self.break_now = True
     25 
     26 
     27 def Worker(fn, work_queue, done_queue, done):
     28   """Worker to be run in a child process.
     29   The worker stops on two conditions. 1. When the poison pill "STOP" is
     30   reached or 2. when the event "done" is set."""
     31   try:
     32     for args in iter(work_queue.get, "STOP"):
     33       if done.is_set():
     34         break
     35       try:
     36         done_queue.put(NormalResult(fn(*args)))
     37       except Exception, e:
     38         print(">>> EXCEPTION: %s" % e)
     39         done_queue.put(ExceptionResult())
     40   except KeyboardInterrupt:
     41     done_queue.put(BreakResult())
     42 
     43 
     44 class Pool():
     45   """Distributes tasks to a number of worker processes.
     46   New tasks can be added dynamically even after the workers have been started.
     47   Requirement: Tasks can only be added from the parent process, e.g. while
     48   consuming the results generator."""
     49 
     50   # Factor to calculate the maximum number of items in the work/done queue.
     51   # Necessary to not overflow the queue's pipe if a keyboard interrupt happens.
     52   BUFFER_FACTOR = 4
     53 
     54   def __init__(self, num_workers):
     55     self.num_workers = num_workers
     56     self.processes = []
     57     self.terminated = False
     58 
     59     # Invariant: count >= #work_queue + #done_queue. It is greater when a
     60     # worker takes an item from the work_queue and before the result is
     61     # submitted to the done_queue. It is equal when no worker is working,
     62     # e.g. when all workers have finished, and when no results are processed.
     63     # Count is only accessed by the parent process. Only the parent process is
     64     # allowed to remove items from the done_queue and to add items to the
     65     # work_queue.
     66     self.count = 0
     67     self.work_queue = Queue()
     68     self.done_queue = Queue()
     69     self.done = Event()
     70 
     71   def imap_unordered(self, fn, gen):
     72     """Maps function "fn" to items in generator "gen" on the worker processes
     73     in an arbitrary order. The items are expected to be lists of arguments to
     74     the function. Returns a results iterator."""
     75     try:
     76       gen = iter(gen)
     77       self.advance = self._advance_more
     78 
     79       for w in xrange(self.num_workers):
     80         p = Process(target=Worker, args=(fn,
     81                                          self.work_queue,
     82                                          self.done_queue,
     83                                          self.done))
     84         self.processes.append(p)
     85         p.start()
     86 
     87       self.advance(gen)
     88       while self.count > 0:
     89         result = self.done_queue.get()
     90         self.count -= 1
     91         if result.exception:
     92           # Ignore items with unexpected exceptions.
     93           continue
     94         elif result.break_now:
     95           # A keyboard interrupt happened in one of the worker processes.
     96           raise KeyboardInterrupt
     97         else:
     98           yield result.result
     99         self.advance(gen)
    100     finally:
    101       self.terminate()
    102 
    103   def _advance_more(self, gen):
    104     while self.count < self.num_workers * self.BUFFER_FACTOR:
    105       try:
    106         self.work_queue.put(gen.next())
    107         self.count += 1
    108       except StopIteration:
    109         self.advance = self._advance_empty
    110         break
    111 
    112   def _advance_empty(self, gen):
    113     pass
    114 
    115   def add(self, args):
    116     """Adds an item to the work queue. Can be called dynamically while
    117     processing the results from imap_unordered."""
    118     self.work_queue.put(args)
    119     self.count += 1
    120 
    121   def terminate(self):
    122     if self.terminated:
    123       return
    124     self.terminated = True
    125 
    126     # For exceptional tear down set the "done" event to stop the workers before
    127     # they empty the queue buffer.
    128     self.done.set()
    129 
    130     for p in self.processes:
    131       # During normal tear down the workers block on get(). Feed a poison pill
    132       # per worker to make them stop.
    133       self.work_queue.put("STOP")
    134 
    135     for p in self.processes:
    136       p.join()
    137 
    138     # Drain the queues to prevent failures when queues are garbage collected.
    139     try:
    140       while True: self.work_queue.get(False)
    141     except:
    142       pass
    143     try:
    144       while True: self.done_queue.get(False)
    145     except:
    146       pass
    147