Home | History | Annotate | Download | only in utils
      1 # Copyright 2013 The Chromium Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """Thread and ThreadGroup that reraise exceptions on the main thread."""
      6 # pylint: disable=W0212
      7 
      8 import logging
      9 import sys
     10 import threading
     11 import traceback
     12 
     13 from pylib.utils import watchdog_timer
     14 
     15 
     16 class TimeoutError(Exception):
     17   """Module-specific timeout exception."""
     18   pass
     19 
     20 
     21 def LogThreadStack(thread):
     22   """Log the stack for the given thread.
     23 
     24   Args:
     25     thread: a threading.Thread instance.
     26   """
     27   stack = sys._current_frames()[thread.ident]
     28   logging.critical('*' * 80)
     29   logging.critical('Stack dump for thread \'%s\'', thread.name)
     30   logging.critical('*' * 80)
     31   for filename, lineno, name, line in traceback.extract_stack(stack):
     32     logging.critical('File: "%s", line %d, in %s', filename, lineno, name)
     33     if line:
     34       logging.critical('  %s', line.strip())
     35   logging.critical('*' * 80)
     36 
     37 
     38 class ReraiserThread(threading.Thread):
     39   """Thread class that can reraise exceptions."""
     40 
     41   def __init__(self, func, args=None, kwargs=None, name=None):
     42     """Initialize thread.
     43 
     44     Args:
     45       func: callable to call on a new thread.
     46       args: list of positional arguments for callable, defaults to empty.
     47       kwargs: dictionary of keyword arguments for callable, defaults to empty.
     48       name: thread name, defaults to Thread-N.
     49     """
     50     super(ReraiserThread, self).__init__(name=name)
     51     if not args:
     52       args = []
     53     if not kwargs:
     54       kwargs = {}
     55     self.daemon = True
     56     self._func = func
     57     self._args = args
     58     self._kwargs = kwargs
     59     self._ret = None
     60     self._exc_info = None
     61 
     62   def ReraiseIfException(self):
     63     """Reraise exception if an exception was raised in the thread."""
     64     if self._exc_info:
     65       raise self._exc_info[0], self._exc_info[1], self._exc_info[2]
     66 
     67   def GetReturnValue(self):
     68     """Reraise exception if present, otherwise get the return value."""
     69     self.ReraiseIfException()
     70     return self._ret
     71 
     72   #override
     73   def run(self):
     74     """Overrides Thread.run() to add support for reraising exceptions."""
     75     try:
     76       self._ret = self._func(*self._args, **self._kwargs)
     77     except:
     78       self._exc_info = sys.exc_info()
     79       raise
     80 
     81 
     82 class ReraiserThreadGroup(object):
     83   """A group of ReraiserThread objects."""
     84 
     85   def __init__(self, threads=None):
     86     """Initialize thread group.
     87 
     88     Args:
     89       threads: a list of ReraiserThread objects; defaults to empty.
     90     """
     91     if not threads:
     92       threads = []
     93     self._threads = threads
     94 
     95   def Add(self, thread):
     96     """Add a thread to the group.
     97 
     98     Args:
     99       thread: a ReraiserThread object.
    100     """
    101     self._threads.append(thread)
    102 
    103   def StartAll(self):
    104     """Start all threads."""
    105     for thread in self._threads:
    106       thread.start()
    107 
    108   def _JoinAll(self, watcher=watchdog_timer.WatchdogTimer(None)):
    109     """Join all threads without stack dumps.
    110 
    111     Reraises exceptions raised by the child threads and supports breaking
    112     immediately on exceptions raised on the main thread.
    113 
    114     Args:
    115       watcher: Watchdog object providing timeout, by default waits forever.
    116     """
    117     alive_threads = self._threads[:]
    118     while alive_threads:
    119       for thread in alive_threads[:]:
    120         if watcher.IsTimedOut():
    121           raise TimeoutError('Timed out waiting for %d of %d threads.' %
    122                              (len(alive_threads), len(self._threads)))
    123         # Allow the main thread to periodically check for interrupts.
    124         thread.join(0.1)
    125         if not thread.isAlive():
    126           alive_threads.remove(thread)
    127     # All threads are allowed to complete before reraising exceptions.
    128     for thread in self._threads:
    129       thread.ReraiseIfException()
    130 
    131   def JoinAll(self, watcher=watchdog_timer.WatchdogTimer(None)):
    132     """Join all threads.
    133 
    134     Reraises exceptions raised by the child threads and supports breaking
    135     immediately on exceptions raised on the main thread. Unfinished threads'
    136     stacks will be logged on watchdog timeout.
    137 
    138     Args:
    139       watcher: Watchdog object providing timeout, by default waits forever.
    140     """
    141     try:
    142       self._JoinAll(watcher)
    143     except TimeoutError:
    144       for thread in (t for t in self._threads if t.isAlive()):
    145         LogThreadStack(thread)
    146       raise
    147 
    148   def GetAllReturnValues(self, watcher=watchdog_timer.WatchdogTimer(None)):
    149     """Get all return values, joining all threads if necessary.
    150 
    151     Args:
    152       watcher: same as in |JoinAll|. Only used if threads are alive.
    153     """
    154     if any([t.isAlive() for t in self._threads]):
    155       self.JoinAll(watcher)
    156     return [t.GetReturnValue() for t in self._threads]
    157 
    158