import unittest
from test import test_support
from contextlib import closing
import gc
import pickle
import select
import signal
import subprocess
import traceback
import sys
import os
import time
import errno

if sys.platform in ('os2', 'riscos'):
    raise unittest.SkipTest("Can't test signal on %s" % sys.platform)


class HandlerBCalled(Exception):
    """Raised by InterProcessSignalTests.handlerB when its signal arrives."""
    pass


def exit_subprocess():
    """Use os._exit(0) to exit the current subprocess.

    Otherwise, the test catches the SystemExit and continues executing
    in parallel with the original test, so you wind up with an
    exponential number of tests running concurrently.
    """
    os._exit(0)


def ignoring_eintr(__func, *args, **kwargs):
    """Call __func(*args, **kwargs), turning an EINTR failure into None.

    Returns the callable's result on success.  An EnvironmentError whose
    errno is EINTR is swallowed and None is returned instead; any other
    EnvironmentError propagates unchanged.
    """
    try:
        return __func(*args, **kwargs)
    except EnvironmentError as e:
        if e.errno != errno.EINTR:
            raise
        return None


@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class InterProcessSignalTests(unittest.TestCase):
    """Deliver signals to a forked child via `kill` and check the handlers."""

    MAX_DURATION = 20   # Entire test should last at most 20 sec.

    def setUp(self):
        # Remember whether the collector was on so tearDown can restore it;
        # the test itself runs with gc disabled.
        self.using_gc = gc.isenabled()
        gc.disable()

    def tearDown(self):
        if self.using_gc:
            gc.enable()

    def format_frame(self, frame, limit=None):
        """Return the stack of `frame` (at most `limit` entries) as a string."""
        return ''.join(traceback.format_stack(frame, limit=limit))

    def handlerA(self, signum, frame):
        """Signal handler that only records that it ran."""
        self.a_called = True
        if test_support.verbose:
            print("handlerA invoked from signal %s at:\n%s" % (
                signum, self.format_frame(frame, limit=1)))

    def handlerB(self, signum, frame):
        """Signal handler that records that it ran, then raises.

        Raises HandlerBCalled carrying the signal number and the formatted
        stack of the interrupted frame.
        """
        self.b_called = True
        if test_support.verbose:
            print("handlerB invoked from signal %s at:\n%s" % (
                signum, self.format_frame(frame, limit=1)))
        raise HandlerBCalled(signum, self.format_frame(frame))

    def wait(self, child):
        """Wait for child to finish, ignoring EINTR."""
        while True:
            try:
                child.wait()
                return
            except OSError as e:
                if e.errno != errno.EINTR:
                    raise

    def run_test(self):
        # Install handlers. This function runs in a sub-process, so we
        # don't worry about re-setting the default handlers.
        signal.signal(signal.SIGHUP, self.handlerA)
        signal.signal(signal.SIGUSR1, self.handlerB)
        signal.signal(signal.SIGUSR2, signal.SIG_IGN)
        signal.signal(signal.SIGALRM, signal.default_int_handler)

        # Variables the signals will modify:
        self.a_called = False
        self.b_called = False

        # Let the sub-processes know who to send signals to.
        pid = os.getpid()
        if test_support.verbose:
            print("test runner's pid is %s" % pid)

        child = ignoring_eintr(subprocess.Popen, ['kill', '-HUP', str(pid)])
        if child:
            self.wait(child)
            if not self.a_called:
                time.sleep(1)  # Give the signal time to be delivered.
        self.assertTrue(self.a_called)
        self.assertFalse(self.b_called)
        self.a_called = False

        # Make sure the signal isn't delivered while the previous
        # Popen object is being destroyed, because __del__ swallows
        # exceptions.
        del child
        try:
            child = subprocess.Popen(['kill', '-USR1', str(pid)])
            # This wait should be interrupted by the signal's exception.
            self.wait(child)
            time.sleep(1)  # Give the signal time to be delivered.
            self.fail('HandlerBCalled exception not raised')
        except HandlerBCalled:
            self.assertTrue(self.b_called)
            self.assertFalse(self.a_called)
            if test_support.verbose:
                print("HandlerBCalled exception caught")

        child = ignoring_eintr(subprocess.Popen, ['kill', '-USR2', str(pid)])
        if child:
            self.wait(child)  # Nothing should happen.

        try:
            signal.alarm(1)
            # The race condition in pause doesn't matter in this case,
            # since alarm is going to raise a KeyboardInterrupt, which
            # will skip the call.
            signal.pause()
            # But if another signal arrives before the alarm, pause
            # may return early.
            time.sleep(1)
        except KeyboardInterrupt:
            if test_support.verbose:
                print("KeyboardInterrupt (the alarm() went off)")
        except:
            self.fail("Some other exception woke us from pause: %s" %
                      traceback.format_exc())
        else:
            self.fail("pause returned of its own accord, and the signal"
                      " didn't arrive after another second.")

    # Issue 3864. Unknown if this affects earlier versions of freebsd also.
    @unittest.skipIf(sys.platform=='freebsd6',
        'inter process signals not reliable (do not mix well with threading) '
        'on freebsd6')
    def test_main(self):
        # This function spawns a child process to insulate the main
        # test-running process from all the signals. It then
        # communicates with that child process over a pipe and
        # re-raises information about any exceptions the child
        # raises. The real work happens in self.run_test().
        os_done_r, os_done_w = os.pipe()
        with closing(os.fdopen(os_done_r)) as done_r, \
             closing(os.fdopen(os_done_w, 'w')) as done_w:
            child = os.fork()
            if child == 0:
                # In the child process; run the test and report results
                # through the pipe.
                try:
                    done_r.close()
                    # Have to close done_w again here because
                    # exit_subprocess() will skip the enclosing with block.
                    with closing(done_w):
                        try:
                            self.run_test()
                        except:
                            pickle.dump(traceback.format_exc(), done_w)
                        else:
                            pickle.dump(None, done_w)
                except:
                    print('Uh oh, raised from pickle.')
                    traceback.print_exc()
                finally:
                    exit_subprocess()

            done_w.close()
            # Block for up to MAX_DURATION seconds for the test to finish.
            r, w, x = select.select([done_r], [], [], self.MAX_DURATION)
            if done_r in r:
                try:
                    tb = pickle.load(done_r)
                finally:
                    # The pipe became readable, so the child has either
                    # reported its result or closed its end; read its exit
                    # status so it is not left behind as a zombie.
                    os.waitpid(child, 0)
                if tb:
                    self.fail(tb)
            else:
                os.kill(child, signal.SIGKILL)
                # Reap the killed child before failing, for the same reason.
                os.waitpid(child, 0)
                self.fail('Test deadlocked after %d seconds.' %
                          self.MAX_DURATION)


