Home | History | Annotate | Download | only in common_lib
      1 # Shell class for a test, inherited by all individual tests
      2 #
      3 # Methods:
      4 #       __init__        initialise
      5 #       initialize      run once for each job
      6 #       setup           run once for each new version of the test installed
      7 #       run             run the test (wrapped by job.run_test())
      8 #
      9 # Data:
     10 #       job             backreference to the job this test instance is part of
     11 #       outputdir       eg. results/<job>/<testname.tag>
     12 #       resultsdir      eg. results/<job>/<testname.tag>/results
     13 #       profdir         eg. results/<job>/<testname.tag>/profiling
     14 #       debugdir        eg. results/<job>/<testname.tag>/debug
     15 #       bindir          eg. tests/<test>
     16 #       src             eg. tests/<test>/src
     17 #       tmpdir          eg. tmp/<tempname>_<testname.tag>
     18 
     19 #pylint: disable-msg=C0111
     20 
     21 import fcntl, json, os, re, sys, shutil, stat, tempfile, time, traceback
     22 import logging
     23 
     24 from autotest_lib.client.bin import utils
     25 from autotest_lib.client.common_lib import error
     26 
     27 
     28 class base_test(object):
     29     preserve_srcdir = False
     30     network_destabilizing = False
     31 
     32     def __init__(self, job, bindir, outputdir):
     33         self.job = job
     34         self.pkgmgr = job.pkgmgr
     35         self.autodir = job.autodir
     36         self.outputdir = outputdir
     37         self.tagged_testname = os.path.basename(self.outputdir)
     38         self.resultsdir = os.path.join(self.outputdir, 'results')
     39         os.mkdir(self.resultsdir)
     40         self.profdir = os.path.join(self.outputdir, 'profiling')
     41         os.mkdir(self.profdir)
     42         self.debugdir = os.path.join(self.outputdir, 'debug')
     43         os.mkdir(self.debugdir)
     44         # TODO(ericli): figure out how autotest crash handler work with cros
     45         # Once this is re-enabled import getpass. crosbug.com/31232
     46         # crash handler, we should restore it in near term.
     47         # if getpass.getuser() == 'root':
     48         #     self.configure_crash_handler()
     49         # else:
     50         self.crash_handling_enabled = False
     51         self.bindir = bindir
     52         self.srcdir = os.path.join(self.bindir, 'src')
     53         self.tmpdir = tempfile.mkdtemp("_" + self.tagged_testname,
     54                                        dir=job.tmpdir)
     55         self._keyvals = []
     56         self._new_keyval = False
     57         self.failed_constraints = []
     58         self.iteration = 0
     59         self.before_iteration_hooks = []
     60         self.after_iteration_hooks = []
     61 
     62         # Flag to indicate if the test has succeeded or failed.
     63         self.success = False
     64 
     65 
     66     def configure_crash_handler(self):
     67         pass
     68 
     69 
     70     def crash_handler_report(self):
     71         pass
     72 
     73 
     74     def assert_(self, expr, msg='Assertion failed.'):
     75         if not expr:
     76             raise error.TestError(msg)
     77 
     78 
     79     def write_test_keyval(self, attr_dict):
     80         utils.write_keyval(self.outputdir, attr_dict,
     81                            tap_report=self.job._tap)
     82 
     83 
     84     @staticmethod
     85     def _append_type_to_keys(dictionary, typename):
     86         new_dict = {}
     87         for key, value in dictionary.iteritems():
     88             new_key = "%s{%s}" % (key, typename)
     89             new_dict[new_key] = value
     90         return new_dict
     91 
     92 
     93     def output_perf_value(self, description, value, units=None,
     94                           higher_is_better=None, graph=None, replacement='_'):
     95         """
     96         Records a measured performance value in an output file.
     97 
     98         The output file will subsequently be parsed by the TKO parser to have
     99         the information inserted into the results database.
    100 
    101         @param description: A string describing the measured perf value. Must
    102                 be maximum length 256, and may only contain letters, numbers,
    103                 periods, dashes, and underscores.  For example:
    104                 "page_load_time", "scrolling-frame-rate".
    105         @param value: A number representing the measured perf value, or a list
    106                 of measured values if a test takes multiple measurements.
    107                 Measured perf values can be either ints or floats.
    108         @param units: A string describing the units associated with the
    109                 measured perf value. Must be maximum length 32, and may only
    110                 contain letters, numbers, periods, dashes, and underscores.
    111                 For example: "msec", "fps", "score", "runs_per_second".
    112         @param higher_is_better: A boolean indicating whether or not a "higher"
    113                 measured perf value is considered to be better. If False, it is
    114                 assumed that a "lower" measured value is considered to be
    115                 better. This impacts dashboard plotting and email notification.
    116                 Pure autotests are expected to specify either True or False!
    117                 This value can be set to "None" to indicate that the perf
    118                 dashboard should apply the rules encoded via Chromium
    119                 unit-info.json. This is only used for tracking Chromium based
    120                 tests (in particular telemetry).
    121         @param graph: A string indicating the name of the graph on which
    122                 the perf value will be subsequently displayed on the chrome perf
    123                 dashboard. This allows multiple metrics be grouped together on
    124                 the same graphs. Defaults to None, indicating that the perf
    125                 value should be displayed individually on a separate graph.
    126         @param replacement: string to replace illegal characters in
    127                 |description| and |units| with.
    128         """
    129         if len(description) > 256:
    130             raise ValueError('The description must be at most 256 characters.')
    131         if units and len(units) > 32:
    132             raise ValueError('The units must be at most 32 characters.')
    133 
    134         # If |replacement| is legal replace illegal characters with it.
    135         string_regex = re.compile(r'[^-\.\w]')
    136         if replacement is None or re.search(string_regex, replacement):
    137             raise ValueError('Invalid replacement string to mask illegal '
    138                              'characters. May only contain letters, numbers, '
    139                              'periods, dashes, and underscores. '
    140                              'replacement: %s' % replacement)
    141         description = re.sub(string_regex, replacement, description)
    142         units = re.sub(string_regex, replacement, units) if units else None
    143 
    144         entry = {
    145             'description': description,
    146             'value': value,
    147             'units': units,
    148             'higher_is_better': higher_is_better,
    149             'graph': graph
    150         }
    151 
    152         output_path = os.path.join(self.resultsdir, 'perf_measurements')
    153         with open(output_path, 'a') as fp:
    154             fp.write(json.dumps(entry, sort_keys=True) + '\n')
    155 
    156 
    157     def write_perf_keyval(self, perf_dict):
    158         self.write_iteration_keyval({}, perf_dict,
    159                                     tap_report=self.job._tap)
    160 
    161 
    162     def write_attr_keyval(self, attr_dict):
    163         self.write_iteration_keyval(attr_dict, {},
    164                                     tap_report=self.job._tap)
    165 
    166 
    167     def write_iteration_keyval(self, attr_dict, perf_dict, tap_report=None):
    168         # append the dictionaries before they have the {perf} and {attr} added
    169         self._keyvals.append({'attr':attr_dict, 'perf':perf_dict})
    170         self._new_keyval = True
    171 
    172         if attr_dict:
    173             attr_dict = self._append_type_to_keys(attr_dict, "attr")
    174             utils.write_keyval(self.resultsdir, attr_dict, type_tag="attr",
    175                                tap_report=tap_report)
    176 
    177         if perf_dict:
    178             perf_dict = self._append_type_to_keys(perf_dict, "perf")
    179             utils.write_keyval(self.resultsdir, perf_dict, type_tag="perf",
    180                                tap_report=tap_report)
    181 
    182         keyval_path = os.path.join(self.resultsdir, "keyval")
    183         print >> open(keyval_path, "a"), ""
    184 
    185 
    186     def analyze_perf_constraints(self, constraints):
    187         if not self._new_keyval:
    188             return
    189 
    190         # create a dict from the keyvals suitable as an environment for eval
    191         keyval_env = self._keyvals[-1]['perf'].copy()
    192         keyval_env['__builtins__'] = None
    193         self._new_keyval = False
    194         failures = []
    195 
    196         # evaluate each constraint using the current keyvals
    197         for constraint in constraints:
    198             logging.info('___________________ constraint = %s', constraint)
    199             logging.info('___________________ keyvals = %s', keyval_env)
    200 
    201             try:
    202                 if not eval(constraint, keyval_env):
    203                     failures.append('%s: constraint was not met' % constraint)
    204             except:
    205                 failures.append('could not evaluate constraint: %s'
    206                                 % constraint)
    207 
    208         # keep track of the errors for each iteration
    209         self.failed_constraints.append(failures)
    210 
    211 
    212     def process_failed_constraints(self):
    213         msg = ''
    214         for i, failures in enumerate(self.failed_constraints):
    215             if failures:
    216                 msg += 'iteration %d:%s  ' % (i, ','.join(failures))
    217 
    218         if msg:
    219             raise error.TestFail(msg)
    220 
    221 
    222     def register_before_iteration_hook(self, iteration_hook):
    223         """
    224         This is how we expect test writers to register a before_iteration_hook.
    225         This adds the method to the list of hooks which are executed
    226         before each iteration.
    227 
    228         @param iteration_hook: Method to run before each iteration. A valid
    229                                hook accepts a single argument which is the
    230                                test object.
    231         """
    232         self.before_iteration_hooks.append(iteration_hook)
    233 
    234 
    235     def register_after_iteration_hook(self, iteration_hook):
    236         """
    237         This is how we expect test writers to register an after_iteration_hook.
    238         This adds the method to the list of hooks which are executed
    239         after each iteration. Hooks are executed starting with the most-
    240         recently registered, in stack fashion.
    241 
    242         @param iteration_hook: Method to run after each iteration. A valid
    243                                hook accepts a single argument which is the
    244                                test object.
    245         """
    246         self.after_iteration_hooks.append(iteration_hook)
    247 
    248 
    249     def initialize(self):
    250         pass
    251 
    252 
    253     def setup(self):
    254         pass
    255 
    256 
    257     def warmup(self, *args, **dargs):
    258         pass
    259 
    260 
    261     def drop_caches_between_iterations(self):
    262         if self.job.drop_caches_between_iterations:
    263             utils.drop_caches()
    264 
    265 
    266     def _call_run_once_with_retry(self, constraints, profile_only,
    267                                   postprocess_profiled_run, args, dargs):
    268         """Thin wrapper around _call_run_once that retries unsuccessful tests.
    269 
    270         If the job object's attribute test_retry is > 0 retry any tests that
    271         ran unsuccessfully X times.
    272         *Note this does not competely re-initialize the test, it only
    273             re-executes code once all the initial job set up (packages,
    274             sysinfo, etc) is complete.
    275         """
    276         if self.job.test_retry != 0:
    277             logging.info('Test will be retried a maximum of %d times',
    278                          self.job.test_retry)
    279 
    280         max_runs = self.job.test_retry
    281         for retry_run in xrange(0, max_runs+1):
    282             try:
    283                 self._call_run_once(constraints, profile_only,
    284                                     postprocess_profiled_run, args, dargs)
    285                 break
    286             except error.TestFailRetry as err:
    287                 if retry_run == max_runs:
    288                     raise
    289                 self.job.record('INFO', None, None, 'Run %s failed with %s' % (
    290                         retry_run, err))
    291         if retry_run > 0:
    292             self.write_test_keyval({'test_retries_before_success': retry_run})
    293 
    294 
    295     def _call_run_once(self, constraints, profile_only,
    296                        postprocess_profiled_run, args, dargs):
    297         self.drop_caches_between_iterations()
    298         # execute iteration hooks
    299         for hook in self.before_iteration_hooks:
    300             hook(self)
    301 
    302         try:
    303             if profile_only:
    304                 if not self.job.profilers.present():
    305                     self.job.record('WARN', None, None,
    306                                     'No profilers have been added but '
    307                                     'profile_only is set - nothing '
    308                                     'will be run')
    309                 self.run_once_profiling(postprocess_profiled_run,
    310                                         *args, **dargs)
    311             else:
    312                 self.before_run_once()
    313                 self.run_once(*args, **dargs)
    314                 self.after_run_once()
    315 
    316             self.postprocess_iteration()
    317             self.analyze_perf_constraints(constraints)
    318         # Catch and re-raise to let after_iteration_hooks see the exception.
    319         except:
    320             raise
    321         finally:
    322             for hook in reversed(self.after_iteration_hooks):
    323                 hook(self)
    324 
    325 
    326     def execute(self, iterations=None, test_length=None, profile_only=None,
    327                 _get_time=time.time, postprocess_profiled_run=None,
    328                 constraints=(), *args, **dargs):
    329         """
    330         This is the basic execute method for the tests inherited from base_test.
    331         If you want to implement a benchmark test, it's better to implement
    332         the run_once function, to cope with the profiling infrastructure. For
    333         other tests, you can just override the default implementation.
    334 
    335         @param test_length: The minimum test length in seconds. We'll run the
    336             run_once function for a number of times large enough to cover the
    337             minimum test length.
    338 
    339         @param iterations: A number of iterations that we'll run the run_once
    340             function. This parameter is incompatible with test_length and will
    341             be silently ignored if you specify both.
    342 
    343         @param profile_only: If true run X iterations with profilers enabled.
    344             If false run X iterations and one with profiling if profiles are
    345             enabled. If None, default to the value of job.default_profile_only.
    346 
    347         @param _get_time: [time.time] Used for unit test time injection.
    348 
    349         @param postprocess_profiled_run: Run the postprocessing for the
    350             profiled run.
    351         """
    352 
    353         # For our special class of tests, the benchmarks, we don't want
    354         # profilers to run during the test iterations. Let's reserve only
    355         # the last iteration for profiling, if needed. So let's stop
    356         # all profilers if they are present and active.
    357         profilers = self.job.profilers
    358         if profilers.active():
    359             profilers.stop(self)
    360         if profile_only is None:
    361             profile_only = self.job.default_profile_only
    362         # If the user called this test in an odd way (specified both iterations
    363         # and test_length), let's warn them.
    364         if iterations and test_length:
    365             logging.debug('Iterations parameter ignored (timed execution)')
    366         if test_length:
    367             test_start = _get_time()
    368             time_elapsed = 0
    369             timed_counter = 0
    370             logging.debug('Test started. Specified %d s as the minimum test '
    371                           'length', test_length)
    372             while time_elapsed < test_length:
    373                 timed_counter = timed_counter + 1
    374                 if time_elapsed == 0:
    375                     logging.debug('Executing iteration %d', timed_counter)
    376                 elif time_elapsed > 0:
    377                     logging.debug('Executing iteration %d, time_elapsed %d s',
    378                                   timed_counter, time_elapsed)
    379                 self._call_run_once_with_retry(constraints, profile_only,
    380                                                postprocess_profiled_run, args,
    381                                                dargs)
    382                 test_iteration_finish = _get_time()
    383                 time_elapsed = test_iteration_finish - test_start
    384             logging.debug('Test finished after %d iterations, '
    385                           'time elapsed: %d s', timed_counter, time_elapsed)
    386         else:
    387             if iterations is None:
    388                 iterations = 1
    389             if iterations > 1:
    390                 logging.debug('Test started. Specified %d iterations',
    391                               iterations)
    392             for self.iteration in xrange(1, iterations + 1):
    393                 if iterations > 1:
    394                     logging.debug('Executing iteration %d of %d',
    395                                   self.iteration, iterations)
    396                 self._call_run_once_with_retry(constraints, profile_only,
    397                                                postprocess_profiled_run, args,
    398                                                dargs)
    399 
    400         if not profile_only:
    401             self.iteration += 1
    402             self.run_once_profiling(postprocess_profiled_run, *args, **dargs)
    403 
    404         # Do any postprocessing, normally extracting performance keyvals, etc
    405         self.postprocess()
    406         self.process_failed_constraints()
    407 
    408 
    409     def run_once_profiling(self, postprocess_profiled_run, *args, **dargs):
    410         profilers = self.job.profilers
    411         # Do a profiling run if necessary
    412         if profilers.present():
    413             self.drop_caches_between_iterations()
    414             profilers.before_start(self)
    415 
    416             self.before_run_once()
    417             profilers.start(self)
    418             logging.debug('Profilers present. Profiling run started')
    419 
    420             try:
    421                 self.run_once(*args, **dargs)
    422 
    423                 # Priority to the run_once() argument over the attribute.
    424                 postprocess_attribute = getattr(self,
    425                                                 'postprocess_profiled_run',
    426                                                 False)
    427 
    428                 if (postprocess_profiled_run or
    429                     (postprocess_profiled_run is None and
    430                      postprocess_attribute)):
    431                     self.postprocess_iteration()
    432 
    433             finally:
    434                 profilers.stop(self)
    435                 profilers.report(self)
    436 
    437             self.after_run_once()
    438 
    439 
    440     def postprocess(self):
    441         pass
    442 
    443 
    444     def postprocess_iteration(self):
    445         pass
    446 
    447 
    448     def cleanup(self):
    449         pass
    450 
    451 
    452     def before_run_once(self):
    453         """
    454         Override in tests that need it, will be called before any run_once()
    455         call including the profiling run (when it's called before starting
    456         the profilers).
    457         """
    458         pass
    459 
    460 
    461     def after_run_once(self):
    462         """
    463         Called after every run_once (including from a profiled run when it's
    464         called after stopping the profilers).
    465         """
    466         pass
    467 
    468 
    469     @staticmethod
    470     def _make_writable_to_others(directory):
    471         mode = os.stat(directory).st_mode
    472         mode = mode | stat.S_IROTH | stat.S_IWOTH | stat.S_IXOTH
    473         os.chmod(directory, mode)
    474 
    475 
    476     def _exec(self, args, dargs):
    477         self.job.logging.tee_redirect_debug_dir(self.debugdir,
    478                                                 log_name=self.tagged_testname)
    479         try:
    480             if self.network_destabilizing:
    481                 self.job.disable_warnings("NETWORK")
    482 
    483             # write out the test attributes into a keyval
    484             dargs   = dargs.copy()
    485             run_cleanup = dargs.pop('run_cleanup', self.job.run_test_cleanup)
    486             keyvals = dargs.pop('test_attributes', {}).copy()
    487             keyvals['version'] = self.version
    488             for i, arg in enumerate(args):
    489                 keyvals['param-%d' % i] = repr(arg)
    490             for name, arg in dargs.iteritems():
    491                 keyvals['param-%s' % name] = repr(arg)
    492             self.write_test_keyval(keyvals)
    493 
    494             _validate_args(args, dargs, self.initialize, self.setup,
    495                            self.execute, self.cleanup)
    496 
    497             try:
    498                 # Make resultsdir and tmpdir accessible to everyone. We may
    499                 # output data to these directories as others, e.g., chronos.
    500                 self._make_writable_to_others(self.tmpdir)
    501                 self._make_writable_to_others(self.resultsdir)
    502 
    503                 # Initialize:
    504                 _cherry_pick_call(self.initialize, *args, **dargs)
    505 
    506                 lockfile = open(os.path.join(self.job.tmpdir, '.testlock'), 'w')
    507                 try:
    508                     fcntl.flock(lockfile, fcntl.LOCK_EX)
    509                     # Setup: (compile and install the test, if needed)
    510                     p_args, p_dargs = _cherry_pick_args(self.setup, args, dargs)
    511                     utils.update_version(self.srcdir, self.preserve_srcdir,
    512                                          self.version, self.setup,
    513                                          *p_args, **p_dargs)
    514                 finally:
    515                     fcntl.flock(lockfile, fcntl.LOCK_UN)
    516                     lockfile.close()
    517 
    518                 # Execute:
    519                 os.chdir(self.outputdir)
    520 
    521                 # call self.warmup cherry picking the arguments it accepts and
    522                 # translate exceptions if needed
    523                 _call_test_function(_cherry_pick_call, self.warmup,
    524                                     *args, **dargs)
    525 
    526                 if hasattr(self, 'run_once'):
    527                     p_args, p_dargs = _cherry_pick_args(self.run_once,
    528                                                         args, dargs)
    529                     # pull in any non-* and non-** args from self.execute
    530                     for param in _get_nonstar_args(self.execute):
    531                         if param in dargs:
    532                             p_dargs[param] = dargs[param]
    533                 else:
    534                     p_args, p_dargs = _cherry_pick_args(self.execute,
    535                                                         args, dargs)
    536 
    537                 _call_test_function(self.execute, *p_args, **p_dargs)
    538             except Exception:
    539                 # Save the exception while we run our cleanup() before
    540                 # reraising it, but log it to so actual time of error is known.
    541                 exc_info = sys.exc_info()
    542                 logging.warning('Autotest caught exception when running test:',
    543                                 exc_info=True)
    544 
    545                 try:
    546                     try:
    547                         if run_cleanup:
    548                             _cherry_pick_call(self.cleanup, *args, **dargs)
    549                     except Exception:
    550                         logging.error('Ignoring exception during cleanup() '
    551                                       'phase:')
    552                         traceback.print_exc()
    553                         logging.error('Now raising the earlier %s error',
    554                                       exc_info[0])
    555                     self.crash_handler_report()
    556                 finally:
    557                     self.job.logging.restore()
    558                     try:
    559                         raise exc_info[0], exc_info[1], exc_info[2]
    560                     finally:
    561                         # http://docs.python.org/library/sys.html#sys.exc_info
    562                         # Be nice and prevent a circular reference.
    563                         del exc_info
    564             else:
    565                 try:
    566                     if run_cleanup:
    567                         _cherry_pick_call(self.cleanup, *args, **dargs)
    568                     self.crash_handler_report()
    569                 finally:
    570                     self.job.logging.restore()
    571         except error.AutotestError:
    572             if self.network_destabilizing:
    573                 self.job.enable_warnings("NETWORK")
    574             # Pass already-categorized errors on up.
    575             raise
    576         except Exception, e:
    577             if self.network_destabilizing:
    578                 self.job.enable_warnings("NETWORK")
    579             # Anything else is an ERROR in our own code, not execute().
    580             raise error.UnhandledTestError(e)
    581         else:
    582             if self.network_destabilizing:
    583                 self.job.enable_warnings("NETWORK")
    584 
    585 
    586     def runsubtest(self, url, *args, **dargs):
    587         """
    588         Execute another autotest test from inside the current test's scope.
    589 
    590         @param test: Parent test.
    591         @param url: Url of new test.
    592         @param tag: Tag added to test name.
    593         @param args: Args for subtest.
    594         @param dargs: Dictionary with args for subtest.
    595         @iterations: Number of subtest iterations.
    596         @profile_only: If true execute one profiled run.
    597         """
    598         dargs["profile_only"] = dargs.get("profile_only", False)
    599         test_basepath = self.outputdir[len(self.job.resultdir + "/"):]
    600         return self.job.run_test(url, master_testpath=test_basepath,
    601                                  *args, **dargs)
    602 
    603 
    604 def _get_nonstar_args(func):
    605     """Extract all the (normal) function parameter names.
    606 
    607     Given a function, returns a tuple of parameter names, specifically
    608     excluding the * and ** parameters, if the function accepts them.
    609 
    610     @param func: A callable that we want to chose arguments for.
    611 
    612     @return: A tuple of parameters accepted by the function.
    613     """
    614     return func.func_code.co_varnames[:func.func_code.co_argcount]
    615 
    616 
    617 def _cherry_pick_args(func, args, dargs):
    618     """Sanitize positional and keyword arguments before calling a function.
    619 
    620     Given a callable (func), an argument tuple and a dictionary of keyword
    621     arguments, pick only those arguments which the function is prepared to
    622     accept and return a new argument tuple and keyword argument dictionary.
    623 
    624     Args:
    625       func: A callable that we want to choose arguments for.
    626       args: A tuple of positional arguments to consider passing to func.
    627       dargs: A dictionary of keyword arguments to consider passing to func.
    628     Returns:
    629       A tuple of: (args tuple, keyword arguments dictionary)
    630     """
    631     # Cherry pick args:
    632     if func.func_code.co_flags & 0x04:
    633         # func accepts *args, so return the entire args.
    634         p_args = args
    635     else:
    636         p_args = ()
    637 
    638     # Cherry pick dargs:
    639     if func.func_code.co_flags & 0x08:
    640         # func accepts **dargs, so return the entire dargs.
    641         p_dargs = dargs
    642     else:
    643         # Only return the keyword arguments that func accepts.
    644         p_dargs = {}
    645         for param in _get_nonstar_args(func):
    646             if param in dargs:
    647                 p_dargs[param] = dargs[param]
    648 
    649     return p_args, p_dargs
    650 
    651 
    652 def _cherry_pick_call(func, *args, **dargs):
    653     """Cherry picks arguments from args/dargs based on what "func" accepts
    654     and calls the function with the picked arguments."""
    655     p_args, p_dargs = _cherry_pick_args(func, args, dargs)
    656     return func(*p_args, **p_dargs)
    657 
    658 
    659 def _validate_args(args, dargs, *funcs):
    660     """Verify that arguments are appropriate for at least one callable.
    661 
    662     Given a list of callables as additional parameters, verify that
    663     the proposed keyword arguments in dargs will each be accepted by at least
    664     one of the callables.
    665 
    666     NOTE: args is currently not supported and must be empty.
    667 
    668     Args:
    669       args: A tuple of proposed positional arguments.
    670       dargs: A dictionary of proposed keyword arguments.
    671       *funcs: Callables to be searched for acceptance of args and dargs.
    672     Raises:
    673       error.AutotestError: if an arg won't be accepted by any of *funcs.
    674     """
    675     all_co_flags = 0
    676     all_varnames = ()
    677     for func in funcs:
    678         all_co_flags |= func.func_code.co_flags
    679         all_varnames += func.func_code.co_varnames[:func.func_code.co_argcount]
    680 
    681     # Check if given args belongs to at least one of the methods below.
    682     if len(args) > 0:
    683         # Current implementation doesn't allow the use of args.
    684         raise error.TestError('Unnamed arguments not accepted. Please '
    685                               'call job.run_test with named args only')
    686 
    687     # Check if given dargs belongs to at least one of the methods below.
    688     if len(dargs) > 0:
    689         if not all_co_flags & 0x08:
    690             # no func accepts *dargs, so:
    691             for param in dargs:
    692                 if not param in all_varnames:
    693                     raise error.AutotestError('Unknown parameter: %s' % param)
    694 
    695 
    696 def _installtest(job, url):
    697     (group, name) = job.pkgmgr.get_package_name(url, 'test')
    698 
    699     # Bail if the test is already installed
    700     group_dir = os.path.join(job.testdir, "download", group)
    701     if os.path.exists(os.path.join(group_dir, name)):
    702         return (group, name)
    703 
    704     # If the group directory is missing create it and add
    705     # an empty  __init__.py so that sub-directories are
    706     # considered for import.
    707     if not os.path.exists(group_dir):
    708         os.makedirs(group_dir)
    709         f = file(os.path.join(group_dir, '__init__.py'), 'w+')
    710         f.close()
    711 
    712     logging.debug("%s: installing test url=%s", name, url)
    713     tarball = os.path.basename(url)
    714     tarball_path = os.path.join(group_dir, tarball)
    715     test_dir = os.path.join(group_dir, name)
    716     job.pkgmgr.fetch_pkg(tarball, tarball_path,
    717                          repo_url = os.path.dirname(url))
    718 
    719     # Create the directory for the test
    720     if not os.path.exists(test_dir):
    721         os.mkdir(os.path.join(group_dir, name))
    722 
    723     job.pkgmgr.untar_pkg(tarball_path, test_dir)
    724 
    725     os.remove(tarball_path)
    726 
    727     # For this 'sub-object' to be importable via the name
    728     # 'group.name' we need to provide an __init__.py,
    729     # so link the main entry point to this.
    730     os.symlink(name + '.py', os.path.join(group_dir, name,
    731                             '__init__.py'))
    732 
    733     # The test is now installed.
    734     return (group, name)
    735 
    736 
    737 def _call_test_function(func, *args, **dargs):
    738     """Calls a test function and translates exceptions so that errors
    739     inside test code are considered test failures."""
    740     try:
    741         return func(*args, **dargs)
    742     except error.AutotestError:
    743         raise
    744     except Exception, e:
    745         # Other exceptions must be treated as a FAIL when
    746         # raised during the test functions
    747         raise error.UnhandledTestFail(e)
    748 
    749 
    750 def runtest(job, url, tag, args, dargs,
    751             local_namespace={}, global_namespace={},
    752             before_test_hook=None, after_test_hook=None,
    753             before_iteration_hook=None, after_iteration_hook=None):
    754     local_namespace = local_namespace.copy()
    755     global_namespace = global_namespace.copy()
    756     # if this is not a plain test name then download and install the
    757     # specified test
    758     if url.endswith('.tar.bz2'):
    759         (testgroup, testname) = _installtest(job, url)
    760         bindir = os.path.join(job.testdir, 'download', testgroup, testname)
    761         importdir = os.path.join(job.testdir, 'download')
    762         modulename = '%s.%s' % (re.sub('/', '.', testgroup), testname)
    763         classname = '%s.%s' % (modulename, testname)
    764         path = testname
    765     else:
    766         # If the test is local, it may be under either testdir or site_testdir.
    767         # Tests in site_testdir override tests defined in testdir
    768         testname = path = url
    769         testgroup = ''
    770         path = re.sub(':', '/', testname)
    771         modulename = os.path.basename(path)
    772         classname = '%s.%s' % (modulename, modulename)
    773 
    774         # Try installing the test package
    775         # The job object may be either a server side job or a client side job.
    776         # 'install_pkg' method will be present only if it's a client side job.
    777         if hasattr(job, 'install_pkg'):
    778             try:
    779                 bindir = os.path.join(job.testdir, testname)
    780                 job.install_pkg(testname, 'test', bindir)
    781             except error.PackageInstallError:
    782                 # continue as a fall back mechanism and see if the test code
    783                 # already exists on the machine
    784                 pass
    785 
    786         bindir = None
    787         for dir in [job.testdir, getattr(job, 'site_testdir', None)]:
    788             if dir is not None and os.path.exists(os.path.join(dir, path)):
    789                 importdir = bindir = os.path.join(dir, path)
    790         if not bindir:
    791             raise error.TestError(testname + ': test does not exist')
    792 
    793     subdir = os.path.join(dargs.pop('master_testpath', ""), testname)
    794     outputdir = os.path.join(job.resultdir, subdir)
    795     if tag:
    796         outputdir += '.' + tag
    797 
    798     local_namespace['job'] = job
    799     local_namespace['bindir'] = bindir
    800     local_namespace['outputdir'] = outputdir
    801 
    802     sys.path.insert(0, importdir)
    803     try:
    804         exec ('import %s' % modulename, local_namespace, global_namespace)
    805         exec ("mytest = %s(job, bindir, outputdir)" % classname,
    806               local_namespace, global_namespace)
    807     finally:
    808         sys.path.pop(0)
    809 
    810     pwd = os.getcwd()
    811     os.chdir(outputdir)
    812 
    813     try:
    814         mytest = global_namespace['mytest']
    815         mytest.success = False
    816         if before_test_hook:
    817             before_test_hook(mytest)
    818 
    819         # we use the register iteration hooks methods to register the passed
    820         # in hooks
    821         if before_iteration_hook:
    822             mytest.register_before_iteration_hook(before_iteration_hook)
    823         if after_iteration_hook:
    824             mytest.register_after_iteration_hook(after_iteration_hook)
    825         mytest._exec(args, dargs)
    826         mytest.success = True
    827     finally:
    828         os.chdir(pwd)
    829         if after_test_hook:
    830             after_test_hook(mytest)
    831         shutil.rmtree(mytest.tmpdir, ignore_errors=True)
    832