1 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 2 # Use of this source code is governed by a BSD-style license that can be 3 # found in the LICENSE file. 4 5 import dbus, gobject, sys 6 7 import common 8 from autotest_lib.client.common_lib import error 9 from autotest_lib.client.common_lib.cros import session_manager 10 from autotest_lib.client.cros import ownership 11 12 13 """Utility class for tests that generate, push and fetch policies. 14 15 As the python bindings for the protobufs used in policies are built as a part 16 of tests that use them, callers must pass in their location at call time.""" 17 18 19 def compare_policy_response(proto_binding_location, policy_response, 20 owner=None, guests=None, new_users=None, 21 roaming=None, whitelist=None): 22 """Check the contents of |policy_response| against given args. 23 24 Deserializes |policy_response| into a PolicyFetchResponse protobuf, 25 with an embedded (serialized) PolicyData protobuf that embeds a 26 (serialized) ChromeDeviceSettingsProto, and checks to see if this 27 protobuf turducken contains the information passed in. 28 29 @param proto_binding_location: the location of generated python bindings 30 for policy protobufs. 31 @param policy_response: string serialization of a PolicyData protobuf. 32 @param owner: string representing the owner's name/account. 33 @param guests: boolean indicating whether guests should be allowed. 34 @param new_users: boolean indicating if user pods are on login screen. 35 @param roaming: boolean indicating whether data roaming is enabled. 36 @param whitelist: list of accounts that are allowed to log in. 37 38 @return True if |policy_response| has all the provided data, else False. 39 """ 40 # Pull in protobuf bindings. 41 sys.path.append(proto_binding_location) 42 from device_management_backend_pb2 import PolicyFetchResponse 43 from device_management_backend_pb2 import PolicyData 44 from chrome_device_policy_pb2 import ChromeDeviceSettingsProto 45 from chrome_device_policy_pb2 import AllowNewUsersProto 46 from chrome_device_policy_pb2 import GuestModeEnabledProto 47 from chrome_device_policy_pb2 import ShowUserNamesOnSigninProto 48 from chrome_device_policy_pb2 import DataRoamingEnabledProto 49 50 response_proto = PolicyFetchResponse() 51 response_proto.ParseFromString(policy_response) 52 ownership.assert_has_policy_data(response_proto) 53 54 data_proto = PolicyData() 55 data_proto.ParseFromString(response_proto.policy_data) 56 ownership.assert_has_device_settings(data_proto) 57 if owner: ownership.assert_username(data_proto, owner) 58 59 settings = ChromeDeviceSettingsProto() 60 settings.ParseFromString(data_proto.policy_value) 61 if guests: ownership.assert_guest_setting(settings, guests) 62 if new_users: ownership.assert_show_users(settings, new_users) 63 if roaming: ownership.assert_roaming(settings, roaming) 64 if whitelist: 65 ownership.assert_new_users(settings, False) 66 ownership.assert_users_on_whitelist(settings, whitelist) 67 68 69 def build_policy_data(proto_binding_location, owner=None, guests=None, 70 new_users=None, roaming=None, whitelist=None): 71 """Generate and serialize a populated device policy protobuffer. 72 73 Creates a PolicyData protobuf, with an embedded 74 ChromeDeviceSettingsProto, containing the information passed in. 75 76 @param proto_binding_location: the location of generated python bindings 77 for policy protobufs. 78 @param owner: string representing the owner's name/account. 79 @param guests: boolean indicating whether guests should be allowed. 80 @param new_users: boolean indicating if user pods are on login screen. 81 @param roaming: boolean indicating whether data roaming is enabled. 82 @param whitelist: list of accounts that are allowed to log in. 83 84 @return serialization of the PolicyData proto that we build. 85 """ 86 # Pull in protobuf bindings. 87 sys.path.append(proto_binding_location) 88 from device_management_backend_pb2 import PolicyData 89 from chrome_device_policy_pb2 import ChromeDeviceSettingsProto 90 from chrome_device_policy_pb2 import AllowNewUsersProto 91 from chrome_device_policy_pb2 import GuestModeEnabledProto 92 from chrome_device_policy_pb2 import ShowUserNamesOnSigninProto 93 from chrome_device_policy_pb2 import DataRoamingEnabledProto 94 95 data_proto = PolicyData() 96 data_proto.policy_type = ownership.POLICY_TYPE 97 if owner: data_proto.username = owner 98 99 settings = ChromeDeviceSettingsProto() 100 if guests: 101 settings.guest_mode_enabled.guest_mode_enabled = guests 102 if new_users: 103 settings.show_user_names.show_user_names = new_users 104 if roaming: 105 settings.data_roaming_enabled.data_roaming_enabled = roaming 106 if whitelist: 107 settings.allow_new_users.allow_new_users = False 108 for user in whitelist: 109 settings.user_whitelist.user_whitelist.append(user) 110 111 data_proto.policy_value = settings.SerializeToString() 112 return data_proto.SerializeToString() 113 114 115 def generate_policy(proto_binding_location, key, pubkey, policy, old_key=None): 116 """Generate and serialize a populated, signed device policy protobuffer. 117 118 Creates a protobuf containing the device policy |policy|, signed with 119 |key|. Also includes the public key |pubkey|, signed with |old_key| 120 if provided. If not, |pubkey| is signed with |key|. The protobuf 121 is serialized to a string and returned. 122 123 @param proto_binding_location: the location of generated python bindings 124 for policy protobufs. 125 @param key: new policy signing key. 126 @param pubkey: new public key to be signed and embedded in generated 127 PolicyFetchResponse. 128 @param policy: policy data to be embedded in generated PolicyFetchResponse. 129 @param old_key: if provided, this implies the generated PolicyFetchRespone 130 is intended to represent a key rotation. pubkey will be 131 signed with this key before embedding. 132 133 @return serialization of the PolicyFetchResponse proto that we build. 134 """ 135 # Pull in protobuf bindings. 136 sys.path.append(proto_binding_location) 137 from device_management_backend_pb2 import PolicyFetchResponse 138 139 if old_key == None: 140 old_key = key 141 policy_proto = PolicyFetchResponse() 142 policy_proto.policy_data = policy 143 policy_proto.policy_data_signature = ownership.sign(key, policy) 144 policy_proto.new_public_key = pubkey 145 policy_proto.new_public_key_signature = ownership.sign(old_key, pubkey) 146 return policy_proto.SerializeToString() 147 148 149 def push_policy_and_verify(policy_string, sm): 150 """Push a device policy to the session manager over DBus. 151 152 The serialized device policy |policy_string| is sent to the session 153 manager with the StorePolicy DBus call. Success of the store is 154 validated by fetching the policy again and comparing. 155 156 @param policy_string: serialized policy to push to the session manager. 157 @param sm: a connected SessionManagerInterface. 158 159 @raises error.TestFail if policy push failed. 160 """ 161 listener = session_manager.OwnershipSignalListener(gobject.MainLoop()) 162 listener.listen_for_new_policy() 163 sm.StorePolicy(dbus.ByteArray(policy_string), byte_arrays=True) 164 listener.wait_for_signals(desc='Policy push.') 165 166 retrieved_policy = sm.RetrievePolicy(byte_arrays=True) 167 if retrieved_policy != policy_string: 168 raise error.TestFail('Policy should not be %s' % retrieved_policy) 169 170 171 def get_policy(sm): 172 """Get a device policy from the session manager over DBus. 173 174 Provided mainly for symmetry with push_policy_and_verify(). 175 176 @param sm: a connected SessionManagerInterface. 177 178 @return Serialized PolicyFetchResponse. 179 """ 180 return sm.RetrievePolicy(byte_arrays=True) 181