@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class BasicSignalTests(unittest.TestCase):
    """Argument validation and round-tripping for signal()/getsignal()."""

    def trivial_signal_handler(self, *args):
        pass

    def test_out_of_range_signal_number_raises_error(self):
        self.assertRaises(ValueError, signal.getsignal, 4242)

        self.assertRaises(ValueError, signal.signal, 4242,
                          self.trivial_signal_handler)

    def test_setting_signal_handler_to_none_raises_error(self):
        self.assertRaises(TypeError, signal.signal,
                          signal.SIGUSR1, None)

    def test_getsignal(self):
        # Install a handler, check getsignal() reports it, then put the
        # previous handler back and check that it is reported again.
        hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
        self.assertEqual(signal.getsignal(signal.SIGHUP),
                         self.trivial_signal_handler)
        signal.signal(signal.SIGHUP, hup)
        self.assertEqual(signal.getsignal(signal.SIGHUP), hup)


@unittest.skipUnless(sys.platform == "win32", "Windows specific")
class WindowsSignalTests(unittest.TestCase):
    """Checks which signal numbers signal.signal() accepts on Windows."""

    def test_issue9324(self):
        # Updated for issue #10003, adding SIGBREAK
        handler = lambda x, y: None
        for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
                    signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
                    signal.SIGTERM):
            # Set and then reset a handler for signals that work on windows
            signal.signal(sig, signal.signal(sig, handler))

        with self.assertRaises(ValueError):
            signal.signal(-1, handler)

        with self.assertRaises(ValueError):
            signal.signal(7, handler)


class WakeupFDTests(unittest.TestCase):
    """set_wakeup_fd() must reject a bad file descriptor."""

    def test_invalid_fd(self):
        fd = test_support.make_bad_fd()
        self.assertRaises(ValueError, signal.set_wakeup_fd, fd)


