Home | History | Annotate | Download | only in lit
      1 import os
      2 import sys
      3 import threading
      4 import time
      5 import traceback
      6 try:
      7     import Queue as queue
      8 except ImportError:
      9     import queue
     10 
     11 try:
     12     import win32api
     13 except ImportError:
     14     win32api = None
     15 
     16 import multiprocessing
     17 import lit.Test
     18 
     19 def abort_now():
     20     """Abort the current process without doing any exception teardown"""
     21     sys.stdout.flush()
     22     if win32api:
     23         win32api.TerminateProcess(win32api.GetCurrentProcess(), 3)
     24     else:
     25         os.kill(0, 9)
     26 
     27 class _Display(object):
     28     def __init__(self, display, provider, maxFailures):
     29         self.display = display
     30         self.provider = provider
     31         self.maxFailures = maxFailures or object()
     32         self.failedCount = 0
     33     def update(self, test):
     34         self.display.update(test)
     35         self.failedCount += (test.result.code == lit.Test.FAIL)
     36         if self.failedCount == self.maxFailures:
     37             self.provider.cancel()
     38 
     39 class Run(object):
     40     """
     41     This class represents a concrete, configured testing run.
     42     """
     43 
     44     def __init__(self, lit_config, tests):
     45         self.lit_config = lit_config
     46         self.tests = tests
     47         # Set up semaphores to limit parallelism of certain classes of tests.
     48         # For example, some ASan tests require lots of virtual memory and run
     49         # faster with less parallelism on OS X.
     50         self.parallelism_semaphores = \
     51                 {k: multiprocessing.Semaphore(v) for k, v in
     52                  self.lit_config.parallelism_groups.items()}
     53 
     54     def execute_test(self, test):
     55         return _execute_test_impl(test, self.lit_config,
     56                                   self.parallelism_semaphores)
     57 
     58     def execute_tests_in_pool(self, jobs, max_time):
     59         # We need to issue many wait calls, so compute the final deadline and
     60         # subtract time.time() from that as we go along.
     61         deadline = None
     62         if max_time:
     63             deadline = time.time() + max_time
     64 
     65         # Start a process pool. Copy over the data shared between all test runs.
     66         # FIXME: Find a way to capture the worker process stderr. If the user
     67         # interrupts the workers before we make it into our task callback, they
     68         # will each raise a KeyboardInterrupt exception and print to stderr at
     69         # the same time.
     70         pool = multiprocessing.Pool(jobs, worker_initializer,
     71                                     (self.lit_config,
     72                                      self.parallelism_semaphores))
     73 
     74         # Install a console-control signal handler on Windows.
     75         if win32api is not None:
     76             def console_ctrl_handler(type):
     77                 print('\nCtrl-C detected, terminating.')
     78                 pool.terminate()
     79                 pool.join()
     80                 abort_now()
     81                 return True
     82             win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)
     83 
     84         try:
     85             async_results = [pool.apply_async(worker_run_one_test,
     86                                               args=(test_index, test),
     87                                               callback=self.consume_test_result)
     88                              for test_index, test in enumerate(self.tests)]
     89             pool.close()
     90 
     91             # Wait for all results to come in. The callback that runs in the
     92             # parent process will update the display.
     93             for a in async_results:
     94                 if deadline:
     95                     a.wait(deadline - time.time())
     96                 else:
     97                     # Python condition variables cannot be interrupted unless
     98                     # they have a timeout. This can make lit unresponsive to
     99                     # KeyboardInterrupt, so do a busy wait with a timeout.
    100                     while not a.ready():
    101                         a.wait(1)
    102                 if not a.successful():
    103                     a.get() # Exceptions raised here come from the worker.
    104                 if self.hit_max_failures:
    105                     break
    106         except:
    107             # Stop the workers and wait for any straggling results to come in
    108             # if we exited without waiting on every async result.
    109             pool.terminate()
    110             raise
    111         finally:
    112             pool.join()
    113 
    114     def execute_tests(self, display, jobs, max_time=None):
    115         """
    116         execute_tests(display, jobs, [max_time])
    117 
    118         Execute each of the tests in the run, using up to jobs number of
    119         parallel tasks, and inform the display of each individual result. The
    120         provided tests should be a subset of the tests available in this run
    121         object.
    122 
    123         If max_time is non-None, it should be a time in seconds after which to
    124         stop executing tests.
    125 
    126         The display object will have its update method called with each test as
    127         it is completed. The calls are guaranteed to be locked with respect to
    128         one another, but are *not* guaranteed to be called on the same thread as
    129         this method was invoked on.
    130 
    131         Upon completion, each test in the run will have its result
    132         computed. Tests which were not actually executed (for any reason) will
    133         be given an UNRESOLVED result.
    134         """
    135         # Don't do anything if we aren't going to run any tests.
    136         if not self.tests or jobs == 0:
    137             return
    138 
    139         # Save the display object on the runner so that we can update it from
    140         # our task completion callback.
    141         self.display = display
    142 
    143         self.failure_count = 0
    144         self.hit_max_failures = False
    145         if self.lit_config.singleProcess:
    146             global child_lit_config
    147             child_lit_config = self.lit_config
    148             for test_index, test in enumerate(self.tests):
    149                 result = worker_run_one_test(test_index, test)
    150                 self.consume_test_result(result)
    151         else:
    152             self.execute_tests_in_pool(jobs, max_time)
    153 
    154         # Mark any tests that weren't run as UNRESOLVED.
    155         for test in self.tests:
    156             if test.result is None:
    157                 test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))
    158 
    159     def consume_test_result(self, pool_result):
    160         """Test completion callback for worker_run_one_test
    161 
    162         Updates the test result status in the parent process. Each task in the
    163         pool returns the test index and the result, and we use the index to look
    164         up the original test object. Also updates the progress bar as tasks
    165         complete.
    166         """
    167         # Don't add any more test results after we've hit the maximum failure
    168         # count.  Otherwise we're racing with the main thread, which is going
    169         # to terminate the process pool soon.
    170         if self.hit_max_failures:
    171             return
    172 
    173         (test_index, test_with_result) = pool_result
    174         # Update the parent process copy of the test. This includes the result,
    175         # XFAILS, REQUIRES, and UNSUPPORTED statuses.
    176         assert self.tests[test_index].file_path == test_with_result.file_path, \
    177                 "parent and child disagree on test path"
    178         self.tests[test_index] = test_with_result
    179         self.display.update(test_with_result)
    180 
    181         # If we've finished all the tests or too many tests have failed, notify
    182         # the main thread that we've stopped testing.
    183         self.failure_count += (test_with_result.result.code == lit.Test.FAIL)
    184         if self.lit_config.maxFailures and \
    185                 self.failure_count == self.lit_config.maxFailures:
    186             self.hit_max_failures = True
    187 
    188 def _execute_test_impl(test, lit_config, parallelism_semaphores):
    189     """Execute one test"""
    190     pg = test.config.parallelism_group
    191     if callable(pg):
    192         pg = pg(test)
    193 
    194     result = None
    195     semaphore = None
    196     try:
    197         if pg:
    198             semaphore = parallelism_semaphores[pg]
    199         if semaphore:
    200             semaphore.acquire()
    201         start_time = time.time()
    202         result = test.config.test_format.execute(test, lit_config)
    203         # Support deprecated result from execute() which returned the result
    204         # code and additional output as a tuple.
    205         if isinstance(result, tuple):
    206             code, output = result
    207             result = lit.Test.Result(code, output)
    208         elif not isinstance(result, lit.Test.Result):
    209             raise ValueError("unexpected result from test execution")
    210         result.elapsed = time.time() - start_time
    211     except KeyboardInterrupt:
    212         raise
    213     except:
    214         if lit_config.debug:
    215             raise
    216         output = 'Exception during script execution:\n'
    217         output += traceback.format_exc()
    218         output += '\n'
    219         result = lit.Test.Result(lit.Test.UNRESOLVED, output)
    220     finally:
    221         if semaphore:
    222             semaphore.release()
    223 
    224     test.setResult(result)
    225 
    226 child_lit_config = None
    227 child_parallelism_semaphores = None
    228 
    229 def worker_initializer(lit_config, parallelism_semaphores):
    230     """Copy expensive repeated data into worker processes"""
    231     global child_lit_config
    232     child_lit_config = lit_config
    233     global child_parallelism_semaphores
    234     child_parallelism_semaphores = parallelism_semaphores
    235 
    236 def worker_run_one_test(test_index, test):
    237     """Run one test in a multiprocessing.Pool
    238 
    239     Side effects in this function and functions it calls are not visible in the
    240     main lit process.
    241 
    242     Arguments and results of this function are pickled, so they should be cheap
    243     to copy. For efficiency, we copy all data needed to execute all tests into
    244     each worker and store it in the child_* global variables. This reduces the
    245     cost of each task.
    246 
    247     Returns an index and a Result, which the parent process uses to update
    248     the display.
    249     """
    250     try:
    251         _execute_test_impl(test, child_lit_config, child_parallelism_semaphores)
    252         return (test_index, test)
    253     except KeyboardInterrupt as e:
    254         # If a worker process gets an interrupt, abort it immediately.
    255         abort_now()
    256     except:
    257         traceback.print_exc()
    258