1 #!/usr/bin/python 2 # 3 # Copyright 2014 The Android Open Source Project 4 # 5 # Licensed under the Apache License, Version 2.0 (the "License"); 6 # you may not use this file except in compliance with the License. 7 # You may obtain a copy of the License at 8 # 9 # http://www.apache.org/licenses/LICENSE-2.0 10 # 11 # Unless required by applicable law or agreed to in writing, software 12 # distributed under the License is distributed on an "AS IS" BASIS, 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 # See the License for the specific language governing permissions and 15 # limitations under the License. 16 17 # pylint: disable=g-bad-todo 18 19 import errno 20 import os 21 import posix 22 import random 23 from socket import * # pylint: disable=wildcard-import 24 import struct 25 import sys 26 import threading 27 import time 28 import unittest 29 30 from scapy import all as scapy 31 32 import csocket 33 import multinetwork_base 34 import net_test 35 36 37 HAVE_PROC_NET_ICMP6 = os.path.isfile("/proc/net/icmp6") 38 39 ICMP_ECHO = 8 40 ICMP_ECHOREPLY = 0 41 ICMPV6_ECHO_REQUEST = 128 42 ICMPV6_ECHO_REPLY = 129 43 IPV6_MIN_MTU = 1280 44 ICMPV6_HEADER_LEN = 8 45 ICMPV6_PKT_TOOBIG = 2 46 47 48 class PingReplyThread(threading.Thread): 49 50 MIN_TTL = 10 51 INTERMEDIATE_IPV4 = "192.0.2.2" 52 INTERMEDIATE_IPV6 = "2001:db8:1:2::ace:d00d" 53 NEIGHBOURS = ["fe80::1"] 54 LINK_MTU = 1300 55 56 def __init__(self, tun, mymac, routermac, routeraddr): 57 super(PingReplyThread, self).__init__() 58 self._tun = tun 59 self._started = False 60 self._stopped = False 61 self._mymac = mymac 62 self._routermac = routermac 63 self._routeraddr = routeraddr 64 65 def IsStarted(self): 66 return self._started 67 68 def Stop(self): 69 self._stopped = True 70 71 def ChecksumValid(self, packet): 72 # Get and clear the checksums. 73 def GetAndClearChecksum(layer): 74 if not layer: 75 return 76 try: 77 checksum = layer.chksum 78 del layer.chksum 79 except AttributeError: 80 checksum = layer.cksum 81 del layer.cksum 82 return checksum 83 84 def GetChecksum(layer): 85 try: 86 return layer.chksum 87 except AttributeError: 88 return layer.cksum 89 90 layers = ["IP", "ICMP", scapy.ICMPv6EchoRequest] 91 sums = {} 92 for name in layers: 93 sums[name] = GetAndClearChecksum(packet.getlayer(name)) 94 95 # Serialize the packet, so scapy recalculates the checksums, and compare 96 # them with the ones in the packet. 97 packet = packet.__class__(str(packet)) 98 for name in layers: 99 layer = packet.getlayer(name) 100 if layer and GetChecksum(layer) != sums[name]: 101 return False 102 103 return True 104 105 def SendTimeExceeded(self, version, packet): 106 if version == 4: 107 src = packet.getlayer(scapy.IP).src 108 self.SendPacket( 109 scapy.IP(src=self.INTERMEDIATE_IPV4, dst=src) / 110 scapy.ICMP(type=11, code=0) / 111 packet) 112 elif version == 6: 113 src = packet.getlayer(scapy.IPv6).src 114 self.SendPacket( 115 scapy.IPv6(src=self.INTERMEDIATE_IPV6, dst=src) / 116 scapy.ICMPv6TimeExceeded(code=0) / 117 packet) 118 119 def SendPacketTooBig(self, packet): 120 src = packet.getlayer(scapy.IPv6).src 121 datalen = IPV6_MIN_MTU - ICMPV6_HEADER_LEN 122 self.SendPacket( 123 scapy.IPv6(src=self.INTERMEDIATE_IPV6, dst=src) / 124 scapy.ICMPv6PacketTooBig(mtu=self.LINK_MTU) / 125 str(packet)[:datalen]) 126 127 def IPv4Packet(self, ip): 128 icmp = ip.getlayer(scapy.ICMP) 129 130 # We only support ping for now. 131 if (ip.proto != IPPROTO_ICMP or 132 icmp.type != ICMP_ECHO or 133 icmp.code != 0): 134 return 135 136 # Check the checksums. 137 if not self.ChecksumValid(ip): 138 return 139 140 if ip.ttl < self.MIN_TTL: 141 self.SendTimeExceeded(4, ip) 142 return 143 144 icmp.type = ICMP_ECHOREPLY 145 self.SwapAddresses(ip) 146 self.SendPacket(ip) 147 148 def IPv6Packet(self, ipv6): 149 icmpv6 = ipv6.getlayer(scapy.ICMPv6EchoRequest) 150 151 # We only support ping for now. 152 if (ipv6.nh != IPPROTO_ICMPV6 or 153 not icmpv6 or 154 icmpv6.type != ICMPV6_ECHO_REQUEST or 155 icmpv6.code != 0): 156 return 157 158 # Check the checksums. 159 if not self.ChecksumValid(ipv6): 160 return 161 162 if ipv6.dst.startswith("ff02::"): 163 ipv6.dst = ipv6.src 164 for src in [self._routeraddr]: 165 ipv6.src = src 166 icmpv6.type = ICMPV6_ECHO_REPLY 167 self.SendPacket(ipv6) 168 elif ipv6.hlim < self.MIN_TTL: 169 self.SendTimeExceeded(6, ipv6) 170 elif ipv6.plen > self.LINK_MTU: 171 self.SendPacketTooBig(ipv6) 172 else: 173 icmpv6.type = ICMPV6_ECHO_REPLY 174 if ipv6.dst.startswith("fe80:") and ipv6.dst != self._routeraddr: 175 return 176 self.SwapAddresses(ipv6) 177 self.SendPacket(ipv6) 178 179 def SwapAddresses(self, packet): 180 src = packet.src 181 packet.src = packet.dst 182 packet.dst = src 183 184 def SendPacket(self, packet): 185 packet = scapy.Ether(src=self._routermac, dst=self._mymac) / packet 186 try: 187 posix.write(self._tun.fileno(), str(packet)) 188 except Exception, e: 189 if not self._stopped: 190 raise e 191 192 def run(self): 193 self._started = True 194 while not self._stopped: 195 try: 196 packet = posix.read(self._tun.fileno(), 4096) 197 except OSError, e: 198 if e.errno == errno.EAGAIN: 199 continue 200 else: 201 break 202 except ValueError, e: 203 if not self._stopped: 204 raise e 205 206 ether = scapy.Ether(packet) 207 if ether.type == net_test.ETH_P_IPV6: 208 self.IPv6Packet(ether.payload) 209 elif ether.type == net_test.ETH_P_IP: 210 self.IPv4Packet(ether.payload) 211 212 213 class Ping6Test(multinetwork_base.MultiNetworkBaseTest): 214 215 @classmethod 216 def WaitForReplyThreads(cls): 217 # Wait 2s for the reply threads to start. If they don't, don't blow up, as 218 # that would cause tearDownClass not to be called and thus not clean up 219 # routing configuration, breaking subsequent tests. Instead, just let these 220 # tests fail. 221 _INTERVAL = 0.1 222 _ATTEMPTS = 20 223 for i in xrange(0, _ATTEMPTS): 224 for netid in cls.NETIDS: 225 if all(thread.IsStarted() for thread in cls.reply_threads.values()): 226 return 227 time.sleep(_INTERVAL) 228 msg = "WARNING: reply threads not all started after %.1f seconds\n" % ( 229 _ATTEMPTS * _INTERVAL) 230 sys.stderr.write(msg) 231 232 @classmethod 233 def StopReplyThreads(cls): 234 for thread in cls.reply_threads.values(): 235 thread.Stop() 236 237 @classmethod 238 def setUpClass(cls): 239 super(Ping6Test, cls).setUpClass() 240 cls.reply_threads = {} 241 for netid in cls.NETIDS: 242 cls.reply_threads[netid] = PingReplyThread( 243 cls.tuns[netid], 244 cls.MyMacAddress(netid), 245 cls.RouterMacAddress(netid), 246 cls._RouterAddress(netid, 6)) 247 cls.reply_threads[netid].start() 248 cls.WaitForReplyThreads() 249 cls.netid = random.choice(cls.NETIDS) 250 cls.SetDefaultNetwork(cls.netid) 251 252 @classmethod 253 def tearDownClass(cls): 254 cls.StopReplyThreads() 255 cls.ClearDefaultNetwork() 256 super(Ping6Test, cls).tearDownClass() 257 258 def setUp(self): 259 self.ifname = self.GetInterfaceName(self.netid) 260 self.ifindex = self.ifindices[self.netid] 261 self.lladdr = net_test.GetLinkAddress(self.ifname, True) 262 self.globaladdr = net_test.GetLinkAddress(self.ifname, False) 263 264 def assertValidPingResponse(self, s, data): 265 family = s.family 266 267 # Receive the reply. 268 rcvd, src = s.recvfrom(32768) 269 self.assertNotEqual(0, len(rcvd), "No data received") 270 271 # If this is a dual-stack socket sending to a mapped IPv4 address, treat it 272 # as IPv4. 273 if src[0].startswith("::ffff:"): 274 family = AF_INET 275 src = (src[0].replace("::ffff:", ""), src[1:]) 276 277 # Check the data being sent is valid. 278 self.assertGreater(len(data), 7, "Not enough data for ping packet") 279 if family == AF_INET: 280 self.assertTrue(data.startswith("\x08\x00"), "Not an IPv4 echo request") 281 elif family == AF_INET6: 282 self.assertTrue(data.startswith("\x80\x00"), "Not an IPv6 echo request") 283 else: 284 self.fail("Unknown socket address family %d" * s.family) 285 286 # Check address, ICMP type, and ICMP code. 287 if family == AF_INET: 288 addr, unused_port = src 289 self.assertGreaterEqual(len(addr), len("1.1.1.1")) 290 self.assertTrue(rcvd.startswith("\x00\x00"), "Not an IPv4 echo reply") 291 else: 292 addr, unused_port, flowlabel, scope_id = src # pylint: disable=unbalanced-tuple-unpacking 293 self.assertGreaterEqual(len(addr), len("::")) 294 self.assertTrue(rcvd.startswith("\x81\x00"), "Not an IPv6 echo reply") 295 # Check that the flow label is zero and that the scope ID is sane. 296 self.assertEqual(flowlabel, 0) 297 if addr.startswith("fe80::"): 298 self.assertTrue(scope_id in self.ifindices.values()) 299 else: 300 self.assertEquals(0, scope_id) 301 302 # TODO: check the checksum. We can't do this easily now for ICMPv6 because 303 # we don't have the IP addresses so we can't construct the pseudoheader. 304 305 # Check the sequence number and the data. 306 self.assertEqual(len(data), len(rcvd)) 307 self.assertEqual(data[6:].encode("hex"), rcvd[6:].encode("hex")) 308 309 def CheckSockStatFile(self, name, srcaddr, srcport, dstaddr, dstport, state, 310 txmem=0, rxmem=0): 311 expected = ["%s:%04X" % (net_test.FormatSockStatAddress(srcaddr), srcport), 312 "%s:%04X" % (net_test.FormatSockStatAddress(dstaddr), dstport), 313 "%02X" % state, 314 "%08X:%08X" % (txmem, rxmem), 315 str(os.getuid()), "2", "0"] 316 actual = self.ReadProcNetSocket(name)[-1] 317 self.assertListEqual(expected, actual) 318 319 def testIPv4SendWithNoConnection(self): 320 s = net_test.IPv4PingSocket() 321 self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV4_PING) 322 323 def testIPv6SendWithNoConnection(self): 324 s = net_test.IPv6PingSocket() 325 self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV6_PING) 326 327 def testIPv4LoopbackPingWithConnect(self): 328 s = net_test.IPv4PingSocket() 329 s.connect(("127.0.0.1", 55)) 330 data = net_test.IPV4_PING + "foobarbaz" 331 s.send(data) 332 self.assertValidPingResponse(s, data) 333 334 def testIPv6LoopbackPingWithConnect(self): 335 s = net_test.IPv6PingSocket() 336 s.connect(("::1", 55)) 337 s.send(net_test.IPV6_PING) 338 self.assertValidPingResponse(s, net_test.IPV6_PING) 339 340 def testIPv4PingUsingSendto(self): 341 s = net_test.IPv4PingSocket() 342 written = s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55)) 343 self.assertEquals(len(net_test.IPV4_PING), written) 344 self.assertValidPingResponse(s, net_test.IPV4_PING) 345 346 def testIPv6PingUsingSendto(self): 347 s = net_test.IPv6PingSocket() 348 written = s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55)) 349 self.assertEquals(len(net_test.IPV6_PING), written) 350 self.assertValidPingResponse(s, net_test.IPV6_PING) 351 352 def testIPv4NoCrash(self): 353 # Python 2.x does not provide either read() or recvmsg. 354 s = net_test.IPv4PingSocket() 355 written = s.sendto(net_test.IPV4_PING, ("127.0.0.1", 55)) 356 self.assertEquals(len(net_test.IPV4_PING), written) 357 fd = s.fileno() 358 reply = posix.read(fd, 4096) 359 self.assertEquals(written, len(reply)) 360 361 def testIPv6NoCrash(self): 362 # Python 2.x does not provide either read() or recvmsg. 363 s = net_test.IPv6PingSocket() 364 written = s.sendto(net_test.IPV6_PING, ("::1", 55)) 365 self.assertEquals(len(net_test.IPV6_PING), written) 366 fd = s.fileno() 367 reply = posix.read(fd, 4096) 368 self.assertEquals(written, len(reply)) 369 370 def testCrossProtocolCrash(self): 371 # Checks that an ICMP error containing a ping packet that matches the ID 372 # of a socket of the wrong protocol (which can happen when using 464xlat) 373 # doesn't crash the kernel. 374 375 # We can only test this using IPv6 unreachables and IPv4 ping sockets, 376 # because IPv4 packets sent by scapy.send() on loopback are not received by 377 # the kernel. So we don't actually use this function yet. 378 def GetIPv4Unreachable(port): # pylint: disable=unused-variable 379 return (scapy.IP(src="192.0.2.1", dst="127.0.0.1") / 380 scapy.ICMP(type=3, code=0) / 381 scapy.IP(src="127.0.0.1", dst="127.0.0.1") / 382 scapy.ICMP(type=8, id=port, seq=1)) 383 384 def GetIPv6Unreachable(port): 385 return (scapy.IPv6(src="::1", dst="::1") / 386 scapy.ICMPv6DestUnreach() / 387 scapy.IPv6(src="::1", dst="::1") / 388 scapy.ICMPv6EchoRequest(id=port, seq=1, data="foobarbaz")) 389 390 # An unreachable matching the ID of a socket of the wrong protocol 391 # shouldn't crash. 392 s = net_test.IPv4PingSocket() 393 s.connect(("127.0.0.1", 12345)) 394 _, port = s.getsockname() 395 scapy.send(GetIPv6Unreachable(port), verbose=False) 396 # No crash? Good. 397 398 def testCrossProtocolCalls(self): 399 """Tests that passing in the wrong family returns EAFNOSUPPORT. 400 401 Relevant kernel commits: 402 upstream net: 403 91a0b60 net/ping: handle protocol mismatching scenario 404 9145736d net: ping: Return EAFNOSUPPORT when appropriate. 405 406 android-3.10: 407 78a6809 net/ping: handle protocol mismatching scenario 408 428e6d6 net: ping: Return EAFNOSUPPORT when appropriate. 409 """ 410 411 def CheckEAFNoSupport(function, *args): 412 self.assertRaisesErrno(errno.EAFNOSUPPORT, function, *args) 413 414 ipv6sockaddr = csocket.Sockaddr((net_test.IPV6_ADDR, 53)) 415 416 # In order to check that IPv6 socket calls return EAFNOSUPPORT when passed 417 # IPv4 socket address structures, we need to pass down a socket address 418 # length argument that's at least sizeof(sockaddr_in6). Otherwise, the calls 419 # will fail immediately with EINVAL because the passed-in socket length is 420 # too short. So create a sockaddr_in that's as long as a sockaddr_in6. 421 ipv4sockaddr = csocket.Sockaddr((net_test.IPV4_ADDR, 53)) 422 ipv4sockaddr = csocket.SockaddrIn6( 423 ipv4sockaddr.Pack() + 424 "\x00" * (len(csocket.SockaddrIn6) - len(csocket.SockaddrIn))) 425 426 s4 = net_test.IPv4PingSocket() 427 s6 = net_test.IPv6PingSocket() 428 429 # We can't just call s.connect(), s.bind() etc. with a tuple of the wrong 430 # address family, because the Python implementation will just pass garbage 431 # down to the kernel. So call the C functions directly. 432 CheckEAFNoSupport(csocket.Bind, s4, ipv6sockaddr) 433 CheckEAFNoSupport(csocket.Bind, s6, ipv4sockaddr) 434 CheckEAFNoSupport(csocket.Connect, s4, ipv6sockaddr) 435 CheckEAFNoSupport(csocket.Connect, s6, ipv4sockaddr) 436 CheckEAFNoSupport(csocket.Sendmsg, 437 s4, ipv6sockaddr, net_test.IPV4_PING, None, 0) 438 CheckEAFNoSupport(csocket.Sendmsg, 439 s6, ipv4sockaddr, net_test.IPV6_PING, None, 0) 440 441 def testIPv4Bind(self): 442 # Bind to unspecified address. 443 s = net_test.IPv4PingSocket() 444 s.bind(("0.0.0.0", 544)) 445 self.assertEquals(("0.0.0.0", 544), s.getsockname()) 446 447 # Bind to loopback. 448 s = net_test.IPv4PingSocket() 449 s.bind(("127.0.0.1", 99)) 450 self.assertEquals(("127.0.0.1", 99), s.getsockname()) 451 452 # Binding twice is not allowed. 453 self.assertRaisesErrno(errno.EINVAL, s.bind, ("127.0.0.1", 22)) 454 455 # But binding two different sockets to the same ID is allowed. 456 s2 = net_test.IPv4PingSocket() 457 s2.bind(("127.0.0.1", 99)) 458 self.assertEquals(("127.0.0.1", 99), s2.getsockname()) 459 s3 = net_test.IPv4PingSocket() 460 s3.bind(("127.0.0.1", 99)) 461 self.assertEquals(("127.0.0.1", 99), s3.getsockname()) 462 463 # If two sockets bind to the same port, the first one to call read() gets 464 # the response. 465 s4 = net_test.IPv4PingSocket() 466 s5 = net_test.IPv4PingSocket() 467 s4.bind(("0.0.0.0", 167)) 468 s5.bind(("0.0.0.0", 167)) 469 s4.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 44)) 470 self.assertValidPingResponse(s5, net_test.IPV4_PING) 471 net_test.SetSocketTimeout(s4, 100) 472 self.assertRaisesErrno(errno.EAGAIN, s4.recv, 32768) 473 474 # If SO_REUSEADDR is turned off, then we get EADDRINUSE. 475 s6 = net_test.IPv4PingSocket() 476 s4.setsockopt(SOL_SOCKET, SO_REUSEADDR, 0) 477 self.assertRaisesErrno(errno.EADDRINUSE, s6.bind, ("0.0.0.0", 167)) 478 479 # Can't bind after sendto. 480 s = net_test.IPv4PingSocket() 481 s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 9132)) 482 self.assertRaisesErrno(errno.EINVAL, s.bind, ("0.0.0.0", 5429)) 483 484 def testIPv6Bind(self): 485 # Bind to unspecified address. 486 s = net_test.IPv6PingSocket() 487 s.bind(("::", 769)) 488 self.assertEquals(("::", 769, 0, 0), s.getsockname()) 489 490 # Bind to loopback. 491 s = net_test.IPv6PingSocket() 492 s.bind(("::1", 99)) 493 self.assertEquals(("::1", 99, 0, 0), s.getsockname()) 494 495 # Binding twice is not allowed. 496 self.assertRaisesErrno(errno.EINVAL, s.bind, ("::1", 22)) 497 498 # But binding two different sockets to the same ID is allowed. 499 s2 = net_test.IPv6PingSocket() 500 s2.bind(("::1", 99)) 501 self.assertEquals(("::1", 99, 0, 0), s2.getsockname()) 502 s3 = net_test.IPv6PingSocket() 503 s3.bind(("::1", 99)) 504 self.assertEquals(("::1", 99, 0, 0), s3.getsockname()) 505 506 # Binding both IPv4 and IPv6 to the same socket works. 507 s4 = net_test.IPv4PingSocket() 508 s6 = net_test.IPv6PingSocket() 509 s4.bind(("0.0.0.0", 444)) 510 s6.bind(("::", 666, 0, 0)) 511 512 # Can't bind after sendto. 513 s = net_test.IPv6PingSocket() 514 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 9132)) 515 self.assertRaisesErrno(errno.EINVAL, s.bind, ("::", 5429)) 516 517 def testIPv4InvalidBind(self): 518 s = net_test.IPv4PingSocket() 519 self.assertRaisesErrno(errno.EADDRNOTAVAIL, 520 s.bind, ("255.255.255.255", 1026)) 521 self.assertRaisesErrno(errno.EADDRNOTAVAIL, 522 s.bind, ("224.0.0.1", 651)) 523 # Binding to an address we don't have only works with IP_TRANSPARENT. 524 self.assertRaisesErrno(errno.EADDRNOTAVAIL, 525 s.bind, (net_test.IPV4_ADDR, 651)) 526 try: 527 s.setsockopt(SOL_IP, net_test.IP_TRANSPARENT, 1) 528 s.bind((net_test.IPV4_ADDR, 651)) 529 except IOError, e: 530 if e.errno == errno.EACCES: 531 pass # We're not root. let it go for now. 532 533 def testIPv6InvalidBind(self): 534 s = net_test.IPv6PingSocket() 535 self.assertRaisesErrno(errno.EINVAL, 536 s.bind, ("ff02::2", 1026)) 537 538 # Binding to an address we don't have only works with IPV6_TRANSPARENT. 539 self.assertRaisesErrno(errno.EADDRNOTAVAIL, 540 s.bind, (net_test.IPV6_ADDR, 651)) 541 try: 542 s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_TRANSPARENT, 1) 543 s.bind((net_test.IPV6_ADDR, 651)) 544 except IOError, e: 545 if e.errno == errno.EACCES: 546 pass # We're not root. let it go for now. 547 548 def testAfUnspecBind(self): 549 # Binding to AF_UNSPEC is treated as IPv4 if the address is 0.0.0.0. 550 s4 = net_test.IPv4PingSocket() 551 sockaddr = csocket.Sockaddr(("0.0.0.0", 12996)) 552 sockaddr.family = AF_UNSPEC 553 csocket.Bind(s4, sockaddr) 554 self.assertEquals(("0.0.0.0", 12996), s4.getsockname()) 555 556 # But not if the address is anything else. 557 sockaddr = csocket.Sockaddr(("127.0.0.1", 58234)) 558 sockaddr.family = AF_UNSPEC 559 self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s4, sockaddr) 560 561 # This doesn't work for IPv6. 562 s6 = net_test.IPv6PingSocket() 563 sockaddr = csocket.Sockaddr(("::1", 58997)) 564 sockaddr.family = AF_UNSPEC 565 self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s6, sockaddr) 566 567 def testIPv6ScopedBind(self): 568 # Can't bind to a link-local address without a scope ID. 569 s = net_test.IPv6PingSocket() 570 self.assertRaisesErrno(errno.EINVAL, 571 s.bind, (self.lladdr, 1026, 0, 0)) 572 573 # Binding to a link-local address with a scope ID works, and the scope ID is 574 # returned by a subsequent getsockname. Interestingly, Python's getsockname 575 # returns "fe80:1%foo", even though it does not understand it. 576 expected = self.lladdr + "%" + self.ifname 577 s.bind((self.lladdr, 4646, 0, self.ifindex)) 578 self.assertEquals((expected, 4646, 0, self.ifindex), s.getsockname()) 579 580 # Of course, for the above to work the address actually has to be configured 581 # on the machine. 582 self.assertRaisesErrno(errno.EADDRNOTAVAIL, 583 s.bind, ("fe80::f00", 1026, 0, 1)) 584 585 # Scope IDs on non-link-local addresses are silently ignored. 586 s = net_test.IPv6PingSocket() 587 s.bind(("::1", 1234, 0, 1)) 588 self.assertEquals(("::1", 1234, 0, 0), s.getsockname()) 589 590 def testBindAffectsIdentifier(self): 591 s = net_test.IPv6PingSocket() 592 s.bind((self.globaladdr, 0xf976)) 593 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55)) 594 self.assertEquals("\xf9\x76", s.recv(32768)[4:6]) 595 596 s = net_test.IPv6PingSocket() 597 s.bind((self.globaladdr, 0xace)) 598 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55)) 599 self.assertEquals("\x0a\xce", s.recv(32768)[4:6]) 600 601 def testLinkLocalAddress(self): 602 s = net_test.IPv6PingSocket() 603 # Sending to a link-local address with no scope fails with EINVAL. 604 self.assertRaisesErrno(errno.EINVAL, 605 s.sendto, net_test.IPV6_PING, ("fe80::1", 55)) 606 # Sending to link-local address with a scope succeeds. Note that Python 607 # doesn't understand the "fe80::1%lo" format, even though it returns it. 608 s.sendto(net_test.IPV6_PING, ("fe80::1", 55, 0, self.ifindex)) 609 # No exceptions? Good. 610 611 def testLinkLocalOif(self): 612 """Checks that ping to link-local addresses works correctly. 613 614 Relevant kernel commits: 615 upstream net: 616 5e45789 net: ipv6: Fix ping to link-local addresses. 617 """ 618 s = net_test.IPv6PingSocket() 619 for mode in ["oif", "ucast_oif", None]: 620 s = net_test.IPv6PingSocket() 621 for netid in self.NETIDS: 622 dst = self._RouterAddress(netid, 6) 623 self.assertTrue(dst.startswith("fe80:")) 624 625 if mode: 626 self.SelectInterface(s, netid, mode) 627 scopeid = 0 628 else: 629 scopeid = self.ifindices[netid] 630 631 if mode == "oif": 632 # If SO_BINDTODEVICE has been set, any attempt to send on another 633 # interface returns EINVAL. 634 othernetid = self.NETIDS[(self.NETIDS.index(netid) + 1) 635 % len(self.NETIDS)] 636 otherscopeid = self.ifindices[othernetid] 637 self.assertRaisesErrno( 638 errno.EINVAL, 639 s.sendto, net_test.IPV6_PING, (dst, 55, 0, otherscopeid)) 640 641 s.sendto(net_test.IPV6_PING, (dst, 123, 0, scopeid)) 642 # If we got a reply, we sent the packet out on the right interface. 643 self.assertValidPingResponse(s, net_test.IPV6_PING) 644 645 def testMappedAddressFails(self): 646 s = net_test.IPv6PingSocket() 647 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55)) 648 self.assertValidPingResponse(s, net_test.IPV6_PING) 649 s.sendto(net_test.IPV6_PING, ("2001:4860:4860::8844", 55)) 650 self.assertValidPingResponse(s, net_test.IPV6_PING) 651 self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING, 652 ("::ffff:192.0.2.1", 55)) 653 654 @unittest.skipUnless(False, "skipping: does not work yet") 655 def testFlowLabel(self): 656 s = net_test.IPv6PingSocket() 657 658 # Specifying a flowlabel without having set IPV6_FLOWINFO_SEND succeeds but 659 # the flow label in the packet is not set. 660 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0)) 661 self.assertValidPingResponse(s, net_test.IPV6_PING) # Checks flow label==0. 662 663 # If IPV6_FLOWINFO_SEND is set on the socket, attempting to set a flow label 664 # that is not registered with the flow manager should return EINVAL... 665 s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_FLOWINFO_SEND, 1) 666 # ... but this doesn't work yet. 667 if False: 668 self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING, 669 (net_test.IPV6_ADDR, 93, 0xdead, 0)) 670 671 # After registering the flow label, it gets sent properly, appears in the 672 # output packet, and is returned in the response. 673 net_test.SetFlowLabel(s, net_test.IPV6_ADDR, 0xdead) 674 self.assertEqual(1, s.getsockopt(net_test.SOL_IPV6, 675 net_test.IPV6_FLOWINFO_SEND)) 676 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0)) 677 _, src = s.recvfrom(32768) 678 _, _, flowlabel, _ = src 679 self.assertEqual(0xdead, flowlabel & 0xfffff) 680 681 def testIPv4Error(self): 682 s = net_test.IPv4PingSocket() 683 s.setsockopt(SOL_IP, IP_TTL, 2) 684 s.setsockopt(SOL_IP, net_test.IP_RECVERR, 1) 685 s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55)) 686 # We can't check the actual error because Python 2.7 doesn't implement 687 # recvmsg, but we can at least check that the socket returns an error. 688 self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768) # No response. 689 690 def testIPv6Error(self): 691 s = net_test.IPv6PingSocket() 692 s.setsockopt(net_test.SOL_IPV6, IPV6_UNICAST_HOPS, 2) 693 s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1) 694 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55)) 695 # We can't check the actual error because Python 2.7 doesn't implement 696 # recvmsg, but we can at least check that the socket returns an error. 697 self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768) # No response. 698 699 def testIPv6MulticastPing(self): 700 s = net_test.IPv6PingSocket() 701 # Send a multicast ping and check we get at least one duplicate. 702 # The setsockopt should not be necessary, but ping_v6_sendmsg has a bug. 703 s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex) 704 s.sendto(net_test.IPV6_PING, ("ff02::1", 55, 0, self.ifindex)) 705 self.assertValidPingResponse(s, net_test.IPV6_PING) 706 self.assertValidPingResponse(s, net_test.IPV6_PING) 707 708 def testIPv4LargePacket(self): 709 s = net_test.IPv4PingSocket() 710 data = net_test.IPV4_PING + 20000 * "a" 711 s.sendto(data, ("127.0.0.1", 987)) 712 self.assertValidPingResponse(s, data) 713 714 def testIPv6LargePacket(self): 715 s = net_test.IPv6PingSocket() 716 s.bind(("::", 0xace)) 717 data = net_test.IPV6_PING + "\x01" + 19994 * "\x00" + "aaaaa" 718 s.sendto(data, ("::1", 953)) 719 720 @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6") 721 def testIcmpSocketsNotInIcmp6(self): 722 numrows = len(self.ReadProcNetSocket("icmp")) 723 numrows6 = len(self.ReadProcNetSocket("icmp6")) 724 s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP) 725 s.bind(("127.0.0.1", 0xace)) 726 s.connect(("127.0.0.1", 0xbeef)) 727 self.assertEquals(numrows + 1, len(self.ReadProcNetSocket("icmp"))) 728 self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6"))) 729 730 @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6") 731 def testIcmp6SocketsNotInIcmp(self): 732 numrows = len(self.ReadProcNetSocket("icmp")) 733 numrows6 = len(self.ReadProcNetSocket("icmp6")) 734 s = net_test.IPv6PingSocket() 735 s.bind(("::1", 0xace)) 736 s.connect(("::1", 0xbeef)) 737 self.assertEquals(numrows, len(self.ReadProcNetSocket("icmp"))) 738 self.assertEquals(numrows6 + 1, len(self.ReadProcNetSocket("icmp6"))) 739 740 def testProcNetIcmp(self): 741 s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP) 742 s.bind(("127.0.0.1", 0xace)) 743 s.connect(("127.0.0.1", 0xbeef)) 744 self.CheckSockStatFile("icmp", "127.0.0.1", 0xace, "127.0.0.1", 0xbeef, 1) 745 746 @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6") 747 def testProcNetIcmp6(self): 748 numrows6 = len(self.ReadProcNetSocket("icmp6")) 749 s = net_test.IPv6PingSocket() 750 s.bind(("::1", 0xace)) 751 s.connect(("::1", 0xbeef)) 752 self.CheckSockStatFile("icmp6", "::1", 0xace, "::1", 0xbeef, 1) 753 754 # Check the row goes away when the socket is closed. 755 s.close() 756 self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6"))) 757 758 # Try send, bind and connect to check the addresses and the state. 759 s = net_test.IPv6PingSocket() 760 self.assertEqual(0, len(self.ReadProcNetSocket("icmp6"))) 761 s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 12345)) 762 self.assertEqual(1, len(self.ReadProcNetSocket("icmp6"))) 763 764 # Can't bind after sendto, apparently. 765 s = net_test.IPv6PingSocket() 766 self.assertEqual(0, len(self.ReadProcNetSocket("icmp6"))) 767 s.bind((self.lladdr, 0xd00d, 0, self.ifindex)) 768 self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "::", 0, 7) 769 770 # Check receive bytes. 771 s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex) 772 s.connect(("ff02::1", 0xdead)) 773 self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1) 774 s.send(net_test.IPV6_PING) 775 s.recvfrom(32768, MSG_PEEK) # Wait until the receive thread replies. 776 self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1, 777 txmem=0, rxmem=0x300) 778 self.assertValidPingResponse(s, net_test.IPV6_PING) 779 self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1, 780 txmem=0, rxmem=0) 781 782 def testProcNetUdp6(self): 783 s = net_test.Socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP) 784 s.bind(("::1", 0xace)) 785 s.connect(("::1", 0xbeef)) 786 self.CheckSockStatFile("udp6", "::1", 0xace, "::1", 0xbeef, 1) 787 788 def testProcNetRaw6(self): 789 s = net_test.Socket(AF_INET6, SOCK_RAW, IPPROTO_RAW) 790 s.bind(("::1", 0xace)) 791 s.connect(("::1", 0xbeef)) 792 self.CheckSockStatFile("raw6", "::1", 0xff, "::1", 0, 1) 793 794 def testIPv6MTU(self): 795 """Tests IPV6_RECVERR and path MTU discovery on ping sockets. 796 797 Relevant kernel commits: 798 upstream net-next: 799 dcb94b8 ipv6: fix endianness error in icmpv6_err 800 """ 801 s = net_test.IPv6PingSocket() 802 s.setsockopt(net_test.SOL_IPV6, csocket.IPV6_DONTFRAG, 1) 803 s.setsockopt(net_test.SOL_IPV6, csocket.IPV6_MTU_DISCOVER, 2) 804 s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1) 805 s.connect((net_test.IPV6_ADDR, 55)) 806 pkt = net_test.IPV6_PING + (PingReplyThread.LINK_MTU + 100) * "a" 807 s.send(pkt) 808 self.assertRaisesErrno(errno.EMSGSIZE, s.recv, 32768) 809 data, addr, cmsg = csocket.Recvmsg(s, 4096, 1024, csocket.MSG_ERRQUEUE) 810 811 # Compare the offending packet with the one we sent. To do this we need to 812 # calculate the ident of the packet we sent and blank out the checksum of 813 # the one we received. 814 ident = struct.pack("!H", s.getsockname()[1]) 815 pkt = pkt[:4] + ident + pkt[6:] 816 data = data[:2] + "\x00\x00" + pkt[4:] 817 self.assertEquals(pkt, data) 818 819 # Check the address that the packet was sent to. 820 # ... except in 4.1, where it just returns an AF_UNSPEC, like this: 821 # recvmsg(9, {msg_name(0)={sa_family=AF_UNSPEC, 822 # sa_data="\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"}, 823 # msg_iov(1)=[{"\x80\x00\x04\x6b\x00\xc4\x00\x03\x61\x61\x61\x61\x61\x61"..., 4096}], 824 # msg_controllen=64, {cmsg_len=60, cmsg_level=SOL_IPV6, cmsg_type=, ...}, 825 # msg_flags=MSG_ERRQUEUE}, MSG_ERRQUEUE) = 1232 826 if net_test.LINUX_VERSION != (4, 1, 0): 827 self.assertEquals(csocket.Sockaddr(("2001:4860:4860::8888", 0)), addr) 828 829 # Check the cmsg data, including the link MTU. 830 mtu = PingReplyThread.LINK_MTU 831 src = self.reply_threads[self.netid].INTERMEDIATE_IPV6 832 msglist = [ 833 (net_test.SOL_IPV6, net_test.IPV6_RECVERR, 834 (csocket.SockExtendedErr((errno.EMSGSIZE, csocket.SO_ORIGIN_ICMP6, 835 ICMPV6_PKT_TOOBIG, 0, mtu, 0)), 836 csocket.Sockaddr((src, 0)))) 837 ] 838 839 # IP[V6]_RECVERR in 3.10 appears to return incorrect data for the port. 840 # The fix might have been in 676d236, but we don't have that in 3.10 and it 841 # touches code all over the tree. Instead, just don't check the port. 842 if net_test.LINUX_VERSION <= (3, 14, 0): 843 msglist[0][2][1].port = cmsg[0][2][1].port 844 845 self.assertEquals(msglist, cmsg) 846 847 848 if __name__ == "__main__": 849 unittest.main() 850