Home | History | Annotate | Download | only in peerd_MonitorsDBusConnections
      1 # Copyright 2015 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import dbus
      6 import time
      7 
      8 from autotest_lib.client.bin import test
      9 from autotest_lib.client.common_lib import error
     10 from autotest_lib.client.common_lib.cros.tendo import peerd_config
     11 from autotest_lib.client.cros import dbus_util
     12 
     13 
     14 SERVICE_ID = 'test-service'
     15 
     16 class peerd_MonitorsDBusConnections(test.test):
     17     """Test that peerd removes services when processes disconnect from DBus."""
     18 
     19     version = 1
     20 
     21 
     22     def _check_has_test_service(self, expect_service=True):
     23         services = dbus_util.get_objects_with_interface(
     24                 peerd_config.SERVICE_NAME,
     25                 peerd_config.OBJECT_MANAGER_PATH,
     26                 peerd_config.DBUS_INTERFACE_SERVICE,
     27                 path_prefix=peerd_config.DBUS_PATH_SELF,
     28                 bus=self._bus)
     29         found_service = False
     30         # services is a map of object path to dicts of DBus interface to
     31         # properties exposed by that interface.
     32         for path, interfaces in services.iteritems():
     33             for interface, properties in interfaces.iteritems():
     34                 if interface != peerd_config.DBUS_INTERFACE_SERVICE:
     35                     continue
     36                 if (properties[peerd_config.SERVICE_PROPERTY_SERVICE_ID]
     37                         != SERVICE_ID):
     38                     continue
     39                 if found_service:
     40                     raise error.TestFail('Found multiple test service '
     41                                          'instances?')
     42                 found_service = True
     43 
     44         if expect_service != found_service:
     45             raise error.TestFail('Expected to see test service, but did not.')
     46 
     47 
     48     def run_once(self):
     49         self._bus = dbus.SystemBus()
     50         config = peerd_config.PeerdConfig(verbosity_level=5)
     51         config.restart_with_config()
     52         self._check_has_test_service(expect_service=False)
     53         self._manager = dbus.Interface(
     54                 self._bus.get_object(peerd_config.SERVICE_NAME,
     55                                      peerd_config.DBUS_PATH_MANAGER),
     56                 peerd_config.DBUS_INTERFACE_MANAGER)
     57         self._manager.ExposeService(SERVICE_ID,
     58                                     dbus.Dictionary(signature='ss'),
     59                                     dbus.Dictionary(signature='sv'))
     60         # Python keeps the DBus connection sitting around unless we
     61         # explicitly close it.  The service should still be there.
     62         time.sleep(1)  # Peerd might take some time to publish the service.
     63         self._check_has_test_service()
     64         # Close our previous connection, open a new one.
     65         self._bus.close()
     66         self._bus = dbus.SystemBus()
     67         time.sleep(1)  # Peerd might take some time to remove the service.
     68         self._check_has_test_service(expect_service=False)
     69