Home | History | Annotate | Download | only in wifi
      1 #!/usr/bin/env python3.4
      2 #
      3 # Copyright (C) 2016 The Android Open Source Project
      4 #
      5 # Licensed under the Apache License, Version 2.0 (the "License"); you may not
      6 # use this file except in compliance with the License. You may obtain a copy of
      7 # the License at
      8 #
      9 # http://www.apache.org/licenses/LICENSE-2.0
     10 #
     11 # Unless required by applicable law or agreed to in writing, software
     12 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     13 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
     14 # License for the specific language governing permissions and limitations under
     15 # the License.
     16 
     17 import pprint
     18 import queue
     19 
     20 import acts.base_test
     21 import acts.test_utils.wifi.wifi_test_utils as wutils
     22 import acts.utils
     23 from acts import asserts
     24 from acts.controllers.android import SL4AAPIError
     25 
     26 WifiEnums = wutils.WifiEnums
     27 
     28 # Macros for RttParam keywords
     29 RttParam = WifiEnums.RttParam
     30 # Macros for RttManager
     31 Rtt = WifiEnums.Rtt
     32 RttBW = WifiEnums.RttBW
     33 RttPreamble = WifiEnums.RttPreamble
     34 RttPeerType = WifiEnums.RttPeerType
     35 RttType = WifiEnums.RttType
     36 
     37 ScanResult = WifiEnums.ScanResult
     38 RTT_MARGIN_OF_ERROR = WifiEnums.RTT_MARGIN_OF_ERROR
     39 
     40 class WifiRTTRangingError (Exception):
     41      """Error in WifiScanner Rtt."""
     42 
     43 class WifiRttManagerTest(acts.base_test.BaseTestClass):
     44     """Tests for wifi's RttManager APIs."""
     45     tests = None
     46     MAX_RTT_AP = 10
     47 
     48     def __init__(self, controllers):
     49         acts.base_test.BaseTestClass.__init__(self, controllers)
     50         self.tests = (
     51             "test_support_check",
     52             "test_invalid_params",
     53             "test_capability_check",
     54             "test_rtt_ranging_single_AP_stress",
     55             "test_regular_scan_then_rtt_ranging_stress",
     56             "test_gscan_then_rtt_ranging_stress"
     57         )
     58 
     59     def setup_class(self):
     60         self.dut = self.android_devices[0]
     61         wutils.wifi_test_device_init(self.dut)
     62         required_params = (
     63             "support_models",
     64             "stress_num",
     65             "vht80_5g",
     66             "actual_distance"
     67         )
     68         self.unpack_userparams(required_params)
     69         asserts.assert_true(self.actual_distance >= 5,
     70             "Actual distance should be no shorter than 5 meters.")
     71         self.visible_networks = (
     72             self.vht80_5g,
     73         )
     74         self.default_rtt_params = {
     75             RttParam.request_type: RttType.TYPE_TWO_SIDED,
     76             RttParam.device_type: RttPeerType.PEER_TYPE_AP,
     77             RttParam.preamble: RttPreamble.PREAMBLE_HT,
     78             RttParam.bandwidth: RttBW.BW_80_SUPPORT
     79         }
     80         # Expected capability for devices that don't support RTT.
     81         rtt_cap_neg = {
     82             'lcrSupported': False,
     83             'bwSupported': 0,
     84             'twoSided11McRttSupported': False,
     85             'preambleSupported': 0,
     86             'oneSidedRttSupported': False,
     87             'lciSupported': False
     88         }
     89         rtt_cap_shamu = {
     90             'lcrSupported': False,
     91             'bwSupported': 0x1C,
     92             'twoSided11McRttSupported': True,
     93             'preambleSupported': 6,
     94             'oneSidedRttSupported': False,
     95             'lciSupported': False
     96         }
     97         rtt_cap_bullhead = {
     98             'lcrSupported': True,
     99             'bwSupported': 0x1C,
    100             'twoSided11McRttSupported': True,
    101             'preambleSupported': 7,
    102             'oneSidedRttSupported': True,
    103             'lciSupported': True
    104         }
    105         rtt_cap_angler = {
    106             'lcrSupported': True,
    107             'bwSupported': 0x1C,
    108             'twoSided11McRttSupported': True,
    109             'preambleSupported': 6,
    110             'oneSidedRttSupported': False,
    111             'lciSupported': True
    112         }
    113         self.rtt_cap_table = {
    114             "hammerhead": rtt_cap_neg,
    115             "shamu": rtt_cap_shamu,
    116             "volantis": rtt_cap_neg,
    117             "volantisg": rtt_cap_neg,
    118             "bullhead": rtt_cap_bullhead,
    119             "angler": rtt_cap_angler
    120         }
    121 
    122     """Helper Functions"""
    def invalid_params_logic(self, rtt_params):
        """Verifies that ranging with an invalid RttParam is rejected.

        Starts ranging with the given param and expects the call to raise an
        SL4AAPIError whose text mentions IllegalArgumentException. The check
        fails if the error text lacks that exception name, or if the call
        raises nothing at all.

        Args:
            rtt_params: A dict representing a single (invalid) RttParam.
        """
        try:
            self.dut.droid.wifiRttStartRanging([rtt_params])
        except SL4AAPIError as e:
            e_str = str(e)
            asserts.assert_true("IllegalArgumentException" in e_str,
                "Missing IllegalArgumentException in %s." % e_str)
            msg = "Got expected exception with invalid param %s." % rtt_params
            self.log.info(msg)
        else:
            # Without this branch an accepted invalid param would let the
            # test pass without having verified anything.
            asserts.fail(
                "No exception raised for invalid param %s." % rtt_params)
    132 
    def get_rtt_results(self, rtt_params):
        """Starts RTT ranging and collects the resulting events.

        When the first event is an onSuccess callback, every result in it is
        annotated with the acceptable margin of error for the bandwidth of
        the matching request param.

        Args:
            rtt_params: A list of dicts each representing an RttParam.

        Returns:
            The popped ranging events, or None if waiting for them timed out.
        """
        self.log.debug("Start ranging with:\n%s" % pprint.pformat(rtt_params))
        ranging_id = self.dut.droid.wifiRttStartRanging(rtt_params)
        try:
            events = self.dut.ed.pop_events(
                "WifiRttRanging%d" % ranging_id, 30)
        except queue.Empty:
            self.log.error("Waiting for RTT event timed out.")
            return None
        first_event = events[0]
        if first_event["name"].endswith("onSuccess"):
            results = first_event["data"]["Results"]
            expected_cnt = len(rtt_params)
            actual_cnt = len(results)
            asserts.assert_true(actual_cnt == expected_cnt,
                "Expected %d results, got %d." % (expected_cnt, actual_cnt))
            # Add acceptable margin of error to results, which will be used
            # during result processing. Results and params pair up by
            # position; their lengths were just asserted to be equal.
            for result, param in zip(results, rtt_params):
                bandwidth = param[RttParam.bandwidth]
                result[RttParam.margin] = RTT_MARGIN_OF_ERROR[bandwidth]
        self.log.debug(pprint.pformat(events))
        return events
    163 
    def network_selector(self, network_info):
        """Decides whether a network should be ranged against.

        A network qualifies when both hold:
        1. It reports itself as an 80211mc RTT responder.
        2. Its BSSID equals the BSSID of self.vht80_5g.

        This gives control over which networks are used for ranging instead
        of blindly using every 80211mc network in the air.

        Note: string values that get compared are lower-cased in place, so
        network_info may be modified by this call.

        Args:
            network_info: A dict representing a WiFi network.

        Returns:
            True if the input network should be used for ranging, False
            otherwise.
        """
        wanted = {
            "is80211McRTTResponder": True,
            WifiEnums.BSSID_KEY: self.vht80_5g[WifiEnums.BSSID_KEY],
        }
        for key, expected in wanted.items():
            if key not in network_info:
                return False
            if type(network_info[key]) is str:
                # Compare strings case-insensitively; the lower-cased value
                # is written back into the caller's dict.
                network_info[key] = network_info[key].lower()
                expected = expected.lower()
            if network_info[key] != expected:
                return False
        return True
    194 
    def regular_scan_for_rtt_networks(self):
        """Finds networks to range against using a regular wifi scan.

        Triggers a wifi connection scan on the DUT, fetches the scan results
        and keeps the networks accepted by self.network_selector.

        Returns:
            A list of the scan results accepted by self.network_selector.
        """
        wutils.start_wifi_connection_scan(self.dut)
        scan_results = self.dut.droid.wifiGetScanResults()
        return [nw for nw in scan_results if self.network_selector(nw)]
    210 
    def gscan_for_rtt_networks(self):
        """Scans for 11mc-capable WiFi networks using wifi gscan.

        Starts a single scan on the DUT, waits for one full-result event
        accepted by self.network_selector per entry in self.visible_networks,
        then waits for the overall scan results event.

        Returns:
            A list of networks accepted by self.network_selector, or an empty
            list if waiting for any scan event timed out.
        """
        scan_settings = {
            "reportEvents" : WifiEnums.REPORT_EVENT_FULL_SCAN_RESULT,
            "band": WifiEnums.WIFI_BAND_BOTH,
            "periodInMs": 10000,
            "numBssidsPerScan": 32
        }
        idx = wutils.start_wifi_single_scan(self.dut, scan_settings)["Index"]
        self.log.info("Scan index is %d" % idx)
        event_name = "WifiScannerScan%donFullResult" % idx
        def condition(event):
            # Only the first result carried by the event is inspected.
            nw = event["data"]["Results"][0]
            return self.network_selector(nw)
        rtt_networks = []
        try:
            # One matching full-result event is expected per visible network.
            for _ in self.visible_networks:
                event = self.dut.ed.wait_for_event(event_name, condition, 30)
                rtt_networks.append(event["data"]["Results"][0])
            self.log.info("Waiting for gscan to finish.")
            event_name = "WifiScannerScan%donResults" % idx
            event = self.dut.ed.pop_event(event_name, 300)
            total_network_cnt = len(event["data"]["Results"][0]["ScanResults"])
            self.log.info("Found %d networks in total." % total_network_cnt)
            self.log.debug(rtt_networks)
            return rtt_networks
        except queue.Empty:
            self.log.error("Timed out waiting for gscan result.")
            # Keep the return type consistent with the success path; callers
            # treat an empty result as "no rtt network found".
            return []
    245 
    def process_rtt_events(self, events):
        """Processes rtt ranging events.

        Counts aborted and failed callbacks, then validates every response in
        the successful callbacks:
        - The status must be Rtt.STATUS_SUCCESS and the RTT value positive;
          otherwise the response is counted as invalid.
        - If the reported distance is positive, only the distance is checked
          against self.actual_distance within the response's margin.
        - Otherwise the distance derived from the RTT value is checked the
          same way, followed by the RSSI range.

        Enforces that at least one response was received, that at least 90%
        of the responses are valid, and that at least 67% of the valid
        responses are in range.

        Args:
            events: A list of callback results from RTT ranging.
        """
        total = aborted = failure = invalid = out_of_range = 0
        for e in events:
            name = e["name"]
            if name.endswith("onAborted"):
                aborted += 1
            elif name.endswith("onFailure"):
                failure += 1
            elif name.endswith("onSuccess"):
                results = e["data"]["Results"]
                for r in results:
                    total += 1
                    # Status needs to be "success".
                    status = r["status"]
                    if status != Rtt.STATUS_SUCCESS:
                        self.log.warning("Got error status %d." % status)
                        invalid += 1
                        continue
                    # RTT value should be positive.
                    value = r["rtt"]
                    if value <= 0:
                        self.log.warning("Got error RTT value %d." % value)
                        invalid += 1
                        continue
                    # Validate values in successful responses.
                    acd = self.actual_distance
                    margin = r[RttParam.margin]
                    # If the distance is > 0, check distance only.
                    # NOTE(review): the division by 100 presumably converts
                    # centimeters to meters - confirm against the RTT API.
                    d = r["distance"] / 100.0
                    if d > 0:
                        # Distance should be in acceptable range.
                        is_d_valid = (acd - margin) <= d <= (acd + margin)
                        if not is_d_valid:
                            self.log.warning(("Reported distance %.2fm is out of the"
                                " acceptable range %.2f+/-%.2fm.") % (
                                d, acd, margin))
                            out_of_range += 1
                        continue
                    # Check if the RTT value is in range.
                    # NOTE(review): the 1E10 divisor implies the RTT value is
                    # in units of 1e-10 s - confirm against the RTT API.
                    d = (value / 2) / 1E10 * wutils.SPEED_OF_LIGHT
                    is_rtt_valid = (acd - margin) <= d <= (acd + margin)
                    if not is_rtt_valid:
                        self.log.warning(
                           ("Distance calculated from RTT value %d - %.2fm is "
                            "out of the acceptable range %.2f+/-%.2fm.") % (
                            value, d, acd, margin))
                        out_of_range += 1
                        continue
                    # Check if the RSSI value is in range.
                    rssi = r["rssi"]
                    # average rssi in 0.5dB steps, e.g. 143 implies -71.5dB,
                    # so the valid range is 0 to 200
                    is_rssi_valid = 0 <= rssi <= 200
                    if not is_rssi_valid:
                        self.log.warning(("Reported RSSI %d is out of the"
                                " acceptable range 0-200") % rssi)
                        out_of_range += 1
        self.log.info(("Processed %d RTT events. %d aborted, %s failed. Among"
            " the %d responses in successful callbacks, %s are invalid, %s has"
            " RTT values that are out of range.") % (
            len(events), aborted, failure, total, invalid, out_of_range))
        asserts.assert_true(total > 0, "No RTT response received.")
        # Percentage of responses that are valid should be >= 90%.
        valid_total = float(total - invalid)
        valid_response_rate = valid_total / total
        self.log.info("%.2f%% of the responses are valid." % (
                      valid_response_rate * 100))
        # The messages below are not %-formatted, so a single "%" is correct.
        asserts.assert_true(valid_response_rate >= 0.9,
                         "Valid response rate is below 90%.")
        # Among the valid responses, the percentage of having an in-range RTT
        # value should be >= 67%. valid_total is non-zero here because the
        # rate check above rejects the case of zero valid responses.
        valid_value_rate = (total - invalid - out_of_range) / valid_total
        self.log.info("%.2f%% of valid responses have in-range RTT value" % (
                      valid_value_rate * 100))
        msg = "In-range response rate is below 67%."
        asserts.assert_true(valid_value_rate >= 0.67, msg)
    330 
    def scan_then_rtt_ranging_stress_logic(self, scan_func):
        """Test logic to scan then do rtt ranging based on the scan results.

        Steps:
        1. Start scan and get scan results.
        2. Filter out the networks that support rtt in scan results.
        3. Start rtt ranging against those networks that support rtt.
        4. Repeat self.stress_num times.
        5. Process RTT events.

        Iterations in which the scan yields no network are skipped and
        counted; the count is logged before the collected events are
        processed. Nothing is returned; failures surface through the
        assertions in self.process_rtt_events.

        Args:
            scan_func: A function that does a wifi scan and only returns the
                networks that support rtt in the scan results.
        """
        total = self.stress_num
        skipped = 0
        all_results = []
        for i in range(total):
            self.log.info("Iteration %d" % i)
            rtt_networks = scan_func()
            if not rtt_networks:
                self.log.warning("Found no rtt network, skip this iteration.")
                skipped += 1
                continue
            self.log.debug("Found rtt networks:%s" % rtt_networks)
            rtt_params = [
                self.rtt_config_from_scan_result(rn) for rn in rtt_networks
            ]
            results = self.get_rtt_results(rtt_params)
            if results:
                self.log.debug(results)
                all_results += results
        if skipped:
            self.log.warning("%d of %d iterations found no rtt network." % (
                skipped, total))
        self.process_rtt_events(all_results)
    367 
    def rtt_config_from_scan_result(self, scan_result):
        """Creates an Rtt configuration based on the scan result of a network.

        Args:
            scan_result: A dict holding the scan result of one network.

        Returns:
            A dict of RttParam entries: frequency, BSSID, preamble (VHT above
            5000 frequency, HT otherwise), positive center frequencies,
            channel width, the matching bandwidth and the request type
            (two-sided for 80211mc responders, one-sided otherwise).
        """
        bw_for_channel_width = {
            ScanResult.CHANNEL_WIDTH_20MHZ: RttBW.BW_20_SUPPORT,
            ScanResult.CHANNEL_WIDTH_40MHZ: RttBW.BW_40_SUPPORT,
            ScanResult.CHANNEL_WIDTH_80MHZ: RttBW.BW_80_SUPPORT,
            ScanResult.CHANNEL_WIDTH_160MHZ: RttBW.BW_160_SUPPORT,
            ScanResult.CHANNEL_WIDTH_80MHZ_PLUS_MHZ: RttBW.BW_160_SUPPORT
        }
        freq = scan_result[RttParam.frequency]
        config = {
            RttParam.frequency: freq,
            RttParam.BSSID: scan_result[WifiEnums.BSSID_KEY],
            RttParam.preamble: (RttPreamble.PREAMBLE_VHT if freq > 5000
                                else RttPreamble.PREAMBLE_HT),
        }
        # Center frequencies are only copied over when they are positive.
        for center_key in (RttParam.center_freq0, RttParam.center_freq1):
            center_freq = scan_result[center_key]
            if center_freq > 0:
                config[center_key] = center_freq
        width = scan_result["channelWidth"]
        config[RttParam.channel_width] = width
        config[RttParam.bandwidth] = bw_for_channel_width[width]
        is_responder = scan_result["is80211McRTTResponder"]
        config[RttParam.request_type] = (
            RttType.TYPE_TWO_SIDED if is_responder
            else RttType.TYPE_ONE_SIDED)
        return config
    400 
    401     """Tests"""
    def test_invalid_params(self):
        """Tests the sanity check function in RttManager.

        Feeds a series of params to self.invalid_params_logic, one at a time.

        Returns:
            True once every param has been checked.
        """
        invalid_param_sets = (
            {RttParam.device_type: 3},
            {RttParam.device_type: 1, RttParam.request_type: 3},
            {
                RttParam.device_type: 1,
                RttParam.request_type: 1,
                RttParam.BSSID: None,
            },
            {RttParam.BSSID: "xxxxxxxx", RttParam.number_burst: 1},
            {RttParam.number_burst: 0, RttParam.num_samples_per_burst: -1},
            {RttParam.num_samples_per_burst: 32},
            {
                RttParam.num_samples_per_burst: 5,
                RttParam.num_retries_per_measurement_frame: -1,
            },
            {RttParam.num_retries_per_measurement_frame: 4},
            {
                RttParam.num_retries_per_measurement_frame: 2,
                RttParam.num_retries_per_FTMR: -1,
            },
            {RttParam.num_retries_per_FTMR: 4},
        )
        for invalid_params in invalid_param_sets:
            self.invalid_params_logic(invalid_params)
        return True
    430 
    def test_support_check(self):
        """Checks the RTT support flags reported by the device.

        Device-to-device RTT must never be reported as supported.
        Device-to-ap RTT must be reported as supported exactly when the
        trimmed model name is contained in one of self.support_models; for
        other models the rest of the class is aborted.

        Returns:
            True if the checks pass.
        """
        model = acts.utils.trim_model_name(self.dut.model)
        d2d_supported = self.dut.droid.wifiIsDeviceToDeviceRttSupported()
        asserts.assert_true(not d2d_supported,
            "Device to device is not supposed to be supported.")
        expect_d2ap = any(model in m for m in self.support_models)
        if not expect_d2ap:
            d2ap_supported = self.dut.droid.wifiIsDeviceToApRttSupported()
            asserts.assert_true(not d2ap_supported,
                "%s should not support device-to-ap RTT." % model)
            self.log.info(("%s does not support device-to-ap RTT as expected."
                ) % model)
            asserts.abort_class("Device %s does not support RTT, abort." % model)
            return True
        asserts.assert_true(self.dut.droid.wifiIsDeviceToApRttSupported(),
            "%s should support device-to-ap RTT." % model)
        self.log.info("%s supports device-to-ap RTT as expected." % model)
        return True
    449 
    def test_capability_check(self):
        """Checks the capabilities params are reported as expected.

        Compares the capabilities reported by the device with the entry for
        its trimmed model name in self.rtt_cap_table.

        Returns:
            True if every expected capability is present with the expected
            value.
        """
        reported = self.dut.droid.wifiRttGetCapabilities()
        asserts.assert_true(reported, "Unable to get rtt capabilities.")
        self.log.debug("Got rtt capabilities %s" % reported)
        model = acts.utils.trim_model_name(self.dut.model)
        is_known_model = model in self.rtt_cap_table
        asserts.assert_true(is_known_model, "Unknown model %s" % model)
        for cap_name, expected in self.rtt_cap_table[model].items():
            asserts.assert_true(cap_name in reported,
                "%s missing in capabilities." % cap_name)
            actual = reported[cap_name]
            asserts.assert_true(expected == actual,
                "Expected %s for %s, got %s." % (expected, cap_name, actual))
        return True
    464 
    def test_discovery(self):
        """Make sure all the expected networks are discovered properly.

        Procedures:
            1. Scan for wifi networks.
            2. Match every network in self.visible_networks against the
               scan results with wutils.match_networks.

        Expect:
            Every network in self.visible_networks is matched in the scan
            results.

        Returns:
            True if all the expected networks were discovered.
        """
        wutils.start_wifi_connection_scan(self.dut)
        scan_results = self.dut.droid.wifiGetScanResults()
        self.log.debug(scan_results)
        # The expected networks live on the instance; a bare
        # `visible_networks` is an undefined name and raises NameError.
        for n in self.visible_networks:
            asserts.assert_true(wutils.match_networks(n, scan_results),
                "Network %s was not discovered properly." % n)
        return True
    485 
    def test_missing_bssid(self):
        """Start Rtt ranging with a config that does not have BSSID set.

        Expect:
            Ranging events are received, and none of them is an onSuccess
            callback.
        """
        p = {
            RttParam.request_type: RttType.TYPE_TWO_SIDED,
            RttParam.device_type: RttPeerType.PEER_TYPE_AP,
            RttParam.preamble: RttPreamble.PREAMBLE_VHT,
            RttParam.bandwidth: RttBW.BW_80_SUPPORT,
            RttParam.frequency: self.vht80_5g[WifiEnums.frequency_key],
            RttParam.center_freq0: self.vht80_5g[RttParam.center_freq0],
        }
        results = self.get_rtt_results([p])
        asserts.assert_true(results, "Did not get any result.")
        self.log.info(pprint.pformat(results))
        # Enforce the documented expectation: ranging without a BSSID must
        # not be reported as successful.
        for event in results:
            asserts.assert_true(not event["name"].endswith("onSuccess"),
                "Got onSuccess for ranging without a BSSID.")
    500 
    def test_rtt_ranging_single_AP_stress(self):
        """Stress test for Rtt against one AP.

        Steps:
            1. Do RTT ranging against the self.vht80_5g BSSID.
            2. Repeat self.stress_num times.
            3. Verify RTT results.
        """
        p = {
            RttParam.request_type: RttType.TYPE_TWO_SIDED,
            RttParam.device_type: RttPeerType.PEER_TYPE_AP,
            RttParam.preamble: RttPreamble.PREAMBLE_VHT,
            RttParam.bandwidth: RttBW.BW_80_SUPPORT,
            RttParam.BSSID: self.vht80_5g[WifiEnums.BSSID_KEY],
            RttParam.frequency: self.vht80_5g[WifiEnums.frequency_key],
            RttParam.center_freq0: self.vht80_5g[RttParam.center_freq0],
            RttParam.channel_width: ScanResult.CHANNEL_WIDTH_80MHZ,
        }
        all_results = []
        # Iterations are numbered from 1 in every log message so the info
        # and warning lines of the same iteration agree.
        for iteration in range(1, self.stress_num + 1):
            self.log.info("RTT Ranging iteration %d" % iteration)
            results = self.get_rtt_results([p])
            if results:
                all_results += results
            else:
                self.log.warning(
                    "Did not get result for iteration %d." % iteration)
        # Validation outcome is enforced through assertions; nothing useful
        # is returned by process_rtt_events.
        self.process_rtt_events(all_results)
    527 
    def test_regular_scan_then_rtt_ranging_stress(self):
        """Stress test: regular scan followed by rtt ranging.

        Ranging is done against the RTT compatible networks found by each
        scan.

        Steps:
            1. Start a WiFi connection scan.
            2. Get scan results.
            3. Find all the 11mc capable BSSIDs and choose the ones to use
               (self.network_selector)
            4. Do RTT ranging against the selected BSSIDs, with the info from
               the scan results.
            5. Repeat self.stress_num times.
            6. Verify RTT results.
        """
        self.scan_then_rtt_ranging_stress_logic(
            self.regular_scan_for_rtt_networks)
    544 
    def test_gscan_then_rtt_ranging_stress(self):
        """Stress test: gscan followed by rtt ranging.

        Ranging is done against the RTT compatible networks found by each
        scan.

        Steps:
            1. Start a WifiScanner single shot scan on all channels.
            2. Wait for full scan results of the expected 11mc capable BSSIDs.
            3. Wait for single shot scan to finish on all channels.
            4. Do RTT ranging against the selected BSSIDs, with the info from
               the scan results.
            5. Repeat self.stress_num times.
            6. Verify RTT results.
        """
        self.scan_then_rtt_ranging_stress_logic(self.gscan_for_rtt_networks)
    560