Home | History | Annotate | Download | only in site_utils
      1 #!/usr/bin/python
      2 #
      3 # Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
      4 # Use of this source code is governed by a BSD-style license that can be
      5 # found in the LICENSE file.
      6 
      7 """Tool to validate code in prod branch before pushing to lab.
      8 
The script runs the push_to_prod suite to verify that the code in the prod
branch is ready to be pushed. Link to the design document:
     11 https://docs.google.com/a/google.com/document/d/1JMz0xS3fZRSHMpFkkKAL_rxsdbNZomhHbC3B8L71uuI/edit
     12 
To verify whether the prod branch can be pushed to the lab, run the following
command on the chromeos-staging-master2.hot server:
/usr/local/autotest/site_utils/test_push.py -e someone@company.com
     16 
The script uses the latest gandof stable build as the test build by default.
     18 
     19 """
     20 
     21 import argparse
     22 import ast
     23 import datetime
     24 import getpass
     25 import multiprocessing
     26 import os
     27 import re
     28 import subprocess
     29 import sys
     30 import time
     31 import traceback
     32 import urllib2
     33 
     34 import common
     35 try:
     36     from autotest_lib.frontend import setup_django_environment
     37     from autotest_lib.frontend.afe import models
     38     from autotest_lib.frontend.afe import rpc_utils
     39 except ImportError:
     40     # Unittest may not have Django database configured and will fail to import.
     41     pass
     42 from autotest_lib.client.common_lib import global_config
     43 from autotest_lib.client.common_lib import priorities
     44 from autotest_lib.client.common_lib.cros import retry
     45 from autotest_lib.frontend.afe import rpc_client_lib
     46 from autotest_lib.server import constants
     47 from autotest_lib.server import site_utils
     48 from autotest_lib.server import utils
     49 from autotest_lib.server.cros import provision
     50 from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
     51 from autotest_lib.site_utils import test_push_common
     52 
     53 AUTOTEST_DIR=common.autotest_dir
     54 CONFIG = global_config.global_config
     55 
     56 AFE = frontend_wrappers.RetryingAFE(timeout_min=0.5, delay_sec=2)
     57 TKO = frontend_wrappers.RetryingTKO(timeout_min=0.1, delay_sec=10)
     58 
     59 MAIL_FROM = 'chromeos-test (at] google.com'
     60 BUILD_REGEX = 'R[\d]+-[\d]+\.[\d]+\.[\d]+'
     61 RUN_SUITE_COMMAND = 'run_suite.py'
     62 PUSH_TO_PROD_SUITE = 'push_to_prod'
     63 DUMMY_SUITE = 'dummy'
     64 DEFAULT_TIMEOUT_MIN_FOR_SUITE_JOB = 30
     65 IMAGE_BUCKET = CONFIG.get_config_value('CROS', 'image_storage_server')
     66 DEFAULT_NUM_DUTS = (
     67         ('gandof', 4),
     68         ('quawks', 2),
     69 )
     70 
     71 SUITE_JOB_START_INFO_REGEX = ('^.*Created suite job:.*'
     72                               'tab_id=view_job&object_id=(\d+)$')
     73 
     74 URL_HOST = CONFIG.get_config_value('SERVER', 'hostname', type=str)
     75 URL_PATTERN = CONFIG.get_config_value('CROS', 'log_url_pattern', type=str)
     76 
     77 # Some test could be extra / missing or have mismatched results for various
     78 # reasons. Add such test in this list and explain the reason.
     79 _IGNORED_TESTS = [
     80     # test_push uses a stable image build to test, which is quite behind ToT.
     81     # The following expectations are correct at ToT, but need to be ignored
     82     # until stable image is recent enough.
     83 
     84     # TODO(pprabhu): Remove once R70 is stable.
     85     'dummy_Fail.RetrySuccess',
     86     'dummy_Fail.RetryFail',
     87 ]
     88 
     89 # Multiprocessing proxy objects that are used to share data between background
     90 # suite-running processes and main process. The multiprocessing-compatible
     91 # versions are initialized in _main.
     92 _run_suite_output = []
     93 _all_suite_ids = []
     94 
     95 DEFAULT_SERVICE_RESPAWN_LIMIT = 2
     96 
     97 
     98 class TestPushException(Exception):
     99     """Exception to be raised when the test to push to prod failed."""
    100     pass
    101 
    102 @retry.retry(TestPushException, timeout_min=5, delay_sec=30)
    103 def check_dut_inventory(required_num_duts, pool):
    104     """Check DUT inventory for each board in the pool specified..
    105 
    106     @param required_num_duts: a dict specifying the number of DUT each platform
    107                               requires in order to finish push tests.
    108     @param pool: the pool used by test_push.
    109     @raise TestPushException: if number of DUTs are less than the requirement.
    110     """
    111     print 'Checking DUT inventory...'
    112     pool_label = constants.Labels.POOL_PREFIX + pool
    113     hosts = AFE.run('get_hosts', status='Ready', locked=False)
    114     hosts = [h for h in hosts if pool_label in h.get('labels', [])]
    115     platforms = [host['platform'] for host in hosts]
    116     current_inventory = {p : platforms.count(p) for p in platforms}
    117     error_msg = ''
    118     for platform, req_num in required_num_duts.items():
    119         curr_num = current_inventory.get(platform, 0)
    120         if curr_num < req_num:
    121             error_msg += ('\nRequire %d %s DUTs in pool: %s, only %d are Ready'
    122                           ' now' % (req_num, platform, pool, curr_num))
    123     if error_msg:
    124         raise TestPushException('Not enough DUTs to run push tests. %s' %
    125                                 error_msg)
    126 
    127 
    128 def powerwash_dut_to_test_repair(hostname, timeout):
    129     """Powerwash dut to test repair workflow.
    130 
    131     @param hostname: hostname of the dut.
    132     @param timeout: seconds of the powerwash test to hit timeout.
    133     @raise TestPushException: if DUT fail to run the test.
    134     """
    135     t = models.Test.objects.get(name='platform_Powerwash')
    136     c = utils.read_file(os.path.join(AUTOTEST_DIR, t.path))
    137     job_id = rpc_utils.create_job_common(
    138              'powerwash', priority=priorities.Priority.SUPER,
    139              control_type='Server', control_file=c, hosts=[hostname])
    140 
    141     end = time.time() + timeout
    142     while not TKO.get_job_test_statuses_from_db(job_id):
    143         if time.time() >= end:
    144             AFE.run('abort_host_queue_entries', job=job_id)
    145             raise TestPushException(
    146                 'Powerwash test on %s timeout after %ds, abort it.' %
    147                 (hostname, timeout))
    148         time.sleep(10)
    149     verify_test_results(job_id,
    150                         test_push_common.EXPECTED_TEST_RESULTS_POWERWASH)
    151     # Kick off verify, verify will fail and a repair should be triggered.
    152     AFE.reverify_hosts(hostnames=[hostname])
    153 
    154 
    155 def reverify_all_push_duts():
    156     """Reverify all the push DUTs."""
    157     print 'Reverifying all DUTs.'
    158     hosts = [h.hostname for h in AFE.get_hosts()]
    159     AFE.reverify_hosts(hostnames=hosts)
    160 
    161 
    162 def parse_arguments(argv):
    163     """Parse arguments for test_push tool.
    164 
    165     @param argv   Argument vector, as for `sys.argv`, including the
    166                   command name in `argv[0]`.
    167     @return: Parsed arguments.
    168 
    169     """
    170     parser = argparse.ArgumentParser(prog=argv[0])
    171     parser.add_argument('-b', '--board', dest='board', default='gandof',
    172                         help='Default is gandof.')
    173     parser.add_argument('-sb', '--shard_board', dest='shard_board',
    174                         default='quawks',
    175                         help='Default is quawks.')
    176     parser.add_argument('-i', '--build', dest='build', default=None,
    177                         help='Default is the latest stale build of given '
    178                              'board. Must be a stable build, otherwise AU test '
    179                              'will fail. (ex: gandolf-release/R54-8743.25.0)')
    180     parser.add_argument('-si', '--shard_build', dest='shard_build', default=None,
    181                         help='Default is the latest stable build of given '
    182                              'board. Must be a stable build, otherwise AU test '
    183                              'will fail.')
    184     parser.add_argument('-p', '--pool', dest='pool', default='bvt')
    185     parser.add_argument('-t', '--timeout_min', dest='timeout_min', type=int,
    186                         default=DEFAULT_TIMEOUT_MIN_FOR_SUITE_JOB,
    187                         help='Time in mins to wait before abort the jobs we '
    188                              'are waiting on. Only for the asynchronous suites '
    189                              'triggered by create_and_return flag.')
    190     parser.add_argument('-ud', '--num_duts', dest='num_duts',
    191                         default=dict(DEFAULT_NUM_DUTS),
    192                         type=ast.literal_eval,
    193                         help="Python dict literal that specifies the required"
    194                         " number of DUTs for each board. E.g {'gandof':4}")
    195     parser.add_argument('-c', '--continue_on_failure', action='store_true',
    196                         dest='continue_on_failure',
    197                         help='All tests continue to run when there is failure')
    198     parser.add_argument('-sl', '--service_respawn_limit', type=int,
    199                         default=DEFAULT_SERVICE_RESPAWN_LIMIT,
    200                         help='If a service crashes more than this, the test '
    201                              'push is considered failed.')
    202 
    203     arguments = parser.parse_args(argv[1:])
    204 
    205     # Get latest stable build as default build.
    206     version_map = AFE.get_stable_version_map(AFE.CROS_IMAGE_TYPE)
    207     if not arguments.build:
    208         arguments.build = version_map.get_image_name(arguments.board)
    209     if not arguments.shard_build:
    210         arguments.shard_build = version_map.get_image_name(
    211             arguments.shard_board)
    212     return arguments
    213 
    214 
    215 def do_run_suite(suite_name, arguments, use_shard=False,
    216                  create_and_return=False):
    217     """Call run_suite to run a suite job, and return the suite job id.
    218 
    219     The script waits the suite job to finish before returning the suite job id.
    220     Also it will echo the run_suite output to stdout.
    221 
    222     @param suite_name: Name of a suite, e.g., dummy.
    223     @param arguments: Arguments for run_suite command.
    224     @param use_shard: If true, suite is scheduled for shard board.
    225     @param create_and_return: If True, run_suite just creates the suite, print
    226                               the job id, then finish immediately.
    227 
    228     @return: Suite job ID.
    229 
    230     """
    231     if use_shard:
    232         board = arguments.shard_board
    233         build = arguments.shard_build
    234     else:
    235         board = arguments.board
    236         build = arguments.build
    237 
    238     # Remove cros-version label to force provision.
    239     hosts = AFE.get_hosts(label=constants.Labels.BOARD_PREFIX+board,
    240                           locked=False)
    241     for host in hosts:
    242         labels_to_remove = [
    243                 l for l in host.labels
    244                 if l.startswith(provision.CROS_VERSION_PREFIX)]
    245         if labels_to_remove:
    246             AFE.run('host_remove_labels', id=host.id, labels=labels_to_remove)
    247 
    248         # Test repair work flow on shards, powerwash test will timeout after 7m.
    249         if use_shard and not create_and_return:
    250             powerwash_dut_to_test_repair(host.hostname, timeout=420)
    251 
    252     current_dir = os.path.dirname(os.path.realpath(__file__))
    253     cmd = [os.path.join(current_dir, RUN_SUITE_COMMAND),
    254            '-s', suite_name,
    255            '-b', board,
    256            '-i', build,
    257            '-p', arguments.pool,
    258            '--minimum_duts', str(arguments.num_duts[board])]
    259     if create_and_return:
    260         cmd += ['-c']
    261 
    262     suite_job_id = None
    263 
    264     proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
    265                             stderr=subprocess.STDOUT)
    266 
    267     while True:
    268         line = proc.stdout.readline()
    269 
    270         # Break when run_suite process completed.
    271         if not line and proc.poll() != None:
    272             break
    273         print line.rstrip()
    274         _run_suite_output.append(line.rstrip())
    275 
    276         if not suite_job_id:
    277             m = re.match(SUITE_JOB_START_INFO_REGEX, line)
    278             if m and m.group(1):
    279                 suite_job_id = int(m.group(1))
    280                 _all_suite_ids.append(suite_job_id)
    281 
    282     if not suite_job_id:
    283         raise TestPushException('Failed to retrieve suite job ID.')
    284 
    285     # If create_and_return specified, wait for the suite to finish.
    286     if create_and_return:
    287         end = time.time() + arguments.timeout_min * 60
    288         while not AFE.get_jobs(id=suite_job_id, finished=True):
    289             if time.time() < end:
    290                 time.sleep(10)
    291             else:
    292                 AFE.run('abort_host_queue_entries', job=suite_job_id)
    293                 raise TestPushException(
    294                         'Asynchronous suite triggered by create_and_return '
    295                         'flag has timed out after %d mins. Aborting it.' %
    296                         arguments.timeout_min)
    297 
    298     print 'Suite job %s is completed.' % suite_job_id
    299     return suite_job_id
    300 
    301 
    302 def check_dut_image(build, suite_job_id):
    303     """Confirm all DUTs used for the suite are imaged to expected build.
    304 
    305     @param build: Expected build to be imaged.
    306     @param suite_job_id: job ID of the suite job.
    307     @raise TestPushException: If a DUT does not have expected build imaged.
    308     """
    309     print 'Checking image installed in DUTs...'
    310     job_ids = [job.id for job in
    311                models.Job.objects.filter(parent_job_id=suite_job_id)]
    312     hqes = [models.HostQueueEntry.objects.filter(job_id=job_id)[0]
    313             for job_id in job_ids]
    314     hostnames = set([hqe.host.hostname for hqe in hqes])
    315     for hostname in hostnames:
    316         found_build = site_utils.get_build_from_afe(hostname, AFE)
    317         if found_build != build:
    318             raise TestPushException('DUT is not imaged properly. Host %s has '
    319                                     'build %s, while build %s is expected.' %
    320                                     (hostname, found_build, build))
    321 
    322 
    323 def test_suite(suite_name, expected_results, arguments, use_shard=False,
    324                create_and_return=False):
    325     """Call run_suite to start a suite job and verify results.
    326 
    327     @param suite_name: Name of a suite, e.g., dummy
    328     @param expected_results: A dictionary of test name to test result.
    329     @param arguments: Arguments for run_suite command.
    330     @param use_shard: If true, suite is scheduled for shard board.
    331     @param create_and_return: If True, run_suite just creates the suite, print
    332                               the job id, then finish immediately.
    333     """
    334     suite_job_id = do_run_suite(suite_name, arguments, use_shard,
    335                                 create_and_return)
    336 
    337     # Confirm all DUTs used for the suite are imaged to expected build.
    338     # hqe.host_id for jobs running in shard is not synced back to master db,
    339     # therefore, skip verifying dut build for jobs running in shard.
    340     build_expected = arguments.build
    341     if not use_shard:
    342         check_dut_image(build_expected, suite_job_id)
    343 
    344     # Verify test results are the expected results.
    345     verify_test_results(suite_job_id, expected_results)
    346 
    347 
    348 def verify_test_results(job_id, expected_results):
    349     """Verify the test results with the expected results.
    350 
    351     @param job_id: id of the running jobs. For suite job, it is suite_job_id.
    352     @param expected_results: A dictionary of test name to test result.
    353     @raise TestPushException: If verify fails.
    354     """
    355     print 'Comparing test results...'
    356     test_views = site_utils.get_test_views_from_tko(job_id, TKO)
    357     summary = test_push_common.summarize_push(test_views, expected_results,
    358                                               _IGNORED_TESTS)
    359 
    360     # Test link to log can be loaded.
    361     job_name = '%s-%s' % (job_id, getpass.getuser())
    362     log_link = URL_PATTERN % (rpc_client_lib.add_protocol(URL_HOST), job_name)
    363     try:
    364         urllib2.urlopen(log_link).read()
    365     except urllib2.URLError:
    366         summary.append('Failed to load page for link to log: %s.' % log_link)
    367 
    368     if summary:
    369         raise TestPushException('\n'.join(summary))
    370 
    371 def test_suite_wrapper(queue, suite_name, expected_results, arguments,
    372                        use_shard=False, create_and_return=False):
    373     """Wrapper to call test_suite. Handle exception and pipe it to parent
    374     process.
    375 
    376     @param queue: Queue to save exception to be accessed by parent process.
    377     @param suite_name: Name of a suite, e.g., dummy
    378     @param expected_results: A dictionary of test name to test result.
    379     @param arguments: Arguments for run_suite command.
    380     @param use_shard: If true, suite is scheduled for shard board.
    381     @param create_and_return: If True, run_suite just creates the suite, print
    382                               the job id, then finish immediately.
    383     """
    384     try:
    385         test_suite(suite_name, expected_results, arguments, use_shard,
    386                    create_and_return)
    387     except Exception:
    388         # Store the whole exc_info leads to a PicklingError.
    389         except_type, except_value, tb = sys.exc_info()
    390         queue.put((except_type, except_value, traceback.extract_tb(tb)))
    391 
    392 
    393 def check_queue(queue):
    394     """Check the queue for any exception being raised.
    395 
    396     @param queue: Queue used to store exception for parent process to access.
    397     @raise: Any exception found in the queue.
    398     """
    399     if queue.empty():
    400         return
    401     exc_info = queue.get()
    402     # Raise the exception with original backtrace.
    403     print 'Original stack trace of the exception:\n%s' % exc_info[2]
    404     raise exc_info[0](exc_info[1])
    405 
    406 
    407 def _run_test_suites(arguments):
    408     """Run the actual tests that comprise the test_push."""
    409     # Use daemon flag will kill child processes when parent process fails.
    410     use_daemon = not arguments.continue_on_failure
    411     queue = multiprocessing.Queue()
    412 
    413     push_to_prod_suite = multiprocessing.Process(
    414             target=test_suite_wrapper,
    415             args=(queue, PUSH_TO_PROD_SUITE,
    416                   test_push_common.EXPECTED_TEST_RESULTS, arguments))
    417     push_to_prod_suite.daemon = use_daemon
    418     push_to_prod_suite.start()
    419 
    420     # suite test with --create_and_return flag
    421     asynchronous_suite = multiprocessing.Process(
    422             target=test_suite_wrapper,
    423             args=(queue, DUMMY_SUITE,
    424                   test_push_common.EXPECTED_TEST_RESULTS_DUMMY,
    425                   arguments, True, True))
    426     asynchronous_suite.daemon = True
    427     asynchronous_suite.start()
    428 
    429     while push_to_prod_suite.is_alive() or asynchronous_suite.is_alive():
    430         check_queue(queue)
    431         time.sleep(5)
    432     check_queue(queue)
    433     push_to_prod_suite.join()
    434     asynchronous_suite.join()
    435 
    436 
    437 def check_service_crash(respawn_limit, start_time):
    438   """Check whether scheduler or host_scheduler crash during testing.
    439 
    440   Since the testing push is kicked off at the beginning of a given hour, the way
    441   to check whether a service is crashed is to check whether the times of the
    442   service being respawn during testing push is over the respawn_limit.
    443 
    444   @param respawn_limit: The maximum number of times the service is allowed to
    445                         be respawn.
    446   @param start_time: The time that testing push is kicked off.
    447   """
    448   def _parse(filename_prefix, filename):
    449     """Helper method to parse the time of the log.
    450 
    451     @param filename_prefix: The prefix of the filename.
    452     @param filename: The name of the log file.
    453     """
    454     return datetime.datetime.strptime(filename[len(filename_prefix):],
    455                                       "%Y-%m-%d-%H.%M.%S")
    456 
    457   services = ['scheduler', 'host_scheduler']
    458   logs = os.listdir('%s/logs/' % AUTOTEST_DIR)
    459   curr_time = datetime.datetime.now()
    460 
    461   error_msg = ''
    462   for service in services:
    463     log_prefix = '%s.log.' % service
    464     respawn_count = sum(1 for l in logs if l.startswith(log_prefix)
    465                         and start_time <= _parse(log_prefix, l) <= curr_time)
    466 
    467     if respawn_count > respawn_limit:
    468       error_msg += ('%s has been respawned %s times during testing push at %s. '
    469                     'It is very likely crashed. Please check!\n' %
    470                     (service, respawn_count,
    471                      start_time.strftime("%Y-%m-%d-%H")))
    472   if error_msg:
    473     raise TestPushException(error_msg)
    474 
    475 
