Home | History | Annotate | Download | only in cros
      1 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging, os, shutil, tempfile
      6 
      7 import common, constants, cryptohome
      8 from autotest_lib.client.bin import utils
      9 from autotest_lib.client.common_lib import autotemp, error
     10 from autotest_lib.client.cros import cros_ui
     11 
     12 
     13 PK12UTIL = 'pk12util'
     14 CERTUTIL = 'certutil'
     15 OPENSSLP12 = 'openssl pkcs12'
     16 OPENSSLX509 = 'openssl x509'
     17 OPENSSLRSA = 'openssl rsa'
     18 OPENSSLREQ = 'openssl req'
     19 OPENSSLCRYPTO = 'openssl sha1'
     20 
     21 TESTUSER = 'ownership_test (at] chromium.org'
     22 TESTPASS = 'testme'
     23 
     24 
     25 class OwnershipError(error.TestError):
     26     """Generic error for ownership-related failures."""
     27     pass
     28 
     29 
     30 class scoped_tempfile(object):
     31     """A wrapper that provides scoped semantics for temporary files.
     32 
     33     Providing a file path causes the scoped_tempfile to take ownership of the
     34     file at the provided path.  The file at the path will be deleted when this
     35     object goes out of scope.  If no path is provided, then a temporary file
     36     object will be created for the lifetime of the scoped_tempfile
     37 
     38     autotemp.tempfile objects don't seem to play nicely with being
     39     used in system commands, so they can't be used for my purposes.
     40     """
     41 
     42     tempdir = autotemp.tempdir(unique_id='ownership')
     43 
     44     def __init__(self, name=None):
     45         self.name = name
     46         if not self.name:
     47             self.fo = tempfile.TemporaryFile()
     48 
     49 
     50     def __del__(self):
     51         if self.name:
     52             if os.path.exists(self.name):
     53                 os.unlink(self.name)
     54         else:
     55             self.fo.close()  # Will destroy the underlying tempfile
     56 
     57 
     58 def system_output_on_fail(cmd):
     59     """Run a |cmd|, capturing output and logging it only on error.
     60 
     61     @param cmd: the command to run.
     62     """
     63     output = None
     64     try:
     65         output = utils.system_output(cmd)
     66     except:
     67         logging.error(output)
     68         raise
     69 
     70 
     71 def __unlink(filename):
     72     """unlink a file, but log OSError and IOError instead of raising.
     73 
     74     This allows unlinking files that don't exist safely.
     75 
     76     @param filename: the file to attempt to unlink.
     77     """
     78     try:
     79         os.unlink(filename)
     80     except (IOError, OSError) as error:
     81         logging.info(error)
     82 
     83 
     84 def restart_ui_to_clear_ownership_files():
     85     """Remove on-disk state related to device ownership.
     86 
     87     The UI must be stopped while we do this, or the session_manager will
     88     write the policy and key files out again.
     89     """
     90     cros_ui.stop(allow_fail=not cros_ui.is_up())
     91     clear_ownership_files_no_restart()
     92     cros_ui.start()
     93 
     94 
     95 def clear_ownership_files_no_restart():
     96     """Remove on-disk state related to device ownership.
     97 
     98     The UI must be stopped while we do this, or the session_manager will
     99     write the policy and key files out again.
    100     """
    101     if cros_ui.is_up():
    102         raise error.TestError("Tried to clear ownership with UI running.")
    103     __unlink(constants.OWNER_KEY_FILE)
    104     __unlink(constants.SIGNED_POLICY_FILE)
    105     __unlink(os.path.join(constants.USER_DATA_DIR, 'Local State'))
    106 
    107 
    108 def fake_ownership():
    109     """Fake ownership by generating the necessary magic files."""
    110     # Determine the module directory.
    111     dirname = os.path.dirname(__file__)
    112     mock_certfile = os.path.join(dirname, constants.MOCK_OWNER_CERT)
    113     mock_signedpolicyfile = os.path.join(dirname,
    114                                          constants.MOCK_OWNER_POLICY)
    115     utils.open_write_close(constants.OWNER_KEY_FILE,
    116                            cert_extract_pubkey_der(mock_certfile))
    117     shutil.copy(mock_signedpolicyfile,
    118                 constants.SIGNED_POLICY_FILE)
    119 
    120 
