Home | History | Annotate | Download | only in cellular
      1 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """Utilities for cellular tests."""
      6 import copy, dbus, os, tempfile
      7 
      8 # TODO(thieule): Consider renaming mm.py, mm1.py, modem.py, etc to be more
      9 # descriptive (crosbug.com/37060).
     10 import common
     11 from autotest_lib.client.bin import utils
     12 from autotest_lib.client.common_lib import error
     13 from autotest_lib.client.cros.cellular import cellular
     14 from autotest_lib.client.cros.cellular import cellular_system_error
     15 from autotest_lib.client.cros.cellular import mm
     16 from autotest_lib.client.cros.cellular import modem
     17 
     18 from autotest_lib.client.cros import flimflam_test_path
     19 import flimflam
     20 
     21 
# Default timeout (in seconds) used by ConnectToCellular.
TIMEOUT = 30
# Base timeout (in seconds) for finding the cellular service;
# AutoConnectContext.__enter__ waits up to twice this long.
SERVICE_TIMEOUT = 60

import cellular_logging

# Module-wide logger shared by every helper in this file.
logger = cellular_logging.SetupCellularLogging('cell_tools')
     28 
     29 
     30 def ConnectToCellular(flim, timeout=TIMEOUT):
     31     """Attempts to connect to a cell network using FlimFlam.
     32 
     33     Args:
     34         flim: A flimflam object
     35         timeout: Timeout (in seconds) before giving up on connect
     36 
     37     Returns:
     38         a tuple of the service and the service state
     39 
     40     Raises:
     41         Error if connection fails or times out
     42     """
     43 
     44     service = flim.FindCellularService(timeout=timeout)
     45     if not service:
     46         raise cellular_system_error.ConnectionFailure(
     47             'Could not find cell service')
     48     properties = service.GetProperties(utf8_strings=True)
     49     logger.error('Properties are: %s', properties)
     50 
     51     logger.info('Connecting to cell service: %s', service)
     52 
     53     states = ['portal', 'online', 'idle']
     54     state = flim.WaitForServiceState(service=service,
     55                                      expected_states=states,
     56                                      timeout=timeout,
     57                                      ignore_failure=True)[0]
     58     logger.debug('Cell connection state : %s ' % state)
     59     connected_states = ['portal', 'online']
     60     if state in connected_states:
     61         logger.debug('Looks good, skip ConnectService')
     62         return service, state
     63     else:
     64         logger.debug('Trying to ConnectService')
     65 
     66     success, status = flim.ConnectService(
     67         service=service,
     68         assoc_timeout=timeout,
     69         config_timeout=timeout)
     70 
     71     if not success:
     72         logger.error('Connect failed: %s' % status)
     73         # TODO(rochberg):  Turn off autoconnect
     74         if 'Error.AlreadyConnected' not in status['reason']:
     75             raise cellular_system_error.ConnectionFailure(
     76                 'Could not connect: %s.' % status)
     77 
     78     state = flim.WaitForServiceState(service=service,
     79                                      expected_states=connected_states,
     80                                      timeout=timeout,
     81                                      ignore_failure=True)[0]
     82     if not state in connected_states:
     83         raise cellular_system_error.BadState(
     84             'Still in state %s, expecting one of: %s ' %
     85             (state, str(connected_states)))
     86 
     87     return service, state
     88 
     89 
     90 def FindLastGoodAPN(service, default=None):
     91     if not service:
     92         return default
     93     props = service.GetProperties()
     94     if 'Cellular.LastGoodAPN' not in props:
     95         return default
     96     last_good_apn = props['Cellular.LastGoodAPN']
     97     return last_good_apn.get('apn', default)
     98 
     99 
    100 def DisconnectFromCellularService(bs, flim, service):
    101     """Attempts to disconnect from the supplied cellular service.
    102 
    103     Args:
    104         bs:  A basestation object.  Pass None to skip basestation-side checks
    105         flim:  A flimflam object
    106         service:  A cellular service object
    107     """
    108 
    109     flim.DisconnectService(service)  # Waits for flimflam state to go to idle
    110 
    111     if bs:
    112         verifier = bs.GetAirStateVerifier()
    113         # This is racy: The modem is free to report itself as
    114         # disconnected before it actually finishes tearing down its RF
    115         # connection.
    116         verifier.AssertDataStatusIn([
    117             cellular.UeGenericDataStatus.DISCONNECTING,
    118             cellular.UeGenericDataStatus.REGISTERED,
    119             cellular.UeGenericDataStatus.NONE,])
    120 
    121         def _ModemIsFullyDisconnected():
    122             return verifier.IsDataStatusIn([
    123                 cellular.UeGenericDataStatus.REGISTERED,
    124                 cellular.UeGenericDataStatus.NONE,])
    125 
    126         utils.poll_for_condition(
    127             _ModemIsFullyDisconnected,
    128             timeout=20,
    129             exception=cellular_system_error.BadState(
    130                 'modem not disconnected from base station'))
    131 
    132 
    133 def _EnumerateModems(manager):
    134     """Get a set of modem paths."""
    135     return set([x[1] for x in mm.EnumerateDevices(manager)])
    136 
    137 
    138 def _SawNewModem(manager, preexisting_modems, old_modem):
    139     current_modems = _EnumerateModems(manager)
    140     if old_modem in current_modems:
    141         return False
    142     # NB: This fails if an unrelated modem disappears.  Not fixing
    143     # until we support > 1 modem
    144     return preexisting_modems != current_modems
    145 
    146 
    147 def _WaitForModemToReturn(manager, preexisting_modems_original, modem_path):
    148     preexisting_modems = copy.copy(preexisting_modems_original)
    149     preexisting_modems.remove(modem_path)
    150 
    151     utils.poll_for_condition(
    152         lambda: _SawNewModem(manager, preexisting_modems, modem_path),
    153         timeout=50,
    154         exception=cellular_system_error.BadState(
    155             'Modem did not come back after settings change'))
    156 
    157     current_modems = _EnumerateModems(manager)
    158 
    159     new_modems = [x for x in current_modems - preexisting_modems]
    160     if len(new_modems) != 1:
    161         raise cellular_system_error.BadState(
    162             'Unexpected modem list change: %s vs %s' %
    163             (current_modems, new_modems))
    164 
    165     logger.info('New modem: %s' % new_modems[0])
    166     return new_modems[0]
    167 
    168 
    169 def SetFirmwareForTechnologyFamily(manager, modem_path, family):
    170     """Set the modem to firmware.  Return potentially-new modem path."""
    171     # todo(byronk): put this in a modem object?
    172     if family == cellular.TechnologyFamily.LTE:
    173         return  # nothing to set up on a Pixel. todo(byronk) how about others?
    174     logger.debug('SetFirmwareForTechnologyFamily : manager : %s ' % manager)
    175     logger.debug('SetFirmwareForTechnologyFamily : modem_path : %s ' %
    176                  modem_path)
    177     logger.debug('SetFirmwareForTechnologyFamily : family : %s ' % family)
    178     preexisting_modems = _EnumerateModems(manager)
    179     # We do not currently support any multi-family modems besides Gobi
    180     gobi = manager.GetModem(modem_path).GobiModem()
    181     if not gobi:
    182         raise cellular_system_error.BadScpiCommand(
    183             'Modem %s does not support %s, cannot change technologies' %
    184             modem_path, family)
    185 
    186     logger.info('Changing firmware to technology family %s' % family)
    187 
    188     FamilyToCarrierString = {
    189             cellular.TechnologyFamily.UMTS: 'Generic UMTS',
    190             cellular.TechnologyFamily.CDMA: 'Verizon Wireless',}
    191 
    192     gobi.SetCarrier(FamilyToCarrierString[family])
    193     return _WaitForModemToReturn(manager, preexisting_modems, modem_path)
    194 
    195 
