Home | History | Annotate | Download | only in cros
      1 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import dbus, gobject, os, sys
      6 
      7 import common
      8 from autotest_lib.client.common_lib import error
      9 from autotest_lib.client.common_lib.cros import session_manager
     10 from autotest_lib.client.cros import ownership
     11 
     12 """Utility class for tests that generate, push and fetch policies.
     13 
     14 As the python bindings for the protobufs used in policies are built as a part
     15 of tests that use them, callers must pass in their location at call time."""
     16 
     17 
     18 def install_protobufs(autodir, job):
     19     """Installs policy protobuf dependencies and set import path.
     20 
     21     After calling this, you can simply import any policy pb2.py file directly,
     22     e.g. import chrome_device_policy_pb2.
     23 
     24     @param autodir: Autotest directory (usually the caller's self.autodir).
     25     @param job: Job instance (usually the caller's self.job).
     26     """
     27     # TODO(crbug.com/807950): Change the installation process so that policy
     28     #                         proto imports can be moved to the top.
     29     dep = 'policy_protos'
     30     dep_dir = os.path.join(autodir, 'deps', dep)
     31     job.install_pkg(dep, 'dep', dep_dir)
     32     sys.path.append(dep_dir)
     33 
     34 
     35 def compare_policy_response(policy_response, owner=None, guests=None,
     36                             new_users=None, roaming=None, whitelist=None):
     37     """Check the contents of |policy_response| against given args.
     38 
     39     Deserializes |policy_response| into a PolicyFetchResponse protobuf,
     40     with an embedded (serialized) PolicyData protobuf that embeds a
     41     (serialized) ChromeDeviceSettingsProto, and checks to see if this
     42     protobuf turducken contains the information passed in.
     43 
     44     @param policy_response: string serialization of a PolicyData protobuf.
     45     @param owner: string representing the owner's name/account.
     46     @param guests: boolean indicating whether guests should be allowed.
     47     @param new_users: boolean indicating if user pods are on login screen.
     48     @param roaming: boolean indicating whether data roaming is enabled.
     49     @param whitelist: list of accounts that are allowed to log in.
     50 
     51     @return True if |policy_response| has all the provided data, else False.
     52     """
     53     import chrome_device_policy_pb2
     54     import device_management_backend_pb2
     55 
     56     response_proto = device_management_backend_pb2.PolicyFetchResponse()
     57     response_proto.ParseFromString(policy_response)
     58     ownership.assert_has_policy_data(response_proto)
     59 
     60     data_proto = device_management_backend_pb2.PolicyData()
     61     data_proto.ParseFromString(response_proto.policy_data)
     62     ownership.assert_has_device_settings(data_proto)
     63     if owner: ownership.assert_username(data_proto, owner)
     64 
     65     settings = chrome_device_policy_pb2.ChromeDeviceSettingsProto()
     66     settings.ParseFromString(data_proto.policy_value)
     67     if guests: ownership.assert_guest_setting(settings, guests)
     68     if new_users: ownership.assert_show_users(settings, new_users)
     69     if roaming: ownership.assert_roaming(settings, roaming)
     70     if whitelist:
     71         ownership.assert_new_users(settings, False)
     72         ownership.assert_users_on_whitelist(settings, whitelist)
     73 
     74 
     75 def build_policy_data(owner=None, guests=None, new_users=None, roaming=None,
     76                       whitelist=None):
     77     """Generate and serialize a populated device policy protobuffer.
     78 
     79     Creates a PolicyData protobuf, with an embedded
     80     ChromeDeviceSettingsProto, containing the information passed in.
     81 
     82     @param owner: string representing the owner's name/account.
     83     @param guests: boolean indicating whether guests should be allowed.
     84     @param new_users: boolean indicating if user pods are on login screen.
     85     @param roaming: boolean indicating whether data roaming is enabled.
     86     @param whitelist: list of accounts that are allowed to log in.
     87 
     88     @return serialization of the PolicyData proto that we build.
     89     """
     90     import chrome_device_policy_pb2
     91     import device_management_backend_pb2
     92 
     93     data_proto = device_management_backend_pb2.PolicyData()
     94     data_proto.policy_type = ownership.POLICY_TYPE
     95     if owner: data_proto.username = owner
     96 
     97     settings = chrome_device_policy_pb2.ChromeDeviceSettingsProto()
     98     if guests:
     99         settings.guest_mode_enabled.guest_mode_enabled = guests
    100     if new_users:
    101         settings.show_user_names.show_user_names = new_users
    102     if roaming:
    103         settings.data_roaming_enabled.data_roaming_enabled = roaming
    104     if whitelist:
    105         settings.allow_new_users.allow_new_users = False
    106         for user in whitelist:
    107             settings.user_whitelist.user_whitelist.append(user)
    108 
    109     data_proto.policy_value = settings.SerializeToString()
    110     return data_proto.SerializeToString()
    111 
    112 
    113 def generate_policy(key, pubkey, policy, old_key=None):
    114     """Generate and serialize a populated, signed device policy protobuffer.
    115 
    116     Creates a protobuf containing the device policy |policy|, signed with
    117     |key|.  Also includes the public key |pubkey|, signed with |old_key|
    118     if provided.  If not, |pubkey| is signed with |key|.  The protobuf
    119     is serialized to a string and returned.
    120 
    121     @param key: new policy signing key.
    122     @param pubkey: new public key to be signed and embedded in generated
    123                    PolicyFetchResponse.
    124     @param policy: policy data to be embedded in generated PolicyFetchResponse.
    125     @param old_key: if provided, this implies the generated PolicyFetchRespone
    126                     is intended to represent a key rotation.  pubkey will be
    127                     signed with this key before embedding.
    128 
    129     @return serialization of the PolicyFetchResponse proto that we build.
    130     """
    131     import device_management_backend_pb2
    132 
    133     if old_key == None:
    134         old_key = key
    135     policy_proto = device_management_backend_pb2.PolicyFetchResponse()
    136     policy_proto.policy_data = policy
    137     policy_proto.policy_data_signature = ownership.sign(key, policy)
    138     policy_proto.new_public_key = pubkey
    139     policy_proto.new_public_key_signature = ownership.sign(old_key, pubkey)
    140     return policy_proto.SerializeToString()
    141 
    142 
    143 def push_policy_and_verify(policy_string, sm):
    144     """Push a device policy to the session manager over DBus.
    145 
    146     The serialized device policy |policy_string| is sent to the session
    147     manager with the StorePolicyEx DBus call.  Success of the store is
    148     validated by fetching the policy again and comparing.
    149 
    150     @param policy_string: serialized policy to push to the session manager.
    151     @param sm: a connected SessionManagerInterface.
    152 
    153     @raises error.TestFail if policy push failed.
    154     """
    155     listener = session_manager.OwnershipSignalListener(gobject.MainLoop())
    156     listener.listen_for_new_policy()
    157     descriptor = session_manager.make_device_policy_descriptor()
    158     sm.StorePolicyEx(descriptor,
    159                      dbus.ByteArray(policy_string), byte_arrays=True)
    160     listener.wait_for_signals(desc='Policy push.')
    161 
    162     retrieved_policy = sm.RetrievePolicyEx(descriptor, byte_arrays=True)
    163     if retrieved_policy != policy_string:
    164         raise error.TestFail('Policy should not be %s' % retrieved_policy)
    165 
    166 
    167 def get_policy(sm):
    168     """Get a device policy from the session manager over DBus.
    169 
    170     Provided mainly for symmetry with push_policy_and_verify().
    171 
    172     @param sm: a connected SessionManagerInterface.
    173 
    174     @return Serialized PolicyFetchResponse.
    175     """
    176     return sm.RetrievePolicyEx(session_manager.make_device_policy_descriptor(),
    177                                byte_arrays=True)
    178