1 # Copyright 2016 The Chromium OS Authors. All rights reserved. 2 # Use of this source code is governed by a BSD-style license that can be 3 # found in the LICENSE file. 4 5 """Server side bluetooth adapter subtests.""" 6 7 import functools 8 import logging 9 import re 10 import time 11 12 from autotest_lib.client.bin import utils 13 from autotest_lib.client.bin.input import input_event_recorder as recorder 14 from autotest_lib.client.common_lib import error 15 from autotest_lib.server import test 16 from autotest_lib.client.bin.input.linux_input import ( 17 BTN_LEFT, BTN_RIGHT, EV_KEY, EV_REL, REL_X, REL_Y, REL_WHEEL) 18 19 Event = recorder.Event 20 21 22 # Delay binding the methods since host is only available at run time. 23 SUPPORTED_DEVICE_TYPES = { 24 'MOUSE': lambda host: host.chameleon.get_bluetooh_hid_mouse} 25 26 27 def _run_method(method, method_name, *args, **kwargs): 28 """Run a target method and capture exceptions if any. 29 30 This is just a wrapper of the target method so that we do not need to 31 write the exception capturing structure repeatedly. The method could 32 be either a device method or a facade method. 33 34 @param method: the method to run 35 @param method_name: the name of the method 36 37 @returns: the return value of target method() if successful. 38 False otherwise. 39 40 """ 41 result = False 42 try: 43 result = method(*args, **kwargs) 44 except Exception as e: 45 logging.error('%s: %s', method_name, e) 46 except: 47 logging.error('%s: unexpected error', method_name) 48 return result 49 50 51 def get_bluetooth_emulated_device(host, device_type): 52 """Get the bluetooth emulated device object. 53 54 @param host: the DUT, usually a chromebook 55 @param device_type : the bluetooth HID device type, e.g., 'MOUSE' 56 57 @returns: the bluetooth device object 58 59 """ 60 61 def _retry_device_method(method_name): 62 """retry the emulated device's method. 63 64 The method is invoked as device.xxxx() e.g., device.GetChipName(). 65 66 Note that the method name string is provided to get the device's actual 67 method object at run time through getattr(). The rebinding is required 68 because a new device may have been created previously or during the 69 execution of fix_serial_device(). 70 71 Given a device's method, it is not feasible to get the method name 72 through __name__ attribute. This limitation is due to the fact that 73 the device is a dotted object of an XML RPC server proxy. 74 As an example, with the method name 'GetChipName', we could derive the 75 correspoinding method device.GetChipName. On the contrary, given 76 device.GetChipName, it is not feasible to get the method name by 77 device.GetChipName.__name__ 78 79 Also note that if the device method fails at the first time, we would 80 try to fix the problem by re-creating the serial device and see if the 81 problem is fixed. If not, we will reboot the chameleon board and see 82 if the problem is fixed. If yes, execute the target method the second 83 time. 84 85 @param method_name: the string of the method name. 86 87 @returns: the result returned by the device's method. 88 89 """ 90 result = _run_method(getattr(device, method_name), method_name) 91 if _is_successful(result): 92 return result 93 94 logging.error('%s failed the 1st time. Try to fix the serial device.', 95 method_name) 96 97 # Try to fix the serial device if possible. 98 if not fix_serial_device(host, device): 99 return False 100 101 logging.info('%s: retry the 2nd time.', method_name) 102 return _run_method(getattr(device, method_name), method_name) 103 104 105 if device_type not in SUPPORTED_DEVICE_TYPES: 106 raise error.TestError('The device type is not supported: %s', 107 device_type) 108 109 # Get the bluetooth device object and query some important properties. 110 device = SUPPORTED_DEVICE_TYPES[device_type](host)() 111 112 _retry_device_method('Init') 113 logging.info('device type: %s', device_type) 114 115 device.name = _retry_device_method('GetChipName') 116 logging.info('device name: %s', device.name) 117 118 device.address = _retry_device_method('GetLocalBluetoothAddress') 119 logging.info('address: %s', device.address) 120 121 device.pin = _retry_device_method('GetPinCode') 122 logging.info('pin: %s', device.pin) 123 124 device.class_of_service = _retry_device_method('GetClassOfService') 125 logging.info('class of service: 0x%04X', device.class_of_service) 126 127 device.class_of_device = _retry_device_method('GetClassOfDevice') 128 logging.info('class of device: 0x%04X', device.class_of_device) 129 130 device.device_type = _retry_device_method('GetHIDDeviceType') 131 logging.info('device type: %s', device.device_type) 132 133 device.authenticaiton_mode = _retry_device_method('GetAuthenticationMode') 134 logging.info('authentication mode: %s', device.authenticaiton_mode) 135 136 device.port = _retry_device_method('GetPort') 137 logging.info('serial port: %s\n', device.port) 138 139 return device 140 141 142 def recreate_serial_device(device): 143 """Create and connect to a new serial device. 144 145 @param device: the bluetooth HID device 146 147 @returns: True if the serial device is re-created successfully. 148 149 """ 150 logging.info('Remove the old serial device and create a new one.') 151 if device is not None: 152 try: 153 device.Close() 154 except: 155 logging.error('failed to close the serial device.') 156 return False 157 try: 158 device.CreateSerialDevice() 159 return True 160 except: 161 logging.error('failed to invoke CreateSerialDevice.') 162 return False 163 164 165 def _reboot_chameleon(host, device): 166 REBOOT_SLEEP_SECS = 40 167 168 # Close the bluetooth peripheral device and reboot the chameleon board. 169 device.Close() 170 logging.info('rebooting chameleon...') 171 host.chameleon.reboot() 172 173 # Every chameleon reboot would take a bit more than REBOOT_SLEEP_SECS. 174 # Sleep REBOOT_SLEEP_SECS and then begin probing the chameleon board. 175 time.sleep(REBOOT_SLEEP_SECS) 176 177 # Check if the serial device could initialize, connect, and 178 # enter command mode correctly. 179 logging.info('Checking device status...') 180 if not _run_method(device.Init, 'Init'): 181 logging.info('device.Init: failed after reboot') 182 return False 183 if not device.CheckSerialConnection(): 184 logging.info('device.CheckSerialConnection: failed after reboot') 185 return False 186 if not _run_method(device.EnterCommandMode, 'EnterCommandMode'): 187 logging.info('device.EnterCommandMode: failed after reboot') 188 return False 189 logging.info('The device is created successfully after reboot.') 190 return True 191 192 193 def _is_successful(result): 194 """Is the method result successful? 195 196 @param result: a method result 197 198 @returns: True if bool(result) is True or result is 0. 199 Some method result, e.g., class_of_service, may be 0 200 which is considered a valid result. 201 202 """ 203 return bool(result) or result is 0 204 205 206 def fix_serial_device(host, device): 207 """Fix the serial device. 208 209 This function tries to fix the serial device by 210 (1) re-creating a serial device, or 211 (2) rebooting the chameleon board. 212 213 @param host: the DUT, usually a chromebook 214 @param device: the bluetooth HID device 215 216 @returns: True if the serial device is fixed. False otherwise. 217 218 """ 219 # Check the serial connection. Fix it if needed. 220 if device.CheckSerialConnection(): 221 # The USB serial connection still exists. 222 # Re-connection suffices to solve the problem. The problem 223 # is usually caused by serial port change. For example, 224 # the serial port changed from /dev/ttyUSB0 to /dev/ttyUSB1. 225 logging.info('retry: creating a new serial device...') 226 if not recreate_serial_device(device): 227 return False 228 229 # Check if recreate_serial_device() above fixes the problem. 230 # If not, reboot the chameleon board including creation of a new 231 # bluetooth device. Check if reboot fixes the problem. 232 # If not, return False. 233 result = _run_method(device.EnterCommandMode, 'EnterCommandMode') 234 return _is_successful(result) or _reboot_chameleon(host, device) 235 236 237 def retry(test_method, instance, *args, **kwargs): 238 """Execute the target facade test_method(). Retry if failing the first time. 239 240 A test_method is something like self.test_xxxx() in BluetoothAdapterTests, 241 e.g., BluetoothAdapterTests.test_bluetoothd_running(). 242 243 @param test_method: the test method to retry 244 245 @returns: True if the return value of test_method() is successful. 246 False otherwise. 247 248 """ 249 if _is_successful(_run_method(test_method, test_method.__name__, 250 instance, *args, **kwargs)): 251 return True 252 253 # Try to fix the serial device if applicable. 254 logging.error('%s failed at the 1st time.', test_method.__name__) 255 256 # If this test does not use any attached serial device, just re-run 257 # the test. 258 logging.info('%s: retry the 2nd time.', test_method.__name__) 259 time.sleep(1) 260 if not hasattr(instance, 'device_type'): 261 return _is_successful(_run_method(test_method, test_method.__name__, 262 instance, *args, **kwargs)) 263 264 host = instance.host 265 device = instance.devices[instance.device_type] 266 if not fix_serial_device(host, device): 267 return False 268 269 logging.info('%s: retry the 2nd time.', test_method.__name__) 270 return _is_successful(_run_method(test_method, test_method.__name__, 271 instance, *args, **kwargs)) 272 273 274 def _test_retry_and_log(test_method_or_retry_flag): 275 """A decorator that logs test results, collects error messages, and retries 276 on request. 277 278 @param test_method_or_retry_flag: either the test_method or a retry_flag. 279 There are some possibilities of this argument: 280 1. the test_method to conduct and retry: should retry the test_method. 281 This occurs with 282 @_test_retry_and_log 283 2. the retry flag is True. Should retry the test_method. 284 This occurs with 285 @_test_retry_and_log(True) 286 3. the retry flag is False. Do not retry the test_method. 287 This occurs with 288 @_test_retry_and_log(False) 289 290 @returns: a wrapper of the test_method with test log. The retry mechanism 291 would depend on the retry flag. 292 293 """ 294 295 def decorator(test_method): 296 """A decorator wrapper of the decorated test_method. 297 298 @param test_method: the test method being decorated. 299 300 @returns the wrapper of the test method. 301 302 """ 303 @functools.wraps(test_method) 304 def wrapper(instance, *args, **kwargs): 305 """A wrapper of the decorated method. 306 307 @param instance: an BluetoothAdapterTests instance 308 309 @returns the result of the test method 310 311 """ 312 instance.results = None 313 if callable(test_method_or_retry_flag) or test_method_or_retry_flag: 314 test_result = retry(test_method, instance, *args, **kwargs) 315 else: 316 test_result = test_method(instance, *args, **kwargs) 317 318 if test_result: 319 logging.info('[*** passed: %s]', test_method.__name__) 320 else: 321 fail_msg = '[--- failed: %s (%s)]' % (test_method.__name__, 322 str(instance.results)) 323 logging.error(fail_msg) 324 instance.fails.append(fail_msg) 325 return test_result 326 return wrapper 327 328 if callable(test_method_or_retry_flag): 329 # If the decorator function comes with no argument like 330 # @_test_retry_and_log 331 return decorator(test_method_or_retry_flag) 332 else: 333 # If the decorator function comes with an argument like 334 # @_test_retry_and_log(False) 335 return decorator 336 337 338 def test_case_log(method): 339 """A decorator for test case methods. 340 341 The main purpose of this decorator is to display the test case name 342 in the test log which looks like 343 344 <... test_case_RA3_CD_SI200_CD_PC_CD_UA3 ...> 345 346 @param method: the test case method to decorate. 347 348 @returns: a wrapper function of the decorated method. 349 350 """ 351 @functools.wraps(method) 352 def wrapper(instance, *args, **kwargs): 353 logging.info('\n<... %s ...>', method.__name__) 354 method(instance, *args, **kwargs) 355 return wrapper 356 357 358 class BluetoothAdapterTests(test.test): 359 """Server side bluetooth adapter tests. 360 361 This test class tries to thoroughly verify most of the important work 362 states of a bluetooth adapter. 363 364 The various test methods are supposed to be invoked by actual autotest 365 tests such as server/cros/site_tests/bluetooth_Adapter*. 366 367 """ 368 version = 1 369 ADAPTER_POWER_STATE_TIMEOUT_SECS = 5 370 ADAPTER_ACTION_SLEEP_SECS = 1 371 ADAPTER_PAIRING_TIMEOUT_SECS = 60 372 ADAPTER_CONNECTION_TIMEOUT_SECS = 30 373 ADAPTER_DISCONNECTION_TIMEOUT_SECS = 30 374 ADAPTER_PAIRING_POLLING_SLEEP_SECS = 3 375 ADAPTER_DISCOVER_TIMEOUT_SECS = 60 # 30 seconds too short sometimes 376 ADAPTER_DISCOVER_POLLING_SLEEP_SECS = 1 377 ADAPTER_DISCOVER_NAME_TIMEOUT_SECS = 30 378 379 HID_REPORT_SLEEP_SECS = 1 380 381 # hci0 is the default hci device if there is no external bluetooth dongle. 382 EXPECTED_HCI = 'hci0' 383 384 CLASS_OF_SERVICE_MASK = 0xFFE000 385 CLASS_OF_DEVICE_MASK = 0x001FFF 386 387 # Constants about advertising. 388 DAFAULT_MIN_ADVERTISEMENT_INTERVAL_MS = 1280 389 DAFAULT_MAX_ADVERTISEMENT_INTERVAL_MS = 1280 390 ADVERTISING_INTERVAL_UNIT = 0.625 391 392 # Error messages about advertising dbus methods. 393 ERROR_MAX_ADVERTISEMENTS = ( 394 'org.bluez.Error.Failed: Maximum advertisements reached') 395 ERROR_INVALID_ADVERTISING_INTERVALS = ( 396 'org.bluez.Error.InvalidArguments: Invalid arguments') 397 398 # Supported profiles by chrome os. 399 SUPPORTED_UUIDS = { 400 'HSP_AG_UUID': '00001112-0000-1000-8000-00805f9b34fb', 401 'GATT_UUID': '00001801-0000-1000-8000-00805f9b34fb', 402 'A2DP_SOURCE_UUID': '0000110a-0000-1000-8000-00805f9b34fb', 403 'HFP_AG_UUID': '0000111f-0000-1000-8000-00805f9b34fb', 404 'PNP_UUID': '00001200-0000-1000-8000-00805f9b34fb', 405 'GAP_UUID': '00001800-0000-1000-8000-00805f9b34fb'} 406 407 408 def get_device(self, device_type): 409 """Get the bluetooth device object. 410 411 @param device_type : the bluetooth HID device type, e.g., 'MOUSE' 412 413 @returns: the bluetooth device object 414 415 """ 416 self.device_type = device_type 417 if self.devices[device_type] is None: 418 self.devices[device_type] = get_bluetooth_emulated_device( 419 self.host, device_type) 420 return self.devices[device_type] 421 422 423 # ------------------------------------------------------------------- 424 # Adater standalone tests 425 # ------------------------------------------------------------------- 426 427 428 @_test_retry_and_log 429 def test_bluetoothd_running(self): 430 """Test that bluetoothd is running.""" 431 return self.bluetooth_facade.is_bluetoothd_running() 432 433 434 @_test_retry_and_log 435 def test_start_bluetoothd(self): 436 """Test that bluetoothd could be started successfully.""" 437 return self.bluetooth_facade.start_bluetoothd() 438 439 440 @_test_retry_and_log 441 def test_stop_bluetoothd(self): 442 """Test that bluetoothd could be stopped successfully.""" 443 return self.bluetooth_facade.stop_bluetoothd() 444 445 446 @_test_retry_and_log 447 def test_adapter_work_state(self): 448 """Test that the bluetooth adapter is in the correct working state. 449 450 This includes that the adapter is detectable, is powered on, 451 and its hci device is hci0. 452 """ 453 has_adapter = self.bluetooth_facade.has_adapter() 454 is_powered_on = self.bluetooth_facade.is_powered_on() 455 hci = self.bluetooth_facade.get_hci() == self.EXPECTED_HCI 456 self.results = { 457 'has_adapter': has_adapter, 458 'is_powered_on': is_powered_on, 459 'hci': hci} 460 return all(self.results.values()) 461 462 463 @_test_retry_and_log 464 def test_power_on_adapter(self): 465 """Test that the adapter could be powered on successfully.""" 466 power_on = self.bluetooth_facade.set_powered(True) 467 is_powered_on = False 468 try: 469 utils.poll_for_condition( 470 condition=self.bluetooth_facade.is_powered_on, 471 timeout=self.ADAPTER_POWER_STATE_TIMEOUT_SECS, 472 desc='Waiting for adapter powered on') 473 is_powered_on = True 474 except utils.TimeoutError as e: 475 logging.error('test_power_on_adapter: %s', e) 476 except: 477 logging.error('test_power_on_adapter: unexpected error') 478 479 self.results = {'power_on': power_on, 'is_powered_on': is_powered_on} 480 return all(self.results.values()) 481 482 483 @_test_retry_and_log 484 def test_power_off_adapter(self): 485 """Test that the adapter could be powered off successfully.""" 486 power_off = self.bluetooth_facade.set_powered(False) 487 is_powered_off = False 488 try: 489 utils.poll_for_condition( 490 condition=(lambda: 491 not self.bluetooth_facade.is_powered_on()), 492 timeout=self.ADAPTER_POWER_STATE_TIMEOUT_SECS, 493 desc='Waiting for adapter powered off') 494 is_powered_off = True 495 except utils.TimeoutError as e: 496 logging.error('test_power_off_adapter: %s', e) 497 except: 498 logging.error('test_power_off_adapter: unexpected error') 499 500 self.results = { 501 'power_off': power_off, 502 'is_powered_off': is_powered_off} 503 return all(self.results.values()) 504 505 506 @_test_retry_and_log 507 def test_reset_on_adapter(self): 508 """Test that the adapter could be reset on successfully. 509 510 This includes restarting bluetoothd, and removing the settings 511 and cached devices. 512 """ 513 reset_on = self.bluetooth_facade.reset_on() 514 is_powered_on = False 515 try: 516 utils.poll_for_condition( 517 condition=self.bluetooth_facade.is_powered_on, 518 timeout=self.ADAPTER_POWER_STATE_TIMEOUT_SECS, 519 desc='Waiting for adapter reset on') 520 is_powered_on = True 521 except utils.TimeoutError as e: 522 logging.error('test_reset_on_adapter: %s', e) 523 except: 524 logging.error('test_reset_on_adapter: unexpected error') 525 526 self.results = {'reset_on': reset_on, 'is_powered_on': is_powered_on} 527 return all(self.results.values()) 528 529 530 @_test_retry_and_log 531 def test_reset_off_adapter(self): 532 """Test that the adapter could be reset off successfully. 533 534 This includes restarting bluetoothd, and removing the settings 535 and cached devices. 536 """ 537 reset_off = self.bluetooth_facade.reset_off() 538 is_powered_off = False 539 try: 540 utils.poll_for_condition( 541 condition=(lambda: 542 not self.bluetooth_facade.is_powered_on()), 543 timeout=self.ADAPTER_POWER_STATE_TIMEOUT_SECS, 544 desc='Waiting for adapter reset off') 545 is_powered_off = True 546 except utils.TimeoutError as e: 547 logging.error('test_reset_off_adapter: %s', e) 548 except: 549 logging.error('test_reset_off_adapter: unexpected error') 550 551 self.results = { 552 'reset_off': reset_off, 553 'is_powered_off': is_powered_off} 554 return all(self.results.values()) 555 556 557 @_test_retry_and_log 558 def test_UUIDs(self): 559 """Test that basic profiles are supported.""" 560 adapter_UUIDs = self.bluetooth_facade.get_UUIDs() 561 self.results = [uuid for uuid in self.SUPPORTED_UUIDS.values() 562 if uuid not in adapter_UUIDs] 563 return not bool(self.results) 564 565 566 @_test_retry_and_log 567 def test_start_discovery(self): 568 """Test that the adapter could start discovery.""" 569 start_discovery = self.bluetooth_facade.start_discovery() 570 time.sleep(self.ADAPTER_ACTION_SLEEP_SECS) 571 is_discovering = self.bluetooth_facade.is_discovering() 572 self.results = { 573 'start_discovery': start_discovery, 574 'is_discovering': is_discovering} 575 return all(self.results.values()) 576 577 578 @_test_retry_and_log 579 def test_stop_discovery(self): 580 """Test that the adapter could stop discovery.""" 581 stop_discovery = self.bluetooth_facade.stop_discovery() 582 time.sleep(self.ADAPTER_ACTION_SLEEP_SECS) 583 is_not_discovering = not self.bluetooth_facade.is_discovering() 584 self.results = { 585 'stop_discovery': stop_discovery, 586 'is_not_discovering': is_not_discovering} 587 return all(self.results.values()) 588 589 590 @_test_retry_and_log 591 def test_discoverable(self): 592 """Test that the adapter could be set discoverable.""" 593 set_discoverable = self.bluetooth_facade.set_discoverable(True) 594 time.sleep(self.ADAPTER_ACTION_SLEEP_SECS) 595 is_discoverable = self.bluetooth_facade.is_discoverable() 596 self.results = { 597 'set_discoverable': set_discoverable, 598 'is_discoverable': is_discoverable} 599 return all(self.results.values()) 600 601 602 @_test_retry_and_log 603 def test_nondiscoverable(self): 604 """Test that the adapter could be set non-discoverable.""" 605 set_nondiscoverable = self.bluetooth_facade.set_discoverable(False) 606 time.sleep(self.ADAPTER_ACTION_SLEEP_SECS) 607 is_nondiscoverable = not self.bluetooth_facade.is_discoverable() 608 self.results = { 609 'set_nondiscoverable': set_nondiscoverable, 610 'is_nondiscoverable': is_nondiscoverable} 611 return all(self.results.values()) 612 613 614 @_test_retry_and_log 615 def test_pairable(self): 616 """Test that the adapter could be set pairable.""" 617 set_pairable = self.bluetooth_facade.set_pairable(True) 618 time.sleep(self.ADAPTER_ACTION_SLEEP_SECS) 619 is_pairable = self.bluetooth_facade.is_pairable() 620 self.results = { 621 'set_pairable': set_pairable, 622 'is_pairable': is_pairable} 623 return all(self.results.values()) 624 625 626 @_test_retry_and_log 627 def test_nonpairable(self): 628 """Test that the adapter could be set non-pairable.""" 629 set_nonpairable = self.bluetooth_facade.set_pairable(False) 630 time.sleep(self.ADAPTER_ACTION_SLEEP_SECS) 631 is_nonpairable = not self.bluetooth_facade.is_pairable() 632 self.results = { 633 'set_nonpairable': set_nonpairable, 634 'is_nonpairable': is_nonpairable} 635 return all(self.results.values()) 636 637 638 # ------------------------------------------------------------------- 639 # Tests about general discovering, pairing, and connection 640 # ------------------------------------------------------------------- 641 642 643 @_test_retry_and_log 644 def test_discover_device(self, device_address): 645 """Test that the adapter could discover the specified device address. 646 647 @param device_address: Address of the device. 648 649 @returns: True if the device is found. False otherwise. 650 651 """ 652 has_device_initially = False 653 start_discovery = False 654 device_discovered = False 655 has_device = self.bluetooth_facade.has_device 656 657 if has_device(device_address): 658 has_device_initially = True 659 elif self.bluetooth_facade.start_discovery(): 660 start_discovery = True 661 try: 662 utils.poll_for_condition( 663 condition=(lambda: has_device(device_address)), 664 timeout=self.ADAPTER_DISCOVER_TIMEOUT_SECS, 665 sleep_interval=self.ADAPTER_DISCOVER_POLLING_SLEEP_SECS, 666 desc='Waiting for discovering %s' % device_address) 667 device_discovered = True 668 except utils.TimeoutError as e: 669 logging.error('test_discover_device: %s', e) 670 except Exception as e: 671 logging.error('test_discover_device: %s', e) 672 err = 'bluetoothd probably crashed. Check out /var/log/messages' 673 logging.error(err) 674 except: 675 logging.error('test_discover_device: unexpected error') 676 677 self.results = { 678 'has_device_initially': has_device_initially, 679 'start_discovery': start_discovery, 680 'device_discovered': device_discovered} 681 return has_device_initially or device_discovered 682 683 684 @_test_retry_and_log 685 def test_pairing(self, device_address, pin, trusted=True): 686 """Test that the adapter could pair with the device successfully. 687 688 @param device_address: Address of the device. 689 @param pin: pin code to pair with the device. 690 @param trusted: indicating whether to set the device trusted. 691 692 @returns: True if pairing succeeds. False otherwise. 693 694 """ 695 696 def _pair_device(): 697 """Pair to the device. 698 699 @returns: True if it could pair with the device. False otherwise. 700 701 """ 702 return self.bluetooth_facade.pair_legacy_device( 703 device_address, pin, trusted, 704 self.ADAPTER_PAIRING_TIMEOUT_SECS) 705 706 707 has_device = False 708 paired = False 709 if self.bluetooth_facade.has_device(device_address): 710 has_device = True 711 try: 712 utils.poll_for_condition( 713 condition=_pair_device, 714 timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS, 715 sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS, 716 desc='Waiting for pairing %s' % device_address) 717 paired = True 718 except utils.TimeoutError as e: 719 logging.error('test_pairing: %s', e) 720 except: 721 logging.error('test_pairing: unexpected error') 722 723 self.results = {'has_device': has_device, 'paired': paired} 724 return all(self.results.values()) 725 726 727 @_test_retry_and_log 728 def test_remove_pairing(self, device_address): 729 """Test that the adapter could remove the paired device. 730 731 @param device_address: Address of the device. 732 733 @returns: True if the device is removed successfully. False otherwise. 734 735 """ 736 device_is_paired_initially = self.bluetooth_facade.device_is_paired( 737 device_address) 738 remove_pairing = False 739 pairing_removed = False 740 741 if device_is_paired_initially: 742 remove_pairing = self.bluetooth_facade.remove_device_object( 743 device_address) 744 pairing_removed = not self.bluetooth_facade.device_is_paired( 745 device_address) 746 747 self.results = { 748 'device_is_paired_initially': device_is_paired_initially, 749 'remove_pairing': remove_pairing, 750 'pairing_removed': pairing_removed} 751 return all(self.results.values()) 752 753 754 def test_set_trusted(self, device_address, trusted=True): 755 """Test whether the device with the specified address is trusted. 756 757 @param device_address: Address of the device. 758 @param trusted : True or False indicating if trusted is expected. 759 760 @returns: True if the device's "Trusted" property is as specified; 761 False otherwise. 762 763 """ 764 765 set_trusted = self.bluetooth_facade.set_trusted( 766 device_address, trusted) 767 768 properties = self.bluetooth_facade.get_device_properties( 769 device_address) 770 actual_trusted = properties.get('Trusted') 771 772 self.results = { 773 'set_trusted': set_trusted, 774 'actual trusted': actual_trusted, 775 'expected trusted': trusted} 776 return actual_trusted == trusted 777 778 779 @_test_retry_and_log 780 def test_connection_by_adapter(self, device_address): 781 """Test that the adapter of dut could connect to the device successfully 782 783 It is the caller's responsibility to pair to the device before 784 doing connection. 785 786 @param device_address: Address of the device. 787 788 @returns: True if connection is performed. False otherwise. 789 790 """ 791 792 def _connect_device(): 793 """Connect to the device. 794 795 @returns: True if it could connect to the device. False otherwise. 796 797 """ 798 return self.bluetooth_facade.connect_device(device_address) 799 800 801 has_device = False 802 connected = False 803 if self.bluetooth_facade.has_device(device_address): 804 has_device = True 805 try: 806 utils.poll_for_condition( 807 condition=_connect_device, 808 timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS, 809 sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS, 810 desc='Waiting for connecting to %s' % device_address) 811 connected = True 812 except utils.TimeoutError as e: 813 logging.error('test_connection_by_adapter: %s', e) 814 except: 815 logging.error('test_connection_by_adapter: unexpected error') 816 817 self.results = {'has_device': has_device, 'connected': connected} 818 return all(self.results.values()) 819 820 821 @_test_retry_and_log 822 def test_disconnection_by_adapter(self, device_address): 823 """Test that the adapter of dut could disconnect the device successfully 824 825 @param device_address: Address of the device. 826 827 @returns: True if disconnection is performed. False otherwise. 828 829 """ 830 return self.bluetooth_facade.disconnect_device(device_address) 831 832 833 def _enter_command_mode(self, device): 834 """Let the device enter command mode. 835 836 Before using the device, need to call this method to make sure 837 it is in the command mode. 838 839 @param device: the bluetooth HID device 840 841 @returns: True if successful. False otherwise. 842 843 """ 844 result = _is_successful(_run_method(device.EnterCommandMode, 845 'EnterCommandMode')) 846 if not result: 847 logging.error('EnterCommandMode failed') 848 return result 849 850 851 @_test_retry_and_log 852 def test_connection_by_device(self, device): 853 """Test that the device could connect to the adapter successfully. 854 855 This emulates the behavior that a device may initiate a 856 connection request after waking up from power saving mode. 857 858 @param device: the bluetooth HID device 859 860 @returns: True if connection is performed correctly by device and 861 the adapter also enters connection state. 862 False otherwise. 863 864 """ 865 if not self._enter_command_mode(device): 866 return False 867 868 method_name = 'test_connection_by_device' 869 connection_by_device = False 870 adapter_address = self.bluetooth_facade.address 871 try: 872 device.ConnectToRemoteAddress(adapter_address) 873 connection_by_device = True 874 except Exception as e: 875 logging.error('%s (device): %s', method_name, e) 876 except: 877 logging.error('%s (device): unexpected error', method_name) 878 879 connection_seen_by_adapter = False 880 device_address = device.address 881 device_is_connected = self.bluetooth_facade.device_is_connected 882 try: 883 utils.poll_for_condition( 884 condition=lambda: device_is_connected(device_address), 885 timeout=self.ADAPTER_CONNECTION_TIMEOUT_SECS, 886 desc=('Waiting for connection from %s' % device_address)) 887 connection_seen_by_adapter = True 888 except utils.TimeoutError as e: 889 logging.error('%s (adapter): %s', method_name, e) 890 except: 891 logging.error('%s (adapter): unexpected error', method_name) 892 893 self.results = { 894 'connection_by_device': connection_by_device, 895 'connection_seen_by_adapter': connection_seen_by_adapter} 896 return all(self.results.values()) 897 898 899 @_test_retry_and_log 900 def test_disconnection_by_device(self, device): 901 """Test that the device could disconnect the adapter successfully. 902 903 This emulates the behavior that a device may initiate a 904 disconnection request before going into power saving mode. 905 906 Note: should not try to enter command mode in this method. When 907 a device is connected, there is no way to enter command mode. 908 One could just issue a special disconnect command without 909 entering command mode. 910 911 @param device: the bluetooth HID device 912 913 @returns: True if disconnection is performed correctly by device and 914 the adapter also observes the disconnection. 915 False otherwise. 916 917 """ 918 method_name = 'test_disconnection_by_device' 919 disconnection_by_device = False 920 try: 921 device.Disconnect() 922 disconnection_by_device = True 923 except Exception as e: 924 logging.error('%s (device): %s', method_name, e) 925 except: 926 logging.error('%s (device): unexpected error', method_name) 927 928 disconnection_seen_by_adapter = False 929 device_address = device.address 930 device_is_connected = self.bluetooth_facade.device_is_connected 931 try: 932 utils.poll_for_condition( 933 condition=lambda: not device_is_connected(device_address), 934 timeout=self.ADAPTER_DISCONNECTION_TIMEOUT_SECS, 935 desc=('Waiting for disconnection from %s' % device_address)) 936 disconnection_seen_by_adapter = True 937 except utils.TimeoutError as e: 938 logging.error('%s (adapter): %s', method_name, e) 939 except: 940 logging.error('%s (adapter): unexpected error', method_name) 941 942 self.results = { 943 'disconnection_by_device': disconnection_by_device, 944 'disconnection_seen_by_adapter': disconnection_seen_by_adapter} 945 return all(self.results.values()) 946 947 948 def _get_device_name(self, device_address): 949 """Get the device name. 950 951 @returns: True if the device name is derived. None otherwise. 952 953 """ 954 properties = self.bluetooth_facade.get_device_properties( 955 device_address) 956 self.discovered_device_name = properties.get('Name') 957 return bool(self.discovered_device_name) 958 959 960 @_test_retry_and_log 961 def test_device_name(self, device_address, expected_device_name): 962 """Test that the device name discovered by the adapter is correct. 963 964 @param device_address: Address of the device. 965 @param expected_device_name: the bluetooth device name 966 967 @returns: True if the discovered_device_name is expected_device_name. 968 False otherwise. 969 970 """ 971 try: 972 utils.poll_for_condition( 973 condition=lambda: self._get_device_name(device_address), 974 timeout=self.ADAPTER_DISCOVER_NAME_TIMEOUT_SECS, 975 sleep_interval=self.ADAPTER_DISCOVER_POLLING_SLEEP_SECS, 976 desc='Waiting for device name of %s' % device_address) 977 except utils.TimeoutError as e: 978 logging.error('test_device_name: %s', e) 979 except: 980 logging.error('test_device_name: unexpected error') 981 982 self.results = { 983 'expected_device_name': expected_device_name, 984 'discovered_device_name': self.discovered_device_name} 985 return self.discovered_device_name == expected_device_name 986 987 988 @_test_retry_and_log 989 def test_device_class_of_service(self, device_address, 990 expected_class_of_service): 991 """Test that the discovered device class of service is as expected. 992 993 @param device_address: Address of the device. 994 @param expected_class_of_service: the expected class of service 995 996 @returns: True if the discovered class of service matches the 997 expected class of service. False otherwise. 998 999 """ 1000 properties = self.bluetooth_facade.get_device_properties( 1001 device_address) 1002 device_class = properties.get('Class') 1003 discovered_class_of_service = (device_class & self.CLASS_OF_SERVICE_MASK 1004 if device_class else None) 1005 1006 self.results = { 1007 'device_class': device_class, 1008 'expected_class_of_service': expected_class_of_service, 1009 'discovered_class_of_service': discovered_class_of_service} 1010 return discovered_class_of_service == expected_class_of_service 1011 1012 1013 @_test_retry_and_log 1014 def test_device_class_of_device(self, device_address, 1015 expected_class_of_device): 1016 """Test that the discovered device class of device is as expected. 1017 1018 @param device_address: Address of the device. 1019 @param expected_class_of_device: the expected class of device 1020 1021 @returns: True if the discovered class of device matches the 1022 expected class of device. False otherwise. 1023 1024 """ 1025 properties = self.bluetooth_facade.get_device_properties( 1026 device_address) 1027 device_class = properties.get('Class') 1028 discovered_class_of_device = (device_class & self.CLASS_OF_DEVICE_MASK 1029 if device_class else None) 1030 1031 self.results = { 1032 'device_class': device_class, 1033 'expected_class_of_device': expected_class_of_device, 1034 'discovered_class_of_device': discovered_class_of_device} 1035 return discovered_class_of_device == expected_class_of_device 1036 1037 1038 def _get_btmon_log(self, method, logging_timespan=1): 1039 """Capture the btmon log when executing the specified method. 1040 1041 @param method: the method to capture log. 1042 The method would be executed only when it is not None. 1043 This allows us to simply capture btmon log without 1044 executing any command. 1045 @param logging_timespan: capture btmon log for logging_timespan seconds. 1046 1047 """ 1048 self.bluetooth_le_facade.btmon_start() 1049 self.advertising_msg = method() if method else '' 1050 time.sleep(logging_timespan) 1051 self.bluetooth_le_facade.btmon_stop() 1052 1053 1054 def convert_to_adv_jiffies(self, adv_interval_ms): 1055 """Convert adv interval in ms to jiffies, i.e., multiples of 0.625 ms. 1056 1057 @param adv_interval_ms: an advertising interval 1058 1059 @returns: the equivalent jiffies 1060 1061 """ 1062 return adv_interval_ms / self.ADVERTISING_INTERVAL_UNIT 1063 1064 1065 def compute_duration(self, max_adv_interval_ms): 1066 """Compute duration from max_adv_interval_ms. 1067 1068 Advertising duration is calculated approximately as 1069 duration = max_adv_interval_ms / 1000.0 * 1.1 1070 1071 @param max_adv_interval_ms: max advertising interval in milliseconds. 1072 1073 @returns: duration in seconds. 1074 1075 """ 1076 return max_adv_interval_ms / 1000.0 * 1.1 1077 1078 1079 def compute_logging_timespan(self, max_adv_interval_ms): 1080 """Compute the logging timespan from max_adv_interval_ms. 1081 1082 The logging timespan is the time needed to record btmon log. 1083 1084 @param max_adv_interval_ms: max advertising interval in milliseconds. 1085 1086 @returns: logging_timespan in seconds. 1087 1088 """ 1089 duration = self.compute_duration(max_adv_interval_ms) 1090 logging_timespan = max(self.count_advertisements * duration, 1) 1091 return logging_timespan 1092 1093 1094 @_test_retry_and_log(False) 1095 def test_check_duration_and_intervals(self, min_adv_interval_ms, 1096 max_adv_interval_ms, 1097 number_advertisements): 1098 """Verify that every advertisements are scheduled according to the 1099 duration and intervals. 1100 1101 An advertisement would be scheduled at the time span of 1102 duration * number_advertisements 1103 1104 @param min_adv_interval_ms: min advertising interval in milliseconds. 1105 @param max_adv_interval_ms: max advertising interval in milliseconds. 1106 @param number_advertisements: the number of existing advertisements 1107 1108 @returns: True if all advertisements are scheduled based on the 1109 duration and intervals. 1110 1111 """ 1112 1113 1114 def within_tolerance(a, b, ratio=0.1): 1115 return abs(a - b) / abs(a) <= ratio 1116 1117 1118 start_str = 'Set Advertising Intervals:' 1119 search_strings = ['HCI Command: LE Set Advertising Data', 'Company'] 1120 search_str = '|'.join(search_strings) 1121 1122 contents = self.bluetooth_le_facade.btmon_get(search_str=search_str, 1123 start_str=start_str) 1124 1125 # Company string looks like 1126 # Company: not assigned (65283) 1127 company_pattern = re.compile('Company:.*\((\d*)\)') 1128 1129 # The string with timestamp looks like 1130 # < HCI Command: LE Set Advertising Data (0x08|0x0008) [hci0] 3.799236 1131 set_adv_time_str = 'LE Set Advertising Data.*\[hci\d\].*(\d+\.\d+)' 1132 set_adv_time_pattern = re.compile(set_adv_time_str) 1133 1134 adv_timestamps = {} 1135 timestamp = None 1136 manufacturer_id = None 1137 for line in contents: 1138 result = set_adv_time_pattern.search(line) 1139 if result: 1140 timestamp = float(result.group(1)) 1141 1142 result = company_pattern.search(line) 1143 if result: 1144 manufacturer_id = '0x%04x' % int(result.group(1)) 1145 1146 if timestamp and manufacturer_id: 1147 if manufacturer_id not in adv_timestamps: 1148 adv_timestamps[manufacturer_id] = [] 1149 adv_timestamps[manufacturer_id].append(timestamp) 1150 timestamp = None 1151 manufacturer_id = None 1152 1153 duration = self.compute_duration(max_adv_interval_ms) 1154 expected_timespan = duration * number_advertisements 1155 1156 check_duration = True 1157 for manufacturer_id, values in adv_timestamps.iteritems(): 1158 logging.debug('manufacturer_id %s: %s', manufacturer_id, values) 1159 timespans = [values[i] - values[i - 1] 1160 for i in xrange(1, len(values))] 1161 errors = [timespans[i] for i in xrange(len(timespans)) 1162 if not within_tolerance(expected_timespan, timespans[i])] 1163 logging.debug('timespans: %s', timespans) 1164 logging.debug('errors: %s', errors) 1165 if bool(errors): 1166 check_duration = False 1167 1168 # Verify that the advertising intervals are also correct. 1169 min_adv_interval_ms_found, max_adv_interval_ms_found = ( 1170 self._verify_advertising_intervals(min_adv_interval_ms, 1171 max_adv_interval_ms)) 1172 1173 self.results = { 1174 'check_duration': check_duration, 1175 'max_adv_interval_ms_found': max_adv_interval_ms_found, 1176 'max_adv_interval_ms_found': max_adv_interval_ms_found, 1177 } 1178 return all(self.results.values()) 1179 1180 1181 def _get_min_max_intervals_strings(self, min_adv_interval_ms, 1182 max_adv_interval_ms): 1183 """Get the min and max advertising intervals strings shown in btmon. 1184 1185 Advertising intervals shown in the btmon log look like 1186 Min advertising interval: 1280.000 msec (0x0800) 1187 Max advertising interval: 1280.000 msec (0x0800) 1188 1189 @param min_adv_interval_ms: min advertising interval in milliseconds. 1190 @param max_adv_interval_ms: max advertising interval in milliseconds. 1191 1192 @returns: the min and max intervals strings. 1193 1194 """ 1195 min_str = ('Min advertising interval: %.3f msec (0x%04x)' % 1196 (min_adv_interval_ms, 1197 min_adv_interval_ms / self.ADVERTISING_INTERVAL_UNIT)) 1198 logging.debug('min_adv_interval_ms: %s', min_str) 1199 1200 max_str = ('Max advertising interval: %.3f msec (0x%04x)' % 1201 (max_adv_interval_ms, 1202 max_adv_interval_ms / self.ADVERTISING_INTERVAL_UNIT)) 1203 logging.debug('max_adv_interval_ms: %s', max_str) 1204 1205 return (min_str, max_str) 1206 1207 1208 def _verify_advertising_intervals(self, min_adv_interval_ms, 1209 max_adv_interval_ms): 1210 """Verify min and max advertising intervals. 1211 1212 Advertising intervals look like 1213 Min advertising interval: 1280.000 msec (0x0800) 1214 Max advertising interval: 1280.000 msec (0x0800) 1215 1216 @param min_adv_interval_ms: min advertising interval in milliseconds. 1217 @param max_adv_interval_ms: max advertising interval in milliseconds. 1218 1219 @returns: a tuple of (True, True) if both min and max advertising 1220 intervals could be found. Otherwise, the corresponding element 1221 in the tuple if False. 1222 1223 """ 1224 min_str, max_str = self._get_min_max_intervals_strings( 1225 min_adv_interval_ms, max_adv_interval_ms) 1226 1227 min_adv_interval_ms_found = self.bluetooth_le_facade.btmon_find(min_str) 1228 max_adv_interval_ms_found = self.bluetooth_le_facade.btmon_find(max_str) 1229 1230 return min_adv_interval_ms_found, max_adv_interval_ms_found 1231 1232 1233 @_test_retry_and_log(False) 1234 def test_register_advertisement(self, advertisement_data, instance_id, 1235 min_adv_interval_ms, max_adv_interval_ms): 1236 """Verify that an advertisement is registered correctly. 1237 1238 This test verifies the following data: 1239 - advertisement added 1240 - manufacturer data 1241 - service UUIDs 1242 - service data 1243 - advertising intervals 1244 - advertising enabled 1245 1246 @param advertisement_data: the data of an advertisement to register. 1247 @param instance_id: the instance id which starts at 1. 1248 @param min_adv_interval_ms: min_adv_interval in milliseconds. 1249 @param max_adv_interval_ms: max_adv_interval in milliseconds. 1250 1251 @returns: True if the advertisement is registered correctly. 1252 False otherwise. 1253 1254 """ 1255 # When registering a new advertisement, it is possible that another 1256 # instance is advertising. It may need to wait for all other 1257 # advertisements to complete advertising once. 1258 self.count_advertisements += 1 1259 logging_timespan = self.compute_logging_timespan(max_adv_interval_ms) 1260 self._get_btmon_log( 1261 lambda: self.bluetooth_le_facade.register_advertisement( 1262 advertisement_data), 1263 logging_timespan=logging_timespan) 1264 1265 # Verify that a new advertisement is added. 1266 advertisement_added = self.bluetooth_le_facade.btmon_find( 1267 'Advertising Added: %d' % instance_id) 1268 1269 # Verify that the manufacturer data could be found. 1270 manufacturer_data = advertisement_data.get('ManufacturerData', '') 1271 for manufacturer_id in manufacturer_data: 1272 # The 'not assigned' text below means the manufacturer id 1273 # is not actually assigned to any real manufacturer. 1274 # For real 16-bit manufacturer UUIDs, refer to 1275 # https://www.bluetooth.com/specifications/assigned-numbers/16-bit-UUIDs-for-Members 1276 manufacturer_data_found = self.bluetooth_le_facade.btmon_find( 1277 'Company: not assigned (%d)' % int(manufacturer_id, 16)) 1278 1279 # Verify that all service UUIDs could be found. 1280 service_uuids_found = True 1281 for uuid in advertisement_data.get('ServiceUUIDs', []): 1282 # Service UUIDs looks like ['0x180D', '0x180F'] 1283 # Heart Rate (0x180D) 1284 # Battery Service (0x180F) 1285 # For actual 16-bit service UUIDs, refer to 1286 # https://www.bluetooth.com/specifications/gatt/services 1287 if not self.bluetooth_le_facade.btmon_find('0x%s' % uuid): 1288 service_uuids_found = False 1289 break 1290 1291 # Verify service data. 1292 service_data_found = True 1293 for uuid, data in advertisement_data.get('ServiceData', {}).items(): 1294 # A service data looks like 1295 # Service Data (UUID 0x9999): 0001020304 1296 # while uuid is '9999' and data is [0x00, 0x01, 0x02, 0x03, 0x04] 1297 data_str = ''.join(map(lambda n: '%02x' % n, data)) 1298 if not self.bluetooth_le_facade.btmon_find( 1299 'Service Data (UUID 0x%s): %s' % (uuid, data_str)): 1300 service_data_found = False 1301 break 1302 1303 # Verify that the advertising intervals are correct. 1304 min_adv_interval_ms_found, max_adv_interval_ms_found = ( 1305 self._verify_advertising_intervals(min_adv_interval_ms, 1306 max_adv_interval_ms)) 1307 1308 # Verify advertising is enabled. 1309 advertising_enabled = self.bluetooth_le_facade.btmon_find( 1310 'Advertising: Enabled (0x01)') 1311 1312 self.results = { 1313 'advertisement_added': advertisement_added, 1314 'manufacturer_data_found': manufacturer_data_found, 1315 'service_uuids_found': service_uuids_found, 1316 'service_data_found': service_data_found, 1317 'min_adv_interval_ms_found': min_adv_interval_ms_found, 1318 'max_adv_interval_ms_found': max_adv_interval_ms_found, 1319 'advertising_enabled': advertising_enabled, 1320 } 1321 return all(self.results.values()) 1322 1323 1324 @_test_retry_and_log(False) 1325 def test_fail_to_register_advertisement(self, advertisement_data, 1326 min_adv_interval_ms, 1327 max_adv_interval_ms): 1328 """Verify that failure is incurred when max advertisements are reached. 1329 1330 This test verifies that a registration failure is incurred when 1331 max advertisements are reached. The error message looks like: 1332 1333 org.bluez.Error.Failed: Maximum advertisements reached 1334 1335 @param advertisement_data: the advertisement to register. 1336 @param min_adv_interval_ms: min_adv_interval in milliseconds. 1337 @param max_adv_interval_ms: max_adv_interval in milliseconds. 1338 1339 @returns: True if the error message is received correctly. 1340 False otherwise. 1341 1342 """ 1343 logging_timespan = self.compute_logging_timespan(max_adv_interval_ms) 1344 self._get_btmon_log( 1345 lambda: self.bluetooth_le_facade.register_advertisement( 1346 advertisement_data), 1347 logging_timespan=logging_timespan) 1348 1349 # Verify that max advertisements are reached. 1350 max_advertisements_error = ( 1351 self.ERROR_MAX_ADVERTISEMENTS in self.advertising_msg) 1352 1353 # Verify that no new advertisement is added. 1354 advertisement_not_added = not self.bluetooth_le_facade.btmon_find( 1355 'Advertising Added:') 1356 1357 # Verify that the advertising intervals are correct. 1358 min_adv_interval_ms_found, max_adv_interval_ms_found = ( 1359 self._verify_advertising_intervals(min_adv_interval_ms, 1360 max_adv_interval_ms)) 1361 1362 # Verify advertising remains enabled. 1363 advertising_enabled = self.bluetooth_le_facade.btmon_find( 1364 'Advertising: Enabled (0x01)') 1365 1366 self.results = { 1367 'max_advertisements_error': max_advertisements_error, 1368 'advertisement_not_added': advertisement_not_added, 1369 'min_adv_interval_ms_found': min_adv_interval_ms_found, 1370 'max_adv_interval_ms_found': max_adv_interval_ms_found, 1371 'advertising_enabled': advertising_enabled, 1372 } 1373 return all(self.results.values()) 1374 1375 1376 @_test_retry_and_log(False) 1377 def test_unregister_advertisement(self, advertisement_data, instance_id, 1378 min_adv_interval_ms, max_adv_interval_ms, 1379 advertising_disabled): 1380 """Verify that an advertisement is unregistered correctly. 1381 1382 This test verifies the following data: 1383 - advertisement removed 1384 - advertising status: enabled if there are advertisements left; 1385 disabled otherwise. 1386 1387 @param advertisement_data: the data of an advertisement to unregister. 1388 @param instance_id: the instance id of the advertisement to remove. 1389 @param min_adv_interval_ms: min_adv_interval in milliseconds. 1390 @param max_adv_interval_ms: max_adv_interval in milliseconds. 1391 @param advertising_disabled: is advertising disabled? This happens 1392 only when all advertisements are removed. 1393 1394 @returns: True if the advertisement is unregistered correctly. 1395 False otherwise. 1396 1397 """ 1398 self.count_advertisements -= 1 1399 self._get_btmon_log( 1400 lambda: self.bluetooth_le_facade.unregister_advertisement( 1401 advertisement_data)) 1402 1403 # Verify that the advertisement is removed. 1404 advertisement_removed = self.bluetooth_le_facade.btmon_find( 1405 'Advertising Removed: %d' % instance_id) 1406 1407 # If advertising_disabled is True, there should be no log like 1408 # 'Advertising: Enabled (0x01)' 1409 # If advertising_disabled is False, there should be log like 1410 # 'Advertising: Enabled (0x01)' 1411 1412 # Only need to check advertising status when the last advertisement 1413 # is removed. For any other advertisements prior to the last one, 1414 # we may or may not observe 'Advertising: Enabled (0x01)' message. 1415 # Hence, the test would become flaky if we insist to see that message. 1416 # A possible workaround is to sleep for a while and then check the 1417 # message. The drawback is that we may need to wait up to 10 seconds 1418 # if the advertising duration and intervals are long. 1419 # In a test case, we always run test_check_duration_and_intervals() 1420 # to check if advertising duration and intervals are correct after 1421 # un-registering one or all advertisements, it is safe to do so. 1422 advertising_enabled_found = self.bluetooth_le_facade.btmon_find( 1423 'Advertising: Enabled (0x01)') 1424 advertising_disabled_found = self.bluetooth_le_facade.btmon_find( 1425 'Advertising: Disabled (0x00)') 1426 advertising_status_correct = not advertising_disabled or ( 1427 advertising_disabled_found and not advertising_enabled_found) 1428 1429 self.results = { 1430 'advertisement_removed': advertisement_removed, 1431 'advertising_status_correct': advertising_status_correct, 1432 } 1433 return all(self.results.values()) 1434 1435 1436 @_test_retry_and_log(False) 1437 def test_set_advertising_intervals(self, min_adv_interval_ms, 1438 max_adv_interval_ms): 1439 """Verify that new advertising intervals are set correctly. 1440 1441 Note that setting advertising intervals does not enable/disable 1442 advertising. Hence, there is no need to check the advertising 1443 status. 1444 1445 @param min_adv_interval_ms: the min advertising interval in ms. 1446 @param max_adv_interval_ms: the max advertising interval in ms. 1447 1448 @returns: True if the new advertising intervals are correct. 1449 False otherwise. 1450 1451 """ 1452 self._get_btmon_log( 1453 lambda: self.bluetooth_le_facade.set_advertising_intervals( 1454 min_adv_interval_ms, max_adv_interval_ms)) 1455 1456 # Verify the new advertising intervals. 1457 # With intervals of 200 ms and 200 ms, the log looks like 1458 # bluetoothd: Set Advertising Intervals: 0x0140, 0x0140 1459 txt = 'bluetoothd: Set Advertising Intervals: 0x%04x, 0x%04x' 1460 adv_intervals_found = self.bluetooth_le_facade.btmon_find( 1461 txt % (self.convert_to_adv_jiffies(min_adv_interval_ms), 1462 self.convert_to_adv_jiffies(max_adv_interval_ms))) 1463 1464 self.results = {'adv_intervals_found': adv_intervals_found} 1465 return all(self.results.values()) 1466 1467 1468 @_test_retry_and_log(False) 1469 def test_fail_to_set_advertising_intervals( 1470 self, invalid_min_adv_interval_ms, invalid_max_adv_interval_ms, 1471 orig_min_adv_interval_ms, orig_max_adv_interval_ms): 1472 """Verify that setting invalid advertising intervals results in error. 1473 1474 If invalid min/max advertising intervals are given, it would incur 1475 the error: 'org.bluez.Error.InvalidArguments: Invalid arguments'. 1476 Note that valid advertising intervals fall between 20 ms and 10,240 ms. 1477 1478 @param invalid_min_adv_interval_ms: the invalid min advertising interval 1479 in ms. 1480 @param invalid_max_adv_interval_ms: the invalid max advertising interval 1481 in ms. 1482 @param orig_min_adv_interval_ms: the original min advertising interval 1483 in ms. 1484 @param orig_max_adv_interval_ms: the original max advertising interval 1485 in ms. 1486 1487 @returns: True if it fails to set invalid advertising intervals. 1488 False otherwise. 1489 1490 """ 1491 self._get_btmon_log( 1492 lambda: self.bluetooth_le_facade.set_advertising_intervals( 1493 invalid_min_adv_interval_ms, 1494 invalid_max_adv_interval_ms)) 1495 1496 # Verify that the invalid error is observed in the dbus error callback 1497 # message. 1498 invalid_intervals_error = (self.ERROR_INVALID_ADVERTISING_INTERVALS in 1499 self.advertising_msg) 1500 1501 # Verify that the min/max advertising intervals remain the same 1502 # after setting the invalid advertising intervals. 1503 # 1504 # In btmon log, we would see the following message first. 1505 # bluetoothd: Set Advertising Intervals: 0x0010, 0x0010 1506 # And then, we should check if "Min advertising interval" and 1507 # "Max advertising interval" remain the same. 1508 start_str = 'bluetoothd: Set Advertising Intervals: 0x%04x, 0x%04x' % ( 1509 self.convert_to_adv_jiffies(invalid_min_adv_interval_ms), 1510 self.convert_to_adv_jiffies(invalid_max_adv_interval_ms)) 1511 1512 search_strings = ['Min advertising interval:', 1513 'Max advertising interval:'] 1514 search_str = '|'.join(search_strings) 1515 1516 contents = self.bluetooth_le_facade.btmon_get(search_str=search_str, 1517 start_str=start_str) 1518 1519 # The min/max advertising intervals of all advertisements should remain 1520 # the same as the previous valid ones. 1521 min_max_str = '[Min|Max] advertising interval: (\d*\.\d*) msec' 1522 min_max_pattern = re.compile(min_max_str) 1523 correct_orig_min_adv_interval = True 1524 correct_orig_max_adv_interval = True 1525 for line in contents: 1526 result = min_max_pattern.search(line) 1527 if result: 1528 interval = float(result.group(1)) 1529 if 'Min' in line and interval != orig_min_adv_interval_ms: 1530 correct_orig_min_adv_interval = False 1531 elif 'Max' in line and interval != orig_max_adv_interval_ms: 1532 correct_orig_max_adv_interval = False 1533 1534 self.results = { 1535 'invalid_intervals_error': invalid_intervals_error, 1536 'correct_orig_min_adv_interval': correct_orig_min_adv_interval, 1537 'correct_orig_max_adv_interval': correct_orig_max_adv_interval} 1538 1539 return all(self.results.values()) 1540 1541 1542 @_test_retry_and_log(False) 1543 def test_check_advertising_intervals(self, min_adv_interval_ms, 1544 max_adv_interval_ms): 1545 """Verify that the advertising intervals are as expected. 1546 1547 @param min_adv_interval_ms: the min advertising interval in ms. 1548 @param max_adv_interval_ms: the max advertising interval in ms. 1549 1550 @returns: True if the advertising intervals are correct. 1551 False otherwise. 1552 1553 """ 1554 self._get_btmon_log(None) 1555 1556 # Verify that the advertising intervals are correct. 1557 min_adv_interval_ms_found, max_adv_interval_ms_found = ( 1558 self._verify_advertising_intervals(min_adv_interval_ms, 1559 max_adv_interval_ms)) 1560 1561 self.results = { 1562 'min_adv_interval_ms_found': min_adv_interval_ms_found, 1563 'max_adv_interval_ms_found': max_adv_interval_ms_found, 1564 } 1565 return all(self.results.values()) 1566 1567 1568 @_test_retry_and_log(False) 1569 def test_reset_advertising(self, instance_ids=[]): 1570 """Verify that advertising is reset correctly. 1571 1572 Note that reset advertising would set advertising intervals to 1573 the default values. However, we would not be able to observe 1574 the values change until new advertisements are registered. 1575 Therefore, it is required that a test_register_advertisement() 1576 test is conducted after this test. 1577 1578 If instance_ids is [], all advertisements would still be removed 1579 if there are any. However, no need to check 'Advertising Removed' 1580 in btmon log since we may or may not be able to observe the message. 1581 This feature is needed if this test is invoked as the first one in 1582 a test case to reset advertising. In this situation, this test does 1583 not know how many advertisements exist. 1584 1585 @param instance_ids: the list of instance IDs that should be removed. 1586 1587 @returns: True if advertising is reset correctly. 1588 False otherwise. 1589 1590 """ 1591 self.count_advertisements = 0 1592 self._get_btmon_log( 1593 lambda: self.bluetooth_le_facade.reset_advertising()) 1594 1595 # Verify that every advertisement is removed. When an advertisement 1596 # with instance id 1 is removed, the log looks like 1597 # @ Advertising Removed: 1 1598 txt = 'Advertising Removed: %d' 1599 for instance_id in instance_ids: 1600 if not self.bluetooth_le_facade.btmon_find(txt % instance_id): 1601 advertisement_removed = False 1602 logging.error('Failed to remove advertisement instance: %d', 1603 instance_id) 1604 break 1605 else: 1606 advertisement_removed = True 1607 1608 # Verify that "Reset Advertising Intervals" command has been issued. 1609 reset_advertising_intervals = self.bluetooth_le_facade.btmon_find( 1610 'bluetoothd: Reset Advertising Intervals') 1611 1612 # Verify the advertising is disabled. 1613 advertising_disabled_observied = self.bluetooth_le_facade.btmon_find( 1614 'Advertising: Disabled') 1615 # If there are no existing advertisements, we may not observe 1616 # 'Advertising: Disabled'. 1617 advertising_disabled = (instance_ids == [] or 1618 advertising_disabled_observied) 1619 1620 self.results = { 1621 'advertisement_removed': advertisement_removed, 1622 'reset_advertising_intervals': reset_advertising_intervals, 1623 'advertising_disabled': advertising_disabled, 1624 } 1625 return all(self.results.values()) 1626 1627 1628 # ------------------------------------------------------------------- 1629 # Bluetooth mouse related tests 1630 # ------------------------------------------------------------------- 1631 1632 1633 def _record_input_events(self, device, gesture): 1634 """Record the input events. 1635 1636 @param device: the bluetooth HID device. 1637 @param gesture: the gesture method to perform. 1638 1639 @returns: the input events received on the DUT. 1640 1641 """ 1642 self.input_facade.initialize_input_recorder(device.name) 1643 self.input_facade.start_input_recorder() 1644 time.sleep(self.HID_REPORT_SLEEP_SECS) 1645 gesture() 1646 time.sleep(self.HID_REPORT_SLEEP_SECS) 1647 self.input_facade.stop_input_recorder() 1648 time.sleep(self.HID_REPORT_SLEEP_SECS) 1649 event_values = self.input_facade.get_input_events() 1650 events = [Event(*ev) for ev in event_values] 1651 return events 1652 1653 1654 def _test_mouse_click(self, device, button): 1655 """Test that the mouse click events could be received correctly. 1656 1657 @param device: the meta device containing a bluetooth HID device 1658 @param button: which button to test, 'LEFT' or 'RIGHT' 1659 1660 @returns: True if the report received by the host matches the 1661 expected one. False otherwise. 1662 1663 """ 1664 if button == 'LEFT': 1665 gesture = device.LeftClick 1666 elif button == 'RIGHT': 1667 gesture = device.RightClick 1668 else: 1669 raise error.TestError('Button (%s) is not valid.' % button) 1670 1671 actual_events = self._record_input_events(device, gesture) 1672 1673 linux_input_button = {'LEFT': BTN_LEFT, 'RIGHT': BTN_RIGHT} 1674 expected_events = [ 1675 # Button down 1676 recorder.MSC_SCAN_BTN_EVENT[button], 1677 Event(EV_KEY, linux_input_button[button], 1), 1678 recorder.SYN_EVENT, 1679 # Button up 1680 recorder.MSC_SCAN_BTN_EVENT[button], 1681 Event(EV_KEY, linux_input_button[button], 0), 1682 recorder.SYN_EVENT] 1683 1684 self.results = { 1685 'actual_events': map(str, actual_events), 1686 'expected_events': map(str, expected_events)} 1687 return actual_events == expected_events 1688 1689 1690 @_test_retry_and_log 1691 def test_mouse_left_click(self, device): 1692 """Test that the mouse left click events could be received correctly. 1693 1694 @param device: the meta device containing a bluetooth HID device 1695 1696 @returns: True if the report received by the host matches the 1697 expected one. False otherwise. 1698 1699 """ 1700 return self._test_mouse_click(device, 'LEFT') 1701 1702 1703 @_test_retry_and_log 1704 def test_mouse_right_click(self, device): 1705 """Test that the mouse right click events could be received correctly. 1706 1707 @param device: the meta device containing a bluetooth HID device 1708 1709 @returns: True if the report received by the host matches the 1710 expected one. False otherwise. 1711 1712 """ 1713 return self._test_mouse_click(device, 'RIGHT') 1714 1715 1716 def _test_mouse_move(self, device, delta_x=0, delta_y=0): 1717 """Test that the mouse move events could be received correctly. 1718 1719 @param device: the meta device containing a bluetooth HID device 1720 @param delta_x: the distance to move cursor in x axis 1721 @param delta_y: the distance to move cursor in y axis 1722 1723 @returns: True if the report received by the host matches the 1724 expected one. False otherwise. 1725 1726 """ 1727 gesture = lambda: device.Move(delta_x, delta_y) 1728 actual_events = self._record_input_events(device, gesture) 1729 1730 events_x = [Event(EV_REL, REL_X, delta_x)] if delta_x else [] 1731 events_y = [Event(EV_REL, REL_Y, delta_y)] if delta_y else [] 1732 expected_events = events_x + events_y + [recorder.SYN_EVENT] 1733 1734 self.results = { 1735 'actual_events': map(str, actual_events), 1736 'expected_events': map(str, expected_events)} 1737 return actual_events == expected_events 1738 1739 1740 @_test_retry_and_log 1741 def test_mouse_move_in_x(self, device, delta_x): 1742 """Test that the mouse move events in x could be received correctly. 1743 1744 @param device: the meta device containing a bluetooth HID device 1745 @param delta_x: the distance to move cursor in x axis 1746 1747 @returns: True if the report received by the host matches the 1748 expected one. False otherwise. 1749 1750 """ 1751 return self._test_mouse_move(device, delta_x=delta_x) 1752 1753 1754 @_test_retry_and_log 1755 def test_mouse_move_in_y(self, device, delta_y): 1756 """Test that the mouse move events in y could be received correctly. 1757 1758 @param device: the meta device containing a bluetooth HID device 1759 @param delta_y: the distance to move cursor in y axis 1760 1761 @returns: True if the report received by the host matches the 1762 expected one. False otherwise. 1763 1764 """ 1765 return self._test_mouse_move(device, delta_y=delta_y) 1766 1767 1768 @_test_retry_and_log 1769 def test_mouse_move_in_xy(self, device, delta_x, delta_y): 1770 """Test that the mouse move events could be received correctly. 1771 1772 @param device: the meta device containing a bluetooth HID device 1773 @param delta_x: the distance to move cursor in x axis 1774 @param delta_y: the distance to move cursor in y axis 1775 1776 @returns: True if the report received by the host matches the 1777 expected one. False otherwise. 1778 1779 """ 1780 return self._test_mouse_move(device, delta_x=delta_x, delta_y=delta_y) 1781 1782 1783 def _test_mouse_scroll(self, device, units): 1784 """Test that the mouse wheel events could be received correctly. 1785 1786 @param device: the meta device containing a bluetooth HID device 1787 @param units: the units to scroll in y axis 1788 1789 @returns: True if the report received by the host matches the 1790 expected one. False otherwise. 1791 1792 """ 1793 gesture = lambda: device.Scroll(units) 1794 actual_events = self._record_input_events(device, gesture) 1795 expected_events = [Event(EV_REL, REL_WHEEL, units), recorder.SYN_EVENT] 1796 self.results = { 1797 'actual_events': map(str, actual_events), 1798 'expected_events': map(str, expected_events)} 1799 return actual_events == expected_events 1800 1801 1802 @_test_retry_and_log 1803 def test_mouse_scroll_down(self, device, delta_y): 1804 """Test that the mouse wheel events could be received correctly. 1805 1806 @param device: the meta device containing a bluetooth HID device 1807 @param delta_y: the units to scroll down in y axis; 1808 should be a postive value 1809 1810 @returns: True if the report received by the host matches the 1811 expected one. False otherwise. 1812 1813 """ 1814 if delta_y > 0: 1815 return self._test_mouse_scroll(device, delta_y) 1816 else: 1817 raise error.TestError('delta_y (%d) should be a positive value', 1818 delta_y) 1819 1820 1821 @_test_retry_and_log 1822 def test_mouse_scroll_up(self, device, delta_y): 1823 """Test that the mouse wheel events could be received correctly. 1824 1825 @param device: the meta device containing a bluetooth HID device 1826 @param delta_y: the units to scroll up in y axis; 1827 should be a postive value 1828 1829 @returns: True if the report received by the host matches the 1830 expected one. False otherwise. 1831 1832 """ 1833 if delta_y > 0: 1834 return self._test_mouse_scroll(device, -delta_y) 1835 else: 1836 raise error.TestError('delta_y (%d) should be a positive value', 1837 delta_y) 1838 1839 1840 @_test_retry_and_log 1841 def test_mouse_click_and_drag(self, device, delta_x, delta_y): 1842 """Test that the mouse click-and-drag events could be received 1843 correctly. 1844 1845 @param device: the meta device containing a bluetooth HID device 1846 @param delta_x: the distance to drag in x axis 1847 @param delta_y: the distance to drag in y axis 1848 1849 @returns: True if the report received by the host matches the 1850 expected one. False otherwise. 1851 1852 """ 1853 gesture = lambda: device.ClickAndDrag(delta_x, delta_y) 1854 actual_events = self._record_input_events(device, gesture) 1855 1856 button = 'LEFT' 1857 expected_events = ( 1858 [# Button down 1859 recorder.MSC_SCAN_BTN_EVENT[button], 1860 Event(EV_KEY, BTN_LEFT, 1), 1861 recorder.SYN_EVENT] + 1862 # cursor movement in x and y 1863 ([Event(EV_REL, REL_X, delta_x)] if delta_x else []) + 1864 ([Event(EV_REL, REL_Y, delta_y)] if delta_y else []) + 1865 [recorder.SYN_EVENT] + 1866 # Button up 1867 [recorder.MSC_SCAN_BTN_EVENT[button], 1868 Event(EV_KEY, BTN_LEFT, 0), 1869 recorder.SYN_EVENT]) 1870 1871 self.results = { 1872 'actual_events': map(str, actual_events), 1873 'expected_events': map(str, expected_events)} 1874 return actual_events == expected_events 1875 1876 1877 # ------------------------------------------------------------------- 1878 # Autotest methods 1879 # ------------------------------------------------------------------- 1880 1881 1882 def initialize(self): 1883 """Initialize bluetooth adapter tests.""" 1884 # Run through every tests and collect failed tests in self.fails. 1885 self.fails = [] 1886 1887 # If a test depends on multiple conditions, write the results of 1888 # the conditions in self.results so that it is easy to know 1889 # what conditions failed by looking at the log. 1890 self.results = None 1891 1892 # Some tests may instantiate a peripheral device for testing. 1893 self.devices = dict() 1894 for device_type in SUPPORTED_DEVICE_TYPES: 1895 self.devices[device_type] = None 1896 1897 # The count of registered advertisements. 1898 self.count_advertisements = 0 1899 1900 def run_once(self, *args, **kwargs): 1901 """This method should be implemented by children classes. 1902 1903 Typically, the run_once() method would look like: 1904 1905 factory = remote_facade_factory.RemoteFacadeFactory(host) 1906 self.bluetooth_facade = factory.create_bluetooth_hid_facade() 1907 1908 self.test_bluetoothd_running() 1909 # ... 1910 # invoke more self.test_xxx() tests. 1911 # ... 1912 1913 if self.fails: 1914 raise error.TestFail(self.fails) 1915 1916 """ 1917 raise NotImplementedError 1918 1919 1920 def cleanup(self): 1921 """Clean up bluetooth adapter tests.""" 1922 # Close the device properly if a device is instantiated. 1923 # Note: do not write something like the following statements 1924 # if self.devices[device_type]: 1925 # or 1926 # if bool(self.devices[device_type]): 1927 # Otherwise, it would try to invoke bluetooth_mouse.__nonzero__() 1928 # which just does not exist. 1929 for device_type in SUPPORTED_DEVICE_TYPES: 1930 if self.devices[device_type] is not None: 1931 self.devices[device_type].Close() 1932