Home | History | Annotate | Download | only in server
      1 # pylint: disable-msg=C0111
      2 
      3 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
      4 # Use of this source code is governed by a BSD-style license that can be
      5 # found in the LICENSE file.
      6 """
      7 The main job wrapper for the server side.
      8 
      9 This is the core infrastructure. Derived from the client side job.py
     10 
     11 Copyright Martin J. Bligh, Andy Whitcroft 2007
     12 """
     13 
     14 import getpass, os, sys, re, tempfile, time, select, platform
     15 import traceback, shutil, warnings, fcntl, pickle, logging, itertools, errno
     16 from autotest_lib.client.bin import sysinfo
     17 from autotest_lib.client.common_lib import base_job, global_config
     18 from autotest_lib.client.common_lib import error, utils, packages
     19 from autotest_lib.client.common_lib import logging_manager
     20 from autotest_lib.server import test, subcommand, profilers
     21 from autotest_lib.server import utils as server_utils
     22 from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
     23 from autotest_lib.server.hosts import abstract_ssh, factory as host_factory
     24 from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
     25 
     26 
# Site-wide switch read from the 'autoserv' section of the global config
# (off unless configured). It is one of the conditions base_server_job checks
# before enabling its TKO parser while the job runs.
INCREMENTAL_TKO_PARSING = global_config.global_config.get_config_value(
        'autoserv', 'incremental_tko_parsing', type=bool, default=False)
     29 
     30 def _control_segment_path(name):
     31     """Get the pathname of the named control segment file."""
     32     server_dir = os.path.dirname(os.path.abspath(__file__))
     33     return os.path.join(server_dir, "control_segments", name)
     34 
     35 
# Well-known file names used by the server job. The two control names are the
# files a job writes its client/server control file copies to.
CLIENT_CONTROL_FILENAME = 'control'
SERVER_CONTROL_FILENAME = 'control.srv'
MACHINES_FILENAME = '.machines'

