"""PyUnit testing that threads honor our signal semantics"""

import unittest
import signal
import os
import sys
from test import support
import _thread as thread
import time

try:
    # Python 3.10+ moved wait_threads_exit() out of test.support.
    from test.support import threading_helper
except ImportError:
    # Older layouts expose the same helper directly on test.support.
    threading_helper = support

if (sys.platform[:3] == 'win'):
    raise unittest.SkipTest("Can't test signal on %s" % sys.platform)

process_pid = os.getpid()
signalled_all = thread.allocate_lock()

USING_PTHREAD_COND = (sys.thread_info.name == 'pthread'
                      and sys.thread_info.lock == 'mutex+cond')

# Per-signal bookkeeping filled in by handle_signals().  The dict object is
# created once and only ever mutated in place, so every reference to it
# (including ones obtained before setUpModule() ran) stays valid.
signal_blackboard = {}

# Handlers that were installed before setUpModule() replaced them, or None
# while the module fixture is not active.
_saved_handlers = None


def registerSignals(for_usr1, for_usr2, for_alrm):
    """Install handlers for SIGUSR1, SIGUSR2 and SIGALRM.

    Returns a (usr1, usr2, alrm) tuple holding whatever signal.signal()
    reported as the previous handler of each signal, in that order, so the
    tuple can be passed back to this function to undo the change.
    """
    usr1 = signal.signal(signal.SIGUSR1, for_usr1)
    usr2 = signal.signal(signal.SIGUSR2, for_usr2)
    alrm = signal.signal(signal.SIGALRM, for_alrm)
    return usr1, usr2, alrm


# The signal handler. Just note that the signal occurred and
# from who.
def handle_signals(sig, frame):
    """Record on the blackboard that *sig* fired and which thread ran us."""
    signal_blackboard[sig]['tripped'] += 1
    signal_blackboard[sig]['tripped_by'] = thread.get_ident()


# a function that will be spawned as a separate thread.
def send_signals():
    """Send SIGUSR1 and SIGUSR2 to this process, then release signalled_all."""
    os.kill(process_pid, signal.SIGUSR1)
    os.kill(process_pid, signal.SIGUSR2)
    signalled_all.release()


def _reset_blackboard():
    """Zero the bookkeeping entry of every signal the tests track."""
    signal_blackboard.clear()
    for signum in (signal.SIGUSR1, signal.SIGUSR2, signal.SIGALRM):
        signal_blackboard[signum] = {'tripped': 0, 'tripped_by': 0}


def setUpModule():
    """Module fixture: reset the blackboard and install handle_signals.

    Doing this in a module fixture (rather than only in test_main()) means
    the tests also work under plain unittest discovery; without it
    test_signals would send SIGUSR1 while the default disposition is still
    in place.  Calling this again while the fixture is already active
    resets the blackboard but keeps the originally saved handlers.
    """
    global _saved_handlers
    _reset_blackboard()
    previous = registerSignals(handle_signals, handle_signals, handle_signals)
    if _saved_handlers is None:
        _saved_handlers = previous


def tearDownModule():
    """Module fixture: restore the handlers saved by setUpModule().

    Does nothing when the fixture is not active, so it is safe to call more
    than once.
    """
    global _saved_handlers
    if _saved_handlers is not None:
        saved, _saved_handlers = _saved_handlers, None
        registerSignals(*saved)


