Home | History | Annotate | Download | only in faft
      1 # Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import ast
      6 import ctypes
      7 import logging
      8 import os
      9 import pprint
     10 import re
     11 import time
     12 import uuid
     13 
     14 from threading import Timer
     15 from autotest_lib.client.bin import utils
     16 from autotest_lib.client.common_lib import error
     17 from autotest_lib.server import test
     18 from autotest_lib.server.cros import vboot_constants as vboot
     19 from autotest_lib.server.cros.faft.config.config import Config as FAFTConfig
     20 from autotest_lib.server.cros.faft.rpc_proxy import RPCProxy
     21 from autotest_lib.server.cros.faft.utils import mode_switcher
     22 from autotest_lib.server.cros.faft.utils.faft_checkers import FAFTCheckers
     23 from autotest_lib.server.cros.servo import chrome_ec
     24 from autotest_lib.server.cros.servo import chrome_usbpd
     25 
     26 ConnectionError = mode_switcher.ConnectionError
     27 
     28 
     29 class FAFTBase(test.test):
     30     """The base class of FAFT classes.
     31 
     32     It launches the FAFTClient on DUT, such that the test can access its
     33     firmware functions and interfaces. It also provides some methods to
     34     handle the reboot mechanism, in order to ensure FAFTClient is still
     35     connected after reboot.
     36     """
     37     def initialize(self, host):
     38         """Create a FAFTClient object and install the dependency."""
     39         self.servo = host.servo
     40         self.servo.initialize_dut()
     41         self._client = host
     42         self.faft_client = RPCProxy(host)
     43         self.lockfile = '/var/tmp/faft/lock'
     44 
     45 
     46 class FirmwareTest(FAFTBase):
     47     """
     48     Base class that sets up helper objects/functions for firmware tests.
     49 
     50     TODO: add documentaion as the FAFT rework progresses.
     51     """
     52     version = 1
     53 
     54     # Mapping of partition number of kernel and rootfs.
     55     KERNEL_MAP = {'a':'2', 'b':'4', '2':'2', '4':'4', '3':'2', '5':'4'}
     56     ROOTFS_MAP = {'a':'3', 'b':'5', '2':'3', '4':'5', '3':'3', '5':'5'}
     57     OTHER_KERNEL_MAP = {'a':'4', 'b':'2', '2':'4', '4':'2', '3':'4', '5':'2'}
     58     OTHER_ROOTFS_MAP = {'a':'5', 'b':'3', '2':'5', '4':'3', '3':'5', '5':'3'}
     59 
     60     CHROMEOS_MAGIC = "CHROMEOS"
     61     CORRUPTED_MAGIC = "CORRUPTD"
     62 
     63     # Delay for waiting client to return before EC suspend
     64     EC_SUSPEND_DELAY = 5
     65 
     66     # Delay between EC suspend and wake
     67     WAKE_DELAY = 10
     68 
     69     # Delay between closing and opening lid
     70     LID_DELAY = 1
     71 
     72     _SERVOD_LOG = '/var/log/servod.log'
     73 
     74     _ROOTFS_PARTITION_NUMBER = 3
     75 
     76     _backup_firmware_sha = ()
     77     _backup_kernel_sha = dict()
     78     _backup_cgpt_attr = dict()
     79     _backup_gbb_flags = None
     80     _backup_dev_mode = None
     81 
     82     # Class level variable, keep track the states of one time setup.
     83     # This variable is preserved across tests which inherit this class.
     84     _global_setup_done = {
     85         'gbb_flags': False,
     86         'reimage': False,
     87         'usb_check': False,
     88     }
     89 
     90     @classmethod
     91     def check_setup_done(cls, label):
     92         """Check if the given setup is done.
     93 
     94         @param label: The label of the setup.
     95         """
     96         return cls._global_setup_done[label]
     97 
     98     @classmethod
     99     def mark_setup_done(cls, label):
    100         """Mark the given setup done.
    101 
    102         @param label: The label of the setup.
    103         """
    104         cls._global_setup_done[label] = True
    105 
    106     @classmethod
    107     def unmark_setup_done(cls, label):
    108         """Mark the given setup not done.
    109 
    110         @param label: The label of the setup.
    111         """
    112         cls._global_setup_done[label] = False
    113 
    def initialize(self, host, cmdline_args, ec_wp=None):
        """Autotest initialize function; prepare helpers and DUT state.

        Parses the command line arguments, creates the helper objects
        (config, checkers, mode switcher, EC/USBPD consoles), starts the
        UART/servo log capture, and then sets up GBB flags, update-engine,
        the FAFT lockfile and the EC write protection.

        @param host: The host object of the DUT.
        @param cmdline_args: A list of 'name=value' strings; items not in
                             that form are ignored. Only 'power_control' is
                             consumed here.
        @param ec_wp: The EC write-protection state requested by the test;
                      it is passed to _setup_ec_write_protect().
        @raise TestError: if the power_control argument is not one of
                          host.POWER_CONTROL_VALID_ARGS.
        """
        super(FirmwareTest, self).initialize(host)
        # run_id only tags the begin/done log lines of this test run.
        self.run_id = str(uuid.uuid4())
        logging.info('FirmwareTest initialize begin (id=%s)', self.run_id)
        # Parse arguments from command line
        args = {}
        self.power_control = host.POWER_CONTROL_RPM
        for arg in cmdline_args:
            # Raw string: '\w' in a plain string is an invalid escape.
            match = re.search(r'^(\w+)=(.+)', arg)
            if match:
                args[match.group(1)] = match.group(2)
        if 'power_control' in args:
            self.power_control = args['power_control']
            if self.power_control not in host.POWER_CONTROL_VALID_ARGS:
                raise error.TestError('Valid values for --args=power_control '
                                      'are %s. But you entered wrong argument '
                                      'as "%s".'
                                       % (host.POWER_CONTROL_VALID_ARGS,
                                       self.power_control))

        self.faft_config = FAFTConfig(
                self.faft_client.system.get_platform_name())
        self.checkers = FAFTCheckers(self)
        self.switcher = mode_switcher.create_mode_switcher(self)

        # NOTE: self.ec and self.usbpd are only defined when the platform
        # config enables them; they stay undefined otherwise.
        if self.faft_config.chrome_ec:
            self.ec = chrome_ec.ChromeEC(self.servo)
        # Check for presence of a USBPD console
        if self.faft_config.chrome_usbpd:
            self.usbpd = chrome_usbpd.ChromeUSBPD(self.servo)
        elif self.faft_config.chrome_ec:
            # If no separate USBPD console, then PD exists on EC console
            self.usbpd = self.ec
        # Get plankton console
        self.plankton = host.plankton
        self.plankton_host = host._plankton_host

        self._setup_uart_capture()
        self._setup_servo_log()
        self._record_system_info()
        self.fw_vboot2 = self.faft_client.system.get_fw_vboot2()
        logging.info('vboot version: %d', 2 if self.fw_vboot2 else 1)
        if self.fw_vboot2:
            # On vboot2, make firmware A the one to try and reboot into it
            # if B is currently active.
            self.faft_client.system.set_fw_try_next('A')
            if self.faft_client.system.get_crossystem_value('mainfw_act') == 'B':
                logging.info('mainfw_act is B. rebooting to set it A')
                self.switcher.mode_aware_reboot()
        self._setup_gbb_flags()
        self._stop_service('update-engine')
        self._create_faft_lockfile()
        self._setup_ec_write_protect(ec_wp)
        # See chromium:239034 regarding needing this sync.
        self.blocking_sync()
        logging.info('FirmwareTest initialize done (id=%s)', self.run_id)
    168 
    def cleanup(self):
        """Autotest cleanup function.

        Checks that the DUT still responds, then restores the boot mode, EC
        write protection and GBB flags, restarts update-engine, removes the
        FAFT lockfile and records the servo, FAFT client and UART logs.
        """
        logging.info('FirmwareTest cleaning up (id=%s)', self.run_id)
        try:
            self.faft_client.system.is_available()
        except Exception:
            # Remote is not responding. Revive DUT so that subsequent tests
            # don't fail. Catching Exception (not a bare except) lets
            # KeyboardInterrupt/SystemExit propagate untouched.
            # NOTE: _restore_routine_from_timeout() ends by raising TestFail,
            # so the restore steps below are not reached in this case.
            self._restore_routine_from_timeout()
        self.switcher.restore_mode()
        self._restore_ec_write_protect()
        self._restore_gbb_flags()
        self._start_service('update-engine')
        self._remove_faft_lockfile()
        self._record_servo_log()
        self._record_faft_client_log()
        self._cleanup_uart_capture()
        super(FirmwareTest, self).cleanup()
        logging.info('FirmwareTest cleanup done (id=%s)', self.run_id)
    189 
    190     def _record_system_info(self):
    191         """Record some critical system info to the attr keyval.
    192 
    193         This info is used by generate_test_report later.
    194         """
    195         system_info = {
    196             'hwid': self.faft_client.system.get_crossystem_value('hwid'),
    197             'ec_version': self.faft_client.ec.get_version(),
    198             'ro_fwid': self.faft_client.system.get_crossystem_value('ro_fwid'),
    199             'rw_fwid': self.faft_client.system.get_crossystem_value('fwid'),
    200             'servod_version': self._client._servo_host.run(
    201                 'servod --version').stdout.strip(),
    202         }
    203         logging.info('System info:\n' + pprint.pformat(system_info))
    204         self.write_attr_keyval(system_info)
    205 
    206     def invalidate_firmware_setup(self):
    207         """Invalidate all firmware related setup state.
    208 
    209         This method is called when the firmware is re-flashed. It resets all
    210         firmware related setup states so that the next test setup properly
    211         again.
    212         """
    213         self.unmark_setup_done('gbb_flags')
    214 
    215     def _retrieve_recovery_reason_from_trap(self):
    216         """Try to retrieve the recovery reason from a trapped recovery screen.
    217 
    218         @return: The recovery_reason, 0 if any error.
    219         """
    220         recovery_reason = 0
    221         logging.info('Try to retrieve recovery reason...')
    222         if self.servo.get_usbkey_direction() == 'dut':
    223             self.switcher.bypass_rec_mode()
    224         else:
    225             self.servo.switch_usbkey('dut')
    226 
    227         try:
    228             self.switcher.wait_for_client()
    229             lines = self.faft_client.system.run_shell_command_get_output(
    230                         'crossystem recovery_reason')
    231             recovery_reason = int(lines[0])
    232             logging.info('Got the recovery reason %d.', recovery_reason)
    233         except ConnectionError:
    234             logging.error('Failed to get the recovery reason due to connection '
    235                           'error.')
    236         return recovery_reason
    237 
    238     def _reset_client(self):
    239         """Reset client to a workable state.
    240 
    241         This method is called when the client is not responsive. It may be
    242         caused by the following cases:
    243           - halt on a firmware screen without timeout, e.g. REC_INSERT screen;
    244           - corrupted firmware;
    245           - corrutped OS image.
    246         """
    247         # DUT may halt on a firmware screen. Try cold reboot.
    248         logging.info('Try cold reboot...')
    249         self.switcher.mode_aware_reboot(reboot_type='cold',
    250                                         sync_before_boot=False,
    251                                         wait_for_dut_up=False)
    252         self.switcher.wait_for_client_offline()
    253         self.switcher.bypass_dev_mode()
    254         try:
    255             self.switcher.wait_for_client()
    256             return
    257         except ConnectionError:
    258             logging.warn('Cold reboot doesn\'t help, still connection error.')
    259 
    260         # DUT may be broken by a corrupted firmware. Restore firmware.
    261         # We assume the recovery boot still works fine. Since the recovery
    262         # code is in RO region and all FAFT tests don't change the RO region
    263         # except GBB.
    264         if self.is_firmware_saved():
    265             self._ensure_client_in_recovery()
    266             logging.info('Try restore the original firmware...')
    267             if self.is_firmware_changed():
    268                 try:
    269                     self.restore_firmware()
    270                     return
    271                 except ConnectionError:
    272                     logging.warn('Restoring firmware doesn\'t help, still '
    273                                  'connection error.')
    274 
    275         # Perhaps it's kernel that's broken. Let's try restoring it.
    276         if self.is_kernel_saved():
    277             self._ensure_client_in_recovery()
    278             logging.info('Try restore the original kernel...')
    279             if self.is_kernel_changed():
    280                 try:
    281                     self.restore_kernel()
    282                     return
    283                 except ConnectionError:
    284                     logging.warn('Restoring kernel doesn\'t help, still '
    285                                  'connection error.')
    286 
    287         # DUT may be broken by a corrupted OS image. Restore OS image.
    288         self._ensure_client_in_recovery()
    289         logging.info('Try restore the OS image...')
    290         self.faft_client.system.run_shell_command('chromeos-install --yes')
    291         self.switcher.mode_aware_reboot(wait_for_dut_up=False)
    292         self.switcher.wait_for_client_offline()
    293         self.switcher.bypass_dev_mode()
    294         try:
    295             self.switcher.wait_for_client()
    296             logging.info('Successfully restore OS image.')
    297             return
    298         except ConnectionError:
    299             logging.warn('Restoring OS image doesn\'t help, still connection '
    300                          'error.')
    301 
    302     def _ensure_client_in_recovery(self):
    303         """Ensure client in recovery boot; reboot into it if necessary.
    304 
    305         @raise TestError: if failed to boot the USB image.
    306         """
    307         logging.info('Try boot into USB image...')
    308         self.switcher.reboot_to_mode(to_mode='rec', sync_before_boot=False,
    309                                      wait_for_dut_up=False)
    310         self.servo.switch_usbkey('host')
    311         self.switcher.bypass_rec_mode()
    312         try:
    313             self.switcher.wait_for_client()
    314         except ConnectionError:
    315             raise error.TestError('Failed to boot the USB image.')
    316 
    def _restore_routine_from_timeout(self):
        """Try to bring the system back after a connection timeout.

        This method is called when FAFT failed to connect DUT after reboot.
        It saves the UART output, reads the recovery reason, resets the
        client and finally always raises TestFail.

        @raise TestFail: Always raised, with a description of why it failed.
        """
        # DUT is disconnected. Capture the UART output for debug.
        self._record_uart_capture()

        # TODO(waihong@chromium.org): Implement replugging the Ethernet to
        # identify if it is a network flaky.

        # Read the reason before resetting the client.
        trapped_reason = self._retrieve_recovery_reason_from_trap()

        # Reset client to a workable state.
        self._reset_client()

        # Raise the proper TestFail exception.
        if not trapped_reason:
            raise error.TestFail('Timed out waiting for DUT reboot')
        raise error.TestFail('Trapped in the recovery screen (reason: %d) '
                             'and timed out' % trapped_reason)
    342 
    343     def assert_test_image_in_usb_disk(self, usb_dev=None):
    344         """Assert an USB disk plugged-in on servo and a test image inside.
    345 
    346         @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
    347                         If None, it is detected automatically.
    348         @raise TestError: if USB disk not detected or not a test image.
    349         """
    350         if self.check_setup_done('usb_check'):
    351             return
    352         if usb_dev:
    353             assert self.servo.get_usbkey_direction() == 'host'
    354         else:
    355             self.servo.switch_usbkey('host')
    356             usb_dev = self.servo.probe_host_usb_dev()
    357             if not usb_dev:
    358                 raise error.TestError(
    359                         'An USB disk should be plugged in the servo board.')
    360 
    361         rootfs = '%s%s' % (usb_dev, self._ROOTFS_PARTITION_NUMBER)
    362         logging.info('usb dev is %s', usb_dev)
    363         tmpd = self.servo.system_output('mktemp -d -t usbcheck.XXXX')
    364         self.servo.system('mount -o ro %s %s' % (rootfs, tmpd))
    365 
    366         try:
    367             usb_lsb = self.servo.system_output('cat %s' %
    368                 os.path.join(tmpd, 'etc/lsb-release'))
    369             logging.debug('Dumping lsb-release on USB stick:\n%s', usb_lsb)
    370             dut_lsb = '\n'.join(self.faft_client.system.
    371                 run_shell_command_get_output('cat /etc/lsb-release'))
    372             logging.debug('Dumping lsb-release on DUT:\n%s', dut_lsb)
    373             if not re.search(r'RELEASE_TRACK=.*test', usb_lsb):
    374                 raise error.TestError('USB stick in servo is no test image')
    375             usb_board = re.search(r'BOARD=(.*)', usb_lsb).group(1)
    376             dut_board = re.search(r'BOARD=(.*)', dut_lsb).group(1)
    377             if usb_board != dut_board:
    378                 raise error.TestError('USB stick in servo contains a %s '
    379                     'image, but DUT is a %s' % (usb_board, dut_board))
    380         finally:
    381             for cmd in ('umount %s' % rootfs, 'sync', 'rm -rf %s' % tmpd):
    382                 self.servo.system(cmd)
    383 
    384         self.mark_setup_done('usb_check')
    385 
    386     def setup_usbkey(self, usbkey, host=None):
    387         """Setup the USB disk for the test.
    388 
    389         It checks the setup of USB disk and a valid ChromeOS test image inside.
    390         It also muxes the USB disk to either the host or DUT by request.
    391 
    392         @param usbkey: True if the USB disk is required for the test, False if
    393                        not required.
    394         @param host: Optional, True to mux the USB disk to host, False to mux it
    395                     to DUT, default to do nothing.
    396         """
    397         if usbkey:
    398             self.assert_test_image_in_usb_disk()
    399         elif host is None:
    400             # USB disk is not required for the test. Better to mux it to host.
    401             host = True
    402 
    403         if host is True:
    404             self.servo.switch_usbkey('host')
    405         elif host is False:
    406             self.servo.switch_usbkey('dut')
    407 
    408     def get_usbdisk_path_on_dut(self):
    409         """Get the path of the USB disk device plugged-in the servo on DUT.
    410 
    411         Returns:
    412           A string representing USB disk path, like '/dev/sdb', or None if
    413           no USB disk is found.
    414         """
    415         cmd = 'ls -d /dev/s*[a-z]'
    416         original_value = self.servo.get_usbkey_direction()
    417 
    418         # Make the dut unable to see the USB disk.
    419         self.servo.switch_usbkey('off')
    420         no_usb_set = set(
    421             self.faft_client.system.run_shell_command_get_output(cmd))
    422 
    423         # Make the dut able to see the USB disk.
    424         self.servo.switch_usbkey('dut')
    425         time.sleep(self.faft_config.usb_plug)
    426         has_usb_set = set(
    427             self.faft_client.system.run_shell_command_get_output(cmd))
    428 
    429         # Back to its original value.
    430         if original_value != self.servo.get_usbkey_direction():
    431             self.servo.switch_usbkey(original_value)
    432 
    433         diff_set = has_usb_set - no_usb_set
    434         if len(diff_set) == 1:
    435             return diff_set.pop()
    436         else:
    437             return None
    438 
    439     def _create_faft_lockfile(self):
    440         """Creates the FAFT lockfile."""
    441         logging.info('Creating FAFT lockfile...')
    442         command = 'touch %s' % (self.lockfile)
    443         self.faft_client.system.run_shell_command(command)
    444 
    445     def _remove_faft_lockfile(self):
    446         """Removes the FAFT lockfile."""
    447         logging.info('Removing FAFT lockfile...')
    448         command = 'rm -f %s' % (self.lockfile)
    449         self.faft_client.system.run_shell_command(command)
    450 
    451     def _stop_service(self, service):
    452         """Stops a upstart service on the client.
    453 
    454         @param service: The name of the upstart service.
    455         """
    456         logging.info('Stopping %s...', service)
    457         command = 'status %s | grep stop || stop %s' % (service, service)
    458         self.faft_client.system.run_shell_command(command)
    459 
    460     def _start_service(self, service):
    461         """Starts a upstart service on the client.
    462 
    463         @param service: The name of the upstart service.
    464         """
    465         logging.info('Starting %s...', service)
    466         command = 'status %s | grep start || start %s' % (service, service)
    467         self.faft_client.system.run_shell_command(command)
    468 
    469     def clear_set_gbb_flags(self, clear_mask, set_mask):
    470         """Clear and set the GBB flags in the current flashrom.
    471 
    472         @param clear_mask: A mask of flags to be cleared.
    473         @param set_mask: A mask of flags to be set.
    474         """
    475         gbb_flags = self.faft_client.bios.get_gbb_flags()
    476         new_flags = gbb_flags & ctypes.c_uint32(~clear_mask).value | set_mask
    477         if new_flags != gbb_flags:
    478             self._backup_gbb_flags = gbb_flags
    479             logging.info('Changing GBB flags from 0x%x to 0x%x.',
    480                          gbb_flags, new_flags)
    481             self.faft_client.bios.set_gbb_flags(new_flags)
    482             # If changing FORCE_DEV_SWITCH_ON flag, reboot to get a clear state
    483             if ((gbb_flags ^ new_flags) & vboot.GBB_FLAG_FORCE_DEV_SWITCH_ON):
    484                 self.switcher.mode_aware_reboot()
    485         else:
    486             logging.info('Current GBB flags look good for test: 0x%x.',
    487                          gbb_flags)
    488 
    489     def check_ec_capability(self, required_cap=None, suppress_warning=False):
    490         """Check if current platform has required EC capabilities.
    491 
    492         @param required_cap: A list containing required EC capabilities. Pass in
    493                              None to only check for presence of Chrome EC.
    494         @param suppress_warning: True to suppress any warning messages.
    495         @return: True if requirements are met. Otherwise, False.
    496         """
    497         if not self.faft_config.chrome_ec:
    498             if not suppress_warning:
    499                 logging.warn('Requires Chrome EC to run this test.')
    500             return False
    501 
    502         if not required_cap:
    503             return True
    504 
    505         for cap in required_cap:
    506             if cap not in self.faft_config.ec_capability:
    507                 if not suppress_warning:
    508                     logging.warn('Requires EC capability "%s" to run this '
    509                                  'test.', cap)
    510                 return False
    511 
    512         return True
    513 
    514     def check_root_part_on_non_recovery(self, part):
    515         """Check the partition number of root device and on normal/dev boot.
    516 
    517         @param part: A string of partition number, e.g.'3'.
    518         @return: True if the root device matched and on normal/dev boot;
    519                  otherwise, False.
    520         """
    521         return self.checkers.root_part_checker(part) and \
    522                 self.checkers.crossystem_checker({
    523                     'mainfw_type': ('normal', 'developer'),
    524                 })
    525 
    526     def _join_part(self, dev, part):
    527         """Return a concatenated string of device and partition number.
    528 
    529         @param dev: A string of device, e.g.'/dev/sda'.
    530         @param part: A string of partition number, e.g.'3'.
    531         @return: A concatenated string of device and partition number,
    532                  e.g.'/dev/sda3'.
    533 
    534         >>> seq = FirmwareTest()
    535         >>> seq._join_part('/dev/sda', '3')
    536         '/dev/sda3'
    537         >>> seq._join_part('/dev/mmcblk0', '2')
    538         '/dev/mmcblk0p2'
    539         """
    540         if 'mmcblk' in dev:
    541             return dev + 'p' + part
    542         else:
    543             return dev + part
    544 
    545     def copy_kernel_and_rootfs(self, from_part, to_part):
    546         """Copy kernel and rootfs from from_part to to_part.
    547 
    548         @param from_part: A string of partition number to be copied from.
    549         @param to_part: A string of partition number to be copied to.
    550         """
    551         root_dev = self.faft_client.system.get_root_dev()
    552         logging.info('Copying kernel from %s to %s. Please wait...',
    553                      from_part, to_part)
    554         self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' %
    555                 (self._join_part(root_dev, self.KERNEL_MAP[from_part]),
    556                  self._join_part(root_dev, self.KERNEL_MAP[to_part])))
    557         logging.info('Copying rootfs from %s to %s. Please wait...',
    558                      from_part, to_part)
    559         self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' %
    560                 (self._join_part(root_dev, self.ROOTFS_MAP[from_part]),
    561                  self._join_part(root_dev, self.ROOTFS_MAP[to_part])))
    562 
    563     def ensure_kernel_boot(self, part):
    564         """Ensure the request kernel boot.
    565 
    566         If not, it duplicates the current kernel to the requested kernel
    567         and sets the requested higher priority to ensure it boot.
    568 
    569         @param part: A string of kernel partition number or 'a'/'b'.
    570         """
    571         if not self.checkers.root_part_checker(part):
    572             if self.faft_client.kernel.diff_a_b():
    573                 self.copy_kernel_and_rootfs(
    574                         from_part=self.OTHER_KERNEL_MAP[part],
    575                         to_part=part)
    576             self.reset_and_prioritize_kernel(part)
    577             self.switcher.mode_aware_reboot()
    578 
    579     def set_hardware_write_protect(self, enable):
    580         """Set hardware write protect pin.
    581 
    582         @param enable: True if asserting write protect pin. Otherwise, False.
    583         """
    584         self.servo.set('fw_wp_vref', self.faft_config.wp_voltage)
    585         self.servo.set('fw_wp_en', 'on')
    586         self.servo.set('fw_wp', 'on' if enable else 'off')
    587 
    588     def set_ec_write_protect_and_reboot(self, enable):
    589         """Set EC write protect status and reboot to take effect.
    590 
    591         The write protect state is only activated if both hardware write
    592         protect pin is asserted and software write protect flag is set.
    593         This method asserts/deasserts hardware write protect pin first, and
    594         set corresponding EC software write protect flag.
    595 
    596         If the device uses non-Chrome EC, set the software write protect via
    597         flashrom.
    598 
    599         If the device uses Chrome EC, a reboot is required for write protect
    600         to take effect. Since the software write protect flag cannot be unset
    601         if hardware write protect pin is asserted, we need to deasserted the
    602         pin first if we are deactivating write protect. Similarly, a reboot
    603         is required before we can modify the software flag.
    604 
    605         @param enable: True if activating EC write protect. Otherwise, False.
    606         """
    607         self.set_hardware_write_protect(enable)
    608         if self.faft_config.chrome_ec:
    609             self.set_chrome_ec_write_protect_and_reboot(enable)
    610         else:
    611             self.faft_client.ec.set_write_protect(enable)
    612             self.switcher.mode_aware_reboot()
    613 
    614     def set_chrome_ec_write_protect_and_reboot(self, enable):
    615         """Set Chrome EC write protect status and reboot to take effect.
    616 
    617         @param enable: True if activating EC write protect. Otherwise, False.
    618         """
    619         if enable:
    620             # Set write protect flag and reboot to take effect.
    621             self.ec.set_flash_write_protect(enable)
    622             self.sync_and_ec_reboot()
    623         else:
    624             # Reboot after deasserting hardware write protect pin to deactivate
    625             # write protect. And then remove software write protect flag.
    626             self.sync_and_ec_reboot()
    627             self.ec.set_flash_write_protect(enable)
    628 
    629     def _setup_ec_write_protect(self, ec_wp):
    630         """Setup for EC write-protection.
    631 
    632         It makes sure the EC in the requested write-protection state. If not, it
    633         flips the state. Flipping the write-protection requires DUT reboot.
    634 
    635         @param ec_wp: True to request EC write-protected; False to request EC
    636                       not write-protected; None to do nothing.
    637         """
    638         if ec_wp is None:
    639             self._old_ec_wp = None
    640             return
    641         self._old_ec_wp = self.checkers.crossystem_checker({'wpsw_boot': '1'})
    642         if ec_wp != self._old_ec_wp:
    643             logging.info('The test required EC is %swrite-protected. Reboot '
    644                          'and flip the state.', '' if ec_wp else 'not ')
    645             self.switcher.mode_aware_reboot(
    646                     'custom',
    647                      lambda:self.set_ec_write_protect_and_reboot(ec_wp))
    648 
    649     def _restore_ec_write_protect(self):
    650         """Restore the original EC write-protection."""
    651         if (not hasattr(self, '_old_ec_wp')) or (self._old_ec_wp is None):
    652             return
    653         if not self.checkers.crossystem_checker(
    654                 {'wpsw_boot': '1' if self._old_ec_wp else '0'}):
    655             logging.info('Restore original EC write protection and reboot.')
    656             self.switcher.mode_aware_reboot(
    657                     'custom',
    658                     lambda:self.set_ec_write_protect_and_reboot(
    659                             self._old_ec_wp))
    660 
    661     def _setup_uart_capture(self):
    662         """Setup the CPU/EC/PD UART capture."""
    663         self.cpu_uart_file = os.path.join(self.resultsdir, 'cpu_uart.txt')
    664         self.servo.set('cpu_uart_capture', 'on')
    665         self.ec_uart_file = None
    666         self.usbpd_uart_file = None
    667         if self.faft_config.chrome_ec:
    668             try:
    669                 self.servo.set('ec_uart_capture', 'on')
    670                 self.ec_uart_file = os.path.join(self.resultsdir, 'ec_uart.txt')
    671             except error.TestFail as e:
    672                 if 'No control named' in str(e):
    673                     logging.warn('The servod is too old that ec_uart_capture '
    674                                  'not supported.')
    675             # Log separate PD console if supported
    676             if self.check_ec_capability(['usbpd_uart'], suppress_warning=True):
    677                 try:
    678                     self.servo.set('usbpd_uart_capture', 'on')
    679                     self.usbpd_uart_file = os.path.join(self.resultsdir,
    680                                                         'usbpd_uart.txt')
    681                 except error.TestFail as e:
    682                     if 'No control named' in str(e):
    683                         logging.warn('The servod is too old that '
    684                                      'usbpd_uart_capture is not supported.')
    685         else:
    686             logging.info('Not a Google EC, cannot capture ec console output.')
    687 
    688     def _record_uart_capture(self):
    689         """Record the CPU/EC/PD UART output stream to files."""
    690         if self.cpu_uart_file:
    691             with open(self.cpu_uart_file, 'a') as f:
    692                 f.write(ast.literal_eval(self.servo.get('cpu_uart_stream')))
    693         if self.ec_uart_file and self.faft_config.chrome_ec:
    694             with open(self.ec_uart_file, 'a') as f:
    695                 f.write(ast.literal_eval(self.servo.get('ec_uart_stream')))
    696         if (self.usbpd_uart_file and self.faft_config.chrome_ec and
    697             self.check_ec_capability(['usbpd_uart'], suppress_warning=True)):
    698             with open(self.usbpd_uart_file, 'a') as f:
    699                 f.write(ast.literal_eval(self.servo.get('usbpd_uart_stream')))
    700 
    701     def _cleanup_uart_capture(self):
    702         """Cleanup the CPU/EC/PD UART capture."""
    703         # Flush the remaining UART output.
    704         self._record_uart_capture()
    705         self.servo.set('cpu_uart_capture', 'off')
    706         if self.ec_uart_file and self.faft_config.chrome_ec:
    707             self.servo.set('ec_uart_capture', 'off')
    708         if (self.usbpd_uart_file and self.faft_config.chrome_ec and
    709             self.check_ec_capability(['usbpd_uart'], suppress_warning=True)):
    710             self.servo.set('usbpd_uart_capture', 'off')
    711 
    712     def _get_power_state(self, power_state):
    713         """
    714         Return the current power state of the AP
    715         """
    716         return self.ec.send_command_get_output("powerinfo", [power_state])
    717 
    def wait_power_state(self, power_state, retries):
        """
        Wait for certain power state.

        @param power_state: power state you are expecting
        @param retries: retries.  This is necessary if AP is powering down
        and transitioning through different states.
        @return: True as soon as one query of the power state succeeds; False
                 when every attempt raised error.TestFail (or retries <= 0).
        """
        logging.info('Checking power state "%s" maximum %d times.',
                     power_state, retries)
        while retries > 0:
            logging.info("try count: %d", retries)
            retries -= 1
            try:
                # The returned value is not needed; not raising means success.
                self._get_power_state(power_state)
            except error.TestFail:
                continue
            return True
        return False
    737 
    def delayed(seconds):
        """Decorator factory: run the decorated function after a delay.

        Calling the decorated function returns None immediately; the function
        itself is started later by a threading.Timer, so its own return value
        is not available to the caller.

        @param seconds: delay in seconds before the function is run.
        @return: the decorator to apply to the function.
        """
        import functools

        def decorator(f):
            # Preserve the name and docstring of the wrapped function.
            @functools.wraps(f)
            def wrapper(*args, **kargs):
                t = Timer(seconds, f, args, kargs)
                t.start()
            return wrapper
        return decorator
    745 
    @delayed(WAKE_DELAY)
    def wake_by_power_button(self):
        """Wake DUT with a normal power button press after WAKE_DELAY seconds.

        The delay comes from the @delayed decorator, so the call itself
        returns right away.
        """
        servo = self.servo
        servo.power_normal_press()
    750 
    @delayed(WAKE_DELAY)
    def wake_by_lid_switch(self):
        """Wake DUT by closing and reopening the lid after WAKE_DELAY seconds.

        The delay comes from the @delayed decorator, so the call itself
        returns right away.
        """
        servo = self.servo
        servo.set('lid_open', 'no')
        # Keep the lid closed for LID_DELAY seconds before reopening it.
        time.sleep(self.LID_DELAY)
        servo.set('lid_open', 'yes')
    757 
    def suspend_as_reboot(self, wake_func):
        """Suspend DUT and disconnect the FAFT client, imitating a reboot.

        The suspend command is scheduled on the DUT EC_SUSPEND_DELAY seconds
        ahead; this method waits the same time before calling wake_func.

        @param wake_func: A function that is called to wake DUT. It must delay
                          itself so that DUT is not woken before this method
                          returns.
        """
        delay = self.EC_SUSPEND_DELAY
        suspend_cmd = '(sleep %d; powerd_dbus_suspend) &' % delay
        client = self.faft_client
        client.system.run_shell_command(suspend_cmd)
        client.disconnect()
        time.sleep(delay)
        logging.info("wake function disabled")
        wake_func()
    773 
    774     def _fetch_servo_log(self):
    775         """Fetch the servo log."""
    776         cmd = '[ -e %s ] && cat %s || echo NOTFOUND' % ((self._SERVOD_LOG,) * 2)
    777         servo_log = self.servo.system_output(cmd)
    778         return None if servo_log == 'NOTFOUND' else servo_log
    779 
    780     def _setup_servo_log(self):
    781         """Setup the servo log capturing."""
    782         self.servo_log_original_len = -1
    783         if self.servo.is_localhost():
    784             # No servo log recorded when servod runs locally.
    785             return
    786 
    787         servo_log = self._fetch_servo_log()
    788         if servo_log:
    789             self.servo_log_original_len = len(servo_log)
    790         else:
    791             logging.warn('Servo log file not found.')
    792 
    793     def _record_servo_log(self):
    794         """Record the servo log to the results directory."""
    795         if self.servo_log_original_len != -1:
    796             servo_log = self._fetch_servo_log()
    797             servo_log_file = os.path.join(self.resultsdir, 'servod.log')
    798             with open(servo_log_file, 'a') as f:
    799                 f.write(servo_log[self.servo_log_original_len:])
    800 
    801     def _record_faft_client_log(self):
    802         """Record the faft client log to the results directory."""
    803         client_log = self.faft_client.system.dump_log(True)
    804         client_log_file = os.path.join(self.resultsdir, 'faft_client.log')
    805         with open(client_log_file, 'w') as f:
    806             f.write(client_log)
    807 
    808     def _setup_gbb_flags(self):
    809         """Setup the GBB flags for FAFT test."""
    810         if self.faft_config.gbb_version < 1.1:
    811             logging.info('Skip modifying GBB on versions older than 1.1.')
    812             return
    813 
    814         if self.check_setup_done('gbb_flags'):
    815             return
    816 
    817         logging.info('Set proper GBB flags for test.')
    818         self.clear_set_gbb_flags(vboot.GBB_FLAG_DEV_SCREEN_SHORT_DELAY |
    819                                  vboot.GBB_FLAG_FORCE_DEV_SWITCH_ON |
    820                                  vboot.GBB_FLAG_FORCE_DEV_BOOT_USB |
    821                                  vboot.GBB_FLAG_DISABLE_FW_ROLLBACK_CHECK |
    822                                  vboot.GBB_FLAG_FORCE_DEV_BOOT_FASTBOOT_FULL_CAP,
    823                                  vboot.GBB_FLAG_ENTER_TRIGGERS_TONORM |
    824                                  vboot.GBB_FLAG_FAFT_KEY_OVERIDE)
    825         self.mark_setup_done('gbb_flags')
    826 
    def drop_backup_gbb_flags(self):
        """Forget the backed-up GBB flags.

        Useful for a test that wants its GBB flag changes to be permanent:
        with the backup set to None, _restore_gbb_flags() does nothing.
        """
        self._backup_gbb_flags = None
    833 
    834     def _restore_gbb_flags(self):
    835         """Restore GBB flags to their original state."""
    836         if self._backup_gbb_flags is None:
    837             return
    838         # Setting up and restoring the GBB flags take a lot of time. For
    839         # speed-up purpose, don't restore it.
    840         logging.info('***')
    841         logging.info('*** Please manually restore the original GBB flags to: '
    842                      '0x%x ***', self._backup_gbb_flags)
    843         logging.info('***')
    844         self.unmark_setup_done('gbb_flags')
    845 
    def setup_tried_fwb(self, tried_fwb):
        """Setup for fw B tried state.

        Checks crossystem for the requested tried_fwb value. When it does not
        match and tried_fwb is requested, the try-fw-B flag is set through the
        FAFT client; when tried_fwb should be cleared, only a message is
        logged. This method itself does not reboot the DUT.

        @param tried_fwb: True if requested in tried_fwb=1;
                          False if tried_fwb=0.
        """
        wanted = '1' if tried_fwb else '0'
        if self.checkers.crossystem_checker({'tried_fwb': wanted}):
            return

        if tried_fwb:
            logging.info(
                'Firmware is not booted with tried_fwb. Reboot into it.')
            self.faft_client.system.set_try_fw_b()
        else:
            logging.info(
                'Firmware is booted with tried_fwb. Reboot to clear.')
    864 
    def power_on(self):
        """Turn the DUT AC power on through the host's power control."""
        control = self.power_control
        self._client.power_on(control)
    868 
    def power_off(self):
        """Turn the DUT AC power off through the host's power control."""
        control = self.power_control
        self._client.power_off(control)
    872 
    def power_cycle(self):
        """Cycle the DUT AC power through the host's power control."""
        control = self.power_control
        self._client.power_cycle(control)
    876 
    def setup_rw_boot(self, section='a'):
        """Make sure firmware is in RW-boot mode.

        When the preamble flags of the given section have the RO-normal flag
        set, the flag is toggled off, the flags are written back and the DUT
        is rebooted. Otherwise nothing is changed.

        @param section: A firmware section, either 'a' or 'b'.
        """
        bios = self.faft_client.bios
        preamble_flags = bios.get_preamble_flags(section)
        if not (preamble_flags & vboot.PREAMBLE_USE_RO_NORMAL):
            return

        preamble_flags ^= vboot.PREAMBLE_USE_RO_NORMAL
        bios.set_preamble_flags(section, preamble_flags)
        self.switcher.mode_aware_reboot()
    890 
    def setup_kernel(self, part):
        """Setup for kernel test.

        Boots the requested kernel, then copies kernel and rootfs from that
        part to the other one when kernels A and B differ or rootfs B fails
        verification, and finally prioritizes the requested part.

        @param part: A string of kernel partition number or 'a'/'b'.
        """
        self.ensure_kernel_boot(part)
        logging.info('Checking the integrity of kernel B and rootfs B...')

        # rootfs B is only verified when the kernels are identical.
        needs_copy = self.faft_client.kernel.diff_a_b()
        if not needs_copy:
            needs_copy = not self.faft_client.rootfs.verify_rootfs('B')

        if needs_copy:
            logging.info('Copying kernel and rootfs from A to B...')
            other_part = self.OTHER_KERNEL_MAP[part]
            self.copy_kernel_and_rootfs(from_part=part, to_part=other_part)
        self.reset_and_prioritize_kernel(part)
    907 
    def reset_and_prioritize_kernel(self, part):
        """Make the requested partition highest priority.

        Before prioritizing, the same cgpt attributes (-P1 -S1 -T0) are
        applied to kernel A and kernel B to reset both to bootable.

        @param part: A string of partition number to be prioritized.
        """
        system = self.faft_client.system
        root_dev = system.get_root_dev()

        # Reset kernel A and B to bootable.
        for kernel in ('a', 'b'):
            system.run_shell_command(
                'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP[kernel],
                                                  root_dev))

        # Set kernel part highest priority.
        prioritize_cmd = 'cgpt prioritize -i%s %s' % (self.KERNEL_MAP[part],
                                                      root_dev)
        system.run_shell_command(prioritize_cmd)
    924 
    def blocking_sync(self):
        """Run sync on the DUT and wait for the root device to settle."""
        system = self.faft_client.system

        # sync is issued twice to fake a blocking call: the first call
        # returns before the flush is complete, the second one waits for
        # the first to finish.
        for _ in range(2):
            system.run_shell_command('sync')

        # sync only sends SYNCHRONIZE_CACHE but doesn't check the status.
        # For mmc devices `mmc status get` sends an empty command to wait
        # for the disk to be available again. For other devices, hdparm
        # sends TUR to check if a device is ready for transfer operation.
        root_dev = system.get_root_dev()
        if 'mmcblk' in root_dev:
            wait_cmd = 'mmc status get %s'
        else:
            wait_cmd = 'hdparm -f %s'
        system.run_shell_command(wait_cmd % root_dev)
    946 
    def sync_and_ec_reboot(self, flags=''):
        """Sync the client, reboot through the EC and power on if needed.

        After the EC reboot the method sleeps for
        faft_config.ec_boot_to_console seconds, then runs
        check_lid_and_power_on().

        @param flags: Optional, a space-separated string of flags passed to EC
                      reboot command, including:
                          default: EC soft reboot;
                          'hard': EC cold/hard reboot.
        """
        self.blocking_sync()
        self.ec.reboot(flags)
        console_wait = self.faft_config.ec_boot_to_console
        time.sleep(console_wait)
        self.check_lid_and_power_on()
    959 
    def reboot_and_reset_tpm(self):
        """Reset the TPM from recovery mode.

        Reboots to 'rec' mode, runs chromeos-tpm-recovery on the DUT and does
        a mode-aware reboot afterwards.
        """
        switcher = self.switcher
        switcher.reboot_to_mode(to_mode='rec')
        self.faft_client.system.run_shell_command('chromeos-tpm-recovery')
        switcher.mode_aware_reboot()
    965 
    def full_power_off_and_on(self):
        """Shut the device down with the power button, then power it on."""
        config = self.faft_config
        servo = self.servo
        boot_id = self.get_bootid()

        # A normal 1.2s press is not enough to trigger the Chrome OS shutdown
        # process, so a customized hold time is used.
        servo.power_key(config.hold_pwr_button_poweroff)

        # device can take 44-51 seconds to restart,
        # add buffer from the default timeout of 60 seconds.
        self.switcher.wait_for_client_offline(timeout=100,
                                              orig_boot_id=boot_id)
        time.sleep(config.shutdown)

        # Short press power button to boot DUT again.
        servo.power_key(config.hold_pwr_button_poweron)
    978 
    def check_lid_and_power_on(self):
        """
        Press the power button after an EC reboot when the lid is closed.

        On devices with EC software sync, system powers on after EC reboots if
        lid is open. Otherwise, the EC shuts down CPU after about 3 seconds.
        So when servo reports lid_open as "no", this method waits for
        faft_config.software_sync seconds and short-presses the power button.
        """
        lid_closed = self.servo.get("lid_open") == "no"
        if not lid_closed:
            return
        time.sleep(self.faft_config.software_sync)
        self.servo.power_short_press()
    989 
    990     def _modify_usb_kernel(self, usb_dev, from_magic, to_magic):
    991         """Modify the kernel header magic in USB stick.
    992 
    993         The kernel header magic is the first 8-byte of kernel partition.
    994         We modify it to make it fail on kernel verification check.
    995 
    996         @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
    997         @param from_magic: A string of magic which we change it from.
    998         @param to_magic: A string of magic which we change it to.
    999         @raise TestError: if failed to change magic.
   1000         """
   1001         assert len(from_magic) == 8
   1002         assert len(to_magic) == 8
   1003         # USB image only contains one kernel.
   1004         kernel_part = self._join_part(usb_dev, self.KERNEL_MAP['a'])
   1005         read_cmd = "sudo dd if=%s bs=8 count=1 2>/dev/null" % kernel_part
   1006         current_magic = self.servo.system_output(read_cmd)
   1007         if current_magic == to_magic:
   1008             logging.info("The kernel magic is already %s.", current_magic)
   1009             return
   1010         if current_magic != from_magic:
   1011             raise error.TestError("Invalid kernel image on USB: wrong magic.")
   1012 
   1013         logging.info('Modify the kernel magic in USB, from %s to %s.',
   1014                      from_magic, to_magic)
   1015         write_cmd = ("echo -n '%s' | sudo dd of=%s oflag=sync conv=notrunc "
   1016                      " 2>/dev/null" % (to_magic, kernel_part))
   1017         self.servo.system(write_cmd)
   1018 
   1019         if self.servo.system_output(read_cmd) != to_magic:
   1020             raise error.TestError("Failed to write new magic.")
   1021 
   1022     def corrupt_usb_kernel(self, usb_dev):
   1023         """Corrupt USB kernel by modifying its magic from CHROMEOS to CORRUPTD.
   1024 
   1025         @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
   1026         """
   1027         self._modify_usb_kernel(usb_dev, self.CHROMEOS_MAGIC,
   1028                                 self.CORRUPTED_MAGIC)
   1029 
   1030     def restore_usb_kernel(self, usb_dev):
   1031         """Restore USB kernel by modifying its magic from CORRUPTD to CHROMEOS.
   1032 
   1033         @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
   1034         """
   1035         self._modify_usb_kernel(usb_dev, self.CORRUPTED_MAGIC,
   1036                                 self.CHROMEOS_MAGIC)
   1037 
   1038     def _call_action(self, action_tuple, check_status=False):
   1039         """Call the action function with/without arguments.
   1040 
   1041         @param action_tuple: A function, or a tuple (function, args, error_msg),
   1042                              in which, args and error_msg are optional. args is
   1043                              either a value or a tuple if multiple arguments.
   1044                              This can also be a list containing multiple
   1045                              function or tuple. In this case, these actions are
   1046                              called in sequence.
   1047         @param check_status: Check the return value of action function. If not
   1048                              succeed, raises a TestFail exception.
   1049         @return: The result value of the action function.
   1050         @raise TestError: An error when the action function is not callable.
   1051         @raise TestFail: When check_status=True, action function not succeed.
   1052         """
   1053         if isinstance(action_tuple, list):
   1054             return all([self._call_action(action, check_status=check_status)
   1055                         for action in action_tuple])
   1056 
   1057         action = action_tuple
   1058         args = ()
   1059         error_msg = 'Not succeed'
   1060         if isinstance(action_tuple, tuple):
   1061             action = action_tuple[0]
   1062             if len(action_tuple) >= 2:
   1063                 args = action_tuple[1]
   1064                 if not isinstance(args, tuple):
   1065                     args = (args,)
   1066             if len(action_tuple) >= 3:
   1067                 error_msg = action_tuple[2]
   1068 
   1069         if action is None:
   1070             return
   1071 
   1072         if not callable(action):
   1073             raise error.TestError('action is not callable!')
   1074 
   1075         info_msg = 'calling %s' % str(action)
   1076         if args:
   1077             info_msg += ' with args %s' % str(args)
   1078         logging.info(info_msg)
   1079         ret = action(*args)
   1080 
   1081         if check_status and not ret:
   1082             raise error.TestFail('%s: %s returning %s' %
   1083                                  (error_msg, info_msg, str(ret)))
   1084         return ret
   1085 
   1086     def run_shutdown_process(self, shutdown_action, pre_power_action=None,
   1087             run_power_action=True, post_power_action=None, shutdown_timeout=None):
   1088         """Run shutdown_action(), which makes DUT shutdown, and power it on.
   1089 
   1090         @param shutdown_action: function which makes DUT shutdown, like
   1091                                 pressing power key.
   1092         @param pre_power_action: function which is called before next power on.
   1093         @param power_action: power_key press by default, set to None to skip.
   1094         @param post_power_action: function which is called after next power on.
   1095         @param shutdown_timeout: a timeout to confirm DUT shutdown.
   1096         @raise TestFail: if the shutdown_action() failed to turn DUT off.
   1097         """
   1098         self._call_action(shutdown_action)
   1099         logging.info('Wait to ensure DUT shut down...')
   1100         try:
   1101             if shutdown_timeout is None:
   1102                 shutdown_timeout = self.faft_config.shutdown_timeout
   1103             self.switcher.wait_for_client(timeout=shutdown_timeout)
   1104             raise error.TestFail(
   1105                     'Should shut the device down after calling %s.' %
   1106                     str(shutdown_action))
   1107         except ConnectionError:
   1108             logging.info(
   1109                 'DUT is surely shutdown. We are going to power it on again...')
   1110 
   1111         if pre_power_action:
   1112             self._call_action(pre_power_action)
   1113         if run_power_action:
   1114             self.servo.power_key(self.faft_config.hold_pwr_button_poweron)
   1115         if post_power_action:
   1116             self._call_action(post_power_action)
   1117 
   1118     def get_bootid(self, retry=3):
   1119         """
   1120         Return the bootid.
   1121         """
   1122         boot_id = None
   1123         while retry:
   1124             try:
   1125                 boot_id = self._client.get_boot_id()
   1126                 break
   1127             except error.AutoservRunError:
   1128                 retry -= 1
   1129                 if retry:
   1130                     logging.info('Retry to get boot_id...')
   1131                 else:
   1132                     logging.warning('Failed to get boot_id.')
   1133         logging.info('boot_id: %s', boot_id)
   1134         return boot_id
   1135 
   1136     def check_state(self, func):
   1137         """
   1138         Wrapper around _call_action with check_status set to True. This is a
   1139         helper function to be used by tests and is currently implemented by
   1140         calling _call_action with check_status=True.
   1141 
   1142         TODO: This function's arguments need to be made more stringent. And
   1143         its functionality should be moved over to check functions directly in
   1144         the future.
   1145 
   1146         @param func: A function, or a tuple (function, args, error_msg),
   1147                              in which, args and error_msg are optional. args is
   1148                              either a value or a tuple if multiple arguments.
   1149                              This can also be a list containing multiple
   1150                              function or tuple. In this case, these actions are
   1151                              called in sequence.
   1152         @return: The result value of the action function.
   1153         @raise TestFail: If the function does notsucceed.
   1154         """
   1155         logging.info("-[FAFT]-[ start stepstate_checker ]----------")
   1156         self._call_action(func, check_status=True)
   1157         logging.info("-[FAFT]-[ end state_checker ]----------------")
   1158 
   1159     def get_current_firmware_sha(self):
   1160         """Get current firmware sha of body and vblock.
   1161 
   1162         @return: Current firmware sha follows the order (
   1163                  vblock_a_sha, body_a_sha, vblock_b_sha, body_b_sha)
   1164         """
   1165         current_firmware_sha = (self.faft_client.bios.get_sig_sha('a'),
   1166                                 self.faft_client.bios.get_body_sha('a'),
   1167                                 self.faft_client.bios.get_sig_sha('b'),
   1168                                 self.faft_client.bios.get_body_sha('b'))
   1169         if not all(current_firmware_sha):
   1170             raise error.TestError('Failed to get firmware sha.')
   1171         return current_firmware_sha
   1172 
   1173     def is_firmware_changed(self):
   1174         """Check if the current firmware changed, by comparing its SHA.
   1175 
   1176         @return: True if it is changed, otherwise Flase.
   1177         """
   1178         # Device may not be rebooted after test.
   1179         self.faft_client.bios.reload()
   1180 
   1181         current_sha = self.get_current_firmware_sha()
   1182 
   1183         if current_sha == self._backup_firmware_sha:
   1184             return False
   1185         else:
   1186             corrupt_VBOOTA = (current_sha[0] != self._backup_firmware_sha[0])
   1187             corrupt_FVMAIN = (current_sha[1] != self._backup_firmware_sha[1])
   1188             corrupt_VBOOTB = (current_sha[2] != self._backup_firmware_sha[2])
   1189             corrupt_FVMAINB = (current_sha[3] != self._backup_firmware_sha[3])
   1190             logging.info("Firmware changed:")
   1191             logging.info('VBOOTA is changed: %s', corrupt_VBOOTA)
   1192             logging.info('VBOOTB is changed: %s', corrupt_VBOOTB)
   1193             logging.info('FVMAIN is changed: %s', corrupt_FVMAIN)
   1194             logging.info('FVMAINB is changed: %s', corrupt_FVMAINB)
   1195             return True
   1196 
   1197     def backup_firmware(self, suffix='.original'):
   1198         """Backup firmware to file, and then send it to host.
   1199 
   1200         @param suffix: a string appended to backup file name
   1201         """
   1202         remote_temp_dir = self.faft_client.system.create_temp_dir()
   1203         remote_bios_path = os.path.join(remote_temp_dir, 'bios')
   1204         self.faft_client.bios.dump_whole(remote_bios_path)
   1205         self._client.get_file(remote_bios_path,
   1206                               os.path.join(self.resultsdir, 'bios' + suffix))
   1207         self._client.run('rm -rf %s' % remote_temp_dir)
   1208         logging.info('Backup firmware stored in %s with suffix %s',
   1209             self.resultsdir, suffix)
   1210 
   1211         self._backup_firmware_sha = self.get_current_firmware_sha()
   1212 
   1213     def is_firmware_saved(self):
   1214         """Check if a firmware saved (called backup_firmware before).
   1215 
   1216         @return: True if the firmware is backuped; otherwise False.
   1217         """
   1218         return self._backup_firmware_sha != ()
   1219 
   1220     def clear_saved_firmware(self):
   1221         """Clear the firmware saved by the method backup_firmware."""
   1222         self._backup_firmware_sha = ()
   1223 
   1224     def restore_firmware(self, suffix='.original'):
   1225         """Restore firmware from host in resultsdir.
   1226 
   1227         @param suffix: a string appended to backup file name
   1228         """
   1229         if not self.is_firmware_changed():
   1230             return
   1231 
   1232         # Backup current corrupted firmware.
   1233         self.backup_firmware(suffix='.corrupt')
   1234 
   1235         # Restore firmware.
   1236         remote_temp_dir = self.faft_client.system.create_temp_dir()
   1237         self._client.send_file(os.path.join(self.resultsdir, 'bios' + suffix),
   1238                                os.path.join(remote_temp_dir, 'bios'))
   1239 
   1240         self.faft_client.bios.write_whole(
   1241             os.path.join(remote_temp_dir, 'bios'))
   1242         self.switcher.mode_aware_reboot()
   1243         logging.info('Successfully restore firmware.')
   1244 
   1245     def setup_firmwareupdate_shellball(self, shellball=None):
   1246         """Deside a shellball to use in firmware update test.
   1247 
   1248         Check if there is a given shellball, and it is a shell script. Then,
   1249         send it to the remote host. Otherwise, use
   1250         /usr/sbin/chromeos-firmwareupdate.
   1251 
   1252         @param shellball: path of a shellball or default to None.
   1253 
   1254         @return: Path of shellball in remote host. If use default shellball,
   1255                  reutrn None.
   1256         """
   1257         updater_path = None
   1258         if shellball:
   1259             # Determine the firmware file is a shellball or a raw binary.
   1260             is_shellball = (utils.system_output("file %s" % shellball).find(
   1261                     "shell script") != -1)
   1262             if is_shellball:
   1263                 logging.info('Device will update firmware with shellball %s',
   1264                              shellball)
   1265                 temp_dir = self.faft_client.system.create_temp_dir(
   1266                             'shellball_')
   1267                 temp_shellball = os.path.join(temp_dir, 'updater.sh')
   1268                 self._client.send_file(shellball, temp_shellball)
   1269                 updater_path = temp_shellball
   1270             else:
   1271                 raise error.TestFail(
   1272                     'The given shellball is not a shell script.')
   1273             return updater_path
   1274 
   1275     def is_kernel_changed(self):
   1276         """Check if the current kernel is changed, by comparing its SHA1 hash.
   1277 
   1278         @return: True if it is changed; otherwise, False.
   1279         """
   1280         changed = False
   1281         for p in ('A', 'B'):
   1282             backup_sha = self._backup_kernel_sha.get(p, None)
   1283             current_sha = self.faft_client.kernel.get_sha(p)
   1284             if backup_sha != current_sha:
   1285                 changed = True
   1286                 logging.info('Kernel %s is changed', p)
   1287         return changed
   1288 
   1289     def backup_kernel(self, suffix='.original'):
   1290         """Backup kernel to files, and the send them to host.
   1291 
   1292         @param suffix: a string appended to backup file name.
   1293         """
   1294         remote_temp_dir = self.faft_client.system.create_temp_dir()
   1295         for p in ('A', 'B'):
   1296             remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p)
   1297             self.faft_client.kernel.dump(p, remote_path)
   1298             self._client.get_file(
   1299                     remote_path,
   1300                     os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix)))
   1301             self._backup_kernel_sha[p] = self.faft_client.kernel.get_sha(p)
   1302         logging.info('Backup kernel stored in %s with suffix %s',
   1303             self.resultsdir, suffix)
   1304 
   1305     def is_kernel_saved(self):
   1306         """Check if kernel images are saved (backup_kernel called before).
   1307 
   1308         @return: True if the kernel is saved; otherwise, False.
   1309         """
   1310         return len(self._backup_kernel_sha) != 0
   1311 
   1312     def clear_saved_kernel(self):
   1313         """Clear the kernel saved by backup_kernel()."""
   1314         self._backup_kernel_sha = dict()
   1315 
   1316     def restore_kernel(self, suffix='.original'):
   1317         """Restore kernel from host in resultsdir.
   1318 
   1319         @param suffix: a string appended to backup file name.
   1320         """
   1321         if not self.is_kernel_changed():
   1322             return
   1323 
   1324         # Backup current corrupted kernel.
   1325         self.backup_kernel(suffix='.corrupt')
   1326 
   1327         # Restore kernel.
   1328         remote_temp_dir = self.faft_client.system.create_temp_dir()
   1329         for p in ('A', 'B'):
   1330             remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p)
   1331             self._client.send_file(
   1332                     os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix)),
   1333                     remote_path)
   1334             self.faft_client.kernel.write(p, remote_path)
   1335 
   1336         self.switcher.mode_aware_reboot()
   1337         logging.info('Successfully restored kernel.')
   1338 
   1339     def backup_cgpt_attributes(self):
   1340         """Backup CGPT partition table attributes."""
   1341         self._backup_cgpt_attr = self.faft_client.cgpt.get_attributes()
   1342 
   1343     def restore_cgpt_attributes(self):
   1344         """Restore CGPT partition table attributes."""
   1345         current_table = self.faft_client.cgpt.get_attributes()
   1346         if current_table == self._backup_cgpt_attr:
   1347             return
   1348         logging.info('CGPT table is changed. Original: %r. Current: %r.',
   1349                      self._backup_cgpt_attr,
   1350                      current_table)
   1351         self.faft_client.cgpt.set_attributes(self._backup_cgpt_attr)
   1352 
   1353         self.switcher.mode_aware_reboot()
   1354         logging.info('Successfully restored CGPT table.')
   1355 
   1356     def try_fwb(self, count=0):
   1357         """set to try booting FWB count # times
   1358 
   1359         Wrapper to set fwb_tries for vboot1 and fw_try_count,fw_try_next for
   1360         vboot2
   1361 
   1362         @param count: an integer specifying value to program into
   1363                       fwb_tries(vb1)/fw_try_next(vb2)
   1364         """
   1365         if self.fw_vboot2:
   1366             self.faft_client.system.set_fw_try_next('B', count)
   1367         else:
   1368             # vboot1: we need to boot into fwb at least once
   1369             if not count:
   1370                 count = count + 1
   1371             self.faft_client.system.set_try_fw_b(count)
   1372 
   1373