"""Tests for the signal module.

Most tests that deliver real signals run their body in a freshly spawned
interpreter (via assert_python_ok / spawn_python) so that the code under
test runs in a single-threaded process and does not disturb the signal
handling of the test runner itself.
"""

import unittest
from test import support
from contextlib import closing
import enum
import gc
import pickle
import select
import signal
import socket
import struct
import subprocess
import traceback
import sys, os, time, errno
from test.support.script_helper import assert_python_ok, spawn_python
try:
    import threading
except ImportError:
    threading = None
try:
    import _testcapi
except ImportError:
    _testcapi = None


class GenericTests(unittest.TestCase):
    """Checks on the constants exported by the signal module."""

    def test_enums(self):
        # Every public SIG*/CTRL_* name must be a member of the matching
        # enum class.  The check only needs the signal and sys modules.
        for name in dir(signal):
            sig = getattr(signal, name)
            if name in {'SIG_DFL', 'SIG_IGN'}:
                self.assertIsInstance(sig, signal.Handlers)
            elif name in {'SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'}:
                self.assertIsInstance(sig, signal.Sigmasks)
            elif name.startswith('SIG') and not name.startswith('SIG_'):
                self.assertIsInstance(sig, signal.Signals)
            elif name.startswith('CTRL_'):
                self.assertIsInstance(sig, signal.Signals)
                # CTRL_* names are only expected to exist on Windows
                self.assertEqual(sys.platform, "win32")


@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class PosixTests(unittest.TestCase):
    """signal.signal() / signal.getsignal() behaviour (not run on Windows)."""

    def trivial_signal_handler(self, *args):
        # Handler that ignores the signal; used only as a value to install.
        pass

    def test_out_of_range_signal_number_raises_error(self):
        self.assertRaises(ValueError, signal.getsignal, 4242)

        self.assertRaises(ValueError, signal.signal, 4242,
                          self.trivial_signal_handler)

    def test_setting_signal_handler_to_none_raises_error(self):
        self.assertRaises(TypeError, signal.signal,
                          signal.SIGUSR1, None)

    def test_getsignal(self):
        hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
        # Make sure the previous SIGHUP handler comes back even when one of
        # the assertions below fails.  signal.signal() rejects None, so a
        # None previous handler cannot be re-installed.
        if hup is not None:
            self.addCleanup(signal.signal, signal.SIGHUP, hup)
        self.assertIsInstance(hup, signal.Handlers)
        self.assertEqual(signal.getsignal(signal.SIGHUP),
                         self.trivial_signal_handler)
        signal.signal(signal.SIGHUP, hup)
        self.assertEqual(signal.getsignal(signal.SIGHUP), hup)

    # Issue 3864, unknown if this affects earlier versions of freebsd also
    @unittest.skipIf(sys.platform=='freebsd6',
        'inter process signals not reliable (do not mix well with threading) '
        'on freebsd6')
    def test_interprocess_signal(self):
        # The real test lives in a sibling script that is run in a fresh
        # interpreter.
        dirname = os.path.dirname(__file__)
        script = os.path.join(dirname, 'signalinterproctester.py')
        assert_python_ok(script)


@unittest.skipUnless(sys.platform == "win32", "Windows specific")
class WindowsSignalTests(unittest.TestCase):
    """signal.signal() behaviour that is specific to Windows."""

    def test_issue9324(self):
        # Updated for issue #10003, adding SIGBREAK
        handler = lambda x, y: None
        checked = set()
        for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
                    signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
                    signal.SIGTERM):
            # Set and then reset a handler for signals that work on windows.
            # Issue #18396, only for signals without a C-level handler.
            if signal.getsignal(sig) is not None:
                signal.signal(sig, signal.signal(sig, handler))
                checked.add(sig)
        # Issue #18396: Ensure the above loop at least tested *something*
        self.assertTrue(checked)

        with self.assertRaises(ValueError):
            signal.signal(-1, handler)

        with self.assertRaises(ValueError):
            signal.signal(7, handler)


