1 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 2 # Use of this source code is governed by a BSD-style license that can be 3 # found in the LICENSE file. 4 5 import logging, os, tempfile, shutil, stat, time, posix 6 from autotest_lib.client.bin import test, utils 7 from autotest_lib.client.common_lib import error 8 9 # TODO: 10 # - mock out TPM and check all error conditions 11 # - test failure when things aren't mounted correctly 12 13 class test_checker(object): 14 def __init__(self): 15 logging.info("test_checker.__init__") 16 # Empty failure list means test passes. 17 self._failures = [] 18 19 def _passed(self, msg): 20 logging.info('ok: %s' % (msg)) 21 22 def _failed(self, msg): 23 logging.error('FAIL: %s' % (msg)) 24 self._failures.append(msg) 25 26 def _fatal(self, msg): 27 logging.error('FATAL: %s' % (msg)) 28 raise error.TestError(msg) 29 30 def check(self, boolean, msg, fatal=False): 31 if boolean == True: 32 self._passed(msg) 33 else: 34 msg = "could not satisfy '%s'" % (msg) 35 if fatal: 36 self._fatal(msg) 37 else: 38 self._failed(msg) 39 40 def test_raise(self): 41 # Raise a failure if anything unexpected was seen. 42 if len(self._failures): 43 raise error.TestFail((", ".join(self._failures))) 44 45 chk = test_checker() 46 47 48 class EncryptedStateful(object): 49 def _prepare_simulated_root(self): 50 os.makedirs(self.var) 51 os.makedirs(self.chronos) 52 os.makedirs(self.stateful) 53 54 # Build fake stateful block device (emulate 10G sda1). 55 self.stateful_block = os.path.join(self.root, 'stateful.block') 56 utils.system("truncate -s 10G %s" % (self.stateful_block)) 57 utils.system("mkfs.ext4 -F %s" % (self.stateful_block)) 58 utils.system("mount -n -t ext4 -o loop,noatime,commit=600 %s %s" % 59 (self.stateful_block, self.stateful)) 60 61 def __init__(self, root=None): 62 if root == None: 63 self.root = tempfile.mkdtemp(dir='/mnt/stateful_partition', 64 prefix='.test-enc-stateful-') 65 self.simulated = True 66 else: 67 self.root = root 68 self.simulated = False 69 70 self.var = os.path.join(self.root, 'var') 71 self.chronos = os.path.join(self.root, 'home', 'chronos') 72 self.stateful = os.path.join(self.root, 'mnt', 'stateful_partition') 73 self.mount_log = os.path.join(self.stateful, 'mount.log') 74 self.key = os.path.join(self.stateful, 'encrypted.key') 75 self.needs_finalization = os.path.join(self.stateful, 76 'encrypted.needs-finalization') 77 self.block = os.path.join(self.stateful, 'encrypted.block') 78 self.encrypted = os.path.join(self.stateful, 'encrypted') 79 80 if self.simulated: 81 try: 82 self._prepare_simulated_root() 83 except: 84 shutil.rmtree(self.root) 85 raise 86 87 self.mounted = not self.simulated 88 89 def mount(self, args=""): 90 if self.mounted or not self.simulated: 91 return 92 # TODO(keescook): figure out what is killing the resizer and 93 # remove the explicit use of "tee" here. 94 # Without the pipe to "tee", mount-encrypted's forked resizing 95 # process gets killed, even though it is using daemon(). (Is 96 # autotest doing something odd here?) This leaves the filesystem 97 # unresized. It would be better to have the resizer running in 98 # the background, as it is designed, so we can examine its behavior 99 # during testing (e.g. "does the filesystem actually grow?"). 100 utils.system("MOUNT_ENCRYPTED_ROOT=%s mount-encrypted %s 2>&1 " 101 "| tee %s" % (self.root, args, self.mount_log)) 102 self.mounted = True 103 104 def umount(self): 105 if not self.mounted or not self.simulated: 106 return 107 utils.system("MOUNT_ENCRYPTED_ROOT=%s mount-encrypted umount" % 108 (self.root)) 109 self.mounted = False 110 111 # Clean up when destroyed. 112 def __del__(self): 113 if self.simulated: 114 self.umount() 115 utils.system("umount -n %s" % (self.stateful)) 116 shutil.rmtree(self.root) 117 118 # Perform common post-mount size/owner checks on the filesystem and 119 # backing files. 120 def check_sizes(self, finalized=True): 121 # Do we have the expected backing files? 122 chk.check(os.path.exists(self.block), "%s exists" % (self.block)) 123 if finalized: 124 keyfile = self.key 125 other = self.needs_finalization 126 else: 127 keyfile = self.needs_finalization 128 other = self.key 129 chk.check(os.path.exists(keyfile), "%s exists" % (keyfile)) 130 chk.check(not os.path.exists(other), "%s does not exist" % (other)) 131 132 # Sanity check the key file stat. 133 info = os.stat(keyfile) 134 chk.check(stat.S_ISREG(info.st_mode), 135 "%s is regular file" % (keyfile)) 136 chk.check(info.st_uid == 0, "%s is owned by root" % (keyfile)) 137 chk.check(info.st_gid == 0, "%s has group root" % (keyfile)) 138 chk.check(stat.S_IMODE(info.st_mode) == (stat.S_IRUSR | stat.S_IWUSR), 139 "%s is S_IRUSR | S_IWUSR" % (keyfile)) 140 chk.check(info.st_size == 48, "%s is 48 bytes" % (keyfile)) 141 142 # Sanity check the block file stat. 143 info = os.stat(self.block) 144 chk.check(stat.S_ISREG(info.st_mode), 145 "%s is regular file" % (self.block)) 146 chk.check(info.st_uid == 0, "%s is owned by root" % (self.block)) 147 chk.check(info.st_gid == 0, "%s has group root" % (self.block)) 148 chk.check(stat.S_IMODE(info.st_mode) == (stat.S_IRUSR | stat.S_IWUSR), 149 "%s is S_IRUSR | S_IWUSR" % (self.block)) 150 # Make sure block file is roughly a third of the size of the root 151 # filesystem (within 5%). 152 top = os.statvfs(self.stateful) 153 backing_size = float(info.st_size) 154 third = top.f_blocks * top.f_frsize * .3 155 chk.check(backing_size > (third * .95) 156 and backing_size < (third * 1.05), 157 "%s is near %d bytes (was %d)" % (self.block, third, 158 info.st_size)) 159 160 # Wait for resize manager task to finish. 161 utils.poll_for_condition(lambda: utils.system("pgrep mount-encrypted", 162 ignore_status=True) != 0, 163 error.TestError('resizer still running')) 164 165 # Verify filesystem is within 5% of the block file size. 166 info = os.statvfs(self.encrypted) 167 encrypted_size = float(info.f_frsize) * float(info.f_blocks) 168 chk.check(encrypted_size / backing_size > 0.95, 169 "%s fs (%d) is nearly the backing device size (%d)" % 170 (self.encrypted, encrypted_size, backing_size)) 171 # Verify there is a reasonable number of inodes in the encrypted 172 # filesystem (near 25% inodes-to-blocks ratio). 173 inode_ratio = float(info.f_files) / float(info.f_blocks) 174 chk.check(inode_ratio > 0.20 and inode_ratio < 0.30, 175 "%s has close to 25%% ratio of inodes-to-blocks (%.2f%%)" % 176 (self.encrypted, inode_ratio*100)) 177 178 # Raise non-fatal failures now, if they were encountered. 179 chk.test_raise() 180 181 # Wait for kernel background writing to finish. 182 def _backing_stabilize(self): 183 start = None 184 size = 0 185 while True: 186 k = long(utils.system_output("du -sk %s" % (self.block), 187 retain_output = True).split()[0]) 188 if start == None: 189 start = k 190 if k == size: 191 # Backing file has remained the same size for 10 seconds. 192 # Assume the kernel is done with background initialization. 193 break 194 time.sleep(10) 195 utils.system("sync") 196 size = k 197 logging.info("%s stabilized at %dK (was %dK)" % 198 (self.block, size, start)) 199 200 # Check that the backing file reclaims space when filesystem contents 201 # are deleted. 202 def check_reclamation(self): 203 # This test is sensitive to other things happening on the filesystem, 204 # so we must wait for background initialization to finish first. 205 self._backing_stabilize() 206 207 megs = 200 208 data = os.path.join(self.var, "check_reclamation") 209 orig = os.statvfs(self.stateful) 210 211 # 200M file added to encrypted filesystem. 212 utils.system("dd if=/dev/zero of=%s bs=1M count=%s; sync" % (data, 213 megs)) 214 # Wait for background allocations to finish. 215 self._backing_stabilize() 216 filled = os.statvfs(self.stateful) 217 218 # 200M file removed from encrypted filesystem. 219 utils.system("rm %s; sync" % (data)) 220 # Wait for background hole-punching to finish. 221 self._backing_stabilize() 222 done = os.statvfs(self.stateful) 223 224 # Did the underlying filesystem grow by the size of the test file? 225 file_blocks_used = float((megs * 1024 * 1024) / orig.f_frsize) 226 fs_blocks_used = float(orig.f_bfree - filled.f_bfree) 227 chk.check(file_blocks_used / fs_blocks_used > 0.95, 228 "%d file blocks account for most of %d fs blocks" % 229 (file_blocks_used, fs_blocks_used)) 230 231 # Did the underlying filesystem shrink on removal? 232 fs_blocks_done = float(orig.f_bfree - done.f_bfree) 233 chk.check(fs_blocks_done / file_blocks_used < 0.05, 234 "most of %d fs blocks reclaimed (%d fs blocks left over, " 235 "free: %d -> %d -> %d)" % 236 (fs_blocks_used, fs_blocks_done, 237 orig.f_bfree, filled.f_bfree, done.f_bfree)) 238 239 # Raise non-fatal failures now, if they were encountered. 240 chk.test_raise() 241 242 243 class platform_EncryptedStateful(test.test): 244 version = 1 245 246 def existing_partition(self): 247 # Examine the existing encrypted partition. 248 encstate = EncryptedStateful("/") 249 250 # Perform post-mount sanity checks (and handle unfinalized devices). 251 encstate.check_sizes(finalized=os.path.exists(encstate.key)) 252 253 def factory_key(self): 254 # Create test root directory. 255 encstate = EncryptedStateful() 256 257 # Make sure we haven't run here before. 258 chk.check(not os.path.exists(encstate.key), 259 "%s does not exist" % (encstate.key)) 260 chk.check(not os.path.exists(encstate.block), 261 "%s does not exist" % (encstate.block)) 262 263 # Mount a fresh encrypted stateful, with factory static key. 264 encstate.mount("factory") 265 266 # Perform post-mount sanity checks. 267 encstate.check_sizes() 268 269 # Check disk reclamation. 270 encstate.check_reclamation() 271 272 # Check explicit umount. 273 encstate.umount() 274 275 def no_tpm(self): 276 encstate = EncryptedStateful() 277 278 # Relocate the TPM device during mount. 279 tpm = "/dev/tpm0" 280 off = "%s.off" % (tpm) 281 try: 282 if os.path.exists(tpm): 283 utils.system("mv %s %s" % (tpm, off)) 284 # Mount without a TPM. 285 encstate.mount() 286 finally: 287 if os.path.exists(off): 288 utils.system("mv %s %s" % (off, tpm)) 289 290 # Perform post-mount sanity checks. 291 encstate.check_sizes(finalized=False) 292 293 def run_once(self): 294 # Do a no-write test of system's existing encrypted partition. 295 self.existing_partition() 296 297 # Do a no-write, no-TPM test with sanity checks. 298 self.no_tpm() 299 300 # There is no interactively controllable TPM mock yet for 301 # mount-encrypted, so we can only test the static key currently. 302 self.factory_key() 303