Home | History | Annotate | Download | only in network_Ipv6SimpleNegotiation
      1 # Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging
      6 import time
      7 
      8 from autotest_lib.client.common_lib import error
      9 from autotest_lib.client.common_lib import utils
     10 from autotest_lib.client.cros import dhcp_test_base
     11 from autotest_lib.client.cros import radvd_server
     12 from autotest_lib.client.cros.networking import shill_proxy
     13 
     14 class network_Ipv6SimpleNegotiation(dhcp_test_base.DhcpTestBase):
     15     """
     16     The test subclass that implements IPv6 negotiation.  This test
     17     starts an IPv6 router, then performs a series of tests on the
     18     IPv6 addresses and IPv6 DNS addresses that the DUT should have
     19     gained.
     20     """
     21 
     22     def _get_ip6_addresses(self):
     23         """Gets the list of client IPv6 addresses.
     24 
     25         Retrieve IPv6 addresses associated with the "client side" of the
     26         pseudo-interface pair.  Returns a dict keyed by the IPv6 address,
     27         with the values being the array of attribute strings that follow
     28         int the "ip addr show" output.  For example, a line containing:
     29 
     30             inet6 fe80::ae16:2dff:fe01:0203/64 scope link
     31 
     32         will turn into a dict key:
     33 
     34             'fe80::ae16:2dff:fe01:0203/64': [ 'scope', 'link' ]
     35 
     36         """
     37         addr_output = utils.system_output(
     38             "ip -6 addr show dev %s" % self.ethernet_pair.peer_interface_name)
     39         addresses = {}
     40         for line in addr_output.splitlines():
     41             parts = line.lstrip().split()
     42             if parts[0] != 'inet6' or 'deprecated' in parts:
     43                 continue
     44             addresses[parts[1]] = parts[2:]
     45         return addresses
     46 
     47 
     48     def _get_link_address(self):
     49         """Get the client MAC address.
     50 
     51         Retrieve the MAC address associated with the "client side" of the
     52         pseudo-interface pair.  For example, the "ip link show" output:
     53 
     54             link/ether 01:02:03:04:05:05 brd ff:ff:ff:ff:ff:ff
     55 
     56         will cause a return of "01:02:03:04:05:05"
     57 
     58         """
     59         addr_output = utils.system_output(
     60             'ip link show %s' % self.ethernet_pair.peer_interface_name)
     61         for line in addr_output.splitlines():
     62             parts = line.lstrip().split(' ')
     63             if parts[0] == 'link/ether':
     64                 return parts[1]
     65 
     66 
     67     def _get_ipconfig_properties(self):
     68         for ipconfig in self.get_interface_ipconfig_objects(
     69                 self.ethernet_pair.peer_interface_name):
     70             ipconfig_properties = shill_proxy.ShillProxy.dbus2primitive(
     71                     ipconfig.GetProperties(utf8_strings=True))
     72             if 'Method' not in ipconfig_properties:
     73                 continue
     74 
     75             if ipconfig_properties['Method'] != 'ipv6':
     76                 continue
     77 
     78             return ipconfig_properties
     79         else:
     80             raise error.TestError('Found no IPv6 IPConfig entries')
     81 
     82 
     83     def verify_ipv6_addresses(self):
     84         """Verify IPv6 configuration.
     85 
     86         Perform various tests to validate the IPv6 addresses acquired by
     87         the client.
     88 
     89         """
     90         addresses = self._get_ip6_addresses()
     91         logging.info('Got addresses %r', addresses)
     92         global_addresses = [key for key in addresses
     93                             if 'global' in addresses[key]]
     94 
     95         if len(global_addresses) != 2:
     96             raise error.TestError('Expected 2 global address but got %d' %
     97                                   len(global_addresses))
     98 
     99         prefix = radvd_server.RADVD_DEFAULT_PREFIX
    100         prefix = prefix[:prefix.index('::')]
    101         for address in global_addresses:
    102             if not address.startswith(prefix):
    103                 raise error.TestError('Global address %s does not start with '
    104                                       'expected prefix %s' %
    105                                       address, prefix)
    106 
    107         # One globally scoped address should be based on the last 3 octets
    108         # of the MAC adddress, while the other should not.  For example,
    109         # for MAC address "01:02:03:04:05:06", we should see an address
    110         # that ends with "4:506/64" (the "/64" is the default radvd suffix).
    111         link_parts = [int(b, 16) for b in self._get_link_address().split(':')]
    112         address_suffix = '%x:%x%s' % (link_parts[3],
    113                                       (link_parts[4] << 8) | link_parts[5],
    114                                       radvd_server.RADVD_DEFAULT_SUFFIX)
    115         mac_related_addresses = [addr for addr in global_addresses
    116                                  if addr.endswith(address_suffix)]
    117         if len(mac_related_addresses) != 1:
    118             raise error.TestError('Expected 1 mac-related global address but '
    119                                   'got %d' % len(mac_related_addresses))
    120         mac_related_address = mac_related_addresses[0]
    121 
    122         local_address_count = len(addresses) - len(global_addresses)
    123         if local_address_count <= 0:
    124             raise error.TestError('Expected at least 1 non-global address but '
    125                                   'got %d' % local_address_count)
    126 
    127         temporary_address = [addr for addr in global_addresses
    128                                  if addr != mac_related_address][0]
    129         self.verify_ipconfig_contains(temporary_address)
    130 
    131 
    132     def verify_ipconfig_contains(self, address_and_prefix):
    133         """Verify that shill has an IPConfig entry with the specified address.
    134 
    135         @param address_and_prefix string with address/prefix to search for.
    136 
    137         """
    138         address, prefix_str = address_and_prefix.split('/')
    139         prefix = int(prefix_str)
    140         ipconfig_properties = self._get_ipconfig_properties()
    141 
    142         for property, value in (('Address', address), ('Prefixlen', prefix)):
    143             if property not in ipconfig_properties:
    144                raise error.TestError('IPv6 IPConfig entry does not '
    145                                      'contain property %s' % property)
    146             if ipconfig_properties[property] != value:
    147                raise error.TestError('IPv6 IPConfig property %s does not '
    148                                      'contain the expected value %s; '
    149                                      'instead it is %s' %
    150                                      (property, value,
    151                                       ipconfig_properties[property]))
    152 
    153 
    154     def verify_ipconfig_name_servers(self, name_servers):
    155         """Verify that shill has an IPConfig entry with the specified name
    156         servers.
    157 
    158         @param name_servers list of expected name servers.
    159 
    160         """
    161         ipconfig_properties = self._get_ipconfig_properties()
    162 
    163         if ipconfig_properties['NameServers'] != name_servers:
    164             raise error.TestError('IPv6 name servers mismatched: '
    165                                   'expected %r actual %r' %
    166                                   name_servers,
    167                                   ipconfig_properties['NameServers'])
    168 
    169 
    170     def test_body(self):
    171         """The main body for this test."""
    172         server = radvd_server.RadvdServer(self.ethernet_pair.interface_name)
    173         server.start_server()
    174 
    175         try:
    176             # Wait for IPv6 negotiation to complete.
    177             time.sleep(radvd_server.RADVD_DEFAULT_MAX_ADV_INTERVAL)
    178 
    179             # In this time, we should have also acquired an IPv6 address.
    180             self.verify_ipv6_addresses()
    181             self.verify_ipconfig_name_servers(
    182                     radvd_server.RADVD_DEFAULT_RDNSS_SERVERS.split(' '))
    183         finally:
    184             server.stop_server()
    185