#!/usr/bin/env python3.4
#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""
Test script to execute Bluetooth basic functionality test cases.
This test was designed to be run in a shield box.
"""

import threading
import time
from contextlib import suppress
from queue import Empty
from random import randint

from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
from acts.test_utils.bt.bt_test_utils import reset_bluetooth
from acts.test_utils.bt.bt_test_utils import rfcomm_accept
from acts.test_utils.bt.bt_test_utils import rfcomm_connect
from acts.test_utils.bt.bt_test_utils import take_btsnoop_logs
from acts.test_utils.bt.bt_test_utils import write_read_verify_data


class RfcommLongevityTest(BluetoothBaseTest):
    """Longevity tests for RFCOMM between two Android devices.

    Every test repeatedly opens an RFCOMM connection (the first device in
    ``android_devices`` is the client, the second is the server), runs a
    series of write/read/verify rounds over it and then stops RFCOMM on
    both sides.
    """
    default_timeout = 10
    # Number of connect / transfer / disconnect cycles each test performs.
    longev_iterations = 200
    # Number of write/read/verify rounds performed on each connection.
    write_iterations = 5000
    # Largest payload the tests send; the original code notes that 990 is
    # the maximum size until a bug in sl4a is fixed.
    max_message_size = 990
    # Class-level default kept for backward compatibility; __init__ gives
    # every instance its own list.
    thread_list = []
    generic_message = (
        "Space: the final frontier. These are the voyages of "
        "the starship Enterprise. Its continuing mission: to explore "
        "strange new worlds, to seek out new life and new civilizations,"
        " to boldly go where no man has gone before.")

    def __init__(self, controllers):
        """Set up the base test and pick the client and server devices.

        Args:
            controllers: Passed through unchanged to BluetoothBaseTest.
        """
        BluetoothBaseTest.__init__(self, controllers)
        self.client_ad = self.android_devices[0]
        self.server_ad = self.android_devices[1]
        # Per-instance list so instances do not share one mutable class
        # attribute.
        self.thread_list = []

    def _reset_bluetooth_with_retries(self):
        """Try up to five times to reset Bluetooth on all devices."""
        for _ in range(5):
            if reset_bluetooth(self.android_devices):
                break
            self.log.info("Failed to reset bluetooth state, retrying...")

    def _join_connection_threads(self):
        """Wait for all tracked accept/connect threads, then forget them."""
        for thread in self.thread_list:
            thread.join()
        self.thread_list.clear()

    def _stop_rfcomm(self):
        """Stop RFCOMM on the client first and then on the server."""
        self.client_ad.droid.bluetoothRfcommStop()
        self.server_ad.droid.bluetoothRfcommStop()

    def _transfer_once(self, connection_index, write_index, message,
                       is_binary):
        """Run one logged write/read/verify round.

        Args:
            connection_index: Zero-based index of the current connection
                cycle; only used to compute the running count in the log.
            write_index: Zero-based index of the round within the current
                connection.
            message: Payload handed to write_read_verify_data.
            is_binary: Fourth argument of write_read_verify_data
                (presumably selects binary transfer; confirm against
                bt_test_utils).

        Returns:
            True if write_read_verify_data reported success, else False.
        """
        running_count = (write_index + 1) + (
            connection_index * self.write_iterations)
        self.log.info("iteration {} data".format(running_count))
        if not write_read_verify_data(self.client_ad, self.server_ad,
                                      message, is_binary):
            return False
        self.log.info("Iteration {} completed".format(write_index))
        return True

    def _run_read_write_longevity(self, message, is_binary):
        """Run the plain connect / transfer / disconnect longevity loop.

        Args:
            message: Payload sent on every round.
            is_binary: Forwarded to write_read_verify_data.

        Returns:
            True if every round on every connection verified, False as soon
            as one round fails. On failure RFCOMM is not stopped here;
            cleanup is left to teardown_test.
        """
        server_mac = self.server_ad.droid.bluetoothGetLocalAddress()
        for i in range(self.longev_iterations):
            self.log.info("iteration {} connection".format(i))
            self.orchestrate_rfcomm_connect(server_mac)
            for n in range(self.write_iterations):
                if not self._transfer_once(i, n, message, is_binary):
                    return False
            self._stop_rfcomm()
            self._join_connection_threads()
        return True

    def on_fail(self, test_name, begin_time):
        """Collect btsnoop logs and reset Bluetooth after a failed test.

        Args:
            test_name: Name of the failed test, forwarded to
                take_btsnoop_logs.
            begin_time: Unused here; part of the callback signature.
        """
        take_btsnoop_logs(self.android_devices, self, test_name)
        self._reset_bluetooth_with_retries()

    def teardown_test(self):
        """Join leftover threads, reset Bluetooth and wait before next test."""
        # Best effort: a thread that cannot be joined must not abort teardown.
        with suppress(Exception):
            for thread in self.thread_list:
                thread.join()
        # Drop the references so threads from this test are not joined
        # again by the next one.
        self.thread_list.clear()
        self._reset_bluetooth_with_retries()
        time.sleep(20)  #safeguard in case of connId errors

    def orchestrate_rfcomm_connect(self, server_mac):
        """Start the RFCOMM accept and connect helpers on worker threads.

        The accept thread (server device) is started before the connect
        thread (client device). Both threads are recorded in thread_list so
        they can be joined later.

        Args:
            server_mac: Address of the server device, forwarded to
                rfcomm_connect.
        """
        accept_thread = threading.Thread(target=rfcomm_accept,
                                         args=(self.server_ad, ))
        self.thread_list.append(accept_thread)
        accept_thread.start()
        connect_thread = threading.Thread(
            target=rfcomm_connect,
            args=(self.client_ad, server_mac))
        self.thread_list.append(connect_thread)
        connect_thread.start()

    def test_rfcomm_longev_read_write_message(self):
        """Longevity test an RFCOMM connection's I/O with a generic message

        Test the longevity of RFCOMM with a basic read/write
        connect/disconnect sequence.

        Steps:
        1. Establish an RFCOMM connection between two Android devices.
        2. Write data to RFCOMM from the client droid.
        3. Read data from RFCOMM from the server droid.
        4. Verify data written matches data read.
        5. Repeat steps 2-4 write_iterations (5000) times.
        6. Disconnect RFCOMM connection.
        7. Repeat steps 1-6 longev_iterations (200) times.

        Expected Result:
        Each iteration should read and write to the RFCOMM connection
        successfully. Each connect and disconnect should be successful.

        Returns:
          Pass if True
          Fail if False

        TAGS: Classic, Longevity, RFCOMM
        Priority: 2
        """
        return self._run_read_write_longevity(self.generic_message, False)

    def test_rfcomm_longev_read_write_small_message(self):
        """Longevity test an RFCOMM connection's I/O with a small message

        Test the longevity of RFCOMM with a basic read/write
        connect/disconnect sequence. The data being transferred is only
        one character in size.

        Steps:
        1. Establish an RFCOMM connection between two Android devices.
        2. Write data to RFCOMM from the client droid.
        3. Read data from RFCOMM from the server droid.
        4. Verify data written matches data read.
        5. Repeat steps 2-4 write_iterations (5000) times.
        6. Disconnect RFCOMM connection.
        7. Repeat steps 1-6 longev_iterations (200) times.

        Expected Result:
        Each iteration should read and write to the RFCOMM connection
        successfully. Each connect and disconnect should be successful.

        Returns:
          Pass if True
          Fail if False

        TAGS: Classic, Longevity, RFCOMM
        Priority: 2
        """
        return self._run_read_write_longevity("x", False)

    def test_rfcomm_longev_read_write_binary_message(self):
        """Longevity test an RFCOMM connection's I/O with a binary message

        Test the longevity of RFCOMM with a basic read/write
        connect/disconnect sequence. The data being transferred is in a
        binary format.

        Steps:
        1. Establish an RFCOMM connection between two Android devices.
        2. Write data to RFCOMM from the client droid.
        3. Read data from RFCOMM from the server droid.
        4. Verify data written matches data read.
        5. Repeat steps 2-4 write_iterations (5000) times.
        6. Disconnect RFCOMM connection.
        7. Repeat steps 1-6 longev_iterations (200) times.

        Expected Result:
        Each iteration should read and write to the RFCOMM connection
        successfully. Each connect and disconnect should be successful.

        Returns:
          Pass if True
          Fail if False

        TAGS: Classic, Longevity, RFCOMM
        Priority: 2
        """
        return self._run_read_write_longevity("11010101", True)

    def test_rfcomm_longev_read_write_large_message(self):
        """Longevity test an RFCOMM connection's I/O with a large message

        Test the longevity of RFCOMM with a basic read/write
        connect/disconnect sequence. The data being transferred is
        max_message_size (990) chars in size.

        Steps:
        1. Establish an RFCOMM connection between two Android devices.
        2. Write data to RFCOMM from the client droid.
        3. Read data from RFCOMM from the server droid.
        4. Verify data written matches data read.
        5. Repeat steps 2-4 write_iterations (5000) times.
        6. Disconnect RFCOMM connection.
        7. Repeat steps 1-6 longev_iterations (200) times.

        Expected Result:
        Each iteration should read and write to the RFCOMM connection
        successfully. Each connect and disconnect should be successful.

        Returns:
          Pass if True
          Fail if False

        TAGS: Classic, Longevity, RFCOMM
        Priority: 2
        """
        #largest message size till sl4a fixed
        message = "x" * self.max_message_size
        return self._run_read_write_longevity(message, False)

    def test_rfcomm_longev_connection_interuption(self):
        """Longevity test an RFCOMM connection with socket interruptions

        Test the longevity of RFCOMM with a basic read/write
        connect/disconnect sequence. Randomly in the sequence of reads and
        writes the socket on the client side will close. There should be
        an exception thrown for writing the next set of data and the
        test should start up a new connection and continue.

        Steps:
        1. Establish an RFCOMM connection between two Android devices.
        2. Write data to RFCOMM from the client droid.
        3. Read data from RFCOMM from the server droid.
        4. Verify data written matches data read.
        5. Repeat steps 2-4 write_iterations (5000) times or until the
           random interrupt occurs.
        6. Re-establish an RFCOMM connection.
        7. Repeat steps 1-6 longev_iterations (200) times.

        Expected Result:
        Each iteration should read and write to the RFCOMM connection
        successfully. Each connect and disconnect should be successful.
        Devices should recover a new connection after each interruption.

        Returns:
          Pass if True
          Fail if False

        TAGS: Classic, Longevity, RFCOMM
        Priority: 2
        """
        server_mac = self.server_ad.droid.bluetoothGetLocalAddress()
        for i in range(self.longev_iterations):
            try:
                self.log.info("iteration {} connection".format(i))
                self.orchestrate_rfcomm_connect(server_mac)
                # randint is inclusive on both ends, so the chosen point can
                # be too late for the "n > point" check below to ever fire;
                # that connection then completes without an interruption.
                interrupt_after = randint(0, self.write_iterations)
                for n in range(self.write_iterations):
                    if not self._transfer_once(i, n, self.generic_message,
                                               False):
                        return False
                    if n > interrupt_after:
                        # Close the client socket; the next transfer is
                        # expected to raise and land in the handler below.
                        self.client_ad.droid.bluetoothRfcommCloseSocket()
                self._stop_rfcomm()
            except Exception:
                self.log.info("Exception found as expected. Continuing...")
                # Stop each side separately so the log says which one failed.
                try:
                    self.client_ad.droid.bluetoothRfcommStop()
                except Exception as err:
                    self.log.error(
                        "Error closing client connection: {}".format(err))
                    return False
                try:
                    self.server_ad.droid.bluetoothRfcommStop()
                except Exception as err:
                    self.log.error(
                        "Error closing server connection: {}".format(err))
                    return False
            # Runs after both the normal and the interrupted path.
            self._join_connection_threads()
        return True

    def test_rfcomm_longev_data_elasticity(self):
        """Longevity test an RFCOMM connection's I/O with changing data size

        Test the longevity of RFCOMM with a basic read/write
        connect/disconnect sequence. The data being transferred changes
        in size after each write/read sequence to increase up to
        max_message_size (990) chars in size and decrease down to 1 in
        size. This repeats through the entire test in order to exercise
        different size values being written.

        Steps:
        1. Establish an RFCOMM connection between two Android devices.
        2. Write data to RFCOMM from the client droid.
        3. Read data from RFCOMM from the server droid.
        4. Verify data written matches data read.
        5. Change data size according to above description.
        6. Repeat steps 2-5 write_iterations (5000) times.
        7. Disconnect RFCOMM connection.
        8. Repeat steps 1-7 longev_iterations (200) times.

        Expected Result:
        Each iteration should read and write to the RFCOMM connection
        successfully. Each connect and disconnect should be successful.

        Returns:
          Pass if True
          Fail if False

        TAGS: Classic, Longevity, RFCOMM
        Priority: 2
        """
        server_mac = self.server_ad.droid.bluetoothGetLocalAddress()
        # The payload and its growth direction deliberately persist across
        # connections, so the size keeps sweeping between 1 and the maximum.
        message = "x"
        growing = True
        for i in range(self.longev_iterations):
            try:
                self.log.info("iteration {} connection".format(i))
                self.orchestrate_rfcomm_connect(server_mac)
                for n in range(self.write_iterations):
                    if not self._transfer_once(i, n, message, False):
                        return False
                    size_of_message = len(message)
                    #max size is 990 due to a bug in sl4a.
                    if size_of_message >= self.max_message_size:
                        growing = False
                    elif size_of_message <= 1:
                        growing = True
                    if growing:
                        message = "x" * (size_of_message + 1)
                    else:
                        message = "x" * (size_of_message - 1)
                self._stop_rfcomm()
            except Exception as err:
                self.log.info("Error in longevity test: {}".format(err))
                self._join_connection_threads()
                return False

            self._join_connection_threads()
        return True