1 # Copyright 2016 Google Inc. All Rights Reserved. 2 # 3 # Licensed under the Apache License, Version 2.0 (the "License"); 4 # you may not use this file except in compliance with the License. 5 # You may obtain a copy of the License at 6 # 7 # http://www.apache.org/licenses/LICENSE-2.0 8 # 9 # Unless required by applicable law or agreed to in writing, software 10 # distributed under the License is distributed on an "AS IS" BASIS, 11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 # See the License for the specific language governing permissions and 13 # limitations under the License. 14 15 """Unit test for django_util views""" 16 17 import copy 18 import json 19 20 import django 21 from django import http 22 import django.conf 23 from django.contrib.auth.models import AnonymousUser, User 24 import mock 25 from six.moves import reload_module 26 27 from tests.contrib.django_util import TestWithDjangoEnvironment 28 from tests.contrib.django_util.models import CredentialsModel 29 30 from oauth2client.client import FlowExchangeError, OAuth2WebServerFlow 31 import oauth2client.contrib.django_util 32 from oauth2client.contrib.django_util import views 33 from oauth2client.contrib.django_util.models import CredentialsField 34 35 36 class OAuth2AuthorizeTest(TestWithDjangoEnvironment): 37 38 def setUp(self): 39 super(OAuth2AuthorizeTest, self).setUp() 40 self.save_settings = copy.deepcopy(django.conf.settings) 41 reload_module(oauth2client.contrib.django_util) 42 self.user = User.objects.create_user( 43 username='bill', email='bill (at] example.com', password='hunter2') 44 45 def tearDown(self): 46 django.conf.settings = copy.deepcopy(self.save_settings) 47 48 def test_authorize_works(self): 49 request = self.factory.get('oauth2/oauth2authorize') 50 request.session = self.session 51 request.user = self.user 52 response = views.oauth2_authorize(request) 53 self.assertIsInstance(response, http.HttpResponseRedirect) 54 55 def test_authorize_anonymous_user(self): 56 request = self.factory.get('oauth2/oauth2authorize') 57 request.session = self.session 58 request.user = AnonymousUser() 59 response = views.oauth2_authorize(request) 60 self.assertIsInstance(response, http.HttpResponseRedirect) 61 62 def test_authorize_works_explicit_return_url(self): 63 request = self.factory.get('oauth2/oauth2authorize', 64 data={'return_url': '/return_endpoint'}) 65 request.session = self.session 66 request.user = self.user 67 response = views.oauth2_authorize(request) 68 self.assertIsInstance(response, http.HttpResponseRedirect) 69 70 71 class Oauth2AuthorizeStorageModelTest(TestWithDjangoEnvironment): 72 73 def setUp(self): 74 super(Oauth2AuthorizeStorageModelTest, self).setUp() 75 self.save_settings = copy.deepcopy(django.conf.settings) 76 77 STORAGE_MODEL = { 78 'model': 'tests.contrib.django_util.models.CredentialsModel', 79 'user_property': 'user_id', 80 'credentials_property': 'credentials' 81 } 82 django.conf.settings.GOOGLE_OAUTH2_STORAGE_MODEL = STORAGE_MODEL 83 84 # OAuth2 Settings gets configured based on Django settings 85 # at import time, so in order for us to reload the settings 86 # we need to reload the module 87 reload_module(oauth2client.contrib.django_util) 88 self.user = User.objects.create_user( 89 username='bill', email='bill (at] example.com', password='hunter2') 90 91 def tearDown(self): 92 django.conf.settings = copy.deepcopy(self.save_settings) 93 94 def test_authorize_works(self): 95 request = self.factory.get('oauth2/oauth2authorize') 96 request.session = self.session 97 request.user = self.user 98 response = views.oauth2_authorize(request) 99 self.assertIsInstance(response, http.HttpResponseRedirect) 100 # redirects to Google oauth 101 self.assertIn('accounts.google.com', response.url) 102 103 def test_authorize_anonymous_user_redirects_login(self): 104 request = self.factory.get('oauth2/oauth2authorize') 105 request.session = self.session 106 request.user = AnonymousUser() 107 response = views.oauth2_authorize(request) 108 self.assertIsInstance(response, http.HttpResponseRedirect) 109 # redirects to Django login 110 self.assertIn(django.conf.settings.LOGIN_URL, response.url) 111 112 def test_authorize_works_explicit_return_url(self): 113 request = self.factory.get('oauth2/oauth2authorize', 114 data={'return_url': '/return_endpoint'}) 115 request.session = self.session 116 request.user = self.user 117 response = views.oauth2_authorize(request) 118 self.assertIsInstance(response, http.HttpResponseRedirect) 119 120 def test_authorized_user_not_logged_in_redirects(self): 121 request = self.factory.get('oauth2/oauth2authorize', 122 data={'return_url': '/return_endpoint'}) 123 request.session = self.session 124 125 authorized_user = User.objects.create_user( 126 username='bill2', email='bill (at] example.com', password='hunter2') 127 credentials = CredentialsField() 128 129 CredentialsModel.objects.create( 130 user_id=authorized_user, 131 credentials=credentials) 132 133 request.user = authorized_user 134 response = views.oauth2_authorize(request) 135 self.assertIsInstance(response, http.HttpResponseRedirect) 136 137 138 class Oauth2CallbackTest(TestWithDjangoEnvironment): 139 140 def setUp(self): 141 super(Oauth2CallbackTest, self).setUp() 142 self.save_settings = copy.deepcopy(django.conf.settings) 143 reload_module(oauth2client.contrib.django_util) 144 145 self.CSRF_TOKEN = 'token' 146 self.RETURN_URL = 'http://return-url.com' 147 self.fake_state = { 148 'csrf_token': self.CSRF_TOKEN, 149 'return_url': self.RETURN_URL, 150 'scopes': django.conf.settings.GOOGLE_OAUTH2_SCOPES 151 } 152 self.user = User.objects.create_user( 153 username='bill', email='bill (at] example.com', password='hunter2') 154 155 @mock.patch('oauth2client.contrib.django_util.views.pickle') 156 def test_callback_works(self, pickle): 157 request = self.factory.get('oauth2/oauth2callback', data={ 158 'state': json.dumps(self.fake_state), 159 'code': 123 160 }) 161 162 self.session['google_oauth2_csrf_token'] = self.CSRF_TOKEN 163 164 flow = OAuth2WebServerFlow( 165 client_id='clientid', 166 client_secret='clientsecret', 167 scope=['email'], 168 state=json.dumps(self.fake_state), 169 redirect_uri=request.build_absolute_uri("oauth2/oauth2callback")) 170 171 name = 'google_oauth2_flow_{0}'.format(self.CSRF_TOKEN) 172 self.session[name] = pickle.dumps(flow) 173 flow.step2_exchange = mock.Mock() 174 pickle.loads.return_value = flow 175 176 request.session = self.session 177 request.user = self.user 178 response = views.oauth2_callback(request) 179 self.assertIsInstance(response, http.HttpResponseRedirect) 180 self.assertEqual( 181 response.status_code, django.http.HttpResponseRedirect.status_code) 182 self.assertEqual(response['Location'], self.RETURN_URL) 183 184 @mock.patch('oauth2client.contrib.django_util.views.pickle') 185 def test_callback_handles_bad_flow_exchange(self, pickle): 186 request = self.factory.get('oauth2/oauth2callback', data={ 187 "state": json.dumps(self.fake_state), 188 "code": 123 189 }) 190 191 self.session['google_oauth2_csrf_token'] = self.CSRF_TOKEN 192 193 flow = OAuth2WebServerFlow( 194 client_id='clientid', 195 client_secret='clientsecret', 196 scope=['email'], 197 state=json.dumps(self.fake_state), 198 redirect_uri=request.build_absolute_uri('oauth2/oauth2callback')) 199 200 self.session['google_oauth2_flow_{0}'.format(self.CSRF_TOKEN)] \ 201 = pickle.dumps(flow) 202 203 def local_throws(code): 204 raise FlowExchangeError('test') 205 206 flow.step2_exchange = local_throws 207 pickle.loads.return_value = flow 208 209 request.session = self.session 210 response = views.oauth2_callback(request) 211 self.assertIsInstance(response, http.HttpResponseBadRequest) 212 213 def test_error_returns_bad_request(self): 214 request = self.factory.get('oauth2/oauth2callback', data={ 215 'error': 'There was an error in your authorization.', 216 }) 217 response = views.oauth2_callback(request) 218 self.assertIsInstance(response, http.HttpResponseBadRequest) 219 self.assertIn(b'Authorization failed', response.content) 220 221 def test_no_session(self): 222 request = self.factory.get('oauth2/oauth2callback', data={ 223 'code': 123, 224 'state': json.dumps(self.fake_state) 225 }) 226 227 request.session = self.session 228 response = views.oauth2_callback(request) 229 self.assertIsInstance(response, http.HttpResponseBadRequest) 230 self.assertEqual( 231 response.content, b'No existing session for this flow.') 232 233 def test_missing_state_returns_bad_request(self): 234 request = self.factory.get('oauth2/oauth2callback', data={ 235 'code': 123 236 }) 237 self.session['google_oauth2_csrf_token'] = "token" 238 request.session = self.session 239 response = views.oauth2_callback(request) 240 self.assertIsInstance(response, http.HttpResponseBadRequest) 241 242 def test_bad_state(self): 243 request = self.factory.get('oauth2/oauth2callback', data={ 244 'code': 123, 245 'state': json.dumps({'wrong': 'state'}) 246 }) 247 self.session['google_oauth2_csrf_token'] = 'token' 248 request.session = self.session 249 response = views.oauth2_callback(request) 250 self.assertIsInstance(response, http.HttpResponseBadRequest) 251 self.assertEqual(response.content, b'Invalid state parameter.') 252 253 def test_bad_csrf(self): 254 request = self.factory.get('oauth2/oauth2callback', data={ 255 "state": json.dumps(self.fake_state), 256 "code": 123 257 }) 258 self.session['google_oauth2_csrf_token'] = 'WRONG TOKEN' 259 request.session = self.session 260 response = views.oauth2_callback(request) 261 self.assertIsInstance(response, http.HttpResponseBadRequest) 262 self.assertEqual(response.content, b'Invalid CSRF token.') 263 264 def test_no_saved_flow(self): 265 request = self.factory.get('oauth2/oauth2callback', data={ 266 'state': json.dumps(self.fake_state), 267 'code': 123 268 }) 269 self.session['google_oauth2_csrf_token'] = self.CSRF_TOKEN 270 self.session['google_oauth2_flow_{0}'.format(self.CSRF_TOKEN)] = None 271 request.session = self.session 272 response = views.oauth2_callback(request) 273 self.assertIsInstance(response, http.HttpResponseBadRequest) 274 self.assertEqual(response.content, b'Missing Oauth2 flow.') 275