Home | History | Annotate | Download | only in login_UserPolicyKeys
      1 # Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import dbus, grp, os, pwd, stat
      6 from dbus.mainloop.glib import DBusGMainLoop
      7 
      8 from autotest_lib.client.bin import test, utils
      9 from autotest_lib.client.common_lib import error
     10 from autotest_lib.client.common_lib.cros import policy, session_manager
     11 from autotest_lib.client.cros import cros_ui, cryptohome, ownership
     12 
     13 
     14 class login_UserPolicyKeys(test.test):
     15     """Verifies that, after user policy is pushed, the user policy key winds
     16        up stored in the right place.
     17     """
     18     version = 1
     19 
     20     def _can_read(self, uid, gid, info):
     21         """Returns true if uid or gid can read a file with the info stat."""
     22         if uid == info.st_uid:
     23             return info.st_mode & stat.S_IRUSR
     24         if gid == info.st_gid:
     25             return info.st_mode & stat.S_IRGRP
     26         return info.st_mode & stat.S_IROTH
     27 
     28 
     29     def _can_execute(self, uid, gid, info):
     30         """Returns true if uid or gid can execute a file with the info stat."""
     31         if uid == info.st_uid:
     32             return info.st_mode & stat.S_IXUSR
     33         if gid == info.st_gid:
     34             return info.st_mode & stat.S_IXGRP
     35         return info.st_mode & stat.S_IXOTH
     36 
     37 
     38     def _verify_key_file(self, key_file):
     39         """Verifies that the key file has been created and is readable."""
     40         if not os.path.isfile(key_file):
     41             raise error.TestFail('%s does not exist!' % key_file)
     42         # And is readable by chronos.
     43         chronos_uid = pwd.getpwnam('chronos').pw_uid
     44         chronos_gid = grp.getgrnam('chronos').gr_gid
     45         info = os.stat(key_file)
     46         if not stat.S_ISREG(info.st_mode):
     47             raise error.TestFail('%s is not a regular file' % key_file)
     48         if not self._can_read(chronos_uid, chronos_gid, info):
     49             raise error.TestFail('chronos can\' read %s, mode is %s' %
     50                                  (key_file, oct(info.st_mode)))
     51         # All the parent directories must be executable by chronos.
     52         current = key_file
     53         parent = os.path.dirname(current)
     54         while current != parent:
     55             current = parent
     56             parent = os.path.dirname(parent)
     57             info = os.stat(current)
     58             mode = stat.S_IMODE(info.st_mode)
     59             if not self._can_execute(chronos_uid, chronos_gid, info):
     60                 raise error.TestFail('chronos can\'t execute %s, mode is %s' %
     61                                      (current, oct(info.st_mode)))
     62 
     63 
     64     def setup(self):
     65         os.chdir(self.srcdir)
     66         utils.make('OUT_DIR=.')
     67 
     68 
     69     def initialize(self):
     70         super(login_UserPolicyKeys, self).initialize()
     71         self._bus_loop = DBusGMainLoop(set_as_default=True)
     72         self._cryptohome_proxy = cryptohome.CryptohomeProxy(self._bus_loop)
     73 
     74         # Clear the user's vault, to make sure the test starts without any
     75         # policy or key lingering around. At this stage the session isn't
     76         # started and there's no user signed in.
     77         ownership.restart_ui_to_clear_ownership_files()
     78         self._cryptohome_proxy.remove(ownership.TESTUSER)
     79 
     80 
     81     def run_once(self):
     82         # Mount the vault, connect to session_manager and start the session.
     83         self._cryptohome_proxy.mount(ownership.TESTUSER,
     84                                      ownership.TESTPASS,
     85                                      create=True)
     86         sm = session_manager.connect(self._bus_loop)
     87         if not sm.StartSession(ownership.TESTUSER, ''):
     88             raise error.TestError('Could not start session')
     89 
     90         # No policy stored yet.
     91         retrieved_policy = sm.RetrievePolicyForUser(ownership.TESTUSER,
     92                                                     byte_arrays=True)
     93         if retrieved_policy:
     94             raise error.TestError('session_manager already has user policy!')
     95 
     96         # And no user key exists.
     97         key_file = ownership.get_user_policy_key_filename(ownership.TESTUSER)
     98         if os.path.exists(key_file):
     99             raise error.TestFail('%s exists before storing user policy!' %
    100                                  key_file)
    101 
    102         # Now store a policy. This is building a device policy protobuf, but
    103         # that's fine as far as the session_manager is concerned; it's the
    104         # outer PolicyFetchResponse that contains the public_key.
    105         public_key = ownership.known_pubkey()
    106         private_key = ownership.known_privkey()
    107         policy_data = policy.build_policy_data(self.srcdir)
    108         policy_response = policy.generate_policy(self.srcdir,
    109                                                  private_key,
    110                                                  public_key,
    111                                                  policy_data)
    112         try:
    113           result = sm.StorePolicyForUser(ownership.TESTUSER,
    114                                          dbus.ByteArray(policy_response))
    115           if not result:
    116               raise error.TestFail('Failed to store user policy')
    117         except dbus.exceptions.DBusException, e:
    118           raise error.TestFail('Failed to store user policy', e)
    119 
    120         # The policy key should have been created now.
    121         self._verify_key_file(key_file)
    122 
    123         # Restart the ui; the key should be deleted.
    124         self._cryptohome_proxy.unmount(ownership.TESTUSER)
    125         cros_ui.restart()
    126         if os.path.exists(key_file):
    127             raise error.TestFail('%s exists after restarting ui!' %
    128                                  key_file)
    129 
    130         # Starting a new session will restore the key that was previously
    131         # stored. Reconnect to the session_manager, since the restart killed it.
    132         self._cryptohome_proxy.mount(ownership.TESTUSER,
    133                                      ownership.TESTPASS,
    134                                      create=True)
    135         sm = session_manager.connect(self._bus_loop)
    136         if not sm.StartSession(ownership.TESTUSER, ''):
    137             raise error.TestError('Could not start session after restart')
    138         self._verify_key_file(key_file)
    139 
    140 
    141     def cleanup(self):
    142         cros_ui.restart()
    143         self._cryptohome_proxy.remove(ownership.TESTUSER)
    144         super(login_UserPolicyKeys, self).cleanup()
    145