1 # Copyright 2018 the V8 project authors. All rights reserved. 2 # Use of this source code is governed by a BSD-style license that can be 3 # found in the LICENSE file. 4 5 from .result import SKIPPED 6 7 8 """ 9 Pipeline 10 11 Test processors are chained together and communicate with each other by 12 calling previous/next processor in the chain. 13 ----next_test()----> ----next_test()----> 14 Proc1 Proc2 Proc3 15 <---result_for()---- <---result_for()---- 16 17 For every next_test there is exactly one result_for call. 18 If processor ignores the test it has to return SkippedResult. 19 If it created multiple subtests for one test and wants to pass all of them to 20 the previous processor it can enclose them in GroupedResult. 21 22 23 Subtests 24 25 When test processor needs to modify the test or create some variants of the 26 test it creates subtests and sends them to the next processor. 27 Each subtest has: 28 - procid - globally unique id that should contain id of the parent test and 29 some suffix given by test processor, e.g. its name + subtest type. 30 - processor - which created it 31 - origin - pointer to the parent (sub)test 32 """ 33 34 35 DROP_RESULT = 0 36 DROP_OUTPUT = 1 37 DROP_PASS_OUTPUT = 2 38 DROP_PASS_STDOUT = 3 39 40 41 class TestProc(object): 42 def __init__(self): 43 self._prev_proc = None 44 self._next_proc = None 45 self._stopped = False 46 self._requirement = DROP_RESULT 47 self._prev_requirement = None 48 self._reduce_result = lambda result: result 49 50 def connect_to(self, next_proc): 51 """Puts `next_proc` after itself in the chain.""" 52 next_proc._prev_proc = self 53 self._next_proc = next_proc 54 55 def remove_from_chain(self): 56 if self._prev_proc: 57 self._prev_proc._next_proc = self._next_proc 58 if self._next_proc: 59 self._next_proc._prev_proc = self._prev_proc 60 61 def setup(self, requirement=DROP_RESULT): 62 """ 63 Method called by previous processor or processor pipeline creator to let 64 the processors know what part of the result can be ignored. 65 """ 66 self._prev_requirement = requirement 67 if self._next_proc: 68 self._next_proc.setup(max(requirement, self._requirement)) 69 70 # Since we're not winning anything by droping part of the result we are 71 # dropping the whole result or pass it as it is. The real reduction happens 72 # during result creation (in the output processor), so the result is 73 # immutable. 74 if (self._prev_requirement < self._requirement and 75 self._prev_requirement == DROP_RESULT): 76 self._reduce_result = lambda _: None 77 78 def next_test(self, test): 79 """ 80 Method called by previous processor whenever it produces new test. 81 This method shouldn't be called by anyone except previous processor. 82 """ 83 raise NotImplementedError() 84 85 def result_for(self, test, result): 86 """ 87 Method called by next processor whenever it has result for some test. 88 This method shouldn't be called by anyone except next processor. 89 """ 90 raise NotImplementedError() 91 92 def heartbeat(self): 93 if self._prev_proc: 94 self._prev_proc.heartbeat() 95 96 def stop(self): 97 if not self._stopped: 98 self._stopped = True 99 if self._prev_proc: 100 self._prev_proc.stop() 101 if self._next_proc: 102 self._next_proc.stop() 103 104 @property 105 def is_stopped(self): 106 return self._stopped 107 108 ### Communication 109 110 def _send_test(self, test): 111 """Helper method for sending test to the next processor.""" 112 self._next_proc.next_test(test) 113 114 def _send_result(self, test, result): 115 """Helper method for sending result to the previous processor.""" 116 if not test.keep_output: 117 result = self._reduce_result(result) 118 self._prev_proc.result_for(test, result) 119 120 121 122 class TestProcObserver(TestProc): 123 """Processor used for observing the data.""" 124 def __init__(self): 125 super(TestProcObserver, self).__init__() 126 127 def next_test(self, test): 128 self._on_next_test(test) 129 self._send_test(test) 130 131 def result_for(self, test, result): 132 self._on_result_for(test, result) 133 self._send_result(test, result) 134 135 def heartbeat(self): 136 self._on_heartbeat() 137 super(TestProcObserver, self).heartbeat() 138 139 def _on_next_test(self, test): 140 """Method called after receiving test from previous processor but before 141 sending it to the next one.""" 142 pass 143 144 def _on_result_for(self, test, result): 145 """Method called after receiving result from next processor but before 146 sending it to the previous one.""" 147 pass 148 149 def _on_heartbeat(self): 150 pass 151 152 153 class TestProcProducer(TestProc): 154 """Processor for creating subtests.""" 155 156 def __init__(self, name): 157 super(TestProcProducer, self).__init__() 158 self._name = name 159 160 def next_test(self, test): 161 self._next_test(test) 162 163 def result_for(self, subtest, result): 164 self._result_for(subtest.origin, subtest, result) 165 166 ### Implementation 167 def _next_test(self, test): 168 raise NotImplementedError() 169 170 def _result_for(self, test, subtest, result): 171 """ 172 result_for method extended with `subtest` parameter. 173 174 Args 175 test: test used by current processor to create the subtest. 176 subtest: test for which the `result` is. 177 result: subtest execution result created by the output processor. 178 """ 179 raise NotImplementedError() 180 181 ### Managing subtests 182 def _create_subtest(self, test, subtest_id, **kwargs): 183 """Creates subtest with subtest id <processor name>-`subtest_id`.""" 184 return test.create_subtest(self, '%s-%s' % (self._name, subtest_id), 185 **kwargs) 186 187 188 class TestProcFilter(TestProc): 189 """Processor for filtering tests.""" 190 191 def next_test(self, test): 192 if self._filter(test): 193 self._send_result(test, SKIPPED) 194 else: 195 self._send_test(test) 196 197 def result_for(self, test, result): 198 self._send_result(test, result) 199 200 def _filter(self, test): 201 """Returns whether test should be filtered out.""" 202 raise NotImplementedError() 203