1 # Copyright (c) 2014 The Chromium OS Authors. All rights reserved. 2 # Use of this source code is governed by a BSD-style license that can be 3 # found in the LICENSE file. 4 5 import logging, os, re, time, random 6 7 from autotest_lib.client.bin import utils 8 from autotest_lib.server import test 9 from autotest_lib.client.common_lib import error 10 from autotest_lib.server.cros.usb_mux_controller import USBMuxController 11 12 _WAIT_DELAY = 5 13 _USB_DIR = '/sys/bus/usb/devices' 14 MAX_PORTS = 8 15 TMP_FAILED_TEST_LIST = list() 16 PORT_BEING_TESTED = list() 17 18 class kernel_ExternalUsbPeripheralsDetectionStress(test.test): 19 """Uses USB multiplexer to repeatedly connect and disconnect USB devices.""" 20 version = 1 21 22 23 def set_hub_power(self, on=True): 24 """Setting USB hub power status 25 26 @param on: To power on the servo-usb hub or not. 27 28 """ 29 reset = 'off' 30 if not on: 31 reset = 'on' 32 self.host.servo.set('dut_hub1_rst1', reset) 33 time.sleep(_WAIT_DELAY) 34 35 36 def check_manufacturer_and_product_info( 37 self, vId, pId, manufacturer, pName): 38 """Check manufacturer and product info from lsusb against dict values. 39 40 @param vId: Vendor id of the connected USB device. 41 @param pId: Product id of the connected USB device. 42 @param manufacturer: Manufacturer name of the connected USB device. 43 @param pName: Product name of the connected USB device 44 @param result: To track test result. 45 @return result value 46 47 """ 48 result = True 49 manu_cmd = ('lsusb -v -d ' + vId + ':' + pId + ' | grep iManufacturer') 50 prod_cmd = ('lsusb -v -d ' + vId + ':' + pId + ' | grep iProduct') 51 52 manu_cmd_output = (self.host.run(manu_cmd, ignore_status=True). 53 stdout.strip()) 54 prod_cmd_output = (self.host.run(prod_cmd, ignore_status=True). 55 stdout.strip()) 56 57 manu_verify = 'iManufacturer.*' + manufacturer 58 prod_verify = 'iProduct.*' + pName 59 60 match_result_manu = re.search(manu_verify, manu_cmd_output) != None 61 match_result_prod = re.search(prod_verify, prod_cmd_output) != None 62 63 if not match_result_manu or not match_result_prod: 64 logging.debug('Manufacturer or productName do not match.') 65 result = False 66 67 return result 68 69 70 def check_driver_symlink_and_dir(self, devicePath, productName): 71 """Check driver symlink and dir against devicePath value from dict. 72 73 @param devicePath: Device driver path. 74 @param productName: Product name of the connected USB device. 75 @param result: To track test result. 76 @return result value 77 78 """ 79 result = True 80 tmp_list = [device_dir for device_dir in 81 self.host.run('ls -1 %s' % devicePath, 82 ignore_status=True).stdout.split('\n') 83 if re.match(r'\d-\d.*:\d\.\d', device_dir)] 84 85 if not tmp_list: 86 logging.debug('No driver created/loaded for %s', productName) 87 result = False 88 89 flag = False 90 for device_dir in tmp_list: 91 driver_path = os.path.join(devicePath, 92 '%s/driver' % device_dir) 93 if self._exists_on(driver_path): 94 flag = True 95 link = (self.host.run('ls -l %s | grep ^l' 96 '| grep driver' 97 % driver_path, ignore_status=True) 98 .stdout.strip()) 99 logging.info('%s', link) 100 101 if not flag: 102 logging.debug('Device driver not found') 103 result = False 104 105 return result 106 107 108 def check_usb_peripherals_details(self, connected_device_dict): 109 """Checks USB peripheral details against the values in dictionary. 110 111 @param connected_device_dict: Dictionary of device attributes. 112 113 """ 114 result = True 115 usbPort = connected_device_dict['usb_port'] 116 PORT_BEING_TESTED.append(usbPort) 117 self.usb_mux.enable_port(usbPort) 118 119 vendorId = connected_device_dict['deviceInfo']['vendorId'] 120 productId = connected_device_dict['deviceInfo']['productId'] 121 manufacturer = connected_device_dict['deviceInfo']['manufacturer'] 122 productName = connected_device_dict['deviceInfo']['productName'] 123 lsusbOutput = connected_device_dict['deviceInfo']['lsusb'] 124 devicePath = connected_device_dict['deviceInfo']['devicePath'] 125 126 try: 127 utils.poll_for_condition( 128 lambda: self.host.path_exists(devicePath), 129 exception=utils.TimeoutError('Trouble finding USB device ' 130 'on port %d' % usbPort), timeout=15, sleep_interval=1) 131 except utils.TimeoutError: 132 logging.debug('Trouble finding USB device on port %d', usbPort) 133 result = False 134 pass 135 136 if not ((vendorId + ':' + productId in lsusbOutput) and 137 self.check_manufacturer_and_product_info( 138 vendorId, productId, manufacturer, productName) and 139 self.check_driver_symlink_and_dir(devicePath, productName)): 140 result = False 141 142 self.usb_mux.disable_all_ports() 143 try: 144 utils.poll_for_condition( 145 lambda: not self.host.path_exists(devicePath), 146 exception=utils.TimeoutError('Device driver path does not ' 147 'disappear after device is disconnected.'), timeout=15, 148 sleep_interval=1) 149 except utils.TimeoutError: 150 logging.debug('Device driver path is still present after device is ' 151 'disconnected from the DUT.') 152 result = False 153 pass 154 155 if result is False: 156 logging.debug('Test failed on port %s.', usbPort) 157 TMP_FAILED_TEST_LIST.append(usbPort) 158 159 160 def get_usb_device_dirs(self): 161 """Gets the usb device dirs from _USB_DIR path. 162 163 @returns list with number of device dirs else None 164 165 """ 166 usb_dir_list = list() 167 cmd = 'ls -1 %s' % _USB_DIR 168 cmd_output = self.host.run(cmd).stdout.strip().split('\n') 169 for d in cmd_output: 170 usb_dir_list.append(os.path.join(_USB_DIR, d)) 171 return usb_dir_list 172 173 174 def parse_device_dir_for_info(self, dir_list, usb_device_dict): 175 """Uses vendorId/device path and to get other device attributes. 176 177 @param dir_list: Complete path of directories. 178 @param usb_device_dict: Dictionary to store device attributes. 179 @returns usb_device_dict with device attributes 180 181 """ 182 for d_path in dir_list: 183 file_name = os.path.join(d_path, 'idVendor') 184 if self._exists_on(file_name): 185 vendor_id = self.host.run('cat %s' % file_name).stdout.strip() 186 if vendor_id: 187 usb_device_dict['deviceInfo']['vendorId'] = vendor_id 188 usb_device_dict['deviceInfo']['devicePath'] = d_path 189 usb_device_dict['deviceInfo']['productId'] = ( 190 self.get_product_info(d_path, 'idProduct')) 191 usb_device_dict['deviceInfo']['productName'] = ( 192 self.get_product_info(d_path, 'product')) 193 usb_device_dict['deviceInfo']['manufacturer'] = ( 194 self.get_product_info(d_path, 'manufacturer')) 195 return usb_device_dict 196 197 198 def get_product_info(self, directory, prod_string): 199 """Gets the product id, name and manufacturer info from device path. 200 201 @param directory: Driver path for the USB device. 202 @param prod_string: Device attribute string. 203 returns the output of the cat command 204 205 """ 206 product_file_name = os.path.join(directory, prod_string) 207 if self._exists_on(product_file_name): 208 return self.host.run('cat %s' % product_file_name).stdout.strip() 209 return None 210 211 212 def _exists_on(self, path): 213 """Checks if file exists on host or not. 214 215 @returns True or False 216 """ 217 return self.host.run('ls %s' % path, 218 ignore_status=True).exit_status == 0 219 220 221 def check_lsusb_diff(self, original_lsusb_output): 222 """Compare LSUSB output for before and after connecting each device. 223 224 @param original_lsusb_output: lsusb output prior to connecting device. 225 @returns the difference between new and old lsusb outputs 226 227 """ 228 lsusb_output = (self.host.run('lsusb', ignore_status=True). 229 stdout.strip().split('\n')) 230 lsusb_diff = (list(set(lsusb_output). 231 difference(set(original_lsusb_output)))) 232 return lsusb_diff 233 234 235 def run_once(self, host, loop_count): 236 """Main function to run the autotest. 237 238 @param host: Host object representing the DUT. 239 @param loop_count: Number of iteration cycles. 240 @raise error.TestFail if one or more USB devices are not detected 241 242 """ 243 self.host = host 244 self.usb_mux = USBMuxController(self.host) 245 246 # Make sure all USB ports are disabled prior to starting the test. 247 self.usb_mux.disable_all_ports() 248 249 self.host.servo.switch_usbkey('dut') 250 self.host.servo.set('usb_mux_sel3', 'dut_sees_usbkey') 251 252 self.set_hub_power(False) 253 # Collect the USB devices directories before switching on hub 254 usb_list_dir_off = self.get_usb_device_dirs() 255 256 self.set_hub_power(True) 257 # Collect the USB devices directories after switching on hub 258 usb_list_dir_on = self.get_usb_device_dirs() 259 260 lsusb_original_out = (self.host.run('lsusb', ignore_status=True). 261 stdout.strip().split('\n')) 262 list_of_usb_device_dictionaries = list() 263 usb_port = 0 264 265 # Extract connected USB device information and store it in a dict. 266 while usb_port < MAX_PORTS: 267 usb_device_dict = {'usb_port':None,'deviceInfo': 268 {'devicePath':None,'vendorId':None,'productId':None, 269 'productName':None,'manufacturer':None,'lsusb':None}} 270 usb_device_dir_list = list() 271 self.usb_mux.enable_port(usb_port) 272 try: 273 utils.poll_for_condition( 274 lambda: self.check_lsusb_diff(lsusb_original_out), 275 exception=utils.TimeoutError('No USB device on port ' 276 '%d' % usb_port), timeout=_WAIT_DELAY, sleep_interval=1) 277 except utils.TimeoutError: 278 logging.debug('No USB device found on port %d', usb_port) 279 pass 280 281 # Maintain list of associated dirs for each connected USB device 282 for device in self.get_usb_device_dirs(): 283 if device not in usb_list_dir_on: 284 usb_device_dir_list.append(device) 285 286 usb_device_dict = self.parse_device_dir_for_info( 287 usb_device_dir_list, usb_device_dict) 288 289 lsusb_diff = self.check_lsusb_diff(lsusb_original_out) 290 if lsusb_diff: 291 usb_device_dict['usb_port'] = usb_port 292 usb_device_dict['deviceInfo']['lsusb'] = lsusb_diff[0] 293 list_of_usb_device_dictionaries.append(usb_device_dict) 294 295 self.usb_mux.disable_all_ports() 296 try: 297 utils.poll_for_condition( 298 lambda: not self.check_lsusb_diff(lsusb_original_out), 299 exception=utils.TimeoutError('Timed out waiting for ' 300 'USB device to disappear.'), timeout=_WAIT_DELAY, 301 sleep_interval=1) 302 except utils.TimeoutError: 303 logging.debug('Timed out waiting for USB device to disappear.') 304 pass 305 logging.info('%s', usb_device_dict) 306 usb_port += 1 307 308 if len(list_of_usb_device_dictionaries) == 0: 309 # Fails if no devices detected 310 raise error.TestError('No connected devices were detected. Make ' 311 'sure the devices are connected to USB_KEY ' 312 'and DUT_HUB1_USB on the servo board.') 313 logging.info('Connected devices list: %s', 314 list_of_usb_device_dictionaries) 315 316 # loop_count defines the number of times the USB peripheral details 317 # should be checked and random.choice is used to randomly select one of 318 # the elements from the usb device list specifying the device whose 319 # details should be checked. 320 for i in xrange(loop_count): 321 self.check_usb_peripherals_details( 322 random.choice(list_of_usb_device_dictionaries)) 323 324 logging.info('Sequence of ports tested with random picker: %s', 325 ', '.join(map(str, PORT_BEING_TESTED))) 326 327 if TMP_FAILED_TEST_LIST: 328 logging.info('Failed to verify devices on following ports: %s', 329 ', '.join(map(str, TMP_FAILED_TEST_LIST))) 330 raise error.TestFail('Failed to do full device verification on ' 331 'some ports.') 332