Home | History | Annotate | Download | only in cros
      1 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import dbus, gobject, sys
      6 
      7 import common
      8 from autotest_lib.client.common_lib import error
      9 from autotest_lib.client.common_lib.cros import session_manager
     10 from autotest_lib.client.cros import ownership
     11 
     12 
     13 """Utility class for tests that generate, push and fetch policies.
     14 
     15 As the python bindings for the protobufs used in policies are built as a part
     16 of tests that use them, callers must pass in their location at call time."""
     17 
     18 
     19 def compare_policy_response(proto_binding_location, policy_response,
     20                             owner=None, guests=None, new_users=None,
     21                             roaming=None, whitelist=None):
     22     """Check the contents of |policy_response| against given args.
     23 
     24     Deserializes |policy_response| into a PolicyFetchResponse protobuf,
     25     with an embedded (serialized) PolicyData protobuf that embeds a
     26     (serialized) ChromeDeviceSettingsProto, and checks to see if this
     27     protobuf turducken contains the information passed in.
     28 
     29     @param proto_binding_location: the location of generated python bindings
     30                                    for policy protobufs.
     31     @param policy_response: string serialization of a PolicyData protobuf.
     32     @param owner: string representing the owner's name/account.
     33     @param guests: boolean indicating whether guests should be allowed.
     34     @param new_users: boolean indicating if user pods are on login screen.
     35     @param roaming: boolean indicating whether data roaming is enabled.
     36     @param whitelist: list of accounts that are allowed to log in.
     37 
     38     @return True if |policy_response| has all the provided data, else False.
     39     """
     40     # Pull in protobuf bindings.
     41     sys.path.append(proto_binding_location)
     42     from device_management_backend_pb2 import PolicyFetchResponse
     43     from device_management_backend_pb2 import PolicyData
     44     from chrome_device_policy_pb2 import ChromeDeviceSettingsProto
     45     from chrome_device_policy_pb2 import AllowNewUsersProto
     46     from chrome_device_policy_pb2 import GuestModeEnabledProto
     47     from chrome_device_policy_pb2 import ShowUserNamesOnSigninProto
     48     from chrome_device_policy_pb2 import DataRoamingEnabledProto
     49 
     50     response_proto = PolicyFetchResponse()
     51     response_proto.ParseFromString(policy_response)
     52     ownership.assert_has_policy_data(response_proto)
     53 
     54     data_proto = PolicyData()
     55     data_proto.ParseFromString(response_proto.policy_data)
     56     ownership.assert_has_device_settings(data_proto)
     57     if owner: ownership.assert_username(data_proto, owner)
     58 
     59     settings = ChromeDeviceSettingsProto()
     60     settings.ParseFromString(data_proto.policy_value)
     61     if guests: ownership.assert_guest_setting(settings, guests)
     62     if new_users: ownership.assert_show_users(settings, new_users)
     63     if roaming: ownership.assert_roaming(settings, roaming)
     64     if whitelist:
     65         ownership.assert_new_users(settings, False)
     66         ownership.assert_users_on_whitelist(settings, whitelist)
     67 
     68 
     69 def build_policy_data(proto_binding_location, owner=None, guests=None,
     70                       new_users=None, roaming=None, whitelist=None):
     71     """Generate and serialize a populated device policy protobuffer.
     72 
     73     Creates a PolicyData protobuf, with an embedded
     74     ChromeDeviceSettingsProto, containing the information passed in.
     75 
     76     @param proto_binding_location: the location of generated python bindings
     77                                    for policy protobufs.
     78     @param owner: string representing the owner's name/account.
     79     @param guests: boolean indicating whether guests should be allowed.
     80     @param new_users: boolean indicating if user pods are on login screen.
     81     @param roaming: boolean indicating whether data roaming is enabled.
     82     @param whitelist: list of accounts that are allowed to log in.
     83 
     84     @return serialization of the PolicyData proto that we build.
     85     """
     86     # Pull in protobuf bindings.
     87     sys.path.append(proto_binding_location)
     88     from device_management_backend_pb2 import PolicyData
     89     from chrome_device_policy_pb2 import ChromeDeviceSettingsProto
     90     from chrome_device_policy_pb2 import AllowNewUsersProto
     91     from chrome_device_policy_pb2 import GuestModeEnabledProto
     92     from chrome_device_policy_pb2 import ShowUserNamesOnSigninProto
     93     from chrome_device_policy_pb2 import DataRoamingEnabledProto
     94 
     95     data_proto = PolicyData()
     96     data_proto.policy_type = ownership.POLICY_TYPE
     97     if owner: data_proto.username = owner
     98 
     99     settings = ChromeDeviceSettingsProto()
    100     if guests:
    101         settings.guest_mode_enabled.guest_mode_enabled = guests
    102     if new_users:
    103         settings.show_user_names.show_user_names = new_users
    104     if roaming:
    105         settings.data_roaming_enabled.data_roaming_enabled = roaming
    106     if whitelist:
    107         settings.allow_new_users.allow_new_users = False
    108         for user in whitelist:
    109             settings.user_whitelist.user_whitelist.append(user)
    110 
    111     data_proto.policy_value = settings.SerializeToString()
    112     return data_proto.SerializeToString()
    113 
    114 
    115 def generate_policy(proto_binding_location, key, pubkey, policy, old_key=None):
    116     """Generate and serialize a populated, signed device policy protobuffer.
    117 
    118     Creates a protobuf containing the device policy |policy|, signed with
    119     |key|.  Also includes the public key |pubkey|, signed with |old_key|
    120     if provided.  If not, |pubkey| is signed with |key|.  The protobuf
    121     is serialized to a string and returned.
    122 
    123     @param proto_binding_location: the location of generated python bindings
    124                                    for policy protobufs.
    125     @param key: new policy signing key.
    126     @param pubkey: new public key to be signed and embedded in generated
    127                    PolicyFetchResponse.
    128     @param policy: policy data to be embedded in generated PolicyFetchResponse.
    129     @param old_key: if provided, this implies the generated PolicyFetchRespone
    130                     is intended to represent a key rotation.  pubkey will be
    131                     signed with this key before embedding.
    132 
    133     @return serialization of the PolicyFetchResponse proto that we build.
    134     """
    135     # Pull in protobuf bindings.
    136     sys.path.append(proto_binding_location)
    137     from device_management_backend_pb2 import PolicyFetchResponse
    138 
    139     if old_key == None:
    140         old_key = key
    141     policy_proto = PolicyFetchResponse()
    142     policy_proto.policy_data = policy
    143     policy_proto.policy_data_signature = ownership.sign(key, policy)
    144     policy_proto.new_public_key = pubkey
    145     policy_proto.new_public_key_signature = ownership.sign(old_key, pubkey)
    146     return policy_proto.SerializeToString()
    147 
    148 
    149 def push_policy_and_verify(policy_string, sm):
    150     """Push a device policy to the session manager over DBus.
    151 
    152     The serialized device policy |policy_string| is sent to the session
    153     manager with the StorePolicy DBus call.  Success of the store is
    154     validated by fetching the policy again and comparing.
    155 
    156     @param policy_string: serialized policy to push to the session manager.
    157     @param sm: a connected SessionManagerInterface.
    158 
    159     @raises error.TestFail if policy push failed.
    160     """
    161     listener = session_manager.OwnershipSignalListener(gobject.MainLoop())
    162     listener.listen_for_new_policy()
    163     sm.StorePolicy(dbus.ByteArray(policy_string), byte_arrays=True)
    164     listener.wait_for_signals(desc='Policy push.')
    165 
    166     retrieved_policy = sm.RetrievePolicy(byte_arrays=True)
    167     if retrieved_policy != policy_string:
    168         raise error.TestFail('Policy should not be %s' % retrieved_policy)
    169 
    170 
    171 def get_policy(sm):
    172     """Get a device policy from the session manager over DBus.
    173 
    174     Provided mainly for symmetry with push_policy_and_verify().
    175 
    176     @param sm: a connected SessionManagerInterface.
    177 
    178     @return Serialized PolicyFetchResponse.
    179     """
    180     return sm.RetrievePolicy(byte_arrays=True)
    181