# Copyright (c) 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import re
import time

from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros import crash_detector
from autotest_lib.client.common_lib.cros import tpm_utils
from autotest_lib.client.common_lib.cros.cfm.usb import cfm_usb_devices
from autotest_lib.client.common_lib.cros.cfm.usb import usb_device_collector
from autotest_lib.server import test
from autotest_lib.server.cros.multimedia import remote_facade_factory

# Seconds to sleep after each servo USB operation in run_once().
_SHORT_TIMEOUT = 5

# Constants for Hangouts UI peripheral names. Each entry is a tuple of the
# form (<hangouts_name_regex>, <USB device spec>). Regular expressions are used
# for the labels because they are not constant values, i.e. they depend on
# which USB port a device has been plugged in to.
# TODO(dtosic): Move this out to a separate util so other tests can reuse it.
_HANGOUTS_UI_CAMERA_NAMES = [
    (r'HD Pro Webcam C920 \(046d:082d\)', cfm_usb_devices.HD_PRO_WEBCAM_C920),
    (r'Logitech Webcam C930e \(046d:0843\)',
     cfm_usb_devices.LOGITECH_WEBCAM_C930E),
]

_HANGOUTS_UI_MICROPHONE_NAMES = [
    (r'Jabra SPEAK 410', cfm_usb_devices.JABRA_SPEAK_410),
    # Some cameras also have integrated microphones.
    (r'HD Pro Webcam C920: USB Audio:[0-9],[0-9]',
     cfm_usb_devices.HD_PRO_WEBCAM_C920),
    (r'Logitech Webcam C930e: USB Audio:[0-9],[0-9]',
     cfm_usb_devices.LOGITECH_WEBCAM_C930E),
]

_HANGOUTS_UI_SPEAKER_NAMES = [
    (r'Jabra SPEAK 410', cfm_usb_devices.JABRA_SPEAK_410),
]


def _get_usb_spec_for_hangouts_device_name(device_name, name_to_spec_map):
    """
    Returns a USB spec (if found) for the specified name, or None.

    @param device_name: The device label as reported by Hangouts.
    @param name_to_spec_map: Iterable of (<name_regex>, <USB device spec>)
            tuples, e.g. _HANGOUTS_UI_CAMERA_NAMES.
    @returns The spec of the first entry whose regex matches at the start of
            device_name, or None if no entry matches.
    """
    for name_regex, usb_spec in name_to_spec_map:
        # re.match anchors at the beginning only, so trailing text in the
        # Hangouts label does not prevent a match.
        if re.match(name_regex, device_name):
            return usb_spec
    return None


class _Peripherals(object):
    """Utility class for storing peripherals names by type."""

    def __init__(self):
        # Maps a peripheral type to the list of registered device names
        # (the str() of each spec that was added).
        self._dict = {
            'Microphone': [],
            'Speaker': [],
            'Camera': [],
        }

    def add_mic(self, spec):
        """Registers a mic name."""
        self._add_device('Microphone', spec)

    def add_speaker(self, spec):
        """Registers a speaker name."""
        self._add_device('Speaker', spec)

    def add_camera(self, spec):
        """Registers a camera name."""
        self._add_device('Camera', spec)

    def _add_device(self, device_type, spec):
        """Stores str(spec) under device_type (a key of self._dict)."""
        self._dict[device_type].append(str(spec))

    def _get_by_type(self, device_type):
        """Returns the list of names registered under device_type."""
        return self._dict[device_type]

    def get_diff(self, other_peripherals):
        """
        Returns a diff dictionary for other_peripherals.

        The diff is one-sided: for each type it holds the set of names that
        are registered in other_peripherals but not in this object. Names that
        only this object has are not reported. Types without a difference are
        omitted, so an empty dictionary means nothing is missing here.

        @param other_peripherals: The _Peripherals object to compare against.
        @returns Dictionary mapping type name to a set of device names.
        """
        peripherals_diff = {}
        for device_type in self._dict:
            self_devices = set(self._get_by_type(device_type))
            other_devices = set(other_peripherals._get_by_type(device_type))
            missing_here = other_devices.difference(self_devices)
            if missing_here:
                peripherals_diff[device_type] = missing_here
        return peripherals_diff

    def __repr__(self):
        return str(self._dict)