# The policy_type value that assert_has_device_settings() requires a
# PolicyData protobuf to carry.
POLICY_TYPE = 'google/chromeos/device'
    122 
    123 
    124 def assert_has_policy_data(response_proto):
    125     """Assert that given protobuf has a policy_data field.
    126 
    127     @param response_proto: a PolicyFetchResponse protobuf.
    128     @raises OwnershipError on failure.
    129     """
    130     if not response_proto.HasField("policy_data"):
    131         raise OwnershipError('Malformatted response.')
    132 
    133 
    134 def assert_has_device_settings(data_proto):
    135     """Assert that given protobuf is a policy with device settings in it.
    136 
    137     @param data_proto: a PolicyData protobuf.
    138     @raises OwnershipError if this isn't CrOS policy, or has no settings inside.
    139     """
    140     if (not data_proto.HasField("policy_type") or
    141         data_proto.policy_type != POLICY_TYPE or
    142         not data_proto.HasField("policy_value")):
    143         raise OwnershipError('Malformatted response.')
    144 
    145 
    146 def assert_username(data_proto, username):
    147     """Assert that given protobuf is a policy associated with the given user.
    148 
    149     @param data_proto: a PolicyData protobuf.
    150     @param username: the username to check for
    151     @raises OwnershipError if data_proto isn't associated with username
    152     """
    153     if data_proto.username != username:
    154         raise OwnershipError('Incorrect username.')
    155 
    156 
    157 def assert_guest_setting(settings, guests):
    158     """Assert that given protobuf has given guest-related settings.
    159 
    160     @param settings: a ChromeDeviceSettingsProto protobuf.
    161     @param guests: boolean indicating whether guests are allowed to sign in.
    162     @raises OwnershipError if settings doesn't enforce the provided setting.
    163     """
    164     if not settings.HasField("guest_mode_enabled"):
    165         raise OwnershipError('No guest mode setting protobuf.')
    166     if not settings.guest_mode_enabled.HasField("guest_mode_enabled"):
    167         raise OwnershipError('No guest mode setting.')
    168     if settings.guest_mode_enabled.guest_mode_enabled != guests:
    169         raise OwnershipError('Incorrect guest mode setting.')
    170 
    171 
    172 def assert_show_users(settings, show_users):
    173     """Assert that given protobuf has given user-avatar-showing settings.
    174 
    175     @param settings: a ChromeDeviceSettingsProto protobuf.
    176     @param show_users: boolean indicating whether avatars are shown on sign in.
    177     @raises OwnershipError if settings doesn't enforce the provided setting.
    178     """
    179     if not settings.HasField("show_user_names"):
    180         raise OwnershipError('No show users setting protobuf.')
    181     if not settings.show_user_names.HasField("show_user_names"):
    182         raise OwnershipError('No show users setting.')
    183     if settings.show_user_names.show_user_names != show_users:
    184         raise OwnershipError('Incorrect show users setting.')
    185 
    186 
    187 def assert_roaming(settings, roaming):
    188     """Assert that given protobuf has given roaming settings.
    189 
    190     @param settings: a ChromeDeviceSettingsProto protobuf.
    191     @param roaming: boolean indicating whether roaming is allowed.
    192     @raises OwnershipError if settings doesn't enforce the provided setting.
    193     """
    194     if not settings.HasField("data_roaming_enabled"):
    195         raise OwnershipError('No roaming setting protobuf.')
    196     if not settings.data_roaming_enabled.HasField("data_roaming_enabled"):
    197         raise OwnershipError('No roaming setting.')
    198     if settings.data_roaming_enabled.data_roaming_enabled != roaming:
    199         raise OwnershipError('Incorrect roaming setting.')
    200 
    201 
    202 def assert_new_users(settings, new_users):
    203     """Assert that given protobuf has given new user settings.
    204 
    205     @param settings: a ChromeDeviceSettingsProto protobuf.
    206     @param new_users: boolean indicating whether adding users is allowed.
    207     @raises OwnershipError if settings doesn't enforce the provided setting.
    208     """
    209     if not settings.HasField("allow_new_users"):
    210         raise OwnershipError('No allow new users setting protobuf.')
    211     if not settings.allow_new_users.HasField("allow_new_users"):
    212         raise OwnershipError('No allow new users setting.')
    213     if settings.allow_new_users.allow_new_users != new_users:
    214         raise OwnershipError('Incorrect allow new users setting.')
    215 
    216 
    217 def assert_users_on_whitelist(settings, users):
    218     """Assert that given protobuf has given users on the whitelist.
    219 
    220     @param settings: a ChromeDeviceSettingsProto protobuf.
    221     @param users: iterable containing usernames that should be on whitelist.
    222     @raises OwnershipError if settings doesn't enforce the provided setting.
    223     """
    224     if settings.HasField("user_whitelist"):
    225         for user in users:
    226             if user not in settings.user_whitelist.user_whitelist:
    227                 raise OwnershipError(user + ' not whitelisted.')
    228     else:
    229         raise OwnershipError('No user whitelist.')
    230 
    231 
    232 def assert_proxy_settings(settings, proxies):
    233     """Assert that given protobuf has given proxy settings.
    234 
    235     @param settings: a ChromeDeviceSettingsProto protobuf.
    236     @param proxies: dict { 'proxy_mode': <mode string> }
    237     @raises OwnershipError if settings doesn't enforce the provided setting.
    238     """
    239     if not settings.HasField("device_proxy_settings"):
    240         raise OwnershipError('No proxy settings protobuf.')
    241     if not settings.device_proxy_settings.HasField("proxy_mode"):
    242         raise OwnershipError('No proxy_mode setting.')
    243     if settings.device_proxy_settings.proxy_mode != proxies['proxy_mode']:
    244         raise OwnershipError('Incorrect proxies: %s' % proxies)
    245 
    246 
    247 def __user_nssdb(user):
    248     """Returns the path to the NSSDB for the provided user.
    249 
    250     @param user: the user whose NSSDB the caller wants.
    251     @return: absolute path to user's NSSDB.
    252     """
    253     return os.path.join(cryptohome.user_path(user), '.pki', 'nssdb')
    254 
    255 
    256 def use_known_ownerkeys(user):
    257     """Sets the system up to use a well-known keypair for owner operations.
    258 
    259     Assuming the appropriate cryptohome is already mounted, configures the
    260     device to accept policies signed with the checked-in 'mock' owner key.
    261 
    262     @param user: the user whose NSSDB should be populated with key material.
    263     """
    264     dirname = os.path.dirname(__file__)
    265     mock_keyfile = os.path.join(dirname, constants.MOCK_OWNER_KEY)
    266     mock_certfile = os.path.join(dirname, constants.MOCK_OWNER_CERT)
    267     push_to_nss(mock_keyfile, mock_certfile, __user_nssdb(user))
    268     utils.open_write_close(constants.OWNER_KEY_FILE,
    269                            cert_extract_pubkey_der(mock_certfile))
    270 
    271 
    272 def known_privkey():
    273     """Returns the mock owner private key in PEM format.
    274 
    275     @return: mock owner private key in PEM format.
    276     """
    277     dirname = os.path.dirname(__file__)
    278     return utils.read_file(os.path.join(dirname, constants.MOCK_OWNER_KEY))
    279 
    280 
    281 def known_pubkey():
    282     """Returns the mock owner public key in DER format.
    283 
    284     @return: mock owner public key in DER format.
    285     """
    286     dirname = os.path.dirname(__file__)
    287     return cert_extract_pubkey_der(os.path.join(dirname,
    288                                                 constants.MOCK_OWNER_CERT))
    289 
    290 
    291 def pairgen():
    292     """Generate a self-signed cert and associated private key.
    293 
    294     Generates a self-signed X509 certificate and the associated private key.
    295     The key is 2048 bits.  The generated material is stored in PEM format
    296     and the paths to the two files are returned.
    297 
    298     The caller is responsible for cleaning up these files.
    299 
    300     @return: (/path/to/private_key, /path/to/self-signed_cert)
    301     """
    302     keyfile = scoped_tempfile.tempdir.name + '/private.key'
    303     certfile = scoped_tempfile.tempdir.name + '/cert.pem'
    304     cmd = '%s -x509 -subj %s -newkey rsa:2048 -nodes -keyout %s -out %s' % (
    305         OPENSSLREQ, '/CN=me', keyfile, certfile)
    306     system_output_on_fail(cmd)
    307     return (keyfile, certfile)
    308 
    309 
    310 def pairgen_as_data():
    311     """Generates keypair, returns keys as data.
    312 
    313     Generates a fresh owner keypair and then passes back the
    314     PEM-encoded private key and the DER-encoded public key.
    315 
    316     @return: (PEM-encoded private key, DER-encoded public key)
    317     """
    318     (keypath, certpath) = pairgen()
    319     keyfile = scoped_tempfile(keypath)
    320     certfile = scoped_tempfile(certpath)
    321     return (utils.read_file(keyfile.name),
    322             cert_extract_pubkey_der(certfile.name))
    323 
    324 
    325 def push_to_nss(keyfile, certfile, nssdb):
    326     """Takes a pre-generated key pair and pushes them to an NSS DB.
    327 
    328     Given paths to a private key and cert in PEM format, stores the pair
    329     in the provided nssdb.
    330 
    331     @param keyfile: path to PEM-formatted private key file.
    332     @param certfile: path to PEM-formatted cert file for associated public key.
    333     @param nssdb: path to NSSDB to be populated with the provided keys.
    334     """
    335     for_push = scoped_tempfile(scoped_tempfile.tempdir.name + '/for_push.p12')
    336     cmd = '%s -export -in %s -inkey %s -out %s ' % (
    337         OPENSSLP12, certfile, keyfile, for_push.name)
    338     cmd += '-passin pass: -passout pass:'
    339     system_output_on_fail(cmd)
    340     cmd = '%s -d "sql:%s" -i %s -W ""' % (PK12UTIL,
    341                                           nssdb,
    342                                           for_push.name)
    343     system_output_on_fail(cmd)
    344 
    345 
    346 def cert_extract_pubkey_der(pem):
    347     """Given a PEM-formatted cert, extracts the public key in DER format.
    348 
    349     Pass in an X509 certificate in PEM format, and you'll get back the
    350     DER-formatted public key as a string.
    351 
    352     @param pem: path to a PEM-formatted cert file.
    353     @return: DER-encoded public key from cert, as a string.
    354     """
    355     outfile = scoped_tempfile(scoped_tempfile.tempdir.name + '/pubkey.der')
    356     cmd = '%s -in %s -pubkey -noout ' % (OPENSSLX509, pem)
    357     cmd += '| %s -outform DER -pubin -out %s' % (OPENSSLRSA,
    358                                                  outfile.name)
    359     system_output_on_fail(cmd)
    360     der = utils.read_file(outfile.name)
    361     return der
    362 
    363 
    364 def sign(pem_key, data):
    365     """Signs |data| with key from |pem_key|, returns signature.
    366 
    367     Using the PEM-formatted private key in |pem_key|, generates an
    368     RSA-with-SHA1 signature over |data| and returns the signature in
    369     a string.
    370 
    371     @param pem_key: PEM-formatted private key, as a string.
    372     @param data: data to be signed.
    373     @return: signature as a string.
    374     """
    375     sig = scoped_tempfile()
    376     err = scoped_tempfile()
    377     data_file = scoped_tempfile()
    378     data_file.fo.write(data)
    379     data_file.fo.seek(0)
    380 
    381     pem_key_file = scoped_tempfile(scoped_tempfile.tempdir.name + '/pkey.pem')
    382     utils.open_write_close(pem_key_file.name, pem_key)
    383 
    384     cmd = '%s -sign %s' % (OPENSSLCRYPTO, pem_key_file.name)
    385     try:
    386         utils.run(cmd,
    387                   stdin=data_file.fo,
    388                   stdout_tee=sig.fo,
    389                   stderr_tee=err.fo)
    390     except:
    391         err.fo.seek(0)
    392         logging.error(err.fo.read())
    393         raise
    394 
    395     sig.fo.seek(0)
    396     sig_data = sig.fo.read()
    397     if not sig_data:
    398         raise error.OwnershipError('Empty signature!')
    399     return sig_data
    400 
    401 
    402 def get_user_policy_key_filename(username):
    403     """Returns the path to the user policy key for the given username.
    404 
    405     @param username: the user whose policy key we want the path to.
    406     @return: absolute path to user's policy key file.
    407     """
    408     return os.path.join(constants.USER_POLICY_DIR,
    409                         cryptohome.get_user_hash(username),
    410                         constants.USER_POLICY_KEY_FILENAME)
    411