#!/usr/bin/python3.4
#
#   Copyright 2017 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import queue
import time

from acts import asserts
from acts.test_decorators import test_tracker_info
from acts.test_utils.wifi.aware import aware_const as aconsts
from acts.test_utils.wifi.aware import aware_test_utils as autils
from acts.test_utils.wifi.aware.AwareBaseTest import AwareBaseTest
from acts.test_utils.wifi.rtt import rtt_const as rconsts
from acts.test_utils.wifi.rtt import rtt_test_utils as rutils
from acts.test_utils.wifi.rtt.RttBaseTest import RttBaseTest


class RangeAwareTest(AwareBaseTest, RttBaseTest):
    """Test class for RTT ranging to Wi-Fi Aware peers"""
    SERVICE_NAME = "GoogleTestServiceXY"

    # Number of RTT iterations
    NUM_ITER = 10

    # Time gap (in seconds) between iterations
    TIME_BETWEEN_ITERATIONS = 0

    # Time gap (in seconds) when switching between Initiator and Responder
    TIME_BETWEEN_ROLES = 4

    def __init__(self, controllers):
        # Both parents are initialized explicitly (rather than via super())
        # because of the multiple inheritance.
        AwareBaseTest.__init__(self, controllers)
        RttBaseTest.__init__(self, controllers)

    def setup_test(self):
        """Manual setup here due to multiple inheritance: explicitly execute
        the setup method from both parents."""
        AwareBaseTest.setup_test(self)
        RttBaseTest.setup_test(self)

    def teardown_test(self):
        """Manual teardown here due to multiple inheritance: explicitly
        execute the teardown method from both parents."""
        AwareBaseTest.teardown_test(self)
        RttBaseTest.teardown_test(self)

    ###########################################################################

    def run_rtt_discovery(self, init_dut, resp_mac=None, resp_peer_id=None):
        """Perform single RTT measurement, using Aware, from the Initiator DUT
        to a Responder. The RTT Responder can be specified using its MAC
        address (obtained using out-of-band discovery) or its Peer ID (using
        Aware discovery). If both are provided the MAC address is used.

        Args:
          init_dut: RTT Initiator device
          resp_mac: MAC address of the RTT Responder device
          resp_peer_id: Peer ID of the RTT Responder device

        Returns: the first entry of the ranging results carried by the result
        event, or None if no result event arrived before the timeout.
        """
        asserts.assert_true(
            resp_mac is not None or resp_peer_id is not None,
            "One of the Responder specifications (MAC or Peer ID)"
            " must be provided!")
        if resp_mac is not None:
            ranging_id = init_dut.droid.wifiRttStartRangingToAwarePeerMac(
                resp_mac)
        else:
            ranging_id = init_dut.droid.wifiRttStartRangingToAwarePeerId(
                resp_peer_id)

        # Only the event wait is expected to time out: keep the try minimal.
        try:
            event = init_dut.ed.pop_event(
                rutils.decorate_event(rconsts.EVENT_CB_RANGING_ON_RESULT,
                                      ranging_id),
                rutils.EVENT_TIMEOUT)
        except queue.Empty:
            return None

        result = event["data"][rconsts.EVENT_CB_RANGING_KEY_RESULTS][0]
        if resp_mac is not None:
            rutils.validate_aware_mac_result(result, resp_mac, "DUT")
        else:
            rutils.validate_aware_peer_id_result(result, resp_peer_id, "DUT")
        return result

    def run_rtt_ib_discovery_set(self, do_both_directions, iter_count,
                                 time_between_iterations, time_between_roles):
        """Perform a set of RTT measurements, using in-band (Aware) discovery.

        Args:
          do_both_directions: False - perform all measurements in one
                              direction, True - perform 2 measurements per
                              iteration: one in each direction.
          iter_count: Number of measurements to perform.
          time_between_iterations: Number of seconds to wait between
                                   iterations.
          time_between_roles: Number of seconds to wait when switching between
                              Initiator and Responder roles (only matters if
                              do_both_directions=True).

        Returns: a list of the events containing the RTT results (or None for
        a failed measurement). If both directions are tested then returns a
        list of 2 elements: one set for each direction.
        """
        p_dut = self.android_devices[0]
        s_dut = self.android_devices[1]

        # Only the peer IDs are needed for ranging; the session/discovery IDs
        # returned alongside them are not used here.
        (_p_id, _s_id, _p_disc_id, _s_disc_id, peer_id_on_sub,
         peer_id_on_pub) = autils.create_discovery_pair(
             p_dut,
             s_dut,
             p_config=autils.add_ranging_to_pub(
                 autils.create_discovery_config(
                     self.SERVICE_NAME, aconsts.PUBLISH_TYPE_UNSOLICITED),
                 True),
             s_config=autils.add_ranging_to_pub(
                 autils.create_discovery_config(
                     self.SERVICE_NAME, aconsts.SUBSCRIBE_TYPE_PASSIVE),
                 True),
             device_startup_offset=self.device_startup_offset,
             msg_id=self.get_next_msg_id())

        results_ps = []
        results_sp = []
        for i in range(iter_count):
            if i != 0 and time_between_iterations != 0:
                time.sleep(time_between_iterations)

            # perform RTT from pub -> sub
            results_ps.append(
                self.run_rtt_discovery(p_dut, resp_peer_id=peer_id_on_pub))

            if do_both_directions:
                if time_between_roles != 0:
                    time.sleep(time_between_roles)

                # perform RTT from sub -> pub
                results_sp.append(
                    self.run_rtt_discovery(s_dut,
                                           resp_peer_id=peer_id_on_sub))

        if do_both_directions:
            return [results_ps, results_sp]
        return results_ps

    def run_rtt_oob_discovery_set(self, do_both_directions, iter_count,
                                  time_between_iterations,
                                  time_between_roles):
        """Perform a set of RTT measurements, using out-of-band discovery.

        Ranging is always enabled on the publisher(s) started on the
        Responder(s).

        Args:
          do_both_directions: False - perform all measurements in one
                              direction, True - perform 2 measurements per
                              iteration: one in each direction.
          iter_count: Number of measurements to perform.
          time_between_iterations: Number of seconds to wait between
                                   iterations.
          time_between_roles: Number of seconds to wait when switching between
                              Initiator and Responder roles (only matters if
                              do_both_directions=True).

        Returns: a list of the events containing the RTT results (or None for
        a failed measurement). If both directions are tested then returns a
        list of 2 elements: one set for each direction.
        """
        dut0 = self.android_devices[0]
        dut1 = self.android_devices[1]

        id0, mac0 = autils.attach_with_identity(dut0)
        id1, mac1 = autils.attach_with_identity(dut1)

        # wait for devices to synchronize with each other - there are no other
        # mechanisms to make sure this happens for OOB discovery (except
        # retrying to execute the data-path request)
        time.sleep(autils.WAIT_FOR_CLUSTER)

        # start publisher(s) on the Responder(s) with ranging enabled
        p_config = self._create_ranging_publish_config()
        dut1.droid.wifiAwarePublish(id1, p_config)
        autils.wait_for_event(dut1, aconsts.SESSION_CB_ON_PUBLISH_STARTED)
        if do_both_directions:
            dut0.droid.wifiAwarePublish(id0, p_config)
            autils.wait_for_event(dut0, aconsts.SESSION_CB_ON_PUBLISH_STARTED)

        results01 = []
        results10 = []
        for i in range(iter_count):
            if i != 0 and time_between_iterations != 0:
                time.sleep(time_between_iterations)

            # perform RTT from dut0 -> dut1
            results01.append(self.run_rtt_discovery(dut0, resp_mac=mac1))

            if do_both_directions:
                if time_between_roles != 0:
                    time.sleep(time_between_roles)

                # perform RTT from dut1 -> dut0
                results10.append(self.run_rtt_discovery(dut1, resp_mac=mac0))

        if do_both_directions:
            return [results01, results10]
        return results01

    def verify_results(self, results, results_reverse_direction=None):
        """Verifies the results of the RTT experiment.

        Each direction is checked against its own statistics. On success the
        test is marked as passed via asserts.explicit_pass.

        Args:
          results: List of RTT results.
          results_reverse_direction: List of RTT results executed in the
                                     reverse direction. Optional.
        """
        stats = rutils.extract_stats(results, self.rtt_reference_distance_mm,
                                     self.rtt_reference_distance_margin_mm,
                                     self.rtt_min_expected_rssi_dbm)
        stats_reverse_direction = None
        if results_reverse_direction is not None:
            stats_reverse_direction = rutils.extract_stats(
                results_reverse_direction, self.rtt_reference_distance_mm,
                self.rtt_reference_distance_margin_mm,
                self.rtt_min_expected_rssi_dbm)
        self.log.debug("Stats: %s", stats)
        if stats_reverse_direction is not None:
            self.log.debug("Stats in reverse direction: %s",
                           stats_reverse_direction)

        if stats_reverse_direction is None:
            extras = stats
        else:
            extras = {"forward": stats, "reverse": stats_reverse_direction}

        self._assert_direction_stats(stats, extras,
                                     check_attempt_counts=True)
        if stats_reverse_direction is not None:
            # The attempt/success count checks are only applied to the forward
            # direction, as before; all other checks use the reverse stats.
            self._assert_direction_stats(stats_reverse_direction, extras,
                                         check_attempt_counts=False)

        asserts.explicit_pass("RTT Aware test done", extras=extras)

    def _assert_direction_stats(self, stats, extras, check_attempt_counts):
        """Assert that the statistics of a single ranging direction are valid.

        Args:
          stats: Statistics (as produced by rutils.extract_stats) of the
                 direction being verified.
          extras: Extra information attached to any assertion failure.
          check_attempt_counts: True to also verify the 'invalid_num_attempted'
                                and 'invalid_num_successful' flags.
        """
        asserts.assert_true(stats['num_no_results'] == 0,
                            "Missing (timed-out) results", extras=extras)
        asserts.assert_false(stats['any_lci_mismatch'],
                             "LCI mismatch", extras=extras)
        asserts.assert_false(stats['any_lcr_mismatch'],
                             "LCR mismatch", extras=extras)
        if check_attempt_counts:
            asserts.assert_false(stats['invalid_num_attempted'],
                                 "Invalid (0) number of attempts",
                                 extras=extras)
            asserts.assert_false(stats['invalid_num_successful'],
                                 "Invalid (0) number of successes",
                                 extras=extras)
        asserts.assert_equal(stats['num_invalid_rssi'], 0, "Invalid RSSI",
                             extras=extras)
        asserts.assert_true(
            stats['num_failures'] <=
            self.rtt_max_failure_rate_two_sided_rtt_percentage
            * stats['num_results'] / 100,
            "Failure rate is too high", extras=extras)
        asserts.assert_true(
            stats['num_range_out_of_margin'] <=
            self.rtt_max_margin_exceeded_rate_two_sided_rtt_percentage
            * stats['num_success_results'] / 100,
            "Results exceeding error margin rate is too high", extras=extras)

    def _create_ranging_publish_config(self):
        """Create an unsolicited publish configuration with ranging enabled."""
        return autils.add_ranging_to_pub(
            autils.create_discovery_config(self.SERVICE_NAME,
                                           aconsts.PUBLISH_TYPE_UNSOLICITED),
            enable_ranging=True)

    def _start_ranging_responder(self, resp_dut):
        """Attach the Responder to Aware and start a ranging-enabled Publisher.

        Args:
          resp_dut: RTT Responder device

        Returns: a tuple of the Aware session ID and the MAC address reported
        in the Responder's identity-changed event.
        """
        resp_id = resp_dut.droid.wifiAwareAttach(True)
        autils.wait_for_event(resp_dut, aconsts.EVENT_CB_ON_ATTACHED)
        resp_ident_event = autils.wait_for_event(
            resp_dut, aconsts.EVENT_CB_ON_IDENTITY_CHANGED)
        resp_mac = resp_ident_event['data']['mac']

        resp_dut.droid.wifiAwarePublish(resp_id,
                                        self._create_ranging_publish_config())
        autils.wait_for_event(resp_dut, aconsts.SESSION_CB_ON_PUBLISH_STARTED)
        return resp_id, resp_mac

    def _verify_rtt_fails(self, init_dut, resp_mac):
        """Run NUM_ITER ranging attempts and verify that none of them succeeds.

        Every attempt must still produce a result (no timeouts), but no result
        may carry a success status.

        Args:
          init_dut: RTT Initiator device
          resp_mac: MAC address of the RTT Responder device
        """
        results = []
        num_no_responses = 0
        num_successes = 0
        for _ in range(self.NUM_ITER):
            result = self.run_rtt_discovery(init_dut, resp_mac=resp_mac)
            self.log.debug("result: %s", result)
            results.append(result)
            if result is None:
                num_no_responses += 1
            elif (result[rconsts.EVENT_CB_RANGING_KEY_STATUS]
                  == rconsts.EVENT_CB_RANGING_STATUS_SUCCESS):
                num_successes += 1

        asserts.assert_equal(num_no_responses, 0, "No RTT response?",
                             extras={"data": results})
        asserts.assert_equal(num_successes, 0,
                             "Aware RTT w/o Aware should FAIL!",
                             extras={"data": results})
        asserts.explicit_pass("RTT Aware test done", extras={"data": results})

    ###########################################################################

    @test_tracker_info(uuid="9e4e7ab4-2254-498c-9788-21e15ed9a370")
    def test_rtt_oob_discovery_one_way(self):
        """Perform RTT between 2 Wi-Fi Aware devices. Use out-of-band
        discovery to communicate the MAC addresses to the peer. Test
        one-direction RTT only.
        """
        rtt_results = self.run_rtt_oob_discovery_set(
            do_both_directions=False,
            iter_count=self.NUM_ITER,
            time_between_iterations=self.TIME_BETWEEN_ITERATIONS,
            time_between_roles=self.TIME_BETWEEN_ROLES)
        self.verify_results(rtt_results)

    @test_tracker_info(uuid="22edba77-eeb2-43ee-875a-84437550ad84")
    def test_rtt_oob_discovery_both_ways(self):
        """Perform RTT between 2 Wi-Fi Aware devices. Use out-of-band
        discovery to communicate the MAC addresses to the peer. Test RTT
        both-ways: switching rapidly between Initiator and Responder.
        """
        rtt_results1, rtt_results2 = self.run_rtt_oob_discovery_set(
            do_both_directions=True,
            iter_count=self.NUM_ITER,
            time_between_iterations=self.TIME_BETWEEN_ITERATIONS,
            time_between_roles=self.TIME_BETWEEN_ROLES)
        self.verify_results(rtt_results1, rtt_results2)

    @test_tracker_info(uuid="18cef4be-95b4-4f7d-a140-5165874e7d1c")
    def test_rtt_ib_discovery_one_way(self):
        """Perform RTT between 2 Wi-Fi Aware devices. Use in-band (Aware)
        discovery to communicate the MAC addresses to the peer. Test
        one-direction RTT only.
        """
        rtt_results = self.run_rtt_ib_discovery_set(
            do_both_directions=False,
            iter_count=self.NUM_ITER,
            time_between_iterations=self.TIME_BETWEEN_ITERATIONS,
            time_between_roles=self.TIME_BETWEEN_ROLES)
        self.verify_results(rtt_results)

    @test_tracker_info(uuid="c67c8e70-c417-42d9-9bca-af3a89f1ddd9")
    def test_rtt_ib_discovery_both_ways(self):
        """Perform RTT between 2 Wi-Fi Aware devices. Use in-band (Aware)
        discovery to communicate the MAC addresses to the peer. Test RTT
        both-ways: switching rapidly between Initiator and Responder.
        """
        rtt_results1, rtt_results2 = self.run_rtt_ib_discovery_set(
            do_both_directions=True,
            iter_count=self.NUM_ITER,
            time_between_iterations=self.TIME_BETWEEN_ITERATIONS,
            time_between_roles=self.TIME_BETWEEN_ROLES)
        self.verify_results(rtt_results1, rtt_results2)

    @test_tracker_info(uuid="54f9693d-45e5-4979-adbb-1b875d217c0c")
    def test_rtt_without_initiator_aware(self):
        """Try to perform RTT operation when there is no local Aware session
        (on the Initiator). The Responder is configured normally: Aware on and
        a Publisher with Ranging enable. Should FAIL."""
        init_dut = self.android_devices[0]
        resp_dut = self.android_devices[1]

        # Enable a Responder and start a Publisher
        _, resp_mac = self._start_ranging_responder(resp_dut)

        # Initiate an RTT to Responder (no Aware started on Initiator!)
        self._verify_rtt_fails(init_dut, resp_mac)

    @test_tracker_info(uuid="87a69053-8261-4928-8ec1-c93aac7f3a8d")
    def test_rtt_without_responder_aware(self):
        """Try to perform RTT operation when there is no peer Aware session
        (on the Responder). Should FAIL."""
        init_dut = self.android_devices[0]
        resp_dut = self.android_devices[1]

        # Enable a Responder and start a Publisher (to obtain its MAC address)
        resp_id, resp_mac = self._start_ranging_responder(resp_dut)

        # Disable Responder
        resp_dut.droid.wifiAwareDestroy(resp_id)

        # Enable the Initiator
        init_dut.droid.wifiAwareAttach()
        autils.wait_for_event(init_dut, aconsts.EVENT_CB_ON_ATTACHED)

        # Initiate an RTT to Responder (Aware session destroyed on Responder!)
        self._verify_rtt_fails(init_dut, resp_mac)