1 # Copyright (c) 2014 The Chromium OS Authors. All rights reserved. 2 # Use of this source code is governed by a BSD-style license that can be 3 # found in the LICENSE file. 4 5 """ 6 This module provides bindings for ModemManager1. 7 8 """ 9 10 import dbus 11 import dbus.mainloop.glib 12 13 from autotest_lib.client.bin import utils 14 from autotest_lib.client.cros.cellular import mm1_constants 15 16 17 def _is_unknown_dbus_binding_exception(e): 18 return (isinstance(e, dbus.exceptions.DBusException) and 19 e.get_dbus_name() in [mm1_constants.DBUS_SERVICE_UNKNOWN, 20 mm1_constants.DBUS_UNKNOWN_METHOD, 21 mm1_constants.DBUS_UNKNOWN_OBJECT, 22 mm1_constants.DBUS_UNKNOWN_INTERFACE]) 23 24 25 class ModemManager1ProxyError(Exception): 26 """Exceptions raised by ModemManager1ProxyError and it's children.""" 27 pass 28 29 30 class ModemManager1Proxy(object): 31 """A wrapper around a DBus proxy for ModemManager1.""" 32 33 # Amount of time to wait between attempts to connect to ModemManager1. 34 CONNECT_WAIT_INTERVAL_SECONDS = 0.2 35 36 @classmethod 37 def get_proxy(cls, bus=None, timeout_seconds=10): 38 """Connect to ModemManager1 over DBus, retrying if necessary. 39 40 After connecting to ModemManager1, this method will verify that 41 ModemManager1 is answering RPCs. 42 43 @param bus: D-Bus bus to use, or specify None and this object will 44 create a mainloop and bus. 45 @param timeout_seconds: float number of seconds to try connecting 46 A value <= 0 will cause the method to return immediately, 47 without trying to connect. 48 @return a ModemManager1Proxy instance if we connected, or None 49 otherwise. 50 @raise ModemManager1ProxyError if it fails to connect to 51 ModemManager1. 52 53 """ 54 def _connect_to_mm1(bus): 55 try: 56 # We create instance of class on which this classmethod was 57 # called. This way, calling 58 # SubclassOfModemManager1Proxy.get_proxy() will get a proxy of 59 # the right type. 60 return cls(bus=bus) 61 except dbus.exceptions.DBusException as e: 62 if _is_unknown_dbus_binding_exception(e): 63 return None 64 raise ModemManager1ProxyError( 65 'Error connecting to ModemManager1. DBus error: |%s|', 66 repr(e)) 67 68 utils.poll_for_condition( 69 lambda: _connect_to_mm1(bus) is not None, 70 exception=ModemManager1ProxyError( 71 'Timed out connecting to ModemManager1'), 72 timeout=timeout_seconds, 73 sleep_interval=ModemManager1Proxy.CONNECT_WAIT_INTERVAL_SECONDS) 74 connection = _connect_to_mm1(bus) 75 76 # Check to make sure ModemManager1 is responding to DBus requests by 77 # setting the logging to debug. 78 connection.manager.SetLogging('DEBUG', timeout=timeout_seconds) 79 80 return connection 81 82 83 def __init__(self, bus=None): 84 if bus is None: 85 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) 86 bus = dbus.SystemBus() 87 self._bus = bus 88 self._manager = dbus.Interface( 89 self._bus.get_object(mm1_constants.I_MODEM_MANAGER, 90 mm1_constants.MM1), 91 mm1_constants.I_MODEM_MANAGER) 92 93 94 @property 95 def manager(self): 96 """@return the DBus ModemManager1 Manager object.""" 97 return self._manager 98 99 100 def get_modem(self): 101 """ 102 Return the one and only modem object. 103 104 This method distinguishes between no modem and more than one modem. 105 In the former, this could happen if the modem has not yet surfaced and 106 is not really considered an error. The caller can wait for the modem 107 by repeatedly calling this method. In the latter, it is a clear error 108 condition and an exception will be raised. 109 110 Every call to |get_modem| obtains a fresh DBus proxy for the modem. So, 111 if the modem DBus object has changed between two calls to this method, 112 the proxy returned will be for the currently exported modem. 113 114 @return a ModemProxy object. Return None if no modem is found. 115 @raise ModemManager1ProxyError unless exactly one modem is found. 116 117 """ 118 try: 119 object_manager = dbus.Interface( 120 self._bus.get_object(mm1_constants.I_MODEM_MANAGER, 121 mm1_constants.MM1), 122 mm1_constants.I_OBJECT_MANAGER) 123 modems = object_manager.GetManagedObjects() 124 except dbus.exceptions.DBusException as e: 125 raise ModemManager1ProxyError( 126 'Failed to list the available modems. DBus error: |%s|', 127 repr(e)) 128 129 if not modems: 130 return None 131 elif len(modems) > 1: 132 raise ModemManager1ProxyError( 133 'Expected one modem object, found %d', len(modems)) 134 135 modem_proxy = ModemProxy(self._bus, modems.keys()[0]) 136 # Check that this object is valid 137 try: 138 modem_proxy.modem.GetAll(mm1_constants.I_MODEM, 139 dbus_interface=mm1_constants.I_PROPERTIES) 140 return modem_proxy 141 except dbus.exceptions.DBusException as e: 142 if _is_unknown_dbus_binding_exception(e): 143 return None 144 raise ModemManager1ProxyError( 145 'Failed to obtain dbus object for the modem. DBus error: ' 146 '|%s|', repr(e)) 147 148 149 def wait_for_modem(self, timeout_seconds): 150 """ 151 Wait for the modem to appear. 152 153 @param timeout_seconds: Number of seconds to wait for modem to appear. 154 @return a ModemProxy object. 155 @raise ModemManager1ProxyError if no modem is found within the timeout 156 or if more than one modem is found. NOTE: This method does not 157 wait for a second modem. The exception is raised if there is 158 more than one modem at the time of polling. 159 160 """ 161 return utils.poll_for_condition( 162 self.get_modem, 163 exception=ModemManager1ProxyError('No modem found'), 164 timeout=timeout_seconds) 165 166 167 class ModemProxy(object): 168 """A wrapper around a DBus proxy for ModemManager1 modem object.""" 169 170 # Amount of time to wait for a state transition. 171 STATE_TRANSITION_WAIT_SECONDS = 10 172 173 def __init__(self, bus, path): 174 self._bus = bus 175 self._modem = self._bus.get_object(mm1_constants.I_MODEM_MANAGER, path) 176 177 178 @property 179 def modem(self): 180 """@return the DBus modem object.""" 181 return self._modem 182 183 184 @property 185 def iface_modem(self): 186 """@return org.freedesktop.ModemManager1.Modem DBus interface.""" 187 return dbus.Interface(self._modem, mm1_constants.I_MODEM) 188 189 190 @property 191 def iface_simple_modem(self): 192 """@return org.freedesktop.ModemManager1.Simple DBus interface.""" 193 return dbus.Interface(self._modem, mm1_constants.I_MODEM_SIMPLE) 194 195 196 @property 197 def iface_gsm_modem(self): 198 """@return org.freedesktop.ModemManager1.Modem3gpp DBus interface.""" 199 return dbus.Interface(self._modem, mm1_constants.I_MODEM_3GPP) 200 201 202 @property 203 def iface_cdma_modem(self): 204 """@return org.freedesktop.ModemManager1.ModemCdma DBus interface.""" 205 return dbus.Interface(self._modem, mm1_constants.I_MODEM_CDMA) 206 207 208 @property 209 def iface_properties(self): 210 """@return org.freedesktop.DBus.Properties DBus interface.""" 211 return dbus.Interface(self._modem, dbus.PROPERTIES_IFACE) 212 213 214 def properties(self, iface): 215 """Return the properties associated with the specified interface. 216 217 @param iface: Name of interface to retrieve the properties from. 218 @return array of properties. 219 220 """ 221 return self.iface_properties.GetAll(iface) 222 223 224 def get_sim(self): 225 """ 226 Return the SIM proxy object associated with this modem. 227 228 @return SimProxy object or None if no SIM exists. 229 230 """ 231 sim_path = self.properties(mm1_constants.I_MODEM).get('Sim') 232 if not sim_path: 233 return None 234 sim_proxy = SimProxy(self._bus, sim_path) 235 # Check that this object is valid 236 try: 237 sim_proxy.properties(mm1_constants.I_SIM) 238 return sim_proxy 239 except dbus.exceptions.DBusException as e: 240 if _is_unknown_dbus_binding_exception(e): 241 return None 242 raise ModemManager1ProxyError( 243 'Failed to obtain dbus object for the SIM. DBus error: ' 244 '|%s|', repr(e)) 245 246 247 def wait_for_states(self, states, 248 timeout_seconds=STATE_TRANSITION_WAIT_SECONDS): 249 """ 250 Wait for the modem to transition to a state in |states|. 251 252 This method does not support transitory states (eg. enabling, 253 disabling, connecting, disconnecting, etc). 254 255 @param states: List of states the modem can transition to. 256 @param timeout_seconds: Max number of seconds to wait. 257 @raise ModemManager1ProxyError if the modem does not transition to 258 one of the accepted states. 259 260 """ 261 for state in states: 262 if state in [mm1_constants.MM_MODEM_STATE_INITIALIZING, 263 mm1_constants.MM_MODEM_STATE_DISABLING, 264 mm1_constants.MM_MODEM_STATE_ENABLING, 265 mm1_constants.MM_MODEM_STATE_SEARCHING, 266 mm1_constants.MM_MODEM_STATE_DISCONNECTING, 267 mm1_constants.MM_MODEM_STATE_CONNECTING]: 268 raise ModemManager1ProxyError( 269 'wait_for_states() does not support transitory states.') 270 271 utils.poll_for_condition( 272 lambda: self.properties(mm1_constants.I_MODEM)[ 273 mm1_constants.MM_MODEM_PROPERTY_NAME_STATE] in states, 274 exception=ModemManager1ProxyError( 275 'Timed out waiting for modem to enter one of these ' 276 'states: %s, current state=%s', 277 states, 278 self.properties(mm1_constants.I_MODEM)[ 279 mm1_constants.MM_MODEM_PROPERTY_NAME_STATE]), 280 timeout=timeout_seconds) 281 282 283 class SimProxy(object): 284 """A wrapper around a DBus proxy for ModemManager1 SIM object.""" 285 286 def __init__(self, bus, path): 287 self._bus = bus 288 self._sim = self._bus.get_object(mm1_constants.I_MODEM_MANAGER, path) 289 290 291 @property 292 def sim(self): 293 """@return the DBus SIM object.""" 294 return self._sim 295 296 297 @property 298 def iface_properties(self): 299 """@return org.freedesktop.DBus.Properties DBus interface.""" 300 return dbus.Interface(self._sim, dbus.PROPERTIES_IFACE) 301 302 303 @property 304 def iface_sim(self): 305 """@return org.freedesktop.ModemManager1.Sim DBus interface.""" 306 return dbus.Interface(self._sim, mm1_constants.I_SIM) 307 308 309 def properties(self, iface=mm1_constants.I_SIM): 310 """Return the properties associated with the specified interface. 311 312 @param iface: Name of interface to retrieve the properties from. 313 @return array of properties. 314 315 """ 316 return self.iface_properties.GetAll(iface) 317