Home | History | Annotate | Download | only in test
      1 import unittest
      2 from test import test_support
      3 from contextlib import closing
      4 import gc
      5 import pickle
      6 import select
      7 import signal
      8 import subprocess
      9 import traceback
     10 import sys, os, time, errno
     11 
     12 if sys.platform in ('os2', 'riscos'):
     13     raise unittest.SkipTest("Can't test signal on %s" % sys.platform)
     14 
     15 
     16 class HandlerBCalled(Exception):
     17     pass
     18 
     19 
     20 def exit_subprocess():
     21     """Use os._exit(0) to exit the current subprocess.
     22 
     23     Otherwise, the test catches the SystemExit and continues executing
     24     in parallel with the original test, so you wind up with an
     25     exponential number of tests running concurrently.
     26     """
     27     os._exit(0)
     28 
     29 
     30 def ignoring_eintr(__func, *args, **kwargs):
     31     try:
     32         return __func(*args, **kwargs)
     33     except EnvironmentError as e:
     34         if e.errno != errno.EINTR:
     35             raise
     36         return None
     37 
     38 
     39 @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
     40 class InterProcessSignalTests(unittest.TestCase):
     41     MAX_DURATION = 20   # Entire test should last at most 20 sec.
     42 
     43     def setUp(self):
     44         self.using_gc = gc.isenabled()
     45         gc.disable()
     46 
     47     def tearDown(self):
     48         if self.using_gc:
     49             gc.enable()
     50 
     51     def format_frame(self, frame, limit=None):
     52         return ''.join(traceback.format_stack(frame, limit=limit))
     53 
     54     def handlerA(self, signum, frame):
     55         self.a_called = True
     56         if test_support.verbose:
     57             print "handlerA invoked from signal %s at:\n%s" % (
     58                 signum, self.format_frame(frame, limit=1))
     59 
     60     def handlerB(self, signum, frame):
     61         self.b_called = True
     62         if test_support.verbose:
     63             print "handlerB invoked from signal %s at:\n%s" % (
     64                 signum, self.format_frame(frame, limit=1))
     65         raise HandlerBCalled(signum, self.format_frame(frame))
     66 
     67     def wait(self, child):
     68         """Wait for child to finish, ignoring EINTR."""
     69         while True:
     70             try:
     71                 child.wait()
     72                 return
     73             except OSError as e:
     74                 if e.errno != errno.EINTR:
     75                     raise
     76 
     77     def run_test(self):
     78         # Install handlers. This function runs in a sub-process, so we
     79         # don't worry about re-setting the default handlers.
     80         signal.signal(signal.SIGHUP, self.handlerA)
     81         signal.signal(signal.SIGUSR1, self.handlerB)
     82         signal.signal(signal.SIGUSR2, signal.SIG_IGN)
     83         signal.signal(signal.SIGALRM, signal.default_int_handler)
     84 
     85         # Variables the signals will modify:
     86         self.a_called = False
     87         self.b_called = False
     88 
     89         # Let the sub-processes know who to send signals to.
     90         pid = os.getpid()
     91         if test_support.verbose:
     92             print "test runner's pid is", pid
     93 
     94         child = ignoring_eintr(subprocess.Popen, ['kill', '-HUP', str(pid)])
     95         if child:
     96             self.wait(child)
     97             if not self.a_called:
     98                 time.sleep(1)  # Give the signal time to be delivered.
     99         self.assertTrue(self.a_called)
    100         self.assertFalse(self.b_called)
    101         self.a_called = False
    102 
    103         # Make sure the signal isn't delivered while the previous
    104         # Popen object is being destroyed, because __del__ swallows
    105         # exceptions.
    106         del child
    107         try:
    108             child = subprocess.Popen(['kill', '-USR1', str(pid)])
    109             # This wait should be interrupted by the signal's exception.
    110             self.wait(child)
    111             time.sleep(1)  # Give the signal time to be delivered.
    112             self.fail('HandlerBCalled exception not raised')
    113         except HandlerBCalled:
    114             self.assertTrue(self.b_called)
    115             self.assertFalse(self.a_called)
    116             if test_support.verbose:
    117                 print "HandlerBCalled exception caught"
    118 
    119         child = ignoring_eintr(subprocess.Popen, ['kill', '-USR2', str(pid)])
    120         if child:
    121             self.wait(child)  # Nothing should happen.
    122 
    123         try:
    124             signal.alarm(1)
    125             # The race condition in pause doesn't matter in this case,
    126             # since alarm is going to raise a KeyboardException, which
    127             # will skip the call.
    128             signal.pause()
    129             # But if another signal arrives before the alarm, pause
    130             # may return early.
    131             time.sleep(1)
    132         except KeyboardInterrupt:
    133             if test_support.verbose:
    134                 print "KeyboardInterrupt (the alarm() went off)"
    135         except:
    136             self.fail("Some other exception woke us from pause: %s" %
    137                       traceback.format_exc())
    138         else:
    139             self.fail("pause returned of its own accord, and the signal"
    140                       " didn't arrive after another second.")
    141 
    142     # Issue 3864. Unknown if this affects earlier versions of freebsd also.
    143     @unittest.skipIf(sys.platform=='freebsd6',
    144         'inter process signals not reliable (do not mix well with threading) '
    145         'on freebsd6')
    146     def test_main(self):
    147         # This function spawns a child process to insulate the main
    148         # test-running process from all the signals. It then
    149         # communicates with that child process over a pipe and
    150         # re-raises information about any exceptions the child
    151         # raises. The real work happens in self.run_test().
    152         os_done_r, os_done_w = os.pipe()
    153         with closing(os.fdopen(os_done_r)) as done_r, \
    154              closing(os.fdopen(os_done_w, 'w')) as done_w:
    155             child = os.fork()
    156             if child == 0:
    157                 # In the child process; run the test and report results
    158                 # through the pipe.
    159                 try:
    160                     done_r.close()
    161                     # Have to close done_w again here because
    162                     # exit_subprocess() will skip the enclosing with block.
    163                     with closing(done_w):
    164                         try:
    165                             self.run_test()
    166                         except:
    167                             pickle.dump(traceback.format_exc(), done_w)
    168                         else:
    169                             pickle.dump(None, done_w)
    170                 except:
    171                     print 'Uh oh, raised from pickle.'
    172                     traceback.print_exc()
    173                 finally:
    174                     exit_subprocess()
    175 
    176             done_w.close()
    177             # Block for up to MAX_DURATION seconds for the test to finish.
    178             r, w, x = select.select([done_r], [], [], self.MAX_DURATION)
    179             if done_r in r:
    180                 tb = pickle.load(done_r)
    181                 if tb:
    182                     self.fail(tb)
    183             else:
    184                 os.kill(child, signal.SIGKILL)
    185                 self.fail('Test deadlocked after %d seconds.' %
    186                           self.MAX_DURATION)
    187 
    188 
    189 @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
    190 class BasicSignalTests(unittest.TestCase):
    191     def trivial_signal_handler(self, *args):
    192         pass
    193 
    194     def test_out_of_range_signal_number_raises_error(self):
    195         self.assertRaises(ValueError, signal.getsignal, 4242)
    196 
    197         self.assertRaises(ValueError, signal.signal, 4242,
    198                           self.trivial_signal_handler)
    199 
    200     def test_setting_signal_handler_to_none_raises_error(self):
    201         self.assertRaises(TypeError, signal.signal,
    202                           signal.SIGUSR1, None)
    203 
    204     def test_getsignal(self):
    205         hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
    206         self.assertEqual(signal.getsignal(signal.SIGHUP),
    207                          self.trivial_signal_handler)
    208         signal.signal(signal.SIGHUP, hup)
    209         self.assertEqual(signal.getsignal(signal.SIGHUP), hup)
    210 
    211 
    212 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
    213 class WindowsSignalTests(unittest.TestCase):
    214     def test_issue9324(self):
    215         # Updated for issue #10003, adding SIGBREAK
    216         handler = lambda x, y: None
    217         for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
    218                     signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
    219                     signal.SIGTERM):
    220             # Set and then reset a handler for signals that work on windows
    221             signal.signal(sig, signal.signal(sig, handler))
    222 
    223         with self.assertRaises(ValueError):
    224             signal.signal(-1, handler)
    225 
    226         with self.assertRaises(ValueError):
    227             signal.signal(7, handler)
    228 
    229 
    230 class WakeupFDTests(unittest.TestCase):
    231 
    232     def test_invalid_fd(self):
    233         fd = test_support.make_bad_fd()
    234         self.assertRaises(ValueError, signal.set_wakeup_fd, fd)
    235 
    236 
    237 @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
    238 class WakeupSignalTests(unittest.TestCase):
    239     TIMEOUT_FULL = 10
    240     TIMEOUT_HALF = 5
    241 
    242     def test_wakeup_fd_early(self):
    243         import select
    244 
    245         signal.alarm(1)
    246         before_time = time.time()
    247         # We attempt to get a signal during the sleep,
    248         # before select is called
    249         time.sleep(self.TIMEOUT_FULL)
    250         mid_time = time.time()
    251         self.assertTrue(mid_time - before_time < self.TIMEOUT_HALF)
    252         select.select([self.read], [], [], self.TIMEOUT_FULL)
    253         after_time = time.time()
    254         self.assertTrue(after_time - mid_time < self.TIMEOUT_HALF)
    255 
    256     def test_wakeup_fd_during(self):
    257         import select
    258 
    259         signal.alarm(1)
    260         before_time = time.time()
    261         # We attempt to get a signal during the select call
    262         self.assertRaises(select.error, select.select,
    263             [self.read], [], [], self.TIMEOUT_FULL)
    264         after_time = time.time()
    265         self.assertTrue(after_time - before_time < self.TIMEOUT_HALF)
    266 
    267     def setUp(self):
    268         import fcntl
    269 
    270         self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None)
    271         self.read, self.write = os.pipe()
    272         flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
    273         flags = flags | os.O_NONBLOCK
    274         fcntl.fcntl(self.write, fcntl.F_SETFL, flags)
    275         self.old_wakeup = signal.set_wakeup_fd(self.write)
    276 
    277     def tearDown(self):
    278         signal.set_wakeup_fd(self.old_wakeup)
    279         os.close(self.read)
    280         os.close(self.write)
    281         signal.signal(signal.SIGALRM, self.alrm)
    282 
    283 @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
    284 class SiginterruptTest(unittest.TestCase):
    285 
    286     def setUp(self):
    287         """Install a no-op signal handler that can be set to allow
    288         interrupts or not, and arrange for the original signal handler to be
    289         re-installed when the test is finished.
    290         """
    291         self.signum = signal.SIGUSR1
    292         oldhandler = signal.signal(self.signum, lambda x,y: None)
    293         self.addCleanup(signal.signal, self.signum, oldhandler)
    294 
    295     def readpipe_interrupted(self):
    296         """Perform a read during which a signal will arrive.  Return True if the
    297         read is interrupted by the signal and raises an exception.  Return False
    298         if it returns normally.
    299         """
    300         # Create a pipe that can be used for the read.  Also clean it up
    301         # when the test is over, since nothing else will (but see below for
    302         # the write end).
    303         r, w = os.pipe()
    304         self.addCleanup(os.close, r)
    305 
    306         # Create another process which can send a signal to this one to try
    307         # to interrupt the read.
    308         ppid = os.getpid()
    309         pid = os.fork()
    310 
    311         if pid == 0:
    312             # Child code: sleep to give the parent enough time to enter the
    313             # read() call (there's a race here, but it's really tricky to
    314             # eliminate it); then signal the parent process.  Also, sleep
    315             # again to make it likely that the signal is delivered to the
    316             # parent process before the child exits.  If the child exits
    317             # first, the write end of the pipe will be closed and the test
    318             # is invalid.
    319             try:
    320                 time.sleep(0.2)
    321                 os.kill(ppid, self.signum)
    322                 time.sleep(0.2)
    323             finally:
    324                 # No matter what, just exit as fast as possible now.
    325                 exit_subprocess()
    326         else:
    327             # Parent code.
    328             # Make sure the child is eventually reaped, else it'll be a
    329             # zombie for the rest of the test suite run.
    330             self.addCleanup(os.waitpid, pid, 0)
    331 
    332             # Close the write end of the pipe.  The child has a copy, so
    333             # it's not really closed until the child exits.  We need it to
    334             # close when the child exits so that in the non-interrupt case
    335             # the read eventually completes, otherwise we could just close
    336             # it *after* the test.
    337             os.close(w)
    338 
    339             # Try the read and report whether it is interrupted or not to
    340             # the caller.
    341             try:
    342                 d = os.read(r, 1)
    343                 return False
    344             except OSError, err:
    345                 if err.errno != errno.EINTR:
    346                     raise
    347                 return True
    348 
    349     def test_without_siginterrupt(self):
    350         """If a signal handler is installed and siginterrupt is not called
    351         at all, when that signal arrives, it interrupts a syscall that's in
    352         progress.
    353         """
    354         i = self.readpipe_interrupted()
    355         self.assertTrue(i)
    356         # Arrival of the signal shouldn't have changed anything.
    357         i = self.readpipe_interrupted()
    358         self.assertTrue(i)
    359 
    360     def test_siginterrupt_on(self):
    361         """If a signal handler is installed and siginterrupt is called with
    362         a true value for the second argument, when that signal arrives, it
    363         interrupts a syscall that's in progress.
    364         """
    365         signal.siginterrupt(self.signum, 1)
    366         i = self.readpipe_interrupted()
    367         self.assertTrue(i)
    368         # Arrival of the signal shouldn't have changed anything.
    369         i = self.readpipe_interrupted()
    370         self.assertTrue(i)
    371 
    372     def test_siginterrupt_off(self):
    373         """If a signal handler is installed and siginterrupt is called with
    374         a false value for the second argument, when that signal arrives, it
    375         does not interrupt a syscall that's in progress.
    376         """
    377         signal.siginterrupt(self.signum, 0)
    378         i = self.readpipe_interrupted()
    379         self.assertFalse(i)
    380         # Arrival of the signal shouldn't have changed anything.
    381         i = self.readpipe_interrupted()
    382         self.assertFalse(i)
    383 
    384 
    385 @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
    386 class ItimerTest(unittest.TestCase):
    387     def setUp(self):
    388         self.hndl_called = False
    389         self.hndl_count = 0
    390         self.itimer = None
    391         self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)
    392 
    393     def tearDown(self):
    394         signal.signal(signal.SIGALRM, self.old_alarm)
    395         if self.itimer is not None: # test_itimer_exc doesn't change this attr
    396             # just ensure that itimer is stopped
    397             signal.setitimer(self.itimer, 0)
    398 
    399     def sig_alrm(self, *args):
    400         self.hndl_called = True
    401         if test_support.verbose:
    402             print("SIGALRM handler invoked", args)
    403 
    404     def sig_vtalrm(self, *args):
    405         self.hndl_called = True
    406 
    407         if self.hndl_count > 3:
    408             # it shouldn't be here, because it should have been disabled.
    409             raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
    410                 "timer.")
    411         elif self.hndl_count == 3:
    412             # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
    413             signal.setitimer(signal.ITIMER_VIRTUAL, 0)
    414             if test_support.verbose:
    415                 print("last SIGVTALRM handler call")
    416 
    417         self.hndl_count += 1
    418 
    419         if test_support.verbose:
    420             print("SIGVTALRM handler invoked", args)
    421 
    422     def sig_prof(self, *args):
    423         self.hndl_called = True
    424         signal.setitimer(signal.ITIMER_PROF, 0)
    425 
    426         if test_support.verbose:
    427             print("SIGPROF handler invoked", args)
    428 
    429     def test_itimer_exc(self):
    430         # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
    431         # defines it ?
    432         self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
    433         # Negative times are treated as zero on some platforms.
    434         if 0:
    435             self.assertRaises(signal.ItimerError,
    436                               signal.setitimer, signal.ITIMER_REAL, -1)
    437 
    438     def test_itimer_real(self):
    439         self.itimer = signal.ITIMER_REAL
    440         signal.setitimer(self.itimer, 1.0)
    441         if test_support.verbose:
    442             print("\ncall pause()...")
    443         signal.pause()
    444 
    445         self.assertEqual(self.hndl_called, True)
    446 
    447     # Issue 3864. Unknown if this affects earlier versions of freebsd also.
    448     @unittest.skipIf(sys.platform in ('freebsd6', 'netbsd5'),
    449         'itimer not reliable (does not mix well with threading) on some BSDs.')
    450     def test_itimer_virtual(self):
    451         self.itimer = signal.ITIMER_VIRTUAL
    452         signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
    453         signal.setitimer(self.itimer, 0.3, 0.2)
    454 
    455         start_time = time.time()
    456         while time.time() - start_time < 60.0:
    457             # use up some virtual time by doing real work
    458             _ = pow(12345, 67890, 10000019)
    459             if signal.getitimer(self.itimer) == (0.0, 0.0):
    460                 break # sig_vtalrm handler stopped this itimer
    461         else: # Issue 8424
    462             self.skipTest("timeout: likely cause: machine too slow or load too "
    463                           "high")
    464 
    465         # virtual itimer should be (0.0, 0.0) now
    466         self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
    467         # and the handler should have been called
    468         self.assertEqual(self.hndl_called, True)
    469 
    470     # Issue 3864. Unknown if this affects earlier versions of freebsd also.
    471     @unittest.skipIf(sys.platform=='freebsd6',
    472         'itimer not reliable (does not mix well with threading) on freebsd6')
    473     def test_itimer_prof(self):
    474         self.itimer = signal.ITIMER_PROF
    475         signal.signal(signal.SIGPROF, self.sig_prof)
    476         signal.setitimer(self.itimer, 0.2, 0.2)
    477 
    478         start_time = time.time()
    479         while time.time() - start_time < 60.0:
    480             # do some work
    481             _ = pow(12345, 67890, 10000019)
    482             if signal.getitimer(self.itimer) == (0.0, 0.0):
    483                 break # sig_prof handler stopped this itimer
    484         else: # Issue 8424
    485             self.skipTest("timeout: likely cause: machine too slow or load too "
    486                           "high")
    487 
    488         # profiling itimer should be (0.0, 0.0) now
    489         self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
    490         # and the handler should have been called
    491         self.assertEqual(self.hndl_called, True)
    492 
    493 def test_main():
    494     test_support.run_unittest(BasicSignalTests, InterProcessSignalTests,
    495                               WakeupFDTests, WakeupSignalTests,
    496                               SiginterruptTest, ItimerTest,
    497                               WindowsSignalTests)
    498 
    499 
# Run the tests when this file is executed as a script.
if __name__ == "__main__":
    test_main()
    502