Home | History | Annotate | Download | only in testproc
      1 # Copyright 2018 the V8 project authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 from .result import SKIPPED
      6 
      7 
      8 """
      9 Pipeline
     10 
     11 Test processors are chained together and communicate with each other by
     12 calling previous/next processor in the chain.
     13      ----next_test()---->     ----next_test()---->
     14 Proc1                    Proc2                    Proc3
     15      <---result_for()----     <---result_for()----
     16 
     17 For every next_test there is exactly one result_for call.
     18 If processor ignores the test it has to return SkippedResult.
     19 If it created multiple subtests for one test and wants to pass all of them to
     20 the previous processor it can enclose them in GroupedResult.
     21 
     22 
     23 Subtests
     24 
     25 When test processor needs to modify the test or create some variants of the
     26 test it creates subtests and sends them to the next processor.
     27 Each subtest has:
     28 - procid - globally unique id that should contain id of the parent test and
     29           some suffix given by test processor, e.g. its name + subtest type.
     30 - processor - which created it
     31 - origin - pointer to the parent (sub)test
     32 """
     33 
     34 
     35 DROP_RESULT = 0
     36 DROP_OUTPUT = 1
     37 DROP_PASS_OUTPUT = 2
     38 DROP_PASS_STDOUT = 3
     39 
     40 
     41 class TestProc(object):
     42   def __init__(self):
     43     self._prev_proc = None
     44     self._next_proc = None
     45     self._stopped = False
     46     self._requirement = DROP_RESULT
     47     self._prev_requirement = None
     48     self._reduce_result = lambda result: result
     49 
     50   def connect_to(self, next_proc):
     51     """Puts `next_proc` after itself in the chain."""
     52     next_proc._prev_proc = self
     53     self._next_proc = next_proc
     54 
     55   def remove_from_chain(self):
     56     if self._prev_proc:
     57       self._prev_proc._next_proc = self._next_proc
     58     if self._next_proc:
     59       self._next_proc._prev_proc = self._prev_proc
     60 
     61   def setup(self, requirement=DROP_RESULT):
     62     """
     63     Method called by previous processor or processor pipeline creator to let
     64     the processors know what part of the result can be ignored.
     65     """
     66     self._prev_requirement = requirement
     67     if self._next_proc:
     68       self._next_proc.setup(max(requirement, self._requirement))
     69 
     70     # Since we're not winning anything by droping part of the result we are
     71     # dropping the whole result or pass it as it is. The real reduction happens
     72     # during result creation (in the output processor), so the result is
     73     # immutable.
     74     if (self._prev_requirement < self._requirement and
     75         self._prev_requirement == DROP_RESULT):
     76       self._reduce_result = lambda _: None
     77 
     78   def next_test(self, test):
     79     """
     80     Method called by previous processor whenever it produces new test.
     81     This method shouldn't be called by anyone except previous processor.
     82     """
     83     raise NotImplementedError()
     84 
     85   def result_for(self, test, result):
     86     """
     87     Method called by next processor whenever it has result for some test.
     88     This method shouldn't be called by anyone except next processor.
     89     """
     90     raise NotImplementedError()
     91 
     92   def heartbeat(self):
     93     if self._prev_proc:
     94       self._prev_proc.heartbeat()
     95 
     96   def stop(self):
     97     if not self._stopped:
     98       self._stopped = True
     99       if self._prev_proc:
    100         self._prev_proc.stop()
    101       if self._next_proc:
    102         self._next_proc.stop()
    103 
    104   @property
    105   def is_stopped(self):
    106     return self._stopped
    107 
    108   ### Communication
    109 
    110   def _send_test(self, test):
    111     """Helper method for sending test to the next processor."""
    112     self._next_proc.next_test(test)
    113 
    114   def _send_result(self, test, result):
    115     """Helper method for sending result to the previous processor."""
    116     if not test.keep_output:
    117       result = self._reduce_result(result)
    118     self._prev_proc.result_for(test, result)
    119 
    120 
    121 
    122 class TestProcObserver(TestProc):
    123   """Processor used for observing the data."""
    124   def __init__(self):
    125     super(TestProcObserver, self).__init__()
    126 
    127   def next_test(self, test):
    128     self._on_next_test(test)
    129     self._send_test(test)
    130 
    131   def result_for(self, test, result):
    132     self._on_result_for(test, result)
    133     self._send_result(test, result)
    134 
    135   def heartbeat(self):
    136     self._on_heartbeat()
    137     super(TestProcObserver, self).heartbeat()
    138 
    139   def _on_next_test(self, test):
    140     """Method called after receiving test from previous processor but before
    141     sending it to the next one."""
    142     pass
    143 
    144   def _on_result_for(self, test, result):
    145     """Method called after receiving result from next processor but before
    146     sending it to the previous one."""
    147     pass
    148 
    149   def _on_heartbeat(self):
    150     pass
    151 
    152 
    153 class TestProcProducer(TestProc):
    154   """Processor for creating subtests."""
    155 
    156   def __init__(self, name):
    157     super(TestProcProducer, self).__init__()
    158     self._name = name
    159 
    160   def next_test(self, test):
    161     self._next_test(test)
    162 
    163   def result_for(self, subtest, result):
    164     self._result_for(subtest.origin, subtest, result)
    165 
    166   ### Implementation
    167   def _next_test(self, test):
    168     raise NotImplementedError()
    169 
    170   def _result_for(self, test, subtest, result):
    171     """
    172     result_for method extended with `subtest` parameter.
    173 
    174     Args
    175       test: test used by current processor to create the subtest.
    176       subtest: test for which the `result` is.
    177       result: subtest execution result created by the output processor.
    178     """
    179     raise NotImplementedError()
    180 
    181   ### Managing subtests
    182   def _create_subtest(self, test, subtest_id, **kwargs):
    183     """Creates subtest with subtest id <processor name>-`subtest_id`."""
    184     return test.create_subtest(self, '%s-%s' % (self._name, subtest_id),
    185                                **kwargs)
    186 
    187 
    188 class TestProcFilter(TestProc):
    189   """Processor for filtering tests."""
    190 
    191   def next_test(self, test):
    192     if self._filter(test):
    193       self._send_result(test, SKIPPED)
    194     else:
    195       self._send_test(test)
    196 
    197   def result_for(self, test, result):
    198     self._send_result(test, result)
    199 
    200   def _filter(self, test):
    201     """Returns whether test should be filtered out."""
    202     raise NotImplementedError()
    203