Home | History | Annotate | Download | only in platform_EncryptedStateful
      1 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging, os, tempfile, shutil, stat, time, posix
      6 from autotest_lib.client.bin import test, utils
      7 from autotest_lib.client.common_lib import error
      8 
      9 # TODO:
     10 #  - mock out TPM and check all error conditions
     11 #  - test failure when things aren't mounted correctly
     12 
     13 class test_checker(object):
     14     def __init__(self):
     15         logging.info("test_checker.__init__")
     16         # Empty failure list means test passes.
     17         self._failures = []
     18 
     19     def _passed(self, msg):
     20         logging.info('ok: %s' % (msg))
     21 
     22     def _failed(self, msg):
     23         logging.error('FAIL: %s' % (msg))
     24         self._failures.append(msg)
     25 
     26     def _fatal(self, msg):
     27         logging.error('FATAL: %s' % (msg))
     28         raise error.TestError(msg)
     29 
     30     def check(self, boolean, msg, fatal=False):
     31         if boolean == True:
     32             self._passed(msg)
     33         else:
     34             msg = "could not satisfy '%s'" % (msg)
     35             if fatal:
     36                 self._fatal(msg)
     37             else:
     38                 self._failed(msg)
     39 
     40     def test_raise(self):
     41         # Raise a failure if anything unexpected was seen.
     42         if len(self._failures):
     43             raise error.TestFail((", ".join(self._failures)))
     44 
