#!/usr/bin/env python
#
# Copyright 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Script for sending testing parameters and commands to a Bluetooth device.

This script provides a simple shell interface for sending data at run-time to a
Bluetooth device. It is intended to be used in tandem with the test vendor
library project.

Usage:
  Option A: Script
    1. Run build_and_run.sh in scripts/ with the --test-channel flag set and the
    port to use for the test channel.
  Option B: Manual
    1. Choose a port to use for the test channel. Use 'adb forward tcp:<port>
    tcp:<port>' to forward the port to the device.
    2. In a separate shell, build and push the test vendor library to the device
    using the script mentioned in option A (i.e. without the --test-channel flag
    set).
    3. Once logcat has started, turn Bluetooth on from the device.
    4. Run this program, in the shell from step 1, with the port, also from
    step 1, as its argument.
"""

import cmd
import random
import socket
import string
import struct
import sys

DEVICE_NAME_LENGTH = 6
DEVICE_ADDRESS_LENGTH = 6

# Every size prefix in a test channel command is a single octet.
_MAX_OCTET = 255


# Used to generate fake device names and addresses during discovery.
def generate_random_name():
  """Returns DEVICE_NAME_LENGTH random uppercase ASCII letters and digits."""
  rng = random.SystemRandom()
  alphabet = string.ascii_uppercase + string.digits
  return ''.join(rng.choice(alphabet) for _ in range(DEVICE_NAME_LENGTH))


def generate_random_address():
  """Returns DEVICE_ADDRESS_LENGTH random decimal digits as a string."""
  rng = random.SystemRandom()
  return ''.join(rng.choice(string.digits)
                 for _ in range(DEVICE_ADDRESS_LENGTH))


def _length_prefixed(text):
  """Returns text as UTF-8 bytes preceded by a one-octet byte count.

  The caller must already have checked that the encoded text fits in
  _MAX_OCTET bytes; struct.pack raises struct.error otherwise.
  """
  payload = text.encode('utf-8')
  return struct.pack('B', len(payload)) + payload


class Connection(object):
  """Simple wrapper class for a socket object.

  Attributes:
    _socket: The underlying TCP socket, connected to the given port on
      localhost.
  """

  def __init__(self, port):
    """Opens a TCP connection to localhost:port.

    Raises:
      socket.error: If the connection cannot be established.
    """
    self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
      self._socket.connect(('localhost', port))
    except Exception:
      # Do not leak the descriptor when the connection attempt fails.
      self._socket.close()
      raise

  def close(self):
    """Closes the underlying socket."""
    self._socket.close()

  def send(self, data):
    """Sends all of data (a byte string) over the socket."""
    self._socket.sendall(data)


class TestChannel(object):
  """Checks outgoing commands and sends them once verified.

  Attributes:
    _connection: The connection to the test vendor library that commands are
      sent on.
    _discovered_devices: DeviceManager holding the fake devices created through
      discover_new_device().
  """

  def __init__(self, port):
    self._connection = Connection(port)
    self._discovered_devices = DeviceManager()

  def discover_new_device(self, name=None, address=None):
    """Creates a fake device, registers it and returns it.

    Args:
      name: Device name, or None to use a random one.
      address: Device address, or None to use a random one.
    """
    device = Device(name, address)
    self._discovered_devices.add_device(device)
    return device

  def close(self):
    """Closes the connection to the test vendor library."""
    self._connection.close()

  def send_command(self, name, args):
    """Validates a command and writes it to the connection.

    The bytes written are: one octet holding the byte length of name, the
    UTF-8 bytes of name, one octet holding the number of arguments, and then
    for every argument one octet holding its byte length followed by its UTF-8
    bytes.

    Args:
      name: Command name as a text string.
      args: List of argument text strings.

    Raises:
      UnicodeError: If name or an argument cannot be encoded as UTF-8.
      ValueError: If a size does not fit in one octet.
    """
    name_size = len(name)
    args_size = len(args)
    self.lint_command(name, args, name_size, args_size)
    # Sizes are emitted as raw octets (not chr()) so that the command is a byte
    # string on both Python 2 and Python 3.
    encoded_name = _length_prefixed(name)
    encoded_args = struct.pack('B', args_size) + b''.join(
        _length_prefixed(arg) for arg in args)
    self._connection.send(encoded_name + encoded_args)

  def lint_command(self, name, args, name_size, args_size):
    """Checks that a command can be represented on the wire.

    Args:
      name: Command name as a text string.
      args: List of argument text strings.
      name_size: Must equal len(name).
      args_size: Must equal len(args).

    Raises:
      UnicodeError: If name or an argument cannot be encoded as UTF-8
        ('Unrecognized characters.' is printed before re-raising).
      ValueError: If the encoded name, the argument count or any encoded
        argument is larger than 255.
    """
    assert name_size == len(name) and args_size == len(args)
    try:
      encoded_name = name.encode('utf-8')
      encoded_args = [arg.encode('utf-8') for arg in args]
    except UnicodeError:
      print('Unrecognized characters.')
      raise
    # Limits apply to the encoded bytes, which is what the size octets
    # describe; a non-ASCII character occupies more than one byte.
    if len(encoded_name) > _MAX_OCTET:
      raise ValueError('Command name is longer than %d bytes.' % _MAX_OCTET)
    if args_size > _MAX_OCTET:
      raise ValueError('More than %d arguments.' % _MAX_OCTET)
    for encoded_arg in encoded_args:
      if len(encoded_arg) > _MAX_OCTET:
        raise ValueError('Argument is longer than %d bytes.' % _MAX_OCTET)


class DeviceManager(object):
  """Maintains the active fake devices that have been "discovered".

  Attributes:
    device_list: Maps device addresses (keys) to devices (values).
  """

  def __init__(self):
    self.device_list = {}

  def add_device(self, device):
    """Stores device under its address, replacing any previous entry."""
    self.device_list[device.get_address()] = device


class Device(object):
  """A fake device to be returned in inquiry and scan results. Note that if an
  explicit name or address is not provided, a random string of characters
  is used.

  Attributes:
    _name: The device name for use in extended results.
    _address: The BD address of the device.
  """

  def __init__(self, name=None, address=None):
    # TODO(dennischeng): Generate device properties more robustly.
    self._name = generate_random_name() if name is None else name
    self._address = generate_random_address() if address is None else address

  def get_name(self):
    """Returns the device name."""
    return self._name

  def get_address(self):
    """Returns the device address."""
    return self._address


class TestChannelShell(cmd.Cmd):
  """Shell for sending test channel data to controller.

  Manages the test channel to the controller and defines a set of commands the
  user can send to the controller as well. These commands are processed parallel
  to commands sent from the device stack and used to provide additional
  debugging/testing capabilities.

  Attributes:
    _test_channel: The communication channel to send data to the controller.
  """

  def __init__(self, test_channel):
    print('Type \'help\' for more information.')
    cmd.Cmd.__init__(self)
    self._test_channel = test_channel

  def do_add(self, args):
    """
    Arguments: dev_type_str
    Add a new device of type dev_type_str.
    """
    self._test_channel.send_command('add', args.split())

  def do_del(self, args):
    """
    Arguments: device index
    Delete the device with the specified index.
    """
    self._test_channel.send_command('del', args.split())

  def do_get(self, args):
    """
    Arguments: dev_num attr_str
    Get the value of the attribute attr_str from device dev_num.
    """
    self._test_channel.send_command('get', args.split())

  def do_set(self, args):
    """
    Arguments: dev_num attr_str val
    Set the value of the attribute attr_str from device dev_num equal to val.
    """
    self._test_channel.send_command('set', args.split())

  def do_list(self, args):
    """
    Arguments: [dev_num [attr]]
    List the devices from the controller, optionally filtered by device and attr.
    """
    self._test_channel.send_command('list', args.split())

  def do_quit(self, args):
    """
    Arguments: None.
    Exits the test channel.
    """
    try:
      self._test_channel.send_command('CLOSE_TEST_CHANNEL', [])
    finally:
      # Release the socket even when the final command cannot be delivered.
      self._test_channel.close()
    print('Goodbye.')
    return True  # A true return value tells cmd.Cmd to leave cmdloop().

  def do_help(self, args):
    """
    Arguments: [dev_num [attr]]
    List the commands available, optionally filtered by device and attr.
    """
    self._test_channel.send_command('help', args.split())
    # The local command listing is only shown for a bare 'help'.
    if not args:
      cmd.Cmd.do_help(self, args)


def main(argv):
  """Parses the port from argv, connects and runs the interactive shell.

  Args:
    argv: Command line, i.e. [program_name, port].
  """
  if len(argv) != 2:
    print('Usage: python test_channel.py [port]')
    return
  try:
    port = int(argv[1])
  except ValueError:
    print('Error parsing port.')
    return
  try:
    test_channel = TestChannel(port)
  except socket.error as e:
    print('Error connecting to socket: %s' % e)
    return
  except Exception:
    # E.g. a port number outside the valid range. Deliberately not a bare
    # except so that KeyboardInterrupt and SystemExit still propagate.
    print('Error creating test channel (check argument).')
    return
  test_channel_shell = TestChannelShell(test_channel)
  test_channel_shell.prompt = '$ '
  test_channel_shell.cmdloop()


if __name__ == '__main__':
  main(sys.argv)