Home | History | Annotate | Download | only in bin
      1 """ Parallel execution management """
      2 
      3 __author__ = """Copyright Andy Whitcroft 2006"""
      4 
      5 import sys, logging, os, pickle, traceback, gc, time
      6 from autotest_lib.client.common_lib import error, utils
      7 
      8 def fork_start(tmp, l):
      9     sys.stdout.flush()
     10     sys.stderr.flush()
     11     pid = os.fork()
     12     if pid:
     13         # Parent
     14         return pid
     15 
     16     try:
     17         try:
     18             l()
     19         except error.AutotestError:
     20             raise
     21         except Exception, e:
     22             raise error.UnhandledTestError(e)
     23     except Exception, detail:
     24         try:
     25             try:
     26                 logging.error('child process failed')
     27                 # logging.exception() uses ERROR level, but we want DEBUG for
     28                 # the traceback
     29                 for line in traceback.format_exc().splitlines():
     30                     logging.debug(line)
     31             finally:
     32                 # note that exceptions originating in this block won't make it
     33                 # to the logs
     34                 output_dir = os.path.join(tmp, 'debug')
     35                 if not os.path.exists(output_dir):
     36                     os.makedirs(output_dir)
     37                 ename = os.path.join(output_dir, "error-%d" % os.getpid())
     38                 pickle.dump(detail, open(ename, "w"))
     39 
     40                 sys.stdout.flush()
     41                 sys.stderr.flush()
     42         finally:
     43             # clear exception information to allow garbage collection of
     44             # objects referenced by the exception's traceback
     45             sys.exc_clear()
     46             gc.collect()
     47             os._exit(1)
     48     else:
     49         try:
     50             sys.stdout.flush()
     51             sys.stderr.flush()
     52         finally:
     53             os._exit(0)
     54 
     55 
     56 def _check_for_subprocess_exception(temp_dir, pid):
     57     ename = temp_dir + "/debug/error-%d" % pid
     58     if os.path.exists(ename):
     59         try:
     60             e = pickle.load(file(ename, 'r'))
     61         except ImportError:
     62             with open(ename, 'r') as fp:
     63                 file_text = fp.read()
     64             raise error.TestError(
     65                     'Subprocess raised an exception that could not be '
     66                     'identified. The root cause exception is in the text '
     67                     'that follows: ' + file_text)
     68         finally:
     69             # Rename the error-pid file so that they do not affect later child
     70             # processes that use recycled pids.
     71             i = 0
     72             while True:
     73                 pename = ename + ('-%d' % i)
     74                 i += 1
     75                 if not os.path.exists(pename):
     76                     break
     77             os.rename(ename, pename)
     78         raise e
     79 
     80 
     81 def fork_waitfor(tmp, pid):
     82     (pid, status) = os.waitpid(pid, 0)
     83 
     84     _check_for_subprocess_exception(tmp, pid)
     85 
     86     if status:
     87         raise error.TestError("Test subprocess failed rc=%d" % (status))
     88 
     89 def fork_waitfor_timed(tmp, pid, timeout):
     90     """
     91     Waits for pid until it terminates or timeout expires.
     92     If timeout expires, test subprocess is killed.
     93     """
     94     timer_expired = True
     95     poll_time = 2
     96     time_passed = 0
     97     while time_passed < timeout:
     98         time.sleep(poll_time)
     99         (child_pid, status) = os.waitpid(pid, os.WNOHANG)
    100         if (child_pid, status) == (0, 0):
    101             time_passed = time_passed + poll_time
    102         else:
    103             timer_expired = False
    104             break
    105 
    106     if timer_expired:
    107         logging.info('Timer expired (%d sec.), nuking pid %d', timeout, pid)
    108         utils.nuke_pid(pid)
    109         (child_pid, status) = os.waitpid(pid, 0)
    110         raise error.TestError("Test timeout expired, rc=%d" % (status))
    111     else:
    112         _check_for_subprocess_exception(tmp, pid)
    113 
    114     if status:
    115         raise error.TestError("Test subprocess failed rc=%d" % (status))
    116 
    117 def fork_nuke_subprocess(tmp, pid):
    118     utils.nuke_pid(pid)
    119     _check_for_subprocess_exception(tmp, pid)
    120