Home | History | Annotate | Download | only in tests
      1 # (c) 2005 Ben Bangert
      2 # This module is part of the Python Paste Project and is released under
      3 # the MIT License: http://www.opensource.org/licenses/mit-license.php
      4 from nose.tools import assert_raises
      5 
      6 from paste.fixture import *
      7 from paste.registry import *
      8 from paste.registry import Registry
      9 from paste.evalexception.middleware import EvalException
     10 
     11 regobj = StackedObjectProxy()
     12 secondobj = StackedObjectProxy(default=dict(hi='people'))
     13 
     14 def simpleapp(environ, start_response):
     15     status = '200 OK'
     16     response_headers = [('Content-type','text/plain')]
     17     start_response(status, response_headers)
     18     return [b'Hello world!\n']
     19 
     20 def simpleapp_withregistry(environ, start_response):
     21     status = '200 OK'
     22     response_headers = [('Content-type','text/plain')]
     23     start_response(status, response_headers)
     24     body = 'Hello world!Value is %s\n' % regobj.keys()
     25     if six.PY3:
     26         body = body.encode('utf8')
     27     return [body]
     28 
     29 def simpleapp_withregistry_default(environ, start_response):
     30     status = '200 OK'
     31     response_headers = [('Content-type','text/plain')]
     32     start_response(status, response_headers)
     33     body = 'Hello world!Value is %s\n' % secondobj
     34     if six.PY3:
     35         body = body.encode('utf8')
     36     return [body]
     37 
     38 
     39 class RegistryUsingApp(object):
     40     def __init__(self, var, value, raise_exc=False):
     41         self.var = var
     42         self.value = value
     43         self.raise_exc = raise_exc
     44 
     45     def __call__(self, environ, start_response):
     46         if 'paste.registry' in environ:
     47             environ['paste.registry'].register(self.var, self.value)
     48         if self.raise_exc:
     49             raise self.raise_exc
     50         status = '200 OK'
     51         response_headers = [('Content-type','text/plain')]
     52         start_response(status, response_headers)
     53         body = 'Hello world!\nThe variable is %s' % str(regobj)
     54         if six.PY3:
     55             body = body.encode('utf8')
     56         return [body]
     57 
     58 class RegistryUsingIteratorApp(object):
     59     def __init__(self, var, value):
     60         self.var = var
     61         self.value = value
     62 
     63     def __call__(self, environ, start_response):
     64         if 'paste.registry' in environ:
     65             environ['paste.registry'].register(self.var, self.value)
     66         status = '200 OK'
     67         response_headers = [('Content-type','text/plain')]
     68         start_response(status, response_headers)
     69         body = 'Hello world!\nThe variable is %s' % str(regobj)
     70         if six.PY3:
     71             body = body.encode('utf8')
     72         return iter([body])
     73 
     74 class RegistryMiddleMan(object):
     75     def __init__(self, app, var, value, depth):
     76         self.app = app
     77         self.var = var
     78         self.value = value
     79         self.depth = depth
     80 
     81     def __call__(self, environ, start_response):
     82         if 'paste.registry' in environ:
     83             environ['paste.registry'].register(self.var, self.value)
     84         line = ('\nInserted by middleware!\nInsertValue at depth %s is %s'
     85                 % (self.depth, str(regobj)))
     86         if six.PY3:
     87             line = line.encode('utf8')
     88         app_response = [line]
     89         app_iter = None
     90         app_iter = self.app(environ, start_response)
     91         if type(app_iter) in (list, tuple):
     92             app_response.extend(app_iter)
     93         else:
     94             response = []
     95             for line in app_iter:
     96                 response.append(line)
     97             if hasattr(app_iter, 'close'):
     98                 app_iter.close()
     99             app_response.extend(response)
    100         line = ('\nAppended by middleware!\nAppendValue at \
    101                 depth %s is %s' % (self.depth, str(regobj)))
    102         if six.PY3:
    103             line = line.encode('utf8')
    104         app_response.append(line)
    105         return app_response
    106 
    107 
    108 def test_simple():
    109     app = TestApp(simpleapp)
    110     response = app.get('/')
    111     assert 'Hello world' in response
    112 
    113 def test_solo_registry():
    114     obj = {'hi':'people'}
    115     wsgiapp = RegistryUsingApp(regobj, obj)
    116     wsgiapp = RegistryManager(wsgiapp)
    117     app = TestApp(wsgiapp)
    118     res = app.get('/')
    119     assert 'Hello world' in res
    120     assert 'The variable is' in res
    121     assert "{'hi': 'people'}" in res
    122 
    123 def test_registry_no_object_error():
    124     app = TestApp(simpleapp_withregistry)
    125     assert_raises(TypeError, app.get, '/')
    126 
    127 def test_with_default_object():
    128     app = TestApp(simpleapp_withregistry_default)
    129     res = app.get('/')
    130     print(res)
    131     assert 'Hello world' in res
    132     assert "Value is {'hi': 'people'}" in res
    133 
    134 def test_double_registry():
    135     obj = {'hi':'people'}
    136     secondobj = {'bye':'friends'}
    137     wsgiapp = RegistryUsingApp(regobj, obj)
    138     wsgiapp = RegistryManager(wsgiapp)
    139     wsgiapp = RegistryMiddleMan(wsgiapp, regobj, secondobj, 0)
    140     wsgiapp = RegistryManager(wsgiapp)
    141     app = TestApp(wsgiapp)
    142     res = app.get('/')
    143     assert 'Hello world' in res
    144     assert 'The variable is' in res
    145     assert "{'hi': 'people'}" in res
    146     assert "InsertValue at depth 0 is {'bye': 'friends'}" in res
    147     assert "AppendValue at depth 0 is {'bye': 'friends'}" in res
    148 
    149 def test_really_deep_registry():
    150     keylist = ['fred', 'wilma', 'barney', 'homer', 'marge', 'bart', 'lisa',
    151         'maggie']
    152     valuelist = range(0, len(keylist))
    153     obj = {'hi':'people'}
    154     wsgiapp = RegistryUsingApp(regobj, obj)
    155     wsgiapp = RegistryManager(wsgiapp)
    156     for depth in valuelist:
    157         newobj = {keylist[depth]: depth}
    158         wsgiapp = RegistryMiddleMan(wsgiapp, regobj, newobj, depth)
    159         wsgiapp = RegistryManager(wsgiapp)
    160     app = TestApp(wsgiapp)
    161     res = app.get('/')
    162     assert 'Hello world' in res
    163     assert 'The variable is' in res
    164     assert "{'hi': 'people'}" in res
    165     for depth in valuelist:
    166         assert "InsertValue at depth %s is {'%s': %s}" % \
    167             (depth, keylist[depth], depth) in res
    168     for depth in valuelist:
    169         assert "AppendValue at depth %s is {'%s': %s}" % \
    170             (depth, keylist[depth], depth) in res
    171 
    172 def test_iterating_response():
    173     obj = {'hi':'people'}
    174     secondobj = {'bye':'friends'}
    175     wsgiapp = RegistryUsingIteratorApp(regobj, obj)
    176     wsgiapp = RegistryManager(wsgiapp)
    177     wsgiapp = RegistryMiddleMan(wsgiapp, regobj, secondobj, 0)
    178     wsgiapp = RegistryManager(wsgiapp)
    179     app = TestApp(wsgiapp)
    180     res = app.get('/')
    181     assert 'Hello world' in res
    182     assert 'The variable is' in res
    183     assert "{'hi': 'people'}" in res
    184     assert "InsertValue at depth 0 is {'bye': 'friends'}" in res
    185     assert "AppendValue at depth 0 is {'bye': 'friends'}" in res
    186 
    187 def _test_restorer(stack, data):
    188     # We need to test the request's specific Registry. Initialize it here so we
    189     # can use it later (RegistryManager will re-use one preexisting in the
    190     # environ)
    191     registry = Registry()
    192     extra_environ={'paste.throw_errors': False,
    193                    'paste.registry': registry}
    194     request_id = restorer.get_request_id(extra_environ)
    195     app = TestApp(stack)
    196     res = app.get('/', extra_environ=extra_environ, expect_errors=True)
    197 
    198     # Ensure all the StackedObjectProxies are empty after the RegistryUsingApp
    199     # raises an Exception
    200     for stacked, proxied_obj, test_cleanup in data:
    201         only_key = list(proxied_obj.keys())[0]
    202         try:
    203             assert only_key not in stacked
    204             assert False
    205         except TypeError:
    206             # Definitely empty
    207             pass
    208 
    209     # Ensure the StackedObjectProxies & Registry 'work' in the simulated
    210     # EvalException context
    211     replace = {'replace': 'dict'}
    212     new = {'new': 'object'}
    213     restorer.restoration_begin(request_id)
    214     try:
    215         for stacked, proxied_obj, test_cleanup in data:
    216             # Ensure our original data magically re-appears in this context
    217             only_key, only_val = list(proxied_obj.items())[0]
    218             assert only_key in stacked and stacked[only_key] == only_val
    219 
    220             # Ensure the Registry still works
    221             registry.prepare()
    222             registry.register(stacked, new)
    223             assert 'new' in stacked and stacked['new'] == 'object'
    224             registry.cleanup()
    225 
    226             # Back to the original (pre-prepare())
    227             assert only_key in stacked and stacked[only_key] == only_val
    228 
    229             registry.replace(stacked, replace)
    230             assert 'replace' in stacked and stacked['replace'] == 'dict'
    231 
    232             if test_cleanup:
    233                 registry.cleanup()
    234                 try:
    235                     stacked._current_obj()
    236                     assert False
    237                 except TypeError:
    238                     # Definitely empty
    239                     pass
    240     finally:
    241         restorer.restoration_end()
    242 
    243 def _restorer_data():
    244     S = StackedObjectProxy
    245     d = [[S(name='first'), dict(top='of the registry stack'), False],
    246          [S(name='second'), dict(middle='of the stack'), False],
    247          [S(name='third'), dict(bottom='of the STACK.'), False]]
    248     return d
    249 
    250 def _set_cleanup_test(data):
    251     """Instruct _test_restorer to check registry cleanup at this level of the stack
    252     """
    253     data[2] = True
    254 
    255 def test_restorer_basic():
    256     data = _restorer_data()[0]
    257     wsgiapp = RegistryUsingApp(data[0], data[1], raise_exc=Exception())
    258     wsgiapp = RegistryManager(wsgiapp)
    259     _set_cleanup_test(data)
    260     wsgiapp = EvalException(wsgiapp)
    261     _test_restorer(wsgiapp, [data])
    262 
    263 def test_restorer_basic_manager_outside():
    264     data = _restorer_data()[0]
    265     wsgiapp = RegistryUsingApp(data[0], data[1], raise_exc=Exception())
    266     wsgiapp = EvalException(wsgiapp)
    267     wsgiapp = RegistryManager(wsgiapp)
    268     _set_cleanup_test(data)
    269     _test_restorer(wsgiapp, [data])
    270 
    271 def test_restorer_middleman_nested_evalexception():
    272     data = _restorer_data()[:2]
    273     wsgiapp = RegistryUsingApp(data[0][0], data[0][1], raise_exc=Exception())
    274     wsgiapp = EvalException(wsgiapp)
    275     wsgiapp = RegistryMiddleMan(wsgiapp, data[1][0], data[1][1], 0)
    276     wsgiapp = RegistryManager(wsgiapp)
    277     _set_cleanup_test(data[1])
    278     _test_restorer(wsgiapp, data)
    279 
    280 def test_restorer_nested_middleman():
    281     data = _restorer_data()[:2]
    282     wsgiapp = RegistryUsingApp(data[0][0], data[0][1], raise_exc=Exception())
    283     wsgiapp = RegistryManager(wsgiapp)
    284     _set_cleanup_test(data[0])
    285     wsgiapp = RegistryMiddleMan(wsgiapp, data[1][0], data[1][1], 0)
    286     wsgiapp = EvalException(wsgiapp)
    287     wsgiapp = RegistryManager(wsgiapp)
    288     _set_cleanup_test(data[1])
    289     _test_restorer(wsgiapp, data)
    290 
    291 def test_restorer_middlemen_nested_evalexception():
    292     data = _restorer_data()
    293     wsgiapp = RegistryUsingApp(data[0][0], data[0][1], raise_exc=Exception())
    294     wsgiapp = RegistryManager(wsgiapp)
    295     _set_cleanup_test(data[0])
    296     wsgiapp = EvalException(wsgiapp)
    297     wsgiapp = RegistryMiddleMan(wsgiapp, data[1][0], data[1][1], 0)
    298     wsgiapp = RegistryManager(wsgiapp)
    299     _set_cleanup_test(data[1])
    300     wsgiapp = RegistryMiddleMan(wsgiapp, data[2][0], data[2][1], 1)
    301     wsgiapp = RegistryManager(wsgiapp)
    302     _set_cleanup_test(data[2])
    303     _test_restorer(wsgiapp, data)
    304 
    305 def test_restorer_disabled():
    306     # Ensure restoration_begin/end work safely when there's no Registry
    307     wsgiapp = TestApp(simpleapp)
    308     wsgiapp.get('/')
    309     try:
    310         restorer.restoration_begin(1)
    311     finally:
    312         restorer.restoration_end()
    313         # A second call should do nothing
    314         restorer.restoration_end()
    315