1 import unittest 2 from test import test_support 3 from contextlib import closing 4 import gc 5 import pickle 6 import select 7 import signal 8 import subprocess 9 import traceback 10 import sys, os, time, errno 11 12 if sys.platform in ('os2', 'riscos'): 13 raise unittest.SkipTest("Can't test signal on %s" % sys.platform) 14 15 16 class HandlerBCalled(Exception): 17 pass 18 19 20 def exit_subprocess(): 21 """Use os._exit(0) to exit the current subprocess. 22 23 Otherwise, the test catches the SystemExit and continues executing 24 in parallel with the original test, so you wind up with an 25 exponential number of tests running concurrently. 26 """ 27 os._exit(0) 28 29 30 def ignoring_eintr(__func, *args, **kwargs): 31 try: 32 return __func(*args, **kwargs) 33 except EnvironmentError as e: 34 if e.errno != errno.EINTR: 35 raise 36 return None 37 38 39 @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 40 class InterProcessSignalTests(unittest.TestCase): 41 MAX_DURATION = 20 # Entire test should last at most 20 sec. 42 43 def setUp(self): 44 self.using_gc = gc.isenabled() 45 gc.disable() 46 47 def tearDown(self): 48 if self.using_gc: 49 gc.enable() 50 51 def format_frame(self, frame, limit=None): 52 return ''.join(traceback.format_stack(frame, limit=limit)) 53 54 def handlerA(self, signum, frame): 55 self.a_called = True 56 if test_support.verbose: 57 print "handlerA invoked from signal %s at:\n%s" % ( 58 signum, self.format_frame(frame, limit=1)) 59 60 def handlerB(self, signum, frame): 61 self.b_called = True 62 if test_support.verbose: 63 print "handlerB invoked from signal %s at:\n%s" % ( 64 signum, self.format_frame(frame, limit=1)) 65 raise HandlerBCalled(signum, self.format_frame(frame)) 66 67 def wait(self, child): 68 """Wait for child to finish, ignoring EINTR.""" 69 while True: 70 try: 71 child.wait() 72 return 73 except OSError as e: 74 if e.errno != errno.EINTR: 75 raise 76 77 def run_test(self): 78 # Install handlers. This function runs in a sub-process, so we 79 # don't worry about re-setting the default handlers. 80 signal.signal(signal.SIGHUP, self.handlerA) 81 signal.signal(signal.SIGUSR1, self.handlerB) 82 signal.signal(signal.SIGUSR2, signal.SIG_IGN) 83 signal.signal(signal.SIGALRM, signal.default_int_handler) 84 85 # Variables the signals will modify: 86 self.a_called = False 87 self.b_called = False 88 89 # Let the sub-processes know who to send signals to. 90 pid = os.getpid() 91 if test_support.verbose: 92 print "test runner's pid is", pid 93 94 child = ignoring_eintr(subprocess.Popen, ['kill', '-HUP', str(pid)]) 95 if child: 96 self.wait(child) 97 if not self.a_called: 98 time.sleep(1) # Give the signal time to be delivered. 99 self.assertTrue(self.a_called) 100 self.assertFalse(self.b_called) 101 self.a_called = False 102 103 # Make sure the signal isn't delivered while the previous 104 # Popen object is being destroyed, because __del__ swallows 105 # exceptions. 106 del child 107 try: 108 child = subprocess.Popen(['kill', '-USR1', str(pid)]) 109 # This wait should be interrupted by the signal's exception. 110 self.wait(child) 111 time.sleep(1) # Give the signal time to be delivered. 112 self.fail('HandlerBCalled exception not thrown') 113 except HandlerBCalled: 114 self.assertTrue(self.b_called) 115 self.assertFalse(self.a_called) 116 if test_support.verbose: 117 print "HandlerBCalled exception caught" 118 119 child = ignoring_eintr(subprocess.Popen, ['kill', '-USR2', str(pid)]) 120 if child: 121 self.wait(child) # Nothing should happen. 122 123 try: 124 signal.alarm(1) 125 # The race condition in pause doesn't matter in this case, 126 # since alarm is going to raise a KeyboardException, which 127 # will skip the call. 128 signal.pause() 129 # But if another signal arrives before the alarm, pause 130 # may return early. 131 time.sleep(1) 132 except KeyboardInterrupt: 133 if test_support.verbose: 134 print "KeyboardInterrupt (the alarm() went off)" 135 except: 136 self.fail("Some other exception woke us from pause: %s" % 137 traceback.format_exc()) 138 else: 139 self.fail("pause returned of its own accord, and the signal" 140 " didn't arrive after another second.") 141 142 # Issue 3864. Unknown if this affects earlier versions of freebsd also. 143 @unittest.skipIf(sys.platform=='freebsd6', 144 'inter process signals not reliable (do not mix well with threading) ' 145 'on freebsd6') 146 def test_main(self): 147 # This function spawns a child process to insulate the main 148 # test-running process from all the signals. It then 149 # communicates with that child process over a pipe and 150 # re-raises information about any exceptions the child 151 # throws. The real work happens in self.run_test(). 152 os_done_r, os_done_w = os.pipe() 153 with closing(os.fdopen(os_done_r)) as done_r, \ 154 closing(os.fdopen(os_done_w, 'w')) as done_w: 155 child = os.fork() 156 if child == 0: 157 # In the child process; run the test and report results 158 # through the pipe. 159 try: 160 done_r.close() 161 # Have to close done_w again here because 162 # exit_subprocess() will skip the enclosing with block. 163 with closing(done_w): 164 try: 165 self.run_test() 166 except: 167 pickle.dump(traceback.format_exc(), done_w) 168 else: 169 pickle.dump(None, done_w) 170 except: 171 print 'Uh oh, raised from pickle.' 172 traceback.print_exc() 173 finally: 174 exit_subprocess() 175 176 done_w.close() 177 # Block for up to MAX_DURATION seconds for the test to finish. 178 r, w, x = select.select([done_r], [], [], self.MAX_DURATION) 179 if done_r in r: 180 tb = pickle.load(done_r) 181 if tb: 182 self.fail(tb) 183 else: 184 os.kill(child, signal.SIGKILL) 185 self.fail('Test deadlocked after %d seconds.' % 186 self.MAX_DURATION) 187 188 189 @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 190 class BasicSignalTests(unittest.TestCase): 191 def trivial_signal_handler(self, *args): 192 pass 193 194 def test_out_of_range_signal_number_raises_error(self): 195 self.assertRaises(ValueError, signal.getsignal, 4242) 196 197 self.assertRaises(ValueError, signal.signal, 4242, 198 self.trivial_signal_handler) 199 200 def test_setting_signal_handler_to_none_raises_error(self): 201 self.assertRaises(TypeError, signal.signal, 202 signal.SIGUSR1, None) 203 204 def test_getsignal(self): 205 hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler) 206 self.assertEqual(signal.getsignal(signal.SIGHUP), 207 self.trivial_signal_handler) 208 signal.signal(signal.SIGHUP, hup) 209 self.assertEqual(signal.getsignal(signal.SIGHUP), hup) 210 211 212 @unittest.skipUnless(sys.platform == "win32", "Windows specific") 213 class WindowsSignalTests(unittest.TestCase): 214 def test_issue9324(self): 215 # Updated for issue #10003, adding SIGBREAK 216 handler = lambda x, y: None 217 for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE, 218 signal.SIGILL, signal.SIGINT, signal.SIGSEGV, 219 signal.SIGTERM): 220 # Set and then reset a handler for signals that work on windows 221 signal.signal(sig, signal.signal(sig, handler)) 222 223 with self.assertRaises(ValueError): 224 signal.signal(-1, handler) 225 226 with self.assertRaises(ValueError): 227 signal.signal(7, handler) 228 229 230 @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 231 class WakeupSignalTests(unittest.TestCase): 232 TIMEOUT_FULL = 10 233 TIMEOUT_HALF = 5 234 235 def test_wakeup_fd_early(self): 236 import select 237 238 signal.alarm(1) 239 before_time = time.time() 240 # We attempt to get a signal during the sleep, 241 # before select is called 242 time.sleep(self.TIMEOUT_FULL) 243 mid_time = time.time() 244 self.assertTrue(mid_time - before_time < self.TIMEOUT_HALF) 245 select.select([self.read], [], [], self.TIMEOUT_FULL) 246 after_time = time.time() 247 self.assertTrue(after_time - mid_time < self.TIMEOUT_HALF) 248 249 def test_wakeup_fd_during(self): 250 import select 251 252 signal.alarm(1) 253 before_time = time.time() 254 # We attempt to get a signal during the select call 255 self.assertRaises(select.error, select.select, 256 [self.read], [], [], self.TIMEOUT_FULL) 257 after_time = time.time() 258 self.assertTrue(after_time - before_time < self.TIMEOUT_HALF) 259 260 def setUp(self): 261 import fcntl 262 263 self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None) 264 self.read, self.write = os.pipe() 265 flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0) 266 flags = flags | os.O_NONBLOCK 267 fcntl.fcntl(self.write, fcntl.F_SETFL, flags) 268 self.old_wakeup = signal.set_wakeup_fd(self.write) 269 270 def tearDown(self): 271 signal.set_wakeup_fd(self.old_wakeup) 272 os.close(self.read) 273 os.close(self.write) 274 signal.signal(signal.SIGALRM, self.alrm) 275 276 @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 277 class SiginterruptTest(unittest.TestCase): 278 279 def setUp(self): 280 """Install a no-op signal handler that can be set to allow 281 interrupts or not, and arrange for the original signal handler to be 282 re-installed when the test is finished. 283 """ 284 self.signum = signal.SIGUSR1 285 oldhandler = signal.signal(self.signum, lambda x,y: None) 286 self.addCleanup(signal.signal, self.signum, oldhandler) 287 288 def readpipe_interrupted(self): 289 """Perform a read during which a signal will arrive. Return True if the 290 read is interrupted by the signal and raises an exception. Return False 291 if it returns normally. 292 """ 293 # Create a pipe that can be used for the read. Also clean it up 294 # when the test is over, since nothing else will (but see below for 295 # the write end). 296 r, w = os.pipe() 297 self.addCleanup(os.close, r) 298 299 # Create another process which can send a signal to this one to try 300 # to interrupt the read. 301 ppid = os.getpid() 302 pid = os.fork() 303 304 if pid == 0: 305 # Child code: sleep to give the parent enough time to enter the 306 # read() call (there's a race here, but it's really tricky to 307 # eliminate it); then signal the parent process. Also, sleep 308 # again to make it likely that the signal is delivered to the 309 # parent process before the child exits. If the child exits 310 # first, the write end of the pipe will be closed and the test 311 # is invalid. 312 try: 313 time.sleep(0.2) 314 os.kill(ppid, self.signum) 315 time.sleep(0.2) 316 finally: 317 # No matter what, just exit as fast as possible now. 318 exit_subprocess() 319 else: 320 # Parent code. 321 # Make sure the child is eventually reaped, else it'll be a 322 # zombie for the rest of the test suite run. 323 self.addCleanup(os.waitpid, pid, 0) 324 325 # Close the write end of the pipe. The child has a copy, so 326 # it's not really closed until the child exits. We need it to 327 # close when the child exits so that in the non-interrupt case 328 # the read eventually completes, otherwise we could just close 329 # it *after* the test. 330 os.close(w) 331 332 # Try the read and report whether it is interrupted or not to 333 # the caller. 334 try: 335 d = os.read(r, 1) 336 return False 337 except OSError, err: 338 if err.errno != errno.EINTR: 339 raise 340 return True 341 342 def test_without_siginterrupt(self): 343 """If a signal handler is installed and siginterrupt is not called 344 at all, when that signal arrives, it interrupts a syscall that's in 345 progress. 346 """ 347 i = self.readpipe_interrupted() 348 self.assertTrue(i) 349 # Arrival of the signal shouldn't have changed anything. 350 i = self.readpipe_interrupted() 351 self.assertTrue(i) 352 353 def test_siginterrupt_on(self): 354 """If a signal handler is installed and siginterrupt is called with 355 a true value for the second argument, when that signal arrives, it 356 interrupts a syscall that's in progress. 357 """ 358 signal.siginterrupt(self.signum, 1) 359 i = self.readpipe_interrupted() 360 self.assertTrue(i) 361 # Arrival of the signal shouldn't have changed anything. 362 i = self.readpipe_interrupted() 363 self.assertTrue(i) 364 365 def test_siginterrupt_off(self): 366 """If a signal handler is installed and siginterrupt is called with 367 a false value for the second argument, when that signal arrives, it 368 does not interrupt a syscall that's in progress. 369 """ 370 signal.siginterrupt(self.signum, 0) 371 i = self.readpipe_interrupted() 372 self.assertFalse(i) 373 # Arrival of the signal shouldn't have changed anything. 374 i = self.readpipe_interrupted() 375 self.assertFalse(i) 376 377 378 @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 379 class ItimerTest(unittest.TestCase): 380 def setUp(self): 381 self.hndl_called = False 382 self.hndl_count = 0 383 self.itimer = None 384 self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm) 385 386 def tearDown(self): 387 signal.signal(signal.SIGALRM, self.old_alarm) 388 if self.itimer is not None: # test_itimer_exc doesn't change this attr 389 # just ensure that itimer is stopped 390 signal.setitimer(self.itimer, 0) 391 392 def sig_alrm(self, *args): 393 self.hndl_called = True 394 if test_support.verbose: 395 print("SIGALRM handler invoked", args) 396 397 def sig_vtalrm(self, *args): 398 self.hndl_called = True 399 400 if self.hndl_count > 3: 401 # it shouldn't be here, because it should have been disabled. 402 raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL " 403 "timer.") 404 elif self.hndl_count == 3: 405 # disable ITIMER_VIRTUAL, this function shouldn't be called anymore 406 signal.setitimer(signal.ITIMER_VIRTUAL, 0) 407 if test_support.verbose: 408 print("last SIGVTALRM handler call") 409 410 self.hndl_count += 1 411 412 if test_support.verbose: 413 print("SIGVTALRM handler invoked", args) 414 415 def sig_prof(self, *args): 416 self.hndl_called = True 417 signal.setitimer(signal.ITIMER_PROF, 0) 418 419 if test_support.verbose: 420 print("SIGPROF handler invoked", args) 421 422 def test_itimer_exc(self): 423 # XXX I'm assuming -1 is an invalid itimer, but maybe some platform 424 # defines it ? 425 self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0) 426 # Negative times are treated as zero on some platforms. 427 if 0: 428 self.assertRaises(signal.ItimerError, 429 signal.setitimer, signal.ITIMER_REAL, -1) 430 431 def test_itimer_real(self): 432 self.itimer = signal.ITIMER_REAL 433 signal.setitimer(self.itimer, 1.0) 434 if test_support.verbose: 435 print("\ncall pause()...") 436 signal.pause() 437 438 self.assertEqual(self.hndl_called, True) 439 440 # Issue 3864. Unknown if this affects earlier versions of freebsd also. 441 @unittest.skipIf(sys.platform in ('freebsd6', 'netbsd5'), 442 'itimer not reliable (does not mix well with threading) on some BSDs.') 443 def test_itimer_virtual(self): 444 self.itimer = signal.ITIMER_VIRTUAL 445 signal.signal(signal.SIGVTALRM, self.sig_vtalrm) 446 signal.setitimer(self.itimer, 0.3, 0.2) 447 448 start_time = time.time() 449 while time.time() - start_time < 60.0: 450 # use up some virtual time by doing real work 451 _ = pow(12345, 67890, 10000019) 452 if signal.getitimer(self.itimer) == (0.0, 0.0): 453 break # sig_vtalrm handler stopped this itimer 454 else: # Issue 8424 455 self.skipTest("timeout: likely cause: machine too slow or load too " 456 "high") 457 458 # virtual itimer should be (0.0, 0.0) now 459 self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0)) 460 # and the handler should have been called 461 self.assertEqual(self.hndl_called, True) 462 463 # Issue 3864. Unknown if this affects earlier versions of freebsd also. 464 @unittest.skipIf(sys.platform=='freebsd6', 465 'itimer not reliable (does not mix well with threading) on freebsd6') 466 def test_itimer_prof(self): 467 self.itimer = signal.ITIMER_PROF 468 signal.signal(signal.SIGPROF, self.sig_prof) 469 signal.setitimer(self.itimer, 0.2, 0.2) 470 471 start_time = time.time() 472 while time.time() - start_time < 60.0: 473 # do some work 474 _ = pow(12345, 67890, 10000019) 475 if signal.getitimer(self.itimer) == (0.0, 0.0): 476 break # sig_prof handler stopped this itimer 477 else: # Issue 8424 478 self.skipTest("timeout: likely cause: machine too slow or load too " 479 "high") 480 481 # profiling itimer should be (0.0, 0.0) now 482 self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0)) 483 # and the handler should have been called 484 self.assertEqual(self.hndl_called, True) 485 486 def test_main(): 487 test_support.run_unittest(BasicSignalTests, InterProcessSignalTests, 488 WakeupSignalTests, SiginterruptTest, 489 ItimerTest, WindowsSignalTests) 490 491 492 if __name__ == "__main__": 493 test_main() 494