# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import dbus
import logging
import time

from autotest_lib.client.cros.networking import shill_proxy


class CellularProxy(shill_proxy.ShillProxy):
    """Wrapper around shill dbus interface used by cellular tests."""

    # Properties exposed by shill.
    DEVICE_PROPERTY_DBUS_OBJECT = 'DBus.Object'
    DEVICE_PROPERTY_MODEL_ID = 'Cellular.ModelID'
    DEVICE_PROPERTY_OUT_OF_CREDITS = 'Cellular.OutOfCredits'
    DEVICE_PROPERTY_SIM_LOCK_STATUS = 'Cellular.SIMLockStatus'
    DEVICE_PROPERTY_SIM_PRESENT = 'Cellular.SIMPresent'
    DEVICE_PROPERTY_TECHNOLOGY_FAMILY = 'Cellular.Family'
    DEVICE_PROPERTY_TECHNOLOGY_FAMILY_CDMA = 'CDMA'
    DEVICE_PROPERTY_TECHNOLOGY_FAMILY_GSM = 'GSM'
    SERVICE_PROPERTY_LAST_GOOD_APN = 'Cellular.LastGoodAPN'

    # APN info property names.
    APN_INFO_PROPERTY_APN = 'apn'

    # Keys into the dictionaries exposed as properties.
    PROPERTY_KEY_SIM_LOCK_TYPE = 'LockType'
    PROPERTY_KEY_SIM_LOCK_ENABLED = 'LockEnabled'
    PROPERTY_KEY_SIM_LOCK_RETRIES_LEFT = 'RetriesLeft'

    # Valid values taken by properties exposed by shill.
    VALUE_SIM_LOCK_TYPE_PIN = 'sim-pin'
    VALUE_SIM_LOCK_TYPE_PUK = 'sim-puk'

    # Various timeouts in seconds.
    SERVICE_CONNECT_TIMEOUT = 60
    SERVICE_DISCONNECT_TIMEOUT = 60
    SERVICE_REGISTRATION_TIMEOUT = 60
    # Pause between two consecutive evaluations in |_poll_for_condition|.
    SLEEP_INTERVAL = 0.1

    def set_logging_for_cellular_test(self):
        """Set the logging in shill for a test of cellular technology.

        Set the log level to |ShillProxy.LOG_LEVEL_FOR_TEST| and the log scopes
        to the ones defined in |ShillProxy.LOG_SCOPES_FOR_TEST| for
        |ShillProxy.TECHNOLOGY_CELLULAR|.

        """
        self.set_logging_for_test(self.TECHNOLOGY_CELLULAR)


    def find_cellular_service_object(self):
        """Returns the first dbus object found that is a cellular service.

        @return DBus object for the first cellular service found. None if no
                service found.

        """
        return self.find_object('Service', {'Type': self.TECHNOLOGY_CELLULAR})


    def wait_for_cellular_service_object(
            self, timeout_seconds=SERVICE_REGISTRATION_TIMEOUT):
        """Waits for the cellular service object to show up.

        @param timeout_seconds: Amount of time to wait for cellular service.
        @return DBus object for the first cellular service found.
        @raises ShillProxyError if no cellular service is found within the
                registration timeout period.

        """
        # Return the very object the poll observed instead of looking it up a
        # second time: the service could vanish between the two lookups, which
        # would make this method return None despite its contract. This relies
        # on DBus objects being truthy, as the modem-reappearance poll in
        # |reset_modem| already does.
        return CellularProxy._poll_for_condition(
                self.find_cellular_service_object,
                'Failed to find cellular service object',
                timeout=timeout_seconds)


    def find_cellular_device_object(self):
        """Returns the first dbus object found that is a cellular device.

        @return DBus object for the first cellular device found. None if no
                device found.

        """
        return self.find_object('Device', {'Type': self.TECHNOLOGY_CELLULAR})


    def reset_modem(self, modem, expect_device=True, expect_powered=True,
                    expect_service=True):
        """Reset |modem|.

        Do, in sequence,
        (1) Ensure that the current device object disappears.
        (2) If |expect_device|, ensure that the device reappears.
        (3) If |expect_powered|, ensure that the device is powered.
        (4) If |expect_service|, ensure that the service reappears.

        This function does not check the service state for the device after
        reset.

        @param modem: DBus object for the modem to reset.
        @param expect_device: If True, ensure that a DBus object reappears for
                the same modem after the reset.
        @param expect_powered: If True, ensure that the modem is powered on
                after the reset.
        @param expect_service: If True, ensure that a service managing the
                reappeared modem also reappears.

        @return (device, service)
                device: DBus object for the reappeared Device after the reset.
                service: DBus object for the reappeared Service after the reset.
                Either of these may be None, if the object is not expected to
                reappear.

        @raises ShillProxyError if any of the conditions (1)-(4) fail.

        """
        logging.info('Resetting modem')
        # Obtain identifying information about the modem.
        properties = modem.GetProperties(utf8_strings=True)
        # NOTE: Using the Model ID means that this will break if we have two
        # identical cellular modems in a DUT. Fortunately, we only support one
        # modem at a time.
        model_id = properties.get(self.DEVICE_PROPERTY_MODEL_ID)
        if not model_id:
            raise shill_proxy.ShillProxyError(
                    'Failed to get identifying information for the modem.')
        old_modem_path = modem.object_path
        old_modem_mm_object = properties.get(self.DEVICE_PROPERTY_DBUS_OBJECT)
        if not old_modem_mm_object:
            raise shill_proxy.ShillProxyError(
                    'Failed to get the mm object path for the modem.')

        modem.Reset()

        # (1) Wait for the old modem to disappear
        CellularProxy._poll_for_condition(
                lambda: self._is_old_modem_gone(old_modem_path,
                                                old_modem_mm_object),
                'Old modem disappeared',
                timeout=60)

        # (2) Wait for the device to reappear
        if not expect_device:
            return None, None
        # The timeout here should be sufficient for our slowest modem to
        # reappear.
        # Keep the object the poll returned rather than querying again, so the
        # device we hand back is exactly the one that satisfied the condition.
        new_modem = CellularProxy._poll_for_condition(
                lambda: self._get_reappeared_modem(model_id,
                                                   old_modem_mm_object),
                desc='The modem reappeared after reset.',
                timeout=60)

        # (3) Check powered state of the device
        if not expect_powered:
            return new_modem, None
        success, _, _ = self.wait_for_property_in(new_modem,
                                                  self.DEVICE_PROPERTY_POWERED,
                                                  [self.VALUE_POWERED_ON],
                                                  timeout_seconds=10)
        if not success:
            raise shill_proxy.ShillProxyError(
                    'After modem reset, new modem failed to enter powered '
                    'state.')

        # (4) Check that service reappears
        if not expect_service:
            return new_modem, None
        new_service = self.get_service_for_device(new_modem)
        if not new_service:
            raise shill_proxy.ShillProxyError(
                    'Failed to find a shill service managing the reappeared '
                    'device.')
        return new_modem, new_service


    def disable_modem_for_test_setup(self, timeout_seconds=10):
        """
        Disables all cellular modems.

        Use this method only for setting up tests.  Do not use this method to
        test disable functionality because this method repeatedly attempts to
        disable the cellular technology until it succeeds (ignoring all DBus
        errors) since the DisableTechnology() call may fail for various reasons
        (eg. an enable is in progress).

        @param timeout_seconds: Amount of time to wait until the modem is
                disabled. The same budget is applied first to the
                DisableTechnology() retries and then to the wait for the
                powered-off state.
        @raises ShillProxyError if the modems fail to disable within
                |timeout_seconds|.

        """
        def _disable_cellular_technology():
            """Attempt one DisableTechnology() call; True iff it succeeded."""
            try:
                self._manager.DisableTechnology(self.TECHNOLOGY_CELLULAR)
            except dbus.DBusException:
                # Deliberately swallowed: the caller retries until timeout.
                return False
            return True

        CellularProxy._poll_for_condition(
                _disable_cellular_technology,
                'Failed to disable cellular technology.',
                timeout=timeout_seconds)
        modem = self.find_cellular_device_object()
        # The first element of the result is the success flag (see the same
        # call in |reset_modem|). It used to be discarded, so a modem that
        # never powered off went unnoticed despite the documented @raises.
        success, _, _ = self.wait_for_property_in(
                modem, self.DEVICE_PROPERTY_POWERED,
                [self.VALUE_POWERED_OFF],
                timeout_seconds=timeout_seconds)
        if not success:
            raise shill_proxy.ShillProxyError(
                    'Cellular modem failed to power off within %s seconds.' %
                    timeout_seconds)


    # TODO(pprabhu) Use utils.poll_for_condition instead
    @staticmethod
    def _poll_for_condition(condition, desc, timeout=10):
        """Poll till |condition| is satisfied.

        |condition| is always evaluated at least once, then re-evaluated every
        |SLEEP_INTERVAL| seconds.

        @param condition: A function taking no arguments. The condition is
                met when the return value can be cast to the bool True.
        @param desc: The description given when we timeout waiting for
                |condition|.
        @param timeout: Maximum time, in seconds, to keep polling.

        @return The (truthy) value returned by |condition|.
        @raises ShillProxyError if |condition| is not met within |timeout|.

        """
        start_time = time.time()
        while True:
            value = condition()
            if value:
                return value
            # Give up now if the next sleep would carry us past the deadline.
            if (time.time() + CellularProxy.SLEEP_INTERVAL - start_time >
                    timeout):
                raise shill_proxy.ShillProxyError(
                        'Timed out waiting for condition %s.' % desc)
            time.sleep(CellularProxy.SLEEP_INTERVAL)


    def _is_old_modem_gone(self, modem_path, modem_mm_object):
        """Tests if the DBus object for modem disappears after Reset.

        @param modem_path: The DBus path for the modem object that must vanish.
        @param modem_mm_object: The modemmanager object path reported by the
                old modem. This is unique every time a new modem is
                (re)exposed.

        @return True if the object disappeared, false otherwise.

        """
        device = self.get_dbus_object(self.DBUS_TYPE_DEVICE, modem_path)
        try:
            properties = device.GetProperties()
            # DBus object exists, perhaps a reappeared device?
            return (properties.get(self.DEVICE_PROPERTY_DBUS_OBJECT) !=
                    modem_mm_object)
        except dbus.DBusException as e:
            if e.get_dbus_name() == self.DBUS_ERROR_UNKNOWN_OBJECT:
                return True
            # Any other DBus failure is inconclusive; report "not gone" so
            # that the caller keeps polling.
            return False


    def _get_reappeared_modem(self, model_id, old_modem_mm_object):
        """Check that a vanished modem reappears.

        @param model_id: The model ID reported by the vanished modem.
        @param old_modem_mm_object: The previously reported modemmanager object
                path for this modem.

        @return The reappeared DBus object, if any. None otherwise.

        """
        # TODO(pprabhu) This will break if we have multiple cellular devices
        # in the system at the same time.
        device = self.find_cellular_device_object()
        if not device:
            return None
        properties = device.GetProperties(utf8_strings=True)
        # Same model, but a different modemmanager object path, means the
        # modem has been re-exposed since the reset.
        if (model_id == properties.get(self.DEVICE_PROPERTY_MODEL_ID) and
            (old_modem_mm_object !=
             properties.get(self.DEVICE_PROPERTY_DBUS_OBJECT))):
            return device
        return None