1 import os 2 import threading 3 import time 4 import traceback 5 try: 6 import Queue as queue 7 except ImportError: 8 import queue 9 10 try: 11 import win32api 12 except ImportError: 13 win32api = None 14 15 try: 16 import multiprocessing 17 except ImportError: 18 multiprocessing = None 19 20 import lit.Test 21 22 ### 23 # Test Execution Implementation 24 25 class LockedValue(object): 26 def __init__(self, value): 27 self.lock = threading.Lock() 28 self._value = value 29 30 def _get_value(self): 31 self.lock.acquire() 32 try: 33 return self._value 34 finally: 35 self.lock.release() 36 37 def _set_value(self, value): 38 self.lock.acquire() 39 try: 40 self._value = value 41 finally: 42 self.lock.release() 43 44 value = property(_get_value, _set_value) 45 46 class TestProvider(object): 47 def __init__(self, queue_impl, canceled_flag): 48 self.canceled_flag = canceled_flag 49 50 # Create a shared queue to provide the test indices. 51 self.queue = queue_impl() 52 53 def queue_tests(self, tests, num_jobs): 54 for i in range(len(tests)): 55 self.queue.put(i) 56 for i in range(num_jobs): 57 self.queue.put(None) 58 59 def cancel(self): 60 self.canceled_flag.value = 1 61 62 def get(self): 63 # Check if we are canceled. 64 if self.canceled_flag.value: 65 return None 66 67 # Otherwise take the next test. 68 return self.queue.get() 69 70 class Tester(object): 71 def __init__(self, run_instance, provider, consumer): 72 self.run_instance = run_instance 73 self.provider = provider 74 self.consumer = consumer 75 76 def run(self): 77 while True: 78 item = self.provider.get() 79 if item is None: 80 break 81 self.run_test(item) 82 self.consumer.task_finished() 83 84 def run_test(self, test_index): 85 test = self.run_instance.tests[test_index] 86 try: 87 self.run_instance.execute_test(test) 88 except KeyboardInterrupt: 89 # This is a sad hack. Unfortunately subprocess goes 90 # bonkers with ctrl-c and we start forking merrily. 91 print('\nCtrl-C detected, goodbye.') 92 os.kill(0,9) 93 self.consumer.update(test_index, test) 94 95 class ThreadResultsConsumer(object): 96 def __init__(self, display): 97 self.display = display 98 self.lock = threading.Lock() 99 100 def update(self, test_index, test): 101 self.lock.acquire() 102 try: 103 self.display.update(test) 104 finally: 105 self.lock.release() 106 107 def task_finished(self): 108 pass 109 110 def handle_results(self): 111 pass 112 113 class MultiprocessResultsConsumer(object): 114 def __init__(self, run, display, num_jobs): 115 self.run = run 116 self.display = display 117 self.num_jobs = num_jobs 118 self.queue = multiprocessing.Queue() 119 120 def update(self, test_index, test): 121 # This method is called in the child processes, and communicates the 122 # results to the actual display implementation via an output queue. 123 self.queue.put((test_index, test.result)) 124 125 def task_finished(self): 126 # This method is called in the child processes, and communicates that 127 # individual tasks are complete. 128 self.queue.put(None) 129 130 def handle_results(self): 131 # This method is called in the parent, and consumes the results from the 132 # output queue and dispatches to the actual display. The method will 133 # complete after each of num_jobs tasks has signalled completion. 134 completed = 0 135 while completed != self.num_jobs: 136 # Wait for a result item. 137 item = self.queue.get() 138 if item is None: 139 completed += 1 140 continue 141 142 # Update the test result in the parent process. 143 index,result = item 144 test = self.run.tests[index] 145 test.result = result 146 147 self.display.update(test) 148 149 def run_one_tester(run, provider, display): 150 tester = Tester(run, provider, display) 151 tester.run() 152 153 ### 154 155 class Run(object): 156 """ 157 This class represents a concrete, configured testing run. 158 """ 159 160 def __init__(self, lit_config, tests): 161 self.lit_config = lit_config 162 self.tests = tests 163 164 def execute_test(self, test): 165 result = None 166 start_time = time.time() 167 try: 168 result = test.config.test_format.execute(test, self.lit_config) 169 170 # Support deprecated result from execute() which returned the result 171 # code and additional output as a tuple. 172 if isinstance(result, tuple): 173 code, output = result 174 result = lit.Test.Result(code, output) 175 elif not isinstance(result, lit.Test.Result): 176 raise ValueError("unexpected result from test execution") 177 except KeyboardInterrupt: 178 raise 179 except: 180 if self.lit_config.debug: 181 raise 182 output = 'Exception during script execution:\n' 183 output += traceback.format_exc() 184 output += '\n' 185 result = lit.Test.Result(lit.Test.UNRESOLVED, output) 186 result.elapsed = time.time() - start_time 187 188 test.setResult(result) 189 190 def execute_tests(self, display, jobs, max_time=None, 191 use_processes=False): 192 """ 193 execute_tests(display, jobs, [max_time]) 194 195 Execute each of the tests in the run, using up to jobs number of 196 parallel tasks, and inform the display of each individual result. The 197 provided tests should be a subset of the tests available in this run 198 object. 199 200 If max_time is non-None, it should be a time in seconds after which to 201 stop executing tests. 202 203 The display object will have its update method called with each test as 204 it is completed. The calls are guaranteed to be locked with respect to 205 one another, but are *not* guaranteed to be called on the same thread as 206 this method was invoked on. 207 208 Upon completion, each test in the run will have its result 209 computed. Tests which were not actually executed (for any reason) will 210 be given an UNRESOLVED result. 211 """ 212 213 # Choose the appropriate parallel execution implementation. 214 consumer = None 215 if jobs != 1 and use_processes and multiprocessing: 216 try: 217 task_impl = multiprocessing.Process 218 queue_impl = multiprocessing.Queue 219 canceled_flag = multiprocessing.Value('i', 0) 220 consumer = MultiprocessResultsConsumer(self, display, jobs) 221 except: 222 # multiprocessing fails to initialize with certain OpenBSD and 223 # FreeBSD Python versions: http://bugs.python.org/issue3770 224 # Unfortunately the error raised also varies by platform. 225 self.lit_config.note('failed to initialize multiprocessing') 226 consumer = None 227 if not consumer: 228 task_impl = threading.Thread 229 queue_impl = queue.Queue 230 canceled_flag = LockedValue(0) 231 consumer = ThreadResultsConsumer(display) 232 233 # Create the test provider. 234 provider = TestProvider(queue_impl, canceled_flag) 235 236 # Queue the tests outside the main thread because we can't guarantee 237 # that we can put() all the tests without blocking: 238 # https://docs.python.org/2/library/multiprocessing.html 239 # e.g: On Mac OS X, we will hang if we put 2^15 elements in the queue 240 # without taking any out. 241 queuer = task_impl(target=provider.queue_tests, args=(self.tests, jobs)) 242 queuer.start() 243 244 # Install a console-control signal handler on Windows. 245 if win32api is not None: 246 def console_ctrl_handler(type): 247 provider.cancel() 248 return True 249 win32api.SetConsoleCtrlHandler(console_ctrl_handler, True) 250 251 # Install a timeout handler, if requested. 252 if max_time is not None: 253 def timeout_handler(): 254 provider.cancel() 255 timeout_timer = threading.Timer(max_time, timeout_handler) 256 timeout_timer.start() 257 258 # If not using multiple tasks, just run the tests directly. 259 if jobs == 1: 260 run_one_tester(self, provider, consumer) 261 else: 262 # Otherwise, execute the tests in parallel 263 self._execute_tests_in_parallel(task_impl, provider, consumer, jobs) 264 265 queuer.join() 266 267 # Cancel the timeout handler. 268 if max_time is not None: 269 timeout_timer.cancel() 270 271 # Update results for any tests which weren't run. 272 for test in self.tests: 273 if test.result is None: 274 test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0)) 275 276 def _execute_tests_in_parallel(self, task_impl, provider, consumer, jobs): 277 # Start all of the tasks. 278 tasks = [task_impl(target=run_one_tester, 279 args=(self, provider, consumer)) 280 for i in range(jobs)] 281 for t in tasks: 282 t.start() 283 284 # Allow the consumer to handle results, if necessary. 285 consumer.handle_results() 286 287 # Wait for all the tasks to complete. 288 for t in tasks: 289 t.join() 290