Home | History | Annotate | Download | only in utils
      1 import os, sys
      2 
      3 
      4 class ParallelError(Exception):
      5     def __init__(self, str, errors):
      6         self.str = str
      7         self.errors = errors
      8         Exception.__init__(self, str)
      9 
     10 
     11 class ParallelExecute(object):
     12     def __init__(self, functions, max_simultaneous_procs=20):
     13         """\
     14         This takes in a dictionary of functions which map to a set of
     15         functions that they depend on.
     16 
     17         functions: This is either a list of or dictionary of functions to
     18                    be run.  If it's a dictionary, the value should be a set
     19                    of other functions this function is dependent on.  If its
     20                    a list (or tuple or anything iterable that returns a
     21                    single element each iteration), then it's assumed that
     22                    there are no dependencies.
     23 
     24         max_simultaneous_procs: Throttle the number of processes we have
     25                                 running at once.
     26         """
     27         if not isinstance(functions, dict):
     28             function_list = functions
     29             functions = {}
     30             for fn in function_list:
     31                 functions[fn] = set()
     32 
     33         dependents = {}
     34         for fn, deps in functions.iteritems():
     35             dependents[fn] = []
     36         for fn, deps in functions.iteritems():
     37             for dep in deps:
     38                 dependents[dep].append(fn)
     39 
     40         self.max_procs = max_simultaneous_procs
     41         self.functions = functions
     42         self.dependents = dependents
     43         self.pid_map = {}
     44         self.ready_to_run = []
     45 
     46 
     47     def _run(self, function):
     48         self.functions.pop(function)
     49         pid = os.fork()
     50         if pid:
     51             self.pid_map[pid] = function
     52         else:
     53             function()
     54             sys.exit(0)
     55 
     56 
     57     def run_until_completion(self):
     58         for fn, deps in self.functions.iteritems():
     59             if len(deps) == 0:
     60                 self.ready_to_run.append(fn)
     61 
     62         errors = []
     63         while len(self.pid_map) > 0 or len(self.ready_to_run) > 0:
     64             max_allowed = self.max_procs - len(self.pid_map)
     65             max_able = len(self.ready_to_run)
     66             for i in xrange(min(max_allowed, max_able)):
     67                 self._run(self.ready_to_run.pop())
     68 
     69             # Handle one proc that's finished.
     70             pid, status = os.wait()
     71             fn = self.pid_map.pop(pid)
     72             if status != 0:
     73                 errors.append("%s failed" % fn.__name__)
     74                 continue
     75 
     76             for dependent in self.dependents[fn]:
     77                 self.functions[dependent].remove(fn)
     78                 if len(self.functions[dependent]) == 0:
     79                     self.ready_to_run.append(dependent)
     80 
     81         if len(self.functions) > 0 and len(errors) == 0:
     82             errors.append("Deadlock detected")
     83 
     84         if len(errors) > 0:
     85             msg = "Errors occurred during execution:"
     86             msg = '\n'.join([msg] + errors)
     87             raise ParallelError(msg, errors)
     88 
     89 
     90 def redirect_io(log_file='/dev/null'):
     91     # Always redirect stdin.
     92     in_fd = os.open('/dev/null', os.O_RDONLY)
     93     try:
     94         os.dup2(in_fd, 0)
     95     finally:
     96         os.close(in_fd)
     97 
     98     out_fd = os.open(log_file, os.O_WRONLY | os.O_CREAT)
     99     try:
    100         os.dup2(out_fd, 2)
    101         os.dup2(out_fd, 1)
    102     finally:
    103         os.close(out_fd)
    104 
    105     sys.stdin = os.fdopen(0, 'r')
    106     sys.stdout = os.fdopen(1, 'w')
    107     sys.stderr = os.fdopen(2, 'w')
    108