Home | History | Annotate | Download | only in cros
      1 # Copyright 2015 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """
      6 Base class for DHCPv6 tests.  This class just sets up a little bit of plumbing,
      7 like a virtual ethernet device with one end that looks like a real ethernet
      8 device to shill and a DHCPv6 test server on the end that doesn't look like a
      9 real ethernet interface to shill.  Child classes should override test_body()
     10 with the logic of their test.
     11 """
     12 
     13 import logging
     14 import time
     15 import traceback
     16 
     17 from autotest_lib.client.bin import test
     18 from autotest_lib.client.common_lib import error
     19 from autotest_lib.client.common_lib.cros import virtual_ethernet_pair
     20 from autotest_lib.client.cros import dhcpv6_test_server
     21 from autotest_lib.client.cros.networking import shill_proxy
     22 
# These are keys that may be used with the DBus dictionary returned from
# Dhcpv6TestBase.get_interface_ipconfig().
DHCPV6_KEY_ADDRESS = 'Address'
DHCPV6_KEY_DELEGATED_PREFIX = 'DelegatedPrefix'
DHCPV6_KEY_DELEGATED_PREFIX_LENGTH = 'DelegatedPrefixLength'
DHCPV6_KEY_NAMESERVERS = 'NameServers'
DHCPV6_KEY_SEARCH_DOMAIN_LIST = 'SearchDomains'

# After DHCPv6 completes, an ipconfig should appear shortly after.
# Dhcpv6TestBase.check_dhcpv6_config() queries shill for it at most
# IPCONFIG_POLL_COUNT times and sleeps IPCONFIG_POLL_PERIOD_SECONDS after
# each unsuccessful attempt.
IPCONFIG_POLL_COUNT = 5
IPCONFIG_POLL_PERIOD_SECONDS = 1
     34 
     35 class Dhcpv6TestBase(test.test):
    """Parent class for tests that verify DHCPv6 behavior."""
     37     version = 1
     38 
     39     def get_device(self, interface_name):
     40         """Finds the corresponding Device object for an interface with
     41         the name |interface_name|.
     42 
     43         @param interface_name string The name of the interface to check.
     44 
     45         @return DBus interface object representing the associated device.
     46 
     47         """
     48         return self.shill_proxy.find_object('Device',
     49                                             {'Name': interface_name})
     50 
     51 
     52     def find_ethernet_service(self, interface_name):
     53         """Finds the corresponding service object for an Ethernet interface.
     54 
     55         @param interface_name string The name of the associated interface
     56 
     57         @return Service object representing the associated service.
     58 
     59         """
     60         device = self.get_device(interface_name)
     61         device_path = shill_proxy.ShillProxy.dbus2primitive(device.object_path)
     62         return self.shill_proxy.find_object('Service', {'Device': device_path})
     63 
     64 
     65     def get_interface_ipconfig_objects(self, interface_name):
     66         """
     67         Returns a list of dbus object proxies for |interface_name|.
     68         Returns an empty list if no such interface exists.
     69 
     70         @param interface_name string name of the device to query (e.g., "eth0").
     71 
     72         @return list of objects representing DBus IPConfig RPC endpoints.
     73 
     74         """
     75         device = self.get_device(interface_name)
     76         if device is None:
     77             return []
     78 
     79         device_properties = device.GetProperties(utf8_strings=True)
     80         proxy = self.shill_proxy
     81 
     82         ipconfig_object = proxy.DBUS_TYPE_IPCONFIG
     83         return filter(bool,
     84                       [ proxy.get_dbus_object(ipconfig_object, property_path)
     85                         for property_path in device_properties['IPConfigs'] ])
     86 
     87 
     88     def get_interface_ipconfig(self, interface_name):
     89         """
     90         Returns a dictionary containing settings for an |interface_name| set
     91         via DHCPv6.  Returns None if no such interface or setting bundle on
     92         that interface can be found in shill.
     93 
     94         @param interface_name string name of the device to query (e.g., "eth0").
     95 
     96         @return dict containing the the properties of the IPConfig stripped
     97             of DBus meta-data or None.
     98 
     99         """
    100         dhcp_properties = None
    101         for ipconfig in self.get_interface_ipconfig_objects(interface_name):
    102           logging.info('Looking at ipconfig %r', ipconfig)
    103           ipconfig_properties = ipconfig.GetProperties(utf8_strings=True)
    104           if 'Method' not in ipconfig_properties:
    105               logging.info('Found ipconfig object with no method field')
    106               continue
    107           if ipconfig_properties['Method'] != 'dhcp6':
    108               logging.info('Found ipconfig object with method != dhcp6')
    109               continue
    110           if dhcp_properties != None:
    111               raise error.TestFail('Found multiple ipconfig objects '
    112                                    'with method == dhcp6')
    113           dhcp_properties = ipconfig_properties
    114         if dhcp_properties is None:
    115             logging.info('Did not find IPConfig object with method == dhcp6')
    116             return None
    117         logging.info('Got raw dhcp config dbus object: %s.', dhcp_properties)
    118         return shill_proxy.ShillProxy.dbus2primitive(dhcp_properties)
    119 
    120 
    def run_once(self):
        """
        Sets up the test apparatus, runs test_body() and tears everything
        down again.

        A virtual ethernet pair is created (peer interface named
        'pseudoethernet0'), a Dhcpv6TestServer is started on the pair's
        local interface, and then test_body() is invoked.  The server is
        stopped and the ethernet pair torn down regardless of how the test
        ends.

        @raises error.TestFail if the ethernet pair is not healthy or if an
            exception other than TestFail/TestNAError escapes the test.
        @raises error.TestNAError passed through unchanged from the test.

        """
        self._server = None
        self._server_ip = None
        self._ethernet_pair = None
        self._shill_proxy = shill_proxy.ShillProxy()
        try:
            # TODO(zqiu): enable DHCPv6 for peer interface, either by restarting
            # shill with appropriate command line options or via a new DBUS
            # command.
            self._ethernet_pair = virtual_ethernet_pair.VirtualEthernetPair(
                    interface_ip=None,
                    peer_interface_name='pseudoethernet0',
                    peer_interface_ip=None,
                    interface_ipv6=dhcpv6_test_server.DHCPV6_SERVER_ADDRESS)
            self._ethernet_pair.setup()
            if not self._ethernet_pair.is_healthy:
                raise error.TestFail('Could not create virtual ethernet pair.')
            self._server_ip = self._ethernet_pair.interface_ip
            self._server = dhcpv6_test_server.Dhcpv6TestServer(
                    self._ethernet_pair.interface_name)
            self._server.start()
            self.test_body()
        except (error.TestFail, error.TestNAError):
            # Pass these through without modification.
            raise
        except Exception as e:
            logging.error('Caught exception: %s.', str(e))
            logging.error('Trace: %s', traceback.format_exc())
            raise error.TestFail('Caught exception: %s.' % str(e))
        finally:
            # Nested try/finally so that a failure while stopping the server
            # cannot prevent the virtual ethernet pair from being torn down.
            try:
                if self._server is not None:
                    self._server.stop()
            finally:
                if self._ethernet_pair is not None:
                    self._ethernet_pair.teardown()
    155 
    def test_body(self):
        """
        Hook for subclasses: replace this method with the actual test logic.
        run_once() calls it after the virtual ethernet pair has been set up
        and the DHCPv6 test server has been started, so the properties of
        this class refer to the live test apparatus at that point.

        @raises error.TestFail always, in this base implementation.
        """
        failure_reason = 'No test body implemented'
        raise error.TestFail(failure_reason)
    163 
    164     @property
    165     def server_ip(self):
    166         """
    167         Return the IP address of the side of the interface that the DHCPv6 test
    168         server is bound to.  The server itself is bound the the broadcast
    169         address on the interface.
    170         """
    171         return self._server_ip
    172 
    173     @property
    174     def server(self):
    175         """
    176         Returns a reference to the DHCP test server.  Use this to add handlers
    177         and run tests.
    178         """
    179         return self._server
    180 
    181     @property
    182     def ethernet_pair(self):
    183         """
    184         Returns a reference to the virtual ethernet pair created to run DHCP
    185         tests on.
    186         """
    187         return self._ethernet_pair
    188 
    189     @property
    190     def shill_proxy(self):
    191         """
    192         Returns a the shill proxy instance.
    193         """
    194         return self._shill_proxy
    195 
    196 
    def check_dhcpv6_config(self):
        """
        Compare the DHCPv6 ipconfig with DHCP lease parameters to ensure
        that the DUT attained the correct values.

        The ipconfig of the peer interface is polled up to
        IPCONFIG_POLL_COUNT times until it carries both an address and a
        delegated prefix.  The address prefix and suffix, the delegated
        prefix and its length, the name servers and the search domains are
        then compared against the values exported by dhcpv6_test_server.

        @raises error.TestFail if the ipconfig never becomes available or
            any of the checked values is missing, malformed or unexpected.

        """
        # Retrieve DHCPv6 configuration.
        for _ in range(IPCONFIG_POLL_COUNT):
            dhcpv6_config = self.get_interface_ipconfig(
                    self.ethernet_pair.peer_interface_name)
            # Wait until both IP address and delegated prefix are obtained.
            if (dhcpv6_config is not None and
                dhcpv6_config.get(DHCPV6_KEY_ADDRESS) and
                dhcpv6_config.get(DHCPV6_KEY_DELEGATED_PREFIX)):
                break
            time.sleep(IPCONFIG_POLL_PERIOD_SECONDS)
        else:
            raise error.TestFail('Failed to retrieve DHCPv6 ipconfig object '
                                 'from shill.')

        # Verify Non-temporary Address prefix.
        address = dhcpv6_config.get(DHCPV6_KEY_ADDRESS)
        # Split at the first '::' into prefix and suffix; an address without
        # '::' cannot be checked and is reported as a test failure instead of
        # surfacing as a bare ValueError.
        actual_prefix, separator, suffix_text = address.partition('::')
        if not separator:
            raise error.TestFail('Unexpected address format (no "::"): %s' %
                                 address)
        expected_prefix = \
                dhcpv6_test_server.DHCPV6_SERVER_SUBNET_PREFIX.partition('::')[0]
        if actual_prefix != expected_prefix:
            raise error.TestFail('Address prefix mismatch: '
                                 'actual %s expected %s.' %
                                 (actual_prefix, expected_prefix))
        # Verify Non-temporary Address suffix.
        try:
            actual_suffix = int(suffix_text, 16)
        except ValueError:
            raise error.TestFail('Address suffix is not a hex number: %s' %
                                 address)
        if (actual_suffix < dhcpv6_test_server.DHCPV6_ADDRESS_RANGE_LOW or
            actual_suffix > dhcpv6_test_server.DHCPV6_ADDRESS_RANGE_HIGH):
            raise error.TestFail('Invalid address suffix: '
                                 'actual %x expected (%x-%x)' %
                                 (actual_suffix,
                                  dhcpv6_test_server.DHCPV6_ADDRESS_RANGE_LOW,
                                  dhcpv6_test_server.DHCPV6_ADDRESS_RANGE_HIGH))

        # Verify delegated prefix: it must equal the range format applied to
        # one of the indices in [INDEX_LOW, INDEX_HIGH].
        delegated_prefix = dhcpv6_config.get(DHCPV6_KEY_DELEGATED_PREFIX)
        valid_prefixes = [
                dhcpv6_test_server.DHCPV6_PREFIX_DELEGATION_RANGE_FORMAT % index
                for index in range(
                        dhcpv6_test_server.DHCPV6_PREFIX_DELEGATION_INDEX_LOW,
                        dhcpv6_test_server.DHCPV6_PREFIX_DELEGATION_INDEX_HIGH
                        + 1)]
        if delegated_prefix not in valid_prefixes:
            raise error.TestFail('Invalid delegated prefix: %s' %
                                 (delegated_prefix))
        # Verify delegated prefix length.
        raw_prefix_length = dhcpv6_config.get(
                DHCPV6_KEY_DELEGATED_PREFIX_LENGTH)
        if raw_prefix_length is None:
            # int(None) would raise an opaque TypeError; fail explicitly.
            raise error.TestFail('Delegated prefix length missing from '
                                 'DHCPv6 ipconfig.')
        delegated_prefix_length = int(raw_prefix_length)
        expected_delegated_prefix_length = \
                dhcpv6_test_server.DHCPV6_PREFIX_DELEGATION_PREFIX_LENGTH
        if delegated_prefix_length != expected_delegated_prefix_length:
            raise error.TestFail('Delegated prefix length mismatch: '
                                 'actual %d expected %d' %
                                 (delegated_prefix_length,
                                  expected_delegated_prefix_length))

        # Verify name servers.
        actual_name_servers = dhcpv6_config.get(DHCPV6_KEY_NAMESERVERS)
        expected_name_servers = \
                dhcpv6_test_server.DHCPV6_NAME_SERVERS.split(',')
        if actual_name_servers != expected_name_servers:
            raise error.TestFail('Name servers mismatch: actual %r expected %r'
                                 % (actual_name_servers, expected_name_servers))
        # Verify domain search.
        actual_domain_search = dhcpv6_config.get(DHCPV6_KEY_SEARCH_DOMAIN_LIST)
        expected_domain_search = \
                dhcpv6_test_server.DHCPV6_DOMAIN_SEARCH.split(',')
        if actual_domain_search != expected_domain_search:
            raise error.TestFail('Domain search list mismatch: '
                                 'actual %r expected %r' %
                                 (actual_domain_search, expected_domain_search))
    273                                  (actual_domain_search, expected_domain_search))
    274