Home | History | Annotate | Download | only in site_utils
      1 #!/usr/bin/python
      2 #
      3 # Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
      4 # Use of this source code is governed by a BSD-style license that can be
      5 # found in the LICENSE file.
      6 
      7 """Tool to validate code in prod branch before pushing to lab.
      8 
      9 The script runs push_to_prod suite to verify code in prod branch is ready to be
     10 pushed. Link to design document:
     11 https://docs.google.com/a/google.com/document/d/1JMz0xS3fZRSHMpFkkKAL_rxsdbNZomhHbC3B8L71uuI/edit
     12 
     13 To verify if prod branch can be pushed to lab, run following command in
     14 chromeos-staging-master2.hot server:
/usr/local/autotest/site_utils/test_push.py -e someone@company.com
     16 
     17 The script uses latest gandof stable build as test build by default.
     18 
     19 """
     20 
     21 import argparse
     22 import ast
     23 from contextlib import contextmanager
     24 import datetime
     25 import getpass
     26 import multiprocessing
     27 import os
     28 import re
     29 import subprocess
     30 import sys
     31 import time
     32 import traceback
     33 import urllib2
     34 
     35 import common
     36 try:
     37     from autotest_lib.frontend import setup_django_environment
     38     from autotest_lib.frontend.afe import models
     39     from autotest_lib.frontend.afe import rpc_utils
     40 except ImportError:
     41     # Unittest may not have Django database configured and will fail to import.
     42     pass
     43 from autotest_lib.client.common_lib import global_config
     44 from autotest_lib.client.common_lib import priorities
     45 from autotest_lib.client.common_lib.cros import retry
     46 from autotest_lib.frontend.afe import rpc_client_lib
     47 from autotest_lib.server import constants
     48 from autotest_lib.server import site_utils
     49 from autotest_lib.server import utils
     50 from autotest_lib.server.cros import provision
     51 from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
     52 
     53 try:
     54     from chromite.lib import metrics
     55     from chromite.lib import ts_mon_config
     56 except ImportError:
     57     metrics = site_utils.metrics_mock
     58     ts_mon_config = site_utils.metrics_mock
     59 
# Root of the autotest checkout this script runs from.
AUTOTEST_DIR=common.autotest_dir
# Process-wide global configuration object (shed_config ini values).
CONFIG = global_config.global_config

# RPC clients for the AFE (scheduler frontend) and TKO (results database).
# Short timeouts with retries keep transient RPC hiccups from failing the push.
AFE = frontend_wrappers.RetryingAFE(timeout_min=0.5, delay_sec=2)
TKO = frontend_wrappers.RetryingTKO(timeout_min=0.1, delay_sec=10)
     66 MAIL_FROM = 'chromeos-test (at] google.com'
# Matches a ChromeOS build version, e.g. 'R36-5881.0.0'.
BUILD_REGEX = 'R[\d]+-[\d]+\.[\d]+\.[\d]+'
# Script (sibling of this file) used to kick off suites.
RUN_SUITE_COMMAND = 'run_suite.py'
PUSH_TO_PROD_SUITE = 'push_to_prod'
DUMMY_SUITE = 'dummy'
TESTBED_SUITE = 'testbed_push'
# TODO(shuqianz): Dynamically get android build after crbug.com/646068 fixed
DEFAULT_TIMEOUT_MIN_FOR_SUITE_JOB = 30
IMAGE_BUCKET = CONFIG.get_config_value('CROS', 'image_storage_server')
# TODO(crbug.com/767302): Bump up testbed requirement back to 1 when we
# re-enable testbed tests.
# Minimum number of Ready DUTs required per platform, as (platform, count).
DEFAULT_NUM_DUTS = (
        ('gandof', 4),
        ('quawks', 2),
        ('testbed', 0),
)

# Extracts the suite job id from run_suite.py output (group 1 is the id).
SUITE_JOB_START_INFO_REGEX = ('^.*Created suite job:.*'
                              'tab_id=view_job&object_id=(\d+)$')

