Home | History | Annotate | Download | only in cros
      1 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import dbus, gobject, logging, os, random, re, shutil, string
      6 from dbus.mainloop.glib import DBusGMainLoop
      7 
      8 import common, constants
      9 from autotest_lib.client.bin import utils
     10 from autotest_lib.client.common_lib import error
     11 from autotest_lib.client.cros.cros_disks import DBusClient
     12 
# Absolute path of the cryptohome command-line client used by the helpers below.
CRYPTOHOME_CMD = '/usr/sbin/cryptohome'
# User name passed to the mount checks for the guest session.
GUEST_USER_NAME = '$guest'
# Output prefix that get_tpm_more_status() treats as "action not supported".
UNAVAILABLE_ACTION = 'Unknown action or no action given.'
     16 
     17 class ChromiumOSError(error.TestError):
     18     """Generic error for ChromiumOS-specific exceptions."""
     19     pass
     20 
     21 def __run_cmd(cmd):
     22     return utils.system_output(cmd + ' 2>&1', retain_output=True,
     23                                ignore_status=True).strip()
     24 
     25 def get_user_hash(user):
     26     """Get the user hash for the given user."""
     27     return utils.system_output(['cryptohome', '--action=obfuscate_user',
     28                                 '--user=%s' % user])
     29 
     30 
     31 def user_path(user):
     32     """Get the user mount point for the given user."""
     33     return utils.system_output(['cryptohome-path', 'user', user])
     34 
     35 
     36 def system_path(user):
     37     """Get the system mount point for the given user."""
     38     return utils.system_output(['cryptohome-path', 'system', user])
     39 
     40 
     41 def ensure_clean_cryptohome_for(user, password=None):
     42     """Ensure a fresh cryptohome exists for user.
     43 
     44     @param user: user who needs a shiny new cryptohome.
     45     @param password: if unset, a random password will be used.
     46     """
     47     if not password:
     48         password = ''.join(random.sample(string.ascii_lowercase, 6))
     49     remove_vault(user)
     50     mount_vault(user, password, create=True)
     51 
     52 
     53 def get_tpm_status():
     54     """Get the TPM status.
     55 
     56     Returns:
     57         A TPM status dictionary, for example:
     58         { 'Enabled': True,
     59           'Owned': True,
     60           'Being Owned': False,
     61           'Ready': True,
     62           'Password': ''
     63         }
     64     """
     65     out = __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_status')
     66     status = {}
     67     for field in ['Enabled', 'Owned', 'Being Owned', 'Ready']:
     68         match = re.search('TPM %s: (true|false)' % field, out)
     69         if not match:
     70             raise ChromiumOSError('Invalid TPM status: "%s".' % out)
     71         status[field] = match.group(1) == 'true'
     72     match = re.search('TPM Password: (\w*)', out)
     73     status['Password'] = ''
     74     if match:
     75         status['Password'] = match.group(1)
     76     return status
     77 
     78 
     79 def get_tpm_more_status():
     80     """Get more of the TPM status.
     81 
     82     Returns:
     83         A TPM more status dictionary, for example:
     84         { 'dictionary_attack_lockout_in_effect': False,
     85           'attestation_prepared': False,
     86           'boot_lockbox_finalized': False,
     87           'enabled': True,
     88           'owned': True,
     89           'owner_password': ''
     90           'dictionary_attack_counter': 0,
     91           'dictionary_attack_lockout_seconds_remaining': 0,
     92           'dictionary_attack_threshold': 10,
     93           'attestation_enrolled': False,
     94           'initialized': True,
     95           'verified_boot_measured': False,
     96           'install_lockbox_finalized': True
     97         }
     98         An empty dictionary is returned if the command is not supported.
     99     """
    100     status = {}
    101     out = __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_more_status | grep :')
    102     if out.startswith(UNAVAILABLE_ACTION):
    103         # --action=tpm_more_status only exists >= 41.
    104         logging.info('Method not supported!')
    105         return status
    106     for line in out.splitlines():
    107         items = line.strip().split(':')
    108         if items[1].strip() == 'false':
    109             value = False
    110         elif items[1].strip() == 'true':
    111             value = True
    112         elif items[1].strip().isdigit():
    113             value = int(items[1].strip())
    114         else:
    115             value = items[1].strip(' "')
    116         status[items[0]] = value
    117     return status
    118 
    119 
    120 def is_tpm_lockout_in_effect():
    121     """Returns true if the TPM lockout is in effect; false otherwise."""
    122     status = get_tpm_more_status()
    123     return status.get('dictionary_attack_lockout_in_effect', None)
    124 
    125 
    126 def get_login_status():
    127     """Query the login status
    128 
    129     Returns:
    130         A login status dictionary containing:
    131         { 'owner_user_exists': True|False,
    132           'boot_lockbox_finalized': True|False
    133         }
    134     """
    135     out = __run_cmd(CRYPTOHOME_CMD + ' --action=get_login_status')
    136     status = {}
    137     for field in ['owner_user_exists', 'boot_lockbox_finalized']:
    138         match = re.search('%s: (true|false)' % field, out)
    139         if not match:
    140             raise ChromiumOSError('Invalid login status: "%s".' % out)
    141         status[field] = match.group(1) == 'true'
    142     return status
    143 
    144 
    145 def get_tpm_attestation_status():
    146     """Get the TPM attestation status.  Works similar to get_tpm_status().
    147     """
    148     out = __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_attestation_status')
    149     status = {}
    150     for field in ['Prepared', 'Enrolled']:
    151         match = re.search('Attestation %s: (true|false)' % field, out)
    152         if not match:
    153             raise ChromiumOSError('Invalid attestation status: "%s".' % out)
    154         status[field] = match.group(1) == 'true'
    155     return status
    156 
    157 
    158 def take_tpm_ownership():
    159     """Take TPM owernship.
    160 
    161     Blocks until TPM is owned.
    162     """
    163     __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_take_ownership')
    164     __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_wait_ownership')
    165 
    166 
    167 def verify_ek():
    168     """Verify the TPM endorsement key.
    169 
    170     Returns true if EK is valid.
    171     """
    172     cmd = CRYPTOHOME_CMD + ' --action=tpm_verify_ek'
    173     return (utils.system(cmd, ignore_status=True) == 0)
    174 
    175 
    176 def remove_vault(user):
    177     """Remove the given user's vault from the shadow directory."""
    178     logging.debug('user is %s', user)
    179     user_hash = get_user_hash(user)
    180     logging.debug('Removing vault for user %s with hash %s' % (user, user_hash))
    181     cmd = CRYPTOHOME_CMD + ' --action=remove --force --user=%s' % user
    182     __run_cmd(cmd)
    183     # Ensure that the vault does not exist.
    184     if os.path.exists(os.path.join(constants.SHADOW_ROOT, user_hash)):
    185         raise ChromiumOSError('Cryptohome could not remove the user\'s vault.')
    186 
    187 
    188 def remove_all_vaults():
    189     """Remove any existing vaults from the shadow directory.
    190 
    191     This function must be run with root privileges.
    192     """
    193     for item in os.listdir(constants.SHADOW_ROOT):
    194         abs_item = os.path.join(constants.SHADOW_ROOT, item)
    195         if os.path.isdir(os.path.join(abs_item, 'vault')):
    196             logging.debug('Removing vault for user with hash %s' % item)
    197             shutil.rmtree(abs_item)
    198 
    199 
    200 def mount_vault(user, password, create=False):
    201     """Mount the given user's vault."""
    202     args = [CRYPTOHOME_CMD, '--action=mount', '--user=%s' % user,
    203             '--password=%s' % password, '--async']
    204     if create:
    205         args.append('--create')
    206     logging.info(__run_cmd(' '.join(args)))
    207     # Ensure that the vault exists in the shadow directory.
    208     user_hash = get_user_hash(user)
    209     if not os.path.exists(os.path.join(constants.SHADOW_ROOT, user_hash)):
    210         raise ChromiumOSError('Cryptohome vault not found after mount.')
    211     # Ensure that the vault is mounted.
    212     if not is_vault_mounted(
    213             user=user,
    214             device_regex=constants.CRYPTOHOME_DEV_REGEX_REGULAR_USER,
    215             allow_fail=True):
    216         raise ChromiumOSError('Cryptohome created a vault but did not mount.')
    217 
    218 
    219 def mount_guest():
    220     """Mount the given user's vault."""
    221     args = [CRYPTOHOME_CMD, '--action=mount_guest', '--async']
    222     logging.info(__run_cmd(' '.join(args)))
    223     # Ensure that the guest tmpfs is mounted.
    224     if not is_guest_vault_mounted(allow_fail=True):
    225         raise ChromiumOSError('Cryptohome did not mount tmpfs.')
    226 
    227 
    228 def test_auth(user, password):
    229     cmd = [CRYPTOHOME_CMD, '--action=test_auth', '--user=%s' % user,
    230            '--password=%s' % password, '--async']
    231     return 'Authentication succeeded' in utils.system_output(cmd)
    232 
    233 
    234 def unmount_vault(user):
    235     """Unmount the given user's vault.
    236 
    237     Once unmounting for a specific user is supported, the user parameter will
    238     name the target user. See crosbug.com/20778.
    239     """
    240     __run_cmd(CRYPTOHOME_CMD + ' --action=unmount')
    241     # Ensure that the vault is not mounted.
    242     if is_vault_mounted(user, allow_fail=True):
    243         raise ChromiumOSError('Cryptohome did not unmount the user.')
    244 
    245 
    246 def __get_mount_info(mount_point, allow_fail=False):
    247     """Get information about the active mount at a given mount point."""
    248     cryptohomed_path = '/proc/$(pgrep cryptohomed)/mounts'
    249     try:
    250         logging.info(utils.system_output('cat %s' % cryptohomed_path))
    251         mount_line = utils.system_output(
    252             'grep %s %s' % (mount_point, cryptohomed_path),
    253             ignore_status=allow_fail)
    254     except Exception as e:
    255         logging.error(e)
    256         raise ChromiumOSError('Could not get info about cryptohome vault '
    257                               'through %s. See logs for complete mount-point.'
    258                               % os.path.dirname(str(mount_point)))
    259     return mount_line.split()
    260 
    261 
    262 def __get_user_mount_info(user, allow_fail=False):
    263     """Get information about the active mounts for a given user.
    264 
    265     Returns the active mounts at the user's user and system mount points. If no
    266     user is given, the active mount at the shared mount point is returned
    267     (regular users have a bind-mount at this mount point for backwards
    268     compatibility; the guest user has a mount at this mount point only).
    269     """
    270     return [__get_mount_info(mount_point=user_path(user),
    271                              allow_fail=allow_fail),
    272             __get_mount_info(mount_point=system_path(user),
    273                              allow_fail=allow_fail)]
    274 
    275 def is_vault_mounted(
    276         user,
    277         device_regex=constants.CRYPTOHOME_DEV_REGEX_ANY,
    278         fs_regex=constants.CRYPTOHOME_FS_REGEX_ANY,
    279         allow_fail=False):
    280     """Check whether a vault is mounted for the given user.
    281 
    282     If no user is given, the shared mount point is checked, determining whether
    283     a vault is mounted for any user.
    284     """
    285     user_mount_info = __get_user_mount_info(user=user, allow_fail=allow_fail)
    286     for mount_info in user_mount_info:
    287         if (len(mount_info) < 3 or
    288                 not re.match(device_regex, mount_info[0]) or
    289                 not re.match(fs_regex, mount_info[2])):
    290             return False
    291     return True
    292 
    293 
    294 def is_guest_vault_mounted(allow_fail=False):
    295     """Check whether a vault backed by tmpfs is mounted for the guest user."""
    296     return is_vault_mounted(
    297         user=GUEST_USER_NAME,
    298         device_regex=constants.CRYPTOHOME_DEV_REGEX_GUEST,
    299         fs_regex=constants.CRYPTOHOME_FS_REGEX_TMPFS,
    300         allow_fail=allow_fail)
    301 
    302 
    303 def get_mounted_vault_devices(user, allow_fail=False):
    304     """Get the device(s) backing the vault mounted for the given user.
    305 
    306     Returns the devices mounted at the user's user and system mount points. If
    307     no user is given, the device mounted at the shared mount point is returned.
    308     """
    309     return [mount_info[0]
    310             for mount_info
    311             in __get_user_mount_info(user=user, allow_fail=allow_fail)
    312             if len(mount_info)]
    313 
    314 
    315 def canonicalize(credential):
    316     """Perform basic canonicalization of |email_address|.
    317 
    318     Perform basic canonicalization of |email_address|, taking into account that
    319     gmail does not consider '.' or caps inside a username to matter. It also
    320     ignores everything after a '+'. For example,
    321     c.masone+abc (at] gmail.com == cMaSone (at] gmail.com, per
    322     http://mail.google.com/support/bin/answer.py?hl=en&ctx=mail&answer=10313
    323     """
    324     if not credential:
    325       return None
    326 
    327     parts = credential.split('@')
    328     if len(parts) != 2:
    329         raise error.TestError('Malformed email: ' + credential)
    330 
    331     (name, domain) = parts
    332     name = name.partition('+')[0]
    333     if (domain == constants.SPECIAL_CASE_DOMAIN):
    334         name = name.replace('.', '')
    335     return '@'.join([name, domain]).lower()
    336 
    337 
    338 def crash_cryptohomed():
    339     # Try to kill cryptohomed so we get something to work with.
    340     pid = __run_cmd('pgrep cryptohomed')
    341     try:
    342         pid = int(pid)
    343     except ValueError, e:  # empty or invalid string
    344         raise error.TestError('Cryptohomed was not running')
    345     utils.system('kill -ABRT %d' % pid)
    346     # CONT just in case cryptohomed had a spurious STOP.
    347     utils.system('kill -CONT %d' % pid)
    348     utils.poll_for_condition(
    349         lambda: utils.system('ps -p %d' % pid,
    350                              ignore_status=True) != 0,
    351             timeout=180,
    352             exception=error.TestError(
    353                 'Timeout waiting for cryptohomed to coredump'))
    354 
    355 
    356 class CryptohomeProxy(DBusClient):
    357     """A DBus proxy client for testing the Cryptohome DBus server.
    358     """
    359     CRYPTOHOME_BUS_NAME = 'org.chromium.Cryptohome'
    360     CRYPTOHOME_OBJECT_PATH = '/org/chromium/Cryptohome'
    361     CRYPTOHOME_INTERFACE = 'org.chromium.CryptohomeInterface'
    362     ASYNC_CALL_STATUS_SIGNAL = 'AsyncCallStatus'
    363     ASYNC_CALL_STATUS_SIGNAL_ARGUMENTS = (
    364         'async_id', 'return_status', 'return_code'
    365     )
    366     DBUS_PROPERTIES_INTERFACE = 'org.freedesktop.DBus.Properties'
    367 
    368 
    369     def __init__(self, bus_loop=None):
    370         self.main_loop = gobject.MainLoop()
    371         if bus_loop is None:
    372             bus_loop = DBusGMainLoop(set_as_default=True)
    373         self.bus = dbus.SystemBus(mainloop=bus_loop)
    374         super(CryptohomeProxy, self).__init__(self.main_loop, self.bus,
    375                                               self.CRYPTOHOME_BUS_NAME,
    376                                               self.CRYPTOHOME_OBJECT_PATH)
    377         self.iface = dbus.Interface(self.proxy_object,
    378                                     self.CRYPTOHOME_INTERFACE)
    379         self.properties = dbus.Interface(self.proxy_object,
    380                                          self.DBUS_PROPERTIES_INTERFACE)
    381         self.handle_signal(self.CRYPTOHOME_INTERFACE,
    382                            self.ASYNC_CALL_STATUS_SIGNAL,
    383                            self.ASYNC_CALL_STATUS_SIGNAL_ARGUMENTS)
    384 
    385 
    386     # Wrap all proxied calls to catch cryptohomed failures.
    387     def __call(self, method, *args):
    388         try:
    389             return method(*args, timeout=180)
    390         except dbus.exceptions.DBusException, e:
    391             if e.get_dbus_name() == 'org.freedesktop.DBus.Error.NoReply':
    392                 logging.error('Cryptohome is not responding. Sending ABRT')
    393                 crash_cryptohomed()
    394                 raise ChromiumOSError('cryptohomed aborted. Check crashes!')
    395             raise e
    396 
    397 
    398     def __wait_for_specific_signal(self, signal, data):
    399       """Wait for the |signal| with matching |data|
    400          Returns the resulting dict on success or {} on error.
    401       """
    402       # Do not bubble up the timeout here, just return {}.
    403       result = {}
    404       try:
    405           result = self.wait_for_signal(signal)
    406       except utils.TimeoutError:
    407           return {}
    408       for k in data.keys():
    409           if not result.has_key(k) or result[k] != data[k]:
    410             return {}
    411       return result
    412 
    413 
    414     # Perform a data-less async call.
    415     # TODO(wad) Add __async_data_call.
    416     def __async_call(self, method, *args):
    417         # Clear out any superfluous async call signals.
    418         self.clear_signal_content(self.ASYNC_CALL_STATUS_SIGNAL)
    419         out = self.__call(method, *args)
    420         logging.debug('Issued call ' + str(method) +
    421                       ' with async_id ' + str(out))
    422         result = {}
    423         try:
    424             # __wait_for_specific_signal has a 10s timeout
    425             result = utils.poll_for_condition(
    426                 lambda: self.__wait_for_specific_signal(
    427                     self.ASYNC_CALL_STATUS_SIGNAL, {'async_id' : out}),
    428                 timeout=180,
    429                 desc='matching %s signal' % self.ASYNC_CALL_STATUS_SIGNAL)
    430         except utils.TimeoutError, e:
    431             logging.error('Cryptohome timed out. Sending ABRT.')
    432             crash_cryptohomed()
    433             raise ChromiumOSError('cryptohomed aborted. Check crashes!')
    434         return result
    435 
    436 
    437     def mount(self, user, password, create=False, async=True):
    438         """Mounts a cryptohome.
    439 
    440         Returns True if the mount succeeds or False otherwise.
    441         TODO(ellyjones): Migrate mount_vault() to use a multi-user-safe
    442         heuristic, then remove this method. See <crosbug.com/20778>.
    443         """
    444         if async:
    445             return self.__async_call(self.iface.AsyncMount, user, password,
    446                                      create, False, [])['return_status']
    447         out = self.__call(self.iface.Mount, user, password, create, False, [])
    448         # Sync returns (return code, return status)
    449         return out[1] if len(out) > 1 else False
    450 
    451 
    452     def unmount(self, user):
    453         """Unmounts a cryptohome.
    454 
    455         Returns True if the unmount suceeds or false otherwise.
    456         TODO(ellyjones): Once there's a per-user unmount method, use it. See
    457         <crosbug.com/20778>.
    458         """
    459         return self.__call(self.iface.Unmount)
    460 
    461 
    462     def is_mounted(self, user):
    463         """Tests whether a user's cryptohome is mounted."""
    464         return (utils.is_mountpoint(user_path(user))
    465                 and utils.is_mountpoint(system_path(user)))
    466 
    467 
    468     def require_mounted(self, user):
    469         """Raises a test failure if a user's cryptohome is not mounted."""
    470         utils.require_mountpoint(user_path(user))
    471         utils.require_mountpoint(system_path(user))
    472 
    473 
    474     def migrate(self, user, oldkey, newkey, async=True):
    475         """Migrates the specified user's cryptohome from one key to another."""
    476         if async:
    477             return self.__async_call(self.iface.AsyncMigrateKey,
    478                                      user, oldkey, newkey)['return_status']
    479         return self.__call(self.iface.MigrateKey, user, oldkey, newkey)
    480 
    481 
    482     def remove(self, user, async=True):
    483         if async:
    484             return self.__async_call(self.iface.AsyncRemove,
    485                                      user)['return_status']
    486         return self.__call(self.iface.Remove, user)
    487 
    488 
    489     def ensure_clean_cryptohome_for(self, user, password=None):
    490         """Ensure a fresh cryptohome exists for user.
    491 
    492         @param user: user who needs a shiny new cryptohome.
    493         @param password: if unset, a random password will be used.
    494         """
    495         if not password:
    496             password = ''.join(random.sample(string.ascii_lowercase, 6))
    497         self.remove(user)
    498         self.mount(user, password, create=True)
    499