Home | History | Annotate | Download | only in bin
      1 """The main job wrapper
      2 
      3 This is the core infrastructure.
      4 
      5 Copyright Andy Whitcroft, Martin J. Bligh 2006
      6 """
      7 
      8 import copy, os, re, shutil, sys, time, traceback, types, glob
      9 import logging, getpass, weakref
     10 from autotest_lib.client.bin import client_logging_config
     11 from autotest_lib.client.bin import utils, parallel, kernel, xen
     12 from autotest_lib.client.bin import profilers, boottool, harness
     13 from autotest_lib.client.bin import config, sysinfo, test, local_host
     14 from autotest_lib.client.bin import partition as partition_lib
     15 from autotest_lib.client.common_lib import base_job
     16 from autotest_lib.client.common_lib import error, barrier, logging_manager
     17 from autotest_lib.client.common_lib import base_packages, packages
     18 from autotest_lib.client.common_lib import global_config
     19 from autotest_lib.client.tools import html_report
     20 
     21 GLOBAL_CONFIG = global_config.global_config
     22 
     23 LAST_BOOT_TAG = object()
     24 JOB_PREAMBLE = """
     25 from autotest_lib.client.common_lib.error import *
     26 from autotest_lib.client.bin.utils import *
     27 """
     28 
     29 
     30 class StepError(error.AutotestError):
     31     pass
     32 
     33 class NotAvailableError(error.AutotestError):
     34     pass
     35 
     36 
     37 
     38 def _run_test_complete_on_exit(f):
     39     """Decorator for job methods that automatically calls
     40     self.harness.run_test_complete when the method exits, if appropriate."""
     41     def wrapped(self, *args, **dargs):
     42         try:
     43             return f(self, *args, **dargs)
     44         finally:
     45             if self._logger.global_filename == 'status':
     46                 self.harness.run_test_complete()
     47                 if self.drop_caches:
     48                     utils.drop_caches()
     49     wrapped.__name__ = f.__name__
     50     wrapped.__doc__ = f.__doc__
     51     wrapped.__dict__.update(f.__dict__)
     52     return wrapped
     53 
     54 
     55 class status_indenter(base_job.status_indenter):
     56     """Provide a status indenter that is backed by job._record_prefix."""
     57     def __init__(self, job):
     58         self.job = weakref.proxy(job)  # avoid a circular reference
     59 
     60 
     61     @property
     62     def indent(self):
     63         return self.job._record_indent
     64 
     65 
     66     def increment(self):
     67         self.job._record_indent += 1
     68 
     69 
     70     def decrement(self):
     71         self.job._record_indent -= 1
     72 
     73 
     74 class base_client_job(base_job.base_job):
     75     """The client-side concrete implementation of base_job.
     76 
     77     Optional properties provided by this implementation:
     78         control
     79         bootloader
     80         harness
     81     """
     82 
     83     _WARNING_DISABLE_DELAY = 5
     84 
     85     # _record_indent is a persistent property, but only on the client
     86     _job_state = base_job.base_job._job_state
     87     _record_indent = _job_state.property_factory(
     88         '_state', '_record_indent', 0, namespace='client')
     89     _max_disk_usage_rate = _job_state.property_factory(
     90         '_state', '_max_disk_usage_rate', 0.0, namespace='client')
     91 
     92 
     93     def __init__(self, control, options, drop_caches=True,
     94                  extra_copy_cmdline=None):
     95         """
     96         Prepare a client side job object.
     97 
     98         @param control: The control file (pathname of).
     99         @param options: an object which includes:
    100                 jobtag: The job tag string (eg "default").
    101                 cont: If this is the continuation of this job.
    102                 harness_type: An alternative server harness.  [None]
    103                 use_external_logging: If true, the enable_external_logging
    104                           method will be called during construction.  [False]
    105         @param drop_caches: If true, utils.drop_caches() is called before and
    106                 between all tests.  [True]
    107         @param extra_copy_cmdline: list of additional /proc/cmdline arguments to
    108                 copy from the running kernel to all the installed kernels with
    109                 this job
    110         """
    111         super(base_client_job, self).__init__(options=options)
    112         self._pre_record_init(control, options)
    113         try:
    114             self._post_record_init(control, options, drop_caches,
    115                                    extra_copy_cmdline)
    116         except Exception, err:
    117             self.record(
    118                     'ABORT', None, None,'client.bin.job.__init__ failed: %s' %
    119                     str(err))
    120             raise
    121 
    122 
    123     @classmethod
    124     def _get_environ_autodir(cls):
    125         return os.environ['AUTODIR']
    126 
    127 
    128     @classmethod
    129     def _find_base_directories(cls):
    130         """
    131         Determine locations of autodir and clientdir (which are the same)
    132         using os.environ. Serverdir does not exist in this context.
    133         """
    134         autodir = clientdir = cls._get_environ_autodir()
    135         return autodir, clientdir, None
    136 
    137 
    138     @classmethod
    139     def _parse_args(cls, args):
    140         return re.findall("[^\s]*?['|\"].*?['|\"]|[^\s]+", args)
    141 
    142 
    143     def _find_resultdir(self, options):
    144         """
    145         Determine the directory for storing results. On a client this is
    146         always <autodir>/results/<tag>, where tag is passed in on the command
    147         line as an option.
    148         """
    149         output_dir_config = GLOBAL_CONFIG.get_config_value('CLIENT',
    150                                                            'output_dir',
    151                                                             default="")
    152         if options.output_dir:
    153             basedir = options.output_dir
    154         elif output_dir_config:
    155             basedir = output_dir_config
    156         else:
    157             basedir = self.autodir
    158 
    159         return os.path.join(basedir, 'results', options.tag)
    160 
    161 
    162     def _get_status_logger(self):
    163         """Return a reference to the status logger."""
    164         return self._logger
    165 
    166 
    167     def _pre_record_init(self, control, options):
    168         """
    169         Initialization function that should peform ONLY the required
    170         setup so that the self.record() method works.
    171 
    172         As of now self.record() needs self.resultdir, self._group_level,
    173         self.harness and of course self._logger.
    174         """
    175         if not options.cont:
    176             self._cleanup_debugdir_files()
    177             self._cleanup_results_dir()
    178 
    179         logging_manager.configure_logging(
    180             client_logging_config.ClientLoggingConfig(),
    181             results_dir=self.resultdir,
    182             verbose=options.verbose)
    183         logging.info('Writing results to %s', self.resultdir)
    184 
    185         # init_group_level needs the state
    186         self.control = os.path.realpath(control)
    187         self._is_continuation = options.cont
    188         self._current_step_ancestry = []
    189         self._next_step_index = 0
    190         self._load_state()
    191 
    192         _harness = self.handle_persistent_option(options, 'harness')
    193         _harness_args = self.handle_persistent_option(options, 'harness_args')
    194 
    195         self.harness = harness.select(_harness, self, _harness_args)
    196 
    197         # set up the status logger
    198         def client_job_record_hook(entry):
    199             msg_tag = ''
    200             if '.' in self._logger.global_filename:
    201                 msg_tag = self._logger.global_filename.split('.', 1)[1]
    202             # send the entry to the job harness
    203             message = '\n'.join([entry.message] + entry.extra_message_lines)
    204             rendered_entry = self._logger.render_entry(entry)
    205             self.harness.test_status_detail(entry.status_code, entry.subdir,
    206                                             entry.operation, message, msg_tag,
    207                                             entry.fields)
    208             self.harness.test_status(rendered_entry, msg_tag)
    209             # send the entry to stdout, if it's enabled
    210             logging.info(rendered_entry)
    211         self._logger = base_job.status_logger(
    212             self, status_indenter(self), record_hook=client_job_record_hook,
    213             tap_writer=self._tap)
    214 
    215     def _post_record_init(self, control, options, drop_caches,
    216                           extra_copy_cmdline):
    217         """
    218         Perform job initialization not required by self.record().
    219         """
    220         self._init_drop_caches(drop_caches)
    221 
    222         self._init_packages()
    223 
    224         self.sysinfo = sysinfo.sysinfo(self.resultdir)
    225         self._load_sysinfo_state()
    226 
    227         if not options.cont:
    228             download = os.path.join(self.testdir, 'download')
    229             if not os.path.exists(download):
    230                 os.mkdir(download)
    231 
    232             shutil.copyfile(self.control,
    233                             os.path.join(self.resultdir, 'control'))
    234 
    235         self.control = control
    236 
    237         self.logging = logging_manager.get_logging_manager(
    238                 manage_stdout_and_stderr=True, redirect_fds=True)
    239         self.logging.start_logging()
    240 
    241         self._config = config.config(self)
    242         self.profilers = profilers.profilers(self)
    243 
    244         self._init_bootloader()
    245 
    246         self.machines = [options.hostname]
    247         self.machine_dict_list = [{'hostname' : options.hostname}]
    248         # Client side tests should always run the same whether or not they are
    249         # running in the lab.
    250         self.in_lab = False
    251         self.hosts = set([local_host.LocalHost(hostname=options.hostname,
    252                                                bootloader=self.bootloader)])
    253 
    254         self.args = []
    255         if options.args:
    256             self.args = self._parse_args(options.args)
    257 
    258         if options.user:
    259             self.user = options.user
    260         else:
    261             self.user = getpass.getuser()
    262 
    263         self.sysinfo.log_per_reboot_data()
    264 
    265         if not options.cont:
    266             self.record('START', None, None)
    267 
    268         self.harness.run_start()
    269 
    270         if options.log:
    271             self.enable_external_logging()
    272 
    273         self._init_cmdline(extra_copy_cmdline)
    274 
    275         self.num_tests_run = None
    276         self.num_tests_failed = None
    277 
    278         self.warning_loggers = None
    279         self.warning_manager = None
    280 
    281 
    282     def _init_drop_caches(self, drop_caches):
    283         """
    284         Perform the drop caches initialization.
    285         """
    286         self.drop_caches_between_iterations = (
    287                                     GLOBAL_CONFIG.get_config_value('CLIENT',
    288                                     'drop_caches_between_iterations',
    289                                     type=bool, default=True))
    290         self.drop_caches = drop_caches
    291         if self.drop_caches:
    292             utils.drop_caches()
    293 
    294 
    295     def _init_bootloader(self):
    296         """
    297         Perform boottool initialization.
    298         """
    299         tool = self.config_get('boottool.executable')
    300         self.bootloader = boottool.boottool(tool)
    301 
    302 
    303     def _init_packages(self):
    304         """
    305         Perform the packages support initialization.
    306         """
    307         self.pkgmgr = packages.PackageManager(
    308             self.autodir, run_function_dargs={'timeout':3600})
    309 
    310 
    311     def _init_cmdline(self, extra_copy_cmdline):
    312         """
    313         Initialize default cmdline for booted kernels in this job.
    314         """
    315         copy_cmdline = set(['console'])
    316         if extra_copy_cmdline is not None:
    317             copy_cmdline.update(extra_copy_cmdline)
    318 
    319         # extract console= and other args from cmdline and add them into the
    320         # base args that we use for all kernels we install
    321         cmdline = utils.read_one_line('/proc/cmdline')
    322         kernel_args = []
    323         for karg in cmdline.split():
    324             for param in copy_cmdline:
    325                 if karg.startswith(param) and \
    326                     (len(param) == len(karg) or karg[len(param)] == '='):
    327                     kernel_args.append(karg)
    328         self.config_set('boot.default_args', ' '.join(kernel_args))
    329 
    330 
    331     def _cleanup_results_dir(self):
    332         """Delete everything in resultsdir"""
    333         assert os.path.exists(self.resultdir)
    334         list_files = glob.glob('%s/*' % self.resultdir)
    335         for f in list_files:
    336             if os.path.isdir(f):
    337                 shutil.rmtree(f)
    338             elif os.path.isfile(f):
    339                 os.remove(f)
    340 
    341 
    342     def _cleanup_debugdir_files(self):
    343         """
    344         Delete any leftover debugdir files
    345         """
    346         list_files = glob.glob("/tmp/autotest_results_dir.*")
    347         for f in list_files:
    348             os.remove(f)
    349 
    350 
    351     def disable_warnings(self, warning_type):
    352         self.record("INFO", None, None,
    353                     "disabling %s warnings" % warning_type,
    354                     {"warnings.disable": warning_type})
    355         time.sleep(self._WARNING_DISABLE_DELAY)
    356 
    357 
    358     def enable_warnings(self, warning_type):
    359         time.sleep(self._WARNING_DISABLE_DELAY)
    360         self.record("INFO", None, None,
    361                     "enabling %s warnings" % warning_type,
    362                     {"warnings.enable": warning_type})
    363 
    364 
    365     def monitor_disk_usage(self, max_rate):
    366         """\
    367         Signal that the job should monitor disk space usage on /
    368         and generate a warning if a test uses up disk space at a
    369         rate exceeding 'max_rate'.
    370 
    371         Parameters:
    372              max_rate - the maximium allowed rate of disk consumption
    373                         during a test, in MB/hour, or 0 to indicate
    374                         no limit.
    375         """
    376         self._max_disk_usage_rate = max_rate
    377 
    378 
    379     def relative_path(self, path):
    380         """\
    381         Return a patch relative to the job results directory
    382         """
    383         head = len(self.resultdir) + 1     # remove the / inbetween
    384         return path[head:]
    385 
    386 
    387     def control_get(self):
    388         return self.control
    389 
    390 
    391     def control_set(self, control):
    392         self.control = os.path.abspath(control)
    393 
    394 
    395     def harness_select(self, which, harness_args):
    396         self.harness = harness.select(which, self, harness_args)
    397 
    398 
    399     def config_set(self, name, value):
    400         self._config.set(name, value)
    401 
    402 
    403     def config_get(self, name):
    404         return self._config.get(name)
    405 
    406 
    407     def setup_dirs(self, results_dir, tmp_dir):
    408         if not tmp_dir:
    409             tmp_dir = os.path.join(self.tmpdir, 'build')
    410         if not os.path.exists(tmp_dir):
    411             os.mkdir(tmp_dir)
    412         if not os.path.isdir(tmp_dir):
    413             e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir
    414             raise ValueError(e_msg)
    415 
    416         # We label the first build "build" and then subsequent ones
    417         # as "build.2", "build.3", etc. Whilst this is a little bit
    418         # inconsistent, 99.9% of jobs will only have one build
    419         # (that's not done as kernbench, sparse, or buildtest),
    420         # so it works out much cleaner. One of life's comprimises.
    421         if not results_dir:
    422             results_dir = os.path.join(self.resultdir, 'build')
    423             i = 2
    424             while os.path.exists(results_dir):
    425                 results_dir = os.path.join(self.resultdir, 'build.%d' % i)
    426                 i += 1
    427         if not os.path.exists(results_dir):
    428             os.mkdir(results_dir)
    429 
    430         return (results_dir, tmp_dir)
    431 
    432 
    433     def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \
    434                             kjob = None ):
    435         """Summon a xen object"""
    436         (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
    437         build_dir = 'xen'
    438         return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir,
    439                        leave, kjob)
    440 
    441 
    442     def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False):
    443         """Summon a kernel object"""
    444         (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
    445         build_dir = 'linux'
    446         return kernel.auto_kernel(self, base_tree, results_dir, tmp_dir,
    447                                   build_dir, leave)
    448 
    449 
    450     def barrier(self, *args, **kwds):
    451         """Create a barrier object"""
    452         return barrier.barrier(*args, **kwds)
    453 
    454 
    455     def install_pkg(self, name, pkg_type, install_dir):
    456         '''
    457         This method is a simple wrapper around the actual package
    458         installation method in the Packager class. This is used
    459         internally by the profilers, deps and tests code.
    460         name : name of the package (ex: sleeptest, dbench etc.)
    461         pkg_type : Type of the package (ex: test, dep etc.)
    462         install_dir : The directory in which the source is actually
    463                       untarred into. (ex: client/profilers/<name> for profilers)
    464         '''
    465         if self.pkgmgr.repositories:
    466             self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir)
    467 
    468 
    469     def add_repository(self, repo_urls):
    470         '''
    471         Adds the repository locations to the job so that packages
    472         can be fetched from them when needed. The repository list
    473         needs to be a string list
    474         Ex: job.add_repository(['http://blah1','http://blah2'])
    475         '''
    476         for repo_url in repo_urls:
    477             self.pkgmgr.add_repository(repo_url)
    478 
    479         # Fetch the packages' checksum file that contains the checksums
    480         # of all the packages if it is not already fetched. The checksum
    481         # is always fetched whenever a job is first started. This
    482         # is not done in the job's constructor as we don't have the list of
    483         # the repositories there (and obviously don't care about this file
    484         # if we are not using the repos)
    485         try:
    486             checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,
    487                                               base_packages.CHECKSUM_FILE)
    488             self.pkgmgr.fetch_pkg(base_packages.CHECKSUM_FILE,
    489                                   checksum_file_path, use_checksum=False)
    490         except error.PackageFetchError:
    491             # packaging system might not be working in this case
    492             # Silently fall back to the normal case
    493             pass
    494 
    495 
    496     def require_gcc(self):
    497         """
    498         Test whether gcc is installed on the machine.
    499         """
    500         # check if gcc is installed on the system.
    501         try:
    502             utils.system('which gcc')
    503         except error.CmdError, e:
    504             raise NotAvailableError('gcc is required by this job and is '
    505                                     'not available on the system')
    506 
    507 
    508     def setup_dep(self, deps):
    509         """Set up the dependencies for this test.
    510         deps is a list of libraries required for this test.
    511         """
    512         # Fetch the deps from the repositories and set them up.
    513         for dep in deps:
    514             dep_dir = os.path.join(self.autodir, 'deps', dep)
    515             # Search for the dependency in the repositories if specified,
    516             # else check locally.
    517             try:
    518                 self.install_pkg(dep, 'dep', dep_dir)
    519             except error.PackageInstallError:
    520                 # see if the dep is there locally
    521                 pass
    522 
    523             # dep_dir might not exist if it is not fetched from the repos
    524             if not os.path.exists(dep_dir):
    525                 raise error.TestError("Dependency %s does not exist" % dep)
    526 
    527             os.chdir(dep_dir)
    528             if execfile('%s.py' % dep, {}) is None:
    529                 logging.info('Dependency %s successfuly built', dep)
    530 
    531 
    532     def _runtest(self, url, tag, timeout, args, dargs):
    533         try:
    534             l = lambda : test.runtest(self, url, tag, args, dargs)
    535             pid = parallel.fork_start(self.resultdir, l)
    536 
    537             if timeout:
    538                 logging.debug('Waiting for pid %d for %d seconds', pid, timeout)
    539                 parallel.fork_waitfor_timed(self.resultdir, pid, timeout)
    540             else:
    541                 parallel.fork_waitfor(self.resultdir, pid)
    542 
    543         except error.TestBaseException:
    544             # These are already classified with an error type (exit_status)
    545             raise
    546         except error.JobError:
    547             raise  # Caught further up and turned into an ABORT.
    548         except Exception, e:
    549             # Converts all other exceptions thrown by the test regardless
    550             # of phase into a TestError(TestBaseException) subclass that
    551             # reports them with their full stack trace.
    552             raise error.UnhandledTestError(e)
    553 
    554 
    555     def _run_test_base(self, url, *args, **dargs):
    556         """
    557         Prepares arguments and run functions to run_test and run_test_detail.
    558 
    559         @param url A url that identifies the test to run.
    560         @param tag An optional keyword argument that will be added to the
    561             test and subdir name.
    562         @param subdir_tag An optional keyword argument that will be added
    563             to the subdir name.
    564 
    565         @returns:
    566                 subdir: Test subdirectory
    567                 testname: Test name
    568                 group_func: Actual test run function
    569                 timeout: Test timeout
    570         """
    571         group, testname = self.pkgmgr.get_package_name(url, 'test')
    572         testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
    573         outputdir = self._make_test_outputdir(subdir)
    574 
    575         timeout = dargs.pop('timeout', None)
    576         if timeout:
    577             logging.debug('Test has timeout: %d sec.', timeout)
    578 
    579         def log_warning(reason):
    580             self.record("WARN", subdir, testname, reason)
    581         @disk_usage_monitor.watch(log_warning, "/", self._max_disk_usage_rate)
    582         def group_func():
    583             try:
    584                 self._runtest(url, tag, timeout, args, dargs)
    585             except error.TestBaseException, detail:
    586                 # The error is already classified, record it properly.
    587                 self.record(detail.exit_status, subdir, testname, str(detail))
    588                 raise
    589             else:
    590                 self.record('GOOD', subdir, testname, 'completed successfully')
    591 
    592         return (subdir, testname, group_func, timeout)
    593 
    594 
    595     @_run_test_complete_on_exit
    596     def run_test(self, url, *args, **dargs):
    597         """
    598         Summon a test object and run it.
    599 
    600         @param url A url that identifies the test to run.
    601         @param tag An optional keyword argument that will be added to the
    602             test and subdir name.
    603         @param subdir_tag An optional keyword argument that will be added
    604             to the subdir name.
    605 
    606         @returns True if the test passes, False otherwise.
    607         """
    608         (subdir, testname, group_func, timeout) = self._run_test_base(url,
    609                                                                       *args,
    610                                                                       **dargs)
    611         try:
    612             self._rungroup(subdir, testname, group_func, timeout)
    613             return True
    614         except error.TestBaseException:
    615             return False
    616         # Any other exception here will be given to the caller
    617         #
    618         # NOTE: The only exception possible from the control file here
    619         # is error.JobError as _runtest() turns all others into an
    620         # UnhandledTestError that is caught above.
    621 
    622 
    623     @_run_test_complete_on_exit
    624     def run_test_detail(self, url, *args, **dargs):
    625         """
    626         Summon a test object and run it, returning test status.
    627 
    628         @param url A url that identifies the test to run.
    629         @param tag An optional keyword argument that will be added to the
    630             test and subdir name.
    631         @param subdir_tag An optional keyword argument that will be added
    632             to the subdir name.
    633 
    634         @returns Test status
    635         @see: client/common_lib/error.py, exit_status
    636         """
    637         (subdir, testname, group_func, timeout) = self._run_test_base(url,
    638                                                                       *args,
    639                                                                       **dargs)
    640         try:
    641             self._rungroup(subdir, testname, group_func, timeout)
    642             return 'GOOD'
    643         except error.TestBaseException, detail:
    644             return detail.exit_status
    645 
    646 
    647     def _rungroup(self, subdir, testname, function, timeout, *args, **dargs):
    648         """\
    649         subdir:
    650                 name of the group
    651         testname:
    652                 name of the test to run, or support step
    653         function:
    654                 subroutine to run
    655         *args:
    656                 arguments for the function
    657 
    658         Returns the result of the passed in function
    659         """
    660 
    661         try:
    662             optional_fields = None
    663             if timeout:
    664                 optional_fields = {}
    665                 optional_fields['timeout'] = timeout
    666             self.record('START', subdir, testname,
    667                         optional_fields=optional_fields)
    668 
    669             self._state.set('client', 'unexpected_reboot', (subdir, testname))
    670             try:
    671                 result = function(*args, **dargs)
    672                 self.record('END GOOD', subdir, testname)
    673                 return result
    674             except error.TestBaseException, e:
    675                 self.record('END %s' % e.exit_status, subdir, testname)
    676                 raise
    677             except error.JobError, e:
    678                 self.record('END ABORT', subdir, testname)
    679                 raise
    680             except Exception, e:
    681                 # This should only ever happen due to a bug in the given
    682                 # function's code.  The common case of being called by
    683                 # run_test() will never reach this.  If a control file called
    684                 # run_group() itself, bugs in its function will be caught
    685                 # here.
    686                 err_msg = str(e) + '\n' + traceback.format_exc()
    687                 self.record('END ERROR', subdir, testname, err_msg)
    688                 raise
    689         finally:
    690             self._state.discard('client', 'unexpected_reboot')
    691 
    692 
    693     def run_group(self, function, tag=None, **dargs):
    694         """
    695         Run a function nested within a group level.
    696 
    697         function:
    698                 Callable to run.
    699         tag:
    700                 An optional tag name for the group.  If None (default)
    701                 function.__name__ will be used.
    702         **dargs:
    703                 Named arguments for the function.
    704         """
    705         if tag:
    706             name = tag
    707         else:
    708             name = function.__name__
    709 
    710         try:
    711             return self._rungroup(subdir=None, testname=name,
    712                                   function=function, timeout=None, **dargs)
    713         except (SystemExit, error.TestBaseException):
    714             raise
    715         # If there was a different exception, turn it into a TestError.
    716         # It will be caught by step_engine or _run_step_fn.
    717         except Exception, e:
    718             raise error.UnhandledTestError(e)
    719 
    720 
    721     def cpu_count(self):
    722         return utils.count_cpus()  # use total system count
    723 
    724 
    725     def start_reboot(self):
    726         self.record('START', None, 'reboot')
    727         self.record('GOOD', None, 'reboot.start')
    728 
    729 
    730     def _record_reboot_failure(self, subdir, operation, status,
    731                                running_id=None):
    732         self.record("ABORT", subdir, operation, status)
    733         if not running_id:
    734             running_id = utils.running_os_ident()
    735         kernel = {"kernel": running_id.split("::")[0]}
    736         self.record("END ABORT", subdir, 'reboot', optional_fields=kernel)
    737 
    738 
    739     def _check_post_reboot(self, subdir, running_id=None):
    740         """
    741         Function to perform post boot checks such as if the system configuration
    742         has changed across reboots (specifically, CPUs and partitions).
    743 
    744         @param subdir: The subdir to use in the job.record call.
    745         @param running_id: An optional running_id to include in the reboot
    746             failure log message
    747 
    748         @raise JobError: Raised if the current configuration does not match the
    749             pre-reboot configuration.
    750         """
    751         # check to see if any partitions have changed
    752         partition_list = partition_lib.get_partition_list(self,
    753                                                           exclude_swap=False)
    754         mount_info = partition_lib.get_mount_info(partition_list)
    755         old_mount_info = self._state.get('client', 'mount_info')
    756         if mount_info != old_mount_info:
    757             new_entries = mount_info - old_mount_info
    758             old_entries = old_mount_info - mount_info
    759             description = ("mounted partitions are different after reboot "
    760                            "(old entries: %s, new entries: %s)" %
    761                            (old_entries, new_entries))
    762             self._record_reboot_failure(subdir, "reboot.verify_config",
    763                                         description, running_id=running_id)
    764             raise error.JobError("Reboot failed: %s" % description)
    765 
    766         # check to see if any CPUs have changed
    767         cpu_count = utils.count_cpus()
    768         old_count = self._state.get('client', 'cpu_count')
    769         if cpu_count != old_count:
    770             description = ('Number of CPUs changed after reboot '
    771                            '(old count: %d, new count: %d)' %
    772                            (old_count, cpu_count))
    773             self._record_reboot_failure(subdir, 'reboot.verify_config',
    774                                         description, running_id=running_id)
    775             raise error.JobError('Reboot failed: %s' % description)
    776 
    777 
    778     def end_reboot(self, subdir, kernel, patches, running_id=None):
    779         self._check_post_reboot(subdir, running_id=running_id)
    780 
    781         # strip ::<timestamp> from the kernel version if present
    782         kernel = kernel.split("::")[0]
    783         kernel_info = {"kernel": kernel}
    784         for i, patch in enumerate(patches):
    785             kernel_info["patch%d" % i] = patch
    786         self.record("END GOOD", subdir, "reboot", optional_fields=kernel_info)
    787 
    788 
    789     def end_reboot_and_verify(self, expected_when, expected_id, subdir,
    790                               type='src', patches=[]):
    791         """ Check the passed kernel identifier against the command line
    792             and the running kernel, abort the job on missmatch. """
    793 
    794         logging.info("POST BOOT: checking booted kernel "
    795                      "mark=%d identity='%s' type='%s'",
    796                      expected_when, expected_id, type)
    797 
    798         running_id = utils.running_os_ident()
    799 
    800         cmdline = utils.read_one_line("/proc/cmdline")
    801 
    802         find_sum = re.compile(r'.*IDENT=(\d+)')
    803         m = find_sum.match(cmdline)
    804         cmdline_when = -1
    805         if m:
    806             cmdline_when = int(m.groups()[0])
    807 
    808         # We have all the facts, see if they indicate we
    809         # booted the requested kernel or not.
    810         bad = False
    811         if (type == 'src' and expected_id != running_id or
    812             type == 'rpm' and
    813             not running_id.startswith(expected_id + '::')):
    814             logging.error("Kernel identifier mismatch")
    815             bad = True
    816         if expected_when != cmdline_when:
    817             logging.error("Kernel command line mismatch")
    818             bad = True
    819 
    820         if bad:
    821             logging.error("   Expected Ident: " + expected_id)
    822             logging.error("    Running Ident: " + running_id)
    823             logging.error("    Expected Mark: %d", expected_when)
    824             logging.error("Command Line Mark: %d", cmdline_when)
    825             logging.error("     Command Line: " + cmdline)
    826 
    827             self._record_reboot_failure(subdir, "reboot.verify", "boot failure",
    828                                         running_id=running_id)
    829             raise error.JobError("Reboot returned with the wrong kernel")
    830 
    831         self.record('GOOD', subdir, 'reboot.verify',
    832                     utils.running_os_full_version())
    833         self.end_reboot(subdir, expected_id, patches, running_id=running_id)
    834 
    835 
    836     def partition(self, device, loop_size=0, mountpoint=None):
    837         """
    838         Work with a machine partition
    839 
    840             @param device: e.g. /dev/sda2, /dev/sdb1 etc...
    841             @param mountpoint: Specify a directory to mount to. If not specified
    842                                autotest tmp directory will be used.
    843             @param loop_size: Size of loopback device (in MB). Defaults to 0.
    844 
    845             @return: A L{client.bin.partition.partition} object
    846         """
    847 
    848         if not mountpoint:
    849             mountpoint = self.tmpdir
    850         return partition_lib.partition(self, device, loop_size, mountpoint)
    851 
    852     @utils.deprecated
    853     def filesystem(self, device, mountpoint=None, loop_size=0):
    854         """ Same as partition
    855 
    856         @deprecated: Use partition method instead
    857         """
    858         return self.partition(device, loop_size, mountpoint)
    859 
    860 
    861     def enable_external_logging(self):
    862         pass
    863 
    864 
    865     def disable_external_logging(self):
    866         pass
    867 
    868 
    869     def reboot_setup(self):
    870         # save the partition list and mount points, as well as the cpu count
    871         partition_list = partition_lib.get_partition_list(self,
    872                                                           exclude_swap=False)
    873         mount_info = partition_lib.get_mount_info(partition_list)
    874         self._state.set('client', 'mount_info', mount_info)
    875         self._state.set('client', 'cpu_count', utils.count_cpus())
    876 
    877 
    878     def reboot(self, tag=LAST_BOOT_TAG):
    879         if tag == LAST_BOOT_TAG:
    880             tag = self.last_boot_tag
    881         else:
    882             self.last_boot_tag = tag
    883 
    884         self.reboot_setup()
    885         self.harness.run_reboot()
    886         default = self.config_get('boot.set_default')
    887         if default:
    888             self.bootloader.set_default(tag)
    889         else:
    890             self.bootloader.boot_once(tag)
    891 
    892         # HACK: using this as a module sometimes hangs shutdown, so if it's
    893         # installed unload it first
    894         utils.system("modprobe -r netconsole", ignore_status=True)
    895 
    896         # sync first, so that a sync during shutdown doesn't time out
    897         utils.system("sync; sync", ignore_status=True)
    898 
    899         utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")
    900         self.quit()
    901 
    902 
    903     def noop(self, text):
    904         logging.info("job: noop: " + text)
    905 
    906 
    907     @_run_test_complete_on_exit
    908     def parallel(self, *tasklist):
    909         """Run tasks in parallel"""
    910 
    911         pids = []
    912         old_log_filename = self._logger.global_filename
    913         for i, task in enumerate(tasklist):
    914             assert isinstance(task, (tuple, list))
    915             self._logger.global_filename = old_log_filename + (".%d" % i)
    916             def task_func():
    917                 # stub out _record_indent with a process-local one
    918                 base_record_indent = self._record_indent
    919                 proc_local = self._job_state.property_factory(
    920                     '_state', '_record_indent.%d' % os.getpid(),
    921                     base_record_indent, namespace='client')
    922                 self.__class__._record_indent = proc_local
    923                 task[0](*task[1:])
    924             pids.append(parallel.fork_start(self.resultdir, task_func))
    925 
    926         old_log_path = os.path.join(self.resultdir, old_log_filename)
    927         old_log = open(old_log_path, "a")
    928         exceptions = []
    929         for i, pid in enumerate(pids):
    930             # wait for the task to finish
    931             try:
    932                 parallel.fork_waitfor(self.resultdir, pid)
    933             except Exception, e:
    934                 exceptions.append(e)
    935             # copy the logs from the subtask into the main log
    936             new_log_path = old_log_path + (".%d" % i)
    937             if os.path.exists(new_log_path):
    938                 new_log = open(new_log_path)
    939                 old_log.write(new_log.read())
    940                 new_log.close()
    941                 old_log.flush()
    942                 os.remove(new_log_path)
    943         old_log.close()
    944 
    945         self._logger.global_filename = old_log_filename
    946 
    947         # handle any exceptions raised by the parallel tasks
    948         if exceptions:
    949             msg = "%d task(s) failed in job.parallel" % len(exceptions)
    950             raise error.JobError(msg)
    951 
    952 
    953     def quit(self):
    954         # XXX: should have a better name.
    955         self.harness.run_pause()
    956         raise error.JobContinue("more to come")
    957 
    958 
    959     def complete(self, status):
    960         """Write pending TAP reports, clean up, and exit"""
    961         # write out TAP reports
    962         if self._tap.do_tap_report:
    963             self._tap.write()
    964             self._tap._write_tap_archive()
    965 
    966         # write out a job HTML report
    967         try:
    968             html_report.create_report(self.resultdir)
    969         except Exception, e:
    970             logging.error("Error writing job HTML report: %s", e)
    971 
    972         # We are about to exit 'complete' so clean up the control file.
    973         dest = os.path.join(self.resultdir, os.path.basename(self._state_file))
    974         shutil.move(self._state_file, dest)
    975 
    976         self.harness.run_complete()
    977         self.disable_external_logging()
    978         sys.exit(status)
    979 
    980 
    981     def _load_state(self):
    982         # grab any initial state and set up $CONTROL.state as the backing file
    983         init_state_file = self.control + '.init.state'
    984         self._state_file = self.control + '.state'
    985         if os.path.exists(init_state_file):
    986             shutil.move(init_state_file, self._state_file)
    987         self._state.set_backing_file(self._state_file)
    988 
    989         # initialize the state engine, if necessary
    990         has_steps = self._state.has('client', 'steps')
    991         if not self._is_continuation and has_steps:
    992             raise RuntimeError('Loaded state can only contain client.steps if '
    993                                'this is a continuation')
    994 
    995         if not has_steps:
    996             logging.debug('Initializing the state engine')
    997             self._state.set('client', 'steps', [])
    998 
    999 
   1000     def handle_persistent_option(self, options, option_name):
   1001         """
   1002         Select option from command line or persistent state.
   1003         Store selected option to allow standalone client to continue
   1004         after reboot with previously selected options.
   1005         Priority:
   1006         1. explicitly specified via command line
   1007         2. stored in state file (if continuing job '-c')
   1008         3. default == None
   1009         """
   1010         option = None
   1011         cmd_line_option = getattr(options, option_name)
   1012         if cmd_line_option:
   1013             option = cmd_line_option
   1014             self._state.set('client', option_name, option)
   1015         else:
   1016             stored_option = self._state.get('client', option_name, None)
   1017             if stored_option:
   1018                 option = stored_option
   1019         logging.debug('Persistent option %s now set to %s', option_name, option)
   1020         return option
   1021 
   1022 
   1023     def __create_step_tuple(self, fn, args, dargs):
   1024         # Legacy code passes in an array where the first arg is
   1025         # the function or its name.
   1026         if isinstance(fn, list):
   1027             assert(len(args) == 0)
   1028             assert(len(dargs) == 0)
   1029             args = fn[1:]
   1030             fn = fn[0]
   1031         # Pickling actual functions is hairy, thus we have to call
   1032         # them by name.  Unfortunately, this means only functions
   1033         # defined globally can be used as a next step.
   1034         if callable(fn):
   1035             fn = fn.__name__
   1036         if not isinstance(fn, types.StringTypes):
   1037             raise StepError("Next steps must be functions or "
   1038                             "strings containing the function name")
   1039         ancestry = copy.copy(self._current_step_ancestry)
   1040         return (ancestry, fn, args, dargs)
   1041 
   1042 
   1043     def next_step_append(self, fn, *args, **dargs):
   1044         """Define the next step and place it at the end"""
   1045         steps = self._state.get('client', 'steps')
   1046         steps.append(self.__create_step_tuple(fn, args, dargs))
   1047         self._state.set('client', 'steps', steps)
   1048 
   1049 
   1050     def next_step(self, fn, *args, **dargs):
   1051         """Create a new step and place it after any steps added
   1052         while running the current step but before any steps added in
   1053         previous steps"""
   1054         steps = self._state.get('client', 'steps')
   1055         steps.insert(self._next_step_index,
   1056                      self.__create_step_tuple(fn, args, dargs))
   1057         self._next_step_index += 1
   1058         self._state.set('client', 'steps', steps)
   1059 
   1060 
   1061     def next_step_prepend(self, fn, *args, **dargs):
   1062         """Insert a new step, executing first"""
   1063         steps = self._state.get('client', 'steps')
   1064         steps.insert(0, self.__create_step_tuple(fn, args, dargs))
   1065         self._next_step_index += 1
   1066         self._state.set('client', 'steps', steps)
   1067 
   1068 
   1069 
   1070     def _run_step_fn(self, local_vars, fn, args, dargs):
   1071         """Run a (step) function within the given context"""
   1072 
   1073         local_vars['__args'] = args
   1074         local_vars['__dargs'] = dargs
   1075         try:
   1076             exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)
   1077             return local_vars['__ret']
   1078         except SystemExit:
   1079             raise  # Send error.JobContinue and JobComplete on up to runjob.
   1080         except error.TestNAError, detail:
   1081             self.record(detail.exit_status, None, fn, str(detail))
   1082         except Exception, detail:
   1083             raise error.UnhandledJobError(detail)
   1084 
   1085 
   1086     def _create_frame(self, global_vars, ancestry, fn_name):
   1087         """Set up the environment like it would have been when this
   1088         function was first defined.
   1089 
   1090         Child step engine 'implementations' must have 'return locals()'
   1091         at end end of their steps.  Because of this, we can call the
   1092         parent function and get back all child functions (i.e. those
   1093         defined within it).
   1094 
   1095         Unfortunately, the call stack of the function calling
   1096         job.next_step might have been deeper than the function it
   1097         added.  In order to make sure that the environment is what it
   1098         should be, we need to then pop off the frames we built until
   1099         we find the frame where the function was first defined."""
   1100 
   1101         # The copies ensure that the parent frames are not modified
   1102         # while building child frames.  This matters if we then
   1103         # pop some frames in the next part of this function.
   1104         current_frame = copy.copy(global_vars)
   1105         frames = [current_frame]
   1106         for steps_fn_name in ancestry:
   1107             ret = self._run_step_fn(current_frame, steps_fn_name, [], {})
   1108             current_frame = copy.copy(ret)
   1109             frames.append(current_frame)
   1110 
   1111         # Walk up the stack frames until we find the place fn_name was defined.
   1112         while len(frames) > 2:
   1113             if fn_name not in frames[-2]:
   1114                 break
   1115             if frames[-2][fn_name] != frames[-1][fn_name]:
   1116                 break
   1117             frames.pop()
   1118             ancestry.pop()
   1119 
   1120         return (frames[-1], ancestry)
   1121 
   1122 
   1123     def _add_step_init(self, local_vars, current_function):
   1124         """If the function returned a dictionary that includes a
   1125         function named 'step_init', prepend it to our list of steps.
   1126         This will only get run the first time a function with a nested
   1127         use of the step engine is run."""
   1128 
   1129         if (isinstance(local_vars, dict) and
   1130             'step_init' in local_vars and
   1131             callable(local_vars['step_init'])):
   1132             # The init step is a child of the function
   1133             # we were just running.
   1134             self._current_step_ancestry.append(current_function)
   1135             self.next_step_prepend('step_init')
   1136 
   1137 
   1138     def step_engine(self):
   1139         """The multi-run engine used when the control file defines step_init.
   1140 
   1141         Does the next step.
   1142         """
   1143 
   1144         # Set up the environment and then interpret the control file.
   1145         # Some control files will have code outside of functions,
   1146         # which means we need to have our state engine initialized
   1147         # before reading in the file.
   1148         global_control_vars = {'job': self,
   1149                                'args': self.args}
   1150         exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
   1151         try:
   1152             execfile(self.control, global_control_vars, global_control_vars)
   1153         except error.TestNAError, detail:
   1154             self.record(detail.exit_status, None, self.control, str(detail))
   1155         except SystemExit:
   1156             raise  # Send error.JobContinue and JobComplete on up to runjob.
   1157         except Exception, detail:
   1158             # Syntax errors or other general Python exceptions coming out of
   1159             # the top level of the control file itself go through here.
   1160             raise error.UnhandledJobError(detail)
   1161 
   1162         # If we loaded in a mid-job state file, then we presumably
   1163         # know what steps we have yet to run.
   1164         if not self._is_continuation:
   1165             if 'step_init' in global_control_vars:
   1166                 self.next_step(global_control_vars['step_init'])
   1167         else:
   1168             # if last job failed due to unexpected reboot, record it as fail
   1169             # so harness gets called
   1170             last_job = self._state.get('client', 'unexpected_reboot', None)
   1171             if last_job:
   1172                 subdir, testname = last_job
   1173                 self.record('FAIL', subdir, testname, 'unexpected reboot')
   1174                 self.record('END FAIL', subdir, testname)
   1175 
   1176         # Iterate through the steps.  If we reboot, we'll simply
   1177         # continue iterating on the next step.
   1178         while len(self._state.get('client', 'steps')) > 0:
   1179             steps = self._state.get('client', 'steps')
   1180             (ancestry, fn_name, args, dargs) = steps.pop(0)
   1181             self._state.set('client', 'steps', steps)
   1182 
   1183             self._next_step_index = 0
   1184             ret = self._create_frame(global_control_vars, ancestry, fn_name)
   1185             local_vars, self._current_step_ancestry = ret
   1186             local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
   1187             self._add_step_init(local_vars, fn_name)
   1188 
   1189 
   1190     def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
   1191         self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
   1192                                    on_every_test)
   1193 
   1194 
   1195     def add_sysinfo_logfile(self, file, on_every_test=False):
   1196         self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)
   1197 
   1198 
   1199     def _add_sysinfo_loggable(self, loggable, on_every_test):
   1200         if on_every_test:
   1201             self.sysinfo.test_loggables.add(loggable)
   1202         else:
   1203             self.sysinfo.boot_loggables.add(loggable)
   1204         self._save_sysinfo_state()
   1205 
   1206 
   1207     def _load_sysinfo_state(self):
   1208         state = self._state.get('client', 'sysinfo', None)
   1209         if state:
   1210             self.sysinfo.deserialize(state)
   1211 
   1212 
   1213     def _save_sysinfo_state(self):
   1214         state = self.sysinfo.serialize()
   1215         self._state.set('client', 'sysinfo', state)
   1216 
   1217 
   1218 class disk_usage_monitor:
   1219     def __init__(self, logging_func, device, max_mb_per_hour):
   1220         self.func = logging_func
   1221         self.device = device
   1222         self.max_mb_per_hour = max_mb_per_hour
   1223 
   1224 
   1225     def start(self):
   1226         self.initial_space = utils.freespace(self.device)
   1227         self.start_time = time.time()
   1228 
   1229 
   1230     def stop(self):
   1231         # if no maximum usage rate was set, we don't need to
   1232         # generate any warnings
   1233         if not self.max_mb_per_hour:
   1234             return
   1235 
   1236         final_space = utils.freespace(self.device)
   1237         used_space = self.initial_space - final_space
   1238         stop_time = time.time()
   1239         total_time = stop_time - self.start_time
   1240         # round up the time to one minute, to keep extremely short
   1241         # tests from generating false positives due to short, badly
   1242         # timed bursts of activity
   1243         total_time = max(total_time, 60.0)
   1244 
   1245         # determine the usage rate
   1246         bytes_per_sec = used_space / total_time
   1247         mb_per_sec = bytes_per_sec / 1024**2
   1248         mb_per_hour = mb_per_sec * 60 * 60
   1249 
   1250         if mb_per_hour > self.max_mb_per_hour:
   1251             msg = ("disk space on %s was consumed at a rate of %.2f MB/hour")
   1252             msg %= (self.device, mb_per_hour)
   1253             self.func(msg)
   1254 
   1255 
   1256     @classmethod
   1257     def watch(cls, *monitor_args, **monitor_dargs):
   1258         """ Generic decorator to wrap a function call with the
   1259         standard create-monitor -> start -> call -> stop idiom."""
   1260         def decorator(func):
   1261             def watched_func(*args, **dargs):
   1262                 monitor = cls(*monitor_args, **monitor_dargs)
   1263                 monitor.start()
   1264                 try:
   1265                     func(*args, **dargs)
   1266                 finally:
   1267                     monitor.stop()
   1268             return watched_func
   1269         return decorator
   1270 
   1271 
   1272 def runjob(control, drop_caches, options):
   1273     """
   1274     Run a job using the given control file.
   1275 
   1276     This is the main interface to this module.
   1277 
   1278     @see base_job.__init__ for parameter info.
   1279     """
   1280     control = os.path.abspath(control)
   1281     state = control + '.state'
   1282     # Ensure state file is cleaned up before the job starts to run if autotest
   1283     # is not running with the --continue flag
   1284     if not options.cont and os.path.isfile(state):
   1285         logging.debug('Cleaning up previously found state file')
   1286         os.remove(state)
   1287 
   1288     # instantiate the job object ready for the control file.
   1289     myjob = None
   1290     try:
   1291         # Check that the control file is valid
   1292         if not os.path.exists(control):
   1293             raise error.JobError(control + ": control file not found")
   1294 
   1295         # When continuing, the job is complete when there is no
   1296         # state file, ensure we don't try and continue.
   1297         if options.cont and not os.path.exists(state):
   1298             raise error.JobComplete("all done")
   1299 
   1300         myjob = job(control=control, drop_caches=drop_caches, options=options)
   1301 
   1302         # Load in the users control file, may do any one of:
   1303         #  1) execute in toto
   1304         #  2) define steps, and select the first via next_step()
   1305         myjob.step_engine()
   1306 
   1307     except error.JobContinue:
   1308         sys.exit(5)
   1309 
   1310     except error.JobComplete:
   1311         sys.exit(1)
   1312 
   1313     except error.JobError, instance:
   1314         logging.error("JOB ERROR: " + str(instance))
   1315         if myjob:
   1316             command = None
   1317             if len(instance.args) > 1:
   1318                 command = instance.args[1]
   1319                 myjob.record('ABORT', None, command, str(instance))
   1320             myjob.record('END ABORT', None, None, str(instance))
   1321             assert myjob._record_indent == 0
   1322             myjob.complete(1)
   1323         else:
   1324             sys.exit(1)
   1325 
   1326     except Exception, e:
   1327         # NOTE: job._run_step_fn and job.step_engine will turn things into
   1328         # a JobError for us.  If we get here, its likely an autotest bug.
   1329         msg = str(e) + '\n' + traceback.format_exc()
   1330         logging.critical("JOB ERROR (autotest bug?): " + msg)
   1331         if myjob:
   1332             myjob.record('END ABORT', None, None, msg)
   1333             assert myjob._record_indent == 0
   1334             myjob.complete(1)
   1335         else:
   1336             sys.exit(1)
   1337 
   1338     # If we get here, then we assume the job is complete and good.
   1339     myjob.record('END GOOD', None, None)
   1340     assert myjob._record_indent == 0
   1341 
   1342     myjob.complete(0)
   1343 
   1344 
   1345 site_job = utils.import_site_class(
   1346     __file__, "autotest_lib.client.bin.site_job", "site_job", base_client_job)
   1347 
   1348 class job(site_job):
   1349     pass
   1350