#!/usr/bin/python
#
# Copyright 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for IPv6 neighbour cache state transitions and notifications."""

import errno
import random
from socket import *  # pylint: disable=wildcard-import
import time
import unittest

from scapy import all as scapy

import multinetwork_base
import net_test


# Netlink multicast group used to subscribe to neighbour notifications.
RTMGRP_NEIGH = 4

# Neighbour cache entry states, as reported in neighbour messages.
NUD_INCOMPLETE = 0x01
NUD_REACHABLE = 0x02
NUD_STALE = 0x04
NUD_DELAY = 0x08
NUD_PROBE = 0x10
NUD_FAILED = 0x20
NUD_PERMANENT = 0x80


# TODO: Support IPv4.
class NeighbourTest(multinetwork_base.MultiNetworkBaseTest):
  """Checks neighbour cache states, probes and netlink notifications.

  Each test picks one of the test networks at random, drives the router's
  IPv6 neighbour entry through various states, and checks both the state
  reported by a neighbour dump and the notifications received on a netlink
  socket bound to RTMGRP_NEIGH.
  """

  # Set a 500-ms retrans timer so we can test for ND retransmits without
  # waiting too long. Apparently this cannot go below 500ms.
  RETRANS_TIME_MS = 500

  # This can only be in seconds, so 1000 is the minimum.
  DELAY_TIME_MS = 1000

  # Unfortunately, this must be above the delay timer or the kernel ND code
  # will not behave correctly (e.g., go straight from REACHABLE into DELAY).
  # This is fuzzed by the kernel from 0.5x to 1.5x of its value, so we need a
  # value that's 2x the delay timer.
  BASE_REACHABLE_TIME_MS = 2 * DELAY_TIME_MS
  MAX_REACHABLE_TIME_MS = 1.5 * BASE_REACHABLE_TIME_MS

  @classmethod
  def setUpClass(cls):
    """Sets delay_first_probe_time on every test interface."""
    super(NeighbourTest, cls).setUpClass()
    for netid in cls.tuns:
      iface = cls.GetInterfaceName(netid)
      # This can't be set in an RA. Floor division keeps the value an
      # integer number of seconds regardless of the interpreter's "/".
      cls.SetSysctl(
          "/proc/sys/net/ipv6/neigh/%s/delay_first_probe_time" % iface,
          cls.DELAY_TIME_MS // 1000)

  def setUp(self):
    """Resets router ND entries and opens the notification socket."""
    super(NeighbourTest, self).setUp()

    for netid in self.tuns:
      # Clear the ND cache entries for all routers, so each test starts with
      # the IPv6 default router in state STALE.
      addr = self._RouterAddress(netid, 6)
      ifindex = self.ifindices[netid]
      self.iproute.UpdateNeighbour(6, addr, None, ifindex, NUD_FAILED)

      # Configure IPv6 by sending an RA.
      self.SendRA(netid,
                  retranstimer=self.RETRANS_TIME_MS,
                  reachabletime=self.BASE_REACHABLE_TIME_MS)

    self.sock = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE)
    # Close the socket when the test finishes (even on failure) so that a
    # file descriptor is not leaked for every test method.
    self.addCleanup(self.sock.close)
    self.sock.bind((0, RTMGRP_NEIGH))
    net_test.SetNonBlocking(self.sock)

    self.netid = random.choice(list(self.tuns.keys()))
    self.ifindex = self.ifindices[self.netid]

  def GetNeighbour(self, addr):
    """Returns the (msg, attrs) dump entry whose NDA_DST is addr, or None."""
    version = 6 if ":" in addr else 4
    for msg, args in self.iproute.DumpNeighbours(version):
      if args["NDA_DST"] == addr:
        return msg, args
    return None

  def GetNdEntry(self, addr):
    """Alias for GetNeighbour."""
    return self.GetNeighbour(addr)

  def _RequireNdEntry(self, addr):
    """Returns the neighbour entry for addr, failing the test if absent."""
    entry = self.GetNdEntry(addr)
    self.assertIsNotNone(entry, "No neighbour cache entry for %s" % addr)
    return entry

  def CheckNoNdEvents(self):
    """Asserts that no notification is pending on the netlink socket."""
    self.assertRaisesErrno(errno.EAGAIN, self.sock.recvfrom, 4096, MSG_PEEK)

  def assertNeighbourState(self, state, addr):
    """Asserts that the neighbour entry for addr is in the given state."""
    msg, _ = self._RequireNdEntry(addr)
    self.assertEqual(state, msg.state)

  def assertNeighbourAttr(self, addr, name, value):
    """Asserts that attribute `name` of addr's entry equals value."""
    _, attrs = self._RequireNdEntry(addr)
    self.assertEqual(value, attrs[name])

  def ExpectNeighbourNotification(self, addr, state, attrs=None):
    """Reads one notification and checks its address, state and attributes.

    Args:
      addr: Expected NDA_DST of the notification.
      state: Expected neighbour state.
      attrs: Optional dict of attribute name to expected value; only the
        listed attributes are compared.
    """
    data = self.sock.recv(4096)
    msg, actual_attrs = self.iproute.ParseNeighbourMessage(data)
    self.assertEqual(addr, actual_attrs["NDA_DST"])
    self.assertEqual(state, msg.state)
    if attrs:
      for name, expected in attrs.items():
        self.assertEqual(expected, actual_attrs[name])

  def ExpectProbe(self, is_unicast, addr):
    """Expects a neighbour solicitation for addr on the test network.

    Args:
      is_unicast: If true, expect an NS sent from our link-local address
        directly to addr; otherwise expect one sent from our global address
        to addr's solicited-node multicast address.
      addr: The address being probed.

    Raises:
      NotImplementedError: addr is not an IPv6 address.
    """
    version = 6 if ":" in addr else 4
    if version != 6:
      raise NotImplementedError

    llsrc = self.MyMacAddress(self.netid)
    if is_unicast:
      src = self.MyLinkLocalAddress(self.netid)
      dst = addr
    else:
      # The solicited-node multicast address embeds the low 24 bits of the
      # target. bytearray() yields integers for both str and bytes input.
      solicited = inet_pton(AF_INET6, addr)
      last3bytes = tuple(bytearray(solicited[-3:]))
      dst = "ff02::1:ff%02x:%02x%02x" % last3bytes
      src = self.MyAddress(6, self.netid)
    expected = (
        scapy.IPv6(src=src, dst=dst) /
        scapy.ICMPv6ND_NS(tgt=addr) /
        scapy.ICMPv6NDOptSrcLLAddr(lladdr=llsrc)
    )
    msg = "%s probe" % ("Unicast" if is_unicast else "Multicast")
    self.ExpectPacketOn(self.netid, msg, expected)

  def ExpectUnicastProbe(self, addr):
    """Expects a unicast NS for addr."""
    self.ExpectProbe(True, addr)

  def ExpectMulticastNS(self, addr):
    """Expects a multicast NS for addr."""
    self.ExpectProbe(False, addr)

  def ReceiveUnicastAdvertisement(self, addr, mac, srcaddr=None, dstaddr=None,
                                  S=1, O=0, R=1):
    """Injects a neighbour advertisement for addr on the test network.

    Args:
      addr: Target address of the NA.
      mac: Link-layer address used as the Ethernet source and in the
        destination link-layer address option.
      srcaddr: IPv6 source address; defaults to addr.
      dstaddr: IPv6 destination address; defaults to our link-local address.
      S: Value of the NA's S flag.
      O: Value of the NA's O flag.
      R: Value of the NA's R flag.

    Raises:
      NotImplementedError: addr is not an IPv6 address.
    """
    version = 6 if ":" in addr else 4
    if srcaddr is None:
      srcaddr = addr
    if dstaddr is None:
      dstaddr = self.MyLinkLocalAddress(self.netid)
    if version != 6:
      raise NotImplementedError

    packet = (
        scapy.Ether(src=mac, dst=self.MyMacAddress(self.netid)) /
        scapy.IPv6(src=srcaddr, dst=dstaddr) /
        scapy.ICMPv6ND_NA(tgt=addr, S=S, O=O, R=R) /
        scapy.ICMPv6NDOptDstLLAddr(lladdr=mac)
    )
    self.ReceiveEtherPacketOn(self.netid, packet)

  def MonitorSleepMs(self, interval, addr):
    """Sleeps for interval ms, printing addr's entry about every 100 ms."""
    slept = 0
    while slept < interval:
      sleep_ms = min(100, interval - slept)
      time.sleep(sleep_ms / 1000.0)
      slept += sleep_ms
      # Single-argument parenthesised form prints the same text whether
      # print is a statement or a function.
      print(self.GetNdEntry(addr))

  def MonitorSleep(self, intervalseconds, addr):
    """Like MonitorSleepMs, but the interval is given in seconds."""
    self.MonitorSleepMs(intervalseconds * 1000, addr)

  def SleepMs(self, ms):
    """Sleeps for the given number of milliseconds."""
    time.sleep(ms / 1000.0)

  def testNotifications(self):
    """Tests neighbour notifications.

    Relevant kernel commits:
      upstream net-next:
        765c9c6 neigh: Better handling of transition to NUD_PROBE state
        53385d2 neigh: Netlink notification for administrative NUD state change
          (only checked on kernel v3.13+, not on v3.10)

      android-3.10:
        e4a6d6b neigh: Better handling of transition to NUD_PROBE state

      android-3.18:
        2011e72 neigh: Better handling of transition to NUD_PROBE state
    """
    router4 = self._RouterAddress(self.netid, 4)
    router6 = self._RouterAddress(self.netid, 6)
    self.assertNeighbourState(NUD_PERMANENT, router4)
    self.assertNeighbourState(NUD_STALE, router6)

    # Send a packet and check that we go into DELAY.
    routing_mode = random.choice(["mark", "oif", "uid"])
    s = self.BuildSocket(6, net_test.UDPSocket, self.netid, routing_mode)
    s.connect((net_test.IPV6_ADDR, 53))
    s.send(net_test.UDP_PAYLOAD)
    self.assertNeighbourState(NUD_DELAY, router6)

    # Wait for the probe interval, then check that we're in PROBE, and that the
    # kernel has notified us.
    self.SleepMs(self.DELAY_TIME_MS * 1.1)
    self.ExpectNeighbourNotification(router6, NUD_PROBE)
    self.assertNeighbourState(NUD_PROBE, router6)
    self.ExpectUnicastProbe(router6)

    # Respond to the NS and verify we're in REACHABLE again.
    self.ReceiveUnicastAdvertisement(router6, self.RouterMacAddress(self.netid))
    self.assertNeighbourState(NUD_REACHABLE, router6)
    if net_test.LINUX_VERSION >= (3, 13, 0):
      # commit 53385d2 (v3.13) "neigh: Netlink notification for administrative
      # NUD state change" produces notifications for NUD_REACHABLE, but these
      # are not generated on earlier kernels.
      self.ExpectNeighbourNotification(router6, NUD_REACHABLE)

    # Wait until the reachable time has passed, and verify we're in STALE.
    self.SleepMs(self.MAX_REACHABLE_TIME_MS * 1.2)
    self.assertNeighbourState(NUD_STALE, router6)
    self.ExpectNeighbourNotification(router6, NUD_STALE)

    # Send a packet, and verify we go into DELAY and then to PROBE.
    s.send(net_test.UDP_PAYLOAD)
    self.assertNeighbourState(NUD_DELAY, router6)
    self.SleepMs(self.DELAY_TIME_MS * 1.1)
    self.assertNeighbourState(NUD_PROBE, router6)
    self.ExpectNeighbourNotification(router6, NUD_PROBE)

    # Wait for the probes to time out, and expect a FAILED notification.
    self.assertNeighbourAttr(router6, "NDA_PROBES", 1)
    self.ExpectUnicastProbe(router6)

    self.SleepMs(self.RETRANS_TIME_MS)
    self.ExpectUnicastProbe(router6)
    self.assertNeighbourAttr(router6, "NDA_PROBES", 2)

    self.SleepMs(self.RETRANS_TIME_MS)
    self.ExpectUnicastProbe(router6)
    self.assertNeighbourAttr(router6, "NDA_PROBES", 3)

    self.SleepMs(self.RETRANS_TIME_MS)
    self.assertNeighbourState(NUD_FAILED, router6)
    self.ExpectNeighbourNotification(router6, NUD_FAILED, {"NDA_PROBES": 3})

  def testRepeatedProbes(self):
    """Forces the router entry into PROBE five times, answering each probe."""
    router4 = self._RouterAddress(self.netid, 4)
    router6 = self._RouterAddress(self.netid, 6)
    routermac = self.RouterMacAddress(self.netid)
    self.assertNeighbourState(NUD_PERMANENT, router4)
    self.assertNeighbourState(NUD_STALE, router6)

    def ForceProbe(addr, mac):
      """Sets addr's entry to PROBE, answers the NS, expects REACHABLE."""
      self.iproute.UpdateNeighbour(6, addr, None, self.ifindex, NUD_PROBE)
      self.assertNeighbourState(NUD_PROBE, addr)
      self.SleepMs(1)  # TODO: Why is this necessary?
      self.assertNeighbourState(NUD_PROBE, addr)
      self.ExpectUnicastProbe(addr)
      self.ReceiveUnicastAdvertisement(addr, mac)
      self.assertNeighbourState(NUD_REACHABLE, addr)

    for _ in range(5):
      ForceProbe(router6, routermac)

  def testIsRouterFlag(self):
    """Checks that an NA with R=0 still moves a FAILED entry to REACHABLE."""
    router6 = self._RouterAddress(self.netid, 6)
    self.assertNeighbourState(NUD_STALE, router6)

    # Get into FAILED.
    self.iproute.UpdateNeighbour(6, router6, None, self.ifindex, NUD_FAILED)
    self.ExpectNeighbourNotification(router6, NUD_FAILED)
    self.assertNeighbourState(NUD_FAILED, router6)

    time.sleep(1)

    # Send another packet and expect a multicast NS.
    routing_mode = random.choice(["mark", "oif", "uid"])
    s = self.BuildSocket(6, net_test.UDPSocket, self.netid, routing_mode)
    s.connect((net_test.IPV6_ADDR, 53))
    s.send(net_test.UDP_PAYLOAD)
    self.ExpectMulticastNS(router6)

    # Receive a unicast NA with the R flag set to 0.
    self.ReceiveUnicastAdvertisement(router6, self.RouterMacAddress(self.netid),
                                     srcaddr=self._RouterAddress(self.netid, 6),
                                     dstaddr=self.MyAddress(6, self.netid),
                                     S=1, O=0, R=0)

    # Expect that this takes us to REACHABLE.
    self.ExpectNeighbourNotification(router6, NUD_REACHABLE)
    self.assertNeighbourState(NUD_REACHABLE, router6)


if __name__ == "__main__":
  unittest.main()