Home | History | Annotate | Download | only in network_8021xWiredAuthentication
      1 # Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging
      6 import time
      7 
      8 from autotest_lib.client.bin import test
      9 from autotest_lib.client.common_lib import error
     10 from autotest_lib.client.common_lib.cros import site_eap_certs
     11 from autotest_lib.client.common_lib.cros import virtual_ethernet_pair
     12 from autotest_lib.client.cros import hostapd_server
     13 from autotest_lib.client.cros import shill_temporary_profile
     14 from autotest_lib.client.cros.networking import shill_proxy
     15 
     16 class network_8021xWiredAuthentication(test.test):
     17     """The 802.1x EAP wired authentication class.
     18 
     19     Runs hostapd on one side of an ethernet pair, and shill on the other.
     20     Configures the Ethernet service with 802.1x credentials and ensures
     21     that when shill detects an EAP authenticator, it is successful in
     22     using its credentials to gain access.
     23 
     24     """
     25     INTERFACE_NAME = 'pseudoethernet0'
     26     AUTHENTICATION_FLAG = 'EapAuthenticationCompleted'
     27     TEST_PROFILE_NAME = 'test1x'
     28     AUTHENTICATION_TIMEOUT = 10
     29     version = 1
     30 
     31     def get_device(self, interface_name):
     32         """Finds the corresponding Device object for an ethernet
     33         interface with the name |interface_name|.
     34 
     35         @param interface_name string The name of the interface to check.
     36 
     37         @return DBus interface object representing the associated device.
     38 
     39         """
     40         device = self._shill_proxy.find_object('Device',
     41                                                {'Name': interface_name})
     42         if device is None:
     43             raise error.TestFail('Device was not found.')
     44 
     45         return device
     46 
     47 
     48     def get_authenticated_flag(self, interface_name):
     49         """Checks whether |interface_name| has successfully negotiated
     50         802.1x.
     51 
     52         @param interface_name string The name of the interface to check.
     53 
     54         @return True if the authenticated flag is set, False otherwise.
     55 
     56         """
     57         device = self.get_device(interface_name)
     58         device_properties = device.GetProperties(utf8_strings=True)
     59         logging.info('Device properties are %r', device_properties)
     60         return shill_proxy.ShillProxy.dbus2primitive(
     61                 device_properties[self.AUTHENTICATION_FLAG])
     62 
     63 
     64     def wait_for_authentication(self, interface_name):
     65         """Wait for |interface_name| to get to enter authentication state.
     66 
     67         @param interface_name string The name of the interface to check.
     68 
     69         """
     70         device = self.get_device(interface_name)
     71         result = self._shill_proxy.wait_for_property_in(
     72                          device,
     73                          self.AUTHENTICATION_FLAG,
     74                          (True,),
     75                          self.AUTHENTICATION_TIMEOUT)
     76         (successful, _, _) = result
     77         return successful
     78 
     79 
     80     def configure_credentials(self, interface_name):
     81         """Adds authentication properties to the Ethernet EAP service.
     82 
     83         @param interface_name string The name of the associated interface
     84 
     85         """
     86         service = self._shill_proxy.configure_service({
     87             'Type': 'etherneteap',
     88             'EAP.EAP': hostapd_server.HostapdServer.EAP_TYPE,
     89             'EAP.InnerEAP': 'auth=%s' % hostapd_server.HostapdServer.EAP_PHASE2,
     90             'EAP.Identity': hostapd_server.HostapdServer.EAP_USERNAME,
     91             'EAP.Password': hostapd_server.HostapdServer.EAP_PASSWORD,
     92             'EAP.CACertPEM': [ site_eap_certs.ca_cert_1 ]
     93         })
     94 
     95 
     96     def run_once(self):
     97         """Test main loop."""
     98         self._shill_proxy = shill_proxy.ShillProxy()
     99         manager = self._shill_proxy.manager
    100 
    101         with shill_temporary_profile.ShillTemporaryProfile(
    102                 manager, profile_name=self.TEST_PROFILE_NAME):
    103             with virtual_ethernet_pair.VirtualEthernetPair(
    104                     peer_interface_name=self.INTERFACE_NAME,
    105                     peer_interface_ip=None) as ethernet_pair:
    106                 if not ethernet_pair.is_healthy:
    107                     raise error.TestFail('Virtual ethernet pair failed.')
    108 
    109                 if self.get_authenticated_flag(self.INTERFACE_NAME):
    110                     raise error.TestFail('Authentication flag already set.')
    111 
    112                 with hostapd_server.HostapdServer(
    113                         interface=ethernet_pair.interface_name) as hostapd:
    114                     # Wait for hostapd to initialize.
    115                     time.sleep(1)
    116                     if not hostapd.running():
    117                         raise error.TestFail('hostapd process exited.')
    118 
    119                     self.configure_credentials(self.INTERFACE_NAME)
    120                     hostapd.send_eap_packets()
    121                     if not self.wait_for_authentication(self.INTERFACE_NAME):
    122                         raise error.TestFail('Authentication did not complete.')
    123 
    124                     client_mac_address = ethernet_pair.peer_interface_mac
    125                     if not hostapd.client_has_authenticated(client_mac_address):
    126                         raise error.TestFail('Server does not agree that '
    127                                              'client is authenticated')
    128 
    129                     if hostapd.client_has_logged_off(client_mac_address):
    130                         raise error.TestFail('Client has already logged off')
    131 
    132                     # Since the EAP credentials are associated with the
    133                     # top-most profile, popping it should cause the client
    134                     # to immediately log-off.
    135                     manager.PopProfile(self.TEST_PROFILE_NAME)
    136 
    137                     if self.get_authenticated_flag(self.INTERFACE_NAME):
    138                         raise error.TestFail('Client is still authenticated.')
    139 
    140                     if not hostapd.client_has_logged_off(client_mac_address):
    141                         raise error.TestFail('Client did not log off')
    142 
    143                     # Re-pushing the profile should make the EAP credentials
    144                     # available again, and should cause the client to
    145                     # re-authenticate.
    146                     manager.PushProfile(self.TEST_PROFILE_NAME)
    147 
    148                     if not self.wait_for_authentication(self.INTERFACE_NAME):
    149                         raise error.TestFail('Re-authentication did not '
    150                                              'complete.')
    151