Home | History | Annotate | Download | only in test
      1 import gc
      2 import os
      3 import sys
      4 import signal
      5 import weakref
      6 
      7 from cStringIO import StringIO
      8 
      9 
     10 import unittest
     11 
     12 
     13 @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
     14 @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
     15 @unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 "
     16     "if threads have been used")
     17 class TestBreak(unittest.TestCase):
     18     int_handler = None
     19 
     20     def setUp(self):
     21         self._default_handler = signal.getsignal(signal.SIGINT)
     22         if self.int_handler is not None:
     23             signal.signal(signal.SIGINT, self.int_handler)
     24 
     25     def tearDown(self):
     26         signal.signal(signal.SIGINT, self._default_handler)
     27         unittest.signals._results = weakref.WeakKeyDictionary()
     28         unittest.signals._interrupt_handler = None
     29 
     30 
     31     def testInstallHandler(self):
     32         default_handler = signal.getsignal(signal.SIGINT)
     33         unittest.installHandler()
     34         self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
     35 
     36         try:
     37             pid = os.getpid()
     38             os.kill(pid, signal.SIGINT)
     39         except KeyboardInterrupt:
     40             self.fail("KeyboardInterrupt not handled")
     41 
     42         self.assertTrue(unittest.signals._interrupt_handler.called)
     43 
     44     def testRegisterResult(self):
     45         result = unittest.TestResult()
     46         unittest.registerResult(result)
     47 
     48         for ref in unittest.signals._results:
     49             if ref is result:
     50                 break
     51             elif ref is not result:
     52                 self.fail("odd object in result set")
     53         else:
     54             self.fail("result not found")
     55 
     56 
     57     def testInterruptCaught(self):
     58         default_handler = signal.getsignal(signal.SIGINT)
     59 
     60         result = unittest.TestResult()
     61         unittest.installHandler()
     62         unittest.registerResult(result)
     63 
     64         self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
     65 
     66         def test(result):
     67             pid = os.getpid()
     68             os.kill(pid, signal.SIGINT)
     69             result.breakCaught = True
     70             self.assertTrue(result.shouldStop)
     71 
     72         try:
     73             test(result)
     74         except KeyboardInterrupt:
     75             self.fail("KeyboardInterrupt not handled")
     76         self.assertTrue(result.breakCaught)
     77 
     78 
     79     def testSecondInterrupt(self):
     80         # Can't use skipIf decorator because the signal handler may have
     81         # been changed after defining this method.
     82         if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
     83             self.skipTest("test requires SIGINT to not be ignored")
     84         result = unittest.TestResult()
     85         unittest.installHandler()
     86         unittest.registerResult(result)
     87 
     88         def test(result):
     89             pid = os.getpid()
     90             os.kill(pid, signal.SIGINT)
     91             result.breakCaught = True
     92             self.assertTrue(result.shouldStop)
     93             os.kill(pid, signal.SIGINT)
     94             self.fail("Second KeyboardInterrupt not raised")
     95 
     96         try:
     97             test(result)
     98         except KeyboardInterrupt:
     99             pass
    100         else:
    101             self.fail("Second KeyboardInterrupt not raised")
    102         self.assertTrue(result.breakCaught)
    103 
    104 
    105     def testTwoResults(self):
    106         unittest.installHandler()
    107 
    108         result = unittest.TestResult()
    109         unittest.registerResult(result)
    110         new_handler = signal.getsignal(signal.SIGINT)
    111 
    112         result2 = unittest.TestResult()
    113         unittest.registerResult(result2)
    114         self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)
    115 
    116         result3 = unittest.TestResult()
    117 
    118         def test(result):
    119             pid = os.getpid()
    120             os.kill(pid, signal.SIGINT)
    121 
    122         try:
    123             test(result)
    124         except KeyboardInterrupt:
    125             self.fail("KeyboardInterrupt not handled")
    126 
    127         self.assertTrue(result.shouldStop)
    128         self.assertTrue(result2.shouldStop)
    129         self.assertFalse(result3.shouldStop)
    130 
    131 
    132     def testHandlerReplacedButCalled(self):
    133         # Can't use skipIf decorator because the signal handler may have
    134         # been changed after defining this method.
    135         if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
    136             self.skipTest("test requires SIGINT to not be ignored")
    137         # If our handler has been replaced (is no longer installed) but is
    138         # called by the *new* handler, then it isn't safe to delay the
    139         # SIGINT and we should immediately delegate to the default handler
    140         unittest.installHandler()
    141 
    142         handler = signal.getsignal(signal.SIGINT)
    143         def new_handler(frame, signum):
    144             handler(frame, signum)
    145         signal.signal(signal.SIGINT, new_handler)
    146 
    147         try:
    148             pid = os.getpid()
    149             os.kill(pid, signal.SIGINT)
    150         except KeyboardInterrupt:
    151             pass
    152         else:
    153             self.fail("replaced but delegated handler doesn't raise interrupt")
    154 
    155     def testRunner(self):
    156         # Creating a TextTestRunner with the appropriate argument should
    157         # register the TextTestResult it creates
    158         runner = unittest.TextTestRunner(stream=StringIO())
    159 
    160         result = runner.run(unittest.TestSuite())
    161         self.assertIn(result, unittest.signals._results)
    162 
    163     def testWeakReferences(self):
    164         # Calling registerResult on a result should not keep it alive
    165         result = unittest.TestResult()
    166         unittest.registerResult(result)
    167 
    168         ref = weakref.ref(result)
    169         del result
    170 
    171         # For non-reference counting implementations
    172         gc.collect();gc.collect()
    173         self.assertIsNone(ref())
    174 
    175 
    176     def testRemoveResult(self):
    177         result = unittest.TestResult()
    178         unittest.registerResult(result)
    179 
    180         unittest.installHandler()
    181         self.assertTrue(unittest.removeResult(result))
    182 
    183         # Should this raise an error instead?
    184         self.assertFalse(unittest.removeResult(unittest.TestResult()))
    185 
    186         try:
    187             pid = os.getpid()
    188             os.kill(pid, signal.SIGINT)
    189         except KeyboardInterrupt:
    190             pass
    191 
    192         self.assertFalse(result.shouldStop)
    193 
    194     def testMainInstallsHandler(self):
    195         failfast = object()
    196         test = object()
    197         verbosity = object()
    198         result = object()
    199         default_handler = signal.getsignal(signal.SIGINT)
    200 
    201         class FakeRunner(object):
    202             initArgs = []
    203             runArgs = []
    204             def __init__(self, *args, **kwargs):
    205                 self.initArgs.append((args, kwargs))
    206             def run(self, test):
    207                 self.runArgs.append(test)
    208                 return result
    209 
    210         class Program(unittest.TestProgram):
    211             def __init__(self, catchbreak):
    212                 self.exit = False
    213                 self.verbosity = verbosity
    214                 self.failfast = failfast
    215                 self.catchbreak = catchbreak
    216                 self.testRunner = FakeRunner
    217                 self.test = test
    218                 self.result = None
    219 
    220         p = Program(False)
    221         p.runTests()
    222 
    223         self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,
    224                                                      'verbosity': verbosity,
    225                                                      'failfast': failfast})])
    226         self.assertEqual(FakeRunner.runArgs, [test])
    227         self.assertEqual(p.result, result)
    228 
    229         self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
    230 
    231         FakeRunner.initArgs = []
    232         FakeRunner.runArgs = []
    233         p = Program(True)
    234         p.runTests()
    235 
    236         self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,
    237                                                      'verbosity': verbosity,
    238                                                      'failfast': failfast})])
    239         self.assertEqual(FakeRunner.runArgs, [test])
    240         self.assertEqual(p.result, result)
    241 
    242         self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
    243 
    244     def testRemoveHandler(self):
    245         default_handler = signal.getsignal(signal.SIGINT)
    246         unittest.installHandler()
    247         unittest.removeHandler()
    248         self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
    249 
    250         # check that calling removeHandler multiple times has no ill-effect
    251         unittest.removeHandler()
    252         self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
    253 
    254     def testRemoveHandlerAsDecorator(self):
    255         default_handler = signal.getsignal(signal.SIGINT)
    256         unittest.installHandler()
    257 
    258         @unittest.removeHandler
    259         def test():
    260             self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
    261 
    262         test()
    263         self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
    264 
    265 @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
    266 @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
    267 @unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 "
    268     "if threads have been used")
    269 class TestBreakDefaultIntHandler(TestBreak):
    270     int_handler = signal.default_int_handler
    271 
    272 @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
    273 @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
    274 @unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 "
    275     "if threads have been used")
    276 class TestBreakSignalIgnored(TestBreak):
    277     int_handler = signal.SIG_IGN
    278 
    279 @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
    280 @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
    281 @unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 "
    282     "if threads have been used")
    283 class TestBreakSignalDefault(TestBreak):
    284     int_handler = signal.SIG_DFL
    285