Home | History | Annotate | Download | only in test
      1 #!/usr/bin/env python
      2 #
      3 # Copyright 2011, Google Inc.
      4 # All rights reserved.
      5 #
      6 # Redistribution and use in source and binary forms, with or without
      7 # modification, are permitted provided that the following conditions are
      8 # met:
      9 #
     10 #     * Redistributions of source code must retain the above copyright
     11 # notice, this list of conditions and the following disclaimer.
     12 #     * Redistributions in binary form must reproduce the above
     13 # copyright notice, this list of conditions and the following disclaimer
     14 # in the documentation and/or other materials provided with the
     15 # distribution.
     16 #     * Neither the name of Google Inc. nor the names of its
     17 # contributors may be used to endorse or promote products derived from
     18 # this software without specific prior written permission.
     19 #
     20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     23 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     24 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     25 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     26 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     27 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     28 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     29 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     30 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     31 
     32 
     33 """Tests for dispatch module."""
     34 
     35 
     36 import os
     37 import unittest
     38 
     39 import set_sys_path  # Update sys.path to locate mod_pywebsocket module.
     40 
     41 from mod_pywebsocket import dispatch
     42 from mod_pywebsocket import handshake
     43 from test import mock
     44 
     45 
     46 _TEST_HANDLERS_DIR = os.path.join(
     47         os.path.split(__file__)[0], 'testdata', 'handlers')
     48 
     49 _TEST_HANDLERS_SUB_DIR = os.path.join(_TEST_HANDLERS_DIR, 'sub')
     50 
     51 
     52 class DispatcherTest(unittest.TestCase):
     53     """A unittest for dispatch module."""
     54 
     55     def test_normalize_path(self):
     56         self.assertEqual(os.path.abspath('/a/b').replace('\\', '/'),
     57                          dispatch._normalize_path('/a/b'))
     58         self.assertEqual(os.path.abspath('/a/b').replace('\\', '/'),
     59                          dispatch._normalize_path('\\a\\b'))
     60         self.assertEqual(os.path.abspath('/a/b').replace('\\', '/'),
     61                          dispatch._normalize_path('/a/c/../b'))
     62         self.assertEqual(os.path.abspath('abc').replace('\\', '/'),
     63                          dispatch._normalize_path('abc'))
     64 
     65     def test_converter(self):
     66         converter = dispatch._create_path_to_resource_converter('/a/b')
     67         # Python built by MSC inserts a drive name like 'C:\' via realpath().
     68         # Converter Generator expands provided path using realpath() and uses
     69         # the path including a drive name to verify the prefix.
     70         os_root = os.path.realpath('/')
     71         self.assertEqual('/h', converter(os_root + 'a/b/h_wsh.py'))
     72         self.assertEqual('/c/h', converter(os_root + 'a/b/c/h_wsh.py'))
     73         self.assertEqual(None, converter(os_root + 'a/b/h.py'))
     74         self.assertEqual(None, converter('a/b/h_wsh.py'))
     75 
     76         converter = dispatch._create_path_to_resource_converter('a/b')
     77         self.assertEqual('/h', converter(dispatch._normalize_path(
     78             'a/b/h_wsh.py')))
     79 
     80         converter = dispatch._create_path_to_resource_converter('/a/b///')
     81         self.assertEqual('/h', converter(os_root + 'a/b/h_wsh.py'))
     82         self.assertEqual('/h', converter(dispatch._normalize_path(
     83             '/a/b/../b/h_wsh.py')))
     84 
     85         converter = dispatch._create_path_to_resource_converter(
     86             '/a/../a/b/../b/')
     87         self.assertEqual('/h', converter(os_root + 'a/b/h_wsh.py'))
     88 
     89         converter = dispatch._create_path_to_resource_converter(r'\a\b')
     90         self.assertEqual('/h', converter(os_root + r'a\b\h_wsh.py'))
     91         self.assertEqual('/h', converter(os_root + r'a/b/h_wsh.py'))
     92 
     93     def test_enumerate_handler_file_paths(self):
     94         paths = list(
     95             dispatch._enumerate_handler_file_paths(_TEST_HANDLERS_DIR))
     96         paths.sort()
     97         self.assertEqual(8, len(paths))
     98         expected_paths = [
     99                 os.path.join(_TEST_HANDLERS_DIR, 'abort_by_user_wsh.py'),
    100                 os.path.join(_TEST_HANDLERS_DIR, 'blank_wsh.py'),
    101                 os.path.join(_TEST_HANDLERS_DIR, 'origin_check_wsh.py'),
    102                 os.path.join(_TEST_HANDLERS_DIR, 'sub',
    103                              'exception_in_transfer_wsh.py'),
    104                 os.path.join(_TEST_HANDLERS_DIR, 'sub', 'non_callable_wsh.py'),
    105                 os.path.join(_TEST_HANDLERS_DIR, 'sub', 'plain_wsh.py'),
    106                 os.path.join(_TEST_HANDLERS_DIR, 'sub',
    107                              'wrong_handshake_sig_wsh.py'),
    108                 os.path.join(_TEST_HANDLERS_DIR, 'sub',
    109                              'wrong_transfer_sig_wsh.py'),
    110                 ]
    111         for expected, actual in zip(expected_paths, paths):
    112             self.assertEqual(expected, actual)
    113 
    114     def test_source_handler_file(self):
    115         self.assertRaises(
    116             dispatch.DispatchException, dispatch._source_handler_file, '')
    117         self.assertRaises(
    118             dispatch.DispatchException, dispatch._source_handler_file, 'def')
    119         self.assertRaises(
    120             dispatch.DispatchException, dispatch._source_handler_file, '1/0')
    121         self.failUnless(dispatch._source_handler_file(
    122                 'def web_socket_do_extra_handshake(request):pass\n'
    123                 'def web_socket_transfer_data(request):pass\n'))
    124 
    125     def test_source_warnings(self):
    126         dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
    127         warnings = dispatcher.source_warnings()
    128         warnings.sort()
    129         expected_warnings = [
    130                 (os.path.realpath(os.path.join(
    131                     _TEST_HANDLERS_DIR, 'blank_wsh.py')) +
    132                  ': web_socket_do_extra_handshake is not defined.'),
    133                 (os.path.realpath(os.path.join(
    134                     _TEST_HANDLERS_DIR, 'sub', 'non_callable_wsh.py')) +
    135                  ': web_socket_do_extra_handshake is not callable.'),
    136                 (os.path.realpath(os.path.join(
    137                     _TEST_HANDLERS_DIR, 'sub', 'wrong_handshake_sig_wsh.py')) +
    138                  ': web_socket_do_extra_handshake is not defined.'),
    139                 (os.path.realpath(os.path.join(
    140                     _TEST_HANDLERS_DIR, 'sub', 'wrong_transfer_sig_wsh.py')) +
    141                  ': web_socket_transfer_data is not defined.'),
    142                 ]
    143         self.assertEquals(4, len(warnings))
    144         for expected, actual in zip(expected_warnings, warnings):
    145             self.assertEquals(expected, actual)
    146 
    147     def test_do_extra_handshake(self):
    148         dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
    149         request = mock.MockRequest()
    150         request.ws_resource = '/origin_check'
    151         request.ws_origin = 'http://example.com'
    152         dispatcher.do_extra_handshake(request)  # Must not raise exception.
    153 
    154         request.ws_origin = 'http://bad.example.com'
    155         try:
    156             dispatcher.do_extra_handshake(request)
    157             self.fail('Could not catch HandshakeException with 403 status')
    158         except handshake.HandshakeException, e:
    159             self.assertEquals(403, e.status)
    160         except Exception, e:
    161             self.fail('Unexpected exception: %r' % e)
    162 
    163     def test_abort_extra_handshake(self):
    164         dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
    165         request = mock.MockRequest()
    166         request.ws_resource = '/abort_by_user'
    167         self.assertRaises(handshake.AbortedByUserException,
    168                           dispatcher.do_extra_handshake, request)
    169 
    170     def test_transfer_data(self):
    171         dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
    172 
    173         request = mock.MockRequest(connection=mock.MockConn('\xff\x00'))
    174         request.ws_resource = '/origin_check'
    175         request.ws_protocol = 'p1'
    176         dispatcher.transfer_data(request)
    177         self.assertEqual('origin_check_wsh.py is called for /origin_check, p1'
    178                          '\xff\x00',
    179                          request.connection.written_data())
    180 
    181         request = mock.MockRequest(connection=mock.MockConn('\xff\x00'))
    182         request.ws_resource = '/sub/plain'
    183         request.ws_protocol = None
    184         dispatcher.transfer_data(request)
    185         self.assertEqual('sub/plain_wsh.py is called for /sub/plain, None'
    186                          '\xff\x00',
    187                          request.connection.written_data())
    188 
    189         request = mock.MockRequest(connection=mock.MockConn('\xff\x00'))
    190         request.ws_resource = '/sub/plain?'
    191         request.ws_protocol = None
    192         dispatcher.transfer_data(request)
    193         self.assertEqual('sub/plain_wsh.py is called for /sub/plain?, None'
    194                          '\xff\x00',
    195                          request.connection.written_data())
    196 
    197         request = mock.MockRequest(connection=mock.MockConn('\xff\x00'))
    198         request.ws_resource = '/sub/plain?q=v'
    199         request.ws_protocol = None
    200         dispatcher.transfer_data(request)
    201         self.assertEqual('sub/plain_wsh.py is called for /sub/plain?q=v, None'
    202                          '\xff\x00',
    203                          request.connection.written_data())
    204 
    205     def test_transfer_data_no_handler(self):
    206         dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
    207         for resource in ['/blank', '/sub/non_callable',
    208                          '/sub/no_wsh_at_the_end', '/does/not/exist']:
    209             request = mock.MockRequest(connection=mock.MockConn(''))
    210             request.ws_resource = resource
    211             request.ws_protocol = 'p2'
    212             try:
    213                 dispatcher.transfer_data(request)
    214                 self.fail()
    215             except dispatch.DispatchException, e:
    216                 self.failUnless(str(e).find('No handler') != -1)
    217             except Exception:
    218                 self.fail()
    219 
    220     def test_transfer_data_handler_exception(self):
    221         dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
    222         request = mock.MockRequest(connection=mock.MockConn(''))
    223         request.ws_resource = '/sub/exception_in_transfer'
    224         request.ws_protocol = 'p3'
    225         try:
    226             dispatcher.transfer_data(request)
    227             self.fail()
    228         except Exception, e:
    229             self.failUnless(str(e).find('Intentional') != -1,
    230                             'Unexpected exception: %s' % e)
    231 
    232     def test_abort_transfer_data(self):
    233         dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
    234         request = mock.MockRequest()
    235         request.ws_resource = '/abort_by_user'
    236         self.assertRaises(handshake.AbortedByUserException,
    237                           dispatcher.transfer_data, request)
    238 
    239     def test_scan_dir(self):
    240         disp = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
    241         self.assertEqual(4, len(disp._handler_suite_map))
    242         self.failUnless('/origin_check' in disp._handler_suite_map)
    243         self.failUnless(
    244             '/sub/exception_in_transfer' in disp._handler_suite_map)
    245         self.failUnless('/sub/plain' in disp._handler_suite_map)
    246 
    247     def test_scan_sub_dir(self):
    248         disp = dispatch.Dispatcher(_TEST_HANDLERS_DIR, _TEST_HANDLERS_SUB_DIR)
    249         self.assertEqual(2, len(disp._handler_suite_map))
    250         self.failIf('/origin_check' in disp._handler_suite_map)
    251         self.failUnless(
    252             '/sub/exception_in_transfer' in disp._handler_suite_map)
    253         self.failUnless('/sub/plain' in disp._handler_suite_map)
    254 
    255     def test_scan_sub_dir_as_root(self):
    256         disp = dispatch.Dispatcher(_TEST_HANDLERS_SUB_DIR,
    257                                    _TEST_HANDLERS_SUB_DIR)
    258         self.assertEqual(2, len(disp._handler_suite_map))
    259         self.failIf('/origin_check' in disp._handler_suite_map)
    260         self.failIf('/sub/exception_in_transfer' in disp._handler_suite_map)
    261         self.failIf('/sub/plain' in disp._handler_suite_map)
    262         self.failUnless('/exception_in_transfer' in disp._handler_suite_map)
    263         self.failUnless('/plain' in disp._handler_suite_map)
    264 
    265     def test_scan_dir_must_under_root(self):
    266         dispatch.Dispatcher('a/b', 'a/b/c')  # OK
    267         dispatch.Dispatcher('a/b///', 'a/b')  # OK
    268         self.assertRaises(dispatch.DispatchException,
    269                           dispatch.Dispatcher, 'a/b/c', 'a/b')
    270 
    271     def test_resource_path_alias(self):
    272         disp = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
    273         disp.add_resource_path_alias('/', '/origin_check')
    274         self.assertEqual(5, len(disp._handler_suite_map))
    275         self.failUnless('/origin_check' in disp._handler_suite_map)
    276         self.failUnless(
    277             '/sub/exception_in_transfer' in disp._handler_suite_map)
    278         self.failUnless('/sub/plain' in disp._handler_suite_map)
    279         self.failUnless('/' in disp._handler_suite_map)
    280         self.assertRaises(dispatch.DispatchException,
    281                           disp.add_resource_path_alias, '/alias', '/not-exist')
    282 
    283 
# Run the tests in this module when the file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
    286 
    287 
    288 # vi:sts=4 sw=4 et
    289