class WakeupFDTests(unittest.TestCase):
    """Argument validation and return values of signal.set_wakeup_fd()."""

    def test_invalid_fd(self):
        fd = support.make_bad_fd()
        self.assertRaises((ValueError, OSError),
                          signal.set_wakeup_fd, fd)

    def test_invalid_socket(self):
        # The descriptor of an already closed socket must be rejected.
        sock = socket.socket()
        fd = sock.fileno()
        sock.close()
        self.assertRaises((ValueError, OSError),
                          signal.set_wakeup_fd, fd)

    def test_set_wakeup_fd_result(self):
        r1, w1 = os.pipe()
        self.addCleanup(os.close, r1)
        self.addCleanup(os.close, w1)
        r2, w2 = os.pipe()
        self.addCleanup(os.close, r2)
        self.addCleanup(os.close, w2)

        if hasattr(os, 'set_blocking'):
            os.set_blocking(w1, False)
            os.set_blocking(w2, False)

        # Each call must return the previously registered descriptor
        # (-1 when none was registered).
        signal.set_wakeup_fd(w1)
        self.assertEqual(signal.set_wakeup_fd(w2), w1)
        self.assertEqual(signal.set_wakeup_fd(-1), w2)
        self.assertEqual(signal.set_wakeup_fd(-1), -1)

    def test_set_wakeup_fd_socket_result(self):
        sock1 = socket.socket()
        self.addCleanup(sock1.close)
        sock1.setblocking(False)
        fd1 = sock1.fileno()

        sock2 = socket.socket()
        self.addCleanup(sock2.close)
        sock2.setblocking(False)
        fd2 = sock2.fileno()

        signal.set_wakeup_fd(fd1)
        self.assertEqual(signal.set_wakeup_fd(fd2), fd1)
        self.assertEqual(signal.set_wakeup_fd(-1), fd2)
        self.assertEqual(signal.set_wakeup_fd(-1), -1)

    # On Windows, files are always blocking and Windows does not provide a
    # function to test if a socket is in non-blocking mode.
    @unittest.skipIf(sys.platform == "win32", "tests specific to POSIX")
    def test_set_wakeup_fd_blocking(self):
        rfd, wfd = os.pipe()
        self.addCleanup(os.close, rfd)
        self.addCleanup(os.close, wfd)

        # fd must be non-blocking
        os.set_blocking(wfd, True)
        with self.assertRaises(ValueError) as cm:
            signal.set_wakeup_fd(wfd)
        self.assertEqual(str(cm.exception),
                         "the fd %s must be in non-blocking mode" % wfd)

        # non-blocking is ok
        os.set_blocking(wfd, False)
        signal.set_wakeup_fd(wfd)
        signal.set_wakeup_fd(-1)


