#!/usr/bin/env python3.4
#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

import pprint
import queue

import acts.base_test
import acts.test_utils.wifi.wifi_test_utils as wutils
import acts.utils
from acts import asserts
from acts.controllers.android import SL4AAPIError

WifiEnums = wutils.WifiEnums

# Macros for RttParam keywords
RttParam = WifiEnums.RttParam
# Macros for RttManager
Rtt = WifiEnums.Rtt
RttBW = WifiEnums.RttBW
RttPreamble = WifiEnums.RttPreamble
RttPeerType = WifiEnums.RttPeerType
RttType = WifiEnums.RttType

ScanResult = WifiEnums.ScanResult
RTT_MARGIN_OF_ERROR = WifiEnums.RTT_MARGIN_OF_ERROR


class WifiRTTRangingError(Exception):
    """Error in WifiScanner Rtt."""


class WifiRttManagerTest(acts.base_test.BaseTestClass):
    """Tests for wifi's RttManager APIs."""
    # Names of the test cases to run; populated in __init__.
    tests = None
    MAX_RTT_AP = 10

    def __init__(self, controllers):
        """Initializes the base test class and selects the tests to run.

        Args:
            controllers: Passed through unchanged to BaseTestClass.__init__.
        """
        acts.base_test.BaseTestClass.__init__(self, controllers)
        self.tests = (
            "test_support_check",
            "test_invalid_params",
            "test_capability_check",
            "test_rtt_ranging_single_AP_stress",
            "test_regular_scan_then_rtt_ranging_stress",
            "test_gscan_then_rtt_ranging_stress"
        )

    def setup_class(self):
        """Prepares the device under test and the expected-value tables.

        Unpacks the required user params, checks that the configured actual
        distance is at least 5 meters, and builds the per-model table of
        expected RTT capabilities used by test_capability_check.
        """
        self.dut = self.android_devices[0]
        wutils.wifi_test_device_init(self.dut)
        required_params = (
            "support_models",
            "stress_num",
            "vht80_5g",
            "actual_distance"
        )
        self.unpack_userparams(required_params)
        asserts.assert_true(self.actual_distance >= 5,
            "Actual distance should be no shorter than 5 meters.")
        self.visible_networks = (
            self.vht80_5g,
        )
        self.default_rtt_params = {
            RttParam.request_type: RttType.TYPE_TWO_SIDED,
            RttParam.device_type: RttPeerType.PEER_TYPE_AP,
            RttParam.preamble: RttPreamble.PREAMBLE_HT,
            RttParam.bandwidth: RttBW.BW_80_SUPPORT
        }
        # Expected capability for devices that don't support RTT.
        rtt_cap_neg = {
            'lcrSupported': False,
            'bwSupported': 0,
            'twoSided11McRttSupported': False,
            'preambleSupported': 0,
            'oneSidedRttSupported': False,
            'lciSupported': False
        }
        rtt_cap_shamu = {
            'lcrSupported': False,
            'bwSupported': 0x1C,
            'twoSided11McRttSupported': True,
            'preambleSupported': 6,
            'oneSidedRttSupported': False,
            'lciSupported': False
        }
        rtt_cap_bullhead = {
            'lcrSupported': True,
            'bwSupported': 0x1C,
            'twoSided11McRttSupported': True,
            'preambleSupported': 7,
            'oneSidedRttSupported': True,
            'lciSupported': True
        }
        rtt_cap_angler = {
            'lcrSupported': True,
            'bwSupported': 0x1C,
            'twoSided11McRttSupported': True,
            'preambleSupported': 6,
            'oneSidedRttSupported': False,
            'lciSupported': True
        }
        self.rtt_cap_table = {
            "hammerhead": rtt_cap_neg,
            "shamu": rtt_cap_shamu,
            "volantis": rtt_cap_neg,
            "volantisg": rtt_cap_neg,
            "bullhead": rtt_cap_bullhead,
            "angler": rtt_cap_angler
        }

    # Helper Functions

    def invalid_params_logic(self, rtt_params):
        """Starts ranging with one invalid RttParam and checks the error.

        If the SL4A call raises SL4AAPIError, the error text must mention
        IllegalArgumentException. If nothing is raised the check still
        passes (original behavior), but a warning is logged so the gap is
        visible in the test log.

        Args:
            rtt_params: A dict representing a single (invalid) RttParam.
        """
        try:
            self.dut.droid.wifiRttStartRanging([rtt_params])
        except SL4AAPIError as e:
            e_str = str(e)
            asserts.assert_true("IllegalArgumentException" in e_str,
                "Missing IllegalArgumentException in %s." % e_str)
            msg = "Got expected exception with invalid param %s." % rtt_params
            self.log.info(msg)
        else:
            # NOTE(review): no exception is treated as a pass here; confirm
            # whether every entry in the param list is expected to raise.
            self.log.warning(
                "No exception raised for invalid param %s." % rtt_params)

    def get_rtt_results(self, rtt_params):
        """Starts RTT ranging and get results.

        On an onSuccess callback, each result dict is annotated in place with
        the acceptable margin of error for the bandwidth of the matching
        request, for later use by process_rtt_events.

        Args:
            rtt_params: A list of dicts each representing an RttParam.

        Returns:
            The list of events popped for this ranging request, or None if
            waiting for the events timed out.
        """
        self.log.debug("Start ranging with:\n%s" % pprint.pformat(rtt_params))
        idx = self.dut.droid.wifiRttStartRanging(rtt_params)
        try:
            events = self.dut.ed.pop_events("WifiRttRanging%d" % idx, 30)
            if events[0]["name"].endswith("onSuccess"):
                results = events[0]["data"]["Results"]
                result_len = len(results)
                param_len = len(rtt_params)
                asserts.assert_true(result_len == param_len,
                    "Expected %d results, got %d." % (param_len, result_len))
                # Add acceptable margin of error to results, which will be
                # used during result processing.
                for i, r in enumerate(results):
                    bw_mode = rtt_params[i][RttParam.bandwidth]
                    r[RttParam.margin] = RTT_MARGIN_OF_ERROR[bw_mode]
            self.log.debug(pprint.pformat(events))
            return events
        except queue.Empty:
            self.log.error("Waiting for RTT event timed out.")
            return None

    def network_selector(self, network_info):
        """Decides if a network should be used for rtt ranging.

        There are a few conditions:
            1. This network supports 80211mc.
            2. This network's info matches certain conditions.

        This is added to better control which networks to range against
        instead of blindly use all 80211mc networks in air.

        String values are compared case-insensitively. As a side effect
        (kept from the original implementation), matching string fields of
        network_info are lowercased in place.

        Args:
            network_info: A dict representing a WiFi network.

        Returns:
            True if the input network should be used for ranging, False
            otherwise.
        """
        target_params = {
            "is80211McRTTResponder": True,
            WifiEnums.BSSID_KEY: self.vht80_5g[WifiEnums.BSSID_KEY],
        }
        for k, v in target_params.items():
            if k not in network_info:
                return False
            actual = network_info[k]
            if isinstance(actual, str):
                actual = actual.lower()
                network_info[k] = actual
                # Only lowercase the target when it is a string too; a bool
                # target (e.g. True) has no lower() and simply won't match.
                if isinstance(v, str):
                    v = v.lower()
            if actual != v:
                return False
        return True

    def regular_scan_for_rtt_networks(self):
        """Scans for 11mc-capable WiFi networks using regular wifi scan.

        Networks are selected based on self.network_selector.

        Returns:
            A list of networks that have RTTResponders.
        """
        wutils.start_wifi_connection_scan(self.dut)
        networks = self.dut.droid.wifiGetScanResults()
        return [nw for nw in networks if self.network_selector(nw)]

    def gscan_for_rtt_networks(self):
        """Scans for 11mc-capable WiFi networks using wifi gscan.

        Networks are selected based on self.network_selector.

        Returns:
            A list of networks that have RTTResponders; an empty list if
            waiting for the scan results timed out.
        """
        s = {
            "reportEvents": WifiEnums.REPORT_EVENT_FULL_SCAN_RESULT,
            "band": WifiEnums.WIFI_BAND_BOTH,
            "periodInMs": 10000,
            "numBssidsPerScan": 32
        }
        idx = wutils.start_wifi_single_scan(self.dut, s)["Index"]
        self.log.info("Scan index is %d" % idx)
        event_name = "WifiScannerScan%donFullResult" % idx

        def condition(event):
            nw = event["data"]["Results"][0]
            return self.network_selector(nw)

        rtt_networks = []
        try:
            # Wait for one matching full-result event per expected network.
            for _ in self.visible_networks:
                event = self.dut.ed.wait_for_event(event_name, condition, 30)
                rtt_networks.append(event["data"]["Results"][0])
            self.log.info("Waiting for gscan to finish.")
            event_name = "WifiScannerScan%donResults" % idx
            event = self.dut.ed.pop_event(event_name, 300)
            total_network_cnt = len(event["data"]["Results"][0]["ScanResults"])
            self.log.info("Found %d networks in total." % total_network_cnt)
            self.log.debug(rtt_networks)
            return rtt_networks
        except queue.Empty:
            self.log.error("Timed out waiting for gscan result.")
            return []

    def process_rtt_events(self, events):
        """Processes rtt ranging events.

        Validates RTT event types.
        Validates RTT response status and measured RTT values.
        Enforces success rate: at least 90% of the responses must be valid,
        and at least 67% of the valid responses must have in-range values.

        Args:
            events: A list of callback results from RTT ranging.
        """
        total = aborted = failure = invalid = out_of_range = 0
        for e in events:
            name = e["name"]
            if name.endswith("onAborted"):
                aborted += 1
            elif name.endswith("onFailure"):
                failure += 1
            elif name.endswith("onSuccess"):
                results = e["data"]["Results"]
                for r in results:
                    total += 1
                    # Status needs to be "success".
                    status = r["status"]
                    if status != Rtt.STATUS_SUCCESS:
                        self.log.warning("Got error status %d." % status)
                        invalid += 1
                        continue
                    # RTT value should be positive.
                    value = r["rtt"]
                    if value <= 0:
                        self.log.warning("Got error RTT value %d." % value)
                        invalid += 1
                        continue
                    # Validate values in successful responses.
                    acd = self.actual_distance
                    margin = r[RttParam.margin]
                    # The reported distance is only validated when it is
                    # positive. NOTE(review): the /100.0 presumably converts
                    # centimeters to meters -- confirm against RttManager.
                    d = r["distance"] / 100.0
                    if d > 0:
                        # Distance should be in acceptable range.
                        is_d_valid = (acd - margin) <= d <= (acd + margin)
                        if not is_d_valid:
                            self.log.warning(
                                ("Reported distance %.2fm is out of the"
                                 " acceptable range %.2f+/-%.2fm.") % (
                                     d, acd, margin))
                            out_of_range += 1
                            continue
                    # Check if the RTT value is in range: half the round trip
                    # time multiplied by the speed of light gives a distance.
                    # NOTE(review): the 1E10 scaling is kept from the original;
                    # confirm it matches the unit of the reported rtt.
                    d = (value / 2) / 1E10 * wutils.SPEED_OF_LIGHT
                    is_rtt_valid = (acd - margin) <= d <= (acd + margin)
                    if not is_rtt_valid:
                        self.log.warning(
                            ("Distance calculated from RTT value %d - %.2fm is "
                             "out of the acceptable range %.2f+/-%.2fm.") % (
                                 value, d, acd, margin))
                        out_of_range += 1
                        continue
                    # Check if the RSSI value is in range.
                    rssi = r["rssi"]
                    # average rssi in 0.5dB steps, e.g. 143 implies -71.5dB,
                    # so the valid range is 0 to 200
                    is_rssi_valid = 0 <= rssi <= 200
                    if not is_rssi_valid:
                        self.log.warning(("Reported RSSI %d is out of the"
                                          " acceptable range 0-200") % rssi)
                        out_of_range += 1
                        continue
        self.log.info(("Processed %d RTT events. %d aborted, %s failed. Among"
            " the %d responses in successful callbacks, %s are invalid, %s has"
            " RTT values that are out of range.") % (
            len(events), aborted, failure, total, invalid, out_of_range))
        asserts.assert_true(total > 0, "No RTT response received.")
        # Percentage of responses that are valid should be >= 90%.
        valid_total = float(total - invalid)
        valid_response_rate = valid_total / total
        self.log.info("%.2f%% of the responses are valid." % (
            valid_response_rate * 100))
        # These messages are not %-formatted, so use a single percent sign.
        asserts.assert_true(valid_response_rate >= 0.9,
            "Valid response rate is below 90%.")
        # Among the valid responses, the percentage of having an in-range RTT
        # value should be >= 67%. valid_total is non-zero here because the
        # 90% assertion above has already passed.
        valid_value_rate = (total - invalid - out_of_range) / valid_total
        self.log.info("%.2f%% of valid responses have in-range RTT value" % (
            valid_value_rate * 100))
        msg = "In-range response rate is below 67%."
        asserts.assert_true(valid_value_rate >= 0.67, msg)

    def scan_then_rtt_ranging_stress_logic(self, scan_func):
        """Test logic to scan then do rtt ranging based on the scan results.

        Steps:
            1. Start scan and get scan results.
            2. Filter out the networks that support rtt in scan results.
            3. Start rtt ranging against those networks that support rtt.
            4. Repeat
            5. Process RTT events.

        The outcome is enforced through the assertions in
        process_rtt_events; nothing is returned.

        Args:
            scan_func: A function that does a wifi scan and only returns the
                       networks that support rtt in the scan results.
        """
        total = self.stress_num
        failed = 0
        all_results = []
        for i in range(total):
            self.log.info("Iteration %d" % i)
            rtt_networks = scan_func()
            if not rtt_networks:
                self.log.warning("Found no rtt network, skip this iteration.")
                failed += 1
                continue
            self.log.debug("Found rtt networks:%s" % rtt_networks)
            rtt_params = [
                self.rtt_config_from_scan_result(rn) for rn in rtt_networks
            ]
            results = self.get_rtt_results(rtt_params)
            if results:
                self.log.debug(results)
                all_results += results
        if failed:
            self.log.warning(
                "%d of %d iterations found no rtt network." % (failed, total))
        self.process_rtt_events(all_results)

    def rtt_config_from_scan_result(self, scan_result):
        """Creates an Rtt configuration based on the scan result of a network.

        Args:
            scan_result: A dict representing one scanned WiFi network.

        Returns:
            A dict of RttParam keys describing how to range against the
            network. Raises KeyError if the scan result's channel width has
            no corresponding RttBW mapping.
        """
        scan_result_channel_width_to_rtt = {
            ScanResult.CHANNEL_WIDTH_20MHZ: RttBW.BW_20_SUPPORT,
            ScanResult.CHANNEL_WIDTH_40MHZ: RttBW.BW_40_SUPPORT,
            ScanResult.CHANNEL_WIDTH_80MHZ: RttBW.BW_80_SUPPORT,
            ScanResult.CHANNEL_WIDTH_160MHZ: RttBW.BW_160_SUPPORT,
            ScanResult.CHANNEL_WIDTH_80MHZ_PLUS_MHZ: RttBW.BW_160_SUPPORT
        }
        p = {}
        freq = scan_result[RttParam.frequency]
        p[RttParam.frequency] = freq
        p[RttParam.BSSID] = scan_result[WifiEnums.BSSID_KEY]
        # Frequencies above 5000 get the VHT preamble, others HT.
        if freq > 5000:
            p[RttParam.preamble] = RttPreamble.PREAMBLE_VHT
        else:
            p[RttParam.preamble] = RttPreamble.PREAMBLE_HT
        # Center frequencies are only copied over when they are positive.
        cf0 = scan_result[RttParam.center_freq0]
        if cf0 > 0:
            p[RttParam.center_freq0] = cf0
        cf1 = scan_result[RttParam.center_freq1]
        if cf1 > 0:
            p[RttParam.center_freq1] = cf1
        cw = scan_result["channelWidth"]
        p[RttParam.channel_width] = cw
        p[RttParam.bandwidth] = scan_result_channel_width_to_rtt[cw]
        if scan_result["is80211McRTTResponder"]:
            p[RttParam.request_type] = RttType.TYPE_TWO_SIDED
        else:
            p[RttParam.request_type] = RttType.TYPE_ONE_SIDED
        return p

    # Tests

    def test_invalid_params(self):
        """Tests the sanity check function in RttManager.

        Runs invalid_params_logic once for each invalid RttParam below.
        """
        param_list = [
            {RttParam.device_type: 3},
            {RttParam.device_type: 1, RttParam.request_type: 3},
            {
                RttParam.device_type: 1,
                RttParam.request_type: 1,
                RttParam.BSSID: None
            },
            {RttParam.BSSID: "xxxxxxxx", RttParam.number_burst: 1},
            {RttParam.number_burst: 0, RttParam.num_samples_per_burst: -1},
            {RttParam.num_samples_per_burst: 32},
            {
                RttParam.num_samples_per_burst: 5,
                RttParam.num_retries_per_measurement_frame: -1
            },
            {RttParam.num_retries_per_measurement_frame: 4},
            {
                RttParam.num_retries_per_measurement_frame: 2,
                RttParam.num_retries_per_FTMR: -1
            },
            {RttParam.num_retries_per_FTMR: 4}
        ]
        for param in param_list:
            self.invalid_params_logic(param)
        return True

    def test_support_check(self):
        """No device supports device-to-device RTT; only the models listed in
        the support_models user param support device-to-ap RTT.

        Aborts the whole test class when the device's model is not one of
        the supported models.
        """
        model = acts.utils.trim_model_name(self.dut.model)
        asserts.assert_true(
            not self.dut.droid.wifiIsDeviceToDeviceRttSupported(),
            "Device to device is not supposed to be supported.")
        if any(model in m for m in self.support_models):
            asserts.assert_true(self.dut.droid.wifiIsDeviceToApRttSupported(),
                "%s should support device-to-ap RTT." % model)
            self.log.info("%s supports device-to-ap RTT as expected." % model)
        else:
            asserts.assert_true(
                not self.dut.droid.wifiIsDeviceToApRttSupported(),
                "%s should not support device-to-ap RTT." % model)
            self.log.info(("%s does not support device-to-ap RTT as expected."
                ) % model)
            asserts.abort_class(
                "Device %s does not support RTT, abort." % model)
        return True

    def test_capability_check(self):
        """Checks the capabilities params are reported as expected.

        The expected values come from the per-model table built in
        setup_class; an unknown model fails the test.
        """
        caps = self.dut.droid.wifiRttGetCapabilities()
        asserts.assert_true(caps, "Unable to get rtt capabilities.")
        self.log.debug("Got rtt capabilities %s" % caps)
        model = acts.utils.trim_model_name(self.dut.model)
        asserts.assert_true(model in self.rtt_cap_table,
            "Unknown model %s" % model)
        expected_caps = self.rtt_cap_table[model]
        for k, v in expected_caps.items():
            asserts.assert_true(k in caps, "%s missing in capabilities." % k)
            asserts.assert_true(v == caps[k],
                "Expected %s for %s, got %s." % (v, k, caps[k]))
        return True

    def test_discovery(self):
        """Make sure all the expected 11mc BSSIDs are discovered properly, and
        they are all reported as 802.11mc Rtt Responder.

        Procedures:
            1. Scan for wifi networks.

        Expect:
            All the RTT networks show up in scan results and their
            "is80211McRTTResponder" is True.
            All the non-RTT networks show up in scan results and their
            "is80211McRTTResponder" is False.
        """
        wutils.start_wifi_connection_scan(self.dut)
        scan_results = self.dut.droid.wifiGetScanResults()
        self.log.debug(scan_results)
        # The expected networks live on the instance (set in setup_class).
        for n in self.visible_networks:
            asserts.assert_true(wutils.match_networks(n, scan_results),
                "Network %s was not discovered properly." % n)
        return True

    def test_missing_bssid(self):
        """Start Rtt ranging with a config that does not have BSSID set.
        Should not get onSuccess.

        NOTE(review): the code only asserts that some event was received; it
        does not check the event type -- confirm whether an explicit
        "not onSuccess" check is intended.
        """
        p = {}
        p[RttParam.request_type] = RttType.TYPE_TWO_SIDED
        p[RttParam.device_type] = RttPeerType.PEER_TYPE_AP
        p[RttParam.preamble] = RttPreamble.PREAMBLE_VHT
        p[RttParam.bandwidth] = RttBW.BW_80_SUPPORT
        p[RttParam.frequency] = self.vht80_5g[WifiEnums.frequency_key]
        p[RttParam.center_freq0] = self.vht80_5g[RttParam.center_freq0]
        results = self.get_rtt_results([p])
        asserts.assert_true(results, "Did not get any result.")
        self.log.info(pprint.pformat(results))

    def test_rtt_ranging_single_AP_stress(self):
        """Stress test for Rtt against one AP.

        Steps:
            1. Do RTT ranging against the self.vht80_5g BSSID.
            2. Repeat self.stress_num times.
            3. Verify RTT results.
        """
        p = {}
        p[RttParam.request_type] = RttType.TYPE_TWO_SIDED
        p[RttParam.device_type] = RttPeerType.PEER_TYPE_AP
        p[RttParam.preamble] = RttPreamble.PREAMBLE_VHT
        p[RttParam.bandwidth] = RttBW.BW_80_SUPPORT
        p[RttParam.BSSID] = self.vht80_5g[WifiEnums.BSSID_KEY]
        p[RttParam.frequency] = self.vht80_5g[WifiEnums.frequency_key]
        p[RttParam.center_freq0] = self.vht80_5g[RttParam.center_freq0]
        p[RttParam.channel_width] = ScanResult.CHANNEL_WIDTH_80MHZ
        all_results = []
        for i in range(self.stress_num):
            iteration = i + 1
            self.log.info("RTT Ranging iteration %d" % iteration)
            results = self.get_rtt_results([p])
            if results:
                all_results += results
            else:
                self.log.warning(
                    "Did not get result for iteration %d." % iteration)
        self.process_rtt_events(all_results)

    def test_regular_scan_then_rtt_ranging_stress(self):
        """Stress test for regular scan then start rtt ranging against the RTT
        compatible networks found by the scan.

        Steps:
            1. Start a WiFi connection scan.
            2. Get scan results.
            3. Find all the 11mc capable BSSIDs and choose the ones to use
               (self.network_selector)
            4. Do RTT ranging against the selected BSSIDs, with the info from
               the scan results.
            5. Repeat self.stress_num times.
            6. Verify RTT results.
        """
        scan_func = self.regular_scan_for_rtt_networks
        self.scan_then_rtt_ranging_stress_logic(scan_func)

    def test_gscan_then_rtt_ranging_stress(self):
        """Stress test for gscan then start rtt ranging against the RTT
        compatible networks found by the scan.

        Steps:
            1. Start a WifiScanner single shot scan on all channels.
            2. Wait for full scan results of the expected 11mc capable BSSIDs.
            3. Wait for single shot scan to finish on all channels.
            4. Do RTT ranging against the selected BSSIDs, with the info from
               the scan results.
            5. Repeat self.stress_num times.
            6. Verify RTT results.
        """
        scan_func = self.gscan_for_rtt_networks
        self.scan_then_rtt_ranging_stress_logic(scan_func)