Home | History | Annotate | Download | only in bluetooth
      1 # Copyright 2016 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """Server side bluetooth adapter subtests."""
      6 
      7 import inspect
      8 import functools
      9 import logging
     10 import re
     11 import time
     12 
     13 from autotest_lib.client.bin import utils
     14 from autotest_lib.client.bin.input import input_event_recorder as recorder
     15 from autotest_lib.client.common_lib import error
     16 from autotest_lib.server import test
     17 from autotest_lib.client.bin.input.linux_input import (
     18         BTN_LEFT, BTN_RIGHT, EV_KEY, EV_REL, REL_X, REL_Y, REL_WHEEL)
     19 
     20 
     21 REBOOTING_CHAMELEON = False
     22 
     23 Event = recorder.Event
     24 
     25 
     26 # Delay binding the methods since host is only available at run time.
     27 SUPPORTED_DEVICE_TYPES = {
     28     'MOUSE': lambda host: host.chameleon.get_bluetooth_hid_mouse,
     29     'LE_MOUSE': lambda host: host.chameleon.get_bluetooth_hog_mouse,
     30     'BLE_MOUSE': lambda host: host.chameleon.get_ble_mouse,
     31 }
     32 
     33 
     34 def method_name():
     35     """Get the method name of a class.
     36 
     37     This function is supposed to be invoked inside a class and will
     38     return current method name who invokes this function.
     39 
     40     @returns: the string of the method name inside the class.
     41     """
     42     return inspect.getouterframes(inspect.currentframe())[1][3]
     43 
     44 
     45 def _run_method(method, method_name, *args, **kwargs):
     46     """Run a target method and capture exceptions if any.
     47 
     48     This is just a wrapper of the target method so that we do not need to
     49     write the exception capturing structure repeatedly. The method could
     50     be either a device method or a facade method.
     51 
     52     @param method: the method to run
     53     @param method_name: the name of the method
     54 
     55     @returns: the return value of target method() if successful.
     56               False otherwise.
     57 
     58     """
     59     result = False
     60     try:
     61         result = method(*args, **kwargs)
     62     except Exception as e:
     63         logging.error('%s: %s', method_name, e)
     64     except:
     65         logging.error('%s: unexpected error', method_name)
     66     return result
     67 
     68 
     69 def get_bluetooth_emulated_device(host, device_type):
     70     """Get the bluetooth emulated device object.
     71 
     72     @param host: the DUT, usually a chromebook
     73     @param device_type : the bluetooth HID device type, e.g., 'MOUSE'
     74 
     75     @returns: the bluetooth device object
     76 
     77     """
     78 
     79     def _retry_device_method(method_name, legal_falsy_values=[]):
     80         """retry the emulated device's method.
     81 
     82         The method is invoked as device.xxxx() e.g., device.GetAdvertisedName().
     83 
     84         Note that the method name string is provided to get the device's actual
     85         method object at run time through getattr(). The rebinding is required
     86         because a new device may have been created previously or during the
     87         execution of fix_serial_device().
     88 
     89         Given a device's method, it is not feasible to get the method name
     90         through __name__ attribute. This limitation is due to the fact that
     91         the device is a dotted object of an XML RPC server proxy.
     92         As an example, with the method name 'GetAdvertisedName', we could
     93         derive the correspoinding method device.GetAdvertisedName. On the
     94         contrary, given device.GetAdvertisedName, it is not feasible to get the
     95         method name by device.GetAdvertisedName.__name__
     96 
     97         Also note that if the device method fails at the first time, we would
     98         try to fix the problem by re-creating the serial device and see if the
     99         problem is fixed. If not, we will reboot the chameleon board and see
    100         if the problem is fixed. If yes, execute the target method the second
    101         time.
    102 
    103         The default values exist for uses of this function before the options
    104         were added, ideally we should change zero_ok to False.
    105 
    106         @param method_name: the string of the method name.
    107         @param legal_falsy_values: Values that are falsy but might be OK.
    108 
    109         @returns: the result returned by the device's method.
    110 
    111         """
    112         result = _run_method(getattr(device, method_name), method_name)
    113         if _is_successful(result, legal_falsy_values):
    114             return result
    115 
    116         logging.error('%s failed the 1st time. Try to fix the serial device.',
    117                       method_name)
    118 
    119         # Try to fix the serial device if possible.
    120         if not fix_serial_device(host, device):
    121             return False
    122 
    123         logging.info('%s: retry the 2nd time.', method_name)
    124         return _run_method(getattr(device, method_name), method_name)
    125 
    126 
    127     if device_type not in SUPPORTED_DEVICE_TYPES:
    128         raise error.TestError('The device type is not supported: %s',
    129                               device_type)
    130 
    131     # Get the bluetooth device object and query some important properties.
    132     device = SUPPORTED_DEVICE_TYPES[device_type](host)()
    133 
    134     # Get some properties of the kit
    135     # NOTE: Strings updated here must be kept in sync with Chameleon.
    136     device._capabilities = _retry_device_method('GetCapabilities')
    137     device._transports = device._capabilities["CAP_TRANSPORTS"]
    138     device._is_le_only = ("TRANSPORT_LE" in device._transports and
    139                           len(device._transports) == 1)
    140     device._has_pin = device._capabilities["CAP_HAS_PIN"]
    141     device.can_init_connection = device._capabilities["CAP_INIT_CONNECT"]
    142 
    143     _retry_device_method('Init')
    144     logging.info('device type: %s', device_type)
    145 
    146     device.name = _retry_device_method('GetAdvertisedName')
    147     logging.info('device name: %s', device.name)
    148 
    149     device.address = _retry_device_method('GetLocalBluetoothAddress')
    150     logging.info('address: %s', device.address)
    151 
    152     pin_falsy_values = [] if device._has_pin else [None]
    153     device.pin = _retry_device_method('GetPinCode', pin_falsy_values)
    154     logging.info('pin: %s', device.pin)
    155 
    156     class_falsy_values = [None] if device._is_le_only else [0]
    157 
    158     # Class of service is None for LE-only devices. Don't fail or parse it.
    159     device.class_of_service = _retry_device_method('GetClassOfService',
    160                                                    class_falsy_values)
    161     if device._is_le_only:
    162       parsed_class_of_service = device.class_of_service
    163     else:
    164       parsed_class_of_service = "0x%04X" % device.class_of_service
    165     logging.info('class of service: %s', parsed_class_of_service)
    166 
    167     device.class_of_device = _retry_device_method('GetClassOfDevice',
    168                                                   class_falsy_values)
    169     # Class of device is None for LE-only devices. Don't fail or parse it.
    170     if device._is_le_only:
    171       parsed_class_of_device = device.class_of_device
    172     else:
    173       parsed_class_of_device = "0x%04X" % device.class_of_device
    174     logging.info('class of device: %s', parsed_class_of_device)
    175 
    176     device.device_type = _retry_device_method('GetHIDDeviceType')
    177     logging.info('device type: %s', device.device_type)
    178 
    179     device.authentication_mode = None
    180     if not device._is_le_only:
    181       device.authentication_mode = _retry_device_method('GetAuthenticationMode')
    182       logging.info('authentication mode: %s', device.authentication_mode)
    183 
    184     device.port = _retry_device_method('GetPort')
    185     logging.info('serial port: %s\n', device.port)
    186 
    187     return device
    188 
    189 
    190 def recreate_serial_device(device):
    191     """Create and connect to a new serial device.
    192 
    193     @param device: the bluetooth HID device
    194 
    195     @returns: True if the serial device is re-created successfully.
    196 
    197     """
    198     logging.info('Remove the old serial device and create a new one.')
    199     if device is not None:
    200         try:
    201             device.Close()
    202         except:
    203             logging.error('failed to close the serial device.')
    204             return False
    205     try:
    206         device.CreateSerialDevice()
    207         return True
    208     except:
    209         logging.error('failed to invoke CreateSerialDevice.')
    210         return False
    211 
    212 
    213 def _reboot_chameleon(host, device):
    214     REBOOT_SLEEP_SECS = 40
    215 
    216     if not REBOOTING_CHAMELEON:
    217         logging.info('Skip rebooting chameleon.')
    218         return False
    219 
    220     # Close the bluetooth peripheral device and reboot the chameleon board.
    221     device.Close()
    222     logging.info('rebooting chameleon...')
    223     host.chameleon.reboot()
    224 
    225     # Every chameleon reboot would take a bit more than REBOOT_SLEEP_SECS.
    226     # Sleep REBOOT_SLEEP_SECS and then begin probing the chameleon board.
    227     time.sleep(REBOOT_SLEEP_SECS)
    228 
    229     # Check if the serial device could initialize, connect, and
    230     # enter command mode correctly.
    231     logging.info('Checking device status...')
    232     if not _run_method(device.Init, 'Init'):
    233         logging.info('device.Init: failed after reboot')
    234         return False
    235     if not device.CheckSerialConnection():
    236         logging.info('device.CheckSerialConnection: failed after reboot')
    237         return False
    238     if not _run_method(device.EnterCommandMode, 'EnterCommandMode'):
    239         logging.info('device.EnterCommandMode: failed after reboot')
    240         return False
    241     logging.info('The device is created successfully after reboot.')
    242     return True
    243 
    244 
    245 def _is_successful(result, legal_falsy_values=[]):
    246     """Is the method result considered successful?
    247 
    248     Some method results, for example that of class_of_service, may be 0 which is
    249     considered a valid result. Occassionally, None is acceptable.
    250 
    251     The default values exist for uses of this function before the options were
    252     added, ideally we should change zero_ok to False.
    253 
    254     @param result: a method result
    255     @param legal_falsy_values: Values that are falsy but might be OK.
    256 
    257     @returns: True if bool(result) is True, or if result is 0 and zero_ok, or if
    258               result is None and none_ok.
    259     """
    260     truthiness_of_result = bool(result)
    261     return truthiness_of_result or result in legal_falsy_values
    262 
    263 
    264 def fix_serial_device(host, device):
    265     """Fix the serial device.
    266 
    267     This function tries to fix the serial device by
    268     (1) re-creating a serial device, or
    269     (2) rebooting the chameleon board.
    270 
    271     @param host: the DUT, usually a chromebook
    272     @param device: the bluetooth HID device
    273 
    274     @returns: True if the serial device is fixed. False otherwise.
    275 
    276     """
    277     # Check the serial connection. Fix it if needed.
    278     if device.CheckSerialConnection():
    279         # The USB serial connection still exists.
    280         # Re-connection suffices to solve the problem. The problem
    281         # is usually caused by serial port change. For example,
    282         # the serial port changed from /dev/ttyUSB0 to /dev/ttyUSB1.
    283         logging.info('retry: creating a new serial device...')
    284         if not recreate_serial_device(device):
    285             return False
    286 
    287     # Check if recreate_serial_device() above fixes the problem.
    288     # If not, reboot the chameleon board including creation of a new
    289     # bluetooth device. Check if reboot fixes the problem.
    290     # If not, return False.
    291     result = _run_method(device.EnterCommandMode, 'EnterCommandMode')
    292     return _is_successful(result) or _reboot_chameleon(host, device)
    293 
    294 
    295 def retry(test_method, instance, *args, **kwargs):
    296     """Execute the target facade test_method(). Retry if failing the first time.
    297 
    298     A test_method is something like self.test_xxxx() in BluetoothAdapterTests,
    299     e.g., BluetoothAdapterTests.test_bluetoothd_running().
    300 
    301     @param test_method: the test method to retry
    302 
    303     @returns: True if the return value of test_method() is successful.
    304               False otherwise.
    305 
    306     """
    307     if _is_successful(_run_method(test_method, test_method.__name__,
    308                                   instance, *args, **kwargs)):
    309         return True
    310 
    311     # Try to fix the serial device if applicable.
    312     logging.error('%s failed at the 1st time.', test_method.__name__)
    313 
    314     # If this test does not use any attached serial device, just re-run
    315     # the test.
    316     logging.info('%s: retry the 2nd time.', test_method.__name__)
    317     time.sleep(1)
    318     if not hasattr(instance, 'device_type'):
    319         return _is_successful(_run_method(test_method, test_method.__name__,
    320                                           instance, *args, **kwargs))
    321 
    322     host = instance.host
    323     device = instance.devices[instance.device_type]
    324     if not fix_serial_device(host, device):
    325         return False
    326 
    327     logging.info('%s: retry the 2nd time.', test_method.__name__)
    328     return _is_successful(_run_method(test_method, test_method.__name__,
    329                                       instance, *args, **kwargs))
    330 
    331 
    332 def _test_retry_and_log(test_method_or_retry_flag):
    333     """A decorator that logs test results, collects error messages, and retries
    334        on request.
    335 
    336     @param test_method_or_retry_flag: either the test_method or a retry_flag.
    337         There are some possibilities of this argument:
    338         1. the test_method to conduct and retry: should retry the test_method.
    339             This occurs with
    340             @_test_retry_and_log
    341         2. the retry flag is True. Should retry the test_method.
    342             This occurs with
    343             @_test_retry_and_log(True)
    344         3. the retry flag is False. Do not retry the test_method.
    345             This occurs with
    346             @_test_retry_and_log(False)
    347 
    348     @returns: a wrapper of the test_method with test log. The retry mechanism
    349         would depend on the retry flag.
    350 
    351     """
    352 
    353     def decorator(test_method):
    354         """A decorator wrapper of the decorated test_method.
    355 
    356         @param test_method: the test method being decorated.
    357 
    358         @returns the wrapper of the test method.
    359 
    360         """
    361         @functools.wraps(test_method)
    362         def wrapper(instance, *args, **kwargs):
    363             """A wrapper of the decorated method.
    364 
    365             @param instance: an BluetoothAdapterTests instance
    366 
    367             @returns the result of the test method
    368 
    369             """
    370             instance.results = None
    371             if callable(test_method_or_retry_flag) or test_method_or_retry_flag:
    372                 test_result = retry(test_method, instance, *args, **kwargs)
    373             else:
    374                 test_result = test_method(instance, *args, **kwargs)
    375 
    376             if test_result:
    377                 logging.info('[*** passed: %s]', test_method.__name__)
    378             else:
    379                 fail_msg = '[--- failed: %s (%s)]' % (test_method.__name__,
    380                                                       str(instance.results))
    381                 logging.error(fail_msg)
    382                 instance.fails.append(fail_msg)
    383             return test_result
    384         return wrapper
    385 
    386     if callable(test_method_or_retry_flag):
    387         # If the decorator function comes with no argument like
    388         # @_test_retry_and_log
    389         return decorator(test_method_or_retry_flag)
    390     else:
    391         # If the decorator function comes with an argument like
    392         # @_test_retry_and_log(False)
    393         return decorator
    394 
    395 
    396 def test_case_log(method):
    397     """A decorator for test case methods.
    398 
    399     The main purpose of this decorator is to display the test case name
    400     in the test log which looks like
    401 
    402         <... test_case_RA3_CD_SI200_CD_PC_CD_UA3 ...>
    403 
    404     @param method: the test case method to decorate.
    405 
    406     @returns: a wrapper function of the decorated method.
    407 
    408     """
    409     @functools.wraps(method)
    410     def wrapper(instance, *args, **kwargs):
    411         """Log the name of the wrapped method before execution"""
    412         logging.info('\n<... %s ...>', method.__name__)
    413         method(instance, *args, **kwargs)
    414     return wrapper
    415 
    416 
    417 class BluetoothAdapterTests(test.test):
    418     """Server side bluetooth adapter tests.
    419 
    420     This test class tries to thoroughly verify most of the important work
    421     states of a bluetooth adapter.
    422 
    423     The various test methods are supposed to be invoked by actual autotest
    424     tests such as server/cros/site_tests/bluetooth_Adapter*.
    425 
    426     """
    427     version = 1
    428     ADAPTER_ACTION_SLEEP_SECS = 1
    429     ADAPTER_PAIRING_TIMEOUT_SECS = 60
    430     ADAPTER_CONNECTION_TIMEOUT_SECS = 30
    431     ADAPTER_DISCONNECTION_TIMEOUT_SECS = 30
    432     ADAPTER_PAIRING_POLLING_SLEEP_SECS = 3
    433     ADAPTER_DISCOVER_TIMEOUT_SECS = 60          # 30 seconds too short sometimes
    434     ADAPTER_DISCOVER_POLLING_SLEEP_SECS = 1
    435     ADAPTER_DISCOVER_NAME_TIMEOUT_SECS = 30
    436     # TODO(shijinabraham@) Remove when crbug/905374 is fixed
    437     ADAPTER_STOP_DISCOVERY_TIMEOUT_SECS = 180
    438 
    439     ADAPTER_WAIT_DEFAULT_TIMEOUT_SECS = 10
    440     ADAPTER_POLLING_DEFAULT_SLEEP_SECS = 1
    441 
    442     HID_REPORT_SLEEP_SECS = 1
    443 
    444     # Default suspend time in seconds for suspend resume.
    445     SUSPEND_TIME_SECS=10
    446 
    447     # hci0 is the default hci device if there is no external bluetooth dongle.
    448     EXPECTED_HCI = 'hci0'
    449 
    450     CLASS_OF_SERVICE_MASK = 0xFFE000
    451     CLASS_OF_DEVICE_MASK = 0x001FFF
    452 
    453     # Constants about advertising.
    454     DAFAULT_MIN_ADVERTISEMENT_INTERVAL_MS = 1280
    455     DAFAULT_MAX_ADVERTISEMENT_INTERVAL_MS = 1280
    456     ADVERTISING_INTERVAL_UNIT = 0.625
    457 
    458     # Error messages about advertising dbus methods.
    459     ERROR_FAILED_TO_REGISTER_ADVERTISEMENT = (
    460             'org.bluez.Error.Failed: Failed to register advertisement')
    461     ERROR_INVALID_ADVERTISING_INTERVALS = (
    462             'org.bluez.Error.InvalidArguments: Invalid arguments')
    463 
    464     # Supported profiles by chrome os.
    465     SUPPORTED_UUIDS = {
    466             'HSP_AG_UUID': '00001112-0000-1000-8000-00805f9b34fb',
    467             'GATT_UUID': '00001801-0000-1000-8000-00805f9b34fb',
    468             'A2DP_SOURCE_UUID': '0000110a-0000-1000-8000-00805f9b34fb',
    469             'HFP_AG_UUID': '0000111f-0000-1000-8000-00805f9b34fb',
    470             'PNP_UUID': '00001200-0000-1000-8000-00805f9b34fb',
    471             'GAP_UUID': '00001800-0000-1000-8000-00805f9b34fb'}
    472 
    473 
    474     def get_device(self, device_type):
    475         """Get the bluetooth device object.
    476 
    477         @param device_type : the bluetooth HID device type, e.g., 'MOUSE'
    478 
    479         @returns: the bluetooth device object
    480 
    481         """
    482         self.device_type = device_type
    483         if self.devices[device_type] is None:
    484             self.devices[device_type] = get_bluetooth_emulated_device(
    485                     self.host, device_type)
    486         return self.devices[device_type]
    487 
    488 
    489     def suspend_resume(self, suspend_time=SUSPEND_TIME_SECS):
    490         """Suspend the DUT for a while and then resume.
    491 
    492         @param suspend_time: the suspend time in secs
    493 
    494         """
    495         logging.info('The DUT suspends for %d seconds...', suspend_time)
    496         try:
    497             self.host.suspend(suspend_time=suspend_time)
    498         except error.AutoservSuspendError:
    499             logging.error('The DUT did not suspend for %d seconds', suspend_time)
    500             pass
    501         logging.info('The DUT is waken up.')
    502 
    503 
    504     def _wait_for_condition(self, func, method_name,
    505                             timeout=ADAPTER_WAIT_DEFAULT_TIMEOUT_SECS,
    506                             sleep_interval=ADAPTER_POLLING_DEFAULT_SLEEP_SECS):
    507         """Wait for the func() to become True.
    508 
    509         @param fun: the function to wait for.
    510         @param method_name: the invoking class method.
    511         @param timeout: number of seconds to wait before giving up.
    512         @param sleep_interval: the interval in seconds to sleep between
    513                 invoking func().
    514 
    515         @returns: the bluetooth device object
    516 
    517         """
    518 
    519         try:
    520             utils.poll_for_condition(condition=func,
    521                                      timeout=timeout,
    522                                      sleep_interval=sleep_interval,
    523                                      desc=('Waiting %s' % method_name))
    524             return True
    525         except utils.TimeoutError as e:
    526             logging.error('%s: %s', method_name, e)
    527         except Exception as e:
    528             logging.error('%s: %s', method_name, e)
    529             err = 'bluetoothd possibly crashed. Check out /var/log/messages.'
    530             logging.error(err)
    531         except:
    532             logging.error('%s: unexpected error', method_name)
    533         return False
    534 
    535 
    536     # -------------------------------------------------------------------
    537     # Adater standalone tests
    538     # -------------------------------------------------------------------
    539 
    540 
    541     @_test_retry_and_log
    542     def test_bluetoothd_running(self):
    543         """Test that bluetoothd is running."""
    544         return self.bluetooth_facade.is_bluetoothd_running()
    545 
    546 
    547     @_test_retry_and_log
    548     def test_start_bluetoothd(self):
    549         """Test that bluetoothd could be started successfully."""
    550         return self.bluetooth_facade.start_bluetoothd()
    551 
    552 
    553     @_test_retry_and_log
    554     def test_stop_bluetoothd(self):
    555         """Test that bluetoothd could be stopped successfully."""
    556         return self.bluetooth_facade.stop_bluetoothd()
    557 
    558 
    559     @_test_retry_and_log
    560     def test_adapter_work_state(self):
    561         """Test that the bluetooth adapter is in the correct working state.
    562 
    563         This includes that the adapter is detectable, is powered on,
    564         and its hci device is hci0.
    565         """
    566         has_adapter = self.bluetooth_facade.has_adapter()
    567         is_powered_on = self._wait_for_condition(
    568                 self.bluetooth_facade.is_powered_on, method_name())
    569         hci = self.bluetooth_facade.get_hci() == self.EXPECTED_HCI
    570         self.results = {
    571                 'has_adapter': has_adapter,
    572                 'is_powered_on': is_powered_on,
    573                 'hci': hci}
    574         return all(self.results.values())
    575 
    576 
    577     @_test_retry_and_log
    578     def test_power_on_adapter(self):
    579         """Test that the adapter could be powered on successfully."""
    580         power_on = self.bluetooth_facade.set_powered(True)
    581         is_powered_on = self._wait_for_condition(
    582                 self.bluetooth_facade.is_powered_on, method_name())
    583 
    584         self.results = {'power_on': power_on, 'is_powered_on': is_powered_on}
    585         return all(self.results.values())
    586 
    587 
    588     @_test_retry_and_log
    589     def test_power_off_adapter(self):
    590         """Test that the adapter could be powered off successfully."""
    591         power_off = self.bluetooth_facade.set_powered(False)
    592         is_powered_off = self._wait_for_condition(
    593                 lambda: not self.bluetooth_facade.is_powered_on(),
    594                 method_name())
    595 
    596         self.results = {
    597                 'power_off': power_off,
    598                 'is_powered_off': is_powered_off}
    599         return all(self.results.values())
    600 
    601 
    602     @_test_retry_and_log
    603     def test_reset_on_adapter(self):
    604         """Test that the adapter could be reset on successfully.
    605 
    606         This includes restarting bluetoothd, and removing the settings
    607         and cached devices.
    608         """
    609         reset_on = self.bluetooth_facade.reset_on()
    610         is_powered_on = self._wait_for_condition(
    611                 self.bluetooth_facade.is_powered_on, method_name())
    612 
    613         self.results = {'reset_on': reset_on, 'is_powered_on': is_powered_on}
    614         return all(self.results.values())
    615 
    616 
    617     @_test_retry_and_log
    618     def test_reset_off_adapter(self):
    619         """Test that the adapter could be reset off successfully.
    620 
    621         This includes restarting bluetoothd, and removing the settings
    622         and cached devices.
    623         """
    624         reset_off = self.bluetooth_facade.reset_off()
    625         is_powered_off = self._wait_for_condition(
    626                 lambda: not self.bluetooth_facade.is_powered_on(),
    627                 method_name())
    628 
    629         self.results = {
    630                 'reset_off': reset_off,
    631                 'is_powered_off': is_powered_off}
    632         return all(self.results.values())
    633 
    634 
    635     @_test_retry_and_log
    636     def test_UUIDs(self):
    637         """Test that basic profiles are supported."""
    638         adapter_UUIDs = self.bluetooth_facade.get_UUIDs()
    639         self.results = [uuid for uuid in self.SUPPORTED_UUIDS.values()
    640                         if uuid not in adapter_UUIDs]
    641         return not bool(self.results)
    642 
    643 
    644     @_test_retry_and_log
    645     def test_start_discovery(self):
    646         """Test that the adapter could start discovery."""
    647         start_discovery = self.bluetooth_facade.start_discovery()
    648         is_discovering = self._wait_for_condition(
    649                 self.bluetooth_facade.is_discovering, method_name())
    650 
    651         self.results = {
    652                 'start_discovery': start_discovery,
    653                 'is_discovering': is_discovering}
    654         return all(self.results.values())
    655 
    656 
    657     @_test_retry_and_log
    658     def test_stop_discovery(self):
    659         """Test that the adapter could stop discovery."""
    660         stop_discovery = self.bluetooth_facade.stop_discovery()
    661         is_not_discovering = self._wait_for_condition(
    662                 lambda: not self.bluetooth_facade.is_discovering(),
    663                 method_name(),
    664                 timeout=self.ADAPTER_STOP_DISCOVERY_TIMEOUT_SECS)
    665 
    666         self.results = {
    667                 'stop_discovery': stop_discovery,
    668                 'is_not_discovering': is_not_discovering}
    669         return all(self.results.values())
    670 
    671 
    672     @_test_retry_and_log
    673     def test_discoverable(self):
    674         """Test that the adapter could be set discoverable."""
    675         set_discoverable = self.bluetooth_facade.set_discoverable(True)
    676         is_discoverable = self._wait_for_condition(
    677                 self.bluetooth_facade.is_discoverable, method_name())
    678 
    679         self.results = {
    680                 'set_discoverable': set_discoverable,
    681                 'is_discoverable': is_discoverable}
    682         return all(self.results.values())
    683 
    684 
    685     @_test_retry_and_log
    686     def test_nondiscoverable(self):
    687         """Test that the adapter could be set non-discoverable."""
    688         set_nondiscoverable = self.bluetooth_facade.set_discoverable(False)
    689         is_nondiscoverable = self._wait_for_condition(
    690                 lambda: not self.bluetooth_facade.is_discoverable(),
    691                 method_name())
    692 
    693         self.results = {
    694                 'set_nondiscoverable': set_nondiscoverable,
    695                 'is_nondiscoverable': is_nondiscoverable}
    696         return all(self.results.values())
    697 
    698 
    699     @_test_retry_and_log
    700     def test_pairable(self):
    701         """Test that the adapter could be set pairable."""
    702         set_pairable = self.bluetooth_facade.set_pairable(True)
    703         is_pairable = self._wait_for_condition(
    704                 self.bluetooth_facade.is_pairable, method_name())
    705 
    706         self.results = {
    707                 'set_pairable': set_pairable,
    708                 'is_pairable': is_pairable}
    709         return all(self.results.values())
    710 
    711 
    712     @_test_retry_and_log
    713     def test_nonpairable(self):
    714         """Test that the adapter could be set non-pairable."""
    715         set_nonpairable = self.bluetooth_facade.set_pairable(False)
    716         is_nonpairable = self._wait_for_condition(
    717                 lambda: not self.bluetooth_facade.is_pairable(), method_name())
    718 
    719         self.results = {
    720                 'set_nonpairable': set_nonpairable,
    721                 'is_nonpairable': is_nonpairable}
    722         return all(self.results.values())
    723 
    724 
    725     # -------------------------------------------------------------------
    726     # Tests about general discovering, pairing, and connection
    727     # -------------------------------------------------------------------
    728 
    729 
    730     @_test_retry_and_log
    731     def test_discover_device(self, device_address):
    732         """Test that the adapter could discover the specified device address.
    733 
    734         @param device_address: Address of the device.
    735 
    736         @returns: True if the device is found. False otherwise.
    737 
    738         """
    739         has_device_initially = False
    740         start_discovery = False
    741         device_discovered = False
    742         has_device = self.bluetooth_facade.has_device
    743 
    744         if has_device(device_address):
    745             has_device_initially = True
    746         elif self.bluetooth_facade.start_discovery():
    747             start_discovery = True
    748             try:
    749                 utils.poll_for_condition(
    750                         condition=(lambda: has_device(device_address)),
    751                         timeout=self.ADAPTER_DISCOVER_TIMEOUT_SECS,
    752                         sleep_interval=self.ADAPTER_DISCOVER_POLLING_SLEEP_SECS,
    753                         desc='Waiting for discovering %s' % device_address)
    754                 device_discovered = True
    755             except utils.TimeoutError as e:
    756                 logging.error('test_discover_device: %s', e)
    757             except Exception as e:
    758                 logging.error('test_discover_device: %s', e)
    759                 err = 'bluetoothd probably crashed. Check out /var/log/messages'
    760                 logging.error(err)
    761             except:
    762                 logging.error('test_discover_device: unexpected error')
    763 
    764         self.results = {
    765                 'has_device_initially': has_device_initially,
    766                 'start_discovery': start_discovery,
    767                 'device_discovered': device_discovered}
    768         return has_device_initially or device_discovered
    769 
    770 
    771     @_test_retry_and_log
    772     def test_pairing(self, device_address, pin, trusted=True):
    773         """Test that the adapter could pair with the device successfully.
    774 
    775         @param device_address: Address of the device.
    776         @param pin: pin code to pair with the device.
    777         @param trusted: indicating whether to set the device trusted.
    778 
    779         @returns: True if pairing succeeds. False otherwise.
    780 
    781         """
    782 
    783         def _pair_device():
    784             """Pair to the device.
    785 
    786             @returns: True if it could pair with the device. False otherwise.
    787 
    788             """
    789             return self.bluetooth_facade.pair_legacy_device(
    790                     device_address, pin, trusted,
    791                     self.ADAPTER_PAIRING_TIMEOUT_SECS)
    792 
    793 
    794         has_device = False
    795         paired = False
    796         if self.bluetooth_facade.has_device(device_address):
    797             has_device = True
    798             try:
    799                 utils.poll_for_condition(
    800                         condition=_pair_device,
    801                         timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS,
    802                         sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
    803                         desc='Waiting for pairing %s' % device_address)
    804                 paired = True
    805             except utils.TimeoutError as e:
    806                 logging.error('test_pairing: %s', e)
    807             except:
    808                 logging.error('test_pairing: unexpected error')
    809 
    810         self.results = {'has_device': has_device, 'paired': paired}
    811         return all(self.results.values())
    812 
    813 
    814     @_test_retry_and_log
    815     def test_remove_pairing(self, device_address):
    816         """Test that the adapter could remove the paired device.
    817 
    818         @param device_address: Address of the device.
    819 
    820         @returns: True if the device is removed successfully. False otherwise.
    821 
    822         """
    823         device_is_paired_initially = self.bluetooth_facade.device_is_paired(
    824                 device_address)
    825         remove_pairing = False
    826         pairing_removed = False
    827 
    828         if device_is_paired_initially:
    829             remove_pairing = self.bluetooth_facade.remove_device_object(
    830                     device_address)
    831             pairing_removed = not self.bluetooth_facade.device_is_paired(
    832                     device_address)
    833 
    834         self.results = {
    835                 'device_is_paired_initially': device_is_paired_initially,
    836                 'remove_pairing': remove_pairing,
    837                 'pairing_removed': pairing_removed}
    838         return all(self.results.values())
    839 
    840 
    841     def test_set_trusted(self, device_address, trusted=True):
    842         """Test whether the device with the specified address is trusted.
    843 
    844         @param device_address: Address of the device.
    845         @param trusted : True or False indicating if trusted is expected.
    846 
    847         @returns: True if the device's "Trusted" property is as specified;
    848                   False otherwise.
    849 
    850         """
    851 
    852         set_trusted = self.bluetooth_facade.set_trusted(
    853                 device_address, trusted)
    854 
    855         properties = self.bluetooth_facade.get_device_properties(
    856                 device_address)
    857         actual_trusted = properties.get('Trusted')
    858 
    859         self.results = {
    860                 'set_trusted': set_trusted,
    861                 'actual trusted': actual_trusted,
    862                 'expected trusted': trusted}
    863         return actual_trusted == trusted
    864 
    865 
    866     @_test_retry_and_log
    867     def test_connection_by_adapter(self, device_address):
    868         """Test that the adapter of dut could connect to the device successfully
    869 
    870         It is the caller's responsibility to pair to the device before
    871         doing connection.
    872 
    873         @param device_address: Address of the device.
    874 
    875         @returns: True if connection is performed. False otherwise.
    876 
    877         """
    878 
    879         def _connect_device():
    880             """Connect to the device.
    881 
    882             @returns: True if it could connect to the device. False otherwise.
    883 
    884             """
    885             return self.bluetooth_facade.connect_device(device_address)
    886 
    887 
    888         has_device = False
    889         connected = False
    890         if self.bluetooth_facade.has_device(device_address):
    891             has_device = True
    892             try:
    893                 utils.poll_for_condition(
    894                         condition=_connect_device,
    895                         timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS,
    896                         sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
    897                         desc='Waiting for connecting to %s' % device_address)
    898                 connected = True
    899             except utils.TimeoutError as e:
    900                 logging.error('test_connection_by_adapter: %s', e)
    901             except:
    902                 logging.error('test_connection_by_adapter: unexpected error')
    903 
    904         self.results = {'has_device': has_device, 'connected': connected}
    905         return all(self.results.values())
    906 
    907 
    908     @_test_retry_and_log
    909     def test_disconnection_by_adapter(self, device_address):
    910         """Test that the adapter of dut could disconnect the device successfully
    911 
    912         @param device_address: Address of the device.
    913 
    914         @returns: True if disconnection is performed. False otherwise.
    915 
    916         """
    917         return self.bluetooth_facade.disconnect_device(device_address)
    918 
    919 
    920     def _enter_command_mode(self, device):
    921         """Let the device enter command mode.
    922 
    923         Before using the device, need to call this method to make sure
    924         it is in the command mode.
    925 
    926         @param device: the bluetooth HID device
    927 
    928         @returns: True if successful. False otherwise.
    929 
    930         """
    931         result = _is_successful(_run_method(device.EnterCommandMode,
    932                                             'EnterCommandMode'))
    933         if not result:
    934             logging.error('EnterCommandMode failed')
    935         return result
    936 
    937 
    938     @_test_retry_and_log
    939     def test_connection_by_device(self, device):
    940         """Test that the device could connect to the adapter successfully.
    941 
    942         This emulates the behavior that a device may initiate a
    943         connection request after waking up from power saving mode.
    944 
    945         @param device: the bluetooth HID device
    946 
    947         @returns: True if connection is performed correctly by device and
    948                   the adapter also enters connection state.
    949                   False otherwise.
    950 
    951         """
    952         if not self._enter_command_mode(device):
    953             return False
    954 
    955         method_name = 'test_connection_by_device'
    956         connection_by_device = False
    957         adapter_address = self.bluetooth_facade.address
    958         try:
    959             device.ConnectToRemoteAddress(adapter_address)
    960             connection_by_device = True
    961         except Exception as e:
    962             logging.error('%s (device): %s', method_name, e)
    963         except:
    964             logging.error('%s (device): unexpected error', method_name)
    965 
    966         connection_seen_by_adapter = False
    967         device_address = device.address
    968         device_is_connected = self.bluetooth_facade.device_is_connected
    969         try:
    970             utils.poll_for_condition(
    971                     condition=lambda: device_is_connected(device_address),
    972                     timeout=self.ADAPTER_CONNECTION_TIMEOUT_SECS,
    973                     desc=('Waiting for connection from %s' % device_address))
    974             connection_seen_by_adapter = True
    975         except utils.TimeoutError as e:
    976             logging.error('%s (adapter): %s', method_name, e)
    977         except:
    978             logging.error('%s (adapter): unexpected error', method_name)
    979 
    980         self.results = {
    981                 'connection_by_device': connection_by_device,
    982                 'connection_seen_by_adapter': connection_seen_by_adapter}
    983         return all(self.results.values())
    984 
    985 
    986     @_test_retry_and_log
    987     def test_disconnection_by_device(self, device):
    988         """Test that the device could disconnect the adapter successfully.
    989 
    990         This emulates the behavior that a device may initiate a
    991         disconnection request before going into power saving mode.
    992 
    993         Note: should not try to enter command mode in this method. When
    994               a device is connected, there is no way to enter command mode.
    995               One could just issue a special disconnect command without
    996               entering command mode.
    997 
    998         @param device: the bluetooth HID device
    999 
   1000         @returns: True if disconnection is performed correctly by device and
   1001                   the adapter also observes the disconnection.
   1002                   False otherwise.
   1003 
   1004         """
   1005         method_name = 'test_disconnection_by_device'
   1006         disconnection_by_device = False
   1007         try:
   1008             device.Disconnect()
   1009             disconnection_by_device = True
   1010         except Exception as e:
   1011             logging.error('%s (device): %s', method_name, e)
   1012         except:
   1013             logging.error('%s (device): unexpected error', method_name)
   1014 
   1015         disconnection_seen_by_adapter = False
   1016         device_address = device.address
   1017         device_is_connected = self.bluetooth_facade.device_is_connected
   1018         try:
   1019             utils.poll_for_condition(
   1020                     condition=lambda: not device_is_connected(device_address),
   1021                     timeout=self.ADAPTER_DISCONNECTION_TIMEOUT_SECS,
   1022                     desc=('Waiting for disconnection from %s' % device_address))
   1023             disconnection_seen_by_adapter = True
   1024         except utils.TimeoutError as e:
   1025             logging.error('%s (adapter): %s', method_name, e)
   1026         except:
   1027             logging.error('%s (adapter): unexpected error', method_name)
   1028 
   1029         self.results = {
   1030                 'disconnection_by_device': disconnection_by_device,
   1031                 'disconnection_seen_by_adapter': disconnection_seen_by_adapter}
   1032         return all(self.results.values())
   1033 
   1034     @_test_retry_and_log
   1035     def test_device_is_connected(self, device_address):
   1036         """Test that device address given is currently connected.
   1037 
   1038         @param device_address: Address of the device.
   1039 
   1040         @returns: True if the device is connected.
   1041                   False otherwise.
   1042 
   1043         """
   1044         def _is_connected():
   1045             """Test if device is connected.
   1046 
   1047             @returns: True if device is connected. False otherwise.
   1048 
   1049             """
   1050             return self.bluetooth_facade.device_is_connected(device_address)
   1051 
   1052 
   1053         method_name = 'test_device_is_connected'
   1054         has_device = False
   1055         connected = False
   1056         if self.bluetooth_facade.has_device(device_address):
   1057             has_device = True
   1058             try:
   1059                 utils.poll_for_condition(
   1060                         condition=_is_connected,
   1061                         timeout=self.ADAPTER_CONNECTION_TIMEOUT_SECS,
   1062                         sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
   1063                         desc='Waiting for connection to %s' % device_address)
   1064                 connected = True
   1065             except utils.TimeoutError as e:
   1066                 logging.error('%s: %s', method_name, e)
   1067             except:
   1068                 logging.error('%s: unexpected error', method_name)
   1069         self.results = {'has_device': has_device, 'connected': connected}
   1070         return all(self.results.values())
   1071 
   1072 
   1073     @_test_retry_and_log
   1074     def test_device_is_paired(self, device_address):
   1075         """Test that the device address given is currently paired.
   1076 
   1077         @param device_address: Address of the device.
   1078 
   1079         @returns: True if the device is paired.
   1080                   False otherwise.
   1081 
   1082         """
   1083         def _is_paired():
   1084             """Test if device is paired.
   1085 
   1086             @returns: True if device is paired. False otherwise.
   1087 
   1088             """
   1089             return self.bluetooth_facade.device_is_paired(device_address)
   1090 
   1091 
   1092         method_name = 'test_device_is_paired'
   1093         has_device = False
   1094         paired = False
   1095         if self.bluetooth_facade.has_device(device_address):
   1096             has_device = True
   1097             try:
   1098                 utils.poll_for_condition(
   1099                         condition=_is_paired,
   1100                         timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS,
   1101                         sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
   1102                         desc='Waiting for connection to %s' % device_address)
   1103                 paired = True
   1104             except utils.TimeoutError as e:
   1105                 logging.error('%s: %s', method_name, e)
   1106             except:
   1107                 logging.error('%s: unexpected error', method_name)
   1108         self.results = {'has_device': has_device, 'paired': paired}
   1109         return all(self.results.values())
   1110 
   1111 
   1112     def _get_device_name(self, device_address):
   1113         """Get the device name.
   1114 
   1115         @returns: True if the device name is derived. None otherwise.
   1116 
   1117         """
   1118         properties = self.bluetooth_facade.get_device_properties(
   1119                 device_address)
   1120         self.discovered_device_name = properties.get('Name')
   1121         return bool(self.discovered_device_name)
   1122 
   1123 
   1124     @_test_retry_and_log
   1125     def test_device_name(self, device_address, expected_device_name):
   1126         """Test that the device name discovered by the adapter is correct.
   1127 
   1128         @param device_address: Address of the device.
   1129         @param expected_device_name: the bluetooth device name
   1130 
   1131         @returns: True if the discovered_device_name is expected_device_name.
   1132                   False otherwise.
   1133 
   1134         """
   1135         try:
   1136             utils.poll_for_condition(
   1137                     condition=lambda: self._get_device_name(device_address),
   1138                     timeout=self.ADAPTER_DISCOVER_NAME_TIMEOUT_SECS,
   1139                     sleep_interval=self.ADAPTER_DISCOVER_POLLING_SLEEP_SECS,
   1140                     desc='Waiting for device name of %s' % device_address)
   1141         except utils.TimeoutError as e:
   1142             logging.error('test_device_name: %s', e)
   1143         except:
   1144             logging.error('test_device_name: unexpected error')
   1145 
   1146         self.results = {
   1147                 'expected_device_name': expected_device_name,
   1148                 'discovered_device_name': self.discovered_device_name}
   1149         return self.discovered_device_name == expected_device_name
   1150 
   1151 
   1152     @_test_retry_and_log
   1153     def test_device_class_of_service(self, device_address,
   1154                                      expected_class_of_service):
   1155         """Test that the discovered device class of service is as expected.
   1156 
   1157         @param device_address: Address of the device.
   1158         @param expected_class_of_service: the expected class of service
   1159 
   1160         @returns: True if the discovered class of service matches the
   1161                   expected class of service. False otherwise.
   1162 
   1163         """
   1164         properties = self.bluetooth_facade.get_device_properties(
   1165                 device_address)
   1166         device_class = properties.get('Class')
   1167         discovered_class_of_service = (device_class & self.CLASS_OF_SERVICE_MASK
   1168                                        if device_class else None)
   1169 
   1170         self.results = {
   1171                 'device_class': device_class,
   1172                 'expected_class_of_service': expected_class_of_service,
   1173                 'discovered_class_of_service': discovered_class_of_service}
   1174         return discovered_class_of_service == expected_class_of_service
   1175 
   1176 
   1177     @_test_retry_and_log
   1178     def test_device_class_of_device(self, device_address,
   1179                                     expected_class_of_device):
   1180         """Test that the discovered device class of device is as expected.
   1181 
   1182         @param device_address: Address of the device.
   1183         @param expected_class_of_device: the expected class of device
   1184 
   1185         @returns: True if the discovered class of device matches the
   1186                   expected class of device. False otherwise.
   1187 
   1188         """
   1189         properties = self.bluetooth_facade.get_device_properties(
   1190                 device_address)
   1191         device_class = properties.get('Class')
   1192         discovered_class_of_device = (device_class & self.CLASS_OF_DEVICE_MASK
   1193                                       if device_class else None)
   1194 
   1195         self.results = {
   1196                 'device_class': device_class,
   1197                 'expected_class_of_device': expected_class_of_device,
   1198                 'discovered_class_of_device': discovered_class_of_device}
   1199         return discovered_class_of_device == expected_class_of_device
   1200 
   1201 
   1202     def _get_btmon_log(self, method, logging_timespan=1):
   1203         """Capture the btmon log when executing the specified method.
   1204 
   1205         @param method: the method to capture log.
   1206                        The method would be executed only when it is not None.
   1207                        This allows us to simply capture btmon log without
   1208                        executing any command.
   1209         @param logging_timespan: capture btmon log for logging_timespan seconds.
   1210 
   1211         """
   1212         self.bluetooth_le_facade.btmon_start()
   1213         self.advertising_msg = method() if method else ''
   1214         time.sleep(logging_timespan)
   1215         self.bluetooth_le_facade.btmon_stop()
   1216 
   1217 
   1218     def convert_to_adv_jiffies(self, adv_interval_ms):
   1219         """Convert adv interval in ms to jiffies, i.e., multiples of 0.625 ms.
   1220 
   1221         @param adv_interval_ms: an advertising interval
   1222 
   1223         @returns: the equivalent jiffies
   1224 
   1225         """
   1226         return adv_interval_ms / self.ADVERTISING_INTERVAL_UNIT
   1227 
   1228 
   1229     def compute_duration(self, max_adv_interval_ms):
   1230         """Compute duration from max_adv_interval_ms.
   1231 
   1232         Advertising duration is calculated approximately as
   1233             duration = max_adv_interval_ms / 1000.0 * 1.1
   1234 
   1235         @param max_adv_interval_ms: max advertising interval in milliseconds.
   1236 
   1237         @returns: duration in seconds.
   1238 
   1239         """
   1240         return max_adv_interval_ms / 1000.0 * 1.1
   1241 
   1242 
   1243     def compute_logging_timespan(self, max_adv_interval_ms):
   1244         """Compute the logging timespan from max_adv_interval_ms.
   1245 
   1246         The logging timespan is the time needed to record btmon log.
   1247 
   1248         @param max_adv_interval_ms: max advertising interval in milliseconds.
   1249 
   1250         @returns: logging_timespan in seconds.
   1251 
   1252         """
   1253         duration = self.compute_duration(max_adv_interval_ms)
   1254         logging_timespan = max(self.count_advertisements * duration, 1)
   1255         return logging_timespan
   1256 
   1257 
   1258     @_test_retry_and_log(False)
   1259     def test_check_duration_and_intervals(self, min_adv_interval_ms,
   1260                                           max_adv_interval_ms,
   1261                                           number_advertisements):
   1262         """Verify that every advertisements are scheduled according to the
   1263         duration and intervals.
   1264 
   1265         An advertisement would be scheduled at the time span of
   1266              duration * number_advertisements
   1267 
   1268         @param min_adv_interval_ms: min advertising interval in milliseconds.
   1269         @param max_adv_interval_ms: max advertising interval in milliseconds.
   1270         @param number_advertisements: the number of existing advertisements
   1271 
   1272         @returns: True if all advertisements are scheduled based on the
   1273                 duration and intervals.
   1274 
   1275         """
   1276 
   1277 
   1278         def within_tolerance(expected, actual, max_error=0.1):
   1279             """Determine if the percent error is within specified tolerance.
   1280 
   1281             @param expected: The expected value.
   1282             @param actual: The actual (measured) value.
   1283             @param max_error: The maximum percent error acceptable.
   1284 
   1285             @returns: True if the percent error is less than or equal to
   1286                       max_error.
   1287             """
   1288             return abs(expected - actual) / abs(expected) <= max_error
   1289 
   1290 
   1291         start_str = 'Set Advertising Intervals:'
   1292         search_strings = ['HCI Command: LE Set Advertising Data', 'Company']
   1293         search_str = '|'.join(search_strings)
   1294 
   1295         contents = self.bluetooth_le_facade.btmon_get(search_str=search_str,
   1296                                                       start_str=start_str)
   1297 
   1298         # Company string looks like
   1299         #   Company: not assigned (65283)
   1300         company_pattern = re.compile('Company:.*\((\d*)\)')
   1301 
   1302         # The string with timestamp looks like
   1303         #   < HCI Command: LE Set Advertising Data (0x08|0x0008) [hci0] 3.799236
   1304         set_adv_time_str = 'LE Set Advertising Data.*\[hci\d\].*(\d+\.\d+)'
   1305         set_adv_time_pattern = re.compile(set_adv_time_str)
   1306 
   1307         adv_timestamps = {}
   1308         timestamp = None
   1309         manufacturer_id = None
   1310         for line in contents:
   1311             result = set_adv_time_pattern.search(line)
   1312             if result:
   1313                 timestamp = float(result.group(1))
   1314 
   1315             result = company_pattern.search(line)
   1316             if result:
   1317                 manufacturer_id = '0x%04x' % int(result.group(1))
   1318 
   1319             if timestamp and manufacturer_id:
   1320                 if manufacturer_id not in adv_timestamps:
   1321                     adv_timestamps[manufacturer_id] = []
   1322                 adv_timestamps[manufacturer_id].append(timestamp)
   1323                 timestamp = None
   1324                 manufacturer_id = None
   1325 
   1326         duration = self.compute_duration(max_adv_interval_ms)
   1327         expected_timespan = duration * number_advertisements
   1328 
   1329         check_duration = True
   1330         for manufacturer_id, values in adv_timestamps.iteritems():
   1331             logging.debug('manufacturer_id %s: %s', manufacturer_id, values)
   1332             timespans = [values[i] - values[i - 1]
   1333                          for i in xrange(1, len(values))]
   1334             errors = [timespans[i] for i in xrange(len(timespans))
   1335                       if not within_tolerance(expected_timespan, timespans[i])]
   1336             logging.debug('timespans: %s', timespans)
   1337             logging.debug('errors: %s', errors)
   1338             if bool(errors):
   1339                 check_duration = False
   1340 
   1341         # Verify that the advertising intervals are also correct.
   1342         min_adv_interval_ms_found, max_adv_interval_ms_found = (
   1343                 self._verify_advertising_intervals(min_adv_interval_ms,
   1344                                                    max_adv_interval_ms))
   1345 
   1346         self.results = {
   1347                 'check_duration': check_duration,
   1348                 'max_adv_interval_ms_found': max_adv_interval_ms_found,
   1349                 'max_adv_interval_ms_found': max_adv_interval_ms_found,
   1350         }
   1351         return all(self.results.values())
   1352 
   1353 
   1354     def _get_min_max_intervals_strings(self, min_adv_interval_ms,
   1355                                        max_adv_interval_ms):
   1356         """Get the min and max advertising intervals strings shown in btmon.
   1357 
   1358         Advertising intervals shown in the btmon log look like
   1359             Min advertising interval: 1280.000 msec (0x0800)
   1360             Max advertising interval: 1280.000 msec (0x0800)
   1361 
   1362         @param min_adv_interval_ms: min advertising interval in milliseconds.
   1363         @param max_adv_interval_ms: max advertising interval in milliseconds.
   1364 
   1365         @returns: the min and max intervals strings.
   1366 
   1367         """
   1368         min_str = ('Min advertising interval: %.3f msec (0x%04x)' %
   1369                    (min_adv_interval_ms,
   1370                     min_adv_interval_ms / self.ADVERTISING_INTERVAL_UNIT))
   1371         logging.debug('min_adv_interval_ms: %s', min_str)
   1372 
   1373         max_str = ('Max advertising interval: %.3f msec (0x%04x)' %
   1374                    (max_adv_interval_ms,
   1375                     max_adv_interval_ms / self.ADVERTISING_INTERVAL_UNIT))
   1376         logging.debug('max_adv_interval_ms: %s', max_str)
   1377 
   1378         return (min_str, max_str)
   1379 
   1380 
   1381     def _verify_advertising_intervals(self, min_adv_interval_ms,
   1382                                       max_adv_interval_ms):
   1383         """Verify min and max advertising intervals.
   1384 
   1385         Advertising intervals look like
   1386             Min advertising interval: 1280.000 msec (0x0800)
   1387             Max advertising interval: 1280.000 msec (0x0800)
   1388 
   1389         @param min_adv_interval_ms: min advertising interval in milliseconds.
   1390         @param max_adv_interval_ms: max advertising interval in milliseconds.
   1391 
   1392         @returns: a tuple of (True, True) if both min and max advertising
   1393             intervals could be found. Otherwise, the corresponding element
   1394             in the tuple if False.
   1395 
   1396         """
   1397         min_str, max_str = self._get_min_max_intervals_strings(
   1398                 min_adv_interval_ms, max_adv_interval_ms)
   1399 
   1400         min_adv_interval_ms_found = self.bluetooth_le_facade.btmon_find(min_str)
   1401         max_adv_interval_ms_found = self.bluetooth_le_facade.btmon_find(max_str)
   1402 
   1403         return min_adv_interval_ms_found, max_adv_interval_ms_found
   1404 
   1405 
   1406     @_test_retry_and_log(False)
   1407     def test_register_advertisement(self, advertisement_data, instance_id,
   1408                                     min_adv_interval_ms, max_adv_interval_ms):
   1409         """Verify that an advertisement is registered correctly.
   1410 
   1411         This test verifies the following data:
   1412         - advertisement added
   1413         - manufacturer data
   1414         - service UUIDs
   1415         - service data
   1416         - advertising intervals
   1417         - advertising enabled
   1418 
   1419         @param advertisement_data: the data of an advertisement to register.
   1420         @param instance_id: the instance id which starts at 1.
   1421         @param min_adv_interval_ms: min_adv_interval in milliseconds.
   1422         @param max_adv_interval_ms: max_adv_interval in milliseconds.
   1423 
   1424         @returns: True if the advertisement is registered correctly.
   1425                   False otherwise.
   1426 
   1427         """
   1428         # When registering a new advertisement, it is possible that another
   1429         # instance is advertising. It may need to wait for all other
   1430         # advertisements to complete advertising once.
   1431         self.count_advertisements += 1
   1432         logging_timespan = self.compute_logging_timespan(max_adv_interval_ms)
   1433         self._get_btmon_log(
   1434                 lambda: self.bluetooth_le_facade.register_advertisement(
   1435                         advertisement_data),
   1436                 logging_timespan=logging_timespan)
   1437 
   1438         # Verify that a new advertisement is added.
   1439         advertisement_added = (
   1440                 self.bluetooth_le_facade.btmon_find('Advertising Added') and
   1441                 self.bluetooth_le_facade.btmon_find('Instance: %d' %
   1442                                                     instance_id))
   1443 
   1444         # Verify that the manufacturer data could be found.
   1445         manufacturer_data = advertisement_data.get('ManufacturerData', '')
   1446         for manufacturer_id in manufacturer_data:
   1447             # The 'not assigned' text below means the manufacturer id
   1448             # is not actually assigned to any real manufacturer.
   1449             # For real 16-bit manufacturer UUIDs, refer to
   1450             #  https://www.bluetooth.com/specifications/assigned-numbers/16-bit-UUIDs-for-Members
   1451             manufacturer_data_found = self.bluetooth_le_facade.btmon_find(
   1452                     'Company: not assigned (%d)' % int(manufacturer_id, 16))
   1453 
   1454         # Verify that all service UUIDs could be found.
   1455         service_uuids_found = True
   1456         for uuid in advertisement_data.get('ServiceUUIDs', []):
   1457             # Service UUIDs looks like ['0x180D', '0x180F']
   1458             #   Heart Rate (0x180D)
   1459             #   Battery Service (0x180F)
   1460             # For actual 16-bit service UUIDs, refer to
   1461             #   https://www.bluetooth.com/specifications/gatt/services
   1462             if not self.bluetooth_le_facade.btmon_find('0x%s' % uuid):
   1463                 service_uuids_found = False
   1464                 break
   1465 
   1466         # Verify service data.
   1467         service_data_found = True
   1468         for uuid, data in advertisement_data.get('ServiceData', {}).items():
   1469             # A service data looks like
   1470             #   Service Data (UUID 0x9999): 0001020304
   1471             # while uuid is '9999' and data is [0x00, 0x01, 0x02, 0x03, 0x04]
   1472             data_str = ''.join(map(lambda n: '%02x' % n, data))
   1473             if not self.bluetooth_le_facade.btmon_find(
   1474                     'Service Data (UUID 0x%s): %s' % (uuid, data_str)):
   1475                 service_data_found = False
   1476                 break
   1477 
   1478         # Verify that the advertising intervals are correct.
   1479         min_adv_interval_ms_found, max_adv_interval_ms_found = (
   1480                 self._verify_advertising_intervals(min_adv_interval_ms,
   1481                                                    max_adv_interval_ms))
   1482 
   1483         # Verify advertising is enabled.
   1484         advertising_enabled = self.bluetooth_le_facade.btmon_find(
   1485                 'Advertising: Enabled (0x01)')
   1486 
   1487         self.results = {
   1488                 'advertisement_added': advertisement_added,
   1489                 'manufacturer_data_found': manufacturer_data_found,
   1490                 'service_uuids_found': service_uuids_found,
   1491                 'service_data_found': service_data_found,
   1492                 'min_adv_interval_ms_found': min_adv_interval_ms_found,
   1493                 'max_adv_interval_ms_found': max_adv_interval_ms_found,
   1494                 'advertising_enabled': advertising_enabled,
   1495         }
   1496         return all(self.results.values())
   1497 
   1498 
   1499     @_test_retry_and_log(False)
   1500     def test_fail_to_register_advertisement(self, advertisement_data,
   1501                                             min_adv_interval_ms,
   1502                                             max_adv_interval_ms):
   1503         """Verify that failure is incurred when max advertisements are reached.
   1504 
   1505         This test verifies that a registration failure is incurred when
   1506         max advertisements are reached. The error message looks like:
   1507 
   1508             org.bluez.Error.Failed: Maximum advertisements reached
   1509 
   1510         @param advertisement_data: the advertisement to register.
   1511         @param min_adv_interval_ms: min_adv_interval in milliseconds.
   1512         @param max_adv_interval_ms: max_adv_interval in milliseconds.
   1513 
   1514         @returns: True if the error message is received correctly.
   1515                   False otherwise.
   1516 
   1517         """
   1518         logging_timespan = self.compute_logging_timespan(max_adv_interval_ms)
   1519         self._get_btmon_log(
   1520                 lambda: self.bluetooth_le_facade.register_advertisement(
   1521                         advertisement_data),
   1522                 logging_timespan=logging_timespan)
   1523 
   1524         # Verify that it failed to register advertisement due to the fact
   1525         # that max advertisements are reached.
   1526         failed_to_register_error = (self.ERROR_FAILED_TO_REGISTER_ADVERTISEMENT
   1527                                     in self.advertising_msg)
   1528 
   1529         # Verify that no new advertisement is added.
   1530         advertisement_not_added = not self.bluetooth_le_facade.btmon_find(
   1531                 'Advertising Added:')
   1532 
   1533         # Verify that the advertising intervals are correct.
   1534         min_adv_interval_ms_found, max_adv_interval_ms_found = (
   1535                 self._verify_advertising_intervals(min_adv_interval_ms,
   1536                                                    max_adv_interval_ms))
   1537 
   1538         # Verify advertising remains enabled.
   1539         advertising_enabled = self.bluetooth_le_facade.btmon_find(
   1540                 'Advertising: Enabled (0x01)')
   1541 
   1542         self.results = {
   1543                 'failed_to_register_error': failed_to_register_error,
   1544                 'advertisement_not_added': advertisement_not_added,
   1545                 'min_adv_interval_ms_found': min_adv_interval_ms_found,
   1546                 'max_adv_interval_ms_found': max_adv_interval_ms_found,
   1547                 'advertising_enabled': advertising_enabled,
   1548         }
   1549         return all(self.results.values())
   1550 
   1551 
   1552     @_test_retry_and_log(False)
   1553     def test_unregister_advertisement(self, advertisement_data, instance_id,
   1554                                       advertising_disabled):
   1555         """Verify that an advertisement is unregistered correctly.
   1556 
   1557         This test verifies the following data:
   1558         - advertisement removed
   1559         - advertising status: enabled if there are advertisements left;
   1560                               disabled otherwise.
   1561 
   1562         @param advertisement_data: the data of an advertisement to unregister.
   1563         @param instance_id: the instance id of the advertisement to remove.
   1564         @param advertising_disabled: is advertising disabled? This happens
   1565                 only when all advertisements are removed.
   1566 
   1567         @returns: True if the advertisement is unregistered correctly.
   1568                   False otherwise.
   1569 
   1570         """
   1571         self.count_advertisements -= 1
   1572         self._get_btmon_log(
   1573                 lambda: self.bluetooth_le_facade.unregister_advertisement(
   1574                         advertisement_data))
   1575 
   1576         # Verify that the advertisement is removed.
   1577         advertisement_removed = (
   1578                 self.bluetooth_le_facade.btmon_find('Advertising Removed') and
   1579                 self.bluetooth_le_facade.btmon_find('Instance: %d' %
   1580                                                     instance_id))
   1581 
   1582         # If advertising_disabled is True, there should be no log like
   1583         #       'Advertising: Enabled (0x01)'
   1584         # If advertising_disabled is False, there should be log like
   1585         #       'Advertising: Enabled (0x01)'
   1586 
   1587         # Only need to check advertising status when the last advertisement
   1588         # is removed. For any other advertisements prior to the last one,
   1589         # we may or may not observe 'Advertising: Enabled (0x01)' message.
   1590         # Hence, the test would become flaky if we insist to see that message.
   1591         # A possible workaround is to sleep for a while and then check the
   1592         # message. The drawback is that we may need to wait up to 10 seconds
   1593         # if the advertising duration and intervals are long.
   1594         # In a test case, we always run test_check_duration_and_intervals()
   1595         # to check if advertising duration and intervals are correct after
   1596         # un-registering one or all advertisements, it is safe to do so.
   1597         advertising_enabled_found = self.bluetooth_le_facade.btmon_find(
   1598                 'Advertising: Enabled (0x01)')
   1599         advertising_disabled_found = self.bluetooth_le_facade.btmon_find(
   1600                 'Advertising: Disabled (0x00)')
   1601         advertising_status_correct = not advertising_disabled or (
   1602                 advertising_disabled_found and not advertising_enabled_found)
   1603 
   1604         self.results = {
   1605                 'advertisement_removed': advertisement_removed,
   1606                 'advertising_status_correct': advertising_status_correct,
   1607         }
   1608         return all(self.results.values())
   1609 
   1610 
   1611     @_test_retry_and_log(False)
   1612     def test_set_advertising_intervals(self, min_adv_interval_ms,
   1613                                        max_adv_interval_ms):
   1614         """Verify that new advertising intervals are set correctly.
   1615 
   1616         Note that setting advertising intervals does not enable/disable
   1617         advertising. Hence, there is no need to check the advertising
   1618         status.
   1619 
   1620         @param min_adv_interval_ms: the min advertising interval in ms.
   1621         @param max_adv_interval_ms: the max advertising interval in ms.
   1622 
   1623         @returns: True if the new advertising intervals are correct.
   1624                   False otherwise.
   1625 
   1626         """
   1627         self._get_btmon_log(
   1628                 lambda: self.bluetooth_le_facade.set_advertising_intervals(
   1629                         min_adv_interval_ms, max_adv_interval_ms))
   1630 
   1631         # Verify the new advertising intervals.
   1632         # With intervals of 200 ms and 200 ms, the log looks like
   1633         #   bluetoothd: Set Advertising Intervals: 0x0140, 0x0140
   1634         txt = 'bluetoothd: Set Advertising Intervals: 0x%04x, 0x%04x'
   1635         adv_intervals_found = self.bluetooth_le_facade.btmon_find(
   1636                 txt % (self.convert_to_adv_jiffies(min_adv_interval_ms),
   1637                        self.convert_to_adv_jiffies(max_adv_interval_ms)))
   1638 
   1639         self.results = {'adv_intervals_found': adv_intervals_found}
   1640         return all(self.results.values())
   1641 
   1642 
   1643     @_test_retry_and_log(False)
   1644     def test_fail_to_set_advertising_intervals(
   1645             self, invalid_min_adv_interval_ms, invalid_max_adv_interval_ms,
   1646             orig_min_adv_interval_ms, orig_max_adv_interval_ms):
   1647         """Verify that setting invalid advertising intervals results in error.
   1648 
   1649         If invalid min/max advertising intervals are given, it would incur
   1650         the error: 'org.bluez.Error.InvalidArguments: Invalid arguments'.
   1651         Note that valid advertising intervals fall between 20 ms and 10,240 ms.
   1652 
   1653         @param invalid_min_adv_interval_ms: the invalid min advertising interval
   1654                 in ms.
   1655         @param invalid_max_adv_interval_ms: the invalid max advertising interval
   1656                 in ms.
   1657         @param orig_min_adv_interval_ms: the original min advertising interval
   1658                 in ms.
   1659         @param orig_max_adv_interval_ms: the original max advertising interval
   1660                 in ms.
   1661 
   1662         @returns: True if it fails to set invalid advertising intervals.
   1663                   False otherwise.
   1664 
   1665         """
   1666         self._get_btmon_log(
   1667                 lambda: self.bluetooth_le_facade.set_advertising_intervals(
   1668                         invalid_min_adv_interval_ms,
   1669                         invalid_max_adv_interval_ms))
   1670 
   1671         # Verify that the invalid error is observed in the dbus error callback
   1672         # message.
   1673         invalid_intervals_error = (self.ERROR_INVALID_ADVERTISING_INTERVALS in
   1674                                    self.advertising_msg)
   1675 
   1676         # Verify that the min/max advertising intervals remain the same
   1677         # after setting the invalid advertising intervals.
   1678         #
   1679         # In btmon log, we would see the following message first.
   1680         #    bluetoothd: Set Advertising Intervals: 0x0010, 0x0010
   1681         # And then, we should check if "Min advertising interval" and
   1682         # "Max advertising interval" remain the same.
   1683         start_str = 'bluetoothd: Set Advertising Intervals: 0x%04x, 0x%04x' % (
   1684                 self.convert_to_adv_jiffies(invalid_min_adv_interval_ms),
   1685                 self.convert_to_adv_jiffies(invalid_max_adv_interval_ms))
   1686 
   1687         search_strings = ['Min advertising interval:',
   1688                           'Max advertising interval:']
   1689         search_str = '|'.join(search_strings)
   1690 
   1691         contents = self.bluetooth_le_facade.btmon_get(search_str=search_str,
   1692                                                       start_str=start_str)
   1693 
   1694         # The min/max advertising intervals of all advertisements should remain
   1695         # the same as the previous valid ones.
   1696         min_max_str = '[Min|Max] advertising interval: (\d*\.\d*) msec'
   1697         min_max_pattern = re.compile(min_max_str)
   1698         correct_orig_min_adv_interval = True
   1699         correct_orig_max_adv_interval = True
   1700         for line in contents:
   1701             result = min_max_pattern.search(line)
   1702             if result:
   1703                 interval = float(result.group(1))
   1704                 if 'Min' in line and interval != orig_min_adv_interval_ms:
   1705                     correct_orig_min_adv_interval = False
   1706                 elif 'Max' in line and interval != orig_max_adv_interval_ms:
   1707                     correct_orig_max_adv_interval = False
   1708 
   1709         self.results = {
   1710                 'invalid_intervals_error': invalid_intervals_error,
   1711                 'correct_orig_min_adv_interval': correct_orig_min_adv_interval,
   1712                 'correct_orig_max_adv_interval': correct_orig_max_adv_interval}
   1713 
   1714         return all(self.results.values())
   1715 
   1716 
   1717     @_test_retry_and_log(False)
   1718     def test_check_advertising_intervals(self, min_adv_interval_ms,
   1719                                          max_adv_interval_ms):
   1720         """Verify that the advertising intervals are as expected.
   1721 
   1722         @param min_adv_interval_ms: the min advertising interval in ms.
   1723         @param max_adv_interval_ms: the max advertising interval in ms.
   1724 
   1725         @returns: True if the advertising intervals are correct.
   1726                   False otherwise.
   1727 
   1728         """
   1729         self._get_btmon_log(None)
   1730 
   1731         # Verify that the advertising intervals are correct.
   1732         min_adv_interval_ms_found, max_adv_interval_ms_found = (
   1733                 self._verify_advertising_intervals(min_adv_interval_ms,
   1734                                                    max_adv_interval_ms))
   1735 
   1736         self.results = {
   1737                 'min_adv_interval_ms_found': min_adv_interval_ms_found,
   1738                 'max_adv_interval_ms_found': max_adv_interval_ms_found,
   1739         }
   1740         return all(self.results.values())
   1741 
   1742 
   1743     @_test_retry_and_log(False)
   1744     def test_reset_advertising(self, instance_ids=[]):
   1745         """Verify that advertising is reset correctly.
   1746 
   1747         Note that reset advertising would set advertising intervals to
   1748         the default values. However, we would not be able to observe
   1749         the values change until new advertisements are registered.
   1750         Therefore, it is required that a test_register_advertisement()
   1751         test is conducted after this test.
   1752 
   1753         If instance_ids is [], all advertisements would still be removed
   1754         if there are any. However, no need to check 'Advertising Removed'
   1755         in btmon log since we may or may not be able to observe the message.
   1756         This feature is needed if this test is invoked as the first one in
   1757         a test case to reset advertising. In this situation, this test does
   1758         not know how many advertisements exist.
   1759 
   1760         @param instance_ids: the list of instance IDs that should be removed.
   1761 
   1762         @returns: True if advertising is reset correctly.
   1763                   False otherwise.
   1764 
   1765         """
   1766         self.count_advertisements = 0
   1767         self._get_btmon_log(
   1768                 lambda: self.bluetooth_le_facade.reset_advertising())
   1769 
   1770         # Verify that every advertisement is removed. When an advertisement
   1771         # with instance id 1 is removed, the log looks like
   1772         #   Advertising Removed
   1773         #       instance: 1
   1774         if len(instance_ids) > 0:
   1775             advertisement_removed = self.bluetooth_le_facade.btmon_find(
   1776                     'Advertising Removed')
   1777             if advertisement_removed:
   1778                 for instance_id in instance_ids:
   1779                     txt = 'Instance: %d' % instance_id
   1780                     if not self.bluetooth_le_facade.btmon_find(txt):
   1781                         advertisement_removed = False
   1782                         break
   1783         else:
   1784             advertisement_removed = True
   1785 
   1786         if not advertisement_removed:
   1787             logging.error('Failed to remove advertisement')
   1788 
   1789         # Verify that "Reset Advertising Intervals" command has been issued.
   1790         reset_advertising_intervals = self.bluetooth_le_facade.btmon_find(
   1791                 'bluetoothd: Reset Advertising Intervals')
   1792 
   1793         # Verify the advertising is disabled.
   1794         advertising_disabled_observied = self.bluetooth_le_facade.btmon_find(
   1795                 'Advertising: Disabled')
   1796         # If there are no existing advertisements, we may not observe
   1797         # 'Advertising: Disabled'.
   1798         advertising_disabled = (instance_ids == [] or
   1799                                 advertising_disabled_observied)
   1800 
   1801         self.results = {
   1802                 'advertisement_removed': advertisement_removed,
   1803                 'reset_advertising_intervals': reset_advertising_intervals,
   1804                 'advertising_disabled': advertising_disabled,
   1805         }
   1806         return all(self.results.values())
   1807 
   1808 
   1809     # -------------------------------------------------------------------
   1810     # Bluetooth mouse related tests
   1811     # -------------------------------------------------------------------
   1812 
   1813 
   1814     def _record_input_events(self, device, gesture):
   1815         """Record the input events.
   1816 
   1817         @param device: the bluetooth HID device.
   1818         @param gesture: the gesture method to perform.
   1819 
   1820         @returns: the input events received on the DUT.
   1821 
   1822         """
   1823         self.input_facade.initialize_input_recorder(device.name)
   1824         self.input_facade.start_input_recorder()
   1825         time.sleep(self.HID_REPORT_SLEEP_SECS)
   1826         gesture()
   1827         time.sleep(self.HID_REPORT_SLEEP_SECS)
   1828         self.input_facade.stop_input_recorder()
   1829         time.sleep(self.HID_REPORT_SLEEP_SECS)
   1830         event_values = self.input_facade.get_input_events()
   1831         events = [Event(*ev) for ev in event_values]
   1832         return events
   1833 
   1834 
   1835     def _test_mouse_click(self, device, button):
   1836         """Test that the mouse click events could be received correctly.
   1837 
   1838         @param device: the meta device containing a bluetooth HID device
   1839         @param button: which button to test, 'LEFT' or 'RIGHT'
   1840 
   1841         @returns: True if the report received by the host matches the
   1842                   expected one. False otherwise.
   1843 
   1844         """
   1845         if button == 'LEFT':
   1846             gesture = device.LeftClick
   1847         elif button == 'RIGHT':
   1848             gesture = device.RightClick
   1849         else:
   1850             raise error.TestError('Button (%s) is not valid.' % button)
   1851 
   1852         actual_events = self._record_input_events(device, gesture)
   1853 
   1854         linux_input_button = {'LEFT': BTN_LEFT, 'RIGHT': BTN_RIGHT}
   1855         expected_events = [
   1856                 # Button down
   1857                 recorder.MSC_SCAN_BTN_EVENT[button],
   1858                 Event(EV_KEY, linux_input_button[button], 1),
   1859                 recorder.SYN_EVENT,
   1860                 # Button up
   1861                 recorder.MSC_SCAN_BTN_EVENT[button],
   1862                 Event(EV_KEY, linux_input_button[button], 0),
   1863                 recorder.SYN_EVENT]
   1864 
   1865         self.results = {
   1866                 'actual_events': map(str, actual_events),
   1867                 'expected_events': map(str, expected_events)}
   1868         return actual_events == expected_events
   1869 
   1870 
   1871     @_test_retry_and_log
   1872     def test_mouse_left_click(self, device):
   1873         """Test that the mouse left click events could be received correctly.
   1874 
   1875         @param device: the meta device containing a bluetooth HID device
   1876 
   1877         @returns: True if the report received by the host matches the
   1878                   expected one. False otherwise.
   1879 
   1880         """
   1881         return self._test_mouse_click(device, 'LEFT')
   1882 
   1883 
   1884     @_test_retry_and_log
   1885     def test_mouse_right_click(self, device):
   1886         """Test that the mouse right click events could be received correctly.
   1887 
   1888         @param device: the meta device containing a bluetooth HID device
   1889 
   1890         @returns: True if the report received by the host matches the
   1891                   expected one. False otherwise.
   1892 
   1893         """
   1894         return self._test_mouse_click(device, 'RIGHT')
   1895 
   1896 
   1897     def _test_mouse_move(self, device, delta_x=0, delta_y=0):
   1898         """Test that the mouse move events could be received correctly.
   1899 
   1900         @param device: the meta device containing a bluetooth HID device
   1901         @param delta_x: the distance to move cursor in x axis
   1902         @param delta_y: the distance to move cursor in y axis
   1903 
   1904         @returns: True if the report received by the host matches the
   1905                   expected one. False otherwise.
   1906 
   1907         """
   1908         gesture = lambda: device.Move(delta_x, delta_y)
   1909         actual_events = self._record_input_events(device, gesture)
   1910 
   1911         events_x = [Event(EV_REL, REL_X, delta_x)] if delta_x else []
   1912         events_y = [Event(EV_REL, REL_Y, delta_y)] if delta_y else []
   1913         expected_events = events_x + events_y + [recorder.SYN_EVENT]
   1914 
   1915         self.results = {
   1916                 'actual_events': map(str, actual_events),
   1917                 'expected_events': map(str, expected_events)}
   1918         return actual_events == expected_events
   1919 
   1920 
   1921     @_test_retry_and_log
   1922     def test_mouse_move_in_x(self, device, delta_x):
   1923         """Test that the mouse move events in x could be received correctly.
   1924 
   1925         @param device: the meta device containing a bluetooth HID device
   1926         @param delta_x: the distance to move cursor in x axis
   1927 
   1928         @returns: True if the report received by the host matches the
   1929                   expected one. False otherwise.
   1930 
   1931         """
   1932         return self._test_mouse_move(device, delta_x=delta_x)
   1933 
   1934 
   1935     @_test_retry_and_log
   1936     def test_mouse_move_in_y(self, device, delta_y):
   1937         """Test that the mouse move events in y could be received correctly.
   1938 
   1939         @param device: the meta device containing a bluetooth HID device
   1940         @param delta_y: the distance to move cursor in y axis
   1941 
   1942         @returns: True if the report received by the host matches the
   1943                   expected one. False otherwise.
   1944 
   1945         """
   1946         return self._test_mouse_move(device, delta_y=delta_y)
   1947 
   1948 
   1949     @_test_retry_and_log
   1950     def test_mouse_move_in_xy(self, device, delta_x, delta_y):
   1951         """Test that the mouse move events could be received correctly.
   1952 
   1953         @param device: the meta device containing a bluetooth HID device
   1954         @param delta_x: the distance to move cursor in x axis
   1955         @param delta_y: the distance to move cursor in y axis
   1956 
   1957         @returns: True if the report received by the host matches the
   1958                   expected one. False otherwise.
   1959 
   1960         """
   1961         return self._test_mouse_move(device, delta_x=delta_x, delta_y=delta_y)
   1962 
   1963 
   1964     def _test_mouse_scroll(self, device, units):
   1965         """Test that the mouse wheel events could be received correctly.
   1966 
   1967         @param device: the meta device containing a bluetooth HID device
   1968         @param units: the units to scroll in y axis
   1969 
   1970         @returns: True if the report received by the host matches the
   1971                   expected one. False otherwise.
   1972 
   1973         """
   1974         gesture = lambda: device.Scroll(units)
   1975         actual_events = self._record_input_events(device, gesture)
   1976         expected_events = [Event(EV_REL, REL_WHEEL, units), recorder.SYN_EVENT]
   1977         self.results = {
   1978                 'actual_events': map(str, actual_events),
   1979                 'expected_events': map(str, expected_events)}
   1980         return actual_events == expected_events
   1981 
   1982 
   1983     @_test_retry_and_log
   1984     def test_mouse_scroll_down(self, device, delta_y):
   1985         """Test that the mouse wheel events could be received correctly.
   1986 
   1987         @param device: the meta device containing a bluetooth HID device
   1988         @param delta_y: the units to scroll down in y axis;
   1989                         should be a postive value
   1990 
   1991         @returns: True if the report received by the host matches the
   1992                   expected one. False otherwise.
   1993 
   1994         """
   1995         if delta_y > 0:
   1996             return self._test_mouse_scroll(device, delta_y)
   1997         else:
   1998             raise error.TestError('delta_y (%d) should be a positive value',
   1999                                   delta_y)
   2000 
   2001 
   2002     @_test_retry_and_log
   2003     def test_mouse_scroll_up(self, device, delta_y):
   2004         """Test that the mouse wheel events could be received correctly.
   2005 
   2006         @param device: the meta device containing a bluetooth HID device
   2007         @param delta_y: the units to scroll up in y axis;
   2008                         should be a postive value
   2009 
   2010         @returns: True if the report received by the host matches the
   2011                   expected one. False otherwise.
   2012 
   2013         """
   2014         if delta_y > 0:
   2015             return self._test_mouse_scroll(device, -delta_y)
   2016         else:
   2017             raise error.TestError('delta_y (%d) should be a positive value',
   2018                                   delta_y)
   2019 
   2020 
   2021     @_test_retry_and_log
   2022     def test_mouse_click_and_drag(self, device, delta_x, delta_y):
   2023         """Test that the mouse click-and-drag events could be received
   2024         correctly.
   2025 
   2026         @param device: the meta device containing a bluetooth HID device
   2027         @param delta_x: the distance to drag in x axis
   2028         @param delta_y: the distance to drag in y axis
   2029 
   2030         @returns: True if the report received by the host matches the
   2031                   expected one. False otherwise.
   2032 
   2033         """
   2034         gesture = lambda: device.ClickAndDrag(delta_x, delta_y)
   2035         actual_events = self._record_input_events(device, gesture)
   2036 
   2037         button = 'LEFT'
   2038         expected_events = (
   2039                 [# Button down
   2040                  recorder.MSC_SCAN_BTN_EVENT[button],
   2041                  Event(EV_KEY, BTN_LEFT, 1),
   2042                  recorder.SYN_EVENT] +
   2043                 # cursor movement in x and y
   2044                 ([Event(EV_REL, REL_X, delta_x)] if delta_x else []) +
   2045                 ([Event(EV_REL, REL_Y, delta_y)] if delta_y else []) +
   2046                 [recorder.SYN_EVENT] +
   2047                 # Button up
   2048                 [recorder.MSC_SCAN_BTN_EVENT[button],
   2049                  Event(EV_KEY, BTN_LEFT, 0),
   2050                  recorder.SYN_EVENT])
   2051 
   2052         self.results = {
   2053                 'actual_events': map(str, actual_events),
   2054                 'expected_events': map(str, expected_events)}
   2055         return actual_events == expected_events
   2056 
   2057 
   2058     # -------------------------------------------------------------------
   2059     # Autotest methods
   2060     # -------------------------------------------------------------------
   2061 
   2062 
   2063     def initialize(self):
   2064         """Initialize bluetooth adapter tests."""
   2065         # Run through every tests and collect failed tests in self.fails.
   2066         self.fails = []
   2067 
   2068         # If a test depends on multiple conditions, write the results of
   2069         # the conditions in self.results so that it is easy to know
   2070         # what conditions failed by looking at the log.
   2071         self.results = None
   2072 
   2073         # Some tests may instantiate a peripheral device for testing.
   2074         self.devices = dict()
   2075         for device_type in SUPPORTED_DEVICE_TYPES:
   2076             self.devices[device_type] = None
   2077 
   2078         # The count of registered advertisements.
   2079         self.count_advertisements = 0
   2080 
   2081 
   2082     def check_chameleon(self):
   2083         """Check the existence of chameleon_host.
   2084 
   2085         The chameleon_host is specified in --args as follows
   2086 
   2087         (cr) $ test_that --args "chameleon_host=$CHAMELEON_IP" "$DUT_IP" <test>
   2088 
   2089         """
   2090         logging.debug('labels: %s', self.host.get_labels())
   2091         if self.host.chameleon is None:
   2092             raise error.TestError('Have to specify chameleon_host IP.')
   2093 
   2094 
   2095     def run_once(self, *args, **kwargs):
   2096         """This method should be implemented by children classes.
   2097 
   2098         Typically, the run_once() method would look like:
   2099 
   2100         factory = remote_facade_factory.RemoteFacadeFactory(host)
   2101         self.bluetooth_facade = factory.create_bluetooth_hid_facade()
   2102 
   2103         self.test_bluetoothd_running()
   2104         # ...
   2105         # invoke more self.test_xxx() tests.
   2106         # ...
   2107 
   2108         if self.fails:
   2109             raise error.TestFail(self.fails)
   2110 
   2111         """
   2112         raise NotImplementedError
   2113 
   2114 
   2115     def cleanup(self):
   2116         """Clean up bluetooth adapter tests."""
   2117         # Close the device properly if a device is instantiated.
   2118         # Note: do not write something like the following statements
   2119         #           if self.devices[device_type]:
   2120         #       or
   2121         #           if bool(self.devices[device_type]):
   2122         #       Otherwise, it would try to invoke bluetooth_mouse.__nonzero__()
   2123         #       which just does not exist.
   2124         for device_type in SUPPORTED_DEVICE_TYPES:
   2125             if self.devices[device_type] is not None:
   2126                 self.devices[device_type].Close()
   2127