Home | History | Annotate | Download | only in faft
      1 # Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import ast
      6 import ctypes
      7 import logging
      8 import os
      9 import pprint
     10 import re
     11 import time
     12 import uuid
     13 
     14 from autotest_lib.client.bin import utils
     15 from autotest_lib.client.common_lib import error
     16 from autotest_lib.server import test
     17 from autotest_lib.server.cros import vboot_constants as vboot
     18 from autotest_lib.server.cros.faft.config.config import Config as FAFTConfig
     19 from autotest_lib.server.cros.faft.rpc_proxy import RPCProxy
     20 from autotest_lib.server.cros.faft.utils import mode_switcher
     21 from autotest_lib.server.cros.faft.utils.faft_checkers import FAFTCheckers
     22 from autotest_lib.server.cros.servo import chrome_base_ec
     23 from autotest_lib.server.cros.servo import chrome_cr50
     24 from autotest_lib.server.cros.servo import chrome_ec
     25 
     26 ConnectionError = mode_switcher.ConnectionError
     27 
     28 
     29 class FAFTBase(test.test):
     30     """The base class of FAFT classes.
     31 
     32     It launches the FAFTClient on DUT, such that the test can access its
     33     firmware functions and interfaces. It also provides some methods to
     34     handle the reboot mechanism, in order to ensure FAFTClient is still
     35     connected after reboot.
     36     """
     37     def initialize(self, host):
     38         """Create a FAFTClient object and install the dependency."""
     39         self.servo = host.servo
     40         self.servo.initialize_dut()
     41         self._client = host
     42         self.faft_client = RPCProxy(host)
     43         self.lockfile = '/var/tmp/faft/lock'
     44 
     45 
     46 class FirmwareTest(FAFTBase):
     47     """
     48     Base class that sets up helper objects/functions for firmware tests.
     49 
    TODO: add documentation as the FAFT rework progresses.
     51     """
    version = 1

    # Mapping of partition number of kernel and rootfs.
    # Keys are either a slot letter ('a'/'b') or a partition number given as
    # a string; values are partition numbers given as strings. KERNEL_MAP and
    # ROOTFS_MAP resolve a key to the kernel/rootfs partition of the same
    # slot, while the OTHER_* maps resolve it to the partition of the
    # opposite slot.
    KERNEL_MAP = {'a':'2', 'b':'4', '2':'2', '4':'4', '3':'2', '5':'4'}
    ROOTFS_MAP = {'a':'3', 'b':'5', '2':'3', '4':'5', '3':'3', '5':'5'}
    OTHER_KERNEL_MAP = {'a':'4', 'b':'2', '2':'4', '4':'2', '3':'4', '5':'2'}
    OTHER_ROOTFS_MAP = {'a':'5', 'b':'3', '2':'5', '4':'3', '3':'5', '5':'3'}

    # NOTE(review): presumably the magic string at the start of a valid
    # kernel partition and the value written to corrupt it -- the users of
    # these two constants are outside this part of the file; confirm there.
    CHROMEOS_MAGIC = "CHROMEOS"
    CORRUPTED_MAGIC = "CORRUPTD"

    # Delay for waiting client to return before EC suspend
    EC_SUSPEND_DELAY = 5

    # Delay between EC suspend and wake
    WAKE_DELAY = 10

    # Delay between closing and opening lid
    LID_DELAY = 1

    # NOTE(review): presumably the servod log path on the servo host; it is
    # consumed outside this part of the file.
    _SERVOD_LOG = '/var/log/servod.log'

    # Partition number appended to the USB device path to locate the rootfs
    # of the image on the USB stick (see assert_test_image_in_usb_disk).
    _ROOTFS_PARTITION_NUMBER = 3

    # Class-level defaults for the backed-up DUT state. clear_set_gbb_flags()
    # rebinds _backup_gbb_flags on the instance before changing the flags.
    # NOTE(review): the two dict() defaults are shared by every instance
    # until an instance rebinds them; mutating them in place would leak state
    # between tests -- confirm the writers (outside this view) rebind.
    _backup_firmware_sha = ()
    _backup_kernel_sha = dict()
    _backup_cgpt_attr = dict()
    _backup_gbb_flags = None
    _backup_dev_mode = None

    # Class level variable, keep track the states of one time setup.
    # This variable is preserved across tests which inherit this class.
    # It is read and written only through check_setup_done(),
    # mark_setup_done() and unmark_setup_done().
    _global_setup_done = {
        'gbb_flags': False,
        'reimage': False,
        'usb_check': False,
    }
     89 
     90     @classmethod
     91     def check_setup_done(cls, label):
     92         """Check if the given setup is done.
     93 
     94         @param label: The label of the setup.
     95         """
     96         return cls._global_setup_done[label]
     97 
     98     @classmethod
     99     def mark_setup_done(cls, label):
    100         """Mark the given setup done.
    101 
    102         @param label: The label of the setup.
    103         """
    104         cls._global_setup_done[label] = True
    105 
    106     @classmethod
    107     def unmark_setup_done(cls, label):
    108         """Mark the given setup not done.
    109 
    110         @param label: The label of the setup.
    111         """
    112         cls._global_setup_done[label] = False
    113 
    def initialize(self, host, cmdline_args, ec_wp=None):
        """Prepare the DUT and the helper objects for a firmware test.

        @param host: The host object of the DUT.
        @param cmdline_args: An iterable of 'key=value' strings from the
                             command line; entries not of that form are
                             ignored. Only 'power_control' is consumed here.
        @param ec_wp: True to request EC write-protected; False to request EC
                      not write-protected; None to do nothing.
        @raise TestError: if power_control is not one of the valid values, or
                          if the client reports that the TPM device is
                          missing.
        """
        super(FirmwareTest, self).initialize(host)
        self.run_id = str(uuid.uuid4())
        logging.info('FirmwareTest initialize begin (id=%s)', self.run_id)
        # Parse arguments from command line
        args = {}
        self.power_control = host.POWER_CONTROL_RPM
        for arg in cmdline_args:
            # Raw string: '\w' in a normal string literal is an invalid
            # escape sequence.
            match = re.search(r'^(\w+)=(.+)', arg)
            if match:
                args[match.group(1)] = match.group(2)
        if 'power_control' in args:
            self.power_control = args['power_control']
            if self.power_control not in host.POWER_CONTROL_VALID_ARGS:
                raise error.TestError('Valid values for --args=power_control '
                                      'are %s. But you entered wrong argument '
                                      'as "%s".'
                                      % (host.POWER_CONTROL_VALID_ARGS,
                                         self.power_control))

        if not self.faft_client.system.dev_tpm_present():
            raise error.TestError('/dev/tpm0 does not exist on the client')

        self.faft_config = FAFTConfig(
                self.faft_client.system.get_platform_name())
        self.checkers = FAFTCheckers(self)
        self.switcher = mode_switcher.create_mode_switcher(self)

        # self.ec (and self.usbpd) only exist when the platform config says
        # the corresponding console is available.
        if self.faft_config.chrome_ec:
            self.ec = chrome_ec.ChromeEC(self.servo)
        # Check for presence of a USBPD console
        if self.faft_config.chrome_usbpd:
            self.usbpd = chrome_ec.ChromeUSBPD(self.servo)
        elif self.faft_config.chrome_ec:
            # If no separate USBPD console, then PD exists on EC console
            self.usbpd = self.ec
        # Get plankton console
        self.plankton = host.plankton
        self.plankton_host = host._plankton_host

        # Create the BaseEC object. None if not available.
        self.base_ec = chrome_base_ec.create_base_ec(self.servo)

        # UART capture is set up before _record_system_info() because the
        # latter checks for the self.cr50 attribute that the capture setup
        # creates.
        self._setup_uart_capture()
        self._setup_servo_log()
        self._record_system_info()
        self.fw_vboot2 = self.faft_client.system.get_fw_vboot2()
        logging.info('vboot version: %d', 2 if self.fw_vboot2 else 1)
        if self.fw_vboot2:
            # Start every vboot2 test from firmware A.
            self.faft_client.system.set_fw_try_next('A')
            if self.faft_client.system.get_crossystem_value('mainfw_act') == 'B':
                logging.info('mainfw_act is B. rebooting to set it A')
                self.switcher.mode_aware_reboot()
        self._setup_gbb_flags()
        self.faft_client.updater.stop_daemon()
        self._create_faft_lockfile()
        self._setup_ec_write_protect(ec_wp)
        # See chromium:239034 regarding needing this sync.
        self.blocking_sync()
        logging.info('FirmwareTest initialize done (id=%s)', self.run_id)
    174 
    def cleanup(self):
        """Autotest cleanup function.

        Checks that the DUT still answers RPC calls, then restores the boot
        mode, the EC write protection and the GBB flags, restarts the updater
        daemon, removes the FAFT lockfile and collects the logs.

        @raise TestFail: if the DUT did not respond; the recovery routine
                         invoked in that case always ends by raising, so the
                         remaining cleanup steps are not run.
        """
        logging.info('FirmwareTest cleaning up (id=%s)', self.run_id)
        try:
            self.faft_client.system.is_available()
        except Exception:
            # Remote is not responding. Revive DUT so that subsequent tests
            # don't fail. Only real errors are handled here: a bare except
            # would also trap KeyboardInterrupt and SystemExit.
            self._restore_routine_from_timeout()
        self.switcher.restore_mode()
        self._restore_ec_write_protect()
        self._restore_gbb_flags()
        self.faft_client.updater.start_daemon()
        self._remove_faft_lockfile()
        self._record_servo_log()
        self._record_faft_client_log()
        self._cleanup_uart_capture()
        super(FirmwareTest, self).cleanup()
        logging.info('FirmwareTest cleanup done (id=%s)', self.run_id)
    195 
    196     def _record_system_info(self):
    197         """Record some critical system info to the attr keyval.
    198 
    199         This info is used by generate_test_report later.
    200         """
    201         system_info = {
    202             'hwid': self.faft_client.system.get_crossystem_value('hwid'),
    203             'ec_version': self.faft_client.ec.get_version(),
    204             'ro_fwid': self.faft_client.system.get_crossystem_value('ro_fwid'),
    205             'rw_fwid': self.faft_client.system.get_crossystem_value('fwid'),
    206             'servod_version': self._client._servo_host.run(
    207                 'servod --version').stdout.strip(),
    208         }
    209 
    210         if hasattr(self, 'cr50'):
    211             system_info['cr50_version'] = self.servo.get('cr50_version')
    212 
    213         logging.info('System info:\n' + pprint.pformat(system_info))
    214         self.write_attr_keyval(system_info)
    215 
    def invalidate_firmware_setup(self):
        """Forget every firmware related one-time setup.

        To be called after the firmware is re-flashed, so that the next test
        performs the firmware related setup again.
        """
        firmware_setup_labels = ('gbb_flags',)
        for label in firmware_setup_labels:
            self.unmark_setup_done(label)
    224 
    225     def _retrieve_recovery_reason_from_trap(self):
    226         """Try to retrieve the recovery reason from a trapped recovery screen.
    227 
    228         @return: The recovery_reason, 0 if any error.
    229         """
    230         recovery_reason = 0
    231         logging.info('Try to retrieve recovery reason...')
    232         if self.servo.get_usbkey_direction() == 'dut':
    233             self.switcher.bypass_rec_mode()
    234         else:
    235             self.servo.switch_usbkey('dut')
    236 
    237         try:
    238             self.switcher.wait_for_client()
    239             lines = self.faft_client.system.run_shell_command_get_output(
    240                         'crossystem recovery_reason')
    241             recovery_reason = int(lines[0])
    242             logging.info('Got the recovery reason %d.', recovery_reason)
    243         except ConnectionError:
    244             logging.error('Failed to get the recovery reason due to connection '
    245                           'error.')
    246         return recovery_reason
    247 
    248     def _reset_client(self):
    249         """Reset client to a workable state.
    250 
    251         This method is called when the client is not responsive. It may be
    252         caused by the following cases:
    253           - halt on a firmware screen without timeout, e.g. REC_INSERT screen;
    254           - corrupted firmware;
    255           - corrutped OS image.
    256         """
    257         # DUT may halt on a firmware screen. Try cold reboot.
    258         logging.info('Try cold reboot...')
    259         self.switcher.mode_aware_reboot(reboot_type='cold',
    260                                         sync_before_boot=False,
    261                                         wait_for_dut_up=False)
    262         self.switcher.wait_for_client_offline()
    263         self.switcher.bypass_dev_mode()
    264         try:
    265             self.switcher.wait_for_client()
    266             return
    267         except ConnectionError:
    268             logging.warn('Cold reboot doesn\'t help, still connection error.')
    269 
    270         # DUT may be broken by a corrupted firmware. Restore firmware.
    271         # We assume the recovery boot still works fine. Since the recovery
    272         # code is in RO region and all FAFT tests don't change the RO region
    273         # except GBB.
    274         if self.is_firmware_saved():
    275             self._ensure_client_in_recovery()
    276             logging.info('Try restore the original firmware...')
    277             if self.is_firmware_changed():
    278                 try:
    279                     self.restore_firmware()
    280                     return
    281                 except ConnectionError:
    282                     logging.warn('Restoring firmware doesn\'t help, still '
    283                                  'connection error.')
    284 
    285         # Perhaps it's kernel that's broken. Let's try restoring it.
    286         if self.is_kernel_saved():
    287             self._ensure_client_in_recovery()
    288             logging.info('Try restore the original kernel...')
    289             if self.is_kernel_changed():
    290                 try:
    291                     self.restore_kernel()
    292                     return
    293                 except ConnectionError:
    294                     logging.warn('Restoring kernel doesn\'t help, still '
    295                                  'connection error.')
    296 
    297         # DUT may be broken by a corrupted OS image. Restore OS image.
    298         self._ensure_client_in_recovery()
    299         logging.info('Try restore the OS image...')
    300         self.faft_client.system.run_shell_command('chromeos-install --yes')
    301         self.switcher.mode_aware_reboot(wait_for_dut_up=False)
    302         self.switcher.wait_for_client_offline()
    303         self.switcher.bypass_dev_mode()
    304         try:
    305             self.switcher.wait_for_client()
    306             logging.info('Successfully restore OS image.')
    307             return
    308         except ConnectionError:
    309             logging.warn('Restoring OS image doesn\'t help, still connection '
    310                          'error.')
    311 
    312     def _ensure_client_in_recovery(self):
    313         """Ensure client in recovery boot; reboot into it if necessary.
    314 
    315         @raise TestError: if failed to boot the USB image.
    316         """
    317         logging.info('Try boot into USB image...')
    318         self.switcher.reboot_to_mode(to_mode='rec', sync_before_boot=False,
    319                                      wait_for_dut_up=False)
    320         self.servo.switch_usbkey('host')
    321         self.switcher.bypass_rec_mode()
    322         try:
    323             self.switcher.wait_for_client()
    324         except ConnectionError:
    325             raise error.TestError('Failed to boot the USB image.')
    326 
    def _restore_routine_from_timeout(self):
        """Try to bring the system back after a timeout, then fail the test.

        This method is called when FAFT failed to connect DUT after reboot.

        @raise TestFail: Always raised, with a description of why the test
                         failed.
        """
        # The DUT is disconnected; keep the UART output for debugging.
        self._record_uart_capture()

        # TODO(waihong@chromium.org): Implement replugging the Ethernet to
        # identify if it is a network flaky.

        reason = self._retrieve_recovery_reason_from_trap()

        # Bring the client back to a workable state before reporting.
        self._reset_client()

        # Report the failure; a zero reason means no recovery reason was
        # obtained.
        if not reason:
            raise error.TestFail('Timed out waiting for DUT reboot')
        raise error.TestFail('Trapped in the recovery screen (reason: %d) '
                             'and timed out' % reason)
    352 
    def assert_test_image_in_usb_disk(self, usb_dev=None):
        """Assert an USB disk plugged-in on servo and a test image inside.

        The rootfs partition of the USB disk is mounted read-only on the
        servo host and its lsb-release is compared with the one on the DUT.
        The check is skipped if it has already been marked as done.

        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
                        If None, it is detected automatically.
        @raise TestError: if USB disk not detected or not a test image, if
                          the image is for another board, if a BOARD entry is
                          missing, or if usb_dev is given while the USB key
                          is not muxed to the host.
        """
        if self.check_setup_done('usb_check'):
            return
        if usb_dev:
            # Raise explicitly: an assert statement is stripped under -O.
            if self.servo.get_usbkey_direction() != 'host':
                raise error.TestError(
                        'USB disk %s was given but the USB key is not muxed '
                        'to the host.' % usb_dev)
        else:
            self.servo.switch_usbkey('host')
            usb_dev = self.servo.probe_host_usb_dev()
            if not usb_dev:
                raise error.TestError(
                        'An USB disk should be plugged in the servo board.')

        rootfs = '%s%s' % (usb_dev, self._ROOTFS_PARTITION_NUMBER)
        logging.info('usb dev is %s', usb_dev)
        tmpd = self.servo.system_output('mktemp -d -t usbcheck.XXXX')

        try:
            # Mount inside the outer try so that the temporary directory is
            # removed even when mounting fails.
            self.servo.system('mount -o ro %s %s' % (rootfs, tmpd))
            try:
                usb_lsb = self.servo.system_output('cat %s' %
                    os.path.join(tmpd, 'etc/lsb-release'))
                logging.debug('Dumping lsb-release on USB stick:\n%s', usb_lsb)
                dut_lsb = '\n'.join(self.faft_client.system.
                    run_shell_command_get_output('cat /etc/lsb-release'))
                logging.debug('Dumping lsb-release on DUT:\n%s', dut_lsb)
                if not re.search(r'RELEASE_TRACK=.*test', usb_lsb):
                    raise error.TestError('USB stick in servo is no test image')
                boards = []
                for where, lsb in (('USB stick', usb_lsb), ('DUT', dut_lsb)):
                    board_match = re.search(r'BOARD=(.*)', lsb)
                    if not board_match:
                        raise error.TestError(
                                'No BOARD entry in lsb-release on %s' % where)
                    boards.append(board_match.group(1))
                usb_board, dut_board = boards
                if usb_board != dut_board:
                    raise error.TestError('USB stick in servo contains a %s '
                        'image, but DUT is a %s' % (usb_board, dut_board))
            finally:
                # Only unmount what was successfully mounted.
                for cmd in ('umount -l %s' % rootfs, 'sync'):
                    self.servo.system(cmd)
        finally:
            self.servo.system('rm -rf %s' % tmpd)

        self.mark_setup_done('usb_check')
    395 
    def setup_usbkey(self, usbkey, host=None):
        """Prepare the USB disk on the servo for the test.

        When the disk is required, it is verified to hold a valid ChromeOS
        test image. The disk is then muxed to the host or to the DUT as
        requested.

        @param usbkey: True if the USB disk is required for the test, False if
                       not required.
        @param host: Optional, True to mux the USB disk to host, False to mux it
                    to DUT, default to do nothing (except that a disk which is
                    not required is muxed to the host).
        """
        if usbkey:
            self.assert_test_image_in_usb_disk()

        # USB disk is not required for the test. Better to mux it to host.
        if not usbkey and host is None:
            host = True

        # Identity checks on purpose: only the real True/False objects select
        # a direction; any other value leaves the mux untouched.
        if host is True:
            direction = 'host'
        elif host is False:
            direction = 'dut'
        else:
            return
        self.servo.switch_usbkey(direction)
    417 
    def get_usbdisk_path_on_dut(self):
        """Find the device path on the DUT of the USB disk on the servo.

        The disk devices seen by the DUT are listed with the USB key muxed
        off and again with it muxed to the DUT; the device that only shows up
        in the second listing is the USB disk. The original mux direction is
        restored afterwards.

        @return: A string representing USB disk path, like '/dev/sdb', or
                 None if the listings do not differ by exactly one device.
        """
        saved_direction = self.servo.get_usbkey_direction()
        list_disks_cmd = 'ls -d /dev/s*[a-z]'

        # Listing without the USB disk visible to the DUT.
        self.servo.switch_usbkey('off')
        disks_without_usb = set(
            self.faft_client.system.run_shell_command_get_output(
                list_disks_cmd))

        # Listing with the USB disk visible, after giving it time to show up.
        self.servo.switch_usbkey('dut')
        time.sleep(self.faft_config.usb_plug)
        disks_with_usb = set(
            self.faft_client.system.run_shell_command_get_output(
                list_disks_cmd))

        # Put the mux back where it was.
        if self.servo.get_usbkey_direction() != saved_direction:
            self.servo.switch_usbkey(saved_direction)

        new_disks = disks_with_usb.difference(disks_without_usb)
        return new_disks.pop() if len(new_disks) == 1 else None
    448 
    449     def _create_faft_lockfile(self):
    450         """Creates the FAFT lockfile."""
    451         logging.info('Creating FAFT lockfile...')
    452         command = 'touch %s' % (self.lockfile)
    453         self.faft_client.system.run_shell_command(command)
    454 
    455     def _remove_faft_lockfile(self):
    456         """Removes the FAFT lockfile."""
    457         logging.info('Removing FAFT lockfile...')
    458         command = 'rm -f %s' % (self.lockfile)
    459         self.faft_client.system.run_shell_command(command)
    460 
    def clear_set_gbb_flags(self, clear_mask, set_mask):
        """Clear and then set bits of the GBB flags in the current flashrom.

        Nothing is written when the resulting flags equal the current ones.
        Otherwise the current flags are kept in _backup_gbb_flags, the new
        flags are written, and the DUT is rebooted if the
        FORCE_DEV_SWITCH_ON or DISABLE_EC_SOFTWARE_SYNC flag changed.

        @param clear_mask: A mask of flags to be cleared.
        @param set_mask: A mask of flags to be set.
        """
        current_flags = self.faft_client.bios.get_gbb_flags()
        # Complement of clear_mask, truncated to an unsigned 32-bit value.
        keep_mask = ctypes.c_uint32(~clear_mask).value
        updated_flags = (current_flags & keep_mask) | set_mask

        if updated_flags == current_flags:
            logging.info('Current GBB flags look good for test: 0x%x.',
                         current_flags)
            return

        # NOTE(review): every change overwrites the backup, so a second
        # change remembers the already modified flags -- confirm that the
        # restore code (outside this view) expects that.
        self._backup_gbb_flags = current_flags
        logging.info('Changing GBB flags from 0x%x to 0x%x.',
                     current_flags, updated_flags)
        self.faft_client.bios.set_gbb_flags(updated_flags)

        # Flipping either of these flags needs a reboot to get a clear state.
        reboot_flags = (vboot.GBB_FLAG_FORCE_DEV_SWITCH_ON |
                        vboot.GBB_FLAG_DISABLE_EC_SOFTWARE_SYNC)
        if (current_flags ^ updated_flags) & reboot_flags:
            self.switcher.mode_aware_reboot()
    483 
    def check_ec_capability(self, required_cap=None, suppress_warning=False):
        """Check if current platform has required EC capabilities.

        @param required_cap: A list containing required EC capabilities. Pass in
                             None to only check for presence of Chrome EC.
        @param suppress_warning: True to suppress any warning messages.
        @return: True if requirements are met. Otherwise, False.
        """
        if not self.faft_config.chrome_ec:
            if not suppress_warning:
                logging.warning('Requires Chrome EC to run this test.')
            return False

        # None or an empty list only asks for the presence of a Chrome EC.
        if not required_cap:
            return True

        for cap in required_cap:
            if cap not in self.faft_config.ec_capability:
                # Report only the first missing capability.
                if not suppress_warning:
                    logging.warning('Requires EC capability "%s" to run this '
                                    'test.', cap)
                return False

        return True
    508 
    def check_root_part_on_non_recovery(self, part):
        """Check the root partition number and that it is a normal/dev boot.

        @param part: A string of partition number, e.g.'3'.
        @return: True if the root device matched and on normal/dev boot;
                 otherwise, False.
        """
        root_part_matched = self.checkers.root_part_checker(part)
        if not root_part_matched:
            # Short-circuit exactly like `and`: hand back the first result.
            return root_part_matched
        expected_crossystem = {'mainfw_type': ('normal', 'developer')}
        return self.checkers.crossystem_checker(expected_crossystem)
    520 
    521     def _join_part(self, dev, part):
    522         """Return a concatenated string of device and partition number.
    523 
    524         @param dev: A string of device, e.g.'/dev/sda'.
    525         @param part: A string of partition number, e.g.'3'.
    526         @return: A concatenated string of device and partition number,
    527                  e.g.'/dev/sda3'.
    528 
    529         >>> seq = FirmwareTest()
    530         >>> seq._join_part('/dev/sda', '3')
    531         '/dev/sda3'
    532         >>> seq._join_part('/dev/mmcblk0', '2')
    533         '/dev/mmcblk0p2'
    534         """
    535         if 'mmcblk' in dev:
    536             return dev + 'p' + part
    537         elif 'nvme' in dev:
    538             return dev + 'p' + part
    539         else:
    540             return dev + part
    541 
    def copy_kernel_and_rootfs(self, from_part, to_part):
        """Duplicate the kernel and the rootfs of one slot onto another.

        Both partitions are copied on the root device with dd, the kernel
        first and the rootfs second.

        @param from_part: A string of partition number to be copied from.
        @param to_part: A string of partition number to be copied to.
        """
        root_dev = self.faft_client.system.get_root_dev()
        copy_steps = (
            ('Copying kernel from %s to %s. Please wait...', self.KERNEL_MAP),
            ('Copying rootfs from %s to %s. Please wait...', self.ROOTFS_MAP),
        )
        for message, part_map in copy_steps:
            logging.info(message, from_part, to_part)
            source = self._join_part(root_dev, part_map[from_part])
            target = self._join_part(root_dev, part_map[to_part])
            self.faft_client.system.run_shell_command(
                    'dd if=%s of=%s bs=4M' % (source, target))
    559 
    def ensure_kernel_boot(self, part):
        """Make sure the DUT is booted from the requested kernel.

        Nothing is done when the root partition already matches. Otherwise,
        if kernel A and B differ, the other slot is copied onto the requested
        one; the requested kernel is then given priority and the DUT is
        rebooted.

        @param part: A string of kernel partition number or 'a'/'b'.
        """
        if self.checkers.root_part_checker(part):
            return

        if self.faft_client.kernel.diff_a_b():
            other_part = self.OTHER_KERNEL_MAP[part]
            self.copy_kernel_and_rootfs(from_part=other_part, to_part=part)
        self.reset_and_prioritize_kernel(part)
        self.switcher.mode_aware_reboot()
    575 
    def ensure_dev_internal_boot(self, original_dev_boot_usb):
        """Make sure the DUT is booted from the internal device in dev mode.

        If the DUT booted from a removable device, the dev_boot_usb value is
        put back to the given original and the DUT is rebooted. The boot
        state is then verified through check_state().

        @param original_dev_boot_usb: Original dev_boot_usb value.
        """
        logging.info('Checking internal device boot.')
        booted_from_removable = (
                self.faft_client.system.is_removable_device_boot())
        if booted_from_removable:
            logging.info('Reboot into internal disk...')
            self.faft_client.system.set_dev_boot_usb(original_dev_boot_usb)
            self.switcher.mode_aware_reboot()
        internal_boot_check = (
            self.checkers.dev_boot_usb_checker,
            False,
            'Device not booted from internal disk properly.',
        )
        self.check_state(internal_boot_check)
    591 
    def set_hardware_write_protect(self, enable):
        """Set hardware write protect pin.

        The fw_wp_state servo control is tried first; if setting it fails,
        the legacy fw_wp_vref/fw_wp_en/fw_wp controls are used instead.

        @param enable: True if asserting write protect pin. Otherwise, False.
        """
        try:
            self.servo.set('fw_wp_state', 'force_on' if enable else 'force_off')
        except Exception:
            # Only real errors trigger the fallback: a bare except would also
            # swallow KeyboardInterrupt and SystemExit.
            # TODO(waihong): Remove this fallback when all servos have the
            # above new fw_wp_state control.
            logging.debug('Setting fw_wp_state failed; falling back to the '
                          'legacy fw_wp controls.')
            self.servo.set('fw_wp_vref', self.faft_config.wp_voltage)
            self.servo.set('fw_wp_en', 'on')
            self.servo.set('fw_wp', 'on' if enable else 'off')
    605 
    def set_ec_write_protect_and_reboot(self, enable):
        """Set EC write protect status and reboot to take effect.

        Write protection is only active when both the hardware write protect
        pin is asserted and the software write protect flag is set. The pin
        is asserted/deasserted first, then the software flag is handled.

        On a device with a non-Chrome EC, the software write protect is set
        via flashrom and the DUT is rebooted.

        On a device with a Chrome EC, a reboot is required for write protect
        to take effect. The software flag cannot be unset while the hardware
        pin is asserted, so when deactivating the pin has to be deasserted
        and the EC rebooted before the flag can be modified; that ordering is
        handled by set_chrome_ec_write_protect_and_reboot().

        @param enable: True if activating EC write protect. Otherwise, False.
        """
        self.set_hardware_write_protect(enable)

        if not self.faft_config.chrome_ec:
            self.faft_client.ec.set_write_protect(enable)
            self.switcher.mode_aware_reboot()
            return

        self.set_chrome_ec_write_protect_and_reboot(enable)
    631 
    def set_chrome_ec_write_protect_and_reboot(self, enable):
        """Set Chrome EC write protect status and reboot to take effect.

        @param enable: True if activating EC write protect. Otherwise, False.
        """
        if not enable:
            # Deactivating: reboot first (the hardware pin has been
            # deasserted by now), then remove the software flag.
            self.sync_and_ec_reboot()
            self.ec.set_flash_write_protect(enable)
            return

        # Activating: set the software flag, then reboot to take effect.
        self.ec.set_flash_write_protect(enable)
        self.sync_and_ec_reboot()
    646 
    647     def _setup_ec_write_protect(self, ec_wp):
    648         """Setup for EC write-protection.
    649 
    650         It makes sure the EC in the requested write-protection state. If not, it
    651         flips the state. Flipping the write-protection requires DUT reboot.
    652 
    653         @param ec_wp: True to request EC write-protected; False to request EC
    654                       not write-protected; None to do nothing.
    655         """
    656         if ec_wp is None:
    657             self._old_ec_wp = None
    658             return
    659         self._old_ec_wp = self.checkers.crossystem_checker({'wpsw_boot': '1'})
    660         if ec_wp != self._old_ec_wp:
    661             logging.info('The test required EC is %swrite-protected. Reboot '
    662                          'and flip the state.', '' if ec_wp else 'not ')
    663             self.switcher.mode_aware_reboot(
    664                     'custom',
    665                      lambda:self.set_ec_write_protect_and_reboot(ec_wp))
    666 
    667     def _restore_ec_write_protect(self):
    668         """Restore the original EC write-protection."""
    669         if (not hasattr(self, '_old_ec_wp')) or (self._old_ec_wp is None):
    670             return
    671         if not self.checkers.crossystem_checker(
    672                 {'wpsw_boot': '1' if self._old_ec_wp else '0'}):
    673             logging.info('Restore original EC write protection and reboot.')
    674             self.switcher.mode_aware_reboot(
    675                     'custom',
    676                     lambda:self.set_ec_write_protect_and_reboot(
    677                             self._old_ec_wp))
    678 
    def _setup_uart_capture(self):
        """Setup the CPU/cr50/EC/PD UART capture.

        Sets self.cpu_uart_file, self.cr50_uart_file, self.ec_uart_file and
        self.usbpd_uart_file; a file attribute other than the CPU one stays
        None when the matching capture could not be enabled. Enabling the
        cr50/EC/PD captures is best-effort: an error.TestFail raised by servo
        is logged and never propagated.
        """
        self.cpu_uart_file = os.path.join(self.resultsdir, 'cpu_uart.txt')
        self.servo.set('cpu_uart_capture', 'on')
        self.cr50_uart_file = None
        self.ec_uart_file = None
        self.usbpd_uart_file = None
        try:
            # Check that the console works before declaring the cr50 console
            # connection exists and enabling uart capture.
            self.servo.get('cr50_version')
            self.servo.set('cr50_uart_capture', 'on')
            self.cr50_uart_file = os.path.join(self.resultsdir, 'cr50_uart.txt')
            self.cr50 = chrome_cr50.ChromeCr50(self.servo)
        except error.TestFail as e:
            if 'No control named' in str(e):
                logging.warning('cr50 console not supported.')
            else:
                # Keep the capture best-effort, but leave a trace of the
                # unexpected failure instead of dropping it silently.
                logging.warning('Failed to set up cr50 uart capture: %s', e)

        if not self.faft_config.chrome_ec:
            logging.info('Not a Google EC, cannot capture ec console output.')
            return

        try:
            self.servo.set('ec_uart_capture', 'on')
            self.ec_uart_file = os.path.join(self.resultsdir, 'ec_uart.txt')
        except error.TestFail as e:
            if 'No control named' in str(e):
                logging.warning('The servod is too old that ec_uart_capture '
                                'not supported.')
            else:
                logging.warning('Failed to set up EC uart capture: %s', e)

        # Log separate PD console if supported
        if self.check_ec_capability(['usbpd_uart'], suppress_warning=True):
            try:
                self.servo.set('usbpd_uart_capture', 'on')
                self.usbpd_uart_file = os.path.join(self.resultsdir,
                                                    'usbpd_uart.txt')
            except error.TestFail as e:
                if 'No control named' in str(e):
                    logging.warning('The servod is too old that '
                                    'usbpd_uart_capture is not supported.')
                else:
                    logging.warning('Failed to set up PD uart capture: %s', e)
    716 
    717     def _record_uart_capture(self):
    718         """Record the CPU/EC/PD UART output stream to files."""
    719         if self.cpu_uart_file:
    720             with open(self.cpu_uart_file, 'a') as f:
    721                 f.write(ast.literal_eval(self.servo.get('cpu_uart_stream')))
    722         if self.cr50_uart_file:
    723             with open(self.cr50_uart_file, 'a') as f:
    724                 f.write(ast.literal_eval(self.servo.get('cr50_uart_stream')))
    725         if self.ec_uart_file and self.faft_config.chrome_ec:
    726             with open(self.ec_uart_file, 'a') as f:
    727                 f.write(ast.literal_eval(self.servo.get('ec_uart_stream')))
    728         if (self.usbpd_uart_file and self.faft_config.chrome_ec and
    729             self.check_ec_capability(['usbpd_uart'], suppress_warning=True)):
    730             with open(self.usbpd_uart_file, 'a') as f:
    731                 f.write(ast.literal_eval(self.servo.get('usbpd_uart_stream')))
    732 
    733     def _cleanup_uart_capture(self):
    734         """Cleanup the CPU/EC/PD UART capture."""
    735         # Flush the remaining UART output.
    736         self._record_uart_capture()
    737         self.servo.set('cpu_uart_capture', 'off')
    738         if self.cr50_uart_file:
    739             self.servo.set('cr50_uart_capture', 'off')
    740         if self.ec_uart_file and self.faft_config.chrome_ec:
    741             self.servo.set('ec_uart_capture', 'off')
    742         if (self.usbpd_uart_file and self.faft_config.chrome_ec and
    743             self.check_ec_capability(['usbpd_uart'], suppress_warning=True)):
    744             self.servo.set('usbpd_uart_capture', 'off')
    745 
    746     def _get_power_state(self, power_state):
    747         """
    748         Return the current power state of the AP
    749         """
    750         return self.ec.send_command_get_output("powerinfo", [power_state])
    751 
    def wait_power_state(self, power_state, retries):
        """Wait for certain power state.

        @param power_state: power state you are expecting
        @param retries: maximum number of attempts. More than one is needed
                        when the AP is powering down and transitioning
                        through different states.
        @return: True as soon as one query succeeds; False when every attempt
                 raised error.TestFail or when retries is not positive.
        """
        logging.info('Checking power state "%s" maximum %d times.',
                     power_state, retries)
        while retries > 0:
            logging.info("try count: %d", retries)
            retries -= 1
            try:
                # Only the query itself may fail; its result is not needed.
                self._get_power_state(power_state)
            except error.TestFail:
                continue
            return True
        return False
    771 
    def suspend(self):
        """Suspends the DUT.

        A backgrounded shell command on the DUT runs powerd_dbus_suspend after
        EC_SUSPEND_DELAY seconds; this method then sleeps for the same delay.
        """
        delayed_suspend = ('(sleep %d; powerd_dbus_suspend) &'
                           % self.EC_SUSPEND_DELAY)
        self.faft_client.system.run_shell_command(delayed_suspend)
        time.sleep(self.EC_SUSPEND_DELAY)
    777 
    778     def _fetch_servo_log(self):
    779         """Fetch the servo log."""
    780         cmd = '[ -e %s ] && cat %s || echo NOTFOUND' % ((self._SERVOD_LOG,) * 2)
    781         servo_log = self.servo.system_output(cmd)
    782         return None if servo_log == 'NOTFOUND' else servo_log
    783 
    784     def _setup_servo_log(self):
    785         """Setup the servo log capturing."""
    786         self.servo_log_original_len = -1
    787         if self.servo.is_localhost():
    788             # No servo log recorded when servod runs locally.
    789             return
    790 
    791         servo_log = self._fetch_servo_log()
    792         if servo_log:
    793             self.servo_log_original_len = len(servo_log)
    794         else:
    795             logging.warn('Servo log file not found.')
    796 
    797     def _record_servo_log(self):
    798         """Record the servo log to the results directory."""
    799         if self.servo_log_original_len != -1:
    800             servo_log = self._fetch_servo_log()
    801             servo_log_file = os.path.join(self.resultsdir, 'servod.log')
    802             with open(servo_log_file, 'a') as f:
    803                 f.write(servo_log[self.servo_log_original_len:])
    804 
    805     def _record_faft_client_log(self):
    806         """Record the faft client log to the results directory."""
    807         client_log = self.faft_client.system.dump_log(True)
    808         client_log_file = os.path.join(self.resultsdir, 'faft_client.log')
    809         with open(client_log_file, 'w') as f:
    810             f.write(client_log)
    811 
    812     def _setup_gbb_flags(self):
    813         """Setup the GBB flags for FAFT test."""
    814         if self.faft_config.gbb_version < 1.1:
    815             logging.info('Skip modifying GBB on versions older than 1.1.')
    816             return
    817 
    818         if self.check_setup_done('gbb_flags'):
    819             return
    820 
    821         logging.info('Set proper GBB flags for test.')
    822         self.clear_set_gbb_flags(vboot.GBB_FLAG_DEV_SCREEN_SHORT_DELAY |
    823                                  vboot.GBB_FLAG_FORCE_DEV_SWITCH_ON |
    824                                  vboot.GBB_FLAG_FORCE_DEV_BOOT_USB |
    825                                  vboot.GBB_FLAG_DISABLE_FW_ROLLBACK_CHECK |
    826                                  vboot.GBB_FLAG_FORCE_DEV_BOOT_FASTBOOT_FULL_CAP,
    827                                  vboot.GBB_FLAG_ENTER_TRIGGERS_TONORM |
    828                                  vboot.GBB_FLAG_FAFT_KEY_OVERIDE)
    829         self.mark_setup_done('gbb_flags')
    830 
    def drop_backup_gbb_flags(self):
        """Forget the backed-up GBB flags by resetting them to None.

        This can be used when a test intends to permanently change GBB flags.
        """
        self._backup_gbb_flags = None
    837 
    838     def _restore_gbb_flags(self):
    839         """Restore GBB flags to their original state."""
    840         if self._backup_gbb_flags is None:
    841             return
    842         # Setting up and restoring the GBB flags take a lot of time. For
    843         # speed-up purpose, don't restore it.
    844         logging.info('***')
    845         logging.info('*** Please manually restore the original GBB flags to: '
    846                      '0x%x ***', self._backup_gbb_flags)
    847         logging.info('***')
    848         self.unmark_setup_done('gbb_flags')
    849 
    def setup_tried_fwb(self, tried_fwb):
        """Setup for fw B tried state.

        Checks through the crossystem checker whether the system already is
        in the requested tried_fwb state. If it is not, a message is logged
        and, when tried_fwb is requested, the try-fw-B request is set.

        NOTE(review): no reboot is issued here although the log messages
        mention one - presumably the caller reboots; confirm.

        @param tried_fwb: True if requested in tried_fwb=1;
                          False if tried_fwb=0.
        """
        wanted_value = '1' if tried_fwb else '0'
        if self.checkers.crossystem_checker({'tried_fwb': wanted_value}):
            return

        if tried_fwb:
            logging.info(
                'Firmware is not booted with tried_fwb. Reboot into it.')
            self.faft_client.system.set_try_fw_b()
        else:
            logging.info(
                'Firmware is booted with tried_fwb. Reboot to clear.')
    868 
    def power_on(self):
        """Switch DUT AC power on via the host, using self.power_control."""
        host = self._client
        host.power_on(self.power_control)
    872 
    def power_off(self):
        """Switch DUT AC power off via the host, using self.power_control."""
        host = self._client
        host.power_off(self.power_control)
    876 
    def power_cycle(self):
        """Power cycle DUT AC power via the host, using self.power_control."""
        host = self._client
        host.power_cycle(self.power_control)
    880 
    def setup_rw_boot(self, section='a'):
        """Make sure firmware is in RW-boot mode.

        When the preamble flags of the given firmware section have the RO-boot
        (PREAMBLE_USE_RO_NORMAL) flag set, the flag is toggled off, the new
        flags are written back and the DUT is rebooted.

        @param section: A firmware section, either 'a' or 'b'.
        """
        preamble_flags = self.faft_client.bios.get_preamble_flags(section)
        if not preamble_flags & vboot.PREAMBLE_USE_RO_NORMAL:
            return

        rw_flags = preamble_flags ^ vboot.PREAMBLE_USE_RO_NORMAL
        self.faft_client.bios.set_preamble_flags(section, rw_flags)
        self.switcher.mode_aware_reboot()
    894 
    def setup_kernel(self, part):
        """Setup for kernel test.

        Boots the requested kernel part, copies kernel and rootfs from it to
        the other part when kernel A/B differ or rootfs B fails verification,
        and finally makes the requested part the highest priority.

        NOTE(review): the log messages and the rootfs check always name B,
        while the copy goes from `part` to its counterpart - confirm this is
        intended when part is 'b'.

        @param part: A string of kernel partition number or 'a'/'b'.
        """
        self.ensure_kernel_boot(part)
        logging.info('Checking the integrity of kernel B and rootfs B...')
        kernels_differ = self.faft_client.kernel.diff_a_b()
        if kernels_differ or not self.faft_client.rootfs.verify_rootfs('B'):
            logging.info('Copying kernel and rootfs from A to B...')
            other_part = self.OTHER_KERNEL_MAP[part]
            self.copy_kernel_and_rootfs(from_part=part, to_part=other_part)
        self.reset_and_prioritize_kernel(part)
    911 
    def reset_and_prioritize_kernel(self, part):
        """Make the requested partition highest priority.

        This function also resets kernel A and B to bootable.

        @param part: A string of partition number to be prioritized.
        """
        system = self.faft_client.system
        root_dev = system.get_root_dev()
        # Reset kernel A and B to bootable.
        for label in ('a', 'b'):
            system.run_shell_command(
                'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP[label],
                                                  root_dev))
        # Set kernel part highest priority.
        prioritize_cmd = 'cgpt prioritize -i%s %s' % (self.KERNEL_MAP[part],
                                                      root_dev)
        system.run_shell_command(prioritize_cmd)
    928 
    def do_blocking_sync(self, device):
        """Run a blocking sync command.

        The command is chosen from the device name: 'mmcblk' devices, 'nvme'
        devices and everything else are handled differently.

        @param device: path of the storage device to sync.
        """
        system = self.faft_client.system
        logging.info("Blocking sync for %s", device)

        if 'mmcblk' in device:
            # For mmc devices, use `mmc status get` command to send an
            # empty command to wait for the disk to be available again.
            system.run_shell_command('mmc status get %s' % device)
            return

        if 'nvme' in device:
            # Get a list of NVMe namespace and flush them individually
            # Assumes the output format from nvme list-ns command will
            # be something like follows:
            # [ 0]:0x1
            # [ 1]:0x2
            listing = system.run_shell_command_get_output(
                'nvme list-ns %s -a' % device)
            for entry in listing:
                namespace_id = entry.split(':')[-1]
                # For NVMe devices, use `nvme flush` command to commit data
                # and metadata to non-volatile media.
                system.run_shell_command(
                    'nvme flush %s -n %s' % (device, namespace_id))
            return

        # For other devices, hdparm sends TUR to check if
        # a device is ready for transfer operation.
        system.run_shell_command('hdparm -f %s' % device)
    955 
    def blocking_sync(self):
        """Sync root device and internal device.

        Runs `sync` twice on the DUT, then a device-specific blocking sync on
        the root device and, when booted from removable media, on the internal
        device as well.
        """
        system = self.faft_client.system
        # The double calls to sync fakes a blocking call
        # since the first call returns before the flush
        # is complete, but the second will wait for the
        # first to finish.
        for _ in range(2):
            system.run_shell_command('sync')

        # sync only sends SYNCHRONIZE_CACHE but doesn't check the status.
        # This function will perform a device-specific sync command.
        self.do_blocking_sync(system.get_root_dev())

        # Also sync the internal device if booted from removable media.
        if system.is_removable_device_boot():
            self.do_blocking_sync(system.get_internal_device())
    974 
    def sync_and_ec_reboot(self, flags=''):
        """Request the client sync and do a EC triggered reboot.

        After the reboot command, waits faft_config.ec_boot_to_console seconds
        and then runs the lid check that may press the power button.

        @param flags: Optional, a space-separated string of flags passed to EC
                      reboot command, including:
                          default: EC soft reboot;
                          'hard': EC cold/hard reboot.
        """
        self.blocking_sync()
        self.ec.reboot(flags)
        console_delay = self.faft_config.ec_boot_to_console
        time.sleep(console_delay)
        self.check_lid_and_power_on()
    987 
    def reboot_and_reset_tpm(self):
        """Reboot into recovery mode, reset TPM, then reboot back to disk.

        The TPM reset is done by running chromeos-tpm-recovery on the DUT
        while it is in recovery mode.
        """
        switcher = self.switcher
        switcher.reboot_to_mode(to_mode='rec')
        self.faft_client.system.run_shell_command('chromeos-tpm-recovery')
        switcher.mode_aware_reboot()
    993 
    def full_power_off_and_on(self):
        """Shutdown the device by pressing power button and power on again.

        The boot id read before the press is handed to the switcher while
        waiting for the client to go offline.
        """
        config = self.faft_config
        original_boot_id = self.get_bootid()
        # Press power button to trigger Chrome OS normal shutdown process.
        # We use a customized delay since the normal-press 1.2s is not enough.
        self.servo.power_key(config.hold_pwr_button_poweroff)
        # device can take 44-51 seconds to restart,
        # add buffer from the default timeout of 60 seconds.
        self.switcher.wait_for_client_offline(timeout=100,
                                              orig_boot_id=original_boot_id)
        time.sleep(config.shutdown)
        # Short press power button to boot DUT again.
        self.servo.power_key(config.hold_pwr_button_poweron)
   1006 
   1007     def check_lid_and_power_on(self):
   1008         """
   1009         On devices with EC software sync, system powers on after EC reboots if
   1010         lid is open. Otherwise, the EC shuts down CPU after about 3 seconds.
   1011         This method checks lid switch state and presses power button if
   1012         necessary.
   1013         """
   1014         if self.servo.get("lid_open") == "no":
   1015             time.sleep(self.faft_config.software_sync)
   1016             self.servo.power_short_press()
   1017 
   1018     def _modify_usb_kernel(self, usb_dev, from_magic, to_magic):
   1019         """Modify the kernel header magic in USB stick.
   1020 
   1021         The kernel header magic is the first 8-byte of kernel partition.
   1022         We modify it to make it fail on kernel verification check.
   1023 
   1024         @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
   1025         @param from_magic: A string of magic which we change it from.
   1026         @param to_magic: A string of magic which we change it to.
   1027         @raise TestError: if failed to change magic.
   1028         """
   1029         assert len(from_magic) == 8
   1030         assert len(to_magic) == 8
   1031         # USB image only contains one kernel.
   1032         kernel_part = self._join_part(usb_dev, self.KERNEL_MAP['a'])
   1033         read_cmd = "sudo dd if=%s bs=8 count=1 2>/dev/null" % kernel_part
   1034         current_magic = self.servo.system_output(read_cmd)
   1035         if current_magic == to_magic:
   1036             logging.info("The kernel magic is already %s.", current_magic)
   1037             return
   1038         if current_magic != from_magic:
   1039             raise error.TestError("Invalid kernel image on USB: wrong magic.")
   1040 
   1041         logging.info('Modify the kernel magic in USB, from %s to %s.',
   1042                      from_magic, to_magic)
   1043         write_cmd = ("echo -n '%s' | sudo dd of=%s oflag=sync conv=notrunc "
   1044                      " 2>/dev/null" % (to_magic, kernel_part))
   1045         self.servo.system(write_cmd)
   1046 
   1047         if self.servo.system_output(read_cmd) != to_magic:
   1048             raise error.TestError("Failed to write new magic.")
   1049 
   1050     def corrupt_usb_kernel(self, usb_dev):
   1051         """Corrupt USB kernel by modifying its magic from CHROMEOS to CORRUPTD.
   1052 
   1053         @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
   1054         """
   1055         self._modify_usb_kernel(usb_dev, self.CHROMEOS_MAGIC,
   1056                                 self.CORRUPTED_MAGIC)
   1057 
   1058     def restore_usb_kernel(self, usb_dev):
   1059         """Restore USB kernel by modifying its magic from CORRUPTD to CHROMEOS.
   1060 
   1061         @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
   1062         """
   1063         self._modify_usb_kernel(usb_dev, self.CORRUPTED_MAGIC,
   1064                                 self.CHROMEOS_MAGIC)
   1065 
   1066     def _call_action(self, action_tuple, check_status=False):
   1067         """Call the action function with/without arguments.
   1068 
   1069         @param action_tuple: A function, or a tuple (function, args, error_msg),
   1070                              in which, args and error_msg are optional. args is
   1071                              either a value or a tuple if multiple arguments.
   1072                              This can also be a list containing multiple
   1073                              function or tuple. In this case, these actions are
   1074                              called in sequence.
   1075         @param check_status: Check the return value of action function. If not
   1076                              succeed, raises a TestFail exception.
   1077         @return: The result value of the action function.
   1078         @raise TestError: An error when the action function is not callable.
   1079         @raise TestFail: When check_status=True, action function not succeed.
   1080         """
   1081         if isinstance(action_tuple, list):
   1082             return all([self._call_action(action, check_status=check_status)
   1083                         for action in action_tuple])
   1084 
   1085         action = action_tuple
   1086         args = ()
   1087         error_msg = 'Not succeed'
   1088         if isinstance(action_tuple, tuple):
   1089             action = action_tuple[0]
   1090             if len(action_tuple) >= 2:
   1091                 args = action_tuple[1]
   1092                 if not isinstance(args, tuple):
   1093                     args = (args,)
   1094             if len(action_tuple) >= 3:
   1095                 error_msg = action_tuple[2]
   1096 
   1097         if action is None:
   1098             return
   1099 
   1100         if not callable(action):
   1101             raise error.TestError('action is not callable!')
   1102 
   1103         info_msg = 'calling %s' % action.__name__
   1104         if args:
   1105             info_msg += ' with args %s' % str(args)
   1106         logging.info(info_msg)
   1107         ret = action(*args)
   1108 
   1109         if check_status and not ret:
   1110             raise error.TestFail('%s: %s returning %s' %
   1111                                  (error_msg, info_msg, str(ret)))
   1112         return ret
   1113 
   1114     def run_shutdown_process(self, shutdown_action, pre_power_action=None,
   1115                              run_power_action=True, post_power_action=None,
   1116                              shutdown_timeout=None):
   1117         """Run shutdown_action(), which makes DUT shutdown, and power it on.
   1118 
   1119         @param shutdown_action: function which makes DUT shutdown, like
   1120                                 pressing power key.
   1121         @param pre_power_action: function which is called before next power on.
   1122         @param run_power_action: power_key press by default, set to None to skip.
   1123         @param post_power_action: function which is called after next power on.
   1124         @param shutdown_timeout: a timeout to confirm DUT shutdown.
   1125         @raise TestFail: if the shutdown_action() failed to turn DUT off.
   1126         """
   1127         self._call_action(shutdown_action)
   1128         logging.info('Wait to ensure DUT shut down...')
   1129         try:
   1130             if shutdown_timeout is None:
   1131                 shutdown_timeout = self.faft_config.shutdown_timeout
   1132             self.switcher.wait_for_client(timeout=shutdown_timeout)
   1133             raise error.TestFail(
   1134                     'Should shut the device down after calling %s.' %
   1135                     shutdown_action.__name__)
   1136         except ConnectionError:
   1137             if self.check_ec_capability(['x86'], suppress_warning=True):
   1138                 PWR_RETRIES=5
   1139                 if not self.wait_power_state("G3", PWR_RETRIES):
   1140                     raise error.TestFail("System not shutdown properly and EC"
   1141                                          "fails to enter into G3 state.")
   1142                 logging.info('System entered into G3 state..')
   1143             logging.info(
   1144                 'DUT is surely shutdown. We are going to power it on again...')
   1145 
   1146         if pre_power_action:
   1147             self._call_action(pre_power_action)
   1148         if run_power_action:
   1149             self.servo.power_key(self.faft_config.hold_pwr_button_poweron)
   1150         if post_power_action:
   1151             self._call_action(post_power_action)
   1152 
   1153     def get_bootid(self, retry=3):
   1154         """
   1155         Return the bootid.
   1156         """
   1157         boot_id = None
   1158         while retry:
   1159             try:
   1160                 boot_id = self._client.get_boot_id()
   1161                 break
   1162             except error.AutoservRunError:
   1163                 retry -= 1
   1164                 if retry:
   1165                     logging.info('Retry to get boot_id...')
   1166                 else:
   1167                     logging.warning('Failed to get boot_id.')
   1168         logging.info('boot_id: %s', boot_id)
   1169         return boot_id
   1170 
   1171     def check_state(self, func):
   1172         """
   1173         Wrapper around _call_action with check_status set to True. This is a
   1174         helper function to be used by tests and is currently implemented by
   1175         calling _call_action with check_status=True.
   1176 
   1177         TODO: This function's arguments need to be made more stringent. And
   1178         its functionality should be moved over to check functions directly in
   1179         the future.
   1180 
   1181         @param func: A function, or a tuple (function, args, error_msg),
   1182                              in which, args and error_msg are optional. args is
   1183                              either a value or a tuple if multiple arguments.
   1184                              This can also be a list containing multiple
   1185                              function or tuple. In this case, these actions are
   1186                              called in sequence.
   1187         @return: The result value of the action function.
   1188         @raise TestFail: If the function does notsucceed.
   1189         """
   1190         logging.info("-[FAFT]-[ start stepstate_checker ]----------")
   1191         self._call_action(func, check_status=True)
   1192         logging.info("-[FAFT]-[ end state_checker ]----------------")
   1193 
   1194     def get_current_firmware_sha(self):
   1195         """Get current firmware sha of body and vblock.
   1196 
   1197         @return: Current firmware sha follows the order (
   1198                  vblock_a_sha, body_a_sha, vblock_b_sha, body_b_sha)
   1199         """
   1200         current_firmware_sha = (self.faft_client.bios.get_sig_sha('a'),
   1201                                 self.faft_client.bios.get_body_sha('a'),
   1202                                 self.faft_client.bios.get_sig_sha('b'),
   1203                                 self.faft_client.bios.get_body_sha('b'))
   1204         if not all(current_firmware_sha):
   1205             raise error.TestError('Failed to get firmware sha.')
   1206         return current_firmware_sha
   1207 
   1208     def is_firmware_changed(self):
   1209         """Check if the current firmware changed, by comparing its SHA.
   1210 
   1211         @return: True if it is changed, otherwise Flase.
   1212         """
   1213         # Device may not be rebooted after test.
   1214         self.faft_client.bios.reload()
   1215 
   1216         current_sha = self.get_current_firmware_sha()
   1217 
   1218         if current_sha == self._backup_firmware_sha:
   1219             return False
   1220         else:
   1221             corrupt_VBOOTA = (current_sha[0] != self._backup_firmware_sha[0])
   1222             corrupt_FVMAIN = (current_sha[1] != self._backup_firmware_sha[1])
   1223             corrupt_VBOOTB = (current_sha[2] != self._backup_firmware_sha[2])
   1224             corrupt_FVMAINB = (current_sha[3] != self._backup_firmware_sha[3])
   1225             logging.info('Firmware changed:')
   1226             logging.info('VBOOTA is changed: %s', corrupt_VBOOTA)
   1227             logging.info('VBOOTB is changed: %s', corrupt_VBOOTB)
   1228             logging.info('FVMAIN is changed: %s', corrupt_FVMAIN)
   1229             logging.info('FVMAINB is changed: %s', corrupt_FVMAINB)
   1230             return True
   1231 
   1232     def backup_firmware(self, suffix='.original'):
   1233         """Backup firmware to file, and then send it to host.
   1234 
   1235         @param suffix: a string appended to backup file name
   1236         """
   1237         remote_temp_dir = self.faft_client.system.create_temp_dir()
   1238         remote_bios_path = os.path.join(remote_temp_dir, 'bios')
   1239         self.faft_client.bios.dump_whole(remote_bios_path)
   1240         self._client.get_file(remote_bios_path,
   1241                               os.path.join(self.resultsdir, 'bios' + suffix))
   1242 
   1243         if self.faft_config.chrome_ec:
   1244             remote_ec_path = os.path.join(remote_temp_dir, 'ec')
   1245             self.faft_client.ec.dump_whole(remote_ec_path)
   1246             self._client.get_file(remote_ec_path,
   1247                               os.path.join(self.resultsdir, 'ec' + suffix))
   1248 
   1249         self._client.run('rm -rf %s' % remote_temp_dir)
   1250         logging.info('Backup firmware stored in %s with suffix %s',
   1251             self.resultsdir, suffix)
   1252 
   1253         self._backup_firmware_sha = self.get_current_firmware_sha()
   1254 
   1255     def is_firmware_saved(self):
   1256         """Check if a firmware saved (called backup_firmware before).
   1257 
   1258         @return: True if the firmware is backuped; otherwise False.
   1259         """
   1260         return self._backup_firmware_sha != ()
   1261 
   1262     def clear_saved_firmware(self):
   1263         """Clear the firmware saved by the method backup_firmware."""
   1264         self._backup_firmware_sha = ()
   1265 
   1266     def restore_firmware(self, suffix='.original', restore_ec=True):
   1267         """Restore firmware from host in resultsdir.
   1268 
   1269         @param suffix: a string appended to backup file name
   1270         @param restore_ec: True to restore the ec firmware; False not to do.
   1271         """
   1272         if not self.is_firmware_changed():
   1273             return
   1274 
   1275         # Backup current corrupted firmware.
   1276         self.backup_firmware(suffix='.corrupt')
   1277 
   1278         # Restore firmware.
   1279         remote_temp_dir = self.faft_client.system.create_temp_dir()
   1280         self._client.send_file(os.path.join(self.resultsdir, 'bios' + suffix),
   1281                                os.path.join(remote_temp_dir, 'bios'))
   1282 
   1283         self.faft_client.bios.write_whole(
   1284             os.path.join(remote_temp_dir, 'bios'))
   1285 
   1286         if self.faft_config.chrome_ec and restore_ec:
   1287             self._client.send_file(os.path.join(self.resultsdir, 'ec' + suffix),
   1288                 os.path.join(remote_temp_dir, 'ec'))
   1289             self.faft_client.ec.write_whole(
   1290                 os.path.join(remote_temp_dir, 'ec'))
   1291 
   1292         self.switcher.mode_aware_reboot()
   1293         logging.info('Successfully restore firmware.')
   1294 
   1295     def setup_firmwareupdate_shellball(self, shellball=None):
   1296         """Setup a shellball to use in firmware update test.
   1297 
   1298         Check if there is a given shellball, and it is a shell script. Then,
   1299         send it to the remote host. Otherwise, use the
   1300         /usr/sbin/chromeos-firmwareupdate in the image and replace its inside
   1301         BIOS and EC images with the active firmware images.
   1302 
   1303         @param shellball: path of a shellball or default to None.
   1304         """
   1305         if shellball:
   1306             # Determine the firmware file is a shellball or a raw binary.
   1307             is_shellball = (utils.system_output("file %s" % shellball).find(
   1308                     "shell script") != -1)
   1309             if is_shellball:
   1310                 logging.info('Device will update firmware with shellball %s',
   1311                              shellball)
   1312                 temp_path = self.faft_client.updater.get_temp_path()
   1313                 working_shellball = os.path.join(temp_path,
   1314                                                  'chromeos-firmwareupdate')
   1315                 self._client.send_file(shellball, working_shellball)
   1316                 self.faft_client.updater.extract_shellball()
   1317             else:
   1318                 raise error.TestFail(
   1319                     'The given shellball is not a shell script.')
   1320         else:
   1321             logging.info('No shellball given, use the original shellball and '
   1322                          'replace its BIOS and EC images.')
   1323             work_path = self.faft_client.updater.get_work_path()
   1324             bios_in_work_path = os.path.join(
   1325                 work_path, self.faft_client.updater.get_bios_relative_path())
   1326             ec_in_work_path = os.path.join(
   1327                 work_path, self.faft_client.updater.get_ec_relative_path())
   1328             logging.info('Writing current BIOS to: %s', bios_in_work_path)
   1329             self.faft_client.bios.dump_whole(bios_in_work_path)
   1330             if self.faft_config.chrome_ec:
   1331                 logging.info('Writing current EC to: %s', ec_in_work_path)
   1332                 self.faft_client.ec.dump_firmware(ec_in_work_path)
   1333             self.faft_client.updater.repack_shellball()
   1334 
   1335     def is_kernel_changed(self):
   1336         """Check if the current kernel is changed, by comparing its SHA1 hash.
   1337 
   1338         @return: True if it is changed; otherwise, False.
   1339         """
   1340         changed = False
   1341         for p in ('A', 'B'):
   1342             backup_sha = self._backup_kernel_sha.get(p, None)
   1343             current_sha = self.faft_client.kernel.get_sha(p)
   1344             if backup_sha != current_sha:
   1345                 changed = True
   1346                 logging.info('Kernel %s is changed', p)
   1347         return changed
   1348 
   1349     def backup_kernel(self, suffix='.original'):
   1350         """Backup kernel to files, and the send them to host.
   1351 
   1352         @param suffix: a string appended to backup file name.
   1353         """
   1354         remote_temp_dir = self.faft_client.system.create_temp_dir()
   1355         for p in ('A', 'B'):
   1356             remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p)
   1357             self.faft_client.kernel.dump(p, remote_path)
   1358             self._client.get_file(
   1359                     remote_path,
   1360                     os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix)))
   1361             self._backup_kernel_sha[p] = self.faft_client.kernel.get_sha(p)
   1362         logging.info('Backup kernel stored in %s with suffix %s',
   1363             self.resultsdir, suffix)
   1364 
   1365     def is_kernel_saved(self):
   1366         """Check if kernel images are saved (backup_kernel called before).
   1367 
   1368         @return: True if the kernel is saved; otherwise, False.
   1369         """
   1370         return len(self._backup_kernel_sha) != 0
   1371 
   1372     def clear_saved_kernel(self):
   1373         """Clear the kernel saved by backup_kernel()."""
   1374         self._backup_kernel_sha = dict()
   1375 
   1376     def restore_kernel(self, suffix='.original'):
   1377         """Restore kernel from host in resultsdir.
   1378 
   1379         @param suffix: a string appended to backup file name.
   1380         """
   1381         if not self.is_kernel_changed():
   1382             return
   1383 
   1384         # Backup current corrupted kernel.
   1385         self.backup_kernel(suffix='.corrupt')
   1386 
   1387         # Restore kernel.
   1388         remote_temp_dir = self.faft_client.system.create_temp_dir()
   1389         for p in ('A', 'B'):
   1390             remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p)
   1391             self._client.send_file(
   1392                     os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix)),
   1393                     remote_path)
   1394             self.faft_client.kernel.write(p, remote_path)
   1395 
   1396         self.switcher.mode_aware_reboot()
   1397         logging.info('Successfully restored kernel.')
   1398 
   1399     def backup_cgpt_attributes(self):
   1400         """Backup CGPT partition table attributes."""
   1401         self._backup_cgpt_attr = self.faft_client.cgpt.get_attributes()
   1402 
   1403     def restore_cgpt_attributes(self):
   1404         """Restore CGPT partition table attributes."""
   1405         current_table = self.faft_client.cgpt.get_attributes()
   1406         if current_table == self._backup_cgpt_attr:
   1407             return
   1408         logging.info('CGPT table is changed. Original: %r. Current: %r.',
   1409                      self._backup_cgpt_attr,
   1410                      current_table)
   1411         self.faft_client.cgpt.set_attributes(self._backup_cgpt_attr)
   1412 
   1413         self.switcher.mode_aware_reboot()
   1414         logging.info('Successfully restored CGPT table.')
   1415 
   1416     def try_fwb(self, count=0):
   1417         """set to try booting FWB count # times
   1418 
   1419         Wrapper to set fwb_tries for vboot1 and fw_try_count,fw_try_next for
   1420         vboot2
   1421 
   1422         @param count: an integer specifying value to program into
   1423                       fwb_tries(vb1)/fw_try_next(vb2)
   1424         """
   1425         if self.fw_vboot2:
   1426             self.faft_client.system.set_fw_try_next('B', count)
   1427         else:
   1428             # vboot1: we need to boot into fwb at least once
   1429             if not count:
   1430                 count = count + 1
   1431             self.faft_client.system.set_try_fw_b(count)
   1432 
   1433