@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class WakeupSignalTests(unittest.TestCase):
    """Check what is written to a pipe registered with set_wakeup_fd()."""

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def check_wakeup(self, test_body, *signals, ordered=True):
        """Run test_body in a subprocess and verify the wakeup fd content.

        test_body: source of a "def test():" function; it is pasted into
            the script template below and called once the wakeup pipe has
            been set up.
        signals: the signal numbers expected to be read from the pipe.
        ordered: when false, the expected and received signal numbers are
            compared as sets instead of as ordered tuples.
        """
        # use a subprocess to have only one thread
        code = """if 1:
        import _testcapi
        import os
        import signal
        import struct

        signals = {!r}

        def handler(signum, frame):
            pass

        def check_signum(signals):
            data = os.read(read, len(signals)+1)
            raised = struct.unpack('%uB' % len(data), data)
            if not {!r}:
                raised = set(raised)
                signals = set(signals)
            if raised != signals:
                raise Exception("%r != %r" % (raised, signals))

        {}

        signal.signal(signal.SIGALRM, handler)
        read, write = os.pipe()
        os.set_blocking(write, False)
        signal.set_wakeup_fd(write)

        test()
        check_signum(signals)

        os.close(read)
        os.close(write)
        """.format(tuple(map(int, signals)), ordered, test_body)

        assert_python_ok('-c', code)

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_wakeup_write_error(self):
        # Issue #16105: write() errors in the C signal handler should not
        # pass silently.
        # Use a subprocess to have only one thread.
        code = """if 1:
        import _testcapi
        import errno
        import os
        import signal
        import sys
        from test.support import captured_stderr

        def handler(signum, frame):
            1/0

        signal.signal(signal.SIGALRM, handler)
        r, w = os.pipe()
        os.set_blocking(r, False)

        # Set wakeup_fd a read-only file descriptor to trigger the error
        signal.set_wakeup_fd(r)
        try:
            with captured_stderr() as err:
                _testcapi.raise_signal(signal.SIGALRM)
        except ZeroDivisionError:
            # An ignored exception should have been printed out on stderr
            err = err.getvalue()
            if ('Exception ignored when trying to write to the signal wakeup fd'
                not in err):
                raise AssertionError(err)
            if ('OSError: [Errno %d]' % errno.EBADF) not in err:
                raise AssertionError(err)
        else:
            raise AssertionError("ZeroDivisionError not raised")

        os.close(r)
        os.close(w)
        """
        # The scenario relies on write() failing on the read end of a pipe;
        # skip the test when this platform lets such a write succeed.
        r, w = os.pipe()
        try:
            os.write(r, b'x')
        except OSError:
            pass
        else:
            self.skipTest("OS doesn't report write() error on the read end of a pipe")
        finally:
            os.close(r)
            os.close(w)

        assert_python_ok('-c', code)

    def test_wakeup_fd_early(self):
        self.check_wakeup("""def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)

            # We attempt to get a signal during the sleep,
            # before select is called
            try:
                select.select([], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")

            before_time = time.monotonic()
            select.select([read], [], [], TIMEOUT_FULL)
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        """, signal.SIGALRM)

    def test_wakeup_fd_during(self):
        self.check_wakeup("""def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)
            before_time = time.monotonic()
            # We attempt to get a signal during the select call
            try:
                select.select([read], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        """, signal.SIGALRM)

    def test_signum(self):
        self.check_wakeup("""def test():
            import _testcapi
            signal.signal(signal.SIGUSR1, handler)
            _testcapi.raise_signal(signal.SIGUSR1)
            _testcapi.raise_signal(signal.SIGALRM)
        """, signal.SIGUSR1, signal.SIGALRM)

    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    def test_pending(self):
        self.check_wakeup("""def test():
            signum1 = signal.SIGUSR1
            signum2 = signal.SIGUSR2

            signal.signal(signum1, handler)
            signal.signal(signum2, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
            _testcapi.raise_signal(signum1)
            _testcapi.raise_signal(signum2)
            # Unblocking the 2 signals calls the C signal handler twice
            signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
        """, signal.SIGUSR1, signal.SIGUSR2, ordered=False)


@unittest.skipUnless(hasattr(socket, 'socketpair'), 'need socket.socketpair')
class WakeupSocketSignalTests(unittest.TestCase):
    """set_wakeup_fd() with one end of a socket pair as the wakeup fd."""

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_socket(self):
        # use a subprocess to have only one thread
        code = """if 1:
        import signal
        import socket
        import struct
        import _testcapi

        signum = signal.SIGINT
        signals = (signum,)

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        read.setblocking(False)
        write.setblocking(False)
        signal.set_wakeup_fd(write.fileno())

        _testcapi.raise_signal(signum)

        data = read.recv(1)
        if not data:
            raise Exception("no signum written")
        raised = struct.unpack('B', data)
        if raised != signals:
            raise Exception("%r != %r" % (raised, signals))

        read.close()
        write.close()
        """

        assert_python_ok('-c', code)

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_send_error(self):
        # Use a subprocess to have only one thread.
        # The expected warning names the failing operation, which differs
        # between Windows ('send') and other platforms ('write').
        if os.name == 'nt':
            action = 'send'
        else:
            action = 'write'
        code = """if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        read.setblocking(False)
        write.setblocking(False)

        signal.set_wakeup_fd(write.fileno())

        # Close sockets: send() will fail
        read.close()
        write.close()

        with captured_stderr() as err:
            _testcapi.raise_signal(signum)

        err = err.getvalue()
        if ('Exception ignored when trying to {action} to the signal wakeup fd'
            not in err):
            raise AssertionError(err)
        """.format(action=action)
        assert_python_ok('-c', code)


