1 import faulthandler 2 import json 3 import os 4 import queue 5 import sys 6 import time 7 import traceback 8 import types 9 from test import support 10 try: 11 import threading 12 except ImportError: 13 print("Multiprocess option requires thread support") 14 sys.exit(2) 15 16 from test.libregrtest.runtest import ( 17 runtest, INTERRUPTED, CHILD_ERROR, PROGRESS_MIN_TIME, 18 format_test_result) 19 from test.libregrtest.setup import setup_tests 20 21 22 # Display the running tests if nothing happened last N seconds 23 PROGRESS_UPDATE = 30.0 # seconds 24 25 # If interrupted, display the wait progress every N seconds 26 WAIT_PROGRESS = 2.0 # seconds 27 28 29 def run_test_in_subprocess(testname, ns): 30 """Run the given test in a subprocess with --slaveargs. 31 32 ns is the option Namespace parsed from command-line arguments. regrtest 33 is invoked in a subprocess with the --slaveargs argument; when the 34 subprocess exits, its return code, stdout and stderr are returned as a 35 3-tuple. 36 """ 37 from subprocess import Popen, PIPE 38 39 ns_dict = vars(ns) 40 slaveargs = (ns_dict, testname) 41 slaveargs = json.dumps(slaveargs) 42 43 cmd = [sys.executable, *support.args_from_interpreter_flags(), 44 '-X', 'faulthandler', 45 '-m', 'test.regrtest', 46 '--slaveargs', slaveargs] 47 if ns.pgo: 48 cmd += ['--pgo'] 49 50 # Running the child from the same working directory as regrtest's original 51 # invocation ensures that TEMPDIR for the child is the same when 52 # sysconfig.is_python_build() is true. See issue 15300. 53 popen = Popen(cmd, 54 stdout=PIPE, stderr=PIPE, 55 universal_newlines=True, 56 close_fds=(os.name != 'nt'), 57 cwd=support.SAVEDCWD) 58 with popen: 59 stdout, stderr = popen.communicate() 60 retcode = popen.wait() 61 return retcode, stdout, stderr 62 63 64 def run_tests_slave(slaveargs): 65 ns_dict, testname = json.loads(slaveargs) 66 ns = types.SimpleNamespace(**ns_dict) 67 68 setup_tests(ns) 69 70 try: 71 result = runtest(ns, testname) 72 except KeyboardInterrupt: 73 result = INTERRUPTED, '' 74 except BaseException as e: 75 traceback.print_exc() 76 result = CHILD_ERROR, str(e) 77 78 print() # Force a newline (just in case) 79 print(json.dumps(result), flush=True) 80 sys.exit(0) 81 82 83 # We do not use a generator so multiple threads can call next(). 84 class MultiprocessIterator: 85 86 """A thread-safe iterator over tests for multiprocess mode.""" 87 88 def __init__(self, tests): 89 self.interrupted = False 90 self.lock = threading.Lock() 91 self.tests = tests 92 93 def __iter__(self): 94 return self 95 96 def __next__(self): 97 with self.lock: 98 if self.interrupted: 99 raise StopIteration('tests interrupted') 100 return next(self.tests) 101 102 103 class MultiprocessThread(threading.Thread): 104 def __init__(self, pending, output, ns): 105 super().__init__() 106 self.pending = pending 107 self.output = output 108 self.ns = ns 109 self.current_test = None 110 self.start_time = None 111 112 def _runtest(self): 113 try: 114 test = next(self.pending) 115 except StopIteration: 116 self.output.put((None, None, None, None)) 117 return True 118 119 try: 120 self.start_time = time.monotonic() 121 self.current_test = test 122 123 retcode, stdout, stderr = run_test_in_subprocess(test, self.ns) 124 finally: 125 self.current_test = None 126 127 stdout, _, result = stdout.strip().rpartition("\n") 128 if retcode != 0: 129 result = (CHILD_ERROR, "Exit code %s" % retcode) 130 self.output.put((test, stdout.rstrip(), stderr.rstrip(), 131 result)) 132 return True 133 134 if not result: 135 self.output.put((None, None, None, None)) 136 return True 137 138 result = json.loads(result) 139 self.output.put((test, stdout.rstrip(), stderr.rstrip(), 140 result)) 141 return False 142 143 def run(self): 144 try: 145 stop = False 146 while not stop: 147 stop = self._runtest() 148 except BaseException: 149 self.output.put((None, None, None, None)) 150 raise 151 152 153 def run_tests_multiprocess(regrtest): 154 output = queue.Queue() 155 pending = MultiprocessIterator(regrtest.tests) 156 test_timeout = regrtest.ns.timeout 157 use_timeout = (test_timeout is not None) 158 159 workers = [MultiprocessThread(pending, output, regrtest.ns) 160 for i in range(regrtest.ns.use_mp)] 161 print("Run tests in parallel using %s child processes" 162 % len(workers)) 163 for worker in workers: 164 worker.start() 165 166 def get_running(workers): 167 running = [] 168 for worker in workers: 169 current_test = worker.current_test 170 if not current_test: 171 continue 172 dt = time.monotonic() - worker.start_time 173 if dt >= PROGRESS_MIN_TIME: 174 running.append('%s (%.0f sec)' % (current_test, dt)) 175 return running 176 177 finished = 0 178 test_index = 1 179 get_timeout = max(PROGRESS_UPDATE, PROGRESS_MIN_TIME) 180 try: 181 while finished < regrtest.ns.use_mp: 182 if use_timeout: 183 faulthandler.dump_traceback_later(test_timeout, exit=True) 184 185 try: 186 item = output.get(timeout=get_timeout) 187 except queue.Empty: 188 running = get_running(workers) 189 if running and not regrtest.ns.pgo: 190 print('running: %s' % ', '.join(running)) 191 continue 192 193 test, stdout, stderr, result = item 194 if test is None: 195 finished += 1 196 continue 197 regrtest.accumulate_result(test, result) 198 199 # Display progress 200 ok, test_time = result 201 text = format_test_result(test, ok) 202 if (ok not in (CHILD_ERROR, INTERRUPTED) 203 and test_time >= PROGRESS_MIN_TIME 204 and not regrtest.ns.pgo): 205 text += ' (%.0f sec)' % test_time 206 running = get_running(workers) 207 if running and not regrtest.ns.pgo: 208 text += ' -- running: %s' % ', '.join(running) 209 regrtest.display_progress(test_index, text) 210 211 # Copy stdout and stderr from the child process 212 if stdout: 213 print(stdout, flush=True) 214 if stderr and not regrtest.ns.pgo: 215 print(stderr, file=sys.stderr, flush=True) 216 217 if result[0] == INTERRUPTED: 218 raise KeyboardInterrupt 219 if result[0] == CHILD_ERROR: 220 msg = "Child error on {}: {}".format(test, result[1]) 221 raise Exception(msg) 222 test_index += 1 223 except KeyboardInterrupt: 224 regrtest.interrupted = True 225 pending.interrupted = True 226 print() 227 finally: 228 if use_timeout: 229 faulthandler.cancel_dump_traceback_later() 230 231 # If tests are interrupted, wait until tests complete 232 wait_start = time.monotonic() 233 while True: 234 running = [worker.current_test for worker in workers] 235 running = list(filter(bool, running)) 236 if not running: 237 break 238 239 dt = time.monotonic() - wait_start 240 line = "Waiting for %s (%s tests)" % (', '.join(running), len(running)) 241 if dt >= WAIT_PROGRESS: 242 line = "%s since %.0f sec" % (line, dt) 243 print(line) 244 for worker in workers: 245 worker.join(WAIT_PROGRESS) 246