# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import grp
import os
import pwd
import stat

import dbus
from dbus.mainloop.glib import DBusGMainLoop

from autotest_lib.client.bin import test, utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros import policy, session_manager
from autotest_lib.client.cros import cros_ui, cryptohome, ownership


class login_UserPolicyKeys(test.test):
    """Verifies that, after user policy is pushed, the user policy key winds
    up stored in the right place.
    """
    version = 1

    def _can_read(self, uid, gid, info):
        """Returns True if uid or gid can read a file with the info stat.

        Exactly one permission class is consulted: the owner bits when uid
        owns the file, otherwise the group bits when gid is the file's group,
        otherwise the "other" bits.

        @param uid: numeric user id to check.
        @param gid: numeric group id to check.
        @param info: stat result (as returned by os.stat) of the file.
        @return True if the selected read bit is set, False otherwise.
        """
        if uid == info.st_uid:
            return bool(info.st_mode & stat.S_IRUSR)
        if gid == info.st_gid:
            return bool(info.st_mode & stat.S_IRGRP)
        return bool(info.st_mode & stat.S_IROTH)


    def _can_execute(self, uid, gid, info):
        """Returns True if uid or gid can execute a file with the info stat.

        Exactly one permission class is consulted: the owner bits when uid
        owns the file, otherwise the group bits when gid is the file's group,
        otherwise the "other" bits.

        @param uid: numeric user id to check.
        @param gid: numeric group id to check.
        @param info: stat result (as returned by os.stat) of the file.
        @return True if the selected execute bit is set, False otherwise.
        """
        if uid == info.st_uid:
            return bool(info.st_mode & stat.S_IXUSR)
        if gid == info.st_gid:
            return bool(info.st_mode & stat.S_IXGRP)
        return bool(info.st_mode & stat.S_IXOTH)


    def _verify_key_file(self, key_file):
        """Verifies that the key file has been created and is readable.

        The file must exist, be a regular file and be readable by the
        'chronos' user/group, and every ancestor directory up to the
        filesystem root must be executable (traversable) by chronos.

        @param key_file: path of the user policy key file to check.
        @raises error.TestFail: if any of the checks above does not hold.
        """
        if not os.path.isfile(key_file):
            raise error.TestFail('%s does not exist!' % key_file)
        # And is readable by chronos.
        chronos_uid = pwd.getpwnam('chronos').pw_uid
        chronos_gid = grp.getgrnam('chronos').gr_gid
        info = os.stat(key_file)
        if not stat.S_ISREG(info.st_mode):
            raise error.TestFail('%s is not a regular file' % key_file)
        if not self._can_read(chronos_uid, chronos_gid, info):
            raise error.TestFail('chronos can\'t read %s, mode is %s' %
                                 (key_file, oct(info.st_mode)))
        # All the parent directories must be executable by chronos.
        # os.path.dirname() of the root returns the root itself, which is
        # what ends the walk.
        current = key_file
        parent = os.path.dirname(current)
        while current != parent:
            current = parent
            parent = os.path.dirname(parent)
            info = os.stat(current)
            if not self._can_execute(chronos_uid, chronos_gid, info):
                raise error.TestFail('chronos can\'t execute %s, mode is %s' %
                                     (current, oct(info.st_mode)))


    def _mount_and_start_session(self, failure_message):
        """Mounts the test user's vault and starts a session for that user.

        @param failure_message: message for the error.TestError raised when
                                the session cannot be started.
        @return a freshly connected session_manager proxy.
        @raises error.TestError: if StartSession reports failure.
        """
        self._cryptohome_proxy.mount(ownership.TESTUSER,
                                     ownership.TESTPASS,
                                     create=True)
        sm = session_manager.connect(self._bus_loop)
        if not sm.StartSession(ownership.TESTUSER, ''):
            raise error.TestError(failure_message)
        return sm


    def setup(self):
        """Runs make inside the test's source directory."""
        os.chdir(self.srcdir)
        utils.make('OUT_DIR=.')


    def initialize(self):
        """Sets up the D-Bus main loop and cryptohome proxy, then resets
        ownership state and removes the test user's vault.
        """
        super(login_UserPolicyKeys, self).initialize()
        self._bus_loop = DBusGMainLoop(set_as_default=True)
        self._cryptohome_proxy = cryptohome.CryptohomeProxy(self._bus_loop)

        # Clear the user's vault, to make sure the test starts without any
        # policy or key lingering around. At this stage the session isn't
        # started and there's no user signed in.
        ownership.restart_ui_to_clear_ownership_files()
        self._cryptohome_proxy.remove(ownership.TESTUSER)


    def run_once(self):
        """Stores a user policy and checks the lifetime of its key file.

        The key file must not exist before the policy is stored, must exist
        with the right permissions afterwards, must be gone after a ui
        restart, and must be back once a new session is started.

        @raises error.TestError: if a session cannot be started or policy is
                                 already present before the test stores any.
        @raises error.TestFail: if any key file expectation is violated or
                                the policy cannot be stored.
        """
        # Mount the vault, connect to session_manager and start the session.
        sm = self._mount_and_start_session('Could not start session')

        # No policy stored yet.
        retrieved_policy = sm.RetrievePolicyForUser(ownership.TESTUSER,
                                                    byte_arrays=True)
        if retrieved_policy:
            raise error.TestError('session_manager already has user policy!')

        # And no user key exists.
        key_file = ownership.get_user_policy_key_filename(ownership.TESTUSER)
        if os.path.exists(key_file):
            raise error.TestFail('%s exists before storing user policy!' %
                                 key_file)

        # Now store a policy. This is building a device policy protobuf, but
        # that's fine as far as the session_manager is concerned; it's the
        # outer PolicyFetchResponse that contains the public_key.
        public_key = ownership.known_pubkey()
        private_key = ownership.known_privkey()
        policy_data = policy.build_policy_data(self.srcdir)
        policy_response = policy.generate_policy(self.srcdir,
                                                 private_key,
                                                 public_key,
                                                 policy_data)
        # Only the D-Bus call sits inside the try so that nothing else can be
        # mistaken for a D-Bus failure.
        try:
            result = sm.StorePolicyForUser(ownership.TESTUSER,
                                           dbus.ByteArray(policy_response))
        except dbus.exceptions.DBusException as e:
            raise error.TestFail('Failed to store user policy', e)
        if not result:
            raise error.TestFail('Failed to store user policy')

        # The policy key should have been created now.
        self._verify_key_file(key_file)

        # Restart the ui; the key should be deleted.
        self._cryptohome_proxy.unmount(ownership.TESTUSER)
        cros_ui.restart()
        if os.path.exists(key_file):
            raise error.TestFail('%s exists after restarting ui!' %
                                 key_file)

        # Starting a new session will restore the key that was previously
        # stored. Reconnect to the session_manager, since the restart killed it.
        self._mount_and_start_session('Could not start session after restart')
        self._verify_key_file(key_file)


    def cleanup(self):
        """Restarts the ui and removes the test user's vault."""
        cros_ui.restart()
        self._cryptohome_proxy.remove(ownership.TESTUSER)
        super(login_UserPolicyKeys, self).cleanup()