1 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 2 # Use of this source code is governed by a BSD-style license that can be 3 # found in the LICENSE file. 4 5 import logging, os 6 from autotest_lib.client.bin import test, utils 7 from autotest_lib.client.common_lib import error 8 import tempfile 9 10 class security_ModuleLocking(test.test): 11 """ 12 Handle examining the system for specific module loading capabilities. 13 """ 14 version = 1 15 16 def _passed(self, msg): 17 logging.info('ok: %s', msg) 18 19 def _failed(self, msg): 20 logging.error('FAIL: %s', msg) 21 self._failures.append(msg) 22 23 def _fatal(self, msg): 24 logging.error('FATAL: %s', msg) 25 raise error.TestError(msg) 26 27 def check(self, boolean, msg, fatal=False): 28 """ 29 Check boolean state and report condition to log. 30 31 @param boolean: condition to examine 32 @param msg: what the condition is testing 33 @param fatal: should the test full abort on the condition failing 34 """ 35 if boolean == True: 36 self._passed(msg) 37 else: 38 msg = "could not satisfy '%s'" % (msg) 39 if fatal: 40 self._fatal(msg) 41 else: 42 self._failed(msg) 43 44 def module_loaded(self, module): 45 """ 46 Detect if the given module is already loaded in the kernel. 47 48 @param module: name of module to check 49 """ 50 module = module.replace('-', '_') 51 match = "%s " % (module) 52 for line in open("/proc/modules"): 53 if line.startswith(match): 54 return True 55 return False 56 57 def rmmod(self, module): 58 """ 59 Unload a module if it is already loaded in the kernel. 60 61 @param module: name of module to unload 62 """ 63 if self.module_loaded(module): 64 utils.system("rmmod %s" % (module)) 65 66 def modprobe(self, module): 67 """ 68 If a module is not already loaded in the kernel, load it via modprobe. 69 70 @param module: name of module to load 71 """ 72 if not self.module_loaded(module): 73 utils.system("modprobe %s" % (module)) 74 75 def _module_path(self, module): 76 """ 77 Locate a kernel module's full filesystem path. 78 79 @param module: name of module to locate 80 """ 81 ko = utils.system_output("find /lib/modules -name '%s.ko'" % (module)) 82 return ko.splitlines()[0] 83 84 def module_loads_outside_rootfs(self, module): 85 """ 86 Copies the given module into /tmp and tries to load it from there 87 using insmod directly. 88 89 @param module: name of module to test 90 """ 91 # Start from a clean slate. 92 self.rmmod(module) 93 94 # Make sure we can load with standard mechanisms. 95 self.modprobe(module) 96 self.rmmod(module) 97 98 # Load module directly with insmod from root filesystem. 99 ko = self._module_path(module) 100 utils.system("insmod %s" % (ko)) 101 self.rmmod(module) 102 103 # Load module directly with insmod from /tmp. 104 tmp = "/tmp/%s.ko" % (module) 105 utils.system("cp %s %s" % (ko, tmp)) 106 rc = utils.system("insmod %s" % (tmp), ignore_status=True) 107 108 # Clean up. 109 self.rmmod(module) 110 utils.system("rm %s" % (tmp)) 111 112 if rc == 0: 113 return True 114 return False 115 116 def module_loads_old_api(self, module): 117 """ 118 Loads a module using the old blob-style kernel syscall. With 119 kmod, this requires compressing the module first to trigger 120 in-memory decompression and loading. 121 122 @param module: name of module to test 123 """ 124 # Start from a clean slate. 125 self.rmmod(module) 126 127 # Compress module to trigger the old API. 128 tmp = "/tmp/%s.ko.gz" % (module) 129 ko = self._module_path(module) 130 utils.system("gzip -c %s > %s" % (ko, tmp)) 131 rc = utils.system("insmod %s" % (tmp), ignore_status=True) 132 133 # Clean up. 134 self.rmmod(module) 135 utils.system("rm %s" % (tmp)) 136 137 if rc == 0: 138 return True 139 return False 140 141 def module_loads_after_bind_umount(self, module): 142 """ 143 Makes sure modules can still load after a bind mount of the 144 filesystem is umounted. 145 146 @param module: name of module to test 147 """ 148 149 # Start from a clean slate. 150 self.rmmod(module) 151 152 # Make sure we can load with standard mechanisms. 153 self.modprobe(module) 154 self.rmmod(module) 155 156 # Create and umount a bind mount of the root filesystem. 157 bind = tempfile.mkdtemp(prefix=module) 158 rc = utils.system("mount -o bind / %s && umount %s" % (bind, bind)) 159 utils.system("rmdir %s" % (bind)) 160 161 # Attempt to load again. 162 self.modprobe(module) 163 self.rmmod(module) 164 165 if rc == 0: 166 return True 167 return False 168 169 def run_once(self): 170 """ 171 Check that the fd-based module loading syscall is enforcing the 172 module fd origin to the root filesystem, and that it can be 173 disabled and will allow the old syscall API as well. 174 TODO(keescook): add production test to make sure that on a verified 175 boot, "/proc/sys/kernel/chromiumos/module_locking" does not exist. 176 """ 177 # Empty failure list means test passes. 178 self._failures = [] 179 180 # Check that the sysctl is either missing or set to 1. 181 sysctl = "/proc/sys/kernel/chromiumos/module_locking" 182 if os.path.exists(sysctl): 183 self.check(open(sysctl).read() == '1\n', "%s enabled" % (sysctl)) 184 185 # Check the enforced state is to deny non-rootfs module loads. 186 module = "test_module" 187 loaded = self.module_loads_outside_rootfs(module) 188 self.check(loaded == False, "cannot load %s from /tmp" % (module)) 189 190 # Check old API fails when enforcement enabled. 191 loaded = self.module_loads_old_api(module) 192 self.check(loaded == False, "cannot load %s with old API" % (module)) 193 194 # Make sure the bind umount bug is not present. 195 loaded = self.module_loads_after_bind_umount(module) 196 self.check(loaded == True, "can load %s after bind umount" % (module)) 197 198 # If the sysctl exists, verify that it will disable the restriction. 199 if os.path.exists(sysctl): 200 # Disable restriction. 201 open(sysctl, "w").write("0\n") 202 self.check(open(sysctl).read() == '0\n', "%s disabled" % (sysctl)) 203 204 # Check enforcement is disabled. 205 loaded = self.module_loads_outside_rootfs(module) 206 self.check(loaded == True, "can load %s from /tmp" % (module)) 207 208 # Check old API works when enforcement disabled. 209 loaded = self.module_loads_old_api(module) 210 self.check(loaded == True, "can load %s with old API" % (module)) 211 212 # Clean up. 213 open(sysctl, "w").write("1\n") 214 self.check(open(sysctl).read() == '1\n', "%s enabled" % (sysctl)) 215 216 # Raise a failure if anything unexpected was seen. 217 if len(self._failures): 218 raise error.TestFail((", ".join(self._failures))) 219