# Copyright 2012 the V8 project authors. All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
#       notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
#       copyright notice, this list of conditions and the following
#       disclaimer in the documentation and/or other materials provided
#       with the distribution.
#     * Neither the name of Google Inc. nor the names of its
#       contributors may be used to endorse or promote products derived
#       from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


import multiprocessing
import os
import shutil
import subprocess
import threading
import time

from . import daemon
from . import local_handler
from . import presence_handler
from . import signatures
from . import status_handler
from . import work_handler
from ..network import perfdata


class Server(daemon.Daemon):
  """Daemon that runs the local, work, status and presence socket servers.

  Each server is started on its own thread by run(). The daemon also keeps
  a lock-protected list of known peers, a directory of trusted public keys
  under <root>/trusted, and a running estimate of this machine's relative
  performance that is persisted to <root>/data/myperf on shutdown.
  """

  def __init__(self, pidfile, root, stdin="/dev/null",
               stdout="/dev/null", stderr="/dev/null"):
    """Sets up paths and in-memory state; does not start any server.

    Args:
      pidfile, stdin, stdout, stderr: forwarded unchanged to daemon.Daemon.
      root: base directory; "data" and "trusted" are resolved beneath it.

    Raises:
      IOError: if <root>/data/mypubkey cannot be opened.
    """
    super(Server, self).__init__(pidfile, stdin, stdout, stderr)
    self.root = root
    # The handlers, their threads and the remaining locks are created in
    # run(); until then they are None.
    self.local_handler = None
    self.local_handler_thread = None
    self.work_handler = None
    self.work_handler_thread = None
    self.status_handler = None
    self.status_handler_thread = None
    self.presence_daemon = None
    self.presence_daemon_thread = None
    self.ip = None
    self.perf_data_manager = None
    self.peers = []
    self.jobs = multiprocessing.cpu_count()
    self.peer_list_lock = threading.Lock()
    self.perf_data_lock = None
    self.presence_daemon_lock = None
    self.datadir = os.path.join(self.root, "data")
    pubkey_fingerprint_filename = os.path.join(self.datadir, "mypubkey")
    with open(pubkey_fingerprint_filename) as f:
      self.pubkey_fingerprint = f.read().strip()
    self.relative_perf_filename = os.path.join(self.datadir, "myperf")
    # Default to 1.0 when no value was persisted or the file is unreadable.
    self.relative_perf = 1.0
    if os.path.exists(self.relative_perf_filename):
      with open(self.relative_perf_filename) as f:
        try:
          self.relative_perf = float(f.read())
        except (IOError, ValueError):
          # Best effort: a corrupt or unreadable file keeps the default.
          self.relative_perf = 1.0

  def run(self):
    """Starts all servers, discovers peers, then loops over PeriodicTasks().

    The loop runs PeriodicTasks() every 60 seconds. Any Exception or a
    KeyboardInterrupt ends the loop after calling Shutdown().
    """
    os.nice(20)
    self.ip = presence_handler.GetOwnIP()
    self.perf_data_manager = perfdata.PerfDataManager(self.datadir)
    self.perf_data_lock = threading.Lock()

    self.local_handler = local_handler.LocalSocketServer(self)
    self.local_handler_thread = threading.Thread(
        target=self.local_handler.serve_forever)
    self.local_handler_thread.start()

    self.work_handler = work_handler.WorkSocketServer(self)
    self.work_handler_thread = threading.Thread(
        target=self.work_handler.serve_forever)
    self.work_handler_thread.start()

    self.status_handler = status_handler.StatusSocketServer(self)
    self.status_handler_thread = threading.Thread(
        target=self.status_handler.serve_forever)
    self.status_handler_thread.start()

    self.presence_daemon = presence_handler.PresenceDaemon(self)
    self.presence_daemon_thread = threading.Thread(
        target=self.presence_daemon.serve_forever)
    self.presence_daemon_thread.start()

    self.presence_daemon.FindPeers()
    time.sleep(0.5)  # Give those peers some time to reply.

    with self.peer_list_lock:
      for p in self.peers:
        if p.address == self.ip: continue
        status_handler.RequestTrustedPubkeys(p, self)

    while True:
      try:
        self.PeriodicTasks()
        time.sleep(60)
      except Exception as e:
        print("MAIN LOOP EXCEPTION: %s" % e)
        self.Shutdown()
        break
      except KeyboardInterrupt:
        self.Shutdown()
        break

  def Shutdown(self):
    """Persists relative_perf and shuts down and closes all four servers."""
    with open(self.relative_perf_filename, "w") as f:
      f.write("%s" % self.relative_perf)
    # Same order as the servers were originally torn down in.
    for server in (self.presence_daemon, self.local_handler,
                   self.work_handler, self.status_handler):
      server.shutdown()
      server.server_close()

  def PeriodicTasks(self):
    """Tries to establish trust for every peer that is not yet trusted.

    A peer whose pubkey is already in the trusted directory is marked as
    trusted and notified. Otherwise every trusted peer is asked about it
    via status_handler.TryTransitiveTrust().
    """
    # If we know peers we don't trust, see if someone else trusts them.
    with self.peer_list_lock:
      for p in self.peers:
        if p.trusted: continue
        if self.IsTrusted(p.pubkey):
          p.trusted = True
          status_handler.ITrustYouNow(p)
          continue
        for p2 in self.peers:
          if not p2.trusted: continue
          status_handler.TryTransitiveTrust(p2, p.pubkey, self)
    # TODO: Ping for more peers waiting to be discovered.
    # TODO: Update the checkout (if currently idle).

  def AddPeer(self, peer):
    """Adds |peer| unless a peer with the same address is already known.

    A newly added peer that is already trusted is notified via
    status_handler.ITrustYouNow().
    """
    with self.peer_list_lock:
      for p in self.peers:
        if p.address == peer.address:
          return
      self.peers.append(peer)
    if peer.trusted:
      status_handler.ITrustYouNow(peer)

  def DeletePeer(self, peer_address):
    """Removes the first peer whose address equals |peer_address|, if any."""
    with self.peer_list_lock:
      for index, p in enumerate(self.peers):
        if p.address == peer_address:
          # Returning right after the deletion keeps the iteration safe.
          del self.peers[index]
          return

  def MarkPeerAsTrusting(self, peer_address):
    """Sets trusting_me on the first peer with the given address."""
    with self.peer_list_lock:
      for p in self.peers:
        if p.address == peer_address:
          p.trusting_me = True
          break

  def UpdatePeerPerformance(self, peer_address, performance):
    """Stores |performance| on every peer with the given address."""
    with self.peer_list_lock:
      for p in self.peers:
        if p.address == peer_address:
          p.relative_performance = performance

  def CopyToTrusted(self, pubkey_filename):
    """Copies a pubkey file into the trusted directory and informs peers.

    The fingerprint is taken from the last line of |pubkey_filename|. The
    peer owning that fingerprint is told it is now trusted; every other
    peer (except ourselves) receives the signed key.

    Returns:
      The fingerprint read from the file.
    """
    with open(pubkey_filename, "r") as f:
      lines = f.readlines()
      fingerprint = lines[-1].strip()
    target_filename = self._PubkeyFilename(fingerprint)
    shutil.copy(pubkey_filename, target_filename)
    # The signed record is identical for all recipients, so build it at
    # most once instead of re-reading the key file for every peer.
    signed = None
    with self.peer_list_lock:
      for peer in self.peers:
        if peer.address == self.ip: continue
        if peer.pubkey == fingerprint:
          status_handler.ITrustYouNow(peer)
        else:
          if signed is None:
            signed = self.SignTrusted(fingerprint)
          status_handler.NotifyNewTrusted(peer, signed)
    return fingerprint

  def _PubkeyFilename(self, pubkey_fingerprint):
    """Returns the path <root>/trusted/<pubkey_fingerprint>.pem."""
    return os.path.join(self.root, "trusted", "%s.pem" % pubkey_fingerprint)

  def IsTrusted(self, pubkey_fingerprint):
    """Returns True if a .pem file for the fingerprint exists."""
    return os.path.exists(self._PubkeyFilename(pubkey_fingerprint))

  def ListTrusted(self):
    """Returns the fingerprints (file names minus ".pem") of trusted keys."""
    path = os.path.join(self.root, "trusted")
    if not os.path.exists(path): return []
    return [f[:-4] for f in os.listdir(path) if f.endswith(".pem")]

  def SignTrusted(self, pubkey_fingerprint):
    """Builds the record sent to peers for a trusted key.

    Returns:
      [] if the fingerprint is not trusted, otherwise
      [fingerprint, key, signature, own fingerprint].
    """
    if not self.IsTrusted(pubkey_fingerprint):
      return []
    filename = self._PubkeyFilename(pubkey_fingerprint)
    result = signatures.ReadFileAndSignature(filename)  # Format: [key, sig].
    return [pubkey_fingerprint, result[0], result[1], self.pubkey_fingerprint]

  def AcceptNewTrusted(self, data):
    """Handles a trusted-key record received from a peer.

    Nothing happens if |data| is empty, if the signer is not trusted, or
    if the key is already trusted. Otherwise the record is handed to
    signatures.VerifySignature(); its result is not acted upon here.
    """
    # The format of |data| matches the return value of |SignTrusted()|.
    if not data: return
    fingerprint = data[0]
    pubkey = data[1]
    signature = data[2]
    signer = data[3]
    if not self.IsTrusted(signer):
      return
    if self.IsTrusted(fingerprint):
      return  # Already trust this guy.
    filename = self._PubkeyFilename(fingerprint)
    signer_pubkeyfile = self._PubkeyFilename(signer)
    # Nothing more to do whether or not verification succeeds.
    signatures.VerifySignature(filename, pubkey, signature, signer_pubkeyfile)

  def AddPerfData(self, test_key, duration, arch, mode):
    """Records |duration| for |test_key| in the store for (arch, mode)."""
    data_store = self.perf_data_manager.GetStore(arch, mode)
    data_store.RawUpdatePerfData(str(test_key), duration)

  def CompareOwnPerf(self, test, arch, mode):
    """Folds one observation into the relative performance estimate.

    The estimate for this run is the stored duration for |test| divided by
    test.duration. It is blended into self.relative_perf with weight
    1 / (kLearnRateLimiter + 1). Nothing happens if no data is stored.
    """
    data_store = self.perf_data_manager.GetStore(arch, mode)
    observed = data_store.FetchPerfData(test)
    if not observed: return
    own_perf_estimate = observed / test.duration
    with self.perf_data_lock:
      kLearnRateLimiter = 9999
      self.relative_perf *= kLearnRateLimiter
      self.relative_perf += own_perf_estimate
      self.relative_perf /= (kLearnRateLimiter + 1)