Home | History | Annotate | Download | only in server
      1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """Site extensions to server_job.  Adds distribute_across_machines()."""
      6 
      7 import os, logging, multiprocessing
      8 from autotest_lib.server import site_gtest_runner, site_server_job_utils
      9 from autotest_lib.server import subcommand
     10 from autotest_lib.server.server_job import base_server_job
     11 import utils
     12 
     13 
     14 def get_site_job_data(job):
     15     """Add custom data to the job keyval info.
     16 
     17     When multiple machines are used in a job, change the hostname to
     18     the platform of the first machine instead of machine1,machine2,...  This
     19     makes the job reports easier to read and keeps the tko_machines table from
     20     growing too large.
     21 
     22     Args:
     23         job: instance of server_job.
     24 
     25     Returns:
     26         keyval dictionary with new hostname value, or empty dictionary.
     27     """
     28     site_job_data = {}
     29     # Only modify hostname on multimachine jobs. Assume all host have the same
     30     # platform.
     31     if len(job.machines) > 1:
     32         # Search through machines for first machine with a platform.
     33         for host in job.machines:
     34             keyval_path = os.path.join(job.resultdir, 'host_keyvals', host)
     35             keyvals = utils.read_keyval(keyval_path)
     36             host_plat = keyvals.get('platform', None)
     37             if not host_plat:
     38                 continue
     39             site_job_data['hostname'] = host_plat
     40             break
     41     return site_job_data
     42 
     43 
     44 class site_server_job(base_server_job):
     45     """Extend server_job adding distribute_across_machines."""
     46 
     47     def __init__(self, *args, **dargs):
     48         super(site_server_job, self).__init__(*args, **dargs)
     49 
     50 
     51     def run(self, *args, **dargs):
     52         """Extend server_job.run adding gtest_runner to the namespace."""
     53 
     54         gtest_run = {'gtest_runner': site_gtest_runner.gtest_runner()}
     55 
     56         # Namespace is the 5th parameter to run().  If args has 5 or more
     57         # entries in it then we need to fix-up this namespace entry.
     58         if len(args) >= 5:
     59             args[4].update(gtest_run)
     60         # Else, if present, namespace must be in dargs.
     61         else:
     62             dargs.setdefault('namespace', gtest_run).update(gtest_run)
     63         # Now call the original run() with the modified namespace containing a
     64         # gtest_runner
     65         super(site_server_job, self).run(*args, **dargs)
     66 
     67 
     68     def distribute_across_machines(self, tests, machines,
     69                                    continuous_parsing=False):
     70         """Run each test in tests once using machines.
     71 
     72         Instead of running each test on each machine like parallel_on_machines,
     73         run each test once across all machines. Put another way, the total
     74         number of tests run by parallel_on_machines is len(tests) *
     75         len(machines). The number of tests run by distribute_across_machines is
     76         len(tests).
     77 
     78         Args:
     79             tests: List of tests to run.
     80             machines: List of machines to use.
     81             continuous_parsing: Bool, if true parse job while running.
     82         """
     83         # The Queue is thread safe, but since a machine may have to search
     84         # through the queue to find a valid test the lock provides exclusive
     85         # queue access for more than just the get call.
     86         test_queue = multiprocessing.JoinableQueue()
     87         test_queue_lock = multiprocessing.Lock()
     88 
     89         unique_machine_attributes = []
     90         sub_commands = []
     91         work_dir = self.resultdir
     92 
     93         for machine in machines:
     94             if 'group' in self.resultdir:
     95                 work_dir = os.path.join(self.resultdir, machine)
     96 
     97             mw = site_server_job_utils.machine_worker(self,
     98                                                       machine,
     99                                                       work_dir,
    100                                                       test_queue,
    101                                                       test_queue_lock,
    102                                                       continuous_parsing)
    103 
    104             # Create the subcommand instance to run this machine worker.
    105             sub_commands.append(subcommand.subcommand(mw.run,
    106                                                       [],
    107                                                       work_dir))
    108 
    109             # To (potentially) speed up searching for valid tests create a list
    110             # of unique attribute sets present in the machines for this job. If
    111             # sets were hashable we could just use a dictionary for fast
    112             # verification. This at least reduces the search space from the
    113             # number of machines to the number of unique machines.
    114             if not mw.attribute_set in unique_machine_attributes:
    115                 unique_machine_attributes.append(mw.attribute_set)
    116 
    117         # Only queue tests which are valid on at least one machine.  Record
    118         # skipped tests in the status.log file using record_skipped_test().
    119         for test_entry in tests:
    120             # Check if it's an old style test entry.
    121             if len(test_entry) > 2 and not isinstance(test_entry[2], dict):
    122                 test_attribs = {'include': test_entry[2]}
    123                 if len(test_entry) > 3:
    124                     test_attribs['exclude'] = test_entry[3]
    125                 if len(test_entry) > 4:
    126                     test_attribs['attributes'] = test_entry[4]
    127 
    128                 test_entry = list(test_entry[:2])
    129                 test_entry.append(test_attribs)
    130 
    131             ti = site_server_job_utils.test_item(*test_entry)
    132             machine_found = False
    133             for ma in unique_machine_attributes:
    134                 if ti.validate(ma):
    135                     test_queue.put(ti)
    136                     machine_found = True
    137                     break
    138             if not machine_found:
    139                 self.record_skipped_test(ti)
    140 
    141         # Run valid tests and wait for completion.
    142         subcommand.parallel(sub_commands)
    143 
    144 
    145     def record_skipped_test(self, skipped_test, message=None):
    146         """Insert a failure record into status.log for this test."""
    147         msg = message
    148         if msg is None:
    149             msg = 'No valid machines found for test %s.' % skipped_test
    150         logging.info(msg)
    151         self.record('START', None, skipped_test.test_name)
    152         self.record('INFO', None, skipped_test.test_name, msg)
    153         self.record('END TEST_NA', None, skipped_test.test_name, msg)
    154