# Home | History | Annotate | Download | only in django_util
      1 # Copyright 2016 Google Inc. All Rights Reserved.
      2 #
      3 # Licensed under the Apache License, Version 2.0 (the "License");
      4 # you may not use this file except in compliance with the License.
      5 # You may obtain a copy of the License at
      6 #
      7 #     http://www.apache.org/licenses/LICENSE-2.0
      8 #
      9 # Unless required by applicable law or agreed to in writing, software
     10 # distributed under the License is distributed on an "AS IS" BASIS,
     11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 # See the License for the specific language governing permissions and
     13 # limitations under the License.
     14 
     15 """Unit test for django_util views"""
     16 
     17 import copy
     18 import json
     19 
     20 import django
     21 from django import http
     22 import django.conf
     23 from django.contrib.auth.models import AnonymousUser, User
     24 import mock
     25 from six.moves import reload_module
     26 
     27 from tests.contrib.django_util import TestWithDjangoEnvironment
     28 from tests.contrib.django_util.models import CredentialsModel
     29 
     30 from oauth2client.client import FlowExchangeError, OAuth2WebServerFlow
     31 import oauth2client.contrib.django_util
     32 from oauth2client.contrib.django_util import views
     33 from oauth2client.contrib.django_util.models import CredentialsField
     34 
     35 
     36 class OAuth2AuthorizeTest(TestWithDjangoEnvironment):
     37 
     38     def setUp(self):
     39         super(OAuth2AuthorizeTest, self).setUp()
     40         self.save_settings = copy.deepcopy(django.conf.settings)
     41         reload_module(oauth2client.contrib.django_util)
     42         self.user = User.objects.create_user(
     43           username='bill', email='bill (at] example.com', password='hunter2')
     44 
     45     def tearDown(self):
     46         django.conf.settings = copy.deepcopy(self.save_settings)
     47 
     48     def test_authorize_works(self):
     49         request = self.factory.get('oauth2/oauth2authorize')
     50         request.session = self.session
     51         request.user = self.user
     52         response = views.oauth2_authorize(request)
     53         self.assertIsInstance(response, http.HttpResponseRedirect)
     54 
     55     def test_authorize_anonymous_user(self):
     56         request = self.factory.get('oauth2/oauth2authorize')
     57         request.session = self.session
     58         request.user = AnonymousUser()
     59         response = views.oauth2_authorize(request)
     60         self.assertIsInstance(response, http.HttpResponseRedirect)
     61 
     62     def test_authorize_works_explicit_return_url(self):
     63         request = self.factory.get('oauth2/oauth2authorize',
     64                                    data={'return_url': '/return_endpoint'})
     65         request.session = self.session
     66         request.user = self.user
     67         response = views.oauth2_authorize(request)
     68         self.assertIsInstance(response, http.HttpResponseRedirect)
     69 
     70 
     71 class Oauth2AuthorizeStorageModelTest(TestWithDjangoEnvironment):
     72 
     73     def setUp(self):
     74         super(Oauth2AuthorizeStorageModelTest, self).setUp()
     75         self.save_settings = copy.deepcopy(django.conf.settings)
     76 
     77         STORAGE_MODEL = {
     78             'model': 'tests.contrib.django_util.models.CredentialsModel',
     79             'user_property': 'user_id',
     80             'credentials_property': 'credentials'
     81         }
     82         django.conf.settings.GOOGLE_OAUTH2_STORAGE_MODEL = STORAGE_MODEL
     83 
     84         # OAuth2 Settings gets configured based on Django settings
     85         # at import time, so in order for us to reload the settings
     86         # we need to reload the module
     87         reload_module(oauth2client.contrib.django_util)
     88         self.user = User.objects.create_user(
     89             username='bill', email='bill (at] example.com', password='hunter2')
     90 
     91     def tearDown(self):
     92         django.conf.settings = copy.deepcopy(self.save_settings)
     93 
     94     def test_authorize_works(self):
     95         request = self.factory.get('oauth2/oauth2authorize')
     96         request.session = self.session
     97         request.user = self.user
     98         response = views.oauth2_authorize(request)
     99         self.assertIsInstance(response, http.HttpResponseRedirect)
    100         # redirects to Google oauth
    101         self.assertIn('accounts.google.com', response.url)
    102 
    103     def test_authorize_anonymous_user_redirects_login(self):
    104         request = self.factory.get('oauth2/oauth2authorize')
    105         request.session = self.session
    106         request.user = AnonymousUser()
    107         response = views.oauth2_authorize(request)
    108         self.assertIsInstance(response, http.HttpResponseRedirect)
    109         # redirects to Django login
    110         self.assertIn(django.conf.settings.LOGIN_URL, response.url)
    111 
    112     def test_authorize_works_explicit_return_url(self):
    113         request = self.factory.get('oauth2/oauth2authorize',
    114                                    data={'return_url': '/return_endpoint'})
    115         request.session = self.session
    116         request.user = self.user
    117         response = views.oauth2_authorize(request)
    118         self.assertIsInstance(response, http.HttpResponseRedirect)
    119 
    120     def test_authorized_user_not_logged_in_redirects(self):
    121         request = self.factory.get('oauth2/oauth2authorize',
    122                                    data={'return_url': '/return_endpoint'})
    123         request.session = self.session
    124 
    125         authorized_user = User.objects.create_user(
    126             username='bill2', email='bill (at] example.com', password='hunter2')
    127         credentials = CredentialsField()
    128 
    129         CredentialsModel.objects.create(
    130             user_id=authorized_user,
    131             credentials=credentials)
    132 
    133         request.user = authorized_user
    134         response = views.oauth2_authorize(request)
    135         self.assertIsInstance(response, http.HttpResponseRedirect)
    136 
    137 
    138 class Oauth2CallbackTest(TestWithDjangoEnvironment):
    139 
    140     def setUp(self):
    141         super(Oauth2CallbackTest, self).setUp()
    142         self.save_settings = copy.deepcopy(django.conf.settings)
    143         reload_module(oauth2client.contrib.django_util)
    144 
    145         self.CSRF_TOKEN = 'token'
    146         self.RETURN_URL = 'http://return-url.com'
    147         self.fake_state = {
    148             'csrf_token': self.CSRF_TOKEN,
    149             'return_url': self.RETURN_URL,
    150             'scopes': django.conf.settings.GOOGLE_OAUTH2_SCOPES
    151         }
    152         self.user = User.objects.create_user(
    153             username='bill', email='bill (at] example.com', password='hunter2')
    154 
    155     @mock.patch('oauth2client.contrib.django_util.views.pickle')
    156     def test_callback_works(self, pickle):
    157         request = self.factory.get('oauth2/oauth2callback', data={
    158             'state': json.dumps(self.fake_state),
    159             'code': 123
    160         })
    161 
    162         self.session['google_oauth2_csrf_token'] = self.CSRF_TOKEN
    163 
    164         flow = OAuth2WebServerFlow(
    165             client_id='clientid',
    166             client_secret='clientsecret',
    167             scope=['email'],
    168             state=json.dumps(self.fake_state),
    169             redirect_uri=request.build_absolute_uri("oauth2/oauth2callback"))
    170 
    171         name = 'google_oauth2_flow_{0}'.format(self.CSRF_TOKEN)
    172         self.session[name] = pickle.dumps(flow)
    173         flow.step2_exchange = mock.Mock()
    174         pickle.loads.return_value = flow
    175 
    176         request.session = self.session
    177         request.user = self.user
    178         response = views.oauth2_callback(request)
    179         self.assertIsInstance(response, http.HttpResponseRedirect)
    180         self.assertEqual(
    181             response.status_code, django.http.HttpResponseRedirect.status_code)
    182         self.assertEqual(response['Location'], self.RETURN_URL)
    183 
    184     @mock.patch('oauth2client.contrib.django_util.views.pickle')
    185     def test_callback_handles_bad_flow_exchange(self, pickle):
    186         request = self.factory.get('oauth2/oauth2callback', data={
    187             "state": json.dumps(self.fake_state),
    188             "code": 123
    189         })
    190 
    191         self.session['google_oauth2_csrf_token'] = self.CSRF_TOKEN
    192 
    193         flow = OAuth2WebServerFlow(
    194             client_id='clientid',
    195             client_secret='clientsecret',
    196             scope=['email'],
    197             state=json.dumps(self.fake_state),
    198             redirect_uri=request.build_absolute_uri('oauth2/oauth2callback'))
    199 
    200         self.session['google_oauth2_flow_{0}'.format(self.CSRF_TOKEN)] \
    201             = pickle.dumps(flow)
    202 
    203         def local_throws(code):
    204             raise FlowExchangeError('test')
    205 
    206         flow.step2_exchange = local_throws
    207         pickle.loads.return_value = flow
    208 
    209         request.session = self.session
    210         response = views.oauth2_callback(request)
    211         self.assertIsInstance(response, http.HttpResponseBadRequest)
    212 
    213     def test_error_returns_bad_request(self):
    214         request = self.factory.get('oauth2/oauth2callback', data={
    215             'error': 'There was an error in your authorization.',
    216         })
    217         response = views.oauth2_callback(request)
    218         self.assertIsInstance(response, http.HttpResponseBadRequest)
    219         self.assertIn(b'Authorization failed', response.content)
    220 
    221     def test_no_session(self):
    222         request = self.factory.get('oauth2/oauth2callback', data={
    223             'code': 123,
    224             'state': json.dumps(self.fake_state)
    225         })
    226 
    227         request.session = self.session
    228         response = views.oauth2_callback(request)
    229         self.assertIsInstance(response, http.HttpResponseBadRequest)
    230         self.assertEqual(
    231             response.content, b'No existing session for this flow.')
    232 
    233     def test_missing_state_returns_bad_request(self):
    234         request = self.factory.get('oauth2/oauth2callback', data={
    235             'code': 123
    236         })
    237         self.session['google_oauth2_csrf_token'] = "token"
    238         request.session = self.session
    239         response = views.oauth2_callback(request)
    240         self.assertIsInstance(response, http.HttpResponseBadRequest)
    241 
    242     def test_bad_state(self):
    243         request = self.factory.get('oauth2/oauth2callback', data={
    244             'code': 123,
    245             'state': json.dumps({'wrong': 'state'})
    246         })
    247         self.session['google_oauth2_csrf_token'] = 'token'
    248         request.session = self.session
    249         response = views.oauth2_callback(request)
    250         self.assertIsInstance(response, http.HttpResponseBadRequest)
    251         self.assertEqual(response.content, b'Invalid state parameter.')
    252 
    253     def test_bad_csrf(self):
    254         request = self.factory.get('oauth2/oauth2callback', data={
    255             "state": json.dumps(self.fake_state),
    256             "code": 123
    257         })
    258         self.session['google_oauth2_csrf_token'] = 'WRONG TOKEN'
    259         request.session = self.session
    260         response = views.oauth2_callback(request)
    261         self.assertIsInstance(response, http.HttpResponseBadRequest)
    262         self.assertEqual(response.content, b'Invalid CSRF token.')
    263 
    264     def test_no_saved_flow(self):
    265         request = self.factory.get('oauth2/oauth2callback', data={
    266             'state': json.dumps(self.fake_state),
    267             'code': 123
    268         })
    269         self.session['google_oauth2_csrf_token'] = self.CSRF_TOKEN
    270         self.session['google_oauth2_flow_{0}'.format(self.CSRF_TOKEN)] = None
    271         request.session = self.session
    272         response = views.oauth2_callback(request)
    273         self.assertIsInstance(response, http.HttpResponseBadRequest)
    274         self.assertEqual(response.content, b'Missing Oauth2 flow.')
    275