1 #/usr/bin/env python3.4 2 # 3 # Copyright (C) 2016 The Android Open Source Project 4 # 5 # Licensed under the Apache License, Version 2.0 (the "License"); you may not 6 # use this file except in compliance with the License. You may obtain a copy of 7 # the License at 8 # 9 # http://www.apache.org/licenses/LICENSE-2.0 10 # 11 # Unless required by applicable law or agreed to in writing, software 12 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14 # License for the specific language governing permissions and limitations under 15 # the License. 16 """ 17 This test script exercises different GATT connection tests. 18 """ 19 20 import pprint 21 from queue import Empty 22 import time 23 from contextlib import suppress 24 25 from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest 26 from acts.test_utils.bt.GattEnum import GattCharacteristic 27 from acts.test_utils.bt.GattEnum import GattDescriptor 28 from acts.test_utils.bt.GattEnum import GattService 29 from acts.test_utils.bt.GattEnum import MtuSize 30 from acts.test_utils.bt.GattEnum import GattCbErr 31 from acts.test_utils.bt.GattEnum import GattCbStrings 32 from acts.test_utils.bt.GattEnum import GattConnectionPriority 33 from acts.test_utils.bt.bt_gatt_utils import disconnect_gatt_connection 34 from acts.test_utils.bt.bt_gatt_utils import orchestrate_gatt_connection 35 from acts.test_utils.bt.bt_gatt_utils import setup_gatt_characteristics 36 from acts.test_utils.bt.bt_gatt_utils import setup_gatt_connection 37 from acts.test_utils.bt.bt_gatt_utils import setup_gatt_descriptors 38 from acts.test_utils.bt.bt_test_utils import get_advanced_droid_list 39 from acts.test_utils.bt.bt_test_utils import get_mac_address_of_generic_advertisement 40 from acts.test_utils.bt.bt_test_utils import log_energy_info 41 42 43 class GattConnectTest(BluetoothBaseTest): 44 adv_instances = [] 45 default_timeout = 10 46 default_discovery_timeout = 3 47 droid_list = () 48 49 def __init__(self, controllers): 50 BluetoothBaseTest.__init__(self, controllers) 51 self.droid_list = get_advanced_droid_list(self.android_devices) 52 self.cen_ad = self.android_devices[0] 53 self.per_ad = self.android_devices[1] 54 if self.droid_list[1]['max_advertisements'] == 0: 55 self.tests = () 56 return 57 58 def teardown_test(self): 59 for adv in self.adv_instances: 60 self.per_ad.droid.bleStopBleAdvertising(adv) 61 self.log.debug(log_energy_info(self.android_devices, "End")) 62 return True 63 64 def _setup_characteristics_and_descriptors(self, droid): 65 characteristic_input = [ 66 { 67 'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630", 68 'property': GattCharacteristic.PROPERTY_WRITE.value 69 | GattCharacteristic.PROPERTY_WRITE_NO_RESPONSE.value, 70 'permission': GattCharacteristic.PERMISSION_WRITE.value 71 }, 72 { 73 'uuid': "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8", 74 'property': GattCharacteristic.PROPERTY_NOTIFY.value 75 | GattCharacteristic.PROPERTY_READ.value, 76 'permission': GattCharacteristic.PERMISSION_READ.value 77 }, 78 { 79 'uuid': "6774191f-6ec3-4aa2-b8a8-cf830e41fda6", 80 'property': GattCharacteristic.PROPERTY_NOTIFY.value 81 | GattCharacteristic.PROPERTY_READ.value, 82 'permission': GattCharacteristic.PERMISSION_READ.value 83 }, 84 ] 85 descriptor_input = [ 86 { 87 'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630", 88 'property': GattDescriptor.PERMISSION_READ.value 89 | GattDescriptor.PERMISSION_WRITE.value, 90 }, { 91 'uuid': "76d5ed92-ca81-4edb-bb6b-9f019665fb32", 92 'property': GattDescriptor.PERMISSION_READ.value 93 | GattCharacteristic.PERMISSION_WRITE.value, 94 } 95 ] 96 characteristic_list = setup_gatt_characteristics(droid, 97 characteristic_input) 98 descriptor_list = setup_gatt_descriptors(droid, descriptor_input) 99 return characteristic_list, descriptor_list 100 101 def _orchestrate_gatt_disconnection(self, bluetooth_gatt, gatt_callback): 102 self.log.info("Disconnecting from peripheral device.") 103 test_result = disconnect_gatt_connection(self.cen_ad, bluetooth_gatt, 104 gatt_callback) 105 self.cen_ad.droid.gattClientClose(bluetooth_gatt) 106 if not test_result: 107 self.log.info("Failed to disconnect from peripheral device.") 108 return False 109 return True 110 111 def _iterate_attributes(self, discovered_services_index): 112 services_count = self.cen_ad.droid.gattClientGetDiscoveredServicesCount( 113 discovered_services_index) 114 for i in range(services_count): 115 service = self.cen_ad.droid.gattClientGetDiscoveredServiceUuid( 116 discovered_services_index, i) 117 self.log.info("Discovered service uuid {}".format(service)) 118 characteristic_uuids = ( 119 self.cen_ad.droid.gattClientGetDiscoveredCharacteristicUuids( 120 discovered_services_index, i)) 121 for characteristic in characteristic_uuids: 122 self.log.info("Discovered characteristic uuid {}".format( 123 characteristic)) 124 descriptor_uuids = ( 125 self.cen_ad.droid.gattClientGetDiscoveredDescriptorUuids( 126 discovered_services_index, i, characteristic)) 127 for descriptor in descriptor_uuids: 128 self.log.info("Discovered descriptor uuid {}".format( 129 descriptor)) 130 131 def _find_service_added_event(self, gatt_server_callback, uuid): 132 expected_event = GattCbStrings.SERV_ADDED.value.format( 133 gatt_server_callback) 134 try: 135 event = self.per_ad.ed.pop_event(expected_event, 136 self.default_timeout) 137 except Empty: 138 self.log.error(GattCbErr.SERV_ADDED_ERR.value.format( 139 expected_event)) 140 return False 141 if event['data']['serviceUuid'].lower() != uuid.lower(): 142 self.log.error("Uuid mismatch. Found: {}, Expected {}.".format( 143 event['data']['serviceUuid'], uuid)) 144 return False 145 return True 146 147 def _setup_multiple_services(self): 148 gatt_server_callback = ( 149 self.per_ad.droid.gattServerCreateGattServerCallback()) 150 gatt_server = self.per_ad.droid.gattServerOpenGattServer( 151 gatt_server_callback) 152 characteristic_list, descriptor_list = ( 153 self._setup_characteristics_and_descriptors(self.per_ad.droid)) 154 self.per_ad.droid.gattServerCharacteristicAddDescriptor( 155 characteristic_list[1], descriptor_list[0]) 156 self.per_ad.droid.gattServerCharacteristicAddDescriptor( 157 characteristic_list[2], descriptor_list[1]) 158 gatt_service = self.per_ad.droid.gattServerCreateService( 159 "00000000-0000-1000-8000-00805f9b34fb", 160 GattService.SERVICE_TYPE_PRIMARY.value) 161 gatt_service2 = self.per_ad.droid.gattServerCreateService( 162 "FFFFFFFF-0000-1000-8000-00805f9b34fb", 163 GattService.SERVICE_TYPE_PRIMARY.value) 164 gatt_service3 = self.per_ad.droid.gattServerCreateService( 165 "3846D7A0-69C8-11E4-BA00-0002A5D5C51B", 166 GattService.SERVICE_TYPE_PRIMARY.value) 167 for characteristic in characteristic_list: 168 self.per_ad.droid.gattServerAddCharacteristicToService( 169 gatt_service, characteristic) 170 self.per_ad.droid.gattServerAddService(gatt_server, gatt_service) 171 result = self._find_service_added_event( 172 gatt_server_callback, "00000000-0000-1000-8000-00805f9b34fb") 173 if not result: 174 return False 175 for characteristic in characteristic_list: 176 self.per_ad.droid.gattServerAddCharacteristicToService( 177 gatt_service2, characteristic) 178 self.per_ad.droid.gattServerAddService(gatt_server, gatt_service2) 179 result = self._find_service_added_event( 180 gatt_server_callback, "FFFFFFFF-0000-1000-8000-00805f9b34fb") 181 if not result: 182 return False 183 for characteristic in characteristic_list: 184 self.per_ad.droid.gattServerAddCharacteristicToService( 185 gatt_service3, characteristic) 186 self.per_ad.droid.gattServerAddService(gatt_server, gatt_service3) 187 result = self._find_service_added_event( 188 gatt_server_callback, "3846D7A0-69C8-11E4-BA00-0002A5D5C51B") 189 if not result: 190 return False, False 191 return gatt_server_callback, gatt_server 192 193 def _cleanup_services(self, gatt_server): 194 self.per_ad.droid.gattServerClearServices(gatt_server) 195 196 @BluetoothBaseTest.bt_test_wrap 197 def test_gatt_connect(self): 198 """Test GATT connection over LE. 199 200 Test establishing a gatt connection between a GATT server and GATT 201 client. 202 203 Steps: 204 1. Start a generic advertisement. 205 2. Start a generic scanner. 206 3. Find the advertisement and extract the mac address. 207 4. Stop the first scanner. 208 5. Create a GATT connection between the scanner and advertiser. 209 6. Disconnect the GATT connection. 210 211 Expected Result: 212 Verify that a connection was established and then disconnected 213 successfully. 214 215 Returns: 216 Pass if True 217 Fail if False 218 219 TAGS: LE, Advertising, Filtering, Scanning, GATT 220 Priority: 0 221 """ 222 bluetooth_gatt, gatt_callback, adv_callback = ( 223 orchestrate_gatt_connection(self.cen_ad, self.per_ad)) 224 self.adv_instances.append(adv_callback) 225 return self._orchestrate_gatt_disconnection(bluetooth_gatt, 226 gatt_callback) 227 228 @BluetoothBaseTest.bt_test_wrap 229 def test_gatt_connect_autoconnect(self): 230 """Test GATT connection over LE. 231 232 Test re-establishing a gat connection using autoconnect 233 set to True in order to test connection whitelist. 234 235 Steps: 236 1. Start a generic advertisement. 237 2. Start a generic scanner. 238 3. Find the advertisement and extract the mac address. 239 4. Stop the first scanner. 240 5. Create a GATT connection between the scanner and advertiser. 241 6. Disconnect the GATT connection. 242 7. Create a GATT connection with autoconnect set to True 243 8. Disconnect the GATT connection. 244 245 Expected Result: 246 Verify that a connection was re-established and then disconnected 247 successfully. 248 249 Returns: 250 Pass if True 251 Fail if False 252 253 TAGS: LE, Advertising, Filtering, Scanning, GATT 254 Priority: 0 255 """ 256 autoconnect = False 257 mac_address, adv_callback = ( 258 get_mac_address_of_generic_advertisement(self.cen_ad, self.per_ad)) 259 test_result, bluetooth_gatt, gatt_callback = setup_gatt_connection( 260 self.cen_ad, mac_address, autoconnect) 261 if not disconnect_gatt_connection(self.cen_ad, bluetooth_gatt, 262 gatt_callback): 263 return False 264 autoconnect = True 265 bluetooth_gatt = self.cen_ad.droid.gattClientConnectGatt( 266 gatt_callback, mac_address, autoconnect) 267 expected_event = GattCbStrings.GATT_CONN_CHANGE.value.format( 268 gatt_callback) 269 try: 270 event = self.cen_ad.ed.pop_event(expected_event, 271 self.default_timeout) 272 except Empty: 273 log.error(GattCbErr.GATT_CONN_CHANGE_ERR.value.format( 274 expected_event)) 275 test_result = False 276 return True 277 278 @BluetoothBaseTest.bt_test_wrap 279 def test_gatt_request_min_mtu(self): 280 """Test GATT connection over LE and exercise MTU sizes. 281 282 Test establishing a gatt connection between a GATT server and GATT 283 client. Request an MTU size that matches the correct minimum size. 284 285 Steps: 286 1. Start a generic advertisement. 287 2. Start a generic scanner. 288 3. Find the advertisement and extract the mac address. 289 4. Stop the first scanner. 290 5. Create a GATT connection between the scanner and advertiser. 291 6. From the scanner (client) request MTU size change to the 292 minimum value. 293 7. Find the MTU changed event on the client. 294 8. Disconnect the GATT connection. 295 296 Expected Result: 297 Verify that a connection was established and the MTU value found 298 matches the expected MTU value. 299 300 Returns: 301 Pass if True 302 Fail if False 303 304 TAGS: LE, Advertising, Filtering, Scanning, GATT, MTU 305 Priority: 0 306 """ 307 bluetooth_gatt, gatt_callback, adv_callback = ( 308 orchestrate_gatt_connection(self.cen_ad, self.per_ad)) 309 self.adv_instances.append(adv_callback) 310 expected_mtu = MtuSize.MIN.value 311 self.cen_ad.droid.gattClientRequestMtu(bluetooth_gatt, expected_mtu) 312 expected_event = GattCbStrings.MTU_CHANGED.value.format(gatt_callback) 313 try: 314 mtu_event = self.cen_ad.ed.pop_event(expected_event, 315 self.default_timeout) 316 mtu_size_found = mtu_event['data']['MTU'] 317 if mtu_size_found != expected_mtu: 318 self.log.error("MTU size found: {}, expected: {}".format( 319 mtu_size_found, expected_mtu)) 320 return False 321 except Empty: 322 self.log.error(GattCbErr.MTU_CHANGED_ERR.value.format( 323 expected_event)) 324 return False 325 return self._orchestrate_gatt_disconnection(bluetooth_gatt, 326 gatt_callback) 327 328 @BluetoothBaseTest.bt_test_wrap 329 def test_gatt_request_max_mtu(self): 330 """Test GATT connection over LE and exercise MTU sizes. 331 332 Test establishing a gatt connection between a GATT server and GATT 333 client. Request an MTU size that matches the correct maximum size. 334 335 Steps: 336 1. Start a generic advertisement. 337 2. Start a generic scanner. 338 3. Find the advertisement and extract the mac address. 339 4. Stop the first scanner. 340 5. Create a GATT connection between the scanner and advertiser. 341 6. From the scanner (client) request MTU size change to the 342 maximum value. 343 7. Find the MTU changed event on the client. 344 8. Disconnect the GATT connection. 345 346 Expected Result: 347 Verify that a connection was established and the MTU value found 348 matches the expected MTU value. 349 350 Returns: 351 Pass if True 352 Fail if False 353 354 TAGS: LE, Advertising, Filtering, Scanning, GATT, MTU 355 Priority: 0 356 """ 357 bluetooth_gatt, gatt_callback, adv_callback = ( 358 orchestrate_gatt_connection(self.cen_ad, self.per_ad)) 359 self.adv_instances.append(adv_callback) 360 expected_mtu = MtuSize.MAX.value 361 self.cen_ad.droid.gattClientRequestMtu(bluetooth_gatt, expected_mtu) 362 expected_event = GattCbStrings.MTU_CHANGED.value.format(gatt_callback) 363 try: 364 mtu_event = self.cen_ad.ed.pop_event(expected_event, 365 self.default_timeout) 366 mtu_size_found = mtu_event['data']['MTU'] 367 if mtu_size_found != expected_mtu: 368 self.log.error("MTU size found: {}, expected: {}".format( 369 mtu_size_found, expected_mtu)) 370 return False 371 except Empty: 372 self.log.error(GattCbErr.MTU_CHANGED_ERR.value.format( 373 expected_event)) 374 return False 375 return self._orchestrate_gatt_disconnection(bluetooth_gatt, 376 gatt_callback) 377 378 @BluetoothBaseTest.bt_test_wrap 379 def test_gatt_request_out_of_bounds_mtu(self): 380 """Test GATT connection over LE and exercise an out of bound MTU size. 381 382 Test establishing a gatt connection between a GATT server and GATT 383 client. Request an MTU size that is the MIN value minus 1. 384 385 Steps: 386 1. Start a generic advertisement. 387 2. Start a generic scanner. 388 3. Find the advertisement and extract the mac address. 389 4. Stop the first scanner. 390 5. Create a GATT connection between the scanner and advertiser. 391 6. From the scanner (client) request MTU size change to the 392 minimum value minus one. 393 7. Find the MTU changed event on the client. 394 8. Disconnect the GATT connection. 395 396 Expected Result: 397 Verify that an MTU changed event was not discovered and that 398 it didn't cause an exception when requesting an out of bounds 399 MTU. 400 401 Returns: 402 Pass if True 403 Fail if False 404 405 TAGS: LE, Advertising, Filtering, Scanning, GATT, MTU 406 Priority: 0 407 """ 408 bluetooth_gatt, gatt_callback, adv_callback = ( 409 orchestrate_gatt_connection(self.cen_ad, self.per_ad)) 410 self.adv_instances.append(adv_callback) 411 self.cen_ad.droid.gattClientRequestMtu(bluetooth_gatt, 412 MtuSize.MIN.value - 1) 413 expected_event = GattCbStrings.MTU_CHANGED.value.format(gatt_callback) 414 try: 415 self.cen_ad.ed.pop_event(expected_event, self.default_timeout) 416 self.log.error("Found {} event when it wasn't expected".format( 417 expected_event)) 418 return False 419 except Empty: 420 self.log.debug("Successfully didn't find {} event".format( 421 expected_event)) 422 return self._orchestrate_gatt_disconnection(bluetooth_gatt, 423 gatt_callback) 424 425 @BluetoothBaseTest.bt_test_wrap 426 def test_gatt_connect_trigger_on_read_rssi(self): 427 """Test GATT connection over LE read RSSI. 428 429 Test establishing a gatt connection between a GATT server and GATT 430 client then read the RSSI. 431 432 Steps: 433 1. Start a generic advertisement. 434 2. Start a generic scanner. 435 3. Find the advertisement and extract the mac address. 436 4. Stop the first scanner. 437 5. Create a GATT connection between the scanner and advertiser. 438 6. From the scanner, request to read the RSSI of the advertiser. 439 7. Disconnect the GATT connection. 440 441 Expected Result: 442 Verify that a connection was established and then disconnected 443 successfully. Verify that the RSSI was ready correctly. 444 445 Returns: 446 Pass if True 447 Fail if False 448 449 TAGS: LE, Advertising, Filtering, Scanning, GATT, RSSI 450 Priority: 1 451 """ 452 bluetooth_gatt, gatt_callback, adv_callback = ( 453 orchestrate_gatt_connection(self.cen_ad, self.per_ad)) 454 self.adv_instances.append(adv_callback) 455 expected_event = GattCbStrings.RD_REMOTE_RSSI.value.format( 456 gatt_callback) 457 if self.cen_ad.droid.gattClientReadRSSI(bluetooth_gatt): 458 try: 459 self.cen_ad.ed.pop_event(expected_event, self.default_timeout) 460 except Empty: 461 self.log.error(GattCbErr.RD_REMOTE_RSSI_ERR.value.format( 462 expected_event)) 463 return self._orchestrate_gatt_disconnection(bluetooth_gatt, 464 gatt_callback) 465 466 @BluetoothBaseTest.bt_test_wrap 467 def test_gatt_connect_trigger_on_services_discovered(self): 468 """Test GATT connection and discover services of peripheral. 469 470 Test establishing a gatt connection between a GATT server and GATT 471 client the discover all services from the connected device. 472 473 Steps: 474 1. Start a generic advertisement. 475 2. Start a generic scanner. 476 3. Find the advertisement and extract the mac address. 477 4. Stop the first scanner. 478 5. Create a GATT connection between the scanner and advertiser. 479 6. From the scanner (central device), discover services. 480 7. Disconnect the GATT connection. 481 482 Expected Result: 483 Verify that a connection was established and then disconnected 484 successfully. Verify that the service were discovered. 485 486 Returns: 487 Pass if True 488 Fail if False 489 490 TAGS: LE, Advertising, Filtering, Scanning, GATT, Services 491 Priority: 1 492 """ 493 bluetooth_gatt, gatt_callback, adv_callback = ( 494 orchestrate_gatt_connection(self.cen_ad, self.per_ad)) 495 self.adv_instances.append(adv_callback) 496 if self.cen_ad.droid.gattClientDiscoverServices(bluetooth_gatt): 497 expected_event = GattCbStrings.GATT_SERV_DISC.value.format( 498 gatt_callback) 499 try: 500 event = self.cen_ad.ed.pop_event(expected_event, 501 self.default_timeout) 502 except Empty: 503 self.log.error(GattCbErr.GATT_SERV_DISC_ERR.value.format( 504 expected_event)) 505 return False 506 return self._orchestrate_gatt_disconnection(bluetooth_gatt, 507 gatt_callback) 508 509 @BluetoothBaseTest.bt_test_wrap 510 def test_gatt_connect_trigger_on_services_discovered_iterate_attributes( 511 self): 512 """Test GATT connection and iterate peripherals attributes. 513 514 Test establishing a gatt connection between a GATT server and GATT 515 client and iterate over all the characteristics and descriptors of the 516 discovered services. 517 518 Steps: 519 1. Start a generic advertisement. 520 2. Start a generic scanner. 521 3. Find the advertisement and extract the mac address. 522 4. Stop the first scanner. 523 5. Create a GATT connection between the scanner and advertiser. 524 6. From the scanner (central device), discover services. 525 7. Iterate over all the characteristics and descriptors of the 526 discovered features. 527 8. Disconnect the GATT connection. 528 529 Expected Result: 530 Verify that a connection was established and then disconnected 531 successfully. Verify that the services, characteristics, and descriptors 532 were discovered. 533 534 Returns: 535 Pass if True 536 Fail if False 537 538 TAGS: LE, Advertising, Filtering, Scanning, GATT, Services 539 Characteristics, Descriptors 540 Priority: 1 541 """ 542 bluetooth_gatt, gatt_callback, adv_callback = ( 543 orchestrate_gatt_connection(self.cen_ad, self.per_ad)) 544 self.adv_instances.append(adv_callback) 545 if self.cen_ad.droid.gattClientDiscoverServices(bluetooth_gatt): 546 expected_event = GattCbStrings.GATT_SERV_DISC.value.format( 547 gatt_callback) 548 try: 549 event = self.cen_ad.ed.pop_event(expected_event, 550 self.default_timeout) 551 discovered_services_index = event['data']['ServicesIndex'] 552 except Empty: 553 self.log.error(GattCbErr.GATT_SERV_DISC_ERR.value.format( 554 expected_event)) 555 return False 556 self._iterate_attributes(discovered_services_index) 557 return self._orchestrate_gatt_disconnection(bluetooth_gatt, 558 gatt_callback) 559 560 @BluetoothBaseTest.bt_test_wrap 561 def test_gatt_connect_with_service_uuid_variations(self): 562 """Test GATT connection with multiple service uuids. 563 564 Test establishing a gatt connection between a GATT server and GATT 565 client with multiple service uuid variations. 566 567 Steps: 568 1. Start a generic advertisement. 569 2. Start a generic scanner. 570 3. Find the advertisement and extract the mac address. 571 4. Stop the first scanner. 572 5. Create a GATT connection between the scanner and advertiser. 573 6. From the scanner (central device), discover services. 574 7. Verify that all the service uuid variations are found. 575 8. Disconnect the GATT connection. 576 577 Expected Result: 578 Verify that a connection was established and then disconnected 579 successfully. Verify that the service uuid variations are found. 580 581 Returns: 582 Pass if True 583 Fail if False 584 585 TAGS: LE, Advertising, Filtering, Scanning, GATT, Services 586 Priority: 2 587 """ 588 gatt_server_callback, gatt_server = self._setup_multiple_services() 589 if not gatt_server_callback or not gatt_server: 590 return False 591 bluetooth_gatt, gatt_callback, adv_callback = ( 592 orchestrate_gatt_connection(self.cen_ad, self.per_ad)) 593 self.adv_instances.append(adv_callback) 594 if self.cen_ad.droid.gattClientDiscoverServices(bluetooth_gatt): 595 expected_event = GattCbStrings.GATT_SERV_DISC.value.format( 596 gatt_callback) 597 try: 598 event = self.cen_ad.ed.pop_event(expected_event, 599 self.default_timeout) 600 except Empty: 601 self.log.error(GattCbErr.GATT_SERV_DISC_ERR.value.format( 602 expected_event)) 603 return False 604 discovered_services_index = event['data']['ServicesIndex'] 605 self._iterate_attributes(discovered_services_index) 606 607 self._cleanup_services(gatt_server) 608 return self._orchestrate_gatt_disconnection(bluetooth_gatt, 609 gatt_callback) 610 611 @BluetoothBaseTest.bt_test_wrap 612 def test_gatt_connect_in_quick_succession(self): 613 """Test GATT connections multiple times. 614 615 Test establishing a gatt connection between a GATT server and GATT 616 client with multiple iterations. 617 618 Steps: 619 1. Start a generic advertisement. 620 2. Start a generic scanner. 621 3. Find the advertisement and extract the mac address. 622 4. Stop the first scanner. 623 5. Create a GATT connection between the scanner and advertiser. 624 6. Disconnect the GATT connection. 625 7. Repeat steps 5 and 6 twenty times. 626 627 Expected Result: 628 Verify that a connection was established and then disconnected 629 successfully twenty times. 630 631 Returns: 632 Pass if True 633 Fail if False 634 635 TAGS: LE, Advertising, Filtering, Scanning, GATT, Stress 636 Priority: 1 637 """ 638 mac_address, adv_callback = get_mac_address_of_generic_advertisement( 639 self.cen_ad, self.per_ad) 640 autoconnect = False 641 for i in range(1000): 642 self.log.info("Starting connection iteration {}".format(i + 1)) 643 test_result, bluetooth_gatt, gatt_callback = setup_gatt_connection( 644 self.cen_ad, mac_address, autoconnect) 645 if not test_result: 646 self.log.info("Could not connect to peripheral.") 647 return False 648 test_result = self._orchestrate_gatt_disconnection(bluetooth_gatt, 649 gatt_callback) 650 if not test_result: 651 self.log.info("Failed to disconnect from peripheral device.") 652 return False 653 self.adv_instances.append(adv_callback) 654 return True 655 656 @BluetoothBaseTest.bt_test_wrap 657 def test_write_descriptor_stress(self): 658 """Test GATT connection writing and reading descriptors. 659 660 Test establishing a gatt connection between a GATT server and GATT 661 client with multiple service uuid variations. 662 663 Steps: 664 1. Start a generic advertisement. 665 2. Start a generic scanner. 666 3. Find the advertisement and extract the mac address. 667 4. Stop the first scanner. 668 5. Create a GATT connection between the scanner and advertiser. 669 6. Discover services. 670 7. Write data to the descriptors of each characteristic 100 times. 671 8. Read the data sent to the descriptors. 672 9. Disconnect the GATT connection. 673 674 Expected Result: 675 Each descriptor in each characteristic is written and read 100 times. 676 677 Returns: 678 Pass if True 679 Fail if False 680 681 TAGS: LE, Advertising, Filtering, Scanning, GATT, Stress, 682 Characteristics, Descriptors 683 Priority: 1 684 """ 685 gatt_server_callback, gatt_server = self._setup_multiple_services() 686 if not gatt_server_callback or not gatt_server: 687 return False 688 bluetooth_gatt, gatt_callback, adv_callback = ( 689 orchestrate_gatt_connection(self.cen_ad, self.per_ad)) 690 self.adv_instances.append(adv_callback) 691 if self.cen_ad.droid.gattClientDiscoverServices(bluetooth_gatt): 692 expected_event = GattCbStrings.GATT_SERV_DISC.value.format( 693 gatt_callback) 694 try: 695 event = self.cen_ad.ed.pop_event(expected_event, 696 self.default_timeout) 697 except Empty: 698 self.log.error(GattCbErr.GATT_SERV_DISC_ERR.value.format( 699 expected_event)) 700 return False 701 discovered_services_index = event['data']['ServicesIndex'] 702 else: 703 self.log.info("Failed to discover services.") 704 return False 705 services_count = self.cen_ad.droid.gattClientGetDiscoveredServicesCount( 706 discovered_services_index) 707 708 connected_device_list = self.per_ad.droid.gattServerGetConnectedDevices( 709 gatt_server) 710 if len(connected_device_list) == 0: 711 self.log.info("No devices connected from peripheral.") 712 return False 713 bt_device_id = 0 714 status = 1 715 offset = 1 716 test_value = [1,2,3,4,5,6,7] 717 test_value_return = [1,2,3] 718 for i in range(services_count): 719 characteristic_uuids = ( 720 self.cen_ad.droid.gattClientGetDiscoveredCharacteristicUuids( 721 discovered_services_index, i)) 722 for characteristic in characteristic_uuids: 723 descriptor_uuids = ( 724 self.cen_ad.droid.gattClientGetDiscoveredDescriptorUuids( 725 discovered_services_index, i, characteristic)) 726 for x in range(100): 727 for descriptor in descriptor_uuids: 728 self.log.info( 729 "Starting write iteration {} on (Characteristic::Descriptor) {}::{}".format( 730 x + 1, characteristic, descriptor)) 731 self.cen_ad.droid.gattClientDescriptorSetValue( 732 bluetooth_gatt, discovered_services_index, i, 733 characteristic, descriptor, test_value) 734 self.cen_ad.droid.gattClientWriteDescriptor( 735 bluetooth_gatt, discovered_services_index, i, 736 characteristic, descriptor) 737 expected_event = GattCbStrings.DESC_WRITE_REQ.value.format( 738 gatt_server_callback) 739 try: 740 event = self.per_ad.ed.pop_event( 741 expected_event, self.default_timeout) 742 except Empty: 743 self.log.error( 744 GattCbErr.DESC_WRITE_REQ_ERR.value.format( 745 expected_event)) 746 return False 747 request_id = event['data']['requestId'] 748 found_value = event['data']['value'] 749 if found_value != test_value: 750 self.log.error("Values didn't match. Found: {}, " 751 "Expected: {}".format(found_value, 752 test_value)) 753 return False 754 self.per_ad.droid.gattServerSendResponse( 755 gatt_server, bt_device_id, request_id, status, 756 offset, test_value_return) 757 expected_event = GattCbStrings.DESC_WRITE.value.format( 758 gatt_callback) 759 try: 760 self.cen_ad.ed.pop_event(expected_event, 761 self.default_timeout) 762 except Empty: 763 self.log.error( 764 GattCbErr.DESC_WRITE_ERR.value.format( 765 expected_event)) 766 return False 767 self._cleanup_services(gatt_server) 768 return True 769 770 @BluetoothBaseTest.bt_test_wrap 771 def test_write_characteristic(self): 772 """Test GATT connection writing characteristics. 773 774 Test establishing a gatt connection between a GATT server and GATT 775 client and exercise writing a characteristic. 776 777 Steps: 778 1. Start a generic advertisement. 779 2. Start a generic scanner. 780 3. Find the advertisement and extract the mac address. 781 4. Stop the first scanner. 782 5. Create a GATT connection between the scanner and advertiser. 783 6. Discover services. 784 7. Set discovered characteristic notification to True 785 8. Write data to the characteristic. 786 9. Send a response from the peripheral to the central. 787 10. Disconnect the GATT connection. 788 789 Expected Result: 790 The characteristic data should be written successfully 791 792 Returns: 793 Pass if True 794 Fail if False 795 796 TAGS: LE, Advertising, Filtering, Scanning, GATT, Stress, 797 Characteristics, Descriptors 798 Priority: 1 799 """ 800 gatt_server_callback, gatt_server = self._setup_multiple_services() 801 if not gatt_server_callback or not gatt_server: 802 return False 803 bluetooth_gatt, gatt_callback, adv_callback = ( 804 orchestrate_gatt_connection(self.cen_ad, self.per_ad)) 805 self.adv_instances.append(adv_callback) 806 807 service_uuid = "3846D7A0-69C8-11E4-BA00-0002A5D5C51B" 808 characteristic_uuid = "aa7edd5a-4d1d-4f0e-883a-d145616a1630" 809 810 if self.cen_ad.droid.gattClientDiscoverServices(bluetooth_gatt): 811 expected_event = GattCbStrings.GATT_SERV_DISC.value.format( 812 gatt_callback) 813 try: 814 event = self.cen_ad.ed.pop_event(expected_event, 815 self.default_timeout) 816 except Empty: 817 self.log.error(GattCbErr.GATT_SERV_DISC_ERR.value.format( 818 expected_event)) 819 discovered_services_index = event['data']['ServicesIndex'] 820 services_count = self.cen_ad.droid.gattClientGetDiscoveredServicesCount( 821 discovered_services_index) 822 disc_service_index = None 823 for i in range(services_count): 824 disc_service_uuid = ( 825 self.cen_ad.droid.gattClientGetDiscoveredServiceUuid( 826 discovered_services_index, i).upper()) 827 if disc_service_uuid == service_uuid: 828 disc_service_index = i 829 break 830 831 test_value = [1,2,3,4,5,6,7] 832 833 self.cen_ad.droid.gattClientCharacteristicSetValue( 834 bluetooth_gatt, discovered_services_index, disc_service_index, 835 characteristic_uuid, test_value) 836 837 self.cen_ad.droid.gattClientWriteCharacteristic( 838 bluetooth_gatt, discovered_services_index, disc_service_index, 839 characteristic_uuid) 840 841 expected_event = GattCbStrings.CHAR_WRITE_REQ.value.format( 842 gatt_server_callback) 843 try: 844 event = self.per_ad.ed.pop_event(expected_event, 845 self.default_timeout) 846 except Empty: 847 self.log.error(GattCbErr.CHAR_WRITE_REQ_ERR.value.format( 848 expected_event)) 849 return False 850 851 request_id = event['data']['requestId'] 852 bt_device_id = 0 853 status = 0 854 offset = 0 855 test_value_return = [1,2,3] 856 self.per_ad.droid.gattServerGetConnectedDevices(gatt_server) 857 self.per_ad.droid.gattServerSendResponse(gatt_server, bt_device_id, 858 request_id, status, offset, 859 test_value_return) 860 861 expected_event = GattCbStrings.CHAR_WRITE.value.format(gatt_callback) 862 try: 863 event = self.cen_ad.ed.pop_event(expected_event, self.default_timeout) 864 if event["data"]["Status"] != status: 865 self.log.error("Write status should be 0") 866 return False; 867 868 except Empty: 869 self.log.error(GattCbErr.CHAR_WRITE_ERR.value.format( 870 expected_event)) 871 return False 872 873 self._cleanup_services(gatt_server) 874 return self._orchestrate_gatt_disconnection(bluetooth_gatt, 875 gatt_callback) 876 877 @BluetoothBaseTest.bt_test_wrap 878 def test_write_characteristic_stress(self): 879 """Test GATT connection writing characteristics in quick succession. 880 881 Test establishing a gatt connection between a GATT server and GATT 882 client and exercise writing a characteristic. Do this quickly 100 times. 883 884 Steps: 885 1. Start a generic advertisement. 886 2. Start a generic scanner. 887 3. Find the advertisement and extract the mac address. 888 4. Stop the first scanner. 889 5. Create a GATT connection between the scanner and advertiser. 890 6. Discover services. 891 7. Set discovered characteristic notification to True. 892 8. Write data to the characteristic 100 times as fast as possible. 893 9. Send a response from the peripheral to the central. 894 10. Disconnect the GATT connection. 895 896 Expected Result: 897 The characteristic data should be written successfully each iteration 898 899 Returns: 900 Pass if True 901 Fail if False 902 903 TAGS: LE, Advertising, Filtering, Scanning, GATT, Stress, 904 Characteristics, Descriptors 905 Priority: 1 906 """ 907 gatt_server_callback, gatt_server = self._setup_multiple_services() 908 if not gatt_server_callback or not gatt_server: 909 return False 910 bluetooth_gatt, gatt_callback, adv_callback = ( 911 orchestrate_gatt_connection(self.cen_ad, self.per_ad)) 912 self.adv_instances.append(adv_callback) 913 914 service_uuid = "3846D7A0-69C8-11E4-BA00-0002A5D5C51B" 915 characteristic_uuid = "aa7edd5a-4d1d-4f0e-883a-d145616a1630" 916 917 if self.cen_ad.droid.gattClientDiscoverServices(bluetooth_gatt): 918 expected_event = GattCbStrings.GATT_SERV_DISC.value.format( 919 gatt_callback) 920 try: 921 event = self.cen_ad.ed.pop_event(expected_event, 922 self.default_timeout) 923 except Empty: 924 self.log.error(GattCbErr.GATT_SERV_DISC_ERR.value.format( 925 expected_event)) 926 return False 927 discovered_services_index = event['data']['ServicesIndex'] 928 else: 929 self.log.info("Failed to discover services.") 930 return False 931 services_count = self.cen_ad.droid.gattClientGetDiscoveredServicesCount( 932 discovered_services_index) 933 934 disc_service_index = None 935 for i in range(services_count): 936 disc_service_uuid = ( 937 self.cen_ad.droid.gattClientGetDiscoveredServiceUuid( 938 discovered_services_index, i).upper()) 939 if disc_service_uuid == service_uuid: 940 disc_service_index = i 941 break 942 943 self.cen_ad.droid.gattClientRequestConnectionPriority( 944 bluetooth_gatt, GattConnectionPriority.CONNECTION_PRIORITY_HIGH.value) 945 946 connected_device_list = self.per_ad.droid.gattServerGetConnectedDevices( 947 gatt_server) 948 if len(connected_device_list) == 0: 949 self.log.info("No devices connected from peripheral.") 950 return False 951 bt_device_id = 0 952 status = 0 953 offset = 0 954 test_value = [1,2,3,4,5,6,7] 955 test_value_return = [1,2,3] 956 for i in range(100): 957 self.cen_ad.droid.gattClientCharacteristicSetValue( 958 bluetooth_gatt, discovered_services_index, 959 disc_service_index, characteristic_uuid, test_value) 960 self.cen_ad.droid.gattClientWriteCharacteristic( 961 bluetooth_gatt, discovered_services_index, 962 disc_service_index, characteristic_uuid) 963 964 expected_event = GattCbStrings.CHAR_WRITE_REQ.value.format( 965 gatt_server_callback) 966 try: 967 event = self.per_ad.ed.pop_event(expected_event, 968 self.default_timeout) 969 except Empty: 970 self.log.error( 971 GattCbErr.CHAR_WRITE_REQ_ERR.value.format( 972 expected_event)) 973 return False 974 975 self.log.info("{} event found: {}".format( 976 GattCbStrings.CHAR_WRITE_REQ.value.format( 977 gatt_server_callback), event)) 978 request_id = event['data']['requestId'] 979 found_value = event['data']['value'] 980 if found_value != test_value: 981 self.log.info("Values didn't match. Found: {}, " 982 "Expected: {}".format(found_value, 983 test_value)) 984 return False 985 self.per_ad.droid.gattServerSendResponse( 986 gatt_server, bt_device_id, request_id, status, offset, 987 test_value_return) 988 expected_event = GattCbStrings.CHAR_WRITE.value.format( 989 gatt_callback) 990 try: 991 self.cen_ad.ed.pop_event(expected_event, 992 self.default_timeout) 993 except Empty: 994 self.log.error( 995 GattCbErr.CHAR_WRITE_ERR.value.format( 996 expected_event)) 997 return False 998 999 self._cleanup_services(gatt_server) 1000 return self._orchestrate_gatt_disconnection(bluetooth_gatt, 1001 gatt_callback) 1002 1003 @BluetoothBaseTest.bt_test_wrap 1004 def test_gatt_connect_mitm_attack(self): 1005 """Test GATT connection with permission write encrypted mitm. 1006 1007 Test establishing a gatt connection between a GATT server and GATT 1008 client while the GATT server's characteristic includes the property 1009 write value and the permission write encrypted mitm value. This will 1010 prompt LE pairing and then the devices will create a bond. 1011 1012 Steps: 1013 1. Create a GATT server and server callback on the peripheral device. 1014 2. Create a unique service and characteristic uuid on the peripheral. 1015 3. Create a characteristic on the peripheral with these properties: 1016 GattCharacteristic.PROPERTY_WRITE.value, 1017 GattCharacteristic.PERMISSION_WRITE_ENCRYPTED_MITM.value 1018 4. Create a GATT service on the peripheral. 1019 5. Add the characteristic to the GATT service. 1020 6. Create a GATT connection between your central and peripheral device. 1021 7. From the central device, discover the peripheral's services. 1022 8. Iterate the services found until you find the unique characteristic 1023 created in step 3. 1024 9. Once found, write a random but valid value to the characteristic. 1025 10. Start pairing helpers on both devices immediately after attempting 1026 to write to the characteristic. 1027 11. Within 10 seconds of writing the characteristic, there should be 1028 a prompt to bond the device from the peripheral. The helpers will 1029 handle the UI interaction automatically. (see 1030 BluetoothConnectionFacade.java bluetoothStartPairingHelper). 1031 12. Verify that the two devices are bonded. 1032 1033 Expected Result: 1034 Verify that a connection was established and the devices are bonded. 1035 1036 Returns: 1037 Pass if True 1038 Fail if False 1039 1040 TAGS: LE, Advertising, Filtering, Scanning, GATT, Characteristic, MITM 1041 Priority: 1 1042 """ 1043 gatt_server_callback, gatt_server = self._setup_multiple_services() 1044 if not gatt_server_callback or not gatt_server: 1045 return False 1046 bonded = False 1047 test_uuid = "aa7edd5a-4d1d-4f0e-883a-d145616a1630" 1048 bluetooth_gatt, gatt_callback, adv_callback = ( 1049 orchestrate_gatt_connection(self.cen_ad, self.per_ad)) 1050 self.adv_instances.append(adv_callback) 1051 if self.cen_ad.droid.gattClientDiscoverServices(bluetooth_gatt): 1052 expected_event = GattCbStrings.GATT_SERV_DISC.value.format( 1053 gatt_callback) 1054 try: 1055 event = self.cen_ad.ed.pop_event(expected_event, 1056 self.default_timeout) 1057 except Empty: 1058 self.log.error(GattCbErr.GATT_SERV_DISC_ERR.value.format( 1059 expected_event)) 1060 return False 1061 discovered_services_index = event['data']['ServicesIndex'] 1062 else: 1063 self.log.info("Failed to discover services.") 1064 return False 1065 test_value = [1,2,3,4,5,6,7] 1066 services_count = self.cen_ad.droid.gattClientGetDiscoveredServicesCount( 1067 discovered_services_index) 1068 for i in range(services_count): 1069 characteristic_uuids = ( 1070 self.cen_ad.droid.gattClientGetDiscoveredCharacteristicUuids( 1071 discovered_services_index, i)) 1072 for characteristic_uuid in characteristic_uuids: 1073 if characteristic_uuid == test_uuid: 1074 self.cen_ad.droid.bluetoothStartPairingHelper() 1075 self.per_ad.droid.bluetoothStartPairingHelper() 1076 self.cen_ad.droid.gattClientCharacteristicSetValue( 1077 bluetooth_gatt, discovered_services_index, i, 1078 characteristic_uuid, test_value) 1079 self.cen_ad.droid.gattClientWriteCharacteristic( 1080 bluetooth_gatt, discovered_services_index, i, 1081 characteristic_uuid) 1082 start_time = time.time() + self.default_timeout 1083 target_name = self.per_ad.droid.bluetoothGetLocalName() 1084 while time.time() < start_time and bonded == False: 1085 bonded_devices = self.cen_ad.droid.bluetoothGetBondedDevices( 1086 ) 1087 for device in bonded_devices: 1088 if 'name' in device.keys() and device[ 1089 'name'] == target_name: 1090 bonded = True 1091 break 1092 self._cleanup_services(gatt_server) 1093 return self._orchestrate_gatt_disconnection(bluetooth_gatt, 1094 gatt_callback) 1095 1096