Home | History | Annotate | Download | only in faft
      1 # Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import ast
      6 import ctypes
      7 import logging
      8 import os
      9 import pprint
     10 import re
     11 import time
     12 import uuid
     13 
     14 from autotest_lib.client.bin import utils
     15 from autotest_lib.client.common_lib import error
     16 from autotest_lib.server import test
     17 from autotest_lib.server.cros import vboot_constants as vboot
     18 from autotest_lib.server.cros.faft.config.config import Config as FAFTConfig
     19 from autotest_lib.server.cros.faft.rpc_proxy import RPCProxy
     20 from autotest_lib.server.cros.faft.utils import mode_switcher
     21 from autotest_lib.server.cros.faft.utils.faft_checkers import FAFTCheckers
     22 from autotest_lib.server.cros.servo import chrome_base_ec
     23 from autotest_lib.server.cros.servo import chrome_cr50
     24 from autotest_lib.server.cros.servo import chrome_ec
     25 
     26 ConnectionError = mode_switcher.ConnectionError
     27 
     28 
     29 class FAFTBase(test.test):
     30     """The base class of FAFT classes.
     31 
     32     It launches the FAFTClient on DUT, such that the test can access its
     33     firmware functions and interfaces. It also provides some methods to
     34     handle the reboot mechanism, in order to ensure FAFTClient is still
     35     connected after reboot.
     36     """
     37     def initialize(self, host):
     38         """Create a FAFTClient object and install the dependency."""
     39         self.servo = host.servo
     40         self.servo.initialize_dut()
     41         self._client = host
     42         self.faft_client = RPCProxy(host)
     43         self.lockfile = '/var/tmp/faft/lock'
     44 
     45 
     46 class FirmwareTest(FAFTBase):
     47     """
     48     Base class that sets up helper objects/functions for firmware tests.
     49 
     50     TODO: add documentaion as the FAFT rework progresses.
     51     """
     52     version = 1
     53 
     54     # Mapping of partition number of kernel and rootfs.
     55     KERNEL_MAP = {'a':'2', 'b':'4', '2':'2', '4':'4', '3':'2', '5':'4'}
     56     ROOTFS_MAP = {'a':'3', 'b':'5', '2':'3', '4':'5', '3':'3', '5':'5'}
     57     OTHER_KERNEL_MAP = {'a':'4', 'b':'2', '2':'4', '4':'2', '3':'4', '5':'2'}
     58     OTHER_ROOTFS_MAP = {'a':'5', 'b':'3', '2':'5', '4':'3', '3':'5', '5':'3'}
     59 
     60     CHROMEOS_MAGIC = "CHROMEOS"
     61     CORRUPTED_MAGIC = "CORRUPTD"
     62 
     63     # Delay for waiting client to return before EC suspend
     64     EC_SUSPEND_DELAY = 5
     65 
     66     # Delay between EC suspend and wake
     67     WAKE_DELAY = 10
     68 
     69     # Delay between closing and opening lid
     70     LID_DELAY = 1
     71 
     72     _SERVOD_LOG = '/var/log/servod.log'
     73 
     74     _ROOTFS_PARTITION_NUMBER = 3
     75 
     76     _backup_firmware_sha = ()
     77     _backup_kernel_sha = dict()
     78     _backup_cgpt_attr = dict()
     79     _backup_gbb_flags = None
     80     _backup_dev_mode = None
     81 
     82     # Class level variable, keep track the states of one time setup.
     83     # This variable is preserved across tests which inherit this class.
     84     _global_setup_done = {
     85         'gbb_flags': False,
     86         'reimage': False,
     87         'usb_check': False,
     88     }
     89 
     90     @classmethod
     91     def check_setup_done(cls, label):
     92         """Check if the given setup is done.
     93 
     94         @param label: The label of the setup.
     95         """
     96         return cls._global_setup_done[label]
     97 
     98     @classmethod
     99     def mark_setup_done(cls, label):
    100         """Mark the given setup done.
    101 
    102         @param label: The label of the setup.
    103         """
    104         cls._global_setup_done[label] = True
    105 
    106     @classmethod
    107     def unmark_setup_done(cls, label):
    108         """Mark the given setup not done.
    109 
    110         @param label: The label of the setup.
    111         """
    112         cls._global_setup_done[label] = False
    113 
    114     def initialize(self, host, cmdline_args, ec_wp=None):
    115         super(FirmwareTest, self).initialize(host)
    116         self.run_id = str(uuid.uuid4())
    117         logging.info('FirmwareTest initialize begin (id=%s)', self.run_id)
    118         # Parse arguments from command line
    119         args = {}
    120         self.power_control = host.POWER_CONTROL_RPM
    121         for arg in cmdline_args:
    122             match = re.search("^(\w+)=(.+)", arg)
    123             if match:
    124                 args[match.group(1)] = match.group(2)
    125         if 'power_control' in args:
    126             self.power_control = args['power_control']
    127             if self.power_control not in host.POWER_CONTROL_VALID_ARGS:
    128                 raise error.TestError('Valid values for --args=power_control '
    129                                       'are %s. But you entered wrong argument '
    130                                       'as "%s".'
    131                                        % (host.POWER_CONTROL_VALID_ARGS,
    132                                        self.power_control))
    133         self._no_ec_sync = False
    134         if 'no_ec_sync' in args:
    135             if 'true' in args['no_ec_sync'].lower():
    136                 self._no_ec_sync = True
    137 
    138         if not self.faft_client.system.dev_tpm_present():
    139             raise error.TestError('/dev/tpm0 does not exist on the client')
    140 
    141         self.faft_config = FAFTConfig(
    142                 self.faft_client.system.get_platform_name())
    143         self.checkers = FAFTCheckers(self)
    144         self.switcher = mode_switcher.create_mode_switcher(self)
    145 
    146         if self.faft_config.chrome_ec:
    147             self.ec = chrome_ec.ChromeEC(self.servo)
    148         # Check for presence of a USBPD console
    149         if self.faft_config.chrome_usbpd:
    150             self.usbpd = chrome_ec.ChromeUSBPD(self.servo)
    151         elif self.faft_config.chrome_ec:
    152             # If no separate USBPD console, then PD exists on EC console
    153             self.usbpd = self.ec
    154         # Get plankton console
    155         self.plankton = host.plankton
    156         self.plankton_host = host._plankton_host
    157 
    158         # Create the BaseEC object. None if not available.
    159         self.base_ec = chrome_base_ec.create_base_ec(self.servo)
    160 
    161         self._setup_uart_capture()
    162         self._setup_servo_log()
    163         self._record_system_info()
    164         self.fw_vboot2 = self.faft_client.system.get_fw_vboot2()
    165         logging.info('vboot version: %d', 2 if self.fw_vboot2 else 1)
    166         if self.fw_vboot2:
    167             self.faft_client.system.set_fw_try_next('A')
    168             if self.faft_client.system.get_crossystem_value('mainfw_act') == 'B':
    169                 logging.info('mainfw_act is B. rebooting to set it A')
    170                 self.switcher.mode_aware_reboot()
    171         self._setup_gbb_flags()
    172         self.faft_client.updater.stop_daemon()
    173         self._create_faft_lockfile()
    174         self._setup_ec_write_protect(ec_wp)
    175         # See chromium:239034 regarding needing this sync.
    176         self.blocking_sync()
    177         logging.info('FirmwareTest initialize done (id=%s)', self.run_id)
    178 
    179     def cleanup(self):
    180         """Autotest cleanup function."""
    181         # Unset state checker in case it's set by subclass
    182         logging.info('FirmwareTest cleaning up (id=%s)', self.run_id)
    183         try:
    184             self.faft_client.system.is_available()
    185         except:
    186             # Remote is not responding. Revive DUT so that subsequent tests
    187             # don't fail.
    188             self._restore_routine_from_timeout()
    189         self.switcher.restore_mode()
    190         self._restore_ec_write_protect()
    191         self._restore_servo_v4_role()
    192         self._restore_gbb_flags()
    193         self.faft_client.updater.start_daemon()
    194         self.faft_client.updater.cleanup()
    195         self._remove_faft_lockfile()
    196         self._record_servo_log()
    197         self._record_faft_client_log()
    198         self._cleanup_uart_capture()
    199         super(FirmwareTest, self).cleanup()
    200         logging.info('FirmwareTest cleanup done (id=%s)', self.run_id)
    201 
    202     def _record_system_info(self):
    203         """Record some critical system info to the attr keyval.
    204 
    205         This info is used by generate_test_report later.
    206         """
    207         system_info = {
    208             'hwid': self.faft_client.system.get_crossystem_value('hwid'),
    209             'ec_version': self.faft_client.ec.get_version(),
    210             'ro_fwid': self.faft_client.system.get_crossystem_value('ro_fwid'),
    211             'rw_fwid': self.faft_client.system.get_crossystem_value('fwid'),
    212             'servod_version': self._client._servo_host.run(
    213                 'servod --version').stdout.strip(),
    214             'os_version': self._client.get_release_builder_path(),
    215             'servo_type': self.servo.get_servo_version()
    216         }
    217 
    218         # Record the servo v4 and servo micro versions when possible
    219         if 'servo_micro' in system_info['servo_type']:
    220             system_info['servo_micro_version'] = self.servo.get(
    221                     'servo_micro_version')
    222 
    223         if 'servo_v4' in system_info['servo_type']:
    224             system_info['servo_v4_version'] = self.servo.get('servo_v4_version')
    225 
    226         if hasattr(self, 'cr50'):
    227             system_info['cr50_version'] = self.servo.get('cr50_version')
    228 
    229         logging.info('System info:\n%s', pprint.pformat(system_info))
    230         self.write_attr_keyval(system_info)
    231 
    232     def invalidate_firmware_setup(self):
    233         """Invalidate all firmware related setup state.
    234 
    235         This method is called when the firmware is re-flashed. It resets all
    236         firmware related setup states so that the next test setup properly
    237         again.
    238         """
    239         self.unmark_setup_done('gbb_flags')
    240 
    241     def _retrieve_recovery_reason_from_trap(self):
    242         """Try to retrieve the recovery reason from a trapped recovery screen.
    243 
    244         @return: The recovery_reason, 0 if any error.
    245         """
    246         recovery_reason = 0
    247         logging.info('Try to retrieve recovery reason...')
    248         if self.servo.get_usbkey_direction() == 'dut':
    249             self.switcher.bypass_rec_mode()
    250         else:
    251             self.servo.switch_usbkey('dut')
    252 
    253         try:
    254             self.switcher.wait_for_client()
    255             lines = self.faft_client.system.run_shell_command_get_output(
    256                         'crossystem recovery_reason')
    257             recovery_reason = int(lines[0])
    258             logging.info('Got the recovery reason %d.', recovery_reason)
    259         except ConnectionError:
    260             logging.error('Failed to get the recovery reason due to connection '
    261                           'error.')
    262         return recovery_reason
    263 
    264     def _reset_client(self):
    265         """Reset client to a workable state.
    266 
    267         This method is called when the client is not responsive. It may be
    268         caused by the following cases:
    269           - halt on a firmware screen without timeout, e.g. REC_INSERT screen;
    270           - corrupted firmware;
    271           - corrutped OS image.
    272         """
    273         # DUT may halt on a firmware screen. Try cold reboot.
    274         logging.info('Try cold reboot...')
    275         self.switcher.mode_aware_reboot(reboot_type='cold',
    276                                         sync_before_boot=False,
    277                                         wait_for_dut_up=False)
    278         self.switcher.wait_for_client_offline()
    279         self.switcher.bypass_dev_mode()
    280         try:
    281             self.switcher.wait_for_client()
    282             return
    283         except ConnectionError:
    284             logging.warn('Cold reboot doesn\'t help, still connection error.')
    285 
    286         # DUT may be broken by a corrupted firmware. Restore firmware.
    287         # We assume the recovery boot still works fine. Since the recovery
    288         # code is in RO region and all FAFT tests don't change the RO region
    289         # except GBB.
    290         if self.is_firmware_saved():
    291             self._ensure_client_in_recovery()
    292             logging.info('Try restore the original firmware...')
    293             if self.is_firmware_changed():
    294                 try:
    295                     self.restore_firmware()
    296                     return
    297                 except ConnectionError:
    298                     logging.warn('Restoring firmware doesn\'t help, still '
    299                                  'connection error.')
    300 
    301         # Perhaps it's kernel that's broken. Let's try restoring it.
    302         if self.is_kernel_saved():
    303             self._ensure_client_in_recovery()
    304             logging.info('Try restore the original kernel...')
    305             if self.is_kernel_changed():
    306                 try:
    307                     self.restore_kernel()
    308                     return
    309                 except ConnectionError:
    310                     logging.warn('Restoring kernel doesn\'t help, still '
    311                                  'connection error.')
    312 
    313         # DUT may be broken by a corrupted OS image. Restore OS image.
    314         self._ensure_client_in_recovery()
    315         logging.info('Try restore the OS image...')
    316         self.faft_client.system.run_shell_command('chromeos-install --yes')
    317         self.switcher.mode_aware_reboot(wait_for_dut_up=False)
    318         self.switcher.wait_for_client_offline()
    319         self.switcher.bypass_dev_mode()
    320         try:
    321             self.switcher.wait_for_client()
    322             logging.info('Successfully restore OS image.')
    323             return
    324         except ConnectionError:
    325             logging.warn('Restoring OS image doesn\'t help, still connection '
    326                          'error.')
    327 
    328     def _ensure_client_in_recovery(self):
    329         """Ensure client in recovery boot; reboot into it if necessary.
    330 
    331         @raise TestError: if failed to boot the USB image.
    332         """
    333         logging.info('Try boot into USB image...')
    334         self.switcher.reboot_to_mode(to_mode='rec', sync_before_boot=False,
    335                                      wait_for_dut_up=False)
    336         self.servo.switch_usbkey('host')
    337         self.switcher.bypass_rec_mode()
    338         try:
    339             self.switcher.wait_for_client()
    340         except ConnectionError:
    341             raise error.TestError('Failed to boot the USB image.')
    342 
    343     def _restore_routine_from_timeout(self):
    344         """A routine to try to restore the system from a timeout error.
    345 
    346         This method is called when FAFT failed to connect DUT after reboot.
    347 
    348         @raise TestFail: This exception is already raised, with a decription
    349                          why it failed.
    350         """
    351         # DUT is disconnected. Capture the UART output for debug.
    352         self._record_uart_capture()
    353 
    354         # TODO(waihong (at] chromium.org): Implement replugging the Ethernet to
    355         # identify if it is a network flaky.
    356 
    357         recovery_reason = self._retrieve_recovery_reason_from_trap()
    358 
    359         # Reset client to a workable state.
    360         self._reset_client()
    361 
    362         # Raise the proper TestFail exception.
    363         if recovery_reason:
    364             raise error.TestFail('Trapped in the recovery screen (reason: %d) '
    365                                  'and timed out' % recovery_reason)
    366         else:
    367             raise error.TestFail('Timed out waiting for DUT reboot')
    368 
    369     def assert_test_image_in_usb_disk(self, usb_dev=None):
    370         """Assert an USB disk plugged-in on servo and a test image inside.
    371 
    372         @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
    373                         If None, it is detected automatically.
    374         @raise TestError: if USB disk not detected or not a test image.
    375         """
    376         if self.check_setup_done('usb_check'):
    377             return
    378         if usb_dev:
    379             assert self.servo.get_usbkey_direction() == 'host'
    380         else:
    381             self.servo.switch_usbkey('host')
    382             usb_dev = self.servo.probe_host_usb_dev()
    383             if not usb_dev:
    384                 raise error.TestError(
    385                         'An USB disk should be plugged in the servo board.')
    386 
    387         rootfs = '%s%s' % (usb_dev, self._ROOTFS_PARTITION_NUMBER)
    388         logging.info('usb dev is %s', usb_dev)
    389         tmpd = self.servo.system_output('mktemp -d -t usbcheck.XXXX')
    390         self.servo.system('mount -o ro %s %s' % (rootfs, tmpd))
    391 
    392         try:
    393             usb_lsb = self.servo.system_output('cat %s' %
    394                 os.path.join(tmpd, 'etc/lsb-release'))
    395             logging.debug('Dumping lsb-release on USB stick:\n%s', usb_lsb)
    396             dut_lsb = '\n'.join(self.faft_client.system.
    397                 run_shell_command_get_output('cat /etc/lsb-release'))
    398             logging.debug('Dumping lsb-release on DUT:\n%s', dut_lsb)
    399             if not re.search(r'RELEASE_TRACK=.*test', usb_lsb):
    400                 raise error.TestError('USB stick in servo is no test image')
    401             usb_board = re.search(r'BOARD=(.*)', usb_lsb).group(1)
    402             dut_board = re.search(r'BOARD=(.*)', dut_lsb).group(1)
    403             if usb_board != dut_board:
    404                 raise error.TestError('USB stick in servo contains a %s '
    405                     'image, but DUT is a %s' % (usb_board, dut_board))
    406         finally:
    407             for cmd in ('umount -l %s' % tmpd, 'sync', 'rm -rf %s' % tmpd):
    408                 self.servo.system(cmd)
    409 
    410         self.mark_setup_done('usb_check')
    411 
    412     def setup_usbkey(self, usbkey, host=None, used_for_recovery=None):
    413         """Setup the USB disk for the test.
    414 
    415         It checks the setup of USB disk and a valid ChromeOS test image inside.
    416         It also muxes the USB disk to either the host or DUT by request.
    417 
    418         @param usbkey: True if the USB disk is required for the test, False if
    419                        not required.
    420         @param host: Optional, True to mux the USB disk to host, False to mux it
    421                     to DUT, default to do nothing.
    422         @param used_for_recovery: Optional, True if the USB disk is used for
    423                                   recovery boot; False if the USB disk is not
    424                                   used for recovery boot, like Ctrl-U USB boot.
    425         """
    426         if usbkey:
    427             self.assert_test_image_in_usb_disk()
    428         elif host is None:
    429             # USB disk is not required for the test. Better to mux it to host.
    430             host = True
    431 
    432         if host is True:
    433             self.servo.switch_usbkey('host')
    434         elif host is False:
    435             self.servo.switch_usbkey('dut')
    436 
    437         if used_for_recovery is None:
    438             # Default value is True if usbkey == True.
    439             # As the common usecase of USB disk is for recovery boot. Tests
    440             # can define it explicitly if not.
    441             used_for_recovery = usbkey
    442 
    443         if used_for_recovery:
    444             # In recovery boot, the locked EC RO doesn't support PD for most
    445             # of the CrOS devices. The default servo v4 power role is a SRC.
    446             # The DUT becomes a SNK. Lack of PD makes CrOS unable to do the
    447             # data role swap from UFP to DFP; as a result, DUT can't see the
    448             # USB disk and the Ethernet dongle on servo v4.
    449             #
    450             # This is a workaround to set servo v4 as a SNK, for every FAFT
    451             # test which boots into the USB disk in the recovery mode.
    452             #
    453             # TODO(waihong): Add a check to see if the battery level is too
    454             # low and sleep for a while for charging.
    455             self.set_servo_v4_role_to_snk()
    456 
    457     def set_servo_v4_role_to_snk(self):
    458         """Set the servo v4 role to SNK."""
    459         self._needed_restore_servo_v4_role = True
    460         self.servo.set_servo_v4_role('snk')
    461 
    462     def _restore_servo_v4_role(self):
    463         """Restore the servo v4 role to default SRC."""
    464         if not hasattr(self, '_needed_restore_servo_v4_role'):
    465             return
    466         if self._needed_restore_servo_v4_role:
    467             self.servo.set_servo_v4_role('src')
    468 
    469     def get_usbdisk_path_on_dut(self):
    470         """Get the path of the USB disk device plugged-in the servo on DUT.
    471 
    472         Returns:
    473           A string representing USB disk path, like '/dev/sdb', or None if
    474           no USB disk is found.
    475         """
    476         cmd = 'ls -d /dev/s*[a-z]'
    477         original_value = self.servo.get_usbkey_direction()
    478 
    479         # Make the dut unable to see the USB disk.
    480         self.servo.switch_usbkey('off')
    481         no_usb_set = set(
    482             self.faft_client.system.run_shell_command_get_output(cmd))
    483 
    484         # Make the dut able to see the USB disk.
    485         self.servo.switch_usbkey('dut')
    486         time.sleep(self.faft_config.usb_plug)
    487         has_usb_set = set(
    488             self.faft_client.system.run_shell_command_get_output(cmd))
    489 
    490         # Back to its original value.
    491         if original_value != self.servo.get_usbkey_direction():
    492             self.servo.switch_usbkey(original_value)
    493 
    494         diff_set = has_usb_set - no_usb_set
    495         if len(diff_set) == 1:
    496             return diff_set.pop()
    497         else:
    498             return None
    499 
    500     def _create_faft_lockfile(self):
    501         """Creates the FAFT lockfile."""
    502         logging.info('Creating FAFT lockfile...')
    503         command = 'touch %s' % (self.lockfile)
    504         self.faft_client.system.run_shell_command(command)
    505 
    506     def _remove_faft_lockfile(self):
    507         """Removes the FAFT lockfile."""
    508         logging.info('Removing FAFT lockfile...')
    509         command = 'rm -f %s' % (self.lockfile)
    510         self.faft_client.system.run_shell_command(command)
    511 
    512     def clear_set_gbb_flags(self, clear_mask, set_mask):
    513         """Clear and set the GBB flags in the current flashrom.
    514 
    515         @param clear_mask: A mask of flags to be cleared.
    516         @param set_mask: A mask of flags to be set.
    517         """
    518         gbb_flags = self.faft_client.bios.get_gbb_flags()
    519         new_flags = gbb_flags & ctypes.c_uint32(~clear_mask).value | set_mask
    520         self.gbb_flags = new_flags
    521         if new_flags != gbb_flags:
    522             self._backup_gbb_flags = gbb_flags
    523             logging.info('Changing GBB flags from 0x%x to 0x%x.',
    524                          gbb_flags, new_flags)
    525             self.faft_client.bios.set_gbb_flags(new_flags)
    526             # If changing FORCE_DEV_SWITCH_ON or DISABLE_EC_SOFTWARE_SYNC flag,
    527             # reboot to get a clear state
    528             if ((gbb_flags ^ new_flags) &
    529                 (vboot.GBB_FLAG_FORCE_DEV_SWITCH_ON |
    530                  vboot.GBB_FLAG_DISABLE_EC_SOFTWARE_SYNC)):
    531                 self.switcher.mode_aware_reboot()
    532         else:
    533             logging.info('Current GBB flags look good for test: 0x%x.',
    534                          gbb_flags)
    535 
    536     def check_ec_capability(self, required_cap=None, suppress_warning=False):
    537         """Check if current platform has required EC capabilities.
    538 
    539         @param required_cap: A list containing required EC capabilities. Pass in
    540                              None to only check for presence of Chrome EC.
    541         @param suppress_warning: True to suppress any warning messages.
    542         @return: True if requirements are met. Otherwise, False.
    543         """
    544         if not self.faft_config.chrome_ec:
    545             if not suppress_warning:
    546                 logging.warn('Requires Chrome EC to run this test.')
    547             return False
    548 
    549         if not required_cap:
    550             return True
    551 
    552         for cap in required_cap:
    553             if cap not in self.faft_config.ec_capability:
    554                 if not suppress_warning:
    555                     logging.warn('Requires EC capability "%s" to run this '
    556                                  'test.', cap)
    557                 return False
    558 
    559         return True
    560 
    561     def check_root_part_on_non_recovery(self, part):
    562         """Check the partition number of root device and on normal/dev boot.
    563 
    564         @param part: A string of partition number, e.g.'3'.
    565         @return: True if the root device matched and on normal/dev boot;
    566                  otherwise, False.
    567         """
    568         return self.checkers.root_part_checker(part) and \
    569                 self.checkers.crossystem_checker({
    570                     'mainfw_type': ('normal', 'developer'),
    571                 })
    572 
    573     def _join_part(self, dev, part):
    574         """Return a concatenated string of device and partition number.
    575 
    576         @param dev: A string of device, e.g.'/dev/sda'.
    577         @param part: A string of partition number, e.g.'3'.
    578         @return: A concatenated string of device and partition number,
    579                  e.g.'/dev/sda3'.
    580 
    581         >>> seq = FirmwareTest()
    582         >>> seq._join_part('/dev/sda', '3')
    583         '/dev/sda3'
    584         >>> seq._join_part('/dev/mmcblk0', '2')
    585         '/dev/mmcblk0p2'
    586         """
    587         if 'mmcblk' in dev:
    588             return dev + 'p' + part
    589         elif 'nvme' in dev:
    590             return dev + 'p' + part
    591         else:
    592             return dev + part
    593 
    594     def copy_kernel_and_rootfs(self, from_part, to_part):
    595         """Copy kernel and rootfs from from_part to to_part.
    596 
    597         @param from_part: A string of partition number to be copied from.
    598         @param to_part: A string of partition number to be copied to.
    599         """
    600         root_dev = self.faft_client.system.get_root_dev()
    601         logging.info('Copying kernel from %s to %s. Please wait...',
    602                      from_part, to_part)
    603         self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' %
    604                 (self._join_part(root_dev, self.KERNEL_MAP[from_part]),
    605                  self._join_part(root_dev, self.KERNEL_MAP[to_part])))
    606         logging.info('Copying rootfs from %s to %s. Please wait...',
    607                      from_part, to_part)
    608         self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' %
    609                 (self._join_part(root_dev, self.ROOTFS_MAP[from_part]),
    610                  self._join_part(root_dev, self.ROOTFS_MAP[to_part])))
    611 
    612     def ensure_kernel_boot(self, part):
    613         """Ensure the request kernel boot.
    614 
    615         If not, it duplicates the current kernel to the requested kernel
    616         and sets the requested higher priority to ensure it boot.
    617 
    618         @param part: A string of kernel partition number or 'a'/'b'.
    619         """
    620         if not self.checkers.root_part_checker(part):
    621             if self.faft_client.kernel.diff_a_b():
    622                 self.copy_kernel_and_rootfs(
    623                         from_part=self.OTHER_KERNEL_MAP[part],
    624                         to_part=part)
    625             self.reset_and_prioritize_kernel(part)
    626             self.switcher.mode_aware_reboot()
    627 
    628     def ensure_dev_internal_boot(self, original_dev_boot_usb):
    629         """Ensure internal device boot in developer mode.
    630 
    631         If not internal device boot, it will try to reboot the device and
    632         bypass dev mode to boot into internal device.
    633 
    634         @param original_dev_boot_usb: Original dev_boot_usb value.
    635         """
    636         logging.info('Checking internal device boot.')
    637         if self.faft_client.system.is_removable_device_boot():
    638             logging.info('Reboot into internal disk...')
    639             self.faft_client.system.set_dev_boot_usb(original_dev_boot_usb)
    640             self.switcher.mode_aware_reboot()
    641         self.check_state((self.checkers.dev_boot_usb_checker, False,
    642                           'Device not booted from internal disk properly.'))
    643 
    644     def set_hardware_write_protect(self, enable):
    645         """Set hardware write protect pin.
    646 
    647         @param enable: True if asserting write protect pin. Otherwise, False.
    648         """
    649         self.servo.set('fw_wp_state', 'force_on' if enable else 'force_off')
    650 
    651     def set_ec_write_protect_and_reboot(self, enable):
    652         """Set EC write protect status and reboot to take effect.
    653 
    654         The write protect state is only activated if both hardware write
    655         protect pin is asserted and software write protect flag is set.
    656         This method asserts/deasserts hardware write protect pin first, and
    657         set corresponding EC software write protect flag.
    658 
    659         If the device uses non-Chrome EC, set the software write protect via
    660         flashrom.
    661 
    662         If the device uses Chrome EC, a reboot is required for write protect
    663         to take effect. Since the software write protect flag cannot be unset
    664         if hardware write protect pin is asserted, we need to deasserted the
    665         pin first if we are deactivating write protect. Similarly, a reboot
    666         is required before we can modify the software flag.
    667 
    668         @param enable: True if activating EC write protect. Otherwise, False.
    669         """
    670         self.set_hardware_write_protect(enable)
    671         if self.faft_config.chrome_ec:
    672             self.set_chrome_ec_write_protect_and_reboot(enable)
    673         else:
    674             self.faft_client.ec.set_write_protect(enable)
    675             self.switcher.mode_aware_reboot()
    676 
    677     def set_chrome_ec_write_protect_and_reboot(self, enable):
    678         """Set Chrome EC write protect status and reboot to take effect.
    679 
    680         @param enable: True if activating EC write protect. Otherwise, False.
    681         """
    682         if enable:
    683             # Set write protect flag and reboot to take effect.
    684             self.ec.set_flash_write_protect(enable)
    685             self.sync_and_ec_reboot()
    686         else:
    687             # Reboot after deasserting hardware write protect pin to deactivate
    688             # write protect. And then remove software write protect flag.
    689             self.sync_and_ec_reboot()
    690             self.ec.set_flash_write_protect(enable)
    691 
    692     def _setup_ec_write_protect(self, ec_wp):
    693         """Setup for EC write-protection.
    694 
    695         It makes sure the EC in the requested write-protection state. If not, it
    696         flips the state. Flipping the write-protection requires DUT reboot.
    697 
    698         @param ec_wp: True to request EC write-protected; False to request EC
    699                       not write-protected; None to do nothing.
    700         """
    701         if ec_wp is None:
    702             self._old_wpsw_boot = None
    703             return
    704         self._old_wpsw_cur = self.checkers.crossystem_checker(
    705                                     {'wpsw_cur': '1'}, suppress_logging=True)
    706         self._old_wpsw_boot = self.checkers.crossystem_checker(
    707                                    {'wpsw_boot': '1'}, suppress_logging=True)
    708         if not (ec_wp == self._old_wpsw_cur == self._old_wpsw_boot):
    709             if not self.faft_config.ap_access_ec_flash:
    710                 raise error.TestNAError(
    711                         "Cannot change EC write-protect for this device")
    712 
    713             logging.info('The test required EC is %swrite-protected. Reboot '
    714                          'and flip the state.', '' if ec_wp else 'not ')
    715             self.switcher.mode_aware_reboot(
    716                     'custom',
    717                      lambda:self.set_ec_write_protect_and_reboot(ec_wp))
    718         wpsw_boot = wpsw_cur = '1' if ec_wp == True else '0'
    719         self.check_state((self.checkers.crossystem_checker, {
    720                                'wpsw_boot': wpsw_boot, 'wpsw_cur': wpsw_cur}))
    721 
    722     def _restore_ec_write_protect(self):
    723         """Restore the original EC write-protection."""
    724         if (not hasattr(self, '_old_wpsw_boot')) or (self._old_wpsw_boot is
    725                                                      None):
    726             return
    727         if not self.checkers.crossystem_checker({'wpsw_boot': '1' if
    728                        self._old_wpsw_boot else '0'}, suppress_logging=True):
    729             logging.info('Restore original EC write protection and reboot.')
    730             self.switcher.mode_aware_reboot(
    731                     'custom',
    732                     lambda:self.set_ec_write_protect_and_reboot(
    733                             self._old_wpsw_boot))
    734         self.check_state((self.checkers.crossystem_checker, {
    735                            'wpsw_boot': '1' if self._old_wpsw_boot else '0'}))
    736 
    737     def _setup_uart_capture(self):
    738         """Setup the CPU/EC/PD UART capture."""
    739         self.cpu_uart_file = os.path.join(self.resultsdir, 'cpu_uart.txt')
    740         self.servo.set('cpu_uart_capture', 'on')
    741         self.cr50_uart_file = None
    742         self.ec_uart_file = None
    743         self.servo_micro_uart_file = None
    744         self.servo_v4_uart_file = None
    745         self.usbpd_uart_file = None
    746         try:
    747             # Check that the console works before declaring the cr50 console
    748             # connection exists and enabling uart capture.
    749             self.servo.get('cr50_version')
    750             self.servo.set('cr50_uart_capture', 'on')
    751             self.cr50_uart_file = os.path.join(self.resultsdir, 'cr50_uart.txt')
    752             self.cr50 = chrome_cr50.ChromeCr50(self.servo)
    753         except error.TestFail as e:
    754             if 'No control named' in str(e):
    755                 logging.warn('cr50 console not supported.')
    756         if self.faft_config.chrome_ec:
    757             try:
    758                 self.servo.set('ec_uart_capture', 'on')
    759                 self.ec_uart_file = os.path.join(self.resultsdir, 'ec_uart.txt')
    760             except error.TestFail as e:
    761                 if 'No control named' in str(e):
    762                     logging.warn('The servod is too old that ec_uart_capture '
    763                                  'not supported.')
    764             # Log separate PD console if supported
    765             if self.check_ec_capability(['usbpd_uart'], suppress_warning=True):
    766                 try:
    767                     self.servo.set('usbpd_uart_capture', 'on')
    768                     self.usbpd_uart_file = os.path.join(self.resultsdir,
    769                                                         'usbpd_uart.txt')
    770                 except error.TestFail as e:
    771                     if 'No control named' in str(e):
    772                         logging.warn('The servod is too old that '
    773                                      'usbpd_uart_capture is not supported.')
    774         else:
    775             logging.info('Not a Google EC, cannot capture ec console output.')
    776         try:
    777             self.servo.set('servo_micro_uart_capture', 'on')
    778             self.servo_micro_uart_file = os.path.join(self.resultsdir,
    779                                                       'servo_micro_uart.txt')
    780         except error.TestFail as e:
    781             if 'No control named' in str(e):
    782                 logging.warn('servo micro console not supported.')
    783         try:
    784             self.servo.set('servo_v4_uart_capture', 'on')
    785             self.servo_v4_uart_file = os.path.join(self.resultsdir,
    786                                                    'servo_v4_uart.txt')
    787         except error.TestFail as e:
    788             if 'No control named' in str(e):
    789                 logging.warn('servo v4 console not supported.')
    790 
    791     def _record_uart_capture(self):
    792         """Record the CPU/EC/PD UART output stream to files."""
    793         if self.cpu_uart_file:
    794             with open(self.cpu_uart_file, 'a') as f:
    795                 f.write(ast.literal_eval(self.servo.get('cpu_uart_stream')))
    796         if self.cr50_uart_file:
    797             with open(self.cr50_uart_file, 'a') as f:
    798                 f.write(ast.literal_eval(self.servo.get('cr50_uart_stream')))
    799         if self.ec_uart_file and self.faft_config.chrome_ec:
    800             with open(self.ec_uart_file, 'a') as f:
    801                 f.write(ast.literal_eval(self.servo.get('ec_uart_stream')))
    802         if self.servo_micro_uart_file:
    803             with open(self.servo_micro_uart_file, 'a') as f:
    804                 f.write(ast.literal_eval(self.servo.get(
    805                         'servo_micro_uart_stream')))
    806         if self.servo_v4_uart_file:
    807             with open(self.servo_v4_uart_file, 'a') as f:
    808                 f.write(ast.literal_eval(self.servo.get(
    809                         'servo_v4_uart_stream')))
    810         if (self.usbpd_uart_file and self.faft_config.chrome_ec and
    811             self.check_ec_capability(['usbpd_uart'], suppress_warning=True)):
    812             with open(self.usbpd_uart_file, 'a') as f:
    813                 f.write(ast.literal_eval(self.servo.get('usbpd_uart_stream')))
    814 
    815     def _cleanup_uart_capture(self):
    816         """Cleanup the CPU/EC/PD UART capture."""
    817         # Flush the remaining UART output.
    818         self._record_uart_capture()
    819         self.servo.set('cpu_uart_capture', 'off')
    820         if self.cr50_uart_file:
    821             self.servo.set('cr50_uart_capture', 'off')
    822         if self.ec_uart_file and self.faft_config.chrome_ec:
    823             self.servo.set('ec_uart_capture', 'off')
    824         if self.servo_micro_uart_file:
    825             self.servo.set('servo_micro_uart_capture', 'off')
    826         if self.servo_v4_uart_file:
    827             self.servo.set('servo_v4_uart_capture', 'off')
    828         if (self.usbpd_uart_file and self.faft_config.chrome_ec and
    829             self.check_ec_capability(['usbpd_uart'], suppress_warning=True)):
    830             self.servo.set('usbpd_uart_capture', 'off')
    831 
    832     def _get_power_state(self, power_state):
    833         """
    834         Return the current power state of the AP
    835         """
    836         return self.ec.send_command_get_output("powerinfo", [power_state])
    837 
    838     def wait_power_state(self, power_state, retries):
    839         """
    840         Wait for certain power state.
    841 
    842         @param power_state: power state you are expecting
    843         @param retries: retries.  This is necessary if AP is powering down
    844         and transitioning through different states.
    845         """
    846         logging.info('Checking power state "%s" maximum %d times.',
    847                      power_state, retries)
    848         while retries > 0:
    849             logging.info("try count: %d", retries)
    850             try:
    851                 retries = retries - 1
    852                 ret = self._get_power_state(power_state)
    853                 return True
    854             except error.TestFail:
    855                 pass
    856         return False
    857 
    858     def suspend(self):
    859         """Suspends the DUT."""
    860         cmd = '(sleep %d; powerd_dbus_suspend) &' % self.EC_SUSPEND_DELAY
    861         self.faft_client.system.run_shell_command(cmd)
    862         time.sleep(self.EC_SUSPEND_DELAY)
    863 
    864     def _fetch_servo_log(self):
    865         """Fetch the servo log."""
    866         cmd = '[ -e %s ] && cat %s || echo NOTFOUND' % ((self._SERVOD_LOG,) * 2)
    867         servo_log = self.servo.system_output(cmd)
    868         return None if servo_log == 'NOTFOUND' else servo_log
    869 
    870     def _setup_servo_log(self):
    871         """Setup the servo log capturing."""
    872         self.servo_log_original_len = -1
    873         if self.servo.is_localhost():
    874             # No servo log recorded when servod runs locally.
    875             return
    876 
    877         servo_log = self._fetch_servo_log()
    878         if servo_log:
    879             self.servo_log_original_len = len(servo_log)
    880         else:
    881             logging.warn('Servo log file not found.')
    882 
    883     def _record_servo_log(self):
    884         """Record the servo log to the results directory."""
    885         if self.servo_log_original_len != -1:
    886             servo_log = self._fetch_servo_log()
    887             servo_log_file = os.path.join(self.resultsdir, 'servod.log')
    888             with open(servo_log_file, 'a') as f:
    889                 f.write(servo_log[self.servo_log_original_len:])
    890 
    891     def _record_faft_client_log(self):
    892         """Record the faft client log to the results directory."""
    893         client_log = self.faft_client.system.dump_log(True)
    894         client_log_file = os.path.join(self.resultsdir, 'faft_client.log')
    895         with open(client_log_file, 'w') as f:
    896             f.write(client_log)
    897 
    898     def _setup_gbb_flags(self):
    899         """Setup the GBB flags for FAFT test."""
    900         if self.check_setup_done('gbb_flags'):
    901             return
    902 
    903         logging.info('Set proper GBB flags for test.')
    904         # Ensure that GBB flags are set to 0x140.
    905         flags_to_set = (vboot.GBB_FLAG_FAFT_KEY_OVERIDE |
    906                         vboot.GBB_FLAG_ENTER_TRIGGERS_TONORM)
    907         # And if the "no_ec_sync" argument is set, then disable EC software
    908         # sync.
    909         if self._no_ec_sync:
    910             flags_to_set |= vboot.GBB_FLAG_DISABLE_EC_SOFTWARE_SYNC
    911 
    912         self.clear_set_gbb_flags(0xffffffff, flags_to_set)
    913         self.mark_setup_done('gbb_flags')
    914 
    915     def drop_backup_gbb_flags(self):
    916         """Drops the backup GBB flags.
    917 
    918         This can be used when a test intends to permanently change GBB flags.
    919         """
    920         self._backup_gbb_flags = None
    921 
    922     def _restore_gbb_flags(self):
    923         """Restore GBB flags to their original state."""
    924         if self._backup_gbb_flags is None:
    925             return
    926         # Setting up and restoring the GBB flags take a lot of time. For
    927         # speed-up purpose, don't restore it.
    928         logging.info('***')
    929         logging.info('*** Please manually restore the original GBB flags to: '
    930                      '0x%x ***', self._backup_gbb_flags)
    931         logging.info('***')
    932         self.unmark_setup_done('gbb_flags')
    933 
    934     def setup_tried_fwb(self, tried_fwb):
    935         """Setup for fw B tried state.
    936 
    937         It makes sure the system in the requested fw B tried state. If not, it
    938         tries to do so.
    939 
    940         @param tried_fwb: True if requested in tried_fwb=1;
    941                           False if tried_fwb=0.
    942         """
    943         if tried_fwb:
    944             if not self.checkers.crossystem_checker({'tried_fwb': '1'}):
    945                 logging.info(
    946                     'Firmware is not booted with tried_fwb. Reboot into it.')
    947                 self.faft_client.system.set_try_fw_b()
    948         else:
    949             if not self.checkers.crossystem_checker({'tried_fwb': '0'}):
    950                 logging.info(
    951                     'Firmware is booted with tried_fwb. Reboot to clear.')
    952 
    953     def power_on(self):
    954         """Switch DUT AC power on."""
    955         self._client.power_on(self.power_control)
    956 
    957     def power_off(self):
    958         """Switch DUT AC power off."""
    959         self._client.power_off(self.power_control)
    960 
    961     def power_cycle(self):
    962         """Power cycle DUT AC power."""
    963         self._client.power_cycle(self.power_control)
    964 
    965     def setup_rw_boot(self, section='a'):
    966         """Make sure firmware is in RW-boot mode.
    967 
    968         If the given firmware section is in RO-boot mode, turn off the RO-boot
    969         flag and reboot DUT into RW-boot mode.
    970 
    971         @param section: A firmware section, either 'a' or 'b'.
    972         """
    973         flags = self.faft_client.bios.get_preamble_flags(section)
    974         if flags & vboot.PREAMBLE_USE_RO_NORMAL:
    975             flags = flags ^ vboot.PREAMBLE_USE_RO_NORMAL
    976             self.faft_client.bios.set_preamble_flags(section, flags)
    977             self.switcher.mode_aware_reboot()
    978 
    979     def setup_kernel(self, part):
    980         """Setup for kernel test.
    981 
    982         It makes sure both kernel A and B bootable and the current boot is
    983         the requested kernel part.
    984 
    985         @param part: A string of kernel partition number or 'a'/'b'.
    986         """
    987         self.ensure_kernel_boot(part)
    988         logging.info('Checking the integrity of kernel B and rootfs B...')
    989         if (self.faft_client.kernel.diff_a_b() or
    990                 not self.faft_client.rootfs.verify_rootfs('B')):
    991             logging.info('Copying kernel and rootfs from A to B...')
    992             self.copy_kernel_and_rootfs(from_part=part,
    993                                         to_part=self.OTHER_KERNEL_MAP[part])
    994         self.reset_and_prioritize_kernel(part)
    995 
    996     def reset_and_prioritize_kernel(self, part):
    997         """Make the requested partition highest priority.
    998 
    999         This function also reset kerenl A and B to bootable.
   1000 
   1001         @param part: A string of partition number to be prioritized.
   1002         """
   1003         root_dev = self.faft_client.system.get_root_dev()
   1004         # Reset kernel A and B to bootable.
   1005         self.faft_client.system.run_shell_command(
   1006             'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['a'], root_dev))
   1007         self.faft_client.system.run_shell_command(
   1008             'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['b'], root_dev))
   1009         # Set kernel part highest priority.
   1010         self.faft_client.system.run_shell_command('cgpt prioritize -i%s %s' %
   1011                 (self.KERNEL_MAP[part], root_dev))
   1012 
   1013     def do_blocking_sync(self, device):
   1014         """Run a blocking sync command."""
   1015         logging.info("Blocking sync for %s", device)
   1016         if 'mmcblk' in device:
   1017             # For mmc devices, use `mmc status get` command to send an
   1018             # empty command to wait for the disk to be available again.
   1019             self.faft_client.system.run_shell_command('mmc status get %s' %
   1020                                                       device)
   1021         elif 'nvme' in device:
   1022             # Get a list of NVMe namespace and flush them individually
   1023             # Assumes the output format from nvme list-ns command will
   1024             # be something like follows:
   1025             # [ 0]:0x1
   1026             # [ 1]:0x2
   1027             available_ns = self.faft_client.system.run_shell_command_get_output(
   1028                                                   'nvme list-ns %s -a' % device)
   1029             for ns in available_ns:
   1030                 ns = ns.split(':')[-1]
   1031                 # For NVMe devices, use `nvme flush` command to commit data
   1032                 # and metadata to non-volatile media.
   1033                 self.faft_client.system.run_shell_command(
   1034                                  'nvme flush %s -n %s' % (device, ns))
   1035         else:
   1036             # For other devices, hdparm sends TUR to check if
   1037             # a device is ready for transfer operation.
   1038             self.faft_client.system.run_shell_command('hdparm -f %s' % device)
   1039 
   1040     def blocking_sync(self):
   1041         """Sync root device and internal device."""
   1042         # The double calls to sync fakes a blocking call
   1043         # since the first call returns before the flush
   1044         # is complete, but the second will wait for the
   1045         # first to finish.
   1046         self.faft_client.system.run_shell_command('sync')
   1047         self.faft_client.system.run_shell_command('sync')
   1048 
   1049         # sync only sends SYNCHRONIZE_CACHE but doesn't check the status.
   1050         # This function will perform a device-specific sync command.
   1051         root_dev = self.faft_client.system.get_root_dev()
   1052         self.do_blocking_sync(root_dev)
   1053 
   1054         # Also sync the internal device if booted from removable media.
   1055         if self.faft_client.system.is_removable_device_boot():
   1056             internal_dev = self.faft_client.system.get_internal_device()
   1057             self.do_blocking_sync(internal_dev)
   1058 
   1059     def sync_and_ec_reboot(self, flags=''):
   1060         """Request the client sync and do a EC triggered reboot.
   1061 
   1062         @param flags: Optional, a space-separated string of flags passed to EC
   1063                       reboot command, including:
   1064                           default: EC soft reboot;
   1065                           'hard': EC cold/hard reboot.
   1066         """
   1067         self.blocking_sync()
   1068         self.ec.reboot(flags)
   1069         time.sleep(self.faft_config.ec_boot_to_console)
   1070         self.check_lid_and_power_on()
   1071 
   1072     def reboot_and_reset_tpm(self):
   1073         """Reboot into recovery mode, reset TPM, then reboot back to disk."""
   1074         self.switcher.reboot_to_mode(to_mode='rec')
   1075         self.faft_client.system.run_shell_command('chromeos-tpm-recovery')
   1076         self.switcher.mode_aware_reboot()
   1077 
   1078     def full_power_off_and_on(self):
   1079         """Shutdown the device by pressing power button and power on again."""
   1080         boot_id = self.get_bootid()
   1081         # Press power button to trigger Chrome OS normal shutdown process.
   1082         # We use a customized delay since the normal-press 1.2s is not enough.
   1083         self.servo.power_key(self.faft_config.hold_pwr_button_poweroff)
   1084         # device can take 44-51 seconds to restart,
   1085         # add buffer from the default timeout of 60 seconds.
   1086         self.switcher.wait_for_client_offline(timeout=100, orig_boot_id=boot_id)
   1087         time.sleep(self.faft_config.shutdown)
   1088         if self.check_ec_capability(['x86'], suppress_warning=True):
   1089             self.check_shutdown_power_state("G3", pwr_retries=5)
   1090         # Short press power button to boot DUT again.
   1091         self.servo.power_key(self.faft_config.hold_pwr_button_poweron)
   1092 
   1093     def check_shutdown_power_state(self, power_state, pwr_retries):
   1094         """Check whether the device entered into requested EC power state
   1095         after shutdown.
   1096 
   1097         @param power_state: EC power state has to be checked. Either S5 or G3.
   1098         @param pwr_retries: Times to check if the DUT in expected power state.
   1099         @raise TestFail: If device failed to enter into requested power state.
   1100         """
   1101         if not self.wait_power_state(power_state, pwr_retries):
   1102             raise error.TestFail('System not shutdown properly and EC fails '
   1103                                  'to enter into %s state.' % power_state)
   1104         logging.info('System entered into %s state..', power_state)
   1105 
   1106     def check_lid_and_power_on(self):
   1107         """
   1108         On devices with EC software sync, system powers on after EC reboots if
   1109         lid is open. Otherwise, the EC shuts down CPU after about 3 seconds.
   1110         This method checks lid switch state and presses power button if
   1111         necessary.
   1112         """
   1113         if self.servo.get("lid_open") == "no":
   1114             time.sleep(self.faft_config.software_sync)
   1115             self.servo.power_short_press()
   1116 
   1117     def _modify_usb_kernel(self, usb_dev, from_magic, to_magic):
   1118         """Modify the kernel header magic in USB stick.
   1119 
   1120         The kernel header magic is the first 8-byte of kernel partition.
   1121         We modify it to make it fail on kernel verification check.
   1122 
   1123         @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
   1124         @param from_magic: A string of magic which we change it from.
   1125         @param to_magic: A string of magic which we change it to.
   1126         @raise TestError: if failed to change magic.
   1127         """
   1128         assert len(from_magic) == 8
   1129         assert len(to_magic) == 8
   1130         # USB image only contains one kernel.
   1131         kernel_part = self._join_part(usb_dev, self.KERNEL_MAP['a'])
   1132         read_cmd = "sudo dd if=%s bs=8 count=1 2>/dev/null" % kernel_part
   1133         current_magic = self.servo.system_output(read_cmd)
   1134         if current_magic == to_magic:
   1135             logging.info("The kernel magic is already %s.", current_magic)
   1136             return
   1137         if current_magic != from_magic:
   1138             raise error.TestError("Invalid kernel image on USB: wrong magic.")
   1139 
   1140         logging.info('Modify the kernel magic in USB, from %s to %s.',
   1141                      from_magic, to_magic)
   1142         write_cmd = ("echo -n '%s' | sudo dd of=%s oflag=sync conv=notrunc "
   1143                      " 2>/dev/null" % (to_magic, kernel_part))
   1144         self.servo.system(write_cmd)
   1145 
   1146         if self.servo.system_output(read_cmd) != to_magic:
   1147             raise error.TestError("Failed to write new magic.")
   1148 
   1149     def corrupt_usb_kernel(self, usb_dev):
   1150         """Corrupt USB kernel by modifying its magic from CHROMEOS to CORRUPTD.
   1151 
   1152         @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
   1153         """
   1154         self._modify_usb_kernel(usb_dev, self.CHROMEOS_MAGIC,
   1155                                 self.CORRUPTED_MAGIC)
   1156 
   1157     def restore_usb_kernel(self, usb_dev):
   1158         """Restore USB kernel by modifying its magic from CORRUPTD to CHROMEOS.
   1159 
   1160         @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
   1161         """
   1162         self._modify_usb_kernel(usb_dev, self.CORRUPTED_MAGIC,
   1163                                 self.CHROMEOS_MAGIC)
   1164 
   1165     def _call_action(self, action_tuple, check_status=False):
   1166         """Call the action function with/without arguments.
   1167 
   1168         @param action_tuple: A function, or a tuple (function, args, error_msg),
   1169                              in which, args and error_msg are optional. args is
   1170                              either a value or a tuple if multiple arguments.
   1171                              This can also be a list containing multiple
   1172                              function or tuple. In this case, these actions are
   1173                              called in sequence.
   1174         @param check_status: Check the return value of action function. If not
   1175                              succeed, raises a TestFail exception.
   1176         @return: The result value of the action function.
   1177         @raise TestError: An error when the action function is not callable.
   1178         @raise TestFail: When check_status=True, action function not succeed.
   1179         """
   1180         if isinstance(action_tuple, list):
   1181             return all([self._call_action(action, check_status=check_status)
   1182                         for action in action_tuple])
   1183 
   1184         action = action_tuple
   1185         args = ()
   1186         error_msg = 'Not succeed'
   1187         if isinstance(action_tuple, tuple):
   1188             action = action_tuple[0]
   1189             if len(action_tuple) >= 2:
   1190                 args = action_tuple[1]
   1191                 if not isinstance(args, tuple):
   1192                     args = (args,)
   1193             if len(action_tuple) >= 3:
   1194                 error_msg = action_tuple[2]
   1195 
   1196         if action is None:
   1197             return
   1198 
   1199         if not callable(action):
   1200             raise error.TestError('action is not callable!')
   1201 
   1202         info_msg = 'calling %s' % action.__name__
   1203         if args:
   1204             info_msg += ' with args %s' % str(args)
   1205         logging.info(info_msg)
   1206         ret = action(*args)
   1207 
   1208         if check_status and not ret:
   1209             raise error.TestFail('%s: %s returning %s' %
   1210                                  (error_msg, info_msg, str(ret)))
   1211         return ret
   1212 
   1213     def run_shutdown_process(self, shutdown_action, pre_power_action=None,
   1214                              run_power_action=True, post_power_action=None,
   1215                              shutdown_timeout=None):
   1216         """Run shutdown_action(), which makes DUT shutdown, and power it on.
   1217 
   1218         @param shutdown_action: function which makes DUT shutdown, like
   1219                                 pressing power key.
   1220         @param pre_power_action: function which is called before next power on.
   1221         @param run_power_action: power_key press by default, set to None to skip.
   1222         @param post_power_action: function which is called after next power on.
   1223         @param shutdown_timeout: a timeout to confirm DUT shutdown.
   1224         @raise TestFail: if the shutdown_action() failed to turn DUT off.
   1225         """
   1226         self._call_action(shutdown_action)
   1227         logging.info('Wait to ensure DUT shut down...')
   1228         try:
   1229             if shutdown_timeout is None:
   1230                 shutdown_timeout = self.faft_config.shutdown_timeout
   1231             self.switcher.wait_for_client(timeout=shutdown_timeout)
   1232             raise error.TestFail(
   1233                     'Should shut the device down after calling %s.' %
   1234                     shutdown_action.__name__)
   1235         except ConnectionError:
   1236             if self.check_ec_capability(['x86'], suppress_warning=True):
   1237                 self.check_shutdown_power_state("G3", pwr_retries=5)
   1238             logging.info(
   1239                 'DUT is surely shutdown. We are going to power it on again...')
   1240 
   1241         if pre_power_action:
   1242             self._call_action(pre_power_action)
   1243         if run_power_action:
   1244             self.servo.power_key(self.faft_config.hold_pwr_button_poweron)
   1245         if post_power_action:
   1246             self._call_action(post_power_action)
   1247 
   1248     def get_bootid(self, retry=3):
   1249         """
   1250         Return the bootid.
   1251         """
   1252         boot_id = None
   1253         while retry:
   1254             try:
   1255                 boot_id = self._client.get_boot_id()
   1256                 break
   1257             except error.AutoservRunError:
   1258                 retry -= 1
   1259                 if retry:
   1260                     logging.info('Retry to get boot_id...')
   1261                 else:
   1262                     logging.warning('Failed to get boot_id.')
   1263         logging.info('boot_id: %s', boot_id)
   1264         return boot_id
   1265 
   1266     def check_state(self, func):
   1267         """
   1268         Wrapper around _call_action with check_status set to True. This is a
   1269         helper function to be used by tests and is currently implemented by
   1270         calling _call_action with check_status=True.
   1271 
   1272         TODO: This function's arguments need to be made more stringent. And
   1273         its functionality should be moved over to check functions directly in
   1274         the future.
   1275 
   1276         @param func: A function, or a tuple (function, args, error_msg),
   1277                              in which, args and error_msg are optional. args is
   1278                              either a value or a tuple if multiple arguments.
   1279                              This can also be a list containing multiple
   1280                              function or tuple. In this case, these actions are
   1281                              called in sequence.
   1282         @return: The result value of the action function.
   1283         @raise TestFail: If the function does notsucceed.
   1284         """
   1285         logging.info("-[FAFT]-[ start stepstate_checker ]----------")
   1286         self._call_action(func, check_status=True)
   1287         logging.info("-[FAFT]-[ end state_checker ]----------------")
   1288 
   1289     def get_current_firmware_sha(self):
   1290         """Get current firmware sha of body and vblock.
   1291 
   1292         @return: Current firmware sha follows the order (
   1293                  vblock_a_sha, body_a_sha, vblock_b_sha, body_b_sha)
   1294         """
   1295         current_firmware_sha = (self.faft_client.bios.get_sig_sha('a'),
   1296                                 self.faft_client.bios.get_body_sha('a'),
   1297                                 self.faft_client.bios.get_sig_sha('b'),
   1298                                 self.faft_client.bios.get_body_sha('b'))
   1299         if not all(current_firmware_sha):
   1300             raise error.TestError('Failed to get firmware sha.')
   1301         return current_firmware_sha
   1302 
   1303     def is_firmware_changed(self):
   1304         """Check if the current firmware changed, by comparing its SHA.
   1305 
   1306         @return: True if it is changed, otherwise Flase.
   1307         """
   1308         # Device may not be rebooted after test.
   1309         self.faft_client.bios.reload()
   1310 
   1311         current_sha = self.get_current_firmware_sha()
   1312 
   1313         if current_sha == self._backup_firmware_sha:
   1314             return False
   1315         else:
   1316             corrupt_VBOOTA = (current_sha[0] != self._backup_firmware_sha[0])
   1317             corrupt_FVMAIN = (current_sha[1] != self._backup_firmware_sha[1])
   1318             corrupt_VBOOTB = (current_sha[2] != self._backup_firmware_sha[2])
   1319             corrupt_FVMAINB = (current_sha[3] != self._backup_firmware_sha[3])
   1320             logging.info('Firmware changed:')
   1321             logging.info('VBOOTA is changed: %s', corrupt_VBOOTA)
   1322             logging.info('VBOOTB is changed: %s', corrupt_VBOOTB)
   1323             logging.info('FVMAIN is changed: %s', corrupt_FVMAIN)
   1324             logging.info('FVMAINB is changed: %s', corrupt_FVMAINB)
   1325             return True
   1326 
   1327     def backup_firmware(self, suffix='.original'):
   1328         """Backup firmware to file, and then send it to host.
   1329 
   1330         @param suffix: a string appended to backup file name
   1331         """
   1332         remote_temp_dir = self.faft_client.system.create_temp_dir()
   1333         remote_bios_path = os.path.join(remote_temp_dir, 'bios')
   1334         self.faft_client.bios.dump_whole(remote_bios_path)
   1335         self._client.get_file(remote_bios_path,
   1336                               os.path.join(self.resultsdir, 'bios' + suffix))
   1337 
   1338         if self.faft_config.chrome_ec:
   1339             remote_ec_path = os.path.join(remote_temp_dir, 'ec')
   1340             self.faft_client.ec.dump_whole(remote_ec_path)
   1341             self._client.get_file(remote_ec_path,
   1342                               os.path.join(self.resultsdir, 'ec' + suffix))
   1343 
   1344         self._client.run('rm -rf %s' % remote_temp_dir)
   1345         logging.info('Backup firmware stored in %s with suffix %s',
   1346             self.resultsdir, suffix)
   1347 
   1348         self._backup_firmware_sha = self.get_current_firmware_sha()
   1349 
   1350     def is_firmware_saved(self):
   1351         """Check if a firmware saved (called backup_firmware before).
   1352 
   1353         @return: True if the firmware is backuped; otherwise False.
   1354         """
   1355         return self._backup_firmware_sha != ()
   1356 
   1357     def clear_saved_firmware(self):
   1358         """Clear the firmware saved by the method backup_firmware."""
   1359         self._backup_firmware_sha = ()
   1360 
   1361     def restore_firmware(self, suffix='.original', restore_ec=True):
   1362         """Restore firmware from host in resultsdir.
   1363 
   1364         @param suffix: a string appended to backup file name
   1365         @param restore_ec: True to restore the ec firmware; False not to do.
   1366         """
   1367         if not self.is_firmware_changed():
   1368             return
   1369 
   1370         # Backup current corrupted firmware.
   1371         self.backup_firmware(suffix='.corrupt')
   1372 
   1373         # Restore firmware.
   1374         remote_temp_dir = self.faft_client.system.create_temp_dir()
   1375         self._client.send_file(os.path.join(self.resultsdir, 'bios' + suffix),
   1376                                os.path.join(remote_temp_dir, 'bios'))
   1377 
   1378         self.faft_client.bios.write_whole(
   1379             os.path.join(remote_temp_dir, 'bios'))
   1380 
   1381         if self.faft_config.chrome_ec and restore_ec:
   1382             self._client.send_file(os.path.join(self.resultsdir, 'ec' + suffix),
   1383                 os.path.join(remote_temp_dir, 'ec'))
   1384             self.faft_client.ec.write_whole(
   1385                 os.path.join(remote_temp_dir, 'ec'))
   1386 
   1387         self.switcher.mode_aware_reboot()
   1388         logging.info('Successfully restore firmware.')
   1389 
   1390     def setup_firmwareupdate_shellball(self, shellball=None):
   1391         """Setup a shellball to use in firmware update test.
   1392 
   1393         Check if there is a given shellball, and it is a shell script. Then,
   1394         send it to the remote host. Otherwise, use the
   1395         /usr/sbin/chromeos-firmwareupdate in the image and replace its inside
   1396         BIOS and EC images with the active firmware images.
   1397 
   1398         @param shellball: path of a shellball or default to None.
   1399         """
   1400         if shellball:
   1401             # Determine the firmware file is a shellball or a raw binary.
   1402             is_shellball = (utils.system_output("file %s" % shellball).find(
   1403                     "shell script") != -1)
   1404             if is_shellball:
   1405                 logging.info('Device will update firmware with shellball %s',
   1406                              shellball)
   1407                 temp_path = self.faft_client.updater.get_temp_path()
   1408                 working_shellball = os.path.join(temp_path,
   1409                                                  'chromeos-firmwareupdate')
   1410                 self._client.send_file(shellball, working_shellball)
   1411                 self.faft_client.updater.extract_shellball()
   1412             else:
   1413                 raise error.TestFail(
   1414                     'The given shellball is not a shell script.')
   1415         else:
   1416             logging.info('No shellball given, use the original shellball and '
   1417                          'replace its BIOS and EC images.')
   1418             work_path = self.faft_client.updater.get_work_path()
   1419             bios_in_work_path = os.path.join(
   1420                 work_path, self.faft_client.updater.get_bios_relative_path())
   1421             ec_in_work_path = os.path.join(
   1422                 work_path, self.faft_client.updater.get_ec_relative_path())
   1423             logging.info('Writing current BIOS to: %s', bios_in_work_path)
   1424             self.faft_client.bios.dump_whole(bios_in_work_path)
   1425             if self.faft_config.chrome_ec:
   1426                 logging.info('Writing current EC to: %s', ec_in_work_path)
   1427                 self.faft_client.ec.dump_firmware(ec_in_work_path)
   1428             self.faft_client.updater.repack_shellball()
   1429 
   1430     def is_kernel_changed(self):
   1431         """Check if the current kernel is changed, by comparing its SHA1 hash.
   1432 
   1433         @return: True if it is changed; otherwise, False.
   1434         """
   1435         changed = False
   1436         for p in ('A', 'B'):
   1437             backup_sha = self._backup_kernel_sha.get(p, None)
   1438             current_sha = self.faft_client.kernel.get_sha(p)
   1439             if backup_sha != current_sha:
   1440                 changed = True
   1441                 logging.info('Kernel %s is changed', p)
   1442         return changed
   1443 
   1444     def backup_kernel(self, suffix='.original'):
   1445         """Backup kernel to files, and the send them to host.
   1446 
   1447         @param suffix: a string appended to backup file name.
   1448         """
   1449         remote_temp_dir = self.faft_client.system.create_temp_dir()
   1450         for p in ('A', 'B'):
   1451             remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p)
   1452             self.faft_client.kernel.dump(p, remote_path)
   1453             self._client.get_file(
   1454                     remote_path,
   1455                     os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix)))
   1456             self._backup_kernel_sha[p] = self.faft_client.kernel.get_sha(p)
   1457         logging.info('Backup kernel stored in %s with suffix %s',
   1458             self.resultsdir, suffix)
   1459 
   1460     def is_kernel_saved(self):
   1461         """Check if kernel images are saved (backup_kernel called before).
   1462 
   1463         @return: True if the kernel is saved; otherwise, False.
   1464         """
   1465         return len(self._backup_kernel_sha) != 0
   1466 
   1467     def clear_saved_kernel(self):
   1468         """Clear the kernel saved by backup_kernel()."""
   1469         self._backup_kernel_sha = dict()
   1470 
   1471     def restore_kernel(self, suffix='.original'):
   1472         """Restore kernel from host in resultsdir.
   1473 
   1474         @param suffix: a string appended to backup file name.
   1475         """
   1476         if not self.is_kernel_changed():
   1477             return
   1478 
   1479         # Backup current corrupted kernel.
   1480         self.backup_kernel(suffix='.corrupt')
   1481 
   1482         # Restore kernel.
   1483         remote_temp_dir = self.faft_client.system.create_temp_dir()
   1484         for p in ('A', 'B'):
   1485             remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p)
   1486             self._client.send_file(
   1487                     os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix)),
   1488                     remote_path)
   1489             self.faft_client.kernel.write(p, remote_path)
   1490 
   1491         self.switcher.mode_aware_reboot()
   1492         logging.info('Successfully restored kernel.')
   1493 
   1494     def backup_cgpt_attributes(self):
   1495         """Backup CGPT partition table attributes."""
   1496         self._backup_cgpt_attr = self.faft_client.cgpt.get_attributes()
   1497 
   1498     def restore_cgpt_attributes(self):
   1499         """Restore CGPT partition table attributes."""
   1500         current_table = self.faft_client.cgpt.get_attributes()
   1501         if current_table == self._backup_cgpt_attr:
   1502             return
   1503         logging.info('CGPT table is changed. Original: %r. Current: %r.',
   1504                      self._backup_cgpt_attr,
   1505                      current_table)
   1506         self.faft_client.cgpt.set_attributes(self._backup_cgpt_attr)
   1507 
   1508         self.switcher.mode_aware_reboot()
   1509         logging.info('Successfully restored CGPT table.')
   1510 
   1511     def try_fwb(self, count=0):
   1512         """set to try booting FWB count # times
   1513 
   1514         Wrapper to set fwb_tries for vboot1 and fw_try_count,fw_try_next for
   1515         vboot2
   1516 
   1517         @param count: an integer specifying value to program into
   1518                       fwb_tries(vb1)/fw_try_next(vb2)
   1519         """
   1520         if self.fw_vboot2:
   1521             self.faft_client.system.set_fw_try_next('B', count)
   1522         else:
   1523             # vboot1: we need to boot into fwb at least once
   1524             if not count:
   1525                 count = count + 1
   1526             self.faft_client.system.set_try_fw_b(count)
   1527 
   1528