class enterprise_CFM_USBPeripheralHotplugDetect(test.test):
    """
    Uses servo to hotplug and detect USB peripherals on CrOS and hotrod.

    It compares attached audio/video peripheral names on CrOS against what
    Hangouts detects.
    """
    version = 1


    def _get_usb_device_types(self, vid_pid):
        """
        Returns a list of types (based on lsusb) for the specified vid:pid.

        @param vid_pid: The vid:pid of the device to query.
        @returns List of device types (string).
        """
        device_types = []
        cmd = 'lsusb -v -d %s' % vid_pid
        for line in self.client.run(cmd).stdout.splitlines():
            if 'bInterfaceClass' not in line and 'wTerminalType' not in line:
                continue
            # The type name is taken from the third whitespace-separated
            # field, e.g. "bInterfaceClass 14 Video". Lines that carry no
            # third field are skipped instead of raising IndexError.
            fields = line.split()
            if len(fields) > 2:
                device_types.append(fields[2])

        return device_types


    def _get_cros_usb_peripherals(self, peripherals_to_check):
        """
        Queries cros for connected USB devices.

        @param peripherals_to_check: A list of peripherals to query. If a
                connected USB device is not in this list it will be ignored.
        @returns A _Peripherals object
        """
        cros_peripherals = _Peripherals()
        device_collector = usb_device_collector.UsbDeviceCollector(self.client)
        for device in device_collector.get_usb_devices():
            vid_pid = device.vid_pid
            device_spec = cfm_usb_devices.get_usb_device_spec(vid_pid)
            # Filter first so the remote lsusb call is only issued for
            # devices this test actually cares about.
            if device_spec not in peripherals_to_check:
                continue
            device_types = self._get_usb_device_types(vid_pid)
            # A single device may expose several functions (e.g. a webcam
            # with a built-in mic), so every check is applied independently.
            if 'Microphone' in device_types:
                cros_peripherals.add_mic(device_spec)
            if 'Speaker' in device_types:
                cros_peripherals.add_speaker(device_spec)
            if 'Video' in device_types:
                cros_peripherals.add_camera(device_spec)
        return cros_peripherals


    def _enroll_device_and_skip_oobe(self):
        """Enroll device into CFM and skip CFM oobe."""
        self.cfm_facade.enroll_device()
        self.cfm_facade.skip_oobe_after_enrollment()
        self.cfm_facade.wait_for_hangouts_telemetry_commands()


    def _get_connected_cfm_hangouts_peripherals(self, peripherals_to_check):
        """
        Gets peripheral information as reported by Hangouts.

        @param peripherals_to_check: A list of peripherals to query. If a
                connected USB device is not in this list it will be ignored.
        @returns A _Peripherals object
        """
        cfm_peripherals = _Peripherals()
        for mic_name in self.cfm_facade.get_mic_devices():
            mic_spec = _get_usb_spec_for_hangouts_device_name(
                    mic_name, _HANGOUTS_UI_MICROPHONE_NAMES)
            if mic_spec and mic_spec in peripherals_to_check:
                cfm_peripherals.add_mic(mic_spec)

        for speaker_name in self.cfm_facade.get_speaker_devices():
            speaker_spec = _get_usb_spec_for_hangouts_device_name(
                    speaker_name, _HANGOUTS_UI_SPEAKER_NAMES)
            if speaker_spec and speaker_spec in peripherals_to_check:
                cfm_peripherals.add_speaker(speaker_spec)

        for camera_name in self.cfm_facade.get_camera_devices():
            logging.info("camera_name: %s", camera_name)
            camera_spec = _get_usb_spec_for_hangouts_device_name(
                    camera_name, _HANGOUTS_UI_CAMERA_NAMES)
            logging.info("camera_spec: %s", camera_spec)
            if camera_spec and camera_spec in peripherals_to_check:
                cfm_peripherals.add_camera(camera_spec)
        return cfm_peripherals


    def _upload_crash_count(self, count):
        """
        Uploads the crash count as the 'number_of_crashes' perf value.

        @param count: Number of crashes; converted with int() before upload.
        """
        self.output_perf_value(description='number_of_crashes',
                               value=int(count),
                               units='count', higher_is_better=False)


    def run_once(self, host, peripherals_to_check):
        """
        Main function to run autotest.

        Collects the peripherals seen by CrOS, enrolls the device, collects
        the peripherals seen by Hangouts and fails if CrOS reports a
        peripheral that Hangouts does not.

        @param host: Host object representing the DUT.
        @param peripherals_to_check: List of USB specs to check.
        @raises error.TestFail: If enrollment or the Hangouts query raises,
                or if the two peripheral sets do not match.
        """
        self.client = host
        self.crash_list = []

        # NOTE(review): the facade is created here and again after the TPM
        # clear below; this first instance is overwritten before it is used.
        # Kept because the factory may have side effects on the DUT - confirm
        # whether it can be dropped.
        factory = remote_facade_factory.RemoteFacadeFactory(
                host, no_chrome=True)
        self.cfm_facade = factory.create_cfm_facade()

        detect_crash = crash_detector.CrashDetector(self.client)

        tpm_utils.ClearTPMOwnerRequest(self.client)

        factory = remote_facade_factory.RemoteFacadeFactory(
                host, no_chrome=True)
        self.cfm_facade = factory.create_cfm_facade()

        if detect_crash.is_new_crash_present():
            self.crash_list.append('New Warning or Crash Detected before ' +
                                   'plugging in usb peripherals.')

        # Turn on the USB port on the servo so that any peripheral connected
        # to it is visible to the DUT.
        if self.client.servo:
            self.client.servo.switch_usbkey('dut')
            self.client.servo.set('usb_mux_sel3', 'dut_sees_usbkey')
            time.sleep(_SHORT_TIMEOUT)
            self.client.servo.set('dut_hub1_rst1', 'off')
            time.sleep(_SHORT_TIMEOUT)

        if detect_crash.is_new_crash_present():
            self.crash_list.append('New Warning or Crash Detected after ' +
                                   'plugging in usb peripherals.')

        cros_peripherals = self._get_cros_usb_peripherals(
                peripherals_to_check)
        logging.info('Peripherals detected by CrOS: %s', cros_peripherals)

        try:
            self._enroll_device_and_skip_oobe()
            cfm_peripherals = self._get_connected_cfm_hangouts_peripherals(
                    peripherals_to_check)
            logging.info('Peripherals detected by hangouts: %s',
                         cfm_peripherals)
        except Exception as e:
            # Any failure here is reported as a test failure, with the crash
            # notes gathered so far appended to the message.
            # NOTE(review): the TPM clear further down is skipped on this
            # path - confirm whether that is intended.
            exception_msg = str(e)
            if self.crash_list:
                crash_identified_at = (' ').join(self.crash_list)
                exception_msg += '. ' + crash_identified_at
            self._upload_crash_count(len(detect_crash.get_crash_files()))
            raise error.TestFail(exception_msg)

        if detect_crash.is_new_crash_present():
            self.crash_list.append('New Warning or Crash detected after '
                                   'device enrolled into CFM.')

        tpm_utils.ClearTPMOwnerRequest(self.client)

        if self.crash_list:
            crash_identified_at = (', ').join(self.crash_list)
        else:
            crash_identified_at = 'No Crash or Warning detected.'

        # Read the crash files once so the uploaded perf value and the
        # failure message below always report the same number.
        crash_count = len(detect_crash.get_crash_files())
        self._upload_crash_count(crash_count)

        # One-sided diff: peripherals CrOS detected that Hangouts did not.
        peripherals_diff = cfm_peripherals.get_diff(cros_peripherals)
        if peripherals_diff:
            raise error.TestFail(
                    'Peripherals do not match.\n'
                    'Diff: {0} \n Cros: {1} \n Hangouts: {2} \n.'
                    'No of Crashes: {3}. Crashes: {4}'.format(
                            peripherals_diff, cros_peripherals,
                            cfm_peripherals, crash_count,
                            crash_identified_at))