Home | History | Annotate | Download | only in crosperf
      1 # Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 """The experiment setting module."""
      5 
      6 from __future__ import print_function
      7 
      8 import os
      9 import time
     10 
     11 import afe_lock_machine
     12 from threading import Lock
     13 
     14 from cros_utils import logger
     15 from cros_utils import misc
     16 
     17 import benchmark_run
     18 from machine_manager import BadChecksum
     19 from machine_manager import MachineManager
     20 from machine_manager import MockMachineManager
     21 import test_flag
     22 
     23 
     24 class Experiment(object):
     25   """Class representing an Experiment to be run."""
     26 
  def __init__(self, name, remote, working_directory, chromeos_root,
               cache_conditions, labels, benchmarks, experiment_file, email_to,
               acquire_timeout, log_dir, log_level, share_cache,
               results_directory, locks_directory):
    """Validate the settings, register machines and create benchmark runs.

    Args:
      name: Experiment name; also used to build the default results
        directory name ('<name>_results').
      remote: Iterable of machine names. Each one is handed to the machine
        manager's AddMachine; afterwards self.remote is replaced by the names
        of the machines the manager reports via GetAllMachines().
      working_directory: Parent directory of the default results directory.
      chromeos_root: ChromeOS root passed to the machine manager. If empty,
        the first non-empty label.chromeos_root is used instead.
      cache_conditions: Stored, and forwarded to every BenchmarkRun created.
      labels: Non-empty list of labels. Each label's `remote` list is
        filtered down to machines present in self.remote.
      benchmarks: Non-empty list of benchmarks to run.
      experiment_file: Stored on the instance.
      email_to: Stored on the instance.
      acquire_timeout: Passed to the machine manager.
      log_dir: Stored, and passed to logger.GetLogger().
      log_level: Stored, and passed to the machine manager.
      share_cache: Stored, and forwarded to every BenchmarkRun created.
      results_directory: Where results go. If empty, defaults to
        '<working_directory>/<name>_results'; otherwise it is passed through
        misc.CanonicalizePath().
      locks_directory: Stored as self.locks_dir and passed to the machine
        manager; see the comment on self.locks_dir below.

    Raises:
      RuntimeError: If no remote, benchmark, label or chromeos_root is
        available, or if the machine manager ends up with no machines.
    """
    self.name = name
    self.working_directory = working_directory
    self.remote = remote
    # NOTE: this keeps the value passed by the caller. The fallback to a
    # label's chromeos_root further down only updates the local variable.
    self.chromeos_root = chromeos_root
    self.cache_conditions = cache_conditions
    self.experiment_file = experiment_file
    self.email_to = email_to
    if not results_directory:
      self.results_directory = os.path.join(self.working_directory,
                                            self.name + '_results')
    else:
      self.results_directory = misc.CanonicalizePath(results_directory)
    self.log_dir = log_dir
    self.log_level = log_level
    self.labels = labels
    self.benchmarks = benchmarks
    # Counters: all finished runs, and finished runs that were not cache hits.
    self.num_complete = 0
    self.num_run_complete = 0
    self.share_cache = share_cache
    self.active_threads = []
    # If locks_directory (self.locks_dir) is not blank, we will use the file
    # locking mechanism; if it is blank then we will use the AFE server
    # locking mechanism.
    self.locks_dir = locks_directory
    self.locked_machines = []

    if not remote:
      raise RuntimeError('No remote hosts specified')
    if not self.benchmarks:
      raise RuntimeError('No benchmarks specified')
    if not self.labels:
      raise RuntimeError('No labels specified')

    # We need one chromeos_root to run the benchmarks in, but it doesn't
    # matter where it is, unless the ABIs are different.
    if not chromeos_root:
      for label in self.labels:
        if label.chromeos_root:
          chromeos_root = label.chromeos_root
          break
    if not chromeos_root:
      raise RuntimeError('No chromeos_root given and could not determine '
                         'one from the image path.')

    # In test mode a mock machine manager is substituted for the real one.
    machine_manager_fn = MachineManager
    if test_flag.GetTestMode():
      machine_manager_fn = MockMachineManager
    self.machine_manager = machine_manager_fn(chromeos_root, acquire_timeout,
                                              log_level, locks_directory)
    self.l = logger.GetLogger(log_dir)

    for machine in self.remote:
      # machine_manager.AddMachine only adds reachable machines.
      self.machine_manager.AddMachine(machine)
    # Now machine_manager._all_machines contains a list of reachable
    # machines. This is a subset of self.remote. We make both lists the same.
    self.remote = [m.name for m in self.machine_manager.GetAllMachines()]
    if not self.remote:
      raise RuntimeError('No machine available for running experiment.')

    for label in labels:
      # We filter out label remotes that are not reachable (not in
      # self.remote). So each label.remote is a sublist of experiment.remote.
      label.remote = [r for r in label.remote if r in self.remote]
      try:
        self.machine_manager.ComputeCommonCheckSum(label)
      except BadChecksum:
        # Force same image on all machines, then we do checksum again. No
        # bailout if checksums still do not match.
        self.machine_manager.ForceSameImageToAllMachines(label)
        self.machine_manager.ComputeCommonCheckSum(label)

      self.machine_manager.ComputeCommonCheckSumString(label)

    self.start_time = None
    self.benchmark_runs = self._GenerateBenchmarkRuns()

    # Optional scheduler; when set (see set_schedv2) the run/terminate/
    # completion logic is delegated to it.
    self._schedv2 = None
    # Guards the num_complete / num_run_complete updates made in
    # BenchmarkRunFinished.
    self._internal_counter_lock = Lock()
    110 
    111   def set_schedv2(self, schedv2):
    112     self._schedv2 = schedv2
    113 
    114   def schedv2(self):
    115     return self._schedv2
    116 
    117   def _GenerateBenchmarkRuns(self):
    118     """Generate benchmark runs from labels and benchmark defintions."""
    119     benchmark_runs = []
    120     for label in self.labels:
    121       for benchmark in self.benchmarks:
    122         for iteration in xrange(1, benchmark.iterations + 1):
    123 
    124           benchmark_run_name = '%s: %s (%s)' % (label.name, benchmark.name,
    125                                                 iteration)
    126           full_name = '%s_%s_%s' % (label.name, benchmark.name, iteration)
    127           logger_to_use = logger.Logger(self.log_dir, 'run.%s' % (full_name),
    128                                         True)
    129           benchmark_runs.append(benchmark_run.BenchmarkRun(
    130               benchmark_run_name, benchmark, label, iteration,
    131               self.cache_conditions, self.machine_manager, logger_to_use,
    132               self.log_level, self.share_cache))
    133 
    134     return benchmark_runs
    135 
    136   def Build(self):
    137     pass
    138 
    139   def Terminate(self):
    140     if self._schedv2 is not None:
    141       self._schedv2.terminate()
    142     else:
    143       for t in self.benchmark_runs:
    144         if t.isAlive():
    145           self.l.LogError("Terminating run: '%s'." % t.name)
    146           t.Terminate()
    147 
    148   def IsComplete(self):
    149     if self._schedv2:
    150       return self._schedv2.is_complete()
    151     if self.active_threads:
    152       for t in self.active_threads:
    153         if t.isAlive():
    154           t.join(0)
    155         if not t.isAlive():
    156           self.num_complete += 1
    157           if not t.cache_hit:
    158             self.num_run_complete += 1
    159           self.active_threads.remove(t)
    160       return False
    161     return True
    162 
    163   def BenchmarkRunFinished(self, br):
    164     """Update internal counters after br finishes.
    165 
    166     Note this is only used by schedv2 and is called by multiple threads.
    167     Never throw any exception here.
    168     """
    169 
    170     assert self._schedv2 is not None
    171     with self._internal_counter_lock:
    172       self.num_complete += 1
    173       if not br.cache_hit:
    174         self.num_run_complete += 1
    175 
    176   def Run(self):
    177     self.start_time = time.time()
    178     if self._schedv2 is not None:
    179       self._schedv2.run_sched()
    180     else:
    181       self.active_threads = []
    182       for run in self.benchmark_runs:
    183         # Set threads to daemon so program exits when ctrl-c is pressed.
    184         run.daemon = True
    185         run.start()
    186         self.active_threads.append(run)
    187 
    188   def SetCacheConditions(self, cache_conditions):
    189     for run in self.benchmark_runs:
    190       run.SetCacheConditions(cache_conditions)
    191 
    192   def Cleanup(self):
    193     """Make sure all machines are unlocked."""
    194     if self.locks_dir:
    195       # We are using the file locks mechanism, so call machine_manager.Cleanup
    196       # to unlock everything.
    197       self.machine_manager.Cleanup()
    198     else:
    199       if test_flag.GetTestMode():
    200         return
    201 
    202       all_machines = self.locked_machines
    203       if not all_machines:
    204         return
    205 
    206       # If we locked any machines earlier, make sure we unlock them now.
    207       lock_mgr = afe_lock_machine.AFELockManager(
    208           all_machines, '', self.labels[0].chromeos_root, None)
    209       machine_states = lock_mgr.GetMachineStates('unlock')
    210       for k, state in machine_states.iteritems():
    211         if state['locked']:
    212           lock_mgr.UpdateLockInAFE(False, k)
    213