# Dictionary of test results keyed by test name regular expression.
EXPECTED_TEST_RESULTS = {'^SERVER_JOB$':                 'GOOD',
                         # This is related to dummy_Fail/control.dependency.
                         'dummy_Fail.dependency$':       'TEST_NA',
                         'login_LoginSuccess.*':         'GOOD',
                         'provision_AutoUpdate.double':  'GOOD',
                         'dummy_Pass.*':                 'GOOD',
                         'dummy_Fail.Fail$':             'FAIL',
                         'dummy_Fail.RetryFail$':        'FAIL',
                         'dummy_Fail.RetrySuccess':      'GOOD',
                         'dummy_Fail.Error$':            'ERROR',
                         'dummy_Fail.Warn$':             'WARN',
                         'dummy_Fail.NAError$':          'TEST_NA',
                         'dummy_Fail.Crash$':            'GOOD',
                         'autotest_SyncCount$':          'GOOD',
                         }

# Expected results for the asynchronous dummy suite run on a shard.
EXPECTED_TEST_RESULTS_DUMMY = {'^SERVER_JOB$':       'GOOD',
                               'dummy_Pass.*':       'GOOD',
                               'dummy_Fail.Fail':    'FAIL',
                               'dummy_Fail.Warn':    'WARN',
                               'dummy_Fail.Crash':   'GOOD',
                               'dummy_Fail.Error':   'ERROR',
                               'dummy_Fail.NAError': 'TEST_NA',}

# Expected results for the testbed push suite.
EXPECTED_TEST_RESULTS_TESTBED = {'^SERVER_JOB$':      'GOOD',
                                 'testbed_DummyTest': 'GOOD',}

# Expected results for the powerwash job used to exercise the repair flow.
EXPECTED_TEST_RESULTS_POWERWASH = {'platform_Powerwash': 'GOOD',
                                   'SERVER_JOB':         'GOOD'}

# Host and URL template used to build (and sanity-check) links to job logs.
URL_HOST = CONFIG.get_config_value('SERVER', 'hostname', type=str)
URL_PATTERN = CONFIG.get_config_value('CROS', 'log_url_pattern', type=str)

# Some tests could be missing from the test results for various reasons. Add
# such tests to this list and explain the reason.
IGNORE_MISSING_TESTS = [
    # For latest build, npo_test_delta does not exist.
    'autoupdate_EndToEndTest.npo_test_delta.*',
    # For trybot build, nmo_test_delta does not exist.
    'autoupdate_EndToEndTest.nmo_test_delta.*',
    # Older build does not have login_LoginSuccess test in push_to_prod suite.
    # TODO(dshi): Remove following lines after R41 is stable.
    'login_LoginSuccess']

# Multiprocessing proxy objects that are used to share data between background
# suite-running processes and main process. The multiprocessing-compatible
# versions are initialized in _main.
_run_suite_output = []
_all_suite_ids = []

# A dict maps the name of the updated repos and the path of them.
UPDATED_REPOS = {'autotest': AUTOTEST_DIR,
                 'chromite': '%s/site-packages/chromite/' % AUTOTEST_DIR}
# Account that owns the prod-next branches and has push rights.
PUSH_USER = 'chromeos-test-lab'

