      1 """The main job wrapper
      2 
      3 This is the core infrastructure.
      4 
      5 Copyright Andy Whitcroft, Martin J. Bligh 2006
      6 """
      7 
      8 # pylint: disable=missing-docstring
      9 
     10 import copy
     11 from datetime import datetime
     12 import getpass
     13 import glob
     14 import logging
     15 import os
     16 import re
     17 import shutil
     18 import sys
     19 import time
     20 import traceback
     21 import types
     22 import weakref
     23 
     24 import common
     25 from autotest_lib.client.bin import client_logging_config
     26 from autotest_lib.client.bin import harness
     27 from autotest_lib.client.bin import local_host
     28 from autotest_lib.client.bin import parallel
     29 from autotest_lib.client.bin import partition as partition_lib
     30 from autotest_lib.client.bin import profilers
     31 from autotest_lib.client.bin import sysinfo
     32 from autotest_lib.client.bin import test
     33 from autotest_lib.client.bin import utils
     34 from autotest_lib.client.common_lib import barrier
     35 from autotest_lib.client.common_lib import base_job
     37 from autotest_lib.client.common_lib import error
     38 from autotest_lib.client.common_lib import global_config
     39 from autotest_lib.client.common_lib import logging_manager
     40 from autotest_lib.client.common_lib import packages
     41 from autotest_lib.client.cros import cros_logging
     42 from autotest_lib.client.tools import html_report
     43 
     44 GLOBAL_CONFIG = global_config.global_config
     45 
     46 LAST_BOOT_TAG = object()
     47 JOB_PREAMBLE = """
     48 from autotest_lib.client.common_lib.error import *
     49 from autotest_lib.client.bin.utils import *
     50 """
     51 
     52 
     53 class StepError(error.AutotestError):
     54     pass
     55 
     56 class NotAvailableError(error.AutotestError):
     57     pass
     58 
     59 
     60 
     61 def _run_test_complete_on_exit(f):
     62     """Decorator for job methods that automatically calls
     63     self.harness.run_test_complete when the method exits, if appropriate."""
     64     def wrapped(self, *args, **dargs):
     65         try:
     66             return f(self, *args, **dargs)
     67         finally:
     68             if self._logger.global_filename == 'status':
     69                 self.harness.run_test_complete()
     70                 if self.drop_caches:
     71                     utils.drop_caches()
     72     wrapped.__name__ = f.__name__
     73     wrapped.__doc__ = f.__doc__
     74     wrapped.__dict__.update(f.__dict__)
     75     return wrapped
     76 
     77 
     78 class status_indenter(base_job.status_indenter):
     79     """Provide a status indenter that is backed by job._record_prefix."""
     80     def __init__(self, job_):
     81         self._job = weakref.proxy(job_)  # avoid a circular reference
     82 
     83 
     84     @property
     85     def indent(self):
     86         return self._job._record_indent
     87 
     88 
     89     def increment(self):
     90         self._job._record_indent += 1
     91 
     92 
     93     def decrement(self):
     94         self._job._record_indent -= 1
     95 
     96 
     97 class base_client_job(base_job.base_job):
     98     """The client-side concrete implementation of base_job.
     99 
    100     Optional properties provided by this implementation:
    101         control
    102         harness
    103     """
    104 
    105     _WARNING_DISABLE_DELAY = 5
    106 
    107     # _record_indent is a persistent property, but only on the client
    108     _job_state = base_job.base_job._job_state
    109     _record_indent = _job_state.property_factory(
    110         '_state', '_record_indent', 0, namespace='client')
    111     _max_disk_usage_rate = _job_state.property_factory(
    112         '_state', '_max_disk_usage_rate', 0.0, namespace='client')
    113 
    114 
    115     def __init__(self, control, options, drop_caches=True):
    116         """
    117         Prepare a client side job object.
    118 
    119         @param control: The control file (pathname of).
    120         @param options: an object which includes:
    121                 jobtag: The job tag string (eg "default").
    122                 cont: If this is the continuation of this job.
    123                 harness_type: An alternative server harness.  [None]
    124                 use_external_logging: If true, the enable_external_logging
    125                           method will be called during construction.  [False]
    126         @param drop_caches: If true, utils.drop_caches() is called before and
    127                 between all tests.  [True]
    128         """
    129         super(base_client_job, self).__init__(options=options)
    130         self._pre_record_init(control, options)
    131         try:
    132             self._post_record_init(control, options, drop_caches)
    133         except Exception, err:
    134             self.record(
    135                     'ABORT', None, None,'client.bin.job.__init__ failed: %s' %
    136                     str(err))
    137             raise
    138 
    139 
    140     @classmethod
    141     def _get_environ_autodir(cls):
    142         return os.environ['AUTODIR']
    143 
    144 
    145     @classmethod
    146     def _find_base_directories(cls):
    147         """
    148         Determine locations of autodir and clientdir (which are the same)
    149         using os.environ. Serverdir does not exist in this context.
    150         """
    151         autodir = clientdir = cls._get_environ_autodir()
    152         return autodir, clientdir, None
    153 
    154 
    155     @classmethod
    156     def _parse_args(cls, args):
    157         return re.findall("[^\s]*?['|\"].*?['|\"]|[^\s]+", args)
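                # For example, the args string:  one "two three" four
                # is split into:  ['one', '"two three"', 'four']
                # (quoted substrings stay as single arguments, quotes included).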
    158 
    159 
    160     def _find_resultdir(self, options):
    161         """
    162         Determine the directory for storing results. On a client this is
    163         always <autodir>/results/<tag>, where tag is passed in on the command
    164         line as an option.
    165         """
    166         output_dir_config = GLOBAL_CONFIG.get_config_value('CLIENT',
    167                                                            'output_dir',
    168                                                             default="")
    169         if options.output_dir:
    170             basedir = options.output_dir
    171         elif output_dir_config:
    172             basedir = output_dir_config
    173         else:
    174             basedir = self.autodir
    175 
    176         return os.path.join(basedir, 'results', options.tag)
    177 
    178 
    179     def _get_status_logger(self):
    180         """Return a reference to the status logger."""
    181         return self._logger
    182 
    183 
    184     def _pre_record_init(self, control, options):
    185         """
    186         Initialization function that should perform ONLY the required
    187         setup so that the self.record() method works.
    188 
    189         As of now self.record() needs self.resultdir, self._group_level,
    190         self.harness and of course self._logger.
    191         """
    192         if not options.cont:
    193             self._cleanup_debugdir_files()
    194             self._cleanup_results_dir()
    195 
    196         logging_manager.configure_logging(
    197             client_logging_config.ClientLoggingConfig(),
    198             results_dir=self.resultdir,
    199             verbose=options.verbose)
    200         logging.info('Writing results to %s', self.resultdir)
    201 
    202         # init_group_level needs the state
    203         self.control = os.path.realpath(control)
    204         self._is_continuation = options.cont
    205         self._current_step_ancestry = []
    206         self._next_step_index = 0
    207         self._load_state()
    208 
    209         _harness = self.handle_persistent_option(options, 'harness')
    210         _harness_args = self.handle_persistent_option(options, 'harness_args')
    211 
    212         self.harness = harness.select(_harness, self, _harness_args)
    213 
    214         # set up the status logger
    215         def client_job_record_hook(entry):
    216             msg_tag = ''
    217             if '.' in self._logger.global_filename:
    218                 msg_tag = self._logger.global_filename.split('.', 1)[1]
    219             # send the entry to the job harness
    220             message = '\n'.join([entry.message] + entry.extra_message_lines)
    221             rendered_entry = self._logger.render_entry(entry)
    222             self.harness.test_status_detail(entry.status_code, entry.subdir,
    223                                             entry.operation, message, msg_tag,
    224                                             entry.fields)
    225             self.harness.test_status(rendered_entry, msg_tag)
    226             # send the entry to stdout, if it's enabled
    227             logging.info(rendered_entry)
    228         self._logger = base_job.status_logger(
    229             self, status_indenter(self), record_hook=client_job_record_hook)
    230 
    231 
    232     def _post_record_init(self, control, options, drop_caches):
    233         """
    234         Perform job initialization not required by self.record().
    235         """
    236         self._init_drop_caches(drop_caches)
    237 
    238         self._init_packages()
    239 
    240         self.sysinfo = sysinfo.sysinfo(self.resultdir)
    241         self._load_sysinfo_state()
    242 
    243         if not options.cont:
    244             download = os.path.join(self.testdir, 'download')
    245             if not os.path.exists(download):
    246                 os.mkdir(download)
    247 
    248             shutil.copyfile(self.control,
    249                             os.path.join(self.resultdir, 'control'))
    250 
    251         self.control = control
    252 
    253         self.logging = logging_manager.get_logging_manager(
    254                 manage_stdout_and_stderr=True, redirect_fds=True)
    255         self.logging.start_logging()
    256 
    257         self.profilers = profilers.profilers(self)
    258 
    259         self.machines = [options.hostname]
    260         self.machine_dict_list = [{'hostname' : options.hostname}]
    261         # Client side tests should always run the same whether or not they are
    262         # running in the lab.
    263         self.in_lab = False
    264         self.hosts = set([local_host.LocalHost(hostname=options.hostname)])
    265 
    266         self.args = []
    267         if options.args:
    268             self.args = self._parse_args(options.args)
    269 
    270         if options.user:
    271             self.user = options.user
    272         else:
    273             self.user = getpass.getuser()
    274 
    275         self.sysinfo.log_per_reboot_data()
    276 
    277         if not options.cont:
    278             self.record('START', None, None)
    279 
    280         self.harness.run_start()
    281 
    282         if options.log:
    283             self.enable_external_logging()
    284 
    285         self.num_tests_run = None
    286         self.num_tests_failed = None
    287 
    288         self.warning_loggers = None
    289         self.warning_manager = None
    290 
    291 
    292     def _init_drop_caches(self, drop_caches):
    293         """
    294         Perform the drop caches initialization.
    295         """
    296         self.drop_caches_between_iterations = (
    297                                     GLOBAL_CONFIG.get_config_value('CLIENT',
    298                                     'drop_caches_between_iterations',
    299                                     type=bool, default=True))
    300         self.drop_caches = drop_caches
    301         if self.drop_caches:
    302             utils.drop_caches()
    303 
    304 
    305     def _init_packages(self):
    306         """
    307         Perform the packages support initialization.
    308         """
    309         self.pkgmgr = packages.PackageManager(
    310             self.autodir, run_function_dargs={'timeout':3600})
    311 
    312 
    313     def _cleanup_results_dir(self):
    314         """Delete everything in resultsdir"""
    315         assert os.path.exists(self.resultdir)
    316         list_files = glob.glob('%s/*' % self.resultdir)
    317         for f in list_files:
    318             if os.path.isdir(f):
    319                 shutil.rmtree(f)
    320             elif os.path.isfile(f):
    321                 os.remove(f)
    322 
    323 
    324     def _cleanup_debugdir_files(self):
    325         """
    326         Delete any leftover debugdir files
    327         """
    328         list_files = glob.glob("/tmp/autotest_results_dir.*")
    329         for f in list_files:
    330             os.remove(f)
    331 
    332 
    333     def disable_warnings(self, warning_type):
    334         self.record("INFO", None, None,
    335                     "disabling %s warnings" % warning_type,
    336                     {"warnings.disable": warning_type})
    337         time.sleep(self._WARNING_DISABLE_DELAY)
    338 
    339 
    340     def enable_warnings(self, warning_type):
    341         time.sleep(self._WARNING_DISABLE_DELAY)
    342         self.record("INFO", None, None,
    343                     "enabling %s warnings" % warning_type,
    344                     {"warnings.enable": warning_type})
    345 
    346 
    347     def monitor_disk_usage(self, max_rate):
    348         """\
    349         Signal that the job should monitor disk space usage on /
    350         and generate a warning if a test uses up disk space at a
    351         rate exceeding 'max_rate'.
    352 
    353         Parameters:
    354              max_rate - the maximum allowed rate of disk consumption
    355                         during a test, in MB/hour, or 0 to indicate
    356                         no limit.
    357         """
    358         self._max_disk_usage_rate = max_rate
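                # Example (illustrative rate): a control file could cap disk
                # usage at 100 MB/hour on the root filesystem before a test:
                #
                #     job.monitor_disk_usage(100)
                #     job.run_test('dbench')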
    359 
    360 
    361     def control_get(self):
    362         return self.control
    363 
    364 
    365     def control_set(self, control):
    366         self.control = os.path.abspath(control)
    367 
    368 
    369     def harness_select(self, which, harness_args):
    370         self.harness = harness.select(which, self, harness_args)
    371 
    372 
    373     def setup_dirs(self, results_dir, tmp_dir):
    374         if not tmp_dir:
    375             tmp_dir = os.path.join(self.tmpdir, 'build')
    376         if not os.path.exists(tmp_dir):
    377             os.mkdir(tmp_dir)
    378         if not os.path.isdir(tmp_dir):
    379             e_msg = "Temp dir (%s) is not a dir - args backwards?" % tmp_dir
    380             raise ValueError(e_msg)
    381 
    382         # We label the first build "build" and then subsequent ones
    383         # as "build.2", "build.3", etc. Whilst this is a little bit
    384         # inconsistent, 99.9% of jobs will only have one build
    385         # (that's not done as kernbench, sparse, or buildtest),
    386         # so it works out much cleaner. One of life's compromises.
    387         if not results_dir:
    388             results_dir = os.path.join(self.resultdir, 'build')
    389             i = 2
    390             while os.path.exists(results_dir):
    391                 results_dir = os.path.join(self.resultdir, 'build.%d' % i)
    392                 i += 1
    393         if not os.path.exists(results_dir):
    394             os.mkdir(results_dir)
    395 
    396         return (results_dir, tmp_dir)
    397 
    398 
    399     def barrier(self, *args, **kwds):
    400         """Create a barrier object"""
    401         return barrier.barrier(*args, **kwds)
    402 
    403 
    404     def install_pkg(self, name, pkg_type, install_dir):
    405         '''
    406         This method is a simple wrapper around the actual package
    407         installation method in the Packager class. This is used
    408         internally by the profilers, deps and tests code.
    409         name : name of the package (ex: sleeptest, dbench etc.)
    410         pkg_type : Type of the package (ex: test, dep etc.)
    411         install_dir : The directory into which the package source is
    412                       untarred. (ex: client/profilers/<name> for profilers)
    413         '''
    414         if self.pkgmgr.repositories:
    415             self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir)
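                # Example (hypothetical call): a test package is typically
                # fetched and unpacked like this, assuming repositories are
                # configured:
                #
                #     job.install_pkg('sleeptest', 'test',
                #                     os.path.join(job.testdir, 'sleeptest'))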
    416 
    417 
    418     def add_repository(self, repo_urls):
    419         '''
    420         Adds the repository locations to the job so that packages
    421         can be fetched from them when needed. The repository list
    422         needs to be a string list
    423         Ex: job.add_repository(['http://blah1','http://blah2'])
    424         '''
    425         for repo_url in repo_urls:
    426             self.pkgmgr.add_repository(repo_url)
    427 
    428         # Fetch the packages' checksum file that contains the checksums
    429         # of all the packages if it is not already fetched. The checksum
    430         # file is always fetched whenever a job is first started. This
    431         # is not done in the job's constructor as we don't have the list of
    432         # the repositories there (and obviously don't care about this file
    433         # if we are not using the repos)
    434         try:
    435             checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,
    436                                               packages.CHECKSUM_FILE)
    437             self.pkgmgr.fetch_pkg(packages.CHECKSUM_FILE,
    438                                   checksum_file_path, use_checksum=False)
    439         except error.PackageFetchError:
    440             # packaging system might not be working in this case
    441             # Silently fall back to the normal case
    442             pass
    443 
    444 
    445     def require_gcc(self):
    446         """
    447         Test whether gcc is installed on the machine.
    448         """
    449         # check if gcc is installed on the system.
    450         try:
    451             utils.system('which gcc')
    452         except error.CmdError:
    453             raise NotAvailableError('gcc is required by this job and is '
    454                                     'not available on the system')
    455 
    456 
    457     def setup_dep(self, deps):
    458         """Set up the dependencies for this test.
    459         deps is a list of libraries required for this test.
    460         """
    461         # Fetch the deps from the repositories and set them up.
    462         for dep in deps:
    463             dep_dir = os.path.join(self.autodir, 'deps', dep)
    464             # Search for the dependency in the repositories if specified,
    465             # else check locally.
    466             try:
    467                 self.install_pkg(dep, 'dep', dep_dir)
    468             except error.PackageInstallError:
    469                 # see if the dep is there locally
    470                 pass
    471 
    472             # dep_dir might not exist if it is not fetched from the repos
    473             if not os.path.exists(dep_dir):
    474                 raise error.TestError("Dependency %s does not exist" % dep)
    475 
    476             os.chdir(dep_dir)
    477             if execfile('%s.py' % dep, {}) is None:
    478                 logging.info('Dependency %s successfully built', dep)
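                # Example (hypothetical dependency name): a test's setup()
                # method might run
                #
                #     self.job.setup_dep(['mydep'])
                #     dep_dir = os.path.join(self.job.autodir, 'deps', 'mydep')
                #
                # and then build against whatever was unpacked under dep_dir.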
    479 
    480 
    481     def _runtest(self, url, tag, timeout, args, dargs):
    482         try:
    483             l = lambda : test.runtest(self, url, tag, args, dargs)
    484             pid = parallel.fork_start(self.resultdir, l)
    485 
    486             if timeout:
    487                 logging.debug('Waiting for pid %d for %d seconds', pid, timeout)
    488                 parallel.fork_waitfor_timed(self.resultdir, pid, timeout)
    489             else:
    490                 parallel.fork_waitfor(self.resultdir, pid)
    491 
    492         except error.TestBaseException:
    493             # These are already classified with an error type (exit_status)
    494             raise
    495         except error.JobError:
    496             raise  # Caught further up and turned into an ABORT.
    497         except Exception, e:
    498             # Converts all other exceptions thrown by the test regardless
    499             # of phase into a TestError(TestBaseException) subclass that
    500             # reports them with their full stack trace.
    501             raise error.UnhandledTestError(e)
    502 
    503 
    504     def _run_test_base(self, url, *args, **dargs):
    505         """
    506         Prepares arguments and the group function for run_test and run_test_detail.
    507 
    508         @param url A url that identifies the test to run.
    509         @param tag An optional keyword argument that will be added to the
    510             test and subdir name.
    511         @param subdir_tag An optional keyword argument that will be added
    512             to the subdir name.
    513 
    514         @returns:
    515                 subdir: Test subdirectory
    516                 testname: Test name
    517                 group_func: Actual test run function
    518                 timeout: Test timeout
    519         """
    520         _group, testname = self.pkgmgr.get_package_name(url, 'test')
    521         testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
    522         self._make_test_outputdir(subdir)
    523 
    524         timeout = dargs.pop('timeout', None)
    525         if timeout:
    526             logging.debug('Test has timeout: %d sec.', timeout)
    527 
    528         def log_warning(reason):
    529             self.record("WARN", subdir, testname, reason)
    530         @disk_usage_monitor.watch(log_warning, "/", self._max_disk_usage_rate)
    531         def group_func():
    532             try:
    533                 self._runtest(url, tag, timeout, args, dargs)
    534             except error.TestBaseException, detail:
    535                 # The error is already classified, record it properly.
    536                 self.record(detail.exit_status, subdir, testname, str(detail))
    537                 raise
    538             else:
    539                 self.record('GOOD', subdir, testname, 'completed successfully')
    540 
    541         return (subdir, testname, group_func, timeout)
    542 
    543 
    544     @_run_test_complete_on_exit
    545     def run_test(self, url, *args, **dargs):
    546         """
    547         Summon a test object and run it.
    548 
    549         @param url A url that identifies the test to run.
    550         @param tag An optional keyword argument that will be added to the
    551             test and subdir name.
    552         @param subdir_tag An optional keyword argument that will be added
    553             to the subdir name.
    554 
    555         @returns True if the test passes, False otherwise.
    556         """
    557         (subdir, testname, group_func, timeout) = self._run_test_base(url,
    558                                                                       *args,
    559                                                                       **dargs)
    560         try:
    561             self._rungroup(subdir, testname, group_func, timeout)
    562             return True
    563         except error.TestBaseException:
    564             return False
    565         # Any other exception here will be given to the caller
    566         #
    567         # NOTE: The only exception possible from the control file here
    568         # is error.JobError as _runtest() turns all others into an
    569         # UnhandledTestError that is caught above.
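                #
                # Example (illustrative control file usage; the test name and
                # arguments are placeholders):
                #
                #     if not job.run_test('sleeptest', seconds=1, tag='quick'):
                #         job.record('INFO', None, None, 'sleeptest did not pass')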
    570 
    571 
    572     @_run_test_complete_on_exit
    573     def run_test_detail(self, url, *args, **dargs):
    574         """
    575         Summon a test object and run it, returning test status.
    576 
    577         @param url A url that identifies the test to run.
    578         @param tag An optional keyword argument that will be added to the
    579             test and subdir name.
    580         @param subdir_tag An optional keyword argument that will be added
    581             to the subdir name.
    582 
    583         @returns Test status
    584         @see: client/common_lib/error.py, exit_status
    585         """
    586         (subdir, testname, group_func, timeout) = self._run_test_base(url,
    587                                                                       *args,
    588                                                                       **dargs)
    589         try:
    590             self._rungroup(subdir, testname, group_func, timeout)
    591             return 'GOOD'
    592         except error.TestBaseException, detail:
    593             return detail.exit_status
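                # Example (illustrative): unlike run_test, the caller receives
                # the exit status string rather than a boolean:
                #
                #     status = job.run_test_detail('sleeptest', seconds=1)
                #     if status != 'GOOD':
                #         job.record('INFO', None, None,
                #                    'sleeptest ended with %s' % status)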
    594 
    595 
    596     def _rungroup(self, subdir, testname, function, timeout, *args, **dargs):
    597         """\
    598         subdir:
    599                 name of the group
    600         testname:
    601                 name of the test to run, or support step
    602         function:
    603                 subroutine to run
    604         *args:
    605                 arguments for the function
    606 
    607         Returns the result of the passed in function
    608         """
    609 
    610         try:
    611             optional_fields = None
    612             if timeout:
    613                 optional_fields = {}
    614                 optional_fields['timeout'] = timeout
    615             self.record('START', subdir, testname,
    616                         optional_fields=optional_fields)
    617 
    618             self._state.set('client', 'unexpected_reboot', (subdir, testname))
    619             try:
    620                 result = function(*args, **dargs)
    621                 self.record('END GOOD', subdir, testname)
    622                 return result
    623             except error.TestBaseException, e:
    624                 self.record('END %s' % e.exit_status, subdir, testname)
    625                 raise
    626             except error.JobError, e:
    627                 self.record('END ABORT', subdir, testname)
    628                 raise
    629             except Exception, e:
    630                 # This should only ever happen due to a bug in the given
    631                 # function's code.  The common case of being called by
    632                 # run_test() will never reach this.  If a control file called
    633                 # run_group() itself, bugs in its function will be caught
    634                 # here.
    635                 err_msg = str(e) + '\n' + traceback.format_exc()
    636                 self.record('END ERROR', subdir, testname, err_msg)
    637                 raise
    638         finally:
    639             self._state.discard('client', 'unexpected_reboot')
    640 
    641 
    642     def run_group(self, function, tag=None, **dargs):
    643         """
    644         Run a function nested within a group level.
    645 
    646         function:
    647                 Callable to run.
    648         tag:
    649                 An optional tag name for the group.  If None (default)
    650                 function.__name__ will be used.
    651         **dargs:
    652                 Named arguments for the function.
    653         """
    654         if tag:
    655             name = tag
    656         else:
    657             name = function.__name__
    658 
    659         try:
    660             return self._rungroup(subdir=None, testname=name,
    661                                   function=function, timeout=None, **dargs)
    662         except (SystemExit, error.TestBaseException):
    663             raise
    664         # If there was a different exception, turn it into a TestError.
    665         # It will be caught by step_engine or _run_step_fn.
    666         except Exception, e:
    667             raise error.UnhandledTestError(e)
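                # Example (illustrative, 'mytest' is a placeholder): group two
                # related test runs under a single START/END record in the
                # status log:
                #
                #     def install_and_verify():
                #         job.run_test('mytest', tag='install')
                #         job.run_test('mytest', tag='verify')
                #
                #     job.run_group(install_and_verify, tag='install_verify')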
    668 
    669 
    670     def cpu_count(self):
    671         return utils.count_cpus()  # use total system count
    672 
    673 
    674     def start_reboot(self):
    675         self.record('START', None, 'reboot')
    676         self.record('GOOD', None, 'reboot.start')
    677 
    678 
    679     def _record_reboot_failure(self, subdir, operation, status,
    680                                running_id=None):
    681         self.record("ABORT", subdir, operation, status)
    682         if not running_id:
    683             running_id = utils.running_os_ident()
    684         kernel = {"kernel": running_id.split("::")[0]}
    685         self.record("END ABORT", subdir, 'reboot', optional_fields=kernel)
    686 
    687 
    688     def _check_post_reboot(self, subdir, running_id=None):
    689         """
    690         Perform post-boot checks, such as whether the system configuration
    691         has changed across reboots (specifically, CPUs and partitions).
    692 
    693         @param subdir: The subdir to use in the job.record call.
    694         @param running_id: An optional running_id to include in the reboot
    695             failure log message
    696 
    697         @raise JobError: Raised if the current configuration does not match the
    698             pre-reboot configuration.
    699         """
    700         # check to see if any partitions have changed
    701         partition_list = partition_lib.get_partition_list(self,
    702                                                           exclude_swap=False)
    703         mount_info = partition_lib.get_mount_info(partition_list)
    704         old_mount_info = self._state.get('client', 'mount_info')
    705         if mount_info != old_mount_info:
    706             new_entries = mount_info - old_mount_info
    707             old_entries = old_mount_info - mount_info
    708             description = ("mounted partitions are different after reboot "
    709                            "(old entries: %s, new entries: %s)" %
    710                            (old_entries, new_entries))
    711             self._record_reboot_failure(subdir, "reboot.verify_config",
    712                                         description, running_id=running_id)
    713             raise error.JobError("Reboot failed: %s" % description)
    714 
    715         # check to see if any CPUs have changed
    716         cpu_count = utils.count_cpus()
    717         old_count = self._state.get('client', 'cpu_count')
    718         if cpu_count != old_count:
    719             description = ('Number of CPUs changed after reboot '
    720                            '(old count: %d, new count: %d)' %
    721                            (old_count, cpu_count))
    722             self._record_reboot_failure(subdir, 'reboot.verify_config',
    723                                         description, running_id=running_id)
    724             raise error.JobError('Reboot failed: %s' % description)
    725 
    726 
    727     def partition(self, device, loop_size=0, mountpoint=None):
    728         """
    729         Work with a machine partition
    730 
    731             @param device: e.g. /dev/sda2, /dev/sdb1 etc...
    732             @param mountpoint: Specify a directory to mount to. If not specified,
    733                                the autotest tmp directory will be used.
    734             @param loop_size: Size of loopback device (in MB). Defaults to 0.
    735 
    736             @return: A L{client.bin.partition.partition} object
    737         """
    738 
    739         if not mountpoint:
    740             mountpoint = self.tmpdir
    741         return partition_lib.partition(self, device, loop_size, mountpoint)
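                # Example (hypothetical device): the returned partition object
                # provides the usual helpers from partition_lib (mkfs, mount,
                # unmount, ...):
                #
                #     part = job.partition('/dev/sdb1', mountpoint=job.tmpdir)
                #     part.mkfs('ext4')
                #     part.mount()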
    742 
    743     @utils.deprecated
    744     def filesystem(self, device, mountpoint=None, loop_size=0):
    745         """ Same as partition
    746 
    747         @deprecated: Use partition method instead
    748         """
    749         return self.partition(device, loop_size, mountpoint)
    750 
    751 
    752     def enable_external_logging(self):
    753         pass
    754 
    755 
    756     def disable_external_logging(self):
    757         pass
    758 
    759 
    760     def reboot_setup(self):
    761         # save the partition list and mount points, as well as the cpu count
    762         partition_list = partition_lib.get_partition_list(self,
    763                                                           exclude_swap=False)
    764         mount_info = partition_lib.get_mount_info(partition_list)
    765         self._state.set('client', 'mount_info', mount_info)
    766         self._state.set('client', 'cpu_count', utils.count_cpus())
    767 
    768 
    769     def reboot(self):
    770         self.reboot_setup()
    771         self.harness.run_reboot()
    772 
    773         # HACK: using this as a module sometimes hangs shutdown, so if it's
    774         # installed unload it first
    775         utils.system("modprobe -r netconsole", ignore_status=True)
    776 
    777         # sync first, so that a sync during shutdown doesn't time out
    778         utils.system("sync; sync", ignore_status=True)
    779 
    780         utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")
    781         self.quit()
    782 
    783 
    784     def noop(self, text):
    785         logging.info("job: noop: " + text)
    786 
    787 
    788     @_run_test_complete_on_exit
    789     def parallel(self, *tasklist):
    790         """Run tasks in parallel"""
    791 
    792         pids = []
    793         old_log_filename = self._logger.global_filename
    794         for i, task in enumerate(tasklist):
    795             assert isinstance(task, (tuple, list))
    796             self._logger.global_filename = old_log_filename + (".%d" % i)
    797             def task_func():
    798                 # stub out _record_indent with a process-local one
    799                 base_record_indent = self._record_indent
    800                 proc_local = self._job_state.property_factory(
    801                     '_state', '_record_indent.%d' % os.getpid(),
    802                     base_record_indent, namespace='client')
    803                 self.__class__._record_indent = proc_local
    804                 task[0](*task[1:])
    805             pids.append(parallel.fork_start(self.resultdir, task_func))
    806 
    807         old_log_path = os.path.join(self.resultdir, old_log_filename)
    808         old_log = open(old_log_path, "a")
    809         exceptions = []
    810         for i, pid in enumerate(pids):
    811             # wait for the task to finish
    812             try:
    813                 parallel.fork_waitfor(self.resultdir, pid)
    814             except Exception, e:
    815                 exceptions.append(e)
    816             # copy the logs from the subtask into the main log
    817             new_log_path = old_log_path + (".%d" % i)
    818             if os.path.exists(new_log_path):
    819                 new_log = open(new_log_path)
    820                 old_log.write(new_log.read())
    821                 new_log.close()
    822                 old_log.flush()
    823                 os.remove(new_log_path)
    824         old_log.close()
    825 
    826         self._logger.global_filename = old_log_filename
    827 
    828         # handle any exceptions raised by the parallel tasks
    829         if exceptions:
    830             msg = "%d task(s) failed in job.parallel" % len(exceptions)
    831             raise error.JobError(msg)
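                # Example (illustrative): each task is a (callable, arg1, ...)
                # tuple or list; tasks run in forked subprocesses, each with
                # its own status sub-log that is merged back in afterwards:
                #
                #     def run_one(tag):
                #         job.run_test('sleeptest', seconds=1, tag=tag)
                #
                #     job.parallel((run_one, 'first'), (run_one, 'second'))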
    832 
    833 
    834     def quit(self):
    835         # XXX: should have a better name.
    836         self.harness.run_pause()
    837         raise error.JobContinue("more to come")
    838 
    839 
    840     def complete(self, status):
    841         """Write pending reports, clean up, and exit"""
    842         # write out a job HTML report
    843         try:
    844             html_report.create_report(self.resultdir)
    845         except Exception, e:
    846             logging.error("Error writing job HTML report: %s", e)
    847 
    848         # The job is about to complete; move the state file into the results dir.
    849         dest = os.path.join(self.resultdir, os.path.basename(self._state_file))
    850         shutil.move(self._state_file, dest)
    851 
    852         self.harness.run_complete()
    853         self.disable_external_logging()
    854         sys.exit(status)
    855 
    856 
    857     def _load_state(self):
    858         # grab any initial state and set up $CONTROL.state as the backing file
    859         init_state_file = self.control + '.init.state'
    860         self._state_file = self.control + '.state'
    861         if os.path.exists(init_state_file):
    862             shutil.move(init_state_file, self._state_file)
    863         self._state.set_backing_file(self._state_file)
    864 
    865         # initialize the state engine, if necessary
    866         has_steps = self._state.has('client', 'steps')
    867         if not self._is_continuation and has_steps:
    868             raise RuntimeError('Loaded state can only contain client.steps if '
    869                                'this is a continuation')
    870 
    871         if not has_steps:
    872             logging.debug('Initializing the state engine')
    873             self._state.set('client', 'steps', [])
    874 
    875 
    876     def handle_persistent_option(self, options, option_name):
    877         """
    878         Select option from command line or persistent state.
    879         Store selected option to allow standalone client to continue
    880         after reboot with previously selected options.
    881         Priority:
    882         1. explicitly specified via command line
    883         2. stored in state file (if continuing job '-c')
    884         3. default == None
    885         """
    886         option = None
    887         cmd_line_option = getattr(options, option_name)
    888         if cmd_line_option:
    889             option = cmd_line_option
    890             self._state.set('client', option_name, option)
    891         else:
    892             stored_option = self._state.get('client', option_name, None)
    893             if stored_option:
    894                 option = stored_option
    895         logging.debug('Persistent option %s now set to %s', option_name, option)
    896         return option
    897 
    898 
    899     def __create_step_tuple(self, fn, args, dargs):
    900         # Legacy code passes in an array where the first arg is
    901         # the function or its name.
    902         if isinstance(fn, list):
    903             assert(len(args) == 0)
    904             assert(len(dargs) == 0)
    905             args = fn[1:]
    906             fn = fn[0]
    907         # Pickling actual functions is hairy, thus we have to call
    908         # them by name.  Unfortunately, this means only functions
    909         # defined globally can be used as a next step.
    910         if callable(fn):
    911             fn = fn.__name__
    912         if not isinstance(fn, types.StringTypes):
    913             raise StepError("Next steps must be functions or "
    914                             "strings containing the function name")
    915         ancestry = copy.copy(self._current_step_ancestry)
    916         return (ancestry, fn, args, dargs)
    917 
    918 
    919     def next_step_append(self, fn, *args, **dargs):
    920         """Define the next step and place it at the end"""
    921         steps = self._state.get('client', 'steps')
    922         steps.append(self.__create_step_tuple(fn, args, dargs))
    923         self._state.set('client', 'steps', steps)
    924 
    925 
    926     def next_step(self, fn, *args, **dargs):
    927         """Create a new step and place it after any steps added
    928         while running the current step but before any steps added in
    929         previous steps"""
    930         steps = self._state.get('client', 'steps')
    931         steps.insert(self._next_step_index,
    932                      self.__create_step_tuple(fn, args, dargs))
    933         self._next_step_index += 1
    934         self._state.set('client', 'steps', steps)
    935 
    936 
    937     def next_step_prepend(self, fn, *args, **dargs):
    938         """Insert a new step, executing first"""
    939         steps = self._state.get('client', 'steps')
    940         steps.insert(0, self.__create_step_tuple(fn, args, dargs))
    941         self._next_step_index += 1
    942         self._state.set('client', 'steps', steps)
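            # Example (illustrative control file): defining step_init switches
            # the job to the step engine; pending steps are saved in the job
            # state file, so the sequence survives a reboot in the middle:
            #
            #     def step_init():
            #         job.next_step(step_test)
            #         job.next_step(step_reboot)
            #         job.next_step(step_after_reboot)
            #
            #     def step_test():
            #         job.run_test('sleeptest', seconds=1)
            #
            #     def step_reboot():
            #         job.reboot()
            #
            #     def step_after_reboot():
            #         job.run_test('sleeptest', seconds=1, tag='post_reboot')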
    943 
    944 
    945 
    946     def _run_step_fn(self, local_vars, fn, args, dargs):
    947         """Run a (step) function within the given context"""
    948 
    949         local_vars['__args'] = args
    950         local_vars['__dargs'] = dargs
    951         try:
    952             exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)
    953             return local_vars['__ret']
    954         except SystemExit:
    955             raise  # Send error.JobContinue and JobComplete on up to runjob.
    956         except error.TestNAError, detail:
    957             self.record(detail.exit_status, None, fn, str(detail))
    958         except Exception, detail:
    959             raise error.UnhandledJobError(detail)
    960 
    961 
    962     def _create_frame(self, global_vars, ancestry, fn_name):
    963         """Set up the environment like it would have been when this
    964         function was first defined.
    965 
    966         Child step engine 'implementations' must have 'return locals()'
    967         at the end of their steps.  Because of this, we can call the
    968         parent function and get back all child functions (i.e. those
    969         defined within it).
    970 
    971         Unfortunately, the call stack of the function calling
    972         job.next_step might have been deeper than the function it
    973         added.  In order to make sure that the environment is what it
    974         should be, we need to then pop off the frames we built until
    975         we find the frame where the function was first defined."""
    976 
    977         # The copies ensure that the parent frames are not modified
    978         # while building child frames.  This matters if we then
    979         # pop some frames in the next part of this function.
    980         current_frame = copy.copy(global_vars)
    981         frames = [current_frame]
    982         for steps_fn_name in ancestry:
    983             ret = self._run_step_fn(current_frame, steps_fn_name, [], {})
    984             current_frame = copy.copy(ret)
    985             frames.append(current_frame)
    986 
    987         # Walk up the stack frames until we find the place fn_name was defined.
    988         while len(frames) > 2:
    989             if fn_name not in frames[-2]:
    990                 break
    991             if frames[-2][fn_name] != frames[-1][fn_name]:
    992                 break
    993             frames.pop()
    994             ancestry.pop()
    995 
    996         return (frames[-1], ancestry)
    997 
    998 
    999     def _add_step_init(self, local_vars, current_function):
   1000         """If the function returned a dictionary that includes a
   1001         function named 'step_init', prepend it to our list of steps.
   1002         This will only get run the first time a function with a nested
   1003         use of the step engine is run."""
   1004 
   1005         if (isinstance(local_vars, dict) and
   1006             'step_init' in local_vars and
   1007             callable(local_vars['step_init'])):
   1008             # The init step is a child of the function
   1009             # we were just running.
   1010             self._current_step_ancestry.append(current_function)
   1011             self.next_step_prepend('step_init')
   1012 
   1013 
   1014     def step_engine(self):
   1015         """The multi-run engine used when the control file defines step_init.
   1016 
   1017         Does the next step.
   1018         """
   1019 
   1020         # Set up the environment and then interpret the control file.
   1021         # Some control files will have code outside of functions,
   1022         # which means we need to have our state engine initialized
   1023         # before reading in the file.
   1024         global_control_vars = {'job': self,
   1025                                'args': self.args}
   1026         exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
   1027         try:
   1028             execfile(self.control, global_control_vars, global_control_vars)
   1029         except error.TestNAError, detail:
   1030             self.record(detail.exit_status, None, self.control, str(detail))
   1031         except SystemExit:
   1032             raise  # Send error.JobContinue and JobComplete on up to runjob.
   1033         except Exception, detail:
   1034             # Syntax errors or other general Python exceptions coming out of
   1035             # the top level of the control file itself go through here.
   1036             raise error.UnhandledJobError(detail)
   1037 
   1038         # If we loaded in a mid-job state file, then we presumably
   1039         # know what steps we have yet to run.
   1040         if not self._is_continuation:
   1041             if 'step_init' in global_control_vars:
   1042                 self.next_step(global_control_vars['step_init'])
   1043         else:
   1044             # if last job failed due to unexpected reboot, record it as fail
   1045             # so harness gets called
   1046             last_job = self._state.get('client', 'unexpected_reboot', None)
   1047             if last_job:
   1048                 subdir, testname = last_job
   1049                 self.record('FAIL', subdir, testname, 'unexpected reboot')
   1050                 self.record('END FAIL', subdir, testname)
   1051 
   1052         # Iterate through the steps.  If we reboot, we'll simply
   1053         # continue iterating on the next step.
   1054         while len(self._state.get('client', 'steps')) > 0:
   1055             steps = self._state.get('client', 'steps')
   1056             (ancestry, fn_name, args, dargs) = steps.pop(0)
   1057             self._state.set('client', 'steps', steps)
   1058 
   1059             self._next_step_index = 0
   1060             ret = self._create_frame(global_control_vars, ancestry, fn_name)
   1061             local_vars, self._current_step_ancestry = ret
   1062             local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
   1063             self._add_step_init(local_vars, fn_name)
   1064 
   1065 
   1066     def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
   1067         self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
   1068                                    on_every_test)
   1069 
   1070 
   1071     def add_sysinfo_logfile(self, file, on_every_test=False):
   1072         self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)
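                # Example (hypothetical command and path): collect extra system
                # information either once per boot (the default) or around
                # every test:
                #
                #     job.add_sysinfo_command('lspci -vvn', logfile='lspci')
                #     job.add_sysinfo_logfile('/var/log/messages', on_every_test=True)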
   1073 
   1074 
   1075     def _add_sysinfo_loggable(self, loggable, on_every_test):
   1076         if on_every_test:
   1077             self.sysinfo.test_loggables.add(loggable)
   1078         else:
   1079             self.sysinfo.boot_loggables.add(loggable)
   1080         self._save_sysinfo_state()
   1081 
   1082 
   1083     def _load_sysinfo_state(self):
   1084         state = self._state.get('client', 'sysinfo', None)
   1085         if state:
   1086             self.sysinfo.deserialize(state)
   1087 
   1088 
   1089     def _save_sysinfo_state(self):
   1090         state = self.sysinfo.serialize()
   1091         self._state.set('client', 'sysinfo', state)
   1092 
   1093 
   1094 class disk_usage_monitor:
   1095     def __init__(self, logging_func, device, max_mb_per_hour):
   1096         self.func = logging_func
   1097         self.device = device
   1098         self.max_mb_per_hour = max_mb_per_hour
   1099 
   1100 
   1101     def start(self):
   1102         self.initial_space = utils.freespace(self.device)
   1103         self.start_time = time.time()
   1104 
   1105 
   1106     def stop(self):
   1107         # if no maximum usage rate was set, we don't need to
   1108         # generate any warnings
   1109         if not self.max_mb_per_hour:
   1110             return
   1111 
   1112         final_space = utils.freespace(self.device)
   1113         used_space = self.initial_space - final_space
   1114         stop_time = time.time()
   1115         total_time = stop_time - self.start_time
   1116         # round up the time to one minute, to keep extremely short
   1117         # tests from generating false positives due to short, badly
   1118         # timed bursts of activity
   1119         total_time = max(total_time, 60.0)
   1120 
   1121         # determine the usage rate
   1122         bytes_per_sec = used_space / total_time
   1123         mb_per_sec = bytes_per_sec / 1024**2
   1124         mb_per_hour = mb_per_sec * 60 * 60
   1125 
   1126         if mb_per_hour > self.max_mb_per_hour:
   1127             msg = ("disk space on %s was consumed at a rate of %.2f MB/hour")
   1128             msg %= (self.device, mb_per_hour)
   1129             self.func(msg)
   1130 
   1131 
   1132     @classmethod
   1133     def watch(cls, *monitor_args, **monitor_dargs):
   1134         """ Generic decorator to wrap a function call with the
   1135         standard create-monitor -> start -> call -> stop idiom."""
   1136         def decorator(func):
   1137             def watched_func(*args, **dargs):
   1138                 monitor = cls(*monitor_args, **monitor_dargs)
   1139                 monitor.start()
   1140                 try:
   1141                     func(*args, **dargs)
   1142                 finally:
   1143                     monitor.stop()
   1144             return watched_func
   1145         return decorator
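                # Example (illustrative): this mirrors the use in
                # _run_test_base above; 'log_warning' stands for any callable
                # that accepts a message string:
                #
                #     @disk_usage_monitor.watch(log_warning, "/", 250)
                #     def run_workload():
                #         ...  # anything that may consume disk space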
   1146 
   1147 
   1148 def runjob(control, drop_caches, options):
   1149     """
   1150     Run a job using the given control file.
   1151 
   1152     This is the main interface to this module.
   1153 
   1154     @see base_job.__init__ for parameter info.
   1155     """
   1156     control = os.path.abspath(control)
   1157     state = control + '.state'
   1158     # Ensure state file is cleaned up before the job starts to run if autotest
   1159     # is not running with the --continue flag
   1160     if not options.cont and os.path.isfile(state):
   1161         logging.debug('Cleaning up previously found state file')
   1162         os.remove(state)
   1163 
   1164     # instantiate the job object ready for the control file.
   1165     myjob = None
   1166     try:
   1167         # Check that the control file is valid
   1168         if not os.path.exists(control):
   1169             raise error.JobError(control + ": control file not found")
   1170 
   1171         # When continuing, the job is complete when there is no
   1172         # state file, ensure we don't try and continue.
   1173         if options.cont and not os.path.exists(state):
   1174             raise error.JobComplete("all done")
   1175 
   1176         myjob = job(control=control, drop_caches=drop_caches, options=options)
   1177 
   1178         # Load in the users control file, may do any one of:
   1179         #  1) execute in toto
   1180         #  2) define steps, and select the first via next_step()
   1181         myjob.step_engine()
   1182 
   1183     except error.JobContinue:
   1184         sys.exit(5)
   1185 
   1186     except error.JobComplete:
   1187         sys.exit(1)
   1188 
   1189     except error.JobError, instance:
   1190         logging.error("JOB ERROR: " + str(instance))
   1191         if myjob:
   1192             command = None
   1193             if len(instance.args) > 1:
   1194                 command = instance.args[1]
   1195                 myjob.record('ABORT', None, command, str(instance))
   1196             myjob.record('END ABORT', None, None, str(instance))
   1197             assert myjob._record_indent == 0
   1198             myjob.complete(1)
   1199         else:
   1200             sys.exit(1)
   1201 
   1202     except Exception, e:
   1203         # NOTE: job._run_step_fn and job.step_engine will turn things into
   1204         # a JobError for us.  If we get here, it's likely an autotest bug.
   1205         msg = str(e) + '\n' + traceback.format_exc()
   1206         logging.critical("JOB ERROR (autotest bug?): " + msg)
   1207         if myjob:
   1208             myjob.record('END ABORT', None, None, msg)
   1209             assert myjob._record_indent == 0
   1210             myjob.complete(1)
   1211         else:
   1212             sys.exit(1)
   1213 
   1214     # If we get here, then we assume the job is complete and good.
   1215     myjob.record('END GOOD', None, None)
   1216     assert myjob._record_indent == 0
   1217 
   1218     myjob.complete(0)
   1219 
   1220 
   1221 class job(base_client_job):
   1222 
   1223     def __init__(self, *args, **kwargs):
   1224         base_client_job.__init__(self, *args, **kwargs)
   1225 
   1226 
   1227     def run_test(self, url, *args, **dargs):
   1228         log_pauser = cros_logging.LogRotationPauser()
   1229         passed = False
   1230         try:
   1231             log_pauser.begin()
   1232             passed = base_client_job.run_test(self, url, *args, **dargs)
   1233             if not passed:
   1234                 # Save the VM state immediately after the test failure.
   1235                 # This is a NOOP if the test isn't running in a VM or
   1236                 # if the VM is not properly configured to save state.
   1237                 _group, testname = self.pkgmgr.get_package_name(url, 'test')
   1238                 now = datetime.now().strftime('%I:%M:%S.%f')
   1239                 checkpoint_name = '%s-%s' % (testname, now)
   1240                 utils.save_vm_state(checkpoint_name)
   1241         finally:
   1242             log_pauser.end()
   1243         return passed
   1244 
   1245 
   1246     def reboot(self):
   1247         self.reboot_setup()
   1248         self.harness.run_reboot()
   1249 
   1250         # sync first, so that a sync during shutdown doesn't time out
   1251         utils.system('sync; sync', ignore_status=True)
   1252 
   1253         utils.system('reboot </dev/null >/dev/null 2>&1 &')
   1254         self.quit()
   1255 
   1256 
   1257     def require_gcc(self):
   1258         return False
   1259