@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class WakeupSignalTests(unittest.TestCase):
    """Exercise the wakeup fd with a SIGALRM that fires after one second."""

    TIMEOUT_FULL = 10
    TIMEOUT_HALF = 5

    def test_wakeup_fd_early(self):
        signal.alarm(1)
        before_time = time.time()
        # We attempt to get a signal during the sleep,
        # before select is called
        time.sleep(self.TIMEOUT_FULL)
        mid_time = time.time()
        self.assertLess(mid_time - before_time, self.TIMEOUT_HALF)
        select.select([self.read], [], [], self.TIMEOUT_FULL)
        after_time = time.time()
        self.assertLess(after_time - mid_time, self.TIMEOUT_HALF)

    def test_wakeup_fd_during(self):
        signal.alarm(1)
        before_time = time.time()
        # We attempt to get a signal during the select call
        self.assertRaises(select.error, select.select,
            [self.read], [], [], self.TIMEOUT_FULL)
        after_time = time.time()
        self.assertLess(after_time - before_time, self.TIMEOUT_HALF)

    def setUp(self):
        # fcntl is imported here rather than at module level -- presumably
        # because it is unavailable on Windows, where this class is skipped.
        import fcntl

        # Install a no-op SIGALRM handler and a pipe whose non-blocking
        # write end is registered as the wakeup fd; the previous handler
        # and wakeup fd are saved for tearDown.
        self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None)
        self.read, self.write = os.pipe()
        flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
        flags = flags | os.O_NONBLOCK
        fcntl.fcntl(self.write, fcntl.F_SETFL, flags)
        self.old_wakeup = signal.set_wakeup_fd(self.write)

    def tearDown(self):
        signal.set_wakeup_fd(self.old_wakeup)
        os.close(self.read)
        os.close(self.write)
        signal.signal(signal.SIGALRM, self.alrm)

@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class SiginterruptTest(unittest.TestCase):
    """Check whether a blocking read is interrupted under siginterrupt()."""

    def setUp(self):
        """Install a no-op signal handler that can be set to allow
        interrupts or not, and arrange for the original signal handler to be
        re-installed when the test is finished.
        """
        self.signum = signal.SIGUSR1
        oldhandler = signal.signal(self.signum, lambda x,y: None)
        self.addCleanup(signal.signal, self.signum, oldhandler)

    def readpipe_interrupted(self):
        """Perform a read during which a signal will arrive.  Return True if the
        read is interrupted by the signal and raises an exception.  Return False
        if it returns normally.
        """
        # Create a pipe that can be used for the read.  Also clean it up
        # when the test is over, since nothing else will (but see below for
        # the write end).
        r, w = os.pipe()
        self.addCleanup(os.close, r)

        # Create another process which can send a signal to this one to try
        # to interrupt the read.
        ppid = os.getpid()
        pid = os.fork()

        if pid == 0:
            # Child code: sleep to give the parent enough time to enter the
            # read() call (there's a race here, but it's really tricky to
            # eliminate it); then signal the parent process.  Also, sleep
            # again to make it likely that the signal is delivered to the
            # parent process before the child exits.  If the child exits
            # first, the write end of the pipe will be closed and the test
            # is invalid.
            try:
                time.sleep(0.2)
                os.kill(ppid, self.signum)
                time.sleep(0.2)
            finally:
                # No matter what, just exit as fast as possible now.
                exit_subprocess()
        else:
            # Parent code.
            # Make sure the child is eventually reaped, else it'll be a
            # zombie for the rest of the test suite run.
            self.addCleanup(os.waitpid, pid, 0)

            # Close the write end of the pipe.  The child has a copy, so
            # it's not really closed until the child exits.  We need it to
            # close when the child exits so that in the non-interrupt case
            # the read eventually completes, otherwise we could just close
            # it *after* the test.
            os.close(w)

            # Try the read and report whether it is interrupted or not to
            # the caller.
            try:
                os.read(r, 1)
                return False
            except OSError as err:
                if err.errno != errno.EINTR:
                    raise
                return True

    def test_without_siginterrupt(self):
        """If a signal handler is installed and siginterrupt is not called
        at all, when that signal arrives, it interrupts a syscall that's in
        progress.
        """
        i = self.readpipe_interrupted()
        self.assertTrue(i)
        # Arrival of the signal shouldn't have changed anything.
        i = self.readpipe_interrupted()
        self.assertTrue(i)

    def test_siginterrupt_on(self):
        """If a signal handler is installed and siginterrupt is called with
        a true value for the second argument, when that signal arrives, it
        interrupts a syscall that's in progress.
        """
        signal.siginterrupt(self.signum, 1)
        i = self.readpipe_interrupted()
        self.assertTrue(i)
        # Arrival of the signal shouldn't have changed anything.
        i = self.readpipe_interrupted()
        self.assertTrue(i)

    def test_siginterrupt_off(self):
        """If a signal handler is installed and siginterrupt is called with
        a false value for the second argument, when that signal arrives, it
        does not interrupt a syscall that's in progress.
        """
        signal.siginterrupt(self.signum, 0)
        i = self.readpipe_interrupted()
        self.assertFalse(i)
        # Arrival of the signal shouldn't have changed anything.
        i = self.readpipe_interrupted()
        self.assertFalse(i)


