Home | History | Annotate | Download | only in utils
      1 # Copyright 2013 The Chromium Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """Thread and ThreadGroup that reraise exceptions on the main thread."""
      6 
      7 import logging
      8 import sys
      9 import threading
     10 import time
     11 import traceback
     12 
     13 import watchdog_timer
     14 
     15 
     16 class TimeoutError(Exception):
     17   """Module-specific timeout exception."""
     18   pass
     19 
     20 
     21 class ReraiserThread(threading.Thread):
     22   """Thread class that can reraise exceptions."""
     23 
     24   def __init__(self, func, args=[], kwargs={}, name=None):
     25     """Initialize thread.
     26 
     27     Args:
     28       func: callable to call on a new thread.
     29       args: list of positional arguments for callable, defaults to empty.
     30       kwargs: dictionary of keyword arguments for callable, defaults to empty.
     31       name: thread name, defaults to Thread-N.
     32     """
     33     super(ReraiserThread, self).__init__(name=name)
     34     self.daemon = True
     35     self._func = func
     36     self._args = args
     37     self._kwargs = kwargs
     38     self._exc_info = None
     39 
     40   def ReraiseIfException(self):
     41     """Reraise exception if an exception was raised in the thread."""
     42     if self._exc_info:
     43       raise self._exc_info[0], self._exc_info[1], self._exc_info[2]
     44 
     45   #override
     46   def run(self):
     47     """Overrides Thread.run() to add support for reraising exceptions."""
     48     try:
     49       self._func(*self._args, **self._kwargs)
     50     except:
     51       self._exc_info = sys.exc_info()
     52       raise
     53 
     54 
     55 class ReraiserThreadGroup(object):
     56   """A group of ReraiserThread objects."""
     57 
     58   def __init__(self, threads=[]):
     59     """Initialize thread group.
     60 
     61     Args:
     62       threads: a list of ReraiserThread objects; defaults to empty.
     63     """
     64     self._threads = threads
     65 
     66   def Add(self, thread):
     67     """Add a thread to the group.
     68 
     69     Args:
     70       thread: a ReraiserThread object.
     71     """
     72     self._threads.append(thread)
     73 
     74   def StartAll(self):
     75     """Start all threads."""
     76     for thread in self._threads:
     77       thread.start()
     78 
     79   def _JoinAll(self, watcher=watchdog_timer.WatchdogTimer(None)):
     80     """Join all threads without stack dumps.
     81 
     82     Reraises exceptions raised by the child threads and supports breaking
     83     immediately on exceptions raised on the main thread.
     84 
     85     Args:
     86       watcher: Watchdog object providing timeout, by default waits forever.
     87     """
     88     alive_threads = self._threads[:]
     89     while alive_threads:
     90       for thread in alive_threads[:]:
     91         if watcher.IsTimedOut():
     92           raise TimeoutError('Timed out waiting for %d of %d threads.' %
     93                              (len(alive_threads), len(self._threads)))
     94         # Allow the main thread to periodically check for interrupts.
     95         thread.join(0.1)
     96         if not thread.isAlive():
     97           alive_threads.remove(thread)
     98     # All threads are allowed to complete before reraising exceptions.
     99     for thread in self._threads:
    100       thread.ReraiseIfException()
    101 
    102   def JoinAll(self, watcher=watchdog_timer.WatchdogTimer(None)):
    103     """Join all threads.
    104 
    105     Reraises exceptions raised by the child threads and supports breaking
    106     immediately on exceptions raised on the main thread. Unfinished threads'
    107     stacks will be logged on watchdog timeout.
    108 
    109     Args:
    110       watcher: Watchdog object providing timeout, by default waits forever.
    111     """
    112     try:
    113       self._JoinAll(watcher)
    114     except TimeoutError:
    115       for thread in (t for t in self._threads if t.isAlive()):
    116         stack = sys._current_frames()[thread.ident]
    117         logging.critical('*' * 80)
    118         logging.critical('Stack dump for timed out thread \'%s\'', thread.name)
    119         logging.critical('*' * 80)
    120         for filename, lineno, name, line in traceback.extract_stack(stack):
    121           logging.critical('File: "%s", line %d, in %s', filename, lineno, name)
    122           if line:
    123             logging.critical('  %s', line.strip())
    124         logging.critical('*' * 80)
    125       raise
    126