Home | History | Annotate | Download | only in test
      1 #!/usr/bin/python
      2 #
      3 # Copyright 2015 The Android Open Source Project
      4 #
      5 # Licensed under the Apache License, Version 2.0 (the "License");
      6 # you may not use this file except in compliance with the License.
      7 # You may obtain a copy of the License at
      8 #
      9 # http://www.apache.org/licenses/LICENSE-2.0
     10 #
     11 # Unless required by applicable law or agreed to in writing, software
     12 # distributed under the License is distributed on an "AS IS" BASIS,
     13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14 # See the License for the specific language governing permissions and
     15 # limitations under the License.
     16 
     17 import errno
     18 import random
     19 from socket import *  # pylint: disable=wildcard-import
     20 import time
     21 import unittest
     22 
     23 from scapy import all as scapy
     24 
     25 import csocket
     26 import multinetwork_base
     27 import net_test
     28 
     29 
     30 RTMGRP_NEIGH = 4
     31 
     32 NUD_INCOMPLETE = 0x01
     33 NUD_REACHABLE = 0x02
     34 NUD_STALE = 0x04
     35 NUD_DELAY = 0x08
     36 NUD_PROBE = 0x10
     37 NUD_FAILED = 0x20
     38 NUD_PERMANENT = 0x80
     39 
     40 
     41 # TODO: Support IPv4.
     42 class NeighbourTest(multinetwork_base.MultiNetworkBaseTest):
     43 
     44   # Set a 500-ms retrans timer so we can test for ND retransmits without
     45   # waiting too long. Apparently this cannot go below 500ms.
     46   RETRANS_TIME_MS = 500
     47 
     48   # This can only be in seconds, so 1000 is the minimum.
     49   DELAY_TIME_MS = 1000
     50 
     51   # Unfortunately, this must be above the delay timer or the kernel ND code will
     52   # not behave correctly (e.g., go straight from REACHABLE into DELAY). This is
     53   # is fuzzed by the kernel from 0.5x to 1.5x of its value, so we need a value
     54   # that's 2x the delay timer.
     55   BASE_REACHABLE_TIME_MS = 2 * DELAY_TIME_MS
     56   MAX_REACHABLE_TIME_MS = 1.5 * BASE_REACHABLE_TIME_MS
     57 
     58   @classmethod
     59   def setUpClass(cls):
     60     super(NeighbourTest, cls).setUpClass()
     61     for netid in cls.tuns:
     62       iface = cls.GetInterfaceName(netid)
     63       # This can't be set in an RA.
     64       cls.SetSysctl(
     65           "/proc/sys/net/ipv6/neigh/%s/delay_first_probe_time" % iface,
     66           cls.DELAY_TIME_MS / 1000)
     67 
     68   def setUp(self):
     69     super(NeighbourTest, self).setUp()
     70 
     71     for netid in self.tuns:
     72       # Clear the ND cache entries for all routers, so each test starts with
     73       # the IPv6 default router in state STALE.
     74       addr = self._RouterAddress(netid, 6)
     75       ifindex = self.ifindices[netid]
     76       self.iproute.UpdateNeighbour(6, addr, None, ifindex, NUD_FAILED)
     77 
     78       # Configure IPv6 by sending an RA.
     79       self.SendRA(netid,
     80                   retranstimer=self.RETRANS_TIME_MS,
     81                   reachabletime=self.BASE_REACHABLE_TIME_MS)
     82 
     83     self.sock = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE)
     84     self.sock.bind((0, RTMGRP_NEIGH))
     85     net_test.SetNonBlocking(self.sock)
     86 
     87     self.netid = random.choice(self.tuns.keys())
     88     self.ifindex = self.ifindices[self.netid]
     89 
     90   def GetNeighbour(self, addr):
     91     version = csocket.AddressVersion(addr)
     92     for msg, args in self.iproute.DumpNeighbours(version):
     93       if args["NDA_DST"] == addr:
     94         return msg, args
     95 
     96   def GetNdEntry(self, addr):
     97     return self.GetNeighbour(addr)
     98 
     99   def CheckNoNdEvents(self):
    100     self.assertRaisesErrno(errno.EAGAIN, self.sock.recvfrom, 4096, MSG_PEEK)
    101 
    102   def assertNeighbourState(self, state, addr):
    103     self.assertEquals(state, self.GetNdEntry(addr)[0].state)
    104 
    105   def assertNeighbourAttr(self, addr, name, value):
    106     self.assertEquals(value, self.GetNdEntry(addr)[1][name])
    107 
    108   def ExpectNeighbourNotification(self, addr, state, attrs=None):
    109     msg = self.sock.recv(4096)
    110     msg, actual_attrs = self.iproute.ParseNeighbourMessage(msg)
    111     self.assertEquals(addr, actual_attrs["NDA_DST"])
    112     self.assertEquals(state, msg.state)
    113     if attrs:
    114       for name in attrs:
    115         self.assertEquals(attrs[name], actual_attrs[name])
    116 
    117   def ExpectProbe(self, is_unicast, addr):
    118     version = csocket.AddressVersion(addr)
    119     if version == 6:
    120       llsrc = self.MyMacAddress(self.netid)
    121       if is_unicast:
    122         src = self.MyLinkLocalAddress(self.netid)
    123         dst = addr
    124       else:
    125         solicited = inet_pton(AF_INET6, addr)
    126         last3bytes = tuple([ord(b) for b in solicited[-3:]])
    127         dst = "ff02::1:ff%02x:%02x%02x" % last3bytes
    128         src = self.MyAddress(6, self.netid)
    129       expected = (
    130           scapy.IPv6(src=src, dst=dst) /
    131           scapy.ICMPv6ND_NS(tgt=addr) /
    132           scapy.ICMPv6NDOptSrcLLAddr(lladdr=llsrc)
    133       )
    134       msg = "%s probe" % ("Unicast" if is_unicast else "Multicast")
    135       self.ExpectPacketOn(self.netid, msg, expected)
    136     else:
    137       raise NotImplementedError
    138 
    139   def ExpectUnicastProbe(self, addr):
    140     self.ExpectProbe(True, addr)
    141 
    142   def ExpectMulticastNS(self, addr):
    143     self.ExpectProbe(False, addr)
    144 
    145   def ReceiveUnicastAdvertisement(self, addr, mac, srcaddr=None, dstaddr=None,
    146                                   S=1, O=0, R=1):
    147     version = csocket.AddressVersion(addr)
    148     if srcaddr is None:
    149       srcaddr = addr
    150     if dstaddr is None:
    151       dstaddr = self.MyLinkLocalAddress(self.netid)
    152     if version == 6:
    153       packet = (
    154           scapy.Ether(src=mac, dst=self.MyMacAddress(self.netid)) /
    155           scapy.IPv6(src=srcaddr, dst=dstaddr) /
    156           scapy.ICMPv6ND_NA(tgt=addr, S=S, O=O, R=R) /
    157           scapy.ICMPv6NDOptDstLLAddr(lladdr=mac)
    158       )
    159       self.ReceiveEtherPacketOn(self.netid, packet)
    160     else:
    161       raise NotImplementedError
    162 
    163   def MonitorSleepMs(self, interval, addr):
    164     slept = 0
    165     while slept < interval:
    166       sleep_ms = min(100, interval - slept)
    167       time.sleep(sleep_ms / 1000.0)
    168       slept += sleep_ms
    169       print self.GetNdEntry(addr)
    170 
    171   def MonitorSleep(self, intervalseconds, addr):
    172     self.MonitorSleepMs(intervalseconds * 1000, addr)
    173 
    174   def SleepMs(self, ms):
    175     time.sleep(ms / 1000.0)
    176 
    177   def testNotifications(self):
    178     """Tests neighbour notifications.
    179 
    180     Relevant kernel commits:
    181       upstream net-next:
    182         765c9c6 neigh: Better handling of transition to NUD_PROBE state
    183         53385d2 neigh: Netlink notification for administrative NUD state change
    184           (only checked on kernel v3.13+, not on v3.10)
    185 
    186       android-3.10:
    187         e4a6d6b neigh: Better handling of transition to NUD_PROBE state
    188 
    189       android-3.18:
    190         2011e72 neigh: Better handling of transition to NUD_PROBE state
    191     """
    192     router4 = self._RouterAddress(self.netid, 4)
    193     router6 = self._RouterAddress(self.netid, 6)
    194     self.assertNeighbourState(NUD_PERMANENT, router4)
    195     self.assertNeighbourState(NUD_STALE, router6)
    196 
    197     # Send a packet and check that we go into DELAY.
    198     routing_mode = random.choice(["mark", "oif", "uid"])
    199     s = self.BuildSocket(6, net_test.UDPSocket, self.netid, routing_mode)
    200     s.connect((net_test.IPV6_ADDR, 53))
    201     s.send(net_test.UDP_PAYLOAD)
    202     self.assertNeighbourState(NUD_DELAY, router6)
    203 
    204     # Wait for the probe interval, then check that we're in PROBE, and that the
    205     # kernel has notified us.
    206     self.SleepMs(self.DELAY_TIME_MS * 1.1)
    207     self.ExpectNeighbourNotification(router6, NUD_PROBE)
    208     self.assertNeighbourState(NUD_PROBE, router6)
    209     self.ExpectUnicastProbe(router6)
    210 
    211     # Respond to the NS and verify we're in REACHABLE again.
    212     self.ReceiveUnicastAdvertisement(router6, self.RouterMacAddress(self.netid))
    213     self.assertNeighbourState(NUD_REACHABLE, router6)
    214     if net_test.LINUX_VERSION >= (3, 13, 0):
    215       # commit 53385d2 (v3.13) "neigh: Netlink notification for administrative
    216       # NUD state change" produces notifications for NUD_REACHABLE, but these
    217       # are not generated on earlier kernels.
    218       self.ExpectNeighbourNotification(router6, NUD_REACHABLE)
    219 
    220     # Wait until the reachable time has passed, and verify we're in STALE.
    221     self.SleepMs(self.MAX_REACHABLE_TIME_MS * 1.2)
    222     self.assertNeighbourState(NUD_STALE, router6)
    223     self.ExpectNeighbourNotification(router6, NUD_STALE)
    224 
    225     # Send a packet, and verify we go into DELAY and then to PROBE.
    226     s.send(net_test.UDP_PAYLOAD)
    227     self.assertNeighbourState(NUD_DELAY, router6)
    228     self.SleepMs(self.DELAY_TIME_MS * 1.1)
    229     self.assertNeighbourState(NUD_PROBE, router6)
    230     self.ExpectNeighbourNotification(router6, NUD_PROBE)
    231 
    232     # Wait for the probes to time out, and expect a FAILED notification.
    233     self.assertNeighbourAttr(router6, "NDA_PROBES", 1)
    234     self.ExpectUnicastProbe(router6)
    235 
    236     self.SleepMs(self.RETRANS_TIME_MS)
    237     self.ExpectUnicastProbe(router6)
    238     self.assertNeighbourAttr(router6, "NDA_PROBES", 2)
    239 
    240     self.SleepMs(self.RETRANS_TIME_MS)
    241     self.ExpectUnicastProbe(router6)
    242     self.assertNeighbourAttr(router6, "NDA_PROBES", 3)
    243 
    244     self.SleepMs(self.RETRANS_TIME_MS)
    245     self.assertNeighbourState(NUD_FAILED, router6)
    246     self.ExpectNeighbourNotification(router6, NUD_FAILED, {"NDA_PROBES": 3})
    247 
    248   def testRepeatedProbes(self):
    249     router4 = self._RouterAddress(self.netid, 4)
    250     router6 = self._RouterAddress(self.netid, 6)
    251     routermac = self.RouterMacAddress(self.netid)
    252     self.assertNeighbourState(NUD_PERMANENT, router4)
    253     self.assertNeighbourState(NUD_STALE, router6)
    254 
    255     def ForceProbe(addr, mac):
    256       self.iproute.UpdateNeighbour(6, addr, None, self.ifindex, NUD_PROBE)
    257       self.assertNeighbourState(NUD_PROBE, addr)
    258       self.SleepMs(1)  # TODO: Why is this necessary?
    259       self.assertNeighbourState(NUD_PROBE, addr)
    260       self.ExpectUnicastProbe(addr)
    261       self.ReceiveUnicastAdvertisement(addr, mac)
    262       self.assertNeighbourState(NUD_REACHABLE, addr)
    263 
    264     for _ in xrange(5):
    265       ForceProbe(router6, routermac)
    266 
    267   def testIsRouterFlag(self):
    268     router6 = self._RouterAddress(self.netid, 6)
    269     self.assertNeighbourState(NUD_STALE, router6)
    270 
    271     # Get into FAILED.
    272     ifindex = self.ifindices[self.netid]
    273     self.iproute.UpdateNeighbour(6, router6, None, ifindex, NUD_FAILED)
    274     self.ExpectNeighbourNotification(router6, NUD_FAILED)
    275     self.assertNeighbourState(NUD_FAILED, router6)
    276 
    277     time.sleep(1)
    278 
    279     # Send another packet and expect a multicast NS.
    280     routing_mode = random.choice(["mark", "oif", "uid"])
    281     s = self.BuildSocket(6, net_test.UDPSocket, self.netid, routing_mode)
    282     s.connect((net_test.IPV6_ADDR, 53))
    283     s.send(net_test.UDP_PAYLOAD)
    284     self.ExpectMulticastNS(router6)
    285 
    286     # Receive a unicast NA with the R flag set to 0.
    287     self.ReceiveUnicastAdvertisement(router6, self.RouterMacAddress(self.netid),
    288                                      srcaddr=self._RouterAddress(self.netid, 6),
    289                                      dstaddr=self.MyAddress(6, self.netid),
    290                                      S=1, O=0, R=0)
    291 
    292     # Expect that this takes us to REACHABLE.
    293     self.ExpectNeighbourNotification(router6, NUD_REACHABLE)
    294     self.assertNeighbourState(NUD_REACHABLE, router6)
    295 
    296 
    297 if __name__ == "__main__":
    298   unittest.main()
    299