1 from optparse import OptionParser 2 from optparse import Option, OptionValueError 3 import os 4 import mini_parser 5 import policy 6 from policy import MatchPathPrefix 7 import re 8 import sys 9 10 DEBUG=False 11 12 ''' 13 Use file_contexts and policy to verify Treble requirements 14 are not violated. 15 ''' 16 ### 17 # Differentiate between domains that are part of the core Android platform and 18 # domains introduced by vendors 19 coreAppdomain = { 20 'bluetooth', 21 'ephemeral_app', 22 'isolated_app', 23 'nfc', 24 'platform_app', 25 'priv_app', 26 'radio', 27 'shared_relro', 28 'shell', 29 'system_app', 30 'untrusted_app', 31 'untrusted_app_25', 32 'untrusted_v2_app', 33 } 34 coredomainWhitelist = { 35 'adbd', 36 'kernel', 37 'postinstall', 38 'postinstall_dexopt', 39 'recovery', 40 'system_server', 41 } 42 coredomainWhitelist |= coreAppdomain 43 44 class scontext: 45 def __init__(self): 46 self.fromSystem = False 47 self.fromVendor = False 48 self.coredomain = False 49 self.appdomain = False 50 self.attributes = set() 51 self.entrypoints = [] 52 self.entrypointpaths = [] 53 54 def PrintScontexts(): 55 for d in sorted(alldomains.keys()): 56 sctx = alldomains[d] 57 print d 58 print "\tcoredomain="+str(sctx.coredomain) 59 print "\tappdomain="+str(sctx.appdomain) 60 print "\tfromSystem="+str(sctx.fromSystem) 61 print "\tfromVendor="+str(sctx.fromVendor) 62 print "\tattributes="+str(sctx.attributes) 63 print "\tentrypoints="+str(sctx.entrypoints) 64 print "\tentrypointpaths=" 65 if sctx.entrypointpaths is not None: 66 for path in sctx.entrypointpaths: 67 print "\t\t"+str(path) 68 69 alldomains = {} 70 coredomains = set() 71 appdomains = set() 72 vendordomains = set() 73 74 # compat vars 75 alltypes = set() 76 oldalltypes = set() 77 compatMapping = None 78 79 def GetAllDomains(pol): 80 global alldomains 81 for result in pol.QueryTypeAttribute("domain", True): 82 alldomains[result] = scontext() 83 84 def GetAppDomains(): 85 global appdomains 86 global alldomains 87 for d in alldomains: 88 # The application of the "appdomain" attribute is trusted because core 89 # selinux policy contains neverallow rules that enforce that only zygote 90 # and runas spawned processes may transition to processes that have 91 # the appdomain attribute. 92 if "appdomain" in alldomains[d].attributes: 93 alldomains[d].appdomain = True 94 appdomains.add(d) 95 96 def GetCoreDomains(): 97 global alldomains 98 global coredomains 99 for d in alldomains: 100 # TestCoredomainViolations will verify if coredomain was incorrectly 101 # applied. 102 if "coredomain" in alldomains[d].attributes: 103 alldomains[d].coredomain = True 104 coredomains.add(d) 105 # check whether domains are executed off of /system or /vendor 106 if d in coredomainWhitelist: 107 continue 108 # TODO, add checks to prevent app domains from being incorrectly 109 # labeled as coredomain. Apps don't have entrypoints as they're always 110 # dynamically transitioned to by zygote. 111 if d in appdomains: 112 continue 113 if not alldomains[d].entrypointpaths: 114 continue 115 for path in alldomains[d].entrypointpaths: 116 # Processes with entrypoint on /system 117 if ((MatchPathPrefix(path, "/system") and not 118 MatchPathPrefix(path, "/system/vendor")) or 119 MatchPathPrefix(path, "/init") or 120 MatchPathPrefix(path, "/charger")): 121 alldomains[d].fromSystem = True 122 # Processes with entrypoint on /vendor or /system/vendor 123 if (MatchPathPrefix(path, "/vendor") or 124 MatchPathPrefix(path, "/system/vendor")): 125 alldomains[d].fromVendor = True 126 127 ### 128 # Add the entrypoint type and path(s) to each domain. 129 # 130 def GetDomainEntrypoints(pol): 131 global alldomains 132 for x in pol.QueryTERule(tclass="file", perms=["entrypoint"]): 133 if not x.sctx in alldomains: 134 continue 135 alldomains[x.sctx].entrypoints.append(str(x.tctx)) 136 # postinstall_file represents a special case specific to A/B OTAs. 137 # Update_engine mounts a partition and relabels it postinstall_file. 138 # There is no file_contexts entry associated with postinstall_file 139 # so skip the lookup. 140 if x.tctx == "postinstall_file": 141 continue 142 entrypointpath = pol.QueryFc(x.tctx) 143 if not entrypointpath: 144 continue 145 alldomains[x.sctx].entrypointpaths.extend(entrypointpath) 146 ### 147 # Get attributes associated with each domain 148 # 149 def GetAttributes(pol): 150 global alldomains 151 for domain in alldomains: 152 for result in pol.QueryTypeAttribute(domain, False): 153 alldomains[domain].attributes.add(result) 154 155 def GetAllTypes(pol, oldpol): 156 global alltypes 157 global oldalltypes 158 alltypes = pol.GetAllTypes(False) 159 oldalltypes = oldpol.GetAllTypes(False) 160 161 def setup(pol): 162 GetAllDomains(pol) 163 GetAttributes(pol) 164 GetDomainEntrypoints(pol) 165 GetAppDomains() 166 GetCoreDomains() 167 168 # setup for the policy compatibility tests 169 def compatSetup(pol, oldpol, mapping): 170 global compatMapping 171 172 GetAllTypes(pol, oldpol) 173 compatMapping = mapping 174 175 ############################################################# 176 # Tests 177 ############################################################# 178 def TestCoredomainViolations(): 179 global alldomains 180 # verify that all domains launched from /system have the coredomain 181 # attribute 182 ret = "" 183 violators = [] 184 for d in alldomains: 185 domain = alldomains[d] 186 if domain.fromSystem and "coredomain" not in domain.attributes: 187 violators.append(d); 188 if len(violators) > 0: 189 ret += "The following domain(s) must be associated with the " 190 ret += "\"coredomain\" attribute because they are executed off of " 191 ret += "/system:\n" 192 ret += " ".join(str(x) for x in sorted(violators)) + "\n" 193 194 # verify that all domains launched form /vendor do not have the coredomain 195 # attribute 196 violators = [] 197 for d in alldomains: 198 domain = alldomains[d] 199 if domain.fromVendor and "coredomain" in domain.attributes: 200 violators.append(d) 201 if len(violators) > 0: 202 ret += "The following domains must not be associated with the " 203 ret += "\"coredomain\" attribute because they are executed off of " 204 ret += "/vendor or /system/vendor:\n" 205 ret += " ".join(str(x) for x in sorted(violators)) + "\n" 206 207 return ret 208 209 ### 210 # Make sure that any new type introduced in the new policy that was not present 211 # in the old policy has been recorded in the mapping file. 212 def TestNoUnmappedNewTypes(): 213 global alltypes 214 global oldalltypes 215 global compatMapping 216 newt = alltypes - oldalltypes 217 ret = "" 218 violators = [] 219 220 for n in newt: 221 if compatMapping.rTypeattributesets.get(n) is None: 222 violators.append(n) 223 224 if len(violators) > 0: 225 ret += "SELinux: The following types were found added to the policy " 226 ret += "without an entry into the compatibility mapping file(s) found " 227 ret += "in private/compat/" + compatMapping.apiLevel + "/" 228 ret += compatMapping.apiLevel + "[.ignore].cil/n" 229 ret += " ".join(str(x) for x in sorted(violators)) + "\n" 230 return ret 231 232 ### 233 # Make sure that any public type removed in the current policy has its 234 # declaration added to the mapping file for use in non-platform policy 235 def TestNoUnmappedRmTypes(): 236 global alltypes 237 global oldalltypes 238 global compatMapping 239 rmt = oldalltypes - alltypes 240 ret = "" 241 violators = [] 242 243 for o in rmt: 244 if o in compatMapping.pubtypes and not o in compatMapping.types: 245 violators.append(o) 246 247 if len(violators) > 0: 248 ret += "SELinux: The following formerly public types were removed from " 249 ret += "policy without a declaration in the compatibility mapping " 250 ret += "file(s) found in prebuilts/api/" + compatMapping.apiLevel + "/\n" 251 ret += " ".join(str(x) for x in sorted(violators)) + "\n" 252 return ret 253 254 def TestTrebleCompatMapping(): 255 ret = TestNoUnmappedNewTypes() 256 ret += TestNoUnmappedRmTypes() 257 return ret 258 ### 259 # extend OptionParser to allow the same option flag to be used multiple times. 260 # This is used to allow multiple file_contexts files and tests to be 261 # specified. 262 # 263 class MultipleOption(Option): 264 ACTIONS = Option.ACTIONS + ("extend",) 265 STORE_ACTIONS = Option.STORE_ACTIONS + ("extend",) 266 TYPED_ACTIONS = Option.TYPED_ACTIONS + ("extend",) 267 ALWAYS_TYPED_ACTIONS = Option.ALWAYS_TYPED_ACTIONS + ("extend",) 268 269 def take_action(self, action, dest, opt, value, values, parser): 270 if action == "extend": 271 values.ensure_value(dest, []).append(value) 272 else: 273 Option.take_action(self, action, dest, opt, value, values, parser) 274 275 Tests = {"CoredomainViolations": TestCoredomainViolations, 276 "TrebleCompatMapping": TestTrebleCompatMapping } 277 278 if __name__ == '__main__': 279 usage = "treble_sepolicy_tests.py -f nonplat_file_contexts -f " 280 usage +="plat_file_contexts -p curr_policy -b base_policy -o old_policy " 281 usage +="-m mapping file [--test test] [--help]" 282 parser = OptionParser(option_class=MultipleOption, usage=usage) 283 parser.add_option("-b", "--basepolicy", dest="basepolicy", metavar="FILE") 284 parser.add_option("-f", "--file_contexts", dest="file_contexts", 285 metavar="FILE", action="extend", type="string") 286 parser.add_option("-l", "--library-path", dest="libpath", metavar="FILE") 287 parser.add_option("-m", "--mapping", dest="mapping", metavar="FILE") 288 parser.add_option("-o", "--oldpolicy", dest="oldpolicy", metavar="FILE") 289 parser.add_option("-p", "--policy", dest="policy", metavar="FILE") 290 parser.add_option("-t", "--test", dest="tests", action="extend", 291 292 help="Test options include "+str(Tests)) 293 294 (options, args) = parser.parse_args() 295 296 if not options.libpath: 297 sys.exit("Must specify path to host libraries\n" + parser.usage) 298 if not os.path.exists(options.libpath): 299 sys.exit("Error: library-path " + options.libpath + " does not exist\n" 300 + parser.usage) 301 if not options.basepolicy: 302 sys.exit("Must specify the current platform-only policy file\n" + parser.usage) 303 if not options.mapping: 304 sys.exit("Must specify a compatibility mapping file\n" + parser.usage) 305 if not options.oldpolicy: 306 sys.exit("Must specify the previous monolithic policy file\n" + parser.usage) 307 if not options.policy: 308 sys.exit("Must specify current monolithic policy file\n" + parser.usage) 309 if not os.path.exists(options.policy): 310 sys.exit("Error: policy file " + options.policy + " does not exist\n" 311 + parser.usage) 312 313 if not options.file_contexts: 314 sys.exit("Error: Must specify file_contexts file(s)\n" + parser.usage) 315 for f in options.file_contexts: 316 if not os.path.exists(f): 317 sys.exit("Error: File_contexts file " + f + " does not exist\n" + 318 parser.usage) 319 320 pol = policy.Policy(options.policy, options.file_contexts, options.libpath) 321 setup(pol) 322 basepol = policy.Policy(options.basepolicy, None, options.libpath) 323 oldpol = policy.Policy(options.oldpolicy, None, options.libpath) 324 mapping = mini_parser.MiniCilParser(options.mapping) 325 compatSetup(basepol, oldpol, mapping) 326 327 if DEBUG: 328 PrintScontexts() 329 330 results = "" 331 # If an individual test is not specified, run all tests. 332 if options.tests is None: 333 for t in Tests.values(): 334 results += t() 335 else: 336 for tn in options.tests: 337 t = Tests.get(tn) 338 if t: 339 results += t() 340 else: 341 err = "Error: unknown test: " + tn + "\n" 342 err += "Available tests:\n" 343 for tn in Tests.keys(): 344 err += tn + "\n" 345 sys.exit(err) 346 347 if len(results) > 0: 348 sys.exit(results) 349