Home | History | Annotate | Download | only in bluetooth
      1 # Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import json, logging, os, pwd, shutil, subprocess, time
      6 
      7 import dbus
      8 
      9 from autotest_lib.client.bin import utils
     10 from autotest_lib.client.common_lib import error
     11 from autotest_lib.client.cros import semiauto_framework
     12 from autotest_lib.client.cros import sys_power
     13 
     14 _USER_TIMEOUT_TIME = 321  # Seconds a tester has to respond to prompts
     15 _DEVICE_TIMEOUT_TIME = 321  # Seconds a tester has to pair or connect device
     16 _ADAPTER_INTERFACE = 'org.bluez.Adapter1' # Name of adapter in DBus interface
     17 _DEVICE_INTERFACE = 'org.bluez.Device1' # Name of a device in DBus interface
     18 _TIME_FORMAT = '%d %b %Y %H:%M:%S' # Human-readable time format for logs
     19 _SECTION_BREAK = '='*75
     20 
     21 
class BluetoothSemiAutoHelper(semiauto_framework.semiauto_test):
    """Generic Bluetooth SemiAutoTest.

    Contains functions needed to implement an actual Bluetooth SemiAutoTest,
    such as accessing the state of Bluetooth adapter/devices via dbus,
    opening dialogs with tester via Telemetry browser, and getting log data.
    """
    version = 1

    # Boards without Bluetooth support; supports_bluetooth() returns False
    # for any board named here.
    _INVALID_BOARDS = ['x86-alex', 'x86-alex_he', 'lumpy']
     33 
     34     def _err(self, message):
     35         """Raise error after first collecting more information.
     36 
     37         @param message: error message to raise and add to logs.
     38 
     39         """
     40         self.collect_logs('ERROR HAS OCCURED: %s' % message)
     41         raise error.TestError(message)
     42 
     43     def supports_bluetooth(self):
     44         """Return True if this device has Bluetooth capabilities; else False."""
     45         device = utils.get_board()
     46         if device in self._INVALID_BOARDS:
     47             logging.info('%s does not have Bluetooth.', device)
     48             return False
     49         return True
     50 
     51     def _get_objects(self):
     52         """Return the managed objects for this chromebook."""
     53         manager = dbus.Interface(
     54                 self._bus.get_object('org.bluez', '/'),
     55                 dbus_interface='org.freedesktop.DBus.ObjectManager')
     56         return manager.GetManagedObjects()
     57 
     58     def _get_adapter_info(self):
     59         """Return the adapter interface objects, or None if not found."""
     60         objects = self._get_objects()
     61         for path, interfaces in objects.items():
     62             if _ADAPTER_INTERFACE in interfaces:
     63                 self._adapter_path = path
     64                 return interfaces[_ADAPTER_INTERFACE]
     65         return None
     66 
     67     def _get_device_info(self, addr):
     68         """Return the device interface objects, or None if not found."""
     69         objects = self._get_objects()
     70         for _, interfaces in objects.items():
     71             if _DEVICE_INTERFACE in interfaces:
     72                 if interfaces[_DEVICE_INTERFACE]['Address'] == addr:
     73                     return interfaces[_DEVICE_INTERFACE]
     74         return None
     75 
     76     def _verify_adapter_power(self, adapter_power_status):
     77         """Return True/False if adapter power status matches given value."""
     78         info = self._get_adapter_info()
     79         if not info:
     80             self._err('No adapter found!')
     81         return True if info['Powered'] == adapter_power_status else False
     82 
     83     def _verify_device_connection(self, addr, paired_status=True,
     84                                   connected_status=True):
     85         """Return True/False if device statuses match given values."""
     86         def _check_info():
     87             info = self._get_device_info(addr)
     88             if info:
     89                 if (info['Paired'] != paired_status or
     90                     info['Connected'] != connected_status):
     91                     return False
     92                 return True
     93             # Return True if no entry was found for an unpaired device
     94             return not paired_status and not connected_status
     95 
     96         results = _check_info()
     97 
     98         # To avoid spotting brief connections, sleep and check again.
     99         if results:
    100             time.sleep(0.5)
    101             results = _check_info()
    102         return results
    103 
    def set_adapter_power(self, adapter_power_status):
        """Set adapter power status to match given value via dbus call.

        Block until the power is set.

        @param adapter_power_status: True to turn adapter on; False for off.

        """
        # The lookup also records the adapter's object path in
        # self._adapter_path, which is needed just below.
        if not self._get_adapter_info():
            self._err('No adapter found!')
        adapter_obj = self._bus.get_object('org.bluez', self._adapter_path)
        props = dbus.Interface(
                adapter_obj, dbus_interface='org.freedesktop.DBus.Properties')
        props.Set(_ADAPTER_INTERFACE, 'Powered', adapter_power_status)

        self.poll_adapter_power(adapter_power_status)
    121 
    def poll_adapter_presence(self):
        """Poll for an adapter; report an error if none shows up in time.

        Polls once a second for up to 15 seconds and reports the failure
        through self._err() on timeout.

        """
        def _adapter_present():
            return self._get_adapter_info() is not None

        try:
            utils.poll_for_condition(
                    condition=_adapter_present, timeout=15, sleep_interval=1)
        except utils.TimeoutError:
            self._err('No adapter found after polling!')
    130 
    def poll_adapter_power(self, adapter_power_status=True):
        """Poll until adapter power status matches given value.

        Polls once a second, for at most _DEVICE_TIMEOUT_TIME seconds.

        @param adapter_power_status: True for adapter is on; False for off.

        """
        def _power_matches():
            return self._verify_adapter_power(
                    adapter_power_status=adapter_power_status)

        if adapter_power_status:
            wanted_state = 'ON'
        else:
            wanted_state = 'OFF'
        timeout_desc = 'Timeout for Bluetooth Adapter to be %s' % wanted_state
        utils.poll_for_condition(
                condition=_power_matches, timeout=_DEVICE_TIMEOUT_TIME,
                sleep_interval=1, desc=timeout_desc)
    144 
    def _poll_connection(self, addr, paired_status, connected_status):
        """Poll until one device's statuses match the given values.

        Polls once a second, for at most _DEVICE_TIMEOUT_TIME seconds.

        @param addr: address of the device to wait for.
        @param paired_status: True for device paired; False for unpaired.
        @param connected_status: True for device connected; False for not.

        """
        if paired_status:
            paired_str = 'PAIRED'
        else:
            paired_str = 'NOT PAIRED'
        if connected_status:
            conn_str = 'CONNECTED'
        else:
            conn_str = 'NOT CONNECTED'
        message = 'Waiting for device %s to be %s and %s' % (
                addr, paired_str, conn_str)
        logging.info(message)

        def _statuses_match():
            return self._verify_device_connection(
                    addr, paired_status=paired_status,
                    connected_status=connected_status)

        utils.poll_for_condition(
                condition=_statuses_match, timeout=_DEVICE_TIMEOUT_TIME,
                sleep_interval=1, desc=('Timeout while %s' % message))
    159 
    160     def poll_connections(self, paired_status=True, connected_status=True):
    161         """Wait until all Bluetooth devices have the given statues.
    162 
    163         @param paired_status: True for device paired; False for unpaired.
    164         @param connected_status: True for device connected; False for not.
    165 
    166         """
    167         for addr in self._addrs:
    168             self._poll_connection(addr, paired_status=paired_status,
    169                                   connected_status=connected_status)
    170 
    def login_and_open_browser(self):
        """Log in to machine, open browser, and navigate to dialog template.

        Copies 'loop.html' and 'music.mp3' from 'client/cros/audio' into the
        Downloads folder of the mounted user home, hands them to the
        'chronos' user, then opens a Settings tab and a tab for loop.html.
        Fails if either source file is not found.

        The destination paths are stored in self._added_loop_file and
        self._added_music_file before copying, so that cleanup() can remove
        them even if this method fails part way through.

        @raises error.TestError: if no mounted home directory is found.

        """
        # Open browser and interactive tab
        self.login_and_open_interactive_tab()

        # Find mounted home directory.  Every entry is examined, so if
        # several are mount points the last one listed is used.
        home_root = os.path.join('/', 'home', 'user')
        user_home = None
        for udir in os.listdir(home_root):
            candidate = os.path.join(home_root, udir)
            if os.path.ismount(candidate):
                user_home = candidate
        if user_home is None:
            raise error.TestError('Could not find mounted home directory')

        # Setup Audio File
        audio_dir = os.path.join(self.bindir, '..', '..', 'cros', 'audio')
        dl_dir = os.path.join(user_home, 'Downloads')
        self._added_loop_file = os.path.join(dl_dir, 'loop.html')
        self._added_music_file = os.path.join(dl_dir, 'music.mp3')
        # Single passwd lookup supplies both the uid and the gid.
        chronos = pwd.getpwnam('chronos')
        added_files = (('loop.html', self._added_loop_file),
                       ('music.mp3', self._added_music_file))
        for name, dest in added_files:
            shutil.copyfile(os.path.join(audio_dir, name), dest)
            # 0o755 is the octal spelling accepted by Python 2.6+ and 3.
            os.chmod(dest, 0o755)
            os.chown(dest, chronos.pw_uid, chronos.pw_gid)

        # Open Test Dialog tab, Settings tab, and Audio file
        self._settings_tab = self._browser.tabs.New()
        self._settings_tab.Navigate('chrome://settings/search#Bluetooth')
        music_tab = self._browser.tabs.New()
        music_tab.Navigate('file:///home/chronos/user/Downloads/loop.html')
    211 
    def ask_user(self, message):
        """Ask the user a yes or no question in an open tab.

        The dialog page is reset to show the question (message param) with
        'PASS' and 'FAIL' buttons, and the call blocks until an answer
        arrives.  On FAIL the user is asked for more information, which is
        then reported through self._err().

        @param message: string sent to the user via browser interaction.

        @raises error.TestError: if the dialog result is neither 0 nor 1.

        """
        logging.info('Asking user "%s"', message)
        question_html = '<h3>%s</h3>%s' % (message,
                                           'SANDBOX:<input type="text"/>')
        self.set_tab_with_buttons(question_html, buttons=['PASS', 'FAIL'])

        # Interpret results: 1 takes the failure path, 0 is logged as PASS.
        answer = self.wait_for_tab_result(timeout=_USER_TIMEOUT_TIME)
        if answer == 1:
            # Ask for more information on error.
            self.set_tab_with_textbox('<h3>Please provide more info:</h3>')

            # Get explanation of error, clear output, and raise error.
            explanation = self.wait_for_tab_result(timeout=_USER_TIMEOUT_TIME)
            self.clear_output()
            self._err('Testing %s. "%s".' % (self._test_type, explanation))
        elif answer != 0:
            raise error.TestError('Bad dialog value: %s' % answer)
        logging.info('Answer was PASS')

        # Clear user screen.
        self.clear_output()
    243 
    244     def tell_user(self, message):
    245         """Tell the user the given message in an open tab.
    246 
    247         @param message: the text string to be displayed.
    248 
    249         """
    250         logging.info('Telling user "%s"', message)
    251         html = '<h3>%s</h3>' % message
    252         self.set_tab(html)
    253 
    254     def check_working(self, message=None):
    255         """Steps to check that all devices are functioning.
    256 
    257         Ask user to connect all devices, verify connections, and ask for
    258         user input if they are working.
    259 
    260         @param message: string of text the user is asked.  Defaults to asking
    261                         the user to connect all devices.
    262 
    263         """
    264         if not message:
    265             message = ('Please connect all devices.<br>(You may need to '
    266                        'click mice, press keyboard keys, or use the '
    267                        'Connect button in Settings.)')
    268         self.tell_user(message)
    269         self.poll_adapter_power(True)
    270         self.poll_connections(paired_status=True, connected_status=True)
    271         self.ask_user('Are all Bluetooth devices working?<br>'
    272                        'Is audio playing only through Bluetooth devices?<br>'
    273                        'Do onboard keyboard and trackpad work?')
    274 
    275     def ask_not_working(self):
    276         """Ask the user pre-defined message about NOT working."""
    277         self.ask_user('No Bluetooth devices work.<br>Audio is NOT playing '
    278                       'through onboard speakers or wired headphones.')
    279 
    def start_dump(self, message=''):
        """Run btmon in subprocess.

        Kill previous btmon (if needed) and start new one using current
        test type as base filename.  Dumps stored in results folder.

        The new process is kept in self._dump.  Its stdout and stderr both
        go to the dump file.

        @param message: string of text added to top of log entry.

        @raises error.TestError: if btmon cannot be started.

        """
        previous = getattr(self, '_dump', None)
        if previous:
            try:
                previous.kill()
                # Reap the killed process so it does not stay a zombie.
                previous.wait()
            except OSError as e:
                logging.warning('Could not stop previous btmon: %s', e)
        if not hasattr(self, '_test_type'):
            self._test_type = 'test'
        logging.info('Starting btmon')
        filename = '%s_btmon' % self._test_type
        path = os.path.join(self.resultsdir, filename)
        with open(path, 'a') as f:
            f.write('%s\n' % _SECTION_BREAK)
            f.write('%s: Starting btmon\n' % time.strftime(_TIME_FORMAT))
            f.write('%s\n' % message)
            # Flush the header so it lands in the file before btmon output.
            f.flush()
            btmon_path = '/usr/bin/btmon'
            try:
                # stderr shares the dump file: a PIPE that nobody reads can
                # fill up and block the child.
                self._dump = subprocess.Popen([btmon_path], stdout=f,
                                              stderr=subprocess.STDOUT)
            except Exception as e:
                raise error.TestError('btmon: %s' % e)
    307 
    def collect_logs(self, message=''):
        """Store results of dbus GetManagedObjects and hciconfig.

        Use current test type as base filename.  Stored in results folder.
        Each entry is appended below a section break and a timestamped
        header line carrying the message.

        @param message: string of text added to top of log entry.

        @raises error.TestError: if running hciconfig fails.

        """
        logging.info('Collecting dbus info')
        if not hasattr(self, '_test_type'):
            self._test_type = 'test'
        filename = '%s_dbus' % self._test_type
        path = os.path.join(self.resultsdir, filename)
        with open(path, 'a') as f:
            f.write('%s\n' % _SECTION_BREAK)
            f.write('%s: %s\n' % (time.strftime(_TIME_FORMAT), message))
            # list() keeps the output identical on Python 2 and makes the
            # items view serializable on Python 3.
            f.write(json.dumps(list(self._get_objects().items()), indent=2))
            f.write('\n')

        logging.info('Collecting hciconfig info')
        filename = '%s_hciconfig' % self._test_type
        path = os.path.join(self.resultsdir, filename)
        with open(path, 'a') as f:
            f.write('%s\n' % _SECTION_BREAK)
            f.write('%s: %s\n' % (time.strftime(_TIME_FORMAT), message))
            # Flush the header so it lands before the hciconfig output.
            f.flush()
            hciconfig_path = '/usr/bin/hciconfig'
            try:
                subprocess.check_call([hciconfig_path, '-a'], stdout=f)
            except Exception as e:
                raise error.TestError('hciconfig: %s' % e)
    339 
    def os_idle_time_set(self, reset=False):
        """Function to set short idle time or to reset to normal.

        Not using sys_power so that user can use Bluetooth to wake machine.

        @param reset: true to reset to normal idle time, false for short.

        @raises error.TestError: if the command fails.

        """
        command = ['/usr/bin/set_short_powerd_timeouts']
        # Only add the flag when it is wanted, instead of passing an empty
        # string as a spurious extra argument.
        if reset:
            command.append('--reset')
        try:
            subprocess.check_call(command)
        except Exception as e:
            raise error.TestError('idle cmd: %s' % e)
    354 
    def os_suspend(self):
        """Suspend ChromeOS using sys_power, then pause before returning."""
        suspend_arg = 5
        sys_power.do_suspend(suspend_arg)

        # Sleep for a further five seconds once do_suspend() has returned.
        pause_seconds = 5
        time.sleep(pause_seconds)
    361 
    def initialize(self):
        """Connect to the DBus system bus and keep it in self._bus."""
        system_bus = dbus.SystemBus()
        self._bus = system_bus
    364 
    365     def warmup(self, addrs='', test_phase='client', close_browser=True):
    366         """Warmup setting paramters for semi-automated Bluetooth Test.
    367 
    368         Actual test steps are implemened in run_once() function.
    369 
    370         @param: addrs: list of MAC address of Bluetooth devices under test.
    371         @param: test_phase: for use by server side tests to, for example, call
    372                             the same test before and after a reboot.
    373         @param: close_browser: True if client side test should close browser
    374                                at end of test.
    375 
    376         """
    377         self.login_and_open_browser()
    378 
    379         self._addrs = addrs
    380         self._test_type = 'start'
    381         self._test_phase = test_phase
    382         self._will_close_browser = close_browser
    383 
    384     def cleanup(self):
    385         """Cleanup of various files/processes opened during test.
    386 
    387         Closes running btmon, closes browser (if asked to at start), and
    388         deletes files added during test.
    389 
    390         """
    391         if hasattr(self, '_dump'):
    392             self._dump.kill()
    393         if hasattr(self, '_will_close_browser') and self._will_close_browser:
    394             self.close_browser()
    395         if (hasattr(self, '_added_loop_file')
    396                 and os.path.exists(self._added_loop_file)):
    397             os.remove(self._added_loop_file)
    398         if (hasattr(self, '_added_music_file')
    399                 and os.path.exists(self._added_music_file)):
    400             os.remove(self._added_music_file)
    401