@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class SiginterruptTest(unittest.TestCase):
    """Effect of signal.siginterrupt() on a blocking os.read()."""

    def readpipe_interrupted(self, interrupt):
        """Perform a read during which a signal will arrive.  Return True if the
        read is interrupted by the signal and raises an exception.  Return False
        if it returns normally.

        interrupt: value passed to signal.siginterrupt() in the child, or
            None to leave siginterrupt() uncalled.  False is also returned
            when the child does not finish within the 5 second timeout.
        """
        # use a subprocess to have only one thread, to have a timeout on the
        # blocking read and to not touch signal handling in this process
        code = """if 1:
            import errno
            import os
            import signal
            import sys

            interrupt = %r
            r, w = os.pipe()

            def handler(signum, frame):
                1 / 0

            signal.signal(signal.SIGALRM, handler)
            if interrupt is not None:
                signal.siginterrupt(signal.SIGALRM, interrupt)

            print("ready")
            sys.stdout.flush()

            # run the test twice
            try:
                for loop in range(2):
                    # send a SIGALRM in a second (during the read)
                    signal.alarm(1)
                    try:
                        # blocking call: read from a pipe without data
                        os.read(r, 1)
                    except ZeroDivisionError:
                        pass
                    else:
                        sys.exit(2)
                sys.exit(3)
            finally:
                os.close(r)
                os.close(w)
        """ % (interrupt,)
        with spawn_python('-c', code) as process:
            try:
                # wait until the child process is loaded and has started
                first_line = process.stdout.readline()

                stdout, stderr = process.communicate(timeout=5.0)
            except subprocess.TimeoutExpired:
                # The read was never interrupted: the child is still blocked.
                process.kill()
                return False
            else:
                stdout = first_line + stdout
                exitcode = process.wait()
                # 3: both reads were interrupted; 2: a read returned normally
                if exitcode not in (2, 3):
                    raise Exception("Child error (exit code %s): %r"
                                    % (exitcode, stdout))
                return (exitcode == 3)

    def test_without_siginterrupt(self):
        # If a signal handler is installed and siginterrupt is not called
        # at all, when that signal arrives, it interrupts a syscall that's in
        # progress.
        interrupted = self.readpipe_interrupted(None)
        self.assertTrue(interrupted)

    def test_siginterrupt_on(self):
        # If a signal handler is installed and siginterrupt is called with
        # a true value for the second argument, when that signal arrives, it
        # interrupts a syscall that's in progress.
        interrupted = self.readpipe_interrupted(True)
        self.assertTrue(interrupted)

    def test_siginterrupt_off(self):
        # If a signal handler is installed and siginterrupt is called with
        # a false value for the second argument, when that signal arrives, it
        # does not interrupt a syscall that's in progress.
        interrupted = self.readpipe_interrupted(False)
        self.assertFalse(interrupted)


