# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unit tests for oauth2client.multistore_file."""

import datetime
import errno
import os
import stat
import tempfile

import mock
import unittest2

from oauth2client import client
from oauth2client import util
from oauth2client.contrib import locked_file
from oauth2client.contrib import multistore_file

# Reserve a unique path shared by the tests below. The file itself is
# removed in setUp() so that each test starts without a credentials file.
_filehandle, FILENAME = tempfile.mkstemp('oauth2client_test.data')
os.close(_filehandle)


def _remove_test_file():
    """Best-effort removal of FILENAME.

    Several tests chmod the file to 0o400. Write permission is restored
    first so a read-only leftover cannot block the removal on platforms
    that refuse to delete read-only files. Errors (typically "file does
    not exist") are deliberately ignored.
    """
    try:
        os.chmod(FILENAME, 0o600)
    except OSError:
        pass
    try:
        os.unlink(FILENAME)
    except OSError:
        pass


class _MockLockedFile(object):
    """Stand-in for a locked file whose open_and_lock() always raises.

    Args:
        filename_str: value returned by filename().
        error_class: exception class raised by open_and_lock().
        error_code: errno passed as first argument to error_class.
    """

    def __init__(self, filename_str, error_class, error_code):
        self.filename_str = filename_str
        self.error_class = error_class
        self.error_code = error_code
        # Lets tests verify that locking was actually attempted.
        self.open_and_lock_called = False

    def open_and_lock(self):
        """Record the call, then raise the configured error."""
        self.open_and_lock_called = True
        raise self.error_class(self.error_code, '')

    def is_locked(self):
        """Always report that the lock is not held."""
        return False

    def filename(self):
        """Return the filename given at construction."""
        return self.filename_str


class Test__dict_to_tuple_key(unittest2.TestCase):
    """Tests for multistore_file._dict_to_tuple_key."""

    def test_key_conversions(self):
        """A dict becomes a key-sorted tuple of pairs and round-trips."""
        key1, val1 = 'somekey', 'some value'
        key2, val2 = 'another', 'something else'
        key3, val3 = 'onemore', 'foo'
        test_dict = {
            key1: val1,
            key2: val2,
            key3: val3,
        }
        tuple_key = multistore_file._dict_to_tuple_key(test_dict)

        # the resulting key should be naturally sorted
        expected_output = (
            (key2, val2),
            (key3, val3),
            (key1, val1),
        )
        self.assertTupleEqual(expected_output, tuple_key)
        # check we get the original dictionary back
        self.assertDictEqual(test_dict, dict(tuple_key))


