Home | History | Annotate | Download | only in lit
      1 import os
      2 import threading
      3 import time
      4 import traceback
      5 try:
      6     import Queue as queue
      7 except ImportError:
      8     import queue
      9 
     10 try:
     11     import win32api
     12 except ImportError:
     13     win32api = None
     14 
     15 try:
     16     import multiprocessing
     17 except ImportError:
     18     multiprocessing = None
     19 
     20 import lit.Test
     21 
     22 ###
     23 # Test Execution Implementation
     24 
     25 class LockedValue(object):
     26     def __init__(self, value):
     27         self.lock = threading.Lock()
     28         self._value = value
     29 
     30     def _get_value(self):
     31         self.lock.acquire()
     32         try:
     33             return self._value
     34         finally:
     35             self.lock.release()
     36 
     37     def _set_value(self, value):
     38         self.lock.acquire()
     39         try:
     40             self._value = value
     41         finally:
     42             self.lock.release()
     43 
     44     value = property(_get_value, _set_value)
     45 
     46 class TestProvider(object):
     47     def __init__(self, queue_impl, canceled_flag):
     48         self.canceled_flag = canceled_flag
     49 
     50         # Create a shared queue to provide the test indices.
     51         self.queue = queue_impl()
     52 
     53     def queue_tests(self, tests, num_jobs):
     54         for i in range(len(tests)):
     55             self.queue.put(i)
     56         for i in range(num_jobs):
     57             self.queue.put(None)
     58 
     59     def cancel(self):
     60         self.canceled_flag.value = 1
     61 
     62     def get(self):
     63         # Check if we are canceled.
     64         if self.canceled_flag.value:
     65           return None
     66 
     67         # Otherwise take the next test.
     68         return self.queue.get()
     69 
     70 class Tester(object):
     71     def __init__(self, run_instance, provider, consumer):
     72         self.run_instance = run_instance
     73         self.provider = provider
     74         self.consumer = consumer
     75 
     76     def run(self):
     77         while True:
     78             item = self.provider.get()
     79             if item is None:
     80                 break
     81             self.run_test(item)
     82         self.consumer.task_finished()
     83 
     84     def run_test(self, test_index):
     85         test = self.run_instance.tests[test_index]
     86         try:
     87             self.run_instance.execute_test(test)
     88         except KeyboardInterrupt:
     89             # This is a sad hack. Unfortunately subprocess goes
     90             # bonkers with ctrl-c and we start forking merrily.
     91             print('\nCtrl-C detected, goodbye.')
     92             os.kill(0,9)
     93         self.consumer.update(test_index, test)
     94 
     95 class ThreadResultsConsumer(object):
     96     def __init__(self, display):
     97         self.display = display
     98         self.lock = threading.Lock()
     99 
    100     def update(self, test_index, test):
    101         self.lock.acquire()
    102         try:
    103             self.display.update(test)
    104         finally:
    105             self.lock.release()
    106 
    107     def task_finished(self):
    108         pass
    109 
    110     def handle_results(self):
    111         pass
    112 
    113 class MultiprocessResultsConsumer(object):
    114     def __init__(self, run, display, num_jobs):
    115         self.run = run
    116         self.display = display
    117         self.num_jobs = num_jobs
    118         self.queue = multiprocessing.Queue()
    119 
    120     def update(self, test_index, test):
    121         # This method is called in the child processes, and communicates the
    122         # results to the actual display implementation via an output queue.
    123         self.queue.put((test_index, test.result))
    124 
    125     def task_finished(self):
    126         # This method is called in the child processes, and communicates that
    127         # individual tasks are complete.
    128         self.queue.put(None)
    129 
    130     def handle_results(self):
    131         # This method is called in the parent, and consumes the results from the
    132         # output queue and dispatches to the actual display. The method will
    133         # complete after each of num_jobs tasks has signalled completion.
    134         completed = 0
    135         while completed != self.num_jobs:
    136             # Wait for a result item.
    137             item = self.queue.get()
    138             if item is None:
    139                 completed += 1
    140                 continue
    141 
    142             # Update the test result in the parent process.
    143             index,result = item
    144             test = self.run.tests[index]
    145             test.result = result
    146 
    147             self.display.update(test)
    148 
    149 def run_one_tester(run, provider, display):
    150     tester = Tester(run, provider, display)
    151     tester.run()
    152 
    153 ###
    154 
    155 class Run(object):
    156     """
    157     This class represents a concrete, configured testing run.
    158     """
    159 
    160     def __init__(self, lit_config, tests):
    161         self.lit_config = lit_config
    162         self.tests = tests
    163 
    164     def execute_test(self, test):
    165         result = None
    166         start_time = time.time()
    167         try:
    168             result = test.config.test_format.execute(test, self.lit_config)
    169 
    170             # Support deprecated result from execute() which returned the result
    171             # code and additional output as a tuple.
    172             if isinstance(result, tuple):
    173                 code, output = result
    174                 result = lit.Test.Result(code, output)
    175             elif not isinstance(result, lit.Test.Result):
    176                 raise ValueError("unexpected result from test execution")
    177         except KeyboardInterrupt:
    178             raise
    179         except:
    180             if self.lit_config.debug:
    181                 raise
    182             output = 'Exception during script execution:\n'
    183             output += traceback.format_exc()
    184             output += '\n'
    185             result = lit.Test.Result(lit.Test.UNRESOLVED, output)
    186         result.elapsed = time.time() - start_time
    187 
    188         test.setResult(result)
    189 
    190     def execute_tests(self, display, jobs, max_time=None,
    191                       use_processes=False):
    192         """
    193         execute_tests(display, jobs, [max_time])
    194 
    195         Execute each of the tests in the run, using up to jobs number of
    196         parallel tasks, and inform the display of each individual result. The
    197         provided tests should be a subset of the tests available in this run
    198         object.
    199 
    200         If max_time is non-None, it should be a time in seconds after which to
    201         stop executing tests.
    202 
    203         The display object will have its update method called with each test as
    204         it is completed. The calls are guaranteed to be locked with respect to
    205         one another, but are *not* guaranteed to be called on the same thread as
    206         this method was invoked on.
    207 
    208         Upon completion, each test in the run will have its result
    209         computed. Tests which were not actually executed (for any reason) will
    210         be given an UNRESOLVED result.
    211         """
    212 
    213         # Choose the appropriate parallel execution implementation.
    214         consumer = None
    215         if jobs != 1 and use_processes and multiprocessing:
    216             try:
    217                 task_impl = multiprocessing.Process
    218                 queue_impl = multiprocessing.Queue
    219                 canceled_flag =  multiprocessing.Value('i', 0)
    220                 consumer = MultiprocessResultsConsumer(self, display, jobs)
    221             except:
    222                 # multiprocessing fails to initialize with certain OpenBSD and
    223                 # FreeBSD Python versions: http://bugs.python.org/issue3770
    224                 # Unfortunately the error raised also varies by platform.
    225                 self.lit_config.note('failed to initialize multiprocessing')
    226                 consumer = None
    227         if not consumer:
    228             task_impl = threading.Thread
    229             queue_impl = queue.Queue
    230             canceled_flag = LockedValue(0)
    231             consumer = ThreadResultsConsumer(display)
    232 
    233         # Create the test provider.
    234         provider = TestProvider(queue_impl, canceled_flag)
    235 
    236         # Queue the tests outside the main thread because we can't guarantee
    237         # that we can put() all the tests without blocking:
    238         # https://docs.python.org/2/library/multiprocessing.html
    239         # e.g: On Mac OS X, we will hang if we put 2^15 elements in the queue
    240         # without taking any out.
    241         queuer = task_impl(target=provider.queue_tests, args=(self.tests, jobs))
    242         queuer.start()
    243 
    244         # Install a console-control signal handler on Windows.
    245         if win32api is not None:
    246             def console_ctrl_handler(type):
    247                 provider.cancel()
    248                 return True
    249             win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)
    250 
    251         # Install a timeout handler, if requested.
    252         if max_time is not None:
    253             def timeout_handler():
    254                 provider.cancel()
    255             timeout_timer = threading.Timer(max_time, timeout_handler)
    256             timeout_timer.start()
    257 
    258         # If not using multiple tasks, just run the tests directly.
    259         if jobs == 1:
    260             run_one_tester(self, provider, consumer)
    261         else:
    262             # Otherwise, execute the tests in parallel
    263             self._execute_tests_in_parallel(task_impl, provider, consumer, jobs)
    264 
    265         queuer.join()
    266 
    267         # Cancel the timeout handler.
    268         if max_time is not None:
    269             timeout_timer.cancel()
    270 
    271         # Update results for any tests which weren't run.
    272         for test in self.tests:
    273             if test.result is None:
    274                 test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))
    275 
    276     def _execute_tests_in_parallel(self, task_impl, provider, consumer, jobs):
    277         # Start all of the tasks.
    278         tasks = [task_impl(target=run_one_tester,
    279                            args=(self, provider, consumer))
    280                  for i in range(jobs)]
    281         for t in tasks:
    282             t.start()
    283 
    284         # Allow the consumer to handle results, if necessary.
    285         consumer.handle_results()
    286 
    287         # Wait for all the tasks to complete.
    288         for t in tasks:
    289             t.join()
    290