Home | History | Annotate | Download | only in enterprise_CFM_HuddlyUpdater
      1 # Copyright 2017 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging
      6 import os
      7 import re
      8 import time
      9 
     10 from autotest_lib.client.common_lib import error
     11 from autotest_lib.server import test
     12 import parse
     13 
     14 GUADO_GPIO = 218  # For Front-Left USB port
     15 POWER_RECYCLE_WAIT_TIME = 1  # sec
     16 
     17 
     18 class enterprise_CFM_HuddlyUpdater(test.test):
     19     """Tests the firmware updatability of HuddlyGo camera.
     20 
     21     An event to trigger the firmware update is to power recycle of a USB port
     22     which the HuddlyGo camera is attached to. The power recycle emulates
     23     the power recycle of the ChromeBox or a reconnection of the peripheral
     24     to the ChromeBox.
     25 
     26     The test scenario involves the power recycling of a specific USB port
     27     of the Guado ChromeBox: Front-left one. This imposes a restriction in the
     28     testbed setup. This limitation is to be alleviated after the development
     29     of full-fledged usb power recycle code. TODO(frankhu).
     30     """
     31 
     32     version = 1
     33     _failed_test_list = []
     34 
     35     UPDATER_WAIT_TIME = 60  # sec
     36 
     37     FIRMWARE_PKG_ORG = 'huddly'
     38     FIRMWARE_PKG_TO_TEST = 'huddly052'
     39     FIRMWARE_PKG_BACKUP = 'huddly.backup'
     40 
     41     DUT_FIRMWARE_BASE = '/lib/firmware/'
     42     DUT_FIRMWARE_SRC = os.path.join(DUT_FIRMWARE_BASE, FIRMWARE_PKG_ORG)
     43     DUT_FIRMWARE_SRC_BACKUP = os.path.join(DUT_FIRMWARE_BASE,
     44                                            FIRMWARE_PKG_BACKUP)
     45     DUT_FIRMWARE_SRC_TEST = os.path.join(DUT_FIRMWARE_BASE,
     46                                          FIRMWARE_PKG_TO_TEST)
     47 
     48     def initialize(self):
     49         """initialize is a stub function."""
     50         # Placeholder.
     51         pass
     52 
     53     def ls(self):
     54         """ls tracks the directories of interest."""
     55         cmd = 'ls -l /lib/firmware/ | grep huddly'
     56         result = self._shcmd(cmd)
     57 
     58     def cleanup(self):
     59         """Bring the originally bundled firmware package back."""
     60         cmd = '[ -f {} ] && rm -rf {}'.format(self.DUT_FIRMWARE_SRC,
     61                                               self.DUT_FIRMWARE_SRC)
     62         self._shcmd(cmd)
     63 
     64         cmd = 'mv {} {} && rm -rf {}'.format(self.DUT_FIRMWARE_SRC_BACKUP,
     65                                              self.DUT_FIRMWARE_SRC,
     66                                              self.DUT_FIRMWARE_SRC_TEST)
     67         self._shcmd(cmd)
     68 
     69     def _shcmd(self, cmd):
     70         """A simple wrapper for remote shell command execution."""
     71         logging.info('CMD: [%s]', cmd)
     72         result = self._client.run(cmd)
     73 
     74         # result is an object with following attributes:
     75         # ['__class__', '__delattr__', '__dict__', '__doc__', '__eq__',
     76         # '__format__', '__getattribute__', '__hash__', '__init__',
     77         # '__module__', '__new__', '__reduce__', '__reduce_ex__',
     78         # '__repr__', '__setattr__', '__sizeof__', '__str__',
     79         # '__subclasshook__', '__weakref__', 'command', 'duration',
     80         # 'exit_status', 'stderr', 'stdout']
     81         try:
     82             result = self._client.run(cmd)
     83         except:
     84             pass
     85 
     86         if result.stderr:
     87             logging.info('CMD ERR:\n' + result.stderr)
     88         logging.info('CMD OUT:\n' + result.stdout)
     89         return result
     90 
     91     def copy_firmware(self):
     92         """Copy test firmware package from server to the DUT."""
     93         current_dir = os.path.dirname(os.path.realpath(__file__))
     94         src_firmware_path = os.path.join(current_dir, self.FIRMWARE_PKG_TO_TEST)
     95         dst_firmware_path = self.DUT_FIRMWARE_BASE
     96 
     97         msg = 'copy firmware from {} to {}'.format(src_firmware_path,
     98                                                    dst_firmware_path)
     99         logging.info(msg)
    100         self._client.send_file(
    101             src_firmware_path, dst_firmware_path, delete_dest=True)
    102 
    103     def update_firmware(self, firmware_pkg):
    104         """Update the peripheral's firmware with the specified package.
    105 
    106         @param firmware_pkg: A string of package name specified by the leaf
    107                 directory name in /lib/firmware/. See class constants
    108                 DUT_FIRMWARE_SRC*.
    109         """
    110         # Set up the firmware package to test with
    111         firmware_path = os.path.join(self.DUT_FIRMWARE_BASE, firmware_pkg)
    112         cmd = 'ln -sfn {} {}'.format(firmware_path, self.DUT_FIRMWARE_SRC)
    113         self._shcmd(cmd)
    114 
    115         ver_dic = self.get_fw_vers()
    116         had = ver_dic.get('peripheral', {}).get('app', '')
    117         want = ver_dic.get('package', {}).get('app', '')
    118 
    119         msg = 'Update plan: from {} to {} with package: {}'.format(
    120             had, want, firmware_pkg)
    121         logging.info(msg)
    122 
    123         logging.info('Recycle the power to the USB port '
    124                      'to which HuddlyGo is attached.')
    125         self.usb_power_recycle()
    126         time.sleep(self.UPDATER_WAIT_TIME)
    127 
    128         got = self.get_fw_vers().get('peripheral', {}).get('app', '')
    129 
    130         msg = 'Update result: had {} want {} got {}'.format(
    131             had, want, got)
    132         logging.info(msg)
    133 
    134         if want != got:
    135             self._failed_test_list.append(
    136                 'update_firmware({})'.format(firmware_pkg))
    137 
    138     def run_once(self, host=None):
    139         """Update two times. First with test package, second with the original.
    140 
    141         Test scenario:
    142           1. Copy test firmware from the server to the DUT.
    143           2. Update with the test package. Wait about 50 sec till completion.
    144              Confirm if the peripheral is updated with the test version.
    145           3. Update with the original package. Wait about 50 sec.
    146              Confirm if the peripheral is updated with the original version.
    147         """
    148         self._client = host
    149 
    150         if not self.is_filesystem_readwrite():
    151             # Make the file system read-writable, reboot, and continue the test
    152             logging.info('DUT root file system is not read-writable. '
    153                          'Converting it read-wriable...')
    154             self.convert_rootfs_writable()
    155         else:
    156             logging.info('DUT is read-writable')
    157 
    158 
    159         try:
    160             self.ls()
    161             cmd = 'mv {} {}'.format(self.DUT_FIRMWARE_SRC,
    162                                     self.DUT_FIRMWARE_SRC_BACKUP)
    163             self._shcmd(cmd)
    164 
    165             self.ls()
    166             self.copy_firmware()
    167             self.ls()
    168             self.update_firmware(self.FIRMWARE_PKG_TO_TEST)
    169             self.ls()
    170             self.update_firmware(self.FIRMWARE_PKG_BACKUP)
    171 
    172             if self._failed_test_list:
    173               msg = 'Test failed in {}'.format(
    174                   ', '.join(map(str, self._failed_test_list)))
    175               raise error.TestFail(msg)
    176         except:
    177             pass
    178         finally:
    179             self.cleanup()
    180 
    181     def convert_rootfs_writable(self):
    182         """Remove rootfs verification on DUT, reboot,
    183         and remount the filesystem read-writable"""
    184 
    185         logging.info('Disabling rootfs verification...')
    186         self.remove_rootfs_verification()
    187 
    188         logging.info('Rebooting...')
    189         self.reboot()
    190 
    191         logging.info('Remounting..')
    192         cmd = 'mount -o remount,rw /'
    193         self._shcmd(cmd)
    194 
    195     def remove_rootfs_verification(self):
    196         """Remove rootfs verification."""
    197         # 2 & 4 are default partitions, and the system boots from one of them.
    198         # Code from chromite/scripts/deploy_chrome.py
    199         KERNEL_A_PARTITION = 2
    200         KERNEL_B_PARTITION = 4
    201 
    202         cmd_template = ('/usr/share/vboot/bin/make_dev_ssd.sh --partitions %d '
    203                '--remove_rootfs_verification --force')
    204         for partition in (KERNEL_A_PARTITION, KERNEL_B_PARTITION):
    205             cmd = cmd_template % partition
    206             self._client.run(cmd)
    207 
    208     def reboot(self):
    209         """Reboots the DUT."""
    210         self._client.reboot()
    211 
    212     def get_fw_vers(self):
    213         """Queries the firmware versions.
    214 
    215         Utilizes the output of the command 'huddly-updater --info'.
    216         It queries and parses the firmware versions of app and bootloader of
    217         firmware package and the peripheral's running firmwares, respectively.
    218 
    219         @returns a dictionary hierachically storing the firmware versions.
    220         """
    221 
    222         # TODO(porce): The updater's output is to stdout, but Auto test
    223         # command output comes to stderr. Investigate.
    224         cmd = 'huddly-updater --info --log_to=stdout'
    225         result = self._shcmd(cmd).stderr
    226         ver_dic = parse.parse_fw_vers(result)
    227         return ver_dic
    228 
    229     def usb_power_recycle(self):
    230         """Recycle the power to a USB port.
    231 
    232         # TODO(frankhu): This code supports Guado, at a specific test
    233         # configuration. Develop an independent tool to perform this task
    234         # with minimal dependency.
    235         """
    236 
    237         try:
    238             # Ignorant handling of GPIO export.
    239             cmd = 'echo {} > /sys/class/gpio/export'.format(GUADO_GPIO)
    240             self._shcmd(cmd)
    241         except error.AutoservRunError:
    242             pass
    243 
    244         cmd = 'echo out > /sys/class/gpio/gpio{}/direction'.format(GUADO_GPIO)
    245         self._shcmd(cmd)
    246         cmd = 'echo 0 > /sys/class/gpio/gpio{}/value'.format(GUADO_GPIO)
    247         self._shcmd(cmd)
    248 
    249         # Wait for 1 second to avoid too fast removal and reconnection.
    250         time.sleep(POWER_RECYCLE_WAIT_TIME)
    251         cmd = 'echo 1 > /sys/class/gpio/gpio{}/value'.format(GUADO_GPIO)
    252         self._shcmd(cmd)
    253 
    254     def is_filesystem_readwrite(self):
    255         """Check if the root file system is read-writable.
    256 
    257         Query the DUT's filesystem /dev/root, often manifested as /dev/dm-0
    258         or  is mounted as read-only or not.
    259 
    260         @returns True if the /dev/root is read-writable. False otherwise.
    261         """
    262 
    263         cmd = 'cat /proc/mounts | grep "/dev/root"'
    264         result = self._shcmd(cmd).stdout
    265         fields = re.split(' |,', result)
    266         return True if fields.__len__() >= 4 and fields[3] == 'rw' else False
    267