Home | History | Annotate | Download | only in bluetooth
      1 # Copyright 2016 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """Server side bluetooth adapter subtests."""
      6 
      7 import inspect
      8 import functools
      9 import logging
     10 import re
     11 import time
     12 
     13 from autotest_lib.client.bin import utils
     14 from autotest_lib.client.bin.input import input_event_recorder as recorder
     15 from autotest_lib.client.common_lib import error
     16 from autotest_lib.server import test
     17 from autotest_lib.client.bin.input.linux_input import (
     18         BTN_LEFT, BTN_RIGHT, EV_KEY, EV_REL, REL_X, REL_Y, REL_WHEEL)
     19 
     20 
     21 REBOOTING_CHAMELEON = False
     22 
     23 Event = recorder.Event
     24 
     25 
     26 # Delay binding the methods since host is only available at run time.
     27 SUPPORTED_DEVICE_TYPES = {
     28     'MOUSE': lambda host: host.chameleon.get_bluetooth_hid_mouse,
     29     'LE_MOUSE': lambda host: host.chameleon.get_bluetooth_hog_mouse
     30 }
     31 
     32 
     33 def method_name():
     34     """Get the method name of a class.
     35 
     36     This function is supposed to be invoked inside a class and will
     37     return current method name who invokes this function.
     38 
     39     @returns: the string of the method name inside the class.
     40     """
     41     return inspect.getouterframes(inspect.currentframe())[1][3]
     42 
     43 
     44 def _run_method(method, method_name, *args, **kwargs):
     45     """Run a target method and capture exceptions if any.
     46 
     47     This is just a wrapper of the target method so that we do not need to
     48     write the exception capturing structure repeatedly. The method could
     49     be either a device method or a facade method.
     50 
     51     @param method: the method to run
     52     @param method_name: the name of the method
     53 
     54     @returns: the return value of target method() if successful.
     55               False otherwise.
     56 
     57     """
     58     result = False
     59     try:
     60         result = method(*args, **kwargs)
     61     except Exception as e:
     62         logging.error('%s: %s', method_name, e)
     63     except:
     64         logging.error('%s: unexpected error', method_name)
     65     return result
     66 
     67 
     68 def get_bluetooth_emulated_device(host, device_type):
     69     """Get the bluetooth emulated device object.
     70 
     71     @param host: the DUT, usually a chromebook
     72     @param device_type : the bluetooth HID device type, e.g., 'MOUSE'
     73 
     74     @returns: the bluetooth device object
     75 
     76     """
     77 
     78     def _retry_device_method(method_name, legal_falsy_values=[]):
     79         """retry the emulated device's method.
     80 
     81         The method is invoked as device.xxxx() e.g., device.GetAdvertisedName().
     82 
     83         Note that the method name string is provided to get the device's actual
     84         method object at run time through getattr(). The rebinding is required
     85         because a new device may have been created previously or during the
     86         execution of fix_serial_device().
     87 
     88         Given a device's method, it is not feasible to get the method name
     89         through __name__ attribute. This limitation is due to the fact that
     90         the device is a dotted object of an XML RPC server proxy.
     91         As an example, with the method name 'GetAdvertisedName', we could
     92         derive the correspoinding method device.GetAdvertisedName. On the
     93         contrary, given device.GetAdvertisedName, it is not feasible to get the
     94         method name by device.GetAdvertisedName.__name__
     95 
     96         Also note that if the device method fails at the first time, we would
     97         try to fix the problem by re-creating the serial device and see if the
     98         problem is fixed. If not, we will reboot the chameleon board and see
     99         if the problem is fixed. If yes, execute the target method the second
    100         time.
    101 
    102         The default values exist for uses of this function before the options
    103         were added, ideally we should change zero_ok to False.
    104 
    105         @param method_name: the string of the method name.
    106         @param legal_falsy_values: Values that are falsy but might be OK.
    107 
    108         @returns: the result returned by the device's method.
    109 
    110         """
    111         result = _run_method(getattr(device, method_name), method_name)
    112         if _is_successful(result, legal_falsy_values):
    113             return result
    114 
    115         logging.error('%s failed the 1st time. Try to fix the serial device.',
    116                       method_name)
    117 
    118         # Try to fix the serial device if possible.
    119         if not fix_serial_device(host, device):
    120             return False
    121 
    122         logging.info('%s: retry the 2nd time.', method_name)
    123         return _run_method(getattr(device, method_name), method_name)
    124 
    125 
    126     if device_type not in SUPPORTED_DEVICE_TYPES:
    127         raise error.TestError('The device type is not supported: %s',
    128                               device_type)
    129 
    130     # Get the bluetooth device object and query some important properties.
    131     device = SUPPORTED_DEVICE_TYPES[device_type](host)()
    132 
    133     # Get some properties of the kit
    134     # NOTE: Strings updated here must be kept in sync with Chameleon.
    135     device._capabilities = _retry_device_method('GetCapabilities')
    136     device._transports = device._capabilities["CAP_TRANSPORTS"]
    137     device._is_le_only = ("TRANSPORT_LE" in device._transports and
    138                           len(device._transports) == 1)
    139     device._has_pin = device._capabilities["CAP_HAS_PIN"]
    140     device.can_init_connection = device._capabilities["CAP_INIT_CONNECT"]
    141 
    142     _retry_device_method('Init')
    143     logging.info('device type: %s', device_type)
    144 
    145     device.name = _retry_device_method('GetAdvertisedName')
    146     logging.info('device name: %s', device.name)
    147 
    148     device.address = _retry_device_method('GetLocalBluetoothAddress')
    149     logging.info('address: %s', device.address)
    150 
    151     pin_falsy_values = [] if device._has_pin else [None]
    152     device.pin = _retry_device_method('GetPinCode', pin_falsy_values)
    153     logging.info('pin: %s', device.pin)
    154 
    155     class_falsy_values = [None] if device._is_le_only else [0]
    156 
    157     # Class of service is None for LE-only devices. Don't fail or parse it.
    158     device.class_of_service = _retry_device_method('GetClassOfService',
    159                                                    class_falsy_values)
    160     if device._is_le_only:
    161       parsed_class_of_service = device.class_of_service
    162     else:
    163       parsed_class_of_service = "0x%04X" % device.class_of_service
    164     logging.info('class of service: %s', parsed_class_of_service)
    165 
    166     device.class_of_device = _retry_device_method('GetClassOfDevice',
    167                                                   class_falsy_values)
    168     # Class of device is None for LE-only devices. Don't fail or parse it.
    169     if device._is_le_only:
    170       parsed_class_of_device = device.class_of_device
    171     else:
    172       parsed_class_of_device = "0x%04X" % device.class_of_device
    173     logging.info('class of device: %s', parsed_class_of_device)
    174 
    175     device.device_type = _retry_device_method('GetHIDDeviceType')
    176     logging.info('device type: %s', device.device_type)
    177 
    178     device.authenticaiton_mode = _retry_device_method('GetAuthenticationMode')
    179     logging.info('authentication mode: %s', device.authenticaiton_mode)
    180 
    181     device.port = _retry_device_method('GetPort')
    182     logging.info('serial port: %s\n', device.port)
    183 
    184     return device
    185 
    186 
    187 def recreate_serial_device(device):
    188     """Create and connect to a new serial device.
    189 
    190     @param device: the bluetooth HID device
    191 
    192     @returns: True if the serial device is re-created successfully.
    193 
    194     """
    195     logging.info('Remove the old serial device and create a new one.')
    196     if device is not None:
    197         try:
    198             device.Close()
    199         except:
    200             logging.error('failed to close the serial device.')
    201             return False
    202     try:
    203         device.CreateSerialDevice()
    204         return True
    205     except:
    206         logging.error('failed to invoke CreateSerialDevice.')
    207         return False
    208 
    209 
    210 def _reboot_chameleon(host, device):
    211     REBOOT_SLEEP_SECS = 40
    212 
    213     if not REBOOTING_CHAMELEON:
    214         logging.info('Skip rebooting chameleon.')
    215         return False
    216 
    217     # Close the bluetooth peripheral device and reboot the chameleon board.
    218     device.Close()
    219     logging.info('rebooting chameleon...')
    220     host.chameleon.reboot()
    221 
    222     # Every chameleon reboot would take a bit more than REBOOT_SLEEP_SECS.
    223     # Sleep REBOOT_SLEEP_SECS and then begin probing the chameleon board.
    224     time.sleep(REBOOT_SLEEP_SECS)
    225 
    226     # Check if the serial device could initialize, connect, and
    227     # enter command mode correctly.
    228     logging.info('Checking device status...')
    229     if not _run_method(device.Init, 'Init'):
    230         logging.info('device.Init: failed after reboot')
    231         return False
    232     if not device.CheckSerialConnection():
    233         logging.info('device.CheckSerialConnection: failed after reboot')
    234         return False
    235     if not _run_method(device.EnterCommandMode, 'EnterCommandMode'):
    236         logging.info('device.EnterCommandMode: failed after reboot')
    237         return False
    238     logging.info('The device is created successfully after reboot.')
    239     return True
    240 
    241 
    242 def _is_successful(result, legal_falsy_values=[]):
    243     """Is the method result considered successful?
    244 
    245     Some method results, for example that of class_of_service, may be 0 which is
    246     considered a valid result. Occassionally, None is acceptable.
    247 
    248     The default values exist for uses of this function before the options were
    249     added, ideally we should change zero_ok to False.
    250 
    251     @param result: a method result
    252     @param legal_falsy_values: Values that are falsy but might be OK.
    253 
    254     @returns: True if bool(result) is True, or if result is 0 and zero_ok, or if
    255               result is None and none_ok.
    256     """
    257     truthiness_of_result = bool(result)
    258     return truthiness_of_result or result in legal_falsy_values
    259 
    260 
    261 def fix_serial_device(host, device):
    262     """Fix the serial device.
    263 
    264     This function tries to fix the serial device by
    265     (1) re-creating a serial device, or
    266     (2) rebooting the chameleon board.
    267 
    268     @param host: the DUT, usually a chromebook
    269     @param device: the bluetooth HID device
    270 
    271     @returns: True if the serial device is fixed. False otherwise.
    272 
    273     """
    274     # Check the serial connection. Fix it if needed.
    275     if device.CheckSerialConnection():
    276         # The USB serial connection still exists.
    277         # Re-connection suffices to solve the problem. The problem
    278         # is usually caused by serial port change. For example,
    279         # the serial port changed from /dev/ttyUSB0 to /dev/ttyUSB1.
    280         logging.info('retry: creating a new serial device...')
    281         if not recreate_serial_device(device):
    282             return False
    283 
    284     # Check if recreate_serial_device() above fixes the problem.
    285     # If not, reboot the chameleon board including creation of a new
    286     # bluetooth device. Check if reboot fixes the problem.
    287     # If not, return False.
    288     result = _run_method(device.EnterCommandMode, 'EnterCommandMode')
    289     return _is_successful(result) or _reboot_chameleon(host, device)
    290 
    291 
    292 def retry(test_method, instance, *args, **kwargs):
    293     """Execute the target facade test_method(). Retry if failing the first time.
    294 
    295     A test_method is something like self.test_xxxx() in BluetoothAdapterTests,
    296     e.g., BluetoothAdapterTests.test_bluetoothd_running().
    297 
    298     @param test_method: the test method to retry
    299 
    300     @returns: True if the return value of test_method() is successful.
    301               False otherwise.
    302 
    303     """
    304     if _is_successful(_run_method(test_method, test_method.__name__,
    305                                   instance, *args, **kwargs)):
    306         return True
    307 
    308     # Try to fix the serial device if applicable.
    309     logging.error('%s failed at the 1st time.', test_method.__name__)
    310 
    311     # If this test does not use any attached serial device, just re-run
    312     # the test.
    313     logging.info('%s: retry the 2nd time.', test_method.__name__)
    314     time.sleep(1)
    315     if not hasattr(instance, 'device_type'):
    316         return _is_successful(_run_method(test_method, test_method.__name__,
    317                                           instance, *args, **kwargs))
    318 
    319     host = instance.host
    320     device = instance.devices[instance.device_type]
    321     if not fix_serial_device(host, device):
    322         return False
    323 
    324     logging.info('%s: retry the 2nd time.', test_method.__name__)
    325     return _is_successful(_run_method(test_method, test_method.__name__,
    326                                       instance, *args, **kwargs))
    327 
    328 
    329 def _test_retry_and_log(test_method_or_retry_flag):
    330     """A decorator that logs test results, collects error messages, and retries
    331        on request.
    332 
    333     @param test_method_or_retry_flag: either the test_method or a retry_flag.
    334         There are some possibilities of this argument:
    335         1. the test_method to conduct and retry: should retry the test_method.
    336             This occurs with
    337             @_test_retry_and_log
    338         2. the retry flag is True. Should retry the test_method.
    339             This occurs with
    340             @_test_retry_and_log(True)
    341         3. the retry flag is False. Do not retry the test_method.
    342             This occurs with
    343             @_test_retry_and_log(False)
    344 
    345     @returns: a wrapper of the test_method with test log. The retry mechanism
    346         would depend on the retry flag.
    347 
    348     """
    349 
    350     def decorator(test_method):
    351         """A decorator wrapper of the decorated test_method.
    352 
    353         @param test_method: the test method being decorated.
    354 
    355         @returns the wrapper of the test method.
    356 
    357         """
    358         @functools.wraps(test_method)
    359         def wrapper(instance, *args, **kwargs):
    360             """A wrapper of the decorated method.
    361 
    362             @param instance: an BluetoothAdapterTests instance
    363 
    364             @returns the result of the test method
    365 
    366             """
    367             instance.results = None
    368             if callable(test_method_or_retry_flag) or test_method_or_retry_flag:
    369                 test_result = retry(test_method, instance, *args, **kwargs)
    370             else:
    371                 test_result = test_method(instance, *args, **kwargs)
    372 
    373             if test_result:
    374                 logging.info('[*** passed: %s]', test_method.__name__)
    375             else:
    376                 fail_msg = '[--- failed: %s (%s)]' % (test_method.__name__,
    377                                                       str(instance.results))
    378                 logging.error(fail_msg)
    379                 instance.fails.append(fail_msg)
    380             return test_result
    381         return wrapper
    382 
    383     if callable(test_method_or_retry_flag):
    384         # If the decorator function comes with no argument like
    385         # @_test_retry_and_log
    386         return decorator(test_method_or_retry_flag)
    387     else:
    388         # If the decorator function comes with an argument like
    389         # @_test_retry_and_log(False)
    390         return decorator
    391 
    392 
    393 def test_case_log(method):
    394     """A decorator for test case methods.
    395 
    396     The main purpose of this decorator is to display the test case name
    397     in the test log which looks like
    398 
    399         <... test_case_RA3_CD_SI200_CD_PC_CD_UA3 ...>
    400 
    401     @param method: the test case method to decorate.
    402 
    403     @returns: a wrapper function of the decorated method.
    404 
    405     """
    406     @functools.wraps(method)
    407     def wrapper(instance, *args, **kwargs):
    408         """Log the name of the wrapped method before execution"""
    409         logging.info('\n<... %s ...>', method.__name__)
    410         method(instance, *args, **kwargs)
    411     return wrapper
    412 
    413 
    414 class BluetoothAdapterTests(test.test):
    415     """Server side bluetooth adapter tests.
    416 
    417     This test class tries to thoroughly verify most of the important work
    418     states of a bluetooth adapter.
    419 
    420     The various test methods are supposed to be invoked by actual autotest
    421     tests such as server/cros/site_tests/bluetooth_Adapter*.
    422 
    423     """
    424     version = 1
    425     ADAPTER_ACTION_SLEEP_SECS = 1
    426     ADAPTER_PAIRING_TIMEOUT_SECS = 60
    427     ADAPTER_CONNECTION_TIMEOUT_SECS = 30
    428     ADAPTER_DISCONNECTION_TIMEOUT_SECS = 30
    429     ADAPTER_PAIRING_POLLING_SLEEP_SECS = 3
    430     ADAPTER_DISCOVER_TIMEOUT_SECS = 60          # 30 seconds too short sometimes
    431     ADAPTER_DISCOVER_POLLING_SLEEP_SECS = 1
    432     ADAPTER_DISCOVER_NAME_TIMEOUT_SECS = 30
    433     ADAPTER_WAIT_DEFAULT_TIMEOUT_SECS = 10
    434     ADAPTER_POLLING_DEFAULT_SLEEP_SECS = 1
    435 
    436     HID_REPORT_SLEEP_SECS = 1
    437 
    438     # Default suspend time in seconds for suspend resume.
    439     SUSPEND_TIME_SECS=10
    440 
    441     # hci0 is the default hci device if there is no external bluetooth dongle.
    442     EXPECTED_HCI = 'hci0'
    443 
    444     CLASS_OF_SERVICE_MASK = 0xFFE000
    445     CLASS_OF_DEVICE_MASK = 0x001FFF
    446 
    447     # Constants about advertising.
    448     DAFAULT_MIN_ADVERTISEMENT_INTERVAL_MS = 1280
    449     DAFAULT_MAX_ADVERTISEMENT_INTERVAL_MS = 1280
    450     ADVERTISING_INTERVAL_UNIT = 0.625
    451 
    452     # Error messages about advertising dbus methods.
    453     ERROR_FAILED_TO_REGISTER_ADVERTISEMENT = (
    454             'org.bluez.Error.Failed: Failed to register advertisement')
    455     ERROR_INVALID_ADVERTISING_INTERVALS = (
    456             'org.bluez.Error.InvalidArguments: Invalid arguments')
    457 
    458     # Supported profiles by chrome os.
    459     SUPPORTED_UUIDS = {
    460             'HSP_AG_UUID': '00001112-0000-1000-8000-00805f9b34fb',
    461             'GATT_UUID': '00001801-0000-1000-8000-00805f9b34fb',
    462             'A2DP_SOURCE_UUID': '0000110a-0000-1000-8000-00805f9b34fb',
    463             'HFP_AG_UUID': '0000111f-0000-1000-8000-00805f9b34fb',
    464             'PNP_UUID': '00001200-0000-1000-8000-00805f9b34fb',
    465             'GAP_UUID': '00001800-0000-1000-8000-00805f9b34fb'}
    466 
    467 
    468     def get_device(self, device_type):
    469         """Get the bluetooth device object.
    470 
    471         @param device_type : the bluetooth HID device type, e.g., 'MOUSE'
    472 
    473         @returns: the bluetooth device object
    474 
    475         """
    476         self.device_type = device_type
    477         if self.devices[device_type] is None:
    478             self.devices[device_type] = get_bluetooth_emulated_device(
    479                     self.host, device_type)
    480         return self.devices[device_type]
    481 
    482 
    483     def suspend_resume(self, suspend_time=SUSPEND_TIME_SECS):
    484         """Suspend the DUT for a while and then resume.
    485 
    486         @param suspend_time: the suspend time in secs
    487 
    488         """
    489         logging.info('The DUT suspends for %d seconds...', suspend_time)
    490         try:
    491             self.host.suspend(suspend_time=suspend_time)
    492         except error.AutoservSuspendError:
    493             logging.error('The DUT did not suspend for %d seconds', suspend_time)
    494             pass
    495         logging.info('The DUT is waken up.')
    496 
    497 
    498     def _wait_for_condition(self, func, method_name,
    499                             timeout=ADAPTER_WAIT_DEFAULT_TIMEOUT_SECS,
    500                             sleep_interval=ADAPTER_POLLING_DEFAULT_SLEEP_SECS):
    501         """Wait for the func() to become True.
    502 
    503         @param fun: the function to wait for.
    504         @param method_name: the invoking class method.
    505         @param timeout: number of seconds to wait before giving up.
    506         @param sleep_interval: the interval in seconds to sleep between
    507                 invoking func().
    508 
    509         @returns: the bluetooth device object
    510 
    511         """
    512 
    513         try:
    514             utils.poll_for_condition(condition=func,
    515                                      timeout=timeout,
    516                                      sleep_interval=sleep_interval,
    517                                      desc=('Waiting %s', method_name))
    518             return True
    519         except utils.TimeoutError as e:
    520             logging.error('%s: %s', method_name, e)
    521         except Exception as e:
    522             logging.error('%s: %s', method_name, e)
    523             err = 'bluetoothd possibly crashed. Check out /var/log/messages.'
    524             logging.error(err)
    525         except:
    526             logging.error('%s: unexpected error', method_name)
    527         return False
    528 
    529 
    530     # -------------------------------------------------------------------
    531     # Adater standalone tests
    532     # -------------------------------------------------------------------
    533 
    534 
    535     @_test_retry_and_log
    536     def test_bluetoothd_running(self):
    537         """Test that bluetoothd is running."""
    538         return self.bluetooth_facade.is_bluetoothd_running()
    539 
    540 
    541     @_test_retry_and_log
    542     def test_start_bluetoothd(self):
    543         """Test that bluetoothd could be started successfully."""
    544         return self.bluetooth_facade.start_bluetoothd()
    545 
    546 
    547     @_test_retry_and_log
    548     def test_stop_bluetoothd(self):
    549         """Test that bluetoothd could be stopped successfully."""
    550         return self.bluetooth_facade.stop_bluetoothd()
    551 
    552 
    553     @_test_retry_and_log
    554     def test_adapter_work_state(self):
    555         """Test that the bluetooth adapter is in the correct working state.
    556 
    557         This includes that the adapter is detectable, is powered on,
    558         and its hci device is hci0.
    559         """
    560         has_adapter = self.bluetooth_facade.has_adapter()
    561         is_powered_on = self._wait_for_condition(
    562                 self.bluetooth_facade.is_powered_on, method_name())
    563         hci = self.bluetooth_facade.get_hci() == self.EXPECTED_HCI
    564         self.results = {
    565                 'has_adapter': has_adapter,
    566                 'is_powered_on': is_powered_on,
    567                 'hci': hci}
    568         return all(self.results.values())
    569 
    570 
    571     @_test_retry_and_log
    572     def test_power_on_adapter(self):
    573         """Test that the adapter could be powered on successfully."""
    574         power_on = self.bluetooth_facade.set_powered(True)
    575         is_powered_on = self._wait_for_condition(
    576                 self.bluetooth_facade.is_powered_on, method_name())
    577 
    578         self.results = {'power_on': power_on, 'is_powered_on': is_powered_on}
    579         return all(self.results.values())
    580 
    581 
    582     @_test_retry_and_log
    583     def test_power_off_adapter(self):
    584         """Test that the adapter could be powered off successfully."""
    585         power_off = self.bluetooth_facade.set_powered(False)
    586         is_powered_off = self._wait_for_condition(
    587                 lambda: not self.bluetooth_facade.is_powered_on(),
    588                 method_name())
    589 
    590         self.results = {
    591                 'power_off': power_off,
    592                 'is_powered_off': is_powered_off}
    593         return all(self.results.values())
    594 
    595 
    596     @_test_retry_and_log
    597     def test_reset_on_adapter(self):
    598         """Test that the adapter could be reset on successfully.
    599 
    600         This includes restarting bluetoothd, and removing the settings
    601         and cached devices.
    602         """
    603         reset_on = self.bluetooth_facade.reset_on()
    604         is_powered_on = self._wait_for_condition(
    605                 self.bluetooth_facade.is_powered_on, method_name())
    606 
    607         self.results = {'reset_on': reset_on, 'is_powered_on': is_powered_on}
    608         return all(self.results.values())
    609 
    610 
    611     @_test_retry_and_log
    612     def test_reset_off_adapter(self):
    613         """Test that the adapter could be reset off successfully.
    614 
    615         This includes restarting bluetoothd, and removing the settings
    616         and cached devices.
    617         """
    618         reset_off = self.bluetooth_facade.reset_off()
    619         is_powered_off = self._wait_for_condition(
    620                 lambda: not self.bluetooth_facade.is_powered_on(),
    621                 method_name())
    622 
    623         self.results = {
    624                 'reset_off': reset_off,
    625                 'is_powered_off': is_powered_off}
    626         return all(self.results.values())
    627 
    628 
    629     @_test_retry_and_log
    630     def test_UUIDs(self):
    631         """Test that basic profiles are supported."""
    632         adapter_UUIDs = self.bluetooth_facade.get_UUIDs()
    633         self.results = [uuid for uuid in self.SUPPORTED_UUIDS.values()
    634                         if uuid not in adapter_UUIDs]
    635         return not bool(self.results)
    636 
    637 
    638     @_test_retry_and_log
    639     def test_start_discovery(self):
    640         """Test that the adapter could start discovery."""
    641         start_discovery = self.bluetooth_facade.start_discovery()
    642         is_discovering = self._wait_for_condition(
    643                 self.bluetooth_facade.is_discovering, method_name())
    644 
    645         self.results = {
    646                 'start_discovery': start_discovery,
    647                 'is_discovering': is_discovering}
    648         return all(self.results.values())
    649 
    650 
    651     @_test_retry_and_log
    652     def test_stop_discovery(self):
    653         """Test that the adapter could stop discovery."""
    654         stop_discovery = self.bluetooth_facade.stop_discovery()
    655         is_not_discovering = self._wait_for_condition(
    656                 lambda: not self.bluetooth_facade.is_discovering(),
    657                 method_name())
    658 
    659         self.results = {
    660                 'stop_discovery': stop_discovery,
    661                 'is_not_discovering': is_not_discovering}
    662         return all(self.results.values())
    663 
    664 
    665     @_test_retry_and_log
    666     def test_discoverable(self):
    667         """Test that the adapter could be set discoverable."""
    668         set_discoverable = self.bluetooth_facade.set_discoverable(True)
    669         is_discoverable = self._wait_for_condition(
    670                 self.bluetooth_facade.is_discoverable, method_name())
    671 
    672         self.results = {
    673                 'set_discoverable': set_discoverable,
    674                 'is_discoverable': is_discoverable}
    675         return all(self.results.values())
    676 
    677 
    678     @_test_retry_and_log
    679     def test_nondiscoverable(self):
    680         """Test that the adapter could be set non-discoverable."""
    681         set_nondiscoverable = self.bluetooth_facade.set_discoverable(False)
    682         is_nondiscoverable = self._wait_for_condition(
    683                 lambda: not self.bluetooth_facade.is_discoverable(),
    684                 method_name())
    685 
    686         self.results = {
    687                 'set_nondiscoverable': set_nondiscoverable,
    688                 'is_nondiscoverable': is_nondiscoverable}
    689         return all(self.results.values())
    690 
    691 
    692     @_test_retry_and_log
    693     def test_pairable(self):
    694         """Test that the adapter could be set pairable."""
    695         set_pairable = self.bluetooth_facade.set_pairable(True)
    696         is_pairable = self._wait_for_condition(
    697                 self.bluetooth_facade.is_pairable, method_name())
    698 
    699         self.results = {
    700                 'set_pairable': set_pairable,
    701                 'is_pairable': is_pairable}
    702         return all(self.results.values())
    703 
    704 
    705     @_test_retry_and_log
    706     def test_nonpairable(self):
    707         """Test that the adapter could be set non-pairable."""
    708         set_nonpairable = self.bluetooth_facade.set_pairable(False)
    709         is_nonpairable = self._wait_for_condition(
    710                 lambda: not self.bluetooth_facade.is_pairable(), method_name())
    711 
    712         self.results = {
    713                 'set_nonpairable': set_nonpairable,
    714                 'is_nonpairable': is_nonpairable}
    715         return all(self.results.values())
    716 
    717 
    718     # -------------------------------------------------------------------
    719     # Tests about general discovering, pairing, and connection
    720     # -------------------------------------------------------------------
    721 
    722 
    723     @_test_retry_and_log
    724     def test_discover_device(self, device_address):
    725         """Test that the adapter could discover the specified device address.
    726 
    727         @param device_address: Address of the device.
    728 
    729         @returns: True if the device is found. False otherwise.
    730 
    731         """
    732         has_device_initially = False
    733         start_discovery = False
    734         device_discovered = False
    735         has_device = self.bluetooth_facade.has_device
    736 
    737         if has_device(device_address):
    738             has_device_initially = True
    739         elif self.bluetooth_facade.start_discovery():
    740             start_discovery = True
    741             try:
    742                 utils.poll_for_condition(
    743                         condition=(lambda: has_device(device_address)),
    744                         timeout=self.ADAPTER_DISCOVER_TIMEOUT_SECS,
    745                         sleep_interval=self.ADAPTER_DISCOVER_POLLING_SLEEP_SECS,
    746                         desc='Waiting for discovering %s' % device_address)
    747                 device_discovered = True
    748             except utils.TimeoutError as e:
    749                 logging.error('test_discover_device: %s', e)
    750             except Exception as e:
    751                 logging.error('test_discover_device: %s', e)
    752                 err = 'bluetoothd probably crashed. Check out /var/log/messages'
    753                 logging.error(err)
    754             except:
    755                 logging.error('test_discover_device: unexpected error')
    756 
    757         self.results = {
    758                 'has_device_initially': has_device_initially,
    759                 'start_discovery': start_discovery,
    760                 'device_discovered': device_discovered}
    761         return has_device_initially or device_discovered
    762 
    763 
    764     @_test_retry_and_log
    765     def test_pairing(self, device_address, pin, trusted=True):
    766         """Test that the adapter could pair with the device successfully.
    767 
    768         @param device_address: Address of the device.
    769         @param pin: pin code to pair with the device.
    770         @param trusted: indicating whether to set the device trusted.
    771 
    772         @returns: True if pairing succeeds. False otherwise.
    773 
    774         """
    775 
    776         def _pair_device():
    777             """Pair to the device.
    778 
    779             @returns: True if it could pair with the device. False otherwise.
    780 
    781             """
    782             return self.bluetooth_facade.pair_legacy_device(
    783                     device_address, pin, trusted,
    784                     self.ADAPTER_PAIRING_TIMEOUT_SECS)
    785 
    786 
    787         has_device = False
    788         paired = False
    789         if self.bluetooth_facade.has_device(device_address):
    790             has_device = True
    791             try:
    792                 utils.poll_for_condition(
    793                         condition=_pair_device,
    794                         timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS,
    795                         sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
    796                         desc='Waiting for pairing %s' % device_address)
    797                 paired = True
    798             except utils.TimeoutError as e:
    799                 logging.error('test_pairing: %s', e)
    800             except:
    801                 logging.error('test_pairing: unexpected error')
    802 
    803         self.results = {'has_device': has_device, 'paired': paired}
    804         return all(self.results.values())
    805 
    806 
    807     @_test_retry_and_log
    808     def test_remove_pairing(self, device_address):
    809         """Test that the adapter could remove the paired device.
    810 
    811         @param device_address: Address of the device.
    812 
    813         @returns: True if the device is removed successfully. False otherwise.
    814 
    815         """
    816         device_is_paired_initially = self.bluetooth_facade.device_is_paired(
    817                 device_address)
    818         remove_pairing = False
    819         pairing_removed = False
    820 
    821         if device_is_paired_initially:
    822             remove_pairing = self.bluetooth_facade.remove_device_object(
    823                     device_address)
    824             pairing_removed = not self.bluetooth_facade.device_is_paired(
    825                     device_address)
    826 
    827         self.results = {
    828                 'device_is_paired_initially': device_is_paired_initially,
    829                 'remove_pairing': remove_pairing,
    830                 'pairing_removed': pairing_removed}
    831         return all(self.results.values())
    832 
    833 
    834     def test_set_trusted(self, device_address, trusted=True):
    835         """Test whether the device with the specified address is trusted.
    836 
    837         @param device_address: Address of the device.
    838         @param trusted : True or False indicating if trusted is expected.
    839 
    840         @returns: True if the device's "Trusted" property is as specified;
    841                   False otherwise.
    842 
    843         """
    844 
    845         set_trusted = self.bluetooth_facade.set_trusted(
    846                 device_address, trusted)
    847 
    848         properties = self.bluetooth_facade.get_device_properties(
    849                 device_address)
    850         actual_trusted = properties.get('Trusted')
    851 
    852         self.results = {
    853                 'set_trusted': set_trusted,
    854                 'actual trusted': actual_trusted,
    855                 'expected trusted': trusted}
    856         return actual_trusted == trusted
    857 
    858 
    859     @_test_retry_and_log
    860     def test_connection_by_adapter(self, device_address):
    861         """Test that the adapter of dut could connect to the device successfully
    862 
    863         It is the caller's responsibility to pair to the device before
    864         doing connection.
    865 
    866         @param device_address: Address of the device.
    867 
    868         @returns: True if connection is performed. False otherwise.
    869 
    870         """
    871 
    872         def _connect_device():
    873             """Connect to the device.
    874 
    875             @returns: True if it could connect to the device. False otherwise.
    876 
    877             """
    878             return self.bluetooth_facade.connect_device(device_address)
    879 
    880 
    881         has_device = False
    882         connected = False
    883         if self.bluetooth_facade.has_device(device_address):
    884             has_device = True
    885             try:
    886                 utils.poll_for_condition(
    887                         condition=_connect_device,
    888                         timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS,
    889                         sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
    890                         desc='Waiting for connecting to %s' % device_address)
    891                 connected = True
    892             except utils.TimeoutError as e:
    893                 logging.error('test_connection_by_adapter: %s', e)
    894             except:
    895                 logging.error('test_connection_by_adapter: unexpected error')
    896 
    897         self.results = {'has_device': has_device, 'connected': connected}
    898         return all(self.results.values())
    899 
    900 
    901     @_test_retry_and_log
    902     def test_disconnection_by_adapter(self, device_address):
    903         """Test that the adapter of dut could disconnect the device successfully
    904 
    905         @param device_address: Address of the device.
    906 
    907         @returns: True if disconnection is performed. False otherwise.
    908 
    909         """
    910         return self.bluetooth_facade.disconnect_device(device_address)
    911 
    912 
    913     def _enter_command_mode(self, device):
    914         """Let the device enter command mode.
    915 
    916         Before using the device, need to call this method to make sure
    917         it is in the command mode.
    918 
    919         @param device: the bluetooth HID device
    920 
    921         @returns: True if successful. False otherwise.
    922 
    923         """
    924         result = _is_successful(_run_method(device.EnterCommandMode,
    925                                             'EnterCommandMode'))
    926         if not result:
    927             logging.error('EnterCommandMode failed')
    928         return result
    929 
    930 
    931     @_test_retry_and_log
    932     def test_connection_by_device(self, device):
    933         """Test that the device could connect to the adapter successfully.
    934 
    935         This emulates the behavior that a device may initiate a
    936         connection request after waking up from power saving mode.
    937 
    938         @param device: the bluetooth HID device
    939 
    940         @returns: True if connection is performed correctly by device and
    941                   the adapter also enters connection state.
    942                   False otherwise.
    943 
    944         """
    945         if not self._enter_command_mode(device):
    946             return False
    947 
    948         method_name = 'test_connection_by_device'
    949         connection_by_device = False
    950         adapter_address = self.bluetooth_facade.address
    951         try:
    952             device.ConnectToRemoteAddress(adapter_address)
    953             connection_by_device = True
    954         except Exception as e:
    955             logging.error('%s (device): %s', method_name, e)
    956         except:
    957             logging.error('%s (device): unexpected error', method_name)
    958 
    959         connection_seen_by_adapter = False
    960         device_address = device.address
    961         device_is_connected = self.bluetooth_facade.device_is_connected
    962         try:
    963             utils.poll_for_condition(
    964                     condition=lambda: device_is_connected(device_address),
    965                     timeout=self.ADAPTER_CONNECTION_TIMEOUT_SECS,
    966                     desc=('Waiting for connection from %s' % device_address))
    967             connection_seen_by_adapter = True
    968         except utils.TimeoutError as e:
    969             logging.error('%s (adapter): %s', method_name, e)
    970         except:
    971             logging.error('%s (adapter): unexpected error', method_name)
    972 
    973         self.results = {
    974                 'connection_by_device': connection_by_device,
    975                 'connection_seen_by_adapter': connection_seen_by_adapter}
    976         return all(self.results.values())
    977 
    978 
    979     @_test_retry_and_log
    980     def test_disconnection_by_device(self, device):
    981         """Test that the device could disconnect the adapter successfully.
    982 
    983         This emulates the behavior that a device may initiate a
    984         disconnection request before going into power saving mode.
    985 
    986         Note: should not try to enter command mode in this method. When
    987               a device is connected, there is no way to enter command mode.
    988               One could just issue a special disconnect command without
    989               entering command mode.
    990 
    991         @param device: the bluetooth HID device
    992 
    993         @returns: True if disconnection is performed correctly by device and
    994                   the adapter also observes the disconnection.
    995                   False otherwise.
    996 
    997         """
    998         method_name = 'test_disconnection_by_device'
    999         disconnection_by_device = False
   1000         try:
   1001             device.Disconnect()
   1002             disconnection_by_device = True
   1003         except Exception as e:
   1004             logging.error('%s (device): %s', method_name, e)
   1005         except:
   1006             logging.error('%s (device): unexpected error', method_name)
   1007 
   1008         disconnection_seen_by_adapter = False
   1009         device_address = device.address
   1010         device_is_connected = self.bluetooth_facade.device_is_connected
   1011         try:
   1012             utils.poll_for_condition(
   1013                     condition=lambda: not device_is_connected(device_address),
   1014                     timeout=self.ADAPTER_DISCONNECTION_TIMEOUT_SECS,
   1015                     desc=('Waiting for disconnection from %s' % device_address))
   1016             disconnection_seen_by_adapter = True
   1017         except utils.TimeoutError as e:
   1018             logging.error('%s (adapter): %s', method_name, e)
   1019         except:
   1020             logging.error('%s (adapter): unexpected error', method_name)
   1021 
   1022         self.results = {
   1023                 'disconnection_by_device': disconnection_by_device,
   1024                 'disconnection_seen_by_adapter': disconnection_seen_by_adapter}
   1025         return all(self.results.values())
   1026 
   1027     @_test_retry_and_log
   1028     def test_device_is_connected(self, device_address):
   1029         """Test that device address given is currently connected.
   1030 
   1031         @param device_address: Address of the device.
   1032 
   1033         @returns: True if the device is connected.
   1034                   False otherwise.
   1035 
   1036         """
   1037         def _is_connected():
   1038             """Test if device is connected.
   1039 
   1040             @returns: True if device is connected. False otherwise.
   1041 
   1042             """
   1043             return self.bluetooth_facade.device_is_connected(device_address)
   1044 
   1045 
   1046         method_name = 'test_device_is_connected'
   1047         has_device = False
   1048         connected = False
   1049         if self.bluetooth_facade.has_device(device_address):
   1050             has_device = True
   1051             try:
   1052                 utils.poll_for_condition(
   1053                         condition=_is_connected,
   1054                         timeout=self.ADAPTER_CONNECTION_TIMEOUT_SECS,
   1055                         sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
   1056                         desc='Waiting for connection to %s' % device_address)
   1057                 connected = True
   1058             except utils.TimeoutError as e:
   1059                 logging.error('%s: %s', method_name, e)
   1060             except:
   1061                 logging.error('%s: unexpected error', method_name)
   1062         self.results = {'has_device': has_device, 'connected': connected}
   1063         return all(self.results.values())
   1064 
   1065 
   1066     @_test_retry_and_log
   1067     def test_device_is_paired(self, device_address):
   1068         """Test that the device address given is currently paired.
   1069 
   1070         @param device_address: Address of the device.
   1071 
   1072         @returns: True if the device is paired.
   1073                   False otherwise.
   1074 
   1075         """
   1076         def _is_paired():
   1077             """Test if device is paired.
   1078 
   1079             @returns: True if device is paired. False otherwise.
   1080 
   1081             """
   1082             return self.bluetooth_facade.device_is_paired(device_address)
   1083 
   1084 
   1085         method_name = 'test_device_is_paired'
   1086         has_device = False
   1087         paired = False
   1088         if self.bluetooth_facade.has_device(device_address):
   1089             has_device = True
   1090             try:
   1091                 utils.poll_for_condition(
   1092                         condition=_is_paired,
   1093                         timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS,
   1094                         sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
   1095                         desc='Waiting for connection to %s' % device_address)
   1096                 paired = True
   1097             except utils.TimeoutError as e:
   1098                 logging.error('%s: %s', method_name, e)
   1099             except:
   1100                 logging.error('%s: unexpected error', method_name)
   1101         self.results = {'has_device': has_device, 'paired': paired}
   1102         return all(self.results.values())
   1103 
   1104 
   1105     def _get_device_name(self, device_address):
   1106         """Get the device name.
   1107 
   1108         @returns: True if the device name is derived. None otherwise.
   1109 
   1110         """
   1111         properties = self.bluetooth_facade.get_device_properties(
   1112                 device_address)
   1113         self.discovered_device_name = properties.get('Name')
   1114         return bool(self.discovered_device_name)
   1115 
   1116 
   1117     @_test_retry_and_log
   1118     def test_device_name(self, device_address, expected_device_name):
   1119         """Test that the device name discovered by the adapter is correct.
   1120 
   1121         @param device_address: Address of the device.
   1122         @param expected_device_name: the bluetooth device name
   1123 
   1124         @returns: True if the discovered_device_name is expected_device_name.
   1125                   False otherwise.
   1126 
   1127         """
   1128         try:
   1129             utils.poll_for_condition(
   1130                     condition=lambda: self._get_device_name(device_address),
   1131                     timeout=self.ADAPTER_DISCOVER_NAME_TIMEOUT_SECS,
   1132                     sleep_interval=self.ADAPTER_DISCOVER_POLLING_SLEEP_SECS,
   1133                     desc='Waiting for device name of %s' % device_address)
   1134         except utils.TimeoutError as e:
   1135             logging.error('test_device_name: %s', e)
   1136         except:
   1137             logging.error('test_device_name: unexpected error')
   1138 
   1139         self.results = {
   1140                 'expected_device_name': expected_device_name,
   1141                 'discovered_device_name': self.discovered_device_name}
   1142         return self.discovered_device_name == expected_device_name
   1143 
   1144 
   1145     @_test_retry_and_log
   1146     def test_device_class_of_service(self, device_address,
   1147                                      expected_class_of_service):
   1148         """Test that the discovered device class of service is as expected.
   1149 
   1150         @param device_address: Address of the device.
   1151         @param expected_class_of_service: the expected class of service
   1152 
   1153         @returns: True if the discovered class of service matches the
   1154                   expected class of service. False otherwise.
   1155 
   1156         """
   1157         properties = self.bluetooth_facade.get_device_properties(
   1158                 device_address)
   1159         device_class = properties.get('Class')
   1160         discovered_class_of_service = (device_class & self.CLASS_OF_SERVICE_MASK
   1161                                        if device_class else None)
   1162 
   1163         self.results = {
   1164                 'device_class': device_class,
   1165                 'expected_class_of_service': expected_class_of_service,
   1166                 'discovered_class_of_service': discovered_class_of_service}
   1167         return discovered_class_of_service == expected_class_of_service
   1168 
   1169 
   1170     @_test_retry_and_log
   1171     def test_device_class_of_device(self, device_address,
   1172                                     expected_class_of_device):
   1173         """Test that the discovered device class of device is as expected.
   1174 
   1175         @param device_address: Address of the device.
   1176         @param expected_class_of_device: the expected class of device
   1177 
   1178         @returns: True if the discovered class of device matches the
   1179                   expected class of device. False otherwise.
   1180 
   1181         """
   1182         properties = self.bluetooth_facade.get_device_properties(
   1183                 device_address)
   1184         device_class = properties.get('Class')
   1185         discovered_class_of_device = (device_class & self.CLASS_OF_DEVICE_MASK
   1186                                       if device_class else None)
   1187 
   1188         self.results = {
   1189                 'device_class': device_class,
   1190                 'expected_class_of_device': expected_class_of_device,
   1191                 'discovered_class_of_device': discovered_class_of_device}
   1192         return discovered_class_of_device == expected_class_of_device
   1193 
   1194 
   1195     def _get_btmon_log(self, method, logging_timespan=1):
   1196         """Capture the btmon log when executing the specified method.
   1197 
   1198         @param method: the method to capture log.
   1199                        The method would be executed only when it is not None.
   1200                        This allows us to simply capture btmon log without
   1201                        executing any command.
   1202         @param logging_timespan: capture btmon log for logging_timespan seconds.
   1203 
   1204         """
   1205         self.bluetooth_le_facade.btmon_start()
   1206         self.advertising_msg = method() if method else ''
   1207         time.sleep(logging_timespan)
   1208         self.bluetooth_le_facade.btmon_stop()
   1209 
   1210 
   1211     def convert_to_adv_jiffies(self, adv_interval_ms):
   1212         """Convert adv interval in ms to jiffies, i.e., multiples of 0.625 ms.
   1213 
   1214         @param adv_interval_ms: an advertising interval
   1215 
   1216         @returns: the equivalent jiffies
   1217 
   1218         """
   1219         return adv_interval_ms / self.ADVERTISING_INTERVAL_UNIT
   1220 
   1221 
   1222     def compute_duration(self, max_adv_interval_ms):
   1223         """Compute duration from max_adv_interval_ms.
   1224 
   1225         Advertising duration is calculated approximately as
   1226             duration = max_adv_interval_ms / 1000.0 * 1.1
   1227 
   1228         @param max_adv_interval_ms: max advertising interval in milliseconds.
   1229 
   1230         @returns: duration in seconds.
   1231 
   1232         """
   1233         return max_adv_interval_ms / 1000.0 * 1.1
   1234 
   1235 
   1236     def compute_logging_timespan(self, max_adv_interval_ms):
   1237         """Compute the logging timespan from max_adv_interval_ms.
   1238 
   1239         The logging timespan is the time needed to record btmon log.
   1240 
   1241         @param max_adv_interval_ms: max advertising interval in milliseconds.
   1242 
   1243         @returns: logging_timespan in seconds.
   1244 
   1245         """
   1246         duration = self.compute_duration(max_adv_interval_ms)
   1247         logging_timespan = max(self.count_advertisements * duration, 1)
   1248         return logging_timespan
   1249 
   1250 
   1251     @_test_retry_and_log(False)
   1252     def test_check_duration_and_intervals(self, min_adv_interval_ms,
   1253                                           max_adv_interval_ms,
   1254                                           number_advertisements):
   1255         """Verify that every advertisements are scheduled according to the
   1256         duration and intervals.
   1257 
   1258         An advertisement would be scheduled at the time span of
   1259              duration * number_advertisements
   1260 
   1261         @param min_adv_interval_ms: min advertising interval in milliseconds.
   1262         @param max_adv_interval_ms: max advertising interval in milliseconds.
   1263         @param number_advertisements: the number of existing advertisements
   1264 
   1265         @returns: True if all advertisements are scheduled based on the
   1266                 duration and intervals.
   1267 
   1268         """
   1269 
   1270 
   1271         def within_tolerance(expected, actual, max_error=0.1):
   1272             """Determine if the percent error is within specified tolerance.
   1273 
   1274             @param expected: The expected value.
   1275             @param actual: The actual (measured) value.
   1276             @param max_error: The maximum percent error acceptable.
   1277 
   1278             @returns: True if the percent error is less than or equal to
   1279                       max_error.
   1280             """
   1281             return abs(expected - actual) / abs(expected) <= max_error
   1282 
   1283 
   1284         start_str = 'Set Advertising Intervals:'
   1285         search_strings = ['HCI Command: LE Set Advertising Data', 'Company']
   1286         search_str = '|'.join(search_strings)
   1287 
   1288         contents = self.bluetooth_le_facade.btmon_get(search_str=search_str,
   1289                                                       start_str=start_str)
   1290 
   1291         # Company string looks like
   1292         #   Company: not assigned (65283)
   1293         company_pattern = re.compile('Company:.*\((\d*)\)')
   1294 
   1295         # The string with timestamp looks like
   1296         #   < HCI Command: LE Set Advertising Data (0x08|0x0008) [hci0] 3.799236
   1297         set_adv_time_str = 'LE Set Advertising Data.*\[hci\d\].*(\d+\.\d+)'
   1298         set_adv_time_pattern = re.compile(set_adv_time_str)
   1299 
   1300         adv_timestamps = {}
   1301         timestamp = None
   1302         manufacturer_id = None
   1303         for line in contents:
   1304             result = set_adv_time_pattern.search(line)
   1305             if result:
   1306                 timestamp = float(result.group(1))
   1307 
   1308             result = company_pattern.search(line)
   1309             if result:
   1310                 manufacturer_id = '0x%04x' % int(result.group(1))
   1311 
   1312             if timestamp and manufacturer_id:
   1313                 if manufacturer_id not in adv_timestamps:
   1314                     adv_timestamps[manufacturer_id] = []
   1315                 adv_timestamps[manufacturer_id].append(timestamp)
   1316                 timestamp = None
   1317                 manufacturer_id = None
   1318 
   1319         duration = self.compute_duration(max_adv_interval_ms)
   1320         expected_timespan = duration * number_advertisements
   1321 
   1322         check_duration = True
   1323         for manufacturer_id, values in adv_timestamps.iteritems():
   1324             logging.debug('manufacturer_id %s: %s', manufacturer_id, values)
   1325             timespans = [values[i] - values[i - 1]
   1326                          for i in xrange(1, len(values))]
   1327             errors = [timespans[i] for i in xrange(len(timespans))
   1328                       if not within_tolerance(expected_timespan, timespans[i])]
   1329             logging.debug('timespans: %s', timespans)
   1330             logging.debug('errors: %s', errors)
   1331             if bool(errors):
   1332                 check_duration = False
   1333 
   1334         # Verify that the advertising intervals are also correct.
   1335         min_adv_interval_ms_found, max_adv_interval_ms_found = (
   1336                 self._verify_advertising_intervals(min_adv_interval_ms,
   1337                                                    max_adv_interval_ms))
   1338 
   1339         self.results = {
   1340                 'check_duration': check_duration,
   1341                 'max_adv_interval_ms_found': max_adv_interval_ms_found,
   1342                 'max_adv_interval_ms_found': max_adv_interval_ms_found,
   1343         }
   1344         return all(self.results.values())
   1345 
   1346 
   1347     def _get_min_max_intervals_strings(self, min_adv_interval_ms,
   1348                                        max_adv_interval_ms):
   1349         """Get the min and max advertising intervals strings shown in btmon.
   1350 
   1351         Advertising intervals shown in the btmon log look like
   1352             Min advertising interval: 1280.000 msec (0x0800)
   1353             Max advertising interval: 1280.000 msec (0x0800)
   1354 
   1355         @param min_adv_interval_ms: min advertising interval in milliseconds.
   1356         @param max_adv_interval_ms: max advertising interval in milliseconds.
   1357 
   1358         @returns: the min and max intervals strings.
   1359 
   1360         """
   1361         min_str = ('Min advertising interval: %.3f msec (0x%04x)' %
   1362                    (min_adv_interval_ms,
   1363                     min_adv_interval_ms / self.ADVERTISING_INTERVAL_UNIT))
   1364         logging.debug('min_adv_interval_ms: %s', min_str)
   1365 
   1366         max_str = ('Max advertising interval: %.3f msec (0x%04x)' %
   1367                    (max_adv_interval_ms,
   1368                     max_adv_interval_ms / self.ADVERTISING_INTERVAL_UNIT))
   1369         logging.debug('max_adv_interval_ms: %s', max_str)
   1370 
   1371         return (min_str, max_str)
   1372 
   1373 
   1374     def _verify_advertising_intervals(self, min_adv_interval_ms,
   1375                                       max_adv_interval_ms):
   1376         """Verify min and max advertising intervals.
   1377 
   1378         Advertising intervals look like
   1379             Min advertising interval: 1280.000 msec (0x0800)
   1380             Max advertising interval: 1280.000 msec (0x0800)
   1381 
   1382         @param min_adv_interval_ms: min advertising interval in milliseconds.
   1383         @param max_adv_interval_ms: max advertising interval in milliseconds.
   1384 
   1385         @returns: a tuple of (True, True) if both min and max advertising
   1386             intervals could be found. Otherwise, the corresponding element
   1387             in the tuple if False.
   1388 
   1389         """
   1390         min_str, max_str = self._get_min_max_intervals_strings(
   1391                 min_adv_interval_ms, max_adv_interval_ms)
   1392 
   1393         min_adv_interval_ms_found = self.bluetooth_le_facade.btmon_find(min_str)
   1394         max_adv_interval_ms_found = self.bluetooth_le_facade.btmon_find(max_str)
   1395 
   1396         return min_adv_interval_ms_found, max_adv_interval_ms_found
   1397 
   1398 
   1399     @_test_retry_and_log(False)
   1400     def test_register_advertisement(self, advertisement_data, instance_id,
   1401                                     min_adv_interval_ms, max_adv_interval_ms):
   1402         """Verify that an advertisement is registered correctly.
   1403 
   1404         This test verifies the following data:
   1405         - advertisement added
   1406         - manufacturer data
   1407         - service UUIDs
   1408         - service data
   1409         - advertising intervals
   1410         - advertising enabled
   1411 
   1412         @param advertisement_data: the data of an advertisement to register.
   1413         @param instance_id: the instance id which starts at 1.
   1414         @param min_adv_interval_ms: min_adv_interval in milliseconds.
   1415         @param max_adv_interval_ms: max_adv_interval in milliseconds.
   1416 
   1417         @returns: True if the advertisement is registered correctly.
   1418                   False otherwise.
   1419 
   1420         """
   1421         # When registering a new advertisement, it is possible that another
   1422         # instance is advertising. It may need to wait for all other
   1423         # advertisements to complete advertising once.
   1424         self.count_advertisements += 1
   1425         logging_timespan = self.compute_logging_timespan(max_adv_interval_ms)
   1426         self._get_btmon_log(
   1427                 lambda: self.bluetooth_le_facade.register_advertisement(
   1428                         advertisement_data),
   1429                 logging_timespan=logging_timespan)
   1430 
   1431         # Verify that a new advertisement is added.
   1432         advertisement_added = (
   1433                 self.bluetooth_le_facade.btmon_find('Advertising Added') and
   1434                 self.bluetooth_le_facade.btmon_find('Instance: %d' %
   1435                                                     instance_id))
   1436 
   1437         # Verify that the manufacturer data could be found.
   1438         manufacturer_data = advertisement_data.get('ManufacturerData', '')
   1439         for manufacturer_id in manufacturer_data:
   1440             # The 'not assigned' text below means the manufacturer id
   1441             # is not actually assigned to any real manufacturer.
   1442             # For real 16-bit manufacturer UUIDs, refer to
   1443             #  https://www.bluetooth.com/specifications/assigned-numbers/16-bit-UUIDs-for-Members
   1444             manufacturer_data_found = self.bluetooth_le_facade.btmon_find(
   1445                     'Company: not assigned (%d)' % int(manufacturer_id, 16))
   1446 
   1447         # Verify that all service UUIDs could be found.
   1448         service_uuids_found = True
   1449         for uuid in advertisement_data.get('ServiceUUIDs', []):
   1450             # Service UUIDs looks like ['0x180D', '0x180F']
   1451             #   Heart Rate (0x180D)
   1452             #   Battery Service (0x180F)
   1453             # For actual 16-bit service UUIDs, refer to
   1454             #   https://www.bluetooth.com/specifications/gatt/services
   1455             if not self.bluetooth_le_facade.btmon_find('0x%s' % uuid):
   1456                 service_uuids_found = False
   1457                 break
   1458 
   1459         # Verify service data.
   1460         service_data_found = True
   1461         for uuid, data in advertisement_data.get('ServiceData', {}).items():
   1462             # A service data looks like
   1463             #   Service Data (UUID 0x9999): 0001020304
   1464             # while uuid is '9999' and data is [0x00, 0x01, 0x02, 0x03, 0x04]
   1465             data_str = ''.join(map(lambda n: '%02x' % n, data))
   1466             if not self.bluetooth_le_facade.btmon_find(
   1467                     'Service Data (UUID 0x%s): %s' % (uuid, data_str)):
   1468                 service_data_found = False
   1469                 break
   1470 
   1471         # Verify that the advertising intervals are correct.
   1472         min_adv_interval_ms_found, max_adv_interval_ms_found = (
   1473                 self._verify_advertising_intervals(min_adv_interval_ms,
   1474                                                    max_adv_interval_ms))
   1475 
   1476         # Verify advertising is enabled.
   1477         advertising_enabled = self.bluetooth_le_facade.btmon_find(
   1478                 'Advertising: Enabled (0x01)')
   1479 
   1480         self.results = {
   1481                 'advertisement_added': advertisement_added,
   1482                 'manufacturer_data_found': manufacturer_data_found,
   1483                 'service_uuids_found': service_uuids_found,
   1484                 'service_data_found': service_data_found,
   1485                 'min_adv_interval_ms_found': min_adv_interval_ms_found,
   1486                 'max_adv_interval_ms_found': max_adv_interval_ms_found,
   1487                 'advertising_enabled': advertising_enabled,
   1488         }
   1489         return all(self.results.values())
   1490 
   1491 
   1492     @_test_retry_and_log(False)
   1493     def test_fail_to_register_advertisement(self, advertisement_data,
   1494                                             min_adv_interval_ms,
   1495                                             max_adv_interval_ms):
   1496         """Verify that failure is incurred when max advertisements are reached.
   1497 
   1498         This test verifies that a registration failure is incurred when
   1499         max advertisements are reached. The error message looks like:
   1500 
   1501             org.bluez.Error.Failed: Maximum advertisements reached
   1502 
   1503         @param advertisement_data: the advertisement to register.
   1504         @param min_adv_interval_ms: min_adv_interval in milliseconds.
   1505         @param max_adv_interval_ms: max_adv_interval in milliseconds.
   1506 
   1507         @returns: True if the error message is received correctly.
   1508                   False otherwise.
   1509 
   1510         """
   1511         logging_timespan = self.compute_logging_timespan(max_adv_interval_ms)
   1512         self._get_btmon_log(
   1513                 lambda: self.bluetooth_le_facade.register_advertisement(
   1514                         advertisement_data),
   1515                 logging_timespan=logging_timespan)
   1516 
   1517         # Verify that it failed to register advertisement due to the fact
   1518         # that max advertisements are reached.
   1519         failed_to_register_error = (self.ERROR_FAILED_TO_REGISTER_ADVERTISEMENT
   1520                                     in self.advertising_msg)
   1521 
   1522         # Verify that no new advertisement is added.
   1523         advertisement_not_added = not self.bluetooth_le_facade.btmon_find(
   1524                 'Advertising Added:')
   1525 
   1526         # Verify that the advertising intervals are correct.
   1527         min_adv_interval_ms_found, max_adv_interval_ms_found = (
   1528                 self._verify_advertising_intervals(min_adv_interval_ms,
   1529                                                    max_adv_interval_ms))
   1530 
   1531         # Verify advertising remains enabled.
   1532         advertising_enabled = self.bluetooth_le_facade.btmon_find(
   1533                 'Advertising: Enabled (0x01)')
   1534 
   1535         self.results = {
   1536                 'failed_to_register_error': failed_to_register_error,
   1537                 'advertisement_not_added': advertisement_not_added,
   1538                 'min_adv_interval_ms_found': min_adv_interval_ms_found,
   1539                 'max_adv_interval_ms_found': max_adv_interval_ms_found,
   1540                 'advertising_enabled': advertising_enabled,
   1541         }
   1542         return all(self.results.values())
   1543 
   1544 
   1545     @_test_retry_and_log(False)
   1546     def test_unregister_advertisement(self, advertisement_data, instance_id,
   1547                                       advertising_disabled):
   1548         """Verify that an advertisement is unregistered correctly.
   1549 
   1550         This test verifies the following data:
   1551         - advertisement removed
   1552         - advertising status: enabled if there are advertisements left;
   1553                               disabled otherwise.
   1554 
   1555         @param advertisement_data: the data of an advertisement to unregister.
   1556         @param instance_id: the instance id of the advertisement to remove.
   1557         @param advertising_disabled: is advertising disabled? This happens
   1558                 only when all advertisements are removed.
   1559 
   1560         @returns: True if the advertisement is unregistered correctly.
   1561                   False otherwise.
   1562 
   1563         """
   1564         self.count_advertisements -= 1
   1565         self._get_btmon_log(
   1566                 lambda: self.bluetooth_le_facade.unregister_advertisement(
   1567                         advertisement_data))
   1568 
   1569         # Verify that the advertisement is removed.
   1570         advertisement_removed = (
   1571                 self.bluetooth_le_facade.btmon_find('Advertising Removed') and
   1572                 self.bluetooth_le_facade.btmon_find('Instance: %d' %
   1573                                                     instance_id))
   1574 
   1575         # If advertising_disabled is True, there should be no log like
   1576         #       'Advertising: Enabled (0x01)'
   1577         # If advertising_disabled is False, there should be log like
   1578         #       'Advertising: Enabled (0x01)'
   1579 
   1580         # Only need to check advertising status when the last advertisement
   1581         # is removed. For any other advertisements prior to the last one,
   1582         # we may or may not observe 'Advertising: Enabled (0x01)' message.
   1583         # Hence, the test would become flaky if we insist to see that message.
   1584         # A possible workaround is to sleep for a while and then check the
   1585         # message. The drawback is that we may need to wait up to 10 seconds
   1586         # if the advertising duration and intervals are long.
   1587         # In a test case, we always run test_check_duration_and_intervals()
   1588         # to check if advertising duration and intervals are correct after
   1589         # un-registering one or all advertisements, it is safe to do so.
   1590         advertising_enabled_found = self.bluetooth_le_facade.btmon_find(
   1591                 'Advertising: Enabled (0x01)')
   1592         advertising_disabled_found = self.bluetooth_le_facade.btmon_find(
   1593                 'Advertising: Disabled (0x00)')
   1594         advertising_status_correct = not advertising_disabled or (
   1595                 advertising_disabled_found and not advertising_enabled_found)
   1596 
   1597         self.results = {
   1598                 'advertisement_removed': advertisement_removed,
   1599                 'advertising_status_correct': advertising_status_correct,
   1600         }
   1601         return all(self.results.values())
   1602 
   1603 
   1604     @_test_retry_and_log(False)
   1605     def test_set_advertising_intervals(self, min_adv_interval_ms,
   1606                                        max_adv_interval_ms):
   1607         """Verify that new advertising intervals are set correctly.
   1608 
   1609         Note that setting advertising intervals does not enable/disable
   1610         advertising. Hence, there is no need to check the advertising
   1611         status.
   1612 
   1613         @param min_adv_interval_ms: the min advertising interval in ms.
   1614         @param max_adv_interval_ms: the max advertising interval in ms.
   1615 
   1616         @returns: True if the new advertising intervals are correct.
   1617                   False otherwise.
   1618 
   1619         """
   1620         self._get_btmon_log(
   1621                 lambda: self.bluetooth_le_facade.set_advertising_intervals(
   1622                         min_adv_interval_ms, max_adv_interval_ms))
   1623 
   1624         # Verify the new advertising intervals.
   1625         # With intervals of 200 ms and 200 ms, the log looks like
   1626         #   bluetoothd: Set Advertising Intervals: 0x0140, 0x0140
   1627         txt = 'bluetoothd: Set Advertising Intervals: 0x%04x, 0x%04x'
   1628         adv_intervals_found = self.bluetooth_le_facade.btmon_find(
   1629                 txt % (self.convert_to_adv_jiffies(min_adv_interval_ms),
   1630                        self.convert_to_adv_jiffies(max_adv_interval_ms)))
   1631 
   1632         self.results = {'adv_intervals_found': adv_intervals_found}
   1633         return all(self.results.values())
   1634 
   1635 
   1636     @_test_retry_and_log(False)
   1637     def test_fail_to_set_advertising_intervals(
   1638             self, invalid_min_adv_interval_ms, invalid_max_adv_interval_ms,
   1639             orig_min_adv_interval_ms, orig_max_adv_interval_ms):
   1640         """Verify that setting invalid advertising intervals results in error.
   1641 
   1642         If invalid min/max advertising intervals are given, it would incur
   1643         the error: 'org.bluez.Error.InvalidArguments: Invalid arguments'.
   1644         Note that valid advertising intervals fall between 20 ms and 10,240 ms.
   1645 
   1646         @param invalid_min_adv_interval_ms: the invalid min advertising interval
   1647                 in ms.
   1648         @param invalid_max_adv_interval_ms: the invalid max advertising interval
   1649                 in ms.
   1650         @param orig_min_adv_interval_ms: the original min advertising interval
   1651                 in ms.
   1652         @param orig_max_adv_interval_ms: the original max advertising interval
   1653                 in ms.
   1654 
   1655         @returns: True if it fails to set invalid advertising intervals.
   1656                   False otherwise.
   1657 
   1658         """
   1659         self._get_btmon_log(
   1660                 lambda: self.bluetooth_le_facade.set_advertising_intervals(
   1661                         invalid_min_adv_interval_ms,
   1662                         invalid_max_adv_interval_ms))
   1663 
   1664         # Verify that the invalid error is observed in the dbus error callback
   1665         # message.
   1666         invalid_intervals_error = (self.ERROR_INVALID_ADVERTISING_INTERVALS in
   1667                                    self.advertising_msg)
   1668 
   1669         # Verify that the min/max advertising intervals remain the same
   1670         # after setting the invalid advertising intervals.
   1671         #
   1672         # In btmon log, we would see the following message first.
   1673         #    bluetoothd: Set Advertising Intervals: 0x0010, 0x0010
   1674         # And then, we should check if "Min advertising interval" and
   1675         # "Max advertising interval" remain the same.
   1676         start_str = 'bluetoothd: Set Advertising Intervals: 0x%04x, 0x%04x' % (
   1677                 self.convert_to_adv_jiffies(invalid_min_adv_interval_ms),
   1678                 self.convert_to_adv_jiffies(invalid_max_adv_interval_ms))
   1679 
   1680         search_strings = ['Min advertising interval:',
   1681                           'Max advertising interval:']
   1682         search_str = '|'.join(search_strings)
   1683 
   1684         contents = self.bluetooth_le_facade.btmon_get(search_str=search_str,
   1685                                                       start_str=start_str)
   1686 
   1687         # The min/max advertising intervals of all advertisements should remain
   1688         # the same as the previous valid ones.
   1689         min_max_str = '[Min|Max] advertising interval: (\d*\.\d*) msec'
   1690         min_max_pattern = re.compile(min_max_str)
   1691         correct_orig_min_adv_interval = True
   1692         correct_orig_max_adv_interval = True
   1693         for line in contents:
   1694             result = min_max_pattern.search(line)
   1695             if result:
   1696                 interval = float(result.group(1))
   1697                 if 'Min' in line and interval != orig_min_adv_interval_ms:
   1698                     correct_orig_min_adv_interval = False
   1699                 elif 'Max' in line and interval != orig_max_adv_interval_ms:
   1700                     correct_orig_max_adv_interval = False
   1701 
   1702         self.results = {
   1703                 'invalid_intervals_error': invalid_intervals_error,
   1704                 'correct_orig_min_adv_interval': correct_orig_min_adv_interval,
   1705                 'correct_orig_max_adv_interval': correct_orig_max_adv_interval}
   1706 
   1707         return all(self.results.values())
   1708 
   1709 
   1710     @_test_retry_and_log(False)
   1711     def test_check_advertising_intervals(self, min_adv_interval_ms,
   1712                                          max_adv_interval_ms):
   1713         """Verify that the advertising intervals are as expected.
   1714 
   1715         @param min_adv_interval_ms: the min advertising interval in ms.
   1716         @param max_adv_interval_ms: the max advertising interval in ms.
   1717 
   1718         @returns: True if the advertising intervals are correct.
   1719                   False otherwise.
   1720 
   1721         """
   1722         self._get_btmon_log(None)
   1723 
   1724         # Verify that the advertising intervals are correct.
   1725         min_adv_interval_ms_found, max_adv_interval_ms_found = (
   1726                 self._verify_advertising_intervals(min_adv_interval_ms,
   1727                                                    max_adv_interval_ms))
   1728 
   1729         self.results = {
   1730                 'min_adv_interval_ms_found': min_adv_interval_ms_found,
   1731                 'max_adv_interval_ms_found': max_adv_interval_ms_found,
   1732         }
   1733         return all(self.results.values())
   1734 
   1735 
   1736     @_test_retry_and_log(False)
   1737     def test_reset_advertising(self, instance_ids=[]):
   1738         """Verify that advertising is reset correctly.
   1739 
   1740         Note that reset advertising would set advertising intervals to
   1741         the default values. However, we would not be able to observe
   1742         the values change until new advertisements are registered.
   1743         Therefore, it is required that a test_register_advertisement()
   1744         test is conducted after this test.
   1745 
   1746         If instance_ids is [], all advertisements would still be removed
   1747         if there are any. However, no need to check 'Advertising Removed'
   1748         in btmon log since we may or may not be able to observe the message.
   1749         This feature is needed if this test is invoked as the first one in
   1750         a test case to reset advertising. In this situation, this test does
   1751         not know how many advertisements exist.
   1752 
   1753         @param instance_ids: the list of instance IDs that should be removed.
   1754 
   1755         @returns: True if advertising is reset correctly.
   1756                   False otherwise.
   1757 
   1758         """
   1759         self.count_advertisements = 0
   1760         self._get_btmon_log(
   1761                 lambda: self.bluetooth_le_facade.reset_advertising())
   1762 
   1763         # Verify that every advertisement is removed. When an advertisement
   1764         # with instance id 1 is removed, the log looks like
   1765         #   Advertising Removed
   1766         #       instance: 1
   1767         if len(instance_ids) > 0:
   1768             advertisement_removed = self.bluetooth_le_facade.btmon_find(
   1769                     'Advertising Removed')
   1770             if advertisement_removed:
   1771                 for instance_id in instance_ids:
   1772                     txt = 'Instance: %d' % instance_id
   1773                     if not self.bluetooth_le_facade.btmon_find(txt):
   1774                         advertisement_removed = False
   1775                         break
   1776         else:
   1777             advertisement_removed = True
   1778 
   1779         if not advertisement_removed:
   1780             logging.error('Failed to remove advertisement')
   1781 
   1782         # Verify that "Reset Advertising Intervals" command has been issued.
   1783         reset_advertising_intervals = self.bluetooth_le_facade.btmon_find(
   1784                 'bluetoothd: Reset Advertising Intervals')
   1785 
   1786         # Verify the advertising is disabled.
   1787         advertising_disabled_observied = self.bluetooth_le_facade.btmon_find(
   1788                 'Advertising: Disabled')
   1789         # If there are no existing advertisements, we may not observe
   1790         # 'Advertising: Disabled'.
   1791         advertising_disabled = (instance_ids == [] or
   1792                                 advertising_disabled_observied)
   1793 
   1794         self.results = {
   1795                 'advertisement_removed': advertisement_removed,
   1796                 'reset_advertising_intervals': reset_advertising_intervals,
   1797                 'advertising_disabled': advertising_disabled,
   1798         }
   1799         return all(self.results.values())
   1800 
   1801 
   1802     # -------------------------------------------------------------------
   1803     # Bluetooth mouse related tests
   1804     # -------------------------------------------------------------------
   1805 
   1806 
   1807     def _record_input_events(self, device, gesture):
   1808         """Record the input events.
   1809 
   1810         @param device: the bluetooth HID device.
   1811         @param gesture: the gesture method to perform.
   1812 
   1813         @returns: the input events received on the DUT.
   1814 
   1815         """
   1816         self.input_facade.initialize_input_recorder(device.name)
   1817         self.input_facade.start_input_recorder()
   1818         time.sleep(self.HID_REPORT_SLEEP_SECS)
   1819         gesture()
   1820         time.sleep(self.HID_REPORT_SLEEP_SECS)
   1821         self.input_facade.stop_input_recorder()
   1822         time.sleep(self.HID_REPORT_SLEEP_SECS)
   1823         event_values = self.input_facade.get_input_events()
   1824         events = [Event(*ev) for ev in event_values]
   1825         return events
   1826 
   1827 
   1828     def _test_mouse_click(self, device, button):
   1829         """Test that the mouse click events could be received correctly.
   1830 
   1831         @param device: the meta device containing a bluetooth HID device
   1832         @param button: which button to test, 'LEFT' or 'RIGHT'
   1833 
   1834         @returns: True if the report received by the host matches the
   1835                   expected one. False otherwise.
   1836 
   1837         """
   1838         if button == 'LEFT':
   1839             gesture = device.LeftClick
   1840         elif button == 'RIGHT':
   1841             gesture = device.RightClick
   1842         else:
   1843             raise error.TestError('Button (%s) is not valid.' % button)
   1844 
   1845         actual_events = self._record_input_events(device, gesture)
   1846 
   1847         linux_input_button = {'LEFT': BTN_LEFT, 'RIGHT': BTN_RIGHT}
   1848         expected_events = [
   1849                 # Button down
   1850                 recorder.MSC_SCAN_BTN_EVENT[button],
   1851                 Event(EV_KEY, linux_input_button[button], 1),
   1852                 recorder.SYN_EVENT,
   1853                 # Button up
   1854                 recorder.MSC_SCAN_BTN_EVENT[button],
   1855                 Event(EV_KEY, linux_input_button[button], 0),
   1856                 recorder.SYN_EVENT]
   1857 
   1858         self.results = {
   1859                 'actual_events': map(str, actual_events),
   1860                 'expected_events': map(str, expected_events)}
   1861         return actual_events == expected_events
   1862 
   1863 
   1864     @_test_retry_and_log
   1865     def test_mouse_left_click(self, device):
   1866         """Test that the mouse left click events could be received correctly.
   1867 
   1868         @param device: the meta device containing a bluetooth HID device
   1869 
   1870         @returns: True if the report received by the host matches the
   1871                   expected one. False otherwise.
   1872 
   1873         """
   1874         return self._test_mouse_click(device, 'LEFT')
   1875 
   1876 
   1877     @_test_retry_and_log
   1878     def test_mouse_right_click(self, device):
   1879         """Test that the mouse right click events could be received correctly.
   1880 
   1881         @param device: the meta device containing a bluetooth HID device
   1882 
   1883         @returns: True if the report received by the host matches the
   1884                   expected one. False otherwise.
   1885 
   1886         """
   1887         return self._test_mouse_click(device, 'RIGHT')
   1888 
   1889 
   1890     def _test_mouse_move(self, device, delta_x=0, delta_y=0):
   1891         """Test that the mouse move events could be received correctly.
   1892 
   1893         @param device: the meta device containing a bluetooth HID device
   1894         @param delta_x: the distance to move cursor in x axis
   1895         @param delta_y: the distance to move cursor in y axis
   1896 
   1897         @returns: True if the report received by the host matches the
   1898                   expected one. False otherwise.
   1899 
   1900         """
   1901         gesture = lambda: device.Move(delta_x, delta_y)
   1902         actual_events = self._record_input_events(device, gesture)
   1903 
   1904         events_x = [Event(EV_REL, REL_X, delta_x)] if delta_x else []
   1905         events_y = [Event(EV_REL, REL_Y, delta_y)] if delta_y else []
   1906         expected_events = events_x + events_y + [recorder.SYN_EVENT]
   1907 
   1908         self.results = {
   1909                 'actual_events': map(str, actual_events),
   1910                 'expected_events': map(str, expected_events)}
   1911         return actual_events == expected_events
   1912 
   1913 
   1914     @_test_retry_and_log
   1915     def test_mouse_move_in_x(self, device, delta_x):
   1916         """Test that the mouse move events in x could be received correctly.
   1917 
   1918         @param device: the meta device containing a bluetooth HID device
   1919         @param delta_x: the distance to move cursor in x axis
   1920 
   1921         @returns: True if the report received by the host matches the
   1922                   expected one. False otherwise.
   1923 
   1924         """
   1925         return self._test_mouse_move(device, delta_x=delta_x)
   1926 
   1927 
   1928     @_test_retry_and_log
   1929     def test_mouse_move_in_y(self, device, delta_y):
   1930         """Test that the mouse move events in y could be received correctly.
   1931 
   1932         @param device: the meta device containing a bluetooth HID device
   1933         @param delta_y: the distance to move cursor in y axis
   1934 
   1935         @returns: True if the report received by the host matches the
   1936                   expected one. False otherwise.
   1937 
   1938         """
   1939         return self._test_mouse_move(device, delta_y=delta_y)
   1940 
   1941 
   1942     @_test_retry_and_log
   1943     def test_mouse_move_in_xy(self, device, delta_x, delta_y):
   1944         """Test that the mouse move events could be received correctly.
   1945 
   1946         @param device: the meta device containing a bluetooth HID device
   1947         @param delta_x: the distance to move cursor in x axis
   1948         @param delta_y: the distance to move cursor in y axis
   1949 
   1950         @returns: True if the report received by the host matches the
   1951                   expected one. False otherwise.
   1952 
   1953         """
   1954         return self._test_mouse_move(device, delta_x=delta_x, delta_y=delta_y)
   1955 
   1956 
   1957     def _test_mouse_scroll(self, device, units):
   1958         """Test that the mouse wheel events could be received correctly.
   1959 
   1960         @param device: the meta device containing a bluetooth HID device
   1961         @param units: the units to scroll in y axis
   1962 
   1963         @returns: True if the report received by the host matches the
   1964                   expected one. False otherwise.
   1965 
   1966         """
   1967         gesture = lambda: device.Scroll(units)
   1968         actual_events = self._record_input_events(device, gesture)
   1969         expected_events = [Event(EV_REL, REL_WHEEL, units), recorder.SYN_EVENT]
   1970         self.results = {
   1971                 'actual_events': map(str, actual_events),
   1972                 'expected_events': map(str, expected_events)}
   1973         return actual_events == expected_events
   1974 
   1975 
   1976     @_test_retry_and_log
   1977     def test_mouse_scroll_down(self, device, delta_y):
   1978         """Test that the mouse wheel events could be received correctly.
   1979 
   1980         @param device: the meta device containing a bluetooth HID device
   1981         @param delta_y: the units to scroll down in y axis;
   1982                         should be a postive value
   1983 
   1984         @returns: True if the report received by the host matches the
   1985                   expected one. False otherwise.
   1986 
   1987         """
   1988         if delta_y > 0:
   1989             return self._test_mouse_scroll(device, delta_y)
   1990         else:
   1991             raise error.TestError('delta_y (%d) should be a positive value',
   1992                                   delta_y)
   1993 
   1994 
   1995     @_test_retry_and_log
   1996     def test_mouse_scroll_up(self, device, delta_y):
   1997         """Test that the mouse wheel events could be received correctly.
   1998 
   1999         @param device: the meta device containing a bluetooth HID device
   2000         @param delta_y: the units to scroll up in y axis;
   2001                         should be a postive value
   2002 
   2003         @returns: True if the report received by the host matches the
   2004                   expected one. False otherwise.
   2005 
   2006         """
   2007         if delta_y > 0:
   2008             return self._test_mouse_scroll(device, -delta_y)
   2009         else:
   2010             raise error.TestError('delta_y (%d) should be a positive value',
   2011                                   delta_y)
   2012 
   2013 
   2014     @_test_retry_and_log
   2015     def test_mouse_click_and_drag(self, device, delta_x, delta_y):
   2016         """Test that the mouse click-and-drag events could be received
   2017         correctly.
   2018 
   2019         @param device: the meta device containing a bluetooth HID device
   2020         @param delta_x: the distance to drag in x axis
   2021         @param delta_y: the distance to drag in y axis
   2022 
   2023         @returns: True if the report received by the host matches the
   2024                   expected one. False otherwise.
   2025 
   2026         """
   2027         gesture = lambda: device.ClickAndDrag(delta_x, delta_y)
   2028         actual_events = self._record_input_events(device, gesture)
   2029 
   2030         button = 'LEFT'
   2031         expected_events = (
   2032                 [# Button down
   2033                  recorder.MSC_SCAN_BTN_EVENT[button],
   2034                  Event(EV_KEY, BTN_LEFT, 1),
   2035                  recorder.SYN_EVENT] +
   2036                 # cursor movement in x and y
   2037                 ([Event(EV_REL, REL_X, delta_x)] if delta_x else []) +
   2038                 ([Event(EV_REL, REL_Y, delta_y)] if delta_y else []) +
   2039                 [recorder.SYN_EVENT] +
   2040                 # Button up
   2041                 [recorder.MSC_SCAN_BTN_EVENT[button],
   2042                  Event(EV_KEY, BTN_LEFT, 0),
   2043                  recorder.SYN_EVENT])
   2044 
   2045         self.results = {
   2046                 'actual_events': map(str, actual_events),
   2047                 'expected_events': map(str, expected_events)}
   2048         return actual_events == expected_events
   2049 
   2050 
   2051     # -------------------------------------------------------------------
   2052     # Autotest methods
   2053     # -------------------------------------------------------------------
   2054 
   2055 
   2056     def initialize(self):
   2057         """Initialize bluetooth adapter tests."""
   2058         # Run through every tests and collect failed tests in self.fails.
   2059         self.fails = []
   2060 
   2061         # If a test depends on multiple conditions, write the results of
   2062         # the conditions in self.results so that it is easy to know
   2063         # what conditions failed by looking at the log.
   2064         self.results = None
   2065 
   2066         # Some tests may instantiate a peripheral device for testing.
   2067         self.devices = dict()
   2068         for device_type in SUPPORTED_DEVICE_TYPES:
   2069             self.devices[device_type] = None
   2070 
   2071         # The count of registered advertisements.
   2072         self.count_advertisements = 0
   2073 
   2074 
   2075     def check_chameleon(self):
   2076         """Check the existence of chameleon_host.
   2077 
   2078         The chameleon_host is specified in --args as follows
   2079 
   2080         (cr) $ test_that --args "chameleon_host=$CHAMELEON_IP" "$DUT_IP" <test>
   2081 
   2082         """
   2083         if self.host.chameleon is None:
   2084             raise error.TestError('Have to specify chameleon_host IP.')
   2085 
   2086 
   2087     def run_once(self, *args, **kwargs):
   2088         """This method should be implemented by children classes.
   2089 
   2090         Typically, the run_once() method would look like:
   2091 
   2092         factory = remote_facade_factory.RemoteFacadeFactory(host)
   2093         self.bluetooth_facade = factory.create_bluetooth_hid_facade()
   2094 
   2095         self.test_bluetoothd_running()
   2096         # ...
   2097         # invoke more self.test_xxx() tests.
   2098         # ...
   2099 
   2100         if self.fails:
   2101             raise error.TestFail(self.fails)
   2102 
   2103         """
   2104         raise NotImplementedError
   2105 
   2106 
   2107     def cleanup(self):
   2108         """Clean up bluetooth adapter tests."""
   2109         # Close the device properly if a device is instantiated.
   2110         # Note: do not write something like the following statements
   2111         #           if self.devices[device_type]:
   2112         #       or
   2113         #           if bool(self.devices[device_type]):
   2114         #       Otherwise, it would try to invoke bluetooth_mouse.__nonzero__()
   2115         #       which just does not exist.
   2116         for device_type in SUPPORTED_DEVICE_TYPES:
   2117             if self.devices[device_type] is not None:
   2118                 self.devices[device_type].Close()
   2119