#!/usr/bin/env python
#
# Copyright 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Script for sending testing parameters and commands to a Bluetooth device.

This script provides a simple shell interface for sending data at run-time to a
Bluetooth device. It is intended to be used in tandem with the test vendor
library project.

Usage:
  Option A: Script
    1. Run build_and_run.sh in scripts/ with the --test-channel flag set and the
       port to use for the test channel.
  Option B: Manual
    1. Choose a port to use for the test channel. Use 'adb forward tcp:<port>
       tcp:<port>' to forward the port to the device.
    2. In a separate shell, build and push the test vendor library to the device
       using the script mentioned in option A (i.e. without the --test-channel
       flag set).
    3. Once logcat has started, turn Bluetooth on from the device.
    4. Run this program, in the shell from step 1, with the port, also from
       step 1, as its argument.
"""

import cmd
import random
import socket
import string
import struct
import sys

DEVICE_NAME_LENGTH = 6
DEVICE_ADDRESS_LENGTH = 6

# Every size written to the test channel (name length, argument count,
# argument length) is a single unsigned octet.
MAX_ENCODABLE_SIZE = 255


def _to_bytes(text):
    """Returns text as UTF-8 bytes.

    Byte strings are passed through after checking that they are valid UTF-8;
    text strings are encoded.

    Raises:
      UnicodeError: If the value cannot be represented as UTF-8.
    """
    if isinstance(text, bytes):
        text.decode('utf-8')  # Validation only; the result is discarded.
        return text
    return text.encode('utf-8')


def _length_prefixed(data):
    """Returns data preceded by its length encoded as one unsigned octet."""
    return struct.pack('B', len(data)) + data


# Used to generate fake device names and addresses during discovery.
def generate_random_name():
    """Returns DEVICE_NAME_LENGTH random uppercase letters and digits."""
    rng = random.SystemRandom()
    alphabet = string.ascii_uppercase + string.digits
    return ''.join(rng.choice(alphabet) for _ in range(DEVICE_NAME_LENGTH))


def generate_random_address():
    """Returns DEVICE_ADDRESS_LENGTH random decimal digits."""
    rng = random.SystemRandom()
    return ''.join(
        rng.choice(string.digits) for _ in range(DEVICE_ADDRESS_LENGTH))


class Connection(object):
    """Simple wrapper class for a socket object.

    Attributes:
      _socket: The underlying TCP socket, connected to the given port on
        localhost.
    """

    def __init__(self, port):
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self._socket.connect(('localhost', port))
        except Exception:
            # Do not leak the descriptor when the connection cannot be made.
            self._socket.close()
            raise

    def close(self):
        self._socket.close()

    def send(self, data):
        self._socket.sendall(data)


class TestChannel(object):
    """Checks outgoing commands and sends them once verified.

    Attributes:
      _connection: The connection to the test vendor library that commands are
        sent on.
      _discovered_devices: DeviceManager holding every device created through
        discover_new_device().
    """

    def __init__(self, port):
        self._connection = Connection(port)
        self._discovered_devices = DeviceManager()

    def discover_new_device(self, name=None, address=None):
        """Creates a fake device, registers it and returns it."""
        device = Device(name, address)
        self._discovered_devices.add_device(device)
        return device

    def close(self):
        self._connection.close()

    def send_command(self, name, args):
        """Validates a command and writes it to the connection.

        The packet is: one octet holding the name length, the name, one octet
        holding the number of arguments, then each argument preceded by one
        octet holding its length. Lengths are counted in encoded bytes.

        Args:
          name: The command name.
          args: Sequence of argument strings.

        Raises:
          UnicodeError: If the name or an argument is not valid UTF-8.
          ValueError: If any size does not fit in one octet.
        """
        self.lint_command(name, args, len(name), len(args))
        pieces = [_length_prefixed(_to_bytes(name)),
                  struct.pack('B', len(args))]
        pieces.extend(_length_prefixed(_to_bytes(arg)) for arg in args)
        self._connection.send(b''.join(pieces))

    def lint_command(self, name, args, name_size, args_size):
        """Checks that a command can be encoded for the test channel.

        Args:
          name: The command name.
          args: Sequence of argument strings.
          name_size: Must equal len(name).
          args_size: Must equal len(args).

        Raises:
          UnicodeError: If the name or an argument is not valid UTF-8.
          ValueError: If the encoded name, the argument count or an encoded
            argument is larger than MAX_ENCODABLE_SIZE.
        """
        assert name_size == len(name) and args_size == len(args)
        try:
            encoded_name = _to_bytes(name)
            encoded_args = [_to_bytes(arg) for arg in args]
        except UnicodeError:
            print('Unrecognized characters.')
            raise
        # Sizes must be encodable in one octet. They are measured on the
        # encoded bytes because that is what is written to the socket.
        if len(encoded_name) > MAX_ENCODABLE_SIZE:
            raise ValueError('Command name is %d bytes; the maximum is %d.' %
                             (len(encoded_name), MAX_ENCODABLE_SIZE))
        if args_size > MAX_ENCODABLE_SIZE:
            raise ValueError('Command has %d arguments; the maximum is %d.' %
                             (args_size, MAX_ENCODABLE_SIZE))
        for encoded_arg in encoded_args:
            if len(encoded_arg) > MAX_ENCODABLE_SIZE:
                raise ValueError('Argument is %d bytes; the maximum is %d.' %
                                 (len(encoded_arg), MAX_ENCODABLE_SIZE))


class DeviceManager(object):
    """Maintains the active fake devices that have been "discovered".

    Attributes:
      device_list: Maps device addresses (keys) to devices (values).
    """

    def __init__(self):
        self.device_list = {}

    def add_device(self, device):
        # A device with an already-known address replaces the earlier entry.
        self.device_list[device.get_address()] = device


class Device(object):
    """A fake device to be returned in inquiry and scan results.

    Note that if an explicit name or address is not provided, a random string
    of characters is used.

    Attributes:
      _name: The device name for use in extended results.
      _address: The BD address of the device.
    """

    def __init__(self, name=None, address=None):
        # TODO(dennischeng): Generate device properties more robustly.
        self._name = generate_random_name() if name is None else name
        self._address = (
            generate_random_address() if address is None else address)

    def get_name(self):
        return self._name

    def get_address(self):
        return self._address


class TestChannelShell(cmd.Cmd):
    """Shell for sending test channel data to controller.

    Manages the test channel to the controller and defines a set of commands the
    user can send to the controller as well. These commands are processed
    parallel to commands sent from the device stack and used to provide
    additional debugging/testing capabilities.

    The docstring of each do_* method is what the shell's 'help' command shows.

    Attributes:
      _test_channel: The communication channel to send data to the controller.
    """

    def __init__(self, test_channel):
        print('Type \'help\' for more information.')
        cmd.Cmd.__init__(self)
        self._test_channel = test_channel

    def do_clear(self, args):
        """
        Arguments: None.
        Resets the controller to its original, unmodified state.
        """
        self._test_channel.send_command('CLEAR', [])

    def do_clear_event_delay(self, args):
        """
        Arguments: None.
        Clears the response delay set by set_event_delay.
        """
        # The command takes no arguments, so anything typed after it is
        # dropped, matching the other argument-less commands.
        self._test_channel.send_command('CLEAR_EVENT_DELAY', [])

    def do_discover(self, args):
        """
        Arguments: name_1 name_2 ...
        Sends an inquiry result for named device(s). If no names are provided, a
        random name is used instead.
        """
        names = args.split() or [generate_random_name()]
        names_and_addresses = []
        for device_name in names:
            device = self._test_channel.discover_new_device(device_name)
            names_and_addresses.append(device.get_name())
            names_and_addresses.append(device.get_address())
        self._test_channel.send_command('DISCOVER', names_and_addresses)

    def do_set_event_delay(self, args):
        """
        Arguments: interval_in_ms
        Sets the response delay for all event packets sent from the controller
        back to the HCI.
        """
        self._test_channel.send_command('SET_EVENT_DELAY', args.split())

    def do_timeout_all(self, args):
        """
        Arguments: None.
        Causes all HCI commands to timeout.
        """
        self._test_channel.send_command('TIMEOUT_ALL', [])

    def do_quit(self, args):
        """
        Arguments: None.
        Exits the test channel.
        """
        self._test_channel.send_command('CLOSE_TEST_CHANNEL', [])
        self._test_channel.close()
        print('Goodbye.')
        return True

    def do_EOF(self, args):
        """
        Arguments: None.
        Exits the test channel when the input ends (e.g. Ctrl-D).
        """
        # cmd.Cmd turns end-of-input into the command 'EOF'. Without a handler
        # the loop reports unknown syntax and reads again, forever.
        return self.do_quit(args)


def main(argv):
    """Parses the port from argv, connects and runs the interactive shell.

    Args:
      argv: The full argument vector: the program name followed by the port.
    """
    if len(argv) != 2:
        print('Usage: python test_channel.py [port]')
        return
    try:
        port = int(argv[1])
    except ValueError:
        print('Error parsing port.')
        return
    try:
        test_channel = TestChannel(port)
    except socket.error as e:
        print('Error connecting to socket: %s' % e)
        return
    except Exception:
        # Deliberately not a bare except: Ctrl-C and SystemExit must still
        # terminate the program.
        print('Error creating test channel (check argument).')
        return
    test_channel_shell = TestChannelShell(test_channel)
    test_channel_shell.prompt = '$ '
    try:
        test_channel_shell.cmdloop()
    finally:
        # Closing twice is harmless, so this is safe after 'quit' as well.
        test_channel.close()


if __name__ == '__main__':
    main(sys.argv)