# Shared module-level checker: the chk.check() calls throughout this file
# queue their non-fatal failures here until chk.test_raise() reports them.
chk = test_checker()
     46 
     47 
     48 class EncryptedStateful(object):
     49     def _prepare_simulated_root(self):
     50         os.makedirs(self.var)
     51         os.makedirs(self.chronos)
     52         os.makedirs(self.stateful)
     53 
     54         # Build fake stateful block device (emulate 10G sda1).
     55         self.stateful_block = os.path.join(self.root, 'stateful.block')
     56         utils.system("truncate -s 10G %s" % (self.stateful_block))
     57         utils.system("mkfs.ext4 -F %s" % (self.stateful_block))
     58         utils.system("mount -n -t ext4 -o loop,noatime,commit=600 %s %s" %
     59                      (self.stateful_block, self.stateful))
     60 
     61     def __init__(self, root=None):
     62         if root == None:
     63             self.root = tempfile.mkdtemp(dir='/mnt/stateful_partition',
     64                                          prefix='.test-enc-stateful-')
     65             self.simulated = True
     66         else:
     67             self.root = root
     68             self.simulated = False
     69 
     70         self.var = os.path.join(self.root, 'var')
     71         self.chronos = os.path.join(self.root, 'home', 'chronos')
     72         self.stateful = os.path.join(self.root, 'mnt', 'stateful_partition')
     73         self.mount_log = os.path.join(self.stateful, 'mount.log')
     74         self.key = os.path.join(self.stateful, 'encrypted.key')
     75         self.needs_finalization = os.path.join(self.stateful,
     76                                                'encrypted.needs-finalization')
     77         self.block = os.path.join(self.stateful, 'encrypted.block')
     78         self.encrypted = os.path.join(self.stateful, 'encrypted')
     79 
     80         if self.simulated:
     81             try:
     82                 self._prepare_simulated_root()
     83             except:
     84                 shutil.rmtree(self.root)
     85                 raise
     86 
     87         self.mounted = not self.simulated
     88 
     89     def mount(self, args=""):
     90         if self.mounted or not self.simulated:
     91             return
     92         # TODO(keescook): figure out what is killing the resizer and
     93         # remove the explicit use of "tee" here.
     94         # Without the pipe to "tee", mount-encrypted's forked resizing
     95         # process gets killed, even though it is using daemon(). (Is
     96         # autotest doing something odd here?) This leaves the filesystem
     97         # unresized. It would be better to have the resizer running in
     98         # the background, as it is designed, so we can examine its behavior
     99         # during testing (e.g. "does the filesystem actually grow?").
    100         utils.system("MOUNT_ENCRYPTED_ROOT=%s mount-encrypted %s 2>&1 "
    101                      "| tee %s" % (self.root, args, self.mount_log))
    102         self.mounted = True
    103 
    104     def umount(self):
    105         if not self.mounted or not self.simulated:
    106             return
    107         utils.system("MOUNT_ENCRYPTED_ROOT=%s mount-encrypted umount" %
    108                          (self.root))
    109         self.mounted = False
    110 
    111     # Clean up when destroyed.
    112     def __del__(self):
    113         if self.simulated:
    114             self.umount()
    115             utils.system("umount -n %s" % (self.stateful))
    116             shutil.rmtree(self.root)
    117 
    118     # Perform common post-mount size/owner checks on the filesystem and
    119     # backing files.
    120     def check_sizes(self, finalized=True):
    121         # Do we have the expected backing files?
    122         chk.check(os.path.exists(self.block), "%s exists" % (self.block))
    123         if finalized:
    124             keyfile = self.key
    125             other = self.needs_finalization
    126         else:
    127             keyfile = self.needs_finalization
    128             other = self.key
    129         chk.check(os.path.exists(keyfile), "%s exists" % (keyfile))
    130         chk.check(not os.path.exists(other), "%s does not exist" % (other))
    131 
    132         # Sanity check the key file stat.
    133         info = os.stat(keyfile)
    134         chk.check(stat.S_ISREG(info.st_mode),
    135                   "%s is regular file" % (keyfile))
    136         chk.check(info.st_uid == 0, "%s is owned by root" % (keyfile))
    137         chk.check(info.st_gid == 0, "%s has group root" % (keyfile))
    138         chk.check(stat.S_IMODE(info.st_mode) == (stat.S_IRUSR | stat.S_IWUSR),
    139                   "%s is S_IRUSR | S_IWUSR" % (keyfile))
    140         chk.check(info.st_size == 48, "%s is 48 bytes" % (keyfile))
    141 
    142         # Sanity check the block file stat.
    143         info = os.stat(self.block)
    144         chk.check(stat.S_ISREG(info.st_mode),
    145                   "%s is regular file" % (self.block))
    146         chk.check(info.st_uid == 0, "%s is owned by root" % (self.block))
    147         chk.check(info.st_gid == 0, "%s has group root" % (self.block))
    148         chk.check(stat.S_IMODE(info.st_mode) == (stat.S_IRUSR | stat.S_IWUSR),
    149                   "%s is S_IRUSR | S_IWUSR" % (self.block))
    150         # Make sure block file is roughly a third of the size of the root
    151         # filesystem (within 5%).
    152         top = os.statvfs(self.stateful)
    153         backing_size = float(info.st_size)
    154         third = top.f_blocks * top.f_frsize * .3
    155         chk.check(backing_size > (third * .95)
    156                   and backing_size < (third * 1.05),
    157                   "%s is near %d bytes (was %d)" % (self.block, third,
    158                                                     info.st_size))
    159 
    160         # Wait for resize manager task to finish.
    161         utils.poll_for_condition(lambda: utils.system("pgrep mount-encrypted",
    162                                                       ignore_status=True) != 0,
    163                                  error.TestError('resizer still running'))
    164 
    165         # Verify filesystem is within 5% of the block file size.
    166         info = os.statvfs(self.encrypted)
    167         encrypted_size = float(info.f_frsize) * float(info.f_blocks)
    168         chk.check(encrypted_size / backing_size > 0.95,
    169                   "%s fs (%d) is nearly the backing device size (%d)" %
    170                   (self.encrypted, encrypted_size, backing_size))
    171         # Verify there is a reasonable number of inodes in the encrypted
    172         # filesystem (near 25% inodes-to-blocks ratio).
    173         inode_ratio = float(info.f_files) / float(info.f_blocks)
    174         chk.check(inode_ratio > 0.20 and inode_ratio < 0.30,
    175                   "%s has close to 25%% ratio of inodes-to-blocks (%.2f%%)" %
    176                   (self.encrypted, inode_ratio*100))
    177 
    178         # Raise non-fatal failures now, if they were encountered.
    179         chk.test_raise()
    180 
    181     # Wait for kernel background writing to finish.
    182     def _backing_stabilize(self):
    183         start = None
    184         size = 0
    185         while True:
    186             k = long(utils.system_output("du -sk %s" % (self.block),
    187                                          retain_output = True).split()[0])
    188             if start == None:
    189                 start = k
    190             if k == size:
    191                 # Backing file has remained the same size for 10 seconds.
    192                 # Assume the kernel is done with background initialization.
    193                 break
    194             time.sleep(10)
    195             utils.system("sync")
    196             size = k
    197         logging.info("%s stabilized at %dK (was %dK)" %
    198                      (self.block, size, start))
    199 
    200     # Check that the backing file reclaims space when filesystem contents
    201     # are deleted.
    202     def check_reclamation(self):
    203         # This test is sensitive to other things happening on the filesystem,
    204         # so we must wait for background initialization to finish first.
    205         self._backing_stabilize()
    206 
    207         megs = 200
    208         data = os.path.join(self.var, "check_reclamation")
    209         orig = os.statvfs(self.stateful)
    210 
    211         # 200M file added to encrypted filesystem.
    212         utils.system("dd if=/dev/zero of=%s bs=1M count=%s; sync" % (data,
    213                                                                      megs))
    214         # Wait for background allocations to finish.
    215         self._backing_stabilize()
    216         filled = os.statvfs(self.stateful)
    217 
    218         # 200M file removed from encrypted filesystem.
    219         utils.system("rm %s; sync" % (data))
    220         # Wait for background hole-punching to finish.
    221         self._backing_stabilize()
    222         done = os.statvfs(self.stateful)
    223 
    224         # Did the underlying filesystem grow by the size of the test file?
    225         file_blocks_used = float((megs * 1024 * 1024) / orig.f_frsize)
    226         fs_blocks_used = float(orig.f_bfree - filled.f_bfree)
    227         chk.check(file_blocks_used / fs_blocks_used > 0.95,
    228                   "%d file blocks account for most of %d fs blocks" %
    229                   (file_blocks_used, fs_blocks_used))
    230 
    231         # Did the underlying filesystem shrink on removal?
    232         fs_blocks_done = float(orig.f_bfree - done.f_bfree)
    233         chk.check(fs_blocks_done / file_blocks_used < 0.05,
    234                   "most of %d fs blocks reclaimed (%d fs blocks left over, "
    235                   "free: %d -> %d -> %d)" %
    236                   (fs_blocks_used, fs_blocks_done,
    237                    orig.f_bfree, filled.f_bfree, done.f_bfree))
    238 
    239         # Raise non-fatal failures now, if they were encountered.
    240         chk.test_raise()
    241 
    242 
    243 class platform_EncryptedStateful(test.test):
    244     version = 1
    245 
    246     def existing_partition(self):
    247         # Examine the existing encrypted partition.
    248         encstate = EncryptedStateful("/")
    249 
    250         # Perform post-mount sanity checks (and handle unfinalized devices).
    251         encstate.check_sizes(finalized=os.path.exists(encstate.key))
    252 
    253     def factory_key(self):
    254         # Create test root directory.
    255         encstate = EncryptedStateful()
    256 
    257         # Make sure we haven't run here before.
    258         chk.check(not os.path.exists(encstate.key),
    259                   "%s does not exist" % (encstate.key))
    260         chk.check(not os.path.exists(encstate.block),
    261                   "%s does not exist" % (encstate.block))
    262 
    263         # Mount a fresh encrypted stateful, with factory static key.
    264         encstate.mount("factory")
    265 
    266         # Perform post-mount sanity checks.
    267         encstate.check_sizes()
    268 
    269         # Check disk reclamation.
    270         encstate.check_reclamation()
    271 
    272         # Check explicit umount.
    273         encstate.umount()
    274 
    275     def no_tpm(self):
    276         encstate = EncryptedStateful()
    277 
    278         # Relocate the TPM device during mount.
    279         tpm = "/dev/tpm0"
    280         off = "%s.off" % (tpm)
    281         try:
    282             if os.path.exists(tpm):
    283                 utils.system("mv %s %s" % (tpm, off))
    284             # Mount without a TPM.
    285             encstate.mount()
    286         finally:
    287             if os.path.exists(off):
    288                 utils.system("mv %s %s" % (off, tpm))
    289 
    290         # Perform post-mount sanity checks.
    291         encstate.check_sizes(finalized=False)
    292 
    293     def run_once(self):
    294         # Do a no-write test of system's existing encrypted partition.
    295         self.existing_partition()
    296 
    297         # Do a no-write, no-TPM test with sanity checks.
    298         self.no_tpm()
    299 
    300         # There is no interactively controllable TPM mock yet for
    301         # mount-encrypted, so we can only test the static key currently.
    302         self.factory_key()
    303