import os, sys


def _flush_stdio():
    """Best-effort flush of sys.stdout and sys.stderr.

    Called before fork() and before re-pointing file descriptors so that text
    still sitting in Python-level buffers reaches its original destination
    exactly once. A missing (None) or already-closed stream is skipped: there
    is nothing useful to flush there, and failing would abort the caller's
    fork/redirect for no benefit.
    """
    for stream in (sys.stdout, sys.stderr):
        if stream is None:
            continue
        try:
            stream.flush()
        except (IOError, OSError, ValueError):
            # ValueError: "I/O operation on closed file".
            pass


def _exit_status(code):
    """Translate a SystemExit code into a process exit status.

    Mirrors the interpreter's own handling: None -> 0, an int -> itself;
    any other object is written to stderr and maps to 1.
    """
    if code is None:
        return 0
    if isinstance(code, int):
        return code
    sys.stderr.write('%s\n' % (code,))
    return 1


def _job_name(fn):
    """Return a printable name for a job (e.g. functools.partial has no __name__)."""
    return getattr(fn, '__name__', repr(fn))


class ParallelError(Exception):
    """Raised by ParallelExecute.run_until_completion when jobs fail or deadlock.

    Attributes:
        str: the combined, newline-separated message.
        errors: list with one message per problem.
    """

    # NOTE: the parameter keeps the name ``str`` (shadowing the builtin inside
    # this method only) so existing keyword callers keep working.
    def __init__(self, str, errors):
        self.str = str
        self.errors = errors
        Exception.__init__(self, str)


class ParallelExecute(object):
    """Run callables in forked child processes, respecting dependencies."""

    def __init__(self, functions, max_simultaneous_procs=20):
        """\
        Set up a dependency graph of jobs to run in parallel.

        functions: Either a dictionary mapping each job (a zero-argument
                   callable) to a collection of other jobs it depends on, or
                   any iterable of jobs, in which case none of them has
                   dependencies. Every dependency must itself be a key of the
                   dictionary, otherwise KeyError is raised. The argument is
                   copied, so the caller's dict and dependency collections are
                   never modified.

        max_simultaneous_procs: Throttle the number of child processes we have
                                running at once. Must be at least 1.

        Raises:
            ValueError: if max_simultaneous_procs is less than 1.
            KeyError: if a job depends on something that is not itself a job.
        """
        if max_simultaneous_procs < 1:
            # With no slots, run_until_completion would call os.wait() with no
            # children and fail with ECHILD.
            raise ValueError('max_simultaneous_procs must be at least 1, got %r'
                             % (max_simultaneous_procs,))

        # Work on private copies: run_until_completion consumes both the job
        # map (pop) and each dependency set (remove).
        if isinstance(functions, dict):
            functions = {fn: set(deps) for fn, deps in functions.items()}
        else:
            functions = {fn: set() for fn in functions}

        # Reverse edges: job -> jobs waiting on it.
        dependents = {fn: [] for fn in functions}
        for fn, deps in functions.items():
            for dep in deps:
                dependents[dep].append(fn)

        self.max_procs = max_simultaneous_procs
        self.functions = functions    # jobs not yet started -> unfinished deps
        self.dependents = dependents  # job -> jobs that depend on it
        self.pid_map = {}             # pid of running child -> its job
        self.ready_to_run = []        # jobs whose dependencies have all succeeded

    def _run(self, function):
        """Fork a child process that runs ``function``.

        In the parent, records the child's pid and returns. The child never
        returns: it exits via os._exit with status 0 on success, the
        SystemExit code if the job called sys.exit(), or 1 after printing the
        traceback of any other exception.
        """
        self.functions.pop(function)
        # Otherwise anything buffered in the parent would be written twice.
        _flush_stdio()
        pid = os.fork()
        if pid:
            self.pid_map[pid] = function
            return

        # Child. os._exit guarantees we never unwind into the caller's stack
        # (its except/finally blocks and atexit handlers belong to the parent).
        status = 1
        try:
            function()
            status = 0
        except SystemExit as exc:
            status = _exit_status(exc.code)
        except BaseException:
            # Same report the interpreter prints for an uncaught exception.
            sys.excepthook(*sys.exc_info())
        finally:
            # os._exit skips the normal flush, so do it by hand.
            _flush_stdio()
            os._exit(status)

    def run_until_completion(self):
        """Run every job, honouring dependencies and the process limit.

        Blocks until every child that was started has been reaped. A job is
        started only after all of its dependencies exited with status 0, so
        jobs downstream of a failure never run.

        Raises:
            ParallelError: with one "<name> failed" entry per job whose child
                exited non-zero; or, if nothing failed but some jobs were never
                started (their dependencies can never all finish, i.e. a
                cycle), with the single entry "Deadlock detected".
        """
        for fn, deps in self.functions.items():
            if not deps:
                self.ready_to_run.append(fn)

        errors = []
        while self.pid_map or self.ready_to_run:
            free_slots = self.max_procs - len(self.pid_map)
            for _ in range(min(free_slots, len(self.ready_to_run))):
                self._run(self.ready_to_run.pop())

            # Handle one proc that's finished.
            pid, status = os.wait()
            fn = self.pid_map.pop(pid, None)
            if fn is None:
                # os.wait() reaps any child of this process; one we did not
                # fork (e.g. started by the caller) is not ours to judge.
                continue
            if status != 0:
                errors.append("%s failed" % _job_name(fn))
                continue

            for dependent in self.dependents[fn]:
                remaining = self.functions[dependent]
                remaining.remove(fn)
                if not remaining:
                    self.ready_to_run.append(dependent)

        if self.functions and not errors:
            errors.append("Deadlock detected")

        if errors:
            msg = "Errors occurred during execution:"
            msg = '\n'.join([msg] + errors)
            raise ParallelError(msg, errors)


def redirect_io(log_file='/dev/null'):
    """Point stdin at /dev/null and stdout/stderr at ``log_file``.

    Works on file descriptors 0, 1 and 2 via dup2, so anything that later
    writes to those descriptors (including child processes spawned afterwards)
    uses the new targets. Then sys.stdin/stdout/stderr are rebound to fresh
    file objects on those descriptors.

    ``log_file`` is opened for appending and created with mode 0666 (minus
    umask) if missing. Appending means an existing log is never partially
    overwritten, and concurrent writers sharing the log do not clobber each
    other's output.
    """
    # Whatever is still buffered belongs to the old destinations, not the log.
    _flush_stdio()

    # Always redirect stdin.
    in_fd = os.open('/dev/null', os.O_RDONLY)
    try:
        os.dup2(in_fd, 0)
    finally:
        # If fd 0 was closed, os.open returned 0 itself; closing it would
        # undo the redirect.
        if in_fd != 0:
            os.close(in_fd)

    out_fd = os.open(log_file, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o666)
    try:
        os.dup2(out_fd, 2)
        os.dup2(out_fd, 1)
    finally:
        # Same reasoning: out_fd may already be one of the targets.
        if out_fd not in (1, 2):
            os.close(out_fd)

    sys.stdin = os.fdopen(0, 'r')
    sys.stdout = os.fdopen(1, 'w')
    sys.stderr = os.fdopen(2, 'w')