Home | History | Annotate | Download | only in security_ModuleLocking
      1 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging, os
      6 from autotest_lib.client.bin import test, utils
      7 from autotest_lib.client.common_lib import error
      8 import tempfile
      9 
     10 class security_ModuleLocking(test.test):
     11     """
     12     Handle examining the system for specific module loading capabilities.
     13     """
     14     version = 1
     15 
     16     def _passed(self, msg):
     17         logging.info('ok: %s', msg)
     18 
     19     def _failed(self, msg):
     20         logging.error('FAIL: %s', msg)
     21         self._failures.append(msg)
     22 
     23     def _fatal(self, msg):
     24         logging.error('FATAL: %s', msg)
     25         raise error.TestError(msg)
     26 
     27     def check(self, boolean, msg, fatal=False):
     28         """
     29         Check boolean state and report condition to log.
     30 
     31         @param boolean: condition to examine
     32         @param msg: what the condition is testing
     33         @param fatal: should the test full abort on the condition failing
     34         """
     35         if boolean == True:
     36             self._passed(msg)
     37         else:
     38             msg = "could not satisfy '%s'" % (msg)
     39             if fatal:
     40                 self._fatal(msg)
     41             else:
     42                 self._failed(msg)
     43 
     44     def module_loaded(self, module):
     45         """
     46         Detect if the given module is already loaded in the kernel.
     47 
     48         @param module: name of module to check
     49         """
     50         module = module.replace('-', '_')
     51         match = "%s " % (module)
     52         for line in open("/proc/modules"):
     53             if line.startswith(match):
     54                 return True
     55         return False
     56 
     57     def rmmod(self, module):
     58         """
     59         Unload a module if it is already loaded in the kernel.
     60 
     61         @param module: name of module to unload
     62         """
     63         if self.module_loaded(module):
     64             utils.system("rmmod %s" % (module))
     65 
     66     def modprobe(self, module):
     67         """
     68         If a module is not already loaded in the kernel, load it via modprobe.
     69 
     70         @param module: name of module to load
     71         """
     72         if not self.module_loaded(module):
     73             utils.system("modprobe %s" % (module))
     74 
     75     def _module_path(self, module):
     76         """
     77         Locate a kernel module's full filesystem path.
     78 
     79         @param module: name of module to locate
     80         """
     81         ko = utils.system_output("find /lib/modules -name '%s.ko'" % (module))
     82         return ko.splitlines()[0]
     83 
     84     def module_loads_outside_rootfs(self, module):
     85         """
     86         Copies the given module into /tmp and tries to load it from there
     87         using insmod directly.
     88 
     89         @param module: name of module to test
     90         """
     91         # Start from a clean slate.
     92         self.rmmod(module)
     93 
     94         # Make sure we can load with standard mechanisms.
     95         self.modprobe(module)
     96         self.rmmod(module)
     97 
     98         # Load module directly with insmod from root filesystem.
     99         ko = self._module_path(module)
    100         utils.system("insmod %s" % (ko))
    101         self.rmmod(module)
    102 
    103         # Load module directly with insmod from /tmp.
    104         tmp = "/tmp/%s.ko" % (module)
    105         utils.system("cp %s %s" % (ko, tmp))
    106         rc = utils.system("insmod %s" % (tmp), ignore_status=True)
    107 
    108         # Clean up.
    109         self.rmmod(module)
    110         utils.system("rm %s" % (tmp))
    111 
    112         if rc == 0:
    113             return True
    114         return False
    115 
    116     def module_loads_old_api(self, module):
    117         """
    118         Loads a module using the old blob-style kernel syscall. With
    119         kmod, this requires compressing the module first to trigger
    120         in-memory decompression and loading.
    121 
    122         @param module: name of module to test
    123         """
    124         # Start from a clean slate.
    125         self.rmmod(module)
    126 
    127         # Compress module to trigger the old API.
    128         tmp = "/tmp/%s.ko.gz" % (module)
    129         ko = self._module_path(module)
    130         utils.system("gzip -c %s > %s" % (ko, tmp))
    131         rc = utils.system("insmod %s" % (tmp), ignore_status=True)
    132 
    133         # Clean up.
    134         self.rmmod(module)
    135         utils.system("rm %s" % (tmp))
    136 
    137         if rc == 0:
    138             return True
    139         return False
    140 
    141     def module_loads_after_bind_umount(self, module):
    142         """
    143         Makes sure modules can still load after a bind mount of the
    144         filesystem is umounted.
    145 
    146         @param module: name of module to test
    147         """
    148 
    149         # Start from a clean slate.
    150         self.rmmod(module)
    151 
    152         # Make sure we can load with standard mechanisms.
    153         self.modprobe(module)
    154         self.rmmod(module)
    155 
    156         # Create and umount a bind mount of the root filesystem.
    157         bind = tempfile.mkdtemp(prefix=module)
    158         rc = utils.system("mount -o bind / %s && umount %s" % (bind, bind))
    159         utils.system("rmdir %s" % (bind))
    160 
    161         # Attempt to load again.
    162         self.modprobe(module)
    163         self.rmmod(module)
    164 
    165         if rc == 0:
    166             return True
    167         return False
    168 
    169     def run_once(self):
    170         """
    171         Check that the fd-based module loading syscall is enforcing the
    172         module fd origin to the root filesystem, and that it can be
    173         disabled and will allow the old syscall API as well.
    174         TODO(keescook): add production test to make sure that on a verified
    175         boot, "/proc/sys/kernel/chromiumos/module_locking" does not exist.
    176         """
    177         # Empty failure list means test passes.
    178         self._failures = []
    179 
    180         # Check that the sysctl is either missing or set to 1.
    181         sysctl = "/proc/sys/kernel/chromiumos/module_locking"
    182         if os.path.exists(sysctl):
    183             self.check(open(sysctl).read() == '1\n', "%s enabled" % (sysctl))
    184 
    185         # Check the enforced state is to deny non-rootfs module loads.
    186         module = "test_module"
    187         loaded = self.module_loads_outside_rootfs(module)
    188         self.check(loaded == False, "cannot load %s from /tmp" % (module))
    189 
    190         # Check old API fails when enforcement enabled.
    191         loaded = self.module_loads_old_api(module)
    192         self.check(loaded == False, "cannot load %s with old API" % (module))
    193 
    194         # Make sure the bind umount bug is not present.
    195         loaded = self.module_loads_after_bind_umount(module)
    196         self.check(loaded == True, "can load %s after bind umount" % (module))
    197 
    198         # If the sysctl exists, verify that it will disable the restriction.
    199         if os.path.exists(sysctl):
    200             # Disable restriction.
    201             open(sysctl, "w").write("0\n")
    202             self.check(open(sysctl).read() == '0\n', "%s disabled" % (sysctl))
    203 
    204             # Check enforcement is disabled.
    205             loaded = self.module_loads_outside_rootfs(module)
    206             self.check(loaded == True, "can load %s from /tmp" % (module))
    207 
    208             # Check old API works when enforcement disabled.
    209             loaded = self.module_loads_old_api(module)
    210             self.check(loaded == True, "can load %s with old API" % (module))
    211 
    212             # Clean up.
    213             open(sysctl, "w").write("1\n")
    214             self.check(open(sysctl).read() == '1\n', "%s enabled" % (sysctl))
    215 
    216         # Raise a failure if anything unexpected was seen.
    217         if len(self._failures):
    218             raise error.TestFail((", ".join(self._failures)))
    219