Home | History | Annotate | Download | only in tests
      1 # -*- coding: utf-8 -*-
      2 # Copyright 2014 Google Inc. All Rights Reserved.
      3 #
      4 # Licensed under the Apache License, Version 2.0 (the "License");
      5 # you may not use this file except in compliance with the License.
      6 # You may obtain a copy of the License at
      7 #
      8 #     http://www.apache.org/licenses/LICENSE-2.0
      9 #
     10 # Unless required by applicable law or agreed to in writing, software
     11 # distributed under the License is distributed on an "AS IS" BASIS,
     12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13 # See the License for the specific language governing permissions and
     14 # limitations under the License.
     15 """Tests for signurl command."""
     16 from datetime import timedelta
     17 import pkgutil
     18 
     19 import gslib.commands.signurl
     20 from gslib.commands.signurl import HAVE_OPENSSL
     21 from gslib.exception import CommandException
     22 import gslib.tests.testcase as testcase
     23 from gslib.tests.testcase.integration_testcase import SkipForS3
     24 from gslib.tests.util import ObjectToURI as suri
     25 from gslib.tests.util import unittest
     26 
     27 
     28 # pylint: disable=protected-access
     29 @unittest.skipUnless(HAVE_OPENSSL, 'signurl requires pyopenssl.')
     30 @SkipForS3('Signed URLs are only supported for gs:// URLs.')
     31 class TestSignUrl(testcase.GsUtilIntegrationTestCase):
     32   """Integration tests for signurl command."""
     33 
     34   def _GetKsFile(self):
     35     if not hasattr(self, 'ks_file'):
     36       # Dummy pkcs12 keystore generated with the command
     37 
     38       # openssl req -new -passout pass:notasecret -batch \
     39       # -x509 -keyout signed_url_test.key -out signed_url_test.pem \
     40       # -subj '/CN=test.apps.googleusercontent.com'
     41 
     42       # &&
     43 
     44       # openssl pkcs12 -export -passin pass:notasecret \
     45       # -passout pass:notasecret -inkey signed_url_test.key \
     46       # -in signed_url_test.pem -out test.p12
     47 
     48       # &&
     49 
     50       # rm signed_url_test.key signed_url_test.pem
     51       contents = pkgutil.get_data('gslib', 'tests/test_data/test.p12')
     52       self.ks_file = self.CreateTempFile(contents=contents)
     53     return self.ks_file
     54 
     55   def testSignUrlOutput(self):
     56     """Tests signurl output of a sample object."""
     57 
     58     object_url = self.CreateObject(contents='z')
     59     stdout = self.RunGsUtil(['signurl', '-p', 'notasecret',
     60                              self._GetKsFile(), suri(object_url)],
     61                             return_stdout=True)
     62 
     63     self.assertIn(object_url.uri, stdout)
     64     self.assertIn('test (at] developer.gserviceaccount.com', stdout)
     65     self.assertIn('Expires=', stdout)
     66     self.assertIn('\tGET\t', stdout)
     67 
     68     stdout = self.RunGsUtil(['signurl', '-m', 'PUT', '-p',
     69                              'notasecret', self._GetKsFile(),
     70                              'gs://test/test.txt'], return_stdout=True)
     71 
     72     self.assertIn('test (at] developer.gserviceaccount.com', stdout)
     73     self.assertIn('Expires=', stdout)
     74     self.assertIn('\tPUT\t', stdout)
     75 
     76   def testSignUrlWithURLEncodeRequiredChars(self):
     77     objs = ['gs://example.org/test 1', 'gs://example.org/test/test 2',
     78             'gs://example.org/i ']
     79     expected_partial_urls = [
     80         ('https://storage.googleapis.com/example.org/test%201?GoogleAccessId=te'
     81          'st (at] developer.gserviceaccount.com'),
     82         ('https://storage.googleapis.com/example.org/test/test%202?GoogleAccess'
     83          'Id=test (at] developer.gserviceaccount.com'),
     84         ('https://storage.googleapis.com/example.org/%D0%90%D1%83%D0%B4%D0%B8%D'
     85          '0%BE%D0%B0%D1%80i%20%D1%85%D0%B8%D0%B2?GoogleAccessId=test@developer.'
     86          'gserviceaccount.com')
     87         ]
     88 
     89     self.assertEquals(len(objs), len(expected_partial_urls))
     90 
     91     cmd_args = ['signurl', '-p', 'notasecret', self._GetKsFile()]
     92     cmd_args.extend(objs)
     93 
     94     stdout = self.RunGsUtil(cmd_args, return_stdout=True)
     95 
     96     lines = stdout.split('\n')
     97     # Header, signed urls, trailing newline.
     98     self.assertEquals(len(lines), len(objs) + 2)
     99 
    100     # Strip the header line to make the indices line up.
    101     lines = lines[1:]
    102 
    103     for obj, line, partial_url in zip(objs, lines, expected_partial_urls):
    104       self.assertIn(obj, line)
    105       self.assertIn(partial_url, line)
    106 
    107   def testSignUrlWithWildcard(self):
    108     objs = ['test1', 'test2', 'test3']
    109     bucket = self.CreateBucket()
    110     obj_urls = []
    111 
    112     for obj_name in objs:
    113       obj_urls.append(self.CreateObject(bucket_uri=bucket,
    114                                         object_name=obj_name, contents=''))
    115 
    116     stdout = self.RunGsUtil(['signurl', '-p',
    117                              'notasecret', self._GetKsFile(),
    118                              suri(bucket) + '/*'], return_stdout=True)
    119 
    120     # Header, 3 signed urls, trailing newline
    121     self.assertEquals(len(stdout.split('\n')), 5)
    122 
    123     for obj_url in obj_urls:
    124       self.assertIn(suri(obj_url), stdout)
    125 
    126   def testSignUrlOfNonObjectUrl(self):
    127     """Tests the signurl output of a non-existent file."""
    128     self.RunGsUtil(['signurl', self._GetKsFile(), 'gs://'],
    129                    expected_status=1, stdin='notasecret')
    130     self.RunGsUtil(['signurl', 'file://tmp/abc'], expected_status=1)
    131 
    132 
    133 @unittest.skipUnless(HAVE_OPENSSL, 'signurl requires pyopenssl.')
    134 class UnitTestSignUrl(testcase.GsUtilUnitTestCase):
    135   """Unit tests for the signurl command."""
    136 
    137   def setUp(self):
    138     super(UnitTestSignUrl, self).setUp()
    139     self.ks_contents = pkgutil.get_data('gslib', 'tests/test_data/test.p12')
    140 
    141   def testDurationSpec(self):
    142     tests = [('1h', timedelta(hours=1)),
    143              ('2d', timedelta(days=2)),
    144              ('5D', timedelta(days=5)),
    145              ('35s', timedelta(seconds=35)),
    146              ('1h', timedelta(hours=1)),
    147              ('33', timedelta(hours=33)),
    148              ('22m', timedelta(minutes=22)),
    149              ('3.7', None),
    150              ('27Z', None),
    151             ]
    152 
    153     for inp, expected in tests:
    154       try:
    155         td = gslib.commands.signurl._DurationToTimeDelta(inp)
    156         self.assertEquals(td, expected)
    157       except CommandException:
    158         if expected is not None:
    159           self.fail('{0} failed to parse')
    160 
    161   def testSignPut(self):
    162     """Tests the return value of the _GenSignedUrl function with \
    163     a PUT method."""
    164 
    165     expected = ('https://storage.googleapis.com/test/test.txt?'
    166                 'GoogleAccessId=test (at] developer.gserviceaccount.com'
    167                 '&Expires=1391816302&Signature=A6QbgTA8cXZCtjy2xCr401bdi0e'
    168                 '7zChTBQ6BX61L7AfytTGEQDMD%2BbvOQKjX7%2FsEh77cmzcSxOEKqTLUD'
    169                 'bbkPgPqW3j8sGPSRX9VM58bgj1vt9yU8cRKoegFHXAqsATx2G5rc%2FvEl'
    170                 'iFp9UWMfVj5TaukqlBAVuzZWlyx0aQa9tCKXRtC9YcxORxG41RfiowA2kd8'
    171                 'XBTQt4M9XTzpVyr5rVMzfr2LvtGf9UAJvlt8p6T6nThl2vy9%2FwBoPcMFa'
    172                 'OWQcGTagwjyKWDcI1vQPIFQLGftAcv3QnGZxZTtg8pZW%2FIxRJrBhfFfcA'
    173                 'c62hDKyaU2YssSMy%2FjUJynWx3TIiJjhg%3D%3D')
    174 
    175     expiration = 1391816302
    176     ks, client_id = (gslib.commands.signurl
    177                      ._ReadKeystore(self.ks_contents, 'notasecret'))
    178     signed_url = (gslib.commands.signurl
    179                   ._GenSignedUrl(ks.get_privatekey(),
    180                                  client_id, 'PUT', '',
    181                                  '', expiration, 'test/test.txt'))
    182     self.assertEquals(expected, signed_url)
    183 
    184   def testSignurlPutContentype(self):
    185     """Tests the return value of the _GenSignedUrl function with \
    186     a PUT method and specified content type."""
    187 
    188     expected = ('https://storage.googleapis.com/test/test.txt?'
    189                 'GoogleAccessId=test (at] developer.gserviceaccount.com&'
    190                 'Expires=1391816302&Signature=APn%2BCCVcQrfc1fKQXrs'
    191                 'PEZFj9%2FmASO%2BolR8xwgBY6PbWMkcCtrUVFBauP6t4NxqZO'
    192                 'UnbOFYTZYzul0RC57ZkEWJp3VcyDIHcn6usEE%2FTzUHhbDCDW'
    193                 'awAkZS7p8kO8IIACuJlF5s9xZmZzaEBtzF0%2BBOsGgBPBlg2y'
    194                 'zrhFB6cyyAwNiUgmhLQaVkdobnSwtI5QJkvXoIjJb6hhLiVbLC'
    195                 'rWdgSZVusjAKGlWCJsM%2B4TkCR%2Bi8AnrkECngcMHuJ9mYbS'
    196                 'XI1VfEmcnRVcfkKkJGZGctaDIWK%2FMTEmfYCW6USt3Zk2WowJ'
    197                 'SGuJHqEcFz0kyfAlkpmG%2Fl5E1FQROYqLN2kZQ%3D%3D')
    198 
    199     expiration = 1391816302
    200     ks, client_id = (gslib.commands.signurl
    201                      ._ReadKeystore(self.ks_contents,
    202                                     'notasecret'))
    203     signed_url = (gslib.commands.signurl
    204                   ._GenSignedUrl(ks.get_privatekey(),
    205                                  client_id, 'PUT', '',
    206                                  'text/plain', expiration,
    207                                  'test/test.txt'))
    208     self.assertEquals(expected, signed_url)
    209 
    210   def testSignurlGet(self):
    211     """Tests the return value of the _GenSignedUrl function with \
    212     a GET method."""
    213 
    214     expected = ('https://storage.googleapis.com/test/test.txt?'
    215                 'GoogleAccessId=test (at] developer.gserviceaccount.com&'
    216                 'Expires=0&Signature=TCZwe32cU%2BMksmLiSY9shHXQjLs1'
    217                 'F3y%2F%2F1M0UhiK4qsPRVNZVwI7YWvv2qa2Xa%2BVBBafboF0'
    218                 '1%2BWvx3ZG316pwpNIRR6y7jNnE0LvQmHE8afbm2VYCi%2B2JS'
    219                 'ZK2YZFJAyEek8si53jhYQEmaRq1zPfGbX84B2FJ8v4iI%2FTC1'
    220                 'I9OE5vHF0sWwIR9d73JDrFLjaync7QYFWRExdwvqlQX%2BPO3r'
    221                 'OG9Ns%2BcQFIN7npnsVjH28yNY9gBzXya8LYmNvUx6bWHWZMiu'
    222                 'fLwDZ0jejNeDZTOfQGRM%2B0vY7NslzaT06W1wo8P7McSkAZEl'
    223                 'DCbhR0Vo1fturPMwmAhi88f0qzRzywbg%3D%3D')
    224 
    225     expiration = 0
    226     ks, client_id = (gslib.commands.signurl
    227                      ._ReadKeystore(self.ks_contents,
    228                                     'notasecret'))
    229     signed_url = (gslib.commands.signurl
    230                   ._GenSignedUrl(ks.get_privatekey(),
    231                                  client_id, 'GET', '',
    232                                  '', expiration, 'test/test.txt'))
    233     self.assertEquals(expected, signed_url)
    234