# Maximum times a monitored service may respawn before we call it crashed.
DEFAULT_SERVICE_RESPAWN_LIMIT = 2
    144 
    145 class TestPushException(Exception):
    146     """Exception to be raised when the test to push to prod failed."""
    147     pass
    148 
    149 @retry.retry(TestPushException, timeout_min=5, delay_sec=30)
    150 def check_dut_inventory(required_num_duts, pool):
    151     """Check DUT inventory for each board in the pool specified..
    152 
    153     @param required_num_duts: a dict specifying the number of DUT each platform
    154                               requires in order to finish push tests.
    155     @param pool: the pool used by test_push.
    156     @raise TestPushException: if number of DUTs are less than the requirement.
    157     """
    158     print 'Checking DUT inventory...'
    159     pool_label = constants.Labels.POOL_PREFIX + pool
    160     hosts = AFE.run('get_hosts', status='Ready', locked=False)
    161     hosts = [h for h in hosts if pool_label in h.get('labels', [])]
    162     platforms = [host['platform'] for host in hosts]
    163     current_inventory = {p : platforms.count(p) for p in platforms}
    164     error_msg = ''
    165     for platform, req_num in required_num_duts.items():
    166         curr_num = current_inventory.get(platform, 0)
    167         if curr_num < req_num:
    168             error_msg += ('\nRequire %d %s DUTs in pool: %s, only %d are Ready'
    169                           ' now' % (req_num, platform, pool, curr_num))
    170     if error_msg:
    171         raise TestPushException('Not enough DUTs to run push tests. %s' %
    172                                 error_msg)
    173 
    174 
    175 def powerwash_dut_to_test_repair(hostname, timeout):
    176     """Powerwash dut to test repair workflow.
    177 
    178     @param hostname: hostname of the dut.
    179     @param timeout: seconds of the powerwash test to hit timeout.
    180     @raise TestPushException: if DUT fail to run the test.
    181     """
    182     t = models.Test.objects.get(name='platform_Powerwash')
    183     c = utils.read_file(os.path.join(common.autotest_dir, t.path))
    184     job_id = rpc_utils.create_job_common(
    185              'powerwash', priority=priorities.Priority.SUPER,
    186              control_type='Server', control_file=c, hosts=[hostname])
    187 
    188     end = time.time() + timeout
    189     while not TKO.get_job_test_statuses_from_db(job_id):
    190         if time.time() >= end:
    191             AFE.run('abort_host_queue_entries', job=job_id)
    192             raise TestPushException(
    193                 'Powerwash test on %s timeout after %ds, abort it.' %
    194                 (hostname, timeout))
    195         time.sleep(10)
    196     verify_test_results(job_id, EXPECTED_TEST_RESULTS_POWERWASH)
    197     # Kick off verify, verify will fail and a repair should be triggered.
    198     AFE.reverify_hosts(hostnames=[hostname])
    199 
    200 
    201 def reverify_all_push_duts():
    202     """Reverify all the push DUTs."""
    203     print 'Reverifying all DUTs.'
    204     hosts = [h.hostname for h in AFE.get_hosts()]
    205     AFE.reverify_hosts(hostnames=hosts)
    206 
    207 
    208 def get_default_build(board='gandof', server='chromeos-staging-master2.hot'):
    209     """Get the default build to be used for test.
    210 
    211     @param board: Name of board to be tested, default is gandof.
    212     @return: Build to be tested, e.g., gandof-release/R36-5881.0.0
    213     """
    214     build = None
    215     cmd = ('%s/cli/atest stable_version list --board=%s -w %s' %
    216            (AUTOTEST_DIR, board, server))
    217     result = subprocess.check_output(cmd, shell=True).strip()
    218     build = re.search(BUILD_REGEX, result)
    219     if build:
    220         return '%s-release/%s' % (board, build.group(0))
    221 
    222     # If fail to get stable version from cautotest, use that defined in config
    223     build = CONFIG.get_config_value('CROS', 'stable_cros_version')
    224     return '%s-release/%s' % (board, build)
    225 
    226 def parse_arguments():
    227     """Parse arguments for test_push tool.
    228 
    229     @return: Parsed arguments.
    230 
    231     """
    232     parser = argparse.ArgumentParser()
    233     parser.add_argument('-b', '--board', dest='board', default='gandof',
    234                         help='Default is gandof.')
    235     parser.add_argument('-sb', '--shard_board', dest='shard_board',
    236                         default='quawks',
    237                         help='Default is quawks.')
    238     parser.add_argument('-i', '--build', dest='build', default=None,
    239                         help='Default is the latest stale build of given '
    240                              'board. Must be a stable build, otherwise AU test '
    241                              'will fail. (ex: gandolf-release/R54-8743.25.0)')
    242     parser.add_argument('-si', '--shard_build', dest='shard_build', default=None,
    243                         help='Default is the latest stable build of given '
    244                              'board. Must be a stable build, otherwise AU test '
    245                              'will fail.')
    246     parser.add_argument('-w', '--web', default='chromeos-staging-master2.hot',
    247                         help='Specify web server to grab stable version from.')
    248     parser.add_argument('-ab', '--android_board', dest='android_board',
    249                         default='shamu-2', help='Android board to test.')
    250     parser.add_argument('-ai', '--android_build', dest='android_build',
    251                         help='Android build to test.')
    252     parser.add_argument('-p', '--pool', dest='pool', default='bvt')
    253     parser.add_argument('-t', '--timeout_min', dest='timeout_min', type=int,
    254                         default=DEFAULT_TIMEOUT_MIN_FOR_SUITE_JOB,
    255                         help='Time in mins to wait before abort the jobs we '
    256                              'are waiting on. Only for the asynchronous suites '
    257                              'triggered by create_and_return flag.')
    258     parser.add_argument('-ud', '--num_duts', dest='num_duts',
    259                         default=dict(DEFAULT_NUM_DUTS),
    260                         type=ast.literal_eval,
    261                         help="Python dict literal that specifies the required"
    262                         " number of DUTs for each board. E.g {'gandof':4}")
    263     parser.add_argument('-c', '--continue_on_failure', action='store_true',
    264                         dest='continue_on_failure',
    265                         help='All tests continue to run when there is failure')
    266     parser.add_argument('-sl', '--service_respawn_limit', type=int,
    267                         default=DEFAULT_SERVICE_RESPAWN_LIMIT,
    268                         help='If a service crashes more than this, the test '
    269                              'push is considered failed.')
    270 
    271     arguments = parser.parse_args(sys.argv[1:])
    272 
    273     # Get latest stable build as default build.
    274     if not arguments.build:
    275         arguments.build = get_default_build(arguments.board, arguments.web)
    276     if not arguments.shard_build:
    277         arguments.shard_build = get_default_build(arguments.shard_board,
    278                                                   arguments.web)
    279 
    280     return arguments
    281 
    282 
    283 def do_run_suite(suite_name, arguments, use_shard=False,
    284                  create_and_return=False, testbed_test=False):
    285     """Call run_suite to run a suite job, and return the suite job id.
    286 
    287     The script waits the suite job to finish before returning the suite job id.
    288     Also it will echo the run_suite output to stdout.
    289 
    290     @param suite_name: Name of a suite, e.g., dummy.
    291     @param arguments: Arguments for run_suite command.
    292     @param use_shard: If true, suite is scheduled for shard board.
    293     @param create_and_return: If True, run_suite just creates the suite, print
    294                               the job id, then finish immediately.
    295     @param testbed_test: True to run testbed test. Default is False.
    296 
    297     @return: Suite job ID.
    298 
    299     """
    300     if use_shard and not testbed_test:
    301         board = arguments.shard_board
    302         build = arguments.shard_build
    303     elif testbed_test:
    304         board = arguments.android_board
    305         build = arguments.android_build
    306     else:
    307         board = arguments.board
    308         build = arguments.build
    309 
    310     # Remove cros-version label to force provision.
    311     hosts = AFE.get_hosts(label=constants.Labels.BOARD_PREFIX+board,
    312                           locked=False)
    313     for host in hosts:
    314         labels_to_remove = [
    315                 l for l in host.labels
    316                 if (l.startswith(provision.CROS_VERSION_PREFIX) or
    317                     l.startswith(provision.TESTBED_BUILD_VERSION_PREFIX))]
    318         if labels_to_remove:
    319             AFE.run('host_remove_labels', id=host.id, labels=labels_to_remove)
    320 
    321         # Test repair work flow on shards, powerwash test will timeout after 7m.
    322         if use_shard and not create_and_return:
    323             powerwash_dut_to_test_repair(host.hostname, timeout=420)
    324 
    325     current_dir = os.path.dirname(os.path.realpath(__file__))
    326     cmd = [os.path.join(current_dir, RUN_SUITE_COMMAND),
    327            '-s', suite_name,
    328            '-b', board,
    329            '-i', build,
    330            '-p', arguments.pool,
    331            '--minimum_duts', str(arguments.num_duts[board])]
    332     if create_and_return:
    333         cmd += ['-c']
    334     if testbed_test:
    335         cmd += ['--run_prod_code']
    336 
    337     suite_job_id = None
    338 
    339     proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
    340                             stderr=subprocess.STDOUT)
    341 
    342     while True:
    343         line = proc.stdout.readline()
    344 
    345         # Break when run_suite process completed.
    346         if not line and proc.poll() != None:
    347             break
    348         print line.rstrip()
    349         _run_suite_output.append(line.rstrip())
    350 
    351         if not suite_job_id:
    352             m = re.match(SUITE_JOB_START_INFO_REGEX, line)
    353             if m and m.group(1):
    354                 suite_job_id = int(m.group(1))
    355                 _all_suite_ids.append(suite_job_id)
    356 
    357     if not suite_job_id:
    358         raise TestPushException('Failed to retrieve suite job ID.')
    359 
    360     # If create_and_return specified, wait for the suite to finish.
    361     if create_and_return:
    362         end = time.time() + arguments.timeout_min * 60
    363         while not AFE.get_jobs(id=suite_job_id, finished=True):
    364             if time.time() < end:
    365                 time.sleep(10)
    366             else:
    367                 AFE.run('abort_host_queue_entries', job=suite_job_id)
    368                 raise TestPushException(
    369                         'Asynchronous suite triggered by create_and_return '
    370                         'flag has timed out after %d mins. Aborting it.' %
    371                         arguments.timeout_min)
    372 
    373     print 'Suite job %s is completed.' % suite_job_id
    374     return suite_job_id
    375 
    376 
    377 def check_dut_image(build, suite_job_id):
    378     """Confirm all DUTs used for the suite are imaged to expected build.
    379 
    380     @param build: Expected build to be imaged.
    381     @param suite_job_id: job ID of the suite job.
    382     @raise TestPushException: If a DUT does not have expected build imaged.
    383     """
    384     print 'Checking image installed in DUTs...'
    385     job_ids = [job.id for job in
    386                models.Job.objects.filter(parent_job_id=suite_job_id)]
    387     hqes = [models.HostQueueEntry.objects.filter(job_id=job_id)[0]
    388             for job_id in job_ids]
    389     hostnames = set([hqe.host.hostname for hqe in hqes])
    390     for hostname in hostnames:
    391         found_build = site_utils.get_build_from_afe(hostname, AFE)
    392         if found_build != build:
    393             raise TestPushException('DUT is not imaged properly. Host %s has '
    394                                     'build %s, while build %s is expected.' %
    395                                     (hostname, found_build, build))
    396 
    397 
    398 def test_suite(suite_name, expected_results, arguments, use_shard=False,
    399                create_and_return=False, testbed_test=False):
    400     """Call run_suite to start a suite job and verify results.
    401 
    402     @param suite_name: Name of a suite, e.g., dummy
    403     @param expected_results: A dictionary of test name to test result.
    404     @param arguments: Arguments for run_suite command.
    405     @param use_shard: If true, suite is scheduled for shard board.
    406     @param create_and_return: If True, run_suite just creates the suite, print
    407                               the job id, then finish immediately.
    408     @param testbed_test: True to run testbed test. Default is False.
    409     """
    410     suite_job_id = do_run_suite(suite_name, arguments, use_shard,
    411                                 create_and_return, testbed_test)
    412 
    413     # Confirm all DUTs used for the suite are imaged to expected build.
    414     # hqe.host_id for jobs running in shard is not synced back to master db,
    415     # therefore, skip verifying dut build for jobs running in shard.
    416     build_expected = (arguments.android_build if testbed_test
    417                       else arguments.build)
    418     if not use_shard and not testbed_test:
    419         check_dut_image(build_expected, suite_job_id)
    420 
    421     # Verify test results are the expected results.
    422     verify_test_results(suite_job_id, expected_results)
    423 
    424 
