Home | History | Annotate | Download | only in server
      1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """Utility classes used by site_server_job.distribute_across_machines().
      6 
      7 test_item: extends the basic test tuple to add include/exclude attributes and
      8     pre/post actions.
      9 
     10 machine_worker: is a thread that manages running tests on a host.  It
     11     verifies tests are valid for a host using the test attributes from test_item
     12     and the host attributes from host_attributes.
     13 """
     14 
     15 
     16 import logging, os, Queue
     17 from autotest_lib.client.common_lib import error, utils
     18 from autotest_lib.server import autotest, hosts, host_attributes
     19 
     20 
     21 class test_item(object):
     22     """Adds machine verification logic to the basic test tuple.
     23 
     24     Tests can either be tuples of the existing form ('testName', {args}) or the
     25     extended form ('testname', {args}, {'include': [], 'exclude': [],
     26     'attributes': []}) where include and exclude are lists of host attribute
     27     labels and attributes is a list of strings. A machine must have all the
     28     labels in include and must not have any of the labels in exclude to be valid
     29     for the test. Attributes strings can include reboot_before, reboot_after,
     30     and server_job.
     31     """
     32 
     33     def __init__(self, test_name, test_args, test_attribs=None):
     34         """Creates an instance of test_item.
     35 
     36         Args:
     37             test_name: string, name of test to execute.
     38             test_args: dictionary, arguments to pass into test.
     39             test_attribs: Dictionary of test attributes. Valid keys are:
     40               include - labels a machine must have to run a test.
     41               exclude - labels preventing a machine from running a test.
     42               attributes - reboot before/after test, run test as server job.
     43         """
     44         self.test_name = test_name
     45         self.test_args = test_args
     46         self.tagged_test_name = test_name
     47         if test_args.get('tag'):
     48             self.tagged_test_name = test_name + '.' + test_args.get('tag')
     49 
     50         if test_attribs is None:
     51             test_attribs = {}
     52         self.inc_set = set(test_attribs.get('include', []))
     53         self.exc_set = set(test_attribs.get('exclude', []))
     54         self.attributes = test_attribs.get('attributes', [])
     55 
     56     def __str__(self):
     57         """Return an info string of this test."""
     58         params = ['%s=%s' % (k, v) for k, v in self.test_args.items()]
     59         msg = '%s(%s)' % (self.test_name, params)
     60         if self.inc_set:
     61             msg += ' include=%s' % [s for s in self.inc_set]
     62         if self.exc_set:
     63             msg += ' exclude=%s' % [s for s in self.exc_set]
     64         if self.attributes:
     65             msg += ' attributes=%s' % self.attributes
     66         return msg
     67 
     68     def validate(self, machine_attributes):
     69         """Check if this test can run on machine with machine_attributes.
     70 
     71         If the test has include attributes, a candidate machine must have all
     72         the attributes to be valid.
     73 
     74         If the test has exclude attributes, a candidate machine cannot have any
     75         of the attributes to be valid.
     76 
     77         Args:
     78             machine_attributes: set, True attributes of candidate machine.
     79 
     80         Returns:
     81             True/False if the machine is valid for this test.
     82         """
     83         if self.inc_set is not None:
     84             if not self.inc_set <= machine_attributes:
     85                 return False
     86         if self.exc_set is not None:
     87             if self.exc_set & machine_attributes:
     88                 return False
     89         return True
     90 
     91     def run_test(self, client_at, work_dir='.', server_job=None):
     92         """Runs the test on the client using autotest.
     93 
     94         Args:
     95             client_at: Autotest instance for this host.
     96             work_dir: Directory to use for results and log files.
     97             server_job: Server_Job instance to use to runs server tests.
     98         """
     99         if 'reboot_before' in self.attributes:
    100             client_at.host.reboot()
    101 
    102         try:
    103             if 'server_job' in self.attributes:
    104                 if 'host' in self.test_args:
    105                     self.test_args['host'] = client_at.host
    106                 if server_job is not None:
    107                     logging.info('Running Server_Job=%s', self.test_name)
    108                     server_job.run_test(self.test_name, **self.test_args)
    109                 else:
    110                     logging.error('No Server_Job instance provided for test '
    111                                   '%s.', self.test_name)
    112             else:
    113                 client_at.run_test(self.test_name, results_dir=work_dir,
    114                                    **self.test_args)
    115         finally:
    116             if 'reboot_after' in self.attributes:
    117                 client_at.host.reboot()
    118 
    119 
    120 class machine_worker(object):
    121     """Worker that runs tests on a remote host machine."""
    122 
    123     def __init__(self, server_job, machine, work_dir, test_queue, queue_lock,
    124                  continuous_parsing=False):
    125         """Creates an instance of machine_worker to run tests on a remote host.
    126 
    127         Retrieves that host attributes for this machine and creates the set of
    128         True attributes to validate against test include/exclude attributes.
    129 
    130         Creates a directory to hold the log files for tests run and writes the
    131         hostname and tko parser version into keyvals file.
    132 
    133         Args:
    134             server_job: run tests for this server_job.
    135             machine: name of remote host.
    136             work_dir: directory server job is using.
    137             test_queue: queue of tests.
    138             queue_lock: lock protecting test_queue.
    139             continuous_parsing: bool, enable continuous parsing.
    140         """
    141         self._server_job = server_job
    142         self._test_queue = test_queue
    143         self._test_queue_lock = queue_lock
    144         self._continuous_parsing = continuous_parsing
    145         self._tests_run = 0
    146         self._machine = machine
    147         self._host = hosts.create_host(self._machine)
    148         self._client_at = autotest.Autotest(self._host)
    149         client_attributes = host_attributes.host_attributes(machine)
    150         self.attribute_set = set(client_attributes.get_attributes())
    151         self._results_dir = work_dir
    152         if not os.path.exists(self._results_dir):
    153             os.makedirs(self._results_dir)
    154         machine_data = {'hostname': self._machine,
    155                         'status_version': str(1)}
    156         utils.write_keyval(self._results_dir, machine_data)
    157 
    158     def __str__(self):
    159         attributes = [a for a in self.attribute_set]
    160         return '%s attributes=%s' % (self._machine, attributes)
    161 
    162     def get_test(self):
    163         """Return a test from the queue to run on this host.
    164 
    165         The test queue can be non-empty, but still not contain a test that is
    166         valid for this machine. This function will take exclusive access to
    167         the queue via _test_queue_lock and repeatedly pop tests off the queue
    168         until finding a valid test or depleting the queue.  In either case if
    169         invalid tests have been popped from the queue, they are pushed back
    170         onto the queue before returning.
    171 
    172         Returns:
    173             test_item, or None if no more tests exist for this machine.
    174         """
    175         good_test = None
    176         skipped_tests = []
    177 
    178         with self._test_queue_lock:
    179             while True:
    180                 try:
    181                     canidate_test = self._test_queue.get_nowait()
    182                     # Check if test is valid for this machine.
    183                     if canidate_test.validate(self.attribute_set):
    184                         good_test = canidate_test
    185                         break
    186                     skipped_tests.append(canidate_test)
    187 
    188                 except Queue.Empty:
    189                     break
    190 
    191             # Return any skipped tests to the queue.
    192             for st in skipped_tests:
    193                 self._test_queue.put(st)
    194 
    195         return good_test
    196 
    197     def run(self):
    198         """Executes tests on the host machine.
    199 
    200         If continuous parsing was requested, start the parser before running
    201         tests.
    202         """
    203         # Modify job.resultdir so that it points to the results directory for
    204         # the machine we're working on. Required so that server jobs will write
    205         # to the proper location.
    206         self._server_job.machines = [self._machine]
    207         self._server_job.push_execution_context(self._machine['hostname'])
    208         os.chdir(self._server_job.resultdir)
    209         if self._continuous_parsing:
    210             self._server_job._parse_job += "/" + self._machine['hostname']
    211             self._server_job._using_parser = True
    212             self._server_job.init_parser()
    213 
    214         while True:
    215             active_test = self.get_test()
    216             if active_test is None:
    217                 break
    218 
    219             logging.info('%s running %s', self._machine, active_test)
    220             try:
    221                 active_test.run_test(self._client_at, self._results_dir,
    222                                      self._server_job)
    223             except error.AutoservError:
    224                 logging.exception('Autoserv error running "%s".', active_test)
    225             except error.AutotestError:
    226                 logging.exception('Autotest error running  "%s".', active_test)
    227             except Exception:
    228                 logging.exception('Exception running test "%s".', active_test)
    229                 raise
    230             finally:
    231                 self._test_queue.task_done()
    232                 self._tests_run += 1
    233 
    234         if self._continuous_parsing:
    235             self._server_job.cleanup_parser()
    236         logging.info('%s completed %d tests.', self._machine, self._tests_run)
    237