1 from contextlib import contextmanager 2 import datetime 3 import faulthandler 4 import os 5 import re 6 import signal 7 import subprocess 8 import sys 9 from test import support 10 from test.support import script_helper, is_android, requires_android_level 11 import tempfile 12 import unittest 13 from textwrap import dedent 14 15 try: 16 import threading 17 HAVE_THREADS = True 18 except ImportError: 19 HAVE_THREADS = False 20 try: 21 import _testcapi 22 except ImportError: 23 _testcapi = None 24 25 TIMEOUT = 0.5 26 MS_WINDOWS = (os.name == 'nt') 27 28 def expected_traceback(lineno1, lineno2, header, min_count=1): 29 regex = header 30 regex += ' File "<string>", line %s in func\n' % lineno1 31 regex += ' File "<string>", line %s in <module>' % lineno2 32 if 1 < min_count: 33 return '^' + (regex + '\n') * (min_count - 1) + regex 34 else: 35 return '^' + regex + '$' 36 37 @contextmanager 38 def temporary_filename(): 39 filename = tempfile.mktemp() 40 try: 41 yield filename 42 finally: 43 support.unlink(filename) 44 45 def requires_raise(test): 46 return (test if not is_android else 47 requires_android_level(24, 'raise() is buggy')(test)) 48 49 class FaultHandlerTests(unittest.TestCase): 50 def get_output(self, code, filename=None, fd=None): 51 """ 52 Run the specified code in Python (in a new child process) and read the 53 output from the standard error or from a file (if filename is set). 54 Return the output lines as a list. 55 56 Strip the reference count from the standard error for Python debug 57 build, and replace "Current thread 0x00007f8d8fbd9700" by "Current 58 thread XXX". 59 """ 60 code = dedent(code).strip() 61 pass_fds = [] 62 if fd is not None: 63 pass_fds.append(fd) 64 with support.SuppressCrashReport(): 65 process = script_helper.spawn_python('-c', code, pass_fds=pass_fds) 66 with process: 67 stdout, stderr = process.communicate() 68 exitcode = process.wait() 69 output = support.strip_python_stderr(stdout) 70 output = output.decode('ascii', 'backslashreplace') 71 if filename: 72 self.assertEqual(output, '') 73 with open(filename, "rb") as fp: 74 output = fp.read() 75 output = output.decode('ascii', 'backslashreplace') 76 elif fd is not None: 77 self.assertEqual(output, '') 78 os.lseek(fd, os.SEEK_SET, 0) 79 with open(fd, "rb", closefd=False) as fp: 80 output = fp.read() 81 output = output.decode('ascii', 'backslashreplace') 82 return output.splitlines(), exitcode 83 84 def check_error(self, code, line_number, fatal_error, *, 85 filename=None, all_threads=True, other_regex=None, 86 fd=None, know_current_thread=True): 87 """ 88 Check that the fault handler for fatal errors is enabled and check the 89 traceback from the child process output. 90 91 Raise an error if the output doesn't match the expected format. 92 """ 93 if all_threads: 94 if know_current_thread: 95 header = 'Current thread 0x[0-9a-f]+' 96 else: 97 header = 'Thread 0x[0-9a-f]+' 98 else: 99 header = 'Stack' 100 regex = r""" 101 ^{fatal_error} 102 103 {header} \(most recent call first\): 104 File "<string>", line {lineno} in <module> 105 """ 106 regex = dedent(regex.format( 107 lineno=line_number, 108 fatal_error=fatal_error, 109 header=header)).strip() 110 if other_regex: 111 regex += '|' + other_regex 112 output, exitcode = self.get_output(code, filename=filename, fd=fd) 113 output = '\n'.join(output) 114 self.assertRegex(output, regex) 115 self.assertNotEqual(exitcode, 0) 116 117 def check_fatal_error(self, code, line_number, name_regex, **kw): 118 fatal_error = 'Fatal Python error: %s' % name_regex 119 self.check_error(code, line_number, fatal_error, **kw) 120 121 def check_windows_exception(self, code, line_number, name_regex, **kw): 122 fatal_error = 'Windows fatal exception: %s' % name_regex 123 self.check_error(code, line_number, fatal_error, **kw) 124 125 @unittest.skipIf(sys.platform.startswith('aix'), 126 "the first page of memory is a mapped read-only on AIX") 127 def test_read_null(self): 128 if not MS_WINDOWS: 129 self.check_fatal_error(""" 130 import faulthandler 131 faulthandler.enable() 132 faulthandler._read_null() 133 """, 134 3, 135 # Issue #12700: Read NULL raises SIGILL on Mac OS X Lion 136 '(?:Segmentation fault' 137 '|Bus error' 138 '|Illegal instruction)') 139 else: 140 self.check_windows_exception(""" 141 import faulthandler 142 faulthandler.enable() 143 faulthandler._read_null() 144 """, 145 3, 146 'access violation') 147 148 @requires_raise 149 def test_sigsegv(self): 150 self.check_fatal_error(""" 151 import faulthandler 152 faulthandler.enable() 153 faulthandler._sigsegv() 154 """, 155 3, 156 'Segmentation fault') 157 158 @unittest.skipIf(not HAVE_THREADS, 'need threads') 159 def test_fatal_error_c_thread(self): 160 self.check_fatal_error(""" 161 import faulthandler 162 faulthandler.enable() 163 faulthandler._fatal_error_c_thread() 164 """, 165 3, 166 'in new thread', 167 know_current_thread=False) 168 169 def test_sigabrt(self): 170 self.check_fatal_error(""" 171 import faulthandler 172 faulthandler.enable() 173 faulthandler._sigabrt() 174 """, 175 3, 176 'Aborted') 177 178 @unittest.skipIf(sys.platform == 'win32', 179 "SIGFPE cannot be caught on Windows") 180 def test_sigfpe(self): 181 self.check_fatal_error(""" 182 import faulthandler 183 faulthandler.enable() 184 faulthandler._sigfpe() 185 """, 186 3, 187 'Floating point exception') 188 189 @unittest.skipIf(_testcapi is None, 'need _testcapi') 190 @unittest.skipUnless(hasattr(signal, 'SIGBUS'), 'need signal.SIGBUS') 191 @requires_raise 192 def test_sigbus(self): 193 self.check_fatal_error(""" 194 import _testcapi 195 import faulthandler 196 import signal 197 198 faulthandler.enable() 199 _testcapi.raise_signal(signal.SIGBUS) 200 """, 201 6, 202 'Bus error') 203 204 @unittest.skipIf(_testcapi is None, 'need _testcapi') 205 @unittest.skipUnless(hasattr(signal, 'SIGILL'), 'need signal.SIGILL') 206 @requires_raise 207 def test_sigill(self): 208 self.check_fatal_error(""" 209 import _testcapi 210 import faulthandler 211 import signal 212 213 faulthandler.enable() 214 _testcapi.raise_signal(signal.SIGILL) 215 """, 216 6, 217 'Illegal instruction') 218 219 def test_fatal_error(self): 220 self.check_fatal_error(""" 221 import faulthandler 222 faulthandler._fatal_error(b'xyz') 223 """, 224 2, 225 'xyz') 226 227 def test_fatal_error_without_gil(self): 228 self.check_fatal_error(""" 229 import faulthandler 230 faulthandler._fatal_error(b'xyz', True) 231 """, 232 2, 233 'xyz') 234 235 @unittest.skipIf(sys.platform.startswith('openbsd') and HAVE_THREADS, 236 "Issue #12868: sigaltstack() doesn't work on " 237 "OpenBSD if Python is compiled with pthread") 238 @unittest.skipIf(not hasattr(faulthandler, '_stack_overflow'), 239 'need faulthandler._stack_overflow()') 240 def test_stack_overflow(self): 241 self.check_fatal_error(""" 242 import faulthandler 243 faulthandler.enable() 244 faulthandler._stack_overflow() 245 """, 246 3, 247 '(?:Segmentation fault|Bus error)', 248 other_regex='unable to raise a stack overflow') 249 250 @requires_raise 251 def test_gil_released(self): 252 self.check_fatal_error(""" 253 import faulthandler 254 faulthandler.enable() 255 faulthandler._sigsegv(True) 256 """, 257 3, 258 'Segmentation fault') 259 260 @requires_raise 261 def test_enable_file(self): 262 with temporary_filename() as filename: 263 self.check_fatal_error(""" 264 import faulthandler 265 output = open({filename}, 'wb') 266 faulthandler.enable(output) 267 faulthandler._sigsegv() 268 """.format(filename=repr(filename)), 269 4, 270 'Segmentation fault', 271 filename=filename) 272 273 @unittest.skipIf(sys.platform == "win32", 274 "subprocess doesn't support pass_fds on Windows") 275 @requires_raise 276 def test_enable_fd(self): 277 with tempfile.TemporaryFile('wb+') as fp: 278 fd = fp.fileno() 279 self.check_fatal_error(""" 280 import faulthandler 281 import sys 282 faulthandler.enable(%s) 283 faulthandler._sigsegv() 284 """ % fd, 285 4, 286 'Segmentation fault', 287 fd=fd) 288 289 @requires_raise 290 def test_enable_single_thread(self): 291 self.check_fatal_error(""" 292 import faulthandler 293 faulthandler.enable(all_threads=False) 294 faulthandler._sigsegv() 295 """, 296 3, 297 'Segmentation fault', 298 all_threads=False) 299 300 @requires_raise 301 def test_disable(self): 302 code = """ 303 import faulthandler 304 faulthandler.enable() 305 faulthandler.disable() 306 faulthandler._sigsegv() 307 """ 308 not_expected = 'Fatal Python error' 309 stderr, exitcode = self.get_output(code) 310 stderr = '\n'.join(stderr) 311 self.assertTrue(not_expected not in stderr, 312 "%r is present in %r" % (not_expected, stderr)) 313 self.assertNotEqual(exitcode, 0) 314 315 def test_is_enabled(self): 316 orig_stderr = sys.stderr 317 try: 318 # regrtest may replace sys.stderr by io.StringIO object, but 319 # faulthandler.enable() requires that sys.stderr has a fileno() 320 # method 321 sys.stderr = sys.__stderr__ 322 323 was_enabled = faulthandler.is_enabled() 324 try: 325 faulthandler.enable() 326 self.assertTrue(faulthandler.is_enabled()) 327 faulthandler.disable() 328 self.assertFalse(faulthandler.is_enabled()) 329 finally: 330 if was_enabled: 331 faulthandler.enable() 332 else: 333 faulthandler.disable() 334 finally: 335 sys.stderr = orig_stderr 336 337 def test_disabled_by_default(self): 338 # By default, the module should be disabled 339 code = "import faulthandler; print(faulthandler.is_enabled())" 340 args = filter(None, (sys.executable, 341 "-E" if sys.flags.ignore_environment else "", 342 "-c", code)) 343 env = os.environ.copy() 344 env.pop("PYTHONFAULTHANDLER", None) 345 # don't use assert_python_ok() because it always enables faulthandler 346 output = subprocess.check_output(args, env=env) 347 self.assertEqual(output.rstrip(), b"False") 348 349 def test_sys_xoptions(self): 350 # Test python -X faulthandler 351 code = "import faulthandler; print(faulthandler.is_enabled())" 352 args = filter(None, (sys.executable, 353 "-E" if sys.flags.ignore_environment else "", 354 "-X", "faulthandler", "-c", code)) 355 env = os.environ.copy() 356 env.pop("PYTHONFAULTHANDLER", None) 357 # don't use assert_python_ok() because it always enables faulthandler 358 output = subprocess.check_output(args, env=env) 359 self.assertEqual(output.rstrip(), b"True") 360 361 def test_env_var(self): 362 # empty env var 363 code = "import faulthandler; print(faulthandler.is_enabled())" 364 args = (sys.executable, "-c", code) 365 env = os.environ.copy() 366 env['PYTHONFAULTHANDLER'] = '' 367 # don't use assert_python_ok() because it always enables faulthandler 368 output = subprocess.check_output(args, env=env) 369 self.assertEqual(output.rstrip(), b"False") 370 371 # non-empty env var 372 env = os.environ.copy() 373 env['PYTHONFAULTHANDLER'] = '1' 374 output = subprocess.check_output(args, env=env) 375 self.assertEqual(output.rstrip(), b"True") 376 377 def check_dump_traceback(self, *, filename=None, fd=None): 378 """ 379 Explicitly call dump_traceback() function and check its output. 380 Raise an error if the output doesn't match the expected format. 381 """ 382 code = """ 383 import faulthandler 384 385 filename = {filename!r} 386 fd = {fd} 387 388 def funcB(): 389 if filename: 390 with open(filename, "wb") as fp: 391 faulthandler.dump_traceback(fp, all_threads=False) 392 elif fd is not None: 393 faulthandler.dump_traceback(fd, 394 all_threads=False) 395 else: 396 faulthandler.dump_traceback(all_threads=False) 397 398 def funcA(): 399 funcB() 400 401 funcA() 402 """ 403 code = code.format( 404 filename=filename, 405 fd=fd, 406 ) 407 if filename: 408 lineno = 9 409 elif fd is not None: 410 lineno = 12 411 else: 412 lineno = 14 413 expected = [ 414 'Stack (most recent call first):', 415 ' File "<string>", line %s in funcB' % lineno, 416 ' File "<string>", line 17 in funcA', 417 ' File "<string>", line 19 in <module>' 418 ] 419 trace, exitcode = self.get_output(code, filename, fd) 420 self.assertEqual(trace, expected) 421 self.assertEqual(exitcode, 0) 422 423 def test_dump_traceback(self): 424 self.check_dump_traceback() 425 426 def test_dump_traceback_file(self): 427 with temporary_filename() as filename: 428 self.check_dump_traceback(filename=filename) 429 430 @unittest.skipIf(sys.platform == "win32", 431 "subprocess doesn't support pass_fds on Windows") 432 def test_dump_traceback_fd(self): 433 with tempfile.TemporaryFile('wb+') as fp: 434 self.check_dump_traceback(fd=fp.fileno()) 435 436 def test_truncate(self): 437 maxlen = 500 438 func_name = 'x' * (maxlen + 50) 439 truncated = 'x' * maxlen + '...' 440 code = """ 441 import faulthandler 442 443 def {func_name}(): 444 faulthandler.dump_traceback(all_threads=False) 445 446 {func_name}() 447 """ 448 code = code.format( 449 func_name=func_name, 450 ) 451 expected = [ 452 'Stack (most recent call first):', 453 ' File "<string>", line 4 in %s' % truncated, 454 ' File "<string>", line 6 in <module>' 455 ] 456 trace, exitcode = self.get_output(code) 457 self.assertEqual(trace, expected) 458 self.assertEqual(exitcode, 0) 459 460 @unittest.skipIf(not HAVE_THREADS, 'need threads') 461 def check_dump_traceback_threads(self, filename): 462 """ 463 Call explicitly dump_traceback(all_threads=True) and check the output. 464 Raise an error if the output doesn't match the expected format. 465 """ 466 code = """ 467 import faulthandler 468 from threading import Thread, Event 469 import time 470 471 def dump(): 472 if {filename}: 473 with open({filename}, "wb") as fp: 474 faulthandler.dump_traceback(fp, all_threads=True) 475 else: 476 faulthandler.dump_traceback(all_threads=True) 477 478 class Waiter(Thread): 479 # avoid blocking if the main thread raises an exception. 480 daemon = True 481 482 def __init__(self): 483 Thread.__init__(self) 484 self.running = Event() 485 self.stop = Event() 486 487 def run(self): 488 self.running.set() 489 self.stop.wait() 490 491 waiter = Waiter() 492 waiter.start() 493 waiter.running.wait() 494 dump() 495 waiter.stop.set() 496 waiter.join() 497 """ 498 code = code.format(filename=repr(filename)) 499 output, exitcode = self.get_output(code, filename) 500 output = '\n'.join(output) 501 if filename: 502 lineno = 8 503 else: 504 lineno = 10 505 regex = r""" 506 ^Thread 0x[0-9a-f]+ \(most recent call first\): 507 (?: File ".*threading.py", line [0-9]+ in [_a-z]+ 508 ){{1,3}} File "<string>", line 23 in run 509 File ".*threading.py", line [0-9]+ in _bootstrap_inner 510 File ".*threading.py", line [0-9]+ in _bootstrap 511 512 Current thread 0x[0-9a-f]+ \(most recent call first\): 513 File "<string>", line {lineno} in dump 514 File "<string>", line 28 in <module>$ 515 """ 516 regex = dedent(regex.format(lineno=lineno)).strip() 517 self.assertRegex(output, regex) 518 self.assertEqual(exitcode, 0) 519 520 def test_dump_traceback_threads(self): 521 self.check_dump_traceback_threads(None) 522 523 def test_dump_traceback_threads_file(self): 524 with temporary_filename() as filename: 525 self.check_dump_traceback_threads(filename) 526 527 @unittest.skipIf(not hasattr(faulthandler, 'dump_traceback_later'), 528 'need faulthandler.dump_traceback_later()') 529 def check_dump_traceback_later(self, repeat=False, cancel=False, loops=1, 530 *, filename=None, fd=None): 531 """ 532 Check how many times the traceback is written in timeout x 2.5 seconds, 533 or timeout x 3.5 seconds if cancel is True: 1, 2 or 3 times depending 534 on repeat and cancel options. 535 536 Raise an error if the output doesn't match the expect format. 537 """ 538 timeout_str = str(datetime.timedelta(seconds=TIMEOUT)) 539 code = """ 540 import faulthandler 541 import time 542 import sys 543 544 timeout = {timeout} 545 repeat = {repeat} 546 cancel = {cancel} 547 loops = {loops} 548 filename = {filename!r} 549 fd = {fd} 550 551 def func(timeout, repeat, cancel, file, loops): 552 for loop in range(loops): 553 faulthandler.dump_traceback_later(timeout, repeat=repeat, file=file) 554 if cancel: 555 faulthandler.cancel_dump_traceback_later() 556 time.sleep(timeout * 5) 557 faulthandler.cancel_dump_traceback_later() 558 559 if filename: 560 file = open(filename, "wb") 561 elif fd is not None: 562 file = sys.stderr.fileno() 563 else: 564 file = None 565 func(timeout, repeat, cancel, file, loops) 566 if filename: 567 file.close() 568 """ 569 code = code.format( 570 timeout=TIMEOUT, 571 repeat=repeat, 572 cancel=cancel, 573 loops=loops, 574 filename=filename, 575 fd=fd, 576 ) 577 trace, exitcode = self.get_output(code, filename) 578 trace = '\n'.join(trace) 579 580 if not cancel: 581 count = loops 582 if repeat: 583 count *= 2 584 header = r'Timeout \(%s\)!\nThread 0x[0-9a-f]+ \(most recent call first\):\n' % timeout_str 585 regex = expected_traceback(17, 26, header, min_count=count) 586 self.assertRegex(trace, regex) 587 else: 588 self.assertEqual(trace, '') 589 self.assertEqual(exitcode, 0) 590 591 def test_dump_traceback_later(self): 592 self.check_dump_traceback_later() 593 594 def test_dump_traceback_later_repeat(self): 595 self.check_dump_traceback_later(repeat=True) 596 597 def test_dump_traceback_later_cancel(self): 598 self.check_dump_traceback_later(cancel=True) 599 600 def test_dump_traceback_later_file(self): 601 with temporary_filename() as filename: 602 self.check_dump_traceback_later(filename=filename) 603 604 @unittest.skipIf(sys.platform == "win32", 605 "subprocess doesn't support pass_fds on Windows") 606 def test_dump_traceback_later_fd(self): 607 with tempfile.TemporaryFile('wb+') as fp: 608 self.check_dump_traceback_later(fd=fp.fileno()) 609 610 def test_dump_traceback_later_twice(self): 611 self.check_dump_traceback_later(loops=2) 612 613 @unittest.skipIf(not hasattr(faulthandler, "register"), 614 "need faulthandler.register") 615 def check_register(self, filename=False, all_threads=False, 616 unregister=False, chain=False, fd=None): 617 """ 618 Register a handler displaying the traceback on a user signal. Raise the 619 signal and check the written traceback. 620 621 If chain is True, check that the previous signal handler is called. 622 623 Raise an error if the output doesn't match the expected format. 624 """ 625 signum = signal.SIGUSR1 626 code = """ 627 import faulthandler 628 import os 629 import signal 630 import sys 631 632 all_threads = {all_threads} 633 signum = {signum} 634 unregister = {unregister} 635 chain = {chain} 636 filename = {filename!r} 637 fd = {fd} 638 639 def func(signum): 640 os.kill(os.getpid(), signum) 641 642 def handler(signum, frame): 643 handler.called = True 644 handler.called = False 645 646 if filename: 647 file = open(filename, "wb") 648 elif fd is not None: 649 file = sys.stderr.fileno() 650 else: 651 file = None 652 if chain: 653 signal.signal(signum, handler) 654 faulthandler.register(signum, file=file, 655 all_threads=all_threads, chain={chain}) 656 if unregister: 657 faulthandler.unregister(signum) 658 func(signum) 659 if chain and not handler.called: 660 if file is not None: 661 output = file 662 else: 663 output = sys.stderr 664 print("Error: signal handler not called!", file=output) 665 exitcode = 1 666 else: 667 exitcode = 0 668 if filename: 669 file.close() 670 sys.exit(exitcode) 671 """ 672 code = code.format( 673 all_threads=all_threads, 674 signum=signum, 675 unregister=unregister, 676 chain=chain, 677 filename=filename, 678 fd=fd, 679 ) 680 trace, exitcode = self.get_output(code, filename) 681 trace = '\n'.join(trace) 682 if not unregister: 683 if all_threads: 684 regex = r'Current thread 0x[0-9a-f]+ \(most recent call first\):\n' 685 else: 686 regex = r'Stack \(most recent call first\):\n' 687 regex = expected_traceback(14, 32, regex) 688 self.assertRegex(trace, regex) 689 else: 690 self.assertEqual(trace, '') 691 if unregister: 692 self.assertNotEqual(exitcode, 0) 693 else: 694 self.assertEqual(exitcode, 0) 695 696 def test_register(self): 697 self.check_register() 698 699 def test_unregister(self): 700 self.check_register(unregister=True) 701 702 def test_register_file(self): 703 with temporary_filename() as filename: 704 self.check_register(filename=filename) 705 706 @unittest.skipIf(sys.platform == "win32", 707 "subprocess doesn't support pass_fds on Windows") 708 def test_register_fd(self): 709 with tempfile.TemporaryFile('wb+') as fp: 710 self.check_register(fd=fp.fileno()) 711 712 def test_register_threads(self): 713 self.check_register(all_threads=True) 714 715 def test_register_chain(self): 716 self.check_register(chain=True) 717 718 @contextmanager 719 def check_stderr_none(self): 720 stderr = sys.stderr 721 try: 722 sys.stderr = None 723 with self.assertRaises(RuntimeError) as cm: 724 yield 725 self.assertEqual(str(cm.exception), "sys.stderr is None") 726 finally: 727 sys.stderr = stderr 728 729 def test_stderr_None(self): 730 # Issue #21497: provide a helpful error if sys.stderr is None, 731 # instead of just an attribute error: "None has no attribute fileno". 732 with self.check_stderr_none(): 733 faulthandler.enable() 734 with self.check_stderr_none(): 735 faulthandler.dump_traceback() 736 if hasattr(faulthandler, 'dump_traceback_later'): 737 with self.check_stderr_none(): 738 faulthandler.dump_traceback_later(1e-3) 739 if hasattr(faulthandler, "register"): 740 with self.check_stderr_none(): 741 faulthandler.register(signal.SIGUSR1) 742 743 @unittest.skipUnless(MS_WINDOWS, 'specific to Windows') 744 def test_raise_exception(self): 745 for exc, name in ( 746 ('EXCEPTION_ACCESS_VIOLATION', 'access violation'), 747 ('EXCEPTION_INT_DIVIDE_BY_ZERO', 'int divide by zero'), 748 ('EXCEPTION_STACK_OVERFLOW', 'stack overflow'), 749 ): 750 self.check_windows_exception(f""" 751 import faulthandler 752 faulthandler.enable() 753 faulthandler._raise_exception(faulthandler._{exc}) 754 """, 755 3, 756 name) 757 758 759 760 if __name__ == "__main__": 761 unittest.main() 762