Home | History | Annotate | Download | only in skylab_suite
      1 # Copyright 2018 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """Module for CrOS dynamic test suite generation and execution."""
      6 
      7 from __future__ import absolute_import
      8 from __future__ import division
      9 from __future__ import print_function
     10 
     11 import contextlib
     12 import itertools
     13 import json
     14 import logging
     15 import os
     16 import re
     17 import time
     18 
     19 from lucifer import autotest
     20 from skylab_suite import cros_suite
     21 from skylab_suite import swarming_lib
     22 
     23 
     24 SKYLAB_DRONE_SWARMING_WORKER = '/opt/infra-tools/skylab_swarming_worker'
     25 SKYLAB_SUITE_USER = 'skylab_suite_runner'
     26 SKYLAB_TOOL = '/opt/infra-tools/skylab'
     27 
     28 SUITE_WAIT_SLEEP_INTERVAL_SECONDS = 30
     29 
     30 # See #5 in crbug.com/873886 for more details.
     31 _NOT_SUPPORTED_DEPENDENCIES = ['skip_provision', 'cleanup-reboot', 'rpm',
     32                                'modem_repair']
     33 
     34 
     35 def run(client, test_specs, suite_handler, dry_run=False):
     36     """Run a CrOS dynamic test suite.
     37 
     38     @param client: A swarming_lib.Client instance.
     39     @param test_specs: A list of cros_suite.TestSpec objects.
     40     @param suite_handler: A cros_suite.SuiteHandler object.
     41     @param dry_run: Whether to kick off dry runs of the tests.
     42     """
     43     assert isinstance(client, swarming_lib.Client)
     44     if suite_handler.suite_id:
     45         # Resume an existing suite.
     46         _resume_suite(client, test_specs, suite_handler, dry_run)
     47     else:
     48         # Make a new suite.
     49         _run_suite(test_specs, suite_handler, dry_run)
     50 
     51 
     52 def _resume_suite(client, test_specs, suite_handler, dry_run=False):
     53     """Resume a suite and its child tasks by given suite id."""
     54     assert isinstance(client, swarming_lib.Client)
     55     suite_id = suite_handler.suite_id
     56     all_tasks = client.get_child_tasks(suite_id)
     57     not_yet_scheduled = _get_unscheduled_test_specs(
     58             test_specs, suite_handler, all_tasks)
     59 
     60     logging.info('Not yet scheduled test_specs: %r', not_yet_scheduled)
     61     _create_test_tasks(not_yet_scheduled, suite_handler, suite_id, dry_run)
     62 
     63     if suite_id is not None and suite_handler.should_wait():
     64         _wait_for_results(suite_handler, dry_run=dry_run)
     65 
     66 
     67 def _get_unscheduled_test_specs(test_specs, suite_handler, all_tasks):
     68     not_yet_scheduled = []
     69     for test_spec in test_specs:
     70         if suite_handler.is_provision():
     71             # We cannot check bot_id because pending tasks do not have it yet.
     72             bot_id_tag = 'id:%s' % test_spec.bot_id
     73             tasks = [t for t in all_tasks if bot_id_tag in t['tags']]
     74         else:
     75             tasks = [t for t in all_tasks if t['name']==test_spec.test.name]
     76 
     77         if not tasks:
     78             not_yet_scheduled.append(test_spec)
     79             continue
     80 
     81         current_task = _get_current_task(tasks)
     82         test_task_id = (current_task['task_id'] if current_task
     83                         else tasks[0]['task_id'])
     84         remaining_retries = test_spec.test.job_retries - len(tasks)
     85         previous_retried_ids = [t['task_id'] for t in tasks
     86                                 if t['task_id'] != test_task_id]
     87         suite_handler.add_test_by_task_id(
     88                 test_task_id,
     89                 cros_suite.TestHandlerSpec(
     90                         test_spec=test_spec,
     91                         remaining_retries=remaining_retries,
     92                         previous_retried_ids=previous_retried_ids))
     93 
     94     return not_yet_scheduled
     95 
     96 
     97 def _get_current_task(tasks):
     98     """Get current running task.
     99 
    100     @param tasks: A list of task dicts including task_id, state, etc.
    101 
    102     @return a dict representing the current running task.
    103     """
    104     current_task = None
    105     for t in tasks:
    106         if t['state'] not in swarming_lib.TASK_FINISHED_STATUS:
    107             if current_task:
    108                 raise ValueError(
    109                         'Parent task has 2 same running child tasks: %s, %s'
    110                         % (current_task['task_id'], t['task_id']))
    111 
    112             current_task = t
    113 
    114     return current_task
    115 
    116 
    117 def _run_suite(test_specs, suite_handler, dry_run=False):
    118     """Make a new suite."""
    119     suite_id = os.environ.get('SWARMING_TASK_ID')
    120     if not suite_id:
    121         raise ValueError("Unable to determine suite's task id from env var "
    122                          "SWARMING_TASK_ID.")
    123     _create_test_tasks(test_specs, suite_handler, suite_id, dry_run)
    124     suite_handler.set_suite_id(suite_id)
    125 
    126     if suite_handler.should_wait():
    127         _wait_for_results(suite_handler, dry_run=dry_run)
    128 
    129 
    130 def _create_test_tasks(test_specs, suite_handler, suite_id, dry_run=False):
    131     """Create test tasks for a list of tests (TestSpecs).
    132 
    133     Given a list of TestSpec object, this function will schedule them on
    134     swarming one by one, and add them to the swarming_task_id-to-test map
    135     of suite_handler to keep monitoring them.
    136 
    137     @param test_specs: A list of cros_suite.TestSpec objects to schedule.
    138     @param suite_handler: A cros_suite.SuiteHandler object to monitor the
    139         test_specs' progress.
    140     @param suite_id: A string ID for a suite task, it's the parent task id for
    141         these to-be-scheduled test_specs.
    142     @param dry_run: Whether to kick off dry runs of the tests.
    143     """
    144     for test_spec in test_specs:
    145         test_task_id = _create_test_task(
    146                 test_spec,
    147                 suite_id=suite_id,
    148                 is_provision=suite_handler.is_provision(),
    149                 dry_run=dry_run)
    150         suite_handler.add_test_by_task_id(
    151                 test_task_id,
    152                 cros_suite.TestHandlerSpec(
    153                         test_spec=test_spec,
    154                         remaining_retries=test_spec.test.job_retries - 1,
    155                         previous_retried_ids=[]))
    156 
    157 
    158 def _create_test_task(test_spec, suite_id=None,
    159                       is_provision=False, dry_run=False):
    160     """Create a test task for a given test spec.
    161 
    162     @param test_spec: A cros_suite.TestSpec object.
    163     @param suite_id: the suite task id of the test.
    164     @param dry_run: If true, don't actually create task.
    165 
    166     @return the swarming task id of this task.
    167     """
    168     logging.info('Creating task for test %s', test_spec.test.name)
    169     skylab_tool_path = os.environ.get('SKYLAB_TOOL', SKYLAB_TOOL)
    170 
    171     cmd = [
    172         skylab_tool_path, 'create-test',
    173         '-board', test_spec.board,
    174         '-image', test_spec.build,
    175         '-service-account-json', os.environ['SWARMING_CREDS'],
    176         ]
    177     if _is_dev():
    178         cmd += ['-dev']
    179     if test_spec.pool:
    180         # TODO(akeshet): Clean up this hack around pool name translation.
    181         autotest_pool_label = 'pool:%s' % test_spec.pool
    182         pool_dependency_value = swarming_lib.task_dependencies_from_labels(
    183             [autotest_pool_label])['label-pool']
    184         cmd += ['-pool', pool_dependency_value]
    185 
    186     if test_spec.model:
    187         cmd += ['-model', test_spec.model]
    188     if test_spec.quota_account:
    189         cmd += ['-qs-account', test_spec.quota_account]
    190     if test_spec.test.test_type.lower() == 'client':
    191         cmd += ['-client-test']
    192 
    193     tags = _compute_tags(test_spec.build, suite_id)
    194     dimensions = _compute_dimensions(
    195             test_spec.bot_id, test_spec.test.dependencies)
    196     keyvals_flat = _compute_job_keyvals_flat(test_spec.keyvals, suite_id)
    197 
    198     for tag in tags:
    199         cmd += ['-tag', tag]
    200     for keyval in keyvals_flat:
    201         cmd += ['-keyval', keyval]
    202     cmd += [test_spec.test.name]
    203     cmd += dimensions
    204 
    205     if dry_run:
    206         logging.info('Would have created task with command %s', cmd)
    207         return
    208 
    209     # TODO(akeshet): Avoid this late chromite import.
    210     cros_build_lib = autotest.chromite_load('cros_build_lib')
    211     result = cros_build_lib.RunCommand(cmd, capture_output=True)
    212     # TODO(akeshet): Use -json flag and json-parse output of the command instead
    213     # of regex matching to determine task_id.
    214     m = re.match('.*id=(.*)$', result.output)
    215     task_id = m.group(1)
    216     logging.info('Created task with id %s', task_id)
    217     return task_id
    218 
    219 
    220 # TODO(akeshet): Eliminate the need for this, by either adding an explicit
    221 # swarming_server argument to skylab tool, or having the tool respect the
    222 # SWARMING_SERVER environment variable. See crbug.com/948774
    223 def _is_dev():
    224     """Detect whether skylab tool should be invoked with -dev flag."""
    225     return 'chromium-swarm-dev' in os.environ['SWARMING_SERVER']
    226 
    227 def _compute_tags(build, suite_id):
    228     tags = [
    229         'build:%s' % build,
    230     ]
    231     if suite_id is not None:
    232         tags += ['parent_task_id:%s' % suite_id]
    233     return tags
    234 
    235 
    236 def _compute_dimensions(bot_id, dependencies):
    237     dimensions = []
    238     if bot_id:
    239         dimensions += ['id:%s' % bot_id]
    240     deps = _filter_unsupported_dependencies(dependencies)
    241     flattened_swarming_deps = sorted([
    242         '%s:%s' % (k, v) for
    243         k, v in swarming_lib.task_dependencies_from_labels(deps).items()
    244         ])
    245     dimensions += flattened_swarming_deps
    246     return dimensions
    247 
    248 
    249 def _compute_job_keyvals_flat(keyvals, suite_id):
    250     # Job keyvals calculation.
    251     job_keyvals = keyvals.copy()
    252     if suite_id is not None:
    253         # TODO(akeshet): Avoid this late autotest constants import.
    254         constants = autotest.load('server.cros.dynamic_suite.constants')
    255         job_keyvals[constants.PARENT_JOB_ID] = suite_id
    256     keyvals_flat = sorted(
    257         ['%s:%s' % (k, v) for k, v in job_keyvals.items()])
    258     return keyvals_flat
    259 
    260 
    261 def _filter_unsupported_dependencies(dependencies):
    262     """Filter out Skylab-unsupported test dependencies, with a warning."""
    263     deps = []
    264     for dep in dependencies:
    265         if dep in _NOT_SUPPORTED_DEPENDENCIES:
    266             logging.warning('Dependency %s is not supported in skylab', dep)
    267         else:
    268             deps.append(dep)
    269     return deps
    270 
    271 
    272 @contextlib.contextmanager
    273 def disable_logging(logging_level):
    274     """Context manager for disabling logging of a given logging level."""
    275     try:
    276         logging.disable(logging_level)
    277         yield
    278     finally:
    279         logging.disable(logging.NOTSET)
    280 
    281 
    282 def _loop_and_wait_forever(suite_handler, dry_run):
    283     """Wait for child tasks to finish or break."""
    284     for iterations in itertools.count(0):
    285         # Log progress every 300 seconds.
    286         no_logging = bool(iterations * SUITE_WAIT_SLEEP_INTERVAL_SECONDS % 300)
    287         with disable_logging(logging.INFO if no_logging else logging.NOTSET):
    288             suite_handler.handle_results(suite_handler.suite_id)
    289             if suite_handler.is_finished_waiting():
    290                 break
    291 
    292         for t in suite_handler.retried_tasks:
    293             _retry_test(suite_handler, t['task_id'], dry_run=dry_run)
    294 
    295         time.sleep(SUITE_WAIT_SLEEP_INTERVAL_SECONDS)
    296 
    297 
    298 def _wait_for_results(suite_handler, dry_run=False):
    299     """Wait for child tasks to finish and return their results.
    300 
    301     @param suite_handler: a cros_suite.SuiteHandler object.
    302     """
    303     timeout_util = autotest.chromite_load('timeout_util')
    304     try:
    305         with timeout_util.Timeout(suite_handler.timeout_mins * 60 -
    306                                   suite_handler.passed_mins * 60):
    307             _loop_and_wait_forever(suite_handler, dry_run)
    308     except timeout_util.TimeoutError:
    309         logging.error('Timeout in waiting for child tasks.')
    310         return
    311 
    312     logging.info('Finished to wait for child tasks.')
    313 
    314 
    315 def _retry_test(suite_handler, task_id, dry_run=False):
    316     """Retry test for a suite.
    317 
    318     We will execute the following actions for retrying a test:
    319         1. Schedule the test.
    320         2. Add the test with the new swarming task id to the suite's
    321            retry handler, but reduce its remaining retries by 1.
    322         3. Reduce the suite-level max retries by 1.
    323         4. Remove prevous failed test from retry handler since it's not
    324            actively monitored by the suite.
    325 
    326     @param suite_handler: a cros_suite.SuiteHandler object.
    327     @param task_id: The swarming task id for the retried test.
    328     @param dry_run: Whether to retry a dry run of the test.
    329     """
    330     last_retry_spec = suite_handler.get_test_by_task_id(task_id)
    331     logging.info('Retrying test %s, remaining %d retries.',
    332                  last_retry_spec.test_spec.test.name,
    333                  last_retry_spec.remaining_retries - 1)
    334     retried_task_id = _create_test_task(
    335             last_retry_spec.test_spec,
    336             suite_id=suite_handler.suite_id,
    337             is_provision=suite_handler.is_provision(),
    338             dry_run=dry_run)
    339     previous_retried_ids = last_retry_spec.previous_retried_ids + [task_id]
    340     suite_handler.add_test_by_task_id(
    341             retried_task_id,
    342             cros_suite.TestHandlerSpec(
    343                     test_spec=last_retry_spec.test_spec,
    344                     remaining_retries=last_retry_spec.remaining_retries - 1,
    345                     previous_retried_ids=previous_retried_ids))
    346     suite_handler.set_max_retries(suite_handler.max_retries - 1)
    347     suite_handler.remove_test_by_task_id(task_id)
    348 
    349 
    350 def _convert_dict_to_string(input_dict):
    351     """Convert dictionary to a string.
    352 
    353     @param input_dict: A dictionary.
    354     """
    355     for k, v in input_dict.iteritems():
    356         if isinstance(v, dict):
    357             input_dict[k] = _convert_dict_to_string(v)
    358         else:
    359             input_dict[k] = str(v)
    360 
    361     return json.dumps(input_dict)
    362