# pylint: disable-msg=C0111

# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
The main job wrapper for the server side.

This is the core infrastructure. Derived from the client side job.py

Copyright Martin J. Bligh, Andy Whitcroft 2007
"""

import errno
import fcntl
import getpass
import itertools
import logging
import os
import pickle
import platform
import re
import select
import shutil
import sys
import tempfile
import time
import traceback
import warnings

from autotest_lib.client.bin import sysinfo
from autotest_lib.client.common_lib import base_job
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import logging_manager
from autotest_lib.client.common_lib import packages
from autotest_lib.client.common_lib import utils
from autotest_lib.server import profilers
from autotest_lib.server import subcommand
from autotest_lib.server import test
from autotest_lib.server import utils as server_utils
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
from autotest_lib.server.hosts import abstract_ssh
from autotest_lib.server.hosts import afe_store
from autotest_lib.server.hosts import factory as host_factory
from autotest_lib.server.hosts import host_info
from autotest_lib.tko import db as tko_db
from autotest_lib.tko import models as tko_models
from autotest_lib.tko import status_lib
from autotest_lib.tko import parser_lib
from autotest_lib.tko import utils as tko_utils


INCREMENTAL_TKO_PARSING = global_config.global_config.get_config_value(
        'autoserv', 'incremental_tko_parsing', type=bool, default=False)

def _control_segment_path(name):
    """Get the pathname of the named control segment file."""
    server_dir = os.path.dirname(os.path.abspath(__file__))
    return os.path.join(server_dir, "control_segments", name)
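
# For illustration (derived from the repo layout, assuming this file lives at
# .../server/server_job.py): _control_segment_path('cleanup') resolves to
# .../server/control_segments/cleanup.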


CLIENT_CONTROL_FILENAME = 'control'
SERVER_CONTROL_FILENAME = 'control.srv'
MACHINES_FILENAME = '.machines'

CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper')
CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps')
CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo')
INSTALL_CONTROL_FILE = _control_segment_path('install')
CLEANUP_CONTROL_FILE = _control_segment_path('cleanup')
VERIFY_CONTROL_FILE = _control_segment_path('verify')
REPAIR_CONTROL_FILE = _control_segment_path('repair')
PROVISION_CONTROL_FILE = _control_segment_path('provision')
VERIFY_JOB_REPO_URL_CONTROL_FILE = _control_segment_path('verify_job_repo_url')
RESET_CONTROL_FILE = _control_segment_path('reset')
GET_NETWORK_STATS_CONTROL_FILE = _control_segment_path('get_network_stats')

# by default provide a stub that generates no site data
def _get_site_job_data_dummy(job):
    return {}


def get_machine_dicts(machine_names, in_lab, host_attributes=None):
    """Converts a list of machine names to a list of dicts.

    @param machine_names: A list of machine names.
    @param in_lab: A boolean indicating whether we're running in the lab.
    @param host_attributes: Optional dict of host attributes to add for each
            host.
    @returns: A list of dicts. Each dict has the following keys:
            'hostname': Name of the machine originally in machine_names (str).
            'afe_host': A frontend.Host object for the machine, or a stub if
                    in_lab is False.
            'host_info_store': A host_info.CachingHostInfoStore object to obtain
                    host information. A stub if in_lab is False.
    """
    if host_attributes is None:
        host_attributes = dict()
    machine_dict_list = []
    for machine in machine_names:
        afe_host = _create_afe_host(machine, in_lab)
        afe_host.attributes.update(host_attributes)
        machine_dict_list.append({
                'hostname' : machine,
                'afe_host' : afe_host,
                'host_info_store': _create_host_info_store(machine, in_lab),
        })
    return machine_dict_list
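
# Hypothetical usage sketch (hostnames are illustrative, not real lab DUTs):
#
#     machines = get_machine_dicts(['host1', 'host2'], in_lab=False)
#     machines[0]['hostname']         # -> 'host1'
#     machines[0]['afe_host']         # a stub frontend.Host outside the lab
#     machines[0]['host_info_store']  # a stub info store outside the lab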


class status_indenter(base_job.status_indenter):
    """Provide a simple integer-backed status indenter."""
    def __init__(self):
        self._indent = 0


    @property
    def indent(self):
        return self._indent


    def increment(self):
        self._indent += 1


    def decrement(self):
        self._indent -= 1


    def get_context(self):
        """Returns a context object for use by job.get_record_context."""
        class context(object):
            def __init__(self, indenter, indent):
                self._indenter = indenter
                self._indent = indent
            def restore(self):
                self._indenter._indent = self._indent
        return context(self, self._indent)
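
    # Save/restore round-trip enabled by get_context (illustrative):
    #
    #     indenter = status_indenter()
    #     indenter.increment()              # indent == 1
    #     context = indenter.get_context()
    #     indenter.increment()              # indent == 2
    #     context.restore()                 # indent back to 1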


class server_job_record_hook(object):
    """The job.record hook for the server job. Used to inject WARN messages
    from the console or vlm whenever new logs are written, and to echo any
    logs to INFO level logging. Implemented as a class so that it can keep
    state to block recursive calls, which lets the hook call job.record
    itself to log WARN messages.

    Depends on job._read_warnings and job._logger.
    """
    def __init__(self, job):
        self._job = job
        self._being_called = False


    def __call__(self, entry):
        """A wrapper around the 'real' record hook, the _hook method, which
        prevents recursion. This makes no effort to be threadsafe; the
        intent is simply to outright block infinite recursion via a
        job.record->_hook->job.record->_hook->job.record... chain."""
        if self._being_called:
            return
        self._being_called = True
        try:
            self._hook(self._job, entry)
        finally:
            self._being_called = False


    @staticmethod
    def _hook(job, entry):
        """The core hook, which can safely call job.record."""
        entries = []
        # poll all our warning loggers for new warnings
        for timestamp, msg in job._read_warnings():
            warning_entry = base_job.status_log_entry(
                'WARN', None, None, msg, {}, timestamp=timestamp)
            entries.append(warning_entry)
            job.record_entry(warning_entry)
        # echo rendered versions of all the status logs to info
        entries.append(entry)
        for entry in entries:
            rendered_entry = job._logger.render_entry(entry)
            logging.info(rendered_entry)
            job._parse_status(rendered_entry)


class base_server_job(base_job.base_job):
    """The server-side concrete implementation of base_job.

    Optional properties provided by this implementation:
        serverdir

        num_tests_run
        num_tests_failed

        warning_manager
        warning_loggers
    """

    _STATUS_VERSION = 1

    # TODO crbug.com/285395 eliminate ssh_verbosity_flag
    def __init__(self, control, args, resultdir, label, user, machines,
                 client=False, parse_job='',
                 ssh_user=host_factory.DEFAULT_SSH_USER,
                 ssh_port=host_factory.DEFAULT_SSH_PORT,
                 ssh_pass=host_factory.DEFAULT_SSH_PASS,
                 ssh_verbosity_flag=host_factory.DEFAULT_SSH_VERBOSITY,
                 ssh_options=host_factory.DEFAULT_SSH_OPTIONS,
                 test_retry=0, group_name='',
                 tag='', disable_sysinfo=False,
                 control_filename=SERVER_CONTROL_FILENAME,
                 parent_job_id=None, host_attributes=None, in_lab=False):
        """
        Create a server side job object.

        @param control: The pathname of the control file.
        @param args: Passed to the control file.
        @param resultdir: Where to throw the results.
        @param label: Description of the job.
        @param user: Username for the job (email address).
        @param client: True if this is a client-side control file.
        @param parse_job: String; if supplied, it is the job execution tag
                under which the results will be passed to the TKO parser.
        @param ssh_user: The SSH username.  [root]
        @param ssh_port: The SSH port number.  [22]
        @param ssh_pass: The SSH passphrase, if needed.
        @param ssh_verbosity_flag: The SSH verbosity flag, '-v', '-vv',
                '-vvv', or an empty string if not needed.
        @param ssh_options: A string giving additional options that will be
                            included in ssh commands.
        @param test_retry: The number of times to retry a test if the test did
                not complete successfully.
        @param group_name: If supplied, this will be written out as
                host_group_name in the keyvals file for the parser.
        @param tag: The job execution tag from the scheduler.  [optional]
        @param disable_sysinfo: Whether we should disable the sysinfo step of
                tests for a modest shortening of test time.  [optional]
        @param control_filename: The filename where the server control file
                should be written in the results directory.
        @param parent_job_id: Job ID of the parent job. Default to None if the
                job does not have a parent job.
        @param host_attributes: Dict of host attributes passed into autoserv
                                via the command line. If specified here, these
                                attributes will apply to all machines.
        @param in_lab: Boolean that indicates if this is running in the lab
                       environment.
        """
        super(base_server_job, self).__init__(resultdir=resultdir,
                                              test_retry=test_retry)
        path = os.path.dirname(__file__)
        self.test_retry = test_retry
        self.control = control
        self._uncollected_log_file = os.path.join(self.resultdir,
                                                  'uncollected_logs')
        debugdir = os.path.join(self.resultdir, 'debug')
        if not os.path.exists(debugdir):
            os.mkdir(debugdir)

        if user:
            self.user = user
        else:
            self.user = getpass.getuser()

        self.args = args
        self.label = label
        self.machines = machines
        self._client = client
        self.warning_loggers = set()
        self.warning_manager = warning_manager()
        self._ssh_user = ssh_user
        self._ssh_port = ssh_port
        self._ssh_pass = ssh_pass
        self._ssh_verbosity_flag = ssh_verbosity_flag
        self._ssh_options = ssh_options
        self.tag = tag
        self.hosts = set()
        self.drop_caches = False
        self.drop_caches_between_iterations = False
        self._control_filename = control_filename
        self._disable_sysinfo = disable_sysinfo

        self.logging = logging_manager.get_logging_manager(
                manage_stdout_and_stderr=True, redirect_fds=True)
        subcommand.logging_manager_object = self.logging

        self.sysinfo = sysinfo.sysinfo(self.resultdir)
        self.profilers = profilers.profilers(self)

        job_data = {'label' : label, 'user' : user,
                    'hostname' : ','.join(machines),
                    'drone' : platform.node(),
                    'status_version' : str(self._STATUS_VERSION),
                    'job_started' : str(int(time.time()))}
        # Save parent job id to keyvals, so parser can retrieve the info and
        # write to tko_jobs record.
        if parent_job_id:
            job_data['parent_job_id'] = parent_job_id
        if group_name:
            job_data['host_group_name'] = group_name

        # only write these keyvals out on the first job in a resultdir
        if 'job_started' not in utils.read_keyval(self.resultdir):
            job_data.update(get_site_job_data(self))
            utils.write_keyval(self.resultdir, job_data)

        self._parse_job = parse_job
        self._using_parser = (INCREMENTAL_TKO_PARSING and self._parse_job
                              and len(machines) <= 1)
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':600})
        self.num_tests_run = 0
        self.num_tests_failed = 0

        self._register_subcommand_hooks()

        # set up the status logger
        self._indenter = status_indenter()
        self._logger = base_job.status_logger(
            self, self._indenter, 'status.log', 'status.log',
            record_hook=server_job_record_hook(self))

        # Initialize a flag to indicate DUT failure during the test, e.g.,
        # unexpected reboot.
        self.failed_with_device_error = False

        self.parent_job_id = parent_job_id
        self.in_lab = in_lab
        self.machine_dict_list = get_machine_dicts(
                self.machines, self.in_lab, host_attributes)

        # TODO(jrbarnette) The harness attribute is only relevant to
        # client jobs, but it's required to be present, or we will fail
        # server job unit tests.  Yes, really.
        #
        # TODO(jrbarnette) The utility of the 'harness' attribute even
        # to client jobs is suspect.  Probably, we should remove it.
        self.harness = None


    @classmethod
    def _find_base_directories(cls):
        """
        Determine locations of autodir, clientdir and serverdir. Assumes
        that this file is located within serverdir and uses __file__ along
        with relative paths to resolve the location.
        """
        serverdir = os.path.abspath(os.path.dirname(__file__))
        autodir = os.path.normpath(os.path.join(serverdir, '..'))
        clientdir = os.path.join(autodir, 'client')
        return autodir, clientdir, serverdir


    def _find_resultdir(self, resultdir, *args, **dargs):
        """
        Determine the location of resultdir. For server jobs we expect one to
        always be explicitly passed in to __init__, so just return that.
        """
        if resultdir:
            return os.path.normpath(resultdir)
        else:
            return None


    def _get_status_logger(self):
        """Return a reference to the status logger."""
        return self._logger


    @staticmethod
    def _load_control_file(path):
        f = open(path)
        try:
            control_file = f.read()
        finally:
            f.close()
        return re.sub('\r', '', control_file)


    def _register_subcommand_hooks(self):
        """
        Register some hooks into the subcommand modules that allow us
        to properly clean up self.hosts created in forked subprocesses.
        """
        def on_fork(cmd):
            self._existing_hosts_on_fork = set(self.hosts)
        def on_join(cmd):
            new_hosts = self.hosts - self._existing_hosts_on_fork
            for host in new_hosts:
                host.close()
        subcommand.subcommand.register_fork_hook(on_fork)
        subcommand.subcommand.register_join_hook(on_join)


    def init_parser(self):
        """
        Start the continuous parsing of self.resultdir. This sets up
        the database connection and inserts the basic job object into
        the database if necessary.
        """
        if not self._using_parser:
            return
        # redirect parser debugging to .parse.log
        parse_log = os.path.join(self.resultdir, '.parse.log')
        parse_log = open(parse_log, 'w', 0)
        tko_utils.redirect_parser_debugging(parse_log)
        # create a job model object and set up the db
        self.results_db = tko_db.db(autocommit=True)
        self.parser = parser_lib.parser(self._STATUS_VERSION)
        self.job_model = self.parser.make_job(self.resultdir)
        self.parser.start(self.job_model)
        # check if a job already exists in the db and insert it if
        # it does not
        job_idx = self.results_db.find_job(self._parse_job)
        if job_idx is None:
            self.results_db.insert_job(self._parse_job, self.job_model,
                                       self.parent_job_id)
        else:
            machine_idx = self.results_db.lookup_machine(self.job_model.machine)
            self.job_model.index = job_idx
            self.job_model.machine_idx = machine_idx


    def cleanup_parser(self):
        """
        This should be called after the server job is finished
        to carry out any remaining cleanup (e.g. flushing any
        remaining test results to the results db)
        """
        if not self._using_parser:
            return
        final_tests = self.parser.end()
        for test in final_tests:
            self.__insert_test(test)
        self._using_parser = False

    # TODO crbug.com/285395 add a kwargs parameter.
    def _make_namespace(self):
        """Create a namespace dictionary to be passed along to the control
        file.

        Creates a namespace argument populated with standard values:
        machines, job, ssh_user, ssh_port, ssh_pass, ssh_verbosity_flag,
        and ssh_options.
        """
        namespace = {'machines' : self.machine_dict_list,
                     'job' : self,
                     'ssh_user' : self._ssh_user,
                     'ssh_port' : self._ssh_port,
                     'ssh_pass' : self._ssh_pass,
                     'ssh_verbosity_flag' : self._ssh_verbosity_flag,
                     'ssh_options' : self._ssh_options}
        return namespace
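
    # The names above become globals inside control files run through
    # _execute_code().  Illustrative sketch of what a control file might do
    # with them (`hosts` is imported into the namespace by
    # _fill_server_control_namespace below):
    #
    #     for machine in machines:
    #         host = hosts.create_host(machine)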


    def cleanup(self, labels):
        """Cleanup machines.

        @param labels: Comma separated job labels; used to determine
                       special task actions.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to cleanup')
        if self.resultdir:
            os.chdir(self.resultdir)

        namespace = self._make_namespace()
        namespace.update({'job_labels': labels, 'args': ''})
        self._execute_code(CLEANUP_CONTROL_FILE, namespace, protect=False)


    def verify(self, labels):
        """Verify machines are all ssh-able.

        @param labels: Comma separated job labels; used to determine
                       special task actions.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to verify')
        if self.resultdir:
            os.chdir(self.resultdir)

        namespace = self._make_namespace()
        namespace.update({'job_labels': labels, 'args': ''})
        self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False)


    def reset(self, labels):
        """Reset machines by first cleaning up then verifying each machine.

        @param labels: Comma separated job labels; used to determine
                       special task actions.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to reset.')
        if self.resultdir:
            os.chdir(self.resultdir)

        namespace = self._make_namespace()
        namespace.update({'job_labels': labels, 'args': ''})
        self._execute_code(RESET_CONTROL_FILE, namespace, protect=False)


    def repair(self, labels):
        """Repair machines.

        @param labels: Comma separated job labels; used to determine
                       special task actions.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to repair')
        if self.resultdir:
            os.chdir(self.resultdir)

        namespace = self._make_namespace()
        namespace.update({'job_labels': labels, 'args': ''})
        self._execute_code(REPAIR_CONTROL_FILE, namespace, protect=False)


    def provision(self, labels):
        """
        Provision all hosts to match |labels|.

        @param labels: A comma separated string of labels to provision the
                       host to.

        """
        control = self._load_control_file(PROVISION_CONTROL_FILE)
        self.run(control=control, job_labels=labels)


    def precheck(self):
        """
        Perform any additional checks in derived classes.
        """
        pass


    def enable_external_logging(self):
        """
        Start or restart external logging mechanism.
        """
        pass


    def disable_external_logging(self):
        """
        Pause or stop external logging mechanism.
        """
        pass


    def use_external_logging(self):
        """
        Return True if external logging should be used.
        """
        return False


    def _make_parallel_wrapper(self, function, machines, log):
        """Wrap function as appropriate for calling by parallel_simple."""
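        # Overview of the three cases below: (1) TKO parsing is enabled and
        # the call will fork with logging -> set up a per-machine execution
        # context plus a per-machine parser; (2) multiple machines with
        # logging -> set up a per-machine execution context only;
        # (3) otherwise -> return the function unwrapped.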
        # machines could be a list of dictionaries, e.g.,
        # [{'host_attributes': {}, 'hostname': '100.96.51.226'}]
        # The dictionary is generated in base_server_job.__init__, refer to
        # variable machine_dict_list, then passed in with namespace, see method
        # base_server_job._make_namespace.
        # To compare the machines to self.machines, which is a list of machine
        # hostnames, we need to convert machines back to a list of hostnames.
        # Note that the order of hostnames doesn't matter, as is_forking will
        # be True if there is more than one machine.
        if (machines and isinstance(machines, list)
            and isinstance(machines[0], dict)):
            machines = [m['hostname'] for m in machines]
        is_forking = not (len(machines) == 1 and self.machines == machines)
        if self._parse_job and is_forking and log:
            def wrapper(machine):
                hostname = server_utils.get_hostname_from_machine(machine)
                self._parse_job += "/" + hostname
                self._using_parser = INCREMENTAL_TKO_PARSING
                self.machines = [machine]
                self.push_execution_context(hostname)
                os.chdir(self.resultdir)
                utils.write_keyval(self.resultdir, {"hostname": hostname})
                self.init_parser()
                result = function(machine)
                self.cleanup_parser()
                return result
        elif len(machines) > 1 and log:
            def wrapper(machine):
                hostname = server_utils.get_hostname_from_machine(machine)
                self.push_execution_context(hostname)
                os.chdir(self.resultdir)
                machine_data = {'hostname' : hostname,
                                'status_version' : str(self._STATUS_VERSION)}
                utils.write_keyval(self.resultdir, machine_data)
                result = function(machine)
                return result
        else:
            wrapper = function
        return wrapper


    def parallel_simple(self, function, machines, log=True, timeout=None,
                        return_results=False):
        """
        Run 'function' using parallel_simple, with an extra wrapper to handle
        the necessary setup for continuous parsing, if possible. If continuous
        parsing is already properly initialized then this should just work.

        @param function: A callable to run in parallel given each machine.
        @param machines: A list of machine names to be passed one per subcommand
                invocation of function.
        @param log: If True, output will be written to a subdirectory named
                after each machine.
        @param timeout: Seconds after which the function call should timeout.
        @param return_results: If True, instead of an AutoservError being
                raised on any error, a list of the results or exceptions from
                the function called on each arg is returned.  [default: False]

        @raises error.AutotestError: If any of the functions failed.
        """
        wrapper = self._make_parallel_wrapper(function, machines, log)
        return subcommand.parallel_simple(wrapper, machines,
                                          log=log, timeout=timeout,
                                          return_results=return_results)
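
    # Usage sketch (the callable and machine list are hypothetical):
    #
    #     def run_one(machine):
    #         hostname = server_utils.get_hostname_from_machine(machine)
    #         logging.info('running on %s', hostname)
    #
    #     results = job.parallel_simple(run_one, machines,
    #                                   return_results=True)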


    def parallel_on_machines(self, function, machines, timeout=None):
        """
        @param function: Called in parallel with one machine as its argument.
        @param machines: A list of machines to call function(machine) on.
        @param timeout: Seconds after which the function call should timeout.

        @returns A list of machines on which function(machine) returned
                without raising an exception.
        """
        results = self.parallel_simple(function, machines, timeout=timeout,
                                       return_results=True)
        success_machines = []
        for result, machine in itertools.izip(results, machines):
            if not isinstance(result, Exception):
                success_machines.append(machine)
        return success_machines
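
    # Usage sketch (run_one as sketched above; only machines whose call did
    # not raise are returned):
    #
    #     good = job.parallel_on_machines(run_one, machines, timeout=3600)
    #     logging.info('succeeded on: %s', ', '.join(good))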


    def _has_failed_tests(self):
        """Parse status log for failed tests.

        This checks the current working directory and is intended only for use
        by the run() method.

        @return boolean
        """
        path = os.getcwd()

        # TODO(ayatane): Copied from tko/parse.py.  Needs extensive refactor to
        # make code reuse plausible.
        job_keyval = tko_models.job.read_keyval(path)
        status_version = job_keyval.get("status_version", 0)

        # parse out the job
        parser = parser_lib.parser(status_version)
        job = parser.make_job(path)
        status_log = os.path.join(path, "status.log")
        if not os.path.exists(status_log):
            status_log = os.path.join(path, "status")
        if not os.path.exists(status_log):
            logging.warning("! Unable to parse job, no status file")
            return True

        # parse the status logs
        status_lines = open(status_log).readlines()
        parser.start(job)
        tests = parser.end(status_lines)

        # parser.end can return the same object multiple times, so filter out
        # dups
        job.tests = []
        already_added = set()
        for test in tests:
            if test not in already_added:
                already_added.add(test)
                job.tests.append(test)

        failed = False
        for test in job.tests:
            # The current job is still running and shouldn't count as failed.
            # The parser will fail to parse its exit status since the job
            # hasn't exited yet (it is the job running right now).
            failed = failed or (test.status != 'GOOD'
                                and not _is_current_server_job(test))
        return failed


    def _collect_crashes(self, namespace, collect_crashinfo):
        """Collect crashes.

        @param namespace: namespace dict.
        @param collect_crashinfo: whether to collect crashinfo in addition to
                dumps
        """
        if collect_crashinfo:
            # includes crashdumps
            crash_control_file = CRASHINFO_CONTROL_FILE
        else:
            crash_control_file = CRASHDUMPS_CONTROL_FILE
        self._execute_code(crash_control_file, namespace)


    _USE_TEMP_DIR = object()
    def run(self, install_before=False, install_after=False,
            collect_crashdumps=True, namespace={}, control=None,
            control_file_dir=None, verify_job_repo_url=False,
            only_collect_crashinfo=False, skip_crash_collection=False,
            job_labels='', use_packaging=True):
        # for a normal job, make sure the uncollected logs file exists
        # for a crashinfo-only run it should already exist, bail out otherwise
        created_uncollected_logs = False
        logging.info("I am PID %s", os.getpid())
        if self.resultdir and not os.path.exists(self._uncollected_log_file):
            if only_collect_crashinfo:
                # if this is a crashinfo-only run, and there were no existing
                # uncollected logs, just bail out early
                logging.info("No existing uncollected logs, "
                             "skipping crashinfo collection")
                return
            else:
                log_file = open(self._uncollected_log_file, "w")
                pickle.dump([], log_file)
                log_file.close()
                created_uncollected_logs = True

        # use a copy so changes don't affect the original dictionary
        namespace = namespace.copy()
        machines = self.machines
        if control is None:
            if self.control is None:
                control = ''
            else:
                control = self._load_control_file(self.control)
        if control_file_dir is None:
            control_file_dir = self.resultdir

        self.aborted = False
        namespace.update(self._make_namespace())
        namespace.update({'args' : self.args,
                          'job_labels' : job_labels})
        test_start_time = int(time.time())

        if self.resultdir:
            os.chdir(self.resultdir)
            # touch status.log so that the parser knows a job is running here
            open(self.get_status_log_path(), 'a').close()
            self.enable_external_logging()

        collect_crashinfo = True
        temp_control_file_dir = None
        try:
            try:
                namespace['network_stats_label'] = 'at-start'
                self._execute_code(GET_NETWORK_STATS_CONTROL_FILE, namespace)
                if install_before and machines:
                    self._execute_code(INSTALL_CONTROL_FILE, namespace)

                if only_collect_crashinfo:
                    return

                # If the verify_job_repo_url option is set but we're unable
                # to actually verify that the job_repo_url contains the autotest
                # package, this job will fail.
                if verify_job_repo_url:
                    self._execute_code(VERIFY_JOB_REPO_URL_CONTROL_FILE,
                                       namespace)
                else:
                    logging.warning('Not checking if job_repo_url contains '
                                    'autotest packages on %s', machines)

                # determine the dir to write the control files to
                cfd_specified = (control_file_dir
                                 and control_file_dir is not self._USE_TEMP_DIR)
                if cfd_specified:
                    temp_control_file_dir = None
                else:
                    temp_control_file_dir = tempfile.mkdtemp(
                        suffix='temp_control_file_dir')
                    control_file_dir = temp_control_file_dir
                server_control_file = os.path.join(control_file_dir,
                                                   self._control_filename)
                client_control_file = os.path.join(control_file_dir,
                                                   CLIENT_CONTROL_FILENAME)
                if self._client:
                    namespace['control'] = control
                    utils.open_write_close(client_control_file, control)
                    shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE,
                                    server_control_file)
                else:
                    utils.open_write_close(server_control_file, control)

                logging.info("Processing control file")
                namespace['use_packaging'] = use_packaging
                self._execute_code(server_control_file, namespace)
                logging.info("Finished processing control file")

                # If no device error occurred, no need to collect crashinfo.
                collect_crashinfo = self.failed_with_device_error
            except Exception, e:
                try:
                    logging.exception(
                            'Exception escaped control file, job aborting:')
                    reason = re.sub(base_job.status_log_entry.BAD_CHAR_REGEX,
                                    ' ', str(e))
                    self.record('INFO', None, None, str(e),
                                {'job_abort_reason': reason})
                except:
                    pass # don't let logging exceptions here interfere
                raise
        finally:
            if temp_control_file_dir:
                # Clean up temp directory used for copies of the control files
                try:
                    shutil.rmtree(temp_control_file_dir)
                except Exception, e:
                    logging.warning('Could not remove temp directory %s: %s',
                                    temp_control_file_dir, e)

            if machines and (collect_crashdumps or collect_crashinfo):
                if skip_crash_collection:
                    logging.info('Skipping crash dump/info collection '
                                 'as requested.')
                else:
                    namespace['test_start_time'] = test_start_time
                    # Remove crash files for passing tests.
                    # TODO(ayatane): Tests that create crash files should be
                    # reported.
                    namespace['has_failed_tests'] = self._has_failed_tests()
                    self._collect_crashes(namespace, collect_crashinfo)
            self.disable_external_logging()
            if self._uncollected_log_file and created_uncollected_logs:
                os.remove(self._uncollected_log_file)
            if install_after and machines:
                self._execute_code(INSTALL_CONTROL_FILE, namespace)
            namespace['network_stats_label'] = 'at-end'
            self._execute_code(GET_NETWORK_STATS_CONTROL_FILE, namespace)


    def run_test(self, url, *args, **dargs):
        """
        Summon a test object and run it.

        tag
                tag to add to testname
        url
                url of the test to run
        """
        if self._disable_sysinfo:
            dargs['disable_sysinfo'] = True

        group, testname = self.pkgmgr.get_package_name(url, 'test')
        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
        outputdir = self._make_test_outputdir(subdir)

        def group_func():
            try:
                test.runtest(self, url, tag, args, dargs)
            except error.TestBaseException, e:
                self.record(e.exit_status, subdir, testname, str(e))
                raise
            except Exception, e:
                info = str(e) + "\n" + traceback.format_exc()
                self.record('FAIL', subdir, testname, info)
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        result, exc_info = self._run_group(testname, subdir, group_func)
        if exc_info and isinstance(exc_info[1], error.TestBaseException):
            return False
        elif exc_info:
            raise exc_info[0], exc_info[1], exc_info[2]
        else:
            return True


    def _run_group(self, name, subdir, function, *args, **dargs):
        """\
        Underlying method for running something inside of a group.
        """
        result, exc_info = None, None
        try:
            self.record('START', subdir, name)
            result = function(*args, **dargs)
        except error.TestBaseException, e:
            self.record("END %s" % e.exit_status, subdir, name)
            exc_info = sys.exc_info()
        except Exception, e:
            err_msg = str(e) + '\n'
            err_msg += traceback.format_exc()
            self.record('END ABORT', subdir, name, err_msg)
            raise error.JobError(name + ' failed\n' + traceback.format_exc())
        else:
            self.record('END GOOD', subdir, name)

        return result, exc_info


    def run_group(self, function, *args, **dargs):
        """\
        function:
                subroutine to run
        *args:
                arguments for the function
        """

        name = function.__name__

        # Allow the tag for the group to be specified.
        tag = dargs.pop('tag', None)
        if tag:
            name = tag

        return self._run_group(name, None, function, *args, **dargs)[0]


    def run_op(self, op, op_func, get_kernel_func):
        """\
        A specialization of run_group meant specifically for handling
        management operations. Includes support for capturing the kernel version
        after the operation.

        Args:
           op: name of the operation.
           op_func: a function that carries out the operation (reboot, suspend)
           get_kernel_func: a function that returns a string
                            representing the kernel version.
        """
        try:
            self.record('START', None, op)
            op_func()
        except Exception, e:
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', None, op, err_msg)
            raise
        else:
            kernel = get_kernel_func()
            self.record('END GOOD', None, op,
                        optional_fields={"kernel": kernel})


    def run_control(self, path):
        """Execute a control file found at path (relative to the autotest
        path). Intended for executing a control file within a control file,
        not for running the top-level job control file."""
        path = os.path.join(self.autodir, path)
        control_file = self._load_control_file(path)
        self.run(control=control_file, control_file_dir=self._USE_TEMP_DIR)


    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
                                   on_every_test)


    def add_sysinfo_logfile(self, file, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)


    def _add_sysinfo_loggable(self, loggable, on_every_test):
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)


    def _read_warnings(self):
        """Poll all the warning loggers and extract any new warnings that have
        been logged. If the warnings belong to a category that is currently
        disabled, this method will discard them and they will no longer be
        retrievable.

        Returns a list of (timestamp, message) tuples, where timestamp is an
        integer epoch timestamp."""
        warnings = []
        while True:
            # pull in a line of output from every logger that has
            # output ready to be read
            loggers, _, _ = select.select(self.warning_loggers, [], [], 0)
            closed_loggers = set()
            for logger in loggers:
                line = logger.readline()
                # record any broken pipes (aka line == empty)
                if len(line) == 0:
                    closed_loggers.add(logger)
                    continue
                # parse out the warning
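                # each line has the form "<epoch>\t<msgtype>\t<message>",
                # e.g. (illustrative values) "1300000000\tNETWORK\tlink down"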
                timestamp, msgtype, msg = line.split('\t', 2)
                timestamp = int(timestamp)
                # if the warning is valid, add it to the results
                if self.warning_manager.is_valid(timestamp, msgtype):
                    warnings.append((timestamp, msg.strip()))

            # stop listening to loggers that are closed
            self.warning_loggers -= closed_loggers

            # stop if none of the loggers have any output left
            if not loggers:
                break

        # sort into timestamp order
        warnings.sort()
        return warnings


    def _unique_subdirectory(self, base_subdirectory_name):
        """Compute a unique results subdirectory based on the given name.

        Appends base_subdirectory_name with a number as necessary to find a
        directory name that doesn't already exist.
        """
        subdirectory = base_subdirectory_name
        counter = 1
        while os.path.exists(os.path.join(self.resultdir, subdirectory)):
            subdirectory = base_subdirectory_name + '.' + str(counter)
            counter += 1
        return subdirectory
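
    # For illustration: if 'sysinfo' already exists under resultdir, this
    # yields 'sysinfo.1', then 'sysinfo.2', and so on ('sysinfo' is just an
    # example name).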


    def get_record_context(self):
        """Returns an object representing the current job.record context.

        The object returned is an opaque object with a 0-arg restore method
        which can be called to restore the job.record context (i.e. indentation)
        to the current level. The intention is that it should be used when
        something external which generates job.record calls (e.g. an autotest
        client) can fail catastrophically and the server job record state
        needs to be reset to its original "known good" state.

        @return: A context object with a 0-arg restore() method."""
        return self._indenter.get_context()
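
    # Usage sketch (run_client_side_work is hypothetical):
    #
    #     context = job.get_record_context()
    #     try:
    #         run_client_side_work()  # may emit arbitrary job.record calls
    #     finally:
    #         context.restore()       # indentation back to the saved level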


    def record_summary(self, status_code, test_name, reason='', attributes=None,
                       distinguishing_attributes=(), child_test_ids=None):
        """Record a summary test result.

        @param status_code: status code string, see
                common_lib.log.is_valid_status()
        @param test_name: name of the test
        @param reason: (optional) string providing detailed reason for test
                outcome
        @param attributes: (optional) dict of string keyvals to associate with
                this result
        @param distinguishing_attributes: (optional) list of attribute names
                that should be used to distinguish identically-named test
                results.  These attributes should be present in the attributes
                parameter.  This is used to generate user-friendly subdirectory
                names.
        @param child_test_ids: (optional) list of test indices for test results
                used in generating this result.
        """
        subdirectory_name_parts = [test_name]
        for attribute in distinguishing_attributes:
            assert attributes
            assert attribute in attributes, '%s not in %s' % (attribute,
                                                              attributes)
            subdirectory_name_parts.append(attributes[attribute])
        base_subdirectory_name = '.'.join(subdirectory_name_parts)

        subdirectory = self._unique_subdirectory(base_subdirectory_name)
        subdirectory_path = os.path.join(self.resultdir, subdirectory)
        os.mkdir(subdirectory_path)

        self.record(status_code, subdirectory, test_name,
                    status=reason, optional_fields={'is_summary': True})

        if attributes:
            utils.write_keyval(subdirectory_path, attributes)

        if child_test_ids:
            ids_string = ','.join(str(test_id) for test_id in child_test_ids)
            summary_data = {'child_test_ids': ids_string}
            utils.write_keyval(os.path.join(subdirectory_path, 'summary_data'),
                               summary_data)
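
    # Usage sketch with made-up values:
    #
    #     job.record_summary('GOOD', 'power_Consumption',
    #                        attributes={'board': 'lumpy', 'variant': 'wifi'},
    #                        distinguishing_attributes=('board', 'variant'))
    #
    # records under a subdirectory named 'power_Consumption.lumpy.wifi' (or a
    # numbered variant if that directory already exists).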


    def disable_warnings(self, warning_type):
        self.warning_manager.disable_warnings(warning_type)
        self.record("INFO", None, None,
                    "disabling %s warnings" % warning_type,
                    {"warnings.disable": warning_type})


    def enable_warnings(self, warning_type):
        self.warning_manager.enable_warnings(warning_type)
        self.record("INFO", None, None,
                    "enabling %s warnings" % warning_type,
                    {"warnings.enable": warning_type})


    def get_status_log_path(self, subdir=None):
        """Return the path to the job status log.

        @param subdir - Optional parameter indicating that you want the path
            to a subdirectory status log.

        @returns The path where the status log should be.
        """
        if self.resultdir:
            if subdir:
                return os.path.join(self.resultdir, subdir, "status.log")
            else:
                return os.path.join(self.resultdir, "status.log")
        else:
            return None


    def _update_uncollected_logs_list(self, update_func):
        """Updates the uncollected logs list in a multi-process safe manner.

        @param update_func - a function that updates the list of uncollected
            logs. Should take one parameter, the list to be updated.
        """
        # Skip log collection if file _uncollected_log_file does not exist.
        if not (self._uncollected_log_file and
                os.path.exists(self._uncollected_log_file)):
            return
        if self._uncollected_log_file:
            log_file = open(self._uncollected_log_file, "r+")
            fcntl.flock(log_file, fcntl.LOCK_EX)
        try:
            uncollected_logs = pickle.load(log_file)
            update_func(uncollected_logs)
            log_file.seek(0)
            log_file.truncate()
            pickle.dump(uncollected_logs, log_file)
            log_file.flush()
        finally:
            fcntl.flock(log_file, fcntl.LOCK_UN)
            log_file.close()


    def add_client_log(self, hostname, remote_path, local_path):
        """Adds a new set of client logs to the list of uncollected logs,
        to allow for future log recovery.

        @param hostname - the hostname of the machine holding the logs
        @param remote_path - the directory on the remote machine holding logs
        @param local_path - the local directory to copy the logs into
        """
        def update_func(logs_list):
            logs_list.append((hostname, remote_path, local_path))
        self._update_uncollected_logs_list(update_func)


    def remove_client_log(self, hostname, remote_path, local_path):
        """Removes a set of client logs from the list of uncollected logs,
        to allow for future log recovery.

        @param hostname - the hostname of the machine holding the logs
        @param remote_path - the directory on the remote machine holding logs
        @param local_path - the local directory to copy the logs into
        """
        def update_func(logs_list):
            logs_list.remove((hostname, remote_path, local_path))
        self._update_uncollected_logs_list(update_func)


    def get_client_logs(self):
        """Retrieves the list of uncollected logs, if it exists.

        @returns A list of (host, remote_path, local_path) tuples. Returns
                 an empty list if no uncollected logs file exists.
        """
        log_exists = (self._uncollected_log_file and
                      os.path.exists(self._uncollected_log_file))
        if log_exists:
            return pickle.load(open(self._uncollected_log_file))
        else:
            return []


    def _fill_server_control_namespace(self, namespace, protect=True):
        """
        Prepare a namespace to be used when executing server control files.

        This sets up the control file API by importing modules and making them
        available under the appropriate names within namespace.

        For use by _execute_code().

        Args:
          namespace: The namespace dictionary to fill in.
          protect: Boolean.  If True (the default) any operation that would
              clobber an existing entry in namespace will cause an error.
        Raises:
          error.AutoservError: When a name would be clobbered by import.
        """
        def _import_names(module_name, names=()):
            """
            Import a module and assign named attributes into namespace.

            Args:
                module_name: The string module name.
                names: A limiting list of names to import from module_name.  If
                    empty (the default), all names are imported from the module
                    similar to a "from foo.bar import *" statement.
            Raises:
                error.AutoservError: When a name being imported would clobber
                    a name already in namespace.
            """
            module = __import__(module_name, {}, {}, names)

            # No names supplied?  Import * from the lowest level module.
            # (Ugh, why do I have to implement this part myself?)
            if not names:
                for submodule_name in module_name.split('.')[1:]:
                    module = getattr(module, submodule_name)
                if hasattr(module, '__all__'):
                    names = getattr(module, '__all__')
                else:
                    names = dir(module)

            # Install each name into namespace, checking to make sure it
            # doesn't override anything that already exists.
            for name in names:
                # Check for conflicts to help prevent future problems.
                if name in namespace and protect:
                    if namespace[name] is not getattr(module, name):
                        raise error.AutoservError('importing name '
                                '%s from %s %r would override %r' %
                                (name, module_name, getattr(module, name),
                                 namespace[name]))
                    else:
                        # Encourage cleanliness and the use of __all__ for a
                        # more concrete API with less surprises on '*' imports.
                        warnings.warn('%s (%r) being imported from %s for use '
                                      'in server control files is not the '
                                      'first occurrence of that import.' %
                                      (name, namespace[name], module_name))

                namespace[name] = getattr(module, name)


        # This is the equivalent of prepending a bunch of import statements to
        # the front of the control script.
        namespace.update(os=os, sys=sys, logging=logging)
        _import_names('autotest_lib.server',
                ('hosts', 'autotest', 'standalone_profiler'))
        _import_names('autotest_lib.server.subcommand',
                      ('parallel', 'parallel_simple', 'subcommand'))
        _import_names('autotest_lib.server.utils',
                      ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine'))
        _import_names('autotest_lib.client.common_lib.error')
        _import_names('autotest_lib.client.common_lib.barrier', ('barrier',))

   1255         # Inject ourself as the job object into other classes within the API.
   1256         # (Yuck, this injection is a gross thing be part of a public API. -gps)
   1257         #
   1258         # XXX Base & SiteAutotest do not appear to use .job.  Who does?
   1259         namespace['autotest'].Autotest.job = self
   1260         # server.hosts.base_classes.Host uses .job.
   1261         namespace['hosts'].Host.job = self
   1262         namespace['hosts'].TestBed.job = self
   1263         namespace['hosts'].factory.ssh_user = self._ssh_user
   1264         namespace['hosts'].factory.ssh_port = self._ssh_port
   1265         namespace['hosts'].factory.ssh_pass = self._ssh_pass
   1266         namespace['hosts'].factory.ssh_verbosity_flag = (
   1267                 self._ssh_verbosity_flag)
   1268         namespace['hosts'].factory.ssh_options = self._ssh_options
   1269 
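    # With the namespace filled in, a server control file can use the
    # imported names directly.  A hypothetical control-file fragment:
    #
    #     host = hosts.create_host(machines[0])
    #     result = run('uname -a')
    #     print result.stdout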

    def _execute_code(self, code_file, namespace, protect=True):
        """
        Execute code using a copy of namespace as a server control script.

        Unless protect is explicitly set to False, the given namespace dict
        will not be modified.

        Args:
          code_file: The filename of the control file to execute.
          namespace: A dict containing names to make available during
              execution.
          protect: Boolean.  If True (the default) a copy of the namespace dict
              is used during execution to prevent the code from modifying its
              contents outside of this function.  If False the raw dict is
              passed in and modifications will be allowed.
        """
        if protect:
            namespace = namespace.copy()
        self._fill_server_control_namespace(namespace, protect=protect)
        # TODO: Simplify and get rid of the special cases for only 1 machine.
        if len(self.machines) > 1:
            machines_text = '\n'.join(self.machines) + '\n'
            # Only rewrite the file if it does not match our machine list.
            try:
                machines_f = open(MACHINES_FILENAME, 'r')
                existing_machines_text = machines_f.read()
                machines_f.close()
            except EnvironmentError:
                existing_machines_text = None
            if machines_text != existing_machines_text:
                utils.open_write_close(MACHINES_FILENAME, machines_text)
        execfile(code_file, namespace, namespace)

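    # Illustrative call (assumes a server_job instance `job`; the namespace
    # keys shown are hypothetical):
    #
    #     namespace = {'machines': job.machines, 'args': job.args}
    #     job._execute_code(SERVER_CONTROL_FILENAME, namespace)
    #
    # With protect=True (the default) the caller's dict is copied before
    # being filled in, so it comes back unmodified.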

    def _parse_status(self, new_line):
        if not self._using_parser:
            return
        new_tests = self.parser.process_lines([new_line])
        for test in new_tests:
            self.__insert_test(test)


    def __insert_test(self, test):
        """
        An internal method to insert a new test result into the
        database. This method will not raise an exception, even if an
        error occurs during the insert, to avoid failing a test
        simply because of unexpected database issues."""
        self.num_tests_run += 1
        if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'):
            self.num_tests_failed += 1
        try:
            self.results_db.insert_test(self.job_model, test)
        except Exception:
            msg = ("WARNING: An unexpected error occurred while "
                   "inserting test results into the database. "
                   "Ignoring error.\n" + traceback.format_exc())
            print >> sys.stderr, msg


    def preprocess_client_state(self):
        """
        Produce a state file for initializing the state of a client job.

        Creates a new client state file with all the current server state, as
        well as some pre-set client state.

        @returns The path of the file the state was written into.
        """
        # initialize the sysinfo state
        self._state.set('client', 'sysinfo', self.sysinfo.serialize())

        # dump the state out to a tempfile
        fd, file_path = tempfile.mkstemp(dir=self.tmpdir)
        os.close(fd)

        # write_to_file doesn't need locking; we exclusively own file_path
        self._state.write_to_file(file_path)
        return file_path


    def postprocess_client_state(self, state_path):
        """
        Update the state of this job with the state from a client job.

        Updates the state of the server side of a job with the final state
        of a client job that was run. Updates the non-client-specific state,
        pulls in some specific bits from the client-specific state, and then
        discards the rest. Removes the state file afterwards.

        @param state_path A path to the state file from the client.
        """
        # update the on-disk state
        try:
            self._state.read_from_file(state_path)
            os.remove(state_path)
        except OSError as e:
            # ignore file-not-found errors
            if e.errno != errno.ENOENT:
                raise
            else:
                logging.debug('Client state file %s not found', state_path)

        # update the sysinfo state
        if self._state.has('client', 'sysinfo'):
            self.sysinfo.deserialize(self._state.get('client', 'sysinfo'))

        # drop all the client-specific state
        self._state.discard_namespace('client')

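    # The client-state round trip, sketched (assumes `job` is this
    # server_job and that the client run leaves its final state file at
    # state_path before we read it back):
    #
    #     state_path = job.preprocess_client_state()
    #     ...   # hand state_path to the client job and run it
    #     job.postprocess_client_state(state_path)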

    def clear_all_known_hosts(self):
        """Clears known hosts files for all AbstractSSHHosts."""
        for host in self.hosts:
            if isinstance(host, abstract_ssh.AbstractSSHHost):
                host.clear_known_hosts()


class warning_manager(object):
    """Class for controlling warning logs. Manages the enabling and disabling
    of warnings."""
    def __init__(self):
        # a map of warning types to a list of disabled time intervals
        self.disabled_warnings = {}


    def is_valid(self, timestamp, warning_type):
        """Indicates if a warning (based on the time it occurred and its type)
        is a valid warning. A warning is considered "invalid" if this type of
        warning was marked as "disabled" at the time the warning occurred."""
        disabled_intervals = self.disabled_warnings.get(warning_type, [])
        for start, end in disabled_intervals:
            if timestamp >= start and (end is None or timestamp < end):
                return False
        return True


    def disable_warnings(self, warning_type, current_time_func=time.time):
        """As of now, disables all further warnings of this type."""
        intervals = self.disabled_warnings.setdefault(warning_type, [])
        if not intervals or intervals[-1][1] is not None:
            intervals.append((int(current_time_func()), None))


    def enable_warnings(self, warning_type, current_time_func=time.time):
        """As of now, enables all further warnings of this type."""
        intervals = self.disabled_warnings.get(warning_type, [])
        if intervals and intervals[-1][1] is None:
            intervals[-1] = (intervals[-1][0], int(current_time_func()))

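# Illustrative use of warning_manager (the 'NETWORK' warning type is made
# up):
#
#     manager = warning_manager()
#     manager.disable_warnings('NETWORK')       # opens interval [now, None)
#     manager.is_valid(time.time(), 'NETWORK')  # -> False while disabled
#     manager.enable_warnings('NETWORK')        # closes the open interval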

def _is_current_server_job(test):
    """Return True if parsed test is the currently running job.

    @param test: test instance from tko parser.
    """
    return test.testname == 'SERVER_JOB'


def _create_afe_host(hostname, in_lab):
    """Create a real or stub frontend.Host object.

    @param hostname: Name of the host for which we want the Host object.
    @param in_lab: (bool) whether we have access to the AFE.
    @returns: An object of type frontend.Host, or a stub with the same
              interface when the AFE is unavailable.
    """
    if not in_lab:
        return server_utils.EmptyAFEHost()

    afe = frontend_wrappers.RetryingAFE(timeout_min=5, delay_sec=10)
    hosts = afe.get_hosts(hostname=hostname)
    if not hosts:
        raise error.AutoservError('No hosts named %s found' % hostname)

    return hosts[0]


def _create_host_info_store(hostname, in_lab):
    """Create a real or stub host info store.

    @param hostname: Name of the host for which we want the store.
    @param in_lab: (bool) whether we have access to the AFE.
    @returns: An object of type afe_store.AfeStore if in_lab, otherwise an
              in-memory host_info.InMemoryHostInfoStore stub.
    """
    if not in_lab:
        return host_info.InMemoryHostInfoStore()

    host_info_store = afe_store.AfeStore(hostname)
    try:
        host_info_store.get(force_refresh=True)
    except host_info.StoreError:
        raise error.AutoservError('Could not obtain HostInfo for hostname %s' %
                                  hostname)
    return host_info_store

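# How the two helpers above pair up, sketched (the hostname is hypothetical;
# in_lab normally comes from autoserv's command-line flags):
#
#     afe_host = _create_afe_host('host1', in_lab=False)  # EmptyAFEHost stub
#     store = _create_host_info_store('host1', in_lab=False)
#     info = store.get()  # empty HostInfo from the in-memory stub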

# load up site-specific code for generating site-specific job data
get_site_job_data = utils.import_site_function(__file__,
    "autotest_lib.server.site_server_job", "get_site_job_data",
    _get_site_job_data_dummy)


site_server_job = utils.import_site_class(
    __file__, "autotest_lib.server.site_server_job", "site_server_job",
    base_server_job)

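# Sketch of the site-extension hook (the override shown is hypothetical): a
# deployment may ship autotest_lib/server/site_server_job.py containing
#
#     class site_server_job(base_server_job):
#         pass  # override or extend base_server_job methods here
#
# and import_site_class will pick it up; otherwise base_server_job is used
# unchanged and server_job below adds nothing.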

class server_job(site_server_job):
    """The server-side job class: base_server_job plus any site extensions."""
    pass