# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""The experiment setting module."""

from __future__ import print_function

import os
import time

import afe_lock_machine
from threading import Lock

from cros_utils import logger
from cros_utils import misc

import benchmark_run
from machine_manager import BadChecksum
from machine_manager import MachineManager
from machine_manager import MockMachineManager
import test_flag


class Experiment(object):
  """Class representing an Experiment to be run.

  An experiment owns a machine manager, the list of benchmark runs built
  from every (label, benchmark, iteration) combination, and the counters
  that track how many of those runs have finished.

  Runs are executed either by an external scheduler installed through
  set_schedv2(), or, when none is installed, by starting every benchmark
  run as its own daemon thread.
  """

  def __init__(self, name, remote, working_directory, chromeos_root,
               cache_conditions, labels, benchmarks, experiment_file, email_to,
               acquire_timeout, log_dir, log_level, share_cache,
               results_directory, locks_directory):
    """Set up the machines, checksums and benchmark runs of the experiment.

    Args:
      name: Experiment name; also used to build the default results
        directory name ('<name>_results').
      remote: Machines to run on. Each entry is handed to the machine
        manager; afterwards self.remote is replaced by the names of the
        machines the manager actually reports.
      working_directory: Parent of the default results directory.
      chromeos_root: ChromeOS root handed to the machine manager. When
        falsy, the first truthy label.chromeos_root is used instead.
      cache_conditions: Passed through to every benchmark run.
      labels: Label objects; their name, chromeos_root and remote
        attributes are read here.
      benchmarks: Benchmark objects; their name and iterations attributes
        are read when generating runs.
      experiment_file: Stored on the instance as is.
      email_to: Stored on the instance as is.
      acquire_timeout: Passed through to the machine manager.
      log_dir: Directory handed to the loggers created here.
      log_level: Passed to the machine manager and every benchmark run.
      share_cache: Passed through to every benchmark run.
      results_directory: Where results go. When falsy, defaults to
        '<working_directory>/<name>_results'; otherwise it is canonicalized
        with misc.CanonicalizePath.
      locks_directory: Passed to the machine manager and kept as
        self.locks_dir, which selects the unlock strategy in Cleanup().

    Raises:
      RuntimeError: If remote, benchmarks or labels is empty, if no
        chromeos_root can be determined, or if the machine manager ends up
        with no machines.
    """
    self.name = name
    self.working_directory = working_directory
    self.remote = remote
    # NOTE: this keeps the value passed by the caller. The fallback root
    # derived from the labels further down is only handed to the machine
    # manager; it is not written back to this attribute.
    self.chromeos_root = chromeos_root
    self.cache_conditions = cache_conditions
    self.experiment_file = experiment_file
    self.email_to = email_to
    if not results_directory:
      self.results_directory = os.path.join(self.working_directory,
                                            self.name + '_results')
    else:
      self.results_directory = misc.CanonicalizePath(results_directory)
    self.log_dir = log_dir
    self.log_level = log_level
    self.labels = labels
    self.benchmarks = benchmarks
    self.num_complete = 0
    self.num_run_complete = 0
    self.share_cache = share_cache
    self.active_threads = []
    # If locks_directory (self.locks_dir) is not blank, we will use the file
    # locking mechanism; if it is blank then we will use the AFE server
    # locking mechanism.
    self.locks_dir = locks_directory
    self.locked_machines = []

    if not remote:
      raise RuntimeError('No remote hosts specified')
    if not self.benchmarks:
      raise RuntimeError('No benchmarks specified')
    if not self.labels:
      raise RuntimeError('No labels specified')

    # We need one chromeos_root to run the benchmarks in, but it doesn't
    # matter where it is, unless the ABIs are different.
    if not chromeos_root:
      for label in self.labels:
        if label.chromeos_root:
          chromeos_root = label.chromeos_root
          break
    if not chromeos_root:
      raise RuntimeError('No chromeos_root given and could not determine '
                         'one from the image path.')

    machine_manager_fn = MachineManager
    if test_flag.GetTestMode():
      machine_manager_fn = MockMachineManager
    self.machine_manager = machine_manager_fn(chromeos_root, acquire_timeout,
                                              log_level, locks_directory)
    self.l = logger.GetLogger(log_dir)

    for machine in self.remote:
      # machine_manager.AddMachine only adds reachable machines.
      self.machine_manager.AddMachine(machine)
    # Now machine_manager._all_machines contains a list of reachable
    # machines. This is a subset of self.remote. We make both lists the same.
    self.remote = [m.name for m in self.machine_manager.GetAllMachines()]
    if not self.remote:
      raise RuntimeError('No machine available for running experiment.')

    for label in labels:
      # We filter out label remotes that are not reachable (not in
      # self.remote). So each label.remote is a sublist of experiment.remote.
      label.remote = [r for r in label.remote if r in self.remote]
      try:
        self.machine_manager.ComputeCommonCheckSum(label)
      except BadChecksum:
        # Force same image on all machines, then we do checksum again. No
        # bailout if checksums still do not match.
        self.machine_manager.ForceSameImageToAllMachines(label)
        self.machine_manager.ComputeCommonCheckSum(label)

      self.machine_manager.ComputeCommonCheckSumString(label)

    self.start_time = None
    self.benchmark_runs = self._GenerateBenchmarkRuns()

    self._schedv2 = None
    # Guards the completion counters, which BenchmarkRunFinished() updates
    # from multiple threads.
    self._internal_counter_lock = Lock()

  def set_schedv2(self, schedv2):
    """Install the scheduler that Run/Terminate/IsComplete delegate to."""
    self._schedv2 = schedv2

  def schedv2(self):
    """Return the installed scheduler, or None if there is none."""
    return self._schedv2

  def _GenerateBenchmarkRuns(self):
    """Generate benchmark runs from labels and benchmark definitions.

    One BenchmarkRun is created for every label, benchmark and iteration
    (iterations are numbered from 1), each with its own logger.

    Returns:
      The list of created benchmark_run.BenchmarkRun objects.
    """
    benchmark_runs = []
    for label in self.labels:
      for benchmark in self.benchmarks:
        # range() instead of xrange(): works on both Python 2 and 3.
        for iteration in range(1, benchmark.iterations + 1):

          benchmark_run_name = '%s: %s (%s)' % (label.name, benchmark.name,
                                                iteration)
          full_name = '%s_%s_%s' % (label.name, benchmark.name, iteration)
          logger_to_use = logger.Logger(self.log_dir, 'run.%s' % (full_name),
                                        True)
          benchmark_runs.append(benchmark_run.BenchmarkRun(
              benchmark_run_name, benchmark, label, iteration,
              self.cache_conditions, self.machine_manager, logger_to_use,
              self.log_level, self.share_cache))

    return benchmark_runs

  def Build(self):
    """Do nothing; placeholder with no build step."""
    pass

  def Terminate(self):
    """Stop the experiment.

    Delegates to the scheduler when one is installed; otherwise terminates
    every benchmark run thread that is still alive.
    """
    if self._schedv2 is not None:
      self._schedv2.terminate()
    else:
      for t in self.benchmark_runs:
        # is_alive() replaces isAlive(), which was removed in Python 3.9.
        if t.is_alive():
          self.l.LogError("Terminating run: '%s'." % t.name)
          t.Terminate()

  def IsComplete(self):
    """Report whether all runs have finished, reaping finished threads.

    With a scheduler installed its is_complete() result is returned.
    Otherwise every finished thread is removed from self.active_threads and
    counted in num_complete (and in num_run_complete when it was not a
    cache hit).

    Returns:
      True when there was nothing left to wait for. A call that started
      with active threads returns False even if it reaped all of them; the
      following call then returns True.
    """
    if self._schedv2:
      return self._schedv2.is_complete()
    if self.active_threads:
      # Iterate over a snapshot: removing from the list being iterated
      # would make the loop skip the element following each removed one.
      for t in list(self.active_threads):
        if t.is_alive():
          t.join(0)
        if not t.is_alive():
          self.num_complete += 1
          if not t.cache_hit:
            self.num_run_complete += 1
          self.active_threads.remove(t)
      return False
    return True

  def BenchmarkRunFinished(self, br):
    """Update internal counters after br finishes.

    Note this is only used by schedv2 and is called by multiple threads.
    Never throw any exception here.

    Args:
      br: The finished benchmark run; only its cache_hit attribute is read.
    """

    assert self._schedv2 is not None
    with self._internal_counter_lock:
      self.num_complete += 1
      if not br.cache_hit:
        self.num_run_complete += 1

  def Run(self):
    """Record the start time and start running the benchmarks.

    Delegates to the scheduler's run_sched() when one is installed;
    otherwise starts every benchmark run as a daemon thread and tracks it in
    self.active_threads.
    """
    self.start_time = time.time()
    if self._schedv2 is not None:
      self._schedv2.run_sched()
    else:
      self.active_threads = []
      for run in self.benchmark_runs:
        # Set threads to daemon so program exits when ctrl-c is pressed.
        run.daemon = True
        run.start()
        self.active_threads.append(run)

  def SetCacheConditions(self, cache_conditions):
    """Forward cache_conditions to every benchmark run."""
    for run in self.benchmark_runs:
      run.SetCacheConditions(cache_conditions)

  def Cleanup(self):
    """Make sure all machines are unlocked."""
    if self.locks_dir:
      # We are using the file locks mechanism, so call machine_manager.Cleanup
      # to unlock everything.
      self.machine_manager.Cleanup()
    else:
      if test_flag.GetTestMode():
        return

      all_machines = self.locked_machines
      if not all_machines:
        return

      # If we locked any machines earlier, make sure we unlock them now.
      lock_mgr = afe_lock_machine.AFELockManager(
          all_machines, '', self.labels[0].chromeos_root, None)
      machine_states = lock_mgr.GetMachineStates('unlock')
      # items() instead of iteritems(): works on both Python 2 and 3.
      for k, state in machine_states.items():
        if state['locked']:
          lock_mgr.UpdateLockInAFE(False, k)