Home | History | Annotate | Download | only in tests
      1 # -*- coding: utf-8 -*-
      2 # Copyright 2013 Google Inc. All Rights Reserved.
      3 #
      4 # Licensed under the Apache License, Version 2.0 (the "License");
      5 # you may not use this file except in compliance with the License.
      6 # You may obtain a copy of the License at
      7 #
      8 #     http://www.apache.org/licenses/LICENSE-2.0
      9 #
     10 # Unless required by applicable law or agreed to in writing, software
     11 # distributed under the License is distributed on an "AS IS" BASIS,
     12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13 # See the License for the specific language governing permissions and
     14 # limitations under the License.
     15 """Integration tests for the defacl command."""
     16 
     17 from __future__ import absolute_import
     18 
     19 import re
     20 
     21 from gslib.cs_api_map import ApiSelector
     22 import gslib.tests.testcase as case
     23 from gslib.tests.testcase.integration_testcase import SkipForS3
     24 from gslib.tests.util import ObjectToURI as suri
     25 
     26 PUBLIC_READ_JSON_ACL_TEXT = '"entity":"allUsers","role":"READER"'
     27 
     28 
     29 @SkipForS3('S3 does not support default object ACLs.')
     30 class TestDefacl(case.GsUtilIntegrationTestCase):
     31   """Integration tests for the defacl command."""
     32 
     33   _defacl_ch_prefix = ['defacl', 'ch']
     34   _defacl_get_prefix = ['defacl', 'get']
     35   _defacl_set_prefix = ['defacl', 'set']
     36 
     37   def _MakeScopeRegex(self, role, entity_type, email_address):
     38     template_regex = (r'\{.*"entity":\s*"%s-%s".*"role":\s*"%s".*\}' %
     39                       (entity_type, email_address, role))
     40     return re.compile(template_regex, flags=re.DOTALL)
     41 
     42   def testChangeDefaultAcl(self):
     43     """Tests defacl ch."""
     44     bucket = self.CreateBucket()
     45 
     46     test_regex = self._MakeScopeRegex(
     47         'OWNER', 'group', self.GROUP_TEST_ADDRESS)
     48     test_regex2 = self._MakeScopeRegex(
     49         'READER', 'group', self.GROUP_TEST_ADDRESS)
     50     json_text = self.RunGsUtil(self._defacl_get_prefix +
     51                                [suri(bucket)], return_stdout=True)
     52     self.assertNotRegexpMatches(json_text, test_regex)
     53 
     54     self.RunGsUtil(self._defacl_ch_prefix +
     55                    ['-g', self.GROUP_TEST_ADDRESS+':FC', suri(bucket)])
     56     json_text2 = self.RunGsUtil(self._defacl_get_prefix +
     57                                 [suri(bucket)], return_stdout=True)
     58     self.assertRegexpMatches(json_text2, test_regex)
     59 
     60     self.RunGsUtil(self._defacl_ch_prefix +
     61                    ['-g', self.GROUP_TEST_ADDRESS+':READ', suri(bucket)])
     62     json_text3 = self.RunGsUtil(self._defacl_get_prefix +
     63                                 [suri(bucket)], return_stdout=True)
     64     self.assertRegexpMatches(json_text3, test_regex2)
     65 
     66     stderr = self.RunGsUtil(self._defacl_ch_prefix +
     67                             ['-g', self.GROUP_TEST_ADDRESS+':WRITE',
     68                              suri(bucket)],
     69                             return_stderr=True, expected_status=1)
     70     self.assertIn('WRITER cannot be set as a default object ACL', stderr)
     71 
     72   def testChangeDefaultAclEmpty(self):
     73     """Tests adding and removing an entry from an empty default object ACL."""
     74 
     75     bucket = self.CreateBucket()
     76 
     77     # First, clear out the default object ACL on the bucket.
     78     self.RunGsUtil(self._defacl_set_prefix + ['private', suri(bucket)])
     79     json_text = self.RunGsUtil(self._defacl_get_prefix +
     80                                [suri(bucket)], return_stdout=True)
     81     empty_regex = r'\[\]\s*'
     82     self.assertRegexpMatches(json_text, empty_regex)
     83 
     84     group_regex = self._MakeScopeRegex(
     85         'READER', 'group', self.GROUP_TEST_ADDRESS)
     86     self.RunGsUtil(self._defacl_ch_prefix +
     87                    ['-g', self.GROUP_TEST_ADDRESS+':READ', suri(bucket)])
     88     json_text2 = self.RunGsUtil(self._defacl_get_prefix +
     89                                 [suri(bucket)], return_stdout=True)
     90     self.assertRegexpMatches(json_text2, group_regex)
     91 
     92     if self.test_api == ApiSelector.JSON:
     93       # TODO: Enable when JSON service respects creating a private (no entries)
     94       # default object ACL via PATCH. For now, only supported in XML.
     95       return
     96 
     97     # After adding and removing a group, the default object ACL should be empty.
     98     self.RunGsUtil(self._defacl_ch_prefix +
     99                    ['-d', self.GROUP_TEST_ADDRESS, suri(bucket)])
    100     json_text3 = self.RunGsUtil(self._defacl_get_prefix +
    101                                 [suri(bucket)], return_stdout=True)
    102     self.assertRegexpMatches(json_text3, empty_regex)
    103 
    104   def testChangeMultipleBuckets(self):
    105     """Tests defacl ch on multiple buckets."""
    106     bucket1 = self.CreateBucket()
    107     bucket2 = self.CreateBucket()
    108 
    109     test_regex = self._MakeScopeRegex(
    110         'READER', 'group', self.GROUP_TEST_ADDRESS)
    111     json_text = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket1)],
    112                                return_stdout=True)
    113     self.assertNotRegexpMatches(json_text, test_regex)
    114     json_text = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket2)],
    115                                return_stdout=True)
    116     self.assertNotRegexpMatches(json_text, test_regex)
    117 
    118     self.RunGsUtil(self._defacl_ch_prefix +
    119                    ['-g', self.GROUP_TEST_ADDRESS+':READ',
    120                     suri(bucket1), suri(bucket2)])
    121     json_text = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket1)],
    122                                return_stdout=True)
    123     self.assertRegexpMatches(json_text, test_regex)
    124     json_text = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket2)],
    125                                return_stdout=True)
    126     self.assertRegexpMatches(json_text, test_regex)
    127 
    128   def testChangeMultipleAcls(self):
    129     """Tests defacl ch with multiple ACL entries."""
    130     bucket = self.CreateBucket()
    131 
    132     test_regex_group = self._MakeScopeRegex(
    133         'READER', 'group', self.GROUP_TEST_ADDRESS)
    134     test_regex_user = self._MakeScopeRegex(
    135         'OWNER', 'user', self.USER_TEST_ADDRESS)
    136     json_text = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket)],
    137                                return_stdout=True)
    138     self.assertNotRegexpMatches(json_text, test_regex_group)
    139     self.assertNotRegexpMatches(json_text, test_regex_user)
    140 
    141     self.RunGsUtil(self._defacl_ch_prefix +
    142                    ['-g', self.GROUP_TEST_ADDRESS+':READ',
    143                     '-u', self.USER_TEST_ADDRESS+':fc', suri(bucket)])
    144     json_text = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket)],
    145                                return_stdout=True)
    146     self.assertRegexpMatches(json_text, test_regex_group)
    147     self.assertRegexpMatches(json_text, test_regex_user)
    148 
    149   def testEmptyDefAcl(self):
    150     bucket = self.CreateBucket()
    151     self.RunGsUtil(self._defacl_set_prefix + ['private', suri(bucket)])
    152     stdout = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket)],
    153                             return_stdout=True)
    154     self.assertEquals(stdout.rstrip(), '[]')
    155     self.RunGsUtil(self._defacl_ch_prefix +
    156                    ['-u', self.USER_TEST_ADDRESS+':fc', suri(bucket)])
    157 
    158   def testDeletePermissionsWithCh(self):
    159     """Tests removing permissions with defacl ch."""
    160     bucket = self.CreateBucket()
    161 
    162     test_regex = self._MakeScopeRegex(
    163         'OWNER', 'user', self.USER_TEST_ADDRESS)
    164     json_text = self.RunGsUtil(
    165         self._defacl_get_prefix + [suri(bucket)], return_stdout=True)
    166     self.assertNotRegexpMatches(json_text, test_regex)
    167 
    168     self.RunGsUtil(self._defacl_ch_prefix +
    169                    ['-u', self.USER_TEST_ADDRESS+':fc', suri(bucket)])
    170     json_text = self.RunGsUtil(
    171         self._defacl_get_prefix + [suri(bucket)], return_stdout=True)
    172     self.assertRegexpMatches(json_text, test_regex)
    173 
    174     self.RunGsUtil(self._defacl_ch_prefix +
    175                    ['-d', self.USER_TEST_ADDRESS, suri(bucket)])
    176     json_text = self.RunGsUtil(
    177         self._defacl_get_prefix + [suri(bucket)], return_stdout=True)
    178     self.assertNotRegexpMatches(json_text, test_regex)
    179 
    180   def testTooFewArgumentsFails(self):
    181     """Tests calling defacl with insufficient number of arguments."""
    182     # No arguments for get, but valid subcommand.
    183     stderr = self.RunGsUtil(self._defacl_get_prefix, return_stderr=True,
    184                             expected_status=1)
    185     self.assertIn('command requires at least', stderr)
    186 
    187     # No arguments for set, but valid subcommand.
    188     stderr = self.RunGsUtil(self._defacl_set_prefix, return_stderr=True,
    189                             expected_status=1)
    190     self.assertIn('command requires at least', stderr)
    191 
    192     # No arguments for ch, but valid subcommand.
    193     stderr = self.RunGsUtil(self._defacl_ch_prefix, return_stderr=True,
    194                             expected_status=1)
    195     self.assertIn('command requires at least', stderr)
    196 
    197     # Neither arguments nor subcommand.
    198     stderr = self.RunGsUtil(['defacl'], return_stderr=True, expected_status=1)
    199     self.assertIn('command requires at least', stderr)
    200 
    201 
    202 class TestDefaclOldAlias(TestDefacl):
    203   _defacl_ch_prefix = ['chdefacl']
    204   _defacl_get_prefix = ['getdefacl']
    205   _defacl_set_prefix = ['setdefacl']
    206