def verify_test_results(job_id, expected_results):
    """Verify the test results with the expected results.

    Compares every TKO test view against the expected-result regex map, then
    raises one exception summarizing mismatches, unexpected tests, missing
    tests, and an unreachable log link (if any).

    @param job_id: id of the running jobs. For suite job, it is suite_job_id.
    @param expected_results: A dictionary of test name to test result.
    @raise TestPushException: If verify fails.
    """
    print 'Comparing test results...'
    test_views = site_utils.get_test_views_from_tko(job_id, TKO)

    mismatch_errors = []
    extra_test_errors = []

    # Expected-result keys that matched at least one test; the complement is
    # reported as missing.
    found_keys = set()
    for test_name, test_status in test_views.items():
        print "%s%s" % (test_name.ljust(30), test_status)
        # platform_InstallTestImage test may exist in old builds.
        if re.search('platform_InstallTestImage_SERVER_JOB$', test_name):
            continue
        test_found = False
        # A test name may match several expected patterns; each match is
        # checked and recorded.
        for key,val in expected_results.items():
            if re.search(key, test_name):
                test_found = True
                found_keys.add(key)
                if val != test_status:
                    error = ('%s Expected: [%s], Actual: [%s]' %
                             (test_name, val, test_status))
                    mismatch_errors.append(error)
        if not test_found:
            extra_test_errors.append(test_name)

    missing_test_errors = set(expected_results.keys()) - found_keys
    # Drop tests that are legitimately absent (see IGNORE_MISSING_TESTS).
    for exception in IGNORE_MISSING_TESTS:
        try:
            missing_test_errors.remove(exception)
        except KeyError:
            pass

    summary = []
    if mismatch_errors:
        summary.append(('Results of %d test(s) do not match expected '
                        'values:') % len(mismatch_errors))
        summary.extend(mismatch_errors)
        summary.append('\n')

    if extra_test_errors:
        summary.append('%d test(s) are not expected to be run:' %
                       len(extra_test_errors))
        summary.extend(extra_test_errors)
        summary.append('\n')

    if missing_test_errors:
        summary.append('%d test(s) are missing from the results:' %
                       len(missing_test_errors))
        summary.extend(missing_test_errors)
        summary.append('\n')

    # Test link to log can be loaded.
    job_name = '%s-%s' % (job_id, getpass.getuser())
    log_link = URL_PATTERN % (rpc_client_lib.add_protocol(URL_HOST), job_name)
    try:
        urllib2.urlopen(log_link).read()
    except urllib2.URLError:
        summary.append('Failed to load page for link to log: %s.' % log_link)

    if summary:
        raise TestPushException('\n'.join(summary))
    492 
    493 
    494 def test_suite_wrapper(queue, suite_name, expected_results, arguments,
    495                        use_shard=False, create_and_return=False,
    496                        testbed_test=False):
    497     """Wrapper to call test_suite. Handle exception and pipe it to parent
    498     process.
    499 
    500     @param queue: Queue to save exception to be accessed by parent process.
    501     @param suite_name: Name of a suite, e.g., dummy
    502     @param expected_results: A dictionary of test name to test result.
    503     @param arguments: Arguments for run_suite command.
    504     @param use_shard: If true, suite is scheduled for shard board.
    505     @param create_and_return: If True, run_suite just creates the suite, print
    506                               the job id, then finish immediately.
    507     @param testbed_test: True to run testbed test. Default is False.
    508     """
    509     try:
    510         test_suite(suite_name, expected_results, arguments, use_shard,
    511                    create_and_return, testbed_test)
    512     except Exception:
    513         # Store the whole exc_info leads to a PicklingError.
    514         except_type, except_value, tb = sys.exc_info()
    515         queue.put((except_type, except_value, traceback.extract_tb(tb)))
    516 
    517 
    518 def check_queue(queue):
    519     """Check the queue for any exception being raised.
    520 
    521     @param queue: Queue used to store exception for parent process to access.
    522     @raise: Any exception found in the queue.
    523     """
    524     if queue.empty():
    525         return
    526     exc_info = queue.get()
    527     # Raise the exception with original backtrace.
    528     print 'Original stack trace of the exception:\n%s' % exc_info[2]
    529     raise exc_info[0](exc_info[1])
    530 
    531 
    532 def get_head_of_repos(repos):
    533     """Get HEAD of updated repos, currently are autotest and chromite repos
    534 
    535     @param repos: a map of repo name to the path of the repo. E.g.
    536                   {'autotest': '/usr/local/autotest'}
    537     @return: a map of repo names to the current HEAD of that repo.
    538     """
    539     @contextmanager
    540     def cd(new_wd):
    541         """Helper function to change working directory.
    542 
    543         @param new_wd: new working directory that switch to.
    544         """
    545         prev_wd = os.getcwd()
    546         os.chdir(os.path.expanduser(new_wd))
    547         try:
    548             yield
    549         finally:
    550             os.chdir(prev_wd)
    551 
    552     updated_repo_heads = {}
    553     for repo_name, path_to_repo in repos.iteritems():
    554         with cd(path_to_repo):
    555             head = subprocess.check_output('git rev-parse HEAD',
    556                                            shell=True).strip()
    557         updated_repo_heads[repo_name] = head
    558     return updated_repo_heads
    559 
    560 
    561 def push_prod_next_branch(updated_repo_heads):
    562     """push prod-next branch to the tested HEAD after all tests pass.
    563 
    564     The push command must be ran as PUSH_USER, since only PUSH_USER has the
    565     right to push branches.
    566 
    567     @param updated_repo_heads: a map of repo names to tested HEAD of that repo.
    568     """
    569     # prod-next branch for every repo is downloaded under PUSH_USER home dir.
    570     cmd = ('cd ~/{repo}; git pull; git rebase {hash} prod-next;'
    571            'git push origin prod-next')
    572     run_push_as_push_user = "sudo su - %s -c '%s'" % (PUSH_USER, cmd)
    573 
    574     for repo_name, test_hash in updated_repo_heads.iteritems():
    575          push_cmd = run_push_as_push_user.format(hash=test_hash, repo=repo_name)
    576          print 'Pushing %s prod-next branch to %s' % (repo_name, test_hash)
    577          print subprocess.check_output(push_cmd, stderr=subprocess.STDOUT,
    578                                        shell=True)
    579 
    580 
    581 def _run_test_suites(arguments):
    582     """Run the actual tests that comprise the test_push."""
    583     # Use daemon flag will kill child processes when parent process fails.
    584     use_daemon = not arguments.continue_on_failure
    585     queue = multiprocessing.Queue()
    586 
    587     push_to_prod_suite = multiprocessing.Process(
    588             target=test_suite_wrapper,
    589             args=(queue, PUSH_TO_PROD_SUITE, EXPECTED_TEST_RESULTS,
    590                     arguments))
    591     push_to_prod_suite.daemon = use_daemon
    592     push_to_prod_suite.start()
    593 
    594     # suite test with --create_and_return flag
    595     asynchronous_suite = multiprocessing.Process(
    596             target=test_suite_wrapper,
    597             args=(queue, DUMMY_SUITE, EXPECTED_TEST_RESULTS_DUMMY,
    598                     arguments, True, True))
    599     asynchronous_suite.daemon = True
    600     asynchronous_suite.start()
    601 
    602     while push_to_prod_suite.is_alive() or asynchronous_suite.is_alive():
    603         check_queue(queue)
    604         time.sleep(5)
    605     check_queue(queue)
    606     push_to_prod_suite.join()
    607     asynchronous_suite.join()
    608 
    609 
    610 def check_service_crash(respawn_limit, start_time):
    611   """Check whether scheduler or host_scheduler crash during testing.
    612 
    613   Since the testing push is kicked off at the beginning of a given hour, the way
    614   to check whether a service is crashed is to check whether the times of the
    615   service being respawn during testing push is over the respawn_limit.
    616 
    617   @param respawn_limit: The maximum number of times the service is allowed to
    618                         be respawn.
    619   @param start_time: The time that testing push is kicked off.
    620   """
    621   def _parse(filename_prefix, filename):
    622     """Helper method to parse the time of the log.
    623 
    624     @param filename_prefix: The prefix of the filename.
    625     @param filename: The name of the log file.
    626     """
    627     return datetime.datetime.strptime(filename[len(filename_prefix):],
    628                                       "%Y-%m-%d-%H.%M.%S")
    629 
    630   services = ['scheduler', 'host_scheduler']
    631   logs = os.listdir('%s/logs/' % AUTOTEST_DIR)
    632   curr_time = datetime.datetime.now()
    633 
    634   error_msg = ''
    635   for service in services:
    636     log_prefix = '%s.log.' % service
    637     respawn_count = sum(1 for l in logs if l.startswith(log_prefix)
    638                         and start_time <= _parse(log_prefix, l) <= curr_time)
    639 
    640     if respawn_count > respawn_limit:
    641       error_msg += ('%s has been respawned %s times during testing push at %s. '
    642                     'It is very likely crashed. Please check!\n' %
    643                     (service, respawn_count,
    644                      start_time.strftime("%Y-%m-%d-%H")))
    645   if error_msg:
    646     raise TestPushException(error_msg)
    647 
    648 
    649 def _promote_prod_next_refs():
    650     """Updates prod-next branch on relevant repos."""
    651     updated_repo_heads = get_head_of_repos(UPDATED_REPOS)
    652     push_prod_next_branch(updated_repo_heads)
    653     return updated_repo_heads
    654 
    655 
