Home | History | Annotate | Download | only in bluetooth
      1 #!/usr/bin/env python
      2 
      3 # Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
      4 # Use of this source code is governed by a BSD-style license that can be
      5 # found in the LICENSE file.
      6 
      7 import base64
      8 import json
      9 import logging
     10 import logging.handlers
     11 
     12 import common
     13 from autotest_lib.client.common_lib.cros.bluetooth import bluetooth_sdp_socket
     14 from autotest_lib.client.common_lib.cros.bluetooth import bluetooth_socket
     15 from autotest_lib.client.cros import constants
     16 from autotest_lib.client.cros import xmlrpc_server
     17 
     18 
     19 class BluetoothTesterXmlRpcDelegate(xmlrpc_server.XmlRpcDelegate):
     20     """Exposes Tester methods called remotely during Bluetooth autotests.
     21 
     22     All instance methods of this object without a preceding '_' are exposed via
     23     an XML-RPC server. This is not a stateless handler object, which means that
     24     if you store state inside the delegate, that state will remain around for
     25     future calls.
     26     """
     27 
     28     BR_EDR_LE_PROFILE = (
     29             bluetooth_socket.MGMT_SETTING_POWERED |
     30             bluetooth_socket.MGMT_SETTING_CONNECTABLE |
     31             bluetooth_socket.MGMT_SETTING_PAIRABLE |
     32             bluetooth_socket.MGMT_SETTING_SSP |
     33             bluetooth_socket.MGMT_SETTING_BREDR |
     34             bluetooth_socket.MGMT_SETTING_LE)
     35 
     36     LE_PROFILE = (
     37             bluetooth_socket.MGMT_SETTING_POWERED |
     38             bluetooth_socket.MGMT_SETTING_CONNECTABLE |
     39             bluetooth_socket.MGMT_SETTING_PAIRABLE |
     40             bluetooth_socket.MGMT_SETTING_LE)
     41 
     42     PROFILE_SETTINGS = {
     43         'computer': BR_EDR_LE_PROFILE,
     44         'peripheral': LE_PROFILE
     45     }
     46 
     47     PROFILE_CLASS = {
     48         'computer': 0x000104,
     49         'peripheral': None
     50     }
     51 
     52     PROFILE_NAMES = {
     53         'computer': ('ChromeOS Bluetooth Tester', 'Tester'),
     54         'peripheral': ('ChromeOS Bluetooth Tester', 'Tester')
     55     }
     56 
     57 
     58     def __init__(self):
     59         super(BluetoothTesterXmlRpcDelegate, self).__init__()
     60 
     61         # Open the Bluetooth Control socket to the kernel which provides us
     62         # the needed raw management access to the Bluetooth Host Subsystem.
     63         self._control = bluetooth_socket.BluetoothControlSocket()
     64         # Open the Bluetooth SDP socket to the kernel which provides us the
     65         # needed interface to use SDP commands.
     66         self._sdp = bluetooth_sdp_socket.BluetoothSDPSocket()
     67         # This is almost a constant, but it might not be forever.
     68         self.index = 0
     69 
     70 
     71     def setup(self, profile):
     72         """Set up the tester with the given profile.
     73 
     74         @param profile: Profile to use for this test, valid values are:
     75                 computer - a standard computer profile
     76 
     77         @return True on success, False otherwise.
     78 
     79         """
     80         profile_settings = self.PROFILE_SETTINGS[profile]
     81         profile_class = self.PROFILE_CLASS[profile]
     82         (profile_name, profile_short_name) = self.PROFILE_NAMES[profile]
     83 
     84         # Make sure the controller actually exists.
     85         if self.index not in self._control.read_index_list():
     86             logging.warning('Bluetooth Controller missing on tester')
     87             return False
     88 
     89         # Make sure all of the settings are supported by the controller.
     90         ( address, bluetooth_version, manufacturer_id,
     91           supported_settings, current_settings, class_of_device,
     92           name, short_name ) = self._control.read_info(self.index)
     93         if profile_settings & supported_settings != profile_settings:
     94             logging.warning('Controller does not support requested settings')
     95             logging.debug('Supported: %b; Requested: %b', supported_settings,
     96                           profile_settings)
     97             return False
     98 
     99         # Before beginning, force the adapter power off, even if it's already
    100         # off; this is enough to persuade an AP-mode Intel chip to accept
    101         # settings.
    102         if not self._control.set_powered(self.index, False):
    103             logging.warning('Failed to power off adapter to accept settings')
    104             return False
    105 
    106         # Set the controller up as either BR/EDR only, LE only or Dual Mode.
    107         # This is a bit tricky because it rejects commands outright unless
    108         # it's in dual mode, so we actually have to figure out what changes
    109         # we have to make, and we have to turn things on before we turn them
    110         # off.
    111         turn_on = (current_settings ^ profile_settings) & profile_settings
    112         if turn_on & bluetooth_socket.MGMT_SETTING_BREDR:
    113             if self._control.set_bredr(self.index, True) is None:
    114                 logging.warning('Failed to enable BR/EDR')
    115                 return False
    116         if turn_on & bluetooth_socket.MGMT_SETTING_LE:
    117             if self._control.set_le(self.index, True) is None:
    118                 logging.warning('Failed to enable LE')
    119                 return False
    120 
    121         turn_off = (current_settings ^ profile_settings) & current_settings
    122         if turn_off & bluetooth_socket.MGMT_SETTING_BREDR:
    123             if self._control.set_bredr(self.index, False) is None:
    124                 logging.warning('Failed to disable BR/EDR')
    125                 return False
    126         if turn_off & bluetooth_socket.MGMT_SETTING_LE:
    127             if self._control.set_le(self.index, False) is None:
    128                 logging.warning('Failed to disable LE')
    129                 return False
    130 
    131         # Adjust settings that are BR/EDR specific that we need to set before
    132         # powering on the adapter, and would be rejected otherwise.
    133         if profile_settings & bluetooth_socket.MGMT_SETTING_BREDR:
    134             if (self._control.set_link_security(
    135                     self.index,
    136                     (profile_settings &
    137                             bluetooth_socket.MGMT_SETTING_LINK_SECURITY))
    138                         is None):
    139                 logging.warning('Failed to set link security setting')
    140                 return False
    141             if (self._control.set_ssp(
    142                     self.index,
    143                     profile_settings & bluetooth_socket.MGMT_SETTING_SSP)
    144                         is None):
    145                 logging.warning('Failed to set SSP setting')
    146                 return False
    147             if (self._control.set_hs(
    148                     self.index,
    149                     profile_settings & bluetooth_socket.MGMT_SETTING_HS)
    150                         is None):
    151                 logging.warning('Failed to set High Speed setting')
    152                 return False
    153 
    154             # Split our the major and minor class; it's listed as a kernel bug
    155             # that we supply these to the kernel without shifting the bits over
    156             # to take out the CoD format field, so this might have to change
    157             # one day.
    158             major_class = (profile_class & 0x00ff00) >> 8
    159             minor_class = profile_class & 0x0000ff
    160             if (self._control.set_device_class(
    161                     self.index, major_class, minor_class)
    162                         is None):
    163                 logging.warning('Failed to set device class')
    164                 return False
    165 
    166         # Setup generic settings that apply to either BR/EDR, LE or dual-mode
    167         # that still require the power to be off.
    168         if (self._control.set_connectable(
    169                 self.index,
    170                 profile_settings & bluetooth_socket.MGMT_SETTING_CONNECTABLE)
    171                     is None):
    172             logging.warning('Failed to set connectable setting')
    173             return False
    174         if (self._control.set_pairable(
    175                 self.index,
    176                 profile_settings & bluetooth_socket.MGMT_SETTING_PAIRABLE)
    177                     is None):
    178             logging.warning('Failed to set pairable setting')
    179             return False
    180 
    181         if (self._control.set_local_name(
    182                     self.index, profile_name, profile_short_name)
    183                     is None):
    184             logging.warning('Failed to set local name')
    185             return False
    186 
    187         # Now the settings have been set, power up the adapter.
    188         if not self._control.set_powered(
    189                 self.index,
    190                 profile_settings & bluetooth_socket.MGMT_SETTING_POWERED):
    191             logging.warning('Failed to set powered setting')
    192             return False
    193 
    194         # Fast connectable can only be set once the controller is powered,
    195         # and only when BR/EDR is enabled.
    196         if profile_settings & bluetooth_socket.MGMT_SETTING_BREDR:
    197             # Wait for the device class set event, this happens after the
    198             # power up "command complete" event when we've pre-set the class
    199             # even though it's a side-effect of doing that.
    200             self._control.wait_for_events(
    201                     self.index,
    202                     ( bluetooth_socket.MGMT_EV_CLASS_OF_DEV_CHANGED, ))
    203 
    204             if (self._control.set_fast_connectable(
    205                     self.index,
    206                     profile_settings &
    207                     bluetooth_socket.MGMT_SETTING_FAST_CONNECTABLE)
    208                         is None):
    209                 logging.warning('Failed to set fast connectable setting')
    210                 return False
    211 
    212         # Fetch the settings again and make sure they're all set correctly,
    213         # including the BR/EDR flag.
    214         ( address, bluetooth_version, manufacturer_id,
    215           supported_settings, current_settings, class_of_device,
    216           name, short_name ) = self._control.read_info(self.index)
    217 
    218         # Check generic settings.
    219         if profile_settings != current_settings:
    220             logging.warning('Controller settings did not match those set: '
    221                             '%x != %x', current_settings, profile_settings)
    222             return False
    223         if name != profile_name:
    224             logging.warning('Local name did not match that set: "%s" != "%s"',
    225                             name, profile_name)
    226             return False
    227         elif short_name != profile_short_name:
    228             logging.warning('Short name did not match that set: "%s" != "%s"',
    229                             short_name, profile_short_name)
    230             return False
    231 
    232         # Check BR/EDR specific settings.
    233         if profile_settings & bluetooth_socket.MGMT_SETTING_BREDR:
    234             if class_of_device != profile_class:
    235                 if class_of_device & 0x00ffff == profile_class & 0x00ffff:
    236                     logging.warning('Class of device matched that set, but '
    237                                     'Service Class field did not: %x != %x '
    238                                     'Reboot Tester? ',
    239                                     class_of_device, profile_class)
    240                 else:
    241                     logging.warning('Class of device did not match that set: '
    242                                     '%x != %x', class_of_device, profile_class)
    243                 return False
    244 
    245         return True
    246 
    247 
    248     def set_discoverable(self, discoverable, timeout=0):
    249         """Set the discoverable state of the controller.
    250 
    251         @param discoverable: Whether controller should be discoverable.
    252         @param timeout: Timeout in seconds before disabling discovery again,
    253                 ignored when discoverable is False, must not be zero when
    254                 discoverable is True.
    255 
    256         @return True on success, False otherwise.
    257 
    258         """
    259         settings = self._control.set_discoverable(self.index,
    260                                                   discoverable, timeout)
    261         return settings & bluetooth_socket.MGMT_SETTING_DISCOVERABLE
    262 
    263 
    264     def read_info(self):
    265         """Read the adapter information from the Kernel.
    266 
    267         @return the information as a JSON-encoded tuple of:
    268           ( address, bluetooth_version, manufacturer_id,
    269             supported_settings, current_settings, class_of_device,
    270             name, short_name )
    271 
    272         """
    273         return json.dumps(self._control.read_info(self.index))
    274 
    275 
    276     def set_advertising(self, advertising):
    277         """Set the whether the controller is advertising via LE.
    278 
    279         @param advertising: Whether controller should advertise via LE.
    280 
    281         @return True on success, False otherwise.
    282 
    283         """
    284         settings = self._control.set_advertising(self.index, advertising)
    285         return settings & bluetooth_socket.MGMT_SETTING_ADVERTISING
    286 
    287 
    288     def discover_devices(self, br_edr=True, le_public=True, le_random=True):
    289         """Discover remote devices.
    290 
    291         Activates device discovery and collects the set of devices found,
    292         returning them as a list.
    293 
    294         @param br_edr: Whether to detect BR/EDR devices.
    295         @param le_public: Whether to detect LE Public Address devices.
    296         @param le_random: Whether to detect LE Random Address devices.
    297 
    298         @return List of devices found as JSON-encoded tuples with the format
    299                 (address, address_type, rssi, flags, base64-encoded eirdata),
    300                 or False if discovery could not be started.
    301 
    302         """
    303         address_type = 0
    304         if br_edr:
    305             address_type |= 0x1
    306         if le_public:
    307             address_type |= 0x2
    308         if le_random:
    309             address_type |= 0x4
    310 
    311         set_type = self._control.start_discovery(self.index, address_type)
    312         if set_type != address_type:
    313             logging.warning('Discovery address type did not match that set: '
    314                             '%x != %x', set_type, address_type)
    315             return False
    316 
    317         devices = self._control.get_discovered_devices(self.index)
    318         return json.dumps([
    319                 (address, address_type, rssi, flags,
    320                  base64.encodestring(eirdata))
    321                 for address, address_type, rssi, flags, eirdata in devices
    322         ])
    323 
    324 
    325     def connect(self, address):
    326         """Connect to device with the given address
    327 
    328         @param address: Bluetooth address.
    329 
    330         """
    331         self._sdp.connect(address)
    332         return True
    333 
    334 
    335     def service_search_request(self, uuids, max_rec_cnt, preferred_size=32,
    336                                forced_pdu_size=None, invalid_request=False):
    337         """Send a Service Search Request
    338 
    339         @param uuids: List of UUIDs (as integers) to look for.
    340         @param max_rec_cnt: Maximum count of returned service records.
    341         @param preferred_size: Preffered size of UUIDs in bits (16, 32, or 128).
    342         @param forced_pdu_size: Use certain PDU size parameter instead of
    343                calculating actual length of sequence.
    344         @param invalid_request: Whether to send request with intentionally
    345                invalid syntax for testing purposes (bool flag).
    346 
    347         @return list of found services' service record handles or Error Code
    348 
    349         """
    350         return json.dumps(
    351                 self._sdp.service_search_request(
    352                  uuids, max_rec_cnt, preferred_size, forced_pdu_size,
    353                  invalid_request)
    354         )
    355 
    356 
    357     def service_attribute_request(self, handle, max_attr_byte_count, attr_ids,
    358                                   forced_pdu_size=None, invalid_request=None):
    359         """Send a Service Attribute Request
    360 
    361         @param handle: service record from which attribute values are to be
    362                retrieved.
    363         @param max_attr_byte_count: maximum number of bytes of attribute data to
    364                be returned in the response to this request.
    365         @param attr_ids: a list, where each element is either an attribute ID
    366                or a range of attribute IDs.
    367         @param forced_pdu_size: Use certain PDU size parameter instead of
    368                calculating actual length of sequence.
    369         @param invalid_request: Whether to send request with intentionally
    370                invalid syntax for testing purposes (string with raw request).
    371 
    372         @return list of found attributes IDs and their values or Error Code
    373 
    374         """
    375         return json.dumps(
    376                 self._sdp.service_attribute_request(
    377                  handle, max_attr_byte_count, attr_ids, forced_pdu_size,
    378                  invalid_request)
    379         )
    380 
    381 
    382     def service_search_attribute_request(self, uuids, max_attr_byte_count,
    383                                          attr_ids, preferred_size=32,
    384                                          forced_pdu_size=None,
    385                                          invalid_request=None):
    386         """Send a Service Search Attribute Request
    387 
    388         @param uuids: list of UUIDs (as integers) to look for.
    389         @param max_attr_byte_count: maximum number of bytes of attribute data to
    390                be returned in the response to this request.
    391         @param attr_ids: a list, where each element is either an attribute ID
    392                or a range of attribute IDs.
    393         @param preferred_size: Preffered size of UUIDs in bits (16, 32, or 128).
    394         @param forced_pdu_size: Use certain PDU size parameter instead of
    395                calculating actual length of sequence.
    396         @param invalid_request: Whether to send request with intentionally
    397                invalid syntax for testing purposes (string to be prepended
    398                to correct request).
    399 
    400         @return list of found attributes IDs and their values or Error Code
    401 
    402         """
    403         return json.dumps(
    404                 self._sdp.service_search_attribute_request(
    405                  uuids, max_attr_byte_count, attr_ids, preferred_size,
    406                  forced_pdu_size, invalid_request)
    407         )
    408 
    409 
    410 if __name__ == '__main__':
    411     logging.basicConfig(level=logging.DEBUG)
    412     handler = logging.handlers.SysLogHandler(address = '/dev/log')
    413     formatter = logging.Formatter(
    414             'bluetooth_tester_xmlrpc_server: [%(levelname)s] %(message)s')
    415     handler.setFormatter(formatter)
    416     logging.getLogger().addHandler(handler)
    417     logging.debug('bluetooth_tester_xmlrpc_server main...')
    418     server = xmlrpc_server.XmlRpcServer(
    419             'localhost',
    420             constants.BLUETOOTH_TESTER_XMLRPC_SERVER_PORT)
    421     server.register_delegate(BluetoothTesterXmlRpcDelegate())
    422     server.run()
    423