@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class ItimerTest(unittest.TestCase):
    """signal.setitimer() / signal.getitimer() with the three itimers."""

    def setUp(self):
        self.hndl_called = False
        self.hndl_count = 0
        self.itimer = None
        self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)

    def tearDown(self):
        # Stop the timer *before* putting the previous SIGALRM handler back,
        # so that a still-armed timer cannot deliver its signal to the
        # restored handler.  The handler is restored even if stopping fails.
        try:
            if self.itimer is not None: # test_itimer_exc doesn't change this attr
                # just ensure that itimer is stopped
                signal.setitimer(self.itimer, 0)
        finally:
            signal.signal(signal.SIGALRM, self.old_alarm)

    def _install_handler(self, signum, handler):
        """Install handler for signum and schedule its removal.

        The registered cleanup first stops self.itimer (when set) and only
        then re-installs the handler that was active before, so the
        test-bound handler does not stay installed after the test ends.
        """
        previous = signal.signal(signum, handler)

        def restore():
            if self.itimer is not None:
                signal.setitimer(self.itimer, 0)
            # signal.signal() rejects None, so a None previous handler
            # cannot be re-installed.
            if previous is not None:
                signal.signal(signum, previous)

        self.addCleanup(restore)

    def sig_alrm(self, *args):
        self.hndl_called = True

    def sig_vtalrm(self, *args):
        self.hndl_called = True

        if self.hndl_count > 3:
            # it shouldn't be here, because it should have been disabled.
            raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
                "timer.")
        elif self.hndl_count == 3:
            # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
            signal.setitimer(signal.ITIMER_VIRTUAL, 0)

        self.hndl_count += 1

    def sig_prof(self, *args):
        self.hndl_called = True
        signal.setitimer(signal.ITIMER_PROF, 0)

    def test_itimer_exc(self):
        # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
        # defines it ?
        self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
        # Negative times are treated as zero on some platforms, so
        # setitimer(ITIMER_REAL, -1) is deliberately not asserted to raise
        # ItimerError.

    def test_itimer_real(self):
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1.0)
        signal.pause()
        self.assertEqual(self.hndl_called, True)

    # Issue 3864, unknown if this affects earlier versions of freebsd also
    @unittest.skipIf(sys.platform in ('freebsd6', 'netbsd5'),
        'itimer not reliable (does not mix well with threading) on some BSDs.')
    def test_itimer_virtual(self):
        self.itimer = signal.ITIMER_VIRTUAL
        self._install_handler(signal.SIGVTALRM, self.sig_vtalrm)
        signal.setitimer(self.itimer, 0.3, 0.2)

        start_time = time.monotonic()
        while time.monotonic() - start_time < 60.0:
            # use up some virtual time by doing real work
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                break # sig_vtalrm handler stopped this itimer
        else: # Issue 8424
            self.skipTest("timeout: likely cause: machine too slow or load too "
                          "high")

        # virtual itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertEqual(self.hndl_called, True)

    # Issue 3864, unknown if this affects earlier versions of freebsd also
    @unittest.skipIf(sys.platform=='freebsd6',
        'itimer not reliable (does not mix well with threading) on freebsd6')
    def test_itimer_prof(self):
        self.itimer = signal.ITIMER_PROF
        self._install_handler(signal.SIGPROF, self.sig_prof)
        signal.setitimer(self.itimer, 0.2, 0.2)

        start_time = time.monotonic()
        while time.monotonic() - start_time < 60.0:
            # do some work
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                break # sig_prof handler stopped this itimer
        else: # Issue 8424
            self.skipTest("timeout: likely cause: machine too slow or load too "
                          "high")

        # profiling itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertEqual(self.hndl_called, True)


