Home | History | Annotate | Download | only in skylab_suite
      1 # Copyright 2018 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """Definition of a CrOS suite in skylab.
      6 
This file is a simplification of dynamic_suite.suite that drops the
features a skylab suite does not need.
      9 
     10 Suite class in this file mainly has 2 features:
     11     1. Integrate parameters from control file & passed in arguments.
     12     2. Find proper child tests for a given suite.
     13 
     14 Use case:
     15     See _run_suite() in skylab_suite.run_suite_skylab.
     16 """
     17 
     18 from __future__ import absolute_import
     19 from __future__ import division
     20 from __future__ import print_function
     21 
     22 import collections
     23 import logging
     24 import os
     25 
     26 from lucifer import autotest
     27 from skylab_suite import errors
     28 from skylab_suite import swarming_lib
     29 
     30 
     31 SuiteSpec = collections.namedtuple(
     32         'SuiteSpec',
     33         [
     34                 'builds',
     35                 'suite_name',
     36                 'suite_file_name',
     37                 'test_source_build',
     38                 'suite_args',
     39                 'priority',
     40                 'board',
     41                 'model',
     42                 'pool',
     43                 'job_keyvals',
     44                 'minimum_duts',
     45                 'timeout_mins',
     46                 'quota_account',
     47         ])
     48 
     49 SuiteHandlerSpec = collections.namedtuple(
     50         'SuiteHandlerSpec',
     51         [
     52                 'suite_name',
     53                 'wait',
     54                 'suite_id',
     55                 'timeout_mins',
     56                 'passed_mins',
     57                 'test_retry',
     58                 'max_retries',
     59                 'provision_num_required',
     60         ])
     61 
     62 TestHandlerSpec = collections.namedtuple(
     63         'TestHandlerSpec',
     64         [
     65                 'test_spec',
     66                 'remaining_retries',
     67                 'previous_retried_ids',
     68         ])
     69 
     70 TestSpec = collections.namedtuple(
     71         'TestSpec',
     72         [
     73                 'test',
     74                 'priority',
     75                 'board',
     76                 'model',
     77                 'pool',
     78                 'build',
     79                 'keyvals',
     80                 # TODO(akeshet): Determine why this is necessary
     81                 # (can't this just be specified as its own dimension?) and
     82                 # delete it if it isn't necessary.
     83                 'bot_id',
     84                 'dut_name',
     85                 'expiration_secs',
     86                 'grace_period_secs',
     87                 'execution_timeout_secs',
     88                 'io_timeout_secs',
     89                 'quota_account',
     90         ])
     91 
     92 
     93 class SuiteHandler(object):
     94     """The class for handling a CrOS suite run.
     95 
     96     Its responsibility includes handling retries for child tests.
     97     """
     98 
     99     def __init__(self, specs, client):
    100         self._suite_name = specs.suite_name
    101         self._wait = specs.wait
    102         self._timeout_mins = specs.timeout_mins
    103         self._provision_num_required = specs.provision_num_required
    104         self._test_retry = specs.test_retry
    105         self._max_retries = specs.max_retries
    106         self.passed_mins = specs.passed_mins
    107 
    108         # The swarming task id of the suite that this suite_handler is handling.
    109         self._suite_id = specs.suite_id
    110         # The swarming task id of current run_suite_skylab process. It could be
    111         # different from self._suite_id if a suite_id is passed in.
    112         self._task_id = os.environ.get('SWARMING_TASK_ID')
    113         self._task_to_test_maps = {}
    114         self.successfully_provisioned_duts = set()
    115         self._client = client
    116 
    117         # It only maintains the swarming task of the final run of each
    118         # child task, i.e. it doesn't include failed swarming tasks of
    119         # each child task which will get retried later.
    120         self._active_child_tasks = []
    121 
    122     def should_wait(self):
    123         """Return whether to wait for a suite's result."""
    124         return self._wait
    125 
    126     def is_provision(self):
    127         """Return whether the suite handler is for provision suite."""
    128         return self._suite_name == 'provision'
    129 
    130     def set_suite_id(self, suite_id):
    131         """Set swarming task id for a suite.
    132 
    133         @param suite_id: The swarming task id of this suite.
    134         """
    135         self._suite_id = suite_id
    136 
    137     def add_test_by_task_id(self, task_id, test_handler_spec):
    138         """Record a child test and its swarming task id.
    139 
    140         @param task_id: the swarming task id of a child test.
    141         @param test_handler_spec: a TestHandlerSpec object.
    142         """
    143         self._task_to_test_maps[task_id] = test_handler_spec
    144 
    145     def get_test_by_task_id(self, task_id):
    146         """Get a child test by its swarming task id.
    147 
    148         @param task_id: the swarming task id of a child test.
    149         """
    150         return self._task_to_test_maps[task_id]
    151 
    152     def remove_test_by_task_id(self, task_id):
    153         """Delete a child test by its swarming task id.
    154 
    155         @param task_id: the swarming task id of a child test.
    156         """
    157         self._task_to_test_maps.pop(task_id, None)
    158 
    159     def set_max_retries(self, max_retries):
    160         """Set the max retries for a suite.
    161 
    162         @param max_retries: The current maximum retries to set.
    163         """
    164         self._max_retries = max_retries
    165 
    166     @property
    167     def task_to_test_maps(self):
    168         """Get the task_to_test_maps of a suite."""
    169         return self._task_to_test_maps
    170 
    171     @property
    172     def timeout_mins(self):
    173         """Get the timeout minutes of a suite."""
    174         return self._timeout_mins
    175 
    176     @property
    177     def suite_id(self):
    178         """Get the swarming task id of a suite."""
    179         return self._suite_id
    180 
    181     @property
    182     def task_id(self):
    183         """Get swarming task id of current process."""
    184         return self._task_id
    185 
    186     @property
    187     def max_retries(self):
    188         """Get the max num of retries of a suite."""
    189         return self._max_retries
    190 
    191     def get_active_child_tasks(self, suite_id):
    192         """Get the child tasks which is actively monitored by a suite.
    193 
    194         The active child tasks list includes tasks which are currently running
    195         or finished without following retries. E.g.
    196         Suite task X:
    197             child task 1: x1 (first try x1_1, second try x1_2)
    198             child task 2: x2 (first try: x2_1)
    199         The final active child task list will include task x1_2 and x2_1, won't
    200         include x1_1 since it's a task which is finished but get retried later.
    201         """
    202         all_tasks = self._client.get_child_tasks(suite_id)
    203         return [t for t in all_tasks if t['task_id'] in self._task_to_test_maps]
    204 
    205     def handle_results(self, suite_id):
    206         """Handle child tasks' results."""
    207         self._active_child_tasks = self.get_active_child_tasks(suite_id)
    208         self.retried_tasks = [t for t in self._active_child_tasks
    209                               if self._should_retry(t)]
    210         logging.info('Found %d tests to be retried.', len(self.retried_tasks))
    211 
    212     def _check_all_tasks_finished(self):
    213         """Check whether all tasks are finished, including retried tasks."""
    214         finished_tasks = [t for t in self._active_child_tasks if
    215                           t['state'] in swarming_lib.TASK_FINISHED_STATUS]
    216         logging.info('%d/%d child tasks finished, %d got retried.',
    217                      len(finished_tasks), len(self._active_child_tasks),
    218                      len(self.retried_tasks))
    219         return (len(finished_tasks) == len(self._active_child_tasks)
    220                 and not self.retried_tasks)
    221 
    222     def _set_successful_provisioned_duts(self):
    223         """Set successfully provisioned duts."""
    224         for t in self._active_child_tasks:
    225             if (swarming_lib.get_task_final_state(t) ==
    226                 swarming_lib.TASK_COMPLETED_SUCCESS):
    227                 dut_name = self.get_test_by_task_id(
    228                         t['task_id']).test_spec.dut_name
    229                 if dut_name:
    230                     self.successfully_provisioned_duts.add(dut_name)
    231 
    232     def is_provision_successfully_finished(self):
    233         """Check whether provision succeeds."""
    234         logging.info('Found %d successfully provisioned duts, '
    235                      'the minimum requirement is %d',
    236                      len(self.successfully_provisioned_duts),
    237                      self._provision_num_required)
    238         return (len(self.successfully_provisioned_duts) >=
    239                 self._provision_num_required)
    240 
    241     def is_finished_waiting(self):
    242         """Check whether the suite should finish its waiting."""
    243         if self.is_provision():
    244             self._set_successful_provisioned_duts()
    245             return (self.is_provision_successfully_finished() or
    246                     self._check_all_tasks_finished())
    247 
    248         return self._check_all_tasks_finished()
    249 
    250     def _should_retry(self, test_result):
    251         """Check whether a test should be retried.
    252 
    253         We will retry a test if:
    254             1. The test-level retry is enabled for this suite.
    255             2. The test fails.
    256             3. The test is currently monitored by the suite, i.e.
    257                it's not a previous retried test.
    258             4. The test has remaining retries based on JOB_RETRIES in
    259                its control file.
    260             5. The suite-level max retries isn't hit.
    261 
    262         @param test_result: A json test result from swarming API.
    263 
    264         @return True if we should retry the test.
    265         """
    266         task_id = test_result['task_id']
    267         state = test_result['state']
    268         is_failure = test_result['failure']
    269         return (self._test_retry and
    270                 ((state == swarming_lib.TASK_COMPLETED and is_failure)
    271                  or (state in swarming_lib.TASK_STATUS_TO_RETRY))
    272                 and (task_id in self._task_to_test_maps)
    273                 and (self._task_to_test_maps[task_id].remaining_retries > 0)
    274                 and (self._max_retries > 0))
    275 
    276 
    277 class Suite(object):
    278     """The class for a CrOS suite."""
    279     EXPIRATION_SECS = swarming_lib.DEFAULT_EXPIRATION_SECS
    280 
    281     def __init__(self, spec, client):
    282         """Initialize a suite.
    283 
    284         @param spec: A SuiteSpec object.
    285         @param client: A swarming_lib.Client instance.
    286         """
    287         self._ds = None
    288 
    289         self.control_file = ''
    290         self.test_specs = []
    291         self.builds = spec.builds
    292         self.test_source_build = spec.test_source_build
    293         self.suite_name = spec.suite_name
    294         self.suite_file_name = spec.suite_file_name
    295         self.priority = spec.priority
    296         self.board = spec.board
    297         self.model = spec.model
    298         self.pool = spec.pool
    299         self.job_keyvals = spec.job_keyvals
    300         self.minimum_duts = spec.minimum_duts
    301         self.timeout_mins = spec.timeout_mins
    302         self.quota_account = spec.quota_account
    303         self._client = client
    304 
    305     @property
    306     def ds(self):
    307         """Getter for private |self._ds| property.
    308 
    309         This ensures that once self.ds is called, there's a devserver ready
    310         for it.
    311         """
    312         if self._ds is None:
    313             raise errors.InValidPropertyError(
    314                 'Property self.ds is None. Please call stage_suite_artifacts() '
    315                 'before calling it.')
    316 
    317         return self._ds
    318 
    319     def _get_cros_build(self):
    320         provision = autotest.load('server.cros.provision')
    321         return self.builds.get(provision.CROS_VERSION_PREFIX,
    322                                self.builds.values()[0])
    323 
    324     def _create_suite_keyvals(self):
    325         constants = autotest.load('server.cros.dynamic_suite.constants')
    326         provision = autotest.load('server.cros.provision')
    327         cros_build = self._get_cros_build()
    328         keyvals = {
    329                 constants.JOB_BUILD_KEY: cros_build,
    330                 constants.JOB_SUITE_KEY: self.suite_name,
    331                 constants.JOB_BUILDS_KEY: self.builds
    332         }
    333         if (cros_build != self.test_source_build or
    334             len(self.builds) > 1):
    335             keyvals[constants.JOB_TEST_SOURCE_BUILD_KEY] = (
    336                     self.test_source_build)
    337             for prefix, build in self.builds.iteritems():
    338                 if prefix == provision.FW_RW_VERSION_PREFIX:
    339                     keyvals[constants.FWRW_BUILD]= build
    340                 elif prefix == provision.FW_RO_VERSION_PREFIX:
    341                     keyvals[constants.FWRO_BUILD] = build
    342 
    343         for key in self.job_keyvals:
    344             if key in constants.INHERITED_KEYVALS:
    345                 keyvals[key] = self.job_keyvals[key]
    346 
    347         return keyvals
    348 
    349     def prepare(self):
    350         """Prepare a suite job for execution."""
    351         self._stage_suite_artifacts()
    352         self._parse_suite_args()
    353         keyvals = self._create_suite_keyvals()
    354         available_bots = self._get_available_bots()
    355         if len(available_bots) < self.minimum_duts:
    356             raise errors.NoAvailableDUTsError(
    357                     self.board, self.pool, len(available_bots),
    358                     self.minimum_duts)
    359 
    360         tests = self._find_tests(available_bots_num=len(available_bots))
    361         self.test_specs = self._get_test_specs(tests, available_bots, keyvals)
    362 
    363     def _create_test_spec(self, test, keyvals, bot_id='', dut_name=''):
    364         return TestSpec(
    365                 test=test,
    366                 priority=self.priority,
    367                 board=self.board,
    368                 model=self.model,
    369                 pool=self.pool,
    370                 build=self.test_source_build,
    371                 bot_id=bot_id,
    372                 dut_name=dut_name,
    373                 keyvals=keyvals,
    374                 expiration_secs=self.timeout_mins * 60,
    375                 grace_period_secs=swarming_lib.DEFAULT_TIMEOUT_SECS,
    376                 execution_timeout_secs=self.timeout_mins * 60,
    377                 io_timeout_secs=swarming_lib.DEFAULT_TIMEOUT_SECS,
    378                 quota_account=self.quota_account,
    379         )
    380 
    381     def _get_test_specs(self, tests, available_bots, keyvals):
    382         return [self._create_test_spec(test, keyvals) for test in tests]
    383 
    384     def _stage_suite_artifacts(self):
    385         """Stage suite control files and suite-to-tests mapping file.
    386 
    387         @param build: The build to stage artifacts.
    388         """
    389         suite_common = autotest.load('server.cros.dynamic_suite.suite_common')
    390         ds, _ = suite_common.stage_build_artifacts(self.test_source_build)
    391         self._ds = ds
    392 
    393     def _parse_suite_args(self):
    394         """Get the suite args.
    395 
    396         The suite args includes:
    397             a. suite args in suite control file.
    398             b. passed-in suite args by user.
    399         """
    400         suite_common = autotest.load('server.cros.dynamic_suite.suite_common')
    401         self.control_file = suite_common.get_control_file_by_build(
    402                 self.test_source_build, self.ds, self.suite_file_name)
    403 
    404     def _find_tests(self, available_bots_num=0):
    405         """Fetch the child tests."""
    406         control_file_getter = autotest.load(
    407                 'server.cros.dynamic_suite.control_file_getter')
    408         suite_common = autotest.load('server.cros.dynamic_suite.suite_common')
    409         cf_getter = control_file_getter.DevServerGetter(
    410                 self.test_source_build, self.ds)
    411         tests = suite_common.retrieve_for_suite(
    412                 cf_getter, self.suite_name)
    413         return suite_common.filter_tests(
    414                 tests, suite_common.name_in_tag_predicate(self.suite_name))
    415 
    416     def _get_available_bots(self):
    417         """Get available bots for suites."""
    418         dimensions = {'pool': swarming_lib.SKYLAB_DRONE_POOL,
    419                       'label-board': self.board}
    420         swarming_pool_deps = swarming_lib.task_dependencies_from_labels(
    421             ['pool:%s' % self.pool])
    422         dimensions.update(swarming_pool_deps)
    423         bots = self._client.query_bots_list(dimensions)
    424         return [bot for bot in bots if swarming_lib.bot_available(bot)]
    425 
    426 
    427 class ProvisionSuite(Suite):
    428     """The class for a CrOS provision suite."""
    429     EXPIRATION_SECS = swarming_lib.DEFAULT_EXPIRATION_SECS
    430 
    431     def __init__(self, spec, client):
    432         super(ProvisionSuite, self).__init__(spec, client)
    433         self._num_required = spec.suite_args['num_required']
    434 
    435     def _find_tests(self, available_bots_num=0):
    436         """Fetch the child tests for provision suite."""
    437         control_file_getter = autotest.load(
    438                 'server.cros.dynamic_suite.control_file_getter')
    439         suite_common = autotest.load('server.cros.dynamic_suite.suite_common')
    440         cf_getter = control_file_getter.DevServerGetter(
    441                 self.test_source_build, self.ds)
    442         dummy_test = suite_common.retrieve_control_data_for_test(
    443                 cf_getter, 'dummy_Pass')
    444         logging.info('Get %d available DUTs for provision.', available_bots_num)
    445         if available_bots_num < self._num_required:
    446             logging.warning('Not enough available DUTs for provision.')
    447             raise errors.NoAvailableDUTsError(
    448                     self.board, self.pool, available_bots_num,
    449                     self._num_required)
    450 
    451         return [dummy_test] * max(self._num_required, available_bots_num)
    452 
    453     def _get_test_specs(self, tests, available_bots, keyvals):
    454         test_specs = []
    455         for idx, test in enumerate(tests):
    456             if idx < len(available_bots):
    457                 bot = available_bots[idx]
    458                 test_specs.append(self._create_test_spec(
    459                         test, keyvals, bot_id=bot['bot_id'],
    460                         dut_name=swarming_lib.get_task_dut_name(
    461                                 bot['dimensions'])))
    462             else:
    463                 test_specs.append(self._create_test_spec(test, keyvals))
    464 
    465         return test_specs
    466