Home | History | Annotate | Download | only in libregrtest
      1 import datetime
      2 import faulthandler
      3 import locale
      4 import os
      5 import platform
      6 import random
      7 import re
      8 import sys
      9 import sysconfig
     10 import tempfile
     11 import textwrap
     12 import time
     13 from test.libregrtest.cmdline import _parse_args
     14 from test.libregrtest.runtest import (
     15     findtests, runtest,
     16     STDTESTS, NOTTESTS, PASSED, FAILED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED,
     17     INTERRUPTED, CHILD_ERROR,
     18     PROGRESS_MIN_TIME, format_test_result)
     19 from test.libregrtest.setup import setup_tests
     20 from test import support
     21 try:
     22     import gc
     23 except ImportError:
     24     gc = None
     25 
     26 
     27 # When tests are run from the Python build directory, it is best practice
     28 # to keep the test files in a subfolder.  This eases the cleanup of leftover
     29 # files using the "make distclean" command.
     30 if sysconfig.is_python_build():
     31     TEMPDIR = os.path.join(sysconfig.get_config_var('srcdir'), 'build')
     32 else:
     33     TEMPDIR = tempfile.gettempdir()
     34 TEMPDIR = os.path.abspath(TEMPDIR)
     35 
     36 
     37 def format_duration(seconds):
     38     if seconds < 1.0:
     39         return '%.0f ms' % (seconds * 1e3)
     40     if seconds < 60.0:
     41         return '%.0f sec' % seconds
     42 
     43     minutes, seconds = divmod(seconds, 60.0)
     44     return '%.0f min %.0f sec' % (minutes, seconds)
     45 
     46 
     47 class Regrtest:
     48     """Execute a test suite.
     49 
     50     This also parses command-line options and modifies its behavior
     51     accordingly.
     52 
     53     tests -- a list of strings containing test names (optional)
     54     testdir -- the directory in which to look for tests (optional)
     55 
     56     Users other than the Python test suite will certainly want to
     57     specify testdir; if it's omitted, the directory containing the
     58     Python test suite is searched for.
     59 
     60     If the tests argument is omitted, the tests listed on the
     61     command-line will be used.  If that's empty, too, then all *.py
     62     files beginning with test_ will be used.
     63 
     64     The other default arguments (verbose, quiet, exclude,
     65     single, randomize, findleaks, use_resources, trace, coverdir,
     66     print_slow, and random_seed) allow programmers calling main()
     67     directly to set the values that would normally be set by flags
     68     on the command line.
     69     """
     70     def __init__(self):
     71         # Namespace of command line options
     72         self.ns = None
     73 
     74         # tests
     75         self.tests = []
     76         self.selected = []
     77 
     78         # test results
     79         self.good = []
     80         self.bad = []
     81         self.skipped = []
     82         self.resource_denieds = []
     83         self.environment_changed = []
     84         self.interrupted = False
     85 
     86         # used by --slow
     87         self.test_times = []
     88 
     89         # used by --coverage, trace.Trace instance
     90         self.tracer = None
     91 
     92         # used by --findleaks, store for gc.garbage
     93         self.found_garbage = []
     94 
     95         # used to display the progress bar "[ 3/100]"
     96         self.start_time = time.monotonic()
     97         self.test_count = ''
     98         self.test_count_width = 1
     99 
    100         # used by --single
    101         self.next_single_test = None
    102         self.next_single_filename = None
    103 
    104     def accumulate_result(self, test, result):
    105         ok, test_time = result
    106         if ok not in (CHILD_ERROR, INTERRUPTED):
    107             self.test_times.append((test_time, test))
    108         if ok == PASSED:
    109             self.good.append(test)
    110         elif ok == FAILED:
    111             self.bad.append(test)
    112         elif ok == ENV_CHANGED:
    113             self.environment_changed.append(test)
    114         elif ok == SKIPPED:
    115             self.skipped.append(test)
    116         elif ok == RESOURCE_DENIED:
    117             self.skipped.append(test)
    118             self.resource_denieds.append(test)
    119 
    120     def display_progress(self, test_index, test):
    121         if self.ns.quiet:
    122             return
    123         if self.bad and not self.ns.pgo:
    124             fmt = "{time} [{test_index:{count_width}}{test_count}/{nbad}] {test_name}"
    125         else:
    126             fmt = "{time} [{test_index:{count_width}}{test_count}] {test_name}"
    127         test_time = time.monotonic() - self.start_time
    128         test_time = datetime.timedelta(seconds=int(test_time))
    129         line = fmt.format(count_width=self.test_count_width,
    130                           test_index=test_index,
    131                           test_count=self.test_count,
    132                           nbad=len(self.bad),
    133                           test_name=test,
    134                           time=test_time)
    135         print(line, flush=True)
    136 
    137     def parse_args(self, kwargs):
    138         ns = _parse_args(sys.argv[1:], **kwargs)
    139 
    140         if ns.timeout and not hasattr(faulthandler, 'dump_traceback_later'):
    141             print("Warning: The timeout option requires "
    142                   "faulthandler.dump_traceback_later", file=sys.stderr)
    143             ns.timeout = None
    144 
    145         if ns.threshold is not None and gc is None:
    146             print('No GC available, ignore --threshold.', file=sys.stderr)
    147             ns.threshold = None
    148 
    149         if ns.findleaks:
    150             if gc is not None:
    151                 # Uncomment the line below to report garbage that is not
    152                 # freeable by reference counting alone.  By default only
    153                 # garbage that is not collectable by the GC is reported.
    154                 pass
    155                 #gc.set_debug(gc.DEBUG_SAVEALL)
    156             else:
    157                 print('No GC available, disabling --findleaks',
    158                       file=sys.stderr)
    159                 ns.findleaks = False
    160 
    161         # Strip .py extensions.
    162         removepy(ns.args)
    163 
    164         return ns
    165 
    166     def find_tests(self, tests):
    167         self.tests = tests
    168 
    169         if self.ns.single:
    170             self.next_single_filename = os.path.join(TEMPDIR, 'pynexttest')
    171             try:
    172                 with open(self.next_single_filename, 'r') as fp:
    173                     next_test = fp.read().strip()
    174                     self.tests = [next_test]
    175             except OSError:
    176                 pass
    177 
    178         if self.ns.fromfile:
    179             self.tests = []
    180             # regex to match 'test_builtin' in line:
    181             # '0:00:00 [  4/400] test_builtin -- test_dict took 1 sec'
    182             regex = (r'^(?:[0-9]+:[0-9]+:[0-9]+ *)?'
    183                      r'(?:\[[0-9/ ]+\] *)?'
    184                      r'(test_[a-zA-Z0-9_]+)')
    185             regex = re.compile(regex)
    186             with open(os.path.join(support.SAVEDCWD, self.ns.fromfile)) as fp:
    187                 for line in fp:
    188                     line = line.strip()
    189                     if line.startswith('#'):
    190                         continue
    191                     match = regex.match(line)
    192                     if match is None:
    193                         continue
    194                     self.tests.append(match.group(1))
    195 
    196         removepy(self.tests)
    197 
    198         stdtests = STDTESTS[:]
    199         nottests = NOTTESTS.copy()
    200         if self.ns.exclude:
    201             for arg in self.ns.args:
    202                 if arg in stdtests:
    203                     stdtests.remove(arg)
    204                 nottests.add(arg)
    205             self.ns.args = []
    206 
    207         # if testdir is set, then we are not running the python tests suite, so
    208         # don't add default tests to be executed or skipped (pass empty values)
    209         if self.ns.testdir:
    210             alltests = findtests(self.ns.testdir, list(), set())
    211         else:
    212             alltests = findtests(self.ns.testdir, stdtests, nottests)
    213 
    214         if not self.ns.fromfile:
    215             self.selected = self.tests or self.ns.args or alltests
    216         else:
    217             self.selected = self.tests
    218         if self.ns.single:
    219             self.selected = self.selected[:1]
    220             try:
    221                 pos = alltests.index(self.selected[0])
    222                 self.next_single_test = alltests[pos + 1]
    223             except IndexError:
    224                 pass
    225 
    226         # Remove all the selected tests that precede start if it's set.
    227         if self.ns.start:
    228             try:
    229                 del self.selected[:self.selected.index(self.ns.start)]
    230             except ValueError:
    231                 print("Couldn't find starting test (%s), using all tests"
    232                       % self.ns.start, file=sys.stderr)
    233 
    234         if self.ns.randomize:
    235             if self.ns.random_seed is None:
    236                 self.ns.random_seed = random.randrange(10000000)
    237             random.seed(self.ns.random_seed)
    238             random.shuffle(self.selected)
    239 
    240     def list_tests(self):
    241         for name in self.selected:
    242             print(name)
    243 
    244     def rerun_failed_tests(self):
    245         self.ns.verbose = True
    246         self.ns.failfast = False
    247         self.ns.verbose3 = False
    248         self.ns.match_tests = None
    249 
    250         print("Re-running failed tests in verbose mode")
    251         for test in self.bad[:]:
    252             print("Re-running test %r in verbose mode" % test, flush=True)
    253             try:
    254                 self.ns.verbose = True
    255                 ok = runtest(self.ns, test)
    256             except KeyboardInterrupt:
    257                 self.interrupted = True
    258                 # print a newline separate from the ^C
    259                 print()
    260                 break
    261             else:
    262                 if ok[0] in {PASSED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED}:
    263                     self.bad.remove(test)
    264         else:
    265             if self.bad:
    266                 print(count(len(self.bad), 'test'), "failed again:")
    267                 printlist(self.bad)
    268 
    269     def display_result(self):
    270         if self.interrupted:
    271             # print a newline after ^C
    272             print()
    273             print("Test suite interrupted by signal SIGINT.")
    274             executed = set(self.good) | set(self.bad) | set(self.skipped)
    275             omitted = set(self.selected) - executed
    276             print(count(len(omitted), "test"), "omitted:")
    277             printlist(omitted)
    278 
    279         # If running the test suite for PGO then no one cares about
    280         # results.
    281         if self.ns.pgo:
    282             return
    283 
    284         if self.good and not self.ns.quiet:
    285             if (not self.bad
    286                 and not self.skipped
    287                 and not self.interrupted
    288                 and len(self.good) > 1):
    289                 print("All", end=' ')
    290             print(count(len(self.good), "test"), "OK.")
    291 
    292         if self.ns.print_slow:
    293             self.test_times.sort(reverse=True)
    294             print()
    295             print("10 slowest tests:")
    296             for time, test in self.test_times[:10]:
    297                 print("- %s: %s" % (test, format_duration(time)))
    298 
    299         if self.bad:
    300             print()
    301             print(count(len(self.bad), "test"), "failed:")
    302             printlist(self.bad)
    303 
    304         if self.environment_changed:
    305             print()
    306             print("{} altered the execution environment:".format(
    307                      count(len(self.environment_changed), "test")))
    308             printlist(self.environment_changed)
    309 
    310         if self.skipped and not self.ns.quiet:
    311             print()
    312             print(count(len(self.skipped), "test"), "skipped:")
    313             printlist(self.skipped)
    314 
    315     def run_tests_sequential(self):
    316         if self.ns.trace:
    317             import trace
    318             self.tracer = trace.Trace(trace=False, count=True)
    319 
    320         save_modules = sys.modules.keys()
    321 
    322         print("Run tests sequentially")
    323 
    324         previous_test = None
    325         for test_index, test in enumerate(self.tests, 1):
    326             start_time = time.monotonic()
    327 
    328             text = test
    329             if previous_test:
    330                 text = '%s -- %s' % (text, previous_test)
    331             self.display_progress(test_index, text)
    332 
    333             if self.tracer:
    334                 # If we're tracing code coverage, then we don't exit with status
    335                 # if on a false return value from main.
    336                 cmd = ('result = runtest(self.ns, test); '
    337                        'self.accumulate_result(test, result)')
    338                 ns = dict(locals())
    339                 self.tracer.runctx(cmd, globals=globals(), locals=ns)
    340                 result = ns['result']
    341             else:
    342                 try:
    343                     result = runtest(self.ns, test)
    344                 except KeyboardInterrupt:
    345                     self.interrupted = True
    346                     self.accumulate_result(test, (INTERRUPTED, None))
    347                     break
    348                 else:
    349                     self.accumulate_result(test, result)
    350 
    351             previous_test = format_test_result(test, result[0])
    352             test_time = time.monotonic() - start_time
    353             if test_time >= PROGRESS_MIN_TIME:
    354                 previous_test = "%s in %s" % (previous_test, format_duration(test_time))
    355             elif result[0] == PASSED:
    356                 # be quiet: say nothing if the test passed shortly
    357                 previous_test = None
    358 
    359             if self.ns.findleaks:
    360                 gc.collect()
    361                 if gc.garbage:
    362                     print("Warning: test created", len(gc.garbage), end=' ')
    363                     print("uncollectable object(s).")
    364                     # move the uncollectable objects somewhere so we don't see
    365                     # them again
    366                     self.found_garbage.extend(gc.garbage)
    367                     del gc.garbage[:]
    368 
    369             # Unload the newly imported modules (best effort finalization)
    370             for module in sys.modules.keys():
    371                 if module not in save_modules and module.startswith("test."):
    372                     support.unload(module)
    373 
    374         if previous_test:
    375             print(previous_test)
    376 
    377     def _test_forever(self, tests):
    378         while True:
    379             for test in tests:
    380                 yield test
    381                 if self.bad:
    382                     return
    383 
    384     def run_tests(self):
    385         # For a partial run, we do not need to clutter the output.
    386         if (self.ns.verbose
    387             or self.ns.header
    388             or not (self.ns.pgo or self.ns.quiet or self.ns.single
    389                     or self.tests or self.ns.args)):
    390             # Print basic platform information
    391             print("==", platform.python_implementation(), *sys.version.split())
    392             print("==  ", platform.platform(aliased=True),
    393                           "%s-endian" % sys.byteorder)
    394             print("==  ", "hash algorithm:", sys.hash_info.algorithm,
    395                   "64bit" if sys.maxsize > 2**32 else "32bit")
    396             print("==  cwd:", os.getcwd())
    397             print("==  encodings: locale=%s, FS=%s"
    398                   % (locale.getpreferredencoding(False),
    399                      sys.getfilesystemencoding()))
    400             print("Testing with flags:", sys.flags)
    401 
    402         if self.ns.randomize:
    403             print("Using random seed", self.ns.random_seed)
    404 
    405         if self.ns.forever:
    406             self.tests = self._test_forever(list(self.selected))
    407             self.test_count = ''
    408             self.test_count_width = 3
    409         else:
    410             self.tests = iter(self.selected)
    411             self.test_count = '/{}'.format(len(self.selected))
    412             self.test_count_width = len(self.test_count) - 1
    413 
    414         if self.ns.use_mp:
    415             from test.libregrtest.runtest_mp import run_tests_multiprocess
    416             run_tests_multiprocess(self)
    417         else:
    418             self.run_tests_sequential()
    419 
    420     def finalize(self):
    421         if self.next_single_filename:
    422             if self.next_single_test:
    423                 with open(self.next_single_filename, 'w') as fp:
    424                     fp.write(self.next_single_test + '\n')
    425             else:
    426                 os.unlink(self.next_single_filename)
    427 
    428         if self.tracer:
    429             r = self.tracer.results()
    430             r.write_results(show_missing=True, summary=True,
    431                             coverdir=self.ns.coverdir)
    432 
    433         print()
    434         duration = time.monotonic() - self.start_time
    435         print("Total duration: %s" % format_duration(duration))
    436 
    437         if self.bad:
    438             result = "FAILURE"
    439         elif self.interrupted:
    440             result = "INTERRUPTED"
    441         else:
    442             result = "SUCCESS"
    443         print("Tests result: %s" % result)
    444 
    445         if self.ns.runleaks:
    446             os.system("leaks %d" % os.getpid())
    447 
    448     def main(self, tests=None, **kwargs):
    449         global TEMPDIR
    450 
    451         if sysconfig.is_python_build():
    452             try:
    453                 os.mkdir(TEMPDIR)
    454             except FileExistsError:
    455                 pass
    456 
    457         # Define a writable temp dir that will be used as cwd while running
    458         # the tests. The name of the dir includes the pid to allow parallel
    459         # testing (see the -j option).
    460         test_cwd = 'test_python_{}'.format(os.getpid())
    461         test_cwd = os.path.join(TEMPDIR, test_cwd)
    462 
    463         # Run the tests in a context manager that temporarily changes the CWD to a
    464         # temporary and writable directory.  If it's not possible to create or
    465         # change the CWD, the original CWD will be used.  The original CWD is
    466         # available from support.SAVEDCWD.
    467         with support.temp_cwd(test_cwd, quiet=True):
    468             self._main(tests, kwargs)
    469 
    470     def _main(self, tests, kwargs):
    471         self.ns = self.parse_args(kwargs)
    472 
    473         if self.ns.slaveargs is not None:
    474             from test.libregrtest.runtest_mp import run_tests_slave
    475             run_tests_slave(self.ns.slaveargs)
    476 
    477         if self.ns.wait:
    478             input("Press any key to continue...")
    479 
    480         support.PGO = self.ns.pgo
    481 
    482         setup_tests(self.ns)
    483 
    484         self.find_tests(tests)
    485 
    486         if self.ns.list_tests:
    487             self.list_tests()
    488             sys.exit(0)
    489 
    490         self.run_tests()
    491         self.display_result()
    492 
    493         if self.ns.verbose2 and self.bad:
    494             self.rerun_failed_tests()
    495 
    496         self.finalize()
    497         sys.exit(len(self.bad) > 0 or self.interrupted)
    498 
    499 
    500 def removepy(names):
    501     if not names:
    502         return
    503     for idx, name in enumerate(names):
    504         basename, ext = os.path.splitext(name)
    505         if ext == '.py':
    506             names[idx] = basename
    507 
    508 
    509 def count(n, word):
    510     if n == 1:
    511         return "%d %s" % (n, word)
    512     else:
    513         return "%d %ss" % (n, word)
    514 
    515 
    516 def printlist(x, width=70, indent=4):
    517     """Print the elements of iterable x to stdout.
    518 
    519     Optional arg width (default 70) is the maximum line length.
    520     Optional arg indent (default 4) is the number of blanks with which to
    521     begin each line.
    522     """
    523 
    524     blanks = ' ' * indent
    525     # Print the sorted list: 'x' may be a '--random' list or a set()
    526     print(textwrap.fill(' '.join(str(elt) for elt in sorted(x)), width,
    527                         initial_indent=blanks, subsequent_indent=blanks))
    528 
    529 
    530 def main(tests=None, **kwargs):
    531     """Run the Python suite."""
    532     Regrtest().main(tests=tests, **kwargs)
    533