Home | History | Annotate | Download | only in network_3GDisableWhileConnecting
      1 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import gobject
      6 import logging
      7 import time
      8 
      9 from autotest_lib.client.bin import test
     10 from autotest_lib.client.common_lib import error
     11 from autotest_lib.client.cros.cellular import modem_utils
     12 from autotest_lib.client.cros.mainloop import GenericTesterMainLoop
     13 from autotest_lib.client.cros.mainloop import ExceptionForward
     14 
     15 DEFAULT_TEST_TIMEOUT_S = 600
     16 
     17 
     18 class DisableTester(GenericTesterMainLoop):
     19   """Base class containing main test logic."""
     20   def __init__(self, *args, **kwargs):
     21     super(DisableTester, self).__init__(*args, **kwargs)
     22 
     23   @ExceptionForward
     24   def perform_one_test(self):
     25     """Called by GenericMainTesterMainLoop to execute the test."""
     26     self._configure()
     27     disable_delay_ms = (
     28         self.test_kwargs.get('delay_before_disable_ms', 0) +
     29         self.test.iteration *
     30         self.test_kwargs.get('disable_delay_per_iteration_ms', 0))
     31     gobject.timeout_add(disable_delay_ms, self._start_disable)
     32     self._start_test()
     33 
     34   @ExceptionForward
     35   def _connect_success_handler(self, *ignored_args):
     36     logging.info('connect succeeded')
     37     self.requirement_completed('connect')
     38 
     39   @ExceptionForward
     40   def _connect_error_handler(self, e):
     41     # We disabled while connecting; error is OK
     42     logging.info('connect errored: %s', e)
     43     self.requirement_completed('connect')
     44 
     45   @ExceptionForward
     46   def _start_disable(self):
     47     logging.info('disabling')
     48     self.disable_start = time.time()
     49     self._enable(False)
     50 
     51   @ExceptionForward
     52   def _disable_success_handler(self):
     53     disable_elapsed = time.time() - self.disable_start
     54     self.requirement_completed('disable')
     55 
     56   @ExceptionForward
     57   def _get_status_success_handler(self, status):
     58     logging.info('Got status')
     59     self.requirement_completed('get_status', warn_if_already_completed=False)
     60     if self.status_delay_ms:
     61       gobject.timeout_add(self.status_delay_ms, self._start_get_status)
     62 
     63   def after_main_loop(self):
     64     """Called by GenericTesterMainLoop after the main loop has exited."""
     65     enabled = self._enabled()
     66     logging.info('Modem enabled: %s', enabled)
     67     # Will return happily if no Gobi present
     68     modem_utils.ClearGobiModemFaultInjection()
     69 
     70 
     71 class ShillDisableTester(DisableTester):
     72   """Tests that disable-while-connecting works at the shill level.
     73   Expected control flow:
     74 
     75   * self._configure() called; registers self._disable_property_changed
     76     to be called when device is en/disabled
     77 
     78   * Parent class sets a timer that calls self._enable(False) when it expires.
     79 
     80   * _start_test calls _start_connect() which sends a connect request to
     81     the device.
     82 
     83   * we wait for the modem to power off, at which point
     84     _disable_property_changed (registered above) will get called
     85 
     86   * _disable_property_changed() completes the 'disable' requirement,
     87     and we're done.
     88 
     89   """
     90   def __init__(self, *args, **kwargs):
     91     super(ShillDisableTester, self).__init__(*args, **kwargs)
     92 
     93   def _disable_property_changed(self, property, value, *args, **kwargs):
     94     self._disable_success_handler()
     95 
     96   def _start_test(self):
     97     # We would love to add requirements based on connect, but in many
     98     # scenarios, there is no observable response to a cancelled
     99     # connect: We issue a connect, it returns instantly to let us know
    100     # that the connect has started, but then the disable takes effect
    101     # and the connect fails.  We don't get a state change because no
    102     # state change has happened: the modem never got to a different
    103     # state before we cancelled
    104     self.remaining_requirements = set(['disable'])
    105     self._start_connect()
    106 
    107   def _configure(self):
    108     self.cellular_device = \
    109         self.test.test_env.shill.find_cellular_device_object()
    110     if self.cellular_device is None:
    111       raise error.TestError("Could not find cellular device")
    112 
    113     self.cellular_service = \
    114         self.test.test_env.shill.find_cellular_service_object()
    115 
    116     self.test.test_env.bus.add_signal_receiver(
    117             self.dispatch_property_changed,
    118             signal_name='PropertyChanged',
    119             dbus_interface=self.cellular_device.dbus_interface,
    120             path=self.cellular_device.object_path)
    121 
    122   @ExceptionForward
    123   def _expect_einprogress_handler(self, e):
    124     pass
    125 
    126   def _enable(self, value):
    127     self.property_changed_actions['Powered'] = self._disable_property_changed
    128 
    129     if value:
    130       self.cellular_device.Enable(
    131           reply_handler=self.ignore_handler,
    132           error_handler=self._expect_einprogress_handler)
    133     else:
    134       self.cellular_device.Disable(
    135           reply_handler=self.ignore_handler,
    136           error_handler=self._expect_einprogress_handler)
    137 
    138   @ExceptionForward
    139   def _start_connect(self):
    140     logging.info('connecting')
    141 
    142     def _log_connect_event(property, value, *ignored_args):
    143       logging.info('%s property changed: %s', property, value)
    144 
    145     self.property_changed_actions['Connected'] = _log_connect_event
    146 
    147     # Contrary to documentation, Connect just returns when it has
    148     # fired off the lower-level dbus messages.  So a success means
    149     # nothing to us.  But a failure means it didn't even try.
    150     self.cellular_service.Connect(
    151         reply_handler=self.ignore_handler,
    152         error_handler=self.build_error_handler('Connect'))
    153 
    154   def _enabled(self):
    155     return self.cellular_device.GetProperties()['Powered']
    156 
    157 
    158 class ModemDisableTester(DisableTester):
    159   """Tests that disable-while-connecting works at the modem-manager level.
    160 
    161   Expected control flow:
    162 
    163   * _configure() is called.
    164 
    165   * Parent class sets a timer that calls self._enable(False) when it
    166     expires.
    167 
    168   * _start_test calls _start_connect() which sends a connect request to
    169     the device, also sets a timer that calls GetStatus on the modem.
    170 
    171   * wait for all three (connect, disable, get_status) to complete.
    172 
    173   """
    174   def __init__(self, *args, **kwargs):
    175     super(ModemDisableTester, self).__init__(*args, **kwargs)
    176 
    177   def _is_gobi(self):
    178     return 'Gobi' in self.test.test_env.modem.path
    179 
    180   def _start_test(self):
    181     self.remaining_requirements = set(['connect', 'disable'])
    182 
    183     # Only cromo/gobi-cromo-plugin maintain the invariant that GetStatus
    184     # will always succeed, so we only check it if the modem is a Gobi.
    185     if self._is_gobi():
    186       self.remaining_requirements.add('get_status')
    187       self.status_delay_ms = self.test_kwargs.get('status_delay_ms', 200)
    188       gobject.timeout_add(self.status_delay_ms, self._start_get_status)
    189 
    190     self._start_connect()
    191 
    192   def _configure(self):
    193     self.simple_modem = self.test.test_env.modem.SimpleModem()
    194 
    195     logging.info('Modem path: %s', self.test.test_env.modem.path)
    196 
    197     if self._is_gobi():
    198       self._configure_gobi()
    199     else:
    200       self._configure_non_gobi()
    201 
    202     service = self.test.test_env.shill.wait_for_cellular_service_object()
    203     if not service:
    204       raise error.TestError('Modem failed to register with the network after '
    205                             're-enabling.')
    206 
    207   def _configure_gobi(self):
    208     gobi_modem = self.test.test_env.modem.GobiModem()
    209 
    210     if 'async_connect_sleep_ms' in self.test_kwargs:
    211       sleep_ms = self.test_kwargs.get('async_connect_sleep_ms', 0)
    212       logging.info('Sleeping %d ms before connect', sleep_ms)
    213       gobi_modem.InjectFault('AsyncConnectSleepMs', sleep_ms)
    214 
    215     if 'connect_fails_with_error_sending_qmi_request' in self.test_kwargs:
    216       logging.info('Injecting QMI failure')
    217       gobi_modem.InjectFault('ConnectFailsWithErrorSendingQmiRequest', 1)
    218 
    219   def _configure_non_gobi(self):
    220     # Check to make sure no Gobi-specific arguments were specified.
    221     if 'async_connect_sleep_ms' in self.test_kwargs:
    222       raise error.TestError('async_connect_sleep_ms on non-Gobi modem')
    223     if 'connect_fails_with_error_sending_qmi_request' in self.test_kwargs:
    224       raise error.TestError(
    225           'connect_fails_with_error_sending_qmi_request on non-Gobi modem')
    226 
    227   @ExceptionForward
    228   def _start_connect(self):
    229     logging.info('connecting')
    230 
    231     retval = self.simple_modem.Connect(
    232         {},
    233         reply_handler=self._connect_success_handler,
    234         error_handler=self._connect_error_handler)
    235     logging.info('connect call made.  retval = %s', retval)
    236 
    237 
    238   @ExceptionForward
    239   def _start_get_status(self):
    240     # Keep on calling get_status to make sure it works at all times
    241     self.simple_modem.GetStatus(
    242         reply_handler=self._get_status_success_handler,
    243         error_handler=self.build_error_handler('GetStatus'))
    244 
    245   def _enabled(self):
    246     return self.test.test_env.modem.GetModemProperties().get('Enabled', -1)
    247 
    248   def _enable(self, value):
    249     self.test.test_env.modem.Enable(
    250         value,
    251         reply_handler=self._disable_success_handler,
    252         error_handler=self.build_error_handler('Enable'))
    253 
    254 
    255 class network_3GDisableWhileConnecting(test.test):
    256   """Check that the modem can handle a disconnect while connecting."""
    257   version = 1
    258 
    259   def run_once(self, test_env, **kwargs):
    260     self.test_env = test_env
    261     timeout_s = kwargs.get('timeout_s', DEFAULT_TEST_TIMEOUT_S)
    262     gobject_main_loop = gobject.MainLoop()
    263 
    264     with test_env:
    265       logging.info('Shill-level test')
    266       shill_level_test = ShillDisableTester(self,
    267                                             gobject_main_loop,
    268                                             timeout_s=timeout_s)
    269       shill_level_test.run(**kwargs)
    270 
    271     with test_env:
    272       try:
    273         logging.info('Modem-level test')
    274         modem_level_test = ModemDisableTester(self,
    275                                               gobject_main_loop,
    276                                               timeout_s=timeout_s)
    277         modem_level_test.run(**kwargs)
    278       finally:
    279         modem_utils.ClearGobiModemFaultInjection()
    280