# Paths of the control segments that live in the control_segments directory
# next to this module; each one is executed for the matching job step.
CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper')
CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps')
CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo')
INSTALL_CONTROL_FILE = _control_segment_path('install')
CLEANUP_CONTROL_FILE = _control_segment_path('cleanup')
VERIFY_CONTROL_FILE = _control_segment_path('verify')
REPAIR_CONTROL_FILE = _control_segment_path('repair')
PROVISION_CONTROL_FILE = _control_segment_path('provision')
VERIFY_JOB_REPO_URL_CONTROL_FILE = _control_segment_path('verify_job_repo_url')
RESET_CONTROL_FILE = _control_segment_path('reset')
     50 
     51 # by default provide a stub that generates no site data
     52 def _get_site_job_data_dummy(job):
     53     return {}
     54 
     55 
     56 class status_indenter(base_job.status_indenter):
     57     """Provide a simple integer-backed status indenter."""
     58     def __init__(self):
     59         self._indent = 0
     60 
     61 
     62     @property
     63     def indent(self):
     64         return self._indent
     65 
     66 
     67     def increment(self):
     68         self._indent += 1
     69 
     70 
     71     def decrement(self):
     72         self._indent -= 1
     73 
     74 
     75     def get_context(self):
     76         """Returns a context object for use by job.get_record_context."""
     77         class context(object):
     78             def __init__(self, indenter, indent):
     79                 self._indenter = indenter
     80                 self._indent = indent
     81             def restore(self):
     82                 self._indenter._indent = self._indent
     83         return context(self, self._indent)
     84 
     85 
     86 class server_job_record_hook(object):
     87     """The job.record hook for server job. Used to inject WARN messages from
     88     the console or vlm whenever new logs are written, and to echo any logs
     89     to INFO level logging. Implemented as a class so that it can use state to
     90     block recursive calls, so that the hook can call job.record itself to
     91     log WARN messages.
     92 
     93     Depends on job._read_warnings and job._logger.
     94     """
     95     def __init__(self, job):
     96         self._job = job
     97         self._being_called = False
     98 
     99 
    100     def __call__(self, entry):
    101         """A wrapper around the 'real' record hook, the _hook method, which
    102         prevents recursion. This isn't making any effort to be threadsafe,
    103         the intent is to outright block infinite recursion via a
    104         job.record->_hook->job.record->_hook->job.record... chain."""
    105         if self._being_called:
    106             return
    107         self._being_called = True
    108         try:
    109             self._hook(self._job, entry)
    110         finally:
    111             self._being_called = False
    112 
    113 
    114     @staticmethod
    115     def _hook(job, entry):
    116         """The core hook, which can safely call job.record."""
    117         entries = []
    118         # poll all our warning loggers for new warnings
    119         for timestamp, msg in job._read_warnings():
    120             warning_entry = base_job.status_log_entry(
    121                 'WARN', None, None, msg, {}, timestamp=timestamp)
    122             entries.append(warning_entry)
    123             job.record_entry(warning_entry)
    124         # echo rendered versions of all the status logs to info
    125         entries.append(entry)
    126         for entry in entries:
    127             rendered_entry = job._logger.render_entry(entry)
    128             logging.info(rendered_entry)
    129             job._parse_status(rendered_entry)
    130 
    131 
    132 class base_server_job(base_job.base_job):
    133     """The server-side concrete implementation of base_job.
    134 
    135     Optional properties provided by this implementation:
    136         serverdir
    137 
    138         num_tests_run
    139         num_tests_failed
    140 
    141         warning_manager
    142         warning_loggers
    143     """
    144 
    # Status log format version: written out as the 'status_version' keyval
    # and handed to status_lib.parser when incremental parsing is set up.
    _STATUS_VERSION = 1
    146 
    147     # TODO crbug.com/285395 eliminate ssh_verbosity_flag
    def __init__(self, control, args, resultdir, label, user, machines,
                 client=False, parse_job='',
                 ssh_user=host_factory.DEFAULT_SSH_USER,
                 ssh_port=host_factory.DEFAULT_SSH_PORT,
                 ssh_pass=host_factory.DEFAULT_SSH_PASS,
                 ssh_verbosity_flag=host_factory.DEFAULT_SSH_VERBOSITY,
                 ssh_options=host_factory.DEFAULT_SSH_OPTIONS,
                 test_retry=0, group_name='',
                 tag='', disable_sysinfo=False,
                 control_filename=SERVER_CONTROL_FILENAME,
                 parent_job_id=None, host_attributes=None, in_lab=False):
        """
        Create a server side job object.

        @param control: The pathname of the control file.
        @param args: Passed to the control file.
        @param resultdir: Where to throw the results.
        @param label: Description of the job.
        @param user: Username for the job (email address). When empty, the
                name of the user running this process is used for self.user.
        @param machines: List of hostnames of the machines the job runs on.
        @param client: True if this is a client-side control file.
        @param parse_job: string, if supplied it is the job execution tag that
                the results will be passed through to the TKO parser with.
        @param ssh_user: The SSH username.  [root]
        @param ssh_port: The SSH port number.  [22]
        @param ssh_pass: The SSH passphrase, if needed.
        @param ssh_verbosity_flag: The SSH verbosity flag, '-v', '-vv',
                '-vvv', or an empty string if not needed.
        @param ssh_options: A string giving additional options that will be
                            included in ssh commands.
        @param test_retry: The number of times to retry a test if the test did
                not complete successfully.
        @param group_name: If supplied, this will be written out as
                host_group_name in the keyvals file for the parser.
        @param tag: The job execution tag from the scheduler.  [optional]
        @param disable_sysinfo: Whether we should disable the sysinfo step of
                tests for a modest shortening of test time.  [optional]
        @param control_filename: The filename where the server control file
                should be written in the results directory.
        @param parent_job_id: Job ID of the parent job. Default to None if the
                job does not have a parent job.
        @param host_attributes: Dict of host attributes passed into autoserv
                                via the command line. If specified here, these
                                attributes will apply to all machines. The
                                dict itself is never modified.
        @param in_lab: Boolean that indicates if this is running in the lab
                       environment.
        """
        super(base_server_job, self).__init__(resultdir=resultdir,
                                              test_retry=test_retry)
        self.test_retry = test_retry
        self.control = control
        self._uncollected_log_file = os.path.join(self.resultdir,
                                                  'uncollected_logs')
        debugdir = os.path.join(self.resultdir, 'debug')
        if not os.path.exists(debugdir):
            os.mkdir(debugdir)

        if user:
            self.user = user
        else:
            self.user = getpass.getuser()

        self.args = args
        self.label = label
        self.machines = machines
        self._client = client
        self.warning_loggers = set()
        self.warning_manager = warning_manager()
        self._ssh_user = ssh_user
        self._ssh_port = ssh_port
        self._ssh_pass = ssh_pass
        self._ssh_verbosity_flag = ssh_verbosity_flag
        self._ssh_options = ssh_options
        self.tag = tag
        self.last_boot_tag = None
        self.hosts = set()
        self.drop_caches = False
        self.drop_caches_between_iterations = False
        self._control_filename = control_filename
        self._disable_sysinfo = disable_sysinfo

        self.logging = logging_manager.get_logging_manager(
                manage_stdout_and_stderr=True, redirect_fds=True)
        subcommand.logging_manager_object = self.logging

        self.sysinfo = sysinfo.sysinfo(self.resultdir)
        self.profilers = profilers.profilers(self)

        job_data = {'label' : label, 'user' : user,
                    'hostname' : ','.join(machines),
                    'drone' : platform.node(),
                    'status_version' : str(self._STATUS_VERSION),
                    'job_started' : str(int(time.time()))}
        # Save parent job id to keyvals, so parser can retrieve the info and
        # write to tko_jobs record.
        if parent_job_id:
            job_data['parent_job_id'] = parent_job_id
        if group_name:
            job_data['host_group_name'] = group_name

        # only write these keyvals out on the first job in a resultdir
        if 'job_started' not in utils.read_keyval(self.resultdir):
            job_data.update(get_site_job_data(self))
            utils.write_keyval(self.resultdir, job_data)

        self._parse_job = parse_job
        # the parser is only used when there is at most one machine
        self._using_parser = (INCREMENTAL_TKO_PARSING and self._parse_job
                              and len(machines) <= 1)
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':600})
        self.num_tests_run = 0
        self.num_tests_failed = 0

        self._register_subcommand_hooks()

        # these components aren't usable on the server
        self.bootloader = None
        self.harness = None

        # set up the status logger
        self._indenter = status_indenter()
        self._logger = base_job.status_logger(
            self, self._indenter, 'status.log', 'status.log',
            record_hook=server_job_record_hook(self))

        # Initialize a flag to indicate DUT failure during the test, e.g.,
        # unexpected reboot.
        self.failed_with_device_error = False

        self.parent_job_id = parent_job_id
        self.in_lab = in_lab
        afe = frontend_wrappers.RetryingAFE(timeout_min=5, delay_sec=10)
        self.machine_dict_list = []
        for machine in self.machines:
            # Every machine gets its own copy of the command-line attributes.
            # Sharing (and updating) a single dict would leak one host's AFE
            # attributes into every other machine's entry and into the
            # caller's dict.
            machine_attributes = dict(host_attributes or {})
            if self.in_lab:
                host = afe.get_hosts(hostname=machine)[0]
                machine_attributes.update(host.attributes)
            self.machine_dict_list.append(
                    {'hostname' : machine,
                     'host_attributes' : machine_attributes})
    289 
    290 
    @classmethod
    def _find_base_directories(cls):
        """
        Work out where autodir, clientdir and serverdir live.

        The directory holding this file is taken to be serverdir; autodir is
        its parent and clientdir is the 'client' directory inside autodir.

        @returns The tuple (autodir, clientdir, serverdir).
        """
        here = os.path.dirname(__file__)
        serverdir = os.path.abspath(here)
        autodir = os.path.normpath(os.path.join(serverdir, '..'))
        return autodir, os.path.join(autodir, 'client'), serverdir
    302 
    303 
    def _find_resultdir(self, resultdir, *args, **dargs):
        """
        Resolve the results directory for this job.

        Server jobs expect resultdir to be handed to __init__ explicitly, so
        nothing is searched for: the given path is simply normalized.

        @param resultdir: The results directory passed in by the caller.

        @returns The normalized path, or None when resultdir is empty/None.
        """
        return os.path.normpath(resultdir) if resultdir else None
    313 
    314 
    def _get_status_logger(self):
        """Give callers access to the status logger built in __init__."""
        status_logger = self._logger
        return status_logger
    318 
    319 
    @staticmethod
    def _load_control_file(path):
        """Read a control file and drop every carriage return from it.

        @param path: Pathname of the control file to read.

        @returns The file contents as a string with all '\\r' removed.
        """
        with open(path) as control_fd:
            contents = control_fd.read()
        return contents.replace('\r', '')
    328 
    329 
    def _register_subcommand_hooks(self):
        """
        Install fork/join hooks on the subcommand module so that hosts added
        to self.hosts after a fork get closed again at join time.
        """
        def remember_hosts(cmd):
            # snapshot of the hosts that already existed when forking
            self._existing_hosts_on_fork = set(self.hosts)

        def close_new_hosts(cmd):
            # anything not in the fork-time snapshot was created since
            for host in self.hosts - self._existing_hosts_on_fork:
                host.close()

        hooks = subcommand.subcommand
        hooks.register_fork_hook(remember_hosts)
        hooks.register_join_hook(close_new_hosts)
    343 
    344 
    def init_parser(self):
        """
        Begin continuous parsing of self.resultdir.

        Opens the results database, starts a status parser on a job model
        built from the results directory and makes sure the job has a row in
        the database. Does nothing unless the parser is in use.
        """
        if not self._using_parser:
            return

        # parser debug output goes to an unbuffered .parse.log; the handle is
        # handed over to the parser utilities and is not closed here
        debug_log_path = os.path.join(self.resultdir, '.parse.log')
        debug_log = open(debug_log_path, 'w', 0)
        tko_utils.redirect_parser_debugging(debug_log)

        # database connection, parser and job model
        self.results_db = tko_db.db(autocommit=True)
        self.parser = status_lib.parser(self._STATUS_VERSION)
        self.job_model = self.parser.make_job(self.resultdir)
        self.parser.start(self.job_model)

        existing_idx = self.results_db.find_job(self._parse_job)
        if existing_idx is not None:
            # job is already in the db: just point the model at its rows
            known_machine_idx = self.results_db.lookup_machine(
                    self.job_model.machine)
            self.job_model.index = existing_idx
            self.job_model.machine_idx = known_machine_idx
            return
        # first time this job is seen: create its row
        self.results_db.insert_job(self._parse_job, self.job_model,
                                   self.parent_job_id)
    372 
    373 
    def cleanup_parser(self):
        """
        Finish parsing once the server job is done.

        Ends the parser, inserts the tests it still returns and then marks
        the parser as no longer in use. Does nothing if the parser is not
        in use.
        """
        if self._using_parser:
            for finished_test in self.parser.end():
                self.__insert_test(finished_test)
            self._using_parser = False
    386 
    387     # TODO crbug.com/285395 add a kwargs parameter.
    def _make_namespace(self):
        """Build the base namespace dictionary handed to control files.

        @returns A new dict holding the standard entries: machines (the
                per-machine dicts), job, ssh_user, ssh_port, ssh_pass,
                ssh_verbosity_flag and ssh_options.
        """
        return dict(machines=self.machine_dict_list,
                    job=self,
                    ssh_user=self._ssh_user,
                    ssh_port=self._ssh_port,
                    ssh_pass=self._ssh_pass,
                    ssh_verbosity_flag=self._ssh_verbosity_flag,
                    ssh_options=self._ssh_options)
    403 
    404 
    def cleanup(self, labels):
        """Run the cleanup control segment against the job's machines.

        @param labels: Comma separated job labels, will be used to
                       determine special task actions.

        @raises error.AutoservError: If the job has no machines.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to cleanup')
        if self.resultdir:
            os.chdir(self.resultdir)

        control_namespace = dict(self._make_namespace(),
                                 job_labels=labels, args='')
        self._execute_code(CLEANUP_CONTROL_FILE, control_namespace,
                           protect=False)
    419 
    420 
    def verify(self, labels):
        """Run the verify control segment (checks machines are ssh-able).

        @param labels: Comma separated job labels, will be used to
                       determine special task actions.

        @raises error.AutoservError: If the job has no machines.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to verify')
        if self.resultdir:
            os.chdir(self.resultdir)

        control_namespace = dict(self._make_namespace(),
                                 job_labels=labels, args='')
        self._execute_code(VERIFY_CONTROL_FILE, control_namespace,
                           protect=False)
    435 
    436 
    def reset(self, labels):
        """Run the reset control segment (cleanup, then verify, per machine).

        @param labels: Comma separated job labels, will be used to
                       determine special task actions.

        @raises error.AutoservError: If the job has no machines.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to reset.')
        if self.resultdir:
            os.chdir(self.resultdir)

        control_namespace = dict(self._make_namespace(),
                                 job_labels=labels, args='')
        self._execute_code(RESET_CONTROL_FILE, control_namespace,
                           protect=False)
    451 
    452 
    def repair(self, labels):
        """Run the repair control segment against the job's machines.

        @param labels: Comma separated job labels, will be used to
                       determine special task actions.

        @raises error.AutoservError: If the job has no machines.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to repair')
        if self.resultdir:
            os.chdir(self.resultdir)

        control_namespace = dict(self._make_namespace(),
                                 job_labels=labels, args='')
        self._execute_code(REPAIR_CONTROL_FILE, control_namespace,
                           protect=False)
    467 
    468 
    def provision(self, labels):
        """
        Provision all hosts to match |labels|.

        Loads the provision control segment and runs it as this job's
        control file.

        @param labels: A comma separated string of labels to provision the
                       host to.

        """
        provision_control = self._load_control_file(PROVISION_CONTROL_FILE)
        self.run(control=provision_control, job_labels=labels)
    479 
    480 
    def precheck(self):
        """
        Hook for derived classes to perform additional checks; the base
        implementation does nothing.
        """
    486 
    487 
    def enable_external_logging(self):
        """
        Start or restart the external logging mechanism; the base
        implementation does nothing.
        """
    493 
    494 
    def disable_external_logging(self):
        """
        Pause or stop the external logging mechanism; the base
        implementation does nothing.
        """
    500 
    501 
    def use_external_logging(self):
        """
        Tell whether external logging should be used.

        @returns False in this base implementation.
        """
        return False
    507 
    508 
    def _make_parallel_wrapper(self, function, machines, log):
        """Wrap function as appropriate for calling by parallel_simple.

        @param function: The callable that is run once per machine.
        @param machines: The machines it will be run on; either hostnames or
                the per-machine dicts built in __init__.
        @param log: Whether per-machine logging was requested.

        @returns function itself, or a wrapper around it that first sets up
                a per-machine execution context (and, when a parse job is
                set, the parser) in the subprocess.
        """
        # machines may arrive as a list of dictionaries, e.g.
        # [{'host_attributes': {}, 'hostname': '100.96.51.226'}]
        # (see machine_dict_list in __init__, passed on to control files by
        # _make_namespace). self.machines is a list of plain hostnames, so
        # reduce the dicts to hostnames before comparing the two. The order
        # of the hostnames is irrelevant: with more than one machine
        # is_forking is True anyway.
        got_machine_dicts = (machines and isinstance(machines, list)
                             and isinstance(machines[0], dict))
        if got_machine_dicts:
            machines = [entry['hostname'] for entry in machines]
        is_forking = not (len(machines) == 1 and self.machines == machines)

        if not log:
            # without logging there is nothing to set up per machine
            return function

        if self._parse_job and is_forking:
            def wrapper(machine):
                hostname = server_utils.get_hostname_from_machine(machine)
                self._parse_job = self._parse_job + "/" + hostname
                self._using_parser = INCREMENTAL_TKO_PARSING
                self.machines = [machine]
                self.push_execution_context(hostname)
                os.chdir(self.resultdir)
                utils.write_keyval(self.resultdir, {"hostname": hostname})
                self.init_parser()
                outcome = function(machine)
                self.cleanup_parser()
                return outcome
            return wrapper

        if len(machines) > 1:
            def wrapper(machine):
                hostname = server_utils.get_hostname_from_machine(machine)
                self.push_execution_context(hostname)
                os.chdir(self.resultdir)
                utils.write_keyval(
                        self.resultdir,
                        {'hostname' : hostname,
                         'status_version' : str(self._STATUS_VERSION)})
                return function(machine)
            return wrapper

        return function
    550 
    551 
    def parallel_simple(self, function, machines, log=True, timeout=None,
                        return_results=False):
        """
        Run 'function' on every machine through subcommand.parallel_simple.

        The function is first passed through _make_parallel_wrapper, which
        adds the per-machine setup needed for continuous parsing where that
        applies; otherwise the function is used unchanged.

        @param function: A callable to run in parallel given each machine.
        @param machines: A list of machine names to be passed one per subcommand
                invocation of function.
        @param log: If True, output will be written to output in a subdirectory
                named after each machine.
        @param timeout: Seconds after which the function call should timeout.
        @param return_results: If True instead of an AutoServError being raised
                on any error a list of the results|exceptions from the function
                called on each arg is returned.  [default: False]

        @raises error.AutotestError: If any of the functions failed.
        """
        wrapped = self._make_parallel_wrapper(function, machines, log)
        subcommand_dargs = dict(log=log, timeout=timeout,
                                return_results=return_results)
        return subcommand.parallel_simple(wrapped, machines,
                                          **subcommand_dargs)
    575 
    576 
    def parallel_on_machines(self, function, machines, timeout=None):
        """
        Run function on each machine in parallel and report who succeeded.

        @param function: Called in parallel with one machine as its argument.
        @param machines: A list of machines to call function(machine) on.
        @param timeout: Seconds after which the function call should timeout.

        @returns A list of machines on which function(machine) returned
                without raising an exception.
        """
        outcomes = self.parallel_simple(function, machines, timeout=timeout,
                                        return_results=True)
        # a machine failed exactly when its outcome is an exception object
        return [machine
                for outcome, machine in itertools.izip(outcomes, machines)
                if not isinstance(outcome, Exception)]
    593 
    594 
    # Sentinel for run()'s control_file_dir argument: when passed, the control
    # files are written to a freshly created temporary directory.
    _USE_TEMP_DIR = object()
    596     def run(self, install_before=False, install_after=False,
    597             collect_crashdumps=True, namespace={}, control=None,
    598             control_file_dir=None, verify_job_repo_url=False,
    599             only_collect_crashinfo=False, skip_crash_collection=False,
    600             job_labels='', use_packaging=True):
    601         # for a normal job, make sure the uncollected logs file exists
    602         # for a crashinfo-only run it should already exist, bail out otherwise
    603         created_uncollected_logs = False
    604         logging.info("I am PID %s", os.getpid())
    605         if self.resultdir and not os.path.exists(self._uncollected_log_file):
    606             if only_collect_crashinfo:
    607                 # if this is a crashinfo-only run, and there were no existing
    608                 # uncollected logs, just bail out early
    609                 logging.info("No existing uncollected logs, "
    610                              "skipping crashinfo collection")
    611                 return
    612             else:
    613                 log_file = open(self._uncollected_log_file, "w")
    614                 pickle.dump([], log_file)
    615                 log_file.close()
    616                 created_uncollected_logs = True
    617 
    618         # use a copy so changes don't affect the original dictionary
    619         namespace = namespace.copy()
    620         machines = self.machines
    621         if control is None:
    622             if self.control is None:
    623                 control = ''
    624             else:
    625                 control = self._load_control_file(self.control)
    626         if control_file_dir is None:
    627             control_file_dir = self.resultdir
    628 
    629         self.aborted = False
    630         namespace.update(self._make_namespace())
    631         namespace.update({'args' : self.args,
    632                           'job_labels' : job_labels})
    633         test_start_time = int(time.time())
    634 
    635         if self.resultdir:
    636             os.chdir(self.resultdir)
    637             # touch status.log so that the parser knows a job is running here
    638             open(self.get_status_log_path(), 'a').close()
    639             self.enable_external_logging()
    640 
    641         collect_crashinfo = True
    642         temp_control_file_dir = None
    643         try:
    644             try:
    645                 if install_before and machines:
    646                     self._execute_code(INSTALL_CONTROL_FILE, namespace)
    647 
    648                 if only_collect_crashinfo:
    649                     return
    650 
    651                 # If the verify_job_repo_url option is set but we're unable
    652                 # to actually verify that the job_repo_url contains the autotest
    653                 # package, this job will fail.
    654                 if verify_job_repo_url:
    655                     self._execute_code(VERIFY_JOB_REPO_URL_CONTROL_FILE,
    656                                        namespace)
    657                 else:
    658                     logging.warning('Not checking if job_repo_url contains '
    659                                     'autotest packages on %s', machines)
    660 
    661                 # determine the dir to write the control files to
    662                 cfd_specified = (control_file_dir
    663                                  and control_file_dir is not self._USE_TEMP_DIR)
    664                 if cfd_specified:
    665                     temp_control_file_dir = None
    666                 else:
    667                     temp_control_file_dir = tempfile.mkdtemp(
    668                         suffix='temp_control_file_dir')
    669                     control_file_dir = temp_control_file_dir
    670                 server_control_file = os.path.join(control_file_dir,
    671                                                    self._control_filename)
    672                 client_control_file = os.path.join(control_file_dir,
    673                                                    CLIENT_CONTROL_FILENAME)
    674                 if self._client:
    675                     namespace['control'] = control
    676                     utils.open_write_close(client_control_file, control)
    677                     shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE,
    678                                     server_control_file)
    679                 else:
    680                     utils.open_write_close(server_control_file, control)
    681 
    682                 logging.info("Processing control file")
    683                 namespace['use_packaging'] = use_packaging
    684                 self._execute_code(server_control_file, namespace)
    685                 logging.info("Finished processing control file")
    686 
    687                 # If no device error occured, no need to collect crashinfo.
    688                 collect_crashinfo = self.failed_with_device_error
    689             except Exception, e:
    690                 try:
    691                     logging.exception(
    692                             'Exception escaped control file, job aborting:')
    693                     reason = re.sub(base_job.status_log_entry.BAD_CHAR_REGEX,
    694                                     ' ', str(e))
    695                     self.record('INFO', None, None, str(e),
    696                                 {'job_abort_reason': reason})
    697                 except:
    698                     pass # don't let logging exceptions here interfere
    699                 raise
    700         finally:
    701             if temp_control_file_dir:
    702                 # Clean up temp directory used for copies of the control files
    703                 try:
    704                     shutil.rmtree(temp_control_file_dir)
    705                 except Exception, e:
    706                     logging.warning('Could not remove temp directory %s: %s',
    707                                  temp_control_file_dir, e)
    708 
    709             if machines and (collect_crashdumps or collect_crashinfo):
    710                 namespace['test_start_time'] = test_start_time
    711                 if skip_crash_collection:
    712                     logging.info('Skipping crash dump/info collection '
    713                                  'as requested.')
    714                 elif collect_crashinfo:
    715                     # includes crashdumps
    716                     self._execute_code(CRASHINFO_CONTROL_FILE, namespace)
    717                 else:
    718                     self._execute_code(CRASHDUMPS_CONTROL_FILE, namespace)
    719             self.disable_external_logging()
    720             if self._uncollected_log_file and created_uncollected_logs:
    721                 os.remove(self._uncollected_log_file)
    722             if install_after and machines:
    723                 self._execute_code(INSTALL_CONTROL_FILE, namespace)
    724 
    725 
    726     def run_test(self, url, *args, **dargs):
    727         """
    728         Summon a test object and run it.
    729 
    730         tag
    731                 tag to add to testname
    732         url
    733                 url of the test to run
    734         """
    735         if self._disable_sysinfo:
    736             dargs['disable_sysinfo'] = True
    737 
    738         group, testname = self.pkgmgr.get_package_name(url, 'test')
    739         testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
    740         outputdir = self._make_test_outputdir(subdir)
    741 
    742         def group_func():
    743             try:
    744                 test.runtest(self, url, tag, args, dargs)
    745             except error.TestBaseException, e:
    746                 self.record(e.exit_status, subdir, testname, str(e))
    747                 raise
    748             except Exception, e:
    749                 info = str(e) + "\n" + traceback.format_exc()
    750                 self.record('FAIL', subdir, testname, info)
    751                 raise
    752             else:
    753                 self.record('GOOD', subdir, testname, 'completed successfully')
    754 
    755         result, exc_info = self._run_group(testname, subdir, group_func)
    756         if exc_info and isinstance(exc_info[1], error.TestBaseException):
    757             return False
    758         elif exc_info:
    759             raise exc_info[0], exc_info[1], exc_info[2]
    760         else:
    761             return True
    762 
    763 
    def _run_group(self, name, subdir, function, *args, **dargs):
        """\
        Underlying method for running something inside of a group.

        Records a START entry, calls function(*args, **dargs) and then records
        the matching END entry.

        @param name: name recorded for the group.
        @param subdir: subdirectory recorded for the group (may be None).
        @param function: callable to run inside the group.

        @returns A (result, exc_info) tuple. result is the return value of
                function (None if it raised). exc_info is None on success, or
                the sys.exc_info() triple when function raised an
                error.TestBaseException.
        @raises error.JobError: if any other Exception escapes function.
        """
        outcome = None
        failure = None
        try:
            self.record('START', subdir, name)
            outcome = function(*args, **dargs)
        except error.TestBaseException as e:
            # Test failures are reported back to the caller, not raised.
            self.record("END %s" % e.exit_status, subdir, name)
            failure = sys.exc_info()
        except Exception as e:
            details = str(e) + '\n'
            details += traceback.format_exc()
            self.record('END ABORT', subdir, name, details)
            raise error.JobError(name + ' failed\n' + traceback.format_exc())
        else:
            self.record('END GOOD', subdir, name)

        return outcome, failure
    784 
    785 
    def run_group(self, function, *args, **dargs):
        """\
        Run a function inside a status group and return its result.

        function:
                subroutine to run
        *args:
                arguments for the function
        **dargs:
                keyword arguments for the function. A 'tag' entry, if
                present, is removed before the call; when it is truthy it is
                used as the group name instead of function.__name__.
        """
        default_name = function.__name__

        # Allow the tag for the group to be specified.
        group_name = dargs.pop('tag', None) or default_name

        outcome = self._run_group(group_name, None, function, *args, **dargs)
        return outcome[0]
    802 
    803 
    def run_op(self, op, op_func, get_kernel_func):
        """\
        A specialization of run_group meant specifically for handling
        management operation. Includes support for capturing the kernel version
        after the operation.

        A START entry is recorded before op_func runs. On failure an END FAIL
        entry carrying the error text and traceback is recorded and the
        exception is re-raised. On success the kernel version is queried and
        attached to the END GOOD entry.

        Args:
           op: name of the operation.
           op_func: a function that carries out the operation (reboot, suspend)
           get_kernel_func: a function that returns a string
                            representing the kernel version.
        """
        try:
            self.record('START', None, op)
            op_func()
        except Exception as e:
            failure_text = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', None, op, failure_text)
            raise

        # Only reached on success; errors from get_kernel_func() propagate
        # without an END entry being recorded.
        kernel_version = get_kernel_func()
        self.record('END GOOD', None, op,
                    optional_fields={"kernel": kernel_version})
    827 
    828 
    def run_control(self, path):
        """Execute a control file found at path (relative to the autotest
        path). Intended for executing a control file within a control file,
        not for running the top-level job control file.

        The control file is loaded from self.autodir joined with path and run
        with control_file_dir set to self._USE_TEMP_DIR.
        """
        control_text = self._load_control_file(
                os.path.join(self.autodir, path))
        self.run(control=control_text, control_file_dir=self._USE_TEMP_DIR)
    836 
    837 
    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        """Register a sysinfo.command loggable built from command.

        @param command: passed to sysinfo.command().
        @param logfile: passed to sysinfo.command() as logf.
        @param on_every_test: if true the loggable is added to the per-test
                loggables, otherwise to the boot loggables.
        """
        loggable = sysinfo.command(command, logf=logfile)
        self._add_sysinfo_loggable(loggable, on_every_test)
    841 
    842 
    def add_sysinfo_logfile(self, file, on_every_test=False):
        """Register a sysinfo.logfile loggable built from file.

        @param file: passed to sysinfo.logfile().
        @param on_every_test: if true the loggable is added to the per-test
                loggables, otherwise to the boot loggables.
        """
        loggable = sysinfo.logfile(file)
        self._add_sysinfo_loggable(loggable, on_every_test)
    845 
    846 
    847     def _add_sysinfo_loggable(self, loggable, on_every_test):
    848         if on_every_test:
    849             self.sysinfo.test_loggables.add(loggable)
    850         else:
    851             self.sysinfo.boot_loggables.add(loggable)
    852 
    853 
    854     def _read_warnings(self):
    855         """Poll all the warning loggers and extract any new warnings that have
    856         been logged. If the warnings belong to a category that is currently
    857         disabled, this method will discard them and they will no longer be
    858         retrievable.
    859 
    860         Returns a list of (timestamp, message) tuples, where timestamp is an
    861         integer epoch timestamp."""
    862         warnings = []
    863         while True:
    864             # pull in a line of output from every logger that has
    865             # output ready to be read
    866             loggers, _, _ = select.select(self.warning_loggers, [], [], 0)
    867             closed_loggers = set()
    868             for logger in loggers:
    869                 line = logger.readline()
    870                 # record any broken pipes (aka line == empty)
    871                 if len(line) == 0:
    872                     closed_loggers.add(logger)
    873                     continue
    874                 # parse out the warning
    875                 timestamp, msgtype, msg = line.split('\t', 2)
    876                 timestamp = int(timestamp)
    877                 # if the warning is valid, add it to the results
    878                 if self.warning_manager.is_valid(timestamp, msgtype):
    879                     warnings.append((timestamp, msg.strip()))
    880 
    881             # stop listening to loggers that are closed
    882             self.warning_loggers -= closed_loggers
    883 
    884             # stop if none of the loggers have any output left
    885             if not loggers:
    886                 break
    887 
    888         # sort into timestamp order
    889         warnings.sort()
    890         return warnings
    891 
    892 
    893     def _unique_subdirectory(self, base_subdirectory_name):
    894         """Compute a unique results subdirectory based on the given name.
    895 
    896         Appends base_subdirectory_name with a number as necessary to find a
    897         directory name that doesn't already exist.
    898         """
    899         subdirectory = base_subdirectory_name
    900         counter = 1
    901         while os.path.exists(os.path.join(self.resultdir, subdirectory)):
    902             subdirectory = base_subdirectory_name + '.' + str(counter)
    903             counter += 1
    904         return subdirectory
    905 
    906 
    def get_record_context(self):
        """Returns an object representing the current job.record context.

        The returned object is opaque and offers a 0-arg restore method that
        puts the job.record context (i.e. indentation) back to the level it
        had when this method was called. It is meant for cases where
        something external that generates job.record calls (e.g. an autotest
        client) can fail catastrophically, leaving the server job record
        state in need of a reset to its original "known good" state.

        @return: A context object with a 0-arg restore() method."""
        indenter = self._indenter
        return indenter.get_context()
    919 
    920 
    def record_summary(self, status_code, test_name, reason='', attributes=None,
                       distinguishing_attributes=(), child_test_ids=None):
        """Record a summary test result.

        A fresh subdirectory is created under self.resultdir, the result is
        recorded against it with the 'is_summary' optional field set, and
        the attributes and child test ids (when given) are written there as
        keyvals.

        @param status_code: status code string, see
                common_lib.log.is_valid_status()
        @param test_name: name of the test
        @param reason: (optional) string providing detailed reason for test
                outcome
        @param attributes: (optional) dict of string keyvals to associate with
                this result
        @param distinguishing_attributes: (optional) list of attribute names
                that should be used to distinguish identically-named test
                results.  These attributes should be present in the attributes
                parameter.  This is used to generate user-friendly subdirectory
                names.
        @param child_test_ids: (optional) list of test indices for test results
                used in generating this result.
        """
        # Subdirectory name: the test name followed by the value of every
        # distinguishing attribute, dot separated.
        name_parts = [test_name]
        for key in distinguishing_attributes:
            assert attributes
            assert key in attributes, '%s not in %s' % (key, attributes)
            name_parts.append(attributes[key])

        subdirectory = self._unique_subdirectory('.'.join(name_parts))
        subdirectory_path = os.path.join(self.resultdir, subdirectory)
        os.mkdir(subdirectory_path)

        self.record(status_code, subdirectory, test_name,
                    status=reason, optional_fields={'is_summary': True})

        if attributes:
            utils.write_keyval(subdirectory_path, attributes)

        if child_test_ids:
            summary_path = os.path.join(subdirectory_path, 'summary_data')
            utils.write_keyval(
                    summary_path,
                    {'child_test_ids': ','.join(map(str, child_test_ids))})
    963 
    964 
    def disable_warnings(self, warning_type):
        """Disable warnings of warning_type and record that in the job log.

        @param warning_type: the warning type to disable.
        """
        self.warning_manager.disable_warnings(warning_type)
        message = "disabling %s warnings" % warning_type
        self.record("INFO", None, None, message,
                    {"warnings.disable": warning_type})
    970 
    971 
    def enable_warnings(self, warning_type):
        """Enable warnings of warning_type and record that in the job log.

        @param warning_type: the warning type to enable.
        """
        self.warning_manager.enable_warnings(warning_type)
        message = "enabling %s warnings" % warning_type
        self.record("INFO", None, None, message,
                    {"warnings.enable": warning_type})
    977 
    978 
    def get_status_log_path(self, subdir=None):
        """Return the path to the job status log.

        @param subdir - Optional parameter indicating that you want the path
            to a subdirectory status log.

        @returns The path where the status log should be, or None when the
            job has no result directory.
        """
        if not self.resultdir:
            return None
        if subdir:
            return os.path.join(self.resultdir, subdir, "status.log")
        return os.path.join(self.resultdir, "status.log")
    994 
    995 
    996     def _update_uncollected_logs_list(self, update_func):
    997         """Updates the uncollected logs list in a multi-process safe manner.
    998 
    999         @param update_func - a function that updates the list of uncollected
   1000             logs. Should take one parameter, the list to be updated.
   1001         """
   1002         # Skip log collection if file _uncollected_log_file does not exist.
   1003         if not (self._uncollected_log_file and
   1004                 os.path.exists(self._uncollected_log_file)):
   1005             return
   1006         if self._uncollected_log_file:
   1007             log_file = open(self._uncollected_log_file, "r+")
   1008             fcntl.flock(log_file, fcntl.LOCK_EX)
   1009         try:
   1010             uncollected_logs = pickle.load(log_file)
   1011             update_func(uncollected_logs)
   1012             log_file.seek(0)
   1013             log_file.truncate()
   1014             pickle.dump(uncollected_logs, log_file)
   1015             log_file.flush()
   1016         finally:
   1017             fcntl.flock(log_file, fcntl.LOCK_UN)
   1018             log_file.close()
   1019 
   1020 
   1021     def add_client_log(self, hostname, remote_path, local_path):
   1022         """Adds a new set of client logs to the list of uncollected logs,
   1023         to allow for future log recovery.
   1024 
   1025         @param host - the hostname of the machine holding the logs
   1026         @param remote_path - the directory on the remote machine holding logs
   1027         @param local_path - the local directory to copy the logs into
   1028         """
   1029         def update_func(logs_list):
   1030             logs_list.append((hostname, remote_path, local_path))
   1031         self._update_uncollected_logs_list(update_func)
   1032 
   1033 
   1034     def remove_client_log(self, hostname, remote_path, local_path):
   1035         """Removes a set of client logs from the list of uncollected logs,
   1036         to allow for future log recovery.
   1037 
   1038         @param host - the hostname of the machine holding the logs
   1039         @param remote_path - the directory on the remote machine holding logs
   1040         @param local_path - the local directory to copy the logs into
   1041         """
   1042         def update_func(logs_list):
   1043             logs_list.remove((hostname, remote_path, local_path))
   1044         self._update_uncollected_logs_list(update_func)
   1045 
   1046 
   1047     def get_client_logs(self):
   1048         """Retrieves the list of uncollected logs, if it exists.
   1049 
   1050         @returns A list of (host, remote_path, local_path) tuples. Returns
   1051                  an empty list if no uncollected logs file exists.
   1052         """
   1053         log_exists = (self._uncollected_log_file and
   1054                       os.path.exists(self._uncollected_log_file))
   1055         if log_exists:
   1056             return pickle.load(open(self._uncollected_log_file))
   1057         else:
   1058             return []
   1059 
   1060 
   1061     def _fill_server_control_namespace(self, namespace, protect=True):
   1062         """
   1063         Prepare a namespace to be used when executing server control files.
   1064 
   1065         This sets up the control file API by importing modules and making them
   1066         available under the appropriate names within namespace.
   1067 
   1068         For use by _execute_code().
   1069 
   1070         Args:
   1071           namespace: The namespace dictionary to fill in.
   1072           protect: Boolean.  If True (the default) any operation that would
   1073               clobber an existing entry in namespace will cause an error.
   1074         Raises:
   1075           error.AutoservError: When a name would be clobbered by import.
   1076         """
   1077         def _import_names(module_name, names=()):
   1078             """
   1079             Import a module and assign named attributes into namespace.
   1080 
   1081             Args:
   1082                 module_name: The string module name.
   1083                 names: A limiting list of names to import from module_name.  If
   1084                     empty (the default), all names are imported from the module
   1085                     similar to a "from foo.bar import *" statement.
   1086             Raises:
   1087                 error.AutoservError: When a name being imported would clobber
   1088                     a name already in namespace.
   1089             """
   1090             module = __import__(module_name, {}, {}, names)
   1091 
   1092             # No names supplied?  Import * from the lowest level module.
   1093             # (Ugh, why do I have to implement this part myself?)
   1094             if not names:
   1095                 for submodule_name in module_name.split('.')[1:]:
   1096                     module = getattr(module, submodule_name)
   1097                 if hasattr(module, '__all__'):
   1098                     names = getattr(module, '__all__')
   1099                 else:
   1100                     names = dir(module)
   1101 
   1102             # Install each name into namespace, checking to make sure it
   1103             # doesn't override anything that already exists.
   1104             for name in names:
   1105                 # Check for conflicts to help prevent future problems.
   1106                 if name in namespace and protect:
   1107                     if namespace[name] is not getattr(module, name):
   1108                         raise error.AutoservError('importing name '
   1109                                 '%s from %s %r would override %r' %
   1110                                 (name, module_name, getattr(module, name),
   1111                                  namespace[name]))
   1112                     else:
   1113                         # Encourage cleanliness and the use of __all__ for a
   1114                         # more concrete API with less surprises on '*' imports.
   1115                         warnings.warn('%s (%r) being imported from %s for use '
   1116                                       'in server control files is not the '
   1117                                       'first occurrance of that import.' %
   1118                                       (name, namespace[name], module_name))
   1119 
   1120                 namespace[name] = getattr(module, name)
   1121 
   1122 
   1123         # This is the equivalent of prepending a bunch of import statements to
   1124         # the front of the control script.
   1125         namespace.update(os=os, sys=sys, logging=logging)
   1126         _import_names('autotest_lib.server',
   1127                 ('hosts', 'autotest', 'kvm', 'git', 'standalone_profiler',
   1128                  'source_kernel', 'rpm_kernel', 'deb_kernel', 'git_kernel'))
   1129         _import_names('autotest_lib.server.subcommand',
   1130                       ('parallel', 'parallel_simple', 'subcommand'))
   1131         _import_names('autotest_lib.server.utils',
   1132                       ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine'))
   1133         _import_names('autotest_lib.client.common_lib.error')
   1134         _import_names('autotest_lib.client.common_lib.barrier', ('barrier',))
   1135 
   1136         # Inject ourself as the job object into other classes within the API.
   1137         # (Yuck, this injection is a gross thing be part of a public API. -gps)
   1138         #
   1139         # XXX Base & SiteAutotest do not appear to use .job.  Who does?
   1140         namespace['autotest'].Autotest.job = self
   1141         # server.hosts.base_classes.Host uses .job.
   1142         namespace['hosts'].Host.job = self
   1143         namespace['hosts'].TestBed.job = self
   1144         namespace['hosts'].factory.ssh_user = self._ssh_user
   1145         namespace['hosts'].factory.ssh_port = self._ssh_port
   1146         namespace['hosts'].factory.ssh_pass = self._ssh_pass
   1147         namespace['hosts'].factory.ssh_verbosity_flag = (
   1148                 self._ssh_verbosity_flag)
   1149         namespace['hosts'].factory.ssh_options = self._ssh_options
   1150 
   1151 
   1152     def _execute_code(self, code_file, namespace, protect=True):
   1153         """
   1154         Execute code using a copy of namespace as a server control script.
   1155 
   1156         Unless protect_namespace is explicitly set to False, the dict will not
   1157         be modified.
   1158 
   1159         Args:
   1160           code_file: The filename of the control file to execute.
   1161           namespace: A dict containing names to make available during execution.
   1162           protect: Boolean.  If True (the default) a copy of the namespace dict
   1163               is used during execution to prevent the code from modifying its
   1164               contents outside of this function.  If False the raw dict is
   1165               passed in and modifications will be allowed.
   1166         """
   1167         if protect:
   1168             namespace = namespace.copy()
   1169         self._fill_server_control_namespace(namespace, protect=protect)
   1170         # TODO: Simplify and get rid of the special cases for only 1 machine.
   1171         if len(self.machines) > 1:
   1172             machines_text = '\n'.join(self.machines) + '\n'
   1173             # Only rewrite the file if it does not match our machine list.
   1174             try:
   1175                 machines_f = open(MACHINES_FILENAME, 'r')
   1176                 existing_machines_text = machines_f.read()
   1177                 machines_f.close()
   1178             except EnvironmentError:
   1179                 existing_machines_text = None
   1180             if machines_text != existing_machines_text:
   1181                 utils.open_write_close(MACHINES_FILENAME, machines_text)
   1182         execfile(code_file, namespace, namespace)
   1183 
   1184 
   1185     def _parse_status(self, new_line):
   1186         if not self._using_parser:
   1187             return
   1188         new_tests = self.parser.process_lines([new_line])
   1189         for test in new_tests:
   1190             self.__insert_test(test)
   1191 
   1192 
   1193     def __insert_test(self, test):
   1194         """
   1195         An internal method to insert a new test result into the
   1196         database. This method will not raise an exception, even if an
   1197         error occurs during the insert, to avoid failing a test
   1198         simply because of unexpected database issues."""
   1199         self.num_tests_run += 1
   1200         if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'):
   1201             self.num_tests_failed += 1
   1202         try:
   1203             self.results_db.insert_test(self.job_model, test)
   1204         except Exception:
   1205             msg = ("WARNING: An unexpected error occured while "
   1206                    "inserting test results into the database. "
   1207                    "Ignoring error.\n" + traceback.format_exc())
   1208             print >> sys.stderr, msg
   1209 
   1210 
   1211     def preprocess_client_state(self):
   1212         """
   1213         Produce a state file for initializing the state of a client job.
   1214 
   1215         Creates a new client state file with all the current server state, as
   1216         well as some pre-set client state.
   1217 
   1218         @returns The path of the file the state was written into.
   1219         """
   1220         # initialize the sysinfo state
   1221         self._state.set('client', 'sysinfo', self.sysinfo.serialize())
   1222 
   1223         # dump the state out to a tempfile
   1224         fd, file_path = tempfile.mkstemp(dir=self.tmpdir)
   1225         os.close(fd)
   1226 
   1227         # write_to_file doesn't need locking, we exclusively own file_path
   1228         self._state.write_to_file(file_path)
   1229         return file_path
   1230 
   1231 
   1232     def postprocess_client_state(self, state_path):
   1233         """
   1234         Update the state of this job with the state from a client job.
   1235 
   1236         Updates the state of the server side of a job with the final state
   1237         of a client job that was run. Updates the non-client-specific state,
   1238         pulls in some specific bits from the client-specific state, and then
   1239         discards the rest. Removes the state file afterwards
   1240 
   1241         @param state_file A path to the state file from the client.
   1242         """
   1243         # update the on-disk state
   1244         try:
   1245             self._state.read_from_file(state_path)
   1246             os.remove(state_path)
   1247         except OSError, e:
   1248             # ignore file-not-found errors
   1249             if e.errno != errno.ENOENT:
   1250                 raise
   1251             else:
   1252                 logging.debug('Client state file %s not found', state_path)
   1253 
   1254         # update the sysinfo state
   1255         if self._state.has('client', 'sysinfo'):
   1256             self.sysinfo.deserialize(self._state.get('client', 'sysinfo'))
   1257 
   1258         # drop all the client-specific state
   1259         self._state.discard_namespace('client')
   1260 
   1261 
   1262     def clear_all_known_hosts(self):
   1263         """Clears known hosts files for all AbstractSSHHosts."""
   1264         for host in self.hosts:
   1265             if isinstance(host, abstract_ssh.AbstractSSHHost):
   1266                 host.clear_known_hosts()
   1267 
   1268 
   1269 class warning_manager(object):
   1270     """Class for controlling warning logs. Manages the enabling and disabling
   1271     of warnings."""
   1272     def __init__(self):
   1273         # a map of warning types to a list of disabled time intervals
   1274         self.disabled_warnings = {}
   1275 
   1276 
   1277     def is_valid(self, timestamp, warning_type):
   1278         """Indicates if a warning (based on the time it occured and its type)
   1279         is a valid warning. A warning is considered "invalid" if this type of
   1280         warning was marked as "disabled" at the time the warning occured."""
   1281         disabled_intervals = self.disabled_warnings.get(warning_type, [])
   1282         for start, end in disabled_intervals:
   1283             if timestamp >= start and (end is None or timestamp < end):
   1284                 return False
   1285         return True
   1286 
   1287 
   1288     def disable_warnings(self, warning_type, current_time_func=time.time):
   1289         """As of now, disables all further warnings of this type."""
   1290         intervals = self.disabled_warnings.setdefault(warning_type, [])
   1291         if not intervals or intervals[-1][1] is not None:
   1292             intervals.append((int(current_time_func()), None))
   1293 
   1294 
   1295     def enable_warnings(self, warning_type, current_time_func=time.time):
   1296         """As of now, enables all further warnings of this type."""
   1297         intervals = self.disabled_warnings.get(warning_type, [])
   1298         if intervals and intervals[-1][1] is None:
   1299             intervals[-1] = (intervals[-1][0], int(current_time_func()))
   1300 
   1301 
# Load up site-specific code for generating site-specific job data.
# NOTE(review): presumably utils.import_site_function returns
# _get_site_job_data_dummy when the site module or function is not
# available -- confirm against its implementation.
get_site_job_data = utils.import_site_function(__file__,
    "autotest_lib.server.site_server_job", "get_site_job_data",
    _get_site_job_data_dummy)


# Look up the site-specific job class in the same site module.
# NOTE(review): base_server_job is presumably the fallback used when no
# site class exists -- confirm against utils.import_site_class.
site_server_job = utils.import_site_class(
    __file__, "autotest_lib.server.site_server_job", "site_server_job",
    base_server_job)
   1311 
   1312 
class server_job(site_server_job):
    """Subclass of site_server_job that adds nothing of its own."""
    pass
   1315