Home | History | Annotate | Download | only in libregrtest
      1 import faulthandler
      2 import json
      3 import os
      4 import queue
      5 import sys
      6 import time
      7 import traceback
      8 import types
      9 from test import support
     10 try:
     11     import threading
     12 except ImportError:
     13     print("Multiprocess option requires thread support")
     14     sys.exit(2)
     15 
     16 from test.libregrtest.runtest import (
     17     runtest, INTERRUPTED, CHILD_ERROR, PROGRESS_MIN_TIME,
     18     format_test_result)
     19 from test.libregrtest.setup import setup_tests
     20 
     21 
     22 # Display the running tests if nothing happened last N seconds
     23 PROGRESS_UPDATE = 30.0   # seconds
     24 
     25 # If interrupted, display the wait progress every N seconds
     26 WAIT_PROGRESS = 2.0   # seconds
     27 
     28 
     29 def run_test_in_subprocess(testname, ns):
     30     """Run the given test in a subprocess with --slaveargs.
     31 
     32     ns is the option Namespace parsed from command-line arguments. regrtest
     33     is invoked in a subprocess with the --slaveargs argument; when the
     34     subprocess exits, its return code, stdout and stderr are returned as a
     35     3-tuple.
     36     """
     37     from subprocess import Popen, PIPE
     38 
     39     ns_dict = vars(ns)
     40     slaveargs = (ns_dict, testname)
     41     slaveargs = json.dumps(slaveargs)
     42 
     43     cmd = [sys.executable, *support.args_from_interpreter_flags(),
     44            '-X', 'faulthandler',
     45            '-m', 'test.regrtest',
     46            '--slaveargs', slaveargs]
     47     if ns.pgo:
     48         cmd += ['--pgo']
     49 
     50     # Running the child from the same working directory as regrtest's original
     51     # invocation ensures that TEMPDIR for the child is the same when
     52     # sysconfig.is_python_build() is true. See issue 15300.
     53     popen = Popen(cmd,
     54                   stdout=PIPE, stderr=PIPE,
     55                   universal_newlines=True,
     56                   close_fds=(os.name != 'nt'),
     57                   cwd=support.SAVEDCWD)
     58     with popen:
     59         stdout, stderr = popen.communicate()
     60         retcode = popen.wait()
     61     return retcode, stdout, stderr
     62 
     63 
     64 def run_tests_slave(slaveargs):
     65     ns_dict, testname = json.loads(slaveargs)
     66     ns = types.SimpleNamespace(**ns_dict)
     67 
     68     setup_tests(ns)
     69 
     70     try:
     71         result = runtest(ns, testname)
     72     except KeyboardInterrupt:
     73         result = INTERRUPTED, ''
     74     except BaseException as e:
     75         traceback.print_exc()
     76         result = CHILD_ERROR, str(e)
     77 
     78     print()   # Force a newline (just in case)
     79     print(json.dumps(result), flush=True)
     80     sys.exit(0)
     81 
     82 
     83 # We do not use a generator so multiple threads can call next().
     84 class MultiprocessIterator:
     85 
     86     """A thread-safe iterator over tests for multiprocess mode."""
     87 
     88     def __init__(self, tests):
     89         self.interrupted = False
     90         self.lock = threading.Lock()
     91         self.tests = tests
     92 
     93     def __iter__(self):
     94         return self
     95 
     96     def __next__(self):
     97         with self.lock:
     98             if self.interrupted:
     99                 raise StopIteration('tests interrupted')
    100             return next(self.tests)
    101 
    102 
    103 class MultiprocessThread(threading.Thread):
    104     def __init__(self, pending, output, ns):
    105         super().__init__()
    106         self.pending = pending
    107         self.output = output
    108         self.ns = ns
    109         self.current_test = None
    110         self.start_time = None
    111 
    112     def _runtest(self):
    113         try:
    114             test = next(self.pending)
    115         except StopIteration:
    116             self.output.put((None, None, None, None))
    117             return True
    118 
    119         try:
    120             self.start_time = time.monotonic()
    121             self.current_test = test
    122 
    123             retcode, stdout, stderr = run_test_in_subprocess(test, self.ns)
    124         finally:
    125             self.current_test = None
    126 
    127         stdout, _, result = stdout.strip().rpartition("\n")
    128         if retcode != 0:
    129             result = (CHILD_ERROR, "Exit code %s" % retcode)
    130             self.output.put((test, stdout.rstrip(), stderr.rstrip(),
    131                              result))
    132             return True
    133 
    134         if not result:
    135             self.output.put((None, None, None, None))
    136             return True
    137 
    138         result = json.loads(result)
    139         self.output.put((test, stdout.rstrip(), stderr.rstrip(),
    140                          result))
    141         return False
    142 
    143     def run(self):
    144         try:
    145             stop = False
    146             while not stop:
    147                 stop = self._runtest()
    148         except BaseException:
    149             self.output.put((None, None, None, None))
    150             raise
    151 
    152 
    153 def run_tests_multiprocess(regrtest):
    154     output = queue.Queue()
    155     pending = MultiprocessIterator(regrtest.tests)
    156     test_timeout = regrtest.ns.timeout
    157     use_timeout = (test_timeout is not None)
    158 
    159     workers = [MultiprocessThread(pending, output, regrtest.ns)
    160                for i in range(regrtest.ns.use_mp)]
    161     print("Run tests in parallel using %s child processes"
    162           % len(workers))
    163     for worker in workers:
    164         worker.start()
    165 
    166     def get_running(workers):
    167         running = []
    168         for worker in workers:
    169             current_test = worker.current_test
    170             if not current_test:
    171                 continue
    172             dt = time.monotonic() - worker.start_time
    173             if dt >= PROGRESS_MIN_TIME:
    174                 running.append('%s (%.0f sec)' % (current_test, dt))
    175         return running
    176 
    177     finished = 0
    178     test_index = 1
    179     get_timeout = max(PROGRESS_UPDATE, PROGRESS_MIN_TIME)
    180     try:
    181         while finished < regrtest.ns.use_mp:
    182             if use_timeout:
    183                 faulthandler.dump_traceback_later(test_timeout, exit=True)
    184 
    185             try:
    186                 item = output.get(timeout=get_timeout)
    187             except queue.Empty:
    188                 running = get_running(workers)
    189                 if running and not regrtest.ns.pgo:
    190                     print('running: %s' % ', '.join(running))
    191                 continue
    192 
    193             test, stdout, stderr, result = item
    194             if test is None:
    195                 finished += 1
    196                 continue
    197             regrtest.accumulate_result(test, result)
    198 
    199             # Display progress
    200             ok, test_time = result
    201             text = format_test_result(test, ok)
    202             if (ok not in (CHILD_ERROR, INTERRUPTED)
    203                 and test_time >= PROGRESS_MIN_TIME
    204                 and not regrtest.ns.pgo):
    205                 text += ' (%.0f sec)' % test_time
    206             running = get_running(workers)
    207             if running and not regrtest.ns.pgo:
    208                 text += ' -- running: %s' % ', '.join(running)
    209             regrtest.display_progress(test_index, text)
    210 
    211             # Copy stdout and stderr from the child process
    212             if stdout:
    213                 print(stdout, flush=True)
    214             if stderr and not regrtest.ns.pgo:
    215                 print(stderr, file=sys.stderr, flush=True)
    216 
    217             if result[0] == INTERRUPTED:
    218                 raise KeyboardInterrupt
    219             if result[0] == CHILD_ERROR:
    220                 msg = "Child error on {}: {}".format(test, result[1])
    221                 raise Exception(msg)
    222             test_index += 1
    223     except KeyboardInterrupt:
    224         regrtest.interrupted = True
    225         pending.interrupted = True
    226         print()
    227     finally:
    228         if use_timeout:
    229             faulthandler.cancel_dump_traceback_later()
    230 
    231     # If tests are interrupted, wait until tests complete
    232     wait_start = time.monotonic()
    233     while True:
    234         running = [worker.current_test for worker in workers]
    235         running = list(filter(bool, running))
    236         if not running:
    237             break
    238 
    239         dt = time.monotonic() - wait_start
    240         line = "Waiting for %s (%s tests)" % (', '.join(running), len(running))
    241         if dt >= WAIT_PROGRESS:
    242             line = "%s since %.0f sec" % (line, dt)
    243         print(line)
    244         for worker in workers:
    245             worker.join(WAIT_PROGRESS)
    246