1 # Copyright 2012 the V8 project authors. All rights reserved. 2 # Redistribution and use in source and binary forms, with or without 3 # modification, are permitted provided that the following conditions are 4 # met: 5 # 6 # * Redistributions of source code must retain the above copyright 7 # notice, this list of conditions and the following disclaimer. 8 # * Redistributions in binary form must reproduce the above 9 # copyright notice, this list of conditions and the following 10 # disclaimer in the documentation and/or other materials provided 11 # with the distribution. 12 # * Neither the name of Google Inc. nor the names of its 13 # contributors may be used to endorse or promote products derived 14 # from this software without specific prior written permission. 15 # 16 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 17 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 18 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 19 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 20 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 23 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 24 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 25 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 26 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 27 28 29 import os 30 import socket 31 import subprocess 32 import threading 33 import time 34 35 from . import distro 36 from ..local import execution 37 from ..local import perfdata 38 from ..objects import peer 39 from ..objects import workpacket 40 from ..server import compression 41 from ..server import constants 42 from ..server import local_handler 43 from ..server import signatures 44 45 46 def GetPeers(): 47 data = local_handler.LocalQuery([constants.REQUEST_PEERS]) 48 if not data: return [] 49 return [ peer.Peer.Unpack(p) for p in data ] 50 51 52 class NetworkedRunner(execution.Runner): 53 def __init__(self, suites, progress_indicator, context, peers, workspace): 54 self.suites = suites 55 num_tests = 0 56 datapath = os.path.join("out", "testrunner_data") 57 # TODO(machenbach): These fields should exist now in the superclass. 58 # But there is no super constructor call. Check if this is a problem. 59 self.perf_data_manager = perfdata.PerfDataManager(datapath) 60 self.perfdata = self.perf_data_manager.GetStore(context.arch, context.mode) 61 for s in suites: 62 for t in s.tests: 63 t.duration = self.perfdata.FetchPerfData(t) or 1.0 64 num_tests += len(s.tests) 65 self._CommonInit(num_tests, progress_indicator, context) 66 self.tests = [] # Only used if we need to fall back to local execution. 67 self.tests_lock = threading.Lock() 68 self.peers = peers 69 self.pubkey_fingerprint = None # Fetched later. 70 self.base_rev = subprocess.check_output( 71 "cd %s; git log -1 --format=%%H --grep=git-svn-id" % workspace, 72 shell=True).strip() 73 self.base_svn_rev = subprocess.check_output( 74 "cd %s; git log -1 %s" # Get commit description. 75 " | grep -e '^\s*git-svn-id:'" # Extract "git-svn-id" line. 76 " | awk '{print $2}'" # Extract "repository@revision" part. 77 " | sed -e 's/.*@//'" % # Strip away "repository@". 78 (workspace, self.base_rev), shell=True).strip() 79 self.patch = subprocess.check_output( 80 "cd %s; git diff %s" % (workspace, self.base_rev), shell=True) 81 self.binaries = {} 82 self.initialization_lock = threading.Lock() 83 self.initialization_lock.acquire() # Released when init is done. 84 self._OpenLocalConnection() 85 self.local_receiver_thread = threading.Thread( 86 target=self._ListenLocalConnection) 87 self.local_receiver_thread.daemon = True 88 self.local_receiver_thread.start() 89 self.initialization_lock.acquire() 90 self.initialization_lock.release() 91 92 def _OpenLocalConnection(self): 93 self.local_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 94 code = self.local_socket.connect_ex(("localhost", constants.CLIENT_PORT)) 95 if code != 0: 96 raise RuntimeError("Failed to connect to local server") 97 compression.Send([constants.REQUEST_PUBKEY_FINGERPRINT], self.local_socket) 98 99 def _ListenLocalConnection(self): 100 release_lock_countdown = 1 # Pubkey. 101 self.local_receiver = compression.Receiver(self.local_socket) 102 while not self.local_receiver.IsDone(): 103 data = self.local_receiver.Current() 104 if data[0] == constants.REQUEST_PUBKEY_FINGERPRINT: 105 pubkey = data[1] 106 if not pubkey: raise RuntimeError("Received empty public key") 107 self.pubkey_fingerprint = pubkey 108 release_lock_countdown -= 1 109 if release_lock_countdown == 0: 110 self.initialization_lock.release() 111 release_lock_countdown -= 1 # Prevent repeated triggering. 112 self.local_receiver.Advance() 113 114 def Run(self, jobs): 115 self.indicator.Starting() 116 need_libv8 = False 117 for s in self.suites: 118 shell = s.shell() 119 if shell not in self.binaries: 120 path = os.path.join(self.context.shell_dir, shell) 121 # Check if this is a shared library build. 122 try: 123 ldd = subprocess.check_output("ldd %s | grep libv8\\.so" % (path), 124 shell=True) 125 ldd = ldd.strip().split(" ") 126 assert ldd[0] == "libv8.so" 127 assert ldd[1] == "=>" 128 need_libv8 = True 129 binary_needs_libv8 = True 130 libv8 = signatures.ReadFileAndSignature(ldd[2]) 131 except: 132 binary_needs_libv8 = False 133 binary = signatures.ReadFileAndSignature(path) 134 if binary[0] is None: 135 print("Error: Failed to create signature.") 136 assert binary[1] != 0 137 return binary[1] 138 binary.append(binary_needs_libv8) 139 self.binaries[shell] = binary 140 if need_libv8: 141 self.binaries["libv8.so"] = libv8 142 distro.Assign(self.suites, self.peers) 143 # Spawn one thread for each peer. 144 threads = [] 145 for p in self.peers: 146 thread = threading.Thread(target=self._TalkToPeer, args=[p]) 147 threads.append(thread) 148 thread.start() 149 try: 150 for thread in threads: 151 # Use a timeout so that signals (Ctrl+C) will be processed. 152 thread.join(timeout=10000000) 153 self._AnalyzePeerRuntimes() 154 except KeyboardInterrupt: 155 self.terminate = True 156 raise 157 except Exception, _e: 158 # If there's an exception we schedule an interruption for any 159 # remaining threads... 160 self.terminate = True 161 # ...and then reraise the exception to bail out. 162 raise 163 compression.Send(constants.END_OF_STREAM, self.local_socket) 164 self.local_socket.close() 165 if self.tests: 166 self._RunInternal(jobs) 167 self.indicator.Done() 168 return not self.failed 169 170 def _TalkToPeer(self, peer): 171 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 172 sock.settimeout(self.context.timeout + 10) 173 code = sock.connect_ex((peer.address, constants.PEER_PORT)) 174 if code == 0: 175 try: 176 peer.runtime = None 177 start_time = time.time() 178 packet = workpacket.WorkPacket(peer=peer, context=self.context, 179 base_revision=self.base_svn_rev, 180 patch=self.patch, 181 pubkey=self.pubkey_fingerprint) 182 data, test_map = packet.Pack(self.binaries) 183 compression.Send(data, sock) 184 compression.Send(constants.END_OF_STREAM, sock) 185 rec = compression.Receiver(sock) 186 while not rec.IsDone() and not self.terminate: 187 data_list = rec.Current() 188 for data in data_list: 189 test_id = data[0] 190 if test_id < 0: 191 # The peer is reporting an error. 192 with self.lock: 193 print("\nPeer %s reports error: %s" % (peer.address, data[1])) 194 continue 195 test = test_map.pop(test_id) 196 test.MergeResult(data) 197 try: 198 self.perfdata.UpdatePerfData(test) 199 except Exception, e: 200 print("UpdatePerfData exception: %s" % e) 201 pass # Just keep working. 202 with self.lock: 203 perf_key = self.perfdata.GetKey(test) 204 compression.Send( 205 [constants.INFORM_DURATION, perf_key, test.duration, 206 self.context.arch, self.context.mode], 207 self.local_socket) 208 self.indicator.AboutToRun(test) 209 has_unexpected_output = test.suite.HasUnexpectedOutput(test) 210 if has_unexpected_output: 211 self.failed.append(test) 212 if test.output.HasCrashed(): 213 self.crashed += 1 214 else: 215 self.succeeded += 1 216 self.remaining -= 1 217 self.indicator.HasRun(test, has_unexpected_output) 218 rec.Advance() 219 peer.runtime = time.time() - start_time 220 except KeyboardInterrupt: 221 sock.close() 222 raise 223 except Exception, e: 224 print("Got exception: %s" % e) 225 pass # Fall back to local execution. 226 else: 227 compression.Send([constants.UNRESPONSIVE_PEER, peer.address], 228 self.local_socket) 229 sock.close() 230 if len(test_map) > 0: 231 # Some tests have not received any results. Run them locally. 232 print("\nNo results for %d tests, running them locally." % len(test_map)) 233 self._EnqueueLocally(test_map) 234 235 def _EnqueueLocally(self, test_map): 236 with self.tests_lock: 237 for test in test_map: 238 self.tests.append(test_map[test]) 239 240 def _AnalyzePeerRuntimes(self): 241 total_runtime = 0.0 242 total_work = 0.0 243 for p in self.peers: 244 if p.runtime is None: 245 return 246 total_runtime += p.runtime 247 total_work += p.assigned_work 248 for p in self.peers: 249 p.assigned_work /= total_work 250 p.runtime /= total_runtime 251 perf_correction = p.assigned_work / p.runtime 252 old_perf = p.relative_performance 253 p.relative_performance = (old_perf + perf_correction) / 2.0 254 compression.Send([constants.UPDATE_PERF, p.address, 255 p.relative_performance], 256 self.local_socket) 257