Home | History | Annotate | Download | only in scripts
      1 #
      2 # Copyright 2015 The Android Open Source Project
      3 #
      4 # Licensed under the Apache License, Version 2.0 (the "License");
      5 # you may not use this file except in compliance with the License.
      6 # You may obtain a copy of the License at
      7 #
      8 # http://www.apache.org/licenses/LICENSE-2.0
      9 #
     10 # Unless required by applicable law or agreed to in writing, software
     11 # distributed under the License is distributed on an "AS IS" BASIS,
     12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13 # See the License for the specific language governing permissions and
     14 # limitations under the License.
     15 #
     16 
     17 """Script for sending testing parameters and commands to a Bluetooth device.
     18 
     19 This script provides a simple shell interface for sending data at run-time to a
     20 Bluetooth device. It is intended to be used in tandem with the test vendor
     21 library project.
     22 
     23 Usage:
     24   Option A: Script
     25     1. Run build_and_run.sh in scripts/ with the --test-channel flag set and the
     26     port to use for the test channel.
     27   Option B: Manual
     28     1. Choose a port to use for the test channel. Use 'adb forward tcp:<port>
     29     tcp:<port>' to forward the port to the device.
     30     2. In a separate shell, build and push the test vendor library to the device
     31     using the script mentioned in option A (i.e. without the --test-channel flag
     32     set).
     33     3. Once logcat has started, turn Bluetooth on from the device.
     34     4. Run this program, in the shell from step 1,  the port, also from step 1,
     35     as arguments.
     36 """
     37 
     38 #!/usr/bin/env python
     39 
     40 import cmd
     41 import random
     42 import socket
     43 import string
     44 import struct
     45 import sys
     46 
     47 DEVICE_NAME_LENGTH = 6
     48 DEVICE_ADDRESS_LENGTH = 6
     49 
     50 # Used to generate fake device names and addresses during discovery.
     51 def generate_random_name():
     52   return ''.join(random.SystemRandom().choice(string.ascii_uppercase + \
     53     string.digits) for _ in range(DEVICE_NAME_LENGTH))
     54 
     55 def generate_random_address():
     56   return ''.join(random.SystemRandom().choice(string.digits) for _ in \
     57     range(DEVICE_ADDRESS_LENGTH))
     58 
     59 class Connection(object):
     60   """Simple wrapper class for a socket object.
     61 
     62   Attributes:
     63     socket: The underlying socket created for the specified address and port.
     64   """
     65 
     66   def __init__(self, port):
     67     self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     68     self._socket.connect(('localhost', port))
     69 
     70   def close(self):
     71     self._socket.close()
     72 
     73   def send(self, data):
     74     self._socket.sendall(data)
     75 
     76 class TestChannel(object):
     77   """Checks outgoing commands and sends them once verified.
     78 
     79   Attributes:
     80     connection: The connection to the test vendor library that commands are sent
     81     on.
     82   """
     83 
     84   def __init__(self, port):
     85     self._connection = Connection(port)
     86     self._discovered_devices = DeviceManager()
     87 
     88   def discover_new_device(self, name=None, address=None):
     89     device = Device(name, address)
     90     self._discovered_devices.add_device(device)
     91     return device
     92 
     93   def close(self):
     94     self._connection.close()
     95 
     96   def send_command(self, name, args):
     97     name_size = len(name)
     98     args_size = len(args)
     99     self.lint_command(name, args, name_size, args_size)
    100     encoded_name = chr(name_size) + name
    101     encoded_args = chr(args_size) + ''.join(chr(len(arg)) + arg for arg in args)
    102     command = encoded_name + encoded_args
    103     self._connection.send(command)
    104 
    105   def lint_command(self, name, args, name_size, args_size):
    106     assert name_size == len(name) and args_size == len(args)
    107     try:
    108       name.encode('utf-8')
    109       for arg in args:
    110         arg.encode('utf-8')
    111     except UnicodeError:
    112       print 'Unrecognized characters.'
    113       raise
    114     if name_size > 255 or args_size > 255:
    115       raise ValueError  # Size must be encodable in one octet.
    116     for arg in args:
    117       if len(arg) > 255:
    118         raise ValueError  # Size must be encodable in one octet.
    119 
    120 class DeviceManager(object):
    121   """Maintains the active fake devices that have been "discovered".
    122 
    123   Attributes:
    124     device_list: Maps device addresses (keys) to devices (values).
    125   """
    126 
    127   def __init__(self):
    128     self.device_list = {}
    129 
    130   def add_device(self, device):
    131     self.device_list[device.get_address()] = device
    132 
    133 class Device(object):
    134   """A fake device to be returned in inquiry and scan results. Note that if an
    135   explicit name or address is not provided, a random string of characters
    136   is used.
    137 
    138   Attributes:
    139     name: The device name for use in extended results.
    140     address: The BD address of the device.
    141   """
    142 
    143   def __init__(self, name=None, address=None):
    144     # TODO(dennischeng): Generate device properties more robustly.
    145     self._name = generate_random_name() if name is None else name
    146     self._address = generate_random_address() if address is None else address
    147 
    148   def get_name(self):
    149     return self._name
    150 
    151   def get_address(self):
    152     return self._address
    153 
    154 class TestChannelShell(cmd.Cmd):
    155   """Shell for sending test channel data to controller.
    156 
    157   Manages the test channel to the controller and defines a set of commands the
    158   user can send to the controller as well. These commands are processed parallel
    159   to commands sent from the device stack and used to provide additional
    160   debugging/testing capabilities.
    161 
    162   Attributes:
    163     test_channel: The communication channel to send data to the controller.
    164   """
    165 
    166   def __init__(self, test_channel):
    167     print 'Type \'help\' for more information.'
    168     cmd.Cmd.__init__(self)
    169     self._test_channel = test_channel
    170 
    171   def do_add(self, args):
    172     """
    173     Arguments: dev_type_str
    174     Add a new device of type dev_type_str.
    175     """
    176     self._test_channel.send_command('add', args.split())
    177 
    178   def do_del(self, args):
    179     """
    180     Arguments: device index
    181     Delete the device with the specified index.
    182     """
    183     self._test_channel.send_command('del', args.split())
    184 
    185   def do_get(self, args):
    186     """
    187     Arguments: dev_num attr_str
    188     Get the value of the attribute attr_str from device dev_num.
    189     """
    190     self._test_channel.send_command('get', args.split())
    191 
    192   def do_set(self, args):
    193     """
    194     Arguments: dev_num attr_str val
    195     Set the value of the attribute attr_str from device dev_num equal to val.
    196     """
    197     self._test_channel.send_command('set', args.split())
    198 
    199   def do_list(self, args):
    200     """
    201     Arguments: [dev_num [attr]]
    202     List the devices from the controller, optionally filtered by device and attr.
    203     """
    204     self._test_channel.send_command('list', args.split())
    205 
    206   def do_quit(self, args):
    207     """
    208     Arguments: None.
    209     Exits the test channel.
    210     """
    211     self._test_channel.send_command('CLOSE_TEST_CHANNEL', [])
    212     self._test_channel.close()
    213     print 'Goodbye.'
    214     return True
    215 
    216   def do_help(self, args):
    217     """
    218     Arguments: [dev_num [attr]]
    219     List the commands available, optionally filtered by device and attr.
    220     """
    221     self._test_channel.send_command('help', args.split())
    222     if (len(args) == 0):
    223       cmd.Cmd.do_help(self, args)
    224 
    225 def main(argv):
    226   if len(argv) != 2:
    227     print 'Usage: python test_channel.py [port]'
    228     return
    229   try:
    230     port = int(argv[1])
    231   except ValueError:
    232     print 'Error parsing port.'
    233   else:
    234     try:
    235       test_channel = TestChannel(port)
    236     except socket.error, e:
    237       print 'Error connecting to socket: %s' % e
    238     except:
    239       print 'Error creating test channel (check argument).'
    240     else:
    241       test_channel_shell = TestChannelShell(test_channel)
    242       test_channel_shell.prompt = '$ '
    243       test_channel_shell.cmdloop()
    244 
    245 if __name__ == '__main__':
    246   main(sys.argv)
    247