Home | History | Annotate | Download | only in cellular_DisableWhileConnecting
      1 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import gobject
      6 import logging
      7 import time
      8 
      9 from autotest_lib.client.bin import test
     10 from autotest_lib.client.common_lib import error
     11 from autotest_lib.client.cros.cellular import modem_utils
     12 from autotest_lib.client.cros.mainloop import ExceptionForward
     13 from autotest_lib.client.cros.mainloop import GenericTesterMainLoop
     14 from autotest_lib.client.cros.networking import shill_proxy
     15 
# Fallback for the 'timeout_s' test argument (seconds, per the name); used by
# cellular_DisableWhileConnecting.run_once when the caller does not supply one.
DEFAULT_TEST_TIMEOUT_S = 600
     17 
     18 
     19 class DisableTester(GenericTesterMainLoop):
     20   """Base class containing main test logic."""
     21   def __init__(self, *args, **kwargs):
     22     super(DisableTester, self).__init__(*args, **kwargs)
     23 
     24   @ExceptionForward
     25   def perform_one_test(self):
     26     """Called by GenericMainTesterMainLoop to execute the test."""
     27     self._configure()
     28     disable_delay_ms = (
     29         self.test_kwargs.get('delay_before_disable_ms', 0) +
     30         self.test.iteration *
     31         self.test_kwargs.get('disable_delay_per_iteration_ms', 0))
     32     gobject.timeout_add(disable_delay_ms, self._start_disable)
     33     self._start_test()
     34 
     35   @ExceptionForward
     36   def _connect_success_handler(self, *ignored_args):
     37     logging.info('connect succeeded')
     38     self.requirement_completed('connect')
     39 
     40   @ExceptionForward
     41   def _connect_error_handler(self, e):
     42     # We disabled while connecting; error is OK
     43     logging.info('connect errored: %s', e)
     44     self.requirement_completed('connect')
     45 
     46   @ExceptionForward
     47   def _start_disable(self):
     48     logging.info('disabling')
     49     self.disable_start = time.time()
     50     self._enable(False)
     51 
     52   @ExceptionForward
     53   def _disable_success_handler(self):
     54     disable_elapsed = time.time() - self.disable_start
     55     self.requirement_completed('disable')
     56 
     57   @ExceptionForward
     58   def _get_status_success_handler(self, status):
     59     logging.info('Got status')
     60     self.requirement_completed('get_status', warn_if_already_completed=False)
     61     if self.status_delay_ms:
     62       gobject.timeout_add(self.status_delay_ms, self._start_get_status)
     63 
     64   def after_main_loop(self):
     65     """Called by GenericTesterMainLoop after the main loop has exited."""
     66     enabled = self._enabled()
     67     logging.info('Modem enabled: %s', enabled)
     68     # Will return happily if no Gobi present
     69     modem_utils.ClearGobiModemFaultInjection()
     70 
     71 
     72 class ShillDisableTester(DisableTester):
     73   """Tests that disable-while-connecting works at the shill level.
     74   Expected control flow:
     75 
     76   * self._configure() called; registers self._disable_property_changed
     77     to be called when device is en/disabled
     78 
     79   * Parent class sets a timer that calls self._enable(False) when it expires.
     80 
     81   * _start_test calls _start_connect() which sends a connect request to
     82     the device.
     83 
     84   * we wait for the modem to power off, at which point
     85     _disable_property_changed (registered above) will get called
     86 
     87   * _disable_property_changed() completes the 'disable' requirement,
     88     and we're done.
     89 
     90   """
     91   def __init__(self, *args, **kwargs):
     92     super(ShillDisableTester, self).__init__(*args, **kwargs)
     93 
     94   def _disable_property_changed(self, property, value, *args, **kwargs):
     95     self._disable_success_handler()
     96 
     97   def _start_test(self):
     98     # We would love to add requirements based on connect, but in many
     99     # scenarios, there is no observable response to a cancelled
    100     # connect: We issue a connect, it returns instantly to let us know
    101     # that the connect has started, but then the disable takes effect
    102     # and the connect fails.  We don't get a state change because no
    103     # state change has happened: the modem never got to a different
    104     # state before we cancelled
    105     self.remaining_requirements = set(['disable'])
    106     self._start_connect()
    107 
    108   def _configure(self):
    109     self.cellular_device = \
    110         self.test.test_env.shill.find_cellular_device_object()
    111     if self.cellular_device is None:
    112       raise error.TestError("Could not find cellular device")
    113 
    114     self.cellular_service = \
    115         self.test.test_env.shill.find_cellular_service_object()
    116 
    117     self.test.test_env.bus.add_signal_receiver(
    118             self.dispatch_property_changed,
    119             signal_name='PropertyChanged',
    120             dbus_interface=self.cellular_device.dbus_interface,
    121             path=self.cellular_device.object_path)
    122 
    123   @ExceptionForward
    124   def _expect_einprogress_handler(self, e):
    125     pass
    126 
    127   def _enable(self, value):
    128     self.property_changed_actions['Powered'] = self._disable_property_changed
    129 
    130     if value:
    131       self.cellular_device.Enable(
    132           reply_handler=self.ignore_handler,
    133           error_handler=self._expect_einprogress_handler)
    134     else:
    135       self.cellular_device.Disable(
    136           reply_handler=self.ignore_handler,
    137           error_handler=self._expect_einprogress_handler)
    138 
    139   @ExceptionForward
    140   def _start_connect(self):
    141     logging.info('connecting')
    142 
    143     def _log_connect_event(property, value, *ignored_args):
    144       logging.info('%s property changed: %s', property, value)
    145 
    146     self.property_changed_actions['Connected'] = _log_connect_event
    147 
    148     # Contrary to documentation, Connect just returns when it has
    149     # fired off the lower-level dbus messages.  So a success means
    150     # nothing to us.  But a failure means it didn't even try.
    151     self.cellular_service.Connect(
    152         reply_handler=self.ignore_handler,
    153         error_handler=self.build_error_handler('Connect'))
    154 
    155   def _enabled(self):
    156     return self.test.test_env.shill.get_dbus_property(
    157             self.cellular_device,
    158             shill_proxy.ShillProxy.DEVICE_PROPERTY_POWERED)
    159 
    160 
    161 class ModemDisableTester(DisableTester):
    162   """Tests that disable-while-connecting works at the modem-manager level.
    163 
    164   Expected control flow:
    165 
    166   * _configure() is called.
    167 
    168   * Parent class sets a timer that calls self._enable(False) when it
    169     expires.
    170 
    171   * _start_test calls _start_connect() which sends a connect request to
    172     the device, also sets a timer that calls GetStatus on the modem.
    173 
    174   * wait for all three (connect, disable, get_status) to complete.
    175 
    176   """
    177   def __init__(self, *args, **kwargs):
    178     super(ModemDisableTester, self).__init__(*args, **kwargs)
    179 
    180   def _is_gobi(self):
    181     return 'Gobi' in self.test.test_env.modem.path
    182 
    183   def _start_test(self):
    184     self.remaining_requirements = set(['connect', 'disable'])
    185 
    186     # Only cromo/gobi-cromo-plugin maintain the invariant that GetStatus
    187     # will always succeed, so we only check it if the modem is a Gobi.
    188     if self._is_gobi():
    189       self.remaining_requirements.add('get_status')
    190       self.status_delay_ms = self.test_kwargs.get('status_delay_ms', 200)
    191       gobject.timeout_add(self.status_delay_ms, self._start_get_status)
    192 
    193     self._start_connect()
    194 
    195   def _configure(self):
    196     self.simple_modem = self.test.test_env.modem.SimpleModem()
    197 
    198     logging.info('Modem path: %s', self.test.test_env.modem.path)
    199 
    200     if self._is_gobi():
    201       self._configure_gobi()
    202     else:
    203       self._configure_non_gobi()
    204 
    205     service = self.test.test_env.shill.wait_for_cellular_service_object()
    206     if not service:
    207       raise error.TestError('Modem failed to register with the network after '
    208                             're-enabling.')
    209 
    210   def _configure_gobi(self):
    211     gobi_modem = self.test.test_env.modem.GobiModem()
    212 
    213     if 'async_connect_sleep_ms' in self.test_kwargs:
    214       sleep_ms = self.test_kwargs.get('async_connect_sleep_ms', 0)
    215       logging.info('Sleeping %d ms before connect', sleep_ms)
    216       gobi_modem.InjectFault('AsyncConnectSleepMs', sleep_ms)
    217 
    218     if 'connect_fails_with_error_sending_qmi_request' in self.test_kwargs:
    219       logging.info('Injecting QMI failure')
    220       gobi_modem.InjectFault('ConnectFailsWithErrorSendingQmiRequest', 1)
    221 
    222   def _configure_non_gobi(self):
    223     # Check to make sure no Gobi-specific arguments were specified.
    224     if 'async_connect_sleep_ms' in self.test_kwargs:
    225       raise error.TestError('async_connect_sleep_ms on non-Gobi modem')
    226     if 'connect_fails_with_error_sending_qmi_request' in self.test_kwargs:
    227       raise error.TestError(
    228           'connect_fails_with_error_sending_qmi_request on non-Gobi modem')
    229 
    230   @ExceptionForward
    231   def _start_connect(self):
    232     logging.info('connecting')
    233 
    234     retval = self.simple_modem.Connect(
    235         {},
    236         reply_handler=self._connect_success_handler,
    237         error_handler=self._connect_error_handler)
    238     logging.info('connect call made.  retval = %s', retval)
    239 
    240 
    241   @ExceptionForward
    242   def _start_get_status(self):
    243     # Keep on calling get_status to make sure it works at all times
    244     self.simple_modem.GetStatus(
    245         reply_handler=self._get_status_success_handler,
    246         error_handler=self.build_error_handler('GetStatus'))
    247 
    248   def _enabled(self):
    249     return self.test.test_env.modem.GetModemProperties().get('Enabled', -1)
    250 
    251   def _enable(self, value):
    252     self.test.test_env.modem.Enable(
    253         value,
    254         reply_handler=self._disable_success_handler,
    255         error_handler=self.build_error_handler('Enable'))
    256 
    257 
    258 class cellular_DisableWhileConnecting(test.test):
    259   """Check that the modem can handle a disconnect while connecting."""
    260   version = 1
    261 
    262   def run_once(self, test_env, **kwargs):
    263     self.test_env = test_env
    264     timeout_s = kwargs.get('timeout_s', DEFAULT_TEST_TIMEOUT_S)
    265     gobject_main_loop = gobject.MainLoop()
    266 
    267     with test_env:
    268       logging.info('Shill-level test')
    269       shill_level_test = ShillDisableTester(self,
    270                                             gobject_main_loop,
    271                                             timeout_s=timeout_s)
    272       shill_level_test.run(**kwargs)
    273 
    274     with test_env:
    275       try:
    276         logging.info('Modem-level test')
    277         modem_level_test = ModemDisableTester(self,
    278                                               gobject_main_loop,
    279                                               timeout_s=timeout_s)
    280         modem_level_test.run(**kwargs)
    281       finally:
    282         modem_utils.ClearGobiModemFaultInjection()
    283