class MultistoreFileTests(unittest2.TestCase):
    """Tests for the multistore credential storage backed by FILENAME."""

    def setUp(self):
        # Every test starts without a credentials file on disk.
        _remove_test_file()

    def tearDown(self):
        _remove_test_file()

    def _create_test_credentials(self, client_id='some_client_id',
                                 expiration=None):
        """Build an OAuth2Credentials object populated with fixed values.

        Args:
            client_id: client id stored on the credentials.
            expiration: token expiry; defaults to the current UTC time.

        Returns:
            A client.OAuth2Credentials instance.
        """
        access_token = 'foo'
        client_secret = 'cOuDdkfjxxnv+'
        refresh_token = '1/0/a.df219fjls0'
        token_expiry = expiration or datetime.datetime.utcnow()
        token_uri = 'https://www.google.com/accounts/o8/oauth2/token'
        user_agent = 'refresh_checker/1.0'

        credentials = client.OAuth2Credentials(
            access_token, client_id, client_secret,
            refresh_token, token_expiry, token_uri,
            user_agent)
        return credentials

    def test_lock_file_raises_ioerror(self):
        """_lock() swallows the listed errno values from the file class."""
        filehandle, filename = tempfile.mkstemp()
        os.close(filehandle)

        try:
            for error_code in (errno.EDEADLK, errno.ENOSYS, errno.ENOLCK,
                               errno.EACCES):
                for error_class in (IOError, OSError):
                    multistore = multistore_file._MultiStore(filename)
                    multistore._file = _MockLockedFile(
                        filename, error_class, error_code)
                    # Should not raise though the underlying file class did.
                    multistore._lock()
                    self.assertTrue(multistore._file.open_and_lock_called)
        finally:
            os.unlink(filename)

    def test_lock_file_raise_unexpected_error(self):
        """_lock() propagates an errno it does not special-case (EBUSY)."""
        filehandle, filename = tempfile.mkstemp()
        os.close(filehandle)

        try:
            multistore = multistore_file._MultiStore(filename)
            multistore._file = _MockLockedFile(filename, IOError, errno.EBUSY)
            with self.assertRaises(IOError):
                multistore._lock()
            self.assertTrue(multistore._file.open_and_lock_called)
        finally:
            os.unlink(filename)

    def test_read_only_file_fail_lock(self):
        """put() on a read-only file flags the multistore as read-only."""
        credentials = self._create_test_credentials()

        open(FILENAME, 'a+b').close()
        os.chmod(FILENAME, 0o400)

        try:
            store = multistore_file.get_credential_storage(
                FILENAME,
                credentials.client_id,
                credentials.user_agent,
                ['some-scope', 'some-other-scope'])

            store.put(credentials)
            if os.name == 'posix':  # pragma: NO COVER
                self.assertTrue(store._multistore._read_only)
        finally:
            # Restore write access even when an assertion above fails.
            os.chmod(FILENAME, 0o600)

    def test_read_only_file_fail_lock_no_warning(self):
        """No warning is logged when _warn_on_readonly is False."""
        open(FILENAME, 'a+b').close()
        os.chmod(FILENAME, 0o400)

        multistore = multistore_file._MultiStore(FILENAME)

        with mock.patch.object(multistore_file.logger, 'warn') as mock_warn:
            multistore._warn_on_readonly = False
            multistore._lock()
            self.assertFalse(mock_warn.called)

    def test_lock_skip_refresh(self):
        """_lock() does not refresh the cache when _data is already set."""
        with open(FILENAME, 'w') as f:
            f.write('123')
        os.chmod(FILENAME, 0o400)

        multistore = multistore_file._MultiStore(FILENAME)

        refresh_patch = mock.patch.object(
            multistore, '_refresh_data_cache')

        with refresh_patch as refresh_mock:
            multistore._data = {}
            multistore._lock()
            self.assertFalse(refresh_mock.called)

    @unittest2.skipIf(not hasattr(os, 'symlink'), 'No symlink available')
    def test_multistore_no_symbolic_link_files(self):
        """Reading through a symlinked credentials file is rejected."""
        SYMFILENAME = FILENAME + 'sym'
        os.symlink(FILENAME, SYMFILENAME)
        try:
            # Created inside the try so the symlink is removed even if
            # building the storage object fails.
            store = multistore_file.get_credential_storage(
                SYMFILENAME,
                'some_client_id',
                'user-agent/1.0',
                ['some-scope', 'some-other-scope'])
            with self.assertRaises(
                    locked_file.CredentialsFileSymbolicLinkError):
                store.get()
        finally:
            os.unlink(SYMFILENAME)

    def test_multistore_non_existent_file(self):
        """get() returns None when no credentials were ever stored."""
        store = multistore_file.get_credential_storage(
            FILENAME,
            'some_client_id',
            'user-agent/1.0',
            ['some-scope', 'some-other-scope'])

        credentials = store.get()
        self.assertIsNone(credentials)

    def test_multistore_file(self):
        """put/get/delete round trip using the legacy key arguments."""
        credentials = self._create_test_credentials()

        store = multistore_file.get_credential_storage(
            FILENAME,
            credentials.client_id,
            credentials.user_agent,
            ['some-scope', 'some-other-scope'])

        # Save credentials
        store.put(credentials)
        credentials = store.get()

        self.assertIsNotNone(credentials)
        self.assertEqual('foo', credentials.access_token)

        # Delete credentials
        store.delete()
        credentials = store.get()

        self.assertIsNone(credentials)

        if os.name == 'posix':  # pragma: NO COVER
            self.assertEqual(
                0o600, stat.S_IMODE(os.stat(FILENAME).st_mode))

    def test_multistore_file_custom_key(self):
        """put/get/delete round trip using a custom dictionary key."""
        credentials = self._create_test_credentials()

        custom_key = {'myapp': 'testing', 'clientid': 'some client'}
        store = multistore_file.get_credential_storage_custom_key(
            FILENAME, custom_key)

        store.put(credentials)
        stored_credentials = store.get()

        self.assertIsNotNone(stored_credentials)
        self.assertEqual(credentials.access_token,
                         stored_credentials.access_token)

        store.delete()
        stored_credentials = store.get()

        self.assertIsNone(stored_credentials)

    def test_multistore_file_custom_string_key(self):
        """Credentials stored by string key are found via {'key': string}."""
        credentials = self._create_test_credentials()

        # store with string key
        store = multistore_file.get_credential_storage_custom_string_key(
            FILENAME, 'mykey')

        store.put(credentials)
        stored_credentials = store.get()

        self.assertIsNotNone(stored_credentials)
        self.assertEqual(credentials.access_token,
                         stored_credentials.access_token)

        # try retrieving with a dictionary; the lookup must go through the
        # dictionary-keyed storage rather than re-reading the string-keyed
        # one, otherwise the equivalence is never checked.
        dict_store = multistore_file.get_credential_storage_custom_key(
            FILENAME, {'key': 'mykey'})
        stored_credentials = dict_store.get()
        self.assertIsNotNone(stored_credentials)
        self.assertEqual(credentials.access_token,
                         stored_credentials.access_token)

        store.delete()
        stored_credentials = store.get()

        self.assertIsNone(stored_credentials)

    def test_multistore_file_backwards_compatibility(self):
        """Legacy-keyed credentials are readable via the matching dict key."""
        credentials = self._create_test_credentials()
        scopes = ['scope1', 'scope2']

        # store the credentials using the legacy key method
        store = multistore_file.get_credential_storage(
            FILENAME, 'client_id', 'user_agent', scopes)
        store.put(credentials)

        # retrieve the credentials using a custom key that matches the
        # legacy key
        key = {'clientId': 'client_id', 'userAgent': 'user_agent',
               'scope': util.scopes_to_string(scopes)}
        store = multistore_file.get_credential_storage_custom_key(
            FILENAME, key)
        stored_credentials = store.get()

        self.assertEqual(credentials.access_token,
                         stored_credentials.access_token)

    def test_multistore_file_get_all_keys(self):
        """get_all_credential_keys tracks keys as they are added/removed."""
        # start with no keys
        keys = multistore_file.get_all_credential_keys(FILENAME)
        self.assertEqual([], keys)

        # store credentials
        credentials = self._create_test_credentials(client_id='client1')
        custom_key = {'myapp': 'testing', 'clientid': 'client1'}
        store1 = multistore_file.get_credential_storage_custom_key(
            FILENAME, custom_key)
        store1.put(credentials)

        keys = multistore_file.get_all_credential_keys(FILENAME)
        self.assertEqual([custom_key], keys)

        # store more credentials
        credentials = self._create_test_credentials(client_id='client2')
        string_key = 'string_key'
        store2 = multistore_file.get_credential_storage_custom_string_key(
            FILENAME, string_key)
        store2.put(credentials)

        keys = multistore_file.get_all_credential_keys(FILENAME)
        self.assertEqual(2, len(keys))
        self.assertIn(custom_key, keys)
        self.assertIn({'key': string_key}, keys)

        # back to no keys
        store1.delete()
        store2.delete()
        keys = multistore_file.get_all_credential_keys(FILENAME)
        self.assertEqual([], keys)

    def _refresh_data_cache_helper(self):
        """Return a _MultiStore and an unstarted patch of its JSON reader."""
        multistore = multistore_file._MultiStore(FILENAME)
        json_patch = mock.patch.object(multistore, '_locked_json_read')

        return multistore, json_patch

    def test__refresh_data_cache_bad_json(self):
        """A ValueError from the JSON read leaves the cache empty."""
        multistore, json_patch = self._refresh_data_cache_helper()

        with json_patch as json_mock:
            json_mock.side_effect = ValueError('')
            multistore._refresh_data_cache()
            self.assertTrue(json_mock.called)
            self.assertEqual(multistore._data, {})

    def test__refresh_data_cache_bad_version(self):
        """JSON without a 'file_version' entry leaves the cache empty."""
        multistore, json_patch = self._refresh_data_cache_helper()

        with json_patch as json_mock:
            json_mock.return_value = {}
            multistore._refresh_data_cache()
            self.assertTrue(json_mock.called)
            self.assertEqual(multistore._data, {})

    def test__refresh_data_cache_newer_version(self):
        """file_version 5 raises NewerCredentialStoreError."""
        multistore, json_patch = self._refresh_data_cache_helper()

        with json_patch as json_mock:
            json_mock.return_value = {'file_version': 5}
            with self.assertRaises(multistore_file.NewerCredentialStoreError):
                multistore._refresh_data_cache()
            self.assertTrue(json_mock.called)

    def test__refresh_data_cache_bad_credentials(self):
        """A malformed credential entry is skipped; the cache stays empty."""
        multistore, json_patch = self._refresh_data_cache_helper()

        with json_patch as json_mock:
            json_mock.return_value = {
                'file_version': 1,
                'data': [
                    {'lol': 'this is a bad credential object.'}
                ]}
            multistore._refresh_data_cache()
            self.assertTrue(json_mock.called)
            self.assertEqual(multistore._data, {})

    def test__delete_credential_nonexistent(self):
        """Deleting an unknown key still triggers a write."""
        multistore = multistore_file._MultiStore(FILENAME)

        with mock.patch.object(multistore, '_write') as write_mock:
            multistore._data = {}
            multistore._delete_credential('nonexistent_key')
            self.assertTrue(write_mock.called)