Home | History | Annotate | Download | only in wardmodem
      1 #!/usr/bin/env python
      2 
      3 # Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
      4 # Use of this source code is governed by a BSD-style license that can be
      5 # found in the LICENSE file.
      6 
      7 import at_channel
      8 
      9 import fcntl
     10 import functools
     11 import glib
     12 import logging
     13 import mox
     14 import os
     15 import tempfile
     16 import unittest
     17 
     18 import task_loop
     19 
     20 class ATChannelTestCase(unittest.TestCase):
     21     """
     22     Test fixture for ATChannel class.
     23 
     24     """
     25 
     26     def setUp(self):
     27         self.mox = mox.Mox()
     28 
     29         master, slave = os.openpty()
     30         self._at_channel = at_channel.ATChannel(
     31                 self._recieve_command_local_callback, slave, 'test')
     32 
     33         # Replace the channel inside _at_channel with a tempfile
     34         # We will use the tempfile to simulate a tty pair.
     35         os.close(master)
     36         os.close(slave)
     37         self._channel_file = tempfile.TemporaryFile(mode = 'w+')
     38         # These properties are a copy of the properties set in ATChannel for the
     39         # tty pair.
     40         flags = fcntl.fcntl(self._channel_file.fileno(), fcntl.F_GETFL)
     41         flags = flags | os.O_NONBLOCK
     42         fcntl.fcntl(self._channel_file.fileno(), fcntl.F_SETFL, flags)
     43         self._at_channel._channel = self._channel_file.fileno()
     44         # We need to seek() to the beginning of the file to simulate tty read.
     45         # So remember the head of the file.
     46         self._channel_file_head = self._channel_file.tell()
     47 
     48         # Also mock out the task_loop
     49         self._mox_task_loop = self.mox.CreateMock(task_loop.TaskLoop)
     50         self._at_channel._task_loop = self._mox_task_loop
     51 
     52 
     53     def tearDown(self):
     54         self._channel_file.close()
     55 
     56     # ##########################################################################
     57     # Tests
     58 
     59     def test_successful_send(self):
     60         """
     61         Test that a single AT command can be sent on the channel.
     62 
     63         """
     64         payload = 'A not so huge AT+CEREG command.'
     65         self._at_channel.send(payload)
     66         received_command = self._recieve_command_remote()
     67         self.assertTrue(received_command.endswith('\r\n'))
     68         self.assertEqual(payload.strip(), received_command.strip())
     69 
     70         # Change the AT command guard strings and check again.
     71         self._at_channel.at_prefix = '$$'
     72         self._at_channel.at_suffix = '##'
     73         payload = 'A not so huge AT+CEREG command.'
     74         self._at_channel.send(payload)
     75         received_command = self._recieve_command_remote()
     76         self.assertTrue(received_command.startswith('$$'))
     77         self.assertTrue(received_command.endswith('##'))
     78         self.assertEqual(payload.strip(),
     79                          received_command.strip('$$').strip('##'))
     80 
     81 
     82     def test_recieve_single_at_command(self):
     83         """
     84         Test that a single AT command can be received together on the channel.
     85 
     86         """
     87         payload = 'We send you our AT+good wishes too!\r\n'
     88         callback = lambda channel, payload: None
     89         self._at_channel._receiver_callback = callback
     90         self._mox_task_loop.post_task(callback, payload.strip())
     91         self.mox.ReplayAll()
     92         self._send_command_remote(payload)
     93         self._at_channel._handle_channel_cb(self._channel_file.fileno(),
     94                                           glib.IO_IN)
     95         self.mox.VerifyAll()
     96 
     97 
     98     def test_receive_at_commands_differet_terminators(self):
     99         """
    100         Test that AT commands are recieved correctly when different supported
    101         termination strings are being used.
    102 
    103         """
    104         # ; is a continuation marker. AT1;2 == AT1\r\nAT2
    105         payloads = ['AT1\r\nA', 'T2\rA', 'T3\nA', 'T4;', '5\r\n']
    106         callback = lambda channel, payload: None
    107         self._at_channel._receiver_callback = callback
    108         self._mox_task_loop.post_task(callback, 'AT1')
    109         self._mox_task_loop.post_task(callback, 'AT2')
    110         self._mox_task_loop.post_task(callback, 'AT3')
    111         self._mox_task_loop.post_task(callback, 'AT4')
    112         self._mox_task_loop.post_task(callback, 'AT5')
    113 
    114         self.mox.ReplayAll()
    115         for payload in payloads:
    116             self._send_command_remote(payload)
    117             self._at_channel._handle_channel_cb(self._channel_file.fileno(),
    118                                                 glib.IO_IN)
    119         self.mox.VerifyAll()
    120 
    121 
    122     def test_recieve_at_commands_in_parts(self):
    123         """
    124         Test that a multiple AT commands can be received in parts on the
    125         channel.
    126 
    127         """
    128         payloads = ['AT1', '11\r\n', '\r\nAT22', '2\r\nAT333', '\r\n']
    129         callback = lambda channel, payload: None
    130         self._at_channel._receiver_callback = callback
    131         self._mox_task_loop.post_task(callback, 'AT111')
    132         self._mox_task_loop.post_task(callback, 'AT222')
    133         self._mox_task_loop.post_task(callback, 'AT333')
    134 
    135         self.mox.ReplayAll()
    136         for payload in payloads:
    137             self._send_command_remote(payload)
    138             self._at_channel._handle_channel_cb(self._channel_file.fileno(),
    139                                                 glib.IO_IN)
    140         self.mox.VerifyAll()
    141 
    142 
    143     def test_recieve_long_at_commands(self):
    144         """
    145         Test that a multiple AT commands can be received in parts on the
    146         channel.
    147 
    148         """
    149         payloads = ['AT1+',
    150                     '123456789\r\nAT2+123456789\r\nAT3+1234567',
    151                     '89\r\n']
    152         callback = lambda channel, payload: None
    153         self._at_channel._receiver_callback = callback
    154         self._mox_task_loop.post_task(callback, 'AT1+123456789')
    155         self._mox_task_loop.post_task(callback, 'AT2+123456789')
    156         self._mox_task_loop.post_task(callback, 'AT3+123456789')
    157 
    158         self.mox.ReplayAll()
    159         at_channel.CHANNEL_READ_CHUNK_SIZE = 4
    160         for payload in payloads:
    161             self._send_command_remote(payload)
    162             self._at_channel._handle_channel_cb(self._channel_file.fileno(),
    163                                                 glib.IO_IN)
    164         self.mox.VerifyAll()
    165 
    166     # ##########################################################################
    167     # Helper functions
    168 
    169     def _clean_channel_file(self):
    170         """
    171         Clean the tempfile used to simulate tty, and reset the r/w head.
    172 
    173         """
    174         self._channel_file.truncate(0)
    175         self._channel_file_head = self._channel_file.tell()
    176 
    177 
    178     def _send_command_remote(self, payload):
    179         """
    180         Simulate a command being sent from the remote tty port.
    181 
    182         @param payload: The command to send.
    183 
    184         """
    185         self._clean_channel_file()
    186         self._channel_file.write(payload)
    187         self._channel_file.flush()
    188         self._channel_file.seek(self._channel_file_head)
    189 
    190 
    191     def _recieve_command_remote(self):
    192         """
    193         Simluate a command being received at the remote tty port.
    194 
    195         """
    196         self._channel_file.flush()
    197         self._channel_file.seek(self._channel_file_head)
    198         payload_list = []
    199         for buf in iter(functools.partial(self._channel_file.read, 128), ''):
    200             payload_list.append(buf)
    201         self._clean_channel_file()
    202         return ''.join(payload_list)
    203 
    204 
    205     def _recieve_command_local_callback(self, payload):
    206         pass
    207 
    208 
if __name__ == '__main__':
    # When run as a script, enable debug-level logging and then run every
    # test case defined in this module.
    logging.basicConfig(level=logging.DEBUG)
    unittest.main()
    212