Home | History | Annotate | Download | only in crosperf
      1 # Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 """The experiment setting module."""
      5 
      6 from __future__ import print_function
      7 
      8 import os
      9 import time
     10 
     11 import afe_lock_machine
     12 from threading import Lock
     13 
     14 from cros_utils import logger
     15 from cros_utils import misc
     16 
     17 import benchmark_run
     18 from machine_manager import BadChecksum
     19 from machine_manager import MachineManager
     20 from machine_manager import MockMachineManager
     21 import test_flag
     22 
     23 
     24 class Experiment(object):
     25   """Class representing an Experiment to be run."""
     26 
     27   def __init__(self, name, remote, working_directory, chromeos_root,
     28                cache_conditions, labels, benchmarks, experiment_file, email_to,
     29                acquire_timeout, log_dir, log_level, share_cache,
     30                results_directory, locks_directory):
     31     self.name = name
     32     self.working_directory = working_directory
     33     self.remote = remote
     34     self.chromeos_root = chromeos_root
     35     self.cache_conditions = cache_conditions
     36     self.experiment_file = experiment_file
     37     self.email_to = email_to
     38     if not results_directory:
     39       self.results_directory = os.path.join(self.working_directory,
     40                                             self.name + '_results')
     41     else:
     42       self.results_directory = misc.CanonicalizePath(results_directory)
     43     self.log_dir = log_dir
     44     self.log_level = log_level
     45     self.labels = labels
     46     self.benchmarks = benchmarks
     47     self.num_complete = 0
     48     self.num_run_complete = 0
     49     self.share_cache = share_cache
     50     self.active_threads = []
     51     # If locks_directory (self.lock_dir) not blank, we will use the file
     52     # locking mechanism; if it is blank then we will use the AFE server
     53     # locking mechanism.
     54     self.locks_dir = locks_directory
     55     self.locked_machines = []
     56 
     57     if not remote:
     58       raise RuntimeError('No remote hosts specified')
     59     if not self.benchmarks:
     60       raise RuntimeError('No benchmarks specified')
     61     if not self.labels:
     62       raise RuntimeError('No labels specified')
     63 
     64     # We need one chromeos_root to run the benchmarks in, but it doesn't
     65     # matter where it is, unless the ABIs are different.
     66     if not chromeos_root:
     67       for label in self.labels:
     68         if label.chromeos_root:
     69           chromeos_root = label.chromeos_root
     70           break
     71     if not chromeos_root:
     72       raise RuntimeError('No chromeos_root given and could not determine '
     73                          'one from the image path.')
     74 
     75     machine_manager_fn = MachineManager
     76     if test_flag.GetTestMode():
     77       machine_manager_fn = MockMachineManager
     78     self.machine_manager = machine_manager_fn(chromeos_root, acquire_timeout,
     79                                               log_level, locks_directory)
     80     self.l = logger.GetLogger(log_dir)
     81 
     82     for machine in self.remote:
     83       # machine_manager.AddMachine only adds reachable machines.
     84       self.machine_manager.AddMachine(machine)
     85     # Now machine_manager._all_machines contains a list of reachable
     86     # machines. This is a subset of self.remote. We make both lists the same.
     87     self.remote = [m.name for m in self.machine_manager.GetAllMachines()]
     88     if not self.remote:
     89       raise RuntimeError('No machine available for running experiment.')
     90 
     91     for label in labels:
     92       # We filter out label remotes that are not reachable (not in
     93       # self.remote). So each label.remote is a sublist of experiment.remote.
     94       label.remote = [r for r in label.remote if r in self.remote]
     95       try:
     96         self.machine_manager.ComputeCommonCheckSum(label)
     97       except BadChecksum:
     98         # Force same image on all machines, then we do checksum again. No
     99         # bailout if checksums still do not match.
    100         self.machine_manager.ForceSameImageToAllMachines(label)
    101         self.machine_manager.ComputeCommonCheckSum(label)
    102 
    103       self.machine_manager.ComputeCommonCheckSumString(label)
    104 
    105     self.start_time = None
    106     self.benchmark_runs = self._GenerateBenchmarkRuns()
    107 
    108     self._schedv2 = None
    109     self._internal_counter_lock = Lock()
    110 
    111   def set_schedv2(self, schedv2):
    112     self._schedv2 = schedv2
    113 
    114   def schedv2(self):
    115     return self._schedv2
    116 
    117   def _GenerateBenchmarkRuns(self):
    118     """Generate benchmark runs from labels and benchmark defintions."""
    119     benchmark_runs = []
    120     for label in self.labels:
    121       for benchmark in self.benchmarks:
    122         for iteration in xrange(1, benchmark.iterations + 1):
    123 
    124           benchmark_run_name = '%s: %s (%s)' % (label.name, benchmark.name,
    125                                                 iteration)
    126           full_name = '%s_%s_%s' % (label.name, benchmark.name, iteration)
    127           logger_to_use = logger.Logger(self.log_dir, 'run.%s' % (full_name),
    128                                         True)
    129           benchmark_runs.append(
    130               benchmark_run.BenchmarkRun(benchmark_run_name, benchmark, label,
    131                                          iteration, self.cache_conditions,
    132                                          self.machine_manager, logger_to_use,
    133                                          self.log_level, self.share_cache))
    134 
    135     return benchmark_runs
    136 
    137   def Build(self):
    138     pass
    139 
    140   def Terminate(self):
    141     if self._schedv2 is not None:
    142       self._schedv2.terminate()
    143     else:
    144       for t in self.benchmark_runs:
    145         if t.isAlive():
    146           self.l.LogError("Terminating run: '%s'." % t.name)
    147           t.Terminate()
    148 
    149   def IsComplete(self):
    150     if self._schedv2:
    151       return self._schedv2.is_complete()
    152     if self.active_threads:
    153       for t in self.active_threads:
    154         if t.isAlive():
    155           t.join(0)
    156         if not t.isAlive():
    157           self.num_complete += 1
    158           if not t.cache_hit:
    159             self.num_run_complete += 1
    160           self.active_threads.remove(t)
    161       return False
    162     return True
    163 
    164   def BenchmarkRunFinished(self, br):
    165     """Update internal counters after br finishes.
    166 
    167     Note this is only used by schedv2 and is called by multiple threads.
    168     Never throw any exception here.
    169     """
    170 
    171     assert self._schedv2 is not None
    172     with self._internal_counter_lock:
    173       self.num_complete += 1
    174       if not br.cache_hit:
    175         self.num_run_complete += 1
    176 
    177   def Run(self):
    178     self.start_time = time.time()
    179     if self._schedv2 is not None:
    180       self._schedv2.run_sched()
    181     else:
    182       self.active_threads = []
    183       for run in self.benchmark_runs:
    184         # Set threads to daemon so program exits when ctrl-c is pressed.
    185         run.daemon = True
    186         run.start()
    187         self.active_threads.append(run)
    188 
    189   def SetCacheConditions(self, cache_conditions):
    190     for run in self.benchmark_runs:
    191       run.SetCacheConditions(cache_conditions)
    192 
    193   def Cleanup(self):
    194     """Make sure all machines are unlocked."""
    195     if self.locks_dir:
    196       # We are using the file locks mechanism, so call machine_manager.Cleanup
    197       # to unlock everything.
    198       self.machine_manager.Cleanup()
    199     else:
    200       if test_flag.GetTestMode():
    201         return
    202 
    203       all_machines = self.locked_machines
    204       if not all_machines:
    205         return
    206 
    207       # If we locked any machines earlier, make sure we unlock them now.
    208       lock_mgr = afe_lock_machine.AFELockManager(
    209           all_machines, '', self.labels[0].chromeos_root, None)
    210       machine_states = lock_mgr.GetMachineStates('unlock')
    211       for k, state in machine_states.iteritems():
    212         if state['locked']:
    213           lock_mgr.UpdateLockInAFE(False, k)
    214