Home | History | Annotate | Download | only in net_test
      1 #!/usr/bin/python
      2 #
      3 # Copyright 2014 The Android Open Source Project
      4 #
      5 # Licensed under the Apache License, Version 2.0 (the "License");
      6 # you may not use this file except in compliance with the License.
      7 # You may obtain a copy of the License at
      8 #
      9 # http://www.apache.org/licenses/LICENSE-2.0
     10 #
     11 # Unless required by applicable law or agreed to in writing, software
     12 # distributed under the License is distributed on an "AS IS" BASIS,
     13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14 # See the License for the specific language governing permissions and
     15 # limitations under the License.
     16 
     17 import errno
     18 import random
     19 from socket import *  # pylint: disable=wildcard-import
     20 import time
     21 import unittest
     22 
     23 from scapy import all as scapy
     24 
     25 import csocket
     26 import iproute
     27 import multinetwork_base
     28 import packets
     29 import net_test
     30 
# Setsockopt values.
# Option name passed to setsockopt() at the IPPROTO_IPV6 level (see GetSourceIP).
# NOTE(review): numeric values presumably mirror <linux/in6.h> -- confirm.
IPV6_ADDR_PREFERENCES = 72
# Option value used with IPV6_ADDR_PREFERENCES; by its name it asks for public
# (rather than temporary) source addresses.
IPV6_PREFER_SRC_PUBLIC = 0x0002
     34 
     35 
     36 class IPv6SourceAddressSelectionTest(multinetwork_base.MultiNetworkBaseTest):
     37   """Test for IPv6 source address selection.
     38 
     39   Relevant kernel commits:
     40     upstream net-next:
     41       7fd2561 net: ipv6: Add a sysctl to make optimistic addresses useful candidates
     42       c58da4c net: ipv6: allow explicitly choosing optimistic addresses
     43       9131f3d ipv6: Do not iterate over all interfaces when finding source address on specific interface.
     44       c0b8da1 ipv6: Fix finding best source address in ipv6_dev_get_saddr().
     45       c15df30 ipv6: Remove unused arguments for __ipv6_dev_get_saddr().
     46       3985e8a ipv6: sysctl to restrict candidate source addresses
     47 
     48     android-3.10:
     49       2ce95507 net: ipv6: Add a sysctl to make optimistic addresses useful candidates
     50       0065bf4 net: ipv6: allow choosing optimistic addresses with use_optimistic
     51       0633924 ipv6: sysctl to restrict candidate source addresses
     52   """
     53 
     54   def SetIPv6Sysctl(self, ifname, sysctl, value):
     55     self.SetSysctl("/proc/sys/net/ipv6/conf/%s/%s" % (ifname, sysctl), value)
     56 
     57   def SetDAD(self, ifname, value):
     58     self.SetSysctl("/proc/sys/net/ipv6/conf/%s/accept_dad" % ifname, value)
     59     self.SetSysctl("/proc/sys/net/ipv6/conf/%s/dad_transmits" % ifname, value)
     60 
     61   def SetOptimisticDAD(self, ifname, value):
     62     self.SetSysctl("/proc/sys/net/ipv6/conf/%s/optimistic_dad" % ifname, value)
     63 
     64   def SetUseTempaddrs(self, ifname, value):
     65     self.SetSysctl("/proc/sys/net/ipv6/conf/%s/use_tempaddr" % ifname, value)
     66 
     67   def SetUseOptimistic(self, ifname, value):
     68     self.SetSysctl("/proc/sys/net/ipv6/conf/%s/use_optimistic" % ifname, value)
     69 
     70   def GetSourceIP(self, netid, mode="mark"):
     71     s = self.BuildSocket(6, net_test.UDPSocket, netid, mode)
     72     # Because why not...testing for temporary addresses is a separate thing.
     73     s.setsockopt(IPPROTO_IPV6, IPV6_ADDR_PREFERENCES, IPV6_PREFER_SRC_PUBLIC)
     74 
     75     s.connect((net_test.IPV6_ADDR, 123))
     76     src_addr = s.getsockname()[0]
     77     self.assertTrue(src_addr)
     78     return src_addr
     79 
     80   def assertAddressNotPresent(self, address):
     81     self.assertRaises(IOError, self.iproute.GetAddress, address)
     82 
     83   def assertAddressHasExpectedAttributes(
     84       self, address, expected_ifindex, expected_flags):
     85     ifa_msg = self.iproute.GetAddress(address)[0]
     86     self.assertEquals(AF_INET6 if ":" in address else AF_INET, ifa_msg.family)
     87     self.assertEquals(64, ifa_msg.prefixlen)
     88     self.assertEquals(iproute.RT_SCOPE_UNIVERSE, ifa_msg.scope)
     89     self.assertEquals(expected_ifindex, ifa_msg.index)
     90     self.assertEquals(expected_flags, ifa_msg.flags & expected_flags)
     91 
     92   def AddressIsTentative(self, address):
     93     ifa_msg = self.iproute.GetAddress(address)[0]
     94     return ifa_msg.flags & iproute.IFA_F_TENTATIVE
     95 
     96   def BindToAddress(self, address):
     97     s = net_test.UDPSocket(AF_INET6)
     98     s.bind((address, 0, 0, 0))
     99 
    100   def SendWithSourceAddress(self, address, netid, dest=net_test.IPV6_ADDR):
    101     pktinfo = multinetwork_base.MakePktInfo(6, address, 0)
    102     cmsgs = [(net_test.SOL_IPV6, IPV6_PKTINFO, pktinfo)]
    103     s = self.BuildSocket(6, net_test.UDPSocket, netid, "mark")
    104     return csocket.Sendmsg(s, (dest, 53), "Hello", cmsgs, 0)
    105 
    106   def assertAddressUsable(self, address, netid):
    107     self.BindToAddress(address)
    108     self.SendWithSourceAddress(address, netid)
    109     # No exceptions? Good.
    110 
    111   def assertAddressNotUsable(self, address, netid):
    112     self.assertRaisesErrno(errno.EADDRNOTAVAIL, self.BindToAddress, address)
    113     self.assertRaisesErrno(errno.EINVAL,
    114                            self.SendWithSourceAddress, address, netid)
    115 
    116   def assertAddressSelected(self, address, netid):
    117     self.assertEquals(address, self.GetSourceIP(netid))
    118 
    119   def assertAddressNotSelected(self, address, netid):
    120     self.assertNotEquals(address, self.GetSourceIP(netid))
    121 
    122   def WaitForDad(self, address):
    123     for _ in xrange(20):
    124       if not self.AddressIsTentative(address):
    125         return
    126       time.sleep(0.1)
    127     raise AssertionError("%s did not complete DAD after 2 seconds")
    128 
    129 
    130 class MultiInterfaceSourceAddressSelectionTest(IPv6SourceAddressSelectionTest):
    131 
    132   def setUp(self):
    133     # [0]  Make sure DAD, optimistic DAD, and the use_optimistic option
    134     # are all consistently disabled at the outset.
    135     for netid in self.tuns:
    136       ifname = self.GetInterfaceName(netid)
    137       self.SetDAD(ifname, 0)
    138       self.SetOptimisticDAD(ifname, 0)
    139       self.SetUseTempaddrs(ifname, 0)
    140       self.SetUseOptimistic(ifname, 0)
    141       self.SetIPv6Sysctl(ifname, "use_oif_addrs_only", 0)
    142 
    143     # [1]  Pick an interface on which to test.
    144     self.test_netid = random.choice(self.tuns.keys())
    145     self.test_ip = self.MyAddress(6, self.test_netid)
    146     self.test_ifindex = self.ifindices[self.test_netid]
    147     self.test_ifname = self.GetInterfaceName(self.test_netid)
    148     self.test_lladdr = net_test.GetLinkAddress(self.test_ifname, True)
    149 
    150     # [2]  Delete the test interface's IPv6 address.
    151     self.iproute.DelAddress(self.test_ip, 64, self.test_ifindex)
    152     self.assertAddressNotPresent(self.test_ip)
    153 
    154     self.assertAddressNotUsable(self.test_ip, self.test_netid)
    155     # Verify that the link-local address is not tentative.
    156     self.assertFalse(self.AddressIsTentative(self.test_lladdr))
    157 
    158 
    159 class TentativeAddressTest(MultiInterfaceSourceAddressSelectionTest):
    160 
    161   def testRfc6724Behaviour(self):
    162     # [3]  Get an IPv6 address back, in DAD start-up.
    163     self.SetDAD(self.test_ifname, 1)  # Enable DAD
    164     # Send a RA to start SLAAC and subsequent DAD.
    165     self.SendRA(self.test_netid, 0)
    166     # Get flags and prove tentative-ness.
    167     self.assertAddressHasExpectedAttributes(
    168         self.test_ip, self.test_ifindex, iproute.IFA_F_TENTATIVE)
    169 
    170     # Even though the interface has an IPv6 address, its tentative nature
    171     # prevents it from being selected.
    172     self.assertAddressNotUsable(self.test_ip, self.test_netid)
    173     self.assertAddressNotSelected(self.test_ip, self.test_netid)
    174 
    175     # Busy wait for DAD to complete (should be less than 1 second).
    176     self.WaitForDad(self.test_ip)
    177 
    178     # The test_ip should have completed DAD by now, and should be the
    179     # chosen source address, eligible to bind to, etc.
    180     self.assertAddressUsable(self.test_ip, self.test_netid)
    181     self.assertAddressSelected(self.test_ip, self.test_netid)
    182 
    183 
    184 class OptimisticAddressTest(MultiInterfaceSourceAddressSelectionTest):
    185 
    186   def testRfc6724Behaviour(self):
    187     # [3]  Get an IPv6 address back, in optimistic DAD start-up.
    188     self.SetDAD(self.test_ifname, 1)  # Enable DAD
    189     self.SetOptimisticDAD(self.test_ifname, 1)
    190     # Send a RA to start SLAAC and subsequent DAD.
    191     self.SendRA(self.test_netid, 0)
    192     # Get flags and prove optimism.
    193     self.assertAddressHasExpectedAttributes(
    194         self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
    195 
    196     # Optimistic addresses are usable but are not selected.
    197     if net_test.LinuxVersion() >= (3, 18, 0):
    198       # The version checked in to android kernels <= 3.10 requires the
    199       # use_optimistic sysctl to be turned on.
    200       self.assertAddressUsable(self.test_ip, self.test_netid)
    201     self.assertAddressNotSelected(self.test_ip, self.test_netid)
    202 
    203     # Busy wait for DAD to complete (should be less than 1 second).
    204     self.WaitForDad(self.test_ip)
    205 
    206     # The test_ip should have completed DAD by now, and should be the
    207     # chosen source address.
    208     self.assertAddressUsable(self.test_ip, self.test_netid)
    209     self.assertAddressSelected(self.test_ip, self.test_netid)
    210 
    211 
    212 class OptimisticAddressOkayTest(MultiInterfaceSourceAddressSelectionTest):
    213 
    214   def testModifiedRfc6724Behaviour(self):
    215     # [3]  Get an IPv6 address back, in optimistic DAD start-up.
    216     self.SetDAD(self.test_ifname, 1)  # Enable DAD
    217     self.SetOptimisticDAD(self.test_ifname, 1)
    218     self.SetUseOptimistic(self.test_ifname, 1)
    219     # Send a RA to start SLAAC and subsequent DAD.
    220     self.SendRA(self.test_netid, 0)
    221     # Get flags and prove optimistism.
    222     self.assertAddressHasExpectedAttributes(
    223         self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
    224 
    225     # The interface has an IPv6 address and, despite its optimistic nature,
    226     # the use_optimistic option allows it to be selected.
    227     self.assertAddressUsable(self.test_ip, self.test_netid)
    228     self.assertAddressSelected(self.test_ip, self.test_netid)
    229 
    230 
    231 class ValidBeforeOptimisticTest(MultiInterfaceSourceAddressSelectionTest):
    232 
    233   def testModifiedRfc6724Behaviour(self):
    234     # [3]  Add a valid IPv6 address to this interface and verify it is
    235     # selected as the source address.
    236     preferred_ip = self.IPv6Prefix(self.test_netid) + "cafe"
    237     self.iproute.AddAddress(preferred_ip, 64, self.test_ifindex)
    238     self.assertAddressHasExpectedAttributes(
    239         preferred_ip, self.test_ifindex, iproute.IFA_F_PERMANENT)
    240     self.assertEquals(preferred_ip, self.GetSourceIP(self.test_netid))
    241 
    242     # [4]  Get another IPv6 address, in optimistic DAD start-up.
    243     self.SetDAD(self.test_ifname, 1)  # Enable DAD
    244     self.SetOptimisticDAD(self.test_ifname, 1)
    245     self.SetUseOptimistic(self.test_ifname, 1)
    246     # Send a RA to start SLAAC and subsequent DAD.
    247     self.SendRA(self.test_netid, 0)
    248     # Get flags and prove optimism.
    249     self.assertAddressHasExpectedAttributes(
    250         self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
    251 
    252     # Since the interface has another IPv6 address, the optimistic address
    253     # is not selected--the other, valid address is chosen.
    254     self.assertAddressUsable(self.test_ip, self.test_netid)
    255     self.assertAddressNotSelected(self.test_ip, self.test_netid)
    256     self.assertAddressSelected(preferred_ip, self.test_netid)
    257 
    258 
    259 class DadFailureTest(MultiInterfaceSourceAddressSelectionTest):
    260 
    261   def testDadFailure(self):
    262     # [3]  Get an IPv6 address back, in optimistic DAD start-up.
    263     self.SetDAD(self.test_ifname, 1)  # Enable DAD
    264     self.SetOptimisticDAD(self.test_ifname, 1)
    265     self.SetUseOptimistic(self.test_ifname, 1)
    266     # Send a RA to start SLAAC and subsequent DAD.
    267     self.SendRA(self.test_netid, 0)
    268     # Prove optimism and usability.
    269     self.assertAddressHasExpectedAttributes(
    270         self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
    271     self.assertAddressUsable(self.test_ip, self.test_netid)
    272     self.assertAddressSelected(self.test_ip, self.test_netid)
    273 
    274     # Send a NA for the optimistic address, indicating address conflict
    275     # ("DAD defense").
    276     conflict_macaddr = "02:00:0b:ad:d0:0d"
    277     dad_defense = (scapy.Ether(src=conflict_macaddr, dst="33:33:33:00:00:01") /
    278                    scapy.IPv6(src=self.test_ip, dst="ff02::1") /
    279                    scapy.ICMPv6ND_NA(tgt=self.test_ip, R=0, S=0, O=1) /
    280                    scapy.ICMPv6NDOptDstLLAddr(lladdr=conflict_macaddr))
    281     self.ReceiveEtherPacketOn(self.test_netid, dad_defense)
    282 
    283     # The address should have failed DAD, and therefore no longer be usable.
    284     self.assertAddressNotUsable(self.test_ip, self.test_netid)
    285     self.assertAddressNotSelected(self.test_ip, self.test_netid)
    286 
    287     # TODO(ek): verify that an RTM_DELADDR issued for the DAD-failed address.
    288 
    289 
    290 class NoNsFromOptimisticTest(MultiInterfaceSourceAddressSelectionTest):
    291 
    292   def testSendToOnlinkDestination(self):
    293     # [3]  Get an IPv6 address back, in optimistic DAD start-up.
    294     self.SetDAD(self.test_ifname, 1)  # Enable DAD
    295     self.SetOptimisticDAD(self.test_ifname, 1)
    296     self.SetUseOptimistic(self.test_ifname, 1)
    297     # Send a RA to start SLAAC and subsequent DAD.
    298     self.SendRA(self.test_netid, 0)
    299     # Prove optimism and usability.
    300     self.assertAddressHasExpectedAttributes(
    301         self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
    302     self.assertAddressUsable(self.test_ip, self.test_netid)
    303     self.assertAddressSelected(self.test_ip, self.test_netid)
    304 
    305     # [4]  Send to an on-link destination and observe a Neighbor Solicitation
    306     # packet with a source address that is NOT the optimistic address.
    307     # In this setup, the only usable address is the link-local address.
    308     onlink_dest = self.GetRandomDestination(self.IPv6Prefix(self.test_netid))
    309     self.SendWithSourceAddress(self.test_ip, self.test_netid, onlink_dest)
    310 
    311     if net_test.LinuxVersion() >= (3, 18, 0):
    312       # Older versions will actually choose the optimistic address to
    313       # originate Neighbor Solications (RFC violation).
    314       expected_ns = packets.NS(
    315           self.test_lladdr,
    316           onlink_dest,
    317           self.MyMacAddress(self.test_netid))[1]
    318       self.ExpectPacketOn(self.test_netid, "link-local NS", expected_ns)
    319 
    320 
    321 # TODO(ek): add tests listening for netlink events.
    322 
    323 
    324 class DefaultCandidateSrcAddrsTest(MultiInterfaceSourceAddressSelectionTest):
    325 
    326   def testChoosesNonInterfaceSourceAddress(self):
    327     self.SetIPv6Sysctl(self.test_ifname, "use_oif_addrs_only", 0)
    328     src_ip = self.GetSourceIP(self.test_netid)
    329     self.assertFalse(src_ip in [self.test_ip, self.test_lladdr])
    330     self.assertTrue(src_ip in
    331                     [self.MyAddress(6, netid)
    332                      for netid in self.tuns if netid != self.test_netid])
    333 
    334 
    335 class RestrictedCandidateSrcAddrsTest(MultiInterfaceSourceAddressSelectionTest):
    336 
    337   def testChoosesOnlyInterfaceSourceAddress(self):
    338     self.SetIPv6Sysctl(self.test_ifname, "use_oif_addrs_only", 1)
    339     # self.test_ifname does not have a global IPv6 address, so the only
    340     # candidate is the existing link-local address.
    341     self.assertAddressSelected(self.test_lladdr, self.test_netid)
    342 
    343 
# Run the test cases defined in this module when it is executed as a script.
if __name__ == "__main__":
  unittest.main()
    346