Home | History | Annotate | Download | only in networking
      1 # Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """
      6 This module provides bindings for ModemManager1.
      7 
      8 """
      9 
     10 import dbus
     11 import dbus.mainloop.glib
     12 
     13 from autotest_lib.client.bin import utils
     14 from autotest_lib.client.cros.cellular import mm1_constants
     15 
     16 
     17 def _is_unknown_dbus_binding_exception(e):
     18     return (isinstance(e, dbus.exceptions.DBusException) and
     19             e.get_dbus_name() in [mm1_constants.DBUS_SERVICE_UNKNOWN,
     20                                   mm1_constants.DBUS_UNKNOWN_METHOD,
     21                                   mm1_constants.DBUS_UNKNOWN_OBJECT,
     22                                   mm1_constants.DBUS_UNKNOWN_INTERFACE])
     23 
     24 
     25 class ModemManager1ProxyError(Exception):
     26     """Exceptions raised by ModemManager1ProxyError and it's children."""
     27     pass
     28 
     29 
     30 class ModemManager1Proxy(object):
     31     """A wrapper around a DBus proxy for ModemManager1."""
     32 
     33     # Amount of time to wait between attempts to connect to ModemManager1.
     34     CONNECT_WAIT_INTERVAL_SECONDS = 0.2
     35 
     36     @classmethod
     37     def get_proxy(cls, bus=None, timeout_seconds=10):
     38         """Connect to ModemManager1 over DBus, retrying if necessary.
     39 
     40         After connecting to ModemManager1, this method will verify that
     41         ModemManager1 is answering RPCs.
     42 
     43         @param bus: D-Bus bus to use, or specify None and this object will
     44             create a mainloop and bus.
     45         @param timeout_seconds: float number of seconds to try connecting
     46             A value <= 0 will cause the method to return immediately,
     47             without trying to connect.
     48         @return a ModemManager1Proxy instance if we connected, or None
     49             otherwise.
     50         @raise ModemManager1ProxyError if it fails to connect to
     51             ModemManager1.
     52 
     53         """
     54         def _connect_to_mm1(bus):
     55             try:
     56                 # We create instance of class on which this classmethod was
     57                 # called. This way, calling
     58                 # SubclassOfModemManager1Proxy.get_proxy() will get a proxy of
     59                 # the right type.
     60                 return cls(bus=bus)
     61             except dbus.exceptions.DBusException as e:
     62                 if _is_unknown_dbus_binding_exception(e):
     63                     return None
     64                 raise ModemManager1ProxyError(
     65                         'Error connecting to ModemManager1. DBus error: |%s|',
     66                         repr(e))
     67 
     68         utils.poll_for_condition(
     69             lambda: _connect_to_mm1(bus) is not None,
     70             exception=ModemManager1ProxyError(
     71                     'Timed out connecting to ModemManager1'),
     72             timeout=timeout_seconds,
     73             sleep_interval=ModemManager1Proxy.CONNECT_WAIT_INTERVAL_SECONDS)
     74         connection = _connect_to_mm1(bus)
     75 
     76         # Check to make sure ModemManager1 is responding to DBus requests by
     77         # setting the logging to debug.
     78         connection.manager.SetLogging('DEBUG', timeout=timeout_seconds)
     79 
     80         return connection
     81 
     82 
     83     def __init__(self, bus=None):
     84         if bus is None:
     85             dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
     86             bus = dbus.SystemBus()
     87         self._bus = bus
     88         self._manager = dbus.Interface(
     89                 self._bus.get_object(mm1_constants.I_MODEM_MANAGER,
     90                                      mm1_constants.MM1),
     91                 mm1_constants.I_MODEM_MANAGER)
     92 
     93 
     94     @property
     95     def manager(self):
     96         """@return the DBus ModemManager1 Manager object."""
     97         return self._manager
     98 
     99 
    100     def get_modem(self):
    101         """
    102         Return the one and only modem object.
    103 
    104         This method distinguishes between no modem and more than one modem.
    105         In the former, this could happen if the modem has not yet surfaced and
    106         is not really considered an error. The caller can wait for the modem
    107         by repeatedly calling this method. In the latter, it is a clear error
    108         condition and an exception will be raised.
    109 
    110         Every call to |get_modem| obtains a fresh DBus proxy for the modem. So,
    111         if the modem DBus object has changed between two calls to this method,
    112         the proxy returned will be for the currently exported modem.
    113 
    114         @return a ModemProxy object.  Return None if no modem is found.
    115         @raise ModemManager1ProxyError unless exactly one modem is found.
    116 
    117         """
    118         try:
    119             object_manager = dbus.Interface(
    120                 self._bus.get_object(mm1_constants.I_MODEM_MANAGER,
    121                                      mm1_constants.MM1),
    122                 mm1_constants.I_OBJECT_MANAGER)
    123             modems = object_manager.GetManagedObjects()
    124         except dbus.exceptions.DBusException as e:
    125             raise ModemManager1ProxyError(
    126                     'Failed to list the available modems. DBus error: |%s|',
    127                     repr(e))
    128 
    129         if not modems:
    130             return None
    131         elif len(modems) > 1:
    132             raise ModemManager1ProxyError(
    133                     'Expected one modem object, found %d', len(modems))
    134 
    135         modem_proxy = ModemProxy(self._bus, modems.keys()[0])
    136         # Check that this object is valid
    137         try:
    138             modem_proxy.modem.GetAll(mm1_constants.I_MODEM,
    139                                      dbus_interface=mm1_constants.I_PROPERTIES)
    140             return modem_proxy
    141         except dbus.exceptions.DBusException as e:
    142             if _is_unknown_dbus_binding_exception(e):
    143                 return None
    144             raise ModemManager1ProxyError(
    145                     'Failed to obtain dbus object for the modem. DBus error: '
    146                     '|%s|', repr(e))
    147 
    148 
    149     def wait_for_modem(self, timeout_seconds):
    150         """
    151         Wait for the modem to appear.
    152 
    153         @param timeout_seconds: Number of seconds to wait for modem to appear.
    154         @return a ModemProxy object.
    155         @raise ModemManager1ProxyError if no modem is found within the timeout
    156                 or if more than one modem is found. NOTE: This method does not
    157                 wait for a second modem. The exception is raised if there is
    158                 more than one modem at the time of polling.
    159 
    160         """
    161         return utils.poll_for_condition(
    162                 self.get_modem,
    163                 exception=ModemManager1ProxyError('No modem found'),
    164                 timeout=timeout_seconds)
    165 
    166 
    167 class ModemProxy(object):
    168     """A wrapper around a DBus proxy for ModemManager1 modem object."""
    169 
    170     # Amount of time to wait for a state transition.
    171     STATE_TRANSITION_WAIT_SECONDS = 10
    172 
    173     def __init__(self, bus, path):
    174         self._bus = bus
    175         self._modem = self._bus.get_object(mm1_constants.I_MODEM_MANAGER, path)
    176 
    177 
    178     @property
    179     def modem(self):
    180         """@return the DBus modem object."""
    181         return self._modem
    182 
    183 
    184     @property
    185     def iface_modem(self):
    186         """@return org.freedesktop.ModemManager1.Modem DBus interface."""
    187         return dbus.Interface(self._modem, mm1_constants.I_MODEM)
    188 
    189 
    190     @property
    191     def iface_simple_modem(self):
    192         """@return org.freedesktop.ModemManager1.Simple DBus interface."""
    193         return dbus.Interface(self._modem, mm1_constants.I_MODEM_SIMPLE)
    194 
    195 
    196     @property
    197     def iface_gsm_modem(self):
    198         """@return org.freedesktop.ModemManager1.Modem3gpp DBus interface."""
    199         return dbus.Interface(self._modem, mm1_constants.I_MODEM_3GPP)
    200 
    201 
    202     @property
    203     def iface_cdma_modem(self):
    204         """@return org.freedesktop.ModemManager1.ModemCdma DBus interface."""
    205         return dbus.Interface(self._modem, mm1_constants.I_MODEM_CDMA)
    206 
    207 
    208     @property
    209     def iface_properties(self):
    210         """@return org.freedesktop.DBus.Properties DBus interface."""
    211         return dbus.Interface(self._modem, dbus.PROPERTIES_IFACE)
    212 
    213 
    214     def properties(self, iface):
    215         """Return the properties associated with the specified interface.
    216 
    217         @param iface: Name of interface to retrieve the properties from.
    218         @return array of properties.
    219 
    220         """
    221         return self.iface_properties.GetAll(iface)
    222 
    223 
    224     def get_sim(self):
    225         """
    226         Return the SIM proxy object associated with this modem.
    227 
    228         @return SimProxy object or None if no SIM exists.
    229 
    230         """
    231         sim_path = self.properties(mm1_constants.I_MODEM).get('Sim')
    232         if not sim_path:
    233             return None
    234         sim_proxy = SimProxy(self._bus, sim_path)
    235         # Check that this object is valid
    236         try:
    237             sim_proxy.properties(mm1_constants.I_SIM)
    238             return sim_proxy
    239         except dbus.exceptions.DBusException as e:
    240             if _is_unknown_dbus_binding_exception(e):
    241                 return None
    242             raise ModemManager1ProxyError(
    243                     'Failed to obtain dbus object for the SIM. DBus error: '
    244                     '|%s|', repr(e))
    245 
    246 
    247     def wait_for_states(self, states,
    248                         timeout_seconds=STATE_TRANSITION_WAIT_SECONDS):
    249         """
    250         Wait for the modem to transition to a state in |states|.
    251 
    252         This method does not support transitory states (eg. enabling,
    253         disabling, connecting, disconnecting, etc).
    254 
    255         @param states: List of states the modem can transition to.
    256         @param timeout_seconds: Max number of seconds to wait.
    257         @raise ModemManager1ProxyError if the modem does not transition to
    258             one of the accepted states.
    259 
    260         """
    261         for state in states:
    262             if state in [mm1_constants.MM_MODEM_STATE_INITIALIZING,
    263                          mm1_constants.MM_MODEM_STATE_DISABLING,
    264                          mm1_constants.MM_MODEM_STATE_ENABLING,
    265                          mm1_constants.MM_MODEM_STATE_SEARCHING,
    266                          mm1_constants.MM_MODEM_STATE_DISCONNECTING,
    267                          mm1_constants.MM_MODEM_STATE_CONNECTING]:
    268                 raise ModemManager1ProxyError(
    269                         'wait_for_states() does not support transitory states.')
    270 
    271         utils.poll_for_condition(
    272                 lambda: self.properties(mm1_constants.I_MODEM)[
    273                         mm1_constants.MM_MODEM_PROPERTY_NAME_STATE] in states,
    274                 exception=ModemManager1ProxyError(
    275                         'Timed out waiting for modem to enter one of these '
    276                         'states: %s, current state=%s',
    277                         states,
    278                         self.properties(mm1_constants.I_MODEM)[
    279                                 mm1_constants.MM_MODEM_PROPERTY_NAME_STATE]),
    280                 timeout=timeout_seconds)
    281 
    282 
    283 class SimProxy(object):
    284     """A wrapper around a DBus proxy for ModemManager1 SIM object."""
    285 
    286     def __init__(self, bus, path):
    287         self._bus = bus
    288         self._sim = self._bus.get_object(mm1_constants.I_MODEM_MANAGER, path)
    289 
    290 
    291     @property
    292     def sim(self):
    293         """@return the DBus SIM object."""
    294         return self._sim
    295 
    296 
    297     @property
    298     def iface_properties(self):
    299         """@return org.freedesktop.DBus.Properties DBus interface."""
    300         return dbus.Interface(self._sim, dbus.PROPERTIES_IFACE)
    301 
    302 
    303     @property
    304     def iface_sim(self):
    305         """@return org.freedesktop.ModemManager1.Sim DBus interface."""
    306         return dbus.Interface(self._sim, mm1_constants.I_SIM)
    307 
    308 
    309     def properties(self, iface=mm1_constants.I_SIM):
    310         """Return the properties associated with the specified interface.
    311 
    312         @param iface: Name of interface to retrieve the properties from.
    313         @return array of properties.
    314 
    315         """
    316         return self.iface_properties.GetAll(iface)
    317