Home | History | Annotate | Download | only in test
      1 #!/usr/bin/python
      2 #
      3 # Copyright 2014 The Android Open Source Project
      4 #
      5 # Licensed under the Apache License, Version 2.0 (the "License");
      6 # you may not use this file except in compliance with the License.
      7 # You may obtain a copy of the License at
      8 #
      9 # http://www.apache.org/licenses/LICENSE-2.0
     10 #
     11 # Unless required by applicable law or agreed to in writing, software
     12 # distributed under the License is distributed on an "AS IS" BASIS,
     13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14 # See the License for the specific language governing permissions and
     15 # limitations under the License.
     16 
     17 # pylint: disable=g-bad-todo
     18 
     19 import errno
     20 import os
     21 import posix
     22 import random
     23 from socket import *  # pylint: disable=wildcard-import
     24 import struct
     25 import sys
     26 import threading
     27 import time
     28 import unittest
     29 
     30 from scapy import all as scapy
     31 
     32 import csocket
     33 import multinetwork_base
     34 import net_test
     35 
     36 
# Whether /proc/net/icmp6 exists on this machine; tests that read that file
# are skipped when it does not.
HAVE_PROC_NET_ICMP6 = os.path.isfile("/proc/net/icmp6")

# ICMP / ICMPv6 message type values compared against parsed packets below.
ICMP_ECHO = 8
ICMP_ECHOREPLY = 0
ICMPV6_ECHO_REQUEST = 128
ICMPV6_ECHO_REPLY = 129
# Used together to size the packet excerpt quoted in a Packet Too Big error.
IPV6_MIN_MTU = 1280
ICMPV6_HEADER_LEN = 8
# Not referenced in the visible part of this file.
ICMPV6_PKT_TOOBIG = 2
     46 
     47 
     48 class PingReplyThread(threading.Thread):
     49 
     50   MIN_TTL = 10
     51   INTERMEDIATE_IPV4 = "192.0.2.2"
     52   INTERMEDIATE_IPV6 = "2001:db8:1:2::ace:d00d"
     53   NEIGHBOURS = ["fe80::1"]
     54   LINK_MTU = 1300
     55 
     56   def __init__(self, tun, mymac, routermac, routeraddr):
     57     super(PingReplyThread, self).__init__()
     58     self._tun = tun
     59     self._started = False
     60     self._stopped = False
     61     self._mymac = mymac
     62     self._routermac = routermac
     63     self._routeraddr = routeraddr
     64 
     65   def IsStarted(self):
     66     return self._started
     67 
     68   def Stop(self):
     69     self._stopped = True
     70 
     71   def ChecksumValid(self, packet):
     72     # Get and clear the checksums.
     73     def GetAndClearChecksum(layer):
     74       if not layer:
     75         return
     76       try:
     77         checksum = layer.chksum
     78         del layer.chksum
     79       except AttributeError:
     80         checksum = layer.cksum
     81         del layer.cksum
     82       return checksum
     83 
     84     def GetChecksum(layer):
     85       try:
     86         return layer.chksum
     87       except AttributeError:
     88         return layer.cksum
     89 
     90     layers = ["IP", "ICMP", scapy.ICMPv6EchoRequest]
     91     sums = {}
     92     for name in layers:
     93       sums[name] = GetAndClearChecksum(packet.getlayer(name))
     94 
     95     # Serialize the packet, so scapy recalculates the checksums, and compare
     96     # them with the ones in the packet.
     97     packet = packet.__class__(str(packet))
     98     for name in layers:
     99       layer = packet.getlayer(name)
    100       if layer and GetChecksum(layer) != sums[name]:
    101         return False
    102 
    103     return True
    104 
    105   def SendTimeExceeded(self, version, packet):
    106     if version == 4:
    107       src = packet.getlayer(scapy.IP).src
    108       self.SendPacket(
    109           scapy.IP(src=self.INTERMEDIATE_IPV4, dst=src) /
    110           scapy.ICMP(type=11, code=0) /
    111           packet)
    112     elif version == 6:
    113       src = packet.getlayer(scapy.IPv6).src
    114       self.SendPacket(
    115           scapy.IPv6(src=self.INTERMEDIATE_IPV6, dst=src) /
    116           scapy.ICMPv6TimeExceeded(code=0) /
    117           packet)
    118 
    119   def SendPacketTooBig(self, packet):
    120       src = packet.getlayer(scapy.IPv6).src
    121       datalen = IPV6_MIN_MTU - ICMPV6_HEADER_LEN
    122       self.SendPacket(
    123           scapy.IPv6(src=self.INTERMEDIATE_IPV6, dst=src) /
    124           scapy.ICMPv6PacketTooBig(mtu=self.LINK_MTU) /
    125           str(packet)[:datalen])
    126 
    127   def IPv4Packet(self, ip):
    128     icmp = ip.getlayer(scapy.ICMP)
    129 
    130     # We only support ping for now.
    131     if (ip.proto != IPPROTO_ICMP or
    132         icmp.type != ICMP_ECHO or
    133         icmp.code != 0):
    134       return
    135 
    136     # Check the checksums.
    137     if not self.ChecksumValid(ip):
    138       return
    139 
    140     if ip.ttl < self.MIN_TTL:
    141       self.SendTimeExceeded(4, ip)
    142       return
    143 
    144     icmp.type = ICMP_ECHOREPLY
    145     self.SwapAddresses(ip)
    146     self.SendPacket(ip)
    147 
    148   def IPv6Packet(self, ipv6):
    149     icmpv6 = ipv6.getlayer(scapy.ICMPv6EchoRequest)
    150 
    151     # We only support ping for now.
    152     if (ipv6.nh != IPPROTO_ICMPV6 or
    153         not icmpv6 or
    154         icmpv6.type != ICMPV6_ECHO_REQUEST or
    155         icmpv6.code != 0):
    156       return
    157 
    158     # Check the checksums.
    159     if not self.ChecksumValid(ipv6):
    160       return
    161 
    162     if ipv6.dst.startswith("ff02::"):
    163       ipv6.dst = ipv6.src
    164       for src in [self._routeraddr]:
    165         ipv6.src = src
    166         icmpv6.type = ICMPV6_ECHO_REPLY
    167         self.SendPacket(ipv6)
    168     elif ipv6.hlim < self.MIN_TTL:
    169       self.SendTimeExceeded(6, ipv6)
    170     elif ipv6.plen > self.LINK_MTU:
    171       self.SendPacketTooBig(ipv6)
    172     else:
    173       icmpv6.type = ICMPV6_ECHO_REPLY
    174       if ipv6.dst.startswith("fe80:") and ipv6.dst != self._routeraddr:
    175         return
    176       self.SwapAddresses(ipv6)
    177       self.SendPacket(ipv6)
    178 
    179   def SwapAddresses(self, packet):
    180     src = packet.src
    181     packet.src = packet.dst
    182     packet.dst = src
    183 
    184   def SendPacket(self, packet):
    185     packet = scapy.Ether(src=self._routermac, dst=self._mymac) / packet
    186     try:
    187       posix.write(self._tun.fileno(), str(packet))
    188     except Exception, e:
    189       if not self._stopped:
    190         raise e
    191 
    192   def run(self):
    193     self._started = True
    194     while not self._stopped:
    195       try:
    196         packet = posix.read(self._tun.fileno(), 4096)
    197       except OSError, e:
    198         if e.errno == errno.EAGAIN:
    199           continue
    200         else:
    201           break
    202       except ValueError, e:
    203         if not self._stopped:
    204           raise e
    205 
    206       ether = scapy.Ether(packet)
    207       if ether.type == net_test.ETH_P_IPV6:
    208         self.IPv6Packet(ether.payload)
    209       elif ether.type == net_test.ETH_P_IP:
    210         self.IPv4Packet(ether.payload)
    211 
    212 
    213 class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
    214 
    215   @classmethod
    216   def WaitForReplyThreads(cls):
    217     # Wait 2s for the reply threads to start. If they don't, don't blow up, as
    218     # that would cause tearDownClass not to be called and thus not clean up
    219     # routing configuration, breaking subsequent tests. Instead, just let these
    220     # tests fail.
    221     _INTERVAL = 0.1
    222     _ATTEMPTS = 20
    223     for i in xrange(0, _ATTEMPTS):
    224       for netid in cls.NETIDS:
    225         if all(thread.IsStarted() for thread in cls.reply_threads.values()):
    226           return
    227         time.sleep(_INTERVAL)
    228     msg = "WARNING: reply threads not all started after %.1f seconds\n" % (
    229         _ATTEMPTS * _INTERVAL)
    230     sys.stderr.write(msg)
    231 
    232   @classmethod
    233   def StopReplyThreads(cls):
    234     for thread in cls.reply_threads.values():
    235       thread.Stop()
    236 
    237   @classmethod
    238   def setUpClass(cls):
    239     super(Ping6Test, cls).setUpClass()
    240     cls.reply_threads = {}
    241     for netid in cls.NETIDS:
    242       cls.reply_threads[netid] = PingReplyThread(
    243         cls.tuns[netid],
    244         cls.MyMacAddress(netid),
    245         cls.RouterMacAddress(netid),
    246         cls._RouterAddress(netid, 6))
    247       cls.reply_threads[netid].start()
    248     cls.WaitForReplyThreads()
    249     cls.netid = random.choice(cls.NETIDS)
    250     cls.SetDefaultNetwork(cls.netid)
    251 
  @classmethod
  def tearDownClass(cls):
    # Undo setUpClass: ask the reply threads to stop (this only sets their stop
    # flag; they are not joined here), clear the default network, then hand
    # over to the base class teardown.
    cls.StopReplyThreads()
    cls.ClearDefaultNetwork()
    super(Ping6Test, cls).tearDownClass()
    257 
    258   def setUp(self):
    259     self.ifname = self.GetInterfaceName(self.netid)
    260     self.ifindex = self.ifindices[self.netid]
    261     self.lladdr = net_test.GetLinkAddress(self.ifname, True)
    262     self.globaladdr = net_test.GetLinkAddress(self.ifname, False)
    263 
    264   def assertValidPingResponse(self, s, data):
    265     family = s.family
    266 
    267     # Receive the reply.
    268     rcvd, src = s.recvfrom(32768)
    269     self.assertNotEqual(0, len(rcvd), "No data received")
    270 
    271     # If this is a dual-stack socket sending to a mapped IPv4 address, treat it
    272     # as IPv4.
    273     if src[0].startswith("::ffff:"):
    274       family = AF_INET
    275       src = (src[0].replace("::ffff:", ""), src[1:])
    276 
    277     # Check the data being sent is valid.
    278     self.assertGreater(len(data), 7, "Not enough data for ping packet")
    279     if family == AF_INET:
    280       self.assertTrue(data.startswith("\x08\x00"), "Not an IPv4 echo request")
    281     elif family == AF_INET6:
    282       self.assertTrue(data.startswith("\x80\x00"), "Not an IPv6 echo request")
    283     else:
    284       self.fail("Unknown socket address family %d" * s.family)
    285 
    286     # Check address, ICMP type, and ICMP code.
    287     if family == AF_INET:
    288       addr, unused_port = src
    289       self.assertGreaterEqual(len(addr), len("1.1.1.1"))
    290       self.assertTrue(rcvd.startswith("\x00\x00"), "Not an IPv4 echo reply")
    291     else:
    292       addr, unused_port, flowlabel, scope_id = src  # pylint: disable=unbalanced-tuple-unpacking
    293       self.assertGreaterEqual(len(addr), len("::"))
    294       self.assertTrue(rcvd.startswith("\x81\x00"), "Not an IPv6 echo reply")
    295       # Check that the flow label is zero and that the scope ID is sane.
    296       self.assertEqual(flowlabel, 0)
    297       if addr.startswith("fe80::"):
    298         self.assertTrue(scope_id in self.ifindices.values())
    299       else:
    300         self.assertEquals(0, scope_id)
    301 
    302     # TODO: check the checksum. We can't do this easily now for ICMPv6 because
    303     # we don't have the IP addresses so we can't construct the pseudoheader.
    304 
    305     # Check the sequence number and the data.
    306     self.assertEqual(len(data), len(rcvd))
    307     self.assertEqual(data[6:].encode("hex"), rcvd[6:].encode("hex"))
    308 
    309   @staticmethod
    310   def IsAlmostEqual(expected, actual, delta):
    311     return abs(expected - actual) < delta
    312 
    313   def CheckSockStatFile(self, name, srcaddr, srcport, dstaddr, dstport, state,
    314                         txmem=0, rxmem=0):
    315     expected = ["%s:%04X" % (net_test.FormatSockStatAddress(srcaddr), srcport),
    316                 "%s:%04X" % (net_test.FormatSockStatAddress(dstaddr), dstport),
    317                 "%02X" % state,
    318                 "%08X:%08X" % (txmem, rxmem),
    319                 str(os.getuid()), "2", "0"]
    320     for actual in self.ReadProcNetSocket(name):
    321       # Check that rxmem and txmem don't differ too much from each other.
    322       actual_txmem, actual_rxmem = expected[3].split(":")
    323       if self.IsAlmostEqual(txmem, int(actual_txmem, 16), txmem / 4):
    324         return
    325       if self.IsAlmostEqual(rxmem, int(actual_rxmem, 16), rxmem / 4):
    326         return
    327 
    328       # Check all the parameters except rxmem and txmem.
    329       expected[3] = actual[3]
    330       if expected == actual:
    331         return
    332 
    333     self.fail("Cound not find socket matching %s" % expected)
    334 
    335   def testIPv4SendWithNoConnection(self):
    336     s = net_test.IPv4PingSocket()
    337     self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV4_PING)
    338 
    339   def testIPv6SendWithNoConnection(self):
    340     s = net_test.IPv6PingSocket()
    341     self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV6_PING)
    342 
    343   def testIPv4LoopbackPingWithConnect(self):
    344     s = net_test.IPv4PingSocket()
    345     s.connect(("127.0.0.1", 55))
    346     data = net_test.IPV4_PING + "foobarbaz"
    347     s.send(data)
    348     self.assertValidPingResponse(s, data)
    349 
    350   def testIPv6LoopbackPingWithConnect(self):
    351     s = net_test.IPv6PingSocket()
    352     s.connect(("::1", 55))
    353     s.send(net_test.IPV6_PING)
    354     self.assertValidPingResponse(s, net_test.IPV6_PING)
    355 
    356   def testIPv4PingUsingSendto(self):
    357     s = net_test.IPv4PingSocket()
    358     written = s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
    359     self.assertEquals(len(net_test.IPV4_PING), written)
    360     self.assertValidPingResponse(s, net_test.IPV4_PING)
    361 
    362   def testIPv6PingUsingSendto(self):
    363     s = net_test.IPv6PingSocket()
    364     written = s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    365     self.assertEquals(len(net_test.IPV6_PING), written)
    366     self.assertValidPingResponse(s, net_test.IPV6_PING)
    367 
    368   def testIPv4NoCrash(self):
    369     # Python 2.x does not provide either read() or recvmsg.
    370     s = net_test.IPv4PingSocket()
    371     written = s.sendto(net_test.IPV4_PING, ("127.0.0.1", 55))
    372     self.assertEquals(len(net_test.IPV4_PING), written)
    373     fd = s.fileno()
    374     reply = posix.read(fd, 4096)
    375     self.assertEquals(written, len(reply))
    376 
    377   def testIPv6NoCrash(self):
    378     # Python 2.x does not provide either read() or recvmsg.
    379     s = net_test.IPv6PingSocket()
    380     written = s.sendto(net_test.IPV6_PING, ("::1", 55))
    381     self.assertEquals(len(net_test.IPV6_PING), written)
    382     fd = s.fileno()
    383     reply = posix.read(fd, 4096)
    384     self.assertEquals(written, len(reply))
    385 
    386   def testCrossProtocolCrash(self):
    387     # Checks that an ICMP error containing a ping packet that matches the ID
    388     # of a socket of the wrong protocol (which can happen when using 464xlat)
    389     # doesn't crash the kernel.
    390 
    391     # We can only test this using IPv6 unreachables and IPv4 ping sockets,
    392     # because IPv4 packets sent by scapy.send() on loopback are not received by
    393     # the kernel. So we don't actually use this function yet.
    394     def GetIPv4Unreachable(port):  # pylint: disable=unused-variable
    395       return (scapy.IP(src="192.0.2.1", dst="127.0.0.1") /
    396               scapy.ICMP(type=3, code=0) /
    397               scapy.IP(src="127.0.0.1", dst="127.0.0.1") /
    398               scapy.ICMP(type=8, id=port, seq=1))
    399 
    400     def GetIPv6Unreachable(port):
    401       return (scapy.IPv6(src="::1", dst="::1") /
    402               scapy.ICMPv6DestUnreach() /
    403               scapy.IPv6(src="::1", dst="::1") /
    404               scapy.ICMPv6EchoRequest(id=port, seq=1, data="foobarbaz"))
    405 
    406     # An unreachable matching the ID of a socket of the wrong protocol
    407     # shouldn't crash.
    408     s = net_test.IPv4PingSocket()
    409     s.connect(("127.0.0.1", 12345))
    410     _, port = s.getsockname()
    411     scapy.send(GetIPv6Unreachable(port), verbose=False)
    412     # No crash? Good.
    413 
    414   def testCrossProtocolCalls(self):
    415     """Tests that passing in the wrong family returns EAFNOSUPPORT.
    416 
    417     Relevant kernel commits:
    418       upstream net:
    419         91a0b60 net/ping: handle protocol mismatching scenario
    420         9145736d net: ping: Return EAFNOSUPPORT when appropriate.
    421 
    422       android-3.10:
    423         78a6809 net/ping: handle protocol mismatching scenario
    424         428e6d6 net: ping: Return EAFNOSUPPORT when appropriate.
    425     """
    426 
    427     def CheckEAFNoSupport(function, *args):
    428       self.assertRaisesErrno(errno.EAFNOSUPPORT, function, *args)
    429 
    430     ipv6sockaddr = csocket.Sockaddr((net_test.IPV6_ADDR, 53))
    431 
    432     # In order to check that IPv6 socket calls return EAFNOSUPPORT when passed
    433     # IPv4 socket address structures, we need to pass down a socket address
    434     # length argument that's at least sizeof(sockaddr_in6). Otherwise, the calls
    435     # will fail immediately with EINVAL because the passed-in socket length is
    436     # too short. So create a sockaddr_in that's as long as a sockaddr_in6.
    437     ipv4sockaddr = csocket.Sockaddr((net_test.IPV4_ADDR, 53))
    438     ipv4sockaddr = csocket.SockaddrIn6(
    439         ipv4sockaddr.Pack() +
    440         "\x00" * (len(csocket.SockaddrIn6) - len(csocket.SockaddrIn)))
    441 
    442     s4 = net_test.IPv4PingSocket()
    443     s6 = net_test.IPv6PingSocket()
    444 
    445     # We can't just call s.connect(), s.bind() etc. with a tuple of the wrong
    446     # address family, because the Python implementation will just pass garbage
    447     # down to the kernel. So call the C functions directly.
    448     CheckEAFNoSupport(csocket.Bind, s4, ipv6sockaddr)
    449     CheckEAFNoSupport(csocket.Bind, s6, ipv4sockaddr)
    450     CheckEAFNoSupport(csocket.Connect, s4, ipv6sockaddr)
    451     CheckEAFNoSupport(csocket.Connect, s6, ipv4sockaddr)
    452     CheckEAFNoSupport(csocket.Sendmsg,
    453                       s4, ipv6sockaddr, net_test.IPV4_PING, None, 0)
    454     CheckEAFNoSupport(csocket.Sendmsg,
    455                       s6, ipv4sockaddr, net_test.IPV6_PING, None, 0)
    456 
  def testIPv4Bind(self):
    # Walks through the bind() semantics of IPv4 ping sockets. For these
    # sockets the "port" in the bound address is the ping ID.

    # Bind to unspecified address.
    s = net_test.IPv4PingSocket()
    s.bind(("0.0.0.0", 544))
    self.assertEquals(("0.0.0.0", 544), s.getsockname())

    # Bind to loopback.
    s = net_test.IPv4PingSocket()
    s.bind(("127.0.0.1", 99))
    self.assertEquals(("127.0.0.1", 99), s.getsockname())

    # Binding twice is not allowed.
    self.assertRaisesErrno(errno.EINVAL, s.bind, ("127.0.0.1", 22))

    # But binding two different sockets to the same ID is allowed.
    s2 = net_test.IPv4PingSocket()
    s2.bind(("127.0.0.1", 99))
    self.assertEquals(("127.0.0.1", 99), s2.getsockname())
    s3 = net_test.IPv4PingSocket()
    s3.bind(("127.0.0.1", 99))
    self.assertEquals(("127.0.0.1", 99), s3.getsockname())

    # If two sockets bind to the same port, the first one to call read() gets
    # the response.
    s4 = net_test.IPv4PingSocket()
    s5 = net_test.IPv4PingSocket()
    s4.bind(("0.0.0.0", 167))
    s5.bind(("0.0.0.0", 167))
    s4.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 44))
    self.assertValidPingResponse(s5, net_test.IPV4_PING)
    # s5 has consumed the only reply, so a receive on s4 must time out.
    csocket.SetSocketTimeout(s4, 100)
    self.assertRaisesErrno(errno.EAGAIN, s4.recv, 32768)

    # If SO_REUSEADDR is turned off, then we get EADDRINUSE.
    s6 = net_test.IPv4PingSocket()
    s4.setsockopt(SOL_SOCKET, SO_REUSEADDR, 0)
    self.assertRaisesErrno(errno.EADDRINUSE, s6.bind, ("0.0.0.0", 167))

    # Can't bind after sendto.
    s = net_test.IPv4PingSocket()
    s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 9132))
    self.assertRaisesErrno(errno.EINVAL, s.bind, ("0.0.0.0", 5429))
    499 
  def testIPv6Bind(self):
    # Walks through the bind() semantics of IPv6 ping sockets. getsockname()
    # on these sockets returns (address, port, flowinfo, scope_id).

    # Bind to unspecified address.
    s = net_test.IPv6PingSocket()
    s.bind(("::", 769))
    self.assertEquals(("::", 769, 0, 0), s.getsockname())

    # Bind to loopback.
    s = net_test.IPv6PingSocket()
    s.bind(("::1", 99))
    self.assertEquals(("::1", 99, 0, 0), s.getsockname())

    # Binding twice is not allowed.
    self.assertRaisesErrno(errno.EINVAL, s.bind, ("::1", 22))

    # But binding two different sockets to the same ID is allowed.
    s2 = net_test.IPv6PingSocket()
    s2.bind(("::1", 99))
    self.assertEquals(("::1", 99, 0, 0), s2.getsockname())
    s3 = net_test.IPv6PingSocket()
    s3.bind(("::1", 99))
    self.assertEquals(("::1", 99, 0, 0), s3.getsockname())

    # Binding an IPv4 and an IPv6 ping socket at the same time works. Note
    # that the two sockets are bound to different IDs (444 and 666).
    s4 = net_test.IPv4PingSocket()
    s6 = net_test.IPv6PingSocket()
    s4.bind(("0.0.0.0", 444))
    s6.bind(("::", 666, 0, 0))

    # Can't bind after sendto.
    s = net_test.IPv6PingSocket()
    s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 9132))
    self.assertRaisesErrno(errno.EINVAL, s.bind, ("::", 5429))
    532 
    533   def testIPv4InvalidBind(self):
    534     s = net_test.IPv4PingSocket()
    535     self.assertRaisesErrno(errno.EADDRNOTAVAIL,
    536                            s.bind, ("255.255.255.255", 1026))
    537     self.assertRaisesErrno(errno.EADDRNOTAVAIL,
    538                            s.bind, ("224.0.0.1", 651))
    539     # Binding to an address we don't have only works with IP_TRANSPARENT.
    540     self.assertRaisesErrno(errno.EADDRNOTAVAIL,
    541                            s.bind, (net_test.IPV4_ADDR, 651))
    542     try:
    543       s.setsockopt(SOL_IP, net_test.IP_TRANSPARENT, 1)
    544       s.bind((net_test.IPV4_ADDR, 651))
    545     except IOError, e:
    546       if e.errno == errno.EACCES:
    547         pass  # We're not root. let it go for now.
    548 
    549   def testIPv6InvalidBind(self):
    550     s = net_test.IPv6PingSocket()
    551     self.assertRaisesErrno(errno.EINVAL,
    552                            s.bind, ("ff02::2", 1026))
    553 
    554     # Binding to an address we don't have only works with IPV6_TRANSPARENT.
    555     self.assertRaisesErrno(errno.EADDRNOTAVAIL,
    556                            s.bind, (net_test.IPV6_ADDR, 651))
    557     try:
    558       s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_TRANSPARENT, 1)
    559       s.bind((net_test.IPV6_ADDR, 651))
    560     except IOError, e:
    561       if e.errno == errno.EACCES:
    562         pass  # We're not root. let it go for now.
    563 
    564   def testAfUnspecBind(self):
    565     # Binding to AF_UNSPEC is treated as IPv4 if the address is 0.0.0.0.
    566     s4 = net_test.IPv4PingSocket()
    567     sockaddr = csocket.Sockaddr(("0.0.0.0", 12996))
    568     sockaddr.family = AF_UNSPEC
    569     csocket.Bind(s4, sockaddr)
    570     self.assertEquals(("0.0.0.0", 12996), s4.getsockname())
    571 
    572     # But not if the address is anything else.
    573     sockaddr = csocket.Sockaddr(("127.0.0.1", 58234))
    574     sockaddr.family = AF_UNSPEC
    575     self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s4, sockaddr)
    576 
    577     # This doesn't work for IPv6.
    578     s6 = net_test.IPv6PingSocket()
    579     sockaddr = csocket.Sockaddr(("::1", 58997))
    580     sockaddr.family = AF_UNSPEC
    581     self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s6, sockaddr)
    582 
    583   def testIPv6ScopedBind(self):
    584     # Can't bind to a link-local address without a scope ID.
    585     s = net_test.IPv6PingSocket()
    586     self.assertRaisesErrno(errno.EINVAL,
    587                            s.bind, (self.lladdr, 1026, 0, 0))
    588 
    589     # Binding to a link-local address with a scope ID works, and the scope ID is
    590     # returned by a subsequent getsockname. Interestingly, Python's getsockname
    591     # returns "fe80:1%foo", even though it does not understand it.
    592     expected = self.lladdr + "%" + self.ifname
    593     s.bind((self.lladdr, 4646, 0, self.ifindex))
    594     self.assertEquals((expected, 4646, 0, self.ifindex), s.getsockname())
    595 
    596     # Of course, for the above to work the address actually has to be configured
    597     # on the machine.
    598     self.assertRaisesErrno(errno.EADDRNOTAVAIL,
    599                            s.bind, ("fe80::f00", 1026, 0, 1))
    600 
    601     # Scope IDs on non-link-local addresses are silently ignored.
    602     s = net_test.IPv6PingSocket()
    603     s.bind(("::1", 1234, 0, 1))
    604     self.assertEquals(("::1", 1234, 0, 0), s.getsockname())
    605 
    606   def testBindAffectsIdentifier(self):
    607     s = net_test.IPv6PingSocket()
    608     s.bind((self.globaladdr, 0xf976))
    609     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    610     self.assertEquals("\xf9\x76", s.recv(32768)[4:6])
    611 
    612     s = net_test.IPv6PingSocket()
    613     s.bind((self.globaladdr, 0xace))
    614     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    615     self.assertEquals("\x0a\xce", s.recv(32768)[4:6])
    616 
    617   def testLinkLocalAddress(self):
    618     s = net_test.IPv6PingSocket()
    619     # Sending to a link-local address with no scope fails with EINVAL.
    620     self.assertRaisesErrno(errno.EINVAL,
    621                            s.sendto, net_test.IPV6_PING, ("fe80::1", 55))
    622     # Sending to link-local address with a scope succeeds. Note that Python
    623     # doesn't understand the "fe80::1%lo" format, even though it returns it.
    624     s.sendto(net_test.IPV6_PING, ("fe80::1", 55, 0, self.ifindex))
    625     # No exceptions? Good.
    626 
  def testLinkLocalOif(self):
    """Checks that ping to link-local addresses works correctly.

    Relevant kernel commits:
      upstream net:
        5e45789 net: ipv6: Fix ping to link-local addresses.
    """
    # mode is how the outgoing interface is selected (passed to
    # SelectInterface); None means it is selected only by the scope ID in the
    # destination address.
    for mode in ["oif", "ucast_oif", None]:
      # s is reused across all networks for a given mode; s2 is created afresh
      # for each network because it gets connected.
      s = net_test.IPv6PingSocket()
      for netid in self.NETIDS:
        s2 = net_test.IPv6PingSocket()
        dst = self._RouterAddress(netid, 6)
        self.assertTrue(dst.startswith("fe80:"))

        if mode:
          self.SelectInterface(s, netid, mode)
          self.SelectInterface(s2, netid, mode)
          scopeid = 0
        else:
          scopeid = self.ifindices[netid]

        if mode == "oif":
          # If SO_BINDTODEVICE has been set, any attempt to send on another
          # interface returns EINVAL.
          othernetid = self.NETIDS[(self.NETIDS.index(netid) + 1)
                                   % len(self.NETIDS)]
          otherscopeid = self.ifindices[othernetid]
          self.assertRaisesErrno(
              errno.EINVAL,
              s.sendto, net_test.IPV6_PING, (dst, 55, 0, otherscopeid))
          self.assertRaisesErrno(
              errno.EINVAL,
              s.connect, (dst, 55, 0, otherscopeid))

        # Try using both sendto and connect/send.
        # If we get a reply, we sent the packet out on the right interface.
        s.sendto(net_test.IPV6_PING, (dst, 123, 0, scopeid))
        self.assertValidPingResponse(s, net_test.IPV6_PING)

        # IPV6_UNICAST_IF doesn't work on connected sockets.
        if mode != "ucast_oif":
          s2.connect((dst, 123, 0, scopeid))
          s2.send(net_test.IPV6_PING)
          self.assertValidPingResponse(s2, net_test.IPV6_PING)
    671 
    672   def testMappedAddressFails(self):
    673     s = net_test.IPv6PingSocket()
    674     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    675     self.assertValidPingResponse(s, net_test.IPV6_PING)
    676     s.sendto(net_test.IPV6_PING, ("2001:4860:4860::8844", 55))
    677     self.assertValidPingResponse(s, net_test.IPV6_PING)
    678     self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
    679                            ("::ffff:192.0.2.1", 55))
    680 
    681   @unittest.skipUnless(False, "skipping: does not work yet")
    682   def testFlowLabel(self):
    683     s = net_test.IPv6PingSocket()
    684 
    685     # Specifying a flowlabel without having set IPV6_FLOWINFO_SEND succeeds but
    686     # the flow label in the packet is not set.
    687     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
    688     self.assertValidPingResponse(s, net_test.IPV6_PING)  # Checks flow label==0.
    689 
    690     # If IPV6_FLOWINFO_SEND is set on the socket, attempting to set a flow label
    691     # that is not registered with the flow manager should return EINVAL...
    692     s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_FLOWINFO_SEND, 1)
    693     # ... but this doesn't work yet.
    694     if False:
    695       self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
    696                              (net_test.IPV6_ADDR, 93, 0xdead, 0))
    697 
    698     # After registering the flow label, it gets sent properly, appears in the
    699     # output packet, and is returned in the response.
    700     net_test.SetFlowLabel(s, net_test.IPV6_ADDR, 0xdead)
    701     self.assertEqual(1, s.getsockopt(net_test.SOL_IPV6,
    702                                      net_test.IPV6_FLOWINFO_SEND))
    703     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
    704     _, src = s.recvfrom(32768)
    705     _, _, flowlabel, _ = src
    706     self.assertEqual(0xdead, flowlabel & 0xfffff)
    707 
    708   def testIPv4Error(self):
    709     s = net_test.IPv4PingSocket()
    710     s.setsockopt(SOL_IP, IP_TTL, 2)
    711     s.setsockopt(SOL_IP, net_test.IP_RECVERR, 1)
    712     s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
    713     # We can't check the actual error because Python 2.7 doesn't implement
    714     # recvmsg, but we can at least check that the socket returns an error.
    715     self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768)  # No response.
    716 
    717   def testIPv6Error(self):
    718     s = net_test.IPv6PingSocket()
    719     s.setsockopt(net_test.SOL_IPV6, IPV6_UNICAST_HOPS, 2)
    720     s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1)
    721     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    722     # We can't check the actual error because Python 2.7 doesn't implement
    723     # recvmsg, but we can at least check that the socket returns an error.
    724     self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768)  # No response.
    725 
    726   def testIPv6MulticastPing(self):
    727     s = net_test.IPv6PingSocket()
    728     # Send a multicast ping and check we get at least one duplicate.
    729     # The setsockopt should not be necessary, but ping_v6_sendmsg has a bug.
    730     s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex)
    731     s.sendto(net_test.IPV6_PING, ("ff02::1", 55, 0, self.ifindex))
    732     self.assertValidPingResponse(s, net_test.IPV6_PING)
    733     self.assertValidPingResponse(s, net_test.IPV6_PING)
    734 
    735   def testIPv4LargePacket(self):
    736     s = net_test.IPv4PingSocket()
    737     data = net_test.IPV4_PING + 20000 * "a"
    738     s.sendto(data, ("127.0.0.1", 987))
    739     self.assertValidPingResponse(s, data)
    740 
    741   def testIPv6LargePacket(self):
    742     s = net_test.IPv6PingSocket()
    743     s.bind(("::", 0xace))
    744     data = net_test.IPV6_PING + "\x01" + 19994 * "\x00" + "aaaaa"
    745     s.sendto(data, ("::1", 953))
    746 
    747   @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
    748   def testIcmpSocketsNotInIcmp6(self):
    749     numrows = len(self.ReadProcNetSocket("icmp"))
    750     numrows6 = len(self.ReadProcNetSocket("icmp6"))
    751     s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
    752     s.bind(("127.0.0.1", 0xace))
    753     s.connect(("127.0.0.1", 0xbeef))
    754     self.assertEquals(numrows + 1, len(self.ReadProcNetSocket("icmp")))
    755     self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6")))
    756 
    757   @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
    758   def testIcmp6SocketsNotInIcmp(self):
    759     numrows = len(self.ReadProcNetSocket("icmp"))
    760     numrows6 = len(self.ReadProcNetSocket("icmp6"))
    761     s = net_test.IPv6PingSocket()
    762     s.bind(("::1", 0xace))
    763     s.connect(("::1", 0xbeef))
    764     self.assertEquals(numrows, len(self.ReadProcNetSocket("icmp")))
    765     self.assertEquals(numrows6 + 1, len(self.ReadProcNetSocket("icmp6")))
    766 
    767   def testProcNetIcmp(self):
    768     s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
    769     s.bind(("127.0.0.1", 0xace))
    770     s.connect(("127.0.0.1", 0xbeef))
    771     self.CheckSockStatFile("icmp", "127.0.0.1", 0xace, "127.0.0.1", 0xbeef, 1)
    772 
    773   @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
    774   def testProcNetIcmp6(self):
    775     numrows6 = len(self.ReadProcNetSocket("icmp6"))
    776     s = net_test.IPv6PingSocket()
    777     s.bind(("::1", 0xace))
    778     s.connect(("::1", 0xbeef))
    779     self.CheckSockStatFile("icmp6", "::1", 0xace, "::1", 0xbeef, 1)
    780 
    781     # Check the row goes away when the socket is closed.
    782     s.close()
    783     self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6")))
    784 
    785     # Try send, bind and connect to check the addresses and the state.
    786     s = net_test.IPv6PingSocket()
    787     self.assertEqual(0, len(self.ReadProcNetSocket("icmp6")))
    788     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 12345))
    789     self.assertEqual(1, len(self.ReadProcNetSocket("icmp6")))
    790 
    791     # Can't bind after sendto, apparently.
    792     s = net_test.IPv6PingSocket()
    793     self.assertEqual(0, len(self.ReadProcNetSocket("icmp6")))
    794     s.bind((self.lladdr, 0xd00d, 0, self.ifindex))
    795     self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "::", 0, 7)
    796 
    797     # Check receive bytes.
    798     s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex)
    799     s.connect(("ff02::1", 0xdead))
    800     self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1)
    801     s.send(net_test.IPV6_PING)
    802     s.recvfrom(32768, MSG_PEEK)  # Wait until the receive thread replies.
    803     self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
    804                            txmem=0, rxmem=0x300)
    805     self.assertValidPingResponse(s, net_test.IPV6_PING)
    806     self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
    807                            txmem=0, rxmem=0)
    808 
    809   def testProcNetUdp6(self):
    810     s = net_test.Socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP)
    811     s.bind(("::1", 0xace))
    812     s.connect(("::1", 0xbeef))
    813     self.CheckSockStatFile("udp6", "::1", 0xace, "::1", 0xbeef, 1)
    814 
    815   def testProcNetRaw6(self):
    816     s = net_test.Socket(AF_INET6, SOCK_RAW, IPPROTO_RAW)
    817     s.bind(("::1", 0xace))
    818     s.connect(("::1", 0xbeef))
    819     self.CheckSockStatFile("raw6", "::1", 0xff, "::1", 0, 1)
    820 
    821   def testIPv6MTU(self):
    822     """Tests IPV6_RECVERR and path MTU discovery on ping sockets.
    823 
    824     Relevant kernel commits:
    825       upstream net-next:
    826         dcb94b8 ipv6: fix endianness error in icmpv6_err
    827     """
    828     s = net_test.IPv6PingSocket()
    829     s.setsockopt(net_test.SOL_IPV6, csocket.IPV6_DONTFRAG, 1)
    830     s.setsockopt(net_test.SOL_IPV6, csocket.IPV6_MTU_DISCOVER, 2)
    831     s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1)
    832     s.connect((net_test.IPV6_ADDR, 55))
    833     pkt = net_test.IPV6_PING + (PingReplyThread.LINK_MTU + 100) * "a"
    834     s.send(pkt)
    835     self.assertRaisesErrno(errno.EMSGSIZE, s.recv, 32768)
    836     data, addr, cmsg = csocket.Recvmsg(s, 4096, 1024, csocket.MSG_ERRQUEUE)
    837 
    838     # Compare the offending packet with the one we sent. To do this we need to
    839     # calculate the ident of the packet we sent and blank out the checksum of
    840     # the one we received.
    841     ident = struct.pack("!H", s.getsockname()[1])
    842     pkt = pkt[:4] + ident + pkt[6:]
    843     data = data[:2] + "\x00\x00" + pkt[4:]
    844     self.assertEquals(pkt, data)
    845 
    846     # Check the address that the packet was sent to.
    847     # ... except in 4.1, where it just returns an AF_UNSPEC, like this:
    848     # recvmsg(9, {msg_name(0)={sa_family=AF_UNSPEC,
    849     #     sa_data="\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"},
    850     #     msg_iov(1)=[{"\x80\x00\x04\x6b\x00\xc4\x00\x03\x61\x61\x61\x61\x61\x61"..., 4096}],
    851     #     msg_controllen=64, {cmsg_len=60, cmsg_level=SOL_IPV6, cmsg_type=, ...},
    852     #     msg_flags=MSG_ERRQUEUE}, MSG_ERRQUEUE) = 1232
    853     if net_test.LINUX_VERSION != (4, 1, 0):
    854       self.assertEquals(csocket.Sockaddr(("2001:4860:4860::8888", 0)), addr)
    855 
    856     # Check the cmsg data, including the link MTU.
    857     mtu = PingReplyThread.LINK_MTU
    858     src = self.reply_threads[self.netid].INTERMEDIATE_IPV6
    859     msglist = [
    860         (net_test.SOL_IPV6, net_test.IPV6_RECVERR,
    861          (csocket.SockExtendedErr((errno.EMSGSIZE, csocket.SO_ORIGIN_ICMP6,
    862                                    ICMPV6_PKT_TOOBIG, 0, mtu, 0)),
    863           csocket.Sockaddr((src, 0))))
    864     ]
    865 
    866     # IP[V6]_RECVERR in 3.10 appears to return incorrect data for the port.
    867     # The fix might have been in 676d236, but we don't have that in 3.10 and it
    868     # touches code all over the tree. Instead, just don't check the port.
    869     if net_test.LINUX_VERSION <= (3, 14, 0):
    870       msglist[0][2][1].port = cmsg[0][2][1].port
    871 
    872     self.assertEquals(msglist, cmsg)
    873 
    874 
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
  unittest.main()
    877