Home | History | Annotate | Download | only in server
      1 # pylint: disable-msg=C0111
      2 
      3 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
      4 # Use of this source code is governed by a BSD-style license that can be
      5 # found in the LICENSE file.
      6 """
      7 The main job wrapper for the server side.
      8 
      9 This is the core infrastructure. Derived from the client side job.py
     10 
     11 Copyright Martin J. Bligh, Andy Whitcroft 2007
     12 """
     13 
     14 import errno
     15 import fcntl
     16 import getpass
     17 import itertools
     18 import logging
     19 import os
     20 import pickle
     21 import platform
     22 import re
     23 import select
     24 import shutil
     25 import sys
     26 import tempfile
     27 import time
     28 import traceback
     29 import uuid
     30 import warnings
     31 
     32 from autotest_lib.client.bin import sysinfo
     33 from autotest_lib.client.common_lib import base_job
     34 from autotest_lib.client.common_lib import control_data
     35 from autotest_lib.client.common_lib import error
     36 from autotest_lib.client.common_lib import logging_manager
     37 from autotest_lib.client.common_lib import packages
     38 from autotest_lib.client.common_lib import utils
     39 from autotest_lib.server import profilers
     40 from autotest_lib.server import site_gtest_runner
     41 from autotest_lib.server import subcommand
     42 from autotest_lib.server import test
     43 from autotest_lib.server import utils as server_utils
     44 from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
     45 from autotest_lib.server.hosts import abstract_ssh
     46 from autotest_lib.server.hosts import afe_store
     47 from autotest_lib.server.hosts import file_store
     48 from autotest_lib.server.hosts import shadowing_store
     49 from autotest_lib.server.hosts import factory as host_factory
     50 from autotest_lib.server.hosts import host_info
     51 from autotest_lib.server.hosts import ssh_multiplex
     52 from autotest_lib.tko import models as tko_models
     53 from autotest_lib.tko import parser_lib
     54 
     55 try:
     56     from chromite.lib import metrics
     57 except ImportError:
     58     metrics = utils.metrics_mock
     59 
     60 
     61 def _control_segment_path(name):
     62     """Get the pathname of the named control segment file."""
     63     server_dir = os.path.dirname(os.path.abspath(__file__))
     64     return os.path.join(server_dir, "control_segments", name)
     65 
     66 
     67 CLIENT_CONTROL_FILENAME = 'control'
     68 SERVER_CONTROL_FILENAME = 'control.srv'
     69 MACHINES_FILENAME = '.machines'
     70 
     71 CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper')
     72 CLIENT_TRAMPOLINE_CONTROL_FILE = _control_segment_path('client_trampoline')
     73 CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps')
     74 CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo')
     75 CLEANUP_CONTROL_FILE = _control_segment_path('cleanup')
     76 VERIFY_CONTROL_FILE = _control_segment_path('verify')
     77 REPAIR_CONTROL_FILE = _control_segment_path('repair')
     78 PROVISION_CONTROL_FILE = _control_segment_path('provision')
     79 VERIFY_JOB_REPO_URL_CONTROL_FILE = _control_segment_path('verify_job_repo_url')
     80 RESET_CONTROL_FILE = _control_segment_path('reset')
     81 GET_NETWORK_STATS_CONTROL_FILE = _control_segment_path('get_network_stats')
     82 
     83 
     84 def get_machine_dicts(machine_names, store_dir, in_lab, use_shadow_store,
     85                       host_attributes=None):
     86     """Converts a list of machine names to list of dicts.
     87 
     88     TODO(crbug.com/678430): This function temporarily has a side effect of
     89     creating files under workdir for backing a FileStore. This side-effect will
     90     go away once callers of autoserv start passing in the FileStore.
     91 
     92     @param machine_names: A list of machine names.
     93     @param store_dir: A directory to contain store backing files.
     94     @param use_shadow_store: If True, we should create a ShadowingStore where
     95             actual store is backed by the AFE but we create a backing file to
     96             shadow the store. If False, backing file should already exist at:
     97             ${store_dir}/${hostname}.store
     98     @param in_lab: A boolean indicating whether we're running in lab.
     99     @param host_attributes: Optional list of host attributes to add for each
    100             host.
    101     @returns: A list of dicts. Each dict has the following keys:
    102             'hostname': Name of the machine originally in machine_names (str).
    103             'afe_host': A frontend.Host object for the machine, or a stub if
    104                     in_lab is false.
    105             'host_info_store': A host_info.CachingHostInfoStore object to obtain
    106                     host information. A stub if in_lab is False.
    107             'connection_pool': ssh_multiplex.ConnectionPool instance to share
    108                     master ssh connection across control scripts. This is set to
    109                     None, and should be overridden for connection sharing.
    110     """
    111     # See autoserv_parser.parse_args. Only one of in_lab or host_attributes can
    112     # be provided.
    113     if in_lab and host_attributes:
    114         raise error.AutoservError(
    115                 'in_lab and host_attribute are mutually exclusive.')
    116 
    117     machine_dict_list = []
    118     for machine in machine_names:
    119         if not in_lab:
    120             afe_host = server_utils.EmptyAFEHost()
    121             host_info_store = host_info.InMemoryHostInfoStore()
    122             if host_attributes is not None:
    123                 afe_host.attributes.update(host_attributes)
    124                 info = host_info.HostInfo(attributes=host_attributes)
    125                 host_info_store.commit(info)
    126         elif use_shadow_store:
    127             afe_host = _create_afe_host(machine)
    128             host_info_store = _create_afe_backed_host_info_store(store_dir,
    129                                                                  machine)
    130         else:
    131             afe_host = server_utils.EmptyAFEHost()
    132             host_info_store = _create_file_backed_host_info_store(store_dir,
    133                                                                   machine)
    134 
    135         machine_dict_list.append({
    136                 'hostname' : machine,
    137                 'afe_host' : afe_host,
    138                 'host_info_store': host_info_store,
    139                 'connection_pool': None,
    140         })
    141 
    142     return machine_dict_list
    143 
    144 
    145 class status_indenter(base_job.status_indenter):
    146     """Provide a simple integer-backed status indenter."""
    147     def __init__(self):
    148         self._indent = 0
    149 
    150 
    151     @property
    152     def indent(self):
    153         return self._indent
    154 
    155 
    156     def increment(self):
    157         self._indent += 1
    158 
    159 
    160     def decrement(self):
    161         self._indent -= 1
    162 
    163 
    164     def get_context(self):
    165         """Returns a context object for use by job.get_record_context."""
    166         class context(object):
    167             def __init__(self, indenter, indent):
    168                 self._indenter = indenter
    169                 self._indent = indent
    170             def restore(self):
    171                 self._indenter._indent = self._indent
    172         return context(self, self._indent)
    173 
    174 
    175 class server_job_record_hook(object):
    176     """The job.record hook for server job. Used to inject WARN messages from
    177     the console or vlm whenever new logs are written, and to echo any logs
    178     to INFO level logging. Implemented as a class so that it can use state to
    179     block recursive calls, so that the hook can call job.record itself to
    180     log WARN messages.
    181 
    182     Depends on job._read_warnings and job._logger.
    183     """
    184     def __init__(self, job):
    185         self._job = job
    186         self._being_called = False
    187 
    188 
    189     def __call__(self, entry):
    190         """A wrapper around the 'real' record hook, the _hook method, which
    191         prevents recursion. This isn't making any effort to be threadsafe,
    192         the intent is to outright block infinite recursion via a
    193         job.record->_hook->job.record->_hook->job.record... chain."""
    194         if self._being_called:
    195             return
    196         self._being_called = True
    197         try:
    198             self._hook(self._job, entry)
    199         finally:
    200             self._being_called = False
    201 
    202 
    203     @staticmethod
    204     def _hook(job, entry):
    205         """The core hook, which can safely call job.record."""
    206         entries = []
    207         # poll all our warning loggers for new warnings
    208         for timestamp, msg in job._read_warnings():
    209             warning_entry = base_job.status_log_entry(
    210                 'WARN', None, None, msg, {}, timestamp=timestamp)
    211             entries.append(warning_entry)
    212             job.record_entry(warning_entry)
    213         # echo rendered versions of all the status logs to info
    214         entries.append(entry)
    215         for entry in entries:
    216             rendered_entry = job._logger.render_entry(entry)
    217             logging.info(rendered_entry)
    218 
    219 
    220 class server_job(base_job.base_job):
    221     """The server-side concrete implementation of base_job.
    222 
    223     Optional properties provided by this implementation:
    224         serverdir
    225 
    226         warning_manager
    227         warning_loggers
    228     """
    229 
    230     _STATUS_VERSION = 1
    231 
    232     # TODO crbug.com/285395 eliminate ssh_verbosity_flag
    233     def __init__(self, control, args, resultdir, label, user, machines,
    234                  machine_dict_list,
    235                  client=False,
    236                  ssh_user=host_factory.DEFAULT_SSH_USER,
    237                  ssh_port=host_factory.DEFAULT_SSH_PORT,
    238                  ssh_pass=host_factory.DEFAULT_SSH_PASS,
    239                  ssh_verbosity_flag=host_factory.DEFAULT_SSH_VERBOSITY,
    240                  ssh_options=host_factory.DEFAULT_SSH_OPTIONS,
    241                  group_name='',
    242                  tag='', disable_sysinfo=False,
    243                  control_filename=SERVER_CONTROL_FILENAME,
    244                  parent_job_id=None, in_lab=False,
    245                  use_client_trampoline=False):
    246         """
    247         Create a server side job object.
    248 
    249         @param control: The pathname of the control file.
    250         @param args: Passed to the control file.
    251         @param resultdir: Where to throw the results.
    252         @param label: Description of the job.
    253         @param user: Username for the job (email address).
    254         @param machines: A list of hostnames of the machines to use for the job.
    255         @param machine_dict_list: A list of dicts for each of the machines above
    256                 as returned by get_machine_dicts.
    257         @param client: True if this is a client-side control file.
    258         @param ssh_user: The SSH username.  [root]
    259         @param ssh_port: The SSH port number.  [22]
    260         @param ssh_pass: The SSH passphrase, if needed.
    261         @param ssh_verbosity_flag: The SSH verbosity flag, '-v', '-vv',
    262                 '-vvv', or an empty string if not needed.
    263         @param ssh_options: A string giving additional options that will be
    264                             included in ssh commands.
    265         @param group_name: If supplied, this will be written out as
    266                 host_group_name in the keyvals file for the parser.
    267         @param tag: The job execution tag from the scheduler.  [optional]
    268         @param disable_sysinfo: Whether we should disable the sysinfo step of
    269                 tests for a modest shortening of test time.  [optional]
    270         @param control_filename: The filename where the server control file
    271                 should be written in the results directory.
    272         @param parent_job_id: Job ID of the parent job. Default to None if the
    273                 job does not have a parent job.
    274         @param in_lab: Boolean that indicates if this is running in the lab
    275                        environment.
    276         @param use_client_trampoline: Boolean that indicates whether to
    277                use the client trampoline flow.  If this is True, control
    278                is interpreted as the name of the client test to run.
    279                The client control file will be client_trampoline.  The
    280                test name will be passed to client_trampoline, which will
    281                install the test package and re-exec the actual test
    282                control file.
    283         """
    284         super(server_job, self).__init__(resultdir=resultdir)
    285         self.control = control
    286         self._uncollected_log_file = os.path.join(self.resultdir,
    287                                                   'uncollected_logs')
    288         debugdir = os.path.join(self.resultdir, 'debug')
    289         if not os.path.exists(debugdir):
    290             os.mkdir(debugdir)
    291 
    292         if user:
    293             self.user = user
    294         else:
    295             self.user = getpass.getuser()
    296 
    297         self.args = args
    298         self.label = label
    299         self.machines = machines
    300         self._client = client
    301         self.warning_loggers = set()
    302         self.warning_manager = warning_manager()
    303         self._ssh_user = ssh_user
    304         self._ssh_port = ssh_port
    305         self._ssh_pass = ssh_pass
    306         self._ssh_verbosity_flag = ssh_verbosity_flag
    307         self._ssh_options = ssh_options
    308         self.tag = tag
    309         self.hosts = set()
    310         self.drop_caches = False
    311         self.drop_caches_between_iterations = False
    312         self._control_filename = control_filename
    313         self._disable_sysinfo = disable_sysinfo
    314         self._use_client_trampoline = use_client_trampoline
    315 
    316         self.logging = logging_manager.get_logging_manager(
    317                 manage_stdout_and_stderr=True, redirect_fds=True)
    318         subcommand.logging_manager_object = self.logging
    319 
    320         self.sysinfo = sysinfo.sysinfo(self.resultdir)
    321         self.profilers = profilers.profilers(self)
    322 
    323         job_data = {'label' : label, 'user' : user,
    324                     'hostname' : ','.join(machines),
    325                     'drone' : platform.node(),
    326                     'status_version' : str(self._STATUS_VERSION),
    327                     'job_started' : str(int(time.time()))}
    328         # Save parent job id to keyvals, so parser can retrieve the info and
    329         # write to tko_jobs record.
    330         if parent_job_id:
    331             job_data['parent_job_id'] = parent_job_id
    332         if group_name:
    333             job_data['host_group_name'] = group_name
    334 
    335         # only write these keyvals out on the first job in a resultdir
    336         if 'job_started' not in utils.read_keyval(self.resultdir):
    337             job_data.update(self._get_job_data())
    338             utils.write_keyval(self.resultdir, job_data)
    339 
    340         self.pkgmgr = packages.PackageManager(
    341             self.autodir, run_function_dargs={'timeout':600})
    342 
    343         self._register_subcommand_hooks()
    344 
    345         # We no longer parse results as part of the server_job. These arguments
    346         # can't be dropped yet because clients haven't all be cleaned up yet.
    347         self.num_tests_run = -1
    348         self.num_tests_failed = -1
    349 
    350         # set up the status logger
    351         self._indenter = status_indenter()
    352         self._logger = base_job.status_logger(
    353             self, self._indenter, 'status.log', 'status.log',
    354             record_hook=server_job_record_hook(self))
    355 
    356         # Initialize a flag to indicate DUT failure during the test, e.g.,
    357         # unexpected reboot.
    358         self.failed_with_device_error = False
    359 
    360         self._connection_pool = ssh_multiplex.ConnectionPool()
    361 
    362         # List of functions to run after the main job function.
    363         self._post_run_hooks = []
    364 
    365         self.parent_job_id = parent_job_id
    366         self.in_lab = in_lab
    367         self.machine_dict_list = machine_dict_list
    368         for machine_dict in self.machine_dict_list:
    369             machine_dict['connection_pool'] = self._connection_pool
    370 
    371         # TODO(jrbarnette) The harness attribute is only relevant to
    372         # client jobs, but it's required to be present, or we will fail
    373         # server job unit tests.  Yes, really.
    374         #
    375         # TODO(jrbarnette) The utility of the 'harness' attribute even
    376         # to client jobs is suspect.  Probably, we should remove it.
    377         self.harness = None
    378 
    379         # TODO(ayatane): fast and max_result_size_KB are not set for
    380         # client_trampoline jobs.
    381         if control and not use_client_trampoline:
    382             parsed_control = control_data.parse_control(
    383                     control, raise_warnings=False)
    384             self.fast = parsed_control.fast
    385             self.max_result_size_KB = parsed_control.max_result_size_KB
    386         else:
    387             self.fast = False
    388             # Set the maximum result size to be the default specified in
    389             # global config, if the job has no control file associated.
    390             self.max_result_size_KB = control_data.DEFAULT_MAX_RESULT_SIZE_KB
    391 
    392 
    393     @classmethod
    394     def _find_base_directories(cls):
    395         """
    396         Determine locations of autodir, clientdir and serverdir. Assumes
    397         that this file is located within serverdir and uses __file__ along
    398         with relative paths to resolve the location.
    399         """
    400         serverdir = os.path.abspath(os.path.dirname(__file__))
    401         autodir = os.path.normpath(os.path.join(serverdir, '..'))
    402         clientdir = os.path.join(autodir, 'client')
    403         return autodir, clientdir, serverdir
    404 
    405 
    406     def _find_resultdir(self, resultdir, *args, **dargs):
    407         """
    408         Determine the location of resultdir. For server jobs we expect one to
    409         always be explicitly passed in to __init__, so just return that.
    410         """
    411         if resultdir:
    412             return os.path.normpath(resultdir)
    413         else:
    414             return None
    415 
    416 
    417     def _get_status_logger(self):
    418         """Return a reference to the status logger."""
    419         return self._logger
    420 
    421 
    422     @staticmethod
    423     def _load_control_file(path):
    424         f = open(path)
    425         try:
    426             control_file = f.read()
    427         finally:
    428             f.close()
    429         return re.sub('\r', '', control_file)
    430 
    431 
    432     def _register_subcommand_hooks(self):
    433         """
    434         Register some hooks into the subcommand modules that allow us
    435         to properly clean up self.hosts created in forked subprocesses.
    436         """
    437         def on_fork(cmd):
    438             self._existing_hosts_on_fork = set(self.hosts)
    439         def on_join(cmd):
    440             new_hosts = self.hosts - self._existing_hosts_on_fork
    441             for host in new_hosts:
    442                 host.close()
    443         subcommand.subcommand.register_fork_hook(on_fork)
    444         subcommand.subcommand.register_join_hook(on_join)
    445 
    446 
    447     # TODO crbug.com/285395 add a kwargs parameter.
    448     def _make_namespace(self):
    449         """Create a namespace dictionary to be passed along to control file.
    450 
    451         Creates a namespace argument populated with standard values:
    452         machines, job, ssh_user, ssh_port, ssh_pass, ssh_verbosity_flag,
    453         and ssh_options.
    454         """
    455         namespace = {'machines' : self.machine_dict_list,
    456                      'job' : self,
    457                      'ssh_user' : self._ssh_user,
    458                      'ssh_port' : self._ssh_port,
    459                      'ssh_pass' : self._ssh_pass,
    460                      'ssh_verbosity_flag' : self._ssh_verbosity_flag,
    461                      'ssh_options' : self._ssh_options}
    462         return namespace
    463 
    464 
    465     def cleanup(self, labels):
    466         """Cleanup machines.
    467 
    468         @param labels: Comma separated job labels, will be used to
    469                        determine special task actions.
    470         """
    471         if not self.machines:
    472             raise error.AutoservError('No machines specified to cleanup')
    473         if self.resultdir:
    474             os.chdir(self.resultdir)
    475 
    476         namespace = self._make_namespace()
    477         namespace.update({'job_labels': labels, 'args': ''})
    478         self._execute_code(CLEANUP_CONTROL_FILE, namespace, protect=False)
    479 
    480 
    481     def verify(self, labels):
    482         """Verify machines are all ssh-able.
    483 
    484         @param labels: Comma separated job labels, will be used to
    485                        determine special task actions.
    486         """
    487         if not self.machines:
    488             raise error.AutoservError('No machines specified to verify')
    489         if self.resultdir:
    490             os.chdir(self.resultdir)
    491 
    492         namespace = self._make_namespace()
    493         namespace.update({'job_labels': labels, 'args': ''})
    494         self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False)
    495 
    496 
    497     def reset(self, labels):
    498         """Reset machines by first cleanup then verify each machine.
    499 
    500         @param labels: Comma separated job labels, will be used to
    501                        determine special task actions.
    502         """
    503         if not self.machines:
    504             raise error.AutoservError('No machines specified to reset.')
    505         if self.resultdir:
    506             os.chdir(self.resultdir)
    507 
    508         namespace = self._make_namespace()
    509         namespace.update({'job_labels': labels, 'args': ''})
    510         self._execute_code(RESET_CONTROL_FILE, namespace, protect=False)
    511 
    512 
    513     def repair(self, labels):
    514         """Repair machines.
    515 
    516         @param labels: Comma separated job labels, will be used to
    517                        determine special task actions.
    518         """
    519         if not self.machines:
    520             raise error.AutoservError('No machines specified to repair')
    521         if self.resultdir:
    522             os.chdir(self.resultdir)
    523 
    524         namespace = self._make_namespace()
    525         namespace.update({'job_labels': labels, 'args': ''})
    526         self._execute_code(REPAIR_CONTROL_FILE, namespace, protect=False)
    527 
    528 
    529     def provision(self, labels):
    530         """
    531         Provision all hosts to match |labels|.
    532 
    533         @param labels: A comma seperated string of labels to provision the
    534                        host to.
    535 
    536         """
    537         control = self._load_control_file(PROVISION_CONTROL_FILE)
    538         self.run(control=control, job_labels=labels)
    539 
    540 
    541     def precheck(self):
    542         """
    543         perform any additional checks in derived classes.
    544         """
    545         pass
    546 
    547 
    548     def enable_external_logging(self):
    549         """
    550         Start or restart external logging mechanism.
    551         """
    552         pass
    553 
    554 
    555     def disable_external_logging(self):
    556         """
    557         Pause or stop external logging mechanism.
    558         """
    559         pass
    560 
    561 
    562     def use_external_logging(self):
    563         """
    564         Return True if external logging should be used.
    565         """
    566         return False
    567 
    568 
    569     def _make_parallel_wrapper(self, function, machines, log):
    570         """Wrap function as appropriate for calling by parallel_simple."""
    571         # machines could be a list of dictionaries, e.g.,
    572         # [{'host_attributes': {}, 'hostname': '100.96.51.226'}]
    573         # The dictionary is generated in server_job.__init__, refer to
    574         # variable machine_dict_list, then passed in with namespace, see method
    575         # server_job._make_namespace.
    576         # To compare the machinese to self.machines, which is a list of machine
    577         # hostname, we need to convert machines back to a list of hostnames.
    578         if (machines and isinstance(machines, list)
    579             and isinstance(machines[0], dict)):
    580             machines = [m['hostname'] for m in machines]
    581         if len(machines) > 1 and log:
    582             def wrapper(machine):
    583                 hostname = server_utils.get_hostname_from_machine(machine)
    584                 self.push_execution_context(hostname)
    585                 os.chdir(self.resultdir)
    586                 machine_data = {'hostname' : hostname,
    587                                 'status_version' : str(self._STATUS_VERSION)}
    588                 utils.write_keyval(self.resultdir, machine_data)
    589                 result = function(machine)
    590                 return result
    591         else:
    592             wrapper = function
    593         return wrapper
    594 
    595 
    596     def parallel_simple(self, function, machines, log=True, timeout=None,
    597                         return_results=False):
    598         """
    599         Run 'function' using parallel_simple, with an extra wrapper to handle
    600         the necessary setup for continuous parsing, if possible. If continuous
    601         parsing is already properly initialized then this should just work.
    602 
    603         @param function: A callable to run in parallel given each machine.
    604         @param machines: A list of machine names to be passed one per subcommand
    605                 invocation of function.
    606         @param log: If True, output will be written to output in a subdirectory
    607                 named after each machine.
    608         @param timeout: Seconds after which the function call should timeout.
    609         @param return_results: If True instead of an AutoServError being raised
    610                 on any error a list of the results|exceptions from the function
    611                 called on each arg is returned.  [default: False]
    612 
    613         @raises error.AutotestError: If any of the functions failed.
    614         """
    615         wrapper = self._make_parallel_wrapper(function, machines, log)
    616         return subcommand.parallel_simple(
    617                 wrapper, machines,
    618                 subdir_name_constructor=server_utils.get_hostname_from_machine,
    619                 log=log, timeout=timeout, return_results=return_results)
    620 
    621 
    622     def parallel_on_machines(self, function, machines, timeout=None):
    623         """
    624         @param function: Called in parallel with one machine as its argument.
    625         @param machines: A list of machines to call function(machine) on.
    626         @param timeout: Seconds after which the function call should timeout.
    627 
    628         @returns A list of machines on which function(machine) returned
    629                 without raising an exception.
    630         """
    631         results = self.parallel_simple(function, machines, timeout=timeout,
    632                                        return_results=True)
    633         success_machines = []
    634         for result, machine in itertools.izip(results, machines):
    635             if not isinstance(result, Exception):
    636                 success_machines.append(machine)
    637         return success_machines
    638 
    639 
    640     def record_skipped_test(self, skipped_test, message=None):
    641         """Insert a failure record into status.log for this test."""
    642         msg = message
    643         if msg is None:
    644             msg = 'No valid machines found for test %s.' % skipped_test
    645         logging.info(msg)
    646         self.record('START', None, skipped_test.test_name)
    647         self.record('INFO', None, skipped_test.test_name, msg)
    648         self.record('END TEST_NA', None, skipped_test.test_name, msg)
    649 
    650 
    651     def _has_failed_tests(self):
    652         """Parse status log for failed tests.
    653 
    654         This checks the current working directory and is intended only for use
    655         by the run() method.
    656 
    657         @return boolean
    658         """
    659         path = os.getcwd()
    660 
    661         # TODO(ayatane): Copied from tko/parse.py.  Needs extensive refactor to
    662         # make code reuse plausible.
    663         job_keyval = tko_models.job.read_keyval(path)
    664         status_version = job_keyval.get("status_version", 0)
    665 
    666         # parse out the job
    667         parser = parser_lib.parser(status_version)
    668         job = parser.make_job(path)
    669         status_log = os.path.join(path, "status.log")
    670         if not os.path.exists(status_log):
    671             status_log = os.path.join(path, "status")
    672         if not os.path.exists(status_log):
    673             logging.warning("! Unable to parse job, no status file")
    674             return True
    675 
    676         # parse the status logs
    677         status_lines = open(status_log).readlines()
    678         parser.start(job)
    679         tests = parser.end(status_lines)
    680 
    681         # parser.end can return the same object multiple times, so filter out
    682         # dups
    683         job.tests = []
    684         already_added = set()
    685         for test in tests:
    686             if test not in already_added:
    687                 already_added.add(test)
    688                 job.tests.append(test)
    689 
    690         failed = False
    691         for test in job.tests:
    692             # The current job is still running and shouldn't count as failed.
    693             # The parser will fail to parse the exit status of the job since it
    694             # hasn't exited yet (this running right now is the job).
    695             failed = failed or (test.status != 'GOOD'
    696                                 and not _is_current_server_job(test))
    697         return failed
    698 
    699 
    700     def _collect_crashes(self, namespace, collect_crashinfo):
    701         """Collect crashes.
    702 
    703         @param namespace: namespace dict.
    704         @param collect_crashinfo: whether to collect crashinfo in addition to
    705                 dumps
    706         """
    707         if collect_crashinfo:
    708             # includes crashdumps
    709             crash_control_file = CRASHINFO_CONTROL_FILE
    710         else:
    711             crash_control_file = CRASHDUMPS_CONTROL_FILE
    712         self._execute_code(crash_control_file, namespace)
    713 
    714 
    715     _USE_TEMP_DIR = object()
    716     def run(self, collect_crashdumps=True, namespace={}, control=None,
    717             control_file_dir=None, verify_job_repo_url=False,
    718             only_collect_crashinfo=False, skip_crash_collection=False,
    719             job_labels='', use_packaging=True):
    720         # for a normal job, make sure the uncollected logs file exists
    721         # for a crashinfo-only run it should already exist, bail out otherwise
    722         created_uncollected_logs = False
    723         logging.info("I am PID %s", os.getpid())
    724         if self.resultdir and not os.path.exists(self._uncollected_log_file):
    725             if only_collect_crashinfo:
    726                 # if this is a crashinfo-only run, and there were no existing
    727                 # uncollected logs, just bail out early
    728                 logging.info("No existing uncollected logs, "
    729                              "skipping crashinfo collection")
    730                 return
    731             else:
    732                 log_file = open(self._uncollected_log_file, "w")
    733                 pickle.dump([], log_file)
    734                 log_file.close()
    735                 created_uncollected_logs = True
    736 
    737         # use a copy so changes don't affect the original dictionary
    738         namespace = namespace.copy()
    739         machines = self.machines
    740         if control is None:
    741             if self.control is None:
    742                 control = ''
    743             elif self._use_client_trampoline:
    744                 control = self._load_control_file(
    745                         CLIENT_TRAMPOLINE_CONTROL_FILE)
    746                 # repr of a string is safe for eval.
    747                 control = (('trampoline_testname = %r\n' % str(self.control))
    748                            + control)
    749             else:
    750                 control = self._load_control_file(self.control)
    751         if control_file_dir is None:
    752             control_file_dir = self.resultdir
    753 
    754         self.aborted = False
    755         namespace.update(self._make_namespace())
    756         namespace.update({
    757                 'args': self.args,
    758                 'job_labels': job_labels,
    759                 'gtest_runner': site_gtest_runner.gtest_runner(),
    760         })
    761         test_start_time = int(time.time())
    762 
    763         if self.resultdir:
    764             os.chdir(self.resultdir)
    765             # touch status.log so that the parser knows a job is running here
    766             open(self.get_status_log_path(), 'a').close()
    767             self.enable_external_logging()
    768 
    769         collect_crashinfo = True
    770         temp_control_file_dir = None
    771         try:
    772             try:
    773                 if not self.fast:
    774                     with metrics.SecondsTimer(
    775                             'chromeos/autotest/job/get_network_stats',
    776                             fields = {'stage': 'start'}):
    777                         namespace['network_stats_label'] = 'at-start'
    778                         self._execute_code(GET_NETWORK_STATS_CONTROL_FILE,
    779                                            namespace)
    780 
    781                 if only_collect_crashinfo:
    782                     return
    783 
    784                 # If the verify_job_repo_url option is set but we're unable
    785                 # to actually verify that the job_repo_url contains the autotest
    786                 # package, this job will fail.
    787                 if verify_job_repo_url:
    788                     self._execute_code(VERIFY_JOB_REPO_URL_CONTROL_FILE,
    789                                        namespace)
    790                 else:
    791                     logging.warning('Not checking if job_repo_url contains '
    792                                     'autotest packages on %s', machines)
    793 
    794                 # determine the dir to write the control files to
    795                 cfd_specified = (control_file_dir
    796                                  and control_file_dir is not self._USE_TEMP_DIR)
    797                 if cfd_specified:
    798                     temp_control_file_dir = None
    799                 else:
    800                     temp_control_file_dir = tempfile.mkdtemp(
    801                         suffix='temp_control_file_dir')
    802                     control_file_dir = temp_control_file_dir
    803                 server_control_file = os.path.join(control_file_dir,
    804                                                    self._control_filename)
    805                 client_control_file = os.path.join(control_file_dir,
    806                                                    CLIENT_CONTROL_FILENAME)
    807                 if self._client:
    808                     namespace['control'] = control
    809                     utils.open_write_close(client_control_file, control)
    810                     shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE,
    811                                     server_control_file)
    812                 else:
    813                     utils.open_write_close(server_control_file, control)
    814 
    815                 logging.info("Processing control file")
    816                 namespace['use_packaging'] = use_packaging
    817                 self._execute_code(server_control_file, namespace)
    818                 logging.info("Finished processing control file")
    819 
    820                 # If no device error occured, no need to collect crashinfo.
    821                 collect_crashinfo = self.failed_with_device_error
    822             except Exception as e:
    823                 try:
    824                     logging.exception(
    825                             'Exception escaped control file, job aborting:')
    826                     reason = re.sub(base_job.status_log_entry.BAD_CHAR_REGEX,
    827                                     ' ', str(e))
    828                     self.record('INFO', None, None, str(e),
    829                                 {'job_abort_reason': reason})
    830                 except:
    831                     pass # don't let logging exceptions here interfere
    832                 raise
    833         finally:
    834             if temp_control_file_dir:
    835                 # Clean up temp directory used for copies of the control files
    836                 try:
    837                     shutil.rmtree(temp_control_file_dir)
    838                 except Exception as e:
    839                     logging.warning('Could not remove temp directory %s: %s',
    840                                  temp_control_file_dir, e)
    841 
    842             if machines and (collect_crashdumps or collect_crashinfo):
    843                 if skip_crash_collection or self.fast:
    844                     logging.info('Skipping crash dump/info collection '
    845                                  'as requested.')
    846                 else:
    847                     with metrics.SecondsTimer(
    848                             'chromeos/autotest/job/collect_crashinfo'):
    849                         namespace['test_start_time'] = test_start_time
    850                         # Remove crash files for passing tests.
    851                         # TODO(ayatane): Tests that create crash files should be
    852                         # reported.
    853                         namespace['has_failed_tests'] = self._has_failed_tests()
    854                         self._collect_crashes(namespace, collect_crashinfo)
    855             self.disable_external_logging()
    856             if self._uncollected_log_file and created_uncollected_logs:
    857                 os.remove(self._uncollected_log_file)
    858 
    859             if not self.fast:
    860                 with metrics.SecondsTimer(
    861                         'chromeos/autotest/job/get_network_stats',
    862                         fields = {'stage': 'end'}):
    863                     namespace['network_stats_label'] = 'at-end'
    864                     self._execute_code(GET_NETWORK_STATS_CONTROL_FILE,
    865                                        namespace)
    866 
    867 
    868     def run_test(self, url, *args, **dargs):
    869         """
    870         Summon a test object and run it.
    871 
    872         tag
    873                 tag to add to testname
    874         url
    875                 url of the test to run
    876         """
    877         if self._disable_sysinfo:
    878             dargs['disable_sysinfo'] = True
    879 
    880         group, testname = self.pkgmgr.get_package_name(url, 'test')
    881         testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
    882         outputdir = self._make_test_outputdir(subdir)
    883 
    884         def group_func():
    885             try:
    886                 test.runtest(self, url, tag, args, dargs)
    887             except error.TestBaseException as e:
    888                 self.record(e.exit_status, subdir, testname, str(e))
    889                 raise
    890             except Exception as e:
    891                 info = str(e) + "\n" + traceback.format_exc()
    892                 self.record('FAIL', subdir, testname, info)
    893                 raise
    894             else:
    895                 self.record('GOOD', subdir, testname, 'completed successfully')
    896 
    897         try:
    898             result = self._run_group(testname, subdir, group_func)
    899         except error.TestBaseException as e:
    900             return False
    901         else:
    902             return True
    903 
    904 
    905     def _run_group(self, name, subdir, function, *args, **dargs):
    906         """Underlying method for running something inside of a group."""
    907         result, exc_info = None, None
    908         try:
    909             self.record('START', subdir, name)
    910             result = function(*args, **dargs)
    911         except error.TestBaseException as e:
    912             self.record("END %s" % e.exit_status, subdir, name)
    913             raise
    914         except Exception as e:
    915             err_msg = str(e) + '\n'
    916             err_msg += traceback.format_exc()
    917             self.record('END ABORT', subdir, name, err_msg)
    918             raise error.JobError(name + ' failed\n' + traceback.format_exc())
    919         else:
    920             self.record('END GOOD', subdir, name)
    921         finally:
    922             for hook in self._post_run_hooks:
    923                 hook()
    924 
    925         return result
    926 
    927 
    928     def run_group(self, function, *args, **dargs):
    929         """\
    930         @param function: subroutine to run
    931         @returns: (result, exc_info). When the call succeeds, result contains
    932                 the return value of |function| and exc_info is None. If
    933                 |function| raises an exception, exc_info contains the tuple
    934                 returned by sys.exc_info(), and result is None.
    935         """
    936 
    937         name = function.__name__
    938         # Allow the tag for the group to be specified.
    939         tag = dargs.pop('tag', None)
    940         if tag:
    941             name = tag
    942 
    943         try:
    944             result = self._run_group(name, None, function, *args, **dargs)[0]
    945         except error.TestBaseException:
    946             return None, sys.exc_info()
    947         return result, None
    948 
    949 
    950     def run_op(self, op, op_func, get_kernel_func):
    951         """\
    952         A specialization of run_group meant specifically for handling
    953         management operation. Includes support for capturing the kernel version
    954         after the operation.
    955 
    956         Args:
    957            op: name of the operation.
    958            op_func: a function that carries out the operation (reboot, suspend)
    959            get_kernel_func: a function that returns a string
    960                             representing the kernel version.
    961         """
    962         try:
    963             self.record('START', None, op)
    964             op_func()
    965         except Exception as e:
    966             err_msg = str(e) + '\n' + traceback.format_exc()
    967             self.record('END FAIL', None, op, err_msg)
    968             raise
    969         else:
    970             kernel = get_kernel_func()
    971             self.record('END GOOD', None, op,
    972                         optional_fields={"kernel": kernel})
    973 
    974 
    975     def run_control(self, path):
    976         """Execute a control file found at path (relative to the autotest
    977         path). Intended for executing a control file within a control file,
    978         not for running the top-level job control file."""
    979         path = os.path.join(self.autodir, path)
    980         control_file = self._load_control_file(path)
    981         self.run(control=control_file, control_file_dir=self._USE_TEMP_DIR)
    982 
    983 
    984     def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
    985         self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
    986                                    on_every_test)
    987 
    988 
    989     def add_sysinfo_logfile(self, file, on_every_test=False):
    990         self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)
    991 
    992 
    993     def _add_sysinfo_loggable(self, loggable, on_every_test):
    994         if on_every_test:
    995             self.sysinfo.test_loggables.add(loggable)
    996         else:
    997             self.sysinfo.boot_loggables.add(loggable)
    998 
    999 
   1000     def _read_warnings(self):
   1001         """Poll all the warning loggers and extract any new warnings that have
   1002         been logged. If the warnings belong to a category that is currently
   1003         disabled, this method will discard them and they will no longer be
   1004         retrievable.
   1005 
   1006         Returns a list of (timestamp, message) tuples, where timestamp is an
   1007         integer epoch timestamp."""
   1008         warnings = []
   1009         while True:
   1010             # pull in a line of output from every logger that has
   1011             # output ready to be read
   1012             loggers, _, _ = select.select(self.warning_loggers, [], [], 0)
   1013             closed_loggers = set()
   1014             for logger in loggers:
   1015                 line = logger.readline()
   1016                 # record any broken pipes (aka line == empty)
   1017                 if len(line) == 0:
   1018                     closed_loggers.add(logger)
   1019                     continue
   1020                 # parse out the warning
   1021                 timestamp, msgtype, msg = line.split('\t', 2)
   1022                 timestamp = int(timestamp)
   1023                 # if the warning is valid, add it to the results
   1024                 if self.warning_manager.is_valid(timestamp, msgtype):
   1025                     warnings.append((timestamp, msg.strip()))
   1026 
   1027             # stop listening to loggers that are closed
   1028             self.warning_loggers -= closed_loggers
   1029 
   1030             # stop if none of the loggers have any output left
   1031             if not loggers:
   1032                 break
   1033 
   1034         # sort into timestamp order
   1035         warnings.sort()
   1036         return warnings
   1037 
   1038 
   1039     def _unique_subdirectory(self, base_subdirectory_name):
   1040         """Compute a unique results subdirectory based on the given name.
   1041 
   1042         Appends base_subdirectory_name with a number as necessary to find a
   1043         directory name that doesn't already exist.
   1044         """
   1045         subdirectory = base_subdirectory_name
   1046         counter = 1
   1047         while os.path.exists(os.path.join(self.resultdir, subdirectory)):
   1048             subdirectory = base_subdirectory_name + '.' + str(counter)
   1049             counter += 1
   1050         return subdirectory
   1051 
   1052 
   1053     def get_record_context(self):
   1054         """Returns an object representing the current job.record context.
   1055 
   1056         The object returned is an opaque object with a 0-arg restore method
   1057         which can be called to restore the job.record context (i.e. indentation)
   1058         to the current level. The intention is that it should be used when
   1059         something external which generate job.record calls (e.g. an autotest
   1060         client) can fail catastrophically and the server job record state
   1061         needs to be reset to its original "known good" state.
   1062 
   1063         @return: A context object with a 0-arg restore() method."""
   1064         return self._indenter.get_context()
   1065 
   1066 
   1067     def record_summary(self, status_code, test_name, reason='', attributes=None,
   1068                        distinguishing_attributes=(), child_test_ids=None):
   1069         """Record a summary test result.
   1070 
   1071         @param status_code: status code string, see
   1072                 common_lib.log.is_valid_status()
   1073         @param test_name: name of the test
   1074         @param reason: (optional) string providing detailed reason for test
   1075                 outcome
   1076         @param attributes: (optional) dict of string keyvals to associate with
   1077                 this result
   1078         @param distinguishing_attributes: (optional) list of attribute names
   1079                 that should be used to distinguish identically-named test
   1080                 results.  These attributes should be present in the attributes
   1081                 parameter.  This is used to generate user-friendly subdirectory
   1082                 names.
   1083         @param child_test_ids: (optional) list of test indices for test results
   1084                 used in generating this result.
   1085         """
   1086         subdirectory_name_parts = [test_name]
   1087         for attribute in distinguishing_attributes:
   1088             assert attributes
   1089             assert attribute in attributes, '%s not in %s' % (attribute,
   1090                                                               attributes)
   1091             subdirectory_name_parts.append(attributes[attribute])
   1092         base_subdirectory_name = '.'.join(subdirectory_name_parts)
   1093 
   1094         subdirectory = self._unique_subdirectory(base_subdirectory_name)
   1095         subdirectory_path = os.path.join(self.resultdir, subdirectory)
   1096         os.mkdir(subdirectory_path)
   1097 
   1098         self.record(status_code, subdirectory, test_name,
   1099                     status=reason, optional_fields={'is_summary': True})
   1100 
   1101         if attributes:
   1102             utils.write_keyval(subdirectory_path, attributes)
   1103 
   1104         if child_test_ids:
   1105             ids_string = ','.join(str(test_id) for test_id in child_test_ids)
   1106             summary_data = {'child_test_ids': ids_string}
   1107             utils.write_keyval(os.path.join(subdirectory_path, 'summary_data'),
   1108                                summary_data)
   1109 
   1110 
   1111     def add_post_run_hook(self, hook):
   1112         """
   1113         Registers a hook to run after the main job function.
   1114 
   1115         This provides a mechanism by which tests that perform multiple tests of
   1116         their own can write additional top-level results to the TKO status.log
   1117         file.
   1118 
   1119         @param hook: Function to invoke (without any args) after the main job
   1120             function completes and the job status is logged.
   1121         """
   1122         self._post_run_hooks.append(hook)
   1123 
   1124 
   1125     def disable_warnings(self, warning_type):
   1126         self.warning_manager.disable_warnings(warning_type)
   1127         self.record("INFO", None, None,
   1128                     "disabling %s warnings" % warning_type,
   1129                     {"warnings.disable": warning_type})
   1130 
   1131 
   1132     def enable_warnings(self, warning_type):
   1133         self.warning_manager.enable_warnings(warning_type)
   1134         self.record("INFO", None, None,
   1135                     "enabling %s warnings" % warning_type,
   1136                     {"warnings.enable": warning_type})
   1137 
   1138 
   1139     def get_status_log_path(self, subdir=None):
   1140         """Return the path to the job status log.
   1141 
   1142         @param subdir - Optional paramter indicating that you want the path
   1143             to a subdirectory status log.
   1144 
   1145         @returns The path where the status log should be.
   1146         """
   1147         if self.resultdir:
   1148             if subdir:
   1149                 return os.path.join(self.resultdir, subdir, "status.log")
   1150             else:
   1151                 return os.path.join(self.resultdir, "status.log")
   1152         else:
   1153             return None
   1154 
   1155 
   1156     def _update_uncollected_logs_list(self, update_func):
   1157         """Updates the uncollected logs list in a multi-process safe manner.
   1158 
   1159         @param update_func - a function that updates the list of uncollected
   1160             logs. Should take one parameter, the list to be updated.
   1161         """
   1162         # Skip log collection if file _uncollected_log_file does not exist.
   1163         if not (self._uncollected_log_file and
   1164                 os.path.exists(self._uncollected_log_file)):
   1165             return
   1166         if self._uncollected_log_file:
   1167             log_file = open(self._uncollected_log_file, "r+")
   1168             fcntl.flock(log_file, fcntl.LOCK_EX)
   1169         try:
   1170             uncollected_logs = pickle.load(log_file)
   1171             update_func(uncollected_logs)
   1172             log_file.seek(0)
   1173             log_file.truncate()
   1174             pickle.dump(uncollected_logs, log_file)
   1175             log_file.flush()
   1176         finally:
   1177             fcntl.flock(log_file, fcntl.LOCK_UN)
   1178             log_file.close()
   1179 
   1180 
   1181     def add_client_log(self, hostname, remote_path, local_path):
   1182         """Adds a new set of client logs to the list of uncollected logs,
   1183         to allow for future log recovery.
   1184 
   1185         @param host - the hostname of the machine holding the logs
   1186         @param remote_path - the directory on the remote machine holding logs
   1187         @param local_path - the local directory to copy the logs into
   1188         """
   1189         def update_func(logs_list):
   1190             logs_list.append((hostname, remote_path, local_path))
   1191         self._update_uncollected_logs_list(update_func)
   1192 
   1193 
   1194     def remove_client_log(self, hostname, remote_path, local_path):
   1195         """Removes a set of client logs from the list of uncollected logs,
   1196         to allow for future log recovery.
   1197 
   1198         @param host - the hostname of the machine holding the logs
   1199         @param remote_path - the directory on the remote machine holding logs
   1200         @param local_path - the local directory to copy the logs into
   1201         """
   1202         def update_func(logs_list):
   1203             logs_list.remove((hostname, remote_path, local_path))
   1204         self._update_uncollected_logs_list(update_func)
   1205 
   1206 
   1207     def get_client_logs(self):
   1208         """Retrieves the list of uncollected logs, if it exists.
   1209 
   1210         @returns A list of (host, remote_path, local_path) tuples. Returns
   1211                  an empty list if no uncollected logs file exists.
   1212         """
   1213         log_exists = (self._uncollected_log_file and
   1214                       os.path.exists(self._uncollected_log_file))
   1215         if log_exists:
   1216             return pickle.load(open(self._uncollected_log_file))
   1217         else:
   1218             return []
   1219 
   1220 
   1221     def _fill_server_control_namespace(self, namespace, protect=True):
   1222         """
   1223         Prepare a namespace to be used when executing server control files.
   1224 
   1225         This sets up the control file API by importing modules and making them
   1226         available under the appropriate names within namespace.
   1227 
   1228         For use by _execute_code().
   1229 
   1230         Args:
   1231           namespace: The namespace dictionary to fill in.
   1232           protect: Boolean.  If True (the default) any operation that would
   1233               clobber an existing entry in namespace will cause an error.
   1234         Raises:
   1235           error.AutoservError: When a name would be clobbered by import.
   1236         """
   1237         def _import_names(module_name, names=()):
   1238             """
   1239             Import a module and assign named attributes into namespace.
   1240 
   1241             Args:
   1242                 module_name: The string module name.
   1243                 names: A limiting list of names to import from module_name.  If
   1244                     empty (the default), all names are imported from the module
   1245                     similar to a "from foo.bar import *" statement.
   1246             Raises:
   1247                 error.AutoservError: When a name being imported would clobber
   1248                     a name already in namespace.
   1249             """
   1250             module = __import__(module_name, {}, {}, names)
   1251 
   1252             # No names supplied?  Import * from the lowest level module.
   1253             # (Ugh, why do I have to implement this part myself?)
   1254             if not names:
   1255                 for submodule_name in module_name.split('.')[1:]:
   1256                     module = getattr(module, submodule_name)
   1257                 if hasattr(module, '__all__'):
   1258                     names = getattr(module, '__all__')
   1259                 else:
   1260                     names = dir(module)
   1261 
   1262             # Install each name into namespace, checking to make sure it
   1263             # doesn't override anything that already exists.
   1264             for name in names:
   1265                 # Check for conflicts to help prevent future problems.
   1266                 if name in namespace and protect:
   1267                     if namespace[name] is not getattr(module, name):
   1268                         raise error.AutoservError('importing name '
   1269                                 '%s from %s %r would override %r' %
   1270                                 (name, module_name, getattr(module, name),
   1271                                  namespace[name]))
   1272                     else:
   1273                         # Encourage cleanliness and the use of __all__ for a
   1274                         # more concrete API with less surprises on '*' imports.
   1275                         warnings.warn('%s (%r) being imported from %s for use '
   1276                                       'in server control files is not the '
   1277                                       'first occurrence of that import.' %
   1278                                       (name, namespace[name], module_name))
   1279 
   1280                 namespace[name] = getattr(module, name)
   1281 
   1282 
   1283         # This is the equivalent of prepending a bunch of import statements to
   1284         # the front of the control script.
   1285         namespace.update(os=os, sys=sys, logging=logging)
   1286         _import_names('autotest_lib.server',
   1287                 ('hosts', 'autotest', 'standalone_profiler'))
   1288         _import_names('autotest_lib.server.subcommand',
   1289                       ('parallel', 'parallel_simple', 'subcommand'))
   1290         _import_names('autotest_lib.server.utils',
   1291                       ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine'))
   1292         _import_names('autotest_lib.client.common_lib.error')
   1293         _import_names('autotest_lib.client.common_lib.barrier', ('barrier',))
   1294 
   1295         # Inject ourself as the job object into other classes within the API.
   1296         # (Yuck, this injection is a gross thing be part of a public API. -gps)
   1297         #
   1298         # XXX Autotest does not appear to use .job.  Who does?
   1299         namespace['autotest'].Autotest.job = self
   1300         # server.hosts.base_classes.Host uses .job.
   1301         namespace['hosts'].Host.job = self
   1302         namespace['hosts'].factory.ssh_user = self._ssh_user
   1303         namespace['hosts'].factory.ssh_port = self._ssh_port
   1304         namespace['hosts'].factory.ssh_pass = self._ssh_pass
   1305         namespace['hosts'].factory.ssh_verbosity_flag = (
   1306                 self._ssh_verbosity_flag)
   1307         namespace['hosts'].factory.ssh_options = self._ssh_options
   1308 
   1309 
   1310     def _execute_code(self, code_file, namespace, protect=True):
   1311         """
   1312         Execute code using a copy of namespace as a server control script.
   1313 
   1314         Unless protect_namespace is explicitly set to False, the dict will not
   1315         be modified.
   1316 
   1317         Args:
   1318           code_file: The filename of the control file to execute.
   1319           namespace: A dict containing names to make available during execution.
   1320           protect: Boolean.  If True (the default) a copy of the namespace dict
   1321               is used during execution to prevent the code from modifying its
   1322               contents outside of this function.  If False the raw dict is
   1323               passed in and modifications will be allowed.
   1324         """
   1325         if protect:
   1326             namespace = namespace.copy()
   1327         self._fill_server_control_namespace(namespace, protect=protect)
   1328         # TODO: Simplify and get rid of the special cases for only 1 machine.
   1329         if len(self.machines) > 1:
   1330             machines_text = '\n'.join(self.machines) + '\n'
   1331             # Only rewrite the file if it does not match our machine list.
   1332             try:
   1333                 machines_f = open(MACHINES_FILENAME, 'r')
   1334                 existing_machines_text = machines_f.read()
   1335                 machines_f.close()
   1336             except EnvironmentError:
   1337                 existing_machines_text = None
   1338             if machines_text != existing_machines_text:
   1339                 utils.open_write_close(MACHINES_FILENAME, machines_text)
   1340         execfile(code_file, namespace, namespace)
   1341 
   1342 
   1343     def preprocess_client_state(self):
   1344         """
   1345         Produce a state file for initializing the state of a client job.
   1346 
   1347         Creates a new client state file with all the current server state, as
   1348         well as some pre-set client state.
   1349 
   1350         @returns The path of the file the state was written into.
   1351         """
   1352         # initialize the sysinfo state
   1353         self._state.set('client', 'sysinfo', self.sysinfo.serialize())
   1354 
   1355         # dump the state out to a tempfile
   1356         fd, file_path = tempfile.mkstemp(dir=self.tmpdir)
   1357         os.close(fd)
   1358 
   1359         # write_to_file doesn't need locking, we exclusively own file_path
   1360         self._state.write_to_file(file_path)
   1361         return file_path
   1362 
   1363 
   1364     def postprocess_client_state(self, state_path):
   1365         """
   1366         Update the state of this job with the state from a client job.
   1367 
   1368         Updates the state of the server side of a job with the final state
   1369         of a client job that was run. Updates the non-client-specific state,
   1370         pulls in some specific bits from the client-specific state, and then
   1371         discards the rest. Removes the state file afterwards
   1372 
   1373         @param state_file A path to the state file from the client.
   1374         """
   1375         # update the on-disk state
   1376         try:
   1377             self._state.read_from_file(state_path)
   1378             os.remove(state_path)
   1379         except OSError, e:
   1380             # ignore file-not-found errors
   1381             if e.errno != errno.ENOENT:
   1382                 raise
   1383             else:
   1384                 logging.debug('Client state file %s not found', state_path)
   1385 
   1386         # update the sysinfo state
   1387         if self._state.has('client', 'sysinfo'):
   1388             self.sysinfo.deserialize(self._state.get('client', 'sysinfo'))
   1389 
   1390         # drop all the client-specific state
   1391         self._state.discard_namespace('client')
   1392 
   1393 
   1394     def clear_all_known_hosts(self):
   1395         """Clears known hosts files for all AbstractSSHHosts."""
   1396         for host in self.hosts:
   1397             if isinstance(host, abstract_ssh.AbstractSSHHost):
   1398                 host.clear_known_hosts()
   1399 
   1400 
   1401     def close(self):
   1402         """Closes this job's operation."""
   1403 
   1404         # Use shallow copy, because host.close() internally discards itself.
   1405         for host in list(self.hosts):
   1406             host.close()
   1407         assert not self.hosts
   1408         self._connection_pool.shutdown()
   1409 
   1410 
   1411     def _get_job_data(self):
   1412         """Add custom data to the job keyval info.
   1413 
   1414         When multiple machines are used in a job, change the hostname to
   1415         the platform of the first machine instead of machine1,machine2,...  This
   1416         makes the job reports easier to read and keeps the tko_machines table from
   1417         growing too large.
   1418 
   1419         Returns:
   1420             keyval dictionary with new hostname value, or empty dictionary.
   1421         """
   1422         job_data = {}
   1423         # Only modify hostname on multimachine jobs. Assume all host have the same
   1424         # platform.
   1425         if len(self.machines) > 1:
   1426             # Search through machines for first machine with a platform.
   1427             for host in self.machines:
   1428                 keyval_path = os.path.join(self.resultdir, 'host_keyvals', host)
   1429                 keyvals = utils.read_keyval(keyval_path)
   1430                 host_plat = keyvals.get('platform', None)
   1431                 if not host_plat:
   1432                     continue
   1433                 job_data['hostname'] = host_plat
   1434                 break
   1435         return job_data
   1436 
   1437 
   1438 class warning_manager(object):
   1439     """Class for controlling warning logs. Manages the enabling and disabling
   1440     of warnings."""
   1441     def __init__(self):
   1442         # a map of warning types to a list of disabled time intervals
   1443         self.disabled_warnings = {}
   1444 
   1445 
   1446     def is_valid(self, timestamp, warning_type):
   1447         """Indicates if a warning (based on the time it occured and its type)
   1448         is a valid warning. A warning is considered "invalid" if this type of
   1449         warning was marked as "disabled" at the time the warning occured."""
   1450         disabled_intervals = self.disabled_warnings.get(warning_type, [])
   1451         for start, end in disabled_intervals:
   1452             if timestamp >= start and (end is None or timestamp < end):
   1453                 return False
   1454         return True
   1455 
   1456 
   1457     def disable_warnings(self, warning_type, current_time_func=time.time):
   1458         """As of now, disables all further warnings of this type."""
   1459         intervals = self.disabled_warnings.setdefault(warning_type, [])
   1460         if not intervals or intervals[-1][1] is not None:
   1461             intervals.append((int(current_time_func()), None))
   1462 
   1463 
   1464     def enable_warnings(self, warning_type, current_time_func=time.time):
   1465         """As of now, enables all further warnings of this type."""
   1466         intervals = self.disabled_warnings.get(warning_type, [])
   1467         if intervals and intervals[-1][1] is None:
   1468             intervals[-1] = (intervals[-1][0], int(current_time_func()))
   1469 
   1470 
   1471 def _is_current_server_job(test):
   1472     """Return True if parsed test is the currently running job.
   1473 
   1474     @param test: test instance from tko parser.
   1475     """
   1476     return test.testname == 'SERVER_JOB'
   1477 
   1478 
   1479 def _create_afe_host(hostname):
   1480     """Create an afe_host object backed by the AFE.
   1481 
   1482     @param hostname: Name of the host for which we want the Host object.
   1483     @returns: An object of type frontend.AFE
   1484     """
   1485     afe = frontend_wrappers.RetryingAFE(timeout_min=5, delay_sec=10)
   1486     hosts = afe.get_hosts(hostname=hostname)
   1487     if not hosts:
   1488         raise error.AutoservError('No hosts named %s found' % hostname)
   1489 
   1490     return hosts[0]
   1491 
   1492 
   1493 def _create_file_backed_host_info_store(store_dir, hostname):
   1494     """Create a CachingHostInfoStore backed by an existing file.
   1495 
   1496     @param store_dir: A directory to contain store backing files.
   1497     @param hostname: Name of the host for which we want the store.
   1498 
   1499     @returns: An object of type CachingHostInfoStore.
   1500     """
   1501     backing_file_path = os.path.join(store_dir, '%s.store' % hostname)
   1502     if not os.path.isfile(backing_file_path):
   1503         raise error.AutoservError(
   1504                 'Requested FileStore but no backing file at %s'
   1505                 % backing_file_path
   1506         )
   1507     return file_store.FileStore(backing_file_path)
   1508 
   1509 
   1510 def _create_afe_backed_host_info_store(store_dir, hostname):
   1511     """Create a CachingHostInfoStore backed by the AFE.
   1512 
   1513     @param store_dir: A directory to contain store backing files.
   1514     @param hostname: Name of the host for which we want the store.
   1515 
   1516     @returns: An object of type CachingHostInfoStore.
   1517     """
   1518     primary_store = afe_store.AfeStore(hostname)
   1519     try:
   1520         primary_store.get(force_refresh=True)
   1521     except host_info.StoreError:
   1522         raise error.AutoservError(
   1523                 'Could not obtain HostInfo for hostname %s' % hostname)
   1524     # Since the store wasn't initialized external to autoserv, we must
   1525     # ensure that the store we create is unique within store_dir.
   1526     backing_file_path = os.path.join(
   1527             _make_unique_subdir(store_dir),
   1528             '%s.store' % hostname,
   1529     )
   1530     logging.info('Shadowing AFE store with a FileStore at %s',
   1531                  backing_file_path)
   1532     shadow_store = file_store.FileStore(backing_file_path)
   1533     return shadowing_store.ShadowingStore(primary_store, shadow_store)
   1534 
   1535 
   1536 def _make_unique_subdir(workdir):
   1537     """Creates a new subdir within workdir and returns the path to it."""
   1538     store_dir = os.path.join(workdir, 'dir_%s' % uuid.uuid4())
   1539     _make_dirs_if_needed(store_dir)
   1540     return store_dir
   1541 
   1542 
   1543 def _make_dirs_if_needed(path):
   1544     """os.makedirs, but ignores failure because the leaf directory exists"""
   1545     try:
   1546         os.makedirs(path)
   1547     except OSError as e:
   1548         if e.errno != errno.EEXIST:
   1549             raise
   1550