1 # Copyright 2012 the V8 project authors. All rights reserved. 2 # Redistribution and use in source and binary forms, with or without 3 # modification, are permitted provided that the following conditions are 4 # met: 5 # 6 # * Redistributions of source code must retain the above copyright 7 # notice, this list of conditions and the following disclaimer. 8 # * Redistributions in binary form must reproduce the above 9 # copyright notice, this list of conditions and the following 10 # disclaimer in the documentation and/or other materials provided 11 # with the distribution. 12 # * Neither the name of Google Inc. nor the names of its 13 # contributors may be used to endorse or promote products derived 14 # from this software without specific prior written permission. 15 # 16 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 17 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 18 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 19 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 20 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 23 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 24 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 25 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 26 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 27 28 29 import os 30 import socket 31 import subprocess 32 import threading 33 import time 34 35 from . import distro 36 from ..local import execution 37 from ..local import perfdata 38 from ..objects import peer 39 from ..objects import workpacket 40 from ..server import compression 41 from ..server import constants 42 from ..server import local_handler 43 from ..server import signatures 44 45 46 def GetPeers(): 47 data = local_handler.LocalQuery([constants.REQUEST_PEERS]) 48 if not data: return [] 49 return [ peer.Peer.Unpack(p) for p in data ] 50 51 52 class NetworkedRunner(execution.Runner): 53 def __init__(self, suites, progress_indicator, context, peers, workspace): 54 self.suites = suites 55 datapath = os.path.join("out", "testrunner_data") 56 # TODO(machenbach): These fields should exist now in the superclass. 57 # But there is no super constructor call. Check if this is a problem. 58 self.perf_data_manager = perfdata.PerfDataManager(datapath) 59 self.perfdata = self.perf_data_manager.GetStore(context.arch, context.mode) 60 for s in suites: 61 for t in s.tests: 62 t.duration = self.perfdata.FetchPerfData(t) or 1.0 63 self._CommonInit(suites, progress_indicator, context) 64 self.tests = [] # Only used if we need to fall back to local execution. 65 self.tests_lock = threading.Lock() 66 self.peers = peers 67 self.pubkey_fingerprint = None # Fetched later. 68 self.base_rev = subprocess.check_output( 69 "cd %s; git log -1 --format=%%H --grep=git-svn-id" % workspace, 70 shell=True).strip() 71 self.base_svn_rev = subprocess.check_output( 72 "cd %s; git log -1 %s" # Get commit description. 73 " | grep -e '^\s*git-svn-id:'" # Extract "git-svn-id" line. 74 " | awk '{print $2}'" # Extract "repository@revision" part. 75 " | sed -e 's/.*@//'" % # Strip away "repository@". 76 (workspace, self.base_rev), shell=True).strip() 77 self.patch = subprocess.check_output( 78 "cd %s; git diff %s" % (workspace, self.base_rev), shell=True) 79 self.binaries = {} 80 self.initialization_lock = threading.Lock() 81 self.initialization_lock.acquire() # Released when init is done. 82 self._OpenLocalConnection() 83 self.local_receiver_thread = threading.Thread( 84 target=self._ListenLocalConnection) 85 self.local_receiver_thread.daemon = True 86 self.local_receiver_thread.start() 87 self.initialization_lock.acquire() 88 self.initialization_lock.release() 89 90 def _OpenLocalConnection(self): 91 self.local_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 92 code = self.local_socket.connect_ex(("localhost", constants.CLIENT_PORT)) 93 if code != 0: 94 raise RuntimeError("Failed to connect to local server") 95 compression.Send([constants.REQUEST_PUBKEY_FINGERPRINT], self.local_socket) 96 97 def _ListenLocalConnection(self): 98 release_lock_countdown = 1 # Pubkey. 99 self.local_receiver = compression.Receiver(self.local_socket) 100 while not self.local_receiver.IsDone(): 101 data = self.local_receiver.Current() 102 if data[0] == constants.REQUEST_PUBKEY_FINGERPRINT: 103 pubkey = data[1] 104 if not pubkey: raise RuntimeError("Received empty public key") 105 self.pubkey_fingerprint = pubkey 106 release_lock_countdown -= 1 107 if release_lock_countdown == 0: 108 self.initialization_lock.release() 109 release_lock_countdown -= 1 # Prevent repeated triggering. 110 self.local_receiver.Advance() 111 112 def Run(self, jobs): 113 self.indicator.Starting() 114 need_libv8 = False 115 for s in self.suites: 116 shell = s.shell() 117 if shell not in self.binaries: 118 path = os.path.join(self.context.shell_dir, shell) 119 # Check if this is a shared library build. 120 try: 121 ldd = subprocess.check_output("ldd %s | grep libv8\\.so" % (path), 122 shell=True) 123 ldd = ldd.strip().split(" ") 124 assert ldd[0] == "libv8.so" 125 assert ldd[1] == "=>" 126 need_libv8 = True 127 binary_needs_libv8 = True 128 libv8 = signatures.ReadFileAndSignature(ldd[2]) 129 except: 130 binary_needs_libv8 = False 131 binary = signatures.ReadFileAndSignature(path) 132 if binary[0] is None: 133 print("Error: Failed to create signature.") 134 assert binary[1] != 0 135 return binary[1] 136 binary.append(binary_needs_libv8) 137 self.binaries[shell] = binary 138 if need_libv8: 139 self.binaries["libv8.so"] = libv8 140 distro.Assign(self.suites, self.peers) 141 # Spawn one thread for each peer. 142 threads = [] 143 for p in self.peers: 144 thread = threading.Thread(target=self._TalkToPeer, args=[p]) 145 threads.append(thread) 146 thread.start() 147 try: 148 for thread in threads: 149 # Use a timeout so that signals (Ctrl+C) will be processed. 150 thread.join(timeout=10000000) 151 self._AnalyzePeerRuntimes() 152 except KeyboardInterrupt: 153 self.terminate = True 154 raise 155 except Exception, _e: 156 # If there's an exception we schedule an interruption for any 157 # remaining threads... 158 self.terminate = True 159 # ...and then reraise the exception to bail out. 160 raise 161 compression.Send(constants.END_OF_STREAM, self.local_socket) 162 self.local_socket.close() 163 if self.tests: 164 self._RunInternal(jobs) 165 self.indicator.Done() 166 return not self.failed 167 168 def _TalkToPeer(self, peer): 169 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 170 sock.settimeout(self.context.timeout + 10) 171 code = sock.connect_ex((peer.address, constants.PEER_PORT)) 172 if code == 0: 173 try: 174 peer.runtime = None 175 start_time = time.time() 176 packet = workpacket.WorkPacket(peer=peer, context=self.context, 177 base_revision=self.base_svn_rev, 178 patch=self.patch, 179 pubkey=self.pubkey_fingerprint) 180 data, test_map = packet.Pack(self.binaries) 181 compression.Send(data, sock) 182 compression.Send(constants.END_OF_STREAM, sock) 183 rec = compression.Receiver(sock) 184 while not rec.IsDone() and not self.terminate: 185 data_list = rec.Current() 186 for data in data_list: 187 test_id = data[0] 188 if test_id < 0: 189 # The peer is reporting an error. 190 with self.lock: 191 print("\nPeer %s reports error: %s" % (peer.address, data[1])) 192 continue 193 test = test_map.pop(test_id) 194 test.MergeResult(data) 195 try: 196 self.perfdata.UpdatePerfData(test) 197 except Exception, e: 198 print("UpdatePerfData exception: %s" % e) 199 pass # Just keep working. 200 with self.lock: 201 perf_key = self.perfdata.GetKey(test) 202 compression.Send( 203 [constants.INFORM_DURATION, perf_key, test.duration, 204 self.context.arch, self.context.mode], 205 self.local_socket) 206 has_unexpected_output = test.suite.HasUnexpectedOutput(test) 207 if has_unexpected_output: 208 self.failed.append(test) 209 if test.output.HasCrashed(): 210 self.crashed += 1 211 else: 212 self.succeeded += 1 213 self.remaining -= 1 214 self.indicator.HasRun(test, has_unexpected_output) 215 rec.Advance() 216 peer.runtime = time.time() - start_time 217 except KeyboardInterrupt: 218 sock.close() 219 raise 220 except Exception, e: 221 print("Got exception: %s" % e) 222 pass # Fall back to local execution. 223 else: 224 compression.Send([constants.UNRESPONSIVE_PEER, peer.address], 225 self.local_socket) 226 sock.close() 227 if len(test_map) > 0: 228 # Some tests have not received any results. Run them locally. 229 print("\nNo results for %d tests, running them locally." % len(test_map)) 230 self._EnqueueLocally(test_map) 231 232 def _EnqueueLocally(self, test_map): 233 with self.tests_lock: 234 for test in test_map: 235 self.tests.append(test_map[test]) 236 237 def _AnalyzePeerRuntimes(self): 238 total_runtime = 0.0 239 total_work = 0.0 240 for p in self.peers: 241 if p.runtime is None: 242 return 243 total_runtime += p.runtime 244 total_work += p.assigned_work 245 for p in self.peers: 246 p.assigned_work /= total_work 247 p.runtime /= total_runtime 248 perf_correction = p.assigned_work / p.runtime 249 old_perf = p.relative_performance 250 p.relative_performance = (old_perf + perf_correction) / 2.0 251 compression.Send([constants.UPDATE_PERF, p.address, 252 p.relative_performance], 253 self.local_socket) 254