Home | History | Annotate | Download | only in cros
      1 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging, os, shutil, tempfile
      6 
      7 import common, constants, cryptohome
      8 from autotest_lib.client.bin import utils
      9 from autotest_lib.client.common_lib import autotemp, error
     10 from autotest_lib.client.cros import cros_ui
     11 
     12 
     13 PK12UTIL = 'pk12util'
     14 CERTUTIL = 'certutil'
     15 OPENSSLP12 = 'openssl pkcs12'
     16 OPENSSLX509 = 'openssl x509'
     17 OPENSSLRSA = 'openssl rsa'
     18 OPENSSLREQ = 'openssl req'
     19 OPENSSLCRYPTO = 'openssl sha1'
     20 
     21 TESTUSER = 'ownership_test (at] chromium.org'
     22 TESTPASS = 'testme'
     23 
     24 
     25 class OwnershipError(error.TestError):
     26     """Generic error for ownership-related failures."""
     27     pass
     28 
     29 
     30 class scoped_tempfile(object):
     31     """A wrapper that provides scoped semantics for temporary files.
     32 
     33     Providing a file path causes the scoped_tempfile to take ownership of the
     34     file at the provided path.  The file at the path will be deleted when this
     35     object goes out of scope.  If no path is provided, then a temporary file
     36     object will be created for the lifetime of the scoped_tempfile
     37 
     38     autotemp.tempfile objects don't seem to play nicely with being
     39     used in system commands, so they can't be used for my purposes.
     40     """
     41 
     42     tempdir = autotemp.tempdir(unique_id='ownership')
     43 
     44     def __init__(self, name=None):
     45         self.name = name
     46         if not self.name:
     47             self.fo = tempfile.TemporaryFile()
     48 
     49 
     50     def __del__(self):
     51         if self.name:
     52             if os.path.exists(self.name):
     53                 os.unlink(self.name)
     54         else:
     55             self.fo.close()  # Will destroy the underlying tempfile
     56 
     57 
     58 def system_output_on_fail(cmd):
     59     """Run a |cmd|, capturing output and logging it only on error.
     60 
     61     @param cmd: the command to run.
     62     """
     63     output = None
     64     try:
     65         output = utils.system_output(cmd)
     66     except:
     67         logging.error(output)
     68         raise
     69 
     70 
     71 def __unlink(filename):
     72     """unlink a file, but log OSError and IOError instead of raising.
     73 
     74     This allows unlinking files that don't exist safely.
     75 
     76     @param filename: the file to attempt to unlink.
     77     """
     78     try:
     79         os.unlink(filename)
     80     except (IOError, OSError) as error:
     81         logging.info(error)
     82 
     83 
     84 def restart_ui_to_clear_ownership_files():
     85     """Remove on-disk state related to device ownership.
     86 
     87     The UI must be stopped while we do this, or the session_manager will
     88     write the policy and key files out again.
     89     """
     90     cros_ui.stop(allow_fail=not cros_ui.is_up())
     91     clear_ownership_files_no_restart()
     92     cros_ui.start()
     93 
     94 
     95 def clear_ownership_files_no_restart():
     96     """Remove on-disk state related to device ownership.
     97 
     98     The UI must be stopped while we do this, or the session_manager will
     99     write the policy and key files out again.
    100     """
    101     if cros_ui.is_up():
    102         raise error.TestError("Tried to clear ownership with UI running.")
    103     __unlink(constants.OWNER_KEY_FILE)
    104     __unlink(constants.SIGNED_POLICY_FILE)
    105     __unlink(os.path.join(constants.USER_DATA_DIR, 'Local State'))
    106 
    107 
    108 def fake_ownership():
    109     """Fake ownership by generating the necessary magic files."""
    110     # Determine the module directory.
    111     dirname = os.path.dirname(__file__)
    112     mock_certfile = os.path.join(dirname, constants.MOCK_OWNER_CERT)
    113     mock_signedpolicyfile = os.path.join(dirname,
    114                                          constants.MOCK_OWNER_POLICY)
    115     utils.open_write_close(constants.OWNER_KEY_FILE,
    116                            cert_extract_pubkey_der(mock_certfile))
    117     shutil.copy(mock_signedpolicyfile,
    118                 constants.SIGNED_POLICY_FILE)
    119 
    120 
    121 POLICY_TYPE = 'google/chromeos/device'
    122 
    123 
    124 def assert_has_policy_data(response_proto):
    125     """Assert that given protobuf has a policy_data field.
    126 
    127     @param response_proto: a PolicyFetchResponse protobuf.
    128     @raises OwnershipError on failure.
    129     """
    130     if not response_proto.HasField("policy_data"):
    131         raise OwnershipError('Malformatted response.')
    132 
    133 
    134 def assert_has_device_settings(data_proto):
    135     """Assert that given protobuf is a policy with device settings in it.
    136 
    137     @param data_proto: a PolicyData protobuf.
    138     @raises OwnershipError if this isn't CrOS policy, or has no settings inside.
    139     """
    140     if (not data_proto.HasField("policy_type") or
    141         data_proto.policy_type != POLICY_TYPE or
    142         not data_proto.HasField("policy_value")):
    143         raise OwnershipError('Malformatted response.')
    144 
    145 
    146 def assert_username(data_proto, username):
    147     """Assert that given protobuf is a policy associated with the given user.
    148 
    149     @param data_proto: a PolicyData protobuf.
    150     @param username: the username to check for
    151     @raises OwnershipError if data_proto isn't associated with username
    152     """
    153     if data_proto.username != username:
    154         raise OwnershipError('Incorrect username.')
    155 
    156 
    157 def assert_guest_setting(settings, guests):
    158     """Assert that given protobuf has given guest-related settings.
    159 
    160     @param settings: a ChromeDeviceSettingsProto protobuf.
    161     @param guests: boolean indicating whether guests are allowed to sign in.
    162     @raises OwnershipError if settings doesn't enforce the provided setting.
    163     """
    164     if not settings.HasField("guest_mode_enabled"):
    165         raise OwnershipError('No guest mode setting protobuf.')
    166     if not settings.guest_mode_enabled.HasField("guest_mode_enabled"):
    167         raise OwnershipError('No guest mode setting.')
    168     if settings.guest_mode_enabled.guest_mode_enabled != guests:
    169         raise OwnershipError('Incorrect guest mode setting.')
    170 
    171 
    172 def assert_show_users(settings, show_users):
    173     """Assert that given protobuf has given user-avatar-showing settings.
    174 
    175     @param settings: a ChromeDeviceSettingsProto protobuf.
    176     @param show_users: boolean indicating whether avatars are shown on sign in.
    177     @raises OwnershipError if settings doesn't enforce the provided setting.
    178     """
    179     if not settings.HasField("show_user_names"):
    180         raise OwnershipError('No show users setting protobuf.')
    181     if not settings.show_user_names.HasField("show_user_names"):
    182         raise OwnershipError('No show users setting.')
    183     if settings.show_user_names.show_user_names != show_users:
    184         raise OwnershipError('Incorrect show users setting.')
    185 
    186 
    187 def assert_roaming(settings, roaming):
    188     """Assert that given protobuf has given roaming settings.
    189 
    190     @param settings: a ChromeDeviceSettingsProto protobuf.
    191     @param roaming: boolean indicating whether roaming is allowed.
    192     @raises OwnershipError if settings doesn't enforce the provided setting.
    193     """
    194     if not settings.HasField("data_roaming_enabled"):
    195         raise OwnershipError('No roaming setting protobuf.')
    196     if not settings.data_roaming_enabled.HasField("data_roaming_enabled"):
    197         raise OwnershipError('No roaming setting.')
    198     if settings.data_roaming_enabled.data_roaming_enabled != roaming:
    199         raise OwnershipError('Incorrect roaming setting.')
    200 
    201 
    202 def assert_new_users(settings, new_users):
    203     """Assert that given protobuf has given new user settings.
    204 
    205     @param settings: a ChromeDeviceSettingsProto protobuf.
    206     @param new_users: boolean indicating whether adding users is allowed.
    207     @raises OwnershipError if settings doesn't enforce the provided setting.
    208     """
    209     if not settings.HasField("allow_new_users"):
    210         raise OwnershipError('No allow new users setting protobuf.')
    211     if not settings.allow_new_users.HasField("allow_new_users"):
    212         raise OwnershipError('No allow new users setting.')
    213     if settings.allow_new_users.allow_new_users != new_users:
    214         raise OwnershipError('Incorrect allow new users setting.')
    215 
    216 
    217 def assert_users_on_whitelist(settings, users):
    218     """Assert that given protobuf has given users on the whitelist.
    219 
    220     @param settings: a ChromeDeviceSettingsProto protobuf.
    221     @param users: iterable containing usernames that should be on whitelist.
    222     @raises OwnershipError if settings doesn't enforce the provided setting.
    223     """
    224     if settings.HasField("user_whitelist"):
    225         for user in users:
    226             if user not in settings.user_whitelist.user_whitelist:
    227                 raise OwnershipError(user + ' not whitelisted.')
    228     else:
    229         raise OwnershipError('No user whitelist.')
    230 
    231 
    232 def __user_nssdb(user):
    233     """Returns the path to the NSSDB for the provided user.
    234 
    235     @param user: the user whose NSSDB the caller wants.
    236     @return: absolute path to user's NSSDB.
    237     """
    238     return os.path.join(cryptohome.user_path(user), '.pki', 'nssdb')
    239 
    240 
    241 def use_known_ownerkeys(user):
    242     """Sets the system up to use a well-known keypair for owner operations.
    243 
    244     Assuming the appropriate cryptohome is already mounted, configures the
    245     device to accept policies signed with the checked-in 'mock' owner key.
    246 
    247     @param user: the user whose NSSDB should be populated with key material.
    248     """
    249     dirname = os.path.dirname(__file__)
    250     mock_keyfile = os.path.join(dirname, constants.MOCK_OWNER_KEY)
    251     mock_certfile = os.path.join(dirname, constants.MOCK_OWNER_CERT)
    252     push_to_nss(mock_keyfile, mock_certfile, __user_nssdb(user))
    253     utils.open_write_close(constants.OWNER_KEY_FILE,
    254                            cert_extract_pubkey_der(mock_certfile))
    255 
    256 
    257 def known_privkey():
    258     """Returns the mock owner private key in PEM format.
    259 
    260     @return: mock owner private key in PEM format.
    261     """
    262     dirname = os.path.dirname(__file__)
    263     return utils.read_file(os.path.join(dirname, constants.MOCK_OWNER_KEY))
    264 
    265 
    266 def known_pubkey():
    267     """Returns the mock owner public key in DER format.
    268 
    269     @return: mock owner public key in DER format.
    270     """
    271     dirname = os.path.dirname(__file__)
    272     return cert_extract_pubkey_der(os.path.join(dirname,
    273                                                 constants.MOCK_OWNER_CERT))
    274 
    275 
    276 def pairgen():
    277     """Generate a self-signed cert and associated private key.
    278 
    279     Generates a self-signed X509 certificate and the associated private key.
    280     The key is 2048 bits.  The generated material is stored in PEM format
    281     and the paths to the two files are returned.
    282 
    283     The caller is responsible for cleaning up these files.
    284 
    285     @return: (/path/to/private_key, /path/to/self-signed_cert)
    286     """
    287     keyfile = scoped_tempfile.tempdir.name + '/private.key'
    288     certfile = scoped_tempfile.tempdir.name + '/cert.pem'
    289     cmd = '%s -x509 -subj %s -newkey rsa:2048 -nodes -keyout %s -out %s' % (
    290         OPENSSLREQ, '/CN=me', keyfile, certfile)
    291     system_output_on_fail(cmd)
    292     return (keyfile, certfile)
    293 
    294 
    295 def pairgen_as_data():
    296     """Generates keypair, returns keys as data.
    297 
    298     Generates a fresh owner keypair and then passes back the
    299     PEM-encoded private key and the DER-encoded public key.
    300 
    301     @return: (PEM-encoded private key, DER-encoded public key)
    302     """
    303     (keypath, certpath) = pairgen()
    304     keyfile = scoped_tempfile(keypath)
    305     certfile = scoped_tempfile(certpath)
    306     return (utils.read_file(keyfile.name),
    307             cert_extract_pubkey_der(certfile.name))
    308 
    309 
    310 def push_to_nss(keyfile, certfile, nssdb):
    311     """Takes a pre-generated key pair and pushes them to an NSS DB.
    312 
    313     Given paths to a private key and cert in PEM format, stores the pair
    314     in the provided nssdb.
    315 
    316     @param keyfile: path to PEM-formatted private key file.
    317     @param certfile: path to PEM-formatted cert file for associated public key.
    318     @param nssdb: path to NSSDB to be populated with the provided keys.
    319     """
    320     for_push = scoped_tempfile(scoped_tempfile.tempdir.name + '/for_push.p12')
    321     cmd = '%s -export -in %s -inkey %s -out %s ' % (
    322         OPENSSLP12, certfile, keyfile, for_push.name)
    323     cmd += '-passin pass: -passout pass:'
    324     system_output_on_fail(cmd)
    325     cmd = '%s -d "sql:%s" -i %s -W ""' % (PK12UTIL,
    326                                           nssdb,
    327                                           for_push.name)
    328     system_output_on_fail(cmd)
    329 
    330 
    331 def cert_extract_pubkey_der(pem):
    332     """Given a PEM-formatted cert, extracts the public key in DER format.
    333 
    334     Pass in an X509 certificate in PEM format, and you'll get back the
    335     DER-formatted public key as a string.
    336 
    337     @param pem: path to a PEM-formatted cert file.
    338     @return: DER-encoded public key from cert, as a string.
    339     """
    340     outfile = scoped_tempfile(scoped_tempfile.tempdir.name + '/pubkey.der')
    341     cmd = '%s -in %s -pubkey -noout ' % (OPENSSLX509, pem)
    342     cmd += '| %s -outform DER -pubin -out %s' % (OPENSSLRSA,
    343                                                  outfile.name)
    344     system_output_on_fail(cmd)
    345     der = utils.read_file(outfile.name)
    346     return der
    347 
    348 
    349 def sign(pem_key, data):
    350     """Signs |data| with key from |pem_key|, returns signature.
    351 
    352     Using the PEM-formatted private key in |pem_key|, generates an
    353     RSA-with-SHA1 signature over |data| and returns the signature in
    354     a string.
    355 
    356     @param pem_key: PEM-formatted private key, as a string.
    357     @param data: data to be signed.
    358     @return: signature as a string.
    359     """
    360     sig = scoped_tempfile()
    361     err = scoped_tempfile()
    362     data_file = scoped_tempfile()
    363     data_file.fo.write(data)
    364     data_file.fo.seek(0)
    365 
    366     pem_key_file = scoped_tempfile(scoped_tempfile.tempdir.name + '/pkey.pem')
    367     utils.open_write_close(pem_key_file.name, pem_key)
    368 
    369     cmd = '%s -sign %s' % (OPENSSLCRYPTO, pem_key_file.name)
    370     try:
    371         utils.run(cmd,
    372                   stdin=data_file.fo,
    373                   stdout_tee=sig.fo,
    374                   stderr_tee=err.fo)
    375     except:
    376         err.fo.seek(0)
    377         logging.error(err.fo.read())
    378         raise
    379 
    380     sig.fo.seek(0)
    381     sig_data = sig.fo.read()
    382     if not sig_data:
    383         raise error.OwnershipError('Empty signature!')
    384     return sig_data
    385 
    386 
    387 def get_user_policy_key_filename(username):
    388     """Returns the path to the user policy key for the given username.
    389 
    390     @param username: the user whose policy key we want the path to.
    391     @return: absolute path to user's policy key file.
    392     """
    393     return os.path.join(constants.USER_POLICY_DIR,
    394                         cryptohome.get_user_hash(username),
    395                         constants.USER_POLICY_KEY_FILENAME)
    396