Home | History | Annotate | Download | only in tests
      1 from optparse import OptionParser
      2 from optparse import Option, OptionValueError
      3 import os
      4 import mini_parser
      5 import policy
      6 from policy import MatchPathPrefix
      7 import re
      8 import sys
      9 
     10 DEBUG=False
     11 
     12 '''
     13 Use file_contexts and policy to verify Treble requirements
     14 are not violated.
     15 '''
     16 ###
     17 # Differentiate between domains that are part of the core Android platform and
     18 # domains introduced by vendors
     19 coreAppdomain = {
     20         'bluetooth',
     21         'ephemeral_app',
     22         'isolated_app',
     23         'nfc',
     24         'platform_app',
     25         'priv_app',
     26         'radio',
     27         'shared_relro',
     28         'shell',
     29         'system_app',
     30         'untrusted_app',
     31         'untrusted_app_25',
     32         'untrusted_v2_app',
     33         }
     34 coredomainWhitelist = {
     35         'adbd',
     36         'kernel',
     37         'postinstall',
     38         'postinstall_dexopt',
     39         'recovery',
     40         'system_server',
     41         }
     42 coredomainWhitelist |= coreAppdomain
     43 
     44 class scontext:
     45     def __init__(self):
     46         self.fromSystem = False
     47         self.fromVendor = False
     48         self.coredomain = False
     49         self.appdomain = False
     50         self.attributes = set()
     51         self.entrypoints = []
     52         self.entrypointpaths = []
     53 
     54 def PrintScontexts():
     55     for d in sorted(alldomains.keys()):
     56         sctx = alldomains[d]
     57         print d
     58         print "\tcoredomain="+str(sctx.coredomain)
     59         print "\tappdomain="+str(sctx.appdomain)
     60         print "\tfromSystem="+str(sctx.fromSystem)
     61         print "\tfromVendor="+str(sctx.fromVendor)
     62         print "\tattributes="+str(sctx.attributes)
     63         print "\tentrypoints="+str(sctx.entrypoints)
     64         print "\tentrypointpaths="
     65         if sctx.entrypointpaths is not None:
     66             for path in sctx.entrypointpaths:
     67                 print "\t\t"+str(path)
     68 
     69 alldomains = {}
     70 coredomains = set()
     71 appdomains = set()
     72 vendordomains = set()
     73 
     74 # compat vars
     75 alltypes = set()
     76 oldalltypes = set()
     77 compatMapping = None
     78 
     79 def GetAllDomains(pol):
     80     global alldomains
     81     for result in pol.QueryTypeAttribute("domain", True):
     82         alldomains[result] = scontext()
     83 
     84 def GetAppDomains():
     85     global appdomains
     86     global alldomains
     87     for d in alldomains:
     88         # The application of the "appdomain" attribute is trusted because core
     89         # selinux policy contains neverallow rules that enforce that only zygote
     90         # and runas spawned processes may transition to processes that have
     91         # the appdomain attribute.
     92         if "appdomain" in alldomains[d].attributes:
     93             alldomains[d].appdomain = True
     94             appdomains.add(d)
     95 
     96 def GetCoreDomains():
     97     global alldomains
     98     global coredomains
     99     for d in alldomains:
    100         # TestCoredomainViolations will verify if coredomain was incorrectly
    101         # applied.
    102         if "coredomain" in alldomains[d].attributes:
    103             alldomains[d].coredomain = True
    104             coredomains.add(d)
    105         # check whether domains are executed off of /system or /vendor
    106         if d in coredomainWhitelist:
    107             continue
    108         # TODO, add checks to prevent app domains from being incorrectly
    109         # labeled as coredomain. Apps don't have entrypoints as they're always
    110         # dynamically transitioned to by zygote.
    111         if d in appdomains:
    112             continue
    113         if not alldomains[d].entrypointpaths:
    114             continue
    115         for path in alldomains[d].entrypointpaths:
    116             # Processes with entrypoint on /system
    117             if ((MatchPathPrefix(path, "/system") and not
    118                     MatchPathPrefix(path, "/system/vendor")) or
    119                     MatchPathPrefix(path, "/init") or
    120                     MatchPathPrefix(path, "/charger")):
    121                 alldomains[d].fromSystem = True
    122             # Processes with entrypoint on /vendor or /system/vendor
    123             if (MatchPathPrefix(path, "/vendor") or
    124                     MatchPathPrefix(path, "/system/vendor")):
    125                 alldomains[d].fromVendor = True
    126 
    127 ###
    128 # Add the entrypoint type and path(s) to each domain.
    129 #
    130 def GetDomainEntrypoints(pol):
    131     global alldomains
    132     for x in pol.QueryTERule(tclass="file", perms=["entrypoint"]):
    133         if not x.sctx in alldomains:
    134             continue
    135         alldomains[x.sctx].entrypoints.append(str(x.tctx))
    136         # postinstall_file represents a special case specific to A/B OTAs.
    137         # Update_engine mounts a partition and relabels it postinstall_file.
    138         # There is no file_contexts entry associated with postinstall_file
    139         # so skip the lookup.
    140         if x.tctx == "postinstall_file":
    141             continue
    142         entrypointpath = pol.QueryFc(x.tctx)
    143         if not entrypointpath:
    144             continue
    145         alldomains[x.sctx].entrypointpaths.extend(entrypointpath)
    146 ###
    147 # Get attributes associated with each domain
    148 #
    149 def GetAttributes(pol):
    150     global alldomains
    151     for domain in alldomains:
    152         for result in pol.QueryTypeAttribute(domain, False):
    153             alldomains[domain].attributes.add(result)
    154 
    155 def GetAllTypes(pol, oldpol):
    156     global alltypes
    157     global oldalltypes
    158     alltypes = pol.GetAllTypes(False)
    159     oldalltypes = oldpol.GetAllTypes(False)
    160 
    161 def setup(pol):
    162     GetAllDomains(pol)
    163     GetAttributes(pol)
    164     GetDomainEntrypoints(pol)
    165     GetAppDomains()
    166     GetCoreDomains()
    167 
    168 # setup for the policy compatibility tests
    169 def compatSetup(pol, oldpol, mapping):
    170     global compatMapping
    171 
    172     GetAllTypes(pol, oldpol)
    173     compatMapping = mapping
    174 
    175 #############################################################
    176 # Tests
    177 #############################################################
    178 def TestCoredomainViolations():
    179     global alldomains
    180     # verify that all domains launched from /system have the coredomain
    181     # attribute
    182     ret = ""
    183     violators = []
    184     for d in alldomains:
    185         domain = alldomains[d]
    186         if domain.fromSystem and "coredomain" not in domain.attributes:
    187                 violators.append(d);
    188     if len(violators) > 0:
    189         ret += "The following domain(s) must be associated with the "
    190         ret += "\"coredomain\" attribute because they are executed off of "
    191         ret += "/system:\n"
    192         ret += " ".join(str(x) for x in sorted(violators)) + "\n"
    193 
    194     # verify that all domains launched form /vendor do not have the coredomain
    195     # attribute
    196     violators = []
    197     for d in alldomains:
    198         domain = alldomains[d]
    199         if domain.fromVendor and "coredomain" in domain.attributes:
    200             violators.append(d)
    201     if len(violators) > 0:
    202         ret += "The following domains must not be associated with the "
    203         ret += "\"coredomain\" attribute because they are executed off of "
    204         ret += "/vendor or /system/vendor:\n"
    205         ret += " ".join(str(x) for x in sorted(violators)) + "\n"
    206 
    207     return ret
    208 
    209 ###
    210 # Make sure that any new type introduced in the new policy that was not present
    211 # in the old policy has been recorded in the mapping file.
    212 def TestNoUnmappedNewTypes():
    213     global alltypes
    214     global oldalltypes
    215     global compatMapping
    216     newt = alltypes - oldalltypes
    217     ret = ""
    218     violators = []
    219 
    220     for n in newt:
    221         if compatMapping.rTypeattributesets.get(n) is None:
    222             violators.append(n)
    223 
    224     if len(violators) > 0:
    225         ret += "SELinux: The following types were found added to the policy "
    226         ret += "without an entry into the compatibility mapping file(s) found "
    227         ret += "in private/compat/" + compatMapping.apiLevel + "/"
    228         ret +=  compatMapping.apiLevel + "[.ignore].cil/n"
    229         ret += " ".join(str(x) for x in sorted(violators)) + "\n"
    230     return ret
    231 
    232 ###
    233 # Make sure that any public type removed in the current policy has its
    234 # declaration added to the mapping file for use in non-platform policy
    235 def TestNoUnmappedRmTypes():
    236     global alltypes
    237     global oldalltypes
    238     global compatMapping
    239     rmt = oldalltypes - alltypes
    240     ret = ""
    241     violators = []
    242 
    243     for o in rmt:
    244         if o in compatMapping.pubtypes and not o in compatMapping.types:
    245             violators.append(o)
    246 
    247     if len(violators) > 0:
    248         ret += "SELinux: The following formerly public types were removed from "
    249         ret += "policy without a declaration in the compatibility mapping "
    250         ret += "file(s) found in prebuilts/api/" + compatMapping.apiLevel + "/\n"
    251         ret += " ".join(str(x) for x in sorted(violators)) + "\n"
    252     return ret
    253 
    254 def TestTrebleCompatMapping():
    255     ret = TestNoUnmappedNewTypes()
    256     ret += TestNoUnmappedRmTypes()
    257     return ret
    258 ###
    259 # extend OptionParser to allow the same option flag to be used multiple times.
    260 # This is used to allow multiple file_contexts files and tests to be
    261 # specified.
    262 #
    263 class MultipleOption(Option):
    264     ACTIONS = Option.ACTIONS + ("extend",)
    265     STORE_ACTIONS = Option.STORE_ACTIONS + ("extend",)
    266     TYPED_ACTIONS = Option.TYPED_ACTIONS + ("extend",)
    267     ALWAYS_TYPED_ACTIONS = Option.ALWAYS_TYPED_ACTIONS + ("extend",)
    268 
    269     def take_action(self, action, dest, opt, value, values, parser):
    270         if action == "extend":
    271             values.ensure_value(dest, []).append(value)
    272         else:
    273             Option.take_action(self, action, dest, opt, value, values, parser)
    274 
    275 Tests = {"CoredomainViolations": TestCoredomainViolations,
    276          "TrebleCompatMapping": TestTrebleCompatMapping }
    277 
    278 if __name__ == '__main__':
    279     usage = "treble_sepolicy_tests.py -f nonplat_file_contexts -f "
    280     usage +="plat_file_contexts -p curr_policy -b base_policy -o old_policy "
    281     usage +="-m mapping file [--test test] [--help]"
    282     parser = OptionParser(option_class=MultipleOption, usage=usage)
    283     parser.add_option("-b", "--basepolicy", dest="basepolicy", metavar="FILE")
    284     parser.add_option("-f", "--file_contexts", dest="file_contexts",
    285             metavar="FILE", action="extend", type="string")
    286     parser.add_option("-l", "--library-path", dest="libpath", metavar="FILE")
    287     parser.add_option("-m", "--mapping", dest="mapping", metavar="FILE")
    288     parser.add_option("-o", "--oldpolicy", dest="oldpolicy", metavar="FILE")
    289     parser.add_option("-p", "--policy", dest="policy", metavar="FILE")
    290     parser.add_option("-t", "--test", dest="tests", action="extend",
    291 
    292             help="Test options include "+str(Tests))
    293 
    294     (options, args) = parser.parse_args()
    295 
    296     if not options.libpath:
    297         sys.exit("Must specify path to host libraries\n" + parser.usage)
    298     if not os.path.exists(options.libpath):
    299         sys.exit("Error: library-path " + options.libpath + " does not exist\n"
    300                 + parser.usage)
    301     if not options.basepolicy:
    302         sys.exit("Must specify the current platform-only policy file\n" + parser.usage)
    303     if not options.mapping:
    304         sys.exit("Must specify a compatibility mapping file\n" + parser.usage)
    305     if not options.oldpolicy:
    306         sys.exit("Must specify the previous monolithic policy file\n" + parser.usage)
    307     if not options.policy:
    308         sys.exit("Must specify current monolithic policy file\n" + parser.usage)
    309     if not os.path.exists(options.policy):
    310         sys.exit("Error: policy file " + options.policy + " does not exist\n"
    311                 + parser.usage)
    312 
    313     if not options.file_contexts:
    314         sys.exit("Error: Must specify file_contexts file(s)\n" + parser.usage)
    315     for f in options.file_contexts:
    316         if not os.path.exists(f):
    317             sys.exit("Error: File_contexts file " + f + " does not exist\n" +
    318                     parser.usage)
    319 
    320     pol = policy.Policy(options.policy, options.file_contexts, options.libpath)
    321     setup(pol)
    322     basepol = policy.Policy(options.basepolicy, None, options.libpath)
    323     oldpol = policy.Policy(options.oldpolicy, None, options.libpath)
    324     mapping = mini_parser.MiniCilParser(options.mapping)
    325     compatSetup(basepol, oldpol, mapping)
    326 
    327     if DEBUG:
    328         PrintScontexts()
    329 
    330     results = ""
    331     # If an individual test is not specified, run all tests.
    332     if options.tests is None:
    333         for t in Tests.values():
    334             results += t()
    335     else:
    336         for tn in options.tests:
    337             t = Tests.get(tn)
    338             if t:
    339                 results += t()
    340             else:
    341                 err = "Error: unknown test: " + tn + "\n"
    342                 err += "Available tests:\n"
    343                 for tn in Tests.keys():
    344                     err += tn + "\n"
    345                 sys.exit(err)
    346 
    347     if len(results) > 0:
    348         sys.exit(results)
    349