Home | History | Annotate | Download | only in cros
      1 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """Utility functions used for PKCS#11 library testing."""
      6 
      7 import grp, logging, os, pwd, re, stat, sys, shutil, pwd, grp
      8 
      9 from autotest_lib.client.bin import utils
     10 from autotest_lib.client.common_lib import error
     11 
     12 USER_TOKEN_PREFIX = 'User TPM Token '
     13 TMP_CHAPS_DIR = '/tmp/chaps'
     14 CHAPS_DIR_PERM = 0750
     15 SYSTEM_TOKEN_NAME = 'System TPM Token'
     16 SYSTEM_TOKEN_DIR = '/var/lib/chaps'
     17 INVALID_SLOT_ID = '100'
     18 
     19 
     20 def __run_cmd(cmd, ignore_status=False):
     21     """Runs a command and returns the output from both stdout and stderr."""
     22     return utils.system_output(cmd + ' 2>&1', retain_output=True,
     23                                ignore_status=ignore_status).strip()
     24 
     25 def __get_token_paths(exclude_system_token):
     26     """Return a list with a path for each PKCS #11 token currently loaded."""
     27     token_paths = []
     28     for line in __run_cmd('chaps_client --list').split('\n'):
     29         match = re.search(r'Slot \d+: (/.*)$', line)
     30         if match:
     31             if exclude_system_token and match.group(1) == SYSTEM_TOKEN_DIR:
     32                 continue
     33             token_paths.append(match.group(1))
     34     return token_paths
     35 
     36 def __get_pkcs11_file_list(token_path):
     37     """Return string with PKCS#11 file paths and their associated metadata."""
     38     find_args = '-printf "\'%p\', \'%u:%g\', 0%m\n"'
     39     file_list_output = __run_cmd('find %s ' % token_path + find_args)
     40     return file_list_output
     41 
     42 def __get_token_slot_by_path(token_path):
     43     token_list = __run_cmd('p11_replay --list_tokens')
     44     for line in token_list.split('\n'):
     45         match = re.search(r'^Slot (\d+): ' + token_path, line)
     46         if not match:
     47             continue
     48         return match.group(1)
     49     return INVALID_SLOT_ID
     50 
     51 def __verify_tokenname(token_path):
     52     """Verify that the TPM token name is correct."""
     53     # The token path is expected to be of the form:
     54     # /home/root/<obfuscated_user_id>/chaps
     55     match = re.search(r'/home/root/(.*)/chaps', token_path)
     56     if not match:
     57         return False
     58     obfuscated_user = match.group(1)
     59     # We expect the token label to contain first 16 characters of the obfuscated
     60     # user id. This is the same value we extracted from |token_path|.
     61     expected_user_token_label = USER_TOKEN_PREFIX + obfuscated_user[:16]
     62     # The p11_replay tool will list tokens in the following form:
     63     # Slot 1: <token label>
     64     token_list = __run_cmd('p11_replay --list_tokens')
     65     for line in token_list.split('\n'):
     66         match = re.search(r'^Slot \d+: (.*)$', line)
     67         if not match:
     68             continue
     69         token_label = match.group(1).rstrip()
     70         if (token_label == expected_user_token_label):
     71             return True
     72         # Ignore the system token label.
     73         if token_label == SYSTEM_TOKEN_NAME:
     74             continue
     75         logging.error('Unexpected token label: |%s|', token_label)
     76     logging.error('Invalid or missing PKCS#11 token label!')
     77     return False
     78 
     79 def __verify_permissions(token_path):
     80     """Verify that the permissions on the initialized token dir are correct."""
     81     # List of 3-tuples consisting of (path, user:group, octal permissions).
     82     # Can be generated (for example), by:
     83     # find <token_path>/chaps -printf "'%p', '%u:%g', 0%m\n"
     84     expected_permissions = [
     85         (token_path, 'chaps:chronos-access', CHAPS_DIR_PERM),
     86         ('%s/database' % token_path, 'chaps:chronos-access', CHAPS_DIR_PERM)]
     87     for item in expected_permissions:
     88         path = item[0]
     89         (user, group) = item[1].split(':')
     90         perms = item[2]
     91         stat_buf = os.lstat(path)
     92         if not stat_buf:
     93             logging.error('Could not stat %s while checking for permissions.',
     94                           path)
     95             return False
     96         # Check ownership.
     97         path_user = pwd.getpwuid(stat_buf.st_uid).pw_name
     98         path_group = grp.getgrgid(stat_buf.st_gid).gr_name
     99         if path_user != user or path_group != group:
    100             logging.error('Ownership of %s does not match! Got = (%s, %s)'
    101                           ', Expected = (%s, %s)', path, path_user, path_group,
    102                           user, group)
    103             return False
    104 
    105         # Check permissions.
    106         path_perms = stat.S_IMODE(stat_buf.st_mode)
    107         if path_perms != perms:
    108             logging.error('Permissions for %s do not match! (Got = %s'
    109                           ', Expected = %s)', path, oct(path_perms), oct(perms))
    110             return False
    111 
    112     return True
    113 
    114 def verify_pkcs11_initialized():
    115     """Checks if the PKCS#11 token is initialized properly."""
    116     token_path_list = __get_token_paths(exclude_system_token=True)
    117     if len(token_path_list) != 1:
    118         logging.error('Expecting a single signed-in user with a token.')
    119         return False
    120 
    121     verify_cmd = ('cryptohome --action=pkcs11_token_status')
    122     __run_cmd(verify_cmd)
    123 
    124     verify_result = True
    125     # Do additional sanity tests.
    126     if not __verify_tokenname(token_path_list[0]):
    127         logging.error('Verification of token name failed!')
    128         verify_result = False
    129     if not __verify_permissions(token_path_list[0]):
    130         logging.error('PKCS#11 file list:\n%s',
    131                       __get_pkcs11_file_list(token_path_list[0]))
    132         logging.error(
    133             'Verification of PKCS#11 subsystem and token permissions failed!')
    134         verify_result = False
    135     return verify_result
    136 
    137 def load_p11_test_token(auth_data='1234'):
    138     """Loads the test token onto a slot.
    139 
    140     @param auth_data: The authorization data to use for the token.
    141     """
    142     utils.system('sudo chaps_client --load --path=%s --auth="%s"' %
    143                  (TMP_CHAPS_DIR, auth_data))
    144 
    145 def change_p11_test_token_auth_data(auth_data, new_auth_data):
    146     """Changes authorization data for the test token.
    147 
    148     @param auth_data: The current authorization data.
    149     @param new_auth_data: The new authorization data.
    150     """
    151     utils.system('sudo chaps_client --change_auth --path=%s --auth="%s" '
    152                  '--new_auth="%s"' % (TMP_CHAPS_DIR, auth_data, new_auth_data))
    153 
    154 def unload_p11_test_token():
    155     """Unloads a loaded test token."""
    156     utils.system('sudo chaps_client --unload --path=%s' % TMP_CHAPS_DIR)
    157 
    158 def copytree_with_ownership(src, dst):
    159     """Like shutil.copytree but also copies owner and group attributes.
    160     @param src: Source directory.
    161     @param dst: Destination directory.
    162     """
    163     utils.system('cp -rp %s %s' % (src, dst))
    164 
    165 def setup_p11_test_token(unload_user_tokens, auth_data='1234'):
    166     """Configures a PKCS #11 token for testing.
    167 
    168     Any existing test token will be automatically cleaned up.
    169 
    170     @param unload_user_tokens: Whether to unload all user tokens.
    171     @param auth_data: Initial token authorization data.
    172     """
    173     cleanup_p11_test_token()
    174     if unload_user_tokens:
    175         for path in __get_token_paths(exclude_system_token=False):
    176             utils.system('sudo chaps_client --unload --path=%s' % path)
    177     os.makedirs(TMP_CHAPS_DIR)
    178     uid = pwd.getpwnam('chaps')[2]
    179     gid = grp.getgrnam('chronos-access')[2]
    180     os.chown(TMP_CHAPS_DIR, uid, gid)
    181     os.chmod(TMP_CHAPS_DIR, CHAPS_DIR_PERM)
    182     load_p11_test_token(auth_data)
    183     unload_p11_test_token()
    184     copytree_with_ownership(TMP_CHAPS_DIR, '%s_bak' % TMP_CHAPS_DIR)
    185 
    186 def restore_p11_test_token():
    187     """Restores a PKCS #11 test token to its initial state."""
    188     shutil.rmtree(TMP_CHAPS_DIR)
    189     copytree_with_ownership('%s_bak' % TMP_CHAPS_DIR, TMP_CHAPS_DIR)
    190 
    191 def get_p11_test_token_db_path():
    192     """Returns the test token database path."""
    193     return '%s/database' % TMP_CHAPS_DIR
    194 
    195 def verify_p11_test_token():
    196     """Verifies that a test token is working and persistent."""
    197     output = __run_cmd('p11_replay --generate --replay_wifi',
    198                        ignore_status=True)
    199     if not re.search('Sign: CKR_OK', output):
    200         print >> sys.stderr, output
    201         return False
    202     unload_p11_test_token()
    203     load_p11_test_token()
    204     output = __run_cmd('p11_replay --replay_wifi --cleanup',
    205                        ignore_status=True)
    206     if not re.search('Sign: CKR_OK', output):
    207         print >> sys.stderr, output
    208         return False
    209     return True
    210 
    211 def cleanup_p11_test_token():
    212     """Deletes the test token."""
    213     unload_p11_test_token()
    214     shutil.rmtree(TMP_CHAPS_DIR, ignore_errors=True)
    215     shutil.rmtree('%s_bak' % TMP_CHAPS_DIR, ignore_errors=True)
    216 
    217 def wait_for_pkcs11_token():
    218     """Waits for the PKCS #11 token to be available.
    219 
    220     This should be called only after a login and is typically called immediately
    221     after a login.
    222 
    223     Returns:
    224         True if the token is available.
    225     """
    226     try:
    227         utils.poll_for_condition(
    228             lambda: utils.system('cryptohome --action=pkcs11_token_status',
    229                                  ignore_status=True) == 0,
    230             desc='PKCS #11 token.',
    231             timeout=300)
    232     except utils.TimeoutError:
    233         return False
    234     return True
    235 
    236 def __p11_replay_on_user_token(extra_args=''):
    237     """Executes a typical command replay on the current user token.
    238 
    239     Args:
    240         extra_args: Additional arguments to pass to p11_replay.
    241 
    242     Returns:
    243         The command output.
    244     """
    245     if not wait_for_pkcs11_token():
    246        raise error.TestError('Timeout while waiting for pkcs11 token')
    247     return __run_cmd('p11_replay --slot=%s %s'
    248                      % (__get_token_slot_by_path(USER_TOKEN_PREFIX),
    249                         extra_args),
    250                      ignore_status=True)
    251 
    252 def inject_and_test_key():
    253     """Injects a key into a PKCS #11 token and tests that it can sign."""
    254     output = __p11_replay_on_user_token('--replay_wifi --inject')
    255     return re.search('Sign: CKR_OK', output)
    256 
    257 def test_and_cleanup_key():
    258     """Tests a PKCS #11 key before deleting it."""
    259     output = __p11_replay_on_user_token('--replay_wifi --cleanup')
    260     return re.search('Sign: CKR_OK', output)
    261 
    262 def generate_user_key():
    263     """Generates a key in the current user token."""
    264     output = __p11_replay_on_user_token('--generate')
    265     return re.search('Sign: CKR_OK', output)
    266 
    267