1 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 2 # Use of this source code is governed by a BSD-style license that can be 3 # found in the LICENSE file. 4 5 import gobject 6 import logging 7 import time 8 9 from autotest_lib.client.bin import test 10 from autotest_lib.client.common_lib import error 11 from autotest_lib.client.cros.cellular import modem_utils 12 from autotest_lib.client.cros.mainloop import GenericTesterMainLoop 13 from autotest_lib.client.cros.mainloop import ExceptionForward 14 15 DEFAULT_TEST_TIMEOUT_S = 600 16 17 18 class DisableTester(GenericTesterMainLoop): 19 """Base class containing main test logic.""" 20 def __init__(self, *args, **kwargs): 21 super(DisableTester, self).__init__(*args, **kwargs) 22 23 @ExceptionForward 24 def perform_one_test(self): 25 """Called by GenericMainTesterMainLoop to execute the test.""" 26 self._configure() 27 disable_delay_ms = ( 28 self.test_kwargs.get('delay_before_disable_ms', 0) + 29 self.test.iteration * 30 self.test_kwargs.get('disable_delay_per_iteration_ms', 0)) 31 gobject.timeout_add(disable_delay_ms, self._start_disable) 32 self._start_test() 33 34 @ExceptionForward 35 def _connect_success_handler(self, *ignored_args): 36 logging.info('connect succeeded') 37 self.requirement_completed('connect') 38 39 @ExceptionForward 40 def _connect_error_handler(self, e): 41 # We disabled while connecting; error is OK 42 logging.info('connect errored: %s', e) 43 self.requirement_completed('connect') 44 45 @ExceptionForward 46 def _start_disable(self): 47 logging.info('disabling') 48 self.disable_start = time.time() 49 self._enable(False) 50 51 @ExceptionForward 52 def _disable_success_handler(self): 53 disable_elapsed = time.time() - self.disable_start 54 self.requirement_completed('disable') 55 56 @ExceptionForward 57 def _get_status_success_handler(self, status): 58 logging.info('Got status') 59 self.requirement_completed('get_status', warn_if_already_completed=False) 60 if self.status_delay_ms: 61 gobject.timeout_add(self.status_delay_ms, self._start_get_status) 62 63 def after_main_loop(self): 64 """Called by GenericTesterMainLoop after the main loop has exited.""" 65 enabled = self._enabled() 66 logging.info('Modem enabled: %s', enabled) 67 # Will return happily if no Gobi present 68 modem_utils.ClearGobiModemFaultInjection() 69 70 71 class ShillDisableTester(DisableTester): 72 """Tests that disable-while-connecting works at the shill level. 73 Expected control flow: 74 75 * self._configure() called; registers self._disable_property_changed 76 to be called when device is en/disabled 77 78 * Parent class sets a timer that calls self._enable(False) when it expires. 79 80 * _start_test calls _start_connect() which sends a connect request to 81 the device. 82 83 * we wait for the modem to power off, at which point 84 _disable_property_changed (registered above) will get called 85 86 * _disable_property_changed() completes the 'disable' requirement, 87 and we're done. 88 89 """ 90 def __init__(self, *args, **kwargs): 91 super(ShillDisableTester, self).__init__(*args, **kwargs) 92 93 def _disable_property_changed(self, property, value, *args, **kwargs): 94 self._disable_success_handler() 95 96 def _start_test(self): 97 # We would love to add requirements based on connect, but in many 98 # scenarios, there is no observable response to a cancelled 99 # connect: We issue a connect, it returns instantly to let us know 100 # that the connect has started, but then the disable takes effect 101 # and the connect fails. We don't get a state change because no 102 # state change has happened: the modem never got to a different 103 # state before we cancelled 104 self.remaining_requirements = set(['disable']) 105 self._start_connect() 106 107 def _configure(self): 108 self.cellular_device = \ 109 self.test.test_env.shill.find_cellular_device_object() 110 if self.cellular_device is None: 111 raise error.TestError("Could not find cellular device") 112 113 self.cellular_service = \ 114 self.test.test_env.shill.find_cellular_service_object() 115 116 self.test.test_env.bus.add_signal_receiver( 117 self.dispatch_property_changed, 118 signal_name='PropertyChanged', 119 dbus_interface=self.cellular_device.dbus_interface, 120 path=self.cellular_device.object_path) 121 122 @ExceptionForward 123 def _expect_einprogress_handler(self, e): 124 pass 125 126 def _enable(self, value): 127 self.property_changed_actions['Powered'] = self._disable_property_changed 128 129 if value: 130 self.cellular_device.Enable( 131 reply_handler=self.ignore_handler, 132 error_handler=self._expect_einprogress_handler) 133 else: 134 self.cellular_device.Disable( 135 reply_handler=self.ignore_handler, 136 error_handler=self._expect_einprogress_handler) 137 138 @ExceptionForward 139 def _start_connect(self): 140 logging.info('connecting') 141 142 def _log_connect_event(property, value, *ignored_args): 143 logging.info('%s property changed: %s', property, value) 144 145 self.property_changed_actions['Connected'] = _log_connect_event 146 147 # Contrary to documentation, Connect just returns when it has 148 # fired off the lower-level dbus messages. So a success means 149 # nothing to us. But a failure means it didn't even try. 150 self.cellular_service.Connect( 151 reply_handler=self.ignore_handler, 152 error_handler=self.build_error_handler('Connect')) 153 154 def _enabled(self): 155 return self.cellular_device.GetProperties()['Powered'] 156 157 158 class ModemDisableTester(DisableTester): 159 """Tests that disable-while-connecting works at the modem-manager level. 160 161 Expected control flow: 162 163 * _configure() is called. 164 165 * Parent class sets a timer that calls self._enable(False) when it 166 expires. 167 168 * _start_test calls _start_connect() which sends a connect request to 169 the device, also sets a timer that calls GetStatus on the modem. 170 171 * wait for all three (connect, disable, get_status) to complete. 172 173 """ 174 def __init__(self, *args, **kwargs): 175 super(ModemDisableTester, self).__init__(*args, **kwargs) 176 177 def _is_gobi(self): 178 return 'Gobi' in self.test.test_env.modem.path 179 180 def _start_test(self): 181 self.remaining_requirements = set(['connect', 'disable']) 182 183 # Only cromo/gobi-cromo-plugin maintain the invariant that GetStatus 184 # will always succeed, so we only check it if the modem is a Gobi. 185 if self._is_gobi(): 186 self.remaining_requirements.add('get_status') 187 self.status_delay_ms = self.test_kwargs.get('status_delay_ms', 200) 188 gobject.timeout_add(self.status_delay_ms, self._start_get_status) 189 190 self._start_connect() 191 192 def _configure(self): 193 self.simple_modem = self.test.test_env.modem.SimpleModem() 194 195 logging.info('Modem path: %s', self.test.test_env.modem.path) 196 197 if self._is_gobi(): 198 self._configure_gobi() 199 else: 200 self._configure_non_gobi() 201 202 service = self.test.test_env.shill.wait_for_cellular_service_object() 203 if not service: 204 raise error.TestError('Modem failed to register with the network after ' 205 're-enabling.') 206 207 def _configure_gobi(self): 208 gobi_modem = self.test.test_env.modem.GobiModem() 209 210 if 'async_connect_sleep_ms' in self.test_kwargs: 211 sleep_ms = self.test_kwargs.get('async_connect_sleep_ms', 0) 212 logging.info('Sleeping %d ms before connect', sleep_ms) 213 gobi_modem.InjectFault('AsyncConnectSleepMs', sleep_ms) 214 215 if 'connect_fails_with_error_sending_qmi_request' in self.test_kwargs: 216 logging.info('Injecting QMI failure') 217 gobi_modem.InjectFault('ConnectFailsWithErrorSendingQmiRequest', 1) 218 219 def _configure_non_gobi(self): 220 # Check to make sure no Gobi-specific arguments were specified. 221 if 'async_connect_sleep_ms' in self.test_kwargs: 222 raise error.TestError('async_connect_sleep_ms on non-Gobi modem') 223 if 'connect_fails_with_error_sending_qmi_request' in self.test_kwargs: 224 raise error.TestError( 225 'connect_fails_with_error_sending_qmi_request on non-Gobi modem') 226 227 @ExceptionForward 228 def _start_connect(self): 229 logging.info('connecting') 230 231 retval = self.simple_modem.Connect( 232 {}, 233 reply_handler=self._connect_success_handler, 234 error_handler=self._connect_error_handler) 235 logging.info('connect call made. retval = %s', retval) 236 237 238 @ExceptionForward 239 def _start_get_status(self): 240 # Keep on calling get_status to make sure it works at all times 241 self.simple_modem.GetStatus( 242 reply_handler=self._get_status_success_handler, 243 error_handler=self.build_error_handler('GetStatus')) 244 245 def _enabled(self): 246 return self.test.test_env.modem.GetModemProperties().get('Enabled', -1) 247 248 def _enable(self, value): 249 self.test.test_env.modem.Enable( 250 value, 251 reply_handler=self._disable_success_handler, 252 error_handler=self.build_error_handler('Enable')) 253 254 255 class network_3GDisableWhileConnecting(test.test): 256 """Check that the modem can handle a disconnect while connecting.""" 257 version = 1 258 259 def run_once(self, test_env, **kwargs): 260 self.test_env = test_env 261 timeout_s = kwargs.get('timeout_s', DEFAULT_TEST_TIMEOUT_S) 262 gobject_main_loop = gobject.MainLoop() 263 264 with test_env: 265 logging.info('Shill-level test') 266 shill_level_test = ShillDisableTester(self, 267 gobject_main_loop, 268 timeout_s=timeout_s) 269 shill_level_test.run(**kwargs) 270 271 with test_env: 272 try: 273 logging.info('Modem-level test') 274 modem_level_test = ModemDisableTester(self, 275 gobject_main_loop, 276 timeout_s=timeout_s) 277 modem_level_test.run(**kwargs) 278 finally: 279 modem_utils.ClearGobiModemFaultInjection() 280