class ThreadSignals(unittest.TestCase):
    """Checks on how signals and lock acquisition interact with threads."""

    def test_signals(self):
        # Test signal handling semantics of threads.
        # We spawn a thread, have the thread send two signals, and
        # wait for it to finish. Check that we got both signals
        # and that they were run by the main thread.
        signalled_all.acquire()
        try:
            with threading_helper.wait_threads_exit():
                self.spawnSignallingThread()
                signalled_all.acquire()

            # the signals that we asked the kernel to send
            # will come back, but we don't know when.
            # (it might even be after the thread exits
            # and might be out of order.)  If we haven't seen
            # the signals yet, send yet another signal and
            # wait for it to return.
            if (signal_blackboard[signal.SIGUSR1]['tripped'] == 0
                    or signal_blackboard[signal.SIGUSR2]['tripped'] == 0):
                try:
                    signal.alarm(1)
                    signal.pause()
                finally:
                    signal.alarm(0)

            self.assertEqual(signal_blackboard[signal.SIGUSR1]['tripped'], 1)
            self.assertEqual(signal_blackboard[signal.SIGUSR1]['tripped_by'],
                             thread.get_ident())
            self.assertEqual(signal_blackboard[signal.SIGUSR2]['tripped'], 1)
            self.assertEqual(signal_blackboard[signal.SIGUSR2]['tripped_by'],
                             thread.get_ident())
        finally:
            # Release even when an assertion fails: the lock is module-level,
            # and leaving it held would deadlock the next run of this test.
            signalled_all.release()

    def spawnSignallingThread(self):
        """Start send_signals() in a new thread."""
        thread.start_new_thread(send_signals, ())

    def alarm_interrupt(self, sig, frame):
        """SIGALRM handler that mimics Ctrl-C by raising KeyboardInterrupt."""
        raise KeyboardInterrupt

    @unittest.skipIf(USING_PTHREAD_COND,
                     'POSIX condition variables cannot be interrupted')
    @unittest.skipIf(sys.platform.startswith('linux') and
                     not sys.thread_info.version,
                     'Issue 34004: musl does not allow interruption of locks '
                     'by signals.')
    # Issue #20564: sem_timedwait() cannot be interrupted on OpenBSD
    @unittest.skipIf(sys.platform.startswith('openbsd'),
                     'lock cannot be interrupted on OpenBSD')
    def test_lock_acquire_interruption(self):
        # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
        # in a deadlock.
        # XXX this test can fail when the legacy (non-semaphore) implementation
        # of locks is used in thread_pthread.h, see issue #11223.
        oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
        try:
            lock = thread.allocate_lock()
            lock.acquire()
            signal.alarm(1)
            t1 = time.monotonic()
            self.assertRaises(KeyboardInterrupt, lock.acquire, timeout=5)
            dt = time.monotonic() - t1
            # Checking that KeyboardInterrupt was raised is not sufficient.
            # We want to assert that lock.acquire() was interrupted because
            # of the signal, not that the signal handler was called immediately
            # after timeout return of lock.acquire() (which can fool assertRaises).
            self.assertLess(dt, 3.0)
        finally:
            signal.alarm(0)
            signal.signal(signal.SIGALRM, oldalrm)

    @unittest.skipIf(USING_PTHREAD_COND,
                     'POSIX condition variables cannot be interrupted')
    @unittest.skipIf(sys.platform.startswith('linux') and
                     not sys.thread_info.version,
                     'Issue 34004: musl does not allow interruption of locks '
                     'by signals.')
    # Issue #20564: sem_timedwait() cannot be interrupted on OpenBSD
    @unittest.skipIf(sys.platform.startswith('openbsd'),
                     'lock cannot be interrupted on OpenBSD')
    def test_rlock_acquire_interruption(self):
        # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
        # in a deadlock.
        # XXX this test can fail when the legacy (non-semaphore) implementation
        # of locks is used in thread_pthread.h, see issue #11223.
        oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
        try:
            rlock = thread.RLock()

            # For reentrant locks, the initial acquisition must be in another
            # thread.
            def other_thread():
                rlock.acquire()

            with threading_helper.wait_threads_exit():
                thread.start_new_thread(other_thread, ())
                # Wait until we can't acquire it without blocking...
                while rlock.acquire(blocking=False):
                    rlock.release()
                    time.sleep(0.01)
                signal.alarm(1)
                t1 = time.monotonic()
                self.assertRaises(KeyboardInterrupt, rlock.acquire, timeout=5)
                dt = time.monotonic() - t1
                # See rationale above in test_lock_acquire_interruption
                self.assertLess(dt, 3.0)
        finally:
            signal.alarm(0)
            signal.signal(signal.SIGALRM, oldalrm)

    def acquire_retries_on_intr(self, lock):
        """Check that a blocking lock.acquire() survives a handled signal.

        *lock* is taken by a helper thread, which then sends SIGUSR1 to the
        process while the main thread is blocked in lock.acquire().  The
        acquire must still succeed once the helper releases the lock, and
        the handler must have run.
        """
        self.sig_recvd = False

        # The parameter is called signum so it does not shadow the signal
        # module inside the handler.
        def my_handler(signum, frame):
            self.sig_recvd = True

        old_handler = signal.signal(signal.SIGUSR1, my_handler)
        try:
            def other_thread():
                # Acquire the lock in a non-main thread, so this test works for
                # RLocks.
                lock.acquire()
                # Wait until the main thread is blocked in the lock acquire, and
                # then wake it up with this.
                time.sleep(0.5)
                os.kill(process_pid, signal.SIGUSR1)
                # Let the main thread take the interrupt, handle it, and retry
                # the lock acquisition.  Then we'll let it run.
                time.sleep(0.5)
                lock.release()

            with threading_helper.wait_threads_exit():
                thread.start_new_thread(other_thread, ())
                # Wait until we can't acquire it without blocking...
                while lock.acquire(blocking=False):
                    lock.release()
                    time.sleep(0.01)
                result = lock.acquire()  # Block while we receive a signal.
                self.assertTrue(self.sig_recvd)
                self.assertTrue(result)
        finally:
            signal.signal(signal.SIGUSR1, old_handler)

    def test_lock_acquire_retries_on_intr(self):
        self.acquire_retries_on_intr(thread.allocate_lock())

    def test_rlock_acquire_retries_on_intr(self):
        self.acquire_retries_on_intr(thread.RLock())

    def test_interrupted_timed_acquire(self):
        # Test to make sure we recompute lock acquisition timeouts when we
        # receive a signal.  Check this by repeatedly interrupting a lock
        # acquire in the main thread, and make sure that the lock acquire times
        # out after the right amount of time.
        # NOTE: this test only behaves as expected if C signals get delivered
        # to the main thread.  Otherwise lock.acquire() itself doesn't get
        # interrupted and the test trivially succeeds.
        self.start = None
        self.end = None
        self.sigs_recvd = 0
        done = thread.allocate_lock()
        done.acquire()
        lock = thread.allocate_lock()
        lock.acquire()

        def my_handler(signum, frame):
            self.sigs_recvd += 1

        old_handler = signal.signal(signal.SIGUSR1, my_handler)
        try:
            def timed_acquire():
                self.start = time.monotonic()
                lock.acquire(timeout=0.5)
                self.end = time.monotonic()

            # Local helper; intentionally distinct from the module-level
            # send_signals(), which it shadows inside this test.
            def send_signals():
                for _ in range(40):
                    time.sleep(0.02)
                    os.kill(process_pid, signal.SIGUSR1)
                done.release()

            with threading_helper.wait_threads_exit():
                # Send the signals from the non-main thread, since the main thread
                # is the only one that can process signals.
                thread.start_new_thread(send_signals, ())
                timed_acquire()
                # Wait for thread to finish
                done.acquire()
                # This allows for some timing and scheduling imprecision
                self.assertLess(self.end - self.start, 2.0)
                self.assertGreater(self.end - self.start, 0.3)
                # If the signal is received several times before PyErr_CheckSignals()
                # is called, the handler will get called less than 40 times. Just
                # check it's been called at least once.
                self.assertGreater(self.sigs_recvd, 0)
        finally:
            signal.signal(signal.SIGUSR1, old_handler)


def test_main():
    """Run ThreadSignals with the module's signal handlers installed.

    Kept for callers that invoke test_main() directly.  The fixture is
    applied explicitly here as well; setUpModule()/tearDownModule() tolerate
    being invoked again by the unittest machinery while already active.
    """
    setUpModule()
    try:
        support.run_unittest(ThreadSignals)
    finally:
        tearDownModule()


if __name__ == '__main__':
    test_main()