Home | History | Annotate | Download | only in test
      1 #!/usr/bin/python
      2 #
      3 # Copyright 2014 The Android Open Source Project
      4 #
      5 # Licensed under the Apache License, Version 2.0 (the "License");
      6 # you may not use this file except in compliance with the License.
      7 # You may obtain a copy of the License at
      8 #
      9 # http://www.apache.org/licenses/LICENSE-2.0
     10 #
     11 # Unless required by applicable law or agreed to in writing, software
     12 # distributed under the License is distributed on an "AS IS" BASIS,
     13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14 # See the License for the specific language governing permissions and
     15 # limitations under the License.
     16 
     17 import errno
     18 import random
     19 from socket import *  # pylint: disable=wildcard-import
     20 import time
     21 import unittest
     22 
     23 from scapy import all as scapy
     24 
     25 import csocket
     26 import iproute
     27 import multinetwork_base
     28 import packets
     29 import net_test
     30 
# Setsockopt values.
# GetSourceIP passes these to setsockopt(IPPROTO_IPV6, ...) as the option name
# and option value respectively.
# NOTE(review): presumably these mirror the Linux <linux/in6.h> definitions,
# hard-coded here instead of imported -- confirm against the kernel headers.
IPV6_ADDR_PREFERENCES = 72
IPV6_PREFER_SRC_PUBLIC = 0x0002

