# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Thread and ThreadGroup that reraise exceptions on the main thread."""

import logging
import sys
import threading
import time
import traceback

import watchdog_timer


# NOTE: On Python 3 this name shadows the builtin TimeoutError inside this
# module. It is kept because callers catch it by this name.
class TimeoutError(Exception):
  """Module-specific timeout exception."""
  pass


if sys.version_info[0] >= 3:
  def _Reraise(exc_info):
    """Reraise an exception from a sys.exc_info() style triple.

    Args:
      exc_info: (type, value, traceback) tuple as returned by sys.exc_info().
    """
    raise exc_info[1].with_traceback(exc_info[2])
else:
  # The three-expression raise statement is a syntax error on Python 3, so it
  # is compiled from a constant string that only Python 2 ever evaluates.
  exec('def _Reraise(exc_info):\n'
       '  raise exc_info[0], exc_info[1], exc_info[2]\n')


class ReraiserThread(threading.Thread):
  """Thread class that can reraise exceptions."""

  def __init__(self, func, args=None, kwargs=None, name=None):
    """Initialize thread.

    Args:
      func: callable to call on a new thread.
      args: list of positional arguments for callable, defaults to empty.
      kwargs: dictionary of keyword arguments for callable, defaults to empty.
      name: thread name, defaults to Thread-N.
    """
    super(ReraiserThread, self).__init__(name=name)
    self.daemon = True
    self._func = func
    # None sentinels avoid sharing one mutable default object between all
    # instances.
    self._args = [] if args is None else args
    self._kwargs = {} if kwargs is None else kwargs
    # Set by run() to the sys.exc_info() triple if the callable raises.
    self._exc_info = None

  def ReraiseIfException(self):
    """Reraise exception if an exception was raised in the thread."""
    if self._exc_info:
      _Reraise(self._exc_info)

  #override
  def run(self):
    """Overrides Thread.run() to add support for reraising exceptions."""
    try:
      self._func(*self._args, **self._kwargs)
    except:  # Deliberately bare: every exception is recorded, then reraised.
      self._exc_info = sys.exc_info()
      raise


class ReraiserThreadGroup(object):
  """A group of ReraiserThread objects."""

  def __init__(self, threads=None):
    """Initialize thread group.

    Args:
      threads: a list of ReraiserThread objects; defaults to empty.
    """
    # A fresh list per group: a shared default list would make Add() on one
    # group leak threads into every other default-constructed group.
    self._threads = [] if threads is None else threads

  def Add(self, thread):
    """Add a thread to the group.

    Args:
      thread: a ReraiserThread object.
    """
    self._threads.append(thread)

  def StartAll(self):
    """Start all threads."""
    for thread in self._threads:
      thread.start()

  def _JoinAll(self, watcher=None):
    """Join all threads without stack dumps.

    Reraises exceptions raised by the child threads and supports breaking
    immediately on exceptions raised on the main thread.

    Args:
      watcher: Watchdog object providing timeout, by default waits forever.

    Raises:
      TimeoutError: if the watcher reports a timeout while threads are still
          being joined.
    """
    if watcher is None:
      # Built per call rather than once at import time as a shared default.
      watcher = watchdog_timer.WatchdogTimer(None)
    alive_threads = self._threads[:]
    while alive_threads:
      # Iterate over a copy because finished threads are removed in the loop.
      for thread in alive_threads[:]:
        if watcher.IsTimedOut():
          raise TimeoutError('Timed out waiting for %d of %d threads.' %
                             (len(alive_threads), len(self._threads)))
        # Allow the main thread to periodically check for interrupts.
        thread.join(0.1)
        if not thread.is_alive():
          alive_threads.remove(thread)
    # All threads are allowed to complete before reraising exceptions.
    for thread in self._threads:
      thread.ReraiseIfException()

  def JoinAll(self, watcher=None):
    """Join all threads.

    Reraises exceptions raised by the child threads and supports breaking
    immediately on exceptions raised on the main thread. Unfinished threads'
    stacks will be logged on watchdog timeout.

    Args:
      watcher: Watchdog object providing timeout, by default waits forever.

    Raises:
      TimeoutError: reraised from _JoinAll after the stacks of the threads
          that are still alive have been logged.
    """
    try:
      self._JoinAll(watcher)
    except TimeoutError:
      # One snapshot for all threads. A thread that exits while the dump is in
      # progress must not replace the TimeoutError with a KeyError.
      frames = sys._current_frames()
      for thread in (t for t in self._threads if t.is_alive()):
        stack = frames.get(thread.ident)
        if stack is None:
          continue
        logging.critical('*' * 80)
        logging.critical('Stack dump for timed out thread \'%s\'', thread.name)
        logging.critical('*' * 80)
        for filename, lineno, name, line in traceback.extract_stack(stack):
          logging.critical('File: "%s", line %d, in %s', filename, lineno, name)
          if line:
            logging.critical(' %s', line.strip())
        logging.critical('*' * 80)
      raise