      1 """The main job wrapper
      2 
      3 This is the core infrastructure.
      4 
      5 Copyright Andy Whitcroft, Martin J. Bligh 2006
      6 """
      7 
      8 # pylint: disable=missing-docstring
      9 
     10 import copy
     11 from datetime import datetime
     12 import getpass
     13 import glob
     14 import logging
     15 import os
     16 import re
     17 import shutil
     18 import sys
     19 import time
     20 import traceback
     21 import types
     22 import weakref
     23 
     24 import common
     25 from autotest_lib.client.bin import client_logging_config
     26 from autotest_lib.client.bin import harness
     27 from autotest_lib.client.bin import local_host
     28 from autotest_lib.client.bin import parallel
     29 from autotest_lib.client.bin import partition as partition_lib
     30 from autotest_lib.client.bin import profilers
     31 from autotest_lib.client.bin import sysinfo
     32 from autotest_lib.client.bin import test
     33 from autotest_lib.client.bin import utils
     34 from autotest_lib.client.common_lib import barrier
     35 from autotest_lib.client.common_lib import base_job
     36 from autotest_lib.client.common_lib import control_data
     37 from autotest_lib.client.common_lib import error
     38 from autotest_lib.client.common_lib import global_config
     39 from autotest_lib.client.common_lib import logging_manager
     40 from autotest_lib.client.common_lib import packages
     41 from autotest_lib.client.cros import cros_logging
     42 from autotest_lib.client.tools import html_report
     43 
     44 GLOBAL_CONFIG = global_config.global_config
     45 
     46 LAST_BOOT_TAG = object()
     47 JOB_PREAMBLE = """
     48 from autotest_lib.client.common_lib.error import *
     49 from autotest_lib.client.bin.utils import *
     50 """
     51 
     52 
     53 class StepError(error.AutotestError):
     54     pass
     55 
     56 class NotAvailableError(error.AutotestError):
     57     pass
     58 
     59 
     60 
def _run_test_complete_on_exit(f):
    """Decorator for job methods that automatically calls
    self.harness.run_test_complete when the method exits, if appropriate."""
    def wrapped(self, *args, **dargs):
        try:
            return f(self, *args, **dargs)
        finally:
            if self._logger.global_filename == 'status':
                self.harness.run_test_complete()
                if self.drop_caches:
                    utils.drop_caches()
    wrapped.__name__ = f.__name__
    wrapped.__doc__ = f.__doc__
    wrapped.__dict__.update(f.__dict__)
    return wrapped


class status_indenter(base_job.status_indenter):
    """Provide a status indenter that is backed by job._record_indent."""
    def __init__(self, job_):
        self._job = weakref.proxy(job_)  # avoid a circular reference


    @property
    def indent(self):
        return self._job._record_indent


    def increment(self):
        self._job._record_indent += 1


    def decrement(self):
        self._job._record_indent -= 1


class base_client_job(base_job.base_job):
    """The client-side concrete implementation of base_job.

    Optional properties provided by this implementation:
        control
        harness
    """

    _WARNING_DISABLE_DELAY = 5

    # _record_indent is a persistent property, but only on the client
    _job_state = base_job.base_job._job_state
    _record_indent = _job_state.property_factory(
        '_state', '_record_indent', 0, namespace='client')
    _max_disk_usage_rate = _job_state.property_factory(
        '_state', '_max_disk_usage_rate', 0.0, namespace='client')


    def __init__(self, control, options, drop_caches=True):
        """
        Prepare a client side job object.

        @param control: The pathname of the control file.
        @param options: an object which includes:
                jobtag: The job tag string (eg "default").
                cont: Whether this job is a continuation of a previous run.
                harness_type: An alternative server harness.  [None]
                use_external_logging: If true, the enable_external_logging
                          method will be called during construction.  [False]
        @param drop_caches: If true, utils.drop_caches() is called before and
                between all tests.  [True]
        """
        super(base_client_job, self).__init__(options=options)
        self._pre_record_init(control, options)
        try:
            self._post_record_init(control, options, drop_caches)
        except Exception, err:
            self.record(
                    'ABORT', None, None, 'client.bin.job.__init__ failed: %s' %
                    str(err))
            raise


    @classmethod
    def _get_environ_autodir(cls):
        return os.environ['AUTODIR']


    @classmethod
    def _find_base_directories(cls):
        """
        Determine locations of autodir and clientdir (which are the same)
        using os.environ. Serverdir does not exist in this context.
        """
        autodir = clientdir = cls._get_environ_autodir()
        return autodir, clientdir, None


    @classmethod
    def _parse_args(cls, args):
        return re.findall(r"[^\s]*?['|\"].*?['|\"]|[^\s]+", args)


    def _find_resultdir(self, options):
        """
        Determine the directory for storing results. On a client this is
        always <autodir>/results/<tag>, where tag is passed in on the command
        line as an option.
        """
        output_dir_config = GLOBAL_CONFIG.get_config_value('CLIENT',
                                                           'output_dir',
                                                           default="")
        if options.output_dir:
            basedir = options.output_dir
        elif output_dir_config:
            basedir = output_dir_config
        else:
            basedir = self.autodir

        return os.path.join(basedir, 'results', options.tag)


    def _get_status_logger(self):
        """Return a reference to the status logger."""
        return self._logger


    185         """
    186         Initialization function that should peform ONLY the required
    187         setup so that the self.record() method works.
    188 
    189         As of now self.record() needs self.resultdir, self._group_level,
    190         self.harness and of course self._logger.
    191         """
    192         if not options.cont:
    193             self._cleanup_debugdir_files()
    194             self._cleanup_results_dir()
    195 
    196         logging_manager.configure_logging(
    197             client_logging_config.ClientLoggingConfig(),
    198             results_dir=self.resultdir,
    199             verbose=options.verbose)
    200         logging.info('Writing results to %s', self.resultdir)
    201 
    202         # init_group_level needs the state
    203         self.control = os.path.realpath(control)
    204         self._is_continuation = options.cont
    205         self._current_step_ancestry = []
    206         self._next_step_index = 0
    207         self._load_state()
    208 
    209         _harness = self.handle_persistent_option(options, 'harness')
    210         _harness_args = self.handle_persistent_option(options, 'harness_args')
    211 
    212         self.harness = harness.select(_harness, self, _harness_args)
    213 
    214         if self.control:
    215             parsed_control = control_data.parse_control(
    216                     self.control, raise_warnings=False)
    217             self.fast = parsed_control.fast
    218 
    219         # set up the status logger
    220         def client_job_record_hook(entry):
    221             msg_tag = ''
    222             if '.' in self._logger.global_filename:
    223                 msg_tag = self._logger.global_filename.split('.', 1)[1]
    224             # send the entry to the job harness
    225             message = '\n'.join([entry.message] + entry.extra_message_lines)
    226             rendered_entry = self._logger.render_entry(entry)
    227             self.harness.test_status_detail(entry.status_code, entry.subdir,
    228                                             entry.operation, message, msg_tag,
    229                                             entry.fields)
    230             self.harness.test_status(rendered_entry, msg_tag)
    231             # send the entry to stdout, if it's enabled
    232             logging.info(rendered_entry)
    233         self._logger = base_job.status_logger(
    234             self, status_indenter(self), record_hook=client_job_record_hook)
    235 
    236 
    def _post_record_init(self, control, options, drop_caches):
        """
        Perform job initialization not required by self.record().
        """
        self._init_drop_caches(drop_caches)

        self._init_packages()

        self.sysinfo = sysinfo.sysinfo(self.resultdir)
        self._load_sysinfo_state()

        if not options.cont:
            download = os.path.join(self.testdir, 'download')
            if not os.path.exists(download):
                os.mkdir(download)

            shutil.copyfile(self.control,
                            os.path.join(self.resultdir, 'control'))

        self.control = control

        self.logging = logging_manager.get_logging_manager(
                manage_stdout_and_stderr=True, redirect_fds=True)
        self.logging.start_logging()

        self.profilers = profilers.profilers(self)

        self.machines = [options.hostname]
        self.machine_dict_list = [{'hostname' : options.hostname}]
        # Client side tests should always run the same whether or not they are
        # running in the lab.
        self.in_lab = False
        self.hosts = set([local_host.LocalHost(hostname=options.hostname)])

        self.args = []
        if options.args:
            self.args = self._parse_args(options.args)

        if options.user:
            self.user = options.user
        else:
            self.user = getpass.getuser()

        self.sysinfo.log_per_reboot_data()

        if not options.cont:
            self.record('START', None, None)

        self.harness.run_start()

        if options.log:
            self.enable_external_logging()

        self.num_tests_run = None
        self.num_tests_failed = None

        self.warning_loggers = None
        self.warning_manager = None


    def _init_drop_caches(self, drop_caches):
        """
        Perform the drop caches initialization.
        """
        self.drop_caches_between_iterations = (
                GLOBAL_CONFIG.get_config_value('CLIENT',
                                               'drop_caches_between_iterations',
                                               type=bool, default=True))
        self.drop_caches = drop_caches
        if self.drop_caches:
            utils.drop_caches()


    def _init_packages(self):
        """
        Perform the packages support initialization.
        """
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout': 3600})


    def _cleanup_results_dir(self):
        """Delete everything in resultdir"""
        assert os.path.exists(self.resultdir)
        list_files = glob.glob('%s/*' % self.resultdir)
        for f in list_files:
            if os.path.isdir(f):
                shutil.rmtree(f)
            elif os.path.isfile(f):
                os.remove(f)


    def _cleanup_debugdir_files(self):
        """
        Delete any leftover debugdir files
        """
        list_files = glob.glob("/tmp/autotest_results_dir.*")
        for f in list_files:
            os.remove(f)


    def disable_warnings(self, warning_type):
        self.record("INFO", None, None,
                    "disabling %s warnings" % warning_type,
                    {"warnings.disable": warning_type})
        time.sleep(self._WARNING_DISABLE_DELAY)


    def enable_warnings(self, warning_type):
        time.sleep(self._WARNING_DISABLE_DELAY)
        self.record("INFO", None, None,
                    "enabling %s warnings" % warning_type,
                    {"warnings.enable": warning_type})

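    # Illustrative pairing, assuming whatever consumes the status log keys off
    # the same warning_type label; the 'NETWORK' string and the helper called
    # between the two lines are hypothetical:
    #
    #     job.disable_warnings('NETWORK')
    #     try:
    #         reconfigure_network()
    #     finally:
    #         job.enable_warnings('NETWORK')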

    def monitor_disk_usage(self, max_rate):
        """\
        Signal that the job should monitor disk space usage on /
        and generate a warning if a test uses up disk space at a
        rate exceeding 'max_rate'.

        Parameters:
             max_rate - the maximum allowed rate of disk consumption
                        during a test, in MB/hour, or 0 to indicate
                        no limit.
        """
        self._max_disk_usage_rate = max_rate

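    # Minimal usage sketch (a hypothetical control-file line): warn if any
    # test consumes disk space on / faster than 500 MB/hour; pass 0 to turn
    # the check back off.
    #
    #     job.monitor_disk_usage(500)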

    def control_get(self):
        return self.control


    def control_set(self, control):
        self.control = os.path.abspath(control)


    def harness_select(self, which, harness_args):
        self.harness = harness.select(which, self, harness_args)


    def setup_dirs(self, results_dir, tmp_dir):
        if not tmp_dir:
            tmp_dir = os.path.join(self.tmpdir, 'build')
        if not os.path.exists(tmp_dir):
            os.mkdir(tmp_dir)
        if not os.path.isdir(tmp_dir):
            e_msg = "Temp dir (%s) is not a dir - args backwards?" % tmp_dir
            raise ValueError(e_msg)

        # We label the first build "build" and then subsequent ones
        # as "build.2", "build.3", etc. Whilst this is a little bit
        # inconsistent, 99.9% of jobs will only have one build
        # (that's not done as kernbench, sparse, or buildtest),
        # so it works out much cleaner. One of life's compromises.
        if not results_dir:
            results_dir = os.path.join(self.resultdir, 'build')
            i = 2
            while os.path.exists(results_dir):
                results_dir = os.path.join(self.resultdir, 'build.%d' % i)
                i += 1
        if not os.path.exists(results_dir):
            os.mkdir(results_dir)

        return (results_dir, tmp_dir)


    def barrier(self, *args, **kwds):
        """Create a barrier object"""
        return barrier.barrier(*args, **kwds)

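    # Hedged example of a multi-machine rendezvous; the exact signature is
    # defined in client/common_lib/barrier.py, and the hostnames, tag and
    # timeout here are placeholders:
    #
    #     b = job.barrier('host0', 'sync-point', 120)
    #     b.rendezvous('host0', 'host1')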

    def install_pkg(self, name, pkg_type, install_dir):
        '''
        This method is a simple wrapper around the actual package
        installation method in the Packager class. This is used
        internally by the profilers, deps and tests code.
        name : name of the package (ex: sleeptest, dbench etc.)
        pkg_type : Type of the package (ex: test, dep etc.)
        install_dir : The directory into which the source is untarred.
                      (ex: client/profilers/<name> for profilers)
        '''
        if self.pkgmgr.repositories:
            self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir)


    def add_repository(self, repo_urls):
        '''
        Adds the repository locations to the job so that packages
        can be fetched from them when needed. The repository list
        needs to be a list of strings.
        Ex: job.add_repository(['http://blah1','http://blah2'])
        '''
        for repo_url in repo_urls:
            self.pkgmgr.add_repository(repo_url)

        # Fetch the packages' checksum file that contains the checksums
        # of all the packages if it is not already fetched. The checksum
        # is always fetched whenever a job is first started. This
        # is not done in the job's constructor as we don't have the list of
        # the repositories there (and obviously don't care about this file
        # if we are not using the repos)
        try:
            checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,
                                              packages.CHECKSUM_FILE)
            self.pkgmgr.fetch_pkg(packages.CHECKSUM_FILE,
                                  checksum_file_path, use_checksum=False)
        except error.PackageFetchError:
            # packaging system might not be working in this case
            # Silently fall back to the normal case
            pass


    def require_gcc(self):
        """
        Test whether gcc is installed on the machine.
        """
        # check if gcc is installed on the system.
        try:
            utils.system('which gcc')
        except error.CmdError:
            raise NotAvailableError('gcc is required by this job and is '
                                    'not available on the system')


    def setup_dep(self, deps):
        """Set up the dependencies for this test.
        deps is a list of libraries required for this test.
        """
        # Fetch the deps from the repositories and set them up.
        for dep in deps:
            dep_dir = os.path.join(self.autodir, 'deps', dep)
            # Search for the dependency in the repositories if specified,
            # else check locally.
            try:
                self.install_pkg(dep, 'dep', dep_dir)
            except error.PackageInstallError:
                # see if the dep is there locally
                pass

            # dep_dir might not exist if it is not fetched from the repos
            if not os.path.exists(dep_dir):
                raise error.TestError("Dependency %s does not exist" % dep)

            os.chdir(dep_dir)
            if execfile('%s.py' % dep, {}) is None:
                logging.info('Dependency %s successfully built', dep)

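    # Hedged sketch: a test's setup() typically pulls a dep in and then points
    # its build flags at the unpacked tree; the 'libaio' name and path layout
    # are illustrative:
    #
    #     job.setup_dep(['libaio'])
    #     libdir = os.path.join(job.autodir, 'deps', 'libaio', 'lib')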

    def _runtest(self, url, tag, timeout, args, dargs):
        try:
            l = lambda : test.runtest(self, url, tag, args, dargs)
            pid = parallel.fork_start(self.resultdir, l)

            if timeout:
                logging.debug('Waiting for pid %d for %d seconds', pid, timeout)
                parallel.fork_waitfor_timed(self.resultdir, pid, timeout)
            else:
                parallel.fork_waitfor(self.resultdir, pid)

        except error.TestBaseException:
            # These are already classified with an error type (exit_status)
            raise
        except error.JobError:
            raise  # Caught further up and turned into an ABORT.
        except Exception, e:
            # Converts all other exceptions thrown by the test regardless
            # of phase into a TestError(TestBaseException) subclass that
            # reports them with their full stack trace.
            raise error.UnhandledTestError(e)


    def _run_test_base(self, url, *args, **dargs):
        """
        Prepare the arguments and group function shared by run_test and
        run_test_detail.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.

        @returns:
                subdir: Test subdirectory
                testname: Test name
                group_func: Actual test run function
                timeout: Test timeout
        """
        _group, testname = self.pkgmgr.get_package_name(url, 'test')
        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
        self._make_test_outputdir(subdir)

        timeout = dargs.pop('timeout', None)
        if timeout:
            logging.debug('Test has timeout: %d sec.', timeout)

        def log_warning(reason):
            self.record("WARN", subdir, testname, reason)
        @disk_usage_monitor.watch(log_warning, "/", self._max_disk_usage_rate)
        def group_func():
            try:
                self._runtest(url, tag, timeout, args, dargs)
            except error.TestBaseException, detail:
                # The error is already classified, record it properly.
                self.record(detail.exit_status, subdir, testname, str(detail))
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        return (subdir, testname, group_func, timeout)


    @_run_test_complete_on_exit
    def run_test(self, url, *args, **dargs):
        """
        Summon a test object and run it.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.

        @returns True if the test passes, False otherwise.
        """
        (subdir, testname, group_func, timeout) = self._run_test_base(url,
                                                                      *args,
                                                                      **dargs)
        try:
            self._rungroup(subdir, testname, group_func, timeout)
            return True
        except error.TestBaseException:
            return False
        # Any other exception here will be given to the caller
        #
        # NOTE: The only exception possible from the control file here
        # is error.JobError as _runtest() turns all others into an
        # UnhandledTestError that is caught above.

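    # Typical control-file invocation (sleeptest ships with autotest; extra
    # keyword arguments are forwarded to the test):
    #
    #     passed = job.run_test('sleeptest', seconds=1)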

    @_run_test_complete_on_exit
    def run_test_detail(self, url, *args, **dargs):
        """
        Summon a test object and run it, returning test status.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.

        @returns Test status
        @see: client/common_lib/error.py, exit_status
        """
        (subdir, testname, group_func, timeout) = self._run_test_base(url,
                                                                      *args,
                                                                      **dargs)
        try:
            self._rungroup(subdir, testname, group_func, timeout)
            return 'GOOD'
        except error.TestBaseException, detail:
            return detail.exit_status


    def _rungroup(self, subdir, testname, function, timeout, *args, **dargs):
        """\
        subdir:
                name of the group
        testname:
                name of the test to run, or support step
        function:
                subroutine to run
        *args:
                arguments for the function

        Returns the result of the passed-in function
        """

        try:
            optional_fields = None
            if timeout:
                optional_fields = {}
                optional_fields['timeout'] = timeout
            self.record('START', subdir, testname,
                        optional_fields=optional_fields)

            self._state.set('client', 'unexpected_reboot', (subdir, testname))
            try:
                result = function(*args, **dargs)
                self.record('END GOOD', subdir, testname)
                return result
            except error.TestBaseException, e:
                self.record('END %s' % e.exit_status, subdir, testname)
                raise
            except error.JobError, e:
                self.record('END ABORT', subdir, testname)
                raise
            except Exception, e:
                # This should only ever happen due to a bug in the given
                # function's code.  The common case of being called by
                # run_test() will never reach this.  If a control file called
                # run_group() itself, bugs in its function will be caught
                # here.
                err_msg = str(e) + '\n' + traceback.format_exc()
                self.record('END ERROR', subdir, testname, err_msg)
                raise
        finally:
            self._state.discard('client', 'unexpected_reboot')


    def run_group(self, function, tag=None, **dargs):
        """
        Run a function nested within a group level.

        function:
                Callable to run.
        tag:
                An optional tag name for the group.  If None (default)
                function.__name__ will be used.
        **dargs:
                Named arguments for the function.
        """
        if tag:
            name = tag
        else:
            name = function.__name__

        try:
            return self._rungroup(subdir=None, testname=name,
                                  function=function, timeout=None, **dargs)
        except (SystemExit, error.TestBaseException):
            raise
        # If there was a different exception, turn it into a TestError.
        # It will be caught by step_engine or _run_step_fn.
        except Exception, e:
            raise error.UnhandledTestError(e)

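    # Sketch: wrap an inline helper so its outcome is recorded as its own
    # START/END block in the status log; the function name is illustrative:
    #
    #     def install_kernel():
    #         ...
    #     job.run_group(install_kernel, tag='kernel-install')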

    def cpu_count(self):
        return utils.count_cpus()  # use total system count


    def start_reboot(self):
        self.record('START', None, 'reboot')
        self.record('GOOD', None, 'reboot.start')


    def _record_reboot_failure(self, subdir, operation, status,
                               running_id=None):
        self.record("ABORT", subdir, operation, status)
        if not running_id:
            running_id = utils.running_os_ident()
        kernel = {"kernel": running_id.split("::")[0]}
        self.record("END ABORT", subdir, 'reboot', optional_fields=kernel)


    def _check_post_reboot(self, subdir, running_id=None):
        """
        Perform post-boot checks, such as whether the system configuration
        has changed across reboots (specifically, CPUs and partitions).

        @param subdir: The subdir to use in the job.record call.
        @param running_id: An optional running_id to include in the reboot
            failure log message

        @raise JobError: Raised if the current configuration does not match the
            pre-reboot configuration.
        """
        # check to see if any partitions have changed
        partition_list = partition_lib.get_partition_list(self,
                                                          exclude_swap=False)
        mount_info = partition_lib.get_mount_info(partition_list)
        old_mount_info = self._state.get('client', 'mount_info')
        if mount_info != old_mount_info:
            new_entries = mount_info - old_mount_info
            old_entries = old_mount_info - mount_info
            description = ("mounted partitions are different after reboot "
                           "(old entries: %s, new entries: %s)" %
                           (old_entries, new_entries))
            self._record_reboot_failure(subdir, "reboot.verify_config",
                                        description, running_id=running_id)
            raise error.JobError("Reboot failed: %s" % description)

        # check to see if any CPUs have changed
        cpu_count = utils.count_cpus()
        old_count = self._state.get('client', 'cpu_count')
        if cpu_count != old_count:
            description = ('Number of CPUs changed after reboot '
                           '(old count: %d, new count: %d)' %
                           (old_count, cpu_count))
            self._record_reboot_failure(subdir, 'reboot.verify_config',
                                        description, running_id=running_id)
            raise error.JobError('Reboot failed: %s' % description)


    def partition(self, device, loop_size=0, mountpoint=None):
        """
        Work with a machine partition

            @param device: e.g. /dev/sda2, /dev/sdb1 etc...
            @param mountpoint: Specify a directory to mount to. If not
                               specified, the autotest tmp directory is used.
            @param loop_size: Size of loopback device (in MB). Defaults to 0.

            @return: A L{client.bin.partition.partition} object
        """

        if not mountpoint:
            mountpoint = self.tmpdir
        return partition_lib.partition(self, device, loop_size, mountpoint)


    @utils.deprecated
    def filesystem(self, device, mountpoint=None, loop_size=0):
        """ Same as partition

        @deprecated: Use partition method instead
        """
        return self.partition(device, loop_size, mountpoint)

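    # Hedged example (the device path is a placeholder): obtain a partition
    # object, letting the mountpoint default to the job's tmpdir; see
    # client/bin/partition.py for what the returned object can do.
    #
    #     part = job.partition('/dev/sdb1')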

    def enable_external_logging(self):
        pass


    def disable_external_logging(self):
        pass


    def reboot_setup(self):
        # save the partition list and mount points, as well as the cpu count
        partition_list = partition_lib.get_partition_list(self,
                                                          exclude_swap=False)
        mount_info = partition_lib.get_mount_info(partition_list)
        self._state.set('client', 'mount_info', mount_info)
        self._state.set('client', 'cpu_count', utils.count_cpus())


    def reboot(self):
        self.reboot_setup()
        self.harness.run_reboot()

        # HACK: using this as a module sometimes hangs shutdown, so if it's
        # installed unload it first
        utils.system("modprobe -r netconsole", ignore_status=True)

        # sync first, so that a sync during shutdown doesn't time out
        utils.system("sync; sync", ignore_status=True)

        utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")
        self.quit()


    def noop(self, text):
        logging.info("job: noop: " + text)


    @_run_test_complete_on_exit
    def parallel(self, *tasklist):
        """Run tasks in parallel"""

        pids = []
        old_log_filename = self._logger.global_filename
        for i, task in enumerate(tasklist):
            assert isinstance(task, (tuple, list))
            self._logger.global_filename = old_log_filename + (".%d" % i)
            def task_func():
                # stub out _record_indent with a process-local one
                base_record_indent = self._record_indent
                proc_local = self._job_state.property_factory(
                    '_state', '_record_indent.%d' % os.getpid(),
                    base_record_indent, namespace='client')
                self.__class__._record_indent = proc_local
                task[0](*task[1:])
            pids.append(parallel.fork_start(self.resultdir, task_func))

        old_log_path = os.path.join(self.resultdir, old_log_filename)
        old_log = open(old_log_path, "a")
        exceptions = []
        for i, pid in enumerate(pids):
            # wait for the task to finish
            try:
                parallel.fork_waitfor(self.resultdir, pid)
            except Exception, e:
                exceptions.append(e)
            # copy the logs from the subtask into the main log
            new_log_path = old_log_path + (".%d" % i)
            if os.path.exists(new_log_path):
                new_log = open(new_log_path)
                old_log.write(new_log.read())
                new_log.close()
                old_log.flush()
                os.remove(new_log_path)
        old_log.close()

        self._logger.global_filename = old_log_filename

        # handle any exceptions raised by the parallel tasks
        if exceptions:
            msg = "%d task(s) failed in job.parallel" % len(exceptions)
            raise error.JobError(msg)

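    # Each task is a (callable, args...) tuple or list, run in its own forked
    # process; e.g. running two tests side by side:
    #
    #     job.parallel([job.run_test, 'sleeptest'],
    #                  [job.run_test, 'dbench'])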

    def quit(self):
        # XXX: should have a better name.
        self.harness.run_pause()
        raise error.JobContinue("more to come")


    def complete(self, status):
        """Write pending reports, clean up, and exit"""
        # write out a job HTML report
        try:
            html_report.create_report(self.resultdir)
        except Exception, e:
            logging.error("Error writing job HTML report: %s", e)

        # We are about to exit 'complete' so clean up the control file.
        dest = os.path.join(self.resultdir, os.path.basename(self._state_file))
        shutil.move(self._state_file, dest)

        self.harness.run_complete()
        self.disable_external_logging()
        sys.exit(status)


    def _load_state(self):
        # grab any initial state and set up $CONTROL.state as the backing file
        init_state_file = self.control + '.init.state'
        self._state_file = self.control + '.state'
        if os.path.exists(init_state_file):
            shutil.move(init_state_file, self._state_file)
        self._state.set_backing_file(self._state_file)

        # initialize the state engine, if necessary
        has_steps = self._state.has('client', 'steps')
        if not self._is_continuation and has_steps:
            raise RuntimeError('Loaded state can only contain client.steps if '
                               'this is a continuation')

        if not has_steps:
            logging.debug('Initializing the state engine')
            self._state.set('client', 'steps', [])


    def handle_persistent_option(self, options, option_name):
        """
        Select option from command line or persistent state.
        Store selected option to allow standalone client to continue
        after reboot with previously selected options.
        Priority:
        1. explicitly specified via command line
        2. stored in state file (if continuing job '-c')
        3. default == None
        """
        option = None
        cmd_line_option = getattr(options, option_name)
        if cmd_line_option:
            option = cmd_line_option
            self._state.set('client', option_name, option)
        else:
            stored_option = self._state.get('client', option_name, None)
            if stored_option:
                option = stored_option
        logging.debug('Persistent option %s now set to %s', option_name, option)
        return option


    def __create_step_tuple(self, fn, args, dargs):
        # Legacy code passes in an array where the first arg is
        # the function or its name.
        if isinstance(fn, list):
            assert(len(args) == 0)
            assert(len(dargs) == 0)
            args = fn[1:]
            fn = fn[0]
        # Pickling actual functions is hairy, thus we have to call
        # them by name.  Unfortunately, this means only functions
        # defined globally can be used as a next step.
        if callable(fn):
            fn = fn.__name__
        if not isinstance(fn, types.StringTypes):
            raise StepError("Next steps must be functions or "
                            "strings containing the function name")
        ancestry = copy.copy(self._current_step_ancestry)
        return (ancestry, fn, args, dargs)


    def next_step_append(self, fn, *args, **dargs):
        """Define the next step and place it at the end"""
        steps = self._state.get('client', 'steps')
        steps.append(self.__create_step_tuple(fn, args, dargs))
        self._state.set('client', 'steps', steps)


    def next_step(self, fn, *args, **dargs):
        """Create a new step and place it after any steps added
        while running the current step but before any steps added in
        previous steps"""
        steps = self._state.get('client', 'steps')
        steps.insert(self._next_step_index,
                     self.__create_step_tuple(fn, args, dargs))
        self._next_step_index += 1
        self._state.set('client', 'steps', steps)


    def next_step_prepend(self, fn, *args, **dargs):
        """Insert a new step, executing first"""
        steps = self._state.get('client', 'steps')
        steps.insert(0, self.__create_step_tuple(fn, args, dargs))
        self._next_step_index += 1
        self._state.set('client', 'steps', steps)

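    # Sketch of the step-engine contract from a control file's point of view:
    # a top-level step_init() queues further top-level functions by reference
    # or name, and after each reboot the job resumes with the next queued
    # step; function names here are illustrative:
    #
    #     def step_init():
    #         job.next_step([step_one])
    #
    #     def step_one():
    #         job.run_test('sleeptest', seconds=1)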

    def _run_step_fn(self, local_vars, fn, args, dargs):
        """Run a (step) function within the given context"""

        local_vars['__args'] = args
        local_vars['__dargs'] = dargs
        try:
            exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)
            return local_vars['__ret']
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except error.TestNAError, detail:
            self.record(detail.exit_status, None, fn, str(detail))
        except Exception, detail:
            raise error.UnhandledJobError(detail)


    def _create_frame(self, global_vars, ancestry, fn_name):
        """Set up the environment like it would have been when this
        function was first defined.

        Child step engine 'implementations' must have 'return locals()'
        at the end of their steps.  Because of this, we can call the
        parent function and get back all child functions (i.e. those
        defined within it).

        Unfortunately, the call stack of the function calling
        job.next_step might have been deeper than the function it
        added.  In order to make sure that the environment is what it
        should be, we need to then pop off the frames we built until
        we find the frame where the function was first defined."""

        # The copies ensure that the parent frames are not modified
        # while building child frames.  This matters if we then
        # pop some frames in the next part of this function.
        current_frame = copy.copy(global_vars)
        frames = [current_frame]
        for steps_fn_name in ancestry:
            ret = self._run_step_fn(current_frame, steps_fn_name, [], {})
            current_frame = copy.copy(ret)
            frames.append(current_frame)

        # Walk up the stack frames until we find the place fn_name was defined.
        while len(frames) > 2:
            if fn_name not in frames[-2]:
                break
            if frames[-2][fn_name] != frames[-1][fn_name]:
                break
            frames.pop()
            ancestry.pop()

        return (frames[-1], ancestry)


    def _add_step_init(self, local_vars, current_function):
        """If the function returned a dictionary that includes a
        function named 'step_init', prepend it to our list of steps.
        This will only get run the first time a function with a nested
        use of the step engine is run."""

        if (isinstance(local_vars, dict) and
            'step_init' in local_vars and
            callable(local_vars['step_init'])):
            # The init step is a child of the function
            # we were just running.
            self._current_step_ancestry.append(current_function)
            self.next_step_prepend('step_init')


    def step_engine(self):
        """The multi-run engine used when the control file defines step_init.

        Does the next step.
        """

        # Set up the environment and then interpret the control file.
        # Some control files will have code outside of functions,
        # which means we need to have our state engine initialized
        # before reading in the file.
        global_control_vars = {'job': self,
                               'args': self.args}
        exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
        try:
            execfile(self.control, global_control_vars, global_control_vars)
        except error.TestNAError, detail:
            self.record(detail.exit_status, None, self.control, str(detail))
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except Exception, detail:
            # Syntax errors or other general Python exceptions coming out of
            # the top level of the control file itself go through here.
            raise error.UnhandledJobError(detail)

        # If we loaded in a mid-job state file, then we presumably
        # know what steps we have yet to run.
        if not self._is_continuation:
            if 'step_init' in global_control_vars:
                self.next_step(global_control_vars['step_init'])
        else:
            # if the last job failed due to an unexpected reboot, record it
            # as a failure so the harness gets called
            last_job = self._state.get('client', 'unexpected_reboot', None)
            if last_job:
                subdir, testname = last_job
                self.record('FAIL', subdir, testname, 'unexpected reboot')
                self.record('END FAIL', subdir, testname)

        # Iterate through the steps.  If we reboot, we'll simply
        # continue iterating on the next step.
        while len(self._state.get('client', 'steps')) > 0:
            steps = self._state.get('client', 'steps')
            (ancestry, fn_name, args, dargs) = steps.pop(0)
            self._state.set('client', 'steps', steps)

            self._next_step_index = 0
            ret = self._create_frame(global_control_vars, ancestry, fn_name)
            local_vars, self._current_step_ancestry = ret
            local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
            self._add_step_init(local_vars, fn_name)


    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
                                   on_every_test)


    def add_sysinfo_logfile(self, file, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)

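    # Example: collect extra sysinfo, either once per boot (the default) or
    # after every test:
    #
    #     job.add_sysinfo_command('lspci -vvn', logfile='lspci.txt')
    #     job.add_sysinfo_logfile('/var/log/messages', on_every_test=True)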

    def _add_sysinfo_loggable(self, loggable, on_every_test):
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)
        self._save_sysinfo_state()


    def _load_sysinfo_state(self):
        state = self._state.get('client', 'sysinfo', None)
        if state:
            self.sysinfo.deserialize(state)


    def _save_sysinfo_state(self):
        state = self.sysinfo.serialize()
        self._state.set('client', 'sysinfo', state)


class disk_usage_monitor:
    def __init__(self, logging_func, device, max_mb_per_hour):
        self.func = logging_func
        self.device = device
        self.max_mb_per_hour = max_mb_per_hour


    def start(self):
        self.initial_space = utils.freespace(self.device)
        self.start_time = time.time()


    def stop(self):
        # if no maximum usage rate was set, we don't need to
        # generate any warnings
        if not self.max_mb_per_hour:
            return

        final_space = utils.freespace(self.device)
        used_space = self.initial_space - final_space
        stop_time = time.time()
        total_time = stop_time - self.start_time
        # round up the time to one minute, to keep extremely short
        # tests from generating false positives due to short, badly
        # timed bursts of activity
        total_time = max(total_time, 60.0)

        # determine the usage rate
        bytes_per_sec = used_space / total_time
        mb_per_sec = bytes_per_sec / 1024**2
        mb_per_hour = mb_per_sec * 60 * 60

        if mb_per_hour > self.max_mb_per_hour:
            msg = ("disk space on %s was consumed at a rate of %.2f MB/hour")
            msg %= (self.device, mb_per_hour)
            self.func(msg)


    @classmethod
    def watch(cls, *monitor_args, **monitor_dargs):
        """ Generic decorator to wrap a function call with the
        standard create-monitor -> start -> call -> stop idiom."""
        def decorator(func):
            def watched_func(*args, **dargs):
                monitor = cls(*monitor_args, **monitor_dargs)
                monitor.start()
                try:
                    func(*args, **dargs)
                finally:
                    monitor.stop()
            return watched_func
        return decorator

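# Hedged sketch of the watch() idiom, mirroring how _run_test_base applies it:
# the decorated callable runs with disk usage on the given device measured
# around the call; the threshold and function below are illustrative:
#
#     @disk_usage_monitor.watch(logging.warning, '/', 500)
#     def run_build():
#         ...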

def runjob(control, drop_caches, options):
    """
    Run a job using the given control file.

    This is the main interface to this module.

    @see base_job.__init__ for parameter info.
    """
    control = os.path.abspath(control)
    state = control + '.state'
    # Ensure the state file is cleaned up before the job starts to run if
    # autotest is not running with the --continue flag
    if not options.cont and os.path.isfile(state):
        logging.debug('Cleaning up previously found state file')
        os.remove(state)

    # instantiate the job object ready for the control file.
    myjob = None
    try:
        # Check that the control file is valid
        if not os.path.exists(control):
            raise error.JobError(control + ": control file not found")

        # When continuing, the job is complete when there is no
        # state file, ensure we don't try and continue.
        if options.cont and not os.path.exists(state):
            raise error.JobComplete("all done")

        myjob = job(control=control, drop_caches=drop_caches, options=options)

        # Load in the user's control file, which may do any one of:
        #  1) execute in toto
        #  2) define steps, and select the first via next_step()
        myjob.step_engine()

    except error.JobContinue:
        sys.exit(5)

    except error.JobComplete:
        sys.exit(1)

    except error.JobError, instance:
        logging.error("JOB ERROR: " + str(instance))
        if myjob:
            command = None
            if len(instance.args) > 1:
                command = instance.args[1]
                myjob.record('ABORT', None, command, str(instance))
            myjob.record('END ABORT', None, None, str(instance))
            assert myjob._record_indent == 0
            myjob.complete(1)
        else:
            sys.exit(1)

    except Exception, e:
        # NOTE: job._run_step_fn and job.step_engine will turn things into
        # a JobError for us.  If we get here, it's likely an autotest bug.
        msg = str(e) + '\n' + traceback.format_exc()
        logging.critical("JOB ERROR (autotest bug?): " + msg)
        if myjob:
            myjob.record('END ABORT', None, None, msg)
            assert myjob._record_indent == 0
            myjob.complete(1)
        else:
            sys.exit(1)

    # If we get here, then we assume the job is complete and good.
    myjob.record('END GOOD', None, None)
    assert myjob._record_indent == 0

    myjob.complete(0)

class job(base_client_job):

    def __init__(self, *args, **kwargs):
        base_client_job.__init__(self, *args, **kwargs)


    def run_test(self, url, *args, **dargs):
        log_pauser = cros_logging.LogRotationPauser()
        passed = False
        try:
            log_pauser.begin()
            passed = base_client_job.run_test(self, url, *args, **dargs)
            if not passed:
                # Save the VM state immediately after the test failure.
                # This is a NOOP if the test isn't running in a VM or
                # if the VM is not properly configured to save state.
                _group, testname = self.pkgmgr.get_package_name(url, 'test')
                now = datetime.now().strftime('%I:%M:%S.%f')
                checkpoint_name = '%s-%s' % (testname, now)
                utils.save_vm_state(checkpoint_name)
        finally:
            log_pauser.end()
        return passed


    def reboot(self):
        self.reboot_setup()
        self.harness.run_reboot()

        # sync first, so that a sync during shutdown doesn't time out
        utils.system('sync; sync', ignore_status=True)

        utils.system('reboot </dev/null >/dev/null 2>&1 &')
        self.quit()


    def require_gcc(self):
        return False