Home | History | Annotate | Download | only in concurrency
#!/usr/bin/env python3.4
      2 #
      3 # Copyright (C) 2018 The Android Open Source Project
      4 #
      5 # Licensed under the Apache License, Version 2.0 (the "License"); you may not
      6 # use this file except in compliance with the License. You may obtain a copy of
      7 # the License at
      8 #
      9 # http://www.apache.org/licenses/LICENSE-2.0
     10 #
     11 # Unless required by applicable law or agreed to in writing, software
     12 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     13 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
     14 # License for the specific language governing permissions and limitations under
     15 # the License.
     16 """
     17 Test script for concurrent Gatt connections.
     18 Testbed assumes 6 Android devices. One will be the central and the rest
     19 peripherals.
     20 """
     21 
     22 from queue import Empty
     23 import concurrent.futures
     24 import threading
     25 import time
     26 from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
     27 from acts.test_utils.bt.bt_constants import ble_scan_settings_modes
     28 from acts.test_utils.bt.bt_constants import ble_advertise_settings_modes
     29 from acts.test_utils.bt.bt_constants import bt_profile_constants
     30 from acts.test_utils.bt.bt_constants import gatt_characteristic
     31 from acts.test_utils.bt.bt_constants import gatt_characteristic_value_format
     32 from acts.test_utils.bt.bt_constants import gatt_char_desc_uuids
     33 from acts.test_utils.bt.bt_constants import gatt_descriptor
     34 from acts.test_utils.bt.bt_constants import gatt_service_types
     35 from acts.test_utils.bt.bt_constants import scan_result
     36 from acts.test_utils.bt.bt_gatt_utils import run_continuous_write_descriptor
     37 from acts.test_utils.bt.bt_gatt_utils import setup_gatt_connection
     38 from acts.test_utils.bt.gatts_lib import GattServerLib
     39 from acts.test_decorators import test_tracker_info
     40 
     41 service_uuid = '0000a00a-0000-1000-8000-00805f9b34fb'
     42 characteristic_uuid = 'aa7edd5a-4d1d-4f0e-883a-d145616a1630'
     43 descriptor_uuid = "00000003-0000-1000-8000-00805f9b34fb"
     44 
     45 gatt_server_read_descriptor_sample = {
     46     'services': [{
     47         'uuid':
     48         service_uuid,
     49         'type':
     50         gatt_service_types['primary'],
     51         'characteristics': [{
     52             'uuid':
     53             characteristic_uuid,
     54             'properties':
     55             gatt_characteristic['property_write'],
     56             'permissions':
     57             gatt_characteristic['permission_write'],
     58             'instance_id':
     59             0x002a,
     60             'value_type':
     61             gatt_characteristic_value_format['string'],
     62             'value':
     63             'Test Database',
     64             'descriptors': [{
     65                 'uuid': descriptor_uuid,
     66                 'permissions': gatt_descriptor['permission_write'],
     67             }]
     68         }]
     69     }]
     70 }
     71 
     72 
     73 class ConcurrentGattConnectTest(BluetoothBaseTest):
     74     bt_default_timeout = 10
     75     max_connections = 5
     76     # List of tuples (android_device, advertise_callback)
     77     advertise_callbacks = []
     78     # List of tuples (android_device, advertisement_name)
     79     advertisement_names = []
     80     list_of_arguments_list = []
     81 
     82     def __init__(self, controllers):
     83         BluetoothBaseTest.__init__(self, controllers)
     84         self.pri_dut = self.android_devices[0]
     85 
     86     def setup_class(self):
     87         super(BluetoothBaseTest, self).setup_class()
     88 
     89         # Create 5 advertisements from different android devices
     90         for i in range(1, self.max_connections + 1):
     91             # Set device name
     92             ad = self.android_devices[i]
     93             name = "test_adv_{}".format(i)
     94             self.advertisement_names.append((ad, name))
     95             ad.droid.bluetoothSetLocalName(name)
     96 
     97             # Setup and start advertisements
     98             ad.droid.bleSetAdvertiseDataIncludeDeviceName(True)
     99             ad.droid.bleSetAdvertiseSettingsAdvertiseMode(
    100                 ble_advertise_settings_modes['low_latency'])
    101             advertise_data = ad.droid.bleBuildAdvertiseData()
    102             advertise_settings = ad.droid.bleBuildAdvertiseSettings()
    103             advertise_callback = ad.droid.bleGenBleAdvertiseCallback()
    104             ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data,
    105                                             advertise_settings)
    106             self.advertise_callbacks.append((ad, advertise_callback))
    107 
    108     def obtain_address_list_from_scan(self):
    109         """Returns the address list of all devices that match the scan filter.
    110 
    111         Returns:
    112           A list if all devices are found; None is any devices are not found.
    113         """
    114         # From central device, scan for all appropriate addresses by name.
    115         filter_list = self.pri_dut.droid.bleGenFilterList()
    116         self.pri_dut.droid.bleSetScanSettingsScanMode(
    117             ble_scan_settings_modes['low_latency'])
    118         scan_settings = self.pri_dut.droid.bleBuildScanSetting()
    119         scan_callback = self.pri_dut.droid.bleGenScanCallback()
    120         for android_device, name in self.advertisement_names:
    121             self.pri_dut.droid.bleSetScanFilterDeviceName(name)
    122             self.pri_dut.droid.bleBuildScanFilter(filter_list)
    123         self.pri_dut.droid.bleStartBleScan(filter_list, scan_settings,
    124                                            scan_callback)
    125         address_list = []
    126         devices_found = []
    127         # Set the scan time out to 20 sec to provide enough time to discover the
    128         # devices in busy environment
    129         scan_timeout = 20
    130         end_time = time.time() + scan_timeout
    131         while time.time() < end_time and len(address_list) < len(
    132                 self.advertisement_names):
    133             try:
    134                 event = self.pri_dut.ed.pop_event(
    135                     "BleScan{}onScanResults".format(scan_callback),
    136                     self.bt_default_timeout)
    137 
    138                 adv_name = event['data']['Result']['deviceInfo']['name']
    139                 mac_address = event['data']['Result']['deviceInfo']['address']
    140                 # Look up the android device handle based on event name
    141                 device = [
    142                     item for item in self.advertisement_names
    143                     if adv_name in item
    144                 ]
    145                 devices_found.append(device[0][0].serial)
    146                 if len(device) is not 0:
    147                     address_list_tuple = (device[0][0], mac_address)
    148                 else:
    149                     continue
    150                 result = [item for item in address_list if mac_address in item]
    151                 # if length of result is 0, it indicates that we have discovered
    152                 # new mac address.
    153                 if len(result) is 0:
    154                     self.log.info("Found new mac address: {}".format(
    155                         address_list_tuple[1]))
    156                     address_list.append(address_list_tuple)
    157             except Empty as err:
    158                 self.log.error("Failed to find any scan results.")
    159                 return None
    160         if len(address_list) < self.max_connections:
    161             self.log.info("Only found these devices: {}".format(devices_found))
    162             self.log.error("Could not find all necessary advertisements.")
    163             return None
    164         return address_list
    165 
    166     @BluetoothBaseTest.bt_test_wrap
    167     @test_tracker_info(uuid='6638282c-69b5-4237-9f0d-18e131424a9f')
    168     def test_concurrent_gatt_connections(self):
    169         """Test max concurrent GATT connections
    170 
    171         Connect to all peripherals.
    172 
    173         Steps:
    174         1. Scan
    175         2. Save addresses
    176         3. Connect all addresses of the peripherals
    177 
    178         Expected Result:
    179         All connections successful.
    180 
    181         Returns:
    182           Pass if True
    183           Fail if False
    184 
    185         TAGS: Bluetooth, GATT
    186         Priority: 2
    187         """
    188 
    189         address_list = self.obtain_address_list_from_scan()
    190         if address_list is None:
    191             return False
    192 
    193         # Connect to all addresses
    194         for address_tuple in address_list:
    195             address = address_tuple[1]
    196             try:
    197                 autoconnect = False
    198                 bluetooth_gatt, gatt_callback = setup_gatt_connection(
    199                     self.pri_dut, address, autoconnect)
    200                 self.log.info("Successfully connected to {}".format(address))
    201             except Exception as err:
    202                 self.log.error(
    203                     "Failed to establish connection to {}".format(address))
    204                 return False
    205         if (len(
    206                 self.pri_dut.droid.bluetoothGetConnectedLeDevices(
    207                     bt_profile_constants['gatt_server'])) !=
    208                 self.max_connections):
    209             self.log.error("Did not reach max connection count.")
    210             return False
    211 
    212         return True
    213 
    214     @BluetoothBaseTest.bt_test_wrap
    215     @test_tracker_info(uuid='660bf05e-a8e5-45f3-b42b-b66b4ac0d85f')
    216     def test_data_transfer_to_concurrent_gatt_connections(self):
    217         """Test writing GATT descriptors concurrently to many peripherals.
    218 
    219         Connect to all peripherals and write gatt descriptors concurrently.
    220 
    221 
    222         Steps:
    223         1. Scan the addresses by names
    224         2. Save mac addresses of the peripherals
    225         3. Connect all addresses of the peripherals and write gatt descriptors
    226 
    227 
    228         Expected Result:
    229         All connections and data transfers are successful.
    230 
    231         Returns:
    232           Pass if True
    233           Fail if False
    234 
    235         TAGS: Bluetooth, GATT
    236         Priority: 2
    237         """
    238 
    239         address_list = self.obtain_address_list_from_scan()
    240         if address_list is None:
    241             return False
    242 
    243         # Connect to all addresses
    244         executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
    245 
    246         for address_tuple in address_list:
    247             ad, address = address_tuple
    248 
    249             gatts = GattServerLib(log=self.log, dut=ad)
    250             gatt_server, gatt_server_callback = gatts.setup_gatts_db(
    251                 database=gatt_server_read_descriptor_sample)
    252 
    253             try:
    254                 bluetooth_gatt, gatt_callback = setup_gatt_connection(
    255                     self.pri_dut, address, autoconnect=False)
    256                 self.log.info("Successfully connected to {}".format(address))
    257 
    258             except Exception as err:
    259                 self.log.error(
    260                     "Failed to establish connection to {}".format(address))
    261                 return False
    262 
    263             if self.pri_dut.droid.gattClientDiscoverServices(bluetooth_gatt):
    264                 event = self.pri_dut.ed.pop_event(
    265                     "GattConnect{}onServicesDiscovered".format(bluetooth_gatt),
    266                     self.bt_default_timeout)
    267                 discovered_services_index = event['data']['ServicesIndex']
    268             else:
    269                 self.log.info("Failed to discover services.")
    270                 return False
    271             services_count = self.pri_dut.droid.gattClientGetDiscoveredServicesCount(
    272                 discovered_services_index)
    273 
    274             arguments_list = [
    275                 self.pri_dut.droid, self.pri_dut.ed, ad.droid, ad.ed,
    276                 gatt_server, gatt_server_callback, bluetooth_gatt,
    277                 services_count, discovered_services_index, 100
    278             ]
    279             self.list_of_arguments_list.append(arguments_list)
    280 
    281         for arguments_list in self.list_of_arguments_list:
    282             executor.submit(run_continuous_write_descriptor, *arguments_list)
    283 
    284         executor.shutdown(wait=True)
    285 
    286         if (len(
    287                 self.pri_dut.droid.bluetoothGetConnectedLeDevices(
    288                     bt_profile_constants['gatt_server'])) !=
    289                 self.max_connections):
    290             self.log.error("Failed to write concurrently.")
    291             return False
    292 
    293         return True
    294