Home | History | Annotate | Download | only in enterprise_CFM_USBPeripheralHotplugDetect
      1 # Copyright (c) 2016 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging, os, time
      6 
      7 from autotest_lib.client.common_lib import error
      8 from autotest_lib.client.common_lib.cros import tpm_utils
      9 from autotest_lib.server import test
     10 from autotest_lib.server.cros.multimedia import remote_facade_factory
     11 
     12 
# Seconds slept after the CFM OOBE screen has been skipped.
_SHORT_TIMEOUT = 2
# Seconds slept after changing the servo USB mux or USB hub state.
_WAIT_DELAY = 15
# Directory on the DUT that is listed to find the attached USB devices.
_USB_DIR = '/sys/bus/usb/devices'
     16 
     17 
     18 class enterprise_CFM_USBPeripheralHotplugDetect(test.test):
     19     """Uses servo to hotplug and detect USB peripherals on CrOS and hotrod. It
     20     compares attached audio/video peripheral names on CrOS against what hotrod
     21     detects."""
     22     version = 1
     23 
     24 
     25     def _set_hub_power(self, on=True):
     26         """Setting USB hub power status
     27 
     28         @param on: To power on the servo usb hub or not.
     29 
     30         """
     31         reset = 'off'
     32         if not on:
     33             reset = 'on'
     34         self.client.servo.set('dut_hub1_rst1', reset)
     35         time.sleep(_WAIT_DELAY)
     36 
     37 
     38     def _get_usb_device_dirs(self):
     39         """Gets usb device dirs from _USB_DIR path.
     40 
     41         @returns list with number of device dirs else Non
     42 
     43         """
     44         usb_dir_list = list()
     45         cmd = 'ls %s' % _USB_DIR
     46         cmd_output = self.client.run(cmd).stdout.strip().split('\n')
     47         for d in cmd_output:
     48             usb_dir_list.append(os.path.join(_USB_DIR, d))
     49         return usb_dir_list
     50 
     51 
     52     def _get_usb_device_type(self, vendor_id):
     53         """Gets usb device type info from lsusb output based on vendor id.
     54 
     55         @vendor_id: Device vendor id.
     56         @returns list of device types associated with vendor id
     57 
     58         """
     59         details_list = list()
     60         cmd = 'lsusb -v -d ' + vendor_id + ': | head -150'
     61         cmd_out = self.client.run(cmd).stdout.strip().split('\n')
     62         for line in cmd_out:
     63             if (any(phrase in line for phrase in ('bInterfaceClass',
     64                     'wTerminalType'))):
     65                 details_list.append(line.split(None)[2])
     66 
     67         return list(set(details_list))
     68 
     69 
     70     def _get_product_info(self, directory, prod_string):
     71         """Gets the product name from device path.
     72 
     73         @param directory: Driver path for USB device.
     74         @param prod_string: Device attribute string.
     75         @returns the output of the cat command
     76 
     77         """
     78         product_file_name = os.path.join(directory, prod_string)
     79         if self._file_exists_on_host(product_file_name):
     80             return self.client.run('cat %s' % product_file_name).stdout.strip()
     81         return None
     82 
     83 
     84     def _parse_device_dir_for_info(self, dir_list, peripheral_whitelist_dict):
     85         """Uses device path and vendor id to get device type attibutes.
     86 
     87         @param dir_list: Complete list of device directories.
     88         @returns cros_peripheral_dict with device names
     89 
     90         """
     91         cros_peripheral_dict = {'Camera': None, 'Microphone': None,
     92                                 'Speaker': None}
     93 
     94         for d_path in dir_list:
     95             file_name = os.path.join(d_path, 'idVendor')
     96             if self._file_exists_on_host(file_name):
     97                 vendor_id = self.client.run('cat %s' % file_name).stdout.strip()
     98                 product_id = self._get_product_info(d_path, 'idProduct')
     99                 vId_pId = vendor_id + ':' + product_id
    100                 device_types = self._get_usb_device_type(vendor_id)
    101                 if vId_pId in peripheral_whitelist_dict:
    102                     if 'Microphone' in device_types:
    103                         cros_peripheral_dict['Microphone'] = (
    104                                 peripheral_whitelist_dict.get(vId_pId))
    105                     if 'Speaker' in device_types:
    106                         cros_peripheral_dict['Speaker'] = (
    107                                 peripheral_whitelist_dict.get(vId_pId))
    108                     if 'Video' in device_types:
    109                         cros_peripheral_dict['Camera'] = (
    110                                 peripheral_whitelist_dict.get(vId_pId))
    111 
    112         for device_type, is_found in cros_peripheral_dict.iteritems():
    113             if not is_found:
    114                 cros_peripheral_dict[device_type] = 'Not Found'
    115 
    116         return cros_peripheral_dict
    117 
    118 
    119     def _file_exists_on_host(self, path):
    120         """Checks if file exists on host.
    121 
    122         @param path: File path
    123         @returns True or False
    124 
    125         """
    126         return self.client.run('ls %s' % path,
    127                              ignore_status=True).exit_status == 0
    128 
    129 
    def _enroll_device_and_skip_oobe(self):
        """Enrolls the device into CFM and skips the CFM oobe screen.

        @raises error.TestFail: if the facade does not report the oobe start
                page after the wait.

        """
        facade = self.cfm_facade

        facade.enroll_device()
        facade.restart_chrome_for_cfm()
        facade.wait_for_telemetry_commands()
        facade.wait_for_oobe_start_page()

        reached_oobe = facade.is_oobe_start_page()
        if not reached_oobe:
            raise error.TestFail('CFM did not reach oobe screen.')

        facade.skip_oobe_screen()
        # Sleep for _SHORT_TIMEOUT seconds after skipping the screen.
        time.sleep(_SHORT_TIMEOUT)
    142 
    143 
    144     def _set_preferred_peripherals(self, cros_peripherals):
    145         """Set perferred peripherals.
    146 
    147         @param cros_peripherals: Dictionary of peripherals
    148         """
    149         avail_mics = self.cfm_facade.get_mic_devices()
    150         avail_speakers = self.cfm_facade.get_speaker_devices()
    151         avail_cameras = self.cfm_facade.get_camera_devices()
    152 
    153         if cros_peripherals.get('Microphone') in avail_mics:
    154             self.cfm_facade.set_preferred_mic(
    155                     cros_peripherals.get('Microphone'))
    156         if cros_peripherals.get('Speaker') in avail_speakers:
    157             self.cfm_facade.set_preferred_speaker(
    158                     cros_peripherals.get('Speaker'))
    159         if cros_peripherals.get('Camera') in avail_cameras:
    160             self.cfm_facade.set_preferred_camera(
    161                     cros_peripherals.get('Camera'))
    162 
    163 
    164     def _peripheral_detection(self):
    165         """Get attached peripheral information."""
    166         cfm_peripheral_dict = {'Microphone': None, 'Speaker': None,
    167                                'Camera': None}
    168 
    169         cfm_peripheral_dict['Microphone'] = self.cfm_facade.get_preferred_mic()
    170         cfm_peripheral_dict['Speaker'] = self.cfm_facade.get_preferred_speaker()
    171         cfm_peripheral_dict['Camera'] = self.cfm_facade.get_preferred_camera()
    172 
    173         for device_type, is_found in cfm_peripheral_dict.iteritems():
    174             if not is_found:
    175                 cfm_peripheral_dict[device_type] = 'Not Found'
    176 
    177         return cfm_peripheral_dict
    178 
    179 
    def run_once(self, host, peripheral_whitelist_dict):
        """Main function to run autotest.

        Finds the whitelisted USB peripherals attached to the DUT, enrolls
        the device into CFM, sets those peripherals as preferred and fails
        unless hotrod reports the same microphone, speaker and camera.

        @param host: Host object representing the DUT.
        @param peripheral_whitelist_dict: Dictionary mapping 'vendor:product'
                id strings to peripheral names.
        @raises error.TestFail: if enrollment or detection raises, or if the
                peripherals seen by CrOS and by hotrod differ.

        """
        self.client = host

        factory = remote_facade_factory.RemoteFacadeFactory(
                host, no_chrome=True)
        self.cfm_facade = factory.create_cfm_facade()

        tpm_utils.ClearTPMOwnerRequest(self.client)

        if self.client.servo:
            self.client.servo.switch_usbkey('dut')
            self.client.servo.set('usb_mux_sel3', 'dut_sees_usbkey')
            time.sleep(_WAIT_DELAY)
            self._set_hub_power(True)

        usb_list_dir_on = self._get_usb_device_dirs()

        cros_peripheral_dict = self._parse_device_dir_for_info(usb_list_dir_on,
                peripheral_whitelist_dict)
        logging.debug('Peripherals detected by CrOS: %s', cros_peripheral_dict)

        try:
            self._enroll_device_and_skip_oobe()
            self._set_preferred_peripherals(cros_peripheral_dict)
            cfm_peripheral_dict = self._peripheral_detection()
            logging.debug('Peripherals detected by hotrod: %s',
                          cfm_peripheral_dict)
        except Exception as e:
            # Report any failure in the CFM steps as a test failure.
            raise error.TestFail(str(e))
        finally:
            # Run the TPM owner clear on the failure path as well as on
            # success, so a failed run does not skip this step.
            tpm_utils.ClearTPMOwnerRequest(self.client)

        # items() instead of the Python-2-only iteritems().
        cros_peripherals = set(cros_peripheral_dict.items())
        cfm_peripherals = set(cfm_peripheral_dict.items())

        # (type, name) pairs CrOS found that hotrod does not report.
        peripheral_diff = cros_peripherals.difference(cfm_peripherals)

        if peripheral_diff:
            # Sorted so the failure message does not depend on set ordering.
            no_match_list = sorted(item[0] for item in peripheral_diff)
            raise error.TestFail('Following peripherals do not match: %s' %
                                 ', '.join(no_match_list))
    228