# Message printed by _main once every check of the test push has passed.
_SUCCESS_MSG = """
All staging tests completed successfully.

Instructions for pushing to prod are available at
https://goto.google.com/autotest-to-prod
"""
    482 
    483 
    484 def _main(arguments):
    485     """Run test and promote repo branches if tests succeed.
    486 
    487     @param arguments: command line arguments.
    488     """
    489 
    490     # TODO Use chromite.lib.parallel.Manager instead, to workaround the
    491     # too-long-tmp-path problem.
    492     mpmanager = multiprocessing.Manager()
    493     # These are globals used by other functions in this module to communicate
    494     # back from worker processes.
    495     global _run_suite_output
    496     _run_suite_output = mpmanager.list()
    497     global _all_suite_ids
    498     _all_suite_ids = mpmanager.list()
    499 
    500     try:
    501         start_time = datetime.datetime.now()
    502         reverify_all_push_duts()
    503         time.sleep(15) # Wait for the verify test to start.
    504         check_dut_inventory(arguments.num_duts, arguments.pool)
    505         _run_test_suites(arguments)
    506         check_service_crash(arguments.service_respawn_limit, start_time)
    507         print _SUCCESS_MSG
    508     except Exception:
    509         # Abort running jobs unless flagged to continue when there is a failure.
    510         if not arguments.continue_on_failure:
    511             for suite_id in _all_suite_ids:
    512                 if AFE.get_jobs(id=suite_id, finished=False):
    513                     AFE.run('abort_host_queue_entries', job=suite_id)
    514         raise
    515 
    516 
    517 def main():
    518     """Entry point."""
    519     arguments = parse_arguments(sys.argv)
    520     _main(arguments)
    521 
    522 
    523 if __name__ == '__main__':
    524     main()
    525