# Copyright 2012 the V8 project authors. All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
#       notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
#       copyright notice, this list of conditions and the following
#       disclaimer in the documentation and/or other materials provided
#       with the distribution.
#     * Neither the name of Google Inc. nor the names of its
#       contributors may be used to endorse or promote products derived
#       from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


import os
import socket
import subprocess
import threading
import time

from . import distro
from ..local import execution
from ..local import perfdata
from ..objects import peer
from ..objects import workpacket
from ..server import compression
from ..server import constants
from ..server import local_handler
from ..server import signatures


def GetPeers():
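  """Asks the locally running network server for the list of known peers."""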
  data = local_handler.LocalQuery([constants.REQUEST_PEERS])
  if not data: return []
  return [peer.Peer.Unpack(p) for p in data]


class NetworkedRunner(execution.Runner):
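  """Runs tests by farming them out to network peers; tests that never
  receive a result are re-run locally as a fallback.
  """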
  def __init__(self, suites, progress_indicator, context, peers, workspace):
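    """Collects per-test duration data, determines the base revision and the
    local patch to ship to peers, and opens the connection to the local
    network server.
    """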
    self.suites = suites
    num_tests = 0
    datapath = os.path.join("out", "testrunner_data")
    # TODO(machenbach): These fields should exist now in the superclass.
    # But there is no super constructor call. Check if this is a problem.
    self.perf_data_manager = perfdata.PerfDataManager(datapath)
    self.perfdata = self.perf_data_manager.GetStore(context.arch, context.mode)
    for s in suites:
      for t in s.tests:
        t.duration = self.perfdata.FetchPerfData(t) or 1.0
      num_tests += len(s.tests)
    self._CommonInit(num_tests, progress_indicator, context)
    self.tests = []  # Only used if we need to fall back to local execution.
    self.tests_lock = threading.Lock()
    self.peers = peers
    self.pubkey_fingerprint = None  # Fetched later.
    self.base_rev = subprocess.check_output(
        "cd %s; git log -1 --format=%%H --grep=git-svn-id" % workspace,
        shell=True).strip()
    self.base_svn_rev = subprocess.check_output(
        "cd %s; git log -1 %s"           # Get commit description.
        r" | grep -e '^\s*git-svn-id:'"  # Extract "git-svn-id" line.
        " | awk '{print $2}'"            # Extract "repository@revision" part.
        " | sed -e 's/.*@//'" %          # Strip away "repository@".
        (workspace, self.base_rev), shell=True).strip()
    self.patch = subprocess.check_output(
        "cd %s; git diff %s" % (workspace, self.base_rev), shell=True)
    self.binaries = {}
    self.initialization_lock = threading.Lock()
    self.initialization_lock.acquire()  # Released when init is done.
    self._OpenLocalConnection()
    self.local_receiver_thread = threading.Thread(
        target=self._ListenLocalConnection)
    self.local_receiver_thread.daemon = True
    self.local_receiver_thread.start()
    # Block until _ListenLocalConnection has received the public key
    # fingerprint and released the lock.
    self.initialization_lock.acquire()
    self.initialization_lock.release()

  def _OpenLocalConnection(self):
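    """Connects to the local network server and requests its public key
    fingerprint.
    """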
    self.local_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    code = self.local_socket.connect_ex(("localhost", constants.CLIENT_PORT))
    if code != 0:
      raise RuntimeError("Failed to connect to local server")
    compression.Send([constants.REQUEST_PUBKEY_FINGERPRINT], self.local_socket)

  def _ListenLocalConnection(self):
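    """Consumes messages from the local server; releases the initialization
    lock once the public key fingerprint has arrived.
    """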
    release_lock_countdown = 1  # Pubkey.
    self.local_receiver = compression.Receiver(self.local_socket)
    while not self.local_receiver.IsDone():
      data = self.local_receiver.Current()
      if data[0] == constants.REQUEST_PUBKEY_FINGERPRINT:
        pubkey = data[1]
        if not pubkey: raise RuntimeError("Received empty public key")
        self.pubkey_fingerprint = pubkey
        release_lock_countdown -= 1
      if release_lock_countdown == 0:
        self.initialization_lock.release()
        release_lock_countdown -= 1  # Prevent repeated triggering.
      self.local_receiver.Advance()

  def Run(self, jobs):
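    """Creates signatures for the shell binaries, distributes the work across
    all peers, and collects their results; tests without results are re-run
    locally. Returns True if all tests passed.
    """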
    self.indicator.Starting()
    need_libv8 = False
    for s in self.suites:
      shell = s.shell()
      if shell not in self.binaries:
        path = os.path.join(self.context.shell_dir, shell)
        # Check if this is a shared library build.
        try:
          ldd = subprocess.check_output("ldd %s | grep libv8\\.so" % path,
                                        shell=True)
          ldd = ldd.strip().split(" ")
          assert ldd[0] == "libv8.so"
          assert ldd[1] == "=>"
          need_libv8 = True
          binary_needs_libv8 = True
          libv8 = signatures.ReadFileAndSignature(ldd[2])
        except Exception:
          binary_needs_libv8 = False
        binary = signatures.ReadFileAndSignature(path)
        if binary[0] is None:
          print("Error: Failed to create signature.")
          assert binary[1] != 0
          return binary[1]
        binary.append(binary_needs_libv8)
        self.binaries[shell] = binary
    if need_libv8:
      self.binaries["libv8.so"] = libv8
    distro.Assign(self.suites, self.peers)
    # Spawn one thread for each peer.
    threads = []
    for p in self.peers:
      thread = threading.Thread(target=self._TalkToPeer, args=[p])
      threads.append(thread)
      thread.start()
    try:
      for thread in threads:
        # Use a timeout so that signals (Ctrl+C) will be processed.
        thread.join(timeout=10000000)
      self._AnalyzePeerRuntimes()
    except KeyboardInterrupt:
      self.terminate = True
      raise
    except Exception:
      # If there's an exception we schedule an interruption for any
      # remaining threads...
      self.terminate = True
      # ...and then reraise the exception to bail out.
      raise
    compression.Send(constants.END_OF_STREAM, self.local_socket)
    self.local_socket.close()
    if self.tests:
      self._RunInternal(jobs)
    self.indicator.Done()
    return not self.failed

  def _TalkToPeer(self, peer):
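    """Sends a work packet to a single peer, streams its results back, and
    queues any tests that produced no result for local execution.
    """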
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(self.context.timeout + 10)
    code = sock.connect_ex((peer.address, constants.PEER_PORT))
    # Ensure test_map exists even if the peer is unreachable or packing fails.
    test_map = {}
    if code == 0:
      try:
        peer.runtime = None
        start_time = time.time()
        packet = workpacket.WorkPacket(peer=peer, context=self.context,
                                       base_revision=self.base_svn_rev,
                                       patch=self.patch,
                                       pubkey=self.pubkey_fingerprint)
        data, test_map = packet.Pack(self.binaries)
        compression.Send(data, sock)
        compression.Send(constants.END_OF_STREAM, sock)
        rec = compression.Receiver(sock)
        while not rec.IsDone() and not self.terminate:
          data_list = rec.Current()
          for data in data_list:
            test_id = data[0]
            if test_id < 0:
              # The peer is reporting an error.
              with self.lock:
                print("\nPeer %s reports error: %s" % (peer.address, data[1]))
              continue
            test = test_map.pop(test_id)
            test.MergeResult(data)
            try:
              self.perfdata.UpdatePerfData(test)
            except Exception as e:
              print("UpdatePerfData exception: %s" % e)
              pass  # Just keep working.
            with self.lock:
              perf_key = self.perfdata.GetKey(test)
              compression.Send(
                  [constants.INFORM_DURATION, perf_key, test.duration,
                   self.context.arch, self.context.mode],
                  self.local_socket)
              self.indicator.AboutToRun(test)
              has_unexpected_output = test.suite.HasUnexpectedOutput(test)
              if has_unexpected_output:
                self.failed.append(test)
                if test.output.HasCrashed():
                  self.crashed += 1
              else:
                self.succeeded += 1
              self.remaining -= 1
              self.indicator.HasRun(test, has_unexpected_output)
          rec.Advance()
        peer.runtime = time.time() - start_time
      except KeyboardInterrupt:
        sock.close()
        raise
      except Exception as e:
        print("Got exception: %s" % e)
        pass  # Fall back to local execution.
    else:
      compression.Send([constants.UNRESPONSIVE_PEER, peer.address],
                       self.local_socket)
    sock.close()
    if test_map:
      # Some tests have not received any results. Run them locally.
      print("\nNo results for %d tests, running them locally." % len(test_map))
      self._EnqueueLocally(test_map)

  def _EnqueueLocally(self, test_map):
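    """Adds tests without results to the local fallback queue."""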
    with self.tests_lock:
      for test in test_map:
        self.tests.append(test_map[test])

  def _AnalyzePeerRuntimes(self):
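    """Averages each peer's previous relative performance with the observed
    assigned-work/runtime ratio and reports the update to the local server.
    """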
    total_runtime = 0.0
    total_work = 0.0
    for p in self.peers:
      if p.runtime is None:
        # At least one peer did not report a runtime; skip the analysis.
        return
      total_runtime += p.runtime
      total_work += p.assigned_work
    for p in self.peers:
      p.assigned_work /= total_work
      p.runtime /= total_runtime
      perf_correction = p.assigned_work / p.runtime
      old_perf = p.relative_performance
      p.relative_performance = (old_perf + perf_correction) / 2.0
      compression.Send([constants.UPDATE_PERF, p.address,
                        p.relative_performance],
                       self.local_socket)
    257