Home | History | Annotate | Download | only in site_utils
      1 #!/usr/bin/python
      2 #
      3 # Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
      4 # Use of this source code is governed by a BSD-style license that can be
      5 # found in the LICENSE file.
      6 
"""Tool to validate code in the prod branch before pushing it to the lab.
      8 
The script runs the push_to_prod suite to verify that code in the prod branch
is ready to be pushed. Link to the design document:
     11 https://docs.google.com/a/google.com/document/d/1JMz0xS3fZRSHMpFkkKAL_rxsdbNZomhHbC3B8L71uuI/edit
     12 
To verify whether the prod branch can be pushed to the lab, run the following
command on the chromeos-autotest.cbf server:
/usr/local/autotest/site_utils/test_push.py -e someone@company.com
     16 
By default, the script uses the latest stable gandof build as the test build.
     18 
     19 """
     20 
import argparse
import ast
import collections
from contextlib import contextmanager
import getpass
import multiprocessing
import os
import re
import subprocess
import sys
import time
import traceback
import urllib2
     33 
     34 import common
     35 try:
     36     from autotest_lib.frontend import setup_django_environment
     37     from autotest_lib.frontend.afe import models
     38     from autotest_lib.frontend.afe import rpc_utils
     39 except ImportError:
     40     # Unittest may not have Django database configured and will fail to import.
     41     pass
     42 from autotest_lib.client.common_lib import global_config
     43 from autotest_lib.client.common_lib import priorities
     44 from autotest_lib.client.common_lib.cros import retry
     45 from autotest_lib.server import site_utils
     46 from autotest_lib.server import utils
     47 from autotest_lib.server.cros import provision
     48 from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
     49 from autotest_lib.site_utils import gmail_lib
     50 from autotest_lib.site_utils.suite_scheduler import constants
     51 
     52 try:
     53     from chromite.lib import metrics
     54     from chromite.lib import ts_mon_config
     55 except ImportError:
     56     metrics = site_utils.metrics_mock
     57     ts_mon_config = site_utils.metrics_mock
     58 
     59 AUTOTEST_DIR=common.autotest_dir
     60 CONFIG = global_config.global_config
     61 
     62 AFE = frontend_wrappers.RetryingAFE(timeout_min=0.5, delay_sec=2)
     63 TKO = frontend_wrappers.RetryingTKO(timeout_min=0.1, delay_sec=10)
     64 
     65 MAIL_FROM = 'chromeos-test (at] google.com'
     66 BUILD_REGEX = 'R[\d]+-[\d]+\.[\d]+\.[\d]+'
     67 RUN_SUITE_COMMAND = 'run_suite.py'
     68 PUSH_TO_PROD_SUITE = 'push_to_prod'
     69 DUMMY_SUITE = 'dummy'
     70 # TODO(shuqianz): Dynamically get android build after crbug.com/646068 fixed
     71 DEFAULT_TIMEOUT_MIN_FOR_SUITE_JOB = 30
     72 IMAGE_BUCKET = CONFIG.get_config_value('CROS', 'image_storage_server')
     73 DEFAULT_EMAIL = CONFIG.get_config_value(
     74         'SCHEDULER', 'notify_email', type=list, default=[])
     75 DEFAULT_NUM_DUTS = "{'gandof': 4, 'quawks': 2, 'testbed': 1}"
     76 
     77 SUITE_JOB_START_INFO_REGEX = ('^.*Created suite job:.*'
     78                               'tab_id=view_job&object_id=(\d+)$')
     79 
     80 # Dictionary of test results keyed by test name regular expression.
     81 EXPECTED_TEST_RESULTS = {'^SERVER_JOB$':                 'GOOD',
     82                          # This is related to dummy_Fail/control.dependency.
     83                          'dummy_Fail.dependency$':       'TEST_NA',
     84                          'login_LoginSuccess.*':         'GOOD',
     85                          'provision_AutoUpdate.double':  'GOOD',
     86                          'dummy_Pass.*':                 'GOOD',
     87                          'dummy_Fail.Fail$':             'FAIL',
     88                          'dummy_Fail.RetryFail$':        'FAIL',
     89                          'dummy_Fail.RetrySuccess':      'GOOD',
     90                          'dummy_Fail.Error$':            'ERROR',
     91                          'dummy_Fail.Warn$':             'WARN',
     92                          'dummy_Fail.NAError$':          'TEST_NA',
     93                          'dummy_Fail.Crash$':            'GOOD',
     94                          }
     95 
     96 EXPECTED_TEST_RESULTS_DUMMY = {'^SERVER_JOB$':       'GOOD',
     97                                'dummy_Pass.*':       'GOOD',
     98                                'dummy_Fail.Fail':    'FAIL',
     99                                'dummy_Fail.Warn':    'WARN',
    100                                'dummy_Fail.Crash':   'GOOD',
    101                                'dummy_Fail.Error':   'ERROR',
    102                                'dummy_Fail.NAError': 'TEST_NA',}
    103 
    104 EXPECTED_TEST_RESULTS_TESTBED = {'^SERVER_JOB$':      'GOOD',
    105                                  'testbed_DummyTest': 'GOOD',}
    106 
    107 EXPECTED_TEST_RESULTS_POWERWASH = {'platform_Powerwash': 'GOOD',
    108                                    'SERVER_JOB':         'GOOD'}
    109 
    110 URL_HOST = CONFIG.get_config_value('SERVER', 'hostname', type=str)
    111 URL_PATTERN = CONFIG.get_config_value('CROS', 'log_url_pattern', type=str)
    112 
    113 # Some test could be missing from the test results for various reasons. Add
    114 # such test in this list and explain the reason.
    115 IGNORE_MISSING_TESTS = [
    116     # For latest build, npo_test_delta does not exist.
    117     'autoupdate_EndToEndTest.npo_test_delta.*',
    118     # For trybot build, nmo_test_delta does not exist.
    119     'autoupdate_EndToEndTest.nmo_test_delta.*',
    120     # Older build does not have login_LoginSuccess test in push_to_prod suite.
    121     # TODO(dshi): Remove following lines after R41 is stable.
    122     'login_LoginSuccess']
    123 
    124 # Save all run_suite command output.
    125 manager = multiprocessing.Manager()
    126 run_suite_output = manager.list()
    127 all_suite_ids = manager.list()
    128 # A dict maps the name of the updated repos and the path of them.
    129 UPDATED_REPOS = {'autotest': AUTOTEST_DIR,
    130                  'chromite': '%s/site-packages/chromite/' % AUTOTEST_DIR}
    131 PUSH_USER = 'chromeos-test-lab'
    132 
    133 class TestPushException(Exception):
    134     """Exception to be raised when the test to push to prod failed."""
    135     pass
    136 
    137 
    138 @retry.retry(TestPushException, timeout_min=5, delay_sec=30)
    139 def check_dut_inventory(required_num_duts, pool):
    140     """Check DUT inventory for each board in the pool specified..
    141 
    142     @param required_num_duts: a dict specifying the number of DUT each platform
    143                               requires in order to finish push tests.
    144     @param pool: the pool used by test_push.
    145     @raise TestPushException: if number of DUTs are less than the requirement.
    146     """
    147     print 'Checking DUT inventory...'
    148     pool_label = constants.Labels.POOL_PREFIX + pool
    149     hosts = AFE.run('get_hosts', status='Ready', locked=False)
    150     hosts = [h for h in hosts if pool_label in h.get('labels', [])]
    151     platforms = [host['platform'] for host in hosts]
    152     current_inventory = {p : platforms.count(p) for p in platforms}
    153     error_msg = ''
    154     for platform, req_num in required_num_duts.items():
    155         curr_num = current_inventory.get(platform, 0)
    156         if curr_num < req_num:
    157             error_msg += ('\nRequire %d %s DUTs in pool: %s, only %d are Ready'
    158                           ' now' % (req_num, platform, pool, curr_num))
    159     if error_msg:
    160         raise TestPushException('Not enough DUTs to run push tests. %s' %
    161                                 error_msg)
    162 
    163 
    164 def powerwash_dut_to_test_repair(hostname, timeout):
    165     """Powerwash dut to test repair workflow.
    166 
    167     @param hostname: hostname of the dut.
    168     @param timeout: seconds of the powerwash test to hit timeout.
    169     @raise TestPushException: if DUT fail to run the test.
    170     """
    171     t = models.Test.objects.get(name='platform_Powerwash')
    172     c = utils.read_file(os.path.join(common.autotest_dir, t.path))
    173     job_id = rpc_utils.create_job_common(
    174              'powerwash', priority=priorities.Priority.SUPER,
    175              control_type='Server', control_file=c, hosts=[hostname])
    176 
    177     end = time.time() + timeout
    178     while not TKO.get_job_test_statuses_from_db(job_id):
    179         if time.time() >= end:
    180             AFE.run('abort_host_queue_entries', job=job_id)
    181             raise TestPushException(
    182                 'Powerwash test on %s timeout after %ds, abort it.' %
    183                 (hostname, timeout))
    184         time.sleep(10)
    185     verify_test_results(job_id, EXPECTED_TEST_RESULTS_POWERWASH)
    186     # Kick off verify, verify will fail and a repair should be triggered.
    187     AFE.reverify_hosts(hostnames=[hostname])
    188 
    189 
    190 def reverify_all_push_duts():
    191     """Reverify all the push DUTs."""
    192     print 'Reverifying all DUTs.'
    193     hosts = [h.hostname for h in AFE.get_hosts()]
    194     AFE.reverify_hosts(hostnames=hosts)
    195 
    196 
    197 def get_default_build(board='gandof', server='chromeos-autotest.hot'):
    198     """Get the default build to be used for test.
    199 
    200     @param board: Name of board to be tested, default is gandof.
    201     @return: Build to be tested, e.g., gandof-release/R36-5881.0.0
    202     """
    203     build = None
    204     cmd = ('%s/cli/atest stable_version list --board=%s -w %s' %
    205            (AUTOTEST_DIR, board, server))
    206     result = subprocess.check_output(cmd, shell=True).strip()
    207     build = re.search(BUILD_REGEX, result)
    208     if build:
    209         return '%s-release/%s' % (board, build.group(0))
    210 
    211     # If fail to get stable version from cautotest, use that defined in config
    212     build = CONFIG.get_config_value('CROS', 'stable_cros_version')
    213     return '%s-release/%s' % (board, build)
    214 
    215 def parse_arguments():
    216     """Parse arguments for test_push tool.
    217 
    218     @return: Parsed arguments.
    219 
    220     """
    221     parser = argparse.ArgumentParser()
    222     parser.add_argument('-b', '--board', dest='board', default='gandof',
    223                         help='Default is gandof.')
    224     parser.add_argument('-sb', '--shard_board', dest='shard_board',
    225                         default='quawks',
    226                         help='Default is quawks.')
    227     parser.add_argument('-i', '--build', dest='build', default=None,
    228                         help='Default is the latest stale build of given '
    229                              'board. Must be a stable build, otherwise AU test '
    230                              'will fail. (ex: gandolf-release/R54-8743.25.0)')
    231     parser.add_argument('-si', '--shard_build', dest='shard_build', default=None,
    232                         help='Default is the latest stable build of given '
    233                              'board. Must be a stable build, otherwise AU test '
    234                              'will fail.')
    235     parser.add_argument('-w', '--web', default='chromeos-autotest.hot',
    236                         help='Specify web server to grab stable version from.')
    237     parser.add_argument('-ab', '--android_board', dest='android_board',
    238                         default='shamu-2', help='Android board to test.')
    239     parser.add_argument('-ai', '--android_build', dest='android_build',
    240                         help='Android build to test.')
    241     parser.add_argument('-p', '--pool', dest='pool', default='bvt')
    242     parser.add_argument('-u', '--num', dest='num', type=int, default=3,
    243                         help='Run on at most NUM machines.')
    244     parser.add_argument('-e', '--email', nargs='+', dest='email',
    245                         default=DEFAULT_EMAIL,
    246                         help='Email address for the notification to be sent to '
    247                              'after the script finished running.')
    248     parser.add_argument('-t', '--timeout_min', dest='timeout_min', type=int,
    249                         default=DEFAULT_TIMEOUT_MIN_FOR_SUITE_JOB,
    250                         help='Time in mins to wait before abort the jobs we '
    251                              'are waiting on. Only for the asynchronous suites '
    252                              'triggered by create_and_return flag.')
    253     parser.add_argument('-ud', '--num_duts', dest='num_duts',
    254                         default=DEFAULT_NUM_DUTS,
    255                         help="String of dict that indicates the required number"
    256                              " of DUTs for each board. E.g {'gandof':4}")
    257     parser.add_argument('-c', '--continue_on_failure', action='store_true',
    258                         dest='continue_on_failure',
    259                         help='All tests continue to run when there is failure')
    260 
    261     arguments = parser.parse_args(sys.argv[1:])
    262 
    263     # Get latest stable build as default build.
    264     if not arguments.build:
    265         arguments.build = get_default_build(arguments.board, arguments.web)
    266     if not arguments.shard_build:
    267         arguments.shard_build = get_default_build(arguments.shard_board,
    268                                                   arguments.web)
    269 
    270     arguments.num_duts = ast.literal_eval(arguments.num_duts)
    271 
    272     return arguments
    273 
    274 
    275 def do_run_suite(suite_name, arguments, use_shard=False,
    276                  create_and_return=False, testbed_test=False):
    277     """Call run_suite to run a suite job, and return the suite job id.
    278 
    279     The script waits the suite job to finish before returning the suite job id.
    280     Also it will echo the run_suite output to stdout.
    281 
    282     @param suite_name: Name of a suite, e.g., dummy.
    283     @param arguments: Arguments for run_suite command.
    284     @param use_shard: If true, suite is scheduled for shard board.
    285     @param create_and_return: If True, run_suite just creates the suite, print
    286                               the job id, then finish immediately.
    287     @param testbed_test: True to run testbed test. Default is False.
    288 
    289     @return: Suite job ID.
    290 
    291     """
    292     if use_shard and not testbed_test:
    293         board = arguments.shard_board
    294         build = arguments.shard_build
    295     elif testbed_test:
    296         board = arguments.android_board
    297         build = arguments.android_build
    298     else:
    299         board = arguments.board
    300         build = arguments.build
    301 
    302     # Remove cros-version label to force provision.
    303     hosts = AFE.get_hosts(label=constants.Labels.BOARD_PREFIX+board,
    304                           locked=False)
    305     for host in hosts:
    306         labels_to_remove = [
    307                 l for l in host.labels
    308                 if (l.startswith(provision.CROS_VERSION_PREFIX) or
    309                     l.startswith(provision.TESTBED_BUILD_VERSION_PREFIX))]
    310         if labels_to_remove:
    311             AFE.run('host_remove_labels', id=host.id, labels=labels_to_remove)
    312 
    313         # Test repair work flow on shards, powerwash test will timeout after 7m.
    314         if use_shard and not create_and_return:
    315             powerwash_dut_to_test_repair(host.hostname, timeout=420)
    316 
    317     current_dir = os.path.dirname(os.path.realpath(__file__))
    318     cmd = [os.path.join(current_dir, RUN_SUITE_COMMAND),
    319            '-s', suite_name,
    320            '-b', board,
    321            '-i', build,
    322            '-p', arguments.pool,
    323            '-u', str(arguments.num)]
    324     if create_and_return:
    325         cmd += ['-c']
    326     if testbed_test:
    327         cmd += ['--run_prod_code']
    328 
    329     suite_job_id = None
    330 
    331     proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
    332                             stderr=subprocess.STDOUT)
    333 
    334     while True:
    335         line = proc.stdout.readline()
    336 
    337         # Break when run_suite process completed.
    338         if not line and proc.poll() != None:
    339             break
    340         print line.rstrip()
    341         run_suite_output.append(line.rstrip())
    342 
    343         if not suite_job_id:
    344             m = re.match(SUITE_JOB_START_INFO_REGEX, line)
    345             if m and m.group(1):
    346                 suite_job_id = int(m.group(1))
    347                 all_suite_ids.append(suite_job_id)
    348 
    349     if not suite_job_id:
    350         raise TestPushException('Failed to retrieve suite job ID.')
    351 
    352     # If create_and_return specified, wait for the suite to finish.
    353     if create_and_return:
    354         end = time.time() + arguments.timeout_min * 60
    355         while not AFE.get_jobs(id=suite_job_id, finished=True):
    356             if time.time() < end:
    357                 time.sleep(10)
    358             else:
    359                 AFE.run('abort_host_queue_entries', job=suite_job_id)
    360                 raise TestPushException(
    361                         'Asynchronous suite triggered by create_and_return '
    362                         'flag has timed out after %d mins. Aborting it.' %
    363                         arguments.timeout_min)
    364 
    365     print 'Suite job %s is completed.' % suite_job_id
    366     return suite_job_id
    367 
    368 
    369 def check_dut_image(build, suite_job_id):
    370     """Confirm all DUTs used for the suite are imaged to expected build.
    371 
    372     @param build: Expected build to be imaged.
    373     @param suite_job_id: job ID of the suite job.
    374     @raise TestPushException: If a DUT does not have expected build imaged.
    375     """
    376     print 'Checking image installed in DUTs...'
    377     job_ids = [job.id for job in
    378                models.Job.objects.filter(parent_job_id=suite_job_id)]
    379     hqes = [models.HostQueueEntry.objects.filter(job_id=job_id)[0]
    380             for job_id in job_ids]
    381     hostnames = set([hqe.host.hostname for hqe in hqes])
    382     for hostname in hostnames:
    383         found_build = site_utils.get_build_from_afe(hostname, AFE)
    384         if found_build != build:
    385             raise TestPushException('DUT is not imaged properly. Host %s has '
    386                                     'build %s, while build %s is expected.' %
    387                                     (hostname, found_build, build))
    388 
    389 
    390 def test_suite(suite_name, expected_results, arguments, use_shard=False,
    391                create_and_return=False, testbed_test=False):
    392     """Call run_suite to start a suite job and verify results.
    393 
    394     @param suite_name: Name of a suite, e.g., dummy
    395     @param expected_results: A dictionary of test name to test result.
    396     @param arguments: Arguments for run_suite command.
    397     @param use_shard: If true, suite is scheduled for shard board.
    398     @param create_and_return: If True, run_suite just creates the suite, print
    399                               the job id, then finish immediately.
    400     @param testbed_test: True to run testbed test. Default is False.
    401     """
    402     suite_job_id = do_run_suite(suite_name, arguments, use_shard,
    403                                 create_and_return, testbed_test)
    404 
    405     # Confirm all DUTs used for the suite are imaged to expected build.
    406     # hqe.host_id for jobs running in shard is not synced back to master db,
    407     # therefore, skip verifying dut build for jobs running in shard.
    408     build_expected = (arguments.android_build if testbed_test
    409                       else arguments.build)
    410     if not use_shard and not testbed_test:
    411         check_dut_image(build_expected, suite_job_id)
    412 
    413     # Verify test results are the expected results.
    414     verify_test_results(suite_job_id, expected_results)
    415 
    416 
    417 def verify_test_results(job_id, expected_results):
    418     """Verify the test results with the expected results.
    419 
    420     @param job_id: id of the running jobs. For suite job, it is suite_job_id.
    421     @param expected_results: A dictionary of test name to test result.
    422     @raise TestPushException: If verify fails.
    423     """
    424     print 'Comparing test results...'
    425     test_views = site_utils.get_test_views_from_tko(job_id, TKO)
    426 
    427     mismatch_errors = []
    428     extra_test_errors = []
    429 
    430     found_keys = set()
    431     for test_name, test_status in test_views.items():
    432         print "%s%s" % (test_name.ljust(30), test_status)
    433         # platform_InstallTestImage test may exist in old builds.
    434         if re.search('platform_InstallTestImage_SERVER_JOB$', test_name):
    435             continue
    436         test_found = False
    437         for key,val in expected_results.items():
    438             if re.search(key, test_name):
    439                 test_found = True
    440                 found_keys.add(key)
    441                 if val != test_status:
    442                     error = ('%s Expected: [%s], Actual: [%s]' %
    443                              (test_name, val, test_status))
    444                     mismatch_errors.append(error)
    445         if not test_found:
    446             extra_test_errors.append(test_name)
    447 
    448     missing_test_errors = set(expected_results.keys()) - found_keys
    449     for exception in IGNORE_MISSING_TESTS:
    450         try:
    451             missing_test_errors.remove(exception)
    452         except KeyError:
    453             pass
    454 
    455     summary = []
    456     if mismatch_errors:
    457         summary.append(('Results of %d test(s) do not match expected '
    458                         'values:') % len(mismatch_errors))
    459         summary.extend(mismatch_errors)
    460         summary.append('\n')
    461 
    462     if extra_test_errors:
    463         summary.append('%d test(s) are not expected to be run:' %
    464                        len(extra_test_errors))
    465         summary.extend(extra_test_errors)
    466         summary.append('\n')
    467 
    468     if missing_test_errors:
    469         summary.append('%d test(s) are missing from the results:' %
    470                        len(missing_test_errors))
    471         summary.extend(missing_test_errors)
    472         summary.append('\n')
    473 
    474     # Test link to log can be loaded.
    475     job_name = '%s-%s' % (job_id, getpass.getuser())
    476     log_link = URL_PATTERN % (URL_HOST, job_name)
    477     try:
    478         urllib2.urlopen(log_link).read()
    479     except urllib2.URLError:
    480         summary.append('Failed to load page for link to log: %s.' % log_link)
    481 
    482     if summary:
    483         raise TestPushException('\n'.join(summary))
    484 
    485 
    486 def test_suite_wrapper(queue, suite_name, expected_results, arguments,
    487                        use_shard=False, create_and_return=False,
    488                        testbed_test=False):
    489     """Wrapper to call test_suite. Handle exception and pipe it to parent
    490     process.
    491 
    492     @param queue: Queue to save exception to be accessed by parent process.
    493     @param suite_name: Name of a suite, e.g., dummy
    494     @param expected_results: A dictionary of test name to test result.
    495     @param arguments: Arguments for run_suite command.
    496     @param use_shard: If true, suite is scheduled for shard board.
    497     @param create_and_return: If True, run_suite just creates the suite, print
    498                               the job id, then finish immediately.
    499     @param testbed_test: True to run testbed test. Default is False.
    500     """
    501     try:
    502         test_suite(suite_name, expected_results, arguments, use_shard,
    503                    create_and_return, testbed_test)
    504     except:
    505         # Store the whole exc_info leads to a PicklingError.
    506         except_type, except_value, tb = sys.exc_info()
    507         queue.put((except_type, except_value, traceback.extract_tb(tb)))
    508 
    509 
    510 def check_queue(queue):
    511     """Check the queue for any exception being raised.
    512 
    513     @param queue: Queue used to store exception for parent process to access.
    514     @raise: Any exception found in the queue.
    515     """
    516     if queue.empty():
    517         return
    518     exc_info = queue.get()
    519     # Raise the exception with original backtrace.
    520     print 'Original stack trace of the exception:\n%s' % exc_info[2]
    521     raise exc_info[0](exc_info[1])
    522 
    523 
    524 def get_head_of_repos(repos):
    525     """Get HEAD of updated repos, currently are autotest and chromite repos
    526 
    527     @param repos: a map of repo name to the path of the repo. E.g.
    528                   {'autotest': '/usr/local/autotest'}
    529     @return: a map of repo names to the current HEAD of that repo.
    530     """
    531     @contextmanager
    532     def cd(new_wd):
    533         """Helper function to change working directory.
    534 
    535         @param new_wd: new working directory that switch to.
    536         """
    537         prev_wd = os.getcwd()
    538         os.chdir(os.path.expanduser(new_wd))
    539         try:
    540             yield
    541         finally:
    542             os.chdir(prev_wd)
    543 
    544     updated_repo_heads = {}
    545     for repo_name, path_to_repo in repos.iteritems():
    546         with cd(path_to_repo):
    547             head = subprocess.check_output('git rev-parse HEAD',
    548                                            shell=True).strip()
    549         updated_repo_heads[repo_name] = head
    550     return updated_repo_heads
    551 
    552 
    553 def push_prod_next_branch(updated_repo_heads):
    554     """push prod-next branch to the tested HEAD after all tests pass.
    555 
    556     The push command must be ran as PUSH_USER, since only PUSH_USER has the
    557     right to push branches.
    558 
    559     @param updated_repo_heads: a map of repo names to tested HEAD of that repo.
    560     """
    561     # prod-next branch for every repo is downloaded under PUSH_USER home dir.
    562     cmd = ('cd ~/{repo}; git pull; git rebase {hash} prod-next;'
    563            'git push origin prod-next')
    564     run_push_as_push_user = "sudo su - %s -c '%s'" % (PUSH_USER, cmd)
    565 
    566     for repo_name, test_hash in updated_repo_heads.iteritems():
    567          push_cmd = run_push_as_push_user.format(hash=test_hash, repo=repo_name)
    568          print 'Pushing %s prod-next branch to %s' % (repo_name, test_hash)
    569          print subprocess.check_output(push_cmd, stderr=subprocess.STDOUT,
    570                                        shell=True)
    571 
    572 
    573 def send_notification_email(email_list, title, msg):
    574     """Send notification to all email addresses in email list.
    575 
    576     @param email_list: a email address list which receives notification email,
    577         whose format is like:
    578             [xxx (at] google.com, xxx (at] google.com, xxx (at] google.com,...]
    579         so that users could also specify multiple email addresses by using
    580         config '--email' or '-e'.
    581     @param title: the title of the email to be sent.
    582     @param msg: the content of the email to be sent.
    583     """
    584     gmail_lib.send_email(','.join(email_list), title, msg)
    585 
    586 
    587 def _main(arguments):
    588     """Running tests.
    589 
    590     @param arguments: command line arguments.
    591     """
    592     updated_repo_heads = get_head_of_repos(UPDATED_REPOS)
    593     updated_repo_msg = '\n'.join(
    594         ['%s: %s' % (k, v) for k, v in updated_repo_heads.iteritems()])
    595     test_push_success = False
    596 
    597     try:
    598         # Use daemon flag will kill child processes when parent process fails.
    599         use_daemon = not arguments.continue_on_failure
    600         # Verify all the DUTs at the beginning of testing push.
    601         reverify_all_push_duts()
    602         time.sleep(15) # Wait 15 secs for the verify test to start.
    603         check_dut_inventory(arguments.num_duts, arguments.pool)
    604         queue = multiprocessing.Queue()
    605 
    606         push_to_prod_suite = multiprocessing.Process(
    607                 target=test_suite_wrapper,
    608                 args=(queue, PUSH_TO_PROD_SUITE, EXPECTED_TEST_RESULTS,
    609                       arguments))
    610         push_to_prod_suite.daemon = use_daemon
    611         push_to_prod_suite.start()
    612 
    613         # suite test with --create_and_return flag
    614         asynchronous_suite = multiprocessing.Process(
    615                 target=test_suite_wrapper,
    616                 args=(queue, DUMMY_SUITE, EXPECTED_TEST_RESULTS_DUMMY,
    617                       arguments, True, True))
    618         asynchronous_suite.daemon = True
    619         asynchronous_suite.start()
    620 
    621         while (push_to_prod_suite.is_alive()
    622                or asynchronous_suite.is_alive()):
    623             check_queue(queue)
    624             time.sleep(5)
    625 
    626         check_queue(queue)
    627 
    628         push_to_prod_suite.join()
    629         asynchronous_suite.join()
    630 
    631         # All tests pass, push prod-next branch for UPDATED_REPOS.
    632         push_prod_next_branch(updated_repo_heads)
    633         test_push_success = True
    634     except Exception as e:
    635         print 'Test for pushing to prod failed:\n'
    636         print str(e)
    637         # Abort running jobs when choose not to continue when there is failure.
    638         if not arguments.continue_on_failure:
    639             for suite_id in all_suite_ids:
    640                 if AFE.get_jobs(id=suite_id, finished=False):
    641                     AFE.run('abort_host_queue_entries', job=suite_id)
    642         # Send out email about the test failure.
    643         if arguments.email:
    644             send_notification_email(
    645                     arguments.email,
    646                     'Test for pushing to prod failed. Do NOT push!',
    647                     ('Test CLs of the following repos failed. Below are the '
    648                      'repos and the corresponding test HEAD.\n\n%s\n\n.'
    649                      'Error occurred during test:\n\n%s\n\n'
    650                      'All logs have been saved to '
    651                      '/var/log/test_push/test_push.log on push master. '
    652                      'Stats on recent success rate can be found at '
    653                      'go/test-push-stats . Detailed '
    654                      'debugging info can be found at go/push-to-prod' %
    655                      (updated_repo_msg, str(e)) + '\n'.join(run_suite_output)))
    656         raise
    657     finally:
    658         metrics.Counter('chromeos/autotest/test_push/completed').increment(
    659             fields={'success': test_push_success})
    660         # Reverify all the hosts
    661         reverify_all_push_duts()
    662 
    663     message = ('\nAll tests are completed successfully, the prod branch of the '
    664                'following repos ready to be pushed to the hash list below.\n'
    665                '%s\n\n\nInstructions for pushing to prod are available at '
    666                'https://goto.google.com/autotest-to-prod ' % updated_repo_msg)
    667     print message
    668     # Send out email about test completed successfully.
    669     if arguments.email:
    670         send_notification_email(
    671                 arguments.email,
    672                 'Test for pushing to prod completed successfully',
    673                 message)
    674 
    675 
    676 def main():
    677     """Entry point."""
    678     arguments = parse_arguments()
    679     with ts_mon_config.SetupTsMonGlobalState(service_name='test_push',
    680                                              indirect=True):
    681         return _main(arguments)
    682 
    683 if __name__ == '__main__':
    684     sys.exit(main())
    685