Home | History | Annotate | Download | only in kernel_ExternalUsbPeripheralsDetectionStress
      1 # Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging, os, re, time, random
      6 
      7 from autotest_lib.client.bin import utils
      8 from autotest_lib.server import test
      9 from autotest_lib.client.common_lib import error
     10 from autotest_lib.server.cros.usb_mux_controller import USBMuxController
     11 
     12 _WAIT_DELAY = 5
     13 _USB_DIR = '/sys/bus/usb/devices'
     14 MAX_PORTS = 8
     15 TMP_FAILED_TEST_LIST = list()
     16 PORT_BEING_TESTED = list()
     17 
     18 class kernel_ExternalUsbPeripheralsDetectionStress(test.test):
     19     """Uses USB multiplexer to repeatedly connect and disconnect USB devices."""
     20     version = 1
     21 
     22 
     23     def set_hub_power(self, on=True):
     24         """Setting USB hub power status
     25 
     26         @param on: To power on the servo-usb hub or not.
     27 
     28         """
     29         reset = 'off'
     30         if not on:
     31             reset = 'on'
     32         self.host.servo.set('dut_hub1_rst1', reset)
     33         time.sleep(_WAIT_DELAY)
     34 
     35 
     36     def check_manufacturer_and_product_info(
     37             self, vId, pId, manufacturer, pName):
     38         """Check manufacturer and product info from lsusb against dict values.
     39 
     40         @param vId: Vendor id of the connected USB device.
     41         @param pId: Product id of the connected USB device.
     42         @param manufacturer: Manufacturer name of the connected USB device.
     43         @param pName: Product name of the connected USB device
     44         @param result: To track test result.
     45         @return result value
     46 
     47         """
     48         result = True
     49         manu_cmd = ('lsusb -v -d ' + vId + ':' +  pId + ' | grep iManufacturer')
     50         prod_cmd = ('lsusb -v -d ' + vId + ':' +  pId + ' | grep iProduct')
     51 
     52         manu_cmd_output = (self.host.run(manu_cmd, ignore_status=True).
     53                            stdout.strip())
     54         prod_cmd_output = (self.host.run(prod_cmd, ignore_status=True).
     55                            stdout.strip())
     56 
     57         manu_verify = 'iManufacturer.*' + manufacturer
     58         prod_verify = 'iProduct.*' + pName
     59 
     60         match_result_manu = re.search(manu_verify, manu_cmd_output) != None
     61         match_result_prod = re.search(prod_verify, prod_cmd_output) != None
     62 
     63         if not match_result_manu or not match_result_prod:
     64             logging.debug('Manufacturer or productName do not match.')
     65             result = False
     66 
     67         return result
     68 
     69 
     70     def check_driver_symlink_and_dir(self, devicePath, productName):
     71         """Check driver symlink and dir against devicePath value from dict.
     72 
     73         @param devicePath: Device driver path.
     74         @param productName: Product name of the connected USB device.
     75         @param result: To track test result.
     76         @return result value
     77 
     78         """
     79         result = True
     80         tmp_list = [device_dir for device_dir in
     81                     self.host.run('ls -1 %s' % devicePath,
     82                     ignore_status=True).stdout.split('\n')
     83                     if re.match(r'\d-\d.*:\d\.\d', device_dir)]
     84 
     85         if not tmp_list:
     86             logging.debug('No driver created/loaded for %s', productName)
     87             result = False
     88 
     89         flag = False
     90         for device_dir in tmp_list:
     91             driver_path = os.path.join(devicePath,
     92                                        '%s/driver' % device_dir)
     93             if self._exists_on(driver_path):
     94                 flag = True
     95                 link = (self.host.run('ls -l %s | grep ^l'
     96                                       '| grep driver'
     97                                       % driver_path, ignore_status=True)
     98                                       .stdout.strip())
     99                 logging.info('%s', link)
    100 
    101         if not flag:
    102             logging.debug('Device driver not found')
    103             result = False
    104 
    105         return result
    106 
    107 
    108     def check_usb_peripherals_details(self, connected_device_dict):
    109         """Checks USB peripheral details against the values in dictionary.
    110 
    111         @param connected_device_dict: Dictionary of device attributes.
    112 
    113         """
    114         result = True
    115         usbPort = connected_device_dict['usb_port']
    116         PORT_BEING_TESTED.append(usbPort)
    117         self.usb_mux.enable_port(usbPort)
    118 
    119         vendorId = connected_device_dict['deviceInfo']['vendorId']
    120         productId = connected_device_dict['deviceInfo']['productId']
    121         manufacturer = connected_device_dict['deviceInfo']['manufacturer']
    122         productName = connected_device_dict['deviceInfo']['productName']
    123         lsusbOutput = connected_device_dict['deviceInfo']['lsusb']
    124         devicePath = connected_device_dict['deviceInfo']['devicePath']
    125 
    126         try:
    127             utils.poll_for_condition(
    128                     lambda: self.host.path_exists(devicePath),
    129                     exception=utils.TimeoutError('Trouble finding USB device '
    130                     'on port %d' % usbPort), timeout=15, sleep_interval=1)
    131         except utils.TimeoutError:
    132             logging.debug('Trouble finding USB device on port %d', usbPort)
    133             result = False
    134             pass
    135 
    136         if not ((vendorId + ':' + productId in lsusbOutput) and
    137                 self.check_manufacturer_and_product_info(
    138                 vendorId, productId, manufacturer, productName) and
    139                 self.check_driver_symlink_and_dir(devicePath, productName)):
    140             result = False
    141 
    142         self.usb_mux.disable_all_ports()
    143         try:
    144             utils.poll_for_condition(
    145                     lambda: not self.host.path_exists(devicePath),
    146                     exception=utils.TimeoutError('Device driver path does not '
    147                     'disappear after device is disconnected.'), timeout=15,
    148                     sleep_interval=1)
    149         except utils.TimeoutError:
    150             logging.debug('Device driver path is still present after device is '
    151                          'disconnected from the DUT.')
    152             result = False
    153             pass
    154 
    155         if result is False:
    156             logging.debug('Test failed on port %s.', usbPort)
    157             TMP_FAILED_TEST_LIST.append(usbPort)
    158 
    159 
    160     def get_usb_device_dirs(self):
    161         """Gets the usb device dirs from _USB_DIR path.
    162 
    163         @returns list with number of device dirs else None
    164 
    165         """
    166         usb_dir_list = list()
    167         cmd = 'ls -1 %s' % _USB_DIR
    168         cmd_output = self.host.run(cmd).stdout.strip().split('\n')
    169         for d in cmd_output:
    170             usb_dir_list.append(os.path.join(_USB_DIR, d))
    171         return usb_dir_list
    172 
    173 
    174     def parse_device_dir_for_info(self, dir_list, usb_device_dict):
    175         """Uses vendorId/device path and to get other device attributes.
    176 
    177         @param dir_list: Complete path of directories.
    178         @param usb_device_dict: Dictionary to store device attributes.
    179         @returns usb_device_dict with device attributes
    180 
    181         """
    182         for d_path in dir_list:
    183             file_name = os.path.join(d_path, 'idVendor')
    184             if self._exists_on(file_name):
    185                 vendor_id = self.host.run('cat %s' % file_name).stdout.strip()
    186                 if vendor_id:
    187                     usb_device_dict['deviceInfo']['vendorId'] = vendor_id
    188                     usb_device_dict['deviceInfo']['devicePath'] = d_path
    189                     usb_device_dict['deviceInfo']['productId'] = (
    190                             self.get_product_info(d_path, 'idProduct'))
    191                     usb_device_dict['deviceInfo']['productName'] = (
    192                             self.get_product_info(d_path, 'product'))
    193                     usb_device_dict['deviceInfo']['manufacturer'] = (
    194                             self.get_product_info(d_path, 'manufacturer'))
    195         return usb_device_dict
    196 
    197 
    198     def get_product_info(self, directory, prod_string):
    199         """Gets the product id, name and manufacturer info from device path.
    200 
    201         @param directory: Driver path for the USB device.
    202         @param prod_string: Device attribute string.
    203         returns the output of the cat command
    204 
    205         """
    206         product_file_name = os.path.join(directory, prod_string)
    207         if self._exists_on(product_file_name):
    208             return self.host.run('cat %s' % product_file_name).stdout.strip()
    209         return None
    210 
    211 
    212     def _exists_on(self, path):
    213         """Checks if file exists on host or not.
    214 
    215         @returns True or False
    216         """
    217         return self.host.run('ls %s' % path,
    218                              ignore_status=True).exit_status == 0
    219 
    220 
    221     def check_lsusb_diff(self, original_lsusb_output):
    222         """Compare LSUSB output for before and after connecting each device.
    223 
    224         @param original_lsusb_output: lsusb output prior to connecting device.
    225         @returns the difference between new and old lsusb outputs
    226 
    227         """
    228         lsusb_output = (self.host.run('lsusb', ignore_status=True).
    229                         stdout.strip().split('\n'))
    230         lsusb_diff = (list(set(lsusb_output).
    231                       difference(set(original_lsusb_output))))
    232         return lsusb_diff
    233 
    234 
    235     def run_once(self, host, loop_count):
    236         """Main function to run the autotest.
    237 
    238         @param host: Host object representing the DUT.
    239         @param loop_count: Number of iteration cycles.
    240         @raise error.TestFail if one or more USB devices are not detected
    241 
    242         """
    243         self.host = host
    244         self.usb_mux = USBMuxController(self.host)
    245 
    246         # Make sure all USB ports are disabled prior to starting the test.
    247         self.usb_mux.disable_all_ports()
    248 
    249         self.host.servo.switch_usbkey('dut')
    250         self.host.servo.set('usb_mux_sel3', 'dut_sees_usbkey')
    251 
    252         self.set_hub_power(False)
    253         # Collect the USB devices directories before switching on hub
    254         usb_list_dir_off = self.get_usb_device_dirs()
    255 
    256         self.set_hub_power(True)
    257         # Collect the USB devices directories after switching on hub
    258         usb_list_dir_on = self.get_usb_device_dirs()
    259 
    260         lsusb_original_out = (self.host.run('lsusb', ignore_status=True).
    261                                  stdout.strip().split('\n'))
    262         list_of_usb_device_dictionaries = list()
    263         usb_port = 0
    264 
    265         # Extract connected USB device information and store it in a dict.
    266         while usb_port < MAX_PORTS:
    267             usb_device_dict = {'usb_port':None,'deviceInfo':
    268                     {'devicePath':None,'vendorId':None,'productId':None,
    269                     'productName':None,'manufacturer':None,'lsusb':None}}
    270             usb_device_dir_list = list()
    271             self.usb_mux.enable_port(usb_port)
    272             try:
    273                 utils.poll_for_condition(
    274                         lambda: self.check_lsusb_diff(lsusb_original_out),
    275                         exception=utils.TimeoutError('No USB device on port '
    276                         '%d' % usb_port), timeout=_WAIT_DELAY, sleep_interval=1)
    277             except utils.TimeoutError:
    278                 logging.debug('No USB device found on port %d', usb_port)
    279                 pass
    280 
    281             # Maintain list of associated dirs for each connected USB device
    282             for device in self.get_usb_device_dirs():
    283                 if device not in usb_list_dir_on:
    284                     usb_device_dir_list.append(device)
    285 
    286             usb_device_dict = self.parse_device_dir_for_info(
    287                     usb_device_dir_list, usb_device_dict)
    288 
    289             lsusb_diff = self.check_lsusb_diff(lsusb_original_out)
    290             if lsusb_diff:
    291                 usb_device_dict['usb_port'] = usb_port
    292                 usb_device_dict['deviceInfo']['lsusb'] = lsusb_diff[0]
    293                 list_of_usb_device_dictionaries.append(usb_device_dict)
    294 
    295             self.usb_mux.disable_all_ports()
    296             try:
    297                 utils.poll_for_condition(
    298                         lambda: not self.check_lsusb_diff(lsusb_original_out),
    299                         exception=utils.TimeoutError('Timed out waiting for '
    300                         'USB device to disappear.'), timeout=_WAIT_DELAY,
    301                         sleep_interval=1)
    302             except utils.TimeoutError:
    303                 logging.debug('Timed out waiting for USB device to disappear.')
    304                 pass
    305             logging.info('%s', usb_device_dict)
    306             usb_port += 1
    307 
    308         if len(list_of_usb_device_dictionaries) == 0:
    309             # Fails if no devices detected
    310             raise error.TestError('No connected devices were detected. Make '
    311                                   'sure the devices are connected to USB_KEY '
    312                                   'and DUT_HUB1_USB on the servo board.')
    313         logging.info('Connected devices list: %s',
    314                       list_of_usb_device_dictionaries)
    315 
    316         # loop_count defines the number of times the USB peripheral details
    317         # should be checked and random.choice is used to randomly select one of
    318         # the elements from the usb device list specifying the device whose
    319         # details should be checked.
    320         for i in xrange(loop_count):
    321             self.check_usb_peripherals_details(
    322                     random.choice(list_of_usb_device_dictionaries))
    323 
    324         logging.info('Sequence of ports tested with random picker: %s',
    325                       ', '.join(map(str, PORT_BEING_TESTED)))
    326 
    327         if TMP_FAILED_TEST_LIST:
    328             logging.info('Failed to verify devices on following ports: %s',
    329                          ', '.join(map(str, TMP_FAILED_TEST_LIST)))
    330             raise error.TestFail('Failed to do full device verification on '
    331                                  'some ports.')
    332