Home | History | Annotate | Download | only in network
      1 # Copyright 2012 the V8 project authors. All rights reserved.
      2 # Redistribution and use in source and binary forms, with or without
      3 # modification, are permitted provided that the following conditions are
      4 # met:
      5 #
      6 #     * Redistributions of source code must retain the above copyright
      7 #       notice, this list of conditions and the following disclaimer.
      8 #     * Redistributions in binary form must reproduce the above
      9 #       copyright notice, this list of conditions and the following
     10 #       disclaimer in the documentation and/or other materials provided
     11 #       with the distribution.
     12 #     * Neither the name of Google Inc. nor the names of its
     13 #       contributors may be used to endorse or promote products derived
     14 #       from this software without specific prior written permission.
     15 #
     16 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     17 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     18 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     19 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     20 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     23 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     24 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     25 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     26 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     27 
     28 
     29 import os
     30 import socket
     31 import subprocess
     32 import threading
     33 import time
     34 
     35 from . import distro
     36 from ..local import execution
     37 from ..local import perfdata
     38 from ..objects import peer
     39 from ..objects import workpacket
     40 from ..server import compression
     41 from ..server import constants
     42 from ..server import local_handler
     43 from ..server import signatures
     44 
     45 
     46 def GetPeers():
     47   data = local_handler.LocalQuery([constants.REQUEST_PEERS])
     48   if not data: return []
     49   return [ peer.Peer.Unpack(p) for p in data ]
     50 
     51 
     52 class NetworkedRunner(execution.Runner):
     53   def __init__(self, suites, progress_indicator, context, peers, workspace):
     54     self.suites = suites
     55     datapath = os.path.join("out", "testrunner_data")
     56     # TODO(machenbach): These fields should exist now in the superclass.
     57     # But there is no super constructor call. Check if this is a problem.
     58     self.perf_data_manager = perfdata.PerfDataManager(datapath)
     59     self.perfdata = self.perf_data_manager.GetStore(context.arch, context.mode)
     60     for s in suites:
     61       for t in s.tests:
     62         t.duration = self.perfdata.FetchPerfData(t) or 1.0
     63     self._CommonInit(suites, progress_indicator, context)
     64     self.tests = []  # Only used if we need to fall back to local execution.
     65     self.tests_lock = threading.Lock()
     66     self.peers = peers
     67     self.pubkey_fingerprint = None  # Fetched later.
     68     self.base_rev = subprocess.check_output(
     69         "cd %s; git log -1 --format=%%H --grep=git-svn-id" % workspace,
     70         shell=True).strip()
     71     self.base_svn_rev = subprocess.check_output(
     72         "cd %s; git log -1 %s"          # Get commit description.
     73         " | grep -e '^\s*git-svn-id:'"  # Extract "git-svn-id" line.
     74         " | awk '{print $2}'"           # Extract "repository@revision" part.
     75         " | sed -e 's/.*@//'" %         # Strip away "repository@".
     76         (workspace, self.base_rev), shell=True).strip()
     77     self.patch = subprocess.check_output(
     78         "cd %s; git diff %s" % (workspace, self.base_rev), shell=True)
     79     self.binaries = {}
     80     self.initialization_lock = threading.Lock()
     81     self.initialization_lock.acquire()  # Released when init is done.
     82     self._OpenLocalConnection()
     83     self.local_receiver_thread = threading.Thread(
     84         target=self._ListenLocalConnection)
     85     self.local_receiver_thread.daemon = True
     86     self.local_receiver_thread.start()
     87     self.initialization_lock.acquire()
     88     self.initialization_lock.release()
     89 
     90   def _OpenLocalConnection(self):
     91     self.local_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     92     code = self.local_socket.connect_ex(("localhost", constants.CLIENT_PORT))
     93     if code != 0:
     94       raise RuntimeError("Failed to connect to local server")
     95     compression.Send([constants.REQUEST_PUBKEY_FINGERPRINT], self.local_socket)
     96 
     97   def _ListenLocalConnection(self):
     98     release_lock_countdown = 1  # Pubkey.
     99     self.local_receiver = compression.Receiver(self.local_socket)
    100     while not self.local_receiver.IsDone():
    101       data = self.local_receiver.Current()
    102       if data[0] == constants.REQUEST_PUBKEY_FINGERPRINT:
    103         pubkey = data[1]
    104         if not pubkey: raise RuntimeError("Received empty public key")
    105         self.pubkey_fingerprint = pubkey
    106         release_lock_countdown -= 1
    107       if release_lock_countdown == 0:
    108         self.initialization_lock.release()
    109         release_lock_countdown -= 1  # Prevent repeated triggering.
    110       self.local_receiver.Advance()
    111 
    112   def Run(self, jobs):
    113     self.indicator.Starting()
    114     need_libv8 = False
    115     for s in self.suites:
    116       shell = s.shell()
    117       if shell not in self.binaries:
    118         path = os.path.join(self.context.shell_dir, shell)
    119         # Check if this is a shared library build.
    120         try:
    121           ldd = subprocess.check_output("ldd %s | grep libv8\\.so" % (path),
    122                                         shell=True)
    123           ldd = ldd.strip().split(" ")
    124           assert ldd[0] == "libv8.so"
    125           assert ldd[1] == "=>"
    126           need_libv8 = True
    127           binary_needs_libv8 = True
    128           libv8 = signatures.ReadFileAndSignature(ldd[2])
    129         except:
    130           binary_needs_libv8 = False
    131         binary = signatures.ReadFileAndSignature(path)
    132         if binary[0] is None:
    133           print("Error: Failed to create signature.")
    134           assert binary[1] != 0
    135           return binary[1]
    136         binary.append(binary_needs_libv8)
    137         self.binaries[shell] = binary
    138     if need_libv8:
    139       self.binaries["libv8.so"] = libv8
    140     distro.Assign(self.suites, self.peers)
    141     # Spawn one thread for each peer.
    142     threads = []
    143     for p in self.peers:
    144       thread = threading.Thread(target=self._TalkToPeer, args=[p])
    145       threads.append(thread)
    146       thread.start()
    147     try:
    148       for thread in threads:
    149         # Use a timeout so that signals (Ctrl+C) will be processed.
    150         thread.join(timeout=10000000)
    151       self._AnalyzePeerRuntimes()
    152     except KeyboardInterrupt:
    153       self.terminate = True
    154       raise
    155     except Exception, _e:
    156       # If there's an exception we schedule an interruption for any
    157       # remaining threads...
    158       self.terminate = True
    159       # ...and then reraise the exception to bail out.
    160       raise
    161     compression.Send(constants.END_OF_STREAM, self.local_socket)
    162     self.local_socket.close()
    163     if self.tests:
    164       self._RunInternal(jobs)
    165     self.indicator.Done()
    166     return not self.failed
    167 
    168   def _TalkToPeer(self, peer):
    169     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    170     sock.settimeout(self.context.timeout + 10)
    171     code = sock.connect_ex((peer.address, constants.PEER_PORT))
    172     if code == 0:
    173       try:
    174         peer.runtime = None
    175         start_time = time.time()
    176         packet = workpacket.WorkPacket(peer=peer, context=self.context,
    177                                        base_revision=self.base_svn_rev,
    178                                        patch=self.patch,
    179                                        pubkey=self.pubkey_fingerprint)
    180         data, test_map = packet.Pack(self.binaries)
    181         compression.Send(data, sock)
    182         compression.Send(constants.END_OF_STREAM, sock)
    183         rec = compression.Receiver(sock)
    184         while not rec.IsDone() and not self.terminate:
    185           data_list = rec.Current()
    186           for data in data_list:
    187             test_id = data[0]
    188             if test_id < 0:
    189               # The peer is reporting an error.
    190               with self.lock:
    191                 print("\nPeer %s reports error: %s" % (peer.address, data[1]))
    192               continue
    193             test = test_map.pop(test_id)
    194             test.MergeResult(data)
    195             try:
    196               self.perfdata.UpdatePerfData(test)
    197             except Exception, e:
    198               print("UpdatePerfData exception: %s" % e)
    199               pass  # Just keep working.
    200             with self.lock:
    201               perf_key = self.perfdata.GetKey(test)
    202               compression.Send(
    203                   [constants.INFORM_DURATION, perf_key, test.duration,
    204                    self.context.arch, self.context.mode],
    205                   self.local_socket)
    206               has_unexpected_output = test.suite.HasUnexpectedOutput(test)
    207               if has_unexpected_output:
    208                 self.failed.append(test)
    209                 if test.output.HasCrashed():
    210                   self.crashed += 1
    211               else:
    212                 self.succeeded += 1
    213               self.remaining -= 1
    214               self.indicator.HasRun(test, has_unexpected_output)
    215           rec.Advance()
    216         peer.runtime = time.time() - start_time
    217       except KeyboardInterrupt:
    218         sock.close()
    219         raise
    220       except Exception, e:
    221         print("Got exception: %s" % e)
    222         pass  # Fall back to local execution.
    223     else:
    224       compression.Send([constants.UNRESPONSIVE_PEER, peer.address],
    225                        self.local_socket)
    226     sock.close()
    227     if len(test_map) > 0:
    228       # Some tests have not received any results. Run them locally.
    229       print("\nNo results for %d tests, running them locally." % len(test_map))
    230       self._EnqueueLocally(test_map)
    231 
    232   def _EnqueueLocally(self, test_map):
    233     with self.tests_lock:
    234       for test in test_map:
    235         self.tests.append(test_map[test])
    236 
    237   def _AnalyzePeerRuntimes(self):
    238     total_runtime = 0.0
    239     total_work = 0.0
    240     for p in self.peers:
    241       if p.runtime is None:
    242         return
    243       total_runtime += p.runtime
    244       total_work += p.assigned_work
    245     for p in self.peers:
    246       p.assigned_work /= total_work
    247       p.runtime /= total_runtime
    248       perf_correction = p.assigned_work / p.runtime
    249       old_perf = p.relative_performance
    250       p.relative_performance = (old_perf + perf_correction) / 2.0
    251       compression.Send([constants.UPDATE_PERF, p.address,
    252                         p.relative_performance],
    253                        self.local_socket)
    254