1 """ 2 Tests of regrtest.py. 3 4 Note: test_regrtest cannot be run twice in parallel. 5 """ 6 from __future__ import print_function 7 8 import collections 9 import errno 10 import os.path 11 import platform 12 import re 13 import subprocess 14 import sys 15 import sysconfig 16 import tempfile 17 import textwrap 18 import unittest 19 from test import support 20 # Use utils alias to use the same code for TestUtils in master and 2.7 branches 21 import regrtest as utils 22 23 24 Py_DEBUG = hasattr(sys, 'getobjects') 25 ROOT_DIR = os.path.join(os.path.dirname(__file__), '..', '..') 26 ROOT_DIR = os.path.abspath(os.path.normpath(ROOT_DIR)) 27 28 TEST_INTERRUPTED = textwrap.dedent(""" 29 from signal import SIGINT 30 try: 31 from _testcapi import raise_signal 32 raise_signal(SIGINT) 33 except ImportError: 34 import os 35 os.kill(os.getpid(), SIGINT) 36 """) 37 38 39 SubprocessRun = collections.namedtuple('SubprocessRun', 40 'returncode stdout stderr') 41 42 43 class BaseTestCase(unittest.TestCase): 44 TEST_UNIQUE_ID = 1 45 TESTNAME_PREFIX = 'test_regrtest_' 46 TESTNAME_REGEX = r'test_[a-zA-Z0-9_]+' 47 48 def setUp(self): 49 self.testdir = os.path.realpath(os.path.dirname(__file__)) 50 51 self.tmptestdir = tempfile.mkdtemp() 52 self.addCleanup(support.rmtree, self.tmptestdir) 53 54 def create_test(self, name=None, code=''): 55 if not name: 56 name = 'noop%s' % BaseTestCase.TEST_UNIQUE_ID 57 BaseTestCase.TEST_UNIQUE_ID += 1 58 59 # test_regrtest cannot be run twice in parallel because 60 # of setUp() and create_test() 61 name = self.TESTNAME_PREFIX + name 62 path = os.path.join(self.tmptestdir, name + '.py') 63 64 self.addCleanup(support.unlink, path) 65 # Use O_EXCL to ensure that we do not override existing tests 66 try: 67 fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_EXCL) 68 except OSError as exc: 69 if (exc.errno in (errno.EACCES, errno.EPERM) 70 and not sysconfig.is_python_build()): 71 self.skipTest("cannot write %s: %s" % (path, exc)) 72 else: 73 raise 74 else: 75 with os.fdopen(fd, 'w') as fp: 76 fp.write(code) 77 return name 78 79 def regex_search(self, regex, output): 80 match = re.search(regex, output, re.MULTILINE) 81 if not match: 82 self.fail("%r not found in %r" % (regex, output)) 83 return match 84 85 def check_line(self, output, regex): 86 regex = re.compile(r'^' + regex, re.MULTILINE) 87 self.assertRegexpMatches(output, regex) 88 89 def parse_executed_tests(self, output): 90 regex = (r'^[0-9]+:[0-9]+:[0-9]+ (?:load avg: [0-9]+\.[0-9]{2} )?\[ *[0-9]+(?:/ *[0-9]+)*\] (%s)' 91 % self.TESTNAME_REGEX) 92 parser = re.finditer(regex, output, re.MULTILINE) 93 return list(match.group(1) for match in parser) 94 95 def check_executed_tests(self, output, tests, skipped=(), failed=(), 96 env_changed=(), omitted=(), 97 rerun=(), 98 randomize=False, interrupted=False, 99 fail_env_changed=False): 100 if isinstance(tests, str): 101 tests = [tests] 102 if isinstance(skipped, str): 103 skipped = [skipped] 104 if isinstance(failed, str): 105 failed = [failed] 106 if isinstance(env_changed, str): 107 env_changed = [env_changed] 108 if isinstance(omitted, str): 109 omitted = [omitted] 110 if isinstance(rerun, str): 111 rerun = [rerun] 112 113 executed = self.parse_executed_tests(output) 114 if randomize: 115 self.assertEqual(set(executed), set(tests), output) 116 else: 117 self.assertEqual(executed, tests, (executed, tests, output)) 118 119 def plural(count): 120 return 's' if count != 1 else '' 121 122 def list_regex(line_format, tests): 123 count = len(tests) 124 names = ' '.join(sorted(tests)) 125 regex = line_format % (count, plural(count)) 126 regex = r'%s:\n %s$' % (regex, names) 127 return regex 128 129 if skipped: 130 regex = list_regex('%s test%s skipped', skipped) 131 self.check_line(output, regex) 132 133 if failed: 134 regex = list_regex('%s test%s failed', failed) 135 self.check_line(output, regex) 136 137 if env_changed: 138 regex = list_regex('%s test%s altered the execution environment', 139 env_changed) 140 self.check_line(output, regex) 141 142 if omitted: 143 regex = list_regex('%s test%s omitted', omitted) 144 self.check_line(output, regex) 145 146 if rerun: 147 regex = list_regex('%s re-run test%s', rerun) 148 self.check_line(output, regex) 149 self.check_line(output, "Re-running failed tests in verbose mode") 150 for name in rerun: 151 regex = "Re-running test %r in verbose mode" % name 152 self.check_line(output, regex) 153 154 good = (len(tests) - len(skipped) - len(failed) 155 - len(omitted) - len(env_changed)) 156 if good: 157 regex = r'%s test%s OK\.$' % (good, plural(good)) 158 if not skipped and not failed and good > 1: 159 regex = 'All %s' % regex 160 self.check_line(output, regex) 161 162 if interrupted: 163 self.check_line(output, 'Test suite interrupted by signal SIGINT.') 164 165 result = [] 166 if failed: 167 result.append('FAILURE') 168 elif fail_env_changed and env_changed: 169 result.append('ENV CHANGED') 170 if interrupted: 171 result.append('INTERRUPTED') 172 if not result: 173 result.append('SUCCESS') 174 result = ', '.join(result) 175 if rerun: 176 self.check_line(output, 'Tests result: %s' % result) 177 result = 'FAILURE then %s' % result 178 self.check_line(output, 'Tests result: %s' % result) 179 180 def parse_random_seed(self, output): 181 match = self.regex_search(r'Using random seed ([0-9]+)', output) 182 randseed = int(match.group(1)) 183 self.assertTrue(0 <= randseed <= 10000000, randseed) 184 return randseed 185 186 def run_command(self, args, input=None, exitcode=0, **kw): 187 if not input: 188 input = '' 189 if 'stderr' not in kw: 190 kw['stderr'] = subprocess.PIPE 191 proc = subprocess.Popen(args, 192 universal_newlines=True, 193 stdout=subprocess.PIPE, 194 **kw) 195 stdout, stderr = proc.communicate(input=input) 196 if proc.returncode != exitcode: 197 msg = ("Command %s failed with exit code %s\n" 198 "\n" 199 "stdout:\n" 200 "---\n" 201 "%s\n" 202 "---\n" 203 % (str(args), proc.returncode, stdout)) 204 if proc.stderr: 205 msg += ("\n" 206 "stderr:\n" 207 "---\n" 208 "%s" 209 "---\n" 210 % stderr) 211 self.fail(msg) 212 return SubprocessRun(proc.returncode, stdout, stderr) 213 214 def run_python(self, args, **kw): 215 args = [sys.executable] + list(args) 216 proc = self.run_command(args, **kw) 217 return proc.stdout 218 219 220 class ProgramsTestCase(BaseTestCase): 221 """ 222 Test various ways to run the Python test suite. Use options close 223 to options used on the buildbot. 224 """ 225 226 NTEST = 4 227 228 def setUp(self): 229 super(ProgramsTestCase, self).setUp() 230 231 # Create NTEST tests doing nothing 232 self.tests = [self.create_test() for index in range(self.NTEST)] 233 234 self.python_args = ['-Wd', '-3', '-E', '-bb', '-tt'] 235 self.regrtest_args = ['-uall', '-rwW', 236 '--testdir=%s' % self.tmptestdir] 237 238 def check_output(self, output): 239 self.parse_random_seed(output) 240 self.check_executed_tests(output, self.tests, randomize=True) 241 242 def run_tests(self, args): 243 output = self.run_python(args) 244 self.check_output(output) 245 246 def test_script_regrtest(self): 247 # Lib/test/regrtest.py 248 script = os.path.join(self.testdir, 'regrtest.py') 249 250 args = self.python_args + [script] + self.regrtest_args + self.tests 251 self.run_tests(args) 252 253 def test_module_test(self): 254 # -m test 255 args = self.python_args + ['-m', 'test'] + self.regrtest_args + self.tests 256 self.run_tests(args) 257 258 def test_module_regrtest(self): 259 # -m test.regrtest 260 args = self.python_args + ['-m', 'test.regrtest'] + self.regrtest_args + self.tests 261 self.run_tests(args) 262 263 def test_module_autotest(self): 264 # -m test.autotest 265 args = self.python_args + ['-m', 'test.autotest'] + self.regrtest_args + self.tests 266 self.run_tests(args) 267 268 def test_module_from_test_autotest(self): 269 # from test import autotest 270 code = 'from test import autotest' 271 args = self.python_args + ['-c', code] + self.regrtest_args + self.tests 272 self.run_tests(args) 273 274 def test_script_autotest(self): 275 # Lib/test/autotest.py 276 script = os.path.join(self.testdir, 'autotest.py') 277 args = self.python_args + [script] + self.regrtest_args + self.tests 278 self.run_tests(args) 279 280 def run_batch(self, *args): 281 proc = self.run_command(args) 282 self.check_output(proc.stdout) 283 284 def need_pcbuild(self): 285 exe = os.path.normpath(os.path.abspath(sys.executable)) 286 parts = exe.split(os.path.sep) 287 if len(parts) < 3: 288 # it's not a python build, python is likely to be installed 289 return 290 291 build_dir = parts[-3] 292 if build_dir.lower() != 'pcbuild': 293 self.skipTest("Tools/buildbot/test.bat requires PCbuild build, " 294 "found %s" % build_dir) 295 296 @unittest.skipUnless(sysconfig.is_python_build(), 297 'test.bat script is not installed') 298 @unittest.skipUnless(sys.platform == 'win32', 'Windows only') 299 def test_tools_buildbot_test(self): 300 self.need_pcbuild() 301 302 # Tools\buildbot\test.bat 303 script = os.path.join(ROOT_DIR, 'Tools', 'buildbot', 'test.bat') 304 test_args = ['--testdir=%s' % self.tmptestdir] 305 if platform.architecture()[0] == '64bit': 306 test_args.append('-x64') # 64-bit build 307 if not Py_DEBUG: 308 test_args.append('+d') # Release build, use python.exe 309 310 args = [script] + test_args + self.tests 311 self.run_batch(*args) 312 313 @unittest.skipUnless(sys.platform == 'win32', 'Windows only') 314 def test_pcbuild_rt(self): 315 self.need_pcbuild() 316 317 # PCbuild\rt.bat 318 script = os.path.join(ROOT_DIR, r'PCbuild\rt.bat') 319 rt_args = ["-q"] # Quick, don't run tests twice 320 if platform.architecture()[0] == '64bit': 321 rt_args.append('-x64') # 64-bit build 322 if Py_DEBUG: 323 rt_args.append('-d') # Debug build, use python_d.exe 324 args = [script] + rt_args + self.regrtest_args + self.tests 325 self.run_batch(*args) 326 327 328 class ArgsTestCase(BaseTestCase): 329 """ 330 Test arguments of the Python test suite. 331 """ 332 333 def run_tests(self, *testargs, **kw): 334 cmdargs = ('-m', 'test', '--testdir=%s' % self.tmptestdir) + testargs 335 return self.run_python(cmdargs, **kw) 336 337 def test_failing_test(self): 338 # test a failing test 339 code = textwrap.dedent(""" 340 import unittest 341 from test import support 342 343 class FailingTest(unittest.TestCase): 344 def test_failing(self): 345 self.fail("bug") 346 347 def test_main(): 348 support.run_unittest(FailingTest) 349 """) 350 test_ok = self.create_test('ok') 351 test_failing = self.create_test('failing', code=code) 352 tests = [test_ok, test_failing] 353 354 output = self.run_tests(*tests, exitcode=2) 355 self.check_executed_tests(output, tests, failed=test_failing) 356 357 def test_resources(self): 358 # test -u command line option 359 tests = {} 360 for resource in ('audio', 'network'): 361 code = 'from test import support\nsupport.requires(%r)' % resource 362 tests[resource] = self.create_test(resource, code) 363 test_names = sorted(tests.values()) 364 365 # -u all: 2 resources enabled 366 output = self.run_tests('-u', 'all', *test_names) 367 self.check_executed_tests(output, test_names) 368 369 # -u audio: 1 resource enabled 370 output = self.run_tests('-uaudio', *test_names) 371 self.check_executed_tests(output, test_names, 372 skipped=tests['network']) 373 374 # no option: 0 resources enabled 375 output = self.run_tests(*test_names) 376 self.check_executed_tests(output, test_names, 377 skipped=test_names) 378 379 def test_random(self): 380 # test -r and --randseed command line option 381 code = textwrap.dedent(""" 382 import random 383 print("TESTRANDOM: %s" % random.randint(1, 1000)) 384 """) 385 test = self.create_test('random', code) 386 387 # first run to get the output with the random seed 388 output = self.run_tests('-r', '-v', test) 389 randseed = self.parse_random_seed(output) 390 match = self.regex_search(r'TESTRANDOM: ([0-9]+)', output) 391 test_random = int(match.group(1)) 392 393 # try to reproduce with the random seed 394 output = self.run_tests('-r', '-v', '--randseed=%s' % randseed, test) 395 randseed2 = self.parse_random_seed(output) 396 self.assertEqual(randseed2, randseed) 397 398 match = self.regex_search(r'TESTRANDOM: ([0-9]+)', output) 399 test_random2 = int(match.group(1)) 400 self.assertEqual(test_random2, test_random) 401 402 def test_fromfile(self): 403 # test --fromfile 404 tests = [self.create_test() for index in range(5)] 405 406 # Write the list of files using a format similar to regrtest output: 407 # [1/2] test_1 408 # [2/2] test_2 409 filename = support.TESTFN 410 self.addCleanup(support.unlink, filename) 411 412 # test format 'test_opcodes' 413 with open(filename, "w") as fp: 414 for name in tests: 415 print(name, file=fp) 416 417 output = self.run_tests('--fromfile', filename) 418 self.check_executed_tests(output, tests) 419 420 def test_interrupted(self): 421 code = TEST_INTERRUPTED 422 test = self.create_test('sigint', code=code) 423 output = self.run_tests(test, exitcode=130) 424 self.check_executed_tests(output, test, omitted=test, 425 interrupted=True) 426 427 def test_slowest(self): 428 # test --slow 429 tests = [self.create_test() for index in range(3)] 430 output = self.run_tests("--slowest", *tests) 431 self.check_executed_tests(output, tests) 432 regex = ('10 slowest tests:\n' 433 '(?:- %s: .*\n){%s}' 434 % (self.TESTNAME_REGEX, len(tests))) 435 self.check_line(output, regex) 436 437 def test_slow_interrupted(self): 438 # Issue #25373: test --slowest with an interrupted test 439 code = TEST_INTERRUPTED 440 test = self.create_test("sigint", code=code) 441 442 try: 443 import threading 444 tests = (False, True) 445 except ImportError: 446 tests = (False,) 447 for multiprocessing in tests: 448 if multiprocessing: 449 args = ("--slowest", "-j2", test) 450 else: 451 args = ("--slowest", test) 452 output = self.run_tests(*args, exitcode=130) 453 self.check_executed_tests(output, test, 454 omitted=test, interrupted=True) 455 456 regex = ('10 slowest tests:\n') 457 self.check_line(output, regex) 458 459 def test_coverage(self): 460 # test --coverage 461 test = self.create_test('coverage') 462 output = self.run_tests("--coverage", test) 463 self.check_executed_tests(output, [test]) 464 regex = (r'lines +cov% +module +\(path\)\n' 465 r'(?: *[0-9]+ *[0-9]{1,2}% *[^ ]+ +\([^)]+\)+)+') 466 self.check_line(output, regex) 467 468 def test_forever(self): 469 # test --forever 470 code = textwrap.dedent(""" 471 import __builtin__ 472 import unittest 473 from test import support 474 475 class ForeverTester(unittest.TestCase): 476 def test_run(self): 477 # Store the state in the __builtin__ module, because the test 478 # module is reload at each run 479 if 'RUN' in __builtin__.__dict__: 480 __builtin__.__dict__['RUN'] += 1 481 if __builtin__.__dict__['RUN'] >= 3: 482 self.fail("fail at the 3rd runs") 483 else: 484 __builtin__.__dict__['RUN'] = 1 485 486 def test_main(): 487 support.run_unittest(ForeverTester) 488 """) 489 test = self.create_test('forever', code=code) 490 output = self.run_tests('--forever', test, exitcode=2) 491 self.check_executed_tests(output, [test]*3, failed=test) 492 493 def check_leak(self, code, what): 494 test = self.create_test('huntrleaks', code=code) 495 496 filename = 'reflog.txt' 497 self.addCleanup(support.unlink, filename) 498 output = self.run_tests('--huntrleaks', '3:3:', test, 499 exitcode=2, 500 stderr=subprocess.STDOUT) 501 self.check_executed_tests(output, [test], failed=test) 502 503 line = 'beginning 6 repetitions\n123456\n......\n' 504 self.check_line(output, re.escape(line)) 505 506 line2 = '%s leaked [1, 1, 1] %s, sum=3\n' % (test, what) 507 self.assertIn(line2, output) 508 509 with open(filename) as fp: 510 reflog = fp.read() 511 self.assertIn(line2, reflog) 512 513 @unittest.skipUnless(Py_DEBUG, 'need a debug build') 514 @support.requires_type_collecting 515 def test_huntrleaks(self): 516 # test --huntrleaks 517 code = textwrap.dedent(""" 518 import unittest 519 from test import support 520 521 GLOBAL_LIST = [] 522 523 class RefLeakTest(unittest.TestCase): 524 def test_leak(self): 525 GLOBAL_LIST.append(object()) 526 527 def test_main(): 528 support.run_unittest(RefLeakTest) 529 """) 530 self.check_leak(code, 'references') 531 532 @unittest.skipUnless(Py_DEBUG, 'need a debug build') 533 def test_huntrleaks_fd_leak(self): 534 # test --huntrleaks for file descriptor leak 535 code = textwrap.dedent(""" 536 import os 537 import unittest 538 from test import support 539 540 class FDLeakTest(unittest.TestCase): 541 def test_leak(self): 542 fd = os.open(__file__, os.O_RDONLY) 543 # bug: never close the file descriptor 544 545 def test_main(): 546 support.run_unittest(FDLeakTest) 547 """) 548 self.check_leak(code, 'file descriptors') 549 550 def test_list_tests(self): 551 # test --list-tests 552 tests = [self.create_test() for i in range(5)] 553 output = self.run_tests('--list-tests', *tests) 554 self.assertEqual(output.rstrip().splitlines(), 555 tests) 556 557 def test_list_cases(self): 558 # test --list-cases 559 code = textwrap.dedent(""" 560 import unittest 561 562 class Tests(unittest.TestCase): 563 def test_method1(self): 564 pass 565 def test_method2(self): 566 pass 567 """) 568 testname = self.create_test(code=code) 569 570 # Test --list-cases 571 all_methods = ['%s.Tests.test_method1' % testname, 572 '%s.Tests.test_method2' % testname] 573 output = self.run_tests('--list-cases', testname) 574 self.assertEqual(output.splitlines(), all_methods) 575 576 # Test --list-cases with --match 577 all_methods = ['%s.Tests.test_method1' % testname] 578 output = self.run_tests('--list-cases', 579 '-m', 'test_method1', 580 testname) 581 self.assertEqual(output.splitlines(), all_methods) 582 583 @unittest.skipIf(sys.platform.startswith('aix'), 584 "support._crash_python() doesn't work on AIX") 585 def test_crashed(self): 586 # Any code which causes a crash 587 code = 'import test.support; test.support._crash_python()' 588 crash_test = self.create_test(name="crash", code=code) 589 ok_test = self.create_test(name="ok") 590 591 tests = [crash_test, ok_test] 592 output = self.run_tests("-j2", *tests, exitcode=2) 593 self.check_executed_tests(output, tests, failed=crash_test, 594 randomize=True) 595 596 def parse_methods(self, output): 597 regex = re.compile("^(test[^ ]+).*ok$", flags=re.MULTILINE) 598 return [match.group(1) for match in regex.finditer(output)] 599 600 def test_matchfile(self): 601 # Any code which causes a crash 602 code = textwrap.dedent(""" 603 import unittest 604 from test import support 605 606 class Tests(unittest.TestCase): 607 def test_method1(self): 608 pass 609 def test_method2(self): 610 pass 611 def test_method3(self): 612 pass 613 def test_method4(self): 614 pass 615 616 def test_main(): 617 support.run_unittest(Tests) 618 """) 619 all_methods = ['test_method1', 'test_method2', 620 'test_method3', 'test_method4'] 621 testname = self.create_test(code=code) 622 623 # by default, all methods should be run 624 output = self.run_tests("-v", testname) 625 methods = self.parse_methods(output) 626 self.assertEqual(methods, all_methods) 627 628 # only run a subset 629 filename = support.TESTFN 630 self.addCleanup(support.unlink, filename) 631 632 subset = [ 633 # only match the method name 634 'test_method1', 635 # match the full identifier 636 '%s.Tests.test_method3' % testname] 637 with open(filename, "w") as fp: 638 for name in subset: 639 print(name, file=fp) 640 641 output = self.run_tests("-v", "--matchfile", filename, testname) 642 methods = self.parse_methods(output) 643 subset = ['test_method1', 'test_method3'] 644 self.assertEqual(methods, subset) 645 646 def test_env_changed(self): 647 code = textwrap.dedent(""" 648 import unittest 649 from test import support 650 651 class Tests(unittest.TestCase): 652 def test_env_changed(self): 653 open("env_changed", "w").close() 654 655 def test_main(): 656 support.run_unittest(Tests) 657 """) 658 testname = self.create_test(code=code) 659 660 # don't fail by default 661 output = self.run_tests(testname) 662 self.check_executed_tests(output, [testname], env_changed=testname) 663 664 # fail with --fail-env-changed 665 output = self.run_tests("--fail-env-changed", testname, exitcode=3) 666 self.check_executed_tests(output, [testname], env_changed=testname, 667 fail_env_changed=True) 668 669 def test_rerun_fail(self): 670 code = textwrap.dedent(""" 671 import unittest 672 673 class Tests(unittest.TestCase): 674 def test_bug(self): 675 # test always fail 676 self.fail("bug") 677 678 def test_main(): 679 support.run_unittest(Tests) 680 """) 681 testname = self.create_test(code=code) 682 683 output = self.run_tests("-w", testname, exitcode=2) 684 self.check_executed_tests(output, [testname], 685 failed=testname, rerun=testname) 686 687 688 class TestUtils(unittest.TestCase): 689 def test_format_duration(self): 690 self.assertEqual(utils.format_duration(0), 691 '0 ms') 692 self.assertEqual(utils.format_duration(1e-9), 693 '1 ms') 694 self.assertEqual(utils.format_duration(10e-3), 695 '10 ms') 696 self.assertEqual(utils.format_duration(1.5), 697 '1 sec 500 ms') 698 self.assertEqual(utils.format_duration(1), 699 '1 sec') 700 self.assertEqual(utils.format_duration(2 * 60), 701 '2 min') 702 self.assertEqual(utils.format_duration(2 * 60 + 1), 703 '2 min 1 sec') 704 self.assertEqual(utils.format_duration(3 * 3600), 705 '3 hour') 706 self.assertEqual(utils.format_duration(3 * 3600 + 2 * 60 + 1), 707 '3 hour 2 min') 708 self.assertEqual(utils.format_duration(3 * 3600 + 1), 709 '3 hour 1 sec') 710 711 712 def test_main(): 713 support.run_unittest(ProgramsTestCase, ArgsTestCase, TestUtils) 714 715 716 if __name__ == "__main__": 717 test_main() 718