# The retrans timer is also the DAD timeout. We set this to a value that's not
# so short that DAD will complete before we attempt to use the network, but
# short enough that we don't have to wait too long for DAD to complete.
# Passed as the retranstimer argument of SendRA by the tests below.
# NOTE(review): presumably in milliseconds -- confirm against SendRA.
RETRANS_TIMER = 150
     39 
     40 
     41 class IPv6SourceAddressSelectionTest(multinetwork_base.MultiNetworkBaseTest):
     42   """Test for IPv6 source address selection.
     43 
     44   Relevant kernel commits:
     45     upstream net-next:
     46       7fd2561 net: ipv6: Add a sysctl to make optimistic addresses useful candidates
     47       c58da4c net: ipv6: allow explicitly choosing optimistic addresses
     48       9131f3d ipv6: Do not iterate over all interfaces when finding source address on specific interface.
     49       c0b8da1 ipv6: Fix finding best source address in ipv6_dev_get_saddr().
     50       c15df30 ipv6: Remove unused arguments for __ipv6_dev_get_saddr().
     51       3985e8a ipv6: sysctl to restrict candidate source addresses
     52 
     53     android-3.10:
     54       2ce95507 net: ipv6: Add a sysctl to make optimistic addresses useful candidates
     55       0065bf4 net: ipv6: allow choosing optimistic addresses with use_optimistic
     56       0633924 ipv6: sysctl to restrict candidate source addresses
     57   """
     58 
     59   def SetIPv6Sysctl(self, ifname, sysctl, value):
     60     self.SetSysctl("/proc/sys/net/ipv6/conf/%s/%s" % (ifname, sysctl), value)
     61 
     62   def SetDAD(self, ifname, value):
     63     self.SetSysctl("/proc/sys/net/ipv6/conf/%s/accept_dad" % ifname, value)
     64     self.SetSysctl("/proc/sys/net/ipv6/conf/%s/dad_transmits" % ifname, value)
     65 
     66   def SetOptimisticDAD(self, ifname, value):
     67     self.SetSysctl("/proc/sys/net/ipv6/conf/%s/optimistic_dad" % ifname, value)
     68 
     69   def SetUseTempaddrs(self, ifname, value):
     70     self.SetSysctl("/proc/sys/net/ipv6/conf/%s/use_tempaddr" % ifname, value)
     71 
     72   def SetUseOptimistic(self, ifname, value):
     73     self.SetSysctl("/proc/sys/net/ipv6/conf/%s/use_optimistic" % ifname, value)
     74 
     75   def GetSourceIP(self, netid, mode="mark"):
     76     s = self.BuildSocket(6, net_test.UDPSocket, netid, mode)
     77     # Because why not...testing for temporary addresses is a separate thing.
     78     s.setsockopt(IPPROTO_IPV6, IPV6_ADDR_PREFERENCES, IPV6_PREFER_SRC_PUBLIC)
     79 
     80     s.connect((net_test.IPV6_ADDR, 123))
     81     src_addr = s.getsockname()[0]
     82     self.assertTrue(src_addr)
     83     return src_addr
     84 
     85   def assertAddressNotPresent(self, address):
     86     self.assertRaises(IOError, self.iproute.GetAddress, address)
     87 
     88   def assertAddressHasExpectedAttributes(
     89       self, address, expected_ifindex, expected_flags):
     90     ifa_msg = self.iproute.GetAddress(address)[0]
     91     self.assertEquals(AF_INET6 if ":" in address else AF_INET, ifa_msg.family)
     92     self.assertEquals(64, ifa_msg.prefixlen)
     93     self.assertEquals(iproute.RT_SCOPE_UNIVERSE, ifa_msg.scope)
     94     self.assertEquals(expected_ifindex, ifa_msg.index)
     95     self.assertEquals(expected_flags, ifa_msg.flags & expected_flags)
     96 
     97   def AddressIsTentative(self, address):
     98     ifa_msg = self.iproute.GetAddress(address)[0]
     99     return ifa_msg.flags & iproute.IFA_F_TENTATIVE
    100 
    101   def BindToAddress(self, address):
    102     s = net_test.UDPSocket(AF_INET6)
    103     s.bind((address, 0, 0, 0))
    104 
    105   def SendWithSourceAddress(self, address, netid, dest=net_test.IPV6_ADDR):
    106     pktinfo = multinetwork_base.MakePktInfo(6, address, 0)
    107     cmsgs = [(net_test.SOL_IPV6, IPV6_PKTINFO, pktinfo)]
    108     s = self.BuildSocket(6, net_test.UDPSocket, netid, "mark")
    109     return csocket.Sendmsg(s, (dest, 53), "Hello", cmsgs, 0)
    110 
    111   def assertAddressUsable(self, address, netid):
    112     self.BindToAddress(address)
    113     self.SendWithSourceAddress(address, netid)
    114     # No exceptions? Good.
    115 
    116   def assertAddressNotUsable(self, address, netid):
    117     self.assertRaisesErrno(errno.EADDRNOTAVAIL, self.BindToAddress, address)
    118     self.assertRaisesErrno(errno.EINVAL,
    119                            self.SendWithSourceAddress, address, netid)
    120 
    121   def assertAddressSelected(self, address, netid):
    122     self.assertEquals(address, self.GetSourceIP(netid))
    123 
    124   def assertAddressNotSelected(self, address, netid):
    125     self.assertNotEquals(address, self.GetSourceIP(netid))
    126 
    127   def WaitForDad(self, address):
    128     for _ in xrange(20):
    129       if not self.AddressIsTentative(address):
    130         return
    131       time.sleep(0.1)
    132     raise AssertionError("%s did not complete DAD after 2 seconds")
    133 
    134 
    135 class MultiInterfaceSourceAddressSelectionTest(IPv6SourceAddressSelectionTest):
    136 
    137   def setUp(self):
    138     # [0]  Make sure DAD, optimistic DAD, and the use_optimistic option
    139     # are all consistently disabled at the outset.
    140     for netid in self.tuns:
    141       ifname = self.GetInterfaceName(netid)
    142       self.SetDAD(ifname, 0)
    143       self.SetOptimisticDAD(ifname, 0)
    144       self.SetUseTempaddrs(ifname, 0)
    145       self.SetUseOptimistic(ifname, 0)
    146       self.SetIPv6Sysctl(ifname, "use_oif_addrs_only", 0)
    147 
    148     # [1]  Pick an interface on which to test.
    149     self.test_netid = random.choice(self.tuns.keys())
    150     self.test_ip = self.MyAddress(6, self.test_netid)
    151     self.test_ifindex = self.ifindices[self.test_netid]
    152     self.test_ifname = self.GetInterfaceName(self.test_netid)
    153     self.test_lladdr = net_test.GetLinkAddress(self.test_ifname, True)
    154 
    155     # [2]  Delete the test interface's IPv6 address.
    156     self.iproute.DelAddress(self.test_ip, 64, self.test_ifindex)
    157     self.assertAddressNotPresent(self.test_ip)
    158 
    159     self.assertAddressNotUsable(self.test_ip, self.test_netid)
    160     # Verify that the link-local address is not tentative.
    161     # Even though we disable DAD above, without this change occasionally the
    162     # test fails. This might be due to us disabling DAD only after the
    163     # link-local address is generated.
    164     self.WaitForDad(self.test_lladdr)
    165 
    166 
    167 class TentativeAddressTest(MultiInterfaceSourceAddressSelectionTest):
    168 
    169   def testRfc6724Behaviour(self):
    170     # [3]  Get an IPv6 address back, in DAD start-up.
    171     self.SetDAD(self.test_ifname, 1)  # Enable DAD
    172     # Send a RA to start SLAAC and subsequent DAD.
    173     self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER)
    174     # Get flags and prove tentative-ness.
    175     self.assertAddressHasExpectedAttributes(
    176         self.test_ip, self.test_ifindex, iproute.IFA_F_TENTATIVE)
    177 
    178     # Even though the interface has an IPv6 address, its tentative nature
    179     # prevents it from being selected.
    180     self.assertAddressNotUsable(self.test_ip, self.test_netid)
    181     self.assertAddressNotSelected(self.test_ip, self.test_netid)
    182 
    183     # Busy wait for DAD to complete (should be less than 1 second).
    184     self.WaitForDad(self.test_ip)
    185 
    186     # The test_ip should have completed DAD by now, and should be the
    187     # chosen source address, eligible to bind to, etc.
    188     self.assertAddressUsable(self.test_ip, self.test_netid)
    189     self.assertAddressSelected(self.test_ip, self.test_netid)
    190 
    191 
    192 class OptimisticAddressTest(MultiInterfaceSourceAddressSelectionTest):
    193 
    194   def testRfc6724Behaviour(self):
    195     # [3]  Get an IPv6 address back, in optimistic DAD start-up.
    196     self.SetDAD(self.test_ifname, 1)  # Enable DAD
    197     self.SetOptimisticDAD(self.test_ifname, 1)
    198     # Send a RA to start SLAAC and subsequent DAD.
    199     self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER)
    200     # Get flags and prove optimism.
    201     self.assertAddressHasExpectedAttributes(
    202         self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
    203 
    204     # Optimistic addresses are usable but are not selected.
    205     if net_test.LINUX_VERSION >= (3, 18, 0):
    206       # The version checked in to android kernels <= 3.10 requires the
    207       # use_optimistic sysctl to be turned on.
    208       self.assertAddressUsable(self.test_ip, self.test_netid)
    209     self.assertAddressNotSelected(self.test_ip, self.test_netid)
    210 
    211     # Busy wait for DAD to complete (should be less than 1 second).
    212     self.WaitForDad(self.test_ip)
    213 
    214     # The test_ip should have completed DAD by now, and should be the
    215     # chosen source address.
    216     self.assertAddressUsable(self.test_ip, self.test_netid)
    217     self.assertAddressSelected(self.test_ip, self.test_netid)
    218 
    219 
    220 class OptimisticAddressOkayTest(MultiInterfaceSourceAddressSelectionTest):
    221 
    222   def testModifiedRfc6724Behaviour(self):
    223     # [3]  Get an IPv6 address back, in optimistic DAD start-up.
    224     self.SetDAD(self.test_ifname, 1)  # Enable DAD
    225     self.SetOptimisticDAD(self.test_ifname, 1)
    226     self.SetUseOptimistic(self.test_ifname, 1)
    227     # Send a RA to start SLAAC and subsequent DAD.
    228     self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER)
    229     # Get flags and prove optimistism.
    230     self.assertAddressHasExpectedAttributes(
    231         self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
    232 
    233     # The interface has an IPv6 address and, despite its optimistic nature,
    234     # the use_optimistic option allows it to be selected.
    235     self.assertAddressUsable(self.test_ip, self.test_netid)
    236     self.assertAddressSelected(self.test_ip, self.test_netid)
    237 
    238 
    239 class ValidBeforeOptimisticTest(MultiInterfaceSourceAddressSelectionTest):
    240 
    241   def testModifiedRfc6724Behaviour(self):
    242     # [3]  Add a valid IPv6 address to this interface and verify it is
    243     # selected as the source address.
    244     preferred_ip = self.OnlinkPrefix(6, self.test_netid) + "cafe"
    245     self.iproute.AddAddress(preferred_ip, 64, self.test_ifindex)
    246     self.assertAddressHasExpectedAttributes(
    247         preferred_ip, self.test_ifindex, iproute.IFA_F_PERMANENT)
    248     self.assertEquals(preferred_ip, self.GetSourceIP(self.test_netid))
    249 
    250     # [4]  Get another IPv6 address, in optimistic DAD start-up.
    251     self.SetDAD(self.test_ifname, 1)  # Enable DAD
    252     self.SetOptimisticDAD(self.test_ifname, 1)
    253     self.SetUseOptimistic(self.test_ifname, 1)
    254     # Send a RA to start SLAAC and subsequent DAD.
    255     self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER)
    256     # Get flags and prove optimism.
    257     self.assertAddressHasExpectedAttributes(
    258         self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
    259 
    260     # Since the interface has another IPv6 address, the optimistic address
    261     # is not selected--the other, valid address is chosen.
    262     self.assertAddressUsable(self.test_ip, self.test_netid)
    263     self.assertAddressNotSelected(self.test_ip, self.test_netid)
    264     self.assertAddressSelected(preferred_ip, self.test_netid)
    265 
    266 
    267 class DadFailureTest(MultiInterfaceSourceAddressSelectionTest):
    268 
    269   def testDadFailure(self):
    270     # [3]  Get an IPv6 address back, in optimistic DAD start-up.
    271     self.SetDAD(self.test_ifname, 1)  # Enable DAD
    272     self.SetOptimisticDAD(self.test_ifname, 1)
    273     self.SetUseOptimistic(self.test_ifname, 1)
    274     # Send a RA to start SLAAC and subsequent DAD.
    275     self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER)
    276     # Prove optimism and usability.
    277     self.assertAddressHasExpectedAttributes(
    278         self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
    279     self.assertAddressUsable(self.test_ip, self.test_netid)
    280     self.assertAddressSelected(self.test_ip, self.test_netid)
    281 
    282     # Send a NA for the optimistic address, indicating address conflict
    283     # ("DAD defense").
    284     conflict_macaddr = "02:00:0b:ad:d0:0d"
    285     dad_defense = (scapy.Ether(src=conflict_macaddr, dst="33:33:33:00:00:01") /
    286                    scapy.IPv6(src=self.test_ip, dst="ff02::1") /
    287                    scapy.ICMPv6ND_NA(tgt=self.test_ip, R=0, S=0, O=1) /
    288                    scapy.ICMPv6NDOptDstLLAddr(lladdr=conflict_macaddr))
    289     self.ReceiveEtherPacketOn(self.test_netid, dad_defense)
    290 
    291     # The address should have failed DAD, and therefore no longer be usable.
    292     self.assertAddressNotUsable(self.test_ip, self.test_netid)
    293     self.assertAddressNotSelected(self.test_ip, self.test_netid)
    294 
    295     # TODO(ek): verify that an RTM_DELADDR issued for the DAD-failed address.
    296 
    297 
    298 class NoNsFromOptimisticTest(MultiInterfaceSourceAddressSelectionTest):
    299 
    300   def testSendToOnlinkDestination(self):
    301     # [3]  Get an IPv6 address back, in optimistic DAD start-up.
    302     self.SetDAD(self.test_ifname, 1)  # Enable DAD
    303     self.SetOptimisticDAD(self.test_ifname, 1)
    304     self.SetUseOptimistic(self.test_ifname, 1)
    305     # Send a RA to start SLAAC and subsequent DAD.
    306     self.SendRA(self.test_netid, retranstimer=RETRANS_TIMER)
    307     # Prove optimism and usability.
    308     self.assertAddressHasExpectedAttributes(
    309         self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
    310     self.assertAddressUsable(self.test_ip, self.test_netid)
    311     self.assertAddressSelected(self.test_ip, self.test_netid)
    312 
    313     # [4]  Send to an on-link destination and observe a Neighbor Solicitation
    314     # packet with a source address that is NOT the optimistic address.
    315     # In this setup, the only usable address is the link-local address.
    316     onlink_dest = self.GetRandomDestination(
    317         self.OnlinkPrefix(6, self.test_netid))
    318     self.SendWithSourceAddress(self.test_ip, self.test_netid, onlink_dest)
    319 
    320     if net_test.LINUX_VERSION >= (3, 18, 0):
    321       # Older versions will actually choose the optimistic address to
    322       # originate Neighbor Solications (RFC violation).
    323       expected_ns = packets.NS(
    324           self.test_lladdr,
    325           onlink_dest,
    326           self.MyMacAddress(self.test_netid))[1]
    327       self.ExpectPacketOn(self.test_netid, "link-local NS", expected_ns)
    328 
    329 
    330 # TODO(ek): add tests listening for netlink events.
    331 
    332 
    333 class DefaultCandidateSrcAddrsTest(MultiInterfaceSourceAddressSelectionTest):
    334 
    335   def testChoosesNonInterfaceSourceAddress(self):
    336     self.SetIPv6Sysctl(self.test_ifname, "use_oif_addrs_only", 0)
    337     src_ip = self.GetSourceIP(self.test_netid)
    338     self.assertFalse(src_ip in [self.test_ip, self.test_lladdr])
    339     self.assertTrue(src_ip in
    340                     [self.MyAddress(6, netid)
    341                      for netid in self.tuns if netid != self.test_netid])
    342 
    343 
    344 class RestrictedCandidateSrcAddrsTest(MultiInterfaceSourceAddressSelectionTest):
    345 
    346   def testChoosesOnlyInterfaceSourceAddress(self):
    347     self.SetIPv6Sysctl(self.test_ifname, "use_oif_addrs_only", 1)
    348     # self.test_ifname does not have a global IPv6 address, so the only
    349     # candidate is the existing link-local address.
    350     self.assertAddressSelected(self.test_lladdr, self.test_netid)
    351 
    352 
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
  unittest.main()
    355