Home | History | Annotate | Download | only in server
      1 __author__ = """Copyright Andy Whitcroft, Martin J. Bligh - 2006, 2007"""
      2 
      3 import sys, os, signal, time, cPickle, logging
      4 
      5 from autotest_lib.client.common_lib import error, utils
      6 from autotest_lib.client.common_lib.cros import retry
      7 
      8 
# Entry points that use subcommand must set this to their logging manager
# to get log redirection for subcommands.  subcommand.redirect_output() calls
# its tee_redirect_debug_dir(debug_dir, tag=...) method inside each forked
# child that was given a subdir; while this stays None, no redirection is done.
logging_manager_object = None
     12 
     13 
     14 def parallel(tasklist, timeout=None, return_results=False):
     15     """
     16     Run a set of predefined subcommands in parallel.
     17 
     18     @param tasklist: A list of subcommand instances to execute.
     19     @param timeout: Number of seconds after which the commands should timeout.
     20     @param return_results: If True instead of an AutoServError being raised
     21             on any error a list of the results|exceptions from the tasks is
     22             returned.  [default: False]
     23     """
     24     run_error = False
     25     for task in tasklist:
     26         task.fork_start()
     27 
     28     remaining_timeout = None
     29     if timeout:
     30         endtime = time.time() + timeout
     31 
     32     results = []
     33     for task in tasklist:
     34         if timeout:
     35             remaining_timeout = max(endtime - time.time(), 1)
     36         try:
     37             status = task.fork_waitfor(timeout=remaining_timeout)
     38         except error.AutoservSubcommandError:
     39             run_error = True
     40         else:
     41             if status != 0:
     42                 run_error = True
     43 
     44         results.append(cPickle.load(task.result_pickle))
     45         task.result_pickle.close()
     46 
     47     if return_results:
     48         return results
     49     elif run_error:
     50         message = 'One or more subcommands failed:\n'
     51         for task, result in zip(tasklist, results):
     52             message += 'task: %s returned/raised: %r\n' % (task, result)
     53         raise error.AutoservError(message)
     54 
     55 
     56 def parallel_simple(function, arglist, subdir_name_constructor=lambda x: str(x),
     57                     log=True, timeout=None, return_results=False):
     58     """
     59     Each element in the arglist used to create a subcommand object,
     60     where that arg is used both as a subdir name, and a single argument
     61     to pass to "function".
     62 
     63     We create a subcommand object for each element in the list,
     64     then execute those subcommand objects in parallel.
     65 
     66     NOTE: As an optimization, if len(arglist) == 1 a subcommand is not used.
     67 
     68     @param function: A callable to run in parallel once per arg in arglist.
     69     @param arglist: A list of arguments to be used one per subcommand
     70     @param subdir_name_constructor: A function that returns a name for the
     71             result sub-directory created per subcommand.
     72             Signature is:
     73                 subdir_name_constructor(arg)
     74             where arg is the argument passed to function.
     75     @param log: If True, output will be written to output in a subdirectory
     76             named after each subcommand's arg.
     77     @param timeout: Number of seconds after which the commands should timeout.
     78     @param return_results: If True instead of an AutoServError being raised
     79             on any error a list of the results|exceptions from the function
     80             called on each arg is returned.  [default: False]
     81 
     82     @returns None or a list of results/exceptions.
     83     """
     84     if not arglist:
     85         logging.warning('parallel_simple was called with an empty arglist, '
     86                         'did you forget to pass in a list of machines?')
     87 
     88     # Bypass the multithreading if only one machine.
     89     if len(arglist) == 1:
     90         arg = arglist[0]
     91         if return_results:
     92             try:
     93                 result = function(arg)
     94             except Exception, e:
     95                 return [e]
     96             return [result]
     97         else:
     98             function(arg)
     99             return
    100 
    101     subcommands = []
    102     for arg in arglist:
    103         args = [arg]
    104         subdir = subdir_name_constructor(arg) if log else None
    105         subcommands.append(subcommand(function, args, subdir))
    106     return parallel(subcommands, timeout, return_results=return_results)
    107 
    108 
    109 class subcommand(object):
    110     fork_hooks, join_hooks = [], []
    111 
    112     def __init__(self, func, args, subdir = None):
    113         # func(args) - the subcommand to run
    114         # subdir     - the subdirectory to log results in
    115         if subdir:
    116             self.subdir = os.path.abspath(subdir)
    117             if not os.path.exists(self.subdir):
    118                 os.mkdir(self.subdir)
    119             self.debug = os.path.join(self.subdir, 'debug')
    120             if not os.path.exists(self.debug):
    121                 os.mkdir(self.debug)
    122         else:
    123             self.subdir = None
    124             self.debug = None
    125 
    126         self.func = func
    127         self.args = args
    128         self.pid = None
    129         self.returncode = None
    130 
    131 
    132     def __str__(self):
    133         return str('subcommand(func=%s,  args=%s, subdir=%s)' %
    134                    (self.func, self.args, self.subdir))
    135 
    136 
    137     @classmethod
    138     def register_fork_hook(cls, hook):
    139         """ Register a function to be called from the child process after
    140         forking. """
    141         cls.fork_hooks.append(hook)
    142 
    143 
    144     @classmethod
    145     def register_join_hook(cls, hook):
    146         """ Register a function to be called when from the child process
    147         just before the child process terminates (joins to the parent). """
    148         cls.join_hooks.append(hook)
    149 
    150 
    151     def redirect_output(self):
    152         if self.subdir and logging_manager_object:
    153             tag = os.path.basename(self.subdir)
    154             logging_manager_object.tee_redirect_debug_dir(self.debug, tag=tag)
    155 
    156 
    157     def fork_start(self):
    158         sys.stdout.flush()
    159         sys.stderr.flush()
    160         r, w = os.pipe()
    161         self.returncode = None
    162         self.pid = os.fork()
    163 
    164         if self.pid:                            # I am the parent
    165             os.close(w)
    166             self.result_pickle = os.fdopen(r, 'r')
    167             return
    168         else:
    169             os.close(r)
    170 
    171         # We are the child from this point on. Never return.
    172         signal.signal(signal.SIGTERM, signal.SIG_DFL) # clear handler
    173         if self.subdir:
    174             os.chdir(self.subdir)
    175         self.redirect_output()
    176 
    177         try:
    178             for hook in self.fork_hooks:
    179                 hook(self)
    180             result = self.func(*self.args)
    181             os.write(w, cPickle.dumps(result, cPickle.HIGHEST_PROTOCOL))
    182             exit_code = 0
    183         except Exception, e:
    184             logging.exception('function failed')
    185             exit_code = 1
    186             os.write(w, cPickle.dumps(e, cPickle.HIGHEST_PROTOCOL))
    187 
    188         os.close(w)
    189 
    190         try:
    191             for hook in self.join_hooks:
    192                 hook(self)
    193         finally:
    194             sys.stdout.flush()
    195             sys.stderr.flush()
    196             os._exit(exit_code)
    197 
    198 
    199     def _handle_exitstatus(self, sts):
    200         """
    201         This is partially borrowed from subprocess.Popen.
    202         """
    203         if os.WIFSIGNALED(sts):
    204             self.returncode = -os.WTERMSIG(sts)
    205         elif os.WIFEXITED(sts):
    206             self.returncode = os.WEXITSTATUS(sts)
    207         else:
    208             # Should never happen
    209             raise RuntimeError("Unknown child exit status!")
    210 
    211         if self.returncode != 0:
    212             print "subcommand failed pid %d" % self.pid
    213             print "%s" % (self.func,)
    214             print "rc=%d" % self.returncode
    215             print
    216             if self.debug:
    217                 stderr_file = os.path.join(self.debug, 'autoserv.stderr')
    218                 if os.path.exists(stderr_file):
    219                     for line in open(stderr_file).readlines():
    220                         print line,
    221             print "\n--------------------------------------------\n"
    222             raise error.AutoservSubcommandError(self.func, self.returncode)
    223 
    224 
    225     def poll(self):
    226         """
    227         This is borrowed from subprocess.Popen.
    228         """
    229         if self.returncode is None:
    230             try:
    231                 pid, sts = os.waitpid(self.pid, os.WNOHANG)
    232                 if pid == self.pid:
    233                     self._handle_exitstatus(sts)
    234             except os.error:
    235                 pass
    236         return self.returncode
    237 
    238 
    239     def wait(self):
    240         """
    241         This is borrowed from subprocess.Popen.
    242         """
    243         if self.returncode is None:
    244             pid, sts = os.waitpid(self.pid, 0)
    245             self._handle_exitstatus(sts)
    246         return self.returncode
    247 
    248 
    249     def fork_waitfor(self, timeout=None):
    250         if not timeout:
    251             return self.wait()
    252         else:
    253             _, result = retry.timeout(self.wait, timeout_sec=timeout)
    254 
    255             if result is None:
    256                 utils.nuke_pid(self.pid)
    257                 print "subcommand failed pid %d" % self.pid
    258                 print "%s" % (self.func,)
    259                 print "timeout after %ds" % timeout
    260                 print
    261                 result = self.wait()
    262 
    263             return result
    264