1 # Copyright (c) 2014 The Chromium OS Authors. All rights reserved. 2 # Use of this source code is governed by a BSD-style license that can be 3 # found in the LICENSE file. 4 5 import logging 6 import time 7 8 from autotest_lib.client.common_lib import error 9 from autotest_lib.client.common_lib.cros.network import xmlrpc_datatypes 10 from autotest_lib.server.cros.network import attenuator_controller 11 from autotest_lib.server.cros.network import hostap_config 12 from autotest_lib.server.cros.network import wifi_cell_test_base 13 14 ATTENUATION_STEP = 4 15 FINAL_ATTENUATION = 90 16 ATTENUATORS_PER_PHY = 2 17 18 LOW_POWER_SIGNAL = -75 19 20 class AttenuatorInfo(object): 21 """Contains debug information about an attenuator.""" 22 23 def __init__(self): 24 self.attenuator_zeros = False 25 self.zeroed_scan_signal = None 26 self.allows_connection = False 27 self._zeroed_linked_signal = None 28 29 30 @property 31 def zeroed_linked_signal(self): 32 """Returns the linked signal as a float.""" 33 return self._zeroed_linked_signal 34 35 36 @zeroed_linked_signal.setter 37 def zeroed_linked_signal(self, value): 38 """Sets the linked signal to a float. 39 40 @param value: the linked signal as a float 41 42 """ 43 if (self._zeroed_linked_signal is None or 44 value > self._zeroed_linked_signal): 45 self._zeroed_linked_signal = value 46 47 48 def healthy_attenuator(self): 49 """Returns True if the attenuator looks good; False otherwise.""" 50 if (not self.allows_connection and 51 self.zeroed_scan_signal is None): 52 return False 53 elif not self.attenuator_zeros: 54 return False 55 if (self.zeroed_scan_signal < LOW_POWER_SIGNAL and 56 self.zeroed_linked_signal < LOW_POWER_SIGNAL): 57 return False 58 return True 59 60 61 class network_WiFi_VerifyAttenuator(wifi_cell_test_base.WiFiCellTestBase): 62 """Test that all connected attenuators are functioning correctly.""" 63 version = 1 64 65 66 def _refresh_ap_ssids(self, frequency): 67 """Start up new APs, with unique SSIDs. 68 69 Doing this before each connection attempt in the test prevents 70 spillover from previous connection attempts interfering with 71 our intentions. 72 73 @param frequency: int WiFi frequency to configure the APs on. 74 75 """ 76 ap_config = hostap_config.HostapConfig( 77 frequency=frequency, 78 mode=hostap_config.HostapConfig.MODE_11N_PURE) 79 self.context.router.deconfig_aps() 80 self._all_ssids = list() 81 for i in range(self.num_phys): 82 self.context.configure(ap_config, multi_interface=True) 83 self._all_ssids.append(self.context.router.get_ssid(instance=i)) 84 85 86 def _get_phy_num_for_instance(self, instance): 87 """Get the phy number corresponding to a hostapd instance. 88 89 @param instance: int hostapd instance to test against. 90 @return int phy number corresponding to that AP (e.g. 91 for phy0 return 0). 92 93 """ 94 phy = self.context.router.get_hostapd_phy(instance) 95 if not phy.startswith('phy'): 96 raise error.TestError('Unexpected phy name %s' % phy) 97 98 return int(phy[3:]) 99 100 101 def _wait_for_good_signal_levels(self, ssid, attenuator_info): 102 """Verify the desired SSID is available with a good signal. 103 104 @param ssid: the ssid as a string 105 @param attenuator_info: dictionary with information about the 106 current attenuator 107 108 @returns an updated attenuator_info dictionary 109 110 """ 111 # In practice it has been observed that going from max attuation 112 # to 0 attenuation may take several scans until the signal is what 113 # is desirable. 114 for _ in range(5): 115 scan_result = self._client_iw_runner.wait_for_scan_result( 116 self._client_if, ssids=[ssid], timeout_seconds=10) 117 if scan_result is None or len(scan_result) == 0: 118 # Device is busy or not results at this time, try again 119 continue 120 for network in scan_result: 121 if network.ssid == ssid and network.signal < LOW_POWER_SIGNAL: 122 logging.info('WARNING: Signal strength is less than ' 123 'optimal (%f) consider re-calibrating or ' 124 'check the conductive cabling.', 125 network.signal) 126 attenuator_info.zeroed_scan_signal = network.signal 127 return attenuator_info 128 elif network.ssid == ssid and network.signal > LOW_POWER_SIGNAL: 129 logging.info('Scan found an acceptable signal strength %f', 130 network.signal) 131 attenuator_info.zeroed_scan_signal = network.signal 132 return attenuator_info 133 raise error.TestError('The desired SSID is not visible, the ' 134 'attenuator may be stuck or broken. ' 135 'OR the AP is in a bad state or is ' 136 'bad, try swapping.') 137 138 139 def _verify_attenuator(self, ap_num, frequency_mhz, attenuator_num): 140 """Verify that each phy has two attenuators controlling its signal. 141 142 @param ap_num: int hostapd instance to test against. 143 @param frequency_mhz: int frequency of the AP. 144 @param attenuator_num: int attenuator num controlling one antenna on 145 the AP. 146 147 @return AttenuatorInfo object 148 149 """ 150 logging.info('Verifying attenuator functionality') 151 ai = AttenuatorInfo() 152 # Remove knowledge of previous networks from shill. 153 self.context.client.shill.init_test_network_state() 154 # Isolate the client entirely. 155 self.context.attenuator.set_variable_attenuation( 156 attenuator_controller.MAX_VARIABLE_ATTENUATION) 157 logging.info('Removing variable attenuation for attenuator=%d', 158 attenuator_num) 159 # But allow one antenna on this phy. 160 self.context.attenuator.set_variable_attenuation( 161 0, attenuator_num=attenuator_num) 162 client_conf = xmlrpc_datatypes.AssociationParameters( 163 ssid=self.context.router.get_ssid(instance=ap_num)) 164 165 logging.info('Waiting for client signal levels to settle.') 166 ai = self._wait_for_good_signal_levels(client_conf.ssid, ai) 167 logging.info('Connecting to %s', client_conf.ssid) 168 assoc_result = xmlrpc_datatypes.deserialize( 169 self.context.client.shill.connect_wifi(client_conf)) 170 if not assoc_result.success: 171 logging.error('Failed to connect to AP %d on attenuator %d', 172 ap_num, attenuator_num) 173 return ai 174 ai.allows_connection = True 175 ai.zeroed_linked_signal = self.context.client.wifi_signal_level 176 logging.info('Connected successfully') 177 start_atten = self.context.attenuator.get_minimal_total_attenuation() 178 for atten in range(start_atten, 179 min(start_atten + 20, FINAL_ATTENUATION), 180 ATTENUATION_STEP): 181 self.context.attenuator.set_total_attenuation( 182 atten, frequency_mhz, attenuator_num=attenuator_num) 183 time.sleep(2) 184 logging.info('Attenuator %d signal at attenuation=%d is %d dBm.', 185 attenuator_num, atten, 186 self.context.client.wifi_signal_level) 187 return ai 188 189 190 def _debug_phy_attenuator_correspondence(self, visible_ssid, hidden_ssid): 191 """Verify that the non-attenuated SSID is the only one that is visble. 192 193 If everything is working correctly then all the DUT should see is one 194 SSID that is not the one which is attenuated. Here are the different 195 possible failure scenarios: 196 - Two network_<blah> SSIDs are visible, both with a strong signal 197 (something greater than -80 dBm) means the rainbow cables on the 198 attenuation rig are backwards. 199 - Two network_<blah> SSIDs are visble, the one which should be 200 hidden is visible with something less than -80 dBm means one 201 of the attenuators is broken. 202 - The attenuated SSID is the only visible one, means that rainbow 203 cables are in the wrong order. 204 - The visible SSID is not seen, means that both attenuators are 205 stuck at max attenuation or there is a cabling problem. 206 207 @param visible_ssid: string of the SSID that should be visible. 208 @param hidden_ssid: string of the SSID that should be hidden 209 210 """ 211 scan_result = self._client_iw_runner.wait_for_scan_result( 212 self._client_if, ssids=[visible_ssid, hidden_ssid]) 213 if scan_result is None or len(scan_result) == 0: 214 raise error.TestFail('No visible SSIDs. Check cables, the ' 215 'attenuators may be stuck') 216 elif (len(scan_result) == 1 and scan_result[0].ssid == hidden_ssid): 217 raise error.TestFail('The wrong network is visible, the rainbow ' 218 'cables are in the wrong order.') 219 elif len(scan_result) > 1: 220 for network in scan_result: 221 if (network.ssid == hidden_ssid): 222 # The SSID that should be hidden from the DUT is not, 223 # along with what is presumably the network that should 224 # be visible. Check the signal strength. 225 if network.signal > LOW_POWER_SIGNAL: 226 raise error.TestFail('Two SSIDs are visible, the ' 227 'rainbow cables may be ' 228 'connected backwards.') 229 else: 230 logging.warning('The attenuated SSID is visible with ' 231 'very low power (%f), the attenuator ' 232 'may be broken, or this is ghost ' 233 'signal; will attempt to connect', 234 network.signal) 235 236 237 def _verify_phy_attenuator_correspondence(self, instance): 238 """Verify that we cannot connect to a phy when it is attenuated. 239 240 Check that putting maximum attenuation on the attenuators expected 241 to gate a particular phy produces the expected result. We should 242 be unable to connect to the corresponding SSID. 243 244 @param instance: int hostapd instance to verify corresponds to 245 a particular 2 attenuators. 246 247 """ 248 logging.info('Verifying attenuator correspondence') 249 # Turn up all attenuation. 250 self.context.attenuator.set_variable_attenuation( 251 attenuator_controller.MAX_VARIABLE_ATTENUATION) 252 # Turn down attenuation for phys other than the instance we're 253 # interested in. 254 for other_instance in [x for x in range(self.num_phys) 255 if x != instance]: 256 other_phy_num = self._get_phy_num_for_instance(other_instance) 257 for attenuator_offset in range(ATTENUATORS_PER_PHY): 258 attenuator_num = (other_phy_num * ATTENUATORS_PER_PHY + 259 attenuator_offset) 260 self.context.attenuator.set_variable_attenuation( 261 0, attenuator_num=attenuator_num) 262 # The other SSID should be available. 263 self._debug_phy_attenuator_correspondence( 264 self.context.router.get_ssid(instance=other_instance), 265 self.context.router.get_ssid(instance=instance)) 266 # We should be unable to connect. 267 client_conf = xmlrpc_datatypes.AssociationParameters( 268 ssid=self.context.router.get_ssid(instance=instance), 269 expect_failure=True) 270 self.context.assert_connect_wifi(client_conf) 271 272 273 def run_once(self): 274 """For each PHY on a router, for 2 and 5 Ghz bands on a PHY: 275 276 1) Set up an AP on the PHY. 277 2) Walk the attenuators from low to high attenuations. 278 3) Measure AP signal as attenuation increases. 279 4) Tester should manually inspect that signal levels decrease linearly 280 and are consistent from attenuator to attenuator. 281 282 """ 283 # Create some re-usable client objects 284 self._client_iw_runner = self.context.client.iw_runner 285 self._client_if = self.context.client.wifi_if 286 287 # Verify the client cell is clean 288 scan_result = self._client_iw_runner.scan(self._client_if) 289 if scan_result and len(scan_result) > 0: 290 raise error.TestError('SSIDs found, the cell is not closed or ' 291 'is not cabled correctly.') 292 293 attenuators_info = list() 294 self.num_phys = len(self.context.router.iw_runner.list_phys()) 295 # Pick channels other than the calibrated ones. 296 for frequency in (2447, 5660): 297 for instance in range(self.num_phys): 298 if self.num_phys > 1: 299 self._refresh_ap_ssids(frequency) 300 self._verify_phy_attenuator_correspondence(instance) 301 phy_num = self._get_phy_num_for_instance(instance) 302 for attenuator_offset in range(ATTENUATORS_PER_PHY): 303 attenuator_num = (phy_num * ATTENUATORS_PER_PHY + 304 attenuator_offset) 305 self._refresh_ap_ssids(frequency) 306 attenuator_info = self._verify_attenuator( 307 instance, frequency, attenuator_num) 308 attenuators_info.append(attenuator_info) 309 310 for info in attenuators_info: 311 if info.healthy_attenuator is False: 312 raise error.TestFail('One or more attenuators are broken!') 313