Home | History | Annotate | Download | only in faft
      1 # Copyright 2017 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging
      6 import os
      7 import pprint
      8 import StringIO
      9 import subprocess
     10 import time
     11 
     12 from autotest_lib.client.bin import utils
     13 from autotest_lib.client.common_lib import error, utils
     14 from autotest_lib.client.common_lib.cros import cr50_utils, tpm_utils
     15 from autotest_lib.server.cros import debugd_dev_tools, gsutil_wrapper
     16 from autotest_lib.server.cros.faft.firmware_test import FirmwareTest
     17 
     18 
     19 class Cr50Test(FirmwareTest):
     20     """Base class that sets up helper objects/functions for cr50 tests."""
     21     version = 1
     22 
     23     CR50_GS_URL = 'gs://chromeos-localmirror-private/distfiles/chromeos-cr50-%s/'
     24     CR50_DEBUG_FILE = '*/cr50_dbg_%s.bin'
     25     CR50_PROD_FILE = 'cr50.%s.bin.prod'
     26     NONE = 0
     27     # Saved the original device state during init.
     28     INITIAL_STATE = 1 << 0
     29     # Saved the original image, the device image, and the debug image. These
     30     # images are needed to be able to restore the original image and board id.
     31     IMAGES = 1 << 1
     32     PP_SHORT_INTERVAL = 3
     33     CLEARED_FWMP_EXIT_STATUS = 1
     34     CLEARED_FWMP_ERROR_MSG = ('CRYPTOHOME_ERROR_FIRMWARE_MANAGEMENT_PARAMETERS'
     35                               '_INVALID')
     36     # Cr50 may have flash operation errors during the test. Here's an example
     37     # of one error message.
     38     # do_flash_op:245 errors 20 fsh_pe_control 40720004
     39     # The stuff after the ':' may change, but all flash operation errors
     40     # contain do_flash_op. do_flash_op is only ever printed if there is an
     41     # error during the flash operation. Just search for do_flash_op to simplify
     42     # the search string and make it applicable to all flash op errors.
     43     CR50_FLASH_OP_ERROR_MSG = 'do_flash_op'
     44 
     45     def initialize(self, host, cmdline_args, full_args,
     46             restore_cr50_state=False, cr50_dev_path='', provision_update=False):
     47         self._saved_state = self.NONE
     48         self._raise_error_on_mismatch = not restore_cr50_state
     49         self._provision_update = provision_update
     50         super(Cr50Test, self).initialize(host, cmdline_args)
     51 
     52         if not hasattr(self, 'cr50'):
     53             raise error.TestNAError('Test can only be run on devices with '
     54                                     'access to the Cr50 console')
     55 
     56         logging.info('Test Args: %r', full_args)
     57 
     58         self.can_set_ccd_level = (not self.cr50.using_ccd() or
     59             self.cr50.testlab_is_on())
     60         self.original_ccd_level = self.cr50.get_ccd_level()
     61         self.original_ccd_settings = self.cr50.get_cap_dict(
     62                 info=self.cr50.CAP_SETTING)
     63 
     64         self.host = host
     65         tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True)
     66         # Clear the FWMP, so it can't disable CCD.
     67         self.clear_fwmp()
     68 
     69         if self.can_set_ccd_level:
     70             # Lock cr50 so the console will be restricted
     71             self.cr50.set_ccd_level('lock')
     72         elif self.original_ccd_level != 'lock':
     73             raise error.TestNAError('Lock the console before running cr50 test')
     74 
     75         self._save_original_state()
     76 
     77         # Verify cr50 is still running the correct version
     78         cr50_qual_version = full_args.get('cr50_qual_version', '').strip()
     79         if cr50_qual_version:
     80             _, running_rw, running_bid = self.get_saved_cr50_original_version()
     81             expected_rw, expected_bid_sym = cr50_qual_version.split('/')
     82             expected_bid = cr50_utils.GetBoardIdInfoString(expected_bid_sym,
     83                                                            symbolic=False)
     84             logging.debug('Running %s %s Expect %s %s', running_rw, running_bid,
     85                           expected_rw, expected_bid)
     86             if running_rw != expected_rw or expected_bid != running_bid:
     87                 raise error.TestError('Not running %s' % cr50_qual_version)
     88 
     89         # We successfully saved the device state
     90         self._saved_state |= self.INITIAL_STATE
     91         try:
     92             self._save_node_locked_dev_image(cr50_dev_path)
     93             self._save_original_images(full_args.get('release_path', ''))
     94             # We successfully saved the device images
     95             self._saved_state |= self.IMAGES
     96         except:
     97             if restore_cr50_state:
     98                 raise
     99 
    100 
    101     def after_run_once(self):
    102         """Log which iteration just ran"""
    103         logging.info('successfully ran iteration %d', self.iteration)
    104 
    105 
    106     def _save_node_locked_dev_image(self, cr50_dev_path):
    107         """Save or download the node locked dev image.
    108 
    109         Args:
    110             cr50_dev_path: The path to the node locked cr50 image.
    111         """
    112         if os.path.isfile(cr50_dev_path):
    113             self._node_locked_cr50_image = cr50_dev_path
    114         else:
    115             devid = self.servo.get('cr50_devid')
    116             self._node_locked_cr50_image = self.download_cr50_debug_image(
    117                 devid)[0]
    118 
    119 
    def _save_original_images(self, release_path):
        """Collect local copies of the images needed to restore the device.

        The prod image (and the prepvt image if the DUT has one) is copied
        from the DUT into the results directory. The original running image
        is taken from release_path if it is a file, from one of the copied
        device images if its version and board id match the running image,
        or downloaded otherwise.

        Args:
            release_path: The release path given by test args
        """
        # Copy the prod image from the DUT
        _, prod_rw, prod_bid = self._original_state['device_prod_ver']
        self._device_prod_image = os.path.join(self.resultsdir,
                                               'prod_device_image_' + prod_rw)
        self.host.get_file(cr50_utils.CR50_PROD, self._device_prod_image)

        # Copy the prepvt image from the DUT if there is one
        if not cr50_utils.HasPrepvtImage(self.host):
            self._device_prepvt_image = None
            prepvt_rw = None
            prepvt_bid = None
        else:
            _, prepvt_rw, prepvt_bid = self._original_state['device_prepvt_ver']
            self._device_prepvt_image = os.path.join(
                    self.resultsdir, 'prepvt_device_image_' + prepvt_rw)
            self.host.get_file(cr50_utils.CR50_PREPVT,
                               self._device_prepvt_image)
            prepvt_bid = cr50_utils.GetBoardIdInfoString(prepvt_bid)

        # An image supplied through the test args always wins.
        if os.path.isfile(release_path):
            self._original_cr50_image = release_path
            logging.info('using supplied image')
            return

        _, running_rw, running_bid = self.get_saved_cr50_original_version()

        # Convert the board ids to the same format before comparing them
        prod_bid = cr50_utils.GetBoardIdInfoString(prod_bid)
        running_bid = cr50_utils.GetBoardIdInfoString(running_bid)

        matches_prod = running_rw == prod_rw and running_bid == prod_bid
        matches_prepvt = (running_rw == prepvt_rw and
                          running_bid == prepvt_bid)

        # Prefer an image that is already on the DUT. Only download the image
        # from google storage if neither device image is the running image.
        if matches_prod:
            logging.info('Using device cr50 prod image %s %s', prod_rw,
                         prod_bid)
            self._original_cr50_image = self._device_prod_image
        elif matches_prepvt:
            logging.info('Using device cr50 prepvt image %s %s', prepvt_rw,
                         prepvt_bid)
            self._original_cr50_image = self._device_prepvt_image
        else:
            logging.info('Downloading cr50 image %s %s', running_rw,
                         running_bid)
            downloaded = self.download_cr50_release_image(running_rw,
                                                          running_bid)
            self._original_cr50_image = downloaded[0]
    174 
    175 
    176     def _save_original_state(self):
    177         """Save the cr50 related state.
    178 
    179         Save the device's current cr50 version, cr50 board id, rlz, and image
    180         at /opt/google/cr50/firmware/cr50.bin.prod. These will be used to
    181         restore the state during cleanup.
    182         """
    183         self._original_state = self.get_cr50_device_state()
    184 
    185 
    186     def get_saved_cr50_original_version(self):
    187         """Return (ro ver, rw ver, bid)."""
    188         if ('running_ver' not in self._original_state or 'cr50_image_bid' not in
    189             self._original_state):
    190             raise error.TestError('No record of original cr50 image version')
    191         return (self._original_state['running_ver'][0],
    192                 self._original_state['running_ver'][1],
    193                 self._original_state['cr50_image_bid'])
    194 
    195 
    196     def get_saved_cr50_original_path(self):
    197         """Return the local path for the original cr50 image."""
    198         if not hasattr(self, '_original_cr50_image'):
    199             raise error.TestError('No record of original image')
    200         return self._original_cr50_image
    201 
    202 
    203     def has_saved_cr50_dev_path(self):
    204         """Returns true if we saved the node locked debug image."""
    205         return hasattr(self, '_node_locked_cr50_image')
    206 
    207 
    208     def get_saved_cr50_dev_path(self):
    209         """Return the local path for the cr50 dev image."""
    210         if not self.has_saved_cr50_dev_path():
    211             raise error.TestError('No record of debug image')
    212         return self._node_locked_cr50_image
    213 
    214 
    215     def _restore_original_image(self, chip_bid, chip_flags):
    216         """Restore the cr50 image and erase the state.
    217 
    218         Make 3 attempts to update to the original image. Use a rollback from
    219         the DBG image to erase the state that can only be erased by a DBG image.
    220         Set the chip board id during rollback
    221 
    222         Args:
    223             chip_bid: the integer representation of chip board id or None if the
    224                       board id should be erased
    225             chip_flags: the integer representation of chip board id flags or
    226                         None if the board id should be erased
    227         """
    228         for i in range(3):
    229             try:
    230                 # Update to the node-locked DBG image so we can erase all of
    231                 # the state we are trying to reset
    232                 self.cr50_update(self._node_locked_cr50_image)
    233 
    234                 # Rollback to the original cr50 image.
    235                 self.cr50_update(self._original_cr50_image, rollback=True,
    236                                  chip_bid=chip_bid, chip_flags=chip_flags)
    237                 break
    238             except Exception, e:
    239                 logging.warning('Failed to restore original image attempt %d: '
    240                                 '%r', i, e)
    241 
    242 
    243     def rootfs_verification_disable(self):
    244         """Remove rootfs verification."""
    245         if not self._rootfs_verification_is_disabled():
    246             logging.debug('Removing rootfs verification.')
    247             self.rootfs_tool.enable()
    248 
    249 
    def _rootfs_verification_is_disabled(self):
        """Returns true if rootfs verification is disabled.

        Clears the TPM owner, then creates and initializes a new
        RootfsVerificationTool, which is kept in self.rootfs_tool.
        """
        # The TPM owner has to be cleared before rootfs verification can be
        # checked.
        tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True)
        tool = self.rootfs_tool = debugd_dev_tools.RootfsVerificationTool()
        tool.initialize(self.host)
        # The tool being enabled means rootfs verification is disabled.
        verification_disabled = tool.is_enabled()
        return verification_disabled
    259 
    260 
    def _restore_original_state(self):
        """Put the cr50 images, board id, and RLZ back to the saved state.

        Does nothing but log a warning if the original images were not saved.

        Raises:
            TestError if the state still does not match after the restore
        """
        images_saved = self._saved_state & self.IMAGES
        if not images_saved:
            logging.warning('Did not save the original images. Cannot restore '
                            'state')
            return

        saved_state = self._original_state

        # The DUT did not start with a prepvt image, so remove it if the test
        # installed one.
        test_added_prepvt = (not saved_state['has_prepvt'] and
                             cr50_utils.HasPrepvtImage(self.host))
        if test_added_prepvt:
            self.host.run('rm %s' % cr50_utils.CR50_PREPVT)

        # The device images can only be copied back onto the DUT if rootfs
        # verification has been disabled.
        if self._rootfs_verification_is_disabled():
            cr50_utils.InstallImage(self.host, self._device_prod_image,
                                    cr50_utils.CR50_PROD)
            # Put the prepvt image back too if there was one.
            if self._device_prepvt_image:
                cr50_utils.InstallImage(self.host, self._device_prepvt_image,
                                        cr50_utils.CR50_PREPVT)

        # An erased board id is restored by passing None for both values.
        chip_bid_info = saved_state['chip_bid']
        if chip_bid_info == cr50_utils.ERASED_CHIP_BID:
            chip_bid = None
            chip_flags = None
        else:
            chip_bid = chip_bid_info[0]
            chip_flags = chip_bid_info[2]
        self._restore_original_image(chip_bid, chip_flags)

        # Set the RLZ code
        cr50_utils.SetRLZ(self.host, saved_state['rlz'])

        # Make sure the restore actually worked
        mismatch = self._check_original_state()
        if mismatch:
            raise error.TestError('Could not restore state: %s' % mismatch)

        logging.info('Successfully restored the original cr50 state')
    297 
    298 
    def get_cr50_device_state(self):
        """Collect the current cr50 information from the device.

        Returns:
            A dict with the mosys platform brand, the device prod and prepvt
            image versions, whether there is a prepvt image, the rlz code, the
            chip board id (as a tuple and as a string), the running cr50
            version, and the running image board id.
        """
        host = self.host

        brand = host.run('mosys platform brand',
                         ignore_status=True).stdout.strip()
        prod_ver = cr50_utils.GetBinVersion(host, cr50_utils.CR50_PROD)

        # Only look up the prepvt version if the DUT has a prepvt image.
        has_prepvt = cr50_utils.HasPrepvtImage(host)
        prepvt_ver = None
        if has_prepvt:
            prepvt_ver = cr50_utils.GetBinVersion(host,
                                                  cr50_utils.CR50_PREPVT)

        rlz = cr50_utils.GetRLZ(host)
        chip_bid = cr50_utils.GetChipBoardId(host)
        chip_bid_str = '%08x:%08x:%08x' % chip_bid
        running_ver = cr50_utils.GetRunningVersion(host)
        image_bid = self.cr50.get_active_board_id_str()

        state = {
            'mosys platform brand': brand,
            'device_prod_ver': prod_ver,
            'has_prepvt': has_prepvt,
            'device_prepvt_ver': prepvt_ver,
            'rlz': rlz,
            'chip_bid': chip_bid,
            'chip_bid_str': chip_bid_str,
            'running_ver': running_ver,
            'cr50_image_bid': image_bid,
        }

        logging.debug('Current Cr50 state:\n%s', pprint.pformat(state))
        return state
    325 
    326 
    327     def _check_original_state(self):
    328         """Compare the current cr50 state to the original state.
    329 
    330         Returns:
    331             A dictionary with the state that is wrong as the key and
    332             the new and old state as the value
    333         """
    334         if not (self._saved_state & self.INITIAL_STATE):
    335             logging.warning('Did not save the original state. Cannot verify it '
    336                             'matches')
    337             return
    338         # Make sure the /var/cache/cr50* state is up to date.
    339         cr50_utils.ClearUpdateStateAndReboot(self.host)
    340 
    341         mismatch = {}
    342         new_state = self.get_cr50_device_state()
    343 
    344         for k, new_val in new_state.iteritems():
    345             original_val = self._original_state[k]
    346             if new_val != original_val:
    347                 mismatch[k] = 'old: %s, new: %s' % (original_val, new_val)
    348 
    349         if mismatch:
    350             logging.warning('State Mismatch:\n%s', pprint.pformat(mismatch))
    351         else:
    352             logging.info('The device is in the original state')
    353         return mismatch
    354 
    355 
    def _reset_ccd_settings(self):
        """Put the ccd password, capabilities, and level back to the original.

        Raises:
            TestFail if a ccd password could not be cleared
            TestError if the capabilities changed and the test is not able to
                    set the ccd level
        """
        # A ccd reset with testlab open should clear any password.
        password_is_set = self.cr50.get_ccd_info()['Password'] != 'none'
        if password_is_set:
            self.servo.set_nocheck('cr50_testlab', 'open')
            self.cr50.send_command('ccd reset')
            password_after_reset = self.cr50.get_ccd_info()['Password']
            if password_after_reset != 'none':
                raise error.TestFail('Could not clear password')

        # Restore the capabilities if the test changed them.
        current_settings = self.cr50.get_cap_dict(info=self.cr50.CAP_SETTING)
        settings_changed = self.original_ccd_settings != current_settings
        if settings_changed:
            if not self.can_set_ccd_level:
                raise error.TestError("CCD state has changed, but we can't "
                        "restore it")
            self.fast_open(True)
            self.cr50.set_caps(self.original_ccd_settings)

        # Try testlab open first when the device should end up open. Then set
        # the level directly if it still is not the original level.
        testlab_enabled = self.cr50.testlab_is_on()
        if testlab_enabled and self.original_ccd_level == 'open':
            self.servo.set_nocheck('cr50_testlab', 'open')
        level_differs = self.original_ccd_level != self.cr50.get_ccd_level()
        if level_differs:
            self.cr50.set_ccd_level(self.original_ccd_level)
    378 
    379 
    380 
    def cleanup(self):
        """Attempt to cleanup the cr50 state. Then run firmware cleanup

        The parent class cleanup runs even if restoring the cr50 state raises
        an error. The cr50 uart output is only checked when the cr50 state
        restore and the parent cleanup both finish without raising.
        """
        try:
            self._restore_cr50_state()
        finally:
            # Always run the firmware test cleanup, even if restoring the
            # cr50 state failed.
            super(Cr50Test, self).cleanup()

        # Check the logs captured during firmware_test cleanup for cr50 errors.
        self._get_cr50_stats_from_uart_capture()
    390 
    391 
    392     def _get_cr50_stats_from_uart_capture(self):
    393         """Check cr50 uart output for errors.
    394 
    395         Use the logs captured during firmware_test cleanup to check for cr50
    396         errors. Flash operation issues aren't obvious unless you check the logs.
    397         All flash op errors print do_flash_op and it isn't printed during normal
    398         operation. Open the cr50 uart file and count the number of times this is
    399         printed. Log the number of errors.
    400         """
    401         if not hasattr(self, 'cr50_uart_file'):
    402             logging.info('There is not a cr50 uart file')
    403             return
    404 
    405         flash_error_count = 0
    406         with open(self.cr50_uart_file, 'r') as f:
    407             for line in f:
    408                 if self.CR50_FLASH_OP_ERROR_MSG in line:
    409                     flash_error_count += 1
    410 
    411         # Log any flash operation errors.
    412         logging.info('do_flash_op count: %d', flash_error_count)
    413 
    414 
    def _restore_cr50_state(self):
        """Restore cr50 state, so the device can be used for further testing

        If the device state does not match the original state and the test
        was not run with provision_update, the original state is restored.
        After that ccd is reset, cr50 is rebooted if the console is
        accessible, the device is put in normal mode, the TPM owner and FWMP
        are cleared, and the original ccd settings are restored.

        Raises:
            TestError if the state changed and the test was not expected to
                    change it
        """
        state_mismatch = self._check_original_state()
        if state_mismatch and not self._provision_update:
            self._restore_original_state()
            if self._raise_error_on_mismatch:
                raise error.TestError('Unexpected state mismatch during '
                                      'cleanup %s' % state_mismatch)

        # Try to open cr50 and enable testlab mode if it isn't enabled.
        try:
            self.fast_open(True)
        except Exception:
            # Even if we can't open cr50, do our best to reset the rest of the
            # system state. Log a warning here. Only real errors are ignored.
            # KeyboardInterrupt and SystemExit still stop the cleanup.
            logging.warning('Unable to Open cr50', exc_info=True)
        # Reset the password before the rest of the ccd cleanup. It is
        # important that if some later part of cleanup fails, the password has
        # at least been reset.
        self.cr50.send_command('rddkeepalive disable')
        self.cr50.send_command('ccd reset')
        self.cr50.send_command('wp follow_batt_pres atboot')

        # Reboot cr50 if the console is accessible. This will reset most state.
        if self.cr50.get_cap('GscFullConsole')[self.cr50.CAP_IS_ACCESSIBLE]:
            self.cr50.reboot()

        # Reenable servo v4 CCD
        self.cr50.ccd_enable()

        # reboot to normal mode if the device is in dev mode.
        self.enter_mode_after_checking_tpm_state('normal')

        tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True)
        self.clear_fwmp()

        # Restore the ccd privilege level
        if hasattr(self, 'original_ccd_level'):
            self._reset_ccd_settings()
    454 
    455 
    def find_cr50_gs_image(self, filename, image_type=None):
        """Look up the google storage location of a cr50 image.

        Args:
            filename: the cr50 filename to match to
            image_type: release or debug. If it is not specified we will search
                        both the release and debug directories
        Returns:
            a tuple of the gsutil bucket, filename
        """
        # A wildcard searches every image type directory.
        type_dir = image_type or '*'
        search_path = os.path.join(self.CR50_GS_URL % type_dir, filename)
        # Use the first match. Everything before the last '/' is the bucket.
        first_match = utils.gs_ls(search_path)[0]
        bucket, gs_filename = first_match.rsplit('/', 1)
        return bucket, gs_filename
    470 
    471 
    def download_cr50_gs_image(self, filename, image_bid='', bucket=None,
                               image_type=None):
        """Download a cr50 image from gs and copy it to the results dir.

        The image is copied from google storage to /tmp/ on the DUT and then
        from the DUT to the results directory.

        Args:
            filename: The cr50 image basename
            image_bid: the board id info list or string. It will be added to the
                       filename.
            bucket: The gs bucket name
            image_type: 'debug' or 'release'. This will be used to determine
                        the bucket if the bucket is not given.
        Returns:
            A tuple with the local path and version
        Raises:
            TestError if image_bid was given and the downloaded image has a
                    different board id
        """
        # Images built for a board id have the board id in their name.
        if image_bid:
            bid_str = cr50_utils.GetBoardIdInfoString(image_bid,
                                                       symbolic=True)
            filename = '%s.%s' % (filename, bid_str.replace(':', '_'))

        # Search google storage for the image if the caller did not say where
        # it is.
        if not bucket:
            bucket, filename = self.find_cr50_gs_image(filename, image_type)

        remote_temp_dir = '/tmp/'
        dut_path = os.path.join(remote_temp_dir, filename)
        local_path = os.path.join(self.resultsdir, filename)

        # Download the image to the DUT, then copy it to the results dir.
        gsutil_wrapper.copy_private_bucket(host=self.host,
                                           bucket=bucket,
                                           filename=filename,
                                           destination=remote_temp_dir)
        self.host.get_file(dut_path, local_path)
        ver = cr50_utils.GetBinVersion(self.host, dut_path)

        # Make sure the downloaded image has the board id that was requested.
        downloaded_bid = cr50_utils.GetBoardIdInfoString(ver[2], symbolic=True)
        if image_bid:
            if bid_str != downloaded_bid:
                raise error.TestError('Could not download image with matching '
                                      'board id wanted %s got %s' % (bid_str,
                                      downloaded_bid))
        return local_path, ver
    516 
    517 
    def download_cr50_debug_image(self, devid, image_bid=''):
        """Download the cr50 debug image for the given devid.

        Get the file with the matching devid and image board id info

        Args:
            devid: the cr50_devid string '${DEVID0} ${DEVID1}'
            image_bid: the image board id info string or list
        Returns:
            A tuple with the debug image local path and version
        """
        # Debug images are node locked, so the filename contains the devid
        # with the space replaced by an underscore.
        node_locked_name = self.CR50_DEBUG_FILE % devid.replace(' ', '_')

        local_path, version = self.download_cr50_gs_image(
                node_locked_name, image_bid=image_bid, image_type='debug')
        return local_path, version
    538 
    539 
    def download_cr50_release_image(self, image_rw, image_bid=''):
        """Download the cr50 release image with the given rw version.

        Get the file with the matching version and image board id info

        Args:
            image_rw: the rw version string
            image_bid: the image board id info string or list
        Returns:
            A tuple with the release image local path and version
        Raises:
            TestError if the downloaded image has a different rw version
        """
        # Release image names contain the rw version
        release_name = self.CR50_PROD_FILE % image_rw

        local_path, version = self.download_cr50_gs_image(
                release_name, image_bid=image_bid, image_type='release')

        # Check the rw version of the file that was actually downloaded.
        downloaded_rw = version[1]
        if image_rw != downloaded_rw:
            raise error.TestError('Could not download image with matching '
                                  'rw version')
        return local_path, version
    564 
    565 
    def _cr50_verify_update(self, expected_rw, expect_rollback):
        """Verify the expected version is running on cr50.

        Args:
            expected_rw: The RW version string we expect to be running
            expect_rollback: True if cr50 should have rolled back during the
                             update

        Raises:
            TestFail if there is any unexpected update state
        """
        errors = []
        running_rw = self.cr50.get_version()
        if expected_rw != running_rw:
            errors.append('running %s not %s' % (running_rw, expected_rw))

        if expect_rollback != self.cr50.rolledback():
            # The conditional has to be in parentheses. '%' binds tighter than
            # 'if/else', so without them an unexpected rollback would add an
            # empty string instead of 'rollback detected'.
            errors.append('%srollback detected' %
                          ('no ' if expect_rollback else ''))
        if errors:
            raise error.TestFail('cr50_update failed: %s' % ', '.join(errors))
        logging.info('RUNNING %s after %s', expected_rw,
                     'rollback' if expect_rollback else 'update')
    589 
    590 
    def _cr50_run_update(self, path):
        """Copy the image at path to the DUT and update cr50 with it.

        Args:
            path: the location of the image to update to

        Returns:
            the rw version of the image
        """
        # The image is staged in /tmp/ on the DUT under its own name.
        dut_tmp_path = '/tmp/%s' % os.path.basename(path)

        installed_path, image_ver = cr50_utils.InstallImage(self.host, path,
                                                            dut_tmp_path)
        cr50_utils.GSCTool(self.host, ['-a', installed_path])
        image_rw = image_ver[1]
        return image_rw
    605 
    606 
    607     def cr50_update(self, path, rollback=False, erase_nvmem=False,
    608                     expect_rollback=False, chip_bid=None, chip_flags=None):
    609         """Attempt to update to the given image.
    610 
    611         If rollback is True, we assume that cr50 is already running an image
    612         that can rollback.
    613 
    614         Args:
    615             path: the location of the update image
    616             rollback: True if we need to force cr50 to rollback to update to
    617                       the given image
    618             erase_nvmem: True if we need to erase nvmem during rollback
    619             expect_rollback: True if cr50 should rollback on its own
    620             chip_bid: the integer representation of chip board id or None if the
    621                       board id should be erased during rollback
    622             chip_flags: the integer representation of chip board id flags or
    623                         None if the board id should be erased during rollback
    624 
    625         Raises:
    626             TestFail if the update failed
    627         """
    628         original_rw = self.cr50.get_version()
    629 
    630         # Cr50 is going to reject an update if it hasn't been up for more than
    631         # 60 seconds. Wait until that passes before trying to run the update.
    632         self.cr50.wait_until_update_is_allowed()
    633 
    634         image_rw = self._cr50_run_update(path)
    635 
    636         # Running the update may cause cr50 to reboot. Wait for that before
    637         # sending more commands. The reboot should happen quickly. Wait a
    638         # maximum of 10 seconds.
    639         self.cr50.wait_for_reboot(timeout=10)
    640 
    641         if erase_nvmem and rollback:
    642             self.cr50.erase_nvmem()
    643 
    644         if rollback:
    645             self.cr50.rollback(chip_bid=chip_bid, chip_flags=chip_flags)
    646 
    647         expected_rw = original_rw if expect_rollback else image_rw
    648         # If we expect a rollback, the version should remain unchanged
    649         self._cr50_verify_update(expected_rw, rollback or expect_rollback)
    650 
    651 
    def ccd_open_from_ap(self):
        """Open ccd from the AP, pressing the power button when prompted.

        Runs 'gsctool -a -o' on the DUT over ssh as a background job, then
        uses servo to press the power button: repeated presses at
        PP_SHORT_INTERVAL for the first 30 seconds, followed by polling
        presses through _check_open_and_press_power_button. The background
        job is always cleaned up by _close_ccd_open_job.

        Raises:
            TestFail if the background job could not be started, or (from
                _close_ccd_open_job) if gsctool printed an error or cr50 is
                not open at the end.
        """
        # Offset into the stdout buffer of the output that was already logged.
        self._ccd_open_last_len = 0
        self._ccd_open_stdout = StringIO.StringIO()

        ccd_open_cmd = utils.sh_escape('gsctool -a -o')
        full_ssh_cmd = '%s "%s"' % (self.host.ssh_command(options='-tt'),
                                    ccd_open_cmd)
        # Start running the Cr50 Open process in the background.
        self._ccd_open_job = utils.BgJob(full_ssh_cmd,
                                         nickname='ccd_open',
                                         stdout_tee=self._ccd_open_stdout,
                                         stderr_tee=utils.TEE_TO_LOGS)

        if self._ccd_open_job is None:
            raise error.TestFail('could not start ccd open')

        try:
            # Wait for the first gsctool power button prompt before starting the
            # open process.
            logging.info(self._get_ccd_open_output())
            # Cr50 starts out by requesting 5 quick presses then 4 longer
            # power button presses. Run the quick presses without looking at the
            # command output, because getting the output can take some time. For
            # the presses that require a 1 minute wait check the output between
            # presses, so we can catch errors
            #
            # run quick presses for 30 seconds. It may take a couple of seconds
            # for open to start. 10 seconds should be enough. 30 is just used
            # because it will definitely be enough, and this process takes 300
            # seconds, so doing quick presses for 30 seconds won't matter.
            end_time = time.time() + 30
            while time.time() < end_time:
                self.servo.power_short_press()
                logging.info('short int power button press')
                time.sleep(self.PP_SHORT_INTERVAL)
            # Poll the output and press the power button for the longer presses.
            utils.wait_for_value(self._check_open_and_press_power_button,
                                 expected_value=True,
                                 timeout_sec=self.cr50.PP_LONG)
        except Exception as e:
            # Log the failure here, because the cleanup in the finally block
            # may raise its own error.
            logging.info(e)
            raise
        finally:
            self._close_ccd_open_job()
        logging.info(self.cr50.get_ccd_info())
    698 
    699 
    def _check_open_and_press_power_button(self):
        """Log new gsctool output, press the power button and check progress.

        Returns:
            True if the background ccd open job has exited or if the ccd
            state reported by cr50 contains 'Open'. False otherwise.
        """
        new_output = self._get_ccd_open_output()
        logging.info(new_output)
        self.servo.power_short_press()
        logging.info('long int power button press')
        # After the last power button press cr50 erases nvmem and resets the
        # dut before it sets the state to open. Sleep so the ccd state is not
        # checked in the middle of that reset. Power button requests only
        # happen once a minute, so a 10 second wait is not a big deal.
        time.sleep(10)
        # Only ask cr50 for the ccd state while the job is still running.
        if self._ccd_open_job.sp.poll() is not None:
            return True
        return 'Open' in self.cr50.get_ccd_info()['State']
    717 
    718 
    719     def _get_ccd_open_output(self):
    720         """Read the new output."""
    721         self._ccd_open_job.process_output()
    722         self._ccd_open_stdout.seek(self._ccd_open_last_len)
    723         output = self._ccd_open_stdout.read()
    724         self._ccd_open_last_len = self._ccd_open_stdout.len
    725         return output
    726 
    727 
    def _close_ccd_open_job(self):
        """Stop the ccd open job and verify that cr50 ended up open.

        Raises:
            TestFail if the job output contains 'Error' or if the ccd level
                reported by cr50 is not OPEN.
        """
        job_status = utils.nuke_subprocess(self._ccd_open_job.sp)
        job_output = self._ccd_open_stdout.getvalue().strip()
        # The job is finished. Drop the reference before checking the results.
        del self._ccd_open_job

        if job_output:
            logging.info('stdout of ccd open:\n%s', job_output)
        if job_status:
            logging.info('exit status: %d', job_status)

        if 'Error' in job_output:
            # Only report the text that follows the last 'Error'.
            raise error.TestFail('ccd open Error %s' %
                                 job_output.rpartition('Error')[2])

        open_level = self.cr50.OPEN
        if open_level != self.cr50.get_ccd_level():
            raise error.TestFail('unable to open cr50: %s' % job_output)
        logging.info('Opened Cr50')
    744 
    745 
    def fast_open(self, enable_testlab=False):
        """Open ccd using the quickest method that works.

        Testlab open is tried first. If the ccd level is not 'open' after
        that, cr50 is opened from the console when the OpenFromUSB capability
        is accessible, or from the AP otherwise.

        Args:
            enable_testlab: If True, enable testlab mode after cr50 is open.
        """
        # Testlab open does not need the physical presence check, so it is
        # the fastest option. Nothing else to do if it worked.
        self.cr50.send_command('ccd testlab open')
        if self.cr50.get_ccd_level() == 'open':
            return

        def cap_is_accessible(cap_name):
            """Return the accessible field of the given ccd capability."""
            return self.cr50.get_cap(cap_name)[self.cr50.CAP_IS_ACCESSIBLE]

        # Entering dev mode and using ssh to reach the AP takes longer and
        # relies on more systems than the cr50 console does. Only do the
        # steps that the capability settings require.
        if not cap_is_accessible('OpenNoDevMode'):
            self.enter_mode_after_checking_tpm_state('dev')

        if not cap_is_accessible('OpenFromUSB'):
            self.ccd_open_from_ap()
        else:
            self.cr50.set_ccd_level(self.cr50.OPEN)

        if enable_testlab:
            self.cr50.set_ccd_testlab('on')

        # Opening cr50 should clear the TPM and reset the device to normal
        # mode on its own. Check anyway to be consistent, since the capability
        # settings may be set to skip wiping the TPM.
        self.enter_mode_after_checking_tpm_state('normal')
    777 
    778 
    def run_gsctool_cmd_with_password(self, password, cmd, name, expect_error):
        """Run a gsctool command on the DUT and type in the password twice.

        The command is run over ssh as a background job. The password is
        written to the job's stdin once for the first prompt and once for the
        re-enter prompt. The job is then stopped with utils.nuke_subprocess
        and its exit status is compared against expect_error.

        Args:
            password: The cr50 password string
            cmd: The gsctool command
            name: The name to give the job
            expect_error: True if the command should fail

        Raises:
            TestFail if the job could not be started, if the command failed
                while expect_error is False, or if it succeeded while
                expect_error is True.
        """
        set_pwd_cmd = utils.sh_escape(cmd)
        full_ssh_command = '%s "%s"' % (self.host.ssh_command(options='-tt'),
                                        set_pwd_cmd)
        stdout = StringIO.StringIO()
        # Start running the gsctool Command in the background.
        gsctool_job = utils.BgJob(full_ssh_command,
                                  nickname='%s_with_password' % name,
                                  stdout_tee=stdout,
                                  stderr_tee=utils.TEE_TO_LOGS,
                                  stdin=subprocess.PIPE)
        if gsctool_job is None:
            # Format the command into the message. Passing it as a second
            # argument would leave the '%r' unexpanded.
            raise error.TestFail('could not start gsctool command %r' % cmd)

        try:
            # Wait for enter prompt
            gsctool_job.process_output()
            logging.info(stdout.getvalue().strip())
            # Enter the password
            gsctool_job.sp.stdin.write(password + '\n')

            # Wait for re-enter prompt
            gsctool_job.process_output()
            logging.info(stdout.getvalue().strip())
            # Re-enter the password
            gsctool_job.sp.stdin.write(password + '\n')
            # Give the command time to finish before collecting the output.
            time.sleep(self.cr50.CONSERVATIVE_CCD_WAIT)
            gsctool_job.process_output()
        finally:
            # NOTE: the result is checked here so the job is always cleaned
            # up. A TestFail raised in this block takes the place of any
            # error that was raised while entering the password.
            exit_status = utils.nuke_subprocess(gsctool_job.sp)
            output = stdout.getvalue().strip()
            logging.info('%s stdout: %s', name, output)
            logging.info('%s exit status: %s', name, exit_status)
            if exit_status:
                message = ('gsctool %s failed using %r: %s %s' %
                           (name, password, exit_status, output))
                if expect_error:
                    logging.info(message)
                else:
                    raise error.TestFail(message)
            elif expect_error:
                raise error.TestFail('%s with %r did not fail when expected' %
                                     (name, password))
    830 
    831 
    def set_ccd_password(self, password, expect_error=False):
        """Set the ccd password from the AP using gsctool.

        Args:
            password: The cr50 password string
            expect_error: True if setting the password should fail

        Raises:
            TestError if testlab mode is not enabled.
        """
        # Testlab mode makes it possible to clear the password without knowing
        # it. Require it, so the device can be recovered if the test gets
        # interrupted before it clears the password it set.
        if self.cr50.testlab_is_on():
            self.run_gsctool_cmd_with_password(
                    password, 'gsctool -a -P', 'set_password', expect_error)
            return
        raise error.TestError('Will not set password unless testlab mode '
                              'is enabled.')
    842 
    843 
    def ccd_unlock_from_ap(self, password=None, expect_error=False):
        """Unlock cr50 from the AP using gsctool.

        Args:
            password: The cr50 password string. If it is empty or None, the
                      unlock command is run directly on the host.
            expect_error: True if the unlock with a password should fail.
                          Not used when there is no password.
        """
        if password:
            self.run_gsctool_cmd_with_password(
                    password, 'gsctool -a -U', 'unlock', expect_error)
        else:
            self.host.run('gsctool -a -U')
    851 
    852 
    def enter_mode_after_checking_tpm_state(self, mode):
        """Reboot into mode unless cr50 already reports a matching state.

        Args:
            mode: The mode to enter. 'dev' means cr50 should report dev mode.
                  Any other value means it should not.

        Raises:
            TestError if cr50 does not report the requested state after the
                reboot.
        """
        want_dev = mode == 'dev'

        # Skip the reboot when cr50 already agrees with the requested mode.
        if want_dev == self.cr50.in_dev_mode():
            logging.info('already in %r mode', mode)
            return

        self.switcher.reboot_to_mode(to_mode=mode)

        # Ask cr50 again to make sure the mode switch took effect.
        if want_dev != self.cr50.in_dev_mode():
            raise error.TestError('Unable to enter %r mode' % mode)
    864 
    865 
    866     def _fwmp_is_cleared(self):
    867         """Return True if the FWMP has been created"""
    868         res = self.host.run('cryptohome '
    869                             '--action=get_firmware_management_parameters',
    870                             ignore_status=True)
    871         if res.exit_status and res.exit_status != self.CLEARED_FWMP_EXIT_STATUS:
    872             raise error.TestError('Could not run cryptohome command %r' % res)
    873         return self.CLEARED_FWMP_ERROR_MSG in res.stdout
    874 
    875 
    def clear_fwmp(self):
        """Remove the FWMP unless it is already cleared.

        If the cryptohome tpm status output does not contain
        'TPM Owned: true', ownership is taken before the FWMP is removed.
        """
        if self._fwmp_is_cleared():
            return

        tpm_status = self.host.run('cryptohome --action=tpm_status').stdout
        logging.debug(tpm_status)

        pending_cmds = []
        if 'TPM Owned: true' not in tpm_status:
            # Take ownership and wait for it to finish before the removal.
            pending_cmds.append('cryptohome --action=tpm_take_ownership')
            pending_cmds.append('cryptohome --action=tpm_wait_ownership')
        pending_cmds.append(
                'cryptohome --action=remove_firmware_management_parameters')

        for cryptohome_cmd in pending_cmds:
            self.host.run(cryptohome_cmd)
    887 
    def tpm_is_responsive(self):
        """Run tpm_version on the host to see if the TPM responds.

        Returns:
            True if tpm_version exited with a zero status. False otherwise.
        """
        # A failure is the expected signal here, so don't let run() raise.
        version_result = self.host.run('tpm_version', ignore_status=True)
        version_output = version_result.stdout.strip()
        logging.debug(version_output)
        return not version_result.exit_status
    893