Home | History | Annotate | Download | only in wardmodem
      1 #!/usr/bin/env python
      2 
      3 # Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
      4 # Use of this source code is governed by a BSD-style license that can be
      5 # found in the LICENSE file.
      6 
      7 import at_transceiver
      8 
      9 import logging
     10 import mox
     11 import os
     12 import unittest
     13 
     14 import at_channel
     15 import modem_configuration
     16 import task_loop
     17 import wardmodem_exceptions as wme
     18 
     19 class ATTransceiverTestCase(unittest.TestCase):
     20     """
     21     Base test fixture for ATTransceiver class.
     22 
     23     """
     24     class TestMachine(object):
     25         """ Stub test machine used by tests below. """
     26         def test_function(self, _):
     27             """
     28             A stub StateMachine API function.
     29 
     30             wardmodem calls will be placed to this function.
     31 
     32             @param _: Ignored.
     33 
     34             """
     35             pass
     36 
     37 
     38         # Needed in a test machine.
     39         def get_well_known_name(self):
     40             """ Get the well known name of this machine as str. """
     41             return "test_machine"
     42 
     43 
     44     def setUp(self):
     45         self._mox = mox.Mox()
     46 
     47         # Create a temporary pty pair for the ATTransceiver constructor
     48         master, slave = os.openpty()
     49 
     50         self._modem_conf = modem_configuration.ModemConfiguration()
     51         self._at_transceiver = at_transceiver.ATTransceiver(slave,
     52                                                             self._modem_conf,
     53                                                             slave)
     54 
     55         # Now replace internal objects in _at_transceiver with mocks
     56         self._at_transceiver._modem_response_timeout_milliseconds = 0
     57         self._mock_modem_channel = self._mox.CreateMock(at_channel.ATChannel)
     58         self._at_transceiver._modem_channel = self._mock_modem_channel
     59         self._mock_mm_channel = self._mox.CreateMock(at_channel.ATChannel)
     60         self._at_transceiver._mm_channel = self._mock_mm_channel
     61         self._mock_task_loop = self._mox.CreateMock(task_loop.TaskLoop)
     62         self._at_transceiver._task_loop = self._mock_task_loop
     63 
     64         # Also empty out the internal maps, so that actual loaded configuration
     65         # does not interfere with the test.
     66         self._at_transceiver._at_to_wm_action_map = {}
     67         self._at_transceiver._wm_response_to_at_map = {}
     68 
     69 
     70 class ATTransceiverCommonTestCase(ATTransceiverTestCase):
     71     """
     72     Tests common to all three modes of ATTransceiver.
     73 
     74     """
     75 
     76     def test_successful_mode_selection(self):
     77         """
     78         Test that all modes can be selected, when both channels are provided.
     79 
     80         """
     81         self._at_transceiver.mode = at_transceiver.ATTransceiverMode.WARDMODEM
     82         self.assertEqual(self._at_transceiver.mode,
     83                          at_transceiver.ATTransceiverMode.WARDMODEM)
     84         self._at_transceiver.mode = (
     85                 at_transceiver.ATTransceiverMode.PASS_THROUGH)
     86         self.assertEqual(self._at_transceiver.mode,
     87                          at_transceiver.ATTransceiverMode.PASS_THROUGH)
     88         self._at_transceiver.mode = (
     89                at_transceiver.ATTransceiverMode.SPLIT_VERIFY)
     90         self.assertEqual(self._at_transceiver.mode,
     91                          at_transceiver.ATTransceiverMode.SPLIT_VERIFY)
     92 
     93     def test_unsuccessful_mode_selection(self):
     94         """
     95         Test that only WARDMODEM mode can be selected if the modem channel is
     96         missing.
     97 
     98         """
     99         self._at_transceiver._modem_channel = None
    100         self._at_transceiver.mode = at_transceiver.ATTransceiverMode.WARDMODEM
    101         self.assertEqual(self._at_transceiver.mode,
    102                          at_transceiver.ATTransceiverMode.WARDMODEM)
    103         self._at_transceiver.mode = (
    104                 at_transceiver.ATTransceiverMode.PASS_THROUGH)
    105         self.assertEqual(self._at_transceiver.mode,
    106                          at_transceiver.ATTransceiverMode.WARDMODEM)
    107         self._at_transceiver.mode = (
    108                at_transceiver.ATTransceiverMode.SPLIT_VERIFY)
    109         self.assertEqual(self._at_transceiver.mode,
    110                          at_transceiver.ATTransceiverMode.WARDMODEM)
    111 
    112 
    113     def test_update_at_to_wm_action_map(self):
    114         """
    115         Test that _at_to_wm_action_map is updated correctly under different
    116         scenarios.
    117 
    118         """
    119         # The diffs if this test fails can be rather long.
    120         self.maxDiff = None
    121         self._at_transceiver._at_to_wm_action_map = {}
    122 
    123         # Test initialization
    124         raw_map = {'AT1=': ('STATE_MACHINE1', 'function1'),
    125                    'AT2=1,2': ('STATE_MACHINE2', 'function2'),
    126                    'AT3=*,care,do': ('STATE_MACHINE3', 'function3', (0, 1)),
    127                    'AT4?': ('STATE_MACHINE4', 'function4'),
    128                    'AT5=': ('STATE_MACHINE5', 'function5', ()),
    129                    'AT5=*': ('STATE_MACHINE6', 'function6')}
    130         parsed_map = {'AT1=': {(): ('STATE_MACHINE1', 'function1', ())},
    131                       'AT2=': {('1','2'): ('STATE_MACHINE2', 'function2', ())},
    132                       'AT3=': {('*','care','do'): ('STATE_MACHINE3',
    133                                                    'function3', (0, 1))},
    134                       'AT4?': {(): ('STATE_MACHINE4', 'function4', ())},
    135                       'AT5=': {(): ('STATE_MACHINE5', 'function5', ()),
    136                                ('*',): ('STATE_MACHINE6', 'function6', ())}}
    137 
    138         self._at_transceiver._update_at_to_wm_action_map(raw_map)
    139         self.assertEqual(parsed_map, self._at_transceiver._at_to_wm_action_map)
    140 
    141         # Test update
    142         raw_good_update = {'AT1=': ('STATE_MACHINE7', 'function7'),
    143                            'AT5=2': ('STATE_MACHINE8', 'function8', 0),
    144                            'AT6?': ('STATE_MACHINE9', 'function9')}
    145         parsed_map = {'AT1=': {(): ('STATE_MACHINE7', 'function7', ())},
    146                       'AT2=': {('1','2'): ('STATE_MACHINE2', 'function2', ())},
    147                       'AT3=': {('*','care','do'): ('STATE_MACHINE3',
    148                                                    'function3', (0, 1))},
    149                       'AT4?': {(): ('STATE_MACHINE4', 'function4', ())},
    150                       'AT5=': {(): ('STATE_MACHINE5', 'function5', ()),
    151                                ('*',): ('STATE_MACHINE6', 'function6', ()),
    152                                ('2',): ('STATE_MACHINE8', 'function8', (0,))},
    153                       'AT6?': {(): ('STATE_MACHINE9', 'function9', ())}}
    154         self._at_transceiver._update_at_to_wm_action_map(raw_good_update)
    155         self.assertEqual(parsed_map, self._at_transceiver._at_to_wm_action_map)
    156 
    157 
    158     def test_find_wardmodem_action_for_at(self):
    159         """
    160         Setup _at_to_wm_action_map in the test and then test whether we can find
    161         actions for AT commands off of that map.
    162 
    163         """
    164         raw_map = {'AT1=': ('STATE_MACHINE1', 'function1'),
    165                    'AT2=1,2': ('STATE_MACHINE2', 'function2'),
    166                    'AT3=*,b,c': ('STATE_MACHINE3', 'function3', (0, 1)),
    167                    'AT4?': ('STATE_MACHINE4', 'function4'),
    168                    'AT5=': ('STATE_MACHINE5', 'function5', ()),
    169                    'AT5=*': ('STATE_MACHINE6', 'function6')}
    170         self._at_transceiver._update_at_to_wm_action_map(raw_map)
    171 
    172         self.assertEqual(
    173                 ('STATE_MACHINE1', 'function1', ()),
    174                 self._at_transceiver._find_wardmodem_action_for_at('AT1='))
    175         self.assertEqual(
    176                 ('STATE_MACHINE2', 'function2', ()),
    177                 self._at_transceiver._find_wardmodem_action_for_at('AT2=1,2'))
    178         self.assertEqual(
    179                 ('STATE_MACHINE3', 'function3', ('a','b')),
    180                 self._at_transceiver._find_wardmodem_action_for_at('AT3=a,b,c'))
    181         self.assertEqual(
    182                 ('STATE_MACHINE3', 'function3', ('','b')),
    183                 self._at_transceiver._find_wardmodem_action_for_at('AT3=,b,c'))
    184         self.assertEqual(
    185                 ('STATE_MACHINE5', 'function5', ()),
    186                 self._at_transceiver._find_wardmodem_action_for_at('AT5='))
    187         self.assertEqual(
    188                 ('STATE_MACHINE6', 'function6', ()),
    189                 self._at_transceiver._find_wardmodem_action_for_at('AT5=s'))
    190         # Unsuccessful cases
    191         self.assertRaises(
    192                 wme.ATTransceiverException,
    193                 self._at_transceiver._find_wardmodem_action_for_at,
    194                 'DOESNOTEXIST')
    195 
    196 
    197     def test_find_wardmodem_action_for_at_returns_fallback(self):
    198         """
    199         Test that when a fallback machine is setup, and unmatched AT command is
    200         forwarded to this machine.
    201 
    202         """
    203         mock_test_machine = self._mox.CreateMock(self.TestMachine)
    204         mock_test_machine.get_well_known_name().MultipleTimes().AndReturn(
    205                 'FALLBACK_MACHINE')
    206         self._mox.ReplayAll()
    207         self._at_transceiver.register_state_machine(mock_test_machine)
    208         self._at_transceiver.register_fallback_state_machine(
    209                 mock_test_machine.get_well_known_name(),
    210                 'act_on')
    211         self.assertEqual(
    212                 ('FALLBACK_MACHINE', 'act_on', ('DOESNOTEXIST',)),
    213                 self._at_transceiver._find_wardmodem_action_for_at(
    214                         'DOESNOTEXIST'))
    215         self._mox.VerifyAll()
    216 
    217 
    218     def test_post_wardmodem_request(self):
    219         """
    220         Test that a wardmodem request can be posted successfully end-to-end.
    221 
    222         """
    223         raw_map = {'AT=*': ('TestMachine', 'test_function', 0)}
    224         arg = 'fake_arg'
    225         command = 'AT=' + arg
    226         mock_test_machine = self._mox.CreateMock(self.TestMachine)
    227         self._at_transceiver._update_at_to_wm_action_map(raw_map)
    228         mock_test_machine.get_well_known_name().AndReturn('TestMachine')
    229         self._mock_task_loop.post_task(
    230                 self._at_transceiver._execute_state_machine_function,
    231                 command, mox.IgnoreArg(), mock_test_machine.test_function,
    232                 arg)
    233 
    234         self._mox.ReplayAll()
    235         self._at_transceiver.register_state_machine(mock_test_machine)
    236         self._at_transceiver._post_wardmodem_request(command)
    237         self._mox.VerifyAll()
    238 
    239 
    240     def test_update_wm_response_to_at_map(self):
    241         """
    242         Test that the wm_response_to_at_map is correctly updated.
    243 
    244         """
    245         raw_map = {'some_function': 'AT=some_function',
    246                    'some_other_function': 'AT=some_other_function'}
    247         self._at_transceiver._update_wm_response_to_at_map(raw_map)
    248         self.assertEqual(raw_map,
    249                          self._at_transceiver._wm_response_to_at_map)
    250 
    251         raw_map = {'some_other_function': 'AT=overwritten_function',
    252                    'some_new_function': 'AT=this_is_new_too'}
    253         updated_map = {'some_function': 'AT=some_function',
    254                        'some_other_function': 'AT=overwritten_function',
    255                        'some_new_function': 'AT=this_is_new_too'}
    256         self._at_transceiver._update_wm_response_to_at_map(raw_map)
    257         self.assertEqual(updated_map,
    258                          self._at_transceiver._wm_response_to_at_map)
    259 
    260 
    261     def test_construct_at_response(self):
    262         """
    263         Test that construct_at_response correctly replaces by actual arguments.
    264 
    265         """
    266         self.assertEqual(
    267                 'AT=arg1,some,arg2',
    268                 self._at_transceiver._construct_at_response(
    269                         'AT=*,some,*', 'arg1','arg2'))
    270         self.assertEqual(
    271                 'AT=1,some,thing',
    272                 self._at_transceiver._construct_at_response(
    273                         'AT=*,some,thing', 1))
    274         self.assertEqual(
    275                 'AT=some,other,thing',
    276                 self._at_transceiver._construct_at_response(
    277                         'AT=some,other,thing'))
    278         self.assertEqual(
    279                 'AT=needsnone',
    280                 self._at_transceiver._construct_at_response(
    281                         'AT=needsnone', 'butonegiven'))
    282         # Unsuccessful cases
    283         self.assertRaises(
    284                 wme.ATTransceiverException,
    285                 self._at_transceiver._construct_at_response,
    286                 'AT=*,needstwo,*', 'onlyonegiven')
    287 
    288 
    289     def test_process_wardmodem_response(self):
    290         """
    291         A basic test for process_wardmodem_response.
    292 
    293         """
    294         self._mox.StubOutWithMock(self._at_transceiver,
    295                              '_process_wardmodem_at_command')
    296         raw_map = {'func1': 'AT=*,given,*',
    297                    'func2': 'AT=nothing,needed'}
    298         self._at_transceiver._update_wm_response_to_at_map(raw_map)
    299 
    300         self._at_transceiver._process_wardmodem_at_command('AT=a,given,2')
    301         self._at_transceiver._process_wardmodem_at_command('AT=nothing,needed')
    302 
    303         self._mox.ReplayAll()
    304         self._at_transceiver.process_wardmodem_response('func1','a',2)
    305         self._at_transceiver.process_wardmodem_response('func2')
    306         self._mox.UnsetStubs()
    307         self._mox.VerifyAll()
    308 
    309 
    310 class ATTransceiverWardModemTestCase(ATTransceiverTestCase):
    311     """
    312     Test ATTransceiver class in the WARDMODEM mode.
    313 
    314     """
    315 
    316     def setUp(self):
    317         super(ATTransceiverWardModemTestCase, self).setUp()
    318         self._at_transceiver.mode = at_transceiver.ATTransceiverMode.WARDMODEM
    319 
    320 
    321     def test_wardmodem_at_command(self):
    322         """
    323         Test the case when AT command is received from wardmodem.
    324 
    325         """
    326         at_command = 'AT+commmmmmmmmand'
    327         self._mock_mm_channel.send(at_command)
    328 
    329         self._mox.ReplayAll()
    330         self._at_transceiver._process_wardmodem_at_command(at_command)
    331         self._mox.VerifyAll()
    332 
    333 
    334     def test_mm_at_command(self):
    335         """
    336         Test the case when AT command is received from modem manager.
    337 
    338         """
    339         at_command = 'AT+commmmmmmmmand'
    340         self._mox.StubOutWithMock(self._at_transceiver,
    341                                   '_post_wardmodem_request')
    342 
    343         self._at_transceiver._post_wardmodem_request(at_command)
    344 
    345         self._mox.ReplayAll()
    346         self._at_transceiver._process_mm_at_command(at_command)
    347         self._mox.UnsetStubs()
    348         self._mox.VerifyAll()
    349 
    350 
    351 class ATTransceiverPassThroughTestCase(ATTransceiverTestCase):
    352     """
    353     Test ATTransceiver class in the PASS_THROUGH mode.
    354 
    355     """
    356 
    357     def setUp(self):
    358         super(ATTransceiverPassThroughTestCase, self).setUp()
    359         self._at_transceiver.mode = (
    360                 at_transceiver.ATTransceiverMode.PASS_THROUGH)
    361 
    362 
    363     def test_modem_at_command(self):
    364         """
    365         Test the case when AT command received from physical modem.
    366 
    367         """
    368         at_command = 'AT+commmmmmmmmand'
    369         self._mock_mm_channel.send(at_command)
    370 
    371         self._mox.ReplayAll()
    372         self._at_transceiver._process_modem_at_command(at_command)
    373         self._mox.VerifyAll()
    374 
    375 
    376     def test_mm_at_command(self):
    377         """
    378         Test the case when AT command is received from modem manager.
    379 
    380         """
    381         at_command = 'AT+commmmmmmmmand'
    382         self._mock_modem_channel.send(at_command)
    383 
    384         self._mox.ReplayAll()
    385         self._at_transceiver._process_mm_at_command(at_command)
    386         self._mox.VerifyAll()
    387 
    388 
    389 class ATTransceiverSplitVerifyTestCase(ATTransceiverTestCase):
    390     """
    391     Test ATTransceiver class in the SPLIT_VERIFY mode.
    392 
    393     """
    394 
    395     def setUp(self):
    396         super(ATTransceiverSplitVerifyTestCase, self).setUp()
    397         self._at_transceiver.mode = (
    398                 at_transceiver.ATTransceiverMode.SPLIT_VERIFY)
    399 
    400 
    401     def test_mm_at_command(self):
    402         """
    403         Test that that incoming modem manager command is multiplexed to
    404         wardmodem and physical modem.
    405 
    406         """
    407         at_command = 'AT+commmmmmmmmand'
    408         self._mox.StubOutWithMock(self._at_transceiver,
    409                                   '_post_wardmodem_request')
    410         self._mock_modem_channel.send(at_command).InAnyOrder()
    411         self._at_transceiver._post_wardmodem_request(at_command).InAnyOrder()
    412 
    413         self._mox.ReplayAll()
    414         self._at_transceiver._process_mm_at_command(at_command)
    415         self._mox.UnsetStubs()
    416         self._mox.VerifyAll()
    417 
    418 
    419     def test_successful_single_at_response_modem_wardmodem(self):
    420         """
    421         Test the case when one AT response is received successfully.
    422         In this case, physical modem command comes first.
    423 
    424         """
    425         at_command = 'AT+commmmmmmmmand'
    426         self._mock_mm_channel.send(at_command)
    427 
    428         self._mox.ReplayAll()
    429         self._at_transceiver._process_modem_at_command(at_command)
    430         self._at_transceiver._process_wardmodem_at_command(at_command)
    431         self._mox.VerifyAll()
    432 
    433 
    434     def test_successful_single_at_response_wardmodem_modem(self):
    435         """
    436         Test the case when one AT response is received successfully.
    437         In this case, wardmodem command comes first.
    438 
    439         """
    440         at_command = 'AT+commmmmmmmmand'
    441         task_id = 3
    442         self._mock_task_loop.post_task_after_delay(
    443                 self._at_transceiver._modem_response_timed_out,
    444                 mox.IgnoreArg()).AndReturn(task_id)
    445         self._mock_task_loop.cancel_posted_task(task_id)
    446         self._mock_mm_channel.send(at_command)
    447 
    448         self._mox.ReplayAll()
    449         self._at_transceiver._process_wardmodem_at_command(at_command)
    450         self._at_transceiver._process_modem_at_command(at_command)
    451         self._mox.VerifyAll()
    452 
    453     def test_mismatched_at_response(self):
    454         """
    455         Test the case when both responses arrive, but are not identical.
    456 
    457         """
    458         wardmodem_command = 'AT+wardmodem'
    459         modem_command = 'AT+modem'
    460         self._mox.StubOutWithMock(self._at_transceiver,
    461                                   '_report_verification_failure')
    462         self._at_transceiver._report_verification_failure(
    463                 self._at_transceiver.VERIFICATION_FAILED_MISMATCH,
    464                 modem_command,
    465                 wardmodem_command)
    466         self._mock_mm_channel.send(wardmodem_command)
    467 
    468         self._mox.ReplayAll()
    469         self._at_transceiver._process_modem_at_command(modem_command)
    470         self._at_transceiver._process_wardmodem_at_command(wardmodem_command)
    471         self._mox.UnsetStubs()
    472         self._mox.VerifyAll()
    473 
    474 
    475     def test_modem_response_times_out(self):
    476         """
    477         Test the case when the physical modem fails to respond.
    478 
    479         """
    480         at_command = 'AT+commmmmmmmmand'
    481         task_id = 3
    482         self._mox.StubOutWithMock(self._at_transceiver,
    483                                   '_report_verification_failure')
    484 
    485         self._mock_task_loop.post_task_after_delay(
    486                 self._at_transceiver._modem_response_timed_out,
    487                 mox.IgnoreArg()).AndReturn(task_id)
    488         self._at_transceiver._report_verification_failure(
    489                 self._at_transceiver.VERIFICATION_FAILED_TIME_OUT,
    490                 None,
    491                 at_command)
    492         self._mock_mm_channel.send(at_command)
    493 
    494         self._mox.ReplayAll()
    495         self._at_transceiver._process_wardmodem_at_command(at_command)
    496         self._at_transceiver._modem_response_timed_out()
    497         self._mox.UnsetStubs()
    498         self._mox.VerifyAll()
    499 
    500 
    501     def test_multiple_successful_responses(self):
    502         """
    503         Test the case two wardmodem responses are queued, and then two matching
    504         modem responses are received.
    505 
    506         """
    507         first_at_command = 'AT+first'
    508         second_at_command = 'AT+second'
    509         first_task_id = 3
    510         second_task_id = 4
    511 
    512         self._mock_task_loop.post_task_after_delay(
    513                 self._at_transceiver._modem_response_timed_out,
    514                 mox.IgnoreArg()).AndReturn(first_task_id)
    515         self._mock_task_loop.cancel_posted_task(first_task_id)
    516         self._mock_mm_channel.send(first_at_command)
    517         self._mock_task_loop.post_task_after_delay(
    518                 self._at_transceiver._modem_response_timed_out,
    519                 mox.IgnoreArg()).AndReturn(second_task_id)
    520         self._mock_task_loop.cancel_posted_task(second_task_id)
    521         self._mock_mm_channel.send(second_at_command)
    522 
    523         self._mox.ReplayAll()
    524         self._at_transceiver._process_wardmodem_at_command(first_at_command)
    525         self._at_transceiver._process_wardmodem_at_command(second_at_command)
    526         self._at_transceiver._process_modem_at_command(first_at_command)
    527         self._at_transceiver._process_modem_at_command(second_at_command)
    528         self._mox.VerifyAll()
    529 
    530 
    531 if __name__ == '__main__':
    532     logging.basicConfig(level=logging.DEBUG)
    533     unittest.main()
    534