Home | History | Annotate | Download | only in network_VPNConnect
      1 # Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 
      6 from autotest_lib.client.bin import test
      7 from autotest_lib.client.common_lib import error
      8 from autotest_lib.client.common_lib import utils
      9 from autotest_lib.client.common_lib.cros import site_eap_certs
     10 from autotest_lib.client.common_lib.cros import virtual_ethernet_pair
     11 from autotest_lib.client.cros import certificate_util
     12 from autotest_lib.client.cros import shill_temporary_profile
     13 from autotest_lib.client.cros import tpm_store
     14 from autotest_lib.client.cros import vpn_server
     15 from autotest_lib.client.cros.networking import shill_proxy
     16 
     17 class network_VPNConnect(test.test):
     18     """The VPN authentication class.
     19 
     20     Starts up a VPN server within a chroot on the other end of a virtual
     21     ethernet pair and attempts a VPN association using shill.
     22 
     23     """
     24     CLIENT_INTERFACE_NAME = 'pseudoethernet0'
     25     SERVER_INTERFACE_NAME = 'serverethernet0'
     26     TEST_PROFILE_NAME = 'testVPN'
     27     CONNECT_TIMEOUT_SECONDS = 15
     28     version = 1
     29     SERVER_ADDRESS = '10.9.8.1'
     30     CLIENT_ADDRESS = '10.9.8.2'
     31     NETWORK_PREFIX = 24
     32 
     33     def get_device(self, interface_name):
     34         """Finds the corresponding Device object for an ethernet
     35         interface with the name |interface_name|.
     36 
     37         @param interface_name string The name of the interface to check.
     38 
     39         @return DBus interface object representing the associated device.
     40 
     41         """
     42         device = self._shill_proxy.find_object('Device',
     43                                                {'Name': interface_name})
     44         if device is None:
     45             raise error.TestFail('Device was not found.')
     46 
     47         return device
     48 
     49 
     50     def find_ethernet_service(self, interface_name):
     51         """Finds the corresponding service object for an ethernet
     52         interface.
     53 
     54         @param interface_name string The name of the associated interface
     55 
     56         @return Service object representing the associated service.
     57 
     58         """
     59         device = self.get_device(interface_name)
     60         device_path = shill_proxy.ShillProxy.dbus2primitive(device.object_path)
     61         return self._shill_proxy.find_object('Service', {'Device': device_path})
     62 
     63 
     64     def configure_static_ip(self, interface_name, address, prefix_len):
     65         """Configures the Static IP parameters for the Ethernet interface
     66         |interface_name| and applies those parameters to the interface by
     67         forcing a re-connect.
     68 
     69         @param interface_name string The name of the associated interface.
     70         @param address string the IP address this interface should have.
     71         @param prefix_len string the IP address prefix for the interface.
     72 
     73         """
     74         service = self.find_ethernet_service(interface_name)
     75         service.SetProperty('StaticIP.Address', address)
     76         service.SetProperty('StaticIP.Prefixlen', prefix_len)
     77         service.Disconnect()
     78         service.Connect()
     79 
     80 
     81     def get_vpn_server(self):
     82         """Returns a VPN server instance."""
     83         if self._vpn_type.startswith('l2tpipsec-psk'):
     84             return vpn_server.L2TPIPSecVPNServer(
     85                 'psk',
     86                 self.SERVER_INTERFACE_NAME,
     87                 self.SERVER_ADDRESS,
     88                 self.NETWORK_PREFIX,
     89                 perform_xauth_authentication = 'xauth' in self._vpn_type,
     90                 local_ip_is_public_ip = 'evil' in self._vpn_type)
     91         elif self._vpn_type.startswith('l2tpipsec-cert'):
     92             return vpn_server.L2TPIPSecVPNServer('cert',
     93                                                  self.SERVER_INTERFACE_NAME,
     94                                                  self.SERVER_ADDRESS,
     95                                                  self.NETWORK_PREFIX)
     96         elif self._vpn_type.startswith('openvpn'):
     97             return vpn_server.OpenVPNServer(self.SERVER_INTERFACE_NAME,
     98                                             self.SERVER_ADDRESS,
     99                                             self.NETWORK_PREFIX,
    100                                             'user_pass' in self._vpn_type)
    101         else:
    102             raise error.TestFail('Unknown vpn server type %s' % self._vpn_type)
    103 
    104 
    105     def get_vpn_client_properties(self, tpm):
    106         """Returns VPN configuration properties.
    107 
    108         @param tpm object TPM store instance to add credentials if necessary.
    109 
    110         """
    111         if self._vpn_type.startswith('l2tpipsec-psk'):
    112             params = {
    113                 'L2TPIPsec.Password': vpn_server.L2TPIPSecVPNServer.CHAP_SECRET,
    114                 'L2TPIPsec.PSK':
    115                         vpn_server.L2TPIPSecVPNServer.IPSEC_PRESHARED_KEY,
    116                 'L2TPIPsec.User':vpn_server.L2TPIPSecVPNServer.CHAP_USER,
    117                 'Name': 'test-vpn-l2tp-psk',
    118                 'Provider.Host': self.SERVER_ADDRESS,
    119                 'Provider.Type': 'l2tpipsec',
    120                 'Type': 'vpn',
    121                 'VPN.Domain': 'test-vpn-psk-domain'
    122             }
    123             if 'xauth' in self._vpn_type:
    124                 if 'incorrect_user' in self._vpn_type:
    125                     params['L2TPIPsec.XauthUser'] = 'wrong_user'
    126                     params['L2TPIPsec.XauthPassword'] = 'wrong_password'
    127                 elif 'incorrect_missing_user' not in self._vpn_type:
    128                     params['L2TPIPsec.XauthUser'] = (
    129                             vpn_server.L2TPIPSecVPNServer.XAUTH_USER)
    130                     params['L2TPIPsec.XauthPassword'] = (
    131                             vpn_server.L2TPIPSecVPNServer.XAUTH_PASSWORD)
    132             return params
    133         elif self._vpn_type == 'l2tpipsec-cert':
    134             tpm.install_certificate(site_eap_certs.client_cert_1,
    135                                     site_eap_certs.cert_1_tpm_key_id)
    136             tpm.install_private_key(site_eap_certs.client_private_key_1,
    137                                     site_eap_certs.cert_1_tpm_key_id)
    138             return {
    139                 'L2TPIPsec.CACertPEM': [ site_eap_certs.ca_cert_1 ],
    140                 'L2TPIPsec.ClientCertID': site_eap_certs.cert_1_tpm_key_id,
    141                 'L2TPIPsec.ClientCertSlot': tpm.SLOT_ID,
    142                 'L2TPIPsec.User':vpn_server.L2TPIPSecVPNServer.CHAP_USER,
    143                 'L2TPIPsec.Password': vpn_server.L2TPIPSecVPNServer.CHAP_SECRET,
    144                 'L2TPIPsec.PIN': tpm.PIN,
    145                 'Name': 'test-vpn-l2tp-cert',
    146                 'Provider.Host': self.SERVER_ADDRESS,
    147                 'Provider.Type': 'l2tpipsec',
    148                 'Type': 'vpn',
    149                 'VPN.Domain': 'test-vpn-psk-domain'
    150             }
    151         elif self._vpn_type.startswith('openvpn'):
    152             tpm.install_certificate(site_eap_certs.client_cert_1,
    153                                     site_eap_certs.cert_1_tpm_key_id)
    154             tpm.install_private_key(site_eap_certs.client_private_key_1,
    155                                     site_eap_certs.cert_1_tpm_key_id)
    156             params = {
    157                 'Name': 'test-vpn-openvpn',
    158                 'Provider.Host': self.SERVER_ADDRESS,
    159                 'Provider.Type': 'openvpn',
    160                 'Type': 'vpn',
    161                 'VPN.Domain': 'test-openvpn-domain',
    162                 'OpenVPN.CACertPEM': [ site_eap_certs.ca_cert_1 ],
    163                 'OpenVPN.Pkcs11.ID': site_eap_certs.cert_1_tpm_key_id,
    164                 'OpenVPN.Pkcs11.PIN': tpm.PIN,
    165                 'OpenVPN.RemoteCertEKU': 'TLS Web Server Authentication',
    166                 'OpenVPN.Verb': '5'
    167             }
    168             if 'user_pass' in self._vpn_type:
    169                 params['OpenVPN.User'] = vpn_server.OpenVPNServer.USERNAME
    170                 params['OpenVPN.Password'] = vpn_server.OpenVPNServer.PASSWORD
    171             if 'cert_verify' in self._vpn_type:
    172                 ca = certificate_util.PEMCertificate(site_eap_certs.ca_cert_1)
    173                 if 'incorrect_hash' in self._vpn_type:
    174                     bogus_hash = ':'.join(['00'] * 20)
    175                     params['OpenVPN.VerifyHash'] = bogus_hash
    176                 else:
    177                     params['OpenVPN.VerifyHash'] = ca.fingerprint
    178                 server = certificate_util.PEMCertificate(
    179                         site_eap_certs.server_cert_1)
    180                 if 'incorrect_subject' in self._vpn_type:
    181                     params['OpenVPN.VerifyX509Name'] = 'bogus subject name'
    182                 elif 'incorrect_cn' in self._vpn_type:
    183                     params['OpenVPN.VerifyX509Name'] = 'bogus cn'
    184                     params['OpenVPN.VerifyX509Type'] = 'name'
    185                 elif 'cn_only' in self._vpn_type:
    186                     params['OpenVPN.VerifyX509Name'] = server.subject_dict['CN']
    187                     params['OpenVPN.VerifyX509Type'] = 'name'
    188                 else:
    189                     # This is the form OpenVPN expects.
    190                     params['OpenVPN.VerifyX509Name'] = ', '.join(server.subject)
    191             return params
    192         else:
    193             raise error.TestFail('Unknown vpn client type %s' % self._vpn_type)
    194 
    195 
    196     def connect_vpn(self):
    197         """Connects the client to the VPN server."""
    198         proxy = self._shill_proxy
    199         with tpm_store.TPMStore() as tpm:
    200             service = proxy.get_service(self.get_vpn_client_properties(tpm))
    201             service.Connect()
    202             result = proxy.wait_for_property_in(service,
    203                                                 proxy.SERVICE_PROPERTY_STATE,
    204                                                 ('ready', 'online'),
    205                                                 self.CONNECT_TIMEOUT_SECONDS)
    206         (successful, _, _) = result
    207         if not successful and self._expect_success:
    208             raise error.TestFail('VPN connection failed')
    209         if successful and not self._expect_success:
    210             raise error.TestFail('VPN connection suceeded '
    211                                  'when it should have failed')
    212         return successful
    213 
    214 
    215     def run_once(self, vpn_types=[]):
    216         """Test main loop."""
    217         self._shill_proxy = shill_proxy.ShillProxy()
    218         for vpn_type in vpn_types:
    219             self.run_vpn_test(vpn_type)
    220 
    221 
    222     def run_vpn_test(self, vpn_type):
    223         """Run a vpn test of |vpn_type|.
    224 
    225         @param vpn_type string type of VPN test to run.
    226 
    227         """
    228         manager = self._shill_proxy.manager
    229         server_address_and_prefix = '%s/%d' % (self.SERVER_ADDRESS,
    230                                                self.NETWORK_PREFIX)
    231         client_address_and_prefix = '%s/%d' % (self.CLIENT_ADDRESS,
    232                                                self.NETWORK_PREFIX)
    233         self._vpn_type = vpn_type
    234         self._expect_success = 'incorrect' not in vpn_type
    235 
    236         with shill_temporary_profile.ShillTemporaryProfile(
    237                 manager, profile_name=self.TEST_PROFILE_NAME):
    238             with virtual_ethernet_pair.VirtualEthernetPair(
    239                     interface_name=self.SERVER_INTERFACE_NAME,
    240                     peer_interface_name=self.CLIENT_INTERFACE_NAME,
    241                     peer_interface_ip=client_address_and_prefix,
    242                     interface_ip=server_address_and_prefix,
    243                     ignore_shutdown_errors=True) as ethernet_pair:
    244                 if not ethernet_pair.is_healthy:
    245                     raise error.TestFail('Virtual ethernet pair failed.')
    246 
    247                 # When shill finds this ethernet interface, it will reset
    248                 # its IP address and start a DHCP client.  We must configure
    249                 # the static IP address through shill.
    250                 self.configure_static_ip(self.CLIENT_INTERFACE_NAME,
    251                                          self.CLIENT_ADDRESS,
    252                                          self.NETWORK_PREFIX)
    253 
    254                 with self.get_vpn_server() as server:
    255                     if self.connect_vpn():
    256                         res = utils.ping(server.SERVER_IP_ADDRESS, tries=3,
    257                                          user='chronos')
    258                         if res != 0:
    259                             raise error.TestFail('Error pinging server IP')
    260 
    261                         # IPv6 should be blackholed, so ping returns
    262                         # "other error"
    263                         res = utils.ping("2001:db8::1", tries=1, user='chronos')
    264                         if res != 2:
    265                             raise error.TestFail('IPv6 ping should '
    266                                                  'have aborted')
    267