Home | History | Annotate | Download | only in server
      1 # Copyright 2012 the V8 project authors. All rights reserved.
      2 # Redistribution and use in source and binary forms, with or without
      3 # modification, are permitted provided that the following conditions are
      4 # met:
      5 #
      6 #     * Redistributions of source code must retain the above copyright
      7 #       notice, this list of conditions and the following disclaimer.
      8 #     * Redistributions in binary form must reproduce the above
      9 #       copyright notice, this list of conditions and the following
     10 #       disclaimer in the documentation and/or other materials provided
     11 #       with the distribution.
     12 #     * Neither the name of Google Inc. nor the names of its
     13 #       contributors may be used to endorse or promote products derived
     14 #       from this software without specific prior written permission.
     15 #
     16 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     17 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     18 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     19 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     20 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     23 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     24 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     25 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     26 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     27 
     28 
     29 import multiprocessing
     30 import os
     31 import shutil
     32 import subprocess
     33 import threading
     34 import time
     35 
     36 from . import daemon
     37 from . import local_handler
     38 from . import presence_handler
     39 from . import signatures
     40 from . import status_handler
     41 from . import work_handler
     42 from ..network import perfdata
     43 
     44 
     45 class Server(daemon.Daemon):
     46 
     47   def __init__(self, pidfile, root, stdin="/dev/null",
     48                stdout="/dev/null", stderr="/dev/null"):
     49     super(Server, self).__init__(pidfile, stdin, stdout, stderr)
     50     self.root = root
     51     self.local_handler = None
     52     self.local_handler_thread = None
     53     self.work_handler = None
     54     self.work_handler_thread = None
     55     self.status_handler = None
     56     self.status_handler_thread = None
     57     self.presence_daemon = None
     58     self.presence_daemon_thread = None
     59     self.peers = []
     60     self.jobs = multiprocessing.cpu_count()
     61     self.peer_list_lock = threading.Lock()
     62     self.perf_data_lock = None
     63     self.presence_daemon_lock = None
     64     self.datadir = os.path.join(self.root, "data")
     65     pubkey_fingerprint_filename = os.path.join(self.datadir, "mypubkey")
     66     with open(pubkey_fingerprint_filename) as f:
     67       self.pubkey_fingerprint = f.read().strip()
     68     self.relative_perf_filename = os.path.join(self.datadir, "myperf")
     69     if os.path.exists(self.relative_perf_filename):
     70       with open(self.relative_perf_filename) as f:
     71         try:
     72           self.relative_perf = float(f.read())
     73         except:
     74           self.relative_perf = 1.0
     75     else:
     76       self.relative_perf = 1.0
     77 
     78   def run(self):
     79     os.nice(20)
     80     self.ip = presence_handler.GetOwnIP()
     81     self.perf_data_manager = perfdata.PerfDataManager(self.datadir)
     82     self.perf_data_lock = threading.Lock()
     83 
     84     self.local_handler = local_handler.LocalSocketServer(self)
     85     self.local_handler_thread = threading.Thread(
     86         target=self.local_handler.serve_forever)
     87     self.local_handler_thread.start()
     88 
     89     self.work_handler = work_handler.WorkSocketServer(self)
     90     self.work_handler_thread = threading.Thread(
     91         target=self.work_handler.serve_forever)
     92     self.work_handler_thread.start()
     93 
     94     self.status_handler = status_handler.StatusSocketServer(self)
     95     self.status_handler_thread = threading.Thread(
     96         target=self.status_handler.serve_forever)
     97     self.status_handler_thread.start()
     98 
     99     self.presence_daemon = presence_handler.PresenceDaemon(self)
    100     self.presence_daemon_thread = threading.Thread(
    101         target=self.presence_daemon.serve_forever)
    102     self.presence_daemon_thread.start()
    103 
    104     self.presence_daemon.FindPeers()
    105     time.sleep(0.5)  # Give those peers some time to reply.
    106 
    107     with self.peer_list_lock:
    108       for p in self.peers:
    109         if p.address == self.ip: continue
    110         status_handler.RequestTrustedPubkeys(p, self)
    111 
    112     while True:
    113       try:
    114         self.PeriodicTasks()
    115         time.sleep(60)
    116       except Exception, e:
    117         print("MAIN LOOP EXCEPTION: %s" % e)
    118         self.Shutdown()
    119         break
    120       except KeyboardInterrupt:
    121         self.Shutdown()
    122         break
    123 
    124   def Shutdown(self):
    125     with open(self.relative_perf_filename, "w") as f:
    126       f.write("%s" % self.relative_perf)
    127     self.presence_daemon.shutdown()
    128     self.presence_daemon.server_close()
    129     self.local_handler.shutdown()
    130     self.local_handler.server_close()
    131     self.work_handler.shutdown()
    132     self.work_handler.server_close()
    133     self.status_handler.shutdown()
    134     self.status_handler.server_close()
    135 
    136   def PeriodicTasks(self):
    137     # If we know peers we don't trust, see if someone else trusts them.
    138     with self.peer_list_lock:
    139       for p in self.peers:
    140         if p.trusted: continue
    141         if self.IsTrusted(p.pubkey):
    142           p.trusted = True
    143           status_handler.ITrustYouNow(p)
    144           continue
    145         for p2 in self.peers:
    146           if not p2.trusted: continue
    147           status_handler.TryTransitiveTrust(p2, p.pubkey, self)
    148     # TODO: Ping for more peers waiting to be discovered.
    149     # TODO: Update the checkout (if currently idle).
    150 
    151   def AddPeer(self, peer):
    152     with self.peer_list_lock:
    153       for p in self.peers:
    154         if p.address == peer.address:
    155           return
    156       self.peers.append(peer)
    157     if peer.trusted:
    158       status_handler.ITrustYouNow(peer)
    159 
    160   def DeletePeer(self, peer_address):
    161     with self.peer_list_lock:
    162       for i in xrange(len(self.peers)):
    163         if self.peers[i].address == peer_address:
    164           del self.peers[i]
    165           return
    166 
    167   def MarkPeerAsTrusting(self, peer_address):
    168     with self.peer_list_lock:
    169       for p in self.peers:
    170         if p.address == peer_address:
    171           p.trusting_me = True
    172           break
    173 
    174   def UpdatePeerPerformance(self, peer_address, performance):
    175     with self.peer_list_lock:
    176       for p in self.peers:
    177         if p.address == peer_address:
    178           p.relative_performance = performance
    179 
    180   def CopyToTrusted(self, pubkey_filename):
    181     with open(pubkey_filename, "r") as f:
    182       lines = f.readlines()
    183       fingerprint = lines[-1].strip()
    184     target_filename = self._PubkeyFilename(fingerprint)
    185     shutil.copy(pubkey_filename, target_filename)
    186     with self.peer_list_lock:
    187       for peer in self.peers:
    188         if peer.address == self.ip: continue
    189         if peer.pubkey == fingerprint:
    190           status_handler.ITrustYouNow(peer)
    191         else:
    192           result = self.SignTrusted(fingerprint)
    193           status_handler.NotifyNewTrusted(peer, result)
    194     return fingerprint
    195 
    196   def _PubkeyFilename(self, pubkey_fingerprint):
    197     return os.path.join(self.root, "trusted", "%s.pem" % pubkey_fingerprint)
    198 
    199   def IsTrusted(self, pubkey_fingerprint):
    200     return os.path.exists(self._PubkeyFilename(pubkey_fingerprint))
    201 
    202   def ListTrusted(self):
    203     path = os.path.join(self.root, "trusted")
    204     if not os.path.exists(path): return []
    205     return [ f[:-4] for f in os.listdir(path) if f.endswith(".pem") ]
    206 
    207   def SignTrusted(self, pubkey_fingerprint):
    208     if not self.IsTrusted(pubkey_fingerprint):
    209       return []
    210     filename = self._PubkeyFilename(pubkey_fingerprint)
    211     result = signatures.ReadFileAndSignature(filename)  # Format: [key, sig].
    212     return [pubkey_fingerprint, result[0], result[1], self.pubkey_fingerprint]
    213 
    214   def AcceptNewTrusted(self, data):
    215     # The format of |data| matches the return value of |SignTrusted()|.
    216     if not data: return
    217     fingerprint = data[0]
    218     pubkey = data[1]
    219     signature = data[2]
    220     signer = data[3]
    221     if not self.IsTrusted(signer):
    222       return
    223     if self.IsTrusted(fingerprint):
    224       return  # Already trust this guy.
    225     filename = self._PubkeyFilename(fingerprint)
    226     signer_pubkeyfile = self._PubkeyFilename(signer)
    227     if not signatures.VerifySignature(filename, pubkey, signature,
    228                                       signer_pubkeyfile):
    229       return
    230     return  # Nothing more to do.
    231 
    232   def AddPerfData(self, test_key, duration, arch, mode):
    233     data_store = self.perf_data_manager.GetStore(arch, mode)
    234     data_store.RawUpdatePerfData(str(test_key), duration)
    235 
    236   def CompareOwnPerf(self, test, arch, mode):
    237     data_store = self.perf_data_manager.GetStore(arch, mode)
    238     observed = data_store.FetchPerfData(test)
    239     if not observed: return
    240     own_perf_estimate = observed / test.duration
    241     with self.perf_data_lock:
    242       kLearnRateLimiter = 9999
    243       self.relative_perf *= kLearnRateLimiter
    244       self.relative_perf += own_perf_estimate
    245       self.relative_perf /= (kLearnRateLimiter + 1)
    246