Home | History | Annotate | Download | only in enterprise_CFM_USBPeripheralHotplugDetect
      1 # Copyright (c) 2016 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging
      6 import re
      7 import time
      8 
      9 from autotest_lib.client.common_lib import error
     10 from autotest_lib.client.common_lib.cros import crash_detector
     11 from autotest_lib.client.common_lib.cros import tpm_utils
     12 from autotest_lib.client.common_lib.cros.cfm.usb import cfm_usb_devices
     13 from autotest_lib.client.common_lib.cros.cfm.usb import usb_device_collector
     14 from autotest_lib.server import test
     15 from autotest_lib.server.cros.multimedia import remote_facade_factory
     16 
     17 _SHORT_TIMEOUT = 5
     18 
     19 # Constants for Hangouts UI peripherals names. Each entry is a tuple in the form
     20 # of (<hangouts_name_regex>, <USB device spec>). We use regular expressions for
     21 # the labels since these are not constant values, i.e. they depend on which USB
     22 # port a device has been plugged in to.
     23 # TODO(dtosic): Move this out to a separate util so other tests can reuse it.
     24 _HANGOUTS_UI_CAMERA_NAMES = [
     25     (r'HD Pro Webcam C920 \(046d:082d\)', cfm_usb_devices.HD_PRO_WEBCAM_C920),
     26     (r'Logitech Webcam C930e \(046d:0843\)',
     27         cfm_usb_devices.LOGITECH_WEBCAM_C930E),
     28 ]
     29 
     30 _HANGOUTS_UI_MICROPHONE_NAMES = [
     31     (r'Jabra SPEAK 410', cfm_usb_devices.JABRA_SPEAK_410),
     32     # Some cameras also have integrated microphones.
     33     (r'HD Pro Webcam C920: USB Audio:[0-9],[0-9]',
     34         cfm_usb_devices.HD_PRO_WEBCAM_C920),
     35     (r'Logitech Webcam C930e: USB Audio:[0-9],[0-9]',
     36         cfm_usb_devices.LOGITECH_WEBCAM_C930E),
     37 ]
     38 
     39 _HANGOUTS_UI_SPEAKER_NAMES = [
     40     (r'Jabra SPEAK 410', cfm_usb_devices.JABRA_SPEAK_410),
     41 ]
     42 
     43 def _get_usb_spec_for_hangouts_device_name(device_name, name_to_spec_map):
     44     """Returns a USB spec (if found) for the specified name, or None."""
     45     for name_regex, usb_spec in name_to_spec_map:
     46         if re.match(name_regex, device_name):
     47             return usb_spec
     48     return None
     49 
     50 
     51 class _Peripherals(object):
     52     """Utility class for storing peripherals names by type."""
     53 
     54     def __init__(self):
     55         self._dict = {
     56             'Microphone': [],
     57             'Speaker': [],
     58             'Camera':[]
     59         }
     60 
     61     def add_mic(self, spec):
     62         """Registers a mic name."""
     63         self._add_device('Microphone', spec)
     64 
     65     def add_speaker(self, spec):
     66         """Registers a speaker name."""
     67         self._add_device('Speaker', spec)
     68 
     69     def add_camera(self, spec):
     70         """Registers a camera name."""
     71         self._add_device('Camera', spec)
     72 
     73     def _add_device(self, type, spec):
     74         self._dict[type].append(str(spec))
     75 
     76     def _get_by_type(self, type):
     77         return self._dict[type]
     78 
     79     def get_diff(self, other_peripherals):
     80         """Returns a diff dictonary for other_peripherals."""
     81         peripherals_diff = {}
     82         for type in self._dict:
     83             self_devices = set(self._get_by_type(type))
     84             other_devices = set(other_peripherals._get_by_type(type))
     85             type_diff = other_devices.difference(self_devices)
     86             if type_diff:
     87                 peripherals_diff[type] = type_diff
     88         return peripherals_diff
     89 
     90     def __repr__(self):
     91      return str(self._dict)
     92 
     93 
     94 class enterprise_CFM_USBPeripheralHotplugDetect(test.test):
     95     """
     96     Uses servo to hotplug and detect USB peripherals on CrOS and hotrod.
     97 
     98     It compares attached audio/video peripheral names on CrOS against what
     99     Hangouts detects.
    100     """
    101     version = 1
    102 
    103 
    104     def _get_usb_device_types(self, vid_pid):
    105         """
    106         Returns a list of types (based on lsusb) for the specified vid:pid.
    107 
    108         @param vid_pid: The vid:pid of the device to query.
    109         @returns List of device types (string).
    110         """
    111         details_list = []
    112         cmd = 'lsusb -v -d %s' % vid_pid
    113         cmd_out = self.client.run(cmd).stdout.strip().split('\n')
    114         for line in cmd_out:
    115             if (any(phrase in line for phrase in ('bInterfaceClass',
    116                     'wTerminalType'))):
    117                 device_type = line.split(None)[2]
    118                 details_list.append(device_type)
    119 
    120         return details_list
    121 
    122 
    123     def _get_cros_usb_peripherals(self, peripherals_to_check):
    124         """
    125         Queries cros for connected USB devices.
    126 
    127         @param peripherals_to_check: A list of peripherals to query. If a
    128             connected USB device is not in this list it will be ignored.
    129         @returns  A _Peripherals object
    130         """
    131         cros_peripherals = _Peripherals()
    132         device_collector = usb_device_collector.UsbDeviceCollector(self.client)
    133         for device in device_collector.get_usb_devices():
    134             vid_pid = device.vid_pid
    135             device_types = self._get_usb_device_types(vid_pid)
    136             device_spec = cfm_usb_devices.get_usb_device_spec(vid_pid)
    137             if device_spec in peripherals_to_check:
    138                 if 'Microphone' in device_types:
    139                     cros_peripherals.add_mic(device_spec)
    140                 if 'Speaker' in device_types:
    141                     cros_peripherals.add_speaker(device_spec)
    142                 if 'Video' in device_types:
    143                     cros_peripherals.add_camera(device_spec)
    144         return cros_peripherals
    145 
    146 
    147     def _enroll_device_and_skip_oobe(self):
    148         """Enroll device into CFM and skip CFM oobe."""
    149         self.cfm_facade.enroll_device()
    150         self.cfm_facade.skip_oobe_after_enrollment()
    151         self.cfm_facade.wait_for_hangouts_telemetry_commands()
    152 
    153 
    154     def _get_connected_cfm_hangouts_peripherals(self, peripherals_to_check):
    155         """
    156         Gets peripheral information as reported by Hangouts.
    157 
    158         @param peripherals_to_check: A list of peripherals to query. If a
    159             connected USB device is not in this list it will be ignored.
    160         @returns  A _Peripherals object
    161         """
    162         cfm_peripherals = _Peripherals()
    163         for mic_name in self.cfm_facade.get_mic_devices():
    164             mic_spec = _get_usb_spec_for_hangouts_device_name(
    165                 mic_name, _HANGOUTS_UI_MICROPHONE_NAMES)
    166             if mic_spec and mic_spec in peripherals_to_check:
    167                 cfm_peripherals.add_mic(mic_spec)
    168 
    169         for speaker_name in self.cfm_facade.get_speaker_devices():
    170             speaker_spec = _get_usb_spec_for_hangouts_device_name(
    171                 speaker_name, _HANGOUTS_UI_SPEAKER_NAMES)
    172             if speaker_spec and speaker_spec in peripherals_to_check:
    173                 cfm_peripherals.add_speaker(speaker_spec)
    174 
    175         for camera_name in self.cfm_facade.get_camera_devices():
    176             logging.info("Ccamera_name: %s", camera_name)
    177             camera_spec = _get_usb_spec_for_hangouts_device_name(
    178                 camera_name, _HANGOUTS_UI_CAMERA_NAMES)
    179             logging.info("camera_spec: %s", camera_spec)
    180             if camera_spec and camera_spec in peripherals_to_check:
    181                 cfm_peripherals.add_camera(camera_spec)
    182         return cfm_peripherals
    183 
    184 
    185     def _upload_crash_count(self, count):
    186         """Uploads crash count based on length of crash_files list."""
    187         self.output_perf_value(description='number_of_crashes',
    188                                value=int(count),
    189                                units='count', higher_is_better=False)
    190 
    191 
    192     def run_once(self, host, peripherals_to_check):
    193         """
    194         Main function to run autotest.
    195 
    196         @param host: Host object representing the DUT.
    197         @param peripherals_to_check: List of USB specs to check.
    198         """
    199         self.client = host
    200         self.crash_list =[]
    201 
    202         factory = remote_facade_factory.RemoteFacadeFactory(
    203                 host, no_chrome=True)
    204         self.cfm_facade = factory.create_cfm_facade()
    205 
    206         detect_crash = crash_detector.CrashDetector(self.client)
    207 
    208         tpm_utils.ClearTPMOwnerRequest(self.client)
    209 
    210         factory = remote_facade_factory.RemoteFacadeFactory(
    211                 host, no_chrome=True)
    212         self.cfm_facade = factory.create_cfm_facade()
    213 
    214         if detect_crash.is_new_crash_present():
    215             self.crash_list.append('New Warning or Crash Detected before ' +
    216                                    'plugging in usb peripherals.')
    217 
    218         # Turns on the USB port on the servo so that any peripheral connected to
    219         # the DUT it is visible.
    220         if self.client.servo:
    221             self.client.servo.switch_usbkey('dut')
    222             self.client.servo.set('usb_mux_sel3', 'dut_sees_usbkey')
    223             time.sleep(_SHORT_TIMEOUT)
    224             self.client.servo.set('dut_hub1_rst1', 'off')
    225             time.sleep(_SHORT_TIMEOUT)
    226 
    227         if detect_crash.is_new_crash_present():
    228             self.crash_list.append('New Warning or Crash Detected after ' +
    229                                    'plugging in usb peripherals.')
    230 
    231         cros_peripherals = self._get_cros_usb_peripherals(
    232             peripherals_to_check)
    233         logging.info('Peripherals detected by CrOS: %s', cros_peripherals)
    234 
    235         try:
    236             self._enroll_device_and_skip_oobe()
    237             cfm_peripherals = self._get_connected_cfm_hangouts_peripherals(
    238                 peripherals_to_check)
    239             logging.info('Peripherals detected by hangouts: %s',
    240                           cfm_peripherals)
    241         except Exception as e:
    242             exception_msg = str(e)
    243             if self.crash_list:
    244                 crash_identified_at = (' ').join(self.crash_list)
    245                 exception_msg += '. ' + crash_identified_at
    246                 self._upload_crash_count(len(detect_crash.get_crash_files()))
    247             raise error.TestFail(str(exception_msg))
    248 
    249         if detect_crash.is_new_crash_present():
    250             self.crash_list.append('New Warning or Crash detected after '
    251                                    'device enrolled into CFM.')
    252 
    253         tpm_utils.ClearTPMOwnerRequest(self.client)
    254 
    255         if self.crash_list:
    256             crash_identified_at = (', ').join(self.crash_list)
    257         else:
    258             crash_identified_at = 'No Crash or Warning detected.'
    259 
    260         self._upload_crash_count(len(detect_crash.get_crash_files()))
    261 
    262         peripherals_diff = cfm_peripherals.get_diff(cros_peripherals)
    263         if peripherals_diff:
    264             raise error.TestFail(
    265                 'Peripherals do not match.\n'
    266                 'Diff: {0} \n Cros: {1} \n Hangouts: {2} \n.'
    267                 'No of Crashes: {3}. Crashes: {4}'.format(
    268                 peripherals_diff, cros_peripherals, cfm_peripherals,
    269                 len(detect_crash.get_crash_files()), crash_identified_at))
    270