Home | History | Annotate | Download | only in faft
      1 # Copyright 2018 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging
      6 import os
      7 
      8 from autotest_lib.server import test
      9 from autotest_lib.client.common_lib import error, utils
     10 from autotest_lib.server.cros import gsutil_wrapper
     11 from autotest_lib.server.cros.dynamic_suite import constants as ds_constants
     12 
     13 
     14 class FingerprintTest(test.test):
     15     """Base class that sets up helpers for fingerprint tests."""
     16     version = 1
     17 
     18     _FINGERPRINT_BOARD_NAME_SUFFIX = '_fp'
     19 
     20     # Location of firmware from the build on the DUT
     21     _FINGERPRINT_BUILD_FW_GLOB = '/opt/google/biod/fw/*_fp*.bin'
     22 
     23     _GENIMAGES_SCRIPT_NAME = 'gen_test_images.sh'
     24     _GENIMAGES_OUTPUT_DIR_NAME = 'images'
     25 
     26     _TEST_IMAGE_FORMAT_MAP = {
     27         'TEST_IMAGE_ORIGINAL': '%s.bin',
     28         'TEST_IMAGE_DEV': '%s.dev',
     29         'TEST_IMAGE_CORRUPT_FIRST_BYTE': '%s_corrupt_first_byte.bin',
     30         'TEST_IMAGE_CORRUPT_LAST_BYTE': '%s_corrupt_last_byte.bin',
     31         'TEST_IMAGE_DEV_RB_ZERO': '%s.dev.rb0',
     32         'TEST_IMAGE_DEV_RB_ONE': '%s.dev.rb1',
     33         'TEST_IMAGE_DEV_RB_NINE': '%s.dev.rb9'
     34     }
     35 
     36     _ROLLBACK_INITIAL_BLOCK_ID = '1'
     37     _ROLLBACK_INITIAL_MIN_VERSION = '0'
     38     _ROLLBACK_INITIAL_RW_VERSION = '0'
     39 
     40     _SERVER_GENERATED_FW_DIR_NAME = 'generated_fw'
     41 
     42     _DUT_TMP_PATH_BASE = '/tmp/fp_test'
     43 
     44     _GOLDEN_RO_FIRMWARE_VERSION_MAP = {
     45         'nocturne_fp': 'nocturne_fp_v2.2.64-58cf5974e'
     46     }
     47 
     48     _BIOD_UPSTART_JOB_NAME = 'biod'
     49     # TODO(crbug.com/925545)
     50     _TIMBERSLIDE_UPSTART_JOB_NAME = \
     51         'timberslide LOG_PATH=/sys/kernel/debug/cros_fp/console_log'
     52 
     53     _INIT_ENTROPY_CMD = 'bio_wash --factory_init'
     54 
     55     _CROS_FP_ARG = '--name=cros_fp'
     56     _ECTOOL_RO_VERSION = 'RO version'
     57     _ECTOOL_RW_VERSION = 'RW version'
     58     _ECTOOL_ROLLBACK_BLOCK_ID = 'Rollback block id'
     59     _ECTOOL_ROLLBACK_MIN_VERSION = 'Rollback min version'
     60     _ECTOOL_ROLLBACK_RW_VERSION = 'RW rollback version'
     61 
     62     @staticmethod
     63     def _parse_ectool_output(ectool_output):
     64         """Converts ectool colon delimited output into python dict.
     65 
     66         Example:
     67         RO version:    nocturne_fp_v2.2.64-58cf5974e
     68         RW version:    nocturne_fp_v2.2.110-b936c0a3c
     69 
     70         becomes:
     71         {
     72           'RO version': 'nocturne_fp_v2.2.64-58cf5974e',
     73           'RW version': 'nocturne_fp_v2.2.110-b936c0a3c'
     74         }
     75         """
     76         ret = {}
     77         try:
     78             for line in ectool_output.strip().split('\n'):
     79                 key = line.split(':', 1)[0].strip()
     80                 val = line.split(':', 1)[1].strip()
     81                 ret[key] = val
     82         except:
     83             raise error.TestFail('Unable to parse ectool output: %s'
     84                                  % ectool_output)
     85         return ret
     86 
     87     def initialize(self, host, test_dir, use_dev_signed_fw=False):
     88         """Performs initialization."""
     89         self.host = host
     90         self.servo = host.servo
     91 
     92         self._validate_compatible_servo_version()
     93 
     94         self.servo.initialize_dut()
     95 
     96         logging.info('HW write protect enabled: %s',
     97                      self.is_hardware_write_protect_enabled())
     98 
     99         # TODO(crbug.com/925545): stop timberslide so /var/log/cros_fp.log
    100         # continues to update after flashing.
    101         self._timberslide_running = self.host.upstart_status(
    102             self._TIMBERSLIDE_UPSTART_JOB_NAME)
    103         if self._timberslide_running:
    104             logging.info('Stopping %s', self._TIMBERSLIDE_UPSTART_JOB_NAME)
    105             self.host.upstart_stop(self._TIMBERSLIDE_UPSTART_JOB_NAME)
    106 
    107         self._biod_running = self.host.upstart_status(
    108             self._BIOD_UPSTART_JOB_NAME)
    109         if self._biod_running:
    110             logging.info('Stopping %s', self._BIOD_UPSTART_JOB_NAME)
    111             self.host.upstart_stop(self._BIOD_UPSTART_JOB_NAME)
    112 
    113         # create tmp working directory on device (automatically cleaned up)
    114         self._dut_working_dir = self.host.get_tmp_dir(
    115             parent=self._DUT_TMP_PATH_BASE)
    116         logging.info('Created dut_working_dir: %s', self._dut_working_dir)
    117         self.copy_files_to_dut(test_dir, self._dut_working_dir)
    118 
    119         self._build_fw_file = self.get_build_fw_file()
    120 
    121         gen_script = os.path.abspath(os.path.join(self.autodir,
    122                                                   'server', 'cros', 'faft',
    123                                                   self._GENIMAGES_SCRIPT_NAME))
    124         self._dut_firmware_test_images_dir = \
    125             self._generate_test_firmware_images(gen_script,
    126                                                 self._build_fw_file,
    127                                                 self._dut_working_dir)
    128         logging.info('dut_firmware_test_images_dir: %s',
    129                      self._dut_firmware_test_images_dir)
    130 
    131         self._initialize_test_firmware_image_attrs(
    132             self._dut_firmware_test_images_dir)
    133 
    134         self._initialize_running_fw_version(use_dev_signed_fw)
    135         self._initialize_fw_entropy()
    136 
    137     def cleanup(self):
    138         """Restores original state."""
    139         # Once the tests complete we need to make sure we're running the
    140         # original firmware (not dev version) and potentially reset rollback.
    141         self._initialize_running_fw_version(False)
    142         self._initialize_fw_entropy()
    143         if hasattr(self, '_biod_running') and self._biod_running:
    144             logging.info('Restarting biod')
    145             self.host.upstart_restart(self._BIOD_UPSTART_JOB_NAME)
    146         # TODO(crbug.com/925545)
    147         if hasattr(self, '_timberslide_running') and self._timberslide_running:
    148             logging.info('Restarting timberslide')
    149             self.host.upstart_restart(self._TIMBERSLIDE_UPSTART_JOB_NAME)
    150 
    151         super(FingerprintTest, self).cleanup()
    152 
    153     def after_run_once(self):
    154         """Logs which iteration just ran."""
    155         logging.info('successfully ran iteration %d', self.iteration)
    156 
    157     def _validate_compatible_servo_version(self):
    158         """Asserts if a compatible servo version is not attached."""
    159         servo_version = self.servo.get_servo_version()
    160         logging.info('servo version: %s', servo_version)
    161         if not servo_version.startswith('servo_v4'):
    162             raise error.TestFail(
    163                 'These tests have only been tested while using servo v4')
    164 
    165     def _generate_test_firmware_images(self, gen_script, build_fw_file,
    166                                        dut_working_dir):
    167         """
    168         Copies the fingerprint firmware from the DUT to the server running
    169         the tests, which runs a script to generate various test versions of
    170         the firmware.
    171 
    172         @return full path to location of test images on DUT
    173         """
    174         # create subdirectory under existing tmp dir
    175         server_tmp_dir = os.path.join(self.tmpdir,
    176                                       self._SERVER_GENERATED_FW_DIR_NAME)
    177         os.mkdir(server_tmp_dir)
    178         logging.info('server_tmp_dir: %s', server_tmp_dir)
    179 
    180         # Copy firmware from device to server
    181         self.get_files_from_dut(build_fw_file, server_tmp_dir)
    182 
    183         # Run the test image generation script on server
    184         pushd = os.getcwd()
    185         os.chdir(server_tmp_dir)
    186         cmd = ' '.join([gen_script,
    187                         self.get_fp_board(),
    188                         os.path.basename(build_fw_file)])
    189         self.run_server_cmd(cmd)
    190         os.chdir(pushd)
    191 
    192         # Copy resulting files to DUT tmp dir
    193         server_generated_images_dir = \
    194             os.path.join(server_tmp_dir, self._GENIMAGES_OUTPUT_DIR_NAME)
    195         self.copy_files_to_dut(server_generated_images_dir, dut_working_dir)
    196 
    197         return os.path.join(dut_working_dir, self._GENIMAGES_OUTPUT_DIR_NAME)
    198 
    199     def _initialize_test_firmware_image_attrs(self, dut_fw_test_images_dir):
    200         """Sets attributes with full path to test images on DUT.
    201 
    202         Example: self.TEST_IMAGE_DEV = /some/path/images/nocturne_fp.dev
    203         """
    204         for key, val in self._TEST_IMAGE_FORMAT_MAP.iteritems():
    205             full_path = os.path.join(dut_fw_test_images_dir,
    206                                      val % self.get_fp_board())
    207             setattr(self, key, full_path)
    208 
    209     def _initialize_running_fw_version(self, use_dev_signed_fw):
    210         """
    211         Ensures that the running firmware version matches build version
    212         and factory rollback settings; flashes to correct version if either
    213         fails to match.
    214 
    215         RO firmware: original version released at factory
    216         RW firmware: firmware from current build
    217         """
    218         build_rw_firmware_version = \
    219             self.get_build_rw_firmware_version(use_dev_signed_fw)
    220         golden_ro_firmware_version = \
    221             self.get_golden_ro_firmware_version(use_dev_signed_fw)
    222         logging.info('Build RW firmware version: %s', build_rw_firmware_version)
    223         logging.info('Golden RO firmware version: %s',
    224                      golden_ro_firmware_version)
    225 
    226         fw_versions_match = self.running_fw_version_matches_given_version(
    227             build_rw_firmware_version, golden_ro_firmware_version)
    228 
    229         if not fw_versions_match or not self.is_rollback_set_to_initial_val():
    230             fw_file = self._build_fw_file
    231             if use_dev_signed_fw:
    232                 fw_file = self.TEST_IMAGE_DEV
    233             self.flash_rw_ro_firmware(fw_file)
    234             if not self.running_fw_version_matches_given_version(
    235                 build_rw_firmware_version, golden_ro_firmware_version):
    236                 raise error.TestFail(
    237                     'Running firmware version does not match expected version')
    238 
    239     def _initialize_fw_entropy(self):
    240         """Sets the entropy (key) in FPMCU flash (if not set)."""
    241         result = self.run_cmd(self._INIT_ENTROPY_CMD)
    242         if result.exit_status != 0:
    243             raise error.TestFail('Unable to initialize entropy')
    244 
    245     def get_fp_board(self):
    246         """Returns name of fingerprint EC."""
    247         board = self.host.get_board().replace(ds_constants.BOARD_PREFIX, '')
    248         return board + self._FINGERPRINT_BOARD_NAME_SUFFIX
    249 
    250     def get_build_fw_file(self):
    251         """Returns full path to build FW file on DUT."""
    252         ls_cmd = 'ls ' + self._FINGERPRINT_BUILD_FW_GLOB
    253         result = self.run_cmd(ls_cmd)
    254         if result.exit_status != 0:
    255             raise error.TestFail('Unable to find firmware from build on device')
    256         ret = result.stdout.rstrip()
    257         logging.info('Build firmware file: %s', ret)
    258         return ret
    259 
    260     def _get_running_firmware_version(self, fw_type):
    261         """Returns requested firmware version (RW or RO)."""
    262         result = self._run_ectool_cmd('version')
    263         parsed = self._parse_ectool_output(result.stdout)
    264         if result.exit_status != 0:
    265             raise error.TestFail('Failed to get firmware version')
    266         version = parsed.get(fw_type)
    267         if version is None:
    268             raise error.TestFail('Failed to get firmware version: %s' % fw_type)
    269         return version
    270 
    271     def get_running_rw_firmware_version(self):
    272         """Returns running RW firmware version."""
    273         return self._get_running_firmware_version(self._ECTOOL_RW_VERSION)
    274 
    275     def get_running_ro_firmware_version(self):
    276         """Returns running RO firmware version."""
    277         return self._get_running_firmware_version(self._ECTOOL_RO_VERSION)
    278 
    279     def _get_rollback_info(self, info_type):
    280         """Returns requested type of rollback info."""
    281         result = self._run_ectool_cmd('rollbackinfo')
    282         parsed = self._parse_ectool_output(result.stdout)
    283         # TODO(crbug.com/924283): rollbackinfo always returns an error
    284         # if result.exit_status != 0:
    285         #    raise error.TestFail('Failed to get rollback info')
    286         info = parsed.get(info_type)
    287         if info is None:
    288             raise error.TestFail('Failed to get rollback info: %s' % info_type)
    289         return info
    290 
    291     def get_rollback_id(self):
    292         """Returns rollback ID."""
    293         return self._get_rollback_info(self._ECTOOL_ROLLBACK_BLOCK_ID)
    294 
    295     def get_rollback_min_version(self):
    296         """Returns rollback min version."""
    297         return self._get_rollback_info(self._ECTOOL_ROLLBACK_MIN_VERSION)
    298 
    299     def get_rollback_rw_version(self):
    300         """Returns RW rollback version."""
    301         return self._get_rollback_info(self._ECTOOL_ROLLBACK_RW_VERSION)
    302 
    303     def _construct_dev_version(self, orig_version):
    304         """
    305         Given a "regular" version string from a signed build, returns the
    306         special "dev" version that we use when creating the test images.
    307         """
    308         fw_version = orig_version
    309         if len(fw_version) + len('.dev') > 31:
    310             fw_version = fw_version[:27]
    311         fw_version = fw_version + '.dev'
    312         return fw_version
    313 
    314     def get_golden_ro_firmware_version(self, use_dev_signed_fw):
    315         """Returns RO firmware version used in factory."""
    316         board = self.get_fp_board()
    317         golden_version = self._GOLDEN_RO_FIRMWARE_VERSION_MAP.get(board)
    318         if golden_version is None:
    319             raise error.TestFail('Unable to get golden RO version for board: '
    320                                  % board)
    321         if use_dev_signed_fw:
    322             golden_version = self._construct_dev_version(golden_version)
    323         return golden_version
    324 
    325     def get_build_rw_firmware_version(self, use_dev_signed_fw):
    326         """Returns RW firmware version from build (based on filename)."""
    327         fw_file = os.path.basename(self._build_fw_file)
    328         if not fw_file.endswith('.bin'):
    329             raise error.TestFail('Unexpected filename for RW firmware: '
    330                                  % fw_file)
    331         fw_version = fw_file[:-4]
    332         if use_dev_signed_fw:
    333             fw_version = self._construct_dev_version(fw_version)
    334         return fw_version
    335 
    336     def running_fw_version_matches_given_version(self, rw_version, ro_version):
    337         """
    338         Returns True if the running RO and RW firmware versions match the
    339         provided versions.
    340         """
    341         running_rw_firmware_version = self.get_running_rw_firmware_version()
    342         running_ro_firmware_version = self.get_running_ro_firmware_version()
    343 
    344         logging.info('RW firmware, running: %s, expected: %s',
    345                      running_rw_firmware_version, rw_version)
    346         logging.info('RO firmware, running: %s, expected: %s',
    347                      running_ro_firmware_version, ro_version)
    348 
    349         return (running_rw_firmware_version == rw_version and
    350                 running_ro_firmware_version == ro_version)
    351 
    352     def is_rollback_set_to_initial_val(self):
    353         """
    354         Returns True if rollbackinfo matches the initial value that it
    355         should have coming from the factory.
    356         """
    357         return (self.get_rollback_id() ==
    358                 self._ROLLBACK_INITIAL_BLOCK_ID
    359                 and
    360                 self.get_rollback_min_version() ==
    361                 self._ROLLBACK_INITIAL_MIN_VERSION
    362                 and
    363                 self.get_rollback_rw_version() ==
    364                 self._ROLLBACK_INITIAL_RW_VERSION)
    365 
    366     def _download_firmware(self, gs_path, dut_file_path):
    367         """Downloads firmware from Google Storage bucket."""
    368         bucket = os.path.dirname(gs_path)
    369         filename = os.path.basename(gs_path)
    370         logging.info('Downloading firmware, '
    371                      'bucket: %s, filename: %s, dest: %s',
    372                      bucket, filename, dut_file_path)
    373         gsutil_wrapper.copy_private_bucket(host=self.host,
    374                                            bucket=bucket,
    375                                            filename=filename,
    376                                            destination=dut_file_path)
    377         return os.path.join(dut_file_path, filename)
    378 
    379     def flash_rw_firmware(self, fw_path):
    380         """Flashes the RW (read-write) firmware."""
    381         flash_cmd = os.path.join(self._dut_working_dir,
    382                                  'flash_fp_rw.sh' + ' ' + fw_path)
    383         result = self.run_cmd(flash_cmd)
    384         if result.exit_status != 0:
    385             raise error.TestFail('Flashing RW firmware failed')
    386 
    387     def flash_rw_ro_firmware(self, fw_path):
    388         """Flashes *all* firmware (both RO and RW)."""
    389         self.set_hardware_write_protect(False)
    390         flash_cmd = 'flash_fp_mcu' + ' ' + fw_path
    391         logging.info('Running flash cmd: %s', flash_cmd)
    392         result = self.run_cmd(flash_cmd)
    393         self.set_hardware_write_protect(True)
    394         if result.exit_status != 0:
    395             raise error.TestFail('Flashing RW/RO firmware failed')
    396 
    397     def is_hardware_write_protect_enabled(self):
    398         """Returns state of hardware write protect."""
    399         fw_wp_state = self.servo.get('fw_wp_state')
    400         return fw_wp_state == 'on' or fw_wp_state == 'force_on'
    401 
    402     def set_hardware_write_protect(self, enable):
    403         """Enables or disables hardware write protect."""
    404         self.servo.set('fw_wp_state', 'force_on' if enable else 'force_off')
    405 
    406     def get_files_from_dut(self, src, dst):
    407         """Copes files from DUT to server."""
    408         logging.info('Copying files from (%s) to (%s).', src, dst)
    409         self.host.get_file(src, dst, delete_dest=True)
    410 
    411     def copy_files_to_dut(self, src_dir, dst_dir):
    412         """Copies files from server to DUT."""
    413         logging.info('Copying files from (%s) to (%s).', src_dir, dst_dir)
    414         self.host.send_file(src_dir, dst_dir, delete_dest=True)
    415 
    416     def run_server_cmd(self, command, timeout=60):
    417         """Runs command on server; return result with output and exit code."""
    418         logging.info('Server execute: %s', command)
    419         result = utils.run(command, timeout=timeout, ignore_status=True)
    420         logging.info('exit_code: %d', result.exit_status)
    421         logging.info('stdout:\n%s', result.stdout)
    422         logging.info('stderr:\n%s', result.stderr)
    423         return result
    424 
    425     def run_cmd(self, command, timeout=300):
    426         """Runs command on the DUT; return result with output and exit code."""
    427         logging.debug('DUT Execute: %s', command)
    428         result = self.host.run(command, timeout=timeout, ignore_status=True)
    429         logging.info('exit_code: %d', result.exit_status)
    430         logging.info('stdout:\n%s', result.stdout)
    431         logging.info('stderr:\n%s', result.stderr)
    432         return result
    433 
    434     def _run_ectool_cmd(self, command):
    435         """Runs ectool on DUT; return result with output and exit code."""
    436         cmd = 'ectool ' + self._CROS_FP_ARG + ' ' + command
    437         result = self.run_cmd(cmd)
    438         return result
    439 
    440     def run_test(self, test_name, *args):
    441         """Runs test on DUT."""
    442         logging.info('Running %s', test_name)
    443         # Redirecting stderr to stdout since some commands intentionally fail
    444         # and it's easier to read when everything ordered in the same output
    445         test_cmd = ' '.join([os.path.join(self._dut_working_dir, test_name)] +
    446                             list(args) + ['2>&1'])
    447         logging.info('Test command: %s', test_cmd)
    448         result = self.run_cmd(test_cmd)
    449         if result.exit_status != 0:
    450             raise error.TestFail(test_name + ' failed')
    451