# Copyright (c) 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging, os, time

from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros import tpm_utils
from autotest_lib.server import test
from autotest_lib.server.cros.multimedia import remote_facade_factory


_SHORT_TIMEOUT = 2
_WAIT_DELAY = 15
_USB_DIR = '/sys/bus/usb/devices'


class enterprise_CFM_USBPeripheralHotplugDetect(test.test):
    """Uses servo to hotplug and detect USB peripherals on CrOS and hotrod. It
    compares attached audio/video peripheral names on CrOS against what hotrod
    detects."""
    version = 1


    def _set_hub_power(self, on=True):
        """Sets the servo USB hub power status.

        @param on: True to power the servo usb hub on, False to power it off.

        """
        # The servo control written here takes the inverse of the requested
        # power state: 'off' when the hub should be on, 'on' otherwise.
        reset = 'off' if on else 'on'
        self.client.servo.set('dut_hub1_rst1', reset)
        # Fixed delay after toggling the hub before the caller continues.
        time.sleep(_WAIT_DELAY)


    def _get_usb_device_dirs(self):
        """Gets usb device dirs from _USB_DIR path.

        @returns list of device directory paths under _USB_DIR; empty if the
                 listing produced no entries.

        """
        cmd = 'ls %s' % _USB_DIR
        cmd_output = self.client.run(cmd).stdout.strip().split('\n')
        # An empty listing splits to [''], which would otherwise become a
        # bogus '<_USB_DIR>/' entry; drop empty names.
        return [os.path.join(_USB_DIR, d) for d in cmd_output if d]


    def _get_usb_device_type(self, vendor_id):
        """Gets usb device type info from lsusb output based on vendor id.

        Only the first 150 lines of the verbose lsusb output are inspected.

        @param vendor_id: Device vendor id.
        @returns list of unique device types associated with vendor id; each
                 entry is the third whitespace-separated token of a
                 bInterfaceClass or wTerminalType line.

        """
        device_types = set()
        cmd = 'lsusb -v -d %s: | head -150' % vendor_id
        cmd_out = self.client.run(cmd).stdout.strip().split('\n')
        for line in cmd_out:
            if 'bInterfaceClass' in line or 'wTerminalType' in line:
                fields = line.split()
                # A matching line with fewer than three tokens has no
                # description to record; skip it rather than raise IndexError.
                if len(fields) > 2:
                    device_types.add(fields[2])

        return list(device_types)


    def _get_product_info(self, directory, prod_string):
        """Reads a device attribute file from a USB device directory.

        @param directory: Driver path for USB device.
        @param prod_string: Device attribute string (attribute file name).
        @returns the stripped output of the cat command, or None if the
                 attribute file does not exist on the host.

        """
        product_file_name = os.path.join(directory, prod_string)
        if self._file_exists_on_host(product_file_name):
            return self.client.run('cat %s' % product_file_name).stdout.strip()
        return None


    def _parse_device_dir_for_info(self, dir_list, peripheral_whitelist_dict):
        """Uses device path and vendor id to get device type attributes.

        @param dir_list: Complete list of device directories.
        @param peripheral_whitelist_dict: Dictionary keyed by
                'vendor_id:product_id' strings whose values are stored as the
                peripheral names in the result.
        @returns cros_peripheral_dict mapping 'Camera', 'Microphone' and
                 'Speaker' to the whitelisted name found, or 'Not Found'.

        """
        # (type token reported by lsusb, key in the returned dictionary)
        type_to_peripheral = (('Microphone', 'Microphone'),
                              ('Speaker', 'Speaker'),
                              ('Video', 'Camera'))
        cros_peripheral_dict = {'Camera': None, 'Microphone': None,
                                'Speaker': None}

        for d_path in dir_list:
            vendor_id = self._get_product_info(d_path, 'idVendor')
            if vendor_id is None:
                continue
            product_id = self._get_product_info(d_path, 'idProduct')
            if product_id is None:
                # Without idProduct no whitelist key can be built;
                # concatenating None would raise TypeError.
                continue

            vid_pid = vendor_id + ':' + product_id
            if vid_pid not in peripheral_whitelist_dict:
                continue

            # Query lsusb only for whitelisted devices; the result is unused
            # for every other device.
            device_types = self._get_usb_device_type(vendor_id)
            peripheral_name = peripheral_whitelist_dict.get(vid_pid)
            for usb_type, peripheral in type_to_peripheral:
                if usb_type in device_types:
                    cros_peripheral_dict[peripheral] = peripheral_name

        for device_type in list(cros_peripheral_dict):
            if not cros_peripheral_dict[device_type]:
                cros_peripheral_dict[device_type] = 'Not Found'

        return cros_peripheral_dict


    def _file_exists_on_host(self, path):
        """Checks if file exists on host.

        @param path: File path
        @returns True if 'ls <path>' exits with status 0 on the host,
                 False otherwise.

        """
        result = self.client.run('ls %s' % path, ignore_status=True)
        return result.exit_status == 0


    def _enroll_device_and_skip_oobe(self):
        """Enroll device into CFM and skip CFM oobe.

        @raises error.TestFail if the oobe start page is not shown.

        """
        self.cfm_facade.enroll_device()
        self.cfm_facade.restart_chrome_for_cfm()
        self.cfm_facade.wait_for_telemetry_commands()
        self.cfm_facade.wait_for_oobe_start_page()

        if not self.cfm_facade.is_oobe_start_page():
            raise error.TestFail('CFM did not reach oobe screen.')

        self.cfm_facade.skip_oobe_screen()
        time.sleep(_SHORT_TIMEOUT)


    def _set_preferred_peripherals(self, cros_peripherals):
        """Set preferred peripherals.

        A peripheral is only set as preferred when the name found on CrOS is
        among the devices reported by the CFM facade.

        @param cros_peripherals: Dictionary of peripherals
        """
        avail_mics = self.cfm_facade.get_mic_devices()
        avail_speakers = self.cfm_facade.get_speaker_devices()
        avail_cameras = self.cfm_facade.get_camera_devices()

        mic = cros_peripherals.get('Microphone')
        speaker = cros_peripherals.get('Speaker')
        camera = cros_peripherals.get('Camera')

        if mic in avail_mics:
            self.cfm_facade.set_preferred_mic(mic)
        if speaker in avail_speakers:
            self.cfm_facade.set_preferred_speaker(speaker)
        if camera in avail_cameras:
            self.cfm_facade.set_preferred_camera(camera)


    def _peripheral_detection(self):
        """Get attached peripheral information.

        @returns dictionary mapping 'Microphone', 'Speaker' and 'Camera' to
                 the preferred device reported by the CFM facade, or
                 'Not Found' when the facade reports a falsy value.

        """
        cfm_peripheral_dict = {
            'Microphone': self.cfm_facade.get_preferred_mic(),
            'Speaker': self.cfm_facade.get_preferred_speaker(),
            'Camera': self.cfm_facade.get_preferred_camera(),
        }

        for device_type in list(cfm_peripheral_dict):
            if not cfm_peripheral_dict[device_type]:
                cfm_peripheral_dict[device_type] = 'Not Found'

        return cfm_peripheral_dict


    def run_once(self, host, peripheral_whitelist_dict):
        """Main function to run autotest.

        @param host: Host object representing the DUT.
        @param peripheral_whitelist_dict: Dictionary keyed by
                'vendor_id:product_id' strings whose values are the peripheral
                names to compare against what hotrod reports.
        @raises error.TestFail if enrollment or detection fails, or if the
                peripherals seen by CrOS and hotrod do not match.

        """
        self.client = host

        factory = remote_facade_factory.RemoteFacadeFactory(
                host, no_chrome=True)
        self.cfm_facade = factory.create_cfm_facade()

        tpm_utils.ClearTPMOwnerRequest(self.client)

        if self.client.servo:
            self.client.servo.switch_usbkey('dut')
            self.client.servo.set('usb_mux_sel3', 'dut_sees_usbkey')
            time.sleep(_WAIT_DELAY)
            self._set_hub_power(True)

        usb_list_dir_on = self._get_usb_device_dirs()

        cros_peripheral_dict = self._parse_device_dir_for_info(
                usb_list_dir_on, peripheral_whitelist_dict)
        logging.debug('Peripherals detected by CrOS: %s', cros_peripheral_dict)

        try:
            self._enroll_device_and_skip_oobe()
            self._set_preferred_peripherals(cros_peripheral_dict)
            cfm_peripheral_dict = self._peripheral_detection()
            logging.debug('Peripherals detected by hotrod: %s',
                          cfm_peripheral_dict)
        except Exception as e:
            # TestFail only carries str(e); log the traceback before it is
            # lost.
            logging.exception('Enrollment or peripheral detection failed.')
            raise error.TestFail(str(e))
        finally:
            # Issue the TPM clear request on failure as well as on success, so
            # the second clear is not skipped when the try block raises.
            tpm_utils.ClearTPMOwnerRequest(self.client)

        cros_peripherals = set(cros_peripheral_dict.items())
        cfm_peripherals = set(cfm_peripheral_dict.items())

        peripheral_diff = cros_peripherals.difference(cfm_peripherals)

        if peripheral_diff:
            # Sorted so the failure message is deterministic.
            no_match_list = sorted(item[0] for item in peripheral_diff)
            raise error.TestFail('Following peripherals do not match: %s' %
                                 ', '.join(no_match_list))