Home | History | Annotate | Download | only in cros
      1 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import dbus, gobject, logging, os, random, re, shutil, string, time
      6 from dbus.mainloop.glib import DBusGMainLoop
      7 
      8 import common, constants
      9 from autotest_lib.client.bin import utils
     10 from autotest_lib.client.common_lib import error
     11 from autotest_lib.client.cros.cros_disks import DBusClient
     12 
     13 CRYPTOHOME_CMD = '/usr/sbin/cryptohome'
     14 GUEST_USER_NAME = '$guest'
     15 UNAVAILABLE_ACTION = 'Unknown action or no action given.'
     16 MOUNT_RETRY_COUNT = 20
     17 
     18 class ChromiumOSError(error.TestError):
     19     """Generic error for ChromiumOS-specific exceptions."""
     20     pass
     21 
     22 def __run_cmd(cmd):
     23     return utils.system_output(cmd + ' 2>&1', retain_output=True,
     24                                ignore_status=True).strip()
     25 
     26 def get_user_hash(user):
     27     """Get the user hash for the given user."""
     28     return utils.system_output(['cryptohome', '--action=obfuscate_user',
     29                                 '--user=%s' % user])
     30 
     31 
     32 def user_path(user):
     33     """Get the user mount point for the given user."""
     34     return utils.system_output(['cryptohome-path', 'user', user])
     35 
     36 
     37 def system_path(user):
     38     """Get the system mount point for the given user."""
     39     return utils.system_output(['cryptohome-path', 'system', user])
     40 
     41 
     42 def ensure_clean_cryptohome_for(user, password=None):
     43     """Ensure a fresh cryptohome exists for user.
     44 
     45     @param user: user who needs a shiny new cryptohome.
     46     @param password: if unset, a random password will be used.
     47     """
     48     if not password:
     49         password = ''.join(random.sample(string.ascii_lowercase, 6))
     50     remove_vault(user)
     51     mount_vault(user, password, create=True)
     52 
     53 
     54 def get_tpm_status():
     55     """Get the TPM status.
     56 
     57     Returns:
     58         A TPM status dictionary, for example:
     59         { 'Enabled': True,
     60           'Owned': True,
     61           'Being Owned': False,
     62           'Ready': True,
     63           'Password': ''
     64         }
     65     """
     66     out = __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_status')
     67     status = {}
     68     for field in ['Enabled', 'Owned', 'Being Owned', 'Ready']:
     69         match = re.search('TPM %s: (true|false)' % field, out)
     70         if not match:
     71             raise ChromiumOSError('Invalid TPM status: "%s".' % out)
     72         status[field] = match.group(1) == 'true'
     73     match = re.search('TPM Password: (\w*)', out)
     74     status['Password'] = ''
     75     if match:
     76         status['Password'] = match.group(1)
     77     return status
     78 
     79 
     80 def get_tpm_more_status():
     81     """Get more of the TPM status.
     82 
     83     Returns:
     84         A TPM more status dictionary, for example:
     85         { 'dictionary_attack_lockout_in_effect': False,
     86           'attestation_prepared': False,
     87           'boot_lockbox_finalized': False,
     88           'enabled': True,
     89           'owned': True,
     90           'owner_password': ''
     91           'dictionary_attack_counter': 0,
     92           'dictionary_attack_lockout_seconds_remaining': 0,
     93           'dictionary_attack_threshold': 10,
     94           'attestation_enrolled': False,
     95           'initialized': True,
     96           'verified_boot_measured': False,
     97           'install_lockbox_finalized': True
     98         }
     99         An empty dictionary is returned if the command is not supported.
    100     """
    101     status = {}
    102     out = __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_more_status | grep :')
    103     if out.startswith(UNAVAILABLE_ACTION):
    104         # --action=tpm_more_status only exists >= 41.
    105         logging.info('Method not supported!')
    106         return status
    107     for line in out.splitlines():
    108         items = line.strip().split(':')
    109         if items[1].strip() == 'false':
    110             value = False
    111         elif items[1].strip() == 'true':
    112             value = True
    113         elif items[1].strip().isdigit():
    114             value = int(items[1].strip())
    115         else:
    116             value = items[1].strip(' "')
    117         status[items[0]] = value
    118     return status
    119 
    120 
    121 def is_tpm_lockout_in_effect():
    122     """Returns true if the TPM lockout is in effect; false otherwise."""
    123     status = get_tpm_more_status()
    124     return status.get('dictionary_attack_lockout_in_effect', None)
    125 
    126 
    127 def get_login_status():
    128     """Query the login status
    129 
    130     Returns:
    131         A login status dictionary containing:
    132         { 'owner_user_exists': True|False,
    133           'boot_lockbox_finalized': True|False
    134         }
    135     """
    136     out = __run_cmd(CRYPTOHOME_CMD + ' --action=get_login_status')
    137     status = {}
    138     for field in ['owner_user_exists', 'boot_lockbox_finalized']:
    139         match = re.search('%s: (true|false)' % field, out)
    140         if not match:
    141             raise ChromiumOSError('Invalid login status: "%s".' % out)
    142         status[field] = match.group(1) == 'true'
    143     return status
    144 
    145 
    146 def get_tpm_attestation_status():
    147     """Get the TPM attestation status.  Works similar to get_tpm_status().
    148     """
    149     out = __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_attestation_status')
    150     status = {}
    151     for field in ['Prepared', 'Enrolled']:
    152         match = re.search('Attestation %s: (true|false)' % field, out)
    153         if not match:
    154             raise ChromiumOSError('Invalid attestation status: "%s".' % out)
    155         status[field] = match.group(1) == 'true'
    156     return status
    157 
    158 
    159 def take_tpm_ownership():
    160     """Take TPM owernship.
    161 
    162     Blocks until TPM is owned.
    163     """
    164     __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_take_ownership')
    165     __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_wait_ownership')
    166 
    167 
    168 def verify_ek():
    169     """Verify the TPM endorsement key.
    170 
    171     Returns true if EK is valid.
    172     """
    173     cmd = CRYPTOHOME_CMD + ' --action=tpm_verify_ek'
    174     return (utils.system(cmd, ignore_status=True) == 0)
    175 
    176 
    177 def remove_vault(user):
    178     """Remove the given user's vault from the shadow directory."""
    179     logging.debug('user is %s', user)
    180     user_hash = get_user_hash(user)
    181     logging.debug('Removing vault for user %s with hash %s', user, user_hash)
    182     cmd = CRYPTOHOME_CMD + ' --action=remove --force --user=%s' % user
    183     __run_cmd(cmd)
    184     # Ensure that the vault does not exist.
    185     if os.path.exists(os.path.join(constants.SHADOW_ROOT, user_hash)):
    186         raise ChromiumOSError('Cryptohome could not remove the user\'s vault.')
    187 
    188 
    189 def remove_all_vaults():
    190     """Remove any existing vaults from the shadow directory.
    191 
    192     This function must be run with root privileges.
    193     """
    194     for item in os.listdir(constants.SHADOW_ROOT):
    195         abs_item = os.path.join(constants.SHADOW_ROOT, item)
    196         if os.path.isdir(os.path.join(abs_item, 'vault')):
    197             logging.debug('Removing vault for user with hash %s', item)
    198             shutil.rmtree(abs_item)
    199 
    200 
    201 def mount_vault(user, password, create=False):
    202     """Mount the given user's vault."""
    203     args = [CRYPTOHOME_CMD, '--action=mount', '--user=%s' % user,
    204             '--password=%s' % password, '--async']
    205     if create:
    206         args.append('--create')
    207     logging.info(__run_cmd(' '.join(args)))
    208     # Ensure that the vault exists in the shadow directory.
    209     user_hash = get_user_hash(user)
    210     if not os.path.exists(os.path.join(constants.SHADOW_ROOT, user_hash)):
    211         retry = 0
    212         mounted = False
    213         while retry < MOUNT_RETRY_COUNT and not mounted:
    214             time.sleep(1)
    215             logging.info("Retry " + str(retry + 1))
    216             __run_cmd(' '.join(args))
    217             # TODO: Remove this additional call to get_user_hash(user) when
    218             # crbug.com/690994 is fixed
    219             user_hash = get_user_hash(user)
    220             if os.path.exists(os.path.join(constants.SHADOW_ROOT, user_hash)):
    221                 mounted = True
    222             retry += 1
    223         if not mounted:
    224             raise ChromiumOSError('Cryptohome vault not found after mount.')
    225     # Ensure that the vault is mounted.
    226     if not is_permanent_vault_mounted(user=user, allow_fail=True):
    227         raise ChromiumOSError('Cryptohome created a vault but did not mount.')
    228 
    229 
    230 def mount_guest():
    231     """Mount the given user's vault."""
    232     args = [CRYPTOHOME_CMD, '--action=mount_guest', '--async']
    233     logging.info(__run_cmd(' '.join(args)))
    234     # Ensure that the guest tmpfs is mounted.
    235     if not is_guest_vault_mounted(allow_fail=True):
    236         raise ChromiumOSError('Cryptohome did not mount tmpfs.')
    237 
    238 
    239 def test_auth(user, password):
    240     cmd = [CRYPTOHOME_CMD, '--action=test_auth', '--user=%s' % user,
    241            '--password=%s' % password, '--async']
    242     return 'Authentication succeeded' in utils.system_output(cmd)
    243 
    244 
    245 def unmount_vault(user):
    246     """Unmount the given user's vault.
    247 
    248     Once unmounting for a specific user is supported, the user parameter will
    249     name the target user. See crosbug.com/20778.
    250     """
    251     __run_cmd(CRYPTOHOME_CMD + ' --action=unmount')
    252     # Ensure that the vault is not mounted.
    253     if is_vault_mounted(user, allow_fail=True):
    254         raise ChromiumOSError('Cryptohome did not unmount the user.')
    255 
    256 
    257 def __get_mount_info(mount_point, allow_fail=False):
    258     """Get information about the active mount at a given mount point."""
    259     cryptohomed_path = '/proc/$(pgrep cryptohomed)/mounts'
    260     try:
    261         logging.debug("Active cryptohome mounts:\n" +
    262                       utils.system_output('cat %s' % cryptohomed_path))
    263         mount_line = utils.system_output(
    264             'grep %s %s' % (mount_point, cryptohomed_path),
    265             ignore_status=allow_fail)
    266     except Exception as e:
    267         logging.error(e)
    268         raise ChromiumOSError('Could not get info about cryptohome vault '
    269                               'through %s. See logs for complete mount-point.'
    270                               % os.path.dirname(str(mount_point)))
    271     return mount_line.split()
    272 
    273 
    274 def __get_user_mount_info(user, allow_fail=False):
    275     """Get information about the active mounts for a given user.
    276 
    277     Returns the active mounts at the user's user and system mount points. If no
    278     user is given, the active mount at the shared mount point is returned
    279     (regular users have a bind-mount at this mount point for backwards
    280     compatibility; the guest user has a mount at this mount point only).
    281     """
    282     return [__get_mount_info(mount_point=user_path(user),
    283                              allow_fail=allow_fail),
    284             __get_mount_info(mount_point=system_path(user),
    285                              allow_fail=allow_fail)]
    286 
    287 def is_vault_mounted(user, regexes=None, allow_fail=False):
    288     """Check whether a vault is mounted for the given user.
    289 
    290     user: If no user is given, the shared mount point is checked, determining
    291       whether a vault is mounted for any user.
    292     regexes: dictionary of regexes to matches against the mount information.
    293       The mount filesystem for the user's user and system mounts point must
    294       match one of the keys.
    295       The mount source point must match the selected device regex.
    296 
    297     In addition, if mounted over ext4, we check the directory is encrypted.
    298     """
    299     if regexes is None:
    300         regexes = {
    301             constants.CRYPTOHOME_FS_REGEX_ANY :
    302                constants.CRYPTOHOME_DEV_REGEX_ANY
    303         }
    304     user_mount_info = __get_user_mount_info(user=user, allow_fail=allow_fail)
    305     for mount_info in user_mount_info:
    306         # Look at each /proc/../mount lines that match mount point for a given
    307         # user user/system mount (/home/user/.... /home/root/...)
    308 
    309         # We should have at least 3 arguments (source, mount, type of mount)
    310         if len(mount_info) < 3:
    311             return False
    312 
    313         device_regex = None
    314         for fs_regex in regexes.keys():
    315             if re.match(fs_regex, mount_info[2]):
    316                 device_regex = regexes[fs_regex]
    317                 break
    318 
    319         if not device_regex:
    320             # The thrid argument in not the expectd mount point type.
    321             return False
    322 
    323         # Check if the mount source match the device regex: it can be loose,
    324         # (anything) or stricter if we expect guest filesystem.
    325         if not re.match(device_regex, mount_info[0]):
    326             return False
    327 
    328         if re.match(constants.CRYPTOHOME_FS_REGEX_EXT4, mount_info[2]):
    329             # We are using ext4 crypto. Check there is an encryption key for
    330             # that directory.
    331             find_key_cmd_list = ['e4crypt  get_policy %s' % (mount_info[1]),
    332                                  'cut -d \' \' -f 2']
    333             key = __run_cmd(' | ' .join(find_key_cmd_list))
    334             cmd_list = ['keyctl show @s',
    335                         'grep %s' % (key),
    336                         'wc -l']
    337             out = __run_cmd(' | '.join(cmd_list))
    338             if int(out) != 1:
    339                 return False
    340     return True
    341 
    342 
    343 def is_guest_vault_mounted(allow_fail=False):
    344     """Check whether a vault backed by tmpfs is mounted for the guest user."""
    345     return is_vault_mounted(
    346         user=GUEST_USER_NAME,
    347         regexes={
    348             constants.CRYPTOHOME_FS_REGEX_TMPFS :
    349                 constants.CRYPTOHOME_DEV_REGEX_GUEST,
    350         },
    351         allow_fail=allow_fail)
    352 
    353 def is_permanent_vault_mounted(user, allow_fail=False):
    354     """Check if user is mounted over ecryptfs or ext4 crypto. """
    355     return is_vault_mounted(
    356         user=user,
    357         regexes={
    358             constants.CRYPTOHOME_FS_REGEX_ECRYPTFS :
    359                 constants.CRYPTOHOME_DEV_REGEX_REGULAR_USER_SHADOW,
    360             constants.CRYPTOHOME_FS_REGEX_EXT4 :
    361                 constants.CRYPTOHOME_DEV_REGEX_REGULAR_USER_DEVICE,
    362         },
    363         allow_fail=allow_fail)
    364 
    365 def get_mounted_vault_path(user, allow_fail=False):
    366     """Get the path where the decrypted data for the user is located."""
    367     return os.path.join(constants.SHADOW_ROOT, get_user_hash(user), 'mount')
    368 
    369 
    370 def canonicalize(credential):
    371     """Perform basic canonicalization of |email_address|.
    372 
    373     Perform basic canonicalization of |email_address|, taking into account that
    374     gmail does not consider '.' or caps inside a username to matter. It also
    375     ignores everything after a '+'. For example,
    376     c.masone+abc (at] gmail.com == cMaSone (at] gmail.com, per
    377     http://mail.google.com/support/bin/answer.py?hl=en&ctx=mail&answer=10313
    378     """
    379     if not credential:
    380       return None
    381 
    382     parts = credential.split('@')
    383     if len(parts) != 2:
    384         raise error.TestError('Malformed email: ' + credential)
    385 
    386     (name, domain) = parts
    387     name = name.partition('+')[0]
    388     if (domain == constants.SPECIAL_CASE_DOMAIN):
    389         name = name.replace('.', '')
    390     return '@'.join([name, domain]).lower()
    391 
    392 
    393 def crash_cryptohomed():
    394     # Try to kill cryptohomed so we get something to work with.
    395     pid = __run_cmd('pgrep cryptohomed')
    396     try:
    397         pid = int(pid)
    398     except ValueError, e:  # empty or invalid string
    399         raise error.TestError('Cryptohomed was not running')
    400     utils.system('kill -ABRT %d' % pid)
    401     # CONT just in case cryptohomed had a spurious STOP.
    402     utils.system('kill -CONT %d' % pid)
    403     utils.poll_for_condition(
    404         lambda: utils.system('ps -p %d' % pid,
    405                              ignore_status=True) != 0,
    406             timeout=180,
    407             exception=error.TestError(
    408                 'Timeout waiting for cryptohomed to coredump'))
    409 
    410 
    411 class CryptohomeProxy(DBusClient):
    412     """A DBus proxy client for testing the Cryptohome DBus server.
    413     """
    414     CRYPTOHOME_BUS_NAME = 'org.chromium.Cryptohome'
    415     CRYPTOHOME_OBJECT_PATH = '/org/chromium/Cryptohome'
    416     CRYPTOHOME_INTERFACE = 'org.chromium.CryptohomeInterface'
    417     ASYNC_CALL_STATUS_SIGNAL = 'AsyncCallStatus'
    418     ASYNC_CALL_STATUS_SIGNAL_ARGUMENTS = (
    419         'async_id', 'return_status', 'return_code'
    420     )
    421     DBUS_PROPERTIES_INTERFACE = 'org.freedesktop.DBus.Properties'
    422 
    423 
    424     def __init__(self, bus_loop=None):
    425         self.main_loop = gobject.MainLoop()
    426         if bus_loop is None:
    427             bus_loop = DBusGMainLoop(set_as_default=True)
    428         self.bus = dbus.SystemBus(mainloop=bus_loop)
    429         super(CryptohomeProxy, self).__init__(self.main_loop, self.bus,
    430                                               self.CRYPTOHOME_BUS_NAME,
    431                                               self.CRYPTOHOME_OBJECT_PATH)
    432         self.iface = dbus.Interface(self.proxy_object,
    433                                     self.CRYPTOHOME_INTERFACE)
    434         self.properties = dbus.Interface(self.proxy_object,
    435                                          self.DBUS_PROPERTIES_INTERFACE)
    436         self.handle_signal(self.CRYPTOHOME_INTERFACE,
    437                            self.ASYNC_CALL_STATUS_SIGNAL,
    438                            self.ASYNC_CALL_STATUS_SIGNAL_ARGUMENTS)
    439 
    440 
    441     # Wrap all proxied calls to catch cryptohomed failures.
    442     def __call(self, method, *args):
    443         try:
    444             return method(*args, timeout=180)
    445         except dbus.exceptions.DBusException, e:
    446             if e.get_dbus_name() == 'org.freedesktop.DBus.Error.NoReply':
    447                 logging.error('Cryptohome is not responding. Sending ABRT')
    448                 crash_cryptohomed()
    449                 raise ChromiumOSError('cryptohomed aborted. Check crashes!')
    450             raise e
    451 
    452 
    453     def __wait_for_specific_signal(self, signal, data):
    454       """Wait for the |signal| with matching |data|
    455          Returns the resulting dict on success or {} on error.
    456       """
    457       # Do not bubble up the timeout here, just return {}.
    458       result = {}
    459       try:
    460           result = self.wait_for_signal(signal)
    461       except utils.TimeoutError:
    462           return {}
    463       for k in data.keys():
    464           if not result.has_key(k) or result[k] != data[k]:
    465             return {}
    466       return result
    467 
    468 
    469     # Perform a data-less async call.
    470     # TODO(wad) Add __async_data_call.
    471     def __async_call(self, method, *args):
    472         # Clear out any superfluous async call signals.
    473         self.clear_signal_content(self.ASYNC_CALL_STATUS_SIGNAL)
    474         out = self.__call(method, *args)
    475         logging.debug('Issued call ' + str(method) +
    476                       ' with async_id ' + str(out))
    477         result = {}
    478         try:
    479             # __wait_for_specific_signal has a 10s timeout
    480             result = utils.poll_for_condition(
    481                 lambda: self.__wait_for_specific_signal(
    482                     self.ASYNC_CALL_STATUS_SIGNAL, {'async_id' : out}),
    483                 timeout=180,
    484                 desc='matching %s signal' % self.ASYNC_CALL_STATUS_SIGNAL)
    485         except utils.TimeoutError, e:
    486             logging.error('Cryptohome timed out. Sending ABRT.')
    487             crash_cryptohomed()
    488             raise ChromiumOSError('cryptohomed aborted. Check crashes!')
    489         return result
    490 
    491 
    492     def mount(self, user, password, create=False, async=True):
    493         """Mounts a cryptohome.
    494 
    495         Returns True if the mount succeeds or False otherwise.
    496         TODO(ellyjones): Migrate mount_vault() to use a multi-user-safe
    497         heuristic, then remove this method. See <crosbug.com/20778>.
    498         """
    499         if async:
    500             return self.__async_call(self.iface.AsyncMount, user, password,
    501                                      create, False, [])['return_status']
    502         out = self.__call(self.iface.Mount, user, password, create, False, [])
    503         # Sync returns (return code, return status)
    504         return out[1] if len(out) > 1 else False
    505 
    506 
    507     def unmount(self, user):
    508         """Unmounts a cryptohome.
    509 
    510         Returns True if the unmount suceeds or false otherwise.
    511         TODO(ellyjones): Once there's a per-user unmount method, use it. See
    512         <crosbug.com/20778>.
    513         """
    514         return self.__call(self.iface.Unmount)
    515 
    516 
    517     def is_mounted(self, user):
    518         """Tests whether a user's cryptohome is mounted."""
    519         return (utils.is_mountpoint(user_path(user))
    520                 and utils.is_mountpoint(system_path(user)))
    521 
    522 
    523     def require_mounted(self, user):
    524         """Raises a test failure if a user's cryptohome is not mounted."""
    525         utils.require_mountpoint(user_path(user))
    526         utils.require_mountpoint(system_path(user))
    527 
    528 
    529     def migrate(self, user, oldkey, newkey, async=True):
    530         """Migrates the specified user's cryptohome from one key to another."""
    531         if async:
    532             return self.__async_call(self.iface.AsyncMigrateKey,
    533                                      user, oldkey, newkey)['return_status']
    534         return self.__call(self.iface.MigrateKey, user, oldkey, newkey)
    535 
    536 
    537     def remove(self, user, async=True):
    538         if async:
    539             return self.__async_call(self.iface.AsyncRemove,
    540                                      user)['return_status']
    541         return self.__call(self.iface.Remove, user)
    542 
    543 
    544     def ensure_clean_cryptohome_for(self, user, password=None):
    545         """Ensure a fresh cryptohome exists for user.
    546 
    547         @param user: user who needs a shiny new cryptohome.
    548         @param password: if unset, a random password will be used.
    549         """
    550         if not password:
    551             password = ''.join(random.sample(string.ascii_lowercase, 6))
    552         self.remove(user)
    553         self.mount(user, password, create=True)
    554