Home | History | Annotate | Download | only in server
      1 # pylint: disable-msg=C0111
      2 
      3 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
      4 # Use of this source code is governed by a BSD-style license that can be
      5 # found in the LICENSE file.
      6 """
      7 The main job wrapper for the server side.
      8 
      9 This is the core infrastructure. Derived from the client side job.py
     10 
     11 Copyright Martin J. Bligh, Andy Whitcroft 2007
     12 """
     13 
     14 import errno
     15 import fcntl
     16 import getpass
     17 import itertools
     18 import logging
     19 import os
     20 import pickle
     21 import platform
     22 import re
     23 import select
     24 import shutil
     25 import sys
     26 import tempfile
     27 import time
     28 import traceback
     29 import uuid
     30 import warnings
     31 
     32 from autotest_lib.client.bin import sysinfo
     33 from autotest_lib.client.common_lib import base_job
     34 from autotest_lib.client.common_lib import control_data
     35 from autotest_lib.client.common_lib import error
     36 from autotest_lib.client.common_lib import global_config
     37 from autotest_lib.client.common_lib import logging_manager
     38 from autotest_lib.client.common_lib import packages
     39 from autotest_lib.client.common_lib import utils
     40 from autotest_lib.server import profilers
     41 from autotest_lib.server import site_gtest_runner
     42 from autotest_lib.server import subcommand
     43 from autotest_lib.server import test
     44 from autotest_lib.server import utils as server_utils
     45 from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
     46 from autotest_lib.server.hosts import abstract_ssh
     47 from autotest_lib.server.hosts import afe_store
     48 from autotest_lib.server.hosts import file_store
     49 from autotest_lib.server.hosts import shadowing_store
     50 from autotest_lib.server.hosts import factory as host_factory
     51 from autotest_lib.server.hosts import host_info
     52 from autotest_lib.server.hosts import ssh_multiplex
     53 from autotest_lib.tko import db as tko_db
     54 from autotest_lib.tko import models as tko_models
     55 from autotest_lib.tko import status_lib
     56 from autotest_lib.tko import parser_lib
     57 from autotest_lib.tko import utils as tko_utils
     58 
     59 try:
     60     from chromite.lib import metrics
     61 except ImportError:
     62     metrics = utils.metrics_mock
     63 
     64 
     65 INCREMENTAL_TKO_PARSING = global_config.global_config.get_config_value(
     66         'autoserv', 'incremental_tko_parsing', type=bool, default=False)
     67 
     68 def _control_segment_path(name):
     69     """Get the pathname of the named control segment file."""
     70     server_dir = os.path.dirname(os.path.abspath(__file__))
     71     return os.path.join(server_dir, "control_segments", name)
     72 
     73 
     74 CLIENT_CONTROL_FILENAME = 'control'
     75 SERVER_CONTROL_FILENAME = 'control.srv'
     76 MACHINES_FILENAME = '.machines'
     77 
     78 CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper')
     79 CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps')
     80 CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo')
     81 INSTALL_CONTROL_FILE = _control_segment_path('install')
     82 CLEANUP_CONTROL_FILE = _control_segment_path('cleanup')
     83 VERIFY_CONTROL_FILE = _control_segment_path('verify')
     84 REPAIR_CONTROL_FILE = _control_segment_path('repair')
     85 PROVISION_CONTROL_FILE = _control_segment_path('provision')
     86 VERIFY_JOB_REPO_URL_CONTROL_FILE = _control_segment_path('verify_job_repo_url')
     87 RESET_CONTROL_FILE = _control_segment_path('reset')
     88 GET_NETWORK_STATS_CONTROL_FILE = _control_segment_path('get_network_stats')
     89 
     90 
     91 def get_machine_dicts(machine_names, workdir, in_lab, host_attributes=None,
     92                       connection_pool=None):
     93     """Converts a list of machine names to list of dicts.
     94 
     95     TODO(crbug.com/678430): This function temporarily has a side effect of
     96     creating files under workdir for backing a FileStore. This side-effect will
     97     go away once callers of autoserv start passing in the FileStore.
     98 
     99     @param machine_names: A list of machine names.
    100     @param workdir: A directory where any on-disk files related to the machines
    101             may be created.
    102     @param in_lab: A boolean indicating whether we're running in lab.
    103     @param host_attributes: Optional list of host attributes to add for each
    104             host.
    105     @param connection_pool: ssh_multiplex.ConnectionPool instance to share
    106             master connections across control scripts.
    107     @returns: A list of dicts. Each dict has the following keys:
    108             'hostname': Name of the machine originally in machine_names (str).
    109             'afe_host': A frontend.Host object for the machine, or a stub if
    110                     in_lab is false.
    111             'host_info_store': A host_info.CachingHostInfoStore object to obtain
    112                     host information. A stub if in_lab is False.
    113             'connection_pool': ssh_multiplex.ConnectionPool instance to share
    114                     master ssh connection across control scripts.
    115     """
    116     machine_dict_list = []
    117     for machine in machine_names:
    118         # See autoserv_parser.parse_args. Only one of in_lab or host_attributes
    119         # can be provided.
    120         if not in_lab:
    121             afe_host = server_utils.EmptyAFEHost()
    122             host_info_store = host_info.InMemoryHostInfoStore()
    123             if host_attributes is not None:
    124                 afe_host.attributes.update(host_attributes)
    125                 info = host_info.HostInfo(attributes=host_attributes)
    126                 host_info_store.commit(info)
    127         elif host_attributes:
    128             raise error.AutoservError(
    129                     'in_lab and host_attribute are mutually exclusive. '
    130                     'Obtained in_lab:%s, host_attributes:%s'
    131                     % (in_lab, host_attributes))
    132         else:
    133             afe_host = _create_afe_host(machine)
    134             host_info_store = _create_host_info_store(machine, workdir)
    135 
    136         machine_dict_list.append({
    137                 'hostname' : machine,
    138                 'afe_host' : afe_host,
    139                 'host_info_store': host_info_store,
    140                 'connection_pool': connection_pool,
    141         })
    142 
    143     return machine_dict_list
    144 
    145 
    146 class status_indenter(base_job.status_indenter):
    147     """Provide a simple integer-backed status indenter."""
    148     def __init__(self):
    149         self._indent = 0
    150 
    151 
    152     @property
    153     def indent(self):
    154         return self._indent
    155 
    156 
    157     def increment(self):
    158         self._indent += 1
    159 
    160 
    161     def decrement(self):
    162         self._indent -= 1
    163 
    164 
    165     def get_context(self):
    166         """Returns a context object for use by job.get_record_context."""
    167         class context(object):
    168             def __init__(self, indenter, indent):
    169                 self._indenter = indenter
    170                 self._indent = indent
    171             def restore(self):
    172                 self._indenter._indent = self._indent
    173         return context(self, self._indent)
    174 
    175 
    176 class server_job_record_hook(object):
    177     """The job.record hook for server job. Used to inject WARN messages from
    178     the console or vlm whenever new logs are written, and to echo any logs
    179     to INFO level logging. Implemented as a class so that it can use state to
    180     block recursive calls, so that the hook can call job.record itself to
    181     log WARN messages.
    182 
    183     Depends on job._read_warnings and job._logger.
    184     """
    185     def __init__(self, job):
    186         self._job = job
    187         self._being_called = False
    188 
    189 
    190     def __call__(self, entry):
    191         """A wrapper around the 'real' record hook, the _hook method, which
    192         prevents recursion. This isn't making any effort to be threadsafe,
    193         the intent is to outright block infinite recursion via a
    194         job.record->_hook->job.record->_hook->job.record... chain."""
    195         if self._being_called:
    196             return
    197         self._being_called = True
    198         try:
    199             self._hook(self._job, entry)
    200         finally:
    201             self._being_called = False
    202 
    203 
    204     @staticmethod
    205     def _hook(job, entry):
    206         """The core hook, which can safely call job.record."""
    207         entries = []
    208         # poll all our warning loggers for new warnings
    209         for timestamp, msg in job._read_warnings():
    210             warning_entry = base_job.status_log_entry(
    211                 'WARN', None, None, msg, {}, timestamp=timestamp)
    212             entries.append(warning_entry)
    213             job.record_entry(warning_entry)
    214         # echo rendered versions of all the status logs to info
    215         entries.append(entry)
    216         for entry in entries:
    217             rendered_entry = job._logger.render_entry(entry)
    218             logging.info(rendered_entry)
    219             job._parse_status(rendered_entry)
    220 
    221 
    222 class server_job(base_job.base_job):
    223     """The server-side concrete implementation of base_job.
    224 
    225     Optional properties provided by this implementation:
    226         serverdir
    227 
    228         num_tests_run
    229         num_tests_failed
    230 
    231         warning_manager
    232         warning_loggers
    233     """
    234 
    235     _STATUS_VERSION = 1
    236 
    237     # TODO crbug.com/285395 eliminate ssh_verbosity_flag
    238     def __init__(self, control, args, resultdir, label, user, machines,
    239                  client=False, parse_job='',
    240                  ssh_user=host_factory.DEFAULT_SSH_USER,
    241                  ssh_port=host_factory.DEFAULT_SSH_PORT,
    242                  ssh_pass=host_factory.DEFAULT_SSH_PASS,
    243                  ssh_verbosity_flag=host_factory.DEFAULT_SSH_VERBOSITY,
    244                  ssh_options=host_factory.DEFAULT_SSH_OPTIONS,
    245                  test_retry=0, group_name='',
    246                  tag='', disable_sysinfo=False,
    247                  control_filename=SERVER_CONTROL_FILENAME,
    248                  parent_job_id=None, host_attributes=None, in_lab=False):
    249         """
    250         Create a server side job object.
    251 
    252         @param control: The pathname of the control file.
    253         @param args: Passed to the control file.
    254         @param resultdir: Where to throw the results.
    255         @param label: Description of the job.
    256         @param user: Username for the job (email address).
    257         @param client: True if this is a client-side control file.
    258         @param parse_job: string, if supplied it is the job execution tag that
    259                 the results will be passed through to the TKO parser with.
    260         @param ssh_user: The SSH username.  [root]
    261         @param ssh_port: The SSH port number.  [22]
    262         @param ssh_pass: The SSH passphrase, if needed.
    263         @param ssh_verbosity_flag: The SSH verbosity flag, '-v', '-vv',
    264                 '-vvv', or an empty string if not needed.
    265         @param ssh_options: A string giving additional options that will be
    266                             included in ssh commands.
    267         @param test_retry: The number of times to retry a test if the test did
    268                 not complete successfully.
    269         @param group_name: If supplied, this will be written out as
    270                 host_group_name in the keyvals file for the parser.
    271         @param tag: The job execution tag from the scheduler.  [optional]
    272         @param disable_sysinfo: Whether we should disable the sysinfo step of
    273                 tests for a modest shortening of test time.  [optional]
    274         @param control_filename: The filename where the server control file
    275                 should be written in the results directory.
    276         @param parent_job_id: Job ID of the parent job. Default to None if the
    277                 job does not have a parent job.
    278         @param host_attributes: Dict of host attributes passed into autoserv
    279                                 via the command line. If specified here, these
    280                                 attributes will apply to all machines.
    281         @param in_lab: Boolean that indicates if this is running in the lab
    282                        environment.
    283         """
    284         super(server_job, self).__init__(resultdir=resultdir,
    285                                          test_retry=test_retry)
    286         self.test_retry = test_retry
    287         self.control = control
    288         self._uncollected_log_file = os.path.join(self.resultdir,
    289                                                   'uncollected_logs')
    290         debugdir = os.path.join(self.resultdir, 'debug')
    291         if not os.path.exists(debugdir):
    292             os.mkdir(debugdir)
    293 
    294         if user:
    295             self.user = user
    296         else:
    297             self.user = getpass.getuser()
    298 
    299         self.args = args
    300         self.label = label
    301         self.machines = machines
    302         self._client = client
    303         self.warning_loggers = set()
    304         self.warning_manager = warning_manager()
    305         self._ssh_user = ssh_user
    306         self._ssh_port = ssh_port
    307         self._ssh_pass = ssh_pass
    308         self._ssh_verbosity_flag = ssh_verbosity_flag
    309         self._ssh_options = ssh_options
    310         self.tag = tag
    311         self.hosts = set()
    312         self.drop_caches = False
    313         self.drop_caches_between_iterations = False
    314         self._control_filename = control_filename
    315         self._disable_sysinfo = disable_sysinfo
    316 
    317         self.logging = logging_manager.get_logging_manager(
    318                 manage_stdout_and_stderr=True, redirect_fds=True)
    319         subcommand.logging_manager_object = self.logging
    320 
    321         self.sysinfo = sysinfo.sysinfo(self.resultdir)
    322         self.profilers = profilers.profilers(self)
    323 
    324         job_data = {'label' : label, 'user' : user,
    325                     'hostname' : ','.join(machines),
    326                     'drone' : platform.node(),
    327                     'status_version' : str(self._STATUS_VERSION),
    328                     'job_started' : str(int(time.time()))}
    329         # Save parent job id to keyvals, so parser can retrieve the info and
    330         # write to tko_jobs record.
    331         if parent_job_id:
    332             job_data['parent_job_id'] = parent_job_id
    333         if group_name:
    334             job_data['host_group_name'] = group_name
    335 
    336         # only write these keyvals out on the first job in a resultdir
    337         if 'job_started' not in utils.read_keyval(self.resultdir):
    338             job_data.update(self._get_job_data())
    339             utils.write_keyval(self.resultdir, job_data)
    340 
    341         self._parse_job = parse_job
    342         self._using_parser = (INCREMENTAL_TKO_PARSING and self._parse_job
    343                               and len(machines) <= 1)
    344         self.pkgmgr = packages.PackageManager(
    345             self.autodir, run_function_dargs={'timeout':600})
    346         self.num_tests_run = 0
    347         self.num_tests_failed = 0
    348 
    349         self._register_subcommand_hooks()
    350 
    351         # set up the status logger
    352         self._indenter = status_indenter()
    353         self._logger = base_job.status_logger(
    354             self, self._indenter, 'status.log', 'status.log',
    355             record_hook=server_job_record_hook(self))
    356 
    357         # Initialize a flag to indicate DUT failure during the test, e.g.,
    358         # unexpected reboot.
    359         self.failed_with_device_error = False
    360 
    361         self._connection_pool = ssh_multiplex.ConnectionPool()
    362 
    363         self.parent_job_id = parent_job_id
    364         self.in_lab = in_lab
    365         self.machine_dict_list = get_machine_dicts(
    366                 self.machines, self.resultdir, self.in_lab, host_attributes,
    367                 self._connection_pool)
    368 
    369         # TODO(jrbarnette) The harness attribute is only relevant to
    370         # client jobs, but it's required to be present, or we will fail
    371         # server job unit tests.  Yes, really.
    372         #
    373         # TODO(jrbarnette) The utility of the 'harness' attribute even
    374         # to client jobs is suspect.  Probably, we should remove it.
    375         self.harness = None
    376 
    377         if control:
    378             parsed_control = control_data.parse_control(
    379                     control, raise_warnings=False)
    380             self.fast = parsed_control.fast
    381             self.max_result_size_KB = parsed_control.max_result_size_KB
    382         else:
    383             self.fast = False
    384             # Set the maximum result size to be the default specified in
    385             # global config, if the job has no control file associated.
    386             self.max_result_size_KB = control_data.DEFAULT_MAX_RESULT_SIZE_KB
    387 
    388 
    389     @classmethod
    390     def _find_base_directories(cls):
    391         """
    392         Determine locations of autodir, clientdir and serverdir. Assumes
    393         that this file is located within serverdir and uses __file__ along
    394         with relative paths to resolve the location.
    395         """
    396         serverdir = os.path.abspath(os.path.dirname(__file__))
    397         autodir = os.path.normpath(os.path.join(serverdir, '..'))
    398         clientdir = os.path.join(autodir, 'client')
    399         return autodir, clientdir, serverdir
    400 
    401 
    402     def _find_resultdir(self, resultdir, *args, **dargs):
    403         """
    404         Determine the location of resultdir. For server jobs we expect one to
    405         always be explicitly passed in to __init__, so just return that.
    406         """
    407         if resultdir:
    408             return os.path.normpath(resultdir)
    409         else:
    410             return None
    411 
    412 
    413     def _get_status_logger(self):
    414         """Return a reference to the status logger."""
    415         return self._logger
    416 
    417 
    418     @staticmethod
    419     def _load_control_file(path):
    420         f = open(path)
    421         try:
    422             control_file = f.read()
    423         finally:
    424             f.close()
    425         return re.sub('\r', '', control_file)
    426 
    427 
    428     def _register_subcommand_hooks(self):
    429         """
    430         Register some hooks into the subcommand modules that allow us
    431         to properly clean up self.hosts created in forked subprocesses.
    432         """
    433         def on_fork(cmd):
    434             self._existing_hosts_on_fork = set(self.hosts)
    435         def on_join(cmd):
    436             new_hosts = self.hosts - self._existing_hosts_on_fork
    437             for host in new_hosts:
    438                 host.close()
    439         subcommand.subcommand.register_fork_hook(on_fork)
    440         subcommand.subcommand.register_join_hook(on_join)
    441 
    442 
    443     def init_parser(self):
    444         """
    445         Start the continuous parsing of self.resultdir. This sets up
    446         the database connection and inserts the basic job object into
    447         the database if necessary.
    448         """
    449         if self.fast and not self._using_parser:
    450             self.parser = parser_lib.parser(self._STATUS_VERSION)
    451             self.job_model = self.parser.make_job(self.resultdir)
    452             self.parser.start(self.job_model)
    453             return
    454 
    455         if not self._using_parser:
    456             return
    457 
    458         # redirect parser debugging to .parse.log
    459         parse_log = os.path.join(self.resultdir, '.parse.log')
    460         parse_log = open(parse_log, 'w', 0)
    461         tko_utils.redirect_parser_debugging(parse_log)
    462         # create a job model object and set up the db
    463         self.results_db = tko_db.db(autocommit=True)
    464         self.parser = parser_lib.parser(self._STATUS_VERSION)
    465         self.job_model = self.parser.make_job(self.resultdir)
    466         self.parser.start(self.job_model)
    467         # check if a job already exists in the db and insert it if
    468         # it does not
    469         job_idx = self.results_db.find_job(self._parse_job)
    470         if job_idx is None:
    471             self.results_db.insert_job(self._parse_job, self.job_model,
    472                                        self.parent_job_id)
    473         else:
    474             machine_idx = self.results_db.lookup_machine(self.job_model.machine)
    475             self.job_model.index = job_idx
    476             self.job_model.machine_idx = machine_idx
    477 
    478 
    479     def cleanup_parser(self):
    480         """
    481         This should be called after the server job is finished
    482         to carry out any remaining cleanup (e.g. flushing any
    483         remaining test results to the results db)
    484         """
    485         if self.fast and not self._using_parser:
    486             final_tests = self.parser.end()
    487             for test in final_tests:
    488                 if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'):
    489                     self.num_tests_failed += 1
    490             return
    491 
    492         if not self._using_parser:
    493             return
    494 
    495         final_tests = self.parser.end()
    496         for test in final_tests:
    497             self.__insert_test(test)
    498         self._using_parser = False
    499 
    500     # TODO crbug.com/285395 add a kwargs parameter.
    501     def _make_namespace(self):
    502         """Create a namespace dictionary to be passed along to control file.
    503 
    504         Creates a namespace argument populated with standard values:
    505         machines, job, ssh_user, ssh_port, ssh_pass, ssh_verbosity_flag,
    506         and ssh_options.
    507         """
    508         namespace = {'machines' : self.machine_dict_list,
    509                      'job' : self,
    510                      'ssh_user' : self._ssh_user,
    511                      'ssh_port' : self._ssh_port,
    512                      'ssh_pass' : self._ssh_pass,
    513                      'ssh_verbosity_flag' : self._ssh_verbosity_flag,
    514                      'ssh_options' : self._ssh_options}
    515         return namespace
    516 
    517 
    518     def cleanup(self, labels):
    519         """Cleanup machines.
    520 
    521         @param labels: Comma separated job labels, will be used to
    522                        determine special task actions.
    523         """
    524         if not self.machines:
    525             raise error.AutoservError('No machines specified to cleanup')
    526         if self.resultdir:
    527             os.chdir(self.resultdir)
    528 
    529         namespace = self._make_namespace()
    530         namespace.update({'job_labels': labels, 'args': ''})
    531         self._execute_code(CLEANUP_CONTROL_FILE, namespace, protect=False)
    532 
    533 
    534     def verify(self, labels):
    535         """Verify machines are all ssh-able.
    536 
    537         @param labels: Comma separated job labels, will be used to
    538                        determine special task actions.
    539         """
    540         if not self.machines:
    541             raise error.AutoservError('No machines specified to verify')
    542         if self.resultdir:
    543             os.chdir(self.resultdir)
    544 
    545         namespace = self._make_namespace()
    546         namespace.update({'job_labels': labels, 'args': ''})
    547         self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False)
    548 
    549 
    550     def reset(self, labels):
    551         """Reset machines by first cleanup then verify each machine.
    552 
    553         @param labels: Comma separated job labels, will be used to
    554                        determine special task actions.
    555         """
    556         if not self.machines:
    557             raise error.AutoservError('No machines specified to reset.')
    558         if self.resultdir:
    559             os.chdir(self.resultdir)
    560 
    561         namespace = self._make_namespace()
    562         namespace.update({'job_labels': labels, 'args': ''})
    563         self._execute_code(RESET_CONTROL_FILE, namespace, protect=False)
    564 
    565 
    566     def repair(self, labels):
    567         """Repair machines.
    568 
    569         @param labels: Comma separated job labels, will be used to
    570                        determine special task actions.
    571         """
    572         if not self.machines:
    573             raise error.AutoservError('No machines specified to repair')
    574         if self.resultdir:
    575             os.chdir(self.resultdir)
    576 
    577         namespace = self._make_namespace()
    578         namespace.update({'job_labels': labels, 'args': ''})
    579         self._execute_code(REPAIR_CONTROL_FILE, namespace, protect=False)
    580 
    581 
    582     def provision(self, labels):
    583         """
    584         Provision all hosts to match |labels|.
    585 
    586         @param labels: A comma seperated string of labels to provision the
    587                        host to.
    588 
    589         """
    590         control = self._load_control_file(PROVISION_CONTROL_FILE)
    591         self.run(control=control, job_labels=labels)
    592 
    593 
    594     def precheck(self):
    595         """
    596         perform any additional checks in derived classes.
    597         """
    598         pass
    599 
    600 
    601     def enable_external_logging(self):
    602         """
    603         Start or restart external logging mechanism.
    604         """
    605         pass
    606 
    607 
    608     def disable_external_logging(self):
    609         """
    610         Pause or stop external logging mechanism.
    611         """
    612         pass
    613 
    614 
    615     def use_external_logging(self):
    616         """
    617         Return True if external logging should be used.
    618         """
    619         return False
    620 
    621 
    622     def _make_parallel_wrapper(self, function, machines, log):
    623         """Wrap function as appropriate for calling by parallel_simple."""
    624         # machines could be a list of dictionaries, e.g.,
    625         # [{'host_attributes': {}, 'hostname': '100.96.51.226'}]
    626         # The dictionary is generated in server_job.__init__, refer to
    627         # variable machine_dict_list, then passed in with namespace, see method
    628         # server_job._make_namespace.
    629         # To compare the machinese to self.machines, which is a list of machine
    630         # hostname, we need to convert machines back to a list of hostnames.
    631         # Note that the order of hostnames doesn't matter, as is_forking will be
    632         # True if there are more than one machine.
    633         if (machines and isinstance(machines, list)
    634             and isinstance(machines[0], dict)):
    635             machines = [m['hostname'] for m in machines]
    636         is_forking = not (len(machines) == 1 and self.machines == machines)
    637         if self._parse_job and is_forking and log:
    638             def wrapper(machine):
    639                 hostname = server_utils.get_hostname_from_machine(machine)
    640                 self._parse_job += "/" + hostname
    641                 self._using_parser = INCREMENTAL_TKO_PARSING
    642                 self.machines = [machine]
    643                 self.push_execution_context(hostname)
    644                 os.chdir(self.resultdir)
    645                 utils.write_keyval(self.resultdir, {"hostname": hostname})
    646                 self.init_parser()
    647                 result = function(machine)
    648                 self.cleanup_parser()
    649                 return result
    650         elif len(machines) > 1 and log:
    651             def wrapper(machine):
    652                 hostname = server_utils.get_hostname_from_machine(machine)
    653                 self.push_execution_context(hostname)
    654                 os.chdir(self.resultdir)
    655                 machine_data = {'hostname' : hostname,
    656                                 'status_version' : str(self._STATUS_VERSION)}
    657                 utils.write_keyval(self.resultdir, machine_data)
    658                 result = function(machine)
    659                 return result
    660         else:
    661             wrapper = function
    662         return wrapper
    663 
    664 
    665     def parallel_simple(self, function, machines, log=True, timeout=None,
    666                         return_results=False):
    667         """
    668         Run 'function' using parallel_simple, with an extra wrapper to handle
    669         the necessary setup for continuous parsing, if possible. If continuous
    670         parsing is already properly initialized then this should just work.
    671 
    672         @param function: A callable to run in parallel given each machine.
    673         @param machines: A list of machine names to be passed one per subcommand
    674                 invocation of function.
    675         @param log: If True, output will be written to output in a subdirectory
    676                 named after each machine.
    677         @param timeout: Seconds after which the function call should timeout.
    678         @param return_results: If True instead of an AutoServError being raised
    679                 on any error a list of the results|exceptions from the function
    680                 called on each arg is returned.  [default: False]
    681 
    682         @raises error.AutotestError: If any of the functions failed.
    683         """
    684         wrapper = self._make_parallel_wrapper(function, machines, log)
    685         return subcommand.parallel_simple(
    686                 wrapper, machines,
    687                 subdir_name_constructor=server_utils.get_hostname_from_machine,
    688                 log=log, timeout=timeout, return_results=return_results)
    689 
    690 
    691     def parallel_on_machines(self, function, machines, timeout=None):
    692         """
    693         @param function: Called in parallel with one machine as its argument.
    694         @param machines: A list of machines to call function(machine) on.
    695         @param timeout: Seconds after which the function call should timeout.
    696 
    697         @returns A list of machines on which function(machine) returned
    698                 without raising an exception.
    699         """
    700         results = self.parallel_simple(function, machines, timeout=timeout,
    701                                        return_results=True)
    702         success_machines = []
    703         for result, machine in itertools.izip(results, machines):
    704             if not isinstance(result, Exception):
    705                 success_machines.append(machine)
    706         return success_machines
    707 
    708 
    709     def record_skipped_test(self, skipped_test, message=None):
    710         """Insert a failure record into status.log for this test."""
    711         msg = message
    712         if msg is None:
    713             msg = 'No valid machines found for test %s.' % skipped_test
    714         logging.info(msg)
    715         self.record('START', None, skipped_test.test_name)
    716         self.record('INFO', None, skipped_test.test_name, msg)
    717         self.record('END TEST_NA', None, skipped_test.test_name, msg)
    718 
    719 
    720     def _has_failed_tests(self):
    721         """Parse status log for failed tests.
    722 
    723         This checks the current working directory and is intended only for use
    724         by the run() method.
    725 
    726         @return boolean
    727         """
    728         path = os.getcwd()
    729 
    730         # TODO(ayatane): Copied from tko/parse.py.  Needs extensive refactor to
    731         # make code reuse plausible.
    732         job_keyval = tko_models.job.read_keyval(path)
    733         status_version = job_keyval.get("status_version", 0)
    734 
    735         # parse out the job
    736         parser = parser_lib.parser(status_version)
    737         job = parser.make_job(path)
    738         status_log = os.path.join(path, "status.log")
    739         if not os.path.exists(status_log):
    740             status_log = os.path.join(path, "status")
    741         if not os.path.exists(status_log):
    742             logging.warning("! Unable to parse job, no status file")
    743             return True
    744 
    745         # parse the status logs
    746         status_lines = open(status_log).readlines()
    747         parser.start(job)
    748         tests = parser.end(status_lines)
    749 
    750         # parser.end can return the same object multiple times, so filter out
    751         # dups
    752         job.tests = []
    753         already_added = set()
    754         for test in tests:
    755             if test not in already_added:
    756                 already_added.add(test)
    757                 job.tests.append(test)
    758 
    759         failed = False
    760         for test in job.tests:
    761             # The current job is still running and shouldn't count as failed.
    762             # The parser will fail to parse the exit status of the job since it
    763             # hasn't exited yet (this running right now is the job).
    764             failed = failed or (test.status != 'GOOD'
    765                                 and not _is_current_server_job(test))
    766         return failed
    767 
    768 
    769     def _collect_crashes(self, namespace, collect_crashinfo):
    770         """Collect crashes.
    771 
    772         @param namespace: namespace dict.
    773         @param collect_crashinfo: whether to collect crashinfo in addition to
    774                 dumps
    775         """
    776         if collect_crashinfo:
    777             # includes crashdumps
    778             crash_control_file = CRASHINFO_CONTROL_FILE
    779         else:
    780             crash_control_file = CRASHDUMPS_CONTROL_FILE
    781         self._execute_code(crash_control_file, namespace)
    782 
    783 
    784     _USE_TEMP_DIR = object()
    785     def run(self, install_before=False, install_after=False,
    786             collect_crashdumps=True, namespace={}, control=None,
    787             control_file_dir=None, verify_job_repo_url=False,
    788             only_collect_crashinfo=False, skip_crash_collection=False,
    789             job_labels='', use_packaging=True):
    790         # for a normal job, make sure the uncollected logs file exists
    791         # for a crashinfo-only run it should already exist, bail out otherwise
    792         created_uncollected_logs = False
    793         logging.info("I am PID %s", os.getpid())
    794         if self.resultdir and not os.path.exists(self._uncollected_log_file):
    795             if only_collect_crashinfo:
    796                 # if this is a crashinfo-only run, and there were no existing
    797                 # uncollected logs, just bail out early
    798                 logging.info("No existing uncollected logs, "
    799                              "skipping crashinfo collection")
    800                 return
    801             else:
    802                 log_file = open(self._uncollected_log_file, "w")
    803                 pickle.dump([], log_file)
    804                 log_file.close()
    805                 created_uncollected_logs = True
    806 
    807         # use a copy so changes don't affect the original dictionary
    808         namespace = namespace.copy()
    809         machines = self.machines
    810         if control is None:
    811             if self.control is None:
    812                 control = ''
    813             else:
    814                 control = self._load_control_file(self.control)
    815         if control_file_dir is None:
    816             control_file_dir = self.resultdir
    817 
    818         self.aborted = False
    819         namespace.update(self._make_namespace())
    820         namespace.update({
    821                 'args': self.args,
    822                 'job_labels': job_labels,
    823                 'gtest_runner': site_gtest_runner.gtest_runner(),
    824         })
    825         test_start_time = int(time.time())
    826 
    827         if self.resultdir:
    828             os.chdir(self.resultdir)
    829             # touch status.log so that the parser knows a job is running here
    830             open(self.get_status_log_path(), 'a').close()
    831             self.enable_external_logging()
    832 
    833         collect_crashinfo = True
    834         temp_control_file_dir = None
    835         try:
    836             try:
    837                 if not self.fast:
    838                     with metrics.SecondsTimer(
    839                             'chromeos/autotest/job/get_network_stats',
    840                             fields = {'stage': 'start'}):
    841                         namespace['network_stats_label'] = 'at-start'
    842                         self._execute_code(GET_NETWORK_STATS_CONTROL_FILE,
    843                                            namespace)
    844 
    845                 if install_before and machines:
    846                     self._execute_code(INSTALL_CONTROL_FILE, namespace)
    847 
    848                 if only_collect_crashinfo:
    849                     return
    850 
    851                 # If the verify_job_repo_url option is set but we're unable
    852                 # to actually verify that the job_repo_url contains the autotest
    853                 # package, this job will fail.
    854                 if verify_job_repo_url:
    855                     self._execute_code(VERIFY_JOB_REPO_URL_CONTROL_FILE,
    856                                        namespace)
    857                 else:
    858                     logging.warning('Not checking if job_repo_url contains '
    859                                     'autotest packages on %s', machines)
    860 
    861                 # determine the dir to write the control files to
    862                 cfd_specified = (control_file_dir
    863                                  and control_file_dir is not self._USE_TEMP_DIR)
    864                 if cfd_specified:
    865                     temp_control_file_dir = None
    866                 else:
    867                     temp_control_file_dir = tempfile.mkdtemp(
    868                         suffix='temp_control_file_dir')
    869                     control_file_dir = temp_control_file_dir
    870                 server_control_file = os.path.join(control_file_dir,
    871                                                    self._control_filename)
    872                 client_control_file = os.path.join(control_file_dir,
    873                                                    CLIENT_CONTROL_FILENAME)
    874                 if self._client:
    875                     namespace['control'] = control
    876                     utils.open_write_close(client_control_file, control)
    877                     shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE,
    878                                     server_control_file)
    879                 else:
    880                     utils.open_write_close(server_control_file, control)
    881 
    882                 logging.info("Processing control file")
    883                 namespace['use_packaging'] = use_packaging
    884                 self._execute_code(server_control_file, namespace)
    885                 logging.info("Finished processing control file")
    886 
    887                 # If no device error occured, no need to collect crashinfo.
    888                 collect_crashinfo = self.failed_with_device_error
    889             except Exception as e:
    890                 try:
    891                     # Add num_tests_failed if any extra exceptions are raised
    892                     # outside _execute_code().
    893                     self.num_tests_failed += 1
    894                     logging.exception(
    895                             'Exception escaped control file, job aborting:')
    896                     reason = re.sub(base_job.status_log_entry.BAD_CHAR_REGEX,
    897                                     ' ', str(e))
    898                     self.record('INFO', None, None, str(e),
    899                                 {'job_abort_reason': reason})
    900                 except:
    901                     pass # don't let logging exceptions here interfere
    902                 raise
    903         finally:
    904             if temp_control_file_dir:
    905                 # Clean up temp directory used for copies of the control files
    906                 try:
    907                     shutil.rmtree(temp_control_file_dir)
    908                 except Exception as e:
    909                     logging.warning('Could not remove temp directory %s: %s',
    910                                  temp_control_file_dir, e)
    911 
    912             if machines and (collect_crashdumps or collect_crashinfo):
    913                 if skip_crash_collection or self.fast:
    914                     logging.info('Skipping crash dump/info collection '
    915                                  'as requested.')
    916                 else:
    917                     with metrics.SecondsTimer(
    918                             'chromeos/autotest/job/collect_crashinfo'):
    919                         namespace['test_start_time'] = test_start_time
    920                         # Remove crash files for passing tests.
    921                         # TODO(ayatane): Tests that create crash files should be
    922                         # reported.
    923                         namespace['has_failed_tests'] = self._has_failed_tests()
    924                         self._collect_crashes(namespace, collect_crashinfo)
    925             self.disable_external_logging()
    926             if self._uncollected_log_file and created_uncollected_logs:
    927                 os.remove(self._uncollected_log_file)
    928             if install_after and machines:
    929                 self._execute_code(INSTALL_CONTROL_FILE, namespace)
    930 
    931             if not self.fast:
    932                 with metrics.SecondsTimer(
    933                         'chromeos/autotest/job/get_network_stats',
    934                         fields = {'stage': 'end'}):
    935                     namespace['network_stats_label'] = 'at-end'
    936                     self._execute_code(GET_NETWORK_STATS_CONTROL_FILE,
    937                                        namespace)
    938 
    939 
    940     def run_test(self, url, *args, **dargs):
    941         """
    942         Summon a test object and run it.
    943 
    944         tag
    945                 tag to add to testname
    946         url
    947                 url of the test to run
    948         """
    949         if self._disable_sysinfo:
    950             dargs['disable_sysinfo'] = True
    951 
    952         group, testname = self.pkgmgr.get_package_name(url, 'test')
    953         testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
    954         outputdir = self._make_test_outputdir(subdir)
    955 
    956         def group_func():
    957             try:
    958                 test.runtest(self, url, tag, args, dargs)
    959             except error.TestBaseException as e:
    960                 self.record(e.exit_status, subdir, testname, str(e))
    961                 raise
    962             except Exception as e:
    963                 info = str(e) + "\n" + traceback.format_exc()
    964                 self.record('FAIL', subdir, testname, info)
    965                 raise
    966             else:
    967                 self.record('GOOD', subdir, testname, 'completed successfully')
    968 
    969         try:
    970             result = self._run_group(testname, subdir, group_func)
    971         except error.TestBaseException as e:
    972             return False
    973         else:
    974             return True
    975 
    976 
    977     def _run_group(self, name, subdir, function, *args, **dargs):
    978         """Underlying method for running something inside of a group."""
    979         result, exc_info = None, None
    980         try:
    981             self.record('START', subdir, name)
    982             result = function(*args, **dargs)
    983         except error.TestBaseException as e:
    984             self.record("END %s" % e.exit_status, subdir, name)
    985             raise
    986         except Exception as e:
    987             err_msg = str(e) + '\n'
    988             err_msg += traceback.format_exc()
    989             self.record('END ABORT', subdir, name, err_msg)
    990             raise error.JobError(name + ' failed\n' + traceback.format_exc())
    991         else:
    992             self.record('END GOOD', subdir, name)
    993 
    994         return result
    995 
    996 
    997     def run_group(self, function, *args, **dargs):
    998         """\
    999         @param function: subroutine to run
   1000         @returns: (result, exc_info). When the call succeeds, result contains
   1001                 the return value of |function| and exc_info is None. If
   1002                 |function| raises an exception, exc_info contains the tuple
   1003                 returned by sys.exc_info(), and result is None.
   1004         """
   1005 
   1006         name = function.__name__
   1007         # Allow the tag for the group to be specified.
   1008         tag = dargs.pop('tag', None)
   1009         if tag:
   1010             name = tag
   1011 
   1012         try:
   1013             result = self._run_group(name, None, function, *args, **dargs)[0]
   1014         except error.TestBaseException:
   1015             return None, sys.exc_info()
   1016         return result, None
   1017 
   1018 
   1019     def run_op(self, op, op_func, get_kernel_func):
   1020         """\
   1021         A specialization of run_group meant specifically for handling
   1022         management operation. Includes support for capturing the kernel version
   1023         after the operation.
   1024 
   1025         Args:
   1026            op: name of the operation.
   1027            op_func: a function that carries out the operation (reboot, suspend)
   1028            get_kernel_func: a function that returns a string
   1029                             representing the kernel version.
   1030         """
   1031         try:
   1032             self.record('START', None, op)
   1033             op_func()
   1034         except Exception as e:
   1035             err_msg = str(e) + '\n' + traceback.format_exc()
   1036             self.record('END FAIL', None, op, err_msg)
   1037             raise
   1038         else:
   1039             kernel = get_kernel_func()
   1040             self.record('END GOOD', None, op,
   1041                         optional_fields={"kernel": kernel})
   1042 
   1043 
   1044     def run_control(self, path):
   1045         """Execute a control file found at path (relative to the autotest
   1046         path). Intended for executing a control file within a control file,
   1047         not for running the top-level job control file."""
   1048         path = os.path.join(self.autodir, path)
   1049         control_file = self._load_control_file(path)
   1050         self.run(control=control_file, control_file_dir=self._USE_TEMP_DIR)
   1051 
   1052 
   1053     def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
   1054         self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
   1055                                    on_every_test)
   1056 
   1057 
   1058     def add_sysinfo_logfile(self, file, on_every_test=False):
   1059         self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)
   1060 
   1061 
   1062     def _add_sysinfo_loggable(self, loggable, on_every_test):
   1063         if on_every_test:
   1064             self.sysinfo.test_loggables.add(loggable)
   1065         else:
   1066             self.sysinfo.boot_loggables.add(loggable)
   1067 
   1068 
   1069     def _read_warnings(self):
   1070         """Poll all the warning loggers and extract any new warnings that have
   1071         been logged. If the warnings belong to a category that is currently
   1072         disabled, this method will discard them and they will no longer be
   1073         retrievable.
   1074 
   1075         Returns a list of (timestamp, message) tuples, where timestamp is an
   1076         integer epoch timestamp."""
   1077         warnings = []
   1078         while True:
   1079             # pull in a line of output from every logger that has
   1080             # output ready to be read
   1081             loggers, _, _ = select.select(self.warning_loggers, [], [], 0)
   1082             closed_loggers = set()
   1083             for logger in loggers:
   1084                 line = logger.readline()
   1085                 # record any broken pipes (aka line == empty)
   1086                 if len(line) == 0:
   1087                     closed_loggers.add(logger)
   1088                     continue
   1089                 # parse out the warning
   1090                 timestamp, msgtype, msg = line.split('\t', 2)
   1091                 timestamp = int(timestamp)
   1092                 # if the warning is valid, add it to the results
   1093                 if self.warning_manager.is_valid(timestamp, msgtype):
   1094                     warnings.append((timestamp, msg.strip()))
   1095 
   1096             # stop listening to loggers that are closed
   1097             self.warning_loggers -= closed_loggers
   1098 
   1099             # stop if none of the loggers have any output left
   1100             if not loggers:
   1101                 break
   1102 
   1103         # sort into timestamp order
   1104         warnings.sort()
   1105         return warnings
   1106 
   1107 
   1108     def _unique_subdirectory(self, base_subdirectory_name):
   1109         """Compute a unique results subdirectory based on the given name.
   1110 
   1111         Appends base_subdirectory_name with a number as necessary to find a
   1112         directory name that doesn't already exist.
   1113         """
   1114         subdirectory = base_subdirectory_name
   1115         counter = 1
   1116         while os.path.exists(os.path.join(self.resultdir, subdirectory)):
   1117             subdirectory = base_subdirectory_name + '.' + str(counter)
   1118             counter += 1
   1119         return subdirectory
   1120 
   1121 
   1122     def get_record_context(self):
   1123         """Returns an object representing the current job.record context.
   1124 
   1125         The object returned is an opaque object with a 0-arg restore method
   1126         which can be called to restore the job.record context (i.e. indentation)
   1127         to the current level. The intention is that it should be used when
   1128         something external which generate job.record calls (e.g. an autotest
   1129         client) can fail catastrophically and the server job record state
   1130         needs to be reset to its original "known good" state.
   1131 
   1132         @return: A context object with a 0-arg restore() method."""
   1133         return self._indenter.get_context()
   1134 
   1135 
   1136     def record_summary(self, status_code, test_name, reason='', attributes=None,
   1137                        distinguishing_attributes=(), child_test_ids=None):
   1138         """Record a summary test result.
   1139 
   1140         @param status_code: status code string, see
   1141                 common_lib.log.is_valid_status()
   1142         @param test_name: name of the test
   1143         @param reason: (optional) string providing detailed reason for test
   1144                 outcome
   1145         @param attributes: (optional) dict of string keyvals to associate with
   1146                 this result
   1147         @param distinguishing_attributes: (optional) list of attribute names
   1148                 that should be used to distinguish identically-named test
   1149                 results.  These attributes should be present in the attributes
   1150                 parameter.  This is used to generate user-friendly subdirectory
   1151                 names.
   1152         @param child_test_ids: (optional) list of test indices for test results
   1153                 used in generating this result.
   1154         """
   1155         subdirectory_name_parts = [test_name]
   1156         for attribute in distinguishing_attributes:
   1157             assert attributes
   1158             assert attribute in attributes, '%s not in %s' % (attribute,
   1159                                                               attributes)
   1160             subdirectory_name_parts.append(attributes[attribute])
   1161         base_subdirectory_name = '.'.join(subdirectory_name_parts)
   1162 
   1163         subdirectory = self._unique_subdirectory(base_subdirectory_name)
   1164         subdirectory_path = os.path.join(self.resultdir, subdirectory)
   1165         os.mkdir(subdirectory_path)
   1166 
   1167         self.record(status_code, subdirectory, test_name,
   1168                     status=reason, optional_fields={'is_summary': True})
   1169 
   1170         if attributes:
   1171             utils.write_keyval(subdirectory_path, attributes)
   1172 
   1173         if child_test_ids:
   1174             ids_string = ','.join(str(test_id) for test_id in child_test_ids)
   1175             summary_data = {'child_test_ids': ids_string}
   1176             utils.write_keyval(os.path.join(subdirectory_path, 'summary_data'),
   1177                                summary_data)
   1178 
   1179 
   1180     def disable_warnings(self, warning_type):
   1181         self.warning_manager.disable_warnings(warning_type)
   1182         self.record("INFO", None, None,
   1183                     "disabling %s warnings" % warning_type,
   1184                     {"warnings.disable": warning_type})
   1185 
   1186 
   1187     def enable_warnings(self, warning_type):
   1188         self.warning_manager.enable_warnings(warning_type)
   1189         self.record("INFO", None, None,
   1190                     "enabling %s warnings" % warning_type,
   1191                     {"warnings.enable": warning_type})
   1192 
   1193 
   1194     def get_status_log_path(self, subdir=None):
   1195         """Return the path to the job status log.
   1196 
   1197         @param subdir - Optional paramter indicating that you want the path
   1198             to a subdirectory status log.
   1199 
   1200         @returns The path where the status log should be.
   1201         """
   1202         if self.resultdir:
   1203             if subdir:
   1204                 return os.path.join(self.resultdir, subdir, "status.log")
   1205             else:
   1206                 return os.path.join(self.resultdir, "status.log")
   1207         else:
   1208             return None
   1209 
   1210 
   1211     def _update_uncollected_logs_list(self, update_func):
   1212         """Updates the uncollected logs list in a multi-process safe manner.
   1213 
   1214         @param update_func - a function that updates the list of uncollected
   1215             logs. Should take one parameter, the list to be updated.
   1216         """
   1217         # Skip log collection if file _uncollected_log_file does not exist.
   1218         if not (self._uncollected_log_file and
   1219                 os.path.exists(self._uncollected_log_file)):
   1220             return
   1221         if self._uncollected_log_file:
   1222             log_file = open(self._uncollected_log_file, "r+")
   1223             fcntl.flock(log_file, fcntl.LOCK_EX)
   1224         try:
   1225             uncollected_logs = pickle.load(log_file)
   1226             update_func(uncollected_logs)
   1227             log_file.seek(0)
   1228             log_file.truncate()
   1229             pickle.dump(uncollected_logs, log_file)
   1230             log_file.flush()
   1231         finally:
   1232             fcntl.flock(log_file, fcntl.LOCK_UN)
   1233             log_file.close()
   1234 
   1235 
   1236     def add_client_log(self, hostname, remote_path, local_path):
   1237         """Adds a new set of client logs to the list of uncollected logs,
   1238         to allow for future log recovery.
   1239 
   1240         @param host - the hostname of the machine holding the logs
   1241         @param remote_path - the directory on the remote machine holding logs
   1242         @param local_path - the local directory to copy the logs into
   1243         """
   1244         def update_func(logs_list):
   1245             logs_list.append((hostname, remote_path, local_path))
   1246         self._update_uncollected_logs_list(update_func)
   1247 
   1248 
   1249     def remove_client_log(self, hostname, remote_path, local_path):
   1250         """Removes a set of client logs from the list of uncollected logs,
   1251         to allow for future log recovery.
   1252 
   1253         @param host - the hostname of the machine holding the logs
   1254         @param remote_path - the directory on the remote machine holding logs
   1255         @param local_path - the local directory to copy the logs into
   1256         """
   1257         def update_func(logs_list):
   1258             logs_list.remove((hostname, remote_path, local_path))
   1259         self._update_uncollected_logs_list(update_func)
   1260 
   1261 
   1262     def get_client_logs(self):
   1263         """Retrieves the list of uncollected logs, if it exists.
   1264 
   1265         @returns A list of (host, remote_path, local_path) tuples. Returns
   1266                  an empty list if no uncollected logs file exists.
   1267         """
   1268         log_exists = (self._uncollected_log_file and
   1269                       os.path.exists(self._uncollected_log_file))
   1270         if log_exists:
   1271             return pickle.load(open(self._uncollected_log_file))
   1272         else:
   1273             return []
   1274 
   1275 
   1276     def _fill_server_control_namespace(self, namespace, protect=True):
   1277         """
   1278         Prepare a namespace to be used when executing server control files.
   1279 
   1280         This sets up the control file API by importing modules and making them
   1281         available under the appropriate names within namespace.
   1282 
   1283         For use by _execute_code().
   1284 
   1285         Args:
   1286           namespace: The namespace dictionary to fill in.
   1287           protect: Boolean.  If True (the default) any operation that would
   1288               clobber an existing entry in namespace will cause an error.
   1289         Raises:
   1290           error.AutoservError: When a name would be clobbered by import.
   1291         """
   1292         def _import_names(module_name, names=()):
   1293             """
   1294             Import a module and assign named attributes into namespace.
   1295 
   1296             Args:
   1297                 module_name: The string module name.
   1298                 names: A limiting list of names to import from module_name.  If
   1299                     empty (the default), all names are imported from the module
   1300                     similar to a "from foo.bar import *" statement.
   1301             Raises:
   1302                 error.AutoservError: When a name being imported would clobber
   1303                     a name already in namespace.
   1304             """
   1305             module = __import__(module_name, {}, {}, names)
   1306 
   1307             # No names supplied?  Import * from the lowest level module.
   1308             # (Ugh, why do I have to implement this part myself?)
   1309             if not names:
   1310                 for submodule_name in module_name.split('.')[1:]:
   1311                     module = getattr(module, submodule_name)
   1312                 if hasattr(module, '__all__'):
   1313                     names = getattr(module, '__all__')
   1314                 else:
   1315                     names = dir(module)
   1316 
   1317             # Install each name into namespace, checking to make sure it
   1318             # doesn't override anything that already exists.
   1319             for name in names:
   1320                 # Check for conflicts to help prevent future problems.
   1321                 if name in namespace and protect:
   1322                     if namespace[name] is not getattr(module, name):
   1323                         raise error.AutoservError('importing name '
   1324                                 '%s from %s %r would override %r' %
   1325                                 (name, module_name, getattr(module, name),
   1326                                  namespace[name]))
   1327                     else:
   1328                         # Encourage cleanliness and the use of __all__ for a
   1329                         # more concrete API with less surprises on '*' imports.
   1330                         warnings.warn('%s (%r) being imported from %s for use '
   1331                                       'in server control files is not the '
   1332                                       'first occurrence of that import.' %
   1333                                       (name, namespace[name], module_name))
   1334 
   1335                 namespace[name] = getattr(module, name)
   1336 
   1337 
   1338         # This is the equivalent of prepending a bunch of import statements to
   1339         # the front of the control script.
   1340         namespace.update(os=os, sys=sys, logging=logging)
   1341         _import_names('autotest_lib.server',
   1342                 ('hosts', 'autotest', 'standalone_profiler'))
   1343         _import_names('autotest_lib.server.subcommand',
   1344                       ('parallel', 'parallel_simple', 'subcommand'))
   1345         _import_names('autotest_lib.server.utils',
   1346                       ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine'))
   1347         _import_names('autotest_lib.client.common_lib.error')
   1348         _import_names('autotest_lib.client.common_lib.barrier', ('barrier',))
   1349 
   1350         # Inject ourself as the job object into other classes within the API.
   1351         # (Yuck, this injection is a gross thing be part of a public API. -gps)
   1352         #
   1353         # XXX Autotest does not appear to use .job.  Who does?
   1354         namespace['autotest'].Autotest.job = self
   1355         # server.hosts.base_classes.Host uses .job.
   1356         namespace['hosts'].Host.job = self
   1357         namespace['hosts'].TestBed.job = self
   1358         namespace['hosts'].factory.ssh_user = self._ssh_user
   1359         namespace['hosts'].factory.ssh_port = self._ssh_port
   1360         namespace['hosts'].factory.ssh_pass = self._ssh_pass
   1361         namespace['hosts'].factory.ssh_verbosity_flag = (
   1362                 self._ssh_verbosity_flag)
   1363         namespace['hosts'].factory.ssh_options = self._ssh_options
   1364 
   1365 
   1366     def _execute_code(self, code_file, namespace, protect=True):
   1367         """
   1368         Execute code using a copy of namespace as a server control script.
   1369 
   1370         Unless protect_namespace is explicitly set to False, the dict will not
   1371         be modified.
   1372 
   1373         Args:
   1374           code_file: The filename of the control file to execute.
   1375           namespace: A dict containing names to make available during execution.
   1376           protect: Boolean.  If True (the default) a copy of the namespace dict
   1377               is used during execution to prevent the code from modifying its
   1378               contents outside of this function.  If False the raw dict is
   1379               passed in and modifications will be allowed.
   1380         """
   1381         if protect:
   1382             namespace = namespace.copy()
   1383         self._fill_server_control_namespace(namespace, protect=protect)
   1384         # TODO: Simplify and get rid of the special cases for only 1 machine.
   1385         if len(self.machines) > 1:
   1386             machines_text = '\n'.join(self.machines) + '\n'
   1387             # Only rewrite the file if it does not match our machine list.
   1388             try:
   1389                 machines_f = open(MACHINES_FILENAME, 'r')
   1390                 existing_machines_text = machines_f.read()
   1391                 machines_f.close()
   1392             except EnvironmentError:
   1393                 existing_machines_text = None
   1394             if machines_text != existing_machines_text:
   1395                 utils.open_write_close(MACHINES_FILENAME, machines_text)
   1396         execfile(code_file, namespace, namespace)
   1397 
   1398 
   1399     def _parse_status(self, new_line):
   1400         if self.fast and not self._using_parser:
   1401             logging.info('Parsing lines in fast mode')
   1402             new_tests = self.parser.process_lines([new_line])
   1403             for test in new_tests:
   1404                 if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'):
   1405                     self.num_tests_failed += 1
   1406 
   1407         if not self._using_parser:
   1408             return
   1409 
   1410         new_tests = self.parser.process_lines([new_line])
   1411         for test in new_tests:
   1412             self.__insert_test(test)
   1413 
   1414 
   1415     def __insert_test(self, test):
   1416         """
   1417         An internal method to insert a new test result into the
   1418         database. This method will not raise an exception, even if an
   1419         error occurs during the insert, to avoid failing a test
   1420         simply because of unexpected database issues."""
   1421         self.num_tests_run += 1
   1422         if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'):
   1423             self.num_tests_failed += 1
   1424         try:
   1425             self.results_db.insert_test(self.job_model, test)
   1426         except Exception:
   1427             msg = ("WARNING: An unexpected error occured while "
   1428                    "inserting test results into the database. "
   1429                    "Ignoring error.\n" + traceback.format_exc())
   1430             print >> sys.stderr, msg
   1431 
   1432 
   1433     def preprocess_client_state(self):
   1434         """
   1435         Produce a state file for initializing the state of a client job.
   1436 
   1437         Creates a new client state file with all the current server state, as
   1438         well as some pre-set client state.
   1439 
   1440         @returns The path of the file the state was written into.
   1441         """
   1442         # initialize the sysinfo state
   1443         self._state.set('client', 'sysinfo', self.sysinfo.serialize())
   1444 
   1445         # dump the state out to a tempfile
   1446         fd, file_path = tempfile.mkstemp(dir=self.tmpdir)
   1447         os.close(fd)
   1448 
   1449         # write_to_file doesn't need locking, we exclusively own file_path
   1450         self._state.write_to_file(file_path)
   1451         return file_path
   1452 
   1453 
   1454     def postprocess_client_state(self, state_path):
   1455         """
   1456         Update the state of this job with the state from a client job.
   1457 
   1458         Updates the state of the server side of a job with the final state
   1459         of a client job that was run. Updates the non-client-specific state,
   1460         pulls in some specific bits from the client-specific state, and then
   1461         discards the rest. Removes the state file afterwards
   1462 
   1463         @param state_file A path to the state file from the client.
   1464         """
   1465         # update the on-disk state
   1466         try:
   1467             self._state.read_from_file(state_path)
   1468             os.remove(state_path)
   1469         except OSError, e:
   1470             # ignore file-not-found errors
   1471             if e.errno != errno.ENOENT:
   1472                 raise
   1473             else:
   1474                 logging.debug('Client state file %s not found', state_path)
   1475 
   1476         # update the sysinfo state
   1477         if self._state.has('client', 'sysinfo'):
   1478             self.sysinfo.deserialize(self._state.get('client', 'sysinfo'))
   1479 
   1480         # drop all the client-specific state
   1481         self._state.discard_namespace('client')
   1482 
   1483 
   1484     def clear_all_known_hosts(self):
   1485         """Clears known hosts files for all AbstractSSHHosts."""
   1486         for host in self.hosts:
   1487             if isinstance(host, abstract_ssh.AbstractSSHHost):
   1488                 host.clear_known_hosts()
   1489 
   1490 
   1491     def close(self):
   1492         """Closes this job's operation."""
   1493 
   1494         # Use shallow copy, because host.close() internally discards itself.
   1495         for host in list(self.hosts):
   1496             host.close()
   1497         assert not self.hosts
   1498         self._connection_pool.shutdown()
   1499 
   1500 
   1501     def _get_job_data(self):
   1502         """Add custom data to the job keyval info.
   1503 
   1504         When multiple machines are used in a job, change the hostname to
   1505         the platform of the first machine instead of machine1,machine2,...  This
   1506         makes the job reports easier to read and keeps the tko_machines table from
   1507         growing too large.
   1508 
   1509         Returns:
   1510             keyval dictionary with new hostname value, or empty dictionary.
   1511         """
   1512         job_data = {}
   1513         # Only modify hostname on multimachine jobs. Assume all host have the same
   1514         # platform.
   1515         if len(self.machines) > 1:
   1516             # Search through machines for first machine with a platform.
   1517             for host in self.machines:
   1518                 keyval_path = os.path.join(self.resultdir, 'host_keyvals', host)
   1519                 keyvals = utils.read_keyval(keyval_path)
   1520                 host_plat = keyvals.get('platform', None)
   1521                 if not host_plat:
   1522                     continue
   1523                 job_data['hostname'] = host_plat
   1524                 break
   1525         return job_data
   1526 
   1527 
   1528 class warning_manager(object):
   1529     """Class for controlling warning logs. Manages the enabling and disabling
   1530     of warnings."""
   1531     def __init__(self):
   1532         # a map of warning types to a list of disabled time intervals
   1533         self.disabled_warnings = {}
   1534 
   1535 
   1536     def is_valid(self, timestamp, warning_type):
   1537         """Indicates if a warning (based on the time it occured and its type)
   1538         is a valid warning. A warning is considered "invalid" if this type of
   1539         warning was marked as "disabled" at the time the warning occured."""
   1540         disabled_intervals = self.disabled_warnings.get(warning_type, [])
   1541         for start, end in disabled_intervals:
   1542             if timestamp >= start and (end is None or timestamp < end):
   1543                 return False
   1544         return True
   1545 
   1546 
   1547     def disable_warnings(self, warning_type, current_time_func=time.time):
   1548         """As of now, disables all further warnings of this type."""
   1549         intervals = self.disabled_warnings.setdefault(warning_type, [])
   1550         if not intervals or intervals[-1][1] is not None:
   1551             intervals.append((int(current_time_func()), None))
   1552 
   1553 
   1554     def enable_warnings(self, warning_type, current_time_func=time.time):
   1555         """As of now, enables all further warnings of this type."""
   1556         intervals = self.disabled_warnings.get(warning_type, [])
   1557         if intervals and intervals[-1][1] is None:
   1558             intervals[-1] = (intervals[-1][0], int(current_time_func()))
   1559 
   1560 
   1561 def _is_current_server_job(test):
   1562     """Return True if parsed test is the currently running job.
   1563 
   1564     @param test: test instance from tko parser.
   1565     """
   1566     return test.testname == 'SERVER_JOB'
   1567 
   1568 
   1569 def _create_afe_host(hostname):
   1570     """Create an afe_host object backed by the AFE.
   1571 
   1572     @param hostname: Name of the host for which we want the Host object.
   1573     @returns: An object of type frontend.AFE
   1574     """
   1575     afe = frontend_wrappers.RetryingAFE(timeout_min=5, delay_sec=10)
   1576     hosts = afe.get_hosts(hostname=hostname)
   1577     if not hosts:
   1578         raise error.AutoservError('No hosts named %s found' % hostname)
   1579 
   1580     return hosts[0]
   1581 
   1582 
   1583 def _create_host_info_store(hostname, workdir):
   1584     """Create a CachingHostInfo store backed by the AFE.
   1585 
   1586     @param hostname: Name of the host for which we want the store.
   1587     @param workdir: A directory where any on-disk files related to the machines
   1588             may be created.
   1589     @returns: An object of type shadowing_store.ShadowingStore
   1590     """
   1591     primary_store = afe_store.AfeStore(hostname)
   1592     try:
   1593         primary_store.get(force_refresh=True)
   1594     except host_info.StoreError:
   1595         raise error.AutoservError('Could not obtain HostInfo for hostname %s' %
   1596                                   hostname)
   1597     backing_file_path = _file_store_unique_file_path(workdir)
   1598     logging.info('Shadowing AFE store with a FileStore at %s',
   1599                  backing_file_path)
   1600     shadow_store = file_store.FileStore(backing_file_path)
   1601     return shadowing_store.ShadowingStore(primary_store, shadow_store)
   1602 
   1603 
   1604 def _file_store_unique_file_path(workdir):
   1605     """Returns a unique filepath for the on-disk FileStore.
   1606 
   1607     Also makes sure that the workdir exists.
   1608 
   1609     @param: Top level working directory.
   1610     """
   1611     store_dir = os.path.join(workdir, 'host_info_store')
   1612     _make_dirs_if_needed(store_dir)
   1613     file_path = os.path.join(store_dir, 'store_%s' % uuid.uuid4())
   1614     return file_path
   1615 
   1616 
   1617 def _make_dirs_if_needed(path):
   1618     """os.makedirs, but ignores failure because the leaf directory exists"""
   1619     try:
   1620         os.makedirs(path)
   1621     except OSError as e:
   1622         if e.errno != errno.EEXIST:
   1623             raise
   1624