Home | History | Annotate | Download | only in test
      1 import unittest
      2 from test import test_support
      3 from contextlib import closing
      4 import gc
      5 import pickle
      6 import select
      7 import signal
      8 import subprocess
      9 import traceback
     10 import sys, os, time, errno
     11 
     12 if sys.platform in ('os2', 'riscos'):
     13     raise unittest.SkipTest("Can't test signal on %s" % sys.platform)
     14 
     15 
     16 class HandlerBCalled(Exception):
     17     pass
     18 
     19 
     20 def exit_subprocess():
     21     """Use os._exit(0) to exit the current subprocess.
     22 
     23     Otherwise, the test catches the SystemExit and continues executing
     24     in parallel with the original test, so you wind up with an
     25     exponential number of tests running concurrently.
     26     """
     27     os._exit(0)
     28 
     29 
     30 def ignoring_eintr(__func, *args, **kwargs):
     31     try:
     32         return __func(*args, **kwargs)
     33     except EnvironmentError as e:
     34         if e.errno != errno.EINTR:
     35             raise
     36         return None
     37 
     38 
     39 @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
     40 class InterProcessSignalTests(unittest.TestCase):
     41     MAX_DURATION = 20   # Entire test should last at most 20 sec.

     42 
     43     def setUp(self):
     44         self.using_gc = gc.isenabled()
     45         gc.disable()
     46 
     47     def tearDown(self):
     48         if self.using_gc:
     49             gc.enable()
     50 
     51     def format_frame(self, frame, limit=None):
     52         return ''.join(traceback.format_stack(frame, limit=limit))
     53 
     54     def handlerA(self, signum, frame):
     55         self.a_called = True
     56         if test_support.verbose:
     57             print "handlerA invoked from signal %s at:\n%s" % (
     58                 signum, self.format_frame(frame, limit=1))
     59 
     60     def handlerB(self, signum, frame):
     61         self.b_called = True
     62         if test_support.verbose:
     63             print "handlerB invoked from signal %s at:\n%s" % (
     64                 signum, self.format_frame(frame, limit=1))
     65         raise HandlerBCalled(signum, self.format_frame(frame))
     66 
     67     def wait(self, child):
     68         """Wait for child to finish, ignoring EINTR."""
     69         while True:
     70             try:
     71                 child.wait()
     72                 return
     73             except OSError as e:
     74                 if e.errno != errno.EINTR:
     75                     raise
     76 
     77     def run_test(self):
     78         # Install handlers. This function runs in a sub-process, so we

     79         # don't worry about re-setting the default handlers.

     80         signal.signal(signal.SIGHUP, self.handlerA)
     81         signal.signal(signal.SIGUSR1, self.handlerB)
     82         signal.signal(signal.SIGUSR2, signal.SIG_IGN)
     83         signal.signal(signal.SIGALRM, signal.default_int_handler)
     84 
     85         # Variables the signals will modify:

     86         self.a_called = False
     87         self.b_called = False
     88 
     89         # Let the sub-processes know who to send signals to.

     90         pid = os.getpid()
     91         if test_support.verbose:
     92             print "test runner's pid is", pid
     93 
     94         child = ignoring_eintr(subprocess.Popen, ['kill', '-HUP', str(pid)])
     95         if child:
     96             self.wait(child)
     97             if not self.a_called:
     98                 time.sleep(1)  # Give the signal time to be delivered.

     99         self.assertTrue(self.a_called)
    100         self.assertFalse(self.b_called)
    101         self.a_called = False
    102 
    103         # Make sure the signal isn't delivered while the previous

    104         # Popen object is being destroyed, because __del__ swallows

    105         # exceptions.

    106         del child
    107         try:
    108             child = subprocess.Popen(['kill', '-USR1', str(pid)])
    109             # This wait should be interrupted by the signal's exception.

    110             self.wait(child)
    111             time.sleep(1)  # Give the signal time to be delivered.

    112             self.fail('HandlerBCalled exception not thrown')
    113         except HandlerBCalled:
    114             self.assertTrue(self.b_called)
    115             self.assertFalse(self.a_called)
    116             if test_support.verbose:
    117                 print "HandlerBCalled exception caught"
    118 
    119         child = ignoring_eintr(subprocess.Popen, ['kill', '-USR2', str(pid)])
    120         if child:
    121             self.wait(child)  # Nothing should happen.

    122 
    123         try:
    124             signal.alarm(1)
    125             # The race condition in pause doesn't matter in this case,

    126             # since alarm is going to raise a KeyboardException, which

    127             # will skip the call.

    128             signal.pause()
    129             # But if another signal arrives before the alarm, pause

    130             # may return early.

    131             time.sleep(1)
    132         except KeyboardInterrupt:
    133             if test_support.verbose:
    134                 print "KeyboardInterrupt (the alarm() went off)"
    135         except:
    136             self.fail("Some other exception woke us from pause: %s" %
    137                       traceback.format_exc())
    138         else:
    139             self.fail("pause returned of its own accord, and the signal"
    140                       " didn't arrive after another second.")
    141 
    142     # Issue 3864. Unknown if this affects earlier versions of freebsd also.

    143     @unittest.skipIf(sys.platform=='freebsd6',
    144         'inter process signals not reliable (do not mix well with threading) '
    145         'on freebsd6')
    146     def test_main(self):
    147         # This function spawns a child process to insulate the main

    148         # test-running process from all the signals. It then

    149         # communicates with that child process over a pipe and

    150         # re-raises information about any exceptions the child

    151         # throws. The real work happens in self.run_test().

    152         os_done_r, os_done_w = os.pipe()
    153         with closing(os.fdopen(os_done_r)) as done_r, \
    154              closing(os.fdopen(os_done_w, 'w')) as done_w:
    155             child = os.fork()
    156             if child == 0:
    157                 # In the child process; run the test and report results

    158                 # through the pipe.

    159                 try:
    160                     done_r.close()
    161                     # Have to close done_w again here because

    162                     # exit_subprocess() will skip the enclosing with block.

    163                     with closing(done_w):
    164                         try:
    165                             self.run_test()
    166                         except:
    167                             pickle.dump(traceback.format_exc(), done_w)
    168                         else:
    169                             pickle.dump(None, done_w)
    170                 except:
    171                     print 'Uh oh, raised from pickle.'
    172                     traceback.print_exc()
    173                 finally:
    174                     exit_subprocess()
    175 
    176             done_w.close()
    177             # Block for up to MAX_DURATION seconds for the test to finish.

    178             r, w, x = select.select([done_r], [], [], self.MAX_DURATION)
    179             if done_r in r:
    180                 tb = pickle.load(done_r)
    181                 if tb:
    182                     self.fail(tb)
    183             else:
    184                 os.kill(child, signal.SIGKILL)
    185                 self.fail('Test deadlocked after %d seconds.' %
    186                           self.MAX_DURATION)
    187 
    188 
    189 @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
    190 class BasicSignalTests(unittest.TestCase):
    191     def trivial_signal_handler(self, *args):
    192         pass
    193 
    194     def test_out_of_range_signal_number_raises_error(self):
    195         self.assertRaises(ValueError, signal.getsignal, 4242)
    196 
    197         self.assertRaises(ValueError, signal.signal, 4242,
    198                           self.trivial_signal_handler)
    199 
    200     def test_setting_signal_handler_to_none_raises_error(self):
    201         self.assertRaises(TypeError, signal.signal,
    202                           signal.SIGUSR1, None)
    203 
    204     def test_getsignal(self):
    205         hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
    206         self.assertEqual(signal.getsignal(signal.SIGHUP),
    207                          self.trivial_signal_handler)
    208         signal.signal(signal.SIGHUP, hup)
    209         self.assertEqual(signal.getsignal(signal.SIGHUP), hup)
    210 
    211 
    212 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
    213 class WindowsSignalTests(unittest.TestCase):
    214     def test_issue9324(self):
    215         # Updated for issue #10003, adding SIGBREAK

    216         handler = lambda x, y: None
    217         for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
    218                     signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
    219                     signal.SIGTERM):
    220             # Set and then reset a handler for signals that work on windows

    221             signal.signal(sig, signal.signal(sig, handler))
    222 
    223         with self.assertRaises(ValueError):
    224             signal.signal(-1, handler)
    225 
    226         with self.assertRaises(ValueError):
    227             signal.signal(7, handler)
    228 
    229 
    230 @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
    231 class WakeupSignalTests(unittest.TestCase):
    232     TIMEOUT_FULL = 10
    233     TIMEOUT_HALF = 5
    234 
    235     def test_wakeup_fd_early(self):
    236         import select
    237 
    238         signal.alarm(1)
    239         before_time = time.time()
    240         # We attempt to get a signal during the sleep,

    241         # before select is called

    242         time.sleep(self.TIMEOUT_FULL)
    243         mid_time = time.time()
    244         self.assertTrue(mid_time - before_time < self.TIMEOUT_HALF)
    245         select.select([self.read], [], [], self.TIMEOUT_FULL)
    246         after_time = time.time()
    247         self.assertTrue(after_time - mid_time < self.TIMEOUT_HALF)
    248 
    249     def test_wakeup_fd_during(self):
    250         import select
    251 
    252         signal.alarm(1)
    253         before_time = time.time()
    254         # We attempt to get a signal during the select call

    255         self.assertRaises(select.error, select.select,
    256             [self.read], [], [], self.TIMEOUT_FULL)
    257         after_time = time.time()
    258         self.assertTrue(after_time - before_time < self.TIMEOUT_HALF)
    259 
    260     def setUp(self):
    261         import fcntl
    262 
    263         self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None)
    264         self.read, self.write = os.pipe()
    265         flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
    266         flags = flags | os.O_NONBLOCK
    267         fcntl.fcntl(self.write, fcntl.F_SETFL, flags)
    268         self.old_wakeup = signal.set_wakeup_fd(self.write)
    269 
    270     def tearDown(self):
    271         signal.set_wakeup_fd(self.old_wakeup)
    272         os.close(self.read)
    273         os.close(self.write)
    274         signal.signal(signal.SIGALRM, self.alrm)
    275 
    276 @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
    277 class SiginterruptTest(unittest.TestCase):
    278 
    279     def setUp(self):
    280         """Install a no-op signal handler that can be set to allow
    281         interrupts or not, and arrange for the original signal handler to be
    282         re-installed when the test is finished.
    283         """
    284         self.signum = signal.SIGUSR1
    285         oldhandler = signal.signal(self.signum, lambda x,y: None)
    286         self.addCleanup(signal.signal, self.signum, oldhandler)
    287 
    288     def readpipe_interrupted(self):
    289         """Perform a read during which a signal will arrive.  Return True if the
    290         read is interrupted by the signal and raises an exception.  Return False
    291         if it returns normally.
    292         """
    293         # Create a pipe that can be used for the read.  Also clean it up

    294         # when the test is over, since nothing else will (but see below for

    295         # the write end).

    296         r, w = os.pipe()
    297         self.addCleanup(os.close, r)
    298 
    299         # Create another process which can send a signal to this one to try

    300         # to interrupt the read.

    301         ppid = os.getpid()
    302         pid = os.fork()
    303 
    304         if pid == 0:
    305             # Child code: sleep to give the parent enough time to enter the

    306             # read() call (there's a race here, but it's really tricky to

    307             # eliminate it); then signal the parent process.  Also, sleep

    308             # again to make it likely that the signal is delivered to the

    309             # parent process before the child exits.  If the child exits

    310             # first, the write end of the pipe will be closed and the test

    311             # is invalid.

    312             try:
    313                 time.sleep(0.2)
    314                 os.kill(ppid, self.signum)
    315                 time.sleep(0.2)
    316             finally:
    317                 # No matter what, just exit as fast as possible now.

    318                 exit_subprocess()
    319         else:
    320             # Parent code.

    321             # Make sure the child is eventually reaped, else it'll be a

    322             # zombie for the rest of the test suite run.

    323             self.addCleanup(os.waitpid, pid, 0)
    324 
    325             # Close the write end of the pipe.  The child has a copy, so

    326             # it's not really closed until the child exits.  We need it to

    327             # close when the child exits so that in the non-interrupt case

    328             # the read eventually completes, otherwise we could just close

    329             # it *after* the test.

    330             os.close(w)
    331 
    332             # Try the read and report whether it is interrupted or not to

    333             # the caller.

    334             try:
    335                 d = os.read(r, 1)
    336                 return False
    337             except OSError, err:
    338                 if err.errno != errno.EINTR:
    339                     raise
    340                 return True
    341 
    342     def test_without_siginterrupt(self):
    343         """If a signal handler is installed and siginterrupt is not called
    344         at all, when that signal arrives, it interrupts a syscall that's in
    345         progress.
    346         """
    347         i = self.readpipe_interrupted()
    348         self.assertTrue(i)
    349         # Arrival of the signal shouldn't have changed anything.

    350         i = self.readpipe_interrupted()
    351         self.assertTrue(i)
    352 
    353     def test_siginterrupt_on(self):
    354         """If a signal handler is installed and siginterrupt is called with
    355         a true value for the second argument, when that signal arrives, it
    356         interrupts a syscall that's in progress.
    357         """
    358         signal.siginterrupt(self.signum, 1)
    359         i = self.readpipe_interrupted()
    360         self.assertTrue(i)
    361         # Arrival of the signal shouldn't have changed anything.

    362         i = self.readpipe_interrupted()
    363         self.assertTrue(i)
    364 
    365     def test_siginterrupt_off(self):
    366         """If a signal handler is installed and siginterrupt is called with
    367         a false value for the second argument, when that signal arrives, it
    368         does not interrupt a syscall that's in progress.
    369         """
    370         signal.siginterrupt(self.signum, 0)
    371         i = self.readpipe_interrupted()
    372         self.assertFalse(i)
    373         # Arrival of the signal shouldn't have changed anything.

    374         i = self.readpipe_interrupted()
    375         self.assertFalse(i)
    376 
    377 
    378 @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
    379 class ItimerTest(unittest.TestCase):
    380     def setUp(self):
    381         self.hndl_called = False
    382         self.hndl_count = 0
    383         self.itimer = None
    384         self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)
    385 
    386     def tearDown(self):
    387         signal.signal(signal.SIGALRM, self.old_alarm)
    388         if self.itimer is not None: # test_itimer_exc doesn't change this attr

    389             # just ensure that itimer is stopped

    390             signal.setitimer(self.itimer, 0)
    391 
    392     def sig_alrm(self, *args):
    393         self.hndl_called = True
    394         if test_support.verbose:
    395             print("SIGALRM handler invoked", args)
    396 
    397     def sig_vtalrm(self, *args):
    398         self.hndl_called = True
    399 
    400         if self.hndl_count > 3:
    401             # it shouldn't be here, because it should have been disabled.

    402             raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
    403                 "timer.")
    404         elif self.hndl_count == 3:
    405             # disable ITIMER_VIRTUAL, this function shouldn't be called anymore

    406             signal.setitimer(signal.ITIMER_VIRTUAL, 0)
    407             if test_support.verbose:
    408                 print("last SIGVTALRM handler call")
    409 
    410         self.hndl_count += 1
    411 
    412         if test_support.verbose:
    413             print("SIGVTALRM handler invoked", args)
    414 
    415     def sig_prof(self, *args):
    416         self.hndl_called = True
    417         signal.setitimer(signal.ITIMER_PROF, 0)
    418 
    419         if test_support.verbose:
    420             print("SIGPROF handler invoked", args)
    421 
    422     def test_itimer_exc(self):
    423         # XXX I'm assuming -1 is an invalid itimer, but maybe some platform

    424         # defines it ?

    425         self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
    426         # Negative times are treated as zero on some platforms.

    427         if 0:
    428             self.assertRaises(signal.ItimerError,
    429                               signal.setitimer, signal.ITIMER_REAL, -1)
    430 
    431     def test_itimer_real(self):
    432         self.itimer = signal.ITIMER_REAL
    433         signal.setitimer(self.itimer, 1.0)
    434         if test_support.verbose:
    435             print("\ncall pause()...")
    436         signal.pause()
    437 
    438         self.assertEqual(self.hndl_called, True)
    439 
    440     # Issue 3864. Unknown if this affects earlier versions of freebsd also.

    441     @unittest.skipIf(sys.platform in ('freebsd6', 'netbsd5'),
    442         'itimer not reliable (does not mix well with threading) on some BSDs.')
    443     def test_itimer_virtual(self):
    444         self.itimer = signal.ITIMER_VIRTUAL
    445         signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
    446         signal.setitimer(self.itimer, 0.3, 0.2)
    447 
    448         start_time = time.time()
    449         while time.time() - start_time < 60.0:
    450             # use up some virtual time by doing real work

    451             _ = pow(12345, 67890, 10000019)
    452             if signal.getitimer(self.itimer) == (0.0, 0.0):
    453                 break # sig_vtalrm handler stopped this itimer

    454         else: # Issue 8424

    455             self.skipTest("timeout: likely cause: machine too slow or load too "
    456                           "high")
    457 
    458         # virtual itimer should be (0.0, 0.0) now

    459         self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
    460         # and the handler should have been called

    461         self.assertEqual(self.hndl_called, True)
    462 
    463     # Issue 3864. Unknown if this affects earlier versions of freebsd also.

    464     @unittest.skipIf(sys.platform=='freebsd6',
    465         'itimer not reliable (does not mix well with threading) on freebsd6')
    466     def test_itimer_prof(self):
    467         self.itimer = signal.ITIMER_PROF
    468         signal.signal(signal.SIGPROF, self.sig_prof)
    469         signal.setitimer(self.itimer, 0.2, 0.2)
    470 
    471         start_time = time.time()
    472         while time.time() - start_time < 60.0:
    473             # do some work

    474             _ = pow(12345, 67890, 10000019)
    475             if signal.getitimer(self.itimer) == (0.0, 0.0):
    476                 break # sig_prof handler stopped this itimer

    477         else: # Issue 8424

    478             self.skipTest("timeout: likely cause: machine too slow or load too "
    479                           "high")
    480 
    481         # profiling itimer should be (0.0, 0.0) now

    482         self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
    483         # and the handler should have been called

    484         self.assertEqual(self.hndl_called, True)
    485 
    486 def test_main():
    487     test_support.run_unittest(BasicSignalTests, InterProcessSignalTests,
    488                               WakeupSignalTests, SiginterruptTest,
    489                               ItimerTest, WindowsSignalTests)
    490 
    491 
    492 if __name__ == "__main__":
    493     test_main()
    494