# A test PRL that has an ID of 3333 and sets the device to acquire the
# default config of an 8960 with system_id 331.  Base64 encoding
# generated with "base64 < prl"
# NOTE: str.decode('base64_codec') only exists on Python 2 strings.

TEST_PRL_3333 = (
    'ADENBQMAAMAAAYADAgmABgIKDQsEAYAKDUBAAQKWAAICQGAJApYAAgIw8BAAAQDhWA=='.
    decode('base64_codec'))


# A modem with this MDN will always report itself as activated
TESTING_MDN = dbus.String('1115551212', variant_level=1)
    207 
    208 
    209 def _IsCdmaModemConfiguredCorrectly(manager, modem_path):
    210     """Returns true iff the CDMA modem at modem_path is configured correctly."""
    211     # We don't test for systemID because the PRL should take care of
    212     # that.
    213 
    214     status = manager.GetModem(modem_path).SimpleModem().GetStatus()
    215 
    216     required_settings = {'mdn': TESTING_MDN,
    217                          'min': TESTING_MDN,
    218                          'prl_version': 3333}
    219     configured_correctly = True
    220 
    221     for rk, rv in required_settings.iteritems():
    222         if rk not in status or rv != status[rk]:
    223             logger.error('_CheckCdmaModemStatus:  %s: expected %s, got %s' % (
    224                 rk, rv, status.get(rk)))
    225             configured_correctly = False
    226     return configured_correctly
    227 
    228 
    229 def PrepareCdmaModem(manager, modem_path):
    230     """Configure a CDMA device (including PRL, MIN, and MDN)."""
    231 
    232     if _IsCdmaModemConfiguredCorrectly(manager, modem_path):
    233         return modem_path
    234 
    235     logger.info('Updating modem settings')
    236     preexisting_modems = _EnumerateModems(manager)
    237     cdma = manager.GetModem(modem_path).CdmaModem()
    238 
    239     with tempfile.NamedTemporaryFile() as f:
    240         os.chmod(f.name, 0744)
    241         f.write(TEST_PRL_3333)
    242         f.flush()
    243         logger.info('Calling ActivateManual to change PRL')
    244 
    245         cdma.ActivateManual({
    246             'mdn': TESTING_MDN,
    247             'min': TESTING_MDN,
    248             'prlfile': dbus.String(f.name, variant_level=1),
    249             'system_id': dbus.UInt16(331, variant_level=1),  # Default 8960 SID
    250             'spc': dbus.String('000000'),})
    251         new_path = _WaitForModemToReturn(
    252             manager, preexisting_modems, modem_path)
    253 
    254     if not _IsCdmaModemConfiguredCorrectly(manager, new_path):
    255         raise cellular_system_error.BadState('Modem configuration failed')
    256     return new_path
    257 
    258 
    259 def PrepareModemForTechnology(modem_path, target_technology):
    260     """Prepare modem for the technology: Sets things like firmware, PRL."""
    261 
    262     manager, modem_path = mm.PickOneModem(modem_path)
    263 
    264     logger.info('Found modem %s' % modem_path)
    265 
    266 
    267     # todo(byronk) : This returns TechnologyFamily:UMTS on a Pixel. ????
    268     current_family = manager.GetModem(modem_path).GetCurrentTechnologyFamily()
    269     target_family = cellular.TechnologyToFamily[target_technology]
    270 
    271     if current_family != target_family:
    272         logger.debug('Modem Current Family: %s ' % current_family)
    273         logger.debug('Modem Target Family : %s ' %target_family )
    274         modem_path = SetFirmwareForTechnologyFamily(
    275             manager, modem_path, target_family)
    276 
    277     if target_family == cellular.TechnologyFamily.CDMA:
    278         modem_path = PrepareCdmaModem(manager, modem_path)
    279         # Force the modem to report that is has been activated since we
    280         # use a custom PRL and have already manually activated it.
    281         manager.GetModem(modem_path).GobiModem().ForceModemActivatedStatus()
    282 
    283     # When testing EVDO, we need to force the modem to register with EVDO
    284     # directly (bypassing CDMA 1x RTT) else the modem will not register
    285     # properly because it looks for CDMA 1x RTT first but can't find it
    286     # because the call box can only emulate one technology at a time (EVDO).
    287     try:
    288         if target_technology == cellular.Technology.EVDO_1X:
    289             network_preference = modem.Modem.NETWORK_PREFERENCE_EVDO_1X
    290         else:
    291             network_preference = modem.Modem.NETWORK_PREFERENCE_AUTOMATIC
    292         gobi = manager.GetModem(modem_path).GobiModem()
    293         gobi.SetNetworkPreference(network_preference)
    294     except AttributeError:
    295         # Not a Gobi modem
    296         pass
    297 
    298     return modem_path
    299 
    300 
    301 def FactoryResetModem(modem_pattern, spc='000000'):
    302     """Factory resets modem, returns DBus pathname of modem after reset."""
    303     manager, modem_path = mm.PickOneModem(modem_pattern)
    304     preexisting_modems = _EnumerateModems(manager)
    305     modem = manager.GetModem(modem_path).Modem()
    306     modem.FactoryReset(spc)
    307     return _WaitForModemToReturn(manager, preexisting_modems, modem_path)
    308 
    309 
    310 class OtherDeviceShutdownContext(object):
    311     """Context manager that shuts down other devices.
    312 
    313     Usage:
    314         with cell_tools.OtherDeviceShutdownContext('cellular'):
    315             block
    316 
    317     TODO(rochberg):  Replace flimflam.DeviceManager with this
    318     """
    319 
    320     def __init__(self, device_type):
    321         self.device_type = device_type
    322         self.device_manager = None
    323 
    324     def __enter__(self):
    325         self.device_manager = flimflam.DeviceManager(flimflam.FlimFlam())
    326         self.device_manager.ShutdownAllExcept(self.device_type)
    327         return self
    328 
    329     def __exit__(self, exception, value, traceback):
    330         if self.device_manager:
    331             self.device_manager.RestoreDevices()
    332         return False
    333 
    334 
    335 class AutoConnectContext(object):
    336     """Context manager which sets autoconnect to either true or false.
    337 
    338        Enable or Disable autoconnect for the cellular service.
    339        Restore it when done.
    340 
    341        Usage:
    342            with cell_tools.DisableAutoConnectContext(device, flim, autoconnect):
    343                block
    344     """
    345 
    346     def __init__(self, device, flim, autoconnect):
    347         self.device = device
    348         self.flim = flim
    349         self.autoconnect = autoconnect
    350         self.autoconnect_changed = False
    351 
    352     def PowerOnDevice(self, device):
    353         """Power on a flimflam device, ignoring in progress errors."""
    354         logger.info('powered = %s' % device.GetProperties()['Powered'])
    355         if device.GetProperties()['Powered']:
    356             return
    357         try:
    358             device.Enable()
    359         except dbus.exceptions.DBusException, e:
    360             if e._dbus_error_name != 'org.chromium.flimflam.Error.InProgress':
    361                 raise e
    362 
    363     def __enter__(self):
    364         """Power up device, get the service and disable autoconnect."""
    365         changed = False
    366         self.PowerOnDevice(self.device)
    367 
    368         # Use SERVICE_TIMEOUT*2 here because it may take SERVICE_TIMEOUT
    369         # seconds for the modem to disconnect when the base emulator is taken
    370         # offline for reconfiguration and then another SERVICE_TIMEOUT
    371         # seconds for the modem to reconnect after the base emulator is
    372         # brought back online.
    373         #
    374         # TODO(jglasgow): generalize to use services associated with device
    375         service = self.flim.FindCellularService(timeout=SERVICE_TIMEOUT*2)
    376         if not service:
    377             raise error.TestFail('No cellular service available.')
    378 
    379         # Always set the AutoConnect property even if the requested value
    380         # is the same so that shill will retain the AutoConnect property, else
    381         # shill may override it.
    382         props = service.GetProperties()
    383         autoconnect = props['AutoConnect']
    384         logger.info('AutoConnect = %s' % autoconnect)
    385         logger.info('Setting AutoConnect = %s.', self.autoconnect)
    386         service.SetProperty('AutoConnect', dbus.Boolean(self.autoconnect))
    387 
    388         if autoconnect != self.autoconnect:
    389             props = service.GetProperties()
    390             autoconnect = props['AutoConnect']
    391             changed = True
    392 
    393         # Make sure the cellular service gets persisted by taking it out of
    394         # the ephemeral profile.
    395         if not props['Profile']:
    396             manager_props = self.flim.manager.GetProperties()
    397             active_profile = manager_props['ActiveProfile']
    398             logger.info("Setting cellular service profile to %s",
    399                         active_profile)
    400             service.SetProperty('Profile', active_profile)
    401 
    402         if autoconnect != self.autoconnect:
    403             raise error.TestFail('AutoConnect is %s, but we want it to be %s' %
    404                                  (autoconnect, self.autoconnect))
    405 
    406         self.autoconnect_changed = changed
    407 
    408         return self
    409 
    410     def __exit__(self, exception, value, traceback):
    411         """Restore autoconnect state if we changed it."""
    412         if not self.autoconnect_changed:
    413             return False
    414 
    415         try:
    416             self.PowerOnDevice(self.device)
    417         except Exception as e:
    418             if exception:
    419                 logger.error(
    420                     'Exiting AutoConnectContext with one exception, but ' +
    421                     'PowerOnDevice raised another')
    422                 logger.error(
    423                     'Swallowing PowerOnDevice exception %s' % e)
    424                 return False
    425             else:
    426                 raise e
    427 
    428         # TODO(jglasgow): generalize to use services associated with
    429         # device, and restore state only on changed services
    430         service = self.flim.FindCellularService()
    431         if not service:
    432             logger.error('Cannot find cellular service.  '
    433                           'Autoconnect state not restored.')
    434             return False
    435         service.SetProperty('AutoConnect', dbus.Boolean(not self.autoconnect))
    436 
    437         return False
    438