Home | History | Annotate | Download | only in unittest
      1 import signal
      2 import weakref
      3 
      4 from functools import wraps
      5 
      6 __unittest = True
      7 
      8 
      9 class _InterruptHandler(object):
     10     def __init__(self, default_handler):
     11         self.called = False
     12         self.original_handler = default_handler
     13         if isinstance(default_handler, int):
     14             if default_handler == signal.SIG_DFL:
     15                 # Pretend it's signal.default_int_handler instead.
     16                 default_handler = signal.default_int_handler
     17             elif default_handler == signal.SIG_IGN:
     18                 # Not quite the same thing as SIG_IGN, but the closest we
     19                 # can make it: do nothing.
     20                 def default_handler(unused_signum, unused_frame):
     21                     pass
     22             else:
     23                 raise TypeError("expected SIGINT signal handler to be "
     24                                 "signal.SIG_IGN, signal.SIG_DFL, or a "
     25                                 "callable object")
     26         self.default_handler = default_handler
     27 
     28     def __call__(self, signum, frame):
     29         installed_handler = signal.getsignal(signal.SIGINT)
     30         if installed_handler is not self:
     31             # if we aren't the installed handler, then delegate immediately
     32             # to the default handler
     33             self.default_handler(signum, frame)
     34 
     35         if self.called:
     36             self.default_handler(signum, frame)
     37         self.called = True
     38         for result in _results.keys():
     39             result.stop()
     40 
     41 _results = weakref.WeakKeyDictionary()
     42 def registerResult(result):
     43     _results[result] = 1
     44 
     45 def removeResult(result):
     46     return bool(_results.pop(result, None))
     47 
     48 _interrupt_handler = None
     49 def installHandler():
     50     global _interrupt_handler
     51     if _interrupt_handler is None:
     52         default_handler = signal.getsignal(signal.SIGINT)
     53         _interrupt_handler = _InterruptHandler(default_handler)
     54         signal.signal(signal.SIGINT, _interrupt_handler)
     55 
     56 
     57 def removeHandler(method=None):
     58     if method is not None:
     59         @wraps(method)
     60         def inner(*args, **kwargs):
     61             initial = signal.getsignal(signal.SIGINT)
     62             removeHandler()
     63             try:
     64                 return method(*args, **kwargs)
     65             finally:
     66                 signal.signal(signal.SIGINT, initial)
     67         return inner
     68 
     69     global _interrupt_handler
     70     if _interrupt_handler is not None:
     71         signal.signal(signal.SIGINT, _interrupt_handler.original_handler)
     72