Home | History | Annotate | Download | only in kernel_ExternalUsbPeripheralsDetectionTest
      1 # Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging, os, re, time
      6 
      7 from autotest_lib.server import test
      8 from autotest_lib.client.common_lib import error
      9 
     10 _WAIT_DELAY = 25
     11 _USB_DIR = '/sys/bus/usb/devices'
     12 
     13 class kernel_ExternalUsbPeripheralsDetectionTest(test.test):
     14     """Uses servo to repeatedly connect/remove USB devices during boot."""
     15     version = 1
     16 
     17 
     18     def set_hub_power(self, on=True):
     19         """Setting USB hub power status
     20 
     21         @param: on To power on the servo-usb hub or not
     22 
     23         """
     24         reset = 'off'
     25         if not on:
     26             reset = 'on'
     27         self.host.servo.set('dut_hub1_rst1', reset)
     28         self.pluged_status = on
     29         time.sleep(_WAIT_DELAY)
     30 
     31 
     32     def check_usb_peripherals_details(self):
     33         """Checks the effect from plugged in USB peripherals.
     34 
     35         @returns True if command line output is matched successfuly; Else False
     36         """
     37         failed = list()
     38         for cmd in self.usb_checks.keys():
     39             out_match_list = self.usb_checks.get(cmd)
     40             logging.info('Running %s',  cmd)
     41 
     42             # Run the usb check command
     43             cmd_out_lines = (self.host.run(cmd, ignore_status=True).
     44                              stdout.strip().split('\n'))
     45             for out_match in out_match_list:
     46                 match_result = False
     47                 for cmd_out_line in cmd_out_lines:
     48                     match_result = (match_result or
     49                         re.search(out_match, cmd_out_line) != None)
     50                 if not match_result:
     51                     failed.append((cmd,out_match))
     52         return failed
     53 
     54 
     55     def get_usb_device_dirs(self):
     56         """Gets the usb device dirs from _USB_DIR path.
     57 
     58         @returns list with number of device dirs else None
     59         """
     60         usb_dir_list = []
     61         cmd = 'ls -1 %s' % _USB_DIR
     62         tmp = self.host.run(cmd).stdout.strip().split('\n')
     63         for d in tmp:
     64             usb_dir_list.append(os.path.join(_USB_DIR, d))
     65         return usb_dir_list
     66 
     67 
     68     def get_vendor_id_dict_from_dut(self, dir_list):
     69         """Finds the vendor id from provided dir list.
     70 
     71         @param dir_list: full path of directories
     72         @returns dict of all vendor ids vs file path
     73         """
     74         vendor_id_dict = dict()
     75         for d in dir_list:
     76             file_name = os.path.join(d, 'idVendor')
     77             if self._exists_on(file_name):
     78                 vendor_id = self.host.run('cat %s' % file_name).stdout.strip()
     79                 if vendor_id:
     80                     vendor_id_dict[vendor_id] = d
     81         logging.info('%s', vendor_id_dict)
     82         return vendor_id_dict
     83 
     84 
     85     def _exists_on(self, path):
     86         """Checks if file exists on host or not.
     87 
     88         @returns True or False
     89         """
     90         return self.host.run('ls %s' % path,
     91                              ignore_status=True).exit_status == 0
     92 
     93 
     94 
     95     def run_once(self, host, usb_checks=None,
     96                  vendor_id_dict_control_file=None):
     97         """Main function to run the autotest.
     98 
     99         @param host: name of the host
    100         @param usb_checks: dictionary defined in control file
    101         @param vendor_id_list: dictionary defined in control file
    102         """
    103         self.host = host
    104         self.usb_checks = usb_checks
    105 
    106         self.host.servo.switch_usbkey('dut')
    107         self.host.servo.set('usb_mux_sel3', 'dut_sees_usbkey')
    108         time.sleep(_WAIT_DELAY)
    109 
    110         self.set_hub_power(False)
    111         # Collect the USB devices directories before switching on hub
    112         usb_list_dir_off = self.get_usb_device_dirs()
    113 
    114         self.set_hub_power(True)
    115         # Collect the USB devices directories after switching on hub
    116         usb_list_dir_on = self.get_usb_device_dirs()
    117 
    118         diff_list = list(set(usb_list_dir_on).difference(set(usb_list_dir_off)))
    119         if len(diff_list) == 0:
    120             # Fail if no devices detected after
    121             raise error.TestError('No connected devices were detected. Make '
    122                                   'sure the devices are connected to USB_KEY '
    123                                   'and DUT_HUB1_USB on the servo board.')
    124         logging.debug('Connected devices list: %s', diff_list)
    125 
    126         # Test 1: check USB peripherals info in detail
    127         failed = self.check_usb_peripherals_details()
    128         if len(failed)> 0:
    129             raise error.TestError('USB device not detected %s', str(failed))
    130 
    131         # Test 2: check USB device dir under /sys/bus/usb/devices
    132         vendor_ids = {}
    133         # Gets a dict idVendor: dir_path
    134         vendor_ids = self.get_vendor_id_dict_from_dut(diff_list)
    135         for vid in vendor_id_dict_control_file.keys():
    136             peripheral = vendor_id_dict_control_file[vid]
    137             if vid not in vendor_ids.keys():
    138                 raise error.TestFail('%s is not detected at %s dir'
    139                                      % (peripheral, _USB_DIR))
    140             else:
    141             # Test 3: check driver symlink and dir for each USB device
    142                 tmp_list = [device_dir for device_dir in
    143                             self.host.run('ls -1 %s' % vendor_ids[vid],
    144                             ignore_status=True).stdout.split('\n')
    145                             if re.match(r'\d-\d.*:\d\.\d', device_dir)]
    146                 if not tmp_list:
    147                     raise error.TestFail('No driver created/loaded for %s'
    148                                          % peripheral)
    149                 logging.info('---- Drivers for %s ----', peripheral)
    150                 flag = False
    151                 for device_dir in tmp_list:
    152                     driver_path = os.path.join(vendor_ids[vid],
    153                                                '%s/driver' % device_dir)
    154                     if self._exists_on(driver_path):
    155                         flag = True
    156                         link = (self.host.run('ls -l %s | grep ^l'
    157                                               '| grep driver'
    158                                               % driver_path, ignore_status=True)
    159                                               .stdout.strip())
    160                         logging.info('%s', link)
    161                 if not flag:
    162                     raise error.TestFail('Driver for %s is not loaded - %s'
    163                                          % (peripheral, driver_path))
    164