Home | History | Annotate | Download | only in scripts
      1 #
      2 # Copyright 2015 The Android Open Source Project
      3 #
      4 # Licensed under the Apache License, Version 2.0 (the "License");
      5 # you may not use this file except in compliance with the License.
      6 # You may obtain a copy of the License at
      7 #
      8 # http://www.apache.org/licenses/LICENSE-2.0
      9 #
     10 # Unless required by applicable law or agreed to in writing, software
     11 # distributed under the License is distributed on an "AS IS" BASIS,
     12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13 # See the License for the specific language governing permissions and
     14 # limitations under the License.
     15 #
     16 
     17 """Script for sending testing parameters and commands to a Bluetooth device.
     18 
     19 This script provides a simple shell interface for sending data at run-time to a
     20 Bluetooth device. It is intended to be used in tandem with the test vendor
     21 library project.
     22 
     23 Usage:
     24   Option A: Script
     25     1. Run build_and_run.sh in scripts/ with the --test-channel flag set and the
     26     port to use for the test channel.
     27   Option B: Manual
     28     1. Choose a port to use for the test channel. Use 'adb forward tcp:<port>
     29     tcp:<port>' to forward the port to the device.
     30     2. In a separate shell, build and push the test vendor library to the device
     31     using the script mentioned in option A (i.e. without the --test-channel flag
     32     set).
     33     3. Once logcat has started, turn Bluetooth on from the device.
     34     4. Run this program, in the shell from step 1,  the port, also from step 1,
     35     as arguments.
     36 """
     37 
     38 #!/usr/bin/env python
     39 
     40 import cmd
     41 import random
     42 import socket
     43 import string
     44 import struct
     45 import sys
     46 
     47 DEVICE_NAME_LENGTH = 6
     48 DEVICE_ADDRESS_LENGTH = 6
     49 
     50 # Used to generate fake device names and addresses during discovery.
     51 def generate_random_name():
     52   return ''.join(random.SystemRandom().choice(string.ascii_uppercase + \
     53     string.digits) for _ in range(DEVICE_NAME_LENGTH))
     54 
     55 def generate_random_address():
     56   return ''.join(random.SystemRandom().choice(string.digits) for _ in \
     57     range(DEVICE_ADDRESS_LENGTH))
     58 
     59 class Connection(object):
     60   """Simple wrapper class for a socket object.
     61 
     62   Attributes:
     63     socket: The underlying socket created for the specified address and port.
     64   """
     65 
     66   def __init__(self, port):
     67     self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     68     self._socket.connect(('localhost', port))
     69 
     70   def close(self):
     71     self._socket.close()
     72 
     73   def send(self, data):
     74     self._socket.sendall(data)
     75 
     76 class TestChannel(object):
     77   """Checks outgoing commands and sends them once verified.
     78 
     79   Attributes:
     80     connection: The connection to the test vendor library that commands are sent
     81     on.
     82   """
     83 
     84   def __init__(self, port):
     85     self._connection = Connection(port)
     86     self._discovered_devices = DeviceManager()
     87 
     88   def discover_new_device(self, name=None, address=None):
     89     device = Device(name, address)
     90     self._discovered_devices.add_device(device)
     91     return device
     92 
     93   def close(self):
     94     self._connection.close()
     95 
     96   def send_command(self, name, args):
     97     name_size = len(name)
     98     args_size = len(args)
     99     self.lint_command(name, args, name_size, args_size)
    100     encoded_name = chr(name_size) + name
    101     encoded_args = chr(args_size) + ''.join(chr(len(arg)) + arg for arg in args)
    102     command = encoded_name + encoded_args
    103     self._connection.send(command)
    104 
    105   def lint_command(self, name, args, name_size, args_size):
    106     assert name_size == len(name) and args_size == len(args)
    107     try:
    108       name.encode('utf-8')
    109       for arg in args:
    110         arg.encode('utf-8')
    111     except UnicodeError:
    112       print 'Unrecognized characters.'
    113       raise
    114     if name_size > 255 or args_size > 255:
    115       raise ValueError  # Size must be encodable in one octet.
    116     for arg in args:
    117       if len(arg) > 255:
    118         raise ValueError  # Size must be encodable in one octet.
    119 
    120 class DeviceManager(object):
    121   """Maintains the active fake devices that have been "discovered".
    122 
    123   Attributes:
    124     device_list: Maps device addresses (keys) to devices (values).
    125   """
    126 
    127   def __init__(self):
    128     self.device_list = {}
    129 
    130   def add_device(self, device):
    131     self.device_list[device.get_address()] = device
    132 
    133 class Device(object):
    134   """A fake device to be returned in inquiry and scan results. Note that if an
    135   explicit name or address is not provided, a random string of characters
    136   is used.
    137 
    138   Attributes:
    139     name: The device name for use in extended results.
    140     address: The BD address of the device.
    141   """
    142 
    143   def __init__(self, name=None, address=None):
    144     # TODO(dennischeng): Generate device properties more robustly.
    145     self._name = generate_random_name() if name is None else name
    146     self._address = generate_random_address() if address is None else address
    147 
    148   def get_name(self):
    149     return self._name
    150 
    151   def get_address(self):
    152     return self._address
    153 
    154 class TestChannelShell(cmd.Cmd):
    155   """Shell for sending test channel data to controller.
    156 
    157   Manages the test channel to the controller and defines a set of commands the
    158   user can send to the controller as well. These commands are processed parallel
    159   to commands sent from the device stack and used to provide additional
    160   debugging/testing capabilities.
    161 
    162   Attributes:
    163     test_channel: The communication channel to send data to the controller.
    164   """
    165 
    166   def __init__(self, test_channel):
    167     print 'Type \'help\' for more information.'
    168     cmd.Cmd.__init__(self)
    169     self._test_channel = test_channel
    170 
    171   def do_clear(self, args):
    172     """
    173     Arguments: None.
    174     Resets the controller to its original, unmodified state.
    175     """
    176     self._test_channel.send_command('CLEAR', [])
    177 
    178   def do_clear_event_delay(self, args):
    179     """
    180     Arguments: None.
    181     Clears the response delay set by set_event_delay.
    182     """
    183     self._test_channel.send_command('CLEAR_EVENT_DELAY', args.split())
    184 
    185   def do_discover(self, args):
    186     """
    187     Arguments: name_1 name_2 ...
    188     Sends an inquiry result for named device(s). If no names are provided, a
    189     random name is used instead.
    190     """
    191     if len(args) == 0:
    192       args = generate_random_name()
    193     device_list = [self._test_channel.discover_new_device(arg) for arg in \
    194                    args.split()]
    195     device_names_and_addresses = []
    196     for device in device_list:
    197       device_names_and_addresses.append(device.get_name())
    198       device_names_and_addresses.append(device.get_address())
    199     self._test_channel.send_command('DISCOVER', device_names_and_addresses)
    200 
    201   def do_set_event_delay(self, args):
    202     """
    203     Arguments: interval_in_ms
    204     Sets the response delay for all event packets sent from the controller back
    205     to the HCI.
    206     """
    207     self._test_channel.send_command('SET_EVENT_DELAY', args.split())
    208 
    209   def do_timeout_all(self, args):
    210     """
    211     Arguments: None.
    212     Causes all HCI commands to timeout.
    213     """
    214     self._test_channel.send_command('TIMEOUT_ALL', [])
    215 
    216   def do_quit(self, args):
    217     """
    218     Arguments: None.
    219     Exits the test channel.
    220     """
    221     self._test_channel.send_command('CLOSE_TEST_CHANNEL', [])
    222     self._test_channel.close()
    223     print 'Goodbye.'
    224     return True
    225 
    226 def main(argv):
    227   if len(argv) != 2:
    228     print 'Usage: python test_channel.py [port]'
    229     return
    230   try:
    231     port = int(argv[1])
    232   except ValueError:
    233     print 'Error parsing port.'
    234   else:
    235     try:
    236       test_channel = TestChannel(port)
    237     except socket.error, e:
    238       print 'Error connecting to socket: %s' % e
    239     except:
    240       print 'Error creating test channel (check argument).'
    241     else:
    242       test_channel_shell = TestChannelShell(test_channel)
    243       test_channel_shell.prompt = '$ '
    244       test_channel_shell.cmdloop()
    245 
    246 if __name__ == '__main__':
    247   main(sys.argv)
    248