Home | History | Annotate | Download | only in libregrtest
      1 import faulthandler
      2 import json
      3 import os
      4 import queue
      5 import sys
      6 import threading
      7 import time
      8 import traceback
      9 import types
     10 from test import support
     11 
     12 from test.libregrtest.runtest import (
     13     runtest, INTERRUPTED, CHILD_ERROR, PROGRESS_MIN_TIME,
     14     format_test_result)
     15 from test.libregrtest.setup import setup_tests
     16 from test.libregrtest.utils import format_duration
     17 
     18 
# Display the running tests if nothing happened in the last N seconds.
# Used (together with PROGRESS_MIN_TIME) as the timeout when waiting for a
# result on the output queue.
PROGRESS_UPDATE = 30.0   # seconds

# If interrupted, display the wait progress every N seconds.
# Used as the join() timeout while waiting for the tests still running.
WAIT_PROGRESS = 2.0   # seconds
     24 
     25 
     26 def run_test_in_subprocess(testname, ns):
     27     """Run the given test in a subprocess with --worker-args.
     28 
     29     ns is the option Namespace parsed from command-line arguments. regrtest
     30     is invoked in a subprocess with the --worker-args argument; when the
     31     subprocess exits, its return code, stdout and stderr are returned as a
     32     3-tuple.
     33     """
     34     from subprocess import Popen, PIPE
     35 
     36     ns_dict = vars(ns)
     37     worker_args = (ns_dict, testname)
     38     worker_args = json.dumps(worker_args)
     39 
     40     cmd = [sys.executable, *support.args_from_interpreter_flags(),
     41            '-u',    # Unbuffered stdout and stderr
     42            '-m', 'test.regrtest',
     43            '--worker-args', worker_args]
     44     if ns.pgo:
     45         cmd += ['--pgo']
     46 
     47     # Running the child from the same working directory as regrtest's original
     48     # invocation ensures that TEMPDIR for the child is the same when
     49     # sysconfig.is_python_build() is true. See issue 15300.
     50     popen = Popen(cmd,
     51                   stdout=PIPE, stderr=PIPE,
     52                   universal_newlines=True,
     53                   close_fds=(os.name != 'nt'),
     54                   cwd=support.SAVEDCWD)
     55     with popen:
     56         stdout, stderr = popen.communicate()
     57         retcode = popen.wait()
     58     return retcode, stdout, stderr
     59 
     60 
     61 def run_tests_worker(worker_args):
     62     ns_dict, testname = json.loads(worker_args)
     63     ns = types.SimpleNamespace(**ns_dict)
     64 
     65     setup_tests(ns)
     66 
     67     try:
     68         result = runtest(ns, testname)
     69     except KeyboardInterrupt:
     70         result = INTERRUPTED, '', None
     71     except BaseException as e:
     72         traceback.print_exc()
     73         result = CHILD_ERROR, str(e)
     74 
     75     print()   # Force a newline (just in case)
     76     print(json.dumps(result), flush=True)
     77     sys.exit(0)
     78 
     79 
     80 # We do not use a generator so multiple threads can call next().
     81 class MultiprocessIterator:
     82 
     83     """A thread-safe iterator over tests for multiprocess mode."""
     84 
     85     def __init__(self, tests):
     86         self.interrupted = False
     87         self.lock = threading.Lock()
     88         self.tests = tests
     89 
     90     def __iter__(self):
     91         return self
     92 
     93     def __next__(self):
     94         with self.lock:
     95             if self.interrupted:
     96                 raise StopIteration('tests interrupted')
     97             return next(self.tests)
     98 
     99 
    100 class MultiprocessThread(threading.Thread):
    101     def __init__(self, pending, output, ns):
    102         super().__init__()
    103         self.pending = pending
    104         self.output = output
    105         self.ns = ns
    106         self.current_test = None
    107         self.start_time = None
    108 
    109     def _runtest(self):
    110         try:
    111             test = next(self.pending)
    112         except StopIteration:
    113             self.output.put((None, None, None, None))
    114             return True
    115 
    116         try:
    117             self.start_time = time.monotonic()
    118             self.current_test = test
    119 
    120             retcode, stdout, stderr = run_test_in_subprocess(test, self.ns)
    121         finally:
    122             self.current_test = None
    123 
    124         if retcode != 0:
    125             result = (CHILD_ERROR, "Exit code %s" % retcode, None)
    126             self.output.put((test, stdout.rstrip(), stderr.rstrip(),
    127                              result))
    128             return False
    129 
    130         stdout, _, result = stdout.strip().rpartition("\n")
    131         if not result:
    132             self.output.put((None, None, None, None))
    133             return True
    134 
    135         result = json.loads(result)
    136         assert len(result) == 3, f"Invalid result tuple: {result!r}"
    137         self.output.put((test, stdout.rstrip(), stderr.rstrip(),
    138                          result))
    139         return False
    140 
    141     def run(self):
    142         try:
    143             stop = False
    144             while not stop:
    145                 stop = self._runtest()
    146         except BaseException:
    147             self.output.put((None, None, None, None))
    148             raise
    149 
    150 
    151 def run_tests_multiprocess(regrtest):
    152     output = queue.Queue()
    153     pending = MultiprocessIterator(regrtest.tests)
    154     test_timeout = regrtest.ns.timeout
    155     use_timeout = (test_timeout is not None)
    156 
    157     workers = [MultiprocessThread(pending, output, regrtest.ns)
    158                for i in range(regrtest.ns.use_mp)]
    159     print("Run tests in parallel using %s child processes"
    160           % len(workers))
    161     for worker in workers:
    162         worker.start()
    163 
    164     def get_running(workers):
    165         running = []
    166         for worker in workers:
    167             current_test = worker.current_test
    168             if not current_test:
    169                 continue
    170             dt = time.monotonic() - worker.start_time
    171             if dt >= PROGRESS_MIN_TIME:
    172                 text = '%s (%s)' % (current_test, format_duration(dt))
    173                 running.append(text)
    174         return running
    175 
    176     finished = 0
    177     test_index = 1
    178     get_timeout = max(PROGRESS_UPDATE, PROGRESS_MIN_TIME)
    179     try:
    180         while finished < regrtest.ns.use_mp:
    181             if use_timeout:
    182                 faulthandler.dump_traceback_later(test_timeout, exit=True)
    183 
    184             try:
    185                 item = output.get(timeout=get_timeout)
    186             except queue.Empty:
    187                 running = get_running(workers)
    188                 if running and not regrtest.ns.pgo:
    189                     print('running: %s' % ', '.join(running), flush=True)
    190                 continue
    191 
    192             test, stdout, stderr, result = item
    193             if test is None:
    194                 finished += 1
    195                 continue
    196             regrtest.accumulate_result(test, result)
    197 
    198             # Display progress
    199             ok, test_time, xml_data = result
    200             text = format_test_result(test, ok)
    201             if (ok not in (CHILD_ERROR, INTERRUPTED)
    202                 and test_time >= PROGRESS_MIN_TIME
    203                 and not regrtest.ns.pgo):
    204                 text += ' (%s)' % format_duration(test_time)
    205             elif ok == CHILD_ERROR:
    206                 text = '%s (%s)' % (text, test_time)
    207             running = get_running(workers)
    208             if running and not regrtest.ns.pgo:
    209                 text += ' -- running: %s' % ', '.join(running)
    210             regrtest.display_progress(test_index, text)
    211 
    212             # Copy stdout and stderr from the child process
    213             if stdout:
    214                 print(stdout, flush=True)
    215             if stderr and not regrtest.ns.pgo:
    216                 print(stderr, file=sys.stderr, flush=True)
    217 
    218             if result[0] == INTERRUPTED:
    219                 raise KeyboardInterrupt
    220             test_index += 1
    221     except KeyboardInterrupt:
    222         regrtest.interrupted = True
    223         pending.interrupted = True
    224         print()
    225     finally:
    226         if use_timeout:
    227             faulthandler.cancel_dump_traceback_later()
    228 
    229     # If tests are interrupted, wait until tests complete
    230     wait_start = time.monotonic()
    231     while True:
    232         running = [worker.current_test for worker in workers]
    233         running = list(filter(bool, running))
    234         if not running:
    235             break
    236 
    237         dt = time.monotonic() - wait_start
    238         line = "Waiting for %s (%s tests)" % (', '.join(running), len(running))
    239         if dt >= WAIT_PROGRESS:
    240             line = "%s since %.0f sec" % (line, dt)
    241         print(line, flush=True)
    242         for worker in workers:
    243             worker.join(WAIT_PROGRESS)
    244