#!/usr/bin/python
#
# Copyright 2014 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for IPv4 and IPv6 ping sockets.

A PingReplyThread per test network reads frames from that network's tun
interface and answers echo requests, so the tests can observe replies, time
exceeded errors and packet too big errors.
"""

# pylint: disable=g-bad-todo

import errno
import os
import posix
import random
from socket import *  # pylint: disable=wildcard-import
import struct
import sys
import threading
import time
import unittest

from scapy import all as scapy

import csocket
import multinetwork_base
import net_test


HAVE_PROC_NET_ICMP6 = os.path.isfile("/proc/net/icmp6")

ICMP_ECHO = 8
ICMP_ECHOREPLY = 0
ICMPV6_ECHO_REQUEST = 128
ICMPV6_ECHO_REPLY = 129
IPV6_MIN_MTU = 1280
ICMPV6_HEADER_LEN = 8
ICMPV6_PKT_TOOBIG = 2


class PingReplyThread(threading.Thread):
  """Thread that answers ping packets read from a tun interface.

  Frames are read from the tun file object passed to the constructor. Echo
  requests with valid checksums are answered with an echo reply, except that:
    - packets whose TTL / hop limit is below MIN_TTL get a time exceeded error
      from INTERMEDIATE_IPV4 / INTERMEDIATE_IPV6;
    - IPv6 packets whose payload length exceeds LINK_MTU get a packet too big
      error advertising LINK_MTU.
  All other packets are silently ignored.
  """

  MIN_TTL = 10
  INTERMEDIATE_IPV4 = "192.0.2.2"
  INTERMEDIATE_IPV6 = "2001:db8:1:2::ace:d00d"
  NEIGHBOURS = ["fe80::1"]
  LINK_MTU = 1300

  def __init__(self, tun, mymac, routermac, routeraddr):
    """Initializes the thread.

    Args:
      tun: File object for the tun interface; only fileno() is used.
      mymac: Destination MAC address for frames written to the interface.
      routermac: Source MAC address for frames written to the interface.
      routeraddr: IPv6 address used as the source of multicast ping replies
        and as the only link-local destination that is answered.
    """
    super(PingReplyThread, self).__init__()
    self._tun = tun
    self._started = False
    self._stopped = False
    self._mymac = mymac
    self._routermac = routermac
    self._routeraddr = routeraddr

  def IsStarted(self):
    """Returns True once run() has begun executing."""
    return self._started

  def Stop(self):
    """Asks the thread to exit its read loop."""
    self._stopped = True

  def ChecksumValid(self, packet):
    """Returns True if the IP, ICMP and ICMPv6 checksums in packet are valid.

    The checksums are deleted from the packet's layers as a side effect.
    """
    # Get and clear the checksums.
    def GetAndClearChecksum(layer):
      if not layer:
        return
      try:
        checksum = layer.chksum
        del layer.chksum
      except AttributeError:
        checksum = layer.cksum
        del layer.cksum
      return checksum

    def GetChecksum(layer):
      try:
        return layer.chksum
      except AttributeError:
        return layer.cksum

    layers = ["IP", "ICMP", scapy.ICMPv6EchoRequest]
    sums = {}
    for name in layers:
      sums[name] = GetAndClearChecksum(packet.getlayer(name))

    # Serialize the packet, so scapy recalculates the checksums, and compare
    # them with the ones in the packet.
    packet = packet.__class__(str(packet))
    for name in layers:
      layer = packet.getlayer(name)
      if layer and GetChecksum(layer) != sums[name]:
        return False

    return True

  def SendTimeExceeded(self, version, packet):
    """Sends a time exceeded error quoting packet to the packet's source.

    Args:
      version: IP version of packet, 4 or 6. Other values send nothing.
      packet: The scapy IP or IPv6 packet that triggered the error.
    """
    if version == 4:
      src = packet.getlayer(scapy.IP).src
      self.SendPacket(
          scapy.IP(src=self.INTERMEDIATE_IPV4, dst=src) /
          scapy.ICMP(type=11, code=0) /
          packet)
    elif version == 6:
      src = packet.getlayer(scapy.IPv6).src
      self.SendPacket(
          scapy.IPv6(src=self.INTERMEDIATE_IPV6, dst=src) /
          scapy.ICMPv6TimeExceeded(code=0) /
          packet)

  def SendPacketTooBig(self, packet):
    """Sends an ICMPv6 packet too big error advertising LINK_MTU."""
    src = packet.getlayer(scapy.IPv6).src
    # Quote only as much of the packet as this length allows.
    datalen = IPV6_MIN_MTU - ICMPV6_HEADER_LEN
    self.SendPacket(
        scapy.IPv6(src=self.INTERMEDIATE_IPV6, dst=src) /
        scapy.ICMPv6PacketTooBig(mtu=self.LINK_MTU) /
        str(packet)[:datalen])

  def IPv4Packet(self, ip):
    """Processes an IPv4 packet, replying if it is a valid echo request."""
    icmp = ip.getlayer(scapy.ICMP)

    # We only support ping for now.
    if (ip.proto != IPPROTO_ICMP or
        icmp.type != ICMP_ECHO or
        icmp.code != 0):
      return

    # Check the checksums.
    if not self.ChecksumValid(ip):
      return

    if ip.ttl < self.MIN_TTL:
      self.SendTimeExceeded(4, ip)
      return

    icmp.type = ICMP_ECHOREPLY
    self.SwapAddresses(ip)
    self.SendPacket(ip)

  def IPv6Packet(self, ipv6):
    """Processes an IPv6 packet, replying if it is a valid echo request."""
    icmpv6 = ipv6.getlayer(scapy.ICMPv6EchoRequest)

    # We only support ping for now.
    if (ipv6.nh != IPPROTO_ICMPV6 or
        not icmpv6 or
        icmpv6.type != ICMPV6_ECHO_REQUEST or
        icmpv6.code != 0):
      return

    # Check the checksums.
    if not self.ChecksumValid(ipv6):
      return

    if ipv6.dst.startswith("ff02::"):
      # Multicast ping: reply from the router address.
      ipv6.dst = ipv6.src
      for src in [self._routeraddr]:
        ipv6.src = src
        icmpv6.type = ICMPV6_ECHO_REPLY
        self.SendPacket(ipv6)
    elif ipv6.hlim < self.MIN_TTL:
      self.SendTimeExceeded(6, ipv6)
    elif ipv6.plen > self.LINK_MTU:
      self.SendPacketTooBig(ipv6)
    else:
      icmpv6.type = ICMPV6_ECHO_REPLY
      # Only answer link-local pings that are addressed to the router.
      if ipv6.dst.startswith("fe80:") and ipv6.dst != self._routeraddr:
        return
      self.SwapAddresses(ipv6)
      self.SendPacket(ipv6)

  def SwapAddresses(self, packet):
    """Swaps the source and destination addresses of packet in place."""
    src = packet.src
    packet.src = packet.dst
    packet.dst = src

  def SendPacket(self, packet):
    """Wraps packet in an Ethernet frame and writes it to the tun interface.

    Errors are ignored once Stop() has been called; otherwise they propagate.
    """
    packet = scapy.Ether(src=self._routermac, dst=self._mymac) / packet
    try:
      posix.write(self._tun.fileno(), str(packet))
    except Exception:
      if not self._stopped:
        raise

  def run(self):
    """Reads frames from the tun interface until stopped or a read fails."""
    self._started = True
    while not self._stopped:
      try:
        packet = posix.read(self._tun.fileno(), 4096)
      except OSError as e:
        if e.errno == errno.EAGAIN:
          continue
        else:
          break
      except ValueError:
        if not self._stopped:
          raise
        # We are shutting down and nothing was read. Exit instead of falling
        # through with an undefined or stale packet.
        break

      ether = scapy.Ether(packet)
      if ether.type == net_test.ETH_P_IPV6:
        self.IPv6Packet(ether.payload)
      elif ether.type == net_test.ETH_P_IP:
        self.IPv4Packet(ether.payload)


class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
  """Tests IPv4 and IPv6 ping sockets against PingReplyThread responders."""

  @classmethod
  def WaitForReplyThreads(cls):
    """Waits for all reply threads to start, warning on stderr if they don't."""
    # Wait 2s for the reply threads to start. If they don't, don't blow up, as
    # that would cause tearDownClass not to be called and thus not clean up
    # routing configuration, breaking subsequent tests. Instead, just let these
    # tests fail.
    _INTERVAL = 0.1
    _ATTEMPTS = 20
    for _ in xrange(_ATTEMPTS):
      if all(thread.IsStarted() for thread in cls.reply_threads.values()):
        return
      time.sleep(_INTERVAL)
    msg = "WARNING: reply threads not all started after %.1f seconds\n" % (
        _ATTEMPTS * _INTERVAL)
    sys.stderr.write(msg)

  @classmethod
  def StopReplyThreads(cls):
    """Asks every reply thread to stop."""
    for thread in cls.reply_threads.values():
      thread.Stop()

  @classmethod
  def setUpClass(cls):
    super(Ping6Test, cls).setUpClass()
    # Start one reply thread per network.
    cls.reply_threads = {}
    for netid in cls.NETIDS:
      cls.reply_threads[netid] = PingReplyThread(
          cls.tuns[netid],
          cls.MyMacAddress(netid),
          cls.RouterMacAddress(netid),
          cls._RouterAddress(netid, 6))
      cls.reply_threads[netid].start()
    cls.WaitForReplyThreads()
    # Tests that don't pick a network explicitly use a random default one.
    cls.netid = random.choice(cls.NETIDS)
    cls.SetDefaultNetwork(cls.netid)

  @classmethod
  def tearDownClass(cls):
    cls.StopReplyThreads()
    cls.ClearDefaultNetwork()
    super(Ping6Test, cls).tearDownClass()

  def setUp(self):
    self.ifname = self.GetInterfaceName(self.netid)
    self.ifindex = self.ifindices[self.netid]
    self.lladdr = net_test.GetLinkAddress(self.ifname, True)
    self.globaladdr = net_test.GetLinkAddress(self.ifname, False)

  def assertValidPingResponse(self, s, data):
    """Receives a packet on s and asserts it is a valid reply to data.

    Args:
      s: The ping socket that data was sent on.
      data: The echo request that was sent, as a string.
    """
    family = s.family

    # Receive the reply.
    rcvd, src = s.recvfrom(32768)
    self.assertNotEqual(0, len(rcvd), "No data received")

    # If this is a dual-stack socket sending to a mapped IPv4 address, treat it
    # as IPv4.
    if src[0].startswith("::ffff:"):
      family = AF_INET
      src = (src[0].replace("::ffff:", ""), src[1])

    # Check the data being sent is valid.
    self.assertGreater(len(data), 7, "Not enough data for ping packet")
    if family == AF_INET:
      self.assertTrue(data.startswith("\x08\x00"), "Not an IPv4 echo request")
    elif family == AF_INET6:
      self.assertTrue(data.startswith("\x80\x00"), "Not an IPv6 echo request")
    else:
      self.fail("Unknown socket address family %d" % s.family)

    # Check address, ICMP type, and ICMP code.
    if family == AF_INET:
      addr, unused_port = src
      self.assertGreaterEqual(len(addr), len("1.1.1.1"))
      self.assertTrue(rcvd.startswith("\x00\x00"), "Not an IPv4 echo reply")
    else:
      addr, unused_port, flowlabel, scope_id = src  # pylint: disable=unbalanced-tuple-unpacking
      self.assertGreaterEqual(len(addr), len("::"))
      self.assertTrue(rcvd.startswith("\x81\x00"), "Not an IPv6 echo reply")
      # Check that the flow label is zero and that the scope ID is sane.
      self.assertEqual(flowlabel, 0)
      if addr.startswith("fe80::"):
        self.assertIn(scope_id, self.ifindices.values())
      else:
        self.assertEqual(0, scope_id)

    # TODO: check the checksum. We can't do this easily now for ICMPv6 because
    # we don't have the IP addresses so we can't construct the pseudoheader.

    # Check the sequence number and the data.
    self.assertEqual(len(data), len(rcvd))
    self.assertEqual(data[6:].encode("hex"), rcvd[6:].encode("hex"))

  @staticmethod
  def IsAlmostEqual(expected, actual, delta):
    """Returns True if expected and actual differ by strictly less than delta."""
    return abs(expected - actual) < delta

  def CheckSockStatFile(self, name, srcaddr, srcport, dstaddr, dstport, state,
                        txmem=0, rxmem=0):
    """Fails unless /proc/net/<name> contains a row for the given socket.

    Args:
      name: Protocol name passed to ReadProcNetSocket, e.g. "icmp6".
      srcaddr: Expected local address.
      srcport: Expected local port.
      dstaddr: Expected remote address.
      dstport: Expected remote port.
      state: Expected socket state number.
      txmem: Expected transmit memory.
      rxmem: Expected receive memory.
    """
    expected = ["%s:%04X" % (net_test.FormatSockStatAddress(srcaddr), srcport),
                "%s:%04X" % (net_test.FormatSockStatAddress(dstaddr), dstport),
                "%02X" % state,
                "%08X:%08X" % (txmem, rxmem),
                str(os.getuid()), "2", "0"]
    for actual in self.ReadProcNetSocket(name):
      # Check that rxmem and txmem don't differ too much from the expected
      # values. The memory column must be parsed from the row that was read,
      # not from the expectation.
      # NOTE(review): a row whose memory usage is close to a non-zero expected
      # value is accepted here without comparing its addresses or state.
      # Confirm whether this was meant to be a filter instead.
      actual_txmem, actual_rxmem = actual[3].split(":")
      if self.IsAlmostEqual(txmem, int(actual_txmem, 16), txmem / 4):
        return
      if self.IsAlmostEqual(rxmem, int(actual_rxmem, 16), rxmem / 4):
        return

      # Check all the parameters except rxmem and txmem. Compare against a
      # copy so the failure message below still shows the expected memory.
      candidate = list(expected)
      candidate[3] = actual[3]
      if candidate == actual:
        return

    self.fail("Cound not find socket matching %s" % expected)

  def testIPv4SendWithNoConnection(self):
    s = net_test.IPv4PingSocket()
    self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV4_PING)

  def testIPv6SendWithNoConnection(self):
    s = net_test.IPv6PingSocket()
    self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV6_PING)

  def testIPv4LoopbackPingWithConnect(self):
    s = net_test.IPv4PingSocket()
    s.connect(("127.0.0.1", 55))
    data = net_test.IPV4_PING + "foobarbaz"
    s.send(data)
    self.assertValidPingResponse(s, data)

  def testIPv6LoopbackPingWithConnect(self):
    s = net_test.IPv6PingSocket()
    s.connect(("::1", 55))
    s.send(net_test.IPV6_PING)
    self.assertValidPingResponse(s, net_test.IPV6_PING)

  def testIPv4PingUsingSendto(self):
    s = net_test.IPv4PingSocket()
    written = s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
    self.assertEqual(len(net_test.IPV4_PING), written)
    self.assertValidPingResponse(s, net_test.IPV4_PING)

  def testIPv6PingUsingSendto(self):
    s = net_test.IPv6PingSocket()
    written = s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    self.assertEqual(len(net_test.IPV6_PING), written)
    self.assertValidPingResponse(s, net_test.IPV6_PING)

  def testIPv4NoCrash(self):
    # Python 2.x does not provide either read() or recvmsg.
    s = net_test.IPv4PingSocket()
    written = s.sendto(net_test.IPV4_PING, ("127.0.0.1", 55))
    self.assertEqual(len(net_test.IPV4_PING), written)
    fd = s.fileno()
    reply = posix.read(fd, 4096)
    self.assertEqual(written, len(reply))

  def testIPv6NoCrash(self):
    # Python 2.x does not provide either read() or recvmsg.
    s = net_test.IPv6PingSocket()
    written = s.sendto(net_test.IPV6_PING, ("::1", 55))
    self.assertEqual(len(net_test.IPV6_PING), written)
    fd = s.fileno()
    reply = posix.read(fd, 4096)
    self.assertEqual(written, len(reply))

  def testCrossProtocolCrash(self):
    # Checks that an ICMP error containing a ping packet that matches the ID
    # of a socket of the wrong protocol (which can happen when using 464xlat)
    # doesn't crash the kernel.

    # We can only test this using IPv6 unreachables and IPv4 ping sockets,
    # because IPv4 packets sent by scapy.send() on loopback are not received by
    # the kernel. So we don't actually use this function yet.
    def GetIPv4Unreachable(port):  # pylint: disable=unused-variable
      return (scapy.IP(src="192.0.2.1", dst="127.0.0.1") /
              scapy.ICMP(type=3, code=0) /
              scapy.IP(src="127.0.0.1", dst="127.0.0.1") /
              scapy.ICMP(type=8, id=port, seq=1))

    def GetIPv6Unreachable(port):
      return (scapy.IPv6(src="::1", dst="::1") /
              scapy.ICMPv6DestUnreach() /
              scapy.IPv6(src="::1", dst="::1") /
              scapy.ICMPv6EchoRequest(id=port, seq=1, data="foobarbaz"))

    # An unreachable matching the ID of a socket of the wrong protocol
    # shouldn't crash.
    s = net_test.IPv4PingSocket()
    s.connect(("127.0.0.1", 12345))
    _, port = s.getsockname()
    scapy.send(GetIPv6Unreachable(port), verbose=False)
    # No crash? Good.

  def testCrossProtocolCalls(self):
    """Tests that passing in the wrong family returns EAFNOSUPPORT.

    Relevant kernel commits:
      upstream net:
        91a0b60 net/ping: handle protocol mismatching scenario
        9145736d net: ping: Return EAFNOSUPPORT when appropriate.

      android-3.10:
        78a6809 net/ping: handle protocol mismatching scenario
        428e6d6 net: ping: Return EAFNOSUPPORT when appropriate.
    """

    def CheckEAFNoSupport(function, *args):
      self.assertRaisesErrno(errno.EAFNOSUPPORT, function, *args)

    ipv6sockaddr = csocket.Sockaddr((net_test.IPV6_ADDR, 53))

    # In order to check that IPv6 socket calls return EAFNOSUPPORT when passed
    # IPv4 socket address structures, we need to pass down a socket address
    # length argument that's at least sizeof(sockaddr_in6). Otherwise, the calls
    # will fail immediately with EINVAL because the passed-in socket length is
    # too short. So create a sockaddr_in that's as long as a sockaddr_in6.
    ipv4sockaddr = csocket.Sockaddr((net_test.IPV4_ADDR, 53))
    ipv4sockaddr = csocket.SockaddrIn6(
        ipv4sockaddr.Pack() +
        "\x00" * (len(csocket.SockaddrIn6) - len(csocket.SockaddrIn)))

    s4 = net_test.IPv4PingSocket()
    s6 = net_test.IPv6PingSocket()

    # We can't just call s.connect(), s.bind() etc. with a tuple of the wrong
    # address family, because the Python implementation will just pass garbage
    # down to the kernel. So call the C functions directly.
    CheckEAFNoSupport(csocket.Bind, s4, ipv6sockaddr)
    CheckEAFNoSupport(csocket.Bind, s6, ipv4sockaddr)
    CheckEAFNoSupport(csocket.Connect, s4, ipv6sockaddr)
    CheckEAFNoSupport(csocket.Connect, s6, ipv4sockaddr)
    CheckEAFNoSupport(csocket.Sendmsg,
                      s4, ipv6sockaddr, net_test.IPV4_PING, None, 0)
    CheckEAFNoSupport(csocket.Sendmsg,
                      s6, ipv4sockaddr, net_test.IPV6_PING, None, 0)

  def testIPv4Bind(self):
    # Bind to unspecified address.
    s = net_test.IPv4PingSocket()
    s.bind(("0.0.0.0", 544))
    self.assertEqual(("0.0.0.0", 544), s.getsockname())

    # Bind to loopback.
    s = net_test.IPv4PingSocket()
    s.bind(("127.0.0.1", 99))
    self.assertEqual(("127.0.0.1", 99), s.getsockname())

    # Binding twice is not allowed.
    self.assertRaisesErrno(errno.EINVAL, s.bind, ("127.0.0.1", 22))

    # But binding two different sockets to the same ID is allowed.
    s2 = net_test.IPv4PingSocket()
    s2.bind(("127.0.0.1", 99))
    self.assertEqual(("127.0.0.1", 99), s2.getsockname())
    s3 = net_test.IPv4PingSocket()
    s3.bind(("127.0.0.1", 99))
    self.assertEqual(("127.0.0.1", 99), s3.getsockname())

    # If two sockets bind to the same port, the first one to call read() gets
    # the response.
    s4 = net_test.IPv4PingSocket()
    s5 = net_test.IPv4PingSocket()
    s4.bind(("0.0.0.0", 167))
    s5.bind(("0.0.0.0", 167))
    s4.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 44))
    self.assertValidPingResponse(s5, net_test.IPV4_PING)
    csocket.SetSocketTimeout(s4, 100)
    self.assertRaisesErrno(errno.EAGAIN, s4.recv, 32768)

    # If SO_REUSEADDR is turned off, then we get EADDRINUSE.
    s6 = net_test.IPv4PingSocket()
    s4.setsockopt(SOL_SOCKET, SO_REUSEADDR, 0)
    self.assertRaisesErrno(errno.EADDRINUSE, s6.bind, ("0.0.0.0", 167))

    # Can't bind after sendto.
    s = net_test.IPv4PingSocket()
    s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 9132))
    self.assertRaisesErrno(errno.EINVAL, s.bind, ("0.0.0.0", 5429))

  def testIPv6Bind(self):
    # Bind to unspecified address.
    s = net_test.IPv6PingSocket()
    s.bind(("::", 769))
    self.assertEqual(("::", 769, 0, 0), s.getsockname())

    # Bind to loopback.
    s = net_test.IPv6PingSocket()
    s.bind(("::1", 99))
    self.assertEqual(("::1", 99, 0, 0), s.getsockname())

    # Binding twice is not allowed.
    self.assertRaisesErrno(errno.EINVAL, s.bind, ("::1", 22))

    # But binding two different sockets to the same ID is allowed.
    s2 = net_test.IPv6PingSocket()
    s2.bind(("::1", 99))
    self.assertEqual(("::1", 99, 0, 0), s2.getsockname())
    s3 = net_test.IPv6PingSocket()
    s3.bind(("::1", 99))
    self.assertEqual(("::1", 99, 0, 0), s3.getsockname())

    # Binding both IPv4 and IPv6 to the same socket works.
    s4 = net_test.IPv4PingSocket()
    s6 = net_test.IPv6PingSocket()
    s4.bind(("0.0.0.0", 444))
    s6.bind(("::", 666, 0, 0))

    # Can't bind after sendto.
    s = net_test.IPv6PingSocket()
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 9132))
    self.assertRaisesErrno(errno.EINVAL, s.bind, ("::", 5429))

  def testIPv4InvalidBind(self):
    s = net_test.IPv4PingSocket()
    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
                           s.bind, ("255.255.255.255", 1026))
    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
                           s.bind, ("224.0.0.1", 651))
    # Binding to an address we don't have only works with IP_TRANSPARENT.
    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
                           s.bind, (net_test.IPV4_ADDR, 651))
    # NOTE(review): every IOError is swallowed below, not only EACCES. Confirm
    # whether other errors should fail the test.
    try:
      s.setsockopt(SOL_IP, net_test.IP_TRANSPARENT, 1)
      s.bind((net_test.IPV4_ADDR, 651))
    except IOError as e:
      if e.errno == errno.EACCES:
        pass  # We're not root. let it go for now.

  def testIPv6InvalidBind(self):
    s = net_test.IPv6PingSocket()
    self.assertRaisesErrno(errno.EINVAL,
                           s.bind, ("ff02::2", 1026))

    # Binding to an address we don't have only works with IPV6_TRANSPARENT.
    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
                           s.bind, (net_test.IPV6_ADDR, 651))
    # NOTE(review): every IOError is swallowed below, not only EACCES. Confirm
    # whether other errors should fail the test.
    try:
      s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_TRANSPARENT, 1)
      s.bind((net_test.IPV6_ADDR, 651))
    except IOError as e:
      if e.errno == errno.EACCES:
        pass  # We're not root. let it go for now.

  def testAfUnspecBind(self):
    # Binding to AF_UNSPEC is treated as IPv4 if the address is 0.0.0.0.
    s4 = net_test.IPv4PingSocket()
    sockaddr = csocket.Sockaddr(("0.0.0.0", 12996))
    sockaddr.family = AF_UNSPEC
    csocket.Bind(s4, sockaddr)
    self.assertEqual(("0.0.0.0", 12996), s4.getsockname())

    # But not if the address is anything else.
    sockaddr = csocket.Sockaddr(("127.0.0.1", 58234))
    sockaddr.family = AF_UNSPEC
    self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s4, sockaddr)

    # This doesn't work for IPv6.
    s6 = net_test.IPv6PingSocket()
    sockaddr = csocket.Sockaddr(("::1", 58997))
    sockaddr.family = AF_UNSPEC
    self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s6, sockaddr)

  def testIPv6ScopedBind(self):
    # Can't bind to a link-local address without a scope ID.
    s = net_test.IPv6PingSocket()
    self.assertRaisesErrno(errno.EINVAL,
                           s.bind, (self.lladdr, 1026, 0, 0))

    # Binding to a link-local address with a scope ID works, and the scope ID is
    # returned by a subsequent getsockname. Interestingly, Python's getsockname
    # returns "fe80:1%foo", even though it does not understand it.
    expected = self.lladdr + "%" + self.ifname
    s.bind((self.lladdr, 4646, 0, self.ifindex))
    self.assertEqual((expected, 4646, 0, self.ifindex), s.getsockname())

    # Of course, for the above to work the address actually has to be configured
    # on the machine.
    self.assertRaisesErrno(errno.EADDRNOTAVAIL,
                           s.bind, ("fe80::f00", 1026, 0, 1))

    # Scope IDs on non-link-local addresses are silently ignored.
    s = net_test.IPv6PingSocket()
    s.bind(("::1", 1234, 0, 1))
    self.assertEqual(("::1", 1234, 0, 0), s.getsockname())

  def testBindAffectsIdentifier(self):
    s = net_test.IPv6PingSocket()
    s.bind((self.globaladdr, 0xf976))
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    self.assertEqual("\xf9\x76", s.recv(32768)[4:6])

    s = net_test.IPv6PingSocket()
    s.bind((self.globaladdr, 0xace))
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    self.assertEqual("\x0a\xce", s.recv(32768)[4:6])

  def testLinkLocalAddress(self):
    s = net_test.IPv6PingSocket()
    # Sending to a link-local address with no scope fails with EINVAL.
    self.assertRaisesErrno(errno.EINVAL,
                           s.sendto, net_test.IPV6_PING, ("fe80::1", 55))
    # Sending to link-local address with a scope succeeds. Note that Python
    # doesn't understand the "fe80::1%lo" format, even though it returns it.
    s.sendto(net_test.IPV6_PING, ("fe80::1", 55, 0, self.ifindex))
    # No exceptions? Good.

  def testLinkLocalOif(self):
    """Checks that ping to link-local addresses works correctly.

    Relevant kernel commits:
      upstream net:
        5e45789 net: ipv6: Fix ping to link-local addresses.
    """
    for mode in ["oif", "ucast_oif", None]:
      s = net_test.IPv6PingSocket()
      for netid in self.NETIDS:
        s2 = net_test.IPv6PingSocket()
        dst = self._RouterAddress(netid, 6)
        self.assertTrue(dst.startswith("fe80:"))

        if mode:
          self.SelectInterface(s, netid, mode)
          self.SelectInterface(s2, netid, mode)
          scopeid = 0
        else:
          scopeid = self.ifindices[netid]

        if mode == "oif":
          # If SO_BINDTODEVICE has been set, any attempt to send on another
          # interface returns EINVAL.
          othernetid = self.NETIDS[(self.NETIDS.index(netid) + 1)
                                   % len(self.NETIDS)]
          otherscopeid = self.ifindices[othernetid]
          self.assertRaisesErrno(
              errno.EINVAL,
              s.sendto, net_test.IPV6_PING, (dst, 55, 0, otherscopeid))
          self.assertRaisesErrno(
              errno.EINVAL,
              s.connect, (dst, 55, 0, otherscopeid))

        # Try using both sendto and connect/send.
        # If we get a reply, we sent the packet out on the right interface.
        s.sendto(net_test.IPV6_PING, (dst, 123, 0, scopeid))
        self.assertValidPingResponse(s, net_test.IPV6_PING)

        # IPV6_UNICAST_IF doesn't work on connected sockets.
        if mode != "ucast_oif":
          s2.connect((dst, 123, 0, scopeid))
          s2.send(net_test.IPV6_PING)
          self.assertValidPingResponse(s2, net_test.IPV6_PING)

  def testMappedAddressFails(self):
    s = net_test.IPv6PingSocket()
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    self.assertValidPingResponse(s, net_test.IPV6_PING)
    s.sendto(net_test.IPV6_PING, ("2001:4860:4860::8844", 55))
    self.assertValidPingResponse(s, net_test.IPV6_PING)
    self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
                           ("::ffff:192.0.2.1", 55))

  @unittest.skipUnless(False, "skipping: does not work yet")
  def testFlowLabel(self):
    s = net_test.IPv6PingSocket()

    # Specifying a flowlabel without having set IPV6_FLOWINFO_SEND succeeds but
    # the flow label in the packet is not set.
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
    self.assertValidPingResponse(s, net_test.IPV6_PING)  # Checks flow label==0.

    # If IPV6_FLOWINFO_SEND is set on the socket, attempting to set a flow label
    # that is not registered with the flow manager should return EINVAL...
    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_FLOWINFO_SEND, 1)
    # ... but this doesn't work yet.
    if False:
      self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
                             (net_test.IPV6_ADDR, 93, 0xdead, 0))

    # After registering the flow label, it gets sent properly, appears in the
    # output packet, and is returned in the response.
    net_test.SetFlowLabel(s, net_test.IPV6_ADDR, 0xdead)
    self.assertEqual(1, s.getsockopt(net_test.SOL_IPV6,
                                     net_test.IPV6_FLOWINFO_SEND))
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
    _, src = s.recvfrom(32768)
    _, _, flowlabel, _ = src
    self.assertEqual(0xdead, flowlabel & 0xfffff)

  def testIPv4Error(self):
    s = net_test.IPv4PingSocket()
    s.setsockopt(SOL_IP, IP_TTL, 2)
    s.setsockopt(SOL_IP, net_test.IP_RECVERR, 1)
    s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
    # We can't check the actual error because Python 2.7 doesn't implement
    # recvmsg, but we can at least check that the socket returns an error.
    self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768)  # No response.

  def testIPv6Error(self):
    s = net_test.IPv6PingSocket()
    s.setsockopt(net_test.SOL_IPV6, IPV6_UNICAST_HOPS, 2)
    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1)
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    # We can't check the actual error because Python 2.7 doesn't implement
    # recvmsg, but we can at least check that the socket returns an error.
    self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768)  # No response.

  def testIPv6MulticastPing(self):
    s = net_test.IPv6PingSocket()
    # Send a multicast ping and check we get at least one duplicate.
    # The setsockopt should not be necessary, but ping_v6_sendmsg has a bug.
    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex)
    s.sendto(net_test.IPV6_PING, ("ff02::1", 55, 0, self.ifindex))
    self.assertValidPingResponse(s, net_test.IPV6_PING)
    self.assertValidPingResponse(s, net_test.IPV6_PING)

  def testIPv4LargePacket(self):
    s = net_test.IPv4PingSocket()
    data = net_test.IPV4_PING + 20000 * "a"
    s.sendto(data, ("127.0.0.1", 987))
    self.assertValidPingResponse(s, data)

  def testIPv6LargePacket(self):
    s = net_test.IPv6PingSocket()
    s.bind(("::", 0xace))
    data = net_test.IPV6_PING + "\x01" + 19994 * "\x00" + "aaaaa"
    s.sendto(data, ("::1", 953))

  @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
  def testIcmpSocketsNotInIcmp6(self):
    numrows = len(self.ReadProcNetSocket("icmp"))
    numrows6 = len(self.ReadProcNetSocket("icmp6"))
    s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
    s.bind(("127.0.0.1", 0xace))
    s.connect(("127.0.0.1", 0xbeef))
    self.assertEqual(numrows + 1, len(self.ReadProcNetSocket("icmp")))
    self.assertEqual(numrows6, len(self.ReadProcNetSocket("icmp6")))

  @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
  def testIcmp6SocketsNotInIcmp(self):
    numrows = len(self.ReadProcNetSocket("icmp"))
    numrows6 = len(self.ReadProcNetSocket("icmp6"))
    s = net_test.IPv6PingSocket()
    s.bind(("::1", 0xace))
    s.connect(("::1", 0xbeef))
    self.assertEqual(numrows, len(self.ReadProcNetSocket("icmp")))
    self.assertEqual(numrows6 + 1, len(self.ReadProcNetSocket("icmp6")))

  def testProcNetIcmp(self):
    s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
    s.bind(("127.0.0.1", 0xace))
    s.connect(("127.0.0.1", 0xbeef))
    self.CheckSockStatFile("icmp", "127.0.0.1", 0xace, "127.0.0.1", 0xbeef, 1)

  @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
  def testProcNetIcmp6(self):
    numrows6 = len(self.ReadProcNetSocket("icmp6"))
    s = net_test.IPv6PingSocket()
    s.bind(("::1", 0xace))
    s.connect(("::1", 0xbeef))
    self.CheckSockStatFile("icmp6", "::1", 0xace, "::1", 0xbeef, 1)

    # Check the row goes away when the socket is closed.
    s.close()
    self.assertEqual(numrows6, len(self.ReadProcNetSocket("icmp6")))

    # Try send, bind and connect to check the addresses and the state.
    s = net_test.IPv6PingSocket()
    self.assertEqual(0, len(self.ReadProcNetSocket("icmp6")))
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 12345))
    self.assertEqual(1, len(self.ReadProcNetSocket("icmp6")))

    # Can't bind after sendto, apparently.
    s = net_test.IPv6PingSocket()
    self.assertEqual(0, len(self.ReadProcNetSocket("icmp6")))
    s.bind((self.lladdr, 0xd00d, 0, self.ifindex))
    self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "::", 0, 7)

    # Check receive bytes.
    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex)
    s.connect(("ff02::1", 0xdead))
    self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1)
    s.send(net_test.IPV6_PING)
    s.recvfrom(32768, MSG_PEEK)  # Wait until the receive thread replies.
    self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
                           txmem=0, rxmem=0x300)
    self.assertValidPingResponse(s, net_test.IPV6_PING)
    self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
                           txmem=0, rxmem=0)

  def testProcNetUdp6(self):
    s = net_test.Socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP)
    s.bind(("::1", 0xace))
    s.connect(("::1", 0xbeef))
    self.CheckSockStatFile("udp6", "::1", 0xace, "::1", 0xbeef, 1)

  def testProcNetRaw6(self):
    s = net_test.Socket(AF_INET6, SOCK_RAW, IPPROTO_RAW)
    s.bind(("::1", 0xace))
    s.connect(("::1", 0xbeef))
    self.CheckSockStatFile("raw6", "::1", 0xff, "::1", 0, 1)

  def testIPv6MTU(self):
    """Tests IPV6_RECVERR and path MTU discovery on ping sockets.

    Relevant kernel commits:
      upstream net-next:
        dcb94b8 ipv6: fix endianness error in icmpv6_err
    """
    s = net_test.IPv6PingSocket()
    s.setsockopt(net_test.SOL_IPV6, csocket.IPV6_DONTFRAG, 1)
    s.setsockopt(net_test.SOL_IPV6, csocket.IPV6_MTU_DISCOVER, 2)
    s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1)
    s.connect((net_test.IPV6_ADDR, 55))
    pkt = net_test.IPV6_PING + (PingReplyThread.LINK_MTU + 100) * "a"
    s.send(pkt)
    self.assertRaisesErrno(errno.EMSGSIZE, s.recv, 32768)
    data, addr, cmsg = csocket.Recvmsg(s, 4096, 1024, csocket.MSG_ERRQUEUE)

    # Compare the offending packet with the one we sent. To do this we need to
    # calculate the ident of the packet we sent and blank out the checksum of
    # the one we received.
    # NOTE(review): the second assignment splices in pkt[4:], so only the
    # first two bytes of the received data take part in the comparison.
    # Confirm whether the received payload was meant to be compared too.
    ident = struct.pack("!H", s.getsockname()[1])
    pkt = pkt[:4] + ident + pkt[6:]
    data = data[:2] + "\x00\x00" + pkt[4:]
    self.assertEqual(pkt, data)

    # Check the address that the packet was sent to.
    # ... except in 4.1, where it just returns an AF_UNSPEC, like this:
    # recvmsg(9, {msg_name(0)={sa_family=AF_UNSPEC,
    #     sa_data="\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"},
    #     msg_iov(1)=[{"\x80\x00\x04\x6b\x00\xc4\x00\x03\x61\x61\x61\x61\x61\x61"..., 4096}],
    #     msg_controllen=64, {cmsg_len=60, cmsg_level=SOL_IPV6, cmsg_type=, ...},
    #     msg_flags=MSG_ERRQUEUE}, MSG_ERRQUEUE) = 1232
    if net_test.LINUX_VERSION != (4, 1, 0):
      self.assertEqual(csocket.Sockaddr(("2001:4860:4860::8888", 0)), addr)

    # Check the cmsg data, including the link MTU.
    mtu = PingReplyThread.LINK_MTU
    src = self.reply_threads[self.netid].INTERMEDIATE_IPV6
    msglist = [
        (net_test.SOL_IPV6, net_test.IPV6_RECVERR,
         (csocket.SockExtendedErr((errno.EMSGSIZE, csocket.SO_ORIGIN_ICMP6,
                                   ICMPV6_PKT_TOOBIG, 0, mtu, 0)),
          csocket.Sockaddr((src, 0))))
    ]

    # IP[V6]_RECVERR in 3.10 appears to return incorrect data for the port.
    # The fix might have been in 676d236, but we don't have that in 3.10 and it
    # touches code all over the tree. Instead, just don't check the port.
    if net_test.LINUX_VERSION <= (3, 14, 0):
      msglist[0][2][1].port = cmsg[0][2][1].port

    self.assertEqual(msglist, cmsg)


if __name__ == "__main__":
  unittest.main()