Home | History | Annotate | Download | only in test
      1 """PyUnit testing that threads honor our signal semantics"""
      2 
      3 import unittest
      4 import signal
      5 import os
      6 import sys
      7 from test import support
      8 import _thread as thread
      9 import time
     10 
     11 if (sys.platform[:3] == 'win'):
     12     raise unittest.SkipTest("Can't test signal on %s" % sys.platform)
     13 
     14 process_pid = os.getpid()
     15 signalled_all=thread.allocate_lock()
     16 
     17 USING_PTHREAD_COND = (sys.thread_info.name == 'pthread'
     18                       and sys.thread_info.lock == 'mutex+cond')
     19 
     20 def registerSignals(for_usr1, for_usr2, for_alrm):
     21     usr1 = signal.signal(signal.SIGUSR1, for_usr1)
     22     usr2 = signal.signal(signal.SIGUSR2, for_usr2)
     23     alrm = signal.signal(signal.SIGALRM, for_alrm)
     24     return usr1, usr2, alrm
     25 
     26 
     27 # The signal handler. Just note that the signal occurred and
     28 # from who.
     29 def handle_signals(sig,frame):
     30     signal_blackboard[sig]['tripped'] += 1
     31     signal_blackboard[sig]['tripped_by'] = thread.get_ident()
     32 
     33 # a function that will be spawned as a separate thread.
     34 def send_signals():
     35     os.kill(process_pid, signal.SIGUSR1)
     36     os.kill(process_pid, signal.SIGUSR2)
     37     signalled_all.release()
     38 
     39 class ThreadSignals(unittest.TestCase):
     40 
     41     def test_signals(self):
     42         with support.wait_threads_exit():
     43             # Test signal handling semantics of threads.
     44             # We spawn a thread, have the thread send two signals, and
     45             # wait for it to finish. Check that we got both signals
     46             # and that they were run by the main thread.
     47             signalled_all.acquire()
     48             self.spawnSignallingThread()
     49             signalled_all.acquire()
     50 
     51         # the signals that we asked the kernel to send
     52         # will come back, but we don't know when.
     53         # (it might even be after the thread exits
     54         # and might be out of order.)  If we haven't seen
     55         # the signals yet, send yet another signal and
     56         # wait for it return.
     57         if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
     58            or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
     59             try:
     60                 signal.alarm(1)
     61                 signal.pause()
     62             finally:
     63                 signal.alarm(0)
     64 
     65         self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
     66         self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],
     67                            thread.get_ident())
     68         self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)
     69         self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],
     70                            thread.get_ident())
     71         signalled_all.release()
     72 
     73     def spawnSignallingThread(self):
     74         thread.start_new_thread(send_signals, ())
     75 
     76     def alarm_interrupt(self, sig, frame):
     77         raise KeyboardInterrupt
     78 
     79     @unittest.skipIf(USING_PTHREAD_COND,
     80                      'POSIX condition variables cannot be interrupted')
     81     @unittest.skipIf(sys.platform.startswith('linux') and
     82                      not sys.thread_info.version,
     83                      'Issue 34004: musl does not allow interruption of locks '
     84                      'by signals.')
     85     # Issue #20564: sem_timedwait() cannot be interrupted on OpenBSD
     86     @unittest.skipIf(sys.platform.startswith('openbsd'),
     87                      'lock cannot be interrupted on OpenBSD')
     88     def test_lock_acquire_interruption(self):
     89         # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
     90         # in a deadlock.
     91         # XXX this test can fail when the legacy (non-semaphore) implementation
     92         # of locks is used in thread_pthread.h, see issue #11223.
     93         oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
     94         try:
     95             lock = thread.allocate_lock()
     96             lock.acquire()
     97             signal.alarm(1)
     98             t1 = time.monotonic()
     99             self.assertRaises(KeyboardInterrupt, lock.acquire, timeout=5)
    100             dt = time.monotonic() - t1
    101             # Checking that KeyboardInterrupt was raised is not sufficient.
    102             # We want to assert that lock.acquire() was interrupted because
    103             # of the signal, not that the signal handler was called immediately
    104             # after timeout return of lock.acquire() (which can fool assertRaises).
    105             self.assertLess(dt, 3.0)
    106         finally:
    107             signal.alarm(0)
    108             signal.signal(signal.SIGALRM, oldalrm)
    109 
    110     @unittest.skipIf(USING_PTHREAD_COND,
    111                      'POSIX condition variables cannot be interrupted')
    112     @unittest.skipIf(sys.platform.startswith('linux') and
    113                      not sys.thread_info.version,
    114                      'Issue 34004: musl does not allow interruption of locks '
    115                      'by signals.')
    116     # Issue #20564: sem_timedwait() cannot be interrupted on OpenBSD
    117     @unittest.skipIf(sys.platform.startswith('openbsd'),
    118                      'lock cannot be interrupted on OpenBSD')
    119     def test_rlock_acquire_interruption(self):
    120         # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
    121         # in a deadlock.
    122         # XXX this test can fail when the legacy (non-semaphore) implementation
    123         # of locks is used in thread_pthread.h, see issue #11223.
    124         oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
    125         try:
    126             rlock = thread.RLock()
    127             # For reentrant locks, the initial acquisition must be in another
    128             # thread.
    129             def other_thread():
    130                 rlock.acquire()
    131 
    132             with support.wait_threads_exit():
    133                 thread.start_new_thread(other_thread, ())
    134                 # Wait until we can't acquire it without blocking...
    135                 while rlock.acquire(blocking=False):
    136                     rlock.release()
    137                     time.sleep(0.01)
    138                 signal.alarm(1)
    139                 t1 = time.monotonic()
    140                 self.assertRaises(KeyboardInterrupt, rlock.acquire, timeout=5)
    141                 dt = time.monotonic() - t1
    142                 # See rationale above in test_lock_acquire_interruption
    143                 self.assertLess(dt, 3.0)
    144         finally:
    145             signal.alarm(0)
    146             signal.signal(signal.SIGALRM, oldalrm)
    147 
    148     def acquire_retries_on_intr(self, lock):
    149         self.sig_recvd = False
    150         def my_handler(signal, frame):
    151             self.sig_recvd = True
    152 
    153         old_handler = signal.signal(signal.SIGUSR1, my_handler)
    154         try:
    155             def other_thread():
    156                 # Acquire the lock in a non-main thread, so this test works for
    157                 # RLocks.
    158                 lock.acquire()
    159                 # Wait until the main thread is blocked in the lock acquire, and
    160                 # then wake it up with this.
    161                 time.sleep(0.5)
    162                 os.kill(process_pid, signal.SIGUSR1)
    163                 # Let the main thread take the interrupt, handle it, and retry
    164                 # the lock acquisition.  Then we'll let it run.
    165                 time.sleep(0.5)
    166                 lock.release()
    167 
    168             with support.wait_threads_exit():
    169                 thread.start_new_thread(other_thread, ())
    170                 # Wait until we can't acquire it without blocking...
    171                 while lock.acquire(blocking=False):
    172                     lock.release()
    173                     time.sleep(0.01)
    174                 result = lock.acquire()  # Block while we receive a signal.
    175                 self.assertTrue(self.sig_recvd)
    176                 self.assertTrue(result)
    177         finally:
    178             signal.signal(signal.SIGUSR1, old_handler)
    179 
    180     def test_lock_acquire_retries_on_intr(self):
    181         self.acquire_retries_on_intr(thread.allocate_lock())
    182 
    183     def test_rlock_acquire_retries_on_intr(self):
    184         self.acquire_retries_on_intr(thread.RLock())
    185 
    186     def test_interrupted_timed_acquire(self):
    187         # Test to make sure we recompute lock acquisition timeouts when we
    188         # receive a signal.  Check this by repeatedly interrupting a lock
    189         # acquire in the main thread, and make sure that the lock acquire times
    190         # out after the right amount of time.
    191         # NOTE: this test only behaves as expected if C signals get delivered
    192         # to the main thread.  Otherwise lock.acquire() itself doesn't get
    193         # interrupted and the test trivially succeeds.
    194         self.start = None
    195         self.end = None
    196         self.sigs_recvd = 0
    197         done = thread.allocate_lock()
    198         done.acquire()
    199         lock = thread.allocate_lock()
    200         lock.acquire()
    201         def my_handler(signum, frame):
    202             self.sigs_recvd += 1
    203         old_handler = signal.signal(signal.SIGUSR1, my_handler)
    204         try:
    205             def timed_acquire():
    206                 self.start = time.monotonic()
    207                 lock.acquire(timeout=0.5)
    208                 self.end = time.monotonic()
    209             def send_signals():
    210                 for _ in range(40):
    211                     time.sleep(0.02)
    212                     os.kill(process_pid, signal.SIGUSR1)
    213                 done.release()
    214 
    215             with support.wait_threads_exit():
    216                 # Send the signals from the non-main thread, since the main thread
    217                 # is the only one that can process signals.
    218                 thread.start_new_thread(send_signals, ())
    219                 timed_acquire()
    220                 # Wait for thread to finish
    221                 done.acquire()
    222                 # This allows for some timing and scheduling imprecision
    223                 self.assertLess(self.end - self.start, 2.0)
    224                 self.assertGreater(self.end - self.start, 0.3)
    225                 # If the signal is received several times before PyErr_CheckSignals()
    226                 # is called, the handler will get called less than 40 times. Just
    227                 # check it's been called at least once.
    228                 self.assertGreater(self.sigs_recvd, 0)
    229         finally:
    230             signal.signal(signal.SIGUSR1, old_handler)
    231 
    232 
    233 def test_main():
    234     global signal_blackboard
    235 
    236     signal_blackboard = { signal.SIGUSR1 : {'tripped': 0, 'tripped_by': 0 },
    237                           signal.SIGUSR2 : {'tripped': 0, 'tripped_by': 0 },
    238                           signal.SIGALRM : {'tripped': 0, 'tripped_by': 0 } }
    239 
    240     oldsigs = registerSignals(handle_signals, handle_signals, handle_signals)
    241     try:
    242         support.run_unittest(ThreadSignals)
    243     finally:
    244         registerSignals(*oldsigs)
    245 
    246 if __name__ == '__main__':
    247     test_main()
    248