class PendingSignalsTests(unittest.TestCase):
    """
    Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait()
    functions.
    """
    @unittest.skipUnless(hasattr(signal, 'sigpending'),
                         'need signal.sigpending()')
    def test_sigpending_empty(self):
        self.assertEqual(signal.sigpending(), set())

    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    @unittest.skipUnless(hasattr(signal, 'sigpending'),
                         'need signal.sigpending()')
    def test_sigpending(self):
        code = """if 1:
            import os
            import signal

            def handler(signum, frame):
                1/0

            signum = signal.SIGUSR1
            signal.signal(signum, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            os.kill(os.getpid(), signum)
            pending = signal.sigpending()
            for sig in pending:
                assert isinstance(sig, signal.Signals), repr(pending)
            if pending != {signum}:
                raise Exception('%s != {%s}' % (pending, signum))
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        """
        assert_python_ok('-c', code)

    @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
                         'need signal.pthread_kill()')
    def test_pthread_kill(self):
        code = """if 1:
            import signal
            import threading
            import sys

            signum = signal.SIGUSR1

            def handler(signum, frame):
                1/0

            signal.signal(signum, handler)

            if sys.platform == 'freebsd6':
                # Issue #12392 and #12469: send a signal to the main thread
                # doesn't work before the creation of the first thread on
                # FreeBSD 6
                def noop():
                    pass
                thread = threading.Thread(target=noop)
                thread.start()
                thread.join()

            tid = threading.get_ident()
            try:
                signal.pthread_kill(tid, signum)
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        """
        assert_python_ok('-c', code)

    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    def wait_helper(self, blocked, test):
        """
        test: body of the "def test(signum):" function.
        blocked: number of the blocked signal
        """
        code = '''if 1:
        import signal
        import sys
        from signal import Signals

        def handler(signum, frame):
            1/0

        %s

        blocked = %s
        signum = signal.SIGALRM

        # child: block and wait the signal
        try:
            signal.signal(signum, handler)
            signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])

            # Do the tests
            test(signum)

            # The handler must not be called on unblock
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
            except ZeroDivisionError:
                print("the signal handler has been called",
                      file=sys.stderr)
                sys.exit(1)
        except BaseException as err:
            print("error: {}".format(err), file=sys.stderr)
            sys.stderr.flush()
            sys.exit(1)
        ''' % (test.strip(), blocked)

        # sig*wait* must be called with the signal blocked: since the current
        # process might have several threads running, use a subprocess to have
        # a single thread.
        assert_python_ok('-c', code)

    @unittest.skipUnless(hasattr(signal, 'sigwait'),
                         'need signal.sigwait()')
    def test_sigwait(self):
        self.wait_helper(signal.SIGALRM, '''
        def test(signum):
            signal.alarm(1)
            received = signal.sigwait([signum])
            assert isinstance(received, signal.Signals), received
            if received != signum:
                raise Exception('received %s, not %s' % (received, signum))
        ''')

    @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
                         'need signal.sigwaitinfo()')
    def test_sigwaitinfo(self):
        self.wait_helper(signal.SIGALRM, '''
        def test(signum):
            signal.alarm(1)
            info = signal.sigwaitinfo([signum])
            if info.si_signo != signum:
                raise Exception("info.si_signo != %s" % signum)
        ''')

    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                         'need signal.sigtimedwait()')
    def test_sigtimedwait(self):
        self.wait_helper(signal.SIGALRM, '''
        def test(signum):
            signal.alarm(1)
            info = signal.sigtimedwait([signum], 10.1000)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        ''')

    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                         'need signal.sigtimedwait()')
    def test_sigtimedwait_poll(self):
        # check that polling with sigtimedwait works
        self.wait_helper(signal.SIGALRM, '''
        def test(signum):
            import os
            os.kill(os.getpid(), signum)
            info = signal.sigtimedwait([signum], 0)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        ''')

    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                         'need signal.sigtimedwait()')
    def test_sigtimedwait_timeout(self):
        # No signal is sent, so the call must time out and return None.
        self.wait_helper(signal.SIGALRM, '''
        def test(signum):
            received = signal.sigtimedwait([signum], 1.0)
            if received is not None:
                raise Exception("received=%r" % (received,))
        ''')

    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                         'need signal.sigtimedwait()')
    def test_sigtimedwait_negative_timeout(self):
        signum = signal.SIGALRM
        self.assertRaises(ValueError, signal.sigtimedwait, [signum], -1.0)

    @unittest.skipUnless(hasattr(signal, 'sigwait'),
                         'need signal.sigwait()')
    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    @unittest.skipIf(threading is None, "test needs threading module")
    def test_sigwait_thread(self):
        # Check that calling sigwait() from a thread doesn't suspend the whole
        # process. A new interpreter is spawned to avoid problems when mixing
        # threads and fork(): only async-safe functions are allowed between
        # fork() and exec().
        assert_python_ok("-c", """if True:
            import os, threading, sys, time, signal

            # the default handler terminates the process
            signum = signal.SIGUSR1

            def kill_later():
                # wait until the main thread is waiting in sigwait()
                time.sleep(1)
                os.kill(os.getpid(), signum)

            # the signal must be blocked by all the threads
            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            killer = threading.Thread(target=kill_later)
            killer.start()
            received = signal.sigwait([signum])
            if received != signum:
                print("sigwait() received %s, not %s" % (received, signum),
                      file=sys.stderr)
                sys.exit(1)
            killer.join()
            # unblock the signal, which should have been cleared by sigwait()
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        """)

    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    def test_pthread_sigmask_arguments(self):
        self.assertRaises(TypeError, signal.pthread_sigmask)
        self.assertRaises(TypeError, signal.pthread_sigmask, 1)
        self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3)
        self.assertRaises(OSError, signal.pthread_sigmask, 1700, [])

    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    def test_pthread_sigmask(self):
        code = """if 1:
        import signal
        import os; import threading

        def handler(signum, frame):
            1/0

        def kill(signum):
            os.kill(os.getpid(), signum)

        def check_mask(mask):
            for sig in mask:
                assert isinstance(sig, signal.Signals), repr(sig)

        def read_sigmask():
            sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, [])
            check_mask(sigmask)
            return sigmask

        signum = signal.SIGUSR1

        # Install our signal handler
        old_handler = signal.signal(signum, handler)

        # Unblock SIGUSR1 (and copy the old mask) to test our signal handler
        old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        check_mask(old_mask)
        try:
            kill(signum)
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")

        # Block and then raise SIGUSR1. The signal is blocked: the signal
        # handler is not called, and the signal is now pending
        mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
        check_mask(mask)
        kill(signum)

        # Check the new mask
        blocked = read_sigmask()
        check_mask(blocked)
        if signum not in blocked:
            raise Exception("%s not in %s" % (signum, blocked))
        if old_mask ^ blocked != {signum}:
            raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum))

        # Unblock SIGUSR1
        try:
            # unblock the pending signal calls immediately the signal handler
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")
        try:
            kill(signum)
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")

        # Check the new mask
        unblocked = read_sigmask()
        if signum in unblocked:
            raise Exception("%s in %s" % (signum, unblocked))
        if blocked ^ unblocked != {signum}:
            raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum))
        if old_mask != unblocked:
            raise Exception("%s != %s" % (old_mask, unblocked))
        """
        assert_python_ok('-c', code)

    @unittest.skipIf(sys.platform == 'freebsd6',
        "issue #12392: send a signal to the main thread doesn't work "
        "before the creation of the first thread on FreeBSD 6")
    @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
                         'need signal.pthread_kill()')
    def test_pthread_kill_main_thread(self):
        # Test that a signal can be sent to the main thread with pthread_kill()
        # before any other thread has been created (see issue #12392).
        code = """if True:
            import threading
            import signal
            import sys

            def handler(signum, frame):
                sys.exit(3)

            signal.signal(signal.SIGUSR1, handler)
            signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
            sys.exit(2)
        """

        with spawn_python('-c', code) as process:
            stdout, stderr = process.communicate()
            exitcode = process.wait()
            # 3 means the handler ran; 2 means the signal was never handled
            if exitcode != 3:
                raise Exception("Child error (exit code %s): %s" %
                                (exitcode, stdout))


def tearDownModule():
    # Collect any child process left behind by the tests of this module.
    support.reap_children()

if __name__ == "__main__":
    unittest.main()