# Printed when the whole test push passes; %(updated_repos_msg)s is filled
# with one "repo: hash" line per promoted repository.
_SUCCESS_MSG = """
All tests completed successfully, the prod branch of the following repos is
ready to be pushed to the hash list below.

%(updated_repos_msg)s

Instructions for pushing to prod are available at
https://goto.google.com/autotest-to-prod
"""
    665 
    666 
def _main(arguments):
    """Run test and promote repo branches if tests succeed.

    Sequence: reverify all DUTs, check inventory, run the suites, check for
    service crashes, then promote prod-next refs and print the success
    message. On any failure, abort still-running suite jobs (unless
    --continue_on_failure) and re-raise; always reverify DUTs at the end.

    @param arguments: command line arguments.
    """

    # TODO Use chromite.lib.parallel.Manager instead, to workaround the
    # too-long-tmp-path problem.
    mpmanager = multiprocessing.Manager()
    # These are globals used by other functions in this module to communicate
    # back from worker processes.
    global _run_suite_output
    _run_suite_output = mpmanager.list()
    global _all_suite_ids
    _all_suite_ids = mpmanager.list()

    try:
        start_time = datetime.datetime.now()
        reverify_all_push_duts()
        time.sleep(15) # Wait for the verify test to start.
        check_dut_inventory(arguments.num_duts, arguments.pool)
        _run_test_suites(arguments)
        check_service_crash(arguments.service_respawn_limit, start_time)
        updated_repo_heads = _promote_prod_next_refs()
        updated_repos_msg = '\n'.join(
                ['%s: %s' % (k, v) for k, v in updated_repo_heads.iteritems()])
        print _SUCCESS_MSG % {'updated_repos_msg': updated_repos_msg}
    except Exception:
        # Abort still-running suite jobs unless asked to continue on failure.
        if not arguments.continue_on_failure:
            for suite_id in _all_suite_ids:
                if AFE.get_jobs(id=suite_id, finished=False):
                    AFE.run('abort_host_queue_entries', job=suite_id)
        raise
    finally:
        # Reverify all the hosts regardless of the push outcome.
        reverify_all_push_duts()
    704 
    705 
    706 def main():
    707     """Entry point."""
    708     arguments = parse_arguments()
    709     with ts_mon_config.SetupTsMonGlobalState(service_name='test_push',
    710                                              indirect=True):
    711         test_push_success = False
    712         try:
    713             _main(arguments)
    714             test_push_success = True
    715         finally:
    716             metrics.Counter('chromeos/autotest/test_push/completed').increment(
    717                     fields={'success': test_push_success})
    718 
    719 
# Script entry point.
if __name__ == '__main__':
    main()
    722