Home | History | Annotate | Download | only in contrib
      1 # Copyright 2015 Google Inc. All rights reserved.
      2 #
      3 # Licensed under the Apache License, Version 2.0 (the "License");
      4 # you may not use this file except in compliance with the License.
      5 # You may obtain a copy of the License at
      6 #
      7 #      http://www.apache.org/licenses/LICENSE-2.0
      8 #
      9 # Unless required by applicable law or agreed to in writing, software
     10 # distributed under the License is distributed on an "AS IS" BASIS,
     11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 # See the License for the specific language governing permissions and
     13 # limitations under the License.
     14 
     15 """Unit tests for oauth2client.multistore_file."""
     16 
     17 import datetime
     18 import errno
     19 import os
     20 import stat
     21 import tempfile
     22 
     23 import mock
     24 import unittest2
     25 
     26 from oauth2client import client
     27 from oauth2client import util
     28 from oauth2client.contrib import locked_file
     29 from oauth2client.contrib import multistore_file
     30 
     31 _filehandle, FILENAME = tempfile.mkstemp('oauth2client_test.data')
     32 os.close(_filehandle)
     33 
     34 
     35 class _MockLockedFile(object):
     36 
     37     def __init__(self, filename_str, error_class, error_code):
     38         self.filename_str = filename_str
     39         self.error_class = error_class
     40         self.error_code = error_code
     41         self.open_and_lock_called = False
     42 
     43     def open_and_lock(self):
     44         self.open_and_lock_called = True
     45         raise self.error_class(self.error_code, '')
     46 
     47     def is_locked(self):
     48         return False
     49 
     50     def filename(self):
     51         return self.filename_str
     52 
     53 
     54 class Test__dict_to_tuple_key(unittest2.TestCase):
     55 
     56     def test_key_conversions(self):
     57         key1, val1 = 'somekey', 'some value'
     58         key2, val2 = 'another', 'something else'
     59         key3, val3 = 'onemore', 'foo'
     60         test_dict = {
     61             key1: val1,
     62             key2: val2,
     63             key3: val3,
     64         }
     65         tuple_key = multistore_file._dict_to_tuple_key(test_dict)
     66 
     67         # the resulting key should be naturally sorted
     68         expected_output = (
     69             (key2, val2),
     70             (key3, val3),
     71             (key1, val1),
     72         )
     73         self.assertTupleEqual(expected_output, tuple_key)
     74         # check we get the original dictionary back
     75         self.assertDictEqual(test_dict, dict(tuple_key))
     76 
     77 
     78 class MultistoreFileTests(unittest2.TestCase):
     79 
     80     def tearDown(self):
     81         try:
     82             os.unlink(FILENAME)
     83         except OSError:
     84             pass
     85 
     86     def setUp(self):
     87         try:
     88             os.unlink(FILENAME)
     89         except OSError:
     90             pass
     91 
     92     def _create_test_credentials(self, client_id='some_client_id',
     93                                  expiration=None):
     94         access_token = 'foo'
     95         client_secret = 'cOuDdkfjxxnv+'
     96         refresh_token = '1/0/a.df219fjls0'
     97         token_expiry = expiration or datetime.datetime.utcnow()
     98         token_uri = 'https://www.google.com/accounts/o8/oauth2/token'
     99         user_agent = 'refresh_checker/1.0'
    100 
    101         credentials = client.OAuth2Credentials(
    102             access_token, client_id, client_secret,
    103             refresh_token, token_expiry, token_uri,
    104             user_agent)
    105         return credentials
    106 
    107     def test_lock_file_raises_ioerror(self):
    108         filehandle, filename = tempfile.mkstemp()
    109         os.close(filehandle)
    110 
    111         try:
    112             for error_code in (errno.EDEADLK, errno.ENOSYS, errno.ENOLCK,
    113                                errno.EACCES):
    114                 for error_class in (IOError, OSError):
    115                     multistore = multistore_file._MultiStore(filename)
    116                     multistore._file = _MockLockedFile(
    117                         filename, error_class, error_code)
    118                     # Should not raise though the underlying file class did.
    119                     multistore._lock()
    120                     self.assertTrue(multistore._file.open_and_lock_called)
    121         finally:
    122             os.unlink(filename)
    123 
    124     def test_lock_file_raise_unexpected_error(self):
    125         filehandle, filename = tempfile.mkstemp()
    126         os.close(filehandle)
    127 
    128         try:
    129             multistore = multistore_file._MultiStore(filename)
    130             multistore._file = _MockLockedFile(filename, IOError, errno.EBUSY)
    131             with self.assertRaises(IOError):
    132                 multistore._lock()
    133             self.assertTrue(multistore._file.open_and_lock_called)
    134         finally:
    135             os.unlink(filename)
    136 
    137     def test_read_only_file_fail_lock(self):
    138         credentials = self._create_test_credentials()
    139 
    140         open(FILENAME, 'a+b').close()
    141         os.chmod(FILENAME, 0o400)
    142 
    143         store = multistore_file.get_credential_storage(
    144             FILENAME,
    145             credentials.client_id,
    146             credentials.user_agent,
    147             ['some-scope', 'some-other-scope'])
    148 
    149         store.put(credentials)
    150         if os.name == 'posix':  # pragma: NO COVER
    151             self.assertTrue(store._multistore._read_only)
    152         os.chmod(FILENAME, 0o600)
    153 
    154     def test_read_only_file_fail_lock_no_warning(self):
    155         open(FILENAME, 'a+b').close()
    156         os.chmod(FILENAME, 0o400)
    157 
    158         multistore = multistore_file._MultiStore(FILENAME)
    159 
    160         with mock.patch.object(multistore_file.logger, 'warn') as mock_warn:
    161             multistore._warn_on_readonly = False
    162             multistore._lock()
    163             self.assertFalse(mock_warn.called)
    164 
    165     def test_lock_skip_refresh(self):
    166         with open(FILENAME, 'w') as f:
    167             f.write('123')
    168         os.chmod(FILENAME, 0o400)
    169 
    170         multistore = multistore_file._MultiStore(FILENAME)
    171 
    172         refresh_patch = mock.patch.object(
    173             multistore, '_refresh_data_cache')
    174 
    175         with refresh_patch as refresh_mock:
    176             multistore._data = {}
    177             multistore._lock()
    178             self.assertFalse(refresh_mock.called)
    179 
    180     @unittest2.skipIf(not hasattr(os, 'symlink'), 'No symlink available')
    181     def test_multistore_no_symbolic_link_files(self):
    182         SYMFILENAME = FILENAME + 'sym'
    183         os.symlink(FILENAME, SYMFILENAME)
    184         store = multistore_file.get_credential_storage(
    185             SYMFILENAME,
    186             'some_client_id',
    187             'user-agent/1.0',
    188             ['some-scope', 'some-other-scope'])
    189         try:
    190             with self.assertRaises(
    191                     locked_file.CredentialsFileSymbolicLinkError):
    192                 store.get()
    193         finally:
    194             os.unlink(SYMFILENAME)
    195 
    196     def test_multistore_non_existent_file(self):
    197         store = multistore_file.get_credential_storage(
    198             FILENAME,
    199             'some_client_id',
    200             'user-agent/1.0',
    201             ['some-scope', 'some-other-scope'])
    202 
    203         credentials = store.get()
    204         self.assertEquals(None, credentials)
    205 
    206     def test_multistore_file(self):
    207         credentials = self._create_test_credentials()
    208 
    209         store = multistore_file.get_credential_storage(
    210             FILENAME,
    211             credentials.client_id,
    212             credentials.user_agent,
    213             ['some-scope', 'some-other-scope'])
    214 
    215         # Save credentials
    216         store.put(credentials)
    217         credentials = store.get()
    218 
    219         self.assertNotEquals(None, credentials)
    220         self.assertEquals('foo', credentials.access_token)
    221 
    222         # Delete credentials
    223         store.delete()
    224         credentials = store.get()
    225 
    226         self.assertEquals(None, credentials)
    227 
    228         if os.name == 'posix':  # pragma: NO COVER
    229             self.assertEquals(
    230                 0o600, stat.S_IMODE(os.stat(FILENAME).st_mode))
    231 
    232     def test_multistore_file_custom_key(self):
    233         credentials = self._create_test_credentials()
    234 
    235         custom_key = {'myapp': 'testing', 'clientid': 'some client'}
    236         store = multistore_file.get_credential_storage_custom_key(
    237             FILENAME, custom_key)
    238 
    239         store.put(credentials)
    240         stored_credentials = store.get()
    241 
    242         self.assertNotEquals(None, stored_credentials)
    243         self.assertEqual(credentials.access_token,
    244                          stored_credentials.access_token)
    245 
    246         store.delete()
    247         stored_credentials = store.get()
    248 
    249         self.assertEquals(None, stored_credentials)
    250 
    251     def test_multistore_file_custom_string_key(self):
    252         credentials = self._create_test_credentials()
    253 
    254         # store with string key
    255         store = multistore_file.get_credential_storage_custom_string_key(
    256             FILENAME, 'mykey')
    257 
    258         store.put(credentials)
    259         stored_credentials = store.get()
    260 
    261         self.assertNotEquals(None, stored_credentials)
    262         self.assertEqual(credentials.access_token,
    263                          stored_credentials.access_token)
    264 
    265         # try retrieving with a dictionary
    266         multistore_file.get_credential_storage_custom_string_key(
    267             FILENAME, {'key': 'mykey'})
    268         stored_credentials = store.get()
    269         self.assertNotEquals(None, stored_credentials)
    270         self.assertEqual(credentials.access_token,
    271                          stored_credentials.access_token)
    272 
    273         store.delete()
    274         stored_credentials = store.get()
    275 
    276         self.assertEquals(None, stored_credentials)
    277 
    278     def test_multistore_file_backwards_compatibility(self):
    279         credentials = self._create_test_credentials()
    280         scopes = ['scope1', 'scope2']
    281 
    282         # store the credentials using the legacy key method
    283         store = multistore_file.get_credential_storage(
    284             FILENAME, 'client_id', 'user_agent', scopes)
    285         store.put(credentials)
    286 
    287         # retrieve the credentials using a custom key that matches the
    288         # legacy key
    289         key = {'clientId': 'client_id', 'userAgent': 'user_agent',
    290                'scope': util.scopes_to_string(scopes)}
    291         store = multistore_file.get_credential_storage_custom_key(
    292             FILENAME, key)
    293         stored_credentials = store.get()
    294 
    295         self.assertEqual(credentials.access_token,
    296                          stored_credentials.access_token)
    297 
    298     def test_multistore_file_get_all_keys(self):
    299         # start with no keys
    300         keys = multistore_file.get_all_credential_keys(FILENAME)
    301         self.assertEquals([], keys)
    302 
    303         # store credentials
    304         credentials = self._create_test_credentials(client_id='client1')
    305         custom_key = {'myapp': 'testing', 'clientid': 'client1'}
    306         store1 = multistore_file.get_credential_storage_custom_key(
    307             FILENAME, custom_key)
    308         store1.put(credentials)
    309 
    310         keys = multistore_file.get_all_credential_keys(FILENAME)
    311         self.assertEquals([custom_key], keys)
    312 
    313         # store more credentials
    314         credentials = self._create_test_credentials(client_id='client2')
    315         string_key = 'string_key'
    316         store2 = multistore_file.get_credential_storage_custom_string_key(
    317             FILENAME, string_key)
    318         store2.put(credentials)
    319 
    320         keys = multistore_file.get_all_credential_keys(FILENAME)
    321         self.assertEquals(2, len(keys))
    322         self.assertTrue(custom_key in keys)
    323         self.assertTrue({'key': string_key} in keys)
    324 
    325         # back to no keys
    326         store1.delete()
    327         store2.delete()
    328         keys = multistore_file.get_all_credential_keys(FILENAME)
    329         self.assertEquals([], keys)
    330 
    331     def _refresh_data_cache_helper(self):
    332         multistore = multistore_file._MultiStore(FILENAME)
    333         json_patch = mock.patch.object(multistore, '_locked_json_read')
    334 
    335         return multistore, json_patch
    336 
    337     def test__refresh_data_cache_bad_json(self):
    338         multistore, json_patch = self._refresh_data_cache_helper()
    339 
    340         with json_patch as json_mock:
    341             json_mock.side_effect = ValueError('')
    342             multistore._refresh_data_cache()
    343             self.assertTrue(json_mock.called)
    344             self.assertEqual(multistore._data, {})
    345 
    346     def test__refresh_data_cache_bad_version(self):
    347         multistore, json_patch = self._refresh_data_cache_helper()
    348 
    349         with json_patch as json_mock:
    350             json_mock.return_value = {}
    351             multistore._refresh_data_cache()
    352             self.assertTrue(json_mock.called)
    353             self.assertEqual(multistore._data, {})
    354 
    355     def test__refresh_data_cache_newer_version(self):
    356         multistore, json_patch = self._refresh_data_cache_helper()
    357 
    358         with json_patch as json_mock:
    359             json_mock.return_value = {'file_version': 5}
    360             with self.assertRaises(multistore_file.NewerCredentialStoreError):
    361                 multistore._refresh_data_cache()
    362             self.assertTrue(json_mock.called)
    363 
    364     def test__refresh_data_cache_bad_credentials(self):
    365         multistore, json_patch = self._refresh_data_cache_helper()
    366 
    367         with json_patch as json_mock:
    368             json_mock.return_value = {
    369                 'file_version': 1,
    370                 'data': [
    371                     {'lol': 'this is a bad credential object.'}
    372                 ]}
    373             multistore._refresh_data_cache()
    374             self.assertTrue(json_mock.called)
    375             self.assertEqual(multistore._data, {})
    376 
    377     def test__delete_credential_nonexistent(self):
    378         multistore = multistore_file._MultiStore(FILENAME)
    379 
    380         with mock.patch.object(multistore, '_write') as write_mock:
    381             multistore._data = {}
    382             multistore._delete_credential('nonexistent_key')
    383             self.assertTrue(write_mock.called)
    384