Home | History | Annotate | Download | only in test
      1 """PyUnit testing that threads honor our signal semantics"""
      2 
      3 import unittest
      4 import signal
      5 import os
      6 import sys
      7 from test.test_support import run_unittest, import_module, reap_threads
      8 thread = import_module('thread')
      9 
     10 if sys.platform[:3] in ('win', 'os2') or sys.platform=='riscos':
     11     raise unittest.SkipTest, "Can't test signal on %s" % sys.platform
     12 
     13 process_pid = os.getpid()
     14 signalled_all=thread.allocate_lock()
     15 
     16 
     17 def registerSignals(for_usr1, for_usr2, for_alrm):
     18     usr1 = signal.signal(signal.SIGUSR1, for_usr1)
     19     usr2 = signal.signal(signal.SIGUSR2, for_usr2)
     20     alrm = signal.signal(signal.SIGALRM, for_alrm)
     21     return usr1, usr2, alrm
     22 
     23 
     24 # The signal handler. Just note that the signal occurred and
     25 # from who.
     26 def handle_signals(sig,frame):
     27     signal_blackboard[sig]['tripped'] += 1
     28     signal_blackboard[sig]['tripped_by'] = thread.get_ident()
     29 
     30 # a function that will be spawned as a separate thread.
     31 def send_signals():
     32     os.kill(process_pid, signal.SIGUSR1)
     33     os.kill(process_pid, signal.SIGUSR2)
     34     signalled_all.release()
     35 
     36 class ThreadSignals(unittest.TestCase):
     37     """Test signal handling semantics of threads.
     38        We spawn a thread, have the thread send two signals, and
     39        wait for it to finish. Check that we got both signals
     40        and that they were run by the main thread.
     41     """
     42     @reap_threads
     43     def test_signals(self):
     44         signalled_all.acquire()
     45         self.spawnSignallingThread()
     46         signalled_all.acquire()
     47         # the signals that we asked the kernel to send
     48         # will come back, but we don't know when.
     49         # (it might even be after the thread exits
     50         # and might be out of order.)  If we haven't seen
     51         # the signals yet, send yet another signal and
     52         # wait for it return.
     53         if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
     54            or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
     55             signal.alarm(1)
     56             signal.pause()
     57             signal.alarm(0)
     58 
     59         self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
     60         self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
     61                            thread.get_ident())
     62         self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)
     63         self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],
     64                            thread.get_ident())
     65         signalled_all.release()
     66 
     67     def spawnSignallingThread(self):
     68         thread.start_new_thread(send_signals, ())
     69 
     70 
     71 def test_main():
     72     global signal_blackboard
     73 
     74     signal_blackboard = { signal.SIGUSR1 : {'tripped': 0, 'tripped_by': 0 },
     75                           signal.SIGUSR2 : {'tripped': 0, 'tripped_by': 0 },
     76                           signal.SIGALRM : {'tripped': 0, 'tripped_by': 0 } }
     77 
     78     oldsigs = registerSignals(handle_signals, handle_signals, handle_signals)
     79     try:
     80         run_unittest(ThreadSignals)
     81     finally:
     82         registerSignals(*oldsigs)
     83 
     84 if __name__ == '__main__':
     85     test_main()
     86