@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class ItimerTest(unittest.TestCase):
    """Tests for signal.setitimer() / signal.getitimer()."""

    def setUp(self):
        self.hndl_called = False
        self.hndl_count = 0
        self.itimer = None
        self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.old_alarm)
        if self.itimer is not None: # test_itimer_exc doesn't change this attr
            # just ensure that itimer is stopped
            signal.setitimer(self.itimer, 0)

    def sig_alrm(self, *args):
        """SIGALRM handler: record that it was called."""
        self.hndl_called = True
        if test_support.verbose:
            print("SIGALRM handler invoked", args)

    def sig_vtalrm(self, *args):
        """SIGVTALRM handler: count calls and stop the timer on one of them.

        When the count of previous calls is 3 the virtual timer is disabled;
        if the handler still runs once that count exceeds 3,
        signal.ItimerError is raised.
        """
        self.hndl_called = True

        if self.hndl_count > 3:
            # it shouldn't be here, because it should have been disabled.
            raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
                "timer.")
        elif self.hndl_count == 3:
            # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
            signal.setitimer(signal.ITIMER_VIRTUAL, 0)
            if test_support.verbose:
                print("last SIGVTALRM handler call")

        self.hndl_count += 1

        if test_support.verbose:
            print("SIGVTALRM handler invoked", args)

    def sig_prof(self, *args):
        """SIGPROF handler: record the call and disable ITIMER_PROF."""
        self.hndl_called = True
        signal.setitimer(signal.ITIMER_PROF, 0)

        if test_support.verbose:
            print("SIGPROF handler invoked", args)

    def test_itimer_exc(self):
        # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
        # defines it ?
        self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
        # Negative times are treated as zero on some platforms.
        if 0:
            self.assertRaises(signal.ItimerError,
                              signal.setitimer, signal.ITIMER_REAL, -1)

    def test_itimer_real(self):
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1.0)
        if test_support.verbose:
            print("\ncall pause()...")
        signal.pause()

        self.assertEqual(self.hndl_called, True)

    # Issue 3864. Unknown if this affects earlier versions of freebsd also.
    @unittest.skipIf(sys.platform in ('freebsd6', 'netbsd5'),
        'itimer not reliable (does not mix well with threading) on some BSDs.')
    def test_itimer_virtual(self):
        self.itimer = signal.ITIMER_VIRTUAL
        signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
        signal.setitimer(self.itimer, 0.3, 0.2)

        start_time = time.time()
        while time.time() - start_time < 60.0:
            # use up some virtual time by doing real work
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                break # sig_vtalrm handler stopped this itimer
        else: # Issue 8424
            # Reached only when the loop ran out of time without a break.
            self.skipTest("timeout: likely cause: machine too slow or load too "
                          "high")

        # virtual itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertEqual(self.hndl_called, True)

    # Issue 3864. Unknown if this affects earlier versions of freebsd also.
    @unittest.skipIf(sys.platform=='freebsd6',
        'itimer not reliable (does not mix well with threading) on freebsd6')
    def test_itimer_prof(self):
        self.itimer = signal.ITIMER_PROF
        signal.signal(signal.SIGPROF, self.sig_prof)
        signal.setitimer(self.itimer, 0.2, 0.2)

        start_time = time.time()
        while time.time() - start_time < 60.0:
            # do some work
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                break # sig_prof handler stopped this itimer
        else: # Issue 8424
            # Reached only when the loop ran out of time without a break.
            self.skipTest("timeout: likely cause: machine too slow or load too "
                          "high")

        # profiling itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertEqual(self.hndl_called, True)

def test_main():
    """Run every test class in this module through test_support."""
    test_support.run_unittest(BasicSignalTests, InterProcessSignalTests,
                              WakeupFDTests, WakeupSignalTests,
                              SiginterruptTest, ItimerTest,
                              WindowsSignalTests)


if __name__ == "__main__":
    test_main()