1 # -*- coding: utf-8 -*- 2 # Copyright 2013 Google Inc. All Rights Reserved. 3 # 4 # Licensed under the Apache License, Version 2.0 (the "License"); 5 # you may not use this file except in compliance with the License. 6 # You may obtain a copy of the License at 7 # 8 # http://www.apache.org/licenses/LICENSE-2.0 9 # 10 # Unless required by applicable law or agreed to in writing, software 11 # distributed under the License is distributed on an "AS IS" BASIS, 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 # See the License for the specific language governing permissions and 14 # limitations under the License. 15 """Integration tests for the defacl command.""" 16 17 from __future__ import absolute_import 18 19 import re 20 21 from gslib.cs_api_map import ApiSelector 22 import gslib.tests.testcase as case 23 from gslib.tests.testcase.integration_testcase import SkipForS3 24 from gslib.tests.util import ObjectToURI as suri 25 26 PUBLIC_READ_JSON_ACL_TEXT = '"entity":"allUsers","role":"READER"' 27 28 29 @SkipForS3('S3 does not support default object ACLs.') 30 class TestDefacl(case.GsUtilIntegrationTestCase): 31 """Integration tests for the defacl command.""" 32 33 _defacl_ch_prefix = ['defacl', 'ch'] 34 _defacl_get_prefix = ['defacl', 'get'] 35 _defacl_set_prefix = ['defacl', 'set'] 36 37 def _MakeScopeRegex(self, role, entity_type, email_address): 38 template_regex = (r'\{.*"entity":\s*"%s-%s".*"role":\s*"%s".*\}' % 39 (entity_type, email_address, role)) 40 return re.compile(template_regex, flags=re.DOTALL) 41 42 def testChangeDefaultAcl(self): 43 """Tests defacl ch.""" 44 bucket = self.CreateBucket() 45 46 test_regex = self._MakeScopeRegex( 47 'OWNER', 'group', self.GROUP_TEST_ADDRESS) 48 test_regex2 = self._MakeScopeRegex( 49 'READER', 'group', self.GROUP_TEST_ADDRESS) 50 json_text = self.RunGsUtil(self._defacl_get_prefix + 51 [suri(bucket)], return_stdout=True) 52 self.assertNotRegexpMatches(json_text, test_regex) 53 54 self.RunGsUtil(self._defacl_ch_prefix + 55 ['-g', self.GROUP_TEST_ADDRESS+':FC', suri(bucket)]) 56 json_text2 = self.RunGsUtil(self._defacl_get_prefix + 57 [suri(bucket)], return_stdout=True) 58 self.assertRegexpMatches(json_text2, test_regex) 59 60 self.RunGsUtil(self._defacl_ch_prefix + 61 ['-g', self.GROUP_TEST_ADDRESS+':READ', suri(bucket)]) 62 json_text3 = self.RunGsUtil(self._defacl_get_prefix + 63 [suri(bucket)], return_stdout=True) 64 self.assertRegexpMatches(json_text3, test_regex2) 65 66 stderr = self.RunGsUtil(self._defacl_ch_prefix + 67 ['-g', self.GROUP_TEST_ADDRESS+':WRITE', 68 suri(bucket)], 69 return_stderr=True, expected_status=1) 70 self.assertIn('WRITER cannot be set as a default object ACL', stderr) 71 72 def testChangeDefaultAclEmpty(self): 73 """Tests adding and removing an entry from an empty default object ACL.""" 74 75 bucket = self.CreateBucket() 76 77 # First, clear out the default object ACL on the bucket. 78 self.RunGsUtil(self._defacl_set_prefix + ['private', suri(bucket)]) 79 json_text = self.RunGsUtil(self._defacl_get_prefix + 80 [suri(bucket)], return_stdout=True) 81 empty_regex = r'\[\]\s*' 82 self.assertRegexpMatches(json_text, empty_regex) 83 84 group_regex = self._MakeScopeRegex( 85 'READER', 'group', self.GROUP_TEST_ADDRESS) 86 self.RunGsUtil(self._defacl_ch_prefix + 87 ['-g', self.GROUP_TEST_ADDRESS+':READ', suri(bucket)]) 88 json_text2 = self.RunGsUtil(self._defacl_get_prefix + 89 [suri(bucket)], return_stdout=True) 90 self.assertRegexpMatches(json_text2, group_regex) 91 92 if self.test_api == ApiSelector.JSON: 93 # TODO: Enable when JSON service respects creating a private (no entries) 94 # default object ACL via PATCH. For now, only supported in XML. 95 return 96 97 # After adding and removing a group, the default object ACL should be empty. 98 self.RunGsUtil(self._defacl_ch_prefix + 99 ['-d', self.GROUP_TEST_ADDRESS, suri(bucket)]) 100 json_text3 = self.RunGsUtil(self._defacl_get_prefix + 101 [suri(bucket)], return_stdout=True) 102 self.assertRegexpMatches(json_text3, empty_regex) 103 104 def testChangeMultipleBuckets(self): 105 """Tests defacl ch on multiple buckets.""" 106 bucket1 = self.CreateBucket() 107 bucket2 = self.CreateBucket() 108 109 test_regex = self._MakeScopeRegex( 110 'READER', 'group', self.GROUP_TEST_ADDRESS) 111 json_text = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket1)], 112 return_stdout=True) 113 self.assertNotRegexpMatches(json_text, test_regex) 114 json_text = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket2)], 115 return_stdout=True) 116 self.assertNotRegexpMatches(json_text, test_regex) 117 118 self.RunGsUtil(self._defacl_ch_prefix + 119 ['-g', self.GROUP_TEST_ADDRESS+':READ', 120 suri(bucket1), suri(bucket2)]) 121 json_text = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket1)], 122 return_stdout=True) 123 self.assertRegexpMatches(json_text, test_regex) 124 json_text = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket2)], 125 return_stdout=True) 126 self.assertRegexpMatches(json_text, test_regex) 127 128 def testChangeMultipleAcls(self): 129 """Tests defacl ch with multiple ACL entries.""" 130 bucket = self.CreateBucket() 131 132 test_regex_group = self._MakeScopeRegex( 133 'READER', 'group', self.GROUP_TEST_ADDRESS) 134 test_regex_user = self._MakeScopeRegex( 135 'OWNER', 'user', self.USER_TEST_ADDRESS) 136 json_text = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket)], 137 return_stdout=True) 138 self.assertNotRegexpMatches(json_text, test_regex_group) 139 self.assertNotRegexpMatches(json_text, test_regex_user) 140 141 self.RunGsUtil(self._defacl_ch_prefix + 142 ['-g', self.GROUP_TEST_ADDRESS+':READ', 143 '-u', self.USER_TEST_ADDRESS+':fc', suri(bucket)]) 144 json_text = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket)], 145 return_stdout=True) 146 self.assertRegexpMatches(json_text, test_regex_group) 147 self.assertRegexpMatches(json_text, test_regex_user) 148 149 def testEmptyDefAcl(self): 150 bucket = self.CreateBucket() 151 self.RunGsUtil(self._defacl_set_prefix + ['private', suri(bucket)]) 152 stdout = self.RunGsUtil(self._defacl_get_prefix + [suri(bucket)], 153 return_stdout=True) 154 self.assertEquals(stdout.rstrip(), '[]') 155 self.RunGsUtil(self._defacl_ch_prefix + 156 ['-u', self.USER_TEST_ADDRESS+':fc', suri(bucket)]) 157 158 def testDeletePermissionsWithCh(self): 159 """Tests removing permissions with defacl ch.""" 160 bucket = self.CreateBucket() 161 162 test_regex = self._MakeScopeRegex( 163 'OWNER', 'user', self.USER_TEST_ADDRESS) 164 json_text = self.RunGsUtil( 165 self._defacl_get_prefix + [suri(bucket)], return_stdout=True) 166 self.assertNotRegexpMatches(json_text, test_regex) 167 168 self.RunGsUtil(self._defacl_ch_prefix + 169 ['-u', self.USER_TEST_ADDRESS+':fc', suri(bucket)]) 170 json_text = self.RunGsUtil( 171 self._defacl_get_prefix + [suri(bucket)], return_stdout=True) 172 self.assertRegexpMatches(json_text, test_regex) 173 174 self.RunGsUtil(self._defacl_ch_prefix + 175 ['-d', self.USER_TEST_ADDRESS, suri(bucket)]) 176 json_text = self.RunGsUtil( 177 self._defacl_get_prefix + [suri(bucket)], return_stdout=True) 178 self.assertNotRegexpMatches(json_text, test_regex) 179 180 def testTooFewArgumentsFails(self): 181 """Tests calling defacl with insufficient number of arguments.""" 182 # No arguments for get, but valid subcommand. 183 stderr = self.RunGsUtil(self._defacl_get_prefix, return_stderr=True, 184 expected_status=1) 185 self.assertIn('command requires at least', stderr) 186 187 # No arguments for set, but valid subcommand. 188 stderr = self.RunGsUtil(self._defacl_set_prefix, return_stderr=True, 189 expected_status=1) 190 self.assertIn('command requires at least', stderr) 191 192 # No arguments for ch, but valid subcommand. 193 stderr = self.RunGsUtil(self._defacl_ch_prefix, return_stderr=True, 194 expected_status=1) 195 self.assertIn('command requires at least', stderr) 196 197 # Neither arguments nor subcommand. 198 stderr = self.RunGsUtil(['defacl'], return_stderr=True, expected_status=1) 199 self.assertIn('command requires at least', stderr) 200 201 202 class TestDefaclOldAlias(TestDefacl): 203 _defacl_ch_prefix = ['chdefacl'] 204 _defacl_get_prefix = ['getdefacl'] 205 _defacl_set_prefix = ['setdefacl'] 206